diff options
author | Adam Ierymenko <adam.ierymenko@gmail.com> | 2018-07-20 14:01:58 -0700 |
---|---|---|
committer | Adam Ierymenko <adam.ierymenko@gmail.com> | 2018-07-20 14:01:58 -0700 |
commit | 5b114791e52c046be3b5db254566928ccc6c7a23 (patch) | |
tree | 7802c250e74f1fe28bc0d24b866364791b74acec /node | |
parent | 9bc11a369ca9ede9649103215fd0d491af4a8077 (diff) | |
download | infinitytier-5b114791e52c046be3b5db254566928ccc6c7a23.tar.gz infinitytier-5b114791e52c046be3b5db254566928ccc6c7a23.zip |
Fix a bug that caused a crash on empty HTTP requests (localhost only) and add a lightweight lock to the RX queue to prevent possible threads stepping on each other in parallel receive paths.
Diffstat (limited to 'node')
-rw-r--r-- | node/Switch.cpp | 5 | ||||
-rw-r--r-- | node/Switch.hpp | 1 | ||||
-rw-r--r-- | node/Topology.cpp | 3 |
3 files changed, 6 insertions, 3 deletions
diff --git a/node/Switch.cpp b/node/Switch.cpp index 3fa8c31d..eeeca5db 100644 --- a/node/Switch.cpp +++ b/node/Switch.cpp @@ -121,6 +121,7 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre // seeing a Packet::Fragment? RXQueueEntry *const rq = _findRXQueueEntry(fragmentPacketId); + Mutex::Lock rql(rq->lock); if (rq->packetId != fragmentPacketId) { // No packet found, so we received a fragment without its head. @@ -203,6 +204,7 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre ); RXQueueEntry *const rq = _findRXQueueEntry(packetId); + Mutex::Lock rql(rq->lock); if (rq->packetId != packetId) { // If we have no other fragments yet, create an entry and save the head @@ -237,6 +239,7 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre IncomingPacket packet(data,len,path,now); if (!packet.tryDecode(RR,tPtr)) { RXQueueEntry *const rq = _nextRXQueueEntry(); + Mutex::Lock rql(rq->lock); rq->timestamp = now; rq->packetId = packet.packetId(); rq->frag0 = packet; @@ -545,6 +548,7 @@ void Switch::doAnythingWaitingForPeer(void *tPtr,const SharedPtr<Peer> &peer) const int64_t now = RR->node->now(); for(unsigned int ptr=0;ptr<ZT_RX_QUEUE_SIZE;++ptr) { RXQueueEntry *const rq = &(_rxQueue[ptr]); + Mutex::Lock rql(rq->lock); if ((rq->timestamp)&&(rq->complete)) { if ((rq->frag0.tryDecode(RR,tPtr))||((now - rq->timestamp) > ZT_RECEIVE_QUEUE_TIMEOUT)) rq->timestamp = 0; @@ -594,6 +598,7 @@ unsigned long Switch::doTimerTasks(void *tPtr,int64_t now) for(unsigned int ptr=0;ptr<ZT_RX_QUEUE_SIZE;++ptr) { RXQueueEntry *const rq = &(_rxQueue[ptr]); + Mutex::Lock rql(rq->lock); if ((rq->timestamp)&&(rq->complete)) { if ((rq->frag0.tryDecode(RR,tPtr))||((now - rq->timestamp) > ZT_RECEIVE_QUEUE_TIMEOUT)) { rq->timestamp = 0; diff --git a/node/Switch.hpp b/node/Switch.hpp index 906f418e..5de17fa0 100644 --- a/node/Switch.hpp +++ b/node/Switch.hpp @@ -159,6 +159,7 @@ private: unsigned int totalFragments; // 0 if only frag0 received, waiting for frags uint32_t haveFragments; // bit mask, LSB to MSB volatile bool complete; // if true, packet is complete + Mutex lock; }; RXQueueEntry _rxQueue[ZT_RX_QUEUE_SIZE]; AtomicCounter _rxQueuePtr; diff --git a/node/Topology.cpp b/node/Topology.cpp index a1b66ac7..7c526b41 100644 --- a/node/Topology.cpp +++ b/node/Topology.cpp @@ -382,8 +382,6 @@ void Topology::doPeriodicTasks(void *tPtr,int64_t now) } } - // Temporarily disable path cleanup to test hypothesis about periodic threading issues as reported by Keysight. -/* { Mutex::Lock _l(_paths_m); Hashtable< Path::HashKey,SharedPtr<Path> >::Iterator i(_paths); @@ -394,7 +392,6 @@ void Topology::doPeriodicTasks(void *tPtr,int64_t now) _paths.erase(*k); } } -*/ } void Topology::_memoizeUpstreams(void *tPtr) |