summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdam Ierymenko <adam.ierymenko@gmail.com>2019-06-19 10:19:59 -0700
committerAdam Ierymenko <adam.ierymenko@gmail.com>2019-06-19 10:19:59 -0700
commitbb0808c99c37eaf8cfbf8d90647933d5899f48a6 (patch)
treeefbe9384064e843bef09638e247aa9051dc4c93b
parentf0295b154b907a0e306e786d2b5885d464c14811 (diff)
downloadinfinitytier-bb0808c99c37eaf8cfbf8d90647933d5899f48a6.tar.gz
infinitytier-bb0808c99c37eaf8cfbf8d90647933d5899f48a6.zip
Remove current multithreaded receive path, which is not that efficient. We will do something better in the future.
-rw-r--r--service/OneService.cpp85
1 files changed, 12 insertions, 73 deletions
diff --git a/service/OneService.cpp b/service/OneService.cpp
index 0043204c..63ee3373 100644
--- a/service/OneService.cpp
+++ b/service/OneService.cpp
@@ -147,6 +147,10 @@ namespace ZeroTier { typedef BSDEthernetTap EthernetTap; }
#endif // ZT_USE_TEST_TAP
+#ifndef ZT_SOFTWARE_UPDATE_DEFAULT
+#define ZT_SOFTWARE_UPDATE_DEFAULT "disable"
+#endif
+
// Sanity limits for HTTP
#define ZT_MAX_HTTP_MESSAGE_SIZE (1024 * 1024 * 64)
#define ZT_MAX_HTTP_CONNECTIONS 65536
@@ -477,12 +481,6 @@ public:
unsigned int _tertiaryPort;
volatile unsigned int _udpPortPickerCounter;
- unsigned long _incomingPacketConcurrency;
- std::vector<OneServiceIncomingPacket *> _incomingPacketMemoryPool;
- BlockingQueue<OneServiceIncomingPacket *> _incomingPacketQueue;
- std::vector<std::thread> _incomingPacketThreads;
- Mutex _incomingPacketMemoryPoolLock,_incomingPacketThreadsLock;
-
// Local configuration and memo-ized information from it
json _localConfig;
Hashtable< uint64_t,std::vector<InetAddress> > _v4Hints;
@@ -615,43 +613,6 @@ public:
_ports[1] = 0;
_ports[2] = 0;
- _incomingPacketConcurrency = std::max((unsigned long)1,std::min((unsigned long)16,(unsigned long)std::thread::hardware_concurrency()));
- char *envPool = std::getenv("INCOMING_PACKET_CONCURRENCY");
- if (envPool != NULL) {
- int tmp = atoi(envPool);
- if (tmp > 0) {
- _incomingPacketConcurrency = tmp;
- }
- }
- for(unsigned long t=0;t<_incomingPacketConcurrency;++t) {
- _incomingPacketThreads.push_back(std::thread([this]() {
- OneServiceIncomingPacket *pkt = nullptr;
- for(;;) {
- if (!_incomingPacketQueue.get(pkt))
- break;
- if (!pkt)
- break;
- if (!_run)
- break;
-
- const ZT_ResultCode rc = _node->processWirePacket(nullptr,pkt->now,pkt->sock,&(pkt->from),pkt->data,pkt->size,&_nextBackgroundTaskDeadline);
- {
- Mutex::Lock l(_incomingPacketMemoryPoolLock);
- _incomingPacketMemoryPool.push_back(pkt);
- }
- if (ZT_ResultCode_isFatal(rc)) {
- char tmp[256];
- OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc);
- Mutex::Lock _l(_termReason_m);
- _termReason = ONE_UNRECOVERABLE_ERROR;
- _fatalErrorMessage = tmp;
- this->terminate();
- break;
- }
- }
- }));
- }
-
#if ZT_VAULT_SUPPORT
curl_global_init(CURL_GLOBAL_DEFAULT);
#endif
@@ -659,12 +620,6 @@ public:
virtual ~OneServiceImpl()
{
- _incomingPacketQueue.stop();
- _incomingPacketThreadsLock.lock();
- for(auto t=_incomingPacketThreads.begin();t!=_incomingPacketThreads.end();++t)
- t->join();
- _incomingPacketThreadsLock.unlock();
-
_binder.closeAll(_phy);
_phy.close(_localControlSocket4);
_phy.close(_localControlSocket6);
@@ -673,13 +628,6 @@ public:
curl_global_cleanup();
#endif
- _incomingPacketMemoryPoolLock.lock();
- while (!_incomingPacketMemoryPool.empty()) {
- delete _incomingPacketMemoryPool.back();
- _incomingPacketMemoryPool.pop_back();
- }
- _incomingPacketMemoryPoolLock.unlock();
-
#ifdef ZT_USE_MINIUPNPC
delete _portMapper;
#endif
@@ -1917,24 +1865,15 @@ public:
const uint64_t now = OSUtils::now();
if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL))
_lastDirectReceiveFromGlobal = now;
-
- OneServiceIncomingPacket *pkt;
- _incomingPacketMemoryPoolLock.lock();
- if (_incomingPacketMemoryPool.empty()) {
- pkt = new OneServiceIncomingPacket;
- } else {
- pkt = _incomingPacketMemoryPool.back();
- _incomingPacketMemoryPool.pop_back();
+ const ZT_ResultCode rc = _node->processWirePacket(nullptr,now,reinterpret_cast<int64_t>(sock),reinterpret_cast<const struct sockaddr_storage *>(from),data,len,&_nextBackgroundTaskDeadline);
+ if (ZT_ResultCode_isFatal(rc)) {
+ char tmp[256];
+ OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc);
+ Mutex::Lock _l(_termReason_m);
+ _termReason = ONE_UNRECOVERABLE_ERROR;
+ _fatalErrorMessage = tmp;
+ this->terminate();
}
- _incomingPacketMemoryPoolLock.unlock();
-
- pkt->now = now;
- pkt->sock = reinterpret_cast<int64_t>(sock);
- memcpy(&(pkt->from),from,sizeof(struct sockaddr_storage));
- pkt->size = (unsigned int)len;
- memcpy(pkt->data,data,len);
-
- _incomingPacketQueue.postLimit(pkt,16 * _incomingPacketConcurrency);
}
inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success)