diff options
author | Adam Ierymenko <adam.ierymenko@gmail.com> | 2018-11-13 09:35:20 -0800 |
---|---|---|
committer | Adam Ierymenko <adam.ierymenko@gmail.com> | 2018-11-13 09:35:20 -0800 |
commit | 3b6b1d167427b3d3873070bb111f18db5dedac1a (patch) | |
tree | d6ed2e4088faf45ee5b147b2e6a70c155441974b /service | |
parent | b937aeb857255c44dcdd66ef0ac3aed1c843fb13 (diff) | |
download | infinitytier-3b6b1d167427b3d3873070bb111f18db5dedac1a.tar.gz infinitytier-3b6b1d167427b3d3873070bb111f18db5dedac1a.zip |
Make incoming packet processor thread pool dynamic based on core count.
Diffstat (limited to 'service')
-rw-r--r-- | service/OneService.cpp | 71 |
1 file changed, 34 insertions, 37 deletions
diff --git a/service/OneService.cpp b/service/OneService.cpp index effe90c2..ad5680c2 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -174,8 +174,8 @@ namespace ZeroTier { typedef BSDEthernetTap EthernetTap; } // TCP activity timeout #define ZT_TCP_ACTIVITY_TIMEOUT 60000 -// Number of receive path threads to start -#define ZT_INCOMING_PACKET_THREAD_POOL_SIZE 8 +// Max number of packet handler threads to start +#define ZT_INCOMING_PACKET_MAX_THREAD_POOL_SIZE 16 #if ZT_VAULT_SUPPORT size_t curlResponseWrite(void *ptr, size_t size, size_t nmemb, std::string *data) @@ -465,7 +465,7 @@ public: unsigned int _tertiaryPort; volatile unsigned int _udpPortPickerCounter; -#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE + unsigned int _incomingPacketThreadPoolSize; struct { uint8_t data[2048]; std::thread thr; @@ -474,8 +474,7 @@ public: int size; std::condition_variable cond; std::mutex lock; - } _incomingPacketWorker[ZT_INCOMING_PACKET_THREAD_POOL_SIZE]; -#endif + } _incomingPacketWorker[ZT_INCOMING_PACKET_MAX_THREAD_POOL_SIZE]; // Local configuration and memo-ized information from it json _localConfig; @@ -606,8 +605,8 @@ public: _ports[1] = 0; _ports[2] = 0; -#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE - for(unsigned int tn=0;tn<ZT_INCOMING_PACKET_THREAD_POOL_SIZE;++tn) { + _incomingPacketThreadPoolSize = std::max(std::min((unsigned int)std::thread::hardware_concurrency(),(unsigned int)ZT_INCOMING_PACKET_MAX_THREAD_POOL_SIZE),(unsigned int)1); + for(unsigned int tn=0;tn<_incomingPacketThreadPoolSize;++tn) { _incomingPacketWorker[tn].thr = std::thread([this,tn]() { std::unique_lock<std::mutex> l(_incomingPacketWorker[tn].lock); for(;;) { @@ -636,7 +635,6 @@ public: } }); } -#endif #if ZT_VAULT_SUPPORT curl_global_init(CURL_GLOBAL_DEFAULT); @@ -645,17 +643,15 @@ public: virtual ~OneServiceImpl() { -#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE - for(unsigned int tn=0;tn<ZT_INCOMING_PACKET_THREAD_POOL_SIZE;++tn) { + for(unsigned int 
tn=0;tn<_incomingPacketThreadPoolSize;++tn) { _incomingPacketWorker[tn].lock.lock(); _incomingPacketWorker[tn].size = -1; _incomingPacketWorker[tn].lock.unlock(); _incomingPacketWorker[tn].cond.notify_all(); } - for(unsigned int tn=0;tn<ZT_INCOMING_PACKET_THREAD_POOL_SIZE;++tn) { + for(unsigned int tn=0;tn<_incomingPacketThreadPoolSize;++tn) { _incomingPacketWorker[tn].thr.join(); } -#endif _binder.closeAll(_phy); _phy.close(_localControlSocket4); _phy.close(_localControlSocket6); @@ -1902,37 +1898,38 @@ public: { if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) _lastDirectReceiveFromGlobal = OSUtils::now(); -#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE - unsigned long cksum = 0; - for(unsigned int i=0;i<sizeof(struct sockaddr_storage);++i) { - cksum += ((uint8_t *)from)[i]; + + /* Pick worker thread by checksumming the from address. This avoids thread + * scheduling caused packet re-ordering by binding each individual remote + * peer to a specific thread. It will block globally if that thread is blocked, + * so this is not an optimal implementation from the perspective of perfect + * thread utilization. Nevertheless using threads this way does greatly + * improve performance in heavy load multi-peer scenarios and does so with + * little impact on simpler scenarios due to its extreme simplicity. 
*/ + uint8_t cksum = 0; + switch(from->sa_family) { + case AF_INET: + for(unsigned int i=0;i<4;++i) + cksum += ((const uint8_t *)(&(((const struct sockaddr_in *)from)->sin_addr.s_addr)))[i]; + break; + case AF_INET6: + for(unsigned int i=0;i<16;++i) + cksum += ((const struct sockaddr_in6 *)from)->sin6_addr.s6_addr[i]; + break; + default: + for(unsigned int i=0;i<sizeof(struct sockaddr_storage);++i) + cksum += ((uint8_t *)from)[i]; + break; } - const unsigned long tn = cksum % ZT_INCOMING_PACKET_THREAD_POOL_SIZE; + const unsigned int tn = cksum % _incomingPacketThreadPoolSize; + _incomingPacketWorker[tn].lock.lock(); - memcpy(_incomingPacketWorker[tn].data,data,len); + ZT_FAST_MEMCPY(_incomingPacketWorker[tn].data,data,len); _incomingPacketWorker[tn].sock = reinterpret_cast<int64_t>(sock); - memcpy(&_incomingPacketWorker[tn].from,from,sizeof(struct sockaddr_storage)); + ZT_FAST_MEMCPY(&_incomingPacketWorker[tn].from,from,sizeof(struct sockaddr_storage)); _incomingPacketWorker[tn].size = (int)len; _incomingPacketWorker[tn].lock.unlock(); _incomingPacketWorker[tn].cond.notify_all(); -#else - const ZT_ResultCode rc = _node->processWirePacket( - (void *)0, - OSUtils::now(), - reinterpret_cast<int64_t>(sock), - reinterpret_cast<const struct sockaddr_storage *>(from), // Phy<> uses sockaddr_storage, so it'll always be that big - data, - len, - &_nextBackgroundTaskDeadline); - if (ZT_ResultCode_isFatal(rc)) { - char tmp[256]; - OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc); - Mutex::Lock _l(_termReason_m); - _termReason = ONE_UNRECOVERABLE_ERROR; - _fatalErrorMessage = tmp; - this->terminate(); - } -#endif } inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success) |