From eb79d4a2f34b34c49cd2d69efac22d9bc8ac27cb Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Tue, 20 Oct 2015 16:24:21 -0700 Subject: Wire up peer announcement in cluster. --- node/Cluster.cpp | 72 +++++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 58 insertions(+), 14 deletions(-) (limited to 'node/Cluster.cpp') diff --git a/node/Cluster.cpp b/node/Cluster.cpp index 5b76d1f0..f1dc45b9 100644 --- a/node/Cluster.cpp +++ b/node/Cluster.cpp @@ -263,8 +263,6 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len) remotePeerAddress.appendTo(rendezvousForDest); Buffer<2048> rendezvousForOtherEnd; - rendezvousForOtherEnd.addSize(2); // leave room for payload size - rendezvousForOtherEnd.append((uint8_t)STATE_MESSAGE_PROXY_SEND); remotePeerAddress.appendTo(rendezvousForOtherEnd); rendezvousForOtherEnd.append((uint8_t)Packet::VERB_RENDEZVOUS); const unsigned int rendezvousForOtherEndPayloadSizePtr = rendezvousForOtherEnd.size(); @@ -298,9 +296,8 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len) } if (haveMatch) { + _send(fromMemberId,STATE_MESSAGE_PROXY_SEND,rendezvousForOtherEnd.data(),rendezvousForOtherEnd.size()); RR->sw->send(rendezvousForDest,true,0); - rendezvousForOtherEnd.setAt(0,(uint16_t)(rendezvousForOtherEnd.size() - 2)); - _send(fromMemberId,rendezvousForOtherEnd.data(),rendezvousForOtherEnd.size()); } } } @@ -331,14 +328,64 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len) void Cluster::replicateHavePeer(const Identity &peerId) { + { // Use peer affinity table to track our own last announce time for peers + _PeerAffinity pa(peerId.address(),_id,RR->node->now()); + Mutex::Lock _l2(_peerAffinities_m); + std::vector<_PeerAffinity>::iterator i(std::lower_bound(_peerAffinities.begin(),_peerAffinities.end(),pa)); // O(log(n)) + if ((i != _peerAffinities.end())&&(i->key == pa.key)) { + if ((pa.timestamp - i->timestamp) >= ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD) { + i->timestamp = pa.timestamp; + // continue to announcement + } else { + // we've already announced this peer recently, so skip + return; + } + } else { + _peerAffinities.push_back(pa); + std::sort(_peerAffinities.begin(),_peerAffinities.end()); // probably a more efficient way to insert but okay for now + // continue to announcement + } + } + + // announcement + Buffer<4096> buf; + peerId.serialize(buf,false); + { + Mutex::Lock _l(_memberIds_m); + for(std::vector::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { + Mutex::Lock _l2(_members[*mid].lock); + _send(*mid,STATE_MESSAGE_HAVE_PEER,buf.data(),buf.size()); + } + } } void Cluster::replicateMulticastLike(uint64_t nwid,const Address &peerAddress,const MulticastGroup &group) { + Buffer<4096> buf; + buf.append((uint64_t)nwid); + peerAddress.appendTo(buf); + group.mac().appendTo(buf); + buf.append((uint32_t)group.adi()); + { + Mutex::Lock _l(_memberIds_m); + for(std::vector::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { + Mutex::Lock _l2(_members[*mid].lock); + _send(*mid,STATE_MESSAGE_MULTICAST_LIKE,buf.data(),buf.size()); + } + } } void Cluster::replicateCertificateOfNetworkMembership(const CertificateOfMembership &com) { + Buffer<4096> buf; + com.serialize(buf); + { + Mutex::Lock _l(_memberIds_m); + for(std::vector::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { + Mutex::Lock _l2(_members[*mid].lock); + _send(*mid,STATE_MESSAGE_COM,buf.data(),buf.size()); + } + } } void Cluster::doPeriodicTasks() @@ -371,7 +418,7 @@ void Cluster::doPeriodicTasks() alive.append((uint8_t)_zeroTierPhysicalEndpoints.size()); for(std::vector::const_iterator pe(_zeroTierPhysicalEndpoints.begin());pe!=_zeroTierPhysicalEndpoints.end();++pe) pe->serialize(alive); - _send(*mid,alive.data(),alive.size()); + _send(*mid,STATE_MESSAGE_ALIVE,alive.data(),alive.size()); _members[*mid].lastAnnouncedAliveTo = now; } @@ -498,18 +545,15 @@ bool Cluster::redirectPeer(const SharedPtr &peer,const InetAddress &peerPh } } -void Cluster::_send(uint16_t memberId,const void *msg,unsigned int len) +void Cluster::_send(uint16_t memberId,StateMessageType type,const void *msg,unsigned int len) { _Member &m = _members[memberId]; // assumes m.lock is locked! - for(;;) { - if ((m.q.size() + len) > ZT_CLUSTER_MAX_MESSAGE_LENGTH) - _flush(memberId); - else { - m.q.append(msg,len); - break; - } - } + if ((m.q.size() + len + 3) > ZT_CLUSTER_MAX_MESSAGE_LENGTH) + _flush(memberId); + m.q.append((uint16_t)(len + 1)); + m.q.append((uint8_t)type); + m.q.append(msg,len); } void Cluster::_flush(uint16_t memberId) -- cgit v1.2.3