diff options
| author | Grant Limberg <glimberg@gmail.com> | 2015-11-02 18:32:21 -0800 |
|---|---|---|
| committer | Grant Limberg <glimberg@gmail.com> | 2015-11-02 18:32:21 -0800 |
| commit | 179b064b05460b706526a3249a25cd07232a42f5 (patch) | |
| tree | d26dfeb2c02ce59c4a531142684695e45bb515ad /service/OneService.cpp | |
| parent | 75a191a8564030f4d5e99aca76b980e2d69abd20 (diff) | |
| parent | 4e9d4304761f93a1764d3ec2d2b0c38140decad8 (diff) | |
| download | infinitytier-179b064b05460b706526a3249a25cd07232a42f5.tar.gz infinitytier-179b064b05460b706526a3249a25cd07232a42f5.zip | |
Merge branch 'edge' into android-jni-dev
Diffstat (limited to 'service/OneService.cpp')
| -rw-r--r-- | service/OneService.cpp | 207 |
1 files changed, 168 insertions, 39 deletions
diff --git a/service/OneService.cpp b/service/OneService.cpp index 6e6de8bd..8c8ff1ed 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -58,6 +58,8 @@ #include "OneService.hpp" #include "ControlPlane.hpp" +#include "ClusterGeoIpService.hpp" +#include "ClusterDefinition.hpp" /** * Uncomment to enable UDP breakage switch @@ -338,6 +340,25 @@ public: static BackgroundSoftwareUpdateChecker backgroundSoftwareUpdateChecker; #endif // ZT_AUTO_UPDATE +static std::string _trimString(const std::string &s) +{ + unsigned long end = (unsigned long)s.length(); + while (end) { + char c = s[end - 1]; + if ((c == ' ')||(c == '\r')||(c == '\n')||(!c)||(c == '\t')) + --end; + else break; + } + unsigned long start = 0; + while (start < end) { + char c = s[start]; + if ((c == ' ')||(c == '\r')||(c == '\n')||(!c)||(c == '\t')) + ++start; + else break; + } + return s.substr(start,end - start); +} + class OneServiceImpl; static int SnodeVirtualNetworkConfigFunction(ZT_Node *node,void *uptr,uint64_t nwid,enum ZT_VirtualNetworkConfigOperation op,const ZT_VirtualNetworkConfig *nwconf); @@ -347,6 +368,11 @@ static int SnodeDataStorePutFunction(ZT_Node *node,void *uptr,const char *name,c static int SnodeWirePacketSendFunction(ZT_Node *node,void *uptr,const struct sockaddr_storage *localAddr,const struct sockaddr_storage *addr,const void *data,unsigned int len); static void SnodeVirtualNetworkFrameFunction(ZT_Node *node,void *uptr,uint64_t nwid,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len); +#ifdef ZT_ENABLE_CLUSTER +static void SclusterSendFunction(void *uptr,unsigned int toMemberId,const void *data,unsigned int len); +static int SclusterGeoIpFunction(void *uptr,const struct sockaddr_storage *addr,int *x,int *y,int *z); +#endif + static void StapFrameHandler(void *uptr,uint64_t nwid,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len); static int ShttpOnMessageBegin(http_parser *parser); @@ -399,28 +425,33 @@ struct TcpConnection class OneServiceImpl : public OneService { public: - OneServiceImpl(const char *hp,unsigned int port,const char *overrideRootTopology) : - _homePath((hp) ? hp : "."), - _tcpFallbackResolver(ZT_TCP_FALLBACK_RELAY), + OneServiceImpl(const char *hp,unsigned int port) : + _homePath((hp) ? hp : ".") + ,_tcpFallbackResolver(ZT_TCP_FALLBACK_RELAY) #ifdef ZT_ENABLE_NETWORK_CONTROLLER - _controller((_homePath + ZT_PATH_SEPARATOR_S + ZT_CONTROLLER_DB_PATH).c_str()), + ,_controller((SqliteNetworkController *)0) #endif - _phy(this,false,true), - _overrideRootTopology((overrideRootTopology) ? overrideRootTopology : ""), - _node((Node *)0), - _controlPlane((ControlPlane *)0), - _lastDirectReceiveFromGlobal(0), - _lastSendToGlobal(0), - _lastRestart(0), - _nextBackgroundTaskDeadline(0), - _tcpFallbackTunnel((TcpConnection *)0), - _termReason(ONE_STILL_RUNNING), - _port(0), + ,_phy(this,false,true) + ,_node((Node *)0) + ,_controlPlane((ControlPlane *)0) + ,_lastDirectReceiveFromGlobal(0) + ,_lastSendToGlobal(0) + ,_lastRestart(0) + ,_nextBackgroundTaskDeadline(0) + ,_tcpFallbackTunnel((TcpConnection *)0) + ,_termReason(ONE_STILL_RUNNING) + ,_port(0) #ifdef ZT_USE_MINIUPNPC - _v4UpnpUdpSocket((PhySocket *)0), - _upnpClient((UPNPClient *)0), + ,_v4UpnpUdpSocket((PhySocket *)0) + ,_upnpClient((UPNPClient *)0) +#endif +#ifdef ZT_ENABLE_CLUSTER + ,_clusterMessageSocket((PhySocket *)0) + ,_clusterGeoIpService((ClusterGeoIpService *)0) + ,_clusterDefinition((ClusterDefinition *)0) + ,_clusterMemberId(0) #endif - _run(true) + ,_run(true) { const int portTrials = (port == 0) ? 256 : 1; // if port is 0, pick random for(int k=0;k<portTrials;++k) { @@ -470,15 +501,12 @@ public: OSUtils::writeFile((_homePath + ZT_PATH_SEPARATOR_S + "zerotier-one.port").c_str(),std::string(portstr)); #ifdef ZT_USE_MINIUPNPC - // Bind a random secondary port for use with uPnP, since some NAT routers + // Bind a secondary port for use with uPnP, since some NAT routers // (cough Ubiquity Edge cough) barf up a lung if you do both conventional // NAT-t and uPnP from behind the same port. I think this is a bug, but // everyone else's router bugs are our problem. :P - for(int k=0;k<256;++k) { - unsigned int randp = 0; - Utils::getSecureRandom(&randp,sizeof(randp)); - unsigned int upnport = 40000 + (randp % 25500); - + for(int k=0;k<512;++k) { + const unsigned int upnport = 40000 + (((port + 1) * (k + 1)) % 25500); _v4UpnpLocalAddress = InetAddress(0,upnport); _v4UpnpUdpSocket = _phy.udpBind((const struct sockaddr *)&_v4UpnpLocalAddress,reinterpret_cast<void *>(&_v4UpnpLocalAddress),131072); if (_v4UpnpUdpSocket) { @@ -495,10 +523,20 @@ public: _phy.close(_v6UdpSocket); _phy.close(_v4TcpListenSocket); _phy.close(_v6TcpListenSocket); +#ifdef ZT_ENABLE_CLUSTER + _phy.close(_clusterMessageSocket); +#endif #ifdef ZT_USE_MINIUPNPC _phy.close(_v4UpnpUdpSocket); delete _upnpClient; #endif +#ifdef ZT_ENABLE_NETWORK_CONTROLLER + delete _controller; +#endif +#ifdef ZT_ENABLE_CLUSTER + delete _clusterGeoIpService; + delete _clusterDefinition; +#endif } virtual ReasonForTermination run() @@ -521,7 +559,7 @@ public: } else OSUtils::lockDownFile(authTokenPath.c_str(),false); } } - authToken = Utils::trim(authToken); + authToken = _trimString(authToken); _node = new Node( OSUtils::now(), @@ -531,18 +569,82 @@ public: SnodeWirePacketSendFunction, SnodeVirtualNetworkFrameFunction, SnodeVirtualNetworkConfigFunction, - SnodeEventCallback, - ((_overrideRootTopology.length() > 0) ? _overrideRootTopology.c_str() : (const char *)0)); + SnodeEventCallback); #ifdef ZT_ENABLE_NETWORK_CONTROLLER - _node->setNetconfMaster((void *)&_controller); + _controller = new SqliteNetworkController(_node,(_homePath + ZT_PATH_SEPARATOR_S + ZT_CONTROLLER_DB_PATH).c_str(),(_homePath + ZT_PATH_SEPARATOR_S + "circuitTestResults.d").c_str()); + _node->setNetconfMaster((void *)_controller); +#endif + +#ifdef ZT_ENABLE_CLUSTER + if (OSUtils::fileExists((_homePath + ZT_PATH_SEPARATOR_S + "cluster").c_str())) { + _clusterDefinition = new ClusterDefinition(_node->address(),(_homePath + ZT_PATH_SEPARATOR_S + "cluster").c_str()); + if (_clusterDefinition->size() > 0) { + std::vector<ClusterDefinition::MemberDefinition> members(_clusterDefinition->members()); + for(std::vector<ClusterDefinition::MemberDefinition>::iterator m(members.begin());m!=members.end();++m) { + PhySocket *cs = _phy.udpBind(reinterpret_cast<const struct sockaddr *>(&(m->clusterEndpoint))); + if (cs) { + if (_clusterMessageSocket) { + _phy.close(_clusterMessageSocket,false); + _phy.close(cs,false); + + Mutex::Lock _l(_termReason_m); + _termReason = ONE_UNRECOVERABLE_ERROR; + _fatalErrorMessage = "Cluster: can't determine my cluster member ID: able to bind more than one cluster message socket IP/port!"; + return _termReason; + } + _clusterMessageSocket = cs; + _clusterMemberId = m->id; + } + } + + if (!_clusterMessageSocket) { + Mutex::Lock _l(_termReason_m); + _termReason = ONE_UNRECOVERABLE_ERROR; + _fatalErrorMessage = "Cluster: can't determine my cluster member ID: unable to bind to any cluster message socket IP/port."; + return _termReason; + } + + if (OSUtils::fileExists((_homePath + ZT_PATH_SEPARATOR_S + "cluster-geo.exe").c_str())) + _clusterGeoIpService = new ClusterGeoIpService((_homePath + ZT_PATH_SEPARATOR_S + "cluster-geo.exe").c_str()); + + const ClusterDefinition::MemberDefinition &me = (*_clusterDefinition)[_clusterMemberId]; + InetAddress endpoints[255]; + unsigned int numEndpoints = 0; + for(std::vector<InetAddress>::const_iterator i(me.zeroTierEndpoints.begin());i!=me.zeroTierEndpoints.end();++i) + endpoints[numEndpoints++] = *i; + + if (_node->clusterInit( + _clusterMemberId, + reinterpret_cast<const struct sockaddr_storage *>(endpoints), + numEndpoints, + me.x, + me.y, + me.z, + &SclusterSendFunction, + this, + (_clusterGeoIpService) ? &SclusterGeoIpFunction : 0, + this) == ZT_RESULT_OK) { + + std::vector<ClusterDefinition::MemberDefinition> members(_clusterDefinition->members()); + for(std::vector<ClusterDefinition::MemberDefinition>::iterator m(members.begin());m!=members.end();++m) { + if (m->id != _clusterMemberId) + _node->clusterAddMember(m->id); + } + + } + } else { + delete _clusterDefinition; + _clusterDefinition = (ClusterDefinition *)0; + } + } #endif _controlPlane = new ControlPlane(this,_node,(_homePath + ZT_PATH_SEPARATOR_S + "ui").c_str()); _controlPlane->addAuthToken(authToken.c_str()); #ifdef ZT_ENABLE_NETWORK_CONTROLLER - _controlPlane->setController(&_controller); + _controlPlane->setController(_controller); #endif { // Remember networks from previous session @@ -629,7 +731,7 @@ public: #ifdef ZT_USE_MINIUPNPC std::vector<InetAddress> upnpAddresses(_upnpClient->get()); for(std::vector<InetAddress>::const_iterator ext(upnpAddresses.begin());ext!=upnpAddresses.end();++ext) - _node->addLocalInterfaceAddress(reinterpret_cast<const struct sockaddr_storage *>(&(*ext)),0,ZT_LOCAL_INTERFACE_ADDRESS_TRUST_NORMAL); + _node->addLocalInterfaceAddress(reinterpret_cast<const struct sockaddr_storage *>(&(*ext))); #endif struct ifaddrs *ifatbl = (struct ifaddrs *)0; @@ -647,7 +749,7 @@ public: if (!isZT) { InetAddress ip(ifa->ifa_addr); ip.setPort(_port); - _node->addLocalInterfaceAddress(reinterpret_cast<const struct sockaddr_storage *>(&ip),0,ZT_LOCAL_INTERFACE_ADDRESS_TRUST_NORMAL); + _node->addLocalInterfaceAddress(reinterpret_cast<const struct sockaddr_storage *>(&ip)); } } ifa = ifa->ifa_next; @@ -763,10 +865,19 @@ public: inline void phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *from,void *data,unsigned long len) { +#ifdef ZT_ENABLE_CLUSTER + if (sock == _clusterMessageSocket) { + _lastDirectReceiveFromGlobal = OSUtils::now(); + _node->clusterHandleIncomingMessage(data,len); + return; + } +#endif + #ifdef ZT_BREAK_UDP if (OSUtils::fileExists("/tmp/ZT_BREAK_UDP")) return; #endif + if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) _lastDirectReceiveFromGlobal = OSUtils::now(); ZT_ResultCode rc = _node->processWirePacket( @@ -920,7 +1031,7 @@ public: if (from) { ZT_ResultCode rc = _node->processWirePacket( OSUtils::now(), - 0, + &ZT_SOCKADDR_NULL, reinterpret_cast<struct sockaddr_storage *>(&from), data, plen, @@ -974,9 +1085,7 @@ public: inline void phyOnUnixClose(PhySocket *sock,void **uptr) {} inline void phyOnUnixData(PhySocket *sock,void **uptr,void *data,unsigned long len) {} inline void phyOnUnixWritable(PhySocket *sock,void **uptr) {} - inline void phyOnSocketPairEndpointClose(PhySocket *sock,void **uptr) {} - inline void phyOnSocketPairEndpointData(PhySocket *sock,void **uptr,void *data,unsigned long len) {} - inline void phyOnSocketPairEndpointWritable(PhySocket *sock,void **uptr) {} + inline void phyOnFileDescriptorActivity(PhySocket *sock,void **uptr,bool readable,bool writable) {} inline int nodeVirtualNetworkConfigFunction(uint64_t nwid,enum ZT_VirtualNetworkConfigOperation op,const ZT_VirtualNetworkConfig *nwc) { @@ -1285,7 +1394,6 @@ public: _phy.close(tc->sock); // will call close handler, which deletes from _tcpConnections } -private: std::string _dataStorePrepPath(const char *name) const { std::string p(_homePath); @@ -1306,10 +1414,9 @@ private: const std::string _homePath; BackgroundResolver _tcpFallbackResolver; #ifdef ZT_ENABLE_NETWORK_CONTROLLER - SqliteNetworkController _controller; + SqliteNetworkController *_controller; #endif Phy<OneServiceImpl *> _phy; - std::string _overrideRootTopology; Node *_node; InetAddress _v4LocalAddress,_v6LocalAddress; PhySocket *_v4UdpSocket; @@ -1341,6 +1448,13 @@ private: UPNPClient *_upnpClient; #endif +#ifdef ZT_ENABLE_CLUSTER + PhySocket *_clusterMessageSocket; + ClusterGeoIpService *_clusterGeoIpService; + ClusterDefinition *_clusterDefinition; + unsigned int _clusterMemberId; +#endif + bool _run; Mutex _run_m; }; @@ -1358,6 +1472,21 @@ static int SnodeWirePacketSendFunction(ZT_Node *node,void *uptr,const struct soc static void SnodeVirtualNetworkFrameFunction(ZT_Node *node,void *uptr,uint64_t nwid,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len) { reinterpret_cast<OneServiceImpl *>(uptr)->nodeVirtualNetworkFrameFunction(nwid,sourceMac,destMac,etherType,vlanId,data,len); } +#ifdef ZT_ENABLE_CLUSTER +static void SclusterSendFunction(void *uptr,unsigned int toMemberId,const void *data,unsigned int len) +{ + OneServiceImpl *const impl = reinterpret_cast<OneServiceImpl *>(uptr); + const ClusterDefinition::MemberDefinition &md = (*(impl->_clusterDefinition))[toMemberId]; + if (md.clusterEndpoint) + impl->_phy.udpSend(impl->_clusterMessageSocket,reinterpret_cast<const struct sockaddr *>(&(md.clusterEndpoint)),data,len); +} +static int SclusterGeoIpFunction(void *uptr,const struct sockaddr_storage *addr,int *x,int *y,int *z) +{ + OneServiceImpl *const impl = reinterpret_cast<OneServiceImpl *>(uptr); + return (int)(impl->_clusterGeoIpService->locate(*(reinterpret_cast<const InetAddress *>(addr)),*x,*y,*z)); +} +#endif + static void StapFrameHandler(void *uptr,uint64_t nwid,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len) { reinterpret_cast<OneServiceImpl *>(uptr)->tapFrameHandler(nwid,from,to,etherType,vlanId,data,len); } @@ -1506,7 +1635,7 @@ std::string OneService::autoUpdateUrl() return std::string(); } -OneService *OneService::newInstance(const char *hp,unsigned int port,const char *overrideRootTopology) { return new OneServiceImpl(hp,port,overrideRootTopology); } +OneService *OneService::newInstance(const char *hp,unsigned int port) { return new OneServiceImpl(hp,port); } OneService::~OneService() {} } // namespace ZeroTier |
