diff options
Diffstat (limited to 'service')
-rw-r--r-- | service/ClusterDefinition.hpp | 125 | ||||
-rw-r--r-- | service/ClusterGeoIpService.cpp | 196 | ||||
-rw-r--r-- | service/ClusterGeoIpService.hpp | 94 | ||||
-rw-r--r-- | service/ControlPlane.cpp | 36 | ||||
-rw-r--r-- | service/OneService.cpp | 147 | ||||
-rw-r--r-- | service/OneService.hpp | 3 |
6 files changed, 582 insertions, 19 deletions
diff --git a/service/ClusterDefinition.hpp b/service/ClusterDefinition.hpp new file mode 100644 index 00000000..d02894e4 --- /dev/null +++ b/service/ClusterDefinition.hpp @@ -0,0 +1,125 @@ +/* + * ZeroTier One - Network Virtualization Everywhere + * Copyright (C) 2011-2015 ZeroTier, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * -- + * + * ZeroTier may be used and distributed under the terms of the GPLv3, which + * are available at: http://www.gnu.org/licenses/gpl-3.0.html + * + * If you would like to embed ZeroTier into a commercial application or + * redistribute it in a modified binary form, please contact ZeroTier Networks + * LLC. Start here: http://www.zerotier.com/ + */ + +#ifndef ZT_CLUSTERDEFINITION_HPP +#define ZT_CLUSTERDEFINITION_HPP + +#ifdef ZT_ENABLE_CLUSTER + +#include <vector> +#include <algorithm> + +#include "../node/Constants.hpp" +#include "../node/Utils.hpp" +#include "../osdep/OSUtils.hpp" + +namespace ZeroTier { + +/** + * Parser for cluster definition file + */ +class ClusterDefinition +{ +public: + struct MemberDefinition + { + MemberDefinition() : id(0),x(0),y(0),z(0) { name[0] = (char)0; } + + unsigned int id; + int x,y,z; + char name[256]; + InetAddress clusterEndpoint; + std::vector<InetAddress> zeroTierEndpoints; + }; + + ClusterDefinition(uint64_t myAddress,const char *pathToClusterFile) + { + std::string cf; + if (!OSUtils::readFile(pathToClusterFile,cf)) + return; + + char myAddressStr[64]; + Utils::snprintf(myAddressStr,sizeof(myAddressStr),"%.10llx",myAddress); + + std::vector<std::string> lines(Utils::split(cf.c_str(),"\r\n","","")); + for(std::vector<std::string>::iterator l(lines.begin());l!=lines.end();++l) { + std::vector<std::string> fields(Utils::split(l->c_str()," \t","","")); + if ((fields.size() < 5)||(fields[0][0] == '#')||(fields[0] != myAddressStr)) + continue; + + int id = Utils::strToUInt(fields[1].c_str()); + if ((id < 0)||(id > ZT_CLUSTER_MAX_MEMBERS)) + continue; + MemberDefinition &md = _md[id]; + + md.id = (unsigned int)id; + if (fields.size() >= 6) { + std::vector<std::string> xyz(Utils::split(fields[5].c_str(),",","","")); + md.x = (xyz.size() > 0) ? Utils::strToInt(xyz[0].c_str()) : 0; + md.y = (xyz.size() > 1) ? Utils::strToInt(xyz[1].c_str()) : 0; + md.z = (xyz.size() > 2) ? Utils::strToInt(xyz[2].c_str()) : 0; + } + Utils::scopy(md.name,sizeof(md.name),fields[2].c_str()); + md.clusterEndpoint.fromString(fields[3]); + if (!md.clusterEndpoint) + continue; + std::vector<std::string> zips(Utils::split(fields[4].c_str(),",","","")); + for(std::vector<std::string>::iterator zip(zips.begin());zip!=zips.end();++zip) { + InetAddress i; + i.fromString(*zip); + if (i) + md.zeroTierEndpoints.push_back(i); + } + + _ids.push_back((unsigned int)id); + } + + std::sort(_ids.begin(),_ids.end()); + } + + inline const MemberDefinition &operator[](unsigned int id) const throw() { return _md[id]; } + inline unsigned int size() const throw() { return (unsigned int)_ids.size(); } + inline const std::vector<unsigned int> &ids() const throw() { return _ids; } + + inline std::vector<MemberDefinition> members() const + { + std::vector<MemberDefinition> m; + for(std::vector<unsigned int>::const_iterator i(_ids.begin());i!=_ids.end();++i) + m.push_back(_md[*i]); + return m; + } + +private: + MemberDefinition _md[ZT_CLUSTER_MAX_MEMBERS]; + std::vector<unsigned int> _ids; +}; + +} // namespace ZeroTier + +#endif // ZT_ENABLE_CLUSTER + +#endif diff --git a/service/ClusterGeoIpService.cpp b/service/ClusterGeoIpService.cpp new file mode 100644 index 00000000..9baa7506 --- /dev/null +++ b/service/ClusterGeoIpService.cpp @@ -0,0 +1,196 @@ +/* + * ZeroTier One - Network Virtualization Everywhere + * Copyright (C) 2011-2015 ZeroTier, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * -- + * + * ZeroTier may be used and distributed under the terms of the GPLv3, which + * are available at: http://www.gnu.org/licenses/gpl-3.0.html + * + * If you would like to embed ZeroTier into a commercial application or + * redistribute it in a modified binary form, please contact ZeroTier Networks + * LLC. Start here: http://www.zerotier.com/ + */ + +#ifdef ZT_ENABLE_CLUSTER + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <stdint.h> +#include <unistd.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <sys/wait.h> +#include <signal.h> +#include <errno.h> + +#include <iostream> + +#include "ClusterGeoIpService.hpp" +#include "../node/Utils.hpp" +#include "../osdep/OSUtils.hpp" + +#define ZT_CLUSTERGEOIPSERVICE_INTERNAL_CACHE_TTL (60 * 60 * 1000) + +namespace ZeroTier { + +ClusterGeoIpService::ClusterGeoIpService(const char *pathToExe) : + _pathToExe(pathToExe), + _sOutputFd(-1), + _sInputFd(-1), + _sPid(0), + _run(true) +{ + _thread = Thread::start(this); +} + +ClusterGeoIpService::~ClusterGeoIpService() +{ + _run = false; + long p = _sPid; + if (p > 0) { + ::kill(p,SIGTERM); + Thread::sleep(500); + ::kill(p,SIGKILL); + } + Thread::join(_thread); +} + +bool ClusterGeoIpService::locate(const InetAddress &ip,int &x,int &y,int &z) +{ + InetAddress ipNoPort(ip); + ipNoPort.setPort(0); // we index cache by IP only + const uint64_t now = OSUtils::now(); + + bool r = false; + { + Mutex::Lock _l(_cache_m); + std::map< InetAddress,_CE >::iterator c(_cache.find(ipNoPort)); + if (c != _cache.end()) { + x = c->second.x; + y = c->second.y; + z = c->second.z; + if ((now - c->second.ts) < ZT_CLUSTERGEOIPSERVICE_INTERNAL_CACHE_TTL) + return true; + else r = true; // return true but refresh as well + } + } + + { + Mutex::Lock _l(_sOutputLock); + if (_sOutputFd >= 0) { + std::string ips(ipNoPort.toIpString()); + ips.push_back('\n'); + //fprintf(stderr,"ClusterGeoIpService: << %s",ips.c_str()); + ::write(_sOutputFd,ips.data(),ips.length()); + } + } + + return r; +} + +void ClusterGeoIpService::threadMain() + throw() +{ + char linebuf[65536]; + char buf[65536]; + long n,lineptr; + + while (_run) { + { + Mutex::Lock _l(_sOutputLock); + + _sOutputFd = -1; + _sInputFd = -1; + _sPid = 0; + + int stdinfds[2] = { 0,0 }; // sub-process's stdin, our output + int stdoutfds[2] = { 0,0 }; // sub-process's stdout, our input + ::pipe(stdinfds); + ::pipe(stdoutfds); + + long p = (long)::vfork(); + if (p < 0) { + Thread::sleep(500); + continue; + } else if (p == 0) { + ::close(stdinfds[1]); + ::close(stdoutfds[0]); + ::dup2(stdinfds[0],STDIN_FILENO); + ::dup2(stdoutfds[1],STDOUT_FILENO); + ::execl(_pathToExe.c_str(),_pathToExe.c_str(),(const char *)0); + ::exit(1); + } else { + ::close(stdinfds[0]); + ::close(stdoutfds[1]); + _sOutputFd = stdinfds[1]; + _sInputFd = stdoutfds[0]; + _sPid = p; + } + } + + lineptr = 0; + while (_run) { + n = ::read(_sInputFd,buf,sizeof(buf)); + if (n <= 0) { + if (errno == EINTR) + continue; + else break; + } + for(long i=0;i<n;++i) { + if (lineptr > (long)sizeof(linebuf)) + lineptr = 0; + if ((buf[i] == '\n')||(buf[i] == '\r')) { + linebuf[lineptr] = (char)0; + if (lineptr > 0) { + //fprintf(stderr,"ClusterGeoIpService: >> %s\n",linebuf); + try { + std::vector<std::string> result(Utils::split(linebuf,",","","")); + if ((result.size() >= 7)&&(result[1] == "1")) { + InetAddress rip(result[0],0); + if ((rip.ss_family == AF_INET)||(rip.ss_family == AF_INET6)) { + _CE ce; + ce.ts = OSUtils::now(); + ce.x = (int)::strtol(result[4].c_str(),(char **)0,10); + ce.y = (int)::strtol(result[5].c_str(),(char **)0,10); + ce.z = (int)::strtol(result[6].c_str(),(char **)0,10); + //fprintf(stderr,"ClusterGeoIpService: %s is at %d,%d,%d\n",rip.toIpString().c_str(),ce.x,ce.y,ce.z); + { + Mutex::Lock _l2(_cache_m); + _cache[rip] = ce; + } + } + } + } catch ( ... ) {} + } + lineptr = 0; + } else linebuf[lineptr++] = buf[i]; + } + } + + ::close(_sOutputFd); + ::close(_sInputFd); + ::kill(_sPid,SIGTERM); + Thread::sleep(250); + ::kill(_sPid,SIGKILL); + ::waitpid(_sPid,(int *)0,0); + } +} + +} // namespace ZeroTier + +#endif // ZT_ENABLE_CLUSTER diff --git a/service/ClusterGeoIpService.hpp b/service/ClusterGeoIpService.hpp new file mode 100644 index 00000000..fd04ba1d --- /dev/null +++ b/service/ClusterGeoIpService.hpp @@ -0,0 +1,94 @@ +/* + * ZeroTier One - Network Virtualization Everywhere + * Copyright (C) 2011-2015 ZeroTier, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * -- + * + * ZeroTier may be used and distributed under the terms of the GPLv3, which + * are available at: http://www.gnu.org/licenses/gpl-3.0.html + * + * If you would like to embed ZeroTier into a commercial application or + * redistribute it in a modified binary form, please contact ZeroTier Networks + * LLC. Start here: http://www.zerotier.com/ + */ + +#ifndef ZT_CLUSTERGEOIPSERVICE_HPP +#define ZT_CLUSTERGEOIPSERVICE_HPP + +#ifdef ZT_ENABLE_CLUSTER + +#include <vector> +#include <map> +#include <string> + +#include "../node/Constants.hpp" +#include "../node/InetAddress.hpp" +#include "../node/Mutex.hpp" +#include "../osdep/Thread.hpp" + +namespace ZeroTier { + +/** + * Runs the Cluster GeoIP service in the background and resolves geoIP queries + */ +class ClusterGeoIpService +{ +public: + /** + * @param pathToExe Path to cluster geo-resolution service executable + */ + ClusterGeoIpService(const char *pathToExe); + + ~ClusterGeoIpService(); + + /** + * Attempt to locate an IP + * + * This returns true if x, y, and z are set. Otherwise it returns false + * and a geo-locate job is ordered in the background. This usually takes + * 500-1500ms to complete, after which time results will be available. + * If false is returned the supplied coordinate variables are unchanged. + * + * @param ip IPv4 or IPv6 address + * @param x Reference to variable to receive X + * @param y Reference to variable to receive Y + * @param z Reference to variable to receive Z + * @return True if coordinates were set + */ + bool locate(const InetAddress &ip,int &x,int &y,int &z); + + void threadMain() + throw(); + +private: + const std::string _pathToExe; + int _sOutputFd; + int _sInputFd; + volatile long _sPid; + volatile bool _run; + Thread _thread; + Mutex _sOutputLock; + + struct _CE { uint64_t ts; int x,y,z; }; + std::map< InetAddress,_CE > _cache; + Mutex _cache_m; +}; + +} // namespace ZeroTier + +#endif // ZT_ENABLE_CLUSTER + +#endif diff --git a/service/ControlPlane.cpp b/service/ControlPlane.cpp index 7affb08c..31eca7b6 100644 --- a/service/ControlPlane.cpp +++ b/service/ControlPlane.cpp @@ -354,8 +354,38 @@ unsigned int ControlPlane::handleRequest( if (ps[0] == "status") { responseContentType = "application/json"; + ZT_NodeStatus status; _node->status(&status); + + std::string clusterJson; +#ifdef ZT_ENABLE_CLUSTER + { + ZT_ClusterStatus cs; + _node->clusterStatus(&cs); + + if (cs.clusterSize >= 1) { + char t[4096]; + Utils::snprintf(t,sizeof(t),"{\n\t\t\"myId\": %u,\n\t\t\"clusterSize\": %u,\n\t\t\"members: [\n",cs.myId,cs.clusterSize); + clusterJson.append(t); + for(unsigned int i=0;i<cs.clusterSize;++i) { + Utils::snprintf(t,sizeof(t),"\t\t\t{\n\t\t\t\t\"id\": %u,\n\t\t\t\t\"msSinceLastHeartbeat\": %u,\n\t\t\t\t\"alive\": %s,\n\t\t\t\t\"x\": %d,\n\t\t\t\t\"y\": %d,\n\t\t\t\t\"z\": %d,\n\t\t\t\t\"load\": %llu\n\t\t\t\t\"peers\": %llu\n\t\t\t}%s", + cs.members[i].id, + cs.members[i].msSinceLastHeartbeat, + (cs.members[i].alive != 0) ? "true" : "false", + cs.members[i].x, + cs.members[i].y, + cs.members[i].z, + cs.members[i].load, + cs.members[i].peers, + (i == (cs.clusterSize - 1)) ? "," : ""); + clusterJson.append(t); + } + clusterJson.append(" ]\n\t\t}"); + } + } +#endif + Utils::snprintf(json,sizeof(json), "{\n" "\t\"address\": \"%.10llx\",\n" @@ -368,7 +398,8 @@ unsigned int ControlPlane::handleRequest( "\t\"versionMinor\": %d,\n" "\t\"versionRev\": %d,\n" "\t\"version\": \"%d.%d.%d\",\n" - "\t\"clock\": %llu\n" + "\t\"clock\": %llu,\n" + "\t\"cluster\": %s\n" "}\n", status.address, status.publicIdentity, @@ -380,7 +411,8 @@ unsigned int ControlPlane::handleRequest( ZEROTIER_ONE_VERSION_MINOR, ZEROTIER_ONE_VERSION_REVISION, ZEROTIER_ONE_VERSION_MAJOR,ZEROTIER_ONE_VERSION_MINOR,ZEROTIER_ONE_VERSION_REVISION, - (unsigned long long)OSUtils::now()); + (unsigned long long)OSUtils::now(), + ((clusterJson.length() > 0) ? clusterJson.c_str() : "null")); responseBody = json; scode = 200; } else if (ps[0] == "config") { diff --git a/service/OneService.cpp b/service/OneService.cpp index 6b28c41e..729812ed 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -58,6 +58,8 @@ #include "OneService.hpp" #include "ControlPlane.hpp" +#include "ClusterGeoIpService.hpp" +#include "ClusterDefinition.hpp" /** * Uncomment to enable UDP breakage switch @@ -366,6 +368,11 @@ static int SnodeDataStorePutFunction(ZT_Node *node,void *uptr,const char *name,c static int SnodeWirePacketSendFunction(ZT_Node *node,void *uptr,const struct sockaddr_storage *localAddr,const struct sockaddr_storage *addr,const void *data,unsigned int len); static void SnodeVirtualNetworkFrameFunction(ZT_Node *node,void *uptr,uint64_t nwid,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len); +#ifdef ZT_ENABLE_CLUSTER +static void SclusterSendFunction(void *uptr,unsigned int toMemberId,const void *data,unsigned int len); +static int SclusterGeoIpFunction(void *uptr,const struct sockaddr_storage *addr,int *x,int *y,int *z); +#endif + static void StapFrameHandler(void *uptr,uint64_t nwid,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len); static int ShttpOnMessageBegin(http_parser *parser); @@ -419,26 +426,32 @@ class OneServiceImpl : public OneService { public: OneServiceImpl(const char *hp,unsigned int port) : - _homePath((hp) ? hp : "."), - _tcpFallbackResolver(ZT_TCP_FALLBACK_RELAY), + _homePath((hp) ? hp : ".") + ,_tcpFallbackResolver(ZT_TCP_FALLBACK_RELAY) #ifdef ZT_ENABLE_NETWORK_CONTROLLER - _controller((SqliteNetworkController *)0), + ,_controller((SqliteNetworkController *)0) #endif - _phy(this,false,true), - _node((Node *)0), - _controlPlane((ControlPlane *)0), - _lastDirectReceiveFromGlobal(0), - _lastSendToGlobal(0), - _lastRestart(0), - _nextBackgroundTaskDeadline(0), - _tcpFallbackTunnel((TcpConnection *)0), - _termReason(ONE_STILL_RUNNING), - _port(0), + ,_phy(this,false,true) + ,_node((Node *)0) + ,_controlPlane((ControlPlane *)0) + ,_lastDirectReceiveFromGlobal(0) + ,_lastSendToGlobal(0) + ,_lastRestart(0) + ,_nextBackgroundTaskDeadline(0) + ,_tcpFallbackTunnel((TcpConnection *)0) + ,_termReason(ONE_STILL_RUNNING) + ,_port(0) #ifdef ZT_USE_MINIUPNPC - _v4UpnpUdpSocket((PhySocket *)0), - _upnpClient((UPNPClient *)0), + ,_v4UpnpUdpSocket((PhySocket *)0) + ,_upnpClient((UPNPClient *)0) #endif - _run(true) +#ifdef ZT_ENABLE_CLUSTER + ,_clusterMessageSocket((PhySocket *)0) + ,_clusterGeoIpService((ClusterGeoIpService *)0) + ,_clusterDefinition((ClusterDefinition *)0) + ,_clusterMemberId(0) +#endif + ,_run(true) { const int portTrials = (port == 0) ? 256 : 1; // if port is 0, pick random for(int k=0;k<portTrials;++k) { @@ -510,6 +523,9 @@ public: _phy.close(_v6UdpSocket); _phy.close(_v4TcpListenSocket); _phy.close(_v6TcpListenSocket); +#ifdef ZT_ENABLE_CLUSTER + _phy.close(_clusterMessageSocket); +#endif #ifdef ZT_USE_MINIUPNPC _phy.close(_v4UpnpUdpSocket); delete _upnpClient; @@ -517,6 +533,10 @@ public: #ifdef ZT_ENABLE_NETWORK_CONTROLLER delete _controller; #endif +#ifdef ZT_ENABLE_CLUSTER + delete _clusterGeoIpService; + delete _clusterDefinition; +#endif } virtual ReasonForTermination run() @@ -556,6 +576,70 @@ public: _node->setNetconfMaster((void *)_controller); #endif +#ifdef ZT_ENABLE_CLUSTER + if (OSUtils::fileExists((_homePath + ZT_PATH_SEPARATOR_S + "cluster").c_str())) { + _clusterDefinition = new ClusterDefinition(_node->address(),(_homePath + ZT_PATH_SEPARATOR_S + "cluster").c_str()); + if (_clusterDefinition->size() > 0) { + std::vector<ClusterDefinition::MemberDefinition> members(_clusterDefinition->members()); + for(std::vector<ClusterDefinition::MemberDefinition>::iterator m(members.begin());m!=members.end();++m) { + PhySocket *cs = _phy.udpBind(reinterpret_cast<const struct sockaddr *>(&(m->clusterEndpoint))); + if (cs) { + if (_clusterMessageSocket) { + _phy.close(_clusterMessageSocket,false); + _phy.close(cs,false); + + Mutex::Lock _l(_termReason_m); + _termReason = ONE_UNRECOVERABLE_ERROR; + _fatalErrorMessage = "Cluster: can't determine my cluster member ID: able to bind more than one cluster message socket IP/port!"; + return _termReason; + } + _clusterMessageSocket = cs; + _clusterMemberId = m->id; + } + } + + if (!_clusterMessageSocket) { + Mutex::Lock _l(_termReason_m); + _termReason = ONE_UNRECOVERABLE_ERROR; + _fatalErrorMessage = "Cluster: can't determine my cluster member ID: unable to bind to any cluster message socket IP/port."; + return _termReason; + } + + if (OSUtils::fileExists((_homePath + ZT_PATH_SEPARATOR_S + "cluster-geo.exe").c_str())) + _clusterGeoIpService = new ClusterGeoIpService((_homePath + ZT_PATH_SEPARATOR_S + "cluster-geo.exe").c_str()); + + const ClusterDefinition::MemberDefinition &me = (*_clusterDefinition)[_clusterMemberId]; + InetAddress endpoints[255]; + unsigned int numEndpoints = 0; + for(std::vector<InetAddress>::const_iterator i(me.zeroTierEndpoints.begin());i!=me.zeroTierEndpoints.end();++i) + endpoints[numEndpoints++] = *i; + + if (_node->clusterInit( + _clusterMemberId, + reinterpret_cast<const struct sockaddr_storage *>(endpoints), + numEndpoints, + me.x, + me.y, + me.z, + &SclusterSendFunction, + this, + (_clusterGeoIpService) ? &SclusterGeoIpFunction : 0, + this) == ZT_RESULT_OK) { + + std::vector<ClusterDefinition::MemberDefinition> members(_clusterDefinition->members()); + for(std::vector<ClusterDefinition::MemberDefinition>::iterator m(members.begin());m!=members.end();++m) { + if (m->id != _clusterMemberId) + _node->clusterAddMember(m->id); + } + + } + } else { + delete _clusterDefinition; + _clusterDefinition = (ClusterDefinition *)0; + } + } +#endif + _controlPlane = new ControlPlane(this,_node,(_homePath + ZT_PATH_SEPARATOR_S + "ui").c_str()); _controlPlane->addAuthToken(authToken.c_str()); @@ -781,10 +865,18 @@ public: inline void phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *from,void *data,unsigned long len) { +#ifdef ZT_ENABLE_CLUSTER + if (sock == _clusterMessageSocket) { + _node->clusterHandleIncomingMessage(data,len); + return; + } +#endif + #ifdef ZT_BREAK_UDP if (OSUtils::fileExists("/tmp/ZT_BREAK_UDP")) return; #endif + if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) _lastDirectReceiveFromGlobal = OSUtils::now(); ZT_ResultCode rc = _node->processWirePacket( @@ -1303,7 +1395,6 @@ public: _phy.close(tc->sock); // will call close handler, which deletes from _tcpConnections } -private: std::string _dataStorePrepPath(const char *name) const { std::string p(_homePath); @@ -1358,6 +1449,13 @@ private: UPNPClient *_upnpClient; #endif +#ifdef ZT_ENABLE_CLUSTER + PhySocket *_clusterMessageSocket; + ClusterGeoIpService *_clusterGeoIpService; + ClusterDefinition *_clusterDefinition; + unsigned int _clusterMemberId; +#endif + bool _run; Mutex _run_m; }; @@ -1375,6 +1473,21 @@ static int SnodeWirePacketSendFunction(ZT_Node *node,void *uptr,const struct soc static void SnodeVirtualNetworkFrameFunction(ZT_Node *node,void *uptr,uint64_t nwid,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len) { reinterpret_cast<OneServiceImpl *>(uptr)->nodeVirtualNetworkFrameFunction(nwid,sourceMac,destMac,etherType,vlanId,data,len); } +#ifdef ZT_ENABLE_CLUSTER +static void SclusterSendFunction(void *uptr,unsigned int toMemberId,const void *data,unsigned int len) +{ + OneServiceImpl *const impl = reinterpret_cast<OneServiceImpl *>(uptr); + const ClusterDefinition::MemberDefinition &md = (*(impl->_clusterDefinition))[toMemberId]; + if (md.clusterEndpoint) + impl->_phy.udpSend(impl->_clusterMessageSocket,reinterpret_cast<const struct sockaddr *>(&(md.clusterEndpoint)),data,len); +} +static int SclusterGeoIpFunction(void *uptr,const struct sockaddr_storage *addr,int *x,int *y,int *z) +{ + OneServiceImpl *const impl = reinterpret_cast<OneServiceImpl *>(uptr); + return (int)(impl->_clusterGeoIpService->locate(*(reinterpret_cast<const InetAddress *>(addr)),*x,*y,*z)); +} +#endif + static void StapFrameHandler(void *uptr,uint64_t nwid,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len) { reinterpret_cast<OneServiceImpl *>(uptr)->tapFrameHandler(nwid,from,to,etherType,vlanId,data,len); } diff --git a/service/OneService.hpp b/service/OneService.hpp index 2f76ebaa..4f7b988b 100644 --- a/service/OneService.hpp +++ b/service/OneService.hpp @@ -43,6 +43,9 @@ namespace ZeroTier { * periodically checked and updates are automatically downloaded, verified * against a built-in list of update signing keys, and installed. This is * only supported for certain platforms. + * + * If built with ZT_ENABLE_CLUSTER, a 'cluster' file is checked and if + * present is read to determine the identity of other cluster members. */ class OneService { |