summaryrefslogtreecommitdiff
path: root/node
diff options
context:
space:
mode:
authorAdam Ierymenko <adam.ierymenko@gmail.com>2017-08-08 13:21:10 -0700
committerAdam Ierymenko <adam.ierymenko@gmail.com>2017-08-08 13:21:10 -0700
commite3cf7567856267bc4f4af0f1fb857cab105602e8 (patch)
tree9282025e23ffb80c69c185ecc1d3389aa40d4d4f /node
parent7e6598e9ca28da7047176907f5cacbc53ab60afe (diff)
downloadinfinitytier-e3cf7567856267bc4f4af0f1fb857cab105602e8.tar.gz
infinitytier-e3cf7567856267bc4f4af0f1fb857cab105602e8.zip
Make rxQueue lock-free using an atomic counter ring buffer.
Diffstat (limited to 'node')
-rw-r--r--node/AtomicCounter.hpp9
-rw-r--r--node/Switch.cpp36
-rw-r--r--node/Switch.hpp27
3 files changed, 33 insertions, 39 deletions
diff --git a/node/AtomicCounter.hpp b/node/AtomicCounter.hpp
index e1864db8..abb342fe 100644
--- a/node/AtomicCounter.hpp
+++ b/node/AtomicCounter.hpp
@@ -47,6 +47,15 @@ public:
_v = 0;
}
+ inline int load() const
+ {
+#ifdef __GNUC__
+ return __sync_or_and_fetch(&_v,0);
+#else
+ return _v.load();
+#endif
+ }
+
inline int operator++()
{
#ifdef __GNUC__
diff --git a/node/Switch.cpp b/node/Switch.cpp
index 9c9daac9..c509ef16 100644
--- a/node/Switch.cpp
+++ b/node/Switch.cpp
@@ -120,10 +120,8 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre
// Total fragments must be more than 1, otherwise why are we
// seeing a Packet::Fragment?
- Mutex::Lock _l(_rxQueue_m);
- RXQueueEntry *const rq = _findRXQueueEntry(now,fragmentPacketId);
-
- if ((!rq->timestamp)||(rq->packetId != fragmentPacketId)) {
+ RXQueueEntry *const rq = _findRXQueueEntry(fragmentPacketId);
+ if (rq->packetId != fragmentPacketId) {
// No packet found, so we received a fragment without its head.
rq->timestamp = now;
@@ -250,10 +248,8 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre
((uint64_t)reinterpret_cast<const uint8_t *>(data)[7])
);
- Mutex::Lock _l(_rxQueue_m);
- RXQueueEntry *const rq = _findRXQueueEntry(now,packetId);
-
- if ((!rq->timestamp)||(rq->packetId != packetId)) {
+ RXQueueEntry *const rq = _findRXQueueEntry(packetId);
+ if (rq->packetId != packetId) {
// If we have no other fragments yet, create an entry and save the head
rq->timestamp = now;
@@ -286,14 +282,7 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre
// Packet is unfragmented, so just process it
IncomingPacket packet(data,len,path,now);
if (!packet.tryDecode(RR,tPtr)) {
- Mutex::Lock _l(_rxQueue_m);
- RXQueueEntry *rq = &(_rxQueue[ZT_RX_QUEUE_SIZE - 1]);
- unsigned long i = ZT_RX_QUEUE_SIZE - 1;
- while ((i)&&(rq->timestamp)) {
- RXQueueEntry *tmp = &(_rxQueue[--i]);
- if (tmp->timestamp < rq->timestamp)
- rq = tmp;
- }
+ RXQueueEntry *const rq = _nextRXQueueEntry();
rq->timestamp = now;
rq->packetId = packet.packetId();
rq->frag0 = packet;
@@ -590,15 +579,12 @@ void Switch::doAnythingWaitingForPeer(void *tPtr,const SharedPtr<Peer> &peer)
_outstandingWhoisRequests.erase(peer->address());
}
- { // finish processing any packets waiting on peer's public key / identity
- Mutex::Lock _l(_rxQueue_m);
- unsigned long i = ZT_RX_QUEUE_SIZE;
- while (i) {
- RXQueueEntry *rq = &(_rxQueue[--i]);
- if ((rq->timestamp)&&(rq->complete)) {
- if (rq->frag0.tryDecode(RR,tPtr))
- rq->timestamp = 0;
- }
+ // finish processing any packets waiting on peer's public key / identity
+ for(unsigned int ptr=0;ptr<ZT_RX_QUEUE_SIZE;++ptr) {
+ RXQueueEntry *const rq = &(_rxQueue[ptr]);
+ if ((rq->timestamp)&&(rq->complete)) {
+ if (rq->frag0.tryDecode(RR,tPtr))
+ rq->timestamp = 0;
}
}
diff --git a/node/Switch.hpp b/node/Switch.hpp
index 346aaca3..114bc5e1 100644
--- a/node/Switch.hpp
+++ b/node/Switch.hpp
@@ -169,25 +169,24 @@ private:
bool complete; // if true, packet is complete
};
RXQueueEntry _rxQueue[ZT_RX_QUEUE_SIZE];
- Mutex _rxQueue_m;
+ AtomicCounter _rxQueuePtr;
- /* Returns the matching or oldest entry. Caller must check timestamp and
- * packet ID to determine which. */
- inline RXQueueEntry *_findRXQueueEntry(uint64_t now,uint64_t packetId)
+ // Returns matching or next available RX queue entry
+ inline RXQueueEntry *_findRXQueueEntry(uint64_t packetId)
{
- RXQueueEntry *rq;
- RXQueueEntry *oldest = &(_rxQueue[ZT_RX_QUEUE_SIZE - 1]);
- unsigned long i = ZT_RX_QUEUE_SIZE;
- while (i) {
- rq = &(_rxQueue[--i]);
+ unsigned int ptr = static_cast<unsigned int>(_rxQueuePtr.load());
+ for(unsigned int k=0;k<ZT_RX_QUEUE_SIZE;++k) {
+ RXQueueEntry *rq = &(_rxQueue[--ptr % ZT_RX_QUEUE_SIZE]);
if ((rq->packetId == packetId)&&(rq->timestamp))
return rq;
- if ((now - rq->timestamp) >= ZT_RX_QUEUE_EXPIRE)
- rq->timestamp = 0;
- if (rq->timestamp < oldest->timestamp)
- oldest = rq;
}
- return oldest;
+ return &(_rxQueue[static_cast<unsigned int>(++_rxQueuePtr) % ZT_RX_QUEUE_SIZE]);
+ }
+
+ // Returns next RX queue entry in ring buffer and increments ring counter
+ inline RXQueueEntry *_nextRXQueueEntry()
+ {
+ return &(_rxQueue[static_cast<unsigned int>(++_rxQueuePtr) % ZT_RX_QUEUE_SIZE]);
}
// ZeroTier-layer TX queue entry