diff options
author | Adam Ierymenko <adam.ierymenko@gmail.com> | 2015-11-09 12:24:49 -0800 |
---|---|---|
committer | Adam Ierymenko <adam.ierymenko@gmail.com> | 2015-11-09 12:24:49 -0800 |
commit | 73e2c6e5117aba36566c22edc1c19c8d6347d317 (patch) | |
tree | 5a3a1aaa9ba148623743293d737167e3748a97db /node | |
parent | 12cd9df059dc70bed3da6c2a1041329f995e2a9d (diff) | |
download | infinitytier-73e2c6e5117aba36566c22edc1c19c8d6347d317.tar.gz infinitytier-73e2c6e5117aba36566c22edc1c19c8d6347d317.zip |
How did that ever work?
Diffstat (limited to 'node')
-rw-r--r-- | node/Cluster.cpp | 42 | ||||
-rw-r--r-- | node/Cluster.hpp | 9 |
2 files changed, 25 insertions, 26 deletions
diff --git a/node/Cluster.cpp b/node/Cluster.cpp index a5c30e0f..ec115159 100644 --- a/node/Cluster.cpp +++ b/node/Cluster.cpp @@ -223,10 +223,10 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len) std::list<_SQE> q; { Mutex::Lock _l(_sendViaClusterQueue_m); - std::map< Address,std::list<_SQE> >::iterator qe(_sendViaClusterQueue.find(id.address())); - if (qe != _sendViaClusterQueue.end()) { - q.swap(qe->second); // just swap ptr instead of copying - _sendViaClusterQueue.erase(qe); + for(std::list<_SQE>::iterator qi(_sendViaClusterQueue.begin());qi!=_sendViaClusterQueue.end();) { + if (qi->toPeerAddress == id.address()) + q.splice(q.end(),_sendViaClusterQueue,qi++); + else ++qi; } } for(std::list<_SQE>::iterator qi(q.begin());qi!=q.end();++qi) @@ -368,16 +368,17 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee if (len > ZT_PROTO_MAX_PACKET_LENGTH) // sanity check return; - _sendViaClusterQueue_m.lock(); - unsigned long queueCount; + unsigned int queueCount = 0; { - std::map< Address,std::list<_SQE> >::const_iterator qe(_sendViaClusterQueue.find(fromPeerAddress)); - queueCount = (qe == _sendViaClusterQueue.end()) ? 0 : (unsigned long)qe->second.size(); - } - _sendViaClusterQueue_m.unlock(); - if (queueCount > ZT_CLUSTER_MAX_QUEUE_PER_SENDER) { - TRACE("dropping sendViaCluster for %s -> %s since queue for sender is full",fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str()); - return; + Mutex::Lock _l(_sendViaClusterQueue_m); + for(std::list<_SQE>::const_iterator qi(_sendViaClusterQueue.begin());qi!=_sendViaClusterQueue.end();++qi) { + if (qi->fromPeerAddress == fromPeerAddress) { + if (++queueCount > ZT_CLUSTER_MAX_QUEUE_PER_SENDER) { + TRACE("dropping sendViaCluster for %s -> %s since queue for sender is full",fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str()); + return; + } + } + } } const uint64_t now = RR->node->now(); @@ -386,9 +387,9 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee unsigned int mostRecentMemberId = 0xffffffff; { Mutex::Lock _l2(_remotePeers_m); - std::map< std::pair<Address,unsigned int>,uint64_t >::const_iterator rpe(_remotePeers.lower_bound(std::pair<Address,unsigned int>(fromPeerAddress,0))); + std::map< std::pair<Address,unsigned int>,uint64_t >::const_iterator rpe(_remotePeers.lower_bound(std::pair<Address,unsigned int>(toPeerAddress,0))); for(;;) { - if ((rpe == _remotePeers.end())||(rpe->first.first != fromPeerAddress)) + if ((rpe == _remotePeers.end())||(rpe->first.first != toPeerAddress)) break; else if (rpe->second > mostRecentTs) { mostRecentTs = rpe->second; @@ -420,7 +421,7 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee if (enqueueAndWait) { TRACE("sendViaCluster %s -> %s enqueueing to wait for HAVE_PEER",fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str()); Mutex::Lock _l(_sendViaClusterQueue_m); - _sendViaClusterQueue[fromPeerAddress].push_back(_SQE(now,toPeerAddress,data,len,unite)); + _sendViaClusterQueue.push_back(_SQE(now,fromPeerAddress,toPeerAddress,data,len,unite)); return; } } @@ -484,13 +485,8 @@ void Cluster::doPeriodicTasks() { Mutex::Lock _l2(_sendViaClusterQueue_m); - for(std::map< Address,std::list<_SQE> >::iterator qi(_sendViaClusterQueue.begin());qi!=_sendViaClusterQueue.end();) { - for(std::list<_SQE>::iterator qii(qi->second.begin());qii!=qi->second.end();) { - if ((now - qii->timestamp) >= ZT_CLUSTER_QUEUE_EXPIRATION) - qi->second.erase(qii++); - else ++qii; - } - if (qi->second.empty()) + for(std::list<_SQE>::iterator qi(_sendViaClusterQueue.begin());qi!=_sendViaClusterQueue.end();) { + if ((now - qi->timestamp) >= ZT_CLUSTER_QUEUE_EXPIRATION) _sendViaClusterQueue.erase(qi++); else ++qi; } diff --git a/node/Cluster.hpp b/node/Cluster.hpp index 63196928..4197a14b 100644 --- a/node/Cluster.hpp +++ b/node/Cluster.hpp @@ -39,6 +39,7 @@ #include "Constants.hpp" #include "../include/ZeroTierOne.h" #include "Address.hpp" +#include "Array.hpp" #include "InetAddress.hpp" #include "SHA512.hpp" #include "Utils.hpp" @@ -74,7 +75,7 @@ /** * Expiration time for send queue entries */ -#define ZT_CLUSTER_QUEUE_EXPIRATION 1500 +#define ZT_CLUSTER_QUEUE_EXPIRATION 500 namespace ZeroTier { @@ -372,18 +373,20 @@ private: struct _SQE { _SQE() : timestamp(0),len(0),unite(false) {} - _SQE(const uint64_t ts,const Address &t,const void *d,const unsigned int l,const bool u) : + _SQE(const uint64_t ts,const Address &f,const Address &t,const void *d,const unsigned int l,const bool u) : timestamp(ts), + fromPeerAddress(f), toPeerAddress(t), len(l), unite(u) { memcpy(data,d,l); } uint64_t timestamp; + Address fromPeerAddress; Address toPeerAddress; unsigned int len; bool unite; unsigned char data[ZT_PROTO_MAX_PACKET_LENGTH]; }; - std::map< Address,std::list<_SQE> > _sendViaClusterQueue; + std::list<_SQE> _sendViaClusterQueue; Mutex _sendViaClusterQueue_m; uint64_t _lastFlushed; |