diff options
| author | Adam Ierymenko <adam.ierymenko@gmail.com> | 2017-03-27 17:03:17 -0700 |
|---|---|---|
| committer | Adam Ierymenko <adam.ierymenko@gmail.com> | 2017-03-27 17:03:17 -0700 |
| commit | e4896b257fde05a216500804d9bcef3b84b0980e (patch) | |
| tree | fd4a33a82178cbeccc55eb59a9bcf1ba70957fc7 /node/Switch.cpp | |
| parent | 592cac58155fecf2e226b32c614e19064214cc1a (diff) | |
| download | infinitytier-e4896b257fde05a216500804d9bcef3b84b0980e.tar.gz infinitytier-e4896b257fde05a216500804d9bcef3b84b0980e.zip | |
Add thread PTR that gets passed through the entire ZT core call stack and then passed to handler functions resulting from a call.
Diffstat (limited to 'node/Switch.cpp')
| -rw-r--r-- | node/Switch.cpp | 97 |
1 files changed, 49 insertions, 48 deletions
diff --git a/node/Switch.cpp b/node/Switch.cpp index aab2e7ff..62674472 100644 --- a/node/Switch.cpp +++ b/node/Switch.cpp @@ -64,7 +64,7 @@ Switch::Switch(const RuntimeEnvironment *renv) : { } -void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &fromAddr,const void *data,unsigned int len) +void Switch::onRemotePacket(void *tPtr,const InetAddress &localAddr,const InetAddress &fromAddr,const void *data,unsigned int len) { try { const uint64_t now = RR->node->now(); @@ -81,15 +81,15 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from const Address beaconAddr(reinterpret_cast<const char *>(data) + 8,5); if (beaconAddr == RR->identity.address()) return; - if (!RR->node->shouldUsePathForZeroTierTraffic(beaconAddr,localAddr,fromAddr)) + if (!RR->node->shouldUsePathForZeroTierTraffic(tPtr,beaconAddr,localAddr,fromAddr)) return; - const SharedPtr<Peer> peer(RR->topology->getPeer(beaconAddr)); + const SharedPtr<Peer> peer(RR->topology->getPeer(tPtr,beaconAddr)); if (peer) { // we'll only respond to beacons from known peers if ((now - _lastBeaconResponse) >= 2500) { // limit rate of responses _lastBeaconResponse = now; Packet outp(peer->address(),RR->identity.address(),Packet::VERB_NOP); outp.armor(peer->key(),true,path->nextOutgoingCounter()); - path->send(RR,outp.data(),outp.size(),now); + path->send(RR,tPtr,outp.data(),outp.size(),now); } } @@ -115,8 +115,8 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from // Note: we don't bother initiating NAT-t for fragments, since heads will set that off. // It wouldn't hurt anything, just redundant and unnecessary. - SharedPtr<Peer> relayTo = RR->topology->getPeer(destination); - if ((!relayTo)||(!relayTo->sendDirect(fragment.data(),fragment.size(),now,false))) { + SharedPtr<Peer> relayTo = RR->topology->getPeer(tPtr,destination); + if ((!relayTo)||(!relayTo->sendDirect(tPtr,fragment.data(),fragment.size(),now,false))) { #ifdef ZT_ENABLE_CLUSTER if ((RR->cluster)&&(!isClusterFrontplane)) { RR->cluster->relayViaCluster(Address(),destination,fragment.data(),fragment.size(),false); @@ -127,7 +127,7 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from // Don't know peer or no direct path -- so relay via someone upstream relayTo = RR->topology->getUpstreamPeer(); if (relayTo) - relayTo->sendDirect(fragment.data(),fragment.size(),now,true); + relayTo->sendDirect(tPtr,fragment.data(),fragment.size(),now,true); } } else { TRACE("dropped relay [fragment](%s) -> %s, max hops exceeded",fromAddr.toString().c_str(),destination.toString().c_str()); @@ -171,7 +171,7 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from for(unsigned int f=1;f<totalFragments;++f) rq->frag0.append(rq->frags[f - 1].payload(),rq->frags[f - 1].payloadLength()); - if (rq->frag0.tryDecode(RR)) { + if (rq->frag0.tryDecode(RR,tPtr)) { rq->timestamp = 0; // packet decoded, free entry } else { rq->complete = true; // set complete flag but leave entry since it probably needs WHOIS or something @@ -212,8 +212,8 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from packet.incrementHops(); #endif - SharedPtr<Peer> relayTo = RR->topology->getPeer(destination); - if ((relayTo)&&(relayTo->sendDirect(packet.data(),packet.size(),now,false))) { + SharedPtr<Peer> relayTo = RR->topology->getPeer(tPtr,destination); + if ((relayTo)&&(relayTo->sendDirect(tPtr,packet.data(),packet.size(),now,false))) { if ((source != RR->identity.address())&&(_shouldUnite(now,source,destination))) { // don't send RENDEZVOUS for cluster frontplane relays const InetAddress *hintToSource = (InetAddress *)0; const InetAddress *hintToDest = (InetAddress *)0; @@ -222,7 +222,7 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from InetAddress sourceV4,sourceV6; relayTo->getRendezvousAddresses(now,destV4,destV6); - const SharedPtr<Peer> sourcePeer(RR->topology->getPeer(source)); + const SharedPtr<Peer> sourcePeer(RR->topology->getPeer(tPtr,source)); if (sourcePeer) { sourcePeer->getRendezvousAddresses(now,sourceV4,sourceV6); if ((destV6)&&(sourceV6)) { @@ -249,7 +249,7 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from outp.append((uint8_t)4); outp.append(hintToSource->rawIpData(),4); } - send(outp,true); + send(tPtr,outp,true); } else { Packet outp(destination,RR->identity.address(),Packet::VERB_RENDEZVOUS); outp.append((uint8_t)0); @@ -262,7 +262,7 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from outp.append((uint8_t)4); outp.append(hintToDest->rawIpData(),4); } - send(outp,true); + send(tPtr,outp,true); } ++alt; } @@ -278,7 +278,7 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from #endif relayTo = RR->topology->getUpstreamPeer(&source,1,true); if (relayTo) - relayTo->sendDirect(packet.data(),packet.size(),now,true); + relayTo->sendDirect(tPtr,packet.data(),packet.size(),now,true); } } else { TRACE("dropped relay %s(%s) -> %s, max hops exceeded",packet.source().toString().c_str(),fromAddr.toString().c_str(),destination.toString().c_str()); @@ -321,7 +321,7 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from for(unsigned int f=1;f<rq->totalFragments;++f) rq->frag0.append(rq->frags[f - 1].payload(),rq->frags[f - 1].payloadLength()); - if (rq->frag0.tryDecode(RR)) { + if (rq->frag0.tryDecode(RR,tPtr)) { rq->timestamp = 0; // packet decoded, free entry } else { rq->complete = true; // set complete flag but leave entry since it probably needs WHOIS or something @@ -334,7 +334,7 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from } else { // Packet is unfragmented, so just process it IncomingPacket packet(data,len,path,now); - if (!packet.tryDecode(RR)) { + if (!packet.tryDecode(RR,tPtr)) { Mutex::Lock _l(_rxQueue_m); RXQueueEntry *rq = &(_rxQueue[ZT_RX_QUEUE_SIZE - 1]); unsigned long i = ZT_RX_QUEUE_SIZE - 1; @@ -362,7 +362,7 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from } } -void Switch::onLocalEthernet(const SharedPtr<Network> &network,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len) +void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len) { if (!network->hasConfig()) return; @@ -474,7 +474,7 @@ void Switch::onLocalEthernet(const SharedPtr<Network> &network,const MAC &from,c adv[42] = (checksum >> 8) & 0xff; adv[43] = checksum & 0xff; - RR->node->putFrame(network->id(),network->userPtr(),peerMac,from,ZT_ETHERTYPE_IPV6,0,adv,72); + RR->node->putFrame(tPtr,network->id(),network->userPtr(),peerMac,from,ZT_ETHERTYPE_IPV6,0,adv,72); return; // NDP emulation done. We have forged a "fake" reply, so no need to send actual NDP query. } // else no NDP emulation } // else no NDP emulation @@ -491,17 +491,18 @@ void Switch::onLocalEthernet(const SharedPtr<Network> &network,const MAC &from,c * multicast addresses on bridge interfaces and subscribing each slave. * But in that case this does no harm, as the sets are just merged. */ if (fromBridged) - network->learnBridgedMulticastGroup(multicastGroup,RR->node->now()); + network->learnBridgedMulticastGroup(tPtr,multicastGroup,RR->node->now()); //TRACE("%.16llx: MULTICAST %s -> %s %s %u",network->id(),from.toString().c_str(),multicastGroup.toString().c_str(),etherTypeName(etherType),len); // First pass sets noTee to false, but noTee is set to true in OutboundMulticast to prevent duplicates. - if (!network->filterOutgoingPacket(false,RR->identity.address(),Address(),from,to,(const uint8_t *)data,len,etherType,vlanId)) { + if (!network->filterOutgoingPacket(tPtr,false,RR->identity.address(),Address(),from,to,(const uint8_t *)data,len,etherType,vlanId)) { TRACE("%.16llx: %s -> %s %s packet not sent: filterOutgoingPacket() returned false",network->id(),from.toString().c_str(),to.toString().c_str(),etherTypeName(etherType)); return; } RR->mc->send( + tPtr, network->config().multicastLimit, RR->node->now(), network->id(), @@ -514,14 +515,14 @@ void Switch::onLocalEthernet(const SharedPtr<Network> &network,const MAC &from,c len); } else if (to == network->mac()) { // Destination is this node, so just reinject it - RR->node->putFrame(network->id(),network->userPtr(),from,to,etherType,vlanId,data,len); + RR->node->putFrame(tPtr,network->id(),network->userPtr(),from,to,etherType,vlanId,data,len); } else if (to[0] == MAC::firstOctetForNetwork(network->id())) { // Destination is another ZeroTier peer on the same network Address toZT(to.toAddress(network->id())); // since in-network MACs are derived from addresses and network IDs, we can reverse this - SharedPtr<Peer> toPeer(RR->topology->getPeer(toZT)); + SharedPtr<Peer> toPeer(RR->topology->getPeer(tPtr,toZT)); - if (!network->filterOutgoingPacket(false,RR->identity.address(),toZT,from,to,(const uint8_t *)data,len,etherType,vlanId)) { + if (!network->filterOutgoingPacket(tPtr,false,RR->identity.address(),toZT,from,to,(const uint8_t *)data,len,etherType,vlanId)) { TRACE("%.16llx: %s -> %s %s packet not sent: filterOutgoingPacket() returned false",network->id(),from.toString().c_str(),to.toString().c_str(),etherTypeName(etherType)); return; } @@ -536,7 +537,7 @@ void Switch::onLocalEthernet(const SharedPtr<Network> &network,const MAC &from,c outp.append(data,len); if (!network->config().disableCompression()) outp.compress(); - send(outp,true); + send(tPtr,outp,true); } else { Packet outp(toZT,RR->identity.address(),Packet::VERB_FRAME); outp.append(network->id()); @@ -544,7 +545,7 @@ void Switch::onLocalEthernet(const SharedPtr<Network> &network,const MAC &from,c outp.append(data,len); if (!network->config().disableCompression()) outp.compress(); - send(outp,true); + send(tPtr,outp,true); } //TRACE("%.16llx: UNICAST: %s -> %s etherType==%s(%.4x) vlanId==%u len==%u fromBridged==%d includeCom==%d",network->id(),from.toString().c_str(),to.toString().c_str(),etherTypeName(etherType),etherType,vlanId,len,(int)fromBridged,(int)includeCom); @@ -554,7 +555,7 @@ void Switch::onLocalEthernet(const SharedPtr<Network> &network,const MAC &from,c // We filter with a NULL destination ZeroTier address first. Filtrations // for each ZT destination are also done below. This is the same rationale // and design as for multicast. - if (!network->filterOutgoingPacket(false,RR->identity.address(),Address(),from,to,(const uint8_t *)data,len,etherType,vlanId)) { + if (!network->filterOutgoingPacket(tPtr,false,RR->identity.address(),Address(),from,to,(const uint8_t *)data,len,etherType,vlanId)) { TRACE("%.16llx: %s -> %s %s packet not sent: filterOutgoingPacket() returned false",network->id(),from.toString().c_str(),to.toString().c_str(),etherTypeName(etherType)); return; } @@ -592,7 +593,7 @@ void Switch::onLocalEthernet(const SharedPtr<Network> &network,const MAC &from,c } for(unsigned int b=0;b<numBridges;++b) { - if (network->filterOutgoingPacket(true,RR->identity.address(),bridges[b],from,to,(const uint8_t *)data,len,etherType,vlanId)) { + if (network->filterOutgoingPacket(tPtr,true,RR->identity.address(),bridges[b],from,to,(const uint8_t *)data,len,etherType,vlanId)) { Packet outp(bridges[b],RR->identity.address(),Packet::VERB_EXT_FRAME); outp.append(network->id()); outp.append((uint8_t)0x00); @@ -602,7 +603,7 @@ void Switch::onLocalEthernet(const SharedPtr<Network> &network,const MAC &from,c outp.append(data,len); if (!network->config().disableCompression()) outp.compress(); - send(outp,true); + send(tPtr,outp,true); } else { TRACE("%.16llx: %s -> %s %s packet not sent: filterOutgoingPacket() returned false",network->id(),from.toString().c_str(),to.toString().c_str(),etherTypeName(etherType)); } @@ -610,20 +611,20 @@ void Switch::onLocalEthernet(const SharedPtr<Network> &network,const MAC &from,c } } -void Switch::send(Packet &packet,bool encrypt) +void Switch::send(void *tPtr,Packet &packet,bool encrypt) { if (packet.destination() == RR->identity.address()) { TRACE("BUG: caught attempt to send() to self, ignored"); return; } - if (!_trySend(packet,encrypt)) { + if (!_trySend(tPtr,packet,encrypt)) { Mutex::Lock _l(_txQueue_m); _txQueue.push_back(TXQueueEntry(packet.destination(),RR->node->now(),packet,encrypt)); } } -void Switch::requestWhois(const Address &addr) +void Switch::requestWhois(void *tPtr,const Address &addr) { #ifdef ZT_TRACE if (addr == RR->identity.address()) { @@ -644,10 +645,10 @@ void Switch::requestWhois(const Address &addr) } } if (inserted) - _sendWhoisRequest(addr,(const Address *)0,0); + _sendWhoisRequest(tPtr,addr,(const Address *)0,0); } -void Switch::doAnythingWaitingForPeer(const SharedPtr<Peer> &peer) +void Switch::doAnythingWaitingForPeer(void *tPtr,const SharedPtr<Peer> &peer) { { // cancel pending WHOIS since we now know this peer Mutex::Lock _l(_outstandingWhoisRequests_m); @@ -660,7 +661,7 @@ void Switch::doAnythingWaitingForPeer(const SharedPtr<Peer> &peer) while (i) { RXQueueEntry *rq = &(_rxQueue[--i]); if ((rq->timestamp)&&(rq->complete)) { - if (rq->frag0.tryDecode(RR)) + if (rq->frag0.tryDecode(RR,tPtr)) rq->timestamp = 0; } } @@ -670,7 +671,7 @@ void Switch::doAnythingWaitingForPeer(const SharedPtr<Peer> &peer) Mutex::Lock _l(_txQueue_m); for(std::list< TXQueueEntry >::iterator txi(_txQueue.begin());txi!=_txQueue.end();) { if (txi->dest == peer->address()) { - if (_trySend(txi->packet,txi->encrypt)) + if (_trySend(tPtr,txi->packet,txi->encrypt)) _txQueue.erase(txi++); else ++txi; } else ++txi; @@ -678,7 +679,7 @@ void Switch::doAnythingWaitingForPeer(const SharedPtr<Peer> &peer) } } -unsigned long Switch::doTimerTasks(uint64_t now) +unsigned long Switch::doTimerTasks(void *tPtr,uint64_t now) { unsigned long nextDelay = 0xffffffff; // ceiling delay, caller will cap to minimum @@ -695,7 +696,7 @@ unsigned long Switch::doTimerTasks(uint64_t now) _outstandingWhoisRequests.erase(*a); } else { r->lastSent = now; - r->peersConsulted[r->retries] = _sendWhoisRequest(*a,r->peersConsulted,(r->retries > 1) ? r->retries : 0); + r->peersConsulted[r->retries] = _sendWhoisRequest(tPtr,*a,r->peersConsulted,(r->retries > 1) ? r->retries : 0); TRACE("WHOIS %s (retry %u)",a->toString().c_str(),r->retries); ++r->retries; nextDelay = std::min(nextDelay,(unsigned long)ZT_WHOIS_RETRY_DELAY); @@ -709,7 +710,7 @@ unsigned long Switch::doTimerTasks(uint64_t now) { // Time out TX queue packets that never got WHOIS lookups or other info. Mutex::Lock _l(_txQueue_m); for(std::list< TXQueueEntry >::iterator txi(_txQueue.begin());txi!=_txQueue.end();) { - if (_trySend(txi->packet,txi->encrypt)) + if (_trySend(tPtr,txi->packet,txi->encrypt)) _txQueue.erase(txi++); else if ((now - txi->creationTime) > ZT_TRANSMIT_QUEUE_TIMEOUT) { TRACE("TX %s -> %s timed out",txi->packet.source().toString().c_str(),txi->packet.destination().toString().c_str()); @@ -743,19 +744,19 @@ bool Switch::_shouldUnite(const uint64_t now,const Address &source,const Address return false; } -Address Switch::_sendWhoisRequest(const Address &addr,const Address *peersAlreadyConsulted,unsigned int numPeersAlreadyConsulted) +Address Switch::_sendWhoisRequest(void *tPtr,const Address &addr,const Address *peersAlreadyConsulted,unsigned int numPeersAlreadyConsulted) { SharedPtr<Peer> upstream(RR->topology->getUpstreamPeer(peersAlreadyConsulted,numPeersAlreadyConsulted,false)); if (upstream) { Packet outp(upstream->address(),RR->identity.address(),Packet::VERB_WHOIS); addr.appendTo(outp); RR->node->expectReplyTo(outp.packetId()); - send(outp,true); + send(tPtr,outp,true); } return Address(); } -bool Switch::_trySend(Packet &packet,bool encrypt) +bool Switch::_trySend(void *tPtr,Packet &packet,bool encrypt) { SharedPtr<Path> viaPath; const uint64_t now = RR->node->now(); @@ -769,7 +770,7 @@ bool Switch::_trySend(Packet &packet,bool encrypt) clusterMostRecentMemberId = RR->cluster->checkSendViaCluster(destination,clusterMostRecentTs,clusterPeerSecret); #endif - const SharedPtr<Peer> peer(RR->topology->getPeer(destination)); + const SharedPtr<Peer> peer(RR->topology->getPeer(tPtr,destination)); if (peer) { /* First get the best path, and if it's dead (and this is not a root) * we attempt to re-activate that path but this packet will flow @@ -784,7 +785,7 @@ bool Switch::_trySend(Packet &packet,bool encrypt) if ((clusterMostRecentMemberId < 0)||(viaPath->lastIn() > clusterMostRecentTs)) { #endif if ((now - viaPath->lastOut()) > std::max((now - viaPath->lastIn()) * 4,(uint64_t)ZT_PATH_MIN_REACTIVATE_INTERVAL)) { - peer->attemptToContactAt(viaPath->localAddress(),viaPath->address(),now,false,viaPath->nextOutgoingCounter()); + peer->attemptToContactAt(tPtr,viaPath->localAddress(),viaPath->address(),now,false,viaPath->nextOutgoingCounter()); viaPath->sent(now); } #ifdef ZT_ENABLE_CLUSTER @@ -801,7 +802,7 @@ bool Switch::_trySend(Packet &packet,bool encrypt) #else if (!viaPath) { #endif - peer->tryMemorizedPath(now); // periodically attempt memorized or statically defined paths, if any are known + peer->tryMemorizedPath(tPtr,now); // periodically attempt memorized or statically defined paths, if any are known const SharedPtr<Peer> relay(RR->topology->getUpstreamPeer()); if ( (!relay) || (!(viaPath = relay->getBestPath(now,false))) ) { if (!(viaPath = peer->getBestPath(now,true))) @@ -816,7 +817,7 @@ bool Switch::_trySend(Packet &packet,bool encrypt) #ifdef ZT_ENABLE_CLUSTER if (clusterMostRecentMemberId < 0) { #else - requestWhois(destination); + requestWhois(tPtr,destination); return false; // if we are not in cluster mode, there is no way we can send without knowing the peer directly #endif #ifdef ZT_ENABLE_CLUSTER @@ -844,9 +845,9 @@ bool Switch::_trySend(Packet &packet,bool encrypt) #endif #ifdef ZT_ENABLE_CLUSTER - if ( ((viaPath)&&(viaPath->send(RR,packet.data(),chunkSize,now))) || ((clusterMostRecentMemberId >= 0)&&(RR->cluster->sendViaCluster(clusterMostRecentMemberId,destination,packet.data(),chunkSize))) ) { + if ( ((viaPath)&&(viaPath->send(RR,tPtr,packet.data(),chunkSize,now))) || ((clusterMostRecentMemberId >= 0)&&(RR->cluster->sendViaCluster(clusterMostRecentMemberId,destination,packet.data(),chunkSize))) ) { #else - if (viaPath->send(RR,packet.data(),chunkSize,now)) { + if (viaPath->send(RR,tPtr,packet.data(),chunkSize,now)) { #endif if (chunkSize < packet.size()) { // Too big for one packet, fragment the rest @@ -866,7 +867,7 @@ bool Switch::_trySend(Packet &packet,bool encrypt) else if (clusterMostRecentMemberId >= 0) RR->cluster->sendViaCluster(clusterMostRecentMemberId,destination,frag.data(),frag.size()); #else - viaPath->send(RR,frag.data(),frag.size(),now); + viaPath->send(RR,tPtr,frag.data(),frag.size(),now); #endif fragStart += chunkSize; remaining -= chunkSize; |
