diff options
Diffstat (limited to 'node/Peer.cpp')
-rw-r--r-- | node/Peer.cpp | 348 |
1 files changed, 187 insertions, 161 deletions
diff --git a/node/Peer.cpp b/node/Peer.cpp index 01492be1..251c5a5f 100644 --- a/node/Peer.cpp +++ b/node/Peer.cpp @@ -27,8 +27,6 @@ #include "Cluster.hpp" #include "Packet.hpp" -#include <algorithm> - #define ZT_PEER_PATH_SORT_INTERVAL 5000 namespace ZeroTier { @@ -45,7 +43,6 @@ Peer::Peer(const RuntimeEnvironment *renv,const Identity &myIdentity,const Ident _lastAnnouncedTo(0), _lastDirectPathPushSent(0), _lastDirectPathPushReceive(0), - _lastPathSort(0), _vProto(0), _vMajor(0), _vMinor(0), @@ -60,8 +57,7 @@ Peer::Peer(const RuntimeEnvironment *renv,const Identity &myIdentity,const Ident } void Peer::received( - const InetAddress &localAddr, - const InetAddress &remoteAddr, + const SharedPtr<Path> &path, unsigned int hops, uint64_t packetId, Packet::Verb verb, @@ -69,13 +65,15 @@ void Peer::received( Packet::Verb inReVerb, const bool trustEstablished) { + const uint64_t now = RR->node->now(); + #ifdef ZT_ENABLE_CLUSTER bool suboptimalPath = false; if ((RR->cluster)&&(hops == 0)) { // Note: findBetterEndpoint() is first since we still want to check // for a better endpoint even if we don't actually send a redirect. InetAddress redirectTo; - if ( (verb != Packet::VERB_OK) && (verb != Packet::VERB_ERROR) && (verb != Packet::VERB_RENDEZVOUS) && (verb != Packet::VERB_PUSH_DIRECT_PATHS) && (RR->cluster->findBetterEndpoint(redirectTo,_id.address(),remoteAddr,false)) ) { + if ( (verb != Packet::VERB_OK) && (verb != Packet::VERB_ERROR) && (verb != Packet::VERB_RENDEZVOUS) && (verb != Packet::VERB_PUSH_DIRECT_PATHS) && (RR->cluster->findBetterEndpoint(redirectTo,_id.address(),path->address(),false)) ) { if (_vProto >= 5) { // For newer peers we can send a more idiomatic verb: PUSH_DIRECT_PATHS. Packet outp(_id.address(),RR->identity.address(),Packet::VERB_PUSH_DIRECT_PATHS); @@ -93,7 +91,7 @@ void Peer::received( } outp.append((uint16_t)redirectTo.port()); outp.armor(_key,true); - RR->node->putPacket(localAddr,remoteAddr,outp.data(),outp.size()); + path->send(RR,outp.data(),outp.size(),now); } else { // For older peers we use RENDEZVOUS to coax them into contacting us elsewhere. Packet outp(_id.address(),RR->identity.address(),Packet::VERB_RENDEZVOUS); @@ -108,14 +106,13 @@ void Peer::received( outp.append(redirectTo.rawIpData(),16); } outp.armor(_key,true); - RR->node->putPacket(localAddr,remoteAddr,outp.data(),outp.size()); + path->send(RR,outp.data(),outp.size(),now); } suboptimalPath = true; } } #endif - const uint64_t now = RR->node->now(); _lastReceive = now; if ((verb == Packet::VERB_FRAME)||(verb == Packet::VERB_EXT_FRAME)) _lastUnicastFrame = now; @@ -124,53 +121,47 @@ void Peer::received( if (hops == 0) { bool pathIsConfirmed = false; - unsigned int np = _numPaths; - for(unsigned int p=0;p<np;++p) { - if ((_paths[p].address() == remoteAddr)&&(_paths[p].localAddress() == localAddr)) { - _paths[p].received(now); + { + Mutex::Lock _l(_paths_m); + for(unsigned int p=0;p<_numPaths;++p) { + if (_paths[p].path == path) { // paths are canonicalized so pointer compare is good here + _paths[p].lastReceive = now; #ifdef ZT_ENABLE_CLUSTER - _paths[p].setClusterSuboptimal(suboptimalPath); + _paths[p].clusterSuboptimal = suboptimalPath; #endif - pathIsConfirmed = true; - break; + pathIsConfirmed = true; + break; + } } } - if ((!pathIsConfirmed)&&(RR->node->shouldUsePathForZeroTierTraffic(localAddr,remoteAddr))) { + if ((!pathIsConfirmed)&&(RR->node->shouldUsePathForZeroTierTraffic(path->localAddress(),path->address()))) { if (verb == Packet::VERB_OK) { + Mutex::Lock _l(_paths_m); - Path *slot = (Path *)0; - if (np < ZT_MAX_PEER_NETWORK_PATHS) { - slot = &(_paths[np++]); + unsigned int slot = 0; + if (_numPaths < ZT_MAX_PEER_NETWORK_PATHS) { + slot = _numPaths++; } else { - uint64_t slotWorstScore = 0xffffffffffffffffULL; - for(unsigned int p=0;p<ZT_MAX_PEER_NETWORK_PATHS;++p) { - if (!_paths[p].active(now)) { - slot = &(_paths[p]); - break; - } else { - const uint64_t score = _paths[p].score(); - if (score <= slotWorstScore) { - slotWorstScore = score; - slot = &(_paths[p]); - } + uint64_t oldest = 0ULL; + unsigned int oldestPath = 0; + for(unsigned int p=0;p<_numPaths;++p) { + if (_paths[p].lastReceive < oldest) { + oldest = _paths[p].lastReceive; + oldestPath = p; } } - } - if (slot) { - *slot = Path(localAddr,remoteAddr); - slot->received(now); -#ifdef ZT_ENABLE_CLUSTER - slot->setClusterSuboptimal(suboptimalPath); -#endif - _numPaths = np; + slot = oldestPath; } + _paths[slot].path = path; + _paths[slot].lastReceive = now; #ifdef ZT_ENABLE_CLUSTER + _paths[slot].clusterSuboptimal = suboptimalPath; + if (RR->cluster) RR->cluster->broadcastHavePeer(_id); #endif - } else { TRACE("got %s via unknown path %s(%s), confirming...",Packet::verbString(verb),_id.address().toString().c_str(),remoteAddr.toString().c_str()); @@ -178,15 +169,15 @@ void Peer::received( if ( (_vProto >= 5) && ( !((_vMajor == 1)&&(_vMinor == 1)&&(_vRevision == 0)) ) ) { Packet outp(_id.address(),RR->identity.address(),Packet::VERB_ECHO); outp.armor(_key,true); - RR->node->putPacket(localAddr,remoteAddr,outp.data(),outp.size()); + path->send(RR,outp.data(),outp.size(),now); } else { - sendHELLO(localAddr,remoteAddr,now); + sendHELLO(path->localAddress(),path->address(),now); } } } } else if (trustEstablished) { - _pushDirectPaths(localAddr,remoteAddr,now); + _pushDirectPaths(path,now); } if ((now - _lastAnnouncedTo) >= ((ZT_MULTICAST_LIKE_EXPIRE / 2) - 1000)) { @@ -197,7 +188,96 @@ void Peer::received( } } -void Peer::sendHELLO(const InetAddress &localAddr,const InetAddress &atAddress,uint64_t now,unsigned int ttl) +bool Peer::hasActivePathTo(uint64_t now,const InetAddress &addr) const +{ + Mutex::Lock _l(_paths_m); + for(unsigned int p=0;p<_numPaths;++p) { + if ( (_paths[p].path->address() == addr) && (_paths[p].path->alive(now)) ) + return true; + } + return false; +} + +void Peer::makeExclusive(const InetAddress &addr) +{ + Mutex::Lock _l(_paths_m); + + bool have = false; + for(unsigned int p=0;p<_numPaths;++p) { + if (_paths[p].path->address() == addr) { + have = true; + break; + } + } + + if (have) { + unsigned int np = _numPaths; + unsigned int x = 0; + unsigned int y = 0; + while (x < np) { + if ((_paths[x].path->address().ss_family != addr.ss_family)||(_paths[x].path->address() == addr)) { + if (y != x) { + _paths[y].path = _paths[x].path; + _paths[y].lastReceive = _paths[x].lastReceive; + #ifdef ZT_ENABLE_CLUSTER + _paths[y].clusterSuboptimal = _paths[x].clusterSuboptimal; + #endif + } + ++y; + } + ++x; + } + _numPaths = y; + } +} + +bool Peer::send(const void *data,unsigned int len,uint64_t now,bool forceEvenIfDead) +{ + Mutex::Lock _l(_paths_m); + + int bestp = -1; + uint64_t best = 0ULL; + for(unsigned int p=0;p<_numPaths;++p) { + if (_paths[p].path->alive(now)||(forceEvenIfDead)) { + const uint64_t s = _paths[p].path->score(); + if (s >= best) { + best = s; + bestp = (int)p; + } + } + } + + if (bestp >= 0) { + return _paths[bestp].path->send(RR,data,len,now); + } else { + return false; + } +} + +SharedPtr<Path> Peer::getBestPath(uint64_t now,bool forceEvenIfDead) +{ + Mutex::Lock _l(_paths_m); + + int bestp = -1; + uint64_t best = 0ULL; + for(unsigned int p=0;p<_numPaths;++p) { + if (_paths[p].path->alive(now)||(forceEvenIfDead)) { + const uint64_t s = _paths[p].path->score(); + if (s >= best) { + best = s; + bestp = (int)p; + } + } + } + + if (bestp >= 0) { + return _paths[bestp].path; + } else { + return SharedPtr<Path>(); + } +} + +void Peer::sendHELLO(const InetAddress &localAddr,const InetAddress &atAddress,uint64_t now) { Packet outp(_id.address(),RR->identity.address(),Packet::VERB_HELLO); outp.append((unsigned char)ZT_PROTO_VERSION); @@ -209,51 +289,56 @@ void Peer::sendHELLO(const InetAddress &localAddr,const InetAddress &atAddress,u atAddress.serialize(outp); outp.append((uint64_t)RR->topology->worldId()); outp.append((uint64_t)RR->topology->worldTimestamp()); - outp.armor(_key,false); // HELLO is sent in the clear - RR->node->putPacket(localAddr,atAddress,outp.data(),outp.size(),ttl); + RR->node->putPacket(localAddr,atAddress,outp.data(),outp.size()); } bool Peer::doPingAndKeepalive(uint64_t now,int inetAddressFamily) { - Path *p = (Path *)0; - - if (inetAddressFamily != 0) { - p = _getBestPath(now,inetAddressFamily); - } else { - p = _getBestPath(now); - } - - if (p) { - if ((now - p->lastReceived()) >= ZT_PEER_DIRECT_PING_DELAY) { - //TRACE("PING %s(%s) after %llums/%llums send/receive inactivity",_id.address().toString().c_str(),p->address().toString().c_str(),now - p->lastSend(),now - p->lastReceived()); - sendHELLO(p->localAddress(),p->address(),now); - p->sent(now); - p->pinged(now); - } else if ((now - std::max(p->lastSend(),p->lastKeepalive())) >= ZT_NAT_KEEPALIVE_DELAY) { - //TRACE("NAT keepalive %s(%s) after %llums/%llums send/receive inactivity",_id.address().toString().c_str(),p->address().toString().c_str(),now - p->lastSend(),now - p->lastReceived()); + bool somethingAlive = false; + Mutex::Lock _l(_paths_m); + for(unsigned int p=0;p<_numPaths;++p) { + if ((now - _paths[p].lastReceive) >= ZT_PEER_PING_PERIOD) { + sendHELLO(_paths[p].path->localAddress(),_paths[p].path->address(),now); + } else if (_paths[p].path->needsHeartbeat(now)) { _natKeepaliveBuf += (uint32_t)((now * 0x9e3779b1) >> 1); // tumble this around to send constantly varying (meaningless) payloads - RR->node->putPacket(p->localAddress(),p->address(),&_natKeepaliveBuf,sizeof(_natKeepaliveBuf)); - p->sentKeepalive(now); + _paths[p].path->send(RR,&_natKeepaliveBuf,sizeof(_natKeepaliveBuf),now); } - return true; + somethingAlive |= _paths[p].path->alive(now); } + return somethingAlive; +} +bool Peer::hasActiveDirectPath(uint64_t now) const +{ + Mutex::Lock _l(_paths_m); + for(unsigned int p=0;p<_numPaths;++p) { + if (_paths[p].path->alive(now)) + return true; + } return false; } bool Peer::resetWithinScope(InetAddress::IpScope scope,uint64_t now) { + Mutex::Lock _l(_paths_m); unsigned int np = _numPaths; unsigned int x = 0; unsigned int y = 0; while (x < np) { - if (_paths[x].address().ipScope() == scope) { + if (_paths[x].path->address().ipScope() == scope) { // Resetting a path means sending a HELLO and then forgetting it. If we // get OK(HELLO) then it will be re-learned. - sendHELLO(_paths[x].localAddress(),_paths[x].address(),now); + sendHELLO(_paths[x].path->localAddress(),_paths[x].path->address(),now); } else { - _paths[y++] = _paths[x]; + if (x != y) { + _paths[y].path = _paths[x].path; + _paths[y].lastReceive = _paths[x].lastReceive; +#ifdef ZT_ENABLE_CLUSTER + _paths[y].clusterSuboptimal = _paths[x].clusterSuboptimal; +#endif + } + ++y; } ++x; } @@ -263,114 +348,55 @@ bool Peer::resetWithinScope(InetAddress::IpScope scope,uint64_t now) void Peer::getBestActiveAddresses(uint64_t now,InetAddress &v4,InetAddress &v6) const { - uint64_t bestV4 = 0,bestV6 = 0; - for(unsigned int p=0,np=_numPaths;p<np;++p) { - if (_paths[p].active(now)) { - uint64_t lr = _paths[p].lastReceived(); - if (lr) { - if (_paths[p].address().isV4()) { - if (lr >= bestV4) { - bestV4 = lr; - v4 = _paths[p].address(); - } - } else if (_paths[p].address().isV6()) { - if (lr >= bestV6) { - bestV6 = lr; - v6 = _paths[p].address(); - } - } + Mutex::Lock _l(_paths_m); + + int bestp4 = -1,bestp6 = -1; + uint64_t best4 = 0ULL,best6 = 0ULL; + for(unsigned int p=0;p<_numPaths;++p) { + if (_paths[p].path->address().ss_family == AF_INET) { + const uint64_t s = _paths[p].path->score(); + if (s >= best4) { + best4 = s; + bestp4 = (int)p; + } + } else if (_paths[p].path->address().ss_family == AF_INET6) { + const uint64_t s = _paths[p].path->score(); + if (s >= best6) { + best6 = s; + bestp6 = (int)p; } } } + + if (bestp4 >= 0) + v4 = _paths[bestp4].path->address(); + if (bestp6 >= 0) + v6 = _paths[bestp6].path->address(); } void Peer::clean(uint64_t now) { + Mutex::Lock _l(_paths_m); unsigned int np = _numPaths; unsigned int x = 0; unsigned int y = 0; while (x < np) { - if (_paths[x].active(now)) - _paths[y++] = _paths[x]; + if ((now - _paths[x].lastReceive) <= ZT_PEER_PATH_EXPIRATION) { + if (y != x) { + _paths[y].path = _paths[x].path; + _paths[y].lastReceive = _paths[x].lastReceive; +#ifdef ZT_ENABLE_CLUSTER + _paths[y].clusterSuboptimal = _paths[x].clusterSuboptimal; +#endif + } + ++y; + } ++x; } _numPaths = y; } -void Peer::_doDeadPathDetection(Path &p,const uint64_t now) -{ - /* Dead path detection: if we have sent something to this peer and have not - * yet received a reply, double check this path. The majority of outbound - * packets including Ethernet frames do generate some kind of reply either - * immediately or at some point in the near future. This will occasionally - * (every NO_ANSWER_TIMEOUT ms) check paths unnecessarily if traffic that - * does not generate a response is being sent such as multicast announcements - * or frames belonging to unidirectional UDP protocols, but the cost is very - * tiny and the benefit in reliability is very large. This takes care of many - * failure modes including crap NATs that forget links and spurious changes - * to physical network topology that cannot be otherwise detected. - * - * Each time we do this we increment a probation counter in the path. This - * counter is reset on any packet receive over this path. If it reaches the - * MAX_PROBATION threshold the path is considred dead. */ - - if ( - (p.lastSend() > p.lastReceived()) && - ((p.lastSend() - p.lastReceived()) >= ZT_PEER_DEAD_PATH_DETECTION_NO_ANSWER_TIMEOUT) && - ((now - p.lastPing()) >= ZT_PEER_DEAD_PATH_DETECTION_NO_ANSWER_TIMEOUT) && - (!p.isClusterSuboptimal()) && - (!RR->topology->amRoot()) - ) { - TRACE("%s(%s) does not seem to be answering in a timely manner, checking if dead (probation == %u)",_id.address().toString().c_str(),p.address().toString().c_str(),p.probation()); - - if ( (_vProto >= 5) && ( !((_vMajor == 1)&&(_vMinor == 1)&&(_vRevision == 0)) ) ) { - Packet outp(_id.address(),RR->identity.address(),Packet::VERB_ECHO); - outp.armor(_key,true); - p.send(RR,outp.data(),outp.size(),now); - p.pinged(now); - } else { - sendHELLO(p.localAddress(),p.address(),now); - p.sent(now); - p.pinged(now); - } - - p.increaseProbation(); - } -} - -Path *Peer::_getBestPath(const uint64_t now) -{ - Path *bestPath = (Path *)0; - uint64_t bestPathScore = 0; - for(unsigned int i=0;i<_numPaths;++i) { - const uint64_t score = _paths[i].score(); - if ((score >= bestPathScore)&&(_paths[i].active(now))) { - bestPathScore = score; - bestPath = &(_paths[i]); - } - } - if (bestPath) - _doDeadPathDetection(*bestPath,now); - return bestPath; -} - -Path *Peer::_getBestPath(const uint64_t now,int inetAddressFamily) -{ - Path *bestPath = (Path *)0; - uint64_t bestPathScore = 0; - for(unsigned int i=0;i<_numPaths;++i) { - const uint64_t score = _paths[i].score(); - if (((int)_paths[i].address().ss_family == inetAddressFamily)&&(score >= bestPathScore)&&(_paths[i].active(now))) { - bestPathScore = score; - bestPath = &(_paths[i]); - } - } - if (bestPath) - _doDeadPathDetection(*bestPath,now); - return bestPath; -} - -bool Peer::_pushDirectPaths(const InetAddress &localAddr,const InetAddress &toAddress,uint64_t now) +bool Peer::_pushDirectPaths(const SharedPtr<Path> &path,uint64_t now) { #ifdef ZT_ENABLE_CLUSTER // Cluster mode disables normal PUSH_DIRECT_PATHS in favor of cluster-based peer redirection @@ -445,7 +471,7 @@ bool Peer::_pushDirectPaths(const InetAddress &localAddr,const InetAddress &toAd if (count) { outp.setAt(ZT_PACKET_IDX_PAYLOAD,(uint16_t)count); outp.armor(_key,true); - RR->node->putPacket(localAddr,toAddress,outp.data(),outp.size(),0); + path->send(RR,outp.data(),outp.size(),now); } } |