From 98d856daa2488d3589cba058ec2d74e41dc53287 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Mon, 26 Oct 2015 17:58:51 -0700 Subject: Only send redirects to the sending InetAddress and only in response to a set of certain frame types to avoid potential race conditions. --- node/Cluster.hpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'node/Cluster.hpp') diff --git a/node/Cluster.hpp b/node/Cluster.hpp index be346659..b1266f27 100644 --- a/node/Cluster.hpp +++ b/node/Cluster.hpp @@ -247,12 +247,13 @@ public: /** * Redirect this peer to a better cluster member if needed * - * @param peerAddress Peer to (possibly) redirect + * @param peer Peer to (possibly) redirect + * @param localAddress Local address for path or NULL for none/any * @param peerPhysicalAddress Physical address of peer's current best path (where packet was most recently received or getBestPath()->address()) * @param offload Always redirect if possible -- can be used to offload peers during shutdown * @return True if peer was redirected */ - bool redirectPeer(const Address &peerAddress,const InetAddress &peerPhysicalAddress,bool offload); + bool redirectPeer(const SharedPtr &peer,const InetAddress &localAddress,const InetAddress &peerPhysicalAddress,bool offload); /** * Fill out ZT_ClusterStatus structure (from core API) -- cgit v1.2.3 From 69857b4ba8bb6ce341ca6dcdc03759fb901a831a Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Tue, 27 Oct 2015 09:36:48 -0700 Subject: Refactor cluster redirects to move code to push peers out of the actual Cluster function that checks for redirect, and clean up Peer::received() to be a bit more logical. --- node/Cluster.cpp | 54 ++++++++++++++---------------------------------------- node/Cluster.hpp | 17 ++++++++++++----- node/Peer.cpp | 53 ++++++++++++++++++++++++++++++++++++++++------------- 3 files changed, 66 insertions(+), 58 deletions(-) (limited to 'node/Cluster.hpp') diff --git a/node/Cluster.cpp b/node/Cluster.cpp index d4e81ff8..a2a99ecd 100644 --- a/node/Cluster.cpp +++ b/node/Cluster.cpp @@ -44,11 +44,10 @@ #include "CertificateOfMembership.hpp" #include "Salsa20.hpp" #include "Poly1305.hpp" -#include "Packet.hpp" #include "Identity.hpp" -#include "Peer.hpp" +#include "Topology.hpp" +#include "Packet.hpp" #include "Switch.hpp" -#include "Node.hpp" namespace ZeroTier { @@ -566,17 +565,17 @@ void Cluster::removeMember(uint16_t memberId) _memberIds = newMemberIds; } -bool Cluster::redirectPeer(const SharedPtr &peer,const InetAddress &localAddress,const InetAddress &peerPhysicalAddress,bool offload) +InetAddress Cluster::findBetterEndpoint(const Address &peerAddress,const InetAddress &peerPhysicalAddress,bool offload) { if (!peerPhysicalAddress) // sanity check - return false; + return InetAddress(); if (_addressToLocationFunction) { // Pick based on location if it can be determined int px = 0,py = 0,pz = 0; if (_addressToLocationFunction(_addressToLocationFunctionArg,reinterpret_cast(&peerPhysicalAddress),&px,&py,&pz) == 0) { - TRACE("NO GEOLOCATION available for %s",peerPhysicalAddress.toIpString().c_str()); - return false; + TRACE("no geolocation data for %s (geo-lookup is lazy/async so it may work next time)",peerPhysicalAddress.toIpString().c_str()); + return InetAddress(); } // Find member closest to this peer @@ -585,7 +584,6 @@ bool Cluster::redirectPeer(const SharedPtr &peer,const InetAddress &localA const double currentDistance = _dist3d(_x,_y,_z,px,py,pz); double bestDistance = (offload ? 
2147483648.0 : currentDistance); unsigned int bestMember = _id; - TRACE("%s is at %d,%d,%d -- looking for anyone closer than %d,%d,%d (%fkm)",peerPhysicalAddress.toString().c_str(),px,py,pz,_x,_y,_z,bestDistance); { Mutex::Lock _l(_memberIds_m); for(std::vector::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { @@ -604,41 +602,17 @@ bool Cluster::redirectPeer(const SharedPtr &peer,const InetAddress &localA } } - if (best.size() > 0) { - TRACE("%s seems closer to %u at %fkm, suggesting redirect...",peer->address().toString().c_str(),bestMember,bestDistance); - - /* if (peer->remoteVersionProtocol() >= 5) { - // If it's a newer peer send VERB_PUSH_DIRECT_PATHS which is more idiomatic - } else { */ - // Otherwise send VERB_RENDEZVOUS for ourselves, which will trick peers into trying other endpoints for us even if they're too old for PUSH_DIRECT_PATHS - for(std::vector::const_iterator a(best.begin());a!=best.end();++a) { - if (a->ss_family == peerPhysicalAddress.ss_family) { - Packet outp(peer->address(),RR->identity.address(),Packet::VERB_RENDEZVOUS); - outp.append((uint8_t)0); // no flags - RR->identity.address().appendTo(outp); // HACK: rendezvous with ourselves! with really old peers this will only work if I'm a root server! - outp.append((uint16_t)a->port()); - if (a->ss_family == AF_INET) { - outp.append((uint8_t)4); - outp.append(a->rawIpData(),4); - } else { - outp.append((uint8_t)16); - outp.append(a->rawIpData(),16); - } - outp.armor(peer->key(),true); - RR->antiRec->logOutgoingZT(outp.data(),outp.size()); - RR->node->putPacket(localAddress,peerPhysicalAddress,outp.data(),outp.size()); - } - } - //} - - return true; - } else { - //TRACE("peer %s is at [%d,%d,%d], distance to us is %f and this seems to be the best",peer->address().toString().c_str(),px,py,pz,currentDistance); - return false; + for(std::vector::const_iterator a(best.begin());a!=best.end();++a) { + if (a->ss_family == peerPhysicalAddress.ss_family) { + TRACE("%s at [%d,%d,%d] is %f from us but %f from %u, can redirect to %s",peerAddress.toString().c_str(),px,py,pz,currentDistance,bestDistance,bestMember,a->toString().c_str()); + return *a; + } } + TRACE("%s at [%d,%d,%d] is %f from us, no better endpoints found",peerAddress.toString().c_str(),px,py,pz,currentDistance); + return InetAddress(); } else { // TODO: pick based on load if no location info? - return false; + return InetAddress(); } } diff --git a/node/Cluster.hpp b/node/Cluster.hpp index b1266f27..080c9310 100644 --- a/node/Cluster.hpp +++ b/node/Cluster.hpp @@ -245,15 +245,22 @@ public: void removeMember(uint16_t memberId); /** - * Redirect this peer to a better cluster member if needed + * Find a better cluster endpoint for this peer * - * @param peer Peer to (possibly) redirect - * @param localAddress Local address for path or NULL for none/any + * If this endpoint appears to be the best, a NULL/0 InetAddres is returned. + * Otherwise the InetAddress of a better endpoint is returned and the peer + * can then then be told to contact us there. + * + * Redirection is only done within the same address family, so the returned + * endpoint will always be the same ss_family as the supplied physical + * address. 
+ * + * @param peerAddress Address of peer to (possibly) redirect * @param peerPhysicalAddress Physical address of peer's current best path (where packet was most recently received or getBestPath()->address()) * @param offload Always redirect if possible -- can be used to offload peers during shutdown - * @return True if peer was redirected + * @return InetAddress or NULL if there does not seem to be a better endpoint */ - bool redirectPeer(const SharedPtr &peer,const InetAddress &localAddress,const InetAddress &peerPhysicalAddress,bool offload); + InetAddress findBetterEndpoint(const Address &peerAddress,const InetAddress &peerPhysicalAddress,bool offload); /** * Fill out ZT_ClusterStatus structure (from core API) diff --git a/node/Peer.cpp b/node/Peer.cpp index 4f2fe931..31a20eea 100644 --- a/node/Peer.cpp +++ b/node/Peer.cpp @@ -35,6 +35,7 @@ #include "AntiRecursion.hpp" #include "SelfAwareness.hpp" #include "Cluster.hpp" +#include "Packet.hpp" #include @@ -81,9 +82,28 @@ void Peer::received( Packet::Verb inReVerb) { #ifdef ZT_ENABLE_CLUSTER - if ((RR->cluster)&&(hops == 0)&&(verb != VERB_OK)&&(verb != VERB_ERROR)&&(verb != VERB_RENDEZVOUS)&&(verb != VERB_PUSH_DIRECT_PATHS)) { - if (RR->cluster->redirectPeer(SharedPtr(this),localAddr,remoteAddr,false)) - return; + bool redirected = false; + if ((RR->cluster)&&(hops == 0)&&(verb != Packet::VERB_OK)&&(verb != Packet::VERB_ERROR)&&(verb != Packet::VERB_RENDEZVOUS)&&(verb != Packet::VERB_PUSH_DIRECT_PATHS)) { + InetAddress redirectTo(RR->cluster->findBetterEndpoint(_id.address(),remoteAddr,false)); + if (redirectTo) { + // For older peers we send RENDEZVOUS with ourselves. This will only work if we are + // a root server. + Packet outp(_id.address(),RR->identity.address(),Packet::VERB_RENDEZVOUS); + outp.append((uint8_t)0); // no flags + RR->identity.address().appendTo(outp); + outp.append((uint16_t)redirectTo.port()); + if (redirectTo.ss_family == AF_INET) { + outp.append((uint8_t)4); + outp.append(redirectTo.rawIpData(),4); + } else { + outp.append((uint8_t)16); + outp.append(redirectTo.rawIpData(),16); + } + outp.armor(_key,true); + RR->antiRec->logOutgoingZT(outp.data(),outp.size()); + RR->node->putPacket(localAddr,remoteAddr,outp.data(),outp.size()); + redirected = true; + } } #endif @@ -95,6 +115,23 @@ void Peer::received( Mutex::Lock _l(_lock); _lastReceive = now; + if ((verb == Packet::VERB_FRAME)||(verb == Packet::VERB_EXT_FRAME)) + _lastUnicastFrame = now; + else if (verb == Packet::VERB_MULTICAST_FRAME) + _lastMulticastFrame = now; + +#ifdef ZT_ENABLE_CLUSTER + // If we're in cluster mode and have sent the peer a better endpoint, stop + // here and don't confirm paths, replicate multicast info, etc. The new + // endpoint should do that. 
+ if (redirected) + return; +#endif + + if ((now - _lastAnnouncedTo) >= ((ZT_MULTICAST_LIKE_EXPIRE / 2) - 1000)) { + _lastAnnouncedTo = now; + needMulticastGroupAnnounce = true; + } if (hops == 0) { unsigned int np = _numPaths; @@ -144,16 +181,6 @@ void Peer::received( } } } - - if ((now - _lastAnnouncedTo) >= ((ZT_MULTICAST_LIKE_EXPIRE / 2) - 1000)) { - _lastAnnouncedTo = now; - needMulticastGroupAnnounce = true; - } - - if ((verb == Packet::VERB_FRAME)||(verb == Packet::VERB_EXT_FRAME)) - _lastUnicastFrame = now; - else if (verb == Packet::VERB_MULTICAST_FRAME) - _lastMulticastFrame = now; } // end _lock #ifdef ZT_ENABLE_CLUSTER -- cgit v1.2.3 From 8a7a0b6b88b2de95ebf98cafc183f7c69ec6c84f Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Tue, 27 Oct 2015 10:37:39 -0700 Subject: Cleanup, including simplification of root server picking algorithm since we no longer need all that craziness. --- node/Cluster.cpp | 2 +- node/Cluster.hpp | 15 ++++++++++++++- node/Topology.cpp | 36 ++++++++++++++++++++++++++++++++++++ 3 files changed, 51 insertions(+), 2 deletions(-) (limited to 'node/Cluster.hpp') diff --git a/node/Cluster.cpp b/node/Cluster.cpp index a2a99ecd..bd455933 100644 --- a/node/Cluster.cpp +++ b/node/Cluster.cpp @@ -239,7 +239,7 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len) const MAC mac(dmsg.field(ptr,6),6); ptr += 6; const uint32_t adi = dmsg.at(ptr); ptr += 4; RR->mc->add(RR->node->now(),nwid,MulticastGroup(mac,adi),address); - TRACE("[%u] %s likes %s/%u on %.16llu",(unsigned int)fromMemberId,address.toString().c_str(),mac.toString().c_str(),(unsigned int)adi,nwid); + TRACE("[%u] %s likes %s/%.8x on %.16llu",(unsigned int)fromMemberId,address.toString().c_str(),mac.toString().c_str(),(unsigned int)adi,nwid); } break; case STATE_MESSAGE_COM: { diff --git a/node/Cluster.hpp b/node/Cluster.hpp index 080c9310..7d0c6a08 100644 --- a/node/Cluster.hpp +++ b/node/Cluster.hpp @@ -158,7 +158,20 @@ public: * while PROXY_SEND is used to implement proxy sending (which right * now is only used to send RENDEZVOUS). */ - STATE_MESSAGE_PROXY_SEND = 6 + STATE_MESSAGE_PROXY_SEND = 6, + + /** + * Replicate a network config for a network we belong to: + * <[8] 64-bit network ID> + * <[2] 16-bit length of network config> + * <[...] serialized network config> + * + * This is used by clusters to avoid every member having to query + * for the same netconf for networks all members belong to. + * + * TODO: not implemented yet! + */ + STATE_MESSAGE_NETWORK_CONFIG = 7 }; /** diff --git a/node/Topology.cpp b/node/Topology.cpp index 6a72cf8c..49854f0e 100644 --- a/node/Topology.cpp +++ b/node/Topology.cpp @@ -215,10 +215,45 @@ SharedPtr Topology::getBestRoot(const Address *avoid,unsigned int avoidCou } } } + } else { /* If I am not a root server, the best root server is the active one with * the lowest latency. 
*/ + unsigned int bestLatencyOverall = ~((unsigned int)0); + unsigned int bestLatencyNotAvoid = ~((unsigned int)0); + const SharedPtr *bestOverall = (const SharedPtr *)0; + const SharedPtr *bestNotAvoid = (const SharedPtr *)0; + + for(std::vector< SharedPtr >::const_iterator r(_rootPeers.begin());r!=_rootPeers.end();++r) { + if ((*r)->hasActiveDirectPath(now)) { + bool avoiding = false; + for(unsigned int i=0;iaddress()) { + avoiding = true; + break; + } + } + unsigned int l = (*r)->latency(); + if (!l) l = ~l; // zero latency indicates no measurment, so make this 'max' + if (l <= bestLatencyOverall) { + bestLatencyOverall = l; + bestOverall = &(*r); + } + if ((!avoiding)&&(l <= bestLatencyNotAvoid)) { + bestLatencyNotAvoid = l; + bestNotAvoid = &(*r); + } + } + } + + if (bestNotAvoid) + return *bestNotAvoid; + else if ((!strictAvoid)&&(bestOverall)) + return *bestOverall; + return SharedPtr(); + + /* unsigned int l,bestLatency = 65536; uint64_t lds,ldr; @@ -278,6 +313,7 @@ keep_searching_for_roots: } } } + */ } if (bestRoot) -- cgit v1.2.3 From cfe166ef359c0d92b1521e6c127e2b92238c0731 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Tue, 27 Oct 2015 12:29:01 -0700 Subject: Tweak some size limits. --- include/ZeroTierOne.h | 2 +- node/Cluster.hpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'node/Cluster.hpp') diff --git a/include/ZeroTierOne.h b/include/ZeroTierOne.h index 7af4f760..7371b9f0 100644 --- a/include/ZeroTierOne.h +++ b/include/ZeroTierOne.h @@ -141,7 +141,7 @@ extern "C" { /** * Maximum allowed cluster message length in bytes */ -#define ZT_CLUSTER_MAX_MESSAGE_LENGTH (1444 * 6) +#define ZT_CLUSTER_MAX_MESSAGE_LENGTH (1444 * 4) /** * A null/empty sockaddr (all zero) to signify an unspecified socket address diff --git a/node/Cluster.hpp b/node/Cluster.hpp index 7d0c6a08..bb7d3b39 100644 --- a/node/Cluster.hpp +++ b/node/Cluster.hpp @@ -57,7 +57,7 @@ /** * Desired period between doPeriodicTasks() in milliseconds */ -#define ZT_CLUSTER_PERIODIC_TASK_PERIOD 100 +#define ZT_CLUSTER_PERIODIC_TASK_PERIOD 50 namespace ZeroTier { -- cgit v1.2.3 From f692cec763d67caae54a4f47446657c390563319 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Tue, 27 Oct 2015 14:04:12 -0700 Subject: Change how cluster relays packets -- just PROXY_UNITE and then send packet via normal ZeroTier front plane -- more efficient and eliminates fragmentation issues. 
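The patch below replaces whole-packet relaying between cluster members (the old STATE_MESSAGE_RELAY) with a small STATE_MESSAGE_PROXY_UNITE message plus direct forwarding of the raw packet to the target member's ZeroTier physical endpoint, which is what sidesteps the fragmentation problem mentioned in the subject. A minimal sketch of the PROXY_UNITE payload layout documented in the message-format comment below, under stated assumptions: std::vector<uint8_t> stands in for ZeroTier's Buffer<>, and the endpoint encoding is deliberately simplified rather than the real InetAddress::serialize() wire format.

#include <cstdint>
#include <vector>

// A 40-bit ZeroTier address is written big-endian as 5 bytes (ZT_ADDRESS_LENGTH).
static void appendZtAddress(std::vector<uint8_t> &buf, uint64_t addr)
{
	for (int shift = 32; shift >= 0; shift -= 8)
		buf.push_back(static_cast<uint8_t>((addr >> shift) & 0xff));
}

// Simplified stand-in for a serialized InetAddress: length-tagged IP plus port.
struct SimpleEndpoint
{
	uint8_t ipLen;   // 4 for IPv4, 16 for IPv6
	uint8_t ip[16];
	uint16_t port;
};

static void appendEndpoint(std::vector<uint8_t> &buf, const SimpleEndpoint &ep)
{
	buf.push_back(ep.ipLen);
	buf.insert(buf.end(), ep.ip, ep.ip + ep.ipLen);
	buf.push_back(static_cast<uint8_t>(ep.port >> 8));
	buf.push_back(static_cast<uint8_t>(ep.port & 0xff));
}

// <[5] ZT address of peer on recipient's side> <[5] ZT address of peer on sender's side>
// <[1] 8-bit number of sender's peer's active paths> <[...] serialized paths>
std::vector<uint8_t> makeProxyUnitePayload(uint64_t toPeer, uint64_t fromPeer,
                                           const std::vector<SimpleEndpoint> &fromPeerPaths)
{
	std::vector<uint8_t> buf;
	appendZtAddress(buf, toPeer);
	appendZtAddress(buf, fromPeer);
	buf.push_back(static_cast<uint8_t>(fromPeerPaths.size()));
	for (const SimpleEndpoint &ep : fromPeerPaths)
		appendEndpoint(buf, ep);
	return buf;
}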
--- include/ZeroTierOne.h | 2 +- node/Cluster.cpp | 195 +++++++++++++++++++++++++------------------------- node/Cluster.hpp | 22 +++--- node/Switch.cpp | 37 +++++----- node/Switch.hpp | 8 +-- 5 files changed, 133 insertions(+), 131 deletions(-) (limited to 'node/Cluster.hpp') diff --git a/include/ZeroTierOne.h b/include/ZeroTierOne.h index 7371b9f0..3457634b 100644 --- a/include/ZeroTierOne.h +++ b/include/ZeroTierOne.h @@ -141,7 +141,7 @@ extern "C" { /** * Maximum allowed cluster message length in bytes */ -#define ZT_CLUSTER_MAX_MESSAGE_LENGTH (1444 * 4) +#define ZT_CLUSTER_MAX_MESSAGE_LENGTH (1500 - 48) /** * A null/empty sockaddr (all zero) to signify an unspecified socket address diff --git a/node/Cluster.cpp b/node/Cluster.cpp index eef02bc7..c18663bc 100644 --- a/node/Cluster.cpp +++ b/node/Cluster.cpp @@ -250,102 +250,87 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len) } } break; - case STATE_MESSAGE_RELAY: { + case STATE_MESSAGE_PROXY_UNITE: { + const Address localPeerAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH; + const Address remotePeerAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH; const unsigned int numRemotePeerPaths = dmsg[ptr++]; InetAddress remotePeerPaths[256]; // size is 8-bit, so 256 is max for(unsigned int i=0;i(ptr); ptr += 2; - const void *packet = (const void *)dmsg.field(ptr,packetLen); ptr += packetLen; - - if (packetLen >= ZT_PROTO_MIN_FRAGMENT_LENGTH) { // ignore anything too short to contain a dest address - const Address destinationAddress(reinterpret_cast(packet) + 8,ZT_ADDRESS_LENGTH); - TRACE("[%u] relay %u bytes to %s (%u remote paths included)",(unsigned int)fromMemberId,packetLen,destinationAddress.toString().c_str(),numRemotePeerPaths); - - SharedPtr destinationPeer(RR->topology->getPeer(destinationAddress)); - if (destinationPeer) { - if ( - (destinationPeer->send(RR,packet,packetLen,RR->node->now()))&& - (numRemotePeerPaths > 0)&& - (packetLen >= 18)&& - (reinterpret_cast(packet)[ZT_PACKET_FRAGMENT_IDX_FRAGMENT_INDICATOR] == ZT_PACKET_FRAGMENT_INDICATOR) - ) { - // If remote peer paths were sent with this relayed packet, we do - // RENDEZVOUS. It's handled here for cluster-relayed packets since - // we don't have both Peer records so this is a different path. 
- - const Address remotePeerAddress(reinterpret_cast(packet) + 13,ZT_ADDRESS_LENGTH); - - InetAddress bestDestV4,bestDestV6; - destinationPeer->getBestActiveAddresses(RR->node->now(),bestDestV4,bestDestV6); - InetAddress bestRemoteV4,bestRemoteV6; - for(unsigned int i=0;iidentity.address(),Packet::VERB_RENDEZVOUS); - rendezvousForDest.append((uint8_t)0); - remotePeerAddress.appendTo(rendezvousForDest); - - Buffer<2048> rendezvousForOtherEnd; - remotePeerAddress.appendTo(rendezvousForOtherEnd); - rendezvousForOtherEnd.append((uint8_t)Packet::VERB_RENDEZVOUS); - const unsigned int rendezvousForOtherEndPayloadSizePtr = rendezvousForOtherEnd.size(); - rendezvousForOtherEnd.addSize(2); // space for actual packet payload length - rendezvousForOtherEnd.append((uint8_t)0); // flags == 0 - destinationAddress.appendTo(rendezvousForOtherEnd); - - bool haveMatch = false; - if ((bestDestV6)&&(bestRemoteV6)) { - haveMatch = true; - - rendezvousForDest.append((uint16_t)bestRemoteV6.port()); - rendezvousForDest.append((uint8_t)16); - rendezvousForDest.append(bestRemoteV6.rawIpData(),16); - - rendezvousForOtherEnd.append((uint16_t)bestDestV6.port()); - rendezvousForOtherEnd.append((uint8_t)16); - rendezvousForOtherEnd.append(bestDestV6.rawIpData(),16); - rendezvousForOtherEnd.setAt(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 16)); - } else if ((bestDestV4)&&(bestRemoteV4)) { - haveMatch = true; - - rendezvousForDest.append((uint16_t)bestRemoteV4.port()); - rendezvousForDest.append((uint8_t)4); - rendezvousForDest.append(bestRemoteV4.rawIpData(),4); - - rendezvousForOtherEnd.append((uint16_t)bestDestV4.port()); - rendezvousForOtherEnd.append((uint8_t)4); - rendezvousForOtherEnd.append(bestDestV4.rawIpData(),4); - rendezvousForOtherEnd.setAt(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 4)); - } - if (haveMatch) { - _send(fromMemberId,STATE_MESSAGE_PROXY_SEND,rendezvousForOtherEnd.data(),rendezvousForOtherEnd.size()); - RR->sw->send(rendezvousForDest,true,0); - } + TRACE("[%u] requested proxy unite between local peer %s and remote peer %s",(unsigned int)fromMemberId,localPeerAddress.toString().c_str(),remotePeerAddress.toString().c_str()); + + SharedPtr localPeer(RR->topology->getPeer(localPeerAddress)); + if ((localPeer)&&(numRemotePeerPaths > 0)) { + InetAddress bestLocalV4,bestLocalV6; + localPeer->getBestActiveAddresses(RR->node->now(),bestLocalV4,bestLocalV6); + + InetAddress bestRemoteV4,bestRemoteV6; + for(unsigned int i=0;iidentity.address(),Packet::VERB_RENDEZVOUS); + rendezvousForLocal.append((uint8_t)0); + remotePeerAddress.appendTo(rendezvousForLocal); + + Buffer<2048> rendezvousForRemote; + remotePeerAddress.appendTo(rendezvousForRemote); + rendezvousForRemote.append((uint8_t)Packet::VERB_RENDEZVOUS); + const unsigned int rendezvousForOtherEndPayloadSizePtr = rendezvousForRemote.size(); + rendezvousForRemote.addSize(2); // space for actual packet payload length + rendezvousForRemote.append((uint8_t)0); // flags == 0 + localPeerAddress.appendTo(rendezvousForRemote); + + bool haveMatch = false; + if ((bestLocalV6)&&(bestRemoteV6)) { + haveMatch = true; + + rendezvousForLocal.append((uint16_t)bestRemoteV6.port()); + rendezvousForLocal.append((uint8_t)16); + rendezvousForLocal.append(bestRemoteV6.rawIpData(),16); + + rendezvousForRemote.append((uint16_t)bestLocalV6.port()); + rendezvousForRemote.append((uint8_t)16); + rendezvousForRemote.append(bestLocalV6.rawIpData(),16); + rendezvousForRemote.setAt(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 16)); + } else if 
((bestLocalV4)&&(bestRemoteV4)) { + haveMatch = true; + + rendezvousForLocal.append((uint16_t)bestRemoteV4.port()); + rendezvousForLocal.append((uint8_t)4); + rendezvousForLocal.append(bestRemoteV4.rawIpData(),4); + + rendezvousForRemote.append((uint16_t)bestLocalV4.port()); + rendezvousForRemote.append((uint8_t)4); + rendezvousForRemote.append(bestLocalV4.rawIpData(),4); + rendezvousForRemote.setAt(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 4)); + } + + if (haveMatch) { + _send(fromMemberId,STATE_MESSAGE_PROXY_SEND,rendezvousForRemote.data(),rendezvousForRemote.size()); + RR->sw->send(rendezvousForLocal,true,0); + } } } break; case STATE_MESSAGE_PROXY_SEND: { - const Address rcpt(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); + const Address rcpt(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH; const Packet::Verb verb = (Packet::Verb)dmsg[ptr++]; const unsigned int len = dmsg.at(ptr); ptr += 2; Packet outp(rcpt,RR->identity.address(),verb); - outp.append(dmsg.field(ptr,len),len); + outp.append(dmsg.field(ptr,len),len); ptr += len; RR->sw->send(outp,true,0); TRACE("[%u] proxy send %s to %s length %u",(unsigned int)fromMemberId,Packet::verbString(verb),rcpt.toString().c_str(),len); } break; @@ -364,13 +349,13 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len) } } -bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPeerAddress,const void *data,unsigned int len) +bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPeerAddress,const void *data,unsigned int len,bool unite) { if (len > 16384) // sanity check return false; uint64_t mostRecentTimestamp = 0; - uint16_t canHasPeer = 0; + unsigned int canHasPeer = 0; { // Anyone got this peer? Mutex::Lock _l2(_peerAffinities_m); @@ -387,25 +372,37 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee const uint64_t now = RR->node->now(); if ((now - mostRecentTimestamp) < ZT_PEER_ACTIVITY_TIMEOUT) { - Buffer<16384> buf; - - InetAddress v4,v6; - if (fromPeerAddress) { - SharedPtr fromPeer(RR->topology->getPeer(fromPeerAddress)); - if (fromPeer) - fromPeer->getBestActiveAddresses(now,v4,v6); + Buffer<2048> buf; + + if (unite) { + InetAddress v4,v6; + if (fromPeerAddress) { + SharedPtr fromPeer(RR->topology->getPeer(fromPeerAddress)); + if (fromPeer) + fromPeer->getBestActiveAddresses(now,v4,v6); + } + uint8_t addrCount = 0; + if (v4) + ++addrCount; + if (v6) + ++addrCount; + if (addrCount) { + toPeerAddress.appendTo(buf); + fromPeerAddress.appendTo(buf); + buf.append(addrCount); + if (v4) + v4.serialize(buf); + if (v6) + v6.serialize(buf); + } } - buf.append((uint8_t)( (v4) ? ((v6) ? 2 : 1) : ((v6) ? 
1 : 0) )); - if (v4) - v4.serialize(buf); - if (v6) - v6.serialize(buf); - buf.append((uint16_t)len); - buf.append(data,len); { Mutex::Lock _l2(_members[canHasPeer].lock); - _send(canHasPeer,STATE_MESSAGE_RELAY,buf.data(),buf.size()); + if (buf.size() > 0) + _send(canHasPeer,STATE_MESSAGE_PROXY_UNITE,buf.data(),buf.size()); + if (_members[canHasPeer].zeroTierPhysicalEndpoints.size() > 0) + RR->node->putPacket(InetAddress(),_members[canHasPeer].zeroTierPhysicalEndpoints.front(),data,len); } TRACE("sendViaCluster(): relaying %u bytes from %s to %s by way of %u",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)canHasPeer); diff --git a/node/Cluster.hpp b/node/Cluster.hpp index bb7d3b39..282d8120 100644 --- a/node/Cluster.hpp +++ b/node/Cluster.hpp @@ -57,7 +57,7 @@ /** * Desired period between doPeriodicTasks() in milliseconds */ -#define ZT_CLUSTER_PERIODIC_TASK_PERIOD 50 +#define ZT_CLUSTER_PERIODIC_TASK_PERIOD 100 namespace ZeroTier { @@ -136,13 +136,18 @@ public: STATE_MESSAGE_COM = 4, /** - * Relay a packet to a peer: - * <[1] 8-bit number of sending peer active path addresses> - * <[...] series of serialized InetAddresses of sending peer's paths> - * <[2] 16-bit packet length> - * <[...] packet or packet fragment> + * Request that VERB_RENDEZVOUS be sent to a peer that we have: + * <[5] ZeroTier address of peer on recipient's side> + * <[5] ZeroTier address of peer on sender's side> + * <[1] 8-bit number of sender's peer's active path addresses> + * <[...] series of serialized InetAddresses of sender's peer's paths> + * + * This requests that we perform NAT-t introduction between a peer that + * we have and one on the sender's side. The sender furnishes contact + * info for its peer, and we send VERB_RENDEZVOUS to both sides: to ours + * directly and with PROXY_SEND to theirs. 
*/ - STATE_MESSAGE_RELAY = 5, + STATE_MESSAGE_PROXY_UNITE = 5, /** * Request that a cluster member send a packet to a locally-known peer: @@ -211,9 +216,10 @@ public: * @param toPeerAddress Destination peer address * @param data Packet or packet fragment data * @param len Length of packet or fragment + * @param unite If true, also request proxy unite across cluster * @return True if this data was sent via another cluster member, false if none have this peer */ - bool sendViaCluster(const Address &fromPeerAddress,const Address &toPeerAddress,const void *data,unsigned int len); + bool sendViaCluster(const Address &fromPeerAddress,const Address &toPeerAddress,const void *data,unsigned int len,bool unite); /** * Advertise to the cluster that we have this peer diff --git a/node/Switch.cpp b/node/Switch.cpp index 709ed802..772eaf02 100644 --- a/node/Switch.cpp +++ b/node/Switch.cpp @@ -303,11 +303,10 @@ void Switch::send(const Packet &packet,bool encrypt,uint64_t nwid) } } -bool Switch::unite(const Address &p1,const Address &p2,bool force) +bool Switch::unite(const Address &p1,const Address &p2) { if ((p1 == RR->identity.address())||(p2 == RR->identity.address())) return false; - SharedPtr p1p = RR->topology->getPeer(p1); if (!p1p) return false; @@ -317,14 +316,6 @@ bool Switch::unite(const Address &p1,const Address &p2,bool force) const uint64_t now = RR->node->now(); - { - Mutex::Lock _l(_lastUniteAttempt_m); - uint64_t &luts = _lastUniteAttempt[_LastUniteKey(p1,p2)]; - if (((now - luts) < ZT_MIN_UNITE_INTERVAL)&&(!force)) - return false; - luts = now; - } - std::pair cg(Peer::findCommonGround(*p1p,*p2p,now)); if ((!(cg.first))||(cg.first.ipScope() != cg.second.ipScope())) return false; @@ -571,7 +562,7 @@ void Switch::_handleRemotePacketFragment(const InetAddress &localAddr,const Inet SharedPtr relayTo = RR->topology->getPeer(destination); if ((!relayTo)||(!relayTo->send(RR,fragment.data(),fragment.size(),RR->node->now()))) { #ifdef ZT_ENABLE_CLUSTER - if ((RR->cluster)&&(RR->cluster->sendViaCluster(Address(),destination,fragment.data(),fragment.size()))) + if ((RR->cluster)&&(RR->cluster->sendViaCluster(Address(),destination,fragment.data(),fragment.size(),false))) return; // sent by way of another member of this cluster #endif @@ -634,7 +625,8 @@ void Switch::_handleRemotePacketFragment(const InetAddress &localAddr,const Inet void Switch::_handleRemotePacketHead(const InetAddress &localAddr,const InetAddress &fromAddr,const void *data,unsigned int len) { - SharedPtr packet(new IncomingPacket(data,len,localAddr,fromAddr,RR->node->now())); + const uint64_t now = RR->node->now(); + SharedPtr packet(new IncomingPacket(data,len,localAddr,fromAddr,now)); Address source(packet->source()); Address destination(packet->destination()); @@ -652,17 +644,18 @@ void Switch::_handleRemotePacketHead(const InetAddress &localAddr,const InetAddr packet->incrementHops(); SharedPtr relayTo = RR->topology->getPeer(destination); - if ((relayTo)&&((relayTo->send(RR,packet->data(),packet->size(),RR->node->now())))) { - unite(source,destination,false); + if ((relayTo)&&((relayTo->send(RR,packet->data(),packet->size(),now)))) { + if (_shouldTryUnite(now,source,destination)) + unite(source,destination); } else { #ifdef ZT_ENABLE_CLUSTER - if ((RR->cluster)&&(RR->cluster->sendViaCluster(source,destination,packet->data(),packet->size()))) + if ((RR->cluster)&&(RR->cluster->sendViaCluster(source,destination,packet->data(),packet->size(),_shouldTryUnite(now,source,destination)))) return; // sent by way of another member of 
this cluster #endif relayTo = RR->topology->getBestRoot(&source,1,true); if (relayTo) - relayTo->send(RR,packet->data(),packet->size(),RR->node->now()); + relayTo->send(RR,packet->data(),packet->size(),now); } } else { TRACE("dropped relay %s(%s) -> %s, max hops exceeded",packet->source().toString().c_str(),fromAddr.toString().c_str(),destination.toString().c_str()); @@ -677,7 +670,7 @@ void Switch::_handleRemotePacketHead(const InetAddress &localAddr,const InetAddr if (!dq.creationTime) { // If we have no other fragments yet, create an entry and save the head - dq.creationTime = RR->node->now(); + dq.creationTime = now; dq.frag0 = packet; dq.totalFragments = 0; // 0 == unknown, waiting for Packet::Fragment dq.haveFragments = 1; // head is first bit (left to right) @@ -805,4 +798,14 @@ bool Switch::_trySend(const Packet &packet,bool encrypt,uint64_t nwid) return false; } +bool Switch::_shouldTryUnite(const uint64_t now,const Address &p1,const Address &p2) +{ + Mutex::Lock _l(_lastUniteAttempt_m); + uint64_t &luts = _lastUniteAttempt[_LastUniteKey(p1,p2)]; + if ((now - luts) < ZT_MIN_UNITE_INTERVAL) + return false; + luts = now; + return true; +} + } // namespace ZeroTier diff --git a/node/Switch.hpp b/node/Switch.hpp index 3bdc0c47..42e87ca5 100644 --- a/node/Switch.hpp +++ b/node/Switch.hpp @@ -127,15 +127,10 @@ public: * This only works if both peers are known, with known working direct * links to this peer. The best link for each peer is sent to the other. * - * A rate limiter is in effect via the _lastUniteAttempt map. If force - * is true, a unite attempt is made even if one has been made less than - * ZT_MIN_UNITE_INTERVAL milliseconds ago. - * * @param p1 One of two peers (order doesn't matter) * @param p2 Second of pair - * @param force If true, send now regardless of interval */ - bool unite(const Address &p1,const Address &p2,bool force); + bool unite(const Address &p1,const Address &p2); /** * Attempt NAT traversal to peer at a given physical address @@ -185,6 +180,7 @@ private: void _handleRemotePacketHead(const InetAddress &localAddr,const InetAddress &fromAddr,const void *data,unsigned int len); Address _sendWhoisRequest(const Address &addr,const Address *peersAlreadyConsulted,unsigned int numPeersAlreadyConsulted); bool _trySend(const Packet &packet,bool encrypt,uint64_t nwid); + bool _shouldTryUnite(const uint64_t now,const Address &p1,const Address &p2); const RuntimeEnvironment *const RR; uint64_t _lastBeaconResponse; -- cgit v1.2.3 From 40976c02a42b8e9078519f92a7c7412b8464e9bc Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Tue, 27 Oct 2015 14:37:38 -0700 Subject: Forget paths to peers if we are handing them off. 
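The commit below also changes findBetterEndpoint() to report through its return value and write the suggested endpoint into an out-parameter, so Peer::received() can both send the redirect and remember that the peer is being handed off (and therefore skip learning or confirming the path it just arrived on). A minimal sketch of that calling pattern with simplified stand-in types; the stub body is purely illustrative and not the real geolocation logic.

#include <cstdint>
#include <string>

struct EndpointLike { std::string ip; uint16_t port = 0; bool valid = false; };

// Stand-in with the same shape as Cluster::findBetterEndpoint(); the real
// implementation compares 3-D distances between cluster members.
static bool findBetterEndpoint(EndpointLike &redirectTo, uint64_t peerAddress,
                               const EndpointLike &currentPath, bool offload)
{
	(void)redirectTo; (void)peerAddress; (void)currentPath; (void)offload;
	return false; // stub: pretend we are always the best member
}

void onPacketReceived(uint64_t peerAddress, const EndpointLike &remoteAddr)
{
	EndpointLike redirectTo;
	if (findBetterEndpoint(redirectTo, peerAddress, remoteAddr, false)) {
		// Send PUSH_DIRECT_PATHS (newer peers) or RENDEZVOUS (older peers)
		// pointing at redirectTo, then stop: don't learn or confirm this
		// path, since the peer now belongs to another cluster member.
		return;
	}
	// ...normal path learning and multicast announcement continue here...
}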
--- node/Cluster.cpp | 14 ++++----- node/Cluster.hpp | 15 +++------- node/Peer.cpp | 88 +++++++++++++++++++++++++++++--------------------------- 3 files changed, 56 insertions(+), 61 deletions(-) (limited to 'node/Cluster.hpp') diff --git a/node/Cluster.cpp b/node/Cluster.cpp index c18663bc..73ff5846 100644 --- a/node/Cluster.cpp +++ b/node/Cluster.cpp @@ -563,17 +563,14 @@ void Cluster::removeMember(uint16_t memberId) _memberIds = newMemberIds; } -InetAddress Cluster::findBetterEndpoint(const Address &peerAddress,const InetAddress &peerPhysicalAddress,bool offload) +bool Cluster::findBetterEndpoint(InetAddress &redirectTo,const Address &peerAddress,const InetAddress &peerPhysicalAddress,bool offload) { - if (!peerPhysicalAddress) // sanity check - return InetAddress(); - if (_addressToLocationFunction) { // Pick based on location if it can be determined int px = 0,py = 0,pz = 0; if (_addressToLocationFunction(_addressToLocationFunctionArg,reinterpret_cast(&peerPhysicalAddress),&px,&py,&pz) == 0) { TRACE("no geolocation data for %s (geo-lookup is lazy/async so it may work next time)",peerPhysicalAddress.toIpString().c_str()); - return InetAddress(); + return false; } // Find member closest to this peer @@ -603,14 +600,15 @@ InetAddress Cluster::findBetterEndpoint(const Address &peerAddress,const InetAdd for(std::vector::const_iterator a(best.begin());a!=best.end();++a) { if (a->ss_family == peerPhysicalAddress.ss_family) { TRACE("%s at [%d,%d,%d] is %f from us but %f from %u, can redirect to %s",peerAddress.toString().c_str(),px,py,pz,currentDistance,bestDistance,bestMember,a->toString().c_str()); - return *a; + redirectTo = *a; + return true; } } TRACE("%s at [%d,%d,%d] is %f from us, no better endpoints found",peerAddress.toString().c_str(),px,py,pz,currentDistance); - return InetAddress(); + return false; } else { // TODO: pick based on load if no location info? - return InetAddress(); + return false; } } diff --git a/node/Cluster.hpp b/node/Cluster.hpp index 282d8120..45395b0f 100644 --- a/node/Cluster.hpp +++ b/node/Cluster.hpp @@ -264,22 +264,15 @@ public: void removeMember(uint16_t memberId); /** - * Find a better cluster endpoint for this peer - * - * If this endpoint appears to be the best, a NULL/0 InetAddres is returned. - * Otherwise the InetAddress of a better endpoint is returned and the peer - * can then then be told to contact us there. - * - * Redirection is only done within the same address family, so the returned - * endpoint will always be the same ss_family as the supplied physical - * address. 
+ * Find a better cluster endpoint for this peer (if any) * + * @param redirectTo InetAddress to be set to a better endpoint (if there is one) * @param peerAddress Address of peer to (possibly) redirect * @param peerPhysicalAddress Physical address of peer's current best path (where packet was most recently received or getBestPath()->address()) * @param offload Always redirect if possible -- can be used to offload peers during shutdown - * @return InetAddress or NULL if there does not seem to be a better endpoint + * @return True if redirectTo was set to a new address, false if redirectTo was not modified */ - InetAddress findBetterEndpoint(const Address &peerAddress,const InetAddress &peerPhysicalAddress,bool offload); + bool findBetterEndpoint(InetAddress &redirectTo,const Address &peerAddress,const InetAddress &peerPhysicalAddress,bool offload); /** * Fill out ZT_ClusterStatus structure (from core API) diff --git a/node/Peer.cpp b/node/Peer.cpp index d5367b17..009e2be5 100644 --- a/node/Peer.cpp +++ b/node/Peer.cpp @@ -81,47 +81,49 @@ void Peer::received( Packet::Verb inReVerb) { #ifdef ZT_ENABLE_CLUSTER - bool redirected = false; - if ((RR->cluster)&&(hops == 0)&&(verb != Packet::VERB_OK)&&(verb != Packet::VERB_ERROR)&&(verb != Packet::VERB_RENDEZVOUS)&&(verb != Packet::VERB_PUSH_DIRECT_PATHS)) { - InetAddress redirectTo(RR->cluster->findBetterEndpoint(_id.address(),remoteAddr,false)); - if ((redirectTo.ss_family == AF_INET)||(redirectTo.ss_family == AF_INET6)) { - if (_vProto >= 5) { - // For newer peers we can send a more idiomatic verb: PUSH_DIRECT_PATHS. - Packet outp(_id.address(),RR->identity.address(),Packet::VERB_PUSH_DIRECT_PATHS); - outp.append((uint16_t)1); // count == 1 - outp.append((uint8_t)0); // no flags - outp.append((uint16_t)0); // no extensions - if (redirectTo.ss_family == AF_INET) { - outp.append((uint8_t)4); - outp.append((uint8_t)6); - outp.append(redirectTo.rawIpData(),4); - } else { - outp.append((uint8_t)6); - outp.append((uint8_t)18); - outp.append(redirectTo.rawIpData(),16); - } - outp.append((uint16_t)redirectTo.port()); - outp.armor(_key,true); - RR->antiRec->logOutgoingZT(outp.data(),outp.size()); - RR->node->putPacket(localAddr,remoteAddr,outp.data(),outp.size()); - } else { - // For older peers we use RENDEZVOUS to coax them into contacting us elsewhere. - Packet outp(_id.address(),RR->identity.address(),Packet::VERB_RENDEZVOUS); - outp.append((uint8_t)0); // no flags - RR->identity.address().appendTo(outp); - outp.append((uint16_t)redirectTo.port()); - if (redirectTo.ss_family == AF_INET) { - outp.append((uint8_t)4); - outp.append(redirectTo.rawIpData(),4); + InetAddress redirectTo; + if ((RR->cluster)&&(hops == 0)) { + // Note: findBetterEndpoint() is first since we still want to check + // for a better endpoint even if we don't actually send a redirect. + if ( (RR->cluster->findBetterEndpoint(redirectTo,_id.address(),remoteAddr,false)) && (verb != Packet::VERB_OK)&&(verb != Packet::VERB_ERROR)&&(verb != Packet::VERB_RENDEZVOUS)&&(verb != Packet::VERB_PUSH_DIRECT_PATHS) ) { + if ((redirectTo.ss_family == AF_INET)||(redirectTo.ss_family == AF_INET6)) { + if (_vProto >= 5) { + // For newer peers we can send a more idiomatic verb: PUSH_DIRECT_PATHS. 
+ Packet outp(_id.address(),RR->identity.address(),Packet::VERB_PUSH_DIRECT_PATHS); + outp.append((uint16_t)1); // count == 1 + outp.append((uint8_t)0); // no flags + outp.append((uint16_t)0); // no extensions + if (redirectTo.ss_family == AF_INET) { + outp.append((uint8_t)4); + outp.append((uint8_t)6); + outp.append(redirectTo.rawIpData(),4); + } else { + outp.append((uint8_t)6); + outp.append((uint8_t)18); + outp.append(redirectTo.rawIpData(),16); + } + outp.append((uint16_t)redirectTo.port()); + outp.armor(_key,true); + RR->antiRec->logOutgoingZT(outp.data(),outp.size()); + RR->node->putPacket(localAddr,remoteAddr,outp.data(),outp.size()); } else { - outp.append((uint8_t)16); - outp.append(redirectTo.rawIpData(),16); + // For older peers we use RENDEZVOUS to coax them into contacting us elsewhere. + Packet outp(_id.address(),RR->identity.address(),Packet::VERB_RENDEZVOUS); + outp.append((uint8_t)0); // no flags + RR->identity.address().appendTo(outp); + outp.append((uint16_t)redirectTo.port()); + if (redirectTo.ss_family == AF_INET) { + outp.append((uint8_t)4); + outp.append(redirectTo.rawIpData(),4); + } else { + outp.append((uint8_t)16); + outp.append(redirectTo.rawIpData(),16); + } + outp.armor(_key,true); + RR->antiRec->logOutgoingZT(outp.data(),outp.size()); + RR->node->putPacket(localAddr,remoteAddr,outp.data(),outp.size()); } - outp.armor(_key,true); - RR->antiRec->logOutgoingZT(outp.data(),outp.size()); - RR->node->putPacket(localAddr,remoteAddr,outp.data(),outp.size()); } - redirected = true; } } #endif @@ -140,11 +142,13 @@ void Peer::received( _lastMulticastFrame = now; #ifdef ZT_ENABLE_CLUSTER - // If we're in cluster mode and have sent the peer a better endpoint, stop - // here and don't confirm paths, replicate multicast info, etc. The new - // endpoint should do that. - if (redirected) + // If we're in cluster mode and there's a better endpoint, stop here and don't + // learn or confirm paths. Also reset any existing paths, since they should + // go there and no longer talk to us here. + if (redirectTo) { + _numPaths = 0; return; + } #endif if ((now - _lastAnnouncedTo) >= ((ZT_MULTICAST_LIKE_EXPIRE / 2) - 1000)) { -- cgit v1.2.3 From 6399f6f0940b6f20819d021a0dc3dcf0d289f002 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Tue, 27 Oct 2015 15:02:15 -0700 Subject: This no longer has to be quite so fast. --- node/Cluster.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'node/Cluster.hpp') diff --git a/node/Cluster.hpp b/node/Cluster.hpp index 45395b0f..bab56785 100644 --- a/node/Cluster.hpp +++ b/node/Cluster.hpp @@ -57,7 +57,7 @@ /** * Desired period between doPeriodicTasks() in milliseconds */ -#define ZT_CLUSTER_PERIODIC_TASK_PERIOD 100 +#define ZT_CLUSTER_PERIODIC_TASK_PERIOD 250 namespace ZeroTier { -- cgit v1.2.3 From cc6080fe3898ddd1419050ee3a2c45cc87dd140b Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Tue, 27 Oct 2015 15:57:26 -0700 Subject: (1) No need to confirm if we are a root (small optimization), (2) Refactor peer affinity tracking. 
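The affinity refactor in the patch below replaces the sorted vector of packed (address, member) keys with a hash table holding one small record per peer: which member last claimed it and when. The same record also rate-limits our own HAVE_PEER announcements. A sketch of the pattern, with std::unordered_map and std::mutex standing in for ZeroTier's Hashtable and Mutex, and with the announce-period comparison written in the corrected form used by the follow-up commit further down.

#include <cstdint>
#include <mutex>
#include <unordered_map>

struct PeerAffinity {
	uint64_t ts = 0;       // last time a member (or we) announced having this peer
	uint16_t mid = 0xffff; // cluster member ID that has the peer
};

class AffinityTable {
public:
	explicit AffinityTable(uint16_t selfId) : _selfId(selfId) {}

	// Record that member 'mid' announced it has 'peer' (HAVE_PEER received).
	void learn(uint64_t peer, uint16_t mid, uint64_t now)
	{
		std::lock_guard<std::mutex> l(_m);
		PeerAffinity &pa = _table[peer];
		pa.ts = now;
		pa.mid = mid;
	}

	// Does some other member have this peer, announced within 'timeout' ms?
	bool whoHas(uint64_t peer, uint64_t now, uint64_t timeout, uint16_t &mid) const
	{
		std::lock_guard<std::mutex> l(_m);
		const auto i = _table.find(peer);
		if (i == _table.end()) return false;
		if (i->second.mid == _selfId) return false;
		if ((now - i->second.ts) >= timeout) return false;
		mid = i->second.mid;
		return true;
	}

	// Should we announce this peer ourselves?  Announce when taking ownership,
	// then at most once per 'announcePeriod' thereafter.
	bool shouldAnnounce(uint64_t peer, uint64_t now, uint64_t announcePeriod)
	{
		std::lock_guard<std::mutex> l(_m);
		PeerAffinity &pa = _table[peer];
		if (pa.mid != _selfId) { pa.ts = now; pa.mid = _selfId; return true; }
		if ((now - pa.ts) < announcePeriod) return false;
		pa.ts = now;
		return true;
	}

private:
	const uint16_t _selfId;
	mutable std::mutex _m;
	std::unordered_map<uint64_t, PeerAffinity> _table;
};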
--- node/Cluster.cpp | 130 +++++++++++++++++++++++-------------------------------- node/Cluster.hpp | 29 ++++--------- node/Peer.cpp | 70 +++++++++++++++--------------- 3 files changed, 98 insertions(+), 131 deletions(-) (limited to 'node/Cluster.hpp') diff --git a/node/Cluster.cpp b/node/Cluster.cpp index 07ca0ba1..b2f3d585 100644 --- a/node/Cluster.cpp +++ b/node/Cluster.cpp @@ -82,7 +82,8 @@ Cluster::Cluster( _z(z), _id(id), _zeroTierPhysicalEndpoints(zeroTierPhysicalEndpoints), - _members(new _Member[ZT_CLUSTER_MAX_MEMBERS]) + _members(new _Member[ZT_CLUSTER_MAX_MEMBERS]), + _peerAffinities(65536) { uint16_t stmp[ZT_SHA512_DIGEST_LEN / sizeof(uint16_t)]; @@ -214,19 +215,12 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len) ptr += id.deserialize(dmsg,ptr); if (id) { RR->topology->saveIdentity(id); - - { // Add or update peer affinity entry - _PeerAffinity pa(id.address(),fromMemberId,RR->node->now()); + { Mutex::Lock _l2(_peerAffinities_m); - std::vector<_PeerAffinity>::iterator i(std::lower_bound(_peerAffinities.begin(),_peerAffinities.end(),pa)); // O(log(n)) - if ((i != _peerAffinities.end())&&(i->key == pa.key)) { - i->timestamp = pa.timestamp; - } else { - _peerAffinities.push_back(pa); - std::sort(_peerAffinities.begin(),_peerAffinities.end()); // probably a more efficient way to insert but okay for now - } + _PA &pa = _peerAffinities[id.address()]; + pa.ts = RR->node->now(); + pa.mid = fromMemberId; } - TRACE("[%u] has %s",(unsigned int)fromMemberId,id.address().toString().c_str()); } } catch ( ... ) { @@ -355,83 +349,66 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee if (len > 16384) // sanity check return false; - uint64_t mostRecentTimestamp = 0; + const uint64_t now = RR->node->now(); unsigned int canHasPeer = 0; { // Anyone got this peer? 
Mutex::Lock _l2(_peerAffinities_m); - std::vector<_PeerAffinity>::iterator i(std::lower_bound(_peerAffinities.begin(),_peerAffinities.end(),_PeerAffinity(toPeerAddress,0,0))); // O(log(n)) - while ((i != _peerAffinities.end())&&(i->address() == toPeerAddress)) { - const uint16_t mid = i->clusterMemberId(); - if ((mid != _id)&&(i->timestamp > mostRecentTimestamp)) { - mostRecentTimestamp = i->timestamp; - canHasPeer = mid; - } - ++i; - } + _PA *pa = _peerAffinities.get(toPeerAddress); + if ((pa)&&(pa->mid != _id)&&((now - pa->ts) < ZT_PEER_ACTIVITY_TIMEOUT)) + canHasPeer = pa->mid; + else return false; } - const uint64_t now = RR->node->now(); - if ((now - mostRecentTimestamp) < ZT_PEER_ACTIVITY_TIMEOUT) { - Buffer<2048> buf; - - if (unite) { - InetAddress v4,v6; - if (fromPeerAddress) { - SharedPtr fromPeer(RR->topology->getPeer(fromPeerAddress)); - if (fromPeer) - fromPeer->getBestActiveAddresses(now,v4,v6); - } - uint8_t addrCount = 0; + Buffer<2048> buf; + if (unite) { + InetAddress v4,v6; + if (fromPeerAddress) { + SharedPtr fromPeer(RR->topology->getPeer(fromPeerAddress)); + if (fromPeer) + fromPeer->getBestActiveAddresses(now,v4,v6); + } + uint8_t addrCount = 0; + if (v4) + ++addrCount; + if (v6) + ++addrCount; + if (addrCount) { + toPeerAddress.appendTo(buf); + fromPeerAddress.appendTo(buf); + buf.append(addrCount); if (v4) - ++addrCount; + v4.serialize(buf); if (v6) - ++addrCount; - if (addrCount) { - toPeerAddress.appendTo(buf); - fromPeerAddress.appendTo(buf); - buf.append(addrCount); - if (v4) - v4.serialize(buf); - if (v6) - v6.serialize(buf); - } + v6.serialize(buf); } + } + { + Mutex::Lock _l2(_members[canHasPeer].lock); + if (buf.size() > 0) + _send(canHasPeer,STATE_MESSAGE_PROXY_UNITE,buf.data(),buf.size()); + if (_members[canHasPeer].zeroTierPhysicalEndpoints.size() > 0) + RR->node->putPacket(InetAddress(),_members[canHasPeer].zeroTierPhysicalEndpoints.front(),data,len); + } - { - Mutex::Lock _l2(_members[canHasPeer].lock); - if (buf.size() > 0) - _send(canHasPeer,STATE_MESSAGE_PROXY_UNITE,buf.data(),buf.size()); - if (_members[canHasPeer].zeroTierPhysicalEndpoints.size() > 0) - RR->node->putPacket(InetAddress(),_members[canHasPeer].zeroTierPhysicalEndpoints.front(),data,len); - } + TRACE("sendViaCluster(): relaying %u bytes from %s to %s by way of %u",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)canHasPeer); - TRACE("sendViaCluster(): relaying %u bytes from %s to %s by way of %u",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)canHasPeer); - return true; - } else { - TRACE("sendViaCluster(): unable to relay %u bytes from %s to %s since no cluster members seem to have it!",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str()); - return false; - } + return true; } void Cluster::replicateHavePeer(const Identity &peerId) { + const uint64_t now = RR->node->now(); { // Use peer affinity table to track our own last announce time for peers - _PeerAffinity pa(peerId.address(),_id,RR->node->now()); Mutex::Lock _l2(_peerAffinities_m); - std::vector<_PeerAffinity>::iterator i(std::lower_bound(_peerAffinities.begin(),_peerAffinities.end(),pa)); // O(log(n)) - if ((i != _peerAffinities.end())&&(i->key == pa.key)) { - if ((pa.timestamp - i->timestamp) >= ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD) { - i->timestamp = pa.timestamp; - // continue to announcement - } else { - // we've already announced this peer recently, so skip - return; - } + _PA &pa = _peerAffinities[peerId.address()]; + if 
(pa.mid != _id) { + pa.ts = now; + pa.mid = _id; + } else if ((now - pa.ts) >= ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD) { + return; } else { - _peerAffinities.push_back(pa); - std::sort(_peerAffinities.begin(),_peerAffinities.end()); // probably a more efficient way to insert but okay for now - // continue to announcement + pa.ts = now; } } @@ -598,6 +575,7 @@ bool Cluster::findBetterEndpoint(InetAddress &redirectTo,const Address &peerAddr } } + // Suggestion redirection if a closer member was found for(std::vector::const_iterator a(best.begin());a!=best.end();++a) { if (a->ss_family == peerPhysicalAddress.ss_family) { TRACE("%s at [%d,%d,%d] is %f from us but %f from %u, can redirect to %s",peerAddress.toString().c_str(),px,py,pz,currentDistance,bestDistance,bestMember,a->toString().c_str()); @@ -661,10 +639,12 @@ void Cluster::status(ZT_ClusterStatus &status) const { Mutex::Lock _l2(_peerAffinities_m); - for(std::vector<_PeerAffinity>::const_iterator pi(_peerAffinities.begin());pi!=_peerAffinities.end();++pi) { - unsigned int mid = pi->clusterMemberId(); - if ((ms[mid])&&(mid != _id)&&((now - pi->timestamp) < ZT_PEER_ACTIVITY_TIMEOUT)) - ++ms[mid]->peers; + Address *k = (Address *)0; + _PA *v = (_PA *)0; + Hashtable< Address,_PA >::Iterator i(const_cast(this)->_peerAffinities); + while (i.next(k,v)) { + if ( (ms[v->mid]) && (v->mid != _id) && ((now - v->ts) < ZT_PEER_ACTIVITY_TIMEOUT) ) + ++ms[v->mid]->peers; } } } diff --git a/node/Cluster.hpp b/node/Cluster.hpp index bab56785..42a26c7f 100644 --- a/node/Cluster.hpp +++ b/node/Cluster.hpp @@ -52,7 +52,7 @@ /** * How often should we announce that we have a peer? */ -#define ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD 60000 +#define ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD 30000 /** * Desired period between doPeriodicTasks() in milliseconds @@ -285,7 +285,7 @@ private: void _send(uint16_t memberId,StateMessageType type,const void *msg,unsigned int len); void _flush(uint16_t memberId); - // These are initialized in the constructor and remain static + // These are initialized in the constructor and remain immutable uint16_t _masterSecret[ZT_SHA512_DIGEST_LEN / sizeof(uint16_t)]; unsigned char _key[ZT_PEER_SECRET_KEY_LENGTH]; const RuntimeEnvironment *RR; @@ -298,6 +298,7 @@ private: const int32_t _z; const uint16_t _id; const std::vector _zeroTierPhysicalEndpoints; + // end immutable fields struct _Member { @@ -330,30 +331,18 @@ private: _Member() { this->clear(); } ~_Member() { Utils::burn(key,sizeof(key)); } }; - - _Member *const _members; // cluster IDs can be from 0 to 65535 (16-bit) + _Member *const _members; std::vector _memberIds; Mutex _memberIds_m; - // Record tracking which members have which peers and how recently they claimed this -- also used to track our last claimed time - struct _PeerAffinity + struct _PA { - _PeerAffinity(const Address &a,uint16_t mid,uint64_t ts) : - key((a.toInt() << 16) | (uint64_t)mid), - timestamp(ts) {} - - uint64_t key; - uint64_t timestamp; - - inline Address address() const throw() { return Address(key >> 16); } - inline uint16_t clusterMemberId() const throw() { return (uint16_t)(key & 0xffff); } - - inline bool operator<(const _PeerAffinity &pi) const throw() { return (key < pi.key); } + _PA() : ts(0),mid(0xffff) {} + uint64_t ts; + uint16_t mid; }; - - // A memory-efficient packed map of _PeerAffinity records searchable with std::binary_search() and std::lower_bound() - std::vector<_PeerAffinity> _peerAffinities; + Hashtable< Address,_PA > _peerAffinities; Mutex _peerAffinities_m; }; diff --git 
a/node/Peer.cpp b/node/Peer.cpp index e56c1eca..99e2156e 100644 --- a/node/Peer.cpp +++ b/node/Peer.cpp @@ -86,43 +86,41 @@ void Peer::received( // Note: findBetterEndpoint() is first since we still want to check // for a better endpoint even if we don't actually send a redirect. if ( (RR->cluster->findBetterEndpoint(redirectTo,_id.address(),remoteAddr,false)) && (verb != Packet::VERB_OK)&&(verb != Packet::VERB_ERROR)&&(verb != Packet::VERB_RENDEZVOUS)&&(verb != Packet::VERB_PUSH_DIRECT_PATHS) ) { - if ((redirectTo.ss_family == AF_INET)||(redirectTo.ss_family == AF_INET6)) { - if (_vProto >= 5) { - // For newer peers we can send a more idiomatic verb: PUSH_DIRECT_PATHS. - Packet outp(_id.address(),RR->identity.address(),Packet::VERB_PUSH_DIRECT_PATHS); - outp.append((uint16_t)1); // count == 1 - outp.append((uint8_t)0); // no flags - outp.append((uint16_t)0); // no extensions - if (redirectTo.ss_family == AF_INET) { - outp.append((uint8_t)4); - outp.append((uint8_t)6); - outp.append(redirectTo.rawIpData(),4); - } else { - outp.append((uint8_t)6); - outp.append((uint8_t)18); - outp.append(redirectTo.rawIpData(),16); - } - outp.append((uint16_t)redirectTo.port()); - outp.armor(_key,true); - RR->antiRec->logOutgoingZT(outp.data(),outp.size()); - RR->node->putPacket(localAddr,remoteAddr,outp.data(),outp.size()); + if (_vProto >= 5) { + // For newer peers we can send a more idiomatic verb: PUSH_DIRECT_PATHS. + Packet outp(_id.address(),RR->identity.address(),Packet::VERB_PUSH_DIRECT_PATHS); + outp.append((uint16_t)1); // count == 1 + outp.append((uint8_t)0); // no flags + outp.append((uint16_t)0); // no extensions + if (redirectTo.ss_family == AF_INET) { + outp.append((uint8_t)4); + outp.append((uint8_t)6); + outp.append(redirectTo.rawIpData(),4); } else { - // For older peers we use RENDEZVOUS to coax them into contacting us elsewhere. - Packet outp(_id.address(),RR->identity.address(),Packet::VERB_RENDEZVOUS); - outp.append((uint8_t)0); // no flags - RR->identity.address().appendTo(outp); - outp.append((uint16_t)redirectTo.port()); - if (redirectTo.ss_family == AF_INET) { - outp.append((uint8_t)4); - outp.append(redirectTo.rawIpData(),4); - } else { - outp.append((uint8_t)16); - outp.append(redirectTo.rawIpData(),16); - } - outp.armor(_key,true); - RR->antiRec->logOutgoingZT(outp.data(),outp.size()); - RR->node->putPacket(localAddr,remoteAddr,outp.data(),outp.size()); + outp.append((uint8_t)6); + outp.append((uint8_t)18); + outp.append(redirectTo.rawIpData(),16); } + outp.append((uint16_t)redirectTo.port()); + outp.armor(_key,true); + RR->antiRec->logOutgoingZT(outp.data(),outp.size()); + RR->node->putPacket(localAddr,remoteAddr,outp.data(),outp.size()); + } else { + // For older peers we use RENDEZVOUS to coax them into contacting us elsewhere. 
+ Packet outp(_id.address(),RR->identity.address(),Packet::VERB_RENDEZVOUS); + outp.append((uint8_t)0); // no flags + RR->identity.address().appendTo(outp); + outp.append((uint16_t)redirectTo.port()); + if (redirectTo.ss_family == AF_INET) { + outp.append((uint8_t)4); + outp.append(redirectTo.rawIpData(),4); + } else { + outp.append((uint8_t)16); + outp.append(redirectTo.rawIpData(),16); + } + outp.armor(_key,true); + RR->antiRec->logOutgoingZT(outp.data(),outp.size()); + RR->node->putPacket(localAddr,remoteAddr,outp.data(),outp.size()); } } } @@ -167,7 +165,7 @@ void Peer::received( } if (!pathIsConfirmed) { - if (verb == Packet::VERB_OK) { + if ((verb == Packet::VERB_OK)||(RR->topology->amRoot())) { Path *slot = (Path *)0; if (np < ZT_MAX_PEER_NETWORK_PATHS) { -- cgit v1.2.3 From cc1b275ad97bf186f21b487aa57d7893bee3c956 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Tue, 27 Oct 2015 16:47:13 -0700 Subject: Replicate peer endpoints and forget paths if we have them -- this allows two clusters to talk to each other, whereas forgetting all paths does not. --- node/Cluster.cpp | 45 +++++++++++++++++++++++++++------------------ node/Cluster.hpp | 7 ++++++- node/Constants.hpp | 4 ++-- node/Peer.cpp | 11 ++++------- node/Peer.hpp | 19 +++++++++++++++++++ 5 files changed, 58 insertions(+), 28 deletions(-) (limited to 'node/Cluster.hpp') diff --git a/node/Cluster.cpp b/node/Cluster.cpp index b2f3d585..0797d83d 100644 --- a/node/Cluster.cpp +++ b/node/Cluster.cpp @@ -210,22 +210,30 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len) } break; case STATE_MESSAGE_HAVE_PEER: { - try { - Identity id; - ptr += id.deserialize(dmsg,ptr); - if (id) { - RR->topology->saveIdentity(id); - { - Mutex::Lock _l2(_peerAffinities_m); - _PA &pa = _peerAffinities[id.address()]; - pa.ts = RR->node->now(); - pa.mid = fromMemberId; - } - TRACE("[%u] has %s",(unsigned int)fromMemberId,id.address().toString().c_str()); - } - } catch ( ... 
) { - // ignore invalid identities - } + Identity id; + InetAddress physicalAddress; + ptr += id.deserialize(dmsg,ptr); + ptr += physicalAddress.deserialize(dmsg,ptr); + if (id) { + // Forget any paths that we have to this peer at its address + if (physicalAddress) { + SharedPtr myPeerRecord(RR->topology->getPeer(id.address())); + if (myPeerRecord) + myPeerRecord->removePathByAddress(physicalAddress); + } + + // Always save identity to update file time + RR->topology->saveIdentity(id); + + // Set peer affinity to its new home + { + Mutex::Lock _l2(_peerAffinities_m); + _PA &pa = _peerAffinities[id.address()]; + pa.ts = RR->node->now(); + pa.mid = fromMemberId; + } + TRACE("[%u] has %s @ %s",(unsigned int)fromMemberId,id.address().toString().c_str(),physicalAddress.toString().c_str()); + } } break; case STATE_MESSAGE_MULTICAST_LIKE: { @@ -396,7 +404,7 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee return true; } -void Cluster::replicateHavePeer(const Identity &peerId) +void Cluster::replicateHavePeer(const Identity &peerId,const InetAddress &physicalAddress) { const uint64_t now = RR->node->now(); { // Use peer affinity table to track our own last announce time for peers @@ -405,7 +413,7 @@ void Cluster::replicateHavePeer(const Identity &peerId) if (pa.mid != _id) { pa.ts = now; pa.mid = _id; - } else if ((now - pa.ts) >= ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD) { + } else if ((now - pa.ts) < ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD) { return; } else { pa.ts = now; @@ -415,6 +423,7 @@ void Cluster::replicateHavePeer(const Identity &peerId) // announcement Buffer<4096> buf; peerId.serialize(buf,false); + physicalAddress.serialize(buf); { Mutex::Lock _l(_memberIds_m); for(std::vector::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { diff --git a/node/Cluster.hpp b/node/Cluster.hpp index 42a26c7f..c3367d57 100644 --- a/node/Cluster.hpp +++ b/node/Cluster.hpp @@ -117,6 +117,10 @@ public: /** * Cluster member has this peer: * <[...] binary serialized peer identity> + * <[...] binary serialized peer remote physical address> + * + * Clusters send this message when they learn a path to a peer. The + * replicated physical address is the one learned. 
*/ STATE_MESSAGE_HAVE_PEER = 2, @@ -225,8 +229,9 @@ public: * Advertise to the cluster that we have this peer * * @param peerId Identity of peer that we have + * @param physicalAddress Physical address of peer (from our POV) */ - void replicateHavePeer(const Identity &peerId); + void replicateHavePeer(const Identity &peerId,const InetAddress &physicalAddress); /** * Advertise a multicast LIKE to the cluster diff --git a/node/Constants.hpp b/node/Constants.hpp index bef1183a..4b06db44 100644 --- a/node/Constants.hpp +++ b/node/Constants.hpp @@ -317,7 +317,7 @@ /** * Minimum delay between attempts to confirm new paths to peers (to avoid HELLO flooding) */ -#define ZT_MIN_PATH_CONFIRMATION_INTERVAL 5000 +#define ZT_MIN_PATH_CONFIRMATION_INTERVAL 1000 /** * Interval between direct path pushes in milliseconds @@ -350,7 +350,7 @@ /** * Maximum number of endpoints to contact per address type (to limit pushes like GitHub issue #235) */ -#define ZT_PUSH_DIRECT_PATHS_MAX_ENDPOINTS_PER_TYPE 2 +#define ZT_PUSH_DIRECT_PATHS_MAX_ENDPOINTS_PER_TYPE 4 /** * A test pseudo-network-ID that can be joined diff --git a/node/Peer.cpp b/node/Peer.cpp index 99e2156e..99eb32c7 100644 --- a/node/Peer.cpp +++ b/node/Peer.cpp @@ -140,13 +140,10 @@ void Peer::received( _lastMulticastFrame = now; #ifdef ZT_ENABLE_CLUSTER - // If we're in cluster mode and there's a better endpoint, stop here and don't - // learn or confirm paths. Also reset any existing paths, since they should - // go there and no longer talk to us here. - if (redirectTo) { - _numPaths = 0; + // If we think this peer belongs elsewhere, don't learn this path or + // do other connection init stuff. + if (redirectTo) return; - } #endif if ((now - _lastAnnouncedTo) >= ((ZT_MULTICAST_LIKE_EXPIRE / 2) - 1000)) { @@ -206,7 +203,7 @@ void Peer::received( #ifdef ZT_ENABLE_CLUSTER if ((RR->cluster)&&(pathIsConfirmed)) - RR->cluster->replicateHavePeer(_id); + RR->cluster->replicateHavePeer(_id,remoteAddr); #endif if (needMulticastGroupAnnounce) { diff --git a/node/Peer.hpp b/node/Peer.hpp index aa75b3f4..69343f20 100644 --- a/node/Peer.hpp +++ b/node/Peer.hpp @@ -412,6 +412,25 @@ public: */ void clean(const RuntimeEnvironment *RR,uint64_t now); + /** + * Remove all paths with this remote address + * + * @param addr Remote address to remove + */ + inline void removePathByAddress(const InetAddress &addr) + { + Mutex::Lock _l(_lock); + unsigned int np = _numPaths; + unsigned int x = 0; + unsigned int y = 0; + while (x < np) { + if (_paths[x].address() != addr) + _paths[y++] = _paths[x]; + ++x; + } + _numPaths = y; + } + /** * Find a common set of addresses by which two peers can link, if any * -- cgit v1.2.3 From 51fcc753549e4f7c18efb889a841c4dd4fb9e6cf Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Tue, 27 Oct 2015 17:36:47 -0700 Subject: Some cleanup, and use getPeerNoCache() exclusively in Cluster. 
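Besides switching to getPeerNoCache(), the cleanup below adds a periodic sweep in doPeriodicTasks() so the affinity table cannot grow without bound: roughly every ten peer-activity timeouts, entries that have not been refreshed within that window are erased. A minimal sketch of the sweep, with std::unordered_map standing in for ZeroTier's Hashtable, which means the erase-while-iterating has to use the iterator returned by erase().

#include <cstdint>
#include <unordered_map>

struct PeerAffinity { uint64_t ts = 0; uint16_t mid = 0xffff; };

// Drop entries whose timestamp is older than 'staleAfter' (e.g. ZT_PEER_ACTIVITY_TIMEOUT * 10).
void sweepStaleAffinities(std::unordered_map<uint64_t, PeerAffinity> &table,
                          uint64_t now, uint64_t staleAfter)
{
	for (auto i = table.begin(); i != table.end(); ) {
		if ((now - i->second.ts) >= staleAfter)
			i = table.erase(i); // erase() returns the next valid iterator
		else
			++i;
	}
}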
--- node/Cluster.cpp | 42 ++++++++++++++++++++++++++++++------------ node/Cluster.hpp | 3 ++- node/Path.hpp | 8 ++++++++ 3 files changed, 40 insertions(+), 13 deletions(-) (limited to 'node/Cluster.hpp') diff --git a/node/Cluster.cpp b/node/Cluster.cpp index 0535f9ee..9b034822 100644 --- a/node/Cluster.cpp +++ b/node/Cluster.cpp @@ -83,7 +83,8 @@ Cluster::Cluster( _id(id), _zeroTierPhysicalEndpoints(zeroTierPhysicalEndpoints), _members(new _Member[ZT_CLUSTER_MAX_MEMBERS]), - _peerAffinities(65536) + _peerAffinities(65536), + _lastCleanedPeerAffinities(0) { uint16_t stmp[ZT_SHA512_DIGEST_LEN / sizeof(uint16_t)]; @@ -247,11 +248,13 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len) } break; case STATE_MESSAGE_COM: { + /* not currently used so not decoded yet CertificateOfMembership com; ptr += com.deserialize(dmsg,ptr); if (com) { TRACE("[%u] COM for %s on %.16llu rev %llu",(unsigned int)fromMemberId,com.issuedTo().toString().c_str(),com.networkId(),com.revision()); } + */ } break; case STATE_MESSAGE_PROXY_UNITE: { @@ -262,12 +265,13 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len) for(unsigned int i=0;i localPeer(RR->topology->getPeer(localPeerAddress)); + const uint64_t now = RR->node->now(); + SharedPtr localPeer(RR->topology->getPeerNoCache(localPeerAddress,now)); if ((localPeer)&&(numRemotePeerPaths > 0)) { InetAddress bestLocalV4,bestLocalV6; - localPeer->getBestActiveAddresses(RR->node->now(),bestLocalV4,bestLocalV6); + localPeer->getBestActiveAddresses(now,bestLocalV4,bestLocalV6); InetAddress bestRemoteV4,bestRemoteV6; for(unsigned int i=0;i buf; + Buffer<1024> buf; if (unite) { InetAddress v4,v6; if (fromPeerAddress) { - SharedPtr fromPeer(RR->topology->getPeer(fromPeerAddress)); + SharedPtr fromPeer(RR->topology->getPeerNoCache(fromPeerAddress,now)); if (fromPeer) fromPeer->getBestActiveAddresses(now,v4,v6); } @@ -408,7 +412,7 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee void Cluster::replicateHavePeer(const Identity &peerId,const InetAddress &physicalAddress) { const uint64_t now = RR->node->now(); - { // Use peer affinity table to track our own last announce time for peers + { Mutex::Lock _l2(_peerAffinities_m); _PA &pa = _peerAffinities[peerId.address()]; if (pa.mid != _id) { @@ -436,7 +440,7 @@ void Cluster::replicateHavePeer(const Identity &peerId,const InetAddress &physic void Cluster::replicateMulticastLike(uint64_t nwid,const Address &peerAddress,const MulticastGroup &group) { - Buffer<2048> buf; + Buffer<1024> buf; buf.append((uint64_t)nwid); peerAddress.appendTo(buf); group.mac().appendTo(buf); @@ -453,7 +457,7 @@ void Cluster::replicateMulticastLike(uint64_t nwid,const Address &peerAddress,co void Cluster::replicateCertificateOfNetworkMembership(const CertificateOfMembership &com) { - Buffer<2048> buf; + Buffer<4096> buf; com.serialize(buf); TRACE("replicating %s COM for %.16llx to all members",com.issuedTo().toString().c_str(),com.networkId()); { @@ -502,6 +506,20 @@ void Cluster::doPeriodicTasks() _flush(*mid); // does nothing if nothing to flush } } + + { + if ((now - _lastCleanedPeerAffinities) >= (ZT_PEER_ACTIVITY_TIMEOUT * 10)) { + _lastCleanedPeerAffinities = now; + Address *k = (Address *)0; + _PA *v = (_PA *)0; + Mutex::Lock _l(_peerAffinities_m); + Hashtable< Address,_PA >::Iterator i(_peerAffinities); + while (i.next(k,v)) { + if ((now - v->ts) >= (ZT_PEER_ACTIVITY_TIMEOUT * 10)) + _peerAffinities.erase(*k); + } + } + } } void Cluster::addMember(uint16_t memberId) 
@@ -563,7 +581,7 @@ bool Cluster::findBetterEndpoint(InetAddress &redirectTo,const Address &peerAddr // Find member closest to this peer const uint64_t now = RR->node->now(); - std::vector best; // initial "best" is for peer to stay put + std::vector best; const double currentDistance = _dist3d(_x,_y,_z,px,py,pz); double bestDistance = (offload ? 2147483648.0 : currentDistance); unsigned int bestMember = _id; @@ -575,7 +593,7 @@ bool Cluster::findBetterEndpoint(InetAddress &redirectTo,const Address &peerAddr // Consider member if it's alive and has sent us a location and one or more physical endpoints to send peers to if ( ((now - m.lastReceivedAliveAnnouncement) < ZT_CLUSTER_TIMEOUT) && ((m.x != 0)||(m.y != 0)||(m.z != 0)) && (m.zeroTierPhysicalEndpoints.size() > 0) ) { - double mdist = _dist3d(m.x,m.y,m.z,px,py,pz); + const double mdist = _dist3d(m.x,m.y,m.z,px,py,pz); if (mdist < bestDistance) { bestDistance = mdist; bestMember = *mid; @@ -585,7 +603,7 @@ bool Cluster::findBetterEndpoint(InetAddress &redirectTo,const Address &peerAddr } } - // Suggestion redirection if a closer member was found + // Redirect to a closer member if it has a ZeroTier endpoint address in the same ss_family for(std::vector::const_iterator a(best.begin());a!=best.end();++a) { if (a->ss_family == peerPhysicalAddress.ss_family) { TRACE("%s at [%d,%d,%d] is %f from us but %f from %u, can redirect to %s",peerAddress.toString().c_str(),px,py,pz,currentDistance,bestDistance,bestMember,a->toString().c_str()); diff --git a/node/Cluster.hpp b/node/Cluster.hpp index c3367d57..cc9edd1d 100644 --- a/node/Cluster.hpp +++ b/node/Cluster.hpp @@ -47,7 +47,7 @@ /** * Timeout for cluster members being considered "alive" */ -#define ZT_CLUSTER_TIMEOUT 10000 +#define ZT_CLUSTER_TIMEOUT 20000 /** * How often should we announce that we have a peer? @@ -349,6 +349,7 @@ private: }; Hashtable< Address,_PA > _peerAffinities; Mutex _peerAffinities_m; + uint64_t _lastCleanedPeerAffinities; }; } // namespace ZeroTier diff --git a/node/Path.hpp b/node/Path.hpp index 99f6590b..2b05b812 100644 --- a/node/Path.hpp +++ b/node/Path.hpp @@ -95,6 +95,14 @@ public: { } + inline Path &operator=(const Path &p) + throw() + { + if (this != &p) + memcpy(this,&p,sizeof(Path)); + return *this; + } + /** * Called when a packet is sent to this remote path * -- cgit v1.2.3 From 883c84bdb95b0374e4f4ea2238b2288787547897 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Thu, 29 Oct 2015 09:39:36 -0700 Subject: Tweak some timings, and remove some dead code. --- node/Cluster.hpp | 2 +- node/Peer.hpp | 31 ------------------------------- 2 files changed, 1 insertion(+), 32 deletions(-) (limited to 'node/Cluster.hpp') diff --git a/node/Cluster.hpp b/node/Cluster.hpp index cc9edd1d..0d8c0f15 100644 --- a/node/Cluster.hpp +++ b/node/Cluster.hpp @@ -57,7 +57,7 @@ /** * Desired period between doPeriodicTasks() in milliseconds */ -#define ZT_CLUSTER_PERIODIC_TASK_PERIOD 250 +#define ZT_CLUSTER_PERIODIC_TASK_PERIOD 100 namespace ZeroTier { diff --git a/node/Peer.hpp b/node/Peer.hpp index 39acffd9..e5db3bde 100644 --- a/node/Peer.hpp +++ b/node/Peer.hpp @@ -205,32 +205,6 @@ public: return pp; } - /** - * @return Time of last direct packet receive for any path - */ - inline uint64_t lastDirectReceive() const - throw() - { - Mutex::Lock _l(_lock); - uint64_t x = 0; - for(unsigned int p=0,np=_numPaths;p Date: Thu, 29 Oct 2015 10:10:09 -0700 Subject: Periodically re-announce peers that we have. 
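The core of the change is a small functor walked over the in-memory peer set from doPeriodicTasks(): for each peer we currently have an active direct path to, re-send a HAVE_PEER carrying that path's address. Together with the path-forgetting added earlier in this series (a member drops its own path to an address that another member claims), this limits route flapping between members over the same path. The snippet below is taken from the Cluster.cpp hunk that follows, with the SharedPtr<Peer> template argument written out for readability; treat it as a sketch rather than verbatim source.

	struct _ClusterAnnouncePeers
	{
		_ClusterAnnouncePeers(const uint64_t now_,Cluster *parent_) : now(now_),parent(parent_) {}
		const uint64_t now;
		Cluster *const parent;
		inline void operator()(const Topology &t,const SharedPtr<Peer> &peer)
		{
			// If we hold a direct path to this peer, tell the rest of the cluster
			Path *p = peer->getBestPath(now);
			if (p)
				parent->replicateHavePeer(peer->identity(),p->address());
		}
	};

	// ...and in Cluster::doPeriodicTasks():
	if ((now - _lastCheckedPeersForAnnounce) >= (ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD / 4)) {
		_lastCheckedPeersForAnnounce = now;
		_ClusterAnnouncePeers func(now,this);
		RR->topology->eachPeer<_ClusterAnnouncePeers &>(func);
	}
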
--- node/Cluster.cpp | 53 ++++++++++++++++++++++++++++++++++++++--------------- node/Cluster.hpp | 11 ++++++++--- 2 files changed, 46 insertions(+), 18 deletions(-) (limited to 'node/Cluster.hpp') diff --git a/node/Cluster.cpp b/node/Cluster.cpp index e95f6acc..93b69a08 100644 --- a/node/Cluster.cpp +++ b/node/Cluster.cpp @@ -84,7 +84,8 @@ Cluster::Cluster( _zeroTierPhysicalEndpoints(zeroTierPhysicalEndpoints), _members(new _Member[ZT_CLUSTER_MAX_MEMBERS]), _peerAffinities(65536), - _lastCleanedPeerAffinities(0) + _lastCleanedPeerAffinities(0), + _lastCheckedPeersForAnnounce(0) { uint16_t stmp[ZT_SHA512_DIGEST_LEN / sizeof(uint16_t)]; @@ -328,6 +329,7 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len) if (haveMatch) { _send(fromMemberId,STATE_MESSAGE_PROXY_SEND,rendezvousForRemote.data(),rendezvousForRemote.size()); + _flush(fromMemberId); // we want this to go ASAP, since with port restricted cone NATs success can be timing-sensitive RR->sw->send(rendezvousForLocal,true,0); } } @@ -469,10 +471,45 @@ void Cluster::replicateCertificateOfNetworkMembership(const CertificateOfMembers } } +struct _ClusterAnnouncePeers +{ + _ClusterAnnouncePeers(const uint64_t now_,Cluster *parent_) : now(now_),parent(parent_) {} + const uint64_t now; + Cluster *const parent; + inline void operator()(const Topology &t,const SharedPtr &peer) + { + Path *p = peer->getBestPath(now); + if (p) + parent->replicateHavePeer(peer->identity(),p->address()); + } +}; void Cluster::doPeriodicTasks() { const uint64_t now = RR->node->now(); + // Erase old peer affinity entries just to control table size + if ((now - _lastCleanedPeerAffinities) >= (ZT_PEER_ACTIVITY_TIMEOUT * 5)) { + _lastCleanedPeerAffinities = now; + Address *k = (Address *)0; + _PA *v = (_PA *)0; + Mutex::Lock _l(_peerAffinities_m); + Hashtable< Address,_PA >::Iterator i(_peerAffinities); + while (i.next(k,v)) { + if ((now - v->ts) >= (ZT_PEER_ACTIVITY_TIMEOUT * 5)) + _peerAffinities.erase(*k); + } + } + + // Announce peers that we have active direct paths to -- note that we forget paths + // that other cluster members claim they have, which prevents us from fighting + // with other cluster members (route flapping) over specific paths. + if ((now - _lastCheckedPeersForAnnounce) >= (ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD / 4)) { + _lastCheckedPeersForAnnounce = now; + _ClusterAnnouncePeers func(now,this); + RR->topology->eachPeer<_ClusterAnnouncePeers &>(func); + } + + // Flush outgoing packet send queue every doPeriodicTasks() { Mutex::Lock _l(_memberIds_m); for(std::vector::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { @@ -506,20 +543,6 @@ void Cluster::doPeriodicTasks() _flush(*mid); // does nothing if nothing to flush } } - - { - if ((now - _lastCleanedPeerAffinities) >= (ZT_PEER_ACTIVITY_TIMEOUT * 10)) { - _lastCleanedPeerAffinities = now; - Address *k = (Address *)0; - _PA *v = (_PA *)0; - Mutex::Lock _l(_peerAffinities_m); - Hashtable< Address,_PA >::Iterator i(_peerAffinities); - while (i.next(k,v)) { - if ((now - v->ts) >= (ZT_PEER_ACTIVITY_TIMEOUT * 10)) - _peerAffinities.erase(*k); - } - } - } } void Cluster::addMember(uint16_t memberId) diff --git a/node/Cluster.hpp b/node/Cluster.hpp index 0d8c0f15..7d7a1ced 100644 --- a/node/Cluster.hpp +++ b/node/Cluster.hpp @@ -46,18 +46,21 @@ /** * Timeout for cluster members being considered "alive" + * + * A cluster member is considered dead and will no longer have peers + * redirected to it if we have not heard a heartbeat in this long. 
*/ -#define ZT_CLUSTER_TIMEOUT 20000 +#define ZT_CLUSTER_TIMEOUT 10000 /** * How often should we announce that we have a peer? */ -#define ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD 30000 +#define ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD ((ZT_PEER_ACTIVITY_TIMEOUT / 2) - 1000) /** * Desired period between doPeriodicTasks() in milliseconds */ -#define ZT_CLUSTER_PERIODIC_TASK_PERIOD 100 +#define ZT_CLUSTER_PERIODIC_TASK_PERIOD 250 namespace ZeroTier { @@ -349,7 +352,9 @@ private: }; Hashtable< Address,_PA > _peerAffinities; Mutex _peerAffinities_m; + uint64_t _lastCleanedPeerAffinities; + uint64_t _lastCheckedPeersForAnnounce; }; } // namespace ZeroTier -- cgit v1.2.3 From 60ce886605c0298fc22dbce48beb106a96bd35e2 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Mon, 2 Nov 2015 15:15:20 -0800 Subject: Tweak some timings for better reliability. --- node/Cluster.cpp | 6 +- node/Cluster.hpp | 8 +- node/Constants.hpp | 24 +---- node/Multicaster.cpp | 220 ++++++++++++++++++++++--------------------- node/Node.cpp | 13 +-- tests/http/big-test-kill.sh | 2 +- tests/http/big-test-ready.sh | 2 +- tests/http/big-test-start.sh | 4 +- 8 files changed, 131 insertions(+), 148 deletions(-) (limited to 'node/Cluster.hpp') diff --git a/node/Cluster.cpp b/node/Cluster.cpp index d0daae43..e9e31ede 100644 --- a/node/Cluster.cpp +++ b/node/Cluster.cpp @@ -85,7 +85,8 @@ Cluster::Cluster( _members(new _Member[ZT_CLUSTER_MAX_MEMBERS]), _peerAffinities(65536), _lastCleanedPeerAffinities(0), - _lastCheckedPeersForAnnounce(0) + _lastCheckedPeersForAnnounce(0), + _lastFlushed(0) { uint16_t stmp[ZT_SHA512_DIGEST_LEN / sizeof(uint16_t)]; @@ -510,7 +511,8 @@ void Cluster::doPeriodicTasks() } // Flush outgoing packet send queue every doPeriodicTasks() - { + if ((now - _lastFlushed) >= ZT_CLUSTER_FLUSH_PERIOD) { + _lastFlushed = now; Mutex::Lock _l(_memberIds_m); for(std::vector::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { Mutex::Lock _l2(_members[*mid].lock); diff --git a/node/Cluster.hpp b/node/Cluster.hpp index 7d7a1ced..f1caa436 100644 --- a/node/Cluster.hpp +++ b/node/Cluster.hpp @@ -55,13 +55,18 @@ /** * How often should we announce that we have a peer? */ -#define ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD ((ZT_PEER_ACTIVITY_TIMEOUT / 2) - 1000) +#define ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD (ZT_PEER_DIRECT_PING_DELAY / 2) /** * Desired period between doPeriodicTasks() in milliseconds */ #define ZT_CLUSTER_PERIODIC_TASK_PERIOD 250 +/** + * How often to flush outgoing message queues (maximum interval) + */ +#define ZT_CLUSTER_FLUSH_PERIOD 500 + namespace ZeroTier { class RuntimeEnvironment; @@ -355,6 +360,7 @@ private: uint64_t _lastCleanedPeerAffinities; uint64_t _lastCheckedPeersForAnnounce; + uint64_t _lastFlushed; }; } // namespace ZeroTier diff --git a/node/Constants.hpp b/node/Constants.hpp index 1d5fa6f4..bb62484d 100644 --- a/node/Constants.hpp +++ b/node/Constants.hpp @@ -173,13 +173,8 @@ /** * Timeout for receipt of fragmented packets in ms - * - * Since there's no retransmits, this is just a really bad case scenario for - * transit time. It's short enough that a DOS attack from exhausing buffers is - * very unlikely, as the transfer rate would have to be fast enough to fill - * system memory in this time. 
*/ -#define ZT_FRAGMENTED_PACKET_RECEIVE_TIMEOUT 1000 +#define ZT_FRAGMENTED_PACKET_RECEIVE_TIMEOUT 500 /** * Length of secret key in bytes -- 256-bit -- do not change @@ -194,7 +189,7 @@ /** * Overriding granularity for timer tasks to prevent CPU-intensive thrashing on every packet */ -#define ZT_CORE_TIMER_TASK_GRANULARITY 1000 +#define ZT_CORE_TIMER_TASK_GRANULARITY 500 /** * How long to remember peer records in RAM if they haven't been used @@ -269,7 +264,7 @@ /** * Delay between ordinary case pings of direct links */ -#define ZT_PEER_DIRECT_PING_DELAY 120000 +#define ZT_PEER_DIRECT_PING_DELAY 60000 /** * Delay between requests for updated network autoconf information @@ -279,18 +274,7 @@ /** * Timeout for overall peer activity (measured from last receive) */ -#define ZT_PEER_ACTIVITY_TIMEOUT (ZT_PEER_DIRECT_PING_DELAY + (ZT_PING_CHECK_INVERVAL * 3)) - -/** - * Stop relaying via peers that have not responded to direct sends - * - * When we send something (including frames), we generally expect a response. - * Switching relays if no response in a short period of time causes more - * rapid failover if a root server goes down or becomes unreachable. In the - * mistaken case, little harm is done as it'll pick the next-fastest - * root server and will switch back eventually. - */ -#define ZT_PEER_RELAY_CONVERSATION_LATENCY_THRESHOLD 10000 +#define ZT_PEER_ACTIVITY_TIMEOUT ((ZT_PEER_DIRECT_PING_DELAY * 3) + (ZT_PING_CHECK_INVERVAL * 2)) /** * Minimum interval between attempts by relays to unite peers diff --git a/node/Multicaster.cpp b/node/Multicaster.cpp index e43d7d88..01e6b799 100644 --- a/node/Multicaster.cpp +++ b/node/Multicaster.cpp @@ -175,128 +175,130 @@ void Multicaster::send( unsigned long idxbuf[8194]; unsigned long *indexes = idxbuf; - Mutex::Lock _l(_groups_m); - MulticastGroupStatus &gs = _groups[Multicaster::Key(nwid,mg)]; - - if (!gs.members.empty()) { - // Allocate a memory buffer if group is monstrous - if (gs.members.size() > (sizeof(idxbuf) / sizeof(unsigned long))) - indexes = new unsigned long[gs.members.size()]; - - // Generate a random permutation of member indexes - for(unsigned long i=0;i0;--i) { - unsigned long j = (unsigned long)RR->node->prng() % (i + 1); - unsigned long tmp = indexes[j]; - indexes[j] = indexes[i]; - indexes[i] = tmp; + try { + Mutex::Lock _l(_groups_m); + MulticastGroupStatus &gs = _groups[Multicaster::Key(nwid,mg)]; + + if (!gs.members.empty()) { + // Allocate a memory buffer if group is monstrous + if (gs.members.size() > (sizeof(idxbuf) / sizeof(unsigned long))) + indexes = new unsigned long[gs.members.size()]; + + // Generate a random permutation of member indexes + for(unsigned long i=0;i0;--i) { + unsigned long j = (unsigned long)RR->node->prng() % (i + 1); + unsigned long tmp = indexes[j]; + indexes[j] = indexes[i]; + indexes[i] = tmp; + } } - } - if (gs.members.size() >= limit) { - // Skip queue if we already have enough members to complete the send operation - OutboundMulticast out; - - out.init( - RR, - now, - nwid, - com, - limit, - 1, // we'll still gather a little from peers to keep multicast list fresh - src, - mg, - etherType, - data, - len); - - unsigned int count = 0; - - for(std::vector
::const_iterator ast(alwaysSendTo.begin());ast!=alwaysSendTo.end();++ast) { - if (*ast != RR->identity.address()) { - out.sendOnly(RR,*ast); - if (++count >= limit) - break; + if (gs.members.size() >= limit) { + // Skip queue if we already have enough members to complete the send operation + OutboundMulticast out; + + out.init( + RR, + now, + nwid, + com, + limit, + 1, // we'll still gather a little from peers to keep multicast list fresh + src, + mg, + etherType, + data, + len); + + unsigned int count = 0; + + for(std::vector
::const_iterator ast(alwaysSendTo.begin());ast!=alwaysSendTo.end();++ast) { + if (*ast != RR->identity.address()) { + out.sendOnly(RR,*ast); // optimization: don't use dedup log if it's a one-pass send + if (++count >= limit) + break; + } } - } - unsigned long idx = 0; - while ((count < limit)&&(idx < gs.members.size())) { - Address ma(gs.members[indexes[idx++]].address); - if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),ma) == alwaysSendTo.end()) { - out.sendOnly(RR,ma); - ++count; + unsigned long idx = 0; + while ((count < limit)&&(idx < gs.members.size())) { + Address ma(gs.members[indexes[idx++]].address); + if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),ma) == alwaysSendTo.end()) { + out.sendOnly(RR,ma); // optimization: don't use dedup log if it's a one-pass send + ++count; + } } - } - } else { - unsigned int gatherLimit = (limit - (unsigned int)gs.members.size()) + 1; - - if ((now - gs.lastExplicitGather) >= ZT_MULTICAST_EXPLICIT_GATHER_DELAY) { - gs.lastExplicitGather = now; - SharedPtr r(RR->topology->getBestRoot()); - if (r) { - TRACE(">>MC upstream GATHER up to %u for group %.16llx/%s",gatherLimit,nwid,mg.toString().c_str()); - - const CertificateOfMembership *com = (CertificateOfMembership *)0; - { - SharedPtr nw(RR->node->network(nwid)); - if (nw) { - SharedPtr nconf(nw->config2()); - if ((nconf)&&(nconf->com())&&(nconf->isPrivate())&&(r->needsOurNetworkMembershipCertificate(nwid,now,true))) - com = &(nconf->com()); + } else { + unsigned int gatherLimit = (limit - (unsigned int)gs.members.size()) + 1; + + if ((gs.members.empty())||((now - gs.lastExplicitGather) >= ZT_MULTICAST_EXPLICIT_GATHER_DELAY)) { + gs.lastExplicitGather = now; + SharedPtr r(RR->topology->getBestRoot()); + if (r) { + TRACE(">>MC upstream GATHER up to %u for group %.16llx/%s",gatherLimit,nwid,mg.toString().c_str()); + + const CertificateOfMembership *com = (CertificateOfMembership *)0; + { + SharedPtr nw(RR->node->network(nwid)); + if (nw) { + SharedPtr nconf(nw->config2()); + if ((nconf)&&(nconf->com())&&(nconf->isPrivate())&&(r->needsOurNetworkMembershipCertificate(nwid,now,true))) + com = &(nconf->com()); + } } - } - Packet outp(r->address(),RR->identity.address(),Packet::VERB_MULTICAST_GATHER); - outp.append(nwid); - outp.append((uint8_t)(com ? 0x01 : 0x00)); - mg.mac().appendTo(outp); - outp.append((uint32_t)mg.adi()); - outp.append((uint32_t)gatherLimit); - if (com) - com->serialize(outp); - outp.armor(r->key(),true); - r->send(RR,outp.data(),outp.size(),now); + Packet outp(r->address(),RR->identity.address(),Packet::VERB_MULTICAST_GATHER); + outp.append(nwid); + outp.append((uint8_t)(com ? 0x01 : 0x00)); + mg.mac().appendTo(outp); + outp.append((uint32_t)mg.adi()); + outp.append((uint32_t)gatherLimit); + if (com) + com->serialize(outp); + outp.armor(r->key(),true); + r->send(RR,outp.data(),outp.size(),now); + } + gatherLimit = 0; } - gatherLimit = 0; - } - gs.txQueue.push_back(OutboundMulticast()); - OutboundMulticast &out = gs.txQueue.back(); - - out.init( - RR, - now, - nwid, - com, - limit, - gatherLimit, - src, - mg, - etherType, - data, - len); - - unsigned int count = 0; - - for(std::vector
::const_iterator ast(alwaysSendTo.begin());ast!=alwaysSendTo.end();++ast) { - if (*ast != RR->identity.address()) { - out.sendAndLog(RR,*ast); - if (++count >= limit) - break; + gs.txQueue.push_back(OutboundMulticast()); + OutboundMulticast &out = gs.txQueue.back(); + + out.init( + RR, + now, + nwid, + com, + limit, + gatherLimit, + src, + mg, + etherType, + data, + len); + + unsigned int count = 0; + + for(std::vector
::const_iterator ast(alwaysSendTo.begin());ast!=alwaysSendTo.end();++ast) { + if (*ast != RR->identity.address()) { + out.sendAndLog(RR,*ast); + if (++count >= limit) + break; + } } - } - unsigned long idx = 0; - while ((count < limit)&&(idx < gs.members.size())) { - Address ma(gs.members[indexes[idx++]].address); - if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),ma) == alwaysSendTo.end()) { - out.sendAndLog(RR,ma); - ++count; + unsigned long idx = 0; + while ((count < limit)&&(idx < gs.members.size())) { + Address ma(gs.members[indexes[idx++]].address); + if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),ma) == alwaysSendTo.end()) { + out.sendAndLog(RR,ma); + ++count; + } } } - } + } catch ( ... ) {} // this is a sanity check to catch any failures and make sure indexes[] still gets deleted // Free allocated memory buffer if any if (indexes != idxbuf) diff --git a/node/Node.cpp b/node/Node.cpp index 42180e99..74acc869 100644 --- a/node/Node.cpp +++ b/node/Node.cpp @@ -305,18 +305,7 @@ ZT_ResultCode Node::processBackgroundTasks(uint64_t now,volatile uint64_t *nextB for(std::vector< SharedPtr >::const_iterator n(needConfig.begin());n!=needConfig.end();++n) (*n)->requestConfiguration(); - // Attempt to contact network preferred relays that we don't have direct links to - std::sort(networkRelays.begin(),networkRelays.end()); - networkRelays.erase(std::unique(networkRelays.begin(),networkRelays.end()),networkRelays.end()); - for(std::vector< std::pair >::const_iterator nr(networkRelays.begin());nr!=networkRelays.end();++nr) { - if (nr->second) { - SharedPtr rp(RR->topology->getPeer(nr->first)); - if ((rp)&&(!rp->hasActiveDirectPath(now))) - rp->attemptToContactAt(RR,InetAddress(),nr->second,now); - } - } - - // Ping living or root server/relay peers + // Do pings and keepalives _PingPeersThatNeedPing pfunc(RR,now,networkRelays); RR->topology->eachPeer<_PingPeersThatNeedPing &>(pfunc); diff --git a/tests/http/big-test-kill.sh b/tests/http/big-test-kill.sh index 4a764d1f..59f36788 100755 --- a/tests/http/big-test-kill.sh +++ b/tests/http/big-test-kill.sh @@ -13,6 +13,6 @@ CONTAINER_IMAGE=zerotier/http-test export PATH=/bin:/usr/bin:/usr/local/bin:/usr/sbin:/sbin -pssh -h big-test-hosts -i -t 128 -p 256 "docker ps -aq | xargs -r docker rm -f" +pssh -h big-test-hosts -i -t 0 -p 256 "docker ps -aq | xargs -r docker rm -f" exit 0 diff --git a/tests/http/big-test-ready.sh b/tests/http/big-test-ready.sh index 391ca2a1..aa540bba 100755 --- a/tests/http/big-test-ready.sh +++ b/tests/http/big-test-ready.sh @@ -25,6 +25,6 @@ export PATH=/bin:/usr/bin:/usr/local/bin:/usr/sbin:/sbin # docker run --device=/dev/net/tun --privileged -d $CONTAINER_IMAGE #done -pssh -h big-test-hosts -i -t 128 -p 256 "docker pull $CONTAINER_IMAGE" +pssh -h big-test-hosts -i -t 0 -p 256 "docker pull $CONTAINER_IMAGE" exit 0 diff --git a/tests/http/big-test-start.sh b/tests/http/big-test-start.sh index a5e71ef1..43166c6e 100755 --- a/tests/http/big-test-start.sh +++ b/tests/http/big-test-start.sh @@ -1,7 +1,7 @@ #!/bin/bash # Edit as needed -- note that >1000 per host is likely problematic due to Linux kernel limits -NUM_CONTAINERS=100 +NUM_CONTAINERS=25 CONTAINER_IMAGE=zerotier/http-test # @@ -25,6 +25,6 @@ export PATH=/bin:/usr/bin:/usr/local/bin:/usr/sbin:/sbin # docker run --device=/dev/net/tun --privileged -d $CONTAINER_IMAGE #done -pssh -h big-test-hosts -i -t 128 -p 256 "for ((n=0;n<$NUM_CONTAINERS;n++)); do docker run --device=/dev/net/tun --privileged -d $CONTAINER_IMAGE; sleep 0.25; done" +pssh -h big-test-hosts 
-i -t 0 -p 256 "for ((n=0;n<$NUM_CONTAINERS;n++)); do docker run --device=/dev/net/tun --privileged -d $CONTAINER_IMAGE; sleep 0.25; done" exit 0 -- cgit v1.2.3 From 7fbe2f7adf3575f3a21fc1ab3a5a2a036e18e6e2 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Mon, 2 Nov 2015 15:38:53 -0800 Subject: Tweak some more timings for better reliability. --- node/Cluster.hpp | 2 +- node/Constants.hpp | 10 +++++----- node/Node.cpp | 2 +- node/Peer.hpp | 4 ++-- node/SelfAwareness.cpp | 2 +- node/Switch.cpp | 6 +++--- node/Topology.hpp | 9 ++++++--- tests/http/big-test-start.sh | 4 ++-- 8 files changed, 21 insertions(+), 18 deletions(-) (limited to 'node/Cluster.hpp') diff --git a/node/Cluster.hpp b/node/Cluster.hpp index f1caa436..ee220999 100644 --- a/node/Cluster.hpp +++ b/node/Cluster.hpp @@ -55,7 +55,7 @@ /** * How often should we announce that we have a peer? */ -#define ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD (ZT_PEER_DIRECT_PING_DELAY / 2) +#define ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD ZT_PEER_DIRECT_PING_DELAY /** * Desired period between doPeriodicTasks() in milliseconds diff --git a/node/Constants.hpp b/node/Constants.hpp index bb62484d..552688a6 100644 --- a/node/Constants.hpp +++ b/node/Constants.hpp @@ -267,14 +267,14 @@ #define ZT_PEER_DIRECT_PING_DELAY 60000 /** - * Delay between requests for updated network autoconf information + * Timeout for overall peer activity (measured from last receive) */ -#define ZT_NETWORK_AUTOCONF_DELAY 60000 +#define ZT_PEER_ACTIVITY_TIMEOUT ((ZT_PEER_DIRECT_PING_DELAY * 4) + ZT_PING_CHECK_INVERVAL) /** - * Timeout for overall peer activity (measured from last receive) + * Delay between requests for updated network autoconf information */ -#define ZT_PEER_ACTIVITY_TIMEOUT ((ZT_PEER_DIRECT_PING_DELAY * 3) + (ZT_PING_CHECK_INVERVAL * 2)) +#define ZT_NETWORK_AUTOCONF_DELAY 60000 /** * Minimum interval between attempts by relays to unite peers @@ -283,7 +283,7 @@ * a RENDEZVOUS message no more than this often. This instructs the peers * to attempt NAT-t and gives each the other's corresponding IP:port pair. 
*/ -#define ZT_MIN_UNITE_INTERVAL 60000 +#define ZT_MIN_UNITE_INTERVAL 30000 /** * Delay between initial direct NAT-t packet and more aggressive techniques diff --git a/node/Node.cpp b/node/Node.cpp index 74acc869..82cb7ddb 100644 --- a/node/Node.cpp +++ b/node/Node.cpp @@ -263,7 +263,7 @@ public: } lastReceiveFromUpstream = std::max(p->lastReceive(),lastReceiveFromUpstream); - } else if (p->alive(_now)) { + } else if (p->activelyTransferringFrames(_now)) { // Normal nodes get their preferred link kept alive if the node has generated frame traffic recently p->doPingAndKeepalive(RR,_now,0); } diff --git a/node/Peer.hpp b/node/Peer.hpp index e5db3bde..ad4c6746 100644 --- a/node/Peer.hpp +++ b/node/Peer.hpp @@ -231,9 +231,9 @@ public: inline uint64_t lastAnnouncedTo() const throw() { return _lastAnnouncedTo; } /** - * @return True if peer has received an actual data frame within ZT_PEER_ACTIVITY_TIMEOUT milliseconds + * @return True if this peer is actively sending real network frames */ - inline uint64_t alive(uint64_t now) const throw() { return ((now - lastFrame()) < ZT_PEER_ACTIVITY_TIMEOUT); } + inline uint64_t activelyTransferringFrames(uint64_t now) const throw() { return ((now - lastFrame()) < ZT_PEER_ACTIVITY_TIMEOUT); } /** * @return Current latency or 0 if unknown (max: 65535) diff --git a/node/SelfAwareness.cpp b/node/SelfAwareness.cpp index d8eca071..ce75eb03 100644 --- a/node/SelfAwareness.cpp +++ b/node/SelfAwareness.cpp @@ -128,7 +128,7 @@ void SelfAwareness::iam(const Address &reporter,const InetAddress &reporterPhysi // links to be re-established if possible, possibly using a root server or some // other relay. for(std::vector< SharedPtr >::const_iterator p(rset.peersReset.begin());p!=rset.peersReset.end();++p) { - if ((*p)->alive(now)) { + if ((*p)->activelyTransferringFrames(now)) { Packet outp((*p)->address(),RR->identity.address(),Packet::VERB_NOP); RR->sw->send(outp,true,0); } diff --git a/node/Switch.cpp b/node/Switch.cpp index 2f72f57a..120ce7a4 100644 --- a/node/Switch.cpp +++ b/node/Switch.cpp @@ -442,8 +442,8 @@ unsigned long Switch::doTimerTasks(uint64_t now) Mutex::Lock _l(_contactQueue_m); for(std::list::iterator qi(_contactQueue.begin());qi!=_contactQueue.end();) { if (now >= qi->fireAtTime) { - if ((!qi->peer->alive(now))||(qi->peer->hasActiveDirectPath(now))) { - // Cancel attempt if we've already connected or peer is no longer "alive" + if (qi->peer->hasActiveDirectPath(now)) { + // Cancel if connection has succeeded _contactQueue.erase(qi++); continue; } else { @@ -539,7 +539,7 @@ unsigned long Switch::doTimerTasks(uint64_t now) _LastUniteKey *k = (_LastUniteKey *)0; uint64_t *v = (uint64_t *)0; while (i.next(k,v)) { - if ((now - *v) >= (ZT_MIN_UNITE_INTERVAL * 16)) + if ((now - *v) >= (ZT_MIN_UNITE_INTERVAL * 8)) _lastUniteAttempt.erase(*k); } } diff --git a/node/Topology.hpp b/node/Topology.hpp index 4c1a2ab3..a0c28b0f 100644 --- a/node/Topology.hpp +++ b/node/Topology.hpp @@ -81,6 +81,11 @@ public: /** * Get a peer only if it is presently in memory (no disk cache) * + * This also does not update the lastUsed() time for peers, which means + * that it won't prevent them from falling out of RAM. This is currently + * used in the Cluster code to update peer info without forcing all peers + * across the entire cluster to remain in memory cache. 
+ * * @param zta ZeroTier address * @param now Current time */ @@ -88,10 +93,8 @@ public: { Mutex::Lock _l(_lock); const SharedPtr *const ap = _peers.get(zta); - if (ap) { - (*ap)->use(now); + if (ap) return *ap; - } return SharedPtr(); } diff --git a/tests/http/big-test-start.sh b/tests/http/big-test-start.sh index 43166c6e..f300ac61 100755 --- a/tests/http/big-test-start.sh +++ b/tests/http/big-test-start.sh @@ -1,7 +1,7 @@ #!/bin/bash # Edit as needed -- note that >1000 per host is likely problematic due to Linux kernel limits -NUM_CONTAINERS=25 +NUM_CONTAINERS=50 CONTAINER_IMAGE=zerotier/http-test # @@ -25,6 +25,6 @@ export PATH=/bin:/usr/bin:/usr/local/bin:/usr/sbin:/sbin # docker run --device=/dev/net/tun --privileged -d $CONTAINER_IMAGE #done -pssh -h big-test-hosts -i -t 0 -p 256 "for ((n=0;n<$NUM_CONTAINERS;n++)); do docker run --device=/dev/net/tun --privileged -d $CONTAINER_IMAGE; sleep 0.25; done" +pssh -h big-test-hosts -o big-test-out -t 0 -p 256 "for ((n=0;n<$NUM_CONTAINERS;n++)); do docker run --device=/dev/net/tun --privileged -d $CONTAINER_IMAGE; sleep 0.25; done" exit 0 -- cgit v1.2.3
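Taken together, the last two commits leave the cluster and peer timing constants in the following relationship (ZT_PING_CHECK_INVERVAL is defined elsewhere in Constants.hpp and is left symbolic here; this is a recap of values already shown above, not new tuning):

	ZT_PEER_DIRECT_PING_DELAY            = 60000 ms
	ZT_PEER_ACTIVITY_TIMEOUT             = (ZT_PEER_DIRECT_PING_DELAY * 4) + ZT_PING_CHECK_INVERVAL
	ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD = ZT_PEER_DIRECT_PING_DELAY = 60000 ms
	ZT_CLUSTER_PERIODIC_TASK_PERIOD      = 250 ms
	ZT_CLUSTER_FLUSH_PERIOD              = 500 ms

With those values, Cluster::doPeriodicTasks() runs roughly every 250 ms, flushes queued state messages about every 500 ms, re-announces peers with active direct paths about every ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD / 4 = 15 s, and prunes peer affinity entries older than ZT_PEER_ACTIVITY_TIMEOUT * 5 (a bit over twenty minutes), so entries for peers that have gone quiet eventually age out of the table.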