diff options
author | Adam Ierymenko <adam.ierymenko@gmail.com> | 2017-08-23 16:42:17 -0700 |
---|---|---|
committer | Adam Ierymenko <adam.ierymenko@gmail.com> | 2017-08-23 16:42:17 -0700 |
commit | 6ee201865b12f5b0f16208f6d696b1bf00197eaf (patch) | |
tree | 07d7c9c1cc24b532946269506374c01eb59478a1 | |
parent | b1d94c9f9324a31887dc6edc99ed58d4f9b187db (diff) | |
download | infinitytier-6ee201865b12f5b0f16208f6d696b1bf00197eaf.tar.gz infinitytier-6ee201865b12f5b0f16208f6d696b1bf00197eaf.zip |
Clean up WHOIS code.
-rw-r--r-- | node/Capability.cpp | 3 | ||||
-rw-r--r-- | node/CertificateOfMembership.cpp | 3 | ||||
-rw-r--r-- | node/CertificateOfOwnership.cpp | 3 | ||||
-rw-r--r-- | node/Constants.hpp | 9 | ||||
-rw-r--r-- | node/IncomingPacket.cpp | 4 | ||||
-rw-r--r-- | node/Node.cpp | 26 | ||||
-rw-r--r-- | node/Revocation.cpp | 3 | ||||
-rw-r--r-- | node/Switch.cpp | 132 | ||||
-rw-r--r-- | node/Switch.hpp | 24 | ||||
-rw-r--r-- | node/Tag.cpp | 3 | ||||
-rw-r--r-- | node/Topology.cpp | 35 | ||||
-rw-r--r-- | node/Topology.hpp | 14 |
12 files changed, 115 insertions, 144 deletions
diff --git a/node/Capability.cpp b/node/Capability.cpp index 0e02025a..47dca1fc 100644 --- a/node/Capability.cpp +++ b/node/Capability.cpp @@ -30,6 +30,7 @@ #include "Topology.hpp" #include "Switch.hpp" #include "Network.hpp" +#include "Node.hpp" namespace ZeroTier { @@ -59,7 +60,7 @@ int Capability::verify(const RuntimeEnvironment *RR,void *tPtr) const if (!id.verify(tmp.data(),tmp.size(),_custody[c].signature)) return -1; } else { - RR->sw->requestWhois(tPtr,_custody[c].from); + RR->sw->requestWhois(tPtr,RR->node->now(),_custody[c].from); return 1; } } diff --git a/node/CertificateOfMembership.cpp b/node/CertificateOfMembership.cpp index 100253e1..dedcccff 100644 --- a/node/CertificateOfMembership.cpp +++ b/node/CertificateOfMembership.cpp @@ -29,6 +29,7 @@ #include "Topology.hpp" #include "Switch.hpp" #include "Network.hpp" +#include "Node.hpp" namespace ZeroTier { @@ -223,7 +224,7 @@ int CertificateOfMembership::verify(const RuntimeEnvironment *RR,void *tPtr) con const Identity id(RR->topology->getIdentity(tPtr,_signedBy)); if (!id) { - RR->sw->requestWhois(tPtr,_signedBy); + RR->sw->requestWhois(tPtr,RR->node->now(),_signedBy); return 1; } diff --git a/node/CertificateOfOwnership.cpp b/node/CertificateOfOwnership.cpp index 31d0ae18..eeb0d99c 100644 --- a/node/CertificateOfOwnership.cpp +++ b/node/CertificateOfOwnership.cpp @@ -30,6 +30,7 @@ #include "Topology.hpp" #include "Switch.hpp" #include "Network.hpp" +#include "Node.hpp" namespace ZeroTier { @@ -39,7 +40,7 @@ int CertificateOfOwnership::verify(const RuntimeEnvironment *RR,void *tPtr) cons return -1; const Identity id(RR->topology->getIdentity(tPtr,_signedBy)); if (!id) { - RR->sw->requestWhois(tPtr,_signedBy); + RR->sw->requestWhois(tPtr,RR->node->now(),_signedBy); return 1; } try { diff --git a/node/Constants.hpp b/node/Constants.hpp index 27dce075..cda1af3b 100644 --- a/node/Constants.hpp +++ b/node/Constants.hpp @@ -229,19 +229,14 @@ #define ZT_WHOIS_RETRY_DELAY 500 /** - * Maximum identity WHOIS retries (each attempt tries consulting a different peer) - */ -#define ZT_MAX_WHOIS_RETRIES 5 - -/** * Transmit queue entry timeout */ -#define ZT_TRANSMIT_QUEUE_TIMEOUT (ZT_WHOIS_RETRY_DELAY * (ZT_MAX_WHOIS_RETRIES + 1)) +#define ZT_TRANSMIT_QUEUE_TIMEOUT 5000 /** * Receive queue entry timeout */ -#define ZT_RECEIVE_QUEUE_TIMEOUT (ZT_WHOIS_RETRY_DELAY * (ZT_MAX_WHOIS_RETRIES + 1)) +#define ZT_RECEIVE_QUEUE_TIMEOUT 5000 /** * Maximum latency to allow for OK(HELLO) before packet is discarded diff --git a/node/IncomingPacket.cpp b/node/IncomingPacket.cpp index 3788708d..685f2f09 100644 --- a/node/IncomingPacket.cpp +++ b/node/IncomingPacket.cpp @@ -115,7 +115,7 @@ bool IncomingPacket::tryDecode(const RuntimeEnvironment *RR,void *tPtr) case Packet::VERB_REMOTE_TRACE: return _doREMOTE_TRACE(RR,tPtr,peer); } } else { - RR->sw->requestWhois(tPtr,sourceAddress); + RR->sw->requestWhois(tPtr,RR->node->now(),sourceAddress); return false; } } catch ( ... ) { @@ -556,7 +556,7 @@ bool IncomingPacket::_doWHOIS(const RuntimeEnvironment *RR,void *tPtr,const Shar ++count; } else { // Request unknown WHOIS from upstream from us (if we have one) - RR->sw->requestWhois(tPtr,addr); + RR->sw->requestWhois(tPtr,RR->node->now(),addr); } } diff --git a/node/Node.cpp b/node/Node.cpp index 366ddbf0..09260172 100644 --- a/node/Node.cpp +++ b/node/Node.cpp @@ -249,6 +249,19 @@ ZT_ResultCode Node::processBackgroundTasks(void *tptr,uint64_t now,volatile uint try { _lastPingCheck = now; + // Do pings and keepalives + Hashtable< Address,std::vector<InetAddress> > upstreamsToContact; + RR->topology->getUpstreamsToContact(upstreamsToContact); + _PingPeersThatNeedPing pfunc(RR,tptr,upstreamsToContact,now); + RR->topology->eachPeer<_PingPeersThatNeedPing &>(pfunc); + + // Run WHOIS to create Peer for any upstreams we could not contact (including pending moon seeds) + Hashtable< Address,std::vector<InetAddress> >::Iterator i(upstreamsToContact); + Address *upstreamAddress = (Address *)0; + std::vector<InetAddress> *upstreamStableEndpoints = (std::vector<InetAddress> *)0; + while (i.next(upstreamAddress,upstreamStableEndpoints)) + RR->sw->requestWhois(tptr,now,*upstreamAddress); + // Get networks that need config without leaving mutex locked { std::vector< std::pair< SharedPtr<Network>,bool > > nwl; @@ -268,19 +281,6 @@ ZT_ResultCode Node::processBackgroundTasks(void *tptr,uint64_t now,volatile uint } } - // Do pings and keepalives - Hashtable< Address,std::vector<InetAddress> > upstreamsToContact; - RR->topology->getUpstreamsToContact(upstreamsToContact); - _PingPeersThatNeedPing pfunc(RR,tptr,upstreamsToContact,now); - RR->topology->eachPeer<_PingPeersThatNeedPing &>(pfunc); - - // Run WHOIS to create Peer for any upstreams we could not contact (including pending moon seeds) - Hashtable< Address,std::vector<InetAddress> >::Iterator i(upstreamsToContact); - Address *upstreamAddress = (Address *)0; - std::vector<InetAddress> *upstreamStableEndpoints = (std::vector<InetAddress> *)0; - while (i.next(upstreamAddress,upstreamStableEndpoints)) - RR->sw->requestWhois(tptr,*upstreamAddress); - // Update online status, post status change as event const bool oldOnline = _online; _online = (((now - pfunc.lastReceiveFromUpstream) < ZT_PEER_ACTIVITY_TIMEOUT)||(RR->topology->amRoot())); diff --git a/node/Revocation.cpp b/node/Revocation.cpp index 026058da..89a2db95 100644 --- a/node/Revocation.cpp +++ b/node/Revocation.cpp @@ -30,6 +30,7 @@ #include "Topology.hpp" #include "Switch.hpp" #include "Network.hpp" +#include "Node.hpp" namespace ZeroTier { @@ -39,7 +40,7 @@ int Revocation::verify(const RuntimeEnvironment *RR,void *tPtr) const return -1; const Identity id(RR->topology->getIdentity(tPtr,_signedBy)); if (!id) { - RR->sw->requestWhois(tPtr,_signedBy); + RR->sw->requestWhois(tPtr,RR->node->now(),_signedBy); return 1; } try { diff --git a/node/Switch.cpp b/node/Switch.cpp index 0d39bee9..8446602c 100644 --- a/node/Switch.cpp +++ b/node/Switch.cpp @@ -50,7 +50,6 @@ namespace ZeroTier { Switch::Switch(const RuntimeEnvironment *renv) : RR(renv), _lastBeaconResponse(0), - _outstandingWhoisRequests(32), _lastUniteAttempt(8) // only really used on root servers and upstreams, and it'll grow there just fine { } @@ -229,8 +228,8 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre } } } else { - relayTo = RR->topology->getUpstreamPeer(&source,1,true); - if (relayTo) + relayTo = RR->topology->getUpstreamPeer(); + if ((relayTo)&&(relayTo->address() != source)) relayTo->sendDirect(tPtr,packet.data(),packet.size(),now,true); } } @@ -553,33 +552,35 @@ void Switch::send(void *tPtr,Packet &packet,bool encrypt) } } -void Switch::requestWhois(void *tPtr,const Address &addr) +void Switch::requestWhois(void *tPtr,const uint64_t now,const Address &addr) { if (addr == RR->identity.address()) return; - bool inserted = false; + { - Mutex::Lock _l(_outstandingWhoisRequests_m); - WhoisRequest &r = _outstandingWhoisRequests[addr]; - if (r.lastSent) { - r.retries = 0; // reset retry count if entry already existed, but keep waiting and retry again after normal timeout - } else { - r.lastSent = RR->node->now(); - inserted = true; - } + Mutex::Lock _l(_lastSentWhoisRequest_m); + uint64_t &last = _lastSentWhoisRequest[addr]; + if ((now - last) < ZT_WHOIS_RETRY_DELAY) + return; + else last = now; + } + + const SharedPtr<Peer> upstream(RR->topology->getUpstreamPeer()); + if (upstream) { + Packet outp(upstream->address(),RR->identity.address(),Packet::VERB_WHOIS); + addr.appendTo(outp); + RR->node->expectReplyTo(outp.packetId()); + send(tPtr,outp,true); } - if (inserted) - _sendWhoisRequest(tPtr,addr,(const Address *)0,0); } void Switch::doAnythingWaitingForPeer(void *tPtr,const SharedPtr<Peer> &peer) { - { // cancel pending WHOIS since we now know this peer - Mutex::Lock _l(_outstandingWhoisRequests_m); - _outstandingWhoisRequests.erase(peer->address()); + { + Mutex::Lock _l(_lastSentWhoisRequest_m); + _lastSentWhoisRequest.erase(peer->address()); } - // finish processing any packets waiting on peer's public key / identity const uint64_t now = RR->node->now(); for(unsigned int ptr=0;ptr<ZT_RX_QUEUE_SIZE;++ptr) { RXQueueEntry *const rq = &(_rxQueue[ptr]); @@ -589,57 +590,61 @@ void Switch::doAnythingWaitingForPeer(void *tPtr,const SharedPtr<Peer> &peer) } } - { // finish sending any packets waiting on peer's public key / identity + { Mutex::Lock _l(_txQueue_m); for(std::list< TXQueueEntry >::iterator txi(_txQueue.begin());txi!=_txQueue.end();) { if (txi->dest == peer->address()) { - if (_trySend(tPtr,txi->packet,txi->encrypt)) + if (_trySend(tPtr,txi->packet,txi->encrypt)) { _txQueue.erase(txi++); - else ++txi; - } else ++txi; - } - } -} - -unsigned long Switch::doTimerTasks(void *tPtr,uint64_t now) -{ - unsigned long nextDelay = 0xffffffff; // ceiling delay, caller will cap to minimum - - { // Retry outstanding WHOIS requests - Mutex::Lock _l(_outstandingWhoisRequests_m); - Hashtable< Address,WhoisRequest >::Iterator i(_outstandingWhoisRequests); - Address *a = (Address *)0; - WhoisRequest *r = (WhoisRequest *)0; - while (i.next(a,r)) { - const unsigned long since = (unsigned long)(now - r->lastSent); - if (since >= ZT_WHOIS_RETRY_DELAY) { - if (r->retries >= ZT_MAX_WHOIS_RETRIES) { - _outstandingWhoisRequests.erase(*a); } else { - r->lastSent = now; - r->peersConsulted[r->retries] = _sendWhoisRequest(tPtr,*a,r->peersConsulted,(r->retries > 1) ? r->retries : 0); - ++r->retries; - nextDelay = std::min(nextDelay,(unsigned long)ZT_WHOIS_RETRY_DELAY); + ++txi; } } else { - nextDelay = std::min(nextDelay,ZT_WHOIS_RETRY_DELAY - since); + ++txi; } } } +} - { // Time out TX queue packets that never got WHOIS lookups or other info. +unsigned long Switch::doTimerTasks(void *tPtr,uint64_t now) +{ + const uint64_t timeSinceLastCheck = now - _lastCheckedQueues; + if (timeSinceLastCheck < ZT_WHOIS_RETRY_DELAY) + return (unsigned long)(ZT_WHOIS_RETRY_DELAY - timeSinceLastCheck); + _lastCheckedQueues = now; + + { Mutex::Lock _l(_txQueue_m); for(std::list< TXQueueEntry >::iterator txi(_txQueue.begin());txi!=_txQueue.end();) { if (_trySend(tPtr,txi->packet,txi->encrypt)) { _txQueue.erase(txi++); } else if ((now - txi->creationTime) > ZT_TRANSMIT_QUEUE_TIMEOUT) { RR->t->txTimedOut(tPtr,txi->dest); - _txQueue.erase(txi++); - } else ++txi; + _txQueue.erase(txi); + ++txi; + } else if (!RR->topology->getPeer(tPtr,txi->dest)) { + requestWhois(tPtr,now,txi->dest); + ++txi; + } else { + ++txi; + } + } + } + + for(unsigned int ptr=0;ptr<ZT_RX_QUEUE_SIZE;++ptr) { + RXQueueEntry *const rq = &(_rxQueue[ptr]); + if ((rq->timestamp)&&(rq->complete)) { + if ((rq->frag0.tryDecode(RR,tPtr))||((now - rq->timestamp) > ZT_RECEIVE_QUEUE_TIMEOUT)) { + rq->timestamp = 0; + } else { + const Address src(rq->frag0.source()); + if (!RR->topology->getPeer(tPtr,src)) + requestWhois(tPtr,now,src); + } } } - { // Remove really old last unite attempt entries to keep table size controlled + { Mutex::Lock _l(_lastUniteAttempt_m); Hashtable< _LastUniteKey,uint64_t >::Iterator i(_lastUniteAttempt); _LastUniteKey *k = (_LastUniteKey *)0; @@ -650,7 +655,18 @@ unsigned long Switch::doTimerTasks(void *tPtr,uint64_t now) } } - return nextDelay; + { + Mutex::Lock _l(_lastSentWhoisRequest_m); + Hashtable< Address,uint64_t >::Iterator i(_lastSentWhoisRequest); + Address *a = (Address *)0; + uint64_t *ts = (uint64_t *)0; + while (i.next(a,ts)) { + if ((now - *ts) > (ZT_WHOIS_RETRY_DELAY * 2)) + _lastSentWhoisRequest.erase(*a); + } + } + + return ZT_WHOIS_RETRY_DELAY; } bool Switch::_shouldUnite(const uint64_t now,const Address &source,const Address &destination) @@ -664,18 +680,6 @@ bool Switch::_shouldUnite(const uint64_t now,const Address &source,const Address return false; } -Address Switch::_sendWhoisRequest(void *tPtr,const Address &addr,const Address *peersAlreadyConsulted,unsigned int numPeersAlreadyConsulted) -{ - SharedPtr<Peer> upstream(RR->topology->getUpstreamPeer(peersAlreadyConsulted,numPeersAlreadyConsulted,false)); - if (upstream) { - Packet outp(upstream->address(),RR->identity.address(),Packet::VERB_WHOIS); - addr.appendTo(outp); - RR->node->expectReplyTo(outp.packetId()); - send(tPtr,outp,true); - } - return Address(); -} - bool Switch::_trySend(void *tPtr,Packet &packet,bool encrypt) { SharedPtr<Path> viaPath; @@ -709,7 +713,7 @@ bool Switch::_trySend(void *tPtr,Packet &packet,bool encrypt) } } } else { - requestWhois(tPtr,destination); + requestWhois(tPtr,now,destination); return false; // if we are not in cluster mode, there is no way we can send without knowing the peer directly } diff --git a/node/Switch.hpp b/node/Switch.hpp index 88415541..2420607d 100644 --- a/node/Switch.hpp +++ b/node/Switch.hpp @@ -111,9 +111,10 @@ public: * Request WHOIS on a given address * * @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call + * @param now Current time * @param addr Address to look up */ - void requestWhois(void *tPtr,const Address &addr); + void requestWhois(void *tPtr,const uint64_t now,const Address &addr); /** * Run any processes that are waiting for this peer's identity @@ -139,34 +140,27 @@ public: private: bool _shouldUnite(const uint64_t now,const Address &source,const Address &destination); - Address _sendWhoisRequest(void *tPtr,const Address &addr,const Address *peersAlreadyConsulted,unsigned int numPeersAlreadyConsulted); bool _trySend(void *tPtr,Packet &packet,bool encrypt); // packet is modified if return is true const RuntimeEnvironment *const RR; uint64_t _lastBeaconResponse; + uint64_t _lastCheckedQueues; - // Outstanding WHOIS requests and how many retries they've undergone - struct WhoisRequest - { - WhoisRequest() : lastSent(0),retries(0) {} - uint64_t lastSent; - Address peersConsulted[ZT_MAX_WHOIS_RETRIES]; // by retry - unsigned int retries; // 0..ZT_MAX_WHOIS_RETRIES - }; - Hashtable< Address,WhoisRequest > _outstandingWhoisRequests; - Mutex _outstandingWhoisRequests_m; + // Time we last sent a WHOIS request for each address + Hashtable< Address,uint64_t > _lastSentWhoisRequest; + Mutex _lastSentWhoisRequest_m; // Packets waiting for WHOIS replies or other decode info or missing fragments struct RXQueueEntry { RXQueueEntry() : timestamp(0) {} - uint64_t timestamp; // 0 if entry is not in use - uint64_t packetId; + volatile uint64_t timestamp; // 0 if entry is not in use + volatile uint64_t packetId; IncomingPacket frag0; // head of packet Packet::Fragment frags[ZT_MAX_PACKET_FRAGMENTS - 1]; // later fragments (if any) unsigned int totalFragments; // 0 if only frag0 received, waiting for frags uint32_t haveFragments; // bit mask, LSB to MSB - bool complete; // if true, packet is complete + volatile bool complete; // if true, packet is complete }; RXQueueEntry _rxQueue[ZT_RX_QUEUE_SIZE]; AtomicCounter _rxQueuePtr; diff --git a/node/Tag.cpp b/node/Tag.cpp index 39b17f2a..bde41a70 100644 --- a/node/Tag.cpp +++ b/node/Tag.cpp @@ -30,6 +30,7 @@ #include "Topology.hpp" #include "Switch.hpp" #include "Network.hpp" +#include "Node.hpp" namespace ZeroTier { @@ -39,7 +40,7 @@ int Tag::verify(const RuntimeEnvironment *RR,void *tPtr) const return -1; const Identity id(RR->topology->getIdentity(tPtr,_signedBy)); if (!id) { - RR->sw->requestWhois(tPtr,_signedBy); + RR->sw->requestWhois(tPtr,RR->node->now(),_signedBy); return 1; } try { diff --git a/node/Topology.cpp b/node/Topology.cpp index aeca59a7..ee5d969d 100644 --- a/node/Topology.cpp +++ b/node/Topology.cpp @@ -154,13 +154,11 @@ Identity Topology::getIdentity(void *tPtr,const Address &zta) return Identity(); } -SharedPtr<Peer> Topology::getUpstreamPeer(const Address *avoid,unsigned int avoidCount,bool strictAvoid) +SharedPtr<Peer> Topology::getUpstreamPeer() { const uint64_t now = RR->node->now(); - unsigned int bestQualityOverall = ~((unsigned int)0); - unsigned int bestQualityNotAvoid = ~((unsigned int)0); - const SharedPtr<Peer> *bestOverall = (const SharedPtr<Peer> *)0; - const SharedPtr<Peer> *bestNotAvoid = (const SharedPtr<Peer> *)0; + unsigned int bestq = ~((unsigned int)0); + const SharedPtr<Peer> *best = (const SharedPtr<Peer> *)0; Mutex::Lock _l1(_peers_m); Mutex::Lock _l2(_upstreams_m); @@ -168,32 +166,17 @@ SharedPtr<Peer> Topology::getUpstreamPeer(const Address *avoid,unsigned int avoi for(std::vector<Address>::const_iterator a(_upstreamAddresses.begin());a!=_upstreamAddresses.end();++a) { const SharedPtr<Peer> *p = _peers.get(*a); if (p) { - bool avoiding = false; - for(unsigned int i=0;i<avoidCount;++i) { - if (avoid[i] == (*p)->address()) { - avoiding = true; - break; - } - } const unsigned int q = (*p)->relayQuality(now); - if (q <= bestQualityOverall) { - bestQualityOverall = q; - bestOverall = &(*p); - } - if ((!avoiding)&&(q <= bestQualityNotAvoid)) { - bestQualityNotAvoid = q; - bestNotAvoid = &(*p); + if (q <= bestq) { + bestq = q; + best = p; } } } - if (bestNotAvoid) { - return *bestNotAvoid; - } else if ((!strictAvoid)&&(bestOverall)) { - return *bestOverall; - } - - return SharedPtr<Peer>(); + if (!best) + return SharedPtr<Peer>(); + return *best; } bool Topology::isUpstream(const Identity &id) const diff --git a/node/Topology.hpp b/node/Topology.hpp index 04dfb1cc..43921896 100644 --- a/node/Topology.hpp +++ b/node/Topology.hpp @@ -127,19 +127,9 @@ public: /** * Get the current best upstream peer * - * @return Root server with lowest latency or NULL if none + * @return Upstream or NULL if none available */ - inline SharedPtr<Peer> getUpstreamPeer() { return getUpstreamPeer((const Address *)0,0,false); } - - /** - * Get the current best upstream peer, avoiding those in the supplied avoid list - * - * @param avoid Nodes to avoid - * @param avoidCount Number of nodes to avoid - * @param strictAvoid If false, consider avoided root servers anyway if no non-avoid root servers are available - * @return Root server or NULL if none available - */ - SharedPtr<Peer> getUpstreamPeer(const Address *avoid,unsigned int avoidCount,bool strictAvoid); + SharedPtr<Peer> getUpstreamPeer(); /** * @param id Identity to check |