summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdam Ierymenko <adam.ierymenko@gmail.com>2017-06-05 12:15:28 -0700
committerAdam Ierymenko <adam.ierymenko@gmail.com>2017-06-05 12:15:28 -0700
commit9b287392a4af95ee0d15db7e3d1f9dd6bd804060 (patch)
treec7bf3bb3c37e6c5ff4545e0249d3e089ae2d96fc
parentaa06470cb6e779563e6c21e1166c36e30a2138ce (diff)
downloadinfinitytier-9b287392a4af95ee0d15db7e3d1f9dd6bd804060.tar.gz
infinitytier-9b287392a4af95ee0d15db7e3d1f9dd6bd804060.zip
.
-rw-r--r--include/ZeroTierOne.h49
-rw-r--r--node/Buffer.hpp13
-rw-r--r--node/Network.cpp21
-rw-r--r--node/Node.cpp4
-rw-r--r--node/Node.hpp2
-rw-r--r--osdep/Binder.hpp10
-rw-r--r--service/OneService.cpp813
7 files changed, 627 insertions, 285 deletions
diff --git a/include/ZeroTierOne.h b/include/ZeroTierOne.h
index 1e6f0ae8..9c295cee 100644
--- a/include/ZeroTierOne.h
+++ b/include/ZeroTierOne.h
@@ -297,7 +297,7 @@ enum ZT_ResultCode
* @param x Result code
* @return True if result code indicates a fatal error
*/
-#define ZT_ResultCode_isFatal(x) ((((int)(x)) > 0)&&(((int)(x)) < 1000))
+#define ZT_ResultCode_isFatal(x) ((((int)(x)) >= 100)&&(((int)(x)) < 1000))
/**
* Status codes sent to status update callback when things happen
@@ -393,6 +393,13 @@ enum ZT_Event
/**
* User message used with ZT_EVENT_USER_MESSAGE
+ *
+ * These are direct VL1 P2P messages for application use. Encryption and
+ * authentication in the ZeroTier protocol will guarantee the origin
+ * address and message content, but you are responsible for any other
+ * levels of authentication or access control that are required. Any node
+ * in the world can send you a user message! (Unless your network is air
+ * gapped.)
*/
typedef struct
{
@@ -720,24 +727,6 @@ typedef struct
} v;
} ZT_VirtualNetworkRule;
-typedef struct
-{
- /**
- * 128-bit ID (GUID) of this capability
- */
- uint64_t id[2];
-
- /**
- * Expiration time (measured vs. network config timestamp issued by controller)
- */
- uint64_t expiration;
-
- struct {
- uint64_t from;
- uint64_t to;
- } custody[ZT_MAX_CAPABILITY_CUSTODY_CHAIN_LENGTH];
-} ZT_VirtualNetworkCapability;
-
/**
* A route to be pushed on a virtual network
*/
@@ -1102,7 +1091,7 @@ enum ZT_StateObjectType
ZT_STATE_OBJECT_NULL = 0,
/**
- * identity.public
+ * Public address and public key
*
* Object ID: this node's address if known, or 0 if unknown (first query)
* Canonical path: <HOME>/identity.public
@@ -1111,10 +1100,10 @@ enum ZT_StateObjectType
ZT_STATE_OBJECT_IDENTITY_PUBLIC = 1,
/**
- * identity.secret
+ * Full identity with secret key
*
* Object ID: this node's address if known, or 0 if unknown (first query)
- * Canonical path: <HOME>/identity.public
+ * Canonical path: <HOME>/identity.secret
* Persistence: required, should be stored with restricted permissions e.g. mode 0600 on *nix
*/
ZT_STATE_OBJECT_IDENTITY_SECRET = 2,
@@ -1280,7 +1269,7 @@ typedef int (*ZT_StateGetFunction)(
unsigned int); /* Length of data buffer in bytes */
/**
- * Function to send a ZeroTier packet out over the wire
+ * Function to send a ZeroTier packet out over the physical wire (L2/L3)
*
* Parameters:
* (1) Node
@@ -1335,9 +1324,6 @@ typedef int (*ZT_WirePacketSendFunction)(
* all configured ZeroTier interfaces and check to ensure that the supplied
* addresses will not result in ZeroTier traffic being sent over a ZeroTier
* interface (recursion).
- *
- * Obviously this is not required in configurations where this can't happen,
- * such as network containers or embedded.
*/
typedef int (*ZT_PathCheckFunction)(
ZT_Node *, /* Node */
@@ -1426,13 +1412,12 @@ struct ZT_Node_Callbacks
};
/**
- * Create a new ZeroTier One node
- *
- * Note that this can take a few seconds the first time it's called, as it
- * will generate an identity.
+ * Create a new ZeroTier node
*
- * TODO: should consolidate function pointers into versioned structure for
- * better API stability.
+ * This will attempt to load its identity via the state get function in the
+ * callback struct. If that fails it will generate a new identity and store
+ * it. Identity generation can take anywhere from a few hundred milliseconds
+ * to a few seconds depending on your CPU speed.
*
* @param node Result: pointer is set to new node instance on success
* @param uptr User pointer to pass to functions/callbacks
diff --git a/node/Buffer.hpp b/node/Buffer.hpp
index fea32767..69ee1758 100644
--- a/node/Buffer.hpp
+++ b/node/Buffer.hpp
@@ -263,6 +263,19 @@ public:
}
/**
+ * Append secure random bytes
+ *
+ * @param n Number of random bytes to append
+ */
+ inline void appendRandom(unsigned int n)
+ {
+ if (unlikely((_l + n) > C))
+ throw std::out_of_range("Buffer: append beyond capacity");
+ Utils::getSecureRandom(_b + _l,n);
+ _l += n;
+ }
+
+ /**
* Append a C-array of bytes
*
* @param b Data
diff --git a/node/Network.cpp b/node/Network.cpp
index 6dfb0b92..74d81941 100644
--- a/node/Network.cpp
+++ b/node/Network.cpp
@@ -701,7 +701,26 @@ Network::Network(const RuntimeEnvironment *renv,void *tPtr,uint64_t nwid,void *u
this->setConfiguration(tPtr,*nconf,false);
_lastConfigUpdate = 0; // still want to re-request since it's likely outdated
} else {
- RR->node->stateObjectPut(tPtr,ZT_STATE_OBJECT_NETWORK_CONFIG,nwid,"\n",1);
+ bool got = false;
+ Dictionary<ZT_NETWORKCONFIG_DICT_CAPACITY> *dict = new Dictionary<ZT_NETWORKCONFIG_DICT_CAPACITY>();
+ try {
+ int n = RR->node->stateObjectGet(tPtr,ZT_STATE_OBJECT_NETWORK_CONFIG,nwid,dict->unsafeData(),ZT_NETWORKCONFIG_DICT_CAPACITY - 1);
+ if (n > 1) {
+ NetworkConfig *nconf = new NetworkConfig();
+ try {
+ if (nconf->fromDictionary(*dict)) {
+ this->setConfiguration(tPtr,*nconf,false);
+ _lastConfigUpdate = 0; // still want to re-request an update since it's likely outdated
+ got = true;
+ }
+ } catch ( ... ) {}
+ delete nconf;
+ }
+ } catch ( ... ) {}
+ delete dict;
+
+ if (!got)
+ RR->node->stateObjectPut(tPtr,ZT_STATE_OBJECT_NETWORK_CONFIG,nwid,"\n",1);
}
if (!_portInitialized) {
diff --git a/node/Node.cpp b/node/Node.cpp
index 7421c467..37586834 100644
--- a/node/Node.cpp
+++ b/node/Node.cpp
@@ -161,8 +161,10 @@ ZT_ResultCode Node::processStateUpdate(
if (len < 2) {
Mutex::Lock _l(_networks_m);
SharedPtr<Network> &nw = _networks[id];
- if (!nw)
+ if (!nw) {
nw = SharedPtr<Network>(new Network(RR,tptr,id,(void *)0,(const NetworkConfig *)0));
+ r = ZT_RESULT_OK;
+ }
} else {
Dictionary<ZT_NETWORKCONFIG_DICT_CAPACITY> *dict = new Dictionary<ZT_NETWORKCONFIG_DICT_CAPACITY>(reinterpret_cast<const char *>(data),len);
try {
diff --git a/node/Node.hpp b/node/Node.hpp
index ceb3b000..f407c60c 100644
--- a/node/Node.hpp
+++ b/node/Node.hpp
@@ -214,6 +214,8 @@ public:
World planet() const;
std::vector<World> moons() const;
+ inline const Identity &identity() const { return _RR.identity; }
+
/**
* Register that we are expecting a reply to a packet ID
*
diff --git a/osdep/Binder.hpp b/osdep/Binder.hpp
index ee832825..fee1c3da 100644
--- a/osdep/Binder.hpp
+++ b/osdep/Binder.hpp
@@ -446,6 +446,16 @@ public:
return aa;
}
+ /**
+ * @param aa Vector to append local interface addresses to
+ */
+ inline void allBoundLocalInterfaceAddresses(std::vector<InetAddress> &aa)
+ {
+ Mutex::Lock _l(_lock);
+ for(std::vector<_Binding>::const_iterator i(_bindings.begin());i!=_bindings.end();++i)
+ aa.push_back(i->address);
+ }
+
private:
std::vector<_Binding> _bindings;
Mutex _lock;
diff --git a/service/OneService.cpp b/service/OneService.cpp
index 5893a570..6365131e 100644
--- a/service/OneService.cpp
+++ b/service/OneService.cpp
@@ -31,7 +31,6 @@
#include <string>
#include <map>
-#include <set>
#include <vector>
#include <algorithm>
#include <list>
@@ -47,6 +46,9 @@
#include "../node/MAC.hpp"
#include "../node/Identity.hpp"
#include "../node/World.hpp"
+#include "../node/Salsa20.hpp"
+#include "../node/Poly1305.hpp"
+#include "../node/SHA512.hpp"
#include "../osdep/Phy.hpp"
#include "../osdep/Thread.hpp"
@@ -155,10 +157,25 @@ namespace ZeroTier { typedef BSDEthernetTap EthernetTap; }
// Clean files from iddb.d that are older than this (60 days)
#define ZT_IDDB_CLEANUP_AGE 5184000000ULL
+// Maximum write buffer size for outgoing TCP connections (sanity limit)
+#define ZT_TCP_MAX_WRITEQ_SIZE 33554432
+
+// How often to check TCP connections and cluster links
+#define ZT_TCP_CHECK_PERIOD 10000
+
+// How often to send status info to cluster links
+#define ZT_TCP_CLUSTER_SEND_STATUS_EVERY 30000
+
+// TCP activity timeout
+#define ZT_TCP_ACTIVITY_TIMEOUT 60000
+
namespace ZeroTier {
namespace {
+// Fake TLS hello for TCP tunnel outgoing connections (TUNNELED mode)
+static const char ZT_TCP_TUNNEL_HELLO[9] = { 0x17,0x03,0x03,0x00,0x04,(char)ZEROTIER_ONE_VERSION_MAJOR,(char)ZEROTIER_ONE_VERSION_MINOR,(char)((ZEROTIER_ONE_VERSION_REVISION >> 8) & 0xff),(char)(ZEROTIER_ONE_VERSION_REVISION & 0xff) };
+
static std::string _trimString(const std::string &s)
{
unsigned long end = (unsigned long)s.length();
@@ -342,32 +359,53 @@ static const struct http_parser_settings HTTP_PARSER_SETTINGS = {
};
#endif
+/**
+ * A TCP connection and related state and buffers
+ */
struct TcpConnection
{
enum {
+ TCP_UNCATEGORIZED_INCOMING, // uncategorized incoming connection
TCP_HTTP_INCOMING,
- TCP_HTTP_OUTGOING, // not currently used
- TCP_TUNNEL_OUTGOING // fale-SSL outgoing tunnel -- HTTP-related fields are not used
+ TCP_HTTP_OUTGOING,
+ TCP_TUNNEL_OUTGOING, // TUNNELED mode proxy outbound connection
+ TCP_CLUSTER_BACKPLANE,
} type;
- bool shouldKeepAlive;
OneServiceImpl *parent;
PhySocket *sock;
InetAddress from;
+ unsigned long lastReceive;
+
+ // Used for inbound HTTP connections
http_parser parser;
unsigned long messageSize;
- uint64_t lastActivity;
-
std::string currentHeaderField;
std::string currentHeaderValue;
-
std::string url;
std::string status;
std::map< std::string,std::string > headers;
- std::string body;
- std::string writeBuf;
- Mutex writeBuf_m;
+ // Used for cluster backplane connections
+ uint64_t clusterMemberId;
+ unsigned int clusterMemberVersionMajor;
+ unsigned int clusterMemberVersionMinor;
+ unsigned int clusterMemberVersionRev;
+ std::vector< InetAddress > clusterMemberLocalAddresses;
+
+ std::string readq;
+ std::string writeq;
+ Mutex writeq_m;
+};
+
+/**
+ * Message types for cluster backplane communication
+ */
+enum ClusterMessageType
+{
+ CLUSTER_MESSAGE_STATUS = 0,
+ CLUSTER_MESSAGE_STATE_OBJECT = 1,
+ CLUSTER_MESSAGE_PROXY_SEND = 2
};
class OneServiceImpl : public OneService
@@ -389,8 +427,10 @@ public:
bool _updateAutoApply;
unsigned int _primaryPort;
volatile unsigned int _udpPortPickerCounter;
+ uint64_t _clusterMemberId;
+ uint8_t _clusterKey[32]; // secret key for cluster backplane config
- // Local configuration and memo-ized static path definitions
+ // Local configuration and memo-ized information from it
json _localConfig;
Hashtable< uint64_t,std::vector<InetAddress> > _v4Hints;
Hashtable< uint64_t,std::vector<InetAddress> > _v6Hints;
@@ -400,6 +440,7 @@ public:
std::vector< InetAddress > _globalV6Blacklist;
std::vector< InetAddress > _allowManagementFrom;
std::vector< std::string > _interfacePrefixBlacklist;
+ std::vector< InetAddress > _clusterBackplaneAddresses;
Mutex _localConfig_m;
/*
@@ -417,6 +458,10 @@ public:
unsigned int _ports[3];
uint16_t _portsBE[3]; // ports in big-endian network byte order as in sockaddr
+ // Local interface addresses obtained from bindings
+ std::vector<InetAddress> _localInterfaceAddresses;
+ Mutex _localInterfaceAddresses_m;
+
// Sockets for JSON API -- bound only to V4 and V6 localhost
PhySocket *_v4TcpControlSocket;
PhySocket *_v6TcpControlSocket;
@@ -455,7 +500,8 @@ public:
Mutex _nets_m;
// Active TCP/IP connections
- std::set< TcpConnection * > _tcpConnections; // no mutex for this since it's done in the main loop thread only
+ std::vector< TcpConnection * > _tcpConnections;
+ Mutex _tcpConnections_m;
TcpConnection *_tcpFallbackTunnel;
// Termination status information
@@ -469,13 +515,6 @@ public:
PortMapper *_portMapper;
#endif
- // Cluster management instance if enabled
-#ifdef ZT_ENABLE_CLUSTER
- PhySocket *_clusterMessageSocket;
- ClusterDefinition *_clusterDefinition;
- unsigned int _clusterMemberId;
-#endif
-
// Set to false to force service to stop
volatile bool _run;
Mutex _run_m;
@@ -512,7 +551,6 @@ public:
#ifdef ZT_ENABLE_CLUSTER
,_clusterMessageSocket((PhySocket *)0)
,_clusterDefinition((ClusterDefinition *)0)
- ,_clusterMemberId(0)
#endif
,_run(true)
{
@@ -822,6 +860,14 @@ public:
}
*/
+ {
+ uint8_t tmp[64];
+ SHA512::hash(tmp,_node->identity().privateKeyPair().priv.data,ZT_C25519_PRIVATE_KEY_LEN);
+ memcpy(_clusterKey,tmp,32);
+ }
+ _clusterMemberId = _node->prng();
+ if (!_clusterMemberId) _clusterMemberId = 1;
+
// Main I/O loop
_nextBackgroundTaskDeadline = 0;
uint64_t clockShouldBe = OSUtils::now();
@@ -911,6 +957,7 @@ public:
_node->clearLocalInterfaceAddresses();
+ // Tell Node about uPnP and NAT-PMP bound external addresses
#ifdef ZT_USE_MINIUPNPC
if (_portMapper) {
std::vector<InetAddress> mappedAddresses(_portMapper->get());
@@ -919,9 +966,20 @@ public:
}
#endif
+ // Tell Node about local interface addresses bound to the primary port
std::vector<InetAddress> boundAddrs(_bindings[0].allBoundLocalInterfaceAddresses());
for(std::vector<InetAddress>::const_iterator i(boundAddrs.begin());i!=boundAddrs.end();++i)
_node->addLocalInterfaceAddress(reinterpret_cast<const struct sockaddr_storage *>(&(*i)));
+
+ // Memoize all local interface addresses for use in clustering -- we tell other cluster members about these
+ {
+ Mutex::Lock _l(_localInterfaceAddresses_m);
+ _localInterfaceAddresses.clear();
+ for(int i=0;i<3;++i) {
+ if (_ports[i] > 0)
+ _bindings[i].allBoundLocalInterfaceAddresses(_localInterfaceAddresses);
+ }
+ }
}
const unsigned long delay = (dl > now) ? (unsigned long)(dl - now) : 100;
@@ -939,6 +997,7 @@ public:
}
try {
+ Mutex::Lock _l(_tcpConnections_m);
while (!_tcpConnections.empty())
_phy.close((*_tcpConnections.begin())->sock);
} catch ( ... ) {}
@@ -980,35 +1039,35 @@ public:
}
#ifdef ZT_SDK
- virtual void leave(const char *hp)
- {
- _node->leave(Utils::hexStrToU64(hp),NULL,NULL);
- }
+ virtual void leave(const char *hp)
+ {
+ _node->leave(Utils::hexStrToU64(hp),NULL,NULL);
+ }
virtual void join(const char *hp)
{
_node->join(Utils::hexStrToU64(hp),NULL,NULL);
}
- virtual std::string givenHomePath()
- {
- return _homePath;
- }
+ virtual std::string givenHomePath()
+ {
+ return _homePath;
+ }
- virtual EthernetTap * getTap(uint64_t nwid)
- {
+ virtual EthernetTap * getTap(uint64_t nwid)
+ {
Mutex::Lock _l(_nets_m);
std::map<uint64_t,NetworkState>::const_iterator n(_nets.find(nwid));
if (n == _nets.end())
- return NULL;
+ return NULL;
return n->second.tap;
- }
+ }
- virtual EthernetTap *getTap(InetAddress &addr)
- {
- Mutex::Lock _l(_nets_m);
+ virtual EthernetTap *getTap(InetAddress &addr)
+ {
+ Mutex::Lock _l(_nets_m);
std::map<uint64_t,NetworkState>::iterator it;
- for(it = _nets.begin(); it != _nets.end(); it++) {
+ for(it = _nets.begin(); it != _nets.end(); it++) {
if(it->second.tap) {
for(int j=0; j<it->second.tap->_ips.size(); j++) {
if(it->second.tap->_ips[j].isEqualPrefix(addr) || it->second.tap->_ips[j].ipsEqual(addr)) {
@@ -1016,9 +1075,9 @@ public:
}
}
}
- }
- return NULL;
- }
+ }
+ return NULL;
+ }
virtual Node * getNode()
{
@@ -1029,9 +1088,8 @@ public:
{
Mutex::Lock _l(_nets_m);
std::map<uint64_t,NetworkState>::iterator i;
- for(i = _nets.begin(); i != _nets.end(); i++) {
- delete i->second.tap;
- }
+ for(i = _nets.begin(); i != _nets.end(); i++)
+ delete i->second.tap;
}
#endif // ZT_SDK
@@ -1078,7 +1136,9 @@ public:
return true;
}
- // Internal implementation methods -----------------------------------------
+ // =========================================================================
+ // Internal implementation methods for control plane, route setup, etc.
+ // =========================================================================
inline unsigned int handleControlPlaneHttpRequest(
const InetAddress &fromAddress,
@@ -1759,22 +1819,118 @@ public:
}
}
+ void announceStatusToClusterMember(TcpConnection *tc)
+ {
+ Buffer<4096> buf;
+
+ buf.appendRandom(16);
+ buf.addSize(8); // space for MAC
+ buf.append((uint8_t)CLUSTER_MESSAGE_STATUS);
+ buf.append(_clusterMemberId);
+ buf.append((uint16_t)ZEROTIER_ONE_VERSION_MAJOR);
+ buf.append((uint16_t)ZEROTIER_ONE_VERSION_MINOR);
+ buf.append((uint16_t)ZEROTIER_ONE_VERSION_REVISION);
+
+ {
+ Mutex::Lock _l(_localInterfaceAddresses_m);
+ buf.append((uint16_t)_localInterfaceAddresses.size());
+ for(std::vector<InetAddress>::const_iterator i(_localInterfaceAddresses.begin());i!=_localInterfaceAddresses.end();++i) {
+ i->serialize(buf);
+ if ((buf.size() + 32) > buf.capacity())
+ break;
+ }
+ }
+
+ Mutex::Lock _l(tc->writeq_m);
+
+ if (tc->writeq.length() == 0)
+ _phy.setNotifyWritable(tc->sock,true);
+
+ const unsigned int mlen = buf.size();
+ tc->writeq.push_back((char)((mlen >> 16) & 0xff));
+ tc->writeq.push_back((char)((mlen >> 8) & 0xff));
+ tc->writeq.push_back((char)(mlen & 0xff));
+
+ char *data = reinterpret_cast<char *>(buf.unsafeData());
+
+ uint8_t key[32];
+ memcpy(key,_clusterKey,32);
+ for(int i=0;i<8;++i) key[i] ^= data[i];
+ Salsa20 s20(key,data + 8);
+
+ uint8_t macKey[32];
+ uint8_t mac[16];
+ memset(macKey,0,32);
+ s20.crypt12(macKey,macKey,32);
+ s20.crypt12(data + 24,data + 24,mlen - 24);
+ Poly1305::compute(mac,data + 24,mlen - 24,macKey);
+ memcpy(data + 16,mac,8);
+
+ tc->writeq.append(data,mlen);
+ }
+
+ void replicateStateObjectToCluster(const ZT_StateObjectType type,const uint64_t id,const void *const data,const unsigned int len,const uint64_t everyoneBut)
+ {
+ uint8_t *buf = new uint8_t[len + 34];
+ try {
+ std::vector<uint64_t> sentTo;
+ if (everyoneBut)
+ sentTo.push_back(everyoneBut);
+ Mutex::Lock _l(_tcpConnections_m);
+ for(std::vector<TcpConnection *>::const_iterator ci(_tcpConnections.begin());ci!=_tcpConnections.end();++ci) {
+ TcpConnection *const c = *ci;
+ if ((c->type == TcpConnection::TCP_CLUSTER_BACKPLANE)&&(c->clusterMemberId != 0)&&(std::find(sentTo.begin(),sentTo.end(),c->clusterMemberId) == sentTo.end())) {
+ sentTo.push_back(c->clusterMemberId);
+ Mutex::Lock _l2(c->writeq_m);
+
+ if (c->writeq.length() == 0)
+ _phy.setNotifyWritable(c->sock,true);
+
+ const unsigned int mlen = len + 34;
+ c->writeq.push_back((char)((mlen >> 16) & 0xff));
+ c->writeq.push_back((char)((mlen >> 8) & 0xff));
+ c->writeq.push_back((char)(mlen & 0xff));
+
+ Utils::getSecureRandom(buf,16);
+
+ buf[24] = (uint8_t)CLUSTER_MESSAGE_STATE_OBJECT;
+ buf[25] = (uint8_t)type;
+ buf[26] = (uint8_t)((id >> 56) & 0xff);
+ buf[27] = (uint8_t)((id >> 48) & 0xff);
+ buf[28] = (uint8_t)((id >> 40) & 0xff);
+ buf[29] = (uint8_t)((id >> 32) & 0xff);
+ buf[30] = (uint8_t)((id >> 24) & 0xff);
+ buf[31] = (uint8_t)((id >> 16) & 0xff);
+ buf[32] = (uint8_t)((id >> 8) & 0xff);
+ buf[33] = (uint8_t)(id & 0xff);
+ memcpy(buf + 34,data,len);
+
+ uint8_t key[32];
+ memcpy(key,_clusterKey,32);
+ for(int i=0;i<8;++i) key[i] ^= buf[i];
+ Salsa20 s20(key,buf + 8);
+
+ uint8_t macKey[32];
+ uint8_t mac[16];
+ memset(macKey,0,32);
+ s20.crypt12(macKey,macKey,32);
+ s20.crypt12(buf + 24,buf + 24,mlen - 24);
+ Poly1305::compute(mac,buf + 24,mlen - 24,macKey);
+ memcpy(buf + 16,mac,8);
+
+ c->writeq.append(reinterpret_cast<char *>(buf),len + 34);
+ }
+ }
+ } catch ( ... ) {} // sanity check
+ delete [] buf;
+ }
+
// =========================================================================
// Handlers for Node and Phy<> callbacks
// =========================================================================
inline void phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *localAddr,const struct sockaddr *from,void *data,unsigned long len)
{
-/*
-#ifdef ZT_ENABLE_CLUSTER
- if (sock == _clusterMessageSocket) {
- _lastDirectReceiveFromGlobal = OSUtils::now();
- _node->clusterHandleIncomingMessage(data,len);
- return;
- }
-#endif
-*/
-
if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL))
_lastDirectReceiveFromGlobal = OSUtils::now();
@@ -1798,38 +1954,27 @@ public:
inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success)
{
- if (!success)
+ if (!success) {
+ phyOnTcpClose(sock,uptr);
return;
+ }
- // Outgoing TCP connections are always TCP fallback tunnel connections.
-
- TcpConnection *tc = new TcpConnection();
- _tcpConnections.insert(tc);
-
- tc->type = TcpConnection::TCP_TUNNEL_OUTGOING;
- tc->shouldKeepAlive = true;
- tc->parent = this;
+ TcpConnection *const tc = reinterpret_cast<TcpConnection *>(*uptr);
+ if (!tc) { // sanity check
+ _phy.close(sock,true);
+ return;
+ }
tc->sock = sock;
- // from and parser are not used
- tc->messageSize = 0; // unused
- tc->lastActivity = OSUtils::now();
- // HTTP stuff is not used
- tc->writeBuf = "";
- *uptr = (void *)tc;
-
- // Send "hello" message
- tc->writeBuf.push_back((char)0x17);
- tc->writeBuf.push_back((char)0x03);
- tc->writeBuf.push_back((char)0x03); // fake TLS 1.2 header
- tc->writeBuf.push_back((char)0x00);
- tc->writeBuf.push_back((char)0x04); // mlen == 4
- tc->writeBuf.push_back((char)ZEROTIER_ONE_VERSION_MAJOR);
- tc->writeBuf.push_back((char)ZEROTIER_ONE_VERSION_MINOR);
- tc->writeBuf.push_back((char)((ZEROTIER_ONE_VERSION_REVISION >> 8) & 0xff));
- tc->writeBuf.push_back((char)(ZEROTIER_ONE_VERSION_REVISION & 0xff));
- _phy.setNotifyWritable(sock,true);
-
- _tcpFallbackTunnel = tc;
+
+ if (tc->type == TcpConnection::TCP_TUNNEL_OUTGOING) {
+ if (_tcpFallbackTunnel)
+ _phy.close(_tcpFallbackTunnel->sock);
+ _tcpFallbackTunnel = tc;
+ _phy.streamSend(sock,ZT_TCP_TUNNEL_HELLO,sizeof(ZT_TCP_TUNNEL_HELLO));
+ } else if (tc->type == TcpConnection::TCP_CLUSTER_BACKPLANE) {
+ } else {
+ _phy.close(sock,true);
+ }
}
inline void phyOnTcpAccept(PhySocket *sockL,PhySocket *sockN,void **uptrL,void **uptrN,const struct sockaddr *from)
@@ -1839,149 +1984,313 @@ public:
return;
} else {
TcpConnection *tc = new TcpConnection();
- _tcpConnections.insert(tc);
- tc->type = TcpConnection::TCP_HTTP_INCOMING;
- tc->shouldKeepAlive = true;
+ {
+ Mutex::Lock _l(_tcpConnections_m);
+ _tcpConnections.push_back(tc);
+ }
+
+ tc->type = TcpConnection::TCP_UNCATEGORIZED_INCOMING;
tc->parent = this;
tc->sock = sockN;
tc->from = from;
+ tc->lastReceive = OSUtils::now();
http_parser_init(&(tc->parser),HTTP_REQUEST);
tc->parser.data = (void *)tc;
tc->messageSize = 0;
- tc->lastActivity = OSUtils::now();
- tc->currentHeaderField = "";
- tc->currentHeaderValue = "";
- tc->url = "";
- tc->status = "";
- tc->headers.clear();
- tc->body = "";
- tc->writeBuf = "";
+
*uptrN = (void *)tc;
}
}
- inline void phyOnTcpClose(PhySocket *sock,void **uptr)
+ void phyOnTcpClose(PhySocket *sock,void **uptr)
{
TcpConnection *tc = (TcpConnection *)*uptr;
if (tc) {
- if (tc == _tcpFallbackTunnel)
+ if (tc == _tcpFallbackTunnel) {
_tcpFallbackTunnel = (TcpConnection *)0;
- _tcpConnections.erase(tc);
+ }
+ {
+ Mutex::Lock _l(_tcpConnections_m);
+ _tcpConnections.erase(std::remove(_tcpConnections.begin(),_tcpConnections.end(),tc),_tcpConnections.end());
+ }
delete tc;
}
}
- inline void phyOnTcpData(PhySocket *sock,void **uptr,void *data,unsigned long len)
+ void phyOnTcpData(PhySocket *sock,void **uptr,void *data,unsigned long len)
{
- TcpConnection *tc = reinterpret_cast<TcpConnection *>(*uptr);
- switch(tc->type) {
+ try {
+ if (!len) return; // sanity check, should never happen
+ TcpConnection *tc = reinterpret_cast<TcpConnection *>(*uptr);
+ tc->lastReceive = OSUtils::now();
+ switch(tc->type) {
+
+ case TcpConnection::TCP_UNCATEGORIZED_INCOMING:
+ switch(reinterpret_cast<uint8_t *>(data)[0]) {
+ // 0x93 is first byte of cluster backplane connections
+ case 0x93: {
+ bool allow = false;
+ {
+ Mutex::Lock _l(_localConfig_m);
+ for(std::vector< InetAddress >::const_iterator i(_clusterBackplaneAddresses.begin());i!=_clusterBackplaneAddresses.end();++i) {
+ if (tc->from.ipsEqual(*i)) {
+ allow = true;
+ break;
+ }
+ }
+ }
+ if (allow) { // note that we also auth each packet cryptographically -- this is just a first line sanity check
+ tc->type = TcpConnection::TCP_CLUSTER_BACKPLANE;
+ tc->clusterMemberId = 0; // unknown, waiting for first status message
+ announceStatusToClusterMember(tc);
+ if (len > 1)
+ phyOnTcpData(sock,uptr,reinterpret_cast<uint8_t *>(data) + 1,len - 1);
+ } else {
+ _phy.close(sock);
+ }
+ } break;
+
+ // HTTP: GET, PUT, POST, HEAD
+ case 'G':
+ case 'P':
+ case 'H': {
+ bool allow;
+ {
+ Mutex::Lock _l(_localConfig_m);
+ if (_allowManagementFrom.size() == 0) {
+ allow = (tc->from.ipScope() == InetAddress::IP_SCOPE_LOOPBACK);
+ } else {
+ allow = false;
+ for(std::vector<InetAddress>::const_iterator i(_allowManagementFrom.begin());i!=_allowManagementFrom.end();++i) {
+ if (i->containsAddress(tc->from)) {
+ allow = true;
+ break;
+ }
+ }
+ }
+ }
+ if (allow) {
+ tc->type = TcpConnection::TCP_HTTP_INCOMING;
+ phyOnTcpData(sock,uptr,data,len);
+ } else {
+ _phy.close(sock);
+ }
+ } break;
+
+ // Drop unknown protocols
+ default:
+ _phy.close(sock);
+ break;
+ }
+ return;
- case TcpConnection::TCP_HTTP_INCOMING:
- case TcpConnection::TCP_HTTP_OUTGOING:
- http_parser_execute(&(tc->parser),&HTTP_PARSER_SETTINGS,(const char *)data,len);
- if ((tc->parser.upgrade)||(tc->parser.http_errno != HPE_OK)) {
- _phy.close(sock);
+ case TcpConnection::TCP_HTTP_INCOMING:
+ case TcpConnection::TCP_HTTP_OUTGOING:
+ http_parser_execute(&(tc->parser),&HTTP_PARSER_SETTINGS,(const char *)data,len);
+ if ((tc->parser.upgrade)||(tc->parser.http_errno != HPE_OK))
+ _phy.close(sock);
return;
- }
- break;
- case TcpConnection::TCP_TUNNEL_OUTGOING:
- tc->body.append((const char *)data,len);
- while (tc->body.length() >= 5) {
- const char *data = tc->body.data();
- const unsigned long mlen = ( ((((unsigned long)data[3]) & 0xff) << 8) | (((unsigned long)data[4]) & 0xff) );
- if (tc->body.length() >= (mlen + 5)) {
- InetAddress from;
+ case TcpConnection::TCP_TUNNEL_OUTGOING:
+ tc->readq.append((const char *)data,len);
+ while (tc->readq.length() >= 5) {
+ const char *data = tc->readq.data();
+ const unsigned long mlen = ( ((((unsigned long)data[3]) & 0xff) << 8) | (((unsigned long)data[4]) & 0xff) );
+ if (tc->readq.length() >= (mlen + 5)) {
+ InetAddress from;
unsigned long plen = mlen; // payload length, modified if there's an IP header
- data += 5; // skip forward past pseudo-TLS junk and mlen
- if (plen == 4) {
- // Hello message, which isn't sent by proxy and would be ignored by client
- } else if (plen) {
- // Messages should contain IPv4 or IPv6 source IP address data
- switch(data[0]) {
- case 4: // IPv4
- if (plen >= 7) {
- from.set((const void *)(data + 1),4,((((unsigned int)data[5]) & 0xff) << 8) | (((unsigned int)data[6]) & 0xff));
- data += 7; // type + 4 byte IP + 2 byte port
- plen -= 7;
- } else {
+ data += 5; // skip forward past pseudo-TLS junk and mlen
+ if (plen == 4) {
+ // Hello message, which isn't sent by proxy and would be ignored by client
+ } else if (plen) {
+ // Messages should contain IPv4 or IPv6 source IP address data
+ switch(data[0]) {
+ case 4: // IPv4
+ if (plen >= 7) {
+ from.set((const void *)(data + 1),4,((((unsigned int)data[5]) & 0xff) << 8) | (((unsigned int)data[6]) & 0xff));
+ data += 7; // type + 4 byte IP + 2 byte port
+ plen -= 7;
+ } else {
+ _phy.close(sock);
+ return;
+ }
+ break;
+ case 6: // IPv6
+ if (plen >= 19) {
+ from.set((const void *)(data + 1),16,((((unsigned int)data[17]) & 0xff) << 8) | (((unsigned int)data[18]) & 0xff));
+ data += 19; // type + 16 byte IP + 2 byte port
+ plen -= 19;
+ } else {
+ _phy.close(sock);
+ return;
+ }
+ break;
+ case 0: // none/omitted
+ ++data;
+ --plen;
+ break;
+ default: // invalid address type
_phy.close(sock);
return;
- }
- break;
- case 6: // IPv6
- if (plen >= 19) {
- from.set((const void *)(data + 1),16,((((unsigned int)data[17]) & 0xff) << 8) | (((unsigned int)data[18]) & 0xff));
- data += 19; // type + 16 byte IP + 2 byte port
- plen -= 19;
- } else {
+ }
+
+ if (from) {
+ InetAddress fakeTcpLocalInterfaceAddress((uint32_t)0xffffffff,0xffff);
+ const ZT_ResultCode rc = _node->processWirePacket(
+ (void *)0,
+ OSUtils::now(),
+ reinterpret_cast<struct sockaddr_storage *>(&fakeTcpLocalInterfaceAddress),
+ reinterpret_cast<struct sockaddr_storage *>(&from),
+ data,
+ plen,
+ &_nextBackgroundTaskDeadline);
+ if (ZT_ResultCode_isFatal(rc)) {
+ char tmp[256];
+ Utils::snprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc);
+ Mutex::Lock _l(_termReason_m);
+ _termReason = ONE_UNRECOVERABLE_ERROR;
+ _fatalErrorMessage = tmp;
+ this->terminate();
_phy.close(sock);
return;
}
+ }
+ }
+
+ if (tc->readq.length() > (mlen + 5))
+ tc->readq.erase(tc->readq.begin(),tc->readq.begin() + (mlen + 5));
+ else tc->readq.clear();
+ } else break;
+ }
+ return;
+
+ case TcpConnection::TCP_CLUSTER_BACKPLANE:
+ tc->readq.append((const char *)data,len);
+ if (tc->readq.length() >= 28) { // got 3-byte message size + 16-byte IV + 8-byte MAC + 1-byte type (encrypted)
+ uint8_t *data = reinterpret_cast<uint8_t *>(const_cast<char *>(tc->readq.data()));
+ unsigned long mlen = ( ((unsigned long)data[0] << 16) | ((unsigned long)data[1] << 8) | (unsigned long)data[2] );
+ if ((mlen < 25)||(mlen > ZT_TCP_MAX_WRITEQ_SIZE)) {
+ _phy.close(sock);
+ return;
+ } else if (tc->readq.length() >= (mlen + 3)) { // got entire message
+ data += 3;
+
+ uint8_t key[32];
+ memcpy(key,_clusterKey,32);
+ for(int i=0;i<8;++i) key[i] ^= data[i]; // first 8 bytes of IV get XORed with key
+ Salsa20 s20(key,data + 8); // last 8 bytes of IV are fed into Salsa20 directly as its 64-bit IV
+
+ uint8_t macKey[32];
+ uint8_t mac[16];
+ memset(macKey,0,32);
+ s20.crypt12(macKey,macKey,32);
+ Poly1305::compute(mac,data + 24,mlen - 24,macKey);
+ if (!Utils::secureEq(mac,data + 16,8)) {
+ _phy.close(sock);
+ return;
+ }
+ s20.crypt12(data + 24,data + 24,mlen - 24);
+
+ switch((ClusterMessageType)data[24]) {
+ case CLUSTER_MESSAGE_STATUS:
+ if (mlen > (25 + 16)) {
+ Buffer<4096> tmp(data + 25,mlen - 25);
+ try {
+ tc->clusterMemberId = tmp.at<uint64_t>(0);
+ if (tc->clusterMemberId == _clusterMemberId) { // shouldn't happen, but don't allow self-to-self
+ _phy.close(sock);
+ return;
+ }
+ tc->clusterMemberVersionMajor = tmp.at<uint16_t>(8);
+ tc->clusterMemberVersionMinor = tmp.at<uint16_t>(10);
+ tc->clusterMemberVersionRev = tmp.at<uint16_t>(12);
+ const unsigned int clusterMemberLocalAddressCount = tmp.at<uint16_t>(14);
+ std::vector<InetAddress> la;
+ unsigned int ptr = 16;
+ for(unsigned int k=0;k<clusterMemberLocalAddressCount;++k) {
+ la.push_back(InetAddress());
+ ptr += la.back().deserialize(tmp,ptr);
+ }
+ tc->clusterMemberLocalAddresses.swap(la);
+ } catch ( ... ) {}
+ }
+ break;
+
+ case CLUSTER_MESSAGE_STATE_OBJECT:
+ if (mlen >= (25 + 9)) { // type + object ID + [data]
+ const uint64_t objId = (
+ ((uint64_t)data[26] << 56) |
+ ((uint64_t)data[27] << 48) |
+ ((uint64_t)data[28] << 40) |
+ ((uint64_t)data[29] << 32) |
+ ((uint64_t)data[30] << 24) |
+ ((uint64_t)data[31] << 16) |
+ ((uint64_t)data[32] << 8) |
+ (uint64_t)data[33]
+ );
+ if (_node->processStateUpdate((void *)0,(ZT_StateObjectType)data[25],objId,data + 34,(unsigned int)(mlen - 34)) == ZT_RESULT_OK)
+ replicateStateObjectToCluster((ZT_StateObjectType)data[25],objId,data + 34,(unsigned int)(mlen - 34),tc->clusterMemberId);
+ }
break;
- case 0: // none/omitted
- ++data;
- --plen;
+
+ case CLUSTER_MESSAGE_PROXY_SEND:
+ if (mlen > 25) {
+ Buffer<4096> tmp(data + 25,mlen - 25);
+ try {
+ InetAddress dest,src;
+ unsigned int ptr = dest.deserialize(tmp);
+ ptr += src.deserialize(tmp,ptr);
+ if (ptr < tmp.size()) {
+ bool local;
+ {
+ Mutex::Lock _l(_localInterfaceAddresses_m);
+ local = (std::find(_localInterfaceAddresses.begin(),_localInterfaceAddresses.end(),src) != _localInterfaceAddresses.end());
+ }
+ if (local)
+ nodeWirePacketSendFunction(&src,&dest,reinterpret_cast<const uint8_t *>(tmp.data()) + ptr,tmp.size() - ptr,0);
+ }
+ } catch ( ... ) {}
+ }
break;
- default: // invalid address type
- _phy.close(sock);
- return;
}
- if (from) {
- InetAddress fakeTcpLocalInterfaceAddress((uint32_t)0xffffffff,0xffff);
- const ZT_ResultCode rc = _node->processWirePacket(
- (void *)0,
- OSUtils::now(),
- reinterpret_cast<struct sockaddr_storage *>(&fakeTcpLocalInterfaceAddress),
- reinterpret_cast<struct sockaddr_storage *>(&from),
- data,
- plen,
- &_nextBackgroundTaskDeadline);
- if (ZT_ResultCode_isFatal(rc)) {
- char tmp[256];
- Utils::snprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc);
- Mutex::Lock _l(_termReason_m);
- _termReason = ONE_UNRECOVERABLE_ERROR;
- _fatalErrorMessage = tmp;
- this->terminate();
- _phy.close(sock);
- return;
- }
- }
+ tc->readq.erase(tc->readq.begin(),tc->readq.begin() + mlen);
}
+ }
+ return;
- if (tc->body.length() > (mlen + 5))
- tc->body = tc->body.substr(mlen + 5);
- else tc->body = "";
- } else break;
- }
- break;
-
+ }
+ } catch ( ... ) {
+ _phy.close(sock);
}
}
inline void phyOnTcpWritable(PhySocket *sock,void **uptr)
{
TcpConnection *tc = reinterpret_cast<TcpConnection *>(*uptr);
- Mutex::Lock _l(tc->writeBuf_m);
- if (tc->writeBuf.length() > 0) {
- long sent = (long)_phy.streamSend(sock,tc->writeBuf.data(),(unsigned long)tc->writeBuf.length(),true);
- if (sent > 0) {
- tc->lastActivity = OSUtils::now();
- if ((unsigned long)sent >= (unsigned long)tc->writeBuf.length()) {
- tc->writeBuf.clear();
- _phy.setNotifyWritable(sock,false);
- if (!tc->shouldKeepAlive)
- _phy.close(sock); // will call close handler to delete from _tcpConnections
- } else {
- tc->writeBuf.erase(tc->writeBuf.begin(),tc->writeBuf.begin() + sent);
+ bool closeit = false;
+ {
+ Mutex::Lock _l(tc->writeq_m);
+ if (tc->writeq.length() > 0) {
+ long sent = (long)_phy.streamSend(sock,tc->writeq.data(),(unsigned long)tc->writeq.length(),true);
+ if (sent > 0) {
+ if ((unsigned long)sent >= (unsigned long)tc->writeq.length()) {
+ tc->writeq.clear();
+ _phy.setNotifyWritable(sock,false);
+
+ if (tc->type == TcpConnection::TCP_HTTP_INCOMING)
+ closeit = true; // HTTP keep alive not supported
+ } else {
+ tc->writeq.erase(tc->writeq.begin(),tc->writeq.begin() + sent);
+ }
}
+ } else {
+ _phy.setNotifyWritable(sock,false);
}
- } else {
- _phy.setNotifyWritable(sock,false);
}
+ if (closeit)
+ _phy.close(sock);
}
inline void phyOnFileDescriptorActivity(PhySocket *sock,void **uptr,bool readable,bool writable) {}
@@ -2148,6 +2457,27 @@ public:
inline int nodeStateGetFunction(enum ZT_StateObjectType type,uint64_t id,void *data,unsigned int maxlen)
{
+ char p[4096];
+ FILE *f = (FILE *)0;
+ switch(type) {
+ case ZT_STATE_OBJECT_IDENTITY_PUBLIC:
+ break;
+ case ZT_STATE_OBJECT_IDENTITY_SECRET:
+ break;
+ case ZT_STATE_OBJECT_PEER_IDENTITY:
+ break;
+ case ZT_STATE_OBJECT_NETWORK_CONFIG:
+ break;
+ case ZT_STATE_OBJECT_PLANET:
+ break;
+ case ZT_STATE_OBJECT_MOON:
+ break;
+ default:
+ return -1;
+ }
+ if (f) {
+ }
+ return -1;
}
inline int nodeWirePacketSendFunction(const struct sockaddr_storage *localAddr,const struct sockaddr_storage *addr,const void *data,unsigned int len,unsigned int ttl)
@@ -2178,28 +2508,41 @@ public:
const uint64_t now = OSUtils::now();
if (((now - _lastDirectReceiveFromGlobal) > ZT_TCP_FALLBACK_AFTER)&&((now - _lastRestart) > ZT_TCP_FALLBACK_AFTER)) {
if (_tcpFallbackTunnel) {
- Mutex::Lock _l(_tcpFallbackTunnel->writeBuf_m);
- if (!_tcpFallbackTunnel->writeBuf.length())
+ Mutex::Lock _l(_tcpFallbackTunnel->writeq_m);
+ if (_tcpFallbackTunnel->writeq.length() == 0)
_phy.setNotifyWritable(_tcpFallbackTunnel->sock,true);
- unsigned long mlen = len + 7;
- _tcpFallbackTunnel->writeBuf.push_back((char)0x17);
- _tcpFallbackTunnel->writeBuf.push_back((char)0x03);
- _tcpFallbackTunnel->writeBuf.push_back((char)0x03); // fake TLS 1.2 header
- _tcpFallbackTunnel->writeBuf.push_back((char)((mlen >> 8) & 0xff));
- _tcpFallbackTunnel->writeBuf.push_back((char)(mlen & 0xff));
- _tcpFallbackTunnel->writeBuf.push_back((char)4); // IPv4
- _tcpFallbackTunnel->writeBuf.append(reinterpret_cast<const char *>(reinterpret_cast<const void *>(&(reinterpret_cast<const struct sockaddr_in *>(addr)->sin_addr.s_addr))),4);
- _tcpFallbackTunnel->writeBuf.append(reinterpret_cast<const char *>(reinterpret_cast<const void *>(&(reinterpret_cast<const struct sockaddr_in *>(addr)->sin_port))),2);
- _tcpFallbackTunnel->writeBuf.append((const char *)data,len);
+ const unsigned long mlen = len + 7;
+ _tcpFallbackTunnel->writeq.push_back((char)0x17);
+ _tcpFallbackTunnel->writeq.push_back((char)0x03);
+ _tcpFallbackTunnel->writeq.push_back((char)0x03); // fake TLS 1.2 header
+ _tcpFallbackTunnel->writeq.push_back((char)((mlen >> 8) & 0xff));
+ _tcpFallbackTunnel->writeq.push_back((char)(mlen & 0xff));
+ _tcpFallbackTunnel->writeq.push_back((char)4); // IPv4
+ _tcpFallbackTunnel->writeq.append(reinterpret_cast<const char *>(reinterpret_cast<const void *>(&(reinterpret_cast<const struct sockaddr_in *>(addr)->sin_addr.s_addr))),4);
+ _tcpFallbackTunnel->writeq.append(reinterpret_cast<const char *>(reinterpret_cast<const void *>(&(reinterpret_cast<const struct sockaddr_in *>(addr)->sin_port))),2);
+ _tcpFallbackTunnel->writeq.append((const char *)data,len);
} else if (((now - _lastSendToGlobalV4) < ZT_TCP_FALLBACK_AFTER)&&((now - _lastSendToGlobalV4) > (ZT_PING_CHECK_INVERVAL / 2))) {
bool connected = false;
const InetAddress addr(ZT_TCP_FALLBACK_RELAY);
- _phy.tcpConnect(reinterpret_cast<const struct sockaddr *>(&addr),connected);
+
+ TcpConnection *tc = new TcpConnection();
+ {
+ Mutex::Lock _l(_tcpConnections_m);
+ _tcpConnections.push_back(tc);
+ }
+
+ tc->type = TcpConnection::TCP_TUNNEL_OUTGOING;
+ tc->parent = this;
+ tc->sock = (PhySocket *)0; // set in connect handler
+ tc->messageSize = 0;
+
+ _phy.tcpConnect(reinterpret_cast<const struct sockaddr *>(&addr),connected,(void *)tc,true);
}
}
_lastSendToGlobalV4 = now;
}
#endif // ZT_TCP_FALLBACK_RELAY
+
} else if (addr->ss_family == AF_INET6) {
if (reinterpret_cast<const struct sockaddr_in6 *>(localAddr)->sin6_port != 0) {
const uint16_t lp = reinterpret_cast<const struct sockaddr_in6 *>(localAddr)->sin6_port;
@@ -2298,39 +2641,22 @@ public:
inline void onHttpRequestToServer(TcpConnection *tc)
{
- char tmpn[256];
+ char tmpn[4096];
std::string data;
std::string contentType("text/plain"); // default if not changed in handleRequest()
unsigned int scode = 404;
- bool allow;
- {
- Mutex::Lock _l(_localConfig_m);
- if (_allowManagementFrom.size() == 0) {
- allow = (tc->from.ipScope() == InetAddress::IP_SCOPE_LOOPBACK);
- } else {
- allow = false;
- for(std::vector<InetAddress>::const_iterator i(_allowManagementFrom.begin());i!=_allowManagementFrom.end();++i) {
- if (i->containsAddress(tc->from)) {
- allow = true;
- break;
- }
- }
- }
- }
+ // Note that we check allowed IP ranges when HTTP connections are first detected in
+ // phyOnTcpData(). If we made it here the source IP is okay.
- if (allow) {
- try {
- scode = handleControlPlaneHttpRequest(tc->from,tc->parser.method,tc->url,tc->headers,tc->body,data,contentType);
- } catch (std::exception &exc) {
- fprintf(stderr,"WARNING: unexpected exception processing control HTTP request: %s" ZT_EOL_S,exc.what());
- scode = 500;
- } catch ( ... ) {
- fprintf(stderr,"WARNING: unexpected exception processing control HTTP request: unknown exceptino" ZT_EOL_S);
- scode = 500;
- }
- } else {
- scode = 401;
+ try {
+ scode = handleControlPlaneHttpRequest(tc->from,tc->parser.method,tc->url,tc->headers,tc->readq,data,contentType);
+ } catch (std::exception &exc) {
+ fprintf(stderr,"WARNING: unexpected exception processing control HTTP request: %s" ZT_EOL_S,exc.what());
+ scode = 500;
+ } catch ( ... ) {
+ fprintf(stderr,"WARNING: unexpected exception processing control HTTP request: unknown exceptino" ZT_EOL_S);
+ scode = 500;
}
const char *scodestr;
@@ -2346,19 +2672,16 @@ public:
default: scodestr = "Error"; break;
}
- Utils::snprintf(tmpn,sizeof(tmpn),"HTTP/1.1 %.3u %s\r\nCache-Control: no-cache\r\nPragma: no-cache\r\n",scode,scodestr);
+ Utils::snprintf(tmpn,sizeof(tmpn),"HTTP/1.1 %.3u %s\r\nCache-Control: no-cache\r\nPragma: no-cache\r\nContent-Type: %s\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+ scode,
+ scodestr,
+ contentType.c_str(),
+ (unsigned long)data.length());
{
- Mutex::Lock _l(tc->writeBuf_m);
- tc->writeBuf.assign(tmpn);
- tc->writeBuf.append("Content-Type: ");
- tc->writeBuf.append(contentType);
- Utils::snprintf(tmpn,sizeof(tmpn),"\r\nContent-Length: %lu\r\n",(unsigned long)data.length());
- tc->writeBuf.append(tmpn);
- if (!tc->shouldKeepAlive)
- tc->writeBuf.append("Connection: close\r\n");
- tc->writeBuf.append("\r\n");
+ Mutex::Lock _l(tc->writeq_m);
+ tc->writeq = tmpn;
if (tc->parser.method != HTTP_HEAD)
- tc->writeBuf.append(data);
+ tc->writeq.append(data);
}
_phy.setNotifyWritable(tc->sock,true);
@@ -2366,8 +2689,7 @@ public:
inline void onHttpResponseFromClient(TcpConnection *tc)
{
- if (!tc->shouldKeepAlive)
- _phy.close(tc->sock); // will call close handler, which deletes from _tcpConnections
+ _phy.close(tc->sock);
}
bool shouldBindInterface(const char *ifname,const InetAddress &ifaddr)
@@ -2472,10 +2794,10 @@ static int ShttpOnMessageBegin(http_parser *parser)
tc->currentHeaderField = "";
tc->currentHeaderValue = "";
tc->messageSize = 0;
- tc->url = "";
- tc->status = "";
+ tc->url.clear();
+ tc->status.clear();
tc->headers.clear();
- tc->body = "";
+ tc->readq.clear();
return 0;
}
static int ShttpOnUrl(http_parser *parser,const char *ptr,size_t length)
@@ -2492,16 +2814,7 @@ static int ShttpOnStatus(http_parser *parser,const char *ptr,size_t length)
#else
static int ShttpOnStatus(http_parser *parser)
#endif
-{
- /*
- TcpConnection *tc = reinterpret_cast<TcpConnection *>(parser->data);
- tc->messageSize += (unsigned long)length;
- if (tc->messageSize > ZT_MAX_HTTP_MESSAGE_SIZE)
- return -1;
- tc->status.append(ptr,length);
- */
- return 0;
-}
+{ return 0; }
static int ShttpOnHeaderField(http_parser *parser,const char *ptr,size_t length)
{
TcpConnection *tc = reinterpret_cast<TcpConnection *>(parser->data);
@@ -2539,14 +2852,12 @@ static int ShttpOnBody(http_parser *parser,const char *ptr,size_t length)
tc->messageSize += (unsigned long)length;
if (tc->messageSize > ZT_MAX_HTTP_MESSAGE_SIZE)
return -1;
- tc->body.append(ptr,length);
+ tc->readq.append(ptr,length);
return 0;
}
static int ShttpOnMessageComplete(http_parser *parser)
{
TcpConnection *tc = reinterpret_cast<TcpConnection *>(parser->data);
- tc->shouldKeepAlive = (http_should_keep_alive(parser) != 0);
- tc->lastActivity = OSUtils::now();
if (tc->type == TcpConnection::TCP_HTTP_INCOMING) {
tc->parent->onHttpRequestToServer(tc);
} else {