diff options
author | Adam Ierymenko <adam.ierymenko@gmail.com> | 2017-10-25 15:44:10 -0700 |
---|---|---|
committer | Adam Ierymenko <adam.ierymenko@gmail.com> | 2017-10-25 15:44:10 -0700 |
commit | 508fa6a7fed894fd616239c04aeb0c2e8f6b9022 (patch) | |
tree | dbc603eb4353bb70099d4915c171cf331f2ce94c | |
parent | 71bdaa95087536954f1f1cb7b4652fd9b33be587 (diff) | |
download | infinitytier-508fa6a7fed894fd616239c04aeb0c2e8f6b9022.tar.gz infinitytier-508fa6a7fed894fd616239c04aeb0c2e8f6b9022.zip |
A few fixes for cluster mode.
-rw-r--r-- | node/IncomingPacket.cpp | 4 | ||||
-rw-r--r-- | node/Path.hpp | 13 | ||||
-rw-r--r-- | node/Peer.cpp | 183 | ||||
-rw-r--r-- | node/Peer.hpp | 6 |
4 files changed, 133 insertions, 73 deletions
diff --git a/node/IncomingPacket.cpp b/node/IncomingPacket.cpp index dfa0a161..d44e3b54 100644 --- a/node/IncomingPacket.cpp +++ b/node/IncomingPacket.cpp @@ -1092,7 +1092,7 @@ bool IncomingPacket::_doPUSH_DIRECT_PATHS(const RuntimeEnvironment *RR,void *tPt (RR->node->shouldUsePathForZeroTierTraffic(tPtr,peer->address(),_path->localSocket(),a)) ) // should use path { if ((flags & ZT_PUSH_DIRECT_PATHS_FLAG_CLUSTER_REDIRECT) != 0) { - peer->clusterRedirect(tPtr,_path->localSocket(),a,now); + peer->clusterRedirect(tPtr,_path,a,now); } else if (++countPerScope[(int)a.ipScope()][0] <= ZT_PUSH_DIRECT_PATHS_MAX_PER_SCOPE_AND_FAMILY) { peer->attemptToContactAt(tPtr,InetAddress(),a,now,false,0); } @@ -1106,7 +1106,7 @@ bool IncomingPacket::_doPUSH_DIRECT_PATHS(const RuntimeEnvironment *RR,void *tPt (RR->node->shouldUsePathForZeroTierTraffic(tPtr,peer->address(),_path->localSocket(),a)) ) // should use path { if ((flags & ZT_PUSH_DIRECT_PATHS_FLAG_CLUSTER_REDIRECT) != 0) { - peer->clusterRedirect(tPtr,_path->localSocket(),a,now); + peer->clusterRedirect(tPtr,_path,a,now); } else if (++countPerScope[(int)a.ipScope()][1] <= ZT_PUSH_DIRECT_PATHS_MAX_PER_SCOPE_AND_FAMILY) { peer->attemptToContactAt(tPtr,InetAddress(),a,now,false,0); } diff --git a/node/Path.hpp b/node/Path.hpp index 80132c13..ab52ced6 100644 --- a/node/Path.hpp +++ b/node/Path.hpp @@ -281,14 +281,19 @@ public: /** * @return Path quality -- lower is better */ - inline int quality(const int64_t now) const + inline long quality(const int64_t now) const { - const int l = (int)_latency; - const int age = (int)std::min((now - _lastIn),(int64_t)(ZT_PATH_HEARTBEAT_PERIOD * 10)); // set an upper sanity limit to avoid overflow - return (((age < (ZT_PATH_HEARTBEAT_PERIOD + 5000)) ? l : (l + 0xffff + age)) * (int)((ZT_INETADDRESS_MAX_SCOPE - _ipScope) + 1)); + const int l = (long)_latency; + const int age = (long)std::min((now - _lastIn),(int64_t)(ZT_PATH_HEARTBEAT_PERIOD * 10)); // set an upper sanity limit to avoid overflow + return (((age < (ZT_PATH_HEARTBEAT_PERIOD + 5000)) ? l : (l + 0xffff + age)) * (long)((ZT_INETADDRESS_MAX_SCOPE - _ipScope) + 1)); } /** + * @return True if this path is alive (receiving heartbeats) + */ + inline bool alive(const int64_t now) const { return ((now - _lastIn) < (ZT_PATH_HEARTBEAT_PERIOD + 5000)); } + + /** * @return True if this path needs a heartbeat */ inline bool needsHeartbeat(const int64_t now) const { return ((now - _lastOut) >= ZT_PATH_HEARTBEAT_PERIOD); } diff --git a/node/Peer.cpp b/node/Peer.cpp index d2692011..d68e0df3 100644 --- a/node/Peer.cpp +++ b/node/Peer.cpp @@ -148,39 +148,64 @@ void Peer::received( if (hops == 0) { // If this is a direct packet (no hops), update existing paths or learn new ones - Mutex::Lock _l(_paths_m); - unsigned int worstQualityPath = 0; - int worstQuality = 0; bool havePath = false; - for(unsigned int p=0;p<ZT_MAX_PEER_NETWORK_PATHS;++p) { - if (_paths[p].p) { - if (_paths[p].p == path) { - _paths[p].lr = now; - havePath = true; - break; - } - const int q = _paths[p].p->quality(now) / _paths[p].priority; - if (q >= worstQuality) { - worstQuality = q; - worstQualityPath = p; - } - } else { - worstQualityPath = p; - break; + { + Mutex::Lock _l(_paths_m); + for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { + if (_paths[i].p) { + if (_paths[i].p == path) { + _paths[i].lr = now; + havePath = true; + break; + } + } else break; } } if ((!havePath)&&(RR->node->shouldUsePathForZeroTierTraffic(tPtr,_id.address(),path->localSocket(),path->address()))) { - if (verb == Packet::VERB_OK) { - RR->t->peerLearnedNewPath(tPtr,networkId,*this,_paths[worstQualityPath].p,path,packetId); - _paths[worstQualityPath].lr = now; - _paths[worstQualityPath].p = path; - _paths[worstQualityPath].priority = 1; - } else { - attemptToContactAt(tPtr,path->localSocket(),path->address(),now,true,path->nextOutgoingCounter()); - path->sent(now); - RR->t->peerConfirmingUnknownPath(tPtr,networkId,*this,path,packetId,verb); + Mutex::Lock _l(_paths_m); + + // Paths are redunant if they duplicate an alive path to the same IP or + // with the same local socket and address family. + bool redundant = false; + for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { + if (_paths[i].p) { + if ( (_paths[i].p->alive(now)) && ( ((_paths[i].p->localSocket() == path->localSocket())&&(_paths[i].p->address().ss_family == path->address().ss_family)) || (_paths[i].p->address().ipsEqual(path->address())) ) ) { + redundant = true; + break; + } + } else break; + } + + if (!redundant) { + unsigned int replacePath = ZT_MAX_PEER_NETWORK_PATHS; + int replacePathQuality = 0; + for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { + if (_paths[i].p) { + const int q = _paths[i].p->quality(now); + if (q > replacePathQuality) { + replacePathQuality = q; + replacePath = i; + } + } else { + replacePath = i; + break; + } + } + + if (replacePath != ZT_MAX_PEER_NETWORK_PATHS) { + if (verb == Packet::VERB_OK) { + RR->t->peerLearnedNewPath(tPtr,networkId,*this,_paths[replacePath].p,path,packetId); + _paths[replacePath].lr = now; + _paths[replacePath].p = path; + _paths[replacePath].priority = 1; + } else { + attemptToContactAt(tPtr,path->localSocket(),path->address(),now,true,path->nextOutgoingCounter()); + path->sent(now); + RR->t->peerConfirmingUnknownPath(tPtr,networkId,*this,path,packetId,verb); + } + } } } } @@ -258,11 +283,11 @@ SharedPtr<Path> Peer::getBestPath(int64_t now,bool includeExpired) const Mutex::Lock _l(_paths_m); unsigned int bestPath = ZT_MAX_PEER_NETWORK_PATHS; - int bestPathQuality = 2147483647; // INT_MAX + long bestPathQuality = 2147483647; for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { if (_paths[i].p) { if ((includeExpired)||((now - _paths[i].lr) < ZT_PEER_PATH_EXPIRATION)) { - const int q = _paths[i].p->quality(now) / _paths[i].priority; + const long q = _paths[i].p->quality(now) / _paths[i].priority; if (q <= bestPathQuality) { bestPathQuality = q; bestPath = i; @@ -280,12 +305,12 @@ void Peer::introduce(void *const tPtr,const int64_t now,const SharedPtr<Peer> &o { unsigned int myBestV4ByScope[ZT_INETADDRESS_MAX_SCOPE+1]; unsigned int myBestV6ByScope[ZT_INETADDRESS_MAX_SCOPE+1]; - int myBestV4QualityByScope[ZT_INETADDRESS_MAX_SCOPE+1]; - int myBestV6QualityByScope[ZT_INETADDRESS_MAX_SCOPE+1]; + long myBestV4QualityByScope[ZT_INETADDRESS_MAX_SCOPE+1]; + long myBestV6QualityByScope[ZT_INETADDRESS_MAX_SCOPE+1]; unsigned int theirBestV4ByScope[ZT_INETADDRESS_MAX_SCOPE+1]; unsigned int theirBestV6ByScope[ZT_INETADDRESS_MAX_SCOPE+1]; - int theirBestV4QualityByScope[ZT_INETADDRESS_MAX_SCOPE+1]; - int theirBestV6QualityByScope[ZT_INETADDRESS_MAX_SCOPE+1]; + long theirBestV4QualityByScope[ZT_INETADDRESS_MAX_SCOPE+1]; + long theirBestV6QualityByScope[ZT_INETADDRESS_MAX_SCOPE+1]; for(int i=0;i<=ZT_INETADDRESS_MAX_SCOPE;++i) { myBestV4ByScope[i] = ZT_MAX_PEER_NETWORK_PATHS; myBestV6ByScope[i] = ZT_MAX_PEER_NETWORK_PATHS; @@ -301,7 +326,7 @@ void Peer::introduce(void *const tPtr,const int64_t now,const SharedPtr<Peer> &o for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { if (_paths[i].p) { - const int q = _paths[i].p->quality(now) / _paths[i].priority; + const long q = _paths[i].p->quality(now) / _paths[i].priority; const unsigned int s = (unsigned int)_paths[i].p->ipScope(); switch(_paths[i].p->address().ss_family) { case AF_INET: @@ -324,7 +349,7 @@ void Peer::introduce(void *const tPtr,const int64_t now,const SharedPtr<Peer> &o for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { if (other->_paths[i].p) { - const int q = other->_paths[i].p->quality(now) / other->_paths[i].priority; + const long q = other->_paths[i].p->quality(now) / other->_paths[i].priority; const unsigned int s = (unsigned int)other->_paths[i].p->ipScope(); switch(other->_paths[i].p->address().ss_family) { case AF_INET: @@ -471,19 +496,32 @@ unsigned int Peer::doPingAndKeepalive(void *tPtr,int64_t now) const bool sendFullHello = ((now - _lastSentFullHello) >= ZT_PEER_PING_PERIOD); _lastSentFullHello = now; + // Right now we only keep pinging links that have the maximum priority. The + // priority is used to track cluster redirections, meaning that when a cluster + // redirects us its redirect target links override all other links and we + // let those old links expire. + long maxPriority = 0; + for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { + if (_paths[i].p) + maxPriority = std::max(_paths[i].priority,maxPriority); + else break; + } + unsigned int j = 0; for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { - if (!_paths[i].p) break; - if ((now - _paths[i].lr) < ZT_PEER_PATH_EXPIRATION) { - if ((sendFullHello)||(_paths[i].p->needsHeartbeat(now))) { - attemptToContactAt(tPtr,_paths[i].p->localSocket(),_paths[i].p->address(),now,sendFullHello,_paths[i].p->nextOutgoingCounter()); - _paths[i].p->sent(now); - sent |= (_paths[i].p->address().ss_family == AF_INET) ? 0x1 : 0x2; + if (_paths[i].p) { + // Clean expired and reduced priority paths + if ( ((now - _paths[i].lr) < ZT_PEER_PATH_EXPIRATION) && (_paths[i].priority == maxPriority) ) { + if ((sendFullHello)||(_paths[i].p->needsHeartbeat(now))) { + attemptToContactAt(tPtr,_paths[i].p->localSocket(),_paths[i].p->address(),now,sendFullHello,_paths[i].p->nextOutgoingCounter()); + _paths[i].p->sent(now); + sent |= (_paths[i].p->address().ss_family == AF_INET) ? 0x1 : 0x2; + } + if (i != j) + _paths[j] = _paths[i]; + ++j; } - if (i != j) - _paths[j] = _paths[i]; - ++j; - } + } else break; } while(j < ZT_MAX_PEER_NETWORK_PATHS) { _paths[j].lr = 0; @@ -495,35 +533,52 @@ unsigned int Peer::doPingAndKeepalive(void *tPtr,int64_t now) return sent; } -void Peer::clusterRedirect(void *tPtr,const int64_t localSocket,const InetAddress &remoteAddress,const int64_t now) +void Peer::clusterRedirect(void *tPtr,const SharedPtr<Path> &originatingPath,const InetAddress &remoteAddress,const int64_t now) { - SharedPtr<Path> np(RR->topology->getPath(localSocket,remoteAddress)); + SharedPtr<Path> np(RR->topology->getPath(originatingPath->localSocket(),remoteAddress)); RR->t->peerRedirected(tPtr,0,*this,np); - attemptToContactAt(tPtr,localSocket,remoteAddress,now,true,np->nextOutgoingCounter()); + + attemptToContactAt(tPtr,originatingPath->localSocket(),remoteAddress,now,true,np->nextOutgoingCounter()); + { Mutex::Lock _l(_paths_m); - int worstQuality = 0; - unsigned int worstQualityPath = 0; + + // New priority is higher than the priority of the originating path (if known) + long newPriority = 1; for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { if (_paths[i].p) { - if (_paths[i].p == np) { // <-- where's my Fields Medal? - _paths[i].lr = now; // consider this a "receive" - _paths[i].priority += 5; // kind of arbitrary, bumps way up in best path quality order - return; + if (_paths[i].p == originatingPath) { + newPriority = _paths[i].priority; + break; } - const int q = _paths[i].p->quality(now) / _paths[i].priority; - if (q >= worstQuality) { - worstQuality = q; - worstQualityPath = i; + } else break; + } + newPriority += 2; + + // Erase any paths with lower priority than this one or that are duplicate + // IPs and add this path. + unsigned int j = 0; + for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { + if (_paths[i].p) { + if ((_paths[i].priority >= newPriority)&&(!_paths[i].p->address().ipsEqual(remoteAddress))) { + if (i != j) + _paths[j] = _paths[i]; + ++j; } - } else { - worstQualityPath = i; - break; } } - _paths[worstQualityPath].lr = now; - _paths[worstQualityPath].p = np; - _paths[worstQualityPath].priority = 6; // 1 + 5 + if (j < ZT_MAX_PEER_NETWORK_PATHS) { + _paths[j].lr = now; + _paths[j].p = np; + _paths[j].priority = newPriority; + ++j; + while (j < ZT_MAX_PEER_NETWORK_PATHS) { + _paths[j].lr = 0; + _paths[j].p.zero(); + _paths[j].priority = 1; + ++j; + } + } } } diff --git a/node/Peer.hpp b/node/Peer.hpp index 997c44f5..53b916ab 100644 --- a/node/Peer.hpp +++ b/node/Peer.hpp @@ -219,11 +219,11 @@ public: * Process a cluster redirect sent by this peer * * @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call - * @param localSocket Local socket as supplied by external code + * @param originatingPath Path from which redirect originated * @param remoteAddress Remote address * @param now Current time */ - void clusterRedirect(void *tPtr,const int64_t localSocket,const InetAddress &remoteAddress,const int64_t now); + void clusterRedirect(void *tPtr,const SharedPtr<Path> &originatingPath,const InetAddress &remoteAddress,const int64_t now); /** * Reset paths within a given IP scope and address family @@ -498,7 +498,7 @@ private: _PeerPath() : lr(0),p(),priority(1) {} int64_t lr; // time of last valid ZeroTier packet SharedPtr<Path> p; - int priority; // >= 1, higher is better + long priority; // >= 1, higher is better }; uint8_t _key[ZT_PEER_SECRET_KEY_LENGTH]; |