author     Adam Ierymenko <adam.ierymenko@gmail.com>  2018-07-20 14:01:58 -0700
committer  Adam Ierymenko <adam.ierymenko@gmail.com>  2018-07-20 14:01:58 -0700
commit     5b114791e52c046be3b5db254566928ccc6c7a23 (patch)
tree       7802c250e74f1fe28bc0d24b866364791b74acec /node
parent     9bc11a369ca9ede9649103215fd0d491af4a8077 (diff)
download   infinitytier-5b114791e52c046be3b5db254566928ccc6c7a23.tar.gz
           infinitytier-5b114791e52c046be3b5db254566928ccc6c7a23.zip
Fix a bug that caused a crash on empty HTTP requests (localhost only), and add a lightweight per-entry lock to the RX queue so parallel receive threads cannot step on each other.
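
Note: the RX queue change below takes a per-entry RAII lock (Mutex::Lock) before an entry is read or modified. A minimal sketch of that pattern, with simplified fields and a hypothetical processFragment() helper (illustrative only, not the actual Switch code):

    // Assumes ZeroTier's node/Mutex.hpp (Mutex + Mutex::Lock RAII guard).
    // Illustrative per-entry locking for one RX queue slot.
    struct RXQueueEntry
    {
        uint64_t packetId;       // ID of the packet being reassembled
        int64_t timestamp;       // receive time, 0 if the slot is free
        volatile bool complete;  // true once all fragments have arrived
        Mutex lock;              // added by this commit: serializes threads per entry
    };

    void processFragment(RXQueueEntry *const rq,const uint64_t packetId,const int64_t now)
    {
        Mutex::Lock rql(rq->lock); // RAII guard: held until rql leaves scope
        if (rq->packetId != packetId) {
            // Slot holds a different (or no) packet; claim it for this packet ID.
            rq->packetId = packetId;
            rq->timestamp = now;
            rq->complete = false;
        }
        // ... record the fragment and attempt decode while the entry is locked ...
    }

Keeping the lock inside each entry rather than over the whole queue means receive threads that land on different slots proceed in parallel; only threads racing on the same packet serialize.
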
Diffstat (limited to 'node')
-rw-r--r--  node/Switch.cpp    5
-rw-r--r--  node/Switch.hpp    1
-rw-r--r--  node/Topology.cpp  3
3 files changed, 6 insertions, 3 deletions
diff --git a/node/Switch.cpp b/node/Switch.cpp
index 3fa8c31d..eeeca5db 100644
--- a/node/Switch.cpp
+++ b/node/Switch.cpp
@@ -121,6 +121,7 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre
// seeing a Packet::Fragment?
RXQueueEntry *const rq = _findRXQueueEntry(fragmentPacketId);
+ Mutex::Lock rql(rq->lock);
if (rq->packetId != fragmentPacketId) {
// No packet found, so we received a fragment without its head.
@@ -203,6 +204,7 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre
);
RXQueueEntry *const rq = _findRXQueueEntry(packetId);
+ Mutex::Lock rql(rq->lock);
if (rq->packetId != packetId) {
// If we have no other fragments yet, create an entry and save the head
@@ -237,6 +239,7 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre
IncomingPacket packet(data,len,path,now);
if (!packet.tryDecode(RR,tPtr)) {
RXQueueEntry *const rq = _nextRXQueueEntry();
+ Mutex::Lock rql(rq->lock);
rq->timestamp = now;
rq->packetId = packet.packetId();
rq->frag0 = packet;
@@ -545,6 +548,7 @@ void Switch::doAnythingWaitingForPeer(void *tPtr,const SharedPtr<Peer> &peer)
const int64_t now = RR->node->now();
for(unsigned int ptr=0;ptr<ZT_RX_QUEUE_SIZE;++ptr) {
RXQueueEntry *const rq = &(_rxQueue[ptr]);
+ Mutex::Lock rql(rq->lock);
if ((rq->timestamp)&&(rq->complete)) {
if ((rq->frag0.tryDecode(RR,tPtr))||((now - rq->timestamp) > ZT_RECEIVE_QUEUE_TIMEOUT))
rq->timestamp = 0;
@@ -594,6 +598,7 @@ unsigned long Switch::doTimerTasks(void *tPtr,int64_t now)
for(unsigned int ptr=0;ptr<ZT_RX_QUEUE_SIZE;++ptr) {
RXQueueEntry *const rq = &(_rxQueue[ptr]);
+ Mutex::Lock rql(rq->lock);
if ((rq->timestamp)&&(rq->complete)) {
if ((rq->frag0.tryDecode(RR,tPtr))||((now - rq->timestamp) > ZT_RECEIVE_QUEUE_TIMEOUT)) {
rq->timestamp = 0;
diff --git a/node/Switch.hpp b/node/Switch.hpp
index 906f418e..5de17fa0 100644
--- a/node/Switch.hpp
+++ b/node/Switch.hpp
@@ -159,6 +159,7 @@ private:
unsigned int totalFragments; // 0 if only frag0 received, waiting for frags
uint32_t haveFragments; // bit mask, LSB to MSB
volatile bool complete; // if true, packet is complete
+ Mutex lock;
};
RXQueueEntry _rxQueue[ZT_RX_QUEUE_SIZE];
AtomicCounter _rxQueuePtr;
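
For context, the helpers used above (_findRXQueueEntry(), _nextRXQueueEntry()) treat _rxQueue as a small ring buffer indexed by the atomic counter _rxQueuePtr. A rough sketch of how they might look inside Switch, assuming round-robin slot recycling modulo ZT_RX_QUEUE_SIZE (the real bodies may differ):

    // Sketch only: as-if members of Switch, using _rxQueue, _rxQueuePtr, ZT_RX_QUEUE_SIZE.
    inline RXQueueEntry *_nextRXQueueEntry()
    {
        // Claim the next ring slot round-robin; the oldest entry gets recycled.
        return &(_rxQueue[(unsigned long)(++_rxQueuePtr) % ZT_RX_QUEUE_SIZE]);
    }

    inline RXQueueEntry *_findRXQueueEntry(const uint64_t packetId)
    {
        // Look for an in-progress entry holding this packet ID...
        for(unsigned int ptr=0;ptr<ZT_RX_QUEUE_SIZE;++ptr) {
            RXQueueEntry *const rq = &(_rxQueue[ptr]);
            if ((rq->packetId == packetId)&&(rq->timestamp))
                return rq;
        }
        // ...otherwise recycle a slot for the new packet.
        return _nextRXQueueEntry();
    }
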
diff --git a/node/Topology.cpp b/node/Topology.cpp
index a1b66ac7..7c526b41 100644
--- a/node/Topology.cpp
+++ b/node/Topology.cpp
@@ -382,8 +382,6 @@ void Topology::doPeriodicTasks(void *tPtr,int64_t now)
}
}
- // Temporarily disable path cleanup to test hypothesis about periodic threading issues as reported by Keysight.
-/*
{
Mutex::Lock _l(_paths_m);
Hashtable< Path::HashKey,SharedPtr<Path> >::Iterator i(_paths);
@@ -394,7 +392,6 @@ void Topology::doPeriodicTasks(void *tPtr,int64_t now)
_paths.erase(*k);
}
}
-*/
}
void Topology::_memoizeUpstreams(void *tPtr)
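
The Topology.cpp hunks above remove the comment markers that had temporarily disabled periodic path cleanup, re-enabling pruning of dead paths under _paths_m. A rough sketch of that block as it reads once uncommented; the liveness condition inside the loop is an assumption, not taken from this diff:

    {
        Mutex::Lock _l(_paths_m);
        Hashtable< Path::HashKey,SharedPtr<Path> >::Iterator i(_paths);
        Path::HashKey *k = (Path::HashKey *)0;
        SharedPtr<Path> *p = (SharedPtr<Path> *)0;
        while (i.next(k,p)) {
            // Assumed condition: drop paths nothing else references anymore.
            if (p->references() <= 1)
                _paths.erase(*k);
        }
    }
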