summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdam Ierymenko <adam.ierymenko@gmail.com>2017-10-25 15:44:10 -0700
committerAdam Ierymenko <adam.ierymenko@gmail.com>2017-10-25 15:44:10 -0700
commit508fa6a7fed894fd616239c04aeb0c2e8f6b9022 (patch)
treedbc603eb4353bb70099d4915c171cf331f2ce94c
parent71bdaa95087536954f1f1cb7b4652fd9b33be587 (diff)
downloadinfinitytier-508fa6a7fed894fd616239c04aeb0c2e8f6b9022.tar.gz
infinitytier-508fa6a7fed894fd616239c04aeb0c2e8f6b9022.zip
A few fixes for cluster mode.
-rw-r--r--node/IncomingPacket.cpp4
-rw-r--r--node/Path.hpp13
-rw-r--r--node/Peer.cpp183
-rw-r--r--node/Peer.hpp6
4 files changed, 133 insertions, 73 deletions
diff --git a/node/IncomingPacket.cpp b/node/IncomingPacket.cpp
index dfa0a161..d44e3b54 100644
--- a/node/IncomingPacket.cpp
+++ b/node/IncomingPacket.cpp
@@ -1092,7 +1092,7 @@ bool IncomingPacket::_doPUSH_DIRECT_PATHS(const RuntimeEnvironment *RR,void *tPt
(RR->node->shouldUsePathForZeroTierTraffic(tPtr,peer->address(),_path->localSocket(),a)) ) // should use path
{
if ((flags & ZT_PUSH_DIRECT_PATHS_FLAG_CLUSTER_REDIRECT) != 0) {
- peer->clusterRedirect(tPtr,_path->localSocket(),a,now);
+ peer->clusterRedirect(tPtr,_path,a,now);
} else if (++countPerScope[(int)a.ipScope()][0] <= ZT_PUSH_DIRECT_PATHS_MAX_PER_SCOPE_AND_FAMILY) {
peer->attemptToContactAt(tPtr,InetAddress(),a,now,false,0);
}
@@ -1106,7 +1106,7 @@ bool IncomingPacket::_doPUSH_DIRECT_PATHS(const RuntimeEnvironment *RR,void *tPt
(RR->node->shouldUsePathForZeroTierTraffic(tPtr,peer->address(),_path->localSocket(),a)) ) // should use path
{
if ((flags & ZT_PUSH_DIRECT_PATHS_FLAG_CLUSTER_REDIRECT) != 0) {
- peer->clusterRedirect(tPtr,_path->localSocket(),a,now);
+ peer->clusterRedirect(tPtr,_path,a,now);
} else if (++countPerScope[(int)a.ipScope()][1] <= ZT_PUSH_DIRECT_PATHS_MAX_PER_SCOPE_AND_FAMILY) {
peer->attemptToContactAt(tPtr,InetAddress(),a,now,false,0);
}
diff --git a/node/Path.hpp b/node/Path.hpp
index 80132c13..ab52ced6 100644
--- a/node/Path.hpp
+++ b/node/Path.hpp
@@ -281,14 +281,19 @@ public:
/**
* @return Path quality -- lower is better
*/
- inline int quality(const int64_t now) const
+ inline long quality(const int64_t now) const
{
- const int l = (int)_latency;
- const int age = (int)std::min((now - _lastIn),(int64_t)(ZT_PATH_HEARTBEAT_PERIOD * 10)); // set an upper sanity limit to avoid overflow
- return (((age < (ZT_PATH_HEARTBEAT_PERIOD + 5000)) ? l : (l + 0xffff + age)) * (int)((ZT_INETADDRESS_MAX_SCOPE - _ipScope) + 1));
+ const int l = (long)_latency;
+ const int age = (long)std::min((now - _lastIn),(int64_t)(ZT_PATH_HEARTBEAT_PERIOD * 10)); // set an upper sanity limit to avoid overflow
+ return (((age < (ZT_PATH_HEARTBEAT_PERIOD + 5000)) ? l : (l + 0xffff + age)) * (long)((ZT_INETADDRESS_MAX_SCOPE - _ipScope) + 1));
}
/**
+ * @return True if this path is alive (receiving heartbeats)
+ */
+ inline bool alive(const int64_t now) const { return ((now - _lastIn) < (ZT_PATH_HEARTBEAT_PERIOD + 5000)); }
+
+ /**
* @return True if this path needs a heartbeat
*/
inline bool needsHeartbeat(const int64_t now) const { return ((now - _lastOut) >= ZT_PATH_HEARTBEAT_PERIOD); }
diff --git a/node/Peer.cpp b/node/Peer.cpp
index d2692011..d68e0df3 100644
--- a/node/Peer.cpp
+++ b/node/Peer.cpp
@@ -148,39 +148,64 @@ void Peer::received(
if (hops == 0) {
// If this is a direct packet (no hops), update existing paths or learn new ones
- Mutex::Lock _l(_paths_m);
- unsigned int worstQualityPath = 0;
- int worstQuality = 0;
bool havePath = false;
- for(unsigned int p=0;p<ZT_MAX_PEER_NETWORK_PATHS;++p) {
- if (_paths[p].p) {
- if (_paths[p].p == path) {
- _paths[p].lr = now;
- havePath = true;
- break;
- }
- const int q = _paths[p].p->quality(now) / _paths[p].priority;
- if (q >= worstQuality) {
- worstQuality = q;
- worstQualityPath = p;
- }
- } else {
- worstQualityPath = p;
- break;
+ {
+ Mutex::Lock _l(_paths_m);
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p) {
+ if (_paths[i].p == path) {
+ _paths[i].lr = now;
+ havePath = true;
+ break;
+ }
+ } else break;
}
}
if ((!havePath)&&(RR->node->shouldUsePathForZeroTierTraffic(tPtr,_id.address(),path->localSocket(),path->address()))) {
- if (verb == Packet::VERB_OK) {
- RR->t->peerLearnedNewPath(tPtr,networkId,*this,_paths[worstQualityPath].p,path,packetId);
- _paths[worstQualityPath].lr = now;
- _paths[worstQualityPath].p = path;
- _paths[worstQualityPath].priority = 1;
- } else {
- attemptToContactAt(tPtr,path->localSocket(),path->address(),now,true,path->nextOutgoingCounter());
- path->sent(now);
- RR->t->peerConfirmingUnknownPath(tPtr,networkId,*this,path,packetId,verb);
+ Mutex::Lock _l(_paths_m);
+
+ // Paths are redunant if they duplicate an alive path to the same IP or
+ // with the same local socket and address family.
+ bool redundant = false;
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p) {
+ if ( (_paths[i].p->alive(now)) && ( ((_paths[i].p->localSocket() == path->localSocket())&&(_paths[i].p->address().ss_family == path->address().ss_family)) || (_paths[i].p->address().ipsEqual(path->address())) ) ) {
+ redundant = true;
+ break;
+ }
+ } else break;
+ }
+
+ if (!redundant) {
+ unsigned int replacePath = ZT_MAX_PEER_NETWORK_PATHS;
+ int replacePathQuality = 0;
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p) {
+ const int q = _paths[i].p->quality(now);
+ if (q > replacePathQuality) {
+ replacePathQuality = q;
+ replacePath = i;
+ }
+ } else {
+ replacePath = i;
+ break;
+ }
+ }
+
+ if (replacePath != ZT_MAX_PEER_NETWORK_PATHS) {
+ if (verb == Packet::VERB_OK) {
+ RR->t->peerLearnedNewPath(tPtr,networkId,*this,_paths[replacePath].p,path,packetId);
+ _paths[replacePath].lr = now;
+ _paths[replacePath].p = path;
+ _paths[replacePath].priority = 1;
+ } else {
+ attemptToContactAt(tPtr,path->localSocket(),path->address(),now,true,path->nextOutgoingCounter());
+ path->sent(now);
+ RR->t->peerConfirmingUnknownPath(tPtr,networkId,*this,path,packetId,verb);
+ }
+ }
}
}
}
@@ -258,11 +283,11 @@ SharedPtr<Path> Peer::getBestPath(int64_t now,bool includeExpired) const
Mutex::Lock _l(_paths_m);
unsigned int bestPath = ZT_MAX_PEER_NETWORK_PATHS;
- int bestPathQuality = 2147483647; // INT_MAX
+ long bestPathQuality = 2147483647;
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i].p) {
if ((includeExpired)||((now - _paths[i].lr) < ZT_PEER_PATH_EXPIRATION)) {
- const int q = _paths[i].p->quality(now) / _paths[i].priority;
+ const long q = _paths[i].p->quality(now) / _paths[i].priority;
if (q <= bestPathQuality) {
bestPathQuality = q;
bestPath = i;
@@ -280,12 +305,12 @@ void Peer::introduce(void *const tPtr,const int64_t now,const SharedPtr<Peer> &o
{
unsigned int myBestV4ByScope[ZT_INETADDRESS_MAX_SCOPE+1];
unsigned int myBestV6ByScope[ZT_INETADDRESS_MAX_SCOPE+1];
- int myBestV4QualityByScope[ZT_INETADDRESS_MAX_SCOPE+1];
- int myBestV6QualityByScope[ZT_INETADDRESS_MAX_SCOPE+1];
+ long myBestV4QualityByScope[ZT_INETADDRESS_MAX_SCOPE+1];
+ long myBestV6QualityByScope[ZT_INETADDRESS_MAX_SCOPE+1];
unsigned int theirBestV4ByScope[ZT_INETADDRESS_MAX_SCOPE+1];
unsigned int theirBestV6ByScope[ZT_INETADDRESS_MAX_SCOPE+1];
- int theirBestV4QualityByScope[ZT_INETADDRESS_MAX_SCOPE+1];
- int theirBestV6QualityByScope[ZT_INETADDRESS_MAX_SCOPE+1];
+ long theirBestV4QualityByScope[ZT_INETADDRESS_MAX_SCOPE+1];
+ long theirBestV6QualityByScope[ZT_INETADDRESS_MAX_SCOPE+1];
for(int i=0;i<=ZT_INETADDRESS_MAX_SCOPE;++i) {
myBestV4ByScope[i] = ZT_MAX_PEER_NETWORK_PATHS;
myBestV6ByScope[i] = ZT_MAX_PEER_NETWORK_PATHS;
@@ -301,7 +326,7 @@ void Peer::introduce(void *const tPtr,const int64_t now,const SharedPtr<Peer> &o
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i].p) {
- const int q = _paths[i].p->quality(now) / _paths[i].priority;
+ const long q = _paths[i].p->quality(now) / _paths[i].priority;
const unsigned int s = (unsigned int)_paths[i].p->ipScope();
switch(_paths[i].p->address().ss_family) {
case AF_INET:
@@ -324,7 +349,7 @@ void Peer::introduce(void *const tPtr,const int64_t now,const SharedPtr<Peer> &o
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (other->_paths[i].p) {
- const int q = other->_paths[i].p->quality(now) / other->_paths[i].priority;
+ const long q = other->_paths[i].p->quality(now) / other->_paths[i].priority;
const unsigned int s = (unsigned int)other->_paths[i].p->ipScope();
switch(other->_paths[i].p->address().ss_family) {
case AF_INET:
@@ -471,19 +496,32 @@ unsigned int Peer::doPingAndKeepalive(void *tPtr,int64_t now)
const bool sendFullHello = ((now - _lastSentFullHello) >= ZT_PEER_PING_PERIOD);
_lastSentFullHello = now;
+ // Right now we only keep pinging links that have the maximum priority. The
+ // priority is used to track cluster redirections, meaning that when a cluster
+ // redirects us its redirect target links override all other links and we
+ // let those old links expire.
+ long maxPriority = 0;
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p)
+ maxPriority = std::max(_paths[i].priority,maxPriority);
+ else break;
+ }
+
unsigned int j = 0;
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
- if (!_paths[i].p) break;
- if ((now - _paths[i].lr) < ZT_PEER_PATH_EXPIRATION) {
- if ((sendFullHello)||(_paths[i].p->needsHeartbeat(now))) {
- attemptToContactAt(tPtr,_paths[i].p->localSocket(),_paths[i].p->address(),now,sendFullHello,_paths[i].p->nextOutgoingCounter());
- _paths[i].p->sent(now);
- sent |= (_paths[i].p->address().ss_family == AF_INET) ? 0x1 : 0x2;
+ if (_paths[i].p) {
+ // Clean expired and reduced priority paths
+ if ( ((now - _paths[i].lr) < ZT_PEER_PATH_EXPIRATION) && (_paths[i].priority == maxPriority) ) {
+ if ((sendFullHello)||(_paths[i].p->needsHeartbeat(now))) {
+ attemptToContactAt(tPtr,_paths[i].p->localSocket(),_paths[i].p->address(),now,sendFullHello,_paths[i].p->nextOutgoingCounter());
+ _paths[i].p->sent(now);
+ sent |= (_paths[i].p->address().ss_family == AF_INET) ? 0x1 : 0x2;
+ }
+ if (i != j)
+ _paths[j] = _paths[i];
+ ++j;
}
- if (i != j)
- _paths[j] = _paths[i];
- ++j;
- }
+ } else break;
}
while(j < ZT_MAX_PEER_NETWORK_PATHS) {
_paths[j].lr = 0;
@@ -495,35 +533,52 @@ unsigned int Peer::doPingAndKeepalive(void *tPtr,int64_t now)
return sent;
}
-void Peer::clusterRedirect(void *tPtr,const int64_t localSocket,const InetAddress &remoteAddress,const int64_t now)
+void Peer::clusterRedirect(void *tPtr,const SharedPtr<Path> &originatingPath,const InetAddress &remoteAddress,const int64_t now)
{
- SharedPtr<Path> np(RR->topology->getPath(localSocket,remoteAddress));
+ SharedPtr<Path> np(RR->topology->getPath(originatingPath->localSocket(),remoteAddress));
RR->t->peerRedirected(tPtr,0,*this,np);
- attemptToContactAt(tPtr,localSocket,remoteAddress,now,true,np->nextOutgoingCounter());
+
+ attemptToContactAt(tPtr,originatingPath->localSocket(),remoteAddress,now,true,np->nextOutgoingCounter());
+
{
Mutex::Lock _l(_paths_m);
- int worstQuality = 0;
- unsigned int worstQualityPath = 0;
+
+ // New priority is higher than the priority of the originating path (if known)
+ long newPriority = 1;
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i].p) {
- if (_paths[i].p == np) { // <-- where's my Fields Medal?
- _paths[i].lr = now; // consider this a "receive"
- _paths[i].priority += 5; // kind of arbitrary, bumps way up in best path quality order
- return;
+ if (_paths[i].p == originatingPath) {
+ newPriority = _paths[i].priority;
+ break;
}
- const int q = _paths[i].p->quality(now) / _paths[i].priority;
- if (q >= worstQuality) {
- worstQuality = q;
- worstQualityPath = i;
+ } else break;
+ }
+ newPriority += 2;
+
+ // Erase any paths with lower priority than this one or that are duplicate
+ // IPs and add this path.
+ unsigned int j = 0;
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p) {
+ if ((_paths[i].priority >= newPriority)&&(!_paths[i].p->address().ipsEqual(remoteAddress))) {
+ if (i != j)
+ _paths[j] = _paths[i];
+ ++j;
}
- } else {
- worstQualityPath = i;
- break;
}
}
- _paths[worstQualityPath].lr = now;
- _paths[worstQualityPath].p = np;
- _paths[worstQualityPath].priority = 6; // 1 + 5
+ if (j < ZT_MAX_PEER_NETWORK_PATHS) {
+ _paths[j].lr = now;
+ _paths[j].p = np;
+ _paths[j].priority = newPriority;
+ ++j;
+ while (j < ZT_MAX_PEER_NETWORK_PATHS) {
+ _paths[j].lr = 0;
+ _paths[j].p.zero();
+ _paths[j].priority = 1;
+ ++j;
+ }
+ }
}
}
diff --git a/node/Peer.hpp b/node/Peer.hpp
index 997c44f5..53b916ab 100644
--- a/node/Peer.hpp
+++ b/node/Peer.hpp
@@ -219,11 +219,11 @@ public:
* Process a cluster redirect sent by this peer
*
* @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call
- * @param localSocket Local socket as supplied by external code
+ * @param originatingPath Path from which redirect originated
* @param remoteAddress Remote address
* @param now Current time
*/
- void clusterRedirect(void *tPtr,const int64_t localSocket,const InetAddress &remoteAddress,const int64_t now);
+ void clusterRedirect(void *tPtr,const SharedPtr<Path> &originatingPath,const InetAddress &remoteAddress,const int64_t now);
/**
* Reset paths within a given IP scope and address family
@@ -498,7 +498,7 @@ private:
_PeerPath() : lr(0),p(),priority(1) {}
int64_t lr; // time of last valid ZeroTier packet
SharedPtr<Path> p;
- int priority; // >= 1, higher is better
+ long priority; // >= 1, higher is better
};
uint8_t _key[ZT_PEER_SECRET_KEY_LENGTH];