diff options
-rw-r--r-- | attic/RethinkDB.cpp (renamed from controller/RethinkDB.cpp) | 0 | ||||
-rw-r--r-- | attic/RethinkDB.hpp (renamed from controller/RethinkDB.hpp) | 0 | ||||
-rw-r--r-- | controller/EmbeddedNetworkController.hpp | 3 | ||||
-rw-r--r-- | service/OneService.cpp | 76 |
4 files changed, 79 insertions, 0 deletions
diff --git a/controller/RethinkDB.cpp b/attic/RethinkDB.cpp index d54b30b6..d54b30b6 100644 --- a/controller/RethinkDB.cpp +++ b/attic/RethinkDB.cpp diff --git a/controller/RethinkDB.hpp b/attic/RethinkDB.hpp index 60f04c5b..60f04c5b 100644 --- a/controller/RethinkDB.hpp +++ b/attic/RethinkDB.hpp diff --git a/controller/EmbeddedNetworkController.hpp b/controller/EmbeddedNetworkController.hpp index df6d4a7b..c3f121c5 100644 --- a/controller/EmbeddedNetworkController.hpp +++ b/controller/EmbeddedNetworkController.hpp @@ -146,10 +146,13 @@ private: Identity _signingId; std::string _signingIdAddressString; NetworkController::Sender *_sender; + std::unique_ptr<DB> _db; BlockingQueue< _RQEntry * > _queue; + std::vector<std::thread> _threads; std::mutex _threads_l; + std::unordered_map< _MemberStatusKey,_MemberStatus,_MemberStatusHash > _memberStatus; std::mutex _memberStatus_l; }; diff --git a/service/OneService.cpp b/service/OneService.cpp index b1419234..86bae730 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -34,6 +34,9 @@ #include <vector> #include <algorithm> #include <list> +#include <thread> +#include <mutex> +#include <condition_variable> #include "../version.h" #include "../include/ZeroTierOne.h" @@ -434,6 +437,8 @@ struct TcpConnection Mutex writeq_m; }; +#define ZT_INCOMING_PACKET_THREAD_POOL_SIZE 4 + class OneServiceImpl : public OneService { public: @@ -459,6 +464,18 @@ public: unsigned int _tertiaryPort; volatile unsigned int _udpPortPickerCounter; +#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE + struct { + uint8_t data[2048]; + std::thread thr; + int64_t sock; + struct sockaddr_storage from; + int size; + std::condition_variable cond; + std::mutex lock; + } _incomingPacketWorker[ZT_INCOMING_PACKET_THREAD_POOL_SIZE]; +#endif + // Local configuration and memo-ized information from it json _localConfig; Hashtable< uint64_t,std::vector<InetAddress> > _v4Hints; @@ -587,6 +604,39 @@ public: _ports[0] = 0; _ports[1] = 0; _ports[2] = 0; + +#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE + for(unsigned int tn=0;tn<ZT_INCOMING_PACKET_THREAD_POOL_SIZE;++tn) { + _incomingPacketWorker[tn].thr = std::thread([this,tn]() { + std::unique_lock<std::mutex> l(_incomingPacketWorker[tn].lock); + for(;;) { + _incomingPacketWorker[tn].cond.wait(l); + if (_incomingPacketWorker[tn].size < 0) { + break; + } else if (_incomingPacketWorker[tn].size > 0) { + const ZT_ResultCode rc = _node->processWirePacket( + (void *)0, + OSUtils::now(), + _incomingPacketWorker[tn].sock, + &(_incomingPacketWorker[tn].from), + _incomingPacketWorker[tn].data, + (unsigned int)_incomingPacketWorker[tn].size, + &_nextBackgroundTaskDeadline); + if (ZT_ResultCode_isFatal(rc)) { + char tmp[256]; + OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc); + Mutex::Lock _l(_termReason_m); + _termReason = ONE_UNRECOVERABLE_ERROR; + _fatalErrorMessage = tmp; + this->terminate(); + break; + } + } + } + }); + } +#endif + #if ZT_VAULT_SUPPORT curl_global_init(CURL_GLOBAL_DEFAULT); #endif @@ -594,6 +644,17 @@ public: virtual ~OneServiceImpl() { +#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE + for(unsigned int tn=0;tn<ZT_INCOMING_PACKET_THREAD_POOL_SIZE;++tn) { + _incomingPacketWorker[tn].lock.lock(); + _incomingPacketWorker[tn].size = -1; + _incomingPacketWorker[tn].lock.unlock(); + _incomingPacketWorker[tn].cond.notify_all(); + } + for(unsigned int tn=0;tn<ZT_INCOMING_PACKET_THREAD_POOL_SIZE;++tn) { + _incomingPacketWorker[tn].thr.join(); + } +#endif _binder.closeAll(_phy); _phy.close(_localControlSocket4); _phy.close(_localControlSocket6); @@ -1840,6 +1901,20 @@ public: { if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) _lastDirectReceiveFromGlobal = OSUtils::now(); +#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE + unsigned long cksum = 0; + for(unsigned int i=0;i<sizeof(struct sockaddr_storage);++i) { + cksum += ((uint8_t *)from)[i]; + } + const unsigned long tn = cksum % ZT_INCOMING_PACKET_THREAD_POOL_SIZE; + _incomingPacketWorker[tn].lock.lock(); + memcpy(_incomingPacketWorker[tn].data,data,len); + _incomingPacketWorker[tn].sock = reinterpret_cast<int64_t>(sock); + memcpy(&_incomingPacketWorker[tn].from,from,sizeof(struct sockaddr_storage)); + _incomingPacketWorker[tn].size = (int)len; + _incomingPacketWorker[tn].lock.unlock(); + _incomingPacketWorker[tn].cond.notify_all(); +#else const ZT_ResultCode rc = _node->processWirePacket( (void *)0, OSUtils::now(), @@ -1856,6 +1931,7 @@ public: _fatalErrorMessage = tmp; this->terminate(); } +#endif } inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success) |