summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdam Ierymenko <adam.ierymenko@gmail.com>2015-10-26 12:41:08 -0700
committerAdam Ierymenko <adam.ierymenko@gmail.com>2015-10-26 12:41:08 -0700
commit865acfa40f65626f41bbd718b645f57c7d6db9b2 (patch)
treecb7bb750148f101578f28eec54154c40ef3a0ba2
parent3ce5ad9e2c1a31e1a3cc34f0ac8d950e4f0cf7b4 (diff)
downloadinfinitytier-865acfa40f65626f41bbd718b645f57c7d6db9b2.tar.gz
infinitytier-865acfa40f65626f41bbd718b645f57c7d6db9b2.zip
Cluster status plumbing.
-rw-r--r--include/ZeroTierOne.h77
-rw-r--r--node/Cluster.cpp56
-rw-r--r--node/Cluster.hpp7
-rw-r--r--node/Topology.cpp15
-rw-r--r--node/Topology.hpp5
5 files changed, 160 insertions, 0 deletions
diff --git a/include/ZeroTierOne.h b/include/ZeroTierOne.h
index 4de4a765..86bffcdf 100644
--- a/include/ZeroTierOne.h
+++ b/include/ZeroTierOne.h
@@ -134,6 +134,11 @@ extern "C" {
#define ZT_CLUSTER_MAX_MEMBERS 128
/**
+ * Maximum number of physical ZeroTier addresses a cluster member can report
+ */
+#define ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES 16
+
+/**
* Maximum allowed cluster message length in bytes
*/
#define ZT_CLUSTER_MAX_MESSAGE_LENGTH (1444 * 6)
@@ -880,6 +885,78 @@ typedef struct {
} ZT_CircuitTestReport;
/**
+ * A cluster member's status
+ */
+typedef struct {
+ /**
+ * This cluster member's ID (from 0 to 1-ZT_CLUSTER_MAX_MEMBERS)
+ */
+ unsigned int id;
+
+ /**
+ * Number of milliseconds since last 'alive' heartbeat message received via cluster backplane address
+ */
+ unsigned int msSinceLastHeartbeat;
+
+ /**
+ * Non-zero if cluster member is alive
+ */
+ int alive;
+
+ /**
+ * X, Y, and Z coordinates of this member (if specified, otherwise zero)
+ *
+ * What these mean depends on the location scheme being used for
+ * location-aware clustering. At present this is GeoIP and these
+ * will be the X, Y, and Z coordinates of the location on a spherical
+ * approximation of Earth where Earth's core is the origin (in km).
+ * They don't have to be perfect and need only be comparable with others
+ * to find shortest path via the standard vector distance formula.
+ */
+ int x,y,z;
+
+ /**
+ * Cluster member's last reported load
+ */
+ uint64_t load;
+
+ /**
+ * Number of peers this cluster member "has"
+ */
+ uint64_t peers;
+
+ /**
+ * Physical ZeroTier endpoints for this member (where peers are sent when directed here)
+ */
+ struct sockaddr_storage zeroTierPhysicalEndpoints[ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES];
+
+ /**
+ * Number of physical ZeroTier endpoints this member is announcing
+ */
+ unsigned int numZeroTierPhysicalEndpoints;
+} ZT_ClusterMemberStatus;
+
+/**
+ * ZeroTier cluster status
+ */
+typedef struct {
+ /**
+ * My cluster member ID (a record for 'self' is included in member[])
+ */
+ unsigned int myId;
+
+ /**
+ * Number of cluster members
+ */
+ unsigned int clusterSize;
+
+ /**
+ * Cluster member statuses
+ */
+ ZT_ClusterMemberStatus member[ZT_CLUSTER_MAX_MEMBERS];
+} ZT_ClusterStatus;
+
+/**
* An instance of a ZeroTier One node (opaque)
*/
typedef void ZT_Node;
diff --git a/node/Cluster.cpp b/node/Cluster.cpp
index 900804b7..47794214 100644
--- a/node/Cluster.cpp
+++ b/node/Cluster.cpp
@@ -638,6 +638,62 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy
}
}
+void Cluster::status(ZT_ClusterStatus &status) const
+{
+ const uint64_t now = RR->node->now();
+ memset(&status,0,sizeof(ZT_ClusterStatus));
+ ZT_ClusterMemberStatus *ms[ZT_CLUSTER_MAX_MEMBERS];
+ memset(ms,0,sizeof(ms));
+
+ status.myId = _id;
+
+ ms[_id] = &(status.member[status.clusterSize++]);
+ ms[_id]->id = _id;
+ ms[_id]->alive = 1;
+ ms[_id]->x = _x;
+ ms[_id]->y = _y;
+ ms[_id]->z = _z;
+ ms[_id]->peers = RR->topology->countAlive();
+ for(std::vector<InetAddress>::const_iterator ep(_zeroTierPhysicalEndpoints.begin());ep!=_zeroTierPhysicalEndpoints.end();++ep) {
+ if (ms[_id]->numZeroTierPhysicalEndpoints >= ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES) // sanity check
+ break;
+ memcpy(&(ms[_id]->zeroTierPhysicalEndpoints[ms[_id]->numZeroTierPhysicalEndpoints++]),&(*ep),sizeof(struct sockaddr_storage));
+ }
+
+ {
+ Mutex::Lock _l1(_memberIds_m);
+ for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
+ if (status.clusterSize >= ZT_CLUSTER_MAX_MEMBERS) // sanity check
+ break;
+ ZT_ClusterMemberStatus *s = ms[*mid] = &(status.member[status.clusterSize++]);
+ _Member &m = _members[*mid];
+ Mutex::Lock ml(m.lock);
+
+ s->id = *mid;
+ s->msSinceLastHeartbeat = (unsigned int)std::min((uint64_t)(~((unsigned int)0)),(now - m.lastReceivedAliveAnnouncement));
+ s->alive = (s->msSinceLastHeartbeat < ZT_CLUSTER_TIMEOUT) ? 1 : 0;
+ s->x = m.x;
+ s->y = m.y;
+ s->z = m.z;
+ s->load = m.load;
+ for(std::vector<InetAddress>::const_iterator ep(m.zeroTierPhysicalEndpoints.begin());ep!=m.zeroTierPhysicalEndpoints.end();++ep) {
+ if (s->numZeroTierPhysicalEndpoints >= ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES) // sanity check
+ break;
+ memcpy(&(s->zeroTierPhysicalEndpoints[s->numZeroTierPhysicalEndpoints++]),&(*ep),sizeof(struct sockaddr_storage));
+ }
+ }
+ }
+
+ {
+ Mutex::Lock _l2(_peerAffinities_m);
+ for(std::vector<_PeerAffinity>::const_iterator pi(_peerAffinities.begin());pi!=_peerAffinities.end();++pi) {
+ unsigned int mid = pi->clusterMemberId();
+ if ((ms[mid])&&(mid != _id)&&((now - pi->timestamp) < ZT_PEER_ACTIVITY_TIMEOUT))
+ ++ms[mid]->peers;
+ }
+ }
+}
+
void Cluster::_send(uint16_t memberId,StateMessageType type,const void *msg,unsigned int len)
{
if ((len + 3) > (ZT_CLUSTER_MAX_MESSAGE_LENGTH - (24 + 2 + 2))) // sanity check
diff --git a/node/Cluster.hpp b/node/Cluster.hpp
index daa81185..be346659 100644
--- a/node/Cluster.hpp
+++ b/node/Cluster.hpp
@@ -254,6 +254,13 @@ public:
*/
bool redirectPeer(const Address &peerAddress,const InetAddress &peerPhysicalAddress,bool offload);
+ /**
+ * Fill out ZT_ClusterStatus structure (from core API)
+ *
+ * @param status Reference to structure to hold result (anything there is replaced)
+ */
+ void status(ZT_ClusterStatus &status) const;
+
private:
void _send(uint16_t memberId,StateMessageType type,const void *msg,unsigned int len);
void _flush(uint16_t memberId);
diff --git a/node/Topology.cpp b/node/Topology.cpp
index 624c4987..e56d1f47 100644
--- a/node/Topology.cpp
+++ b/node/Topology.cpp
@@ -332,6 +332,21 @@ void Topology::clean(uint64_t now)
}
}
+unsigned long Topology::countAlive() const
+{
+ const uint64_t now = RR->node->now();
+ unsigned long cnt = 0;
+ Mutex::Lock _l(_lock);
+ Hashtable< Address,SharedPtr<Peer> >::Iterator i(const_cast<Topology *>(this)->_peers);
+ Address *a = (Address *)0;
+ SharedPtr<Peer> *p = (SharedPtr<Peer> *)0;
+ while (i.next(a,p)) {
+ if ((*p)->alive(now))
+ ++cnt;
+ }
+ return cnt;
+}
+
Identity Topology::_getIdentity(const Address &zta)
{
char p[128];
diff --git a/node/Topology.hpp b/node/Topology.hpp
index 324ec000..ee9827b9 100644
--- a/node/Topology.hpp
+++ b/node/Topology.hpp
@@ -193,6 +193,11 @@ public:
void clean(uint64_t now);
/**
+ * @return Number of 'alive' peers
+ */
+ unsigned long countAlive() const;
+
+ /**
* Apply a function or function object to all peers
*
* Note: explicitly template this by reference if you want the object