diff options
Diffstat (limited to 'node/Path.hpp')
| -rw-r--r-- | node/Path.hpp | 458 |
1 files changed, 279 insertions, 179 deletions
diff --git a/node/Path.hpp b/node/Path.hpp index e6bcecf0..0278d919 100644 --- a/node/Path.hpp +++ b/node/Path.hpp @@ -102,24 +102,23 @@ public: _latency(0xffff), _addr(), _ipScope(InetAddress::IP_SCOPE_NONE), - _currentPacketSampleCounter(0), - _meanPacketErrorRatio(0.0), - _meanLatency(0.0), - _lastLatencyUpdate(0), - _jitter(0.0), - _lastPathQualitySampleTime(0), - _lastComputedQuality(0.0), - _lastPathQualityEstimate(0), - _meanAge(0.0), + _lastAck(0), + _lastThroughputEstimation(0), + _lastQoSMeasurement(0), + _unackedBytes(0), + _expectingAckAsOf(0), + _packetsReceivedSinceLastAck(0), _meanThroughput(0.0), - _packetLossRatio(0) + _maxLifetimeThroughput(0), + _bytesAckedSinceLastThroughputEstimation(0), + _meanLatency(0.0), + _packetDelayVariance(0.0), + _packetErrorRatio(0.0), + _packetLossRatio(0), + _lastComputedStability(0.0), + _lastComputedRelativeQuality(0) { - memset(_ifname, 0, sizeof(_ifname)); - memset(_addrString, 0, sizeof(_addrString)); - _throughputSamples = new RingBuffer<uint64_t>(ZT_PATH_QUALITY_METRIC_WIN_SZ); - _ageSamples = new RingBuffer<uint64_t>(ZT_PATH_QUALITY_METRIC_WIN_SZ); - _latencySamples = new RingBuffer<uint32_t>(ZT_PATH_QUALITY_METRIC_WIN_SZ); - _errSamples = new RingBuffer<float>(ZT_PATH_QUALITY_METRIC_WIN_SZ); + prepareBuffers(); } Path(const int64_t localSocket,const InetAddress &addr) : @@ -131,37 +130,35 @@ public: _latency(0xffff), _addr(addr), _ipScope(addr.ipScope()), - _currentPacketSampleCounter(0), - _meanPacketErrorRatio(0.0), - _meanLatency(0.0), - _lastLatencyUpdate(0), - _jitter(0.0), - _lastPathQualitySampleTime(0), - _lastComputedQuality(0.0), - _lastPathQualityEstimate(0), - _meanAge(0.0), + _lastAck(0), + _lastThroughputEstimation(0), + _lastQoSMeasurement(0), + _unackedBytes(0), + _expectingAckAsOf(0), + _packetsReceivedSinceLastAck(0), _meanThroughput(0.0), - _packetLossRatio(0) + _maxLifetimeThroughput(0), + _bytesAckedSinceLastThroughputEstimation(0), + _meanLatency(0.0), + _packetDelayVariance(0.0), + _packetErrorRatio(0.0), + _packetLossRatio(0), + _lastComputedStability(0.0), + _lastComputedRelativeQuality(0) { - memset(_ifname, 0, sizeof(_ifname)); - memset(_addrString, 0, sizeof(_addrString)); - _throughputSamples = new RingBuffer<uint64_t>(ZT_PATH_QUALITY_METRIC_WIN_SZ); - _ageSamples = new RingBuffer<uint64_t>(ZT_PATH_QUALITY_METRIC_WIN_SZ); - _latencySamples = new RingBuffer<uint32_t>(ZT_PATH_QUALITY_METRIC_WIN_SZ); - _errSamples = new RingBuffer<float>(ZT_PATH_QUALITY_METRIC_WIN_SZ); + prepareBuffers(); } ~Path() { delete _throughputSamples; - delete _ageSamples; delete _latencySamples; - delete _errSamples; - + delete _qualitySamples; + delete _packetValiditySamples; _throughputSamples = NULL; - _ageSamples = NULL; _latencySamples = NULL; - _errSamples = NULL; + _qualitySamples = NULL; + _packetValiditySamples = NULL; } /** @@ -209,7 +206,6 @@ public: else { _latency = l; } - _lastLatencyUpdate = now; _latencySamples->push(l); } @@ -299,194 +295,273 @@ public: } /** - * @return An estimate of path quality -- higher is better. + * Take note that we're expecting a VERB_ACK on this path as of a specific time + * + * @param now Current time + * @param packetId ID of the packet + * @param payloadLength Number of bytes we're is expecting a reply to */ - inline float computeQuality(const int64_t now) + inline void expectingAck(int64_t now, int64_t packetId, uint16_t payloadLength) { - float latency_contrib = _meanLatency ? (float)1.0 / _meanLatency : 0; - float jitter_contrib = _jitter ? (float)1.0 / _jitter : 0; - float throughput_contrib = _meanThroughput ? _meanThroughput / 1000000 : 0; // in Mbps - float age_contrib = _meanAge > 0 ? (float)sqrt(_meanAge) : 1; - float error_contrib = (float)1.0 - _meanPacketErrorRatio; - float sum = (latency_contrib + jitter_contrib + throughput_contrib + error_contrib) / age_contrib; - _lastComputedQuality = sum * (long)((_ipScope) + 1); - return _lastComputedQuality; + _expectingAckAsOf = ackAge(now) > ZT_PATH_ACK_INTERVAL ? _expectingAckAsOf : now; + _unackedBytes += payloadLength; + _outgoingPacketRecords[packetId] = now; } /** - * Since quality estimates can become expensive we should cache the most recent result for traffic allocation - * algorithms which may need to reference this value multiple times through the course of their execution. + * Record that we've received a VERB_ACK on this path, also compute throughput if required. + * + * @param now Current time + * @param ackedBytes Number of bytes awknowledged by other peer */ - inline float lastComputedQuality() { - return _lastComputedQuality; + inline void receivedAck(int64_t now, int32_t ackedBytes) + { + _expectingAckAsOf = 0; + _unackedBytes = (ackedBytes > _unackedBytes) ? 0 : _unackedBytes - ackedBytes; + int64_t timeSinceThroughputEstimate = (now - _lastThroughputEstimation); + if (timeSinceThroughputEstimate >= ZT_PATH_THROUGHPUT_MEASUREMENT_INTERVAL) { + uint64_t throughput = (float)(_bytesAckedSinceLastThroughputEstimation) / ((float)timeSinceThroughputEstimate / (float)1000); + _throughputSamples->push(throughput); + _maxLifetimeThroughput = throughput > _maxLifetimeThroughput ? throughput : _maxLifetimeThroughput; + _lastThroughputEstimation = now; + _bytesAckedSinceLastThroughputEstimation = 0; + } else { + _bytesAckedSinceLastThroughputEstimation += ackedBytes; + } } /** - * @return A pointer to a cached copy of the human-readable name of the interface this Path's localSocket is bound to + * @return Number of bytes this peer is responsible for ACKing since last ACK */ - inline char *getName() { return _ifname; } + inline int32_t bytesToAck() + { + int32_t bytesToAck = 0; + for (int i=0; i<_packetsReceivedSinceLastAck; i++) { + bytesToAck += _recorded_len[i]; + } + return bytesToAck; + } /** - * @return Estimated throughput in bps of this link + * @return Number of bytes thusfar sent that have not been awknowledged by the remote peer */ - inline uint64_t getThroughput() { return _phy->getThroughput((PhySocket *)((uintptr_t)_localSocket)); } + inline int64_t unackedSentBytes() + { + return _unackedBytes; + } /** - * @return Packet delay varience + * Account for the fact that an ACK was just sent. Reset counters, timers, and clear statistics buffers + * + * @param Current time */ - inline float jitter() { return _jitter; } + inline void sentAck(int64_t now) + { + memset(_recorded_id, 0, sizeof(_recorded_id)); + memset(_recorded_ts, 0, sizeof(_recorded_ts)); + memset(_recorded_len, 0, sizeof(_recorded_len)); + _packetsReceivedSinceLastAck = 0; + _lastAck = now; + } /** - * @return Previously-computed mean latency + * Receive QoS data, match with recorded egress times from this peer, compute latency + * estimates. + * + * @param now Current time + * @param count Number of records + * @param rx_id table of packet IDs + * @param rx_ts table of holding times */ - inline float meanLatency() { return _meanLatency; } + inline void receivedQoS(int64_t now, int count, uint64_t *rx_id, uint8_t *rx_ts) + { + // Look up egress times and compute latency values for each record + for (int j=0; j<count; j++) { + std::map<uint64_t,uint64_t>::iterator it = _outgoingPacketRecords.find(rx_id[j]); + if (it != _outgoingPacketRecords.end()) { + uint16_t rtt = (uint16_t)(now - it->second); + uint16_t rtt_compensated = rtt - rx_ts[j]; + float latency = rtt_compensated / 2.0; + updateLatency(latency, now); + _outgoingPacketRecords.erase(it); + } + } + } /** - * @return Packet loss rate + * Generate the contents of a VERB_QOS_MEASUREMENT packet. + * + * @param now Current time + * @param qosBuffer destination buffer + * @return Size of payload */ - inline float packetLossRatio() { return _packetLossRatio; } + inline int32_t generateQoSPacket(int64_t now, char *qosBuffer) + { + int32_t len = 0; + for (int i=0; i<_packetsReceivedSinceLastAck; i++) { + uint64_t id = _recorded_id[i]; + memcpy(qosBuffer, &id, sizeof(uint64_t)); + qosBuffer+=sizeof(uint64_t); + uint8_t holdingTime = (uint8_t)(now - _recorded_ts[i]); + memcpy(qosBuffer, &holdingTime, sizeof(uint8_t)); + qosBuffer+=sizeof(uint8_t); + len+=sizeof(uint64_t)+sizeof(uint8_t); + } + return len; + } /** - * @return Mean packet error ratio + * Account for the fact that a VERB_QOS_MEASUREMENT was just sent. Reset timers. + * + * @param Current time */ - inline float meanPacketErrorRatio() { return _meanPacketErrorRatio; } + inline void sentQoS(int64_t now) { _lastQoSMeasurement = now; } /** - * @return Current packet error ratio (possibly incomplete sample set) + * Record statistics on incoming packets. Used later to estimate QoS. + * + * @param now Current time + * @param packetId + * @param payloadLength */ - inline float currentPacketErrorRatio() { - int errorsPerSample = 0; - for (int i=0; i<_currentPacketSampleCounter; i++) { - if (_packetValidity[i] == false) { - errorsPerSample++; - } - } - return (float)errorsPerSample / (float)ZT_PATH_ERROR_SAMPLE_WIN_SZ; + inline void recordIncomingPacket(int64_t now, int64_t packetId, int32_t payloadLength) + { + _recorded_ts[_packetsReceivedSinceLastAck] = now; + _recorded_id[_packetsReceivedSinceLastAck] = packetId; + _recorded_len[_packetsReceivedSinceLastAck] = payloadLength; + _packetsReceivedSinceLastAck++; + _packetValiditySamples->push(true); } /** - * @return Whether the Path's local socket is in a CLOSED state + * @param now Current time + * @return Whether an ACK (VERB_ACK) packet needs to be emitted at this time */ - inline bool isClosed() { return _phy->isClosed((PhySocket *)((uintptr_t)_localSocket)); } + inline bool needsToSendAck(int64_t now) { + return ((now - _lastAck) >= ZT_PATH_ACK_INTERVAL || + (_packetsReceivedSinceLastAck == ZT_PATH_QOS_TABLE_SIZE)) && _packetsReceivedSinceLastAck; + } /** - * @return The state of a Path's local socket + * @param now Current time + * @return Whether a QoS (VERB_QOS_MEASUREMENT) packet needs to be emitted at this time */ - inline int getState() { return _phy->getState((PhySocket *)((uintptr_t)_localSocket)); } + inline bool needsToSendQoS(int64_t now) { + return ((_packetsReceivedSinceLastAck >= ZT_PATH_QOS_TABLE_SIZE) || + ((now - _lastQoSMeasurement) > ZT_PATH_QOS_INTERVAL)) && _packetsReceivedSinceLastAck; + } /** - * @return Whether this socket may have been erased by the virtual physical link layer + * How much time has elapsed since we've been expecting a VERB_ACK on this path. This value + * is used to determine a more relevant path "age". This lets us penalize paths which are no + * longer ACKing, but not those that simple aren't being used to carry traffic at the + * current time. */ - inline bool isValidState() { return _phy->isValidState((PhySocket *)((uintptr_t)_localSocket)); } + inline int64_t ackAge(int64_t now) { return _expectingAckAsOf ? now - _expectingAckAsOf : 0; } /** - * @return Whether the path quality monitors have collected enough data to provide a quality value - * TODO: expand this + * The maximum observed throughput for this path */ - inline bool monitorsReady() { - return _latencySamples->count() && _ageSamples->count() && _throughputSamples->count(); - } + inline uint64_t maxLifetimeThroughput() { return _maxLifetimeThroughput; } /** - * @return A pointer to a cached copy of the address string for this Path (For debugging only) + * @return The mean throughput (in bits/s) of this link */ - inline char *getAddressString() { return _addrString; } + inline float meanThroughput() { return _meanThroughput; } /** - * Handle path sampling, computation of quality estimates, and other periodic tasks - * @param now Current time + * Assign a new relative quality value for this path in the aggregate link + * + * @param rq Quality of this path in comparison to other paths available to this peer */ - inline void measureLink(int64_t now) { - // Sample path properties and store them in a continuously-revolving buffer - if (now - _lastPathQualitySampleTime > ZT_PATH_QUALITY_SAMPLE_INTERVAL) { - _lastPathQualitySampleTime = now; - _throughputSamples->push(getThroughput()); // Thoughtput in bits/s - _ageSamples->push(now - _lastIn); // Age (time since last received packet) - if (now - _lastLatencyUpdate > ZT_PATH_LATENCY_SAMPLE_INTERVAL) { - _lastLatencyUpdate = now; - // Record 0 bp/s. Since we're using this to detect possible packet loss - updateLatency(0, now); - } - } - // Compute statistical values for use in link quality estimates - if (now - _lastPathQualityComputeTime > ZT_PATH_QUALITY_COMPUTE_INTERVAL) { - _lastPathQualityComputeTime = now; - // Cache Path address string - address().toString(_addrString); - _phy->getIfName((PhySocket *)((uintptr_t)_localSocket), _ifname, ZT_PATH_INTERFACE_NAME_SZ); // Cache Interface name - // Derived values - if (_throughputSamples->count()) { - _packetLossRatio = (float)_throughputSamples->zeroCount() / (float)_throughputSamples->count(); - } - _meanThroughput = _throughputSamples->mean(); - _meanAge = _ageSamples->mean(); - _meanLatency = _latencySamples->mean(); - // Jitter - // SEE: RFC 3393, RFC 4689 - _jitter = _latencySamples->stddev(); - _meanPacketErrorRatio = _errSamples->mean(); // Packet Error Ratio (PER) - } - // Periodically compute a path quality estimate - if (now - _lastPathQualityEstimate > ZT_PATH_QUALITY_ESTIMATE_INTERVAL) { - computeQuality(now); - } - } + inline void updateRelativeQuality(float rq) { _lastComputedRelativeQuality = rq; } /** - * @param buf Buffer to store resultant string - * @return Description of path, in ASCII string format + * @return Quality of this path compared to others in the aggregate link */ - inline char *toString(char *buf) { - sprintf(buf,"%6s, q=%8.3f, %5.3f Mb/s, j=%8.2f, ml=%8.2f, meanAge=%8.2f, addr=%45s", - getName(), - lastComputedQuality(), - (float)meanThroughput() / (float)1000000, - jitter(), - meanLatency(), - meanAge(), - getAddressString()); - return buf; - } + inline float relativeQuality() { return _lastComputedRelativeQuality; } /** - * Record whether a packet is considered invalid by MAC/compression/cipher checks. This - * could be an indication of a bit error. This function will keep a running counter of - * up to a given window size and with each counter overflow it will compute a mean error rate - * and store that in a continuously shifting sample window. - * - * @param isValid Whether the packet in question is considered invalid + * @return Stability estimates can become expensive to compute, we cache the most recent result. */ - inline void recordPacket(bool isValid) { - if (_currentPacketSampleCounter < ZT_PATH_ERROR_SAMPLE_WIN_SZ) { - _packetValidity[_currentPacketSampleCounter] = isValid; - _currentPacketSampleCounter++; - } - else { - // Sample array is full, compute an mean and stick it in the ring buffer for trend analysis - _errSamples->push(currentPacketErrorRatio()); - _currentPacketSampleCounter=0; - } - } + inline float lastComputedStability() { return _lastComputedStability; } /** - * @return The mean age (in ms) of this link + * @return A pointer to a cached copy of the human-readable name of the interface this Path's localSocket is bound to */ - inline float meanAge() { return _meanAge; } + inline char *getName() { return _ifname; } /** - * @return The mean throughput (in bits/s) of this link + * @return Packet delay varience */ - inline float meanThroughput() { return _meanThroughput; } + inline float packetDelayVariance() { return _packetDelayVariance; } /** - * @return True if this path is alive (receiving heartbeats) + * @return Previously-computed mean latency */ - inline bool alive(const int64_t now) const { return ((now - _lastIn) < (ZT_PATH_HEARTBEAT_PERIOD + 5000)); } + inline float meanLatency() { return _meanLatency; } + + /** + * @return Packet loss rate (PLR) + */ + inline float packetLossRatio() { return _packetLossRatio; } + + /** + * @return Packet error ratio (PER) + */ + inline float packetErrorRatio() { return _packetErrorRatio; } + + /** + * Record an invalid incoming packet. This packet failed MAC/compression/cipher checks and will now + * contribute to a Packet Error Ratio (PER). + */ + inline void recordInvalidPacket() { _packetValiditySamples->push(false); } /** - * @return True if this path hasn't received a packet in a "significant" amount of time + * @return A pointer to a cached copy of the address string for this Path (For debugging only) */ - inline bool stale(const int64_t now) const { return ((now - _lastIn) > ZT_LINK_SPEED_TEST_INTERVAL * 10); } + inline char *getAddressString() { return _addrString; } + + /** + * Compute and cache stability and performance metrics. The resultant stability coefficint is a measure of how "well behaved" + * this path is. This figure is substantially different from (but required for the estimation of the path's overall "quality". + * + * @param now Current time + */ + inline void processBackgroundPathMeasurements(int64_t now, const int64_t peerId) { + if (now - _lastPathQualityComputeTime > ZT_PATH_QUALITY_COMPUTE_INTERVAL) { + _lastPathQualityComputeTime = now; + _phy->getIfName((PhySocket *)((uintptr_t)_localSocket), _ifname, 16); + address().toString(_addrString); + _meanThroughput = _throughputSamples->mean(); + _meanLatency = _latencySamples->mean(); + _packetDelayVariance = _latencySamples->stddev(); // Similar to "jitter" (SEE: RFC 3393, RFC 4689) + // If no packet validity samples, assume PER==0 + _packetErrorRatio = 1 - (_packetValiditySamples->count() ? _packetValiditySamples->mean() : 1); + // Compute path stability + // Normalize measurements with wildly different ranges into a reasonable range + float normalized_pdv = Utils::normalize(_packetDelayVariance, 0, ZT_PATH_MAX_PDV, 0, 10); + float normalized_la = Utils::normalize(_meanLatency, 0, ZT_PATH_MAX_MEAN_LATENCY, 0, 10); + float throughput_cv = _throughputSamples->mean() > 0 ? _throughputSamples->stddev() / _throughputSamples->mean() : 1; + // Form an exponential cutoff and apply contribution weights + float pdv_contrib = exp((-1)*normalized_pdv) * ZT_PATH_CONTRIB_PDV; + float latency_contrib = exp((-1)*normalized_la) * ZT_PATH_CONTRIB_LATENCY; + float throughput_disturbance_contrib = exp((-1)*throughput_cv) * ZT_PATH_CONTRIB_THROUGHPUT_DISTURBANCE; + // Obey user-defined ignored contributions + pdv_contrib = ZT_PATH_CONTRIB_PDV > 0.0 ? pdv_contrib : 1; + latency_contrib = ZT_PATH_CONTRIB_LATENCY > 0.0 ? latency_contrib : 1; + throughput_disturbance_contrib = ZT_PATH_CONTRIB_THROUGHPUT_DISTURBANCE > 0.0 ? throughput_disturbance_contrib : 1; + // Compute the quality product + _lastComputedStability = pdv_contrib + latency_contrib + throughput_disturbance_contrib; + _lastComputedStability *= 1 - _packetErrorRatio; + _qualitySamples->push(_lastComputedStability); + } + } + + /** + * @return True if this path is alive (receiving heartbeats) + */ + inline bool alive(const int64_t now) const { return ((now - _lastIn) < (ZT_PATH_HEARTBEAT_PERIOD + 5000)); } /** * @return True if this path needs a heartbeat @@ -508,6 +583,21 @@ public: */ inline int64_t lastTrustEstablishedPacketReceived() const { return _lastTrustEstablishedPacketReceived; } + /** + * Initialize statistical buffers + */ + inline void prepareBuffers() { + _throughputSamples = new RingBuffer<uint64_t>(ZT_PATH_QUALITY_METRIC_WIN_SZ); + _latencySamples = new RingBuffer<uint32_t>(ZT_PATH_QUALITY_METRIC_WIN_SZ); + _qualitySamples = new RingBuffer<float>(ZT_PATH_QUALITY_METRIC_WIN_SZ); + _packetValiditySamples = new RingBuffer<bool>(ZT_PATH_QUALITY_METRIC_WIN_SZ); + memset(_ifname, 0, 16); + memset(_recorded_id, 0, sizeof(_recorded_id)); + memset(_recorded_ts, 0, sizeof(_recorded_ts)); + memset(_recorded_len, 0, sizeof(_recorded_len)); + memset(_addrString, 0, sizeof(_addrString)); + } + private: volatile int64_t _lastOut; volatile int64_t _lastIn; @@ -519,32 +609,42 @@ private: InetAddress::IpScope _ipScope; // memoize this since it's a computed value checked often AtomicCounter __refCount; - // Packet Error Ratio (PER) - int _packetValidity[ZT_PATH_ERROR_SAMPLE_WIN_SZ]; - int _currentPacketSampleCounter; - volatile float _meanPacketErrorRatio; + uint64_t _recorded_id[ZT_PATH_QOS_TABLE_SIZE]; + uint64_t _recorded_ts[ZT_PATH_QOS_TABLE_SIZE]; + uint16_t _recorded_len[ZT_PATH_QOS_TABLE_SIZE]; - // Latency and Jitter - volatile float _meanLatency; - int64_t _lastLatencyUpdate; - volatile float _jitter; + std::map<uint64_t, uint64_t> _outgoingPacketRecords; + + int64_t _lastAck; + int64_t _lastThroughputEstimation; + int64_t _lastQoSMeasurement; + + int64_t _unackedBytes; + int64_t _expectingAckAsOf; + int16_t _packetsReceivedSinceLastAck; - int64_t _lastPathQualitySampleTime; - float _lastComputedQuality; - int64_t _lastPathQualityEstimate; - float _meanAge; float _meanThroughput; + uint64_t _maxLifetimeThroughput; + uint64_t _bytesAckedSinceLastThroughputEstimation; - // Circular buffers used to efficiently store large time series - RingBuffer<uint64_t> *_throughputSamples; - RingBuffer<uint32_t> *_latencySamples; - RingBuffer<uint64_t> *_ageSamples; - RingBuffer<float> *_errSamples; + volatile float _meanLatency; + float _packetDelayVariance; + float _packetErrorRatio; float _packetLossRatio; - char _ifname[ZT_PATH_INTERFACE_NAME_SZ]; + // cached estimates + float _lastComputedStability; + float _lastComputedRelativeQuality; + + // cached human-readable strings for tracing purposes + char _ifname[16]; char _addrString[256]; + + RingBuffer<uint64_t> *_throughputSamples; + RingBuffer<uint32_t> *_latencySamples; + RingBuffer<float> *_qualitySamples; + RingBuffer<bool> *_packetValiditySamples; }; } // namespace ZeroTier |
