summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--attic/RethinkDB.cpp (renamed from controller/RethinkDB.cpp)0
-rw-r--r--attic/RethinkDB.hpp (renamed from controller/RethinkDB.hpp)0
-rw-r--r--controller/EmbeddedNetworkController.hpp3
-rw-r--r--service/OneService.cpp76
4 files changed, 79 insertions, 0 deletions
diff --git a/controller/RethinkDB.cpp b/attic/RethinkDB.cpp
index d54b30b6..d54b30b6 100644
--- a/controller/RethinkDB.cpp
+++ b/attic/RethinkDB.cpp
diff --git a/controller/RethinkDB.hpp b/attic/RethinkDB.hpp
index 60f04c5b..60f04c5b 100644
--- a/controller/RethinkDB.hpp
+++ b/attic/RethinkDB.hpp
diff --git a/controller/EmbeddedNetworkController.hpp b/controller/EmbeddedNetworkController.hpp
index df6d4a7b..c3f121c5 100644
--- a/controller/EmbeddedNetworkController.hpp
+++ b/controller/EmbeddedNetworkController.hpp
@@ -146,10 +146,13 @@ private:
Identity _signingId;
std::string _signingIdAddressString;
NetworkController::Sender *_sender;
+
std::unique_ptr<DB> _db;
BlockingQueue< _RQEntry * > _queue;
+
std::vector<std::thread> _threads;
std::mutex _threads_l;
+
std::unordered_map< _MemberStatusKey,_MemberStatus,_MemberStatusHash > _memberStatus;
std::mutex _memberStatus_l;
};
diff --git a/service/OneService.cpp b/service/OneService.cpp
index b1419234..86bae730 100644
--- a/service/OneService.cpp
+++ b/service/OneService.cpp
@@ -34,6 +34,9 @@
#include <vector>
#include <algorithm>
#include <list>
+#include <thread>
+#include <mutex>
+#include <condition_variable>
#include "../version.h"
#include "../include/ZeroTierOne.h"
@@ -434,6 +437,8 @@ struct TcpConnection
Mutex writeq_m;
};
+#define ZT_INCOMING_PACKET_THREAD_POOL_SIZE 4
+
class OneServiceImpl : public OneService
{
public:
@@ -459,6 +464,18 @@ public:
unsigned int _tertiaryPort;
volatile unsigned int _udpPortPickerCounter;
+#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE
+ struct {
+ uint8_t data[2048];
+ std::thread thr;
+ int64_t sock;
+ struct sockaddr_storage from;
+ int size;
+ std::condition_variable cond;
+ std::mutex lock;
+ } _incomingPacketWorker[ZT_INCOMING_PACKET_THREAD_POOL_SIZE];
+#endif
+
// Local configuration and memo-ized information from it
json _localConfig;
Hashtable< uint64_t,std::vector<InetAddress> > _v4Hints;
@@ -587,6 +604,39 @@ public:
_ports[0] = 0;
_ports[1] = 0;
_ports[2] = 0;
+
+#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE
+ for(unsigned int tn=0;tn<ZT_INCOMING_PACKET_THREAD_POOL_SIZE;++tn) {
+ _incomingPacketWorker[tn].thr = std::thread([this,tn]() {
+ std::unique_lock<std::mutex> l(_incomingPacketWorker[tn].lock);
+ for(;;) {
+ _incomingPacketWorker[tn].cond.wait(l);
+ if (_incomingPacketWorker[tn].size < 0) {
+ break;
+ } else if (_incomingPacketWorker[tn].size > 0) {
+ const ZT_ResultCode rc = _node->processWirePacket(
+ (void *)0,
+ OSUtils::now(),
+ _incomingPacketWorker[tn].sock,
+ &(_incomingPacketWorker[tn].from),
+ _incomingPacketWorker[tn].data,
+ (unsigned int)_incomingPacketWorker[tn].size,
+ &_nextBackgroundTaskDeadline);
+ if (ZT_ResultCode_isFatal(rc)) {
+ char tmp[256];
+ OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc);
+ Mutex::Lock _l(_termReason_m);
+ _termReason = ONE_UNRECOVERABLE_ERROR;
+ _fatalErrorMessage = tmp;
+ this->terminate();
+ break;
+ }
+ }
+ }
+ });
+ }
+#endif
+
#if ZT_VAULT_SUPPORT
curl_global_init(CURL_GLOBAL_DEFAULT);
#endif
@@ -594,6 +644,17 @@ public:
virtual ~OneServiceImpl()
{
+#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE
+ for(unsigned int tn=0;tn<ZT_INCOMING_PACKET_THREAD_POOL_SIZE;++tn) {
+ _incomingPacketWorker[tn].lock.lock();
+ _incomingPacketWorker[tn].size = -1;
+ _incomingPacketWorker[tn].lock.unlock();
+ _incomingPacketWorker[tn].cond.notify_all();
+ }
+ for(unsigned int tn=0;tn<ZT_INCOMING_PACKET_THREAD_POOL_SIZE;++tn) {
+ _incomingPacketWorker[tn].thr.join();
+ }
+#endif
_binder.closeAll(_phy);
_phy.close(_localControlSocket4);
_phy.close(_localControlSocket6);
@@ -1840,6 +1901,20 @@ public:
{
if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL))
_lastDirectReceiveFromGlobal = OSUtils::now();
+#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE
+ unsigned long cksum = 0;
+ for(unsigned int i=0;i<sizeof(struct sockaddr_storage);++i) {
+ cksum += ((uint8_t *)from)[i];
+ }
+ const unsigned long tn = cksum % ZT_INCOMING_PACKET_THREAD_POOL_SIZE;
+ _incomingPacketWorker[tn].lock.lock();
+ memcpy(_incomingPacketWorker[tn].data,data,len);
+ _incomingPacketWorker[tn].sock = reinterpret_cast<int64_t>(sock);
+ memcpy(&_incomingPacketWorker[tn].from,from,sizeof(struct sockaddr_storage));
+ _incomingPacketWorker[tn].size = (int)len;
+ _incomingPacketWorker[tn].lock.unlock();
+ _incomingPacketWorker[tn].cond.notify_all();
+#else
const ZT_ResultCode rc = _node->processWirePacket(
(void *)0,
OSUtils::now(),
@@ -1856,6 +1931,7 @@ public:
_fatalErrorMessage = tmp;
this->terminate();
}
+#endif
}
inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success)