summaryrefslogtreecommitdiff
path: root/node
diff options
context:
space:
mode:
Diffstat (limited to 'node')
-rw-r--r--node/Address.hpp4
-rw-r--r--node/Buffer.hpp53
-rw-r--r--node/Cluster.cpp1042
-rw-r--r--node/Cluster.hpp463
-rw-r--r--node/Constants.hpp6
-rw-r--r--node/Dictionary.hpp4
-rw-r--r--node/Hashtable.hpp7
-rw-r--r--node/IncomingPacket.cpp1
-rw-r--r--node/InetAddress.cpp8
-rw-r--r--node/MAC.hpp2
-rw-r--r--node/MulticastGroup.hpp2
-rw-r--r--node/Network.cpp59
-rw-r--r--node/Network.hpp3
-rw-r--r--node/NetworkConfig.cpp2
-rw-r--r--node/Node.cpp295
-rw-r--r--node/Node.hpp64
-rw-r--r--node/Peer.cpp1
-rw-r--r--node/Switch.cpp1
-rw-r--r--node/Topology.cpp70
-rw-r--r--node/Utils.cpp5
-rw-r--r--node/Utils.hpp3
21 files changed, 304 insertions, 1791 deletions
diff --git a/node/Address.hpp b/node/Address.hpp
index 9d2d1734..98e32858 100644
--- a/node/Address.hpp
+++ b/node/Address.hpp
@@ -144,7 +144,7 @@ public:
inline std::string toString() const
{
char buf[16];
- Utils::snprintf(buf,sizeof(buf),"%.10llx",(unsigned long long)_a);
+ Utils::ztsnprintf(buf,sizeof(buf),"%.10llx",(unsigned long long)_a);
return std::string(buf);
};
@@ -154,7 +154,7 @@ public:
*/
inline void toString(char *buf,unsigned int len) const
{
- Utils::snprintf(buf,len,"%.10llx",(unsigned long long)_a);
+ Utils::ztsnprintf(buf,len,"%.10llx",(unsigned long long)_a);
}
/**
diff --git a/node/Buffer.hpp b/node/Buffer.hpp
index 8e6b78fd..69ee1758 100644
--- a/node/Buffer.hpp
+++ b/node/Buffer.hpp
@@ -93,7 +93,6 @@ public:
}
Buffer(unsigned int l)
- throw(std::out_of_range)
{
if (l > C)
throw std::out_of_range("Buffer: construct with size larger than capacity");
@@ -102,51 +101,42 @@ public:
template<unsigned int C2>
Buffer(const Buffer<C2> &b)
- throw(std::out_of_range)
{
*this = b;
}
Buffer(const void *b,unsigned int l)
- throw(std::out_of_range)
{
copyFrom(b,l);
}
Buffer(const std::string &s)
- throw(std::out_of_range)
{
copyFrom(s.data(),s.length());
}
template<unsigned int C2>
inline Buffer &operator=(const Buffer<C2> &b)
- throw(std::out_of_range)
{
if (unlikely(b._l > C))
throw std::out_of_range("Buffer: assignment from buffer larger than capacity");
- memcpy(_b,b._b,_l = b._l);
- return *this;
- }
-
- inline Buffer &operator=(const std::string &s)
- throw(std::out_of_range)
- {
- copyFrom(s.data(),s.length());
+ if (C2 == C) {
+ memcpy(this,&b,sizeof(Buffer<C>));
+ } else {
+ memcpy(_b,b._b,_l = b._l);
+ }
return *this;
}
inline void copyFrom(const void *b,unsigned int l)
- throw(std::out_of_range)
{
if (unlikely(l > C))
throw std::out_of_range("Buffer: set from C array larger than capacity");
- _l = l;
memcpy(_b,b,l);
+ _l = l;
}
unsigned char operator[](const unsigned int i) const
- throw(std::out_of_range)
{
if (unlikely(i >= _l))
throw std::out_of_range("Buffer: [] beyond end of data");
@@ -154,7 +144,6 @@ public:
}
unsigned char &operator[](const unsigned int i)
- throw(std::out_of_range)
{
if (unlikely(i >= _l))
throw std::out_of_range("Buffer: [] beyond end of data");
@@ -175,14 +164,12 @@ public:
* @throws std::out_of_range Field extends beyond data size
*/
unsigned char *field(unsigned int i,unsigned int l)
- throw(std::out_of_range)
{
if (unlikely((i + l) > _l))
throw std::out_of_range("Buffer: field() beyond end of data");
return (unsigned char *)(_b + i);
}
const unsigned char *field(unsigned int i,unsigned int l) const
- throw(std::out_of_range)
{
if (unlikely((i + l) > _l))
throw std::out_of_range("Buffer: field() beyond end of data");
@@ -198,7 +185,6 @@ public:
*/
template<typename T>
inline void setAt(unsigned int i,const T v)
- throw(std::out_of_range)
{
if (unlikely((i + sizeof(T)) > _l))
throw std::out_of_range("Buffer: setAt() beyond end of data");
@@ -221,7 +207,6 @@ public:
*/
template<typename T>
inline T at(unsigned int i) const
- throw(std::out_of_range)
{
if (unlikely((i + sizeof(T)) > _l))
throw std::out_of_range("Buffer: at() beyond end of data");
@@ -248,7 +233,6 @@ public:
*/
template<typename T>
inline void append(const T v)
- throw(std::out_of_range)
{
if (unlikely((_l + sizeof(T)) > C))
throw std::out_of_range("Buffer: append beyond capacity");
@@ -271,7 +255,6 @@ public:
* @throws std::out_of_range Attempt to append beyond capacity
*/
inline void append(unsigned char c,unsigned int n)
- throw(std::out_of_range)
{
if (unlikely((_l + n) > C))
throw std::out_of_range("Buffer: append beyond capacity");
@@ -280,6 +263,19 @@ public:
}
/**
+ * Append secure random bytes
+ *
+ * @param n Number of random bytes to append
+ */
+ inline void appendRandom(unsigned int n)
+ {
+ if (unlikely((_l + n) > C))
+ throw std::out_of_range("Buffer: append beyond capacity");
+ Utils::getSecureRandom(_b + _l,n);
+ _l += n;
+ }
+
+ /**
* Append a C-array of bytes
*
* @param b Data
@@ -287,7 +283,6 @@ public:
* @throws std::out_of_range Attempt to append beyond capacity
*/
inline void append(const void *b,unsigned int l)
- throw(std::out_of_range)
{
if (unlikely((_l + l) > C))
throw std::out_of_range("Buffer: append beyond capacity");
@@ -302,7 +297,6 @@ public:
* @throws std::out_of_range Attempt to append beyond capacity
*/
inline void append(const std::string &s)
- throw(std::out_of_range)
{
append(s.data(),(unsigned int)s.length());
}
@@ -314,7 +308,6 @@ public:
* @throws std::out_of_range Attempt to append beyond capacity
*/
inline void appendCString(const char *s)
- throw(std::out_of_range)
{
for(;;) {
if (unlikely(_l >= C))
@@ -333,7 +326,6 @@ public:
*/
template<unsigned int C2>
inline void append(const Buffer<C2> &b)
- throw(std::out_of_range)
{
append(b._b,b._l);
}
@@ -349,7 +341,6 @@ public:
* @return Pointer to beginning of appended field of length 'l'
*/
inline char *appendField(unsigned int l)
- throw(std::out_of_range)
{
if (unlikely((_l + l) > C))
throw std::out_of_range("Buffer: append beyond capacity");
@@ -367,7 +358,6 @@ public:
* @throws std::out_of_range Capacity exceeded
*/
inline void addSize(unsigned int i)
- throw(std::out_of_range)
{
if (unlikely((i + _l) > C))
throw std::out_of_range("Buffer: setSize to larger than capacity");
@@ -383,7 +373,6 @@ public:
* @throws std::out_of_range Size larger than capacity
*/
inline void setSize(const unsigned int i)
- throw(std::out_of_range)
{
if (unlikely(i > C))
throw std::out_of_range("Buffer: setSize to larger than capacity");
@@ -397,7 +386,6 @@ public:
* @throw std::out_of_range Position is beyond size of buffer
*/
inline void behead(const unsigned int at)
- throw(std::out_of_range)
{
if (!at)
return;
@@ -414,7 +402,6 @@ public:
* @throw std::out_of_range Position plus length is beyond size of buffer
*/
inline void erase(const unsigned int at,const unsigned int length)
- throw(std::out_of_range)
{
const unsigned int endr = at + length;
if (unlikely(endr > _l))
@@ -495,8 +482,8 @@ public:
}
private:
- unsigned int _l;
char ZT_VAR_MAY_ALIAS _b[C];
+ unsigned int _l;
};
} // namespace ZeroTier
diff --git a/node/Cluster.cpp b/node/Cluster.cpp
deleted file mode 100644
index 119aec29..00000000
--- a/node/Cluster.cpp
+++ /dev/null
@@ -1,1042 +0,0 @@
-/*
- * ZeroTier One - Network Virtualization Everywhere
- * Copyright (C) 2011-2017 ZeroTier, Inc. https://www.zerotier.com/
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * --
- *
- * You can be released from the requirements of the license by purchasing
- * a commercial license. Buying such a license is mandatory as soon as you
- * develop commercial closed-source software that incorporates or links
- * directly against ZeroTier software without disclosing the source code
- * of your own application.
- */
-
-#ifdef ZT_ENABLE_CLUSTER
-
-#include <stdint.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <math.h>
-
-#include <map>
-#include <algorithm>
-#include <set>
-#include <utility>
-#include <list>
-#include <stdexcept>
-
-#include "../version.h"
-
-#include "Cluster.hpp"
-#include "RuntimeEnvironment.hpp"
-#include "MulticastGroup.hpp"
-#include "CertificateOfMembership.hpp"
-#include "Salsa20.hpp"
-#include "Poly1305.hpp"
-#include "Identity.hpp"
-#include "Topology.hpp"
-#include "Packet.hpp"
-#include "Switch.hpp"
-#include "Node.hpp"
-#include "Network.hpp"
-#include "Array.hpp"
-
-namespace ZeroTier {
-
-static inline double _dist3d(int x1,int y1,int z1,int x2,int y2,int z2)
- throw()
-{
- double dx = ((double)x2 - (double)x1);
- double dy = ((double)y2 - (double)y1);
- double dz = ((double)z2 - (double)z1);
- return sqrt((dx * dx) + (dy * dy) + (dz * dz));
-}
-
-// An entry in _ClusterSendQueue
-struct _ClusterSendQueueEntry
-{
- uint64_t timestamp;
- Address fromPeerAddress;
- Address toPeerAddress;
- // if we ever support larger transport MTUs this must be increased
- unsigned char data[ZT_CLUSTER_SEND_QUEUE_DATA_MAX];
- unsigned int len;
- bool unite;
-};
-
-// A multi-index map with entry memory pooling -- this allows our queue to
-// be O(log(N)) and is complex enough that it makes the code a lot cleaner
-// to break it out from Cluster.
-class _ClusterSendQueue
-{
-public:
- _ClusterSendQueue() :
- _poolCount(0) {}
- ~_ClusterSendQueue() {} // memory is automatically freed when _chunks is destroyed
-
- inline void enqueue(uint64_t now,const Address &from,const Address &to,const void *data,unsigned int len,bool unite)
- {
- if (len > ZT_CLUSTER_SEND_QUEUE_DATA_MAX)
- return;
-
- Mutex::Lock _l(_lock);
-
- // Delete oldest queue entry for this sender if this enqueue() would take them over the per-sender limit
- {
- std::set< std::pair<Address,_ClusterSendQueueEntry *> >::iterator qi(_bySrc.lower_bound(std::pair<Address,_ClusterSendQueueEntry *>(from,(_ClusterSendQueueEntry *)0)));
- std::set< std::pair<Address,_ClusterSendQueueEntry *> >::iterator oldest(qi);
- unsigned long countForSender = 0;
- while ((qi != _bySrc.end())&&(qi->first == from)) {
- if (qi->second->timestamp < oldest->second->timestamp)
- oldest = qi;
- ++countForSender;
- ++qi;
- }
- if (countForSender >= ZT_CLUSTER_MAX_QUEUE_PER_SENDER) {
- _byDest.erase(std::pair<Address,_ClusterSendQueueEntry *>(oldest->second->toPeerAddress,oldest->second));
- _pool[_poolCount++] = oldest->second;
- _bySrc.erase(oldest);
- }
- }
-
- _ClusterSendQueueEntry *e;
- if (_poolCount > 0) {
- e = _pool[--_poolCount];
- } else {
- if (_chunks.size() >= ZT_CLUSTER_MAX_QUEUE_CHUNKS)
- return; // queue is totally full!
- _chunks.push_back(Array<_ClusterSendQueueEntry,ZT_CLUSTER_QUEUE_CHUNK_SIZE>());
- e = &(_chunks.back().data[0]);
- for(unsigned int i=1;i<ZT_CLUSTER_QUEUE_CHUNK_SIZE;++i)
- _pool[_poolCount++] = &(_chunks.back().data[i]);
- }
-
- e->timestamp = now;
- e->fromPeerAddress = from;
- e->toPeerAddress = to;
- memcpy(e->data,data,len);
- e->len = len;
- e->unite = unite;
-
- _bySrc.insert(std::pair<Address,_ClusterSendQueueEntry *>(from,e));
- _byDest.insert(std::pair<Address,_ClusterSendQueueEntry *>(to,e));
- }
-
- inline void expire(uint64_t now)
- {
- Mutex::Lock _l(_lock);
- for(std::set< std::pair<Address,_ClusterSendQueueEntry *> >::iterator qi(_bySrc.begin());qi!=_bySrc.end();) {
- if ((now - qi->second->timestamp) > ZT_CLUSTER_QUEUE_EXPIRATION) {
- _byDest.erase(std::pair<Address,_ClusterSendQueueEntry *>(qi->second->toPeerAddress,qi->second));
- _pool[_poolCount++] = qi->second;
- _bySrc.erase(qi++);
- } else ++qi;
- }
- }
-
- /**
- * Get and dequeue entries for a given destination address
- *
- * After use these entries must be returned with returnToPool()!
- *
- * @param dest Destination address
- * @param results Array to fill with results
- * @param maxResults Size of results[] in pointers
- * @return Number of actual results returned
- */
- inline unsigned int getByDest(const Address &dest,_ClusterSendQueueEntry **results,unsigned int maxResults)
- {
- unsigned int count = 0;
- Mutex::Lock _l(_lock);
- std::set< std::pair<Address,_ClusterSendQueueEntry *> >::iterator qi(_byDest.lower_bound(std::pair<Address,_ClusterSendQueueEntry *>(dest,(_ClusterSendQueueEntry *)0)));
- while ((qi != _byDest.end())&&(qi->first == dest)) {
- _bySrc.erase(std::pair<Address,_ClusterSendQueueEntry *>(qi->second->fromPeerAddress,qi->second));
- results[count++] = qi->second;
- if (count == maxResults)
- break;
- _byDest.erase(qi++);
- }
- return count;
- }
-
- /**
- * Return entries to pool after use
- *
- * @param entries Array of entries
- * @param count Number of entries
- */
- inline void returnToPool(_ClusterSendQueueEntry **entries,unsigned int count)
- {
- Mutex::Lock _l(_lock);
- for(unsigned int i=0;i<count;++i)
- _pool[_poolCount++] = entries[i];
- }
-
-private:
- std::list< Array<_ClusterSendQueueEntry,ZT_CLUSTER_QUEUE_CHUNK_SIZE> > _chunks;
- _ClusterSendQueueEntry *_pool[ZT_CLUSTER_QUEUE_CHUNK_SIZE * ZT_CLUSTER_MAX_QUEUE_CHUNKS];
- unsigned long _poolCount;
- std::set< std::pair<Address,_ClusterSendQueueEntry *> > _bySrc;
- std::set< std::pair<Address,_ClusterSendQueueEntry *> > _byDest;
- Mutex _lock;
-};
-
-Cluster::Cluster(
- const RuntimeEnvironment *renv,
- uint16_t id,
- const std::vector<InetAddress> &zeroTierPhysicalEndpoints,
- int32_t x,
- int32_t y,
- int32_t z,
- void (*sendFunction)(void *,unsigned int,const void *,unsigned int),
- void *sendFunctionArg,
- int (*addressToLocationFunction)(void *,const struct sockaddr_storage *,int *,int *,int *),
- void *addressToLocationFunctionArg) :
- RR(renv),
- _sendQueue(new _ClusterSendQueue()),
- _sendFunction(sendFunction),
- _sendFunctionArg(sendFunctionArg),
- _addressToLocationFunction(addressToLocationFunction),
- _addressToLocationFunctionArg(addressToLocationFunctionArg),
- _x(x),
- _y(y),
- _z(z),
- _id(id),
- _zeroTierPhysicalEndpoints(zeroTierPhysicalEndpoints),
- _members(new _Member[ZT_CLUSTER_MAX_MEMBERS]),
- _lastFlushed(0),
- _lastCleanedRemotePeers(0),
- _lastCleanedQueue(0)
-{
- uint16_t stmp[ZT_SHA512_DIGEST_LEN / sizeof(uint16_t)];
-
- // Generate master secret by hashing the secret from our Identity key pair
- RR->identity.sha512PrivateKey(_masterSecret);
-
- // Generate our inbound message key, which is the master secret XORed with our ID and hashed twice
- memcpy(stmp,_masterSecret,sizeof(stmp));
- stmp[0] ^= Utils::hton(id);
- SHA512::hash(stmp,stmp,sizeof(stmp));
- SHA512::hash(stmp,stmp,sizeof(stmp));
- memcpy(_key,stmp,sizeof(_key));
- Utils::burn(stmp,sizeof(stmp));
-}
-
-Cluster::~Cluster()
-{
- Utils::burn(_masterSecret,sizeof(_masterSecret));
- Utils::burn(_key,sizeof(_key));
- delete [] _members;
- delete _sendQueue;
-}
-
-void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
-{
- Buffer<ZT_CLUSTER_MAX_MESSAGE_LENGTH> dmsg;
- {
- // FORMAT: <[16] iv><[8] MAC><... data>
- if ((len < 24)||(len > ZT_CLUSTER_MAX_MESSAGE_LENGTH))
- return;
-
- // 16-byte IV: first 8 bytes XORed with key, last 8 bytes used as Salsa20 64-bit IV
- char keytmp[32];
- memcpy(keytmp,_key,32);
- for(int i=0;i<8;++i)
- keytmp[i] ^= reinterpret_cast<const char *>(msg)[i];
- Salsa20 s20(keytmp,reinterpret_cast<const char *>(msg) + 8);
- Utils::burn(keytmp,sizeof(keytmp));
-
- // One-time-use Poly1305 key from first 32 bytes of Salsa20 keystream (as per DJB/NaCl "standard")
- char polykey[ZT_POLY1305_KEY_LEN];
- memset(polykey,0,sizeof(polykey));
- s20.crypt12(polykey,polykey,sizeof(polykey));
-
- // Compute 16-byte MAC
- char mac[ZT_POLY1305_MAC_LEN];
- Poly1305::compute(mac,reinterpret_cast<const char *>(msg) + 24,len - 24,polykey);
-
- // Check first 8 bytes of MAC against 64-bit MAC in stream
- if (!Utils::secureEq(mac,reinterpret_cast<const char *>(msg) + 16,8))
- return;
-
- // Decrypt!
- dmsg.setSize(len - 24);
- s20.crypt12(reinterpret_cast<const char *>(msg) + 24,const_cast<void *>(dmsg.data()),dmsg.size());
- }
-
- if (dmsg.size() < 4)
- return;
- const uint16_t fromMemberId = dmsg.at<uint16_t>(0);
- unsigned int ptr = 2;
- if (fromMemberId == _id) // sanity check: we don't talk to ourselves
- return;
- const uint16_t toMemberId = dmsg.at<uint16_t>(ptr);
- ptr += 2;
- if (toMemberId != _id) // sanity check: message not for us?
- return;
-
- { // make sure sender is actually considered a member
- Mutex::Lock _l3(_memberIds_m);
- if (std::find(_memberIds.begin(),_memberIds.end(),fromMemberId) == _memberIds.end())
- return;
- }
-
- try {
- while (ptr < dmsg.size()) {
- const unsigned int mlen = dmsg.at<uint16_t>(ptr); ptr += 2;
- const unsigned int nextPtr = ptr + mlen;
- if (nextPtr > dmsg.size())
- break;
-
- int mtype = -1;
- try {
- switch((StateMessageType)(mtype = (int)dmsg[ptr++])) {
- default:
- break;
-
- case CLUSTER_MESSAGE_ALIVE: {
- _Member &m = _members[fromMemberId];
- Mutex::Lock mlck(m.lock);
- ptr += 7; // skip version stuff, not used yet
- m.x = dmsg.at<int32_t>(ptr); ptr += 4;
- m.y = dmsg.at<int32_t>(ptr); ptr += 4;
- m.z = dmsg.at<int32_t>(ptr); ptr += 4;
- ptr += 8; // skip local clock, not used
- m.load = dmsg.at<uint64_t>(ptr); ptr += 8;
- m.peers = dmsg.at<uint64_t>(ptr); ptr += 8;
- ptr += 8; // skip flags, unused
-#ifdef ZT_TRACE
- std::string addrs;
-#endif
- unsigned int physicalAddressCount = dmsg[ptr++];
- m.zeroTierPhysicalEndpoints.clear();
- for(unsigned int i=0;i<physicalAddressCount;++i) {
- m.zeroTierPhysicalEndpoints.push_back(InetAddress());
- ptr += m.zeroTierPhysicalEndpoints.back().deserialize(dmsg,ptr);
- if (!(m.zeroTierPhysicalEndpoints.back())) {
- m.zeroTierPhysicalEndpoints.pop_back();
- }
-#ifdef ZT_TRACE
- else {
- if (addrs.length() > 0)
- addrs.push_back(',');
- addrs.append(m.zeroTierPhysicalEndpoints.back().toString());
- }
-#endif
- }
-#ifdef ZT_TRACE
- if ((RR->node->now() - m.lastReceivedAliveAnnouncement) >= ZT_CLUSTER_TIMEOUT) {
- TRACE("[%u] I'm alive! peers close to %d,%d,%d can be redirected to: %s",(unsigned int)fromMemberId,m.x,m.y,m.z,addrs.c_str());
- }
-#endif
- m.lastReceivedAliveAnnouncement = RR->node->now();
- } break;
-
- case CLUSTER_MESSAGE_HAVE_PEER: {
- Identity id;
- ptr += id.deserialize(dmsg,ptr);
- if (id) {
- {
- Mutex::Lock _l(_remotePeers_m);
- _RemotePeer &rp = _remotePeers[std::pair<Address,unsigned int>(id.address(),(unsigned int)fromMemberId)];
- if (!rp.lastHavePeerReceived) {
- RR->topology->saveIdentity((void *)0,id);
- RR->identity.agree(id,rp.key,ZT_PEER_SECRET_KEY_LENGTH);
- }
- rp.lastHavePeerReceived = RR->node->now();
- }
-
- _ClusterSendQueueEntry *q[16384]; // 16384 is "tons"
- unsigned int qc = _sendQueue->getByDest(id.address(),q,16384);
- for(unsigned int i=0;i<qc;++i)
- this->relayViaCluster(q[i]->fromPeerAddress,q[i]->toPeerAddress,q[i]->data,q[i]->len,q[i]->unite);
- _sendQueue->returnToPool(q,qc);
-
- TRACE("[%u] has %s (retried %u queued sends)",(unsigned int)fromMemberId,id.address().toString().c_str(),qc);
- }
- } break;
-
- case CLUSTER_MESSAGE_WANT_PEER: {
- const Address zeroTierAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
- SharedPtr<Peer> peer(RR->topology->getPeerNoCache(zeroTierAddress));
- if ( (peer) && (peer->hasLocalClusterOptimalPath(RR->node->now())) ) {
- Buffer<1024> buf;
- peer->identity().serialize(buf);
- Mutex::Lock _l2(_members[fromMemberId].lock);
- _send(fromMemberId,CLUSTER_MESSAGE_HAVE_PEER,buf.data(),buf.size());
- }
- } break;
-
- case CLUSTER_MESSAGE_REMOTE_PACKET: {
- const unsigned int plen = dmsg.at<uint16_t>(ptr); ptr += 2;
- if (plen) {
- Packet remotep(dmsg.field(ptr,plen),plen); ptr += plen;
- //TRACE("remote %s from %s via %u (%u bytes)",Packet::verbString(remotep.verb()),remotep.source().toString().c_str(),fromMemberId,plen);
- switch(remotep.verb()) {
- case Packet::VERB_WHOIS: _doREMOTE_WHOIS(fromMemberId,remotep); break;
- case Packet::VERB_MULTICAST_GATHER: _doREMOTE_MULTICAST_GATHER(fromMemberId,remotep); break;
- default: break; // ignore things we don't care about across cluster
- }
- }
- } break;
-
- case CLUSTER_MESSAGE_PROXY_UNITE: {
- const Address localPeerAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
- const Address remotePeerAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
- const unsigned int numRemotePeerPaths = dmsg[ptr++];
- InetAddress remotePeerPaths[256]; // size is 8-bit, so 256 is max
- for(unsigned int i=0;i<numRemotePeerPaths;++i)
- ptr += remotePeerPaths[i].deserialize(dmsg,ptr);
-
- TRACE("[%u] requested that we unite local %s with remote %s",(unsigned int)fromMemberId,localPeerAddress.toString().c_str(),remotePeerAddress.toString().c_str());
-
- const uint64_t now = RR->node->now();
- SharedPtr<Peer> localPeer(RR->topology->getPeerNoCache(localPeerAddress));
- if ((localPeer)&&(numRemotePeerPaths > 0)) {
- InetAddress bestLocalV4,bestLocalV6;
- localPeer->getRendezvousAddresses(now,bestLocalV4,bestLocalV6);
-
- InetAddress bestRemoteV4,bestRemoteV6;
- for(unsigned int i=0;i<numRemotePeerPaths;++i) {
- if ((bestRemoteV4)&&(bestRemoteV6))
- break;
- switch(remotePeerPaths[i].ss_family) {
- case AF_INET:
- if (!bestRemoteV4)
- bestRemoteV4 = remotePeerPaths[i];
- break;
- case AF_INET6:
- if (!bestRemoteV6)
- bestRemoteV6 = remotePeerPaths[i];
- break;
- }
- }
-
- Packet rendezvousForLocal(localPeerAddress,RR->identity.address(),Packet::VERB_RENDEZVOUS);
- rendezvousForLocal.append((uint8_t)0);
- remotePeerAddress.appendTo(rendezvousForLocal);
-
- Buffer<2048> rendezvousForRemote;
- remotePeerAddress.appendTo(rendezvousForRemote);
- rendezvousForRemote.append((uint8_t)Packet::VERB_RENDEZVOUS);
- rendezvousForRemote.addSize(2); // space for actual packet payload length
- rendezvousForRemote.append((uint8_t)0); // flags == 0
- localPeerAddress.appendTo(rendezvousForRemote);
-
- bool haveMatch = false;
- if ((bestLocalV6)&&(bestRemoteV6)) {
- haveMatch = true;
-
- rendezvousForLocal.append((uint16_t)bestRemoteV6.port());
- rendezvousForLocal.append((uint8_t)16);
- rendezvousForLocal.append(bestRemoteV6.rawIpData(),16);
-
- rendezvousForRemote.append((uint16_t)bestLocalV6.port());
- rendezvousForRemote.append((uint8_t)16);
- rendezvousForRemote.append(bestLocalV6.rawIpData(),16);
- rendezvousForRemote.setAt<uint16_t>(ZT_ADDRESS_LENGTH + 1,(uint16_t)(9 + 16));
- } else if ((bestLocalV4)&&(bestRemoteV4)) {
- haveMatch = true;
-
- rendezvousForLocal.append((uint16_t)bestRemoteV4.port());
- rendezvousForLocal.append((uint8_t)4);
- rendezvousForLocal.append(bestRemoteV4.rawIpData(),4);
-
- rendezvousForRemote.append((uint16_t)bestLocalV4.port());
- rendezvousForRemote.append((uint8_t)4);
- rendezvousForRemote.append(bestLocalV4.rawIpData(),4);
- rendezvousForRemote.setAt<uint16_t>(ZT_ADDRESS_LENGTH + 1,(uint16_t)(9 + 4));
- }
-
- if (haveMatch) {
- {
- Mutex::Lock _l2(_members[fromMemberId].lock);
- _send(fromMemberId,CLUSTER_MESSAGE_PROXY_SEND,rendezvousForRemote.data(),rendezvousForRemote.size());
- }
- RR->sw->send((void *)0,rendezvousForLocal,true);
- }
- }
- } break;
-
- case CLUSTER_MESSAGE_PROXY_SEND: {
- const Address rcpt(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
- const Packet::Verb verb = (Packet::Verb)dmsg[ptr++];
- const unsigned int len = dmsg.at<uint16_t>(ptr); ptr += 2;
- Packet outp(rcpt,RR->identity.address(),verb);
- outp.append(dmsg.field(ptr,len),len); ptr += len;
- RR->sw->send((void *)0,outp,true);
- //TRACE("[%u] proxy send %s to %s length %u",(unsigned int)fromMemberId,Packet::verbString(verb),rcpt.toString().c_str(),len);
- } break;
-
- case CLUSTER_MESSAGE_NETWORK_CONFIG: {
- const SharedPtr<Network> network(RR->node->network(dmsg.at<uint64_t>(ptr)));
- if (network) {
- // Copy into a Packet just to conform to Network API. Eventually
- // will want to refactor.
- network->handleConfigChunk((void *)0,0,Address(),Buffer<ZT_PROTO_MAX_PACKET_LENGTH>(dmsg),ptr);
- }
- } break;
- }
- } catch ( ... ) {
- TRACE("invalid message of size %u type %d (inner decode), discarding",mlen,mtype);
- // drop invalids
- }
-
- ptr = nextPtr;
- }
- } catch ( ... ) {
- TRACE("invalid message (outer loop), discarding");
- // drop invalids
- }
-}
-
-void Cluster::broadcastHavePeer(const Identity &id)
-{
- Buffer<1024> buf;
- id.serialize(buf);
- Mutex::Lock _l(_memberIds_m);
- for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
- Mutex::Lock _l2(_members[*mid].lock);
- _send(*mid,CLUSTER_MESSAGE_HAVE_PEER,buf.data(),buf.size());
- }
-}
-
-void Cluster::broadcastNetworkConfigChunk(const void *chunk,unsigned int len)
-{
- Mutex::Lock _l(_memberIds_m);
- for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
- Mutex::Lock _l2(_members[*mid].lock);
- _send(*mid,CLUSTER_MESSAGE_NETWORK_CONFIG,chunk,len);
- }
-}
-
-int Cluster::checkSendViaCluster(const Address &toPeerAddress,uint64_t &mostRecentTs,void *peerSecret)
-{
- const uint64_t now = RR->node->now();
- mostRecentTs = 0;
- int mostRecentMemberId = -1;
- {
- Mutex::Lock _l2(_remotePeers_m);
- std::map< std::pair<Address,unsigned int>,_RemotePeer >::const_iterator rpe(_remotePeers.lower_bound(std::pair<Address,unsigned int>(toPeerAddress,0)));
- for(;;) {
- if ((rpe == _remotePeers.end())||(rpe->first.first != toPeerAddress))
- break;
- else if (rpe->second.lastHavePeerReceived > mostRecentTs) {
- mostRecentTs = rpe->second.lastHavePeerReceived;
- memcpy(peerSecret,rpe->second.key,ZT_PEER_SECRET_KEY_LENGTH);
- mostRecentMemberId = (int)rpe->first.second;
- }
- ++rpe;
- }
- }
-
- const uint64_t ageOfMostRecentHavePeerAnnouncement = now - mostRecentTs;
- if (ageOfMostRecentHavePeerAnnouncement >= (ZT_PEER_ACTIVITY_TIMEOUT / 3)) {
- if (ageOfMostRecentHavePeerAnnouncement >= ZT_PEER_ACTIVITY_TIMEOUT)
- mostRecentMemberId = -1;
-
- bool sendWantPeer = true;
- {
- Mutex::Lock _l(_remotePeers_m);
- _RemotePeer &rp = _remotePeers[std::pair<Address,unsigned int>(toPeerAddress,(unsigned int)_id)];
- if ((now - rp.lastSentWantPeer) >= ZT_CLUSTER_WANT_PEER_EVERY) {
- rp.lastSentWantPeer = now;
- } else {
- sendWantPeer = false; // don't flood WANT_PEER
- }
- }
- if (sendWantPeer) {
- char tmp[ZT_ADDRESS_LENGTH];
- toPeerAddress.copyTo(tmp,ZT_ADDRESS_LENGTH);
- {
- Mutex::Lock _l(_memberIds_m);
- for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
- Mutex::Lock _l2(_members[*mid].lock);
- _send(*mid,CLUSTER_MESSAGE_WANT_PEER,tmp,ZT_ADDRESS_LENGTH);
- }
- }
- }
- }
-
- return mostRecentMemberId;
-}
-
-bool Cluster::sendViaCluster(int mostRecentMemberId,const Address &toPeerAddress,const void *data,unsigned int len)
-{
- if ((mostRecentMemberId < 0)||(mostRecentMemberId >= ZT_CLUSTER_MAX_MEMBERS)) // sanity check
- return false;
- Mutex::Lock _l2(_members[mostRecentMemberId].lock);
- for(std::vector<InetAddress>::const_iterator i1(_zeroTierPhysicalEndpoints.begin());i1!=_zeroTierPhysicalEndpoints.end();++i1) {
- for(std::vector<InetAddress>::const_iterator i2(_members[mostRecentMemberId].zeroTierPhysicalEndpoints.begin());i2!=_members[mostRecentMemberId].zeroTierPhysicalEndpoints.end();++i2) {
- if (i1->ss_family == i2->ss_family) {
- TRACE("sendViaCluster sending %u bytes to %s by way of %u (%s->%s)",len,toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId,i1->toString().c_str(),i2->toString().c_str());
- RR->node->putPacket((void *)0,*i1,*i2,data,len);
- return true;
- }
- }
- }
- return false;
-}
-
-void Cluster::relayViaCluster(const Address &fromPeerAddress,const Address &toPeerAddress,const void *data,unsigned int len,bool unite)
-{
- if (len > ZT_PROTO_MAX_PACKET_LENGTH) // sanity check
- return;
-
- const uint64_t now = RR->node->now();
-
- uint64_t mostRecentTs = 0;
- int mostRecentMemberId = -1;
- {
- Mutex::Lock _l2(_remotePeers_m);
- std::map< std::pair<Address,unsigned int>,_RemotePeer >::const_iterator rpe(_remotePeers.lower_bound(std::pair<Address,unsigned int>(toPeerAddress,0)));
- for(;;) {
- if ((rpe == _remotePeers.end())||(rpe->first.first != toPeerAddress))
- break;
- else if (rpe->second.lastHavePeerReceived > mostRecentTs) {
- mostRecentTs = rpe->second.lastHavePeerReceived;
- mostRecentMemberId = (int)rpe->first.second;
- }
- ++rpe;
- }
- }
-
- const uint64_t ageOfMostRecentHavePeerAnnouncement = now - mostRecentTs;
- if (ageOfMostRecentHavePeerAnnouncement >= (ZT_PEER_ACTIVITY_TIMEOUT / 3)) {
- // Enqueue and wait if peer seems alive, but do WANT_PEER to refresh homing
- const bool enqueueAndWait = ((ageOfMostRecentHavePeerAnnouncement >= ZT_PEER_ACTIVITY_TIMEOUT)||(mostRecentMemberId < 0));
-
- // Poll everyone with WANT_PEER if the age of our most recent entry is
- // approaching expiration (or has expired, or does not exist).
- bool sendWantPeer = true;
- {
- Mutex::Lock _l(_remotePeers_m);
- _RemotePeer &rp = _remotePeers[std::pair<Address,unsigned int>(toPeerAddress,(unsigned int)_id)];
- if ((now - rp.lastSentWantPeer) >= ZT_CLUSTER_WANT_PEER_EVERY) {
- rp.lastSentWantPeer = now;
- } else {
- sendWantPeer = false; // don't flood WANT_PEER
- }
- }
- if (sendWantPeer) {
- char tmp[ZT_ADDRESS_LENGTH];
- toPeerAddress.copyTo(tmp,ZT_ADDRESS_LENGTH);
- {
- Mutex::Lock _l(_memberIds_m);
- for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
- Mutex::Lock _l2(_members[*mid].lock);
- _send(*mid,CLUSTER_MESSAGE_WANT_PEER,tmp,ZT_ADDRESS_LENGTH);
- }
- }
- }
-
- // If there isn't a good place to send via, then enqueue this for retrying
- // later and return after having broadcasted a WANT_PEER.
- if (enqueueAndWait) {
- TRACE("relayViaCluster %s -> %s enqueueing to wait for HAVE_PEER",fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str());
- _sendQueue->enqueue(now,fromPeerAddress,toPeerAddress,data,len,unite);
- return;
- }
- }
-
- if (mostRecentMemberId >= 0) {
- Buffer<1024> buf;
- if (unite) {
- InetAddress v4,v6;
- if (fromPeerAddress) {
- SharedPtr<Peer> fromPeer(RR->topology->getPeerNoCache(fromPeerAddress));
- if (fromPeer)
- fromPeer->getRendezvousAddresses(now,v4,v6);
- }
- uint8_t addrCount = 0;
- if (v4)
- ++addrCount;
- if (v6)
- ++addrCount;
- if (addrCount) {
- toPeerAddress.appendTo(buf);
- fromPeerAddress.appendTo(buf);
- buf.append(addrCount);
- if (v4)
- v4.serialize(buf);
- if (v6)
- v6.serialize(buf);
- }
- }
-
- {
- Mutex::Lock _l2(_members[mostRecentMemberId].lock);
- if (buf.size() > 0)
- _send(mostRecentMemberId,CLUSTER_MESSAGE_PROXY_UNITE,buf.data(),buf.size());
-
- for(std::vector<InetAddress>::const_iterator i1(_zeroTierPhysicalEndpoints.begin());i1!=_zeroTierPhysicalEndpoints.end();++i1) {
- for(std::vector<InetAddress>::const_iterator i2(_members[mostRecentMemberId].zeroTierPhysicalEndpoints.begin());i2!=_members[mostRecentMemberId].zeroTierPhysicalEndpoints.end();++i2) {
- if (i1->ss_family == i2->ss_family) {
- TRACE("relayViaCluster relaying %u bytes from %s to %s by way of %u (%s->%s)",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId,i1->toString().c_str(),i2->toString().c_str());
- RR->node->putPacket((void *)0,*i1,*i2,data,len);
- return;
- }
- }
- }
-
- TRACE("relayViaCluster relaying %u bytes from %s to %s by way of %u failed: no common endpoints with the same address family!",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId);
- }
- }
-}
-
-void Cluster::sendDistributedQuery(const Packet &pkt)
-{
- Buffer<4096> buf;
- buf.append((uint16_t)pkt.size());
- buf.append(pkt.data(),pkt.size());
- Mutex::Lock _l(_memberIds_m);
- for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
- Mutex::Lock _l2(_members[*mid].lock);
- _send(*mid,CLUSTER_MESSAGE_REMOTE_PACKET,buf.data(),buf.size());
- }
-}
-
-void Cluster::doPeriodicTasks()
-{
- const uint64_t now = RR->node->now();
-
- if ((now - _lastFlushed) >= ZT_CLUSTER_FLUSH_PERIOD) {
- _lastFlushed = now;
-
- Mutex::Lock _l(_memberIds_m);
- for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
- Mutex::Lock _l2(_members[*mid].lock);
-
- if ((now - _members[*mid].lastAnnouncedAliveTo) >= ((ZT_CLUSTER_TIMEOUT / 2) - 1000)) {
- _members[*mid].lastAnnouncedAliveTo = now;
-
- Buffer<2048> alive;
- alive.append((uint16_t)ZEROTIER_ONE_VERSION_MAJOR);
- alive.append((uint16_t)ZEROTIER_ONE_VERSION_MINOR);
- alive.append((uint16_t)ZEROTIER_ONE_VERSION_REVISION);
- alive.append((uint8_t)ZT_PROTO_VERSION);
- if (_addressToLocationFunction) {
- alive.append((int32_t)_x);
- alive.append((int32_t)_y);
- alive.append((int32_t)_z);
- } else {
- alive.append((int32_t)0);
- alive.append((int32_t)0);
- alive.append((int32_t)0);
- }
- alive.append((uint64_t)now);
- alive.append((uint64_t)0); // TODO: compute and send load average
- alive.append((uint64_t)RR->topology->countActive(now));
- alive.append((uint64_t)0); // unused/reserved flags
- alive.append((uint8_t)_zeroTierPhysicalEndpoints.size());
- for(std::vector<InetAddress>::const_iterator pe(_zeroTierPhysicalEndpoints.begin());pe!=_zeroTierPhysicalEndpoints.end();++pe)
- pe->serialize(alive);
- _send(*mid,CLUSTER_MESSAGE_ALIVE,alive.data(),alive.size());
- }
-
- _flush(*mid);
- }
- }
-
- if ((now - _lastCleanedRemotePeers) >= (ZT_PEER_ACTIVITY_TIMEOUT * 2)) {
- _lastCleanedRemotePeers = now;
-
- Mutex::Lock _l(_remotePeers_m);
- for(std::map< std::pair<Address,unsigned int>,_RemotePeer >::iterator rp(_remotePeers.begin());rp!=_remotePeers.end();) {
- if ((now - rp->second.lastHavePeerReceived) >= ZT_PEER_ACTIVITY_TIMEOUT)
- _remotePeers.erase(rp++);
- else ++rp;
- }
- }
-
- if ((now - _lastCleanedQueue) >= ZT_CLUSTER_QUEUE_EXPIRATION) {
- _lastCleanedQueue = now;
- _sendQueue->expire(now);
- }
-}
-
-void Cluster::addMember(uint16_t memberId)
-{
- if ((memberId >= ZT_CLUSTER_MAX_MEMBERS)||(memberId == _id))
- return;
-
- Mutex::Lock _l2(_members[memberId].lock);
-
- {
- Mutex::Lock _l(_memberIds_m);
- if (std::find(_memberIds.begin(),_memberIds.end(),memberId) != _memberIds.end())
- return;
- _memberIds.push_back(memberId);
- std::sort(_memberIds.begin(),_memberIds.end());
- }
-
- _members[memberId].clear();
-
- // Generate this member's message key from the master and its ID
- uint16_t stmp[ZT_SHA512_DIGEST_LEN / sizeof(uint16_t)];
- memcpy(stmp,_masterSecret,sizeof(stmp));
- stmp[0] ^= Utils::hton(memberId);
- SHA512::hash(stmp,stmp,sizeof(stmp));
- SHA512::hash(stmp,stmp,sizeof(stmp));
- memcpy(_members[memberId].key,stmp,sizeof(_members[memberId].key));
- Utils::burn(stmp,sizeof(stmp));
-
- // Prepare q
- _members[memberId].q.clear();
- char iv[16];
- Utils::getSecureRandom(iv,16);
- _members[memberId].q.append(iv,16);
- _members[memberId].q.addSize(8); // room for MAC
- _members[memberId].q.append((uint16_t)_id);
- _members[memberId].q.append((uint16_t)memberId);
-}
-
-void Cluster::removeMember(uint16_t memberId)
-{
- Mutex::Lock _l(_memberIds_m);
- std::vector<uint16_t> newMemberIds;
- for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
- if (*mid != memberId)
- newMemberIds.push_back(*mid);
- }
- _memberIds = newMemberIds;
-}
-
-bool Cluster::findBetterEndpoint(InetAddress &redirectTo,const Address &peerAddress,const InetAddress &peerPhysicalAddress,bool offload)
-{
- if (_addressToLocationFunction) {
- // Pick based on location if it can be determined
- int px = 0,py = 0,pz = 0;
- if (_addressToLocationFunction(_addressToLocationFunctionArg,reinterpret_cast<const struct sockaddr_storage *>(&peerPhysicalAddress),&px,&py,&pz) == 0) {
- TRACE("no geolocation data for %s",peerPhysicalAddress.toIpString().c_str());
- return false;
- }
-
- // Find member closest to this peer
- const uint64_t now = RR->node->now();
- std::vector<InetAddress> best;
- const double currentDistance = _dist3d(_x,_y,_z,px,py,pz);
- double bestDistance = (offload ? 2147483648.0 : currentDistance);
-#ifdef ZT_TRACE
- unsigned int bestMember = _id;
-#endif
- {
- Mutex::Lock _l(_memberIds_m);
- for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
- _Member &m = _members[*mid];
- Mutex::Lock _ml(m.lock);
-
- // Consider member if it's alive and has sent us a location and one or more physical endpoints to send peers to
- if ( ((now - m.lastReceivedAliveAnnouncement) < ZT_CLUSTER_TIMEOUT) && ((m.x != 0)||(m.y != 0)||(m.z != 0)) && (m.zeroTierPhysicalEndpoints.size() > 0) ) {
- const double mdist = _dist3d(m.x,m.y,m.z,px,py,pz);
- if (mdist < bestDistance) {
- bestDistance = mdist;
-#ifdef ZT_TRACE
- bestMember = *mid;
-#endif
- best = m.zeroTierPhysicalEndpoints;
- }
- }
- }
- }
-
- // Redirect to a closer member if it has a ZeroTier endpoint address in the same ss_family
- for(std::vector<InetAddress>::const_iterator a(best.begin());a!=best.end();++a) {
- if (a->ss_family == peerPhysicalAddress.ss_family) {
- TRACE("%s at [%d,%d,%d] is %f from us but %f from %u, can redirect to %s",peerAddress.toString().c_str(),px,py,pz,currentDistance,bestDistance,bestMember,a->toString().c_str());
- redirectTo = *a;
- return true;
- }
- }
- TRACE("%s at [%d,%d,%d] is %f from us, no better endpoints found",peerAddress.toString().c_str(),px,py,pz,currentDistance);
- return false;
- } else {
- // TODO: pick based on load if no location info?
- return false;
- }
-}
-
-bool Cluster::isClusterPeerFrontplane(const InetAddress &ip) const
-{
- Mutex::Lock _l(_memberIds_m);
- for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
- Mutex::Lock _l2(_members[*mid].lock);
- for(std::vector<InetAddress>::const_iterator i2(_members[*mid].zeroTierPhysicalEndpoints.begin());i2!=_members[*mid].zeroTierPhysicalEndpoints.end();++i2) {
- if (ip == *i2)
- return true;
- }
- }
- return false;
-}
-
-void Cluster::status(ZT_ClusterStatus &status) const
-{
- const uint64_t now = RR->node->now();
- memset(&status,0,sizeof(ZT_ClusterStatus));
-
- status.myId = _id;
-
- {
- ZT_ClusterMemberStatus *const s = &(status.members[status.clusterSize++]);
- s->id = _id;
- s->alive = 1;
- s->x = _x;
- s->y = _y;
- s->z = _z;
- s->load = 0; // TODO
- s->peers = RR->topology->countActive(now);
- for(std::vector<InetAddress>::const_iterator ep(_zeroTierPhysicalEndpoints.begin());ep!=_zeroTierPhysicalEndpoints.end();++ep) {
- if (s->numZeroTierPhysicalEndpoints >= ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES) // sanity check
- break;
- memcpy(&(s->zeroTierPhysicalEndpoints[s->numZeroTierPhysicalEndpoints++]),&(*ep),sizeof(struct sockaddr_storage));
- }
- }
-
- {
- Mutex::Lock _l1(_memberIds_m);
- for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
- if (status.clusterSize >= ZT_CLUSTER_MAX_MEMBERS) // sanity check
- break;
-
- _Member &m = _members[*mid];
- Mutex::Lock ml(m.lock);
-
- ZT_ClusterMemberStatus *const s = &(status.members[status.clusterSize++]);
- s->id = *mid;
- s->msSinceLastHeartbeat = (unsigned int)std::min((uint64_t)(~((unsigned int)0)),(now - m.lastReceivedAliveAnnouncement));
- s->alive = (s->msSinceLastHeartbeat < ZT_CLUSTER_TIMEOUT) ? 1 : 0;
- s->x = m.x;
- s->y = m.y;
- s->z = m.z;
- s->load = m.load;
- s->peers = m.peers;
- for(std::vector<InetAddress>::const_iterator ep(m.zeroTierPhysicalEndpoints.begin());ep!=m.zeroTierPhysicalEndpoints.end();++ep) {
- if (s->numZeroTierPhysicalEndpoints >= ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES) // sanity check
- break;
- memcpy(&(s->zeroTierPhysicalEndpoints[s->numZeroTierPhysicalEndpoints++]),&(*ep),sizeof(struct sockaddr_storage));
- }
- }
- }
-}
-
-void Cluster::_send(uint16_t memberId,StateMessageType type,const void *msg,unsigned int len)
-{
- if ((len + 3) > (ZT_CLUSTER_MAX_MESSAGE_LENGTH - (24 + 2 + 2))) // sanity check
- return;
- _Member &m = _members[memberId];
- // assumes m.lock is locked!
- if ((m.q.size() + len + 3) > ZT_CLUSTER_MAX_MESSAGE_LENGTH)
- _flush(memberId);
- m.q.append((uint16_t)(len + 1));
- m.q.append((uint8_t)type);
- m.q.append(msg,len);
-}
-
-void Cluster::_flush(uint16_t memberId)
-{
- _Member &m = _members[memberId];
- // assumes m.lock is locked!
- if (m.q.size() > (24 + 2 + 2)) { // 16-byte IV + 8-byte MAC + 2 byte from-member-ID + 2 byte to-member-ID
- // Create key from member's key and IV
- char keytmp[32];
- memcpy(keytmp,m.key,32);
- for(int i=0;i<8;++i)
- keytmp[i] ^= m.q[i];
- Salsa20 s20(keytmp,m.q.field(8,8));
- Utils::burn(keytmp,sizeof(keytmp));
-
- // One-time-use Poly1305 key from first 32 bytes of Salsa20 keystream (as per DJB/NaCl "standard")
- char polykey[ZT_POLY1305_KEY_LEN];
- memset(polykey,0,sizeof(polykey));
- s20.crypt12(polykey,polykey,sizeof(polykey));
-
- // Encrypt m.q in place
- s20.crypt12(reinterpret_cast<const char *>(m.q.data()) + 24,const_cast<char *>(reinterpret_cast<const char *>(m.q.data())) + 24,m.q.size() - 24);
-
- // Add MAC for authentication (encrypt-then-MAC)
- char mac[ZT_POLY1305_MAC_LEN];
- Poly1305::compute(mac,reinterpret_cast<const char *>(m.q.data()) + 24,m.q.size() - 24,polykey);
- memcpy(m.q.field(16,8),mac,8);
-
- // Send!
- _sendFunction(_sendFunctionArg,memberId,m.q.data(),m.q.size());
-
- // Prepare for more
- m.q.clear();
- char iv[16];
- Utils::getSecureRandom(iv,16);
- m.q.append(iv,16);
- m.q.addSize(8); // room for MAC
- m.q.append((uint16_t)_id); // from member ID
- m.q.append((uint16_t)memberId); // to member ID
- }
-}
-
-void Cluster::_doREMOTE_WHOIS(uint64_t fromMemberId,const Packet &remotep)
-{
- if (remotep.payloadLength() >= ZT_ADDRESS_LENGTH) {
- Identity queried(RR->topology->getIdentity((void *)0,Address(remotep.payload(),ZT_ADDRESS_LENGTH)));
- if (queried) {
- Buffer<1024> routp;
- remotep.source().appendTo(routp);
- routp.append((uint8_t)Packet::VERB_OK);
- routp.addSize(2); // space for length
- routp.append((uint8_t)Packet::VERB_WHOIS);
- routp.append(remotep.packetId());
- queried.serialize(routp);
- routp.setAt<uint16_t>(ZT_ADDRESS_LENGTH + 1,(uint16_t)(routp.size() - ZT_ADDRESS_LENGTH - 3));
-
- TRACE("responding to remote WHOIS from %s @ %u with identity of %s",remotep.source().toString().c_str(),(unsigned int)fromMemberId,queried.address().toString().c_str());
- Mutex::Lock _l2(_members[fromMemberId].lock);
- _send(fromMemberId,CLUSTER_MESSAGE_PROXY_SEND,routp.data(),routp.size());
- }
- }
-}
-
-void Cluster::_doREMOTE_MULTICAST_GATHER(uint64_t fromMemberId,const Packet &remotep)
-{
- const uint64_t nwid = remotep.at<uint64_t>(ZT_PROTO_VERB_MULTICAST_GATHER_IDX_NETWORK_ID);
- const MulticastGroup mg(MAC(remotep.field(ZT_PROTO_VERB_MULTICAST_GATHER_IDX_MAC,6),6),remotep.at<uint32_t>(ZT_PROTO_VERB_MULTICAST_GATHER_IDX_ADI));
- unsigned int gatherLimit = remotep.at<uint32_t>(ZT_PROTO_VERB_MULTICAST_GATHER_IDX_GATHER_LIMIT);
- const Address remotePeerAddress(remotep.source());
-
- if (gatherLimit) {
- Buffer<ZT_PROTO_MAX_PACKET_LENGTH> routp;
- remotePeerAddress.appendTo(routp);
- routp.append((uint8_t)Packet::VERB_OK);
- routp.addSize(2); // space for length
- routp.append((uint8_t)Packet::VERB_MULTICAST_GATHER);
- routp.append(remotep.packetId());
- routp.append(nwid);
- mg.mac().appendTo(routp);
- routp.append((uint32_t)mg.adi());
-
- if (gatherLimit > ((ZT_CLUSTER_MAX_MESSAGE_LENGTH - 80) / 5))
- gatherLimit = ((ZT_CLUSTER_MAX_MESSAGE_LENGTH - 80) / 5);
- if (RR->mc->gather(remotePeerAddress,nwid,mg,routp,gatherLimit)) {
- routp.setAt<uint16_t>(ZT_ADDRESS_LENGTH + 1,(uint16_t)(routp.size() - ZT_ADDRESS_LENGTH - 3));
-
- TRACE("responding to remote MULTICAST_GATHER from %s @ %u with %u bytes",remotePeerAddress.toString().c_str(),(unsigned int)fromMemberId,routp.size());
- Mutex::Lock _l2(_members[fromMemberId].lock);
- _send(fromMemberId,CLUSTER_MESSAGE_PROXY_SEND,routp.data(),routp.size());
- }
- }
-}
-
-} // namespace ZeroTier
-
-#endif // ZT_ENABLE_CLUSTER
diff --git a/node/Cluster.hpp b/node/Cluster.hpp
deleted file mode 100644
index 74b091f5..00000000
--- a/node/Cluster.hpp
+++ /dev/null
@@ -1,463 +0,0 @@
-/*
- * ZeroTier One - Network Virtualization Everywhere
- * Copyright (C) 2011-2017 ZeroTier, Inc. https://www.zerotier.com/
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * --
- *
- * You can be released from the requirements of the license by purchasing
- * a commercial license. Buying such a license is mandatory as soon as you
- * develop commercial closed-source software that incorporates or links
- * directly against ZeroTier software without disclosing the source code
- * of your own application.
- */
-
-#ifndef ZT_CLUSTER_HPP
-#define ZT_CLUSTER_HPP
-
-#ifdef ZT_ENABLE_CLUSTER
-
-#include <map>
-
-#include "Constants.hpp"
-#include "../include/ZeroTierOne.h"
-#include "Address.hpp"
-#include "InetAddress.hpp"
-#include "SHA512.hpp"
-#include "Utils.hpp"
-#include "Buffer.hpp"
-#include "Mutex.hpp"
-#include "SharedPtr.hpp"
-#include "Hashtable.hpp"
-#include "Packet.hpp"
-#include "SharedPtr.hpp"
-
-/**
- * Timeout for cluster members being considered "alive"
- *
- * A cluster member is considered dead and will no longer have peers
- * redirected to it if we have not heard a heartbeat in this long.
- */
-#define ZT_CLUSTER_TIMEOUT 5000
-
-/**
- * Desired period between doPeriodicTasks() in milliseconds
- */
-#define ZT_CLUSTER_PERIODIC_TASK_PERIOD 20
-
-/**
- * How often to flush outgoing message queues (maximum interval)
- */
-#define ZT_CLUSTER_FLUSH_PERIOD ZT_CLUSTER_PERIODIC_TASK_PERIOD
-
-/**
- * Maximum number of queued outgoing packets per sender address
- */
-#define ZT_CLUSTER_MAX_QUEUE_PER_SENDER 16
-
-/**
- * Expiration time for send queue entries
- */
-#define ZT_CLUSTER_QUEUE_EXPIRATION 3000
-
-/**
- * Chunk size for allocating queue entries
- *
- * Queue entries are allocated in chunks of this many and are added to a pool.
- * ZT_CLUSTER_MAX_QUEUE_GLOBAL must be evenly divisible by this.
- */
-#define ZT_CLUSTER_QUEUE_CHUNK_SIZE 32
-
-/**
- * Maximum number of chunks to ever allocate
- *
- * This is a global sanity limit to prevent resource exhaustion attacks. It
- * works out to about 600mb of RAM. You'll never see this on a normal edge
- * node. We're unlikely to see this on a root server unless someone is DOSing
- * us. In that case cluster relaying will be affected but other functions
- * should continue to operate normally.
- */
-#define ZT_CLUSTER_MAX_QUEUE_CHUNKS 8194
-
-/**
- * Max data per queue entry
- */
-#define ZT_CLUSTER_SEND_QUEUE_DATA_MAX 1500
-
-/**
- * We won't send WANT_PEER to other members more than every (ms) per recipient
- */
-#define ZT_CLUSTER_WANT_PEER_EVERY 1000
-
-namespace ZeroTier {
-
-class RuntimeEnvironment;
-class MulticastGroup;
-class Peer;
-class Identity;
-
-// Internal class implemented inside Cluster.cpp
-class _ClusterSendQueue;
-
-/**
- * Multi-homing cluster state replication and packet relaying
- *
- * Multi-homing means more than one node sharing the same ZeroTier identity.
- * There is nothing in the protocol to prevent this, but to make it work well
- * requires the devices sharing an identity to cooperate and share some
- * information.
- *
- * There are three use cases we want to fulfill:
- *
- * (1) Multi-homing of root servers with handoff for efficient routing,
- * HA, and load balancing across many commodity nodes.
- * (2) Multi-homing of network controllers for the same reason.
- * (3) Multi-homing of nodes on virtual networks, such as domain servers
- * and other important endpoints.
- *
- * These use cases are in order of escalating difficulty. The initial
- * version of Cluster is aimed at satisfying the first, though you are
- * free to try #2 and #3.
- */
-class Cluster
-{
-public:
- /**
- * State message types
- */
- enum StateMessageType
- {
- CLUSTER_MESSAGE_NOP = 0,
-
- /**
- * This cluster member is alive:
- * <[2] version minor>
- * <[2] version major>
- * <[2] version revision>
- * <[1] protocol version>
- * <[4] X location (signed 32-bit)>
- * <[4] Y location (signed 32-bit)>
- * <[4] Z location (signed 32-bit)>
- * <[8] local clock at this member>
- * <[8] load average>
- * <[8] number of peers>
- * <[8] flags (currently unused, must be zero)>
- * <[1] number of preferred ZeroTier endpoints>
- * <[...] InetAddress(es) of preferred ZeroTier endpoint(s)>
- *
- * Cluster members constantly broadcast an alive heartbeat and will only
- * receive peer redirects if they've done so within the timeout.
- */
- CLUSTER_MESSAGE_ALIVE = 1,
-
- /**
- * Cluster member has this peer:
- * <[...] serialized identity of peer>
- *
- * This is typically sent in response to WANT_PEER but can also be pushed
- * to prepopulate if this makes sense.
- */
- CLUSTER_MESSAGE_HAVE_PEER = 2,
-
- /**
- * Cluster member wants this peer:
- * <[5] ZeroTier address of peer>
- *
- * Members that have a direct link to this peer will respond with
- * HAVE_PEER.
- */
- CLUSTER_MESSAGE_WANT_PEER = 3,
-
- /**
- * A remote packet that we should also possibly respond to:
- * <[2] 16-bit length of remote packet>
- * <[...] remote packet payload>
- *
- * Cluster members may relay requests by relaying the request packet.
- * These may include requests such as WHOIS and MULTICAST_GATHER. The
- * packet must be already decrypted, decompressed, and authenticated.
- *
- * This can only be used for small request packets as per the cluster
- * message size limit, but since these are the only ones in question
- * this is fine.
- *
- * If a response is generated it is sent via PROXY_SEND.
- */
- CLUSTER_MESSAGE_REMOTE_PACKET = 4,
-
- /**
- * Request that VERB_RENDEZVOUS be sent to a peer that we have:
- * <[5] ZeroTier address of peer on recipient's side>
- * <[5] ZeroTier address of peer on sender's side>
- * <[1] 8-bit number of sender's peer's active path addresses>
- * <[...] series of serialized InetAddresses of sender's peer's paths>
- *
- * This requests that we perform NAT-t introduction between a peer that
- * we have and one on the sender's side. The sender furnishes contact
- * info for its peer, and we send VERB_RENDEZVOUS to both sides: to ours
- * directly and with PROXY_SEND to theirs.
- */
- CLUSTER_MESSAGE_PROXY_UNITE = 5,
-
- /**
- * Request that a cluster member send a packet to a locally-known peer:
- * <[5] ZeroTier address of recipient>
- * <[1] packet verb>
- * <[2] length of packet payload>
- * <[...] packet payload>
- *
- * This differs from RELAY in that it requests the receiving cluster
- * member to actually compose a ZeroTier Packet from itself to the
- * provided recipient. RELAY simply says "please forward this blob."
- * RELAY is used to implement peer-to-peer relaying with RENDEZVOUS,
- * while PROXY_SEND is used to implement proxy sending (which right
- * now is only used to send RENDEZVOUS).
- */
- CLUSTER_MESSAGE_PROXY_SEND = 6,
-
- /**
- * Replicate a network config for a network we belong to:
- * <[...] network config chunk>
- *
- * This is used by clusters to avoid every member having to query
- * for the same netconf for networks all members belong to.
- *
- * The first field of a network config chunk is the network ID,
- * so this can be checked to look up the network on receipt.
- */
- CLUSTER_MESSAGE_NETWORK_CONFIG = 7
- };
-
- /**
- * Construct a new cluster
- */
- Cluster(
- const RuntimeEnvironment *renv,
- uint16_t id,
- const std::vector<InetAddress> &zeroTierPhysicalEndpoints,
- int32_t x,
- int32_t y,
- int32_t z,
- void (*sendFunction)(void *,unsigned int,const void *,unsigned int),
- void *sendFunctionArg,
- int (*addressToLocationFunction)(void *,const struct sockaddr_storage *,int *,int *,int *),
- void *addressToLocationFunctionArg);
-
- ~Cluster();
-
- /**
- * @return This cluster member's ID
- */
- inline uint16_t id() const throw() { return _id; }
-
- /**
- * Handle an incoming intra-cluster message
- *
- * @param data Message data
- * @param len Message length (max: ZT_CLUSTER_MAX_MESSAGE_LENGTH)
- */
- void handleIncomingStateMessage(const void *msg,unsigned int len);
-
- /**
- * Broadcast that we have a given peer
- *
- * This should be done when new peers are first contacted.
- *
- * @param id Identity of peer
- */
- void broadcastHavePeer(const Identity &id);
-
- /**
- * Broadcast a network config chunk to other members of cluster
- *
- * @param chunk Chunk data
- * @param len Length of chunk
- */
- void broadcastNetworkConfigChunk(const void *chunk,unsigned int len);
-
- /**
- * If the cluster has this peer, prepare the packet to send via cluster
- *
- * Note that outp is only armored (or modified at all) if the return value is a member ID.
- *
- * @param toPeerAddress Value of outp.destination(), simply to save additional lookup
- * @param ts Result: set to time of last HAVE_PEER from the cluster
- * @param peerSecret Result: Buffer to fill with peer secret on valid return value, must be at least ZT_PEER_SECRET_KEY_LENGTH bytes
- * @return -1 if cluster does not know this peer, or a member ID to pass to sendViaCluster()
- */
- int checkSendViaCluster(const Address &toPeerAddress,uint64_t &mostRecentTs,void *peerSecret);
-
- /**
- * Send data via cluster front plane (packet head or fragment)
- *
- * @param haveMemberId Member ID that has this peer as returned by prepSendviaCluster()
- * @param toPeerAddress Destination peer address
- * @param data Packet or packet fragment data
- * @param len Length of packet or fragment
- * @return True if packet was sent (and outp was modified via armoring)
- */
- bool sendViaCluster(int haveMemberId,const Address &toPeerAddress,const void *data,unsigned int len);
-
- /**
- * Relay a packet via the cluster
- *
- * This is used in the outgoing packet and relaying logic in Switch to
- * relay packets to other cluster members. It isn't PROXY_SEND-- that is
- * used internally in Cluster to send responses to peer queries.
- *
- * @param fromPeerAddress Source peer address (if known, should be NULL for fragments)
- * @param toPeerAddress Destination peer address
- * @param data Packet or packet fragment data
- * @param len Length of packet or fragment
- * @param unite If true, also request proxy unite across cluster
- */
- void relayViaCluster(const Address &fromPeerAddress,const Address &toPeerAddress,const void *data,unsigned int len,bool unite);
-
- /**
- * Send a distributed query to other cluster members
- *
- * Some queries such as WHOIS or MULTICAST_GATHER need a response from other
- * cluster members. Replies (if any) will be sent back to the peer via
- * PROXY_SEND across the cluster.
- *
- * @param pkt Packet to distribute
- */
- void sendDistributedQuery(const Packet &pkt);
-
- /**
- * Call every ~ZT_CLUSTER_PERIODIC_TASK_PERIOD milliseconds.
- */
- void doPeriodicTasks();
-
- /**
- * Add a member ID to this cluster
- *
- * @param memberId Member ID
- */
- void addMember(uint16_t memberId);
-
- /**
- * Remove a member ID from this cluster
- *
- * @param memberId Member ID to remove
- */
- void removeMember(uint16_t memberId);
-
- /**
- * Find a better cluster endpoint for this peer (if any)
- *
- * @param redirectTo InetAddress to be set to a better endpoint (if there is one)
- * @param peerAddress Address of peer to (possibly) redirect
- * @param peerPhysicalAddress Physical address of peer's current best path (where packet was most recently received or getBestPath()->address())
- * @param offload Always redirect if possible -- can be used to offload peers during shutdown
- * @return True if redirectTo was set to a new address, false if redirectTo was not modified
- */
- bool findBetterEndpoint(InetAddress &redirectTo,const Address &peerAddress,const InetAddress &peerPhysicalAddress,bool offload);
-
- /**
- * @param ip Address to check
- * @return True if this is a cluster frontplane address (excluding our addresses)
- */
- bool isClusterPeerFrontplane(const InetAddress &ip) const;
-
- /**
- * Fill out ZT_ClusterStatus structure (from core API)
- *
- * @param status Reference to structure to hold result (anything there is replaced)
- */
- void status(ZT_ClusterStatus &status) const;
-
-private:
- void _send(uint16_t memberId,StateMessageType type,const void *msg,unsigned int len);
- void _flush(uint16_t memberId);
-
- void _doREMOTE_WHOIS(uint64_t fromMemberId,const Packet &remotep);
- void _doREMOTE_MULTICAST_GATHER(uint64_t fromMemberId,const Packet &remotep);
-
- // These are initialized in the constructor and remain immutable ------------
- uint16_t _masterSecret[ZT_SHA512_DIGEST_LEN / sizeof(uint16_t)];
- unsigned char _key[ZT_PEER_SECRET_KEY_LENGTH];
- const RuntimeEnvironment *RR;
- _ClusterSendQueue *const _sendQueue;
- void (*_sendFunction)(void *,unsigned int,const void *,unsigned int);
- void *_sendFunctionArg;
- int (*_addressToLocationFunction)(void *,const struct sockaddr_storage *,int *,int *,int *);
- void *_addressToLocationFunctionArg;
- const int32_t _x;
- const int32_t _y;
- const int32_t _z;
- const uint16_t _id;
- const std::vector<InetAddress> _zeroTierPhysicalEndpoints;
- // end immutable fields -----------------------------------------------------
-
- struct _Member
- {
- unsigned char key[ZT_PEER_SECRET_KEY_LENGTH];
-
- uint64_t lastReceivedAliveAnnouncement;
- uint64_t lastAnnouncedAliveTo;
-
- uint64_t load;
- uint64_t peers;
- int32_t x,y,z;
-
- std::vector<InetAddress> zeroTierPhysicalEndpoints;
-
- Buffer<ZT_CLUSTER_MAX_MESSAGE_LENGTH> q;
-
- Mutex lock;
-
- inline void clear()
- {
- lastReceivedAliveAnnouncement = 0;
- lastAnnouncedAliveTo = 0;
- load = 0;
- peers = 0;
- x = 0;
- y = 0;
- z = 0;
- zeroTierPhysicalEndpoints.clear();
- q.clear();
- }
-
- _Member() { this->clear(); }
- ~_Member() { Utils::burn(key,sizeof(key)); }
- };
- _Member *const _members;
-
- std::vector<uint16_t> _memberIds;
- Mutex _memberIds_m;
-
- struct _RemotePeer
- {
- _RemotePeer() : lastHavePeerReceived(0),lastSentWantPeer(0) {}
- ~_RemotePeer() { Utils::burn(key,ZT_PEER_SECRET_KEY_LENGTH); }
- uint64_t lastHavePeerReceived;
- uint64_t lastSentWantPeer;
- uint8_t key[ZT_PEER_SECRET_KEY_LENGTH]; // secret key from identity agreement
- };
- std::map< std::pair<Address,unsigned int>,_RemotePeer > _remotePeers; // we need ordered behavior and lower_bound here
- Mutex _remotePeers_m;
-
- uint64_t _lastFlushed;
- uint64_t _lastCleanedRemotePeers;
- uint64_t _lastCleanedQueue;
-};
-
-} // namespace ZeroTier
-
-#endif // ZT_ENABLE_CLUSTER
-
-#endif
diff --git a/node/Constants.hpp b/node/Constants.hpp
index 3974f0ec..fbbba76e 100644
--- a/node/Constants.hpp
+++ b/node/Constants.hpp
@@ -150,6 +150,12 @@
#endif
#endif
+#ifdef __WINDOWS__
+#define ZT_PACKED_STRUCT(D) __pragma(pack(push,1)) D __pragma(pack(pop))
+#else
+#define ZT_PACKED_STRUCT(D) D __attribute__((packed))
+#endif
+
/**
* Length of a ZeroTier address in bytes
*/
diff --git a/node/Dictionary.hpp b/node/Dictionary.hpp
index 4413d628..0b000f13 100644
--- a/node/Dictionary.hpp
+++ b/node/Dictionary.hpp
@@ -391,7 +391,7 @@ public:
inline bool add(const char *key,uint64_t value)
{
char tmp[32];
- Utils::snprintf(tmp,sizeof(tmp),"%llx",(unsigned long long)value);
+ Utils::ztsnprintf(tmp,sizeof(tmp),"%llx",(unsigned long long)value);
return this->add(key,tmp,-1);
}
@@ -401,7 +401,7 @@ public:
inline bool add(const char *key,const Address &a)
{
char tmp[32];
- Utils::snprintf(tmp,sizeof(tmp),"%.10llx",(unsigned long long)a.toInt());
+ Utils::ztsnprintf(tmp,sizeof(tmp),"%.10llx",(unsigned long long)a.toInt());
return this->add(key,tmp,-1);
}
diff --git a/node/Hashtable.hpp b/node/Hashtable.hpp
index c46ed68f..b702f608 100644
--- a/node/Hashtable.hpp
+++ b/node/Hashtable.hpp
@@ -374,12 +374,7 @@ private:
}
static inline unsigned long _hc(const uint64_t i)
{
- /* NOTE: this assumes that 'i' is evenly distributed, which is the case for
- * packet IDs and network IDs -- the two use cases in ZT for uint64_t keys.
- * These values are also greater than 0xffffffff so they'll map onto a full
- * bucket count just fine no matter what happens. Normally you'd want to
- * hash an integer key index in a hash table. */
- return (unsigned long)i;
+ return (unsigned long)(i ^ (i >> 32)); // good for network IDs and addresses
}
static inline unsigned long _hc(const uint32_t i)
{
diff --git a/node/IncomingPacket.cpp b/node/IncomingPacket.cpp
index 9140c502..1d55c9f3 100644
--- a/node/IncomingPacket.cpp
+++ b/node/IncomingPacket.cpp
@@ -42,7 +42,6 @@
#include "Salsa20.hpp"
#include "SHA512.hpp"
#include "World.hpp"
-#include "Cluster.hpp"
#include "Node.hpp"
#include "CertificateOfMembership.hpp"
#include "CertificateOfRepresentation.hpp"
diff --git a/node/InetAddress.cpp b/node/InetAddress.cpp
index 0fbb2d68..17d7c72e 100644
--- a/node/InetAddress.cpp
+++ b/node/InetAddress.cpp
@@ -152,7 +152,7 @@ std::string InetAddress::toString() const
char buf[128];
switch(ss_family) {
case AF_INET:
- Utils::snprintf(buf,sizeof(buf),"%d.%d.%d.%d/%d",
+ Utils::ztsnprintf(buf,sizeof(buf),"%d.%d.%d.%d/%d",
(int)(reinterpret_cast<const unsigned char *>(&(reinterpret_cast<const struct sockaddr_in *>(this)->sin_addr.s_addr)))[0],
(int)(reinterpret_cast<const unsigned char *>(&(reinterpret_cast<const struct sockaddr_in *>(this)->sin_addr.s_addr)))[1],
(int)(reinterpret_cast<const unsigned char *>(&(reinterpret_cast<const struct sockaddr_in *>(this)->sin_addr.s_addr)))[2],
@@ -161,7 +161,7 @@ std::string InetAddress::toString() const
);
return std::string(buf);
case AF_INET6:
- Utils::snprintf(buf,sizeof(buf),"%.2x%.2x:%.2x%.2x:%.2x%.2x:%.2x%.2x:%.2x%.2x:%.2x%.2x:%.2x%.2x:%.2x%.2x/%d",
+ Utils::ztsnprintf(buf,sizeof(buf),"%.2x%.2x:%.2x%.2x:%.2x%.2x:%.2x%.2x:%.2x%.2x:%.2x%.2x:%.2x%.2x:%.2x%.2x/%d",
(int)(reinterpret_cast<const struct sockaddr_in6 *>(this)->sin6_addr.s6_addr[0]),
(int)(reinterpret_cast<const struct sockaddr_in6 *>(this)->sin6_addr.s6_addr[1]),
(int)(reinterpret_cast<const struct sockaddr_in6 *>(this)->sin6_addr.s6_addr[2]),
@@ -190,7 +190,7 @@ std::string InetAddress::toIpString() const
char buf[128];
switch(ss_family) {
case AF_INET:
- Utils::snprintf(buf,sizeof(buf),"%d.%d.%d.%d",
+ Utils::ztsnprintf(buf,sizeof(buf),"%d.%d.%d.%d",
(int)(reinterpret_cast<const unsigned char *>(&(reinterpret_cast<const struct sockaddr_in *>(this)->sin_addr.s_addr)))[0],
(int)(reinterpret_cast<const unsigned char *>(&(reinterpret_cast<const struct sockaddr_in *>(this)->sin_addr.s_addr)))[1],
(int)(reinterpret_cast<const unsigned char *>(&(reinterpret_cast<const struct sockaddr_in *>(this)->sin_addr.s_addr)))[2],
@@ -198,7 +198,7 @@ std::string InetAddress::toIpString() const
);
return std::string(buf);
case AF_INET6:
- Utils::snprintf(buf,sizeof(buf),"%.2x%.2x:%.2x%.2x:%.2x%.2x:%.2x%.2x:%.2x%.2x:%.2x%.2x:%.2x%.2x:%.2x%.2x",
+ Utils::ztsnprintf(buf,sizeof(buf),"%.2x%.2x:%.2x%.2x:%.2x%.2x:%.2x%.2x:%.2x%.2x:%.2x%.2x:%.2x%.2x:%.2x%.2x",
(int)(reinterpret_cast<const struct sockaddr_in6 *>(this)->sin6_addr.s6_addr[0]),
(int)(reinterpret_cast<const struct sockaddr_in6 *>(this)->sin6_addr.s6_addr[1]),
(int)(reinterpret_cast<const struct sockaddr_in6 *>(this)->sin6_addr.s6_addr[2]),
diff --git a/node/MAC.hpp b/node/MAC.hpp
index e7717d99..db50aeb1 100644
--- a/node/MAC.hpp
+++ b/node/MAC.hpp
@@ -178,7 +178,7 @@ public:
*/
inline void toString(char *buf,unsigned int len) const
{
- Utils::snprintf(buf,len,"%.2x:%.2x:%.2x:%.2x:%.2x:%.2x",(int)(*this)[0],(int)(*this)[1],(int)(*this)[2],(int)(*this)[3],(int)(*this)[4],(int)(*this)[5]);
+ Utils::ztsnprintf(buf,len,"%.2x:%.2x:%.2x:%.2x:%.2x:%.2x",(int)(*this)[0],(int)(*this)[1],(int)(*this)[2],(int)(*this)[3],(int)(*this)[4],(int)(*this)[5]);
}
/**
diff --git a/node/MulticastGroup.hpp b/node/MulticastGroup.hpp
index 4240db67..7cbec2e0 100644
--- a/node/MulticastGroup.hpp
+++ b/node/MulticastGroup.hpp
@@ -100,7 +100,7 @@ public:
inline std::string toString() const
{
char buf[64];
- Utils::snprintf(buf,sizeof(buf),"%.2x%.2x%.2x%.2x%.2x%.2x/%.8lx",(unsigned int)_mac[0],(unsigned int)_mac[1],(unsigned int)_mac[2],(unsigned int)_mac[3],(unsigned int)_mac[4],(unsigned int)_mac[5],(unsigned long)_adi);
+ Utils::ztsnprintf(buf,sizeof(buf),"%.2x%.2x%.2x%.2x%.2x%.2x/%.8lx",(unsigned int)_mac[0],(unsigned int)_mac[1],(unsigned int)_mac[2],(unsigned int)_mac[3],(unsigned int)_mac[4],(unsigned int)_mac[5],(unsigned long)_adi);
return std::string(buf);
}
diff --git a/node/Network.cpp b/node/Network.cpp
index de2ea7d7..8c6f2ce8 100644
--- a/node/Network.cpp
+++ b/node/Network.cpp
@@ -42,7 +42,6 @@
#include "NetworkController.hpp"
#include "Node.hpp"
#include "Peer.hpp"
-#include "Cluster.hpp"
// Uncomment to make the rules engine dump trace info to stdout
//#define ZT_RULES_ENGINE_DEBUGGING 1
@@ -52,7 +51,7 @@ namespace ZeroTier {
namespace {
#ifdef ZT_RULES_ENGINE_DEBUGGING
-#define FILTER_TRACE(f,...) { Utils::snprintf(dpbuf,sizeof(dpbuf),f,##__VA_ARGS__); dlog.push_back(std::string(dpbuf)); }
+#define FILTER_TRACE(f,...) { Utils::ztsnprintf(dpbuf,sizeof(dpbuf),f,##__VA_ARGS__); dlog.push_back(std::string(dpbuf)); }
static const char *_rtn(const ZT_VirtualNetworkRuleType rt)
{
switch(rt) {
@@ -682,7 +681,7 @@ static _doZtFilterResult _doZtFilter(
const ZeroTier::MulticastGroup Network::BROADCAST(ZeroTier::MAC(0xffffffffffffULL),0);
-Network::Network(const RuntimeEnvironment *renv,void *tPtr,uint64_t nwid,void *uptr) :
+Network::Network(const RuntimeEnvironment *renv,void *tPtr,uint64_t nwid,void *uptr,const NetworkConfig *nconf) :
RR(renv),
_uPtr(uptr),
_id(nwid),
@@ -697,29 +696,30 @@ Network::Network(const RuntimeEnvironment *renv,void *tPtr,uint64_t nwid,void *u
for(int i=0;i<ZT_NETWORK_MAX_INCOMING_UPDATES;++i)
_incomingConfigChunks[i].ts = 0;
- char confn[128];
- Utils::snprintf(confn,sizeof(confn),"networks.d/%.16llx.conf",_id);
-
- bool gotConf = false;
- Dictionary<ZT_NETWORKCONFIG_DICT_CAPACITY> *dconf = new Dictionary<ZT_NETWORKCONFIG_DICT_CAPACITY>();
- NetworkConfig *nconf = new NetworkConfig();
- try {
- std::string conf(RR->node->dataStoreGet(tPtr,confn));
- if (conf.length()) {
- dconf->load(conf.c_str());
- if (nconf->fromDictionary(*dconf)) {
- this->setConfiguration(tPtr,*nconf,false);
- _lastConfigUpdate = 0; // we still want to re-request a new config from the network
- gotConf = true;
+ if (nconf) {
+ this->setConfiguration(tPtr,*nconf,false);
+ _lastConfigUpdate = 0; // still want to re-request since it's likely outdated
+ } else {
+ bool got = false;
+ Dictionary<ZT_NETWORKCONFIG_DICT_CAPACITY> *dict = new Dictionary<ZT_NETWORKCONFIG_DICT_CAPACITY>();
+ try {
+ int n = RR->node->stateObjectGet(tPtr,ZT_STATE_OBJECT_NETWORK_CONFIG,nwid,dict->unsafeData(),ZT_NETWORKCONFIG_DICT_CAPACITY - 1);
+ if (n > 1) {
+ NetworkConfig *nconf = new NetworkConfig();
+ try {
+ if (nconf->fromDictionary(*dict)) {
+ this->setConfiguration(tPtr,*nconf,false);
+ _lastConfigUpdate = 0; // still want to re-request an update since it's likely outdated
+ got = true;
+ }
+ } catch ( ... ) {}
+ delete nconf;
}
- }
- } catch ( ... ) {} // ignore invalids, we'll re-request
- delete nconf;
- delete dconf;
+ } catch ( ... ) {}
+ delete dict;
- if (!gotConf) {
- // Save a one-byte CR to persist membership while we request a real netconf
- RR->node->dataStorePut(tPtr,confn,"\n",1,false);
+ if (!got)
+ RR->node->stateObjectPut(tPtr,ZT_STATE_OBJECT_NETWORK_CONFIG,nwid,"\n",1);
}
if (!_portInitialized) {
@@ -735,12 +735,9 @@ Network::~Network()
ZT_VirtualNetworkConfig ctmp;
_externalConfig(&ctmp);
- char n[128];
if (_destroyed) {
- // This is done in Node::leave() so we can pass tPtr
+ // This is done in Node::leave() so we can pass tPtr properly
//RR->node->configureVirtualNetworkPort((void *)0,_id,&_uPtr,ZT_VIRTUAL_NETWORK_CONFIG_OPERATION_DESTROY,&ctmp);
- Utils::snprintf(n,sizeof(n),"networks.d/%.16llx.conf",_id);
- RR->node->dataStoreDelete((void *)0,n);
} else {
RR->node->configureVirtualNetworkPort((void *)0,_id,&_uPtr,ZT_VIRTUAL_NETWORK_CONFIG_OPERATION_DOWN,&ctmp);
}
@@ -1188,10 +1185,8 @@ int Network::setConfiguration(void *tPtr,const NetworkConfig &nconf,bool saveToD
if (saveToDisk) {
Dictionary<ZT_NETWORKCONFIG_DICT_CAPACITY> *d = new Dictionary<ZT_NETWORKCONFIG_DICT_CAPACITY>();
try {
- char n[64];
- Utils::snprintf(n,sizeof(n),"networks.d/%.16llx.conf",_id);
if (nconf.toDictionary(*d,false))
- RR->node->dataStorePut(tPtr,n,(const void *)d->data(),d->sizeBytes(),true);
+ RR->node->stateObjectPut(tPtr,ZT_STATE_OBJECT_NETWORK_CONFIG,_id,d->data(),d->sizeBytes());
} catch ( ... ) {}
delete d;
}
@@ -1266,7 +1261,7 @@ void Network::requestConfiguration(void *tPtr)
nconf->rules[13].t = (uint8_t)ZT_NETWORK_RULE_ACTION_DROP;
nconf->type = ZT_NETWORK_TYPE_PUBLIC;
- Utils::snprintf(nconf->name,sizeof(nconf->name),"adhoc-%.04x-%.04x",(int)startPortRange,(int)endPortRange);
+ Utils::ztsnprintf(nconf->name,sizeof(nconf->name),"adhoc-%.04x-%.04x",(int)startPortRange,(int)endPortRange);
this->setConfiguration(tPtr,*nconf,false);
delete nconf;
diff --git a/node/Network.hpp b/node/Network.hpp
index cce6c41f..454a3f20 100644
--- a/node/Network.hpp
+++ b/node/Network.hpp
@@ -88,8 +88,9 @@ public:
* @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call
* @param nwid Network ID
* @param uptr Arbitrary pointer used by externally-facing API (for user use)
+ * @param nconf Network config, if known
*/
- Network(const RuntimeEnvironment *renv,void *tPtr,uint64_t nwid,void *uptr);
+ Network(const RuntimeEnvironment *renv,void *tPtr,uint64_t nwid,void *uptr,const NetworkConfig *nconf);
~Network();
diff --git a/node/NetworkConfig.cpp b/node/NetworkConfig.cpp
index c39f6cab..65101c3a 100644
--- a/node/NetworkConfig.cpp
+++ b/node/NetworkConfig.cpp
@@ -94,7 +94,7 @@ bool NetworkConfig::toDictionary(Dictionary<ZT_NETWORKCONFIG_DICT_CAPACITY> &d,b
if (ets.length() > 0)
ets.push_back(',');
char tmp2[16];
- Utils::snprintf(tmp2,sizeof(tmp2),"%x",et);
+ Utils::ztsnprintf(tmp2,sizeof(tmp2),"%x",et);
ets.append(tmp2);
}
et = 0;
diff --git a/node/Node.cpp b/node/Node.cpp
index 911c9c4b..ab49e63b 100644
--- a/node/Node.cpp
+++ b/node/Node.cpp
@@ -33,6 +33,7 @@
#include "../version.h"
#include "Constants.hpp"
+#include "SharedPtr.hpp"
#include "Node.hpp"
#include "RuntimeEnvironment.hpp"
#include "NetworkController.hpp"
@@ -44,7 +45,7 @@
#include "Address.hpp"
#include "Identity.hpp"
#include "SelfAwareness.hpp"
-#include "Cluster.hpp"
+#include "Network.hpp"
const struct sockaddr_storage ZT_SOCKADDR_NULL = {0};
@@ -58,6 +59,7 @@ Node::Node(void *uptr,void *tptr,const struct ZT_Node_Callbacks *callbacks,uint6
_RR(this),
RR(&_RR),
_uPtr(uptr),
+ _networks(8),
_now(now),
_lastPingCheck(0),
_lastHousekeepingRun(0)
@@ -74,20 +76,31 @@ Node::Node(void *uptr,void *tptr,const struct ZT_Node_Callbacks *callbacks,uint6
memset(_expectingRepliesTo,0,sizeof(_expectingRepliesTo));
memset(_lastIdentityVerification,0,sizeof(_lastIdentityVerification));
- std::string idtmp(dataStoreGet(tptr,"identity.secret"));
- if ((!idtmp.length())||(!RR->identity.fromString(idtmp))||(!RR->identity.hasPrivate())) {
- TRACE("identity.secret not found, generating...");
- RR->identity.generate();
- idtmp = RR->identity.toString(true);
- if (!dataStorePut(tptr,"identity.secret",idtmp,true))
- throw std::runtime_error("unable to write identity.secret");
+ char tmp[512];
+ std::string tmp2;
+ int n = stateObjectGet(tptr,ZT_STATE_OBJECT_IDENTITY_SECRET,0,tmp,sizeof(tmp) - 1);
+ if (n > 0) {
+ tmp[n] = (char)0;
+ if (!RR->identity.fromString(tmp))
+ n = -1;
}
- RR->publicIdentityStr = RR->identity.toString(false);
- RR->secretIdentityStr = RR->identity.toString(true);
- idtmp = dataStoreGet(tptr,"identity.public");
- if (idtmp != RR->publicIdentityStr) {
- if (!dataStorePut(tptr,"identity.public",RR->publicIdentityStr,false))
- throw std::runtime_error("unable to write identity.public");
+ if (n <= 0) {
+ RR->identity.generate();
+ tmp2 = RR->identity.toString(true);
+ stateObjectPut(tptr,ZT_STATE_OBJECT_IDENTITY_SECRET,RR->identity.address().toInt(),tmp2.data(),(unsigned int)tmp2.length());
+ tmp2 = RR->identity.toString(false);
+ stateObjectPut(tptr,ZT_STATE_OBJECT_IDENTITY_PUBLIC,RR->identity.address().toInt(),tmp2.data(),(unsigned int)tmp2.length());
+ } else {
+ n = stateObjectGet(tptr,ZT_STATE_OBJECT_IDENTITY_PUBLIC,RR->identity.address().toInt(),tmp,sizeof(tmp) - 1);
+ if (n > 0) {
+ tmp[n] = (char)0;
+ if (RR->identity.toString(false) != tmp)
+ n = -1;
+ }
+ if (n <= 0) {
+ tmp2 = RR->identity.toString(false);
+ stateObjectPut(tptr,ZT_STATE_OBJECT_IDENTITY_PUBLIC,RR->identity.address().toInt(),tmp2.data(),(unsigned int)tmp2.length());
+ }
}
try {
@@ -110,7 +123,7 @@ Node::~Node()
{
Mutex::Lock _l(_networks_m);
- _networks.clear(); // ensure that networks are destroyed before shutdow
+ _networks.clear(); // destroy all networks before shutdown
delete RR->sa;
delete RR->topology;
@@ -122,6 +135,97 @@ Node::~Node()
#endif
}
+ZT_ResultCode Node::processStateUpdate(
+ void *tptr,
+ ZT_StateObjectType type,
+ uint64_t id,
+ const void *data,
+ unsigned int len)
+{
+ ZT_ResultCode r = ZT_RESULT_OK_IGNORED;
+ switch(type) {
+
+ case ZT_STATE_OBJECT_PEER:
+ if (len) {
+ }
+ break;
+
+ case ZT_STATE_OBJECT_PEER_IDENTITY:
+ if (len) {
+ }
+ break;
+
+ case ZT_STATE_OBJECT_NETWORK_CONFIG:
+ if (len <= (ZT_NETWORKCONFIG_DICT_CAPACITY - 1)) {
+ if (len < 2) {
+ Mutex::Lock _l(_networks_m);
+ SharedPtr<Network> &nw = _networks[id];
+ if (!nw) {
+ nw = SharedPtr<Network>(new Network(RR,tptr,id,(void *)0,(const NetworkConfig *)0));
+ r = ZT_RESULT_OK;
+ }
+ } else {
+ Dictionary<ZT_NETWORKCONFIG_DICT_CAPACITY> *dict = new Dictionary<ZT_NETWORKCONFIG_DICT_CAPACITY>(reinterpret_cast<const char *>(data),len);
+ try {
+ NetworkConfig *nconf = new NetworkConfig();
+ try {
+ if (nconf->fromDictionary(*dict)) {
+ Mutex::Lock _l(_networks_m);
+ SharedPtr<Network> &nw = _networks[id];
+ if (nw) {
+ switch (nw->setConfiguration(tptr,*nconf,false)) {
+ default:
+ r = ZT_RESULT_ERROR_BAD_PARAMETER;
+ break;
+ case 1:
+ r = ZT_RESULT_OK_IGNORED;
+ break;
+ case 2:
+ r = ZT_RESULT_OK;
+ break;
+ }
+ } else {
+ nw = SharedPtr<Network>(new Network(RR,tptr,id,(void *)0,nconf));
+ }
+ } else {
+ r = ZT_RESULT_ERROR_BAD_PARAMETER;
+ }
+ } catch ( ... ) {
+ r = ZT_RESULT_ERROR_BAD_PARAMETER;
+ }
+ delete nconf;
+ } catch ( ... ) {
+ r = ZT_RESULT_ERROR_BAD_PARAMETER;
+ }
+ delete dict;
+ }
+ } else {
+ r = ZT_RESULT_ERROR_BAD_PARAMETER;
+ }
+ break;
+
+ case ZT_STATE_OBJECT_PLANET:
+ case ZT_STATE_OBJECT_MOON:
+ if (len <= ZT_WORLD_MAX_SERIALIZED_LENGTH) {
+ World w;
+ try {
+ w.deserialize(Buffer<ZT_WORLD_MAX_SERIALIZED_LENGTH>(data,len));
+ if (( (w.type() == World::TYPE_MOON)&&(type == ZT_STATE_OBJECT_MOON) )||( (w.type() == World::TYPE_PLANET)&&(type == ZT_STATE_OBJECT_PLANET) )) {
+ r = (RR->topology->addWorld(tptr,w,false)) ? ZT_RESULT_OK : ZT_RESULT_OK_IGNORED;
+ }
+ } catch ( ... ) {
+ r = ZT_RESULT_ERROR_BAD_PARAMETER;
+ }
+ } else {
+ r = ZT_RESULT_ERROR_BAD_PARAMETER;
+ }
+ break;
+
+ default: break;
+ }
+ return r;
+}
+
ZT_ResultCode Node::processWirePacket(
void *tptr,
uint64_t now,
@@ -238,10 +342,13 @@ ZT_ResultCode Node::processBackgroundTasks(void *tptr,uint64_t now,volatile uint
std::vector< SharedPtr<Network> > needConfig;
{
Mutex::Lock _l(_networks_m);
- for(std::vector< std::pair< uint64_t,SharedPtr<Network> > >::const_iterator n(_networks.begin());n!=_networks.end();++n) {
- if (((now - n->second->lastConfigUpdate()) >= ZT_NETWORK_AUTOCONF_DELAY)||(!n->second->hasConfig()))
- needConfig.push_back(n->second);
- n->second->sendUpdatesToMembers(tptr);
+ Hashtable< uint64_t,SharedPtr<Network> >::Iterator i(_networks);
+ uint64_t *k = (uint64_t *)0;
+ SharedPtr<Network> *v = (SharedPtr<Network> *)0;
+ while (i.next(k,v)) {
+ if (((now - (*v)->lastConfigUpdate()) >= ZT_NETWORK_AUTOCONF_DELAY)||(!(*v)->hasConfig()))
+ needConfig.push_back(*v);
+ (*v)->sendUpdatesToMembers(tptr);
}
}
for(std::vector< SharedPtr<Network> >::const_iterator n(needConfig.begin());n!=needConfig.end();++n)
@@ -306,37 +413,38 @@ ZT_ResultCode Node::processBackgroundTasks(void *tptr,uint64_t now,volatile uint
ZT_ResultCode Node::join(uint64_t nwid,void *uptr,void *tptr)
{
Mutex::Lock _l(_networks_m);
- SharedPtr<Network> nw = _network(nwid);
- if(!nw) {
- const std::pair< uint64_t,SharedPtr<Network> > nn(nwid,SharedPtr<Network>(new Network(RR,tptr,nwid,uptr)));
- _networks.insert(std::upper_bound(_networks.begin(),_networks.end(),nn),nn);
- }
+ SharedPtr<Network> &nw = _networks[nwid];
+ if (!nw)
+ nw = SharedPtr<Network>(new Network(RR,tptr,nwid,uptr,(const NetworkConfig *)0));
return ZT_RESULT_OK;
}
ZT_ResultCode Node::leave(uint64_t nwid,void **uptr,void *tptr)
{
ZT_VirtualNetworkConfig ctmp;
- std::vector< std::pair< uint64_t,SharedPtr<Network> > > newn;
void **nUserPtr = (void **)0;
- Mutex::Lock _l(_networks_m);
-
- for(std::vector< std::pair< uint64_t,SharedPtr<Network> > >::const_iterator n(_networks.begin());n!=_networks.end();++n) {
- if (n->first != nwid) {
- newn.push_back(*n);
- } else {
- if (uptr)
- *uptr = *n->second->userPtr();
- n->second->externalConfig(&ctmp);
- n->second->destroy();
- nUserPtr = n->second->userPtr();
- }
+ {
+ Mutex::Lock _l(_networks_m);
+ SharedPtr<Network> *nw = _networks.get(nwid);
+ if (!nw)
+ return ZT_RESULT_OK;
+ if (uptr)
+ *uptr = (*nw)->userPtr();
+ (*nw)->externalConfig(&ctmp);
+ (*nw)->destroy();
+ nUserPtr = (*nw)->userPtr();
}
- _networks.swap(newn);
-
+
if (nUserPtr)
RR->node->configureVirtualNetworkPort(tptr,nwid,nUserPtr,ZT_VIRTUAL_NETWORK_CONFIG_OPERATION_DESTROY,&ctmp);
+ {
+ Mutex::Lock _l(_networks_m);
+ _networks.erase(nwid);
+ }
+
+ RR->node->stateObjectDelete(tptr,ZT_STATE_OBJECT_NETWORK_CONFIG,nwid);
+
return ZT_RESULT_OK;
}
@@ -431,10 +539,10 @@ ZT_PeerList *Node::peers() const
ZT_VirtualNetworkConfig *Node::networkConfig(uint64_t nwid) const
{
Mutex::Lock _l(_networks_m);
- SharedPtr<Network> nw = _network(nwid);
- if(nw) {
+ const SharedPtr<Network> *nw = _networks.get(nwid);
+ if (nw) {
ZT_VirtualNetworkConfig *nc = (ZT_VirtualNetworkConfig *)::malloc(sizeof(ZT_VirtualNetworkConfig));
- nw->externalConfig(nc);
+ (*nw)->externalConfig(nc);
return nc;
}
return (ZT_VirtualNetworkConfig *)0;
@@ -451,8 +559,11 @@ ZT_VirtualNetworkList *Node::networks() const
nl->networks = (ZT_VirtualNetworkConfig *)(buf + sizeof(ZT_VirtualNetworkList));
nl->networkCount = 0;
- for(std::vector< std::pair< uint64_t,SharedPtr<Network> > >::const_iterator n(_networks.begin());n!=_networks.end();++n)
- n->second->externalConfig(&(nl->networks[nl->networkCount++]));
+ Hashtable< uint64_t,SharedPtr<Network> >::Iterator i(*const_cast< Hashtable< uint64_t,SharedPtr<Network> > *>(&_networks));
+ uint64_t *k = (uint64_t *)0;
+ SharedPtr<Network> *v = (SharedPtr<Network> *)0;
+ while (i.next(k,v))
+ (*v)->externalConfig(&(nl->networks[nl->networkCount++]));
return nl;
}
@@ -503,6 +614,7 @@ void Node::setNetconfMaster(void *networkControllerInstance)
RR->localNetworkController->init(RR->identity,this);
}
+/*
ZT_ResultCode Node::clusterInit(
unsigned int myId,
const struct sockaddr_storage *zeroTierPhysicalEndpoints,
@@ -570,25 +682,12 @@ void Node::clusterStatus(ZT_ClusterStatus *cs)
#endif
memset(cs,0,sizeof(ZT_ClusterStatus));
}
+*/
/****************************************************************************/
/* Node methods used only within node/ */
/****************************************************************************/
-std::string Node::dataStoreGet(void *tPtr,const char *name)
-{
- char buf[1024];
- std::string r;
- unsigned long olen = 0;
- do {
- long n = _cb.dataStoreGetFunction(reinterpret_cast<ZT_Node *>(this),_uPtr,tPtr,name,buf,sizeof(buf),(unsigned long)r.length(),&olen);
- if (n <= 0)
- return std::string();
- r.append(buf,n);
- } while (r.length() < olen);
- return r;
-}
-
bool Node::shouldUsePathForZeroTierTraffic(void *tPtr,const Address &ztaddr,const InetAddress &localAddress,const InetAddress &remoteAddress)
{
if (!Path::isAddressValidForPath(remoteAddress))
@@ -599,10 +698,13 @@ bool Node::shouldUsePathForZeroTierTraffic(void *tPtr,const Address &ztaddr,cons
{
Mutex::Lock _l(_networks_m);
- for(std::vector< std::pair< uint64_t, SharedPtr<Network> > >::const_iterator i=_networks.begin();i!=_networks.end();++i) {
- if (i->second->hasConfig()) {
- for(unsigned int k=0;k<i->second->config().staticIpCount;++k) {
- if (i->second->config().staticIps[k].containsAddress(remoteAddress))
+ Hashtable< uint64_t,SharedPtr<Network> >::Iterator i(_networks);
+ uint64_t *k = (uint64_t *)0;
+ SharedPtr<Network> *v = (SharedPtr<Network> *)0;
+ while (i.next(k,v)) {
+ if ((*v)->hasConfig()) {
+ for(unsigned int k=0;k<(*v)->config().staticIpCount;++k) {
+ if ((*v)->config().staticIps[k].containsAddress(remoteAddress))
return false;
}
}
@@ -640,7 +742,7 @@ void Node::postTrace(const char *module,unsigned int line,const char *fmt,...)
va_end(ap);
tmp2[sizeof(tmp2)-1] = (char)0;
- Utils::snprintf(tmp1,sizeof(tmp1),"[%s] %s:%u %s",nowstr,module,line,tmp2);
+ Utils::ztsnprintf(tmp1,sizeof(tmp1),"[%s] %s:%u %s",nowstr,module,line,tmp2);
postEvent((void *)0,ZT_EVENT_TRACE,tmp1);
}
#endif // ZT_TRACE
@@ -806,6 +908,23 @@ void ZT_Node_delete(ZT_Node *node)
} catch ( ... ) {}
}
+enum ZT_ResultCode ZT_Node_processStateUpdate(
+ ZT_Node *node,
+ void *tptr,
+ ZT_StateObjectType type,
+ uint64_t id,
+ const void *data,
+ unsigned int len)
+{
+ try {
+ return reinterpret_cast<ZeroTier::Node *>(node)->processStateUpdate(tptr,type,id,data,len);
+ } catch (std::bad_alloc &exc) {
+ return ZT_RESULT_FATAL_ERROR_OUT_OF_MEMORY;
+ } catch ( ... ) {
+ return ZT_RESULT_FATAL_ERROR_INTERNAL;
+ }
+}
+
enum ZT_ResultCode ZT_Node_processWirePacket(
ZT_Node *node,
void *tptr,
@@ -998,56 +1117,6 @@ void ZT_Node_setNetconfMaster(ZT_Node *node,void *networkControllerInstance)
} catch ( ... ) {}
}
-enum ZT_ResultCode ZT_Node_clusterInit(
- ZT_Node *node,
- unsigned int myId,
- const struct sockaddr_storage *zeroTierPhysicalEndpoints,
- unsigned int numZeroTierPhysicalEndpoints,
- int x,
- int y,
- int z,
- void (*sendFunction)(void *,unsigned int,const void *,unsigned int),
- void *sendFunctionArg,
- int (*addressToLocationFunction)(void *,const struct sockaddr_storage *,int *,int *,int *),
- void *addressToLocationFunctionArg)
-{
- try {
- return reinterpret_cast<ZeroTier::Node *>(node)->clusterInit(myId,zeroTierPhysicalEndpoints,numZeroTierPhysicalEndpoints,x,y,z,sendFunction,sendFunctionArg,addressToLocationFunction,addressToLocationFunctionArg);
- } catch ( ... ) {
- return ZT_RESULT_FATAL_ERROR_INTERNAL;
- }
-}
-
-enum ZT_ResultCode ZT_Node_clusterAddMember(ZT_Node *node,unsigned int memberId)
-{
- try {
- return reinterpret_cast<ZeroTier::Node *>(node)->clusterAddMember(memberId);
- } catch ( ... ) {
- return ZT_RESULT_FATAL_ERROR_INTERNAL;
- }
-}
-
-void ZT_Node_clusterRemoveMember(ZT_Node *node,unsigned int memberId)
-{
- try {
- reinterpret_cast<ZeroTier::Node *>(node)->clusterRemoveMember(memberId);
- } catch ( ... ) {}
-}
-
-void ZT_Node_clusterHandleIncomingMessage(ZT_Node *node,const void *msg,unsigned int len)
-{
- try {
- reinterpret_cast<ZeroTier::Node *>(node)->clusterHandleIncomingMessage(msg,len);
- } catch ( ... ) {}
-}
-
-void ZT_Node_clusterStatus(ZT_Node *node,ZT_ClusterStatus *cs)
-{
- try {
- reinterpret_cast<ZeroTier::Node *>(node)->clusterStatus(cs);
- } catch ( ... ) {}
-}
-
void ZT_Node_setTrustedPaths(ZT_Node *node,const struct sockaddr_storage *networks,const uint64_t *ids,unsigned int count)
{
try {
diff --git a/node/Node.hpp b/node/Node.hpp
index 57b5489e..f407c60c 100644
--- a/node/Node.hpp
+++ b/node/Node.hpp
@@ -46,6 +46,7 @@
#include "Path.hpp"
#include "Salsa20.hpp"
#include "NetworkController.hpp"
+#include "Hashtable.hpp"
#undef TRACE
#ifdef ZT_TRACE
@@ -81,6 +82,12 @@ public:
// Public API Functions ----------------------------------------------------
+ ZT_ResultCode processStateUpdate(
+ void *tptr,
+ ZT_StateObjectType type,
+ uint64_t id,
+ const void *data,
+ unsigned int len);
ZT_ResultCode processWirePacket(
void *tptr,
uint64_t now,
@@ -117,21 +124,6 @@ public:
void clearLocalInterfaceAddresses();
int sendUserMessage(void *tptr,uint64_t dest,uint64_t typeId,const void *data,unsigned int len);
void setNetconfMaster(void *networkControllerInstance);
- ZT_ResultCode clusterInit(
- unsigned int myId,
- const struct sockaddr_storage *zeroTierPhysicalEndpoints,
- unsigned int numZeroTierPhysicalEndpoints,
- int x,
- int y,
- int z,
- void (*sendFunction)(void *,unsigned int,const void *,unsigned int),
- void *sendFunctionArg,
- int (*addressToLocationFunction)(void *,const struct sockaddr_storage *,int *,int *,int *),
- void *addressToLocationFunctionArg);
- ZT_ResultCode clusterAddMember(unsigned int memberId);
- void clusterRemoveMember(unsigned int memberId);
- void clusterHandleIncomingMessage(const void *msg,unsigned int len);
- void clusterStatus(ZT_ClusterStatus *cs);
// Internal functions ------------------------------------------------------
@@ -169,26 +161,27 @@ public:
inline SharedPtr<Network> network(uint64_t nwid) const
{
Mutex::Lock _l(_networks_m);
- return _network(nwid);
+ const SharedPtr<Network> *n = _networks.get(nwid);
+ if (n)
+ return *n;
+ return SharedPtr<Network>();
}
inline bool belongsToNetwork(uint64_t nwid) const
{
Mutex::Lock _l(_networks_m);
- for(std::vector< std::pair< uint64_t, SharedPtr<Network> > >::const_iterator i=_networks.begin();i!=_networks.end();++i) {
- if (i->first == nwid)
- return true;
- }
- return false;
+ return _networks.contains(nwid);
}
inline std::vector< SharedPtr<Network> > allNetworks() const
{
std::vector< SharedPtr<Network> > nw;
Mutex::Lock _l(_networks_m);
- nw.reserve(_networks.size());
- for(std::vector< std::pair< uint64_t, SharedPtr<Network> > >::const_iterator i=_networks.begin();i!=_networks.end();++i)
- nw.push_back(i->second);
+ Hashtable< uint64_t,SharedPtr<Network> >::Iterator i(*const_cast< Hashtable< uint64_t,SharedPtr<Network> > * >(&_networks));
+ uint64_t *k = (uint64_t *)0;
+ SharedPtr<Network> *v = (SharedPtr<Network> *)0;
+ while (i.next(k,v))
+ nw.push_back(*v);
return nw;
}
@@ -198,17 +191,16 @@ public:
return _directPaths;
}
- inline bool dataStorePut(void *tPtr,const char *name,const void *data,unsigned int len,bool secure) { return (_cb.dataStorePutFunction(reinterpret_cast<ZT_Node *>(this),_uPtr,tPtr,name,data,len,(int)secure) == 0); }
- inline bool dataStorePut(void *tPtr,const char *name,const std::string &data,bool secure) { return dataStorePut(tPtr,name,(const void *)data.data(),(unsigned int)data.length(),secure); }
- inline void dataStoreDelete(void *tPtr,const char *name) { _cb.dataStorePutFunction(reinterpret_cast<ZT_Node *>(this),_uPtr,tPtr,name,(const void *)0,0,0); }
- std::string dataStoreGet(void *tPtr,const char *name);
-
inline void postEvent(void *tPtr,ZT_Event ev,const void *md = (const void *)0) { _cb.eventCallback(reinterpret_cast<ZT_Node *>(this),_uPtr,tPtr,ev,md); }
inline int configureVirtualNetworkPort(void *tPtr,uint64_t nwid,void **nuptr,ZT_VirtualNetworkConfigOperation op,const ZT_VirtualNetworkConfig *nc) { return _cb.virtualNetworkConfigFunction(reinterpret_cast<ZT_Node *>(this),_uPtr,tPtr,nwid,nuptr,op,nc); }
inline bool online() const throw() { return _online; }
+ inline int stateObjectGet(void *const tPtr,ZT_StateObjectType type,const uint64_t id,void *const data,const unsigned int maxlen) { return _cb.stateGetFunction(reinterpret_cast<ZT_Node *>(this),_uPtr,tPtr,type,id,data,maxlen); }
+ inline void stateObjectPut(void *const tPtr,ZT_StateObjectType type,const uint64_t id,const void *const data,const unsigned int len) { _cb.statePutFunction(reinterpret_cast<ZT_Node *>(this),_uPtr,tPtr,type,id,data,(int)len); }
+ inline void stateObjectDelete(void *const tPtr,ZT_StateObjectType type,const uint64_t id) { _cb.statePutFunction(reinterpret_cast<ZT_Node *>(this),_uPtr,tPtr,type,id,(const void *)0,-1); }
+
#ifdef ZT_TRACE
void postTrace(const char *module,unsigned int line,const char *fmt,...);
#endif
@@ -222,6 +214,8 @@ public:
World planet() const;
std::vector<World> moons() const;
+ inline const Identity &identity() const { return _RR.identity; }
+
/**
* Register that we are expecting a reply to a packet ID
*
@@ -281,16 +275,6 @@ public:
virtual void ncSendError(uint64_t nwid,uint64_t requestPacketId,const Address &destination,NetworkController::ErrorCode errorCode);
private:
- inline SharedPtr<Network> _network(uint64_t nwid) const
- {
- // assumes _networks_m is locked
- for(std::vector< std::pair< uint64_t, SharedPtr<Network> > >::const_iterator i=_networks.begin();i!=_networks.end();++i) {
- if (i->first == nwid)
- return i->second;
- }
- return SharedPtr<Network>();
- }
-
RuntimeEnvironment _RR;
RuntimeEnvironment *RR;
void *_uPtr; // _uptr (lower case) is reserved in Visual Studio :P
@@ -303,7 +287,7 @@ private:
// Time of last identity verification indexed by InetAddress.rateGateHash() -- used in IncomingPacket::_doHELLO() via rateGateIdentityVerification()
uint64_t _lastIdentityVerification[16384];
- std::vector< std::pair< uint64_t, SharedPtr<Network> > > _networks;
+ Hashtable< uint64_t,SharedPtr<Network> > _networks;
Mutex _networks_m;
std::vector<InetAddress> _directPaths;
diff --git a/node/Peer.cpp b/node/Peer.cpp
index 01905833..84086048 100644
--- a/node/Peer.cpp
+++ b/node/Peer.cpp
@@ -32,7 +32,6 @@
#include "Switch.hpp"
#include "Network.hpp"
#include "SelfAwareness.hpp"
-#include "Cluster.hpp"
#include "Packet.hpp"
namespace ZeroTier {
diff --git a/node/Switch.cpp b/node/Switch.cpp
index 211b706a..2be54b37 100644
--- a/node/Switch.cpp
+++ b/node/Switch.cpp
@@ -43,7 +43,6 @@
#include "Peer.hpp"
#include "SelfAwareness.hpp"
#include "Packet.hpp"
-#include "Cluster.hpp"
namespace ZeroTier {
diff --git a/node/Topology.cpp b/node/Topology.cpp
index 80f4ed4e..d4b424ff 100644
--- a/node/Topology.cpp
+++ b/node/Topology.cpp
@@ -68,15 +68,15 @@ Topology::Topology(const RuntimeEnvironment *renv,void *tPtr) :
_trustedPathCount(0),
_amRoot(false)
{
- try {
- World cachedPlanet;
- std::string buf(RR->node->dataStoreGet(tPtr,"planet"));
- if (buf.length() > 0) {
- Buffer<ZT_WORLD_MAX_SERIALIZED_LENGTH> dswtmp(buf.data(),(unsigned int)buf.length());
- cachedPlanet.deserialize(dswtmp,0);
- }
- addWorld(tPtr,cachedPlanet,false);
- } catch ( ... ) {}
+ uint8_t tmp[ZT_WORLD_MAX_SERIALIZED_LENGTH];
+ int n = RR->node->stateObjectGet(tPtr,ZT_STATE_OBJECT_PLANET,0,tmp,sizeof(tmp));
+ if (n > 0) {
+ try {
+ World cachedPlanet;
+ cachedPlanet.deserialize(Buffer<ZT_WORLD_MAX_SERIALIZED_LENGTH>(tmp,(unsigned int)n),0);
+ addWorld(tPtr,cachedPlanet,false);
+ } catch ( ... ) {} // ignore invalid cached planets
+ }
World defaultPlanet;
{
@@ -158,9 +158,8 @@ Identity Topology::getIdentity(void *tPtr,const Address &zta)
void Topology::saveIdentity(void *tPtr,const Identity &id)
{
if (id) {
- char p[128];
- Utils::snprintf(p,sizeof(p),"iddb.d/%.10llx",(unsigned long long)id.address().toInt());
- RR->node->dataStorePut(tPtr,p,id.toString(false),false);
+ const std::string tmp(id.toString(false));
+ RR->node->stateObjectPut(tPtr,ZT_STATE_OBJECT_PEER_IDENTITY,id.address().toInt(),tmp.data(),(unsigned int)tmp.length());
}
}
@@ -327,19 +326,11 @@ bool Topology::addWorld(void *tPtr,const World &newWorld,bool alwaysAcceptNew)
return false;
}
- char savePath[64];
- if (existing->type() == World::TYPE_MOON) {
- Utils::snprintf(savePath,sizeof(savePath),"moons.d/%.16llx.moon",existing->id());
- } else {
- Utils::scopy(savePath,sizeof(savePath),"planet");
- }
try {
- Buffer<ZT_WORLD_MAX_SERIALIZED_LENGTH> dswtmp;
- existing->serialize(dswtmp,false);
- RR->node->dataStorePut(tPtr,savePath,dswtmp.data(),dswtmp.size(),false);
- } catch ( ... ) {
- RR->node->dataStoreDelete(tPtr,savePath);
- }
+ Buffer<ZT_WORLD_MAX_SERIALIZED_LENGTH> sbuf;
+ existing->serialize(sbuf,false);
+ RR->node->stateObjectPut(tPtr,(existing->type() == World::TYPE_PLANET) ? ZT_STATE_OBJECT_PLANET : ZT_STATE_OBJECT_MOON,existing->id(),sbuf.data(),sbuf.size());
+ } catch ( ... ) {}
_memoizeUpstreams(tPtr);
@@ -348,21 +339,18 @@ bool Topology::addWorld(void *tPtr,const World &newWorld,bool alwaysAcceptNew)
void Topology::addMoon(void *tPtr,const uint64_t id,const Address &seed)
{
- char savePath[64];
- Utils::snprintf(savePath,sizeof(savePath),"moons.d/%.16llx.moon",id);
-
- try {
- std::string moonBin(RR->node->dataStoreGet(tPtr,savePath));
- if (moonBin.length() > 1) {
- Buffer<ZT_WORLD_MAX_SERIALIZED_LENGTH> wtmp(moonBin.data(),(unsigned int)moonBin.length());
+ char tmp[ZT_WORLD_MAX_SERIALIZED_LENGTH];
+ int n = RR->node->stateObjectGet(tPtr,ZT_STATE_OBJECT_MOON,id,tmp,sizeof(tmp));
+ if (n > 0) {
+ try {
World w;
- w.deserialize(wtmp);
+ w.deserialize(Buffer<ZT_WORLD_MAX_SERIALIZED_LENGTH>(tmp,(unsigned int)n));
if ((w.type() == World::TYPE_MOON)&&(w.id() == id)) {
addWorld(tPtr,w,true);
return;
}
- }
- } catch ( ... ) {}
+ } catch ( ... ) {}
+ }
if (seed) {
Mutex::Lock _l(_upstreams_m);
@@ -381,9 +369,7 @@ void Topology::removeMoon(void *tPtr,const uint64_t id)
if (m->id() != id) {
nm.push_back(*m);
} else {
- char savePath[64];
- Utils::snprintf(savePath,sizeof(savePath),"moons.d/%.16llx.moon",id);
- RR->node->dataStoreDelete(tPtr,savePath);
+ RR->node->stateObjectDelete(tPtr,ZT_STATE_OBJECT_MOON,id);
}
}
_moons.swap(nm);
@@ -425,12 +411,12 @@ void Topology::clean(uint64_t now)
Identity Topology::_getIdentity(void *tPtr,const Address &zta)
{
- char p[128];
- Utils::snprintf(p,sizeof(p),"iddb.d/%.10llx",(unsigned long long)zta.toInt());
- std::string ids(RR->node->dataStoreGet(tPtr,p));
- if (ids.length() > 0) {
+ char tmp[512];
+ int n = RR->node->stateObjectGet(tPtr,ZT_STATE_OBJECT_PEER_IDENTITY,zta.toInt(),tmp,sizeof(tmp) - 1);
+ if (n > 0) {
+ tmp[n] = (char)0;
try {
- return Identity(ids);
+ return Identity(tmp);
} catch ( ... ) {} // ignore invalid IDs
}
return Identity();
diff --git a/node/Utils.cpp b/node/Utils.cpp
index d69e5335..d2321e16 100644
--- a/node/Utils.cpp
+++ b/node/Utils.cpp
@@ -244,8 +244,7 @@ bool Utils::scopy(char *dest,unsigned int len,const char *src)
return true;
}
-unsigned int Utils::snprintf(char *buf,unsigned int len,const char *fmt,...)
- throw(std::length_error)
+unsigned int Utils::ztsnprintf(char *buf,unsigned int len,const char *fmt,...)
{
va_list ap;
@@ -256,7 +255,7 @@ unsigned int Utils::snprintf(char *buf,unsigned int len,const char *fmt,...)
if ((n >= (int)len)||(n < 0)) {
if (len)
buf[len - 1] = (char)0;
- throw std::length_error("buf[] overflow in Utils::snprintf");
+ throw std::length_error("buf[] overflow");
}
return (unsigned int)n;
diff --git a/node/Utils.hpp b/node/Utils.hpp
index 25a90055..212ef247 100644
--- a/node/Utils.hpp
+++ b/node/Utils.hpp
@@ -244,8 +244,7 @@ public:
* @param ... Format arguments
* @throws std::length_error buf[] too short (buf[] will still be left null-terminated)
*/
- static unsigned int snprintf(char *buf,unsigned int len,const char *fmt,...)
- throw(std::length_error);
+ static unsigned int ztsnprintf(char *buf,unsigned int len,const char *fmt,...);
/**
* Count the number of bits set in an integer