summaryrefslogtreecommitdiff
path: root/node
diff options
context:
space:
mode:
Diffstat (limited to 'node')
-rw-r--r--node/Cluster.cpp72
-rw-r--r--node/Cluster.hpp10
-rw-r--r--node/IncomingPacket.cpp1
-rw-r--r--node/Peer.cpp7
-rw-r--r--node/Topology.cpp27
5 files changed, 87 insertions, 30 deletions
diff --git a/node/Cluster.cpp b/node/Cluster.cpp
index 5b76d1f0..f1dc45b9 100644
--- a/node/Cluster.cpp
+++ b/node/Cluster.cpp
@@ -263,8 +263,6 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
remotePeerAddress.appendTo(rendezvousForDest);
Buffer<2048> rendezvousForOtherEnd;
- rendezvousForOtherEnd.addSize(2); // leave room for payload size
- rendezvousForOtherEnd.append((uint8_t)STATE_MESSAGE_PROXY_SEND);
remotePeerAddress.appendTo(rendezvousForOtherEnd);
rendezvousForOtherEnd.append((uint8_t)Packet::VERB_RENDEZVOUS);
const unsigned int rendezvousForOtherEndPayloadSizePtr = rendezvousForOtherEnd.size();
@@ -298,9 +296,8 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
}
if (haveMatch) {
+ _send(fromMemberId,STATE_MESSAGE_PROXY_SEND,rendezvousForOtherEnd.data(),rendezvousForOtherEnd.size());
RR->sw->send(rendezvousForDest,true,0);
- rendezvousForOtherEnd.setAt<uint16_t>(0,(uint16_t)(rendezvousForOtherEnd.size() - 2));
- _send(fromMemberId,rendezvousForOtherEnd.data(),rendezvousForOtherEnd.size());
}
}
}
@@ -331,14 +328,64 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
void Cluster::replicateHavePeer(const Identity &peerId)
{
+ { // Use peer affinity table to track our own last announce time for peers
+ _PeerAffinity pa(peerId.address(),_id,RR->node->now());
+ Mutex::Lock _l2(_peerAffinities_m);
+ std::vector<_PeerAffinity>::iterator i(std::lower_bound(_peerAffinities.begin(),_peerAffinities.end(),pa)); // O(log(n))
+ if ((i != _peerAffinities.end())&&(i->key == pa.key)) {
+ if ((pa.timestamp - i->timestamp) >= ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD) {
+ i->timestamp = pa.timestamp;
+ // continue to announcement
+ } else {
+ // we've already announced this peer recently, so skip
+ return;
+ }
+ } else {
+ _peerAffinities.push_back(pa);
+ std::sort(_peerAffinities.begin(),_peerAffinities.end()); // probably a more efficient way to insert but okay for now
+ // continue to announcement
+ }
+ }
+
+ // announcement
+ Buffer<4096> buf;
+ peerId.serialize(buf,false);
+ {
+ Mutex::Lock _l(_memberIds_m);
+ for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
+ Mutex::Lock _l2(_members[*mid].lock);
+ _send(*mid,STATE_MESSAGE_HAVE_PEER,buf.data(),buf.size());
+ }
+ }
}
void Cluster::replicateMulticastLike(uint64_t nwid,const Address &peerAddress,const MulticastGroup &group)
{
+ Buffer<4096> buf;
+ buf.append((uint64_t)nwid);
+ peerAddress.appendTo(buf);
+ group.mac().appendTo(buf);
+ buf.append((uint32_t)group.adi());
+ {
+ Mutex::Lock _l(_memberIds_m);
+ for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
+ Mutex::Lock _l2(_members[*mid].lock);
+ _send(*mid,STATE_MESSAGE_MULTICAST_LIKE,buf.data(),buf.size());
+ }
+ }
}
void Cluster::replicateCertificateOfNetworkMembership(const CertificateOfMembership &com)
{
+ Buffer<4096> buf;
+ com.serialize(buf);
+ {
+ Mutex::Lock _l(_memberIds_m);
+ for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
+ Mutex::Lock _l2(_members[*mid].lock);
+ _send(*mid,STATE_MESSAGE_COM,buf.data(),buf.size());
+ }
+ }
}
void Cluster::doPeriodicTasks()
@@ -371,7 +418,7 @@ void Cluster::doPeriodicTasks()
alive.append((uint8_t)_zeroTierPhysicalEndpoints.size());
for(std::vector<InetAddress>::const_iterator pe(_zeroTierPhysicalEndpoints.begin());pe!=_zeroTierPhysicalEndpoints.end();++pe)
pe->serialize(alive);
- _send(*mid,alive.data(),alive.size());
+ _send(*mid,STATE_MESSAGE_ALIVE,alive.data(),alive.size());
_members[*mid].lastAnnouncedAliveTo = now;
}
@@ -498,18 +545,15 @@ bool Cluster::redirectPeer(const SharedPtr<Peer> &peer,const InetAddress &peerPh
}
}
-void Cluster::_send(uint16_t memberId,const void *msg,unsigned int len)
+void Cluster::_send(uint16_t memberId,StateMessageType type,const void *msg,unsigned int len)
{
_Member &m = _members[memberId];
// assumes m.lock is locked!
- for(;;) {
- if ((m.q.size() + len) > ZT_CLUSTER_MAX_MESSAGE_LENGTH)
- _flush(memberId);
- else {
- m.q.append(msg,len);
- break;
- }
- }
+ if ((m.q.size() + len + 3) > ZT_CLUSTER_MAX_MESSAGE_LENGTH)
+ _flush(memberId);
+ m.q.append((uint16_t)(len + 1));
+ m.q.append((uint8_t)type);
+ m.q.append(msg,len);
}
void Cluster::_flush(uint16_t memberId)
diff --git a/node/Cluster.hpp b/node/Cluster.hpp
index f253e6f6..df061c60 100644
--- a/node/Cluster.hpp
+++ b/node/Cluster.hpp
@@ -42,6 +42,7 @@
#include "Buffer.hpp"
#include "Mutex.hpp"
#include "SharedPtr.hpp"
+#include "Hashtable.hpp"
/**
* Timeout for cluster members being considered "alive"
@@ -49,6 +50,11 @@
#define ZT_CLUSTER_TIMEOUT 30000
/**
+ * How often should we announce that we have a peer?
+ */
+#define ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD 60000
+
+/**
* Desired period between doPeriodicTasks() in milliseconds
*/
#define ZT_CLUSTER_PERIODIC_TASK_PERIOD 50
@@ -238,7 +244,7 @@ public:
bool redirectPeer(const SharedPtr<Peer> &peer,const InetAddress &peerPhysicalAddress,bool offload);
private:
- void _send(uint16_t memberId,const void *msg,unsigned int len);
+ void _send(uint16_t memberId,StateMessageType type,const void *msg,unsigned int len);
void _flush(uint16_t memberId);
// These are initialized in the constructor and remain static
@@ -292,7 +298,7 @@ private:
std::vector<uint16_t> _memberIds;
Mutex _memberIds_m;
- // Record tracking which members have which peers and how recently they claimed this
+ // Record tracking which members have which peers and how recently they claimed this -- also used to track our last claimed time
struct _PeerAffinity
{
_PeerAffinity(const Address &a,uint16_t mid,uint64_t ts) :
diff --git a/node/IncomingPacket.cpp b/node/IncomingPacket.cpp
index a4d45068..7a47c0c6 100644
--- a/node/IncomingPacket.cpp
+++ b/node/IncomingPacket.cpp
@@ -272,7 +272,6 @@ bool IncomingPacket::_doHELLO(const RuntimeEnvironment *RR)
TRACE("rejected HELLO from %s(%s): packet failed authentication",id.address().toString().c_str(),_remoteAddress.toString().c_str());
return true;
}
-
peer = RR->topology->addPeer(newPeer);
// Continue at // VALID
diff --git a/node/Peer.cpp b/node/Peer.cpp
index 0ba379c6..45e2fedd 100644
--- a/node/Peer.cpp
+++ b/node/Peer.cpp
@@ -34,6 +34,7 @@
#include "Network.hpp"
#include "AntiRecursion.hpp"
#include "SelfAwareness.hpp"
+#include "Cluster.hpp"
#include <algorithm>
@@ -107,7 +108,6 @@ void Peer::received(
// Learn paths if they've been confirmed via a HELLO or an ECHO
RemotePath *slot = (RemotePath *)0;
if (np < ZT_MAX_PEER_NETWORK_PATHS) {
- // Add new path
slot = &(_paths[np++]);
} else {
uint64_t slotLRmin = 0xffffffffffffffffULL;
@@ -141,6 +141,11 @@ void Peer::received(
}
}
}
+
+#ifdef ZT_ENABLE_CLUSTER
+ if ((pathIsConfirmed)&&(RR->cluster))
+ RR->cluster->replicateHavePeer(_id);
+#endif
}
if ((now - _lastAnnouncedTo) >= ((ZT_MULTICAST_LIKE_EXPIRE / 2) - 1000)) {
diff --git a/node/Topology.cpp b/node/Topology.cpp
index 56ca47c8..88c8856c 100644
--- a/node/Topology.cpp
+++ b/node/Topology.cpp
@@ -122,18 +122,22 @@ Topology::~Topology()
SharedPtr<Peer> Topology::addPeer(const SharedPtr<Peer> &peer)
{
if (peer->address() == RR->identity.address()) {
- TRACE("BUG: addNewPeer() caught and ignored attempt to add peer for self");
+ TRACE("BUG: addPeer() caught and ignored attempt to add peer for self");
throw std::logic_error("cannot add peer for self");
}
- const uint64_t now = RR->node->now();
- Mutex::Lock _l(_lock);
-
- SharedPtr<Peer> &p = _peers.set(peer->address(),peer);
- p->use(now);
- saveIdentity(p->identity());
+ SharedPtr<Peer> np;
+ {
+ Mutex::Lock _l(_lock);
+ SharedPtr<Peer> &hp = _peers[peer->address()];
+ if (!hp)
+ hp = peer;
+ np = hp;
+ }
+ np->use(RR->node->now());
+ saveIdentity(np->identity());
- return p;
+ return np;
}
SharedPtr<Peer> Topology::getPeer(const Address &zta)
@@ -143,13 +147,12 @@ SharedPtr<Peer> Topology::getPeer(const Address &zta)
return SharedPtr<Peer>();
}
- const uint64_t now = RR->node->now();
Mutex::Lock _l(_lock);
SharedPtr<Peer> &ap = _peers[zta];
if (ap) {
- ap->use(now);
+ ap->use(RR->node->now());
return ap;
}
@@ -157,13 +160,13 @@ SharedPtr<Peer> Topology::getPeer(const Address &zta)
if (id) {
try {
ap = SharedPtr<Peer>(new Peer(RR->identity,id));
- ap->use(now);
+ ap->use(RR->node->now());
return ap;
} catch ( ... ) {} // invalid identity?
}
+ // If we get here it means we read an invalid cache identity or had some other error
_peers.erase(zta);
-
return SharedPtr<Peer>();
}