From 6a2ba4baca326272c45930208b70cfedf8cb1638 Mon Sep 17 00:00:00 2001
From: Joseph Henry
Date: Tue, 1 May 2018 16:32:15 -0700
Subject: Introduced basic multipath support

---
 node/Peer.cpp | 289 ++++++++++++++++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 272 insertions(+), 17 deletions(-)

diff --git a/node/Peer.cpp b/node/Peer.cpp
index 71afd852..862a4529 100644
--- a/node/Peer.cpp
+++ b/node/Peer.cpp
@@ -35,6 +35,7 @@
 #include "Packet.hpp"
 #include "Trace.hpp"
 #include "InetAddress.hpp"
+#include "RingBuffer.hpp"
 
 namespace ZeroTier {
 
@@ -59,10 +60,14 @@ Peer::Peer(const RuntimeEnvironment *renv,const Identity &myIdentity,const Ident
 	_vRevision(0),
 	_id(peerIdentity),
 	_directPathPushCutoffCount(0),
-	_credentialsCutoffCount(0)
+	_credentialsCutoffCount(0),
+	_linkBalanceStatus(false),
+	_linkRedundancyStatus(false)
 {
 	if (!myIdentity.agree(peerIdentity,_key,ZT_PEER_SECRET_KEY_LENGTH))
 		throw ZT_EXCEPTION_INVALID_ARGUMENT;
+	_pathChoiceHist = new RingBuffer(ZT_MULTIPATH_PROPORTION_WIN_SZ);
+	_flowBalanceHist = new RingBuffer(ZT_MULTIPATH_PROPORTION_WIN_SZ);
 }
 
 void Peer::received(
@@ -95,6 +100,18 @@ void Peer::received(
 		path->trustedPacketReceived(now);
 	}
 
+	if (RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) {
+		if ((now - _lastPathPrune) > ZT_CLOSED_PATH_PRUNING_INTERVAL) {
+			_lastPathPrune = now;
+			prunePaths();
+		}
+		for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+			if (_paths[i].p) {
+				_paths[i].p->measureLink(now);
+			}
+		}
+	}
+
 	if (hops == 0) {
 		// If this is a direct packet (no hops), update existing paths or learn new ones
 
@@ -232,26 +249,246 @@ void Peer::received(
 	}
 }
 
-SharedPtr<Path> Peer::getBestPath(int64_t now,bool includeExpired) const
+SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired)
 {
 	Mutex::Lock _l(_paths_m);
-	unsigned int bestPath = ZT_MAX_PEER_NETWORK_PATHS;
-	long bestPathQuality = 2147483647;
+	unsigned int bestPath = ZT_MAX_PEER_NETWORK_PATHS;
+
+	/**
+	 * Send traffic across the highest quality path only. This algorithm will still
+	 * use the old path quality metric.
+	 */
+	if (RR->node->getMultipathMode() == ZT_MULTIPATH_NONE) {
+		long bestPathQuality = 2147483647;
+		for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+			if (_paths[i].p && _paths[i].p->isValidState()) {
+				if ((includeExpired)||((now - _paths[i].lr) < ZT_PEER_PATH_EXPIRATION)) {
+					const long q = _paths[i].p->quality(now) / _paths[i].priority;
+					if (q <= bestPathQuality) {
+						bestPathQuality = q;
+						bestPath = i;
+					}
+				}
+			} else break;
+		}
+		if (bestPath != ZT_MAX_PEER_NETWORK_PATHS) {
+			return _paths[bestPath].p;
+		}
+		return SharedPtr<Path>();
+	}
+
+	if ((now - _lastPathPrune) > ZT_CLOSED_PATH_PRUNING_INTERVAL) {
+		_lastPathPrune = now;
+		prunePaths();
+	}
 	for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
 		if (_paths[i].p) {
-			if ((includeExpired)||((now - _paths[i].lr) < ZT_PEER_PATH_EXPIRATION)) {
-				const long q = _paths[i].p->quality(now) / _paths[i].priority;
-				if (q <= bestPathQuality) {
-					bestPathQuality = q;
+			_paths[i].p->measureLink(now);
+		}
+	}
+
+	/**
+	 * Randomly distribute traffic across all paths
+	 *
+	 * Behavior:
+	 *  - If path DOWN: Stop randomly choosing that path
+	 *  - If path UP: Start randomly choosing that path
+	 *  - If all paths are unresponsive: randomly choose from all paths
+	 */
+	int numAlivePaths = 0;
+	int numStalePaths = 0;
+	if (RR->node->getMultipathMode() == ZT_MULTIPATH_RANDOM) {
+		int alivePaths[ZT_MAX_PEER_NETWORK_PATHS];
+		int stalePaths[ZT_MAX_PEER_NETWORK_PATHS];
+		memset(&alivePaths, -1, sizeof(alivePaths));
+		memset(&stalePaths, -1, sizeof(stalePaths));
+		for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+			if (_paths[i].p && _paths[i].p->isValidState()) {
+				if (_paths[i].p->alive(now)) {
+					alivePaths[numAlivePaths] = i;
+					numAlivePaths++;
+				}
+				else {
+					stalePaths[numStalePaths] = i;
+					numStalePaths++;
+				}
+			}
+		}
+		unsigned int r;
+		Utils::getSecureRandom(&r, 1);
+		if (numAlivePaths > 0) {
+			// pick a random out of the set deemed "alive"
+			int rf = (float)(r %= numAlivePaths);
+			return _paths[alivePaths[rf]].p;
+		}
+		else if(numStalePaths > 0) {
+			// resort to trying any non-expired path
+			int rf = (float)(r %= numStalePaths);
+			return _paths[stalePaths[rf]].p;
+		}
+	}
+
+	/**
+	 * Proportionally allocate traffic according to dynamic path quality measurements
+	 */
+	if (RR->node->getMultipathMode() == ZT_MULTIPATH_PROPORTIONALLY_BALANCED) {
+		float relq[ZT_MAX_PEER_NETWORK_PATHS];
+		memset(&relq, 0, sizeof(relq));
+		float alloc[ZT_MAX_PEER_NETWORK_PATHS];
+		memset(&alloc, 0, sizeof(alloc));
+
+		// Survey
+		//
+		// Take a survey of all available link qualities. We use this to determine if we
+		// can skip this algorithm altogether and if not, to establish baseline for physical
+		// link quality used in later calculations.
+		//
+		// We find the min/max quality of our currently-active links so
+		// that we can form a relative scale to rank each link proportionally
+		// to each other link.
+		uint16_t alivePaths[ZT_MAX_PEER_NETWORK_PATHS];
+		uint16_t stalePaths[ZT_MAX_PEER_NETWORK_PATHS];
+		memset(&alivePaths, -1, sizeof(alivePaths));
+		memset(&stalePaths, -1, sizeof(stalePaths));
+		uint16_t numAlivePaths = 0;
+		uint16_t numStalePaths = 0;
+		float minQuality = 10000;
+		float maxQuality = -1;
+		float currQuality;
+		for(uint16_t i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+			if (_paths[i].p && _paths[i].p->isValidState()) {
+				if (!_paths[i].p->monitorsReady()) {
+					// TODO: This should fix itself anyway but we should test whether forcing the use of a new path will
+					// aid in establishing flow balance more quickly.
+				}
+				// Compute quality here, going forward we will use lastComputedQuality()
+				currQuality = _paths[i].p->computeQuality(now);
+				if (!_paths[i].p->stale(now)) {
+					alivePaths[i] = currQuality;
+					numAlivePaths++;
+				}
+				else {
+					stalePaths[i] = currQuality;
+					numStalePaths++;
+				}
+				if (currQuality > maxQuality) {
+					maxQuality = currQuality;
 					bestPath = i;
 				}
+				if (currQuality < minQuality) {
+					minQuality = currQuality;
+				}
+				relq[i] = currQuality;
 			}
-		} else break;
+		}
+
+		// Attempt to find an excuse not to use the rest of this algorithm
+		if (bestPath == ZT_MAX_PEER_NETWORK_PATHS || (numAlivePaths == 0 && numStalePaths == 0)) {
+			return SharedPtr<Path>();
+		} if (numAlivePaths == 1) {
+			return _paths[bestPath].p;
+		} if (numStalePaths == 1) {
+			return _paths[bestPath].p;
+		}
+
+		// Relative quality
+		//
+		// The strongest link will have a value of 1.0 whereas every other
+		// link will have a value which represents some fraction of the strongest link.
+		float totalRelativeQuality = 0;
+		for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+			if (_paths[i].p && _paths[i].p->isValidState()) {
+				relq[i] /= maxQuality ? maxQuality : 1;
+				totalRelativeQuality += relq[i];
+			}
+		}
+
+		// Convert the relative quality values into flow allocations.
+		// Additionally, determine whether each path in the flow is
+		// contributing more or less than its target allocation. If
+		// it is contributing more than required, don't allow it to be
+		// randomly selected for the next packet. If however the path
+		// needs to contribute more to the flow, we should record
+		float imbalance = 0;
+		float qualityScalingFactor = 1.0 / totalRelativeQuality;
+		for(uint16_t i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+			int numPktSentWithinWin = _pathChoiceHist->countValue((float)i);
+			// Compute traffic allocation for each path in the flow
+			if (_paths[i].p && _paths[i].p->isValidState()) {
+				// Allocation
+				// This is the percentage of traffic we want to send over a given path
+				alloc[i] = relq[i] * qualityScalingFactor;
+				float currProportion = numPktSentWithinWin / (float)ZT_MULTIPATH_PROPORTION_WIN_SZ;
+				float targetProportion = alloc[i];
+				float diffProportion = currProportion - targetProportion;
+				// Imbalance
+				//
+				// This is the sum of the distances of each path's currently observed flow contributions
+				// from its most recent target allocation. In other words, this is a measure of how closely we
+				// are adhering to our desired allocations. It is worth noting that this value can be greater
+				// than 1.0 if a significant change to allocations is made by the algorithm, this will
+				// eventually correct itself.
+				imbalance += fabs(diffProportion);
+				if (diffProportion < 0) {
+					alloc[i] = targetProportion;
+				}
+				else {
+					alloc[i] = targetProportion;
+				}
+			}
+		}
+
+		// Compute and record current flow balance
+		float balance = 1.0 - imbalance;
+		if (balance >= ZT_MULTIPATH_FLOW_BALANCE_THESHOLD) {
+			if (!_linkBalanceStatus) {
+				_linkBalanceStatus = true;
+				RR->t->peerLinkBalanced(NULL,0,*this);
+			}
+		}
+		else {
+			if (_linkBalanceStatus) {
+				_linkBalanceStatus = false;
+				RR->t->peerLinkImbalanced(NULL,0,*this);
+			}
+		}
+
+		// Record the current flow balance. Later used for computing a mean flow balance value.
+		_flowBalanceHist->push(balance);
+
+		// Randomly choose path from allocated candidates
+		unsigned int r;
+		Utils::getSecureRandom(&r, 1);
+		float rf = (float)(r %= 100) / 100;
+		for(int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+			if (_paths[i].p && _paths[i].p->isValidState() && _paths[i].p->address().isV4()) {
+				if (alloc[i] > 0 && rf < alloc[i]) {
+					bestPath = i;
+					_pathChoiceHist->push(bestPath); // Record which path we chose
+					break;
+				}
+				if (alloc[i] > 0) {
+					rf -= alloc[i];
+				}
+				else {
+					rf -= alloc[i]*-1;
+				}
+			}
+		}
+		if (bestPath < ZT_MAX_PEER_NETWORK_PATHS) {
+			return _paths[bestPath].p;
+		}
+		return SharedPtr<Path>();
+	}
+
+	// Adhere to a user-defined interface/allocation scheme
+	if (RR->node->getMultipathMode() == ZT_MULTIPATH_MANUALLY_BALANCED) {
+		// TODO
+	}
-	}
-	if (bestPath != ZT_MAX_PEER_NETWORK_PATHS)
-		return _paths[bestPath].p;
 	return SharedPtr<Path>();
 }
 
@@ -477,16 +714,34 @@ unsigned int Peer::doPingAndKeepalive(void *tPtr,int64_t now)
 			}
 		} else break;
 	}
-	while(j < ZT_MAX_PEER_NETWORK_PATHS) {
-		_paths[j].lr = 0;
-		_paths[j].p.zero();
-		_paths[j].priority = 1;
-		++j;
+	if (RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) {
+		while(j < ZT_MAX_PEER_NETWORK_PATHS) {
+			_paths[j].lr = 0;
+			_paths[j].p.zero();
+			_paths[j].priority = 1;
+			++j;
+		}
 	}
-	return sent;
+
+	return sent;
 }
 
+unsigned int Peer::prunePaths()
+{
+	Mutex::Lock _l(_paths_m);
+	unsigned int pruned = 0;
+	for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+		if (_paths[i].p) {
+			if (!_paths[i].p->isClosed() || !_paths[i].p->isValidState()) {
+				_paths[i].lr = 0;
+				_paths[i].p.zero();
+				_paths[i].priority = 1;
+				pruned++;
+			}
+		}
+	}
+	return pruned;
+}
+
 void Peer::clusterRedirect(void *tPtr,const SharedPtr<Path> &originatingPath,const InetAddress &remoteAddress,const int64_t now)
 {
 	SharedPtr<Path> np(RR->topology->getPath(originatingPath->localSocket(),remoteAddress));
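
The core idea behind the ZT_MULTIPATH_PROPORTIONALLY_BALANCED branch added above: rank each usable path relative to the strongest one, turn those relative qualities into per-path traffic allocations that sum to 1.0, then pick the outgoing path for each packet with a weighted random draw. The self-contained sketch below illustrates only that selection step; PathStats, selectPathIndex and the use of rand() are illustrative stand-ins (the patch itself works on Peer::_paths, gets its quality figure from Path::computeQuality() and draws randomness with Utils::getSecureRandom()), and the flow-balance bookkeeping the patch layers on top is omitted here.

// Standalone sketch (not ZeroTier code): proportional path selection.
// Each live path gets a target allocation proportional to its measured
// quality; an outgoing path is then picked by a weighted random draw.

#include <cstddef>
#include <cstdio>
#include <cstdlib>
#include <vector>

struct PathStats {       // hypothetical stand-in for Peer::_paths[i]
	bool  alive;         // path is currently usable
	float quality;       // higher is better
};

// Returns the index of the chosen path, or -1 if no path is usable.
static int selectPathIndex(const std::vector<PathStats> &paths)
{
	// Survey: find the best quality so every path can be scaled relative to it.
	float maxQuality = 0.0f;
	for (std::size_t i = 0; i < paths.size(); ++i) {
		if (paths[i].alive && paths[i].quality > maxQuality)
			maxQuality = paths[i].quality;
	}
	if (maxQuality <= 0.0f)
		return -1;

	// Relative quality: the strongest link becomes 1.0, the rest a fraction of it.
	std::vector<float> relq(paths.size(), 0.0f);
	float total = 0.0f;
	for (std::size_t i = 0; i < paths.size(); ++i) {
		if (paths[i].alive) {
			relq[i] = paths[i].quality / maxQuality;
			total += relq[i];
		}
	}

	// Allocation and weighted draw: alloc = relq[i] / total sums to 1.0 over all
	// live paths, so walking the allocations until the random value is used up
	// picks path i with probability alloc.
	float rf = (float)rand() / (float)RAND_MAX;
	int lastAlive = -1;
	for (std::size_t i = 0; i < paths.size(); ++i) {
		if (!paths[i].alive)
			continue;
		lastAlive = (int)i;
		const float alloc = relq[i] / total;
		if (rf < alloc)
			return (int)i;
		rf -= alloc;
	}
	return lastAlive; // floating-point rounding can leave a tiny remainder
}

int main()
{
	std::vector<PathStats> paths = {{true, 40.0f}, {true, 10.0f}, {false, 90.0f}};
	int counts[3] = {0, 0, 0};
	for (int n = 0; n < 10000; ++n) {
		const int i = selectPathIndex(paths);
		if (i >= 0)
			counts[i]++;
	}
	printf("path0=%d path1=%d path2=%d\n", counts[0], counts[1], counts[2]);
	return 0;
}

With per-path qualities of 40 and 10 on the two live paths, the draw settles on roughly an 80/20 traffic split, which is the behavior the allocation comments in the patch describe.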
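The patch additionally measures how closely the observed traffic split tracks those target allocations: each chosen path index is pushed into _pathChoiceHist (a RingBuffer sized to ZT_MULTIPATH_PROPORTION_WIN_SZ entries), each path's observed share of that window is compared with its allocation, and balance = 1.0 - (sum of the absolute deviations) is checked against ZT_MULTIPATH_FLOW_BALANCE_THESHOLD to emit the peerLinkBalanced / peerLinkImbalanced trace events. Below is a minimal stand-alone illustration of that bookkeeping, with a std::deque standing in for the patch's RingBuffer and made-up window and allocation values.

// Standalone sketch (not ZeroTier code): flow-balance bookkeeping.
// "balance" is 1.0 minus the summed deviation of each path's observed
// share of recent sends from its target allocation.

#include <cmath>
#include <cstdio>
#include <deque>
#include <vector>

int main()
{
	const std::size_t windowSize = 128;          // stand-in for ZT_MULTIPATH_PROPORTION_WIN_SZ
	std::vector<float> alloc = {0.8f, 0.2f};     // hypothetical target allocations per path
	std::deque<int> pathChoiceHist;              // stand-in for the patch's RingBuffer

	// Pretend the sender alternated paths 50/50 for a full window.
	for (std::size_t n = 0; n < windowSize; ++n)
		pathChoiceHist.push_back((int)(n % 2));

	float imbalance = 0.0f;
	for (std::size_t i = 0; i < alloc.size(); ++i) {
		std::size_t sentOnPath = 0;
		for (std::size_t k = 0; k < pathChoiceHist.size(); ++k) {
			if (pathChoiceHist[k] == (int)i)
				sentOnPath++;                    // count recent sends on path i
		}
		const float observed = (float)sentOnPath / (float)windowSize;
		imbalance += std::fabs(observed - alloc[i]); // distance from the target share
	}
	const float balance = 1.0f - imbalance;          // 1.0 means exactly on target
	printf("imbalance=%.2f balance=%.2f\n", imbalance, balance);
	return 0;
}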