author     Adam Ierymenko <adam.ierymenko@gmail.com>  2015-11-06 16:12:41 -0800
committer  Adam Ierymenko <adam.ierymenko@gmail.com>  2015-11-06 16:12:41 -0800
commit     6bc8c9d8efd05c7f85ac269be29c781fcc40672b (patch)
tree       96dd722130004a2a6a84d0130f2f65baa2a6d4b2
parent     5f39d5b7ea202ca39cef46779b5406e35e0dcb3e (diff)
download   infinitytier-6bc8c9d8efd05c7f85ac269be29c781fcc40672b.tar.gz
           infinitytier-6bc8c9d8efd05c7f85ac269be29c781fcc40672b.zip
Clustering cleanup, still a work in progress.
-rw-r--r--  include/ZeroTierOne.h |   2
-rw-r--r--  node/Cluster.cpp      | 109
-rw-r--r--  node/Cluster.hpp      |  34
-rw-r--r--  node/Peer.cpp         |  21
-rw-r--r--  node/Switch.cpp       |  36
5 files changed, 83 insertions(+), 119 deletions(-)
diff --git a/include/ZeroTierOne.h b/include/ZeroTierOne.h
index fd7857d9..91700886 100644
--- a/include/ZeroTierOne.h
+++ b/include/ZeroTierOne.h
@@ -912,7 +912,7 @@ typedef struct {
uint64_t load;
/**
- * Number of peers this cluster member "has"
+ * Number of peers
*/
uint64_t peers;
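
For reference, this peers field is the per-member count that Cluster::status() fills in later in this commit. A minimal consumer-side sketch of reading it from an already-populated ZT_ClusterStatus follows; the printClusterPeers helper is hypothetical and uses only fields that appear in this diff.

#include <stdio.h>
#include "ZeroTierOne.h" // include/ZeroTierOne.h in this tree

// Hypothetical helper: dump per-member peer counts from a ZT_ClusterStatus
// that the service has already filled in via Cluster::status().
static void printClusterPeers(const ZT_ClusterStatus &cs)
{
	printf("cluster of %u member(s), my id is %u\n",(unsigned int)cs.clusterSize,(unsigned int)cs.myId);
	for(unsigned int i=0;i<(unsigned int)cs.clusterSize;++i) {
		const ZT_ClusterMemberStatus &m = cs.members[i];
		printf("  member %u: alive=%d load=%llu peers=%llu\n",
			(unsigned int)m.id,
			(int)m.alive,
			(unsigned long long)m.load,
			(unsigned long long)m.peers);
	}
}
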
diff --git a/node/Cluster.cpp b/node/Cluster.cpp
index dc7ecab2..9c954fa3 100644
--- a/node/Cluster.cpp
+++ b/node/Cluster.cpp
@@ -184,6 +184,7 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
m.z = dmsg.at<int32_t>(ptr); ptr += 4;
ptr += 8; // skip local clock, not used
m.load = dmsg.at<uint64_t>(ptr); ptr += 8;
+ m.peers = dmsg.at<uint64_t>(ptr); ptr += 8;
ptr += 8; // skip flags, unused
#ifdef ZT_TRACE
std::string addrs;
@@ -215,12 +216,22 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
case STATE_MESSAGE_HAVE_PEER: {
const Address zeroTierAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
Mutex::Lock _l2(_peerAffinities_m);
- _PA &pa = _peerAffinities[zeroTierAddress];
- pa.ts = RR->node->now();
- pa.mid = fromMemberId;
+ _peerAffinities.set(zeroTierAddress,fromMemberId);
TRACE("[%u] has %s @ %s",(unsigned int)fromMemberId,id.address().toString().c_str(),physicalAddress.toString().c_str());
} break;
+ case STATE_MESSAGE_WANT_PEER: {
+ const Address zeroTierAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
+ SharedPtr<Peer> peer(RR->topology->getPeerNoCache(zeroTierAddress));
+ if ((peer)&&(peer->hasActiveDirectPath(RR->node->now()))) {
+ char buf[ZT_ADDRESS_LENGTH];
+ peer->address().copyTo(buf,ZT_ADDRESS_LENGTH);
+ Mutex::Lock _l2(_members[fromMemberId].lock);
+ _send(fromMemberId,STATE_MESSAGE_HAVE_PEER,buf,ZT_ADDRESS_LENGTH);
+ _flush(fromMemberId);
+ }
+ } break;
+
case STATE_MESSAGE_MULTICAST_LIKE: {
const uint64_t nwid = dmsg.at<uint64_t>(ptr); ptr += 8;
const Address address(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
@@ -311,7 +322,7 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
if (haveMatch) {
_send(fromMemberId,STATE_MESSAGE_PROXY_SEND,rendezvousForRemote.data(),rendezvousForRemote.size());
- _flush(fromMemberId); // we want this to go ASAP, since with port restricted cone NATs success can be timing-sensitive
+ _flush(fromMemberId);
RR->sw->send(rendezvousForLocal,true,0);
}
}
@@ -349,12 +360,22 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
const uint64_t now = RR->node->now();
unsigned int canHasPeer = 0;
- { // Anyone got this peer?
+ {
Mutex::Lock _l2(_peerAffinities_m);
- _PA *pa = _peerAffinities.get(toPeerAddress);
- if ((pa)&&(pa->mid != _id)&&((now - pa->ts) < ZT_PEER_ACTIVITY_TIMEOUT))
- canHasPeer = pa->mid;
- else return false;
+ const unsigned int *pa = _peerAffinities.get(toPeerAddress);
+ if (!pa) {
+ char buf[ZT_ADDRESS_LENGTH];
+ peerId.address().copyTo(buf,ZT_ADDRESS_LENGTH);
+ {
+ Mutex::Lock _l(_memberIds_m);
+ for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
+ Mutex::Lock _l2(_members[*mid].lock);
+ _send(*mid,STATE_MESSAGE_WANT_PEER,buf,ZT_ADDRESS_LENGTH);
+ }
+ }
+ return false;
+ }
+ canHasPeer = *pa;
}
Buffer<1024> buf;
@@ -395,22 +416,6 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
void Cluster::replicateHavePeer(const Identity &peerId)
{
- const uint64_t now = RR->node->now();
- {
- Mutex::Lock _l2(_peerAffinities_m);
- _PA &pa = _peerAffinities[peerId.address()];
- if (pa.mid != _id) {
- pa.ts = now;
- pa.mid = _id;
- // fall through to send code below
- } else if ((now - pa.ts) < ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD) {
- return;
- } else {
- pa.ts = now;
- // fall through to send code below
- }
- }
-
char buf[ZT_ADDRESS_LENGTH];
peerId.address().copyTo(buf,ZT_ADDRESS_LENGTH);
{
@@ -455,44 +460,9 @@ void Cluster::replicateCertificateOfNetworkMembership(const CertificateOfMembers
*/
}
-struct _ClusterAnnouncePeers
-{
- _ClusterAnnouncePeers(const uint64_t now_,Cluster *parent_) : now(now_),parent(parent_) {}
- const uint64_t now;
- Cluster *const parent;
- inline void operator()(const Topology &t,const SharedPtr<Peer> &peer) const
- {
- if (peer->hasActiveDirectPath(now))
- parent->replicateHavePeer(peer->identity());
- }
-};
void Cluster::doPeriodicTasks()
{
const uint64_t now = RR->node->now();
-
- // Erase old peer affinity entries just to control table size
- if ((now - _lastCleanedPeerAffinities) >= (ZT_PEER_ACTIVITY_TIMEOUT * 5)) {
- _lastCleanedPeerAffinities = now;
- Address *k = (Address *)0;
- _PA *v = (_PA *)0;
- Mutex::Lock _l(_peerAffinities_m);
- Hashtable< Address,_PA >::Iterator i(_peerAffinities);
- while (i.next(k,v)) {
- if ((now - v->ts) >= (ZT_PEER_ACTIVITY_TIMEOUT * 5))
- _peerAffinities.erase(*k);
- }
- }
-
- // Announce peers that we have active direct paths to -- note that we forget paths
- // that other cluster members claim they have, which prevents us from fighting
- // with other cluster members (route flapping) over specific paths.
- if ((now - _lastCheckedPeersForAnnounce) >= (ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD / 4)) {
- _lastCheckedPeersForAnnounce = now;
- _ClusterAnnouncePeers func(now,this);
- RR->topology->eachPeer<_ClusterAnnouncePeers &>(func);
- }
-
- // Flush outgoing packet send queue every doPeriodicTasks()
if ((now - _lastFlushed) >= ZT_CLUSTER_FLUSH_PERIOD) {
_lastFlushed = now;
Mutex::Lock _l(_memberIds_m);
@@ -516,6 +486,7 @@ void Cluster::doPeriodicTasks()
}
alive.append((uint64_t)now);
alive.append((uint64_t)0); // TODO: compute and send load average
+ alive.append((uint64_t)RR->topology->countActive());
alive.append((uint64_t)0); // unused/reserved flags
alive.append((uint8_t)_zeroTierPhysicalEndpoints.size());
for(std::vector<InetAddress>::const_iterator pe(_zeroTierPhysicalEndpoints.begin());pe!=_zeroTierPhysicalEndpoints.end();++pe)
@@ -630,8 +601,6 @@ void Cluster::status(ZT_ClusterStatus &status) const
{
const uint64_t now = RR->node->now();
memset(&status,0,sizeof(ZT_ClusterStatus));
- ZT_ClusterMemberStatus *ms[ZT_CLUSTER_MAX_MEMBERS];
- memset(ms,0,sizeof(ms));
status.myId = _id;
@@ -641,6 +610,7 @@ void Cluster::status(ZT_ClusterStatus &status) const
ms[_id]->x = _x;
ms[_id]->y = _y;
ms[_id]->z = _z;
+ ms[_id]->load = 0; // TODO
ms[_id]->peers = RR->topology->countActive();
for(std::vector<InetAddress>::const_iterator ep(_zeroTierPhysicalEndpoints.begin());ep!=_zeroTierPhysicalEndpoints.end();++ep) {
if (ms[_id]->numZeroTierPhysicalEndpoints >= ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES) // sanity check
@@ -653,10 +623,11 @@ void Cluster::status(ZT_ClusterStatus &status) const
for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
if (status.clusterSize >= ZT_CLUSTER_MAX_MEMBERS) // sanity check
break;
- ZT_ClusterMemberStatus *s = ms[*mid] = &(status.members[status.clusterSize++]);
+
_Member &m = _members[*mid];
Mutex::Lock ml(m.lock);
+ ZT_ClusterMemberStatus *const s = &(status.members[status.clusterSize++]);
s->id = *mid;
s->msSinceLastHeartbeat = (unsigned int)std::min((uint64_t)(~((unsigned int)0)),(now - m.lastReceivedAliveAnnouncement));
s->alive = (s->msSinceLastHeartbeat < ZT_CLUSTER_TIMEOUT) ? 1 : 0;
@@ -664,6 +635,7 @@ void Cluster::status(ZT_ClusterStatus &status) const
s->y = m.y;
s->z = m.z;
s->load = m.load;
+ s->peers = m.peers;
for(std::vector<InetAddress>::const_iterator ep(m.zeroTierPhysicalEndpoints.begin());ep!=m.zeroTierPhysicalEndpoints.end();++ep) {
if (s->numZeroTierPhysicalEndpoints >= ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES) // sanity check
break;
@@ -671,17 +643,6 @@ void Cluster::status(ZT_ClusterStatus &status) const
}
}
}
-
- {
- Mutex::Lock _l2(_peerAffinities_m);
- Address *k = (Address *)0;
- _PA *v = (_PA *)0;
- Hashtable< Address,_PA >::Iterator i(const_cast<Cluster *>(this)->_peerAffinities);
- while (i.next(k,v)) {
- if ( (ms[v->mid]) && (v->mid != _id) && ((now - v->ts) < ZT_PEER_ACTIVITY_TIMEOUT) )
- ++ms[v->mid]->peers;
- }
- }
}
void Cluster::_send(uint16_t memberId,StateMessageType type,const void *msg,unsigned int len)
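
Taken together, the Cluster.cpp changes above replace periodic HAVE_PEER broadcasts with an on-demand lookup. A rough sketch of the resulting round trip, written as comments (illustration only; the forwarding step after the lookup is unchanged by this commit):

// member A (no entry for P in _peerAffinities)       member B (has a direct path to P)
// ---------------------------------------------      ---------------------------------
// sendViaCluster(..., P, ...):
//   _peerAffinities.get(P) returns NULL
//   _send(*mid, STATE_MESSAGE_WANT_PEER, P)  ----->   handleIncomingStateMessage():
//   return false (caller falls back to a root)          peer->hasActiveDirectPath(now) is true
//                                            <-----     _send(A, STATE_MESSAGE_HAVE_PEER, P)
// handleIncomingStateMessage():
//   _peerAffinities.set(P, fromMemberId)
// a later sendViaCluster(..., P, ...) finds B in the table and forwards through the cluster
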
diff --git a/node/Cluster.hpp b/node/Cluster.hpp
index 73391114..c5d110d0 100644
--- a/node/Cluster.hpp
+++ b/node/Cluster.hpp
@@ -53,11 +53,6 @@
#define ZT_CLUSTER_TIMEOUT 10000
/**
- * How often should we announce that we have a peer?
- */
-#define ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD ((ZT_PEER_DIRECT_PING_DELAY / 2) - 1000)
-
-/**
* Desired period between doPeriodicTasks() in milliseconds
*/
#define ZT_CLUSTER_PERIODIC_TASK_PERIOD 250
@@ -116,6 +111,7 @@ public:
* <[4] Z location (signed 32-bit)>
* <[8] local clock at this member>
* <[8] load average>
+ * <[8] number of peers>
* <[8] flags (currently unused, must be zero)>
* <[1] number of preferred ZeroTier endpoints>
* <[...] InetAddress(es) of preferred ZeroTier endpoint(s)>
@@ -129,19 +125,25 @@ public:
STATE_MESSAGE_HAVE_PEER = 2,
/**
+ * Cluster member wants this peer:
+ * <[5] ZeroTier address of peer>
+ */
+ STATE_MESSAGE_WANT_PEER = 3,
+
+ /**
* Peer subscription to multicast group:
* <[8] network ID>
* <[5] peer ZeroTier address>
* <[6] MAC address of multicast group>
* <[4] 32-bit multicast group ADI>
*/
- STATE_MESSAGE_MULTICAST_LIKE = 3,
+ STATE_MESSAGE_MULTICAST_LIKE = 4,
/**
* Certificate of network membership for a peer:
* <[...] serialized COM>
*/
- STATE_MESSAGE_COM = 4,
+ STATE_MESSAGE_COM = 5,
/**
* Request that VERB_RENDEZVOUS be sent to a peer that we have:
@@ -155,7 +157,7 @@ public:
* info for its peer, and we send VERB_RENDEZVOUS to both sides: to ours
* directly and with PROXY_SEND to theirs.
*/
- STATE_MESSAGE_PROXY_UNITE = 5,
+ STATE_MESSAGE_PROXY_UNITE = 6,
/**
* Request that a cluster member send a packet to a locally-known peer:
@@ -171,7 +173,7 @@ public:
* while PROXY_SEND is used to implement proxy sending (which right
* now is only used to send RENDEZVOUS).
*/
- STATE_MESSAGE_PROXY_SEND = 6,
+ STATE_MESSAGE_PROXY_SEND = 7,
/**
* Replicate a network config for a network we belong to:
@@ -184,7 +186,7 @@ public:
*
* TODO: not implemented yet!
*/
- STATE_MESSAGE_NETWORK_CONFIG = 7
+ STATE_MESSAGE_NETWORK_CONFIG = 8
};
/**
@@ -316,6 +318,7 @@ private:
uint64_t lastAnnouncedAliveTo;
uint64_t load;
+ uint64_t peers;
int32_t x,y,z;
std::vector<InetAddress> zeroTierPhysicalEndpoints;
@@ -329,6 +332,7 @@ private:
lastReceivedAliveAnnouncement = 0;
lastAnnouncedAliveTo = 0;
load = 0;
+ peers = 0;
x = 0;
y = 0;
z = 0;
@@ -344,17 +348,9 @@ private:
std::vector<uint16_t> _memberIds;
Mutex _memberIds_m;
- struct _PA
- {
- _PA() : ts(0),mid(0xffffffff) {}
- uint64_t ts;
- unsigned int mid;
- };
- Hashtable< Address,_PA > _peerAffinities;
+ Hashtable< Address,unsigned int > _peerAffinities;
Mutex _peerAffinities_m;
- uint64_t _lastCleanedPeerAffinities;
- uint64_t _lastCheckedPeersForAnnounce;
uint64_t _lastFlushed;
};
diff --git a/node/Peer.cpp b/node/Peer.cpp
index d3394da6..52727c78 100644
--- a/node/Peer.cpp
+++ b/node/Peer.cpp
@@ -156,8 +156,15 @@ void Peer::received(
}
}
- if (!pathIsConfirmed) {
- if ((verb == Packet::VERB_OK)||(RR->topology->amRoot())) {
+ if (pathIsConfirmed) {
+
+#ifdef ZT_ENABLE_CLUSTER
+ if ((RR->cluster)&&(verb == Packet::VERB_HELLO))
+ RR->cluster->replicateHavePeer(_id);
+#endif
+
+ } else {
+ if (verb == Packet::VERB_OK) {
Path *slot = (Path *)0;
if (np < ZT_MAX_PEER_NETWORK_PATHS) {
@@ -179,6 +186,11 @@ void Peer::received(
_sortPaths(now);
}
+#ifdef ZT_ENABLE_CLUSTER
+ if (RR->cluster)
+ RR->cluster->replicateHavePeer(_id);
+#endif
+
} else {
/* If this path is not known, send a HELLO. We don't learn
@@ -196,11 +208,6 @@ void Peer::received(
}
} // end _lock
-#ifdef ZT_ENABLE_CLUSTER
- if ((RR->cluster)&&(pathIsConfirmed))
- RR->cluster->replicateHavePeer(_id);
-#endif
-
if (needMulticastGroupAnnounce) {
const std::vector< SharedPtr<Network> > networks(RR->node->allNetworks());
for(std::vector< SharedPtr<Network> >::const_iterator n(networks.begin());n!=networks.end();++n)
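
The Peer.cpp hunks move the cluster announcement inside the path-handling logic: HAVE_PEER is replicated when a HELLO arrives over an already-confirmed path, or when an OK confirms a new path. A condensed, hypothetical restatement of that decision (not code from the commit, and ignoring the path-slot bookkeeping):

#ifdef ZT_ENABLE_CLUSTER
static inline bool shouldReplicateHavePeer(bool pathIsConfirmed,Packet::Verb verb)
{
	// Confirmed path: refresh other members' affinity entry on each HELLO.
	// Unconfirmed path: it only becomes a usable direct path on OK, and that
	// is the moment to tell the rest of the cluster we "have" this peer.
	return pathIsConfirmed ? (verb == Packet::VERB_HELLO) : (verb == Packet::VERB_OK);
}
#endif // ZT_ENABLE_CLUSTER
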
diff --git a/node/Switch.cpp b/node/Switch.cpp
index 4d6dacff..355440a5 100644
--- a/node/Switch.cpp
+++ b/node/Switch.cpp
@@ -616,15 +616,15 @@ void Switch::_handleRemotePacketFragment(const InetAddress &localAddr,const Inet
if (fragment.hops() < ZT_RELAY_MAX_HOPS) {
fragment.incrementHops();
-#ifdef ZT_ENABLE_CLUSTER
- if ((RR->cluster)&&(RR->cluster->sendViaCluster(Address(),destination,fragment.data(),fragment.size(),false)))
- return;
-#endif
-
// Note: we don't bother initiating NAT-t for fragments, since heads will set that off.
// It wouldn't hurt anything, just redundant and unnecessary.
SharedPtr<Peer> relayTo = RR->topology->getPeer(destination);
if ((!relayTo)||(!relayTo->send(RR,fragment.data(),fragment.size(),RR->node->now()))) {
+#ifdef ZT_ENABLE_CLUSTER
+ if ((RR->cluster)&&(RR->cluster->sendViaCluster(Address(),destination,fragment.data(),fragment.size(),false)))
+ return;
+#endif
+
// Don't know peer or no direct path -- so relay via root server
relayTo = RR->topology->getBestRoot();
if (relayTo)
@@ -702,19 +702,6 @@ void Switch::_handleRemotePacketHead(const InetAddress &localAddr,const InetAddr
if (packet->hops() < ZT_RELAY_MAX_HOPS) {
packet->incrementHops();
-#ifdef ZT_ENABLE_CLUSTER
- if (RR->cluster) {
- Mutex::Lock _l(_lastUniteAttempt_m);
- uint64_t &luts = _lastUniteAttempt[_LastUniteKey(source,destination)];
- const bool shouldUnite = ((now - luts) >= ZT_MIN_UNITE_INTERVAL);
- if (RR->cluster->sendViaCluster(source,destination,packet->data(),packet->size(),shouldUnite)) {
- if (shouldUnite)
- luts = now;
- return;
- }
- }
-#endif
-
SharedPtr<Peer> relayTo = RR->topology->getPeer(destination);
if ((relayTo)&&((relayTo->send(RR,packet->data(),packet->size(),now)))) {
Mutex::Lock _l(_lastUniteAttempt_m);
@@ -724,6 +711,19 @@ void Switch::_handleRemotePacketHead(const InetAddress &localAddr,const InetAddr
unite(source,destination);
}
} else {
+#ifdef ZT_ENABLE_CLUSTER
+ if (RR->cluster) {
+ Mutex::Lock _l(_lastUniteAttempt_m);
+ uint64_t &luts = _lastUniteAttempt[_LastUniteKey(source,destination)];
+ const bool shouldUnite = ((now - luts) >= ZT_MIN_UNITE_INTERVAL);
+ if (RR->cluster->sendViaCluster(source,destination,packet->data(),packet->size(),shouldUnite)) {
+ if (shouldUnite)
+ luts = now;
+ return;
+ }
+ }
+#endif
+
relayTo = RR->topology->getBestRoot(&source,1,true);
if (relayTo)
relayTo->send(RR,packet->data(),packet->size(),now);
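
With the Switch.cpp reordering, intra-cluster relaying becomes a fallback rather than the first choice. A hedged outline of the resulting precedence for a packet head, condensed from the two hunks above (names follow the diff; shouldUnite is computed from _lastUniteAttempt exactly as shown there):

// 1. Try a locally-known peer with a working direct path.
// 2. Otherwise ask the cluster, passing the unite hint.
// 3. Otherwise relay via the best root.
SharedPtr<Peer> relayTo = RR->topology->getPeer(destination);
if ((relayTo)&&(relayTo->send(RR,packet->data(),packet->size(),now))) {
	// relayed locally; NAT-t unite handling is unchanged
} else {
#ifdef ZT_ENABLE_CLUSTER
	if ((RR->cluster)&&(RR->cluster->sendViaCluster(source,destination,packet->data(),packet->size(),shouldUnite)))
		return; // another cluster member knows this peer
#endif
	relayTo = RR->topology->getBestRoot(&source,1,true);
	if (relayTo)
		relayTo->send(RR,packet->data(),packet->size(),now);
}
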