diff options
Diffstat (limited to 'node/Path.hpp')
| -rw-r--r-- | node/Path.hpp | 448 | 
1 files changed, 443 insertions, 5 deletions
| diff --git a/node/Path.hpp b/node/Path.hpp index e12328ff..cafff8cf 100644 --- a/node/Path.hpp +++ b/node/Path.hpp @@ -39,6 +39,10 @@  #include "SharedPtr.hpp"  #include "AtomicCounter.hpp"  #include "Utils.hpp" +#include "RingBuffer.hpp" +#include "Packet.hpp" + +#include "../osdep/Phy.hpp"  /**   * Maximum return value of preferenceRank() @@ -55,6 +59,7 @@ class RuntimeEnvironment;  class Path  {  	friend class SharedPtr<Path>; +	Phy<Path *> *_phy;  public:  	/** @@ -93,22 +98,77 @@ public:  		_lastOut(0),  		_lastIn(0),  		_lastTrustEstablishedPacketReceived(0), +		_lastPathQualityComputeTime(0),  		_localSocket(-1),  		_latency(0xffff),  		_addr(), -		_ipScope(InetAddress::IP_SCOPE_NONE) +		_ipScope(InetAddress::IP_SCOPE_NONE), +		_lastAck(0), +		_lastThroughputEstimation(0), +		_lastQoSMeasurement(0), +		_lastQoSRecordPurge(0), +		_unackedBytes(0), +		_expectingAckAsOf(0), +		_packetsReceivedSinceLastAck(0), +		_packetsReceivedSinceLastQoS(0), +		_maxLifetimeThroughput(0), +		_lastComputedMeanThroughput(0), +		_bytesAckedSinceLastThroughputEstimation(0), +		_lastComputedMeanLatency(0.0), +		_lastComputedPacketDelayVariance(0.0), +		_lastComputedPacketErrorRatio(0.0), +		_lastComputedPacketLossRatio(0), +		_lastComputedStability(0.0), +		_lastComputedRelativeQuality(0), +		_lastComputedThroughputDistCoeff(0.0), +		_lastAllocation(0)  	{ +		prepareBuffers();  	}  	Path(const int64_t localSocket,const InetAddress &addr) :  		_lastOut(0),  		_lastIn(0),  		_lastTrustEstablishedPacketReceived(0), +		_lastPathQualityComputeTime(0),  		_localSocket(localSocket),  		_latency(0xffff),  		_addr(addr), -		_ipScope(addr.ipScope()) +		_ipScope(addr.ipScope()), +		_lastAck(0), +		_lastThroughputEstimation(0), +		_lastQoSMeasurement(0), +		_lastQoSRecordPurge(0), +		_unackedBytes(0), +		_expectingAckAsOf(0), +		_packetsReceivedSinceLastAck(0), +		_packetsReceivedSinceLastQoS(0), +		_maxLifetimeThroughput(0), +		_lastComputedMeanThroughput(0), +		_bytesAckedSinceLastThroughputEstimation(0), +		_lastComputedMeanLatency(0.0), +		_lastComputedPacketDelayVariance(0.0), +		_lastComputedPacketErrorRatio(0.0), +		_lastComputedPacketLossRatio(0), +		_lastComputedStability(0.0), +		_lastComputedRelativeQuality(0), +		_lastComputedThroughputDistCoeff(0.0), +		_lastAllocation(0)  	{ +		prepareBuffers(); +		_phy->getIfName((PhySocket *)((uintptr_t)_localSocket), _ifname, 16); +	} + +	~Path() +	{ +		delete _throughputSamples; +		delete _latencySamples; +		delete _packetValiditySamples; +		delete _throughputDisturbanceSamples; +		_throughputSamples = NULL; +		_latencySamples = NULL; +		_packetValiditySamples = NULL; +		_throughputDisturbanceSamples = NULL;  	}  	/** @@ -147,12 +207,16 @@ public:  	 *  	 * @param l Measured latency  	 */ -	inline void updateLatency(const unsigned int l) +	inline void updateLatency(const unsigned int l, int64_t now)  	{  		unsigned int pl = _latency; -		if (pl < 0xffff) +		if (pl < 0xffff) {  			_latency = (pl + l) / 2; -		else _latency = l; +		} +		else { +			_latency = l; +		} +		_latencySamples->push(l);  	}  	/** @@ -241,6 +305,324 @@ public:  	}  	/** +	 * Record statistics on outgoing packets. Used later to estimate QoS metrics. +	 * +	 * @param now Current time +	 * @param packetId ID of packet +	 * @param payloadLength Length of payload +	 * @param verb Packet verb +	 */ +	inline void recordOutgoingPacket(int64_t now, int64_t packetId, uint16_t payloadLength, Packet::Verb verb) +	{ +		Mutex::Lock _l(_statistics_m); +		if (verb != Packet::VERB_ACK && verb != Packet::VERB_QOS_MEASUREMENT) { +			if ((packetId & (ZT_PATH_QOS_ACK_PROTOCOL_DIVISOR - 1)) == 0) { +				_unackedBytes += payloadLength; +				// Take note that we're expecting a VERB_ACK on this path as of a specific time +				_expectingAckAsOf = ackAge(now) > ZT_PATH_ACK_INTERVAL ? _expectingAckAsOf : now; +				if (_outQoSRecords.size() < ZT_PATH_MAX_OUTSTANDING_QOS_RECORDS) { +					_outQoSRecords[packetId] = now; +				} +			} +		} +	} + +	/** +	 * Record statistics on incoming packets. Used later to estimate QoS metrics. +	 * +	 * @param now Current time +	 * @param packetId ID of packet +	 * @param payloadLength Length of payload +	 * @param verb Packet verb +	 */ +	inline void recordIncomingPacket(int64_t now, int64_t packetId, uint16_t payloadLength, Packet::Verb verb) +	{ +		Mutex::Lock _l(_statistics_m); +		if (verb != Packet::VERB_ACK && verb != Packet::VERB_QOS_MEASUREMENT) { +			if ((packetId & (ZT_PATH_QOS_ACK_PROTOCOL_DIVISOR - 1)) == 0) { +				_inACKRecords[packetId] = payloadLength; +				_packetsReceivedSinceLastAck++; +				_inQoSRecords[packetId] = now; +				_packetsReceivedSinceLastQoS++; +			} +			_packetValiditySamples->push(true); +		} +	} + +	/** +	 * Record that we've received a VERB_ACK on this path, also compute throughput if required. +	 * +	 * @param now Current time +	 * @param ackedBytes Number of bytes acknowledged by other peer +	 */ +	inline void receivedAck(int64_t now, int32_t ackedBytes) +	{ +		_expectingAckAsOf = 0; +		_unackedBytes = (ackedBytes > _unackedBytes) ? 0 : _unackedBytes - ackedBytes; +		int64_t timeSinceThroughputEstimate = (now - _lastThroughputEstimation); +		if (timeSinceThroughputEstimate >= ZT_PATH_THROUGHPUT_MEASUREMENT_INTERVAL) { +			uint64_t throughput = (float)(_bytesAckedSinceLastThroughputEstimation * 8) / ((float)timeSinceThroughputEstimate / (float)1000); +			_throughputSamples->push(throughput); +			_maxLifetimeThroughput = throughput > _maxLifetimeThroughput ? throughput : _maxLifetimeThroughput; +			_lastThroughputEstimation = now; +			_bytesAckedSinceLastThroughputEstimation = 0; +		} else { +			_bytesAckedSinceLastThroughputEstimation += ackedBytes; +		} +	} + +	/** +	 * @return Number of bytes this peer is responsible for ACKing since last ACK +	 */ +	inline int32_t bytesToAck() +	{ +		Mutex::Lock _l(_statistics_m); +		int32_t bytesToAck = 0; +		std::map<uint64_t,uint16_t>::iterator it = _inACKRecords.begin(); +		while (it != _inACKRecords.end()) { +			bytesToAck += it->second; +			it++; +		} +		return bytesToAck; +	} + +	/** +	 * @return Number of bytes thus far sent that have not been acknowledged by the remote peer +	 */ +	inline int64_t unackedSentBytes() +	{ +		return _unackedBytes; +	} + +	/** +	 * Account for the fact that an ACK was just sent. Reset counters, timers, and clear statistics buffers +	 * +	 * @param Current time +	 */ +	inline void sentAck(int64_t now) +	{ +		Mutex::Lock _l(_statistics_m); +		_inACKRecords.clear(); +		_packetsReceivedSinceLastAck = 0; +		_lastAck = now; +	} + +	/** +	 * Receive QoS data, match with recorded egress times from this peer, compute latency +	 * estimates. +	 * +	 * @param now Current time +	 * @param count Number of records +	 * @param rx_id table of packet IDs +	 * @param rx_ts table of holding times +	 */ +	inline void receivedQoS(int64_t now, int count, uint64_t *rx_id, uint16_t *rx_ts) +	{ +		Mutex::Lock _l(_statistics_m); +		// Look up egress times and compute latency values for each record +		std::map<uint64_t,uint64_t>::iterator it; +		for (int j=0; j<count; j++) { +			it = _outQoSRecords.find(rx_id[j]); +			if (it != _outQoSRecords.end()) { +				uint16_t rtt = (uint16_t)(now - it->second); +				uint16_t rtt_compensated = rtt - rx_ts[j]; +				float latency = rtt_compensated / 2.0; +				updateLatency(latency, now); +				_outQoSRecords.erase(it); +			} +		} +	} + +	/** +	 * Generate the contents of a VERB_QOS_MEASUREMENT packet. +	 * +	 * @param now Current time +	 * @param qosBuffer destination buffer +	 * @return Size of payload +	 */ +	inline int32_t generateQoSPacket(int64_t now, char *qosBuffer) +	{ +		Mutex::Lock _l(_statistics_m); +		int32_t len = 0; +		std::map<uint64_t,uint64_t>::iterator it = _inQoSRecords.begin(); +		int i=0; +		while (i<_packetsReceivedSinceLastQoS && it != _inQoSRecords.end()) { +			uint64_t id = it->first; +			memcpy(qosBuffer, &id, sizeof(uint64_t)); +			qosBuffer+=sizeof(uint64_t); +			uint16_t holdingTime = (now - it->second); +			memcpy(qosBuffer, &holdingTime, sizeof(uint16_t)); +			qosBuffer+=sizeof(uint16_t); +			len+=sizeof(uint64_t)+sizeof(uint16_t); +			_inQoSRecords.erase(it++); +			i++; +		} +		return len; +	} + +	/** +	 * Account for the fact that a VERB_QOS_MEASUREMENT was just sent. Reset timers. +	 * +	 * @param Current time +	 */ +	inline void sentQoS(int64_t now) { +		_packetsReceivedSinceLastQoS = 0; +		_lastQoSMeasurement = now; +	} + +	/** +	 * @param now Current time +	 * @return Whether an ACK (VERB_ACK) packet needs to be emitted at this time +	 */ +	inline bool needsToSendAck(int64_t now) { +		return ((now - _lastAck) >= ZT_PATH_ACK_INTERVAL || +			(_packetsReceivedSinceLastAck == ZT_PATH_QOS_TABLE_SIZE)) && _packetsReceivedSinceLastAck; +	} + +	/** +	 * @param now Current time +	 * @return Whether a QoS (VERB_QOS_MEASUREMENT) packet needs to be emitted at this time +	 */ +	inline bool needsToSendQoS(int64_t now) { +		return ((_packetsReceivedSinceLastQoS >= ZT_PATH_QOS_TABLE_SIZE) || +			((now - _lastQoSMeasurement) > ZT_PATH_QOS_INTERVAL)) && _packetsReceivedSinceLastQoS; +	} + +	/** +	 * How much time has elapsed since we've been expecting a VERB_ACK on this path. This value +	 * is used to determine a more relevant path "age". This lets us penalize paths which are no +	 * longer ACKing, but not those that simple aren't being used to carry traffic at the +	 * current time. +	 */ +	inline int64_t ackAge(int64_t now) { return _expectingAckAsOf ? now - _expectingAckAsOf : 0; } + +	/** +	 * The maximum observed throughput (in bits/s) for this path +	 */ +	inline uint64_t maxLifetimeThroughput() { return _maxLifetimeThroughput; } + +	/** +	 * @return The mean throughput (in bits/s) of this link +	 */ +	inline uint64_t meanThroughput() { return _lastComputedMeanThroughput; } + +	/** +	 * Assign a new relative quality value for this path in the aggregate link +	 * +	 * @param rq Quality of this path in comparison to other paths available to this peer +	 */ +	inline void updateRelativeQuality(float rq) { _lastComputedRelativeQuality = rq; } + +	/** +	 * @return Quality of this path compared to others in the aggregate link +	 */ +	inline float relativeQuality() { return _lastComputedRelativeQuality; } + +	/** +	 * Assign a new allocation value for this path in the aggregate link +	 * +	 * @param allocation Percentage of traffic to be sent over this path to a peer +	 */ +	inline void updateComponentAllocationOfAggregateLink(unsigned char allocation) { _lastAllocation = allocation; } + +	/** +	 * @return Percentage of traffic allocated to this path in the aggregate link +	 */ +	inline unsigned char allocation() { return _lastAllocation; } + +	/** +	 * @return Stability estimates can become expensive to compute, we cache the most recent result. +	 */ +	inline float lastComputedStability() { return _lastComputedStability; } + +	/** +	 * @return A pointer to a cached copy of the human-readable name of the interface this Path's localSocket is bound to +	 */ +	inline char *getName() { return _ifname; } + +	/** +	 * @return Packet delay variance +	 */ +	inline float packetDelayVariance() { return _lastComputedPacketDelayVariance; } + +	/** +	 * @return Previously-computed mean latency +	 */ +	inline float meanLatency() { return _lastComputedMeanLatency; } + +	/** +	 * @return Packet loss rate (PLR) +	 */ +	inline float packetLossRatio() { return _lastComputedPacketLossRatio; } + +	/** +	 * @return Packet error ratio (PER) +	 */ +	inline float packetErrorRatio() { return _lastComputedPacketErrorRatio; } + +	/** +	 * Record an invalid incoming packet. This packet failed MAC/compression/cipher checks and will now +	 * contribute to a Packet Error Ratio (PER). +	 */ +	inline void recordInvalidPacket() { _packetValiditySamples->push(false); } + +	/** +	 * @return A pointer to a cached copy of the address string for this Path (For debugging only) +	 */ +	inline char *getAddressString() { return _addrString; } + +	/** +	 * @return The current throughput disturbance coefficient +	 */ +	inline float throughputDisturbanceCoefficient() { return _lastComputedThroughputDistCoeff; } + +	/** +	 * Compute and cache stability and performance metrics. The resultant stability coefficient is a measure of how "well behaved" +	 * this path is. This figure is substantially different from (but required for the estimation of the path's overall "quality". +	 * +	 * @param now Current time +	 */ +	inline void processBackgroundPathMeasurements(int64_t now) { +		if (now - _lastPathQualityComputeTime > ZT_PATH_QUALITY_COMPUTE_INTERVAL) { +			Mutex::Lock _l(_statistics_m); +			_lastPathQualityComputeTime = now; +			address().toString(_addrString); +			_lastComputedMeanLatency = _latencySamples->mean(); +			_lastComputedPacketDelayVariance = _latencySamples->stddev(); // Similar to "jitter" (SEE: RFC 3393, RFC 4689) +			_lastComputedMeanThroughput = (uint64_t)_throughputSamples->mean(); +			// If no packet validity samples, assume PER==0 +			_lastComputedPacketErrorRatio = 1 - (_packetValiditySamples->count() ? _packetValiditySamples->mean() : 1); +			// Compute path stability +			// Normalize measurements with wildly different ranges into a reasonable range +			float normalized_pdv = Utils::normalize(_lastComputedPacketDelayVariance, 0, ZT_PATH_MAX_PDV, 0, 10); +			float normalized_la = Utils::normalize(_lastComputedMeanLatency, 0, ZT_PATH_MAX_MEAN_LATENCY, 0, 10); +			float throughput_cv = _throughputSamples->mean() > 0 ? _throughputSamples->stddev() / _throughputSamples->mean() : 1; +			// Form an exponential cutoff and apply contribution weights +			float pdv_contrib = exp((-1)*normalized_pdv) * ZT_PATH_CONTRIB_PDV; +			float latency_contrib = exp((-1)*normalized_la) * ZT_PATH_CONTRIB_LATENCY; +			// Throughput Disturbance Coefficient +			float throughput_disturbance_contrib = exp((-1)*throughput_cv) * ZT_PATH_CONTRIB_THROUGHPUT_DISTURBANCE; +			_throughputDisturbanceSamples->push(throughput_cv); +			_lastComputedThroughputDistCoeff = _throughputDisturbanceSamples->mean(); +			// Obey user-defined ignored contributions +			pdv_contrib = ZT_PATH_CONTRIB_PDV > 0.0 ? pdv_contrib : 1; +			latency_contrib = ZT_PATH_CONTRIB_LATENCY > 0.0 ? latency_contrib : 1; +			throughput_disturbance_contrib = ZT_PATH_CONTRIB_THROUGHPUT_DISTURBANCE > 0.0 ? throughput_disturbance_contrib : 1; +			// Stability +			_lastComputedStability = pdv_contrib + latency_contrib + throughput_disturbance_contrib; +			_lastComputedStability *= 1 - _lastComputedPacketErrorRatio; +			// Prevent QoS records from sticking around for too long +			std::map<uint64_t,uint64_t>::iterator it = _outQoSRecords.begin(); +			while (it != _outQoSRecords.end()) { +				// Time since egress of tracked packet +				if ((now - it->second) >= ZT_PATH_QOS_TIMEOUT) { +					_outQoSRecords.erase(it++); +				} else { it++; } +			} +		} +	} + +	/**  	 * @return True if this path is alive (receiving heartbeats)  	 */  	inline bool alive(const int64_t now) const { return ((now - _lastIn) < (ZT_PATH_HEARTBEAT_PERIOD + 5000)); } @@ -265,15 +647,71 @@ public:  	 */  	inline int64_t lastTrustEstablishedPacketReceived() const { return _lastTrustEstablishedPacketReceived; } +	/** +	 * Initialize statistical buffers +	 */ +	inline void prepareBuffers() { +		_throughputSamples = new RingBuffer<uint64_t>(ZT_PATH_QUALITY_METRIC_WIN_SZ); +		_latencySamples = new RingBuffer<uint32_t>(ZT_PATH_QUALITY_METRIC_WIN_SZ); +		_packetValiditySamples = new RingBuffer<bool>(ZT_PATH_QUALITY_METRIC_WIN_SZ); +		_throughputDisturbanceSamples = new RingBuffer<float>(ZT_PATH_QUALITY_METRIC_WIN_SZ); +		memset(_ifname, 0, 16); +		memset(_addrString, 0, sizeof(_addrString)); +	} +  private: +	Mutex _statistics_m; +  	volatile int64_t _lastOut;  	volatile int64_t _lastIn;  	volatile int64_t _lastTrustEstablishedPacketReceived; +	volatile int64_t _lastPathQualityComputeTime;  	int64_t _localSocket;  	volatile unsigned int _latency;  	InetAddress _addr;  	InetAddress::IpScope _ipScope; // memoize this since it's a computed value checked often  	AtomicCounter __refCount; + +	std::map<uint64_t, uint64_t> _outQoSRecords; // id:egress_time +	std::map<uint64_t, uint64_t> _inQoSRecords; // id:now +	std::map<uint64_t, uint16_t> _inACKRecords; // id:len + +	int64_t _lastAck; +	int64_t _lastThroughputEstimation; +	int64_t _lastQoSMeasurement; +	int64_t _lastQoSRecordPurge; + +	int64_t _unackedBytes; +	int64_t _expectingAckAsOf; +	int16_t _packetsReceivedSinceLastAck; +	int16_t _packetsReceivedSinceLastQoS; + +	uint64_t _maxLifetimeThroughput; +	uint64_t _lastComputedMeanThroughput; +	uint64_t _bytesAckedSinceLastThroughputEstimation; + +	float _lastComputedMeanLatency; +	float _lastComputedPacketDelayVariance; + +	float _lastComputedPacketErrorRatio; +	float _lastComputedPacketLossRatio; + +	// cached estimates +	float _lastComputedStability; +	float _lastComputedRelativeQuality; +	float _lastComputedThroughputDistCoeff; +	unsigned char _lastAllocation; + + + +	// cached human-readable strings for tracing purposes +	char _ifname[16]; +	char _addrString[256]; + +	RingBuffer<uint64_t> *_throughputSamples; +	RingBuffer<uint32_t> *_latencySamples; +	RingBuffer<bool> *_packetValiditySamples; +	RingBuffer<float> *_throughputDisturbanceSamples;  };  } // namespace ZeroTier | 
