diff options
Diffstat (limited to 'node/Switch.cpp')
-rw-r--r-- | node/Switch.cpp | 341 |
1 files changed, 220 insertions, 121 deletions
diff --git a/node/Switch.cpp b/node/Switch.cpp index 989f497a..74e2f4c6 100644 --- a/node/Switch.cpp +++ b/node/Switch.cpp @@ -45,6 +45,7 @@ #include "AntiRecursion.hpp" #include "SelfAwareness.hpp" #include "Packet.hpp" +#include "Cluster.hpp" namespace ZeroTier { @@ -67,7 +68,10 @@ static const char *etherTypeName(const unsigned int etherType) Switch::Switch(const RuntimeEnvironment *renv) : RR(renv), - _lastBeaconResponse(0) + _lastBeaconResponse(0), + _outstandingWhoisRequests(32), + _defragQueue(32), + _lastUniteAttempt(8) // only really used on root servers and upstreams, and it'll grow there just fine { } @@ -75,7 +79,7 @@ Switch::~Switch() { } -void Switch::onRemotePacket(const InetAddress &fromAddr,const void *data,unsigned int len) +void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &fromAddr,const void *data,unsigned int len) { try { if (len == 13) { @@ -93,14 +97,14 @@ void Switch::onRemotePacket(const InetAddress &fromAddr,const void *data,unsigne _lastBeaconResponse = now; Packet outp(peer->address(),RR->identity.address(),Packet::VERB_NOP); outp.armor(peer->key(),false); - RR->node->putPacket(fromAddr,outp.data(),outp.size()); + RR->node->putPacket(localAddr,fromAddr,outp.data(),outp.size()); } } } else if (len > ZT_PROTO_MIN_FRAGMENT_LENGTH) { if (((const unsigned char *)data)[ZT_PACKET_FRAGMENT_IDX_FRAGMENT_INDICATOR] == ZT_PACKET_FRAGMENT_INDICATOR) { - _handleRemotePacketFragment(fromAddr,data,len); + _handleRemotePacketFragment(localAddr,fromAddr,data,len); } else if (len >= ZT_PROTO_MIN_PACKET_LENGTH) { - _handleRemotePacketHead(fromAddr,data,len); + _handleRemotePacketHead(localAddr,fromAddr,data,len); } } } catch (std::exception &ex) { @@ -150,25 +154,84 @@ void Switch::onLocalEthernet(const SharedPtr<Network> &network,const MAC &from,c MulticastGroup mg(to,0); if (to.isBroadcast()) { - if ( - (etherType == ZT_ETHERTYPE_ARP)&& - (len >= 28)&& - ( - (((const unsigned char *)data)[2] == 0x08)&& - (((const unsigned char *)data)[3] == 0x00)&& - (((const unsigned char *)data)[4] == 6)&& - (((const unsigned char *)data)[5] == 4)&& - (((const unsigned char *)data)[7] == 0x01) - ) - ) { - // Cram IPv4 IP into ADI field to make IPv4 ARP broadcast channel specific and scalable - // Also: enableBroadcast() does not apply to ARP since it's required for IPv4 + if ( (etherType == ZT_ETHERTYPE_ARP) && (len >= 28) && ((((const uint8_t *)data)[2] == 0x08)&&(((const uint8_t *)data)[3] == 0x00)&&(((const uint8_t *)data)[4] == 6)&&(((const uint8_t *)data)[5] == 4)&&(((const uint8_t *)data)[7] == 0x01)) ) { + /* IPv4 ARP is one of the few special cases that we impose upon what is + * otherwise a straightforward Ethernet switch emulation. Vanilla ARP + * is dumb old broadcast and simply doesn't scale. ZeroTier multicast + * groups have an additional field called ADI (additional distinguishing + * information) which was added specifically for ARP though it could + * be used for other things too. We then take ARP broadcasts and turn + * them into multicasts by stuffing the IP address being queried into + * the 32-bit ADI field. In practice this uses our multicast pub/sub + * system to implement a kind of extended/distributed ARP table. */ mg = MulticastGroup::deriveMulticastGroupForAddressResolution(InetAddress(((const unsigned char *)data) + 24,4,0)); } else if (!nconf->enableBroadcast()) { // Don't transmit broadcasts if this network doesn't want them TRACE("%.16llx: dropped broadcast since ff:ff:ff:ff:ff:ff is not enabled",network->id()); return; } + } else if ((etherType == ZT_ETHERTYPE_IPV6)&&(len >= (40 + 8 + 16))) { + /* IPv6 NDP emulation on ZeroTier-RFC4193 addressed networks! This allows + * for multicast-free operation in IPv6 networks, which both improves + * performance and is friendlier to mobile and (especially) IoT devices. + * In the future there may be a no-multicast build option for embedded + * and IoT use and this will be the preferred addressing mode. Note that + * it plays nice with our L2 emulation philosophy and even with bridging. + * While "real" devices behind the bridge can't have ZT-RFC4193 addresses + * themselves, they can look these addresses up with NDP and it will + * work just fine. */ + if ((reinterpret_cast<const uint8_t *>(data)[6] == 0x3a)&&(reinterpret_cast<const uint8_t *>(data)[40] == 0x87)) { // ICMPv6 neighbor solicitation + for(std::vector<InetAddress>::const_iterator sip(nconf->staticIps().begin()),sipend(nconf->staticIps().end());sip!=sipend;++sip) { + if ((sip->ss_family == AF_INET6)&&(Utils::ntoh((uint16_t)reinterpret_cast<const struct sockaddr_in6 *>(&(*sip))->sin6_port) == 88)) { + const uint8_t *my6 = reinterpret_cast<const uint8_t *>(reinterpret_cast<const struct sockaddr_in6 *>(&(*sip))->sin6_addr.s6_addr); + if ((my6[0] == 0xfd)&&(my6[9] == 0x99)&&(my6[10] == 0x93)) { // ZT-RFC4193 == fd__:____:____:____:__99:93__:____:____ / 88 + const uint8_t *pkt6 = reinterpret_cast<const uint8_t *>(data) + 40 + 8; + unsigned int ptr = 0; + while (ptr != 11) { + if (pkt6[ptr] != my6[ptr]) + break; + ++ptr; + } + if (ptr == 11) { // /88 matches an assigned address on this network + const Address atPeer(pkt6 + ptr,5); + if (atPeer != RR->identity.address()) { + const MAC atPeerMac(atPeer,network->id()); + TRACE("ZT-RFC4193 NDP emulation: %.16llx: forging response for %s/%s",network->id(),atPeer.toString().c_str(),atPeerMac.toString().c_str()); + + uint8_t adv[72]; + adv[0] = 0x60; adv[1] = 0x00; adv[2] = 0x00; adv[3] = 0x00; + adv[4] = 0x00; adv[5] = 0x20; + adv[6] = 0x3a; adv[7] = 0xff; + for(int i=0;i<16;++i) adv[8 + i] = pkt6[i]; + for(int i=0;i<16;++i) adv[24 + i] = my6[i]; + adv[40] = 0x88; adv[41] = 0x00; + adv[42] = 0x00; adv[43] = 0x00; // future home of checksum + adv[44] = 0x60; adv[45] = 0x00; adv[46] = 0x00; adv[47] = 0x00; + for(int i=0;i<16;++i) adv[48 + i] = pkt6[i]; + adv[64] = 0x02; adv[65] = 0x01; + adv[66] = atPeerMac[0]; adv[67] = atPeerMac[1]; adv[68] = atPeerMac[2]; adv[69] = atPeerMac[3]; adv[70] = atPeerMac[4]; adv[71] = atPeerMac[5]; + + uint16_t pseudo_[36]; + uint8_t *const pseudo = reinterpret_cast<uint8_t *>(pseudo_); + for(int i=0;i<32;++i) pseudo[i] = adv[8 + i]; + pseudo[32] = 0x00; pseudo[33] = 0x00; pseudo[34] = 0x00; pseudo[35] = 0x20; + pseudo[36] = 0x00; pseudo[37] = 0x00; pseudo[38] = 0x00; pseudo[39] = 0x3a; + for(int i=0;i<32;++i) pseudo[40 + i] = adv[40 + i]; + uint32_t checksum = 0; + for(int i=0;i<36;++i) checksum += Utils::hton(pseudo_[i]); + while ((checksum >> 16)) checksum = (checksum & 0xffff) + (checksum >> 16); + checksum = ~checksum; + adv[42] = (checksum >> 8) & 0xff; + adv[43] = checksum & 0xff; + + RR->node->putFrame(network->id(),atPeerMac,from,ZT_ETHERTYPE_IPV6,0,adv,72); + return; // stop processing: we have handled this frame with a spoofed local reply so no need to send it anywhere + } + } + } + } + } + } } /* Learn multicast groups for bridged-in hosts. @@ -199,7 +262,8 @@ void Switch::onLocalEthernet(const SharedPtr<Network> &network,const MAC &from,c // Destination is another ZeroTier peer on the same network Address toZT(to.toAddress(network->id())); // since in-network MACs are derived from addresses and network IDs, we can reverse this - const bool includeCom = network->peerNeedsOurMembershipCertificate(toZT,RR->node->now()); + SharedPtr<Peer> toPeer(RR->topology->getPeer(toZT)); + const bool includeCom = ( (nconf->isPrivate()) && (nconf->com()) && ((!toPeer)||(toPeer->needsOurNetworkMembershipCertificate(network->id(),RR->node->now(),true))) ); if ((fromBridged)||(includeCom)) { Packet outp(toZT,RR->identity.address(),Packet::VERB_EXT_FRAME); outp.append(network->id()); @@ -264,9 +328,10 @@ void Switch::onLocalEthernet(const SharedPtr<Network> &network,const MAC &from,c } for(unsigned int b=0;b<numBridges;++b) { + SharedPtr<Peer> bridgePeer(RR->topology->getPeer(bridges[b])); Packet outp(bridges[b],RR->identity.address(),Packet::VERB_EXT_FRAME); outp.append(network->id()); - if (network->peerNeedsOurMembershipCertificate(bridges[b],RR->node->now())) { + if ( (nconf->isPrivate()) && (nconf->com()) && ((!bridgePeer)||(bridgePeer->needsOurNetworkMembershipCertificate(network->id(),RR->node->now(),true))) ) { outp.append((unsigned char)0x01); // 0x01 -- COM included nconf->com().serialize(outp); } else { @@ -289,17 +354,18 @@ void Switch::send(const Packet &packet,bool encrypt,uint64_t nwid) return; } + //TRACE(">> %s to %s (%u bytes, encrypt==%d, nwid==%.16llx)",Packet::verbString(packet.verb()),packet.destination().toString().c_str(),packet.size(),(int)encrypt,nwid); + if (!_trySend(packet,encrypt,nwid)) { Mutex::Lock _l(_txQueue_m); - _txQueue.insert(std::pair< Address,TXQueueEntry >(packet.destination(),TXQueueEntry(RR->node->now(),packet,encrypt,nwid))); + _txQueue.push_back(TXQueueEntry(packet.destination(),RR->node->now(),packet,encrypt,nwid)); } } -bool Switch::unite(const Address &p1,const Address &p2,bool force) +bool Switch::unite(const Address &p1,const Address &p2) { if ((p1 == RR->identity.address())||(p2 == RR->identity.address())) return false; - SharedPtr<Peer> p1p = RR->topology->getPeer(p1); if (!p1p) return false; @@ -310,30 +376,9 @@ bool Switch::unite(const Address &p1,const Address &p2,bool force) const uint64_t now = RR->node->now(); std::pair<InetAddress,InetAddress> cg(Peer::findCommonGround(*p1p,*p2p,now)); - if (!(cg.first)) + if ((!(cg.first))||(cg.first.ipScope() != cg.second.ipScope())) return false; - if (cg.first.ipScope() != cg.second.ipScope()) - return false; - - // Addresses are sorted in key for last unite attempt map for order - // invariant lookup: (p1,p2) == (p2,p1) - Array<Address,2> uniteKey; - if (p1 >= p2) { - uniteKey[0] = p2; - uniteKey[1] = p1; - } else { - uniteKey[0] = p1; - uniteKey[1] = p2; - } - { - Mutex::Lock _l(_lastUniteAttempt_m); - std::map< Array< Address,2 >,uint64_t >::const_iterator e(_lastUniteAttempt.find(uniteKey)); - if ((!force)&&(e != _lastUniteAttempt.end())&&((now - e->second) < ZT_MIN_UNITE_INTERVAL)) - return false; - else _lastUniteAttempt[uniteKey] = now; - } - TRACE("unite: %s(%s) <> %s(%s)",p1.toString().c_str(),cg.second.toString().c_str(),p2.toString().c_str(),cg.first.toString().c_str()); /* Tell P1 where to find P2 and vice versa, sending the packets to P1 and @@ -386,14 +431,14 @@ bool Switch::unite(const Address &p1,const Address &p2,bool force) return true; } -void Switch::rendezvous(const SharedPtr<Peer> &peer,const InetAddress &atAddr) +void Switch::rendezvous(const SharedPtr<Peer> &peer,const InetAddress &localAddr,const InetAddress &atAddr) { TRACE("sending NAT-t message to %s(%s)",peer->address().toString().c_str(),atAddr.toString().c_str()); const uint64_t now = RR->node->now(); - peer->attemptToContactAt(RR,atAddr,now); + peer->sendHELLO(RR,localAddr,atAddr,now,2); // first attempt: send low-TTL packet to 'open' local NAT { Mutex::Lock _l(_contactQueue_m); - _contactQueue.push_back(ContactQueueEntry(peer,now + ZT_NAT_T_TACTICAL_ESCALATION_DELAY,atAddr)); + _contactQueue.push_back(ContactQueueEntry(peer,now + ZT_NAT_T_TACTICAL_ESCALATION_DELAY,localAddr,atAddr)); } } @@ -402,10 +447,13 @@ void Switch::requestWhois(const Address &addr) bool inserted = false; { Mutex::Lock _l(_outstandingWhoisRequests_m); - std::pair< std::map< Address,WhoisRequest >::iterator,bool > entry(_outstandingWhoisRequests.insert(std::pair<Address,WhoisRequest>(addr,WhoisRequest()))); - if ((inserted = entry.second)) - entry.first->second.lastSent = RR->node->now(); - entry.first->second.retries = 0; // reset retry count if entry already existed + WhoisRequest &r = _outstandingWhoisRequests[addr]; + if (r.lastSent) { + r.retries = 0; // reset retry count if entry already existed, but keep waiting and retry again after normal timeout + } else { + r.lastSent = RR->node->now(); + inserted = true; + } } if (inserted) _sendWhoisRequest(addr,(const Address *)0,0); @@ -427,7 +475,7 @@ void Switch::doAnythingWaitingForPeer(const SharedPtr<Peer> &peer) { // finish processing any packets waiting on peer's public key / identity Mutex::Lock _l(_rxQueue_m); for(std::list< SharedPtr<IncomingPacket> >::iterator rxi(_rxQueue.begin());rxi!=_rxQueue.end();) { - if ((*rxi)->tryDecode(RR)) + if ((*rxi)->tryDecode(RR,false)) _rxQueue.erase(rxi++); else ++rxi; } @@ -435,11 +483,12 @@ void Switch::doAnythingWaitingForPeer(const SharedPtr<Peer> &peer) { // finish sending any packets waiting on peer's public key / identity Mutex::Lock _l(_txQueue_m); - std::pair< std::multimap< Address,TXQueueEntry >::iterator,std::multimap< Address,TXQueueEntry >::iterator > waitingTxQueueItems(_txQueue.equal_range(peer->address())); - for(std::multimap< Address,TXQueueEntry >::iterator txi(waitingTxQueueItems.first);txi!=waitingTxQueueItems.second;) { - if (_trySend(txi->second.packet,txi->second.encrypt,txi->second.nwid)) - _txQueue.erase(txi++); - else ++txi; + for(std::list< TXQueueEntry >::iterator txi(_txQueue.begin());txi!=_txQueue.end();) { + if (txi->dest == peer->address()) { + if (_trySend(txi->packet,txi->encrypt,txi->nwid)) + _txQueue.erase(txi++); + else ++txi; + } else ++txi; } } } @@ -452,21 +501,21 @@ unsigned long Switch::doTimerTasks(uint64_t now) Mutex::Lock _l(_contactQueue_m); for(std::list<ContactQueueEntry>::iterator qi(_contactQueue.begin());qi!=_contactQueue.end();) { if (now >= qi->fireAtTime) { - if ((!qi->peer->alive(now))||(qi->peer->hasActiveDirectPath(now))) { - // Cancel attempt if we've already connected or peer is no longer "alive" + if (qi->peer->hasActiveDirectPath(now)) { + // Cancel if connection has succeeded _contactQueue.erase(qi++); continue; } else { if (qi->strategyIteration == 0) { // First strategy: send packet directly to destination - qi->peer->attemptToContactAt(RR,qi->inaddr,now); - } else if (qi->strategyIteration <= 4) { - // Strategies 1-4: try escalating ports for symmetric NATs that remap sequentially + qi->peer->sendHELLO(RR,qi->localAddr,qi->inaddr,now); + } else if (qi->strategyIteration <= 3) { + // Strategies 1-3: try escalating ports for symmetric NATs that remap sequentially InetAddress tmpaddr(qi->inaddr); int p = (int)qi->inaddr.port() + qi->strategyIteration; if (p < 0xffff) { tmpaddr.setPort((unsigned int)p); - qi->peer->attemptToContactAt(RR,tmpaddr,now); + qi->peer->sendHELLO(RR,qi->localAddr,tmpaddr,now); } else qi->strategyIteration = 5; } else { // All strategies tried, expire entry @@ -486,36 +535,37 @@ unsigned long Switch::doTimerTasks(uint64_t now) { // Retry outstanding WHOIS requests Mutex::Lock _l(_outstandingWhoisRequests_m); - for(std::map< Address,WhoisRequest >::iterator i(_outstandingWhoisRequests.begin());i!=_outstandingWhoisRequests.end();) { - unsigned long since = (unsigned long)(now - i->second.lastSent); + Hashtable< Address,WhoisRequest >::Iterator i(_outstandingWhoisRequests); + Address *a = (Address *)0; + WhoisRequest *r = (WhoisRequest *)0; + while (i.next(a,r)) { + const unsigned long since = (unsigned long)(now - r->lastSent); if (since >= ZT_WHOIS_RETRY_DELAY) { - if (i->second.retries >= ZT_MAX_WHOIS_RETRIES) { - TRACE("WHOIS %s timed out",i->first.toString().c_str()); - _outstandingWhoisRequests.erase(i++); - continue; + if (r->retries >= ZT_MAX_WHOIS_RETRIES) { + TRACE("WHOIS %s timed out",a->toString().c_str()); + _outstandingWhoisRequests.erase(*a); } else { - i->second.lastSent = now; - i->second.peersConsulted[i->second.retries] = _sendWhoisRequest(i->first,i->second.peersConsulted,i->second.retries); - ++i->second.retries; - TRACE("WHOIS %s (retry %u)",i->first.toString().c_str(),i->second.retries); + r->lastSent = now; + r->peersConsulted[r->retries] = _sendWhoisRequest(*a,r->peersConsulted,r->retries); + ++r->retries; + TRACE("WHOIS %s (retry %u)",a->toString().c_str(),r->retries); nextDelay = std::min(nextDelay,(unsigned long)ZT_WHOIS_RETRY_DELAY); } } else { nextDelay = std::min(nextDelay,ZT_WHOIS_RETRY_DELAY - since); } - ++i; } } { // Time out TX queue packets that never got WHOIS lookups or other info. Mutex::Lock _l(_txQueue_m); - for(std::multimap< Address,TXQueueEntry >::iterator i(_txQueue.begin());i!=_txQueue.end();) { - if (_trySend(i->second.packet,i->second.encrypt,i->second.nwid)) - _txQueue.erase(i++); - else if ((now - i->second.creationTime) > ZT_TRANSMIT_QUEUE_TIMEOUT) { - TRACE("TX %s -> %s timed out",i->second.packet.source().toString().c_str(),i->second.packet.destination().toString().c_str()); - _txQueue.erase(i++); - } else ++i; + for(std::list< TXQueueEntry >::iterator txi(_txQueue.begin());txi!=_txQueue.end();) { + if (_trySend(txi->packet,txi->encrypt,txi->nwid)) + _txQueue.erase(txi++); + else if ((now - txi->creationTime) > ZT_TRANSMIT_QUEUE_TIMEOUT) { + TRACE("TX %s -> %s timed out",txi->packet.source().toString().c_str(),txi->packet.destination().toString().c_str()); + _txQueue.erase(txi++); + } else ++txi; } } @@ -531,18 +581,32 @@ unsigned long Switch::doTimerTasks(uint64_t now) { // Time out packets that didn't get all their fragments. Mutex::Lock _l(_defragQueue_m); - for(std::map< uint64_t,DefragQueueEntry >::iterator i(_defragQueue.begin());i!=_defragQueue.end();) { - if ((now - i->second.creationTime) > ZT_FRAGMENTED_PACKET_RECEIVE_TIMEOUT) { - TRACE("incomplete fragmented packet %.16llx timed out, fragments discarded",i->first); - _defragQueue.erase(i++); - } else ++i; + Hashtable< uint64_t,DefragQueueEntry >::Iterator i(_defragQueue); + uint64_t *packetId = (uint64_t *)0; + DefragQueueEntry *qe = (DefragQueueEntry *)0; + while (i.next(packetId,qe)) { + if ((now - qe->creationTime) > ZT_FRAGMENTED_PACKET_RECEIVE_TIMEOUT) { + TRACE("incomplete fragmented packet %.16llx timed out, fragments discarded",*packetId); + _defragQueue.erase(*packetId); + } + } + } + + { // Remove really old last unite attempt entries to keep table size controlled + Mutex::Lock _l(_lastUniteAttempt_m); + Hashtable< _LastUniteKey,uint64_t >::Iterator i(_lastUniteAttempt); + _LastUniteKey *k = (_LastUniteKey *)0; + uint64_t *v = (uint64_t *)0; + while (i.next(k,v)) { + if ((now - *v) >= (ZT_MIN_UNITE_INTERVAL * 8)) + _lastUniteAttempt.erase(*k); } } return nextDelay; } -void Switch::_handleRemotePacketFragment(const InetAddress &fromAddr,const void *data,unsigned int len) +void Switch::_handleRemotePacketFragment(const InetAddress &localAddr,const InetAddress &fromAddr,const void *data,unsigned int len) { Packet::Fragment fragment(data,len); Address destination(fragment.destination()); @@ -556,6 +620,13 @@ void Switch::_handleRemotePacketFragment(const InetAddress &fromAddr,const void // It wouldn't hurt anything, just redundant and unnecessary. SharedPtr<Peer> relayTo = RR->topology->getPeer(destination); if ((!relayTo)||(!relayTo->send(RR,fragment.data(),fragment.size(),RR->node->now()))) { +#ifdef ZT_ENABLE_CLUSTER + if (RR->cluster) { + RR->cluster->sendViaCluster(Address(),destination,fragment.data(),fragment.size(),false); + return; + } +#endif + // Don't know peer or no direct path -- so relay via root server relayTo = RR->topology->getBestRoot(); if (relayTo) @@ -577,34 +648,33 @@ void Switch::_handleRemotePacketFragment(const InetAddress &fromAddr,const void // seeing a Packet::Fragment? Mutex::Lock _l(_defragQueue_m); - std::map< uint64_t,DefragQueueEntry >::iterator dqe(_defragQueue.find(pid)); + DefragQueueEntry &dq = _defragQueue[pid]; - if (dqe == _defragQueue.end()) { + if (!dq.creationTime) { // We received a Packet::Fragment without its head, so queue it and wait - DefragQueueEntry &dq = _defragQueue[pid]; dq.creationTime = RR->node->now(); dq.frags[fno - 1] = fragment; dq.totalFragments = tf; // total fragment count is known dq.haveFragments = 1 << fno; // we have only this fragment //TRACE("fragment (%u/%u) of %.16llx from %s",fno + 1,tf,pid,fromAddr.toString().c_str()); - } else if (!(dqe->second.haveFragments & (1 << fno))) { + } else if (!(dq.haveFragments & (1 << fno))) { // We have other fragments and maybe the head, so add this one and check - dqe->second.frags[fno - 1] = fragment; - dqe->second.totalFragments = tf; + dq.frags[fno - 1] = fragment; + dq.totalFragments = tf; //TRACE("fragment (%u/%u) of %.16llx from %s",fno + 1,tf,pid,fromAddr.toString().c_str()); - if (Utils::countBits(dqe->second.haveFragments |= (1 << fno)) == tf) { + if (Utils::countBits(dq.haveFragments |= (1 << fno)) == tf) { // We have all fragments -- assemble and process full Packet //TRACE("packet %.16llx is complete, assembling and processing...",pid); - SharedPtr<IncomingPacket> packet(dqe->second.frag0); + SharedPtr<IncomingPacket> packet(dq.frag0); for(unsigned int f=1;f<tf;++f) - packet->append(dqe->second.frags[f - 1].payload(),dqe->second.frags[f - 1].payloadLength()); - _defragQueue.erase(dqe); + packet->append(dq.frags[f - 1].payload(),dq.frags[f - 1].payloadLength()); + _defragQueue.erase(pid); // dq no longer valid after this - if (!packet->tryDecode(RR)) { + if (!packet->tryDecode(RR,false)) { Mutex::Lock _l(_rxQueue_m); _rxQueue.push_back(packet); } @@ -614,13 +684,19 @@ void Switch::_handleRemotePacketFragment(const InetAddress &fromAddr,const void } } -void Switch::_handleRemotePacketHead(const InetAddress &fromAddr,const void *data,unsigned int len) +void Switch::_handleRemotePacketHead(const InetAddress &localAddr,const InetAddress &fromAddr,const void *data,unsigned int len) { - SharedPtr<IncomingPacket> packet(new IncomingPacket(data,len,fromAddr,RR->node->now())); + const uint64_t now = RR->node->now(); + SharedPtr<IncomingPacket> packet(new IncomingPacket(data,len,localAddr,fromAddr,now)); Address source(packet->source()); Address destination(packet->destination()); + // Catch this and toss it -- it would never work, but it could happen if we somehow + // mistakenly guessed an address we're bound to as a destination for another peer. + if (source == RR->identity.address()) + return; + //TRACE("<< %.16llx %s -> %s (size: %u)",(unsigned long long)packet->packetId(),source.toString().c_str(),destination.toString().c_str(),packet->size()); if (destination != RR->identity.address()) { @@ -629,13 +705,32 @@ void Switch::_handleRemotePacketHead(const InetAddress &fromAddr,const void *dat packet->incrementHops(); SharedPtr<Peer> relayTo = RR->topology->getPeer(destination); - if ((relayTo)&&((relayTo->send(RR,packet->data(),packet->size(),RR->node->now())))) { - unite(source,destination,false); + if ((relayTo)&&((relayTo->send(RR,packet->data(),packet->size(),now)))) { + Mutex::Lock _l(_lastUniteAttempt_m); + uint64_t &luts = _lastUniteAttempt[_LastUniteKey(source,destination)]; + if ((now - luts) >= ZT_MIN_UNITE_INTERVAL) { + luts = now; + unite(source,destination); + } } else { - // Don't know peer or no direct path -- so relay via root server +#ifdef ZT_ENABLE_CLUSTER + if (RR->cluster) { + bool shouldUnite; + { + Mutex::Lock _l(_lastUniteAttempt_m); + uint64_t &luts = _lastUniteAttempt[_LastUniteKey(source,destination)]; + shouldUnite = ((now - luts) >= ZT_MIN_UNITE_INTERVAL); + if (shouldUnite) + luts = now; + } + RR->cluster->sendViaCluster(source,destination,packet->data(),packet->size(),shouldUnite); + return; + } +#endif + relayTo = RR->topology->getBestRoot(&source,1,true); if (relayTo) - relayTo->send(RR,packet->data(),packet->size(),RR->node->now()); + relayTo->send(RR,packet->data(),packet->size(),now); } } else { TRACE("dropped relay %s(%s) -> %s, max hops exceeded",packet->source().toString().c_str(),fromAddr.toString().c_str(),destination.toString().c_str()); @@ -645,39 +740,40 @@ void Switch::_handleRemotePacketHead(const InetAddress &fromAddr,const void *dat uint64_t pid = packet->packetId(); Mutex::Lock _l(_defragQueue_m); - std::map< uint64_t,DefragQueueEntry >::iterator dqe(_defragQueue.find(pid)); + DefragQueueEntry &dq = _defragQueue[pid]; - if (dqe == _defragQueue.end()) { + if (!dq.creationTime) { // If we have no other fragments yet, create an entry and save the head - DefragQueueEntry &dq = _defragQueue[pid]; - dq.creationTime = RR->node->now(); + + dq.creationTime = now; dq.frag0 = packet; dq.totalFragments = 0; // 0 == unknown, waiting for Packet::Fragment dq.haveFragments = 1; // head is first bit (left to right) //TRACE("fragment (0/?) of %.16llx from %s",pid,fromAddr.toString().c_str()); - } else if (!(dqe->second.haveFragments & 1)) { + } else if (!(dq.haveFragments & 1)) { // If we have other fragments but no head, see if we are complete with the head - if ((dqe->second.totalFragments)&&(Utils::countBits(dqe->second.haveFragments |= 1) == dqe->second.totalFragments)) { + + if ((dq.totalFragments)&&(Utils::countBits(dq.haveFragments |= 1) == dq.totalFragments)) { // We have all fragments -- assemble and process full Packet //TRACE("packet %.16llx is complete, assembling and processing...",pid); // packet already contains head, so append fragments - for(unsigned int f=1;f<dqe->second.totalFragments;++f) - packet->append(dqe->second.frags[f - 1].payload(),dqe->second.frags[f - 1].payloadLength()); - _defragQueue.erase(dqe); + for(unsigned int f=1;f<dq.totalFragments;++f) + packet->append(dq.frags[f - 1].payload(),dq.frags[f - 1].payloadLength()); + _defragQueue.erase(pid); // dq no longer valid after this - if (!packet->tryDecode(RR)) { + if (!packet->tryDecode(RR,false)) { Mutex::Lock _l(_rxQueue_m); _rxQueue.push_back(packet); } } else { // Still waiting on more fragments, so queue the head - dqe->second.frag0 = packet; + dq.frag0 = packet; } } // else this is a duplicate head, ignore } else { // Packet is unfragmented, so just process it - if (!packet->tryDecode(RR)) { + if (!packet->tryDecode(RR,false)) { Mutex::Lock _l(_rxQueue_m); _rxQueue.push_back(packet); } @@ -715,17 +811,20 @@ bool Switch::_trySend(const Packet &packet,bool encrypt,uint64_t nwid) return false; // sanity check: unconfigured network? why are we trying to talk to it? } - RemotePath *viaPath = peer->getBestPath(now); + Path *viaPath = peer->getBestPath(now); SharedPtr<Peer> relay; if (!viaPath) { // See if this network has a preferred relay (if packet has an associated network) if (nconf) { - unsigned int latency = ~((unsigned int)0); + unsigned int bestq = ~((unsigned int)0); for(std::vector< std::pair<Address,InetAddress> >::const_iterator r(nconf->relays().begin());r!=nconf->relays().end();++r) { if (r->first != peer->address()) { SharedPtr<Peer> rp(RR->topology->getPeer(r->first)); - if ((rp)&&(rp->hasActiveDirectPath(now))&&(rp->latency() <= latency)) + const unsigned int q = rp->relayQuality(now); + if ((rp)&&(q < bestq)) { // SUBTILE: < == don't use these if they are nil quality (unsigned int max), instead use a root + bestq = q; rp.swap(relay); + } } } } @@ -738,7 +837,7 @@ bool Switch::_trySend(const Packet &packet,bool encrypt,uint64_t nwid) return false; // no paths, no root servers? } - if ((network)&&(relay)&&(network->isAllowed(peer->address()))) { + if ((network)&&(relay)&&(network->isAllowed(peer))) { // Push hints for direct connectivity to this peer if we are relaying peer->pushDirectPaths(RR,viaPath,now,false); } |