diff options
author | Adam Ierymenko <adam.ierymenko@gmail.com> | 2014-10-01 14:05:25 -0700 |
---|---|---|
committer | Adam Ierymenko <adam.ierymenko@gmail.com> | 2014-10-01 14:05:25 -0700 |
commit | e1882b614b48bf0c2a68223b3ee2fb338dbcb8f6 (patch) | |
tree | 2447fa648e910c6ef9aa8d4b3147606b2bc18e91 | |
parent | ae082c3cb8a1ba7f8efb95ce690b012ffa7a79cd (diff) | |
download | infinitytier-e1882b614b48bf0c2a68223b3ee2fb338dbcb8f6.tar.gz infinitytier-e1882b614b48bf0c2a68223b3ee2fb338dbcb8f6.zip |
Some cleanup, Multicaster now sends multicasts as it gets additional members.
-rw-r--r-- | node/Multicaster.cpp | 47 | ||||
-rw-r--r-- | node/Multicaster.hpp | 16 | ||||
-rw-r--r-- | node/Node.cpp | 6 | ||||
-rw-r--r-- | node/OutboundMulticast.cpp | 16 | ||||
-rw-r--r-- | node/OutboundMulticast.hpp | 7 | ||||
-rw-r--r-- | node/Switch.cpp | 4 | ||||
-rw-r--r-- | node/Topology.cpp | 5 | ||||
-rw-r--r-- | node/Topology.hpp | 2 |
8 files changed, 76 insertions, 27 deletions
diff --git a/node/Multicaster.cpp b/node/Multicaster.cpp index fe590fed..a3e600d9 100644 --- a/node/Multicaster.cpp +++ b/node/Multicaster.cpp @@ -40,7 +40,8 @@ namespace ZeroTier { -Multicaster::Multicaster() +Multicaster::Multicaster(const RuntimeEnvironment *renv) : + RR(renv) { } @@ -104,9 +105,9 @@ restart_member_scan: } void Multicaster::send( - const RuntimeEnvironment *RR, const CertificateOfMembership *com, unsigned int limit, + unsigned int gatherLimit, uint64_t now, uint64_t nwid, const MulticastGroup &mg, @@ -122,7 +123,19 @@ void Multicaster::send( // If we already have enough members, just send and we're done -- no need for TX queue OutboundMulticast out; - out.init(now,RR->identity.address(),nwid,com,ZT_MULTICAST_DEFAULT_IMPLICIT_GATHER,src,mg,etherType,data,len); + out.init( + now, + RR->identity.address(), + nwid, + com, + limit, + gatherLimit, + src, + mg, + etherType, + data, + len); + unsigned int count = 0; for(std::vector<MulticastGroupMember>::const_reverse_iterator m(gs.members.rbegin());m!=gs.members.rend();++m) { out.sendOnly(*(RR->sw),m->address); // sendOnly() avoids overhead of creating sent log since we're going to discard this immediately @@ -134,7 +147,19 @@ void Multicaster::send( gs.txQueue.push_back(OutboundMulticast()); OutboundMulticast &out = gs.txQueue.back(); - out.init(now,RR->identity.address(),nwid,com,ZT_MULTICAST_DEFAULT_IMPLICIT_GATHER,src,mg,etherType,data,len); + out.init( + now, + RR->identity.address(), + nwid, + com, + limit, + gatherLimit, + src, + mg, + etherType, + data, + len); + for(std::vector<MulticastGroupMember>::const_reverse_iterator m(gs.members.rbegin());m!=gs.members.rend();++m) out.sendAndLog(*(RR->sw),m->address); @@ -161,13 +186,13 @@ void Multicaster::send( } } -void Multicaster::clean(const RuntimeEnvironment *RR,uint64_t now) +void Multicaster::clean(uint64_t now) { Mutex::Lock _l(_groups_m); for(std::map< std::pair<uint64_t,MulticastGroup>,MulticastGroupStatus >::iterator mm(_groups.begin());mm!=_groups.end();) { // Remove expired outgoing multicasts from multicast TX queue for(std::list<OutboundMulticast>::iterator tx(mm->second.txQueue.begin());tx!=mm->second.txQueue.end();) { - if (tx->expired(now)) + if ((tx->expired(now))||(tx->atLimit())) mm->second.txQueue.erase(tx++); else ++tx; } @@ -218,7 +243,7 @@ void Multicaster::clean(const RuntimeEnvironment *RR,uint64_t now) } } -void Multicaster::_add(uint64_t now,MulticastGroupStatus &gs,const Address &learnedFrom,const Address &member) +void Multicaster::_add(uint64_t now,uint64_t nwid,MulticastGroupStatus &gs,const Address &learnedFrom,const Address &member) { // assumes _groups_m is locked @@ -236,6 +261,14 @@ void Multicaster::_add(uint64_t now,MulticastGroupStatus &gs,const Address &lear // be resorted on next clean(). In the future we might want to insert // this somewhere else but we'll try this for now. gs.members.push_back(MulticastGroupMember(member,learnedFrom,now)); + + // Try to send to any outgoing multicasts that are waiting for more recipients + for(std::list<OutboundMulticast>::iterator tx(gs.txQueue.begin());tx!=gs.txQueue.end();) { + tx->sendIfNew(*(RR->sw),member); + if (tx->atLimit()) + gs.txQueue.erase(tx++); + else ++tx; + } } } // namespace ZeroTier diff --git a/node/Multicaster.hpp b/node/Multicaster.hpp index 5c858093..96207aa0 100644 --- a/node/Multicaster.hpp +++ b/node/Multicaster.hpp @@ -42,6 +42,7 @@ #include "OutboundMulticast.hpp" #include "Utils.hpp" #include "Mutex.hpp" +#include "NonCopyable.hpp" namespace ZeroTier { @@ -52,7 +53,7 @@ class Packet; /** * Database of known multicast peers within a network */ -class Multicaster +class Multicaster : NonCopyable { private: struct MulticastGroupMember @@ -79,7 +80,7 @@ private: }; public: - Multicaster(); + Multicaster(const RuntimeEnvironment *renv); ~Multicaster(); /** @@ -94,7 +95,7 @@ public: inline void subscribe(uint64_t now,uint64_t nwid,const MulticastGroup &mg,const Address &learnedFrom,const Address &member) { Mutex::Lock _l(_groups_m); - _add(now,_groups[std::pair<uint64_t,MulticastGroup>(nwid,mg)],learnedFrom,member); + _add(now,nwid,_groups[std::pair<uint64_t,MulticastGroup>(nwid,mg)],learnedFrom,member); } /** @@ -120,10 +121,10 @@ public: /** * Send a multicast * - * @param RR Runtime environment * @param nwid Network ID * @param com Certificate of membership to include or NULL for none * @param limit Multicast limit + * @param gatherLimit Limit to pass for implicit gather with MULTICAST_FRAME * @param now Current time * @param mg Multicast group * @param from Source Ethernet MAC address @@ -132,9 +133,9 @@ public: * @param len Length of packet data */ void send( - const RuntimeEnvironment *RR, const CertificateOfMembership *com, unsigned int limit, + unsigned int gatherLimit, uint64_t now, uint64_t nwid, const MulticastGroup &mg, @@ -149,11 +150,12 @@ public: * @param RR Runtime environment * @param now Current time */ - void clean(const RuntimeEnvironment *RR,uint64_t now); + void clean(uint64_t now); private: - void _add(uint64_t now,MulticastGroupStatus &gs,const Address &learnedFrom,const Address &member); + void _add(uint64_t now,uint64_t nwid,MulticastGroupStatus &gs,const Address &learnedFrom,const Address &member); + const RuntimeEnvironment *RR; std::map< std::pair<uint64_t,MulticastGroup>,MulticastGroupStatus > _groups; Mutex _groups_m; }; diff --git a/node/Node.cpp b/node/Node.cpp index 05a490b7..6682ad25 100644 --- a/node/Node.cpp +++ b/node/Node.cpp @@ -382,7 +382,7 @@ Node::ReasonForTermination Node::run() RR->http = new HttpClient(); RR->antiRec = new AntiRecursion(); - RR->mc = new Multicaster(); + RR->mc = new Multicaster(RR); RR->sw = new Switch(RR); RR->sm = new SocketManager(impl->udpPort,impl->tcpPort,&_CBztTraffic,RR); RR->topology = new Topology(RR,Utils::fileExists((RR->homePath + ZT_PATH_SEPARATOR_S + "iddb.d").c_str())); @@ -605,8 +605,8 @@ Node::ReasonForTermination Node::run() // Do periodic tasks in submodules. if ((now - lastClean) >= ZT_DB_CLEAN_PERIOD) { lastClean = now; - RR->topology->clean(); - RR->mc->clean(RR,now); + RR->topology->clean(now); + RR->mc->clean(now); RR->nc->clean(); if (RR->updater) RR->updater->checkIfMaxIntervalExceeded(now); diff --git a/node/OutboundMulticast.cpp b/node/OutboundMulticast.cpp index cd11a523..7b2e4386 100644 --- a/node/OutboundMulticast.cpp +++ b/node/OutboundMulticast.cpp @@ -32,12 +32,24 @@ namespace ZeroTier { -void OutboundMulticast::init(uint64_t timestamp,const Address &self,uint64_t nwid,const CertificateOfMembership *com,unsigned int gatherLimit,const MAC &src,const MulticastGroup &dest,unsigned int etherType,const void *payload,unsigned int len) +void OutboundMulticast::init( + uint64_t timestamp, + const Address &self, + uint64_t nwid, + const CertificateOfMembership *com, + unsigned int limit, + unsigned int gatherLimit, + const MAC &src, + const MulticastGroup &dest, + unsigned int etherType, + const void *payload, + unsigned int len) { _timestamp = timestamp; _nwid = nwid; _source = src; _destination = dest; + _limit = limit; _etherType = etherType; _packet.setSource(self); @@ -46,7 +58,7 @@ void OutboundMulticast::init(uint64_t timestamp,const Address &self,uint64_t nwi self.appendTo(_packet); _packet.append((uint64_t)nwid); _packet.append((uint8_t)((com) ? 0x01 : 0x00)); - _packet.append((uint32_t)gatherLimit); // gather limit -- set before send, start with 0 + _packet.append((uint32_t)gatherLimit); if (com) com->serialize(_packet); _packet.append((uint32_t)dest.adi()); dest.mac().appendTo(_packet); diff --git a/node/OutboundMulticast.hpp b/node/OutboundMulticast.hpp index 8d717fc1..548171ab 100644 --- a/node/OutboundMulticast.hpp +++ b/node/OutboundMulticast.hpp @@ -66,6 +66,7 @@ public: * @param self My ZeroTier address * @param nwid Network ID * @param com Certificate of membership to attach or NULL to omit + * @param limit Multicast limit for desired number of packets to send * @param gatherLimit Number to lazily/implicitly gather with this frame or 0 for none * @param src Source MAC address of frame * @param dest Destination multicast group (MAC + ADI) @@ -79,6 +80,7 @@ public: const Address &self, uint64_t nwid, const CertificateOfMembership *com, + unsigned int limit, unsigned int gatherLimit, const MAC &src, const MulticastGroup &dest, @@ -98,9 +100,9 @@ public: inline bool expired(uint64_t now) const throw() { return ((now - _timestamp) >= ZT_MULTICAST_TRANSMIT_TIMEOUT); } /** - * @return Number of unique recipients to which this packet has already been sent + * @return True if this outbound multicast has been sent to enough peers */ - inline unsigned int sentToCount() const throw() { return (unsigned int)_alreadySentTo.size(); } + inline bool atLimit() const throw() { return (_alreadySentTo.size() > _limit); } /** * Just send without checking log @@ -144,6 +146,7 @@ private: uint64_t _nwid; MAC _source; MulticastGroup _destination; + unsigned int _limit; unsigned int _etherType; Packet _packet; // packet contains basic structure of MULTICAST_FRAME and payload, is re-used with new IV and addressing each time std::vector<Address> _alreadySentTo; diff --git a/node/Switch.cpp b/node/Switch.cpp index 50e31fdc..c254c762 100644 --- a/node/Switch.cpp +++ b/node/Switch.cpp @@ -151,9 +151,9 @@ void Switch::onLocalEthernet(const SharedPtr<Network> &network,const MAC &from,c TRACE("%s: MULTICAST %s -> %s %s %d",network->tapDeviceName().c_str(),from.toString().c_str(),mg.toString().c_str(),etherTypeName(etherType),(int)data.size()); RR->mc->send( - RR, ((!nconf->isPublic())&&(nconf->com())) ? &(nconf->com()) : (const CertificateOfMembership *)0, - network->wantMulticastGroup(mg) ? nconf->multicastLimit() : 0, + nconf->multicastLimit(), + network->wantMulticastGroup(mg) ? ZT_MULTICAST_DEFAULT_IMPLICIT_GATHER : 0, now, network->id(), mg, diff --git a/node/Topology.cpp b/node/Topology.cpp index 8cfe571d..86c1befb 100644 --- a/node/Topology.cpp +++ b/node/Topology.cpp @@ -49,7 +49,7 @@ Topology::Topology(const RuntimeEnvironment *renv,bool enablePermanentIdCaching) Topology::~Topology() { - clean(); + clean(Utils::now()); _dumpPeers(); } @@ -256,9 +256,8 @@ keep_searching_for_supernodes: return bestSupernode; } -void Topology::clean() +void Topology::clean(uint64_t now) { - uint64_t now = Utils::now(); Mutex::Lock _l(_activePeers_m); Mutex::Lock _l2(_supernodes_m); for(std::map< Address,SharedPtr<Peer> >::iterator p(_activePeers.begin());p!=_activePeers.end();) { diff --git a/node/Topology.hpp b/node/Topology.hpp index 45b0a693..8ab10074 100644 --- a/node/Topology.hpp +++ b/node/Topology.hpp @@ -186,7 +186,7 @@ public: /** * Clean and flush database */ - void clean(); + void clean(uint64_t now); /** * Apply a function or function object to all peers |