summaryrefslogtreecommitdiff
path: root/service/OneService.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'service/OneService.cpp')
-rw-r--r--service/OneService.cpp441
1 files changed, 335 insertions, 106 deletions
diff --git a/service/OneService.cpp b/service/OneService.cpp
index 670d5641..8e21fc5f 100644
--- a/service/OneService.cpp
+++ b/service/OneService.cpp
@@ -58,6 +58,8 @@
#include "OneService.hpp"
#include "ControlPlane.hpp"
+#include "ClusterGeoIpService.hpp"
+#include "ClusterDefinition.hpp"
/**
* Uncomment to enable UDP breakage switch
@@ -118,20 +120,20 @@ namespace ZeroTier { typedef BSDEthernetTap EthernetTap; }
#define ZT_TAP_CHECK_MULTICAST_INTERVAL 30000
// Path under ZT1 home for controller database if controller is enabled
-#define ZT1_CONTROLLER_DB_PATH "controller.db"
+#define ZT_CONTROLLER_DB_PATH "controller.db"
// TCP fallback relay host -- geo-distributed using Amazon Route53 geo-aware DNS
-#define ZT1_TCP_FALLBACK_RELAY "tcp-fallback.zerotier.com"
-#define ZT1_TCP_FALLBACK_RELAY_PORT 443
+#define ZT_TCP_FALLBACK_RELAY "tcp-fallback.zerotier.com"
+#define ZT_TCP_FALLBACK_RELAY_PORT 443
// Frequency at which we re-resolve the TCP fallback relay
-#define ZT1_TCP_FALLBACK_RERESOLVE_DELAY 86400000
+#define ZT_TCP_FALLBACK_RERESOLVE_DELAY 86400000
// Attempt to engage TCP fallback after this many ms of no reply to packets sent to global-scope IPs
-#define ZT1_TCP_FALLBACK_AFTER 60000
+#define ZT_TCP_FALLBACK_AFTER 60000
// How often to check for local interface addresses
-#define ZT1_LOCAL_INTERFACE_CHECK_INTERVAL 300000
+#define ZT_LOCAL_INTERFACE_CHECK_INTERVAL 300000
namespace ZeroTier {
@@ -338,14 +340,38 @@ public:
static BackgroundSoftwareUpdateChecker backgroundSoftwareUpdateChecker;
#endif // ZT_AUTO_UPDATE
+static std::string _trimString(const std::string &s)
+{
+ unsigned long end = (unsigned long)s.length();
+ while (end) {
+ char c = s[end - 1];
+ if ((c == ' ')||(c == '\r')||(c == '\n')||(!c)||(c == '\t'))
+ --end;
+ else break;
+ }
+ unsigned long start = 0;
+ while (start < end) {
+ char c = s[start];
+ if ((c == ' ')||(c == '\r')||(c == '\n')||(!c)||(c == '\t'))
+ ++start;
+ else break;
+ }
+ return s.substr(start,end - start);
+}
+
class OneServiceImpl;
-static int SnodeVirtualNetworkConfigFunction(ZT1_Node *node,void *uptr,uint64_t nwid,enum ZT1_VirtualNetworkConfigOperation op,const ZT1_VirtualNetworkConfig *nwconf);
-static void SnodeEventCallback(ZT1_Node *node,void *uptr,enum ZT1_Event event,const void *metaData);
-static long SnodeDataStoreGetFunction(ZT1_Node *node,void *uptr,const char *name,void *buf,unsigned long bufSize,unsigned long readIndex,unsigned long *totalSize);
-static int SnodeDataStorePutFunction(ZT1_Node *node,void *uptr,const char *name,const void *data,unsigned long len,int secure);
-static int SnodeWirePacketSendFunction(ZT1_Node *node,void *uptr,const struct sockaddr_storage *addr,const void *data,unsigned int len);
-static void SnodeVirtualNetworkFrameFunction(ZT1_Node *node,void *uptr,uint64_t nwid,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len);
+static int SnodeVirtualNetworkConfigFunction(ZT_Node *node,void *uptr,uint64_t nwid,enum ZT_VirtualNetworkConfigOperation op,const ZT_VirtualNetworkConfig *nwconf);
+static void SnodeEventCallback(ZT_Node *node,void *uptr,enum ZT_Event event,const void *metaData);
+static long SnodeDataStoreGetFunction(ZT_Node *node,void *uptr,const char *name,void *buf,unsigned long bufSize,unsigned long readIndex,unsigned long *totalSize);
+static int SnodeDataStorePutFunction(ZT_Node *node,void *uptr,const char *name,const void *data,unsigned long len,int secure);
+static int SnodeWirePacketSendFunction(ZT_Node *node,void *uptr,const struct sockaddr_storage *localAddr,const struct sockaddr_storage *addr,const void *data,unsigned int len,unsigned int ttl);
+static void SnodeVirtualNetworkFrameFunction(ZT_Node *node,void *uptr,uint64_t nwid,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len);
+
+#ifdef ZT_ENABLE_CLUSTER
+static void SclusterSendFunction(void *uptr,unsigned int toMemberId,const void *data,unsigned int len);
+static int SclusterGeoIpFunction(void *uptr,const struct sockaddr_storage *addr,int *x,int *y,int *z);
+#endif
static void StapFrameHandler(void *uptr,uint64_t nwid,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len);
@@ -396,57 +422,108 @@ struct TcpConnection
Mutex writeBuf_m;
};
+// Use a bigger buffer on AMD64 since these are likely to be bigger and
+// servers. Otherwise use a smaller buffer. This makes no difference
+// except under very high load.
+#if (defined(__amd64) || defined(__amd64__) || defined(__x86_64) || defined(__x86_64__) || defined(__AMD64) || defined(__AMD64__))
+#define ZT_UDP_DESIRED_BUF_SIZE 1048576
+#else
+#define ZT_UDP_DESIRED_BUF_SIZE 131072
+#endif
+
class OneServiceImpl : public OneService
{
public:
- OneServiceImpl(const char *hp,unsigned int port,const char *overrideRootTopology) :
- _homePath((hp) ? hp : "."),
- _tcpFallbackResolver(ZT1_TCP_FALLBACK_RELAY),
+ OneServiceImpl(const char *hp,unsigned int port) :
+ _homePath((hp) ? hp : ".")
+ ,_tcpFallbackResolver(ZT_TCP_FALLBACK_RELAY)
#ifdef ZT_ENABLE_NETWORK_CONTROLLER
- _controller((_homePath + ZT_PATH_SEPARATOR_S + ZT1_CONTROLLER_DB_PATH).c_str()),
+ ,_controller((SqliteNetworkController *)0)
#endif
- _phy(this,false,true),
- _overrideRootTopology((overrideRootTopology) ? overrideRootTopology : ""),
- _node((Node *)0),
- _controlPlane((ControlPlane *)0),
- _lastDirectReceiveFromGlobal(0),
- _lastSendToGlobal(0),
- _lastRestart(0),
- _nextBackgroundTaskDeadline(0),
- _tcpFallbackTunnel((TcpConnection *)0),
- _termReason(ONE_STILL_RUNNING),
- _port(port),
+ ,_phy(this,false,true)
+ ,_node((Node *)0)
+ ,_controlPlane((ControlPlane *)0)
+ ,_lastDirectReceiveFromGlobal(0)
+ ,_lastSendToGlobal(0)
+ ,_lastRestart(0)
+ ,_nextBackgroundTaskDeadline(0)
+ ,_tcpFallbackTunnel((TcpConnection *)0)
+ ,_termReason(ONE_STILL_RUNNING)
+ ,_port(0)
#ifdef ZT_USE_MINIUPNPC
- _upnpClient((int)port),
+ ,_v4UpnpUdpSocket((PhySocket *)0)
+ ,_upnpClient((UPNPClient *)0)
+#endif
+#ifdef ZT_ENABLE_CLUSTER
+ ,_clusterMessageSocket((PhySocket *)0)
+ ,_clusterGeoIpService((ClusterGeoIpService *)0)
+ ,_clusterDefinition((ClusterDefinition *)0)
+ ,_clusterMemberId(0)
#endif
- _run(true)
+ ,_run(true)
{
- struct sockaddr_in in4;
- struct sockaddr_in6 in6;
-
- ::memset((void *)&in4,0,sizeof(in4));
- in4.sin_family = AF_INET;
- in4.sin_port = Utils::hton((uint16_t)port);
- _v4UdpSocket = _phy.udpBind((const struct sockaddr *)&in4,this,131072);
- if (!_v4UdpSocket)
- throw std::runtime_error("cannot bind to port (UDP/IPv4)");
- in4.sin_addr.s_addr = Utils::hton((uint32_t)0x7f000001); // right now we just listen for TCP @localhost
- _v4TcpListenSocket = _phy.tcpListen((const struct sockaddr *)&in4,this);
- if (!_v4TcpListenSocket) {
- _phy.close(_v4UdpSocket);
- throw std::runtime_error("cannot bind to port (TCP/IPv4)");
+ const int portTrials = (port == 0) ? 256 : 1; // if port is 0, pick random
+ for(int k=0;k<portTrials;++k) {
+ if (port == 0) {
+ unsigned int randp = 0;
+ Utils::getSecureRandom(&randp,sizeof(randp));
+ port = 40000 + (randp % 25500);
+ }
+
+ _v4LocalAddress = InetAddress((uint32_t)0,port);
+ _v4UdpSocket = _phy.udpBind((const struct sockaddr *)&_v4LocalAddress,reinterpret_cast<void *>(&_v4LocalAddress),ZT_UDP_DESIRED_BUF_SIZE);
+
+ if (_v4UdpSocket) {
+ struct sockaddr_in in4;
+ memset(&in4,0,sizeof(in4));
+ in4.sin_family = AF_INET;
+ in4.sin_addr.s_addr = Utils::hton((uint32_t)0x7f000001); // right now we just listen for TCP @localhost
+ in4.sin_port = Utils::hton((uint16_t)port);
+ _v4TcpListenSocket = _phy.tcpListen((const struct sockaddr *)&in4,this);
+
+ if (_v4TcpListenSocket) {
+ _v6LocalAddress = InetAddress("\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0",16,port);
+ _v6UdpSocket = _phy.udpBind((const struct sockaddr *)&_v6LocalAddress,reinterpret_cast<void *>(&_v6LocalAddress),ZT_UDP_DESIRED_BUF_SIZE);
+
+ struct sockaddr_in6 in6;
+ memset((void *)&in6,0,sizeof(in6));
+ in6.sin6_family = AF_INET6;
+ in6.sin6_port = in4.sin_port;
+ in6.sin6_addr.s6_addr[15] = 1; // IPv6 localhost == ::1
+ _v6TcpListenSocket = _phy.tcpListen((const struct sockaddr *)&in6,this);
+
+ _port = port;
+ break; // success!
+ } else {
+ _phy.close(_v4UdpSocket,false);
+ }
+ }
+
+ port = 0;
}
- ::memset((void *)&in6,0,sizeof(in6));
- in6.sin6_family = AF_INET6;
- in6.sin6_port = in4.sin_port;
- _v6UdpSocket = _phy.udpBind((const struct sockaddr *)&in6,this,131072);
- in6.sin6_addr.s6_addr[15] = 1; // listen for TCP only at localhost
- _v6TcpListenSocket = _phy.tcpListen((const struct sockaddr *)&in6,this);
+ if (_port == 0)
+ throw std::runtime_error("cannot bind to port");
char portstr[64];
- Utils::snprintf(portstr,sizeof(portstr),"%u",port);
+ Utils::snprintf(portstr,sizeof(portstr),"%u",_port);
OSUtils::writeFile((_homePath + ZT_PATH_SEPARATOR_S + "zerotier-one.port").c_str(),std::string(portstr));
+
+#ifdef ZT_USE_MINIUPNPC
+ // Bind a secondary port for use with uPnP, since some NAT routers
+ // (cough Ubiquity Edge cough) barf up a lung if you do both conventional
+ // NAT-t and uPnP from behind the same port. I think this is a bug, but
+ // everyone else's router bugs are our problem. :P
+ for(int k=0;k<512;++k) {
+ const unsigned int upnport = 40000 + (((port + 1) * (k + 1)) % 25500);
+ _v4UpnpLocalAddress = InetAddress(0,upnport);
+ _v4UpnpUdpSocket = _phy.udpBind((const struct sockaddr *)&_v4UpnpLocalAddress,reinterpret_cast<void *>(&_v4UpnpLocalAddress),ZT_UDP_DESIRED_BUF_SIZE);
+ if (_v4UpnpUdpSocket) {
+ _upnpClient = new UPNPClient(upnport);
+ break;
+ }
+ }
+#endif
}
virtual ~OneServiceImpl()
@@ -455,6 +532,20 @@ public:
_phy.close(_v6UdpSocket);
_phy.close(_v4TcpListenSocket);
_phy.close(_v6TcpListenSocket);
+#ifdef ZT_ENABLE_CLUSTER
+ _phy.close(_clusterMessageSocket);
+#endif
+#ifdef ZT_USE_MINIUPNPC
+ _phy.close(_v4UpnpUdpSocket);
+ delete _upnpClient;
+#endif
+#ifdef ZT_ENABLE_NETWORK_CONTROLLER
+ delete _controller;
+#endif
+#ifdef ZT_ENABLE_CLUSTER
+ delete _clusterGeoIpService;
+ delete _clusterDefinition;
+#endif
}
virtual ReasonForTermination run()
@@ -477,7 +568,7 @@ public:
} else OSUtils::lockDownFile(authTokenPath.c_str(),false);
}
}
- authToken = Utils::trim(authToken);
+ authToken = _trimString(authToken);
_node = new Node(
OSUtils::now(),
@@ -487,18 +578,82 @@ public:
SnodeWirePacketSendFunction,
SnodeVirtualNetworkFrameFunction,
SnodeVirtualNetworkConfigFunction,
- SnodeEventCallback,
- ((_overrideRootTopology.length() > 0) ? _overrideRootTopology.c_str() : (const char *)0));
+ SnodeEventCallback);
#ifdef ZT_ENABLE_NETWORK_CONTROLLER
- _node->setNetconfMaster((void *)&_controller);
+ _controller = new SqliteNetworkController(_node,(_homePath + ZT_PATH_SEPARATOR_S + ZT_CONTROLLER_DB_PATH).c_str(),(_homePath + ZT_PATH_SEPARATOR_S + "circuitTestResults.d").c_str());
+ _node->setNetconfMaster((void *)_controller);
+#endif
+
+#ifdef ZT_ENABLE_CLUSTER
+ if (OSUtils::fileExists((_homePath + ZT_PATH_SEPARATOR_S + "cluster").c_str())) {
+ _clusterDefinition = new ClusterDefinition(_node->address(),(_homePath + ZT_PATH_SEPARATOR_S + "cluster").c_str());
+ if (_clusterDefinition->size() > 0) {
+ std::vector<ClusterDefinition::MemberDefinition> members(_clusterDefinition->members());
+ for(std::vector<ClusterDefinition::MemberDefinition>::iterator m(members.begin());m!=members.end();++m) {
+ PhySocket *cs = _phy.udpBind(reinterpret_cast<const struct sockaddr *>(&(m->clusterEndpoint)));
+ if (cs) {
+ if (_clusterMessageSocket) {
+ _phy.close(_clusterMessageSocket,false);
+ _phy.close(cs,false);
+
+ Mutex::Lock _l(_termReason_m);
+ _termReason = ONE_UNRECOVERABLE_ERROR;
+ _fatalErrorMessage = "Cluster: can't determine my cluster member ID: able to bind more than one cluster message socket IP/port!";
+ return _termReason;
+ }
+ _clusterMessageSocket = cs;
+ _clusterMemberId = m->id;
+ }
+ }
+
+ if (!_clusterMessageSocket) {
+ Mutex::Lock _l(_termReason_m);
+ _termReason = ONE_UNRECOVERABLE_ERROR;
+ _fatalErrorMessage = "Cluster: can't determine my cluster member ID: unable to bind to any cluster message socket IP/port.";
+ return _termReason;
+ }
+
+ if (OSUtils::fileExists((_homePath + ZT_PATH_SEPARATOR_S + "cluster-geo.exe").c_str()))
+ _clusterGeoIpService = new ClusterGeoIpService((_homePath + ZT_PATH_SEPARATOR_S + "cluster-geo.exe").c_str());
+
+ const ClusterDefinition::MemberDefinition &me = (*_clusterDefinition)[_clusterMemberId];
+ InetAddress endpoints[255];
+ unsigned int numEndpoints = 0;
+ for(std::vector<InetAddress>::const_iterator i(me.zeroTierEndpoints.begin());i!=me.zeroTierEndpoints.end();++i)
+ endpoints[numEndpoints++] = *i;
+
+ if (_node->clusterInit(
+ _clusterMemberId,
+ reinterpret_cast<const struct sockaddr_storage *>(endpoints),
+ numEndpoints,
+ me.x,
+ me.y,
+ me.z,
+ &SclusterSendFunction,
+ this,
+ (_clusterGeoIpService) ? &SclusterGeoIpFunction : 0,
+ this) == ZT_RESULT_OK) {
+
+ std::vector<ClusterDefinition::MemberDefinition> members(_clusterDefinition->members());
+ for(std::vector<ClusterDefinition::MemberDefinition>::iterator m(members.begin());m!=members.end();++m) {
+ if (m->id != _clusterMemberId)
+ _node->clusterAddMember(m->id);
+ }
+
+ }
+ } else {
+ delete _clusterDefinition;
+ _clusterDefinition = (ClusterDefinition *)0;
+ }
+ }
#endif
_controlPlane = new ControlPlane(this,_node,(_homePath + ZT_PATH_SEPARATOR_S + "ui").c_str());
_controlPlane->addAuthToken(authToken.c_str());
#ifdef ZT_ENABLE_NETWORK_CONTROLLER
- _controlPlane->setController(&_controller);
+ _controlPlane->setController(_controller);
#endif
{ // Remember networks from previous session
@@ -510,12 +665,16 @@ public:
}
}
+ // Start two background threads to handle expensive ops out of line
+ Thread::start(_node);
+ Thread::start(_node);
+
_nextBackgroundTaskDeadline = 0;
uint64_t clockShouldBe = OSUtils::now();
_lastRestart = clockShouldBe;
uint64_t lastTapMulticastGroupCheck = 0;
uint64_t lastTcpFallbackResolve = 0;
- uint64_t lastLocalInterfaceAddressCheck = (OSUtils::now() - ZT1_LOCAL_INTERFACE_CHECK_INTERVAL) + 15000; // do this in 15s to give UPnP time to configure and other things time to settle
+ uint64_t lastLocalInterfaceAddressCheck = (OSUtils::now() - ZT_LOCAL_INTERFACE_CHECK_INTERVAL) + 15000; // do this in 15s to give UPnP time to configure and other things time to settle
#ifdef ZT_AUTO_UPDATE
uint64_t lastSoftwareUpdateCheck = 0;
#endif // ZT_AUTO_UPDATE
@@ -543,17 +702,17 @@ public:
#ifdef ZT_AUTO_UPDATE
if ((now - lastSoftwareUpdateCheck) >= ZT_AUTO_UPDATE_CHECK_PERIOD) {
- lastSoftwareUpdateCheck = OSUtils::now();
+ lastSoftwareUpdateCheck = now;
Thread::start(&backgroundSoftwareUpdateChecker);
}
#endif // ZT_AUTO_UPDATE
- if ((now - lastTcpFallbackResolve) >= ZT1_TCP_FALLBACK_RERESOLVE_DELAY) {
+ if ((now - lastTcpFallbackResolve) >= ZT_TCP_FALLBACK_RERESOLVE_DELAY) {
lastTcpFallbackResolve = now;
_tcpFallbackResolver.resolveNow();
}
- if ((_tcpFallbackTunnel)&&((now - _lastDirectReceiveFromGlobal) < (ZT1_TCP_FALLBACK_AFTER / 2)))
+ if ((_tcpFallbackTunnel)&&((now - _lastDirectReceiveFromGlobal) < (ZT_TCP_FALLBACK_AFTER / 2)))
_phy.close(_tcpFallbackTunnel->sock);
if ((now - lastTapMulticastGroupCheck) >= ZT_TAP_CHECK_MULTICAST_INTERVAL) {
@@ -569,7 +728,7 @@ public:
}
}
- if ((now - lastLocalInterfaceAddressCheck) >= ZT1_LOCAL_INTERFACE_CHECK_INTERVAL) {
+ if ((now - lastLocalInterfaceAddressCheck) >= ZT_LOCAL_INTERFACE_CHECK_INTERVAL) {
lastLocalInterfaceAddressCheck = now;
#ifdef __UNIX_LIKE__
@@ -583,9 +742,9 @@ public:
_node->clearLocalInterfaceAddresses();
#ifdef ZT_USE_MINIUPNPC
- std::vector<InetAddress> upnpAddresses(_upnpClient.get());
+ std::vector<InetAddress> upnpAddresses(_upnpClient->get());
for(std::vector<InetAddress>::const_iterator ext(upnpAddresses.begin());ext!=upnpAddresses.end();++ext)
- _node->addLocalInterfaceAddress(reinterpret_cast<const struct sockaddr_storage *>(&(*ext)),0,ZT1_LOCAL_INTERFACE_ADDRESS_TRUST_NORMAL);
+ _node->addLocalInterfaceAddress(reinterpret_cast<const struct sockaddr_storage *>(&(*ext)));
#endif
struct ifaddrs *ifatbl = (struct ifaddrs *)0;
@@ -603,7 +762,7 @@ public:
if (!isZT) {
InetAddress ip(ifa->ifa_addr);
ip.setPort(_port);
- _node->addLocalInterfaceAddress(reinterpret_cast<const struct sockaddr_storage *>(&ip),0,ZT1_LOCAL_INTERFACE_ADDRESS_TRUST_NORMAL);
+ _node->addLocalInterfaceAddress(reinterpret_cast<const struct sockaddr_storage *>(&ip));
}
}
ifa = ifa->ifa_next;
@@ -637,7 +796,7 @@ public:
while (ua) {
InetAddress ip(ua->Address.lpSockaddr);
ip.setPort(_port);
- _node->addLocalInterfaceAddress(reinterpret_cast<const struct sockaddr_storage *>(&ip),0,ZT1_LOCAL_INTERFACE_ADDRESS_TRUST_NORMAL);
+ _node->addLocalInterfaceAddress(reinterpret_cast<const struct sockaddr_storage *>(&ip));
ua = ua->Next;
}
}
@@ -719,19 +878,29 @@ public:
inline void phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *from,void *data,unsigned long len)
{
+#ifdef ZT_ENABLE_CLUSTER
+ if (sock == _clusterMessageSocket) {
+ _lastDirectReceiveFromGlobal = OSUtils::now();
+ _node->clusterHandleIncomingMessage(data,len);
+ return;
+ }
+#endif
+
#ifdef ZT_BREAK_UDP
if (OSUtils::fileExists("/tmp/ZT_BREAK_UDP"))
return;
#endif
+
if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL))
_lastDirectReceiveFromGlobal = OSUtils::now();
- ZT1_ResultCode rc = _node->processWirePacket(
+ ZT_ResultCode rc = _node->processWirePacket(
OSUtils::now(),
+ reinterpret_cast<const struct sockaddr_storage *>(*uptr),
(const struct sockaddr_storage *)from, // Phy<> uses sockaddr_storage, so it'll always be that big
data,
len,
&_nextBackgroundTaskDeadline);
- if (ZT1_ResultCode_isFatal(rc)) {
+ if (ZT_ResultCode_isFatal(rc)) {
char tmp[256];
Utils::snprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc);
Mutex::Lock _l(_termReason_m);
@@ -772,7 +941,7 @@ public:
tc->writeBuf.push_back((char)ZEROTIER_ONE_VERSION_MINOR);
tc->writeBuf.push_back((char)((ZEROTIER_ONE_VERSION_REVISION >> 8) & 0xff));
tc->writeBuf.push_back((char)(ZEROTIER_ONE_VERSION_REVISION & 0xff));
- _phy.tcpSetNotifyWritable(sock,true);
+ _phy.setNotifyWritable(sock,true);
_tcpFallbackTunnel = tc;
}
@@ -873,13 +1042,14 @@ public:
}
if (from) {
- ZT1_ResultCode rc = _node->processWirePacket(
+ ZT_ResultCode rc = _node->processWirePacket(
OSUtils::now(),
+ &ZT_SOCKADDR_NULL,
reinterpret_cast<struct sockaddr_storage *>(&from),
data,
plen,
&_nextBackgroundTaskDeadline);
- if (ZT1_ResultCode_isFatal(rc)) {
+ if (ZT_ResultCode_isFatal(rc)) {
char tmp[256];
Utils::snprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc);
Mutex::Lock _l(_termReason_m);
@@ -907,12 +1077,12 @@ public:
TcpConnection *tc = reinterpret_cast<TcpConnection *>(*uptr);
Mutex::Lock _l(tc->writeBuf_m);
if (tc->writeBuf.length() > 0) {
- long sent = (long)_phy.tcpSend(sock,tc->writeBuf.data(),(unsigned long)tc->writeBuf.length(),true);
+ long sent = (long)_phy.streamSend(sock,tc->writeBuf.data(),(unsigned long)tc->writeBuf.length(),true);
if (sent > 0) {
tc->lastActivity = OSUtils::now();
if ((unsigned long)sent >= (unsigned long)tc->writeBuf.length()) {
tc->writeBuf = "";
- _phy.tcpSetNotifyWritable(sock,false);
+ _phy.setNotifyWritable(sock,false);
if (!tc->shouldKeepAlive)
_phy.close(sock); // will call close handler to delete from _tcpConnections
} else {
@@ -920,16 +1090,22 @@ public:
}
}
} else {
- _phy.tcpSetNotifyWritable(sock,false);
+ _phy.setNotifyWritable(sock,false);
}
}
- inline int nodeVirtualNetworkConfigFunction(uint64_t nwid,enum ZT1_VirtualNetworkConfigOperation op,const ZT1_VirtualNetworkConfig *nwc)
+ inline void phyOnUnixAccept(PhySocket *sockL,PhySocket *sockN,void **uptrL,void **uptrN) {}
+ inline void phyOnUnixClose(PhySocket *sock,void **uptr) {}
+ inline void phyOnUnixData(PhySocket *sock,void **uptr,void *data,unsigned long len) {}
+ inline void phyOnUnixWritable(PhySocket *sock,void **uptr) {}
+ inline void phyOnFileDescriptorActivity(PhySocket *sock,void **uptr,bool readable,bool writable) {}
+
+ inline int nodeVirtualNetworkConfigFunction(uint64_t nwid,enum ZT_VirtualNetworkConfigOperation op,const ZT_VirtualNetworkConfig *nwc)
{
Mutex::Lock _l(_taps_m);
std::map< uint64_t,EthernetTap * >::iterator t(_taps.find(nwid));
switch(op) {
- case ZT1_VIRTUAL_NETWORK_CONFIG_OPERATION_UP:
+ case ZT_VIRTUAL_NETWORK_CONFIG_OPERATION_UP:
if (t == _taps.end()) {
try {
char friendlyName[1024];
@@ -959,7 +1135,7 @@ public:
}
}
// fall through...
- case ZT1_VIRTUAL_NETWORK_CONFIG_OPERATION_CONFIG_UPDATE:
+ case ZT_VIRTUAL_NETWORK_CONFIG_OPERATION_CONFIG_UPDATE:
if (t != _taps.end()) {
t->second->setEnabled(nwc->enabled != 0);
@@ -971,19 +1147,21 @@ public:
newAssignedIps.erase(std::unique(newAssignedIps.begin(),newAssignedIps.end()),newAssignedIps.end());
for(std::vector<InetAddress>::iterator ip(newAssignedIps.begin());ip!=newAssignedIps.end();++ip) {
if (!std::binary_search(assignedIps.begin(),assignedIps.end(),*ip))
- t->second->addIp(*ip);
+ if (!t->second->addIp(*ip))
+ fprintf(stderr,"ERROR: unable to add ip address %s"ZT_EOL_S, ip->toString().c_str());
}
for(std::vector<InetAddress>::iterator ip(assignedIps.begin());ip!=assignedIps.end();++ip) {
if (!std::binary_search(newAssignedIps.begin(),newAssignedIps.end(),*ip))
- t->second->removeIp(*ip);
+ if (!t->second->removeIp(*ip))
+ fprintf(stderr,"ERROR: unable to remove ip address %s"ZT_EOL_S, ip->toString().c_str());
}
assignedIps.swap(newAssignedIps);
} else {
return -999; // tap init failed
}
break;
- case ZT1_VIRTUAL_NETWORK_CONFIG_OPERATION_DOWN:
- case ZT1_VIRTUAL_NETWORK_CONFIG_OPERATION_DESTROY:
+ case ZT_VIRTUAL_NETWORK_CONFIG_OPERATION_DOWN:
+ case ZT_VIRTUAL_NETWORK_CONFIG_OPERATION_DESTROY:
if (t != _taps.end()) {
#ifdef __WINDOWS__
std::string winInstanceId(t->second->instanceId());
@@ -992,7 +1170,7 @@ public:
_taps.erase(t);
_tapAssignedIps.erase(nwid);
#ifdef __WINDOWS__
- if ((op == ZT1_VIRTUAL_NETWORK_CONFIG_OPERATION_DESTROY)&&(winInstanceId.length() > 0))
+ if ((op == ZT_VIRTUAL_NETWORK_CONFIG_OPERATION_DESTROY)&&(winInstanceId.length() > 0))
WindowsEthernetTap::deletePersistentTapDevice(winInstanceId.c_str());
#endif
}
@@ -1001,17 +1179,17 @@ public:
return 0;
}
- inline void nodeEventCallback(enum ZT1_Event event,const void *metaData)
+ inline void nodeEventCallback(enum ZT_Event event,const void *metaData)
{
switch(event) {
- case ZT1_EVENT_FATAL_ERROR_IDENTITY_COLLISION: {
+ case ZT_EVENT_FATAL_ERROR_IDENTITY_COLLISION: {
Mutex::Lock _l(_termReason_m);
_termReason = ONE_IDENTITY_COLLISION;
_fatalErrorMessage = "identity/address collision";
this->terminate();
} break;
- case ZT1_EVENT_TRACE: {
+ case ZT_EVENT_TRACE: {
if (metaData) {
::fprintf(stderr,"%s"ZT_EOL_S,(const char *)metaData);
::fflush(stderr);
@@ -1077,33 +1255,59 @@ public:
}
}
- inline int nodeWirePacketSendFunction(const struct sockaddr_storage *addr,const void *data,unsigned int len)
+ inline int nodeWirePacketSendFunction(const struct sockaddr_storage *localAddr,const struct sockaddr_storage *addr,const void *data,unsigned int len,unsigned int ttl)
{
+#ifdef ZT_USE_MINIUPNPC
+ if ((localAddr->ss_family == AF_INET)&&(reinterpret_cast<const struct sockaddr_in *>(localAddr)->sin_port == reinterpret_cast<const struct sockaddr_in *>(&_v4UpnpLocalAddress)->sin_port)) {
+#ifdef ZT_BREAK_UDP
+ if (!OSUtils::fileExists("/tmp/ZT_BREAK_UDP")) {
+#endif
+ if (addr->ss_family == AF_INET) {
+ if (ttl)
+ _phy.setIp4UdpTtl(_v4UpnpUdpSocket,ttl);
+ const int result = ((_phy.udpSend(_v4UpnpUdpSocket,(const struct sockaddr *)addr,data,len) != 0) ? 0 : -1);
+ if (ttl)
+ _phy.setIp4UdpTtl(_v4UpnpUdpSocket,255);
+ return result;
+ } else {
+ return -1;
+ }
+#ifdef ZT_BREAK_UDP
+ }
+#endif
+ }
+#endif // ZT_USE_MINIUPNPC
+
int result = -1;
switch(addr->ss_family) {
case AF_INET:
#ifdef ZT_BREAK_UDP
if (!OSUtils::fileExists("/tmp/ZT_BREAK_UDP")) {
#endif
- if (_v4UdpSocket)
- result = ((_phy.udpSend(_v4UdpSocket,(const struct sockaddr *)addr,data,len) != 0) ? 0 : -1);
+ if (_v4UdpSocket) {
+ if (ttl)
+ _phy.setIp4UdpTtl(_v4UdpSocket,ttl);
+ result = ((_phy.udpSend(_v4UdpSocket,(const struct sockaddr *)addr,data,len) != 0) ? 0 : -1);
+ if (ttl)
+ _phy.setIp4UdpTtl(_v4UdpSocket,255);
+ }
#ifdef ZT_BREAK_UDP
}
#endif
-#ifdef ZT1_TCP_FALLBACK_RELAY
+#ifdef ZT_TCP_FALLBACK_RELAY
// TCP fallback tunnel support
if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(addr)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) {
uint64_t now = OSUtils::now();
// Engage TCP tunnel fallback if we haven't received anything valid from a global
- // IP address in ZT1_TCP_FALLBACK_AFTER milliseconds. If we do start getting
+ // IP address in ZT_TCP_FALLBACK_AFTER milliseconds. If we do start getting
// valid direct traffic we'll stop using it and close the socket after a while.
- if (((now - _lastDirectReceiveFromGlobal) > ZT1_TCP_FALLBACK_AFTER)&&((now - _lastRestart) > ZT1_TCP_FALLBACK_AFTER)) {
+ if (((now - _lastDirectReceiveFromGlobal) > ZT_TCP_FALLBACK_AFTER)&&((now - _lastRestart) > ZT_TCP_FALLBACK_AFTER)) {
if (_tcpFallbackTunnel) {
Mutex::Lock _l(_tcpFallbackTunnel->writeBuf_m);
if (!_tcpFallbackTunnel->writeBuf.length())
- _phy.tcpSetNotifyWritable(_tcpFallbackTunnel->sock,true);
+ _phy.setNotifyWritable(_tcpFallbackTunnel->sock,true);
unsigned long mlen = len + 7;
_tcpFallbackTunnel->writeBuf.push_back((char)0x17);
_tcpFallbackTunnel->writeBuf.push_back((char)0x03);
@@ -1115,7 +1319,7 @@ public:
_tcpFallbackTunnel->writeBuf.append(reinterpret_cast<const char *>(reinterpret_cast<const void *>(&(reinterpret_cast<const struct sockaddr_in *>(addr)->sin_port))),2);
_tcpFallbackTunnel->writeBuf.append((const char *)data,len);
result = 0;
- } else if (((now - _lastSendToGlobal) < ZT1_TCP_FALLBACK_AFTER)&&((now - _lastSendToGlobal) > (ZT_PING_CHECK_INVERVAL / 2))) {
+ } else if (((now - _lastSendToGlobal) < ZT_TCP_FALLBACK_AFTER)&&((now - _lastSendToGlobal) > (ZT_PING_CHECK_INVERVAL / 2))) {
std::vector<InetAddress> tunnelIps(_tcpFallbackResolver.get());
if (tunnelIps.empty()) {
if (!_tcpFallbackResolver.running())
@@ -1123,7 +1327,7 @@ public:
} else {
bool connected = false;
InetAddress addr(tunnelIps[(unsigned long)now % tunnelIps.size()]);
- addr.setPort(ZT1_TCP_FALLBACK_RELAY_PORT);
+ addr.setPort(ZT_TCP_FALLBACK_RELAY_PORT);
_phy.tcpConnect(reinterpret_cast<const struct sockaddr *>(&addr),connected);
}
}
@@ -1131,9 +1335,10 @@ public:
_lastSendToGlobal = now;
}
-#endif // ZT1_TCP_FALLBACK_RELAY
+#endif // ZT_TCP_FALLBACK_RELAY
break;
+
case AF_INET6:
#ifdef ZT_BREAK_UDP
if (!OSUtils::fileExists("/tmp/ZT_BREAK_UDP")) {
@@ -1144,6 +1349,7 @@ public:
}
#endif
break;
+
default:
return -1;
}
@@ -1206,7 +1412,7 @@ public:
tc->writeBuf.append(data);
}
- _phy.tcpSetNotifyWritable(tc->sock,true);
+ _phy.setNotifyWritable(tc->sock,true);
}
inline void onHttpResponseFromClient(TcpConnection *tc)
@@ -1215,7 +1421,6 @@ public:
_phy.close(tc->sock); // will call close handler, which deletes from _tcpConnections
}
-private:
std::string _dataStorePrepPath(const char *name) const
{
std::string p(_homePath);
@@ -1236,11 +1441,11 @@ private:
const std::string _homePath;
BackgroundResolver _tcpFallbackResolver;
#ifdef ZT_ENABLE_NETWORK_CONTROLLER
- SqliteNetworkController _controller;
+ SqliteNetworkController *_controller;
#endif
Phy<OneServiceImpl *> _phy;
- std::string _overrideRootTopology;
Node *_node;
+ InetAddress _v4LocalAddress,_v6LocalAddress;
PhySocket *_v4UdpSocket;
PhySocket *_v6UdpSocket;
PhySocket *_v4TcpListenSocket;
@@ -1265,26 +1470,50 @@ private:
unsigned int _port;
#ifdef ZT_USE_MINIUPNPC
- UPNPClient _upnpClient;
+ InetAddress _v4UpnpLocalAddress;
+ PhySocket *_v4UpnpUdpSocket;
+ UPNPClient *_upnpClient;
+#endif
+
+#ifdef ZT_ENABLE_CLUSTER
+ PhySocket *_clusterMessageSocket;
+ ClusterGeoIpService *_clusterGeoIpService;
+ ClusterDefinition *_clusterDefinition;
+ unsigned int _clusterMemberId;
#endif
bool _run;
Mutex _run_m;
};
-static int SnodeVirtualNetworkConfigFunction(ZT1_Node *node,void *uptr,uint64_t nwid,enum ZT1_VirtualNetworkConfigOperation op,const ZT1_VirtualNetworkConfig *nwconf)
+static int SnodeVirtualNetworkConfigFunction(ZT_Node *node,void *uptr,uint64_t nwid,enum ZT_VirtualNetworkConfigOperation op,const ZT_VirtualNetworkConfig *nwconf)
{ return reinterpret_cast<OneServiceImpl *>(uptr)->nodeVirtualNetworkConfigFunction(nwid,op,nwconf); }
-static void SnodeEventCallback(ZT1_Node *node,void *uptr,enum ZT1_Event event,const void *metaData)
+static void SnodeEventCallback(ZT_Node *node,void *uptr,enum ZT_Event event,const void *metaData)
{ reinterpret_cast<OneServiceImpl *>(uptr)->nodeEventCallback(event,metaData); }
-static long SnodeDataStoreGetFunction(ZT1_Node *node,void *uptr,const char *name,void *buf,unsigned long bufSize,unsigned long readIndex,unsigned long *totalSize)
+static long SnodeDataStoreGetFunction(ZT_Node *node,void *uptr,const char *name,void *buf,unsigned long bufSize,unsigned long readIndex,unsigned long *totalSize)
{ return reinterpret_cast<OneServiceImpl *>(uptr)->nodeDataStoreGetFunction(name,buf,bufSize,readIndex,totalSize); }
-static int SnodeDataStorePutFunction(ZT1_Node *node,void *uptr,const char *name,const void *data,unsigned long len,int secure)
+static int SnodeDataStorePutFunction(ZT_Node *node,void *uptr,const char *name,const void *data,unsigned long len,int secure)
{ return reinterpret_cast<OneServiceImpl *>(uptr)->nodeDataStorePutFunction(name,data,len,secure); }
-static int SnodeWirePacketSendFunction(ZT1_Node *node,void *uptr,const struct sockaddr_storage *addr,const void *data,unsigned int len)
-{ return reinterpret_cast<OneServiceImpl *>(uptr)->nodeWirePacketSendFunction(addr,data,len); }
-static void SnodeVirtualNetworkFrameFunction(ZT1_Node *node,void *uptr,uint64_t nwid,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len)
+static int SnodeWirePacketSendFunction(ZT_Node *node,void *uptr,const struct sockaddr_storage *localAddr,const struct sockaddr_storage *addr,const void *data,unsigned int len,unsigned int ttl)
+{ return reinterpret_cast<OneServiceImpl *>(uptr)->nodeWirePacketSendFunction(localAddr,addr,data,len,ttl); }
+static void SnodeVirtualNetworkFrameFunction(ZT_Node *node,void *uptr,uint64_t nwid,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len)
{ reinterpret_cast<OneServiceImpl *>(uptr)->nodeVirtualNetworkFrameFunction(nwid,sourceMac,destMac,etherType,vlanId,data,len); }
+#ifdef ZT_ENABLE_CLUSTER
+static void SclusterSendFunction(void *uptr,unsigned int toMemberId,const void *data,unsigned int len)
+{
+ OneServiceImpl *const impl = reinterpret_cast<OneServiceImpl *>(uptr);
+ const ClusterDefinition::MemberDefinition &md = (*(impl->_clusterDefinition))[toMemberId];
+ if (md.clusterEndpoint)
+ impl->_phy.udpSend(impl->_clusterMessageSocket,reinterpret_cast<const struct sockaddr *>(&(md.clusterEndpoint)),data,len);
+}
+static int SclusterGeoIpFunction(void *uptr,const struct sockaddr_storage *addr,int *x,int *y,int *z)
+{
+ OneServiceImpl *const impl = reinterpret_cast<OneServiceImpl *>(uptr);
+ return (int)(impl->_clusterGeoIpService->locate(*(reinterpret_cast<const InetAddress *>(addr)),*x,*y,*z));
+}
+#endif
+
static void StapFrameHandler(void *uptr,uint64_t nwid,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len)
{ reinterpret_cast<OneServiceImpl *>(uptr)->tapFrameHandler(nwid,from,to,etherType,vlanId,data,len); }
@@ -1433,7 +1662,7 @@ std::string OneService::autoUpdateUrl()
return std::string();
}
-OneService *OneService::newInstance(const char *hp,unsigned int port,const char *overrideRootTopology) { return new OneServiceImpl(hp,port,overrideRootTopology); }
+OneService *OneService::newInstance(const char *hp,unsigned int port) { return new OneServiceImpl(hp,port); }
OneService::~OneService() {}
} // namespace ZeroTier