summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--osdep/BlockingQueue.hpp26
-rw-r--r--osdep/MacEthernetTap.cpp4
-rw-r--r--osdep/MacEthernetTap.hpp2
-rw-r--r--osdep/MacEthernetTapAgent.c11
-rw-r--r--service/OneService.cpp6
5 files changed, 23 insertions, 26 deletions
diff --git a/osdep/BlockingQueue.hpp b/osdep/BlockingQueue.hpp
index 03986efe..6f81ddf2 100644
--- a/osdep/BlockingQueue.hpp
+++ b/osdep/BlockingQueue.hpp
@@ -54,20 +54,16 @@ public:
c.notify_one();
}
- inline void postWait(T t,unsigned long maxQueueSize)
+ inline void postLimit(T t,const unsigned long limit)
{
+ std::unique_lock<std::mutex> lock(m);
for(;;) {
- {
- std::lock_guard<std::mutex> lock(m);
- if (q.size() < maxQueueSize) {
- q.push(t);
- c.notify_one();
- return;
- }
- }
- if (!r)
+ if (q.size() < limit) {
+ q.push(t);
+ c.notify_one();
break;
- Thread::sleep(1);
+ }
+ gc.wait(lock);
}
}
@@ -84,10 +80,14 @@ public:
if (!r) return false;
while (q.empty()) {
c.wait(lock);
- if (!r) return false;
+ if (!r) {
+ gc.notify_all();
+ return false;
+ }
}
value = q.front();
q.pop();
+ gc.notify_all();
return true;
}
@@ -118,7 +118,7 @@ private:
volatile bool r;
std::queue<T> q;
mutable std::mutex m;
- mutable std::condition_variable c;
+ mutable std::condition_variable c,gc;
};
} // namespace ZeroTier
diff --git a/osdep/MacEthernetTap.cpp b/osdep/MacEthernetTap.cpp
index 1cfb9375..fb3e3a75 100644
--- a/osdep/MacEthernetTap.cpp
+++ b/osdep/MacEthernetTap.cpp
@@ -287,7 +287,9 @@ void MacEthernetTap::put(const MAC &from,const MAC &to,unsigned int etherType,co
iov[1].iov_len = 15;
iov[2].iov_base = const_cast<void *>(data);
iov[2].iov_len = len;
+ _putLock.lock();
writev(_agentStdin,iov,3);
+ _putLock.unlock();
}
}
@@ -396,8 +398,6 @@ void MacEthernetTap::threadMain()
break;
}
}
- } else {
- break;
}
}
if (FD_ISSET(_agentStderr,&readfds)) {
diff --git a/osdep/MacEthernetTap.hpp b/osdep/MacEthernetTap.hpp
index 4b3ac019..eaf131a7 100644
--- a/osdep/MacEthernetTap.hpp
+++ b/osdep/MacEthernetTap.hpp
@@ -38,6 +38,7 @@
#include "../node/MAC.hpp"
#include "../node/InetAddress.hpp"
#include "../node/MulticastGroup.hpp"
+#include "../node/Mutex.hpp"
#include "Thread.hpp"
@@ -80,6 +81,7 @@ private:
std::string _homePath;
std::string _dev;
std::vector<MulticastGroup> _multicastGroups;
+ Mutex _putLock;
unsigned int _mtu;
unsigned int _metric;
int _shutdownSignalPipe[2];
diff --git a/osdep/MacEthernetTapAgent.c b/osdep/MacEthernetTapAgent.c
index 91eff25c..a595e154 100644
--- a/osdep/MacEthernetTapAgent.c
+++ b/osdep/MacEthernetTapAgent.c
@@ -104,8 +104,8 @@
#define P_IFCONFIG "/sbin/ifconfig"
-static unsigned char s_pktReadBuf[524288] __attribute__ ((__aligned__(16)));
-static unsigned char s_stdinReadBuf[524288] __attribute__ ((__aligned__(16)));
+static unsigned char s_pktReadBuf[262144] __attribute__ ((__aligned__(16)));
+static unsigned char s_stdinReadBuf[262144] __attribute__ ((__aligned__(16)));
static char s_deviceName[IFNAMSIZ];
static char s_peerDeviceName[IFNAMSIZ];
static int s_bpffd = -1;
@@ -322,9 +322,6 @@ int main(int argc,char **argv)
return ZT_MACETHERNETTAPAGENT_EXIT_CODE_UNABLE_TO_CREATE;
}
- fcntl(s_ndrvfd,F_SETFL,fcntl(s_ndrvfd,F_GETFL)|O_NONBLOCK);
- fcntl(s_bpffd,F_SETFL,fcntl(s_bpffd,F_GETFL)|O_NONBLOCK);
-
fprintf(stderr,"I %s %s %d.%d.%d.%d\n",s_deviceName,s_peerDeviceName,ZEROTIER_ONE_VERSION_MAJOR,ZEROTIER_ONE_VERSION_MINOR,ZEROTIER_ONE_VERSION_REVISION,ZEROTIER_ONE_VERSION_BUILD);
FD_ZERO(&rfds);
@@ -357,8 +354,6 @@ int main(int argc,char **argv)
}
p += BPF_WORDALIGN(h->bh_hdrlen + h->bh_caplen);
}
- } else {
- return ZT_MACETHERNETTAPAGENT_EXIT_CODE_READ_ERROR;
}
}
@@ -431,8 +426,6 @@ int main(int argc,char **argv)
break;
}
}
- } else {
- return ZT_MACETHERNETTAPAGENT_EXIT_CODE_READ_ERROR;
}
}
}
diff --git a/service/OneService.cpp b/service/OneService.cpp
index bf24466d..1351cbfb 100644
--- a/service/OneService.cpp
+++ b/service/OneService.cpp
@@ -472,6 +472,7 @@ public:
unsigned int _tertiaryPort;
volatile unsigned int _udpPortPickerCounter;
+ unsigned long _incomingPacketConcurrency;
std::vector<OneServiceIncomingPacket *> _incomingPacketMemoryPool;
BlockingQueue<OneServiceIncomingPacket *> _incomingPacketQueue;
std::vector<std::thread> _incomingPacketThreads;
@@ -606,7 +607,8 @@ public:
_ports[1] = 0;
_ports[2] = 0;
- for(long t=0;t<std::max((long)1,(long)std::thread::hardware_concurrency());++t) {
+ _incomingPacketConcurrency = std::max((unsigned long)1,std::min((unsigned long)16,(unsigned long)std::thread::hardware_concurrency()));
+ for(long t=0;t<_incomingPacketConcurrency;++t) {
_incomingPacketThreads.push_back(std::thread([this]() {
OneServiceIncomingPacket *pkt = nullptr;
for(;;) {
@@ -1918,7 +1920,7 @@ public:
pkt->size = (unsigned int)len;
ZT_FAST_MEMCPY(pkt->data,data,len);
- _incomingPacketQueue.postWait(pkt,64);
+ _incomingPacketQueue.postLimit(pkt,16 * _incomingPacketConcurrency);
}
inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success)