From bb0808c99c37eaf8cfbf8d90647933d5899f48a6 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Wed, 19 Jun 2019 10:19:59 -0700 Subject: Remove current multithreaded receive path, which is not that efficient. We will do something better in the future. --- service/OneService.cpp | 85 +++++++------------------------------------------- 1 file changed, 12 insertions(+), 73 deletions(-) (limited to 'service') diff --git a/service/OneService.cpp b/service/OneService.cpp index 0043204c..63ee3373 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -147,6 +147,10 @@ namespace ZeroTier { typedef BSDEthernetTap EthernetTap; } #endif // ZT_USE_TEST_TAP +#ifndef ZT_SOFTWARE_UPDATE_DEFAULT +#define ZT_SOFTWARE_UPDATE_DEFAULT "disable" +#endif + // Sanity limits for HTTP #define ZT_MAX_HTTP_MESSAGE_SIZE (1024 * 1024 * 64) #define ZT_MAX_HTTP_CONNECTIONS 65536 @@ -477,12 +481,6 @@ public: unsigned int _tertiaryPort; volatile unsigned int _udpPortPickerCounter; - unsigned long _incomingPacketConcurrency; - std::vector _incomingPacketMemoryPool; - BlockingQueue _incomingPacketQueue; - std::vector _incomingPacketThreads; - Mutex _incomingPacketMemoryPoolLock,_incomingPacketThreadsLock; - // Local configuration and memo-ized information from it json _localConfig; Hashtable< uint64_t,std::vector > _v4Hints; @@ -615,43 +613,6 @@ public: _ports[1] = 0; _ports[2] = 0; - _incomingPacketConcurrency = std::max((unsigned long)1,std::min((unsigned long)16,(unsigned long)std::thread::hardware_concurrency())); - char *envPool = std::getenv("INCOMING_PACKET_CONCURRENCY"); - if (envPool != NULL) { - int tmp = atoi(envPool); - if (tmp > 0) { - _incomingPacketConcurrency = tmp; - } - } - for(unsigned long t=0;t<_incomingPacketConcurrency;++t) { - _incomingPacketThreads.push_back(std::thread([this]() { - OneServiceIncomingPacket *pkt = nullptr; - for(;;) { - if (!_incomingPacketQueue.get(pkt)) - break; - if (!pkt) - break; - if (!_run) - break; - - const ZT_ResultCode rc = _node->processWirePacket(nullptr,pkt->now,pkt->sock,&(pkt->from),pkt->data,pkt->size,&_nextBackgroundTaskDeadline); - { - Mutex::Lock l(_incomingPacketMemoryPoolLock); - _incomingPacketMemoryPool.push_back(pkt); - } - if (ZT_ResultCode_isFatal(rc)) { - char tmp[256]; - OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc); - Mutex::Lock _l(_termReason_m); - _termReason = ONE_UNRECOVERABLE_ERROR; - _fatalErrorMessage = tmp; - this->terminate(); - break; - } - } - })); - } - #if ZT_VAULT_SUPPORT curl_global_init(CURL_GLOBAL_DEFAULT); #endif @@ -659,12 +620,6 @@ public: virtual ~OneServiceImpl() { - _incomingPacketQueue.stop(); - _incomingPacketThreadsLock.lock(); - for(auto t=_incomingPacketThreads.begin();t!=_incomingPacketThreads.end();++t) - t->join(); - _incomingPacketThreadsLock.unlock(); - _binder.closeAll(_phy); _phy.close(_localControlSocket4); _phy.close(_localControlSocket6); @@ -673,13 +628,6 @@ public: curl_global_cleanup(); #endif - _incomingPacketMemoryPoolLock.lock(); - while (!_incomingPacketMemoryPool.empty()) { - delete _incomingPacketMemoryPool.back(); - _incomingPacketMemoryPool.pop_back(); - } - _incomingPacketMemoryPoolLock.unlock(); - #ifdef ZT_USE_MINIUPNPC delete _portMapper; #endif @@ -1917,24 +1865,15 @@ public: const uint64_t now = OSUtils::now(); if ((len >= 16)&&(reinterpret_cast(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) _lastDirectReceiveFromGlobal = now; - - OneServiceIncomingPacket *pkt; - _incomingPacketMemoryPoolLock.lock(); - if (_incomingPacketMemoryPool.empty()) { - pkt = new OneServiceIncomingPacket; - } else { - pkt = _incomingPacketMemoryPool.back(); - _incomingPacketMemoryPool.pop_back(); + const ZT_ResultCode rc = _node->processWirePacket(nullptr,now,reinterpret_cast(sock),reinterpret_cast(from),data,len,&_nextBackgroundTaskDeadline); + if (ZT_ResultCode_isFatal(rc)) { + char tmp[256]; + OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc); + Mutex::Lock _l(_termReason_m); + _termReason = ONE_UNRECOVERABLE_ERROR; + _fatalErrorMessage = tmp; + this->terminate(); } - _incomingPacketMemoryPoolLock.unlock(); - - pkt->now = now; - pkt->sock = reinterpret_cast(sock); - memcpy(&(pkt->from),from,sizeof(struct sockaddr_storage)); - pkt->size = (unsigned int)len; - memcpy(pkt->data,data,len); - - _incomingPacketQueue.postLimit(pkt,16 * _incomingPacketConcurrency); } inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success) -- cgit v1.2.3