summaryrefslogtreecommitdiff
path: root/node/Peer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'node/Peer.cpp')
-rw-r--r--node/Peer.cpp494
1 files changed, 418 insertions, 76 deletions
diff --git a/node/Peer.cpp b/node/Peer.cpp
index 71afd852..5f5d1462 100644
--- a/node/Peer.cpp
+++ b/node/Peer.cpp
@@ -1,6 +1,6 @@
/*
* ZeroTier One - Network Virtualization Everywhere
- * Copyright (C) 2011-2018 ZeroTier, Inc. https://www.zerotier.com/
+ * Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@@ -13,7 +13,7 @@
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* --
*
@@ -25,7 +25,6 @@
*/
#include "../version.h"
-
#include "Constants.hpp"
#include "Peer.hpp"
#include "Node.hpp"
@@ -35,9 +34,13 @@
#include "Packet.hpp"
#include "Trace.hpp"
#include "InetAddress.hpp"
+#include "RingBuffer.hpp"
+#include "Utils.hpp"
namespace ZeroTier {
+static unsigned char s_freeRandomByteCounter = 0;
+
Peer::Peer(const RuntimeEnvironment *renv,const Identity &myIdentity,const Identity &peerIdentity) :
RR(renv),
_lastReceive(0),
@@ -48,18 +51,29 @@ Peer::Peer(const RuntimeEnvironment *renv,const Identity &myIdentity,const Ident
_lastCredentialRequestSent(0),
_lastWhoisRequestReceived(0),
_lastEchoRequestReceived(0),
- _lastComRequestReceived(0),
- _lastComRequestSent(0),
_lastCredentialsReceived(0),
_lastTrustEstablishedPacketReceived(0),
_lastSentFullHello(0),
+ _lastACKWindowReset(0),
+ _lastQoSWindowReset(0),
+ _lastMultipathCompatibilityCheck(0),
+ _freeRandomByte((unsigned char)((uintptr_t)this >> 4) ^ ++s_freeRandomByteCounter),
+ _uniqueAlivePathCount(0),
+ _localMultipathSupported(false),
+ _remoteMultipathSupported(false),
+ _canUseMultipath(false),
_vProto(0),
_vMajor(0),
_vMinor(0),
_vRevision(0),
_id(peerIdentity),
_directPathPushCutoffCount(0),
- _credentialsCutoffCount(0)
+ _credentialsCutoffCount(0),
+ _linkIsBalanced(false),
+ _linkIsRedundant(false),
+ _remotePeerMultipathEnabled(false),
+ _lastAggregateStatsReport(0),
+ _lastAggregateAllocation(0)
{
if (!myIdentity.agree(peerIdentity,_key,ZT_PEER_SECRET_KEY_LENGTH))
throw ZT_EXCEPTION_INVALID_ARGUMENT;
@@ -70,6 +84,7 @@ void Peer::received(
const SharedPtr<Path> &path,
const unsigned int hops,
const uint64_t packetId,
+ const unsigned int payloadLength,
const Packet::Verb verb,
const uint64_t inRePacketId,
const Packet::Verb inReVerb,
@@ -87,7 +102,8 @@ void Peer::received(
case Packet::VERB_MULTICAST_FRAME:
_lastNontrivialReceive = now;
break;
- default: break;
+ default:
+ break;
}
if (trustEstablished) {
@@ -95,9 +111,25 @@ void Peer::received(
path->trustedPacketReceived(now);
}
+ {
+ Mutex::Lock _l(_paths_m);
+
+ recordIncomingPacket(tPtr, path, packetId, payloadLength, verb, now);
+
+ if (_canUseMultipath) {
+ if (path->needsToSendQoS(now)) {
+ sendQOS_MEASUREMENT(tPtr, path, path->localSocket(), path->address(), now);
+ }
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p) {
+ _paths[i].p->processBackgroundPathMeasurements(now);
+ }
+ }
+ }
+ }
+
if (hops == 0) {
// If this is a direct packet (no hops), update existing paths or learn new ones
-
bool havePath = false;
{
Mutex::Lock _l(_paths_m);
@@ -116,20 +148,27 @@ void Peer::received(
if ((!havePath)&&(RR->node->shouldUsePathForZeroTierTraffic(tPtr,_id.address(),path->localSocket(),path->address()))) {
Mutex::Lock _l(_paths_m);
- // Paths are redunant if they duplicate an alive path to the same IP or
+ // Paths are redundant if they duplicate an alive path to the same IP or
// with the same local socket and address family.
bool redundant = false;
+ unsigned int replacePath = ZT_MAX_PEER_NETWORK_PATHS;
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i].p) {
- if ( (_paths[i].p->alive(now)) && ( ((_paths[i].p->localSocket() == path->localSocket())&&(_paths[i].p->address().ss_family == path->address().ss_family)) || (_paths[i].p->address().ipsEqual2(path->address())) ) ) {
+ if ( (_paths[i].p->alive(now)) && ( ((_paths[i].p->localSocket() == path->localSocket())&&(_paths[i].p->address().ss_family == path->address().ss_family)) || (_paths[i].p->address().ipsEqual2(path->address())) ) ) {
redundant = true;
break;
}
+ // If the path is the same address and port, simply assume this is a replacement
+ if ( (_paths[i].p->address().ipsEqual2(path->address()))) {
+ replacePath = i;
+ break;
+ }
} else break;
}
- if (!redundant) {
- unsigned int replacePath = ZT_MAX_PEER_NETWORK_PATHS;
+ // If the path isn't a duplicate of the same localSocket AND we haven't already determined a replacePath,
+ // then find the worst path and replace it.
+ if (!redundant && replacePath == ZT_MAX_PEER_NETWORK_PATHS) {
int replacePathQuality = 0;
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i].p) {
@@ -143,16 +182,16 @@ void Peer::received(
break;
}
}
+ }
- if (replacePath != ZT_MAX_PEER_NETWORK_PATHS) {
- if (verb == Packet::VERB_OK) {
- RR->t->peerLearnedNewPath(tPtr,networkId,*this,path,packetId);
- _paths[replacePath].lr = now;
- _paths[replacePath].p = path;
- _paths[replacePath].priority = 1;
- } else {
- attemptToContact = true;
- }
+ if (replacePath != ZT_MAX_PEER_NETWORK_PATHS) {
+ if (verb == Packet::VERB_OK) {
+ RR->t->peerLearnedNewPath(tPtr,networkId,*this,path,packetId);
+ _paths[replacePath].lr = now;
+ _paths[replacePath].p = path;
+ _paths[replacePath].priority = 1;
+ } else {
+ attemptToContact = true;
}
}
}
@@ -165,39 +204,20 @@ void Peer::received(
}
// If we have a trust relationship periodically push a message enumerating
- // all known external addresses for ourselves. We now do this even if we
- // have a current path since we'll want to use new ones too.
+ // all known external addresses for ourselves. If we already have a path this
+ // is done less frequently.
if (this->trustEstablished(now)) {
- if ((now - _lastDirectPathPushSent) >= ZT_DIRECT_PATH_PUSH_INTERVAL) {
+ const int64_t sinceLastPush = now - _lastDirectPathPushSent;
+ if (sinceLastPush >= ((hops == 0) ? ZT_DIRECT_PATH_PUSH_INTERVAL_HAVEPATH : ZT_DIRECT_PATH_PUSH_INTERVAL)) {
_lastDirectPathPushSent = now;
-
- std::vector<InetAddress> pathsToPush;
-
- std::vector<InetAddress> dps(RR->node->directPaths());
- for(std::vector<InetAddress>::const_iterator i(dps.begin());i!=dps.end();++i)
- pathsToPush.push_back(*i);
-
- // Do symmetric NAT prediction if we are communicating indirectly.
- if (hops > 0) {
- std::vector<InetAddress> sym(RR->sa->getSymmetricNatPredictions());
- for(unsigned long i=0,added=0;i<sym.size();++i) {
- InetAddress tmp(sym[(unsigned long)RR->node->prng() % sym.size()]);
- if (std::find(pathsToPush.begin(),pathsToPush.end(),tmp) == pathsToPush.end()) {
- pathsToPush.push_back(tmp);
- if (++added >= ZT_PUSH_DIRECT_PATHS_MAX_PER_SCOPE_AND_FAMILY)
- break;
- }
- }
- }
-
+ std::vector<InetAddress> pathsToPush(RR->node->directPaths());
if (pathsToPush.size() > 0) {
std::vector<InetAddress>::const_iterator p(pathsToPush.begin());
while (p != pathsToPush.end()) {
- Packet outp(_id.address(),RR->identity.address(),Packet::VERB_PUSH_DIRECT_PATHS);
- outp.addSize(2); // leave room for count
-
+ Packet *const outp = new Packet(_id.address(),RR->identity.address(),Packet::VERB_PUSH_DIRECT_PATHS);
+ outp->addSize(2); // leave room for count
unsigned int count = 0;
- while ((p != pathsToPush.end())&&((outp.size() + 24) < 1200)) {
+ while ((p != pathsToPush.end())&&((outp->size() + 24) < 1200)) {
uint8_t addressType = 4;
switch(p->ss_family) {
case AF_INET:
@@ -210,51 +230,287 @@ void Peer::received(
continue;
}
- outp.append((uint8_t)0); // no flags
- outp.append((uint16_t)0); // no extensions
- outp.append(addressType);
- outp.append((uint8_t)((addressType == 4) ? 6 : 18));
- outp.append(p->rawIpData(),((addressType == 4) ? 4 : 16));
- outp.append((uint16_t)p->port());
+ outp->append((uint8_t)0); // no flags
+ outp->append((uint16_t)0); // no extensions
+ outp->append(addressType);
+ outp->append((uint8_t)((addressType == 4) ? 6 : 18));
+ outp->append(p->rawIpData(),((addressType == 4) ? 4 : 16));
+ outp->append((uint16_t)p->port());
++count;
++p;
}
-
if (count) {
- outp.setAt(ZT_PACKET_IDX_PAYLOAD,(uint16_t)count);
- outp.armor(_key,true);
- path->send(RR,tPtr,outp.data(),outp.size(),now);
+ outp->setAt(ZT_PACKET_IDX_PAYLOAD,(uint16_t)count);
+ outp->compress();
+ outp->armor(_key,true);
+ path->send(RR,tPtr,outp->data(),outp->size(),now);
}
+ delete outp;
}
}
}
}
}
-SharedPtr<Path> Peer::getBestPath(int64_t now,bool includeExpired) const
+void Peer::recordOutgoingPacket(const SharedPtr<Path> &path, const uint64_t packetId,
+ uint16_t payloadLength, const Packet::Verb verb, int64_t now)
{
- Mutex::Lock _l(_paths_m);
+ _freeRandomByte += (unsigned char)(packetId >> 8); // grab entropy to use in path selection logic for multipath
+ if (_canUseMultipath) {
+ path->recordOutgoingPacket(now, packetId, payloadLength, verb);
+ }
+}
+
+void Peer::recordIncomingPacket(void *tPtr, const SharedPtr<Path> &path, const uint64_t packetId,
+ uint16_t payloadLength, const Packet::Verb verb, int64_t now)
+{
+ if (_canUseMultipath) {
+ if (path->needsToSendAck(now)) {
+ sendACK(tPtr, path, path->localSocket(), path->address(), now);
+ }
+ path->recordIncomingPacket(now, packetId, payloadLength, verb);
+ }
+}
+
+void Peer::computeAggregateProportionalAllocation(int64_t now)
+{
+ float maxStability = 0;
+ float totalRelativeQuality = 0;
+ float maxThroughput = 1;
+ float maxScope = 0;
+ float relStability[ZT_MAX_PEER_NETWORK_PATHS];
+ float relThroughput[ZT_MAX_PEER_NETWORK_PATHS];
+ memset(&relStability, 0, sizeof(relStability));
+ memset(&relThroughput, 0, sizeof(relThroughput));
+ // Survey all paths
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p) {
+ relStability[i] = _paths[i].p->lastComputedStability();
+ relThroughput[i] = (float)_paths[i].p->maxLifetimeThroughput();
+ maxStability = relStability[i] > maxStability ? relStability[i] : maxStability;
+ maxThroughput = relThroughput[i] > maxThroughput ? relThroughput[i] : maxThroughput;
+ maxScope = _paths[i].p->ipScope() > maxScope ? _paths[i].p->ipScope() : maxScope;
+ }
+ }
+ // Convert to relative values
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p) {
+ relStability[i] /= maxStability ? maxStability : 1;
+ relThroughput[i] /= maxThroughput ? maxThroughput : 1;
+ float normalized_ma = Utils::normalize((float)_paths[i].p->ackAge(now), 0, ZT_PATH_MAX_AGE, 0, 10);
+ float age_contrib = exp((-1)*normalized_ma);
+ float relScope = ((float)(_paths[i].p->ipScope()+1) / (maxScope + 1));
+ float relQuality =
+ (relStability[i] * (float)ZT_PATH_CONTRIB_STABILITY)
+ + (fmaxf(1.0f, relThroughput[i]) * (float)ZT_PATH_CONTRIB_THROUGHPUT)
+ + relScope * (float)ZT_PATH_CONTRIB_SCOPE;
+ relQuality *= age_contrib;
+ // Arbitrary cutoffs
+ relQuality = relQuality > (1.00f / 100.0f) ? relQuality : 0.0f;
+ relQuality = relQuality < (99.0f / 100.0f) ? relQuality : 1.0f;
+ totalRelativeQuality += relQuality;
+ _paths[i].p->updateRelativeQuality(relQuality);
+ }
+ }
+ // Convert set of relative performances into an allocation set
+ for(uint16_t i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p) {
+ _paths[i].p->updateComponentAllocationOfAggregateLink((unsigned char)((_paths[i].p->relativeQuality() / totalRelativeQuality) * 255));
+ }
+ }
+}
+
+int Peer::computeAggregateLinkPacketDelayVariance()
+{
+ float pdv = 0.0;
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p) {
+ pdv += _paths[i].p->relativeQuality() * _paths[i].p->packetDelayVariance();
+ }
+ }
+ return (int)pdv;
+}
+
+int Peer::computeAggregateLinkMeanLatency()
+{
+ int ml = 0;
+ int pathCount = 0;
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p) {
+ pathCount++;
+ ml += (int)(_paths[i].p->relativeQuality() * _paths[i].p->meanLatency());
+ }
+ }
+ return ml / pathCount;
+}
+
+int Peer::aggregateLinkPhysicalPathCount()
+{
+ std::map<std::string, bool> ifnamemap;
+ int pathCount = 0;
+ int64_t now = RR->node->now();
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p && _paths[i].p->alive(now)) {
+ if (!ifnamemap[_paths[i].p->getName()]) {
+ ifnamemap[_paths[i].p->getName()] = true;
+ pathCount++;
+ }
+ }
+ }
+ return pathCount;
+}
+
+int Peer::aggregateLinkLogicalPathCount()
+{
+ int pathCount = 0;
+ int64_t now = RR->node->now();
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p && _paths[i].p->alive(now)) {
+ pathCount++;
+ }
+ }
+ return pathCount;
+}
+SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired)
+{
+ Mutex::Lock _l(_paths_m);
unsigned int bestPath = ZT_MAX_PEER_NETWORK_PATHS;
- long bestPathQuality = 2147483647;
+
+ /**
+ * Send traffic across the highest quality path only. This algorithm will still
+ * use the old path quality metric from protocol version 9.
+ */
+ if (!_canUseMultipath) {
+ long bestPathQuality = 2147483647;
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p) {
+ if ((includeExpired)||((now - _paths[i].lr) < ZT_PEER_PATH_EXPIRATION)) {
+ const long q = _paths[i].p->quality(now) / _paths[i].priority;
+ if (q <= bestPathQuality) {
+ bestPathQuality = q;
+ bestPath = i;
+ }
+ }
+ } else break;
+ }
+ if (bestPath != ZT_MAX_PEER_NETWORK_PATHS) {
+ return _paths[bestPath].p;
+ }
+ return SharedPtr<Path>();
+ }
+
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i].p) {
- if ((includeExpired)||((now - _paths[i].lr) < ZT_PEER_PATH_EXPIRATION)) {
- const long q = _paths[i].p->quality(now) / _paths[i].priority;
- if (q <= bestPathQuality) {
- bestPathQuality = q;
- bestPath = i;
+ _paths[i].p->processBackgroundPathMeasurements(now);
+ }
+ }
+
+ /**
+ * Randomly distribute traffic across all paths
+ */
+ int numAlivePaths = 0;
+ int numStalePaths = 0;
+ if (RR->node->getMultipathMode() == ZT_MULTIPATH_RANDOM) {
+ int alivePaths[ZT_MAX_PEER_NETWORK_PATHS];
+ int stalePaths[ZT_MAX_PEER_NETWORK_PATHS];
+ memset(&alivePaths, -1, sizeof(alivePaths));
+ memset(&stalePaths, -1, sizeof(stalePaths));
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p) {
+ if (_paths[i].p->alive(now)) {
+ alivePaths[numAlivePaths] = i;
+ numAlivePaths++;
+ }
+ else {
+ stalePaths[numStalePaths] = i;
+ numStalePaths++;
}
}
- } else break;
+ }
+ unsigned int r = _freeRandomByte;
+ if (numAlivePaths > 0) {
+ int rf = r % numAlivePaths;
+ return _paths[alivePaths[rf]].p;
+ }
+ else if(numStalePaths > 0) {
+ // Resort to trying any non-expired path
+ int rf = r % numStalePaths;
+ return _paths[stalePaths[rf]].p;
+ }
}
- if (bestPath != ZT_MAX_PEER_NETWORK_PATHS)
- return _paths[bestPath].p;
+ /**
+ * Proportionally allocate traffic according to dynamic path quality measurements
+ */
+ if (RR->node->getMultipathMode() == ZT_MULTIPATH_PROPORTIONALLY_BALANCED) {
+ if ((now - _lastAggregateAllocation) >= ZT_PATH_QUALITY_COMPUTE_INTERVAL) {
+ _lastAggregateAllocation = now;
+ computeAggregateProportionalAllocation(now);
+ }
+ // Randomly choose path according to their allocations
+ float rf = _freeRandomByte;
+ for(int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p) {
+ if (rf < _paths[i].p->allocation()) {
+ bestPath = i;
+ _pathChoiceHist.push(bestPath); // Record which path we chose
+ break;
+ }
+ rf -= _paths[i].p->allocation();
+ }
+ }
+ if (bestPath < ZT_MAX_PEER_NETWORK_PATHS) {
+ return _paths[bestPath].p;
+ }
+ }
return SharedPtr<Path>();
}
+char *Peer::interfaceListStr()
+{
+ std::map<std::string, int> ifnamemap;
+ char tmp[32];
+ const int64_t now = RR->node->now();
+ char *ptr = _interfaceListStr;
+ bool imbalanced = false;
+ memset(_interfaceListStr, 0, sizeof(_interfaceListStr));
+ int alivePathCount = aggregateLinkLogicalPathCount();
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p && _paths[i].p->alive(now)) {
+ int ipv = _paths[i].p->address().isV4();
+ // If this is acting as an aggregate link, check allocations
+ float targetAllocation = 1.0f / (float)alivePathCount;
+ float currentAllocation = 1.0f;
+ if (alivePathCount > 1) {
+ currentAllocation = (float)_pathChoiceHist.countValue(i) / (float)_pathChoiceHist.count();
+ if (fabs(targetAllocation - currentAllocation) > ZT_PATH_IMBALANCE_THRESHOLD) {
+ imbalanced = true;
+ }
+ }
+ char *ipvStr = ipv ? (char*)"ipv4" : (char*)"ipv6";
+ sprintf(tmp, "(%s, %s, %.3f)", _paths[i].p->getName(), ipvStr, currentAllocation);
+ // Prevent duplicates
+ if(ifnamemap[_paths[i].p->getName()] != ipv) {
+ memcpy(ptr, tmp, strlen(tmp));
+ ptr += strlen(tmp);
+ *ptr = ' ';
+ ptr++;
+ ifnamemap[_paths[i].p->getName()] = ipv;
+ }
+ }
+ }
+ ptr--; // Overwrite trailing space
+ if (imbalanced) {
+ sprintf(tmp, ", is asymmetrical");
+ memcpy(ptr, tmp, sizeof(tmp));
+ } else {
+ *ptr = '\0';
+ }
+ return _interfaceListStr;
+}
+
void Peer::introduce(void *const tPtr,const int64_t now,const SharedPtr<Peer> &other) const
{
unsigned int myBestV4ByScope[ZT_INETADDRESS_MAX_SCOPE+1];
@@ -376,6 +632,73 @@ void Peer::introduce(void *const tPtr,const int64_t now,const SharedPtr<Peer> &o
}
}
+inline void Peer::processBackgroundPeerTasks(const int64_t now)
+{
+ // Determine current multipath compatibility with other peer
+ if ((now - _lastMultipathCompatibilityCheck) >= ZT_PATH_QUALITY_COMPUTE_INTERVAL) {
+ //
+ // Cache number of available paths so that we can short-circuit multipath logic elsewhere
+ //
+ // We also take notice of duplicate paths (same IP only) because we may have
+ // recently received a direct path push from a peer and our list might contain
+ // a dead path which hasn't been fully recognized as such. In this case we
+ // don't want the duplicate to trigger execution of multipath code prematurely.
+ //
+ // This is done to support the behavior of auto multipath enable/disable
+ // without user intervention.
+ //
+ int currAlivePathCount = 0;
+ int duplicatePathsFound = 0;
+ for (unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p) {
+ currAlivePathCount++;
+ for (unsigned int j=0;j<ZT_MAX_PEER_NETWORK_PATHS;++j) {
+ if (_paths[i].p && _paths[j].p && _paths[i].p->address().ipsEqual2(_paths[j].p->address()) && i != j) {
+ duplicatePathsFound+=1;
+ break;
+ }
+ }
+ }
+ }
+ _uniqueAlivePathCount = (currAlivePathCount - (duplicatePathsFound / 2));
+ _lastMultipathCompatibilityCheck = now;
+ _localMultipathSupported = ((RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) && (ZT_PROTO_VERSION > 9));
+ _remoteMultipathSupported = _vProto > 9;
+ // If both peers support multipath and more than one path exist, we can use multipath logic
+ _canUseMultipath = _localMultipathSupported && _remoteMultipathSupported && (_uniqueAlivePathCount > 1);
+ }
+}
+
+void Peer::sendACK(void *tPtr,const SharedPtr<Path> &path,const int64_t localSocket,const InetAddress &atAddress,int64_t now)
+{
+ Packet outp(_id.address(),RR->identity.address(),Packet::VERB_ACK);
+ uint32_t bytesToAck = path->bytesToAck();
+ outp.append<uint32_t>(bytesToAck);
+ if (atAddress) {
+ outp.armor(_key,false);
+ RR->node->putPacket(tPtr,localSocket,atAddress,outp.data(),outp.size());
+ } else {
+ RR->sw->send(tPtr,outp,false);
+ }
+ path->sentAck(now);
+}
+
+void Peer::sendQOS_MEASUREMENT(void *tPtr,const SharedPtr<Path> &path,const int64_t localSocket,const InetAddress &atAddress,int64_t now)
+{
+ const int64_t _now = RR->node->now();
+ Packet outp(_id.address(),RR->identity.address(),Packet::VERB_QOS_MEASUREMENT);
+ char qosData[ZT_PATH_MAX_QOS_PACKET_SZ];
+ int16_t len = path->generateQoSPacket(_now,qosData);
+ outp.append(qosData,len);
+ if (atAddress) {
+ outp.armor(_key,false);
+ RR->node->putPacket(tPtr,localSocket,atAddress,outp.data(),outp.size());
+ } else {
+ RR->sw->send(tPtr,outp,false);
+ }
+ path->sentQoS(now);
+}
+
void Peer::sendHELLO(void *tPtr,const int64_t localSocket,const InetAddress &atAddress,int64_t now)
{
Packet outp(_id.address(),RR->identity.address(),Packet::VERB_HELLO);
@@ -444,12 +767,30 @@ void Peer::tryMemorizedPath(void *tPtr,int64_t now)
unsigned int Peer::doPingAndKeepalive(void *tPtr,int64_t now)
{
unsigned int sent = 0;
-
Mutex::Lock _l(_paths_m);
const bool sendFullHello = ((now - _lastSentFullHello) >= ZT_PEER_PING_PERIOD);
_lastSentFullHello = now;
+ processBackgroundPeerTasks(now);
+
+ // Emit traces regarding aggregate link status
+ if (_canUseMultipath) {
+ int alivePathCount = aggregateLinkPhysicalPathCount();
+ if ((now - _lastAggregateStatsReport) > ZT_PATH_AGGREGATE_STATS_REPORT_INTERVAL) {
+ _lastAggregateStatsReport = now;
+ if (alivePathCount) {
+ RR->t->peerLinkAggregateStatistics(NULL,*this);
+ }
+ } if (alivePathCount < 2 && _linkIsRedundant) {
+ _linkIsRedundant = !_linkIsRedundant;
+ RR->t->peerLinkNoLongerRedundant(NULL,*this);
+ } if (alivePathCount > 1 && !_linkIsRedundant) {
+ _linkIsRedundant = !_linkIsRedundant;
+ RR->t->peerLinkNowRedundant(NULL,*this);
+ }
+ }
+
// Right now we only keep pinging links that have the maximum priority. The
// priority is used to track cluster redirections, meaning that when a cluster
// redirects us its redirect target links override all other links and we
@@ -477,13 +818,14 @@ unsigned int Peer::doPingAndKeepalive(void *tPtr,int64_t now)
}
} else break;
}
- while(j < ZT_MAX_PEER_NETWORK_PATHS) {
- _paths[j].lr = 0;
- _paths[j].p.zero();
- _paths[j].priority = 1;
- ++j;
+ if (canUseMultipath()) {
+ while(j < ZT_MAX_PEER_NETWORK_PATHS) {
+ _paths[j].lr = 0;
+ _paths[j].p.zero();
+ _paths[j].priority = 1;
+ ++j;
+ }
}
-
return sent;
}