diff options
Diffstat (limited to 'node')
| -rw-r--r-- | node/CertificateOfMembership.hpp | 10 | ||||
| -rw-r--r-- | node/Cluster.cpp | 15 | ||||
| -rw-r--r-- | node/Cluster.hpp | 13 | ||||
| -rw-r--r-- | node/Constants.hpp | 63 | ||||
| -rw-r--r-- | node/DeferredPackets.cpp | 48 | ||||
| -rw-r--r-- | node/DeferredPackets.hpp | 13 | ||||
| -rw-r--r-- | node/IncomingPacket.cpp | 23 | ||||
| -rw-r--r-- | node/IncomingPacket.hpp | 47 | ||||
| -rw-r--r-- | node/InetAddress.cpp | 4 | ||||
| -rw-r--r-- | node/Node.cpp | 11 | ||||
| -rw-r--r-- | node/Packet.hpp | 2 | ||||
| -rw-r--r-- | node/Path.hpp | 10 | ||||
| -rw-r--r-- | node/Peer.cpp | 115 | ||||
| -rw-r--r-- | node/Peer.hpp | 6 | ||||
| -rw-r--r-- | node/SelfAwareness.cpp | 86 | ||||
| -rw-r--r-- | node/SelfAwareness.hpp | 15 | ||||
| -rw-r--r-- | node/Switch.cpp | 470 | ||||
| -rw-r--r-- | node/Switch.hpp | 41 | ||||
| -rw-r--r-- | node/Utils.hpp | 4 |
19 files changed, 587 insertions, 409 deletions
diff --git a/node/CertificateOfMembership.hpp b/node/CertificateOfMembership.hpp index c6d59397..9a279883 100644 --- a/node/CertificateOfMembership.hpp +++ b/node/CertificateOfMembership.hpp @@ -33,6 +33,16 @@ #include "Identity.hpp" #include "Utils.hpp" +/** + * Default window of time for certificate agreement + * + * Right now we use time for 'revision' so this is the maximum time divergence + * between two certs for them to agree. It comes out to five minutes, which + * gives a lot of margin for error if the controller hiccups or its clock + * drifts but causes de-authorized peers to fall off fast enough. + */ +#define ZT_NETWORK_COM_DEFAULT_REVISION_MAX_DELTA (ZT_NETWORK_AUTOCONF_DELAY * 5) + namespace ZeroTier { /** diff --git a/node/Cluster.cpp b/node/Cluster.cpp index e4c4524a..61903307 100644 --- a/node/Cluster.cpp +++ b/node/Cluster.cpp @@ -570,10 +570,19 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee Mutex::Lock _l2(_members[mostRecentMemberId].lock); if (buf.size() > 0) _send(mostRecentMemberId,CLUSTER_MESSAGE_PROXY_UNITE,buf.data(),buf.size()); - if (_members[mostRecentMemberId].zeroTierPhysicalEndpoints.size() > 0) { - TRACE("sendViaCluster relaying %u bytes from %s to %s by way of %u",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId); - RR->node->putPacket(InetAddress(),_members[mostRecentMemberId].zeroTierPhysicalEndpoints.front(),data,len); + + for(std::vector<InetAddress>::const_iterator i1(_zeroTierPhysicalEndpoints.begin());i1!=_zeroTierPhysicalEndpoints.end();++i1) { + for(std::vector<InetAddress>::const_iterator i2(_members[mostRecentMemberId].zeroTierPhysicalEndpoints.begin());i2!=_members[mostRecentMemberId].zeroTierPhysicalEndpoints.end();++i2) { + if (i1->ss_family == i2->ss_family) { + TRACE("sendViaCluster relaying %u bytes from %s to %s by way of %u (%s->%s)",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId,i1->toString().c_str(),i2->toString().c_str()); + RR->node->putPacket(*i1,*i2,data,len); + return; + } + } } + + TRACE("sendViaCluster relaying %u bytes from %s to %s by way of %u failed: no common endpoints with the same address family!",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId); + return; } } diff --git a/node/Cluster.hpp b/node/Cluster.hpp index e21d6020..dafbf425 100644 --- a/node/Cluster.hpp +++ b/node/Cluster.hpp @@ -47,22 +47,22 @@ /** * Desired period between doPeriodicTasks() in milliseconds */ -#define ZT_CLUSTER_PERIODIC_TASK_PERIOD 50 +#define ZT_CLUSTER_PERIODIC_TASK_PERIOD 20 /** * How often to flush outgoing message queues (maximum interval) */ -#define ZT_CLUSTER_FLUSH_PERIOD 100 +#define ZT_CLUSTER_FLUSH_PERIOD ZT_CLUSTER_PERIODIC_TASK_PERIOD /** * Maximum number of queued outgoing packets per sender address */ -#define ZT_CLUSTER_MAX_QUEUE_PER_SENDER 8 +#define ZT_CLUSTER_MAX_QUEUE_PER_SENDER 16 /** * Expiration time for send queue entries */ -#define ZT_CLUSTER_QUEUE_EXPIRATION 5000 +#define ZT_CLUSTER_QUEUE_EXPIRATION 3000 /** * Chunk size for allocating queue entries @@ -85,11 +85,8 @@ /** * Max data per queue entry - * - * If we ever support larger transport MTUs this must be increased. The plus - * 16 is just a small margin and has no special meaning. */ -#define ZT_CLUSTER_SEND_QUEUE_DATA_MAX (ZT_UDP_DEFAULT_PAYLOAD_MTU + 16) +#define ZT_CLUSTER_SEND_QUEUE_DATA_MAX 1500 namespace ZeroTier { diff --git a/node/Constants.hpp b/node/Constants.hpp index 4d6c9d07..6c44a8dc 100644 --- a/node/Constants.hpp +++ b/node/Constants.hpp @@ -51,8 +51,20 @@ #include <endian.h> #endif -// Disable type punning on ARM architecture -- some ARM chips throw SIGBUS on unaligned access -#if defined(__arm__) || defined(__ARMEL__) +#ifdef __APPLE__ +#include <TargetConditionals.h> +#ifndef __UNIX_LIKE__ +#define __UNIX_LIKE__ +#endif +#ifndef __BSD__ +#define __BSD__ +#endif +#include <machine/endian.h> +#endif + +// Defined this macro to disable "type punning" on a number of targets that +// have issues with unaligned memory access. +#if defined(__arm__) || defined(__ARMEL__) || (defined(__APPLE__) && ( (defined(TARGET_OS_IPHONE) && (TARGET_OS_IPHONE != 0)) || (defined(TARGET_OS_WATCH) && (TARGET_OS_WATCH != 0)) || (defined(TARGET_IPHONE_SIMULATOR) && (TARGET_IPHONE_SIMULATOR != 0)) ) ) #ifndef ZT_NO_TYPE_PUNNING #define ZT_NO_TYPE_PUNNING #endif @@ -73,18 +85,6 @@ #endif #endif -// TODO: Android is what? Linux technically, but does it define it? - -#ifdef __APPLE__ -#include <TargetConditionals.h> -#ifndef __UNIX_LIKE__ -#define __UNIX_LIKE__ -#endif -#ifndef __BSD__ -#define __BSD__ -#endif -#endif - #if defined(_WIN32) || defined(_WIN64) #ifndef __WINDOWS__ #define __WINDOWS__ @@ -104,9 +104,8 @@ #include <Windows.h> #endif -// Assume these are little-endian. PPC is not supported for OSX, and ARM -// runs in little-endian mode for these OS families. -#if defined(__APPLE__) || defined(__WINDOWS__) +// Assume little endian if not defined +#if (defined(__APPLE__) || defined(__WINDOWS__)) && (!defined(__BYTE_ORDER)) #undef __BYTE_ORDER #undef __LITTLE_ENDIAN #undef __BIG_ENDIAN @@ -163,9 +162,17 @@ #define ZT_MAX_PACKET_FRAGMENTS 4 /** - * Timeout for receipt of fragmented packets in ms + * Size of RX queue + * + * This is about 2mb, and can be decreased for small devices. A queue smaller + * than about 4 is probably going to cause a lot of lost packets. + */ +#define ZT_RX_QUEUE_SIZE 64 + +/** + * RX queue entries older than this do not "exist" */ -#define ZT_FRAGMENTED_PACKET_RECEIVE_TIMEOUT 500 +#define ZT_RX_QUEUE_EXPIRE 4000 /** * Length of secret key in bytes -- 256-bit -- do not change @@ -255,12 +262,17 @@ /** * Delay between ordinary case pings of direct links */ -#define ZT_PEER_DIRECT_PING_DELAY 90000 +#define ZT_PEER_DIRECT_PING_DELAY 60000 /** * Timeout for overall peer activity (measured from last receive) */ -#define ZT_PEER_ACTIVITY_TIMEOUT ((ZT_PEER_DIRECT_PING_DELAY * 4) + ZT_PING_CHECK_INVERVAL) +#define ZT_PEER_ACTIVITY_TIMEOUT 500000 + +/** + * Timeout for path activity + */ +#define ZT_PATH_ACTIVITY_TIMEOUT ZT_PEER_ACTIVITY_TIMEOUT /** * No answer timeout to trigger dead path detection @@ -354,6 +366,15 @@ */ #define ZT_TEST_NETWORK_ID 0xffffffffffffffffULL +/** + * Desired buffer size for UDP sockets (used in service and osdep but defined here) + */ +#if (defined(__amd64) || defined(__amd64__) || defined(__x86_64) || defined(__x86_64__) || defined(__AMD64) || defined(__AMD64__)) +#define ZT_UDP_DESIRED_BUF_SIZE 1048576 +#else +#define ZT_UDP_DESIRED_BUF_SIZE 131072 +#endif + /* Ethernet frame types that might be relevant to us */ #define ZT_ETHERTYPE_IPV4 0x0800 #define ZT_ETHERTYPE_ARP 0x0806 diff --git a/node/DeferredPackets.cpp b/node/DeferredPackets.cpp index c8e63fc8..192b4078 100644 --- a/node/DeferredPackets.cpp +++ b/node/DeferredPackets.cpp @@ -26,8 +26,6 @@ namespace ZeroTier { DeferredPackets::DeferredPackets(const RuntimeEnvironment *renv) : RR(renv), - _readPtr(0), - _writePtr(0), _waiting(0), _die(false) { @@ -37,39 +35,45 @@ DeferredPackets::~DeferredPackets() { _q_m.lock(); _die = true; - while (_waiting > 0) { - _q_m.unlock(); + _q_m.unlock(); + + for(;;) { _q_s.post(); + _q_m.lock(); + if (_waiting <= 0) { + _q_m.unlock(); + break; + } else { + _q_m.unlock(); + } } } bool DeferredPackets::enqueue(IncomingPacket *pkt) { - _q_m.lock(); - const unsigned long p = _writePtr % ZT_DEFFEREDPACKETS_MAX; - if (_q[p]) { - _q_m.unlock(); - return false; - } else { - _q[p].setToUnsafe(pkt); - ++_writePtr; - _q_m.unlock(); - _q_s.post(); - return true; + { + Mutex::Lock _l(_q_m); + if (_q.size() >= ZT_DEFFEREDPACKETS_MAX) + return false; + _q.push_back(*pkt); } + _q_s.post(); + return true; } int DeferredPackets::process() { - SharedPtr<IncomingPacket> pkt; + std::list<IncomingPacket> pkt; _q_m.lock(); + if (_die) { _q_m.unlock(); return -1; } - while (_readPtr == _writePtr) { + + while (_q.empty()) { ++_waiting; _q_m.unlock(); _q_s.wait(); @@ -80,10 +84,16 @@ int DeferredPackets::process() return -1; } } - pkt.swap(_q[_readPtr++ % ZT_DEFFEREDPACKETS_MAX]); + + // Move item from _q list to a dummy list here to avoid copying packet + pkt.splice(pkt.end(),_q,_q.begin()); + _q_m.unlock(); - pkt->tryDecode(RR,true); + try { + pkt.front().tryDecode(RR,true); + } catch ( ... ) {} // drop invalids + return 1; } diff --git a/node/DeferredPackets.hpp b/node/DeferredPackets.hpp index 5ba26531..a9855396 100644 --- a/node/DeferredPackets.hpp +++ b/node/DeferredPackets.hpp @@ -19,6 +19,8 @@ #ifndef ZT_DEFERREDPACKETS_HPP #define ZT_DEFERREDPACKETS_HPP +#include <list> + #include "Constants.hpp" #include "SharedPtr.hpp" #include "Mutex.hpp" @@ -28,7 +30,7 @@ /** * Maximum number of deferred packets */ -#define ZT_DEFFEREDPACKETS_MAX 1024 +#define ZT_DEFFEREDPACKETS_MAX 256 namespace ZeroTier { @@ -53,11 +55,6 @@ public: /** * Enqueue a packet * - * Since packets enqueue themselves, they call it with 'this' and we wrap - * them in a SharedPtr<>. This is safe as SharedPtr<> is introspective and - * supports this. This should not be called from any other code outside - * IncomingPacket. - * * @param pkt Packet to process later (possibly in the background) * @return False if queue is full */ @@ -75,10 +72,8 @@ public: int process(); private: - SharedPtr<IncomingPacket> _q[ZT_DEFFEREDPACKETS_MAX]; + std::list<IncomingPacket> _q; const RuntimeEnvironment *const RR; - unsigned long _readPtr; - unsigned long _writePtr; volatile int _waiting; volatile bool _die; Mutex _q_m; diff --git a/node/IncomingPacket.cpp b/node/IncomingPacket.cpp index efb43d23..35c78b29 100644 --- a/node/IncomingPacket.cpp +++ b/node/IncomingPacket.cpp @@ -282,7 +282,7 @@ bool IncomingPacket::_doHELLO(const RuntimeEnvironment *RR,SharedPtr<Peer> &peer } if (externalSurfaceAddress) - RR->sa->iam(id.address(),_remoteAddress,externalSurfaceAddress,RR->topology->isRoot(id),RR->node->now()); + RR->sa->iam(id.address(),_localAddress,_remoteAddress,externalSurfaceAddress,RR->topology->isRoot(id),RR->node->now()); Packet outp(id.address(),RR->identity.address(),Packet::VERB_OK); outp.append((unsigned char)Packet::VERB_HELLO); @@ -388,7 +388,7 @@ bool IncomingPacket::_doOK(const RuntimeEnvironment *RR,const SharedPtr<Peer> &p peer->setRemoteVersion(vProto,vMajor,vMinor,vRevision); if (externalSurfaceAddress) - RR->sa->iam(peer->address(),_remoteAddress,externalSurfaceAddress,trusted,RR->node->now()); + RR->sa->iam(peer->address(),_localAddress,_remoteAddress,externalSurfaceAddress,trusted,RR->node->now()); } break; case Packet::VERB_WHOIS: { @@ -934,10 +934,10 @@ bool IncomingPacket::_doPUSH_DIRECT_PATHS(const RuntimeEnvironment *RR,const Sha switch(addrType) { case 4: { InetAddress a(field(ptr,4),4,at<uint16_t>(ptr + 4)); - if ( ((flags & 0x01) == 0) && (Path::isAddressValidForPath(a)) && (!peer->hasActivePathTo(now,a)) && (RR->node->shouldUsePathForZeroTierTraffic(_localAddress,a)) ) { + if ( ((flags & 0x01) == 0) && (!peer->hasActivePathTo(now,a)) && (RR->node->shouldUsePathForZeroTierTraffic(_localAddress,a)) ) { if (++countPerScope[(int)a.ipScope()][0] <= ZT_PUSH_DIRECT_PATHS_MAX_PER_SCOPE_AND_FAMILY) { TRACE("attempting to contact %s at pushed direct path %s",peer->address().toString().c_str(),a.toString().c_str()); - peer->sendHELLO(_localAddress,a,now); + peer->sendHELLO(InetAddress(),a,now); } else { TRACE("ignoring contact for %s at %s -- too many per scope",peer->address().toString().c_str(),a.toString().c_str()); } @@ -945,10 +945,10 @@ bool IncomingPacket::_doPUSH_DIRECT_PATHS(const RuntimeEnvironment *RR,const Sha } break; case 6: { InetAddress a(field(ptr,16),16,at<uint16_t>(ptr + 16)); - if ( ((flags & 0x01) == 0) && (Path::isAddressValidForPath(a)) && (!peer->hasActivePathTo(now,a)) && (RR->node->shouldUsePathForZeroTierTraffic(_localAddress,a)) ) { + if ( ((flags & 0x01) == 0) && (!peer->hasActivePathTo(now,a)) && (RR->node->shouldUsePathForZeroTierTraffic(_localAddress,a)) ) { if (++countPerScope[(int)a.ipScope()][1] <= ZT_PUSH_DIRECT_PATHS_MAX_PER_SCOPE_AND_FAMILY) { TRACE("attempting to contact %s at pushed direct path %s",peer->address().toString().c_str(),a.toString().c_str()); - peer->sendHELLO(_localAddress,a,now); + peer->sendHELLO(InetAddress(),a,now); } else { TRACE("ignoring contact for %s at %s -- too many per scope",peer->address().toString().c_str(),a.toString().c_str()); } @@ -1016,8 +1016,9 @@ bool IncomingPacket::_doCIRCUIT_TEST(const RuntimeEnvironment *RR,const SharedPt if (previousHopCredentialLength >= 1) { switch((*this)[ZT_PACKET_IDX_PAYLOAD + 31 + vlf]) { case 0x01: { // network certificate of membership for previous hop - if (previousHopCom.deserialize(*this,ZT_PACKET_IDX_PAYLOAD + 32 + vlf) != (previousHopCredentialLength - 1)) { - TRACE("dropped CIRCUIT_TEST from %s(%s): previous hop COM invalid",source().toString().c_str(),_remoteAddress.toString().c_str()); + const unsigned int phcl = previousHopCom.deserialize(*this,ZT_PACKET_IDX_PAYLOAD + 32 + vlf); + if (phcl != (previousHopCredentialLength - 1)) { + TRACE("dropped CIRCUIT_TEST from %s(%s): previous hop COM invalid (%u != %u)",source().toString().c_str(),_remoteAddress.toString().c_str(),phcl,(previousHopCredentialLength - 1)); return true; } } break; @@ -1033,7 +1034,7 @@ bool IncomingPacket::_doCIRCUIT_TEST(const RuntimeEnvironment *RR,const SharedPt SharedPtr<Network> nw(RR->node->network(originatorCredentialNetworkId)); if (nw) { originatorCredentialNetworkConfig = nw->config2(); - if ( (originatorCredentialNetworkConfig) && ((originatorCredentialNetworkConfig->isPublic())||(peer->address() == originatorAddress)||((originatorCredentialNetworkConfig->com())&&(previousHopCom)&&(originatorCredentialNetworkConfig->com().agreesWith(previousHopCom)))) ) { + if ( (originatorCredentialNetworkConfig) && ( (originatorCredentialNetworkConfig->isPublic()) || (peer->address() == originatorAddress) || ((originatorCredentialNetworkConfig->com())&&(previousHopCom)&&(originatorCredentialNetworkConfig->com().agreesWith(previousHopCom))) ) ) { TRACE("CIRCUIT_TEST %.16llx received from hop %s(%s) and originator %s with valid network ID credential %.16llx (verified from originator and next hop)",testId,source().toString().c_str(),_remoteAddress.toString().c_str(),originatorAddress.toString().c_str(),originatorCredentialNetworkId); } else { TRACE("dropped CIRCUIT_TEST from %s(%s): originator %s specified network ID %.16llx as credential, and previous hop %s did not supply a valid COM",source().toString().c_str(),_remoteAddress.toString().c_str(),originatorAddress.toString().c_str(),originatorCredentialNetworkId,peer->address().toString().c_str()); @@ -1078,7 +1079,7 @@ bool IncomingPacket::_doCIRCUIT_TEST(const RuntimeEnvironment *RR,const SharedPt Packet outp(originatorAddress,RR->identity.address(),Packet::VERB_CIRCUIT_TEST_REPORT); outp.append((uint64_t)timestamp); outp.append((uint64_t)testId); - outp.append((uint64_t)now); + outp.append((uint64_t)0); // field reserved for future use outp.append((uint8_t)ZT_VENDOR_ZEROTIER); outp.append((uint8_t)ZT_PROTO_VERSION); outp.append((uint8_t)ZEROTIER_ONE_VERSION_MAJOR); @@ -1111,7 +1112,7 @@ bool IncomingPacket::_doCIRCUIT_TEST(const RuntimeEnvironment *RR,const SharedPt if ((originatorCredentialNetworkConfig)&&(!originatorCredentialNetworkConfig->isPublic())&&(originatorCredentialNetworkConfig->com())) { outp.append((uint8_t)0x01); // COM originatorCredentialNetworkConfig->com().serialize(outp); - outp.setAt<uint16_t>(previousHopCredentialPos,(uint16_t)(size() - previousHopCredentialPos)); + outp.setAt<uint16_t>(previousHopCredentialPos,(uint16_t)(outp.size() - (previousHopCredentialPos + 2))); } if (remainingHopsPtr < size()) outp.append(field(remainingHopsPtr,size() - remainingHopsPtr),size() - remainingHopsPtr); diff --git a/node/IncomingPacket.hpp b/node/IncomingPacket.hpp index 0df20034..96e46c00 100644 --- a/node/IncomingPacket.hpp +++ b/node/IncomingPacket.hpp @@ -24,8 +24,6 @@ #include "Packet.hpp" #include "InetAddress.hpp" #include "Utils.hpp" -#include "SharedPtr.hpp" -#include "AtomicCounter.hpp" #include "MulticastGroup.hpp" #include "Peer.hpp" @@ -55,9 +53,21 @@ class Network; */ class IncomingPacket : public Packet { - friend class SharedPtr<IncomingPacket>; - public: + IncomingPacket() : + Packet(), + _receiveTime(0), + _localAddress(), + _remoteAddress() + { + } + + IncomingPacket(const IncomingPacket &p) + { + // All fields including InetAddress are memcpy'able + memcpy(this,&p,sizeof(IncomingPacket)); + } + /** * Create a new packet-in-decode * @@ -72,9 +82,33 @@ public: Packet(data,len), _receiveTime(now), _localAddress(localAddress), - _remoteAddress(remoteAddress), - __refCount() + _remoteAddress(remoteAddress) + { + } + + inline IncomingPacket &operator=(const IncomingPacket &p) + { + // All fields including InetAddress are memcpy'able + memcpy(this,&p,sizeof(IncomingPacket)); + return *this; + } + + /** + * Init packet-in-decode in place + * + * @param data Packet data + * @param len Packet length + * @param localAddress Local interface address + * @param remoteAddress Address from which packet came + * @param now Current time + * @throws std::out_of_range Range error processing packet + */ + inline void init(const void *data,unsigned int len,const InetAddress &localAddress,const InetAddress &remoteAddress,uint64_t now) { + copyFrom(data,len); + _receiveTime = now; + _localAddress = localAddress; + _remoteAddress = remoteAddress; } /** @@ -154,7 +188,6 @@ private: uint64_t _receiveTime; InetAddress _localAddress; InetAddress _remoteAddress; - AtomicCounter __refCount; }; } // namespace ZeroTier diff --git a/node/InetAddress.cpp b/node/InetAddress.cpp index d5cd227c..dca772e8 100644 --- a/node/InetAddress.cpp +++ b/node/InetAddress.cpp @@ -127,8 +127,10 @@ void InetAddress::set(const void *ipBytes,unsigned int ipLen,unsigned int port) { memset(this,0,sizeof(InetAddress)); if (ipLen == 4) { + uint32_t ipb[1]; + memcpy(ipb,ipBytes,4); ss_family = AF_INET; - reinterpret_cast<struct sockaddr_in *>(this)->sin_addr.s_addr = *(reinterpret_cast<const uint32_t *>(ipBytes)); + reinterpret_cast<struct sockaddr_in *>(this)->sin_addr.s_addr = ipb[0]; reinterpret_cast<struct sockaddr_in *>(this)->sin_port = Utils::hton((uint16_t)port); } else if (ipLen == 16) { ss_family = AF_INET6; diff --git a/node/Node.cpp b/node/Node.cpp index bdeca701..1934ef7d 100644 --- a/node/Node.cpp +++ b/node/Node.cpp @@ -493,10 +493,10 @@ int Node::addLocalInterfaceAddress(const struct sockaddr_storage *addr) { if (Path::isAddressValidForPath(*(reinterpret_cast<const InetAddress *>(addr)))) { Mutex::Lock _l(_directPaths_m); - _directPaths.push_back(*(reinterpret_cast<const InetAddress *>(addr))); - std::sort(_directPaths.begin(),_directPaths.end()); - _directPaths.erase(std::unique(_directPaths.begin(),_directPaths.end()),_directPaths.end()); - return 1; + if (std::find(_directPaths.begin(),_directPaths.end(),*(reinterpret_cast<const InetAddress *>(addr))) == _directPaths.end()) { + _directPaths.push_back(*(reinterpret_cast<const InetAddress *>(addr))); + return 1; + } } return 0; } @@ -670,6 +670,9 @@ std::string Node::dataStoreGet(const char *name) bool Node::shouldUsePathForZeroTierTraffic(const InetAddress &localAddress,const InetAddress &remoteAddress) { + if (!Path::isAddressValidForPath(remoteAddress)) + return false; + { Mutex::Lock _l(_networks_m); for(std::vector< std::pair< uint64_t, SharedPtr<Network> > >::const_iterator i=_networks.begin();i!=_networks.end();++i) { diff --git a/node/Packet.hpp b/node/Packet.hpp index 2381397b..7d1e5c68 100644 --- a/node/Packet.hpp +++ b/node/Packet.hpp @@ -934,7 +934,7 @@ public: * Circuit test hop report: * <[8] 64-bit timestamp (from original test)> * <[8] 64-bit test ID (from original test)> - * <[8] 64-bit reporter timestamp (reporter's clock, 0 if unspec)> + * <[8] 64-bit reserved field (set to 0, currently unused)> * <[1] 8-bit vendor ID (set to 0, currently unused)> * <[1] 8-bit reporter protocol version> * <[1] 8-bit reporter major version> diff --git a/node/Path.hpp b/node/Path.hpp index 668728e0..cb7622d3 100644 --- a/node/Path.hpp +++ b/node/Path.hpp @@ -127,7 +127,7 @@ public: inline bool active(uint64_t now) const throw() { - return (((now - _lastReceived) < ZT_PEER_ACTIVITY_TIMEOUT)&&(_probation < ZT_PEER_DEAD_PATH_DETECTION_MAX_PROBATION)); + return (((now - _lastReceived) < ZT_PATH_ACTIVITY_TIMEOUT)&&(_probation < ZT_PEER_DEAD_PATH_DETECTION_MAX_PROBATION)); } /** @@ -243,6 +243,14 @@ public: case InetAddress::IP_SCOPE_PSEUDOPRIVATE: case InetAddress::IP_SCOPE_SHARED: case InetAddress::IP_SCOPE_GLOBAL: + if (a.ss_family == AF_INET6) { + // TEMPORARY HACK: for now, we are going to blacklist he.net IPv6 + // tunnels due to very spotty performance and low MTU issues over + // these IPv6 tunnel links. + const uint8_t *ipd = reinterpret_cast<const uint8_t *>(reinterpret_cast<const struct sockaddr_in6 *>(&a)->sin6_addr.s6_addr); + if ((ipd[0] == 0x20)&&(ipd[1] == 0x01)&&(ipd[2] == 0x04)&&(ipd[3] == 0x70)) + return false; + } return true; default: return false; diff --git a/node/Peer.cpp b/node/Peer.cpp index 17f9b2ef..87ca94e1 100644 --- a/node/Peer.cpp +++ b/node/Peer.cpp @@ -240,70 +240,83 @@ bool Peer::doPingAndKeepalive(uint64_t now,int inetAddressFamily) return false; } -void Peer::pushDirectPaths(Path *path,uint64_t now,bool force) +bool Peer::pushDirectPaths(const InetAddress &localAddr,const InetAddress &toAddress,uint64_t now,bool force) { #ifdef ZT_ENABLE_CLUSTER // Cluster mode disables normal PUSH_DIRECT_PATHS in favor of cluster-based peer redirection if (RR->cluster) - return; + return false; #endif - if (((now - _lastDirectPathPushSent) >= ZT_DIRECT_PATH_PUSH_INTERVAL)||(force)) { - _lastDirectPathPushSent = now; + if (!force) { + if ((now - _lastDirectPathPushSent) < ZT_DIRECT_PATH_PUSH_INTERVAL) + return false; + else _lastDirectPathPushSent = now; + } - std::vector<InetAddress> dps(RR->node->directPaths()); - if (dps.empty()) - return; + std::vector<InetAddress> dps(RR->node->directPaths()); + std::vector<InetAddress> sym(RR->sa->getSymmetricNatPredictions()); + for(unsigned long i=0,added=0;i<sym.size();++i) { + InetAddress tmp(sym[(unsigned long)RR->node->prng() % sym.size()]); + if (std::find(dps.begin(),dps.end(),tmp) == dps.end()) { + dps.push_back(tmp); + if (++added >= ZT_PUSH_DIRECT_PATHS_MAX_PER_SCOPE_AND_FAMILY) + break; + } + } + if (dps.empty()) + return false; #ifdef ZT_TRACE - { - std::string ps; - for(std::vector<InetAddress>::const_iterator p(dps.begin());p!=dps.end();++p) { - if (ps.length() > 0) - ps.push_back(','); - ps.append(p->toString()); - } - TRACE("pushing %u direct paths to %s: %s",(unsigned int)dps.size(),_id.address().toString().c_str(),ps.c_str()); + { + std::string ps; + for(std::vector<InetAddress>::const_iterator p(dps.begin());p!=dps.end();++p) { + if (ps.length() > 0) + ps.push_back(','); + ps.append(p->toString()); } + TRACE("pushing %u direct paths to %s: %s",(unsigned int)dps.size(),_id.address().toString().c_str(),ps.c_str()); + } #endif - std::vector<InetAddress>::const_iterator p(dps.begin()); - while (p != dps.end()) { - Packet outp(_id.address(),RR->identity.address(),Packet::VERB_PUSH_DIRECT_PATHS); - outp.addSize(2); // leave room for count - - unsigned int count = 0; - while ((p != dps.end())&&((outp.size() + 24) < ZT_PROTO_MAX_PACKET_LENGTH)) { - uint8_t addressType = 4; - switch(p->ss_family) { - case AF_INET: - break; - case AF_INET6: - addressType = 6; - break; - default: // we currently only push IP addresses - ++p; - continue; - } + std::vector<InetAddress>::const_iterator p(dps.begin()); + while (p != dps.end()) { + Packet outp(_id.address(),RR->identity.address(),Packet::VERB_PUSH_DIRECT_PATHS); + outp.addSize(2); // leave room for count + + unsigned int count = 0; + while ((p != dps.end())&&((outp.size() + 24) < 1200)) { + uint8_t addressType = 4; + switch(p->ss_family) { + case AF_INET: + break; + case AF_INET6: + addressType = 6; + break; + default: // we currently only push IP addresses + ++p; + continue; + } - outp.append((uint8_t)0); // no flags - outp.append((uint16_t)0); // no extensions - outp.append(addressType); - outp.append((uint8_t)((addressType == 4) ? 6 : 18)); - outp.append(p->rawIpData(),((addressType == 4) ? 4 : 16)); - outp.append((uint16_t)p->port()); + outp.append((uint8_t)0); // no flags + outp.append((uint16_t)0); // no extensions + outp.append(addressType); + outp.append((uint8_t)((addressType == 4) ? 6 : 18)); + outp.append(p->rawIpData(),((addressType == 4) ? 4 : 16)); + outp.append((uint16_t)p->port()); - ++count; - ++p; - } + ++count; + ++p; + } - if (count) { - outp.setAt(ZT_PACKET_IDX_PAYLOAD,(uint16_t)count); - outp.armor(_key,true); - path->send(RR,outp.data(),outp.size(),now); - } + if (count) { + outp.setAt(ZT_PACKET_IDX_PAYLOAD,(uint16_t)count); + outp.armor(_key,true); + RR->node->putPacket(localAddr,toAddress,outp.data(),outp.size(),0); } } + + return true; } bool Peer::resetWithinScope(InetAddress::IpScope scope,uint64_t now) @@ -373,7 +386,7 @@ bool Peer::validateAndSetNetworkMembershipCertificate(uint64_t nwid,const Certif // Check signature, log and return if cert is invalid if (com.signedBy() != Network::controllerFor(nwid)) { - TRACE("rejected network membership certificate for %.16llx signed by %s: signer not a controller of this network",(unsigned long long)_id,com.signedBy().toString().c_str()); + TRACE("rejected network membership certificate for %.16llx signed by %s: signer not a controller of this network",(unsigned long long)nwid,com.signedBy().toString().c_str()); return false; // invalid signer } @@ -382,7 +395,7 @@ bool Peer::validateAndSetNetworkMembershipCertificate(uint64_t nwid,const Certif // We are the controller: RR->identity.address() == controller() == cert.signedBy() // So, verify that we signed th cert ourself if (!com.verify(RR->identity)) { - TRACE("rejected network membership certificate for %.16llx self signed by %s: signature check failed",(unsigned long long)_id,com.signedBy().toString().c_str()); + TRACE("rejected network membership certificate for %.16llx self signed by %s: signature check failed",(unsigned long long)nwid,com.signedBy().toString().c_str()); return false; // invalid signature } @@ -398,7 +411,7 @@ bool Peer::validateAndSetNetworkMembershipCertificate(uint64_t nwid,const Certif } if (!com.verify(signer->identity())) { - TRACE("rejected network membership certificate for %.16llx signed by %s: signature check failed",(unsigned long long)_id,com.signedBy().toString().c_str()); + TRACE("rejected network membership certificate for %.16llx signed by %s: signature check failed",(unsigned long long)nwid,com.signedBy().toString().c_str()); return false; // invalid signature } } @@ -419,7 +432,7 @@ bool Peer::needsOurNetworkMembershipCertificate(uint64_t nwid,uint64_t now,bool const uint64_t tmp = lastPushed; if (updateLastPushedTime) lastPushed = now; - return ((now - tmp) >= (ZT_NETWORK_AUTOCONF_DELAY / 2)); + return ((now - tmp) >= (ZT_NETWORK_AUTOCONF_DELAY / 3)); } void Peer::clean(uint64_t now) diff --git a/node/Peer.hpp b/node/Peer.hpp index 5796baf9..94c58ae8 100644 --- a/node/Peer.hpp +++ b/node/Peer.hpp @@ -170,11 +170,13 @@ public: /** * Push direct paths back to self if we haven't done so in the configured timeout * - * @param path Remote path to use to send the push + * @param localAddr Local address + * @param toAddress Remote address to send push to (usually from path) * @param now Current time * @param force If true, push regardless of rate limit + * @return True if something was actually sent */ - void pushDirectPaths(Path *path,uint64_t now,bool force); + bool pushDirectPaths(const InetAddress &localAddr,const InetAddress &toAddress,uint64_t now,bool force); /** * @return All known direct paths to this peer (active or inactive) diff --git a/node/SelfAwareness.cpp b/node/SelfAwareness.cpp index db069046..8bed0c51 100644 --- a/node/SelfAwareness.cpp +++ b/node/SelfAwareness.cpp @@ -20,6 +20,9 @@ #include <stdlib.h> #include <string.h> +#include <set> +#include <vector> + #include "Constants.hpp" #include "SelfAwareness.hpp" #include "RuntimeEnvironment.hpp" @@ -64,34 +67,18 @@ SelfAwareness::~SelfAwareness() { } -void SelfAwareness::iam(const Address &reporter,const InetAddress &reporterPhysicalAddress,const InetAddress &myPhysicalAddress,bool trusted,uint64_t now) +void SelfAwareness::iam(const Address &reporter,const InetAddress &receivedOnLocalAddress,const InetAddress &reporterPhysicalAddress,const InetAddress &myPhysicalAddress,bool trusted,uint64_t now) { const InetAddress::IpScope scope = myPhysicalAddress.ipScope(); - // This would be weird, e.g. a public IP talking to 10.0.0.1, so just ignore it. - // If your network is this weird it's probably not reliable information. - if (scope != reporterPhysicalAddress.ipScope()) + if ((scope != reporterPhysicalAddress.ipScope())||(scope == InetAddress::IP_SCOPE_NONE)||(scope == InetAddress::IP_SCOPE_LOOPBACK)||(scope == InetAddress::IP_SCOPE_MULTICAST)) return; - // Some scopes we ignore, and global scope IPs are only used for this - // mechanism if they come from someone we trust (e.g. a root). - switch(scope) { - case InetAddress::IP_SCOPE_NONE: - case InetAddress::IP_SCOPE_LOOPBACK: - case InetAddress::IP_SCOPE_MULTICAST: - return; - case InetAddress::IP_SCOPE_GLOBAL: - if (!trusted) - return; - break; - default: - break; - } - Mutex::Lock _l(_phy_m); - PhySurfaceEntry &entry = _phy[PhySurfaceKey(reporter,reporterPhysicalAddress,scope)]; + PhySurfaceEntry &entry = _phy[PhySurfaceKey(reporter,receivedOnLocalAddress,reporterPhysicalAddress,scope)]; - if ( ((now - entry.ts) < ZT_SELFAWARENESS_ENTRY_TIMEOUT) && (!entry.mySurface.ipsEqual(myPhysicalAddress)) ) { + if ( (trusted) && ((now - entry.ts) < ZT_SELFAWARENESS_ENTRY_TIMEOUT) && (!entry.mySurface.ipsEqual(myPhysicalAddress)) ) { + // Changes to external surface reported by trusted peers causes path reset in this scope entry.mySurface = myPhysicalAddress; entry.ts = now; TRACE("physical address %s for scope %u as seen from %s(%s) differs from %s, resetting paths in scope",myPhysicalAddress.toString().c_str(),(unsigned int)scope,reporter.toString().c_str(),reporterPhysicalAddress.toString().c_str(),entry.mySurface.toString().c_str()); @@ -123,6 +110,7 @@ void SelfAwareness::iam(const Address &reporter,const InetAddress &reporterPhysi } } } else { + // Otherwise just update DB to use to determine external surface info entry.mySurface = myPhysicalAddress; entry.ts = now; } @@ -140,4 +128,60 @@ void SelfAwareness::clean(uint64_t now) } } +std::vector<InetAddress> SelfAwareness::getSymmetricNatPredictions() +{ + /* This is based on ideas and strategies found here: + * https://tools.ietf.org/html/draft-takeda-symmetric-nat-traversal-00 + * + * In short: a great many symmetric NATs allocate ports sequentially. + * This is common on enterprise and carrier grade NATs as well as consumer + * devices. This code generates a list of "you might try this" addresses by + * extrapolating likely port assignments from currently known external + * global IPv4 surfaces. These can then be included in a PUSH_DIRECT_PATHS + * message to another peer, causing it to possibly try these addresses and + * bust our local symmetric NAT. It works often enough to be worth the + * extra bit of code and does no harm in cases where it fails. */ + + // Gather unique surfaces indexed by local received-on address and flag + // us as behind a symmetric NAT if there is more than one. + std::map< InetAddress,std::set<InetAddress> > surfaces; + bool symmetric = false; + { + Mutex::Lock _l(_phy_m); + Hashtable< PhySurfaceKey,PhySurfaceEntry >::Iterator i(_phy); + PhySurfaceKey *k = (PhySurfaceKey *)0; + PhySurfaceEntry *e = (PhySurfaceEntry *)0; + while (i.next(k,e)) { + if ((e->mySurface.ss_family == AF_INET)&&(e->mySurface.ipScope() == InetAddress::IP_SCOPE_GLOBAL)) { + std::set<InetAddress> &s = surfaces[k->receivedOnLocalAddress]; + s.insert(e->mySurface); + symmetric = symmetric||(s.size() > 1); + } + } + } + + // If we appear to be symmetrically NATed, generate and return extrapolations + // of those surfaces. Since PUSH_DIRECT_PATHS is sent multiple times, we + // probabilistically generate extrapolations of anywhere from +1 to +5 to + // increase the odds that it will work "eventually". + if (symmetric) { + std::vector<InetAddress> r; + for(std::map< InetAddress,std::set<InetAddress> >::iterator si(surfaces.begin());si!=surfaces.end();++si) { + for(std::set<InetAddress>::iterator i(si->second.begin());i!=si->second.end();++i) { + InetAddress ipp(*i); + unsigned int p = ipp.port() + 1 + ((unsigned int)RR->node->prng() & 3); + if (p >= 65535) + p -= 64510; // NATs seldom use ports <=1024 so wrap to 1025 + ipp.setPort(p); + if ((si->second.count(ipp) == 0)&&(std::find(r.begin(),r.end(),ipp) == r.end())) { + r.push_back(ipp); + } + } + } + return r; + } + + return std::vector<InetAddress>(); +} + } // namespace ZeroTier diff --git a/node/SelfAwareness.hpp b/node/SelfAwareness.hpp index 2534d986..06c264a9 100644 --- a/node/SelfAwareness.hpp +++ b/node/SelfAwareness.hpp @@ -42,12 +42,13 @@ public: * Called when a trusted remote peer informs us of our external network address * * @param reporter ZeroTier address of reporting peer + * @param receivedOnLocalAddress Local address on which report was received * @param reporterPhysicalAddress Physical address that reporting peer seems to have * @param myPhysicalAddress Physical address that peer says we have * @param trusted True if this peer is trusted as an authority to inform us of external address changes * @param now Current time */ - void iam(const Address &reporter,const InetAddress &reporterPhysicalAddress,const InetAddress &myPhysicalAddress,bool trusted,uint64_t now); + void iam(const Address &reporter,const InetAddress &receivedOnLocalAddress,const InetAddress &reporterPhysicalAddress,const InetAddress &myPhysicalAddress,bool trusted,uint64_t now); /** * Clean up database periodically @@ -56,18 +57,26 @@ public: */ void clean(uint64_t now); + /** + * If we appear to be behind a symmetric NAT, get predictions for possible external endpoints + * + * @return Symmetric NAT predictions or empty vector if none + */ + std::vector<InetAddress> getSymmetricNatPredictions(); + private: struct PhySurfaceKey { Address reporter; + InetAddress receivedOnLocalAddress; InetAddress reporterPhysicalAddress; InetAddress::IpScope scope; PhySurfaceKey() : reporter(),scope(InetAddress::IP_SCOPE_NONE) {} - PhySurfaceKey(const Address &r,const InetAddress &ra,InetAddress::IpScope s) : reporter(r),reporterPhysicalAddress(ra),scope(s) {} + PhySurfaceKey(const Address &r,const InetAddress &rol,const InetAddress &ra,InetAddress::IpScope s) : reporter(r),receivedOnLocalAddress(rol),reporterPhysicalAddress(ra),scope(s) {} inline unsigned long hashCode() const throw() { return ((unsigned long)reporter.toInt() + (unsigned long)scope); } - inline bool operator==(const PhySurfaceKey &k) const throw() { return ((reporter == k.reporter)&&(reporterPhysicalAddress == k.reporterPhysicalAddress)&&(scope == k.scope)); } + inline bool operator==(const PhySurfaceKey &k) const throw() { return ((reporter == k.reporter)&&(receivedOnLocalAddress == k.receivedOnLocalAddress)&&(reporterPhysicalAddress == k.reporterPhysicalAddress)&&(scope == k.scope)); } }; struct PhySurfaceEntry { diff --git a/node/Switch.cpp b/node/Switch.cpp index 4c91a855..968d1a4a 100644 --- a/node/Switch.cpp +++ b/node/Switch.cpp @@ -60,7 +60,6 @@ Switch::Switch(const RuntimeEnvironment *renv) : RR(renv), _lastBeaconResponse(0), _outstandingWhoisRequests(32), - _defragQueue(32), _lastUniteAttempt(8) // only really used on root servers and upstreams, and it'll grow there just fine { } @@ -72,11 +71,14 @@ Switch::~Switch() void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &fromAddr,const void *data,unsigned int len) { try { + const uint64_t now = RR->node->now(); + if (len == 13) { /* LEGACY: before VERB_PUSH_DIRECT_PATHS, peers used broadcast * announcements on the LAN to solve the 'same network problem.' We * no longer send these, but we'll listen for them for a while to * locate peers with versions <1.0.4. */ + Address beaconAddr(reinterpret_cast<const char *>(data) + 8,5); if (beaconAddr == RR->identity.address()) return; @@ -84,7 +86,6 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from return; SharedPtr<Peer> peer(RR->topology->getPeer(beaconAddr)); if (peer) { // we'll only respond to beacons from known peers - const uint64_t now = RR->node->now(); if ((now - _lastBeaconResponse) >= 2500) { // limit rate of responses _lastBeaconResponse = now; Packet outp(peer->address(),RR->identity.address(),Packet::VERB_NOP); @@ -92,11 +93,209 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from RR->node->putPacket(localAddr,fromAddr,outp.data(),outp.size()); } } - } else if (len > ZT_PROTO_MIN_FRAGMENT_LENGTH) { - if (((const unsigned char *)data)[ZT_PACKET_FRAGMENT_IDX_FRAGMENT_INDICATOR] == ZT_PACKET_FRAGMENT_INDICATOR) { - _handleRemotePacketFragment(localAddr,fromAddr,data,len); - } else if (len >= ZT_PROTO_MIN_PACKET_LENGTH) { - _handleRemotePacketHead(localAddr,fromAddr,data,len); + + } else if (len > ZT_PROTO_MIN_FRAGMENT_LENGTH) { // min length check is important! + if (reinterpret_cast<const uint8_t *>(data)[ZT_PACKET_FRAGMENT_IDX_FRAGMENT_INDICATOR] == ZT_PACKET_FRAGMENT_INDICATOR) { + // Handle fragment ---------------------------------------------------- + + Packet::Fragment fragment(data,len); + const Address destination(fragment.destination()); + + if (destination != RR->identity.address()) { + // Fragment is not for us, so try to relay it + if (fragment.hops() < ZT_RELAY_MAX_HOPS) { + fragment.incrementHops(); + + // Note: we don't bother initiating NAT-t for fragments, since heads will set that off. + // It wouldn't hurt anything, just redundant and unnecessary. + SharedPtr<Peer> relayTo = RR->topology->getPeer(destination); + if ((!relayTo)||(!relayTo->send(fragment.data(),fragment.size(),now))) { +#ifdef ZT_ENABLE_CLUSTER + if (RR->cluster) { + RR->cluster->sendViaCluster(Address(),destination,fragment.data(),fragment.size(),false); + return; + } +#endif + + // Don't know peer or no direct path -- so relay via root server + relayTo = RR->topology->getBestRoot(); + if (relayTo) + relayTo->send(fragment.data(),fragment.size(),now); + } + } else { + TRACE("dropped relay [fragment](%s) -> %s, max hops exceeded",fromAddr.toString().c_str(),destination.toString().c_str()); + } + } else { + // Fragment looks like ours + const uint64_t fragmentPacketId = fragment.packetId(); + const unsigned int fragmentNumber = fragment.fragmentNumber(); + const unsigned int totalFragments = fragment.totalFragments(); + + if ((totalFragments <= ZT_MAX_PACKET_FRAGMENTS)&&(fragmentNumber < ZT_MAX_PACKET_FRAGMENTS)&&(fragmentNumber > 0)&&(totalFragments > 1)) { + // Fragment appears basically sane. Its fragment number must be + // 1 or more, since a Packet with fragmented bit set is fragment 0. + // Total fragments must be more than 1, otherwise why are we + // seeing a Packet::Fragment? + + Mutex::Lock _l(_rxQueue_m); + RXQueueEntry *const rq = _findRXQueueEntry(now,fragmentPacketId); + + if ((!rq->timestamp)||(rq->packetId != fragmentPacketId)) { + // No packet found, so we received a fragment without its head. + //TRACE("fragment (%u/%u) of %.16llx from %s",fragmentNumber + 1,totalFragments,fragmentPacketId,fromAddr.toString().c_str()); + + rq->timestamp = now; + rq->packetId = fragmentPacketId; + rq->frags[fragmentNumber - 1] = fragment; + rq->totalFragments = totalFragments; // total fragment count is known + rq->haveFragments = 1 << fragmentNumber; // we have only this fragment + rq->complete = false; + } else if (!(rq->haveFragments & (1 << fragmentNumber))) { + // We have other fragments and maybe the head, so add this one and check + //TRACE("fragment (%u/%u) of %.16llx from %s",fragmentNumber + 1,totalFragments,fragmentPacketId,fromAddr.toString().c_str()); + + rq->frags[fragmentNumber - 1] = fragment; + rq->totalFragments = totalFragments; + + if (Utils::countBits(rq->haveFragments |= (1 << fragmentNumber)) == totalFragments) { + // We have all fragments -- assemble and process full Packet + //TRACE("packet %.16llx is complete, assembling and processing...",fragmentPacketId); + + for(unsigned int f=1;f<totalFragments;++f) + rq->frag0.append(rq->frags[f - 1].payload(),rq->frags[f - 1].payloadLength()); + + if (rq->frag0.tryDecode(RR,false)) { + rq->timestamp = 0; // packet decoded, free entry + } else { + rq->complete = true; // set complete flag but leave entry since it probably needs WHOIS or something + } + } + } // else this is a duplicate fragment, ignore + } + } + + // -------------------------------------------------------------------- + } else if (len >= ZT_PROTO_MIN_PACKET_LENGTH) { // min length check is important! + // Handle packet head ------------------------------------------------- + + // See packet format in Packet.hpp to understand this + const uint64_t packetId = ( + (((uint64_t)reinterpret_cast<const uint8_t *>(data)[0]) << 56) | + (((uint64_t)reinterpret_cast<const uint8_t *>(data)[1]) << 48) | + (((uint64_t)reinterpret_cast<const uint8_t *>(data)[2]) << 40) | + (((uint64_t)reinterpret_cast<const uint8_t *>(data)[3]) << 32) | + (((uint64_t)reinterpret_cast<const uint8_t *>(data)[4]) << 24) | + (((uint64_t)reinterpret_cast<const uint8_t *>(data)[5]) << 16) | + (((uint64_t)reinterpret_cast<const uint8_t *>(data)[6]) << 8) | + ((uint64_t)reinterpret_cast<const uint8_t *>(data)[7]) + ); + const Address destination(reinterpret_cast<const uint8_t *>(data) + 8,ZT_ADDRESS_LENGTH); + const Address source(reinterpret_cast<const uint8_t *>(data) + 13,ZT_ADDRESS_LENGTH); + + // Catch this and toss it -- it would never work, but it could happen if we somehow + // mistakenly guessed an address we're bound to as a destination for another peer. + if (source == RR->identity.address()) + return; + + //TRACE("<< %.16llx %s -> %s (size: %u)",(unsigned long long)packet->packetId(),source.toString().c_str(),destination.toString().c_str(),packet->size()); + + if (destination != RR->identity.address()) { + Packet packet(data,len); + + // Packet is not for us, so try to relay it + if (packet.hops() < ZT_RELAY_MAX_HOPS) { + packet.incrementHops(); + + SharedPtr<Peer> relayTo = RR->topology->getPeer(destination); + if ((relayTo)&&((relayTo->send(packet.data(),packet.size(),now)))) { + Mutex::Lock _l(_lastUniteAttempt_m); + uint64_t &luts = _lastUniteAttempt[_LastUniteKey(source,destination)]; + if ((now - luts) >= ZT_MIN_UNITE_INTERVAL) { + luts = now; + unite(source,destination); + } + } else { +#ifdef ZT_ENABLE_CLUSTER + if (RR->cluster) { + bool shouldUnite; + { + Mutex::Lock _l(_lastUniteAttempt_m); + uint64_t &luts = _lastUniteAttempt[_LastUniteKey(source,destination)]; + shouldUnite = ((now - luts) >= ZT_MIN_UNITE_INTERVAL); + if (shouldUnite) + luts = now; + } + RR->cluster->sendViaCluster(source,destination,packet.data(),packet.size(),shouldUnite); + return; + } +#endif + + relayTo = RR->topology->getBestRoot(&source,1,true); + if (relayTo) + relayTo->send(packet.data(),packet.size(),now); + } + } else { + TRACE("dropped relay %s(%s) -> %s, max hops exceeded",packet.source().toString().c_str(),fromAddr.toString().c_str(),destination.toString().c_str()); + } + } else if ((reinterpret_cast<const uint8_t *>(data)[ZT_PACKET_IDX_FLAGS] & ZT_PROTO_FLAG_FRAGMENTED) != 0) { + // Packet is the head of a fragmented packet series + + Mutex::Lock _l(_rxQueue_m); + RXQueueEntry *const rq = _findRXQueueEntry(now,packetId); + + if ((!rq->timestamp)||(rq->packetId != packetId)) { + // If we have no other fragments yet, create an entry and save the head + //TRACE("fragment (0/?) of %.16llx from %s",pid,fromAddr.toString().c_str()); + + rq->timestamp = now; + rq->packetId = packetId; + rq->frag0.init(data,len,localAddr,fromAddr,now); + rq->totalFragments = 0; + rq->haveFragments = 1; + rq->complete = false; + } else if (!(rq->haveFragments & 1)) { + // If we have other fragments but no head, see if we are complete with the head + + if ((rq->totalFragments > 1)&&(Utils::countBits(rq->haveFragments |= 1) == rq->totalFragments)) { + // We have all fragments -- assemble and process full Packet + //TRACE("packet %.16llx is complete, assembling and processing...",pid); + + rq->frag0.init(data,len,localAddr,fromAddr,now); + for(unsigned int f=1;f<rq->totalFragments;++f) + rq->frag0.append(rq->frags[f - 1].payload(),rq->frags[f - 1].payloadLength()); + + if (rq->frag0.tryDecode(RR,false)) { + rq->timestamp = 0; // packet decoded, free entry + } else { + rq->complete = true; // set complete flag but leave entry since it probably needs WHOIS or something + } + } else { + // Still waiting on more fragments, but keep the head + rq->frag0.init(data,len,localAddr,fromAddr,now); + } + } // else this is a duplicate head, ignore + } else { + // Packet is unfragmented, so just process it + IncomingPacket packet(data,len,localAddr,fromAddr,now); + if (!packet.tryDecode(RR,false)) { + Mutex::Lock _l(_rxQueue_m); + RXQueueEntry *rq = &(_rxQueue[ZT_RX_QUEUE_SIZE - 1]); + unsigned long i = ZT_RX_QUEUE_SIZE - 1; + while ((i)&&(rq->timestamp)) { + RXQueueEntry *tmp = &(_rxQueue[--i]); + if (tmp->timestamp < rq->timestamp) + rq = tmp; + } + rq->timestamp = now; + rq->packetId = packetId; + rq->frag0 = packet; + rq->totalFragments = 1; + rq->haveFragments = 1; + rq->complete = true; + } + } + + // -------------------------------------------------------------------- } } } catch (std::exception &ex) { @@ -451,10 +650,13 @@ void Switch::doAnythingWaitingForPeer(const SharedPtr<Peer> &peer) { // finish processing any packets waiting on peer's public key / identity Mutex::Lock _l(_rxQueue_m); - for(std::list< SharedPtr<IncomingPacket> >::iterator rxi(_rxQueue.begin());rxi!=_rxQueue.end();) { - if ((*rxi)->tryDecode(RR,false)) - _rxQueue.erase(rxi++); - else ++rxi; + unsigned long i = ZT_RX_QUEUE_SIZE; + while (i) { + RXQueueEntry *rq = &(_rxQueue[--i]); + if ((rq->timestamp)&&(rq->complete)) { + if (rq->frag0.tryDecode(RR,false)) + rq->timestamp = 0; + } } } @@ -478,31 +680,31 @@ unsigned long Switch::doTimerTasks(uint64_t now) Mutex::Lock _l(_contactQueue_m); for(std::list<ContactQueueEntry>::iterator qi(_contactQueue.begin());qi!=_contactQueue.end();) { if (now >= qi->fireAtTime) { - if (qi->peer->hasActiveDirectPath(now)) { - // Cancel if connection has succeeded + if (!qi->peer->pushDirectPaths(qi->localAddr,qi->inaddr,now,true)) + qi->peer->sendHELLO(qi->localAddr,qi->inaddr,now); + _contactQueue.erase(qi++); + continue; + /* Old symmetric NAT buster code, obsoleted by port prediction alg in SelfAwareness but left around for now in case we revert + if (qi->strategyIteration == 0) { + // First strategy: send packet directly to destination + qi->peer->sendHELLO(qi->localAddr,qi->inaddr,now); + } else if (qi->strategyIteration <= 3) { + // Strategies 1-3: try escalating ports for symmetric NATs that remap sequentially + InetAddress tmpaddr(qi->inaddr); + int p = (int)qi->inaddr.port() + qi->strategyIteration; + if (p > 65535) + p -= 64511; + tmpaddr.setPort((unsigned int)p); + qi->peer->sendHELLO(qi->localAddr,tmpaddr,now); + } else { + // All strategies tried, expire entry _contactQueue.erase(qi++); continue; - } else { - if (qi->strategyIteration == 0) { - // First strategy: send packet directly to destination - qi->peer->sendHELLO(qi->localAddr,qi->inaddr,now); - } else if (qi->strategyIteration <= 3) { - // Strategies 1-3: try escalating ports for symmetric NATs that remap sequentially - InetAddress tmpaddr(qi->inaddr); - int p = (int)qi->inaddr.port() + qi->strategyIteration; - if (p < 0xffff) { - tmpaddr.setPort((unsigned int)p); - qi->peer->sendHELLO(qi->localAddr,tmpaddr,now); - } else qi->strategyIteration = 5; - } else { - // All strategies tried, expire entry - _contactQueue.erase(qi++); - continue; - } - ++qi->strategyIteration; - qi->fireAtTime = now + ZT_NAT_T_TACTICAL_ESCALATION_DELAY; - nextDelay = std::min(nextDelay,(unsigned long)ZT_NAT_T_TACTICAL_ESCALATION_DELAY); } + ++qi->strategyIteration; + qi->fireAtTime = now + ZT_NAT_T_TACTICAL_ESCALATION_DELAY; + nextDelay = std::min(nextDelay,(unsigned long)ZT_NAT_T_TACTICAL_ESCALATION_DELAY); + */ } else { nextDelay = std::min(nextDelay,(unsigned long)(qi->fireAtTime - now)); } @@ -546,29 +748,6 @@ unsigned long Switch::doTimerTasks(uint64_t now) } } - { // Time out RX queue packets that never got WHOIS lookups or other info. - Mutex::Lock _l(_rxQueue_m); - for(std::list< SharedPtr<IncomingPacket> >::iterator i(_rxQueue.begin());i!=_rxQueue.end();) { - if ((now - (*i)->receiveTime()) > ZT_RECEIVE_QUEUE_TIMEOUT) { - TRACE("RX %s -> %s timed out",(*i)->source().toString().c_str(),(*i)->destination().toString().c_str()); - _rxQueue.erase(i++); - } else ++i; - } - } - - { // Time out packets that didn't get all their fragments. - Mutex::Lock _l(_defragQueue_m); - Hashtable< uint64_t,DefragQueueEntry >::Iterator i(_defragQueue); - uint64_t *packetId = (uint64_t *)0; - DefragQueueEntry *qe = (DefragQueueEntry *)0; - while (i.next(packetId,qe)) { - if ((now - qe->creationTime) > ZT_FRAGMENTED_PACKET_RECEIVE_TIMEOUT) { - TRACE("incomplete fragmented packet %.16llx timed out, fragments discarded",*packetId); - _defragQueue.erase(*packetId); - } - } - } - { // Remove really old last unite attempt entries to keep table size controlled Mutex::Lock _l(_lastUniteAttempt_m); Hashtable< _LastUniteKey,uint64_t >::Iterator i(_lastUniteAttempt); @@ -583,180 +762,6 @@ unsigned long Switch::doTimerTasks(uint64_t now) return nextDelay; } -void Switch::_handleRemotePacketFragment(const InetAddress &localAddr,const InetAddress &fromAddr,const void *data,unsigned int len) -{ - Packet::Fragment fragment(data,len); - Address destination(fragment.destination()); - - if (destination != RR->identity.address()) { - // Fragment is not for us, so try to relay it - if (fragment.hops() < ZT_RELAY_MAX_HOPS) { - fragment.incrementHops(); - - // Note: we don't bother initiating NAT-t for fragments, since heads will set that off. - // It wouldn't hurt anything, just redundant and unnecessary. - SharedPtr<Peer> relayTo = RR->topology->getPeer(destination); - if ((!relayTo)||(!relayTo->send(fragment.data(),fragment.size(),RR->node->now()))) { -#ifdef ZT_ENABLE_CLUSTER - if (RR->cluster) { - RR->cluster->sendViaCluster(Address(),destination,fragment.data(),fragment.size(),false); - return; - } -#endif - - // Don't know peer or no direct path -- so relay via root server - relayTo = RR->topology->getBestRoot(); - if (relayTo) - relayTo->send(fragment.data(),fragment.size(),RR->node->now()); - } - } else { - TRACE("dropped relay [fragment](%s) -> %s, max hops exceeded",fromAddr.toString().c_str(),destination.toString().c_str()); - } - } else { - // Fragment looks like ours - uint64_t pid = fragment.packetId(); - unsigned int fno = fragment.fragmentNumber(); - unsigned int tf = fragment.totalFragments(); - - if ((tf <= ZT_MAX_PACKET_FRAGMENTS)&&(fno < ZT_MAX_PACKET_FRAGMENTS)&&(fno > 0)&&(tf > 1)) { - // Fragment appears basically sane. Its fragment number must be - // 1 or more, since a Packet with fragmented bit set is fragment 0. - // Total fragments must be more than 1, otherwise why are we - // seeing a Packet::Fragment? - - Mutex::Lock _l(_defragQueue_m); - DefragQueueEntry &dq = _defragQueue[pid]; - - if (!dq.creationTime) { - // We received a Packet::Fragment without its head, so queue it and wait - - dq.creationTime = RR->node->now(); - dq.frags[fno - 1] = fragment; - dq.totalFragments = tf; // total fragment count is known - dq.haveFragments = 1 << fno; // we have only this fragment - //TRACE("fragment (%u/%u) of %.16llx from %s",fno + 1,tf,pid,fromAddr.toString().c_str()); - } else if (!(dq.haveFragments & (1 << fno))) { - // We have other fragments and maybe the head, so add this one and check - - dq.frags[fno - 1] = fragment; - dq.totalFragments = tf; - //TRACE("fragment (%u/%u) of %.16llx from %s",fno + 1,tf,pid,fromAddr.toString().c_str()); - - if (Utils::countBits(dq.haveFragments |= (1 << fno)) == tf) { - // We have all fragments -- assemble and process full Packet - //TRACE("packet %.16llx is complete, assembling and processing...",pid); - - SharedPtr<IncomingPacket> packet(dq.frag0); - for(unsigned int f=1;f<tf;++f) - packet->append(dq.frags[f - 1].payload(),dq.frags[f - 1].payloadLength()); - _defragQueue.erase(pid); // dq no longer valid after this - - if (!packet->tryDecode(RR,false)) { - Mutex::Lock _l(_rxQueue_m); - _rxQueue.push_back(packet); - } - } - } // else this is a duplicate fragment, ignore - } - } -} - -void Switch::_handleRemotePacketHead(const InetAddress &localAddr,const InetAddress &fromAddr,const void *data,unsigned int len) -{ - const uint64_t now = RR->node->now(); - SharedPtr<IncomingPacket> packet(new IncomingPacket(data,len,localAddr,fromAddr,now)); - - Address source(packet->source()); - Address destination(packet->destination()); - - // Catch this and toss it -- it would never work, but it could happen if we somehow - // mistakenly guessed an address we're bound to as a destination for another peer. - if (source == RR->identity.address()) - return; - - //TRACE("<< %.16llx %s -> %s (size: %u)",(unsigned long long)packet->packetId(),source.toString().c_str(),destination.toString().c_str(),packet->size()); - - if (destination != RR->identity.address()) { - // Packet is not for us, so try to relay it - if (packet->hops() < ZT_RELAY_MAX_HOPS) { - packet->incrementHops(); - - SharedPtr<Peer> relayTo = RR->topology->getPeer(destination); - if ((relayTo)&&((relayTo->send(packet->data(),packet->size(),now)))) { - Mutex::Lock _l(_lastUniteAttempt_m); - uint64_t &luts = _lastUniteAttempt[_LastUniteKey(source,destination)]; - if ((now - luts) >= ZT_MIN_UNITE_INTERVAL) { - luts = now; - unite(source,destination); - } - } else { -#ifdef ZT_ENABLE_CLUSTER - if (RR->cluster) { - bool shouldUnite; - { - Mutex::Lock _l(_lastUniteAttempt_m); - uint64_t &luts = _lastUniteAttempt[_LastUniteKey(source,destination)]; - shouldUnite = ((now - luts) >= ZT_MIN_UNITE_INTERVAL); - if (shouldUnite) - luts = now; - } - RR->cluster->sendViaCluster(source,destination,packet->data(),packet->size(),shouldUnite); - return; - } -#endif - - relayTo = RR->topology->getBestRoot(&source,1,true); - if (relayTo) - relayTo->send(packet->data(),packet->size(),now); - } - } else { - TRACE("dropped relay %s(%s) -> %s, max hops exceeded",packet->source().toString().c_str(),fromAddr.toString().c_str(),destination.toString().c_str()); - } - } else if (packet->fragmented()) { - // Packet is the head of a fragmented packet series - - uint64_t pid = packet->packetId(); - Mutex::Lock _l(_defragQueue_m); - DefragQueueEntry &dq = _defragQueue[pid]; - - if (!dq.creationTime) { - // If we have no other fragments yet, create an entry and save the head - - dq.creationTime = now; - dq.frag0 = packet; - dq.totalFragments = 0; // 0 == unknown, waiting for Packet::Fragment - dq.haveFragments = 1; // head is first bit (left to right) - //TRACE("fragment (0/?) of %.16llx from %s",pid,fromAddr.toString().c_str()); - } else if (!(dq.haveFragments & 1)) { - // If we have other fragments but no head, see if we are complete with the head - - if ((dq.totalFragments)&&(Utils::countBits(dq.haveFragments |= 1) == dq.totalFragments)) { - // We have all fragments -- assemble and process full Packet - - //TRACE("packet %.16llx is complete, assembling and processing...",pid); - // packet already contains head, so append fragments - for(unsigned int f=1;f<dq.totalFragments;++f) - packet->append(dq.frags[f - 1].payload(),dq.frags[f - 1].payloadLength()); - _defragQueue.erase(pid); // dq no longer valid after this - - if (!packet->tryDecode(RR,false)) { - Mutex::Lock _l(_rxQueue_m); - _rxQueue.push_back(packet); - } - } else { - // Still waiting on more fragments, so queue the head - dq.frag0 = packet; - } - } // else this is a duplicate head, ignore - } else { - // Packet is unfragmented, so just process it - if (!packet->tryDecode(RR,false)) { - Mutex::Lock _l(_rxQueue_m); - _rxQueue.push_back(packet); - } - } -} - Address Switch::_sendWhoisRequest(const Address &addr,const Address *peersAlreadyConsulted,unsigned int numPeersAlreadyConsulted) { SharedPtr<Peer> root(RR->topology->getBestRoot(peersAlreadyConsulted,numPeersAlreadyConsulted,false)); @@ -813,12 +818,13 @@ bool Switch::_trySend(const Packet &packet,bool encrypt,uint64_t nwid) relay = RR->topology->getBestRoot(); if (!(relay)||(!(viaPath = relay->getBestPath(now)))) - return false; // no paths, no root servers? + return false; // no paths, no root servers?, no relays? :P~~~ } if ((network)&&(relay)&&(network->isAllowed(peer))) { // Push hints for direct connectivity to this peer if we are relaying - peer->pushDirectPaths(viaPath,now,false); + peer->pushDirectPaths(viaPath->localAddress(),viaPath->address(),now,false); + viaPath->sent(now); } Packet tmp(packet); diff --git a/node/Switch.hpp b/node/Switch.hpp index f77bf86c..ce4f00a1 100644 --- a/node/Switch.hpp +++ b/node/Switch.hpp @@ -150,8 +150,6 @@ public: unsigned long doTimerTasks(uint64_t now); private: - void _handleRemotePacketFragment(const InetAddress &localAddr,const InetAddress &fromAddr,const void *data,unsigned int len); - void _handleRemotePacketHead(const InetAddress &localAddr,const InetAddress &fromAddr,const void *data,unsigned int len); Address _sendWhoisRequest(const Address &addr,const Address *peersAlreadyConsulted,unsigned int numPeersAlreadyConsulted); bool _trySend(const Packet &packet,bool encrypt,uint64_t nwid); @@ -169,23 +167,40 @@ private: Hashtable< Address,WhoisRequest > _outstandingWhoisRequests; Mutex _outstandingWhoisRequests_m; - // Packet defragmentation queue -- comes before RX queue in path - struct DefragQueueEntry + // Packets waiting for WHOIS replies or other decode info or missing fragments + struct RXQueueEntry { - DefragQueueEntry() : creationTime(0),totalFragments(0),haveFragments(0) {} - uint64_t creationTime; - SharedPtr<IncomingPacket> frag0; - Packet::Fragment frags[ZT_MAX_PACKET_FRAGMENTS - 1]; + RXQueueEntry() : timestamp(0) {} + uint64_t timestamp; // 0 if entry is not in use + uint64_t packetId; + IncomingPacket frag0; // head of packet + Packet::Fragment frags[ZT_MAX_PACKET_FRAGMENTS - 1]; // later fragments (if any) unsigned int totalFragments; // 0 if only frag0 received, waiting for frags uint32_t haveFragments; // bit mask, LSB to MSB + bool complete; // if true, packet is complete }; - Hashtable< uint64_t,DefragQueueEntry > _defragQueue; - Mutex _defragQueue_m; - - // ZeroTier-layer RX queue of incoming packets in the process of being decoded - std::list< SharedPtr<IncomingPacket> > _rxQueue; + RXQueueEntry _rxQueue[ZT_RX_QUEUE_SIZE]; Mutex _rxQueue_m; + /* Returns the matching or oldest entry. Caller must check timestamp and + * packet ID to determine which. */ + inline RXQueueEntry *_findRXQueueEntry(uint64_t now,uint64_t packetId) + { + RXQueueEntry *rq; + RXQueueEntry *oldest = &(_rxQueue[ZT_RX_QUEUE_SIZE - 1]); + unsigned long i = ZT_RX_QUEUE_SIZE; + while (i) { + rq = &(_rxQueue[--i]); + if ((rq->packetId == packetId)&&(rq->timestamp)) + return rq; + if ((now - rq->timestamp) >= ZT_RX_QUEUE_EXPIRE) + rq->timestamp = 0; + if (rq->timestamp < oldest->timestamp) + oldest = rq; + } + return oldest; + } + // ZeroTier-layer TX queue entry struct TXQueueEntry { diff --git a/node/Utils.hpp b/node/Utils.hpp index 17c00459..3f4cc765 100644 --- a/node/Utils.hpp +++ b/node/Utils.hpp @@ -51,9 +51,9 @@ public: static inline bool secureEq(const void *a,const void *b,unsigned int len) throw() { - char diff = 0; + uint8_t diff = 0; for(unsigned int i=0;i<len;++i) - diff |= ( (reinterpret_cast<const char *>(a))[i] ^ (reinterpret_cast<const char *>(b))[i] ); + diff |= ( (reinterpret_cast<const uint8_t *>(a))[i] ^ (reinterpret_cast<const uint8_t *>(b))[i] ); return (diff == 0); } |
