diff options
| author | Grant Limberg <grant.limberg@zerotier.com> | 2018-11-13 16:00:17 -0800 | 
|---|---|---|
| committer | Grant Limberg <grant.limberg@zerotier.com> | 2018-11-13 16:00:17 -0800 | 
| commit | 01e6df4d46df280b3ad2ae158cb87b5ee98f10d7 (patch) | |
| tree | 231f4ef754a3871456f72895a9ae4899bb4588b8 /service/OneService.cpp | |
| parent | 882b03436d7b40a788f060e0d83a4027f86e73ef (diff) | |
| parent | 690bd933d52c7dbbcddde7c0aff30f7fee91a6d9 (diff) | |
| download | infinitytier-01e6df4d46df280b3ad2ae158cb87b5ee98f10d7.tar.gz infinitytier-01e6df4d46df280b3ad2ae158cb87b5ee98f10d7.zip | |
Merge branch 'dev' of http://git.int.zerotier.com/ZeroTier/ZeroTierOne into dev
Diffstat (limited to 'service/OneService.cpp')
| -rw-r--r-- | service/OneService.cpp | 152 | 
1 files changed, 70 insertions, 82 deletions
| diff --git a/service/OneService.cpp b/service/OneService.cpp index effe90c2..1351cbfb 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -60,6 +60,7 @@  #include "../osdep/PortMapper.hpp"  #include "../osdep/Binder.hpp"  #include "../osdep/ManagedRoute.hpp" +#include "../osdep/BlockingQueue.hpp"  #include "OneService.hpp"  #include "SoftwareUpdater.hpp" @@ -174,9 +175,6 @@ namespace ZeroTier { typedef BSDEthernetTap EthernetTap; }  // TCP activity timeout  #define ZT_TCP_ACTIVITY_TIMEOUT 60000 -// Number of receive path threads to start -#define ZT_INCOMING_PACKET_THREAD_POOL_SIZE 8 -  #if ZT_VAULT_SUPPORT  size_t curlResponseWrite(void *ptr, size_t size, size_t nmemb, std::string *data)  { @@ -440,6 +438,15 @@ struct TcpConnection  	Mutex writeq_m;  }; +struct OneServiceIncomingPacket +{ +	uint64_t now; +	int64_t sock; +	struct sockaddr_storage from; +	unsigned int size; +	uint8_t data[ZT_MAX_MTU]; +}; +  class OneServiceImpl : public OneService  {  public: @@ -465,17 +472,11 @@ public:  	unsigned int _tertiaryPort;  	volatile unsigned int _udpPortPickerCounter; -#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE -	struct { -		uint8_t data[2048]; -		std::thread thr; -		int64_t sock; -		struct sockaddr_storage from; -		int size; -		std::condition_variable cond; -		std::mutex lock; -	} _incomingPacketWorker[ZT_INCOMING_PACKET_THREAD_POOL_SIZE]; -#endif +	unsigned long _incomingPacketConcurrency; +	std::vector<OneServiceIncomingPacket *> _incomingPacketMemoryPool; +	BlockingQueue<OneServiceIncomingPacket *> _incomingPacketQueue; +	std::vector<std::thread> _incomingPacketThreads; +	Mutex _incomingPacketMemoryPoolLock,_incomingPacketThreadsLock;  	// Local configuration and memo-ized information from it  	json _localConfig; @@ -606,37 +607,33 @@ public:  		_ports[1] = 0;  		_ports[2] = 0; -#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE -		for(unsigned int tn=0;tn<ZT_INCOMING_PACKET_THREAD_POOL_SIZE;++tn) { -			_incomingPacketWorker[tn].thr = std::thread([this,tn]() { -				std::unique_lock<std::mutex> l(_incomingPacketWorker[tn].lock); +		_incomingPacketConcurrency = std::max((unsigned long)1,std::min((unsigned long)16,(unsigned long)std::thread::hardware_concurrency())); +		for(long t=0;t<_incomingPacketConcurrency;++t) { +			_incomingPacketThreads.push_back(std::thread([this]() { +				OneServiceIncomingPacket *pkt = nullptr;  				for(;;) { -					_incomingPacketWorker[tn].cond.wait(l); -					if (_incomingPacketWorker[tn].size < 0) { +					if (!_incomingPacketQueue.get(pkt)) +						break; +					if (!pkt) +						break; + +					const ZT_ResultCode rc = _node->processWirePacket(nullptr,pkt->now,pkt->sock,&(pkt->from),pkt->data,pkt->size,&_nextBackgroundTaskDeadline); +					{ +						Mutex::Lock l(_incomingPacketMemoryPoolLock); +						_incomingPacketMemoryPool.push_back(pkt); +					} +					if (ZT_ResultCode_isFatal(rc)) { +						char tmp[256]; +						OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc); +						Mutex::Lock _l(_termReason_m); +						_termReason = ONE_UNRECOVERABLE_ERROR; +						_fatalErrorMessage = tmp; +						this->terminate();  						break; -					} else if (_incomingPacketWorker[tn].size > 0) { -						const ZT_ResultCode rc = _node->processWirePacket( -							(void *)0, -							OSUtils::now(), -							_incomingPacketWorker[tn].sock, -							&(_incomingPacketWorker[tn].from), -							_incomingPacketWorker[tn].data, -							(unsigned int)_incomingPacketWorker[tn].size, -							&_nextBackgroundTaskDeadline); -						if (ZT_ResultCode_isFatal(rc)) { -							char tmp[256]; -							OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc); -							Mutex::Lock _l(_termReason_m); -							_termReason = ONE_UNRECOVERABLE_ERROR; -							_fatalErrorMessage = tmp; -							this->terminate(); -							break; -						}  					}  				} -			}); +			}));  		} -#endif  #if ZT_VAULT_SUPPORT  		curl_global_init(CURL_GLOBAL_DEFAULT); @@ -645,24 +642,27 @@ public:  	virtual ~OneServiceImpl()  	{ -#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE -		for(unsigned int tn=0;tn<ZT_INCOMING_PACKET_THREAD_POOL_SIZE;++tn) { -			_incomingPacketWorker[tn].lock.lock(); -			_incomingPacketWorker[tn].size = -1; -			_incomingPacketWorker[tn].lock.unlock(); -			_incomingPacketWorker[tn].cond.notify_all(); -		} -		for(unsigned int tn=0;tn<ZT_INCOMING_PACKET_THREAD_POOL_SIZE;++tn) { -			_incomingPacketWorker[tn].thr.join(); -		} -#endif +		_incomingPacketQueue.stop(); +		_incomingPacketThreadsLock.lock(); +		for(auto t=_incomingPacketThreads.begin();t!=_incomingPacketThreads.end();++t) +			t->join(); +		_incomingPacketThreadsLock.unlock(); +  		_binder.closeAll(_phy);  		_phy.close(_localControlSocket4);  		_phy.close(_localControlSocket6); +  #if ZT_VAULT_SUPPORT  		curl_global_cleanup();  #endif +		_incomingPacketMemoryPoolLock.lock(); +		while (!_incomingPacketMemoryPool.empty()) { +			delete _incomingPacketMemoryPool.back(); +			_incomingPacketMemoryPool.pop_back(); +		} +		_incomingPacketMemoryPoolLock.unlock(); +  #ifdef ZT_USE_MINIUPNPC  		delete _portMapper;  #endif @@ -1900,39 +1900,27 @@ public:  	inline void phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *localAddr,const struct sockaddr *from,void *data,unsigned long len)  	{ +		const uint64_t now = OSUtils::now();  		if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) -			_lastDirectReceiveFromGlobal = OSUtils::now(); -#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE -		unsigned long cksum = 0; -		for(unsigned int i=0;i<sizeof(struct sockaddr_storage);++i) { -			cksum += ((uint8_t *)from)[i]; -		} -		const unsigned long tn = cksum % ZT_INCOMING_PACKET_THREAD_POOL_SIZE; -		_incomingPacketWorker[tn].lock.lock(); -		memcpy(_incomingPacketWorker[tn].data,data,len); -		_incomingPacketWorker[tn].sock = reinterpret_cast<int64_t>(sock); -		memcpy(&_incomingPacketWorker[tn].from,from,sizeof(struct sockaddr_storage)); -		_incomingPacketWorker[tn].size = (int)len; -		_incomingPacketWorker[tn].lock.unlock(); -		_incomingPacketWorker[tn].cond.notify_all(); -#else -		const ZT_ResultCode rc = _node->processWirePacket( -			(void *)0, -			OSUtils::now(), -			reinterpret_cast<int64_t>(sock), -			reinterpret_cast<const struct sockaddr_storage *>(from), // Phy<> uses sockaddr_storage, so it'll always be that big -			data, -			len, -			&_nextBackgroundTaskDeadline); -		if (ZT_ResultCode_isFatal(rc)) { -			char tmp[256]; -			OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc); -			Mutex::Lock _l(_termReason_m); -			_termReason = ONE_UNRECOVERABLE_ERROR; -			_fatalErrorMessage = tmp; -			this->terminate(); +			_lastDirectReceiveFromGlobal = now; + +		OneServiceIncomingPacket *pkt; +		_incomingPacketMemoryPoolLock.lock(); +		if (_incomingPacketMemoryPool.empty()) { +			pkt = new OneServiceIncomingPacket; +		} else { +			pkt = _incomingPacketMemoryPool.back(); +			_incomingPacketMemoryPool.pop_back();  		} -#endif +		_incomingPacketMemoryPoolLock.unlock(); + +		pkt->now = now; +		pkt->sock = reinterpret_cast<int64_t>(sock); +		ZT_FAST_MEMCPY(&(pkt->from),from,sizeof(struct sockaddr_storage)); +		pkt->size = (unsigned int)len; +		ZT_FAST_MEMCPY(pkt->data,data,len); + +		_incomingPacketQueue.postLimit(pkt,16 * _incomingPacketConcurrency);  	}  	inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success) | 
