diff options
Diffstat (limited to 'service')
-rw-r--r-- | service/ClusterDefinition.hpp | 125 | ||||
-rw-r--r-- | service/ClusterGeoIpService.cpp | 197 | ||||
-rw-r--r-- | service/ClusterGeoIpService.hpp | 94 | ||||
-rw-r--r-- | service/ControlPlane.cpp | 44 | ||||
-rw-r--r-- | service/OneService.cpp | 214 | ||||
-rw-r--r-- | service/OneService.hpp | 7 |
6 files changed, 634 insertions, 47 deletions
diff --git a/service/ClusterDefinition.hpp b/service/ClusterDefinition.hpp new file mode 100644 index 00000000..d02894e4 --- /dev/null +++ b/service/ClusterDefinition.hpp @@ -0,0 +1,125 @@ +/* + * ZeroTier One - Network Virtualization Everywhere + * Copyright (C) 2011-2015 ZeroTier, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * -- + * + * ZeroTier may be used and distributed under the terms of the GPLv3, which + * are available at: http://www.gnu.org/licenses/gpl-3.0.html + * + * If you would like to embed ZeroTier into a commercial application or + * redistribute it in a modified binary form, please contact ZeroTier Networks + * LLC. Start here: http://www.zerotier.com/ + */ + +#ifndef ZT_CLUSTERDEFINITION_HPP +#define ZT_CLUSTERDEFINITION_HPP + +#ifdef ZT_ENABLE_CLUSTER + +#include <vector> +#include <algorithm> + +#include "../node/Constants.hpp" +#include "../node/Utils.hpp" +#include "../osdep/OSUtils.hpp" + +namespace ZeroTier { + +/** + * Parser for cluster definition file + */ +class ClusterDefinition +{ +public: + struct MemberDefinition + { + MemberDefinition() : id(0),x(0),y(0),z(0) { name[0] = (char)0; } + + unsigned int id; + int x,y,z; + char name[256]; + InetAddress clusterEndpoint; + std::vector<InetAddress> zeroTierEndpoints; + }; + + ClusterDefinition(uint64_t myAddress,const char *pathToClusterFile) + { + std::string cf; + if (!OSUtils::readFile(pathToClusterFile,cf)) + return; + + char myAddressStr[64]; + Utils::snprintf(myAddressStr,sizeof(myAddressStr),"%.10llx",myAddress); + + std::vector<std::string> lines(Utils::split(cf.c_str(),"\r\n","","")); + for(std::vector<std::string>::iterator l(lines.begin());l!=lines.end();++l) { + std::vector<std::string> fields(Utils::split(l->c_str()," \t","","")); + if ((fields.size() < 5)||(fields[0][0] == '#')||(fields[0] != myAddressStr)) + continue; + + int id = Utils::strToUInt(fields[1].c_str()); + if ((id < 0)||(id > ZT_CLUSTER_MAX_MEMBERS)) + continue; + MemberDefinition &md = _md[id]; + + md.id = (unsigned int)id; + if (fields.size() >= 6) { + std::vector<std::string> xyz(Utils::split(fields[5].c_str(),",","","")); + md.x = (xyz.size() > 0) ? Utils::strToInt(xyz[0].c_str()) : 0; + md.y = (xyz.size() > 1) ? Utils::strToInt(xyz[1].c_str()) : 0; + md.z = (xyz.size() > 2) ? Utils::strToInt(xyz[2].c_str()) : 0; + } + Utils::scopy(md.name,sizeof(md.name),fields[2].c_str()); + md.clusterEndpoint.fromString(fields[3]); + if (!md.clusterEndpoint) + continue; + std::vector<std::string> zips(Utils::split(fields[4].c_str(),",","","")); + for(std::vector<std::string>::iterator zip(zips.begin());zip!=zips.end();++zip) { + InetAddress i; + i.fromString(*zip); + if (i) + md.zeroTierEndpoints.push_back(i); + } + + _ids.push_back((unsigned int)id); + } + + std::sort(_ids.begin(),_ids.end()); + } + + inline const MemberDefinition &operator[](unsigned int id) const throw() { return _md[id]; } + inline unsigned int size() const throw() { return (unsigned int)_ids.size(); } + inline const std::vector<unsigned int> &ids() const throw() { return _ids; } + + inline std::vector<MemberDefinition> members() const + { + std::vector<MemberDefinition> m; + for(std::vector<unsigned int>::const_iterator i(_ids.begin());i!=_ids.end();++i) + m.push_back(_md[*i]); + return m; + } + +private: + MemberDefinition _md[ZT_CLUSTER_MAX_MEMBERS]; + std::vector<unsigned int> _ids; +}; + +} // namespace ZeroTier + +#endif // ZT_ENABLE_CLUSTER + +#endif diff --git a/service/ClusterGeoIpService.cpp b/service/ClusterGeoIpService.cpp new file mode 100644 index 00000000..f646fe99 --- /dev/null +++ b/service/ClusterGeoIpService.cpp @@ -0,0 +1,197 @@ +/* + * ZeroTier One - Network Virtualization Everywhere + * Copyright (C) 2011-2015 ZeroTier, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * -- + * + * ZeroTier may be used and distributed under the terms of the GPLv3, which + * are available at: http://www.gnu.org/licenses/gpl-3.0.html + * + * If you would like to embed ZeroTier into a commercial application or + * redistribute it in a modified binary form, please contact ZeroTier Networks + * LLC. Start here: http://www.zerotier.com/ + */ + +#ifdef ZT_ENABLE_CLUSTER + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <stdint.h> +#include <unistd.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <sys/wait.h> +#include <signal.h> +#include <errno.h> + +#include <iostream> + +#include "ClusterGeoIpService.hpp" +#include "../node/Utils.hpp" +#include "../osdep/OSUtils.hpp" + +// 120 days +#define ZT_CLUSTERGEOIPSERVICE_INTERNAL_CACHE_TTL 10368000000ULL + +namespace ZeroTier { + +ClusterGeoIpService::ClusterGeoIpService(const char *pathToExe) : + _pathToExe(pathToExe), + _sOutputFd(-1), + _sInputFd(-1), + _sPid(0), + _run(true) +{ + _thread = Thread::start(this); +} + +ClusterGeoIpService::~ClusterGeoIpService() +{ + _run = false; + long p = _sPid; + if (p > 0) { + ::kill(p,SIGTERM); + Thread::sleep(500); + ::kill(p,SIGKILL); + } + Thread::join(_thread); +} + +bool ClusterGeoIpService::locate(const InetAddress &ip,int &x,int &y,int &z) +{ + InetAddress ipNoPort(ip); + ipNoPort.setPort(0); // we index cache by IP only + const uint64_t now = OSUtils::now(); + + bool r = false; + { + Mutex::Lock _l(_cache_m); + std::map< InetAddress,_CE >::iterator c(_cache.find(ipNoPort)); + if (c != _cache.end()) { + x = c->second.x; + y = c->second.y; + z = c->second.z; + if ((now - c->second.ts) < ZT_CLUSTERGEOIPSERVICE_INTERNAL_CACHE_TTL) + return true; + else r = true; // return true but refresh as well + } + } + + { + Mutex::Lock _l(_sOutputLock); + if (_sOutputFd >= 0) { + std::string ips(ipNoPort.toIpString()); + ips.push_back('\n'); + //fprintf(stderr,"ClusterGeoIpService: << %s",ips.c_str()); + ::write(_sOutputFd,ips.data(),ips.length()); + } + } + + return r; +} + +void ClusterGeoIpService::threadMain() + throw() +{ + char linebuf[65536]; + char buf[65536]; + long n,lineptr; + + while (_run) { + { + Mutex::Lock _l(_sOutputLock); + + _sOutputFd = -1; + _sInputFd = -1; + _sPid = 0; + + int stdinfds[2] = { 0,0 }; // sub-process's stdin, our output + int stdoutfds[2] = { 0,0 }; // sub-process's stdout, our input + ::pipe(stdinfds); + ::pipe(stdoutfds); + + long p = (long)::vfork(); + if (p < 0) { + Thread::sleep(500); + continue; + } else if (p == 0) { + ::close(stdinfds[1]); + ::close(stdoutfds[0]); + ::dup2(stdinfds[0],STDIN_FILENO); + ::dup2(stdoutfds[1],STDOUT_FILENO); + ::execl(_pathToExe.c_str(),_pathToExe.c_str(),(const char *)0); + ::exit(1); + } else { + ::close(stdinfds[0]); + ::close(stdoutfds[1]); + _sOutputFd = stdinfds[1]; + _sInputFd = stdoutfds[0]; + _sPid = p; + } + } + + lineptr = 0; + while (_run) { + n = ::read(_sInputFd,buf,sizeof(buf)); + if (n <= 0) { + if (errno == EINTR) + continue; + else break; + } + for(long i=0;i<n;++i) { + if (lineptr > (long)sizeof(linebuf)) + lineptr = 0; + if ((buf[i] == '\n')||(buf[i] == '\r')) { + linebuf[lineptr] = (char)0; + if (lineptr > 0) { + //fprintf(stderr,"ClusterGeoIpService: >> %s\n",linebuf); + try { + std::vector<std::string> result(Utils::split(linebuf,",","","")); + if ((result.size() >= 7)&&(result[1] == "1")) { + InetAddress rip(result[0],0); + if ((rip.ss_family == AF_INET)||(rip.ss_family == AF_INET6)) { + _CE ce; + ce.ts = OSUtils::now(); + ce.x = (int)::strtol(result[4].c_str(),(char **)0,10); + ce.y = (int)::strtol(result[5].c_str(),(char **)0,10); + ce.z = (int)::strtol(result[6].c_str(),(char **)0,10); + //fprintf(stderr,"ClusterGeoIpService: %s is at %d,%d,%d\n",rip.toIpString().c_str(),ce.x,ce.y,ce.z); + { + Mutex::Lock _l2(_cache_m); + _cache[rip] = ce; + } + } + } + } catch ( ... ) {} + } + lineptr = 0; + } else linebuf[lineptr++] = buf[i]; + } + } + + ::close(_sOutputFd); + ::close(_sInputFd); + ::kill(_sPid,SIGTERM); + Thread::sleep(250); + ::kill(_sPid,SIGKILL); + ::waitpid(_sPid,(int *)0,0); + } +} + +} // namespace ZeroTier + +#endif // ZT_ENABLE_CLUSTER diff --git a/service/ClusterGeoIpService.hpp b/service/ClusterGeoIpService.hpp new file mode 100644 index 00000000..fd04ba1d --- /dev/null +++ b/service/ClusterGeoIpService.hpp @@ -0,0 +1,94 @@ +/* + * ZeroTier One - Network Virtualization Everywhere + * Copyright (C) 2011-2015 ZeroTier, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * -- + * + * ZeroTier may be used and distributed under the terms of the GPLv3, which + * are available at: http://www.gnu.org/licenses/gpl-3.0.html + * + * If you would like to embed ZeroTier into a commercial application or + * redistribute it in a modified binary form, please contact ZeroTier Networks + * LLC. Start here: http://www.zerotier.com/ + */ + +#ifndef ZT_CLUSTERGEOIPSERVICE_HPP +#define ZT_CLUSTERGEOIPSERVICE_HPP + +#ifdef ZT_ENABLE_CLUSTER + +#include <vector> +#include <map> +#include <string> + +#include "../node/Constants.hpp" +#include "../node/InetAddress.hpp" +#include "../node/Mutex.hpp" +#include "../osdep/Thread.hpp" + +namespace ZeroTier { + +/** + * Runs the Cluster GeoIP service in the background and resolves geoIP queries + */ +class ClusterGeoIpService +{ +public: + /** + * @param pathToExe Path to cluster geo-resolution service executable + */ + ClusterGeoIpService(const char *pathToExe); + + ~ClusterGeoIpService(); + + /** + * Attempt to locate an IP + * + * This returns true if x, y, and z are set. Otherwise it returns false + * and a geo-locate job is ordered in the background. This usually takes + * 500-1500ms to complete, after which time results will be available. + * If false is returned the supplied coordinate variables are unchanged. + * + * @param ip IPv4 or IPv6 address + * @param x Reference to variable to receive X + * @param y Reference to variable to receive Y + * @param z Reference to variable to receive Z + * @return True if coordinates were set + */ + bool locate(const InetAddress &ip,int &x,int &y,int &z); + + void threadMain() + throw(); + +private: + const std::string _pathToExe; + int _sOutputFd; + int _sInputFd; + volatile long _sPid; + volatile bool _run; + Thread _thread; + Mutex _sOutputLock; + + struct _CE { uint64_t ts; int x,y,z; }; + std::map< InetAddress,_CE > _cache; + Mutex _cache_m; +}; + +} // namespace ZeroTier + +#endif // ZT_ENABLE_CLUSTER + +#endif diff --git a/service/ControlPlane.cpp b/service/ControlPlane.cpp index 6e731bdc..4978a91d 100644 --- a/service/ControlPlane.cpp +++ b/service/ControlPlane.cpp @@ -182,14 +182,12 @@ static std::string _jsonEnumerate(unsigned int depth,const ZT_PeerPhysicalPath * "%s\t\"address\": \"%s\",\n" "%s\t\"lastSend\": %llu,\n" "%s\t\"lastReceive\": %llu,\n" - "%s\t\"fixed\": %s,\n" "%s\t\"active\": %s,\n" "%s\t\"preferred\": %s\n" "%s}", prefix,_jsonEscape(reinterpret_cast<const InetAddress *>(&(pp[i].address))->toString()).c_str(), prefix,pp[i].lastSend, prefix,pp[i].lastReceive, - prefix,(pp[i].fixed == 0) ? "false" : "true", prefix,(pp[i].active == 0) ? "false" : "true", prefix,(pp[i].preferred == 0) ? "false" : "true", prefix); @@ -267,7 +265,7 @@ unsigned int ControlPlane::handleRequest( std::string &responseBody, std::string &responseContentType) { - char json[1024]; + char json[8194]; unsigned int scode = 404; std::vector<std::string> ps(Utils::split(path.c_str(),"/","","")); std::map<std::string,std::string> urlArgs; @@ -356,29 +354,65 @@ unsigned int ControlPlane::handleRequest( if (ps[0] == "status") { responseContentType = "application/json"; + ZT_NodeStatus status; _node->status(&status); + + std::string clusterJson; +#ifdef ZT_ENABLE_CLUSTER + { + ZT_ClusterStatus cs; + _node->clusterStatus(&cs); + + if (cs.clusterSize >= 1) { + char t[1024]; + Utils::snprintf(t,sizeof(t),"{\n\t\t\"myId\": %u,\n\t\t\"clusterSize\": %u,\n\t\t\"members\": [",cs.myId,cs.clusterSize); + clusterJson.append(t); + for(unsigned int i=0;i<cs.clusterSize;++i) { + Utils::snprintf(t,sizeof(t),"%s\t\t\t{\n\t\t\t\t\"id\": %u,\n\t\t\t\t\"msSinceLastHeartbeat\": %u,\n\t\t\t\t\"alive\": %s,\n\t\t\t\t\"x\": %d,\n\t\t\t\t\"y\": %d,\n\t\t\t\t\"z\": %d,\n\t\t\t\t\"load\": %llu,\n\t\t\t\t\"peers\": %llu\n\t\t\t}", + ((i == 0) ? "\n" : ",\n"), + cs.members[i].id, + cs.members[i].msSinceLastHeartbeat, + (cs.members[i].alive != 0) ? "true" : "false", + cs.members[i].x, + cs.members[i].y, + cs.members[i].z, + cs.members[i].load, + cs.members[i].peers); + clusterJson.append(t); + } + clusterJson.append(" ]\n\t\t}"); + } + } +#endif + Utils::snprintf(json,sizeof(json), "{\n" "\t\"address\": \"%.10llx\",\n" "\t\"publicIdentity\": \"%s\",\n" + "\t\"worldId\": %llu,\n" + "\t\"worldTimestamp\": %llu,\n" "\t\"online\": %s,\n" "\t\"tcpFallbackActive\": %s,\n" "\t\"versionMajor\": %d,\n" "\t\"versionMinor\": %d,\n" "\t\"versionRev\": %d,\n" "\t\"version\": \"%d.%d.%d\",\n" - "\t\"clock\": %llu\n" + "\t\"clock\": %llu,\n" + "\t\"cluster\": %s\n" "}\n", status.address, status.publicIdentity, + status.worldId, + status.worldTimestamp, (status.online) ? "true" : "false", (_svc->tcpFallbackActive()) ? "true" : "false", ZEROTIER_ONE_VERSION_MAJOR, ZEROTIER_ONE_VERSION_MINOR, ZEROTIER_ONE_VERSION_REVISION, ZEROTIER_ONE_VERSION_MAJOR,ZEROTIER_ONE_VERSION_MINOR,ZEROTIER_ONE_VERSION_REVISION, - (unsigned long long)OSUtils::now()); + (unsigned long long)OSUtils::now(), + ((clusterJson.length() > 0) ? clusterJson.c_str() : "null")); responseBody = json; scode = 200; } else if (ps[0] == "config") { diff --git a/service/OneService.cpp b/service/OneService.cpp index d194b400..fd473429 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -58,6 +58,8 @@ #include "OneService.hpp" #include "ControlPlane.hpp" +#include "ClusterGeoIpService.hpp" +#include "ClusterDefinition.hpp" /** * Uncomment to enable UDP breakage switch @@ -378,9 +380,14 @@ static int SnodeVirtualNetworkConfigFunction(ZT_Node *node,void *uptr,uint64_t n static void SnodeEventCallback(ZT_Node *node,void *uptr,enum ZT_Event event,const void *metaData); static long SnodeDataStoreGetFunction(ZT_Node *node,void *uptr,const char *name,void *buf,unsigned long bufSize,unsigned long readIndex,unsigned long *totalSize); static int SnodeDataStorePutFunction(ZT_Node *node,void *uptr,const char *name,const void *data,unsigned long len,int secure); -static int SnodeWirePacketSendFunction(ZT_Node *node,void *uptr,const struct sockaddr_storage *localAddr,const struct sockaddr_storage *addr,const void *data,unsigned int len); +static int SnodeWirePacketSendFunction(ZT_Node *node,void *uptr,const struct sockaddr_storage *localAddr,const struct sockaddr_storage *addr,const void *data,unsigned int len,unsigned int ttl); static void SnodeVirtualNetworkFrameFunction(ZT_Node *node,void *uptr,uint64_t nwid,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len); +#ifdef ZT_ENABLE_CLUSTER +static void SclusterSendFunction(void *uptr,unsigned int toMemberId,const void *data,unsigned int len); +static int SclusterGeoIpFunction(void *uptr,const struct sockaddr_storage *addr,int *x,int *y,int *z); +#endif + static void StapFrameHandler(void *uptr,uint64_t nwid,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len); static int ShttpOnMessageBegin(http_parser *parser); @@ -430,31 +437,45 @@ struct TcpConnection Mutex writeBuf_m; }; +// Use a bigger buffer on AMD64 since these are likely to be bigger and +// servers. Otherwise use a smaller buffer. This makes no difference +// except under very high load. +#if (defined(__amd64) || defined(__amd64__) || defined(__x86_64) || defined(__x86_64__) || defined(__AMD64) || defined(__AMD64__)) +#define ZT_UDP_DESIRED_BUF_SIZE 1048576 +#else +#define ZT_UDP_DESIRED_BUF_SIZE 131072 +#endif + class OneServiceImpl : public OneService { public: - OneServiceImpl(const char *hp,unsigned int port,const char *overrideRootTopology) : - _homePath((hp) ? hp : "."), - _tcpFallbackResolver(ZT_TCP_FALLBACK_RELAY), + OneServiceImpl(const char *hp,unsigned int port) : + _homePath((hp) ? hp : ".") + ,_tcpFallbackResolver(ZT_TCP_FALLBACK_RELAY) #ifdef ZT_ENABLE_NETWORK_CONTROLLER - _controller((SqliteNetworkController *)0), + ,_controller((SqliteNetworkController *)0) #endif - _phy(this,false,true), - _overrideRootTopology((overrideRootTopology) ? overrideRootTopology : ""), - _node((Node *)0), - _controlPlane((ControlPlane *)0), - _lastDirectReceiveFromGlobal(0), - _lastSendToGlobal(0), - _lastRestart(0), - _nextBackgroundTaskDeadline(0), - _tcpFallbackTunnel((TcpConnection *)0), - _termReason(ONE_STILL_RUNNING), - _port(0), + ,_phy(this,false,true) + ,_node((Node *)0) + ,_controlPlane((ControlPlane *)0) + ,_lastDirectReceiveFromGlobal(0) + ,_lastSendToGlobal(0) + ,_lastRestart(0) + ,_nextBackgroundTaskDeadline(0) + ,_tcpFallbackTunnel((TcpConnection *)0) + ,_termReason(ONE_STILL_RUNNING) + ,_port(0) #ifdef ZT_USE_MINIUPNPC - _v4UpnpUdpSocket((PhySocket *)0), - _upnpClient((UPNPClient *)0), + ,_v4UpnpUdpSocket((PhySocket *)0) + ,_upnpClient((UPNPClient *)0) #endif - _run(true) +#ifdef ZT_ENABLE_CLUSTER + ,_clusterMessageSocket((PhySocket *)0) + ,_clusterGeoIpService((ClusterGeoIpService *)0) + ,_clusterDefinition((ClusterDefinition *)0) + ,_clusterMemberId(0) +#endif + ,_run(true) { const int portTrials = (port == 0) ? 256 : 1; // if port is 0, pick random for(int k=0;k<portTrials;++k) { @@ -465,7 +486,7 @@ public: } _v4LocalAddress = InetAddress((uint32_t)0,port); - _v4UdpSocket = _phy.udpBind((const struct sockaddr *)&_v4LocalAddress,reinterpret_cast<void *>(&_v4LocalAddress),131072); + _v4UdpSocket = _phy.udpBind((const struct sockaddr *)&_v4LocalAddress,reinterpret_cast<void *>(&_v4LocalAddress),ZT_UDP_DESIRED_BUF_SIZE); if (_v4UdpSocket) { struct sockaddr_in in4; @@ -477,7 +498,7 @@ public: if (_v4TcpListenSocket) { _v6LocalAddress = InetAddress("\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0",16,port); - _v6UdpSocket = _phy.udpBind((const struct sockaddr *)&_v6LocalAddress,reinterpret_cast<void *>(&_v6LocalAddress),131072); + _v6UdpSocket = _phy.udpBind((const struct sockaddr *)&_v6LocalAddress,reinterpret_cast<void *>(&_v6LocalAddress),ZT_UDP_DESIRED_BUF_SIZE); struct sockaddr_in6 in6; memset((void *)&in6,0,sizeof(in6)); @@ -511,7 +532,7 @@ public: for(int k=0;k<512;++k) { const unsigned int upnport = 40000 + (((port + 1) * (k + 1)) % 25500); _v4UpnpLocalAddress = InetAddress(0,upnport); - _v4UpnpUdpSocket = _phy.udpBind((const struct sockaddr *)&_v4UpnpLocalAddress,reinterpret_cast<void *>(&_v4UpnpLocalAddress),131072); + _v4UpnpUdpSocket = _phy.udpBind((const struct sockaddr *)&_v4UpnpLocalAddress,reinterpret_cast<void *>(&_v4UpnpLocalAddress),ZT_UDP_DESIRED_BUF_SIZE); if (_v4UpnpUdpSocket) { _upnpClient = new UPNPClient(upnport); break; @@ -526,6 +547,9 @@ public: _phy.close(_v6UdpSocket); _phy.close(_v4TcpListenSocket); _phy.close(_v6TcpListenSocket); +#ifdef ZT_ENABLE_CLUSTER + _phy.close(_clusterMessageSocket); +#endif #ifdef ZT_USE_MINIUPNPC _phy.close(_v4UpnpUdpSocket); delete _upnpClient; @@ -533,6 +557,10 @@ public: #ifdef ZT_ENABLE_NETWORK_CONTROLLER delete _controller; #endif +#ifdef ZT_ENABLE_CLUSTER + delete _clusterGeoIpService; + delete _clusterDefinition; +#endif } virtual ReasonForTermination run() @@ -565,14 +593,77 @@ public: SnodeWirePacketSendFunction, SnodeVirtualNetworkFrameFunction, SnodeVirtualNetworkConfigFunction, - SnodeEventCallback, - ((_overrideRootTopology.length() > 0) ? _overrideRootTopology.c_str() : (const char *)0)); + SnodeEventCallback); #ifdef ZT_ENABLE_NETWORK_CONTROLLER _controller = new SqliteNetworkController(_node,(_homePath + ZT_PATH_SEPARATOR_S + ZT_CONTROLLER_DB_PATH).c_str(),(_homePath + ZT_PATH_SEPARATOR_S + "circuitTestResults.d").c_str()); _node->setNetconfMaster((void *)_controller); #endif +#ifdef ZT_ENABLE_CLUSTER + if (OSUtils::fileExists((_homePath + ZT_PATH_SEPARATOR_S + "cluster").c_str())) { + _clusterDefinition = new ClusterDefinition(_node->address(),(_homePath + ZT_PATH_SEPARATOR_S + "cluster").c_str()); + if (_clusterDefinition->size() > 0) { + std::vector<ClusterDefinition::MemberDefinition> members(_clusterDefinition->members()); + for(std::vector<ClusterDefinition::MemberDefinition>::iterator m(members.begin());m!=members.end();++m) { + PhySocket *cs = _phy.udpBind(reinterpret_cast<const struct sockaddr *>(&(m->clusterEndpoint))); + if (cs) { + if (_clusterMessageSocket) { + _phy.close(_clusterMessageSocket,false); + _phy.close(cs,false); + + Mutex::Lock _l(_termReason_m); + _termReason = ONE_UNRECOVERABLE_ERROR; + _fatalErrorMessage = "Cluster: can't determine my cluster member ID: able to bind more than one cluster message socket IP/port!"; + return _termReason; + } + _clusterMessageSocket = cs; + _clusterMemberId = m->id; + } + } + + if (!_clusterMessageSocket) { + Mutex::Lock _l(_termReason_m); + _termReason = ONE_UNRECOVERABLE_ERROR; + _fatalErrorMessage = "Cluster: can't determine my cluster member ID: unable to bind to any cluster message socket IP/port."; + return _termReason; + } + + if (OSUtils::fileExists((_homePath + ZT_PATH_SEPARATOR_S + "cluster-geo.exe").c_str())) + _clusterGeoIpService = new ClusterGeoIpService((_homePath + ZT_PATH_SEPARATOR_S + "cluster-geo.exe").c_str()); + + const ClusterDefinition::MemberDefinition &me = (*_clusterDefinition)[_clusterMemberId]; + InetAddress endpoints[255]; + unsigned int numEndpoints = 0; + for(std::vector<InetAddress>::const_iterator i(me.zeroTierEndpoints.begin());i!=me.zeroTierEndpoints.end();++i) + endpoints[numEndpoints++] = *i; + + if (_node->clusterInit( + _clusterMemberId, + reinterpret_cast<const struct sockaddr_storage *>(endpoints), + numEndpoints, + me.x, + me.y, + me.z, + &SclusterSendFunction, + this, + (_clusterGeoIpService) ? &SclusterGeoIpFunction : 0, + this) == ZT_RESULT_OK) { + + std::vector<ClusterDefinition::MemberDefinition> members(_clusterDefinition->members()); + for(std::vector<ClusterDefinition::MemberDefinition>::iterator m(members.begin());m!=members.end();++m) { + if (m->id != _clusterMemberId) + _node->clusterAddMember(m->id); + } + + } + } else { + delete _clusterDefinition; + _clusterDefinition = (ClusterDefinition *)0; + } + } +#endif + _controlPlane = new ControlPlane(this,_node,(_homePath + ZT_PATH_SEPARATOR_S + "ui").c_str()); _controlPlane->addAuthToken(authToken.c_str()); @@ -589,6 +680,10 @@ public: } } + // Start two background threads to handle expensive ops out of line + Thread::start(_node); + Thread::start(_node); + _nextBackgroundTaskDeadline = 0; uint64_t clockShouldBe = OSUtils::now(); _lastRestart = clockShouldBe; @@ -664,7 +759,7 @@ public: #ifdef ZT_USE_MINIUPNPC std::vector<InetAddress> upnpAddresses(_upnpClient->get()); for(std::vector<InetAddress>::const_iterator ext(upnpAddresses.begin());ext!=upnpAddresses.end();++ext) - _node->addLocalInterfaceAddress(reinterpret_cast<const struct sockaddr_storage *>(&(*ext)),0,ZT_LOCAL_INTERFACE_ADDRESS_TRUST_NORMAL); + _node->addLocalInterfaceAddress(reinterpret_cast<const struct sockaddr_storage *>(&(*ext))); #endif struct ifaddrs *ifatbl = (struct ifaddrs *)0; @@ -682,7 +777,7 @@ public: if (!isZT) { InetAddress ip(ifa->ifa_addr); ip.setPort(_port); - _node->addLocalInterfaceAddress(reinterpret_cast<const struct sockaddr_storage *>(&ip),0,ZT_LOCAL_INTERFACE_ADDRESS_TRUST_NORMAL); + _node->addLocalInterfaceAddress(reinterpret_cast<const struct sockaddr_storage *>(&ip)); } } ifa = ifa->ifa_next; @@ -716,7 +811,7 @@ public: while (ua) { InetAddress ip(ua->Address.lpSockaddr); ip.setPort(_port); - _node->addLocalInterfaceAddress(reinterpret_cast<const struct sockaddr_storage *>(&ip),0,ZT_LOCAL_INTERFACE_ADDRESS_TRUST_NORMAL); + _node->addLocalInterfaceAddress(reinterpret_cast<const struct sockaddr_storage *>(&ip)); ua = ua->Next; } } @@ -798,10 +893,19 @@ public: inline void phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *from,void *data,unsigned long len) { +#ifdef ZT_ENABLE_CLUSTER + if (sock == _clusterMessageSocket) { + _lastDirectReceiveFromGlobal = OSUtils::now(); + _node->clusterHandleIncomingMessage(data,len); + return; + } +#endif + #ifdef ZT_BREAK_UDP if (OSUtils::fileExists("/tmp/ZT_BREAK_UDP")) return; #endif + if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) _lastDirectReceiveFromGlobal = OSUtils::now(); ZT_ResultCode rc = _node->processWirePacket( @@ -955,7 +1059,7 @@ public: if (from) { ZT_ResultCode rc = _node->processWirePacket( OSUtils::now(), - 0, + &ZT_SOCKADDR_NULL, reinterpret_cast<struct sockaddr_storage *>(&from), data, plen, @@ -1164,16 +1268,23 @@ public: } } - inline int nodeWirePacketSendFunction(const struct sockaddr_storage *localAddr,const struct sockaddr_storage *addr,const void *data,unsigned int len) + inline int nodeWirePacketSendFunction(const struct sockaddr_storage *localAddr,const struct sockaddr_storage *addr,const void *data,unsigned int len,unsigned int ttl) { #ifdef ZT_USE_MINIUPNPC if ((localAddr->ss_family == AF_INET)&&(reinterpret_cast<const struct sockaddr_in *>(localAddr)->sin_port == reinterpret_cast<const struct sockaddr_in *>(&_v4UpnpLocalAddress)->sin_port)) { #ifdef ZT_BREAK_UDP if (!OSUtils::fileExists("/tmp/ZT_BREAK_UDP")) { #endif - if (addr->ss_family == AF_INET) - return ((_phy.udpSend(_v4UpnpUdpSocket,(const struct sockaddr *)addr,data,len) != 0) ? 0 : -1); - else return -1; + if (addr->ss_family == AF_INET) { + if (ttl) + _phy.setIp4UdpTtl(_v4UpnpUdpSocket,ttl); + const int result = ((_phy.udpSend(_v4UpnpUdpSocket,(const struct sockaddr *)addr,data,len) != 0) ? 0 : -1); + if (ttl) + _phy.setIp4UdpTtl(_v4UpnpUdpSocket,255); + return result; + } else { + return -1; + } #ifdef ZT_BREAK_UDP } #endif @@ -1186,8 +1297,13 @@ public: #ifdef ZT_BREAK_UDP if (!OSUtils::fileExists("/tmp/ZT_BREAK_UDP")) { #endif - if (_v4UdpSocket) - result = ((_phy.udpSend(_v4UdpSocket,(const struct sockaddr *)addr,data,len) != 0) ? 0 : -1); + if (_v4UdpSocket) { + if (ttl) + _phy.setIp4UdpTtl(_v4UdpSocket,ttl); + result = ((_phy.udpSend(_v4UdpSocket,(const struct sockaddr *)addr,data,len) != 0) ? 0 : -1); + if (ttl) + _phy.setIp4UdpTtl(_v4UdpSocket,255); + } #ifdef ZT_BREAK_UDP } #endif @@ -1318,7 +1434,6 @@ public: _phy.close(tc->sock); // will call close handler, which deletes from _tcpConnections } -private: std::string _dataStorePrepPath(const char *name) const { std::string p(_homePath); @@ -1342,7 +1457,6 @@ private: SqliteNetworkController *_controller; #endif Phy<OneServiceImpl *> _phy; - std::string _overrideRootTopology; Node *_node; InetAddress _v4LocalAddress,_v6LocalAddress; PhySocket *_v4UdpSocket; @@ -1374,6 +1488,13 @@ private: UPNPClient *_upnpClient; #endif +#ifdef ZT_ENABLE_CLUSTER + PhySocket *_clusterMessageSocket; + ClusterGeoIpService *_clusterGeoIpService; + ClusterDefinition *_clusterDefinition; + unsigned int _clusterMemberId; +#endif + bool _run; Mutex _run_m; }; @@ -1386,11 +1507,26 @@ static long SnodeDataStoreGetFunction(ZT_Node *node,void *uptr,const char *name, { return reinterpret_cast<OneServiceImpl *>(uptr)->nodeDataStoreGetFunction(name,buf,bufSize,readIndex,totalSize); } static int SnodeDataStorePutFunction(ZT_Node *node,void *uptr,const char *name,const void *data,unsigned long len,int secure) { return reinterpret_cast<OneServiceImpl *>(uptr)->nodeDataStorePutFunction(name,data,len,secure); } -static int SnodeWirePacketSendFunction(ZT_Node *node,void *uptr,const struct sockaddr_storage *localAddr,const struct sockaddr_storage *addr,const void *data,unsigned int len) -{ return reinterpret_cast<OneServiceImpl *>(uptr)->nodeWirePacketSendFunction(localAddr,addr,data,len); } +static int SnodeWirePacketSendFunction(ZT_Node *node,void *uptr,const struct sockaddr_storage *localAddr,const struct sockaddr_storage *addr,const void *data,unsigned int len,unsigned int ttl) +{ return reinterpret_cast<OneServiceImpl *>(uptr)->nodeWirePacketSendFunction(localAddr,addr,data,len,ttl); } static void SnodeVirtualNetworkFrameFunction(ZT_Node *node,void *uptr,uint64_t nwid,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len) { reinterpret_cast<OneServiceImpl *>(uptr)->nodeVirtualNetworkFrameFunction(nwid,sourceMac,destMac,etherType,vlanId,data,len); } +#ifdef ZT_ENABLE_CLUSTER +static void SclusterSendFunction(void *uptr,unsigned int toMemberId,const void *data,unsigned int len) +{ + OneServiceImpl *const impl = reinterpret_cast<OneServiceImpl *>(uptr); + const ClusterDefinition::MemberDefinition &md = (*(impl->_clusterDefinition))[toMemberId]; + if (md.clusterEndpoint) + impl->_phy.udpSend(impl->_clusterMessageSocket,reinterpret_cast<const struct sockaddr *>(&(md.clusterEndpoint)),data,len); +} +static int SclusterGeoIpFunction(void *uptr,const struct sockaddr_storage *addr,int *x,int *y,int *z) +{ + OneServiceImpl *const impl = reinterpret_cast<OneServiceImpl *>(uptr); + return (int)(impl->_clusterGeoIpService->locate(*(reinterpret_cast<const InetAddress *>(addr)),*x,*y,*z)); +} +#endif + static void StapFrameHandler(void *uptr,uint64_t nwid,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len) { reinterpret_cast<OneServiceImpl *>(uptr)->tapFrameHandler(nwid,from,to,etherType,vlanId,data,len); } @@ -1539,7 +1675,7 @@ std::string OneService::autoUpdateUrl() return std::string(); } -OneService *OneService::newInstance(const char *hp,unsigned int port,const char *overrideRootTopology) { return new OneServiceImpl(hp,port,overrideRootTopology); } +OneService *OneService::newInstance(const char *hp,unsigned int port) { return new OneServiceImpl(hp,port); } OneService::~OneService() {} } // namespace ZeroTier diff --git a/service/OneService.hpp b/service/OneService.hpp index 70d024bc..4f7b988b 100644 --- a/service/OneService.hpp +++ b/service/OneService.hpp @@ -43,6 +43,9 @@ namespace ZeroTier { * periodically checked and updates are automatically downloaded, verified * against a built-in list of update signing keys, and installed. This is * only supported for certain platforms. + * + * If built with ZT_ENABLE_CLUSTER, a 'cluster' file is checked and if + * present is read to determine the identity of other cluster members. */ class OneService { @@ -95,12 +98,10 @@ public: * * @param hp Home path * @param port TCP and UDP port for packets and HTTP control (if 0, pick random port) - * @param overrideRootTopology String-serialized root topology (for testing, default: NULL) */ static OneService *newInstance( const char *hp, - unsigned int port, - const char *overrideRootTopology = (const char *)0); + unsigned int port); virtual ~OneService(); |