summaryrefslogtreecommitdiff
path: root/node
diff options
context:
space:
mode:
Diffstat (limited to 'node')
-rw-r--r--node/Capability.hpp6
-rw-r--r--node/CertificateOfMembership.hpp2
-rw-r--r--node/Constants.hpp190
-rw-r--r--node/Hashtable.hpp4
-rw-r--r--node/Identity.cpp4
-rw-r--r--node/IncomingPacket.cpp124
-rw-r--r--node/IncomingPacket.hpp2
-rw-r--r--node/InetAddress.cpp26
-rw-r--r--node/MulticastGroup.hpp2
-rw-r--r--node/Multicaster.cpp2
-rw-r--r--node/Network.cpp22
-rw-r--r--node/Network.hpp12
-rw-r--r--node/NetworkConfig.hpp2
-rw-r--r--node/Node.cpp18
-rw-r--r--node/Node.hpp5
-rw-r--r--node/OutboundMulticast.cpp3
-rw-r--r--node/Packet.hpp66
-rw-r--r--node/Path.hpp448
-rw-r--r--node/Peer.cpp430
-rw-r--r--node/Peer.hpp216
-rw-r--r--node/RingBuffer.hpp345
-rw-r--r--node/Salsa20.cpp2
-rw-r--r--node/SelfAwareness.cpp2
-rw-r--r--node/Switch.cpp283
-rw-r--r--node/Switch.hpp94
-rw-r--r--node/Tag.hpp2
-rw-r--r--node/Topology.cpp2
-rw-r--r--node/Topology.hpp2
-rw-r--r--node/Trace.cpp20
-rw-r--r--node/Trace.hpp6
-rw-r--r--node/Utils.hpp8
31 files changed, 2198 insertions, 152 deletions
diff --git a/node/Capability.hpp b/node/Capability.hpp
index 91a46566..775532d9 100644
--- a/node/Capability.hpp
+++ b/node/Capability.hpp
@@ -52,7 +52,7 @@ class RuntimeEnvironment;
* (1) Evaluates its capabilities in ascending order of ID to determine
* which capability allows it to transmit this packet.
* (2) If it has not done so lately, it then sends this capability to the
- * receving peer ("presents" it).
+ * receiving peer ("presents" it).
* (3) The sender then sends the packet.
*
* On the receiving side the receiver evaluates the capabilities presented
@@ -64,7 +64,7 @@ class RuntimeEnvironment;
*
* Capabilities support a chain of custody. This is currently unused but
* in the future would allow the publication of capabilities that can be
- * handed off between nodes. Limited transferrability of capabilities is
+ * handed off between nodes. Limited transferability of capabilities is
* a feature of true capability based security.
*/
class Capability : public Credential
@@ -81,7 +81,7 @@ public:
* @param id Capability ID
* @param nwid Network ID
* @param ts Timestamp (at controller)
- * @param mccl Maximum custody chain length (1 to create non-transferrable capability)
+ * @param mccl Maximum custody chain length (1 to create non-transferable capability)
* @param rules Network flow rules for this capability
* @param ruleCount Number of flow rules
*/
diff --git a/node/CertificateOfMembership.hpp b/node/CertificateOfMembership.hpp
index b5a90007..7fd38ad7 100644
--- a/node/CertificateOfMembership.hpp
+++ b/node/CertificateOfMembership.hpp
@@ -243,7 +243,7 @@ public:
* Compare two certificates for parameter agreement
*
* This compares this certificate with the other and returns true if all
- * paramters in this cert are present in the other and if they agree to
+ * parameters in this cert are present in the other and if they agree to
* within this cert's max delta value for each given parameter.
*
* Tuples present in other but not in this cert are ignored, but any
diff --git a/node/Constants.hpp b/node/Constants.hpp
index 03b04e68..73a00c3e 100644
--- a/node/Constants.hpp
+++ b/node/Constants.hpp
@@ -276,6 +276,189 @@
#define ZT_PING_CHECK_INVERVAL 5000
/**
+ * How often the local.conf file is checked for changes
+ */
+#define ZT_LOCAL_CONF_FILE_CHECK_INTERVAL 10000
+
+/**
+ * How frequently to check for changes to the system's network interfaces. When
+ * the service decides to use this constant it's because we want to react more
+ * quickly to new interfaces that pop up or go down.
+ */
+#define ZT_MULTIPATH_BINDER_REFRESH_PERIOD 5000
+
+/**
+ * Packets are only used for QoS/ACK statistical sampling if their packet ID is divisible by
+ * this integer. This is to provide a mechanism for both peers to agree on which packets need
+ * special treatment without having to exchange information. Changing this value would be
+ * a breaking change and would necessitate a protocol version upgrade. Since each incoming and
+ * outgoing packet ID is checked against this value its evaluation is of the form:
+ * (id & (divisor - 1)) == 0, thus the divisor must be a power of 2.
+ *
+ * This value is set at (16) so that given a normally-distributed RNG output we will sample
+ * 1/16th (or ~6.25%) of packets.
+ */
+#define ZT_PATH_QOS_ACK_PROTOCOL_DIVISOR 0x10
+
+/**
+ * Time horizon for VERB_QOS_MEASUREMENT and VERB_ACK packet processing cutoff
+ */
+#define ZT_PATH_QOS_ACK_CUTOFF_TIME 30000
+
+/**
+ * Maximum number of VERB_QOS_MEASUREMENT and VERB_ACK packets allowed to be
+ * processed within cutoff time. Separate totals are kept for each type but
+ * the limit is the same for both.
+ *
+ * This limits how often this peer will compute statistical estimates
+ * of various QoS measures from a VERB_QOS_MEASUREMENT or VERB_ACK packets to
+ * CUTOFF_LIMIT times per CUTOFF_TIME milliseconds per peer to prevent
+ * this from being useful for DOS amplification attacks.
+ */
+#define ZT_PATH_QOS_ACK_CUTOFF_LIMIT 128
+
+/**
+ * Path choice history window size. This is used to keep track of which paths were
+ * previously selected so that we can maintain a target allocation over time.
+ */
+#define ZT_MULTIPATH_PROPORTION_WIN_SZ 128
+
+/**
+ * How often we will sample packet latency. Should be greater than ZT_PING_CHECK_INVERVAL
+ * since we will record a 0 bit/s measurement if no valid latency measurement was made within this
+ * window of time.
+ */
+#define ZT_PATH_LATENCY_SAMPLE_INTERVAL (ZT_MULTIPATH_PEER_PING_PERIOD * 2)
+
+/**
+ * Interval used for rate-limiting the computation of path quality estimates.
+ */
+#define ZT_PATH_QUALITY_COMPUTE_INTERVAL 1000
+
+/**
+ * Number of samples to consider when computing real-time path statistics
+ */
+#define ZT_PATH_QUALITY_METRIC_REALTIME_CONSIDERATION_WIN_SZ 128
+
+/**
+ * Number of samples to consider when performing long-term path quality analysis.
+ * By default this value is set to ZT_PATH_QUALITY_METRIC_REALTIME_CONSIDERATION_WIN_SZ but can
+ * be set to any value greater than that to observe longer-term path quality behavior.
+ */
+#define ZT_PATH_QUALITY_METRIC_WIN_SZ ZT_PATH_QUALITY_METRIC_REALTIME_CONSIDERATION_WIN_SZ
+
+/**
+ * Maximum acceptable Packet Delay Variance (PDV) over a path
+ */
+#define ZT_PATH_MAX_PDV 1000
+
+/**
+ * Maximum acceptable time interval between expectation and receipt of at least one ACK over a path
+ */
+#define ZT_PATH_MAX_AGE 30000
+
+/**
+ * Maximum acceptable mean latency over a path
+ */
+#define ZT_PATH_MAX_MEAN_LATENCY 1000
+
+/**
+ * How much each factor contributes to the "stability" score of a path
+ */
+#define ZT_PATH_CONTRIB_PDV (1.0 / 3.0)
+#define ZT_PATH_CONTRIB_LATENCY (1.0 / 3.0)
+#define ZT_PATH_CONTRIB_THROUGHPUT_DISTURBANCE (1.0 / 3.0)
+
+/**
+ * How much each factor contributes to the "quality" score of a path
+ */
+#define ZT_PATH_CONTRIB_STABILITY (0.75 / 3.0)
+#define ZT_PATH_CONTRIB_THROUGHPUT (1.50 / 3.0)
+#define ZT_PATH_CONTRIB_SCOPE (0.75 / 3.0)
+
+/**
+ * How often a QoS packet is sent
+ */
+#define ZT_PATH_QOS_INTERVAL 3000
+
+/**
+ * Min and max acceptable sizes for a VERB_QOS_MEASUREMENT packet
+ */
+#define ZT_PATH_MIN_QOS_PACKET_SZ (8 + 1)
+#define ZT_PATH_MAX_QOS_PACKET_SZ 1400
+
+/**
+ * How many ID:sojourn time pairs in a single QoS packet
+ */
+#define ZT_PATH_QOS_TABLE_SIZE ((ZT_PATH_MAX_QOS_PACKET_SZ * 8) / (64 + 16))
+
+/**
+ * Maximum number of outgoing packets we monitor for QoS information
+ */
+#define ZT_PATH_MAX_OUTSTANDING_QOS_RECORDS 128
+
+/**
+ * Timeout for QoS records
+ */
+#define ZT_PATH_QOS_TIMEOUT (ZT_PATH_QOS_INTERVAL * 2)
+
+/**
+ * How often the service tests the path throughput
+ */
+#define ZT_PATH_THROUGHPUT_MEASUREMENT_INTERVAL (ZT_PATH_ACK_INTERVAL * 8)
+
+/**
+ * Minimum amount of time between each ACK packet
+ */
+#define ZT_PATH_ACK_INTERVAL 1000
+
+/**
+ * How often an aggregate link statistics report is emitted into this tracing system
+ */
+#define ZT_PATH_AGGREGATE_STATS_REPORT_INTERVAL 60000
+
+/**
+ * How much an aggregate link's component paths can vary from their target allocation
+ * before the link is considered to be in a state of imbalance.
+ */
+#define ZT_PATH_IMBALANCE_THRESHOLD 0.20
+
+/**
+ * Max allowable time spent in any queue
+ */
+#define ZT_QOS_TARGET 5 // ms
+
+/**
+ * Time period where the time spent in the queue by a packet should fall below
+ * target at least once
+ */
+#define ZT_QOS_INTERVAL 100 // ms
+
+/**
+ * The number of bytes that each queue is allowed to send during each DRR cycle.
+ * This approximates a single-byte-based fairness queuing scheme
+ */
+#define ZT_QOS_QUANTUM ZT_DEFAULT_MTU
+
+/**
+ * The maximum total number of packets that can be queued among all
+ * active/inactive, old/new queues
+ */
+#define ZT_QOS_MAX_ENQUEUED_PACKETS 1024
+
+/**
+ * Number of QoS queues (buckets)
+ */
+#define ZT_QOS_NUM_BUCKETS 9
+
+/**
+ * All unspecified traffic is put in this bucket. Anything in a bucket with a smaller
+ * value is de-prioritized. Anything in a bucket with a higher value is prioritized over
+ * other traffic.
+ */
+#define ZT_QOS_DEFAULT_BUCKET 0
+
+/**
* How frequently to send heartbeats over in-use paths
*/
#define ZT_PATH_HEARTBEAT_PERIOD 14000
@@ -291,6 +474,13 @@
#define ZT_PEER_PING_PERIOD 60000
/**
+ * Delay between full-fledged pings of directly connected peers.
+ * With multipath bonding enabled ping peers more often to measure
+ * packet loss and latency.
+ */
+#define ZT_MULTIPATH_PEER_PING_PERIOD 5000
+
+/**
* Paths are considered expired if they have not sent us a real packet in this long
*/
#define ZT_PEER_PATH_EXPIRATION ((ZT_PEER_PING_PERIOD * 4) + 3000)
diff --git a/node/Hashtable.hpp b/node/Hashtable.hpp
index 777e88dc..46d5c007 100644
--- a/node/Hashtable.hpp
+++ b/node/Hashtable.hpp
@@ -399,6 +399,10 @@ private:
{
return ((unsigned long)i * (unsigned long)0x9e3779b1);
}
+ static inline unsigned long _hc(const int i)
+ {
+ return ((unsigned long)i * (unsigned long)0x9e3779b1);
+ }
inline void _grow()
{
diff --git a/node/Identity.cpp b/node/Identity.cpp
index 03f27083..1687746b 100644
--- a/node/Identity.cpp
+++ b/node/Identity.cpp
@@ -50,8 +50,8 @@ static inline void _computeMemoryHardHash(const void *publicKey,unsigned int pub
SHA512::hash(digest,publicKey,publicKeyBytes);
// Initialize genmem[] using Salsa20 in a CBC-like configuration since
- // ordinary Salsa20 is randomly seekable. This is good for a cipher
- // but is not what we want for sequential memory-harndess.
+ // ordinary Salsa20 is randomly seek-able. This is good for a cipher
+ // but is not what we want for sequential memory-hardness.
memset(genmem,0,ZT_IDENTITY_GEN_MEMORY);
Salsa20 s20(digest,(char *)digest + 32);
s20.crypt20((char *)genmem,(char *)genmem,64);
diff --git a/node/IncomingPacket.cpp b/node/IncomingPacket.cpp
index ff4fc94b..6bb9734c 100644
--- a/node/IncomingPacket.cpp
+++ b/node/IncomingPacket.cpp
@@ -80,6 +80,7 @@ bool IncomingPacket::tryDecode(const RuntimeEnvironment *RR,void *tPtr)
if (!trusted) {
if (!dearmor(peer->key())) {
RR->t->incomingPacketMessageAuthenticationFailure(tPtr,_path,packetId(),sourceAddress,hops(),"invalid MAC");
+ _path->recordInvalidPacket();
return true;
}
}
@@ -93,9 +94,11 @@ bool IncomingPacket::tryDecode(const RuntimeEnvironment *RR,void *tPtr)
switch(v) {
//case Packet::VERB_NOP:
default: // ignore unknown verbs, but if they pass auth check they are "received"
- peer->received(tPtr,_path,hops(),packetId(),v,0,Packet::VERB_NOP,false,0);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),v,0,Packet::VERB_NOP,false,0);
return true;
case Packet::VERB_HELLO: return _doHELLO(RR,tPtr,true);
+ case Packet::VERB_ACK: return _doACK(RR,tPtr,peer);
+ case Packet::VERB_QOS_MEASUREMENT: return _doQOS_MEASUREMENT(RR,tPtr,peer);
case Packet::VERB_ERROR: return _doERROR(RR,tPtr,peer);
case Packet::VERB_OK: return _doOK(RR,tPtr,peer);
case Packet::VERB_WHOIS: return _doWHOIS(RR,tPtr,peer);
@@ -194,7 +197,57 @@ bool IncomingPacket::_doERROR(const RuntimeEnvironment *RR,void *tPtr,const Shar
default: break;
}
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_ERROR,inRePacketId,inReVerb,false,networkId);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_ERROR,inRePacketId,inReVerb,false,networkId);
+
+ return true;
+}
+
+bool IncomingPacket::_doACK(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr<Peer> &peer)
+{
+ if (!peer->rateGateACK(RR->node->now()))
+ return true;
+ /* Dissect incoming ACK packet. From this we can estimate current throughput of the path, establish known
+ * maximums and detect packet loss. */
+ if (peer->localMultipathSupport()) {
+ int32_t ackedBytes;
+ if (payloadLength() != sizeof(ackedBytes)) {
+ return true; // ignore
+ }
+ memcpy(&ackedBytes, payload(), sizeof(ackedBytes));
+ _path->receivedAck(RR->node->now(), Utils::ntoh(ackedBytes));
+ peer->inferRemoteMultipathEnabled();
+ }
+
+ return true;
+}
+bool IncomingPacket::_doQOS_MEASUREMENT(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr<Peer> &peer)
+{
+ if (!peer->rateGateQoS(RR->node->now()))
+ return true;
+ /* Dissect incoming QoS packet. From this we can compute latency values and their variance.
+ * The latency variance is used as a measure of "jitter". */
+ if (peer->localMultipathSupport()) {
+ if (payloadLength() > ZT_PATH_MAX_QOS_PACKET_SZ || payloadLength() < ZT_PATH_MIN_QOS_PACKET_SZ) {
+ return true; // ignore
+ }
+ const int64_t now = RR->node->now();
+ uint64_t rx_id[ZT_PATH_QOS_TABLE_SIZE];
+ uint16_t rx_ts[ZT_PATH_QOS_TABLE_SIZE];
+ char *begin = (char *)payload();
+ char *ptr = begin;
+ int count = 0;
+ int len = payloadLength();
+ // Read packet IDs and latency compensation intervals for each packet tracked by this QoS packet.
+ // Only consume a record when the full (8-byte ID + 2-byte timestamp) entry fits in the payload,
+ // otherwise a truncated payload would cause an out-of-bounds read past the packet buffer.
+ while (((ptr + sizeof(uint64_t) + sizeof(uint16_t)) <= (begin + len)) && (count < ZT_PATH_QOS_TABLE_SIZE)) {
+ memcpy((void*)&rx_id[count], ptr, sizeof(uint64_t));
+ ptr+=sizeof(uint64_t);
+ memcpy((void*)&rx_ts[count], ptr, sizeof(uint16_t));
+ ptr+=sizeof(uint16_t);
+ count++;
+ }
+ _path->receivedQoS(now, count, rx_id, rx_ts);
+ peer->inferRemoteMultipathEnabled();
+ }
 return true;
}
@@ -395,7 +448,7 @@ bool IncomingPacket::_doHELLO(const RuntimeEnvironment *RR,void *tPtr,const bool
_path->send(RR,tPtr,outp.data(),outp.size(),now);
peer->setRemoteVersion(protoVersion,vMajor,vMinor,vRevision); // important for this to go first so received() knows the version
- peer->received(tPtr,_path,hops(),pid,Packet::VERB_HELLO,0,Packet::VERB_NOP,false,0);
+ peer->received(tPtr,_path,hops(),pid,payloadLength(),Packet::VERB_HELLO,0,Packet::VERB_NOP,false,0);
return true;
}
@@ -445,8 +498,9 @@ bool IncomingPacket::_doOK(const RuntimeEnvironment *RR,void *tPtr,const SharedP
}
}
- if (!hops())
- _path->updateLatency((unsigned int)latency);
+ if (!hops() && (RR->node->getMultipathMode() != ZT_MULTIPATH_NONE)) {
+ _path->updateLatency((unsigned int)latency, RR->node->now());
+ }
peer->setRemoteVersion(vProto,vMajor,vMinor,vRevision);
@@ -507,7 +561,7 @@ bool IncomingPacket::_doOK(const RuntimeEnvironment *RR,void *tPtr,const SharedP
default: break;
}
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_OK,inRePacketId,inReVerb,false,networkId);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_OK,inRePacketId,inReVerb,false,networkId);
return true;
}
@@ -542,7 +596,7 @@ bool IncomingPacket::_doWHOIS(const RuntimeEnvironment *RR,void *tPtr,const Shar
_path->send(RR,tPtr,outp.data(),outp.size(),RR->node->now());
}
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_WHOIS,0,Packet::VERB_NOP,false,0);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_WHOIS,0,Packet::VERB_NOP,false,0);
return true;
}
@@ -566,7 +620,7 @@ bool IncomingPacket::_doRENDEZVOUS(const RuntimeEnvironment *RR,void *tPtr,const
}
}
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_RENDEZVOUS,0,Packet::VERB_NOP,false,0);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_RENDEZVOUS,0,Packet::VERB_NOP,false,0);
return true;
}
@@ -595,7 +649,7 @@ bool IncomingPacket::_doFRAME(const RuntimeEnvironment *RR,void *tPtr,const Shar
_sendErrorNeedCredentials(RR,tPtr,peer,nwid);
}
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_FRAME,0,Packet::VERB_NOP,trustEstablished,nwid);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_FRAME,0,Packet::VERB_NOP,trustEstablished,nwid);
return true;
}
@@ -618,7 +672,7 @@ bool IncomingPacket::_doEXT_FRAME(const RuntimeEnvironment *RR,void *tPtr,const
if (!network->gate(tPtr,peer)) {
RR->t->incomingNetworkAccessDenied(tPtr,network,_path,packetId(),size(),peer->address(),Packet::VERB_EXT_FRAME,true);
_sendErrorNeedCredentials(RR,tPtr,peer,nwid);
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,false,nwid);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,false,nwid);
return true;
}
@@ -630,7 +684,7 @@ bool IncomingPacket::_doEXT_FRAME(const RuntimeEnvironment *RR,void *tPtr,const
const uint8_t *const frameData = (const uint8_t *)field(comLen + ZT_PROTO_VERB_EXT_FRAME_IDX_PAYLOAD,frameLen);
if ((!from)||(from == network->mac())) {
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
return true;
}
@@ -641,19 +695,19 @@ bool IncomingPacket::_doEXT_FRAME(const RuntimeEnvironment *RR,void *tPtr,const
network->learnBridgeRoute(from,peer->address());
} else {
RR->t->incomingNetworkFrameDropped(tPtr,network,_path,packetId(),size(),peer->address(),Packet::VERB_EXT_FRAME,from,to,"bridging not allowed (remote)");
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
return true;
}
} else if (to != network->mac()) {
if (to.isMulticast()) {
if (network->config().multicastLimit == 0) {
RR->t->incomingNetworkFrameDropped(tPtr,network,_path,packetId(),size(),peer->address(),Packet::VERB_EXT_FRAME,from,to,"multicast disabled");
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
return true;
}
} else if (!network->config().permitsBridging(RR->identity.address())) {
RR->t->incomingNetworkFrameDropped(tPtr,network,_path,packetId(),size(),peer->address(),Packet::VERB_EXT_FRAME,from,to,"bridging not allowed (local)");
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
return true;
}
}
@@ -673,9 +727,9 @@ bool IncomingPacket::_doEXT_FRAME(const RuntimeEnvironment *RR,void *tPtr,const
_path->send(RR,tPtr,outp.data(),outp.size(),RR->node->now());
}
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,true,nwid);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,true,nwid);
} else {
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,false,nwid);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,false,nwid);
}
return true;
@@ -695,7 +749,7 @@ bool IncomingPacket::_doECHO(const RuntimeEnvironment *RR,void *tPtr,const Share
outp.armor(peer->key(),true);
_path->send(RR,tPtr,outp.data(),outp.size(),RR->node->now());
- peer->received(tPtr,_path,hops(),pid,Packet::VERB_ECHO,0,Packet::VERB_NOP,false,0);
+ peer->received(tPtr,_path,hops(),pid,payloadLength(),Packet::VERB_ECHO,0,Packet::VERB_NOP,false,0);
return true;
}
@@ -740,7 +794,7 @@ bool IncomingPacket::_doMULTICAST_LIKE(const RuntimeEnvironment *RR,void *tPtr,c
}
}
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_MULTICAST_LIKE,0,Packet::VERB_NOP,trustEstablished,(network) ? network->id() : 0);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_MULTICAST_LIKE,0,Packet::VERB_NOP,trustEstablished,(network) ? network->id() : 0);
return true;
}
@@ -863,7 +917,7 @@ bool IncomingPacket::_doNETWORK_CREDENTIALS(const RuntimeEnvironment *RR,void *t
}
}
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_NETWORK_CREDENTIALS,0,Packet::VERB_NOP,trustEstablished,(network) ? network->id() : 0);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_NETWORK_CREDENTIALS,0,Packet::VERB_NOP,trustEstablished,(network) ? network->id() : 0);
return true;
}
@@ -889,7 +943,7 @@ bool IncomingPacket::_doNETWORK_CONFIG_REQUEST(const RuntimeEnvironment *RR,void
_path->send(RR,tPtr,outp.data(),outp.size(),RR->node->now());
}
- peer->received(tPtr,_path,hopCount,requestPacketId,Packet::VERB_NETWORK_CONFIG_REQUEST,0,Packet::VERB_NOP,false,nwid);
+ peer->received(tPtr,_path,hopCount,requestPacketId,payloadLength(),Packet::VERB_NETWORK_CONFIG_REQUEST,0,Packet::VERB_NOP,false,nwid);
return true;
}
@@ -910,7 +964,7 @@ bool IncomingPacket::_doNETWORK_CONFIG(const RuntimeEnvironment *RR,void *tPtr,c
}
}
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_NETWORK_CONFIG,0,Packet::VERB_NOP,false,(network) ? network->id() : 0);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_NETWORK_CONFIG,0,Packet::VERB_NOP,false,(network) ? network->id() : 0);
return true;
}
@@ -953,7 +1007,7 @@ bool IncomingPacket::_doMULTICAST_GATHER(const RuntimeEnvironment *RR,void *tPtr
}
}
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_MULTICAST_GATHER,0,Packet::VERB_NOP,trustEstablished,nwid);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_MULTICAST_GATHER,0,Packet::VERB_NOP,trustEstablished,nwid);
return true;
}
@@ -979,7 +1033,7 @@ bool IncomingPacket::_doMULTICAST_FRAME(const RuntimeEnvironment *RR,void *tPtr,
if (!network->gate(tPtr,peer)) {
RR->t->incomingNetworkAccessDenied(tPtr,network,_path,packetId(),size(),peer->address(),Packet::VERB_MULTICAST_FRAME,true);
_sendErrorNeedCredentials(RR,tPtr,peer,nwid);
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,false,nwid);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,false,nwid);
return true;
}
@@ -1003,19 +1057,19 @@ bool IncomingPacket::_doMULTICAST_FRAME(const RuntimeEnvironment *RR,void *tPtr,
if (network->config().multicastLimit == 0) {
RR->t->incomingNetworkFrameDropped(tPtr,network,_path,packetId(),size(),peer->address(),Packet::VERB_MULTICAST_FRAME,from,to.mac(),"multicast disabled");
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,false,nwid);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,false,nwid);
return true;
}
if ((frameLen > 0)&&(frameLen <= ZT_MAX_MTU)) {
if (!to.mac().isMulticast()) {
RR->t->incomingPacketInvalid(tPtr,_path,packetId(),source(),hops(),Packet::VERB_MULTICAST_FRAME,"destination not multicast");
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
return true;
}
if ((!from)||(from.isMulticast())||(from == network->mac())) {
RR->t->incomingPacketInvalid(tPtr,_path,packetId(),source(),hops(),Packet::VERB_MULTICAST_FRAME,"invalid source MAC");
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
return true;
}
@@ -1029,7 +1083,7 @@ bool IncomingPacket::_doMULTICAST_FRAME(const RuntimeEnvironment *RR,void *tPtr,
network->learnBridgeRoute(from,peer->address());
} else {
RR->t->incomingNetworkFrameDropped(tPtr,network,_path,packetId(),size(),peer->address(),Packet::VERB_MULTICAST_FRAME,from,to.mac(),"bridging not allowed (remote)");
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
return true;
}
}
@@ -1052,10 +1106,10 @@ bool IncomingPacket::_doMULTICAST_FRAME(const RuntimeEnvironment *RR,void *tPtr,
}
}
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,true,nwid);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,true,nwid);
} else {
_sendErrorNeedCredentials(RR,tPtr,peer,nwid);
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,false,nwid);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,false,nwid);
}
return true;
@@ -1067,7 +1121,7 @@ bool IncomingPacket::_doPUSH_DIRECT_PATHS(const RuntimeEnvironment *RR,void *tPt
// First, subject this to a rate limit
if (!peer->rateGatePushDirectPaths(now)) {
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_PUSH_DIRECT_PATHS,0,Packet::VERB_NOP,false,0);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_PUSH_DIRECT_PATHS,0,Packet::VERB_NOP,false,0);
return true;
}
@@ -1091,7 +1145,7 @@ bool IncomingPacket::_doPUSH_DIRECT_PATHS(const RuntimeEnvironment *RR,void *tPt
case 4: {
const InetAddress a(field(ptr,4),4,at<uint16_t>(ptr + 4));
if (
- ((flags & ZT_PUSH_DIRECT_PATHS_FLAG_FORGET_PATH) == 0) && // not being told to forget
+ ((flags & ZT_PUSH_DIRECT_PATHS_FLAG_FORGET_PATH) == 0) && // not being told to forget
(!( ((flags & ZT_PUSH_DIRECT_PATHS_FLAG_CLUSTER_REDIRECT) == 0) && (peer->hasActivePathTo(now,a)) )) && // not already known
(RR->node->shouldUsePathForZeroTierTraffic(tPtr,peer->address(),_path->localSocket(),a)) ) // should use path
{
@@ -1105,7 +1159,7 @@ bool IncomingPacket::_doPUSH_DIRECT_PATHS(const RuntimeEnvironment *RR,void *tPt
case 6: {
const InetAddress a(field(ptr,16),16,at<uint16_t>(ptr + 16));
if (
- ((flags & ZT_PUSH_DIRECT_PATHS_FLAG_FORGET_PATH) == 0) && // not being told to forget
+ ((flags & ZT_PUSH_DIRECT_PATHS_FLAG_FORGET_PATH) == 0) && // not being told to forget
(!( ((flags & ZT_PUSH_DIRECT_PATHS_FLAG_CLUSTER_REDIRECT) == 0) && (peer->hasActivePathTo(now,a)) )) && // not already known
(RR->node->shouldUsePathForZeroTierTraffic(tPtr,peer->address(),_path->localSocket(),a)) ) // should use path
{
@@ -1120,7 +1174,7 @@ bool IncomingPacket::_doPUSH_DIRECT_PATHS(const RuntimeEnvironment *RR,void *tPt
ptr += addrLen;
}
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_PUSH_DIRECT_PATHS,0,Packet::VERB_NOP,false,0);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_PUSH_DIRECT_PATHS,0,Packet::VERB_NOP,false,0);
return true;
}
@@ -1136,7 +1190,7 @@ bool IncomingPacket::_doUSER_MESSAGE(const RuntimeEnvironment *RR,void *tPtr,con
RR->node->postEvent(tPtr,ZT_EVENT_USER_MESSAGE,reinterpret_cast<const void *>(&um));
}
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_USER_MESSAGE,0,Packet::VERB_NOP,false,0);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_USER_MESSAGE,0,Packet::VERB_NOP,false,0);
return true;
}
@@ -1160,7 +1214,7 @@ bool IncomingPacket::_doREMOTE_TRACE(const RuntimeEnvironment *RR,void *tPtr,con
}
}
- peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_REMOTE_TRACE,0,Packet::VERB_NOP,false,0);
+ peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_REMOTE_TRACE,0,Packet::VERB_NOP,false,0);
return true;
}
diff --git a/node/IncomingPacket.hpp b/node/IncomingPacket.hpp
index 88f4f066..9144277c 100644
--- a/node/IncomingPacket.hpp
+++ b/node/IncomingPacket.hpp
@@ -125,6 +125,8 @@ private:
// been authenticated, decrypted, decompressed, and classified.
bool _doERROR(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr<Peer> &peer);
bool _doHELLO(const RuntimeEnvironment *RR,void *tPtr,const bool alreadyAuthenticated);
+ bool _doACK(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr<Peer> &peer);
+ bool _doQOS_MEASUREMENT(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr<Peer> &peer);
bool _doOK(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr<Peer> &peer);
bool _doWHOIS(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr<Peer> &peer);
bool _doRENDEZVOUS(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr<Peer> &peer);
diff --git a/node/InetAddress.cpp b/node/InetAddress.cpp
index 36b4e434..5d6ad07f 100644
--- a/node/InetAddress.cpp
+++ b/node/InetAddress.cpp
@@ -5,7 +5,7 @@
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
- * (at your oion) any later version.
+ * (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -62,23 +62,23 @@ InetAddress::IpScope InetAddress::ipScope() const
case 0x37: return IP_SCOPE_PSEUDOPRIVATE; // 55.0.0.0/8 (US DoD)
case 0x38: return IP_SCOPE_PSEUDOPRIVATE; // 56.0.0.0/8 (US Postal Service)
case 0x64:
- if ((ip & 0xffc00000) == 0x64400000) return IP_SCOPE_PRIVATE; // 100.64.0.0/10
+ if ((ip & 0xffc00000) == 0x64400000) return IP_SCOPE_PRIVATE; // 100.64.0.0/10
break;
case 0x7f: return IP_SCOPE_LOOPBACK; // 127.0.0.0/8
case 0xa9:
- if ((ip & 0xffff0000) == 0xa9fe0000) return IP_SCOPE_LINK_LOCAL; // 169.254.0.0/16
+ if ((ip & 0xffff0000) == 0xa9fe0000) return IP_SCOPE_LINK_LOCAL; // 169.254.0.0/16
break;
case 0xac:
- if ((ip & 0xfff00000) == 0xac100000) return IP_SCOPE_PRIVATE; // 172.16.0.0/12
+ if ((ip & 0xfff00000) == 0xac100000) return IP_SCOPE_PRIVATE; // 172.16.0.0/12
break;
case 0xc0:
- if ((ip & 0xffff0000) == 0xc0a80000) return IP_SCOPE_PRIVATE; // 192.168.0.0/16
+ if ((ip & 0xffff0000) == 0xc0a80000) return IP_SCOPE_PRIVATE; // 192.168.0.0/16
break;
case 0xff: return IP_SCOPE_NONE; // 255.0.0.0/8 (broadcast, or unused/unusable)
}
switch(ip >> 28) {
- case 0xe: return IP_SCOPE_MULTICAST; // 224.0.0.0/4
- case 0xf: return IP_SCOPE_PSEUDOPRIVATE; // 240.0.0.0/4 ("reserved," usually unusable)
+ case 0xe: return IP_SCOPE_MULTICAST; // 224.0.0.0/4
+ case 0xf: return IP_SCOPE_PSEUDOPRIVATE; // 240.0.0.0/4 ("reserved," usually unusable)
}
return IP_SCOPE_GLOBAL;
} break;
@@ -86,21 +86,21 @@ InetAddress::IpScope InetAddress::ipScope() const
case AF_INET6: {
const unsigned char *ip = reinterpret_cast<const unsigned char *>(reinterpret_cast<const struct sockaddr_in6 *>(this)->sin6_addr.s6_addr);
if ((ip[0] & 0xf0) == 0xf0) {
- if (ip[0] == 0xff) return IP_SCOPE_MULTICAST; // ff00::/8
+ if (ip[0] == 0xff) return IP_SCOPE_MULTICAST; // ff00::/8
if ((ip[0] == 0xfe)&&((ip[1] & 0xc0) == 0x80)) {
unsigned int k = 2;
while ((!ip[k])&&(k < 15)) ++k;
if ((k == 15)&&(ip[15] == 0x01))
- return IP_SCOPE_LOOPBACK; // fe80::1/128
- else return IP_SCOPE_LINK_LOCAL; // fe80::/10
+ return IP_SCOPE_LOOPBACK; // fe80::1/128
+ else return IP_SCOPE_LINK_LOCAL; // fe80::/10
}
- if ((ip[0] & 0xfe) == 0xfc) return IP_SCOPE_PRIVATE; // fc00::/7
+ if ((ip[0] & 0xfe) == 0xfc) return IP_SCOPE_PRIVATE; // fc00::/7
}
unsigned int k = 0;
while ((!ip[k])&&(k < 15)) ++k;
if (k == 15) { // all 0's except last byte
- if (ip[15] == 0x01) return IP_SCOPE_LOOPBACK; // ::1/128
- if (ip[15] == 0x00) return IP_SCOPE_NONE; // ::/128
+ if (ip[15] == 0x01) return IP_SCOPE_LOOPBACK; // ::1/128
+ if (ip[15] == 0x00) return IP_SCOPE_NONE; // ::/128
}
return IP_SCOPE_GLOBAL;
} break;
diff --git a/node/MulticastGroup.hpp b/node/MulticastGroup.hpp
index 0f4a621e..0a318136 100644
--- a/node/MulticastGroup.hpp
+++ b/node/MulticastGroup.hpp
@@ -68,7 +68,7 @@ public:
* Derive the multicast group used for address resolution (ARP/NDP) for an IP
*
* @param ip IP address (port field is ignored)
- * @return Multicat group for ARP/NDP
+ * @return Multicast group for ARP/NDP
*/
static inline MulticastGroup deriveMulticastGroupForAddressResolution(const InetAddress &ip)
{
diff --git a/node/Multicaster.cpp b/node/Multicaster.cpp
index 753e4ee0..b2a5a205 100644
--- a/node/Multicaster.cpp
+++ b/node/Multicaster.cpp
@@ -190,7 +190,7 @@ void Multicaster::send(
for(unsigned int i=0;i<multicastReplicatorCount;++i) {
const SharedPtr<Peer> p(RR->topology->getPeerNoCache(multicastReplicators[i]));
if ((p)&&(p->isAlive(now))) {
- const SharedPtr<Path> pp(p->getBestPath(now,false));
+ const SharedPtr<Path> pp(p->getAppropriatePath(now,false));
if ((pp)&&(pp->latency() < bestMulticastReplicatorLatency)) {
bestMulticastReplicatorLatency = pp->latency();
bestMulticastReplicatorPath = pp;
diff --git a/node/Network.cpp b/node/Network.cpp
index a75d9fd1..69a74281 100644
--- a/node/Network.cpp
+++ b/node/Network.cpp
@@ -29,6 +29,8 @@
#include <stdlib.h>
#include <math.h>
+#include "../include/ZeroTierDebug.h"
+
#include "Constants.hpp"
#include "../version.h"
#include "Network.hpp"
@@ -106,7 +108,8 @@ static _doZtFilterResult _doZtFilter(
const unsigned int ruleCount,
Address &cc, // MUTABLE -- set to TEE destination if TEE action is taken or left alone otherwise
unsigned int &ccLength, // MUTABLE -- set to length of packet payload to TEE
- bool &ccWatch) // MUTABLE -- set to true for WATCH target as opposed to normal TEE
+ bool &ccWatch, // MUTABLE -- set to true for WATCH target as opposed to normal TEE
+ uint8_t &qosBucket) // MUTABLE -- set to the value of the argument provided to PRIORITY
{
// Set to true if we are a TEE/REDIRECT/WATCH target
bool superAccept = false;
@@ -124,6 +127,10 @@ static _doZtFilterResult _doZtFilter(
if ((unsigned int)rt <= (unsigned int)ZT_NETWORK_RULE_ACTION__MAX_ID) {
if (thisSetMatches) {
switch(rt) {
+ case ZT_NETWORK_RULE_ACTION_PRIORITY:
+ qosBucket = (rules[rn].v.qosBucket >= 0 && rules[rn].v.qosBucket <= 8) ? rules[rn].v.qosBucket : 4; // 4 = default bucket (no priority)
+ return DOZTFILTER_ACCEPT;
+
case ZT_NETWORK_RULE_ACTION_DROP:
return DOZTFILTER_DROP;
@@ -621,7 +628,8 @@ bool Network::filterOutgoingPacket(
const uint8_t *frameData,
const unsigned int frameLen,
const unsigned int etherType,
- const unsigned int vlanId)
+ const unsigned int vlanId,
+ uint8_t &qosBucket)
{
const int64_t now = RR->node->now();
Address ztFinalDest(ztDest);
@@ -636,7 +644,7 @@ bool Network::filterOutgoingPacket(
Membership *const membership = (ztDest) ? _memberships.get(ztDest) : (Membership *)0;
- switch(_doZtFilter(RR,rrl,_config,membership,false,ztSource,ztFinalDest,macSource,macDest,frameData,frameLen,etherType,vlanId,_config.rules,_config.ruleCount,cc,ccLength,ccWatch)) {
+ switch(_doZtFilter(RR,rrl,_config,membership,false,ztSource,ztFinalDest,macSource,macDest,frameData,frameLen,etherType,vlanId,_config.rules,_config.ruleCount,cc,ccLength,ccWatch,qosBucket)) {
case DOZTFILTER_NO_MATCH: {
for(unsigned int c=0;c<_config.capabilityCount;++c) {
@@ -644,7 +652,7 @@ bool Network::filterOutgoingPacket(
Address cc2;
unsigned int ccLength2 = 0;
bool ccWatch2 = false;
- switch (_doZtFilter(RR,crrl,_config,membership,false,ztSource,ztFinalDest,macSource,macDest,frameData,frameLen,etherType,vlanId,_config.capabilities[c].rules(),_config.capabilities[c].ruleCount(),cc2,ccLength2,ccWatch2)) {
+ switch (_doZtFilter(RR,crrl,_config,membership,false,ztSource,ztFinalDest,macSource,macDest,frameData,frameLen,etherType,vlanId,_config.capabilities[c].rules(),_config.capabilities[c].ruleCount(),cc2,ccLength2,ccWatch2,qosBucket)) {
case DOZTFILTER_NO_MATCH:
case DOZTFILTER_DROP: // explicit DROP in a capability just terminates its evaluation and is an anti-pattern
break;
@@ -759,11 +767,13 @@ int Network::filterIncomingPacket(
bool ccWatch = false;
const Capability *c = (Capability *)0;
+ uint8_t qosBucket = 255; // For incoming packets this is a dummy value
+
Mutex::Lock _l(_lock);
Membership &membership = _membership(sourcePeer->address());
- switch (_doZtFilter(RR,rrl,_config,&membership,true,sourcePeer->address(),ztFinalDest,macSource,macDest,frameData,frameLen,etherType,vlanId,_config.rules,_config.ruleCount,cc,ccLength,ccWatch)) {
+ switch (_doZtFilter(RR,rrl,_config,&membership,true,sourcePeer->address(),ztFinalDest,macSource,macDest,frameData,frameLen,etherType,vlanId,_config.rules,_config.ruleCount,cc,ccLength,ccWatch,qosBucket)) {
case DOZTFILTER_NO_MATCH: {
Membership::CapabilityIterator mci(membership,_config);
@@ -772,7 +782,7 @@ int Network::filterIncomingPacket(
Address cc2;
unsigned int ccLength2 = 0;
bool ccWatch2 = false;
- switch(_doZtFilter(RR,crrl,_config,&membership,true,sourcePeer->address(),ztFinalDest,macSource,macDest,frameData,frameLen,etherType,vlanId,c->rules(),c->ruleCount(),cc2,ccLength2,ccWatch2)) {
+ switch(_doZtFilter(RR,crrl,_config,&membership,true,sourcePeer->address(),ztFinalDest,macSource,macDest,frameData,frameLen,etherType,vlanId,c->rules(),c->ruleCount(),cc2,ccLength2,ccWatch2,qosBucket)) {
case DOZTFILTER_NO_MATCH:
case DOZTFILTER_DROP: // explicit DROP in a capability just terminates its evaluation and is an anti-pattern
break;
diff --git a/node/Network.hpp b/node/Network.hpp
index 95b5483a..2baab511 100644
--- a/node/Network.hpp
+++ b/node/Network.hpp
@@ -132,7 +132,8 @@ public:
const uint8_t *frameData,
const unsigned int frameLen,
const unsigned int etherType,
- const unsigned int vlanId);
+ const unsigned int vlanId,
+ uint8_t &qosBucket);
/**
* Apply filters to an incoming packet
@@ -298,6 +299,13 @@ public:
void learnBridgeRoute(const MAC &mac,const Address &addr);
/**
+ * Whether QoS is in effect for this network
+ */
+ bool QoSEnabled() {
+ return false;
+ }
+
+ /**
* Learn a multicast group that is bridged to our tap device
*
* @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call
@@ -438,6 +446,6 @@ private:
AtomicCounter __refCount;
};
-} // naemspace ZeroTier
+} // namespace ZeroTier
#endif
diff --git a/node/NetworkConfig.hpp b/node/NetworkConfig.hpp
index 44066c86..8900bda5 100644
--- a/node/NetworkConfig.hpp
+++ b/node/NetworkConfig.hpp
@@ -562,7 +562,7 @@ public:
char name[ZT_MAX_NETWORK_SHORT_NAME_LENGTH + 1];
/**
- * Certficiate of membership (for private networks)
+ * Certificate of membership (for private networks)
*/
CertificateOfMembership com;
};
diff --git a/node/Node.cpp b/node/Node.cpp
index db511430..576b2e4a 100644
--- a/node/Node.cpp
+++ b/node/Node.cpp
@@ -234,7 +234,7 @@ public:
}
if ((!contacted)&&(_bestCurrentUpstream)) {
- const SharedPtr<Path> up(_bestCurrentUpstream->getBestPath(_now,true));
+ const SharedPtr<Path> up(_bestCurrentUpstream->getAppropriatePath(_now,true));
if (up)
p->sendHELLO(_tPtr,up->localSocket(),up->address(),_now);
}
@@ -368,6 +368,7 @@ ZT_ResultCode Node::leave(uint64_t nwid,void **uptr,void *tptr)
{
Mutex::Lock _l(_networks_m);
SharedPtr<Network> *nw = _networks.get(nwid);
+ RR->sw->removeNetworkQoSControlBlock(nwid);
if (!nw)
return ZT_RESULT_OK;
if (uptr)
@@ -450,6 +451,7 @@ ZT_PeerList *Node::peers() const
for(std::vector< std::pair< Address,SharedPtr<Peer> > >::iterator pi(peers.begin());pi!=peers.end();++pi) {
ZT_Peer *p = &(pl->peers[pl->peerCount++]);
p->address = pi->second->address().toInt();
+ p->hadAggregateLink = 0;
if (pi->second->remoteVersionKnown()) {
p->versionMajor = pi->second->remoteVersionMajor();
p->versionMinor = pi->second->remoteVersionMinor();
@@ -465,7 +467,8 @@ ZT_PeerList *Node::peers() const
p->role = RR->topology->role(pi->second->identity().address());
std::vector< SharedPtr<Path> > paths(pi->second->paths(_now));
- SharedPtr<Path> bestp(pi->second->getBestPath(_now,false));
+ SharedPtr<Path> bestp(pi->second->getAppropriatePath(_now,false));
+ p->hadAggregateLink |= pi->second->hasAggregateLink();
p->pathCount = 0;
for(std::vector< SharedPtr<Path> >::iterator path(paths.begin());path!=paths.end();++path) {
ZT_FAST_MEMCPY(&(p->paths[p->pathCount].address),&((*path)->address()),sizeof(struct sockaddr_storage));
@@ -474,6 +477,17 @@ ZT_PeerList *Node::peers() const
p->paths[p->pathCount].trustedPathId = RR->topology->getOutboundPathTrust((*path)->address());
p->paths[p->pathCount].expired = 0;
p->paths[p->pathCount].preferred = ((*path) == bestp) ? 1 : 0;
+ p->paths[p->pathCount].latency = (*path)->latency();
+ p->paths[p->pathCount].packetDelayVariance = (*path)->packetDelayVariance();
+ p->paths[p->pathCount].throughputDisturbCoeff = (*path)->throughputDisturbanceCoefficient();
+ p->paths[p->pathCount].packetErrorRatio = (*path)->packetErrorRatio();
+ p->paths[p->pathCount].packetLossRatio = (*path)->packetLossRatio();
+ p->paths[p->pathCount].stability = (*path)->lastComputedStability();
+ p->paths[p->pathCount].throughput = (*path)->meanThroughput();
+ p->paths[p->pathCount].maxThroughput = (*path)->maxLifetimeThroughput();
+ p->paths[p->pathCount].allocation = (float)(*path)->allocation() / (float)255;
+ p->paths[p->pathCount].ifname = (*path)->getName();
+
++p->pathCount;
}
}
diff --git a/node/Node.hpp b/node/Node.hpp
index 79284b63..4ad0e3cd 100644
--- a/node/Node.hpp
+++ b/node/Node.hpp
@@ -260,6 +260,9 @@ public:
inline const Address &remoteTraceTarget() const { return _remoteTraceTarget; }
inline Trace::Level remoteTraceLevel() const { return _remoteTraceLevel; }
+ inline void setMultipathMode(uint8_t mode) { _multipathMode = mode; }
+ inline uint8_t getMultipathMode() { return _multipathMode; }
+
private:
RuntimeEnvironment _RR;
RuntimeEnvironment *RR;
@@ -284,6 +287,8 @@ private:
Address _remoteTraceTarget;
enum Trace::Level _remoteTraceLevel;
+ uint8_t _multipathMode;
+
volatile int64_t _now;
int64_t _lastPingCheck;
int64_t _lastHousekeepingRun;
diff --git a/node/OutboundMulticast.cpp b/node/OutboundMulticast.cpp
index d7a7b4d8..2391771f 100644
--- a/node/OutboundMulticast.cpp
+++ b/node/OutboundMulticast.cpp
@@ -85,7 +85,8 @@ void OutboundMulticast::sendOnly(const RuntimeEnvironment *RR,void *tPtr,const A
{
const SharedPtr<Network> nw(RR->node->network(_nwid));
const Address toAddr2(toAddr);
- if ((nw)&&(nw->filterOutgoingPacket(tPtr,true,RR->identity.address(),toAddr2,_macSrc,_macDest,_frameData,_frameLen,_etherType,0))) {
+ uint8_t QoSBucket = 255; // Dummy value
+ if ((nw)&&(nw->filterOutgoingPacket(tPtr,true,RR->identity.address(),toAddr2,_macSrc,_macDest,_frameData,_frameLen,_etherType,0,QoSBucket))) {
_packet.newInitializationVector();
_packet.setDestination(toAddr2);
RR->node->expectReplyTo(_packet.packetId());
diff --git a/node/Packet.hpp b/node/Packet.hpp
index 27da6fb5..8e82bd34 100644
--- a/node/Packet.hpp
+++ b/node/Packet.hpp
@@ -150,7 +150,7 @@
*
* In cryptography, a "break" means something different from what it means in
* common discussion. If a cipher is 256 bits strong and someone finds a way
- * to reduce key search to 254 bits, this constitues a "break" in the academic
+ * to reduce key search to 254 bits, this constitutes a "break" in the academic
* literature. 254 bits is still far beyond what can be leveraged to accomplish
* a "break" as most people would understand it -- the actual decryption and
* reading of traffic.
@@ -249,7 +249,7 @@
*/
#define ZT_PROTO_MIN_FRAGMENT_LENGTH ZT_PACKET_FRAGMENT_IDX_PAYLOAD
-// Field incides for parsing verbs -------------------------------------------
+// Field indices for parsing verbs -------------------------------------------
// Some verbs have variable-length fields. Those aren't fully defined here
// yet-- instead they are parsed using relative indexes in IncomingPacket.
@@ -419,7 +419,7 @@ public:
template<unsigned int C2>
Fragment(const Buffer<C2> &b) :
- Buffer<ZT_PROTO_MAX_PACKET_LENGTH>(b)
+ Buffer<ZT_PROTO_MAX_PACKET_LENGTH>(b)
{
}
@@ -734,7 +734,7 @@ public:
* Credentials can be for any number of networks.
*
* The use of a zero byte to terminate the COM section is for legacy
- * backward compatiblity. Newer fields are prefixed with a length.
+ * backward compatibility. Newer fields are prefixed with a length.
*
* OK/ERROR are not generated.
*/
@@ -751,7 +751,7 @@ public:
* This message requests network configuration from a node capable of
* providing it.
*
- * Respones to this are always whole configs intended for the recipient.
+ * Responses to this are always whole configs intended for the recipient.
* For patches and other updates a NETWORK_CONFIG is sent instead.
*
* It would be valid and correct as of 1.2.0 to use NETWORK_CONFIG always,
@@ -884,7 +884,7 @@ public:
* <[6] MAC address of multicast group>
* <[4] 32-bit ADI for multicast group>
* <[1] flags>
- * [<[...] network certficate of membership (DEPRECATED)>]
+ * [<[...] network certificate of membership (DEPRECATED)>]
* [<[...] implicit gather results if flag 0x01 is set>]
*
* OK flags (same bits as request flags):
@@ -930,7 +930,53 @@ public:
*/
VERB_PUSH_DIRECT_PATHS = 0x10,
- // 0x11, 0x12 -- deprecated
+ // 0x11 -- deprecated
+
+ /**
+ * An acknowledgment of receipt of a series of recent packets from another
+ * peer. This is used to calculate relative throughput values and to detect
+ * packet loss. Only VERB_FRAME and VERB_EXT_FRAME packets are counted.
+ *
+ * ACK response format:
+ * <[4] 32-bit number of bytes received since last ACK>
+ *
+ * Upon receipt of this packet, the local peer will verify that the correct
+ * number of bytes were received by the remote peer. If these values do
+ * not agree that could be an indicator of packet loss.
+ *
+ * Additionally, the local peer knows the interval of time that has
+ * elapsed since the last received ACK. With this information it can compute
+ * a rough estimate of the current throughput.
+ *
+ * This is sent at a maximum rate of once per every ZT_PATH_ACK_INTERVAL
+ */
+ VERB_ACK = 0x12,
+
+ /**
+ * A packet containing timing measurements useful for estimating path quality.
+ * Composed of a list of <packet ID:internal sojourn time> pairs for an
+ * arbitrary set of recent packets. This is used to sample for latency and
+ * packet delay variance (PDV, "jitter").
+ *
+ * QoS record format:
+ *
+ * <[8] 64-bit packet ID of previously-received packet>
+ * <[2] 16-bit packet sojourn time>
+ * <...repeat until end of max 1400 byte packet...>
+ *
+ * The number of possible records per QoS packet is: (1400 * 8) / 80 = 140
+ * This packet should be sent very rarely (every few seconds) as it can be
+ * somewhat large if the connection is saturated. Future versions might use
+ * a bloom table to probabilistically determine these values in a vastly
+ * more space-efficient manner.
+ *
+ * Note: The 'internal packet sojourn time' is a slight misnomer as it is a
+ * measure of the amount of time between when a packet was received and the
+ * egress time of its tracking QoS packet.
+ *
+ * This is sent at a maximum rate of once per every ZT_PATH_QOS_INTERVAL
+ */
+ VERB_QOS_MEASUREMENT = 0x13,
/**
* A message with arbitrary user-definable content:
@@ -954,7 +1000,7 @@ public:
*
* This message contains a remote trace event. Remote trace events can
* be sent to observers configured at the network level for those that
- * pertain directly to actiity on a network, or to global observers if
+ * pertain directly to activity on a network, or to global observers if
* locally configured.
*
* The instance ID is a random 64-bit value generated by each ZeroTier
@@ -999,7 +1045,7 @@ public:
template<unsigned int C2>
Packet(const Buffer<C2> &b) :
- Buffer<ZT_PROTO_MAX_PACKET_LENGTH>(b)
+ Buffer<ZT_PROTO_MAX_PACKET_LENGTH>(b)
{
}
@@ -1251,7 +1297,7 @@ public:
* Encrypt/decrypt a separately armored portion of a packet
*
* This is currently only used to mask portions of HELLO as an extra
- * security precation since most of that message is sent in the clear.
+ * security precaution since most of that message is sent in the clear.
*
* This must NEVER be used more than once in the same packet, as doing
* so will result in re-use of the same key stream.
diff --git a/node/Path.hpp b/node/Path.hpp
index e12328ff..cafff8cf 100644
--- a/node/Path.hpp
+++ b/node/Path.hpp
@@ -39,6 +39,10 @@
#include "SharedPtr.hpp"
#include "AtomicCounter.hpp"
#include "Utils.hpp"
+#include "RingBuffer.hpp"
+#include "Packet.hpp"
+
+#include "../osdep/Phy.hpp"
/**
* Maximum return value of preferenceRank()
@@ -55,6 +59,7 @@ class RuntimeEnvironment;
class Path
{
friend class SharedPtr<Path>;
+ Phy<Path *> *_phy;
public:
/**
@@ -93,22 +98,77 @@ public:
_lastOut(0),
_lastIn(0),
_lastTrustEstablishedPacketReceived(0),
+ _lastPathQualityComputeTime(0),
_localSocket(-1),
_latency(0xffff),
_addr(),
- _ipScope(InetAddress::IP_SCOPE_NONE)
+ _ipScope(InetAddress::IP_SCOPE_NONE),
+ _lastAck(0),
+ _lastThroughputEstimation(0),
+ _lastQoSMeasurement(0),
+ _lastQoSRecordPurge(0),
+ _unackedBytes(0),
+ _expectingAckAsOf(0),
+ _packetsReceivedSinceLastAck(0),
+ _packetsReceivedSinceLastQoS(0),
+ _maxLifetimeThroughput(0),
+ _lastComputedMeanThroughput(0),
+ _bytesAckedSinceLastThroughputEstimation(0),
+ _lastComputedMeanLatency(0.0),
+ _lastComputedPacketDelayVariance(0.0),
+ _lastComputedPacketErrorRatio(0.0),
+ _lastComputedPacketLossRatio(0),
+ _lastComputedStability(0.0),
+ _lastComputedRelativeQuality(0),
+ _lastComputedThroughputDistCoeff(0.0),
+ _lastAllocation(0)
{
+ prepareBuffers();
}
Path(const int64_t localSocket,const InetAddress &addr) :
_lastOut(0),
_lastIn(0),
_lastTrustEstablishedPacketReceived(0),
+ _lastPathQualityComputeTime(0),
_localSocket(localSocket),
_latency(0xffff),
_addr(addr),
- _ipScope(addr.ipScope())
+ _ipScope(addr.ipScope()),
+ _lastAck(0),
+ _lastThroughputEstimation(0),
+ _lastQoSMeasurement(0),
+ _lastQoSRecordPurge(0),
+ _unackedBytes(0),
+ _expectingAckAsOf(0),
+ _packetsReceivedSinceLastAck(0),
+ _packetsReceivedSinceLastQoS(0),
+ _maxLifetimeThroughput(0),
+ _lastComputedMeanThroughput(0),
+ _bytesAckedSinceLastThroughputEstimation(0),
+ _lastComputedMeanLatency(0.0),
+ _lastComputedPacketDelayVariance(0.0),
+ _lastComputedPacketErrorRatio(0.0),
+ _lastComputedPacketLossRatio(0),
+ _lastComputedStability(0.0),
+ _lastComputedRelativeQuality(0),
+ _lastComputedThroughputDistCoeff(0.0),
+ _lastAllocation(0)
{
+ prepareBuffers();
+ _phy->getIfName((PhySocket *)((uintptr_t)_localSocket), _ifname, 16);
+ }
+
+ ~Path()
+ {
+ delete _throughputSamples;
+ delete _latencySamples;
+ delete _packetValiditySamples;
+ delete _throughputDisturbanceSamples;
+ _throughputSamples = NULL;
+ _latencySamples = NULL;
+ _packetValiditySamples = NULL;
+ _throughputDisturbanceSamples = NULL;
}
/**
@@ -147,12 +207,16 @@ public:
*
* @param l Measured latency
*/
- inline void updateLatency(const unsigned int l)
+ inline void updateLatency(const unsigned int l, int64_t now)
{
unsigned int pl = _latency;
- if (pl < 0xffff)
+ if (pl < 0xffff) {
_latency = (pl + l) / 2;
- else _latency = l;
+ }
+ else {
+ _latency = l;
+ }
+ _latencySamples->push(l);
}
/**
@@ -241,6 +305,324 @@ public:
}
/**
+ * Record statistics on outgoing packets. Used later to estimate QoS metrics.
+ *
+ * @param now Current time
+ * @param packetId ID of packet
+ * @param payloadLength Length of payload
+ * @param verb Packet verb
+ */
+ inline void recordOutgoingPacket(int64_t now, int64_t packetId, uint16_t payloadLength, Packet::Verb verb)
+ {
+ Mutex::Lock _l(_statistics_m);
+ if (verb != Packet::VERB_ACK && verb != Packet::VERB_QOS_MEASUREMENT) {
+ if ((packetId & (ZT_PATH_QOS_ACK_PROTOCOL_DIVISOR - 1)) == 0) {
+ _unackedBytes += payloadLength;
+ // Take note that we're expecting a VERB_ACK on this path as of a specific time
+ _expectingAckAsOf = ackAge(now) > ZT_PATH_ACK_INTERVAL ? _expectingAckAsOf : now;
+ if (_outQoSRecords.size() < ZT_PATH_MAX_OUTSTANDING_QOS_RECORDS) {
+ _outQoSRecords[packetId] = now;
+ }
+ }
+ }
+ }
+
+ /**
+ * Record statistics on incoming packets. Used later to estimate QoS metrics.
+ *
+ * @param now Current time
+ * @param packetId ID of packet
+ * @param payloadLength Length of payload
+ * @param verb Packet verb
+ */
+ inline void recordIncomingPacket(int64_t now, int64_t packetId, uint16_t payloadLength, Packet::Verb verb)
+ {
+ Mutex::Lock _l(_statistics_m);
+ if (verb != Packet::VERB_ACK && verb != Packet::VERB_QOS_MEASUREMENT) {
+ if ((packetId & (ZT_PATH_QOS_ACK_PROTOCOL_DIVISOR - 1)) == 0) {
+ _inACKRecords[packetId] = payloadLength;
+ _packetsReceivedSinceLastAck++;
+ _inQoSRecords[packetId] = now;
+ _packetsReceivedSinceLastQoS++;
+ }
+ _packetValiditySamples->push(true);
+ }
+ }
+
+ /**
+ * Record that we've received a VERB_ACK on this path, also compute throughput if required.
+ *
+ * @param now Current time
+ * @param ackedBytes Number of bytes acknowledged by other peer
+ */
+ inline void receivedAck(int64_t now, int32_t ackedBytes)
+ {
+ _expectingAckAsOf = 0;
+ _unackedBytes = (ackedBytes > _unackedBytes) ? 0 : _unackedBytes - ackedBytes;
+ int64_t timeSinceThroughputEstimate = (now - _lastThroughputEstimation);
+ if (timeSinceThroughputEstimate >= ZT_PATH_THROUGHPUT_MEASUREMENT_INTERVAL) {
+ uint64_t throughput = (float)(_bytesAckedSinceLastThroughputEstimation * 8) / ((float)timeSinceThroughputEstimate / (float)1000);
+ _throughputSamples->push(throughput);
+ _maxLifetimeThroughput = throughput > _maxLifetimeThroughput ? throughput : _maxLifetimeThroughput;
+ _lastThroughputEstimation = now;
+ _bytesAckedSinceLastThroughputEstimation = 0;
+ } else {
+ _bytesAckedSinceLastThroughputEstimation += ackedBytes;
+ }
+ }
+
+ /**
+ * @return Number of bytes this peer is responsible for ACKing since last ACK
+ */
+ inline int32_t bytesToAck()
+ {
+ Mutex::Lock _l(_statistics_m);
+ int32_t bytesToAck = 0;
+ std::map<uint64_t,uint16_t>::iterator it = _inACKRecords.begin();
+ while (it != _inACKRecords.end()) {
+ bytesToAck += it->second;
+ it++;
+ }
+ return bytesToAck;
+ }
+
+ /**
+ * @return Number of bytes thus far sent that have not been acknowledged by the remote peer
+ */
+ inline int64_t unackedSentBytes()
+ {
+ return _unackedBytes;
+ }
+
+ /**
+ * Account for the fact that an ACK was just sent. Reset counters, timers, and clear statistics buffers
+ *
+ * @param now Current time
+ */
+ inline void sentAck(int64_t now)
+ {
+ Mutex::Lock _l(_statistics_m);
+ _inACKRecords.clear();
+ _packetsReceivedSinceLastAck = 0;
+ _lastAck = now;
+ }
+
+ /**
+ * Receive QoS data, match with recorded egress times from this peer, compute latency
+ * estimates.
+ *
+ * @param now Current time
+ * @param count Number of records
+ * @param rx_id table of packet IDs
+ * @param rx_ts table of holding times
+ */
+ inline void receivedQoS(int64_t now, int count, uint64_t *rx_id, uint16_t *rx_ts)
+ {
+ Mutex::Lock _l(_statistics_m);
+ // Look up egress times and compute latency values for each record
+ std::map<uint64_t,uint64_t>::iterator it;
+ for (int j=0; j<count; j++) {
+ it = _outQoSRecords.find(rx_id[j]);
+ if (it != _outQoSRecords.end()) {
+ uint16_t rtt = (uint16_t)(now - it->second);
+ uint16_t rtt_compensated = rtt - rx_ts[j];
+ float latency = rtt_compensated / 2.0;
+ updateLatency(latency, now);
+ _outQoSRecords.erase(it);
+ }
+ }
+ }
+
+ /**
+ * Generate the contents of a VERB_QOS_MEASUREMENT packet.
+ *
+ * @param now Current time
+ * @param qosBuffer destination buffer
+ * @return Size of payload
+ */
+ inline int32_t generateQoSPacket(int64_t now, char *qosBuffer)
+ {
+ Mutex::Lock _l(_statistics_m);
+ int32_t len = 0;
+ std::map<uint64_t,uint64_t>::iterator it = _inQoSRecords.begin();
+ int i=0;
+ while (i<_packetsReceivedSinceLastQoS && it != _inQoSRecords.end()) {
+ uint64_t id = it->first;
+ memcpy(qosBuffer, &id, sizeof(uint64_t));
+ qosBuffer+=sizeof(uint64_t);
+ uint16_t holdingTime = (now - it->second);
+ memcpy(qosBuffer, &holdingTime, sizeof(uint16_t));
+ qosBuffer+=sizeof(uint16_t);
+ len+=sizeof(uint64_t)+sizeof(uint16_t);
+ _inQoSRecords.erase(it++);
+ i++;
+ }
+ return len;
+ }
+
+ /**
+ * Account for the fact that a VERB_QOS_MEASUREMENT was just sent. Reset timers.
+ *
+ * @param now Current time
+ */
+ inline void sentQoS(int64_t now) {
+ _packetsReceivedSinceLastQoS = 0;
+ _lastQoSMeasurement = now;
+ }
+
+ /**
+ * @param now Current time
+ * @return Whether an ACK (VERB_ACK) packet needs to be emitted at this time
+ */
+ inline bool needsToSendAck(int64_t now) {
+ return ((now - _lastAck) >= ZT_PATH_ACK_INTERVAL ||
+ (_packetsReceivedSinceLastAck == ZT_PATH_QOS_TABLE_SIZE)) && _packetsReceivedSinceLastAck;
+ }
+
+ /**
+ * @param now Current time
+ * @return Whether a QoS (VERB_QOS_MEASUREMENT) packet needs to be emitted at this time
+ */
+ inline bool needsToSendQoS(int64_t now) {
+ return ((_packetsReceivedSinceLastQoS >= ZT_PATH_QOS_TABLE_SIZE) ||
+ ((now - _lastQoSMeasurement) > ZT_PATH_QOS_INTERVAL)) && _packetsReceivedSinceLastQoS;
+ }
+
+ /**
+ * How much time has elapsed since we've been expecting a VERB_ACK on this path. This value
+ * is used to determine a more relevant path "age". This lets us penalize paths which are no
+ * longer ACKing, but not those that simply aren't being used to carry traffic at the
+ * current time.
+ */
+ inline int64_t ackAge(int64_t now) { return _expectingAckAsOf ? now - _expectingAckAsOf : 0; }
+
+ /**
+ * The maximum observed throughput (in bits/s) for this path
+ */
+ inline uint64_t maxLifetimeThroughput() { return _maxLifetimeThroughput; }
+
+ /**
+ * @return The mean throughput (in bits/s) of this link
+ */
+ inline uint64_t meanThroughput() { return _lastComputedMeanThroughput; }
+
+ /**
+ * Assign a new relative quality value for this path in the aggregate link
+ *
+ * @param rq Quality of this path in comparison to other paths available to this peer
+ */
+ inline void updateRelativeQuality(float rq) { _lastComputedRelativeQuality = rq; }
+
+ /**
+ * @return Quality of this path compared to others in the aggregate link
+ */
+ inline float relativeQuality() { return _lastComputedRelativeQuality; }
+
+ /**
+ * Assign a new allocation value for this path in the aggregate link
+ *
+ * @param allocation Percentage of traffic to be sent over this path to a peer
+ */
+ inline void updateComponentAllocationOfAggregateLink(unsigned char allocation) { _lastAllocation = allocation; }
+
+ /**
+ * @return Percentage of traffic allocated to this path in the aggregate link
+ */
+ inline unsigned char allocation() { return _lastAllocation; }
+
+ /**
+ * @return Stability estimates can become expensive to compute, we cache the most recent result.
+ */
+ inline float lastComputedStability() { return _lastComputedStability; }
+
+ /**
+ * @return A pointer to a cached copy of the human-readable name of the interface this Path's localSocket is bound to
+ */
+ inline char *getName() { return _ifname; }
+
+ /**
+ * @return Packet delay variance
+ */
+ inline float packetDelayVariance() { return _lastComputedPacketDelayVariance; }
+
+ /**
+ * @return Previously-computed mean latency
+ */
+ inline float meanLatency() { return _lastComputedMeanLatency; }
+
+ /**
+ * @return Packet loss rate (PLR)
+ */
+ inline float packetLossRatio() { return _lastComputedPacketLossRatio; }
+
+ /**
+ * @return Packet error ratio (PER)
+ */
+ inline float packetErrorRatio() { return _lastComputedPacketErrorRatio; }
+
+ /**
+ * Record an invalid incoming packet. This packet failed MAC/compression/cipher checks and will now
+ * contribute to a Packet Error Ratio (PER).
+ */
+ inline void recordInvalidPacket() { _packetValiditySamples->push(false); }
+
+ /**
+ * @return A pointer to a cached copy of the address string for this Path (For debugging only)
+ */
+ inline char *getAddressString() { return _addrString; }
+
+ /**
+ * @return The current throughput disturbance coefficient
+ */
+ inline float throughputDisturbanceCoefficient() { return _lastComputedThroughputDistCoeff; }
+
+ /**
+ * Compute and cache stability and performance metrics. The resultant stability coefficient is a measure of how "well behaved"
+ * this path is. This figure is substantially different from (but required for the estimation of) the path's overall "quality".
+ *
+ * @param now Current time
+ */
+ inline void processBackgroundPathMeasurements(int64_t now) {
+ if (now - _lastPathQualityComputeTime > ZT_PATH_QUALITY_COMPUTE_INTERVAL) {
+ Mutex::Lock _l(_statistics_m);
+ _lastPathQualityComputeTime = now;
+ address().toString(_addrString);
+ _lastComputedMeanLatency = _latencySamples->mean();
+ _lastComputedPacketDelayVariance = _latencySamples->stddev(); // Similar to "jitter" (SEE: RFC 3393, RFC 4689)
+ _lastComputedMeanThroughput = (uint64_t)_throughputSamples->mean();
+ // If no packet validity samples, assume PER==0
+ _lastComputedPacketErrorRatio = 1 - (_packetValiditySamples->count() ? _packetValiditySamples->mean() : 1);
+ // Compute path stability
+ // Normalize measurements with wildly different ranges into a reasonable range
+ float normalized_pdv = Utils::normalize(_lastComputedPacketDelayVariance, 0, ZT_PATH_MAX_PDV, 0, 10);
+ float normalized_la = Utils::normalize(_lastComputedMeanLatency, 0, ZT_PATH_MAX_MEAN_LATENCY, 0, 10);
+ float throughput_cv = _throughputSamples->mean() > 0 ? _throughputSamples->stddev() / _throughputSamples->mean() : 1;
+ // Form an exponential cutoff and apply contribution weights
+ float pdv_contrib = exp((-1)*normalized_pdv) * ZT_PATH_CONTRIB_PDV;
+ float latency_contrib = exp((-1)*normalized_la) * ZT_PATH_CONTRIB_LATENCY;
+ // Throughput Disturbance Coefficient
+ float throughput_disturbance_contrib = exp((-1)*throughput_cv) * ZT_PATH_CONTRIB_THROUGHPUT_DISTURBANCE;
+ _throughputDisturbanceSamples->push(throughput_cv);
+ _lastComputedThroughputDistCoeff = _throughputDisturbanceSamples->mean();
+ // Obey user-defined ignored contributions
+ pdv_contrib = ZT_PATH_CONTRIB_PDV > 0.0 ? pdv_contrib : 1;
+ latency_contrib = ZT_PATH_CONTRIB_LATENCY > 0.0 ? latency_contrib : 1;
+ throughput_disturbance_contrib = ZT_PATH_CONTRIB_THROUGHPUT_DISTURBANCE > 0.0 ? throughput_disturbance_contrib : 1;
+ // Stability
+ _lastComputedStability = pdv_contrib + latency_contrib + throughput_disturbance_contrib;
+ _lastComputedStability *= 1 - _lastComputedPacketErrorRatio;
+ // Prevent QoS records from sticking around for too long
+ std::map<uint64_t,uint64_t>::iterator it = _outQoSRecords.begin();
+ while (it != _outQoSRecords.end()) {
+ // Time since egress of tracked packet
+ if ((now - it->second) >= ZT_PATH_QOS_TIMEOUT) {
+ _outQoSRecords.erase(it++);
+ } else { it++; }
+ }
+ }
+ }
+
+ /**
* @return True if this path is alive (receiving heartbeats)
*/
inline bool alive(const int64_t now) const { return ((now - _lastIn) < (ZT_PATH_HEARTBEAT_PERIOD + 5000)); }
@@ -265,15 +647,71 @@ public:
*/
inline int64_t lastTrustEstablishedPacketReceived() const { return _lastTrustEstablishedPacketReceived; }
+ /**
+ * Initialize statistical buffers
+ */
+ inline void prepareBuffers() {
+ _throughputSamples = new RingBuffer<uint64_t>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
+ _latencySamples = new RingBuffer<uint32_t>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
+ _packetValiditySamples = new RingBuffer<bool>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
+ _throughputDisturbanceSamples = new RingBuffer<float>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
+ memset(_ifname, 0, 16);
+ memset(_addrString, 0, sizeof(_addrString));
+ }
+
private:
+ Mutex _statistics_m;
+
volatile int64_t _lastOut;
volatile int64_t _lastIn;
volatile int64_t _lastTrustEstablishedPacketReceived;
+ volatile int64_t _lastPathQualityComputeTime;
int64_t _localSocket;
volatile unsigned int _latency;
InetAddress _addr;
InetAddress::IpScope _ipScope; // memoize this since it's a computed value checked often
AtomicCounter __refCount;
+
+ std::map<uint64_t, uint64_t> _outQoSRecords; // id:egress_time
+ std::map<uint64_t, uint64_t> _inQoSRecords; // id:now
+ std::map<uint64_t, uint16_t> _inACKRecords; // id:len
+
+ int64_t _lastAck;
+ int64_t _lastThroughputEstimation;
+ int64_t _lastQoSMeasurement;
+ int64_t _lastQoSRecordPurge;
+
+ int64_t _unackedBytes;
+ int64_t _expectingAckAsOf;
+ int16_t _packetsReceivedSinceLastAck;
+ int16_t _packetsReceivedSinceLastQoS;
+
+ uint64_t _maxLifetimeThroughput;
+ uint64_t _lastComputedMeanThroughput;
+ uint64_t _bytesAckedSinceLastThroughputEstimation;
+
+ float _lastComputedMeanLatency;
+ float _lastComputedPacketDelayVariance;
+
+ float _lastComputedPacketErrorRatio;
+ float _lastComputedPacketLossRatio;
+
+ // cached estimates
+ float _lastComputedStability;
+ float _lastComputedRelativeQuality;
+ float _lastComputedThroughputDistCoeff;
+ unsigned char _lastAllocation;
+
+
+
+ // cached human-readable strings for tracing purposes
+ char _ifname[16];
+ char _addrString[256];
+
+ RingBuffer<uint64_t> *_throughputSamples;
+ RingBuffer<uint32_t> *_latencySamples;
+ RingBuffer<bool> *_packetValiditySamples;
+ RingBuffer<float> *_throughputDisturbanceSamples;
};
} // namespace ZeroTier
diff --git a/node/Peer.cpp b/node/Peer.cpp
index 71afd852..21bbfabe 100644
--- a/node/Peer.cpp
+++ b/node/Peer.cpp
@@ -25,7 +25,6 @@
*/
#include "../version.h"
-
#include "Constants.hpp"
#include "Peer.hpp"
#include "Node.hpp"
@@ -35,6 +34,8 @@
#include "Packet.hpp"
#include "Trace.hpp"
#include "InetAddress.hpp"
+#include "RingBuffer.hpp"
+#include "Utils.hpp"
namespace ZeroTier {
@@ -53,16 +54,31 @@ Peer::Peer(const RuntimeEnvironment *renv,const Identity &myIdentity,const Ident
_lastCredentialsReceived(0),
_lastTrustEstablishedPacketReceived(0),
_lastSentFullHello(0),
+ _lastACKWindowReset(0),
+ _lastQoSWindowReset(0),
+ _lastMultipathCompatibilityCheck(0),
+ _freeRandomByte(0),
+ _uniqueAlivePathCount(0),
+ _localMultipathSupported(false),
+ _remoteMultipathSupported(false),
+ _canUseMultipath(false),
_vProto(0),
_vMajor(0),
_vMinor(0),
_vRevision(0),
_id(peerIdentity),
_directPathPushCutoffCount(0),
- _credentialsCutoffCount(0)
+ _credentialsCutoffCount(0),
+ _linkIsBalanced(false),
+ _linkIsRedundant(false),
+ _remotePeerMultipathEnabled(false),
+ _lastAggregateStatsReport(0),
+ _lastAggregateAllocation(0)
{
+ Utils::getSecureRandom(&_freeRandomByte, 1);
if (!myIdentity.agree(peerIdentity,_key,ZT_PEER_SECRET_KEY_LENGTH))
throw ZT_EXCEPTION_INVALID_ARGUMENT;
+ _pathChoiceHist = new RingBuffer<int>(ZT_MULTIPATH_PROPORTION_WIN_SZ);
}
void Peer::received(
@@ -70,6 +86,7 @@ void Peer::received(
const SharedPtr<Path> &path,
const unsigned int hops,
const uint64_t packetId,
+ const unsigned int payloadLength,
const Packet::Verb verb,
const uint64_t inRePacketId,
const Packet::Verb inReVerb,
@@ -95,9 +112,25 @@ void Peer::received(
path->trustedPacketReceived(now);
}
+ {
+ Mutex::Lock _l(_paths_m);
+
+ recordIncomingPacket(tPtr, path, packetId, payloadLength, verb, now);
+
+ if (_canUseMultipath) {
+ if (path->needsToSendQoS(now)) {
+ sendQOS_MEASUREMENT(tPtr, path, path->localSocket(), path->address(), now);
+ }
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p) {
+ _paths[i].p->processBackgroundPathMeasurements(now);
+ }
+ }
+ }
+ }
+
if (hops == 0) {
// If this is a direct packet (no hops), update existing paths or learn new ones
-
bool havePath = false;
{
Mutex::Lock _l(_paths_m);
@@ -116,20 +149,26 @@ void Peer::received(
if ((!havePath)&&(RR->node->shouldUsePathForZeroTierTraffic(tPtr,_id.address(),path->localSocket(),path->address()))) {
Mutex::Lock _l(_paths_m);
- // Paths are redunant if they duplicate an alive path to the same IP or
+ // Paths are redundant if they duplicate an alive path to the same IP or
// with the same local socket and address family.
bool redundant = false;
+ unsigned int replacePath = ZT_MAX_PEER_NETWORK_PATHS;
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i].p) {
- if ( (_paths[i].p->alive(now)) && ( ((_paths[i].p->localSocket() == path->localSocket())&&(_paths[i].p->address().ss_family == path->address().ss_family)) || (_paths[i].p->address().ipsEqual2(path->address())) ) ) {
+ if ( (_paths[i].p->alive(now)) && ( ((_paths[i].p->localSocket() == path->localSocket())&&(_paths[i].p->address().ss_family == path->address().ss_family)) || (_paths[i].p->address().ipsEqual2(path->address())) ) ) {
redundant = true;
break;
}
+ // If the path is the same address and port, simply assume this is a replacement
+ if ( (_paths[i].p->address().ipsEqual2(path->address()) && (_paths[i].p->address().port() == path->address().port()))) {
+ replacePath = i;
+ break;
+ }
} else break;
}
-
- if (!redundant) {
- unsigned int replacePath = ZT_MAX_PEER_NETWORK_PATHS;
+ // If the path isn't a duplicate of the same localSocket AND we haven't already determined a replacePath,
+ // then find the worst path and replace it.
+ if (!redundant && replacePath == ZT_MAX_PEER_NETWORK_PATHS) {
int replacePathQuality = 0;
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i].p) {
@@ -143,16 +182,15 @@ void Peer::received(
break;
}
}
-
- if (replacePath != ZT_MAX_PEER_NETWORK_PATHS) {
- if (verb == Packet::VERB_OK) {
- RR->t->peerLearnedNewPath(tPtr,networkId,*this,path,packetId);
- _paths[replacePath].lr = now;
- _paths[replacePath].p = path;
- _paths[replacePath].priority = 1;
- } else {
- attemptToContact = true;
- }
+ }
+ if (replacePath != ZT_MAX_PEER_NETWORK_PATHS) {
+ if (verb == Packet::VERB_OK) {
+ RR->t->peerLearnedNewPath(tPtr,networkId,*this,path,packetId);
+ _paths[replacePath].lr = now;
+ _paths[replacePath].p = path;
+ _paths[replacePath].priority = 1;
+ } else {
+ attemptToContact = true;
}
}
}
@@ -232,29 +270,265 @@ void Peer::received(
}
}
-SharedPtr<Path> Peer::getBestPath(int64_t now,bool includeExpired) const
+void Peer::recordOutgoingPacket(const SharedPtr<Path> &path, const uint64_t packetId,
+ uint16_t payloadLength, const Packet::Verb verb, int64_t now)
{
- Mutex::Lock _l(_paths_m);
+ // Grab second byte from packetId to use as a source of entropy in the next path selection
+ _freeRandomByte = (packetId & 0xFF00) >> 8;
+ if (_canUseMultipath) {
+ path->recordOutgoingPacket(now, packetId, payloadLength, verb);
+ }
+}
+void Peer::recordIncomingPacket(void *tPtr, const SharedPtr<Path> &path, const uint64_t packetId,
+ uint16_t payloadLength, const Packet::Verb verb, int64_t now)
+{
+ if (_canUseMultipath) {
+ if (path->needsToSendAck(now)) {
+ sendACK(tPtr, path, path->localSocket(), path->address(), now);
+ }
+ path->recordIncomingPacket(now, packetId, payloadLength, verb);
+ }
+}
+
+void Peer::computeAggregateProportionalAllocation(int64_t now)
+{
+ float maxStability = 0;
+ float totalRelativeQuality = 0;
+ float maxThroughput = 1;
+ float maxScope = 0;
+ float relStability[ZT_MAX_PEER_NETWORK_PATHS];
+ float relThroughput[ZT_MAX_PEER_NETWORK_PATHS];
+ memset(&relStability, 0, sizeof(relStability));
+ memset(&relThroughput, 0, sizeof(relThroughput));
+ // Survey all paths
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p) {
+ relStability[i] = _paths[i].p->lastComputedStability();
+ relThroughput[i] = _paths[i].p->maxLifetimeThroughput();
+ maxStability = relStability[i] > maxStability ? relStability[i] : maxStability;
+ maxThroughput = relThroughput[i] > maxThroughput ? relThroughput[i] : maxThroughput;
+ maxScope = _paths[i].p->ipScope() > maxScope ? _paths[i].p->ipScope() : maxScope;
+ }
+ }
+ // Convert to relative values
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p) {
+ relStability[i] /= maxStability ? maxStability : 1;
+ relThroughput[i] /= maxThroughput ? maxThroughput : 1;
+ float normalized_ma = Utils::normalize(_paths[i].p->ackAge(now), 0, ZT_PATH_MAX_AGE, 0, 10);
+ float age_contrib = exp((-1)*normalized_ma);
+ float relScope = ((float)(_paths[i].p->ipScope()+1) / (maxScope + 1));
+ float relQuality =
+ (relStability[i] * ZT_PATH_CONTRIB_STABILITY)
+ + (fmax(1, relThroughput[i]) * ZT_PATH_CONTRIB_THROUGHPUT)
+ + relScope * ZT_PATH_CONTRIB_SCOPE;
+ relQuality *= age_contrib;
+ // Arbitrary cutoffs
+ relQuality = relQuality > (1.00 / 100.0) ? relQuality : 0.0;
+ relQuality = relQuality < (99.0 / 100.0) ? relQuality : 1.0;
+ totalRelativeQuality += relQuality;
+ _paths[i].p->updateRelativeQuality(relQuality);
+ }
+ }
+ // Convert set of relative performances into an allocation set
+ for(uint16_t i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p) {
+ _paths[i].p->updateComponentAllocationOfAggregateLink((_paths[i].p->relativeQuality() / totalRelativeQuality) * 255);
+ }
+ }
+}
+
+int Peer::computeAggregateLinkPacketDelayVariance()
+{
+ float pdv = 0.0;
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p) {
+ pdv += _paths[i].p->relativeQuality() * _paths[i].p->packetDelayVariance();
+ }
+ }
+ return pdv;
+}
+
+int Peer::computeAggregateLinkMeanLatency()
+{
+ int ml = 0;
+ int pathCount = 0;
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p) {
+ pathCount++;
+ ml += _paths[i].p->relativeQuality() * _paths[i].p->meanLatency();
+ }
+ }
+ return ml / pathCount;
+}
+
+int Peer::aggregateLinkPhysicalPathCount()
+{
+ std::map<std::string, bool> ifnamemap;
+ int pathCount = 0;
+ int64_t now = RR->node->now();
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p && _paths[i].p->alive(now)) {
+ if (!ifnamemap[_paths[i].p->getName()]) {
+ ifnamemap[_paths[i].p->getName()] = true;
+ pathCount++;
+ }
+ }
+ }
+ return pathCount;
+}
+
+int Peer::aggregateLinkLogicalPathCount()
+{
+ int pathCount = 0;
+ int64_t now = RR->node->now();
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p && _paths[i].p->alive(now)) {
+ pathCount++;
+ }
+ }
+ return pathCount;
+}
+
+SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired)
+{
+ Mutex::Lock _l(_paths_m);
unsigned int bestPath = ZT_MAX_PEER_NETWORK_PATHS;
- long bestPathQuality = 2147483647;
+
+ /**
+ * Send traffic across the highest quality path only. This algorithm will still
+ * use the old path quality metric from protocol version 9.
+ */
+ if (!_canUseMultipath) {
+ long bestPathQuality = 2147483647;
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p) {
+ if ((includeExpired)||((now - _paths[i].lr) < ZT_PEER_PATH_EXPIRATION)) {
+ const long q = _paths[i].p->quality(now) / _paths[i].priority;
+ if (q <= bestPathQuality) {
+ bestPathQuality = q;
+ bestPath = i;
+ }
+ }
+ } else break;
+ }
+ if (bestPath != ZT_MAX_PEER_NETWORK_PATHS) {
+ return _paths[bestPath].p;
+ }
+ return SharedPtr<Path>();
+ }
+
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i].p) {
- if ((includeExpired)||((now - _paths[i].lr) < ZT_PEER_PATH_EXPIRATION)) {
- const long q = _paths[i].p->quality(now) / _paths[i].priority;
- if (q <= bestPathQuality) {
- bestPathQuality = q;
- bestPath = i;
+ _paths[i].p->processBackgroundPathMeasurements(now);
+ }
+ }
+
+ /**
+ * Randomly distribute traffic across all paths
+ */
+ int numAlivePaths = 0;
+ int numStalePaths = 0;
+ if (RR->node->getMultipathMode() == ZT_MULTIPATH_RANDOM) {
+ int alivePaths[ZT_MAX_PEER_NETWORK_PATHS];
+ int stalePaths[ZT_MAX_PEER_NETWORK_PATHS];
+ memset(&alivePaths, -1, sizeof(alivePaths));
+ memset(&stalePaths, -1, sizeof(stalePaths));
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p) {
+ if (_paths[i].p->alive(now)) {
+ alivePaths[numAlivePaths] = i;
+ numAlivePaths++;
+ }
+ else {
+ stalePaths[numStalePaths] = i;
+ numStalePaths++;
}
}
- } else break;
+ }
+ unsigned int r = _freeRandomByte;
+ if (numAlivePaths > 0) {
+ int rf = r % numAlivePaths;
+ return _paths[alivePaths[rf]].p;
+ }
+ else if(numStalePaths > 0) {
+ // Resort to trying any non-expired path
+ int rf = r % numStalePaths;
+ return _paths[stalePaths[rf]].p;
+ }
}
- if (bestPath != ZT_MAX_PEER_NETWORK_PATHS)
- return _paths[bestPath].p;
+ /**
+ * Proportionally allocate traffic according to dynamic path quality measurements
+ */
+ if (RR->node->getMultipathMode() == ZT_MULTIPATH_PROPORTIONALLY_BALANCED) {
+ if ((now - _lastAggregateAllocation) >= ZT_PATH_QUALITY_COMPUTE_INTERVAL) {
+ _lastAggregateAllocation = now;
+ computeAggregateProportionalAllocation(now);
+ }
+ // Randomly choose path according to their allocations
+ float rf = _freeRandomByte;
+ for(int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p) {
+ if (rf < _paths[i].p->allocation()) {
+ bestPath = i;
+ _pathChoiceHist->push(bestPath); // Record which path we chose
+ break;
+ }
+ rf -= _paths[i].p->allocation();
+ }
+ }
+ if (bestPath < ZT_MAX_PEER_NETWORK_PATHS) {
+ return _paths[bestPath].p;
+ }
+ }
return SharedPtr<Path>();
}
+char *Peer::interfaceListStr()
+{
+ std::map<std::string, int> ifnamemap;
+ char tmp[32];
+ const int64_t now = RR->node->now();
+ char *ptr = _interfaceListStr;
+ bool imbalanced = false;
+ memset(_interfaceListStr, 0, sizeof(_interfaceListStr));
+ int alivePathCount = aggregateLinkLogicalPathCount();
+ for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p && _paths[i].p->alive(now)) {
+ int ipv = _paths[i].p->address().isV4();
+ // If this is acting as an aggregate link, check allocations
+ float targetAllocation = 1.0 / alivePathCount;
+ float currentAllocation = 1.0;
+ if (alivePathCount > 1) {
+ currentAllocation = (float)_pathChoiceHist->countValue(i) / (float)_pathChoiceHist->count();
+ if (fabs(targetAllocation - currentAllocation) > ZT_PATH_IMBALANCE_THRESHOLD) {
+ imbalanced = true;
+ }
+ }
+ char *ipvStr = ipv ? (char*)"ipv4" : (char*)"ipv6";
+ sprintf(tmp, "(%s, %s, %.3f)", _paths[i].p->getName(), ipvStr, currentAllocation);
+ // Prevent duplicates
+ if(ifnamemap[_paths[i].p->getName()] != ipv) {
+ memcpy(ptr, tmp, strlen(tmp));
+ ptr += strlen(tmp);
+ *ptr = ' ';
+ ptr++;
+ ifnamemap[_paths[i].p->getName()] = ipv;
+ }
+ }
+ }
+ ptr--; // Overwrite trailing space
+ if (imbalanced) {
+ sprintf(tmp, ", is asymmetrical");
+ memcpy(ptr, tmp, sizeof(tmp));
+ } else {
+ *ptr = '\0';
+ }
+ return _interfaceListStr;
+}
+
void Peer::introduce(void *const tPtr,const int64_t now,const SharedPtr<Peer> &other) const
{
unsigned int myBestV4ByScope[ZT_INETADDRESS_MAX_SCOPE+1];
@@ -376,6 +650,71 @@ void Peer::introduce(void *const tPtr,const int64_t now,const SharedPtr<Peer> &o
}
}
+inline void Peer::processBackgroundPeerTasks(int64_t now)
+{
+ // Determine current multipath compatibility with other peer
+ if ((now - _lastMultipathCompatibilityCheck) >= ZT_PATH_QUALITY_COMPUTE_INTERVAL) {
+ // Cache number of available paths so that we can short-circuit multipath logic elsewhere
+ //
+ // We also take notice of duplicate paths (same IP only) because we may have
+ // recently received a direct path push from a peer and our list might contain
+ // a dead path which hasn't been fully recognized as such. In this case we
+ // don't want the duplicate to trigger execution of multipath code prematurely.
+ //
+ // This is done to support the behavior of auto multipath enable/disable
+ // without user intervention.
+ int currAlivePathCount = 0;
+ int duplicatePathsFound = 0;
+ for (unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+ if (_paths[i].p) {
+ currAlivePathCount++;
+ for (unsigned int j=0;j<ZT_MAX_PEER_NETWORK_PATHS;++j) {
+ if (_paths[i].p && _paths[j].p && _paths[i].p->address().ipsEqual2(_paths[j].p->address()) && i != j) {
+ duplicatePathsFound+=1;
+ break;
+ }
+ }
+ }
+ }
+ _uniqueAlivePathCount = (currAlivePathCount - (duplicatePathsFound / 2));
+ _lastMultipathCompatibilityCheck = now;
+ _localMultipathSupported = ((RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) && (ZT_PROTO_VERSION > 9));
+ _remoteMultipathSupported = _vProto > 9;
+ // If both peers support multipath and more than one path exist, we can use multipath logic
+ _canUseMultipath = _localMultipathSupported && _remoteMultipathSupported && (_uniqueAlivePathCount > 1);
+ }
+}
+
+void Peer::sendACK(void *tPtr,const SharedPtr<Path> &path,const int64_t localSocket,const InetAddress &atAddress,int64_t now)
+{
+ Packet outp(_id.address(),RR->identity.address(),Packet::VERB_ACK);
+ uint32_t bytesToAck = path->bytesToAck();
+ outp.append<uint32_t>(bytesToAck);
+ if (atAddress) {
+ outp.armor(_key,false);
+ RR->node->putPacket(tPtr,localSocket,atAddress,outp.data(),outp.size());
+ } else {
+ RR->sw->send(tPtr,outp,false);
+ }
+ path->sentAck(now);
+}
+
+void Peer::sendQOS_MEASUREMENT(void *tPtr,const SharedPtr<Path> &path,const int64_t localSocket,const InetAddress &atAddress,int64_t now)
+{
+ const int64_t _now = RR->node->now();
+ Packet outp(_id.address(),RR->identity.address(),Packet::VERB_QOS_MEASUREMENT);
+ char qosData[ZT_PATH_MAX_QOS_PACKET_SZ];
+ int16_t len = path->generateQoSPacket(_now,qosData);
+ outp.append(qosData,len);
+ if (atAddress) {
+ outp.armor(_key,false);
+ RR->node->putPacket(tPtr,localSocket,atAddress,outp.data(),outp.size());
+ } else {
+ RR->sw->send(tPtr,outp,false);
+ }
+ path->sentQoS(now);
+}
+
void Peer::sendHELLO(void *tPtr,const int64_t localSocket,const InetAddress &atAddress,int64_t now)
{
Packet outp(_id.address(),RR->identity.address(),Packet::VERB_HELLO);
@@ -444,12 +783,30 @@ void Peer::tryMemorizedPath(void *tPtr,int64_t now)
unsigned int Peer::doPingAndKeepalive(void *tPtr,int64_t now)
{
unsigned int sent = 0;
-
Mutex::Lock _l(_paths_m);
const bool sendFullHello = ((now - _lastSentFullHello) >= ZT_PEER_PING_PERIOD);
_lastSentFullHello = now;
+ processBackgroundPeerTasks(now);
+
+ // Emit traces regarding aggregate link status
+ if (_canUseMultipath) {
+ int alivePathCount = aggregateLinkPhysicalPathCount();
+ if ((now - _lastAggregateStatsReport) > ZT_PATH_AGGREGATE_STATS_REPORT_INTERVAL) {
+ _lastAggregateStatsReport = now;
+ if (alivePathCount) {
+ RR->t->peerLinkAggregateStatistics(NULL,*this);
+ }
+ } if (alivePathCount < 2 && _linkIsRedundant) {
+ _linkIsRedundant = !_linkIsRedundant;
+ RR->t->peerLinkNoLongerRedundant(NULL,*this);
+ } if (alivePathCount > 1 && !_linkIsRedundant) {
+ _linkIsRedundant = !_linkIsRedundant;
+ RR->t->peerLinkNowRedundant(NULL,*this);
+ }
+ }
+
// Right now we only keep pinging links that have the maximum priority. The
// priority is used to track cluster redirections, meaning that when a cluster
// redirects us its redirect target links override all other links and we
@@ -477,13 +834,14 @@ unsigned int Peer::doPingAndKeepalive(void *tPtr,int64_t now)
}
} else break;
}
- while(j < ZT_MAX_PEER_NETWORK_PATHS) {
- _paths[j].lr = 0;
- _paths[j].p.zero();
- _paths[j].priority = 1;
- ++j;
+ if (canUseMultipath()) {
+ while(j < ZT_MAX_PEER_NETWORK_PATHS) {
+ _paths[j].lr = 0;
+ _paths[j].p.zero();
+ _paths[j].priority = 1;
+ ++j;
+ }
}
-
return sent;
}
diff --git a/node/Peer.hpp b/node/Peer.hpp
index b6f3c695..a32eaad0 100644
--- a/node/Peer.hpp
+++ b/node/Peer.hpp
@@ -27,18 +27,13 @@
#ifndef ZT_PEER_HPP
#define ZT_PEER_HPP
-#include <stdint.h>
-
-#include "Constants.hpp"
-
-#include <algorithm>
-#include <utility>
#include <vector>
-#include <stdexcept>
#include "../include/ZeroTierOne.h"
+#include "Constants.hpp"
#include "RuntimeEnvironment.hpp"
+#include "Node.hpp"
#include "Path.hpp"
#include "Address.hpp"
#include "Utils.hpp"
@@ -65,7 +60,11 @@ private:
Peer() {} // disabled to prevent bugs -- should not be constructed uninitialized
public:
- ~Peer() { Utils::burn(_key,sizeof(_key)); }
+ ~Peer() {
+ Utils::burn(_key,sizeof(_key));
+ delete _pathChoiceHist;
+ _pathChoiceHist = NULL;
+ }
/**
* Construct a new peer
@@ -108,6 +107,7 @@ public:
const SharedPtr<Path> &path,
const unsigned int hops,
const uint64_t packetId,
+ const unsigned int payloadLength,
const Packet::Verb verb,
const uint64_t inRePacketId,
const Packet::Verb inReVerb,
@@ -145,20 +145,95 @@ public:
*/
inline bool sendDirect(void *tPtr,const void *data,unsigned int len,int64_t now,bool force)
{
- SharedPtr<Path> bp(getBestPath(now,force));
+ SharedPtr<Path> bp(getAppropriatePath(now,force));
if (bp)
return bp->send(RR,tPtr,data,len,now);
return false;
}
/**
- * Get the best current direct path
+ * Record statistics on outgoing packets
+ *
+ * @param path Path over which packet was sent
+	 * @param packetId Packet ID
+	 * @param payloadLength Length of packet payload
+ * @param verb Packet verb
+ * @param now Current time
+ */
+ void recordOutgoingPacket(const SharedPtr<Path> &path, const uint64_t packetId, uint16_t payloadLength, const Packet::Verb verb, int64_t now);
+
+ /**
+ * Record statistics on incoming packets
+ *
+ * @param path Path over which packet was sent
+	 * @param packetId Packet ID
+	 * @param payloadLength Length of packet payload
+ * @param verb Packet verb
+ * @param now Current time
+ */
+ void recordIncomingPacket(void *tPtr, const SharedPtr<Path> &path, const uint64_t packetId, uint16_t payloadLength, const Packet::Verb verb, int64_t now);
+
+ /**
+ * Send an ACK to peer for the most recent packets received
+ *
+ * @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call
+ * @param localSocket Raw socket the ACK packet will be sent over
+ * @param atAddress Destination for the ACK packet
+ * @param now Current time
+ */
+ void sendACK(void *tPtr, const SharedPtr<Path> &path, const int64_t localSocket,const InetAddress &atAddress,int64_t now);
+
+ /**
+ * Send a QoS packet to peer so that it can evaluate the quality of this link
+ *
+ * @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call
+ * @param localSocket Raw socket the QoS packet will be sent over
+ * @param atAddress Destination for the QoS packet
+ * @param now Current time
+ */
+ void sendQOS_MEASUREMENT(void *tPtr, const SharedPtr<Path> &path, const int64_t localSocket,const InetAddress &atAddress,int64_t now);
+
+ /**
+ * Compute relative quality values and allocations for the components of the aggregate link
+ *
+ * @param now Current time
+ */
+ void computeAggregateProportionalAllocation(int64_t now);
+
+ /**
+ * @return The aggregate link Packet Delay Variance (PDV)
+ */
+ int computeAggregateLinkPacketDelayVariance();
+
+ /**
+ * @return The aggregate link mean latency
+ */
+ int computeAggregateLinkMeanLatency();
+
+ /**
+ * @return The number of currently alive "physical" paths in the aggregate link
+ */
+ int aggregateLinkPhysicalPathCount();
+
+ /**
+ * @return The number of currently alive "logical" paths in the aggregate link
+ */
+ int aggregateLinkLogicalPathCount();
+
+ /**
+ * Get the most appropriate direct path based on current multipath and QoS configuration
*
* @param now Current time
* @param includeExpired If true, include even expired paths
* @return Best current path or NULL if none
*/
- SharedPtr<Path> getBestPath(int64_t now,bool includeExpired) const;
+ SharedPtr<Path> getAppropriatePath(int64_t now, bool includeExpired);
+
+ /**
+	 * Generate a human-readable string of the interface names making up the aggregate link, also including
+	 * the current traffic allocation and IP version of each path (for tracing)
+ */
+ char *interfaceListStr();
/**
* Send VERB_RENDEZVOUS to this and another peer via the best common IP scope and path
@@ -213,6 +288,16 @@ public:
unsigned int doPingAndKeepalive(void *tPtr,int64_t now);
/**
+ * Clear paths whose localSocket(s) are in a CLOSED state or have an otherwise INVALID state.
+ * This should be called frequently so that we can detect and remove unproductive or invalid paths.
+ *
+ * Under the hood this is done periodically based on ZT_CLOSED_PATH_PRUNING_INTERVAL.
+ *
+ * @return Number of paths that were pruned this round
+ */
+ unsigned int prunePaths();
+
+ /**
* Process a cluster redirect sent by this peer
*
* @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call
@@ -268,14 +353,18 @@ public:
inline int64_t isActive(int64_t now) const { return ((now - _lastNontrivialReceive) < ZT_PEER_ACTIVITY_TIMEOUT); }
/**
- * @return Latency in milliseconds of best path or 0xffff if unknown / no paths
+ * @return Latency in milliseconds of best/aggregate path or 0xffff if unknown / no paths
*/
- inline unsigned int latency(const int64_t now) const
+ inline unsigned int latency(const int64_t now)
{
- SharedPtr<Path> bp(getBestPath(now,false));
- if (bp)
- return bp->latency();
- return 0xffff;
+ if (_canUseMultipath) {
+ return (int)computeAggregateLinkMeanLatency();
+ } else {
+ SharedPtr<Path> bp(getAppropriatePath(now,false));
+ if (bp)
+ return bp->latency();
+ return 0xffff;
+ }
}
/**
@@ -289,7 +378,7 @@ public:
*
* @return Relay quality score computed from latency and other factors, lower is better
*/
- inline unsigned int relayQuality(const int64_t now) const
+ inline unsigned int relayQuality(const int64_t now)
{
const uint64_t tsr = now - _lastReceive;
if (tsr >= ZT_PEER_ACTIVITY_TIMEOUT)
@@ -329,6 +418,37 @@ public:
inline bool remoteVersionKnown() const { return ((_vMajor > 0)||(_vMinor > 0)||(_vRevision > 0)); }
/**
+ * Periodically update known multipath activation constraints. This is done so that we know when and when
+ * not to use multipath logic. Doing this once every few seconds is sufficient.
+ *
+ * @param now Current time
+ */
+ inline void processBackgroundPeerTasks(int64_t now);
+
+ /**
+	 * Record that the remote peer does have multipath enabled, as evidenced by the receipt of a VERB_ACK
+	 * or a VERB_QOS_MEASUREMENT packet at some point in the past. Until this flag is set, the local client
+	 * shall assume that multipath is not enabled and should only use classical Protocol 9 logic.
+ */
+ inline void inferRemoteMultipathEnabled() { _remotePeerMultipathEnabled = true; }
+
+ /**
+ * @return Whether the local client supports and is configured to use multipath
+ */
+ inline bool localMultipathSupport() { return _localMultipathSupported; }
+
+ /**
+ * @return Whether the remote peer supports and is configured to use multipath
+ */
+ inline bool remoteMultipathSupport() { return _remoteMultipathSupported; }
+
+ /**
+	 * @return Whether this client can use multipath to communicate with this peer. True if both peers are using
+	 * the correct protocol and both peers have multipath enabled; false otherwise.
+ */
+ inline bool canUseMultipath() { return _canUseMultipath; }
+
+ /**
* @return True if peer has received a trust established packet (e.g. common network membership) in the past ZT_TRUST_EXPIRATION ms
*/
inline bool trustEstablished(const int64_t now) const { return ((now - _lastTrustEstablishedPacketReceived) < ZT_TRUST_EXPIRATION); }
@@ -418,6 +538,41 @@ public:
}
/**
+ * Rate limit gate for VERB_ACK
+ */
+ inline bool rateGateACK(const int64_t now)
+ {
+ if ((now - _lastACKWindowReset) >= ZT_PATH_QOS_ACK_CUTOFF_TIME) {
+ _lastACKWindowReset = now;
+ _ACKCutoffCount = 0;
+ } else {
+ ++_ACKCutoffCount;
+ }
+ return (_ACKCutoffCount < ZT_PATH_QOS_ACK_CUTOFF_LIMIT);
+ }
+
+ /**
+ * Rate limit gate for VERB_QOS_MEASUREMENT
+ */
+ inline bool rateGateQoS(const int64_t now)
+ {
+ if ((now - _lastQoSWindowReset) >= ZT_PATH_QOS_ACK_CUTOFF_TIME) {
+ _lastQoSWindowReset = now;
+ _QoSCutoffCount = 0;
+ } else {
+ ++_QoSCutoffCount;
+ }
+ return (_QoSCutoffCount < ZT_PATH_QOS_ACK_CUTOFF_LIMIT);
+ }
+
+ /**
+ * @return Whether this peer is reachable via an aggregate link
+ */
+ inline bool hasAggregateLink() {
+ return _localMultipathSupported && _remoteMultipathSupported && _remotePeerMultipathEnabled;
+ }
+
+ /**
* Serialize a peer for storage in local cache
*
* This does not serialize everything, just non-ephemeral information.
@@ -515,6 +670,18 @@ private:
int64_t _lastCredentialsReceived;
int64_t _lastTrustEstablishedPacketReceived;
int64_t _lastSentFullHello;
+ int64_t _lastPathPrune;
+ int64_t _lastACKWindowReset;
+ int64_t _lastQoSWindowReset;
+ int64_t _lastMultipathCompatibilityCheck;
+
+ unsigned char _freeRandomByte;
+
+ int _uniqueAlivePathCount;
+
+ bool _localMultipathSupported;
+ bool _remoteMultipathSupported;
+ bool _canUseMultipath;
uint16_t _vProto;
uint16_t _vMajor;
@@ -528,8 +695,21 @@ private:
unsigned int _directPathPushCutoffCount;
unsigned int _credentialsCutoffCount;
+ unsigned int _QoSCutoffCount;
+ unsigned int _ACKCutoffCount;
AtomicCounter __refCount;
+
+ RingBuffer<int> *_pathChoiceHist;
+
+ bool _linkIsBalanced;
+ bool _linkIsRedundant;
+ bool _remotePeerMultipathEnabled;
+
+ int64_t _lastAggregateStatsReport;
+ int64_t _lastAggregateAllocation;
+
+ char _interfaceListStr[256]; // 16 characters * 16 paths in a link
};
} // namespace ZeroTier
diff --git a/node/RingBuffer.hpp b/node/RingBuffer.hpp
new file mode 100644
index 00000000..e8d0d238
--- /dev/null
+++ b/node/RingBuffer.hpp
@@ -0,0 +1,345 @@
+/*
+ * ZeroTier One - Network Virtualization Everywhere
+ * Copyright (C) 2011-2018 ZeroTier, Inc. https://www.zerotier.com/
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * --
+ *
+ * You can be released from the requirements of the license by purchasing
+ * a commercial license. Buying such a license is mandatory as soon as you
+ * develop commercial closed-source software that incorporates or links
+ * directly against ZeroTier software without disclosing the source code
+ * of your own application.
+ */
+
+#ifndef ZT_RINGBUFFER_H
+#define ZT_RINGBUFFER_H
+
+#include <typeinfo>
+#include <cstdint>
+#include <stdlib.h>
+#include <memory.h>
+#include <algorithm>
+#include <math.h>
+
+namespace ZeroTier {
+
+/**
+ * A circular buffer
+ *
+ * For fast handling of continuously-evolving variables (such as path quality metrics).
+ * Using this, we can maintain longer sliding historical windows for important path
+ * metrics without the need for potentially expensive calls to memcpy/memmove.
+ *
+ * Some basic statistical functionality is implemented here in an attempt
+ * to reduce the complexity of code needed to interact with this type of buffer.
+ */
+
template <class T>
class RingBuffer
{
private:
	T * buf;
	size_t size;
	size_t begin;
	size_t end;
	bool wrap;

public:

	/**
	 * Create a RingBuffer with space for up to size elements.
	 *
	 * @param size Maximum number of elements held at once
	 */
	explicit RingBuffer(size_t size)
		: size(size),
		begin(0),
		end(0),
		wrap(false)
	{
		buf = new T[size];
		memset(buf, 0, sizeof(T) * size);
	}

	// Rule of five: the original had no destructor (buf leaked) and the
	// implicit shallow copy would have double-freed buf.
	~RingBuffer()
	{
		delete[] buf;
	}

	RingBuffer(const RingBuffer &other)
		: size(other.size),
		begin(other.begin),
		end(other.end),
		wrap(other.wrap)
	{
		buf = new T[size];
		memcpy(buf, other.buf, sizeof(T) * size);
	}

	RingBuffer &operator=(const RingBuffer &other)
	{
		if (this != &other) {
			T *newBuf = new T[other.size];
			memcpy(newBuf, other.buf, sizeof(T) * other.size);
			delete[] buf;
			buf = newBuf;
			size = other.size;
			begin = other.begin;
			end = other.end;
			wrap = other.wrap;
		}
		return *this;
	}

	/**
	 * @return A pointer to the first valid element of the underlying buffer
	 */
	T* get_buf()
	{
		return buf + begin;
	}

	/**
	 * Adjust buffer index pointer as if we copied data in
	 * @param n Number of elements to copy in
	 * @return Number of elements we copied in
	 */
	size_t produce(size_t n)
	{
		n = std::min(n, getFree());
		if (n == 0) {
			return n;
		}
		const size_t first_chunk = std::min(n, size - end);
		end = (end + first_chunk) % size;
		if (first_chunk < n) {
			const size_t second_chunk = n - first_chunk;
			end = (end + second_chunk) % size;
		}
		if (begin == end) {
			wrap = true;
		}
		return n;
	}

	/**
	 * Fast erase, O(1).
	 * Merely resets the buffer indices; does not erase contents.
	 */
	void reset()
	{
		consume(count());
	}

	/**
	 * Adjust buffer index pointer as if we copied data out
	 * @param n Number of elements we copied from the buffer
	 * @return Number of elements actually consumed from the buffer
	 */
	size_t consume(size_t n)
	{
		n = std::min(n, count());
		if (n == 0) {
			return n;
		}
		if (wrap) {
			wrap = false;
		}
		const size_t first_chunk = std::min(n, size - begin);
		begin = (begin + first_chunk) % size;
		if (first_chunk < n) {
			const size_t second_chunk = n - first_chunk;
			begin = (begin + second_chunk) % size;
		}
		return n;
	}

	/**
	 * @param data Buffer that is to be written to the ring
	 * @param n Number of elements to write to the buffer
	 * @return Number of elements actually written
	 */
	size_t write(const T * data, size_t n)
	{
		n = std::min(n, getFree());
		if (n == 0) {
			return n;
		}
		const size_t first_chunk = std::min(n, size - end);
		memcpy(buf + end, data, first_chunk * sizeof(T));
		end = (end + first_chunk) % size;
		if (first_chunk < n) {
			const size_t second_chunk = n - first_chunk;
			memcpy(buf + end, data + first_chunk, second_chunk * sizeof(T));
			end = (end + second_chunk) % size;
		}
		if (begin == end) {
			wrap = true;
		}
		return n;
	}

	/**
	 * Place a single value on the buffer. If the buffer is full, consume
	 * the oldest value first so the newest is never lost.
	 *
	 * @param value A single value to be placed in the buffer
	 */
	void push(const T value)
	{
		if (count() == size) {
			consume(1);
		}
		buf[end] = value;
		end = (end + 1) % size;
		if (begin == end) {
			wrap = true;
		}
	}

	/**
	 * @return The most recently pushed element on the buffer
	 */
	T get_most_recent()
	{
		// end is the next write slot; the newest element is one before it
		// (the original returned buf[end], i.e. stale/unwritten data).
		return buf[(end + size - 1) % size];
	}

	/**
	 * @param dest Destination buffer
	 * @param n Size (in terms of number of elements) of the destination buffer
	 * @return Number of elements read from the buffer
	 */
	size_t read(T * dest, size_t n)
	{
		n = std::min(n, count());
		if (n == 0) {
			return n;
		}
		if (wrap) {
			wrap = false;
		}
		const size_t first_chunk = std::min(n, size - begin);
		memcpy(dest, buf + begin, first_chunk * sizeof(T));
		begin = (begin + first_chunk) % size;
		if (first_chunk < n) {
			const size_t second_chunk = n - first_chunk;
			memcpy(dest + first_chunk, buf + begin, second_chunk * sizeof(T));
			begin = (begin + second_chunk) % size;
		}
		return n;
	}

	/**
	 * Return how many elements are in the buffer, O(1).
	 *
	 * @return The number of elements in the buffer
	 */
	size_t count()
	{
		if (end == begin) {
			return wrap ? size : 0;
		}
		else if (end > begin) {
			return end - begin;
		}
		else {
			return size + end - begin;
		}
	}

	/**
	 * @return The number of slots that are unused in the buffer
	 */
	size_t getFree() { return size - count(); }

	/**
	 * @return The arithmetic mean of the contents of the buffer
	 */
	float mean()
	{
		const size_t curr_cnt = count();
		if (!curr_cnt) {
			return 0;
		}
		// Walk backwards from the newest element. The original stepped the
		// iterator modulo curr_cnt (not size), visiting the wrong slots.
		size_t iterator = end;
		float subtotal = 0;
		for (size_t i=0; i<curr_cnt; i++) {
			iterator = (iterator + size - 1) % size;
			subtotal += (float)buf[iterator];
		}
		return subtotal / (float)curr_cnt;
	}

	/**
	 * @param n Number of most-recent elements to average (clamped to count())
	 * @return The arithmetic mean of the most recent 'n' elements of the buffer
	 */
	float mean(size_t n)
	{
		const size_t curr_cnt = count();
		n = n < curr_cnt ? n : curr_cnt;
		if (!n) {
			return 0;
		}
		size_t iterator = end;
		float subtotal = 0;
		for (size_t i=0; i<n; i++) {
			iterator = (iterator + size - 1) % size;
			subtotal += (float)buf[iterator];
		}
		// The original divided by count(), which is wrong for a partial mean.
		return subtotal / (float)n;
	}

	/**
	 * @return The sample standard deviation of element values
	 */
	float stddev() { return sqrt(variance()); }

	/**
	 * @return The sample variance of element values
	 */
	float variance()
	{
		const size_t curr_cnt = count();
		if (curr_cnt < 2) {
			return 0; // sample variance is undefined for fewer than two samples
		}
		const float cached_mean = mean();
		size_t iterator = end;
		float sum_of_squared_deviations = 0;
		for (size_t i=0; i<curr_cnt; i++) {
			// The original indexed buf[i] linearly (ignoring the ring) and
			// divided by (size - 1) even when the buffer was not yet full.
			iterator = (iterator + size - 1) % size;
			const float deviation = (float)buf[iterator] - cached_mean;
			sum_of_squared_deviations += deviation * deviation;
		}
		return sum_of_squared_deviations / (float)(curr_cnt - 1);
	}

	/**
	 * @return The number of elements of zero value
	 */
	size_t zeroCount()
	{
		const size_t curr_cnt = count();
		size_t iterator = end;
		size_t zeros = 0;
		for (size_t i=0; i<curr_cnt; i++) {
			iterator = (iterator + size - 1) % size;
			if (buf[iterator] == 0) {
				zeros++;
			}
		}
		return zeros;
	}

	/**
	 * @param value Value to match against in buffer
	 * @return The number of values held in the ring buffer which match a given value
	 */
	size_t countValue(T value)
	{
		const size_t curr_cnt = count();
		size_t iterator = end;
		size_t cnt = 0;
		for (size_t i=0; i<curr_cnt; i++) {
			iterator = (iterator + size - 1) % size;
			if (buf[iterator] == value) {
				cnt++;
			}
		}
		return cnt;
	}

	/**
	 * Print the contents of the buffer (debug builds only)
	 */
	void dump()
	{
		size_t iterator = begin;
		for (size_t i=0; i<size; i++) {
			iterator = (iterator + size - 1) % size;
			if (typeid(T) == typeid(int)) {
				//DEBUG_INFO("buf[%2zu]=%2d", iterator, (int)*(buf + iterator));
			}
			else {
				//DEBUG_INFO("buf[%2zu]=%2f", iterator, (float)*(buf + iterator));
			}
		}
	}
};
+
+} // namespace ZeroTier
+
+#endif
diff --git a/node/Salsa20.cpp b/node/Salsa20.cpp
index 1d4117e3..ca9e4718 100644
--- a/node/Salsa20.cpp
+++ b/node/Salsa20.cpp
@@ -14,7 +14,7 @@
#define XOR(v,w) ((v) ^ (w))
#define PLUS(v,w) ((uint32_t)((v) + (w)))
-// Set up laod/store macros with appropriate endianness (we don't use these in SSE mode)
+// Set up load/store macros with appropriate endianness (we don't use these in SSE mode)
#ifndef ZT_SALSA20_SSE
#if __BYTE_ORDER == __LITTLE_ENDIAN
diff --git a/node/SelfAwareness.cpp b/node/SelfAwareness.cpp
index c4f107fb..fc222672 100644
--- a/node/SelfAwareness.cpp
+++ b/node/SelfAwareness.cpp
@@ -145,7 +145,7 @@ std::vector<InetAddress> SelfAwareness::getSymmetricNatPredictions()
*
* Since flows are encrypted and authenticated they could not actually
* read or modify traffic, but they could gather meta-data for forensics
- * purpsoes or use this as a DOS attack vector. */
+ * purposes or use this as a DOS attack vector. */
std::map< uint32_t,unsigned int > maxPortByIp;
InetAddress theOneTrueSurface;
diff --git a/node/Switch.cpp b/node/Switch.cpp
index eeeca5db..7b517864 100644
--- a/node/Switch.cpp
+++ b/node/Switch.cpp
@@ -269,6 +269,8 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
}
}
+ uint8_t qosBucket = ZT_QOS_DEFAULT_BUCKET;
+
if (to.isMulticast()) {
MulticastGroup multicastGroup(to,0);
@@ -386,7 +388,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
network->learnBridgedMulticastGroup(tPtr,multicastGroup,RR->node->now());
// First pass sets noTee to false, but noTee is set to true in OutboundMulticast to prevent duplicates.
- if (!network->filterOutgoingPacket(tPtr,false,RR->identity.address(),Address(),from,to,(const uint8_t *)data,len,etherType,vlanId)) {
+ if (!network->filterOutgoingPacket(tPtr,false,RR->identity.address(),Address(),from,to,(const uint8_t *)data,len,etherType,vlanId,qosBucket)) {
RR->t->outgoingNetworkFrameDropped(tPtr,network,from,to,etherType,vlanId,len,"filter blocked");
return;
}
@@ -410,7 +412,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
Address toZT(to.toAddress(network->id())); // since in-network MACs are derived from addresses and network IDs, we can reverse this
SharedPtr<Peer> toPeer(RR->topology->getPeer(tPtr,toZT));
- if (!network->filterOutgoingPacket(tPtr,false,RR->identity.address(),toZT,from,to,(const uint8_t *)data,len,etherType,vlanId)) {
+ if (!network->filterOutgoingPacket(tPtr,false,RR->identity.address(),toZT,from,to,(const uint8_t *)data,len,etherType,vlanId,qosBucket)) {
RR->t->outgoingNetworkFrameDropped(tPtr,network,from,to,etherType,vlanId,len,"filter blocked");
return;
}
@@ -425,7 +427,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
outp.append(data,len);
if (!network->config().disableCompression())
outp.compress();
- send(tPtr,outp,true);
+ aqm_enqueue(tPtr,network,outp,true,qosBucket);
} else {
Packet outp(toZT,RR->identity.address(),Packet::VERB_FRAME);
outp.append(network->id());
@@ -433,7 +435,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
outp.append(data,len);
if (!network->config().disableCompression())
outp.compress();
- send(tPtr,outp,true);
+ aqm_enqueue(tPtr,network,outp,true,qosBucket);
}
} else {
@@ -442,7 +444,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
// We filter with a NULL destination ZeroTier address first. Filtrations
// for each ZT destination are also done below. This is the same rationale
// and design as for multicast.
- if (!network->filterOutgoingPacket(tPtr,false,RR->identity.address(),Address(),from,to,(const uint8_t *)data,len,etherType,vlanId)) {
+ if (!network->filterOutgoingPacket(tPtr,false,RR->identity.address(),Address(),from,to,(const uint8_t *)data,len,etherType,vlanId,qosBucket)) {
RR->t->outgoingNetworkFrameDropped(tPtr,network,from,to,etherType,vlanId,len,"filter blocked");
return;
}
@@ -480,7 +482,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
}
for(unsigned int b=0;b<numBridges;++b) {
- if (network->filterOutgoingPacket(tPtr,true,RR->identity.address(),bridges[b],from,to,(const uint8_t *)data,len,etherType,vlanId)) {
+ if (network->filterOutgoingPacket(tPtr,true,RR->identity.address(),bridges[b],from,to,(const uint8_t *)data,len,etherType,vlanId,qosBucket)) {
Packet outp(bridges[b],RR->identity.address(),Packet::VERB_EXT_FRAME);
outp.append(network->id());
outp.append((uint8_t)0x00);
@@ -490,7 +492,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
outp.append(data,len);
if (!network->config().disableCompression())
outp.compress();
- send(tPtr,outp,true);
+ aqm_enqueue(tPtr,network,outp,true,qosBucket);
} else {
RR->t->outgoingNetworkFrameDropped(tPtr,network,from,to,etherType,vlanId,len,"filter blocked (bridge replication)");
}
@@ -498,6 +500,263 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
}
}
+void Switch::aqm_enqueue(void *tPtr, const SharedPtr<Network> &network, Packet &packet,bool encrypt,int qosBucket)
+{
+ if(!network->QoSEnabled()) {
+ send(tPtr, packet, encrypt);
+ return;
+ }
+ NetworkQoSControlBlock *nqcb = _netQueueControlBlock[network->id()];
+ if (!nqcb) {
+ // DEBUG_INFO("creating network QoS control block (NQCB) for network %llx", network->id());
+ nqcb = new NetworkQoSControlBlock();
+ _netQueueControlBlock[network->id()] = nqcb;
+ // Initialize ZT_QOS_NUM_BUCKETS queues and place them in the INACTIVE list
+ // These queues will be shuffled between the new/old/inactive lists by the enqueue/dequeue algorithm
+ for (int i=0; i<ZT_QOS_NUM_BUCKETS; i++) {
+ nqcb->inactiveQueues.push_back(new ManagedQueue(i));
+ }
+ }
+
+ if (packet.verb() != Packet::VERB_FRAME && packet.verb() != Packet::VERB_EXT_FRAME) {
+ // DEBUG_INFO("skipping, no QoS for this packet, verb=%x", packet.verb());
+ // just send packet normally, no QoS for ZT protocol traffic
+ send(tPtr, packet, encrypt);
+ }
+
+ _aqm_m.lock();
+
+ // Enqueue packet and move queue to appropriate list
+
+ const Address dest(packet.destination());
+ TXQueueEntry *txEntry = new TXQueueEntry(dest,RR->node->now(),packet,encrypt);
+
+ ManagedQueue *selectedQueue = nullptr;
+ for (int i=0; i<ZT_QOS_NUM_BUCKETS; i++) {
+ if (i < nqcb->oldQueues.size()) { // search old queues first (I think this is best since old would imply most recent usage of the queue)
+ if (nqcb->oldQueues[i]->id == qosBucket) {
+ selectedQueue = nqcb->oldQueues[i];
+ }
+ } if (i < nqcb->newQueues.size()) { // search new queues (this would imply not often-used queues)
+ if (nqcb->newQueues[i]->id == qosBucket) {
+ selectedQueue = nqcb->newQueues[i];
+ }
+ } if (i < nqcb->inactiveQueues.size()) { // search inactive queues
+ if (nqcb->inactiveQueues[i]->id == qosBucket) {
+ selectedQueue = nqcb->inactiveQueues[i];
+ // move queue to end of NEW queue list
+ selectedQueue->byteCredit = ZT_QOS_QUANTUM;
+ // DEBUG_INFO("moving q=%p from INACTIVE to NEW list", selectedQueue);
+ nqcb->newQueues.push_back(selectedQueue);
+ nqcb->inactiveQueues.erase(nqcb->inactiveQueues.begin() + i);
+ }
+ }
+ }
+ if (!selectedQueue) {
+ return;
+ }
+
+ selectedQueue->q.push_back(txEntry);
+ selectedQueue->byteLength+=txEntry->packet.payloadLength();
+ nqcb->_currEnqueuedPackets++;
+
+ // DEBUG_INFO("nq=%2lu, oq=%2lu, iq=%2lu, nqcb.size()=%3d, bucket=%2d, q=%p", nqcb->newQueues.size(), nqcb->oldQueues.size(), nqcb->inactiveQueues.size(), nqcb->_currEnqueuedPackets, qosBucket, selectedQueue);
+
+ // Drop a packet if necessary
+ ManagedQueue *selectedQueueToDropFrom = nullptr;
+ if (nqcb->_currEnqueuedPackets > ZT_QOS_MAX_ENQUEUED_PACKETS)
+ {
+ // DEBUG_INFO("too many enqueued packets (%d), finding packet to drop", nqcb->_currEnqueuedPackets);
+ int maxQueueLength = 0;
+ for (int i=0; i<ZT_QOS_NUM_BUCKETS; i++) {
+ if (i < nqcb->oldQueues.size()) {
+ if (nqcb->oldQueues[i]->byteLength > maxQueueLength) {
+ maxQueueLength = nqcb->oldQueues[i]->byteLength;
+ selectedQueueToDropFrom = nqcb->oldQueues[i];
+ }
+ } if (i < nqcb->newQueues.size()) {
+ if (nqcb->newQueues[i]->byteLength > maxQueueLength) {
+ maxQueueLength = nqcb->newQueues[i]->byteLength;
+ selectedQueueToDropFrom = nqcb->newQueues[i];
+ }
+ } if (i < nqcb->inactiveQueues.size()) {
+ if (nqcb->inactiveQueues[i]->byteLength > maxQueueLength) {
+ maxQueueLength = nqcb->inactiveQueues[i]->byteLength;
+ selectedQueueToDropFrom = nqcb->inactiveQueues[i];
+ }
+ }
+ }
+ if (selectedQueueToDropFrom) {
+ // DEBUG_INFO("dropping packet from head of largest queue (%d payload bytes)", maxQueueLength);
+ int sizeOfDroppedPacket = selectedQueueToDropFrom->q.front()->packet.payloadLength();
+ delete selectedQueueToDropFrom->q.front();
+ selectedQueueToDropFrom->q.pop_front();
+ selectedQueueToDropFrom->byteLength-=sizeOfDroppedPacket;
+ nqcb->_currEnqueuedPackets--;
+ }
+ }
+ _aqm_m.unlock();
+ aqm_dequeue(tPtr);
+}
+
+uint64_t Switch::control_law(uint64_t t, int count)
+{
+ return t + ZT_QOS_INTERVAL / sqrt(count);
+}
+
+Switch::dqr Switch::dodequeue(ManagedQueue *q, uint64_t now)
+{
+ dqr r;
+ r.ok_to_drop = false;
+ r.p = q->q.front();
+
+ if (r.p == NULL) {
+ q->first_above_time = 0;
+ return r;
+ }
+ uint64_t sojourn_time = now - r.p->creationTime;
+ if (sojourn_time < ZT_QOS_TARGET || q->byteLength <= ZT_DEFAULT_MTU) {
+ // went below - stay below for at least interval
+ q->first_above_time = 0;
+ } else {
+ if (q->first_above_time == 0) {
+ // just went above from below. if still above at
+ // first_above_time, will say it's ok to drop.
+ q->first_above_time = now + ZT_QOS_INTERVAL;
+ } else if (now >= q->first_above_time) {
+ r.ok_to_drop = true;
+ }
+ }
+ return r;
+}
+
+Switch::TXQueueEntry * Switch::CoDelDequeue(ManagedQueue *q, bool isNew, uint64_t now)
+{
+ dqr r = dodequeue(q, now);
+
+ if (q->dropping) {
+ if (!r.ok_to_drop) {
+ q->dropping = false;
+ }
+ while (now >= q->drop_next && q->dropping) {
+ q->q.pop_front(); // drop
+ r = dodequeue(q, now);
+ if (!r.ok_to_drop) {
+ // leave dropping state
+ q->dropping = false;
+ } else {
+ ++(q->count);
+ // schedule the next drop.
+ q->drop_next = control_law(q->drop_next, q->count);
+ }
+ }
+ } else if (r.ok_to_drop) {
+ q->q.pop_front(); // drop
+ r = dodequeue(q, now);
+ q->dropping = true;
+ q->count = (q->count > 2 && now - q->drop_next < 8*ZT_QOS_INTERVAL)?
+ q->count - 2 : 1;
+ q->drop_next = control_law(now, q->count);
+ }
+ return r.p;
+}
+
+void Switch::aqm_dequeue(void *tPtr)
+{
+ // Cycle through network-specific QoS control blocks
+ for(std::map<uint64_t,NetworkQoSControlBlock*>::iterator nqcb(_netQueueControlBlock.begin());nqcb!=_netQueueControlBlock.end();) {
+ if (!(*nqcb).second->_currEnqueuedPackets) {
+ return;
+ }
+
+ uint64_t now = RR->node->now();
+ TXQueueEntry *entryToEmit = nullptr;
+ std::vector<ManagedQueue*> *currQueues = &((*nqcb).second->newQueues);
+ std::vector<ManagedQueue*> *oldQueues = &((*nqcb).second->oldQueues);
+ std::vector<ManagedQueue*> *inactiveQueues = &((*nqcb).second->inactiveQueues);
+
+ _aqm_m.lock();
+
+ // Attempt dequeue from queues in NEW list
+ bool examiningNewQueues = true;
+ while (currQueues->size()) {
+ ManagedQueue *queueAtFrontOfList = currQueues->front();
+ if (queueAtFrontOfList->byteCredit < 0) {
+ queueAtFrontOfList->byteCredit += ZT_QOS_QUANTUM;
+ // Move to list of OLD queues
+ // DEBUG_INFO("moving q=%p from NEW to OLD list", queueAtFrontOfList);
+ oldQueues->push_back(queueAtFrontOfList);
+ currQueues->erase(currQueues->begin());
+ } else {
+ entryToEmit = CoDelDequeue(queueAtFrontOfList, examiningNewQueues, now);
+ if (!entryToEmit) {
+ // Move to end of list of OLD queues
+ // DEBUG_INFO("moving q=%p from NEW to OLD list", queueAtFrontOfList);
+ oldQueues->push_back(queueAtFrontOfList);
+ currQueues->erase(currQueues->begin());
+ }
+ else {
+ int len = entryToEmit->packet.payloadLength();
+ queueAtFrontOfList->byteLength -= len;
+ queueAtFrontOfList->byteCredit -= len;
+ // Send the packet!
+ queueAtFrontOfList->q.pop_front();
+ send(tPtr, entryToEmit->packet, entryToEmit->encrypt);
+ (*nqcb).second->_currEnqueuedPackets--;
+ }
+ if (queueAtFrontOfList) {
+ //DEBUG_INFO("dequeuing from q=%p, len=%lu in NEW list (byteCredit=%d)", queueAtFrontOfList, queueAtFrontOfList->q.size(), queueAtFrontOfList->byteCredit);
+ }
+ break;
+ }
+ }
+
+ // Attempt dequeue from queues in OLD list
+ examiningNewQueues = false;
+ currQueues = &((*nqcb).second->oldQueues);
+ while (currQueues->size()) {
+ ManagedQueue *queueAtFrontOfList = currQueues->front();
+ if (queueAtFrontOfList->byteCredit < 0) {
+ queueAtFrontOfList->byteCredit += ZT_QOS_QUANTUM;
+ oldQueues->push_back(queueAtFrontOfList);
+ currQueues->erase(currQueues->begin());
+ } else {
+ entryToEmit = CoDelDequeue(queueAtFrontOfList, examiningNewQueues, now);
+ if (!entryToEmit) {
+ //DEBUG_INFO("moving q=%p from OLD to INACTIVE list", queueAtFrontOfList);
+ // Move to inactive list of queues
+ inactiveQueues->push_back(queueAtFrontOfList);
+ currQueues->erase(currQueues->begin());
+ }
+ else {
+ int len = entryToEmit->packet.payloadLength();
+ queueAtFrontOfList->byteLength -= len;
+ queueAtFrontOfList->byteCredit -= len;
+ queueAtFrontOfList->q.pop_front();
+ send(tPtr, entryToEmit->packet, entryToEmit->encrypt);
+ (*nqcb).second->_currEnqueuedPackets--;
+ }
+ if (queueAtFrontOfList) {
+ //DEBUG_INFO("dequeuing from q=%p, len=%lu in OLD list (byteCredit=%d)", queueAtFrontOfList, queueAtFrontOfList->q.size(), queueAtFrontOfList->byteCredit);
+ }
+ break;
+ }
+ }
+ nqcb++;
+ _aqm_m.unlock();
+ }
+}
+
+void Switch::removeNetworkQoSControlBlock(uint64_t nwid)
+{
+ NetworkQoSControlBlock *nq = _netQueueControlBlock[nwid];
+ if (nq) {
+ _netQueueControlBlock.erase(nwid);
+ delete nq;
+ nq = NULL;
+ }
+}
+
void Switch::send(void *tPtr,Packet &packet,bool encrypt)
{
const Address dest(packet.destination());
@@ -557,6 +816,7 @@ void Switch::doAnythingWaitingForPeer(void *tPtr,const SharedPtr<Peer> &peer)
{
Mutex::Lock _l(_txQueue_m);
+
for(std::list< TXQueueEntry >::iterator txi(_txQueue.begin());txi!=_txQueue.end();) {
if (txi->dest == peer->address()) {
if (_trySend(tPtr,txi->packet,txi->encrypt)) {
@@ -581,6 +841,7 @@ unsigned long Switch::doTimerTasks(void *tPtr,int64_t now)
std::vector<Address> needWhois;
{
Mutex::Lock _l(_txQueue_m);
+
for(std::list< TXQueueEntry >::iterator txi(_txQueue.begin());txi!=_txQueue.end();) {
if (_trySend(tPtr,txi->packet,txi->encrypt)) {
_txQueue.erase(txi++);
@@ -654,12 +915,12 @@ bool Switch::_trySend(void *tPtr,Packet &packet,bool encrypt)
const SharedPtr<Peer> peer(RR->topology->getPeer(tPtr,destination));
if (peer) {
- viaPath = peer->getBestPath(now,false);
+ viaPath = peer->getAppropriatePath(now,false);
if (!viaPath) {
peer->tryMemorizedPath(tPtr,now); // periodically attempt memorized or statically defined paths, if any are known
const SharedPtr<Peer> relay(RR->topology->getUpstreamPeer());
- if ( (!relay) || (!(viaPath = relay->getBestPath(now,false))) ) {
- if (!(viaPath = peer->getBestPath(now,true)))
+ if ( (!relay) || (!(viaPath = relay->getAppropriatePath(now,false))) ) {
+ if (!(viaPath = peer->getAppropriatePath(now,true)))
return false;
}
}
@@ -674,6 +935,8 @@ bool Switch::_trySend(void *tPtr,Packet &packet,bool encrypt)
unsigned int chunkSize = std::min(packet.size(),mtu);
packet.setFragmented(chunkSize < packet.size());
+ peer->recordOutgoingPacket(viaPath, packet.packetId(), packet.payloadLength(), packet.verb(), now);
+
if (trustedPathId) {
packet.setTrusted(trustedPathId);
} else {
diff --git a/node/Switch.hpp b/node/Switch.hpp
index 5de17fa0..0ee4ccfd 100644
--- a/node/Switch.hpp
+++ b/node/Switch.hpp
@@ -59,6 +59,14 @@ class Peer;
*/
class Switch
{
+ struct ManagedQueue;
+ struct TXQueueEntry;
+
	// Result of a dodequeue() attempt: the candidate packet at the head of the
	// queue (NULL when the queue is empty) and whether the CoDel sojourn-time
	// test considers it eligible for dropping.
	typedef struct {
		TXQueueEntry *p;
		bool ok_to_drop;
	} dqr;
+
public:
Switch(const RuntimeEnvironment *renv);
@@ -88,6 +96,62 @@ public:
void onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len);
/**
+ * Determines the next drop schedule for packets in the TX queue
+ *
+ * @param t Current time
+ * @param count Number of packets dropped this round
+ */
+ uint64_t control_law(uint64_t t, int count);
+
+ /**
+ * Selects a packet eligible for transmission from a TX queue. According to the control law, multiple packets
+ * may be intentionally dropped before a packet is returned to the AQM scheduler.
+ *
+ * @param q The TX queue that is being dequeued from
+ * @param now Current time
+ */
+ dqr dodequeue(ManagedQueue *q, uint64_t now);
+
+ /**
+ * Presents a packet to the AQM scheduler.
+ *
+ * @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call
+ * @param network Network that the packet shall be sent over
+ * @param packet Packet to be sent
+ * @param encrypt Encrypt packet payload? (always true except for HELLO)
+ * @param qosBucket Which bucket the rule-system determined this packet should fall into
+ */
+ void aqm_enqueue(void *tPtr, const SharedPtr<Network> &network, Packet &packet,bool encrypt,int qosBucket);
+
+ /**
+ * Performs a single AQM cycle and dequeues and transmits all eligible packets on all networks
+ *
+ * @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call
+ */
+ void aqm_dequeue(void *tPtr);
+
+ /**
+ * Calls the dequeue mechanism and adjust queue state variables
+ *
+ * @param q The TX queue that is being dequeued from
+ * @param isNew Whether or not this queue is in the NEW list
+ * @param now Current time
+ */
+ Switch::TXQueueEntry * CoDelDequeue(ManagedQueue *q, bool isNew, uint64_t now);
+
+ /**
+ * Removes QoS Queues and flow state variables for a specific network. These queues are created
+ * automatically upon the transmission of the first packet from this peer to another peer on the
+ * given network.
+ *
+ * The reason for existence of queues and flow state variables specific to each network is so that
+ * each network's QoS rules function independently.
+ *
+ * @param nwid Network ID
+ */
+ void removeNetworkQoSControlBlock(uint64_t nwid);
+
+ /**
* Send a packet to a ZeroTier address (destination in packet)
*
* The packet must be fully composed with source and destination but not
@@ -200,6 +264,7 @@ private:
};
std::list< TXQueueEntry > _txQueue;
Mutex _txQueue_m;
+ Mutex _aqm_m;
// Tracks sending of VERB_RENDEZVOUS to relaying peers
struct _LastUniteKey
@@ -221,6 +286,35 @@ private:
};
Hashtable< _LastUniteKey,uint64_t > _lastUniteAttempt; // key is always sorted in ascending order, for set-like behavior
Mutex _lastUniteAttempt_m;
+
+ // Queue with additional flow state variables
+ struct ManagedQueue
+ {
+ ManagedQueue(int id) :
+ id(id),
+ byteCredit(ZT_QOS_QUANTUM),
+ byteLength(0),
+ dropping(false)
+ {}
+ int id;
+ int byteCredit;
+ int byteLength;
+ uint64_t first_above_time;
+ uint32_t count;
+ uint64_t drop_next;
+ bool dropping;
+ uint64_t drop_next_time;
+ std::list< TXQueueEntry *> q;
+ };
+ // To implement fq_codel we need to maintain a queue of queues
+ struct NetworkQoSControlBlock
+ {
+ int _currEnqueuedPackets;
+ std::vector<ManagedQueue *> newQueues;
+ std::vector<ManagedQueue *> oldQueues;
+ std::vector<ManagedQueue *> inactiveQueues;
+ };
+ std::map<uint64_t,NetworkQoSControlBlock*> _netQueueControlBlock;
};
} // namespace ZeroTier
diff --git a/node/Tag.hpp b/node/Tag.hpp
index d2e932c2..1d3e97fa 100644
--- a/node/Tag.hpp
+++ b/node/Tag.hpp
@@ -58,7 +58,7 @@ class RuntimeEnvironment;
* values.
*
* Unlike capabilities tags are signed only by the issuer and are never
- * transferrable.
+ * transferable.
*/
class Tag : public Credential
{
diff --git a/node/Topology.cpp b/node/Topology.cpp
index 12aa8d2c..9fd5c2d7 100644
--- a/node/Topology.cpp
+++ b/node/Topology.cpp
@@ -138,7 +138,7 @@ SharedPtr<Peer> Topology::getPeer(void *tPtr,const Address &zta)
}
return SharedPtr<Peer>();
}
- } catch ( ... ) {} // ignore invalid identities or other strage failures
+ } catch ( ... ) {} // ignore invalid identities or other strange failures
return SharedPtr<Peer>();
}
diff --git a/node/Topology.hpp b/node/Topology.hpp
index 63946a32..5d726007 100644
--- a/node/Topology.hpp
+++ b/node/Topology.hpp
@@ -299,7 +299,7 @@ public:
Address *a = (Address *)0;
SharedPtr<Peer> *p = (SharedPtr<Peer> *)0;
while (i.next(a,p)) {
- const SharedPtr<Path> pp((*p)->getBestPath(now,false));
+ const SharedPtr<Path> pp((*p)->getAppropriatePath(now,false));
if (pp)
++cnt;
}
diff --git a/node/Trace.cpp b/node/Trace.cpp
index 451f5806..7b2fbbb7 100644
--- a/node/Trace.cpp
+++ b/node/Trace.cpp
@@ -107,6 +107,26 @@ void Trace::peerConfirmingUnknownPath(void *const tPtr,const uint64_t networkId,
}
}
// Emit a local trace event noting that the aggregate link to this peer has
// become fully redundant (every constituent physical path is alive).
void Trace::peerLinkNowRedundant(void *const tPtr,Peer &peer)
{
	ZT_LOCAL_TRACE(tPtr,RR,"link to peer %.10llx is fully redundant",peer.address().toInt());
}
+
// Emit a local trace event noting that the aggregate link to this peer has
// lost its redundancy.
void Trace::peerLinkNoLongerRedundant(void *const tPtr,Peer &peer)
{
	ZT_LOCAL_TRACE(tPtr,RR,"link to peer %.10llx is no longer redundant",peer.address().toInt());
}
+
// Emit a local trace event summarizing aggregate-link statistics for this
// peer: physical path count, the interface list string, packet delay
// variance and mean latency (both in milliseconds per the format string).
void Trace::peerLinkAggregateStatistics(void *const tPtr,Peer &peer)
{
	ZT_LOCAL_TRACE(tPtr,RR,"link to peer %.10llx is composed of (%d) physical paths %s, has packet delay variance (%.0f ms), mean latency (%.0f ms)",
		peer.address().toInt(),
		peer.aggregateLinkPhysicalPathCount(),
		peer.interfaceListStr(),
		peer.computeAggregateLinkPacketDelayVariance(),
		peer.computeAggregateLinkMeanLatency());
}
+
void Trace::peerLearnedNewPath(void *const tPtr,const uint64_t networkId,Peer &peer,const SharedPtr<Path> &newPath,const uint64_t packetId)
{
char tmp[128];
diff --git a/node/Trace.hpp b/node/Trace.hpp
index 2a2fca6c..734e84a5 100644
--- a/node/Trace.hpp
+++ b/node/Trace.hpp
@@ -121,6 +121,12 @@ public:
void resettingPathsInScope(void *const tPtr,const Address &reporter,const InetAddress &reporterPhysicalAddress,const InetAddress &myPhysicalAddress,const InetAddress::IpScope scope);
void peerConfirmingUnknownPath(void *const tPtr,const uint64_t networkId,Peer &peer,const SharedPtr<Path> &path,const uint64_t packetId,const Packet::Verb verb);
+
+ void peerLinkNowRedundant(void *const tPtr,Peer &peer);
+ void peerLinkNoLongerRedundant(void *const tPtr,Peer &peer);
+
+ void peerLinkAggregateStatistics(void *const tPtr,Peer &peer);
+
void peerLearnedNewPath(void *const tPtr,const uint64_t networkId,Peer &peer,const SharedPtr<Path> &newPath,const uint64_t packetId);
void peerRedirected(void *const tPtr,const uint64_t networkId,Peer &peer,const SharedPtr<Path> &newPath);
diff --git a/node/Utils.hpp b/node/Utils.hpp
index cb107391..35493f01 100644
--- a/node/Utils.hpp
+++ b/node/Utils.hpp
@@ -265,6 +265,14 @@ public:
return l;
}
+ static inline float normalize(float value, int64_t bigMin, int64_t bigMax, int32_t targetMin, int32_t targetMax)
+ {
+ int64_t bigSpan = bigMax - bigMin;
+ int64_t smallSpan = targetMax - targetMin;
+ float valueScaled = (value - (float)bigMin) / (float)bigSpan;
+ return (float)targetMin + valueScaled * (float)smallSpan;
+ }
+
/**
* Generate secure random bytes
*