summaryrefslogtreecommitdiff
path: root/node
diff options
context:
space:
mode:
Diffstat (limited to 'node')
-rw-r--r--node/CertificateOfMembership.hpp10
-rw-r--r--node/Cluster.cpp15
-rw-r--r--node/Cluster.hpp13
-rw-r--r--node/Constants.hpp63
-rw-r--r--node/DeferredPackets.cpp48
-rw-r--r--node/DeferredPackets.hpp13
-rw-r--r--node/IncomingPacket.cpp23
-rw-r--r--node/IncomingPacket.hpp47
-rw-r--r--node/InetAddress.cpp4
-rw-r--r--node/Node.cpp11
-rw-r--r--node/Packet.hpp2
-rw-r--r--node/Path.hpp10
-rw-r--r--node/Peer.cpp115
-rw-r--r--node/Peer.hpp6
-rw-r--r--node/SelfAwareness.cpp86
-rw-r--r--node/SelfAwareness.hpp15
-rw-r--r--node/Switch.cpp470
-rw-r--r--node/Switch.hpp41
-rw-r--r--node/Utils.hpp4
19 files changed, 587 insertions, 409 deletions
diff --git a/node/CertificateOfMembership.hpp b/node/CertificateOfMembership.hpp
index c6d59397..9a279883 100644
--- a/node/CertificateOfMembership.hpp
+++ b/node/CertificateOfMembership.hpp
@@ -33,6 +33,16 @@
#include "Identity.hpp"
#include "Utils.hpp"
+/**
+ * Default window of time for certificate agreement
+ *
+ * Right now we use time for 'revision' so this is the maximum time divergence
+ * between two certs for them to agree. It comes out to five minutes, which
+ * gives a lot of margin for error if the controller hiccups or its clock
+ * drifts but causes de-authorized peers to fall off fast enough.
+ */
+#define ZT_NETWORK_COM_DEFAULT_REVISION_MAX_DELTA (ZT_NETWORK_AUTOCONF_DELAY * 5)
+
namespace ZeroTier {
/**
diff --git a/node/Cluster.cpp b/node/Cluster.cpp
index e4c4524a..61903307 100644
--- a/node/Cluster.cpp
+++ b/node/Cluster.cpp
@@ -570,10 +570,19 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
Mutex::Lock _l2(_members[mostRecentMemberId].lock);
if (buf.size() > 0)
_send(mostRecentMemberId,CLUSTER_MESSAGE_PROXY_UNITE,buf.data(),buf.size());
- if (_members[mostRecentMemberId].zeroTierPhysicalEndpoints.size() > 0) {
- TRACE("sendViaCluster relaying %u bytes from %s to %s by way of %u",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId);
- RR->node->putPacket(InetAddress(),_members[mostRecentMemberId].zeroTierPhysicalEndpoints.front(),data,len);
+
+ for(std::vector<InetAddress>::const_iterator i1(_zeroTierPhysicalEndpoints.begin());i1!=_zeroTierPhysicalEndpoints.end();++i1) {
+ for(std::vector<InetAddress>::const_iterator i2(_members[mostRecentMemberId].zeroTierPhysicalEndpoints.begin());i2!=_members[mostRecentMemberId].zeroTierPhysicalEndpoints.end();++i2) {
+ if (i1->ss_family == i2->ss_family) {
+ TRACE("sendViaCluster relaying %u bytes from %s to %s by way of %u (%s->%s)",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId,i1->toString().c_str(),i2->toString().c_str());
+ RR->node->putPacket(*i1,*i2,data,len);
+ return;
+ }
+ }
}
+
+ TRACE("sendViaCluster relaying %u bytes from %s to %s by way of %u failed: no common endpoints with the same address family!",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId);
+ return;
}
}
diff --git a/node/Cluster.hpp b/node/Cluster.hpp
index e21d6020..dafbf425 100644
--- a/node/Cluster.hpp
+++ b/node/Cluster.hpp
@@ -47,22 +47,22 @@
/**
* Desired period between doPeriodicTasks() in milliseconds
*/
-#define ZT_CLUSTER_PERIODIC_TASK_PERIOD 50
+#define ZT_CLUSTER_PERIODIC_TASK_PERIOD 20
/**
* How often to flush outgoing message queues (maximum interval)
*/
-#define ZT_CLUSTER_FLUSH_PERIOD 100
+#define ZT_CLUSTER_FLUSH_PERIOD ZT_CLUSTER_PERIODIC_TASK_PERIOD
/**
* Maximum number of queued outgoing packets per sender address
*/
-#define ZT_CLUSTER_MAX_QUEUE_PER_SENDER 8
+#define ZT_CLUSTER_MAX_QUEUE_PER_SENDER 16
/**
* Expiration time for send queue entries
*/
-#define ZT_CLUSTER_QUEUE_EXPIRATION 5000
+#define ZT_CLUSTER_QUEUE_EXPIRATION 3000
/**
* Chunk size for allocating queue entries
@@ -85,11 +85,8 @@
/**
* Max data per queue entry
- *
- * If we ever support larger transport MTUs this must be increased. The plus
- * 16 is just a small margin and has no special meaning.
*/
-#define ZT_CLUSTER_SEND_QUEUE_DATA_MAX (ZT_UDP_DEFAULT_PAYLOAD_MTU + 16)
+#define ZT_CLUSTER_SEND_QUEUE_DATA_MAX 1500
namespace ZeroTier {
diff --git a/node/Constants.hpp b/node/Constants.hpp
index 4d6c9d07..6c44a8dc 100644
--- a/node/Constants.hpp
+++ b/node/Constants.hpp
@@ -51,8 +51,20 @@
#include <endian.h>
#endif
-// Disable type punning on ARM architecture -- some ARM chips throw SIGBUS on unaligned access
-#if defined(__arm__) || defined(__ARMEL__)
+#ifdef __APPLE__
+#include <TargetConditionals.h>
+#ifndef __UNIX_LIKE__
+#define __UNIX_LIKE__
+#endif
+#ifndef __BSD__
+#define __BSD__
+#endif
+#include <machine/endian.h>
+#endif
+
+// Define this macro to disable "type punning" on a number of targets that
+// have issues with unaligned memory access.
+#if defined(__arm__) || defined(__ARMEL__) || (defined(__APPLE__) && ( (defined(TARGET_OS_IPHONE) && (TARGET_OS_IPHONE != 0)) || (defined(TARGET_OS_WATCH) && (TARGET_OS_WATCH != 0)) || (defined(TARGET_IPHONE_SIMULATOR) && (TARGET_IPHONE_SIMULATOR != 0)) ) )
#ifndef ZT_NO_TYPE_PUNNING
#define ZT_NO_TYPE_PUNNING
#endif
@@ -73,18 +85,6 @@
#endif
#endif
-// TODO: Android is what? Linux technically, but does it define it?
-
-#ifdef __APPLE__
-#include <TargetConditionals.h>
-#ifndef __UNIX_LIKE__
-#define __UNIX_LIKE__
-#endif
-#ifndef __BSD__
-#define __BSD__
-#endif
-#endif
-
#if defined(_WIN32) || defined(_WIN64)
#ifndef __WINDOWS__
#define __WINDOWS__
@@ -104,9 +104,8 @@
#include <Windows.h>
#endif
-// Assume these are little-endian. PPC is not supported for OSX, and ARM
-// runs in little-endian mode for these OS families.
-#if defined(__APPLE__) || defined(__WINDOWS__)
+// Assume little endian if not defined
+#if (defined(__APPLE__) || defined(__WINDOWS__)) && (!defined(__BYTE_ORDER))
#undef __BYTE_ORDER
#undef __LITTLE_ENDIAN
#undef __BIG_ENDIAN
@@ -163,9 +162,17 @@
#define ZT_MAX_PACKET_FRAGMENTS 4
/**
- * Timeout for receipt of fragmented packets in ms
+ * Size of RX queue
+ *
+ * This is about 2mb, and can be decreased for small devices. A queue smaller
+ * than about 4 is probably going to cause a lot of lost packets.
+ */
+#define ZT_RX_QUEUE_SIZE 64
+
+/**
+ * RX queue entries older than this do not "exist"
*/
-#define ZT_FRAGMENTED_PACKET_RECEIVE_TIMEOUT 500
+#define ZT_RX_QUEUE_EXPIRE 4000
/**
* Length of secret key in bytes -- 256-bit -- do not change
@@ -255,12 +262,17 @@
/**
* Delay between ordinary case pings of direct links
*/
-#define ZT_PEER_DIRECT_PING_DELAY 90000
+#define ZT_PEER_DIRECT_PING_DELAY 60000
/**
* Timeout for overall peer activity (measured from last receive)
*/
-#define ZT_PEER_ACTIVITY_TIMEOUT ((ZT_PEER_DIRECT_PING_DELAY * 4) + ZT_PING_CHECK_INVERVAL)
+#define ZT_PEER_ACTIVITY_TIMEOUT 500000
+
+/**
+ * Timeout for path activity
+ */
+#define ZT_PATH_ACTIVITY_TIMEOUT ZT_PEER_ACTIVITY_TIMEOUT
/**
* No answer timeout to trigger dead path detection
@@ -354,6 +366,15 @@
*/
#define ZT_TEST_NETWORK_ID 0xffffffffffffffffULL
+/**
+ * Desired buffer size for UDP sockets (used in service and osdep but defined here)
+ */
+#if (defined(__amd64) || defined(__amd64__) || defined(__x86_64) || defined(__x86_64__) || defined(__AMD64) || defined(__AMD64__))
+#define ZT_UDP_DESIRED_BUF_SIZE 1048576
+#else
+#define ZT_UDP_DESIRED_BUF_SIZE 131072
+#endif
+
/* Ethernet frame types that might be relevant to us */
#define ZT_ETHERTYPE_IPV4 0x0800
#define ZT_ETHERTYPE_ARP 0x0806
diff --git a/node/DeferredPackets.cpp b/node/DeferredPackets.cpp
index c8e63fc8..192b4078 100644
--- a/node/DeferredPackets.cpp
+++ b/node/DeferredPackets.cpp
@@ -26,8 +26,6 @@ namespace ZeroTier {
DeferredPackets::DeferredPackets(const RuntimeEnvironment *renv) :
RR(renv),
- _readPtr(0),
- _writePtr(0),
_waiting(0),
_die(false)
{
@@ -37,39 +35,45 @@ DeferredPackets::~DeferredPackets()
{
_q_m.lock();
_die = true;
- while (_waiting > 0) {
- _q_m.unlock();
+ _q_m.unlock();
+
+ for(;;) {
_q_s.post();
+
_q_m.lock();
+ if (_waiting <= 0) {
+ _q_m.unlock();
+ break;
+ } else {
+ _q_m.unlock();
+ }
}
}
bool DeferredPackets::enqueue(IncomingPacket *pkt)
{
- _q_m.lock();
- const unsigned long p = _writePtr % ZT_DEFFEREDPACKETS_MAX;
- if (_q[p]) {
- _q_m.unlock();
- return false;
- } else {
- _q[p].setToUnsafe(pkt);
- ++_writePtr;
- _q_m.unlock();
- _q_s.post();
- return true;
+ {
+ Mutex::Lock _l(_q_m);
+ if (_q.size() >= ZT_DEFFEREDPACKETS_MAX)
+ return false;
+ _q.push_back(*pkt);
}
+ _q_s.post();
+ return true;
}
int DeferredPackets::process()
{
- SharedPtr<IncomingPacket> pkt;
+ std::list<IncomingPacket> pkt;
_q_m.lock();
+
if (_die) {
_q_m.unlock();
return -1;
}
- while (_readPtr == _writePtr) {
+
+ while (_q.empty()) {
++_waiting;
_q_m.unlock();
_q_s.wait();
@@ -80,10 +84,16 @@ int DeferredPackets::process()
return -1;
}
}
- pkt.swap(_q[_readPtr++ % ZT_DEFFEREDPACKETS_MAX]);
+
+ // Move item from _q list to a dummy list here to avoid copying packet
+ pkt.splice(pkt.end(),_q,_q.begin());
+
_q_m.unlock();
- pkt->tryDecode(RR,true);
+ try {
+ pkt.front().tryDecode(RR,true);
+ } catch ( ... ) {} // drop invalids
+
return 1;
}
diff --git a/node/DeferredPackets.hpp b/node/DeferredPackets.hpp
index 5ba26531..a9855396 100644
--- a/node/DeferredPackets.hpp
+++ b/node/DeferredPackets.hpp
@@ -19,6 +19,8 @@
#ifndef ZT_DEFERREDPACKETS_HPP
#define ZT_DEFERREDPACKETS_HPP
+#include <list>
+
#include "Constants.hpp"
#include "SharedPtr.hpp"
#include "Mutex.hpp"
@@ -28,7 +30,7 @@
/**
* Maximum number of deferred packets
*/
-#define ZT_DEFFEREDPACKETS_MAX 1024
+#define ZT_DEFFEREDPACKETS_MAX 256
namespace ZeroTier {
@@ -53,11 +55,6 @@ public:
/**
* Enqueue a packet
*
- * Since packets enqueue themselves, they call it with 'this' and we wrap
- * them in a SharedPtr<>. This is safe as SharedPtr<> is introspective and
- * supports this. This should not be called from any other code outside
- * IncomingPacket.
- *
* @param pkt Packet to process later (possibly in the background)
* @return False if queue is full
*/
@@ -75,10 +72,8 @@ public:
int process();
private:
- SharedPtr<IncomingPacket> _q[ZT_DEFFEREDPACKETS_MAX];
+ std::list<IncomingPacket> _q;
const RuntimeEnvironment *const RR;
- unsigned long _readPtr;
- unsigned long _writePtr;
volatile int _waiting;
volatile bool _die;
Mutex _q_m;
diff --git a/node/IncomingPacket.cpp b/node/IncomingPacket.cpp
index efb43d23..35c78b29 100644
--- a/node/IncomingPacket.cpp
+++ b/node/IncomingPacket.cpp
@@ -282,7 +282,7 @@ bool IncomingPacket::_doHELLO(const RuntimeEnvironment *RR,SharedPtr<Peer> &peer
}
if (externalSurfaceAddress)
- RR->sa->iam(id.address(),_remoteAddress,externalSurfaceAddress,RR->topology->isRoot(id),RR->node->now());
+ RR->sa->iam(id.address(),_localAddress,_remoteAddress,externalSurfaceAddress,RR->topology->isRoot(id),RR->node->now());
Packet outp(id.address(),RR->identity.address(),Packet::VERB_OK);
outp.append((unsigned char)Packet::VERB_HELLO);
@@ -388,7 +388,7 @@ bool IncomingPacket::_doOK(const RuntimeEnvironment *RR,const SharedPtr<Peer> &p
peer->setRemoteVersion(vProto,vMajor,vMinor,vRevision);
if (externalSurfaceAddress)
- RR->sa->iam(peer->address(),_remoteAddress,externalSurfaceAddress,trusted,RR->node->now());
+ RR->sa->iam(peer->address(),_localAddress,_remoteAddress,externalSurfaceAddress,trusted,RR->node->now());
} break;
case Packet::VERB_WHOIS: {
@@ -934,10 +934,10 @@ bool IncomingPacket::_doPUSH_DIRECT_PATHS(const RuntimeEnvironment *RR,const Sha
switch(addrType) {
case 4: {
InetAddress a(field(ptr,4),4,at<uint16_t>(ptr + 4));
- if ( ((flags & 0x01) == 0) && (Path::isAddressValidForPath(a)) && (!peer->hasActivePathTo(now,a)) && (RR->node->shouldUsePathForZeroTierTraffic(_localAddress,a)) ) {
+ if ( ((flags & 0x01) == 0) && (!peer->hasActivePathTo(now,a)) && (RR->node->shouldUsePathForZeroTierTraffic(_localAddress,a)) ) {
if (++countPerScope[(int)a.ipScope()][0] <= ZT_PUSH_DIRECT_PATHS_MAX_PER_SCOPE_AND_FAMILY) {
TRACE("attempting to contact %s at pushed direct path %s",peer->address().toString().c_str(),a.toString().c_str());
- peer->sendHELLO(_localAddress,a,now);
+ peer->sendHELLO(InetAddress(),a,now);
} else {
TRACE("ignoring contact for %s at %s -- too many per scope",peer->address().toString().c_str(),a.toString().c_str());
}
@@ -945,10 +945,10 @@ bool IncomingPacket::_doPUSH_DIRECT_PATHS(const RuntimeEnvironment *RR,const Sha
} break;
case 6: {
InetAddress a(field(ptr,16),16,at<uint16_t>(ptr + 16));
- if ( ((flags & 0x01) == 0) && (Path::isAddressValidForPath(a)) && (!peer->hasActivePathTo(now,a)) && (RR->node->shouldUsePathForZeroTierTraffic(_localAddress,a)) ) {
+ if ( ((flags & 0x01) == 0) && (!peer->hasActivePathTo(now,a)) && (RR->node->shouldUsePathForZeroTierTraffic(_localAddress,a)) ) {
if (++countPerScope[(int)a.ipScope()][1] <= ZT_PUSH_DIRECT_PATHS_MAX_PER_SCOPE_AND_FAMILY) {
TRACE("attempting to contact %s at pushed direct path %s",peer->address().toString().c_str(),a.toString().c_str());
- peer->sendHELLO(_localAddress,a,now);
+ peer->sendHELLO(InetAddress(),a,now);
} else {
TRACE("ignoring contact for %s at %s -- too many per scope",peer->address().toString().c_str(),a.toString().c_str());
}
@@ -1016,8 +1016,9 @@ bool IncomingPacket::_doCIRCUIT_TEST(const RuntimeEnvironment *RR,const SharedPt
if (previousHopCredentialLength >= 1) {
switch((*this)[ZT_PACKET_IDX_PAYLOAD + 31 + vlf]) {
case 0x01: { // network certificate of membership for previous hop
- if (previousHopCom.deserialize(*this,ZT_PACKET_IDX_PAYLOAD + 32 + vlf) != (previousHopCredentialLength - 1)) {
- TRACE("dropped CIRCUIT_TEST from %s(%s): previous hop COM invalid",source().toString().c_str(),_remoteAddress.toString().c_str());
+ const unsigned int phcl = previousHopCom.deserialize(*this,ZT_PACKET_IDX_PAYLOAD + 32 + vlf);
+ if (phcl != (previousHopCredentialLength - 1)) {
+ TRACE("dropped CIRCUIT_TEST from %s(%s): previous hop COM invalid (%u != %u)",source().toString().c_str(),_remoteAddress.toString().c_str(),phcl,(previousHopCredentialLength - 1));
return true;
}
} break;
@@ -1033,7 +1034,7 @@ bool IncomingPacket::_doCIRCUIT_TEST(const RuntimeEnvironment *RR,const SharedPt
SharedPtr<Network> nw(RR->node->network(originatorCredentialNetworkId));
if (nw) {
originatorCredentialNetworkConfig = nw->config2();
- if ( (originatorCredentialNetworkConfig) && ((originatorCredentialNetworkConfig->isPublic())||(peer->address() == originatorAddress)||((originatorCredentialNetworkConfig->com())&&(previousHopCom)&&(originatorCredentialNetworkConfig->com().agreesWith(previousHopCom)))) ) {
+ if ( (originatorCredentialNetworkConfig) && ( (originatorCredentialNetworkConfig->isPublic()) || (peer->address() == originatorAddress) || ((originatorCredentialNetworkConfig->com())&&(previousHopCom)&&(originatorCredentialNetworkConfig->com().agreesWith(previousHopCom))) ) ) {
TRACE("CIRCUIT_TEST %.16llx received from hop %s(%s) and originator %s with valid network ID credential %.16llx (verified from originator and next hop)",testId,source().toString().c_str(),_remoteAddress.toString().c_str(),originatorAddress.toString().c_str(),originatorCredentialNetworkId);
} else {
TRACE("dropped CIRCUIT_TEST from %s(%s): originator %s specified network ID %.16llx as credential, and previous hop %s did not supply a valid COM",source().toString().c_str(),_remoteAddress.toString().c_str(),originatorAddress.toString().c_str(),originatorCredentialNetworkId,peer->address().toString().c_str());
@@ -1078,7 +1079,7 @@ bool IncomingPacket::_doCIRCUIT_TEST(const RuntimeEnvironment *RR,const SharedPt
Packet outp(originatorAddress,RR->identity.address(),Packet::VERB_CIRCUIT_TEST_REPORT);
outp.append((uint64_t)timestamp);
outp.append((uint64_t)testId);
- outp.append((uint64_t)now);
+ outp.append((uint64_t)0); // field reserved for future use
outp.append((uint8_t)ZT_VENDOR_ZEROTIER);
outp.append((uint8_t)ZT_PROTO_VERSION);
outp.append((uint8_t)ZEROTIER_ONE_VERSION_MAJOR);
@@ -1111,7 +1112,7 @@ bool IncomingPacket::_doCIRCUIT_TEST(const RuntimeEnvironment *RR,const SharedPt
if ((originatorCredentialNetworkConfig)&&(!originatorCredentialNetworkConfig->isPublic())&&(originatorCredentialNetworkConfig->com())) {
outp.append((uint8_t)0x01); // COM
originatorCredentialNetworkConfig->com().serialize(outp);
- outp.setAt<uint16_t>(previousHopCredentialPos,(uint16_t)(size() - previousHopCredentialPos));
+ outp.setAt<uint16_t>(previousHopCredentialPos,(uint16_t)(outp.size() - (previousHopCredentialPos + 2)));
}
if (remainingHopsPtr < size())
outp.append(field(remainingHopsPtr,size() - remainingHopsPtr),size() - remainingHopsPtr);
diff --git a/node/IncomingPacket.hpp b/node/IncomingPacket.hpp
index 0df20034..96e46c00 100644
--- a/node/IncomingPacket.hpp
+++ b/node/IncomingPacket.hpp
@@ -24,8 +24,6 @@
#include "Packet.hpp"
#include "InetAddress.hpp"
#include "Utils.hpp"
-#include "SharedPtr.hpp"
-#include "AtomicCounter.hpp"
#include "MulticastGroup.hpp"
#include "Peer.hpp"
@@ -55,9 +53,21 @@ class Network;
*/
class IncomingPacket : public Packet
{
- friend class SharedPtr<IncomingPacket>;
-
public:
+ IncomingPacket() :
+ Packet(),
+ _receiveTime(0),
+ _localAddress(),
+ _remoteAddress()
+ {
+ }
+
+ IncomingPacket(const IncomingPacket &p)
+ {
+ // All fields including InetAddress are memcpy'able
+ memcpy(this,&p,sizeof(IncomingPacket));
+ }
+
/**
* Create a new packet-in-decode
*
@@ -72,9 +82,33 @@ public:
Packet(data,len),
_receiveTime(now),
_localAddress(localAddress),
- _remoteAddress(remoteAddress),
- __refCount()
+ _remoteAddress(remoteAddress)
+ {
+ }
+
+ inline IncomingPacket &operator=(const IncomingPacket &p)
+ {
+ // All fields including InetAddress are memcpy'able
+ memcpy(this,&p,sizeof(IncomingPacket));
+ return *this;
+ }
+
+ /**
+ * Init packet-in-decode in place
+ *
+ * @param data Packet data
+ * @param len Packet length
+ * @param localAddress Local interface address
+ * @param remoteAddress Address from which packet came
+ * @param now Current time
+ * @throws std::out_of_range Range error processing packet
+ */
+ inline void init(const void *data,unsigned int len,const InetAddress &localAddress,const InetAddress &remoteAddress,uint64_t now)
{
+ copyFrom(data,len);
+ _receiveTime = now;
+ _localAddress = localAddress;
+ _remoteAddress = remoteAddress;
}
/**
@@ -154,7 +188,6 @@ private:
uint64_t _receiveTime;
InetAddress _localAddress;
InetAddress _remoteAddress;
- AtomicCounter __refCount;
};
} // namespace ZeroTier
diff --git a/node/InetAddress.cpp b/node/InetAddress.cpp
index d5cd227c..dca772e8 100644
--- a/node/InetAddress.cpp
+++ b/node/InetAddress.cpp
@@ -127,8 +127,10 @@ void InetAddress::set(const void *ipBytes,unsigned int ipLen,unsigned int port)
{
memset(this,0,sizeof(InetAddress));
if (ipLen == 4) {
+ uint32_t ipb[1];
+ memcpy(ipb,ipBytes,4);
ss_family = AF_INET;
- reinterpret_cast<struct sockaddr_in *>(this)->sin_addr.s_addr = *(reinterpret_cast<const uint32_t *>(ipBytes));
+ reinterpret_cast<struct sockaddr_in *>(this)->sin_addr.s_addr = ipb[0];
reinterpret_cast<struct sockaddr_in *>(this)->sin_port = Utils::hton((uint16_t)port);
} else if (ipLen == 16) {
ss_family = AF_INET6;
diff --git a/node/Node.cpp b/node/Node.cpp
index bdeca701..1934ef7d 100644
--- a/node/Node.cpp
+++ b/node/Node.cpp
@@ -493,10 +493,10 @@ int Node::addLocalInterfaceAddress(const struct sockaddr_storage *addr)
{
if (Path::isAddressValidForPath(*(reinterpret_cast<const InetAddress *>(addr)))) {
Mutex::Lock _l(_directPaths_m);
- _directPaths.push_back(*(reinterpret_cast<const InetAddress *>(addr)));
- std::sort(_directPaths.begin(),_directPaths.end());
- _directPaths.erase(std::unique(_directPaths.begin(),_directPaths.end()),_directPaths.end());
- return 1;
+ if (std::find(_directPaths.begin(),_directPaths.end(),*(reinterpret_cast<const InetAddress *>(addr))) == _directPaths.end()) {
+ _directPaths.push_back(*(reinterpret_cast<const InetAddress *>(addr)));
+ return 1;
+ }
}
return 0;
}
@@ -670,6 +670,9 @@ std::string Node::dataStoreGet(const char *name)
bool Node::shouldUsePathForZeroTierTraffic(const InetAddress &localAddress,const InetAddress &remoteAddress)
{
+ if (!Path::isAddressValidForPath(remoteAddress))
+ return false;
+
{
Mutex::Lock _l(_networks_m);
for(std::vector< std::pair< uint64_t, SharedPtr<Network> > >::const_iterator i=_networks.begin();i!=_networks.end();++i) {
diff --git a/node/Packet.hpp b/node/Packet.hpp
index 2381397b..7d1e5c68 100644
--- a/node/Packet.hpp
+++ b/node/Packet.hpp
@@ -934,7 +934,7 @@ public:
* Circuit test hop report:
* <[8] 64-bit timestamp (from original test)>
* <[8] 64-bit test ID (from original test)>
- * <[8] 64-bit reporter timestamp (reporter's clock, 0 if unspec)>
+ * <[8] 64-bit reserved field (set to 0, currently unused)>
* <[1] 8-bit vendor ID (set to 0, currently unused)>
* <[1] 8-bit reporter protocol version>
* <[1] 8-bit reporter major version>
diff --git a/node/Path.hpp b/node/Path.hpp
index 668728e0..cb7622d3 100644
--- a/node/Path.hpp
+++ b/node/Path.hpp
@@ -127,7 +127,7 @@ public:
inline bool active(uint64_t now) const
throw()
{
- return (((now - _lastReceived) < ZT_PEER_ACTIVITY_TIMEOUT)&&(_probation < ZT_PEER_DEAD_PATH_DETECTION_MAX_PROBATION));
+ return (((now - _lastReceived) < ZT_PATH_ACTIVITY_TIMEOUT)&&(_probation < ZT_PEER_DEAD_PATH_DETECTION_MAX_PROBATION));
}
/**
@@ -243,6 +243,14 @@ public:
case InetAddress::IP_SCOPE_PSEUDOPRIVATE:
case InetAddress::IP_SCOPE_SHARED:
case InetAddress::IP_SCOPE_GLOBAL:
+ if (a.ss_family == AF_INET6) {
+ // TEMPORARY HACK: for now, we are going to blacklist he.net IPv6
+ // tunnels due to very spotty performance and low MTU issues over
+ // these IPv6 tunnel links.
+ const uint8_t *ipd = reinterpret_cast<const uint8_t *>(reinterpret_cast<const struct sockaddr_in6 *>(&a)->sin6_addr.s6_addr);
+ if ((ipd[0] == 0x20)&&(ipd[1] == 0x01)&&(ipd[2] == 0x04)&&(ipd[3] == 0x70))
+ return false;
+ }
return true;
default:
return false;
diff --git a/node/Peer.cpp b/node/Peer.cpp
index 17f9b2ef..87ca94e1 100644
--- a/node/Peer.cpp
+++ b/node/Peer.cpp
@@ -240,70 +240,83 @@ bool Peer::doPingAndKeepalive(uint64_t now,int inetAddressFamily)
return false;
}
-void Peer::pushDirectPaths(Path *path,uint64_t now,bool force)
+bool Peer::pushDirectPaths(const InetAddress &localAddr,const InetAddress &toAddress,uint64_t now,bool force)
{
#ifdef ZT_ENABLE_CLUSTER
// Cluster mode disables normal PUSH_DIRECT_PATHS in favor of cluster-based peer redirection
if (RR->cluster)
- return;
+ return false;
#endif
- if (((now - _lastDirectPathPushSent) >= ZT_DIRECT_PATH_PUSH_INTERVAL)||(force)) {
- _lastDirectPathPushSent = now;
+ if (!force) {
+ if ((now - _lastDirectPathPushSent) < ZT_DIRECT_PATH_PUSH_INTERVAL)
+ return false;
+ else _lastDirectPathPushSent = now;
+ }
- std::vector<InetAddress> dps(RR->node->directPaths());
- if (dps.empty())
- return;
+ std::vector<InetAddress> dps(RR->node->directPaths());
+ std::vector<InetAddress> sym(RR->sa->getSymmetricNatPredictions());
+ for(unsigned long i=0,added=0;i<sym.size();++i) {
+ InetAddress tmp(sym[(unsigned long)RR->node->prng() % sym.size()]);
+ if (std::find(dps.begin(),dps.end(),tmp) == dps.end()) {
+ dps.push_back(tmp);
+ if (++added >= ZT_PUSH_DIRECT_PATHS_MAX_PER_SCOPE_AND_FAMILY)
+ break;
+ }
+ }
+ if (dps.empty())
+ return false;
#ifdef ZT_TRACE
- {
- std::string ps;
- for(std::vector<InetAddress>::const_iterator p(dps.begin());p!=dps.end();++p) {
- if (ps.length() > 0)
- ps.push_back(',');
- ps.append(p->toString());
- }
- TRACE("pushing %u direct paths to %s: %s",(unsigned int)dps.size(),_id.address().toString().c_str(),ps.c_str());
+ {
+ std::string ps;
+ for(std::vector<InetAddress>::const_iterator p(dps.begin());p!=dps.end();++p) {
+ if (ps.length() > 0)
+ ps.push_back(',');
+ ps.append(p->toString());
}
+ TRACE("pushing %u direct paths to %s: %s",(unsigned int)dps.size(),_id.address().toString().c_str(),ps.c_str());
+ }
#endif
- std::vector<InetAddress>::const_iterator p(dps.begin());
- while (p != dps.end()) {
- Packet outp(_id.address(),RR->identity.address(),Packet::VERB_PUSH_DIRECT_PATHS);
- outp.addSize(2); // leave room for count
-
- unsigned int count = 0;
- while ((p != dps.end())&&((outp.size() + 24) < ZT_PROTO_MAX_PACKET_LENGTH)) {
- uint8_t addressType = 4;
- switch(p->ss_family) {
- case AF_INET:
- break;
- case AF_INET6:
- addressType = 6;
- break;
- default: // we currently only push IP addresses
- ++p;
- continue;
- }
+ std::vector<InetAddress>::const_iterator p(dps.begin());
+ while (p != dps.end()) {
+ Packet outp(_id.address(),RR->identity.address(),Packet::VERB_PUSH_DIRECT_PATHS);
+ outp.addSize(2); // leave room for count
+
+ unsigned int count = 0;
+ while ((p != dps.end())&&((outp.size() + 24) < 1200)) {
+ uint8_t addressType = 4;
+ switch(p->ss_family) {
+ case AF_INET:
+ break;
+ case AF_INET6:
+ addressType = 6;
+ break;
+ default: // we currently only push IP addresses
+ ++p;
+ continue;
+ }
- outp.append((uint8_t)0); // no flags
- outp.append((uint16_t)0); // no extensions
- outp.append(addressType);
- outp.append((uint8_t)((addressType == 4) ? 6 : 18));
- outp.append(p->rawIpData(),((addressType == 4) ? 4 : 16));
- outp.append((uint16_t)p->port());
+ outp.append((uint8_t)0); // no flags
+ outp.append((uint16_t)0); // no extensions
+ outp.append(addressType);
+ outp.append((uint8_t)((addressType == 4) ? 6 : 18));
+ outp.append(p->rawIpData(),((addressType == 4) ? 4 : 16));
+ outp.append((uint16_t)p->port());
- ++count;
- ++p;
- }
+ ++count;
+ ++p;
+ }
- if (count) {
- outp.setAt(ZT_PACKET_IDX_PAYLOAD,(uint16_t)count);
- outp.armor(_key,true);
- path->send(RR,outp.data(),outp.size(),now);
- }
+ if (count) {
+ outp.setAt(ZT_PACKET_IDX_PAYLOAD,(uint16_t)count);
+ outp.armor(_key,true);
+ RR->node->putPacket(localAddr,toAddress,outp.data(),outp.size(),0);
}
}
+
+ return true;
}
bool Peer::resetWithinScope(InetAddress::IpScope scope,uint64_t now)
@@ -373,7 +386,7 @@ bool Peer::validateAndSetNetworkMembershipCertificate(uint64_t nwid,const Certif
// Check signature, log and return if cert is invalid
if (com.signedBy() != Network::controllerFor(nwid)) {
- TRACE("rejected network membership certificate for %.16llx signed by %s: signer not a controller of this network",(unsigned long long)_id,com.signedBy().toString().c_str());
+ TRACE("rejected network membership certificate for %.16llx signed by %s: signer not a controller of this network",(unsigned long long)nwid,com.signedBy().toString().c_str());
return false; // invalid signer
}
@@ -382,7 +395,7 @@ bool Peer::validateAndSetNetworkMembershipCertificate(uint64_t nwid,const Certif
// We are the controller: RR->identity.address() == controller() == cert.signedBy()
// So, verify that we signed the cert ourselves
if (!com.verify(RR->identity)) {
- TRACE("rejected network membership certificate for %.16llx self signed by %s: signature check failed",(unsigned long long)_id,com.signedBy().toString().c_str());
+ TRACE("rejected network membership certificate for %.16llx self signed by %s: signature check failed",(unsigned long long)nwid,com.signedBy().toString().c_str());
return false; // invalid signature
}
@@ -398,7 +411,7 @@ bool Peer::validateAndSetNetworkMembershipCertificate(uint64_t nwid,const Certif
}
if (!com.verify(signer->identity())) {
- TRACE("rejected network membership certificate for %.16llx signed by %s: signature check failed",(unsigned long long)_id,com.signedBy().toString().c_str());
+ TRACE("rejected network membership certificate for %.16llx signed by %s: signature check failed",(unsigned long long)nwid,com.signedBy().toString().c_str());
return false; // invalid signature
}
}
@@ -419,7 +432,7 @@ bool Peer::needsOurNetworkMembershipCertificate(uint64_t nwid,uint64_t now,bool
const uint64_t tmp = lastPushed;
if (updateLastPushedTime)
lastPushed = now;
- return ((now - tmp) >= (ZT_NETWORK_AUTOCONF_DELAY / 2));
+ return ((now - tmp) >= (ZT_NETWORK_AUTOCONF_DELAY / 3));
}
void Peer::clean(uint64_t now)
diff --git a/node/Peer.hpp b/node/Peer.hpp
index 5796baf9..94c58ae8 100644
--- a/node/Peer.hpp
+++ b/node/Peer.hpp
@@ -170,11 +170,13 @@ public:
/**
* Push direct paths back to self if we haven't done so in the configured timeout
*
- * @param path Remote path to use to send the push
+ * @param localAddr Local address
+ * @param toAddress Remote address to send push to (usually from path)
* @param now Current time
* @param force If true, push regardless of rate limit
+ * @return True if something was actually sent
*/
- void pushDirectPaths(Path *path,uint64_t now,bool force);
+ bool pushDirectPaths(const InetAddress &localAddr,const InetAddress &toAddress,uint64_t now,bool force);
/**
* @return All known direct paths to this peer (active or inactive)
diff --git a/node/SelfAwareness.cpp b/node/SelfAwareness.cpp
index db069046..8bed0c51 100644
--- a/node/SelfAwareness.cpp
+++ b/node/SelfAwareness.cpp
@@ -20,6 +20,9 @@
#include <stdlib.h>
#include <string.h>
+#include <set>
+#include <vector>
+
#include "Constants.hpp"
#include "SelfAwareness.hpp"
#include "RuntimeEnvironment.hpp"
@@ -64,34 +67,18 @@ SelfAwareness::~SelfAwareness()
{
}
-void SelfAwareness::iam(const Address &reporter,const InetAddress &reporterPhysicalAddress,const InetAddress &myPhysicalAddress,bool trusted,uint64_t now)
+void SelfAwareness::iam(const Address &reporter,const InetAddress &receivedOnLocalAddress,const InetAddress &reporterPhysicalAddress,const InetAddress &myPhysicalAddress,bool trusted,uint64_t now)
{
const InetAddress::IpScope scope = myPhysicalAddress.ipScope();
- // This would be weird, e.g. a public IP talking to 10.0.0.1, so just ignore it.
- // If your network is this weird it's probably not reliable information.
- if (scope != reporterPhysicalAddress.ipScope())
+ if ((scope != reporterPhysicalAddress.ipScope())||(scope == InetAddress::IP_SCOPE_NONE)||(scope == InetAddress::IP_SCOPE_LOOPBACK)||(scope == InetAddress::IP_SCOPE_MULTICAST))
return;
- // Some scopes we ignore, and global scope IPs are only used for this
- // mechanism if they come from someone we trust (e.g. a root).
- switch(scope) {
- case InetAddress::IP_SCOPE_NONE:
- case InetAddress::IP_SCOPE_LOOPBACK:
- case InetAddress::IP_SCOPE_MULTICAST:
- return;
- case InetAddress::IP_SCOPE_GLOBAL:
- if (!trusted)
- return;
- break;
- default:
- break;
- }
-
Mutex::Lock _l(_phy_m);
- PhySurfaceEntry &entry = _phy[PhySurfaceKey(reporter,reporterPhysicalAddress,scope)];
+ PhySurfaceEntry &entry = _phy[PhySurfaceKey(reporter,receivedOnLocalAddress,reporterPhysicalAddress,scope)];
- if ( ((now - entry.ts) < ZT_SELFAWARENESS_ENTRY_TIMEOUT) && (!entry.mySurface.ipsEqual(myPhysicalAddress)) ) {
+ if ( (trusted) && ((now - entry.ts) < ZT_SELFAWARENESS_ENTRY_TIMEOUT) && (!entry.mySurface.ipsEqual(myPhysicalAddress)) ) {
+ // Changes to external surface reported by trusted peers causes path reset in this scope
entry.mySurface = myPhysicalAddress;
entry.ts = now;
TRACE("physical address %s for scope %u as seen from %s(%s) differs from %s, resetting paths in scope",myPhysicalAddress.toString().c_str(),(unsigned int)scope,reporter.toString().c_str(),reporterPhysicalAddress.toString().c_str(),entry.mySurface.toString().c_str());
@@ -123,6 +110,7 @@ void SelfAwareness::iam(const Address &reporter,const InetAddress &reporterPhysi
}
}
} else {
+ // Otherwise just update DB to use to determine external surface info
entry.mySurface = myPhysicalAddress;
entry.ts = now;
}
@@ -140,4 +128,60 @@ void SelfAwareness::clean(uint64_t now)
}
}
+std::vector<InetAddress> SelfAwareness::getSymmetricNatPredictions()
+{
+ /* This is based on ideas and strategies found here:
+ * https://tools.ietf.org/html/draft-takeda-symmetric-nat-traversal-00
+ *
+ * In short: a great many symmetric NATs allocate ports sequentially.
+ * This is common on enterprise and carrier grade NATs as well as consumer
+ * devices. This code generates a list of "you might try this" addresses by
+ * extrapolating likely port assignments from currently known external
+ * global IPv4 surfaces. These can then be included in a PUSH_DIRECT_PATHS
+ * message to another peer, causing it to possibly try these addresses and
+ * bust our local symmetric NAT. It works often enough to be worth the
+ * extra bit of code and does no harm in cases where it fails. */
+
+ // Gather unique surfaces indexed by local received-on address and flag
+ // us as behind a symmetric NAT if there is more than one.
+ std::map< InetAddress,std::set<InetAddress> > surfaces;
+ bool symmetric = false;
+ {
+ Mutex::Lock _l(_phy_m);
+ Hashtable< PhySurfaceKey,PhySurfaceEntry >::Iterator i(_phy);
+ PhySurfaceKey *k = (PhySurfaceKey *)0;
+ PhySurfaceEntry *e = (PhySurfaceEntry *)0;
+ while (i.next(k,e)) {
+ if ((e->mySurface.ss_family == AF_INET)&&(e->mySurface.ipScope() == InetAddress::IP_SCOPE_GLOBAL)) {
+ std::set<InetAddress> &s = surfaces[k->receivedOnLocalAddress];
+ s.insert(e->mySurface);
+ symmetric = symmetric||(s.size() > 1);
+ }
+ }
+ }
+
+ // If we appear to be symmetrically NATed, generate and return extrapolations
+ // of those surfaces. Since PUSH_DIRECT_PATHS is sent multiple times, we
+ // probabilistically generate extrapolations of anywhere from +1 to +5 to
+ // increase the odds that it will work "eventually".
+ if (symmetric) {
+ std::vector<InetAddress> r;
+ for(std::map< InetAddress,std::set<InetAddress> >::iterator si(surfaces.begin());si!=surfaces.end();++si) {
+ for(std::set<InetAddress>::iterator i(si->second.begin());i!=si->second.end();++i) {
+ InetAddress ipp(*i);
+ unsigned int p = ipp.port() + 1 + ((unsigned int)RR->node->prng() & 3);
+ if (p >= 65535)
+ p -= 64510; // NATs seldom use ports <=1024 so wrap to 1025
+ ipp.setPort(p);
+ if ((si->second.count(ipp) == 0)&&(std::find(r.begin(),r.end(),ipp) == r.end())) {
+ r.push_back(ipp);
+ }
+ }
+ }
+ return r;
+ }
+
+ return std::vector<InetAddress>();
+}
+
} // namespace ZeroTier
diff --git a/node/SelfAwareness.hpp b/node/SelfAwareness.hpp
index 2534d986..06c264a9 100644
--- a/node/SelfAwareness.hpp
+++ b/node/SelfAwareness.hpp
@@ -42,12 +42,13 @@ public:
* Called when a trusted remote peer informs us of our external network address
*
* @param reporter ZeroTier address of reporting peer
+ * @param receivedOnLocalAddress Local address on which report was received
* @param reporterPhysicalAddress Physical address that reporting peer seems to have
* @param myPhysicalAddress Physical address that peer says we have
* @param trusted True if this peer is trusted as an authority to inform us of external address changes
* @param now Current time
*/
- void iam(const Address &reporter,const InetAddress &reporterPhysicalAddress,const InetAddress &myPhysicalAddress,bool trusted,uint64_t now);
+ void iam(const Address &reporter,const InetAddress &receivedOnLocalAddress,const InetAddress &reporterPhysicalAddress,const InetAddress &myPhysicalAddress,bool trusted,uint64_t now);
/**
* Clean up database periodically
@@ -56,18 +57,26 @@ public:
*/
void clean(uint64_t now);
+ /**
+ * If we appear to be behind a symmetric NAT, get predictions for possible external endpoints
+ *
+ * @return Symmetric NAT predictions or empty vector if none
+ */
+ std::vector<InetAddress> getSymmetricNatPredictions();
+
private:
struct PhySurfaceKey
{
Address reporter;
+ InetAddress receivedOnLocalAddress;
InetAddress reporterPhysicalAddress;
InetAddress::IpScope scope;
PhySurfaceKey() : reporter(),scope(InetAddress::IP_SCOPE_NONE) {}
- PhySurfaceKey(const Address &r,const InetAddress &ra,InetAddress::IpScope s) : reporter(r),reporterPhysicalAddress(ra),scope(s) {}
+ PhySurfaceKey(const Address &r,const InetAddress &rol,const InetAddress &ra,InetAddress::IpScope s) : reporter(r),receivedOnLocalAddress(rol),reporterPhysicalAddress(ra),scope(s) {}
inline unsigned long hashCode() const throw() { return ((unsigned long)reporter.toInt() + (unsigned long)scope); }
- inline bool operator==(const PhySurfaceKey &k) const throw() { return ((reporter == k.reporter)&&(reporterPhysicalAddress == k.reporterPhysicalAddress)&&(scope == k.scope)); }
+ inline bool operator==(const PhySurfaceKey &k) const throw() { return ((reporter == k.reporter)&&(receivedOnLocalAddress == k.receivedOnLocalAddress)&&(reporterPhysicalAddress == k.reporterPhysicalAddress)&&(scope == k.scope)); }
};
struct PhySurfaceEntry
{
diff --git a/node/Switch.cpp b/node/Switch.cpp
index 4c91a855..968d1a4a 100644
--- a/node/Switch.cpp
+++ b/node/Switch.cpp
@@ -60,7 +60,6 @@ Switch::Switch(const RuntimeEnvironment *renv) :
RR(renv),
_lastBeaconResponse(0),
_outstandingWhoisRequests(32),
- _defragQueue(32),
_lastUniteAttempt(8) // only really used on root servers and upstreams, and it'll grow there just fine
{
}
@@ -72,11 +71,14 @@ Switch::~Switch()
void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &fromAddr,const void *data,unsigned int len)
{
try {
+ const uint64_t now = RR->node->now();
+
if (len == 13) {
/* LEGACY: before VERB_PUSH_DIRECT_PATHS, peers used broadcast
* announcements on the LAN to solve the 'same network problem.' We
* no longer send these, but we'll listen for them for a while to
* locate peers with versions <1.0.4. */
+
Address beaconAddr(reinterpret_cast<const char *>(data) + 8,5);
if (beaconAddr == RR->identity.address())
return;
@@ -84,7 +86,6 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from
return;
SharedPtr<Peer> peer(RR->topology->getPeer(beaconAddr));
if (peer) { // we'll only respond to beacons from known peers
- const uint64_t now = RR->node->now();
if ((now - _lastBeaconResponse) >= 2500) { // limit rate of responses
_lastBeaconResponse = now;
Packet outp(peer->address(),RR->identity.address(),Packet::VERB_NOP);
@@ -92,11 +93,209 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from
RR->node->putPacket(localAddr,fromAddr,outp.data(),outp.size());
}
}
- } else if (len > ZT_PROTO_MIN_FRAGMENT_LENGTH) {
- if (((const unsigned char *)data)[ZT_PACKET_FRAGMENT_IDX_FRAGMENT_INDICATOR] == ZT_PACKET_FRAGMENT_INDICATOR) {
- _handleRemotePacketFragment(localAddr,fromAddr,data,len);
- } else if (len >= ZT_PROTO_MIN_PACKET_LENGTH) {
- _handleRemotePacketHead(localAddr,fromAddr,data,len);
+
+ } else if (len > ZT_PROTO_MIN_FRAGMENT_LENGTH) { // min length check is important!
+ if (reinterpret_cast<const uint8_t *>(data)[ZT_PACKET_FRAGMENT_IDX_FRAGMENT_INDICATOR] == ZT_PACKET_FRAGMENT_INDICATOR) {
+ // Handle fragment ----------------------------------------------------
+
+ Packet::Fragment fragment(data,len);
+ const Address destination(fragment.destination());
+
+ if (destination != RR->identity.address()) {
+ // Fragment is not for us, so try to relay it
+ if (fragment.hops() < ZT_RELAY_MAX_HOPS) {
+ fragment.incrementHops();
+
+ // Note: we don't bother initiating NAT-t for fragments, since heads will set that off.
+ // It wouldn't hurt anything, just redundant and unnecessary.
+ SharedPtr<Peer> relayTo = RR->topology->getPeer(destination);
+ if ((!relayTo)||(!relayTo->send(fragment.data(),fragment.size(),now))) {
+#ifdef ZT_ENABLE_CLUSTER
+ if (RR->cluster) {
+ RR->cluster->sendViaCluster(Address(),destination,fragment.data(),fragment.size(),false);
+ return;
+ }
+#endif
+
+ // Don't know peer or no direct path -- so relay via root server
+ relayTo = RR->topology->getBestRoot();
+ if (relayTo)
+ relayTo->send(fragment.data(),fragment.size(),now);
+ }
+ } else {
+ TRACE("dropped relay [fragment](%s) -> %s, max hops exceeded",fromAddr.toString().c_str(),destination.toString().c_str());
+ }
+ } else {
+ // Fragment looks like ours
+ const uint64_t fragmentPacketId = fragment.packetId();
+ const unsigned int fragmentNumber = fragment.fragmentNumber();
+ const unsigned int totalFragments = fragment.totalFragments();
+
+ if ((totalFragments <= ZT_MAX_PACKET_FRAGMENTS)&&(fragmentNumber < ZT_MAX_PACKET_FRAGMENTS)&&(fragmentNumber > 0)&&(totalFragments > 1)) {
+ // Fragment appears basically sane. Its fragment number must be
+ // 1 or more, since a Packet with fragmented bit set is fragment 0.
+ // Total fragments must be more than 1, otherwise why are we
+ // seeing a Packet::Fragment?
+
+ Mutex::Lock _l(_rxQueue_m);
+ RXQueueEntry *const rq = _findRXQueueEntry(now,fragmentPacketId);
+
+ if ((!rq->timestamp)||(rq->packetId != fragmentPacketId)) {
+ // No packet found, so we received a fragment without its head.
+ //TRACE("fragment (%u/%u) of %.16llx from %s",fragmentNumber + 1,totalFragments,fragmentPacketId,fromAddr.toString().c_str());
+
+ rq->timestamp = now;
+ rq->packetId = fragmentPacketId;
+ rq->frags[fragmentNumber - 1] = fragment;
+ rq->totalFragments = totalFragments; // total fragment count is known
+ rq->haveFragments = 1 << fragmentNumber; // we have only this fragment
+ rq->complete = false;
+ } else if (!(rq->haveFragments & (1 << fragmentNumber))) {
+ // We have other fragments and maybe the head, so add this one and check
+ //TRACE("fragment (%u/%u) of %.16llx from %s",fragmentNumber + 1,totalFragments,fragmentPacketId,fromAddr.toString().c_str());
+
+ rq->frags[fragmentNumber - 1] = fragment;
+ rq->totalFragments = totalFragments;
+
+ if (Utils::countBits(rq->haveFragments |= (1 << fragmentNumber)) == totalFragments) {
+ // We have all fragments -- assemble and process full Packet
+ //TRACE("packet %.16llx is complete, assembling and processing...",fragmentPacketId);
+
+ for(unsigned int f=1;f<totalFragments;++f)
+ rq->frag0.append(rq->frags[f - 1].payload(),rq->frags[f - 1].payloadLength());
+
+ if (rq->frag0.tryDecode(RR,false)) {
+ rq->timestamp = 0; // packet decoded, free entry
+ } else {
+ rq->complete = true; // set complete flag but leave entry since it probably needs WHOIS or something
+ }
+ }
+ } // else this is a duplicate fragment, ignore
+ }
+ }
+
+ // --------------------------------------------------------------------
+ } else if (len >= ZT_PROTO_MIN_PACKET_LENGTH) { // min length check is important!
+ // Handle packet head -------------------------------------------------
+
+ // See packet format in Packet.hpp to understand this
+ const uint64_t packetId = (
+ (((uint64_t)reinterpret_cast<const uint8_t *>(data)[0]) << 56) |
+ (((uint64_t)reinterpret_cast<const uint8_t *>(data)[1]) << 48) |
+ (((uint64_t)reinterpret_cast<const uint8_t *>(data)[2]) << 40) |
+ (((uint64_t)reinterpret_cast<const uint8_t *>(data)[3]) << 32) |
+ (((uint64_t)reinterpret_cast<const uint8_t *>(data)[4]) << 24) |
+ (((uint64_t)reinterpret_cast<const uint8_t *>(data)[5]) << 16) |
+ (((uint64_t)reinterpret_cast<const uint8_t *>(data)[6]) << 8) |
+ ((uint64_t)reinterpret_cast<const uint8_t *>(data)[7])
+ );
+ const Address destination(reinterpret_cast<const uint8_t *>(data) + 8,ZT_ADDRESS_LENGTH);
+ const Address source(reinterpret_cast<const uint8_t *>(data) + 13,ZT_ADDRESS_LENGTH);
+
+ // Catch this and toss it -- it would never work, but it could happen if we somehow
+ // mistakenly guessed an address we're bound to as a destination for another peer.
+ if (source == RR->identity.address())
+ return;
+
+ //TRACE("<< %.16llx %s -> %s (size: %u)",(unsigned long long)packet->packetId(),source.toString().c_str(),destination.toString().c_str(),packet->size());
+
+ if (destination != RR->identity.address()) {
+ Packet packet(data,len);
+
+ // Packet is not for us, so try to relay it
+ if (packet.hops() < ZT_RELAY_MAX_HOPS) {
+ packet.incrementHops();
+
+ SharedPtr<Peer> relayTo = RR->topology->getPeer(destination);
+ if ((relayTo)&&((relayTo->send(packet.data(),packet.size(),now)))) {
+ Mutex::Lock _l(_lastUniteAttempt_m);
+ uint64_t &luts = _lastUniteAttempt[_LastUniteKey(source,destination)];
+ if ((now - luts) >= ZT_MIN_UNITE_INTERVAL) {
+ luts = now;
+ unite(source,destination);
+ }
+ } else {
+#ifdef ZT_ENABLE_CLUSTER
+ if (RR->cluster) {
+ bool shouldUnite;
+ {
+ Mutex::Lock _l(_lastUniteAttempt_m);
+ uint64_t &luts = _lastUniteAttempt[_LastUniteKey(source,destination)];
+ shouldUnite = ((now - luts) >= ZT_MIN_UNITE_INTERVAL);
+ if (shouldUnite)
+ luts = now;
+ }
+ RR->cluster->sendViaCluster(source,destination,packet.data(),packet.size(),shouldUnite);
+ return;
+ }
+#endif
+
+ relayTo = RR->topology->getBestRoot(&source,1,true);
+ if (relayTo)
+ relayTo->send(packet.data(),packet.size(),now);
+ }
+ } else {
+ TRACE("dropped relay %s(%s) -> %s, max hops exceeded",packet.source().toString().c_str(),fromAddr.toString().c_str(),destination.toString().c_str());
+ }
+ } else if ((reinterpret_cast<const uint8_t *>(data)[ZT_PACKET_IDX_FLAGS] & ZT_PROTO_FLAG_FRAGMENTED) != 0) {
+ // Packet is the head of a fragmented packet series
+
+ Mutex::Lock _l(_rxQueue_m);
+ RXQueueEntry *const rq = _findRXQueueEntry(now,packetId);
+
+ if ((!rq->timestamp)||(rq->packetId != packetId)) {
+ // If we have no other fragments yet, create an entry and save the head
+ //TRACE("fragment (0/?) of %.16llx from %s",pid,fromAddr.toString().c_str());
+
+ rq->timestamp = now;
+ rq->packetId = packetId;
+ rq->frag0.init(data,len,localAddr,fromAddr,now);
+ rq->totalFragments = 0;
+ rq->haveFragments = 1;
+ rq->complete = false;
+ } else if (!(rq->haveFragments & 1)) {
+ // If we have other fragments but no head, see if we are complete with the head
+
+ if ((rq->totalFragments > 1)&&(Utils::countBits(rq->haveFragments |= 1) == rq->totalFragments)) {
+ // We have all fragments -- assemble and process full Packet
+ //TRACE("packet %.16llx is complete, assembling and processing...",pid);
+
+ rq->frag0.init(data,len,localAddr,fromAddr,now);
+ for(unsigned int f=1;f<rq->totalFragments;++f)
+ rq->frag0.append(rq->frags[f - 1].payload(),rq->frags[f - 1].payloadLength());
+
+ if (rq->frag0.tryDecode(RR,false)) {
+ rq->timestamp = 0; // packet decoded, free entry
+ } else {
+ rq->complete = true; // set complete flag but leave entry since it probably needs WHOIS or something
+ }
+ } else {
+ // Still waiting on more fragments, but keep the head
+ rq->frag0.init(data,len,localAddr,fromAddr,now);
+ }
+ } // else this is a duplicate head, ignore
+ } else {
+ // Packet is unfragmented, so just process it
+ IncomingPacket packet(data,len,localAddr,fromAddr,now);
+ if (!packet.tryDecode(RR,false)) {
+ Mutex::Lock _l(_rxQueue_m);
+ RXQueueEntry *rq = &(_rxQueue[ZT_RX_QUEUE_SIZE - 1]);
+ unsigned long i = ZT_RX_QUEUE_SIZE - 1;
+ while ((i)&&(rq->timestamp)) {
+ RXQueueEntry *tmp = &(_rxQueue[--i]);
+ if (tmp->timestamp < rq->timestamp)
+ rq = tmp;
+ }
+ rq->timestamp = now;
+ rq->packetId = packetId;
+ rq->frag0 = packet;
+ rq->totalFragments = 1;
+ rq->haveFragments = 1;
+ rq->complete = true;
+ }
+ }
+
+ // --------------------------------------------------------------------
}
}
} catch (std::exception &ex) {
@@ -451,10 +650,13 @@ void Switch::doAnythingWaitingForPeer(const SharedPtr<Peer> &peer)
{ // finish processing any packets waiting on peer's public key / identity
Mutex::Lock _l(_rxQueue_m);
- for(std::list< SharedPtr<IncomingPacket> >::iterator rxi(_rxQueue.begin());rxi!=_rxQueue.end();) {
- if ((*rxi)->tryDecode(RR,false))
- _rxQueue.erase(rxi++);
- else ++rxi;
+ unsigned long i = ZT_RX_QUEUE_SIZE;
+ while (i) {
+ RXQueueEntry *rq = &(_rxQueue[--i]);
+ if ((rq->timestamp)&&(rq->complete)) {
+ if (rq->frag0.tryDecode(RR,false))
+ rq->timestamp = 0;
+ }
}
}
@@ -478,31 +680,31 @@ unsigned long Switch::doTimerTasks(uint64_t now)
Mutex::Lock _l(_contactQueue_m);
for(std::list<ContactQueueEntry>::iterator qi(_contactQueue.begin());qi!=_contactQueue.end();) {
if (now >= qi->fireAtTime) {
- if (qi->peer->hasActiveDirectPath(now)) {
- // Cancel if connection has succeeded
+ if (!qi->peer->pushDirectPaths(qi->localAddr,qi->inaddr,now,true))
+ qi->peer->sendHELLO(qi->localAddr,qi->inaddr,now);
+ _contactQueue.erase(qi++);
+ continue;
+ /* Old symmetric NAT buster code, obsoleted by port prediction alg in SelfAwareness but left around for now in case we revert
+ if (qi->strategyIteration == 0) {
+ // First strategy: send packet directly to destination
+ qi->peer->sendHELLO(qi->localAddr,qi->inaddr,now);
+ } else if (qi->strategyIteration <= 3) {
+ // Strategies 1-3: try escalating ports for symmetric NATs that remap sequentially
+ InetAddress tmpaddr(qi->inaddr);
+ int p = (int)qi->inaddr.port() + qi->strategyIteration;
+ if (p > 65535)
+ p -= 64511;
+ tmpaddr.setPort((unsigned int)p);
+ qi->peer->sendHELLO(qi->localAddr,tmpaddr,now);
+ } else {
+ // All strategies tried, expire entry
_contactQueue.erase(qi++);
continue;
- } else {
- if (qi->strategyIteration == 0) {
- // First strategy: send packet directly to destination
- qi->peer->sendHELLO(qi->localAddr,qi->inaddr,now);
- } else if (qi->strategyIteration <= 3) {
- // Strategies 1-3: try escalating ports for symmetric NATs that remap sequentially
- InetAddress tmpaddr(qi->inaddr);
- int p = (int)qi->inaddr.port() + qi->strategyIteration;
- if (p < 0xffff) {
- tmpaddr.setPort((unsigned int)p);
- qi->peer->sendHELLO(qi->localAddr,tmpaddr,now);
- } else qi->strategyIteration = 5;
- } else {
- // All strategies tried, expire entry
- _contactQueue.erase(qi++);
- continue;
- }
- ++qi->strategyIteration;
- qi->fireAtTime = now + ZT_NAT_T_TACTICAL_ESCALATION_DELAY;
- nextDelay = std::min(nextDelay,(unsigned long)ZT_NAT_T_TACTICAL_ESCALATION_DELAY);
}
+ ++qi->strategyIteration;
+ qi->fireAtTime = now + ZT_NAT_T_TACTICAL_ESCALATION_DELAY;
+ nextDelay = std::min(nextDelay,(unsigned long)ZT_NAT_T_TACTICAL_ESCALATION_DELAY);
+ */
} else {
nextDelay = std::min(nextDelay,(unsigned long)(qi->fireAtTime - now));
}
@@ -546,29 +748,6 @@ unsigned long Switch::doTimerTasks(uint64_t now)
}
}
- { // Time out RX queue packets that never got WHOIS lookups or other info.
- Mutex::Lock _l(_rxQueue_m);
- for(std::list< SharedPtr<IncomingPacket> >::iterator i(_rxQueue.begin());i!=_rxQueue.end();) {
- if ((now - (*i)->receiveTime()) > ZT_RECEIVE_QUEUE_TIMEOUT) {
- TRACE("RX %s -> %s timed out",(*i)->source().toString().c_str(),(*i)->destination().toString().c_str());
- _rxQueue.erase(i++);
- } else ++i;
- }
- }
-
- { // Time out packets that didn't get all their fragments.
- Mutex::Lock _l(_defragQueue_m);
- Hashtable< uint64_t,DefragQueueEntry >::Iterator i(_defragQueue);
- uint64_t *packetId = (uint64_t *)0;
- DefragQueueEntry *qe = (DefragQueueEntry *)0;
- while (i.next(packetId,qe)) {
- if ((now - qe->creationTime) > ZT_FRAGMENTED_PACKET_RECEIVE_TIMEOUT) {
- TRACE("incomplete fragmented packet %.16llx timed out, fragments discarded",*packetId);
- _defragQueue.erase(*packetId);
- }
- }
- }
-
{ // Remove really old last unite attempt entries to keep table size controlled
Mutex::Lock _l(_lastUniteAttempt_m);
Hashtable< _LastUniteKey,uint64_t >::Iterator i(_lastUniteAttempt);
@@ -583,180 +762,6 @@ unsigned long Switch::doTimerTasks(uint64_t now)
return nextDelay;
}
-void Switch::_handleRemotePacketFragment(const InetAddress &localAddr,const InetAddress &fromAddr,const void *data,unsigned int len)
-{
- Packet::Fragment fragment(data,len);
- Address destination(fragment.destination());
-
- if (destination != RR->identity.address()) {
- // Fragment is not for us, so try to relay it
- if (fragment.hops() < ZT_RELAY_MAX_HOPS) {
- fragment.incrementHops();
-
- // Note: we don't bother initiating NAT-t for fragments, since heads will set that off.
- // It wouldn't hurt anything, just redundant and unnecessary.
- SharedPtr<Peer> relayTo = RR->topology->getPeer(destination);
- if ((!relayTo)||(!relayTo->send(fragment.data(),fragment.size(),RR->node->now()))) {
-#ifdef ZT_ENABLE_CLUSTER
- if (RR->cluster) {
- RR->cluster->sendViaCluster(Address(),destination,fragment.data(),fragment.size(),false);
- return;
- }
-#endif
-
- // Don't know peer or no direct path -- so relay via root server
- relayTo = RR->topology->getBestRoot();
- if (relayTo)
- relayTo->send(fragment.data(),fragment.size(),RR->node->now());
- }
- } else {
- TRACE("dropped relay [fragment](%s) -> %s, max hops exceeded",fromAddr.toString().c_str(),destination.toString().c_str());
- }
- } else {
- // Fragment looks like ours
- uint64_t pid = fragment.packetId();
- unsigned int fno = fragment.fragmentNumber();
- unsigned int tf = fragment.totalFragments();
-
- if ((tf <= ZT_MAX_PACKET_FRAGMENTS)&&(fno < ZT_MAX_PACKET_FRAGMENTS)&&(fno > 0)&&(tf > 1)) {
- // Fragment appears basically sane. Its fragment number must be
- // 1 or more, since a Packet with fragmented bit set is fragment 0.
- // Total fragments must be more than 1, otherwise why are we
- // seeing a Packet::Fragment?
-
- Mutex::Lock _l(_defragQueue_m);
- DefragQueueEntry &dq = _defragQueue[pid];
-
- if (!dq.creationTime) {
- // We received a Packet::Fragment without its head, so queue it and wait
-
- dq.creationTime = RR->node->now();
- dq.frags[fno - 1] = fragment;
- dq.totalFragments = tf; // total fragment count is known
- dq.haveFragments = 1 << fno; // we have only this fragment
- //TRACE("fragment (%u/%u) of %.16llx from %s",fno + 1,tf,pid,fromAddr.toString().c_str());
- } else if (!(dq.haveFragments & (1 << fno))) {
- // We have other fragments and maybe the head, so add this one and check
-
- dq.frags[fno - 1] = fragment;
- dq.totalFragments = tf;
- //TRACE("fragment (%u/%u) of %.16llx from %s",fno + 1,tf,pid,fromAddr.toString().c_str());
-
- if (Utils::countBits(dq.haveFragments |= (1 << fno)) == tf) {
- // We have all fragments -- assemble and process full Packet
- //TRACE("packet %.16llx is complete, assembling and processing...",pid);
-
- SharedPtr<IncomingPacket> packet(dq.frag0);
- for(unsigned int f=1;f<tf;++f)
- packet->append(dq.frags[f - 1].payload(),dq.frags[f - 1].payloadLength());
- _defragQueue.erase(pid); // dq no longer valid after this
-
- if (!packet->tryDecode(RR,false)) {
- Mutex::Lock _l(_rxQueue_m);
- _rxQueue.push_back(packet);
- }
- }
- } // else this is a duplicate fragment, ignore
- }
- }
-}
-
-void Switch::_handleRemotePacketHead(const InetAddress &localAddr,const InetAddress &fromAddr,const void *data,unsigned int len)
-{
- const uint64_t now = RR->node->now();
- SharedPtr<IncomingPacket> packet(new IncomingPacket(data,len,localAddr,fromAddr,now));
-
- Address source(packet->source());
- Address destination(packet->destination());
-
- // Catch this and toss it -- it would never work, but it could happen if we somehow
- // mistakenly guessed an address we're bound to as a destination for another peer.
- if (source == RR->identity.address())
- return;
-
- //TRACE("<< %.16llx %s -> %s (size: %u)",(unsigned long long)packet->packetId(),source.toString().c_str(),destination.toString().c_str(),packet->size());
-
- if (destination != RR->identity.address()) {
- // Packet is not for us, so try to relay it
- if (packet->hops() < ZT_RELAY_MAX_HOPS) {
- packet->incrementHops();
-
- SharedPtr<Peer> relayTo = RR->topology->getPeer(destination);
- if ((relayTo)&&((relayTo->send(packet->data(),packet->size(),now)))) {
- Mutex::Lock _l(_lastUniteAttempt_m);
- uint64_t &luts = _lastUniteAttempt[_LastUniteKey(source,destination)];
- if ((now - luts) >= ZT_MIN_UNITE_INTERVAL) {
- luts = now;
- unite(source,destination);
- }
- } else {
-#ifdef ZT_ENABLE_CLUSTER
- if (RR->cluster) {
- bool shouldUnite;
- {
- Mutex::Lock _l(_lastUniteAttempt_m);
- uint64_t &luts = _lastUniteAttempt[_LastUniteKey(source,destination)];
- shouldUnite = ((now - luts) >= ZT_MIN_UNITE_INTERVAL);
- if (shouldUnite)
- luts = now;
- }
- RR->cluster->sendViaCluster(source,destination,packet->data(),packet->size(),shouldUnite);
- return;
- }
-#endif
-
- relayTo = RR->topology->getBestRoot(&source,1,true);
- if (relayTo)
- relayTo->send(packet->data(),packet->size(),now);
- }
- } else {
- TRACE("dropped relay %s(%s) -> %s, max hops exceeded",packet->source().toString().c_str(),fromAddr.toString().c_str(),destination.toString().c_str());
- }
- } else if (packet->fragmented()) {
- // Packet is the head of a fragmented packet series
-
- uint64_t pid = packet->packetId();
- Mutex::Lock _l(_defragQueue_m);
- DefragQueueEntry &dq = _defragQueue[pid];
-
- if (!dq.creationTime) {
- // If we have no other fragments yet, create an entry and save the head
-
- dq.creationTime = now;
- dq.frag0 = packet;
- dq.totalFragments = 0; // 0 == unknown, waiting for Packet::Fragment
- dq.haveFragments = 1; // head is first bit (left to right)
- //TRACE("fragment (0/?) of %.16llx from %s",pid,fromAddr.toString().c_str());
- } else if (!(dq.haveFragments & 1)) {
- // If we have other fragments but no head, see if we are complete with the head
-
- if ((dq.totalFragments)&&(Utils::countBits(dq.haveFragments |= 1) == dq.totalFragments)) {
- // We have all fragments -- assemble and process full Packet
-
- //TRACE("packet %.16llx is complete, assembling and processing...",pid);
- // packet already contains head, so append fragments
- for(unsigned int f=1;f<dq.totalFragments;++f)
- packet->append(dq.frags[f - 1].payload(),dq.frags[f - 1].payloadLength());
- _defragQueue.erase(pid); // dq no longer valid after this
-
- if (!packet->tryDecode(RR,false)) {
- Mutex::Lock _l(_rxQueue_m);
- _rxQueue.push_back(packet);
- }
- } else {
- // Still waiting on more fragments, so queue the head
- dq.frag0 = packet;
- }
- } // else this is a duplicate head, ignore
- } else {
- // Packet is unfragmented, so just process it
- if (!packet->tryDecode(RR,false)) {
- Mutex::Lock _l(_rxQueue_m);
- _rxQueue.push_back(packet);
- }
- }
-}
-
Address Switch::_sendWhoisRequest(const Address &addr,const Address *peersAlreadyConsulted,unsigned int numPeersAlreadyConsulted)
{
SharedPtr<Peer> root(RR->topology->getBestRoot(peersAlreadyConsulted,numPeersAlreadyConsulted,false));
@@ -813,12 +818,13 @@ bool Switch::_trySend(const Packet &packet,bool encrypt,uint64_t nwid)
relay = RR->topology->getBestRoot();
if (!(relay)||(!(viaPath = relay->getBestPath(now))))
- return false; // no paths, no root servers?
+ return false; // no paths, no root servers?, no relays? :P~~~
}
if ((network)&&(relay)&&(network->isAllowed(peer))) {
// Push hints for direct connectivity to this peer if we are relaying
- peer->pushDirectPaths(viaPath,now,false);
+ peer->pushDirectPaths(viaPath->localAddress(),viaPath->address(),now,false);
+ viaPath->sent(now);
}
Packet tmp(packet);
diff --git a/node/Switch.hpp b/node/Switch.hpp
index f77bf86c..ce4f00a1 100644
--- a/node/Switch.hpp
+++ b/node/Switch.hpp
@@ -150,8 +150,6 @@ public:
unsigned long doTimerTasks(uint64_t now);
private:
- void _handleRemotePacketFragment(const InetAddress &localAddr,const InetAddress &fromAddr,const void *data,unsigned int len);
- void _handleRemotePacketHead(const InetAddress &localAddr,const InetAddress &fromAddr,const void *data,unsigned int len);
Address _sendWhoisRequest(const Address &addr,const Address *peersAlreadyConsulted,unsigned int numPeersAlreadyConsulted);
bool _trySend(const Packet &packet,bool encrypt,uint64_t nwid);
@@ -169,23 +167,40 @@ private:
Hashtable< Address,WhoisRequest > _outstandingWhoisRequests;
Mutex _outstandingWhoisRequests_m;
- // Packet defragmentation queue -- comes before RX queue in path
- struct DefragQueueEntry
+ // Packets waiting for WHOIS replies or other decode info or missing fragments
+ struct RXQueueEntry
{
- DefragQueueEntry() : creationTime(0),totalFragments(0),haveFragments(0) {}
- uint64_t creationTime;
- SharedPtr<IncomingPacket> frag0;
- Packet::Fragment frags[ZT_MAX_PACKET_FRAGMENTS - 1];
+ RXQueueEntry() : timestamp(0) {}
+ uint64_t timestamp; // 0 if entry is not in use
+ uint64_t packetId;
+ IncomingPacket frag0; // head of packet
+ Packet::Fragment frags[ZT_MAX_PACKET_FRAGMENTS - 1]; // later fragments (if any)
unsigned int totalFragments; // 0 if only frag0 received, waiting for frags
uint32_t haveFragments; // bit mask, LSB to MSB
+ bool complete; // if true, packet is complete
};
- Hashtable< uint64_t,DefragQueueEntry > _defragQueue;
- Mutex _defragQueue_m;
-
- // ZeroTier-layer RX queue of incoming packets in the process of being decoded
- std::list< SharedPtr<IncomingPacket> > _rxQueue;
+ RXQueueEntry _rxQueue[ZT_RX_QUEUE_SIZE];
Mutex _rxQueue_m;
+ /* Returns the matching or oldest entry. Caller must check timestamp and
+ * packet ID to determine which. */
+ inline RXQueueEntry *_findRXQueueEntry(uint64_t now,uint64_t packetId)
+ {
+ RXQueueEntry *rq;
+ RXQueueEntry *oldest = &(_rxQueue[ZT_RX_QUEUE_SIZE - 1]);
+ unsigned long i = ZT_RX_QUEUE_SIZE;
+ while (i) {
+ rq = &(_rxQueue[--i]);
+ if ((rq->packetId == packetId)&&(rq->timestamp))
+ return rq;
+ if ((now - rq->timestamp) >= ZT_RX_QUEUE_EXPIRE)
+ rq->timestamp = 0;
+ if (rq->timestamp < oldest->timestamp)
+ oldest = rq;
+ }
+ return oldest;
+ }
+
// ZeroTier-layer TX queue entry
struct TXQueueEntry
{
diff --git a/node/Utils.hpp b/node/Utils.hpp
index 17c00459..3f4cc765 100644
--- a/node/Utils.hpp
+++ b/node/Utils.hpp
@@ -51,9 +51,9 @@ public:
static inline bool secureEq(const void *a,const void *b,unsigned int len)
throw()
{
- char diff = 0;
+ uint8_t diff = 0;
for(unsigned int i=0;i<len;++i)
- diff |= ( (reinterpret_cast<const char *>(a))[i] ^ (reinterpret_cast<const char *>(b))[i] );
+ diff |= ( (reinterpret_cast<const uint8_t *>(a))[i] ^ (reinterpret_cast<const uint8_t *>(b))[i] );
return (diff == 0);
}