diff options
author | Adam Ierymenko <adam.ierymenko@gmail.com> | 2017-06-06 16:11:19 -0700 |
---|---|---|
committer | Adam Ierymenko <adam.ierymenko@gmail.com> | 2017-06-06 16:11:19 -0700 |
commit | 951d911531ec1eaa246e234a7b31b6bf5bc6bb79 (patch) | |
tree | 28c7f9f5ff6c1537976a6e096aff7ac1ab7007e2 | |
parent | 4f2179b0dfb06e7f1b802af89ed57b07c84e4195 (diff) | |
download | infinitytier-951d911531ec1eaa246e234a7b31b6bf5bc6bb79.tar.gz infinitytier-951d911531ec1eaa246e234a7b31b6bf5bc6bb79.zip |
Last bit of new cluster code, ready to test.
-rw-r--r-- | osdep/Binder.hpp | 215 | ||||
-rw-r--r-- | service/OneService.cpp | 531 |
2 files changed, 414 insertions, 332 deletions
diff --git a/osdep/Binder.hpp b/osdep/Binder.hpp index fee1c3da..1839ecc2 100644 --- a/osdep/Binder.hpp +++ b/osdep/Binder.hpp @@ -66,11 +66,7 @@ #include "Phy.hpp" #include "OSUtils.hpp" -/** - * Period between binder rescans/refreshes - * - * OneService also does this on detected restarts. - */ +// Period between refreshes of bindings #define ZT_BINDER_REFRESH_PERIOD 30000 namespace ZeroTier { @@ -105,10 +101,7 @@ public: Binder() {} /** - * Close all bound ports - * - * This should be called on shutdown. It closes listen sockets and UDP ports - * but not TCP connections from any TCP listen sockets. + * Close all bound ports, should be called on shutdown * * @param phy Physical interface */ @@ -116,9 +109,9 @@ public: void closeAll(Phy<PHY_HANDLER_TYPE> &phy) { Mutex::Lock _l(_lock); - for(typename std::vector<_Binding>::const_iterator i(_bindings.begin());i!=_bindings.end();++i) { - phy.close(i->udpSock,false); - phy.close(i->tcpListenSock,false); + for(std::vector<_Binding>::iterator b(_bindings.begin());b!=_bindings.end();++b) { + phy.close(b->udpSock,false); + phy.close(b->tcpListenSock,false); } } @@ -129,7 +122,7 @@ public: * changes, on startup, or periodically (e.g. every 30-60s). * * @param phy Physical interface - * @param port Port to bind to on all interfaces (TCP and UDP) + * @param ports Ports to bind on all interfaces * @param ignoreInterfacesByName Ignore these interfaces by name * @param ignoreInterfacesByNamePrefix Ignore these interfaces by name-prefix (starts-with, e.g. zt ignores zt*) * @param ignoreInterfacesByAddress Ignore these interfaces by address @@ -137,11 +130,10 @@ public: * @tparam INTERFACE_CHECKER Type for class containing shouldBindInterface() method */ template<typename PHY_HANDLER_TYPE,typename INTERFACE_CHECKER> - void refresh(Phy<PHY_HANDLER_TYPE> &phy,unsigned int port,INTERFACE_CHECKER &ifChecker) + void refresh(Phy<PHY_HANDLER_TYPE> &phy,unsigned int *ports,unsigned int portCount,INTERFACE_CHECKER &ifChecker) { std::map<InetAddress,std::string> localIfAddrs; - PhySocket *udps; - //PhySocket *tcps; + PhySocket *udps,*tcps; Mutex::Lock _l(_lock); #ifdef __WINDOWS__ @@ -161,8 +153,10 @@ public: case InetAddress::IP_SCOPE_GLOBAL: case InetAddress::IP_SCOPE_SHARED: case InetAddress::IP_SCOPE_PRIVATE: - ip.setPort(port); - localIfAddrs.insert(std::pair<InetAddress,std::string>(ip,std::string())); + for(int x=0;x<portCount;++x) { + ip.setPort(ports[x]); + localIfAddrs.insert(std::pair<InetAddress,std::string>(ip,std::string())); + } break; } } @@ -231,8 +225,10 @@ public: case InetAddress::IP_SCOPE_GLOBAL: case InetAddress::IP_SCOPE_SHARED: case InetAddress::IP_SCOPE_PRIVATE: - ip.setPort(port); - localIfAddrs.insert(std::pair<InetAddress,std::string>(ip,std::string(devname))); + for(int x=0;x<portCount;++x) { + ip.setPort(ports[x]); + localIfAddrs.insert(std::pair<InetAddress,std::string>(ip,std::string(devname))); + } break; } } @@ -249,11 +245,8 @@ public: configuration.ifc_buf = nullptr; if (controlfd < 0) goto ip4_address_error; - if (ioctl(controlfd, SIOCGIFCONF, &configuration) < 0) goto ip4_address_error; - configuration.ifc_buf = (char*)malloc(configuration.ifc_len); - if (ioctl(controlfd, SIOCGIFCONF, &configuration) < 0) goto ip4_address_error; for (int i=0; i < (int)(configuration.ifc_len / sizeof(ifreq)); i ++) { @@ -262,9 +255,8 @@ public: if (addr->sa_family != AF_INET) continue; std::string ifname = request.ifr_ifrn.ifrn_name; // name can either be just interface name or interface name followed by ':' and arbitrary label - if (ifname.find(':') != std::string::npos) { + if (ifname.find(':') != std::string::npos) ifname = ifname.substr(0, ifname.find(':')); - } InetAddress ip(&(((struct sockaddr_in *)addr)->sin_addr),4,0); if (ifChecker.shouldBindInterface(ifname.c_str(), ip)) { @@ -274,8 +266,10 @@ public: case InetAddress::IP_SCOPE_GLOBAL: case InetAddress::IP_SCOPE_SHARED: case InetAddress::IP_SCOPE_PRIVATE: - ip.setPort(port); - localIfAddrs.insert(std::pair<InetAddress,std::string>(ip, ifname)); + for(int x=0;x<portCount;++x) { + ip.setPort(ports[x]); + localIfAddrs.insert(std::pair<InetAddress,std::string>(ip,ifname)); + } break; } } @@ -306,8 +300,10 @@ public: case InetAddress::IP_SCOPE_GLOBAL: case InetAddress::IP_SCOPE_SHARED: case InetAddress::IP_SCOPE_PRIVATE: - ip.setPort(port); - localIfAddrs.insert(std::pair<InetAddress,std::string>(ip,std::string(ifa->ifa_name))); + for(int x=0;x<portCount;++x) { + ip.setPort(ports[x]); + localIfAddrs.insert(std::pair<InetAddress,std::string>(ip,std::string(ifa->ifa_name))); + } break; } } @@ -322,59 +318,57 @@ public: // Default to binding to wildcard if we can't enumerate addresses if (localIfAddrs.empty()) { - localIfAddrs.insert(std::pair<InetAddress,std::string>(InetAddress((uint32_t)0,port),std::string())); - localIfAddrs.insert(std::pair<InetAddress,std::string>(InetAddress((const void *)"\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0",16,port),std::string())); + for(int x=0;x<portCount;++x) { + localIfAddrs.insert(std::pair<InetAddress,std::string>(InetAddress((uint32_t)0,ports[x]),std::string())); + localIfAddrs.insert(std::pair<InetAddress,std::string>(InetAddress((const void *)"\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0",16,ports[x]),std::string())); + } } - // Close any old bindings to anything that doesn't exist anymore - for(typename std::vector<_Binding>::const_iterator bi(_bindings.begin());bi!=_bindings.end();++bi) { - if (localIfAddrs.find(bi->address) == localIfAddrs.end()) { - phy.close(bi->udpSock,false); - phy.close(bi->tcpListenSock,false); + std::vector<_Binding> newBindings; + + // Save bindings that are still valid, close those that are not + for(std::vector<_Binding>::iterator b(_bindings.begin());b!=_bindings.end();++b) { + if (localIfAddrs.find(b->address) != localIfAddrs.end()) { + newBindings.push_back(*b); + } else { + phy.close(b->udpSock,false); + phy.close(b->tcpListenSock,false); } } - std::vector<_Binding> newBindings; + // Create new bindings for those not already bound for(std::map<InetAddress,std::string>::const_iterator ii(localIfAddrs.begin());ii!=localIfAddrs.end();++ii) { - typename std::vector<_Binding>::const_iterator bi(_bindings.begin()); - while (bi != _bindings.end()) { - if (bi->address == ii->first) { - newBindings.push_back(*bi); + typename std::vector<_Binding>::const_iterator bi(newBindings.begin()); + while (bi != newBindings.end()) { + if (bi->address == ii->first) break; - } ++bi; } - - if (bi == _bindings.end()) { + if (bi == newBindings.end()) { udps = phy.udpBind(reinterpret_cast<const struct sockaddr *>(&(ii->first)),(void *)0,ZT_UDP_DESIRED_BUF_SIZE); - if (udps) { - //tcps = phy.tcpListen(reinterpret_cast<const struct sockaddr *>(&ii),(void *)0); - //if (tcps) { + tcps = phy.tcpListen(reinterpret_cast<const struct sockaddr *>(&(ii->first)),(void *)0); + if ((udps)&&(tcps)) { #ifdef __LINUX__ - // Bind Linux sockets to their device so routes tha we manage do not override physical routes (wish all platforms had this!) - if (ii->second.length() > 0) { - int fd = (int)Phy<PHY_HANDLER_TYPE>::getDescriptor(udps); - char tmp[256]; - Utils::scopy(tmp,sizeof(tmp),ii->second.c_str()); - if (fd >= 0) { - if (setsockopt(fd,SOL_SOCKET,SO_BINDTODEVICE,tmp,strlen(tmp)) != 0) { - fprintf(stderr,"WARNING: unable to set SO_BINDTODEVICE to bind %s to %s\n",ii->first.toIpString().c_str(),ii->second.c_str()); - } - } - } + // Bind Linux sockets to their device so routes tha we manage do not override physical routes (wish all platforms had this!) + if (ii->second.length() > 0) { + char tmp[256]; + Utils::scopy(tmp,sizeof(tmp),ii->second.c_str()); + int fd = (int)Phy<PHY_HANDLER_TYPE>::getDescriptor(udps); + if (fd >= 0) + setsockopt(fd,SOL_SOCKET,SO_BINDTODEVICE,tmp,strlen(tmp)); + fd = (int)Phy<PHY_HANDLER_TYPE>::getDescriptor(tcps); + if (fd >= 0) + setsockopt(fd,SOL_SOCKET,SO_BINDTODEVICE,tmp,strlen(tmp)); + } #endif // __LINUX__ - newBindings.push_back(_Binding()); - newBindings.back().udpSock = udps; - //newBindings.back().tcpListenSock = tcps; - newBindings.back().address = ii->first; - //} else { - // phy.close(udps,false); - //} + newBindings.push_back(_Binding()); + newBindings.back().udpSock = udps; + newBindings.back().tcpListenSock = tcps; + newBindings.back().address = ii->first; } } } - // Swapping pointers and then letting the old one fall out of scope is faster than copying again _bindings.swap(newBindings); } @@ -402,58 +396,79 @@ public: * @param data Data to send * @param len Length of data * @param v4ttl If non-zero, send this packet with the specified IP TTL (IPv4 only) + * @return -1 == local doesn't match any bound address, 0 == send failure, 1 == send successful */ template<typename PHY_HANDLER_TYPE> - inline bool udpSend(Phy<PHY_HANDLER_TYPE> &phy,const InetAddress &local,const InetAddress &remote,const void *data,unsigned int len,unsigned int v4ttl = 0) const + inline int udpSend(Phy<PHY_HANDLER_TYPE> &phy,const InetAddress &local,const InetAddress &remote,const void *data,unsigned int len,unsigned int v4ttl = 0) const { + PhySocket *s; + typename std::vector<_Binding>::const_iterator i; + int result; Mutex::Lock _l(_lock); - if (local) { - for(typename std::vector<_Binding>::const_iterator i(_bindings.begin());i!=_bindings.end();++i) { - if (i->address == local) { - if ((v4ttl)&&(local.ss_family == AF_INET)) - phy.setIp4UdpTtl(i->udpSock,v4ttl); - const bool result = phy.udpSend(i->udpSock,reinterpret_cast<const struct sockaddr *>(&remote),data,len); - if ((v4ttl)&&(local.ss_family == AF_INET)) - phy.setIp4UdpTtl(i->udpSock,255); - return result; + + if (remote.ss_family == AF_INET) { + if (local) { + for(i=_bindings.begin();i!=_bindings.end();++i) { + if ( + (i->address.ss_family == AF_INET) && + (reinterpret_cast<const struct sockaddr_in *>(&(i->address))->sin_port == reinterpret_cast<const struct sockaddr_in *>(&local)->sin_port) && + (reinterpret_cast<const struct sockaddr_in *>(&(i->address))->sin_addr.s_addr == reinterpret_cast<const struct sockaddr_in *>(&local)->sin_addr.s_addr) + ) + { + s = i->udpSock; + goto Binder_send_packet; + } + } + } else { + for(i=_bindings.begin();i!=_bindings.end();++i) { + if (i->address.ss_family == AF_INET) { + s = i->udpSock; + goto Binder_send_packet; + } } } - return false; } else { - bool result = false; - for(typename std::vector<_Binding>::const_iterator i(_bindings.begin());i!=_bindings.end();++i) { - if (i->address.ss_family == remote.ss_family) { - if ((v4ttl)&&(remote.ss_family == AF_INET)) - phy.setIp4UdpTtl(i->udpSock,v4ttl); - result |= phy.udpSend(i->udpSock,reinterpret_cast<const struct sockaddr *>(&remote),data,len); - if ((v4ttl)&&(remote.ss_family == AF_INET)) - phy.setIp4UdpTtl(i->udpSock,255); + if (local) { + for(i=_bindings.begin();i!=_bindings.end();++i) { + if ( + (i->address.ss_family == AF_INET6) && + (reinterpret_cast<const struct sockaddr_in6 *>(&(i->address))->sin6_port == reinterpret_cast<const struct sockaddr_in6 *>(&local)->sin6_port) && + (!memcmp(reinterpret_cast<const struct sockaddr_in6 *>(&(i->address))->sin6_addr.s6_addr,reinterpret_cast<const struct sockaddr_in6 *>(&local)->sin6_addr.s6_addr,16)) + ) + { + s = i->udpSock; + goto Binder_send_packet; + } + } + } else { + for(i=_bindings.begin();i!=_bindings.end();++i) { + if (i->address.ss_family == AF_INET6) { + s = i->udpSock; + goto Binder_send_packet; + } } } - return result; } + + return -1; + +Binder_send_packet: + if (v4ttl) phy.setIp4UdpTtl(s,v4ttl); + result = (int)phy.udpSend(s,reinterpret_cast<const struct sockaddr *>(&remote),data,len); + if (v4ttl) phy.setIp4UdpTtl(s,255); + return result; } /** * @return All currently bound local interface addresses */ - inline std::vector<InetAddress> allBoundLocalInterfaceAddresses() + inline std::vector<InetAddress> allBoundLocalInterfaceAddresses() const { - Mutex::Lock _l(_lock); std::vector<InetAddress> aa; - for(std::vector<_Binding>::const_iterator i(_bindings.begin());i!=_bindings.end();++i) - aa.push_back(i->address); - return aa; - } - - /** - * @param aa Vector to append local interface addresses to - */ - inline void allBoundLocalInterfaceAddresses(std::vector<InetAddress> &aa) - { Mutex::Lock _l(_lock); - for(std::vector<_Binding>::const_iterator i(_bindings.begin());i!=_bindings.end();++i) - aa.push_back(i->address); + for(std::vector<_Binding>::const_iterator b(_bindings.begin());b!=_bindings.end();++b) + aa.push_back(b->address); + return aa; } private: diff --git a/service/OneService.cpp b/service/OneService.cpp index 472115da..8fc76e31 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -160,11 +160,8 @@ namespace ZeroTier { typedef BSDEthernetTap EthernetTap; } // Maximum write buffer size for outgoing TCP connections (sanity limit) #define ZT_TCP_MAX_WRITEQ_SIZE 33554432 -// How often to check TCP connections and cluster links -#define ZT_TCP_CHECK_PERIOD 10000 - -// How often to send status info to cluster links -#define ZT_TCP_CLUSTER_SEND_STATUS_EVERY 30000 +// How often to check TCP connections and cluster links and send status to cluster peers +#define ZT_TCP_CHECK_PERIOD 15000 // TCP activity timeout #define ZT_TCP_ACTIVITY_TIMEOUT 60000 @@ -369,12 +366,12 @@ struct TcpConnection TCP_HTTP_INCOMING, TCP_HTTP_OUTGOING, TCP_TUNNEL_OUTGOING, // TUNNELED mode proxy outbound connection - TCP_CLUSTER_BACKPLANE, + TCP_CLUSTER_BACKPLANE } type; OneServiceImpl *parent; PhySocket *sock; - InetAddress from; + InetAddress remoteAddr; unsigned long lastReceive; // Used for inbound HTTP connections @@ -392,6 +389,7 @@ struct TcpConnection unsigned int clusterMemberVersionMinor; unsigned int clusterMemberVersionRev; std::vector< InetAddress > clusterMemberLocalAddresses; + Mutex clusterMemberLocalAddresses_m; std::string readq; std::string writeq; @@ -454,17 +452,8 @@ public: * destructively with uPnP port mapping behavior in very weird buggy ways. * It's only used if uPnP/NAT-PMP is enabled in this build. */ - Binder _bindings[3]; unsigned int _ports[3]; - uint16_t _portsBE[3]; // ports in big-endian network byte order as in sockaddr - - // Local interface addresses obtained from bindings - std::vector<InetAddress> _localInterfaceAddresses; - Mutex _localInterfaceAddresses_m; - - // Sockets for JSON API -- bound only to V4 and V6 localhost - PhySocket *_v4TcpControlSocket; - PhySocket *_v6TcpControlSocket; + Binder _binder; // Time we last received a packet from a global address uint64_t _lastDirectReceiveFromGlobal; @@ -534,8 +523,7 @@ public: ,_updateAutoApply(false) ,_primaryPort(port) ,_udpPortPickerCounter(0) - ,_v4TcpControlSocket((PhySocket *)0) - ,_v6TcpControlSocket((PhySocket *)0) + ,_clusterMemberId(0) ,_lastDirectReceiveFromGlobal(0) #ifdef ZT_TCP_FALLBACK_RELAY ,_lastSendToGlobalV4(0) @@ -548,10 +536,6 @@ public: #ifdef ZT_USE_MINIUPNPC ,_portMapper((PortMapper *)0) #endif -#ifdef ZT_ENABLE_CLUSTER - ,_clusterMessageSocket((PhySocket *)0) - ,_clusterDefinition((ClusterDefinition *)0) -#endif ,_run(true) { _ports[0] = 0; @@ -561,23 +545,11 @@ public: virtual ~OneServiceImpl() { - for(int i=0;i<3;++i) - _bindings[i].closeAll(_phy); - - _phy.close(_v4TcpControlSocket); - _phy.close(_v6TcpControlSocket); - -#ifdef ZT_ENABLE_CLUSTER - _phy.close(_clusterMessageSocket); -#endif - + _binder.closeAll(_phy); #ifdef ZT_USE_MINIUPNPC delete _portMapper; #endif delete _controller; -#ifdef ZT_ENABLE_CLUSTER - delete _clusterDefinition; -#endif } virtual ReasonForTermination run() @@ -623,7 +595,7 @@ public: InetAddress trustedPathNetworks[ZT_MAX_TRUSTED_PATHS]; unsigned int trustedPathCount = 0; - // Old style "trustedpaths" flat file -- will eventually go away + // LEGACY: support old "trustedpaths" flat file FILE *trustpaths = fopen((_homePath + ZT_PATH_SEPARATOR_S "trustedpaths").c_str(),"r"); if (trustpaths) { fprintf(stderr,"WARNING: 'trustedpaths' flat file format is deprecated in favor of path definitions in local.conf" ZT_EOL_S); @@ -688,9 +660,11 @@ public: if (trustedPathCount) _node->setTrustedPaths(reinterpret_cast<const struct sockaddr_storage *>(trustedPathNetworks),trustedPathIds,trustedPathCount); } + + // Apply other runtime configuration from local.conf applyLocalConfig(); - // Bind TCP socket + // Make sure we can use the primary port, and hunt for one if configured to do so const int portTrials = (_primaryPort == 0) ? 256 : 1; // if port is 0, pick random for(int k=0;k<portTrials;++k) { if (_primaryPort == 0) { @@ -698,32 +672,8 @@ public: Utils::getSecureRandom(&randp,sizeof(randp)); _primaryPort = 20000 + (randp % 45500); } - if (_trialBind(_primaryPort)) { - struct sockaddr_in in4; - memset(&in4,0,sizeof(in4)); - in4.sin_family = AF_INET; - in4.sin_port = Utils::hton((uint16_t)_primaryPort); - _v4TcpControlSocket = _phy.tcpListen((const struct sockaddr *)&in4,this); - - struct sockaddr_in6 in6; - memset((void *)&in6,0,sizeof(in6)); - in6.sin6_family = AF_INET6; - in6.sin6_port = in4.sin_port; - _v6TcpControlSocket = _phy.tcpListen((const struct sockaddr *)&in6,this); - - // We must bind one of IPv4 or IPv6 -- support either failing to support hosts that - // have only IPv4 or only IPv6 stacks. - if ((_v4TcpControlSocket)||(_v6TcpControlSocket)) { - _ports[0] = _primaryPort; - break; - } else { - if (_v4TcpControlSocket) - _phy.close(_v4TcpControlSocket,false); - if (_v6TcpControlSocket) - _phy.close(_v6TcpControlSocket,false); - _primaryPort = 0; - } + _ports[0] = _primaryPort; } else { _primaryPort = 0; } @@ -743,7 +693,7 @@ public: // Attempt to bind to a secondary port chosen from our ZeroTier address. // This exists because there are buggy NATs out there that fail if more // than one device behind the same NAT tries to use the same internal - // private address port number. + // private address port number. Buggy NATs are a running theme. _ports[1] = 20000 + ((unsigned int)_node->address() % 45500); for(int i=0;;++i) { if (i > 1000) { @@ -782,10 +732,6 @@ public: } #endif - // Populate ports in big-endian format for quick compare - for(int i=0;i<3;++i) - _portsBE[i] = Utils::hton((uint16_t)_ports[i]); - // Network controller is now enabled by default for desktop and server _controller = new EmbeddedNetworkController(_node,_controllerDbPath.c_str()); _node->setNetconfMaster((void *)_controller); @@ -841,8 +787,8 @@ public: #endif */ -/* - { // Load existing networks + // Join existing networks in networks.d + { std::vector<std::string> networksDotD(OSUtils::listDirectory((_homePath + ZT_PATH_SEPARATOR_S "networks.d").c_str())); for(std::vector<std::string>::iterator f(networksDotD.begin());f!=networksDotD.end();++f) { std::size_t dot = f->find_last_of('.'); @@ -850,7 +796,9 @@ public: _node->join(Utils::hexStrToU64(f->substr(0,dot).c_str()),(void *)0,(void *)0); } } - { // Load existing moons + + // Orbit existing moons in moons.d + { std::vector<std::string> moonsDotD(OSUtils::listDirectory((_homePath + ZT_PATH_SEPARATOR_S "moons.d").c_str())); for(std::vector<std::string>::iterator f(moonsDotD.begin());f!=moonsDotD.end();++f) { std::size_t dot = f->find_last_of('.'); @@ -858,8 +806,8 @@ public: _node->orbit((void *)0,Utils::hexStrToU64(f->substr(0,dot).c_str()),0); } } -*/ + // Derive the cluster's shared secret backplane encryption key by hashing its shared secret identity { uint8_t tmp[64]; SHA512::hash(tmp,_node->identity().privateKeyPair().priv.data,ZT_C25519_PRIVATE_KEY_LEN); @@ -877,6 +825,7 @@ public: uint64_t lastUpdateCheck = clockShouldBe; uint64_t lastLocalInterfaceAddressCheck = (clockShouldBe - ZT_LOCAL_INTERFACE_CHECK_INTERVAL) + 15000; // do this in 15s to give portmapper time to configure and other things time to settle uint64_t lastCleanedIddb = 0; + uint64_t lastTcpCheck = 0; for(;;) { _run_m.lock(); if (!_run) { @@ -914,11 +863,13 @@ public: // Refresh bindings in case device's interfaces have changed, and also sync routes to update any shadow routes (e.g. shadow default) if (((now - lastBindRefresh) >= ZT_BINDER_REFRESH_PERIOD)||(restarted)) { lastBindRefresh = now; + unsigned int p[3]; + unsigned int pc = 0; for(int i=0;i<3;++i) { - if (_ports[i]) { - _bindings[i].refresh(_phy,_ports[i],*this); - } + if (_ports[i]) + p[pc++] = _ports[i]; } + _binder.refresh(_phy,p,pc,*this); { Mutex::Lock _l(_nets_m); for(std::map<uint64_t,NetworkState>::iterator n(_nets.begin());n!=_nets.end();++n) { @@ -928,15 +879,18 @@ public: } } + // Run background task processor in core if it's time to do so uint64_t dl = _nextBackgroundTaskDeadline; if (dl <= now) { _node->processBackgroundTasks((void *)0,now,&_nextBackgroundTaskDeadline); dl = _nextBackgroundTaskDeadline; } + // Close TCP fallback tunnel if we have direct UDP if ((_tcpFallbackTunnel)&&((now - _lastDirectReceiveFromGlobal) < (ZT_TCP_FALLBACK_AFTER / 2))) _phy.close(_tcpFallbackTunnel->sock); + // Sync multicast group memberships if ((now - lastTapMulticastGroupCheck) >= ZT_TAP_CHECK_MULTICAST_INTERVAL) { lastTapMulticastGroupCheck = now; Mutex::Lock _l(_nets_m); @@ -952,12 +906,12 @@ public: } } + // Sync information about physical network interfaces if ((now - lastLocalInterfaceAddressCheck) >= ZT_LOCAL_INTERFACE_CHECK_INTERVAL) { lastLocalInterfaceAddressCheck = now; _node->clearLocalInterfaceAddresses(); - // Tell Node about uPnP and NAT-PMP bound external addresses #ifdef ZT_USE_MINIUPNPC if (_portMapper) { std::vector<InetAddress> mappedAddresses(_portMapper->get()); @@ -966,18 +920,57 @@ public: } #endif - // Tell Node about local interface addresses bound to the primary port - std::vector<InetAddress> boundAddrs(_bindings[0].allBoundLocalInterfaceAddresses()); + std::vector<InetAddress> boundAddrs(_binder.allBoundLocalInterfaceAddresses()); for(std::vector<InetAddress>::const_iterator i(boundAddrs.begin());i!=boundAddrs.end();++i) _node->addLocalInterfaceAddress(reinterpret_cast<const struct sockaddr_storage *>(&(*i))); + } + + // Check TCP connections and cluster links + if ((now - lastTcpCheck) >= ZT_TCP_CHECK_PERIOD) { + lastTcpCheck = now; - // Memoize all local interface addresses for use in clustering -- we tell other cluster members about these + std::vector<PhySocket *> toClose; + std::vector<InetAddress> clusterLinksUp; { - Mutex::Lock _l(_localInterfaceAddresses_m); - _localInterfaceAddresses.clear(); - for(int i=0;i<3;++i) { - if (_ports[i] > 0) - _bindings[i].allBoundLocalInterfaceAddresses(_localInterfaceAddresses); + Mutex::Lock _l(_tcpConnections_m); + for(std::vector<TcpConnection *>::const_iterator c(_tcpConnections.begin());c!=_tcpConnections.end();++c) { + TcpConnection *const tc = *c; + tc->writeq_m.lock(); + const unsigned long wql = (unsigned long)tc->writeq.length(); + tc->writeq_m.unlock(); + if ((tc->sock)&&((wql > ZT_TCP_MAX_WRITEQ_SIZE)||((now - tc->lastReceive) > ZT_TCP_ACTIVITY_TIMEOUT))) { + toClose.push_back(tc->sock); + } else if ((tc->type == TcpConnection::TCP_CLUSTER_BACKPLANE)&&(tc->clusterMemberId)) { + clusterLinksUp.push_back(tc->remoteAddr); + sendMyCurrentClusterState(tc); + } + } + } + for(std::vector<PhySocket *>::iterator s(toClose.begin());s!=toClose.end();++s) + _phy.close(*s,true); + + { + Mutex::Lock _l(_localConfig_m); + for(std::vector<InetAddress>::const_iterator ca(_clusterBackplaneAddresses.begin());ca!=_clusterBackplaneAddresses.end();++ca) { + if (std::find(clusterLinksUp.begin(),clusterLinksUp.end(),*ca) == clusterLinksUp.end()) { + TcpConnection *tc = new TcpConnection(); + { + Mutex::Lock _l(_tcpConnections_m); + _tcpConnections.push_back(tc); + } + + tc->type = TcpConnection::TCP_CLUSTER_BACKPLANE; + tc->remoteAddr = *ca; + tc->lastReceive = OSUtils::now(); + tc->parent = this; + tc->sock = (PhySocket *)0; // set in connect handler + tc->messageSize = 0; + + tc->clusterMemberId = 0; // not known yet + + bool connected = false; + _phy.tcpConnect(reinterpret_cast<const struct sockaddr *>(&(*ca)),connected,(void *)tc,true); + } } } } @@ -986,10 +979,6 @@ public: clockShouldBe = now + (uint64_t)delay; _phy.poll(delay); } - } catch (std::exception &exc) { - Mutex::Lock _l(_termReason_m); - _termReason = ONE_UNRECOVERABLE_ERROR; - _fatalErrorMessage = exc.what(); } catch ( ... ) { Mutex::Lock _l(_termReason_m); _termReason = ONE_UNRECOVERABLE_ERROR; @@ -1819,110 +1808,154 @@ public: } } - void announceStatusToClusterMember(TcpConnection *tc) + // ========================================================================= + // Cluster messaging functions + // ========================================================================= + + // mlen must be at least 24 + void encryptClusterMessage(char *data,unsigned int mlen) { - Buffer<4096> buf; + uint8_t key[32]; + memcpy(key,_clusterKey,32); + for(int i=0;i<8;++i) key[i] ^= data[i]; + Salsa20 s20(key,data + 8); - buf.appendRandom(16); - buf.addSize(8); // space for MAC - buf.append((uint8_t)CLUSTER_MESSAGE_STATUS); - buf.append(_clusterMemberId); - buf.append((uint16_t)ZEROTIER_ONE_VERSION_MAJOR); - buf.append((uint16_t)ZEROTIER_ONE_VERSION_MINOR); - buf.append((uint16_t)ZEROTIER_ONE_VERSION_REVISION); + uint8_t macKey[32]; + uint8_t mac[16]; + memset(macKey,0,32); + s20.crypt12(macKey,macKey,32); + s20.crypt12(data + 24,data + 24,mlen - 24); + Poly1305::compute(mac,data + 24,mlen - 24,macKey); + memcpy(data + 16,mac,8); + } - { - Mutex::Lock _l(_localInterfaceAddresses_m); - buf.append((uint16_t)_localInterfaceAddresses.size()); - for(std::vector<InetAddress>::const_iterator i(_localInterfaceAddresses.begin());i!=_localInterfaceAddresses.end();++i) { + void announceStatusToClusterMember(TcpConnection *tc) + { + try { + Buffer<8194> buf; + + buf.appendRandom(16); + buf.addSize(8); // space for MAC + buf.append((uint8_t)CLUSTER_MESSAGE_STATUS); + buf.append(_clusterMemberId); + buf.append((uint16_t)ZEROTIER_ONE_VERSION_MAJOR); + buf.append((uint16_t)ZEROTIER_ONE_VERSION_MINOR); + buf.append((uint16_t)ZEROTIER_ONE_VERSION_REVISION); + + std::vector<InetAddress> lif(_binder.allBoundLocalInterfaceAddresses()); + buf.append((uint16_t)lif.size()); + for(std::vector<InetAddress>::const_iterator i(lif.begin());i!=lif.end();++i) i->serialize(buf); - if ((buf.size() + 32) > buf.capacity()) - break; + + Mutex::Lock _l(tc->writeq_m); + + if (tc->writeq.length() == 0) + _phy.setNotifyWritable(tc->sock,true); + + const unsigned int mlen = buf.size(); + tc->writeq.push_back((char)((mlen >> 16) & 0xff)); + tc->writeq.push_back((char)((mlen >> 8) & 0xff)); + tc->writeq.push_back((char)(mlen & 0xff)); + + char *const data = reinterpret_cast<char *>(buf.unsafeData()); + encryptClusterMessage(data,mlen); + tc->writeq.append(data,mlen); + } catch ( ... ) { + fprintf(stderr,"WARNING: unexpected exception announcing status to cluster members" ZT_EOL_S); + } + } + + bool proxySendViaCluster(const InetAddress &fromAddress,const InetAddress &dest,const void *data,unsigned int len,unsigned int ttl) + { + Mutex::Lock _l(_tcpConnections_m); + for(std::vector<TcpConnection *>::const_iterator c(_tcpConnections.begin());c!=_tcpConnections.end();++c) { + TcpConnection *const tc = *c; + if ((tc->type == TcpConnection::TCP_CLUSTER_BACKPLANE)&&(tc->clusterMemberId)) { + Mutex::Lock _l2(tc->clusterMemberLocalAddresses_m); + for(std::vector<InetAddress>::const_iterator i(tc->clusterMemberLocalAddresses.begin());i!=tc->clusterMemberLocalAddresses.end();++i) { + if (*i == fromAddress) { + Buffer<1024> buf; + + buf.appendRandom(16); + buf.addSize(8); // space for MAC + buf.append((uint8_t)CLUSTER_MESSAGE_PROXY_SEND); + buf.append((uint8_t)ttl); + dest.serialize(buf); + fromAddress.serialize(buf); + + Mutex::Lock _l3(tc->writeq_m); + + if (tc->writeq.length() == 0) + _phy.setNotifyWritable(tc->sock,true); + + const unsigned int mlen = buf.size() + len; + tc->writeq.push_back((char)((mlen >> 16) & 0xff)); + tc->writeq.push_back((char)((mlen >> 8) & 0xff)); + tc->writeq.push_back((char)(mlen & 0xff)); + + const unsigned long startpos = (unsigned long)tc->writeq.length(); + tc->writeq.append(reinterpret_cast<const char *>(buf.data()),buf.size()); + tc->writeq.append(reinterpret_cast<const char *>(data),len); + + char *const outdata = const_cast<char *>(tc->writeq.data()) + startpos; + encryptClusterMessage(outdata,mlen); + + return true; + } + } } } + return false; + } - Mutex::Lock _l(tc->writeq_m); + void replicateStateObject(const ZT_StateObjectType type,const uint64_t id,const void *const data,const unsigned int len,TcpConnection *tc) + { + char buf[34]; + + Mutex::Lock _l2(tc->writeq_m); if (tc->writeq.length() == 0) _phy.setNotifyWritable(tc->sock,true); - const unsigned int mlen = buf.size(); + const unsigned int mlen = len + 34; + tc->writeq.push_back((char)((mlen >> 16) & 0xff)); tc->writeq.push_back((char)((mlen >> 8) & 0xff)); tc->writeq.push_back((char)(mlen & 0xff)); - char *data = reinterpret_cast<char *>(buf.unsafeData()); - - uint8_t key[32]; - memcpy(key,_clusterKey,32); - for(int i=0;i<8;++i) key[i] ^= data[i]; - Salsa20 s20(key,data + 8); - - uint8_t macKey[32]; - uint8_t mac[16]; - memset(macKey,0,32); - s20.crypt12(macKey,macKey,32); - s20.crypt12(data + 24,data + 24,mlen - 24); - Poly1305::compute(mac,data + 24,mlen - 24,macKey); - memcpy(data + 16,mac,8); - - tc->writeq.append(data,mlen); + Utils::getSecureRandom(buf,16); + buf[24] = (char)CLUSTER_MESSAGE_STATE_OBJECT; + buf[25] = (char)type; + buf[26] = (char)((id >> 56) & 0xff); + buf[27] = (char)((id >> 48) & 0xff); + buf[28] = (char)((id >> 40) & 0xff); + buf[29] = (char)((id >> 32) & 0xff); + buf[30] = (char)((id >> 24) & 0xff); + buf[31] = (char)((id >> 16) & 0xff); + buf[32] = (char)((id >> 8) & 0xff); + buf[33] = (char)(id & 0xff); + + const unsigned long startpos = (unsigned long)tc->writeq.length(); + tc->writeq.append(buf,34); + tc->writeq.append(reinterpret_cast<const char *>(data),len); + + char *const outdata = const_cast<char *>(tc->writeq.data()) + startpos; + encryptClusterMessage(outdata,mlen); } void replicateStateObjectToCluster(const ZT_StateObjectType type,const uint64_t id,const void *const data,const unsigned int len,const uint64_t everyoneBut) { - uint8_t *buf = new uint8_t[len + 34]; - try { - std::vector<uint64_t> sentTo; - if (everyoneBut) - sentTo.push_back(everyoneBut); - Mutex::Lock _l(_tcpConnections_m); - for(std::vector<TcpConnection *>::const_iterator ci(_tcpConnections.begin());ci!=_tcpConnections.end();++ci) { - TcpConnection *const c = *ci; - if ((c->type == TcpConnection::TCP_CLUSTER_BACKPLANE)&&(c->clusterMemberId != 0)&&(std::find(sentTo.begin(),sentTo.end(),c->clusterMemberId) == sentTo.end())) { - sentTo.push_back(c->clusterMemberId); - Mutex::Lock _l2(c->writeq_m); - - if (c->writeq.length() == 0) - _phy.setNotifyWritable(c->sock,true); - - const unsigned int mlen = len + 34; - c->writeq.push_back((char)((mlen >> 16) & 0xff)); - c->writeq.push_back((char)((mlen >> 8) & 0xff)); - c->writeq.push_back((char)(mlen & 0xff)); - - Utils::getSecureRandom(buf,16); - - buf[24] = (uint8_t)CLUSTER_MESSAGE_STATE_OBJECT; - buf[25] = (uint8_t)type; - buf[26] = (uint8_t)((id >> 56) & 0xff); - buf[27] = (uint8_t)((id >> 48) & 0xff); - buf[28] = (uint8_t)((id >> 40) & 0xff); - buf[29] = (uint8_t)((id >> 32) & 0xff); - buf[30] = (uint8_t)((id >> 24) & 0xff); - buf[31] = (uint8_t)((id >> 16) & 0xff); - buf[32] = (uint8_t)((id >> 8) & 0xff); - buf[33] = (uint8_t)(id & 0xff); - memcpy(buf + 34,data,len); - - uint8_t key[32]; - memcpy(key,_clusterKey,32); - for(int i=0;i<8;++i) key[i] ^= buf[i]; - Salsa20 s20(key,buf + 8); - - uint8_t macKey[32]; - uint8_t mac[16]; - memset(macKey,0,32); - s20.crypt12(macKey,macKey,32); - s20.crypt12(buf + 24,buf + 24,mlen - 24); - Poly1305::compute(mac,buf + 24,mlen - 24,macKey); - memcpy(buf + 16,mac,8); - - c->writeq.append(reinterpret_cast<char *>(buf),len + 34); - } + std::vector<uint64_t> sentTo; + if (everyoneBut) + sentTo.push_back(everyoneBut); + Mutex::Lock _l(_tcpConnections_m); + for(std::vector<TcpConnection *>::const_iterator ci(_tcpConnections.begin());ci!=_tcpConnections.end();++ci) { + TcpConnection *const c = *ci; + if ((c->type == TcpConnection::TCP_CLUSTER_BACKPLANE)&&(c->clusterMemberId != 0)&&(std::find(sentTo.begin(),sentTo.end(),c->clusterMemberId) == sentTo.end())) { + sentTo.push_back(c->clusterMemberId); + replicateStateObject(type,id,data,len,c); } - } catch ( ... ) {} // sanity check - delete [] buf; + } } void writeStateObject(enum ZT_StateObjectType type,uint64_t id,const void *data,int len) @@ -1955,15 +1988,48 @@ public: break; } if (p[0]) { - FILE *f = fopen(p,"w"); - if (f) { - if (fwrite(data,len,1,f) != 1) - fprintf(stderr,"WARNING: unable to write to file: %s (I/O error)" ZT_EOL_S,p); - fclose(f); - if (secure) - OSUtils::lockDownFile(p,false); + if (len >= 0) { + FILE *f = fopen(p,"w"); + if (f) { + if (fwrite(data,len,1,f) != 1) + fprintf(stderr,"WARNING: unable to write to file: %s (I/O error)" ZT_EOL_S,p); + fclose(f); + if (secure) + OSUtils::lockDownFile(p,false); + } else { + fprintf(stderr,"WARNING: unable to write to file: %s (unable to open)" ZT_EOL_S,p); + } } else { - fprintf(stderr,"WARNING: unable to write to file: %s (unable to open)" ZT_EOL_S,p); + OSUtils::rm(p); + } + } + } + + void sendMyCurrentClusterState(TcpConnection *tc) + { + // We currently don't need to dump everything. Networks and moons are most important. + // The rest will get caught up rapidly due to constant peer updates, etc. + std::string buf; + std::vector<std::string> l(OSUtils::listDirectory((_homePath + ZT_PATH_SEPARATOR_S + "networks.d").c_str(),false)); + for(std::vector<std::string>::const_iterator f(l.begin());f!=l.end();++f) { + buf.clear(); + if (OSUtils::readFile((_homePath + ZT_PATH_SEPARATOR_S + *f).c_str(),buf)) { + if (f->length() == 21) { + const uint64_t nwid = Utils::hexStrToU64(f->substr(0,16).c_str()); + if (nwid) + replicateStateObject(ZT_STATE_OBJECT_NETWORK_CONFIG,nwid,buf.data(),(int)buf.length(),tc); + } + } + } + l = OSUtils::listDirectory((_homePath + ZT_PATH_SEPARATOR_S + "moons.d").c_str(),false); + for(std::vector<std::string>::const_iterator f(l.begin());f!=l.end();++f) { + buf.clear(); + if (OSUtils::readFile((_homePath + ZT_PATH_SEPARATOR_S + *f).c_str(),buf)) { + if (f->length() == 21) { + const uint64_t moonId = Utils::hexStrToU64(f->substr(0,16).c_str()); + if (moonId) + replicateStateObject(ZT_STATE_OBJECT_MOON,moonId,buf.data(),(int)buf.length(),tc); + } } } } @@ -2015,6 +2081,12 @@ public: _tcpFallbackTunnel = tc; _phy.streamSend(sock,ZT_TCP_TUNNEL_HELLO,sizeof(ZT_TCP_TUNNEL_HELLO)); } else if (tc->type == TcpConnection::TCP_CLUSTER_BACKPLANE) { + { + Mutex::Lock _l(tc->writeq_m); + tc->writeq.push_back((char)0x93); // identifies type of connection as cluster backplane + } + announceStatusToClusterMember(tc); + _phy.setNotifyWritable(sock,true); } else { _phy.close(sock,true); } @@ -2035,7 +2107,7 @@ public: tc->type = TcpConnection::TCP_UNCATEGORIZED_INCOMING; tc->parent = this; tc->sock = sockN; - tc->from = from; + tc->remoteAddr = from; tc->lastReceive = OSUtils::now(); http_parser_init(&(tc->parser),HTTP_REQUEST); tc->parser.data = (void *)tc; @@ -2072,17 +2144,19 @@ public: switch(reinterpret_cast<uint8_t *>(data)[0]) { // 0x93 is first byte of cluster backplane connections case 0x93: { + // We only allow this from cluster backplane IPs. We also authenticate + // each packet cryptographically, so this is just a first line of defense. bool allow = false; { Mutex::Lock _l(_localConfig_m); for(std::vector< InetAddress >::const_iterator i(_clusterBackplaneAddresses.begin());i!=_clusterBackplaneAddresses.end();++i) { - if (tc->from.ipsEqual(*i)) { + if (tc->remoteAddr.ipsEqual(*i)) { allow = true; break; } } } - if (allow) { // note that we also auth each packet cryptographically -- this is just a first line sanity check + if (allow) { tc->type = TcpConnection::TCP_CLUSTER_BACKPLANE; tc->clusterMemberId = 0; // unknown, waiting for first status message announceStatusToClusterMember(tc); @@ -2097,15 +2171,17 @@ public: case 'G': case 'P': case 'H': { + // This is only allowed from IPs permitted to access the management + // backplane, which is just 127.0.0.1/::1 unless otherwise configured. bool allow; { Mutex::Lock _l(_localConfig_m); if (_allowManagementFrom.size() == 0) { - allow = (tc->from.ipScope() == InetAddress::IP_SCOPE_LOOPBACK); + allow = (tc->remoteAddr.ipScope() == InetAddress::IP_SCOPE_LOOPBACK); } else { allow = false; for(std::vector<InetAddress>::const_iterator i(_allowManagementFrom.begin());i!=_allowManagementFrom.end();++i) { - if (i->containsAddress(tc->from)) { + if (i->containsAddress(tc->remoteAddr)) { allow = true; break; } @@ -2240,11 +2316,15 @@ public: if (mlen > (25 + 16)) { Buffer<4096> tmp(data + 25,mlen - 25); try { - tc->clusterMemberId = tmp.at<uint64_t>(0); - if (tc->clusterMemberId == _clusterMemberId) { // shouldn't happen, but don't allow self-to-self + const uint64_t cmid = tmp.at<uint64_t>(0); + if (cmid == _clusterMemberId) { // shouldn't happen, but don't allow self-to-self _phy.close(sock); return; } + if (!tc->clusterMemberId) { + tc->clusterMemberId = cmid; + sendMyCurrentClusterState(tc); + } tc->clusterMemberVersionMajor = tmp.at<uint16_t>(8); tc->clusterMemberVersionMinor = tmp.at<uint16_t>(10); tc->clusterMemberVersionRev = tmp.at<uint16_t>(12); @@ -2255,7 +2335,10 @@ public: la.push_back(InetAddress()); ptr += la.back().deserialize(tmp,ptr); } - tc->clusterMemberLocalAddresses.swap(la); + { + Mutex::Lock _l2(tc->clusterMemberLocalAddresses_m); + tc->clusterMemberLocalAddresses.swap(la); + } } catch ( ... ) {} } break; @@ -2284,17 +2367,12 @@ public: Buffer<4096> tmp(data + 25,mlen - 25); try { InetAddress dest,src; - unsigned int ptr = dest.deserialize(tmp); + const unsigned int ttl = (unsigned int)tmp[0]; + unsigned int ptr = 1; + ptr += dest.deserialize(tmp); ptr += src.deserialize(tmp,ptr); - if (ptr < tmp.size()) { - bool local; - { - Mutex::Lock _l(_localInterfaceAddresses_m); - local = (std::find(_localInterfaceAddresses.begin(),_localInterfaceAddresses.end(),src) != _localInterfaceAddresses.end()); - } - if (local) - nodeWirePacketSendFunction(&src,&dest,reinterpret_cast<const uint8_t *>(tmp.data()) + ptr,tmp.size() - ptr,0); - } + if (ptr < tmp.size()) + _binder.udpSend(_phy,src,dest,reinterpret_cast<const uint8_t *>(tmp.data()) + ptr,tmp.size() - ptr,ttl); } catch ( ... ) {} } break; @@ -2539,24 +2617,8 @@ public: inline int nodeWirePacketSendFunction(const struct sockaddr_storage *localAddr,const struct sockaddr_storage *addr,const void *data,unsigned int len,unsigned int ttl) { - unsigned int fromBindingNo = 0; - - if (addr->ss_family == AF_INET) { - if (reinterpret_cast<const struct sockaddr_in *>(localAddr)->sin_port == 0) { - // If sender is sending from wildcard (null address), choose the secondary backup - // port 1/4 of the time. (but only for IPv4) - fromBindingNo = (++_udpPortPickerCounter & 0x4) >> 2; - if (!_ports[fromBindingNo]) - fromBindingNo = 0; - } else { - const uint16_t lp = reinterpret_cast<const struct sockaddr_in *>(localAddr)->sin_port; - if (lp == _portsBE[1]) - fromBindingNo = 1; - else if (lp == _portsBE[2]) - fromBindingNo = 2; - } - #ifdef ZT_TCP_FALLBACK_RELAY + if (addr->ss_family == AF_INET) { // TCP fallback tunnel support, currently IPv4 only if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(addr)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) { // Engage TCP tunnel fallback if we haven't received anything valid from a global @@ -2579,40 +2641,45 @@ public: _tcpFallbackTunnel->writeq.append(reinterpret_cast<const char *>(reinterpret_cast<const void *>(&(reinterpret_cast<const struct sockaddr_in *>(addr)->sin_port))),2); _tcpFallbackTunnel->writeq.append((const char *)data,len); } else if (((now - _lastSendToGlobalV4) < ZT_TCP_FALLBACK_AFTER)&&((now - _lastSendToGlobalV4) > (ZT_PING_CHECK_INVERVAL / 2))) { - bool connected = false; const InetAddress addr(ZT_TCP_FALLBACK_RELAY); - TcpConnection *tc = new TcpConnection(); { Mutex::Lock _l(_tcpConnections_m); _tcpConnections.push_back(tc); } - tc->type = TcpConnection::TCP_TUNNEL_OUTGOING; + tc->remoteAddr = addr; + tc->lastReceive = OSUtils::now(); tc->parent = this; tc->sock = (PhySocket *)0; // set in connect handler tc->messageSize = 0; - + bool connected = false; _phy.tcpConnect(reinterpret_cast<const struct sockaddr *>(&addr),connected,(void *)tc,true); } } _lastSendToGlobalV4 = now; } + } + // Even when relaying we still send via UDP. This way if UDP starts + // working we can instantly "fail forward" to it and stop using TCP + // proxy fallback, which is slow. #endif // ZT_TCP_FALLBACK_RELAY - } else if (addr->ss_family == AF_INET6) { - if (reinterpret_cast<const struct sockaddr_in6 *>(localAddr)->sin6_port != 0) { - const uint16_t lp = reinterpret_cast<const struct sockaddr_in6 *>(localAddr)->sin6_port; - if (lp == _portsBE[1]) - fromBindingNo = 1; - else if (lp == _portsBE[2]) - fromBindingNo = 2; - } - } else { - return -1; - } + switch (_binder.udpSend(_phy,*(reinterpret_cast<const InetAddress *>(localAddr)),*(reinterpret_cast<const InetAddress *>(addr)),data,len,ttl)) { + case -1: // local bound address not found, so see if a cluster peer owns it + if (localAddr->ss_family != 0) { + return (proxySendViaCluster(*(reinterpret_cast<const InetAddress *>(localAddr)),*(reinterpret_cast<const InetAddress *>(addr)),data,len,ttl)) ? 0 : -1; + } else { + return -1; // failure + } + break; + + case 0: // failure + return -1; - return (_bindings[fromBindingNo].udpSend(_phy,*(reinterpret_cast<const InetAddress *>(localAddr)),*(reinterpret_cast<const InetAddress *>(addr)),data,len,ttl)) ? 0 : -1; + default: // success + return 0; + } } inline void nodeVirtualNetworkFrameFunction(uint64_t nwid,void **nuptr,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len) @@ -2707,7 +2774,7 @@ public: // phyOnTcpData(). If we made it here the source IP is okay. try { - scode = handleControlPlaneHttpRequest(tc->from,tc->parser.method,tc->url,tc->headers,tc->readq,data,contentType); + scode = handleControlPlaneHttpRequest(tc->remoteAddr,tc->parser.method,tc->url,tc->headers,tc->readq,data,contentType); } catch (std::exception &exc) { fprintf(stderr,"WARNING: unexpected exception processing control HTTP request: %s" ZT_EOL_S,exc.what()); scode = 500; |