From 5dbebc513ad67bb65c41c42a9c761c085309564f Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Wed, 1 Feb 2017 12:00:25 -0800 Subject: Minor send path refactor to make packet I/O work on clusters if they are members of networks. Also fix a crash if compiled in cluster mode but no cluster is enabled. --- node/Switch.cpp | 126 ++++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 82 insertions(+), 44 deletions(-) (limited to 'node/Switch.cpp') diff --git a/node/Switch.cpp b/node/Switch.cpp index f935b7aa..6df84101 100644 --- a/node/Switch.cpp +++ b/node/Switch.cpp @@ -117,7 +117,7 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from if ((!relayTo)||(!relayTo->sendDirect(fragment.data(),fragment.size(),now,false))) { #ifdef ZT_ENABLE_CLUSTER if (RR->cluster) { - RR->cluster->sendViaCluster(Address(),destination,fragment.data(),fragment.size(),false); + RR->cluster->relayViaCluster(Address(),destination,fragment.data(),fragment.size(),false); return; } #endif @@ -204,7 +204,6 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from //TRACE("<< %.16llx %s -> %s (size: %u)",(unsigned long long)packet->packetId(),source.toString().c_str(),destination.toString().c_str(),packet->size()); - if (destination != RR->identity.address()) { if ( (!RR->topology->amRoot()) && (!path->trustEstablished(now)) ) return; @@ -233,7 +232,7 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from if (shouldUnite) luts = now; } - RR->cluster->sendViaCluster(source,destination,packet.data(),packet.size(),shouldUnite); + RR->cluster->relayViaCluster(source,destination,packet.data(),packet.size(),shouldUnite); return; } #endif @@ -560,7 +559,7 @@ void Switch::onLocalEthernet(const SharedPtr &network,const MAC &from,c } } -void Switch::send(const Packet &packet,bool encrypt) +void Switch::send(Packet &packet,bool encrypt) { if (packet.destination() == RR->identity.address()) { TRACE("BUG: caught attempt to send() to self, ignored"); @@ -687,12 +686,17 @@ Address Switch::_sendWhoisRequest(const Address &addr,const Address *peersAlread return Address(); } -bool Switch::_trySend(const Packet &packet,bool encrypt) +bool Switch::_trySend(Packet &packet,bool encrypt) { - const SharedPtr peer(RR->topology->getPeer(packet.destination())); - if (peer) { - const uint64_t now = RR->node->now(); + SharedPtr viaPath; + const uint64_t now = RR->node->now(); + const Address destination(packet.destination()); +#ifdef ZT_ENABLE_CLUSTER + int clusterMostRecentMemberId = -1; +#endif + const SharedPtr peer(RR->topology->getPeer(destination)); + if (peer) { /* First get the best path, and if it's dead (and this is not a root) * we attempt to re-activate that path but this packet will flow * upstream. If the path comes back alive, it will be used in the future. @@ -700,58 +704,92 @@ bool Switch::_trySend(const Packet &packet,bool encrypt) * to send heartbeats "down" and because we have to at least try to * go somewhere. */ - SharedPtr viaPath(peer->getBestPath(now,false)); + viaPath = peer->getBestPath(now,false); if ( (viaPath) && (!viaPath->alive(now)) && (!RR->topology->isUpstream(peer->identity())) ) { if ((now - viaPath->lastOut()) > std::max((now - viaPath->lastIn()) * 4,(uint64_t)ZT_PATH_MIN_REACTIVATE_INTERVAL)) peer->attemptToContactAt(viaPath->localAddress(),viaPath->address(),now); viaPath.zero(); } + if (!viaPath) { - peer->tryMemorizedPath(now); // periodically attempt memorized or statically defined paths, if any are known - const SharedPtr relay(RR->topology->getUpstreamPeer()); - if ( (!relay) || (!(viaPath = relay->getBestPath(now,false))) ) { - if (!(viaPath = peer->getBestPath(now,true))) - return false; +#ifdef ZT_ENABLE_CLUSTER + if (RR->cluster) + clusterMostRecentMemberId = RR->cluster->prepSendViaCluster(destination,packet,encrypt); + if (clusterMostRecentMemberId < 0) { +#endif + peer->tryMemorizedPath(now); // periodically attempt memorized or statically defined paths, if any are known + const SharedPtr relay(RR->topology->getUpstreamPeer()); + if ( (!relay) || (!(viaPath = relay->getBestPath(now,false))) ) { + if (!(viaPath = peer->getBestPath(now,true))) // last resort: try an expired path... we usually can never get here + return false; + } +#ifdef ZT_ENABLE_CLUSTER } +#endif } + } else { + requestWhois(destination); +#ifndef ZT_ENABLE_CLUSTER + return false; // if we are not in cluster mode, there is no way we can send without knowing the peer directly +#endif + } - Packet tmp(packet); +#ifdef ZT_TRACE +#ifdef ZT_ENABLE_CLUSTER + if ((!viaPath)&&(clusterMostRecentMemberId < 0)) { + TRACE("BUG: both viaPath and clusterMostRecentMemberId ended up invalid in Switch::_trySend()!"); + abort(); + } +#else + if (!viaPath) { + TRACE("BUG: viaPath ended up NULL in Switch::_trySend()!"); + abort(); + } +#endif +#endif - unsigned int chunkSize = std::min(tmp.size(),(unsigned int)ZT_UDP_DEFAULT_PAYLOAD_MTU); - tmp.setFragmented(chunkSize < tmp.size()); + unsigned int chunkSize = std::min(packet.size(),(unsigned int)ZT_UDP_DEFAULT_PAYLOAD_MTU); + packet.setFragmented(chunkSize < packet.size()); - const uint64_t trustedPathId = RR->topology->getOutboundPathTrust(viaPath->address()); - if (trustedPathId) { - tmp.setTrusted(trustedPathId); - } else { - tmp.armor(peer->key(),encrypt); - } + const uint64_t trustedPathId = RR->topology->getOutboundPathTrust(viaPath->address()); + if (trustedPathId) { + packet.setTrusted(trustedPathId); + } else { + packet.armor(peer->key(),encrypt); + } - if (viaPath->send(RR,tmp.data(),chunkSize,now)) { - if (chunkSize < tmp.size()) { - // Too big for one packet, fragment the rest - unsigned int fragStart = chunkSize; - unsigned int remaining = tmp.size() - chunkSize; - unsigned int fragsRemaining = (remaining / (ZT_UDP_DEFAULT_PAYLOAD_MTU - ZT_PROTO_MIN_FRAGMENT_LENGTH)); - if ((fragsRemaining * (ZT_UDP_DEFAULT_PAYLOAD_MTU - ZT_PROTO_MIN_FRAGMENT_LENGTH)) < remaining) - ++fragsRemaining; - const unsigned int totalFragments = fragsRemaining + 1; - - for(unsigned int fno=1;fnosend(RR,packet.data(),chunkSize,now))) || ((clusterMostRecentMemberId >= 0)&&(RR->cluster->sendViaCluster(clusterMostRecentMemberId,destination,packet.data(),chunkSize))) ) { +#else + if (viaPath->send(RR,packet.data(),chunkSize,now)) { +#endif + if (chunkSize < packet.size()) { + // Too big for one packet, fragment the rest + unsigned int fragStart = chunkSize; + unsigned int remaining = packet.size() - chunkSize; + unsigned int fragsRemaining = (remaining / (ZT_UDP_DEFAULT_PAYLOAD_MTU - ZT_PROTO_MIN_FRAGMENT_LENGTH)); + if ((fragsRemaining * (ZT_UDP_DEFAULT_PAYLOAD_MTU - ZT_PROTO_MIN_FRAGMENT_LENGTH)) < remaining) + ++fragsRemaining; + const unsigned int totalFragments = fragsRemaining + 1; + + for(unsigned int fno=1;fnosend(RR,frag.data(),frag.size(),now); - fragStart += chunkSize; - remaining -= chunkSize; - } + else if (clusterMostRecentMemberId >= 0) + RR->cluster->sendViaCluster(clusterMostRecentMemberId,destination,frag.data(),frag.size()); +#else + viaPath->send(RR,frag.data(),frag.size(),now); +#endif + fragStart += chunkSize; + remaining -= chunkSize; } - - return true; } - } else { - requestWhois(packet.destination()); } - return false; + + return true; } bool Switch::_unite(const Address &p1,const Address &p2) -- cgit v1.2.3