diff options
Diffstat (limited to 'service/OneService.cpp')
| -rw-r--r-- | service/OneService.cpp | 147 |
1 files changed, 130 insertions, 17 deletions
diff --git a/service/OneService.cpp b/service/OneService.cpp index 6b28c41e..729812ed 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -58,6 +58,8 @@ #include "OneService.hpp" #include "ControlPlane.hpp" +#include "ClusterGeoIpService.hpp" +#include "ClusterDefinition.hpp" /** * Uncomment to enable UDP breakage switch @@ -366,6 +368,11 @@ static int SnodeDataStorePutFunction(ZT_Node *node,void *uptr,const char *name,c static int SnodeWirePacketSendFunction(ZT_Node *node,void *uptr,const struct sockaddr_storage *localAddr,const struct sockaddr_storage *addr,const void *data,unsigned int len); static void SnodeVirtualNetworkFrameFunction(ZT_Node *node,void *uptr,uint64_t nwid,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len); +#ifdef ZT_ENABLE_CLUSTER +static void SclusterSendFunction(void *uptr,unsigned int toMemberId,const void *data,unsigned int len); +static int SclusterGeoIpFunction(void *uptr,const struct sockaddr_storage *addr,int *x,int *y,int *z); +#endif + static void StapFrameHandler(void *uptr,uint64_t nwid,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len); static int ShttpOnMessageBegin(http_parser *parser); @@ -419,26 +426,32 @@ class OneServiceImpl : public OneService { public: OneServiceImpl(const char *hp,unsigned int port) : - _homePath((hp) ? hp : "."), - _tcpFallbackResolver(ZT_TCP_FALLBACK_RELAY), + _homePath((hp) ? hp : ".") + ,_tcpFallbackResolver(ZT_TCP_FALLBACK_RELAY) #ifdef ZT_ENABLE_NETWORK_CONTROLLER - _controller((SqliteNetworkController *)0), + ,_controller((SqliteNetworkController *)0) #endif - _phy(this,false,true), - _node((Node *)0), - _controlPlane((ControlPlane *)0), - _lastDirectReceiveFromGlobal(0), - _lastSendToGlobal(0), - _lastRestart(0), - _nextBackgroundTaskDeadline(0), - _tcpFallbackTunnel((TcpConnection *)0), - _termReason(ONE_STILL_RUNNING), - _port(0), + ,_phy(this,false,true) + ,_node((Node *)0) + ,_controlPlane((ControlPlane *)0) + ,_lastDirectReceiveFromGlobal(0) + ,_lastSendToGlobal(0) + ,_lastRestart(0) + ,_nextBackgroundTaskDeadline(0) + ,_tcpFallbackTunnel((TcpConnection *)0) + ,_termReason(ONE_STILL_RUNNING) + ,_port(0) #ifdef ZT_USE_MINIUPNPC - _v4UpnpUdpSocket((PhySocket *)0), - _upnpClient((UPNPClient *)0), + ,_v4UpnpUdpSocket((PhySocket *)0) + ,_upnpClient((UPNPClient *)0) #endif - _run(true) +#ifdef ZT_ENABLE_CLUSTER + ,_clusterMessageSocket((PhySocket *)0) + ,_clusterGeoIpService((ClusterGeoIpService *)0) + ,_clusterDefinition((ClusterDefinition *)0) + ,_clusterMemberId(0) +#endif + ,_run(true) { const int portTrials = (port == 0) ? 256 : 1; // if port is 0, pick random for(int k=0;k<portTrials;++k) { @@ -510,6 +523,9 @@ public: _phy.close(_v6UdpSocket); _phy.close(_v4TcpListenSocket); _phy.close(_v6TcpListenSocket); +#ifdef ZT_ENABLE_CLUSTER + _phy.close(_clusterMessageSocket); +#endif #ifdef ZT_USE_MINIUPNPC _phy.close(_v4UpnpUdpSocket); delete _upnpClient; @@ -517,6 +533,10 @@ public: #ifdef ZT_ENABLE_NETWORK_CONTROLLER delete _controller; #endif +#ifdef ZT_ENABLE_CLUSTER + delete _clusterGeoIpService; + delete _clusterDefinition; +#endif } virtual ReasonForTermination run() @@ -556,6 +576,70 @@ public: _node->setNetconfMaster((void *)_controller); #endif +#ifdef ZT_ENABLE_CLUSTER + if (OSUtils::fileExists((_homePath + ZT_PATH_SEPARATOR_S + "cluster").c_str())) { + _clusterDefinition = new ClusterDefinition(_node->address(),(_homePath + ZT_PATH_SEPARATOR_S + "cluster").c_str()); + if (_clusterDefinition->size() > 0) { + std::vector<ClusterDefinition::MemberDefinition> members(_clusterDefinition->members()); + for(std::vector<ClusterDefinition::MemberDefinition>::iterator m(members.begin());m!=members.end();++m) { + PhySocket *cs = _phy.udpBind(reinterpret_cast<const struct sockaddr *>(&(m->clusterEndpoint))); + if (cs) { + if (_clusterMessageSocket) { + _phy.close(_clusterMessageSocket,false); + _phy.close(cs,false); + + Mutex::Lock _l(_termReason_m); + _termReason = ONE_UNRECOVERABLE_ERROR; + _fatalErrorMessage = "Cluster: can't determine my cluster member ID: able to bind more than one cluster message socket IP/port!"; + return _termReason; + } + _clusterMessageSocket = cs; + _clusterMemberId = m->id; + } + } + + if (!_clusterMessageSocket) { + Mutex::Lock _l(_termReason_m); + _termReason = ONE_UNRECOVERABLE_ERROR; + _fatalErrorMessage = "Cluster: can't determine my cluster member ID: unable to bind to any cluster message socket IP/port."; + return _termReason; + } + + if (OSUtils::fileExists((_homePath + ZT_PATH_SEPARATOR_S + "cluster-geo.exe").c_str())) + _clusterGeoIpService = new ClusterGeoIpService((_homePath + ZT_PATH_SEPARATOR_S + "cluster-geo.exe").c_str()); + + const ClusterDefinition::MemberDefinition &me = (*_clusterDefinition)[_clusterMemberId]; + InetAddress endpoints[255]; + unsigned int numEndpoints = 0; + for(std::vector<InetAddress>::const_iterator i(me.zeroTierEndpoints.begin());i!=me.zeroTierEndpoints.end();++i) + endpoints[numEndpoints++] = *i; + + if (_node->clusterInit( + _clusterMemberId, + reinterpret_cast<const struct sockaddr_storage *>(endpoints), + numEndpoints, + me.x, + me.y, + me.z, + &SclusterSendFunction, + this, + (_clusterGeoIpService) ? &SclusterGeoIpFunction : 0, + this) == ZT_RESULT_OK) { + + std::vector<ClusterDefinition::MemberDefinition> members(_clusterDefinition->members()); + for(std::vector<ClusterDefinition::MemberDefinition>::iterator m(members.begin());m!=members.end();++m) { + if (m->id != _clusterMemberId) + _node->clusterAddMember(m->id); + } + + } + } else { + delete _clusterDefinition; + _clusterDefinition = (ClusterDefinition *)0; + } + } +#endif + _controlPlane = new ControlPlane(this,_node,(_homePath + ZT_PATH_SEPARATOR_S + "ui").c_str()); _controlPlane->addAuthToken(authToken.c_str()); @@ -781,10 +865,18 @@ public: inline void phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *from,void *data,unsigned long len) { +#ifdef ZT_ENABLE_CLUSTER + if (sock == _clusterMessageSocket) { + _node->clusterHandleIncomingMessage(data,len); + return; + } +#endif + #ifdef ZT_BREAK_UDP if (OSUtils::fileExists("/tmp/ZT_BREAK_UDP")) return; #endif + if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) _lastDirectReceiveFromGlobal = OSUtils::now(); ZT_ResultCode rc = _node->processWirePacket( @@ -1303,7 +1395,6 @@ public: _phy.close(tc->sock); // will call close handler, which deletes from _tcpConnections } -private: std::string _dataStorePrepPath(const char *name) const { std::string p(_homePath); @@ -1358,6 +1449,13 @@ private: UPNPClient *_upnpClient; #endif +#ifdef ZT_ENABLE_CLUSTER + PhySocket *_clusterMessageSocket; + ClusterGeoIpService *_clusterGeoIpService; + ClusterDefinition *_clusterDefinition; + unsigned int _clusterMemberId; +#endif + bool _run; Mutex _run_m; }; @@ -1375,6 +1473,21 @@ static int SnodeWirePacketSendFunction(ZT_Node *node,void *uptr,const struct soc static void SnodeVirtualNetworkFrameFunction(ZT_Node *node,void *uptr,uint64_t nwid,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len) { reinterpret_cast<OneServiceImpl *>(uptr)->nodeVirtualNetworkFrameFunction(nwid,sourceMac,destMac,etherType,vlanId,data,len); } +#ifdef ZT_ENABLE_CLUSTER +static void SclusterSendFunction(void *uptr,unsigned int toMemberId,const void *data,unsigned int len) +{ + OneServiceImpl *const impl = reinterpret_cast<OneServiceImpl *>(uptr); + const ClusterDefinition::MemberDefinition &md = (*(impl->_clusterDefinition))[toMemberId]; + if (md.clusterEndpoint) + impl->_phy.udpSend(impl->_clusterMessageSocket,reinterpret_cast<const struct sockaddr *>(&(md.clusterEndpoint)),data,len); +} +static int SclusterGeoIpFunction(void *uptr,const struct sockaddr_storage *addr,int *x,int *y,int *z) +{ + OneServiceImpl *const impl = reinterpret_cast<OneServiceImpl *>(uptr); + return (int)(impl->_clusterGeoIpService->locate(*(reinterpret_cast<const InetAddress *>(addr)),*x,*y,*z)); +} +#endif + static void StapFrameHandler(void *uptr,uint64_t nwid,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len) { reinterpret_cast<OneServiceImpl *>(uptr)->tapFrameHandler(nwid,from,to,etherType,vlanId,data,len); } |
