From 91a22a686a17f8cedc58690592a84f5d4a3ab5e0 Mon Sep 17 00:00:00 2001 From: Joseph Henry Date: Fri, 1 Jun 2018 18:03:59 -0700 Subject: Added auto-escalation to multipath if both peers support it. Improved QoS/ACK tracking. Related bug fixes --- node/Peer.cpp | 51 +++++++++++++++++++++++++++------------------------ 1 file changed, 27 insertions(+), 24 deletions(-) (limited to 'node/Peer.cpp') diff --git a/node/Peer.cpp b/node/Peer.cpp index 8deaa362..a22bbffe 100644 --- a/node/Peer.cpp +++ b/node/Peer.cpp @@ -64,6 +64,7 @@ Peer::Peer(const RuntimeEnvironment *renv,const Identity &myIdentity,const Ident _credentialsCutoffCount(0), _linkIsBalanced(false), _linkIsRedundant(false), + _remotePeerMultipathEnabled(false), _lastAggregateStatsReport(0) { if (!myIdentity.agree(peerIdentity,_key,ZT_PEER_SECRET_KEY_LENGTH)) @@ -104,8 +105,10 @@ void Peer::received( { Mutex::Lock _l(_paths_m); - if (RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) { - recordIncomingPacket(tPtr, path, packetId, payloadLength, verb, now); + + recordIncomingPacket(tPtr, path, packetId, payloadLength, verb, now); + + if (canUseMultipath()) { if (path->needsToSendQoS(now)) { sendQOS_MEASUREMENT(tPtr, path, path->localSocket(), path->address(), now); } @@ -167,7 +170,7 @@ void Peer::received( // If we find a pre-existing path with the same address, just replace it. // If we don't find anything we can replace, just use the replacePath that we previously decided on. - if (RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) { + if (canUseMultipath()) { for(unsigned int i=0;iaddress().ss_family == path->address().ss_family && _paths[i].p->address().ipsEqual2(path->address())) { @@ -269,21 +272,19 @@ void Peer::received( void Peer::recordOutgoingPacket(const SharedPtr &path, const uint64_t packetId, uint16_t payloadLength, const Packet::Verb verb, int64_t now) { - if (RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) { - if (verb == Packet::VERB_FRAME || verb == Packet::VERB_EXT_FRAME) { - path->expectingAck(now, packetId, payloadLength); - } + if (localMultipathSupport()) { + path->recordOutgoingPacket(now, packetId, payloadLength, verb); } } void Peer::recordIncomingPacket(void *tPtr, const SharedPtr &path, const uint64_t packetId, uint16_t payloadLength, const Packet::Verb verb, int64_t now) { - if (verb == Packet::VERB_FRAME || verb == Packet::VERB_EXT_FRAME) { + if (localMultipathSupport()) { if (path->needsToSendAck(now)) { sendACK(tPtr, path, path->localSocket(), path->address(), now); } - path->recordIncomingPacket(now, packetId, payloadLength); + path->recordIncomingPacket(now, packetId, payloadLength, verb); } } @@ -384,9 +385,9 @@ SharedPtr Peer::getAppropriatePath(int64_t now, bool includeExpired) /** * Send traffic across the highest quality path only. This algorithm will still - * use the old path quality metric. + * use the old path quality metric from protocol version 9. */ - if (RR->node->getMultipathMode() == ZT_MULTIPATH_NONE) { + if (!canUseMultipath()) { long bestPathQuality = 2147483647; for(unsigned int i=0;i Peer::getAppropriatePath(int64_t now, bool includeExpired) stalePaths[numStalePaths] = i; numStalePaths++; } - // Record a default path to use as a short-circuit for the rest of the algorithm + // Record a default path to use as a short-circuit for the rest of the algorithm (if needed) bestPath = i; } } + + // Compare paths to each-other + float qualityScalingFactor = computeAggregateLinkRelativeQuality(now); + if (numAlivePaths == 0 && numStalePaths == 0) { return SharedPtr(); } if (numAlivePaths == 1 || numStalePaths == 1) { return _paths[bestPath].p; } - // Compare paths to each-other - float qualityScalingFactor = computeAggregateLinkRelativeQuality(now); + // Convert set of relative performances into an allocation set for(uint16_t i=0;igetName(), ipvStr, currentAllocation); + sprintf(tmp, "(%s, %s, %.3f)", _paths[i].p->getName(), ipvStr, currentAllocation); // Prevent duplicates if(ifnamemap[_paths[i].p->getName()] != ipv) { memcpy(ptr, tmp, strlen(tmp)); @@ -543,7 +547,7 @@ char *Peer::interfaceListStr() } ptr--; // Overwrite trailing space if (imbalanced) { - sprintf(tmp, ", is IMBALANCED"); + sprintf(tmp, ", is asymmetrical"); memcpy(ptr, tmp, sizeof(tmp)); } else { *ptr = '\0'; @@ -688,10 +692,11 @@ void Peer::sendACK(void *tPtr,const SharedPtr &path,const int64_t localSoc void Peer::sendQOS_MEASUREMENT(void *tPtr,const SharedPtr &path,const int64_t localSocket,const InetAddress &atAddress,int64_t now) { + const int64_t _now = RR->node->now(); Packet outp(_id.address(),RR->identity.address(),Packet::VERB_QOS_MEASUREMENT); char qosData[ZT_PATH_MAX_QOS_PACKET_SZ]; - path->generateQoSPacket(now,qosData); - outp.append(qosData,sizeof(qosData)); + int16_t len = path->generateQoSPacket(_now,qosData); + outp.append(qosData,len); if (atAddress) { outp.armor(_key,false); RR->node->putPacket(tPtr,localSocket,atAddress,outp.data(),outp.size()); @@ -775,17 +780,15 @@ unsigned int Peer::doPingAndKeepalive(void *tPtr,int64_t now) const bool sendFullHello = ((now - _lastSentFullHello) >= ZT_PEER_PING_PERIOD); _lastSentFullHello = now; - // Emit traces regarding the status of aggregate links - if (RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) { + // Emit traces regarding aggregate link status + if (canUseMultipath()) { int alivePathCount = aggregateLinkPhysicalPathCount(); if ((now - _lastAggregateStatsReport) > ZT_PATH_AGGREGATE_STATS_REPORT_INTERVAL) { _lastAggregateStatsReport = now; if (alivePathCount) { RR->t->peerLinkAggregateStatistics(NULL,*this); } - } - // Report link redundancy - if (alivePathCount < 2 && _linkIsRedundant) { + } if (alivePathCount < 2 && _linkIsRedundant) { _linkIsRedundant = !_linkIsRedundant; RR->t->peerLinkNoLongerRedundant(NULL,*this); } if (alivePathCount > 1 && !_linkIsRedundant) { @@ -821,7 +824,7 @@ unsigned int Peer::doPingAndKeepalive(void *tPtr,int64_t now) } } else break; } - if (RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) { + if (canUseMultipath()) { while(j < ZT_MAX_PEER_NETWORK_PATHS) { _paths[j].lr = 0; _paths[j].p.zero(); -- cgit v1.2.3