From e3cf7567856267bc4f4af0f1fb857cab105602e8 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Tue, 8 Aug 2017 13:21:10 -0700 Subject: Make rxQueue lock-free using an atomic counter ring buffer. --- node/Switch.cpp | 36 +++++++++++------------------------- 1 file changed, 11 insertions(+), 25 deletions(-) (limited to 'node/Switch.cpp') diff --git a/node/Switch.cpp b/node/Switch.cpp index 9c9daac9..c509ef16 100644 --- a/node/Switch.cpp +++ b/node/Switch.cpp @@ -120,10 +120,8 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre // Total fragments must be more than 1, otherwise why are we // seeing a Packet::Fragment? - Mutex::Lock _l(_rxQueue_m); - RXQueueEntry *const rq = _findRXQueueEntry(now,fragmentPacketId); - - if ((!rq->timestamp)||(rq->packetId != fragmentPacketId)) { + RXQueueEntry *const rq = _findRXQueueEntry(fragmentPacketId); + if (rq->packetId != fragmentPacketId) { // No packet found, so we received a fragment without its head. rq->timestamp = now; @@ -250,10 +248,8 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre ((uint64_t)reinterpret_cast(data)[7]) ); - Mutex::Lock _l(_rxQueue_m); - RXQueueEntry *const rq = _findRXQueueEntry(now,packetId); - - if ((!rq->timestamp)||(rq->packetId != packetId)) { + RXQueueEntry *const rq = _findRXQueueEntry(packetId); + if (rq->packetId != packetId) { // If we have no other fragments yet, create an entry and save the head rq->timestamp = now; @@ -286,14 +282,7 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre // Packet is unfragmented, so just process it IncomingPacket packet(data,len,path,now); if (!packet.tryDecode(RR,tPtr)) { - Mutex::Lock _l(_rxQueue_m); - RXQueueEntry *rq = &(_rxQueue[ZT_RX_QUEUE_SIZE - 1]); - unsigned long i = ZT_RX_QUEUE_SIZE - 1; - while ((i)&&(rq->timestamp)) { - RXQueueEntry *tmp = &(_rxQueue[--i]); - if (tmp->timestamp < rq->timestamp) - rq = tmp; - } + RXQueueEntry *const rq = _nextRXQueueEntry(); rq->timestamp = now; rq->packetId = packet.packetId(); rq->frag0 = packet; @@ -590,15 +579,12 @@ void Switch::doAnythingWaitingForPeer(void *tPtr,const SharedPtr &peer) _outstandingWhoisRequests.erase(peer->address()); } - { // finish processing any packets waiting on peer's public key / identity - Mutex::Lock _l(_rxQueue_m); - unsigned long i = ZT_RX_QUEUE_SIZE; - while (i) { - RXQueueEntry *rq = &(_rxQueue[--i]); - if ((rq->timestamp)&&(rq->complete)) { - if (rq->frag0.tryDecode(RR,tPtr)) - rq->timestamp = 0; - } + // finish processing any packets waiting on peer's public key / identity + for(unsigned int ptr=0;ptrtimestamp)&&(rq->complete)) { + if (rq->frag0.tryDecode(RR,tPtr)) + rq->timestamp = 0; } } -- cgit v1.2.3