diff options
author | Adam Ierymenko <adam.ierymenko@gmail.com> | 2014-09-24 13:45:58 -0700 |
---|---|---|
committer | Adam Ierymenko <adam.ierymenko@gmail.com> | 2014-09-24 13:45:58 -0700 |
commit | 431476e2e4474c83013c2a1a6d80f1e815a2d37c (patch) | |
tree | 2c1a6e2aabee108695d1f993e3085b0ed923f7e3 | |
parent | 557801a09ef31c243add5f6d6de7a5f67af53ed2 (diff) | |
download | infinitytier-431476e2e4474c83013c2a1a6d80f1e815a2d37c.tar.gz infinitytier-431476e2e4474c83013c2a1a6d80f1e815a2d37c.zip |
Some more multicast algo work...
-rw-r--r-- | node/MulticastTopology.cpp | 19 | ||||
-rw-r--r-- | node/MulticastTopology.hpp | 24 | ||||
-rw-r--r-- | node/OutboundMulticast.hpp | 19 | ||||
-rw-r--r-- | node/Packet.hpp | 1 | ||||
-rw-r--r-- | node/Switch.cpp | 183 | ||||
-rw-r--r-- | node/Switch.hpp | 52 |
6 files changed, 165 insertions, 133 deletions
diff --git a/node/MulticastTopology.cpp b/node/MulticastTopology.cpp index e4b1066d..1db1aa7a 100644 --- a/node/MulticastTopology.cpp +++ b/node/MulticastTopology.cpp @@ -41,19 +41,8 @@ MulticastTopology::~MulticastTopology() { } -void MulticastTopology::add(const MulticastGroup &mg,const Address &member,const Address &learnedFrom) +void MulticastTopology::add(const MulticastGroup &mg,const Address &learnedFrom,const Address &member) { - Mutex::Lock _l(_groups_m); - std::vector<MulticastGroupMember> &mv = _groups[mg].members; - for(std::vector<MulticastGroupMember>::iterator m(mv.begin());m!=mv.end();++m) { - if (m->address == member) { - if (m->learnedFrom) // once a member has been seen directly, we keep its status as direct - m->learnedFrom = learnedFrom; - m->timestamp = Utils::now(); - return; - } - } - mv.push_back(MulticastGroupMember(member,learnedFrom,Utils::now())); } void MulticastTopology::erase(const MulticastGroup &mg,const Address &member) @@ -72,6 +61,12 @@ void MulticastTopology::erase(const MulticastGroup &mg,const Address &member) } } +void send(uint64_t nwid,uint64_t now,const Address &self,const MulticastGroup &mg,const MAC &from,unsigned int etherType,const void *data,unsigned int len) +{ + Mutex::Lock _l(_groups_m); + std::map< MulticastGroup,MulticastGroupStatus >::iterator r(_groups.find(mg)); +} + unsigned int MulticastTopology::shouldGather(const MulticastGroup &mg,uint64_t now,unsigned int limit,bool updateLastGatheredTimeOnNonzeroReturn) { Mutex::Lock _l(_groups_m); diff --git a/node/MulticastTopology.hpp b/node/MulticastTopology.hpp index 8a68f2b2..0b330df3 100644 --- a/node/MulticastTopology.hpp +++ b/node/MulticastTopology.hpp @@ -33,10 +33,14 @@ #include <map> #include <vector> +#include <list> #include "Constants.hpp" #include "Address.hpp" +#include "MAC.hpp" #include "MulticastGroup.hpp" +#include "OutboundMulticast.hpp" +#include "Switch.hpp" #include "Utils.hpp" #include "Mutex.hpp" @@ -70,6 +74,7 @@ private: uint64_t lastGatheredMembers; // time we last gathered members std::vector<MulticastGroupMember> members; // members of this group + std::list<OutboundMulticast> txQueue; // pending outbound multicasts }; public: @@ -80,10 +85,10 @@ public: * Add or update a member in a multicast group * * @param mg Multicast group - * @param member Member to add/update * @param learnedFrom Address from which we learned this member or NULL/0 Address if direct + * @param member New member address */ - void add(const MulticastGroup &mg,const Address &member,const Address &learnedFrom); + void add(const MulticastGroup &mg,const Address &learnedFrom,const Address &member); /** * Erase a member from a multicast group (if present) @@ -94,6 +99,21 @@ public: void erase(const MulticastGroup &mg,const Address &member); /** + * Send a multicast + * + * @param nwid Network ID + * @param now Current time + * @param sw Switch to use for sending packets + * @param self This node's address + * @param mg Multicast group + * @param from Source Ethernet MAC address + * @param etherType Ethernet frame type + * @param data Packet data + * @param len Length of packet data + */ + void send(uint64_t nwid,uint64_t now,const Switch &sw,const Address &self,const MulticastGroup &mg,const MAC &from,unsigned int etherType,const void *data,unsigned int len); + + /** * @param mg Multicast group * @return Tuple of: time we last gathered members (or 0 for never) and number of known members */ diff --git a/node/OutboundMulticast.hpp b/node/OutboundMulticast.hpp index f8338b93..3ef9532b 100644 --- a/node/OutboundMulticast.hpp +++ b/node/OutboundMulticast.hpp @@ -51,6 +51,13 @@ class OutboundMulticast { public: /** + * Create an uninitialized outbound multicast + * + * It must be initialized with init(). + */ + OutboundMulticast() {} + + /** * Initialize outbound multicast * * @param timestamp Creation time @@ -63,13 +70,13 @@ public: * @param len Length of data * @throws std::out_of_range Data too large to fit in a MULTICAST_FRAME */ - OutboundMulticast(uint64_t timestamp,const Address &self,uint64_t nwid,const MAC &src,const MulticastGroup &dest,unsigned int etherType,const void *payload,unsigned int len) : - _timestamp(timestamp), - _nwid(nwid), - _source(src), - _destination(dest), - _etherType(etherType) + inline void init(uint64_t timestamp,const Address &self,uint64_t nwid,const MAC &src,const MulticastGroup &dest,unsigned int etherType,const void *payload,unsigned int len) { + _timestamp = timestamp; + _nwid = nwid; + _source = src; + _destination = dest; + _etherType = etherType; _packet.setSource(self); _packet.setVerb(Packet::VERB_MULTICAST_FRAME); _packet.append((char)0); diff --git a/node/Packet.hpp b/node/Packet.hpp index aedc9e4c..214479d6 100644 --- a/node/Packet.hpp +++ b/node/Packet.hpp @@ -694,6 +694,7 @@ public: VERB_MULTICAST_GATHER = 13, /* Multicast frame: + * <[8] 64-bit network ID> * <[1] flags (currently unused, must be 0)> * <[4] 32-bit multicast ADI (note that this is out of order here -- it precedes MAC)> * <[6] destination MAC or all zero for destination node> diff --git a/node/Switch.cpp b/node/Switch.cpp index 4e8cf4d8..f1c814ef 100644 --- a/node/Switch.cpp +++ b/node/Switch.cpp @@ -56,8 +56,7 @@ namespace ZeroTier { Switch::Switch(const RuntimeEnvironment *renv) : _r(renv), - _lastBeacon(0), - _multicastIdCounter((unsigned int)renv->prng->next32()) // start a random spot to minimize possible collisions on startup + _lastBeacon(0) { } @@ -478,96 +477,6 @@ void Switch::contact(const SharedPtr<Peer> &peer,const InetAddress &atAddr) _r->sm->whack(); } -unsigned long Switch::doTimerTasks() -{ - unsigned long nextDelay = ~((unsigned long)0); // big number, caller will cap return value - uint64_t now = Utils::now(); - - { - Mutex::Lock _l(_contactQueue_m); - for(std::list<ContactQueueEntry>::iterator qi(_contactQueue.begin());qi!=_contactQueue.end();) { - if (now >= qi->fireAtTime) { - if (!qi->peer->hasActiveDirectPath(now)) { - TRACE("deploying aggressive NAT-t against %s(%s)",qi->peer->address().toString().c_str(),qi->inaddr.toString().c_str()); - - /* Shotgun approach -- literally -- against symmetric NATs. Most of these - * either increment or decrement ports so this gets a good number. Also try - * the original port one more time for good measure, since sometimes it - * fails first time around. */ - int p = (int)qi->inaddr.port() - 2; - for(int k=0;k<5;++k) { - if ((p > 0)&&(p <= 0xffff)) { - qi->inaddr.setPort((unsigned int)p); - sendHELLO(qi->peer,qi->inaddr); - } - ++p; - } - } - - _contactQueue.erase(qi++); - } else { - nextDelay = std::min(nextDelay,(unsigned long)(qi->fireAtTime - now)); - ++qi; - } - } - } - - { - Mutex::Lock _l(_outstandingWhoisRequests_m); - for(std::map< Address,WhoisRequest >::iterator i(_outstandingWhoisRequests.begin());i!=_outstandingWhoisRequests.end();) { - unsigned long since = (unsigned long)(now - i->second.lastSent); - if (since >= ZT_WHOIS_RETRY_DELAY) { - if (i->second.retries >= ZT_MAX_WHOIS_RETRIES) { - TRACE("WHOIS %s timed out",i->first.toString().c_str()); - _outstandingWhoisRequests.erase(i++); - continue; - } else { - i->second.lastSent = now; - i->second.peersConsulted[i->second.retries] = _sendWhoisRequest(i->first,i->second.peersConsulted,i->second.retries); - ++i->second.retries; - TRACE("WHOIS %s (retry %u)",i->first.toString().c_str(),i->second.retries); - nextDelay = std::min(nextDelay,(unsigned long)ZT_WHOIS_RETRY_DELAY); - } - } else nextDelay = std::min(nextDelay,ZT_WHOIS_RETRY_DELAY - since); - ++i; - } - } - - { - Mutex::Lock _l(_txQueue_m); - for(std::multimap< Address,TXQueueEntry >::iterator i(_txQueue.begin());i!=_txQueue.end();) { - if (_trySend(i->second.packet,i->second.encrypt)) - _txQueue.erase(i++); - else if ((now - i->second.creationTime) > ZT_TRANSMIT_QUEUE_TIMEOUT) { - TRACE("TX %s -> %s timed out",i->second.packet.source().toString().c_str(),i->second.packet.destination().toString().c_str()); - _txQueue.erase(i++); - } else ++i; - } - } - - { - Mutex::Lock _l(_rxQueue_m); - for(std::list< SharedPtr<IncomingPacket> >::iterator i(_rxQueue.begin());i!=_rxQueue.end();) { - if ((now - (*i)->receiveTime()) > ZT_RECEIVE_QUEUE_TIMEOUT) { - TRACE("RX %s -> %s timed out",(*i)->source().toString().c_str(),(*i)->destination().toString().c_str()); - _rxQueue.erase(i++); - } else ++i; - } - } - - { - Mutex::Lock _l(_defragQueue_m); - for(std::map< uint64_t,DefragQueueEntry >::iterator i(_defragQueue.begin());i!=_defragQueue.end();) { - if ((now - i->second.creationTime) > ZT_FRAGMENTED_PACKET_RECEIVE_TIMEOUT) { - TRACE("incomplete fragmented packet %.16llx timed out, fragments discarded",i->first); - _defragQueue.erase(i++); - } else ++i; - } - } - - return std::max(nextDelay,(unsigned long)10); // minimum delay -} - void Switch::announceMulticastGroups(const std::map< SharedPtr<Network>,std::set<MulticastGroup> > &allMemberships) { std::vector< SharedPtr<Peer> > directPeers; @@ -682,6 +591,96 @@ void Switch::doAnythingWaitingForPeer(const SharedPtr<Peer> &peer) } } +unsigned long Switch::doTimerTasks() +{ + unsigned long nextDelay = ~((unsigned long)0); // big number, caller will cap return value + uint64_t now = Utils::now(); + + { + Mutex::Lock _l(_contactQueue_m); + for(std::list<ContactQueueEntry>::iterator qi(_contactQueue.begin());qi!=_contactQueue.end();) { + if (now >= qi->fireAtTime) { + if (!qi->peer->hasActiveDirectPath(now)) { + TRACE("deploying aggressive NAT-t against %s(%s)",qi->peer->address().toString().c_str(),qi->inaddr.toString().c_str()); + + /* Shotgun approach -- literally -- against symmetric NATs. Most of these + * either increment or decrement ports so this gets a good number. Also try + * the original port one more time for good measure, since sometimes it + * fails first time around. */ + int p = (int)qi->inaddr.port() - 2; + for(int k=0;k<5;++k) { + if ((p > 0)&&(p <= 0xffff)) { + qi->inaddr.setPort((unsigned int)p); + sendHELLO(qi->peer,qi->inaddr); + } + ++p; + } + } + + _contactQueue.erase(qi++); + } else { + nextDelay = std::min(nextDelay,(unsigned long)(qi->fireAtTime - now)); + ++qi; + } + } + } + + { + Mutex::Lock _l(_outstandingWhoisRequests_m); + for(std::map< Address,WhoisRequest >::iterator i(_outstandingWhoisRequests.begin());i!=_outstandingWhoisRequests.end();) { + unsigned long since = (unsigned long)(now - i->second.lastSent); + if (since >= ZT_WHOIS_RETRY_DELAY) { + if (i->second.retries >= ZT_MAX_WHOIS_RETRIES) { + TRACE("WHOIS %s timed out",i->first.toString().c_str()); + _outstandingWhoisRequests.erase(i++); + continue; + } else { + i->second.lastSent = now; + i->second.peersConsulted[i->second.retries] = _sendWhoisRequest(i->first,i->second.peersConsulted,i->second.retries); + ++i->second.retries; + TRACE("WHOIS %s (retry %u)",i->first.toString().c_str(),i->second.retries); + nextDelay = std::min(nextDelay,(unsigned long)ZT_WHOIS_RETRY_DELAY); + } + } else nextDelay = std::min(nextDelay,ZT_WHOIS_RETRY_DELAY - since); + ++i; + } + } + + { + Mutex::Lock _l(_txQueue_m); + for(std::multimap< Address,TXQueueEntry >::iterator i(_txQueue.begin());i!=_txQueue.end();) { + if (_trySend(i->second.packet,i->second.encrypt)) + _txQueue.erase(i++); + else if ((now - i->second.creationTime) > ZT_TRANSMIT_QUEUE_TIMEOUT) { + TRACE("TX %s -> %s timed out",i->second.packet.source().toString().c_str(),i->second.packet.destination().toString().c_str()); + _txQueue.erase(i++); + } else ++i; + } + } + + { + Mutex::Lock _l(_rxQueue_m); + for(std::list< SharedPtr<IncomingPacket> >::iterator i(_rxQueue.begin());i!=_rxQueue.end();) { + if ((now - (*i)->receiveTime()) > ZT_RECEIVE_QUEUE_TIMEOUT) { + TRACE("RX %s -> %s timed out",(*i)->source().toString().c_str(),(*i)->destination().toString().c_str()); + _rxQueue.erase(i++); + } else ++i; + } + } + + { + Mutex::Lock _l(_defragQueue_m); + for(std::map< uint64_t,DefragQueueEntry >::iterator i(_defragQueue.begin());i!=_defragQueue.end();) { + if ((now - i->second.creationTime) > ZT_FRAGMENTED_PACKET_RECEIVE_TIMEOUT) { + TRACE("incomplete fragmented packet %.16llx timed out, fragments discarded",i->first); + _defragQueue.erase(i++); + } else ++i; + } + } + + return std::max(nextDelay,(unsigned long)10); // minimum delay +} + const char *Switch::etherTypeName(const unsigned int etherType) throw() { diff --git a/node/Switch.hpp b/node/Switch.hpp index c7f80606..a1c2f752 100644 --- a/node/Switch.hpp +++ b/node/Switch.hpp @@ -67,6 +67,11 @@ class Peer; /** * Core of the distributed Ethernet switch and protocol implementation + * + * This class is perhaps a bit misnamed, but it's basically where everything + * meets. Transport-layer ZT packets come in here, as do virtual network + * packets from tap devices, and this sends them where they need to go and + * wraps/unwraps accordingly. It also handles queues and timeouts and such. */ class Switch : NonCopyable { @@ -161,13 +166,6 @@ public: void contact(const SharedPtr<Peer> &peer,const InetAddress &atAddr); /** - * Perform retries and other periodic timer tasks - * - * @return Number of milliseconds until doTimerTasks() should be run again - */ - unsigned long doTimerTasks(); - - /** * Announce multicast group memberships * * This announces all the groups for all the networks in the supplied map to @@ -204,7 +202,7 @@ public: void cancelWhoisRequest(const Address &addr); /** - * Run any processes that are waiting for this peer + * Run any processes that are waiting for this peer's identity * * Called when we learn of a peer's identity from HELLO, OK(WHOIS), etc. * @@ -213,6 +211,13 @@ public: void doAnythingWaitingForPeer(const SharedPtr<Peer> &peer); /** + * Perform retries and other periodic timer tasks + * + * @return Number of milliseconds until doTimerTasks() should be run again + */ + unsigned long doTimerTasks(); + + /** * @param etherType Ethernet type ID * @return Human-readable name */ @@ -235,8 +240,8 @@ private: const RuntimeEnvironment *const _r; volatile uint64_t _lastBeacon; - volatile unsigned int _multicastIdCounter; + // Outsanding WHOIS requests and how many retries they've undergone struct WhoisRequest { uint64_t lastSent; @@ -246,9 +251,23 @@ private: std::map< Address,WhoisRequest > _outstandingWhoisRequests; Mutex _outstandingWhoisRequests_m; - std::list< SharedPtr<IncomingPacket> > _rxQueue; + // Packet defragmentation queue -- comes before RX queue in path + struct DefragQueueEntry + { + uint64_t creationTime; + SharedPtr<IncomingPacket> frag0; + Packet::Fragment frags[ZT_MAX_PACKET_FRAGMENTS - 1]; + unsigned int totalFragments; // 0 if only frag0 received, waiting for frags + uint32_t haveFragments; // bit mask, LSB to MSB + }; + std::map< uint64_t,DefragQueueEntry > _defragQueue; + Mutex _defragQueue_m; + + // ZeroTier-layer RX queue of incoming packets in the process of being decoded + std::vector< SharedPtr<IncomingPacket> > _rxQueue; Mutex _rxQueue_m; + // ZeroTier-layer TX queue by destination ZeroTier address struct TXQueueEntry { TXQueueEntry() {} @@ -264,20 +283,11 @@ private: std::multimap< Address,TXQueueEntry > _txQueue; Mutex _txQueue_m; - struct DefragQueueEntry - { - uint64_t creationTime; - SharedPtr<IncomingPacket> frag0; - Packet::Fragment frags[ZT_MAX_PACKET_FRAGMENTS - 1]; - unsigned int totalFragments; // 0 if only frag0 received, waiting for frags - uint32_t haveFragments; // bit mask, LSB to MSB - }; - std::map< uint64_t,DefragQueueEntry > _defragQueue; - Mutex _defragQueue_m; - + // Tracks sending of VERB_RENDEZVOUS to relaying peers std::map< Array< Address,2 >,uint64_t > _lastUniteAttempt; // key is always sorted in ascending order, for set-like behavior Mutex _lastUniteAttempt_m; + // Active attempts to contact remote peers, including state of multi-phase NAT traversal struct ContactQueueEntry { ContactQueueEntry() {} |