Diffstat (limited to 'node/Peer.cpp')
-rw-r--r--    node/Peer.cpp    428
1 files changed, 259 insertions, 169 deletions
diff --git a/node/Peer.cpp b/node/Peer.cpp
index c46ed751..8deaa362 100644
--- a/node/Peer.cpp
+++ b/node/Peer.cpp
@@ -24,8 +24,8 @@
  * of your own application.
  */
 
-#include "../version.h"
+#include "../version.h"
 #include "Constants.hpp"
 #include "Peer.hpp"
 #include "Node.hpp"
@@ -36,6 +36,7 @@
 #include "Trace.hpp"
 #include "InetAddress.hpp"
 #include "RingBuffer.hpp"
+#include "Utils.hpp"
 
 namespace ZeroTier {
 
@@ -61,13 +62,13 @@ Peer::Peer(const RuntimeEnvironment *renv,const Identity &myIdentity,const Ident
     _id(peerIdentity),
     _directPathPushCutoffCount(0),
     _credentialsCutoffCount(0),
-    _linkBalanceStatus(false),
-    _linkRedundancyStatus(false)
+    _linkIsBalanced(false),
+    _linkIsRedundant(false),
+    _lastAggregateStatsReport(0)
 {
     if (!myIdentity.agree(peerIdentity,_key,ZT_PEER_SECRET_KEY_LENGTH))
         throw ZT_EXCEPTION_INVALID_ARGUMENT;
     _pathChoiceHist = new RingBuffer<int>(ZT_MULTIPATH_PROPORTION_WIN_SZ);
-    _flowBalanceHist = new RingBuffer<float>(ZT_MULTIPATH_PROPORTION_WIN_SZ);
 }
 
 void Peer::received(
@@ -75,6 +76,7 @@ void Peer::received(
     const SharedPtr<Path> &path,
     const unsigned int hops,
     const uint64_t packetId,
+    const unsigned int payloadLength,
     const Packet::Verb verb,
     const uint64_t inRePacketId,
     const Packet::Verb inReVerb,
@@ -103,13 +105,13 @@ void Peer::received(
     {
         Mutex::Lock _l(_paths_m);
         if (RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) {
-            if ((now - _lastPathPrune) > ZT_CLOSED_PATH_PRUNING_INTERVAL) {
-                _lastPathPrune = now;
-                prunePaths();
+            recordIncomingPacket(tPtr, path, packetId, payloadLength, verb, now);
+            if (path->needsToSendQoS(now)) {
+                sendQOS_MEASUREMENT(tPtr, path, path->localSocket(), path->address(), now);
             }
             for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
                 if (_paths[i].p) {
-                    _paths[i].p->measureLink(now);
+                    _paths[i].p->processBackgroundPathMeasurements(now, _id.address().toInt());
                 }
             }
         }
@@ -117,7 +119,6 @@ void Peer::received(
 
     if (hops == 0) {
         // If this is a direct packet (no hops), update existing paths or learn new ones
-
         bool havePath = false;
         {
             Mutex::Lock _l(_paths_m);
@@ -164,6 +165,19 @@ void Peer::received(
                 }
             }
 
+            // If we find a pre-existing path with the same address, just replace it.
+            // If we don't find anything we can replace, just use the replacePath that we previously decided on.
+            if (RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) {
+                for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+                    if (_paths[i].p) {
+                        if ( _paths[i].p->address().ss_family == path->address().ss_family && _paths[i].p->address().ipsEqual2(path->address())) {
+                            replacePath = i;
+                            break;
+                        }
+                    }
+                }
+            }
+
             if (replacePath != ZT_MAX_PEER_NETWORK_PATHS) {
                 if (verb == Packet::VERB_OK) {
                     RR->t->peerLearnedNewPath(tPtr,networkId,*this,path,packetId);
@@ -252,6 +266,117 @@ void Peer::received(
     }
 }
 
+void Peer::recordOutgoingPacket(const SharedPtr<Path> &path, const uint64_t packetId,
+    uint16_t payloadLength, const Packet::Verb verb, int64_t now)
+{
+    if (RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) {
+        if (verb == Packet::VERB_FRAME || verb == Packet::VERB_EXT_FRAME) {
+            path->expectingAck(now, packetId, payloadLength);
+        }
+    }
+}
+
+void Peer::recordIncomingPacket(void *tPtr, const SharedPtr<Path> &path, const uint64_t packetId,
+    uint16_t payloadLength, const Packet::Verb verb, int64_t now)
+{
+    if (verb == Packet::VERB_FRAME || verb == Packet::VERB_EXT_FRAME) {
+        if (path->needsToSendAck(now)) {
+            sendACK(tPtr, path, path->localSocket(), path->address(), now);
+        }
+        path->recordIncomingPacket(now, packetId, payloadLength);
+    }
+}
+
+float Peer::computeAggregateLinkRelativeQuality(int64_t now)
+{
+    float maxStability = 0;
+    float totalRelativeQuality = 0;
+    float maxThroughput = 1;
+    float maxScope = 0;
+    float relStability[ZT_MAX_PEER_NETWORK_PATHS];
+    float relThroughput[ZT_MAX_PEER_NETWORK_PATHS];
+    memset(&relStability, 0, sizeof(relStability));
+    memset(&relThroughput, 0, sizeof(relThroughput));
+    // Survey all paths
+    for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+        if (_paths[i].p) {
+            relStability[i] = _paths[i].p->lastComputedStability();
+            relThroughput[i] = _paths[i].p->maxLifetimeThroughput();
+            maxStability = relStability[i] > maxStability ? relStability[i] : maxStability;
+            maxThroughput = relThroughput[i] > maxThroughput ? relThroughput[i] : maxThroughput;
+            maxScope = _paths[i].p->ipScope() > maxScope ? _paths[i].p->ipScope() : maxScope;
+        }
+    }
+    // Convert to relative values
+    for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+        if (_paths[i].p) {
+            relStability[i] /= maxStability ? maxStability : 1;
+            relThroughput[i] /= maxThroughput ? maxThroughput : 1;
+            float normalized_ma = Utils::normalize(_paths[i].p->ackAge(now), 0, ZT_PATH_MAX_AGE, 0, 10);
+            float age_contrib = exp((-1)*normalized_ma);
+            float relScope = ((float)(_paths[i].p->ipScope()+1) / (maxScope + 1));
+            float relQuality =
+                (relStability[i] * ZT_PATH_CONTRIB_STABILITY) +
+                (fmax(1, relThroughput[i]) * ZT_PATH_CONTRIB_THROUGHPUT) +
+                relScope * ZT_PATH_CONTRIB_SCOPE;
+            relQuality *= age_contrib;
+            totalRelativeQuality += relQuality;
+            _paths[i].p->updateRelativeQuality(relQuality);
+        }
+    }
+    return (float)1.0 / totalRelativeQuality; // Used later to convert relative quantities into flow allocations
+}
+
+float Peer::computeAggregateLinkPacketDelayVariance()
+{
+    float pdv = 0.0;
+    for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+        if (_paths[i].p) {
+            pdv += _paths[i].p->relativeQuality() * _paths[i].p->packetDelayVariance();
+        }
+    }
+    return pdv;
+}
+
+float Peer::computeAggregateLinkMeanLatency()
+{
+    float ml = 0.0;
+    for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+        if (_paths[i].p) {
+            ml += _paths[i].p->relativeQuality() * _paths[i].p->meanLatency();
+        }
+    }
+    return ml;
+}
+
+int Peer::aggregateLinkPhysicalPathCount()
+{
+    std::map<std::string, bool> ifnamemap;
+    int pathCount = 0;
+    int64_t now = RR->node->now();
+    for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+        if (_paths[i].p && _paths[i].p->alive(now)) {
+            if (!ifnamemap[_paths[i].p->getName()]) {
+                ifnamemap[_paths[i].p->getName()] = true;
+                pathCount++;
+            }
+        }
+    }
+    return pathCount;
+}
+
+int Peer::aggregateLinkLogicalPathCount()
+{
+    int pathCount = 0;
+    int64_t now = RR->node->now();
+    for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+        if (_paths[i].p && _paths[i].p->alive(now)) {
+            pathCount++;
+        }
+    }
+    return pathCount;
+}
+
 SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired)
 {
     Mutex::Lock _l(_paths_m);
@@ -264,7 +389,7 @@ SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired)
     if (RR->node->getMultipathMode() == ZT_MULTIPATH_NONE) {
         long bestPathQuality = 2147483647;
         for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
-            if (_paths[i].p && _paths[i].p->isValidState()) {
+            if (_paths[i].p) {
                 if ((includeExpired)||((now - _paths[i].lr) < ZT_PEER_PATH_EXPIRATION)) {
                     const long q = _paths[i].p->quality(now) / _paths[i].priority;
                     if (q <= bestPathQuality) {
@@ -280,23 +405,14 @@ SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired)
         return SharedPtr<Path>();
     }
 
-    if ((now - _lastPathPrune) > ZT_CLOSED_PATH_PRUNING_INTERVAL) {
-        _lastPathPrune = now;
-        prunePaths();
-    }
     for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
         if (_paths[i].p) {
-            _paths[i].p->measureLink(now);
+            _paths[i].p->processBackgroundPathMeasurements(now, _id.address().toInt());
         }
     }
 
     /**
      * Randomly distribute traffic across all paths
-     *
-     * Behavior:
-     *  - If path DOWN: Stop randomly choosing that path
-     *  - If path UP: Start randomly choosing that path
-     *  - If all paths are unresponsive: randomly choose from all paths
      */
     int numAlivePaths = 0;
     int numStalePaths = 0;
@@ -307,15 +423,13 @@ SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired)
     memset(&stalePaths, -1, sizeof(stalePaths));
     for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
         if (_paths[i].p) {
-            if (_paths[i].p->isValidState()) {
-                if (_paths[i].p->alive(now)) {
-                    alivePaths[numAlivePaths] = i;
-                    numAlivePaths++;
-                }
-                else {
-                    stalePaths[numStalePaths] = i;
-                    numStalePaths++;
-                }
+            if (_paths[i].p->alive(now)) {
+                alivePaths[numAlivePaths] = i;
+                numAlivePaths++;
+            }
+            else {
+                stalePaths[numStalePaths] = i;
+                numStalePaths++;
             }
         }
     }
@@ -337,160 +451,104 @@ SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired)
      * Proportionally allocate traffic according to dynamic path quality measurements
      */
     if (RR->node->getMultipathMode() == ZT_MULTIPATH_PROPORTIONALLY_BALANCED) {
-        float relq[ZT_MAX_PEER_NETWORK_PATHS];
-        memset(&relq, 0, sizeof(relq));
         float alloc[ZT_MAX_PEER_NETWORK_PATHS];
         memset(&alloc, 0, sizeof(alloc));
-
-        // Survey
-        //
-        // Take a survey of all available link qualities. We use this to determine if we
-        // can skip this algorithm altogether and if not, to establish baseline for physical
-        // link quality used in later calculations.
-        //
-        // We find the min/max quality of our currently-active links so
-        // that we can form a relative scale to rank each link proportionally
-        // to each other link.
-        uint16_t alivePaths[ZT_MAX_PEER_NETWORK_PATHS];
-        uint16_t stalePaths[ZT_MAX_PEER_NETWORK_PATHS];
+        int numAlivePaths = 0;
+        int numStalePaths = 0;
+        int alivePaths[ZT_MAX_PEER_NETWORK_PATHS];
+        int stalePaths[ZT_MAX_PEER_NETWORK_PATHS];
         memset(&alivePaths, -1, sizeof(alivePaths));
         memset(&stalePaths, -1, sizeof(stalePaths));
-        uint16_t numAlivePaths = 0;
-        uint16_t numStalePaths = 0;
-        float minQuality = 10000;
-        float maxQuality = -1;
-        float currQuality;
-        for(uint16_t i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
-            if (_paths[i].p && _paths[i].p->isValidState()) {
-                if (!_paths[i].p->monitorsReady()) {
-                    // TODO: This should fix itself anyway but we should test whether forcing the use of a new path will
-                    // aid in establishing flow balance more quickly.
-                }
-                // Compute quality here, going forward we will use lastComputedQuality()
-                currQuality = _paths[i].p->computeQuality(now);
-                if (!_paths[i].p->stale(now)) {
+        // Attempt to find an excuse not to use the rest of this algorithm
+        // Alive or Stale?
+        for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+            if (_paths[i].p) {
+                if (_paths[i].p->alive(now)) {
+                    alivePaths[numAlivePaths] = i;
                     numAlivePaths++;
-                }
-                else {
+                } else {
+                    stalePaths[numStalePaths] = i;
                     numStalePaths++;
                 }
-                if (currQuality > maxQuality) {
-                    maxQuality = currQuality;
-                    bestPath = i;
-                }
-                if (currQuality < minQuality) {
-                    minQuality = currQuality;
-                }
-                relq[i] = currQuality;
+                // Record a default path to use as a short-circuit for the rest of the algorithm
+                bestPath = i;
             }
         }
-
-        // Attempt to find an excuse not to use the rest of this algorithm
-        if (bestPath == ZT_MAX_PEER_NETWORK_PATHS || (numAlivePaths == 0 && numStalePaths == 0)) {
+        if (numAlivePaths == 0 && numStalePaths == 0) {
             return SharedPtr<Path>();
-        } if (numAlivePaths == 1) {
-            //return _paths[bestPath].p;
-        } if (numStalePaths == 1) {
-            //return _paths[bestPath].p;
-        }
-
-        // Relative quality
-        //
-        // The strongest link will have a value of 1.0 whereas every other
-        // link will have a value which represents some fraction of the strongest link.
-        float totalRelativeQuality = 0;
-        for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
-            if (_paths[i].p && _paths[i].p->isValidState()) {
-                relq[i] /= maxQuality ? maxQuality : 1;
-                totalRelativeQuality += relq[i];
-            }
+        } if (numAlivePaths == 1 || numStalePaths == 1) {
+            return _paths[bestPath].p;
         }
-
-        // Convert the relative quality values into flow allocations.
-        // Additionally, determine whether each path in the flow is
-        // contributing more or less than its target allocation. If
-        // it is contributing more than required, don't allow it to be
-        // randomly selected for the next packet. If however the path
-        // needs to contribute more to the flow, we should record
-        float imbalance = 0;
-        float qualityScalingFactor = (float)1.0 / totalRelativeQuality;
+        // Compare paths to each-other
+        float qualityScalingFactor = computeAggregateLinkRelativeQuality(now);
+        // Convert set of relative performances into an allocation set
        for(uint16_t i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
-            // Out of the last N packets to this peer, how many were sent by this path?
-            int numPktSentWithinWin = (int)_pathChoiceHist->countValue(i);
-            // Compute traffic allocation for each path in the flow
-            if (_paths[i].p && _paths[i].p->isValidState()) {
-                // Allocation
-                // This is the percentage of traffic we want to send over a given path
-                alloc[i] = relq[i] * qualityScalingFactor;
-                float currProportion = numPktSentWithinWin / (float)ZT_MULTIPATH_PROPORTION_WIN_SZ;
-                float targetProportion = alloc[i];
-                float diffProportion = currProportion - targetProportion;
-                // Imbalance
-                //
-                // This is the sum of the distances of each path's currently observed flow contributions
-                // from its most recent target allocation. In other words, this is a measure of how closely we
-                // are adhering to our desired allocations. It is worth noting that this value can be greater
-                // than 1.0 if a significant change to allocations is made by the algorithm, this will
-                // eventually correct itself.
-                imbalance += fabs(diffProportion);
-                if (diffProportion < 0) {
-                    alloc[i] = targetProportion;
-                }
-                else {
-                    alloc[i] = targetProportion;
-                }
-            }
-        }
-
-        // Compute and record current flow balance
-        float balance = (float)1.0 - imbalance;
-        if (balance >= ZT_MULTIPATH_FLOW_BALANCE_THESHOLD) {
-            if (!_linkBalanceStatus) {
-                _linkBalanceStatus = true;
-                RR->t->peerLinkBalanced(NULL,0,*this);
-            }
-        }
-        else {
-            if (_linkBalanceStatus) {
-                _linkBalanceStatus = false;
-                RR->t->peerLinkImbalanced(NULL,0,*this);
+            if (_paths[i].p) {
+                alloc[i] = _paths[i].p->relativeQuality() * qualityScalingFactor;
             }
         }
-
-        // Record the current flow balance. Later used for computing a mean flow balance value.
-        _flowBalanceHist->push(balance);
-
-        // Randomly choose path from allocated candidates
+        // Randomly choose path according to their allocations
         unsigned int r;
         Utils::getSecureRandom(&r, 1);
         float rf = (float)(r %= 100) / 100;
         for(int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
-            if (_paths[i].p && _paths[i].p->isValidState() && _paths[i].p->address().isV4()) {
-                if (alloc[i] > 0 && rf < alloc[i]) {
+            if (_paths[i].p) {
+                if (rf < alloc[i]) {
                     bestPath = i;
                     _pathChoiceHist->push(bestPath); // Record which path we chose
                     break;
                 }
-                if (alloc[i] > 0) {
-                    rf -= alloc[i];
-                }
-                else {
-                    rf -= alloc[i]*-1;
-                }
+                rf -= alloc[i];
             }
         }
         if (bestPath < ZT_MAX_PEER_NETWORK_PATHS) {
             return _paths[bestPath].p;
         }
-        return SharedPtr<Path>();
     }
+    return SharedPtr<Path>();
+}
 
-    // Adhere to a user-defined interface/allocation scheme
-    if (RR->node->getMultipathMode() == ZT_MULTIPATH_MANUALLY_BALANCED) {
-        // TODO
+char *Peer::interfaceListStr()
+{
+    std::map<std::string, int> ifnamemap;
+    char tmp[32];
+    const int64_t now = RR->node->now();
+    char *ptr = _interfaceListStr;
+    bool imbalanced = false;
+    memset(_interfaceListStr, 0, sizeof(_interfaceListStr));
+    int alivePathCount = aggregateLinkLogicalPathCount();
+    for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+        if (_paths[i].p && _paths[i].p->alive(now)) {
+            int ipv = _paths[i].p->address().isV4();
+            // If this is acting as an aggregate link, check allocations
+            float targetAllocation = 1.0 / alivePathCount;
+            float currentAllocation = 1.0;
+            if (alivePathCount > 1) {
+                currentAllocation = (float)_pathChoiceHist->countValue(i) / (float)_pathChoiceHist->count();
+                if (fabs(targetAllocation - currentAllocation) > ZT_PATH_IMBALANCE_THRESHOLD) {
+                    imbalanced = true;
+                }
+            }
+            char *ipvStr = ipv ? (char*)"ipv4" : (char*)"ipv6";
+            sprintf(tmp, "(%s, %s, %5.4f)", _paths[i].p->getName(), ipvStr, currentAllocation);
+            // Prevent duplicates
+            if(ifnamemap[_paths[i].p->getName()] != ipv) {
+                memcpy(ptr, tmp, strlen(tmp));
+                ptr += strlen(tmp);
+                *ptr = ' ';
+                ptr++;
+                ifnamemap[_paths[i].p->getName()] = ipv;
+            }
+        }
     }
-
-    return SharedPtr<Path>();
+    ptr--; // Overwrite trailing space
+    if (imbalanced) {
+        sprintf(tmp, ", is IMBALANCED");
+        memcpy(ptr, tmp, sizeof(tmp));
+    } else {
+        *ptr = '\0';
+    }
+    return _interfaceListStr;
 }
 
 void Peer::introduce(void *const tPtr,const int64_t now,const SharedPtr<Peer> &other) const
@@ -614,6 +672,35 @@ void Peer::introduce(void *const tPtr,const int64_t now,const SharedPtr<Peer> &o
     }
 }
 
+void Peer::sendACK(void *tPtr,const SharedPtr<Path> &path,const int64_t localSocket,const InetAddress &atAddress,int64_t now)
+{
+    Packet outp(_id.address(),RR->identity.address(),Packet::VERB_ACK);
+    uint32_t bytesToAck = path->bytesToAck();
+    outp.append<uint32_t>(bytesToAck);
+    if (atAddress) {
+        outp.armor(_key,false);
+        RR->node->putPacket(tPtr,localSocket,atAddress,outp.data(),outp.size());
+    } else {
+        RR->sw->send(tPtr,outp,false);
+    }
+    path->sentAck(now);
+}
+
+void Peer::sendQOS_MEASUREMENT(void *tPtr,const SharedPtr<Path> &path,const int64_t localSocket,const InetAddress &atAddress,int64_t now)
+{
+    Packet outp(_id.address(),RR->identity.address(),Packet::VERB_QOS_MEASUREMENT);
+    char qosData[ZT_PATH_MAX_QOS_PACKET_SZ];
+    path->generateQoSPacket(now,qosData);
+    outp.append(qosData,sizeof(qosData));
+    if (atAddress) {
+        outp.armor(_key,false);
+        RR->node->putPacket(tPtr,localSocket,atAddress,outp.data(),outp.size());
+    } else {
+        RR->sw->send(tPtr,outp,false);
+    }
+    path->sentQoS(now);
+}
+
 void Peer::sendHELLO(void *tPtr,const int64_t localSocket,const InetAddress &atAddress,int64_t now)
 {
     Packet outp(_id.address(),RR->identity.address(),Packet::VERB_HELLO);
@@ -688,6 +775,25 @@ unsigned int Peer::doPingAndKeepalive(void *tPtr,int64_t now)
     const bool sendFullHello = ((now - _lastSentFullHello) >= ZT_PEER_PING_PERIOD);
     _lastSentFullHello = now;
 
+    // Emit traces regarding the status of aggregate links
+    if (RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) {
+        int alivePathCount = aggregateLinkPhysicalPathCount();
+        if ((now - _lastAggregateStatsReport) > ZT_PATH_AGGREGATE_STATS_REPORT_INTERVAL) {
+            _lastAggregateStatsReport = now;
+            if (alivePathCount) {
+                RR->t->peerLinkAggregateStatistics(NULL,*this);
+            }
+        }
+        // Report link redundancy
+        if (alivePathCount < 2 && _linkIsRedundant) {
+            _linkIsRedundant = !_linkIsRedundant;
+            RR->t->peerLinkNoLongerRedundant(NULL,*this);
+        } if (alivePathCount > 1 && !_linkIsRedundant) {
+            _linkIsRedundant = !_linkIsRedundant;
+            RR->t->peerLinkNowRedundant(NULL,*this);
+        }
+    }
+
     // Right now we only keep pinging links that have the maximum priority. The
     // priority is used to track cluster redirections, meaning that when a cluster
     // redirects us its redirect target links override all other links and we
@@ -726,22 +832,6 @@ unsigned int Peer::doPingAndKeepalive(void *tPtr,int64_t now)
     return sent;
 }
 
-unsigned int Peer::prunePaths()
-{
-    unsigned int pruned = 0;
-    for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
-        if (_paths[i].p) {
-            if(_paths[i].p->isClosed() || !_paths[i].p->isValidState()) {
-                _paths[i].lr = 0;
-                _paths[i].p.zero();
-                _paths[i].priority = 1;
-                pruned++;
-            }
-        }
-    }
-    return pruned;
-}
-
 void Peer::clusterRedirect(void *tPtr,const SharedPtr<Path> &originatingPath,const InetAddress &remoteAddress,const int64_t now)
 {
     SharedPtr<Path> np(RR->topology->getPath(originatingPath->localSocket(),remoteAddress));
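
Note on the ZT_MULTIPATH_PROPORTIONALLY_BALANCED branch above: the selection logic reduces to scoring each live path, normalizing the scores into allocations that sum to 1.0, and then picking the next egress path with a single uniform draw. Below is a minimal standalone sketch of that scheme; it is not ZeroTier code, and the path count and quality values are hypothetical.

// selection_sketch.cpp -- standalone illustration, not part of the patch.
#include <cstdio>
#include <cstdlib>
#include <ctime>

int main()
{
    const int numPaths = 3;                            // hypothetical aggregate link
    float relQuality[numPaths] = { 0.9f, 0.5f, 0.1f }; // per-path quality scores
    float alloc[numPaths];

    // Normalize scores into allocations summing to 1.0; this mirrors the
    // (1.0 / totalRelativeQuality) scaling factor computed in the patch.
    float total = 0.0f;
    for (int i = 0; i < numPaths; ++i) total += relQuality[i];
    for (int i = 0; i < numPaths; ++i) alloc[i] = relQuality[i] / total;

    // One uniform draw in [0,1); subtract allocations until the draw lands
    // inside a path's share (same walk as the patch's "rf -= alloc[i]" loop).
    std::srand((unsigned)std::time(nullptr));
    float rf = (float)(std::rand() % 100) / 100.0f;
    int chosen = numPaths - 1;
    for (int i = 0; i < numPaths; ++i) {
        if (rf < alloc[i]) { chosen = i; break; }
        rf -= alloc[i];
    }
    std::printf("next packet -> path %d (allocation %.2f)\n", chosen, alloc[chosen]);
    return 0;
}

Subtracting each allocation from the draw until it falls inside a path's share is the standard one-draw way to sample a discrete distribution; the patch walks _paths[] with alloc[i] in exactly this fashion.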
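The new VERB_ACK plumbing (recordIncomingPacket(), sendACK(), bytesToAck()) amounts to byte-count accounting on both ends of a path: the receiver tallies payload bytes and periodically echoes the tally back, and the sender compares it against the bytes it expected to be acknowledged. A rough sketch of the idea, using assumed field and method names rather than the actual Path API:

// ack_accounting_sketch.cpp -- standalone illustration with assumed names;
// in the patch the real logic lives in Path (expectingAck/recordIncomingPacket/bytesToAck).
#include <cstdint>
#include <cstdio>

struct PathStats {
    uint64_t bytesExpectingAck = 0; // sender side: payload bytes awaiting ACK
    uint64_t bytesToAck = 0;        // receiver side: payload bytes not yet reported

    void onSend(uint16_t payloadLen)    { bytesExpectingAck += payloadLen; }
    void onReceive(uint16_t payloadLen) { bytesToAck += payloadLen; }

    // Receiver: drain the tally; the patch sends this as a uint32 in VERB_ACK.
    uint32_t drainAck() { uint32_t n = (uint32_t)bytesToAck; bytesToAck = 0; return n; }

    // Sender: fraction of expected bytes actually acknowledged (1.0 = no loss).
    float onAckReceived(uint32_t acked) {
        float f = bytesExpectingAck ? (float)acked / (float)bytesExpectingAck : 1.0f;
        bytesExpectingAck = 0;
        return f;
    }
};

int main()
{
    PathStats sender, receiver;
    sender.onSend(1400); sender.onSend(1400); sender.onSend(1400);
    receiver.onReceive(1400); receiver.onReceive(1400); // third packet lost in transit
    std::printf("delivered fraction: %.2f\n", sender.onAckReceived(receiver.drainAck()));
    return 0;
}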
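computeAggregateLinkRelativeQuality() also discounts each path by the age of its last ACK: ackAge is normalized into [0, 10] and passed through exp(-x), so a freshly acknowledged path keeps its full score while a silent one decays toward zero. A small sketch of that curve, with Utils::normalize() re-implemented here as a hypothetical linear rescale and an assumed ZT_PATH_MAX_AGE of 300000 ms:

// ack_age_decay_sketch.cpp -- standalone illustration; normalize() and the
// ZT_PATH_MAX_AGE value are assumptions, not the actual ZeroTier definitions.
#include <cmath>
#include <cstdio>
#include <initializer_list>

// Hypothetical stand-in for Utils::normalize(): linear rescale of x from
// [inMin,inMax] to [outMin,outMax].
static float normalize(float x, float inMin, float inMax, float outMin, float outMax)
{
    return (x - inMin) * (outMax - outMin) / (inMax - inMin) + outMin;
}

int main()
{
    const float ZT_PATH_MAX_AGE = 300000.0f; // assumed ACK-age ceiling, in ms
    for (float ackAge : { 0.0f, 30000.0f, 150000.0f, 300000.0f }) {
        // Matches the patch's age_contrib = exp(-normalized_ma) term.
        float ageContrib = std::exp(-normalize(ackAge, 0.0f, ZT_PATH_MAX_AGE, 0.0f, 10.0f));
        std::printf("ackAge = %6.0f ms -> quality multiplier %.4f\n", ackAge, ageContrib);
    }
    return 0;
}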
