summaryrefslogtreecommitdiff
path: root/service/OneService.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'service/OneService.cpp')
-rw-r--r--service/OneService.cpp1300
1 files changed, 819 insertions, 481 deletions
diff --git a/service/OneService.cpp b/service/OneService.cpp
index b96f3aed..0fadc191 100644
--- a/service/OneService.cpp
+++ b/service/OneService.cpp
@@ -31,7 +31,6 @@
#include <string>
#include <map>
-#include <set>
#include <vector>
#include <algorithm>
#include <list>
@@ -47,6 +46,9 @@
#include "../node/MAC.hpp"
#include "../node/Identity.hpp"
#include "../node/World.hpp"
+#include "../node/Salsa20.hpp"
+#include "../node/Poly1305.hpp"
+#include "../node/SHA512.hpp"
#include "../osdep/Phy.hpp"
#include "../osdep/Thread.hpp"
@@ -85,16 +87,6 @@
using json = nlohmann::json;
-/**
- * Uncomment to enable UDP breakage switch
- *
- * If this is defined, the presence of a file called /tmp/ZT_BREAK_UDP
- * will cause direct UDP TX/RX to stop working. This can be used to
- * test TCP tunneling fallback and other robustness features. Deleting
- * this file will cause it to start working again.
- */
-//#define ZT_BREAK_UDP
-
#include "../controller/EmbeddedNetworkController.hpp"
#ifdef ZT_USE_TEST_TAP
@@ -105,11 +97,13 @@ namespace ZeroTier { typedef TestEthernetTap EthernetTap; }
#else
#ifdef ZT_SDK
- #include "../controller/EmbeddedNetworkController.hpp"
- #include "../node/Node.hpp"
- // Use the virtual netcon endpoint instead of a tun/tap port driver
- #include "../src/SocketTap.hpp"
- namespace ZeroTier { typedef SocketTap EthernetTap; }
+
+#include "../controller/EmbeddedNetworkController.hpp"
+#include "../node/Node.hpp"
+// Use the virtual netcon endpoint instead of a tun/tap port driver
+#include "../src/SocketTap.hpp"
+namespace ZeroTier { typedef SocketTap EthernetTap; }
+
#else
#ifdef __APPLE__
@@ -148,9 +142,6 @@ namespace ZeroTier { typedef BSDEthernetTap EthernetTap; }
// How often to check for new multicast subscriptions on a tap device
#define ZT_TAP_CHECK_MULTICAST_INTERVAL 5000
-// Path under ZT1 home for controller database if controller is enabled
-#define ZT_CONTROLLER_DB_PATH "controller.d"
-
// TCP fallback relay (run by ZeroTier, Inc. -- this will eventually go away)
#define ZT_TCP_FALLBACK_RELAY "204.80.128.1/443"
@@ -166,10 +157,22 @@ namespace ZeroTier { typedef BSDEthernetTap EthernetTap; }
// Clean files from iddb.d that are older than this (60 days)
#define ZT_IDDB_CLEANUP_AGE 5184000000ULL
+// Maximum write buffer size for outgoing TCP connections (sanity limit)
+#define ZT_TCP_MAX_WRITEQ_SIZE 33554432
+
+// How often to check TCP connections and cluster links and send status to cluster peers
+#define ZT_TCP_CHECK_PERIOD 15000
+
+// TCP activity timeout
+#define ZT_TCP_ACTIVITY_TIMEOUT 60000
+
namespace ZeroTier {
namespace {
+// Fake TLS hello for TCP tunnel outgoing connections (TUNNELED mode)
+static const char ZT_TCP_TUNNEL_HELLO[9] = { 0x17,0x03,0x03,0x00,0x04,(char)ZEROTIER_ONE_VERSION_MAJOR,(char)ZEROTIER_ONE_VERSION_MINOR,(char)((ZEROTIER_ONE_VERSION_REVISION >> 8) & 0xff),(char)(ZEROTIER_ONE_VERSION_REVISION & 0xff) };
+
static std::string _trimString(const std::string &s)
{
unsigned long end = (unsigned long)s.length();
@@ -309,18 +312,12 @@ class OneServiceImpl;
static int SnodeVirtualNetworkConfigFunction(ZT_Node *node,void *uptr,void *tptr,uint64_t nwid,void **nuptr,enum ZT_VirtualNetworkConfigOperation op,const ZT_VirtualNetworkConfig *nwconf);
static void SnodeEventCallback(ZT_Node *node,void *uptr,void *tptr,enum ZT_Event event,const void *metaData);
-static long SnodeDataStoreGetFunction(ZT_Node *node,void *uptr,void *tptr,const char *name,void *buf,unsigned long bufSize,unsigned long readIndex,unsigned long *totalSize);
-static int SnodeDataStorePutFunction(ZT_Node *node,void *uptr,void *tptr,const char *name,const void *data,unsigned long len,int secure);
+static void SnodeStatePutFunction(ZT_Node *node,void *uptr,void *tptr,enum ZT_StateObjectType type,uint64_t id,const void *data,int len);
+static int SnodeStateGetFunction(ZT_Node *node,void *uptr,void *tptr,enum ZT_StateObjectType type,uint64_t id,void *data,unsigned int maxlen);
static int SnodeWirePacketSendFunction(ZT_Node *node,void *uptr,void *tptr,const struct sockaddr_storage *localAddr,const struct sockaddr_storage *addr,const void *data,unsigned int len,unsigned int ttl);
static void SnodeVirtualNetworkFrameFunction(ZT_Node *node,void *uptr,void *tptr,uint64_t nwid,void **nuptr,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len);
static int SnodePathCheckFunction(ZT_Node *node,void *uptr,void *tptr,uint64_t ztaddr,const struct sockaddr_storage *localAddr,const struct sockaddr_storage *remoteAddr);
static int SnodePathLookupFunction(ZT_Node *node,void *uptr,void *tptr,uint64_t ztaddr,int family,struct sockaddr_storage *result);
-
-#ifdef ZT_ENABLE_CLUSTER
-static void SclusterSendFunction(void *uptr,unsigned int toMemberId,const void *data,unsigned int len);
-static int SclusterGeoIpFunction(void *uptr,const struct sockaddr_storage *addr,int *x,int *y,int *z);
-#endif
-
static void StapFrameHandler(void *uptr,void *tptr,uint64_t nwid,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len);
static int ShttpOnMessageBegin(http_parser *parser);
@@ -359,36 +356,55 @@ static const struct http_parser_settings HTTP_PARSER_SETTINGS = {
};
#endif
+/**
+ * A TCP connection and related state and buffers
+ */
struct TcpConnection
{
enum {
+ TCP_UNCATEGORIZED_INCOMING, // uncategorized incoming connection
TCP_HTTP_INCOMING,
- TCP_HTTP_OUTGOING, // not currently used
- TCP_TUNNEL_OUTGOING // fale-SSL outgoing tunnel -- HTTP-related fields are not used
+ TCP_HTTP_OUTGOING,
+ TCP_TUNNEL_OUTGOING, // TUNNELED mode proxy outbound connection
+ TCP_CLUSTER_BACKPLANE
} type;
- bool shouldKeepAlive;
OneServiceImpl *parent;
PhySocket *sock;
- InetAddress from;
+ InetAddress remoteAddr;
+ unsigned long lastReceive;
+
+ // Used for inbound HTTP connections
http_parser parser;
unsigned long messageSize;
- uint64_t lastActivity;
-
std::string currentHeaderField;
std::string currentHeaderValue;
-
std::string url;
std::string status;
std::map< std::string,std::string > headers;
- std::string body;
- std::string writeBuf;
- Mutex writeBuf_m;
+ // Used for cluster backplane connections
+ uint64_t clusterMemberId;
+ unsigned int clusterMemberVersionMajor;
+ unsigned int clusterMemberVersionMinor;
+ unsigned int clusterMemberVersionRev;
+ std::vector< InetAddress > clusterMemberLocalAddresses;
+ Mutex clusterMemberLocalAddresses_m;
+
+ std::string readq;
+ std::string writeq;
+ Mutex writeq_m;
};
-// Used to pseudo-randomize local source port picking
-static volatile unsigned int _udpPortPickerCounter = 0;
+/**
+ * Message types for cluster backplane communication
+ */
+enum ClusterMessageType
+{
+ CLUSTER_MESSAGE_STATUS = 0,
+ CLUSTER_MESSAGE_STATE_OBJECT = 1,
+ CLUSTER_MESSAGE_PROXY_SEND = 2
+};
class OneServiceImpl : public OneService
{
@@ -398,14 +414,21 @@ public:
const std::string _homePath;
std::string _authToken;
std::string _controllerDbPath;
+ const std::string _iddbPath;
+ const std::string _networksPath;
+ const std::string _moonsPath;
+
EmbeddedNetworkController *_controller;
Phy<OneServiceImpl *> _phy;
Node *_node;
SoftwareUpdater *_updater;
bool _updateAutoApply;
unsigned int _primaryPort;
+ volatile unsigned int _udpPortPickerCounter;
+ uint64_t _clusterMemberId;
+ uint8_t _clusterKey[32]; // secret key for cluster backplane config
- // Local configuration and memo-ized static path definitions
+ // Local configuration and memo-ized information from it
json _localConfig;
Hashtable< uint64_t,std::vector<InetAddress> > _v4Hints;
Hashtable< uint64_t,std::vector<InetAddress> > _v6Hints;
@@ -415,6 +438,7 @@ public:
std::vector< InetAddress > _globalV6Blacklist;
std::vector< InetAddress > _allowManagementFrom;
std::vector< std::string > _interfacePrefixBlacklist;
+ std::vector< InetAddress > _clusterBackplaneAddresses;
Mutex _localConfig_m;
/*
@@ -428,13 +452,8 @@ public:
* destructively with uPnP port mapping behavior in very weird buggy ways.
* It's only used if uPnP/NAT-PMP is enabled in this build.
*/
- Binder _bindings[3];
unsigned int _ports[3];
- uint16_t _portsBE[3]; // ports in big-endian network byte order as in sockaddr
-
- // Sockets for JSON API -- bound only to V4 and V6 localhost
- PhySocket *_v4TcpControlSocket;
- PhySocket *_v6TcpControlSocket;
+ Binder _binder;
// Time we last received a packet from a global address
uint64_t _lastDirectReceiveFromGlobal;
@@ -470,7 +489,8 @@ public:
Mutex _nets_m;
// Active TCP/IP connections
- std::set< TcpConnection * > _tcpConnections; // no mutex for this since it's done in the main loop thread only
+ std::vector< TcpConnection * > _tcpConnections;
+ Mutex _tcpConnections_m;
TcpConnection *_tcpFallbackTunnel;
// Termination status information
@@ -484,13 +504,6 @@ public:
PortMapper *_portMapper;
#endif
- // Cluster management instance if enabled
-#ifdef ZT_ENABLE_CLUSTER
- PhySocket *_clusterMessageSocket;
- ClusterDefinition *_clusterDefinition;
- unsigned int _clusterMemberId;
-#endif
-
// Set to false to force service to stop
volatile bool _run;
Mutex _run_m;
@@ -499,15 +512,18 @@ public:
OneServiceImpl(const char *hp,unsigned int port) :
_homePath((hp) ? hp : ".")
- ,_controllerDbPath(_homePath + ZT_PATH_SEPARATOR_S ZT_CONTROLLER_DB_PATH)
+ ,_controllerDbPath(_homePath + ZT_PATH_SEPARATOR_S "controller.d")
+ ,_iddbPath(_homePath + ZT_PATH_SEPARATOR_S "iddb.d")
+ ,_networksPath(_homePath + ZT_PATH_SEPARATOR_S "networks.d")
+ ,_moonsPath(_homePath + ZT_PATH_SEPARATOR_S "moons.d")
,_controller((EmbeddedNetworkController *)0)
,_phy(this,false,true)
,_node((Node *)0)
,_updater((SoftwareUpdater *)0)
,_updateAutoApply(false)
,_primaryPort(port)
- ,_v4TcpControlSocket((PhySocket *)0)
- ,_v6TcpControlSocket((PhySocket *)0)
+ ,_udpPortPickerCounter(0)
+ ,_clusterMemberId(0)
,_lastDirectReceiveFromGlobal(0)
#ifdef ZT_TCP_FALLBACK_RELAY
,_lastSendToGlobalV4(0)
@@ -520,11 +536,6 @@ public:
#ifdef ZT_USE_MINIUPNPC
,_portMapper((PortMapper *)0)
#endif
-#ifdef ZT_ENABLE_CLUSTER
- ,_clusterMessageSocket((PhySocket *)0)
- ,_clusterDefinition((ClusterDefinition *)0)
- ,_clusterMemberId(0)
-#endif
,_run(true)
{
_ports[0] = 0;
@@ -534,23 +545,11 @@ public:
virtual ~OneServiceImpl()
{
- for(int i=0;i<3;++i)
- _bindings[i].closeAll(_phy);
-
- _phy.close(_v4TcpControlSocket);
- _phy.close(_v6TcpControlSocket);
-
-#ifdef ZT_ENABLE_CLUSTER
- _phy.close(_clusterMessageSocket);
-#endif
-
+ _binder.closeAll(_phy);
#ifdef ZT_USE_MINIUPNPC
delete _portMapper;
#endif
delete _controller;
-#ifdef ZT_ENABLE_CLUSTER
- delete _clusterDefinition;
-#endif
}
virtual ReasonForTermination run()
@@ -576,15 +575,11 @@ public:
_authToken = _trimString(_authToken);
}
- // Clean up any legacy files if present
- OSUtils::rm((_homePath + ZT_PATH_SEPARATOR_S "peers.save").c_str());
- OSUtils::rm((_homePath + ZT_PATH_SEPARATOR_S "world").c_str());
-
{
struct ZT_Node_Callbacks cb;
cb.version = 0;
- cb.dataStoreGetFunction = SnodeDataStoreGetFunction;
- cb.dataStorePutFunction = SnodeDataStorePutFunction;
+ cb.stateGetFunction = SnodeStateGetFunction;
+ cb.statePutFunction = SnodeStatePutFunction;
cb.wirePacketSendFunction = SnodeWirePacketSendFunction;
cb.virtualNetworkFrameFunction = SnodeVirtualNetworkFrameFunction;
cb.virtualNetworkConfigFunction = SnodeVirtualNetworkConfigFunction;
@@ -600,9 +595,10 @@ public:
InetAddress trustedPathNetworks[ZT_MAX_TRUSTED_PATHS];
unsigned int trustedPathCount = 0;
- // Old style "trustedpaths" flat file -- will eventually go away
+ // LEGACY: support old "trustedpaths" flat file
FILE *trustpaths = fopen((_homePath + ZT_PATH_SEPARATOR_S "trustedpaths").c_str(),"r");
if (trustpaths) {
+ fprintf(stderr,"WARNING: 'trustedpaths' flat file format is deprecated in favor of path definitions in local.conf" ZT_EOL_S);
char buf[1024];
while ((fgets(buf,sizeof(buf),trustpaths))&&(trustedPathCount < ZT_MAX_TRUSTED_PATHS)) {
int fno = 0;
@@ -664,9 +660,11 @@ public:
if (trustedPathCount)
_node->setTrustedPaths(reinterpret_cast<const struct sockaddr_storage *>(trustedPathNetworks),trustedPathIds,trustedPathCount);
}
+
+ // Apply other runtime configuration from local.conf
applyLocalConfig();
- // Bind TCP control socket
+ // Make sure we can use the primary port, and hunt for one if configured to do so
const int portTrials = (_primaryPort == 0) ? 256 : 1; // if port is 0, pick random
for(int k=0;k<portTrials;++k) {
if (_primaryPort == 0) {
@@ -674,35 +672,8 @@ public:
Utils::getSecureRandom(&randp,sizeof(randp));
_primaryPort = 20000 + (randp % 45500);
}
-
if (_trialBind(_primaryPort)) {
- struct sockaddr_in in4;
- memset(&in4,0,sizeof(in4));
- in4.sin_family = AF_INET;
- in4.sin_addr.s_addr = Utils::hton((uint32_t)((_allowManagementFrom.size() > 0) ? 0 : 0x7f000001)); // right now we just listen for TCP @127.0.0.1
- in4.sin_port = Utils::hton((uint16_t)_primaryPort);
- _v4TcpControlSocket = _phy.tcpListen((const struct sockaddr *)&in4,this);
-
- struct sockaddr_in6 in6;
- memset((void *)&in6,0,sizeof(in6));
- in6.sin6_family = AF_INET6;
- in6.sin6_port = in4.sin_port;
- if (_allowManagementFrom.size() == 0)
- in6.sin6_addr.s6_addr[15] = 1; // IPv6 localhost == ::1
- _v6TcpControlSocket = _phy.tcpListen((const struct sockaddr *)&in6,this);
-
- // We must bind one of IPv4 or IPv6 -- support either failing to support hosts that
- // have only IPv4 or only IPv6 stacks.
- if ((_v4TcpControlSocket)||(_v6TcpControlSocket)) {
- _ports[0] = _primaryPort;
- break;
- } else {
- if (_v4TcpControlSocket)
- _phy.close(_v4TcpControlSocket,false);
- if (_v6TcpControlSocket)
- _phy.close(_v6TcpControlSocket,false);
- _primaryPort = 0;
- }
+ _ports[0] = _primaryPort;
} else {
_primaryPort = 0;
}
@@ -714,7 +685,7 @@ public:
return _termReason;
}
- // Write file containing primary port to be read by CLIs, etc.
+ // Save primary port to a file so CLIs and GUIs can learn it easily
char portstr[64];
Utils::snprintf(portstr,sizeof(portstr),"%u",_ports[0]);
OSUtils::writeFile((_homePath + ZT_PATH_SEPARATOR_S "zerotier-one.port").c_str(),std::string(portstr));
@@ -722,7 +693,7 @@ public:
// Attempt to bind to a secondary port chosen from our ZeroTier address.
// This exists because there are buggy NATs out there that fail if more
// than one device behind the same NAT tries to use the same internal
- // private address port number.
+ // private address port number. Buggy NATs are a running theme.
_ports[1] = 20000 + ((unsigned int)_node->address() % 45500);
for(int i=0;;++i) {
if (i > 1000) {
@@ -761,13 +732,11 @@ public:
}
#endif
- // Populate ports in big-endian format for quick compare
- for(int i=0;i<3;++i)
- _portsBE[i] = Utils::hton((uint16_t)_ports[i]);
-
+ // Network controller is now enabled by default for desktop and server
_controller = new EmbeddedNetworkController(_node,_controllerDbPath.c_str());
_node->setNetconfMaster((void *)_controller);
+/*
#ifdef ZT_ENABLE_CLUSTER
if (OSUtils::fileExists((_homePath + ZT_PATH_SEPARATOR_S "cluster").c_str())) {
_clusterDefinition = new ClusterDefinition(_node->address(),(_homePath + ZT_PATH_SEPARATOR_S "cluster").c_str());
@@ -816,8 +785,10 @@ public:
}
}
#endif
+*/
- { // Load existing networks
+ // Join existing networks in networks.d
+ {
std::vector<std::string> networksDotD(OSUtils::listDirectory((_homePath + ZT_PATH_SEPARATOR_S "networks.d").c_str()));
for(std::vector<std::string>::iterator f(networksDotD.begin());f!=networksDotD.end();++f) {
std::size_t dot = f->find_last_of('.');
@@ -825,7 +796,9 @@ public:
_node->join(Utils::hexStrToU64(f->substr(0,dot).c_str()),(void *)0,(void *)0);
}
}
- { // Load existing moons
+
+ // Orbit existing moons in moons.d
+ {
std::vector<std::string> moonsDotD(OSUtils::listDirectory((_homePath + ZT_PATH_SEPARATOR_S "moons.d").c_str()));
for(std::vector<std::string>::iterator f(moonsDotD.begin());f!=moonsDotD.end();++f) {
std::size_t dot = f->find_last_of('.');
@@ -834,6 +807,16 @@ public:
}
}
+ // Derive the cluster's shared secret backplane encryption key by hashing its shared secret identity
+ {
+ uint8_t tmp[64];
+ SHA512::hash(tmp,_node->identity().privateKeyPair().priv.data,ZT_C25519_PRIVATE_KEY_LEN);
+ memcpy(_clusterKey,tmp,32);
+ }
+ _clusterMemberId = _node->prng();
+ if (!_clusterMemberId) _clusterMemberId = 1;
+
+ // Main I/O loop
_nextBackgroundTaskDeadline = 0;
uint64_t clockShouldBe = OSUtils::now();
_lastRestart = clockShouldBe;
@@ -842,6 +825,7 @@ public:
uint64_t lastUpdateCheck = clockShouldBe;
uint64_t lastLocalInterfaceAddressCheck = (clockShouldBe - ZT_LOCAL_INTERFACE_CHECK_INTERVAL) + 15000; // do this in 15s to give portmapper time to configure and other things time to settle
uint64_t lastCleanedIddb = 0;
+ uint64_t lastTcpCheck = 0;
for(;;) {
_run_m.lock();
if (!_run) {
@@ -859,7 +843,7 @@ public:
// Clean iddb.d on start and every 24 hours
if ((now - lastCleanedIddb) > 86400000) {
lastCleanedIddb = now;
- OSUtils::cleanDirectory((_homePath + ZT_PATH_SEPARATOR_S "iddb.d").c_str(),now - ZT_IDDB_CLEANUP_AGE);
+ OSUtils::cleanDirectory(_iddbPath.c_str(),now - ZT_IDDB_CLEANUP_AGE);
}
// Attempt to detect sleep/wake events by detecting delay overruns
@@ -879,11 +863,13 @@ public:
// Refresh bindings in case device's interfaces have changed, and also sync routes to update any shadow routes (e.g. shadow default)
if (((now - lastBindRefresh) >= ZT_BINDER_REFRESH_PERIOD)||(restarted)) {
lastBindRefresh = now;
+ unsigned int p[3];
+ unsigned int pc = 0;
for(int i=0;i<3;++i) {
- if (_ports[i]) {
- _bindings[i].refresh(_phy,_ports[i],*this);
- }
+ if (_ports[i])
+ p[pc++] = _ports[i];
}
+ _binder.refresh(_phy,p,pc,*this);
{
Mutex::Lock _l(_nets_m);
for(std::map<uint64_t,NetworkState>::iterator n(_nets.begin());n!=_nets.end();++n) {
@@ -893,15 +879,18 @@ public:
}
}
+ // Run background task processor in core if it's time to do so
uint64_t dl = _nextBackgroundTaskDeadline;
if (dl <= now) {
_node->processBackgroundTasks((void *)0,now,&_nextBackgroundTaskDeadline);
dl = _nextBackgroundTaskDeadline;
}
+ // Close TCP fallback tunnel if we have direct UDP
if ((_tcpFallbackTunnel)&&((now - _lastDirectReceiveFromGlobal) < (ZT_TCP_FALLBACK_AFTER / 2)))
_phy.close(_tcpFallbackTunnel->sock);
+ // Sync multicast group memberships
if ((now - lastTapMulticastGroupCheck) >= ZT_TAP_CHECK_MULTICAST_INTERVAL) {
lastTapMulticastGroupCheck = now;
Mutex::Lock _l(_nets_m);
@@ -917,6 +906,7 @@ public:
}
}
+ // Sync information about physical network interfaces
if ((now - lastLocalInterfaceAddressCheck) >= ZT_LOCAL_INTERFACE_CHECK_INTERVAL) {
lastLocalInterfaceAddressCheck = now;
@@ -930,19 +920,65 @@ public:
}
#endif
- std::vector<InetAddress> boundAddrs(_bindings[0].allBoundLocalInterfaceAddresses());
+ std::vector<InetAddress> boundAddrs(_binder.allBoundLocalInterfaceAddresses());
for(std::vector<InetAddress>::const_iterator i(boundAddrs.begin());i!=boundAddrs.end();++i)
_node->addLocalInterfaceAddress(reinterpret_cast<const struct sockaddr_storage *>(&(*i)));
}
+ // Check TCP connections and cluster links
+ if ((now - lastTcpCheck) >= ZT_TCP_CHECK_PERIOD) {
+ lastTcpCheck = now;
+
+ std::vector<PhySocket *> toClose;
+ std::vector<InetAddress> clusterLinksUp;
+ {
+ Mutex::Lock _l(_tcpConnections_m);
+ for(std::vector<TcpConnection *>::const_iterator c(_tcpConnections.begin());c!=_tcpConnections.end();++c) {
+ TcpConnection *const tc = *c;
+ tc->writeq_m.lock();
+ const unsigned long wql = (unsigned long)tc->writeq.length();
+ tc->writeq_m.unlock();
+ if ((tc->sock)&&((wql > ZT_TCP_MAX_WRITEQ_SIZE)||((now - tc->lastReceive) > ZT_TCP_ACTIVITY_TIMEOUT))) {
+ toClose.push_back(tc->sock);
+ } else if ((tc->type == TcpConnection::TCP_CLUSTER_BACKPLANE)&&(tc->clusterMemberId)) {
+ clusterLinksUp.push_back(tc->remoteAddr);
+ sendMyCurrentClusterState(tc);
+ }
+ }
+ }
+ for(std::vector<PhySocket *>::iterator s(toClose.begin());s!=toClose.end();++s)
+ _phy.close(*s,true);
+
+ {
+ Mutex::Lock _l(_localConfig_m);
+ for(std::vector<InetAddress>::const_iterator ca(_clusterBackplaneAddresses.begin());ca!=_clusterBackplaneAddresses.end();++ca) {
+ if (std::find(clusterLinksUp.begin(),clusterLinksUp.end(),*ca) == clusterLinksUp.end()) {
+ TcpConnection *tc = new TcpConnection();
+ {
+ Mutex::Lock _l(_tcpConnections_m);
+ _tcpConnections.push_back(tc);
+ }
+
+ tc->type = TcpConnection::TCP_CLUSTER_BACKPLANE;
+ tc->remoteAddr = *ca;
+ tc->lastReceive = OSUtils::now();
+ tc->parent = this;
+ tc->sock = (PhySocket *)0; // set in connect handler
+ tc->messageSize = 0;
+
+ tc->clusterMemberId = 0; // not known yet
+
+ bool connected = false;
+ _phy.tcpConnect(reinterpret_cast<const struct sockaddr *>(&(*ca)),connected,(void *)tc,true);
+ }
+ }
+ }
+ }
+
const unsigned long delay = (dl > now) ? (unsigned long)(dl - now) : 100;
clockShouldBe = now + (uint64_t)delay;
_phy.poll(delay);
}
- } catch (std::exception &exc) {
- Mutex::Lock _l(_termReason_m);
- _termReason = ONE_UNRECOVERABLE_ERROR;
- _fatalErrorMessage = exc.what();
} catch ( ... ) {
Mutex::Lock _l(_termReason_m);
_termReason = ONE_UNRECOVERABLE_ERROR;
@@ -950,6 +986,7 @@ public:
}
try {
+ Mutex::Lock _l(_tcpConnections_m);
while (!_tcpConnections.empty())
_phy.close((*_tcpConnections.begin())->sock);
} catch ( ... ) {}
@@ -991,35 +1028,35 @@ public:
}
#ifdef ZT_SDK
- virtual void leave(const char *hp)
- {
- _node->leave(Utils::hexStrToU64(hp),NULL,NULL);
- }
+ virtual void leave(const char *hp)
+ {
+ _node->leave(Utils::hexStrToU64(hp),NULL,NULL);
+ }
virtual void join(const char *hp)
{
_node->join(Utils::hexStrToU64(hp),NULL,NULL);
}
- virtual std::string givenHomePath()
- {
- return _homePath;
- }
+ virtual std::string givenHomePath()
+ {
+ return _homePath;
+ }
- virtual EthernetTap * getTap(uint64_t nwid)
- {
+ virtual EthernetTap * getTap(uint64_t nwid)
+ {
Mutex::Lock _l(_nets_m);
std::map<uint64_t,NetworkState>::const_iterator n(_nets.find(nwid));
if (n == _nets.end())
- return NULL;
+ return NULL;
return n->second.tap;
- }
+ }
- virtual EthernetTap *getTap(InetAddress &addr)
- {
- Mutex::Lock _l(_nets_m);
+ virtual EthernetTap *getTap(InetAddress &addr)
+ {
+ Mutex::Lock _l(_nets_m);
std::map<uint64_t,NetworkState>::iterator it;
- for(it = _nets.begin(); it != _nets.end(); it++) {
+ for(it = _nets.begin(); it != _nets.end(); it++) {
if(it->second.tap) {
for(int j=0; j<it->second.tap->_ips.size(); j++) {
if(it->second.tap->_ips[j].isEqualPrefix(addr) || it->second.tap->_ips[j].ipsEqual(addr) || it->second.tap->_ips[j].containsAddress(addr)) {
@@ -1027,9 +1064,9 @@ public:
}
}
}
- }
- return NULL;
- }
+ }
+ return NULL;
+ }
virtual Node * getNode()
{
@@ -1040,9 +1077,8 @@ public:
{
Mutex::Lock _l(_nets_m);
std::map<uint64_t,NetworkState>::iterator i;
- for(i = _nets.begin(); i != _nets.end(); i++) {
- delete i->second.tap;
- }
+ for(i = _nets.begin(); i != _nets.end(); i++)
+ delete i->second.tap;
}
#endif // ZT_SDK
@@ -1073,8 +1109,8 @@ public:
return false;
n->second.settings = settings;
- char nlcpath[256];
- Utils::snprintf(nlcpath,sizeof(nlcpath),"%s" ZT_PATH_SEPARATOR_S "networks.d" ZT_PATH_SEPARATOR_S "%.16llx.local.conf",_homePath.c_str(),nwid);
+ char nlcpath[4096];
+ Utils::snprintf(nlcpath,sizeof(nlcpath),"%s" ZT_PATH_SEPARATOR_S "%.16llx.local.conf",_networksPath.c_str(),nwid);
FILE *out = fopen(nlcpath,"w");
if (out) {
fprintf(out,"allowManaged=%d\n",(int)n->second.settings.allowManaged);
@@ -1089,7 +1125,9 @@ public:
return true;
}
- // Internal implementation methods -----------------------------------------
+ // =========================================================================
+ // Internal implementation methods for control plane, route setup, etc.
+ // =========================================================================
inline unsigned int handleControlPlaneHttpRequest(
const InetAddress &fromAddress,
@@ -1216,7 +1254,6 @@ public:
settings["portMappingEnabled"] = false; // not supported in build
#endif
#ifndef ZT_SDK
-
settings["softwareUpdate"] = OSUtils::jsonString(settings["softwareUpdate"],ZT_SOFTWARE_UPDATE_DEFAULT);
settings["softwareUpdateChannel"] = OSUtils::jsonString(settings["softwareUpdateChannel"],ZT_SOFTWARE_UPDATE_DEFAULT_CHANNEL);
#endif
@@ -1224,6 +1261,7 @@ public:
res["planetWorldId"] = planet.id();
res["planetWorldTimestamp"] = planet.timestamp();
+/*
#ifdef ZT_ENABLE_CLUSTER
json cj;
ZT_ClusterStatus cs;
@@ -1250,6 +1288,7 @@ public:
#else
res["cluster"] = json();
#endif
+*/
scode = 200;
} else if (ps[0] == "moon") {
@@ -1770,24 +1809,237 @@ public:
}
// =========================================================================
- // Handlers for Node and Phy<> callbacks
+ // Cluster messaging functions
// =========================================================================
- inline void phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *localAddr,const struct sockaddr *from,void *data,unsigned long len)
+ // mlen must be at least 24
+ void encryptClusterMessage(char *data,unsigned int mlen)
{
-#ifdef ZT_ENABLE_CLUSTER
- if (sock == _clusterMessageSocket) {
- _lastDirectReceiveFromGlobal = OSUtils::now();
- _node->clusterHandleIncomingMessage(data,len);
- return;
+ uint8_t key[32];
+ memcpy(key,_clusterKey,32);
+ for(int i=0;i<8;++i) key[i] ^= data[i];
+ Salsa20 s20(key,data + 8);
+
+ uint8_t macKey[32];
+ uint8_t mac[16];
+ memset(macKey,0,32);
+ s20.crypt12(macKey,macKey,32);
+ s20.crypt12(data + 24,data + 24,mlen - 24);
+ Poly1305::compute(mac,data + 24,mlen - 24,macKey);
+ memcpy(data + 16,mac,8);
+ }
+
+ void announceStatusToClusterMember(TcpConnection *tc)
+ {
+ try {
+ Buffer<8194> buf;
+
+ buf.appendRandom(16);
+ buf.addSize(8); // space for MAC
+ buf.append((uint8_t)CLUSTER_MESSAGE_STATUS);
+ buf.append(_clusterMemberId);
+ buf.append((uint16_t)ZEROTIER_ONE_VERSION_MAJOR);
+ buf.append((uint16_t)ZEROTIER_ONE_VERSION_MINOR);
+ buf.append((uint16_t)ZEROTIER_ONE_VERSION_REVISION);
+
+ std::vector<InetAddress> lif(_binder.allBoundLocalInterfaceAddresses());
+ buf.append((uint16_t)lif.size());
+ for(std::vector<InetAddress>::const_iterator i(lif.begin());i!=lif.end();++i)
+ i->serialize(buf);
+
+ Mutex::Lock _l(tc->writeq_m);
+
+ if (tc->writeq.length() == 0)
+ _phy.setNotifyWritable(tc->sock,true);
+
+ const unsigned int mlen = buf.size();
+ tc->writeq.push_back((char)((mlen >> 16) & 0xff));
+ tc->writeq.push_back((char)((mlen >> 8) & 0xff));
+ tc->writeq.push_back((char)(mlen & 0xff));
+
+ char *const data = reinterpret_cast<char *>(buf.unsafeData());
+ encryptClusterMessage(data,mlen);
+ tc->writeq.append(data,mlen);
+ } catch ( ... ) {
+ fprintf(stderr,"WARNING: unexpected exception announcing status to cluster members" ZT_EOL_S);
}
-#endif
+ }
-#ifdef ZT_BREAK_UDP
- if (OSUtils::fileExists("/tmp/ZT_BREAK_UDP"))
- return;
-#endif
+ bool proxySendViaCluster(const InetAddress &fromAddress,const InetAddress &dest,const void *data,unsigned int len,unsigned int ttl)
+ {
+ Mutex::Lock _l(_tcpConnections_m);
+ for(std::vector<TcpConnection *>::const_iterator c(_tcpConnections.begin());c!=_tcpConnections.end();++c) {
+ TcpConnection *const tc = *c;
+ if ((tc->type == TcpConnection::TCP_CLUSTER_BACKPLANE)&&(tc->clusterMemberId)) {
+ Mutex::Lock _l2(tc->clusterMemberLocalAddresses_m);
+ for(std::vector<InetAddress>::const_iterator i(tc->clusterMemberLocalAddresses.begin());i!=tc->clusterMemberLocalAddresses.end();++i) {
+ if (*i == fromAddress) {
+ Buffer<1024> buf;
+
+ buf.appendRandom(16);
+ buf.addSize(8); // space for MAC
+ buf.append((uint8_t)CLUSTER_MESSAGE_PROXY_SEND);
+ buf.append((uint8_t)ttl);
+ dest.serialize(buf);
+ fromAddress.serialize(buf);
+
+ Mutex::Lock _l3(tc->writeq_m);
+
+ if (tc->writeq.length() == 0)
+ _phy.setNotifyWritable(tc->sock,true);
+
+ const unsigned int mlen = buf.size() + len;
+ tc->writeq.push_back((char)((mlen >> 16) & 0xff));
+ tc->writeq.push_back((char)((mlen >> 8) & 0xff));
+ tc->writeq.push_back((char)(mlen & 0xff));
+
+ const unsigned long startpos = (unsigned long)tc->writeq.length();
+ tc->writeq.append(reinterpret_cast<const char *>(buf.data()),buf.size());
+ tc->writeq.append(reinterpret_cast<const char *>(data),len);
+
+ char *const outdata = const_cast<char *>(tc->writeq.data()) + startpos;
+ encryptClusterMessage(outdata,mlen);
+
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ void replicateStateObject(const ZT_StateObjectType type,const uint64_t id,const void *const data,const unsigned int len,TcpConnection *tc)
+ {
+ char buf[34];
+ Mutex::Lock _l2(tc->writeq_m);
+
+ if (tc->writeq.length() == 0)
+ _phy.setNotifyWritable(tc->sock,true);
+
+ const unsigned int mlen = len + 34;
+
+ tc->writeq.push_back((char)((mlen >> 16) & 0xff));
+ tc->writeq.push_back((char)((mlen >> 8) & 0xff));
+ tc->writeq.push_back((char)(mlen & 0xff));
+
+ Utils::getSecureRandom(buf,16);
+ buf[24] = (char)CLUSTER_MESSAGE_STATE_OBJECT;
+ buf[25] = (char)type;
+ buf[26] = (char)((id >> 56) & 0xff);
+ buf[27] = (char)((id >> 48) & 0xff);
+ buf[28] = (char)((id >> 40) & 0xff);
+ buf[29] = (char)((id >> 32) & 0xff);
+ buf[30] = (char)((id >> 24) & 0xff);
+ buf[31] = (char)((id >> 16) & 0xff);
+ buf[32] = (char)((id >> 8) & 0xff);
+ buf[33] = (char)(id & 0xff);
+
+ const unsigned long startpos = (unsigned long)tc->writeq.length();
+ tc->writeq.append(buf,34);
+ tc->writeq.append(reinterpret_cast<const char *>(data),len);
+
+ char *const outdata = const_cast<char *>(tc->writeq.data()) + startpos;
+ encryptClusterMessage(outdata,mlen);
+ }
+
+ void replicateStateObjectToCluster(const ZT_StateObjectType type,const uint64_t id,const void *const data,const unsigned int len,const uint64_t everyoneBut)
+ {
+ std::vector<uint64_t> sentTo;
+ if (everyoneBut)
+ sentTo.push_back(everyoneBut);
+ Mutex::Lock _l(_tcpConnections_m);
+ for(std::vector<TcpConnection *>::const_iterator ci(_tcpConnections.begin());ci!=_tcpConnections.end();++ci) {
+ TcpConnection *const c = *ci;
+ if ((c->type == TcpConnection::TCP_CLUSTER_BACKPLANE)&&(c->clusterMemberId != 0)&&(std::find(sentTo.begin(),sentTo.end(),c->clusterMemberId) == sentTo.end())) {
+ sentTo.push_back(c->clusterMemberId);
+ replicateStateObject(type,id,data,len,c);
+ }
+ }
+ }
+
+ void writeStateObject(enum ZT_StateObjectType type,uint64_t id,const void *data,int len)
+ {
+ char p[4096];
+ bool secure = false;
+ switch(type) {
+ case ZT_STATE_OBJECT_IDENTITY_PUBLIC:
+ Utils::snprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "identity.public",_homePath.c_str());
+ break;
+ case ZT_STATE_OBJECT_IDENTITY_SECRET:
+ Utils::snprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "identity.secret",_homePath.c_str());
+ secure = true;
+ break;
+ case ZT_STATE_OBJECT_PEER_IDENTITY:
+ Utils::snprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "iddb.d/%.10llx",_homePath.c_str(),(unsigned long long)id);
+ break;
+ case ZT_STATE_OBJECT_NETWORK_CONFIG:
+ Utils::snprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "networks.d/%.16llx.conf",_homePath.c_str(),(unsigned long long)id);
+ secure = true;
+ break;
+ case ZT_STATE_OBJECT_PLANET:
+ Utils::snprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "planet",_homePath.c_str());
+ break;
+ case ZT_STATE_OBJECT_MOON:
+ Utils::snprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "moons.d/%.16llx.moon",_homePath.c_str(),(unsigned long long)id);
+ break;
+ default:
+ p[0] = (char)0;
+ break;
+ }
+ if (p[0]) {
+ if (len >= 0) {
+ FILE *f = fopen(p,"w");
+ if (f) {
+ if (fwrite(data,len,1,f) != 1)
+ fprintf(stderr,"WARNING: unable to write to file: %s (I/O error)" ZT_EOL_S,p);
+ fclose(f);
+ if (secure)
+ OSUtils::lockDownFile(p,false);
+ } else {
+ fprintf(stderr,"WARNING: unable to write to file: %s (unable to open)" ZT_EOL_S,p);
+ }
+ } else {
+ OSUtils::rm(p);
+ }
+ }
+ }
+
+ void sendMyCurrentClusterState(TcpConnection *tc)
+ {
+ // We currently don't need to dump everything. Networks and moons are most important.
+ // The rest will get caught up rapidly due to constant peer updates, etc.
+ std::string buf;
+ std::vector<std::string> l(OSUtils::listDirectory((_homePath + ZT_PATH_SEPARATOR_S + "networks.d").c_str(),false));
+ for(std::vector<std::string>::const_iterator f(l.begin());f!=l.end();++f) {
+ buf.clear();
+ if (OSUtils::readFile((_homePath + ZT_PATH_SEPARATOR_S + *f).c_str(),buf)) {
+ if (f->length() == 21) {
+ const uint64_t nwid = Utils::hexStrToU64(f->substr(0,16).c_str());
+ if (nwid)
+ replicateStateObject(ZT_STATE_OBJECT_NETWORK_CONFIG,nwid,buf.data(),(int)buf.length(),tc);
+ }
+ }
+ }
+ l = OSUtils::listDirectory((_homePath + ZT_PATH_SEPARATOR_S + "moons.d").c_str(),false);
+ for(std::vector<std::string>::const_iterator f(l.begin());f!=l.end();++f) {
+ buf.clear();
+ if (OSUtils::readFile((_homePath + ZT_PATH_SEPARATOR_S + *f).c_str(),buf)) {
+ if (f->length() == 21) {
+ const uint64_t moonId = Utils::hexStrToU64(f->substr(0,16).c_str());
+ if (moonId)
+ replicateStateObject(ZT_STATE_OBJECT_MOON,moonId,buf.data(),(int)buf.length(),tc);
+ }
+ }
+ }
+ }
+
+ // =========================================================================
+ // Handlers for Node and Phy<> callbacks
+ // =========================================================================
+
+ inline void phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *localAddr,const struct sockaddr *from,void *data,unsigned long len)
+ {
if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL))
_lastDirectReceiveFromGlobal = OSUtils::now();
@@ -1811,38 +2063,33 @@ public:
inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success)
{
- if (!success)
+ if (!success) {
+ phyOnTcpClose(sock,uptr);
return;
+ }
- // Outgoing TCP connections are always TCP fallback tunnel connections.
-
- TcpConnection *tc = new TcpConnection();
- _tcpConnections.insert(tc);
-
- tc->type = TcpConnection::TCP_TUNNEL_OUTGOING;
- tc->shouldKeepAlive = true;
- tc->parent = this;
+ TcpConnection *const tc = reinterpret_cast<TcpConnection *>(*uptr);
+ if (!tc) { // sanity check
+ _phy.close(sock,true);
+ return;
+ }
tc->sock = sock;
- // from and parser are not used
- tc->messageSize = 0; // unused
- tc->lastActivity = OSUtils::now();
- // HTTP stuff is not used
- tc->writeBuf = "";
- *uptr = (void *)tc;
-
- // Send "hello" message
- tc->writeBuf.push_back((char)0x17);
- tc->writeBuf.push_back((char)0x03);
- tc->writeBuf.push_back((char)0x03); // fake TLS 1.2 header
- tc->writeBuf.push_back((char)0x00);
- tc->writeBuf.push_back((char)0x04); // mlen == 4
- tc->writeBuf.push_back((char)ZEROTIER_ONE_VERSION_MAJOR);
- tc->writeBuf.push_back((char)ZEROTIER_ONE_VERSION_MINOR);
- tc->writeBuf.push_back((char)((ZEROTIER_ONE_VERSION_REVISION >> 8) & 0xff));
- tc->writeBuf.push_back((char)(ZEROTIER_ONE_VERSION_REVISION & 0xff));
- _phy.setNotifyWritable(sock,true);
-
- _tcpFallbackTunnel = tc;
+
+ if (tc->type == TcpConnection::TCP_TUNNEL_OUTGOING) {
+ if (_tcpFallbackTunnel)
+ _phy.close(_tcpFallbackTunnel->sock);
+ _tcpFallbackTunnel = tc;
+ _phy.streamSend(sock,ZT_TCP_TUNNEL_HELLO,sizeof(ZT_TCP_TUNNEL_HELLO));
+ } else if (tc->type == TcpConnection::TCP_CLUSTER_BACKPLANE) {
+ {
+ Mutex::Lock _l(tc->writeq_m);
+ tc->writeq.push_back((char)0x93); // identifies type of connection as cluster backplane
+ }
+ announceStatusToClusterMember(tc);
+ _phy.setNotifyWritable(sock,true);
+ } else {
+ _phy.close(sock,true);
+ }
}
inline void phyOnTcpAccept(PhySocket *sockL,PhySocket *sockN,void **uptrL,void **uptrN,const struct sockaddr *from)
@@ -1852,149 +2099,321 @@ public:
return;
} else {
TcpConnection *tc = new TcpConnection();
- _tcpConnections.insert(tc);
- tc->type = TcpConnection::TCP_HTTP_INCOMING;
- tc->shouldKeepAlive = true;
+ {
+ Mutex::Lock _l(_tcpConnections_m);
+ _tcpConnections.push_back(tc);
+ }
+
+ tc->type = TcpConnection::TCP_UNCATEGORIZED_INCOMING;
tc->parent = this;
tc->sock = sockN;
- tc->from = from;
+ tc->remoteAddr = from;
+ tc->lastReceive = OSUtils::now();
http_parser_init(&(tc->parser),HTTP_REQUEST);
tc->parser.data = (void *)tc;
tc->messageSize = 0;
- tc->lastActivity = OSUtils::now();
- tc->currentHeaderField = "";
- tc->currentHeaderValue = "";
- tc->url = "";
- tc->status = "";
- tc->headers.clear();
- tc->body = "";
- tc->writeBuf = "";
+
*uptrN = (void *)tc;
}
}
- inline void phyOnTcpClose(PhySocket *sock,void **uptr)
+ void phyOnTcpClose(PhySocket *sock,void **uptr)
{
TcpConnection *tc = (TcpConnection *)*uptr;
if (tc) {
- if (tc == _tcpFallbackTunnel)
+ if (tc == _tcpFallbackTunnel) {
_tcpFallbackTunnel = (TcpConnection *)0;
- _tcpConnections.erase(tc);
+ }
+ {
+ Mutex::Lock _l(_tcpConnections_m);
+ _tcpConnections.erase(std::remove(_tcpConnections.begin(),_tcpConnections.end(),tc),_tcpConnections.end());
+ }
delete tc;
}
}
- inline void phyOnTcpData(PhySocket *sock,void **uptr,void *data,unsigned long len)
+ void phyOnTcpData(PhySocket *sock,void **uptr,void *data,unsigned long len)
{
- TcpConnection *tc = reinterpret_cast<TcpConnection *>(*uptr);
- switch(tc->type) {
+ try {
+ if (!len) return; // sanity check, should never happen
+ TcpConnection *tc = reinterpret_cast<TcpConnection *>(*uptr);
+ tc->lastReceive = OSUtils::now();
+ switch(tc->type) {
+
+ case TcpConnection::TCP_UNCATEGORIZED_INCOMING:
+ switch(reinterpret_cast<uint8_t *>(data)[0]) {
+ // 0x93 is first byte of cluster backplane connections
+ case 0x93: {
+ // We only allow this from cluster backplane IPs. We also authenticate
+ // each packet cryptographically, so this is just a first line of defense.
+ bool allow = false;
+ {
+ Mutex::Lock _l(_localConfig_m);
+ for(std::vector< InetAddress >::const_iterator i(_clusterBackplaneAddresses.begin());i!=_clusterBackplaneAddresses.end();++i) {
+ if (tc->remoteAddr.ipsEqual(*i)) {
+ allow = true;
+ break;
+ }
+ }
+ }
+ if (allow) {
+ tc->type = TcpConnection::TCP_CLUSTER_BACKPLANE;
+ tc->clusterMemberId = 0; // unknown, waiting for first status message
+ announceStatusToClusterMember(tc);
+ if (len > 1)
+ phyOnTcpData(sock,uptr,reinterpret_cast<uint8_t *>(data) + 1,len - 1);
+ } else {
+ _phy.close(sock);
+ }
+ } break;
+
+ // HTTP: GET, PUT, POST, HEAD
+ case 'G':
+ case 'P':
+ case 'H': {
+ // This is only allowed from IPs permitted to access the management
+ // backplane, which is just 127.0.0.1/::1 unless otherwise configured.
+ bool allow;
+ {
+ Mutex::Lock _l(_localConfig_m);
+ if (_allowManagementFrom.size() == 0) {
+ allow = (tc->remoteAddr.ipScope() == InetAddress::IP_SCOPE_LOOPBACK);
+ } else {
+ allow = false;
+ for(std::vector<InetAddress>::const_iterator i(_allowManagementFrom.begin());i!=_allowManagementFrom.end();++i) {
+ if (i->containsAddress(tc->remoteAddr)) {
+ allow = true;
+ break;
+ }
+ }
+ }
+ }
+ if (allow) {
+ tc->type = TcpConnection::TCP_HTTP_INCOMING;
+ phyOnTcpData(sock,uptr,data,len);
+ } else {
+ _phy.close(sock);
+ }
+ } break;
- case TcpConnection::TCP_HTTP_INCOMING:
- case TcpConnection::TCP_HTTP_OUTGOING:
- http_parser_execute(&(tc->parser),&HTTP_PARSER_SETTINGS,(const char *)data,len);
- if ((tc->parser.upgrade)||(tc->parser.http_errno != HPE_OK)) {
- _phy.close(sock);
+ // Drop unknown protocols
+ default:
+ _phy.close(sock);
+ break;
+ }
+ return;
+
+ case TcpConnection::TCP_HTTP_INCOMING:
+ case TcpConnection::TCP_HTTP_OUTGOING:
+ http_parser_execute(&(tc->parser),&HTTP_PARSER_SETTINGS,(const char *)data,len);
+ if ((tc->parser.upgrade)||(tc->parser.http_errno != HPE_OK))
+ _phy.close(sock);
return;
- }
- break;
- case TcpConnection::TCP_TUNNEL_OUTGOING:
- tc->body.append((const char *)data,len);
- while (tc->body.length() >= 5) {
- const char *data = tc->body.data();
- const unsigned long mlen = ( ((((unsigned long)data[3]) & 0xff) << 8) | (((unsigned long)data[4]) & 0xff) );
- if (tc->body.length() >= (mlen + 5)) {
- InetAddress from;
+ case TcpConnection::TCP_TUNNEL_OUTGOING:
+ tc->readq.append((const char *)data,len);
+ while (tc->readq.length() >= 5) {
+ const char *data = tc->readq.data();
+ const unsigned long mlen = ( ((((unsigned long)data[3]) & 0xff) << 8) | (((unsigned long)data[4]) & 0xff) );
+ if (tc->readq.length() >= (mlen + 5)) {
+ InetAddress from;
unsigned long plen = mlen; // payload length, modified if there's an IP header
- data += 5; // skip forward past pseudo-TLS junk and mlen
- if (plen == 4) {
- // Hello message, which isn't sent by proxy and would be ignored by client
- } else if (plen) {
- // Messages should contain IPv4 or IPv6 source IP address data
- switch(data[0]) {
- case 4: // IPv4
- if (plen >= 7) {
- from.set((const void *)(data + 1),4,((((unsigned int)data[5]) & 0xff) << 8) | (((unsigned int)data[6]) & 0xff));
- data += 7; // type + 4 byte IP + 2 byte port
- plen -= 7;
- } else {
+ data += 5; // skip forward past pseudo-TLS junk and mlen
+ if (plen == 4) {
+ // Hello message, which isn't sent by proxy and would be ignored by client
+ } else if (plen) {
+ // Messages should contain IPv4 or IPv6 source IP address data
+ switch(data[0]) {
+ case 4: // IPv4
+ if (plen >= 7) {
+ from.set((const void *)(data + 1),4,((((unsigned int)data[5]) & 0xff) << 8) | (((unsigned int)data[6]) & 0xff));
+ data += 7; // type + 4 byte IP + 2 byte port
+ plen -= 7;
+ } else {
+ _phy.close(sock);
+ return;
+ }
+ break;
+ case 6: // IPv6
+ if (plen >= 19) {
+ from.set((const void *)(data + 1),16,((((unsigned int)data[17]) & 0xff) << 8) | (((unsigned int)data[18]) & 0xff));
+ data += 19; // type + 16 byte IP + 2 byte port
+ plen -= 19;
+ } else {
+ _phy.close(sock);
+ return;
+ }
+ break;
+ case 0: // none/omitted
+ ++data;
+ --plen;
+ break;
+ default: // invalid address type
_phy.close(sock);
return;
- }
- break;
- case 6: // IPv6
- if (plen >= 19) {
- from.set((const void *)(data + 1),16,((((unsigned int)data[17]) & 0xff) << 8) | (((unsigned int)data[18]) & 0xff));
- data += 19; // type + 16 byte IP + 2 byte port
- plen -= 19;
- } else {
+ }
+
+ if (from) {
+ InetAddress fakeTcpLocalInterfaceAddress((uint32_t)0xffffffff,0xffff);
+ const ZT_ResultCode rc = _node->processWirePacket(
+ (void *)0,
+ OSUtils::now(),
+ reinterpret_cast<struct sockaddr_storage *>(&fakeTcpLocalInterfaceAddress),
+ reinterpret_cast<struct sockaddr_storage *>(&from),
+ data,
+ plen,
+ &_nextBackgroundTaskDeadline);
+ if (ZT_ResultCode_isFatal(rc)) {
+ char tmp[256];
+ Utils::snprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc);
+ Mutex::Lock _l(_termReason_m);
+ _termReason = ONE_UNRECOVERABLE_ERROR;
+ _fatalErrorMessage = tmp;
+ this->terminate();
_phy.close(sock);
return;
}
+ }
+ }
+
+ if (tc->readq.length() > (mlen + 5))
+ tc->readq.erase(tc->readq.begin(),tc->readq.begin() + (mlen + 5));
+ else tc->readq.clear();
+ } else break;
+ }
+ return;
+
+ case TcpConnection::TCP_CLUSTER_BACKPLANE:
+ tc->readq.append((const char *)data,len);
+ if (tc->readq.length() >= 28) { // got 3-byte message size + 16-byte IV + 8-byte MAC + 1-byte type (encrypted)
+ uint8_t *data = reinterpret_cast<uint8_t *>(const_cast<char *>(tc->readq.data()));
+ unsigned long mlen = ( ((unsigned long)data[0] << 16) | ((unsigned long)data[1] << 8) | (unsigned long)data[2] );
+ if ((mlen < 25)||(mlen > ZT_TCP_MAX_WRITEQ_SIZE)) {
+ _phy.close(sock);
+ return;
+ } else if (tc->readq.length() >= (mlen + 3)) { // got entire message
+ data += 3;
+
+ uint8_t key[32];
+ memcpy(key,_clusterKey,32);
+ for(int i=0;i<8;++i) key[i] ^= data[i]; // first 8 bytes of IV get XORed with key
+ Salsa20 s20(key,data + 8); // last 8 bytes of IV are fed into Salsa20 directly as its 64-bit IV
+
+ uint8_t macKey[32];
+ uint8_t mac[16];
+ memset(macKey,0,32);
+ s20.crypt12(macKey,macKey,32);
+ Poly1305::compute(mac,data + 24,mlen - 24,macKey);
+ if (!Utils::secureEq(mac,data + 16,8)) {
+ _phy.close(sock);
+ return;
+ }
+ s20.crypt12(data + 24,data + 24,mlen - 24);
+
+ switch((ClusterMessageType)data[24]) {
+ case CLUSTER_MESSAGE_STATUS:
+ if (mlen > (25 + 16)) {
+ Buffer<4096> tmp(data + 25,mlen - 25);
+ try {
+ const uint64_t cmid = tmp.at<uint64_t>(0);
+ if (cmid == _clusterMemberId) { // shouldn't happen, but don't allow self-to-self
+ _phy.close(sock);
+ return;
+ }
+ if (!tc->clusterMemberId) {
+ tc->clusterMemberId = cmid;
+ sendMyCurrentClusterState(tc);
+ }
+ tc->clusterMemberVersionMajor = tmp.at<uint16_t>(8);
+ tc->clusterMemberVersionMinor = tmp.at<uint16_t>(10);
+ tc->clusterMemberVersionRev = tmp.at<uint16_t>(12);
+ const unsigned int clusterMemberLocalAddressCount = tmp.at<uint16_t>(14);
+ std::vector<InetAddress> la;
+ unsigned int ptr = 16;
+ for(unsigned int k=0;k<clusterMemberLocalAddressCount;++k) {
+ la.push_back(InetAddress());
+ ptr += la.back().deserialize(tmp,ptr);
+ }
+ {
+ Mutex::Lock _l2(tc->clusterMemberLocalAddresses_m);
+ tc->clusterMemberLocalAddresses.swap(la);
+ }
+ } catch ( ... ) {}
+ }
break;
- case 0: // none/omitted
- ++data;
- --plen;
+
+ case CLUSTER_MESSAGE_STATE_OBJECT:
+ if (mlen >= (25 + 9)) { // type + object ID + [data]
+ const uint64_t objId = (
+ ((uint64_t)data[26] << 56) |
+ ((uint64_t)data[27] << 48) |
+ ((uint64_t)data[28] << 40) |
+ ((uint64_t)data[29] << 32) |
+ ((uint64_t)data[30] << 24) |
+ ((uint64_t)data[31] << 16) |
+ ((uint64_t)data[32] << 8) |
+ (uint64_t)data[33]
+ );
+ if (_node->processStateUpdate((void *)0,(ZT_StateObjectType)data[25],objId,data + 34,(unsigned int)(mlen - 34)) == ZT_RESULT_OK) {
+ writeStateObject((ZT_StateObjectType)data[25],objId,data + 34,(unsigned int)(mlen - 34));
+ replicateStateObjectToCluster((ZT_StateObjectType)data[25],objId,data + 34,(unsigned int)(mlen - 34),tc->clusterMemberId);
+ }
+ }
break;
- default: // invalid address type
- _phy.close(sock);
- return;
- }
- if (from) {
- InetAddress fakeTcpLocalInterfaceAddress((uint32_t)0xffffffff,0xffff);
- const ZT_ResultCode rc = _node->processWirePacket(
- (void *)0,
- OSUtils::now(),
- reinterpret_cast<struct sockaddr_storage *>(&fakeTcpLocalInterfaceAddress),
- reinterpret_cast<struct sockaddr_storage *>(&from),
- data,
- plen,
- &_nextBackgroundTaskDeadline);
- if (ZT_ResultCode_isFatal(rc)) {
- char tmp[256];
- Utils::snprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc);
- Mutex::Lock _l(_termReason_m);
- _termReason = ONE_UNRECOVERABLE_ERROR;
- _fatalErrorMessage = tmp;
- this->terminate();
- _phy.close(sock);
- return;
- }
+ case CLUSTER_MESSAGE_PROXY_SEND:
+ if (mlen > 25) {
+ Buffer<4096> tmp(data + 25,mlen - 25);
+ try {
+ InetAddress dest,src;
+ const unsigned int ttl = (unsigned int)tmp[0];
+ unsigned int ptr = 1;
+ ptr += dest.deserialize(tmp);
+ ptr += src.deserialize(tmp,ptr);
+ if (ptr < tmp.size())
+ _binder.udpSend(_phy,src,dest,reinterpret_cast<const uint8_t *>(tmp.data()) + ptr,tmp.size() - ptr,ttl);
+ } catch ( ... ) {}
+ }
+ break;
}
- }
- if (tc->body.length() > (mlen + 5))
- tc->body = tc->body.substr(mlen + 5);
- else tc->body = "";
- } else break;
- }
- break;
+ tc->readq.erase(tc->readq.begin(),tc->readq.begin() + mlen);
+ }
+ }
+ return;
+ }
+ } catch ( ... ) {
+ _phy.close(sock);
}
}
inline void phyOnTcpWritable(PhySocket *sock,void **uptr)
{
TcpConnection *tc = reinterpret_cast<TcpConnection *>(*uptr);
- Mutex::Lock _l(tc->writeBuf_m);
- if (tc->writeBuf.length() > 0) {
- long sent = (long)_phy.streamSend(sock,tc->writeBuf.data(),(unsigned long)tc->writeBuf.length(),true);
- if (sent > 0) {
- tc->lastActivity = OSUtils::now();
- if ((unsigned long)sent >= (unsigned long)tc->writeBuf.length()) {
- tc->writeBuf = "";
- _phy.setNotifyWritable(sock,false);
- if (!tc->shouldKeepAlive)
- _phy.close(sock); // will call close handler to delete from _tcpConnections
- } else {
- tc->writeBuf = tc->writeBuf.substr(sent);
+ bool closeit = false;
+ {
+ Mutex::Lock _l(tc->writeq_m);
+ if (tc->writeq.length() > 0) {
+ long sent = (long)_phy.streamSend(sock,tc->writeq.data(),(unsigned long)tc->writeq.length(),true);
+ if (sent > 0) {
+ if ((unsigned long)sent >= (unsigned long)tc->writeq.length()) {
+ tc->writeq.clear();
+ _phy.setNotifyWritable(sock,false);
+
+ if (tc->type == TcpConnection::TCP_HTTP_INCOMING)
+ closeit = true; // HTTP keep alive not supported
+ } else {
+ tc->writeq.erase(tc->writeq.begin(),tc->writeq.begin() + sent);
+ }
}
+ } else {
+ _phy.setNotifyWritable(sock,false);
}
- } else {
- _phy.setNotifyWritable(sock,false);
}
+ if (closeit)
+ _phy.close(sock);
}
inline void phyOnFileDescriptorActivity(PhySocket *sock,void **uptr,bool readable,bool writable) {}
@@ -2155,80 +2574,51 @@ public:
}
}
- inline long nodeDataStoreGetFunction(const char *name,void *buf,unsigned long bufSize,unsigned long readIndex,unsigned long *totalSize)
+ inline void nodeStatePutFunction(enum ZT_StateObjectType type,uint64_t id,const void *data,int len)
{
- std::string p(_dataStorePrepPath(name));
- if (!p.length())
- return -2;
-
- FILE *f = fopen(p.c_str(),"rb");
- if (!f)
- return -1;
- if (fseek(f,0,SEEK_END) != 0) {
- fclose(f);
- return -2;
- }
- long ts = ftell(f);
- if (ts < 0) {
- fclose(f);
- return -2;
- }
- *totalSize = (unsigned long)ts;
- if (fseek(f,(long)readIndex,SEEK_SET) != 0) {
- fclose(f);
- return -2;
- }
- long n = (long)fread(buf,1,bufSize,f);
- fclose(f);
- return n;
+ writeStateObject(type,id,data,len);
+ replicateStateObjectToCluster(type,id,data,len,0);
}
- inline int nodeDataStorePutFunction(const char *name,const void *data,unsigned long len,int secure)
+ inline int nodeStateGetFunction(enum ZT_StateObjectType type,uint64_t id,void *data,unsigned int maxlen)
{
- std::string p(_dataStorePrepPath(name));
- if (!p.length())
- return -2;
-
- if (!data) {
- OSUtils::rm(p.c_str());
- return 0;
+ char p[4096];
+ switch(type) {
+ case ZT_STATE_OBJECT_IDENTITY_PUBLIC:
+ Utils::snprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "identity.public",_homePath.c_str());
+ break;
+ case ZT_STATE_OBJECT_IDENTITY_SECRET:
+ Utils::snprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "identity.secret",_homePath.c_str());
+ break;
+ case ZT_STATE_OBJECT_PEER_IDENTITY:
+ Utils::snprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "iddb.d/%.10llx",_homePath.c_str(),(unsigned long long)id);
+ break;
+ case ZT_STATE_OBJECT_NETWORK_CONFIG:
+ Utils::snprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "networks.d/%.16llx.conf",_homePath.c_str(),(unsigned long long)id);
+ break;
+ case ZT_STATE_OBJECT_PLANET:
+ Utils::snprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "planet",_homePath.c_str());
+ break;
+ case ZT_STATE_OBJECT_MOON:
+ Utils::snprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "moons.d/%.16llx.moon",_homePath.c_str(),(unsigned long long)id);
+ break;
+ default:
+ return -1;
}
-
- FILE *f = fopen(p.c_str(),"wb");
- if (!f)
- return -1;
- if (fwrite(data,len,1,f) == 1) {
- fclose(f);
- if (secure)
- OSUtils::lockDownFile(p.c_str(),false);
- return 0;
- } else {
+ FILE *f = fopen(p,"r");
+ if (f) {
+ int n = (int)fread(data,1,maxlen,f);
fclose(f);
- OSUtils::rm(p.c_str());
- return -1;
+ if (n >= 0)
+ return n;
}
+ return -1;
}
inline int nodeWirePacketSendFunction(const struct sockaddr_storage *localAddr,const struct sockaddr_storage *addr,const void *data,unsigned int len,unsigned int ttl)
{
- unsigned int fromBindingNo = 0;
-
- if (addr->ss_family == AF_INET) {
- if (reinterpret_cast<const struct sockaddr_in *>(localAddr)->sin_port == 0) {
- // If sender is sending from wildcard (null address), choose the secondary backup
- // port 1/4 of the time. (but only for IPv4)
- fromBindingNo = (++_udpPortPickerCounter & 0x4) >> 2;
- if (!_ports[fromBindingNo])
- fromBindingNo = 0;
- } else {
- const uint16_t lp = reinterpret_cast<const struct sockaddr_in *>(localAddr)->sin_port;
- if (lp == _portsBE[1])
- fromBindingNo = 1;
- else if (lp == _portsBE[2])
- fromBindingNo = 2;
- }
-
#ifdef ZT_TCP_FALLBACK_RELAY
+ if (addr->ss_family == AF_INET) {
// TCP fallback tunnel support, currently IPv4 only
if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(addr)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) {
// Engage TCP tunnel fallback if we haven't received anything valid from a global
@@ -2237,46 +2627,59 @@ public:
const uint64_t now = OSUtils::now();
if (((now - _lastDirectReceiveFromGlobal) > ZT_TCP_FALLBACK_AFTER)&&((now - _lastRestart) > ZT_TCP_FALLBACK_AFTER)) {
if (_tcpFallbackTunnel) {
- Mutex::Lock _l(_tcpFallbackTunnel->writeBuf_m);
- if (!_tcpFallbackTunnel->writeBuf.length())
+ Mutex::Lock _l(_tcpFallbackTunnel->writeq_m);
+ if (_tcpFallbackTunnel->writeq.length() == 0)
_phy.setNotifyWritable(_tcpFallbackTunnel->sock,true);
- unsigned long mlen = len + 7;
- _tcpFallbackTunnel->writeBuf.push_back((char)0x17);
- _tcpFallbackTunnel->writeBuf.push_back((char)0x03);
- _tcpFallbackTunnel->writeBuf.push_back((char)0x03); // fake TLS 1.2 header
- _tcpFallbackTunnel->writeBuf.push_back((char)((mlen >> 8) & 0xff));
- _tcpFallbackTunnel->writeBuf.push_back((char)(mlen & 0xff));
- _tcpFallbackTunnel->writeBuf.push_back((char)4); // IPv4
- _tcpFallbackTunnel->writeBuf.append(reinterpret_cast<const char *>(reinterpret_cast<const void *>(&(reinterpret_cast<const struct sockaddr_in *>(addr)->sin_addr.s_addr))),4);
- _tcpFallbackTunnel->writeBuf.append(reinterpret_cast<const char *>(reinterpret_cast<const void *>(&(reinterpret_cast<const struct sockaddr_in *>(addr)->sin_port))),2);
- _tcpFallbackTunnel->writeBuf.append((const char *)data,len);
+ const unsigned long mlen = len + 7;
+ _tcpFallbackTunnel->writeq.push_back((char)0x17);
+ _tcpFallbackTunnel->writeq.push_back((char)0x03);
+ _tcpFallbackTunnel->writeq.push_back((char)0x03); // fake TLS 1.2 header
+ _tcpFallbackTunnel->writeq.push_back((char)((mlen >> 8) & 0xff));
+ _tcpFallbackTunnel->writeq.push_back((char)(mlen & 0xff));
+ _tcpFallbackTunnel->writeq.push_back((char)4); // IPv4
+ _tcpFallbackTunnel->writeq.append(reinterpret_cast<const char *>(reinterpret_cast<const void *>(&(reinterpret_cast<const struct sockaddr_in *>(addr)->sin_addr.s_addr))),4);
+ _tcpFallbackTunnel->writeq.append(reinterpret_cast<const char *>(reinterpret_cast<const void *>(&(reinterpret_cast<const struct sockaddr_in *>(addr)->sin_port))),2);
+ _tcpFallbackTunnel->writeq.append((const char *)data,len);
} else if (((now - _lastSendToGlobalV4) < ZT_TCP_FALLBACK_AFTER)&&((now - _lastSendToGlobalV4) > (ZT_PING_CHECK_INVERVAL / 2))) {
- bool connected = false;
const InetAddress addr(ZT_TCP_FALLBACK_RELAY);
- _phy.tcpConnect(reinterpret_cast<const struct sockaddr *>(&addr),connected);
+ TcpConnection *tc = new TcpConnection();
+ {
+ Mutex::Lock _l(_tcpConnections_m);
+ _tcpConnections.push_back(tc);
+ }
+ tc->type = TcpConnection::TCP_TUNNEL_OUTGOING;
+ tc->remoteAddr = addr;
+ tc->lastReceive = OSUtils::now();
+ tc->parent = this;
+ tc->sock = (PhySocket *)0; // set in connect handler
+ tc->messageSize = 0;
+ bool connected = false;
+ _phy.tcpConnect(reinterpret_cast<const struct sockaddr *>(&addr),connected,(void *)tc,true);
}
}
_lastSendToGlobalV4 = now;
}
-#endif // ZT_TCP_FALLBACK_RELAY
- } else if (addr->ss_family == AF_INET6) {
- if (reinterpret_cast<const struct sockaddr_in6 *>(localAddr)->sin6_port != 0) {
- const uint16_t lp = reinterpret_cast<const struct sockaddr_in6 *>(localAddr)->sin6_port;
- if (lp == _portsBE[1])
- fromBindingNo = 1;
- else if (lp == _portsBE[2])
- fromBindingNo = 2;
- }
- } else {
- return -1;
}
+ // Even when relaying we still send via UDP. This way if UDP starts
+ // working we can instantly "fail forward" to it and stop using TCP
+ // proxy fallback, which is slow.
+#endif // ZT_TCP_FALLBACK_RELAY
-#ifdef ZT_BREAK_UDP
- if (OSUtils::fileExists("/tmp/ZT_BREAK_UDP"))
- return 0; // silently break UDP
-#endif
+ switch (_binder.udpSend(_phy,*(reinterpret_cast<const InetAddress *>(localAddr)),*(reinterpret_cast<const InetAddress *>(addr)),data,len,ttl)) {
+ case -1: // local bound address not found, so see if a cluster peer owns it
+ if (localAddr->ss_family != 0) {
+ return (proxySendViaCluster(*(reinterpret_cast<const InetAddress *>(localAddr)),*(reinterpret_cast<const InetAddress *>(addr)),data,len,ttl)) ? 0 : -1;
+ } else {
+ return -1; // failure
+ }
+ break;
+
+ case 0: // failure
+ return -1;
- return (_bindings[fromBindingNo].udpSend(_phy,*(reinterpret_cast<const InetAddress *>(localAddr)),*(reinterpret_cast<const InetAddress *>(addr)),data,len,ttl)) ? 0 : -1;
+ default: // success
+ return 0;
+ }
}
inline void nodeVirtualNetworkFrameFunction(uint64_t nwid,void **nuptr,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len)
@@ -2362,39 +2765,22 @@ public:
inline void onHttpRequestToServer(TcpConnection *tc)
{
- char tmpn[256];
+ char tmpn[4096];
std::string data;
std::string contentType("text/plain"); // default if not changed in handleRequest()
unsigned int scode = 404;
- bool allow;
- {
- Mutex::Lock _l(_localConfig_m);
- if (_allowManagementFrom.size() == 0) {
- allow = (tc->from.ipScope() == InetAddress::IP_SCOPE_LOOPBACK);
- } else {
- allow = false;
- for(std::vector<InetAddress>::const_iterator i(_allowManagementFrom.begin());i!=_allowManagementFrom.end();++i) {
- if (i->containsAddress(tc->from)) {
- allow = true;
- break;
- }
- }
- }
- }
+ // Note that we check allowed IP ranges when HTTP connections are first detected in
+ // phyOnTcpData(). If we made it here the source IP is okay.
- if (allow) {
- try {
- scode = handleControlPlaneHttpRequest(tc->from,tc->parser.method,tc->url,tc->headers,tc->body,data,contentType);
- } catch (std::exception &exc) {
- fprintf(stderr,"WARNING: unexpected exception processing control HTTP request: %s" ZT_EOL_S,exc.what());
- scode = 500;
- } catch ( ... ) {
- fprintf(stderr,"WARNING: unexpected exception processing control HTTP request: unknown exceptino" ZT_EOL_S);
- scode = 500;
- }
- } else {
- scode = 401;
+ try {
+ scode = handleControlPlaneHttpRequest(tc->remoteAddr,tc->parser.method,tc->url,tc->headers,tc->readq,data,contentType);
+ } catch (std::exception &exc) {
+ fprintf(stderr,"WARNING: unexpected exception processing control HTTP request: %s" ZT_EOL_S,exc.what());
+ scode = 500;
+ } catch ( ... ) {
+ fprintf(stderr,"WARNING: unexpected exception processing control HTTP request: unknown exceptino" ZT_EOL_S);
+ scode = 500;
}
const char *scodestr;
@@ -2410,19 +2796,16 @@ public:
default: scodestr = "Error"; break;
}
- Utils::snprintf(tmpn,sizeof(tmpn),"HTTP/1.1 %.3u %s\r\nCache-Control: no-cache\r\nPragma: no-cache\r\n",scode,scodestr);
+ Utils::snprintf(tmpn,sizeof(tmpn),"HTTP/1.1 %.3u %s\r\nCache-Control: no-cache\r\nPragma: no-cache\r\nContent-Type: %s\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+ scode,
+ scodestr,
+ contentType.c_str(),
+ (unsigned long)data.length());
{
- Mutex::Lock _l(tc->writeBuf_m);
- tc->writeBuf.assign(tmpn);
- tc->writeBuf.append("Content-Type: ");
- tc->writeBuf.append(contentType);
- Utils::snprintf(tmpn,sizeof(tmpn),"\r\nContent-Length: %lu\r\n",(unsigned long)data.length());
- tc->writeBuf.append(tmpn);
- if (!tc->shouldKeepAlive)
- tc->writeBuf.append("Connection: close\r\n");
- tc->writeBuf.append("\r\n");
+ Mutex::Lock _l(tc->writeq_m);
+ tc->writeq = tmpn;
if (tc->parser.method != HTTP_HEAD)
- tc->writeBuf.append(data);
+ tc->writeq.append(data);
}
_phy.setNotifyWritable(tc->sock,true);
@@ -2430,8 +2813,7 @@ public:
inline void onHttpResponseFromClient(TcpConnection *tc)
{
- if (!tc->shouldKeepAlive)
- _phy.close(tc->sock); // will call close handler, which deletes from _tcpConnections
+ _phy.close(tc->sock);
}
bool shouldBindInterface(const char *ifname,const InetAddress &ifaddr)
@@ -2475,23 +2857,6 @@ public:
return true;
}
- std::string _dataStorePrepPath(const char *name) const
- {
- std::string p(_homePath);
- p.push_back(ZT_PATH_SEPARATOR);
- char lastc = (char)0;
- for(const char *n=name;(*n);++n) {
- if ((*n == '.')&&(lastc == '.'))
- return std::string(); // don't allow ../../ stuff as a precaution
- if (*n == '/') {
- OSUtils::mkdir(p.c_str());
- p.push_back(ZT_PATH_SEPARATOR);
- } else p.push_back(*n);
- lastc = *n;
- }
- return p;
- }
-
bool _trialBind(unsigned int port)
{
struct sockaddr_in in4;
@@ -2532,10 +2897,10 @@ static int SnodeVirtualNetworkConfigFunction(ZT_Node *node,void *uptr,void *tptr
{ return reinterpret_cast<OneServiceImpl *>(uptr)->nodeVirtualNetworkConfigFunction(nwid,nuptr,op,nwconf); }
static void SnodeEventCallback(ZT_Node *node,void *uptr,void *tptr,enum ZT_Event event,const void *metaData)
{ reinterpret_cast<OneServiceImpl *>(uptr)->nodeEventCallback(event,metaData); }
-static long SnodeDataStoreGetFunction(ZT_Node *node,void *uptr,void *tptr,const char *name,void *buf,unsigned long bufSize,unsigned long readIndex,unsigned long *totalSize)
-{ return reinterpret_cast<OneServiceImpl *>(uptr)->nodeDataStoreGetFunction(name,buf,bufSize,readIndex,totalSize); }
-static int SnodeDataStorePutFunction(ZT_Node *node,void *uptr,void *tptr,const char *name,const void *data,unsigned long len,int secure)
-{ return reinterpret_cast<OneServiceImpl *>(uptr)->nodeDataStorePutFunction(name,data,len,secure); }
+static void SnodeStatePutFunction(ZT_Node *node,void *uptr,void *tptr,enum ZT_StateObjectType type,uint64_t id,const void *data,int len)
+{ reinterpret_cast<OneServiceImpl *>(uptr)->nodeStatePutFunction(type,id,data,len); }
+static int SnodeStateGetFunction(ZT_Node *node,void *uptr,void *tptr,enum ZT_StateObjectType type,uint64_t id,void *data,unsigned int maxlen)
+{ return reinterpret_cast<OneServiceImpl *>(uptr)->nodeStateGetFunction(type,id,data,maxlen); }
static int SnodeWirePacketSendFunction(ZT_Node *node,void *uptr,void *tptr,const struct sockaddr_storage *localAddr,const struct sockaddr_storage *addr,const void *data,unsigned int len,unsigned int ttl)
{ return reinterpret_cast<OneServiceImpl *>(uptr)->nodeWirePacketSendFunction(localAddr,addr,data,len,ttl); }
static void SnodeVirtualNetworkFrameFunction(ZT_Node *node,void *uptr,void *tptr,uint64_t nwid,void **nuptr,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len)
@@ -2544,22 +2909,6 @@ static int SnodePathCheckFunction(ZT_Node *node,void *uptr,void *tptr,uint64_t z
{ return reinterpret_cast<OneServiceImpl *>(uptr)->nodePathCheckFunction(ztaddr,localAddr,remoteAddr); }
static int SnodePathLookupFunction(ZT_Node *node,void *uptr,void *tptr,uint64_t ztaddr,int family,struct sockaddr_storage *result)
{ return reinterpret_cast<OneServiceImpl *>(uptr)->nodePathLookupFunction(ztaddr,family,result); }
-
-#ifdef ZT_ENABLE_CLUSTER
-static void SclusterSendFunction(void *uptr,unsigned int toMemberId,const void *data,unsigned int len)
-{
- OneServiceImpl *const impl = reinterpret_cast<OneServiceImpl *>(uptr);
- const ClusterDefinition::MemberDefinition &md = (*(impl->_clusterDefinition))[toMemberId];
- if (md.clusterEndpoint)
- impl->_phy.udpSend(impl->_clusterMessageSocket,reinterpret_cast<const struct sockaddr *>(&(md.clusterEndpoint)),data,len);
-}
-static int SclusterGeoIpFunction(void *uptr,const struct sockaddr_storage *addr,int *x,int *y,int *z)
-{
- OneServiceImpl *const impl = reinterpret_cast<OneServiceImpl *>(uptr);
- return (int)(impl->_clusterDefinition->geo().locate(*(reinterpret_cast<const InetAddress *>(addr)),*x,*y,*z));
-}
-#endif
-
static void StapFrameHandler(void *uptr,void *tptr,uint64_t nwid,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len)
{ reinterpret_cast<OneServiceImpl *>(uptr)->tapFrameHandler(nwid,from,to,etherType,vlanId,data,len); }
@@ -2569,10 +2918,10 @@ static int ShttpOnMessageBegin(http_parser *parser)
tc->currentHeaderField = "";
tc->currentHeaderValue = "";
tc->messageSize = 0;
- tc->url = "";
- tc->status = "";
+ tc->url.clear();
+ tc->status.clear();
tc->headers.clear();
- tc->body = "";
+ tc->readq.clear();
return 0;
}
static int ShttpOnUrl(http_parser *parser,const char *ptr,size_t length)
@@ -2589,16 +2938,7 @@ static int ShttpOnStatus(http_parser *parser,const char *ptr,size_t length)
#else
static int ShttpOnStatus(http_parser *parser)
#endif
-{
- /*
- TcpConnection *tc = reinterpret_cast<TcpConnection *>(parser->data);
- tc->messageSize += (unsigned long)length;
- if (tc->messageSize > ZT_MAX_HTTP_MESSAGE_SIZE)
- return -1;
- tc->status.append(ptr,length);
- */
- return 0;
-}
+{ return 0; }
static int ShttpOnHeaderField(http_parser *parser,const char *ptr,size_t length)
{
TcpConnection *tc = reinterpret_cast<TcpConnection *>(parser->data);
@@ -2636,14 +2976,12 @@ static int ShttpOnBody(http_parser *parser,const char *ptr,size_t length)
tc->messageSize += (unsigned long)length;
if (tc->messageSize > ZT_MAX_HTTP_MESSAGE_SIZE)
return -1;
- tc->body.append(ptr,length);
+ tc->readq.append(ptr,length);
return 0;
}
static int ShttpOnMessageComplete(http_parser *parser)
{
TcpConnection *tc = reinterpret_cast<TcpConnection *>(parser->data);
- tc->shouldKeepAlive = (http_should_keep_alive(parser) != 0);
- tc->lastActivity = OSUtils::now();
if (tc->type == TcpConnection::TCP_HTTP_INCOMING) {
tc->parent->onHttpRequestToServer(tc);
} else {