summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGrant Limberg <glimberg@gmail.com>2015-10-26 18:10:27 -0700
committerGrant Limberg <glimberg@gmail.com>2015-10-26 18:10:27 -0700
commit352b83252fb2617a15cde0927cc30110b729e46d (patch)
tree38569e4f6c1c162cdaabf7ac8c9d6b7917d71ffd
parenta0c3083af0821db0303c62dfae9ebc560c3c147a (diff)
parent6625d7929654803f99b7a69f56a400046314acac (diff)
downloadinfinitytier-352b83252fb2617a15cde0927cc30110b729e46d.tar.gz
infinitytier-352b83252fb2617a15cde0927cc30110b729e46d.zip
Merge branch 'adamierymenko-dev' into windows-ui
-rwxr-xr-xcluster-geo/cluster-geo.exe4
-rw-r--r--cluster-geo/cluster-geo/cluster-geo.js (renamed from cluster-geo/cluster-geo/index.js)28
-rw-r--r--cluster-geo/cluster-geo/package.json2
-rw-r--r--include/ZeroTierOne.h92
-rw-r--r--node/Cluster.cpp458
-rw-r--r--node/Cluster.hpp13
-rw-r--r--node/InetAddress.hpp10
-rw-r--r--node/Network.cpp120
-rw-r--r--node/Network.hpp5
-rw-r--r--node/Node.cpp69
-rw-r--r--node/Node.hpp1
-rw-r--r--node/SelfAwareness.cpp9
-rw-r--r--node/SelfAwareness.hpp10
-rw-r--r--node/Topology.cpp40
-rw-r--r--node/Topology.hpp10
-rw-r--r--objects.mk1
-rw-r--r--osdep/Phy.hpp9
-rw-r--r--service/ClusterDefinition.hpp125
-rw-r--r--service/ClusterGeoIpService.cpp196
-rw-r--r--service/ClusterGeoIpService.hpp94
-rw-r--r--service/ControlPlane.cpp36
-rw-r--r--service/OneService.cpp147
-rw-r--r--service/OneService.hpp3
-rw-r--r--world/alice-test/mkworld.cpp32
24 files changed, 1165 insertions, 349 deletions
diff --git a/cluster-geo/cluster-geo.exe b/cluster-geo/cluster-geo.exe
index ae720610..56b76e0d 100755
--- a/cluster-geo/cluster-geo.exe
+++ b/cluster-geo/cluster-geo.exe
@@ -3,11 +3,11 @@
export PATH=/bin:/usr/bin:/usr/local/bin:/sbin:/usr/sbin
cd `dirname $0`
-if [ ! -d cluster-geo -o ! -f cluster-geo/index.js ]; then
+if [ ! -d cluster-geo -o ! -f cluster-geo/cluster-geo.js ]; then
echo 'Cannot find ./cluster-geo containing NodeJS script files.'
exit 1
fi
cd cluster-geo
-exec node index.js
+exec node --harmony cluster-geo.js
diff --git a/cluster-geo/cluster-geo/index.js b/cluster-geo/cluster-geo/cluster-geo.js
index 0e903ade..44af8492 100644
--- a/cluster-geo/cluster-geo/index.js
+++ b/cluster-geo/cluster-geo/cluster-geo.js
@@ -1,3 +1,5 @@
+"use strict";
+
//
// GeoIP lookup service
//
@@ -20,10 +22,10 @@ function lookup(ip,callback)
cache.get(ip,function(err,cachedEntryJson) {
if ((!err)&&(cachedEntryJson)) {
try {
- var cachedEntry = JSON.parse(cachedEntryJson.toString());
+ let cachedEntry = JSON.parse(cachedEntryJson.toString());
if (cachedEntry) {
- var ts = cachedEntry.ts;
- var r = cachedEntry.r;
+ let ts = cachedEntry.ts;
+ let r = cachedEntry.r;
if ((ts)&&(r)) {
if ((Date.now() - ts) < CACHE_TTL) {
r._cached = true;
@@ -57,24 +59,24 @@ process.stdin.on('readable',function() {
var chunk;
while (null !== (chunk = process.stdin.read())) {
for(var i=0;i<chunk.length;++i) {
- var c = chunk[i];
+ let c = chunk[i];
if ((c == 0x0d)||(c == 0x0a)) {
if (linebuf.length > 0) {
- var ip = linebuf;
+ let ip = linebuf;
lookup(ip,function(err,result) {
if ((err)||(!result)||(!result.location)) {
return process.stdout.write(ip+',0,0,0,0,0,0\n');
} else {
- var lat = parseFloat(result.location.latitude);
- var lon = parseFloat(result.location.longitude);
+ let lat = parseFloat(result.location.latitude);
+ let lon = parseFloat(result.location.longitude);
// Convert to X,Y,Z coordinates from Earth's origin, Earth-as-sphere approximation.
- var latRadians = lat * 0.01745329251994; // PI / 180
- var lonRadians = lon * 0.01745329251994; // PI / 180
- var cosLat = Math.cos(latRadians);
- var x = Math.round((-6371.0) * cosLat * Math.cos(lonRadians)); // 6371 == Earth's approximate radius in kilometers
- var y = Math.round(6371.0 * Math.sin(latRadians));
- var z = Math.round(6371.0 * cosLat * Math.sin(lonRadians));
+ let latRadians = lat * 0.01745329251994; // PI / 180
+ let lonRadians = lon * 0.01745329251994; // PI / 180
+ let cosLat = Math.cos(latRadians);
+ let x = Math.round((-6371.0) * cosLat * Math.cos(lonRadians)); // 6371 == Earth's approximate radius in kilometers
+ let y = Math.round(6371.0 * Math.sin(latRadians));
+ let z = Math.round(6371.0 * cosLat * Math.sin(lonRadians));
return process.stdout.write(ip+',1,'+lat+','+lon+','+x+','+y+','+z+'\n');
}
diff --git a/cluster-geo/cluster-geo/package.json b/cluster-geo/cluster-geo/package.json
index 1927197e..4cd1ce00 100644
--- a/cluster-geo/cluster-geo/package.json
+++ b/cluster-geo/cluster-geo/package.json
@@ -2,7 +2,7 @@
"name": "cluster-geo",
"version": "1.0.0",
"description": "Cluster GEO-IP Query Service",
- "main": "index.js",
+ "main": "cluster-geo.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
diff --git a/include/ZeroTierOne.h b/include/ZeroTierOne.h
index 135c8e11..7af4f760 100644
--- a/include/ZeroTierOne.h
+++ b/include/ZeroTierOne.h
@@ -131,12 +131,17 @@ extern "C" {
/**
* Maximum number of cluster members (and max member ID plus one)
*/
-#define ZT_CLUSTER_MAX_MEMBERS 256
+#define ZT_CLUSTER_MAX_MEMBERS 128
+
+/**
+ * Maximum number of physical ZeroTier addresses a cluster member can report
+ */
+#define ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES 16
/**
* Maximum allowed cluster message length in bytes
*/
-#define ZT_CLUSTER_MAX_MESSAGE_LENGTH 65535
+#define ZT_CLUSTER_MAX_MESSAGE_LENGTH (1444 * 6)
/**
* A null/empty sockaddr (all zero) to signify an unspecified socket address
@@ -880,6 +885,78 @@ typedef struct {
} ZT_CircuitTestReport;
/**
+ * A cluster member's status
+ */
+typedef struct {
+ /**
+ * This cluster member's ID (from 0 to 1-ZT_CLUSTER_MAX_MEMBERS)
+ */
+ unsigned int id;
+
+ /**
+ * Number of milliseconds since last 'alive' heartbeat message received via cluster backplane address
+ */
+ unsigned int msSinceLastHeartbeat;
+
+ /**
+ * Non-zero if cluster member is alive
+ */
+ int alive;
+
+ /**
+ * X, Y, and Z coordinates of this member (if specified, otherwise zero)
+ *
+ * What these mean depends on the location scheme being used for
+ * location-aware clustering. At present this is GeoIP and these
+ * will be the X, Y, and Z coordinates of the location on a spherical
+ * approximation of Earth where Earth's core is the origin (in km).
+ * They don't have to be perfect and need only be comparable with others
+ * to find shortest path via the standard vector distance formula.
+ */
+ int x,y,z;
+
+ /**
+ * Cluster member's last reported load
+ */
+ uint64_t load;
+
+ /**
+ * Number of peers this cluster member "has"
+ */
+ uint64_t peers;
+
+ /**
+ * Physical ZeroTier endpoints for this member (where peers are sent when directed here)
+ */
+ struct sockaddr_storage zeroTierPhysicalEndpoints[ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES];
+
+ /**
+ * Number of physical ZeroTier endpoints this member is announcing
+ */
+ unsigned int numZeroTierPhysicalEndpoints;
+} ZT_ClusterMemberStatus;
+
+/**
+ * ZeroTier cluster status
+ */
+typedef struct {
+ /**
+ * My cluster member ID (a record for 'self' is included in member[])
+ */
+ unsigned int myId;
+
+ /**
+ * Number of cluster members
+ */
+ unsigned int clusterSize;
+
+ /**
+ * Cluster member statuses
+ */
+ ZT_ClusterMemberStatus members[ZT_CLUSTER_MAX_MEMBERS];
+} ZT_ClusterStatus;
+
+/**
* An instance of a ZeroTier One node (opaque)
*/
typedef void ZT_Node;
@@ -1440,6 +1517,17 @@ void ZT_Node_clusterRemoveMember(ZT_Node *node,unsigned int memberId);
void ZT_Node_clusterHandleIncomingMessage(ZT_Node *node,const void *msg,unsigned int len);
/**
+ * Get the current status of the cluster from this node's point of view
+ *
+ * Calling this without clusterInit() or without cluster support will just
+ * zero out the structure and show a cluster size of zero.
+ *
+ * @param node Node instance
+ * @param cs Cluster status structure to fill with data
+ */
+void ZT_Node_clusterStatus(ZT_Node *node,ZT_ClusterStatus *cs);
+
+/**
* Get ZeroTier One version
*
* @param major Result: major version
diff --git a/node/Cluster.cpp b/node/Cluster.cpp
index d9514db5..9d25593a 100644
--- a/node/Cluster.cpp
+++ b/node/Cluster.cpp
@@ -143,212 +143,223 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
return;
const uint16_t fromMemberId = dmsg.at<uint16_t>(0);
unsigned int ptr = 2;
- if (fromMemberId == _id)
+ if (fromMemberId == _id) // sanity check: we don't talk to ourselves
return;
const uint16_t toMemberId = dmsg.at<uint16_t>(ptr);
ptr += 2;
- if (toMemberId != _id)
+ if (toMemberId != _id) // sanity check: message not for us?
return;
- _Member &m = _members[fromMemberId];
- Mutex::Lock mlck(m.lock);
-
- try {
- while (ptr < dmsg.size()) {
- const unsigned int mlen = dmsg.at<uint16_t>(ptr); ptr += 2;
- const unsigned int nextPtr = ptr + mlen;
-
- int mtype = -1;
- try {
- switch((StateMessageType)(mtype = (int)dmsg[ptr++])) {
- default:
- break;
-
- case STATE_MESSAGE_ALIVE: {
- ptr += 7; // skip version stuff, not used yet
- m.x = dmsg.at<int32_t>(ptr); ptr += 4;
- m.y = dmsg.at<int32_t>(ptr); ptr += 4;
- m.z = dmsg.at<int32_t>(ptr); ptr += 4;
- ptr += 8; // skip local clock, not used
- m.load = dmsg.at<uint64_t>(ptr); ptr += 8;
- ptr += 8; // skip flags, unused
+ { // make sure sender is actually considered a member
+ Mutex::Lock _l3(_memberIds_m);
+ if (std::find(_memberIds.begin(),_memberIds.end(),fromMemberId) == _memberIds.end())
+ return;
+ }
+
+ {
+ _Member &m = _members[fromMemberId];
+ Mutex::Lock mlck(m.lock);
+
+ try {
+ while (ptr < dmsg.size()) {
+ const unsigned int mlen = dmsg.at<uint16_t>(ptr); ptr += 2;
+ const unsigned int nextPtr = ptr + mlen;
+ if (nextPtr > dmsg.size())
+ break;
+
+ int mtype = -1;
+ try {
+ switch((StateMessageType)(mtype = (int)dmsg[ptr++])) {
+ default:
+ break;
+
+ case STATE_MESSAGE_ALIVE: {
+ ptr += 7; // skip version stuff, not used yet
+ m.x = dmsg.at<int32_t>(ptr); ptr += 4;
+ m.y = dmsg.at<int32_t>(ptr); ptr += 4;
+ m.z = dmsg.at<int32_t>(ptr); ptr += 4;
+ ptr += 8; // skip local clock, not used
+ m.load = dmsg.at<uint64_t>(ptr); ptr += 8;
+ ptr += 8; // skip flags, unused
#ifdef ZT_TRACE
- std::string addrs;
+ std::string addrs;
#endif
- unsigned int physicalAddressCount = dmsg[ptr++];
- for(unsigned int i=0;i<physicalAddressCount;++i) {
- m.zeroTierPhysicalEndpoints.push_back(InetAddress());
- ptr += m.zeroTierPhysicalEndpoints.back().deserialize(dmsg,ptr);
- if (!(m.zeroTierPhysicalEndpoints.back())) {
- m.zeroTierPhysicalEndpoints.pop_back();
- }
+ unsigned int physicalAddressCount = dmsg[ptr++];
+ m.zeroTierPhysicalEndpoints.clear();
+ for(unsigned int i=0;i<physicalAddressCount;++i) {
+ m.zeroTierPhysicalEndpoints.push_back(InetAddress());
+ ptr += m.zeroTierPhysicalEndpoints.back().deserialize(dmsg,ptr);
+ if (!(m.zeroTierPhysicalEndpoints.back())) {
+ m.zeroTierPhysicalEndpoints.pop_back();
+ }
#ifdef ZT_TRACE
- else {
- if (addrs.length() > 0)
- addrs.push_back(',');
- addrs.append(m.zeroTierPhysicalEndpoints.back().toString());
- }
+ else {
+ if (addrs.length() > 0)
+ addrs.push_back(',');
+ addrs.append(m.zeroTierPhysicalEndpoints.back().toString());
+ }
#endif
- }
- m.lastReceivedAliveAnnouncement = RR->node->now();
+ }
+ m.lastReceivedAliveAnnouncement = RR->node->now();
#ifdef ZT_TRACE
- TRACE("[%u] I'm alive! send me peers at %s",(unsigned int)fromMemberId,addrs.c_str());
+ TRACE("[%u] I'm alive! peers close to %d,%d,%d can be redirected to: %s",(unsigned int)fromMemberId,m.x,m.y,m.z,addrs.c_str());
#endif
- } break;
-
- case STATE_MESSAGE_HAVE_PEER: {
- try {
- Identity id;
- ptr += id.deserialize(dmsg,ptr);
- if (id) {
- RR->topology->saveIdentity(id);
-
- { // Add or update peer affinity entry
- _PeerAffinity pa(id.address(),fromMemberId,RR->node->now());
- Mutex::Lock _l2(_peerAffinities_m);
- std::vector<_PeerAffinity>::iterator i(std::lower_bound(_peerAffinities.begin(),_peerAffinities.end(),pa)); // O(log(n))
- if ((i != _peerAffinities.end())&&(i->key == pa.key)) {
- i->timestamp = pa.timestamp;
- } else {
- _peerAffinities.push_back(pa);
- std::sort(_peerAffinities.begin(),_peerAffinities.end()); // probably a more efficient way to insert but okay for now
- }
- }
+ } break;
+
+ case STATE_MESSAGE_HAVE_PEER: {
+ try {
+ Identity id;
+ ptr += id.deserialize(dmsg,ptr);
+ if (id) {
+ RR->topology->saveIdentity(id);
+
+ { // Add or update peer affinity entry
+ _PeerAffinity pa(id.address(),fromMemberId,RR->node->now());
+ Mutex::Lock _l2(_peerAffinities_m);
+ std::vector<_PeerAffinity>::iterator i(std::lower_bound(_peerAffinities.begin(),_peerAffinities.end(),pa)); // O(log(n))
+ if ((i != _peerAffinities.end())&&(i->key == pa.key)) {
+ i->timestamp = pa.timestamp;
+ } else {
+ _peerAffinities.push_back(pa);
+ std::sort(_peerAffinities.begin(),_peerAffinities.end()); // probably a more efficient way to insert but okay for now
+ }
+ }
- TRACE("[%u] has %s",(unsigned int)fromMemberId,id.address().toString().c_str());
- }
- } catch ( ... ) {
- // ignore invalid identities
- }
- } break;
-
- case STATE_MESSAGE_MULTICAST_LIKE: {
- const uint64_t nwid = dmsg.at<uint64_t>(ptr); ptr += 8;
- const Address address(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
- const MAC mac(dmsg.field(ptr,6),6); ptr += 6;
- const uint32_t adi = dmsg.at<uint32_t>(ptr); ptr += 4;
- RR->mc->add(RR->node->now(),nwid,MulticastGroup(mac,adi),address);
- TRACE("[%u] %s likes %s/%u on %.16llu",(unsigned int)fromMemberId,address.toString().c_str(),mac.toString().c_str(),(unsigned int)adi,nwid);
- } break;
-
- case STATE_MESSAGE_COM: {
- CertificateOfMembership com;
- ptr += com.deserialize(dmsg,ptr);
- if (com) {
- TRACE("[%u] COM for %s on %.16llu rev %llu",(unsigned int)fromMemberId,com.issuedTo().toString().c_str(),com.networkId(),com.revision());
- }
- } break;
-
- case STATE_MESSAGE_RELAY: {
- const unsigned int numRemotePeerPaths = dmsg[ptr++];
- InetAddress remotePeerPaths[256]; // size is 8-bit, so 256 is max
- for(unsigned int i=0;i<numRemotePeerPaths;++i)
- ptr += remotePeerPaths[i].deserialize(dmsg,ptr);
- const unsigned int packetLen = dmsg.at<uint16_t>(ptr); ptr += 2;
- const void *packet = (const void *)dmsg.field(ptr,packetLen); ptr += packetLen;
-
- if (packetLen >= ZT_PROTO_MIN_FRAGMENT_LENGTH) { // ignore anything too short to contain a dest address
- const Address destinationAddress(reinterpret_cast<const char *>(packet) + 8,ZT_ADDRESS_LENGTH);
- TRACE("[%u] relay %u bytes to %s (%u remote paths included)",(unsigned int)fromMemberId,packetLen,destinationAddress.toString().c_str(),numRemotePeerPaths);
-
- SharedPtr<Peer> destinationPeer(RR->topology->getPeer(destinationAddress));
- if (destinationPeer) {
- if (
- (destinationPeer->send(RR,packet,packetLen,RR->node->now()))&&
- (numRemotePeerPaths > 0)&&
- (packetLen >= 18)&&
- (reinterpret_cast<const unsigned char *>(packet)[ZT_PACKET_FRAGMENT_IDX_FRAGMENT_INDICATOR] == ZT_PACKET_FRAGMENT_INDICATOR)
- ) {
- // If remote peer paths were sent with this relayed packet, we do
- // RENDEZVOUS. It's handled here for cluster-relayed packets since
- // we don't have both Peer records so this is a different path.
-
- const Address remotePeerAddress(reinterpret_cast<const char *>(packet) + 13,ZT_ADDRESS_LENGTH);
-
- InetAddress bestDestV4,bestDestV6;
- destinationPeer->getBestActiveAddresses(RR->node->now(),bestDestV4,bestDestV6);
- InetAddress bestRemoteV4,bestRemoteV6;
- for(unsigned int i=0;i<numRemotePeerPaths;++i) {
- if ((bestRemoteV4)&&(bestRemoteV6))
- break;
- switch(remotePeerPaths[i].ss_family) {
- case AF_INET:
- if (!bestRemoteV4)
- bestRemoteV4 = remotePeerPaths[i];
- break;
- case AF_INET6:
- if (!bestRemoteV6)
- bestRemoteV6 = remotePeerPaths[i];
+ TRACE("[%u] has %s",(unsigned int)fromMemberId,id.address().toString().c_str());
+ }
+ } catch ( ... ) {
+ // ignore invalid identities
+ }
+ } break;
+
+ case STATE_MESSAGE_MULTICAST_LIKE: {
+ const uint64_t nwid = dmsg.at<uint64_t>(ptr); ptr += 8;
+ const Address address(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
+ const MAC mac(dmsg.field(ptr,6),6); ptr += 6;
+ const uint32_t adi = dmsg.at<uint32_t>(ptr); ptr += 4;
+ RR->mc->add(RR->node->now(),nwid,MulticastGroup(mac,adi),address);
+ TRACE("[%u] %s likes %s/%u on %.16llu",(unsigned int)fromMemberId,address.toString().c_str(),mac.toString().c_str(),(unsigned int)adi,nwid);
+ } break;
+
+ case STATE_MESSAGE_COM: {
+ CertificateOfMembership com;
+ ptr += com.deserialize(dmsg,ptr);
+ if (com) {
+ TRACE("[%u] COM for %s on %.16llu rev %llu",(unsigned int)fromMemberId,com.issuedTo().toString().c_str(),com.networkId(),com.revision());
+ }
+ } break;
+
+ case STATE_MESSAGE_RELAY: {
+ const unsigned int numRemotePeerPaths = dmsg[ptr++];
+ InetAddress remotePeerPaths[256]; // size is 8-bit, so 256 is max
+ for(unsigned int i=0;i<numRemotePeerPaths;++i)
+ ptr += remotePeerPaths[i].deserialize(dmsg,ptr);
+ const unsigned int packetLen = dmsg.at<uint16_t>(ptr); ptr += 2;
+ const void *packet = (const void *)dmsg.field(ptr,packetLen); ptr += packetLen;
+
+ if (packetLen >= ZT_PROTO_MIN_FRAGMENT_LENGTH) { // ignore anything too short to contain a dest address
+ const Address destinationAddress(reinterpret_cast<const char *>(packet) + 8,ZT_ADDRESS_LENGTH);
+ TRACE("[%u] relay %u bytes to %s (%u remote paths included)",(unsigned int)fromMemberId,packetLen,destinationAddress.toString().c_str(),numRemotePeerPaths);
+
+ SharedPtr<Peer> destinationPeer(RR->topology->getPeer(destinationAddress));
+ if (destinationPeer) {
+ if (
+ (destinationPeer->send(RR,packet,packetLen,RR->node->now()))&&
+ (numRemotePeerPaths > 0)&&
+ (packetLen >= 18)&&
+ (reinterpret_cast<const unsigned char *>(packet)[ZT_PACKET_FRAGMENT_IDX_FRAGMENT_INDICATOR] == ZT_PACKET_FRAGMENT_INDICATOR)
+ ) {
+ // If remote peer paths were sent with this relayed packet, we do
+ // RENDEZVOUS. It's handled here for cluster-relayed packets since
+ // we don't have both Peer records so this is a different path.
+
+ const Address remotePeerAddress(reinterpret_cast<const char *>(packet) + 13,ZT_ADDRESS_LENGTH);
+
+ InetAddress bestDestV4,bestDestV6;
+ destinationPeer->getBestActiveAddresses(RR->node->now(),bestDestV4,bestDestV6);
+ InetAddress bestRemoteV4,bestRemoteV6;
+ for(unsigned int i=0;i<numRemotePeerPaths;++i) {
+ if ((bestRemoteV4)&&(bestRemoteV6))
break;
+ switch(remotePeerPaths[i].ss_family) {
+ case AF_INET:
+ if (!bestRemoteV4)
+ bestRemoteV4 = remotePeerPaths[i];
+ break;
+ case AF_INET6:
+ if (!bestRemoteV6)
+ bestRemoteV6 = remotePeerPaths[i];
+ break;
+ }
}
- }
- Packet rendezvousForDest(destinationAddress,RR->identity.address(),Packet::VERB_RENDEZVOUS);
- rendezvousForDest.append((uint8_t)0);
- remotePeerAddress.appendTo(rendezvousForDest);
-
- Buffer<2048> rendezvousForOtherEnd;
- remotePeerAddress.appendTo(rendezvousForOtherEnd);
- rendezvousForOtherEnd.append((uint8_t)Packet::VERB_RENDEZVOUS);
- const unsigned int rendezvousForOtherEndPayloadSizePtr = rendezvousForOtherEnd.size();
- rendezvousForOtherEnd.addSize(2); // space for actual packet payload length
- rendezvousForOtherEnd.append((uint8_t)0); // flags == 0
- destinationAddress.appendTo(rendezvousForOtherEnd);
-
- bool haveMatch = false;
- if ((bestDestV6)&&(bestRemoteV6)) {
- haveMatch = true;
-
- rendezvousForDest.append((uint16_t)bestRemoteV6.port());
- rendezvousForDest.append((uint8_t)16);
- rendezvousForDest.append(bestRemoteV6.rawIpData(),16);
-
- rendezvousForOtherEnd.append((uint16_t)bestDestV6.port());
- rendezvousForOtherEnd.append((uint8_t)16);
- rendezvousForOtherEnd.append(bestDestV6.rawIpData(),16);
- rendezvousForOtherEnd.setAt<uint16_t>(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 16));
- } else if ((bestDestV4)&&(bestRemoteV4)) {
- haveMatch = true;
-
- rendezvousForDest.append((uint16_t)bestRemoteV4.port());
- rendezvousForDest.append((uint8_t)4);
- rendezvousForDest.append(bestRemoteV4.rawIpData(),4);
-
- rendezvousForOtherEnd.append((uint16_t)bestDestV4.port());
- rendezvousForOtherEnd.append((uint8_t)4);
- rendezvousForOtherEnd.append(bestDestV4.rawIpData(),4);
- rendezvousForOtherEnd.setAt<uint16_t>(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 4));
- }
+ Packet rendezvousForDest(destinationAddress,RR->identity.address(),Packet::VERB_RENDEZVOUS);
+ rendezvousForDest.append((uint8_t)0);
+ remotePeerAddress.appendTo(rendezvousForDest);
+
+ Buffer<2048> rendezvousForOtherEnd;
+ remotePeerAddress.appendTo(rendezvousForOtherEnd);
+ rendezvousForOtherEnd.append((uint8_t)Packet::VERB_RENDEZVOUS);
+ const unsigned int rendezvousForOtherEndPayloadSizePtr = rendezvousForOtherEnd.size();
+ rendezvousForOtherEnd.addSize(2); // space for actual packet payload length
+ rendezvousForOtherEnd.append((uint8_t)0); // flags == 0
+ destinationAddress.appendTo(rendezvousForOtherEnd);
+
+ bool haveMatch = false;
+ if ((bestDestV6)&&(bestRemoteV6)) {
+ haveMatch = true;
+
+ rendezvousForDest.append((uint16_t)bestRemoteV6.port());
+ rendezvousForDest.append((uint8_t)16);
+ rendezvousForDest.append(bestRemoteV6.rawIpData(),16);
+
+ rendezvousForOtherEnd.append((uint16_t)bestDestV6.port());
+ rendezvousForOtherEnd.append((uint8_t)16);
+ rendezvousForOtherEnd.append(bestDestV6.rawIpData(),16);
+ rendezvousForOtherEnd.setAt<uint16_t>(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 16));
+ } else if ((bestDestV4)&&(bestRemoteV4)) {
+ haveMatch = true;
+
+ rendezvousForDest.append((uint16_t)bestRemoteV4.port());
+ rendezvousForDest.append((uint8_t)4);
+ rendezvousForDest.append(bestRemoteV4.rawIpData(),4);
+
+ rendezvousForOtherEnd.append((uint16_t)bestDestV4.port());
+ rendezvousForOtherEnd.append((uint8_t)4);
+ rendezvousForOtherEnd.append(bestDestV4.rawIpData(),4);
+ rendezvousForOtherEnd.setAt<uint16_t>(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 4));
+ }
- if (haveMatch) {
- _send(fromMemberId,STATE_MESSAGE_PROXY_SEND,rendezvousForOtherEnd.data(),rendezvousForOtherEnd.size());
- RR->sw->send(rendezvousForDest,true,0);
+ if (haveMatch) {
+ _send(fromMemberId,STATE_MESSAGE_PROXY_SEND,rendezvousForOtherEnd.data(),rendezvousForOtherEnd.size());
+ RR->sw->send(rendezvousForDest,true,0);
+ }
}
}
}
- }
- } break;
-
- case STATE_MESSAGE_PROXY_SEND: {
- const Address rcpt(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH);
- const Packet::Verb verb = (Packet::Verb)dmsg[ptr++];
- const unsigned int len = dmsg.at<uint16_t>(ptr); ptr += 2;
- Packet outp(rcpt,RR->identity.address(),verb);
- outp.append(dmsg.field(ptr,len),len);
- RR->sw->send(outp,true,0);
- TRACE("[%u] proxy send %s to %s length %u",(unsigned int)fromMemberId,Packet::verbString(verb),rcpt.toString().c_str(),len);
- } break;
+ } break;
+
+ case STATE_MESSAGE_PROXY_SEND: {
+ const Address rcpt(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH);
+ const Packet::Verb verb = (Packet::Verb)dmsg[ptr++];
+ const unsigned int len = dmsg.at<uint16_t>(ptr); ptr += 2;
+ Packet outp(rcpt,RR->identity.address(),verb);
+ outp.append(dmsg.field(ptr,len),len);
+ RR->sw->send(outp,true,0);
+ TRACE("[%u] proxy send %s to %s length %u",(unsigned int)fromMemberId,Packet::verbString(verb),rcpt.toString().c_str(),len);
+ } break;
+ }
+ } catch ( ... ) {
+ TRACE("invalid message of size %u type %d (inner decode), discarding",mlen,mtype);
+ // drop invalids
}
- } catch ( ... ) {
- TRACE("invalid message of size %u type %d (inner decode), discarding",mlen,mtype);
- // drop invalids
- }
- ptr = nextPtr;
+ ptr = nextPtr;
+ }
+ } catch ( ... ) {
+ TRACE("invalid message (outer loop), discarding");
+ // drop invalids
}
- } catch ( ... ) {
- TRACE("invalid message (outer loop), discarding");
- // drop invalids
}
}
@@ -395,10 +406,12 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
_send(canHasPeer,STATE_MESSAGE_RELAY,buf.data(),buf.size());
}
+ TRACE("sendViaCluster(): relaying %u bytes from %s to %s by way of %u",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)canHasPeer);
return true;
+ } else {
+ TRACE("sendViaCluster(): unable to relay %u bytes from %s to %s since no cluster members seem to have it!",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str());
+ return false;
}
-
- return false;
}
void Cluster::replicateHavePeer(const Identity &peerId)
@@ -436,11 +449,12 @@ void Cluster::replicateHavePeer(const Identity &peerId)
void Cluster::replicateMulticastLike(uint64_t nwid,const Address &peerAddress,const MulticastGroup &group)
{
- Buffer<4096> buf;
+ Buffer<2048> buf;
buf.append((uint64_t)nwid);
peerAddress.appendTo(buf);
group.mac().appendTo(buf);
buf.append((uint32_t)group.adi());
+ TRACE("replicating %s MULTICAST_LIKE %.16llx/%s/%u to all members",peerAddress.toString().c_str(),nwid,group.mac().toString().c_str(),(unsigned int)group.adi());
{
Mutex::Lock _l(_memberIds_m);
for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
@@ -452,8 +466,9 @@ void Cluster::replicateMulticastLike(uint64_t nwid,const Address &peerAddress,co
void Cluster::replicateCertificateOfNetworkMembership(const CertificateOfMembership &com)
{
- Buffer<4096> buf;
+ Buffer<2048> buf;
com.serialize(buf);
+ TRACE("replicating %s COM for %.16llx to all members",com.issuedTo().toString().c_str(),com.networkId());
{
Mutex::Lock _l(_memberIds_m);
for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
@@ -504,7 +519,7 @@ void Cluster::doPeriodicTasks()
void Cluster::addMember(uint16_t memberId)
{
- if (memberId >= ZT_CLUSTER_MAX_MEMBERS)
+ if ((memberId >= ZT_CLUSTER_MAX_MEMBERS)||(memberId == _id))
return;
Mutex::Lock _l2(_members[memberId].lock);
@@ -553,11 +568,12 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy
{
if (!peerPhysicalAddress) // sanity check
return false;
+
if (_addressToLocationFunction) {
// Pick based on location if it can be determined
int px = 0,py = 0,pz = 0;
if (_addressToLocationFunction(_addressToLocationFunctionArg,reinterpret_cast<const struct sockaddr_storage *>(&peerPhysicalAddress),&px,&py,&pz) == 0) {
- // No geo-info so no change
+ TRACE("no geolocation available for %s",peerPhysicalAddress.toIpString().c_str());
return false;
}
@@ -567,6 +583,7 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy
const double currentDistance = _dist3d(_x,_y,_z,px,py,pz);
double bestDistance = (offload ? 2147483648.0 : currentDistance);
unsigned int bestMember = _id;
+ TRACE("%s is at %d,%d,%d -- looking for anyone closer than %d,%d,%d (%fkm)",peerPhysicalAddress.toString().c_str(),px,py,pz,_x,_y,_z,bestDistance);
{
Mutex::Lock _l(_memberIds_m);
for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
@@ -577,6 +594,7 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy
if ( ((now - m.lastReceivedAliveAnnouncement) < ZT_CLUSTER_TIMEOUT) && ((m.x != 0)||(m.y != 0)||(m.z != 0)) && (m.zeroTierPhysicalEndpoints.size() > 0) ) {
double mdist = _dist3d(m.x,m.y,m.z,px,py,pz);
if (mdist < bestDistance) {
+ bestDistance = mdist;
bestMember = *mid;
best = m.zeroTierPhysicalEndpoints;
}
@@ -585,7 +603,7 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy
}
if (best.size() > 0) {
- TRACE("peer %s is at [%d,%d,%d], distance to us is %f, sending to %u instead for better distance %f",peerAddress.toString().c_str(),px,py,pz,currentDistance,bestMember,bestDistance);
+ TRACE("%s seems closer to %u at %fkm, suggesting redirect...",peerAddress.toString().c_str(),bestMember,bestDistance);
/* if (peer->remoteVersionProtocol() >= 5) {
// If it's a newer peer send VERB_PUSH_DIRECT_PATHS which is more idiomatic
@@ -620,8 +638,66 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy
}
}
+void Cluster::status(ZT_ClusterStatus &status) const
+{
+ const uint64_t now = RR->node->now();
+ memset(&status,0,sizeof(ZT_ClusterStatus));
+ ZT_ClusterMemberStatus *ms[ZT_CLUSTER_MAX_MEMBERS];
+ memset(ms,0,sizeof(ms));
+
+ status.myId = _id;
+
+ ms[_id] = &(status.members[status.clusterSize++]);
+ ms[_id]->id = _id;
+ ms[_id]->alive = 1;
+ ms[_id]->x = _x;
+ ms[_id]->y = _y;
+ ms[_id]->z = _z;
+ ms[_id]->peers = RR->topology->countAlive();
+ for(std::vector<InetAddress>::const_iterator ep(_zeroTierPhysicalEndpoints.begin());ep!=_zeroTierPhysicalEndpoints.end();++ep) {
+ if (ms[_id]->numZeroTierPhysicalEndpoints >= ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES) // sanity check
+ break;
+ memcpy(&(ms[_id]->zeroTierPhysicalEndpoints[ms[_id]->numZeroTierPhysicalEndpoints++]),&(*ep),sizeof(struct sockaddr_storage));
+ }
+
+ {
+ Mutex::Lock _l1(_memberIds_m);
+ for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
+ if (status.clusterSize >= ZT_CLUSTER_MAX_MEMBERS) // sanity check
+ break;
+ ZT_ClusterMemberStatus *s = ms[*mid] = &(status.members[status.clusterSize++]);
+ _Member &m = _members[*mid];
+ Mutex::Lock ml(m.lock);
+
+ s->id = *mid;
+ s->msSinceLastHeartbeat = (unsigned int)std::min((uint64_t)(~((unsigned int)0)),(now - m.lastReceivedAliveAnnouncement));
+ s->alive = (s->msSinceLastHeartbeat < ZT_CLUSTER_TIMEOUT) ? 1 : 0;
+ s->x = m.x;
+ s->y = m.y;
+ s->z = m.z;
+ s->load = m.load;
+ for(std::vector<InetAddress>::const_iterator ep(m.zeroTierPhysicalEndpoints.begin());ep!=m.zeroTierPhysicalEndpoints.end();++ep) {
+ if (s->numZeroTierPhysicalEndpoints >= ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES) // sanity check
+ break;
+ memcpy(&(s->zeroTierPhysicalEndpoints[s->numZeroTierPhysicalEndpoints++]),&(*ep),sizeof(struct sockaddr_storage));
+ }
+ }
+ }
+
+ {
+ Mutex::Lock _l2(_peerAffinities_m);
+ for(std::vector<_PeerAffinity>::const_iterator pi(_peerAffinities.begin());pi!=_peerAffinities.end();++pi) {
+ unsigned int mid = pi->clusterMemberId();
+ if ((ms[mid])&&(mid != _id)&&((now - pi->timestamp) < ZT_PEER_ACTIVITY_TIMEOUT))
+ ++ms[mid]->peers;
+ }
+ }
+}
+
void Cluster::_send(uint16_t memberId,StateMessageType type,const void *msg,unsigned int len)
{
+ if ((len + 3) > (ZT_CLUSTER_MAX_MESSAGE_LENGTH - (24 + 2 + 2))) // sanity check
+ return;
_Member &m = _members[memberId];
// assumes m.lock is locked!
if ((m.q.size() + len + 3) > ZT_CLUSTER_MAX_MESSAGE_LENGTH)
diff --git a/node/Cluster.hpp b/node/Cluster.hpp
index 2e60fd6b..be346659 100644
--- a/node/Cluster.hpp
+++ b/node/Cluster.hpp
@@ -47,7 +47,7 @@
/**
* Timeout for cluster members being considered "alive"
*/
-#define ZT_CLUSTER_TIMEOUT 30000
+#define ZT_CLUSTER_TIMEOUT 10000
/**
* How often should we announce that we have a peer?
@@ -57,7 +57,7 @@
/**
* Desired period between doPeriodicTasks() in milliseconds
*/
-#define ZT_CLUSTER_PERIODIC_TASK_PERIOD 50
+#define ZT_CLUSTER_PERIODIC_TASK_PERIOD 100
namespace ZeroTier {
@@ -145,7 +145,7 @@ public:
STATE_MESSAGE_RELAY = 5,
/**
- * Request to send a packet to a locally-known peer:
+ * Request that a cluster member send a packet to a locally-known peer:
* <[5] ZeroTier address of recipient>
* <[1] packet verb>
* <[2] length of packet payload>
@@ -254,6 +254,13 @@ public:
*/
bool redirectPeer(const Address &peerAddress,const InetAddress &peerPhysicalAddress,bool offload);
+ /**
+ * Fill out ZT_ClusterStatus structure (from core API)
+ *
+ * @param status Reference to structure to hold result (anything there is replaced)
+ */
+ void status(ZT_ClusterStatus &status) const;
+
private:
void _send(uint16_t memberId,StateMessageType type,const void *msg,unsigned int len);
void _flush(uint16_t memberId);
diff --git a/node/InetAddress.hpp b/node/InetAddress.hpp
index 50db272a..ecafcf51 100644
--- a/node/InetAddress.hpp
+++ b/node/InetAddress.hpp
@@ -409,6 +409,16 @@ struct InetAddress : public sockaddr_storage
switch(b[p++]) {
case 0:
return 1;
+ case 0x01:
+ // TODO: Ethernet address (but accept for forward compatibility)
+ return 7;
+ case 0x02:
+ // TODO: Bluetooth address (but accept for forward compatibility)
+ return 7;
+ case 0x03:
+ // TODO: Other address types (but accept for forward compatibility)
+ // These could be extended/optional things like AF_UNIX, LTE Direct, shared memory, etc.
+ return (unsigned int)(b.template at<uint16_t>(p) + 3); // other addresses begin with 16-bit non-inclusive length
case 0x04:
ss_family = AF_INET;
memcpy(&(reinterpret_cast<struct sockaddr_in *>(this)->sin_addr.s_addr),b.field(p,4),4); p += 4;
diff --git a/node/Network.cpp b/node/Network.cpp
index 46f93241..cd30e386 100644
--- a/node/Network.cpp
+++ b/node/Network.cpp
@@ -144,7 +144,15 @@ void Network::multicastUnsubscribe(const MulticastGroup &mg)
bool Network::tryAnnounceMulticastGroupsTo(const SharedPtr<Peer> &peer)
{
Mutex::Lock _l(_lock);
- return _tryAnnounceMulticastGroupsTo(RR->topology->rootAddresses(),_allMulticastGroups(),peer,RR->node->now());
+ if (
+ (_isAllowed(peer)) ||
+ (peer->address() == this->controller()) ||
+ (RR->topology->isRoot(peer->identity()))
+ ) {
+ _announceMulticastGroupsTo(peer->address(),_allMulticastGroups());
+ return true;
+ }
+ return false;
}
bool Network::applyConfiguration(const SharedPtr<NetworkConfig> &conf)
@@ -400,77 +408,80 @@ bool Network::_isAllowed(const SharedPtr<Peer> &peer) const
return false; // default position on any failure
}
-bool Network::_tryAnnounceMulticastGroupsTo(const std::vector<Address> &alwaysAddresses,const std::vector<MulticastGroup> &allMulticastGroups,const SharedPtr<Peer> &peer,uint64_t now) const
-{
- // assumes _lock is locked
- if (
- (_isAllowed(peer)) ||
- (peer->address() == this->controller()) ||
- (std::find(alwaysAddresses.begin(),alwaysAddresses.end(),peer->address()) != alwaysAddresses.end())
- ) {
-
- if ((_config)&&(_config->com())&&(!_config->isPublic())&&(peer->needsOurNetworkMembershipCertificate(_id,now,true))) {
- Packet outp(peer->address(),RR->identity.address(),Packet::VERB_NETWORK_MEMBERSHIP_CERTIFICATE);
- _config->com().serialize(outp);
- outp.armor(peer->key(),true);
- peer->send(RR,outp.data(),outp.size(),now);
- }
-
- {
- Packet outp(peer->address(),RR->identity.address(),Packet::VERB_MULTICAST_LIKE);
-
- for(std::vector<MulticastGroup>::const_iterator mg(allMulticastGroups.begin());mg!=allMulticastGroups.end();++mg) {
- if ((outp.size() + 18) >= ZT_UDP_DEFAULT_PAYLOAD_MTU) {
- outp.armor(peer->key(),true);
- peer->send(RR,outp.data(),outp.size(),now);
- outp.reset(peer->address(),RR->identity.address(),Packet::VERB_MULTICAST_LIKE);
- }
-
- // network ID, MAC, ADI
- outp.append((uint64_t)_id);
- mg->mac().appendTo(outp);
- outp.append((uint32_t)mg->adi());
- }
-
- if (outp.size() > ZT_PROTO_MIN_PACKET_LENGTH) {
- outp.armor(peer->key(),true);
- peer->send(RR,outp.data(),outp.size(),now);
- }
- }
-
- return true;
- }
- return false;
-}
-
-class _AnnounceMulticastGroupsToAll
+class _GetPeersThatNeedMulticastAnnouncement
{
public:
- _AnnounceMulticastGroupsToAll(const RuntimeEnvironment *renv,Network *nw) :
+ _GetPeersThatNeedMulticastAnnouncement(const RuntimeEnvironment *renv,Network *nw) :
_now(renv->node->now()),
+ _controller(nw->controller()),
_network(nw),
- _rootAddresses(renv->topology->rootAddresses()),
- _allMulticastGroups(nw->_allMulticastGroups())
+ _rootAddresses(renv->topology->rootAddresses())
{}
-
- inline void operator()(Topology &t,const SharedPtr<Peer> &p) { _network->_tryAnnounceMulticastGroupsTo(_rootAddresses,_allMulticastGroups,p,_now); }
-
+ inline void operator()(Topology &t,const SharedPtr<Peer> &p)
+ {
+ if (
+ (_network->_isAllowed(p)) ||
+ (p->address() == _controller) ||
+ (std::find(_rootAddresses.begin(),_rootAddresses.end(),p->address()) != _rootAddresses.end())
+ ) {
+ peers.push_back(p->address());
+ }
+ }
+ std::vector<Address> peers;
private:
uint64_t _now;
+ Address _controller;
Network *_network;
std::vector<Address> _rootAddresses;
- std::vector<MulticastGroup> _allMulticastGroups;
};
void Network::_announceMulticastGroups()
{
// Assumes _lock is locked
- _AnnounceMulticastGroupsToAll afunc(RR,this);
- RR->topology->eachPeer<_AnnounceMulticastGroupsToAll &>(afunc);
+
+ _GetPeersThatNeedMulticastAnnouncement gpfunc(RR,this);
+ RR->topology->eachPeer<_GetPeersThatNeedMulticastAnnouncement &>(gpfunc);
+
+ std::vector<MulticastGroup> allMulticastGroups(_allMulticastGroups());
+ for(std::vector<Address>::const_iterator pa(gpfunc.peers.begin());pa!=gpfunc.peers.end();++pa)
+ _announceMulticastGroupsTo(*pa,allMulticastGroups);
+}
+
+void Network::_announceMulticastGroupsTo(const Address &peerAddress,const std::vector<MulticastGroup> &allMulticastGroups) const
+{
+ // Assumes _lock is locked
+
+ // We push COMs ahead of MULTICAST_LIKE since they're used for access control -- a COM is a public
+ // credential so "over-sharing" isn't really an issue (and we only do so with roots).
+ if ((_config)&&(_config->com())&&(!_config->isPublic())) {
+ Packet outp(peerAddress,RR->identity.address(),Packet::VERB_NETWORK_MEMBERSHIP_CERTIFICATE);
+ _config->com().serialize(outp);
+ RR->sw->send(outp,true,0);
+ }
+
+ {
+ Packet outp(peerAddress,RR->identity.address(),Packet::VERB_MULTICAST_LIKE);
+
+ for(std::vector<MulticastGroup>::const_iterator mg(allMulticastGroups.begin());mg!=allMulticastGroups.end();++mg) {
+ if ((outp.size() + 18) >= ZT_UDP_DEFAULT_PAYLOAD_MTU) {
+ RR->sw->send(outp,true,0);
+ outp.reset(peerAddress,RR->identity.address(),Packet::VERB_MULTICAST_LIKE);
+ }
+
+ // network ID, MAC, ADI
+ outp.append((uint64_t)_id);
+ mg->mac().appendTo(outp);
+ outp.append((uint32_t)mg->adi());
+ }
+
+ if (outp.size() > ZT_PROTO_MIN_PACKET_LENGTH)
+ RR->sw->send(outp,true,0);
+ }
}
std::vector<MulticastGroup> Network::_allMulticastGroups() const
{
// Assumes _lock is locked
+
std::vector<MulticastGroup> mgs;
mgs.reserve(_myMulticastGroups.size() + _multicastGroupsBehindMe.size() + 1);
mgs.insert(mgs.end(),_myMulticastGroups.begin(),_myMulticastGroups.end());
@@ -479,6 +490,7 @@ std::vector<MulticastGroup> Network::_allMulticastGroups() const
mgs.push_back(Network::BROADCAST);
std::sort(mgs.begin(),mgs.end());
mgs.erase(std::unique(mgs.begin(),mgs.end()),mgs.end());
+
return mgs;
}
diff --git a/node/Network.hpp b/node/Network.hpp
index f7939323..0effa8e2 100644
--- a/node/Network.hpp
+++ b/node/Network.hpp
@@ -56,7 +56,7 @@ namespace ZeroTier {
class RuntimeEnvironment;
class Peer;
-class _AnnounceMulticastGroupsToAll; // internal function object in Network.cpp
+class _GetPeersThatNeedMulticastAnnouncement;
/**
* A virtual LAN
@@ -64,7 +64,7 @@ class _AnnounceMulticastGroupsToAll; // internal function object in Network.cpp
class Network : NonCopyable
{
friend class SharedPtr<Network>;
- friend class _AnnounceMulticastGroupsToAll;
+ friend class _GetPeersThatNeedMulticastAnnouncement; // internal function object
public:
/**
@@ -344,6 +344,7 @@ private:
bool _isAllowed(const SharedPtr<Peer> &peer) const;
bool _tryAnnounceMulticastGroupsTo(const std::vector<Address> &rootAddresses,const std::vector<MulticastGroup> &allMulticastGroups,const SharedPtr<Peer> &peer,uint64_t now) const;
void _announceMulticastGroups();
+ void _announceMulticastGroupsTo(const Address &peerAddress,const std::vector<MulticastGroup> &allMulticastGroups) const;
std::vector<MulticastGroup> _allMulticastGroups() const;
const RuntimeEnvironment *RR;
diff --git a/node/Node.cpp b/node/Node.cpp
index 6eea3d3d..2b298903 100644
--- a/node/Node.cpp
+++ b/node/Node.cpp
@@ -211,8 +211,15 @@ public:
}
}
- // If this is a network preferred relay, also always ping and if a stable endpoint is specified use that if not alive
if (!upstream) {
+ // If I am a root server, only ping other root servers -- roots don't ping "down"
+ // since that would just be a waste of bandwidth and could potentially cause route
+ // flapping in Cluster mode.
+ if (RR->topology->amRoot())
+ return;
+
+ // Check for network preferred relays, also considered 'upstream' and thus always
+ // pinged to keep links up. If they have stable addresses we will try them there.
for(std::vector< std::pair<Address,InetAddress> >::const_iterator r(_relays.begin());r!=_relays.end();++r) {
if (r->first == p->address()) {
if (r->second.ss_family == AF_INET)
@@ -229,18 +236,22 @@ public:
// "Upstream" devices are roots and relays and get special treatment -- they stay alive
// forever and we try to keep (if available) both IPv4 and IPv6 channels open to them.
bool needToContactIndirect = true;
- if (!p->doPingAndKeepalive(RR,_now,AF_INET)) {
+ if (p->doPingAndKeepalive(RR,_now,AF_INET)) {
+ needToContactIndirect = false;
+ } else {
if (stableEndpoint4) {
needToContactIndirect = false;
p->attemptToContactAt(RR,InetAddress(),stableEndpoint4,_now);
}
- } else needToContactIndirect = false;
- if (!p->doPingAndKeepalive(RR,_now,AF_INET6)) {
+ }
+ if (p->doPingAndKeepalive(RR,_now,AF_INET6)) {
+ needToContactIndirect = false;
+ } else {
if (stableEndpoint6) {
needToContactIndirect = false;
p->attemptToContactAt(RR,InetAddress(),stableEndpoint6,_now);
}
- } else needToContactIndirect = false;
+ }
if (needToContactIndirect) {
// If this is an upstream and we have no stable endpoint for either IPv4 or IPv6,
@@ -625,6 +636,18 @@ void Node::clusterHandleIncomingMessage(const void *msg,unsigned int len)
#endif
}
+void Node::clusterStatus(ZT_ClusterStatus *cs)
+{
+ if (!cs)
+ return;
+#ifdef ZT_ENABLE_CLUSTER
+ if (RR->cluster)
+ RR->cluster->status(*cs);
+ else
+#endif
+ memset(cs,0,sizeof(ZT_ClusterStatus));
+}
+
/****************************************************************************/
/* Node methods used only within node/ */
/****************************************************************************/
@@ -936,15 +959,6 @@ enum ZT_ResultCode ZT_Node_clusterInit(
}
}
-/**
- * Add a member to this cluster
- *
- * Calling this without having called clusterInit() will do nothing.
- *
- * @param node Node instance
- * @param memberId Member ID (must be less than or equal to ZT_CLUSTER_MAX_MEMBERS)
- * @return OK or error if clustering is disabled, ID invalid, etc.
- */
enum ZT_ResultCode ZT_Node_clusterAddMember(ZT_Node *node,unsigned int memberId)
{
try {
@@ -954,14 +968,6 @@ enum ZT_ResultCode ZT_Node_clusterAddMember(ZT_Node *node,unsigned int memberId)
}
}
-/**
- * Remove a member from this cluster
- *
- * Calling this without having called clusterInit() will do nothing.
- *
- * @param node Node instance
- * @param memberId Member ID to remove (nothing happens if not present)
- */
void ZT_Node_clusterRemoveMember(ZT_Node *node,unsigned int memberId)
{
try {
@@ -969,18 +975,6 @@ void ZT_Node_clusterRemoveMember(ZT_Node *node,unsigned int memberId)
} catch ( ... ) {}
}
-/**
- * Handle an incoming cluster state message
- *
- * The message itself contains cluster member IDs, and invalid or badly
- * addressed messages will be silently discarded.
- *
- * Calling this without having called clusterInit() will do nothing.
- *
- * @param node Node instance
- * @param msg Cluster message
- * @param len Length of cluster message
- */
void ZT_Node_clusterHandleIncomingMessage(ZT_Node *node,const void *msg,unsigned int len)
{
try {
@@ -988,6 +982,13 @@ void ZT_Node_clusterHandleIncomingMessage(ZT_Node *node,const void *msg,unsigned
} catch ( ... ) {}
}
+void ZT_Node_clusterStatus(ZT_Node *node,ZT_ClusterStatus *cs)
+{
+ try {
+ reinterpret_cast<ZeroTier::Node *>(node)->clusterStatus(cs);
+ } catch ( ... ) {}
+}
+
void ZT_version(int *major,int *minor,int *revision,unsigned long *featureFlags)
{
if (major) *major = ZEROTIER_ONE_VERSION_MAJOR;
diff --git a/node/Node.hpp b/node/Node.hpp
index b8bd4dc5..4094a79e 100644
--- a/node/Node.hpp
+++ b/node/Node.hpp
@@ -124,6 +124,7 @@ public:
ZT_ResultCode clusterAddMember(unsigned int memberId);
void clusterRemoveMember(unsigned int memberId);
void clusterHandleIncomingMessage(const void *msg,unsigned int len);
+ void clusterStatus(ZT_ClusterStatus *cs);
// Internal functions ------------------------------------------------------
diff --git a/node/SelfAwareness.cpp b/node/SelfAwareness.cpp
index 7329322a..1b70f17c 100644
--- a/node/SelfAwareness.cpp
+++ b/node/SelfAwareness.cpp
@@ -94,7 +94,7 @@ void SelfAwareness::iam(const Address &reporter,const InetAddress &reporterPhysi
Mutex::Lock _l(_phy_m);
- PhySurfaceEntry &entry = _phy[PhySurfaceKey(reporter,scope)];
+ PhySurfaceEntry &entry = _phy[PhySurfaceKey(reporter,reporterPhysicalAddress,scope)];
if ((now - entry.ts) >= ZT_SELFAWARENESS_ENTRY_TIMEOUT) {
entry.mySurface = myPhysicalAddress;
@@ -105,14 +105,15 @@ void SelfAwareness::iam(const Address &reporter,const InetAddress &reporterPhysi
entry.ts = now;
TRACE("learned physical address %s for scope %u as seen from %s(%s) (replaced %s, resetting all in scope)",myPhysicalAddress.toString().c_str(),(unsigned int)scope,reporter.toString().c_str(),reporterPhysicalAddress.toString().c_str(),entry.mySurface.toString().c_str());
- // Erase all entries (other than this one) for this scope to prevent thrashing
- // Note: we should probably not use 'entry' after this
+ // Erase all entries in this scope that were not reported by this remote address to prevent 'thrashing'
+ // due to multiple reports of endpoint change.
+ // Don't use 'entry' after this since hash table gets modified.
{
Hashtable< PhySurfaceKey,PhySurfaceEntry >::Iterator i(_phy);
PhySurfaceKey *k = (PhySurfaceKey *)0;
PhySurfaceEntry *e = (PhySurfaceEntry *)0;
while (i.next(k,e)) {
- if ((k->reporter != reporter)&&(k->scope == scope))
+ if ((k->reporterPhysicalAddress != reporterPhysicalAddress)&&(k->scope == scope))
_phy.erase(*k);
}
}
diff --git a/node/SelfAwareness.hpp b/node/SelfAwareness.hpp
index 3133553e..400b05e6 100644
--- a/node/SelfAwareness.hpp
+++ b/node/SelfAwareness.hpp
@@ -69,14 +69,14 @@ private:
struct PhySurfaceKey
{
Address reporter;
+ InetAddress reporterPhysicalAddress;
InetAddress::IpScope scope;
- inline unsigned long hashCode() const throw() { return ((unsigned long)reporter.toInt() + (unsigned long)scope); }
-
PhySurfaceKey() : reporter(),scope(InetAddress::IP_SCOPE_NONE) {}
- PhySurfaceKey(const Address &r,InetAddress::IpScope s) : reporter(r),scope(s) {}
- inline bool operator<(const PhySurfaceKey &k) const throw() { return ((reporter < k.reporter) ? true : ((reporter == k.reporter) ? ((int)scope < (int)k.scope) : false)); }
- inline bool operator==(const PhySurfaceKey &k) const throw() { return ((reporter == k.reporter)&&(scope == k.scope)); }
+ PhySurfaceKey(const Address &r,const InetAddress &ra,InetAddress::IpScope s) : reporter(r),reporterPhysicalAddress(ra),scope(s) {}
+
+ inline unsigned long hashCode() const throw() { return ((unsigned long)reporter.toInt() + (unsigned long)scope); }
+ inline bool operator==(const PhySurfaceKey &k) const throw() { return ((reporter == k.reporter)&&(reporterPhysicalAddress == k.reporterPhysicalAddress)&&(scope == k.scope)); }
};
struct PhySurfaceEntry
{
diff --git a/node/Topology.cpp b/node/Topology.cpp
index 88c8856c..e56d1f47 100644
--- a/node/Topology.cpp
+++ b/node/Topology.cpp
@@ -35,14 +35,9 @@
namespace ZeroTier {
-// Default World
#define ZT_DEFAULT_WORLD_LENGTH 494
static const unsigned char ZT_DEFAULT_WORLD[ZT_DEFAULT_WORLD_LENGTH] = {0x01,0x00,0x00,0x00,0x00,0x08,0xea,0xc9,0x0a,0x00,0x00,0x01,0x4f,0xdf,0xbf,0xfc,0xbb,0x6c,0x7e,0x15,0x67,0x85,0x1b,0xb4,0x65,0x04,0x01,0xaf,0x56,0xbf,0xe7,0x63,0x9d,0x77,0xef,0xa4,0x1e,0x61,0x53,0x88,0xcb,0x8d,0x78,0xe5,0x47,0x38,0x98,0x5a,0x6c,0x8a,0xdd,0xe6,0x9c,0x65,0xdf,0x1a,0x80,0x63,0xce,0x2e,0x4d,0x48,0x24,0x3d,0x68,0x87,0x96,0x13,0x89,0xba,0x25,0x6f,0xc9,0xb0,0x9f,0x20,0xc5,0x4c,0x51,0x7b,0x30,0xb7,0x5f,0xba,0xca,0xa4,0xc5,0x48,0xa3,0x15,0xab,0x2f,0x1d,0x64,0xe8,0x04,0x42,0xb3,0x1c,0x51,0x8b,0x2a,0x04,0x01,0xf8,0xe1,0x81,0xaf,0x60,0x2f,0x70,0x3e,0xcd,0x0b,0x21,0x38,0x19,0x62,0x02,0xbd,0x0e,0x33,0x1d,0x0a,0x7b,0xf1,0xec,0xad,0xef,0x54,0xb3,0x7b,0x17,0x84,0xaa,0xda,0x0a,0x85,0x5d,0x0b,0x1c,0x05,0x83,0xb9,0x0e,0x3e,0xe3,0xb4,0xd1,0x8b,0x5b,0x64,0xf7,0xcf,0xe1,0xff,0x5d,0xc2,0x2a,0xcf,0x60,0x7b,0x09,0xb4,0xa3,0x86,0x3c,0x5a,0x7e,0x31,0xa0,0xc7,0xb4,0x86,0xe3,0x41,0x33,0x04,0x7e,0x19,0x87,0x6a,0xba,0x00,0x2a,0x6e,0x2b,0x23,0x18,0x93,0x0f,0x60,0xeb,0x09,0x7f,0x70,0xd0,0xf4,0xb0,0x28,0xb2,0xcd,0x6d,0x3d,0x0c,0x63,0xc0,0x14,0xb9,0x03,0x9f,0xf3,0x53,0x90,0xe4,0x11,0x81,0xf2,0x16,0xfb,0x2e,0x6f,0xa8,0xd9,0x5c,0x1e,0xe9,0x66,0x71,0x56,0x41,0x19,0x05,0xc3,0xdc,0xcf,0xea,0x78,0xd8,0xc6,0xdf,0xaf,0xba,0x68,0x81,0x70,0xb3,0xfa,0x00,0x01,0x04,0xc6,0xc7,0x61,0xdc,0x27,0x09,0x88,0x41,0x40,0x8a,0x2e,0x00,0xbb,0x1d,0x31,0xf2,0xc3,0x23,0xe2,0x64,0xe9,0xe6,0x41,0x72,0xc1,0xa7,0x4f,0x77,0x89,0x95,0x55,0xed,0x10,0x75,0x1c,0xd5,0x6e,0x86,0x40,0x5c,0xde,0x11,0x8d,0x02,0xdf,0xfe,0x55,0x5d,0x46,0x2c,0xcf,0x6a,0x85,0xb5,0x63,0x1c,0x12,0x35,0x0c,0x8d,0x5d,0xc4,0x09,0xba,0x10,0xb9,0x02,0x5d,0x0f,0x44,0x5c,0xf4,0x49,0xd9,0x2b,0x1c,0x00,0x01,0x04,0x6b,0xbf,0x2e,0xd2,0x27,0x09,0x8a,0xcf,0x05,0x9f,0xe3,0x00,0x48,0x2f,0x6e,0xe5,0xdf,0xe9,0x02,0x31,0x9b,0x41,0x9d,0xe5,0xbd,0xc7,0x65,0x20,0x9c,0x0e,0xcd,0xa3,0x8c,0x4d,0x6e,0x4f,0xcf,0x0d,0x33,0x65,0x83,0x98,0xb4,0x52,0x7d,0xcd,0x22,0xf9,0x31,0x12,0xfb,0x9b,0xef,0xd0,0x2f,0xd7,0x8b,0xf7,0x26,0x1b,0x33,0x3f,0xc1,0x05,0xd1,0x92,0xa6,0x23,0xca,0x9e,0x50,0xfc,0x60,0xb3,0x74,0xa5,0x00,0x01,0x04,0xa2,0xf3,0x4d,0x6f,0x27,0x09,0x9d,0x21,0x90,0x39,0xf3,0x00,0x01,0xf0,0x92,0x2a,0x98,0xe3,0xb3,0x4e,0xbc,0xbf,0xf3,0x33,0x26,0x9d,0xc2,0x65,0xd7,0xa0,0x20,0xaa,0xb6,0x9d,0x72,0xbe,0x4d,0x4a,0xcc,0x9c,0x8c,0x92,0x94,0x78,0x57,0x71,0x25,0x6c,0xd1,0xd9,0x42,0xa9,0x0d,0x1b,0xd1,0xd2,0xdc,0xa3,0xea,0x84,0xef,0x7d,0x85,0xaf,0xe6,0x61,0x1f,0xb4,0x3f,0xf0,0xb7,0x41,0x26,0xd9,0x0a,0x6e,0x00,0x01,0x04,0x80,0xc7,0xc5,0xd9,0x27,0x09};
-// ALICE-TEST
-//#define ZT_DEFAULT_WORLD_LENGTH 257
-//static const unsigned char ZT_DEFAULT_WORLD[ZT_DEFAULT_WORLD_LENGTH] = {0x01,0x00,0x00,0x00,0x00,0x08,0xea,0xc9,0x0a,0x00,0x00,0x01,0x50,0x81,0x2a,0x54,0x6f,0x72,0xb0,0x3b,0xbe,0x73,0xda,0xbd,0xfb,0x85,0x77,0x9f,0xc9,0x2e,0x17,0xc8,0x11,0x6e,0xda,0x61,0x80,0xd1,0x41,0xcb,0x7c,0x2d,0x2b,0xa4,0x34,0x75,0x19,0x64,0x20,0x80,0x0a,0x22,0x32,0xf2,0x01,0x6c,0xfe,0x79,0xa6,0x7d,0xec,0x10,0x7e,0x03,0xf3,0xa2,0xa0,0x19,0xc8,0x7c,0xfd,0x6c,0x56,0x52,0xa8,0xfb,0xdc,0xfb,0x93,0x81,0x3e,0x63,0x8b,0xb3,0xb6,0x72,0x45,0xa9,0x81,0x81,0xcc,0xea,0x7f,0x2f,0xd9,0x59,0xce,0xc8,0x51,0x12,0xc3,0xe3,0x44,0x76,0x54,0xed,0xe7,0x8d,0x34,0x0b,0x5d,0x10,0x3d,0x52,0x04,0x9b,0xe1,0xb2,0x36,0x51,0x75,0x14,0x30,0x53,0xe8,0x4b,0xe4,0x91,0x9a,0xed,0x99,0x56,0xa3,0x8d,0x5e,0x14,0xff,0x66,0xd8,0x4f,0xf7,0x3c,0x23,0xbe,0x02,0xbb,0x1e,0xb6,0x7e,0x07,0xfa,0x7c,0x7e,0x50,0xe8,0x40,0xf9,0x37,0x70,0x1a,0x75,0xcf,0x19,0xe6,0x83,0xe1,0x5c,0x20,0x1d,0x1e,0x5b,0xe5,0x6a,0xbe,0xe7,0xab,0xec,0x01,0xd6,0xdd,0xca,0x6a,0xb5,0x00,0x4e,0x76,0x12,0x07,0xd8,0xb4,0x20,0x0b,0xe4,0x4f,0x47,0x8e,0x3d,0xa1,0x48,0xc1,0x60,0x99,0x11,0x0e,0xe7,0x1b,0x64,0x58,0x6d,0xda,0x11,0x8e,0x40,0x22,0xab,0x63,0x68,0x2c,0xe1,0x37,0xda,0x8b,0xa8,0x17,0xfc,0x7f,0x73,0xaa,0x31,0x63,0xf2,0xe3,0x33,0x93,0x3e,0x29,0x94,0xc4,0x6b,0x4f,0x41,0x19,0x30,0x7b,0xe8,0x85,0x5a,0x72,0x00,0x01,0x04,0xa9,0x39,0x8f,0x68,0x27,0x09};
-
Topology::Topology(const RuntimeEnvironment *renv) :
RR(renv),
_amRoot(false)
@@ -337,6 +332,21 @@ void Topology::clean(uint64_t now)
}
}
+unsigned long Topology::countAlive() const
+{
+ const uint64_t now = RR->node->now();
+ unsigned long cnt = 0;
+ Mutex::Lock _l(_lock);
+ Hashtable< Address,SharedPtr<Peer> >::Iterator i(const_cast<Topology *>(this)->_peers);
+ Address *a = (Address *)0;
+ SharedPtr<Peer> *p = (SharedPtr<Peer> *)0;
+ while (i.next(a,p)) {
+ if ((*p)->alive(now))
+ ++cnt;
+ }
+ return cnt;
+}
+
Identity Topology::_getIdentity(const Address &zta)
{
char p[128];
@@ -358,16 +368,18 @@ void Topology::_setWorld(const World &newWorld)
_rootAddresses.clear();
_rootPeers.clear();
for(std::vector<World::Root>::const_iterator r(_world.roots().begin());r!=_world.roots().end();++r) {
- if (r->identity == RR->identity)
- _amRoot = true;
_rootAddresses.push_back(r->identity.address());
- SharedPtr<Peer> *rp = _peers.get(r->identity.address());
- if (rp) {
- _rootPeers.push_back(*rp);
- } else if (r->identity.address() != RR->identity.address()) {
- SharedPtr<Peer> newrp(new Peer(RR->identity,r->identity));
- _peers.set(r->identity.address(),newrp);
- _rootPeers.push_back(newrp);
+ if (r->identity.address() == RR->identity.address()) {
+ _amRoot = true;
+ } else {
+ SharedPtr<Peer> *rp = _peers.get(r->identity.address());
+ if (rp) {
+ _rootPeers.push_back(*rp);
+ } else {
+ SharedPtr<Peer> newrp(new Peer(RR->identity,r->identity));
+ _peers.set(r->identity.address(),newrp);
+ _rootPeers.push_back(newrp);
+ }
}
}
}
diff --git a/node/Topology.hpp b/node/Topology.hpp
index 48e264a8..ee9827b9 100644
--- a/node/Topology.hpp
+++ b/node/Topology.hpp
@@ -193,6 +193,11 @@ public:
void clean(uint64_t now);
/**
+ * @return Number of 'alive' peers
+ */
+ unsigned long countAlive() const;
+
+ /**
* Apply a function or function object to all peers
*
* Note: explicitly template this by reference if you want the object
@@ -225,6 +230,11 @@ public:
return _peers.entries();
}
+ /**
+ * @return True if I am a root server in the current World
+ */
+ inline bool amRoot() const throw() { return _amRoot; }
+
private:
Identity _getIdentity(const Address &zta);
void _setWorld(const World &newWorld);
diff --git a/objects.mk b/objects.mk
index 10e0c334..6dd5ea30 100644
--- a/objects.mk
+++ b/objects.mk
@@ -26,5 +26,6 @@ OBJS=\
osdep/BackgroundResolver.o \
osdep/Http.o \
osdep/OSUtils.o \
+ service/ClusterGeoIpService.o \
service/ControlPlane.o \
service/OneService.o
diff --git a/osdep/Phy.hpp b/osdep/Phy.hpp
index 7f790e5d..6737034e 100644
--- a/osdep/Phy.hpp
+++ b/osdep/Phy.hpp
@@ -64,6 +64,12 @@
#include <netinet/in.h>
#include <netinet/tcp.h>
+#if defined(__linux__) || defined(linux) || defined(__LINUX__) || defined(__linux)
+#ifndef IPV6_DONTFRAG
+#define IPV6_DONTFRAG 62
+#endif
+#endif
+
#define ZT_PHY_SOCKFD_TYPE int
#define ZT_PHY_SOCKFD_NULL (-1)
#define ZT_PHY_SOCKFD_VALID(s) ((s) > -1)
@@ -375,6 +381,9 @@ public:
#ifdef IPV6_MTU_DISCOVER
f = 0; setsockopt(s,IPPROTO_IPV6,IPV6_MTU_DISCOVER,&f,sizeof(f));
#endif
+#ifdef IPV6_DONTFRAG
+ f = 0; setsockopt(s,IPPROTO_IPV6,IPV6_DONTFRAG,&f,sizeof(f));
+#endif
}
f = 0; setsockopt(s,SOL_SOCKET,SO_REUSEADDR,(void *)&f,sizeof(f));
f = 1; setsockopt(s,SOL_SOCKET,SO_BROADCAST,(void *)&f,sizeof(f));
diff --git a/service/ClusterDefinition.hpp b/service/ClusterDefinition.hpp
new file mode 100644
index 00000000..d02894e4
--- /dev/null
+++ b/service/ClusterDefinition.hpp
@@ -0,0 +1,125 @@
+/*
+ * ZeroTier One - Network Virtualization Everywhere
+ * Copyright (C) 2011-2015 ZeroTier, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * --
+ *
+ * ZeroTier may be used and distributed under the terms of the GPLv3, which
+ * are available at: http://www.gnu.org/licenses/gpl-3.0.html
+ *
+ * If you would like to embed ZeroTier into a commercial application or
+ * redistribute it in a modified binary form, please contact ZeroTier Networks
+ * LLC. Start here: http://www.zerotier.com/
+ */
+
+#ifndef ZT_CLUSTERDEFINITION_HPP
+#define ZT_CLUSTERDEFINITION_HPP
+
+#ifdef ZT_ENABLE_CLUSTER
+
+#include <vector>
+#include <algorithm>
+
+#include "../node/Constants.hpp"
+#include "../node/Utils.hpp"
+#include "../osdep/OSUtils.hpp"
+
+namespace ZeroTier {
+
+/**
+ * Parser for cluster definition file
+ */
+class ClusterDefinition
+{
+public:
+ struct MemberDefinition
+ {
+ MemberDefinition() : id(0),x(0),y(0),z(0) { name[0] = (char)0; }
+
+ unsigned int id;
+ int x,y,z;
+ char name[256];
+ InetAddress clusterEndpoint;
+ std::vector<InetAddress> zeroTierEndpoints;
+ };
+
+ ClusterDefinition(uint64_t myAddress,const char *pathToClusterFile)
+ {
+ std::string cf;
+ if (!OSUtils::readFile(pathToClusterFile,cf))
+ return;
+
+ char myAddressStr[64];
+ Utils::snprintf(myAddressStr,sizeof(myAddressStr),"%.10llx",myAddress);
+
+ std::vector<std::string> lines(Utils::split(cf.c_str(),"\r\n","",""));
+ for(std::vector<std::string>::iterator l(lines.begin());l!=lines.end();++l) {
+ std::vector<std::string> fields(Utils::split(l->c_str()," \t","",""));
+ if ((fields.size() < 5)||(fields[0][0] == '#')||(fields[0] != myAddressStr))
+ continue;
+
+ int id = Utils::strToUInt(fields[1].c_str());
+ if ((id < 0)||(id > ZT_CLUSTER_MAX_MEMBERS))
+ continue;
+ MemberDefinition &md = _md[id];
+
+ md.id = (unsigned int)id;
+ if (fields.size() >= 6) {
+ std::vector<std::string> xyz(Utils::split(fields[5].c_str(),",","",""));
+ md.x = (xyz.size() > 0) ? Utils::strToInt(xyz[0].c_str()) : 0;
+ md.y = (xyz.size() > 1) ? Utils::strToInt(xyz[1].c_str()) : 0;
+ md.z = (xyz.size() > 2) ? Utils::strToInt(xyz[2].c_str()) : 0;
+ }
+ Utils::scopy(md.name,sizeof(md.name),fields[2].c_str());
+ md.clusterEndpoint.fromString(fields[3]);
+ if (!md.clusterEndpoint)
+ continue;
+ std::vector<std::string> zips(Utils::split(fields[4].c_str(),",","",""));
+ for(std::vector<std::string>::iterator zip(zips.begin());zip!=zips.end();++zip) {
+ InetAddress i;
+ i.fromString(*zip);
+ if (i)
+ md.zeroTierEndpoints.push_back(i);
+ }
+
+ _ids.push_back((unsigned int)id);
+ }
+
+ std::sort(_ids.begin(),_ids.end());
+ }
+
+ inline const MemberDefinition &operator[](unsigned int id) const throw() { return _md[id]; }
+ inline unsigned int size() const throw() { return (unsigned int)_ids.size(); }
+ inline const std::vector<unsigned int> &ids() const throw() { return _ids; }
+
+ inline std::vector<MemberDefinition> members() const
+ {
+ std::vector<MemberDefinition> m;
+ for(std::vector<unsigned int>::const_iterator i(_ids.begin());i!=_ids.end();++i)
+ m.push_back(_md[*i]);
+ return m;
+ }
+
+private:
+ MemberDefinition _md[ZT_CLUSTER_MAX_MEMBERS];
+ std::vector<unsigned int> _ids;
+};
+
+} // namespace ZeroTier
+
+#endif // ZT_ENABLE_CLUSTER
+
+#endif
diff --git a/service/ClusterGeoIpService.cpp b/service/ClusterGeoIpService.cpp
new file mode 100644
index 00000000..9baa7506
--- /dev/null
+++ b/service/ClusterGeoIpService.cpp
@@ -0,0 +1,196 @@
+/*
+ * ZeroTier One - Network Virtualization Everywhere
+ * Copyright (C) 2011-2015 ZeroTier, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * --
+ *
+ * ZeroTier may be used and distributed under the terms of the GPLv3, which
+ * are available at: http://www.gnu.org/licenses/gpl-3.0.html
+ *
+ * If you would like to embed ZeroTier into a commercial application or
+ * redistribute it in a modified binary form, please contact ZeroTier Networks
+ * LLC. Start here: http://www.zerotier.com/
+ */
+
+#ifdef ZT_ENABLE_CLUSTER
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdint.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/wait.h>
+#include <signal.h>
+#include <errno.h>
+
+#include <iostream>
+
+#include "ClusterGeoIpService.hpp"
+#include "../node/Utils.hpp"
+#include "../osdep/OSUtils.hpp"
+
+#define ZT_CLUSTERGEOIPSERVICE_INTERNAL_CACHE_TTL (60 * 60 * 1000)
+
+namespace ZeroTier {
+
+ClusterGeoIpService::ClusterGeoIpService(const char *pathToExe) :
+ _pathToExe(pathToExe),
+ _sOutputFd(-1),
+ _sInputFd(-1),
+ _sPid(0),
+ _run(true)
+{
+ _thread = Thread::start(this);
+}
+
+ClusterGeoIpService::~ClusterGeoIpService()
+{
+ _run = false;
+ long p = _sPid;
+ if (p > 0) {
+ ::kill(p,SIGTERM);
+ Thread::sleep(500);
+ ::kill(p,SIGKILL);
+ }
+ Thread::join(_thread);
+}
+
+bool ClusterGeoIpService::locate(const InetAddress &ip,int &x,int &y,int &z)
+{
+ InetAddress ipNoPort(ip);
+ ipNoPort.setPort(0); // we index cache by IP only
+ const uint64_t now = OSUtils::now();
+
+ bool r = false;
+ {
+ Mutex::Lock _l(_cache_m);
+ std::map< InetAddress,_CE >::iterator c(_cache.find(ipNoPort));
+ if (c != _cache.end()) {
+ x = c->second.x;
+ y = c->second.y;
+ z = c->second.z;
+ if ((now - c->second.ts) < ZT_CLUSTERGEOIPSERVICE_INTERNAL_CACHE_TTL)
+ return true;
+ else r = true; // return true but refresh as well
+ }
+ }
+
+ {
+ Mutex::Lock _l(_sOutputLock);
+ if (_sOutputFd >= 0) {
+ std::string ips(ipNoPort.toIpString());
+ ips.push_back('\n');
+ //fprintf(stderr,"ClusterGeoIpService: << %s",ips.c_str());
+ ::write(_sOutputFd,ips.data(),ips.length());
+ }
+ }
+
+ return r;
+}
+
+void ClusterGeoIpService::threadMain()
+ throw()
+{
+ char linebuf[65536];
+ char buf[65536];
+ long n,lineptr;
+
+ while (_run) {
+ {
+ Mutex::Lock _l(_sOutputLock);
+
+ _sOutputFd = -1;
+ _sInputFd = -1;
+ _sPid = 0;
+
+ int stdinfds[2] = { 0,0 }; // sub-process's stdin, our output
+ int stdoutfds[2] = { 0,0 }; // sub-process's stdout, our input
+ ::pipe(stdinfds);
+ ::pipe(stdoutfds);
+
+ long p = (long)::vfork();
+ if (p < 0) {
+ Thread::sleep(500);
+ continue;
+ } else if (p == 0) {
+ ::close(stdinfds[1]);
+ ::close(stdoutfds[0]);
+ ::dup2(stdinfds[0],STDIN_FILENO);
+ ::dup2(stdoutfds[1],STDOUT_FILENO);
+ ::execl(_pathToExe.c_str(),_pathToExe.c_str(),(const char *)0);
+ ::exit(1);
+ } else {
+ ::close(stdinfds[0]);
+ ::close(stdoutfds[1]);
+ _sOutputFd = stdinfds[1];
+ _sInputFd = stdoutfds[0];
+ _sPid = p;
+ }
+ }
+
+ lineptr = 0;
+ while (_run) {
+ n = ::read(_sInputFd,buf,sizeof(buf));
+ if (n <= 0) {
+ if (errno == EINTR)
+ continue;
+ else break;
+ }
+ for(long i=0;i<n;++i) {
+ if (lineptr > (long)sizeof(linebuf))
+ lineptr = 0;
+ if ((buf[i] == '\n')||(buf[i] == '\r')) {
+ linebuf[lineptr] = (char)0;
+ if (lineptr > 0) {
+ //fprintf(stderr,"ClusterGeoIpService: >> %s\n",linebuf);
+ try {
+ std::vector<std::string> result(Utils::split(linebuf,",","",""));
+ if ((result.size() >= 7)&&(result[1] == "1")) {
+ InetAddress rip(result[0],0);
+ if ((rip.ss_family == AF_INET)||(rip.ss_family == AF_INET6)) {
+ _CE ce;
+ ce.ts = OSUtils::now();
+ ce.x = (int)::strtol(result[4].c_str(),(char **)0,10);
+ ce.y = (int)::strtol(result[5].c_str(),(char **)0,10);
+ ce.z = (int)::strtol(result[6].c_str(),(char **)0,10);
+ //fprintf(stderr,"ClusterGeoIpService: %s is at %d,%d,%d\n",rip.toIpString().c_str(),ce.x,ce.y,ce.z);
+ {
+ Mutex::Lock _l2(_cache_m);
+ _cache[rip] = ce;
+ }
+ }
+ }
+ } catch ( ... ) {}
+ }
+ lineptr = 0;
+ } else linebuf[lineptr++] = buf[i];
+ }
+ }
+
+ ::close(_sOutputFd);
+ ::close(_sInputFd);
+ ::kill(_sPid,SIGTERM);
+ Thread::sleep(250);
+ ::kill(_sPid,SIGKILL);
+ ::waitpid(_sPid,(int *)0,0);
+ }
+}
+
+} // namespace ZeroTier
+
+#endif // ZT_ENABLE_CLUSTER
diff --git a/service/ClusterGeoIpService.hpp b/service/ClusterGeoIpService.hpp
new file mode 100644
index 00000000..fd04ba1d
--- /dev/null
+++ b/service/ClusterGeoIpService.hpp
@@ -0,0 +1,94 @@
+/*
+ * ZeroTier One - Network Virtualization Everywhere
+ * Copyright (C) 2011-2015 ZeroTier, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * --
+ *
+ * ZeroTier may be used and distributed under the terms of the GPLv3, which
+ * are available at: http://www.gnu.org/licenses/gpl-3.0.html
+ *
+ * If you would like to embed ZeroTier into a commercial application or
+ * redistribute it in a modified binary form, please contact ZeroTier Networks
+ * LLC. Start here: http://www.zerotier.com/
+ */
+
+#ifndef ZT_CLUSTERGEOIPSERVICE_HPP
+#define ZT_CLUSTERGEOIPSERVICE_HPP
+
+#ifdef ZT_ENABLE_CLUSTER
+
+#include <vector>
+#include <map>
+#include <string>
+
+#include "../node/Constants.hpp"
+#include "../node/InetAddress.hpp"
+#include "../node/Mutex.hpp"
+#include "../osdep/Thread.hpp"
+
+namespace ZeroTier {
+
+/**
+ * Runs the Cluster GeoIP service in the background and resolves geoIP queries
+ */
+class ClusterGeoIpService
+{
+public:
+ /**
+ * @param pathToExe Path to cluster geo-resolution service executable
+ */
+ ClusterGeoIpService(const char *pathToExe);
+
+ ~ClusterGeoIpService();
+
+ /**
+ * Attempt to locate an IP
+ *
+ * This returns true if x, y, and z are set. Otherwise it returns false
+ * and a geo-locate job is ordered in the background. This usually takes
+ * 500-1500ms to complete, after which time results will be available.
+ * If false is returned the supplied coordinate variables are unchanged.
+ *
+ * @param ip IPv4 or IPv6 address
+ * @param x Reference to variable to receive X
+ * @param y Reference to variable to receive Y
+ * @param z Reference to variable to receive Z
+ * @return True if coordinates were set
+ */
+ bool locate(const InetAddress &ip,int &x,int &y,int &z);
+
+ void threadMain()
+ throw();
+
+private:
+ const std::string _pathToExe;
+ int _sOutputFd;
+ int _sInputFd;
+ volatile long _sPid;
+ volatile bool _run;
+ Thread _thread;
+ Mutex _sOutputLock;
+
+ struct _CE { uint64_t ts; int x,y,z; };
+ std::map< InetAddress,_CE > _cache;
+ Mutex _cache_m;
+};
+
+} // namespace ZeroTier
+
+#endif // ZT_ENABLE_CLUSTER
+
+#endif
diff --git a/service/ControlPlane.cpp b/service/ControlPlane.cpp
index 7affb08c..31eca7b6 100644
--- a/service/ControlPlane.cpp
+++ b/service/ControlPlane.cpp
@@ -354,8 +354,38 @@ unsigned int ControlPlane::handleRequest(
if (ps[0] == "status") {
responseContentType = "application/json";
+
ZT_NodeStatus status;
_node->status(&status);
+
+ std::string clusterJson;
+#ifdef ZT_ENABLE_CLUSTER
+ {
+ ZT_ClusterStatus cs;
+ _node->clusterStatus(&cs);
+
+ if (cs.clusterSize >= 1) {
+ char t[4096];
+ Utils::snprintf(t,sizeof(t),"{\n\t\t\"myId\": %u,\n\t\t\"clusterSize\": %u,\n\t\t\"members: [\n",cs.myId,cs.clusterSize);
+ clusterJson.append(t);
+ for(unsigned int i=0;i<cs.clusterSize;++i) {
+ Utils::snprintf(t,sizeof(t),"\t\t\t{\n\t\t\t\t\"id\": %u,\n\t\t\t\t\"msSinceLastHeartbeat\": %u,\n\t\t\t\t\"alive\": %s,\n\t\t\t\t\"x\": %d,\n\t\t\t\t\"y\": %d,\n\t\t\t\t\"z\": %d,\n\t\t\t\t\"load\": %llu\n\t\t\t\t\"peers\": %llu\n\t\t\t}%s",
+ cs.members[i].id,
+ cs.members[i].msSinceLastHeartbeat,
+ (cs.members[i].alive != 0) ? "true" : "false",
+ cs.members[i].x,
+ cs.members[i].y,
+ cs.members[i].z,
+ cs.members[i].load,
+ cs.members[i].peers,
+ (i == (cs.clusterSize - 1)) ? "," : "");
+ clusterJson.append(t);
+ }
+ clusterJson.append(" ]\n\t\t}");
+ }
+ }
+#endif
+
Utils::snprintf(json,sizeof(json),
"{\n"
"\t\"address\": \"%.10llx\",\n"
@@ -368,7 +398,8 @@ unsigned int ControlPlane::handleRequest(
"\t\"versionMinor\": %d,\n"
"\t\"versionRev\": %d,\n"
"\t\"version\": \"%d.%d.%d\",\n"
- "\t\"clock\": %llu\n"
+ "\t\"clock\": %llu,\n"
+ "\t\"cluster\": %s\n"
"}\n",
status.address,
status.publicIdentity,
@@ -380,7 +411,8 @@ unsigned int ControlPlane::handleRequest(
ZEROTIER_ONE_VERSION_MINOR,
ZEROTIER_ONE_VERSION_REVISION,
ZEROTIER_ONE_VERSION_MAJOR,ZEROTIER_ONE_VERSION_MINOR,ZEROTIER_ONE_VERSION_REVISION,
- (unsigned long long)OSUtils::now());
+ (unsigned long long)OSUtils::now(),
+ ((clusterJson.length() > 0) ? clusterJson.c_str() : "null"));
responseBody = json;
scode = 200;
} else if (ps[0] == "config") {
diff --git a/service/OneService.cpp b/service/OneService.cpp
index 6b28c41e..729812ed 100644
--- a/service/OneService.cpp
+++ b/service/OneService.cpp
@@ -58,6 +58,8 @@
#include "OneService.hpp"
#include "ControlPlane.hpp"
+#include "ClusterGeoIpService.hpp"
+#include "ClusterDefinition.hpp"
/**
* Uncomment to enable UDP breakage switch
@@ -366,6 +368,11 @@ static int SnodeDataStorePutFunction(ZT_Node *node,void *uptr,const char *name,c
static int SnodeWirePacketSendFunction(ZT_Node *node,void *uptr,const struct sockaddr_storage *localAddr,const struct sockaddr_storage *addr,const void *data,unsigned int len);
static void SnodeVirtualNetworkFrameFunction(ZT_Node *node,void *uptr,uint64_t nwid,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len);
+#ifdef ZT_ENABLE_CLUSTER
+static void SclusterSendFunction(void *uptr,unsigned int toMemberId,const void *data,unsigned int len);
+static int SclusterGeoIpFunction(void *uptr,const struct sockaddr_storage *addr,int *x,int *y,int *z);
+#endif
+
static void StapFrameHandler(void *uptr,uint64_t nwid,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len);
static int ShttpOnMessageBegin(http_parser *parser);
@@ -419,26 +426,32 @@ class OneServiceImpl : public OneService
{
public:
OneServiceImpl(const char *hp,unsigned int port) :
- _homePath((hp) ? hp : "."),
- _tcpFallbackResolver(ZT_TCP_FALLBACK_RELAY),
+ _homePath((hp) ? hp : ".")
+ ,_tcpFallbackResolver(ZT_TCP_FALLBACK_RELAY)
#ifdef ZT_ENABLE_NETWORK_CONTROLLER
- _controller((SqliteNetworkController *)0),
+ ,_controller((SqliteNetworkController *)0)
#endif
- _phy(this,false,true),
- _node((Node *)0),
- _controlPlane((ControlPlane *)0),
- _lastDirectReceiveFromGlobal(0),
- _lastSendToGlobal(0),
- _lastRestart(0),
- _nextBackgroundTaskDeadline(0),
- _tcpFallbackTunnel((TcpConnection *)0),
- _termReason(ONE_STILL_RUNNING),
- _port(0),
+ ,_phy(this,false,true)
+ ,_node((Node *)0)
+ ,_controlPlane((ControlPlane *)0)
+ ,_lastDirectReceiveFromGlobal(0)
+ ,_lastSendToGlobal(0)
+ ,_lastRestart(0)
+ ,_nextBackgroundTaskDeadline(0)
+ ,_tcpFallbackTunnel((TcpConnection *)0)
+ ,_termReason(ONE_STILL_RUNNING)
+ ,_port(0)
#ifdef ZT_USE_MINIUPNPC
- _v4UpnpUdpSocket((PhySocket *)0),
- _upnpClient((UPNPClient *)0),
+ ,_v4UpnpUdpSocket((PhySocket *)0)
+ ,_upnpClient((UPNPClient *)0)
#endif
- _run(true)
+#ifdef ZT_ENABLE_CLUSTER
+ ,_clusterMessageSocket((PhySocket *)0)
+ ,_clusterGeoIpService((ClusterGeoIpService *)0)
+ ,_clusterDefinition((ClusterDefinition *)0)
+ ,_clusterMemberId(0)
+#endif
+ ,_run(true)
{
const int portTrials = (port == 0) ? 256 : 1; // if port is 0, pick random
for(int k=0;k<portTrials;++k) {
@@ -510,6 +523,9 @@ public:
_phy.close(_v6UdpSocket);
_phy.close(_v4TcpListenSocket);
_phy.close(_v6TcpListenSocket);
+#ifdef ZT_ENABLE_CLUSTER
+ _phy.close(_clusterMessageSocket);
+#endif
#ifdef ZT_USE_MINIUPNPC
_phy.close(_v4UpnpUdpSocket);
delete _upnpClient;
@@ -517,6 +533,10 @@ public:
#ifdef ZT_ENABLE_NETWORK_CONTROLLER
delete _controller;
#endif
+#ifdef ZT_ENABLE_CLUSTER
+ delete _clusterGeoIpService;
+ delete _clusterDefinition;
+#endif
}
virtual ReasonForTermination run()
@@ -556,6 +576,70 @@ public:
_node->setNetconfMaster((void *)_controller);
#endif
+#ifdef ZT_ENABLE_CLUSTER
+ if (OSUtils::fileExists((_homePath + ZT_PATH_SEPARATOR_S + "cluster").c_str())) {
+ _clusterDefinition = new ClusterDefinition(_node->address(),(_homePath + ZT_PATH_SEPARATOR_S + "cluster").c_str());
+ if (_clusterDefinition->size() > 0) {
+ std::vector<ClusterDefinition::MemberDefinition> members(_clusterDefinition->members());
+ for(std::vector<ClusterDefinition::MemberDefinition>::iterator m(members.begin());m!=members.end();++m) {
+ PhySocket *cs = _phy.udpBind(reinterpret_cast<const struct sockaddr *>(&(m->clusterEndpoint)));
+ if (cs) {
+ if (_clusterMessageSocket) {
+ _phy.close(_clusterMessageSocket,false);
+ _phy.close(cs,false);
+
+ Mutex::Lock _l(_termReason_m);
+ _termReason = ONE_UNRECOVERABLE_ERROR;
+ _fatalErrorMessage = "Cluster: can't determine my cluster member ID: able to bind more than one cluster message socket IP/port!";
+ return _termReason;
+ }
+ _clusterMessageSocket = cs;
+ _clusterMemberId = m->id;
+ }
+ }
+
+ if (!_clusterMessageSocket) {
+ Mutex::Lock _l(_termReason_m);
+ _termReason = ONE_UNRECOVERABLE_ERROR;
+ _fatalErrorMessage = "Cluster: can't determine my cluster member ID: unable to bind to any cluster message socket IP/port.";
+ return _termReason;
+ }
+
+ if (OSUtils::fileExists((_homePath + ZT_PATH_SEPARATOR_S + "cluster-geo.exe").c_str()))
+ _clusterGeoIpService = new ClusterGeoIpService((_homePath + ZT_PATH_SEPARATOR_S + "cluster-geo.exe").c_str());
+
+ const ClusterDefinition::MemberDefinition &me = (*_clusterDefinition)[_clusterMemberId];
+ InetAddress endpoints[255];
+ unsigned int numEndpoints = 0;
+ for(std::vector<InetAddress>::const_iterator i(me.zeroTierEndpoints.begin());i!=me.zeroTierEndpoints.end();++i)
+ endpoints[numEndpoints++] = *i;
+
+ if (_node->clusterInit(
+ _clusterMemberId,
+ reinterpret_cast<const struct sockaddr_storage *>(endpoints),
+ numEndpoints,
+ me.x,
+ me.y,
+ me.z,
+ &SclusterSendFunction,
+ this,
+ (_clusterGeoIpService) ? &SclusterGeoIpFunction : 0,
+ this) == ZT_RESULT_OK) {
+
+ std::vector<ClusterDefinition::MemberDefinition> members(_clusterDefinition->members());
+ for(std::vector<ClusterDefinition::MemberDefinition>::iterator m(members.begin());m!=members.end();++m) {
+ if (m->id != _clusterMemberId)
+ _node->clusterAddMember(m->id);
+ }
+
+ }
+ } else {
+ delete _clusterDefinition;
+ _clusterDefinition = (ClusterDefinition *)0;
+ }
+ }
+#endif
+
_controlPlane = new ControlPlane(this,_node,(_homePath + ZT_PATH_SEPARATOR_S + "ui").c_str());
_controlPlane->addAuthToken(authToken.c_str());
@@ -781,10 +865,18 @@ public:
inline void phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *from,void *data,unsigned long len)
{
+#ifdef ZT_ENABLE_CLUSTER
+ if (sock == _clusterMessageSocket) {
+ _node->clusterHandleIncomingMessage(data,len);
+ return;
+ }
+#endif
+
#ifdef ZT_BREAK_UDP
if (OSUtils::fileExists("/tmp/ZT_BREAK_UDP"))
return;
#endif
+
if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL))
_lastDirectReceiveFromGlobal = OSUtils::now();
ZT_ResultCode rc = _node->processWirePacket(
@@ -1303,7 +1395,6 @@ public:
_phy.close(tc->sock); // will call close handler, which deletes from _tcpConnections
}
-private:
std::string _dataStorePrepPath(const char *name) const
{
std::string p(_homePath);
@@ -1358,6 +1449,13 @@ private:
UPNPClient *_upnpClient;
#endif
+#ifdef ZT_ENABLE_CLUSTER
+ PhySocket *_clusterMessageSocket;
+ ClusterGeoIpService *_clusterGeoIpService;
+ ClusterDefinition *_clusterDefinition;
+ unsigned int _clusterMemberId;
+#endif
+
bool _run;
Mutex _run_m;
};
@@ -1375,6 +1473,21 @@ static int SnodeWirePacketSendFunction(ZT_Node *node,void *uptr,const struct soc
static void SnodeVirtualNetworkFrameFunction(ZT_Node *node,void *uptr,uint64_t nwid,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len)
{ reinterpret_cast<OneServiceImpl *>(uptr)->nodeVirtualNetworkFrameFunction(nwid,sourceMac,destMac,etherType,vlanId,data,len); }
+#ifdef ZT_ENABLE_CLUSTER
+static void SclusterSendFunction(void *uptr,unsigned int toMemberId,const void *data,unsigned int len)
+{
+ OneServiceImpl *const impl = reinterpret_cast<OneServiceImpl *>(uptr);
+ const ClusterDefinition::MemberDefinition &md = (*(impl->_clusterDefinition))[toMemberId];
+ if (md.clusterEndpoint)
+ impl->_phy.udpSend(impl->_clusterMessageSocket,reinterpret_cast<const struct sockaddr *>(&(md.clusterEndpoint)),data,len);
+}
+static int SclusterGeoIpFunction(void *uptr,const struct sockaddr_storage *addr,int *x,int *y,int *z)
+{
+ OneServiceImpl *const impl = reinterpret_cast<OneServiceImpl *>(uptr);
+ return (int)(impl->_clusterGeoIpService->locate(*(reinterpret_cast<const InetAddress *>(addr)),*x,*y,*z));
+}
+#endif
+
static void StapFrameHandler(void *uptr,uint64_t nwid,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len)
{ reinterpret_cast<OneServiceImpl *>(uptr)->tapFrameHandler(nwid,from,to,etherType,vlanId,data,len); }
diff --git a/service/OneService.hpp b/service/OneService.hpp
index 2f76ebaa..4f7b988b 100644
--- a/service/OneService.hpp
+++ b/service/OneService.hpp
@@ -43,6 +43,9 @@ namespace ZeroTier {
* periodically checked and updates are automatically downloaded, verified
* against a built-in list of update signing keys, and installed. This is
* only supported for certain platforms.
+ *
+ * If built with ZT_ENABLE_CLUSTER, a 'cluster' file is checked and if
+ * present is read to determine the identity of other cluster members.
*/
class OneService
{
diff --git a/world/alice-test/mkworld.cpp b/world/alice-test/mkworld.cpp
index e680b97e..8940db2c 100644
--- a/world/alice-test/mkworld.cpp
+++ b/world/alice-test/mkworld.cpp
@@ -133,13 +133,35 @@ int main(int argc,char **argv)
std::sort(roots.back().stableEndpoints.begin(),roots.back().stableEndpoints.end());
#endif
- // ALICE TEST
+ // NOTE -- these are temporary test identities -- this is not yet the 'real' network.
+ // (but these are the real nodes)
+
+ // Alice -- global geo-clustered root #1
roots.push_back(World::Root());
roots.back().identity = Identity("d6ddca6ab5:0:4e761207d8b4200be44f478e3da148c16099110ee71b64586dda118e4022ab63682ce137da8ba817fc7f73aa3163f2e333933e2994c46b4f4119307be8855a72");
- roots.back().stableEndpoints.push_back(InetAddress("169.57.143.104/9993"));
- std::sort(roots.back().stableEndpoints.begin(),roots.back().stableEndpoints.end());
-
- std::sort(roots.begin(),roots.end());
+ roots.back().stableEndpoints.push_back(InetAddress("188.166.94.177/9993")); // Amsterdam IPv4
+ roots.back().stableEndpoints.push_back(InetAddress("2a03:b0c0:2:d0::7d:1/9993")); // Amsterdam IPv6
+ roots.back().stableEndpoints.push_back(InetAddress("159.203.97.171/9993")); // New York IPv4
+ roots.back().stableEndpoints.push_back(InetAddress("2604:a880:800:a1::54:6001/9993 ")); // New York IPv6
+ roots.back().stableEndpoints.push_back(InetAddress("169.57.143.104/9993")); // Sao Paolo IPv4
+ roots.back().stableEndpoints.push_back(InetAddress("2607:f0d0:1d01:57::2/9993")); // Sao Paolo IPv6
+ roots.back().stableEndpoints.push_back(InetAddress("104.238.182.83/9993")); // San Francisco IPv4
+ roots.back().stableEndpoints.push_back(InetAddress("2001:19f0:ac00:809:5400:ff:fe15:f3f4/9993")); // San Francisco IPv6
+ roots.back().stableEndpoints.push_back(InetAddress("128.199.182.9/9993")); // Singapore IPv4
+ roots.back().stableEndpoints.push_back(InetAddress("2400:6180:0:d0::1b:1001/9993")); // Singapore IPv6
+
+ // Bob -- global geo-clustered root #2
+ roots.back().identity = Identity("16ebbd6c5d:0:47d39bca9d0a5cf70148e39f6c45199e17e0e32e4e46cac01ae5bcb21224137b097f40bdd982a921c3aabdcb9ada8b4f2bb0593753bfdb21cf12eac28c8d9042");
+ roots.back().stableEndpoints.push_back(InetAddress("45.33.4.67/9993")); // Dallas IPv4
+ roots.back().stableEndpoints.push_back(InetAddress("2600:3c00::f03c:91ff:fe67:b704/9993")); // Dallas IPv6
+ roots.back().stableEndpoints.push_back(InetAddress("139.162.157.243/9993")); // Frankfurt (Germany) IPv4
+ roots.back().stableEndpoints.push_back(InetAddress("2a01:7e01::f03c:91ff:fe67:3ffd/9993")); // Frankfurt (Germany) IPv6
+ roots.back().stableEndpoints.push_back(InetAddress("45.32.246.179/9993")); // Sydney IPv4
+ roots.back().stableEndpoints.push_back(InetAddress("2001:19f0:5800:8bf8:5400:ff:fe15:b39a/9993")); // Sydney IPv6
+ roots.back().stableEndpoints.push_back(InetAddress("45.32.248.87/9993")); // Tokyo IPv4
+ roots.back().stableEndpoints.push_back(InetAddress("2001:19f0:7000:9bc9:5400:00ff:fe15:c4f5/9993")); // Tokyo IPv6
+ roots.back().stableEndpoints.push_back(InetAddress("159.203.2.154/9993")); // Toronto IPv4
+ roots.back().stableEndpoints.push_back(InetAddress("2604:a880:cad:d0::26:7001/9993")); // Toronto IPv6
const uint64_t id = ZT_WORLD_ID_EARTH;
const uint64_t ts = OSUtils::now();