summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdam Ierymenko <adam.ierymenko@gmail.com>2015-10-27 15:57:26 -0700
committerAdam Ierymenko <adam.ierymenko@gmail.com>2015-10-27 15:57:26 -0700
commitcc6080fe3898ddd1419050ee3a2c45cc87dd140b (patch)
treedc2b2169ad18057270e106b5017b1e470cf62080
parent6399f6f0940b6f20819d021a0dc3dcf0d289f002 (diff)
downloadinfinitytier-cc6080fe3898ddd1419050ee3a2c45cc87dd140b.tar.gz
infinitytier-cc6080fe3898ddd1419050ee3a2c45cc87dd140b.zip
(1) No need to confirm if we are a root (small optimization), (2) Refactor peer affinity tracking.
-rw-r--r--node/Cluster.cpp130
-rw-r--r--node/Cluster.hpp29
-rw-r--r--node/Peer.cpp70
3 files changed, 98 insertions, 131 deletions
diff --git a/node/Cluster.cpp b/node/Cluster.cpp
index 07ca0ba1..b2f3d585 100644
--- a/node/Cluster.cpp
+++ b/node/Cluster.cpp
@@ -82,7 +82,8 @@ Cluster::Cluster(
_z(z),
_id(id),
_zeroTierPhysicalEndpoints(zeroTierPhysicalEndpoints),
- _members(new _Member[ZT_CLUSTER_MAX_MEMBERS])
+ _members(new _Member[ZT_CLUSTER_MAX_MEMBERS]),
+ _peerAffinities(65536)
{
uint16_t stmp[ZT_SHA512_DIGEST_LEN / sizeof(uint16_t)];
@@ -214,19 +215,12 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
ptr += id.deserialize(dmsg,ptr);
if (id) {
RR->topology->saveIdentity(id);
-
- { // Add or update peer affinity entry
- _PeerAffinity pa(id.address(),fromMemberId,RR->node->now());
+ {
Mutex::Lock _l2(_peerAffinities_m);
- std::vector<_PeerAffinity>::iterator i(std::lower_bound(_peerAffinities.begin(),_peerAffinities.end(),pa)); // O(log(n))
- if ((i != _peerAffinities.end())&&(i->key == pa.key)) {
- i->timestamp = pa.timestamp;
- } else {
- _peerAffinities.push_back(pa);
- std::sort(_peerAffinities.begin(),_peerAffinities.end()); // probably a more efficient way to insert but okay for now
- }
+ _PA &pa = _peerAffinities[id.address()];
+ pa.ts = RR->node->now();
+ pa.mid = fromMemberId;
}
-
TRACE("[%u] has %s",(unsigned int)fromMemberId,id.address().toString().c_str());
}
} catch ( ... ) {
@@ -355,83 +349,66 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
if (len > 16384) // sanity check
return false;
- uint64_t mostRecentTimestamp = 0;
+ const uint64_t now = RR->node->now();
unsigned int canHasPeer = 0;
{ // Anyone got this peer?
Mutex::Lock _l2(_peerAffinities_m);
- std::vector<_PeerAffinity>::iterator i(std::lower_bound(_peerAffinities.begin(),_peerAffinities.end(),_PeerAffinity(toPeerAddress,0,0))); // O(log(n))
- while ((i != _peerAffinities.end())&&(i->address() == toPeerAddress)) {
- const uint16_t mid = i->clusterMemberId();
- if ((mid != _id)&&(i->timestamp > mostRecentTimestamp)) {
- mostRecentTimestamp = i->timestamp;
- canHasPeer = mid;
- }
- ++i;
- }
+ _PA *pa = _peerAffinities.get(toPeerAddress);
+ if ((pa)&&(pa->mid != _id)&&((now - pa->ts) < ZT_PEER_ACTIVITY_TIMEOUT))
+ canHasPeer = pa->mid;
+ else return false;
}
- const uint64_t now = RR->node->now();
- if ((now - mostRecentTimestamp) < ZT_PEER_ACTIVITY_TIMEOUT) {
- Buffer<2048> buf;
-
- if (unite) {
- InetAddress v4,v6;
- if (fromPeerAddress) {
- SharedPtr<Peer> fromPeer(RR->topology->getPeer(fromPeerAddress));
- if (fromPeer)
- fromPeer->getBestActiveAddresses(now,v4,v6);
- }
- uint8_t addrCount = 0;
+ Buffer<2048> buf;
+ if (unite) {
+ InetAddress v4,v6;
+ if (fromPeerAddress) {
+ SharedPtr<Peer> fromPeer(RR->topology->getPeer(fromPeerAddress));
+ if (fromPeer)
+ fromPeer->getBestActiveAddresses(now,v4,v6);
+ }
+ uint8_t addrCount = 0;
+ if (v4)
+ ++addrCount;
+ if (v6)
+ ++addrCount;
+ if (addrCount) {
+ toPeerAddress.appendTo(buf);
+ fromPeerAddress.appendTo(buf);
+ buf.append(addrCount);
if (v4)
- ++addrCount;
+ v4.serialize(buf);
if (v6)
- ++addrCount;
- if (addrCount) {
- toPeerAddress.appendTo(buf);
- fromPeerAddress.appendTo(buf);
- buf.append(addrCount);
- if (v4)
- v4.serialize(buf);
- if (v6)
- v6.serialize(buf);
- }
+ v6.serialize(buf);
}
+ }
+ {
+ Mutex::Lock _l2(_members[canHasPeer].lock);
+ if (buf.size() > 0)
+ _send(canHasPeer,STATE_MESSAGE_PROXY_UNITE,buf.data(),buf.size());
+ if (_members[canHasPeer].zeroTierPhysicalEndpoints.size() > 0)
+ RR->node->putPacket(InetAddress(),_members[canHasPeer].zeroTierPhysicalEndpoints.front(),data,len);
+ }
- {
- Mutex::Lock _l2(_members[canHasPeer].lock);
- if (buf.size() > 0)
- _send(canHasPeer,STATE_MESSAGE_PROXY_UNITE,buf.data(),buf.size());
- if (_members[canHasPeer].zeroTierPhysicalEndpoints.size() > 0)
- RR->node->putPacket(InetAddress(),_members[canHasPeer].zeroTierPhysicalEndpoints.front(),data,len);
- }
+ TRACE("sendViaCluster(): relaying %u bytes from %s to %s by way of %u",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)canHasPeer);
- TRACE("sendViaCluster(): relaying %u bytes from %s to %s by way of %u",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)canHasPeer);
- return true;
- } else {
- TRACE("sendViaCluster(): unable to relay %u bytes from %s to %s since no cluster members seem to have it!",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str());
- return false;
- }
+ return true;
}
void Cluster::replicateHavePeer(const Identity &peerId)
{
+ const uint64_t now = RR->node->now();
{ // Use peer affinity table to track our own last announce time for peers
- _PeerAffinity pa(peerId.address(),_id,RR->node->now());
Mutex::Lock _l2(_peerAffinities_m);
- std::vector<_PeerAffinity>::iterator i(std::lower_bound(_peerAffinities.begin(),_peerAffinities.end(),pa)); // O(log(n))
- if ((i != _peerAffinities.end())&&(i->key == pa.key)) {
- if ((pa.timestamp - i->timestamp) >= ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD) {
- i->timestamp = pa.timestamp;
- // continue to announcement
- } else {
- // we've already announced this peer recently, so skip
- return;
- }
+ _PA &pa = _peerAffinities[peerId.address()];
+ if (pa.mid != _id) {
+ pa.ts = now;
+ pa.mid = _id;
+ } else if ((now - pa.ts) >= ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD) {
+ return;
} else {
- _peerAffinities.push_back(pa);
- std::sort(_peerAffinities.begin(),_peerAffinities.end()); // probably a more efficient way to insert but okay for now
- // continue to announcement
+ pa.ts = now;
}
}
@@ -598,6 +575,7 @@ bool Cluster::findBetterEndpoint(InetAddress &redirectTo,const Address &peerAddr
}
}
+ // Suggestion redirection if a closer member was found
for(std::vector<InetAddress>::const_iterator a(best.begin());a!=best.end();++a) {
if (a->ss_family == peerPhysicalAddress.ss_family) {
TRACE("%s at [%d,%d,%d] is %f from us but %f from %u, can redirect to %s",peerAddress.toString().c_str(),px,py,pz,currentDistance,bestDistance,bestMember,a->toString().c_str());
@@ -661,10 +639,12 @@ void Cluster::status(ZT_ClusterStatus &status) const
{
Mutex::Lock _l2(_peerAffinities_m);
- for(std::vector<_PeerAffinity>::const_iterator pi(_peerAffinities.begin());pi!=_peerAffinities.end();++pi) {
- unsigned int mid = pi->clusterMemberId();
- if ((ms[mid])&&(mid != _id)&&((now - pi->timestamp) < ZT_PEER_ACTIVITY_TIMEOUT))
- ++ms[mid]->peers;
+ Address *k = (Address *)0;
+ _PA *v = (_PA *)0;
+ Hashtable< Address,_PA >::Iterator i(const_cast<Cluster *>(this)->_peerAffinities);
+ while (i.next(k,v)) {
+ if ( (ms[v->mid]) && (v->mid != _id) && ((now - v->ts) < ZT_PEER_ACTIVITY_TIMEOUT) )
+ ++ms[v->mid]->peers;
}
}
}
diff --git a/node/Cluster.hpp b/node/Cluster.hpp
index bab56785..42a26c7f 100644
--- a/node/Cluster.hpp
+++ b/node/Cluster.hpp
@@ -52,7 +52,7 @@
/**
* How often should we announce that we have a peer?
*/
-#define ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD 60000
+#define ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD 30000
/**
* Desired period between doPeriodicTasks() in milliseconds
@@ -285,7 +285,7 @@ private:
void _send(uint16_t memberId,StateMessageType type,const void *msg,unsigned int len);
void _flush(uint16_t memberId);
- // These are initialized in the constructor and remain static
+ // These are initialized in the constructor and remain immutable
uint16_t _masterSecret[ZT_SHA512_DIGEST_LEN / sizeof(uint16_t)];
unsigned char _key[ZT_PEER_SECRET_KEY_LENGTH];
const RuntimeEnvironment *RR;
@@ -298,6 +298,7 @@ private:
const int32_t _z;
const uint16_t _id;
const std::vector<InetAddress> _zeroTierPhysicalEndpoints;
+ // end immutable fields
struct _Member
{
@@ -330,30 +331,18 @@ private:
_Member() { this->clear(); }
~_Member() { Utils::burn(key,sizeof(key)); }
};
-
- _Member *const _members; // cluster IDs can be from 0 to 65535 (16-bit)
+ _Member *const _members;
std::vector<uint16_t> _memberIds;
Mutex _memberIds_m;
- // Record tracking which members have which peers and how recently they claimed this -- also used to track our last claimed time
- struct _PeerAffinity
+ struct _PA
{
- _PeerAffinity(const Address &a,uint16_t mid,uint64_t ts) :
- key((a.toInt() << 16) | (uint64_t)mid),
- timestamp(ts) {}
-
- uint64_t key;
- uint64_t timestamp;
-
- inline Address address() const throw() { return Address(key >> 16); }
- inline uint16_t clusterMemberId() const throw() { return (uint16_t)(key & 0xffff); }
-
- inline bool operator<(const _PeerAffinity &pi) const throw() { return (key < pi.key); }
+ _PA() : ts(0),mid(0xffff) {}
+ uint64_t ts;
+ uint16_t mid;
};
-
- // A memory-efficient packed map of _PeerAffinity records searchable with std::binary_search() and std::lower_bound()
- std::vector<_PeerAffinity> _peerAffinities;
+ Hashtable< Address,_PA > _peerAffinities;
Mutex _peerAffinities_m;
};
diff --git a/node/Peer.cpp b/node/Peer.cpp
index e56c1eca..99e2156e 100644
--- a/node/Peer.cpp
+++ b/node/Peer.cpp
@@ -86,43 +86,41 @@ void Peer::received(
// Note: findBetterEndpoint() is first since we still want to check
// for a better endpoint even if we don't actually send a redirect.
if ( (RR->cluster->findBetterEndpoint(redirectTo,_id.address(),remoteAddr,false)) && (verb != Packet::VERB_OK)&&(verb != Packet::VERB_ERROR)&&(verb != Packet::VERB_RENDEZVOUS)&&(verb != Packet::VERB_PUSH_DIRECT_PATHS) ) {
- if ((redirectTo.ss_family == AF_INET)||(redirectTo.ss_family == AF_INET6)) {
- if (_vProto >= 5) {
- // For newer peers we can send a more idiomatic verb: PUSH_DIRECT_PATHS.
- Packet outp(_id.address(),RR->identity.address(),Packet::VERB_PUSH_DIRECT_PATHS);
- outp.append((uint16_t)1); // count == 1
- outp.append((uint8_t)0); // no flags
- outp.append((uint16_t)0); // no extensions
- if (redirectTo.ss_family == AF_INET) {
- outp.append((uint8_t)4);
- outp.append((uint8_t)6);
- outp.append(redirectTo.rawIpData(),4);
- } else {
- outp.append((uint8_t)6);
- outp.append((uint8_t)18);
- outp.append(redirectTo.rawIpData(),16);
- }
- outp.append((uint16_t)redirectTo.port());
- outp.armor(_key,true);
- RR->antiRec->logOutgoingZT(outp.data(),outp.size());
- RR->node->putPacket(localAddr,remoteAddr,outp.data(),outp.size());
+ if (_vProto >= 5) {
+ // For newer peers we can send a more idiomatic verb: PUSH_DIRECT_PATHS.
+ Packet outp(_id.address(),RR->identity.address(),Packet::VERB_PUSH_DIRECT_PATHS);
+ outp.append((uint16_t)1); // count == 1
+ outp.append((uint8_t)0); // no flags
+ outp.append((uint16_t)0); // no extensions
+ if (redirectTo.ss_family == AF_INET) {
+ outp.append((uint8_t)4);
+ outp.append((uint8_t)6);
+ outp.append(redirectTo.rawIpData(),4);
} else {
- // For older peers we use RENDEZVOUS to coax them into contacting us elsewhere.
- Packet outp(_id.address(),RR->identity.address(),Packet::VERB_RENDEZVOUS);
- outp.append((uint8_t)0); // no flags
- RR->identity.address().appendTo(outp);
- outp.append((uint16_t)redirectTo.port());
- if (redirectTo.ss_family == AF_INET) {
- outp.append((uint8_t)4);
- outp.append(redirectTo.rawIpData(),4);
- } else {
- outp.append((uint8_t)16);
- outp.append(redirectTo.rawIpData(),16);
- }
- outp.armor(_key,true);
- RR->antiRec->logOutgoingZT(outp.data(),outp.size());
- RR->node->putPacket(localAddr,remoteAddr,outp.data(),outp.size());
+ outp.append((uint8_t)6);
+ outp.append((uint8_t)18);
+ outp.append(redirectTo.rawIpData(),16);
}
+ outp.append((uint16_t)redirectTo.port());
+ outp.armor(_key,true);
+ RR->antiRec->logOutgoingZT(outp.data(),outp.size());
+ RR->node->putPacket(localAddr,remoteAddr,outp.data(),outp.size());
+ } else {
+ // For older peers we use RENDEZVOUS to coax them into contacting us elsewhere.
+ Packet outp(_id.address(),RR->identity.address(),Packet::VERB_RENDEZVOUS);
+ outp.append((uint8_t)0); // no flags
+ RR->identity.address().appendTo(outp);
+ outp.append((uint16_t)redirectTo.port());
+ if (redirectTo.ss_family == AF_INET) {
+ outp.append((uint8_t)4);
+ outp.append(redirectTo.rawIpData(),4);
+ } else {
+ outp.append((uint8_t)16);
+ outp.append(redirectTo.rawIpData(),16);
+ }
+ outp.armor(_key,true);
+ RR->antiRec->logOutgoingZT(outp.data(),outp.size());
+ RR->node->putPacket(localAddr,remoteAddr,outp.data(),outp.size());
}
}
}
@@ -167,7 +165,7 @@ void Peer::received(
}
if (!pathIsConfirmed) {
- if (verb == Packet::VERB_OK) {
+ if ((verb == Packet::VERB_OK)||(RR->topology->amRoot())) {
Path *slot = (Path *)0;
if (np < ZT_MAX_PEER_NETWORK_PATHS) {