author     Grant Limberg <glimberg@gmail.com>  2015-11-02 18:30:54 -0800
committer  Grant Limberg <glimberg@gmail.com>  2015-11-02 18:30:54 -0800
commit     a19e82fcbc2203f0d84a0e744d344e0796bc0c33 (patch)
tree       2f8cfc56a03cf6e614991c83a309b5fce5a48e48 /node/Cluster.cpp
parent     0ffcfa307e537347f181e7b22047f252d0cdc414 (diff)
parent     4e9d4304761f93a1764d3ec2d2b0c38140decad8 (diff)
Merge branch 'edge' into windows-ui
Diffstat (limited to 'node/Cluster.cpp')
-rw-r--r--  node/Cluster.cpp  414
1 file changed, 211 insertions(+), 203 deletions(-)
diff --git a/node/Cluster.cpp b/node/Cluster.cpp
index 9d25593a..e9e31ede 100644
--- a/node/Cluster.cpp
+++ b/node/Cluster.cpp
@@ -44,9 +44,9 @@
#include "CertificateOfMembership.hpp"
#include "Salsa20.hpp"
#include "Poly1305.hpp"
-#include "Packet.hpp"
#include "Identity.hpp"
-#include "Peer.hpp"
+#include "Topology.hpp"
+#include "Packet.hpp"
#include "Switch.hpp"
#include "Node.hpp"
@@ -82,7 +82,11 @@ Cluster::Cluster(
_z(z),
_id(id),
_zeroTierPhysicalEndpoints(zeroTierPhysicalEndpoints),
- _members(new _Member[ZT_CLUSTER_MAX_MEMBERS])
+ _members(new _Member[ZT_CLUSTER_MAX_MEMBERS]),
+ _peerAffinities(65536),
+ _lastCleanedPeerAffinities(0),
+ _lastCheckedPeersForAnnounce(0),
+ _lastFlushed(0)
{
uint16_t stmp[ZT_SHA512_DIGEST_LEN / sizeof(uint16_t)];
@@ -200,36 +204,40 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
}
#endif
}
- m.lastReceivedAliveAnnouncement = RR->node->now();
#ifdef ZT_TRACE
- TRACE("[%u] I'm alive! peers close to %d,%d,%d can be redirected to: %s",(unsigned int)fromMemberId,m.x,m.y,m.z,addrs.c_str());
+ if ((RR->node->now() - m.lastReceivedAliveAnnouncement) >= ZT_CLUSTER_TIMEOUT) {
+ TRACE("[%u] I'm alive! peers close to %d,%d,%d can be redirected to: %s",(unsigned int)fromMemberId,m.x,m.y,m.z,addrs.c_str());
+ }
#endif
+ m.lastReceivedAliveAnnouncement = RR->node->now();
} break;
case STATE_MESSAGE_HAVE_PEER: {
- try {
- Identity id;
- ptr += id.deserialize(dmsg,ptr);
- if (id) {
- RR->topology->saveIdentity(id);
-
- { // Add or update peer affinity entry
- _PeerAffinity pa(id.address(),fromMemberId,RR->node->now());
- Mutex::Lock _l2(_peerAffinities_m);
- std::vector<_PeerAffinity>::iterator i(std::lower_bound(_peerAffinities.begin(),_peerAffinities.end(),pa)); // O(log(n))
- if ((i != _peerAffinities.end())&&(i->key == pa.key)) {
- i->timestamp = pa.timestamp;
- } else {
- _peerAffinities.push_back(pa);
- std::sort(_peerAffinities.begin(),_peerAffinities.end()); // probably a more efficient way to insert but okay for now
- }
- }
-
- TRACE("[%u] has %s",(unsigned int)fromMemberId,id.address().toString().c_str());
- }
- } catch ( ... ) {
- // ignore invalid identities
- }
+ const uint64_t now = RR->node->now();
+ Identity id;
+ InetAddress physicalAddress;
+ ptr += id.deserialize(dmsg,ptr);
+ ptr += physicalAddress.deserialize(dmsg,ptr);
+ if (id) {
+ // Forget any paths that we have to this peer at its address
+ if (physicalAddress) {
+ SharedPtr<Peer> myPeerRecord(RR->topology->getPeerNoCache(id.address(),now));
+ if (myPeerRecord)
+ myPeerRecord->removePathByAddress(physicalAddress);
+ }
+
+ // Always save identity to update file time
+ RR->topology->saveIdentity(id);
+
+ // Set peer affinity to its new home
+ {
+ Mutex::Lock _l2(_peerAffinities_m);
+ _PA &pa = _peerAffinities[id.address()];
+ pa.ts = now;
+ pa.mid = fromMemberId;
+ }
+ TRACE("[%u] has %s @ %s",(unsigned int)fromMemberId,id.address().toString().c_str(),physicalAddress.toString().c_str());
+ }
} break;
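
The rewritten STATE_MESSAGE_HAVE_PEER handler adds a path-forgetting step: when another member claims a peer at a given physical address, this member drops its own direct path to that address so the two nodes stop competing over it (the route flapping mentioned in doPeriodicTasks() below). A minimal sketch of the idea, with simplified stand-ins for ZeroTier's Peer and Path classes:

```cpp
// Sketch only: simplified stand-ins for ZeroTier's Peer/Path classes. The
// real handler calls removePathByAddress() on a SharedPtr<Peer> fetched via
// Topology::getPeerNoCache().
#include <algorithm>
#include <string>
#include <vector>

struct Path { std::string address; };

struct Peer {
    std::vector<Path> paths;
    // Drop every direct path whose remote address matches addr.
    void removePathByAddress(const std::string &addr) {
        paths.erase(std::remove_if(paths.begin(), paths.end(),
                        [&](const Path &p) { return p.address == addr; }),
                    paths.end());
    }
};
```
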
case STATE_MESSAGE_MULTICAST_LIKE: {
@@ -238,113 +246,102 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
const MAC mac(dmsg.field(ptr,6),6); ptr += 6;
const uint32_t adi = dmsg.at<uint32_t>(ptr); ptr += 4;
RR->mc->add(RR->node->now(),nwid,MulticastGroup(mac,adi),address);
- TRACE("[%u] %s likes %s/%u on %.16llu",(unsigned int)fromMemberId,address.toString().c_str(),mac.toString().c_str(),(unsigned int)adi,nwid);
+ TRACE("[%u] %s likes %s/%.8x on %.16llx",(unsigned int)fromMemberId,address.toString().c_str(),mac.toString().c_str(),(unsigned int)adi,nwid);
} break;
case STATE_MESSAGE_COM: {
+ /* not currently used so not decoded yet
CertificateOfMembership com;
ptr += com.deserialize(dmsg,ptr);
if (com) {
TRACE("[%u] COM for %s on %.16llu rev %llu",(unsigned int)fromMemberId,com.issuedTo().toString().c_str(),com.networkId(),com.revision());
}
+ */
} break;
- case STATE_MESSAGE_RELAY: {
+ case STATE_MESSAGE_PROXY_UNITE: {
+ const Address localPeerAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
+ const Address remotePeerAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
const unsigned int numRemotePeerPaths = dmsg[ptr++];
InetAddress remotePeerPaths[256]; // size is 8-bit, so 256 is max
for(unsigned int i=0;i<numRemotePeerPaths;++i)
ptr += remotePeerPaths[i].deserialize(dmsg,ptr);
- const unsigned int packetLen = dmsg.at<uint16_t>(ptr); ptr += 2;
- const void *packet = (const void *)dmsg.field(ptr,packetLen); ptr += packetLen;
-
- if (packetLen >= ZT_PROTO_MIN_FRAGMENT_LENGTH) { // ignore anything too short to contain a dest address
- const Address destinationAddress(reinterpret_cast<const char *>(packet) + 8,ZT_ADDRESS_LENGTH);
- TRACE("[%u] relay %u bytes to %s (%u remote paths included)",(unsigned int)fromMemberId,packetLen,destinationAddress.toString().c_str(),numRemotePeerPaths);
-
- SharedPtr<Peer> destinationPeer(RR->topology->getPeer(destinationAddress));
- if (destinationPeer) {
- if (
- (destinationPeer->send(RR,packet,packetLen,RR->node->now()))&&
- (numRemotePeerPaths > 0)&&
- (packetLen >= 18)&&
- (reinterpret_cast<const unsigned char *>(packet)[ZT_PACKET_FRAGMENT_IDX_FRAGMENT_INDICATOR] == ZT_PACKET_FRAGMENT_INDICATOR)
- ) {
- // If remote peer paths were sent with this relayed packet, we do
- // RENDEZVOUS. It's handled here for cluster-relayed packets since
- // we don't have both Peer records so this is a different path.
-
- const Address remotePeerAddress(reinterpret_cast<const char *>(packet) + 13,ZT_ADDRESS_LENGTH);
-
- InetAddress bestDestV4,bestDestV6;
- destinationPeer->getBestActiveAddresses(RR->node->now(),bestDestV4,bestDestV6);
- InetAddress bestRemoteV4,bestRemoteV6;
- for(unsigned int i=0;i<numRemotePeerPaths;++i) {
- if ((bestRemoteV4)&&(bestRemoteV6))
- break;
- switch(remotePeerPaths[i].ss_family) {
- case AF_INET:
- if (!bestRemoteV4)
- bestRemoteV4 = remotePeerPaths[i];
- break;
- case AF_INET6:
- if (!bestRemoteV6)
- bestRemoteV6 = remotePeerPaths[i];
- break;
- }
- }
-
- Packet rendezvousForDest(destinationAddress,RR->identity.address(),Packet::VERB_RENDEZVOUS);
- rendezvousForDest.append((uint8_t)0);
- remotePeerAddress.appendTo(rendezvousForDest);
-
- Buffer<2048> rendezvousForOtherEnd;
- remotePeerAddress.appendTo(rendezvousForOtherEnd);
- rendezvousForOtherEnd.append((uint8_t)Packet::VERB_RENDEZVOUS);
- const unsigned int rendezvousForOtherEndPayloadSizePtr = rendezvousForOtherEnd.size();
- rendezvousForOtherEnd.addSize(2); // space for actual packet payload length
- rendezvousForOtherEnd.append((uint8_t)0); // flags == 0
- destinationAddress.appendTo(rendezvousForOtherEnd);
-
- bool haveMatch = false;
- if ((bestDestV6)&&(bestRemoteV6)) {
- haveMatch = true;
-
- rendezvousForDest.append((uint16_t)bestRemoteV6.port());
- rendezvousForDest.append((uint8_t)16);
- rendezvousForDest.append(bestRemoteV6.rawIpData(),16);
-
- rendezvousForOtherEnd.append((uint16_t)bestDestV6.port());
- rendezvousForOtherEnd.append((uint8_t)16);
- rendezvousForOtherEnd.append(bestDestV6.rawIpData(),16);
- rendezvousForOtherEnd.setAt<uint16_t>(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 16));
- } else if ((bestDestV4)&&(bestRemoteV4)) {
- haveMatch = true;
-
- rendezvousForDest.append((uint16_t)bestRemoteV4.port());
- rendezvousForDest.append((uint8_t)4);
- rendezvousForDest.append(bestRemoteV4.rawIpData(),4);
-
- rendezvousForOtherEnd.append((uint16_t)bestDestV4.port());
- rendezvousForOtherEnd.append((uint8_t)4);
- rendezvousForOtherEnd.append(bestDestV4.rawIpData(),4);
- rendezvousForOtherEnd.setAt<uint16_t>(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 4));
- }
-
- if (haveMatch) {
- _send(fromMemberId,STATE_MESSAGE_PROXY_SEND,rendezvousForOtherEnd.data(),rendezvousForOtherEnd.size());
- RR->sw->send(rendezvousForDest,true,0);
- }
+
+ TRACE("[%u] requested that we unite local %s with remote %s",(unsigned int)fromMemberId,localPeerAddress.toString().c_str(),remotePeerAddress.toString().c_str());
+
+ const uint64_t now = RR->node->now();
+ SharedPtr<Peer> localPeer(RR->topology->getPeerNoCache(localPeerAddress,now));
+ if ((localPeer)&&(numRemotePeerPaths > 0)) {
+ InetAddress bestLocalV4,bestLocalV6;
+ localPeer->getBestActiveAddresses(now,bestLocalV4,bestLocalV6);
+
+ InetAddress bestRemoteV4,bestRemoteV6;
+ for(unsigned int i=0;i<numRemotePeerPaths;++i) {
+ if ((bestRemoteV4)&&(bestRemoteV6))
+ break;
+ switch(remotePeerPaths[i].ss_family) {
+ case AF_INET:
+ if (!bestRemoteV4)
+ bestRemoteV4 = remotePeerPaths[i];
+ break;
+ case AF_INET6:
+ if (!bestRemoteV6)
+ bestRemoteV6 = remotePeerPaths[i];
+ break;
}
}
+
+ Packet rendezvousForLocal(localPeerAddress,RR->identity.address(),Packet::VERB_RENDEZVOUS);
+ rendezvousForLocal.append((uint8_t)0);
+ remotePeerAddress.appendTo(rendezvousForLocal);
+
+ Buffer<2048> rendezvousForRemote;
+ remotePeerAddress.appendTo(rendezvousForRemote);
+ rendezvousForRemote.append((uint8_t)Packet::VERB_RENDEZVOUS);
+ const unsigned int rendezvousForOtherEndPayloadSizePtr = rendezvousForRemote.size();
+ rendezvousForRemote.addSize(2); // space for actual packet payload length
+ rendezvousForRemote.append((uint8_t)0); // flags == 0
+ localPeerAddress.appendTo(rendezvousForRemote);
+
+ bool haveMatch = false;
+ if ((bestLocalV6)&&(bestRemoteV6)) {
+ haveMatch = true;
+
+ rendezvousForLocal.append((uint16_t)bestRemoteV6.port());
+ rendezvousForLocal.append((uint8_t)16);
+ rendezvousForLocal.append(bestRemoteV6.rawIpData(),16);
+
+ rendezvousForRemote.append((uint16_t)bestLocalV6.port());
+ rendezvousForRemote.append((uint8_t)16);
+ rendezvousForRemote.append(bestLocalV6.rawIpData(),16);
+ rendezvousForRemote.setAt<uint16_t>(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 16));
+ } else if ((bestLocalV4)&&(bestRemoteV4)) {
+ haveMatch = true;
+
+ rendezvousForLocal.append((uint16_t)bestRemoteV4.port());
+ rendezvousForLocal.append((uint8_t)4);
+ rendezvousForLocal.append(bestRemoteV4.rawIpData(),4);
+
+ rendezvousForRemote.append((uint16_t)bestLocalV4.port());
+ rendezvousForRemote.append((uint8_t)4);
+ rendezvousForRemote.append(bestLocalV4.rawIpData(),4);
+ rendezvousForRemote.setAt<uint16_t>(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 4));
+ }
+
+ if (haveMatch) {
+ _send(fromMemberId,STATE_MESSAGE_PROXY_SEND,rendezvousForRemote.data(),rendezvousForRemote.size());
+ _flush(fromMemberId); // we want this to go ASAP, since with port restricted cone NATs success can be timing-sensitive
+ RR->sw->send(rendezvousForLocal,true,0);
+ }
}
} break;
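
Both RENDEZVOUS messages assembled above share one payload layout, visible in the append() calls: a flags byte, the 5-byte ZeroTier address of the other party, a big-endian port, an IP length byte, then the raw IPv4 or IPv6 bytes; that is where the 9 + 4 and 9 + 16 payload sizes come from. A sketch of that serialization, with std::vector standing in for ZeroTier's internal Buffer class:

```cpp
// Sketch of the RENDEZVOUS payload layout implied by the appends above:
// flags (1) | ZT address (5) | port (2, big-endian) | IP length (1) | IP.
#include <cstdint>
#include <vector>

void appendRendezvousPayload(std::vector<uint8_t> &out,
                             uint64_t ztAddress,               // low 40 bits
                             uint16_t port,
                             const uint8_t *ip, uint8_t ipLen) // 4 or 16
{
    out.push_back(0);                        // flags == 0
    for (int s = 32; s >= 0; s -= 8)         // 5-byte big-endian address
        out.push_back((uint8_t)(ztAddress >> s));
    out.push_back((uint8_t)(port >> 8));     // port, network byte order
    out.push_back((uint8_t)port);
    out.push_back(ipLen);
    out.insert(out.end(), ip, ip + ipLen);   // raw IP bytes
}
```
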
case STATE_MESSAGE_PROXY_SEND: {
- const Address rcpt(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH);
+ const Address rcpt(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
const Packet::Verb verb = (Packet::Verb)dmsg[ptr++];
const unsigned int len = dmsg.at<uint16_t>(ptr); ptr += 2;
Packet outp(rcpt,RR->identity.address(),verb);
- outp.append(dmsg.field(ptr,len),len);
+ outp.append(dmsg.field(ptr,len),len); ptr += len;
RR->sw->send(outp,true,0);
TRACE("[%u] proxy send %s to %s length %u",(unsigned int)fromMemberId,Packet::verbString(verb),rcpt.toString().c_str(),len);
} break;
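
The two `ptr +=` advances added here keep the parse cursor honest: every field read from the shared state-message buffer must move ptr forward, otherwise any message following PROXY_SEND in the same datagram decodes from the wrong offset. A sketch of this cursor-style parsing, using hypothetical simplified types:

```cpp
// Sketch of cursor-style parsing; field widths mirror the handler above
// (ZT_ADDRESS_LENGTH == 5) but the types are otherwise hypothetical.
#include <cstddef>
#include <cstdint>
#include <cstring>

struct ProxySendHeader {
    uint8_t  recipient[5]; // 5-byte ZeroTier address
    uint8_t  verb;
    uint16_t payloadLen;
};

// Returns total bytes consumed (header + payload) so the caller can advance
// its own running cursor past this message.
size_t parseProxySend(const uint8_t *buf, ProxySendHeader &out)
{
    size_t ptr = 0;
    std::memcpy(out.recipient, buf + ptr, 5); ptr += 5;
    out.verb = buf[ptr++];
    out.payloadLen = (uint16_t)(((unsigned int)buf[ptr] << 8) | buf[ptr + 1]); ptr += 2;
    return ptr + out.payloadLen;
}
```
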
@@ -363,81 +360,78 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
}
}
-bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPeerAddress,const void *data,unsigned int len)
+bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPeerAddress,const void *data,unsigned int len,bool unite)
{
if (len > 16384) // sanity check
return false;
- uint64_t mostRecentTimestamp = 0;
- uint16_t canHasPeer = 0;
+ const uint64_t now = RR->node->now();
+ unsigned int canHasPeer = 0;
{ // Anyone got this peer?
Mutex::Lock _l2(_peerAffinities_m);
- std::vector<_PeerAffinity>::iterator i(std::lower_bound(_peerAffinities.begin(),_peerAffinities.end(),_PeerAffinity(toPeerAddress,0,0))); // O(log(n))
- while ((i != _peerAffinities.end())&&(i->address() == toPeerAddress)) {
- uint16_t mid = i->clusterMemberId();
- if ((mid != _id)&&(i->timestamp > mostRecentTimestamp)) {
- mostRecentTimestamp = i->timestamp;
- canHasPeer = mid;
- }
- }
+ _PA *pa = _peerAffinities.get(toPeerAddress);
+ if ((pa)&&(pa->mid != _id)&&((now - pa->ts) < ZT_PEER_ACTIVITY_TIMEOUT))
+ canHasPeer = pa->mid;
+ else return false;
}
- const uint64_t now = RR->node->now();
- if ((now - mostRecentTimestamp) < ZT_PEER_ACTIVITY_TIMEOUT) {
- Buffer<16384> buf;
-
+ Buffer<1024> buf;
+ if (unite) {
InetAddress v4,v6;
if (fromPeerAddress) {
- SharedPtr<Peer> fromPeer(RR->topology->getPeer(fromPeerAddress));
+ SharedPtr<Peer> fromPeer(RR->topology->getPeerNoCache(fromPeerAddress,now));
if (fromPeer)
fromPeer->getBestActiveAddresses(now,v4,v6);
}
- buf.append((uint8_t)( (v4) ? ((v6) ? 2 : 1) : ((v6) ? 1 : 0) ));
+ uint8_t addrCount = 0;
if (v4)
- v4.serialize(buf);
+ ++addrCount;
if (v6)
- v6.serialize(buf);
- buf.append((uint16_t)len);
- buf.append(data,len);
-
- {
- Mutex::Lock _l2(_members[canHasPeer].lock);
- _send(canHasPeer,STATE_MESSAGE_RELAY,buf.data(),buf.size());
+ ++addrCount;
+ if (addrCount) {
+ toPeerAddress.appendTo(buf);
+ fromPeerAddress.appendTo(buf);
+ buf.append(addrCount);
+ if (v4)
+ v4.serialize(buf);
+ if (v6)
+ v6.serialize(buf);
}
-
- TRACE("sendViaCluster(): relaying %u bytes from %s to %s by way of %u",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)canHasPeer);
- return true;
- } else {
- TRACE("sendViaCluster(): unable to relay %u bytes from %s to %s since no cluster members seem to have it!",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str());
- return false;
}
+ {
+ Mutex::Lock _l2(_members[canHasPeer].lock);
+ if (buf.size() > 0)
+ _send(canHasPeer,STATE_MESSAGE_PROXY_UNITE,buf.data(),buf.size());
+ if (_members[canHasPeer].zeroTierPhysicalEndpoints.size() > 0)
+ RR->node->putPacket(InetAddress(),_members[canHasPeer].zeroTierPhysicalEndpoints.front(),data,len);
+ }
+
+ TRACE("sendViaCluster(): relaying %u bytes from %s to %s by way of %u",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)canHasPeer);
+
+ return true;
}
-void Cluster::replicateHavePeer(const Identity &peerId)
+void Cluster::replicateHavePeer(const Identity &peerId,const InetAddress &physicalAddress)
{
- { // Use peer affinity table to track our own last announce time for peers
- _PeerAffinity pa(peerId.address(),_id,RR->node->now());
+ const uint64_t now = RR->node->now();
+ {
Mutex::Lock _l2(_peerAffinities_m);
- std::vector<_PeerAffinity>::iterator i(std::lower_bound(_peerAffinities.begin(),_peerAffinities.end(),pa)); // O(log(n))
- if ((i != _peerAffinities.end())&&(i->key == pa.key)) {
- if ((pa.timestamp - i->timestamp) >= ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD) {
- i->timestamp = pa.timestamp;
- // continue to announcement
- } else {
- // we've already announced this peer recently, so skip
- return;
- }
+ _PA &pa = _peerAffinities[peerId.address()];
+ if (pa.mid != _id) {
+ pa.ts = now;
+ pa.mid = _id;
+ } else if ((now - pa.ts) < ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD) {
+ return;
} else {
- _peerAffinities.push_back(pa);
- std::sort(_peerAffinities.begin(),_peerAffinities.end()); // probably a more efficient way to insert but okay for now
- // continue to announcement
+ pa.ts = now;
}
}
// announcement
Buffer<4096> buf;
peerId.serialize(buf,false);
+ physicalAddress.serialize(buf);
{
Mutex::Lock _l(_memberIds_m);
for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
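
replicateHavePeer() now reuses the affinity table to rate-limit its own announcements: it claims the peer immediately if this member did not already hold the claim, and otherwise re-announces only once ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD has elapsed. A sketch of that gate (the map and period constant are stand-ins):

```cpp
// Sketch of the announce gate in replicateHavePeer(). operator[] creates the
// entry zero-initialized on first sight, so a new peer announces right away.
#include <cstdint>
#include <unordered_map>

struct PeerAffinity { uint64_t ts; uint16_t mid; };

constexpr uint64_t kAnnouncePeriodMs = 60000; // assumed value
std::unordered_map<uint64_t, PeerAffinity> peerAffinities;

bool shouldAnnounce(uint64_t peerAddr, uint16_t selfId, uint64_t now)
{
    PeerAffinity &pa = peerAffinities[peerAddr];
    if (pa.mid != selfId) {  // claim was absent or held by another member
        pa.ts = now;
        pa.mid = selfId;
        return true;
    }
    if ((now - pa.ts) < kAnnouncePeriodMs)
        return false;        // announced recently, skip
    pa.ts = now;
    return true;
}
```
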
@@ -449,7 +443,7 @@ void Cluster::replicateHavePeer(const Identity &peerId)
void Cluster::replicateMulticastLike(uint64_t nwid,const Address &peerAddress,const MulticastGroup &group)
{
- Buffer<2048> buf;
+ Buffer<1024> buf;
buf.append((uint64_t)nwid);
peerAddress.appendTo(buf);
group.mac().appendTo(buf);
@@ -466,7 +460,7 @@ void Cluster::replicateMulticastLike(uint64_t nwid,const Address &peerAddress,co
void Cluster::replicateCertificateOfNetworkMembership(const CertificateOfMembership &com)
{
- Buffer<2048> buf;
+ Buffer<4096> buf;
com.serialize(buf);
TRACE("replicating %s COM for %.16llx to all members",com.issuedTo().toString().c_str(),com.networkId());
{
@@ -478,11 +472,47 @@ void Cluster::replicateCertificateOfNetworkMembership(const CertificateOfMembers
}
}
+struct _ClusterAnnouncePeers
+{
+ _ClusterAnnouncePeers(const uint64_t now_,Cluster *parent_) : now(now_),parent(parent_) {}
+ const uint64_t now;
+ Cluster *const parent;
+ inline void operator()(const Topology &t,const SharedPtr<Peer> &peer) const
+ {
+ Path *p = peer->getBestPath(now);
+ if (p)
+ parent->replicateHavePeer(peer->identity(),p->address());
+ }
+};
void Cluster::doPeriodicTasks()
{
const uint64_t now = RR->node->now();
- {
+ // Erase old peer affinity entries just to control table size
+ if ((now - _lastCleanedPeerAffinities) >= (ZT_PEER_ACTIVITY_TIMEOUT * 5)) {
+ _lastCleanedPeerAffinities = now;
+ Address *k = (Address *)0;
+ _PA *v = (_PA *)0;
+ Mutex::Lock _l(_peerAffinities_m);
+ Hashtable< Address,_PA >::Iterator i(_peerAffinities);
+ while (i.next(k,v)) {
+ if ((now - v->ts) >= (ZT_PEER_ACTIVITY_TIMEOUT * 5))
+ _peerAffinities.erase(*k);
+ }
+ }
+
+ // Announce peers that we have active direct paths to -- note that we forget paths
+ // that other cluster members claim they have, which prevents us from fighting
+ // with other cluster members (route flapping) over specific paths.
+ if ((now - _lastCheckedPeersForAnnounce) >= (ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD / 4)) {
+ _lastCheckedPeersForAnnounce = now;
+ _ClusterAnnouncePeers func(now,this);
+ RR->topology->eachPeer<_ClusterAnnouncePeers &>(func);
+ }
+
+ // Flush outgoing packet send queue every doPeriodicTasks()
+ if ((now - _lastFlushed) >= ZT_CLUSTER_FLUSH_PERIOD) {
+ _lastFlushed = now;
Mutex::Lock _l(_memberIds_m);
for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
Mutex::Lock _l2(_members[*mid].lock);
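
doPeriodicTasks() is now driven by three independent timers: affinity-table cleanup, announcing peers we hold active direct paths to, and flushing the outgoing send queues. The cleanup pass erases stale entries while iterating; with std::unordered_map as a stand-in for the internal Hashtable, the safe equivalent advances via the iterator that erase() returns:

```cpp
// Sketch of the periodic cleanup: drop affinity entries older than five
// activity timeouts so the table stays bounded.
#include <cstdint>
#include <unordered_map>

struct PeerAffinity { uint64_t ts; uint16_t mid; };

constexpr uint64_t kPeerActivityTimeoutMs = 30000; // assumed value

void cleanPeerAffinities(std::unordered_map<uint64_t, PeerAffinity> &aff,
                         uint64_t now)
{
    for (auto i = aff.begin(); i != aff.end(); ) {
        if ((now - i->second.ts) >= (kPeerActivityTimeoutMs * 5))
            i = aff.erase(i); // erase() returns the next valid iterator
        else
            ++i;
    }
}
```
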
@@ -564,26 +594,22 @@ void Cluster::removeMember(uint16_t memberId)
_memberIds = newMemberIds;
}
-bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhysicalAddress,bool offload)
+bool Cluster::findBetterEndpoint(InetAddress &redirectTo,const Address &peerAddress,const InetAddress &peerPhysicalAddress,bool offload)
{
- if (!peerPhysicalAddress) // sanity check
- return false;
-
if (_addressToLocationFunction) {
// Pick based on location if it can be determined
int px = 0,py = 0,pz = 0;
if (_addressToLocationFunction(_addressToLocationFunctionArg,reinterpret_cast<const struct sockaddr_storage *>(&peerPhysicalAddress),&px,&py,&pz) == 0) {
- TRACE("no geolocation available for %s",peerPhysicalAddress.toIpString().c_str());
+ TRACE("no geolocation data for %s (geo-lookup is lazy/async so it may work next time)",peerPhysicalAddress.toIpString().c_str());
return false;
}
// Find member closest to this peer
const uint64_t now = RR->node->now();
- std::vector<InetAddress> best; // initial "best" is for peer to stay put
+ std::vector<InetAddress> best;
const double currentDistance = _dist3d(_x,_y,_z,px,py,pz);
double bestDistance = (offload ? 2147483648.0 : currentDistance);
unsigned int bestMember = _id;
- TRACE("%s is at %d,%d,%d -- looking for anyone closer than %d,%d,%d (%fkm)",peerPhysicalAddress.toString().c_str(),px,py,pz,_x,_y,_z,bestDistance);
{
Mutex::Lock _l(_memberIds_m);
for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
@@ -592,7 +618,7 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy
// Consider member if it's alive and has sent us a location and one or more physical endpoints to send peers to
if ( ((now - m.lastReceivedAliveAnnouncement) < ZT_CLUSTER_TIMEOUT) && ((m.x != 0)||(m.y != 0)||(m.z != 0)) && (m.zeroTierPhysicalEndpoints.size() > 0) ) {
- double mdist = _dist3d(m.x,m.y,m.z,px,py,pz);
+ const double mdist = _dist3d(m.x,m.y,m.z,px,py,pz);
if (mdist < bestDistance) {
bestDistance = mdist;
bestMember = *mid;
@@ -602,36 +628,16 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy
}
}
- if (best.size() > 0) {
- TRACE("%s seems closer to %u at %fkm, suggesting redirect...",peerAddress.toString().c_str(),bestMember,bestDistance);
-
- /* if (peer->remoteVersionProtocol() >= 5) {
- // If it's a newer peer send VERB_PUSH_DIRECT_PATHS which is more idiomatic
- } else { */
- // Otherwise send VERB_RENDEZVOUS for ourselves, which will trick peers into trying other endpoints for us even if they're too old for PUSH_DIRECT_PATHS
- for(std::vector<InetAddress>::const_iterator a(best.begin());a!=best.end();++a) {
- if ((a->ss_family == AF_INET)||(a->ss_family == AF_INET6)) {
- Packet outp(peerAddress,RR->identity.address(),Packet::VERB_RENDEZVOUS);
- outp.append((uint8_t)0); // no flags
- RR->identity.address().appendTo(outp); // HACK: rendezvous with ourselves! with really old peers this will only work if I'm a root server!
- outp.append((uint16_t)a->port());
- if (a->ss_family == AF_INET) {
- outp.append((uint8_t)4);
- outp.append(a->rawIpData(),4);
- } else {
- outp.append((uint8_t)16);
- outp.append(a->rawIpData(),16);
- }
- RR->sw->send(outp,true,0);
- }
- }
- //}
-
- return true;
- } else {
- TRACE("peer %s is at [%d,%d,%d], distance to us is %f and this seems to be the best",peerAddress.toString().c_str(),px,py,pz,currentDistance);
- return false;
+ // Redirect to a closer member if it has a ZeroTier endpoint address in the same ss_family
+ for(std::vector<InetAddress>::const_iterator a(best.begin());a!=best.end();++a) {
+ if (a->ss_family == peerPhysicalAddress.ss_family) {
+ TRACE("%s at [%d,%d,%d] is %f from us but %f from %u, can redirect to %s",peerAddress.toString().c_str(),px,py,pz,currentDistance,bestDistance,bestMember,a->toString().c_str());
+ redirectTo = *a;
+ return true;
+ }
}
+ TRACE("%s at [%d,%d,%d] is %f from us, no better endpoints found",peerAddress.toString().c_str(),px,py,pz,currentDistance);
+ return false;
} else {
// TODO: pick based on load if no location info?
return false;
@@ -653,7 +659,7 @@ void Cluster::status(ZT_ClusterStatus &status) const
ms[_id]->x = _x;
ms[_id]->y = _y;
ms[_id]->z = _z;
- ms[_id]->peers = RR->topology->countAlive();
+ ms[_id]->peers = RR->topology->countActive();
for(std::vector<InetAddress>::const_iterator ep(_zeroTierPhysicalEndpoints.begin());ep!=_zeroTierPhysicalEndpoints.end();++ep) {
if (ms[_id]->numZeroTierPhysicalEndpoints >= ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES) // sanity check
break;
@@ -686,10 +692,12 @@ void Cluster::status(ZT_ClusterStatus &status) const
{
Mutex::Lock _l2(_peerAffinities_m);
- for(std::vector<_PeerAffinity>::const_iterator pi(_peerAffinities.begin());pi!=_peerAffinities.end();++pi) {
- unsigned int mid = pi->clusterMemberId();
- if ((ms[mid])&&(mid != _id)&&((now - pi->timestamp) < ZT_PEER_ACTIVITY_TIMEOUT))
- ++ms[mid]->peers;
+ Address *k = (Address *)0;
+ _PA *v = (_PA *)0;
+ Hashtable< Address,_PA >::Iterator i(const_cast<Cluster *>(this)->_peerAffinities);
+ while (i.next(k,v)) {
+ if ( (ms[v->mid]) && (v->mid != _id) && ((now - v->ts) < ZT_PEER_ACTIVITY_TIMEOUT) )
+ ++ms[v->mid]->peers;
}
}
}