author     Adam Ierymenko <adam.ierymenko@gmail.com>   2018-11-13 09:35:20 -0800
committer  Adam Ierymenko <adam.ierymenko@gmail.com>   2018-11-13 09:35:20 -0800
commit     3b6b1d167427b3d3873070bb111f18db5dedac1a (patch)
tree       d6ed2e4088faf45ee5b147b2e6a70c155441974b /service
parent     b937aeb857255c44dcdd66ef0ac3aed1c843fb13 (diff)
download   infinitytier-3b6b1d167427b3d3873070bb111f18db5dedac1a.tar.gz
           infinitytier-3b6b1d167427b3d3873070bb111f18db5dedac1a.zip
Make incoming packet processor thread pool dynamic based on core count.
Diffstat (limited to 'service')
-rw-r--r--   service/OneService.cpp   71
1 file changed, 34 insertions(+), 37 deletions(-)
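
For context before the diff: the change replaces the fixed 8-thread receive pool with a pool sized from the detected core count, clamped between 1 and a compile-time maximum. A minimal standalone sketch of that sizing logic (illustrative names, not code from this patch):

#include <algorithm>
#include <thread>

// Illustrative cap, mirroring ZT_INCOMING_PACKET_MAX_THREAD_POOL_SIZE in the patch.
static const unsigned int kMaxWorkers = 16;

static unsigned int pickPoolSize()
{
	// hardware_concurrency() may return 0 when the core count is unknown,
	// so clamp the result to at least one worker.
	const unsigned int hw = (unsigned int)std::thread::hardware_concurrency();
	return std::max(std::min(hw, kMaxWorkers), (unsigned int)1);
}
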
diff --git a/service/OneService.cpp b/service/OneService.cpp
index effe90c2..ad5680c2 100644
--- a/service/OneService.cpp
+++ b/service/OneService.cpp
@@ -174,8 +174,8 @@ namespace ZeroTier { typedef BSDEthernetTap EthernetTap; }
// TCP activity timeout
#define ZT_TCP_ACTIVITY_TIMEOUT 60000
-// Number of receive path threads to start
-#define ZT_INCOMING_PACKET_THREAD_POOL_SIZE 8
+// Max number of packet handler threads to start
+#define ZT_INCOMING_PACKET_MAX_THREAD_POOL_SIZE 16
#if ZT_VAULT_SUPPORT
size_t curlResponseWrite(void *ptr, size_t size, size_t nmemb, std::string *data)
@@ -465,7 +465,7 @@ public:
unsigned int _tertiaryPort;
volatile unsigned int _udpPortPickerCounter;
-#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE
+ unsigned int _incomingPacketThreadPoolSize;
struct {
uint8_t data[2048];
std::thread thr;
@@ -474,8 +474,7 @@ public:
int size;
std::condition_variable cond;
std::mutex lock;
- } _incomingPacketWorker[ZT_INCOMING_PACKET_THREAD_POOL_SIZE];
-#endif
+ } _incomingPacketWorker[ZT_INCOMING_PACKET_MAX_THREAD_POOL_SIZE];
// Local configuration and memo-ized information from it
json _localConfig;
@@ -606,8 +605,8 @@ public:
_ports[1] = 0;
_ports[2] = 0;
-#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE
- for(unsigned int tn=0;tn<ZT_INCOMING_PACKET_THREAD_POOL_SIZE;++tn) {
+ _incomingPacketThreadPoolSize = std::max(std::min((unsigned int)std::thread::hardware_concurrency(),(unsigned int)ZT_INCOMING_PACKET_MAX_THREAD_POOL_SIZE),(unsigned int)1);
+ for(unsigned int tn=0;tn<_incomingPacketThreadPoolSize;++tn) {
_incomingPacketWorker[tn].thr = std::thread([this,tn]() {
std::unique_lock<std::mutex> l(_incomingPacketWorker[tn].lock);
for(;;) {
@@ -636,7 +635,6 @@ public:
}
});
}
-#endif
#if ZT_VAULT_SUPPORT
curl_global_init(CURL_GLOBAL_DEFAULT);
@@ -645,17 +643,15 @@ public:
virtual ~OneServiceImpl()
{
-#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE
- for(unsigned int tn=0;tn<ZT_INCOMING_PACKET_THREAD_POOL_SIZE;++tn) {
+ for(unsigned int tn=0;tn<_incomingPacketThreadPoolSize;++tn) {
_incomingPacketWorker[tn].lock.lock();
_incomingPacketWorker[tn].size = -1;
_incomingPacketWorker[tn].lock.unlock();
_incomingPacketWorker[tn].cond.notify_all();
}
- for(unsigned int tn=0;tn<ZT_INCOMING_PACKET_THREAD_POOL_SIZE;++tn) {
+ for(unsigned int tn=0;tn<_incomingPacketThreadPoolSize;++tn) {
_incomingPacketWorker[tn].thr.join();
}
-#endif
_binder.closeAll(_phy);
_phy.close(_localControlSocket4);
_phy.close(_localControlSocket6);
@@ -1902,37 +1898,38 @@ public:
{
if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL))
_lastDirectReceiveFromGlobal = OSUtils::now();
-#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE
- unsigned long cksum = 0;
- for(unsigned int i=0;i<sizeof(struct sockaddr_storage);++i) {
- cksum += ((uint8_t *)from)[i];
+
+ /* Pick a worker thread by checksumming the from address. This avoids packet
+ * re-ordering caused by thread scheduling, since each individual remote
+ * peer is bound to a specific thread. It will block globally if that thread
+ * is blocked, so this is not an optimal implementation from the perspective
+ * of perfect thread utilization. Nevertheless, using threads this way greatly
+ * improves performance in heavy-load multi-peer scenarios and does so with
+ * little impact on simpler scenarios due to its extreme simplicity. */
+ uint8_t cksum = 0;
+ switch(from->sa_family) {
+ case AF_INET:
+ for(unsigned int i=0;i<4;++i)
+ cksum += ((const uint8_t *)(&(((const struct sockaddr_in *)from)->sin_addr.s_addr)))[i];
+ break;
+ case AF_INET6:
+ for(unsigned int i=0;i<16;++i)
+ cksum += ((const struct sockaddr_in6 *)from)->sin6_addr.s6_addr[i];
+ break;
+ default:
+ for(unsigned int i=0;i<sizeof(struct sockaddr_storage);++i)
+ cksum += ((uint8_t *)from)[i];
+ break;
}
- const unsigned long tn = cksum % ZT_INCOMING_PACKET_THREAD_POOL_SIZE;
+ const unsigned int tn = cksum % _incomingPacketThreadPoolSize;
+
_incomingPacketWorker[tn].lock.lock();
- memcpy(_incomingPacketWorker[tn].data,data,len);
+ ZT_FAST_MEMCPY(_incomingPacketWorker[tn].data,data,len);
_incomingPacketWorker[tn].sock = reinterpret_cast<int64_t>(sock);
- memcpy(&_incomingPacketWorker[tn].from,from,sizeof(struct sockaddr_storage));
+ ZT_FAST_MEMCPY(&_incomingPacketWorker[tn].from,from,sizeof(struct sockaddr_storage));
_incomingPacketWorker[tn].size = (int)len;
_incomingPacketWorker[tn].lock.unlock();
_incomingPacketWorker[tn].cond.notify_all();
-#else
- const ZT_ResultCode rc = _node->processWirePacket(
- (void *)0,
- OSUtils::now(),
- reinterpret_cast<int64_t>(sock),
- reinterpret_cast<const struct sockaddr_storage *>(from), // Phy<> uses sockaddr_storage, so it'll always be that big
- data,
- len,
- &_nextBackgroundTaskDeadline);
- if (ZT_ResultCode_isFatal(rc)) {
- char tmp[256];
- OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc);
- Mutex::Lock _l(_termReason_m);
- _termReason = ONE_UNRECOVERABLE_ERROR;
- _fatalErrorMessage = tmp;
- this->terminate();
- }
-#endif
}
inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success)
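
The comment added in the final hunk above describes the dispatch scheme: the source address is checksummed so that each remote peer always maps to the same worker thread, which prevents packet re-ordering at the cost of uneven thread utilization. A standalone sketch of that mapping, using a hypothetical helper name and assuming a pool size of at least one (simplified from the patch, which copies the packet into the chosen worker's buffer under its lock and signals its condition variable):

#include <cstdint>
#include <netinet/in.h>
#include <sys/socket.h>

// Hypothetical helper (not part of the patch): map a source address to a
// worker index. Summing the raw address bytes into a uint8_t and taking the
// remainder keeps one peer's packets on one thread, so they stay in order.
static unsigned int workerForAddress(const struct sockaddr_storage *ss, unsigned int poolSize)
{
	const struct sockaddr *from = (const struct sockaddr *)ss;
	uint8_t cksum = 0;
	switch (from->sa_family) {
		case AF_INET: { // IPv4: sum the 4 address bytes
			const struct sockaddr_in *in4 = (const struct sockaddr_in *)from;
			const uint8_t *a = (const uint8_t *)&(in4->sin_addr.s_addr);
			for (unsigned int i = 0; i < 4; ++i) cksum += a[i];
		}	break;
		case AF_INET6: { // IPv6: sum the 16 address bytes
			const struct sockaddr_in6 *in6 = (const struct sockaddr_in6 *)from;
			for (unsigned int i = 0; i < 16; ++i) cksum += in6->sin6_addr.s6_addr[i];
		}	break;
		default: // unknown family: sum the whole storage area
			for (unsigned int i = 0; i < sizeof(struct sockaddr_storage); ++i)
				cksum += ((const uint8_t *)ss)[i];
			break;
	}
	return (unsigned int)cksum % poolSize; // poolSize must be >= 1
}

Because the index depends only on the address, a worker that blocks stalls every peer hashed to it; that is the utilization trade-off the comment in the patch acknowledges.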