diff options
Diffstat (limited to 'node/Cluster.cpp')
| -rw-r--r-- | node/Cluster.cpp | 226 | 
1 file changed, 180 insertions, 46 deletions
| diff --git a/node/Cluster.cpp b/node/Cluster.cpp index ab074b6d..0859c8d3 100644 --- a/node/Cluster.cpp +++ b/node/Cluster.cpp @@ -33,8 +33,12 @@  #include <string.h>  #include <math.h> +#include <map>  #include <algorithm> +#include <set>  #include <utility> +#include <list> +#include <stdexcept>  #include "../version.h" @@ -49,6 +53,34 @@  #include "Packet.hpp"  #include "Switch.hpp"  #include "Node.hpp" +#include "Array.hpp" + +/** + * Chunk size for allocating queue entries + * + * Queue entries are allocated in chunks of this many and are added to a pool. + * ZT_CLUSTER_MAX_QUEUE_GLOBAL must be evenly divisible by this. + */ +#define ZT_CLUSTER_QUEUE_CHUNK_SIZE 32 + +/** + * Maximum number of chunks to ever allocate + * + * This is a global sanity limit to prevent resource exhaustion attacks. It + * works out to about 600mb of RAM. You'll never see this on a normal edge + * node. We're unlikely to see this on a root server unless someone is DOSing + * us. In that case cluster relaying will be affected but other functions + * should continue to operate normally. + */ +#define ZT_CLUSTER_MAX_QUEUE_CHUNKS 8194 + +/** + * Max data per queue entry + * + * If we ever support larger transport MTUs this must be increased. The plus + * 16 is just a small margin and has no special meaning. 
+ */ +#define ZT_CLUSTER_SEND_QUEUE_DATA_MAX (ZT_UDP_DEFAULT_PAYLOAD_MTU + 16)  namespace ZeroTier { @@ -61,6 +93,137 @@ static inline double _dist3d(int x1,int y1,int z1,int x2,int y2,int z2)  	return sqrt((dx * dx) + (dy * dy) + (dz * dz));  } +// An entry in _ClusterSendQueue +struct _ClusterSendQueueEntry +{ +	uint64_t timestamp; +	Address fromPeerAddress; +	Address toPeerAddress; +	// if we ever support larger transport MTUs this must be increased +	unsigned char data[ZT_CLUSTER_SEND_QUEUE_DATA_MAX]; +	unsigned int len; +	bool unite; +}; + +// A multi-index map with entry memory pooling -- this allows our queue to +// be O(log(N)) and is complex enough that it makes the code a lot cleaner +// to break it out from Cluster. +class _ClusterSendQueue +{ +public: +	_ClusterSendQueue() : +		_poolCount(0) +	{ +	} + +	~_ClusterSendQueue() {} // memory is automatically freed when _chunks is destroyed + +	inline void enqueue(uint64_t ts,const Address &from,const Address &to,const void *data,unsigned int len,bool unite) +	{ +		if (len > ZT_CLUSTER_SEND_QUEUE_DATA_MAX) +			return; + +		Mutex::Lock _l(_lock); + +		// Delete oldest queue entry if sender has too many queued packets +		{ +			std::set< std::pair<Address,_ClusterSendQueueEntry *> >::iterator qi(_bySrc.lower_bound(std::pair<Address,_ClusterSendQueueEntry *>(from,(_ClusterSendQueueEntry *)0))); +			std::set< std::pair<Address,_ClusterSendQueueEntry *> >::iterator oldest(_bySrc.end()); +			unsigned long countForSender = 0; +			while ((qi != _bySrc.end())&&(qi->first == from)) { +				if (++countForSender > ZT_CLUSTER_MAX_QUEUE_PER_SENDER) { +					_byDest.erase(std::pair<Address,_ClusterSendQueueEntry *>(oldest->second->toPeerAddress,oldest->second)); +					_pool[_poolCount++] = oldest->second; +					_bySrc.erase(oldest); +					break; +				} else if (oldest == _bySrc.end()) +					oldest = qi; +				++qi; +			} +		} + +		_ClusterSendQueueEntry *e; +		if (_poolCount > 0) { +			e = _pool[--_poolCount]; +		} else { +			
if (_chunks.size() >= ZT_CLUSTER_MAX_QUEUE_CHUNKS) +				return; // queue is totally full! +			_chunks.push_back(Array<_ClusterSendQueueEntry,ZT_CLUSTER_QUEUE_CHUNK_SIZE>()); +			e = &(_chunks.back().data[0]); +			for(unsigned int i=1;i<ZT_CLUSTER_QUEUE_CHUNK_SIZE;++i) +				_pool[_poolCount++] = &(_chunks.back().data[i]); +		} + +		e->timestamp = ts; +		e->fromPeerAddress = from; +		e->toPeerAddress = to; +		memcpy(e->data,data,len); +		e->len = len; +		e->unite = unite; + +		_bySrc.insert(std::pair<Address,_ClusterSendQueueEntry *>(from,e)); +		_byDest.insert(std::pair<Address,_ClusterSendQueueEntry *>(to,e)); +	} + +	inline void expire(uint64_t now) +	{ +		Mutex::Lock _l(_lock); +		for(std::set< std::pair<Address,_ClusterSendQueueEntry *> >::iterator qi(_bySrc.begin());qi!=_bySrc.end();) { +			if ((now - qi->second->timestamp) > ZT_CLUSTER_QUEUE_EXPIRATION) { +				_byDest.erase(std::pair<Address,_ClusterSendQueueEntry *>(qi->second->toPeerAddress,qi->second)); +				_pool[_poolCount++] = qi->second; +				_bySrc.erase(qi++); +			} else ++qi; +		} +	} + +	/** +	 * Get and dequeue entries for a given destination address +	 * +	 * After use these entries must be returned with returnToPool()! 
+	 * +	 * @param dest Destination address +	 * @param results Array to fill with results +	 * @param maxResults Size of results[] in pointers +	 * @return Number of actual results returned +	 */ +	inline unsigned int getByDest(const Address &dest,_ClusterSendQueueEntry **results,unsigned int maxResults) +	{ +		unsigned int count = 0; +		Mutex::Lock _l(_lock); +		std::set< std::pair<Address,_ClusterSendQueueEntry *> >::iterator qi(_byDest.lower_bound(std::pair<Address,_ClusterSendQueueEntry *>(dest,(_ClusterSendQueueEntry *)0))); +		while ((qi != _byDest.end())&&(qi->first == dest)) { +			_bySrc.erase(std::pair<Address,_ClusterSendQueueEntry *>(qi->second->fromPeerAddress,qi->second)); +			results[count++] = qi->second; +			if (count == maxResults) +				break; +			_byDest.erase(qi++); +		} +		return count; +	} + +	/** +	 * Return entries to pool after use +	 * +	 * @param entries Array of entries +	 * @param count Number of entries +	 */ +	inline void returnToPool(_ClusterSendQueueEntry **entries,unsigned int count) +	{ +		Mutex::Lock _l(_lock); +		for(unsigned int i=0;i<count;++i) +			_pool[_poolCount++] = entries[i]; +	} + +private: +	std::list< Array<_ClusterSendQueueEntry,ZT_CLUSTER_QUEUE_CHUNK_SIZE> > _chunks; +	_ClusterSendQueueEntry *_pool[ZT_CLUSTER_QUEUE_CHUNK_SIZE * ZT_CLUSTER_MAX_QUEUE_CHUNKS]; +	unsigned long _poolCount; +	std::set< std::pair<Address,_ClusterSendQueueEntry *> > _bySrc; +	std::set< std::pair<Address,_ClusterSendQueueEntry *> > _byDest; +	Mutex _lock; +}; +  Cluster::Cluster(  	const RuntimeEnvironment *renv,  	uint16_t id, @@ -73,6 +236,7 @@ Cluster::Cluster(  	int (*addressToLocationFunction)(void *,const struct sockaddr_storage *,int *,int *,int *),  	void *addressToLocationFunctionArg) :  	RR(renv), +	_sendQueue(new _ClusterSendQueue()),  	_sendFunction(sendFunction),  	_sendFunctionArg(sendFunctionArg),  	_addressToLocationFunction(addressToLocationFunction), @@ -84,7 +248,8 @@ Cluster::Cluster(  	
_zeroTierPhysicalEndpoints(zeroTierPhysicalEndpoints),  	_members(new _Member[ZT_CLUSTER_MAX_MEMBERS]),  	_lastFlushed(0), -	_lastCleanedRemotePeers(0) +	_lastCleanedRemotePeers(0), +	_lastCleanedQueue(0)  {  	uint16_t stmp[ZT_SHA512_DIGEST_LEN / sizeof(uint16_t)]; @@ -105,6 +270,7 @@ Cluster::~Cluster()  	Utils::burn(_masterSecret,sizeof(_masterSecret));  	Utils::burn(_key,sizeof(_key));  	delete [] _members; +	delete _sendQueue;  }  void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len) @@ -220,19 +386,13 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)  								_remotePeers[std::pair<Address,unsigned int>(id.address(),(unsigned int)fromMemberId)] = RR->node->now();  							} -							std::list<_SQE> q; -							{ -								Mutex::Lock _l(_sendViaClusterQueue_m); -								for(std::list<_SQE>::iterator qi(_sendViaClusterQueue.begin());qi!=_sendViaClusterQueue.end();) { -									if (qi->toPeerAddress == id.address()) -										q.splice(q.end(),_sendViaClusterQueue,qi++); -									else ++qi; -								} -							} -							for(std::list<_SQE>::iterator qi(q.begin());qi!=q.end();++qi) -								this->sendViaCluster(id.address(),qi->toPeerAddress,qi->data,qi->len,qi->unite); +							_ClusterSendQueueEntry *q[16384]; // 16384 is "tons" +							unsigned int qc = _sendQueue->getByDest(id.address(),q,16384); +							for(unsigned int i=0;i<qc;++i) +								this->sendViaCluster(q[i]->fromPeerAddress,q[i]->toPeerAddress,q[i]->data,q[i]->len,q[i]->unite); +							_sendQueue->returnToPool(q,qc); -							TRACE("[%u] has %s (retried %u queued sends)",(unsigned int)fromMemberId,id.address().toString().c_str(),(unsigned int)q.size()); +							TRACE("[%u] has %s (retried %u queued sends)",(unsigned int)fromMemberId,id.address().toString().c_str(),qc);  						}  					}	break; @@ -244,7 +404,6 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)  							peer->identity().serialize(buf);  							Mutex::Lock 
_l2(_members[fromMemberId].lock);  							_send(fromMemberId,CLUSTER_MESSAGE_HAVE_PEER,buf.data(),buf.size()); -							_flush(fromMemberId);  						}  					}	break; @@ -333,7 +492,6 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)  								{  									Mutex::Lock _l2(_members[fromMemberId].lock);  									_send(fromMemberId,CLUSTER_MESSAGE_PROXY_SEND,rendezvousForRemote.data(),rendezvousForRemote.size()); -									_flush(fromMemberId);  								}  								RR->sw->send(rendezvousForLocal,true,0);  							} @@ -379,19 +537,6 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee  	if (len > ZT_PROTO_MAX_PACKET_LENGTH) // sanity check  		return; -	unsigned int queueCount = 0; -	{ -		Mutex::Lock _l(_sendViaClusterQueue_m); -		for(std::list<_SQE>::const_iterator qi(_sendViaClusterQueue.begin());qi!=_sendViaClusterQueue.end();++qi) { -			if (qi->fromPeerAddress == fromPeerAddress) { -				if (++queueCount > ZT_CLUSTER_MAX_QUEUE_PER_SENDER) { -					TRACE("dropping sendViaCluster for %s -> %s since queue for sender is full",fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str()); -					return; -				} -			} -		} -	} -  	const uint64_t now = RR->node->now();  	uint64_t mostRecentTs = 0; @@ -423,8 +568,6 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee  			for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {  				Mutex::Lock _l2(_members[*mid].lock);  				_send(*mid,CLUSTER_MESSAGE_WANT_PEER,tmp,ZT_ADDRESS_LENGTH); -				if ((enqueueAndWait)&&(queueCount == 0)) -					_flush(*mid);  			}  		} @@ -432,8 +575,7 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee  		// later and return after having broadcasted a WANT_PEER.  		
if (enqueueAndWait) {  			TRACE("sendViaCluster %s -> %s enqueueing to wait for HAVE_PEER",fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str()); -			Mutex::Lock _l(_sendViaClusterQueue_m); -			_sendViaClusterQueue.push_back(_SQE(now,fromPeerAddress,toPeerAddress,data,len,unite)); +			_sendQueue->enqueue(now,fromPeerAddress,toPeerAddress,data,len,unite);  			return;  		}  	} @@ -464,10 +606,8 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee  	{  		Mutex::Lock _l2(_members[mostRecentMemberId].lock); -		if (buf.size() > 0) { +		if (buf.size() > 0)  			_send(mostRecentMemberId,CLUSTER_MESSAGE_PROXY_UNITE,buf.data(),buf.size()); -			_flush(mostRecentMemberId); -		}  		if (_members[mostRecentMemberId].zeroTierPhysicalEndpoints.size() > 0) {  			TRACE("sendViaCluster relaying %u bytes from %s to %s by way of %u",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId);  			RR->node->putPacket(InetAddress(),_members[mostRecentMemberId].zeroTierPhysicalEndpoints.front(),data,len); @@ -484,7 +624,6 @@ void Cluster::sendDistributedQuery(const Packet &pkt)  	for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {  		Mutex::Lock _l2(_members[*mid].lock);  		_send(*mid,CLUSTER_MESSAGE_REMOTE_PACKET,buf.data(),buf.size()); -		_flush(*mid);  	}  } @@ -495,15 +634,6 @@ void Cluster::doPeriodicTasks()  	if ((now - _lastFlushed) >= ZT_CLUSTER_FLUSH_PERIOD) {  		_lastFlushed = now; -		{ -			Mutex::Lock _l2(_sendViaClusterQueue_m); -			for(std::list<_SQE>::iterator qi(_sendViaClusterQueue.begin());qi!=_sendViaClusterQueue.end();) { -				if ((now - qi->timestamp) >= ZT_CLUSTER_QUEUE_EXPIRATION) -					_sendViaClusterQueue.erase(qi++); -				else ++qi; -			} -		} -  		Mutex::Lock _l(_memberIds_m);  		for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {  			Mutex::Lock _l2(_members[*mid].lock); @@ -549,6 
+679,11 @@ void Cluster::doPeriodicTasks()  			else ++rp;  		}  	} + +	if ((now - _lastCleanedQueue) >= ZT_CLUSTER_QUEUE_EXPIRATION) { +		_lastCleanedQueue = now; +		_sendQueue->expire(now); +	}  }  void Cluster::addMember(uint16_t memberId) @@ -768,7 +903,6 @@ void Cluster::_doREMOTE_WHOIS(uint64_t fromMemberId,const Packet &remotep)  			TRACE("responding to remote WHOIS from %s @ %u with identity of %s",remotep.source().toString().c_str(),(unsigned int)fromMemberId,queried.address().toString().c_str());  			Mutex::Lock _l2(_members[fromMemberId].lock);  			_send(fromMemberId,CLUSTER_MESSAGE_PROXY_SEND,routp.data(),routp.size()); -			_flush(fromMemberId);  		}  	}  } | 
