From 6a2ba4baca326272c45930208b70cfedf8cb1638 Mon Sep 17 00:00:00 2001 From: Joseph Henry Date: Tue, 1 May 2018 16:32:15 -0700 Subject: Introduced basic multipath support --- node/Path.hpp | 265 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 260 insertions(+), 5 deletions(-) (limited to 'node/Path.hpp') diff --git a/node/Path.hpp b/node/Path.hpp index e12328ff..5751d326 100644 --- a/node/Path.hpp +++ b/node/Path.hpp @@ -39,6 +39,9 @@ #include "SharedPtr.hpp" #include "AtomicCounter.hpp" #include "Utils.hpp" +#include "RingBuffer.hpp" + +#include "../osdep/Phy.hpp" /** * Maximum return value of preferenceRank() @@ -55,6 +58,7 @@ class RuntimeEnvironment; class Path { friend class SharedPtr; + Phy *_phy; public: /** @@ -93,22 +97,71 @@ public: _lastOut(0), _lastIn(0), _lastTrustEstablishedPacketReceived(0), + _lastPathQualityComputeTime(0), _localSocket(-1), _latency(0xffff), _addr(), - _ipScope(InetAddress::IP_SCOPE_NONE) + _ipScope(InetAddress::IP_SCOPE_NONE), + _currentPacketSampleCounter(0), + _meanPacketErrorRatio(0.0), + _meanLatency(0.0), + _lastLatencyUpdate(0), + _jitter(0.0), + _lastPathQualitySampleTime(0), + _lastComputedQuality(0.0), + _lastPathQualityEstimate(0), + _meanAge(0.0), + _meanThroughput(0.0), + _packetLossRatio(0) { + memset(_ifname, 0, sizeof(_ifname)); + memset(_addrString, 0, sizeof(_addrString)); + _throughputSamples = new RingBuffer(ZT_PATH_QUALITY_METRIC_WIN_SZ); + _ageSamples = new RingBuffer(ZT_PATH_QUALITY_METRIC_WIN_SZ); + _latencySamples = new RingBuffer(ZT_PATH_QUALITY_METRIC_WIN_SZ); + _errSamples = new RingBuffer(ZT_PATH_QUALITY_METRIC_WIN_SZ); } Path(const int64_t localSocket,const InetAddress &addr) : _lastOut(0), _lastIn(0), _lastTrustEstablishedPacketReceived(0), + _lastPathQualityComputeTime(0), _localSocket(localSocket), _latency(0xffff), _addr(addr), - _ipScope(addr.ipScope()) + _ipScope(addr.ipScope()), + _currentPacketSampleCounter(0), + _meanPacketErrorRatio(0.0), + _meanLatency(0.0), + _lastLatencyUpdate(0), + _jitter(0.0), + _lastPathQualitySampleTime(0), + _lastComputedQuality(0.0), + _lastPathQualityEstimate(0), + _meanAge(0.0), + _meanThroughput(0.0), + _packetLossRatio(0) + { + memset(_ifname, 0, sizeof(_ifname)); + memset(_addrString, 0, sizeof(_addrString)); + _throughputSamples = new RingBuffer(ZT_PATH_QUALITY_METRIC_WIN_SZ); + _ageSamples = new RingBuffer(ZT_PATH_QUALITY_METRIC_WIN_SZ); + _latencySamples = new RingBuffer(ZT_PATH_QUALITY_METRIC_WIN_SZ); + _errSamples = new RingBuffer(ZT_PATH_QUALITY_METRIC_WIN_SZ); + } + + ~Path() { + delete _throughputSamples; + delete _ageSamples; + delete _latencySamples; + delete _errSamples; + + _throughputSamples = NULL; + _ageSamples = NULL; + _latencySamples = NULL; + _errSamples = NULL; } /** @@ -147,12 +200,17 @@ public: * * @param l Measured latency */ - inline void updateLatency(const unsigned int l) + inline void updateLatency(const unsigned int l, int64_t now) { unsigned int pl = _latency; - if (pl < 0xffff) + if (pl < 0xffff) { _latency = (pl + l) / 2; - else _latency = l; + } + else { + _latency = l; + } + _lastLatencyUpdate = now; + _latencySamples->push(l); } /** @@ -240,11 +298,180 @@ public: return (((age < (ZT_PATH_HEARTBEAT_PERIOD + 5000)) ? l : (l + 0xffff + age)) * (long)((ZT_INETADDRESS_MAX_SCOPE - _ipScope) + 1)); } + /** + * @return An estimate of path quality -- higher is better. + */ + inline float computeQuality(const int64_t now) + { + float latency_contrib = _meanLatency ? 1.0 / _meanLatency : 0; + float jitter_contrib = _jitter ? 1.0 / _jitter : 0; + float throughput_contrib = _meanThroughput ? _meanThroughput / 1000000 : 0; // in Mbps + float age_contrib = _meanAge > 0 ? (float)sqrt(_meanAge) : 1; + float error_contrib = 1.0 - _meanPacketErrorRatio; + float sum = (latency_contrib + jitter_contrib + throughput_contrib + error_contrib) / age_contrib; + _lastComputedQuality = sum * (long)((_ipScope) + 1); + return _lastComputedQuality; + } + + /** + * Since quality estimates can become expensive we should cache the most recent result for traffic allocation + * algorithms which may need to reference this value multiple times through the course of their execution. + */ + inline float lastComputedQuality() { + return _lastComputedQuality; + } + + /** + * @return A pointer to a cached copy of the human-readable name of the interface this Path's localSocket is bound to + */ + inline char *getName() { return _ifname; } + + /** + * @return Estimated throughput in bps of this link + */ + inline uint64_t getThroughput() { return _phy->getThroughput((PhySocket *)((uintptr_t)_localSocket)); } + + /** + * @return Packet delay varience + */ + inline float jitter() { return _jitter; } + + /** + * @return Previously-computed mean latency + */ + inline float meanLatency() { return _meanLatency; } + + /** + * @return Packet loss rate + */ + inline float packetLossRatio() { return _packetLossRatio; } + + /** + * @return Mean packet error ratio + */ + inline float meanPacketErrorRatio() { return _meanPacketErrorRatio; } + + /** + * @return Current packet error ratio (possibly incomplete sample set) + */ + inline float currentPacketErrorRatio() { + int errorsPerSample = 0; + for (int i=0; i<_currentPacketSampleCounter; i++) { + if (_packetValidity[i] == false) { + errorsPerSample++; + } + } + return (float)errorsPerSample / (float)ZT_PATH_ERROR_SAMPLE_WIN_SZ; + } + + /** + * @return Whether the Path's local socket is in a CLOSED state + */ + inline bool isClosed() { return _phy->isClosed((PhySocket *)((uintptr_t)_localSocket)); } + + /** + * @return The state of a Path's local socket + */ + inline int getState() { return _phy->getState((PhySocket *)((uintptr_t)_localSocket)); } + + /** + * @return Whether this socket may have been erased by the virtual physical link layer + */ + inline bool isValidState() { return _phy->isValidState((PhySocket *)((uintptr_t)_localSocket)); } + + /** + * @return Whether the path quality monitors have collected enough data to provide a quality value + * TODO: expand this + */ + inline bool monitorsReady() { + return _latencySamples->count() && _ageSamples->count() && _throughputSamples->count(); + } + + /** + * @return A pointer to a cached copy of the address string for this Path (For debugging only) + */ + inline char *getAddressString() { return _addrString; } + + /** + * Handle path sampling, computation of quality estimates, and other periodic tasks + * @param now Current time + */ + inline void measureLink(int64_t now) { + // Sample path properties and store them in a continuously-revolving buffer + if (now - _lastPathQualitySampleTime > ZT_PATH_QUALITY_SAMPLE_INTERVAL) { + _lastPathQualitySampleTime = now; + _throughputSamples->push(getThroughput()); // Thoughtput in bits/s + _ageSamples->push(now - _lastIn); // Age (time since last received packet) + if (now - _lastLatencyUpdate > ZT_PATH_LATENCY_SAMPLE_INTERVAL) { + _lastLatencyUpdate = now; + // Record 0 bp/s. Since we're using this to detect possible packet loss + updateLatency(0, now); + } + } + // Compute statistical values for use in link quality estimates + if (now - _lastPathQualityComputeTime > ZT_PATH_QUALITY_COMPUTE_INTERVAL) { + _lastPathQualityComputeTime = now; + // Cache Path address string + address().toString(_addrString); + _phy->getIfName((PhySocket *)((uintptr_t)_localSocket), _ifname, ZT_PATH_INTERFACE_NAME_SZ); // Cache Interface name + // Derived values + if (_throughputSamples->count()) { + _packetLossRatio = (float)_throughputSamples->zeroCount() / (float)_throughputSamples->count(); + } + _meanThroughput = _throughputSamples->mean(); + _meanAge = _ageSamples->mean(); + _meanLatency = _latencySamples->mean(); + // Jitter + // SEE: RFC 3393, RFC 4689 + _jitter = _latencySamples->stddev(); + _meanPacketErrorRatio = _errSamples->mean(); // Packet Error Ratio (PER) + } + // Periodically compute a path quality estimate + if (now - _lastPathQualityEstimate > ZT_PATH_QUALITY_ESTIMATE_INTERVAL) { + computeQuality(now); + } + } + + /** + * Record whether a packet is considered invalid by MAC/compression/cipher checks. This + * could be an indication of a bit error. This function will keep a running counter of + * up to a given window size and with each counter overflow it will compute a mean error rate + * and store that in a continuously shifting sample window. + * + * @param isValid Whether the packet in question is considered invalid + */ + inline void recordPacket(bool isValid) { + if (_currentPacketSampleCounter < ZT_PATH_ERROR_SAMPLE_WIN_SZ) { + _packetValidity[_currentPacketSampleCounter] = isValid; + _currentPacketSampleCounter++; + } + else { + // Sample array is full, compute an mean and stick it in the ring buffer for trend analysis + _errSamples->push(currentPacketErrorRatio()); + _currentPacketSampleCounter=0; + } + } + + /** + * @return The mean age (in ms) of this link + */ + inline float meanAge() { return _meanAge; } + + /** + * @return The mean throughput (in bits/s) of this link + */ + inline float meanThroughput() { return _meanThroughput; } + /** * @return True if this path is alive (receiving heartbeats) */ inline bool alive(const int64_t now) const { return ((now - _lastIn) < (ZT_PATH_HEARTBEAT_PERIOD + 5000)); } + /** + * @return True if this path hasn't received a packet in a "significant" amount of time + */ + inline bool stale(const int64_t now) const { return ((now - _lastIn) > ZT_LINK_SPEED_TEST_INTERVAL * 10); } + /** * @return True if this path needs a heartbeat */ @@ -269,11 +496,39 @@ private: volatile int64_t _lastOut; volatile int64_t _lastIn; volatile int64_t _lastTrustEstablishedPacketReceived; + volatile int64_t _lastPathQualityComputeTime; int64_t _localSocket; volatile unsigned int _latency; InetAddress _addr; InetAddress::IpScope _ipScope; // memoize this since it's a computed value checked often AtomicCounter __refCount; + + // Packet Error Ratio (PER) + int _packetValidity[ZT_PATH_ERROR_SAMPLE_WIN_SZ]; + int _currentPacketSampleCounter; + volatile float _meanPacketErrorRatio; + + // Latency and Jitter + volatile float _meanLatency; + int64_t _lastLatencyUpdate; + volatile float _jitter; + + int64_t _lastPathQualitySampleTime; + float _lastComputedQuality; + int64_t _lastPathQualityEstimate; + float _meanAge; + float _meanThroughput; + + // Circular buffers used to efficiently store large time series + RingBuffer *_throughputSamples; + RingBuffer *_latencySamples; + RingBuffer *_ageSamples; + RingBuffer *_errSamples; + + float _packetLossRatio; + + char _ifname[ZT_PATH_INTERFACE_NAME_SZ]; + char _addrString[256]; }; } // namespace ZeroTier -- cgit v1.2.3