From 5f39d5b7ea202ca39cef46779b5406e35e0dcb3e Mon Sep 17 00:00:00 2001
From: Adam Ierymenko
Date: Fri, 6 Nov 2015 14:37:17 -0800
Subject: Further pare down Cluster messaging and rename some stuff.

---
 node/Cluster.cpp        | 31 ++++++++++---------------------
 node/Cluster.hpp        |  7 +------
 node/IncomingPacket.cpp |  4 ++--
 node/Node.cpp           |  4 ++--
 node/Peer.cpp           | 10 +++++-----
 node/Peer.hpp           |  2 +-
 node/Switch.cpp         |  6 +++---
 7 files changed, 24 insertions(+), 40 deletions(-)

(limited to 'node')

diff --git a/node/Cluster.cpp b/node/Cluster.cpp
index e9c5856f..dc7ecab2 100644
--- a/node/Cluster.cpp
+++ b/node/Cluster.cpp
@@ -214,19 +214,10 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
 			case STATE_MESSAGE_HAVE_PEER: {
 				const Address zeroTierAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
-				InetAddress physicalAddress;
-				ptr += physicalAddress.deserialize(dmsg,ptr);
-				if (physicalAddress) {
-					SharedPtr<Peer> myPeerRecord(RR->topology->getPeerNoCache(zeroTierAddress));
-					if (myPeerRecord)
-						myPeerRecord->removePathByAddress(physicalAddress);
-				}
-				{
-					Mutex::Lock _l2(_peerAffinities_m);
-					_PA &pa = _peerAffinities[zeroTierAddress];
-					pa.ts = RR->node->now();
-					pa.mid = fromMemberId;
-				}
+				Mutex::Lock _l2(_peerAffinities_m);
+				_PA &pa = _peerAffinities[zeroTierAddress];
+				pa.ts = RR->node->now();
+				pa.mid = fromMemberId;
 				TRACE("[%u] has %s @ %s",(unsigned int)fromMemberId,id.address().toString().c_str(),physicalAddress.toString().c_str());
 			}	break;
 
@@ -402,7 +393,7 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
 	return true;
 }
 
-void Cluster::replicateHavePeer(const Identity &peerId,const InetAddress &physicalAddress)
+void Cluster::replicateHavePeer(const Identity &peerId)
 {
 	const uint64_t now = RR->node->now();
 	{
@@ -420,14 +411,13 @@ void Cluster::replicateHavePeer(const Identity &peerId,const InetAddress &physic
 		}
 	}
 
-	Buffer<1024> buf;
-	peerId.address().appendTo(buf);
-	physicalAddress.serialize(buf);
+	char buf[ZT_ADDRESS_LENGTH];
+	peerId.address().copyTo(buf,ZT_ADDRESS_LENGTH);
 	{
 		Mutex::Lock _l(_memberIds_m);
 		for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
 			Mutex::Lock _l2(_members[*mid].lock);
-			_send(*mid,STATE_MESSAGE_HAVE_PEER,buf.data(),buf.size());
+			_send(*mid,STATE_MESSAGE_HAVE_PEER,buf,ZT_ADDRESS_LENGTH);
 		}
 	}
 }
@@ -472,9 +462,8 @@ struct _ClusterAnnouncePeers
 	Cluster *const parent;
 	inline void operator()(const Topology &t,const SharedPtr<Peer> &peer) const
 	{
-		Path *p = peer->getBestPath(now);
-		if (p)
-			parent->replicateHavePeer(peer->identity(),p->address());
+		if (peer->hasActiveDirectPath(now))
+			parent->replicateHavePeer(peer->identity());
 	}
 };
 void Cluster::doPeriodicTasks()
diff --git a/node/Cluster.hpp b/node/Cluster.hpp
index 22297506..73391114 100644
--- a/node/Cluster.hpp
+++ b/node/Cluster.hpp
@@ -125,10 +125,6 @@ public:
 		/**
 		 * Cluster member has this peer:
 		 *   <[5] ZeroTier address of peer>
-		 *   <[...] binary serialized peer remote physical address>
-		 *
-		 * Clusters send this message when they learn a path to a peer. The
-		 * replicated physical address is the one learned.
 		 */
 		STATE_MESSAGE_HAVE_PEER = 2,
 
@@ -237,9 +233,8 @@ public:
 	 * Advertise to the cluster that we have this peer
 	 *
 	 * @param peerId Identity of peer that we have
-	 * @param physicalAddress Physical address of peer (from our POV)
 	 */
-	void replicateHavePeer(const Identity &peerId,const InetAddress &physicalAddress);
+	void replicateHavePeer(const Identity &peerId);
 
 	/**
 	 * Advertise a multicast LIKE to the cluster
diff --git a/node/IncomingPacket.cpp b/node/IncomingPacket.cpp
index f2216235..26339b00 100644
--- a/node/IncomingPacket.cpp
+++ b/node/IncomingPacket.cpp
@@ -972,7 +972,7 @@ bool IncomingPacket::_doPUSH_DIRECT_PATHS(const RuntimeEnvironment *RR,const Sha
 				if ( ((flags & 0x01) == 0) && (Path::isAddressValidForPath(a)) ) {
 					if (++countPerScope[(int)a.ipScope()][0] <= ZT_PUSH_DIRECT_PATHS_MAX_PER_SCOPE_AND_FAMILY) {
 						TRACE("attempting to contact %s at pushed direct path %s",peer->address().toString().c_str(),a.toString().c_str());
-						peer->attemptToContactAt(RR,_localAddress,a,now);
+						peer->sendHELLO(RR,_localAddress,a,now);
 					} else {
 						TRACE("ignoring contact for %s at %s -- too many per scope",peer->address().toString().c_str(),a.toString().c_str());
 					}
@@ -983,7 +983,7 @@ bool IncomingPacket::_doPUSH_DIRECT_PATHS(const RuntimeEnvironment *RR,const Sha
 				if ( ((flags & 0x01) == 0) && (Path::isAddressValidForPath(a)) ) {
 					if (++countPerScope[(int)a.ipScope()][1] <= ZT_PUSH_DIRECT_PATHS_MAX_PER_SCOPE_AND_FAMILY) {
 						TRACE("attempting to contact %s at pushed direct path %s",peer->address().toString().c_str(),a.toString().c_str());
-						peer->attemptToContactAt(RR,_localAddress,a,now);
+						peer->sendHELLO(RR,_localAddress,a,now);
 					} else {
 						TRACE("ignoring contact for %s at %s -- too many per scope",peer->address().toString().c_str(),a.toString().c_str());
 					}
diff --git a/node/Node.cpp b/node/Node.cpp
index bcf5db1a..6ba038eb 100644
--- a/node/Node.cpp
+++ b/node/Node.cpp
@@ -249,7 +249,7 @@ public:
 			} else {
 				if (stableEndpoint4) {
 					needToContactIndirect = false;
-					p->attemptToContactAt(RR,InetAddress(),stableEndpoint4,_now);
+					p->sendHELLO(RR,InetAddress(),stableEndpoint4,_now);
 				}
 			}
 			if (p->doPingAndKeepalive(RR,_now,AF_INET6)) {
@@ -257,7 +257,7 @@ public:
 			} else {
 				if (stableEndpoint6) {
 					needToContactIndirect = false;
-					p->attemptToContactAt(RR,InetAddress(),stableEndpoint6,_now);
+					p->sendHELLO(RR,InetAddress(),stableEndpoint6,_now);
 				}
 			}
 
diff --git a/node/Peer.cpp b/node/Peer.cpp
index 0b981c8e..d3394da6 100644
--- a/node/Peer.cpp
+++ b/node/Peer.cpp
@@ -188,7 +188,7 @@ void Peer::received(
 		if ((now - _lastPathConfirmationSent) >= ZT_MIN_PATH_CONFIRMATION_INTERVAL) {
 			_lastPathConfirmationSent = now;
 			TRACE("got %s via unknown path %s(%s), confirming...",Packet::verbString(verb),_id.address().toString().c_str(),remoteAddr.toString().c_str());
-			attemptToContactAt(RR,localAddr,remoteAddr,now);
+			sendHELLO(RR,localAddr,remoteAddr,now);
 		}
 	}
 
@@ -198,7 +198,7 @@ void Peer::received(
 
 #ifdef ZT_ENABLE_CLUSTER
 	if ((RR->cluster)&&(pathIsConfirmed))
-		RR->cluster->replicateHavePeer(_id,remoteAddr);
+		RR->cluster->replicateHavePeer(_id);
 #endif
 
 	if (needMulticastGroupAnnounce) {
@@ -208,7 +208,7 @@ void Peer::received(
 	}
 }
 
-void Peer::attemptToContactAt(const RuntimeEnvironment *RR,const InetAddress &localAddr,const InetAddress &atAddress,uint64_t now)
+void Peer::sendHELLO(const RuntimeEnvironment *RR,const InetAddress &localAddr,const InetAddress &atAddress,uint64_t now)
 {
 	// _lock not required here since _id is immutable and nothing else is accessed
 
@@ -242,7 +242,7 @@ bool Peer::doPingAndKeepalive(const RuntimeEnvironment *RR,uint64_t now,int inet
 	if (p) {
 		if ((now - p->lastReceived()) >= ZT_PEER_DIRECT_PING_DELAY) {
 			//TRACE("PING %s(%s) after %llums/%llums send/receive inactivity",_id.address().toString().c_str(),p->address().toString().c_str(),now - p->lastSend(),now - p->lastReceived());
-			attemptToContactAt(RR,p->localAddress(),p->address(),now);
+			sendHELLO(RR,p->localAddress(),p->address(),now);
 			p->sent(now);
 		} else if (((now - p->lastSend()) >= ZT_NAT_KEEPALIVE_DELAY)&&(!p->reliable())) {
 			//TRACE("NAT keepalive %s(%s) after %llums/%llums send/receive inactivity",_id.address().toString().c_str(),p->address().toString().c_str(),now - p->lastSend(),now - p->lastReceived());
@@ -348,7 +348,7 @@ bool Peer::resetWithinScope(const RuntimeEnvironment *RR,InetAddress::IpScope sc
 	unsigned int y = 0;
 	while (x < np) {
 		if (_paths[x].address().ipScope() == scope) {
-			attemptToContactAt(RR,_paths[x].localAddress(),_paths[x].address(),now);
+			sendHELLO(RR,_paths[x].localAddress(),_paths[x].address(),now);
 		} else {
 			_paths[y++] = _paths[x];
 		}
diff --git a/node/Peer.hpp b/node/Peer.hpp
index a70d9868..42708128 100644
--- a/node/Peer.hpp
+++ b/node/Peer.hpp
@@ -171,7 +171,7 @@ public:
 	 * @param atAddress Destination address
 	 * @param now Current time
 	 */
-	void attemptToContactAt(const RuntimeEnvironment *RR,const InetAddress &localAddr,const InetAddress &atAddress,uint64_t now);
+	void sendHELLO(const RuntimeEnvironment *RR,const InetAddress &localAddr,const InetAddress &atAddress,uint64_t now);
 
 	/**
 	 * Send pings or keepalives depending on configured timeouts
diff --git a/node/Switch.cpp b/node/Switch.cpp
index 8d3c12f5..4d6dacff 100644
--- a/node/Switch.cpp
+++ b/node/Switch.cpp
@@ -435,7 +435,7 @@ void Switch::rendezvous(const SharedPtr<Peer> &peer,const InetAddress &localAddr
 {
 	TRACE("sending NAT-t message to %s(%s)",peer->address().toString().c_str(),atAddr.toString().c_str());
 	const uint64_t now = RR->node->now();
-	peer->attemptToContactAt(RR,localAddr,atAddr,now);
+	peer->sendHELLO(RR,localAddr,atAddr,now);
 	{
 		Mutex::Lock _l(_contactQueue_m);
 		_contactQueue.push_back(ContactQueueEntry(peer,now + ZT_NAT_T_TACTICAL_ESCALATION_DELAY,localAddr,atAddr));
@@ -508,14 +508,14 @@ unsigned long Switch::doTimerTasks(uint64_t now)
 			} else {
 				if (qi->strategyIteration == 0) {
 					// First strategy: send packet directly to destination
-					qi->peer->attemptToContactAt(RR,qi->localAddr,qi->inaddr,now);
+					qi->peer->sendHELLO(RR,qi->localAddr,qi->inaddr,now);
 				} else if (qi->strategyIteration <= 4) {
 					// Strategies 1-4: try escalating ports for symmetric NATs that remap sequentially
 					InetAddress tmpaddr(qi->inaddr);
 					int p = (int)qi->inaddr.port() + qi->strategyIteration;
 					if (p < 0xffff) {
 						tmpaddr.setPort((unsigned int)p);
-						qi->peer->attemptToContactAt(RR,qi->localAddr,tmpaddr,now);
+						qi->peer->sendHELLO(RR,qi->localAddr,tmpaddr,now);
 					} else qi->strategyIteration = 5;
 				} else {
 					// All strategies tried, expire entry
-- 
cgit v1.2.3


From 6bc8c9d8efd05c7f85ac269be29c781fcc40672b Mon Sep 17 00:00:00 2001
From: Adam Ierymenko
Date: Fri, 6 Nov 2015 16:12:41 -0800
Subject: Clustering cleanup, still a work in progress.
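
STATE_MESSAGE_HAVE_PEER now carries only the peer's 5-byte ZeroTier address,
and the periodic have-peer announcement sweep is gone. Instead, a member that
cannot find a peer broadcasts the new STATE_MESSAGE_WANT_PEER, and any member
holding an active direct path answers with HAVE_PEER. The sketch below is a
minimal, self-contained model of that exchange; MemberId, the std::map table,
and the handler names are illustrative stand-ins rather than the actual
Cluster API -- only the payload (a 5-byte big-endian ZeroTier address, as
written by Address::copyTo()) matches the documented message format:

    // Hypothetical model of the HAVE_PEER / WANT_PEER exchange.
    #include <cstdint>
    #include <map>

    typedef uint16_t MemberId;
    static const int ZT_ADDRESS_LENGTH = 5;

    // Serialize a 40-bit ZeroTier address big-endian, as Address::copyTo() would.
    static void putAddress(uint64_t a,uint8_t *buf)
    {
        for(int i=0;i<ZT_ADDRESS_LENGTH;++i)
            buf[i] = (uint8_t)(a >> (8 * (ZT_ADDRESS_LENGTH - 1 - i)));
    }

    // Deserialize a 40-bit ZeroTier address from a message payload.
    static uint64_t getAddress(const uint8_t *buf)
    {
        uint64_t a = 0;
        for(int i=0;i<ZT_ADDRESS_LENGTH;++i)
            a = (a << 8) | (uint64_t)buf[i];
        return a;
    }

    // Address -> member that most recently claimed the peer. This replaces
    // the old timestamped _PA record, since entries are now refreshed on
    // demand by the WANT_PEER query instead of periodic announcements.
    static std::map<uint64_t,MemberId> peerAffinities;

    // On STATE_MESSAGE_HAVE_PEER: remember which member claims this peer.
    static void onHavePeer(MemberId from,const uint8_t *payload)
    {
        peerAffinities[getAddress(payload)] = from;
    }

    // Send-path lookup, mirroring the shape of the new sendViaCluster()
    // logic: if nobody is known to have the peer, the caller broadcasts
    // STATE_MESSAGE_WANT_PEER to all members and reports failure for now.
    static bool whoHasPeer(uint64_t ztAddr,MemberId &member)
    {
        std::map<uint64_t,MemberId>::const_iterator i(peerAffinities.find(ztAddr));
        if (i == peerAffinities.end())
            return false; // broadcast WANT_PEER, try again on a later packet
        member = i->second;
        return true;
    }

A member receiving WANT_PEER simply checks Topology for an active direct path
and, if it has one, replies to the asking member with HAVE_PEER and flushes
immediately, as the new handler in Cluster.cpp below does.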
---
 include/ZeroTierOne.h |   2 +-
 node/Cluster.cpp      | 109 ++++++++++++++++----------------------------
 node/Cluster.hpp      |  34 +++++++---------
 node/Peer.cpp         |  21 ++++++----
 node/Switch.cpp       |  36 ++++++++---------
 5 files changed, 83 insertions(+), 119 deletions(-)

(limited to 'node')

diff --git a/include/ZeroTierOne.h b/include/ZeroTierOne.h
index fd7857d9..91700886 100644
--- a/include/ZeroTierOne.h
+++ b/include/ZeroTierOne.h
@@ -912,7 +912,7 @@ typedef struct {
 	uint64_t load;
 
 	/**
-	 * Number of peers this cluster member "has"
+	 * Number of peers
 	 */
 	uint64_t peers;
 
diff --git a/node/Cluster.cpp b/node/Cluster.cpp
index dc7ecab2..9c954fa3 100644
--- a/node/Cluster.cpp
+++ b/node/Cluster.cpp
@@ -184,6 +184,7 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
 				m.z = dmsg.at<int32_t>(ptr); ptr += 4;
 				ptr += 8; // skip local clock, not used
 				m.load = dmsg.at<uint64_t>(ptr); ptr += 8;
+				m.peers = dmsg.at<uint64_t>(ptr); ptr += 8;
 				ptr += 8; // skip flags, unused
 #ifdef ZT_TRACE
 				std::string addrs;
@@ -215,12 +216,22 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
 			case STATE_MESSAGE_HAVE_PEER: {
 				const Address zeroTierAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
 				Mutex::Lock _l2(_peerAffinities_m);
-				_PA &pa = _peerAffinities[zeroTierAddress];
-				pa.ts = RR->node->now();
-				pa.mid = fromMemberId;
+				_peerAffinities.set(zeroTierAddress,fromMemberId);
 				TRACE("[%u] has %s @ %s",(unsigned int)fromMemberId,id.address().toString().c_str(),physicalAddress.toString().c_str());
 			}	break;
 
+			case STATE_MESSAGE_WANT_PEER: {
+				const Address zeroTierAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
+				SharedPtr<Peer> peer(RR->topology->getPeerNoCache(zeroTierAddress));
+				if ((peer)&&(peer->hasActiveDirectPath(RR->node->now()))) {
+					char buf[ZT_ADDRESS_LENGTH];
+					peer->address().copyTo(buf,ZT_ADDRESS_LENGTH);
+					Mutex::Lock _l2(_members[fromMemberId].lock);
+					_send(fromMemberId,STATE_MESSAGE_HAVE_PEER,buf,ZT_ADDRESS_LENGTH);
+					_flush(fromMemberId);
+				}
+			}	break;
+
 			case STATE_MESSAGE_MULTICAST_LIKE: {
 				const uint64_t nwid = dmsg.at<uint64_t>(ptr); ptr += 8;
 				const Address address(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
@@ -311,7 +322,7 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
 
 					if (haveMatch) {
 						_send(fromMemberId,STATE_MESSAGE_PROXY_SEND,rendezvousForRemote.data(),rendezvousForRemote.size());
-						_flush(fromMemberId); // we want this to go ASAP, since with port restricted cone NATs success can be timing-sensitive
+						_flush(fromMemberId);
 						RR->sw->send(rendezvousForLocal,true,0);
 					}
 				}
@@ -349,12 +360,22 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
 	const uint64_t now = RR->node->now();
 
 	unsigned int canHasPeer = 0;
 
-	{	// Anyone got this peer?
+	{
 		Mutex::Lock _l2(_peerAffinities_m);
-		_PA *pa = _peerAffinities.get(toPeerAddress);
-		if ((pa)&&(pa->mid != _id)&&((now - pa->ts) < ZT_PEER_ACTIVITY_TIMEOUT))
-			canHasPeer = pa->mid;
-		else return false;
+		const unsigned int *pa = _peerAffinities.get(toPeerAddress);
+		if (!pa) {
+			char buf[ZT_ADDRESS_LENGTH];
+			toPeerAddress.copyTo(buf,ZT_ADDRESS_LENGTH);
+			{
+				Mutex::Lock _l(_memberIds_m);
+				for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
+					Mutex::Lock _l2(_members[*mid].lock);
+					_send(*mid,STATE_MESSAGE_WANT_PEER,buf,ZT_ADDRESS_LENGTH);
+				}
+			}
+			return false;
+		}
+		canHasPeer = *pa;
 	}
 
 	Buffer<1024> buf;
@@ -395,22 +416,6 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
 
 void Cluster::replicateHavePeer(const Identity &peerId)
 {
-	const uint64_t now = RR->node->now();
-	{
-		Mutex::Lock _l2(_peerAffinities_m);
-		_PA &pa = _peerAffinities[peerId.address()];
-		if (pa.mid != _id) {
-			pa.ts = now;
-			pa.mid = _id;
-			// fall through to send code below
-		} else if ((now - pa.ts) < ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD) {
-			return;
-		} else {
-			pa.ts = now;
-			// fall through to send code below
-		}
-	}
-
 	char buf[ZT_ADDRESS_LENGTH];
 	peerId.address().copyTo(buf,ZT_ADDRESS_LENGTH);
 	{
@@ -455,44 +460,9 @@ void Cluster::replicateCertificateOfNetworkMembership(const CertificateOfMembers
 	*/
 }
 
-struct _ClusterAnnouncePeers
-{
-	_ClusterAnnouncePeers(const uint64_t now_,Cluster *parent_) : now(now_),parent(parent_) {}
-	const uint64_t now;
-	Cluster *const parent;
-	inline void operator()(const Topology &t,const SharedPtr<Peer> &peer) const
-	{
-		if (peer->hasActiveDirectPath(now))
-			parent->replicateHavePeer(peer->identity());
-	}
-};
 void Cluster::doPeriodicTasks()
 {
 	const uint64_t now = RR->node->now();
-
-	// Erase old peer affinity entries just to control table size
-	if ((now - _lastCleanedPeerAffinities) >= (ZT_PEER_ACTIVITY_TIMEOUT * 5)) {
-		_lastCleanedPeerAffinities = now;
-		Address *k = (Address *)0;
-		_PA *v = (_PA *)0;
-		Mutex::Lock _l(_peerAffinities_m);
-		Hashtable< Address,_PA >::Iterator i(_peerAffinities);
-		while (i.next(k,v)) {
-			if ((now - v->ts) >= (ZT_PEER_ACTIVITY_TIMEOUT * 5))
-				_peerAffinities.erase(*k);
-		}
-	}
-
-	// Announce peers that we have active direct paths to -- note that we forget paths
-	// that other cluster members claim they have, which prevents us from fighting
-	// with other cluster members (route flapping) over specific paths.
-	if ((now - _lastCheckedPeersForAnnounce) >= (ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD / 4)) {
-		_lastCheckedPeersForAnnounce = now;
-		_ClusterAnnouncePeers func(now,this);
-		RR->topology->eachPeer<_ClusterAnnouncePeers &>(func);
-	}
-
-	// Flush outgoing packet send queue every doPeriodicTasks()
 	if ((now - _lastFlushed) >= ZT_CLUSTER_FLUSH_PERIOD) {
 		_lastFlushed = now;
 		Mutex::Lock _l(_memberIds_m);
@@ -516,6 +486,7 @@ void Cluster::doPeriodicTasks()
 				}
 				alive.append((uint64_t)now);
 				alive.append((uint64_t)0); // TODO: compute and send load average
+				alive.append((uint64_t)RR->topology->countActive());
 				alive.append((uint64_t)0); // unused/reserved flags
 				alive.append((uint8_t)_zeroTierPhysicalEndpoints.size());
 				for(std::vector<InetAddress>::const_iterator pe(_zeroTierPhysicalEndpoints.begin());pe!=_zeroTierPhysicalEndpoints.end();++pe)
@@ -630,8 +601,6 @@ void Cluster::status(ZT_ClusterStatus &status) const
 {
 	const uint64_t now = RR->node->now();
 	memset(&status,0,sizeof(ZT_ClusterStatus));
-	ZT_ClusterMemberStatus *ms[ZT_CLUSTER_MAX_MEMBERS];
-	memset(ms,0,sizeof(ms));
 
 	status.myId = _id;
 
@@ -641,6 +610,7 @@ void Cluster::status(ZT_ClusterStatus &status) const
 	ms[_id]->x = _x;
 	ms[_id]->y = _y;
 	ms[_id]->z = _z;
+	ms[_id]->load = 0; // TODO
 	ms[_id]->peers = RR->topology->countActive();
 	for(std::vector<InetAddress>::const_iterator ep(_zeroTierPhysicalEndpoints.begin());ep!=_zeroTierPhysicalEndpoints.end();++ep) {
 		if (ms[_id]->numZeroTierPhysicalEndpoints >= ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES) // sanity check
@@ -653,10 +623,11 @@ void Cluster::status(ZT_ClusterStatus &status) const
 		for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
 			if (status.clusterSize >= ZT_CLUSTER_MAX_MEMBERS) // sanity check
 				break;
-			ZT_ClusterMemberStatus *s = ms[*mid] = &(status.members[status.clusterSize++]);
+
 			_Member &m = _members[*mid];
 			Mutex::Lock ml(m.lock);
+			ZT_ClusterMemberStatus *const s = &(status.members[status.clusterSize++]);
 			s->id = *mid;
 			s->msSinceLastHeartbeat = (unsigned int)std::min((uint64_t)(~((unsigned int)0)),(now - m.lastReceivedAliveAnnouncement));
 			s->alive = (s->msSinceLastHeartbeat < ZT_CLUSTER_TIMEOUT) ? 1 : 0;
@@ -664,6 +635,7 @@ void Cluster::status(ZT_ClusterStatus &status) const
 			s->y = m.y;
 			s->z = m.z;
 			s->load = m.load;
+			s->peers = m.peers;
 			for(std::vector<InetAddress>::const_iterator ep(m.zeroTierPhysicalEndpoints.begin());ep!=m.zeroTierPhysicalEndpoints.end();++ep) {
 				if (s->numZeroTierPhysicalEndpoints >= ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES) // sanity check
 					break;
@@ -671,17 +643,6 @@ void Cluster::status(ZT_ClusterStatus &status) const
 			}
 		}
 	}
-
-	{
-		Mutex::Lock _l2(_peerAffinities_m);
-		Address *k = (Address *)0;
-		_PA *v = (_PA *)0;
-		Hashtable< Address,_PA >::Iterator i(const_cast<Cluster *>(this)->_peerAffinities);
-		while (i.next(k,v)) {
-			if ( (ms[v->mid]) && (v->mid != _id) && ((now - v->ts) < ZT_PEER_ACTIVITY_TIMEOUT) )
-				++ms[v->mid]->peers;
-		}
-	}
 }
 
 void Cluster::_send(uint16_t memberId,StateMessageType type,const void *msg,unsigned int len)
diff --git a/node/Cluster.hpp b/node/Cluster.hpp
index 73391114..c5d110d0 100644
--- a/node/Cluster.hpp
+++ b/node/Cluster.hpp
@@ -52,11 +52,6 @@
  */
 #define ZT_CLUSTER_TIMEOUT 10000
 
-/**
- * How often should we announce that we have a peer?
- */
-#define ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD ((ZT_PEER_DIRECT_PING_DELAY / 2) - 1000)
-
 /**
  * Desired period between doPeriodicTasks() in milliseconds
  */
@@ -116,6 +111,7 @@ public:
 		 *   <[4] Z location (signed 32-bit)>
 		 *   <[8] local clock at this member>
 		 *   <[8] load average>
+		 *   <[8] number of peers>
 		 *   <[8] flags (currently unused, must be zero)>
 		 *   <[1] number of preferred ZeroTier endpoints>
 		 *   <[...] InetAddress(es) of preferred ZeroTier endpoint(s)>
@@ -128,6 +124,12 @@
 		 */
 		STATE_MESSAGE_HAVE_PEER = 2,
 
+		/**
+		 * Cluster member wants this peer:
+		 *   <[5] ZeroTier address of peer>
+		 */
+		STATE_MESSAGE_WANT_PEER = 3,
+
 		/**
 		 * Peer subscription to multicast group:
 		 *   <[8] network ID>
 		 *   <[5] ZeroTier address of peer>
 		 *   <[6] MAC address of multicast group>
 		 *   <[4] 32-bit multicast group ADI>
 		 */
-		STATE_MESSAGE_MULTICAST_LIKE = 3,
+		STATE_MESSAGE_MULTICAST_LIKE = 4,
 
 		/**
 		 * Certificate of network membership for a peer:
 		 *   <[...] serialized COM>
 		 */
-		STATE_MESSAGE_COM = 4,
+		STATE_MESSAGE_COM = 5,
 
 		/**
 		 * Request that VERB_RENDEZVOUS be sent to a peer that we have:
 		 *   <[5] ZeroTier address of peer on recipient's side>
 		 *   <[5] ZeroTier address of peer on sender's side>
 		 *   <[1] 8-bit number of sender's peer's active path addresses>
 		 *   <[...] series of serialized InetAddresses of sender's peer's paths>
 		 *
 		 * This is sent by a cluster member when they want us to send VERB_RENDEZVOUS
 		 * info for its peer, and we send VERB_RENDEZVOUS to both sides: to ours
 		 * directly and with PROXY_SEND to theirs.
 		 */
-		STATE_MESSAGE_PROXY_UNITE = 5,
+		STATE_MESSAGE_PROXY_UNITE = 6,
 
 		/**
 		 * Request that a cluster member send a packet to a locally-known peer:
 		 *   <[5] ZeroTier address of recipient>
 		 *   <[1] packet verb>
 		 *   <[2] length of packet payload>
 		 *   <[...] packet payload>
 		 *
 		 * This differs from RELAY in that it requests the receiving cluster
 		 * member to actually compose a ZeroTier packet and send it to a
 		 * recipient that it has direct connectivity to. The sending cluster
 		 * member can then return the message either directly if it has
 		 * connectivity or via another cluster member if not (another PROXY_SEND).
 		 *
 		 * PROXY_UNITE is used to tell a member to send a rendezvous to a peer,
 		 * while PROXY_SEND is used to implement proxy sending (which right
 		 * now is only used to send RENDEZVOUS).
 		 */
-		STATE_MESSAGE_PROXY_SEND = 6,
+		STATE_MESSAGE_PROXY_SEND = 7,
 
 		/**
 		 * Replicate a network config for a network we belong to:
 		 *   <[...] network config chunk>
 		 *
 		 * This is used by clusters to avoid every member having to query
 		 * for the same netconf for networks all members belong to.
 		 *
 		 * TODO: not implemented yet!
 		 */
-		STATE_MESSAGE_NETWORK_CONFIG = 7
+		STATE_MESSAGE_NETWORK_CONFIG = 8
 	};
 
 	/**
@@ -316,6 +318,7 @@ private:
 		uint64_t lastAnnouncedAliveTo;
 
 		uint64_t load;
+		uint64_t peers;
 		int32_t x,y,z;
 
 		std::vector<InetAddress> zeroTierPhysicalEndpoints;
@@ -329,6 +332,7 @@
 			lastReceivedAliveAnnouncement = 0;
 			lastAnnouncedAliveTo = 0;
 			load = 0;
+			peers = 0;
 			x = 0;
 			y = 0;
 			z = 0;
@@ -344,17 +348,9 @@
 	std::vector<uint16_t> _memberIds;
 	Mutex _memberIds_m;
 
-	struct _PA
-	{
-		_PA() : ts(0),mid(0xffffffff) {}
-		uint64_t ts;
-		unsigned int mid;
-	};
-	Hashtable< Address,_PA > _peerAffinities;
+	Hashtable< Address,unsigned int > _peerAffinities;
 	Mutex _peerAffinities_m;
 
-	uint64_t _lastCleanedPeerAffinities;
-	uint64_t _lastCheckedPeersForAnnounce;
 	uint64_t _lastFlushed;
 };
 
diff --git a/node/Peer.cpp b/node/Peer.cpp
index d3394da6..52727c78 100644
--- a/node/Peer.cpp
+++ b/node/Peer.cpp
@@ -156,8 +156,15 @@ void Peer::received(
 			}
 		}
 
-		if (!pathIsConfirmed) {
-			if ((verb == Packet::VERB_OK)||(RR->topology->amRoot())) {
+		if (pathIsConfirmed) {
+
+#ifdef ZT_ENABLE_CLUSTER
+			if ((RR->cluster)&&(verb == Packet::VERB_HELLO))
+				RR->cluster->replicateHavePeer(_id);
+#endif
+
+		} else {
+			if (verb == Packet::VERB_OK) {
 
 				Path *slot = (Path *)0;
 				if (np < ZT_MAX_PEER_NETWORK_PATHS) {
@@ -179,6 +186,11 @@ void Peer::received(
 					_sortPaths(now);
 				}
 
+#ifdef ZT_ENABLE_CLUSTER
+				if (RR->cluster)
+					RR->cluster->replicateHavePeer(_id);
+#endif
+
 			} else {
 				/* If this path is not known, send a HELLO. We don't learn
@@ -196,11 +208,6 @@ void Peer::received(
 		}
 	}	// end _lock
 
-#ifdef ZT_ENABLE_CLUSTER
-	if ((RR->cluster)&&(pathIsConfirmed))
-		RR->cluster->replicateHavePeer(_id);
-#endif
-
 	if (needMulticastGroupAnnounce) {
 		const std::vector< SharedPtr<Network> > networks(RR->node->allNetworks());
 		for(std::vector< SharedPtr<Network> >::const_iterator n(networks.begin());n!=networks.end();++n)
diff --git a/node/Switch.cpp b/node/Switch.cpp
index 4d6dacff..355440a5 100644
--- a/node/Switch.cpp
+++ b/node/Switch.cpp
@@ -616,15 +616,15 @@ void Switch::_handleRemotePacketFragment(const InetAddress &localAddr,const Inet
 		if (fragment.hops() < ZT_RELAY_MAX_HOPS) {
 			fragment.incrementHops();
 
-#ifdef ZT_ENABLE_CLUSTER
-			if ((RR->cluster)&&(RR->cluster->sendViaCluster(Address(),destination,fragment.data(),fragment.size(),false)))
-				return;
-#endif
-
 			// Note: we don't bother initiating NAT-t for fragments, since heads will set that off.
 			// It wouldn't hurt anything, just redundant and unnecessary.
 			SharedPtr<Peer> relayTo = RR->topology->getPeer(destination);
 			if ((!relayTo)||(!relayTo->send(RR,fragment.data(),fragment.size(),RR->node->now()))) {
+#ifdef ZT_ENABLE_CLUSTER
+				if ((RR->cluster)&&(RR->cluster->sendViaCluster(Address(),destination,fragment.data(),fragment.size(),false)))
+					return;
+#endif
+
 				// Don't know peer or no direct path -- so relay via root server
 				relayTo = RR->topology->getBestRoot();
 				if (relayTo)
@@ -702,19 +702,6 @@ void Switch::_handleRemotePacketHead(const InetAddress &localAddr,const InetAddr
 		if (packet->hops() < ZT_RELAY_MAX_HOPS) {
 			packet->incrementHops();
 
-#ifdef ZT_ENABLE_CLUSTER
-			if (RR->cluster) {
-				Mutex::Lock _l(_lastUniteAttempt_m);
-				uint64_t &luts = _lastUniteAttempt[_LastUniteKey(source,destination)];
-				const bool shouldUnite = ((now - luts) >= ZT_MIN_UNITE_INTERVAL);
-				if (RR->cluster->sendViaCluster(source,destination,packet->data(),packet->size(),shouldUnite)) {
-					if (shouldUnite)
-						luts = now;
-					return;
-				}
-			}
-#endif
-
 			SharedPtr<Peer> relayTo = RR->topology->getPeer(destination);
 			if ((relayTo)&&((relayTo->send(RR,packet->data(),packet->size(),now)))) {
 				Mutex::Lock _l(_lastUniteAttempt_m);
@@ -724,6 +711,19 @@ void Switch::_handleRemotePacketHead(const InetAddress &localAddr,const InetAddr
 					unite(source,destination);
 				}
 			} else {
+#ifdef ZT_ENABLE_CLUSTER
+				if (RR->cluster) {
+					Mutex::Lock _l(_lastUniteAttempt_m);
+					uint64_t &luts = _lastUniteAttempt[_LastUniteKey(source,destination)];
+					const bool shouldUnite = ((now - luts) >= ZT_MIN_UNITE_INTERVAL);
+					if (RR->cluster->sendViaCluster(source,destination,packet->data(),packet->size(),shouldUnite)) {
+						if (shouldUnite)
+							luts = now;
+						return;
+					}
+				}
+#endif
+
 				relayTo = RR->topology->getBestRoot(&source,1,true);
 				if (relayTo)
 					relayTo->send(RR,packet->data(),packet->size(),now);
-- 
cgit v1.2.3
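
A closing note on the Switch changes in this second commit: relaying now tries
a locally-known direct path first, hands off to the cluster second, and falls
back to a root last, keeping cluster lookups off the common path. A condensed
sketch of that decision order follows; RelayHooks and its function pointers
are hypothetical stand-ins for the RR->topology / RR->cluster calls made in
Switch::_handleRemotePacketHead() and _handleRemotePacketFragment(), not
ZeroTier's actual API:

    // Condensed model of the relay fallback order after this series.
    #include <cstdint>

    struct RelayHooks {
        bool (*sendDirect)(uint64_t dest,const void *data,unsigned int len);     // peer we know with a usable path
        bool (*sendViaCluster)(uint64_t dest,const void *data,unsigned int len); // a member advertised HAVE_PEER
        bool (*sendViaRoot)(uint64_t dest,const void *data,unsigned int len);    // last resort: upstream root
    };

    static bool relay(const RelayHooks &h,uint64_t dest,const void *data,unsigned int len)
    {
        if (h.sendDirect(dest,data,len))
            return true; // cheapest: no intra-cluster traffic at all
        if (h.sendViaCluster(dest,data,len))
            return true; // another member has the peer (it may also PROXY_UNITE the two sides)
        return h.sendViaRoot(dest,data,len); // nobody in the cluster has it
    }

Trying the cluster only after a direct send fails is what lets the on-demand
WANT_PEER query replace the old periodic HAVE_PEER announcements without
adding latency to packets the local member can already deliver itself.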