diff options
Diffstat (limited to 'node/Peer.cpp')
-rw-r--r-- | node/Peer.cpp | 150 |
1 files changed, 80 insertions, 70 deletions
diff --git a/node/Peer.cpp b/node/Peer.cpp index 1d581ab8..21bbfabe 100644 --- a/node/Peer.cpp +++ b/node/Peer.cpp @@ -56,6 +56,12 @@ Peer::Peer(const RuntimeEnvironment *renv,const Identity &myIdentity,const Ident _lastSentFullHello(0), _lastACKWindowReset(0), _lastQoSWindowReset(0), + _lastMultipathCompatibilityCheck(0), + _freeRandomByte(0), + _uniqueAlivePathCount(0), + _localMultipathSupported(false), + _remoteMultipathSupported(false), + _canUseMultipath(false), _vProto(0), _vMajor(0), _vMinor(0), @@ -69,6 +75,7 @@ Peer::Peer(const RuntimeEnvironment *renv,const Identity &myIdentity,const Ident _lastAggregateStatsReport(0), _lastAggregateAllocation(0) { + Utils::getSecureRandom(&_freeRandomByte, 1); if (!myIdentity.agree(peerIdentity,_key,ZT_PEER_SECRET_KEY_LENGTH)) throw ZT_EXCEPTION_INVALID_ARGUMENT; _pathChoiceHist = new RingBuffer<int>(ZT_MULTIPATH_PROPORTION_WIN_SZ); @@ -110,7 +117,7 @@ void Peer::received( recordIncomingPacket(tPtr, path, packetId, payloadLength, verb, now); - if (canUseMultipath()) { + if (_canUseMultipath) { if (path->needsToSendQoS(now)) { sendQOS_MEASUREMENT(tPtr, path, path->localSocket(), path->address(), now); } @@ -145,17 +152,23 @@ void Peer::received( // Paths are redundant if they duplicate an alive path to the same IP or // with the same local socket and address family. bool redundant = false; + unsigned int replacePath = ZT_MAX_PEER_NETWORK_PATHS; for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { if (_paths[i].p) { - if ( (_paths[i].p->alive(now)) && ( ((_paths[i].p->localSocket() == path->localSocket())&&(_paths[i].p->address().ss_family == path->address().ss_family)) || (_paths[i].p->address().ipsEqual2(path->address())) ) ) { + if ( (_paths[i].p->alive(now)) && ( ((_paths[i].p->localSocket() == path->localSocket())&&(_paths[i].p->address().ss_family == path->address().ss_family)) || (_paths[i].p->address().ipsEqual2(path->address())) ) ) { redundant = true; break; } + // If the path is the same address and port, simply assume this is a replacement + if ( (_paths[i].p->address().ipsEqual2(path->address()) && (_paths[i].p->address().port() == path->address().port()))) { + replacePath = i; + break; + } } else break; } - - if (!redundant) { - unsigned int replacePath = ZT_MAX_PEER_NETWORK_PATHS; + // If the path isn't a duplicate of the same localSocket AND we haven't already determined a replacePath, + // then find the worst path and replace it. + if (!redundant && replacePath == ZT_MAX_PEER_NETWORK_PATHS) { int replacePathQuality = 0; for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { if (_paths[i].p) { @@ -169,29 +182,15 @@ void Peer::received( break; } } - - // If we find a pre-existing path with the same address, just replace it. - // If we don't find anything we can replace, just use the replacePath that we previously decided on. - if (canUseMultipath()) { - for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { - if (_paths[i].p) { - if ( _paths[i].p->address().ss_family == path->address().ss_family && _paths[i].p->address().ipsEqual2(path->address())) { - replacePath = i; - break; - } - } - } - } - - if (replacePath != ZT_MAX_PEER_NETWORK_PATHS) { - if (verb == Packet::VERB_OK) { - RR->t->peerLearnedNewPath(tPtr,networkId,*this,path,packetId); - _paths[replacePath].lr = now; - _paths[replacePath].p = path; - _paths[replacePath].priority = 1; - } else { - attemptToContact = true; - } + } + if (replacePath != ZT_MAX_PEER_NETWORK_PATHS) { + if (verb == Packet::VERB_OK) { + RR->t->peerLearnedNewPath(tPtr,networkId,*this,path,packetId); + _paths[replacePath].lr = now; + _paths[replacePath].p = path; + _paths[replacePath].priority = 1; + } else { + attemptToContact = true; } } } @@ -274,7 +273,9 @@ void Peer::received( void Peer::recordOutgoingPacket(const SharedPtr<Path> &path, const uint64_t packetId, uint16_t payloadLength, const Packet::Verb verb, int64_t now) { - if (localMultipathSupport()) { + // Grab second byte from packetId to use as a source of entropy in the next path selection + _freeRandomByte = (packetId & 0xFF00) >> 8; + if (_canUseMultipath) { path->recordOutgoingPacket(now, packetId, payloadLength, verb); } } @@ -282,7 +283,7 @@ void Peer::recordOutgoingPacket(const SharedPtr<Path> &path, const uint64_t pack void Peer::recordIncomingPacket(void *tPtr, const SharedPtr<Path> &path, const uint64_t packetId, uint16_t payloadLength, const Packet::Verb verb, int64_t now) { - if (localMultipathSupport()) { + if (_canUseMultipath) { if (path->needsToSendAck(now)) { sendACK(tPtr, path, path->localSocket(), path->address(), now); } @@ -323,6 +324,9 @@ void Peer::computeAggregateProportionalAllocation(int64_t now) + (fmax(1, relThroughput[i]) * ZT_PATH_CONTRIB_THROUGHPUT) + relScope * ZT_PATH_CONTRIB_SCOPE; relQuality *= age_contrib; + // Arbitrary cutoffs + relQuality = relQuality > (1.00 / 100.0) ? relQuality : 0.0; + relQuality = relQuality < (99.0 / 100.0) ? relQuality : 1.0; totalRelativeQuality += relQuality; _paths[i].p->updateRelativeQuality(relQuality); } @@ -330,12 +334,12 @@ void Peer::computeAggregateProportionalAllocation(int64_t now) // Convert set of relative performances into an allocation set for(uint16_t i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { if (_paths[i].p) { - _paths[i].p->updateComponentAllocationOfAggregateLink(_paths[i].p->relativeQuality() / totalRelativeQuality); + _paths[i].p->updateComponentAllocationOfAggregateLink((_paths[i].p->relativeQuality() / totalRelativeQuality) * 255); } } } -float Peer::computeAggregateLinkPacketDelayVariance() +int Peer::computeAggregateLinkPacketDelayVariance() { float pdv = 0.0; for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { @@ -346,9 +350,9 @@ float Peer::computeAggregateLinkPacketDelayVariance() return pdv; } -float Peer::computeAggregateLinkMeanLatency() +int Peer::computeAggregateLinkMeanLatency() { - float ml = 0.0; + int ml = 0; int pathCount = 0; for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { if (_paths[i].p) { @@ -396,7 +400,7 @@ SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired) * Send traffic across the highest quality path only. This algorithm will still * use the old path quality metric from protocol version 9. */ - if (!canUseMultipath()) { + if (!_canUseMultipath) { long bestPathQuality = 2147483647; for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { if (_paths[i].p) { @@ -443,15 +447,13 @@ SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired) } } } - unsigned int r; - Utils::getSecureRandom(&r, 1); + unsigned int r = _freeRandomByte; if (numAlivePaths > 0) { - // pick a random out of the set deemed "alive" int rf = r % numAlivePaths; return _paths[alivePaths[rf]].p; } else if(numStalePaths > 0) { - // resort to trying any non-expired path + // Resort to trying any non-expired path int rf = r % numStalePaths; return _paths[stalePaths[rf]].p; } @@ -461,40 +463,12 @@ SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired) * Proportionally allocate traffic according to dynamic path quality measurements */ if (RR->node->getMultipathMode() == ZT_MULTIPATH_PROPORTIONALLY_BALANCED) { - int numAlivePaths = 0; - int numStalePaths = 0; - int alivePaths[ZT_MAX_PEER_NETWORK_PATHS]; - int stalePaths[ZT_MAX_PEER_NETWORK_PATHS]; - memset(&alivePaths, -1, sizeof(alivePaths)); - memset(&stalePaths, -1, sizeof(stalePaths)); - // Attempt to find an excuse not to use the rest of this algorithm - // Alive or Stale? - for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { - if (_paths[i].p) { - if (_paths[i].p->alive(now)) { - alivePaths[numAlivePaths] = i; - numAlivePaths++; - } else { - stalePaths[numStalePaths] = i; - numStalePaths++; - } - // Record a default path to use as a short-circuit for the rest of the algorithm (if needed) - bestPath = i; - } - } if ((now - _lastAggregateAllocation) >= ZT_PATH_QUALITY_COMPUTE_INTERVAL) { _lastAggregateAllocation = now; computeAggregateProportionalAllocation(now); } - if (numAlivePaths == 0 && numStalePaths == 0) { - return SharedPtr<Path>(); - } if (numAlivePaths == 1 || numStalePaths == 1) { - return _paths[bestPath].p; - } // Randomly choose path according to their allocations - unsigned int r; - Utils::getSecureRandom(&r, 1); - float rf = (float)(r %= 100) / 100; + float rf = _freeRandomByte; for(int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { if (_paths[i].p) { if (rf < _paths[i].p->allocation()) { @@ -676,6 +650,41 @@ void Peer::introduce(void *const tPtr,const int64_t now,const SharedPtr<Peer> &o } } +inline void Peer::processBackgroundPeerTasks(int64_t now) +{ + // Determine current multipath compatibility with other peer + if ((now - _lastMultipathCompatibilityCheck) >= ZT_PATH_QUALITY_COMPUTE_INTERVAL) { + // Cache number of available paths so that we can short-circuit multipath logic elsewhere + // + // We also take notice of duplicate paths (same IP only) because we may have + // recently received a direct path push from a peer and our list might contain + // a dead path which hasn't been fully recognized as such. In this case we + // don't want the duplicate to trigger execution of multipath code prematurely. + // + // This is done to support the behavior of auto multipath enable/disable + // without user intervention. + int currAlivePathCount = 0; + int duplicatePathsFound = 0; + for (unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { + if (_paths[i].p) { + currAlivePathCount++; + for (unsigned int j=0;j<ZT_MAX_PEER_NETWORK_PATHS;++j) { + if (_paths[i].p && _paths[j].p && _paths[i].p->address().ipsEqual2(_paths[j].p->address()) && i != j) { + duplicatePathsFound+=1; + break; + } + } + } + } + _uniqueAlivePathCount = (currAlivePathCount - (duplicatePathsFound / 2)); + _lastMultipathCompatibilityCheck = now; + _localMultipathSupported = ((RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) && (ZT_PROTO_VERSION > 9)); + _remoteMultipathSupported = _vProto > 9; + // If both peers support multipath and more than one path exist, we can use multipath logic + _canUseMultipath = _localMultipathSupported && _remoteMultipathSupported && (_uniqueAlivePathCount > 1); + } +} + void Peer::sendACK(void *tPtr,const SharedPtr<Path> &path,const int64_t localSocket,const InetAddress &atAddress,int64_t now) { Packet outp(_id.address(),RR->identity.address(),Packet::VERB_ACK); @@ -774,14 +783,15 @@ void Peer::tryMemorizedPath(void *tPtr,int64_t now) unsigned int Peer::doPingAndKeepalive(void *tPtr,int64_t now) { unsigned int sent = 0; - Mutex::Lock _l(_paths_m); const bool sendFullHello = ((now - _lastSentFullHello) >= ZT_PEER_PING_PERIOD); _lastSentFullHello = now; + processBackgroundPeerTasks(now); + // Emit traces regarding aggregate link status - if (canUseMultipath()) { + if (_canUseMultipath) { int alivePathCount = aggregateLinkPhysicalPathCount(); if ((now - _lastAggregateStatsReport) > ZT_PATH_AGGREGATE_STATS_REPORT_INTERVAL) { _lastAggregateStatsReport = now; |