diff options
| author | Adam Ierymenko <adam.ierymenko@gmail.com> | 2014-09-24 13:45:58 -0700 |
|---|---|---|
| committer | Adam Ierymenko <adam.ierymenko@gmail.com> | 2014-09-24 13:45:58 -0700 |
| commit | 431476e2e4474c83013c2a1a6d80f1e815a2d37c (patch) | |
| tree | 2c1a6e2aabee108695d1f993e3085b0ed923f7e3 /node/Switch.cpp | |
| parent | 557801a09ef31c243add5f6d6de7a5f67af53ed2 (diff) | |
| download | infinitytier-431476e2e4474c83013c2a1a6d80f1e815a2d37c.tar.gz infinitytier-431476e2e4474c83013c2a1a6d80f1e815a2d37c.zip | |
Some more multicast algo work...
Diffstat (limited to 'node/Switch.cpp')
| -rw-r--r-- | node/Switch.cpp | 183 |
1 files changed, 91 insertions, 92 deletions
diff --git a/node/Switch.cpp b/node/Switch.cpp index 4e8cf4d8..f1c814ef 100644 --- a/node/Switch.cpp +++ b/node/Switch.cpp @@ -56,8 +56,7 @@ namespace ZeroTier { Switch::Switch(const RuntimeEnvironment *renv) : _r(renv), - _lastBeacon(0), - _multicastIdCounter((unsigned int)renv->prng->next32()) // start a random spot to minimize possible collisions on startup + _lastBeacon(0) { } @@ -478,96 +477,6 @@ void Switch::contact(const SharedPtr<Peer> &peer,const InetAddress &atAddr) _r->sm->whack(); } -unsigned long Switch::doTimerTasks() -{ - unsigned long nextDelay = ~((unsigned long)0); // big number, caller will cap return value - uint64_t now = Utils::now(); - - { - Mutex::Lock _l(_contactQueue_m); - for(std::list<ContactQueueEntry>::iterator qi(_contactQueue.begin());qi!=_contactQueue.end();) { - if (now >= qi->fireAtTime) { - if (!qi->peer->hasActiveDirectPath(now)) { - TRACE("deploying aggressive NAT-t against %s(%s)",qi->peer->address().toString().c_str(),qi->inaddr.toString().c_str()); - - /* Shotgun approach -- literally -- against symmetric NATs. Most of these - * either increment or decrement ports so this gets a good number. Also try - * the original port one more time for good measure, since sometimes it - * fails first time around. */ - int p = (int)qi->inaddr.port() - 2; - for(int k=0;k<5;++k) { - if ((p > 0)&&(p <= 0xffff)) { - qi->inaddr.setPort((unsigned int)p); - sendHELLO(qi->peer,qi->inaddr); - } - ++p; - } - } - - _contactQueue.erase(qi++); - } else { - nextDelay = std::min(nextDelay,(unsigned long)(qi->fireAtTime - now)); - ++qi; - } - } - } - - { - Mutex::Lock _l(_outstandingWhoisRequests_m); - for(std::map< Address,WhoisRequest >::iterator i(_outstandingWhoisRequests.begin());i!=_outstandingWhoisRequests.end();) { - unsigned long since = (unsigned long)(now - i->second.lastSent); - if (since >= ZT_WHOIS_RETRY_DELAY) { - if (i->second.retries >= ZT_MAX_WHOIS_RETRIES) { - TRACE("WHOIS %s timed out",i->first.toString().c_str()); - _outstandingWhoisRequests.erase(i++); - continue; - } else { - i->second.lastSent = now; - i->second.peersConsulted[i->second.retries] = _sendWhoisRequest(i->first,i->second.peersConsulted,i->second.retries); - ++i->second.retries; - TRACE("WHOIS %s (retry %u)",i->first.toString().c_str(),i->second.retries); - nextDelay = std::min(nextDelay,(unsigned long)ZT_WHOIS_RETRY_DELAY); - } - } else nextDelay = std::min(nextDelay,ZT_WHOIS_RETRY_DELAY - since); - ++i; - } - } - - { - Mutex::Lock _l(_txQueue_m); - for(std::multimap< Address,TXQueueEntry >::iterator i(_txQueue.begin());i!=_txQueue.end();) { - if (_trySend(i->second.packet,i->second.encrypt)) - _txQueue.erase(i++); - else if ((now - i->second.creationTime) > ZT_TRANSMIT_QUEUE_TIMEOUT) { - TRACE("TX %s -> %s timed out",i->second.packet.source().toString().c_str(),i->second.packet.destination().toString().c_str()); - _txQueue.erase(i++); - } else ++i; - } - } - - { - Mutex::Lock _l(_rxQueue_m); - for(std::list< SharedPtr<IncomingPacket> >::iterator i(_rxQueue.begin());i!=_rxQueue.end();) { - if ((now - (*i)->receiveTime()) > ZT_RECEIVE_QUEUE_TIMEOUT) { - TRACE("RX %s -> %s timed out",(*i)->source().toString().c_str(),(*i)->destination().toString().c_str()); - _rxQueue.erase(i++); - } else ++i; - } - } - - { - Mutex::Lock _l(_defragQueue_m); - for(std::map< uint64_t,DefragQueueEntry >::iterator i(_defragQueue.begin());i!=_defragQueue.end();) { - if ((now - i->second.creationTime) > ZT_FRAGMENTED_PACKET_RECEIVE_TIMEOUT) { - TRACE("incomplete fragmented packet %.16llx timed out, fragments discarded",i->first); - _defragQueue.erase(i++); - } else ++i; - } - } - - return std::max(nextDelay,(unsigned long)10); // minimum delay -} - void Switch::announceMulticastGroups(const std::map< SharedPtr<Network>,std::set<MulticastGroup> > &allMemberships) { std::vector< SharedPtr<Peer> > directPeers; @@ -682,6 +591,96 @@ void Switch::doAnythingWaitingForPeer(const SharedPtr<Peer> &peer) } } +unsigned long Switch::doTimerTasks() +{ + unsigned long nextDelay = ~((unsigned long)0); // big number, caller will cap return value + uint64_t now = Utils::now(); + + { + Mutex::Lock _l(_contactQueue_m); + for(std::list<ContactQueueEntry>::iterator qi(_contactQueue.begin());qi!=_contactQueue.end();) { + if (now >= qi->fireAtTime) { + if (!qi->peer->hasActiveDirectPath(now)) { + TRACE("deploying aggressive NAT-t against %s(%s)",qi->peer->address().toString().c_str(),qi->inaddr.toString().c_str()); + + /* Shotgun approach -- literally -- against symmetric NATs. Most of these + * either increment or decrement ports so this gets a good number. Also try + * the original port one more time for good measure, since sometimes it + * fails first time around. */ + int p = (int)qi->inaddr.port() - 2; + for(int k=0;k<5;++k) { + if ((p > 0)&&(p <= 0xffff)) { + qi->inaddr.setPort((unsigned int)p); + sendHELLO(qi->peer,qi->inaddr); + } + ++p; + } + } + + _contactQueue.erase(qi++); + } else { + nextDelay = std::min(nextDelay,(unsigned long)(qi->fireAtTime - now)); + ++qi; + } + } + } + + { + Mutex::Lock _l(_outstandingWhoisRequests_m); + for(std::map< Address,WhoisRequest >::iterator i(_outstandingWhoisRequests.begin());i!=_outstandingWhoisRequests.end();) { + unsigned long since = (unsigned long)(now - i->second.lastSent); + if (since >= ZT_WHOIS_RETRY_DELAY) { + if (i->second.retries >= ZT_MAX_WHOIS_RETRIES) { + TRACE("WHOIS %s timed out",i->first.toString().c_str()); + _outstandingWhoisRequests.erase(i++); + continue; + } else { + i->second.lastSent = now; + i->second.peersConsulted[i->second.retries] = _sendWhoisRequest(i->first,i->second.peersConsulted,i->second.retries); + ++i->second.retries; + TRACE("WHOIS %s (retry %u)",i->first.toString().c_str(),i->second.retries); + nextDelay = std::min(nextDelay,(unsigned long)ZT_WHOIS_RETRY_DELAY); + } + } else nextDelay = std::min(nextDelay,ZT_WHOIS_RETRY_DELAY - since); + ++i; + } + } + + { + Mutex::Lock _l(_txQueue_m); + for(std::multimap< Address,TXQueueEntry >::iterator i(_txQueue.begin());i!=_txQueue.end();) { + if (_trySend(i->second.packet,i->second.encrypt)) + _txQueue.erase(i++); + else if ((now - i->second.creationTime) > ZT_TRANSMIT_QUEUE_TIMEOUT) { + TRACE("TX %s -> %s timed out",i->second.packet.source().toString().c_str(),i->second.packet.destination().toString().c_str()); + _txQueue.erase(i++); + } else ++i; + } + } + + { + Mutex::Lock _l(_rxQueue_m); + for(std::list< SharedPtr<IncomingPacket> >::iterator i(_rxQueue.begin());i!=_rxQueue.end();) { + if ((now - (*i)->receiveTime()) > ZT_RECEIVE_QUEUE_TIMEOUT) { + TRACE("RX %s -> %s timed out",(*i)->source().toString().c_str(),(*i)->destination().toString().c_str()); + _rxQueue.erase(i++); + } else ++i; + } + } + + { + Mutex::Lock _l(_defragQueue_m); + for(std::map< uint64_t,DefragQueueEntry >::iterator i(_defragQueue.begin());i!=_defragQueue.end();) { + if ((now - i->second.creationTime) > ZT_FRAGMENTED_PACKET_RECEIVE_TIMEOUT) { + TRACE("incomplete fragmented packet %.16llx timed out, fragments discarded",i->first); + _defragQueue.erase(i++); + } else ++i; + } + } + + return std::max(nextDelay,(unsigned long)10); // minimum delay +} + const char *Switch::etherTypeName(const unsigned int etherType) throw() { |
