diff options
Diffstat (limited to 'node/Cluster.cpp')
-rw-r--r-- | node/Cluster.cpp | 226 |
1 files changed, 180 insertions, 46 deletions
diff --git a/node/Cluster.cpp b/node/Cluster.cpp index ab074b6d..0859c8d3 100644 --- a/node/Cluster.cpp +++ b/node/Cluster.cpp @@ -33,8 +33,12 @@ #include <string.h> #include <math.h> +#include <map> #include <algorithm> +#include <set> #include <utility> +#include <list> +#include <stdexcept> #include "../version.h" @@ -49,6 +53,34 @@ #include "Packet.hpp" #include "Switch.hpp" #include "Node.hpp" +#include "Array.hpp" + +/** + * Chunk size for allocating queue entries + * + * Queue entries are allocated in chunks of this many and are added to a pool. + * ZT_CLUSTER_MAX_QUEUE_GLOBAL must be evenly divisible by this. + */ +#define ZT_CLUSTER_QUEUE_CHUNK_SIZE 32 + +/** + * Maximum number of chunks to ever allocate + * + * This is a global sanity limit to prevent resource exhaustion attacks. It + * works out to about 600mb of RAM. You'll never see this on a normal edge + * node. We're unlikely to see this on a root server unless someone is DOSing + * us. In that case cluster relaying will be affected but other functions + * should continue to operate normally. + */ +#define ZT_CLUSTER_MAX_QUEUE_CHUNKS 8194 + +/** + * Max data per queue entry + * + * If we ever support larger transport MTUs this must be increased. The plus + * 16 is just a small margin and has no special meaning. + */ +#define ZT_CLUSTER_SEND_QUEUE_DATA_MAX (ZT_UDP_DEFAULT_PAYLOAD_MTU + 16) namespace ZeroTier { @@ -61,6 +93,137 @@ static inline double _dist3d(int x1,int y1,int z1,int x2,int y2,int z2) return sqrt((dx * dx) + (dy * dy) + (dz * dz)); } +// An entry in _ClusterSendQueue +struct _ClusterSendQueueEntry +{ + uint64_t timestamp; + Address fromPeerAddress; + Address toPeerAddress; + // if we ever support larger transport MTUs this must be increased + unsigned char data[ZT_CLUSTER_SEND_QUEUE_DATA_MAX]; + unsigned int len; + bool unite; +}; + +// A multi-index map with entry memory pooling -- this allows our queue to +// be O(log(N)) and is complex enough that it makes the code a lot cleaner +// to break it out from Cluster. +class _ClusterSendQueue +{ +public: + _ClusterSendQueue() : + _poolCount(0) + { + } + + ~_ClusterSendQueue() {} // memory is automatically freed when _chunks is destroyed + + inline void enqueue(uint64_t ts,const Address &from,const Address &to,const void *data,unsigned int len,bool unite) + { + if (len > ZT_CLUSTER_SEND_QUEUE_DATA_MAX) + return; + + Mutex::Lock _l(_lock); + + // Delete oldest queue entry if sender has too many queued packets + { + std::set< std::pair<Address,_ClusterSendQueueEntry *> >::iterator qi(_bySrc.lower_bound(std::pair<Address,_ClusterSendQueueEntry *>(from,(_ClusterSendQueueEntry *)0))); + std::set< std::pair<Address,_ClusterSendQueueEntry *> >::iterator oldest(_bySrc.end()); + unsigned long countForSender = 0; + while ((qi != _bySrc.end())&&(qi->first == from)) { + if (++countForSender > ZT_CLUSTER_MAX_QUEUE_PER_SENDER) { + _byDest.erase(std::pair<Address,_ClusterSendQueueEntry *>(oldest->second->toPeerAddress,oldest->second)); + _pool[_poolCount++] = oldest->second; + _bySrc.erase(oldest); + break; + } else if (oldest == _bySrc.end()) + oldest = qi; + ++qi; + } + } + + _ClusterSendQueueEntry *e; + if (_poolCount > 0) { + e = _pool[--_poolCount]; + } else { + if (_chunks.size() >= ZT_CLUSTER_MAX_QUEUE_CHUNKS) + return; // queue is totally full! + _chunks.push_back(Array<_ClusterSendQueueEntry,ZT_CLUSTER_QUEUE_CHUNK_SIZE>()); + e = &(_chunks.back().data[0]); + for(unsigned int i=1;i<ZT_CLUSTER_QUEUE_CHUNK_SIZE;++i) + _pool[_poolCount++] = &(_chunks.back().data[i]); + } + + e->timestamp = ts; + e->fromPeerAddress = from; + e->toPeerAddress = to; + memcpy(e->data,data,len); + e->len = len; + e->unite = unite; + + _bySrc.insert(std::pair<Address,_ClusterSendQueueEntry *>(from,e)); + _byDest.insert(std::pair<Address,_ClusterSendQueueEntry *>(to,e)); + } + + inline void expire(uint64_t now) + { + Mutex::Lock _l(_lock); + for(std::set< std::pair<Address,_ClusterSendQueueEntry *> >::iterator qi(_bySrc.begin());qi!=_bySrc.end();) { + if ((now - qi->second->timestamp) > ZT_CLUSTER_QUEUE_EXPIRATION) { + _byDest.erase(std::pair<Address,_ClusterSendQueueEntry *>(qi->second->toPeerAddress,qi->second)); + _pool[_poolCount++] = qi->second; + _bySrc.erase(qi++); + } else ++qi; + } + } + + /** + * Get and dequeue entries for a given destination address + * + * After use these entries must be returned with returnToPool()! + * + * @param dest Destination address + * @param results Array to fill with results + * @param maxResults Size of results[] in pointers + * @return Number of actual results returned + */ + inline unsigned int getByDest(const Address &dest,_ClusterSendQueueEntry **results,unsigned int maxResults) + { + unsigned int count = 0; + Mutex::Lock _l(_lock); + std::set< std::pair<Address,_ClusterSendQueueEntry *> >::iterator qi(_byDest.lower_bound(std::pair<Address,_ClusterSendQueueEntry *>(dest,(_ClusterSendQueueEntry *)0))); + while ((qi != _byDest.end())&&(qi->first == dest)) { + _bySrc.erase(std::pair<Address,_ClusterSendQueueEntry *>(qi->second->fromPeerAddress,qi->second)); + results[count++] = qi->second; + if (count == maxResults) + break; + _byDest.erase(qi++); + } + return count; + } + + /** + * Return entries to pool after use + * + * @param entries Array of entries + * @param count Number of entries + */ + inline void returnToPool(_ClusterSendQueueEntry **entries,unsigned int count) + { + Mutex::Lock _l(_lock); + for(unsigned int i=0;i<count;++i) + _pool[_poolCount++] = entries[i]; + } + +private: + std::list< Array<_ClusterSendQueueEntry,ZT_CLUSTER_QUEUE_CHUNK_SIZE> > _chunks; + _ClusterSendQueueEntry *_pool[ZT_CLUSTER_QUEUE_CHUNK_SIZE * ZT_CLUSTER_MAX_QUEUE_CHUNKS]; + unsigned long _poolCount; + std::set< std::pair<Address,_ClusterSendQueueEntry *> > _bySrc; + std::set< std::pair<Address,_ClusterSendQueueEntry *> > _byDest; + Mutex _lock; +}; + Cluster::Cluster( const RuntimeEnvironment *renv, uint16_t id, @@ -73,6 +236,7 @@ Cluster::Cluster( int (*addressToLocationFunction)(void *,const struct sockaddr_storage *,int *,int *,int *), void *addressToLocationFunctionArg) : RR(renv), + _sendQueue(new _ClusterSendQueue()), _sendFunction(sendFunction), _sendFunctionArg(sendFunctionArg), _addressToLocationFunction(addressToLocationFunction), @@ -84,7 +248,8 @@ Cluster::Cluster( _zeroTierPhysicalEndpoints(zeroTierPhysicalEndpoints), _members(new _Member[ZT_CLUSTER_MAX_MEMBERS]), _lastFlushed(0), - _lastCleanedRemotePeers(0) + _lastCleanedRemotePeers(0), + _lastCleanedQueue(0) { uint16_t stmp[ZT_SHA512_DIGEST_LEN / sizeof(uint16_t)]; @@ -105,6 +270,7 @@ Cluster::~Cluster() Utils::burn(_masterSecret,sizeof(_masterSecret)); Utils::burn(_key,sizeof(_key)); delete [] _members; + delete _sendQueue; } void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len) @@ -220,19 +386,13 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len) _remotePeers[std::pair<Address,unsigned int>(id.address(),(unsigned int)fromMemberId)] = RR->node->now(); } - std::list<_SQE> q; - { - Mutex::Lock _l(_sendViaClusterQueue_m); - for(std::list<_SQE>::iterator qi(_sendViaClusterQueue.begin());qi!=_sendViaClusterQueue.end();) { - if (qi->toPeerAddress == id.address()) - q.splice(q.end(),_sendViaClusterQueue,qi++); - else ++qi; - } - } - for(std::list<_SQE>::iterator qi(q.begin());qi!=q.end();++qi) - this->sendViaCluster(id.address(),qi->toPeerAddress,qi->data,qi->len,qi->unite); + _ClusterSendQueueEntry *q[16384]; // 16384 is "tons" + unsigned int qc = _sendQueue->getByDest(id.address(),q,16384); + for(unsigned int i=0;i<qc;++i) + this->sendViaCluster(q[i]->fromPeerAddress,q[i]->toPeerAddress,q[i]->data,q[i]->len,q[i]->unite); + _sendQueue->returnToPool(q,qc); - TRACE("[%u] has %s (retried %u queued sends)",(unsigned int)fromMemberId,id.address().toString().c_str(),(unsigned int)q.size()); + TRACE("[%u] has %s (retried %u queued sends)",(unsigned int)fromMemberId,id.address().toString().c_str(),qc); } } break; @@ -244,7 +404,6 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len) peer->identity().serialize(buf); Mutex::Lock _l2(_members[fromMemberId].lock); _send(fromMemberId,CLUSTER_MESSAGE_HAVE_PEER,buf.data(),buf.size()); - _flush(fromMemberId); } } break; @@ -333,7 +492,6 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len) { Mutex::Lock _l2(_members[fromMemberId].lock); _send(fromMemberId,CLUSTER_MESSAGE_PROXY_SEND,rendezvousForRemote.data(),rendezvousForRemote.size()); - _flush(fromMemberId); } RR->sw->send(rendezvousForLocal,true,0); } @@ -379,19 +537,6 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee if (len > ZT_PROTO_MAX_PACKET_LENGTH) // sanity check return; - unsigned int queueCount = 0; - { - Mutex::Lock _l(_sendViaClusterQueue_m); - for(std::list<_SQE>::const_iterator qi(_sendViaClusterQueue.begin());qi!=_sendViaClusterQueue.end();++qi) { - if (qi->fromPeerAddress == fromPeerAddress) { - if (++queueCount > ZT_CLUSTER_MAX_QUEUE_PER_SENDER) { - TRACE("dropping sendViaCluster for %s -> %s since queue for sender is full",fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str()); - return; - } - } - } - } - const uint64_t now = RR->node->now(); uint64_t mostRecentTs = 0; @@ -423,8 +568,6 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { Mutex::Lock _l2(_members[*mid].lock); _send(*mid,CLUSTER_MESSAGE_WANT_PEER,tmp,ZT_ADDRESS_LENGTH); - if ((enqueueAndWait)&&(queueCount == 0)) - _flush(*mid); } } @@ -432,8 +575,7 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee // later and return after having broadcasted a WANT_PEER. if (enqueueAndWait) { TRACE("sendViaCluster %s -> %s enqueueing to wait for HAVE_PEER",fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str()); - Mutex::Lock _l(_sendViaClusterQueue_m); - _sendViaClusterQueue.push_back(_SQE(now,fromPeerAddress,toPeerAddress,data,len,unite)); + _sendQueue->enqueue(now,fromPeerAddress,toPeerAddress,data,len,unite); return; } } @@ -464,10 +606,8 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee { Mutex::Lock _l2(_members[mostRecentMemberId].lock); - if (buf.size() > 0) { + if (buf.size() > 0) _send(mostRecentMemberId,CLUSTER_MESSAGE_PROXY_UNITE,buf.data(),buf.size()); - _flush(mostRecentMemberId); - } if (_members[mostRecentMemberId].zeroTierPhysicalEndpoints.size() > 0) { TRACE("sendViaCluster relaying %u bytes from %s to %s by way of %u",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId); RR->node->putPacket(InetAddress(),_members[mostRecentMemberId].zeroTierPhysicalEndpoints.front(),data,len); @@ -484,7 +624,6 @@ void Cluster::sendDistributedQuery(const Packet &pkt) for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { Mutex::Lock _l2(_members[*mid].lock); _send(*mid,CLUSTER_MESSAGE_REMOTE_PACKET,buf.data(),buf.size()); - _flush(*mid); } } @@ -495,15 +634,6 @@ void Cluster::doPeriodicTasks() if ((now - _lastFlushed) >= ZT_CLUSTER_FLUSH_PERIOD) { _lastFlushed = now; - { - Mutex::Lock _l2(_sendViaClusterQueue_m); - for(std::list<_SQE>::iterator qi(_sendViaClusterQueue.begin());qi!=_sendViaClusterQueue.end();) { - if ((now - qi->timestamp) >= ZT_CLUSTER_QUEUE_EXPIRATION) - _sendViaClusterQueue.erase(qi++); - else ++qi; - } - } - Mutex::Lock _l(_memberIds_m); for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { Mutex::Lock _l2(_members[*mid].lock); @@ -549,6 +679,11 @@ void Cluster::doPeriodicTasks() else ++rp; } } + + if ((now - _lastCleanedQueue) >= ZT_CLUSTER_QUEUE_EXPIRATION) { + _lastCleanedQueue = now; + _sendQueue->expire(now); + } } void Cluster::addMember(uint16_t memberId) @@ -768,7 +903,6 @@ void Cluster::_doREMOTE_WHOIS(uint64_t fromMemberId,const Packet &remotep) TRACE("responding to remote WHOIS from %s @ %u with identity of %s",remotep.source().toString().c_str(),(unsigned int)fromMemberId,queried.address().toString().c_str()); Mutex::Lock _l2(_members[fromMemberId].lock); _send(fromMemberId,CLUSTER_MESSAGE_PROXY_SEND,routp.data(),routp.size()); - _flush(fromMemberId); } } } |