summaryrefslogtreecommitdiff
path: root/node
diff options
context:
space:
mode:
authorAdam Ierymenko <adam.ierymenko@gmail.com>2015-11-09 12:24:49 -0800
committerAdam Ierymenko <adam.ierymenko@gmail.com>2015-11-09 12:24:49 -0800
commit73e2c6e5117aba36566c22edc1c19c8d6347d317 (patch)
tree5a3a1aaa9ba148623743293d737167e3748a97db /node
parent12cd9df059dc70bed3da6c2a1041329f995e2a9d (diff)
downloadinfinitytier-73e2c6e5117aba36566c22edc1c19c8d6347d317.tar.gz
infinitytier-73e2c6e5117aba36566c22edc1c19c8d6347d317.zip
How did that ever work?
Diffstat (limited to 'node')
-rw-r--r--node/Cluster.cpp42
-rw-r--r--node/Cluster.hpp9
2 files changed, 25 insertions, 26 deletions
diff --git a/node/Cluster.cpp b/node/Cluster.cpp
index a5c30e0f..ec115159 100644
--- a/node/Cluster.cpp
+++ b/node/Cluster.cpp
@@ -223,10 +223,10 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
std::list<_SQE> q;
{
Mutex::Lock _l(_sendViaClusterQueue_m);
- std::map< Address,std::list<_SQE> >::iterator qe(_sendViaClusterQueue.find(id.address()));
- if (qe != _sendViaClusterQueue.end()) {
- q.swap(qe->second); // just swap ptr instead of copying
- _sendViaClusterQueue.erase(qe);
+ for(std::list<_SQE>::iterator qi(_sendViaClusterQueue.begin());qi!=_sendViaClusterQueue.end();) {
+ if (qi->toPeerAddress == id.address())
+ q.splice(q.end(),_sendViaClusterQueue,qi++);
+ else ++qi;
}
}
for(std::list<_SQE>::iterator qi(q.begin());qi!=q.end();++qi)
@@ -368,16 +368,17 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
if (len > ZT_PROTO_MAX_PACKET_LENGTH) // sanity check
return;
- _sendViaClusterQueue_m.lock();
- unsigned long queueCount;
+ unsigned int queueCount = 0;
{
- std::map< Address,std::list<_SQE> >::const_iterator qe(_sendViaClusterQueue.find(fromPeerAddress));
- queueCount = (qe == _sendViaClusterQueue.end()) ? 0 : (unsigned long)qe->second.size();
- }
- _sendViaClusterQueue_m.unlock();
- if (queueCount > ZT_CLUSTER_MAX_QUEUE_PER_SENDER) {
- TRACE("dropping sendViaCluster for %s -> %s since queue for sender is full",fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str());
- return;
+ Mutex::Lock _l(_sendViaClusterQueue_m);
+ for(std::list<_SQE>::const_iterator qi(_sendViaClusterQueue.begin());qi!=_sendViaClusterQueue.end();++qi) {
+ if (qi->fromPeerAddress == fromPeerAddress) {
+ if (++queueCount > ZT_CLUSTER_MAX_QUEUE_PER_SENDER) {
+ TRACE("dropping sendViaCluster for %s -> %s since queue for sender is full",fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str());
+ return;
+ }
+ }
+ }
}
const uint64_t now = RR->node->now();
@@ -386,9 +387,9 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
unsigned int mostRecentMemberId = 0xffffffff;
{
Mutex::Lock _l2(_remotePeers_m);
- std::map< std::pair<Address,unsigned int>,uint64_t >::const_iterator rpe(_remotePeers.lower_bound(std::pair<Address,unsigned int>(fromPeerAddress,0)));
+ std::map< std::pair<Address,unsigned int>,uint64_t >::const_iterator rpe(_remotePeers.lower_bound(std::pair<Address,unsigned int>(toPeerAddress,0)));
for(;;) {
- if ((rpe == _remotePeers.end())||(rpe->first.first != fromPeerAddress))
+ if ((rpe == _remotePeers.end())||(rpe->first.first != toPeerAddress))
break;
else if (rpe->second > mostRecentTs) {
mostRecentTs = rpe->second;
@@ -420,7 +421,7 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
if (enqueueAndWait) {
TRACE("sendViaCluster %s -> %s enqueueing to wait for HAVE_PEER",fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str());
Mutex::Lock _l(_sendViaClusterQueue_m);
- _sendViaClusterQueue[fromPeerAddress].push_back(_SQE(now,toPeerAddress,data,len,unite));
+ _sendViaClusterQueue.push_back(_SQE(now,fromPeerAddress,toPeerAddress,data,len,unite));
return;
}
}
@@ -484,13 +485,8 @@ void Cluster::doPeriodicTasks()
{
Mutex::Lock _l2(_sendViaClusterQueue_m);
- for(std::map< Address,std::list<_SQE> >::iterator qi(_sendViaClusterQueue.begin());qi!=_sendViaClusterQueue.end();) {
- for(std::list<_SQE>::iterator qii(qi->second.begin());qii!=qi->second.end();) {
- if ((now - qii->timestamp) >= ZT_CLUSTER_QUEUE_EXPIRATION)
- qi->second.erase(qii++);
- else ++qii;
- }
- if (qi->second.empty())
+ for(std::list<_SQE>::iterator qi(_sendViaClusterQueue.begin());qi!=_sendViaClusterQueue.end();) {
+ if ((now - qi->timestamp) >= ZT_CLUSTER_QUEUE_EXPIRATION)
_sendViaClusterQueue.erase(qi++);
else ++qi;
}
diff --git a/node/Cluster.hpp b/node/Cluster.hpp
index 63196928..4197a14b 100644
--- a/node/Cluster.hpp
+++ b/node/Cluster.hpp
@@ -39,6 +39,7 @@
#include "Constants.hpp"
#include "../include/ZeroTierOne.h"
#include "Address.hpp"
+#include "Array.hpp"
#include "InetAddress.hpp"
#include "SHA512.hpp"
#include "Utils.hpp"
@@ -74,7 +75,7 @@
/**
* Expiration time for send queue entries
*/
-#define ZT_CLUSTER_QUEUE_EXPIRATION 1500
+#define ZT_CLUSTER_QUEUE_EXPIRATION 500
namespace ZeroTier {
@@ -372,18 +373,20 @@ private:
struct _SQE
{
_SQE() : timestamp(0),len(0),unite(false) {}
- _SQE(const uint64_t ts,const Address &t,const void *d,const unsigned int l,const bool u) :
+ _SQE(const uint64_t ts,const Address &f,const Address &t,const void *d,const unsigned int l,const bool u) :
timestamp(ts),
+ fromPeerAddress(f),
toPeerAddress(t),
len(l),
unite(u) { memcpy(data,d,l); }
uint64_t timestamp;
+ Address fromPeerAddress;
Address toPeerAddress;
unsigned int len;
bool unite;
unsigned char data[ZT_PROTO_MAX_PACKET_LENGTH];
};
- std::map< Address,std::list<_SQE> > _sendViaClusterQueue;
+ std::list<_SQE> _sendViaClusterQueue;
Mutex _sendViaClusterQueue_m;
uint64_t _lastFlushed;