summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--node/Cluster.cpp15
-rw-r--r--node/Cluster.hpp2
-rw-r--r--node/SelfAwareness.cpp9
-rw-r--r--node/SelfAwareness.hpp10
-rw-r--r--service/ClusterGeoIpService.cpp10
-rw-r--r--service/OneService.cpp2
6 files changed, 31 insertions, 17 deletions
diff --git a/node/Cluster.cpp b/node/Cluster.cpp
index c943e62b..4088c967 100644
--- a/node/Cluster.cpp
+++ b/node/Cluster.cpp
@@ -202,7 +202,7 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
}
m.lastReceivedAliveAnnouncement = RR->node->now();
#ifdef ZT_TRACE
- TRACE("[%u] I'm alive! peers may be redirected to: %s",(unsigned int)fromMemberId,addrs.c_str());
+ TRACE("[%u] I'm alive! peers close to %d,%d,%d can be redirected to: %s",(unsigned int)fromMemberId,m.x,m.y,m.z,addrs.c_str());
#endif
} break;
@@ -406,10 +406,12 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
_send(canHasPeer,STATE_MESSAGE_RELAY,buf.data(),buf.size());
}
+ TRACE("sendViaCluster(): relaying %u bytes from %s to %s by way of %u",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)canHasPeer);
return true;
+ } else {
+ TRACE("sendViaCluster(): unable to relay %u bytes from %s to %s since no cluster members seem to have it!",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str());
+ return false;
}
-
- return false;
}
void Cluster::replicateHavePeer(const Identity &peerId)
@@ -564,11 +566,12 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy
{
if (!peerPhysicalAddress) // sanity check
return false;
+
if (_addressToLocationFunction) {
// Pick based on location if it can be determined
int px = 0,py = 0,pz = 0;
if (_addressToLocationFunction(_addressToLocationFunctionArg,reinterpret_cast<const struct sockaddr_storage *>(&peerPhysicalAddress),&px,&py,&pz) == 0) {
- // No geo-info so no change
+ TRACE("no geolocation available for %s",peerPhysicalAddress.toIpString().c_str());
return false;
}
@@ -578,6 +581,7 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy
const double currentDistance = _dist3d(_x,_y,_z,px,py,pz);
double bestDistance = (offload ? 2147483648.0 : currentDistance);
unsigned int bestMember = _id;
+ TRACE("%s is at %d,%d,%d -- looking for anyone closer than %d,%d,%d (%fkm)",peerPhysicalAddress.toString().c_str(),px,py,pz,_x,_y,_z,bestDistance);
{
Mutex::Lock _l(_memberIds_m);
for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
@@ -588,6 +592,7 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy
if ( ((now - m.lastReceivedAliveAnnouncement) < ZT_CLUSTER_TIMEOUT) && ((m.x != 0)||(m.y != 0)||(m.z != 0)) && (m.zeroTierPhysicalEndpoints.size() > 0) ) {
double mdist = _dist3d(m.x,m.y,m.z,px,py,pz);
if (mdist < bestDistance) {
+ bestDistance = mdist;
bestMember = *mid;
best = m.zeroTierPhysicalEndpoints;
}
@@ -596,7 +601,7 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy
}
if (best.size() > 0) {
- TRACE("peer %s is at [%d,%d,%d], distance to us is %f, sending to %u instead for better distance %f",peerAddress.toString().c_str(),px,py,pz,currentDistance,bestMember,bestDistance);
+ TRACE("%s seems closer to %u at %fkm, suggesting redirect...",peerAddress.toString().c_str(),bestMember,bestDistance);
/* if (peer->remoteVersionProtocol() >= 5) {
// If it's a newer peer send VERB_PUSH_DIRECT_PATHS which is more idiomatic
diff --git a/node/Cluster.hpp b/node/Cluster.hpp
index 6c9a2917..daa81185 100644
--- a/node/Cluster.hpp
+++ b/node/Cluster.hpp
@@ -145,7 +145,7 @@ public:
STATE_MESSAGE_RELAY = 5,
/**
- * Request to send a packet to a locally-known peer:
+ * Request that a cluster member send a packet to a locally-known peer:
* <[5] ZeroTier address of recipient>
* <[1] packet verb>
* <[2] length of packet payload>
diff --git a/node/SelfAwareness.cpp b/node/SelfAwareness.cpp
index 7329322a..1b70f17c 100644
--- a/node/SelfAwareness.cpp
+++ b/node/SelfAwareness.cpp
@@ -94,7 +94,7 @@ void SelfAwareness::iam(const Address &reporter,const InetAddress &reporterPhysi
Mutex::Lock _l(_phy_m);
- PhySurfaceEntry &entry = _phy[PhySurfaceKey(reporter,scope)];
+ PhySurfaceEntry &entry = _phy[PhySurfaceKey(reporter,reporterPhysicalAddress,scope)];
if ((now - entry.ts) >= ZT_SELFAWARENESS_ENTRY_TIMEOUT) {
entry.mySurface = myPhysicalAddress;
@@ -105,14 +105,15 @@ void SelfAwareness::iam(const Address &reporter,const InetAddress &reporterPhysi
entry.ts = now;
TRACE("learned physical address %s for scope %u as seen from %s(%s) (replaced %s, resetting all in scope)",myPhysicalAddress.toString().c_str(),(unsigned int)scope,reporter.toString().c_str(),reporterPhysicalAddress.toString().c_str(),entry.mySurface.toString().c_str());
- // Erase all entries (other than this one) for this scope to prevent thrashing
- // Note: we should probably not use 'entry' after this
+ // Erase all entries in this scope that were not reported by this remote address to prevent 'thrashing'
+ // due to multiple reports of endpoint change.
+ // Don't use 'entry' after this since hash table gets modified.
{
Hashtable< PhySurfaceKey,PhySurfaceEntry >::Iterator i(_phy);
PhySurfaceKey *k = (PhySurfaceKey *)0;
PhySurfaceEntry *e = (PhySurfaceEntry *)0;
while (i.next(k,e)) {
- if ((k->reporter != reporter)&&(k->scope == scope))
+ if ((k->reporterPhysicalAddress != reporterPhysicalAddress)&&(k->scope == scope))
_phy.erase(*k);
}
}
diff --git a/node/SelfAwareness.hpp b/node/SelfAwareness.hpp
index 3133553e..400b05e6 100644
--- a/node/SelfAwareness.hpp
+++ b/node/SelfAwareness.hpp
@@ -69,14 +69,14 @@ private:
struct PhySurfaceKey
{
Address reporter;
+ InetAddress reporterPhysicalAddress;
InetAddress::IpScope scope;
- inline unsigned long hashCode() const throw() { return ((unsigned long)reporter.toInt() + (unsigned long)scope); }
-
PhySurfaceKey() : reporter(),scope(InetAddress::IP_SCOPE_NONE) {}
- PhySurfaceKey(const Address &r,InetAddress::IpScope s) : reporter(r),scope(s) {}
- inline bool operator<(const PhySurfaceKey &k) const throw() { return ((reporter < k.reporter) ? true : ((reporter == k.reporter) ? ((int)scope < (int)k.scope) : false)); }
- inline bool operator==(const PhySurfaceKey &k) const throw() { return ((reporter == k.reporter)&&(scope == k.scope)); }
+ PhySurfaceKey(const Address &r,const InetAddress &ra,InetAddress::IpScope s) : reporter(r),reporterPhysicalAddress(ra),scope(s) {}
+
+ inline unsigned long hashCode() const throw() { return ((unsigned long)reporter.toInt() + (unsigned long)scope); }
+ inline bool operator==(const PhySurfaceKey &k) const throw() { return ((reporter == k.reporter)&&(reporterPhysicalAddress == k.reporterPhysicalAddress)&&(scope == k.scope)); }
};
struct PhySurfaceEntry
{
diff --git a/service/ClusterGeoIpService.cpp b/service/ClusterGeoIpService.cpp
index b47a9b2a..9baa7506 100644
--- a/service/ClusterGeoIpService.cpp
+++ b/service/ClusterGeoIpService.cpp
@@ -72,11 +72,14 @@ ClusterGeoIpService::~ClusterGeoIpService()
bool ClusterGeoIpService::locate(const InetAddress &ip,int &x,int &y,int &z)
{
+ InetAddress ipNoPort(ip);
+ ipNoPort.setPort(0); // we index cache by IP only
const uint64_t now = OSUtils::now();
+
bool r = false;
{
Mutex::Lock _l(_cache_m);
- std::map< InetAddress,_CE >::iterator c(_cache.find(ip));
+ std::map< InetAddress,_CE >::iterator c(_cache.find(ipNoPort));
if (c != _cache.end()) {
x = c->second.x;
y = c->second.y;
@@ -90,8 +93,9 @@ bool ClusterGeoIpService::locate(const InetAddress &ip,int &x,int &y,int &z)
{
Mutex::Lock _l(_sOutputLock);
if (_sOutputFd >= 0) {
- std::string ips(ip.toIpString());
+ std::string ips(ipNoPort.toIpString());
ips.push_back('\n');
+ //fprintf(stderr,"ClusterGeoIpService: << %s",ips.c_str());
::write(_sOutputFd,ips.data(),ips.length());
}
}
@@ -153,6 +157,7 @@ void ClusterGeoIpService::threadMain()
if ((buf[i] == '\n')||(buf[i] == '\r')) {
linebuf[lineptr] = (char)0;
if (lineptr > 0) {
+ //fprintf(stderr,"ClusterGeoIpService: >> %s\n",linebuf);
try {
std::vector<std::string> result(Utils::split(linebuf,",","",""));
if ((result.size() >= 7)&&(result[1] == "1")) {
@@ -163,6 +168,7 @@ void ClusterGeoIpService::threadMain()
ce.x = (int)::strtol(result[4].c_str(),(char **)0,10);
ce.y = (int)::strtol(result[5].c_str(),(char **)0,10);
ce.z = (int)::strtol(result[6].c_str(),(char **)0,10);
+ //fprintf(stderr,"ClusterGeoIpService: %s is at %d,%d,%d\n",rip.toIpString().c_str(),ce.x,ce.y,ce.z);
{
Mutex::Lock _l2(_cache_m);
_cache[rip] = ce;
diff --git a/service/OneService.cpp b/service/OneService.cpp
index a64d680b..729812ed 100644
--- a/service/OneService.cpp
+++ b/service/OneService.cpp
@@ -1473,6 +1473,7 @@ static int SnodeWirePacketSendFunction(ZT_Node *node,void *uptr,const struct soc
static void SnodeVirtualNetworkFrameFunction(ZT_Node *node,void *uptr,uint64_t nwid,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len)
{ reinterpret_cast<OneServiceImpl *>(uptr)->nodeVirtualNetworkFrameFunction(nwid,sourceMac,destMac,etherType,vlanId,data,len); }
+#ifdef ZT_ENABLE_CLUSTER
static void SclusterSendFunction(void *uptr,unsigned int toMemberId,const void *data,unsigned int len)
{
OneServiceImpl *const impl = reinterpret_cast<OneServiceImpl *>(uptr);
@@ -1485,6 +1486,7 @@ static int SclusterGeoIpFunction(void *uptr,const struct sockaddr_storage *addr,
OneServiceImpl *const impl = reinterpret_cast<OneServiceImpl *>(uptr);
return (int)(impl->_clusterGeoIpService->locate(*(reinterpret_cast<const InetAddress *>(addr)),*x,*y,*z));
}
+#endif
static void StapFrameHandler(void *uptr,uint64_t nwid,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len)
{ reinterpret_cast<OneServiceImpl *>(uptr)->tapFrameHandler(nwid,from,to,etherType,vlanId,data,len); }