Diffstat (limited to 'node/Switch.cpp')
-rw-r--r--  node/Switch.cpp | 401
1 file changed, 216 insertions, 185 deletions
diff --git a/node/Switch.cpp b/node/Switch.cpp
index 82b13483..0392aec1 100644
--- a/node/Switch.cpp
+++ b/node/Switch.cpp
@@ -64,10 +64,6 @@ Switch::Switch(const RuntimeEnvironment *renv) :
 {
 }
 
-Switch::~Switch()
-{
-}
-
 void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &fromAddr,const void *data,unsigned int len)
 {
 	try {
@@ -82,17 +78,17 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from
 			 * no longer send these, but we'll listen for them for a while to
 			 * locate peers with versions <1.0.4. */
 
-			Address beaconAddr(reinterpret_cast<const char *>(data) + 8,5);
+			const Address beaconAddr(reinterpret_cast<const char *>(data) + 8,5);
 			if (beaconAddr == RR->identity.address())
 				return;
-			if (!RR->node->shouldUsePathForZeroTierTraffic(localAddr,fromAddr))
+			if (!RR->node->shouldUsePathForZeroTierTraffic(beaconAddr,localAddr,fromAddr))
 				return;
-			SharedPtr<Peer> peer(RR->topology->getPeer(beaconAddr));
+			const SharedPtr<Peer> peer(RR->topology->getPeer(beaconAddr));
 			if (peer) { // we'll only respond to beacons from known peers
 				if ((now - _lastBeaconResponse) >= 2500) { // limit rate of responses
 					_lastBeaconResponse = now;
 					Packet outp(peer->address(),RR->identity.address(),Packet::VERB_NOP);
-					outp.armor(peer->key(),true);
+					outp.armor(peer->key(),true,path->nextOutgoingCounter());
 					path->send(RR,outp.data(),outp.size(),now);
 				}
 			}
@@ -105,17 +101,14 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from
 				const Address destination(fragment.destination());
 
 				if (destination != RR->identity.address()) {
-					switch(RR->node->relayPolicy()) {
-						case ZT_RELAY_POLICY_ALWAYS:
-							break;
-						case ZT_RELAY_POLICY_TRUSTED:
-							if (!path->trustEstablished(now))
-								return;
-							break;
-						// case ZT_RELAY_POLICY_NEVER:
-						default:
-							return;
-					}
+#ifdef ZT_ENABLE_CLUSTER
+					const bool isClusterFrontplane = ((RR->cluster)&&(RR->cluster->isClusterPeerFrontplane(fromAddr)));
+#else
+					const bool isClusterFrontplane = false;
+#endif
+
+					if ( (!RR->topology->amRoot()) && (!path->trustEstablished(now)) && (!isClusterFrontplane) )
+						return;
 
 					if (fragment.hops() < ZT_RELAY_MAX_HOPS) {
 						fragment.incrementHops();
@@ -125,14 +118,14 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from
 						SharedPtr<Peer> relayTo = RR->topology->getPeer(destination);
 						if ((!relayTo)||(!relayTo->sendDirect(fragment.data(),fragment.size(),now,false))) {
 #ifdef ZT_ENABLE_CLUSTER
-							if (RR->cluster) {
-								RR->cluster->sendViaCluster(Address(),destination,fragment.data(),fragment.size(),false);
+							if ((RR->cluster)&&(!isClusterFrontplane)) {
+								RR->cluster->relayViaCluster(Address(),destination,fragment.data(),fragment.size(),false);
 								return;
 							}
 #endif
-							// Don't know peer or no direct path -- so relay via root server
-							relayTo = RR->topology->getBestRoot();
+							// Don't know peer or no direct path -- so relay via someone upstream
+							relayTo = RR->topology->getUpstreamPeer();
 							if (relayTo)
 								relayTo->sendDirect(fragment.data(),fragment.size(),now,true);
 						}
 					}
@@ -192,69 +185,98 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from
 		} else if (len >= ZT_PROTO_MIN_PACKET_LENGTH) { // min length check is important!
 			// Handle packet head -------------------------------------------------
 
-			// See packet format in Packet.hpp to understand this
-			const uint64_t packetId = (
-				(((uint64_t)reinterpret_cast<const uint8_t *>(data)[0]) << 56) |
-				(((uint64_t)reinterpret_cast<const uint8_t *>(data)[1]) << 48) |
-				(((uint64_t)reinterpret_cast<const uint8_t *>(data)[2]) << 40) |
-				(((uint64_t)reinterpret_cast<const uint8_t *>(data)[3]) << 32) |
-				(((uint64_t)reinterpret_cast<const uint8_t *>(data)[4]) << 24) |
-				(((uint64_t)reinterpret_cast<const uint8_t *>(data)[5]) << 16) |
-				(((uint64_t)reinterpret_cast<const uint8_t *>(data)[6]) << 8) |
-				((uint64_t)reinterpret_cast<const uint8_t *>(data)[7])
-			);
 			const Address destination(reinterpret_cast<const uint8_t *>(data) + 8,ZT_ADDRESS_LENGTH);
 			const Address source(reinterpret_cast<const uint8_t *>(data) + 13,ZT_ADDRESS_LENGTH);
 
-			// Catch this and toss it -- it would never work, but it could happen if we somehow
-			// mistakenly guessed an address we're bound to as a destination for another peer.
+			//TRACE("<< %.16llx %s -> %s (size: %u)",(unsigned long long)packet->packetId(),source.toString().c_str(),destination.toString().c_str(),packet->size());
+
+#ifdef ZT_ENABLE_CLUSTER
+			if ( (source == RR->identity.address()) && ((!RR->cluster)||(!RR->cluster->isClusterPeerFrontplane(fromAddr))) )
+				return;
+#else
 			if (source == RR->identity.address())
 				return;
-
-			//TRACE("<< %.16llx %s -> %s (size: %u)",(unsigned long long)packet->packetId(),source.toString().c_str(),destination.toString().c_str(),packet->size());
+#endif
 
 			if (destination != RR->identity.address()) {
-				switch(RR->node->relayPolicy()) {
-					case ZT_RELAY_POLICY_ALWAYS:
-						break;
-					case ZT_RELAY_POLICY_TRUSTED:
-						if (!path->trustEstablished(now))
-							return;
-						break;
-					// case ZT_RELAY_POLICY_NEVER:
-					default:
-						return;
-				}
+				if ( (!RR->topology->amRoot()) && (!path->trustEstablished(now)) && (source != RR->identity.address()) )
+					return;
 
 				Packet packet(data,len);
 
 				if (packet.hops() < ZT_RELAY_MAX_HOPS) {
+#ifdef ZT_ENABLE_CLUSTER
+					if (source != RR->identity.address()) // don't increment hops for cluster frontplane relays
+						packet.incrementHops();
+#else
 					packet.incrementHops();
+#endif
 
 					SharedPtr<Peer> relayTo = RR->topology->getPeer(destination);
-					if ((relayTo)&&((relayTo->sendDirect(packet.data(),packet.size(),now,false)))) {
-						Mutex::Lock _l(_lastUniteAttempt_m);
-						uint64_t &luts = _lastUniteAttempt[_LastUniteKey(source,destination)];
-						if ((now - luts) >= ZT_MIN_UNITE_INTERVAL) {
-							luts = now;
-							unite(source,destination);
+					if ((relayTo)&&(relayTo->sendDirect(packet.data(),packet.size(),now,false))) {
+						if ((source != RR->identity.address())&&(_shouldUnite(now,source,destination))) { // don't send RENDEZVOUS for cluster frontplane relays
+							const InetAddress *hintToSource = (InetAddress *)0;
+							const InetAddress *hintToDest = (InetAddress *)0;
+
+							InetAddress destV4,destV6;
+							InetAddress sourceV4,sourceV6;
+							relayTo->getRendezvousAddresses(now,destV4,destV6);
+
+							const SharedPtr<Peer> sourcePeer(RR->topology->getPeer(source));
+							if (sourcePeer) {
+								sourcePeer->getRendezvousAddresses(now,sourceV4,sourceV6);
+								if ((destV6)&&(sourceV6)) {
+									hintToSource = &destV6;
+									hintToDest = &sourceV6;
+								} else if ((destV4)&&(sourceV4)) {
+									hintToSource = &destV4;
+									hintToDest = &sourceV4;
+								}
+
+								if ((hintToSource)&&(hintToDest)) {
+									unsigned int alt = (unsigned int)RR->node->prng() & 1; // randomize which hint we send first for obscure NAT-t reasons
+									const unsigned int completed = alt + 2;
+									while (alt != completed) {
+										if ((alt & 1) == 0) {
+											Packet outp(source,RR->identity.address(),Packet::VERB_RENDEZVOUS);
+											outp.append((uint8_t)0);
+											destination.appendTo(outp);
+											outp.append((uint16_t)hintToSource->port());
+											if (hintToSource->ss_family == AF_INET6) {
+												outp.append((uint8_t)16);
+												outp.append(hintToSource->rawIpData(),16);
+											} else {
+												outp.append((uint8_t)4);
+												outp.append(hintToSource->rawIpData(),4);
+											}
+											send(outp,true);
+										} else {
+											Packet outp(destination,RR->identity.address(),Packet::VERB_RENDEZVOUS);
+											outp.append((uint8_t)0);
+											source.appendTo(outp);
+											outp.append((uint16_t)hintToDest->port());
+											if (hintToDest->ss_family == AF_INET6) {
+												outp.append((uint8_t)16);
+												outp.append(hintToDest->rawIpData(),16);
+											} else {
+												outp.append((uint8_t)4);
+												outp.append(hintToDest->rawIpData(),4);
+											}
+											send(outp,true);
+										}
+										++alt;
+									}
+								}
+							}
 						}
 					} else {
 #ifdef ZT_ENABLE_CLUSTER
-						if (RR->cluster) {
-							bool shouldUnite;
-							{
-								Mutex::Lock _l(_lastUniteAttempt_m);
-								uint64_t &luts = _lastUniteAttempt[_LastUniteKey(source,destination)];
-								shouldUnite = ((now - luts) >= ZT_MIN_UNITE_INTERVAL);
-								if (shouldUnite)
-									luts = now;
-							}
-							RR->cluster->sendViaCluster(source,destination,packet.data(),packet.size(),shouldUnite);
+						if ((RR->cluster)&&(source != RR->identity.address())) {
+							RR->cluster->relayViaCluster(source,destination,packet.data(),packet.size(),_shouldUnite(now,source,destination));
 							return;
 						}
 #endif
-						relayTo = RR->topology->getBestRoot(&source,1,true);
+						relayTo = RR->topology->getUpstreamPeer(&source,1,true);
 						if (relayTo)
 							relayTo->sendDirect(packet.data(),packet.size(),now,true);
 					}
@@ -264,6 +286,17 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from
 		} else if ((reinterpret_cast<const uint8_t *>(data)[ZT_PACKET_IDX_FLAGS] & ZT_PROTO_FLAG_FRAGMENTED) != 0) {
 			// Packet is the head of a fragmented packet series
 
+			const uint64_t packetId = (
+				(((uint64_t)reinterpret_cast<const uint8_t *>(data)[0]) << 56) |
+				(((uint64_t)reinterpret_cast<const uint8_t *>(data)[1]) << 48) |
+				(((uint64_t)reinterpret_cast<const uint8_t *>(data)[2]) << 40) |
+				(((uint64_t)reinterpret_cast<const uint8_t *>(data)[3]) << 32) |
+				(((uint64_t)reinterpret_cast<const uint8_t *>(data)[4]) << 24) |
+				(((uint64_t)reinterpret_cast<const uint8_t *>(data)[5]) << 16) |
+				(((uint64_t)reinterpret_cast<const uint8_t *>(data)[6]) << 8) |
+				((uint64_t)reinterpret_cast<const uint8_t *>(data)[7])
+			);
+
 			Mutex::Lock _l(_rxQueue_m);
 			RXQueueEntry *const rq = _findRXQueueEntry(now,packetId);
 
@@ -311,7 +344,7 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from
 					rq = tmp;
 				}
 				rq->timestamp = now;
-				rq->packetId = packetId;
+				rq->packetId = packet.packetId();
 				rq->frag0 = packet;
 				rq->totalFragments = 1;
 				rq->haveFragments = 1;
@@ -577,7 +610,7 @@ void Switch::onLocalEthernet(const SharedPtr<Network> &network,const MAC &from,c
 	}
 }
 
-void Switch::send(const Packet &packet,bool encrypt)
+void Switch::send(Packet &packet,bool encrypt)
 {
 	if (packet.destination() == RR->identity.address()) {
 		TRACE("BUG: caught attempt to send() to self, ignored");
@@ -590,75 +623,6 @@ void Switch::send(const Packet &packet,bool encrypt)
 	}
 }
 
-bool Switch::unite(const Address &p1,const Address &p2)
-{
-	if ((p1 == RR->identity.address())||(p2 == RR->identity.address()))
-		return false;
-	SharedPtr<Peer> p1p = RR->topology->getPeer(p1);
-	if (!p1p)
-		return false;
-	SharedPtr<Peer> p2p = RR->topology->getPeer(p2);
-	if (!p2p)
-		return false;
-
-	const uint64_t now = RR->node->now();
-
-	std::pair<InetAddress,InetAddress> cg(Peer::findCommonGround(*p1p,*p2p,now));
-	if ((!(cg.first))||(cg.first.ipScope() != cg.second.ipScope()))
-		return false;
-
-	TRACE("unite: %s(%s) <> %s(%s)",p1.toString().c_str(),cg.second.toString().c_str(),p2.toString().c_str(),cg.first.toString().c_str());
-
-	/* Tell P1 where to find P2 and vice versa, sending the packets to P1 and
-	 * P2 in randomized order in terms of which gets sent first. This is done
-	 * since in a few cases NAT-t can be sensitive to slight timing differences
-	 * in terms of when the two peers initiate. Normally this is accounted for
-	 * by the nearly-simultaneous RENDEZVOUS kickoff from the relay, but
-	 * given that relay are hosted on cloud providers this can in some
-	 * cases have a few ms of latency between packet departures. By randomizing
-	 * the order we make each attempted NAT-t favor one or the other going
-	 * first, meaning if it doesn't succeed the first time it might the second
-	 * and so forth. */
-	unsigned int alt = (unsigned int)RR->node->prng() & 1;
-	unsigned int completed = alt + 2;
-	while (alt != completed) {
-		if ((alt & 1) == 0) {
-			// Tell p1 where to find p2.
-			Packet outp(p1,RR->identity.address(),Packet::VERB_RENDEZVOUS);
-			outp.append((unsigned char)0);
-			p2.appendTo(outp);
-			outp.append((uint16_t)cg.first.port());
-			if (cg.first.isV6()) {
-				outp.append((unsigned char)16);
-				outp.append(cg.first.rawIpData(),16);
-			} else {
-				outp.append((unsigned char)4);
-				outp.append(cg.first.rawIpData(),4);
-			}
-			outp.armor(p1p->key(),true);
-			p1p->sendDirect(outp.data(),outp.size(),now,true);
-		} else {
-			// Tell p2 where to find p1.
-			Packet outp(p2,RR->identity.address(),Packet::VERB_RENDEZVOUS);
-			outp.append((unsigned char)0);
-			p1.appendTo(outp);
-			outp.append((uint16_t)cg.second.port());
-			if (cg.second.isV6()) {
-				outp.append((unsigned char)16);
-				outp.append(cg.second.rawIpData(),16);
-			} else {
-				outp.append((unsigned char)4);
-				outp.append(cg.second.rawIpData(),4);
-			}
-			outp.armor(p2p->key(),true);
-			p2p->sendDirect(outp.data(),outp.size(),now,true);
-		}
-		++alt; // counts up and also flips LSB
-	}
-
-	return true;
-}
-
 void Switch::requestWhois(const Address &addr)
 {
 	bool inserted = false;
@@ -761,9 +725,20 @@ unsigned long Switch::doTimerTasks(uint64_t now)
 	return nextDelay;
 }
 
+bool Switch::_shouldUnite(const uint64_t now,const Address &source,const Address &destination)
+{
+	Mutex::Lock _l(_lastUniteAttempt_m);
+	uint64_t &ts = _lastUniteAttempt[_LastUniteKey(source,destination)];
+	if ((now - ts) >= ZT_MIN_UNITE_INTERVAL) {
+		ts = now;
+		return true;
+	}
+	return false;
+}
+
 Address Switch::_sendWhoisRequest(const Address &addr,const Address *peersAlreadyConsulted,unsigned int numPeersAlreadyConsulted)
 {
-	SharedPtr<Peer> upstream(RR->topology->getBestRoot(peersAlreadyConsulted,numPeersAlreadyConsulted,false));
+	SharedPtr<Peer> upstream(RR->topology->getUpstreamPeer(peersAlreadyConsulted,numPeersAlreadyConsulted,false));
 	if (upstream) {
 		Packet outp(upstream->address(),RR->identity.address(),Packet::VERB_WHOIS);
 		addr.appendTo(outp);
@@ -773,70 +748,126 @@ Address Switch::_sendWhoisRequest(const Address &addr,const Address *peersAlread
 	return Address();
 }
 
-bool Switch::_trySend(const Packet &packet,bool encrypt)
+bool Switch::_trySend(Packet &packet,bool encrypt)
 {
-	const SharedPtr<Peer> peer(RR->topology->getPeer(packet.destination()));
-	if (peer) {
-		const uint64_t now = RR->node->now();
+	SharedPtr<Path> viaPath;
+	const uint64_t now = RR->node->now();
+	const Address destination(packet.destination());
+
+#ifdef ZT_ENABLE_CLUSTER
+	uint64_t clusterMostRecentTs = 0;
+	int clusterMostRecentMemberId = -1;
+	uint8_t clusterPeerSecret[ZT_PEER_SECRET_KEY_LENGTH];
+	if (RR->cluster)
+		clusterMostRecentMemberId = RR->cluster->checkSendViaCluster(destination,clusterMostRecentTs,clusterPeerSecret);
+#endif
 
-		// First get the best path, and if it's dead (and this is not a root)
-		// we attempt to re-activate that path but this packet will flow
-		// upstream. If the path comes back alive, it will be used in the future.
-		// For roots we don't do the alive check since roots are not required
-		// to send heartbeats "down" and because we have to at least try to
-		// go somewhere.
-
-		SharedPtr<Path> viaPath(peer->getBestPath(now,false));
-		if ( (viaPath) && (!viaPath->alive(now)) && (!RR->topology->isRoot(peer->identity())) ) {
-			if ((now - viaPath->lastOut()) > std::max((now - viaPath->lastIn()) * 4,(uint64_t)ZT_PATH_MIN_REACTIVATE_INTERVAL))
-				peer->attemptToContactAt(viaPath->localAddress(),viaPath->address(),now);
+	const SharedPtr<Peer> peer(RR->topology->getPeer(destination));
+	if (peer) {
+		/* First get the best path, and if it's dead (and this is not a root)
+		 * we attempt to re-activate that path but this packet will flow
+		 * upstream. If the path comes back alive, it will be used in the future.
+		 * For roots we don't do the alive check since roots are not required
+		 * to send heartbeats "down" and because we have to at least try to
		 * go somewhere. */
+
+		viaPath = peer->getBestPath(now,false);
+		if ( (viaPath) && (!viaPath->alive(now)) && (!RR->topology->isUpstream(peer->identity())) ) {
+#ifdef ZT_ENABLE_CLUSTER
+			if ((clusterMostRecentMemberId < 0)||(viaPath->lastIn() > clusterMostRecentTs)) {
+#endif
+				if ((now - viaPath->lastOut()) > std::max((now - viaPath->lastIn()) * 4,(uint64_t)ZT_PATH_MIN_REACTIVATE_INTERVAL)) {
+					peer->attemptToContactAt(viaPath->localAddress(),viaPath->address(),now,false,viaPath->nextOutgoingCounter());
+					viaPath->sent(now);
+				}
+#ifdef ZT_ENABLE_CLUSTER
+			}
+#endif
 			viaPath.zero();
 		}
+
+#ifdef ZT_ENABLE_CLUSTER
+		if (clusterMostRecentMemberId >= 0) {
+			if ((viaPath)&&(viaPath->lastIn() < clusterMostRecentTs))
+				viaPath.zero();
+		} else if (!viaPath) {
+#else
 		if (!viaPath) {
-			SharedPtr<Peer> relay(RR->topology->getBestRoot());
+#endif
+			peer->tryMemorizedPath(now); // periodically attempt memorized or statically defined paths, if any are known
+			const SharedPtr<Peer> relay(RR->topology->getUpstreamPeer());
 			if ( (!relay) || (!(viaPath = relay->getBestPath(now,false))) ) {
 				if (!(viaPath = peer->getBestPath(now,true)))
 					return false;
 			}
+#ifdef ZT_ENABLE_CLUSTER
 		}
+#else
+		}
+#endif
+	} else {
+#ifdef ZT_ENABLE_CLUSTER
+		if (clusterMostRecentMemberId < 0) {
+#else
+			requestWhois(destination);
+			return false; // if we are not in cluster mode, there is no way we can send without knowing the peer directly
+#endif
+#ifdef ZT_ENABLE_CLUSTER
+		}
+#endif
+	}
 
-		Packet tmp(packet);
-
-		unsigned int chunkSize = std::min(tmp.size(),(unsigned int)ZT_UDP_DEFAULT_PAYLOAD_MTU);
-		tmp.setFragmented(chunkSize < tmp.size());
+	unsigned int chunkSize = std::min(packet.size(),(unsigned int)ZT_UDP_DEFAULT_PAYLOAD_MTU);
+	packet.setFragmented(chunkSize < packet.size());
 
-		const uint64_t trustedPathId = RR->topology->getOutboundPathTrust(viaPath->address());
-		if (trustedPathId) {
-			tmp.setTrusted(trustedPathId);
-		} else {
-			tmp.armor(peer->key(),encrypt);
-		}
+#ifdef ZT_ENABLE_CLUSTER
+	const uint64_t trustedPathId = (viaPath) ? RR->topology->getOutboundPathTrust(viaPath->address()) : 0;
+	if (trustedPathId) {
+		packet.setTrusted(trustedPathId);
+	} else {
+		packet.armor((clusterMostRecentMemberId >= 0) ? clusterPeerSecret : peer->key(),encrypt,(viaPath) ? viaPath->nextOutgoingCounter() : 0);
+	}
+#else
+	const uint64_t trustedPathId = RR->topology->getOutboundPathTrust(viaPath->address());
+	if (trustedPathId) {
+		packet.setTrusted(trustedPathId);
+	} else {
+		packet.armor(peer->key(),encrypt,viaPath->nextOutgoingCounter());
+	}
+#endif
 
-		if (viaPath->send(RR,tmp.data(),chunkSize,now)) {
-			if (chunkSize < tmp.size()) {
-				// Too big for one packet, fragment the rest
-				unsigned int fragStart = chunkSize;
-				unsigned int remaining = tmp.size() - chunkSize;
-				unsigned int fragsRemaining = (remaining / (ZT_UDP_DEFAULT_PAYLOAD_MTU - ZT_PROTO_MIN_FRAGMENT_LENGTH));
-				if ((fragsRemaining * (ZT_UDP_DEFAULT_PAYLOAD_MTU - ZT_PROTO_MIN_FRAGMENT_LENGTH)) < remaining)
-					++fragsRemaining;
-				const unsigned int totalFragments = fragsRemaining + 1;
-
-				for(unsigned int fno=1;fno<totalFragments;++fno) {
-					chunkSize = std::min(remaining,(unsigned int)(ZT_UDP_DEFAULT_PAYLOAD_MTU - ZT_PROTO_MIN_FRAGMENT_LENGTH));
-					Packet::Fragment frag(tmp,fragStart,chunkSize,fno,totalFragments);
+#ifdef ZT_ENABLE_CLUSTER
+	if ( ((viaPath)&&(viaPath->send(RR,packet.data(),chunkSize,now))) || ((clusterMostRecentMemberId >= 0)&&(RR->cluster->sendViaCluster(clusterMostRecentMemberId,destination,packet.data(),chunkSize))) ) {
+#else
+	if (viaPath->send(RR,packet.data(),chunkSize,now)) {
+#endif
+		if (chunkSize < packet.size()) {
+			// Too big for one packet, fragment the rest
+			unsigned int fragStart = chunkSize;
+			unsigned int remaining = packet.size() - chunkSize;
+			unsigned int fragsRemaining = (remaining / (ZT_UDP_DEFAULT_PAYLOAD_MTU - ZT_PROTO_MIN_FRAGMENT_LENGTH));
+			if ((fragsRemaining * (ZT_UDP_DEFAULT_PAYLOAD_MTU - ZT_PROTO_MIN_FRAGMENT_LENGTH)) < remaining)
+				++fragsRemaining;
+			const unsigned int totalFragments = fragsRemaining + 1;
+
+			for(unsigned int fno=1;fno<totalFragments;++fno) {
+				chunkSize = std::min(remaining,(unsigned int)(ZT_UDP_DEFAULT_PAYLOAD_MTU - ZT_PROTO_MIN_FRAGMENT_LENGTH));
+				Packet::Fragment frag(packet,fragStart,chunkSize,fno,totalFragments);
+#ifdef ZT_ENABLE_CLUSTER
+				if (viaPath)
 					viaPath->send(RR,frag.data(),frag.size(),now);
-					fragStart += chunkSize;
-					remaining -= chunkSize;
-				}
+				else if (clusterMostRecentMemberId >= 0)
+					RR->cluster->sendViaCluster(clusterMostRecentMemberId,destination,frag.data(),frag.size());
+#else
+				viaPath->send(RR,frag.data(),frag.size(),now);
+#endif
+				fragStart += chunkSize;
+				remaining -= chunkSize;
 			}
-
-			return true;
 		}
-	} else {
-		requestWhois(packet.destination());
 	}
-	return false;
+
+	return true;
 }
 
 } // namespace ZeroTier
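The new Switch::_shouldUnite() helper introduced by this diff replaces the two copies of the unite rate-limit logic that the old code kept inline (one in the direct-relay branch, one in the cluster branch): it holds a mutex while indexing a map-like container keyed on the (source,destination) pair and returns true at most once per ZT_MIN_UNITE_INTERVAL for that pair. Below is a minimal standalone sketch of the same pattern for illustration only; the PairRateLimiter name and the std::map/std::mutex types are stand-ins and are not part of the ZeroTier code, which uses its own Mutex and container classes.

// Illustrative sketch (not from the commit): a per-pair rate limiter in the
// style of Switch::_shouldUnite(). All names here are hypothetical.
#include <cstdint>
#include <map>
#include <mutex>
#include <utility>

class PairRateLimiter {
public:
	explicit PairRateLimiter(uint64_t intervalMs) : _interval(intervalMs) {}

	// Returns true at most once per interval for each (source,destination)
	// pair, mirroring how _shouldUnite() gates RENDEZVOUS attempts.
	bool shouldAct(uint64_t now,uint64_t source,uint64_t destination)
	{
		std::lock_guard<std::mutex> l(_lock);
		uint64_t &ts = _last[std::make_pair(source,destination)]; // default-constructs to 0 on first use
		if ((now - ts) >= _interval) {
			ts = now; // record this attempt and allow it
			return true;
		}
		return false; // too soon since the last attempt for this pair
	}

private:
	const uint64_t _interval;
	std::map<std::pair<uint64_t,uint64_t>,uint64_t> _last;
	std::mutex _lock;
};

Calling shouldAct() twice in quick succession for the same pair yields true then false, which is the behavior the commit relies on (via _shouldUnite) to avoid flooding either peer with RENDEZVOUS packets while still retrying periodically.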