diff options
author | Adam Ierymenko <adam.ierymenko@gmail.com> | 2017-08-08 13:21:10 -0700 |
---|---|---|
committer | Adam Ierymenko <adam.ierymenko@gmail.com> | 2017-08-08 13:21:10 -0700 |
commit | e3cf7567856267bc4f4af0f1fb857cab105602e8 (patch) | |
tree | 9282025e23ffb80c69c185ecc1d3389aa40d4d4f /node | |
parent | 7e6598e9ca28da7047176907f5cacbc53ab60afe (diff) | |
download | infinitytier-e3cf7567856267bc4f4af0f1fb857cab105602e8.tar.gz infinitytier-e3cf7567856267bc4f4af0f1fb857cab105602e8.zip |
Make rxQueue lock-free using an atomic counter ring buffer.
Diffstat (limited to 'node')
-rw-r--r-- | node/AtomicCounter.hpp | 9 | ||||
-rw-r--r-- | node/Switch.cpp | 36 | ||||
-rw-r--r-- | node/Switch.hpp | 27 |
3 files changed, 33 insertions, 39 deletions
diff --git a/node/AtomicCounter.hpp b/node/AtomicCounter.hpp index e1864db8..abb342fe 100644 --- a/node/AtomicCounter.hpp +++ b/node/AtomicCounter.hpp @@ -47,6 +47,15 @@ public: _v = 0; } + inline int load() const + { +#ifdef __GNUC__ + return __sync_or_and_fetch(&_v,0); +#else + return _v.load(); +#endif + } + inline int operator++() { #ifdef __GNUC__ diff --git a/node/Switch.cpp b/node/Switch.cpp index 9c9daac9..c509ef16 100644 --- a/node/Switch.cpp +++ b/node/Switch.cpp @@ -120,10 +120,8 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre // Total fragments must be more than 1, otherwise why are we // seeing a Packet::Fragment? - Mutex::Lock _l(_rxQueue_m); - RXQueueEntry *const rq = _findRXQueueEntry(now,fragmentPacketId); - - if ((!rq->timestamp)||(rq->packetId != fragmentPacketId)) { + RXQueueEntry *const rq = _findRXQueueEntry(fragmentPacketId); + if (rq->packetId != fragmentPacketId) { // No packet found, so we received a fragment without its head. rq->timestamp = now; @@ -250,10 +248,8 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre ((uint64_t)reinterpret_cast<const uint8_t *>(data)[7]) ); - Mutex::Lock _l(_rxQueue_m); - RXQueueEntry *const rq = _findRXQueueEntry(now,packetId); - - if ((!rq->timestamp)||(rq->packetId != packetId)) { + RXQueueEntry *const rq = _findRXQueueEntry(packetId); + if (rq->packetId != packetId) { // If we have no other fragments yet, create an entry and save the head rq->timestamp = now; @@ -286,14 +282,7 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre // Packet is unfragmented, so just process it IncomingPacket packet(data,len,path,now); if (!packet.tryDecode(RR,tPtr)) { - Mutex::Lock _l(_rxQueue_m); - RXQueueEntry *rq = &(_rxQueue[ZT_RX_QUEUE_SIZE - 1]); - unsigned long i = ZT_RX_QUEUE_SIZE - 1; - while ((i)&&(rq->timestamp)) { - RXQueueEntry *tmp = &(_rxQueue[--i]); - if (tmp->timestamp < rq->timestamp) - rq = tmp; - } + RXQueueEntry *const rq = _nextRXQueueEntry(); rq->timestamp = now; rq->packetId = packet.packetId(); rq->frag0 = packet; @@ -590,15 +579,12 @@ void Switch::doAnythingWaitingForPeer(void *tPtr,const SharedPtr<Peer> &peer) _outstandingWhoisRequests.erase(peer->address()); } - { // finish processing any packets waiting on peer's public key / identity - Mutex::Lock _l(_rxQueue_m); - unsigned long i = ZT_RX_QUEUE_SIZE; - while (i) { - RXQueueEntry *rq = &(_rxQueue[--i]); - if ((rq->timestamp)&&(rq->complete)) { - if (rq->frag0.tryDecode(RR,tPtr)) - rq->timestamp = 0; - } + // finish processing any packets waiting on peer's public key / identity + for(unsigned int ptr=0;ptr<ZT_RX_QUEUE_SIZE;++ptr) { + RXQueueEntry *const rq = &(_rxQueue[ptr]); + if ((rq->timestamp)&&(rq->complete)) { + if (rq->frag0.tryDecode(RR,tPtr)) + rq->timestamp = 0; } } diff --git a/node/Switch.hpp b/node/Switch.hpp index 346aaca3..114bc5e1 100644 --- a/node/Switch.hpp +++ b/node/Switch.hpp @@ -169,25 +169,24 @@ private: bool complete; // if true, packet is complete }; RXQueueEntry _rxQueue[ZT_RX_QUEUE_SIZE]; - Mutex _rxQueue_m; + AtomicCounter _rxQueuePtr; - /* Returns the matching or oldest entry. Caller must check timestamp and - * packet ID to determine which. */ - inline RXQueueEntry *_findRXQueueEntry(uint64_t now,uint64_t packetId) + // Returns matching or next available RX queue entry + inline RXQueueEntry *_findRXQueueEntry(uint64_t packetId) { - RXQueueEntry *rq; - RXQueueEntry *oldest = &(_rxQueue[ZT_RX_QUEUE_SIZE - 1]); - unsigned long i = ZT_RX_QUEUE_SIZE; - while (i) { - rq = &(_rxQueue[--i]); + unsigned int ptr = static_cast<unsigned int>(_rxQueuePtr.load()); + for(unsigned int k=0;k<ZT_RX_QUEUE_SIZE;++k) { + RXQueueEntry *rq = &(_rxQueue[--ptr % ZT_RX_QUEUE_SIZE]); if ((rq->packetId == packetId)&&(rq->timestamp)) return rq; - if ((now - rq->timestamp) >= ZT_RX_QUEUE_EXPIRE) - rq->timestamp = 0; - if (rq->timestamp < oldest->timestamp) - oldest = rq; } - return oldest; + return &(_rxQueue[static_cast<unsigned int>(++_rxQueuePtr) % ZT_RX_QUEUE_SIZE]); + } + + // Returns next RX queue entry in ring buffer and increments ring counter + inline RXQueueEntry *_nextRXQueueEntry() + { + return &(_rxQueue[static_cast<unsigned int>(++_rxQueuePtr) % ZT_RX_QUEUE_SIZE]); } // ZeroTier-layer TX queue entry |