summaryrefslogtreecommitdiff
path: root/node/Peer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'node/Peer.cpp')
-rw-r--r--node/Peer.cpp430
1 files changed, 394 insertions, 36 deletions
diff --git a/node/Peer.cpp b/node/Peer.cpp
index 71afd852..21bbfabe 100644
--- a/node/Peer.cpp
+++ b/node/Peer.cpp
@@ -25,7 +25,6 @@
*/
#include "../version.h"
-
#include "Constants.hpp"
#include "Peer.hpp"
#include "Node.hpp"
@@ -35,6 +34,8 @@
#include "Packet.hpp"
#include "Trace.hpp"
#include "InetAddress.hpp"
+#include "RingBuffer.hpp"
+#include "Utils.hpp"
namespace ZeroTier {
@@ -53,16 +54,31 @@ Peer::Peer(const RuntimeEnvironment *renv,const Identity &myIdentity,const Ident
_lastCredentialsReceived(0),
_lastTrustEstablishedPacketReceived(0),
_lastSentFullHello(0),
+ _lastACKWindowReset(0),
+ _lastQoSWindowReset(0),
+ _lastMultipathCompatibilityCheck(0),
+ _freeRandomByte(0),
+ _uniqueAlivePathCount(0),
+ _localMultipathSupported(false),
+ _remoteMultipathSupported(false),
+ _canUseMultipath(false),
_vProto(0),
_vMajor(0),
_vMinor(0),
_vRevision(0),
_id(peerIdentity),
_directPathPushCutoffCount(0),
- _credentialsCutoffCount(0)
+ _credentialsCutoffCount(0),
+ _linkIsBalanced(false),
+ _linkIsRedundant(false),
+ _remotePeerMultipathEnabled(false),
+ _lastAggregateStatsReport(0),
+ _lastAggregateAllocation(0)
{
+ Utils::getSecureRandom(&_freeRandomByte, 1);
if (!myIdentity.agree(peerIdentity,_key,ZT_PEER_SECRET_KEY_LENGTH))
throw ZT_EXCEPTION_INVALID_ARGUMENT;
+ _pathChoiceHist = new RingBuffer<int>(ZT_MULTIPATH_PROPORTION_WIN_SZ);
}
void Peer::received(
@@ -70,6 +86,7 @@ void Peer::received(
const SharedPtr<Path> &path,
const unsigned int hops,
const uint64_t packetId,
+ const unsigned int payloadLength,
const Packet::Verb verb,
const uint64_t inRePacketId,
const Packet::Verb inReVerb,
@@ -95,9 +112,25 @@ void Peer::received(
path->trustedPacketReceived(now);
}
+ {
+ Mutex::Lock _l(_paths_m);
+
+ recordIncomingPacket(tPtr, path, packetId, payloadLength, verb, now);
+
+ if (_canUseMultipath) {
+ if (path->needsToSendQoS(now)) {
+ sendQOS_MEASUREMENT(tPtr, path, path->localSocket(), path->address(), now);
+ }
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p) {
+ _paths[i].p->processBackgroundPathMeasurements(now);
+ }
+ }
+ }
+ }
+
if (hops == 0) {
// If this is a direct packet (no hops), update existing paths or learn new ones
-
bool havePath = false;
{
Mutex::Lock _l(_paths_m);
@@ -116,20 +149,26 @@ void Peer::received(
if ((!havePath)&&(RR->node->shouldUsePathForZeroTierTraffic(tPtr,_id.address(),path->localSocket(),path->address()))) {
Mutex::Lock _l(_paths_m);
- // Paths are redunant if they duplicate an alive path to the same IP or
+ // Paths are redundant if they duplicate an alive path to the same IP or
// with the same local socket and address family.
bool redundant = false;
+ unsigned int replacePath = ZT_MAX_PEER_NETWORK_PATHS;
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i].p) {
- if ( (_paths[i].p->alive(now)) && ( ((_paths[i].p->localSocket() == path->localSocket())&&(_paths[i].p->address().ss_family == path->address().ss_family)) || (_paths[i].p->address().ipsEqual2(path->address())) ) ) {
+ if ( (_paths[i].p->alive(now)) && ( ((_paths[i].p->localSocket() == path->localSocket())&&(_paths[i].p->address().ss_family == path->address().ss_family)) || (_paths[i].p->address().ipsEqual2(path->address())) ) ) {
redundant = true;
break;
}
+ // If the path is the same address and port, simply assume this is a replacement
+ if ( (_paths[i].p->address().ipsEqual2(path->address()) && (_paths[i].p->address().port() == path->address().port()))) {
+ replacePath = i;
+ break;
+ }
} else break;
}
-
- if (!redundant) {
- unsigned int replacePath = ZT_MAX_PEER_NETWORK_PATHS;
+ // If the path isn't a duplicate of the same localSocket AND we haven't already determined a replacePath,
+ // then find the worst path and replace it.
+ if (!redundant && replacePath == ZT_MAX_PEER_NETWORK_PATHS) {
int replacePathQuality = 0;
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i].p) {
@@ -143,16 +182,15 @@ void Peer::received(
break;
}
}
-
- if (replacePath != ZT_MAX_PEER_NETWORK_PATHS) {
- if (verb == Packet::VERB_OK) {
- RR->t->peerLearnedNewPath(tPtr,networkId,*this,path,packetId);
- _paths[replacePath].lr = now;
- _paths[replacePath].p = path;
- _paths[replacePath].priority = 1;
- } else {
- attemptToContact = true;
- }
+ }
+ if (replacePath != ZT_MAX_PEER_NETWORK_PATHS) {
+ if (verb == Packet::VERB_OK) {
+ RR->t->peerLearnedNewPath(tPtr,networkId,*this,path,packetId);
+ _paths[replacePath].lr = now;
+ _paths[replacePath].p = path;
+ _paths[replacePath].priority = 1;
+ } else {
+ attemptToContact = true;
}
}
}
@@ -232,29 +270,265 @@ void Peer::received(
}
}
-SharedPtr<Path> Peer::getBestPath(int64_t now,bool includeExpired) const
+void Peer::recordOutgoingPacket(const SharedPtr<Path> &path, const uint64_t packetId,
+ uint16_t payloadLength, const Packet::Verb verb, int64_t now)
{
- Mutex::Lock _l(_paths_m);
+ // Grab second byte from packetId to use as a source of entropy in the next path selection
+ _freeRandomByte = (packetId & 0xFF00) >> 8;
+ if (_canUseMultipath) {
+ path->recordOutgoingPacket(now, packetId, payloadLength, verb);
+ }
+}
+/**
+ * Account for a packet received from this peer over the given path.
+ *
+ * Does nothing unless _canUseMultipath is set. Otherwise an ACK is sent first
+ * if the path reports that one is due, and then the packet is recorded on the
+ * path.
+ *
+ * @param tPtr Opaque pointer forwarded to sendACK()
+ * @param path Path the packet arrived on
+ * @param packetId ID of the received packet
+ * @param payloadLength Payload length forwarded to the path
+ * @param verb Verb of the received packet
+ * @param now Current time
+ */
+void Peer::recordIncomingPacket(void *tPtr, const SharedPtr<Path> &path, const uint64_t packetId,
+	uint16_t payloadLength, const Packet::Verb verb, int64_t now)
+{
+	if (!_canUseMultipath)
+		return;
+
+	// The pending-ACK check runs before this packet is added to the path's records
+	if (path->needsToSendAck(now))
+		sendACK(tPtr, path, path->localSocket(), path->address(), now);
+
+	path->recordIncomingPacket(now, packetId, payloadLength, verb);
+}
+
+/**
+ * Recompute each path's relative quality and its share of the aggregate link.
+ *
+ * Three passes over _paths:
+ *  1. collect per-path stability and lifetime throughput and find the maxima
+ *     (plus the highest IP scope),
+ *  2. normalize against those maxima, blend stability/throughput/scope with the
+ *     ZT_PATH_CONTRIB_* weights, decay by ACK age and store the result via
+ *     Path::updateRelativeQuality(),
+ *  3. hand each path (relativeQuality / sum of qualities) * 255 via
+ *     Path::updateComponentAllocationOfAggregateLink().
+ *
+ * This function takes no lock itself.
+ *
+ * @param now Current time, used to compute each path's ACK age
+ */
+void Peer::computeAggregateProportionalAllocation(int64_t now)
+{
+	float maxStability = 0;
+	float totalRelativeQuality = 0;
+	float maxThroughput = 1;
+	float maxScope = 0;
+	float relStability[ZT_MAX_PEER_NETWORK_PATHS];
+	float relThroughput[ZT_MAX_PEER_NETWORK_PATHS];
+	memset(&relStability, 0, sizeof(relStability));
+	memset(&relThroughput, 0, sizeof(relThroughput));
+	// Survey all paths
+	for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+		if (_paths[i].p) {
+			relStability[i] = _paths[i].p->lastComputedStability();
+			relThroughput[i] = _paths[i].p->maxLifetimeThroughput();
+			maxStability = relStability[i] > maxStability ? relStability[i] : maxStability;
+			maxThroughput = relThroughput[i] > maxThroughput ? relThroughput[i] : maxThroughput;
+			maxScope = _paths[i].p->ipScope() > maxScope ? _paths[i].p->ipScope() : maxScope;
+		}
+	}
+	// Convert to relative values
+	for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+		if (_paths[i].p) {
+			// Fall back to a divisor of 1 when the maximum is zero
+			relStability[i] /= maxStability ? maxStability : 1;
+			relThroughput[i] /= maxThroughput ? maxThroughput : 1;
+			// Map ACK age onto [0,10] and turn it into an exponential decay factor
+			float normalized_ma = Utils::normalize(_paths[i].p->ackAge(now), 0, ZT_PATH_MAX_AGE, 0, 10);
+			float age_contrib = exp((-1)*normalized_ma);
+			float relScope = ((float)(_paths[i].p->ipScope()+1) / (maxScope + 1));
+			// NOTE(review): relThroughput[i] was just divided by the maximum, so for
+			// non-negative throughputs it is <= 1 and fmax(1, ...) always yields 1,
+			// making the throughput term a constant. Possibly fmin was intended;
+			// left unchanged because it alters the resulting allocations -- confirm.
+			float relQuality =
+				(relStability[i] * ZT_PATH_CONTRIB_STABILITY)
+				+ (fmax(1, relThroughput[i]) * ZT_PATH_CONTRIB_THROUGHPUT)
+				+ relScope * ZT_PATH_CONTRIB_SCOPE;
+			relQuality *= age_contrib;
+			// Arbitrary cutoffs
+			relQuality = relQuality > (1.00 / 100.0) ? relQuality : 0.0;
+			relQuality = relQuality < (99.0 / 100.0) ? relQuality : 1.0;
+			totalRelativeQuality += relQuality;
+			_paths[i].p->updateRelativeQuality(relQuality);
+		}
+	}
+	// Convert set of relative performances into an allocation set
+	for(uint16_t i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+		if (_paths[i].p) {
+			// If every path was cut off to zero quality the sum is zero; dividing
+			// would produce NaN, and converting NaN to an integer allocation is
+			// undefined. Give such paths a zero share instead.
+			const float share = (totalRelativeQuality > 0.0f)
+				? (_paths[i].p->relativeQuality() / totalRelativeQuality)
+				: 0.0f;
+			_paths[i].p->updateComponentAllocationOfAggregateLink(share * 255);
+		}
+	}
+}
+
+/**
+ * Sum every populated path's packet delay variance, weighted by that path's
+ * relative quality.
+ *
+ * @return The weighted sum, converted to int on return
+ */
+int Peer::computeAggregateLinkPacketDelayVariance()
+{
+	float weightedSum = 0.0;
+	for (unsigned int slot = 0; slot < ZT_MAX_PEER_NETWORK_PATHS; ++slot) {
+		if (!_paths[slot].p)
+			continue; // empty slot
+		weightedSum += _paths[slot].p->relativeQuality() * _paths[slot].p->packetDelayVariance();
+	}
+	return weightedSum;
+}
+
+/**
+ * Average, over all populated path slots, of each path's mean latency weighted
+ * by its relative quality.
+ *
+ * @return Sum of (relativeQuality * meanLatency) divided by the number of
+ *         populated paths, or 0 when no path slot is populated
+ */
+int Peer::computeAggregateLinkMeanLatency()
+{
+	int ml = 0;
+	int pathCount = 0;
+	for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+		if (_paths[i].p) {
+			pathCount++;
+			ml += _paths[i].p->relativeQuality() * _paths[i].p->meanLatency();
+		}
+	}
+	// With no populated paths the division below would be an integer
+	// division by zero; report zero latency instead.
+	if (pathCount == 0)
+		return 0;
+	return ml / pathCount;
+}
+
+/**
+ * Count the distinct names (as returned by Path::getName()) among the paths
+ * that are currently alive.
+ *
+ * @return Number of distinct names seen on alive paths
+ */
+int Peer::aggregateLinkPhysicalPathCount()
+{
+	const int64_t now = RR->node->now();
+	std::map<std::string, bool> seenNames;
+	int distinctNames = 0;
+	for (unsigned int slot = 0; slot < ZT_MAX_PEER_NETWORK_PATHS; ++slot) {
+		if (!(_paths[slot].p && _paths[slot].p->alive(now)))
+			continue;
+		// operator[] default-inserts false the first time a name is encountered
+		bool &alreadyCounted = seenNames[_paths[slot].p->getName()];
+		if (!alreadyCounted) {
+			alreadyCounted = true;
+			++distinctNames;
+		}
+	}
+	return distinctNames;
+}
+
+/**
+ * Count the populated path slots whose path is currently alive.
+ *
+ * @return Number of alive paths
+ */
+int Peer::aggregateLinkLogicalPathCount()
+{
+	const int64_t now = RR->node->now();
+	int aliveCount = 0;
+	for (unsigned int slot = 0; slot < ZT_MAX_PEER_NETWORK_PATHS; ++slot) {
+		if (!_paths[slot].p)
+			continue;
+		if (_paths[slot].p->alive(now))
+			++aliveCount;
+	}
+	return aliveCount;
+}
+
+SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired)
+{
+ Mutex::Lock _l(_paths_m);
unsigned int bestPath = ZT_MAX_PEER_NETWORK_PATHS;
- long bestPathQuality = 2147483647;
+
+ /**
+ * Send traffic across the highest quality path only. This algorithm will still
+ * use the old path quality metric from protocol version 9.
+ */
+ if (!_canUseMultipath) {
+ long bestPathQuality = 2147483647;
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p) {
+ if ((includeExpired)||((now - _paths[i].lr) < ZT_PEER_PATH_EXPIRATION)) {
+ const long q = _paths[i].p->quality(now) / _paths[i].priority;
+ if (q <= bestPathQuality) {
+ bestPathQuality = q;
+ bestPath = i;
+ }
+ }
+ } else break;
+ }
+ if (bestPath != ZT_MAX_PEER_NETWORK_PATHS) {
+ return _paths[bestPath].p;
+ }
+ return SharedPtr<Path>();
+ }
+
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i].p) {
- if ((includeExpired)||((now - _paths[i].lr) < ZT_PEER_PATH_EXPIRATION)) {
- const long q = _paths[i].p->quality(now) / _paths[i].priority;
- if (q <= bestPathQuality) {
- bestPathQuality = q;
- bestPath = i;
+ _paths[i].p->processBackgroundPathMeasurements(now);
+ }
+ }
+
+ /**
+ * Randomly distribute traffic across all paths
+ */
+ int numAlivePaths = 0;
+ int numStalePaths = 0;
+ if (RR->node->getMultipathMode() == ZT_MULTIPATH_RANDOM) {
+ int alivePaths[ZT_MAX_PEER_NETWORK_PATHS];
+ int stalePaths[ZT_MAX_PEER_NETWORK_PATHS];
+ memset(&alivePaths, -1, sizeof(alivePaths));
+ memset(&stalePaths, -1, sizeof(stalePaths));
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p) {
+ if (_paths[i].p->alive(now)) {
+ alivePaths[numAlivePaths] = i;
+ numAlivePaths++;
+ }
+ else {
+ stalePaths[numStalePaths] = i;
+ numStalePaths++;
}
}
- } else break;
+ }
+ unsigned int r = _freeRandomByte;
+ if (numAlivePaths > 0) {
+ int rf = r % numAlivePaths;
+ return _paths[alivePaths[rf]].p;
+ }
+ else if(numStalePaths > 0) {
+ // Resort to trying any non-expired path
+ int rf = r % numStalePaths;
+ return _paths[stalePaths[rf]].p;
+ }
}
- if (bestPath != ZT_MAX_PEER_NETWORK_PATHS)
- return _paths[bestPath].p;
+ /**
+ * Proportionally allocate traffic according to dynamic path quality measurements
+ */
+ if (RR->node->getMultipathMode() == ZT_MULTIPATH_PROPORTIONALLY_BALANCED) {
+ if ((now - _lastAggregateAllocation) >= ZT_PATH_QUALITY_COMPUTE_INTERVAL) {
+ _lastAggregateAllocation = now;
+ computeAggregateProportionalAllocation(now);
+ }
+ // Randomly choose path according to their allocations
+ float rf = _freeRandomByte;
+ for(int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p) {
+ if (rf < _paths[i].p->allocation()) {
+ bestPath = i;
+ _pathChoiceHist->push(bestPath); // Record which path we chose
+ break;
+ }
+ rf -= _paths[i].p->allocation();
+ }
+ }
+ if (bestPath < ZT_MAX_PEER_NETWORK_PATHS) {
+ return _paths[bestPath].p;
+ }
+ }
return SharedPtr<Path>();
}
+/**
+ * Build a human-readable list of the alive paths of this peer.
+ *
+ * Each listed entry has the form "(name, ipv4|ipv6, allocation)" and entries
+ * are separated by a single space. An interface name is listed at most once
+ * per address family. When more than one path is alive, "allocation" is the
+ * fraction of recorded path choices (_pathChoiceHist) that picked that path;
+ * if any path deviates from an even split by more than
+ * ZT_PATH_IMBALANCE_THRESHOLD the suffix ", is asymmetrical" is appended.
+ *
+ * The text is written into the member buffer _interfaceListStr, which is
+ * overwritten on every call. Entries that no longer fit are dropped rather
+ * than written past the end of the buffer.
+ *
+ * @return Pointer to _interfaceListStr (always NUL-terminated)
+ */
+char *Peer::interfaceListStr()
+{
+	// Per-interface bitmask of the address families already listed (1 = IPv4, 2 = IPv6).
+	// A plain "last family seen" value cannot tell "never seen" apart from IPv6.
+	std::map<std::string, int> listedFamilies;
+	char tmp[64];
+	const int64_t now = RR->node->now();
+	// Assumes _interfaceListStr is a char array member, as the sizeof-based memset implies
+	const size_t capacity = sizeof(_interfaceListStr);
+	size_t used = 0;
+	bool imbalanced = false;
+	memset(_interfaceListStr, 0, capacity);
+	const int alivePathCount = aggregateLinkLogicalPathCount();
+	for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+		if (!(_paths[i].p && _paths[i].p->alive(now)))
+			continue;
+		const bool isV4 = _paths[i].p->address().isV4();
+		// If this is acting as an aggregate link, check allocations
+		float currentAllocation = 1.0f;
+		if (alivePathCount > 1) {
+			const float targetAllocation = 1.0f / (float)alivePathCount;
+			const float samples = (float)_pathChoiceHist->count();
+			if (samples > 0.0f) {
+				currentAllocation = (float)_pathChoiceHist->countValue(i) / samples;
+				if (fabs(targetAllocation - currentAllocation) > ZT_PATH_IMBALANCE_THRESHOLD) {
+					imbalanced = true;
+				}
+			} else {
+				// No choices recorded yet: avoid 0/0 and do not judge balance
+				currentAllocation = 0.0f;
+			}
+		}
+		// Prevent duplicates (same interface name and address family)
+		const int familyBit = isV4 ? 1 : 2;
+		int &listed = listedFamilies[_paths[i].p->getName()];
+		if (listed & familyBit)
+			continue;
+		listed |= familyBit;
+		// Bounded formatting: an over-long interface name is truncated instead of overflowing tmp
+		const int written = snprintf(tmp, sizeof(tmp), "(%s, %s, %.3f)", _paths[i].p->getName(), isV4 ? "ipv4" : "ipv6", currentAllocation);
+		if (written <= 0)
+			continue;
+		const size_t len = ((size_t)written < sizeof(tmp)) ? (size_t)written : (sizeof(tmp) - 1);
+		// Need room for the entry, its separator and the terminating NUL
+		if ((used + len + 2) > capacity)
+			continue;
+		memcpy(_interfaceListStr + used, tmp, len);
+		used += len;
+		_interfaceListStr[used++] = ' ';
+	}
+	// Drop the trailing separator, but only if at least one entry was written
+	if (used > 0)
+		--used;
+	if (imbalanced) {
+		static const char suffix[] = ", is asymmetrical";
+		// sizeof(suffix) includes the terminating NUL
+		if ((used + sizeof(suffix)) <= capacity) {
+			memcpy(_interfaceListStr + used, suffix, sizeof(suffix));
+			return _interfaceListStr;
+		}
+	}
+	_interfaceListStr[used] = '\0';
+	return _interfaceListStr;
+}
+
void Peer::introduce(void *const tPtr,const int64_t now,const SharedPtr<Peer> &other) const
{
unsigned int myBestV4ByScope[ZT_INETADDRESS_MAX_SCOPE+1];
@@ -376,6 +650,71 @@ void Peer::introduce(void *const tPtr,const int64_t now,const SharedPtr<Peer> &o
}
}
+/**
+ * Periodically re-evaluate whether multipath logic may be used with this peer.
+ *
+ * Runs at most once per ZT_PATH_QUALITY_COMPUTE_INTERVAL. It refreshes
+ * _uniqueAlivePathCount, _localMultipathSupported, _remoteMultipathSupported
+ * and _canUseMultipath.
+ *
+ * This function takes no lock itself.
+ *
+ * @param now Current time
+ */
+inline void Peer::processBackgroundPeerTasks(int64_t now)
+{
+	// Determine current multipath compatibility with other peer
+	if ((now - _lastMultipathCompatibilityCheck) < ZT_PATH_QUALITY_COMPUTE_INTERVAL)
+		return;
+
+	// Cache number of available paths so that we can short-circuit multipath logic elsewhere
+	//
+	// We also take notice of duplicate paths (same IP only) because we may have
+	// recently received a direct path push from a peer and our list might contain
+	// a dead path which hasn't been fully recognized as such. In this case we
+	// don't want the duplicate to trigger execution of multipath code prematurely.
+	//
+	// This is done to support the behavior of auto multipath enable/disable
+	// without user intervention.
+	//
+	// A path is counted only if no earlier slot holds a path to the same IP, which
+	// yields the exact number of distinct IPs. (Subtracting "duplicates / 2" is
+	// only correct when duplicates come in pairs: three paths to one IP would be
+	// counted as two unique paths.) Note that every populated slot is considered;
+	// no aliveness test is applied here.
+	int distinctIpPathCount = 0;
+	for (unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+		if (!_paths[i].p)
+			continue;
+		bool sameIpSeenEarlier = false;
+		for (unsigned int j=0;j<i;++j) {
+			if (_paths[j].p && _paths[i].p->address().ipsEqual2(_paths[j].p->address())) {
+				sameIpSeenEarlier = true;
+				break;
+			}
+		}
+		if (!sameIpSeenEarlier)
+			++distinctIpPathCount;
+	}
+	_uniqueAlivePathCount = distinctIpPathCount;
+	_lastMultipathCompatibilityCheck = now;
+	_localMultipathSupported = ((RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) && (ZT_PROTO_VERSION > 9));
+	_remoteMultipathSupported = _vProto > 9;
+	// If both peers support multipath and more than one path exist, we can use multipath logic
+	_canUseMultipath = _localMultipathSupported && _remoteMultipathSupported && (_uniqueAlivePathCount > 1);
+}
+
+/**
+ * Send a VERB_ACK packet to this peer carrying the path's bytesToAck() value.
+ *
+ * When atAddress is set the packet is armored with this peer's key and written
+ * directly to that address from localSocket; otherwise it is handed to the
+ * switch (RR->sw) for delivery. In both cases the path is then told an ACK was
+ * sent at time now.
+ *
+ * @param tPtr Opaque pointer forwarded to the send functions
+ * @param path Path whose received bytes are being acknowledged
+ * @param localSocket Local socket to send from when atAddress is set
+ * @param atAddress Destination address, or a null address to send via the switch
+ * @param now Current time
+ */
+void Peer::sendACK(void *tPtr,const SharedPtr<Path> &path,const int64_t localSocket,const InetAddress &atAddress,int64_t now)
+{
+	Packet ack(_id.address(),RR->identity.address(),Packet::VERB_ACK);
+	ack.append<uint32_t>(path->bytesToAck());
+
+	if (!atAddress) {
+		RR->sw->send(tPtr,ack,false);
+	} else {
+		ack.armor(_key,false);
+		RR->node->putPacket(tPtr,localSocket,atAddress,ack.data(),ack.size());
+	}
+
+	path->sentAck(now);
+}
+
+/**
+ * Send a VERB_QOS_MEASUREMENT packet to this peer with the QoS data generated
+ * by the given path.
+ *
+ * When atAddress is set the packet is armored with this peer's key and written
+ * directly to that address from localSocket; otherwise it is handed to the
+ * switch (RR->sw) for delivery. In both cases the path is then told a QoS
+ * packet was sent at time now.
+ *
+ * @param tPtr Opaque pointer forwarded to the send functions
+ * @param path Path that generates the QoS payload
+ * @param localSocket Local socket to send from when atAddress is set
+ * @param atAddress Destination address, or a null address to send via the switch
+ * @param now Time reported to the path as the moment the QoS packet was sent
+ */
+void Peer::sendQOS_MEASUREMENT(void *tPtr,const SharedPtr<Path> &path,const int64_t localSocket,const InetAddress &atAddress,int64_t now)
+{
+	// NOTE(review): the payload is generated against a freshly read clock while
+	// sentQoS() below receives the caller's `now` -- confirm the split is intended.
+	const int64_t freshNow = RR->node->now();
+	Packet qosPacket(_id.address(),RR->identity.address(),Packet::VERB_QOS_MEASUREMENT);
+
+	char payload[ZT_PATH_MAX_QOS_PACKET_SZ];
+	const int16_t payloadLen = path->generateQoSPacket(freshNow,payload);
+	qosPacket.append(payload,payloadLen);
+
+	if (!atAddress) {
+		RR->sw->send(tPtr,qosPacket,false);
+	} else {
+		qosPacket.armor(_key,false);
+		RR->node->putPacket(tPtr,localSocket,atAddress,qosPacket.data(),qosPacket.size());
+	}
+
+	path->sentQoS(now);
+}
+
void Peer::sendHELLO(void *tPtr,const int64_t localSocket,const InetAddress &atAddress,int64_t now)
{
Packet outp(_id.address(),RR->identity.address(),Packet::VERB_HELLO);
@@ -444,12 +783,30 @@ void Peer::tryMemorizedPath(void *tPtr,int64_t now)
unsigned int Peer::doPingAndKeepalive(void *tPtr,int64_t now)
{
unsigned int sent = 0;
-
Mutex::Lock _l(_paths_m);
const bool sendFullHello = ((now - _lastSentFullHello) >= ZT_PEER_PING_PERIOD);
_lastSentFullHello = now;
+ processBackgroundPeerTasks(now);
+
+ // Emit traces regarding aggregate link status
+ if (_canUseMultipath) {
+ int alivePathCount = aggregateLinkPhysicalPathCount();
+ if ((now - _lastAggregateStatsReport) > ZT_PATH_AGGREGATE_STATS_REPORT_INTERVAL) {
+ _lastAggregateStatsReport = now;
+ if (alivePathCount) {
+ RR->t->peerLinkAggregateStatistics(NULL,*this);
+ }
+ } if (alivePathCount < 2 && _linkIsRedundant) {
+ _linkIsRedundant = !_linkIsRedundant;
+ RR->t->peerLinkNoLongerRedundant(NULL,*this);
+ } if (alivePathCount > 1 && !_linkIsRedundant) {
+ _linkIsRedundant = !_linkIsRedundant;
+ RR->t->peerLinkNowRedundant(NULL,*this);
+ }
+ }
+
// Right now we only keep pinging links that have the maximum priority. The
// priority is used to track cluster redirections, meaning that when a cluster
// redirects us its redirect target links override all other links and we
@@ -477,13 +834,14 @@ unsigned int Peer::doPingAndKeepalive(void *tPtr,int64_t now)
}
} else break;
}
- while(j < ZT_MAX_PEER_NETWORK_PATHS) {
- _paths[j].lr = 0;
- _paths[j].p.zero();
- _paths[j].priority = 1;
- ++j;
+ if (canUseMultipath()) {
+ while(j < ZT_MAX_PEER_NETWORK_PATHS) {
+ _paths[j].lr = 0;
+ _paths[j].p.zero();
+ _paths[j].priority = 1;
+ ++j;
+ }
}
-
return sent;
}