summaryrefslogtreecommitdiff
path: root/node/Switch.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'node/Switch.cpp')
-rw-r--r--node/Switch.cpp160
1 files changed, 86 insertions, 74 deletions
diff --git a/node/Switch.cpp b/node/Switch.cpp
index 053f793e..f46b3e73 100644
--- a/node/Switch.cpp
+++ b/node/Switch.cpp
@@ -50,7 +50,6 @@ namespace ZeroTier {
Switch::Switch(const RuntimeEnvironment *renv) :
RR(renv),
_lastBeaconResponse(0),
- _outstandingWhoisRequests(32),
_lastUniteAttempt(8) // only really used on root servers and upstreams, and it'll grow there just fine
{
}
@@ -229,8 +228,8 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre
}
}
} else {
- relayTo = RR->topology->getUpstreamPeer(&source,1,true);
- if (relayTo)
+ relayTo = RR->topology->getUpstreamPeer();
+ if ((relayTo)&&(relayTo->address() != source))
relayTo->sendDirect(tPtr,packet.data(),packet.size(),now,true);
}
}
@@ -545,41 +544,48 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
void Switch::send(void *tPtr,Packet &packet,bool encrypt)
{
- if (packet.destination() == RR->identity.address())
+ const Address dest(packet.destination());
+ if (dest == RR->identity.address())
return;
if (!_trySend(tPtr,packet,encrypt)) {
- Mutex::Lock _l(_txQueue_m);
- _txQueue.push_back(TXQueueEntry(packet.destination(),RR->node->now(),packet,encrypt));
+ {
+ Mutex::Lock _l(_txQueue_m);
+ _txQueue.push_back(TXQueueEntry(dest,RR->node->now(),packet,encrypt));
+ }
+ if (!RR->topology->getPeer(tPtr,dest))
+ requestWhois(tPtr,RR->node->now(),dest);
}
}
-void Switch::requestWhois(void *tPtr,const Address &addr)
+void Switch::requestWhois(void *tPtr,const uint64_t now,const Address &addr)
{
if (addr == RR->identity.address())
return;
- bool inserted = false;
+
{
- Mutex::Lock _l(_outstandingWhoisRequests_m);
- WhoisRequest &r = _outstandingWhoisRequests[addr];
- if (r.lastSent) {
- r.retries = 0; // reset retry count if entry already existed, but keep waiting and retry again after normal timeout
- } else {
- r.lastSent = RR->node->now();
- inserted = true;
- }
+ Mutex::Lock _l(_lastSentWhoisRequest_m);
+ uint64_t &last = _lastSentWhoisRequest[addr];
+ if ((now - last) < ZT_WHOIS_RETRY_DELAY)
+ return;
+ else last = now;
+ }
+
+ const SharedPtr<Peer> upstream(RR->topology->getUpstreamPeer());
+ if (upstream) {
+ Packet outp(upstream->address(),RR->identity.address(),Packet::VERB_WHOIS);
+ addr.appendTo(outp);
+ RR->node->expectReplyTo(outp.packetId());
+ send(tPtr,outp,true);
}
- if (inserted)
- _sendWhoisRequest(tPtr,addr,(const Address *)0,0);
}
void Switch::doAnythingWaitingForPeer(void *tPtr,const SharedPtr<Peer> &peer)
{
- { // cancel pending WHOIS since we now know this peer
- Mutex::Lock _l(_outstandingWhoisRequests_m);
- _outstandingWhoisRequests.erase(peer->address());
+ {
+ Mutex::Lock _l(_lastSentWhoisRequest_m);
+ _lastSentWhoisRequest.erase(peer->address());
}
- // finish processing any packets waiting on peer's public key / identity
const uint64_t now = RR->node->now();
for(unsigned int ptr=0;ptr<ZT_RX_QUEUE_SIZE;++ptr) {
RXQueueEntry *const rq = &(_rxQueue[ptr]);
@@ -589,57 +595,62 @@ void Switch::doAnythingWaitingForPeer(void *tPtr,const SharedPtr<Peer> &peer)
}
}
- { // finish sending any packets waiting on peer's public key / identity
+ {
Mutex::Lock _l(_txQueue_m);
for(std::list< TXQueueEntry >::iterator txi(_txQueue.begin());txi!=_txQueue.end();) {
if (txi->dest == peer->address()) {
- if (_trySend(tPtr,txi->packet,txi->encrypt))
+ if (_trySend(tPtr,txi->packet,txi->encrypt)) {
_txQueue.erase(txi++);
- else ++txi;
- } else ++txi;
- }
- }
-}
-
-unsigned long Switch::doTimerTasks(void *tPtr,uint64_t now)
-{
- unsigned long nextDelay = 0xffffffff; // ceiling delay, caller will cap to minimum
-
- { // Retry outstanding WHOIS requests
- Mutex::Lock _l(_outstandingWhoisRequests_m);
- Hashtable< Address,WhoisRequest >::Iterator i(_outstandingWhoisRequests);
- Address *a = (Address *)0;
- WhoisRequest *r = (WhoisRequest *)0;
- while (i.next(a,r)) {
- const unsigned long since = (unsigned long)(now - r->lastSent);
- if (since >= ZT_WHOIS_RETRY_DELAY) {
- if (r->retries >= ZT_MAX_WHOIS_RETRIES) {
- _outstandingWhoisRequests.erase(*a);
} else {
- r->lastSent = now;
- r->peersConsulted[r->retries] = _sendWhoisRequest(tPtr,*a,r->peersConsulted,(r->retries > 1) ? r->retries : 0);
- ++r->retries;
- nextDelay = std::min(nextDelay,(unsigned long)ZT_WHOIS_RETRY_DELAY);
+ ++txi;
}
} else {
- nextDelay = std::min(nextDelay,ZT_WHOIS_RETRY_DELAY - since);
+ ++txi;
}
}
}
+}
+
+unsigned long Switch::doTimerTasks(void *tPtr,uint64_t now)
+{
+ const uint64_t timeSinceLastCheck = now - _lastCheckedQueues;
+ if (timeSinceLastCheck < ZT_WHOIS_RETRY_DELAY)
+ return (unsigned long)(ZT_WHOIS_RETRY_DELAY - timeSinceLastCheck);
+ _lastCheckedQueues = now;
- { // Time out TX queue packets that never got WHOIS lookups or other info.
+ std::vector<Address> needWhois;
+ {
Mutex::Lock _l(_txQueue_m);
for(std::list< TXQueueEntry >::iterator txi(_txQueue.begin());txi!=_txQueue.end();) {
- if (_trySend(tPtr,txi->packet,txi->encrypt))
+ if (_trySend(tPtr,txi->packet,txi->encrypt)) {
_txQueue.erase(txi++);
- else if ((now - txi->creationTime) > ZT_TRANSMIT_QUEUE_TIMEOUT) {
+ } else if ((now - txi->creationTime) > ZT_TRANSMIT_QUEUE_TIMEOUT) {
RR->t->txTimedOut(tPtr,txi->dest);
_txQueue.erase(txi++);
- } else ++txi;
+ } else {
+ if (!RR->topology->getPeer(tPtr,txi->dest))
+ needWhois.push_back(txi->dest);
+ ++txi;
+ }
+ }
+ }
+ for(std::vector<Address>::const_iterator i(needWhois.begin());i!=needWhois.end();++i)
+ requestWhois(tPtr,now,*i);
+
+ for(unsigned int ptr=0;ptr<ZT_RX_QUEUE_SIZE;++ptr) {
+ RXQueueEntry *const rq = &(_rxQueue[ptr]);
+ if ((rq->timestamp)&&(rq->complete)) {
+ if ((rq->frag0.tryDecode(RR,tPtr))||((now - rq->timestamp) > ZT_RECEIVE_QUEUE_TIMEOUT)) {
+ rq->timestamp = 0;
+ } else {
+ const Address src(rq->frag0.source());
+ if (!RR->topology->getPeer(tPtr,src))
+ requestWhois(tPtr,now,src);
+ }
}
}
- { // Remove really old last unite attempt entries to keep table size controlled
+ {
Mutex::Lock _l(_lastUniteAttempt_m);
Hashtable< _LastUniteKey,uint64_t >::Iterator i(_lastUniteAttempt);
_LastUniteKey *k = (_LastUniteKey *)0;
@@ -650,7 +661,18 @@ unsigned long Switch::doTimerTasks(void *tPtr,uint64_t now)
}
}
- return nextDelay;
+ {
+ Mutex::Lock _l(_lastSentWhoisRequest_m);
+ Hashtable< Address,uint64_t >::Iterator i(_lastSentWhoisRequest);
+ Address *a = (Address *)0;
+ uint64_t *ts = (uint64_t *)0;
+ while (i.next(a,ts)) {
+ if ((now - *ts) > (ZT_WHOIS_RETRY_DELAY * 2))
+ _lastSentWhoisRequest.erase(*a);
+ }
+ }
+
+ return ZT_WHOIS_RETRY_DELAY;
}
bool Switch::_shouldUnite(const uint64_t now,const Address &source,const Address &destination)
@@ -664,18 +686,6 @@ bool Switch::_shouldUnite(const uint64_t now,const Address &source,const Address
return false;
}
-Address Switch::_sendWhoisRequest(void *tPtr,const Address &addr,const Address *peersAlreadyConsulted,unsigned int numPeersAlreadyConsulted)
-{
- SharedPtr<Peer> upstream(RR->topology->getUpstreamPeer(peersAlreadyConsulted,numPeersAlreadyConsulted,false));
- if (upstream) {
- Packet outp(upstream->address(),RR->identity.address(),Packet::VERB_WHOIS);
- addr.appendTo(outp);
- RR->node->expectReplyTo(outp.packetId());
- send(tPtr,outp,true);
- }
- return Address();
-}
-
bool Switch::_trySend(void *tPtr,Packet &packet,bool encrypt)
{
SharedPtr<Path> viaPath;
@@ -709,14 +719,16 @@ bool Switch::_trySend(void *tPtr,Packet &packet,bool encrypt)
}
}
} else {
- requestWhois(tPtr,destination);
- return false; // if we are not in cluster mode, there is no way we can send without knowing the peer directly
+ return false;
}
- unsigned int chunkSize = std::min(packet.size(),(unsigned int)ZT_UDP_DEFAULT_PAYLOAD_MTU);
+ unsigned int mtu = ZT_DEFAULT_PHYSMTU;
+ uint64_t trustedPathId = 0;
+ RR->topology->getOutboundPathInfo(viaPath->address(),mtu,trustedPathId);
+
+ unsigned int chunkSize = std::min(packet.size(),mtu);
packet.setFragmented(chunkSize < packet.size());
- const uint64_t trustedPathId = RR->topology->getOutboundPathTrust(viaPath->address());
if (trustedPathId) {
packet.setTrusted(trustedPathId);
} else {
@@ -728,13 +740,13 @@ bool Switch::_trySend(void *tPtr,Packet &packet,bool encrypt)
// Too big for one packet, fragment the rest
unsigned int fragStart = chunkSize;
unsigned int remaining = packet.size() - chunkSize;
- unsigned int fragsRemaining = (remaining / (ZT_UDP_DEFAULT_PAYLOAD_MTU - ZT_PROTO_MIN_FRAGMENT_LENGTH));
- if ((fragsRemaining * (ZT_UDP_DEFAULT_PAYLOAD_MTU - ZT_PROTO_MIN_FRAGMENT_LENGTH)) < remaining)
+ unsigned int fragsRemaining = (remaining / (mtu - ZT_PROTO_MIN_FRAGMENT_LENGTH));
+ if ((fragsRemaining * (mtu - ZT_PROTO_MIN_FRAGMENT_LENGTH)) < remaining)
++fragsRemaining;
const unsigned int totalFragments = fragsRemaining + 1;
for(unsigned int fno=1;fno<totalFragments;++fno) {
- chunkSize = std::min(remaining,(unsigned int)(ZT_UDP_DEFAULT_PAYLOAD_MTU - ZT_PROTO_MIN_FRAGMENT_LENGTH));
+ chunkSize = std::min(remaining,(unsigned int)(mtu - ZT_PROTO_MIN_FRAGMENT_LENGTH));
Packet::Fragment frag(packet,fragStart,chunkSize,fno,totalFragments);
viaPath->send(RR,tPtr,frag.data(),frag.size(),now);
fragStart += chunkSize;