diff options
| author | Adam Ierymenko <adam.ierymenko@gmail.com> | 2017-08-08 13:21:10 -0700 |
|---|---|---|
| committer | Adam Ierymenko <adam.ierymenko@gmail.com> | 2017-08-08 13:21:10 -0700 |
| commit | e3cf7567856267bc4f4af0f1fb857cab105602e8 (patch) | |
| tree | 9282025e23ffb80c69c185ecc1d3389aa40d4d4f /node/Switch.cpp | |
| parent | 7e6598e9ca28da7047176907f5cacbc53ab60afe (diff) | |
| download | infinitytier-e3cf7567856267bc4f4af0f1fb857cab105602e8.tar.gz infinitytier-e3cf7567856267bc4f4af0f1fb857cab105602e8.zip | |
Make rxQueue lock-free using an atomic counter ring buffer.
Diffstat (limited to 'node/Switch.cpp')
| -rw-r--r-- | node/Switch.cpp | 36 |
1 files changed, 11 insertions, 25 deletions
diff --git a/node/Switch.cpp b/node/Switch.cpp index 9c9daac9..c509ef16 100644 --- a/node/Switch.cpp +++ b/node/Switch.cpp @@ -120,10 +120,8 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre // Total fragments must be more than 1, otherwise why are we // seeing a Packet::Fragment? - Mutex::Lock _l(_rxQueue_m); - RXQueueEntry *const rq = _findRXQueueEntry(now,fragmentPacketId); - - if ((!rq->timestamp)||(rq->packetId != fragmentPacketId)) { + RXQueueEntry *const rq = _findRXQueueEntry(fragmentPacketId); + if (rq->packetId != fragmentPacketId) { // No packet found, so we received a fragment without its head. rq->timestamp = now; @@ -250,10 +248,8 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre ((uint64_t)reinterpret_cast<const uint8_t *>(data)[7]) ); - Mutex::Lock _l(_rxQueue_m); - RXQueueEntry *const rq = _findRXQueueEntry(now,packetId); - - if ((!rq->timestamp)||(rq->packetId != packetId)) { + RXQueueEntry *const rq = _findRXQueueEntry(packetId); + if (rq->packetId != packetId) { // If we have no other fragments yet, create an entry and save the head rq->timestamp = now; @@ -286,14 +282,7 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre // Packet is unfragmented, so just process it IncomingPacket packet(data,len,path,now); if (!packet.tryDecode(RR,tPtr)) { - Mutex::Lock _l(_rxQueue_m); - RXQueueEntry *rq = &(_rxQueue[ZT_RX_QUEUE_SIZE - 1]); - unsigned long i = ZT_RX_QUEUE_SIZE - 1; - while ((i)&&(rq->timestamp)) { - RXQueueEntry *tmp = &(_rxQueue[--i]); - if (tmp->timestamp < rq->timestamp) - rq = tmp; - } + RXQueueEntry *const rq = _nextRXQueueEntry(); rq->timestamp = now; rq->packetId = packet.packetId(); rq->frag0 = packet; @@ -590,15 +579,12 @@ void Switch::doAnythingWaitingForPeer(void *tPtr,const SharedPtr<Peer> &peer) _outstandingWhoisRequests.erase(peer->address()); } - { // finish processing any packets waiting on peer's public key / identity - Mutex::Lock _l(_rxQueue_m); - unsigned long i = ZT_RX_QUEUE_SIZE; - while (i) { - RXQueueEntry *rq = &(_rxQueue[--i]); - if ((rq->timestamp)&&(rq->complete)) { - if (rq->frag0.tryDecode(RR,tPtr)) - rq->timestamp = 0; - } + // finish processing any packets waiting on peer's public key / identity + for(unsigned int ptr=0;ptr<ZT_RX_QUEUE_SIZE;++ptr) { + RXQueueEntry *const rq = &(_rxQueue[ptr]); + if ((rq->timestamp)&&(rq->complete)) { + if (rq->frag0.tryDecode(RR,tPtr)) + rq->timestamp = 0; } } |
