diff options
Diffstat (limited to 'node/Peer.cpp')
-rw-r--r-- | node/Peer.cpp | 430 |
1 files changed, 394 insertions, 36 deletions
diff --git a/node/Peer.cpp b/node/Peer.cpp index 71afd852..21bbfabe 100644 --- a/node/Peer.cpp +++ b/node/Peer.cpp @@ -25,7 +25,6 @@ */ #include "../version.h" - #include "Constants.hpp" #include "Peer.hpp" #include "Node.hpp" @@ -35,6 +34,8 @@ #include "Packet.hpp" #include "Trace.hpp" #include "InetAddress.hpp" +#include "RingBuffer.hpp" +#include "Utils.hpp" namespace ZeroTier { @@ -53,16 +54,31 @@ Peer::Peer(const RuntimeEnvironment *renv,const Identity &myIdentity,const Ident _lastCredentialsReceived(0), _lastTrustEstablishedPacketReceived(0), _lastSentFullHello(0), + _lastACKWindowReset(0), + _lastQoSWindowReset(0), + _lastMultipathCompatibilityCheck(0), + _freeRandomByte(0), + _uniqueAlivePathCount(0), + _localMultipathSupported(false), + _remoteMultipathSupported(false), + _canUseMultipath(false), _vProto(0), _vMajor(0), _vMinor(0), _vRevision(0), _id(peerIdentity), _directPathPushCutoffCount(0), - _credentialsCutoffCount(0) + _credentialsCutoffCount(0), + _linkIsBalanced(false), + _linkIsRedundant(false), + _remotePeerMultipathEnabled(false), + _lastAggregateStatsReport(0), + _lastAggregateAllocation(0) { + Utils::getSecureRandom(&_freeRandomByte, 1); if (!myIdentity.agree(peerIdentity,_key,ZT_PEER_SECRET_KEY_LENGTH)) throw ZT_EXCEPTION_INVALID_ARGUMENT; + _pathChoiceHist = new RingBuffer<int>(ZT_MULTIPATH_PROPORTION_WIN_SZ); } void Peer::received( @@ -70,6 +86,7 @@ void Peer::received( const SharedPtr<Path> &path, const unsigned int hops, const uint64_t packetId, + const unsigned int payloadLength, const Packet::Verb verb, const uint64_t inRePacketId, const Packet::Verb inReVerb, @@ -95,9 +112,25 @@ void Peer::received( path->trustedPacketReceived(now); } + { + Mutex::Lock _l(_paths_m); + + recordIncomingPacket(tPtr, path, packetId, payloadLength, verb, now); + + if (_canUseMultipath) { + if (path->needsToSendQoS(now)) { + sendQOS_MEASUREMENT(tPtr, path, path->localSocket(), path->address(), now); + } + for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { + if (_paths[i].p) { + _paths[i].p->processBackgroundPathMeasurements(now); + } + } + } + } + if (hops == 0) { // If this is a direct packet (no hops), update existing paths or learn new ones - bool havePath = false; { Mutex::Lock _l(_paths_m); @@ -116,20 +149,26 @@ void Peer::received( if ((!havePath)&&(RR->node->shouldUsePathForZeroTierTraffic(tPtr,_id.address(),path->localSocket(),path->address()))) { Mutex::Lock _l(_paths_m); - // Paths are redunant if they duplicate an alive path to the same IP or + // Paths are redundant if they duplicate an alive path to the same IP or // with the same local socket and address family. bool redundant = false; + unsigned int replacePath = ZT_MAX_PEER_NETWORK_PATHS; for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { if (_paths[i].p) { - if ( (_paths[i].p->alive(now)) && ( ((_paths[i].p->localSocket() == path->localSocket())&&(_paths[i].p->address().ss_family == path->address().ss_family)) || (_paths[i].p->address().ipsEqual2(path->address())) ) ) { + if ( (_paths[i].p->alive(now)) && ( ((_paths[i].p->localSocket() == path->localSocket())&&(_paths[i].p->address().ss_family == path->address().ss_family)) || (_paths[i].p->address().ipsEqual2(path->address())) ) ) { redundant = true; break; } + // If the path is the same address and port, simply assume this is a replacement + if ( (_paths[i].p->address().ipsEqual2(path->address()) && (_paths[i].p->address().port() == path->address().port()))) { + replacePath = i; + break; + } } else break; } - - if (!redundant) { - unsigned int replacePath = ZT_MAX_PEER_NETWORK_PATHS; + // If the path isn't a duplicate of the same localSocket AND we haven't already determined a replacePath, + // then find the worst path and replace it. + if (!redundant && replacePath == ZT_MAX_PEER_NETWORK_PATHS) { int replacePathQuality = 0; for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { if (_paths[i].p) { @@ -143,16 +182,15 @@ void Peer::received( break; } } - - if (replacePath != ZT_MAX_PEER_NETWORK_PATHS) { - if (verb == Packet::VERB_OK) { - RR->t->peerLearnedNewPath(tPtr,networkId,*this,path,packetId); - _paths[replacePath].lr = now; - _paths[replacePath].p = path; - _paths[replacePath].priority = 1; - } else { - attemptToContact = true; - } + } + if (replacePath != ZT_MAX_PEER_NETWORK_PATHS) { + if (verb == Packet::VERB_OK) { + RR->t->peerLearnedNewPath(tPtr,networkId,*this,path,packetId); + _paths[replacePath].lr = now; + _paths[replacePath].p = path; + _paths[replacePath].priority = 1; + } else { + attemptToContact = true; } } } @@ -232,29 +270,265 @@ void Peer::received( } } -SharedPtr<Path> Peer::getBestPath(int64_t now,bool includeExpired) const +void Peer::recordOutgoingPacket(const SharedPtr<Path> &path, const uint64_t packetId, + uint16_t payloadLength, const Packet::Verb verb, int64_t now) { - Mutex::Lock _l(_paths_m); + // Grab second byte from packetId to use as a source of entropy in the next path selection + _freeRandomByte = (packetId & 0xFF00) >> 8; + if (_canUseMultipath) { + path->recordOutgoingPacket(now, packetId, payloadLength, verb); + } +} +void Peer::recordIncomingPacket(void *tPtr, const SharedPtr<Path> &path, const uint64_t packetId, + uint16_t payloadLength, const Packet::Verb verb, int64_t now) +{ + if (_canUseMultipath) { + if (path->needsToSendAck(now)) { + sendACK(tPtr, path, path->localSocket(), path->address(), now); + } + path->recordIncomingPacket(now, packetId, payloadLength, verb); + } +} + +void Peer::computeAggregateProportionalAllocation(int64_t now) +{ + float maxStability = 0; + float totalRelativeQuality = 0; + float maxThroughput = 1; + float maxScope = 0; + float relStability[ZT_MAX_PEER_NETWORK_PATHS]; + float relThroughput[ZT_MAX_PEER_NETWORK_PATHS]; + memset(&relStability, 0, sizeof(relStability)); + memset(&relThroughput, 0, sizeof(relThroughput)); + // Survey all paths + for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { + if (_paths[i].p) { + relStability[i] = _paths[i].p->lastComputedStability(); + relThroughput[i] = _paths[i].p->maxLifetimeThroughput(); + maxStability = relStability[i] > maxStability ? relStability[i] : maxStability; + maxThroughput = relThroughput[i] > maxThroughput ? relThroughput[i] : maxThroughput; + maxScope = _paths[i].p->ipScope() > maxScope ? _paths[i].p->ipScope() : maxScope; + } + } + // Convert to relative values + for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { + if (_paths[i].p) { + relStability[i] /= maxStability ? maxStability : 1; + relThroughput[i] /= maxThroughput ? maxThroughput : 1; + float normalized_ma = Utils::normalize(_paths[i].p->ackAge(now), 0, ZT_PATH_MAX_AGE, 0, 10); + float age_contrib = exp((-1)*normalized_ma); + float relScope = ((float)(_paths[i].p->ipScope()+1) / (maxScope + 1)); + float relQuality = + (relStability[i] * ZT_PATH_CONTRIB_STABILITY) + + (fmax(1, relThroughput[i]) * ZT_PATH_CONTRIB_THROUGHPUT) + + relScope * ZT_PATH_CONTRIB_SCOPE; + relQuality *= age_contrib; + // Arbitrary cutoffs + relQuality = relQuality > (1.00 / 100.0) ? relQuality : 0.0; + relQuality = relQuality < (99.0 / 100.0) ? relQuality : 1.0; + totalRelativeQuality += relQuality; + _paths[i].p->updateRelativeQuality(relQuality); + } + } + // Convert set of relative performances into an allocation set + for(uint16_t i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { + if (_paths[i].p) { + _paths[i].p->updateComponentAllocationOfAggregateLink((_paths[i].p->relativeQuality() / totalRelativeQuality) * 255); + } + } +} + +int Peer::computeAggregateLinkPacketDelayVariance() +{ + float pdv = 0.0; + for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { + if (_paths[i].p) { + pdv += _paths[i].p->relativeQuality() * _paths[i].p->packetDelayVariance(); + } + } + return pdv; +} + +int Peer::computeAggregateLinkMeanLatency() +{ + int ml = 0; + int pathCount = 0; + for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { + if (_paths[i].p) { + pathCount++; + ml += _paths[i].p->relativeQuality() * _paths[i].p->meanLatency(); + } + } + return ml / pathCount; +} + +int Peer::aggregateLinkPhysicalPathCount() +{ + std::map<std::string, bool> ifnamemap; + int pathCount = 0; + int64_t now = RR->node->now(); + for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { + if (_paths[i].p && _paths[i].p->alive(now)) { + if (!ifnamemap[_paths[i].p->getName()]) { + ifnamemap[_paths[i].p->getName()] = true; + pathCount++; + } + } + } + return pathCount; +} + +int Peer::aggregateLinkLogicalPathCount() +{ + int pathCount = 0; + int64_t now = RR->node->now(); + for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { + if (_paths[i].p && _paths[i].p->alive(now)) { + pathCount++; + } + } + return pathCount; +} + +SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired) +{ + Mutex::Lock _l(_paths_m); unsigned int bestPath = ZT_MAX_PEER_NETWORK_PATHS; - long bestPathQuality = 2147483647; + + /** + * Send traffic across the highest quality path only. This algorithm will still + * use the old path quality metric from protocol version 9. + */ + if (!_canUseMultipath) { + long bestPathQuality = 2147483647; + for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { + if (_paths[i].p) { + if ((includeExpired)||((now - _paths[i].lr) < ZT_PEER_PATH_EXPIRATION)) { + const long q = _paths[i].p->quality(now) / _paths[i].priority; + if (q <= bestPathQuality) { + bestPathQuality = q; + bestPath = i; + } + } + } else break; + } + if (bestPath != ZT_MAX_PEER_NETWORK_PATHS) { + return _paths[bestPath].p; + } + return SharedPtr<Path>(); + } + for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { if (_paths[i].p) { - if ((includeExpired)||((now - _paths[i].lr) < ZT_PEER_PATH_EXPIRATION)) { - const long q = _paths[i].p->quality(now) / _paths[i].priority; - if (q <= bestPathQuality) { - bestPathQuality = q; - bestPath = i; + _paths[i].p->processBackgroundPathMeasurements(now); + } + } + + /** + * Randomly distribute traffic across all paths + */ + int numAlivePaths = 0; + int numStalePaths = 0; + if (RR->node->getMultipathMode() == ZT_MULTIPATH_RANDOM) { + int alivePaths[ZT_MAX_PEER_NETWORK_PATHS]; + int stalePaths[ZT_MAX_PEER_NETWORK_PATHS]; + memset(&alivePaths, -1, sizeof(alivePaths)); + memset(&stalePaths, -1, sizeof(stalePaths)); + for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { + if (_paths[i].p) { + if (_paths[i].p->alive(now)) { + alivePaths[numAlivePaths] = i; + numAlivePaths++; + } + else { + stalePaths[numStalePaths] = i; + numStalePaths++; } } - } else break; + } + unsigned int r = _freeRandomByte; + if (numAlivePaths > 0) { + int rf = r % numAlivePaths; + return _paths[alivePaths[rf]].p; + } + else if(numStalePaths > 0) { + // Resort to trying any non-expired path + int rf = r % numStalePaths; + return _paths[stalePaths[rf]].p; + } } - if (bestPath != ZT_MAX_PEER_NETWORK_PATHS) - return _paths[bestPath].p; + /** + * Proportionally allocate traffic according to dynamic path quality measurements + */ + if (RR->node->getMultipathMode() == ZT_MULTIPATH_PROPORTIONALLY_BALANCED) { + if ((now - _lastAggregateAllocation) >= ZT_PATH_QUALITY_COMPUTE_INTERVAL) { + _lastAggregateAllocation = now; + computeAggregateProportionalAllocation(now); + } + // Randomly choose path according to their allocations + float rf = _freeRandomByte; + for(int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { + if (_paths[i].p) { + if (rf < _paths[i].p->allocation()) { + bestPath = i; + _pathChoiceHist->push(bestPath); // Record which path we chose + break; + } + rf -= _paths[i].p->allocation(); + } + } + if (bestPath < ZT_MAX_PEER_NETWORK_PATHS) { + return _paths[bestPath].p; + } + } return SharedPtr<Path>(); } +char *Peer::interfaceListStr() +{ + std::map<std::string, int> ifnamemap; + char tmp[32]; + const int64_t now = RR->node->now(); + char *ptr = _interfaceListStr; + bool imbalanced = false; + memset(_interfaceListStr, 0, sizeof(_interfaceListStr)); + int alivePathCount = aggregateLinkLogicalPathCount(); + for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { + if (_paths[i].p && _paths[i].p->alive(now)) { + int ipv = _paths[i].p->address().isV4(); + // If this is acting as an aggregate link, check allocations + float targetAllocation = 1.0 / alivePathCount; + float currentAllocation = 1.0; + if (alivePathCount > 1) { + currentAllocation = (float)_pathChoiceHist->countValue(i) / (float)_pathChoiceHist->count(); + if (fabs(targetAllocation - currentAllocation) > ZT_PATH_IMBALANCE_THRESHOLD) { + imbalanced = true; + } + } + char *ipvStr = ipv ? (char*)"ipv4" : (char*)"ipv6"; + sprintf(tmp, "(%s, %s, %.3f)", _paths[i].p->getName(), ipvStr, currentAllocation); + // Prevent duplicates + if(ifnamemap[_paths[i].p->getName()] != ipv) { + memcpy(ptr, tmp, strlen(tmp)); + ptr += strlen(tmp); + *ptr = ' '; + ptr++; + ifnamemap[_paths[i].p->getName()] = ipv; + } + } + } + ptr--; // Overwrite trailing space + if (imbalanced) { + sprintf(tmp, ", is asymmetrical"); + memcpy(ptr, tmp, sizeof(tmp)); + } else { + *ptr = '\0'; + } + return _interfaceListStr; +} + void Peer::introduce(void *const tPtr,const int64_t now,const SharedPtr<Peer> &other) const { unsigned int myBestV4ByScope[ZT_INETADDRESS_MAX_SCOPE+1]; @@ -376,6 +650,71 @@ void Peer::introduce(void *const tPtr,const int64_t now,const SharedPtr<Peer> &o } } +inline void Peer::processBackgroundPeerTasks(int64_t now) +{ + // Determine current multipath compatibility with other peer + if ((now - _lastMultipathCompatibilityCheck) >= ZT_PATH_QUALITY_COMPUTE_INTERVAL) { + // Cache number of available paths so that we can short-circuit multipath logic elsewhere + // + // We also take notice of duplicate paths (same IP only) because we may have + // recently received a direct path push from a peer and our list might contain + // a dead path which hasn't been fully recognized as such. In this case we + // don't want the duplicate to trigger execution of multipath code prematurely. + // + // This is done to support the behavior of auto multipath enable/disable + // without user intervention. + int currAlivePathCount = 0; + int duplicatePathsFound = 0; + for (unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { + if (_paths[i].p) { + currAlivePathCount++; + for (unsigned int j=0;j<ZT_MAX_PEER_NETWORK_PATHS;++j) { + if (_paths[i].p && _paths[j].p && _paths[i].p->address().ipsEqual2(_paths[j].p->address()) && i != j) { + duplicatePathsFound+=1; + break; + } + } + } + } + _uniqueAlivePathCount = (currAlivePathCount - (duplicatePathsFound / 2)); + _lastMultipathCompatibilityCheck = now; + _localMultipathSupported = ((RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) && (ZT_PROTO_VERSION > 9)); + _remoteMultipathSupported = _vProto > 9; + // If both peers support multipath and more than one path exist, we can use multipath logic + _canUseMultipath = _localMultipathSupported && _remoteMultipathSupported && (_uniqueAlivePathCount > 1); + } +} + +void Peer::sendACK(void *tPtr,const SharedPtr<Path> &path,const int64_t localSocket,const InetAddress &atAddress,int64_t now) +{ + Packet outp(_id.address(),RR->identity.address(),Packet::VERB_ACK); + uint32_t bytesToAck = path->bytesToAck(); + outp.append<uint32_t>(bytesToAck); + if (atAddress) { + outp.armor(_key,false); + RR->node->putPacket(tPtr,localSocket,atAddress,outp.data(),outp.size()); + } else { + RR->sw->send(tPtr,outp,false); + } + path->sentAck(now); +} + +void Peer::sendQOS_MEASUREMENT(void *tPtr,const SharedPtr<Path> &path,const int64_t localSocket,const InetAddress &atAddress,int64_t now) +{ + const int64_t _now = RR->node->now(); + Packet outp(_id.address(),RR->identity.address(),Packet::VERB_QOS_MEASUREMENT); + char qosData[ZT_PATH_MAX_QOS_PACKET_SZ]; + int16_t len = path->generateQoSPacket(_now,qosData); + outp.append(qosData,len); + if (atAddress) { + outp.armor(_key,false); + RR->node->putPacket(tPtr,localSocket,atAddress,outp.data(),outp.size()); + } else { + RR->sw->send(tPtr,outp,false); + } + path->sentQoS(now); +} + void Peer::sendHELLO(void *tPtr,const int64_t localSocket,const InetAddress &atAddress,int64_t now) { Packet outp(_id.address(),RR->identity.address(),Packet::VERB_HELLO); @@ -444,12 +783,30 @@ void Peer::tryMemorizedPath(void *tPtr,int64_t now) unsigned int Peer::doPingAndKeepalive(void *tPtr,int64_t now) { unsigned int sent = 0; - Mutex::Lock _l(_paths_m); const bool sendFullHello = ((now - _lastSentFullHello) >= ZT_PEER_PING_PERIOD); _lastSentFullHello = now; + processBackgroundPeerTasks(now); + + // Emit traces regarding aggregate link status + if (_canUseMultipath) { + int alivePathCount = aggregateLinkPhysicalPathCount(); + if ((now - _lastAggregateStatsReport) > ZT_PATH_AGGREGATE_STATS_REPORT_INTERVAL) { + _lastAggregateStatsReport = now; + if (alivePathCount) { + RR->t->peerLinkAggregateStatistics(NULL,*this); + } + } if (alivePathCount < 2 && _linkIsRedundant) { + _linkIsRedundant = !_linkIsRedundant; + RR->t->peerLinkNoLongerRedundant(NULL,*this); + } if (alivePathCount > 1 && !_linkIsRedundant) { + _linkIsRedundant = !_linkIsRedundant; + RR->t->peerLinkNowRedundant(NULL,*this); + } + } + // Right now we only keep pinging links that have the maximum priority. The // priority is used to track cluster redirections, meaning that when a cluster // redirects us its redirect target links override all other links and we @@ -477,13 +834,14 @@ unsigned int Peer::doPingAndKeepalive(void *tPtr,int64_t now) } } else break; } - while(j < ZT_MAX_PEER_NETWORK_PATHS) { - _paths[j].lr = 0; - _paths[j].p.zero(); - _paths[j].priority = 1; - ++j; + if (canUseMultipath()) { + while(j < ZT_MAX_PEER_NETWORK_PATHS) { + _paths[j].lr = 0; + _paths[j].p.zero(); + _paths[j].priority = 1; + ++j; + } } - return sent; } |