diff options
Diffstat (limited to 'service/OneService.cpp')
| -rw-r--r-- | service/OneService.cpp | 152 |
1 files changed, 70 insertions, 82 deletions
diff --git a/service/OneService.cpp b/service/OneService.cpp index effe90c2..1351cbfb 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -60,6 +60,7 @@ #include "../osdep/PortMapper.hpp" #include "../osdep/Binder.hpp" #include "../osdep/ManagedRoute.hpp" +#include "../osdep/BlockingQueue.hpp" #include "OneService.hpp" #include "SoftwareUpdater.hpp" @@ -174,9 +175,6 @@ namespace ZeroTier { typedef BSDEthernetTap EthernetTap; } // TCP activity timeout #define ZT_TCP_ACTIVITY_TIMEOUT 60000 -// Number of receive path threads to start -#define ZT_INCOMING_PACKET_THREAD_POOL_SIZE 8 - #if ZT_VAULT_SUPPORT size_t curlResponseWrite(void *ptr, size_t size, size_t nmemb, std::string *data) { @@ -440,6 +438,15 @@ struct TcpConnection Mutex writeq_m; }; +struct OneServiceIncomingPacket +{ + uint64_t now; + int64_t sock; + struct sockaddr_storage from; + unsigned int size; + uint8_t data[ZT_MAX_MTU]; +}; + class OneServiceImpl : public OneService { public: @@ -465,17 +472,11 @@ public: unsigned int _tertiaryPort; volatile unsigned int _udpPortPickerCounter; -#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE - struct { - uint8_t data[2048]; - std::thread thr; - int64_t sock; - struct sockaddr_storage from; - int size; - std::condition_variable cond; - std::mutex lock; - } _incomingPacketWorker[ZT_INCOMING_PACKET_THREAD_POOL_SIZE]; -#endif + unsigned long _incomingPacketConcurrency; + std::vector<OneServiceIncomingPacket *> _incomingPacketMemoryPool; + BlockingQueue<OneServiceIncomingPacket *> _incomingPacketQueue; + std::vector<std::thread> _incomingPacketThreads; + Mutex _incomingPacketMemoryPoolLock,_incomingPacketThreadsLock; // Local configuration and memo-ized information from it json _localConfig; @@ -606,37 +607,33 @@ public: _ports[1] = 0; _ports[2] = 0; -#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE - for(unsigned int tn=0;tn<ZT_INCOMING_PACKET_THREAD_POOL_SIZE;++tn) { - _incomingPacketWorker[tn].thr = std::thread([this,tn]() { - std::unique_lock<std::mutex> l(_incomingPacketWorker[tn].lock); + _incomingPacketConcurrency = std::max((unsigned long)1,std::min((unsigned long)16,(unsigned long)std::thread::hardware_concurrency())); + for(long t=0;t<_incomingPacketConcurrency;++t) { + _incomingPacketThreads.push_back(std::thread([this]() { + OneServiceIncomingPacket *pkt = nullptr; for(;;) { - _incomingPacketWorker[tn].cond.wait(l); - if (_incomingPacketWorker[tn].size < 0) { + if (!_incomingPacketQueue.get(pkt)) + break; + if (!pkt) + break; + + const ZT_ResultCode rc = _node->processWirePacket(nullptr,pkt->now,pkt->sock,&(pkt->from),pkt->data,pkt->size,&_nextBackgroundTaskDeadline); + { + Mutex::Lock l(_incomingPacketMemoryPoolLock); + _incomingPacketMemoryPool.push_back(pkt); + } + if (ZT_ResultCode_isFatal(rc)) { + char tmp[256]; + OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc); + Mutex::Lock _l(_termReason_m); + _termReason = ONE_UNRECOVERABLE_ERROR; + _fatalErrorMessage = tmp; + this->terminate(); break; - } else if (_incomingPacketWorker[tn].size > 0) { - const ZT_ResultCode rc = _node->processWirePacket( - (void *)0, - OSUtils::now(), - _incomingPacketWorker[tn].sock, - &(_incomingPacketWorker[tn].from), - _incomingPacketWorker[tn].data, - (unsigned int)_incomingPacketWorker[tn].size, - &_nextBackgroundTaskDeadline); - if (ZT_ResultCode_isFatal(rc)) { - char tmp[256]; - OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc); - Mutex::Lock _l(_termReason_m); - _termReason = ONE_UNRECOVERABLE_ERROR; - _fatalErrorMessage = tmp; - this->terminate(); - break; - } } } - }); + })); } -#endif #if ZT_VAULT_SUPPORT curl_global_init(CURL_GLOBAL_DEFAULT); @@ -645,24 +642,27 @@ public: virtual ~OneServiceImpl() { -#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE - for(unsigned int tn=0;tn<ZT_INCOMING_PACKET_THREAD_POOL_SIZE;++tn) { - _incomingPacketWorker[tn].lock.lock(); - _incomingPacketWorker[tn].size = -1; - _incomingPacketWorker[tn].lock.unlock(); - _incomingPacketWorker[tn].cond.notify_all(); - } - for(unsigned int tn=0;tn<ZT_INCOMING_PACKET_THREAD_POOL_SIZE;++tn) { - _incomingPacketWorker[tn].thr.join(); - } -#endif + _incomingPacketQueue.stop(); + _incomingPacketThreadsLock.lock(); + for(auto t=_incomingPacketThreads.begin();t!=_incomingPacketThreads.end();++t) + t->join(); + _incomingPacketThreadsLock.unlock(); + _binder.closeAll(_phy); _phy.close(_localControlSocket4); _phy.close(_localControlSocket6); + #if ZT_VAULT_SUPPORT curl_global_cleanup(); #endif + _incomingPacketMemoryPoolLock.lock(); + while (!_incomingPacketMemoryPool.empty()) { + delete _incomingPacketMemoryPool.back(); + _incomingPacketMemoryPool.pop_back(); + } + _incomingPacketMemoryPoolLock.unlock(); + #ifdef ZT_USE_MINIUPNPC delete _portMapper; #endif @@ -1900,39 +1900,27 @@ public: inline void phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *localAddr,const struct sockaddr *from,void *data,unsigned long len) { + const uint64_t now = OSUtils::now(); if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) - _lastDirectReceiveFromGlobal = OSUtils::now(); -#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE - unsigned long cksum = 0; - for(unsigned int i=0;i<sizeof(struct sockaddr_storage);++i) { - cksum += ((uint8_t *)from)[i]; - } - const unsigned long tn = cksum % ZT_INCOMING_PACKET_THREAD_POOL_SIZE; - _incomingPacketWorker[tn].lock.lock(); - memcpy(_incomingPacketWorker[tn].data,data,len); - _incomingPacketWorker[tn].sock = reinterpret_cast<int64_t>(sock); - memcpy(&_incomingPacketWorker[tn].from,from,sizeof(struct sockaddr_storage)); - _incomingPacketWorker[tn].size = (int)len; - _incomingPacketWorker[tn].lock.unlock(); - _incomingPacketWorker[tn].cond.notify_all(); -#else - const ZT_ResultCode rc = _node->processWirePacket( - (void *)0, - OSUtils::now(), - reinterpret_cast<int64_t>(sock), - reinterpret_cast<const struct sockaddr_storage *>(from), // Phy<> uses sockaddr_storage, so it'll always be that big - data, - len, - &_nextBackgroundTaskDeadline); - if (ZT_ResultCode_isFatal(rc)) { - char tmp[256]; - OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc); - Mutex::Lock _l(_termReason_m); - _termReason = ONE_UNRECOVERABLE_ERROR; - _fatalErrorMessage = tmp; - this->terminate(); + _lastDirectReceiveFromGlobal = now; + + OneServiceIncomingPacket *pkt; + _incomingPacketMemoryPoolLock.lock(); + if (_incomingPacketMemoryPool.empty()) { + pkt = new OneServiceIncomingPacket; + } else { + pkt = _incomingPacketMemoryPool.back(); + _incomingPacketMemoryPool.pop_back(); } -#endif + _incomingPacketMemoryPoolLock.unlock(); + + pkt->now = now; + pkt->sock = reinterpret_cast<int64_t>(sock); + ZT_FAST_MEMCPY(&(pkt->from),from,sizeof(struct sockaddr_storage)); + pkt->size = (unsigned int)len; + ZT_FAST_MEMCPY(pkt->data,data,len); + + _incomingPacketQueue.postLimit(pkt,16 * _incomingPacketConcurrency); } inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success) |
