Diffstat (limited to 'node/Cluster.cpp')
-rw-r--r--  node/Cluster.cpp  109
1 file changed, 35 insertions(+), 74 deletions(-)
diff --git a/node/Cluster.cpp b/node/Cluster.cpp
index dc7ecab2..9c954fa3 100644
--- a/node/Cluster.cpp
+++ b/node/Cluster.cpp
@@ -184,6 +184,7 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
 				m.z = dmsg.at<int32_t>(ptr); ptr += 4;
 				ptr += 8; // skip local clock, not used
 				m.load = dmsg.at<uint64_t>(ptr); ptr += 8;
+				m.peers = dmsg.at<uint64_t>(ptr); ptr += 8;
 				ptr += 8; // skip flags, unused
 #ifdef ZT_TRACE
 				std::string addrs;
@@ -215,12 +216,22 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
 			case STATE_MESSAGE_HAVE_PEER: {
 				const Address zeroTierAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
 				Mutex::Lock _l2(_peerAffinities_m);
-				_PA &pa = _peerAffinities[zeroTierAddress];
-				pa.ts = RR->node->now();
-				pa.mid = fromMemberId;
+				_peerAffinities.set(zeroTierAddress,fromMemberId);
 				TRACE("[%u] has %s @ %s",(unsigned int)fromMemberId,id.address().toString().c_str(),physicalAddress.toString().c_str());
 			}	break;
 
+			case STATE_MESSAGE_WANT_PEER: {
+				const Address zeroTierAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
+				SharedPtr<Peer> peer(RR->topology->getPeerNoCache(zeroTierAddress));
+				if ((peer)&&(peer->hasActiveDirectPath(RR->node->now()))) {
+					char buf[ZT_ADDRESS_LENGTH];
+					peer->address().copyTo(buf,ZT_ADDRESS_LENGTH);
+					Mutex::Lock _l2(_members[fromMemberId].lock);
+					_send(fromMemberId,STATE_MESSAGE_HAVE_PEER,buf,ZT_ADDRESS_LENGTH);
+					_flush(fromMemberId);
+				}
+			}	break;
+
 			case STATE_MESSAGE_MULTICAST_LIKE: {
 				const uint64_t nwid = dmsg.at<uint64_t>(ptr); ptr += 8;
 				const Address address(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
@@ -311,7 +322,7 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
 					if (haveMatch) {
 						_send(fromMemberId,STATE_MESSAGE_PROXY_SEND,rendezvousForRemote.data(),rendezvousForRemote.size());
-						_flush(fromMemberId); // we want this to go ASAP, since with port restricted cone NATs success can be timing-sensitive
+						_flush(fromMemberId);
 						RR->sw->send(rendezvousForLocal,true,0);
 					}
 				}
@@ -349,12 +360,22 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
 	const uint64_t now = RR->node->now();
 	unsigned int canHasPeer = 0;
 
-	{	// Anyone got this peer?
+	{
 		Mutex::Lock _l2(_peerAffinities_m);
-		_PA *pa = _peerAffinities.get(toPeerAddress);
-		if ((pa)&&(pa->mid != _id)&&((now - pa->ts) < ZT_PEER_ACTIVITY_TIMEOUT))
-			canHasPeer = pa->mid;
-		else return false;
+		const unsigned int *pa = _peerAffinities.get(toPeerAddress);
+		if (!pa) {
+			char buf[ZT_ADDRESS_LENGTH];
+			peerId.address().copyTo(buf,ZT_ADDRESS_LENGTH);
+			{
+				Mutex::Lock _l(_memberIds_m);
+				for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
+					Mutex::Lock _l2(_members[*mid].lock);
+					_send(*mid,STATE_MESSAGE_WANT_PEER,buf,ZT_ADDRESS_LENGTH);
+				}
+			}
+			return false;
+		}
+		canHasPeer = *pa;
 	}
 
 	Buffer<1024> buf;
@@ -395,22 +416,6 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
 
 void Cluster::replicateHavePeer(const Identity &peerId)
 {
-	const uint64_t now = RR->node->now();
-	{
-		Mutex::Lock _l2(_peerAffinities_m);
-		_PA &pa = _peerAffinities[peerId.address()];
-		if (pa.mid != _id) {
-			pa.ts = now;
-			pa.mid = _id;
-			// fall through to send code below
-		} else if ((now - pa.ts) < ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD) {
-			return;
-		} else {
-			pa.ts = now;
-			// fall through to send code below
-		}
-	}
-
 	char buf[ZT_ADDRESS_LENGTH];
 	peerId.address().copyTo(buf,ZT_ADDRESS_LENGTH);
 	{
@@ -455,44 +460,9 @@ void Cluster::replicateCertificateOfNetworkMembership(const CertificateOfMembers
 	*/
 }
 
-struct _ClusterAnnouncePeers
-{
-	_ClusterAnnouncePeers(const uint64_t now_,Cluster *parent_) : now(now_),parent(parent_) {}
-	const uint64_t now;
-	Cluster *const parent;
-	inline void operator()(const Topology &t,const SharedPtr<Peer> &peer) const
-	{
-		if (peer->hasActiveDirectPath(now))
-			parent->replicateHavePeer(peer->identity());
-	}
-};
 void Cluster::doPeriodicTasks()
 {
 	const uint64_t now = RR->node->now();
-
-	// Erase old peer affinity entries just to control table size
-	if ((now - _lastCleanedPeerAffinities) >= (ZT_PEER_ACTIVITY_TIMEOUT * 5)) {
-		_lastCleanedPeerAffinities = now;
-		Address *k = (Address *)0;
-		_PA *v = (_PA *)0;
-		Mutex::Lock _l(_peerAffinities_m);
-		Hashtable< Address,_PA >::Iterator i(_peerAffinities);
-		while (i.next(k,v)) {
-			if ((now - v->ts) >= (ZT_PEER_ACTIVITY_TIMEOUT * 5))
-				_peerAffinities.erase(*k);
-		}
-	}
-
-	// Announce peers that we have active direct paths to -- note that we forget paths
-	// that other cluster members claim they have, which prevents us from fighting
-	// with other cluster members (route flapping) over specific paths.
-	if ((now - _lastCheckedPeersForAnnounce) >= (ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD / 4)) {
-		_lastCheckedPeersForAnnounce = now;
-		_ClusterAnnouncePeers func(now,this);
-		RR->topology->eachPeer<_ClusterAnnouncePeers &>(func);
-	}
-
-	// Flush outgoing packet send queue every doPeriodicTasks()
 	if ((now - _lastFlushed) >= ZT_CLUSTER_FLUSH_PERIOD) {
 		_lastFlushed = now;
 		Mutex::Lock _l(_memberIds_m);
@@ -516,6 +486,7 @@ void Cluster::doPeriodicTasks()
 			}
 			alive.append((uint64_t)now);
 			alive.append((uint64_t)0); // TODO: compute and send load average
+			alive.append((uint64_t)RR->topology->countActive());
 			alive.append((uint64_t)0); // unused/reserved flags
 			alive.append((uint8_t)_zeroTierPhysicalEndpoints.size());
 			for(std::vector<InetAddress>::const_iterator pe(_zeroTierPhysicalEndpoints.begin());pe!=_zeroTierPhysicalEndpoints.end();++pe)
@@ -630,8 +601,6 @@ void Cluster::status(ZT_ClusterStatus &status) const
 {
 	const uint64_t now = RR->node->now();
 	memset(&status,0,sizeof(ZT_ClusterStatus));
-	ZT_ClusterMemberStatus *ms[ZT_CLUSTER_MAX_MEMBERS];
-	memset(ms,0,sizeof(ms));
 
 	status.myId = _id;
@@ -641,6 +610,7 @@ void Cluster::status(ZT_ClusterStatus &status) const
 	ms[_id]->x = _x;
 	ms[_id]->y = _y;
 	ms[_id]->z = _z;
+	ms[_id]->load = 0; // TODO
 	ms[_id]->peers = RR->topology->countActive();
 	for(std::vector<InetAddress>::const_iterator ep(_zeroTierPhysicalEndpoints.begin());ep!=_zeroTierPhysicalEndpoints.end();++ep) {
 		if (ms[_id]->numZeroTierPhysicalEndpoints >= ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES) // sanity check
@@ -653,10 +623,11 @@ void Cluster::status(ZT_ClusterStatus &status) const
 		for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
 			if (status.clusterSize >= ZT_CLUSTER_MAX_MEMBERS) // sanity check
 				break;
-			ZT_ClusterMemberStatus *s = ms[*mid] = &(status.members[status.clusterSize++]);
+			_Member &m = _members[*mid];
 			Mutex::Lock ml(m.lock);
+			ZT_ClusterMemberStatus *const s = &(status.members[status.clusterSize++]);
 			s->id = *mid;
 			s->msSinceLastHeartbeat = (unsigned int)std::min((uint64_t)(~((unsigned int)0)),(now - m.lastReceivedAliveAnnouncement));
 			s->alive = (s->msSinceLastHeartbeat < ZT_CLUSTER_TIMEOUT) ? 1 : 0;
@@ -664,6 +635,7 @@ void Cluster::status(ZT_ClusterStatus &status) const
 			s->y = m.y;
 			s->z = m.z;
 			s->load = m.load;
+			s->peers = m.peers;
 			for(std::vector<InetAddress>::const_iterator ep(m.zeroTierPhysicalEndpoints.begin());ep!=m.zeroTierPhysicalEndpoints.end();++ep) {
 				if (s->numZeroTierPhysicalEndpoints >= ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES) // sanity check
 					break;
@@ -671,17 +643,6 @@ void Cluster::status(ZT_ClusterStatus &status) const
 			}
 		}
 	}
-
-	{
-		Mutex::Lock _l2(_peerAffinities_m);
-		Address *k = (Address *)0;
-		_PA *v = (_PA *)0;
-		Hashtable< Address,_PA >::Iterator i(const_cast<Cluster *>(this)->_peerAffinities);
-		while (i.next(k,v)) {
-			if ( (ms[v->mid]) && (v->mid != _id) && ((now - v->ts) < ZT_PEER_ACTIVITY_TIMEOUT) )
-				++ms[v->mid]->peers;
-		}
-	}
 }
 
 void Cluster::_send(uint16_t memberId,StateMessageType type,const void *msg,unsigned int len)
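
Note on the core change: the cluster no longer periodically re-announces every peer it holds (the deleted _ClusterAnnouncePeers pass and the timestamped _PA affinity records). Instead, when sendViaCluster() finds no affinity entry for the destination, it broadcasts STATE_MESSAGE_WANT_PEER to all members, and any member with an active direct path answers with STATE_MESSAGE_HAVE_PEER, which repopulates the table. The following is a minimal standalone sketch of that request/answer flow, not the real Cluster code: AffinityTable, Msg, ZtAddress, and the send callback are illustrative stand-ins for Hashtable< Address,unsigned int >, StateMessageType, Address, and _send()/_flush().

#include <cstdint>
#include <functional>
#include <mutex>
#include <unordered_map>
#include <vector>

typedef uint16_t MemberId;
typedef uint64_t ZtAddress; // stand-in for the 5-byte ZeroTier Address

enum class Msg { HAVE_PEER, WANT_PEER };

class AffinityTable
{
public:
	// On STATE_MESSAGE_HAVE_PEER: remember which member claims this peer.
	void learn(ZtAddress peer,MemberId member)
	{
		std::lock_guard<std::mutex> l(_m);
		_affinity[peer] = member;
	}

	// From sendViaCluster(): return a member known to have the peer, or
	// broadcast WANT_PEER to all members and fail so the caller gives up
	// on this frame; a later HAVE_PEER answer repopulates the table.
	bool find(ZtAddress peer,const std::vector<MemberId> &members,
		const std::function<void(MemberId,Msg,ZtAddress)> &send,MemberId &result)
	{
		{
			std::lock_guard<std::mutex> l(_m);
			auto i = _affinity.find(peer);
			if (i != _affinity.end()) { result = i->second; return true; }
		}
		for(MemberId m : members)
			send(m,Msg::WANT_PEER,peer);
		return false;
	}

private:
	std::mutex _m;
	std::unordered_map<ZtAddress,MemberId> _affinity;
};

int main()
{
	AffinityTable t;
	MemberId who = 0;
	auto noop = [](MemberId,Msg,ZtAddress) {};
	bool first = t.find(0x1122334455ULL,{1,2},noop,who);  // miss: WANT_PEER broadcast
	t.learn(0x1122334455ULL,2);                           // member 2 answered HAVE_PEER
	bool second = t.find(0x1122334455ULL,{1,2},noop,who); // hit: who == 2
	return ((!first)&&(second)&&(who == 2)) ? 0 : 1;
}

This also appears to be why the per-entry timestamp and the periodic cleanup scan in doPeriodicTasks() could be dropped: entries are now created and refreshed only in direct response to a HAVE_PEER answer rather than aged out against ZT_PEER_ACTIVITY_TIMEOUT.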
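
The ALIVE heartbeat also grows one field: a uint64 active-peer count appended after the load average in doPeriodicTasks() and parsed at the same offset in handleIncomingStateMessage(), where it fills the new m.peers. Below is a toy big-endian encoder/decoder covering just the fragment this diff touches, assuming network byte order as in ZeroTier's Buffer; the field names and values are illustrative.

#include <cstdint>
#include <cstddef>
#include <vector>

// Append a 64-bit value in network (big-endian) byte order.
static void appendU64(std::vector<uint8_t> &b,uint64_t v)
{
	for(int i=7;i>=0;--i) b.push_back((uint8_t)(v >> (i * 8)));
}

// Read a 64-bit big-endian value and advance the cursor.
static uint64_t readU64(const std::vector<uint8_t> &b,size_t &ptr)
{
	uint64_t v = 0;
	for(int i=0;i<8;++i) v = (v << 8) | b[ptr++];
	return v;
}

int main()
{
	// Encoder side (doPeriodicTasks): the peer count is appended between
	// the load average and the reserved flags word.
	std::vector<uint8_t> alive;
	appendU64(alive,123456789ULL); // local clock ("now")
	appendU64(alive,0);            // load average (still a TODO in the diff)
	appendU64(alive,42);           // NEW: active peer count (countActive())
	appendU64(alive,0);            // unused/reserved flags

	// Decoder side (handleIncomingStateMessage): must skip/read the same
	// fields in the same order, which is why both ends change together.
	size_t ptr = 0;
	ptr += 8;                            // skip local clock, not used
	uint64_t load = readU64(alive,ptr);
	uint64_t peers = readU64(alive,ptr); // the new m.peers field
	ptr += 8;                            // skip flags, unused
	return ((load == 0)&&(peers == 42)) ? 0 : 1;
}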
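
Finally, with m.peers carried in the heartbeat, Cluster::status() can copy the count straight out of each _Member instead of rebuilding it from the affinity table (the deleted block at the end, along with the ms[] pointer array). A condensed, hypothetical model of the reworked loop, where Member and MemberStatus stand in for _Member and ZT_ClusterMemberStatus:

#include <cstdint>
#include <mutex>
#include <vector>

struct Member // stand-in for Cluster::_Member
{
	std::mutex lock;
	uint64_t lastReceivedAliveAnnouncement = 0;
	uint64_t load = 0;
	uint64_t peers = 0; // copied from the ALIVE heartbeat, never recomputed
};

struct MemberStatus // stand-in for ZT_ClusterMemberStatus
{
	uint16_t id;
	bool alive;
	uint64_t load;
	uint64_t peers;
};

static std::vector<MemberStatus> collectStatus(std::vector<Member> &members,uint64_t now,uint64_t timeout)
{
	std::vector<MemberStatus> out;
	for(uint16_t id=0;id<members.size();++id) {
		Member &m = members[id];
		std::lock_guard<std::mutex> ml(m.lock); // per-member lock, as in the new loop
		out.push_back(MemberStatus{id,(now - m.lastReceivedAliveAnnouncement) < timeout,m.load,m.peers});
	}
	return out;
}

int main()
{
	std::vector<Member> members(2);
	members[1].lastReceivedAliveAnnouncement = 990;
	members[1].peers = 42; // as reported in member 1's last heartbeat
	std::vector<MemberStatus> s(collectStatus(members,1000,60));
	return ((!s[0].alive)&&(s[1].alive)&&(s[1].peers == 42)) ? 0 : 1;
}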