From c9c17eaddd6380724a3b0360e8a2e5ae8f84a69c Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Sun, 11 Nov 2018 22:35:15 -0800 Subject: Retire RethinkDB, simple receive path multithreading. --- service/OneService.cpp | 76 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) (limited to 'service') diff --git a/service/OneService.cpp b/service/OneService.cpp index b1419234..86bae730 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -34,6 +34,9 @@ #include #include #include +#include +#include +#include #include "../version.h" #include "../include/ZeroTierOne.h" @@ -434,6 +437,8 @@ struct TcpConnection Mutex writeq_m; }; +#define ZT_INCOMING_PACKET_THREAD_POOL_SIZE 4 + class OneServiceImpl : public OneService { public: @@ -459,6 +464,18 @@ public: unsigned int _tertiaryPort; volatile unsigned int _udpPortPickerCounter; +#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE + struct { + uint8_t data[2048]; + std::thread thr; + int64_t sock; + struct sockaddr_storage from; + int size; + std::condition_variable cond; + std::mutex lock; + } _incomingPacketWorker[ZT_INCOMING_PACKET_THREAD_POOL_SIZE]; +#endif + // Local configuration and memo-ized information from it json _localConfig; Hashtable< uint64_t,std::vector > _v4Hints; @@ -587,6 +604,39 @@ public: _ports[0] = 0; _ports[1] = 0; _ports[2] = 0; + +#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE + for(unsigned int tn=0;tn l(_incomingPacketWorker[tn].lock); + for(;;) { + _incomingPacketWorker[tn].cond.wait(l); + if (_incomingPacketWorker[tn].size < 0) { + break; + } else if (_incomingPacketWorker[tn].size > 0) { + const ZT_ResultCode rc = _node->processWirePacket( + (void *)0, + OSUtils::now(), + _incomingPacketWorker[tn].sock, + &(_incomingPacketWorker[tn].from), + _incomingPacketWorker[tn].data, + (unsigned int)_incomingPacketWorker[tn].size, + &_nextBackgroundTaskDeadline); + if (ZT_ResultCode_isFatal(rc)) { + char tmp[256]; + OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc); + Mutex::Lock _l(_termReason_m); + _termReason = ONE_UNRECOVERABLE_ERROR; + _fatalErrorMessage = tmp; + this->terminate(); + break; + } + } + } + }); + } +#endif + #if ZT_VAULT_SUPPORT curl_global_init(CURL_GLOBAL_DEFAULT); #endif @@ -594,6 +644,17 @@ public: virtual ~OneServiceImpl() { +#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE + for(unsigned int tn=0;tn= 16)&&(reinterpret_cast(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) _lastDirectReceiveFromGlobal = OSUtils::now(); +#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE + unsigned long cksum = 0; + for(unsigned int i=0;i(sock); + memcpy(&_incomingPacketWorker[tn].from,from,sizeof(struct sockaddr_storage)); + _incomingPacketWorker[tn].size = (int)len; + _incomingPacketWorker[tn].lock.unlock(); + _incomingPacketWorker[tn].cond.notify_all(); +#else const ZT_ResultCode rc = _node->processWirePacket( (void *)0, OSUtils::now(), @@ -1856,6 +1931,7 @@ public: _fatalErrorMessage = tmp; this->terminate(); } +#endif } inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success) -- cgit v1.2.3