summaryrefslogtreecommitdiff
path: root/node/Peer.cpp
diff options
context:
space:
mode:
authorJoseph Henry <josephjah@gmail.com>2018-05-01 16:32:15 -0700
committerJoseph Henry <josephjah@gmail.com>2018-05-01 16:32:15 -0700
commit6a2ba4baca326272c45930208b70cfedf8cb1638 (patch)
tree434403aecca63908909678bd234ef8b4ffb1d1e4 /node/Peer.cpp
parent836d897aecc193ec3477e67858237a3f97819024 (diff)
downloadinfinitytier-6a2ba4baca326272c45930208b70cfedf8cb1638.tar.gz
infinitytier-6a2ba4baca326272c45930208b70cfedf8cb1638.zip
Introduced basic multipath support
Diffstat (limited to 'node/Peer.cpp')
-rw-r--r--node/Peer.cpp289
1 files changed, 272 insertions, 17 deletions
diff --git a/node/Peer.cpp b/node/Peer.cpp
index 71afd852..862a4529 100644
--- a/node/Peer.cpp
+++ b/node/Peer.cpp
@@ -35,6 +35,7 @@
#include "Packet.hpp"
#include "Trace.hpp"
#include "InetAddress.hpp"
+#include "RingBuffer.hpp"
namespace ZeroTier {
@@ -59,10 +60,14 @@ Peer::Peer(const RuntimeEnvironment *renv,const Identity &myIdentity,const Ident
_vRevision(0),
_id(peerIdentity),
_directPathPushCutoffCount(0),
- _credentialsCutoffCount(0)
+ _credentialsCutoffCount(0),
+ _linkBalanceStatus(false),
+ _linkRedundancyStatus(false)
{
if (!myIdentity.agree(peerIdentity,_key,ZT_PEER_SECRET_KEY_LENGTH))
throw ZT_EXCEPTION_INVALID_ARGUMENT;
+ _pathChoiceHist = new RingBuffer<int>(ZT_MULTIPATH_PROPORTION_WIN_SZ);
+ _flowBalanceHist = new RingBuffer<float>(ZT_MULTIPATH_PROPORTION_WIN_SZ);
}
void Peer::received(
@@ -95,6 +100,18 @@ void Peer::received(
path->trustedPacketReceived(now);
}
+ if (RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) {
+ if ((now - _lastPathPrune) > ZT_CLOSED_PATH_PRUNING_INTERVAL) {
+ _lastPathPrune = now;
+ prunePaths();
+ }
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p) {
+ _paths[i].p->measureLink(now);
+ }
+ }
+ }
+
if (hops == 0) {
// If this is a direct packet (no hops), update existing paths or learn new ones
@@ -232,26 +249,246 @@ void Peer::received(
}
}
-SharedPtr<Path> Peer::getBestPath(int64_t now,bool includeExpired) const
+SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired)
{
Mutex::Lock _l(_paths_m);
-
unsigned int bestPath = ZT_MAX_PEER_NETWORK_PATHS;
- long bestPathQuality = 2147483647;
+
+ /**
+ * Send traffic across the highest quality path only. This algorithm will still
+ * use the old path quality metric.
+ */
+ if (RR->node->getMultipathMode() == ZT_MULTIPATH_NONE) {
+ long bestPathQuality = 2147483647;
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p && _paths[i].p->isValidState()) {
+ if ((includeExpired)||((now - _paths[i].lr) < ZT_PEER_PATH_EXPIRATION)) {
+ const long q = _paths[i].p->quality(now) / _paths[i].priority;
+ if (q <= bestPathQuality) {
+ bestPathQuality = q;
+ bestPath = i;
+ }
+ }
+ } else break;
+ }
+ if (bestPath != ZT_MAX_PEER_NETWORK_PATHS) {
+ return _paths[bestPath].p;
+ }
+ return SharedPtr<Path>();
+ }
+
+ if ((now - _lastPathPrune) > ZT_CLOSED_PATH_PRUNING_INTERVAL) {
+ _lastPathPrune = now;
+ prunePaths();
+ }
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i].p) {
- if ((includeExpired)||((now - _paths[i].lr) < ZT_PEER_PATH_EXPIRATION)) {
- const long q = _paths[i].p->quality(now) / _paths[i].priority;
- if (q <= bestPathQuality) {
- bestPathQuality = q;
+ _paths[i].p->measureLink(now);
+ }
+ }
+
+ /**
+ * Randomly distribute traffic across all paths
+ *
+ * Behavior:
+ * - If path DOWN: Stop randomly choosing that path
+ * - If path UP: Start randomly choosing that path
+ * - If all paths are unresponsive: randomly choose from all paths
+ */
+ int numAlivePaths = 0;
+ int numStalePaths = 0;
+ if (RR->node->getMultipathMode() == ZT_MULTIPATH_RANDOM) {
+ int alivePaths[ZT_MAX_PEER_NETWORK_PATHS];
+ int stalePaths[ZT_MAX_PEER_NETWORK_PATHS];
+ memset(&alivePaths, -1, sizeof(alivePaths));
+ memset(&stalePaths, -1, sizeof(stalePaths));
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p) {
+ if (_paths[i].p->isValidState()) {
+ if (_paths[i].p->alive(now)) {
+ alivePaths[numAlivePaths] = i;
+ numAlivePaths++;
+ }
+ else {
+ stalePaths[numStalePaths] = i;
+ numStalePaths++;
+ }
+ }
+ }
+ }
+ unsigned int r;
+ Utils::getSecureRandom(&r, 1);
+ if (numAlivePaths > 0) {
+ // pick a random out of the set deemed "alive"
+ int rf = (float)(r %= numAlivePaths);
+ return _paths[alivePaths[rf]].p;
+ }
+ else if(numStalePaths > 0) {
+ // resort to trying any non-expired path
+ int rf = (float)(r %= numStalePaths);
+ return _paths[stalePaths[rf]].p;
+ }
+ }
+
+ /**
+ * Proportionally allocate traffic according to dynamic path quality measurements
+ */
+ if (RR->node->getMultipathMode() == ZT_MULTIPATH_PROPORTIONALLY_BALANCED) {
+ float relq[ZT_MAX_PEER_NETWORK_PATHS];
+ memset(&relq, 0, sizeof(relq));
+ float alloc[ZT_MAX_PEER_NETWORK_PATHS];
+ memset(&alloc, 0, sizeof(alloc));
+
+ // Survey
+ //
+ // Take a survey of all available link qualities. We use this to determine if we
+ // can skip this algorithm altogether and if not, to establish baseline for physical
+ // link quality used in later calculations.
+ //
+ // We find the min/max quality of our currently-active links so
+ // that we can form a relative scale to rank each link proportionally
+ // to each other link.
+ uint16_t alivePaths[ZT_MAX_PEER_NETWORK_PATHS];
+ uint16_t stalePaths[ZT_MAX_PEER_NETWORK_PATHS];
+ memset(&alivePaths, -1, sizeof(alivePaths));
+ memset(&stalePaths, -1, sizeof(stalePaths));
+ uint16_t numAlivePaths = 0;
+ uint16_t numStalePaths = 0;
+ float minQuality = 10000;
+ float maxQuality = -1;
+ float currQuality;
+ for(uint16_t i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p && _paths[i].p->isValidState()) {
+ if (!_paths[i].p->monitorsReady()) {
+ // TODO: This should fix itself anyway but we should test whether forcing the use of a new path will
+ // aid in establishing flow balance more quickly.
+ }
+ // Compute quality here, going forward we will use lastComputedQuality()
+ currQuality = _paths[i].p->computeQuality(now);
+ if (!_paths[i].p->stale(now)) {
+ alivePaths[i] = currQuality;
+ numAlivePaths++;
+ }
+ else {
+ stalePaths[i] = currQuality;
+ numStalePaths++;
+ }
+ if (currQuality > maxQuality) {
+ maxQuality = currQuality;
bestPath = i;
}
+ if (currQuality < minQuality) {
+ minQuality = currQuality;
+ }
+ relq[i] = currQuality;
}
- } else break;
+ }
+
+ // Attempt to find an excuse not to use the rest of this algorithm
+ if (bestPath == ZT_MAX_PEER_NETWORK_PATHS || (numAlivePaths == 0 && numStalePaths == 0)) {
+ return SharedPtr<Path>();
+ } if (numAlivePaths == 1) {
+ return _paths[bestPath].p;
+ } if (numStalePaths == 1) {
+ return _paths[bestPath].p;
+ }
+
+ // Relative quality
+ //
+ // The strongest link will have a value of 1.0 whereas every other
+ // link will have a value which represents some fraction of the strongest link.
+ float totalRelativeQuality = 0;
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p && _paths[i].p->isValidState()) {
+ relq[i] /= maxQuality ? maxQuality : 1;
+ totalRelativeQuality += relq[i];
+ }
+ }
+
+ // Convert the relative quality values into flow allocations.
+ // Additionally, determine whether each path in the flow is
+ // contributing more or less than its target allocation. If
+ // it is contributing more than required, don't allow it to be
+ // randomly selected for the next packet. If however the path
+ // needs to contribute more to the flow, we should record
+ float imbalance = 0;
+ float qualityScalingFactor = 1.0 / totalRelativeQuality;
+ for(uint16_t i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ // Out of the last N packets to this peer, how many were sent by this path?
+ int numPktSentWithinWin = (int)_pathChoiceHist->countValue((float)i);
+ // Compute traffic allocation for each path in the flow
+ if (_paths[i].p && _paths[i].p->isValidState()) {
+ // Allocation
+ // This is the percentage of traffic we want to send over a given path
+ alloc[i] = relq[i] * qualityScalingFactor;
+ float currProportion = numPktSentWithinWin / (float)ZT_MULTIPATH_PROPORTION_WIN_SZ;
+ float targetProportion = alloc[i];
+ float diffProportion = currProportion - targetProportion;
+ // Imbalance
+ //
+ // This is the sum of the distances of each path's currently observed flow contributions
+ // from its most recent target allocation. In other words, this is a measure of how closely we
+ // are adhering to our desired allocations. It is worth noting that this value can be greater
+ // than 1.0 if a significant change to allocations is made by the algorithm, this will
+ // eventually correct itself.
+ imbalance += fabs(diffProportion);
+ if (diffProportion < 0) {
+ alloc[i] = targetProportion;
+ }
+ else {
+ alloc[i] = targetProportion;
+ }
+ }
+ }
+
+ // Compute and record current flow balance
+ float balance = 1.0 - imbalance;
+ if (balance >= ZT_MULTIPATH_FLOW_BALANCE_THESHOLD) {
+ if (!_linkBalanceStatus) {
+ _linkBalanceStatus = true;
+ RR->t->peerLinkBalanced(NULL,0,*this);
+ }
+ }
+ else {
+ if (_linkBalanceStatus) {
+ _linkBalanceStatus = false;
+ RR->t->peerLinkImbalanced(NULL,0,*this);
+ }
+ }
+
+ // Record the current flow balance. Later used for computing a mean flow balance value.
+ _flowBalanceHist->push(balance);
+
+ // Randomly choose path from allocated candidates
+ unsigned int r;
+ Utils::getSecureRandom(&r, 1);
+ float rf = (float)(r %= 100) / 100;
+ for(int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p && _paths[i].p->isValidState() && _paths[i].p->address().isV4()) {
+ if (alloc[i] > 0 && rf < alloc[i]) {
+ bestPath = i;
+ _pathChoiceHist->push(bestPath); // Record which path we chose
+ break;
+ }
+ if (alloc[i] > 0) {
+ rf -= alloc[i];
+ }
+ else {
+ rf -= alloc[i]*-1;
+ }
+ }
+ }
+ if (bestPath < ZT_MAX_PEER_NETWORK_PATHS) {
+ return _paths[bestPath].p;
+ }
+ return SharedPtr<Path>();
+ }
+
+ // Adhere to a user-defined interface/allocation scheme
+ if (RR->node->getMultipathMode() == ZT_MULTIPATH_MANUALLY_BALANCED) {
+ // TODO
}
- if (bestPath != ZT_MAX_PEER_NETWORK_PATHS)
- return _paths[bestPath].p;
return SharedPtr<Path>();
}
@@ -477,16 +714,34 @@ unsigned int Peer::doPingAndKeepalive(void *tPtr,int64_t now)
}
} else break;
}
- while(j < ZT_MAX_PEER_NETWORK_PATHS) {
- _paths[j].lr = 0;
- _paths[j].p.zero();
- _paths[j].priority = 1;
- ++j;
+ if (RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) {
+ while(j < ZT_MAX_PEER_NETWORK_PATHS) {
+ _paths[j].lr = 0;
+ _paths[j].p.zero();
+ _paths[j].priority = 1;
+ ++j;
+ }
}
-
return sent;
}
+unsigned int Peer::prunePaths()
+{
+ Mutex::Lock _l(_paths_m);
+ unsigned int pruned = 0;
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p) {
+ if(_paths[i].p->isClosed() || !_paths[i].p->isValidState()) {
+ _paths[i].lr = 0;
+ _paths[i].p.zero();
+ _paths[i].priority = 1;
+ pruned++;
+ }
+ }
+ }
+ return pruned;
+}
+
void Peer::clusterRedirect(void *tPtr,const SharedPtr<Path> &originatingPath,const InetAddress &remoteAddress,const int64_t now)
{
SharedPtr<Path> np(RR->topology->getPath(originatingPath->localSocket(),remoteAddress));