diff options
author | Adam Ierymenko <adam.ierymenko@gmail.com> | 2015-10-27 15:57:26 -0700 |
---|---|---|
committer | Adam Ierymenko <adam.ierymenko@gmail.com> | 2015-10-27 15:57:26 -0700 |
commit | cc6080fe3898ddd1419050ee3a2c45cc87dd140b (patch) | |
tree | dc2b2169ad18057270e106b5017b1e470cf62080 | |
parent | 6399f6f0940b6f20819d021a0dc3dcf0d289f002 (diff) | |
download | infinitytier-cc6080fe3898ddd1419050ee3a2c45cc87dd140b.tar.gz infinitytier-cc6080fe3898ddd1419050ee3a2c45cc87dd140b.zip |
(1) No need to confirm if we are a root (small optimization), (2) Refactor peer affinity tracking.
-rw-r--r-- | node/Cluster.cpp | 130 | ||||
-rw-r--r-- | node/Cluster.hpp | 29 | ||||
-rw-r--r-- | node/Peer.cpp | 70 |
3 files changed, 98 insertions, 131 deletions
diff --git a/node/Cluster.cpp b/node/Cluster.cpp index 07ca0ba1..b2f3d585 100644 --- a/node/Cluster.cpp +++ b/node/Cluster.cpp @@ -82,7 +82,8 @@ Cluster::Cluster( _z(z), _id(id), _zeroTierPhysicalEndpoints(zeroTierPhysicalEndpoints), - _members(new _Member[ZT_CLUSTER_MAX_MEMBERS]) + _members(new _Member[ZT_CLUSTER_MAX_MEMBERS]), + _peerAffinities(65536) { uint16_t stmp[ZT_SHA512_DIGEST_LEN / sizeof(uint16_t)]; @@ -214,19 +215,12 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len) ptr += id.deserialize(dmsg,ptr); if (id) { RR->topology->saveIdentity(id); - - { // Add or update peer affinity entry - _PeerAffinity pa(id.address(),fromMemberId,RR->node->now()); + { Mutex::Lock _l2(_peerAffinities_m); - std::vector<_PeerAffinity>::iterator i(std::lower_bound(_peerAffinities.begin(),_peerAffinities.end(),pa)); // O(log(n)) - if ((i != _peerAffinities.end())&&(i->key == pa.key)) { - i->timestamp = pa.timestamp; - } else { - _peerAffinities.push_back(pa); - std::sort(_peerAffinities.begin(),_peerAffinities.end()); // probably a more efficient way to insert but okay for now - } + _PA &pa = _peerAffinities[id.address()]; + pa.ts = RR->node->now(); + pa.mid = fromMemberId; } - TRACE("[%u] has %s",(unsigned int)fromMemberId,id.address().toString().c_str()); } } catch ( ... ) { @@ -355,83 +349,66 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee if (len > 16384) // sanity check return false; - uint64_t mostRecentTimestamp = 0; + const uint64_t now = RR->node->now(); unsigned int canHasPeer = 0; { // Anyone got this peer? Mutex::Lock _l2(_peerAffinities_m); - std::vector<_PeerAffinity>::iterator i(std::lower_bound(_peerAffinities.begin(),_peerAffinities.end(),_PeerAffinity(toPeerAddress,0,0))); // O(log(n)) - while ((i != _peerAffinities.end())&&(i->address() == toPeerAddress)) { - const uint16_t mid = i->clusterMemberId(); - if ((mid != _id)&&(i->timestamp > mostRecentTimestamp)) { - mostRecentTimestamp = i->timestamp; - canHasPeer = mid; - } - ++i; - } + _PA *pa = _peerAffinities.get(toPeerAddress); + if ((pa)&&(pa->mid != _id)&&((now - pa->ts) < ZT_PEER_ACTIVITY_TIMEOUT)) + canHasPeer = pa->mid; + else return false; } - const uint64_t now = RR->node->now(); - if ((now - mostRecentTimestamp) < ZT_PEER_ACTIVITY_TIMEOUT) { - Buffer<2048> buf; - - if (unite) { - InetAddress v4,v6; - if (fromPeerAddress) { - SharedPtr<Peer> fromPeer(RR->topology->getPeer(fromPeerAddress)); - if (fromPeer) - fromPeer->getBestActiveAddresses(now,v4,v6); - } - uint8_t addrCount = 0; + Buffer<2048> buf; + if (unite) { + InetAddress v4,v6; + if (fromPeerAddress) { + SharedPtr<Peer> fromPeer(RR->topology->getPeer(fromPeerAddress)); + if (fromPeer) + fromPeer->getBestActiveAddresses(now,v4,v6); + } + uint8_t addrCount = 0; + if (v4) + ++addrCount; + if (v6) + ++addrCount; + if (addrCount) { + toPeerAddress.appendTo(buf); + fromPeerAddress.appendTo(buf); + buf.append(addrCount); if (v4) - ++addrCount; + v4.serialize(buf); if (v6) - ++addrCount; - if (addrCount) { - toPeerAddress.appendTo(buf); - fromPeerAddress.appendTo(buf); - buf.append(addrCount); - if (v4) - v4.serialize(buf); - if (v6) - v6.serialize(buf); - } + v6.serialize(buf); } + } + { + Mutex::Lock _l2(_members[canHasPeer].lock); + if (buf.size() > 0) + _send(canHasPeer,STATE_MESSAGE_PROXY_UNITE,buf.data(),buf.size()); + if (_members[canHasPeer].zeroTierPhysicalEndpoints.size() > 0) + RR->node->putPacket(InetAddress(),_members[canHasPeer].zeroTierPhysicalEndpoints.front(),data,len); + } - { - Mutex::Lock _l2(_members[canHasPeer].lock); - if (buf.size() > 0) - _send(canHasPeer,STATE_MESSAGE_PROXY_UNITE,buf.data(),buf.size()); - if (_members[canHasPeer].zeroTierPhysicalEndpoints.size() > 0) - RR->node->putPacket(InetAddress(),_members[canHasPeer].zeroTierPhysicalEndpoints.front(),data,len); - } + TRACE("sendViaCluster(): relaying %u bytes from %s to %s by way of %u",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)canHasPeer); - TRACE("sendViaCluster(): relaying %u bytes from %s to %s by way of %u",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)canHasPeer); - return true; - } else { - TRACE("sendViaCluster(): unable to relay %u bytes from %s to %s since no cluster members seem to have it!",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str()); - return false; - } + return true; } void Cluster::replicateHavePeer(const Identity &peerId) { + const uint64_t now = RR->node->now(); { // Use peer affinity table to track our own last announce time for peers - _PeerAffinity pa(peerId.address(),_id,RR->node->now()); Mutex::Lock _l2(_peerAffinities_m); - std::vector<_PeerAffinity>::iterator i(std::lower_bound(_peerAffinities.begin(),_peerAffinities.end(),pa)); // O(log(n)) - if ((i != _peerAffinities.end())&&(i->key == pa.key)) { - if ((pa.timestamp - i->timestamp) >= ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD) { - i->timestamp = pa.timestamp; - // continue to announcement - } else { - // we've already announced this peer recently, so skip - return; - } + _PA &pa = _peerAffinities[peerId.address()]; + if (pa.mid != _id) { + pa.ts = now; + pa.mid = _id; + } else if ((now - pa.ts) >= ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD) { + return; } else { - _peerAffinities.push_back(pa); - std::sort(_peerAffinities.begin(),_peerAffinities.end()); // probably a more efficient way to insert but okay for now - // continue to announcement + pa.ts = now; } } @@ -598,6 +575,7 @@ bool Cluster::findBetterEndpoint(InetAddress &redirectTo,const Address &peerAddr } } + // Suggestion redirection if a closer member was found for(std::vector<InetAddress>::const_iterator a(best.begin());a!=best.end();++a) { if (a->ss_family == peerPhysicalAddress.ss_family) { TRACE("%s at [%d,%d,%d] is %f from us but %f from %u, can redirect to %s",peerAddress.toString().c_str(),px,py,pz,currentDistance,bestDistance,bestMember,a->toString().c_str()); @@ -661,10 +639,12 @@ void Cluster::status(ZT_ClusterStatus &status) const { Mutex::Lock _l2(_peerAffinities_m); - for(std::vector<_PeerAffinity>::const_iterator pi(_peerAffinities.begin());pi!=_peerAffinities.end();++pi) { - unsigned int mid = pi->clusterMemberId(); - if ((ms[mid])&&(mid != _id)&&((now - pi->timestamp) < ZT_PEER_ACTIVITY_TIMEOUT)) - ++ms[mid]->peers; + Address *k = (Address *)0; + _PA *v = (_PA *)0; + Hashtable< Address,_PA >::Iterator i(const_cast<Cluster *>(this)->_peerAffinities); + while (i.next(k,v)) { + if ( (ms[v->mid]) && (v->mid != _id) && ((now - v->ts) < ZT_PEER_ACTIVITY_TIMEOUT) ) + ++ms[v->mid]->peers; } } } diff --git a/node/Cluster.hpp b/node/Cluster.hpp index bab56785..42a26c7f 100644 --- a/node/Cluster.hpp +++ b/node/Cluster.hpp @@ -52,7 +52,7 @@ /** * How often should we announce that we have a peer? */ -#define ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD 60000 +#define ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD 30000 /** * Desired period between doPeriodicTasks() in milliseconds @@ -285,7 +285,7 @@ private: void _send(uint16_t memberId,StateMessageType type,const void *msg,unsigned int len); void _flush(uint16_t memberId); - // These are initialized in the constructor and remain static + // These are initialized in the constructor and remain immutable uint16_t _masterSecret[ZT_SHA512_DIGEST_LEN / sizeof(uint16_t)]; unsigned char _key[ZT_PEER_SECRET_KEY_LENGTH]; const RuntimeEnvironment *RR; @@ -298,6 +298,7 @@ private: const int32_t _z; const uint16_t _id; const std::vector<InetAddress> _zeroTierPhysicalEndpoints; + // end immutable fields struct _Member { @@ -330,30 +331,18 @@ private: _Member() { this->clear(); } ~_Member() { Utils::burn(key,sizeof(key)); } }; - - _Member *const _members; // cluster IDs can be from 0 to 65535 (16-bit) + _Member *const _members; std::vector<uint16_t> _memberIds; Mutex _memberIds_m; - // Record tracking which members have which peers and how recently they claimed this -- also used to track our last claimed time - struct _PeerAffinity + struct _PA { - _PeerAffinity(const Address &a,uint16_t mid,uint64_t ts) : - key((a.toInt() << 16) | (uint64_t)mid), - timestamp(ts) {} - - uint64_t key; - uint64_t timestamp; - - inline Address address() const throw() { return Address(key >> 16); } - inline uint16_t clusterMemberId() const throw() { return (uint16_t)(key & 0xffff); } - - inline bool operator<(const _PeerAffinity &pi) const throw() { return (key < pi.key); } + _PA() : ts(0),mid(0xffff) {} + uint64_t ts; + uint16_t mid; }; - - // A memory-efficient packed map of _PeerAffinity records searchable with std::binary_search() and std::lower_bound() - std::vector<_PeerAffinity> _peerAffinities; + Hashtable< Address,_PA > _peerAffinities; Mutex _peerAffinities_m; }; diff --git a/node/Peer.cpp b/node/Peer.cpp index e56c1eca..99e2156e 100644 --- a/node/Peer.cpp +++ b/node/Peer.cpp @@ -86,43 +86,41 @@ void Peer::received( // Note: findBetterEndpoint() is first since we still want to check // for a better endpoint even if we don't actually send a redirect. if ( (RR->cluster->findBetterEndpoint(redirectTo,_id.address(),remoteAddr,false)) && (verb != Packet::VERB_OK)&&(verb != Packet::VERB_ERROR)&&(verb != Packet::VERB_RENDEZVOUS)&&(verb != Packet::VERB_PUSH_DIRECT_PATHS) ) { - if ((redirectTo.ss_family == AF_INET)||(redirectTo.ss_family == AF_INET6)) { - if (_vProto >= 5) { - // For newer peers we can send a more idiomatic verb: PUSH_DIRECT_PATHS. - Packet outp(_id.address(),RR->identity.address(),Packet::VERB_PUSH_DIRECT_PATHS); - outp.append((uint16_t)1); // count == 1 - outp.append((uint8_t)0); // no flags - outp.append((uint16_t)0); // no extensions - if (redirectTo.ss_family == AF_INET) { - outp.append((uint8_t)4); - outp.append((uint8_t)6); - outp.append(redirectTo.rawIpData(),4); - } else { - outp.append((uint8_t)6); - outp.append((uint8_t)18); - outp.append(redirectTo.rawIpData(),16); - } - outp.append((uint16_t)redirectTo.port()); - outp.armor(_key,true); - RR->antiRec->logOutgoingZT(outp.data(),outp.size()); - RR->node->putPacket(localAddr,remoteAddr,outp.data(),outp.size()); + if (_vProto >= 5) { + // For newer peers we can send a more idiomatic verb: PUSH_DIRECT_PATHS. + Packet outp(_id.address(),RR->identity.address(),Packet::VERB_PUSH_DIRECT_PATHS); + outp.append((uint16_t)1); // count == 1 + outp.append((uint8_t)0); // no flags + outp.append((uint16_t)0); // no extensions + if (redirectTo.ss_family == AF_INET) { + outp.append((uint8_t)4); + outp.append((uint8_t)6); + outp.append(redirectTo.rawIpData(),4); } else { - // For older peers we use RENDEZVOUS to coax them into contacting us elsewhere. - Packet outp(_id.address(),RR->identity.address(),Packet::VERB_RENDEZVOUS); - outp.append((uint8_t)0); // no flags - RR->identity.address().appendTo(outp); - outp.append((uint16_t)redirectTo.port()); - if (redirectTo.ss_family == AF_INET) { - outp.append((uint8_t)4); - outp.append(redirectTo.rawIpData(),4); - } else { - outp.append((uint8_t)16); - outp.append(redirectTo.rawIpData(),16); - } - outp.armor(_key,true); - RR->antiRec->logOutgoingZT(outp.data(),outp.size()); - RR->node->putPacket(localAddr,remoteAddr,outp.data(),outp.size()); + outp.append((uint8_t)6); + outp.append((uint8_t)18); + outp.append(redirectTo.rawIpData(),16); } + outp.append((uint16_t)redirectTo.port()); + outp.armor(_key,true); + RR->antiRec->logOutgoingZT(outp.data(),outp.size()); + RR->node->putPacket(localAddr,remoteAddr,outp.data(),outp.size()); + } else { + // For older peers we use RENDEZVOUS to coax them into contacting us elsewhere. + Packet outp(_id.address(),RR->identity.address(),Packet::VERB_RENDEZVOUS); + outp.append((uint8_t)0); // no flags + RR->identity.address().appendTo(outp); + outp.append((uint16_t)redirectTo.port()); + if (redirectTo.ss_family == AF_INET) { + outp.append((uint8_t)4); + outp.append(redirectTo.rawIpData(),4); + } else { + outp.append((uint8_t)16); + outp.append(redirectTo.rawIpData(),16); + } + outp.armor(_key,true); + RR->antiRec->logOutgoingZT(outp.data(),outp.size()); + RR->node->putPacket(localAddr,remoteAddr,outp.data(),outp.size()); } } } @@ -167,7 +165,7 @@ void Peer::received( } if (!pathIsConfirmed) { - if (verb == Packet::VERB_OK) { + if ((verb == Packet::VERB_OK)||(RR->topology->amRoot())) { Path *slot = (Path *)0; if (np < ZT_MAX_PEER_NETWORK_PATHS) { |