Diffstat (limited to 'node/Cluster.cpp')
-rw-r--r--  node/Cluster.cpp  226
1 file changed, 180 insertions(+), 46 deletions(-)
diff --git a/node/Cluster.cpp b/node/Cluster.cpp
index ab074b6d..0859c8d3 100644
--- a/node/Cluster.cpp
+++ b/node/Cluster.cpp
@@ -33,8 +33,12 @@
#include <string.h>
#include <math.h>
+#include <map>
#include <algorithm>
+#include <set>
#include <utility>
+#include <list>
+#include <stdexcept>
#include "../version.h"
@@ -49,6 +53,34 @@
#include "Packet.hpp"
#include "Switch.hpp"
#include "Node.hpp"
+#include "Array.hpp"
+
+/**
+ * Chunk size for allocating queue entries
+ *
+ * Queue entries are allocated in chunks of this many and are added to a pool.
+ * ZT_CLUSTER_MAX_QUEUE_GLOBAL must be evenly divisible by this.
+ */
+#define ZT_CLUSTER_QUEUE_CHUNK_SIZE 32
+
+/**
+ * Maximum number of chunks to ever allocate
+ *
+ * This is a global sanity limit to prevent resource exhaustion attacks. It
+ * works out to about 600MB of RAM. You'll never see this on a normal edge
+ * node, and it's unlikely on a root server unless someone is DoSing us. In
+ * that case cluster relaying will be affected, but other functions should
+ * continue to operate normally.
+ */
+#define ZT_CLUSTER_MAX_QUEUE_CHUNKS 8194
+
+/**
+ * Max data per queue entry
+ *
+ * If we ever support larger transport MTUs this must be increased. The plus
+ * 16 is just a small margin and has no special meaning.
+ */
+#define ZT_CLUSTER_SEND_QUEUE_DATA_MAX (ZT_UDP_DEFAULT_PAYLOAD_MTU + 16)
namespace ZeroTier {
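
Taken together, these constants bound the relay queue's worst-case memory. A back-of-envelope check, as a sketch only (the ZT_UDP_DEFAULT_PAYLOAD_MTU value of 1444 bytes below is an assumption, and the per-entry bookkeeping overhead is approximated):

// Rough worst-case sizing for the queue limits above; MTU and overhead are assumptions.
#include <cstdio>

int main()
{
	const unsigned long chunkSize = 32;      // ZT_CLUSTER_QUEUE_CHUNK_SIZE
	const unsigned long maxChunks = 8194;    // ZT_CLUSTER_MAX_QUEUE_CHUNKS
	const unsigned long assumedMtu = 1444;   // assumed ZT_UDP_DEFAULT_PAYLOAD_MTU
	const unsigned long entryBytes = assumedMtu + 16 + 32; // data buffer + margin + approx. bookkeeping
	const unsigned long maxEntries = chunkSize * maxChunks; // 262,208 entries
	std::printf("max entries: %lu, approx worst-case bytes: %lu\n",maxEntries,maxEntries * entryBytes);
	return 0;
}

Chunks are only allocated as the pool runs dry, so this ceiling (a few hundred MB, the order of magnitude the comment above cites) is never paid by an idle node; the 262,208-slot pointer pool itself is only about 2MB on a 64-bit build.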
@@ -61,6 +93,137 @@ static inline double _dist3d(int x1,int y1,int z1,int x2,int y2,int z2)
return sqrt((dx * dx) + (dy * dy) + (dz * dz));
}
+// An entry in _ClusterSendQueue
+struct _ClusterSendQueueEntry
+{
+ uint64_t timestamp;
+ Address fromPeerAddress;
+ Address toPeerAddress;
+ // if we ever support larger transport MTUs this must be increased
+ unsigned char data[ZT_CLUSTER_SEND_QUEUE_DATA_MAX];
+ unsigned int len;
+ bool unite;
+};
+
+// A multi-index map with entry memory pooling -- this allows our queue to
+// be O(log(N)) and is complex enough that it makes the code a lot cleaner
+// to break it out from Cluster.
+class _ClusterSendQueue
+{
+public:
+ _ClusterSendQueue() :
+ _poolCount(0)
+ {
+ }
+
+ ~_ClusterSendQueue() {} // memory is automatically freed when _chunks is destroyed
+
+ inline void enqueue(uint64_t ts,const Address &from,const Address &to,const void *data,unsigned int len,bool unite)
+ {
+ if (len > ZT_CLUSTER_SEND_QUEUE_DATA_MAX)
+ return;
+
+ Mutex::Lock _l(_lock);
+
+ // If this sender already has too many queued packets, drop one to make room
+ {
+ std::set< std::pair<Address,_ClusterSendQueueEntry *> >::iterator qi(_bySrc.lower_bound(std::pair<Address,_ClusterSendQueueEntry *>(from,(_ClusterSendQueueEntry *)0)));
+ std::set< std::pair<Address,_ClusterSendQueueEntry *> >::iterator oldest(_bySrc.end());
+ unsigned long countForSender = 0;
+ while ((qi != _bySrc.end())&&(qi->first == from)) {
+ if (++countForSender > ZT_CLUSTER_MAX_QUEUE_PER_SENDER) {
+ _byDest.erase(std::pair<Address,_ClusterSendQueueEntry *>(oldest->second->toPeerAddress,oldest->second));
+ _pool[_poolCount++] = oldest->second;
+ _bySrc.erase(oldest);
+ break;
+ } else if (oldest == _bySrc.end())
+ oldest = qi;
+ ++qi;
+ }
+ }
+
+ _ClusterSendQueueEntry *e;
+ if (_poolCount > 0) {
+ e = _pool[--_poolCount];
+ } else {
+ if (_chunks.size() >= ZT_CLUSTER_MAX_QUEUE_CHUNKS)
+ return; // queue is totally full!
+ _chunks.push_back(Array<_ClusterSendQueueEntry,ZT_CLUSTER_QUEUE_CHUNK_SIZE>());
+ e = &(_chunks.back().data[0]);
+ for(unsigned int i=1;i<ZT_CLUSTER_QUEUE_CHUNK_SIZE;++i)
+ _pool[_poolCount++] = &(_chunks.back().data[i]);
+ }
+
+ e->timestamp = ts;
+ e->fromPeerAddress = from;
+ e->toPeerAddress = to;
+ memcpy(e->data,data,len);
+ e->len = len;
+ e->unite = unite;
+
+ _bySrc.insert(std::pair<Address,_ClusterSendQueueEntry *>(from,e));
+ _byDest.insert(std::pair<Address,_ClusterSendQueueEntry *>(to,e));
+ }
+
+ inline void expire(uint64_t now)
+ {
+ Mutex::Lock _l(_lock);
+ for(std::set< std::pair<Address,_ClusterSendQueueEntry *> >::iterator qi(_bySrc.begin());qi!=_bySrc.end();) {
+ if ((now - qi->second->timestamp) > ZT_CLUSTER_QUEUE_EXPIRATION) {
+ _byDest.erase(std::pair<Address,_ClusterSendQueueEntry *>(qi->second->toPeerAddress,qi->second));
+ _pool[_poolCount++] = qi->second;
+ _bySrc.erase(qi++);
+ } else ++qi;
+ }
+ }
+
+ /**
+ * Get and dequeue entries for a given destination address
+ *
+ * After use these entries must be returned with returnToPool()!
+ *
+ * @param dest Destination address
+ * @param results Array to fill with results
+ * @param maxResults Size of results[] in pointers
+ * @return Number of actual results returned
+ */
+ inline unsigned int getByDest(const Address &dest,_ClusterSendQueueEntry **results,unsigned int maxResults)
+ {
+ unsigned int count = 0;
+ Mutex::Lock _l(_lock);
+ std::set< std::pair<Address,_ClusterSendQueueEntry *> >::iterator qi(_byDest.lower_bound(std::pair<Address,_ClusterSendQueueEntry *>(dest,(_ClusterSendQueueEntry *)0)));
+ while ((qi != _byDest.end())&&(qi->first == dest)) {
+ if (count >= maxResults)
+ break;
+ _bySrc.erase(std::pair<Address,_ClusterSendQueueEntry *>(qi->second->fromPeerAddress,qi->second));
+ results[count++] = qi->second;
+ _byDest.erase(qi++); // erase from both indices so a returned entry never lingers in _byDest
+ }
+ return count;
+ }
+
+ /**
+ * Return entries to pool after use
+ *
+ * @param entries Array of entries
+ * @param count Number of entries
+ */
+ inline void returnToPool(_ClusterSendQueueEntry **entries,unsigned int count)
+ {
+ Mutex::Lock _l(_lock);
+ for(unsigned int i=0;i<count;++i)
+ _pool[_poolCount++] = entries[i];
+ }
+
+private:
+ std::list< Array<_ClusterSendQueueEntry,ZT_CLUSTER_QUEUE_CHUNK_SIZE> > _chunks;
+ _ClusterSendQueueEntry *_pool[ZT_CLUSTER_QUEUE_CHUNK_SIZE * ZT_CLUSTER_MAX_QUEUE_CHUNKS];
+ unsigned long _poolCount;
+ std::set< std::pair<Address,_ClusterSendQueueEntry *> > _bySrc;
+ std::set< std::pair<Address,_ClusterSendQueueEntry *> > _byDest;
+ Mutex _lock;
+};
+
Cluster::Cluster(
const RuntimeEnvironment *renv,
uint16_t id,
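
For reference, a minimal sketch of the indexing pattern _ClusterSendQueue uses (simplified types; illustrative only, not part of the diff): each of _bySrc and _byDest is a std::set of (Address, entry pointer) pairs, so seeding lower_bound with a null-pointer sentinel lands on the first entry for a given address and a per-address scan costs O(log N) plus the number of matches. The real class additionally recycles entries through a fixed pool of chunk-allocated arrays, which is why callers of getByDest() must hand entries back via returnToPool().

// Minimal sketch of the set-of-pairs index pattern; types are simplified and illustrative.
#include <cstdint>
#include <cstdio>
#include <set>
#include <utility>

struct Entry { std::uint64_t key; const char *what; };

typedef std::set< std::pair<std::uint64_t,Entry *> > Index;

// Dequeue up to maxResults entries filed under 'key', erasing them from the index.
static unsigned int takeByKey(Index &idx,std::uint64_t key,Entry **results,unsigned int maxResults)
{
	unsigned int count = 0;
	Index::iterator i(idx.lower_bound(std::pair<std::uint64_t,Entry *>(key,(Entry *)0)));
	while ((i != idx.end())&&(i->first == key)&&(count < maxResults)) {
		results[count++] = i->second;
		idx.erase(i++);
	}
	return count;
}

int main()
{
	Entry a = { 10,"a" },b = { 10,"b" },c = { 20,"c" };
	Index byKey;
	byKey.insert(std::pair<std::uint64_t,Entry *>(a.key,&a));
	byKey.insert(std::pair<std::uint64_t,Entry *>(b.key,&b));
	byKey.insert(std::pair<std::uint64_t,Entry *>(c.key,&c));

	Entry *out[8];
	const unsigned int n = takeByKey(byKey,10,out,8);
	for(unsigned int i=0;i<n;++i)
		std::printf("%s\n",out[i]->what); // prints the two entries keyed 10
	return 0;
}

Comparing raw pointers inside the pair is what makes the null sentinel sort first; strictly speaking that ordering is implementation-defined, but it is the same assumption the class above relies on.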
@@ -73,6 +236,7 @@ Cluster::Cluster(
int (*addressToLocationFunction)(void *,const struct sockaddr_storage *,int *,int *,int *),
void *addressToLocationFunctionArg) :
RR(renv),
+ _sendQueue(new _ClusterSendQueue()),
_sendFunction(sendFunction),
_sendFunctionArg(sendFunctionArg),
_addressToLocationFunction(addressToLocationFunction),
@@ -84,7 +248,8 @@ Cluster::Cluster(
_zeroTierPhysicalEndpoints(zeroTierPhysicalEndpoints),
_members(new _Member[ZT_CLUSTER_MAX_MEMBERS]),
_lastFlushed(0),
- _lastCleanedRemotePeers(0)
+ _lastCleanedRemotePeers(0),
+ _lastCleanedQueue(0)
{
uint16_t stmp[ZT_SHA512_DIGEST_LEN / sizeof(uint16_t)];
@@ -105,6 +270,7 @@ Cluster::~Cluster()
Utils::burn(_masterSecret,sizeof(_masterSecret));
Utils::burn(_key,sizeof(_key));
delete [] _members;
+ delete _sendQueue;
}
void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
@@ -220,19 +386,13 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
_remotePeers[std::pair<Address,unsigned int>(id.address(),(unsigned int)fromMemberId)] = RR->node->now();
}
- std::list<_SQE> q;
- {
- Mutex::Lock _l(_sendViaClusterQueue_m);
- for(std::list<_SQE>::iterator qi(_sendViaClusterQueue.begin());qi!=_sendViaClusterQueue.end();) {
- if (qi->toPeerAddress == id.address())
- q.splice(q.end(),_sendViaClusterQueue,qi++);
- else ++qi;
- }
- }
- for(std::list<_SQE>::iterator qi(q.begin());qi!=q.end();++qi)
- this->sendViaCluster(id.address(),qi->toPeerAddress,qi->data,qi->len,qi->unite);
+ _ClusterSendQueueEntry *q[16384]; // 16384 is "tons"
+ unsigned int qc = _sendQueue->getByDest(id.address(),q,16384);
+ for(unsigned int i=0;i<qc;++i)
+ this->sendViaCluster(q[i]->fromPeerAddress,q[i]->toPeerAddress,q[i]->data,q[i]->len,q[i]->unite);
+ _sendQueue->returnToPool(q,qc);
- TRACE("[%u] has %s (retried %u queued sends)",(unsigned int)fromMemberId,id.address().toString().c_str(),(unsigned int)q.size());
+ TRACE("[%u] has %s (retried %u queued sends)",(unsigned int)fromMemberId,id.address().toString().c_str(),qc);
}
} break;
@@ -244,7 +404,6 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
peer->identity().serialize(buf);
Mutex::Lock _l2(_members[fromMemberId].lock);
_send(fromMemberId,CLUSTER_MESSAGE_HAVE_PEER,buf.data(),buf.size());
- _flush(fromMemberId);
}
} break;
@@ -333,7 +492,6 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
{
Mutex::Lock _l2(_members[fromMemberId].lock);
_send(fromMemberId,CLUSTER_MESSAGE_PROXY_SEND,rendezvousForRemote.data(),rendezvousForRemote.size());
- _flush(fromMemberId);
}
RR->sw->send(rendezvousForLocal,true,0);
}
@@ -379,19 +537,6 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
if (len > ZT_PROTO_MAX_PACKET_LENGTH) // sanity check
return;
- unsigned int queueCount = 0;
- {
- Mutex::Lock _l(_sendViaClusterQueue_m);
- for(std::list<_SQE>::const_iterator qi(_sendViaClusterQueue.begin());qi!=_sendViaClusterQueue.end();++qi) {
- if (qi->fromPeerAddress == fromPeerAddress) {
- if (++queueCount > ZT_CLUSTER_MAX_QUEUE_PER_SENDER) {
- TRACE("dropping sendViaCluster for %s -> %s since queue for sender is full",fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str());
- return;
- }
- }
- }
- }
-
const uint64_t now = RR->node->now();
uint64_t mostRecentTs = 0;
@@ -423,8 +568,6 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
Mutex::Lock _l2(_members[*mid].lock);
_send(*mid,CLUSTER_MESSAGE_WANT_PEER,tmp,ZT_ADDRESS_LENGTH);
- if ((enqueueAndWait)&&(queueCount == 0))
- _flush(*mid);
}
}
@@ -432,8 +575,7 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
// later and return after having broadcasted a WANT_PEER.
if (enqueueAndWait) {
TRACE("sendViaCluster %s -> %s enqueueing to wait for HAVE_PEER",fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str());
- Mutex::Lock _l(_sendViaClusterQueue_m);
- _sendViaClusterQueue.push_back(_SQE(now,fromPeerAddress,toPeerAddress,data,len,unite));
+ _sendQueue->enqueue(now,fromPeerAddress,toPeerAddress,data,len,unite);
return;
}
}
@@ -464,10 +606,8 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
{
Mutex::Lock _l2(_members[mostRecentMemberId].lock);
- if (buf.size() > 0) {
+ if (buf.size() > 0)
_send(mostRecentMemberId,CLUSTER_MESSAGE_PROXY_UNITE,buf.data(),buf.size());
- _flush(mostRecentMemberId);
- }
if (_members[mostRecentMemberId].zeroTierPhysicalEndpoints.size() > 0) {
TRACE("sendViaCluster relaying %u bytes from %s to %s by way of %u",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId);
RR->node->putPacket(InetAddress(),_members[mostRecentMemberId].zeroTierPhysicalEndpoints.front(),data,len);
@@ -484,7 +624,6 @@ void Cluster::sendDistributedQuery(const Packet &pkt)
for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
Mutex::Lock _l2(_members[*mid].lock);
_send(*mid,CLUSTER_MESSAGE_REMOTE_PACKET,buf.data(),buf.size());
- _flush(*mid);
}
}
@@ -495,15 +634,6 @@ void Cluster::doPeriodicTasks()
if ((now - _lastFlushed) >= ZT_CLUSTER_FLUSH_PERIOD) {
_lastFlushed = now;
- {
- Mutex::Lock _l2(_sendViaClusterQueue_m);
- for(std::list<_SQE>::iterator qi(_sendViaClusterQueue.begin());qi!=_sendViaClusterQueue.end();) {
- if ((now - qi->timestamp) >= ZT_CLUSTER_QUEUE_EXPIRATION)
- _sendViaClusterQueue.erase(qi++);
- else ++qi;
- }
- }
-
Mutex::Lock _l(_memberIds_m);
for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
Mutex::Lock _l2(_members[*mid].lock);
@@ -549,6 +679,11 @@ void Cluster::doPeriodicTasks()
else ++rp;
}
}
+
+ if ((now - _lastCleanedQueue) >= ZT_CLUSTER_QUEUE_EXPIRATION) {
+ _lastCleanedQueue = now;
+ _sendQueue->expire(now);
+ }
}
void Cluster::addMember(uint16_t memberId)
@@ -768,7 +903,6 @@ void Cluster::_doREMOTE_WHOIS(uint64_t fromMemberId,const Packet &remotep)
TRACE("responding to remote WHOIS from %s @ %u with identity of %s",remotep.source().toString().c_str(),(unsigned int)fromMemberId,queried.address().toString().c_str());
Mutex::Lock _l2(_members[fromMemberId].lock);
_send(fromMemberId,CLUSTER_MESSAGE_PROXY_SEND,routp.data(),routp.size());
- _flush(fromMemberId);
}
}
}