summaryrefslogtreecommitdiff
path: root/node/Path.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'node/Path.hpp')
-rw-r--r--node/Path.hpp448
1 files changed, 443 insertions, 5 deletions
diff --git a/node/Path.hpp b/node/Path.hpp
index e12328ff..cafff8cf 100644
--- a/node/Path.hpp
+++ b/node/Path.hpp
@@ -39,6 +39,10 @@
#include "SharedPtr.hpp"
#include "AtomicCounter.hpp"
#include "Utils.hpp"
+#include "RingBuffer.hpp"
+#include "Packet.hpp"
+
+#include "../osdep/Phy.hpp"
/**
* Maximum return value of preferenceRank()
@@ -55,6 +59,7 @@ class RuntimeEnvironment;
class Path
{
friend class SharedPtr<Path>;
+ Phy<Path *> *_phy;
public:
/**
@@ -93,22 +98,77 @@ public:
_lastOut(0),
_lastIn(0),
_lastTrustEstablishedPacketReceived(0),
+ _lastPathQualityComputeTime(0),
_localSocket(-1),
_latency(0xffff),
_addr(),
- _ipScope(InetAddress::IP_SCOPE_NONE)
+ _ipScope(InetAddress::IP_SCOPE_NONE),
+ _lastAck(0),
+ _lastThroughputEstimation(0),
+ _lastQoSMeasurement(0),
+ _lastQoSRecordPurge(0),
+ _unackedBytes(0),
+ _expectingAckAsOf(0),
+ _packetsReceivedSinceLastAck(0),
+ _packetsReceivedSinceLastQoS(0),
+ _maxLifetimeThroughput(0),
+ _lastComputedMeanThroughput(0),
+ _bytesAckedSinceLastThroughputEstimation(0),
+ _lastComputedMeanLatency(0.0),
+ _lastComputedPacketDelayVariance(0.0),
+ _lastComputedPacketErrorRatio(0.0),
+ _lastComputedPacketLossRatio(0),
+ _lastComputedStability(0.0),
+ _lastComputedRelativeQuality(0),
+ _lastComputedThroughputDistCoeff(0.0),
+ _lastAllocation(0)
{
+ prepareBuffers();
}
Path(const int64_t localSocket,const InetAddress &addr) :
_lastOut(0),
_lastIn(0),
_lastTrustEstablishedPacketReceived(0),
+ _lastPathQualityComputeTime(0),
_localSocket(localSocket),
_latency(0xffff),
_addr(addr),
- _ipScope(addr.ipScope())
+ _ipScope(addr.ipScope()),
+ _lastAck(0),
+ _lastThroughputEstimation(0),
+ _lastQoSMeasurement(0),
+ _lastQoSRecordPurge(0),
+ _unackedBytes(0),
+ _expectingAckAsOf(0),
+ _packetsReceivedSinceLastAck(0),
+ _packetsReceivedSinceLastQoS(0),
+ _maxLifetimeThroughput(0),
+ _lastComputedMeanThroughput(0),
+ _bytesAckedSinceLastThroughputEstimation(0),
+ _lastComputedMeanLatency(0.0),
+ _lastComputedPacketDelayVariance(0.0),
+ _lastComputedPacketErrorRatio(0.0),
+ _lastComputedPacketLossRatio(0),
+ _lastComputedStability(0.0),
+ _lastComputedRelativeQuality(0),
+ _lastComputedThroughputDistCoeff(0.0),
+ _lastAllocation(0)
{
+ prepareBuffers();
+ _phy->getIfName((PhySocket *)((uintptr_t)_localSocket), _ifname, 16);
+ }
+
+ ~Path()
+ {
+ delete _throughputSamples;
+ delete _latencySamples;
+ delete _packetValiditySamples;
+ delete _throughputDisturbanceSamples;
+ _throughputSamples = NULL;
+ _latencySamples = NULL;
+ _packetValiditySamples = NULL;
+ _throughputDisturbanceSamples = NULL;
}
/**
@@ -147,12 +207,16 @@ public:
*
* @param l Measured latency
*/
- inline void updateLatency(const unsigned int l)
+ inline void updateLatency(const unsigned int l, int64_t now)
{
unsigned int pl = _latency;
- if (pl < 0xffff)
+ if (pl < 0xffff) {
_latency = (pl + l) / 2;
- else _latency = l;
+ }
+ else {
+ _latency = l;
+ }
+ _latencySamples->push(l);
}
/**
@@ -241,6 +305,324 @@ public:
}
/**
+ * Record statistics on outgoing packets. Used later to estimate QoS metrics.
+ *
+ * @param now Current time
+ * @param packetId ID of packet
+ * @param payloadLength Length of payload
+ * @param verb Packet verb
+ */
+ inline void recordOutgoingPacket(int64_t now, int64_t packetId, uint16_t payloadLength, Packet::Verb verb)
+ {
+ Mutex::Lock _l(_statistics_m);
+ if (verb != Packet::VERB_ACK && verb != Packet::VERB_QOS_MEASUREMENT) {
+ if ((packetId & (ZT_PATH_QOS_ACK_PROTOCOL_DIVISOR - 1)) == 0) {
+ _unackedBytes += payloadLength;
+ // Take note that we're expecting a VERB_ACK on this path as of a specific time
+ _expectingAckAsOf = ackAge(now) > ZT_PATH_ACK_INTERVAL ? _expectingAckAsOf : now;
+ if (_outQoSRecords.size() < ZT_PATH_MAX_OUTSTANDING_QOS_RECORDS) {
+ _outQoSRecords[packetId] = now;
+ }
+ }
+ }
+ }
+
+ /**
+ * Record statistics on incoming packets. Used later to estimate QoS metrics.
+ *
+ * @param now Current time
+ * @param packetId ID of packet
+ * @param payloadLength Length of payload
+ * @param verb Packet verb
+ */
+ inline void recordIncomingPacket(int64_t now, int64_t packetId, uint16_t payloadLength, Packet::Verb verb)
+ {
+ Mutex::Lock _l(_statistics_m);
+ if (verb != Packet::VERB_ACK && verb != Packet::VERB_QOS_MEASUREMENT) {
+ if ((packetId & (ZT_PATH_QOS_ACK_PROTOCOL_DIVISOR - 1)) == 0) {
+ _inACKRecords[packetId] = payloadLength;
+ _packetsReceivedSinceLastAck++;
+ _inQoSRecords[packetId] = now;
+ _packetsReceivedSinceLastQoS++;
+ }
+ _packetValiditySamples->push(true);
+ }
+ }
+
+ /**
+ * Record that we've received a VERB_ACK on this path, also compute throughput if required.
+ *
+ * @param now Current time
+ * @param ackedBytes Number of bytes acknowledged by other peer
+ */
+ inline void receivedAck(int64_t now, int32_t ackedBytes)
+ {
+ _expectingAckAsOf = 0;
+ _unackedBytes = (ackedBytes > _unackedBytes) ? 0 : _unackedBytes - ackedBytes;
+ int64_t timeSinceThroughputEstimate = (now - _lastThroughputEstimation);
+ if (timeSinceThroughputEstimate >= ZT_PATH_THROUGHPUT_MEASUREMENT_INTERVAL) {
+ uint64_t throughput = (float)(_bytesAckedSinceLastThroughputEstimation * 8) / ((float)timeSinceThroughputEstimate / (float)1000);
+ _throughputSamples->push(throughput);
+ _maxLifetimeThroughput = throughput > _maxLifetimeThroughput ? throughput : _maxLifetimeThroughput;
+ _lastThroughputEstimation = now;
+ _bytesAckedSinceLastThroughputEstimation = 0;
+ } else {
+ _bytesAckedSinceLastThroughputEstimation += ackedBytes;
+ }
+ }
+
+ /**
+ * @return Number of bytes this peer is responsible for ACKing since last ACK
+ */
+ inline int32_t bytesToAck()
+ {
+ Mutex::Lock _l(_statistics_m);
+ int32_t bytesToAck = 0;
+ std::map<uint64_t,uint16_t>::iterator it = _inACKRecords.begin();
+ while (it != _inACKRecords.end()) {
+ bytesToAck += it->second;
+ it++;
+ }
+ return bytesToAck;
+ }
+
+ /**
+ * @return Number of bytes thus far sent that have not been acknowledged by the remote peer
+ */
+ inline int64_t unackedSentBytes()
+ {
+ return _unackedBytes;
+ }
+
+ /**
+ * Account for the fact that an ACK was just sent. Reset counters, timers, and clear statistics buffers
+ *
+ * @param Current time
+ */
+ inline void sentAck(int64_t now)
+ {
+ Mutex::Lock _l(_statistics_m);
+ _inACKRecords.clear();
+ _packetsReceivedSinceLastAck = 0;
+ _lastAck = now;
+ }
+
+ /**
+ * Receive QoS data, match with recorded egress times from this peer, compute latency
+ * estimates.
+ *
+ * @param now Current time
+ * @param count Number of records
+ * @param rx_id table of packet IDs
+ * @param rx_ts table of holding times
+ */
+ inline void receivedQoS(int64_t now, int count, uint64_t *rx_id, uint16_t *rx_ts)
+ {
+ Mutex::Lock _l(_statistics_m);
+ // Look up egress times and compute latency values for each record
+ std::map<uint64_t,uint64_t>::iterator it;
+ for (int j=0; j<count; j++) {
+ it = _outQoSRecords.find(rx_id[j]);
+ if (it != _outQoSRecords.end()) {
+ uint16_t rtt = (uint16_t)(now - it->second);
+ uint16_t rtt_compensated = rtt - rx_ts[j];
+ float latency = rtt_compensated / 2.0;
+ updateLatency(latency, now);
+ _outQoSRecords.erase(it);
+ }
+ }
+ }
+
+ /**
+ * Generate the contents of a VERB_QOS_MEASUREMENT packet.
+ *
+ * @param now Current time
+ * @param qosBuffer destination buffer
+ * @return Size of payload
+ */
+ inline int32_t generateQoSPacket(int64_t now, char *qosBuffer)
+ {
+ Mutex::Lock _l(_statistics_m);
+ int32_t len = 0;
+ std::map<uint64_t,uint64_t>::iterator it = _inQoSRecords.begin();
+ int i=0;
+ while (i<_packetsReceivedSinceLastQoS && it != _inQoSRecords.end()) {
+ uint64_t id = it->first;
+ memcpy(qosBuffer, &id, sizeof(uint64_t));
+ qosBuffer+=sizeof(uint64_t);
+ uint16_t holdingTime = (now - it->second);
+ memcpy(qosBuffer, &holdingTime, sizeof(uint16_t));
+ qosBuffer+=sizeof(uint16_t);
+ len+=sizeof(uint64_t)+sizeof(uint16_t);
+ _inQoSRecords.erase(it++);
+ i++;
+ }
+ return len;
+ }
+
+ /**
+ * Account for the fact that a VERB_QOS_MEASUREMENT was just sent. Reset timers.
+ *
+ * @param Current time
+ */
+ inline void sentQoS(int64_t now) {
+ _packetsReceivedSinceLastQoS = 0;
+ _lastQoSMeasurement = now;
+ }
+
+ /**
+ * @param now Current time
+ * @return Whether an ACK (VERB_ACK) packet needs to be emitted at this time
+ */
+ inline bool needsToSendAck(int64_t now) {
+ return ((now - _lastAck) >= ZT_PATH_ACK_INTERVAL ||
+ (_packetsReceivedSinceLastAck == ZT_PATH_QOS_TABLE_SIZE)) && _packetsReceivedSinceLastAck;
+ }
+
+ /**
+ * @param now Current time
+ * @return Whether a QoS (VERB_QOS_MEASUREMENT) packet needs to be emitted at this time
+ */
+ inline bool needsToSendQoS(int64_t now) {
+ return ((_packetsReceivedSinceLastQoS >= ZT_PATH_QOS_TABLE_SIZE) ||
+ ((now - _lastQoSMeasurement) > ZT_PATH_QOS_INTERVAL)) && _packetsReceivedSinceLastQoS;
+ }
+
+ /**
+ * How much time has elapsed since we've been expecting a VERB_ACK on this path. This value
+ * is used to determine a more relevant path "age". This lets us penalize paths which are no
+ * longer ACKing, but not those that simple aren't being used to carry traffic at the
+ * current time.
+ */
+ inline int64_t ackAge(int64_t now) { return _expectingAckAsOf ? now - _expectingAckAsOf : 0; }
+
+ /**
+ * The maximum observed throughput (in bits/s) for this path
+ */
+ inline uint64_t maxLifetimeThroughput() { return _maxLifetimeThroughput; }
+
+ /**
+ * @return The mean throughput (in bits/s) of this link
+ */
+ inline uint64_t meanThroughput() { return _lastComputedMeanThroughput; }
+
+ /**
+ * Assign a new relative quality value for this path in the aggregate link
+ *
+ * @param rq Quality of this path in comparison to other paths available to this peer
+ */
+ inline void updateRelativeQuality(float rq) { _lastComputedRelativeQuality = rq; }
+
+ /**
+ * @return Quality of this path compared to others in the aggregate link
+ */
+ inline float relativeQuality() { return _lastComputedRelativeQuality; }
+
+ /**
+ * Assign a new allocation value for this path in the aggregate link
+ *
+ * @param allocation Percentage of traffic to be sent over this path to a peer
+ */
+ inline void updateComponentAllocationOfAggregateLink(unsigned char allocation) { _lastAllocation = allocation; }
+
+ /**
+ * @return Percentage of traffic allocated to this path in the aggregate link
+ */
+ inline unsigned char allocation() { return _lastAllocation; }
+
+ /**
+ * @return Stability estimates can become expensive to compute, we cache the most recent result.
+ */
+ inline float lastComputedStability() { return _lastComputedStability; }
+
+ /**
+ * @return A pointer to a cached copy of the human-readable name of the interface this Path's localSocket is bound to
+ */
+ inline char *getName() { return _ifname; }
+
+ /**
+ * @return Packet delay variance
+ */
+ inline float packetDelayVariance() { return _lastComputedPacketDelayVariance; }
+
+ /**
+ * @return Previously-computed mean latency
+ */
+ inline float meanLatency() { return _lastComputedMeanLatency; }
+
+ /**
+ * @return Packet loss rate (PLR)
+ */
+ inline float packetLossRatio() { return _lastComputedPacketLossRatio; }
+
+ /**
+ * @return Packet error ratio (PER)
+ */
+ inline float packetErrorRatio() { return _lastComputedPacketErrorRatio; }
+
+ /**
+ * Record an invalid incoming packet. This packet failed MAC/compression/cipher checks and will now
+ * contribute to a Packet Error Ratio (PER).
+ */
+ inline void recordInvalidPacket() { _packetValiditySamples->push(false); }
+
+ /**
+ * @return A pointer to a cached copy of the address string for this Path (For debugging only)
+ */
+ inline char *getAddressString() { return _addrString; }
+
+ /**
+ * @return The current throughput disturbance coefficient
+ */
+ inline float throughputDisturbanceCoefficient() { return _lastComputedThroughputDistCoeff; }
+
+ /**
+ * Compute and cache stability and performance metrics. The resultant stability coefficient is a measure of how "well behaved"
+ * this path is. This figure is substantially different from (but required for the estimation of the path's overall "quality".
+ *
+ * @param now Current time
+ */
+ inline void processBackgroundPathMeasurements(int64_t now) {
+ if (now - _lastPathQualityComputeTime > ZT_PATH_QUALITY_COMPUTE_INTERVAL) {
+ Mutex::Lock _l(_statistics_m);
+ _lastPathQualityComputeTime = now;
+ address().toString(_addrString);
+ _lastComputedMeanLatency = _latencySamples->mean();
+ _lastComputedPacketDelayVariance = _latencySamples->stddev(); // Similar to "jitter" (SEE: RFC 3393, RFC 4689)
+ _lastComputedMeanThroughput = (uint64_t)_throughputSamples->mean();
+ // If no packet validity samples, assume PER==0
+ _lastComputedPacketErrorRatio = 1 - (_packetValiditySamples->count() ? _packetValiditySamples->mean() : 1);
+ // Compute path stability
+ // Normalize measurements with wildly different ranges into a reasonable range
+ float normalized_pdv = Utils::normalize(_lastComputedPacketDelayVariance, 0, ZT_PATH_MAX_PDV, 0, 10);
+ float normalized_la = Utils::normalize(_lastComputedMeanLatency, 0, ZT_PATH_MAX_MEAN_LATENCY, 0, 10);
+ float throughput_cv = _throughputSamples->mean() > 0 ? _throughputSamples->stddev() / _throughputSamples->mean() : 1;
+ // Form an exponential cutoff and apply contribution weights
+ float pdv_contrib = exp((-1)*normalized_pdv) * ZT_PATH_CONTRIB_PDV;
+ float latency_contrib = exp((-1)*normalized_la) * ZT_PATH_CONTRIB_LATENCY;
+ // Throughput Disturbance Coefficient
+ float throughput_disturbance_contrib = exp((-1)*throughput_cv) * ZT_PATH_CONTRIB_THROUGHPUT_DISTURBANCE;
+ _throughputDisturbanceSamples->push(throughput_cv);
+ _lastComputedThroughputDistCoeff = _throughputDisturbanceSamples->mean();
+ // Obey user-defined ignored contributions
+ pdv_contrib = ZT_PATH_CONTRIB_PDV > 0.0 ? pdv_contrib : 1;
+ latency_contrib = ZT_PATH_CONTRIB_LATENCY > 0.0 ? latency_contrib : 1;
+ throughput_disturbance_contrib = ZT_PATH_CONTRIB_THROUGHPUT_DISTURBANCE > 0.0 ? throughput_disturbance_contrib : 1;
+ // Stability
+ _lastComputedStability = pdv_contrib + latency_contrib + throughput_disturbance_contrib;
+ _lastComputedStability *= 1 - _lastComputedPacketErrorRatio;
+ // Prevent QoS records from sticking around for too long
+ std::map<uint64_t,uint64_t>::iterator it = _outQoSRecords.begin();
+ while (it != _outQoSRecords.end()) {
+ // Time since egress of tracked packet
+ if ((now - it->second) >= ZT_PATH_QOS_TIMEOUT) {
+ _outQoSRecords.erase(it++);
+ } else { it++; }
+ }
+ }
+ }
+
+ /**
* @return True if this path is alive (receiving heartbeats)
*/
inline bool alive(const int64_t now) const { return ((now - _lastIn) < (ZT_PATH_HEARTBEAT_PERIOD + 5000)); }
@@ -265,15 +647,71 @@ public:
*/
inline int64_t lastTrustEstablishedPacketReceived() const { return _lastTrustEstablishedPacketReceived; }
+ /**
+ * Initialize statistical buffers
+ */
+ inline void prepareBuffers() {
+ _throughputSamples = new RingBuffer<uint64_t>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
+ _latencySamples = new RingBuffer<uint32_t>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
+ _packetValiditySamples = new RingBuffer<bool>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
+ _throughputDisturbanceSamples = new RingBuffer<float>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
+ memset(_ifname, 0, 16);
+ memset(_addrString, 0, sizeof(_addrString));
+ }
+
private:
+ Mutex _statistics_m;
+
volatile int64_t _lastOut;
volatile int64_t _lastIn;
volatile int64_t _lastTrustEstablishedPacketReceived;
+ volatile int64_t _lastPathQualityComputeTime;
int64_t _localSocket;
volatile unsigned int _latency;
InetAddress _addr;
InetAddress::IpScope _ipScope; // memoize this since it's a computed value checked often
AtomicCounter __refCount;
+
+ std::map<uint64_t, uint64_t> _outQoSRecords; // id:egress_time
+ std::map<uint64_t, uint64_t> _inQoSRecords; // id:now
+ std::map<uint64_t, uint16_t> _inACKRecords; // id:len
+
+ int64_t _lastAck;
+ int64_t _lastThroughputEstimation;
+ int64_t _lastQoSMeasurement;
+ int64_t _lastQoSRecordPurge;
+
+ int64_t _unackedBytes;
+ int64_t _expectingAckAsOf;
+ int16_t _packetsReceivedSinceLastAck;
+ int16_t _packetsReceivedSinceLastQoS;
+
+ uint64_t _maxLifetimeThroughput;
+ uint64_t _lastComputedMeanThroughput;
+ uint64_t _bytesAckedSinceLastThroughputEstimation;
+
+ float _lastComputedMeanLatency;
+ float _lastComputedPacketDelayVariance;
+
+ float _lastComputedPacketErrorRatio;
+ float _lastComputedPacketLossRatio;
+
+ // cached estimates
+ float _lastComputedStability;
+ float _lastComputedRelativeQuality;
+ float _lastComputedThroughputDistCoeff;
+ unsigned char _lastAllocation;
+
+
+
+ // cached human-readable strings for tracing purposes
+ char _ifname[16];
+ char _addrString[256];
+
+ RingBuffer<uint64_t> *_throughputSamples;
+ RingBuffer<uint32_t> *_latencySamples;
+ RingBuffer<bool> *_packetValiditySamples;
+ RingBuffer<float> *_throughputDisturbanceSamples;
};
} // namespace ZeroTier