summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoseph Henry <josephjah@gmail.com>2018-05-30 17:45:29 -0700
committerJoseph Henry <josephjah@gmail.com>2018-05-30 17:45:29 -0700
commit46a7a2be2e1c4d09d1da2bf26b110a31ec3d0661 (patch)
tree6b75b8e7f5868264428ff3b158a8c1b27288e122
parent8199dbd0dcbc3259eb3218ece183315eddb1e84b (diff)
downloadinfinitytier-46a7a2be2e1c4d09d1da2bf26b110a31ec3d0661.tar.gz
infinitytier-46a7a2be2e1c4d09d1da2bf26b110a31ec3d0661.zip
Added VERB_ACK and VERB_QOS_MEASUREMENT, refined notion of path quality
-rw-r--r--include/ZeroTierDebug.h2
-rw-r--r--make-linux.mk3
-rw-r--r--make-mac.mk5
-rw-r--r--node/Constants.hpp92
-rw-r--r--node/IncomingPacket.cpp119
-rw-r--r--node/IncomingPacket.hpp2
-rw-r--r--node/Packet.hpp52
-rw-r--r--node/Path.hpp458
-rw-r--r--node/Peer.cpp428
-rw-r--r--node/Peer.hpp85
-rw-r--r--node/RingBuffer.hpp81
-rw-r--r--node/Switch.cpp2
-rw-r--r--node/Trace.cpp22
-rw-r--r--node/Trace.hpp8
-rw-r--r--node/Utils.hpp8
-rw-r--r--osdep/Binder.hpp14
-rw-r--r--osdep/Phy.hpp150
-rw-r--r--service/OneService.cpp33
18 files changed, 899 insertions, 665 deletions
diff --git a/include/ZeroTierDebug.h b/include/ZeroTierDebug.h
index 8e5366f0..a60179b7 100644
--- a/include/ZeroTierDebug.h
+++ b/include/ZeroTierDebug.h
@@ -86,7 +86,7 @@
#include <android/log.h>
#define ZT_LOG_TAG "ZTSDK"
#endif
-#if defined(ZT_TRACE)
+#if defined(ZT_DEBUG_TRACE)
#if ZT_MSG_INFO == true
#if defined(__ANDROID__)
#define DEBUG_INFO(fmt, args...) ((void)__android_log_print(ANDROID_LOG_VERBOSE, ZT_LOG_TAG, \
diff --git a/make-linux.mk b/make-linux.mk
index 0f5ef384..749c24b8 100644
--- a/make-linux.mk
+++ b/make-linux.mk
@@ -45,6 +45,9 @@ ONE_OBJS+=ext/http-parser/http_parser.o
ifeq ($(ZT_TRACE),1)
override DEFS+=-DZT_TRACE
endif
+ifeq ($(ZT_DEBUG_TRACE),1)
+ DEFS+=-DZT_DEBUG_TRACE
+endif
ifeq ($(ZT_RULES_ENGINE_DEBUGGING),1)
override DEFS+=-DZT_RULES_ENGINE_DEBUGGING
diff --git a/make-mac.mk b/make-mac.mk
index 1178437a..12357f68 100644
--- a/make-mac.mk
+++ b/make-mac.mk
@@ -43,7 +43,10 @@ ONE_OBJS+=ext/libnatpmp/natpmp.o ext/libnatpmp/getgateway.o ext/miniupnpc/connec
# Build with address sanitization library for advanced debugging (clang)
ifeq ($(ZT_SANITIZE),1)
- SANFLAGS+=-fsanitize=address -DASAN_OPTIONS=symbolize=1
+ DEFS+=-fsanitize=address -DASAN_OPTIONS=symbolize=1
+endif
+ifeq ($(ZT_DEBUG_TRACE),1)
+ DEFS+=-DZT_DEBUG_TRACE
endif
# Debug mode -- dump trace output, build binary with -g
ifeq ($(ZT_DEBUG),1)
diff --git a/node/Constants.hpp b/node/Constants.hpp
index ee2ff0a6..227497de 100644
--- a/node/Constants.hpp
+++ b/node/Constants.hpp
@@ -268,11 +268,6 @@
#define ZT_PING_CHECK_INVERVAL 5000
/**
- * Length of interface name
- */
-#define ZT_PATH_INTERFACE_NAME_SZ 16
-
-/**
* How frequently to check for changes to the system's network interfaces. When
* the service decides to use this constant it's because we want to react more
* quickly to new interfaces that pop up or go down.
@@ -286,78 +281,95 @@
#define ZT_MULTIPATH_PROPORTION_WIN_SZ 128
/**
- * Threshold for flow to be considered balanced.
+ * How often we will sample packet latency. Should be at least greater than ZT_PING_CHECK_INVERVAL
+ * since we will record a 0 bit/s measurement if no valid latency measurement was made within this
+ * window of time.
*/
-#define ZT_MULTIPATH_FLOW_BALANCE_THESHOLD 0.80
+#define ZT_PATH_LATENCY_SAMPLE_INTERVAL ZT_MULTIPATH_PEER_PING_PERIOD * 2
/**
- * Number of samples to consider when computing path statistics
+ * Interval used for rate-limiting the computation of path quality estimates. Set at 0
+ * to compute as new packets arrive with no delay.
*/
-#define ZT_PATH_QUALITY_METRIC_WIN_SZ 64
+#define ZT_PATH_QUALITY_COMPUTE_INTERVAL 1000
/**
- * How often important path metrics are sampled (in ms). These metrics are later used
- * for path quality estimates
+ * Number of samples to consider when computing real-time path statistics
*/
-#define ZT_PATH_QUALITY_SAMPLE_INTERVAL 100
+#define ZT_PATH_QUALITY_METRIC_REALTIME_CONSIDERATION_WIN_SZ 128
/**
- * How often new path quality estimates are computed
+ * Number of samples to consider when computing performing long-term path quality analysis.
+ * By default this value is set to ZT_PATH_QUALITY_METRIC_REALTIME_CONSIDERATION_WIN_SZ but can
+ * be set to any value greater than that to observe longer-term path quality behavior.
*/
-#define ZT_PATH_QUALITY_ESTIMATE_INTERVAL 100
+#define ZT_PATH_QUALITY_METRIC_WIN_SZ ZT_PATH_QUALITY_METRIC_REALTIME_CONSIDERATION_WIN_SZ
/**
- * How often we will sample packet latency. Should be at least greater than ZT_PING_CHECK_INVERVAL
- * since we will record a 0 bit/s measurement if no valid latency measurement was made within this
- * window of time.
+ * Maximum acceptable Packet Delay Variance (PDV) over a path
*/
-#define ZT_PATH_LATENCY_SAMPLE_INTERVAL ZT_MULTIPATH_PEER_PING_PERIOD * 2
+#define ZT_PATH_MAX_PDV 1000
/**
- * Interval used for rate-limiting the computation of path quality estimates. Set at 0
- * to compute as new packets arrive with no delay.
+ * Maximum acceptable time interval between expectation and receipt of at least one ACK over a path
+ */
+#define ZT_PATH_MAX_AGE 30000
+
+/**
+ * Maximum acceptable mean latency over a path
+ */
+#define ZT_PATH_MAX_MEAN_LATENCY 1000
+
+/**
+ * How much each factor contributes to the "stability" score of a path
+ */
+#define ZT_PATH_CONTRIB_PDV 1.0 / 3.0
+#define ZT_PATH_CONTRIB_LATENCY 1.0 / 3.0
+#define ZT_PATH_CONTRIB_THROUGHPUT_DISTURBANCE 1.0 / 3.0
+
+/**
+ * How much each factor contributes to the "quality" score of a path
*/
-#define ZT_PATH_QUALITY_COMPUTE_INTERVAL 0
+#define ZT_PATH_CONTRIB_STABILITY 0.75 / 3.0
+#define ZT_PATH_CONTRIB_THROUGHPUT 1.50 / 3.0
+#define ZT_PATH_CONTRIB_SCOPE 0.75 / 3.0
/**
- * Path error rate history window size. This is used to keep track of packet error
- * measurements over a path's medium-term history.
+ * Min and max acceptable sizes for a VERB_QOS_MEASUREMENT packet
*/
-#define ZT_PATH_ERROR_HIST_WIN_SZ 10
+#define ZT_PATH_MIN_QOS_PACKET_SZ 8 + 1
+#define ZT_PATH_MAX_QOS_PACKET_SZ 1400
/**
- * The number of packet error measurements in each sample
+ * How many ID:sojourn time pairs in a single QoS packet
*/
-#define ZT_PATH_ERROR_SAMPLE_WIN_SZ 1024
+#define ZT_PATH_QOS_TABLE_SIZE (ZT_PATH_MAX_QOS_PACKET_SZ * 8) / (64 + 8)
/**
- * How often a peer will prune its own paths. Pruning is important when multipath is
- * enabled because we want to prevent the allocation algorithms from sending anything
- * out on known dead paths. Additionally, quickly marking paths as dead helps when
- * a new path is learned and needs to replace an older path.
+ * How often the service tests the path throughput
*/
-#define ZT_CLOSED_PATH_PRUNING_INTERVAL 1000
+#define ZT_PATH_THROUGHPUT_MEASUREMENT_INTERVAL ZT_PATH_ACK_INTERVAL * 8
/**
- * Datagram used to test link throughput. Contents are random.
+ * Minimum amount of time between each ACK packet
*/
-#define ZT_LINK_TEST_DATAGRAM_SZ 1024
+#define ZT_PATH_ACK_INTERVAL 250
/**
- * Size of datagram expected as a reply to a link speed test
+ * How often a QoS packet is sent
*/
-#define ZT_LINK_TEST_DATAGRAM_RESPONSE_SZ 8
+#define ZT_PATH_QOS_INTERVAL 1000
/**
- * Time before a link test datagram is considered lost. Any corresponding
- * timing records that would have been used to compute a RTT are purged.
+ * How often an aggregate link statistics report is emitted into this tracing system
*/
-#define ZT_LINK_TEST_TIMEOUT 10000
+#define ZT_PATH_AGGREGATE_STATS_REPORT_INTERVAL 60000
/**
- * How often the service tests the link throughput.
+ * How much an aggregate link's component paths can vary from their target allocation
+ * before the link is considered to be in a state of imbalance.
*/
-#define ZT_LINK_SPEED_TEST_INTERVAL 1000
+#define ZT_PATH_IMBALANCE_THRESHOLD 0.20
/**
* How frequently to send heartbeats over in-use paths
diff --git a/node/IncomingPacket.cpp b/node/IncomingPacket.cpp
index 8f6dda63..e7227412 100644
--- a/node/IncomingPacket.cpp
+++ b/node/IncomingPacket.cpp
@@ -80,7 +80,7 @@ bool IncomingPacket::tryDecode(const RuntimeEnvironment *RR,void *tPtr)
if (!trusted) {
if (!dearmor(peer->key())) {
RR->t->incomingPacketMessageAuthenticationFailure(tPtr,_path,packetId(),sourceAddress,hops(),"invalid MAC");
- _path->recordPacket(false);
+ _path->recordInvalidPacket();
return true;
}
}
@@ -90,15 +90,15 @@ bool IncomingPacket::tryDecode(const RuntimeEnvironment *RR,void *tPtr)
return true;
}
- _path->recordPacket(true);
-
const Packet::Verb v = verb();
switch(v) {
//case Packet::VERB_NOP:
default: // ignore unknown verbs, but if they pass auth check they are "received"
- peer->received(tPtr,_path,hops(),packetId(),v,0,Packet::VERB_NOP,false,0);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),v,0,Packet::VERB_NOP,false,0);
return true;
case Packet::VERB_HELLO: return _doHELLO(RR,tPtr,true);
+ case Packet::VERB_ACK: return _doACK(RR,tPtr,peer);
+ case Packet::VERB_QOS_MEASUREMENT: return _doQOS_MEASUREMENT(RR,tPtr,peer);
case Packet::VERB_ERROR: return _doERROR(RR,tPtr,peer);
case Packet::VERB_OK: return _doOK(RR,tPtr,peer);
case Packet::VERB_WHOIS: return _doWHOIS(RR,tPtr,peer);
@@ -197,11 +197,55 @@ bool IncomingPacket::_doERROR(const RuntimeEnvironment *RR,void *tPtr,const Shar
default: break;
}
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_ERROR,inRePacketId,inReVerb,false,networkId);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_ERROR,inRePacketId,inReVerb,false,networkId);
+
+ return true;
+}
+
+bool IncomingPacket::_doACK(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr<Peer> &peer)
+{
+ /* Dissect incoming ACK packet. From this we can estimate current throughput of the path, establish known
+ * maximums and detect packet loss. */
+
+ if (RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) {
+ int32_t ackedBytes;
+ memcpy(&ackedBytes, payload(), sizeof(int32_t));
+ _path->receivedAck(RR->node->now(), Utils::ntoh(ackedBytes));
+ }
return true;
}
+bool IncomingPacket::_doQOS_MEASUREMENT(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr<Peer> &peer)
+{
+
+ /* Dissect incoming QoS packet. From this we can compute latency values and their variance.
+ * The latency variance is used as a measure of "jitter". */
+
+ if (RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) {
+ if (payloadLength() < ZT_PATH_MAX_QOS_PACKET_SZ && payloadLength() > ZT_PATH_MIN_QOS_PACKET_SZ) {
+ const int64_t now = RR->node->now();
+ uint64_t rx_id[ZT_PATH_QOS_TABLE_SIZE];
+ uint8_t rx_ts[ZT_PATH_QOS_TABLE_SIZE];
+ char *begin = (char *)payload();
+ char *ptr = begin;
+ int count = 0;
+ int len = payloadLength();
+ // Read packet IDs and latency compensation intervals for each packet tracked by thie QoS packet
+ while (ptr < (begin + len)) {
+ memcpy((void*)&rx_id[count], ptr, sizeof(uint64_t));
+ rx_id[count] = Utils::ntoh(rx_id[count]);
+ ptr+=sizeof(uint64_t);
+ memcpy((void*)&rx_ts[count], ptr, sizeof(uint8_t));
+ ptr+=sizeof(uint8_t);
+ count++;
+ }
+ _path->receivedQoS(now, count, rx_id, rx_ts);
+ }
+ }
+ return true;
+}
+
bool IncomingPacket::_doHELLO(const RuntimeEnvironment *RR,void *tPtr,const bool alreadyAuthenticated)
{
const int64_t now = RR->node->now();
@@ -398,7 +442,7 @@ bool IncomingPacket::_doHELLO(const RuntimeEnvironment *RR,void *tPtr,const bool
_path->send(RR,tPtr,outp.data(),outp.size(),now);
peer->setRemoteVersion(protoVersion,vMajor,vMinor,vRevision); // important for this to go first so received() knows the version
- peer->received(tPtr,_path,hops(),pid,Packet::VERB_HELLO,0,Packet::VERB_NOP,false,0);
+ peer->received(tPtr,_path,hops(),pid,payloadLength(),Packet::VERB_HELLO,0,Packet::VERB_NOP,false,0);
return true;
}
@@ -448,8 +492,9 @@ bool IncomingPacket::_doOK(const RuntimeEnvironment *RR,void *tPtr,const SharedP
}
}
- if (!hops())
+ if (!hops() && (RR->node->getMultipathMode() != ZT_MULTIPATH_NONE)) {
_path->updateLatency((unsigned int)latency, RR->node->now());
+ }
peer->setRemoteVersion(vProto,vMajor,vMinor,vRevision);
@@ -510,7 +555,7 @@ bool IncomingPacket::_doOK(const RuntimeEnvironment *RR,void *tPtr,const SharedP
default: break;
}
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_OK,inRePacketId,inReVerb,false,networkId);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_OK,inRePacketId,inReVerb,false,networkId);
return true;
}
@@ -545,7 +590,7 @@ bool IncomingPacket::_doWHOIS(const RuntimeEnvironment *RR,void *tPtr,const Shar
_path->send(RR,tPtr,outp.data(),outp.size(),RR->node->now());
}
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_WHOIS,0,Packet::VERB_NOP,false,0);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_WHOIS,0,Packet::VERB_NOP,false,0);
return true;
}
@@ -569,7 +614,7 @@ bool IncomingPacket::_doRENDEZVOUS(const RuntimeEnvironment *RR,void *tPtr,const
}
}
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_RENDEZVOUS,0,Packet::VERB_NOP,false,0);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_RENDEZVOUS,0,Packet::VERB_NOP,false,0);
return true;
}
@@ -598,7 +643,7 @@ bool IncomingPacket::_doFRAME(const RuntimeEnvironment *RR,void *tPtr,const Shar
_sendErrorNeedCredentials(RR,tPtr,peer,nwid);
}
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_FRAME,0,Packet::VERB_NOP,trustEstablished,nwid);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_FRAME,0,Packet::VERB_NOP,trustEstablished,nwid);
return true;
}
@@ -621,7 +666,7 @@ bool IncomingPacket::_doEXT_FRAME(const RuntimeEnvironment *RR,void *tPtr,const
if (!network->gate(tPtr,peer)) {
RR->t->incomingNetworkAccessDenied(tPtr,network,_path,packetId(),size(),peer->address(),Packet::VERB_EXT_FRAME,true);
_sendErrorNeedCredentials(RR,tPtr,peer,nwid);
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,false,nwid);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,false,nwid);
return true;
}
@@ -633,7 +678,7 @@ bool IncomingPacket::_doEXT_FRAME(const RuntimeEnvironment *RR,void *tPtr,const
const uint8_t *const frameData = (const uint8_t *)field(comLen + ZT_PROTO_VERB_EXT_FRAME_IDX_PAYLOAD,frameLen);
if ((!from)||(from == network->mac())) {
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
return true;
}
@@ -644,19 +689,19 @@ bool IncomingPacket::_doEXT_FRAME(const RuntimeEnvironment *RR,void *tPtr,const
network->learnBridgeRoute(from,peer->address());
} else {
RR->t->incomingNetworkFrameDropped(tPtr,network,_path,packetId(),size(),peer->address(),Packet::VERB_EXT_FRAME,from,to,"bridging not allowed (remote)");
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
return true;
}
} else if (to != network->mac()) {
if (to.isMulticast()) {
if (network->config().multicastLimit == 0) {
RR->t->incomingNetworkFrameDropped(tPtr,network,_path,packetId(),size(),peer->address(),Packet::VERB_EXT_FRAME,from,to,"multicast disabled");
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
return true;
}
} else if (!network->config().permitsBridging(RR->identity.address())) {
RR->t->incomingNetworkFrameDropped(tPtr,network,_path,packetId(),size(),peer->address(),Packet::VERB_EXT_FRAME,from,to,"bridging not allowed (local)");
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
return true;
}
}
@@ -676,9 +721,9 @@ bool IncomingPacket::_doEXT_FRAME(const RuntimeEnvironment *RR,void *tPtr,const
_path->send(RR,tPtr,outp.data(),outp.size(),RR->node->now());
}
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,true,nwid);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,true,nwid);
} else {
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,false,nwid);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,false,nwid);
}
return true;
@@ -698,7 +743,7 @@ bool IncomingPacket::_doECHO(const RuntimeEnvironment *RR,void *tPtr,const Share
outp.armor(peer->key(),true);
_path->send(RR,tPtr,outp.data(),outp.size(),RR->node->now());
- peer->received(tPtr,_path,hops(),pid,Packet::VERB_ECHO,0,Packet::VERB_NOP,false,0);
+ peer->received(tPtr,_path,hops(),pid,payloadLength(),Packet::VERB_ECHO,0,Packet::VERB_NOP,false,0);
return true;
}
@@ -743,7 +788,7 @@ bool IncomingPacket::_doMULTICAST_LIKE(const RuntimeEnvironment *RR,void *tPtr,c
}
}
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_MULTICAST_LIKE,0,Packet::VERB_NOP,trustEstablished,(network) ? network->id() : 0);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_MULTICAST_LIKE,0,Packet::VERB_NOP,trustEstablished,(network) ? network->id() : 0);
return true;
}
@@ -866,7 +911,7 @@ bool IncomingPacket::_doNETWORK_CREDENTIALS(const RuntimeEnvironment *RR,void *t
}
}
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_NETWORK_CREDENTIALS,0,Packet::VERB_NOP,trustEstablished,(network) ? network->id() : 0);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_NETWORK_CREDENTIALS,0,Packet::VERB_NOP,trustEstablished,(network) ? network->id() : 0);
return true;
}
@@ -892,7 +937,7 @@ bool IncomingPacket::_doNETWORK_CONFIG_REQUEST(const RuntimeEnvironment *RR,void
_path->send(RR,tPtr,outp.data(),outp.size(),RR->node->now());
}
- peer->received(tPtr,_path,hopCount,requestPacketId,Packet::VERB_NETWORK_CONFIG_REQUEST,0,Packet::VERB_NOP,false,nwid);
+ peer->received(tPtr,_path,hopCount,requestPacketId,payloadLength(),Packet::VERB_NETWORK_CONFIG_REQUEST,0,Packet::VERB_NOP,false,nwid);
return true;
}
@@ -913,7 +958,7 @@ bool IncomingPacket::_doNETWORK_CONFIG(const RuntimeEnvironment *RR,void *tPtr,c
}
}
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_NETWORK_CONFIG,0,Packet::VERB_NOP,false,(network) ? network->id() : 0);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_NETWORK_CONFIG,0,Packet::VERB_NOP,false,(network) ? network->id() : 0);
return true;
}
@@ -956,7 +1001,7 @@ bool IncomingPacket::_doMULTICAST_GATHER(const RuntimeEnvironment *RR,void *tPtr
}
}
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_MULTICAST_GATHER,0,Packet::VERB_NOP,trustEstablished,nwid);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_MULTICAST_GATHER,0,Packet::VERB_NOP,trustEstablished,nwid);
return true;
}
@@ -982,7 +1027,7 @@ bool IncomingPacket::_doMULTICAST_FRAME(const RuntimeEnvironment *RR,void *tPtr,
if (!network->gate(tPtr,peer)) {
RR->t->incomingNetworkAccessDenied(tPtr,network,_path,packetId(),size(),peer->address(),Packet::VERB_MULTICAST_FRAME,true);
_sendErrorNeedCredentials(RR,tPtr,peer,nwid);
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,false,nwid);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,false,nwid);
return true;
}
@@ -1006,19 +1051,19 @@ bool IncomingPacket::_doMULTICAST_FRAME(const RuntimeEnvironment *RR,void *tPtr,
if (network->config().multicastLimit == 0) {
RR->t->incomingNetworkFrameDropped(tPtr,network,_path,packetId(),size(),peer->address(),Packet::VERB_MULTICAST_FRAME,from,to.mac(),"multicast disabled");
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,false,nwid);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,false,nwid);
return true;
}
if ((frameLen > 0)&&(frameLen <= ZT_MAX_MTU)) {
if (!to.mac().isMulticast()) {
RR->t->incomingPacketInvalid(tPtr,_path,packetId(),source(),hops(),Packet::VERB_MULTICAST_FRAME,"destination not multicast");
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
return true;
}
if ((!from)||(from.isMulticast())||(from == network->mac())) {
RR->t->incomingPacketInvalid(tPtr,_path,packetId(),source(),hops(),Packet::VERB_MULTICAST_FRAME,"invalid source MAC");
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
return true;
}
@@ -1032,7 +1077,7 @@ bool IncomingPacket::_doMULTICAST_FRAME(const RuntimeEnvironment *RR,void *tPtr,
network->learnBridgeRoute(from,peer->address());
} else {
RR->t->incomingNetworkFrameDropped(tPtr,network,_path,packetId(),size(),peer->address(),Packet::VERB_MULTICAST_FRAME,from,to.mac(),"bridging not allowed (remote)");
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
return true;
}
}
@@ -1055,10 +1100,10 @@ bool IncomingPacket::_doMULTICAST_FRAME(const RuntimeEnvironment *RR,void *tPtr,
}
}
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,true,nwid);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,true,nwid);
} else {
_sendErrorNeedCredentials(RR,tPtr,peer,nwid);
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,false,nwid);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,false,nwid);
}
return true;
@@ -1070,7 +1115,7 @@ bool IncomingPacket::_doPUSH_DIRECT_PATHS(const RuntimeEnvironment *RR,void *tPt
// First, subject this to a rate limit
if (!peer->rateGatePushDirectPaths(now)) {
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_PUSH_DIRECT_PATHS,0,Packet::VERB_NOP,false,0);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_PUSH_DIRECT_PATHS,0,Packet::VERB_NOP,false,0);
return true;
}
@@ -1094,7 +1139,7 @@ bool IncomingPacket::_doPUSH_DIRECT_PATHS(const RuntimeEnvironment *RR,void *tPt
case 4: {
const InetAddress a(field(ptr,4),4,at<uint16_t>(ptr + 4));
if (
- ((flags & ZT_PUSH_DIRECT_PATHS_FLAG_FORGET_PATH) == 0) && // not being told to forget
+ ((flags & ZT_PUSH_DIRECT_PATHS_FLAG_FORGET_PATH) == 0) && // not being told to forget
(!( ((flags & ZT_PUSH_DIRECT_PATHS_FLAG_CLUSTER_REDIRECT) == 0) && (peer->hasActivePathTo(now,a)) )) && // not already known
(RR->node->shouldUsePathForZeroTierTraffic(tPtr,peer->address(),_path->localSocket(),a)) ) // should use path
{
@@ -1108,7 +1153,7 @@ bool IncomingPacket::_doPUSH_DIRECT_PATHS(const RuntimeEnvironment *RR,void *tPt
case 6: {
const InetAddress a(field(ptr,16),16,at<uint16_t>(ptr + 16));
if (
- ((flags & ZT_PUSH_DIRECT_PATHS_FLAG_FORGET_PATH) == 0) && // not being told to forget
+ ((flags & ZT_PUSH_DIRECT_PATHS_FLAG_FORGET_PATH) == 0) && // not being told to forget
(!( ((flags & ZT_PUSH_DIRECT_PATHS_FLAG_CLUSTER_REDIRECT) == 0) && (peer->hasActivePathTo(now,a)) )) && // not already known
(RR->node->shouldUsePathForZeroTierTraffic(tPtr,peer->address(),_path->localSocket(),a)) ) // should use path
{
@@ -1123,7 +1168,7 @@ bool IncomingPacket::_doPUSH_DIRECT_PATHS(const RuntimeEnvironment *RR,void *tPt
ptr += addrLen;
}
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_PUSH_DIRECT_PATHS,0,Packet::VERB_NOP,false,0);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_PUSH_DIRECT_PATHS,0,Packet::VERB_NOP,false,0);
return true;
}
@@ -1139,7 +1184,7 @@ bool IncomingPacket::_doUSER_MESSAGE(const RuntimeEnvironment *RR,void *tPtr,con
RR->node->postEvent(tPtr,ZT_EVENT_USER_MESSAGE,reinterpret_cast<const void *>(&um));
}
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_USER_MESSAGE,0,Packet::VERB_NOP,false,0);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_USER_MESSAGE,0,Packet::VERB_NOP,false,0);
return true;
}
@@ -1163,7 +1208,7 @@ bool IncomingPacket::_doREMOTE_TRACE(const RuntimeEnvironment *RR,void *tPtr,con
}
}
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_REMOTE_TRACE,0,Packet::VERB_NOP,false,0);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_REMOTE_TRACE,0,Packet::VERB_NOP,false,0);
return true;
}
diff --git a/node/IncomingPacket.hpp b/node/IncomingPacket.hpp
index 88f4f066..9144277c 100644
--- a/node/IncomingPacket.hpp
+++ b/node/IncomingPacket.hpp
@@ -125,6 +125,8 @@ private:
// been authenticated, decrypted, decompressed, and classified.
bool _doERROR(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr<Peer> &peer);
bool _doHELLO(const RuntimeEnvironment *RR,void *tPtr,const bool alreadyAuthenticated);
+ bool _doACK(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr<Peer> &peer);
+ bool _doQOS_MEASUREMENT(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr<Peer> &peer);
bool _doOK(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr<Peer> &peer);
bool _doWHOIS(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr<Peer> &peer);
bool _doRENDEZVOUS(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr<Peer> &peer);
diff --git a/node/Packet.hpp b/node/Packet.hpp
index 27da6fb5..6869691e 100644
--- a/node/Packet.hpp
+++ b/node/Packet.hpp
@@ -419,7 +419,7 @@ public:
template<unsigned int C2>
Fragment(const Buffer<C2> &b) :
- Buffer<ZT_PROTO_MAX_PACKET_LENGTH>(b)
+ Buffer<ZT_PROTO_MAX_PACKET_LENGTH>(b)
{
}
@@ -930,7 +930,53 @@ public:
*/
VERB_PUSH_DIRECT_PATHS = 0x10,
- // 0x11, 0x12 -- deprecated
+ // 0x11 -- deprecated
+
+ /**
+ * An acknowledgement of receipt of a series of recent packets from another
+ * peer. This is used to calculate relative throughput values and to detect
+ * packet loss. Only VERB_FRAME and VERB_EXT_FRAME packets are counted.
+ *
+ * ACK response format:
+ * <[4] 32-bit number of bytes received since last ACK>
+ *
+ * Upon receipt of this packet, the local peer will verify that the correct
+ * number of bytes were received by the remote peer. If these values do
+ * not agree that could be an indicator of packet loss.
+ *
+ * Additionally, the local peer knows the interval of time that has
+ * elapsed since the last received ACK. With this information it can compute
+ * a rough estimate of the current throughput.
+ *
+ * This is sent at a maximum rate of once per every ZT_PATH_ACK_INTERVAL
+ */
+ VERB_ACK = 0x12,
+
+ /**
+ * A packet containing timing measurements useful for estimating path quality.
+ * Composed of a list of <packet ID:internal sojourn time> pairs for an
+ * arbitrary set of recent packets. This is used to sample for latency and
+ * packet delay variance (PDV, "jitter").
+ *
+ * QoS record format:
+ *
+ * <[8] 64-bit packet ID of previously-received packet>
+ * <[1] 8-bit packet sojourn time>
+ * <...repeat until end of max 1400 byte packet...>
+ *
+ * The number of possible records per QoS packet is: (1400 * 8) / 72 = 155
+ * This packet should be sent very rarely (every few seconds) as it can be
+ * somewhat large if the connection is saturated. Future versions might use
+ * a bloom table to probablistically determine these values in a vastly
+ * more space-efficient manner.
+ *
+ * Note: The 'internal packet sojourn time' is a slight misnomer as it is a
+ * measure of the amount of time between when a packet was received and the
+ * egress time of its tracking QoS packet.
+ *
+ * This is sent at a maximum rate of once per every ZT_PATH_QOS_INTERVAL
+ */
+ VERB_QOS_MEASUREMENT = 0x13,
/**
* A message with arbitrary user-definable content:
@@ -999,7 +1045,7 @@ public:
template<unsigned int C2>
Packet(const Buffer<C2> &b) :
- Buffer<ZT_PROTO_MAX_PACKET_LENGTH>(b)
+ Buffer<ZT_PROTO_MAX_PACKET_LENGTH>(b)
{
}
diff --git a/node/Path.hpp b/node/Path.hpp
index e6bcecf0..0278d919 100644
--- a/node/Path.hpp
+++ b/node/Path.hpp
@@ -102,24 +102,23 @@ public:
_latency(0xffff),
_addr(),
_ipScope(InetAddress::IP_SCOPE_NONE),
- _currentPacketSampleCounter(0),
- _meanPacketErrorRatio(0.0),
- _meanLatency(0.0),
- _lastLatencyUpdate(0),
- _jitter(0.0),
- _lastPathQualitySampleTime(0),
- _lastComputedQuality(0.0),
- _lastPathQualityEstimate(0),
- _meanAge(0.0),
+ _lastAck(0),
+ _lastThroughputEstimation(0),
+ _lastQoSMeasurement(0),
+ _unackedBytes(0),
+ _expectingAckAsOf(0),
+ _packetsReceivedSinceLastAck(0),
_meanThroughput(0.0),
- _packetLossRatio(0)
+ _maxLifetimeThroughput(0),
+ _bytesAckedSinceLastThroughputEstimation(0),
+ _meanLatency(0.0),
+ _packetDelayVariance(0.0),
+ _packetErrorRatio(0.0),
+ _packetLossRatio(0),
+ _lastComputedStability(0.0),
+ _lastComputedRelativeQuality(0)
{
- memset(_ifname, 0, sizeof(_ifname));
- memset(_addrString, 0, sizeof(_addrString));
- _throughputSamples = new RingBuffer<uint64_t>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
- _ageSamples = new RingBuffer<uint64_t>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
- _latencySamples = new RingBuffer<uint32_t>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
- _errSamples = new RingBuffer<float>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
+ prepareBuffers();
}
Path(const int64_t localSocket,const InetAddress &addr) :
@@ -131,37 +130,35 @@ public:
_latency(0xffff),
_addr(addr),
_ipScope(addr.ipScope()),
- _currentPacketSampleCounter(0),
- _meanPacketErrorRatio(0.0),
- _meanLatency(0.0),
- _lastLatencyUpdate(0),
- _jitter(0.0),
- _lastPathQualitySampleTime(0),
- _lastComputedQuality(0.0),
- _lastPathQualityEstimate(0),
- _meanAge(0.0),
+ _lastAck(0),
+ _lastThroughputEstimation(0),
+ _lastQoSMeasurement(0),
+ _unackedBytes(0),
+ _expectingAckAsOf(0),
+ _packetsReceivedSinceLastAck(0),
_meanThroughput(0.0),
- _packetLossRatio(0)
+ _maxLifetimeThroughput(0),
+ _bytesAckedSinceLastThroughputEstimation(0),
+ _meanLatency(0.0),
+ _packetDelayVariance(0.0),
+ _packetErrorRatio(0.0),
+ _packetLossRatio(0),
+ _lastComputedStability(0.0),
+ _lastComputedRelativeQuality(0)
{
- memset(_ifname, 0, sizeof(_ifname));
- memset(_addrString, 0, sizeof(_addrString));
- _throughputSamples = new RingBuffer<uint64_t>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
- _ageSamples = new RingBuffer<uint64_t>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
- _latencySamples = new RingBuffer<uint32_t>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
- _errSamples = new RingBuffer<float>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
+ prepareBuffers();
}
~Path()
{
delete _throughputSamples;
- delete _ageSamples;
delete _latencySamples;
- delete _errSamples;
-
+ delete _qualitySamples;
+ delete _packetValiditySamples;
_throughputSamples = NULL;
- _ageSamples = NULL;
_latencySamples = NULL;
- _errSamples = NULL;
+ _qualitySamples = NULL;
+ _packetValiditySamples = NULL;
}
/**
@@ -209,7 +206,6 @@ public:
else {
_latency = l;
}
- _lastLatencyUpdate = now;
_latencySamples->push(l);
}
@@ -299,194 +295,273 @@ public:
}
/**
- * @return An estimate of path quality -- higher is better.
+ * Take note that we're expecting a VERB_ACK on this path as of a specific time
+ *
+ * @param now Current time
+ * @param packetId ID of the packet
+ * @param payloadLength Number of bytes we're is expecting a reply to
*/
- inline float computeQuality(const int64_t now)
+ inline void expectingAck(int64_t now, int64_t packetId, uint16_t payloadLength)
{
- float latency_contrib = _meanLatency ? (float)1.0 / _meanLatency : 0;
- float jitter_contrib = _jitter ? (float)1.0 / _jitter : 0;
- float throughput_contrib = _meanThroughput ? _meanThroughput / 1000000 : 0; // in Mbps
- float age_contrib = _meanAge > 0 ? (float)sqrt(_meanAge) : 1;
- float error_contrib = (float)1.0 - _meanPacketErrorRatio;
- float sum = (latency_contrib + jitter_contrib + throughput_contrib + error_contrib) / age_contrib;
- _lastComputedQuality = sum * (long)((_ipScope) + 1);
- return _lastComputedQuality;
+ _expectingAckAsOf = ackAge(now) > ZT_PATH_ACK_INTERVAL ? _expectingAckAsOf : now;
+ _unackedBytes += payloadLength;
+ _outgoingPacketRecords[packetId] = now;
}
/**
- * Since quality estimates can become expensive we should cache the most recent result for traffic allocation
- * algorithms which may need to reference this value multiple times through the course of their execution.
+ * Record that we've received a VERB_ACK on this path, also compute throughput if required.
+ *
+ * @param now Current time
+ * @param ackedBytes Number of bytes awknowledged by other peer
*/
- inline float lastComputedQuality() {
- return _lastComputedQuality;
+ inline void receivedAck(int64_t now, int32_t ackedBytes)
+ {
+ _expectingAckAsOf = 0;
+ _unackedBytes = (ackedBytes > _unackedBytes) ? 0 : _unackedBytes - ackedBytes;
+ int64_t timeSinceThroughputEstimate = (now - _lastThroughputEstimation);
+ if (timeSinceThroughputEstimate >= ZT_PATH_THROUGHPUT_MEASUREMENT_INTERVAL) {
+ uint64_t throughput = (float)(_bytesAckedSinceLastThroughputEstimation) / ((float)timeSinceThroughputEstimate / (float)1000);
+ _throughputSamples->push(throughput);
+ _maxLifetimeThroughput = throughput > _maxLifetimeThroughput ? throughput : _maxLifetimeThroughput;
+ _lastThroughputEstimation = now;
+ _bytesAckedSinceLastThroughputEstimation = 0;
+ } else {
+ _bytesAckedSinceLastThroughputEstimation += ackedBytes;
+ }
}
/**
- * @return A pointer to a cached copy of the human-readable name of the interface this Path's localSocket is bound to
+ * @return Number of bytes this peer is responsible for ACKing since last ACK
*/
- inline char *getName() { return _ifname; }
+ inline int32_t bytesToAck()
+ {
+ int32_t bytesToAck = 0;
+ for (int i=0; i<_packetsReceivedSinceLastAck; i++) {
+ bytesToAck += _recorded_len[i];
+ }
+ return bytesToAck;
+ }
/**
- * @return Estimated throughput in bps of this link
+ * @return Number of bytes thusfar sent that have not been awknowledged by the remote peer
*/
- inline uint64_t getThroughput() { return _phy->getThroughput((PhySocket *)((uintptr_t)_localSocket)); }
+ inline int64_t unackedSentBytes()
+ {
+ return _unackedBytes;
+ }
/**
- * @return Packet delay varience
+ * Account for the fact that an ACK was just sent. Reset counters, timers, and clear statistics buffers
+ *
+ * @param Current time
*/
- inline float jitter() { return _jitter; }
+ inline void sentAck(int64_t now)
+ {
+ memset(_recorded_id, 0, sizeof(_recorded_id));
+ memset(_recorded_ts, 0, sizeof(_recorded_ts));
+ memset(_recorded_len, 0, sizeof(_recorded_len));
+ _packetsReceivedSinceLastAck = 0;
+ _lastAck = now;
+ }
/**
- * @return Previously-computed mean latency
+ * Receive QoS data, match with recorded egress times from this peer, compute latency
+ * estimates.
+ *
+ * @param now Current time
+ * @param count Number of records
+ * @param rx_id table of packet IDs
+ * @param rx_ts table of holding times
*/
- inline float meanLatency() { return _meanLatency; }
+ inline void receivedQoS(int64_t now, int count, uint64_t *rx_id, uint8_t *rx_ts)
+ {
+ // Look up egress times and compute latency values for each record
+ for (int j=0; j<count; j++) {
+ std::map<uint64_t,uint64_t>::iterator it = _outgoingPacketRecords.find(rx_id[j]);
+ if (it != _outgoingPacketRecords.end()) {
+ uint16_t rtt = (uint16_t)(now - it->second);
+ uint16_t rtt_compensated = rtt - rx_ts[j];
+ float latency = rtt_compensated / 2.0;
+ updateLatency(latency, now);
+ _outgoingPacketRecords.erase(it);
+ }
+ }
+ }
/**
- * @return Packet loss rate
+ * Generate the contents of a VERB_QOS_MEASUREMENT packet.
+ *
+ * @param now Current time
+ * @param qosBuffer destination buffer
+ * @return Size of payload
*/
- inline float packetLossRatio() { return _packetLossRatio; }
+ inline int32_t generateQoSPacket(int64_t now, char *qosBuffer)
+ {
+ int32_t len = 0;
+ for (int i=0; i<_packetsReceivedSinceLastAck; i++) {
+ uint64_t id = _recorded_id[i];
+ memcpy(qosBuffer, &id, sizeof(uint64_t));
+ qosBuffer+=sizeof(uint64_t);
+ uint8_t holdingTime = (uint8_t)(now - _recorded_ts[i]);
+ memcpy(qosBuffer, &holdingTime, sizeof(uint8_t));
+ qosBuffer+=sizeof(uint8_t);
+ len+=sizeof(uint64_t)+sizeof(uint8_t);
+ }
+ return len;
+ }
/**
- * @return Mean packet error ratio
+ * Account for the fact that a VERB_QOS_MEASUREMENT was just sent. Reset timers.
+ *
+ * @param Current time
*/
- inline float meanPacketErrorRatio() { return _meanPacketErrorRatio; }
+ inline void sentQoS(int64_t now) { _lastQoSMeasurement = now; }
/**
- * @return Current packet error ratio (possibly incomplete sample set)
+ * Record statistics on incoming packets. Used later to estimate QoS.
+ *
+ * @param now Current time
+ * @param packetId
+ * @param payloadLength
*/
- inline float currentPacketErrorRatio() {
- int errorsPerSample = 0;
- for (int i=0; i<_currentPacketSampleCounter; i++) {
- if (_packetValidity[i] == false) {
- errorsPerSample++;
- }
- }
- return (float)errorsPerSample / (float)ZT_PATH_ERROR_SAMPLE_WIN_SZ;
+ inline void recordIncomingPacket(int64_t now, int64_t packetId, int32_t payloadLength)
+ {
+ _recorded_ts[_packetsReceivedSinceLastAck] = now;
+ _recorded_id[_packetsReceivedSinceLastAck] = packetId;
+ _recorded_len[_packetsReceivedSinceLastAck] = payloadLength;
+ _packetsReceivedSinceLastAck++;
+ _packetValiditySamples->push(true);
}
/**
- * @return Whether the Path's local socket is in a CLOSED state
+ * @param now Current time
+ * @return Whether an ACK (VERB_ACK) packet needs to be emitted at this time
*/
- inline bool isClosed() { return _phy->isClosed((PhySocket *)((uintptr_t)_localSocket)); }
+ inline bool needsToSendAck(int64_t now) {
+ return ((now - _lastAck) >= ZT_PATH_ACK_INTERVAL ||
+ (_packetsReceivedSinceLastAck == ZT_PATH_QOS_TABLE_SIZE)) && _packetsReceivedSinceLastAck;
+ }
/**
- * @return The state of a Path's local socket
+ * @param now Current time
+ * @return Whether a QoS (VERB_QOS_MEASUREMENT) packet needs to be emitted at this time
*/
- inline int getState() { return _phy->getState((PhySocket *)((uintptr_t)_localSocket)); }
+ inline bool needsToSendQoS(int64_t now) {
+ return ((_packetsReceivedSinceLastAck >= ZT_PATH_QOS_TABLE_SIZE) ||
+ ((now - _lastQoSMeasurement) > ZT_PATH_QOS_INTERVAL)) && _packetsReceivedSinceLastAck;
+ }
/**
- * @return Whether this socket may have been erased by the virtual physical link layer
+ * How much time has elapsed since we've been expecting a VERB_ACK on this path. This value
+ * is used to determine a more relevant path "age". This lets us penalize paths which are no
+ * longer ACKing, but not those that simple aren't being used to carry traffic at the
+ * current time.
*/
- inline bool isValidState() { return _phy->isValidState((PhySocket *)((uintptr_t)_localSocket)); }
+ inline int64_t ackAge(int64_t now) { return _expectingAckAsOf ? now - _expectingAckAsOf : 0; }
/**
- * @return Whether the path quality monitors have collected enough data to provide a quality value
- * TODO: expand this
+ * The maximum observed throughput for this path
*/
- inline bool monitorsReady() {
- return _latencySamples->count() && _ageSamples->count() && _throughputSamples->count();
- }
+ inline uint64_t maxLifetimeThroughput() { return _maxLifetimeThroughput; }
/**
- * @return A pointer to a cached copy of the address string for this Path (For debugging only)
+ * @return The mean throughput (in bits/s) of this link
*/
- inline char *getAddressString() { return _addrString; }
+ inline float meanThroughput() { return _meanThroughput; }
/**
- * Handle path sampling, computation of quality estimates, and other periodic tasks
- * @param now Current time
+ * Assign a new relative quality value for this path in the aggregate link
+ *
+ * @param rq Quality of this path in comparison to other paths available to this peer
*/
- inline void measureLink(int64_t now) {
- // Sample path properties and store them in a continuously-revolving buffer
- if (now - _lastPathQualitySampleTime > ZT_PATH_QUALITY_SAMPLE_INTERVAL) {
- _lastPathQualitySampleTime = now;
- _throughputSamples->push(getThroughput()); // Thoughtput in bits/s
- _ageSamples->push(now - _lastIn); // Age (time since last received packet)
- if (now - _lastLatencyUpdate > ZT_PATH_LATENCY_SAMPLE_INTERVAL) {
- _lastLatencyUpdate = now;
- // Record 0 bp/s. Since we're using this to detect possible packet loss
- updateLatency(0, now);
- }
- }
- // Compute statistical values for use in link quality estimates
- if (now - _lastPathQualityComputeTime > ZT_PATH_QUALITY_COMPUTE_INTERVAL) {
- _lastPathQualityComputeTime = now;
- // Cache Path address string
- address().toString(_addrString);
- _phy->getIfName((PhySocket *)((uintptr_t)_localSocket), _ifname, ZT_PATH_INTERFACE_NAME_SZ); // Cache Interface name
- // Derived values
- if (_throughputSamples->count()) {
- _packetLossRatio = (float)_throughputSamples->zeroCount() / (float)_throughputSamples->count();
- }
- _meanThroughput = _throughputSamples->mean();
- _meanAge = _ageSamples->mean();
- _meanLatency = _latencySamples->mean();
- // Jitter
- // SEE: RFC 3393, RFC 4689
- _jitter = _latencySamples->stddev();
- _meanPacketErrorRatio = _errSamples->mean(); // Packet Error Ratio (PER)
- }
- // Periodically compute a path quality estimate
- if (now - _lastPathQualityEstimate > ZT_PATH_QUALITY_ESTIMATE_INTERVAL) {
- computeQuality(now);
- }
- }
+ inline void updateRelativeQuality(float rq) { _lastComputedRelativeQuality = rq; }
/**
- * @param buf Buffer to store resultant string
- * @return Description of path, in ASCII string format
+ * @return Quality of this path compared to others in the aggregate link
*/
- inline char *toString(char *buf) {
- sprintf(buf,"%6s, q=%8.3f, %5.3f Mb/s, j=%8.2f, ml=%8.2f, meanAge=%8.2f, addr=%45s",
- getName(),
- lastComputedQuality(),
- (float)meanThroughput() / (float)1000000,
- jitter(),
- meanLatency(),
- meanAge(),
- getAddressString());
- return buf;
- }
+ inline float relativeQuality() { return _lastComputedRelativeQuality; }
/**
- * Record whether a packet is considered invalid by MAC/compression/cipher checks. This
- * could be an indication of a bit error. This function will keep a running counter of
- * up to a given window size and with each counter overflow it will compute a mean error rate
- * and store that in a continuously shifting sample window.
- *
- * @param isValid Whether the packet in question is considered invalid
+ * @return Stability estimates can become expensive to compute, we cache the most recent result.
*/
- inline void recordPacket(bool isValid) {
- if (_currentPacketSampleCounter < ZT_PATH_ERROR_SAMPLE_WIN_SZ) {
- _packetValidity[_currentPacketSampleCounter] = isValid;
- _currentPacketSampleCounter++;
- }
- else {
- // Sample array is full, compute an mean and stick it in the ring buffer for trend analysis
- _errSamples->push(currentPacketErrorRatio());
- _currentPacketSampleCounter=0;
- }
- }
+ inline float lastComputedStability() { return _lastComputedStability; }
/**
- * @return The mean age (in ms) of this link
+ * @return A pointer to a cached copy of the human-readable name of the interface this Path's localSocket is bound to
*/
- inline float meanAge() { return _meanAge; }
+ inline char *getName() { return _ifname; }
/**
- * @return The mean throughput (in bits/s) of this link
+ * @return Packet delay varience
*/
- inline float meanThroughput() { return _meanThroughput; }
+ inline float packetDelayVariance() { return _packetDelayVariance; }
/**
- * @return True if this path is alive (receiving heartbeats)
+ * @return Previously-computed mean latency
*/
- inline bool alive(const int64_t now) const { return ((now - _lastIn) < (ZT_PATH_HEARTBEAT_PERIOD + 5000)); }
+ inline float meanLatency() { return _meanLatency; }
+
+ /**
+ * @return Packet loss rate (PLR)
+ */
+ inline float packetLossRatio() { return _packetLossRatio; }
+
+ /**
+ * @return Packet error ratio (PER)
+ */
+ inline float packetErrorRatio() { return _packetErrorRatio; }
+
+ /**
+ * Record an invalid incoming packet. This packet failed MAC/compression/cipher checks and will now
+ * contribute to a Packet Error Ratio (PER).
+ */
+ inline void recordInvalidPacket() { _packetValiditySamples->push(false); }
/**
- * @return True if this path hasn't received a packet in a "significant" amount of time
+ * @return A pointer to a cached copy of the address string for this Path (For debugging only)
*/
- inline bool stale(const int64_t now) const { return ((now - _lastIn) > ZT_LINK_SPEED_TEST_INTERVAL * 10); }
+ inline char *getAddressString() { return _addrString; }
+
+ /**
+ * Compute and cache stability and performance metrics. The resultant stability coefficint is a measure of how "well behaved"
+ * this path is. This figure is substantially different from (but required for the estimation of the path's overall "quality".
+ *
+ * @param now Current time
+ */
+ inline void processBackgroundPathMeasurements(int64_t now, const int64_t peerId) {
+ if (now - _lastPathQualityComputeTime > ZT_PATH_QUALITY_COMPUTE_INTERVAL) {
+ _lastPathQualityComputeTime = now;
+ _phy->getIfName((PhySocket *)((uintptr_t)_localSocket), _ifname, 16);
+ address().toString(_addrString);
+ _meanThroughput = _throughputSamples->mean();
+ _meanLatency = _latencySamples->mean();
+ _packetDelayVariance = _latencySamples->stddev(); // Similar to "jitter" (SEE: RFC 3393, RFC 4689)
+ // If no packet validity samples, assume PER==0
+ _packetErrorRatio = 1 - (_packetValiditySamples->count() ? _packetValiditySamples->mean() : 1);
+ // Compute path stability
+ // Normalize measurements with wildly different ranges into a reasonable range
+ float normalized_pdv = Utils::normalize(_packetDelayVariance, 0, ZT_PATH_MAX_PDV, 0, 10);
+ float normalized_la = Utils::normalize(_meanLatency, 0, ZT_PATH_MAX_MEAN_LATENCY, 0, 10);
+ float throughput_cv = _throughputSamples->mean() > 0 ? _throughputSamples->stddev() / _throughputSamples->mean() : 1;
+ // Form an exponential cutoff and apply contribution weights
+ float pdv_contrib = exp((-1)*normalized_pdv) * ZT_PATH_CONTRIB_PDV;
+ float latency_contrib = exp((-1)*normalized_la) * ZT_PATH_CONTRIB_LATENCY;
+ float throughput_disturbance_contrib = exp((-1)*throughput_cv) * ZT_PATH_CONTRIB_THROUGHPUT_DISTURBANCE;
+ // Obey user-defined ignored contributions
+ pdv_contrib = ZT_PATH_CONTRIB_PDV > 0.0 ? pdv_contrib : 1;
+ latency_contrib = ZT_PATH_CONTRIB_LATENCY > 0.0 ? latency_contrib : 1;
+ throughput_disturbance_contrib = ZT_PATH_CONTRIB_THROUGHPUT_DISTURBANCE > 0.0 ? throughput_disturbance_contrib : 1;
+ // Compute the quality product
+ _lastComputedStability = pdv_contrib + latency_contrib + throughput_disturbance_contrib;
+ _lastComputedStability *= 1 - _packetErrorRatio;
+ _qualitySamples->push(_lastComputedStability);
+ }
+ }
+
+ /**
+ * @return True if this path is alive (receiving heartbeats)
+ */
+ inline bool alive(const int64_t now) const { return ((now - _lastIn) < (ZT_PATH_HEARTBEAT_PERIOD + 5000)); }
/**
* @return True if this path needs a heartbeat
@@ -508,6 +583,21 @@ public:
*/
inline int64_t lastTrustEstablishedPacketReceived() const { return _lastTrustEstablishedPacketReceived; }
+ /**
+ * Initialize statistical buffers
+ */
+ inline void prepareBuffers() {
+ _throughputSamples = new RingBuffer<uint64_t>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
+ _latencySamples = new RingBuffer<uint32_t>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
+ _qualitySamples = new RingBuffer<float>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
+ _packetValiditySamples = new RingBuffer<bool>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
+ memset(_ifname, 0, 16);
+ memset(_recorded_id, 0, sizeof(_recorded_id));
+ memset(_recorded_ts, 0, sizeof(_recorded_ts));
+ memset(_recorded_len, 0, sizeof(_recorded_len));
+ memset(_addrString, 0, sizeof(_addrString));
+ }
+
private:
volatile int64_t _lastOut;
volatile int64_t _lastIn;
@@ -519,32 +609,42 @@ private:
InetAddress::IpScope _ipScope; // memoize this since it's a computed value checked often
AtomicCounter __refCount;
- // Packet Error Ratio (PER)
- int _packetValidity[ZT_PATH_ERROR_SAMPLE_WIN_SZ];
- int _currentPacketSampleCounter;
- volatile float _meanPacketErrorRatio;
+ uint64_t _recorded_id[ZT_PATH_QOS_TABLE_SIZE];
+ uint64_t _recorded_ts[ZT_PATH_QOS_TABLE_SIZE];
+ uint16_t _recorded_len[ZT_PATH_QOS_TABLE_SIZE];
- // Latency and Jitter
- volatile float _meanLatency;
- int64_t _lastLatencyUpdate;
- volatile float _jitter;
+ std::map<uint64_t, uint64_t> _outgoingPacketRecords;
+
+ int64_t _lastAck;
+ int64_t _lastThroughputEstimation;
+ int64_t _lastQoSMeasurement;
+
+ int64_t _unackedBytes;
+ int64_t _expectingAckAsOf;
+ int16_t _packetsReceivedSinceLastAck;
- int64_t _lastPathQualitySampleTime;
- float _lastComputedQuality;
- int64_t _lastPathQualityEstimate;
- float _meanAge;
float _meanThroughput;
+ uint64_t _maxLifetimeThroughput;
+ uint64_t _bytesAckedSinceLastThroughputEstimation;
- // Circular buffers used to efficiently store large time series
- RingBuffer<uint64_t> *_throughputSamples;
- RingBuffer<uint32_t> *_latencySamples;
- RingBuffer<uint64_t> *_ageSamples;
- RingBuffer<float> *_errSamples;
+ volatile float _meanLatency;
+ float _packetDelayVariance;
+ float _packetErrorRatio;
float _packetLossRatio;
- char _ifname[ZT_PATH_INTERFACE_NAME_SZ];
+ // cached estimates
+ float _lastComputedStability;
+ float _lastComputedRelativeQuality;
+
+ // cached human-readable strings for tracing purposes
+ char _ifname[16];
char _addrString[256];
+
+ RingBuffer<uint64_t> *_throughputSamples;
+ RingBuffer<uint32_t> *_latencySamples;
+ RingBuffer<float> *_qualitySamples;
+ RingBuffer<bool> *_packetValiditySamples;
};
} // namespace ZeroTier
diff --git a/node/Peer.cpp b/node/Peer.cpp
index c46ed751..8deaa362 100644
--- a/node/Peer.cpp
+++ b/node/Peer.cpp
@@ -24,8 +24,8 @@
* of your own application.
*/
-#include "../version.h"
+#include "../version.h"
#include "Constants.hpp"
#include "Peer.hpp"
#include "Node.hpp"
@@ -36,6 +36,7 @@
#include "Trace.hpp"
#include "InetAddress.hpp"
#include "RingBuffer.hpp"
+#include "Utils.hpp"
namespace ZeroTier {
@@ -61,13 +62,13 @@ Peer::Peer(const RuntimeEnvironment *renv,const Identity &myIdentity,const Ident
_id(peerIdentity),
_directPathPushCutoffCount(0),
_credentialsCutoffCount(0),
- _linkBalanceStatus(false),
- _linkRedundancyStatus(false)
+ _linkIsBalanced(false),
+ _linkIsRedundant(false),
+ _lastAggregateStatsReport(0)
{
if (!myIdentity.agree(peerIdentity,_key,ZT_PEER_SECRET_KEY_LENGTH))
throw ZT_EXCEPTION_INVALID_ARGUMENT;
_pathChoiceHist = new RingBuffer<int>(ZT_MULTIPATH_PROPORTION_WIN_SZ);
- _flowBalanceHist = new RingBuffer<float>(ZT_MULTIPATH_PROPORTION_WIN_SZ);
}
void Peer::received(
@@ -75,6 +76,7 @@ void Peer::received(
const SharedPtr<Path> &path,
const unsigned int hops,
const uint64_t packetId,
+ const unsigned int payloadLength,
const Packet::Verb verb,
const uint64_t inRePacketId,
const Packet::Verb inReVerb,
@@ -103,13 +105,13 @@ void Peer::received(
{
Mutex::Lock _l(_paths_m);
if (RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) {
- if ((now - _lastPathPrune) > ZT_CLOSED_PATH_PRUNING_INTERVAL) {
- _lastPathPrune = now;
- prunePaths();
+ recordIncomingPacket(tPtr, path, packetId, payloadLength, verb, now);
+ if (path->needsToSendQoS(now)) {
+ sendQOS_MEASUREMENT(tPtr, path, path->localSocket(), path->address(), now);
}
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i].p) {
- _paths[i].p->measureLink(now);
+ _paths[i].p->processBackgroundPathMeasurements(now, _id.address().toInt());
}
}
}
@@ -117,7 +119,6 @@ void Peer::received(
if (hops == 0) {
// If this is a direct packet (no hops), update existing paths or learn new ones
-
bool havePath = false;
{
Mutex::Lock _l(_paths_m);
@@ -164,6 +165,19 @@ void Peer::received(
}
}
+ // If we find a pre-existing path with the same address, just replace it.
+ // If we don't find anything we can replace, just use the replacePath that we previously decided on.
+ if (RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) {
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p) {
+ if ( _paths[i].p->address().ss_family == path->address().ss_family && _paths[i].p->address().ipsEqual2(path->address())) {
+ replacePath = i;
+ break;
+ }
+ }
+ }
+ }
+
if (replacePath != ZT_MAX_PEER_NETWORK_PATHS) {
if (verb == Packet::VERB_OK) {
RR->t->peerLearnedNewPath(tPtr,networkId,*this,path,packetId);
@@ -252,6 +266,117 @@ void Peer::received(
}
}
+void Peer::recordOutgoingPacket(const SharedPtr<Path> &path, const uint64_t packetId,
+ uint16_t payloadLength, const Packet::Verb verb, int64_t now)
+{
+ if (RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) {
+ if (verb == Packet::VERB_FRAME || verb == Packet::VERB_EXT_FRAME) {
+ path->expectingAck(now, packetId, payloadLength);
+ }
+ }
+}
+
+void Peer::recordIncomingPacket(void *tPtr, const SharedPtr<Path> &path, const uint64_t packetId,
+ uint16_t payloadLength, const Packet::Verb verb, int64_t now)
+{
+ if (verb == Packet::VERB_FRAME || verb == Packet::VERB_EXT_FRAME) {
+ if (path->needsToSendAck(now)) {
+ sendACK(tPtr, path, path->localSocket(), path->address(), now);
+ }
+ path->recordIncomingPacket(now, packetId, payloadLength);
+ }
+}
+
+float Peer::computeAggregateLinkRelativeQuality(int64_t now)
+{
+ float maxStability = 0;
+ float totalRelativeQuality = 0;
+ float maxThroughput = 1;
+ float maxScope = 0;
+ float relStability[ZT_MAX_PEER_NETWORK_PATHS];
+ float relThroughput[ZT_MAX_PEER_NETWORK_PATHS];
+ memset(&relStability, 0, sizeof(relStability));
+ memset(&relThroughput, 0, sizeof(relThroughput));
+ // Survey all paths
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p) {
+ relStability[i] = _paths[i].p->lastComputedStability();
+ relThroughput[i] = _paths[i].p->maxLifetimeThroughput();
+ maxStability = relStability[i] > maxStability ? relStability[i] : maxStability;
+ maxThroughput = relThroughput[i] > maxThroughput ? relThroughput[i] : maxThroughput;
+ maxScope = _paths[i].p->ipScope() > maxScope ? _paths[i].p->ipScope() : maxScope;
+ }
+ }
+ // Convert to relative values
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p) {
+ relStability[i] /= maxStability ? maxStability : 1;
+ relThroughput[i] /= maxThroughput ? maxThroughput : 1;
+ float normalized_ma = Utils::normalize(_paths[i].p->ackAge(now), 0, ZT_PATH_MAX_AGE, 0, 10);
+ float age_contrib = exp((-1)*normalized_ma);
+ float relScope = ((float)(_paths[i].p->ipScope()+1) / (maxScope + 1));
+ float relQuality =
+ (relStability[i] * ZT_PATH_CONTRIB_STABILITY)
+ + (fmax(1, relThroughput[i]) * ZT_PATH_CONTRIB_THROUGHPUT)
+ + relScope * ZT_PATH_CONTRIB_SCOPE;
+ relQuality *= age_contrib;
+ totalRelativeQuality += relQuality;
+ _paths[i].p->updateRelativeQuality(relQuality);
+ }
+ }
+ return (float)1.0 / totalRelativeQuality; // Used later to convert relative quantities into flow allocations
+}
+
+float Peer::computeAggregateLinkPacketDelayVariance()
+{
+ float pdv = 0.0;
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p) {
+ pdv += _paths[i].p->relativeQuality() * _paths[i].p->packetDelayVariance();
+ }
+ }
+ return pdv;
+}
+
+float Peer::computeAggregateLinkMeanLatency()
+{
+ float ml = 0.0;
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p) {
+ ml += _paths[i].p->relativeQuality() * _paths[i].p->meanLatency();
+ }
+ }
+ return ml;
+}
+
+int Peer::aggregateLinkPhysicalPathCount()
+{
+ std::map<std::string, bool> ifnamemap;
+ int pathCount = 0;
+ int64_t now = RR->node->now();
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p && _paths[i].p->alive(now)) {
+ if (!ifnamemap[_paths[i].p->getName()]) {
+ ifnamemap[_paths[i].p->getName()] = true;
+ pathCount++;
+ }
+ }
+ }
+ return pathCount;
+}
+
+int Peer::aggregateLinkLogicalPathCount()
+{
+ int pathCount = 0;
+ int64_t now = RR->node->now();
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p && _paths[i].p->alive(now)) {
+ pathCount++;
+ }
+ }
+ return pathCount;
+}
+
SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired)
{
Mutex::Lock _l(_paths_m);
@@ -264,7 +389,7 @@ SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired)
if (RR->node->getMultipathMode() == ZT_MULTIPATH_NONE) {
long bestPathQuality = 2147483647;
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
- if (_paths[i].p && _paths[i].p->isValidState()) {
+ if (_paths[i].p) {
if ((includeExpired)||((now - _paths[i].lr) < ZT_PEER_PATH_EXPIRATION)) {
const long q = _paths[i].p->quality(now) / _paths[i].priority;
if (q <= bestPathQuality) {
@@ -280,23 +405,14 @@ SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired)
return SharedPtr<Path>();
}
- if ((now - _lastPathPrune) > ZT_CLOSED_PATH_PRUNING_INTERVAL) {
- _lastPathPrune = now;
- prunePaths();
- }
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i].p) {
- _paths[i].p->measureLink(now);
+ _paths[i].p->processBackgroundPathMeasurements(now, _id.address().toInt());
}
}
/**
* Randomly distribute traffic across all paths
- *
- * Behavior:
- * - If path DOWN: Stop randomly choosing that path
- * - If path UP: Start randomly choosing that path
- * - If all paths are unresponsive: randomly choose from all paths
*/
int numAlivePaths = 0;
int numStalePaths = 0;
@@ -307,15 +423,13 @@ SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired)
memset(&stalePaths, -1, sizeof(stalePaths));
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i].p) {
- if (_paths[i].p->isValidState()) {
- if (_paths[i].p->alive(now)) {
- alivePaths[numAlivePaths] = i;
- numAlivePaths++;
- }
- else {
- stalePaths[numStalePaths] = i;
- numStalePaths++;
- }
+ if (_paths[i].p->alive(now)) {
+ alivePaths[numAlivePaths] = i;
+ numAlivePaths++;
+ }
+ else {
+ stalePaths[numStalePaths] = i;
+ numStalePaths++;
}
}
}
@@ -337,160 +451,104 @@ SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired)
* Proportionally allocate traffic according to dynamic path quality measurements
*/
if (RR->node->getMultipathMode() == ZT_MULTIPATH_PROPORTIONALLY_BALANCED) {
- float relq[ZT_MAX_PEER_NETWORK_PATHS];
- memset(&relq, 0, sizeof(relq));
float alloc[ZT_MAX_PEER_NETWORK_PATHS];
memset(&alloc, 0, sizeof(alloc));
-
- // Survey
- //
- // Take a survey of all available link qualities. We use this to determine if we
- // can skip this algorithm altogether and if not, to establish baseline for physical
- // link quality used in later calculations.
- //
- // We find the min/max quality of our currently-active links so
- // that we can form a relative scale to rank each link proportionally
- // to each other link.
- uint16_t alivePaths[ZT_MAX_PEER_NETWORK_PATHS];
- uint16_t stalePaths[ZT_MAX_PEER_NETWORK_PATHS];
+ int numAlivePaths = 0;
+ int numStalePaths = 0;
+ int alivePaths[ZT_MAX_PEER_NETWORK_PATHS];
+ int stalePaths[ZT_MAX_PEER_NETWORK_PATHS];
memset(&alivePaths, -1, sizeof(alivePaths));
memset(&stalePaths, -1, sizeof(stalePaths));
- uint16_t numAlivePaths = 0;
- uint16_t numStalePaths = 0;
- float minQuality = 10000;
- float maxQuality = -1;
- float currQuality;
- for(uint16_t i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
- if (_paths[i].p && _paths[i].p->isValidState()) {
- if (!_paths[i].p->monitorsReady()) {
- // TODO: This should fix itself anyway but we should test whether forcing the use of a new path will
- // aid in establishing flow balance more quickly.
- }
- // Compute quality here, going forward we will use lastComputedQuality()
- currQuality = _paths[i].p->computeQuality(now);
- if (!_paths[i].p->stale(now)) {
+ // Attempt to find an excuse not to use the rest of this algorithm
+ // Alive or Stale?
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p) {
+ if (_paths[i].p->alive(now)) {
+ alivePaths[numAlivePaths] = i;
numAlivePaths++;
- }
- else {
+ } else {
+ stalePaths[numStalePaths] = i;
numStalePaths++;
}
- if (currQuality > maxQuality) {
- maxQuality = currQuality;
- bestPath = i;
- }
- if (currQuality < minQuality) {
- minQuality = currQuality;
- }
- relq[i] = currQuality;
+ // Record a default path to use as a short-circuit for the rest of the algorithm
+ bestPath = i;
}
}
-
- // Attempt to find an excuse not to use the rest of this algorithm
- if (bestPath == ZT_MAX_PEER_NETWORK_PATHS || (numAlivePaths == 0 && numStalePaths == 0)) {
+ if (numAlivePaths == 0 && numStalePaths == 0) {
return SharedPtr<Path>();
- } if (numAlivePaths == 1) {
- //return _paths[bestPath].p;
- } if (numStalePaths == 1) {
- //return _paths[bestPath].p;
- }
-
- // Relative quality
- //
- // The strongest link will have a value of 1.0 whereas every other
- // link will have a value which represents some fraction of the strongest link.
- float totalRelativeQuality = 0;
- for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
- if (_paths[i].p && _paths[i].p->isValidState()) {
- relq[i] /= maxQuality ? maxQuality : 1;
- totalRelativeQuality += relq[i];
- }
+ } if (numAlivePaths == 1 || numStalePaths == 1) {
+ return _paths[bestPath].p;
}
-
- // Convert the relative quality values into flow allocations.
- // Additionally, determine whether each path in the flow is
- // contributing more or less than its target allocation. If
- // it is contributing more than required, don't allow it to be
- // randomly selected for the next packet. If however the path
- // needs to contribute more to the flow, we should record
- float imbalance = 0;
- float qualityScalingFactor = (float)1.0 / totalRelativeQuality;
+ // Compare paths to each-other
+ float qualityScalingFactor = computeAggregateLinkRelativeQuality(now);
+ // Convert set of relative performances into an allocation set
for(uint16_t i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
- // Out of the last N packets to this peer, how many were sent by this path?
- int numPktSentWithinWin = (int)_pathChoiceHist->countValue(i);
- // Compute traffic allocation for each path in the flow
- if (_paths[i].p && _paths[i].p->isValidState()) {
- // Allocation
- // This is the percentage of traffic we want to send over a given path
- alloc[i] = relq[i] * qualityScalingFactor;
- float currProportion = numPktSentWithinWin / (float)ZT_MULTIPATH_PROPORTION_WIN_SZ;
- float targetProportion = alloc[i];
- float diffProportion = currProportion - targetProportion;
- // Imbalance
- //
- // This is the sum of the distances of each path's currently observed flow contributions
- // from its most recent target allocation. In other words, this is a measure of how closely we
- // are adhering to our desired allocations. It is worth noting that this value can be greater
- // than 1.0 if a significant change to allocations is made by the algorithm, this will
- // eventually correct itself.
- imbalance += fabs(diffProportion);
- if (diffProportion < 0) {
- alloc[i] = targetProportion;
- }
- else {
- alloc[i] = targetProportion;
- }
- }
- }
-
- // Compute and record current flow balance
- float balance = (float)1.0 - imbalance;
- if (balance >= ZT_MULTIPATH_FLOW_BALANCE_THESHOLD) {
- if (!_linkBalanceStatus) {
- _linkBalanceStatus = true;
- RR->t->peerLinkBalanced(NULL,0,*this);
- }
- }
- else {
- if (_linkBalanceStatus) {
- _linkBalanceStatus = false;
- RR->t->peerLinkImbalanced(NULL,0,*this);
+ if (_paths[i].p) {
+ alloc[i] = _paths[i].p->relativeQuality() * qualityScalingFactor;
}
}
-
- // Record the current flow balance. Later used for computing a mean flow balance value.
- _flowBalanceHist->push(balance);
-
- // Randomly choose path from allocated candidates
+ // Randomly choose path according to their allocations
unsigned int r;
Utils::getSecureRandom(&r, 1);
float rf = (float)(r %= 100) / 100;
for(int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
- if (_paths[i].p && _paths[i].p->isValidState() && _paths[i].p->address().isV4()) {
- if (alloc[i] > 0 && rf < alloc[i]) {
+ if (_paths[i].p) {
+ if (rf < alloc[i]) {
bestPath = i;
_pathChoiceHist->push(bestPath); // Record which path we chose
break;
}
- if (alloc[i] > 0) {
- rf -= alloc[i];
- }
- else {
- rf -= alloc[i]*-1;
- }
+ rf -= alloc[i];
}
}
if (bestPath < ZT_MAX_PEER_NETWORK_PATHS) {
return _paths[bestPath].p;
}
- return SharedPtr<Path>();
}
+ return SharedPtr<Path>();
+}
- // Adhere to a user-defined interface/allocation scheme
- if (RR->node->getMultipathMode() == ZT_MULTIPATH_MANUALLY_BALANCED) {
- // TODO
+char *Peer::interfaceListStr()
+{
+ std::map<std::string, int> ifnamemap;
+ char tmp[32];
+ const int64_t now = RR->node->now();
+ char *ptr = _interfaceListStr;
+ bool imbalanced = false;
+ memset(_interfaceListStr, 0, sizeof(_interfaceListStr));
+ int alivePathCount = aggregateLinkLogicalPathCount();
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p && _paths[i].p->alive(now)) {
+ int ipv = _paths[i].p->address().isV4();
+ // If this is acting as an aggregate link, check allocations
+ float targetAllocation = 1.0 / alivePathCount;
+ float currentAllocation = 1.0;
+ if (alivePathCount > 1) {
+ currentAllocation = (float)_pathChoiceHist->countValue(i) / (float)_pathChoiceHist->count();
+ if (fabs(targetAllocation - currentAllocation) > ZT_PATH_IMBALANCE_THRESHOLD) {
+ imbalanced = true;
+ }
+ }
+ char *ipvStr = ipv ? (char*)"ipv4" : (char*)"ipv6";
+ sprintf(tmp, "(%s, %s, %5.4f)", _paths[i].p->getName(), ipvStr, currentAllocation);
+ // Prevent duplicates
+ if(ifnamemap[_paths[i].p->getName()] != ipv) {
+ memcpy(ptr, tmp, strlen(tmp));
+ ptr += strlen(tmp);
+ *ptr = ' ';
+ ptr++;
+ ifnamemap[_paths[i].p->getName()] = ipv;
+ }
+ }
}
-
- return SharedPtr<Path>();
+ ptr--; // Overwrite trailing space
+ if (imbalanced) {
+ sprintf(tmp, ", is IMBALANCED");
+ memcpy(ptr, tmp, sizeof(tmp));
+ } else {
+ *ptr = '\0';
+ }
+ return _interfaceListStr;
}
void Peer::introduce(void *const tPtr,const int64_t now,const SharedPtr<Peer> &other) const
@@ -614,6 +672,35 @@ void Peer::introduce(void *const tPtr,const int64_t now,const SharedPtr<Peer> &o
}
}
+void Peer::sendACK(void *tPtr,const SharedPtr<Path> &path,const int64_t localSocket,const InetAddress &atAddress,int64_t now)
+{
+ Packet outp(_id.address(),RR->identity.address(),Packet::VERB_ACK);
+ uint32_t bytesToAck = path->bytesToAck();
+ outp.append<uint32_t>(bytesToAck);
+ if (atAddress) {
+ outp.armor(_key,false);
+ RR->node->putPacket(tPtr,localSocket,atAddress,outp.data(),outp.size());
+ } else {
+ RR->sw->send(tPtr,outp,false);
+ }
+ path->sentAck(now);
+}
+
+void Peer::sendQOS_MEASUREMENT(void *tPtr,const SharedPtr<Path> &path,const int64_t localSocket,const InetAddress &atAddress,int64_t now)
+{
+ Packet outp(_id.address(),RR->identity.address(),Packet::VERB_QOS_MEASUREMENT);
+ char qosData[ZT_PATH_MAX_QOS_PACKET_SZ];
+ path->generateQoSPacket(now,qosData);
+ outp.append(qosData,sizeof(qosData));
+ if (atAddress) {
+ outp.armor(_key,false);
+ RR->node->putPacket(tPtr,localSocket,atAddress,outp.data(),outp.size());
+ } else {
+ RR->sw->send(tPtr,outp,false);
+ }
+ path->sentQoS(now);
+}
+
void Peer::sendHELLO(void *tPtr,const int64_t localSocket,const InetAddress &atAddress,int64_t now)
{
Packet outp(_id.address(),RR->identity.address(),Packet::VERB_HELLO);
@@ -688,6 +775,25 @@ unsigned int Peer::doPingAndKeepalive(void *tPtr,int64_t now)
const bool sendFullHello = ((now - _lastSentFullHello) >= ZT_PEER_PING_PERIOD);
_lastSentFullHello = now;
+ // Emit traces regarding the status of aggregate links
+ if (RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) {
+ int alivePathCount = aggregateLinkPhysicalPathCount();
+ if ((now - _lastAggregateStatsReport) > ZT_PATH_AGGREGATE_STATS_REPORT_INTERVAL) {
+ _lastAggregateStatsReport = now;
+ if (alivePathCount) {
+ RR->t->peerLinkAggregateStatistics(NULL,*this);
+ }
+ }
+ // Report link redundancy
+ if (alivePathCount < 2 && _linkIsRedundant) {
+ _linkIsRedundant = !_linkIsRedundant;
+ RR->t->peerLinkNoLongerRedundant(NULL,*this);
+ } if (alivePathCount > 1 && !_linkIsRedundant) {
+ _linkIsRedundant = !_linkIsRedundant;
+ RR->t->peerLinkNowRedundant(NULL,*this);
+ }
+ }
+
// Right now we only keep pinging links that have the maximum priority. The
// priority is used to track cluster redirections, meaning that when a cluster
// redirects us its redirect target links override all other links and we
@@ -726,22 +832,6 @@ unsigned int Peer::doPingAndKeepalive(void *tPtr,int64_t now)
return sent;
}
-unsigned int Peer::prunePaths()
-{
- unsigned int pruned = 0;
- for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
- if (_paths[i].p) {
- if(_paths[i].p->isClosed() || !_paths[i].p->isValidState()) {
- _paths[i].lr = 0;
- _paths[i].p.zero();
- _paths[i].priority = 1;
- pruned++;
- }
- }
- }
- return pruned;
-}
-
void Peer::clusterRedirect(void *tPtr,const SharedPtr<Path> &originatingPath,const InetAddress &remoteAddress,const int64_t now)
{
SharedPtr<Path> np(RR->topology->getPath(originatingPath->localSocket(),remoteAddress));
diff --git a/node/Peer.hpp b/node/Peer.hpp
index 9873729b..2f723b07 100644
--- a/node/Peer.hpp
+++ b/node/Peer.hpp
@@ -68,9 +68,7 @@ public:
~Peer() {
Utils::burn(_key,sizeof(_key));
delete _pathChoiceHist;
- delete _flowBalanceHist;
_pathChoiceHist = NULL;
- _flowBalanceHist = NULL;
}
/**
@@ -114,6 +112,7 @@ public:
const SharedPtr<Path> &path,
const unsigned int hops,
const uint64_t packetId,
+ const unsigned int payloadLength,
const Packet::Verb verb,
const uint64_t inRePacketId,
const Packet::Verb inReVerb,
@@ -158,7 +157,74 @@ public:
}
/**
- * Get the most appropriate direct path based on current multipath configuration
+ * Record statistics on outgoing packets
+ *
+ * @param path Path over which packet was sent
+ * @param id Packet ID
+ * @param len Length of packet payload
+ * @param verb Packet verb
+ * @param now Current time
+ */
+ void recordOutgoingPacket(const SharedPtr<Path> &path, const uint64_t packetId, uint16_t payloadLength, const Packet::Verb verb, int64_t now);
+
+ /**
+ * Record statistics on incoming packets
+ *
+ * @param path Path over which packet was sent
+ * @param id Packet ID
+ * @param len Length of packet payload
+ * @param verb Packet verb
+ * @param now Current time
+ */
+ void recordIncomingPacket(void *tPtr, const SharedPtr<Path> &path, const uint64_t packetId, uint16_t payloadLength, const Packet::Verb verb, int64_t now);
+
+ /**
+ * Send an ACK to peer for the most recent packets received
+ *
+ * @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call
+ * @param localSocket Raw socket the ACK packet will be sent over
+ * @param atAddress Destination for the ACK packet
+ * @param now Current time
+ */
+ void sendACK(void *tPtr, const SharedPtr<Path> &path, const int64_t localSocket,const InetAddress &atAddress,int64_t now);
+
+ /**
+ * Send a QoS packet to peer so that it can evaluate the quality of this link
+ *
+ * @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call
+ * @param localSocket Raw socket the QoS packet will be sent over
+ * @param atAddress Destination for the QoS packet
+ * @param now Current time
+ */
+ void sendQOS_MEASUREMENT(void *tPtr, const SharedPtr<Path> &path, const int64_t localSocket,const InetAddress &atAddress,int64_t now);
+
+ /**
+ * @return The relative quality values for each path
+ */
+ float computeAggregateLinkRelativeQuality(int64_t now);
+
+ /**
+ * @return The aggregate link Packet Delay Variance (PDV)
+ */
+ float computeAggregateLinkPacketDelayVariance();
+
+ /**
+ * @return The aggregate link mean latenct
+ */
+ float computeAggregateLinkMeanLatency();
+
+ /**
+ * @return The number of currently alive "physical" paths in the aggregate link
+ */
+ int aggregateLinkPhysicalPathCount();
+
+ /**
+ * @return The number of currently alive "logical" paths in the aggregate link
+ */
+ int aggregateLinkLogicalPathCount();
+
+ /**
+ * Get the most appropriate direct path based on current multipath and QoS configuration
*
* @param now Current time
* @param includeExpired If true, include even expired paths
@@ -167,6 +233,12 @@ public:
SharedPtr<Path> getAppropriatePath(int64_t now, bool includeExpired);
/**
+ * Generate a human-readable string of interface names making up the aggregate link, also include
+ * moving allocation and IP version number for each (for tracing)
+ */
+ char *interfaceListStr();
+
+ /**
* Send VERB_RENDEZVOUS to this and another peer via the best common IP scope and path
*/
void introduce(void *const tPtr,const int64_t now,const SharedPtr<Peer> &other) const;
@@ -549,11 +621,12 @@ private:
AtomicCounter __refCount;
RingBuffer<int> *_pathChoiceHist;
- RingBuffer<float> *_flowBalanceHist;
- bool _linkBalanceStatus;
- bool _linkRedundancyStatus;
+ bool _linkIsBalanced;
+ bool _linkIsRedundant;
+ uint64_t _lastAggregateStatsReport;
+ char _interfaceListStr[256]; // 16 characters * 16 paths in a link
};
} // namespace ZeroTier
diff --git a/node/RingBuffer.hpp b/node/RingBuffer.hpp
index cd384749..32ae037c 100644
--- a/node/RingBuffer.hpp
+++ b/node/RingBuffer.hpp
@@ -173,6 +173,11 @@ public:
}
/**
+ * @return The most recently pushed element on the buffer
+ */
+ T get_most_recent() { return *(buf + end); }
+
+ /**
* @param dest Destination buffer
* @param n Size (in terms of number of elements) of the destination buffer
* @return Number of elements read from the buffer
@@ -218,10 +223,7 @@ public:
/**
* @return The number of slots that are unused in the buffer
*/
- size_t getFree()
- {
- return size - count();
- }
+ size_t getFree() { return size - count(); }
/**
* @return The arithmetic mean of the contents of the buffer
@@ -229,45 +231,67 @@ public:
float mean()
{
size_t iterator = begin;
- float mean = 0;
- for (size_t i=0; i<size; i++) {
- iterator = (iterator + size - 1) % size;
- mean += *(buf + iterator);
+ float subtotal = 0;
+ size_t curr_cnt = count();
+ for (size_t i=0; i<curr_cnt; i++) {
+ iterator = (iterator + size - 1) % curr_cnt;
+ subtotal += (float)*(buf + iterator);
}
- return count() ? mean / (float)count() : 0;
+ return curr_cnt ? subtotal / (float)curr_cnt : 0;
}
/**
- * @return The sample standard deviation of the contents of the ring buffer
+ * @return The arithmetic mean of the most recent 'n' elements of the buffer
*/
- float stddev()
+ float mean(size_t n)
+ {
+ n = n < size ? n : size;
+ size_t iterator = begin;
+ float subtotal = 0;
+ size_t curr_cnt = count();
+ for (size_t i=0; i<n; i++) {
+ iterator = (iterator + size - 1) % curr_cnt;
+ subtotal += (float)*(buf + iterator);
+ }
+ return curr_cnt ? subtotal / (float)curr_cnt : 0;
+ }
+
+ /**
+ * @return The sample standard deviation of element values
+ */
+ float stddev() { return sqrt(variance()); }
+
+ /**
+ * @return The variance of element values
+ */
+ float variance()
{
size_t iterator = begin;
float cached_mean = mean();
+ size_t curr_cnt = count();
if (size) {
T sum_of_squared_deviations = 0;
- for (size_t i=0; i<size; i++) {
- iterator = (iterator + size - 1) % size;
+ for (size_t i=0; i<curr_cnt; i++) {
+ iterator = (iterator + size - 1) % curr_cnt;
float deviation = (buf[i] - cached_mean);
- float sdev = deviation*deviation;
- sum_of_squared_deviations += sdev;
+ sum_of_squared_deviations += (deviation*deviation);
}
- float variance = sum_of_squared_deviations / (size - 1);
- float sd = sqrt(variance);
- return sd;
+ float variance = (float)sum_of_squared_deviations / (float)(size - 1);
+ return variance;
}
return 0;
}
/**
- * @return The number of elements of zero value, O(n)
+ * @return The number of elements of zero value
*/
size_t zeroCount()
{
size_t iterator = begin;
size_t zeros = 0;
- for (size_t i=0; i<size; i++) {
- iterator = (iterator + size - 1) % size;
+ size_t curr_cnt = count();
+ for (size_t i=0; i<curr_cnt; i++) {
+ iterator = (iterator + size - 1) % curr_cnt;
if (*(buf + iterator) == 0) {
zeros++;
}
@@ -282,14 +306,15 @@ public:
size_t countValue(T value)
{
size_t iterator = begin;
- size_t count = 0;
- for (size_t i=0; i<size; i++) {
- iterator = (iterator + size - 1) % size;
+ size_t cnt = 0;
+ size_t curr_cnt = count();
+ for (size_t i=0; i<curr_cnt; i++) {
+ iterator = (iterator + size - 1) % curr_cnt;
if (*(buf + iterator) == value) {
- count++;
+ cnt++;
}
}
- return count;
+ return cnt;
}
/**
@@ -301,10 +326,10 @@ public:
for (size_t i=0; i<size; i++) {
iterator = (iterator + size - 1) % size;
if (typeid(T) == typeid(int)) {
- // DEBUG_INFO("buf[%2zu]=%2d", iterator, (int)*(buf + iterator));
+ //DEBUG_INFO("buf[%2zu]=%2d", iterator, (int)*(buf + iterator));
}
else {
- // DEBUG_INFO("buf[%2zu]=%2f", iterator, (float)*(buf + iterator));
+ //DEBUG_INFO("buf[%2zu]=%2f", iterator, (float)*(buf + iterator));
}
}
}
diff --git a/node/Switch.cpp b/node/Switch.cpp
index 3f6c4e41..d53bf53e 100644
--- a/node/Switch.cpp
+++ b/node/Switch.cpp
@@ -666,6 +666,8 @@ bool Switch::_trySend(void *tPtr,Packet &packet,bool encrypt)
unsigned int chunkSize = std::min(packet.size(),mtu);
packet.setFragmented(chunkSize < packet.size());
+ peer->recordOutgoingPacket(viaPath, packet.packetId(), packet.payloadLength(), packet.verb(), now);
+
if (trustedPathId) {
packet.setTrusted(trustedPathId);
} else {
diff --git a/node/Trace.cpp b/node/Trace.cpp
index 01a8da55..f47a029b 100644
--- a/node/Trace.cpp
+++ b/node/Trace.cpp
@@ -106,24 +106,24 @@ void Trace::peerConfirmingUnknownPath(void *const tPtr,const uint64_t networkId,
}
}
-void Trace::peerLinkNowRedundant(void *const tPtr,const uint64_t networkId,Peer &peer,const SharedPtr<Path> &newPath)
+void Trace::peerLinkNowRedundant(void *const tPtr,Peer &peer)
{
- ZT_LOCAL_TRACE(tPtr,RR,"link to peer %.10llx on network %.16llx is fully redundant",peer.address().toInt(),networkId);
+ ZT_LOCAL_TRACE(tPtr,RR,"link to peer %.10llx is fully redundant",peer.address().toInt());
}
-void Trace::peerLinkNoLongerRedundant(void *const tPtr,const uint64_t networkId,Peer &peer,const SharedPtr<Path> &newPath)
+void Trace::peerLinkNoLongerRedundant(void *const tPtr,Peer &peer)
{
- ZT_LOCAL_TRACE(tPtr,RR,"link to peer %.10llx on network %.16llx is no longer redundant",peer.address().toInt(),networkId);
+ ZT_LOCAL_TRACE(tPtr,RR,"link to peer %.10llx is no longer redundant",peer.address().toInt());
}
-void Trace::peerLinkBalanced(void *const tPtr,const uint64_t networkId,Peer &peer)
+void Trace::peerLinkAggregateStatistics(void *const tPtr,Peer &peer)
{
- ZT_LOCAL_TRACE(tPtr,RR,"link to peer %.10llx on network %.16llx is balanced",peer.address().toInt(),networkId);
-}
-
-void Trace::peerLinkImbalanced(void *const tPtr,const uint64_t networkId,Peer &peer)
-{
- ZT_LOCAL_TRACE(tPtr,RR,"link to peer %.10llx on network %.16llx is unbalanced",peer.address().toInt(),networkId);
+ ZT_LOCAL_TRACE(tPtr,RR,"link to peer %.10llx is composed of (%d) physical paths %s, has packet delay variance (%.0f ms), mean latency (%.0f ms)",
+ peer.address().toInt(),
+ peer.aggregateLinkPhysicalPathCount(),
+ peer.interfaceListStr(),
+ peer.computeAggregateLinkPacketDelayVariance(),
+ peer.computeAggregateLinkMeanLatency());
}
void Trace::peerLearnedNewPath(void *const tPtr,const uint64_t networkId,Peer &peer,const SharedPtr<Path> &newPath,const uint64_t packetId)
diff --git a/node/Trace.hpp b/node/Trace.hpp
index b01163d6..734e84a5 100644
--- a/node/Trace.hpp
+++ b/node/Trace.hpp
@@ -122,10 +122,10 @@ public:
void peerConfirmingUnknownPath(void *const tPtr,const uint64_t networkId,Peer &peer,const SharedPtr<Path> &path,const uint64_t packetId,const Packet::Verb verb);
- void peerLinkNowRedundant(void *const tPtr,const uint64_t networkId,Peer &peer,const SharedPtr<Path> &newPath);
- void peerLinkNoLongerRedundant(void *const tPtr,const uint64_t networkId,Peer &peer,const SharedPtr<Path> &newPath);
- void peerLinkBalanced(void *const tPtr,const uint64_t networkId,Peer &peer);
- void peerLinkImbalanced(void *const tPtr,const uint64_t networkId,Peer &peer);
+ void peerLinkNowRedundant(void *const tPtr,Peer &peer);
+ void peerLinkNoLongerRedundant(void *const tPtr,Peer &peer);
+
+ void peerLinkAggregateStatistics(void *const tPtr,Peer &peer);
void peerLearnedNewPath(void *const tPtr,const uint64_t networkId,Peer &peer,const SharedPtr<Path> &newPath,const uint64_t packetId);
void peerRedirected(void *const tPtr,const uint64_t networkId,Peer &peer,const SharedPtr<Path> &newPath);
diff --git a/node/Utils.hpp b/node/Utils.hpp
index a24f2c9a..6ce67328 100644
--- a/node/Utils.hpp
+++ b/node/Utils.hpp
@@ -261,6 +261,14 @@ public:
return l;
}
+ static inline float normalize(float value, int64_t bigMin, int64_t bigMax, int32_t targetMin, int32_t targetMax)
+ {
+ int64_t bigSpan = bigMax - bigMin;
+ int64_t smallSpan = targetMax - targetMin;
+ float valueScaled = (value - (float)bigMin) / (float)bigSpan;
+ return (float)targetMin + valueScaled * (float)smallSpan;
+ }
+
/**
* Generate secure random bytes
*
diff --git a/osdep/Binder.hpp b/osdep/Binder.hpp
index 6e13836c..1f06021b 100644
--- a/osdep/Binder.hpp
+++ b/osdep/Binder.hpp
@@ -456,20 +456,6 @@ public:
return false;
}
- /**
- * Get a list of socket pointers for all bindings.
- *
- * @return A list of socket pointers for current bindings
- */
- inline std::vector<PhySocket*> getBoundSockets()
- {
- std::vector<PhySocket*> sockets;
- for (int i=0; i<ZT_BINDER_MAX_BINDINGS; i++) {
- sockets.push_back(_bindings[i].udpSock);
- }
- return sockets;
- }
-
private:
_Binding _bindings[ZT_BINDER_MAX_BINDINGS];
std::atomic<unsigned int> _bindingCount;
diff --git a/osdep/Phy.hpp b/osdep/Phy.hpp
index 2e276a2a..5e659767 100644
--- a/osdep/Phy.hpp
+++ b/osdep/Phy.hpp
@@ -27,8 +27,6 @@
#ifndef ZT_PHY_HPP
#define ZT_PHY_HPP
-#include "../osdep/OSUtils.hpp"
-
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
@@ -88,22 +86,6 @@ namespace ZeroTier {
*/
typedef void PhySocket;
-struct link_test_record
-{
- link_test_record(PhySocket *_s, uint64_t _id, uint64_t _egress_time, uint32_t _length) :
- s(_s),
- id(_id),
- egress_time(_egress_time),
- length(_length)
- {
- //
- }
- PhySocket *s;
- uint64_t id;
- uint64_t egress_time;
- uint32_t length;
-};
-
/**
* Simple templated non-blocking sockets implementation
*
@@ -170,19 +152,13 @@ private:
ZT_PHY_SOCKET_UNIX_LISTEN = 0x08
};
- struct PhySocketImpl
- {
- PhySocketImpl() :
- throughput(0)
- {
- memset(ifname, 0, sizeof(ifname));
- }
+ struct PhySocketImpl {
+ PhySocketImpl() { memset(ifname, 0, sizeof(ifname)); }
PhySocketType type;
ZT_PHY_SOCKFD_TYPE sock;
void *uptr; // user-settable pointer
ZT_PHY_SOCKADDR_STORAGE_TYPE saddr; // remote for TCP_OUT and TCP_IN, local for TCP_LISTEN, RAW, and UDP
char ifname[16];
- uint64_t throughput;
};
std::list<PhySocketImpl> _socks;
@@ -198,7 +174,6 @@ private:
bool _noDelay;
bool _noCheck;
- std::vector<struct link_test_record*> link_test_records;
public:
/**
@@ -282,7 +257,9 @@ public:
*/
static inline void getIfName(PhySocket *s, char *nameBuf, int buflen)
{
- memcpy(nameBuf, reinterpret_cast<PhySocketImpl *>(s)->ifname, buflen);
+ if (s) {
+ memcpy(nameBuf, reinterpret_cast<PhySocketImpl *>(s)->ifname, buflen);
+ }
}
/**
@@ -292,18 +269,9 @@ public:
*/
static inline void setIfName(PhySocket *s, char *ifname, int len)
{
- memcpy(&(reinterpret_cast<PhySocketImpl *>(s)->ifname), ifname, len);
- }
-
- /**
- * Get result of most recent throughput test
- *
- * @param s Socket object
- */
- inline uint64_t getThroughput(PhySocket *s)
- {
- PhySocketImpl *sws = (reinterpret_cast<PhySocketImpl *>(s));
- return sws ? sws->throughput : 0;
+ if (s) {
+ memcpy(&(reinterpret_cast<PhySocketImpl *>(s)->ifname), ifname, len);
+ }
}
/**
@@ -339,105 +307,9 @@ public:
*/
inline bool isValidState(PhySocket *s)
{
- PhySocketImpl *sws = (reinterpret_cast<PhySocketImpl *>(s));
- return sws->type >= ZT_PHY_SOCKET_CLOSED && sws->type <= ZT_PHY_SOCKET_UNIX_LISTEN;
- }
-
- /**
- * Send a datagram of a known size to a selected peer and record egress time. The peer
- * shall eventually respond by echoing back a smaller datagram.
- *
- * @param s Socket object
- * @param remoteAddress Address of remote peer to receive link test packet
- * @param data Buffer containing random packet data
- * @param len Length of packet data buffer
- * @return Number of bytes successfully written to socket
- */
- inline int test_link_speed(PhySocket *s, const struct sockaddr *to, void *data, uint32_t len) {
- if (!reinterpret_cast<PhySocketImpl *>(s)) {
- return 0;
- }
- uint64_t *buf = (uint64_t*)data;
- uint64_t id = buf[0];
- if (to->sa_family != AF_INET && to->sa_family != AF_INET6) {
- return 0;
- }
- uint64_t egress_time = OSUtils::now();
- PhySocketImpl *sws = (reinterpret_cast<PhySocketImpl *>(s));
-#if defined(_WIN32) || defined(_WIN64)
- int w = ::sendto(sws->sock,reinterpret_cast<const char *>(data),len,0,to,(to->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in))
-#else
- int w = ::sendto(sws->sock,data,len,0,to,(to->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
-#endif
- if (w > 0) {
- link_test_records.push_back(new link_test_record(s, id, egress_time, len));
- }
- return w;
- }
-
- /**
- * Remove link speed test records which have timed-out and record a 0 bits/s measurement
- */
- inline void refresh_link_speed_records()
- {
- for(size_t i=0;i<link_test_records.size();i++) {
- if(OSUtils::now() - link_test_records[i]->egress_time > ZT_LINK_TEST_TIMEOUT) {
- PhySocketImpl *sws = (reinterpret_cast<PhySocketImpl *>(link_test_records[i]->s));
- if (sws) {
- sws->throughput = 0;
- }
- link_test_records.erase(link_test_records.begin() + i);
- }
- }
- }
-
- /**
- * Upon receipt of a link speed test datagram we echo back only the identification portion
- *
- * @param s Socket object
- * @param from Address of remote peer that sent this datagram
- * @param data Buffer containing datagram's contents
- * @param len Length of datagram
- * @return Number of bytes successfully written to socket in response
- */
- inline int respond_to_link_test(PhySocket *s,const struct sockaddr *from,void *data,unsigned long len) {
- PhySocketImpl *sws = (reinterpret_cast<PhySocketImpl *>(s));
- uint64_t *id = (uint64_t*)data;
-#if defined(_WIN32) || defined(_WIN64)
- int w = ::sendto(sws->sock,reinterpret_cast<const char *>(id),sizeof(id[0]),0,from,(from->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
-#else
- int w = ::sendto(sws->sock,id,sizeof(id[0]),0,from,(from->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
-#endif
- return w;
- }
-
- /**
- * Upon receipt of a response to our original link test datagram, correlate this new datagram with the record
- * of the one we sent. Compute the transit time and update the throughput field of the relevant socket. This
- * value will later be read by the path quality estimation logic located in Path.hpp.
- *
- * @param s Socket object
- * @param from Address of remote peer that sent this datagram
- * @param data Buffer containing datagram contents (ID of original link test datagram)
- * @param len Length of datagram
- * @return true if datagram correponded to previous record, false if otherwise
- */
- inline bool handle_link_test_response(PhySocket *s,const struct sockaddr *from,void *data,unsigned long len) {
- uint64_t *id = (uint64_t*)data;
- for(size_t i=0;i<link_test_records.size();i++) {
- if(link_test_records[i]->id == id[0]) {
- float rtt = (OSUtils::now()-link_test_records[i]->egress_time) / (float)1000; // s
- uint32_t sz = (link_test_records[i]->length) * 8; // bits
- float transit_time = rtt / (float)2.0;
- uint64_t raw = (uint64_t)(sz / transit_time);
- PhySocketImpl *sws = (reinterpret_cast<PhySocketImpl *>(s));
- if (sws) {
- sws->throughput = raw;
- }
- delete link_test_records[i];
- link_test_records.erase(link_test_records.begin() + i);
- return true;
- }
+ if (s) {
+ PhySocketImpl *sws = (reinterpret_cast<PhySocketImpl *>(s));
+ return sws->type >= ZT_PHY_SOCKET_CLOSED && sws->type <= ZT_PHY_SOCKET_UNIX_LISTEN;
}
return false;
}
diff --git a/service/OneService.cpp b/service/OneService.cpp
index 051629bc..cf2a6eda 100644
--- a/service/OneService.cpp
+++ b/service/OneService.cpp
@@ -37,7 +37,6 @@
#include "../version.h"
#include "../include/ZeroTierOne.h"
-#include "../include/ZeroTierDebug.h"
#include "../node/Constants.hpp"
#include "../node/Mutex.hpp"
@@ -458,9 +457,6 @@ public:
// Last potential sleep/wake event
uint64_t _lastRestart;
- // Last time link throughput was tested
- uint64_t _lastLinkSpeedTest;
-
// Deadline for the next background task service function
volatile int64_t _nextBackgroundTaskDeadline;
@@ -881,26 +877,6 @@ public:
lastMultipathModeUpdate = now;
_node->setMultipathMode(_multipathMode);
}
- // Test link speeds
- // TODO: This logic should eventually find its way into the core or as part of a passive
- // measure within the protocol.
- if (_multipathMode && ((now - _lastLinkSpeedTest) >= ZT_LINK_SPEED_TEST_INTERVAL)) {
- _phy.refresh_link_speed_records();
- _lastLinkSpeedTest = now;
- // Generate random data to fill UDP packet
- uint64_t pktBuf[ZT_LINK_TEST_DATAGRAM_SZ / sizeof(uint64_t)];
- Utils::getSecureRandom(pktBuf, ZT_LINK_TEST_DATAGRAM_SZ);
- ZT_PeerList *pl = _node->peers();
- std::vector<PhySocket*> sockets = _binder.getBoundSockets();
- for (int i=0; i<ZT_BINDER_MAX_BINDINGS; i++) {
- for(size_t j=0;j<pl->peerCount;++j) {
- for (int k=0; k<(ZT_MAX_PEER_NETWORK_PATHS/4); k++) {
- Utils::getSecureRandom(pktBuf, 8); // generate one random integer for unique id
- _phy.test_link_speed(sockets[i], (struct sockaddr*)&(pl->peers[j].paths[k].address), pktBuf, ZT_LINK_TEST_DATAGRAM_SZ);
- }
- }
- }
- }
// Run background task processor in core if it's time to do so
int64_t dl = _nextBackgroundTaskDeadline;
@@ -1799,15 +1775,6 @@ public:
inline void phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *localAddr,const struct sockaddr *from,void *data,unsigned long len)
{
- if (_multipathMode) {
- // Handle link test packets (should eventually be moved into the protocol itself)
- if (len == ZT_LINK_TEST_DATAGRAM_SZ) {
- _phy.respond_to_link_test(sock, from, data, len);
- }
- if (len == ZT_LINK_TEST_DATAGRAM_RESPONSE_SZ) {
- _phy.handle_link_test_response(sock, from, data, len);
- }
- }
if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL))
_lastDirectReceiveFromGlobal = OSUtils::now();
const ZT_ResultCode rc = _node->processWirePacket(