path: root/node/Cluster.cpp
author    Adam Ierymenko <adam.ierymenko@gmail.com>    2015-10-20 16:24:21 -0700
committer Adam Ierymenko <adam.ierymenko@gmail.com>    2015-10-20 16:24:21 -0700
commit    eb79d4a2f34b34c49cd2d69efac22d9bc8ac27cb (patch)
tree      b1670c890b60d5ae02fa49f180bb831826edf1d8 /node/Cluster.cpp
parent    57e29857cf79019af03f6a3dfe0bf6fd36e2fab2 (diff)
download  infinitytier-eb79d4a2f34b34c49cd2d69efac22d9bc8ac27cb.tar.gz
          infinitytier-eb79d4a2f34b34c49cd2d69efac22d9bc8ac27cb.zip
Wire up peer announcement in cluster.
Diffstat (limited to 'node/Cluster.cpp')
-rw-r--r--    node/Cluster.cpp    72
1 file changed, 58 insertions, 14 deletions
diff --git a/node/Cluster.cpp b/node/Cluster.cpp
index 5b76d1f0..f1dc45b9 100644
--- a/node/Cluster.cpp
+++ b/node/Cluster.cpp
@@ -263,8 +263,6 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
remotePeerAddress.appendTo(rendezvousForDest);
Buffer<2048> rendezvousForOtherEnd;
- rendezvousForOtherEnd.addSize(2); // leave room for payload size
- rendezvousForOtherEnd.append((uint8_t)STATE_MESSAGE_PROXY_SEND);
remotePeerAddress.appendTo(rendezvousForOtherEnd);
rendezvousForOtherEnd.append((uint8_t)Packet::VERB_RENDEZVOUS);
const unsigned int rendezvousForOtherEndPayloadSizePtr = rendezvousForOtherEnd.size();
@@ -298,9 +296,8 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
}
if (haveMatch) {
+ _send(fromMemberId,STATE_MESSAGE_PROXY_SEND,rendezvousForOtherEnd.data(),rendezvousForOtherEnd.size());
RR->sw->send(rendezvousForDest,true,0);
- rendezvousForOtherEnd.setAt<uint16_t>(0,(uint16_t)(rendezvousForOtherEnd.size() - 2));
- _send(fromMemberId,rendezvousForOtherEnd.data(),rendezvousForOtherEnd.size());
}
}
}
@@ -331,14 +328,64 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
void Cluster::replicateHavePeer(const Identity &peerId)
{
+ { // Use peer affinity table to track our own last announce time for peers
+ _PeerAffinity pa(peerId.address(),_id,RR->node->now());
+ Mutex::Lock _l2(_peerAffinities_m);
+ std::vector<_PeerAffinity>::iterator i(std::lower_bound(_peerAffinities.begin(),_peerAffinities.end(),pa)); // O(log(n))
+ if ((i != _peerAffinities.end())&&(i->key == pa.key)) {
+ if ((pa.timestamp - i->timestamp) >= ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD) {
+ i->timestamp = pa.timestamp;
+ // continue to announcement
+ } else {
+ // we've already announced this peer recently, so skip
+ return;
+ }
+ } else {
+ _peerAffinities.push_back(pa);
+ std::sort(_peerAffinities.begin(),_peerAffinities.end()); // probably a more efficient way to insert but okay for now
+ // continue to announcement
+ }
+ }
+
+ // announcement
+ Buffer<4096> buf;
+ peerId.serialize(buf,false);
+ {
+ Mutex::Lock _l(_memberIds_m);
+ for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
+ Mutex::Lock _l2(_members[*mid].lock);
+ _send(*mid,STATE_MESSAGE_HAVE_PEER,buf.data(),buf.size());
+ }
+ }
}
void Cluster::replicateMulticastLike(uint64_t nwid,const Address &peerAddress,const MulticastGroup &group)
{
+ Buffer<4096> buf;
+ buf.append((uint64_t)nwid);
+ peerAddress.appendTo(buf);
+ group.mac().appendTo(buf);
+ buf.append((uint32_t)group.adi());
+ {
+ Mutex::Lock _l(_memberIds_m);
+ for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
+ Mutex::Lock _l2(_members[*mid].lock);
+ _send(*mid,STATE_MESSAGE_MULTICAST_LIKE,buf.data(),buf.size());
+ }
+ }
}
void Cluster::replicateCertificateOfNetworkMembership(const CertificateOfMembership &com)
{
+ Buffer<4096> buf;
+ com.serialize(buf);
+ {
+ Mutex::Lock _l(_memberIds_m);
+ for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
+ Mutex::Lock _l2(_members[*mid].lock);
+ _send(*mid,STATE_MESSAGE_COM,buf.data(),buf.size());
+ }
+ }
}
void Cluster::doPeriodicTasks()
@@ -371,7 +418,7 @@ void Cluster::doPeriodicTasks()
alive.append((uint8_t)_zeroTierPhysicalEndpoints.size());
for(std::vector<InetAddress>::const_iterator pe(_zeroTierPhysicalEndpoints.begin());pe!=_zeroTierPhysicalEndpoints.end();++pe)
pe->serialize(alive);
- _send(*mid,alive.data(),alive.size());
+ _send(*mid,STATE_MESSAGE_ALIVE,alive.data(),alive.size());
_members[*mid].lastAnnouncedAliveTo = now;
}
@@ -498,18 +545,15 @@ bool Cluster::redirectPeer(const SharedPtr<Peer> &peer,const InetAddress &peerPh
}
}
-void Cluster::_send(uint16_t memberId,const void *msg,unsigned int len)
+void Cluster::_send(uint16_t memberId,StateMessageType type,const void *msg,unsigned int len)
{
_Member &m = _members[memberId];
// assumes m.lock is locked!
- for(;;) {
- if ((m.q.size() + len) > ZT_CLUSTER_MAX_MESSAGE_LENGTH)
- _flush(memberId);
- else {
- m.q.append(msg,len);
- break;
- }
- }
+ if ((m.q.size() + len + 3) > ZT_CLUSTER_MAX_MESSAGE_LENGTH)
+ _flush(memberId);
+ m.q.append((uint16_t)(len + 1));
+ m.q.append((uint8_t)type);
+ m.q.append(msg,len);
}
void Cluster::_flush(uint16_t memberId)
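
The reworked _send frames every queued state message as a 16-bit length field (which counts the type byte), a 1-byte message type, and the payload. The following is a minimal standalone sketch of that framing, assuming network byte order for the length field as with Buffer's integer append; the std::vector queue, the MAX_MESSAGE_LENGTH constant, and the enum value shown are illustrative stand-ins, not the actual Buffer/_Member implementation.

#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical stand-ins for the real cluster constants/types.
static const unsigned int MAX_MESSAGE_LENGTH = 65536;           // stands in for ZT_CLUSTER_MAX_MESSAGE_LENGTH
enum StateMessageType : uint8_t { STATE_MESSAGE_HAVE_PEER = 3 }; // value is illustrative only

// Append one framed state message to an outgoing queue:
//   [uint16 length = payload length + 1][uint8 type][payload bytes]
// The 16-bit length is written big-endian (network byte order).
static void enqueueStateMessage(std::vector<uint8_t> &q, StateMessageType type, const void *msg, unsigned int len)
{
    // 3 bytes of framing overhead: 2-byte length field plus 1-byte type.
    if ((q.size() + len + 3) > MAX_MESSAGE_LENGTH) {
        // The real code calls _flush(memberId) here; this sketch just drops the queue.
        q.clear();
    }
    const uint16_t framedLen = (uint16_t)(len + 1); // length covers the type byte and the payload
    q.push_back((uint8_t)(framedLen >> 8));
    q.push_back((uint8_t)(framedLen & 0xff));
    q.push_back((uint8_t)type);
    const uint8_t *p = reinterpret_cast<const uint8_t *>(msg);
    q.insert(q.end(), p, p + len);
}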