diff options
Diffstat (limited to 'node/Peer.cpp')
-rw-r--r-- | node/Peer.cpp | 494 |
1 files changed, 418 insertions, 76 deletions
diff --git a/node/Peer.cpp b/node/Peer.cpp index 71afd852..5f5d1462 100644 --- a/node/Peer.cpp +++ b/node/Peer.cpp @@ -1,6 +1,6 @@ /* * ZeroTier One - Network Virtualization Everywhere - * Copyright (C) 2011-2018 ZeroTier, Inc. https://www.zerotier.com/ + * Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/ * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -13,7 +13,7 @@ * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. + * along with this program. If not, see <http://www.gnu.org/licenses/>. * * -- * @@ -25,7 +25,6 @@ */ #include "../version.h" - #include "Constants.hpp" #include "Peer.hpp" #include "Node.hpp" @@ -35,9 +34,13 @@ #include "Packet.hpp" #include "Trace.hpp" #include "InetAddress.hpp" +#include "RingBuffer.hpp" +#include "Utils.hpp" namespace ZeroTier { +static unsigned char s_freeRandomByteCounter = 0; + Peer::Peer(const RuntimeEnvironment *renv,const Identity &myIdentity,const Identity &peerIdentity) : RR(renv), _lastReceive(0), @@ -48,18 +51,29 @@ Peer::Peer(const RuntimeEnvironment *renv,const Identity &myIdentity,const Ident _lastCredentialRequestSent(0), _lastWhoisRequestReceived(0), _lastEchoRequestReceived(0), - _lastComRequestReceived(0), - _lastComRequestSent(0), _lastCredentialsReceived(0), _lastTrustEstablishedPacketReceived(0), _lastSentFullHello(0), + _lastACKWindowReset(0), + _lastQoSWindowReset(0), + _lastMultipathCompatibilityCheck(0), + _freeRandomByte((unsigned char)((uintptr_t)this >> 4) ^ ++s_freeRandomByteCounter), + _uniqueAlivePathCount(0), + _localMultipathSupported(false), + _remoteMultipathSupported(false), + _canUseMultipath(false), _vProto(0), _vMajor(0), _vMinor(0), _vRevision(0), _id(peerIdentity), _directPathPushCutoffCount(0), - _credentialsCutoffCount(0) + _credentialsCutoffCount(0), + _linkIsBalanced(false), + _linkIsRedundant(false), + _remotePeerMultipathEnabled(false), + _lastAggregateStatsReport(0), + _lastAggregateAllocation(0) { if (!myIdentity.agree(peerIdentity,_key,ZT_PEER_SECRET_KEY_LENGTH)) throw ZT_EXCEPTION_INVALID_ARGUMENT; @@ -70,6 +84,7 @@ void Peer::received( const SharedPtr<Path> &path, const unsigned int hops, const uint64_t packetId, + const unsigned int payloadLength, const Packet::Verb verb, const uint64_t inRePacketId, const Packet::Verb inReVerb, @@ -87,7 +102,8 @@ void Peer::received( case Packet::VERB_MULTICAST_FRAME: _lastNontrivialReceive = now; break; - default: break; + default: + break; } if (trustEstablished) { @@ -95,9 +111,25 @@ void Peer::received( path->trustedPacketReceived(now); } + { + Mutex::Lock _l(_paths_m); + + recordIncomingPacket(tPtr, path, packetId, payloadLength, verb, now); + + if (_canUseMultipath) { + if (path->needsToSendQoS(now)) { + sendQOS_MEASUREMENT(tPtr, path, path->localSocket(), path->address(), now); + } + for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { + if (_paths[i].p) { + _paths[i].p->processBackgroundPathMeasurements(now); + } + } + } + } + if (hops == 0) { // If this is a direct packet (no hops), update existing paths or learn new ones - bool havePath = false; { Mutex::Lock _l(_paths_m); @@ -116,20 +148,27 @@ void Peer::received( if ((!havePath)&&(RR->node->shouldUsePathForZeroTierTraffic(tPtr,_id.address(),path->localSocket(),path->address()))) { Mutex::Lock _l(_paths_m); - // Paths are redunant if they duplicate an alive path to the same IP or + // Paths are redundant if they duplicate an alive path to the same IP or // with the same local socket and address family. bool redundant = false; + unsigned int replacePath = ZT_MAX_PEER_NETWORK_PATHS; for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { if (_paths[i].p) { - if ( (_paths[i].p->alive(now)) && ( ((_paths[i].p->localSocket() == path->localSocket())&&(_paths[i].p->address().ss_family == path->address().ss_family)) || (_paths[i].p->address().ipsEqual2(path->address())) ) ) { + if ( (_paths[i].p->alive(now)) && ( ((_paths[i].p->localSocket() == path->localSocket())&&(_paths[i].p->address().ss_family == path->address().ss_family)) || (_paths[i].p->address().ipsEqual2(path->address())) ) ) { redundant = true; break; } + // If the path is the same address and port, simply assume this is a replacement + if ( (_paths[i].p->address().ipsEqual2(path->address()))) { + replacePath = i; + break; + } } else break; } - if (!redundant) { - unsigned int replacePath = ZT_MAX_PEER_NETWORK_PATHS; + // If the path isn't a duplicate of the same localSocket AND we haven't already determined a replacePath, + // then find the worst path and replace it. + if (!redundant && replacePath == ZT_MAX_PEER_NETWORK_PATHS) { int replacePathQuality = 0; for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { if (_paths[i].p) { @@ -143,16 +182,16 @@ void Peer::received( break; } } + } - if (replacePath != ZT_MAX_PEER_NETWORK_PATHS) { - if (verb == Packet::VERB_OK) { - RR->t->peerLearnedNewPath(tPtr,networkId,*this,path,packetId); - _paths[replacePath].lr = now; - _paths[replacePath].p = path; - _paths[replacePath].priority = 1; - } else { - attemptToContact = true; - } + if (replacePath != ZT_MAX_PEER_NETWORK_PATHS) { + if (verb == Packet::VERB_OK) { + RR->t->peerLearnedNewPath(tPtr,networkId,*this,path,packetId); + _paths[replacePath].lr = now; + _paths[replacePath].p = path; + _paths[replacePath].priority = 1; + } else { + attemptToContact = true; } } } @@ -165,39 +204,20 @@ void Peer::received( } // If we have a trust relationship periodically push a message enumerating - // all known external addresses for ourselves. We now do this even if we - // have a current path since we'll want to use new ones too. + // all known external addresses for ourselves. If we already have a path this + // is done less frequently. if (this->trustEstablished(now)) { - if ((now - _lastDirectPathPushSent) >= ZT_DIRECT_PATH_PUSH_INTERVAL) { + const int64_t sinceLastPush = now - _lastDirectPathPushSent; + if (sinceLastPush >= ((hops == 0) ? ZT_DIRECT_PATH_PUSH_INTERVAL_HAVEPATH : ZT_DIRECT_PATH_PUSH_INTERVAL)) { _lastDirectPathPushSent = now; - - std::vector<InetAddress> pathsToPush; - - std::vector<InetAddress> dps(RR->node->directPaths()); - for(std::vector<InetAddress>::const_iterator i(dps.begin());i!=dps.end();++i) - pathsToPush.push_back(*i); - - // Do symmetric NAT prediction if we are communicating indirectly. - if (hops > 0) { - std::vector<InetAddress> sym(RR->sa->getSymmetricNatPredictions()); - for(unsigned long i=0,added=0;i<sym.size();++i) { - InetAddress tmp(sym[(unsigned long)RR->node->prng() % sym.size()]); - if (std::find(pathsToPush.begin(),pathsToPush.end(),tmp) == pathsToPush.end()) { - pathsToPush.push_back(tmp); - if (++added >= ZT_PUSH_DIRECT_PATHS_MAX_PER_SCOPE_AND_FAMILY) - break; - } - } - } - + std::vector<InetAddress> pathsToPush(RR->node->directPaths()); if (pathsToPush.size() > 0) { std::vector<InetAddress>::const_iterator p(pathsToPush.begin()); while (p != pathsToPush.end()) { - Packet outp(_id.address(),RR->identity.address(),Packet::VERB_PUSH_DIRECT_PATHS); - outp.addSize(2); // leave room for count - + Packet *const outp = new Packet(_id.address(),RR->identity.address(),Packet::VERB_PUSH_DIRECT_PATHS); + outp->addSize(2); // leave room for count unsigned int count = 0; - while ((p != pathsToPush.end())&&((outp.size() + 24) < 1200)) { + while ((p != pathsToPush.end())&&((outp->size() + 24) < 1200)) { uint8_t addressType = 4; switch(p->ss_family) { case AF_INET: @@ -210,51 +230,287 @@ void Peer::received( continue; } - outp.append((uint8_t)0); // no flags - outp.append((uint16_t)0); // no extensions - outp.append(addressType); - outp.append((uint8_t)((addressType == 4) ? 6 : 18)); - outp.append(p->rawIpData(),((addressType == 4) ? 4 : 16)); - outp.append((uint16_t)p->port()); + outp->append((uint8_t)0); // no flags + outp->append((uint16_t)0); // no extensions + outp->append(addressType); + outp->append((uint8_t)((addressType == 4) ? 6 : 18)); + outp->append(p->rawIpData(),((addressType == 4) ? 4 : 16)); + outp->append((uint16_t)p->port()); ++count; ++p; } - if (count) { - outp.setAt(ZT_PACKET_IDX_PAYLOAD,(uint16_t)count); - outp.armor(_key,true); - path->send(RR,tPtr,outp.data(),outp.size(),now); + outp->setAt(ZT_PACKET_IDX_PAYLOAD,(uint16_t)count); + outp->compress(); + outp->armor(_key,true); + path->send(RR,tPtr,outp->data(),outp->size(),now); } + delete outp; } } } } } -SharedPtr<Path> Peer::getBestPath(int64_t now,bool includeExpired) const +void Peer::recordOutgoingPacket(const SharedPtr<Path> &path, const uint64_t packetId, + uint16_t payloadLength, const Packet::Verb verb, int64_t now) { - Mutex::Lock _l(_paths_m); + _freeRandomByte += (unsigned char)(packetId >> 8); // grab entropy to use in path selection logic for multipath + if (_canUseMultipath) { + path->recordOutgoingPacket(now, packetId, payloadLength, verb); + } +} + +void Peer::recordIncomingPacket(void *tPtr, const SharedPtr<Path> &path, const uint64_t packetId, + uint16_t payloadLength, const Packet::Verb verb, int64_t now) +{ + if (_canUseMultipath) { + if (path->needsToSendAck(now)) { + sendACK(tPtr, path, path->localSocket(), path->address(), now); + } + path->recordIncomingPacket(now, packetId, payloadLength, verb); + } +} + +void Peer::computeAggregateProportionalAllocation(int64_t now) +{ + float maxStability = 0; + float totalRelativeQuality = 0; + float maxThroughput = 1; + float maxScope = 0; + float relStability[ZT_MAX_PEER_NETWORK_PATHS]; + float relThroughput[ZT_MAX_PEER_NETWORK_PATHS]; + memset(&relStability, 0, sizeof(relStability)); + memset(&relThroughput, 0, sizeof(relThroughput)); + // Survey all paths + for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { + if (_paths[i].p) { + relStability[i] = _paths[i].p->lastComputedStability(); + relThroughput[i] = (float)_paths[i].p->maxLifetimeThroughput(); + maxStability = relStability[i] > maxStability ? relStability[i] : maxStability; + maxThroughput = relThroughput[i] > maxThroughput ? relThroughput[i] : maxThroughput; + maxScope = _paths[i].p->ipScope() > maxScope ? _paths[i].p->ipScope() : maxScope; + } + } + // Convert to relative values + for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { + if (_paths[i].p) { + relStability[i] /= maxStability ? maxStability : 1; + relThroughput[i] /= maxThroughput ? maxThroughput : 1; + float normalized_ma = Utils::normalize((float)_paths[i].p->ackAge(now), 0, ZT_PATH_MAX_AGE, 0, 10); + float age_contrib = exp((-1)*normalized_ma); + float relScope = ((float)(_paths[i].p->ipScope()+1) / (maxScope + 1)); + float relQuality = + (relStability[i] * (float)ZT_PATH_CONTRIB_STABILITY) + + (fmaxf(1.0f, relThroughput[i]) * (float)ZT_PATH_CONTRIB_THROUGHPUT) + + relScope * (float)ZT_PATH_CONTRIB_SCOPE; + relQuality *= age_contrib; + // Arbitrary cutoffs + relQuality = relQuality > (1.00f / 100.0f) ? relQuality : 0.0f; + relQuality = relQuality < (99.0f / 100.0f) ? relQuality : 1.0f; + totalRelativeQuality += relQuality; + _paths[i].p->updateRelativeQuality(relQuality); + } + } + // Convert set of relative performances into an allocation set + for(uint16_t i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { + if (_paths[i].p) { + _paths[i].p->updateComponentAllocationOfAggregateLink((unsigned char)((_paths[i].p->relativeQuality() / totalRelativeQuality) * 255)); + } + } +} + +int Peer::computeAggregateLinkPacketDelayVariance() +{ + float pdv = 0.0; + for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { + if (_paths[i].p) { + pdv += _paths[i].p->relativeQuality() * _paths[i].p->packetDelayVariance(); + } + } + return (int)pdv; +} + +int Peer::computeAggregateLinkMeanLatency() +{ + int ml = 0; + int pathCount = 0; + for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { + if (_paths[i].p) { + pathCount++; + ml += (int)(_paths[i].p->relativeQuality() * _paths[i].p->meanLatency()); + } + } + return ml / pathCount; +} + +int Peer::aggregateLinkPhysicalPathCount() +{ + std::map<std::string, bool> ifnamemap; + int pathCount = 0; + int64_t now = RR->node->now(); + for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { + if (_paths[i].p && _paths[i].p->alive(now)) { + if (!ifnamemap[_paths[i].p->getName()]) { + ifnamemap[_paths[i].p->getName()] = true; + pathCount++; + } + } + } + return pathCount; +} + +int Peer::aggregateLinkLogicalPathCount() +{ + int pathCount = 0; + int64_t now = RR->node->now(); + for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { + if (_paths[i].p && _paths[i].p->alive(now)) { + pathCount++; + } + } + return pathCount; +} +SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired) +{ + Mutex::Lock _l(_paths_m); unsigned int bestPath = ZT_MAX_PEER_NETWORK_PATHS; - long bestPathQuality = 2147483647; + + /** + * Send traffic across the highest quality path only. This algorithm will still + * use the old path quality metric from protocol version 9. + */ + if (!_canUseMultipath) { + long bestPathQuality = 2147483647; + for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { + if (_paths[i].p) { + if ((includeExpired)||((now - _paths[i].lr) < ZT_PEER_PATH_EXPIRATION)) { + const long q = _paths[i].p->quality(now) / _paths[i].priority; + if (q <= bestPathQuality) { + bestPathQuality = q; + bestPath = i; + } + } + } else break; + } + if (bestPath != ZT_MAX_PEER_NETWORK_PATHS) { + return _paths[bestPath].p; + } + return SharedPtr<Path>(); + } + for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { if (_paths[i].p) { - if ((includeExpired)||((now - _paths[i].lr) < ZT_PEER_PATH_EXPIRATION)) { - const long q = _paths[i].p->quality(now) / _paths[i].priority; - if (q <= bestPathQuality) { - bestPathQuality = q; - bestPath = i; + _paths[i].p->processBackgroundPathMeasurements(now); + } + } + + /** + * Randomly distribute traffic across all paths + */ + int numAlivePaths = 0; + int numStalePaths = 0; + if (RR->node->getMultipathMode() == ZT_MULTIPATH_RANDOM) { + int alivePaths[ZT_MAX_PEER_NETWORK_PATHS]; + int stalePaths[ZT_MAX_PEER_NETWORK_PATHS]; + memset(&alivePaths, -1, sizeof(alivePaths)); + memset(&stalePaths, -1, sizeof(stalePaths)); + for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { + if (_paths[i].p) { + if (_paths[i].p->alive(now)) { + alivePaths[numAlivePaths] = i; + numAlivePaths++; + } + else { + stalePaths[numStalePaths] = i; + numStalePaths++; } } - } else break; + } + unsigned int r = _freeRandomByte; + if (numAlivePaths > 0) { + int rf = r % numAlivePaths; + return _paths[alivePaths[rf]].p; + } + else if(numStalePaths > 0) { + // Resort to trying any non-expired path + int rf = r % numStalePaths; + return _paths[stalePaths[rf]].p; + } } - if (bestPath != ZT_MAX_PEER_NETWORK_PATHS) - return _paths[bestPath].p; + /** + * Proportionally allocate traffic according to dynamic path quality measurements + */ + if (RR->node->getMultipathMode() == ZT_MULTIPATH_PROPORTIONALLY_BALANCED) { + if ((now - _lastAggregateAllocation) >= ZT_PATH_QUALITY_COMPUTE_INTERVAL) { + _lastAggregateAllocation = now; + computeAggregateProportionalAllocation(now); + } + // Randomly choose path according to their allocations + float rf = _freeRandomByte; + for(int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { + if (_paths[i].p) { + if (rf < _paths[i].p->allocation()) { + bestPath = i; + _pathChoiceHist.push(bestPath); // Record which path we chose + break; + } + rf -= _paths[i].p->allocation(); + } + } + if (bestPath < ZT_MAX_PEER_NETWORK_PATHS) { + return _paths[bestPath].p; + } + } return SharedPtr<Path>(); } +char *Peer::interfaceListStr() +{ + std::map<std::string, int> ifnamemap; + char tmp[32]; + const int64_t now = RR->node->now(); + char *ptr = _interfaceListStr; + bool imbalanced = false; + memset(_interfaceListStr, 0, sizeof(_interfaceListStr)); + int alivePathCount = aggregateLinkLogicalPathCount(); + for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { + if (_paths[i].p && _paths[i].p->alive(now)) { + int ipv = _paths[i].p->address().isV4(); + // If this is acting as an aggregate link, check allocations + float targetAllocation = 1.0f / (float)alivePathCount; + float currentAllocation = 1.0f; + if (alivePathCount > 1) { + currentAllocation = (float)_pathChoiceHist.countValue(i) / (float)_pathChoiceHist.count(); + if (fabs(targetAllocation - currentAllocation) > ZT_PATH_IMBALANCE_THRESHOLD) { + imbalanced = true; + } + } + char *ipvStr = ipv ? (char*)"ipv4" : (char*)"ipv6"; + sprintf(tmp, "(%s, %s, %.3f)", _paths[i].p->getName(), ipvStr, currentAllocation); + // Prevent duplicates + if(ifnamemap[_paths[i].p->getName()] != ipv) { + memcpy(ptr, tmp, strlen(tmp)); + ptr += strlen(tmp); + *ptr = ' '; + ptr++; + ifnamemap[_paths[i].p->getName()] = ipv; + } + } + } + ptr--; // Overwrite trailing space + if (imbalanced) { + sprintf(tmp, ", is asymmetrical"); + memcpy(ptr, tmp, sizeof(tmp)); + } else { + *ptr = '\0'; + } + return _interfaceListStr; +} + void Peer::introduce(void *const tPtr,const int64_t now,const SharedPtr<Peer> &other) const { unsigned int myBestV4ByScope[ZT_INETADDRESS_MAX_SCOPE+1]; @@ -376,6 +632,73 @@ void Peer::introduce(void *const tPtr,const int64_t now,const SharedPtr<Peer> &o } } +inline void Peer::processBackgroundPeerTasks(const int64_t now) +{ + // Determine current multipath compatibility with other peer + if ((now - _lastMultipathCompatibilityCheck) >= ZT_PATH_QUALITY_COMPUTE_INTERVAL) { + // + // Cache number of available paths so that we can short-circuit multipath logic elsewhere + // + // We also take notice of duplicate paths (same IP only) because we may have + // recently received a direct path push from a peer and our list might contain + // a dead path which hasn't been fully recognized as such. In this case we + // don't want the duplicate to trigger execution of multipath code prematurely. + // + // This is done to support the behavior of auto multipath enable/disable + // without user intervention. + // + int currAlivePathCount = 0; + int duplicatePathsFound = 0; + for (unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { + if (_paths[i].p) { + currAlivePathCount++; + for (unsigned int j=0;j<ZT_MAX_PEER_NETWORK_PATHS;++j) { + if (_paths[i].p && _paths[j].p && _paths[i].p->address().ipsEqual2(_paths[j].p->address()) && i != j) { + duplicatePathsFound+=1; + break; + } + } + } + } + _uniqueAlivePathCount = (currAlivePathCount - (duplicatePathsFound / 2)); + _lastMultipathCompatibilityCheck = now; + _localMultipathSupported = ((RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) && (ZT_PROTO_VERSION > 9)); + _remoteMultipathSupported = _vProto > 9; + // If both peers support multipath and more than one path exist, we can use multipath logic + _canUseMultipath = _localMultipathSupported && _remoteMultipathSupported && (_uniqueAlivePathCount > 1); + } +} + +void Peer::sendACK(void *tPtr,const SharedPtr<Path> &path,const int64_t localSocket,const InetAddress &atAddress,int64_t now) +{ + Packet outp(_id.address(),RR->identity.address(),Packet::VERB_ACK); + uint32_t bytesToAck = path->bytesToAck(); + outp.append<uint32_t>(bytesToAck); + if (atAddress) { + outp.armor(_key,false); + RR->node->putPacket(tPtr,localSocket,atAddress,outp.data(),outp.size()); + } else { + RR->sw->send(tPtr,outp,false); + } + path->sentAck(now); +} + +void Peer::sendQOS_MEASUREMENT(void *tPtr,const SharedPtr<Path> &path,const int64_t localSocket,const InetAddress &atAddress,int64_t now) +{ + const int64_t _now = RR->node->now(); + Packet outp(_id.address(),RR->identity.address(),Packet::VERB_QOS_MEASUREMENT); + char qosData[ZT_PATH_MAX_QOS_PACKET_SZ]; + int16_t len = path->generateQoSPacket(_now,qosData); + outp.append(qosData,len); + if (atAddress) { + outp.armor(_key,false); + RR->node->putPacket(tPtr,localSocket,atAddress,outp.data(),outp.size()); + } else { + RR->sw->send(tPtr,outp,false); + } + path->sentQoS(now); +} + void Peer::sendHELLO(void *tPtr,const int64_t localSocket,const InetAddress &atAddress,int64_t now) { Packet outp(_id.address(),RR->identity.address(),Packet::VERB_HELLO); @@ -444,12 +767,30 @@ void Peer::tryMemorizedPath(void *tPtr,int64_t now) unsigned int Peer::doPingAndKeepalive(void *tPtr,int64_t now) { unsigned int sent = 0; - Mutex::Lock _l(_paths_m); const bool sendFullHello = ((now - _lastSentFullHello) >= ZT_PEER_PING_PERIOD); _lastSentFullHello = now; + processBackgroundPeerTasks(now); + + // Emit traces regarding aggregate link status + if (_canUseMultipath) { + int alivePathCount = aggregateLinkPhysicalPathCount(); + if ((now - _lastAggregateStatsReport) > ZT_PATH_AGGREGATE_STATS_REPORT_INTERVAL) { + _lastAggregateStatsReport = now; + if (alivePathCount) { + RR->t->peerLinkAggregateStatistics(NULL,*this); + } + } if (alivePathCount < 2 && _linkIsRedundant) { + _linkIsRedundant = !_linkIsRedundant; + RR->t->peerLinkNoLongerRedundant(NULL,*this); + } if (alivePathCount > 1 && !_linkIsRedundant) { + _linkIsRedundant = !_linkIsRedundant; + RR->t->peerLinkNowRedundant(NULL,*this); + } + } + // Right now we only keep pinging links that have the maximum priority. The // priority is used to track cluster redirections, meaning that when a cluster // redirects us its redirect target links override all other links and we @@ -477,13 +818,14 @@ unsigned int Peer::doPingAndKeepalive(void *tPtr,int64_t now) } } else break; } - while(j < ZT_MAX_PEER_NETWORK_PATHS) { - _paths[j].lr = 0; - _paths[j].p.zero(); - _paths[j].priority = 1; - ++j; + if (canUseMultipath()) { + while(j < ZT_MAX_PEER_NETWORK_PATHS) { + _paths[j].lr = 0; + _paths[j].p.zero(); + _paths[j].priority = 1; + ++j; + } } - return sent; } |