diff options
Diffstat (limited to 'service/OneService.cpp')
-rw-r--r-- | service/OneService.cpp | 441 |
1 files changed, 335 insertions, 106 deletions
diff --git a/service/OneService.cpp b/service/OneService.cpp index 670d5641..8e21fc5f 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -58,6 +58,8 @@ #include "OneService.hpp" #include "ControlPlane.hpp" +#include "ClusterGeoIpService.hpp" +#include "ClusterDefinition.hpp" /** * Uncomment to enable UDP breakage switch @@ -118,20 +120,20 @@ namespace ZeroTier { typedef BSDEthernetTap EthernetTap; } #define ZT_TAP_CHECK_MULTICAST_INTERVAL 30000 // Path under ZT1 home for controller database if controller is enabled -#define ZT1_CONTROLLER_DB_PATH "controller.db" +#define ZT_CONTROLLER_DB_PATH "controller.db" // TCP fallback relay host -- geo-distributed using Amazon Route53 geo-aware DNS -#define ZT1_TCP_FALLBACK_RELAY "tcp-fallback.zerotier.com" -#define ZT1_TCP_FALLBACK_RELAY_PORT 443 +#define ZT_TCP_FALLBACK_RELAY "tcp-fallback.zerotier.com" +#define ZT_TCP_FALLBACK_RELAY_PORT 443 // Frequency at which we re-resolve the TCP fallback relay -#define ZT1_TCP_FALLBACK_RERESOLVE_DELAY 86400000 +#define ZT_TCP_FALLBACK_RERESOLVE_DELAY 86400000 // Attempt to engage TCP fallback after this many ms of no reply to packets sent to global-scope IPs -#define ZT1_TCP_FALLBACK_AFTER 60000 +#define ZT_TCP_FALLBACK_AFTER 60000 // How often to check for local interface addresses -#define ZT1_LOCAL_INTERFACE_CHECK_INTERVAL 300000 +#define ZT_LOCAL_INTERFACE_CHECK_INTERVAL 300000 namespace ZeroTier { @@ -338,14 +340,38 @@ public: static BackgroundSoftwareUpdateChecker backgroundSoftwareUpdateChecker; #endif // ZT_AUTO_UPDATE +static std::string _trimString(const std::string &s) +{ + unsigned long end = (unsigned long)s.length(); + while (end) { + char c = s[end - 1]; + if ((c == ' ')||(c == '\r')||(c == '\n')||(!c)||(c == '\t')) + --end; + else break; + } + unsigned long start = 0; + while (start < end) { + char c = s[start]; + if ((c == ' ')||(c == '\r')||(c == '\n')||(!c)||(c == '\t')) + ++start; + else break; + } + return s.substr(start,end - start); +} + class OneServiceImpl; -static int SnodeVirtualNetworkConfigFunction(ZT1_Node *node,void *uptr,uint64_t nwid,enum ZT1_VirtualNetworkConfigOperation op,const ZT1_VirtualNetworkConfig *nwconf); -static void SnodeEventCallback(ZT1_Node *node,void *uptr,enum ZT1_Event event,const void *metaData); -static long SnodeDataStoreGetFunction(ZT1_Node *node,void *uptr,const char *name,void *buf,unsigned long bufSize,unsigned long readIndex,unsigned long *totalSize); -static int SnodeDataStorePutFunction(ZT1_Node *node,void *uptr,const char *name,const void *data,unsigned long len,int secure); -static int SnodeWirePacketSendFunction(ZT1_Node *node,void *uptr,const struct sockaddr_storage *addr,const void *data,unsigned int len); -static void SnodeVirtualNetworkFrameFunction(ZT1_Node *node,void *uptr,uint64_t nwid,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len); +static int SnodeVirtualNetworkConfigFunction(ZT_Node *node,void *uptr,uint64_t nwid,enum ZT_VirtualNetworkConfigOperation op,const ZT_VirtualNetworkConfig *nwconf); +static void SnodeEventCallback(ZT_Node *node,void *uptr,enum ZT_Event event,const void *metaData); +static long SnodeDataStoreGetFunction(ZT_Node *node,void *uptr,const char *name,void *buf,unsigned long bufSize,unsigned long readIndex,unsigned long *totalSize); +static int SnodeDataStorePutFunction(ZT_Node *node,void *uptr,const char *name,const void *data,unsigned long len,int secure); +static int SnodeWirePacketSendFunction(ZT_Node *node,void *uptr,const struct sockaddr_storage *localAddr,const struct sockaddr_storage *addr,const void *data,unsigned int len,unsigned int ttl); +static void SnodeVirtualNetworkFrameFunction(ZT_Node *node,void *uptr,uint64_t nwid,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len); + +#ifdef ZT_ENABLE_CLUSTER +static void SclusterSendFunction(void *uptr,unsigned int toMemberId,const void *data,unsigned int len); +static int SclusterGeoIpFunction(void *uptr,const struct sockaddr_storage *addr,int *x,int *y,int *z); +#endif static void StapFrameHandler(void *uptr,uint64_t nwid,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len); @@ -396,57 +422,108 @@ struct TcpConnection Mutex writeBuf_m; }; +// Use a bigger buffer on AMD64 since these are likely to be bigger and +// servers. Otherwise use a smaller buffer. This makes no difference +// except under very high load. +#if (defined(__amd64) || defined(__amd64__) || defined(__x86_64) || defined(__x86_64__) || defined(__AMD64) || defined(__AMD64__)) +#define ZT_UDP_DESIRED_BUF_SIZE 1048576 +#else +#define ZT_UDP_DESIRED_BUF_SIZE 131072 +#endif + class OneServiceImpl : public OneService { public: - OneServiceImpl(const char *hp,unsigned int port,const char *overrideRootTopology) : - _homePath((hp) ? hp : "."), - _tcpFallbackResolver(ZT1_TCP_FALLBACK_RELAY), + OneServiceImpl(const char *hp,unsigned int port) : + _homePath((hp) ? hp : ".") + ,_tcpFallbackResolver(ZT_TCP_FALLBACK_RELAY) #ifdef ZT_ENABLE_NETWORK_CONTROLLER - _controller((_homePath + ZT_PATH_SEPARATOR_S + ZT1_CONTROLLER_DB_PATH).c_str()), + ,_controller((SqliteNetworkController *)0) #endif - _phy(this,false,true), - _overrideRootTopology((overrideRootTopology) ? overrideRootTopology : ""), - _node((Node *)0), - _controlPlane((ControlPlane *)0), - _lastDirectReceiveFromGlobal(0), - _lastSendToGlobal(0), - _lastRestart(0), - _nextBackgroundTaskDeadline(0), - _tcpFallbackTunnel((TcpConnection *)0), - _termReason(ONE_STILL_RUNNING), - _port(port), + ,_phy(this,false,true) + ,_node((Node *)0) + ,_controlPlane((ControlPlane *)0) + ,_lastDirectReceiveFromGlobal(0) + ,_lastSendToGlobal(0) + ,_lastRestart(0) + ,_nextBackgroundTaskDeadline(0) + ,_tcpFallbackTunnel((TcpConnection *)0) + ,_termReason(ONE_STILL_RUNNING) + ,_port(0) #ifdef ZT_USE_MINIUPNPC - _upnpClient((int)port), + ,_v4UpnpUdpSocket((PhySocket *)0) + ,_upnpClient((UPNPClient *)0) +#endif +#ifdef ZT_ENABLE_CLUSTER + ,_clusterMessageSocket((PhySocket *)0) + ,_clusterGeoIpService((ClusterGeoIpService *)0) + ,_clusterDefinition((ClusterDefinition *)0) + ,_clusterMemberId(0) #endif - _run(true) + ,_run(true) { - struct sockaddr_in in4; - struct sockaddr_in6 in6; - - ::memset((void *)&in4,0,sizeof(in4)); - in4.sin_family = AF_INET; - in4.sin_port = Utils::hton((uint16_t)port); - _v4UdpSocket = _phy.udpBind((const struct sockaddr *)&in4,this,131072); - if (!_v4UdpSocket) - throw std::runtime_error("cannot bind to port (UDP/IPv4)"); - in4.sin_addr.s_addr = Utils::hton((uint32_t)0x7f000001); // right now we just listen for TCP @localhost - _v4TcpListenSocket = _phy.tcpListen((const struct sockaddr *)&in4,this); - if (!_v4TcpListenSocket) { - _phy.close(_v4UdpSocket); - throw std::runtime_error("cannot bind to port (TCP/IPv4)"); + const int portTrials = (port == 0) ? 256 : 1; // if port is 0, pick random + for(int k=0;k<portTrials;++k) { + if (port == 0) { + unsigned int randp = 0; + Utils::getSecureRandom(&randp,sizeof(randp)); + port = 40000 + (randp % 25500); + } + + _v4LocalAddress = InetAddress((uint32_t)0,port); + _v4UdpSocket = _phy.udpBind((const struct sockaddr *)&_v4LocalAddress,reinterpret_cast<void *>(&_v4LocalAddress),ZT_UDP_DESIRED_BUF_SIZE); + + if (_v4UdpSocket) { + struct sockaddr_in in4; + memset(&in4,0,sizeof(in4)); + in4.sin_family = AF_INET; + in4.sin_addr.s_addr = Utils::hton((uint32_t)0x7f000001); // right now we just listen for TCP @localhost + in4.sin_port = Utils::hton((uint16_t)port); + _v4TcpListenSocket = _phy.tcpListen((const struct sockaddr *)&in4,this); + + if (_v4TcpListenSocket) { + _v6LocalAddress = InetAddress("\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0",16,port); + _v6UdpSocket = _phy.udpBind((const struct sockaddr *)&_v6LocalAddress,reinterpret_cast<void *>(&_v6LocalAddress),ZT_UDP_DESIRED_BUF_SIZE); + + struct sockaddr_in6 in6; + memset((void *)&in6,0,sizeof(in6)); + in6.sin6_family = AF_INET6; + in6.sin6_port = in4.sin_port; + in6.sin6_addr.s6_addr[15] = 1; // IPv6 localhost == ::1 + _v6TcpListenSocket = _phy.tcpListen((const struct sockaddr *)&in6,this); + + _port = port; + break; // success! + } else { + _phy.close(_v4UdpSocket,false); + } + } + + port = 0; } - ::memset((void *)&in6,0,sizeof(in6)); - in6.sin6_family = AF_INET6; - in6.sin6_port = in4.sin_port; - _v6UdpSocket = _phy.udpBind((const struct sockaddr *)&in6,this,131072); - in6.sin6_addr.s6_addr[15] = 1; // listen for TCP only at localhost - _v6TcpListenSocket = _phy.tcpListen((const struct sockaddr *)&in6,this); + if (_port == 0) + throw std::runtime_error("cannot bind to port"); char portstr[64]; - Utils::snprintf(portstr,sizeof(portstr),"%u",port); + Utils::snprintf(portstr,sizeof(portstr),"%u",_port); OSUtils::writeFile((_homePath + ZT_PATH_SEPARATOR_S + "zerotier-one.port").c_str(),std::string(portstr)); + +#ifdef ZT_USE_MINIUPNPC + // Bind a secondary port for use with uPnP, since some NAT routers + // (cough Ubiquity Edge cough) barf up a lung if you do both conventional + // NAT-t and uPnP from behind the same port. I think this is a bug, but + // everyone else's router bugs are our problem. :P + for(int k=0;k<512;++k) { + const unsigned int upnport = 40000 + (((port + 1) * (k + 1)) % 25500); + _v4UpnpLocalAddress = InetAddress(0,upnport); + _v4UpnpUdpSocket = _phy.udpBind((const struct sockaddr *)&_v4UpnpLocalAddress,reinterpret_cast<void *>(&_v4UpnpLocalAddress),ZT_UDP_DESIRED_BUF_SIZE); + if (_v4UpnpUdpSocket) { + _upnpClient = new UPNPClient(upnport); + break; + } + } +#endif } virtual ~OneServiceImpl() @@ -455,6 +532,20 @@ public: _phy.close(_v6UdpSocket); _phy.close(_v4TcpListenSocket); _phy.close(_v6TcpListenSocket); +#ifdef ZT_ENABLE_CLUSTER + _phy.close(_clusterMessageSocket); +#endif +#ifdef ZT_USE_MINIUPNPC + _phy.close(_v4UpnpUdpSocket); + delete _upnpClient; +#endif +#ifdef ZT_ENABLE_NETWORK_CONTROLLER + delete _controller; +#endif +#ifdef ZT_ENABLE_CLUSTER + delete _clusterGeoIpService; + delete _clusterDefinition; +#endif } virtual ReasonForTermination run() @@ -477,7 +568,7 @@ public: } else OSUtils::lockDownFile(authTokenPath.c_str(),false); } } - authToken = Utils::trim(authToken); + authToken = _trimString(authToken); _node = new Node( OSUtils::now(), @@ -487,18 +578,82 @@ public: SnodeWirePacketSendFunction, SnodeVirtualNetworkFrameFunction, SnodeVirtualNetworkConfigFunction, - SnodeEventCallback, - ((_overrideRootTopology.length() > 0) ? _overrideRootTopology.c_str() : (const char *)0)); + SnodeEventCallback); #ifdef ZT_ENABLE_NETWORK_CONTROLLER - _node->setNetconfMaster((void *)&_controller); + _controller = new SqliteNetworkController(_node,(_homePath + ZT_PATH_SEPARATOR_S + ZT_CONTROLLER_DB_PATH).c_str(),(_homePath + ZT_PATH_SEPARATOR_S + "circuitTestResults.d").c_str()); + _node->setNetconfMaster((void *)_controller); +#endif + +#ifdef ZT_ENABLE_CLUSTER + if (OSUtils::fileExists((_homePath + ZT_PATH_SEPARATOR_S + "cluster").c_str())) { + _clusterDefinition = new ClusterDefinition(_node->address(),(_homePath + ZT_PATH_SEPARATOR_S + "cluster").c_str()); + if (_clusterDefinition->size() > 0) { + std::vector<ClusterDefinition::MemberDefinition> members(_clusterDefinition->members()); + for(std::vector<ClusterDefinition::MemberDefinition>::iterator m(members.begin());m!=members.end();++m) { + PhySocket *cs = _phy.udpBind(reinterpret_cast<const struct sockaddr *>(&(m->clusterEndpoint))); + if (cs) { + if (_clusterMessageSocket) { + _phy.close(_clusterMessageSocket,false); + _phy.close(cs,false); + + Mutex::Lock _l(_termReason_m); + _termReason = ONE_UNRECOVERABLE_ERROR; + _fatalErrorMessage = "Cluster: can't determine my cluster member ID: able to bind more than one cluster message socket IP/port!"; + return _termReason; + } + _clusterMessageSocket = cs; + _clusterMemberId = m->id; + } + } + + if (!_clusterMessageSocket) { + Mutex::Lock _l(_termReason_m); + _termReason = ONE_UNRECOVERABLE_ERROR; + _fatalErrorMessage = "Cluster: can't determine my cluster member ID: unable to bind to any cluster message socket IP/port."; + return _termReason; + } + + if (OSUtils::fileExists((_homePath + ZT_PATH_SEPARATOR_S + "cluster-geo.exe").c_str())) + _clusterGeoIpService = new ClusterGeoIpService((_homePath + ZT_PATH_SEPARATOR_S + "cluster-geo.exe").c_str()); + + const ClusterDefinition::MemberDefinition &me = (*_clusterDefinition)[_clusterMemberId]; + InetAddress endpoints[255]; + unsigned int numEndpoints = 0; + for(std::vector<InetAddress>::const_iterator i(me.zeroTierEndpoints.begin());i!=me.zeroTierEndpoints.end();++i) + endpoints[numEndpoints++] = *i; + + if (_node->clusterInit( + _clusterMemberId, + reinterpret_cast<const struct sockaddr_storage *>(endpoints), + numEndpoints, + me.x, + me.y, + me.z, + &SclusterSendFunction, + this, + (_clusterGeoIpService) ? &SclusterGeoIpFunction : 0, + this) == ZT_RESULT_OK) { + + std::vector<ClusterDefinition::MemberDefinition> members(_clusterDefinition->members()); + for(std::vector<ClusterDefinition::MemberDefinition>::iterator m(members.begin());m!=members.end();++m) { + if (m->id != _clusterMemberId) + _node->clusterAddMember(m->id); + } + + } + } else { + delete _clusterDefinition; + _clusterDefinition = (ClusterDefinition *)0; + } + } #endif _controlPlane = new ControlPlane(this,_node,(_homePath + ZT_PATH_SEPARATOR_S + "ui").c_str()); _controlPlane->addAuthToken(authToken.c_str()); #ifdef ZT_ENABLE_NETWORK_CONTROLLER - _controlPlane->setController(&_controller); + _controlPlane->setController(_controller); #endif { // Remember networks from previous session @@ -510,12 +665,16 @@ public: } } + // Start two background threads to handle expensive ops out of line + Thread::start(_node); + Thread::start(_node); + _nextBackgroundTaskDeadline = 0; uint64_t clockShouldBe = OSUtils::now(); _lastRestart = clockShouldBe; uint64_t lastTapMulticastGroupCheck = 0; uint64_t lastTcpFallbackResolve = 0; - uint64_t lastLocalInterfaceAddressCheck = (OSUtils::now() - ZT1_LOCAL_INTERFACE_CHECK_INTERVAL) + 15000; // do this in 15s to give UPnP time to configure and other things time to settle + uint64_t lastLocalInterfaceAddressCheck = (OSUtils::now() - ZT_LOCAL_INTERFACE_CHECK_INTERVAL) + 15000; // do this in 15s to give UPnP time to configure and other things time to settle #ifdef ZT_AUTO_UPDATE uint64_t lastSoftwareUpdateCheck = 0; #endif // ZT_AUTO_UPDATE @@ -543,17 +702,17 @@ public: #ifdef ZT_AUTO_UPDATE if ((now - lastSoftwareUpdateCheck) >= ZT_AUTO_UPDATE_CHECK_PERIOD) { - lastSoftwareUpdateCheck = OSUtils::now(); + lastSoftwareUpdateCheck = now; Thread::start(&backgroundSoftwareUpdateChecker); } #endif // ZT_AUTO_UPDATE - if ((now - lastTcpFallbackResolve) >= ZT1_TCP_FALLBACK_RERESOLVE_DELAY) { + if ((now - lastTcpFallbackResolve) >= ZT_TCP_FALLBACK_RERESOLVE_DELAY) { lastTcpFallbackResolve = now; _tcpFallbackResolver.resolveNow(); } - if ((_tcpFallbackTunnel)&&((now - _lastDirectReceiveFromGlobal) < (ZT1_TCP_FALLBACK_AFTER / 2))) + if ((_tcpFallbackTunnel)&&((now - _lastDirectReceiveFromGlobal) < (ZT_TCP_FALLBACK_AFTER / 2))) _phy.close(_tcpFallbackTunnel->sock); if ((now - lastTapMulticastGroupCheck) >= ZT_TAP_CHECK_MULTICAST_INTERVAL) { @@ -569,7 +728,7 @@ public: } } - if ((now - lastLocalInterfaceAddressCheck) >= ZT1_LOCAL_INTERFACE_CHECK_INTERVAL) { + if ((now - lastLocalInterfaceAddressCheck) >= ZT_LOCAL_INTERFACE_CHECK_INTERVAL) { lastLocalInterfaceAddressCheck = now; #ifdef __UNIX_LIKE__ @@ -583,9 +742,9 @@ public: _node->clearLocalInterfaceAddresses(); #ifdef ZT_USE_MINIUPNPC - std::vector<InetAddress> upnpAddresses(_upnpClient.get()); + std::vector<InetAddress> upnpAddresses(_upnpClient->get()); for(std::vector<InetAddress>::const_iterator ext(upnpAddresses.begin());ext!=upnpAddresses.end();++ext) - _node->addLocalInterfaceAddress(reinterpret_cast<const struct sockaddr_storage *>(&(*ext)),0,ZT1_LOCAL_INTERFACE_ADDRESS_TRUST_NORMAL); + _node->addLocalInterfaceAddress(reinterpret_cast<const struct sockaddr_storage *>(&(*ext))); #endif struct ifaddrs *ifatbl = (struct ifaddrs *)0; @@ -603,7 +762,7 @@ public: if (!isZT) { InetAddress ip(ifa->ifa_addr); ip.setPort(_port); - _node->addLocalInterfaceAddress(reinterpret_cast<const struct sockaddr_storage *>(&ip),0,ZT1_LOCAL_INTERFACE_ADDRESS_TRUST_NORMAL); + _node->addLocalInterfaceAddress(reinterpret_cast<const struct sockaddr_storage *>(&ip)); } } ifa = ifa->ifa_next; @@ -637,7 +796,7 @@ public: while (ua) { InetAddress ip(ua->Address.lpSockaddr); ip.setPort(_port); - _node->addLocalInterfaceAddress(reinterpret_cast<const struct sockaddr_storage *>(&ip),0,ZT1_LOCAL_INTERFACE_ADDRESS_TRUST_NORMAL); + _node->addLocalInterfaceAddress(reinterpret_cast<const struct sockaddr_storage *>(&ip)); ua = ua->Next; } } @@ -719,19 +878,29 @@ public: inline void phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *from,void *data,unsigned long len) { +#ifdef ZT_ENABLE_CLUSTER + if (sock == _clusterMessageSocket) { + _lastDirectReceiveFromGlobal = OSUtils::now(); + _node->clusterHandleIncomingMessage(data,len); + return; + } +#endif + #ifdef ZT_BREAK_UDP if (OSUtils::fileExists("/tmp/ZT_BREAK_UDP")) return; #endif + if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) _lastDirectReceiveFromGlobal = OSUtils::now(); - ZT1_ResultCode rc = _node->processWirePacket( + ZT_ResultCode rc = _node->processWirePacket( OSUtils::now(), + reinterpret_cast<const struct sockaddr_storage *>(*uptr), (const struct sockaddr_storage *)from, // Phy<> uses sockaddr_storage, so it'll always be that big data, len, &_nextBackgroundTaskDeadline); - if (ZT1_ResultCode_isFatal(rc)) { + if (ZT_ResultCode_isFatal(rc)) { char tmp[256]; Utils::snprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc); Mutex::Lock _l(_termReason_m); @@ -772,7 +941,7 @@ public: tc->writeBuf.push_back((char)ZEROTIER_ONE_VERSION_MINOR); tc->writeBuf.push_back((char)((ZEROTIER_ONE_VERSION_REVISION >> 8) & 0xff)); tc->writeBuf.push_back((char)(ZEROTIER_ONE_VERSION_REVISION & 0xff)); - _phy.tcpSetNotifyWritable(sock,true); + _phy.setNotifyWritable(sock,true); _tcpFallbackTunnel = tc; } @@ -873,13 +1042,14 @@ public: } if (from) { - ZT1_ResultCode rc = _node->processWirePacket( + ZT_ResultCode rc = _node->processWirePacket( OSUtils::now(), + &ZT_SOCKADDR_NULL, reinterpret_cast<struct sockaddr_storage *>(&from), data, plen, &_nextBackgroundTaskDeadline); - if (ZT1_ResultCode_isFatal(rc)) { + if (ZT_ResultCode_isFatal(rc)) { char tmp[256]; Utils::snprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc); Mutex::Lock _l(_termReason_m); @@ -907,12 +1077,12 @@ public: TcpConnection *tc = reinterpret_cast<TcpConnection *>(*uptr); Mutex::Lock _l(tc->writeBuf_m); if (tc->writeBuf.length() > 0) { - long sent = (long)_phy.tcpSend(sock,tc->writeBuf.data(),(unsigned long)tc->writeBuf.length(),true); + long sent = (long)_phy.streamSend(sock,tc->writeBuf.data(),(unsigned long)tc->writeBuf.length(),true); if (sent > 0) { tc->lastActivity = OSUtils::now(); if ((unsigned long)sent >= (unsigned long)tc->writeBuf.length()) { tc->writeBuf = ""; - _phy.tcpSetNotifyWritable(sock,false); + _phy.setNotifyWritable(sock,false); if (!tc->shouldKeepAlive) _phy.close(sock); // will call close handler to delete from _tcpConnections } else { @@ -920,16 +1090,22 @@ public: } } } else { - _phy.tcpSetNotifyWritable(sock,false); + _phy.setNotifyWritable(sock,false); } } - inline int nodeVirtualNetworkConfigFunction(uint64_t nwid,enum ZT1_VirtualNetworkConfigOperation op,const ZT1_VirtualNetworkConfig *nwc) + inline void phyOnUnixAccept(PhySocket *sockL,PhySocket *sockN,void **uptrL,void **uptrN) {} + inline void phyOnUnixClose(PhySocket *sock,void **uptr) {} + inline void phyOnUnixData(PhySocket *sock,void **uptr,void *data,unsigned long len) {} + inline void phyOnUnixWritable(PhySocket *sock,void **uptr) {} + inline void phyOnFileDescriptorActivity(PhySocket *sock,void **uptr,bool readable,bool writable) {} + + inline int nodeVirtualNetworkConfigFunction(uint64_t nwid,enum ZT_VirtualNetworkConfigOperation op,const ZT_VirtualNetworkConfig *nwc) { Mutex::Lock _l(_taps_m); std::map< uint64_t,EthernetTap * >::iterator t(_taps.find(nwid)); switch(op) { - case ZT1_VIRTUAL_NETWORK_CONFIG_OPERATION_UP: + case ZT_VIRTUAL_NETWORK_CONFIG_OPERATION_UP: if (t == _taps.end()) { try { char friendlyName[1024]; @@ -959,7 +1135,7 @@ public: } } // fall through... - case ZT1_VIRTUAL_NETWORK_CONFIG_OPERATION_CONFIG_UPDATE: + case ZT_VIRTUAL_NETWORK_CONFIG_OPERATION_CONFIG_UPDATE: if (t != _taps.end()) { t->second->setEnabled(nwc->enabled != 0); @@ -971,19 +1147,21 @@ public: newAssignedIps.erase(std::unique(newAssignedIps.begin(),newAssignedIps.end()),newAssignedIps.end()); for(std::vector<InetAddress>::iterator ip(newAssignedIps.begin());ip!=newAssignedIps.end();++ip) { if (!std::binary_search(assignedIps.begin(),assignedIps.end(),*ip)) - t->second->addIp(*ip); + if (!t->second->addIp(*ip)) + fprintf(stderr,"ERROR: unable to add ip address %s"ZT_EOL_S, ip->toString().c_str()); } for(std::vector<InetAddress>::iterator ip(assignedIps.begin());ip!=assignedIps.end();++ip) { if (!std::binary_search(newAssignedIps.begin(),newAssignedIps.end(),*ip)) - t->second->removeIp(*ip); + if (!t->second->removeIp(*ip)) + fprintf(stderr,"ERROR: unable to remove ip address %s"ZT_EOL_S, ip->toString().c_str()); } assignedIps.swap(newAssignedIps); } else { return -999; // tap init failed } break; - case ZT1_VIRTUAL_NETWORK_CONFIG_OPERATION_DOWN: - case ZT1_VIRTUAL_NETWORK_CONFIG_OPERATION_DESTROY: + case ZT_VIRTUAL_NETWORK_CONFIG_OPERATION_DOWN: + case ZT_VIRTUAL_NETWORK_CONFIG_OPERATION_DESTROY: if (t != _taps.end()) { #ifdef __WINDOWS__ std::string winInstanceId(t->second->instanceId()); @@ -992,7 +1170,7 @@ public: _taps.erase(t); _tapAssignedIps.erase(nwid); #ifdef __WINDOWS__ - if ((op == ZT1_VIRTUAL_NETWORK_CONFIG_OPERATION_DESTROY)&&(winInstanceId.length() > 0)) + if ((op == ZT_VIRTUAL_NETWORK_CONFIG_OPERATION_DESTROY)&&(winInstanceId.length() > 0)) WindowsEthernetTap::deletePersistentTapDevice(winInstanceId.c_str()); #endif } @@ -1001,17 +1179,17 @@ public: return 0; } - inline void nodeEventCallback(enum ZT1_Event event,const void *metaData) + inline void nodeEventCallback(enum ZT_Event event,const void *metaData) { switch(event) { - case ZT1_EVENT_FATAL_ERROR_IDENTITY_COLLISION: { + case ZT_EVENT_FATAL_ERROR_IDENTITY_COLLISION: { Mutex::Lock _l(_termReason_m); _termReason = ONE_IDENTITY_COLLISION; _fatalErrorMessage = "identity/address collision"; this->terminate(); } break; - case ZT1_EVENT_TRACE: { + case ZT_EVENT_TRACE: { if (metaData) { ::fprintf(stderr,"%s"ZT_EOL_S,(const char *)metaData); ::fflush(stderr); @@ -1077,33 +1255,59 @@ public: } } - inline int nodeWirePacketSendFunction(const struct sockaddr_storage *addr,const void *data,unsigned int len) + inline int nodeWirePacketSendFunction(const struct sockaddr_storage *localAddr,const struct sockaddr_storage *addr,const void *data,unsigned int len,unsigned int ttl) { +#ifdef ZT_USE_MINIUPNPC + if ((localAddr->ss_family == AF_INET)&&(reinterpret_cast<const struct sockaddr_in *>(localAddr)->sin_port == reinterpret_cast<const struct sockaddr_in *>(&_v4UpnpLocalAddress)->sin_port)) { +#ifdef ZT_BREAK_UDP + if (!OSUtils::fileExists("/tmp/ZT_BREAK_UDP")) { +#endif + if (addr->ss_family == AF_INET) { + if (ttl) + _phy.setIp4UdpTtl(_v4UpnpUdpSocket,ttl); + const int result = ((_phy.udpSend(_v4UpnpUdpSocket,(const struct sockaddr *)addr,data,len) != 0) ? 0 : -1); + if (ttl) + _phy.setIp4UdpTtl(_v4UpnpUdpSocket,255); + return result; + } else { + return -1; + } +#ifdef ZT_BREAK_UDP + } +#endif + } +#endif // ZT_USE_MINIUPNPC + int result = -1; switch(addr->ss_family) { case AF_INET: #ifdef ZT_BREAK_UDP if (!OSUtils::fileExists("/tmp/ZT_BREAK_UDP")) { #endif - if (_v4UdpSocket) - result = ((_phy.udpSend(_v4UdpSocket,(const struct sockaddr *)addr,data,len) != 0) ? 0 : -1); + if (_v4UdpSocket) { + if (ttl) + _phy.setIp4UdpTtl(_v4UdpSocket,ttl); + result = ((_phy.udpSend(_v4UdpSocket,(const struct sockaddr *)addr,data,len) != 0) ? 0 : -1); + if (ttl) + _phy.setIp4UdpTtl(_v4UdpSocket,255); + } #ifdef ZT_BREAK_UDP } #endif -#ifdef ZT1_TCP_FALLBACK_RELAY +#ifdef ZT_TCP_FALLBACK_RELAY // TCP fallback tunnel support if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(addr)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) { uint64_t now = OSUtils::now(); // Engage TCP tunnel fallback if we haven't received anything valid from a global - // IP address in ZT1_TCP_FALLBACK_AFTER milliseconds. If we do start getting + // IP address in ZT_TCP_FALLBACK_AFTER milliseconds. If we do start getting // valid direct traffic we'll stop using it and close the socket after a while. - if (((now - _lastDirectReceiveFromGlobal) > ZT1_TCP_FALLBACK_AFTER)&&((now - _lastRestart) > ZT1_TCP_FALLBACK_AFTER)) { + if (((now - _lastDirectReceiveFromGlobal) > ZT_TCP_FALLBACK_AFTER)&&((now - _lastRestart) > ZT_TCP_FALLBACK_AFTER)) { if (_tcpFallbackTunnel) { Mutex::Lock _l(_tcpFallbackTunnel->writeBuf_m); if (!_tcpFallbackTunnel->writeBuf.length()) - _phy.tcpSetNotifyWritable(_tcpFallbackTunnel->sock,true); + _phy.setNotifyWritable(_tcpFallbackTunnel->sock,true); unsigned long mlen = len + 7; _tcpFallbackTunnel->writeBuf.push_back((char)0x17); _tcpFallbackTunnel->writeBuf.push_back((char)0x03); @@ -1115,7 +1319,7 @@ public: _tcpFallbackTunnel->writeBuf.append(reinterpret_cast<const char *>(reinterpret_cast<const void *>(&(reinterpret_cast<const struct sockaddr_in *>(addr)->sin_port))),2); _tcpFallbackTunnel->writeBuf.append((const char *)data,len); result = 0; - } else if (((now - _lastSendToGlobal) < ZT1_TCP_FALLBACK_AFTER)&&((now - _lastSendToGlobal) > (ZT_PING_CHECK_INVERVAL / 2))) { + } else if (((now - _lastSendToGlobal) < ZT_TCP_FALLBACK_AFTER)&&((now - _lastSendToGlobal) > (ZT_PING_CHECK_INVERVAL / 2))) { std::vector<InetAddress> tunnelIps(_tcpFallbackResolver.get()); if (tunnelIps.empty()) { if (!_tcpFallbackResolver.running()) @@ -1123,7 +1327,7 @@ public: } else { bool connected = false; InetAddress addr(tunnelIps[(unsigned long)now % tunnelIps.size()]); - addr.setPort(ZT1_TCP_FALLBACK_RELAY_PORT); + addr.setPort(ZT_TCP_FALLBACK_RELAY_PORT); _phy.tcpConnect(reinterpret_cast<const struct sockaddr *>(&addr),connected); } } @@ -1131,9 +1335,10 @@ public: _lastSendToGlobal = now; } -#endif // ZT1_TCP_FALLBACK_RELAY +#endif // ZT_TCP_FALLBACK_RELAY break; + case AF_INET6: #ifdef ZT_BREAK_UDP if (!OSUtils::fileExists("/tmp/ZT_BREAK_UDP")) { @@ -1144,6 +1349,7 @@ public: } #endif break; + default: return -1; } @@ -1206,7 +1412,7 @@ public: tc->writeBuf.append(data); } - _phy.tcpSetNotifyWritable(tc->sock,true); + _phy.setNotifyWritable(tc->sock,true); } inline void onHttpResponseFromClient(TcpConnection *tc) @@ -1215,7 +1421,6 @@ public: _phy.close(tc->sock); // will call close handler, which deletes from _tcpConnections } -private: std::string _dataStorePrepPath(const char *name) const { std::string p(_homePath); @@ -1236,11 +1441,11 @@ private: const std::string _homePath; BackgroundResolver _tcpFallbackResolver; #ifdef ZT_ENABLE_NETWORK_CONTROLLER - SqliteNetworkController _controller; + SqliteNetworkController *_controller; #endif Phy<OneServiceImpl *> _phy; - std::string _overrideRootTopology; Node *_node; + InetAddress _v4LocalAddress,_v6LocalAddress; PhySocket *_v4UdpSocket; PhySocket *_v6UdpSocket; PhySocket *_v4TcpListenSocket; @@ -1265,26 +1470,50 @@ private: unsigned int _port; #ifdef ZT_USE_MINIUPNPC - UPNPClient _upnpClient; + InetAddress _v4UpnpLocalAddress; + PhySocket *_v4UpnpUdpSocket; + UPNPClient *_upnpClient; +#endif + +#ifdef ZT_ENABLE_CLUSTER + PhySocket *_clusterMessageSocket; + ClusterGeoIpService *_clusterGeoIpService; + ClusterDefinition *_clusterDefinition; + unsigned int _clusterMemberId; #endif bool _run; Mutex _run_m; }; -static int SnodeVirtualNetworkConfigFunction(ZT1_Node *node,void *uptr,uint64_t nwid,enum ZT1_VirtualNetworkConfigOperation op,const ZT1_VirtualNetworkConfig *nwconf) +static int SnodeVirtualNetworkConfigFunction(ZT_Node *node,void *uptr,uint64_t nwid,enum ZT_VirtualNetworkConfigOperation op,const ZT_VirtualNetworkConfig *nwconf) { return reinterpret_cast<OneServiceImpl *>(uptr)->nodeVirtualNetworkConfigFunction(nwid,op,nwconf); } -static void SnodeEventCallback(ZT1_Node *node,void *uptr,enum ZT1_Event event,const void *metaData) +static void SnodeEventCallback(ZT_Node *node,void *uptr,enum ZT_Event event,const void *metaData) { reinterpret_cast<OneServiceImpl *>(uptr)->nodeEventCallback(event,metaData); } -static long SnodeDataStoreGetFunction(ZT1_Node *node,void *uptr,const char *name,void *buf,unsigned long bufSize,unsigned long readIndex,unsigned long *totalSize) +static long SnodeDataStoreGetFunction(ZT_Node *node,void *uptr,const char *name,void *buf,unsigned long bufSize,unsigned long readIndex,unsigned long *totalSize) { return reinterpret_cast<OneServiceImpl *>(uptr)->nodeDataStoreGetFunction(name,buf,bufSize,readIndex,totalSize); } -static int SnodeDataStorePutFunction(ZT1_Node *node,void *uptr,const char *name,const void *data,unsigned long len,int secure) +static int SnodeDataStorePutFunction(ZT_Node *node,void *uptr,const char *name,const void *data,unsigned long len,int secure) { return reinterpret_cast<OneServiceImpl *>(uptr)->nodeDataStorePutFunction(name,data,len,secure); } -static int SnodeWirePacketSendFunction(ZT1_Node *node,void *uptr,const struct sockaddr_storage *addr,const void *data,unsigned int len) -{ return reinterpret_cast<OneServiceImpl *>(uptr)->nodeWirePacketSendFunction(addr,data,len); } -static void SnodeVirtualNetworkFrameFunction(ZT1_Node *node,void *uptr,uint64_t nwid,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len) +static int SnodeWirePacketSendFunction(ZT_Node *node,void *uptr,const struct sockaddr_storage *localAddr,const struct sockaddr_storage *addr,const void *data,unsigned int len,unsigned int ttl) +{ return reinterpret_cast<OneServiceImpl *>(uptr)->nodeWirePacketSendFunction(localAddr,addr,data,len,ttl); } +static void SnodeVirtualNetworkFrameFunction(ZT_Node *node,void *uptr,uint64_t nwid,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len) { reinterpret_cast<OneServiceImpl *>(uptr)->nodeVirtualNetworkFrameFunction(nwid,sourceMac,destMac,etherType,vlanId,data,len); } +#ifdef ZT_ENABLE_CLUSTER +static void SclusterSendFunction(void *uptr,unsigned int toMemberId,const void *data,unsigned int len) +{ + OneServiceImpl *const impl = reinterpret_cast<OneServiceImpl *>(uptr); + const ClusterDefinition::MemberDefinition &md = (*(impl->_clusterDefinition))[toMemberId]; + if (md.clusterEndpoint) + impl->_phy.udpSend(impl->_clusterMessageSocket,reinterpret_cast<const struct sockaddr *>(&(md.clusterEndpoint)),data,len); +} +static int SclusterGeoIpFunction(void *uptr,const struct sockaddr_storage *addr,int *x,int *y,int *z) +{ + OneServiceImpl *const impl = reinterpret_cast<OneServiceImpl *>(uptr); + return (int)(impl->_clusterGeoIpService->locate(*(reinterpret_cast<const InetAddress *>(addr)),*x,*y,*z)); +} +#endif + static void StapFrameHandler(void *uptr,uint64_t nwid,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len) { reinterpret_cast<OneServiceImpl *>(uptr)->tapFrameHandler(nwid,from,to,etherType,vlanId,data,len); } @@ -1433,7 +1662,7 @@ std::string OneService::autoUpdateUrl() return std::string(); } -OneService *OneService::newInstance(const char *hp,unsigned int port,const char *overrideRootTopology) { return new OneServiceImpl(hp,port,overrideRootTopology); } +OneService *OneService::newInstance(const char *hp,unsigned int port) { return new OneServiceImpl(hp,port); } OneService::~OneService() {} } // namespace ZeroTier |