summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/ZeroTierOne.h21
-rw-r--r--node/BinarySemaphore.hpp106
-rw-r--r--node/DeferredPackets.cpp95
-rw-r--r--node/DeferredPackets.hpp98
-rw-r--r--node/IncomingPacket.cpp113
-rw-r--r--node/IncomingPacket.hpp17
-rw-r--r--node/Node.cpp38
-rw-r--r--node/Node.hpp1
-rw-r--r--node/RuntimeEnvironment.hpp7
-rw-r--r--node/SharedPtr.hpp33
-rw-r--r--node/Switch.cpp8
-rw-r--r--objects.mk1
12 files changed, 464 insertions, 74 deletions
diff --git a/include/ZeroTierOne.h b/include/ZeroTierOne.h
index e9b38c52..fd7857d9 100644
--- a/include/ZeroTierOne.h
+++ b/include/ZeroTierOne.h
@@ -1512,6 +1512,27 @@ void ZT_Node_clusterHandleIncomingMessage(ZT_Node *node,const void *msg,unsigned
void ZT_Node_clusterStatus(ZT_Node *node,ZT_ClusterStatus *cs);
/**
+ * Do things in the background until Node dies
+ *
+ * This function can be called from one or more background threads to process
+ * certain tasks in the background to improve foreground performance. It will
+ * not return until the Node is shut down. If threading is not enabled in
+ * this build it will return immediately and will do nothing.
+ *
+ * This is completely optional. If this is never called, all processing is
+ * done in the foreground in the various processXXXX() methods.
+ *
+ * This does NOT replace or eliminate the need to call the normal
+ * processBackgroundTasks() function in your main loop. This mechanism is
+ * used to offload the processing of expensive mssages onto background
+ * handler threads to prevent foreground performance degradation under
+ * high load.
+ *
+ * @param node Node instance
+ */
+void ZT_Node_backgroundThreadMain(ZT_Node *node);
+
+/**
* Get ZeroTier One version
*
* @param major Result: major version
diff --git a/node/BinarySemaphore.hpp b/node/BinarySemaphore.hpp
new file mode 100644
index 00000000..97d0d1c4
--- /dev/null
+++ b/node/BinarySemaphore.hpp
@@ -0,0 +1,106 @@
+/*
+ * ZeroTier One - Network Virtualization Everywhere
+ * Copyright (C) 2011-2015 ZeroTier, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * --
+ *
+ * ZeroTier may be used and distributed under the terms of the GPLv3, which
+ * are available at: http://www.gnu.org/licenses/gpl-3.0.html
+ *
+ * If you would like to embed ZeroTier into a commercial application or
+ * redistribute it in a modified binary form, please contact ZeroTier Networks
+ * LLC. Start here: http://www.zerotier.com/
+ */
+
+#ifndef ZT_BINARYSEMAPHORE_HPP
+#define ZT_BINARYSEMAPHORE_HPP
+
+#include <stdio.h>
+#include <stdint.h>
+#include <stdlib.h>
+
+#include "Constants.hpp"
+#include "NonCopyable.hpp"
+
+#ifdef __WINDOWS__
+
+#include <Windows.h>
+
+namespace ZeroTier {
+
+class BinarySemaphore : NonCopyable
+{
+public:
+ BinarySemaphore() throw() { _sem = CreateSemaphore(NULL,0,1,NULL); }
+ ~BinarySemaphore() { CloseHandle(_sem); }
+ inline void wait() { WaitForSingleObject(_sem,INFINITE); }
+ inline void post() { ReleaseSemaphore(_sem,1,NULL); }
+private:
+ HANDLE _sem;
+};
+
+} // namespace ZeroTier
+
+#else // !__WINDOWS__
+
+#include <pthread.h>
+
+namespace ZeroTier {
+
+class BinarySemaphore : NonCopyable
+{
+public:
+ BinarySemaphore()
+ {
+ pthread_mutex_init(&_mh,(const pthread_mutexattr_t *)0);
+ pthread_cond_init(&_cond,(const pthread_condattr_t *)0);
+ _f = false;
+ }
+
+ ~BinarySemaphore()
+ {
+ pthread_cond_destroy(&_cond);
+ pthread_mutex_destroy(&_mh);
+ }
+
+ inline void wait()
+ {
+ pthread_mutex_lock(const_cast <pthread_mutex_t *>(&_mh));
+ while (!_f)
+ pthread_cond_wait(const_cast <pthread_cond_t *>(&_cond),const_cast <pthread_mutex_t *>(&_mh));
+ _f = false;
+ pthread_mutex_unlock(const_cast <pthread_mutex_t *>(&_mh));
+ }
+
+ inline void post()
+ {
+ pthread_mutex_lock(const_cast <pthread_mutex_t *>(&_mh));
+ _f = true;
+ pthread_mutex_unlock(const_cast <pthread_mutex_t *>(&_mh));
+ pthread_cond_signal(const_cast <pthread_cond_t *>(&_cond));
+ }
+
+private:
+ pthread_cond_t _cond;
+ pthread_mutex_t _mh;
+ volatile bool _f;
+};
+
+} // namespace ZeroTier
+
+#endif // !__WINDOWS__
+
+#endif
diff --git a/node/DeferredPackets.cpp b/node/DeferredPackets.cpp
new file mode 100644
index 00000000..923e1339
--- /dev/null
+++ b/node/DeferredPackets.cpp
@@ -0,0 +1,95 @@
+/*
+ * ZeroTier One - Network Virtualization Everywhere
+ * Copyright (C) 2011-2015 ZeroTier, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * --
+ *
+ * ZeroTier may be used and distributed under the terms of the GPLv3, which
+ * are available at: http://www.gnu.org/licenses/gpl-3.0.html
+ *
+ * If you would like to embed ZeroTier into a commercial application or
+ * redistribute it in a modified binary form, please contact ZeroTier Networks
+ * LLC. Start here: http://www.zerotier.com/
+ */
+
+#include "Constants.hpp"
+#include "DeferredPackets.hpp"
+#include "IncomingPacket.hpp"
+#include "RuntimeEnvironment.hpp"
+#include "Node.hpp"
+
+namespace ZeroTier {
+
+DeferredPackets::DeferredPackets(const RuntimeEnvironment *renv) :
+ RR(renv),
+ _readPtr(0),
+ _writePtr(0),
+ _die(false)
+{
+}
+
+DeferredPackets::~DeferredPackets()
+{
+ _q_m.lock();
+ _die = true;
+ _q_m.unlock();
+ _q_s.post();
+}
+
+bool DeferredPackets::enqueue(IncomingPacket *pkt)
+{
+ _q_m.lock();
+ const unsigned long p = _writePtr % ZT_DEFFEREDPACKETS_MAX;
+ if (_q[p]) {
+ _q_m.unlock();
+ return false;
+ } else {
+ _q[p].setToUnsafe(pkt);
+ ++_writePtr;
+ _q_m.unlock();
+ _q_s.post();
+ return true;
+ }
+}
+
+int DeferredPackets::process()
+{
+ SharedPtr<IncomingPacket> pkt;
+
+ _q_m.lock();
+ if (_die) {
+ _q_m.unlock();
+ _q_s.post();
+ return -1;
+ }
+ while (_readPtr == _writePtr) {
+ _q_m.unlock();
+ _q_s.wait();
+ _q_m.lock();
+ if (_die) {
+ _q_m.unlock();
+ _q_s.post();
+ return -1;
+ }
+ }
+ pkt.swap(_q[_readPtr++ % ZT_DEFFEREDPACKETS_MAX]);
+ _q_m.unlock();
+
+ pkt->tryDecode(RR,true);
+ return 1;
+}
+
+} // namespace ZeroTier
diff --git a/node/DeferredPackets.hpp b/node/DeferredPackets.hpp
new file mode 100644
index 00000000..1ea65f3c
--- /dev/null
+++ b/node/DeferredPackets.hpp
@@ -0,0 +1,98 @@
+/*
+ * ZeroTier One - Network Virtualization Everywhere
+ * Copyright (C) 2011-2015 ZeroTier, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * --
+ *
+ * ZeroTier may be used and distributed under the terms of the GPLv3, which
+ * are available at: http://www.gnu.org/licenses/gpl-3.0.html
+ *
+ * If you would like to embed ZeroTier into a commercial application or
+ * redistribute it in a modified binary form, please contact ZeroTier Networks
+ * LLC. Start here: http://www.zerotier.com/
+ */
+
+#ifndef ZT_DEFERREDPACKETS_HPP
+#define ZT_DEFERREDPACKETS_HPP
+
+#include "Constants.hpp"
+#include "SharedPtr.hpp"
+#include "Mutex.hpp"
+#include "DeferredPackets.hpp"
+#include "BinarySemaphore.hpp"
+
+/**
+ * Maximum number of deferred packets
+ */
+#define ZT_DEFFEREDPACKETS_MAX 1024
+
+namespace ZeroTier {
+
+class IncomingPacket;
+class RuntimeEnvironment;
+
+/**
+ * Deferred packets
+ *
+ * IncomingPacket can defer its decoding this way by enqueueing itself here.
+ * When this is done, deferredDecode() is called later. This is done for
+ * operations that may be expensive to allow them to potentially be handled
+ * in the background or rate limited to maintain quality of service for more
+ * routine operations.
+ */
+class DeferredPackets
+{
+public:
+ DeferredPackets(const RuntimeEnvironment *renv);
+ ~DeferredPackets();
+
+ /**
+ * Enqueue a packet
+ *
+ * Since packets enqueue themselves, they call it with 'this' and we wrap
+ * them in a SharedPtr<>. This is safe as SharedPtr<> is introspective and
+ * supports this. This should not be called from any other code outside
+ * IncomingPacket.
+ *
+ * @param pkt Packet to process later (possibly in the background)
+ * @return False if queue is full
+ */
+ bool enqueue(IncomingPacket *pkt);
+
+ /**
+ * Wait for and then process a deferred packet
+ *
+ * If we are shutting down (in destructor), this returns -1 and should
+ * not be called again. Otherwise it returns the number of packets
+ * processed.
+ *
+ * @return Number processed or -1 if shutting down
+ */
+ int process();
+
+private:
+ SharedPtr<IncomingPacket> _q[ZT_DEFFEREDPACKETS_MAX];
+ const RuntimeEnvironment *const RR;
+ unsigned long _readPtr;
+ unsigned long _writePtr;
+ bool _die;
+ Mutex _q_m;
+ BinarySemaphore _q_s;
+};
+
+} // namespace ZeroTier
+
+#endif
diff --git a/node/IncomingPacket.cpp b/node/IncomingPacket.cpp
index 32229ba6..f2216235 100644
--- a/node/IncomingPacket.cpp
+++ b/node/IncomingPacket.cpp
@@ -46,21 +46,31 @@
#include "Cluster.hpp"
#include "Node.hpp"
#include "AntiRecursion.hpp"
+#include "DeferredPackets.hpp"
namespace ZeroTier {
-bool IncomingPacket::tryDecode(const RuntimeEnvironment *RR)
+bool IncomingPacket::tryDecode(const RuntimeEnvironment *RR,bool deferred)
{
const Address sourceAddress(source());
try {
if ((cipher() == ZT_PROTO_CIPHER_SUITE__C25519_POLY1305_NONE)&&(verb() == Packet::VERB_HELLO)) {
- // Unencrypted HELLOs are handled here since they are used to
- // populate our identity cache in the first place. _doHELLO() is special
- // in that it contains its own authentication logic.
- return _doHELLO(RR);
+ // Unencrypted HELLOs require some potentially expensive verification, so
+ // do this in the background if background processing is enabled.
+ DeferredPackets *const dp = RR->dp; // read volatile pointer
+ if ((dp)&&(!deferred)) {
+ dp->enqueue(this);
+ return true; // 'handled' via deferring to background thread(s)
+ } else {
+ // A null pointer for peer to _doHELLO() tells it to run its own
+ // special internal authentication logic. This is done for unencrypted
+ // HELLOs to learn new identities, etc.
+ SharedPtr<Peer> tmp;
+ return _doHELLO(RR,tmp);
+ }
}
- SharedPtr<Peer> peer = RR->topology->getPeer(sourceAddress);
+ SharedPtr<Peer> peer(RR->topology->getPeer(sourceAddress));
if (peer) {
if (!dearmor(peer->key())) {
TRACE("dropped packet from %s(%s), MAC authentication failed (size: %u)",peer->address().toString().c_str(),_remoteAddress.toString().c_str(),size());
@@ -79,7 +89,8 @@ bool IncomingPacket::tryDecode(const RuntimeEnvironment *RR)
default: // ignore unknown verbs, but if they pass auth check they are "received"
peer->received(RR,_localAddress,_remoteAddress,hops(),packetId(),v,0,Packet::VERB_NOP);
return true;
- case Packet::VERB_HELLO: return _doHELLO(RR);
+
+ case Packet::VERB_HELLO: return _doHELLO(RR,peer);
case Packet::VERB_ERROR: return _doERROR(RR,peer);
case Packet::VERB_OK: return _doOK(RR,peer);
case Packet::VERB_WHOIS: return _doWHOIS(RR,peer);
@@ -185,7 +196,7 @@ bool IncomingPacket::_doERROR(const RuntimeEnvironment *RR,const SharedPtr<Peer>
return true;
}
-bool IncomingPacket::_doHELLO(const RuntimeEnvironment *RR)
+bool IncomingPacket::_doHELLO(const RuntimeEnvironment *RR,SharedPtr<Peer> &peer)
{
/* Note: this is the only packet ever sent in the clear, and it's also
* the only packet that we authenticate via a different path. Authentication
@@ -226,63 +237,65 @@ bool IncomingPacket::_doHELLO(const RuntimeEnvironment *RR)
return true;
}
- SharedPtr<Peer> peer(RR->topology->getPeer(id.address()));
- if (peer) {
- // We already have an identity with this address -- check for collisions
-
- if (peer->identity() != id) {
- // Identity is different from the one we already have -- address collision
-
- unsigned char key[ZT_PEER_SECRET_KEY_LENGTH];
- if (RR->identity.agree(id,key,ZT_PEER_SECRET_KEY_LENGTH)) {
- if (dearmor(key)) { // ensure packet is authentic, otherwise drop
- TRACE("rejected HELLO from %s(%s): address already claimed",id.address().toString().c_str(),_remoteAddress.toString().c_str());
- Packet outp(id.address(),RR->identity.address(),Packet::VERB_ERROR);
- outp.append((unsigned char)Packet::VERB_HELLO);
- outp.append((uint64_t)pid);
- outp.append((unsigned char)Packet::ERROR_IDENTITY_COLLISION);
- outp.armor(key,true);
- RR->node->putPacket(_localAddress,_remoteAddress,outp.data(),outp.size());
+ if (!peer) {
+ peer = RR->topology->getPeer(id.address());
+ if (peer) {
+ // We already have an identity with this address -- check for collisions
+
+ if (peer->identity() != id) {
+ // Identity is different from the one we already have -- address collision
+
+ unsigned char key[ZT_PEER_SECRET_KEY_LENGTH];
+ if (RR->identity.agree(id,key,ZT_PEER_SECRET_KEY_LENGTH)) {
+ if (dearmor(key)) { // ensure packet is authentic, otherwise drop
+ TRACE("rejected HELLO from %s(%s): address already claimed",id.address().toString().c_str(),_remoteAddress.toString().c_str());
+ Packet outp(id.address(),RR->identity.address(),Packet::VERB_ERROR);
+ outp.append((unsigned char)Packet::VERB_HELLO);
+ outp.append((uint64_t)pid);
+ outp.append((unsigned char)Packet::ERROR_IDENTITY_COLLISION);
+ outp.armor(key,true);
+ RR->node->putPacket(_localAddress,_remoteAddress,outp.data(),outp.size());
+ } else {
+ TRACE("rejected HELLO from %s(%s): packet failed authentication",id.address().toString().c_str(),_remoteAddress.toString().c_str());
+ }
} else {
- TRACE("rejected HELLO from %s(%s): packet failed authentication",id.address().toString().c_str(),_remoteAddress.toString().c_str());
+ TRACE("rejected HELLO from %s(%s): key agreement failed",id.address().toString().c_str(),_remoteAddress.toString().c_str());
}
+
+ return true;
} else {
- TRACE("rejected HELLO from %s(%s): key agreement failed",id.address().toString().c_str(),_remoteAddress.toString().c_str());
- }
+ // Identity is the same as the one we already have -- check packet integrity
- return true;
+ if (!dearmor(peer->key())) {
+ TRACE("rejected HELLO from %s(%s): packet failed authentication",id.address().toString().c_str(),_remoteAddress.toString().c_str());
+ return true;
+ }
+
+ // Continue at // VALID
+ }
} else {
- // Identity is the same as the one we already have -- check packet integrity
+ // We don't already have an identity with this address -- validate and learn it
- if (!dearmor(peer->key())) {
+ // Check identity proof of work
+ if (!id.locallyValidate()) {
+ TRACE("dropped HELLO from %s(%s): identity invalid",id.address().toString().c_str(),_remoteAddress.toString().c_str());
+ return true;
+ }
+
+ // Check packet integrity and authentication
+ SharedPtr<Peer> newPeer(new Peer(RR->identity,id));
+ if (!dearmor(newPeer->key())) {
TRACE("rejected HELLO from %s(%s): packet failed authentication",id.address().toString().c_str(),_remoteAddress.toString().c_str());
return true;
}
+ peer = RR->topology->addPeer(newPeer);
// Continue at // VALID
}
- } else {
- // We don't already have an identity with this address -- validate and learn it
-
- // Check identity proof of work
- if (!id.locallyValidate()) {
- TRACE("dropped HELLO from %s(%s): identity invalid",id.address().toString().c_str(),_remoteAddress.toString().c_str());
- return true;
- }
- // Check packet integrity and authentication
- SharedPtr<Peer> newPeer(new Peer(RR->identity,id));
- if (!dearmor(newPeer->key())) {
- TRACE("rejected HELLO from %s(%s): packet failed authentication",id.address().toString().c_str(),_remoteAddress.toString().c_str());
- return true;
- }
- peer = RR->topology->addPeer(newPeer);
-
- // Continue at // VALID
+ // VALID -- if we made it here, packet passed identity and authenticity checks!
}
- // VALID -- if we made it here, packet passed identity and authenticity checks!
-
if (externalSurfaceAddress)
RR->sa->iam(id.address(),_remoteAddress,externalSurfaceAddress,RR->topology->isRoot(id),RR->node->now());
diff --git a/node/IncomingPacket.hpp b/node/IncomingPacket.hpp
index f5dd4b27..7fb7dbd3 100644
--- a/node/IncomingPacket.hpp
+++ b/node/IncomingPacket.hpp
@@ -93,14 +93,21 @@ public:
* about whether the packet was valid. A rejection is 'complete.'
*
* Once true is returned, this must not be called again. The packet's state
- * may no longer be valid.
+ * may no longer be valid. The only exception is deferred decoding. In this
+ * case true is returned to indicate to the normal decode path that it is
+ * finished with the packet. The packet will have added itself to the
+ * deferred queue and will expect tryDecode() to be called one more time
+ * with deferred set to true.
+ *
+ * Deferred decoding is performed by DeferredPackets.cpp and should not be
+ * done elsewhere. Under deferred decoding packets only get one shot and
+ * so the return value of tryDecode() is ignored.
*
* @param RR Runtime environment
+ * @param deferred If true, this is a deferred decode and the return is ignored
* @return True if decoding and processing is complete, false if caller should try again
- * @throws std::out_of_range Range error processing packet (should be discarded)
- * @throws std::runtime_error Other error processing packet (should be discarded)
*/
- bool tryDecode(const RuntimeEnvironment *RR);
+ bool tryDecode(const RuntimeEnvironment *RR,bool deferred);
/**
* @return Time of packet receipt / start of decode
@@ -132,7 +139,7 @@ private:
// These are called internally to handle packet contents once it has
// been authenticated, decrypted, decompressed, and classified.
bool _doERROR(const RuntimeEnvironment *RR,const SharedPtr<Peer> &peer);
- bool _doHELLO(const RuntimeEnvironment *RR);
+ bool _doHELLO(const RuntimeEnvironment *RR,SharedPtr<Peer> &peer); // can be called with NULL peer, while all others cannot
bool _doOK(const RuntimeEnvironment *RR,const SharedPtr<Peer> &peer);
bool _doWHOIS(const RuntimeEnvironment *RR,const SharedPtr<Peer> &peer);
bool _doRENDEZVOUS(const RuntimeEnvironment *RR,const SharedPtr<Peer> &peer);
diff --git a/node/Node.cpp b/node/Node.cpp
index 82cb7ddb..bcf5db1a 100644
--- a/node/Node.cpp
+++ b/node/Node.cpp
@@ -47,6 +47,7 @@
#include "Identity.hpp"
#include "SelfAwareness.hpp"
#include "Cluster.hpp"
+#include "DeferredPackets.hpp"
const struct sockaddr_storage ZT_SOCKADDR_NULL = {0};
@@ -130,7 +131,14 @@ Node::Node(
Node::~Node()
{
Mutex::Lock _l(_networks_m);
- _networks.clear(); // ensure that networks are destroyed before shutdown
+ Mutex::Lock _l2(RR->dpSetLock);
+
+ _networks.clear(); // ensure that networks are destroyed before shutdow
+
+ DeferredPackets *dp = RR->dp;
+ RR->dp = (DeferredPackets *)0;
+ delete dp;
+
delete RR->sa;
delete RR->topology;
delete RR->antiRec;
@@ -637,6 +645,27 @@ void Node::clusterStatus(ZT_ClusterStatus *cs)
memset(cs,0,sizeof(ZT_ClusterStatus));
}
+void Node::backgroundThreadMain()
+{
+ RR->dpSetLock.lock();
+ if (!RR->dp) {
+ try {
+ RR->dp = new DeferredPackets(RR);
+ } catch ( ... ) { // sanity check -- could only really happen if out of memory
+ RR->dpSetLock.unlock();
+ return;
+ }
+ }
+ RR->dpSetLock.unlock();
+
+ for(;;) {
+ try {
+ if (RR->dp->process() < 0)
+ break;
+ } catch ( ... ) {} // sanity check -- should not throw
+ }
+}
+
/****************************************************************************/
/* Node methods used only within node/ */
/****************************************************************************/
@@ -978,6 +1007,13 @@ void ZT_Node_clusterStatus(ZT_Node *node,ZT_ClusterStatus *cs)
} catch ( ... ) {}
}
+void ZT_Node_backgroundThreadMain(ZT_Node *node)
+{
+ try {
+ reinterpret_cast<ZeroTier::Node *>(node)->backgroundThreadMain();
+ } catch ( ... ) {}
+}
+
void ZT_version(int *major,int *minor,int *revision,unsigned long *featureFlags)
{
if (major) *major = ZEROTIER_ONE_VERSION_MAJOR;
diff --git a/node/Node.hpp b/node/Node.hpp
index 9b85b832..800c0a55 100644
--- a/node/Node.hpp
+++ b/node/Node.hpp
@@ -125,6 +125,7 @@ public:
void clusterRemoveMember(unsigned int memberId);
void clusterHandleIncomingMessage(const void *msg,unsigned int len);
void clusterStatus(ZT_ClusterStatus *cs);
+ void backgroundThreadMain();
// Internal functions ------------------------------------------------------
diff --git a/node/RuntimeEnvironment.hpp b/node/RuntimeEnvironment.hpp
index 2ec88f72..18d9e8e5 100644
--- a/node/RuntimeEnvironment.hpp
+++ b/node/RuntimeEnvironment.hpp
@@ -32,6 +32,7 @@
#include "Constants.hpp"
#include "Identity.hpp"
+#include "Mutex.hpp"
namespace ZeroTier {
@@ -44,6 +45,7 @@ class AntiRecursion;
class NetworkController;
class SelfAwareness;
class Cluster;
+class DeferredPackets;
/**
* Holds global state for an instance of ZeroTier::Node
@@ -55,6 +57,7 @@ public:
node(n)
,identity()
,localNetworkController((NetworkController *)0)
+ ,dp((DeferredPackets *)0)
,sw((Switch *)0)
,mc((Multicaster *)0)
,antiRec((AntiRecursion *)0)
@@ -77,6 +80,10 @@ public:
// This is set externally to an instance of this base class
NetworkController *localNetworkController;
+ // This is created if background threads call Node::backgroundThreadMain().
+ DeferredPackets *volatile dp; // can be read without lock but not written
+ Mutex dpSetLock;
+
/*
* Order matters a bit here. These are constructed in this order
* and then deleted in the opposite order on Node exit. The order ensures
diff --git a/node/SharedPtr.hpp b/node/SharedPtr.hpp
index 4ecfa818..289c499f 100644
--- a/node/SharedPtr.hpp
+++ b/node/SharedPtr.hpp
@@ -64,20 +64,6 @@ public:
++obj->__refCount;
}
- SharedPtr(T *obj,bool runAwayFromZombies)
- throw() :
- _ptr(obj)
- {
- // HACK: this is used in "handlers" to take ownership of naked pointers,
- // an ugly pattern that really ought to be factored out.
- if (runAwayFromZombies) {
- if ((int)(++obj->__refCount) < 2) {
- --obj->__refCount;
- _ptr = (T *)0;
- }
- } else ++obj->__refCount;
- }
-
SharedPtr(const SharedPtr &sp)
throw() :
_ptr(sp._getAndInc())
@@ -105,6 +91,25 @@ public:
return *this;
}
+ /**
+ * Set to a naked pointer and increment its reference count
+ *
+ * This assumes this SharedPtr is NULL and that ptr is not a 'zombie.' No
+ * checks are performed.
+ *
+ * @param ptr Naked pointer to assign
+ */
+ inline void setToUnsafe(T *ptr)
+ {
+ ++ptr->__refCount;
+ _ptr = ptr;
+ }
+
+ /**
+ * Swap with another pointer 'for free' without ref count overhead
+ *
+ * @param with Pointer to swap with
+ */
inline void swap(SharedPtr &with)
throw()
{
diff --git a/node/Switch.cpp b/node/Switch.cpp
index 97befbc6..c047a3d1 100644
--- a/node/Switch.cpp
+++ b/node/Switch.cpp
@@ -475,7 +475,7 @@ void Switch::doAnythingWaitingForPeer(const SharedPtr<Peer> &peer)
{ // finish processing any packets waiting on peer's public key / identity
Mutex::Lock _l(_rxQueue_m);
for(std::list< SharedPtr<IncomingPacket> >::iterator rxi(_rxQueue.begin());rxi!=_rxQueue.end();) {
- if ((*rxi)->tryDecode(RR))
+ if ((*rxi)->tryDecode(RR,false))
_rxQueue.erase(rxi++);
else ++rxi;
}
@@ -672,7 +672,7 @@ void Switch::_handleRemotePacketFragment(const InetAddress &localAddr,const Inet
packet->append(dq.frags[f - 1].payload(),dq.frags[f - 1].payloadLength());
_defragQueue.erase(pid); // dq no longer valid after this
- if (!packet->tryDecode(RR)) {
+ if (!packet->tryDecode(RR,false)) {
Mutex::Lock _l(_rxQueue_m);
_rxQueue.push_back(packet);
}
@@ -746,7 +746,7 @@ void Switch::_handleRemotePacketHead(const InetAddress &localAddr,const InetAddr
packet->append(dq.frags[f - 1].payload(),dq.frags[f - 1].payloadLength());
_defragQueue.erase(pid); // dq no longer valid after this
- if (!packet->tryDecode(RR)) {
+ if (!packet->tryDecode(RR,false)) {
Mutex::Lock _l(_rxQueue_m);
_rxQueue.push_back(packet);
}
@@ -757,7 +757,7 @@ void Switch::_handleRemotePacketHead(const InetAddress &localAddr,const InetAddr
} // else this is a duplicate head, ignore
} else {
// Packet is unfragmented, so just process it
- if (!packet->tryDecode(RR)) {
+ if (!packet->tryDecode(RR,false)) {
Mutex::Lock _l(_rxQueue_m);
_rxQueue.push_back(packet);
}
diff --git a/objects.mk b/objects.mk
index 540072d5..8daec8b5 100644
--- a/objects.mk
+++ b/objects.mk
@@ -5,6 +5,7 @@ OBJS=\
node/C25519.o \
node/CertificateOfMembership.o \
node/Cluster.o \
+ node/DeferredPackets.o \
node/Dictionary.o \
node/Identity.o \
node/IncomingPacket.o \