summaryrefslogtreecommitdiff
path: root/node/DeferredPackets.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'node/DeferredPackets.cpp')
-rw-r--r--node/DeferredPackets.cpp48
1 files changed, 29 insertions, 19 deletions
diff --git a/node/DeferredPackets.cpp b/node/DeferredPackets.cpp
index c8e63fc8..192b4078 100644
--- a/node/DeferredPackets.cpp
+++ b/node/DeferredPackets.cpp
@@ -26,8 +26,6 @@ namespace ZeroTier {
DeferredPackets::DeferredPackets(const RuntimeEnvironment *renv) :
RR(renv),
- _readPtr(0),
- _writePtr(0),
_waiting(0),
_die(false)
{
@@ -37,39 +35,45 @@ DeferredPackets::~DeferredPackets()
{
_q_m.lock();
_die = true;
- while (_waiting > 0) {
- _q_m.unlock();
+ _q_m.unlock();
+
+ for(;;) {
_q_s.post();
+
_q_m.lock();
+ if (_waiting <= 0) {
+ _q_m.unlock();
+ break;
+ } else {
+ _q_m.unlock();
+ }
}
}
bool DeferredPackets::enqueue(IncomingPacket *pkt)
{
- _q_m.lock();
- const unsigned long p = _writePtr % ZT_DEFFEREDPACKETS_MAX;
- if (_q[p]) {
- _q_m.unlock();
- return false;
- } else {
- _q[p].setToUnsafe(pkt);
- ++_writePtr;
- _q_m.unlock();
- _q_s.post();
- return true;
+ {
+ Mutex::Lock _l(_q_m);
+ if (_q.size() >= ZT_DEFFEREDPACKETS_MAX)
+ return false;
+ _q.push_back(*pkt);
}
+ _q_s.post();
+ return true;
}
int DeferredPackets::process()
{
- SharedPtr<IncomingPacket> pkt;
+ std::list<IncomingPacket> pkt;
_q_m.lock();
+
if (_die) {
_q_m.unlock();
return -1;
}
- while (_readPtr == _writePtr) {
+
+ while (_q.empty()) {
++_waiting;
_q_m.unlock();
_q_s.wait();
@@ -80,10 +84,16 @@ int DeferredPackets::process()
return -1;
}
}
- pkt.swap(_q[_readPtr++ % ZT_DEFFEREDPACKETS_MAX]);
+
+ // Move item from _q list to a dummy list here to avoid copying packet
+ pkt.splice(pkt.end(),_q,_q.begin());
+
_q_m.unlock();
- pkt->tryDecode(RR,true);
+ try {
+ pkt.front().tryDecode(RR,true);
+ } catch ( ... ) {} // drop invalids
+
return 1;
}