summaryrefslogtreecommitdiff
path: root/node/Cluster.cpp
diff options
context:
space:
mode:
authorAdam Ierymenko <adam.ierymenko@gmail.com>2015-10-27 15:57:26 -0700
committerAdam Ierymenko <adam.ierymenko@gmail.com>2015-10-27 15:57:26 -0700
commitcc6080fe3898ddd1419050ee3a2c45cc87dd140b (patch)
treedc2b2169ad18057270e106b5017b1e470cf62080 /node/Cluster.cpp
parent6399f6f0940b6f20819d021a0dc3dcf0d289f002 (diff)
downloadinfinitytier-cc6080fe3898ddd1419050ee3a2c45cc87dd140b.tar.gz
infinitytier-cc6080fe3898ddd1419050ee3a2c45cc87dd140b.zip
(1) No need to confirm if we are a root (small optimization), (2) Refactor peer affinity tracking.
Diffstat (limited to 'node/Cluster.cpp')
-rw-r--r--node/Cluster.cpp130
1 files changed, 55 insertions, 75 deletions
diff --git a/node/Cluster.cpp b/node/Cluster.cpp
index 07ca0ba1..b2f3d585 100644
--- a/node/Cluster.cpp
+++ b/node/Cluster.cpp
@@ -82,7 +82,8 @@ Cluster::Cluster(
_z(z),
_id(id),
_zeroTierPhysicalEndpoints(zeroTierPhysicalEndpoints),
- _members(new _Member[ZT_CLUSTER_MAX_MEMBERS])
+ _members(new _Member[ZT_CLUSTER_MAX_MEMBERS]),
+ _peerAffinities(65536)
{
uint16_t stmp[ZT_SHA512_DIGEST_LEN / sizeof(uint16_t)];
@@ -214,19 +215,12 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
ptr += id.deserialize(dmsg,ptr);
if (id) {
RR->topology->saveIdentity(id);
-
- { // Add or update peer affinity entry
- _PeerAffinity pa(id.address(),fromMemberId,RR->node->now());
+ {
Mutex::Lock _l2(_peerAffinities_m);
- std::vector<_PeerAffinity>::iterator i(std::lower_bound(_peerAffinities.begin(),_peerAffinities.end(),pa)); // O(log(n))
- if ((i != _peerAffinities.end())&&(i->key == pa.key)) {
- i->timestamp = pa.timestamp;
- } else {
- _peerAffinities.push_back(pa);
- std::sort(_peerAffinities.begin(),_peerAffinities.end()); // probably a more efficient way to insert but okay for now
- }
+ _PA &pa = _peerAffinities[id.address()];
+ pa.ts = RR->node->now();
+ pa.mid = fromMemberId;
}
-
TRACE("[%u] has %s",(unsigned int)fromMemberId,id.address().toString().c_str());
}
} catch ( ... ) {
@@ -355,83 +349,66 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
if (len > 16384) // sanity check
return false;
- uint64_t mostRecentTimestamp = 0;
+ const uint64_t now = RR->node->now();
unsigned int canHasPeer = 0;
{ // Anyone got this peer?
Mutex::Lock _l2(_peerAffinities_m);
- std::vector<_PeerAffinity>::iterator i(std::lower_bound(_peerAffinities.begin(),_peerAffinities.end(),_PeerAffinity(toPeerAddress,0,0))); // O(log(n))
- while ((i != _peerAffinities.end())&&(i->address() == toPeerAddress)) {
- const uint16_t mid = i->clusterMemberId();
- if ((mid != _id)&&(i->timestamp > mostRecentTimestamp)) {
- mostRecentTimestamp = i->timestamp;
- canHasPeer = mid;
- }
- ++i;
- }
+ _PA *pa = _peerAffinities.get(toPeerAddress);
+ if ((pa)&&(pa->mid != _id)&&((now - pa->ts) < ZT_PEER_ACTIVITY_TIMEOUT))
+ canHasPeer = pa->mid;
+ else return false;
}
- const uint64_t now = RR->node->now();
- if ((now - mostRecentTimestamp) < ZT_PEER_ACTIVITY_TIMEOUT) {
- Buffer<2048> buf;
-
- if (unite) {
- InetAddress v4,v6;
- if (fromPeerAddress) {
- SharedPtr<Peer> fromPeer(RR->topology->getPeer(fromPeerAddress));
- if (fromPeer)
- fromPeer->getBestActiveAddresses(now,v4,v6);
- }
- uint8_t addrCount = 0;
+ Buffer<2048> buf;
+ if (unite) {
+ InetAddress v4,v6;
+ if (fromPeerAddress) {
+ SharedPtr<Peer> fromPeer(RR->topology->getPeer(fromPeerAddress));
+ if (fromPeer)
+ fromPeer->getBestActiveAddresses(now,v4,v6);
+ }
+ uint8_t addrCount = 0;
+ if (v4)
+ ++addrCount;
+ if (v6)
+ ++addrCount;
+ if (addrCount) {
+ toPeerAddress.appendTo(buf);
+ fromPeerAddress.appendTo(buf);
+ buf.append(addrCount);
if (v4)
- ++addrCount;
+ v4.serialize(buf);
if (v6)
- ++addrCount;
- if (addrCount) {
- toPeerAddress.appendTo(buf);
- fromPeerAddress.appendTo(buf);
- buf.append(addrCount);
- if (v4)
- v4.serialize(buf);
- if (v6)
- v6.serialize(buf);
- }
+ v6.serialize(buf);
}
+ }
+ {
+ Mutex::Lock _l2(_members[canHasPeer].lock);
+ if (buf.size() > 0)
+ _send(canHasPeer,STATE_MESSAGE_PROXY_UNITE,buf.data(),buf.size());
+ if (_members[canHasPeer].zeroTierPhysicalEndpoints.size() > 0)
+ RR->node->putPacket(InetAddress(),_members[canHasPeer].zeroTierPhysicalEndpoints.front(),data,len);
+ }
- {
- Mutex::Lock _l2(_members[canHasPeer].lock);
- if (buf.size() > 0)
- _send(canHasPeer,STATE_MESSAGE_PROXY_UNITE,buf.data(),buf.size());
- if (_members[canHasPeer].zeroTierPhysicalEndpoints.size() > 0)
- RR->node->putPacket(InetAddress(),_members[canHasPeer].zeroTierPhysicalEndpoints.front(),data,len);
- }
+ TRACE("sendViaCluster(): relaying %u bytes from %s to %s by way of %u",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)canHasPeer);
- TRACE("sendViaCluster(): relaying %u bytes from %s to %s by way of %u",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)canHasPeer);
- return true;
- } else {
- TRACE("sendViaCluster(): unable to relay %u bytes from %s to %s since no cluster members seem to have it!",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str());
- return false;
- }
+ return true;
}
void Cluster::replicateHavePeer(const Identity &peerId)
{
+ const uint64_t now = RR->node->now();
{ // Use peer affinity table to track our own last announce time for peers
- _PeerAffinity pa(peerId.address(),_id,RR->node->now());
Mutex::Lock _l2(_peerAffinities_m);
- std::vector<_PeerAffinity>::iterator i(std::lower_bound(_peerAffinities.begin(),_peerAffinities.end(),pa)); // O(log(n))
- if ((i != _peerAffinities.end())&&(i->key == pa.key)) {
- if ((pa.timestamp - i->timestamp) >= ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD) {
- i->timestamp = pa.timestamp;
- // continue to announcement
- } else {
- // we've already announced this peer recently, so skip
- return;
- }
+ _PA &pa = _peerAffinities[peerId.address()];
+ if (pa.mid != _id) {
+ pa.ts = now;
+ pa.mid = _id;
+ } else if ((now - pa.ts) >= ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD) {
+ return;
} else {
- _peerAffinities.push_back(pa);
- std::sort(_peerAffinities.begin(),_peerAffinities.end()); // probably a more efficient way to insert but okay for now
- // continue to announcement
+ pa.ts = now;
}
}
@@ -598,6 +575,7 @@ bool Cluster::findBetterEndpoint(InetAddress &redirectTo,const Address &peerAddr
}
}
+ // Suggestion redirection if a closer member was found
for(std::vector<InetAddress>::const_iterator a(best.begin());a!=best.end();++a) {
if (a->ss_family == peerPhysicalAddress.ss_family) {
TRACE("%s at [%d,%d,%d] is %f from us but %f from %u, can redirect to %s",peerAddress.toString().c_str(),px,py,pz,currentDistance,bestDistance,bestMember,a->toString().c_str());
@@ -661,10 +639,12 @@ void Cluster::status(ZT_ClusterStatus &status) const
{
Mutex::Lock _l2(_peerAffinities_m);
- for(std::vector<_PeerAffinity>::const_iterator pi(_peerAffinities.begin());pi!=_peerAffinities.end();++pi) {
- unsigned int mid = pi->clusterMemberId();
- if ((ms[mid])&&(mid != _id)&&((now - pi->timestamp) < ZT_PEER_ACTIVITY_TIMEOUT))
- ++ms[mid]->peers;
+ Address *k = (Address *)0;
+ _PA *v = (_PA *)0;
+ Hashtable< Address,_PA >::Iterator i(const_cast<Cluster *>(this)->_peerAffinities);
+ while (i.next(k,v)) {
+ if ( (ms[v->mid]) && (v->mid != _id) && ((now - v->ts) < ZT_PEER_ACTIVITY_TIMEOUT) )
+ ++ms[v->mid]->peers;
}
}
}