summaryrefslogtreecommitdiff
path: root/node/Switch.cpp
diff options
context:
space:
mode:
authorAdam Ierymenko <adam.ierymenko@gmail.com>2017-08-08 13:21:10 -0700
committerAdam Ierymenko <adam.ierymenko@gmail.com>2017-08-08 13:21:10 -0700
commite3cf7567856267bc4f4af0f1fb857cab105602e8 (patch)
tree9282025e23ffb80c69c185ecc1d3389aa40d4d4f /node/Switch.cpp
parent7e6598e9ca28da7047176907f5cacbc53ab60afe (diff)
downloadinfinitytier-e3cf7567856267bc4f4af0f1fb857cab105602e8.tar.gz
infinitytier-e3cf7567856267bc4f4af0f1fb857cab105602e8.zip
Make rxQueue lock-free using an atomic counter ring buffer.
Diffstat (limited to 'node/Switch.cpp')
-rw-r--r--node/Switch.cpp36
1 files changed, 11 insertions, 25 deletions
diff --git a/node/Switch.cpp b/node/Switch.cpp
index 9c9daac9..c509ef16 100644
--- a/node/Switch.cpp
+++ b/node/Switch.cpp
@@ -120,10 +120,8 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre
// Total fragments must be more than 1, otherwise why are we
// seeing a Packet::Fragment?
- Mutex::Lock _l(_rxQueue_m);
- RXQueueEntry *const rq = _findRXQueueEntry(now,fragmentPacketId);
-
- if ((!rq->timestamp)||(rq->packetId != fragmentPacketId)) {
+ RXQueueEntry *const rq = _findRXQueueEntry(fragmentPacketId);
+ if (rq->packetId != fragmentPacketId) {
// No packet found, so we received a fragment without its head.
rq->timestamp = now;
@@ -250,10 +248,8 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre
((uint64_t)reinterpret_cast<const uint8_t *>(data)[7])
);
- Mutex::Lock _l(_rxQueue_m);
- RXQueueEntry *const rq = _findRXQueueEntry(now,packetId);
-
- if ((!rq->timestamp)||(rq->packetId != packetId)) {
+ RXQueueEntry *const rq = _findRXQueueEntry(packetId);
+ if (rq->packetId != packetId) {
// If we have no other fragments yet, create an entry and save the head
rq->timestamp = now;
@@ -286,14 +282,7 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre
// Packet is unfragmented, so just process it
IncomingPacket packet(data,len,path,now);
if (!packet.tryDecode(RR,tPtr)) {
- Mutex::Lock _l(_rxQueue_m);
- RXQueueEntry *rq = &(_rxQueue[ZT_RX_QUEUE_SIZE - 1]);
- unsigned long i = ZT_RX_QUEUE_SIZE - 1;
- while ((i)&&(rq->timestamp)) {
- RXQueueEntry *tmp = &(_rxQueue[--i]);
- if (tmp->timestamp < rq->timestamp)
- rq = tmp;
- }
+ RXQueueEntry *const rq = _nextRXQueueEntry();
rq->timestamp = now;
rq->packetId = packet.packetId();
rq->frag0 = packet;
@@ -590,15 +579,12 @@ void Switch::doAnythingWaitingForPeer(void *tPtr,const SharedPtr<Peer> &peer)
_outstandingWhoisRequests.erase(peer->address());
}
- { // finish processing any packets waiting on peer's public key / identity
- Mutex::Lock _l(_rxQueue_m);
- unsigned long i = ZT_RX_QUEUE_SIZE;
- while (i) {
- RXQueueEntry *rq = &(_rxQueue[--i]);
- if ((rq->timestamp)&&(rq->complete)) {
- if (rq->frag0.tryDecode(RR,tPtr))
- rq->timestamp = 0;
- }
+ // finish processing any packets waiting on peer's public key / identity
+ for(unsigned int ptr=0;ptr<ZT_RX_QUEUE_SIZE;++ptr) {
+ RXQueueEntry *const rq = &(_rxQueue[ptr]);
+ if ((rq->timestamp)&&(rq->complete)) {
+ if (rq->frag0.tryDecode(RR,tPtr))
+ rq->timestamp = 0;
}
}