diff options
-rw-r--r-- | osdep/BlockingQueue.hpp | 26 | ||||
-rw-r--r-- | osdep/MacEthernetTap.cpp | 4 | ||||
-rw-r--r-- | osdep/MacEthernetTap.hpp | 2 | ||||
-rw-r--r-- | osdep/MacEthernetTapAgent.c | 11 | ||||
-rw-r--r-- | service/OneService.cpp | 6 |
5 files changed, 23 insertions, 26 deletions
diff --git a/osdep/BlockingQueue.hpp b/osdep/BlockingQueue.hpp index 03986efe..6f81ddf2 100644 --- a/osdep/BlockingQueue.hpp +++ b/osdep/BlockingQueue.hpp @@ -54,20 +54,16 @@ public: c.notify_one(); } - inline void postWait(T t,unsigned long maxQueueSize) + inline void postLimit(T t,const unsigned long limit) { + std::unique_lock<std::mutex> lock(m); for(;;) { - { - std::lock_guard<std::mutex> lock(m); - if (q.size() < maxQueueSize) { - q.push(t); - c.notify_one(); - return; - } - } - if (!r) + if (q.size() < limit) { + q.push(t); + c.notify_one(); break; - Thread::sleep(1); + } + gc.wait(lock); } } @@ -84,10 +80,14 @@ public: if (!r) return false; while (q.empty()) { c.wait(lock); - if (!r) return false; + if (!r) { + gc.notify_all(); + return false; + } } value = q.front(); q.pop(); + gc.notify_all(); return true; } @@ -118,7 +118,7 @@ private: volatile bool r; std::queue<T> q; mutable std::mutex m; - mutable std::condition_variable c; + mutable std::condition_variable c,gc; }; } // namespace ZeroTier diff --git a/osdep/MacEthernetTap.cpp b/osdep/MacEthernetTap.cpp index 1cfb9375..fb3e3a75 100644 --- a/osdep/MacEthernetTap.cpp +++ b/osdep/MacEthernetTap.cpp @@ -287,7 +287,9 @@ void MacEthernetTap::put(const MAC &from,const MAC &to,unsigned int etherType,co iov[1].iov_len = 15; iov[2].iov_base = const_cast<void *>(data); iov[2].iov_len = len; + _putLock.lock(); writev(_agentStdin,iov,3); + _putLock.unlock(); } } @@ -396,8 +398,6 @@ void MacEthernetTap::threadMain() break; } } - } else { - break; } } if (FD_ISSET(_agentStderr,&readfds)) { diff --git a/osdep/MacEthernetTap.hpp b/osdep/MacEthernetTap.hpp index 4b3ac019..eaf131a7 100644 --- a/osdep/MacEthernetTap.hpp +++ b/osdep/MacEthernetTap.hpp @@ -38,6 +38,7 @@ #include "../node/MAC.hpp" #include "../node/InetAddress.hpp" #include "../node/MulticastGroup.hpp" +#include "../node/Mutex.hpp" #include "Thread.hpp" @@ -80,6 +81,7 @@ private: std::string _homePath; std::string _dev; std::vector<MulticastGroup> _multicastGroups; + Mutex _putLock; unsigned int _mtu; unsigned int _metric; int _shutdownSignalPipe[2]; diff --git a/osdep/MacEthernetTapAgent.c b/osdep/MacEthernetTapAgent.c index 91eff25c..a595e154 100644 --- a/osdep/MacEthernetTapAgent.c +++ b/osdep/MacEthernetTapAgent.c @@ -104,8 +104,8 @@ #define P_IFCONFIG "/sbin/ifconfig" -static unsigned char s_pktReadBuf[524288] __attribute__ ((__aligned__(16))); -static unsigned char s_stdinReadBuf[524288] __attribute__ ((__aligned__(16))); +static unsigned char s_pktReadBuf[262144] __attribute__ ((__aligned__(16))); +static unsigned char s_stdinReadBuf[262144] __attribute__ ((__aligned__(16))); static char s_deviceName[IFNAMSIZ]; static char s_peerDeviceName[IFNAMSIZ]; static int s_bpffd = -1; @@ -322,9 +322,6 @@ int main(int argc,char **argv) return ZT_MACETHERNETTAPAGENT_EXIT_CODE_UNABLE_TO_CREATE; } - fcntl(s_ndrvfd,F_SETFL,fcntl(s_ndrvfd,F_GETFL)|O_NONBLOCK); - fcntl(s_bpffd,F_SETFL,fcntl(s_bpffd,F_GETFL)|O_NONBLOCK); - fprintf(stderr,"I %s %s %d.%d.%d.%d\n",s_deviceName,s_peerDeviceName,ZEROTIER_ONE_VERSION_MAJOR,ZEROTIER_ONE_VERSION_MINOR,ZEROTIER_ONE_VERSION_REVISION,ZEROTIER_ONE_VERSION_BUILD); FD_ZERO(&rfds); @@ -357,8 +354,6 @@ int main(int argc,char **argv) } p += BPF_WORDALIGN(h->bh_hdrlen + h->bh_caplen); } - } else { - return ZT_MACETHERNETTAPAGENT_EXIT_CODE_READ_ERROR; } } @@ -431,8 +426,6 @@ int main(int argc,char **argv) break; } } - } else { - return ZT_MACETHERNETTAPAGENT_EXIT_CODE_READ_ERROR; } } } diff --git a/service/OneService.cpp b/service/OneService.cpp index bf24466d..1351cbfb 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -472,6 +472,7 @@ public: unsigned int _tertiaryPort; volatile unsigned int _udpPortPickerCounter; + unsigned long _incomingPacketConcurrency; std::vector<OneServiceIncomingPacket *> _incomingPacketMemoryPool; BlockingQueue<OneServiceIncomingPacket *> _incomingPacketQueue; std::vector<std::thread> _incomingPacketThreads; @@ -606,7 +607,8 @@ public: _ports[1] = 0; _ports[2] = 0; - for(long t=0;t<std::max((long)1,(long)std::thread::hardware_concurrency());++t) { + _incomingPacketConcurrency = std::max((unsigned long)1,std::min((unsigned long)16,(unsigned long)std::thread::hardware_concurrency())); + for(long t=0;t<_incomingPacketConcurrency;++t) { _incomingPacketThreads.push_back(std::thread([this]() { OneServiceIncomingPacket *pkt = nullptr; for(;;) { @@ -1918,7 +1920,7 @@ public: pkt->size = (unsigned int)len; ZT_FAST_MEMCPY(pkt->data,data,len); - _incomingPacketQueue.postWait(pkt,64); + _incomingPacketQueue.postLimit(pkt,16 * _incomingPacketConcurrency); } inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success) |