summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--one.cpp2
-rw-r--r--osdep/BlockingQueue.hpp23
-rw-r--r--service/OneService.cpp150
3 files changed, 91 insertions, 84 deletions
diff --git a/one.cpp b/one.cpp
index b48e4396..7ee818af 100644
--- a/one.cpp
+++ b/one.cpp
@@ -1354,12 +1354,14 @@ int main(int argc,char **argv)
#ifdef __UNIX_LIKE__
signal(SIGHUP,&_sighandlerHup);
signal(SIGPIPE,SIG_IGN);
+ signal(SIGIO,SIG_IGN);
signal(SIGUSR1,SIG_IGN);
signal(SIGUSR2,SIG_IGN);
signal(SIGALRM,SIG_IGN);
signal(SIGINT,&_sighandlerQuit);
signal(SIGTERM,&_sighandlerQuit);
signal(SIGQUIT,&_sighandlerQuit);
+ signal(SIGINT,&_sighandlerQuit);
/* Ensure that there are no inherited file descriptors open from a previous
* incarnation. This is a hack to ensure that GitHub issue #61 or variants
diff --git a/osdep/BlockingQueue.hpp b/osdep/BlockingQueue.hpp
index 351a095a..03986efe 100644
--- a/osdep/BlockingQueue.hpp
+++ b/osdep/BlockingQueue.hpp
@@ -32,6 +32,8 @@
#include <condition_variable>
#include <chrono>
+#include "Thread.hpp"
+
namespace ZeroTier {
/**
@@ -52,6 +54,23 @@ public:
c.notify_one();
}
+ inline void postWait(T t,unsigned long maxQueueSize)
+ {
+ for(;;) {
+ {
+ std::lock_guard<std::mutex> lock(m);
+ if (q.size() < maxQueueSize) {
+ q.push(t);
+ c.notify_one();
+ return;
+ }
+ }
+ if (!r)
+ break;
+ Thread::sleep(1);
+ }
+ }
+
inline void stop(void)
{
std::lock_guard<std::mutex> lock(m);
@@ -98,8 +117,8 @@ public:
private:
volatile bool r;
std::queue<T> q;
- std::mutex m;
- std::condition_variable c;
+ mutable std::mutex m;
+ mutable std::condition_variable c;
};
} // namespace ZeroTier
diff --git a/service/OneService.cpp b/service/OneService.cpp
index effe90c2..bf24466d 100644
--- a/service/OneService.cpp
+++ b/service/OneService.cpp
@@ -60,6 +60,7 @@
#include "../osdep/PortMapper.hpp"
#include "../osdep/Binder.hpp"
#include "../osdep/ManagedRoute.hpp"
+#include "../osdep/BlockingQueue.hpp"
#include "OneService.hpp"
#include "SoftwareUpdater.hpp"
@@ -174,9 +175,6 @@ namespace ZeroTier { typedef BSDEthernetTap EthernetTap; }
// TCP activity timeout
#define ZT_TCP_ACTIVITY_TIMEOUT 60000
-// Number of receive path threads to start
-#define ZT_INCOMING_PACKET_THREAD_POOL_SIZE 8
-
#if ZT_VAULT_SUPPORT
size_t curlResponseWrite(void *ptr, size_t size, size_t nmemb, std::string *data)
{
@@ -440,6 +438,15 @@ struct TcpConnection
Mutex writeq_m;
};
+struct OneServiceIncomingPacket
+{
+ uint64_t now;
+ int64_t sock;
+ struct sockaddr_storage from;
+ unsigned int size;
+ uint8_t data[ZT_MAX_MTU];
+};
+
class OneServiceImpl : public OneService
{
public:
@@ -465,17 +472,10 @@ public:
unsigned int _tertiaryPort;
volatile unsigned int _udpPortPickerCounter;
-#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE
- struct {
- uint8_t data[2048];
- std::thread thr;
- int64_t sock;
- struct sockaddr_storage from;
- int size;
- std::condition_variable cond;
- std::mutex lock;
- } _incomingPacketWorker[ZT_INCOMING_PACKET_THREAD_POOL_SIZE];
-#endif
+ std::vector<OneServiceIncomingPacket *> _incomingPacketMemoryPool;
+ BlockingQueue<OneServiceIncomingPacket *> _incomingPacketQueue;
+ std::vector<std::thread> _incomingPacketThreads;
+ Mutex _incomingPacketMemoryPoolLock,_incomingPacketThreadsLock;
// Local configuration and memo-ized information from it
json _localConfig;
@@ -606,37 +606,32 @@ public:
_ports[1] = 0;
_ports[2] = 0;
-#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE
- for(unsigned int tn=0;tn<ZT_INCOMING_PACKET_THREAD_POOL_SIZE;++tn) {
- _incomingPacketWorker[tn].thr = std::thread([this,tn]() {
- std::unique_lock<std::mutex> l(_incomingPacketWorker[tn].lock);
+ for(long t=0;t<std::max((long)1,(long)std::thread::hardware_concurrency());++t) {
+ _incomingPacketThreads.push_back(std::thread([this]() {
+ OneServiceIncomingPacket *pkt = nullptr;
for(;;) {
- _incomingPacketWorker[tn].cond.wait(l);
- if (_incomingPacketWorker[tn].size < 0) {
+ if (!_incomingPacketQueue.get(pkt))
+ break;
+ if (!pkt)
+ break;
+
+ const ZT_ResultCode rc = _node->processWirePacket(nullptr,pkt->now,pkt->sock,&(pkt->from),pkt->data,pkt->size,&_nextBackgroundTaskDeadline);
+ {
+ Mutex::Lock l(_incomingPacketMemoryPoolLock);
+ _incomingPacketMemoryPool.push_back(pkt);
+ }
+ if (ZT_ResultCode_isFatal(rc)) {
+ char tmp[256];
+ OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc);
+ Mutex::Lock _l(_termReason_m);
+ _termReason = ONE_UNRECOVERABLE_ERROR;
+ _fatalErrorMessage = tmp;
+ this->terminate();
break;
- } else if (_incomingPacketWorker[tn].size > 0) {
- const ZT_ResultCode rc = _node->processWirePacket(
- (void *)0,
- OSUtils::now(),
- _incomingPacketWorker[tn].sock,
- &(_incomingPacketWorker[tn].from),
- _incomingPacketWorker[tn].data,
- (unsigned int)_incomingPacketWorker[tn].size,
- &_nextBackgroundTaskDeadline);
- if (ZT_ResultCode_isFatal(rc)) {
- char tmp[256];
- OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc);
- Mutex::Lock _l(_termReason_m);
- _termReason = ONE_UNRECOVERABLE_ERROR;
- _fatalErrorMessage = tmp;
- this->terminate();
- break;
- }
}
}
- });
+ }));
}
-#endif
#if ZT_VAULT_SUPPORT
curl_global_init(CURL_GLOBAL_DEFAULT);
@@ -645,24 +640,27 @@ public:
virtual ~OneServiceImpl()
{
-#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE
- for(unsigned int tn=0;tn<ZT_INCOMING_PACKET_THREAD_POOL_SIZE;++tn) {
- _incomingPacketWorker[tn].lock.lock();
- _incomingPacketWorker[tn].size = -1;
- _incomingPacketWorker[tn].lock.unlock();
- _incomingPacketWorker[tn].cond.notify_all();
- }
- for(unsigned int tn=0;tn<ZT_INCOMING_PACKET_THREAD_POOL_SIZE;++tn) {
- _incomingPacketWorker[tn].thr.join();
- }
-#endif
+ _incomingPacketQueue.stop();
+ _incomingPacketThreadsLock.lock();
+ for(auto t=_incomingPacketThreads.begin();t!=_incomingPacketThreads.end();++t)
+ t->join();
+ _incomingPacketThreadsLock.unlock();
+
_binder.closeAll(_phy);
_phy.close(_localControlSocket4);
_phy.close(_localControlSocket6);
+
#if ZT_VAULT_SUPPORT
curl_global_cleanup();
#endif
+ _incomingPacketMemoryPoolLock.lock();
+ while (!_incomingPacketMemoryPool.empty()) {
+ delete _incomingPacketMemoryPool.back();
+ _incomingPacketMemoryPool.pop_back();
+ }
+ _incomingPacketMemoryPoolLock.unlock();
+
#ifdef ZT_USE_MINIUPNPC
delete _portMapper;
#endif
@@ -1900,39 +1898,27 @@ public:
inline void phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *localAddr,const struct sockaddr *from,void *data,unsigned long len)
{
+ const uint64_t now = OSUtils::now();
if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL))
- _lastDirectReceiveFromGlobal = OSUtils::now();
-#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE
- unsigned long cksum = 0;
- for(unsigned int i=0;i<sizeof(struct sockaddr_storage);++i) {
- cksum += ((uint8_t *)from)[i];
- }
- const unsigned long tn = cksum % ZT_INCOMING_PACKET_THREAD_POOL_SIZE;
- _incomingPacketWorker[tn].lock.lock();
- memcpy(_incomingPacketWorker[tn].data,data,len);
- _incomingPacketWorker[tn].sock = reinterpret_cast<int64_t>(sock);
- memcpy(&_incomingPacketWorker[tn].from,from,sizeof(struct sockaddr_storage));
- _incomingPacketWorker[tn].size = (int)len;
- _incomingPacketWorker[tn].lock.unlock();
- _incomingPacketWorker[tn].cond.notify_all();
-#else
- const ZT_ResultCode rc = _node->processWirePacket(
- (void *)0,
- OSUtils::now(),
- reinterpret_cast<int64_t>(sock),
- reinterpret_cast<const struct sockaddr_storage *>(from), // Phy<> uses sockaddr_storage, so it'll always be that big
- data,
- len,
- &_nextBackgroundTaskDeadline);
- if (ZT_ResultCode_isFatal(rc)) {
- char tmp[256];
- OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc);
- Mutex::Lock _l(_termReason_m);
- _termReason = ONE_UNRECOVERABLE_ERROR;
- _fatalErrorMessage = tmp;
- this->terminate();
+ _lastDirectReceiveFromGlobal = now;
+
+ OneServiceIncomingPacket *pkt;
+ _incomingPacketMemoryPoolLock.lock();
+ if (_incomingPacketMemoryPool.empty()) {
+ pkt = new OneServiceIncomingPacket;
+ } else {
+ pkt = _incomingPacketMemoryPool.back();
+ _incomingPacketMemoryPool.pop_back();
}
-#endif
+ _incomingPacketMemoryPoolLock.unlock();
+
+ pkt->now = now;
+ pkt->sock = reinterpret_cast<int64_t>(sock);
+ ZT_FAST_MEMCPY(&(pkt->from),from,sizeof(struct sockaddr_storage));
+ pkt->size = (unsigned int)len;
+ ZT_FAST_MEMCPY(pkt->data,data,len);
+
+ _incomingPacketQueue.postWait(pkt,64);
}
inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success)