diff options
-rw-r--r-- | node/BloomFilter.hpp | 44 | ||||
-rw-r--r-- | node/Buffer.hpp | 16 | ||||
-rw-r--r-- | node/Constants.hpp | 5 | ||||
-rw-r--r-- | node/Multicaster.hpp | 274 | ||||
-rw-r--r-- | node/Network.hpp | 14 | ||||
-rw-r--r-- | node/Packet.hpp | 4 | ||||
-rw-r--r-- | node/Switch.cpp | 50 | ||||
-rw-r--r-- | node/Switch.hpp | 35 | ||||
-rw-r--r-- | node/Topology.cpp | 106 | ||||
-rw-r--r-- | node/Topology.hpp | 28 |
10 files changed, 366 insertions, 210 deletions
diff --git a/node/BloomFilter.hpp b/node/BloomFilter.hpp index 16bd8ee0..182b98ab 100644 --- a/node/BloomFilter.hpp +++ b/node/BloomFilter.hpp @@ -46,6 +46,9 @@ template<unsigned int B> class BloomFilter { public: + /** + * Construct an empty filter + */ BloomFilter() throw() { @@ -53,14 +56,27 @@ public: } /** + * Construct from a raw filter + * + * @param b Raw filter bits, must be exactly bytes() in length, or NULL to construct empty + */ + BloomFilter(const void *b) + throw() + { + if (b) + memcpy(_field,b,sizeof(_field)); + else memset(_field,0,sizeof(_field)); + } + + /** * @return Size of filter in bits */ - inline unsigned int bits() const throw() { return B; } + static inline unsigned int bits() throw() { return B; } /** * @return Size of filter in bytes */ - inline unsigned int bytes() const throw() { return (B / 8); } + static inline unsigned int bytes() throw() { return (B / 8); } /** * @return Pointer to portable array of bytes of bytes() length representing filter @@ -78,12 +94,17 @@ public: /** * @param n Value to set + * @return True if corresponding bit was already set before this operation */ - inline void add(unsigned int n) + inline bool set(unsigned int n) throw() { n %= B; - _field[n / 8] |= (1 << (n % 8)); + unsigned char *const x = _field + (n / 8); + const unsigned char m = (1 << (n % 8)); + bool already = ((*x & m)); + *x |= m; + return already; } /** @@ -98,20 +119,13 @@ public: } /** - * Clear one or more random bits - * - * This is used to apply probabilistic decay and hence "fuzziness" to - * a bloom filter over time. - * - * @param bits Number of bits to clear + * Clear a random bit in this bloom filter */ - inline void decay(unsigned int bits) + inline void decay() throw() { - for(unsigned int i=0;i<bits;++i) { - unsigned int rn = Utils::randomInt<unsigned int>(); - _field[(rn >> 7) % (B / 8)] &= ~((unsigned char)(1 << (rn & 7))); - } + const unsigned int rn = Utils::randomInt<unsigned int>(); + _field[(rn >> 3) % (B / 8)] &= ~((unsigned char)(1 << (rn & 7))); } private: diff --git a/node/Buffer.hpp b/node/Buffer.hpp index 0320ddb7..ed463bf6 100644 --- a/node/Buffer.hpp +++ b/node/Buffer.hpp @@ -241,6 +241,22 @@ public: } /** + * Append a run of bytes + * + * @param c Character value to append + * @param n Number of times to append + * @throws std::out_of_range Attempt to append beyond capacity + */ + inline void append(unsigned char c,unsigned int n) + throw(std::out_of_range) + { + if ((_l + n) > C) + throw std::out_of_range("Buffer: append beyond capacity"); + for(unsigned int i=0;i<n;++i) + _b[_l++] = (char)c; + } + + /** * Append a C-array of bytes * * @param b Data diff --git a/node/Constants.hpp b/node/Constants.hpp index 4517cc07..61044fed 100644 --- a/node/Constants.hpp +++ b/node/Constants.hpp @@ -243,6 +243,11 @@ error_no_ZT_ARCH_defined; #define ZT_MULTICAST_DEDUP_HISTORY_EXPIRE 8000 /** + * Number of bits to randomly "decay" in bloom filter per hop + */ +#define ZT_MULTICAST_BLOOM_FILTER_DECAY_RATE 2 + +/** * Period between announcements of all multicast 'likes' in ms * * Announcement occurs when a multicast group is locally joined, but all diff --git a/node/Multicaster.hpp b/node/Multicaster.hpp new file mode 100644 index 00000000..190c3033 --- /dev/null +++ b/node/Multicaster.hpp @@ -0,0 +1,274 @@ +/* + * ZeroTier One - Global Peer to Peer Ethernet + * Copyright (C) 2012-2013 ZeroTier Networks LLC + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * -- + * + * ZeroTier may be used and distributed under the terms of the GPLv3, which + * are available at: http://www.gnu.org/licenses/gpl-3.0.html + * + * If you would like to embed ZeroTier into a commercial application or + * redistribute it in a modified binary form, please contact ZeroTier Networks + * LLC. Start here: http://www.zerotier.com/ + */ + +#ifndef _ZT_MULTICASTER_HPP +#define _ZT_MULTICASTER_HPP + +#include <stdint.h> +#include <string.h> + +#include <utility> +#include <algorithm> +#include <map> +#include <set> +#include <vector> + +#include "Constants.hpp" +#include "Buffer.hpp" +#include "Packet.hpp" +#include "MulticastGroup.hpp" +#include "Utils.hpp" +#include "MAC.hpp" +#include "Address.hpp" +#include "SharedPtr.hpp" +#include "BloomFilter.hpp" + +// Maximum sample size to pick during choice of multicast propagation peers +#define ZT_MULTICAST_PICK_MAX_SAMPLE_SIZE 64 + +namespace ZeroTier { + +/** + * Multicast propagation engine + * + * This is written as a generic class so that it can be mocked and tested + * in simulation. It also always takes 'now' as an argument, permitting + * running in simulated time. + */ +class Multicaster +{ +public: + /** + * 256-bit simple bloom filter included with multicast frame packets + */ + typedef BloomFilter<ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BITS> MulticastBloomFilter; + + Multicaster() + { + memset(_multicastHistory,0,sizeof(_multicastHistory)); + } + + /** + * Update the most recent LIKE time for an address in a given multicast group on a given network + * + * @param nwid Network ID + * @param mg Multicast group + * @param addr Address that likes group on given network + * @param now Current timestamp + */ + inline void likesMulticastGroup(const uint64_t nwid,const MulticastGroup &mg,const Address &addr,const uint64_t now) + { + _multicastMemberships[MulticastChannel(nwid,mg)][addr] = now; + } + + /** + * Check multicast history to see if this is a duplicate, and add/update entry + * + * @param from Ultimate sending MAC address + * @param to Destination multicast group + * @param payload Multicast packet payload + * @param len Length of packet + * @param nwid Network ID + * @param now Current time + * @return True if this appears to be a duplicate to within history expiration time + */ + inline bool checkAndUpdateMulticastHistory( + const MAC &from, + const MulticastGroup &to, + const void *payload, + unsigned int len, + const uint64_t nwid, + const uint64_t now) + throw() + { + // Note: CRCs aren't transmitted over the network, so portability and + // byte order don't matter. This calculation can be changed. We just + // want a unique code. + uint64_t crc = Utils::crc64(0,from.data,6); + crc = Utils::crc64(crc,to.mac().data,6); + crc ^= (uint64_t)to.adi(); + crc = Utils::crc64(crc,payload,len); + crc ^= nwid; // also include network ID in CRC + + // Replace existing entry or pick one to replace with new entry + uint64_t earliest = 0xffffffffffffffffULL; + unsigned long earliestIdx = 0; + for(unsigned int i=0;i<ZT_MULTICAST_DEDUP_HISTORY_LENGTH;++i) { + if (_multicastHistory[i][0] == crc) { + uint64_t then = _multicastHistory[i][1]; + _multicastHistory[i][1] = now; + return ((now - then) < ZT_MULTICAST_DEDUP_HISTORY_EXPIRE); + } else if (_multicastHistory[i][1] < earliest) { + earliest = _multicastHistory[i][1]; + earliestIdx = i; + } + } + + _multicastHistory[earliestIdx][0] = crc; // replace oldest entry + _multicastHistory[earliestIdx][1] = now; + + return false; + } + + /** + * Choose peers to send a propagating multicast to + * + * @param topology Topology object or mock thereof + * @param nwid Network ID + * @param mg Multicast group + * @param upstream Address from which message originated, or null (0) address if none + * @param bf Bloom filter, updated in place with sums of addresses in chosen peers and/or decay + * @param max Maximum number of peers to pick + * @param peers Array of objects of type P to fill with up to [max] peers + * @param now Current timestamp + * @return Number of peers actually stored in peers array + * @tparam T Type of topology, which is Topology in running code or a mock in simulation + * @tparam P Type of peers, which is SharedPtr<Peer> in running code or a mock in simulation (mock must behave like a pointer type) + */ + template<typename T,typename P> + inline unsigned int pickNextPropagationPeers( + T &topology, + uint64_t nwid, + const MulticastGroup &mg, + const Address &upstream, + MulticastBloomFilter &bf, + unsigned int max, + P *peers, + uint64_t now) + { + P toConsider[ZT_MULTICAST_PICK_MAX_SAMPLE_SIZE]; + unsigned int sampleSize = 0; + + { + Mutex::Lock _l(_multicastMemberships_m); + + // Sample a random subset of peers that we know have LIKEd this multicast + // group on this network. + std::map< MulticastChannel,std::map<Address,uint64_t> >::iterator channelMembers(_multicastMemberships.find(MulticastChannel(nwid,mg))); + if ((channelMembers != _multicastMemberships.end())&&(!channelMembers->second.empty())) { + unsigned long numEntriesPermittedToSkip = (channelMembers->second.size() > ZT_MULTICAST_PICK_MAX_SAMPLE_SIZE) ? (unsigned long)(channelMembers->second.size() - ZT_MULTICAST_PICK_MAX_SAMPLE_SIZE) : (unsigned long)0; + double skipWhatFraction = (double)numEntriesPermittedToSkip / (double)channelMembers->second.size(); + + std::map<Address,uint64_t>::iterator channelMemberEntry(channelMembers->second.begin()); + + while (channelMemberEntry != channelMembers->second.end()) { + // Auto-clean the channel members map if their LIKEs are expired. This will + // technically skew the random distribution of chosen members just a little, but + // it's unlikely that enough will expire in any single pick to make much of a + // difference overall. + if ((now - channelMemberEntry->second) > ZT_MULTICAST_LIKE_EXPIRE) { + channelMembers->second.erase(channelMemberEntry++); + continue; + } + + // Skip some fraction of entries so that our sampling will be randomly distributed, + // since there is no other good way to sample randomly from a map. + if (numEntriesPermittedToSkip) { + double skipThis = (double)(Utils::randomInt<uint32_t>()) / 4294967296.0; + if (skipThis <= skipWhatFraction) { + --numEntriesPermittedToSkip; + ++channelMemberEntry; + continue; + } + } + + // If it's not expired and it's from our random sample, add it to the set of peers + // to consider. + P peer = topology.getPeer(channelMemberEntry->first); + if (peer) { + toConsider[sampleSize++] = peer; + if (sampleSize >= ZT_MULTICAST_PICK_MAX_SAMPLE_SIZE) + break; // abort if we have enough candidates + } + ++channelMemberEntry; + } + + // Auto-clean: erase whole map if there are no more LIKEs for this channel + if (channelMembers->second.empty()) + _multicastMemberships.erase(channelMembers); + } + } + + // Sort in descending order of most recent direct unicast frame, picking + // peers with whom we have recently communicated. This is "implicit social + // switching." + std::sort(&(toConsider[0]),&(toConsider[sampleSize]),PeerPropagationPrioritySortOrder<P>()); + + // Decay a few random bits in bloom filter to probabilistically eliminate + // false positives as we go. The odds of decaying an already-set bit + // increases as the bloom filter saturates, so in the early hops of + // propagation this likely won't have any effect. + for(unsigned int i=0;i<ZT_MULTICAST_BLOOM_FILTER_DECAY_RATE;++i) + bf.decay(); + + // Pick peers not in the bloom filter, setting bloom filter bits accordingly to + // remember and pass on these picks. + unsigned int picked = 0; + for(unsigned int i=0;((i<sampleSize)&&(picked < max));++i) { + if (!bf.set(toConsider[i]->address().sum())) + peers[picked++] = toConsider[i]; + } + + // Add a supernode if there's nowhere else to go. Supernodes know of all multicast + // LIKEs and so can act to bridge sparse multicast groups. We do not remember them + // in the bloom filter, since such bridging may very well need to happen more than + // once. + if (!picked) { + P peer = topology.getBestSupernode(); + if (peer) + peers[picked++] = peer; + } + + return picked; + } + +private: + // Sort order for chosen propagation peers + template<typename P> + struct PeerPropagationPrioritySortOrder + { + inline bool operator()(const P &p1,const P &p2) const + { + return (p1->lastUnicastFrame() >= p2->lastUnicastFrame()); + } + }; + + // [0] - CRC, [1] - timestamp + uint64_t _multicastHistory[ZT_MULTICAST_DEDUP_HISTORY_LENGTH][2]; + + // A multicast channel, essentially a pub/sub channel. It consists of a + // network ID and a multicast group within that network. + typedef std::pair<uint64_t,MulticastGroup> MulticastChannel; + + // Address and time of last LIKE, by network ID and multicast group + std::map< MulticastChannel,std::map<Address,uint64_t> > _multicastMemberships; + Mutex _multicastMemberships_m; +}; + +} // namespace ZeroTier + +#endif diff --git a/node/Network.hpp b/node/Network.hpp index 0ed3bef5..ddb8930f 100644 --- a/node/Network.hpp +++ b/node/Network.hpp @@ -42,6 +42,7 @@ #include "RuntimeEnvironment.hpp" #include "MulticastGroup.hpp" #include "NonCopyable.hpp" +#include "MAC.hpp" namespace ZeroTier { @@ -112,6 +113,19 @@ public: } /** + * Shortcut to check open(), whether MAC is ZeroTier, then isMember() + * + * @param mac MAC address to check + * @return True if MAC is allowed + */ + inline bool isAllowed(const MAC &mac) const + throw() + { + Mutex::Lock _l(_lock); + return ((_open)||((mac.isZeroTier())&&(_members.count(Address(mac)) > 0))); + } + + /** * @return True if network is open (no membership required) */ inline bool open() const diff --git a/node/Packet.hpp b/node/Packet.hpp index 914fac84..05b4ef41 100644 --- a/node/Packet.hpp +++ b/node/Packet.hpp @@ -122,7 +122,9 @@ */ #define ZT_PROTO_MIN_FRAGMENT_LENGTH ZT_PACKET_FRAGMENT_IDX_PAYLOAD +// Size of bloom filter used in multicast propagation #define ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE 32 +#define ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BITS 256 // Field incides for parsing verbs #define ZT_PROTO_VERB_HELLO_IDX_PROTOCOL_VERSION (ZT_PACKET_IDX_PAYLOAD) @@ -419,7 +421,7 @@ public: * <[4] multicast additional distinguishing information (ADI)> * <[32] multicast propagation bloom filter> * <[1] 8-bit strict propagation hop count> - * <[2] 16-bit average peer multicast bandwidth load> + * <[2] reserved, must be 0> * <[6] source Ethernet address> * <[2] 16-bit ethertype> * <[...] ethernet payload> diff --git a/node/Switch.cpp b/node/Switch.cpp index 0ed5a9d4..b1a55756 100644 --- a/node/Switch.cpp +++ b/node/Switch.cpp @@ -50,7 +50,6 @@ namespace ZeroTier { Switch::Switch(const RuntimeEnvironment *renv) : _r(renv) { - memset(_multicastHistory,0,sizeof(_multicastHistory)); } Switch::~Switch() @@ -259,14 +258,7 @@ void Switch::onLocalEthernet(const SharedPtr<Network> &network,const MAC &from,c mg = MulticastGroup::deriveMulticastGroupForAddressResolution(InetAddress(data.field(24,4),4,0)); } - // Remember this message's CRC, but don't drop if we've already seen it - // since it's our own. - _checkAndUpdateMulticastHistory(from,mg.mac(),data.data(),data.size(),network->id(),Utils::now()); - - // Start multicast propagation with empty bloom filter - unsigned char bloom[ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE]; - memset(bloom,0,sizeof(bloom)); - _propagateMulticast(network,_r->identity.address(),bloom,mg,0,0,from,etherType,data.data(),data.size()); + _propagateMulticast(network,_r->identity.address(),(const unsigned char *)0,mg,0,from,etherType,data.data(),data.size()); } else if (to.isZeroTier()) { // Simple unicast frame from us to another node Address toZT(to.data + 1); @@ -568,22 +560,23 @@ void Switch::_CBaddPeerFromWhois(void *arg,const SharedPtr<Peer> &p,Topology::Pe } } -void Switch::_propagateMulticast(const SharedPtr<Network> &network,const Address &upstream,unsigned char *bloom,const MulticastGroup &mg,unsigned int mcHops,unsigned int mcLoadFactor,const MAC &from,unsigned int etherType,const void *data,unsigned int len) +void Switch::_propagateMulticast(const SharedPtr<Network> &network,const Address &upstream,const unsigned char *bloom,const MulticastGroup &mg,unsigned int mcHops,const MAC &from,unsigned int etherType,const void *data,unsigned int len) { - SharedPtr<Peer> propPeers[ZT_MULTICAST_PROPAGATION_BREADTH]; - unsigned int np = _r->topology->pickMulticastPropagationPeers(network->id(),upstream,bloom,ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE * 8,ZT_MULTICAST_PROPAGATION_BREADTH,mg,propPeers); + if (mcHops > ZT_MULTICAST_PROPAGATION_DEPTH) + return; - for(unsigned int i=0;i<np;++i) - Utils::bloomAdd(bloom,ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE,propPeers[i]->address().sum()); + Multicaster::MulticastBloomFilter newBloom(bloom); // bloom will be NULL if starting fresh + SharedPtr<Peer> propPeers[ZT_MULTICAST_PROPAGATION_BREADTH]; + unsigned int np = _multicaster.pickNextPropagationPeers(*(_r->topology),network->id(),mg,upstream,newBloom,ZT_MULTICAST_PROPAGATION_BREADTH,propPeers,Utils::now()); for(unsigned int i=0;i<np;++i) { Packet outp(propPeers[i]->address(),_r->identity.address(),Packet::VERB_MULTICAST_FRAME); - outp.append(network->id()); + outp.append((uint64_t)network->id()); outp.append(mg.mac().data,6); outp.append((uint32_t)mg.adi()); - outp.append(bloom,ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE); + outp.append(newBloom.data(),ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE); outp.append((uint8_t)mcHops); - outp.append((uint16_t)mcLoadFactor); + outp.append((unsigned char)0,2); // reserved, 0 outp.append(from.data,6); outp.append((uint16_t)etherType); outp.append(data,len); @@ -761,21 +754,22 @@ Switch::PacketServiceAttemptResult Switch::_tryHandleRemotePacket(Demarc::Port l if (network->isAllowed(source)) { if (packet.size() > ZT_PROTO_VERB_MULTICAST_FRAME_IDX_PAYLOAD) { MulticastGroup mg(MAC(packet.field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_MULTICAST_MAC,6)),packet.at<uint32_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_ADI)); - unsigned char bloom[ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE]; - memcpy(bloom,packet.field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_BLOOM,ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE),ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE); unsigned int hops = packet[ZT_PROTO_VERB_MULTICAST_FRAME_IDX_HOPS]; - unsigned int loadFactor = packet.at<uint16_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_LOAD_FACTOR); MAC fromMac(packet.field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_FROM_MAC,6)); unsigned int etherType = packet.at<uint16_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_ETHERTYPE); - - if ((fromMac.isZeroTier())&&(network->isAllowed(Address(fromMac)))) { - if (_checkAndUpdateMulticastHistory(fromMac,mg.mac(),packet.data() + ZT_PROTO_VERB_MULTICAST_FRAME_IDX_PAYLOAD,packet.size() - ZT_PROTO_VERB_MULTICAST_FRAME_IDX_PAYLOAD,network->id(),now)) { - TRACE("dropped duplicate MULTICAST_FRAME from %s: %s -> %s (adi: %.8lx), %u bytes, net: %llu",source.toString().c_str(),fromMac.toString().c_str(),mg.mac().toString().c_str(),(unsigned long)mg.adi(),packet.size() - ZT_PROTO_VERB_MULTICAST_FRAME_IDX_PAYLOAD,network->id()); + unsigned int payloadLen = packet.size() - ZT_PROTO_VERB_MULTICAST_FRAME_IDX_PAYLOAD; + unsigned char *payload = packet.field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_PAYLOAD,payloadLen); + + if (fromMac == network->tap().mac()) { + TRACE("dropped boomerang MULTICAST_FRAME from %s",source.toString().c_str()); + } if (network->isAllowed(fromMac)) { + if (_multicaster.checkAndUpdateMulticastHistory(fromMac,mg,payload,payloadLen,network->id(),now)) { + // TODO: check if allowed etherType + network->tap().put(fromMac,mg.mac(),etherType,payload,payloadLen); } else { - //TRACE("MULTICAST_FRAME: %s -> %s (adi: %.8lx), %u bytes, net: %llu",fromMac.toString().c_str(),mg.mac().toString().c_str(),(unsigned long)mg.adi(),packet.size() - ZT_PROTO_VERB_MULTICAST_FRAME_IDX_PAYLOAD,network->id()); - network->tap().put(fromMac,mg.mac(),etherType,packet.data() + ZT_PROTO_VERB_MULTICAST_FRAME_IDX_PAYLOAD,packet.size() - ZT_PROTO_VERB_MULTICAST_FRAME_IDX_PAYLOAD); - _propagateMulticast(network,source,bloom,mg,hops+1,loadFactor,fromMac,etherType,packet.data() + ZT_PROTO_VERB_MULTICAST_FRAME_IDX_PAYLOAD,packet.size() - ZT_PROTO_VERB_MULTICAST_FRAME_IDX_PAYLOAD); + TRACE("duplicate MULTICAST_FRAME from %s: %s -> %s (adi: %.8lx), %u bytes, net: %llu",source.toString().c_str(),fromMac.toString().c_str(),mg.mac().toString().c_str(),(unsigned long)mg.adi(),packet.size() - ZT_PROTO_VERB_MULTICAST_FRAME_IDX_PAYLOAD,network->id()); } + _propagateMulticast(network,source,packet.field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_BLOOM,ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE),mg,hops+1,fromMac,etherType,payload,payloadLen); } else { TRACE("dropped MULTICAST_FRAME from %s: ultimate sender %s not a member of closed network %llu",source.toString().c_str(),fromMac.toString().c_str(),network->id()); } @@ -804,7 +798,7 @@ Switch::PacketServiceAttemptResult Switch::_tryHandleRemotePacket(Demarc::Port l MAC mac(packet.field(ptr,6)); ptr += 6; uint32_t adi = packet.at<uint32_t>(ptr); ptr += 4; TRACE("peer %s likes multicast group %s:%.8lx on network %llu",source.toString().c_str(),mac.toString().c_str(),(unsigned long)adi,nwid); - _r->topology->likesMulticastGroup(nwid,MulticastGroup(mac,adi),source,now); + _multicaster.likesMulticastGroup(nwid,MulticastGroup(mac,adi),source,now); ++numAccepted; } else { TRACE("ignored MULTICAST_LIKE from %s: not a member of closed network %llu",source.toString().c_str(),nwid); diff --git a/node/Switch.hpp b/node/Switch.hpp index d0fbf02b..a6c753fb 100644 --- a/node/Switch.hpp +++ b/node/Switch.hpp @@ -44,6 +44,7 @@ #include "Network.hpp" #include "SharedPtr.hpp" #include "Demarc.hpp" +#include "Multicaster.hpp" namespace ZeroTier { @@ -171,7 +172,7 @@ private: static void _CBaddPeerFromHello(void *arg,const SharedPtr<Peer> &p,Topology::PeerVerifyResult result); static void _CBaddPeerFromWhois(void *arg,const SharedPtr<Peer> &p,Topology::PeerVerifyResult result); // arg == this - void _propagateMulticast(const SharedPtr<Network> &network,const Address &upstream,unsigned char *bloom,const MulticastGroup &mg,unsigned int mcHops,unsigned int mcLoadFactor,const MAC &from,unsigned int etherType,const void *data,unsigned int len); + void _propagateMulticast(const SharedPtr<Network> &network,const Address &upstream,const unsigned char *bloom,const MulticastGroup &mg,unsigned int mcHops,const MAC &from,unsigned int etherType,const void *data,unsigned int len); PacketServiceAttemptResult _tryHandleRemotePacket(Demarc::Port localPort,const InetAddress &fromAddr,Packet &packet); void _doHELLO(Demarc::Port localPort,const InetAddress &fromAddr,Packet &packet); void _requestWhois(const Address &addr); @@ -179,39 +180,9 @@ private: PacketServiceAttemptResult _trySend(const Packet &packet,bool encrypt); void _retryPendingFor(const Address &addr); - // Updates entry for crc in multicast history, returns true if already - // present in history and not expired. - inline bool _checkAndUpdateMulticastHistory(const MAC &fromMac,const MAC &toMulticastMac,const void *payload,unsigned int len,const uint64_t nwid,const uint64_t now) - { - uint64_t crc = Utils::crc64(0,fromMac.data,6); - crc = Utils::crc64(crc,toMulticastMac.data,6); - crc = Utils::crc64(crc,payload,len); - crc += nwid; // also include network ID - - uint64_t earliest = 0xffffffffffffffffULL; - unsigned long earliestIdx = 0; - for(unsigned int i=0;i<ZT_MULTICAST_DEDUP_HISTORY_LENGTH;++i) { - if (_multicastHistory[i][0] == crc) { - uint64_t then = _multicastHistory[i][1]; - _multicastHistory[i][1] = now; - return ((now - then) < ZT_MULTICAST_DEDUP_HISTORY_EXPIRE); - } else if (_multicastHistory[i][1] < earliest) { - earliest = _multicastHistory[i][1]; - earliestIdx = i; - } - } - - _multicastHistory[earliestIdx][0] = crc; // replace oldest entry - _multicastHistory[earliestIdx][1] = now; - - return false; - } - const RuntimeEnvironment *const _r; - // Multicast packet CRC64's for packets we've received recently, to reject - // duplicates during propagation. [0] is CRC64, [1] is time. - uint64_t _multicastHistory[ZT_MULTICAST_DEDUP_HISTORY_LENGTH][2]; + Multicaster _multicaster; struct WhoisRequest { diff --git a/node/Topology.cpp b/node/Topology.cpp index 1b7973a6..5fd9939d 100644 --- a/node/Topology.cpp +++ b/node/Topology.cpp @@ -207,104 +207,6 @@ void Topology::clean() _peerDeepVerifyJobs_c.signal(); } -void Topology::likesMulticastGroup(uint64_t nwid,const MulticastGroup &mg,const Address &addr,uint64_t now) -{ - Mutex::Lock _l(_multicastGroupMembers_m); - _multicastGroupMembers[nwid][mg][addr] = now; -} - -struct _PickMulticastPropagationPeersPeerPrioritySortOrder -{ - inline bool operator()(const SharedPtr<Peer> &p1,const SharedPtr<Peer> &p2) const - { - return (p1->lastUnicastFrame() >= p2->lastUnicastFrame()); - } -}; -#define _MAX_PEERS_TO_CONSIDER 256 -unsigned int Topology::pickMulticastPropagationPeers(uint64_t nwid,const Address &exclude,const void *propagationBloom,unsigned int propagationBloomSize,unsigned int count,const MulticastGroup &mg,SharedPtr<Peer> *peers) -{ - SharedPtr<Peer> possiblePeers[_MAX_PEERS_TO_CONSIDER]; - unsigned int numPossiblePeers = 0; - - if (count > _MAX_PEERS_TO_CONSIDER) - count = _MAX_PEERS_TO_CONSIDER; - - Mutex::Lock _l1(_activePeers_m); - Mutex::Lock _l2(_supernodes_m); - - // Grab known non-supernode peers in multicast group, excluding 'exclude' - // Also lazily clean up the _multicastGroupMembers structure - { - Mutex::Lock _l3(_multicastGroupMembers_m); - std::map< uint64_t,std::map< MulticastGroup,std::map< Address,uint64_t > > >::iterator mgm(_multicastGroupMembers.find(nwid)); - if (mgm != _multicastGroupMembers.end()) { - std::map< MulticastGroup,std::map< Address,uint64_t > >::iterator g(mgm->second.find(mg)); - if (g != mgm->second.end()) { - uint64_t now = Utils::now(); - for(std::map< Address,uint64_t >::iterator m(g->second.begin());m!=g->second.end();) { - if (((now - m->second) < ZT_MULTICAST_LIKE_EXPIRE)&&(m->first != exclude)) { - std::map< Address,SharedPtr<Peer> >::const_iterator p(_activePeers.find(m->first)); - if (p != _activePeers.end()) { - possiblePeers[numPossiblePeers++] = p->second; - if (numPossiblePeers > _MAX_PEERS_TO_CONSIDER) - break; - } - ++m; - } else g->second.erase(m++); - } - if (!g->second.size()) - mgm->second.erase(g); - } - } - } - - // Sort non-supernode peers in descending order of most recent data - // exchange timestamp. This sorts by implicit social relationships -- who - // you are talking to are the people who get multicasts first. - std::sort(&(possiblePeers[0]),&(possiblePeers[numPossiblePeers]),_PickMulticastPropagationPeersPeerPrioritySortOrder()); - - // Tack on a supernode peer to the end if we don't have enough regular - // peers, using supernodes to bridge gaps in sparse multicast groups. - if (numPossiblePeers < count) { - SharedPtr<Peer> bestSupernode; - unsigned int bestSupernodeLatency = 0xffff; - for(std::vector< SharedPtr<Peer> >::const_iterator sn(_supernodePeers.begin());sn!=_supernodePeers.end();++sn) { - if (((*sn)->latency())&&((*sn)->latency() < bestSupernodeLatency)) { - bestSupernodeLatency = (*sn)->latency(); - bestSupernode = *sn; - } - } - if (bestSupernode) - possiblePeers[numPossiblePeers++] = bestSupernode; - } - - unsigned int num = 0; - - // First, try to pick peers not in the propgation bloom filter - for(unsigned int i=0;i<numPossiblePeers;++i) { - if (!Utils::bloomContains(propagationBloom,propagationBloomSize,possiblePeers[i]->address().sum())) { - peers[num++] = possiblePeers[i]; - if (num >= count) - return num; - } - } - - // Next, pick other peers until full (without duplicates) - for(unsigned int i=0;i<numPossiblePeers;++i) { - for(unsigned int j=0;j<num;++j) { - if (peers[j] == possiblePeers[i]) - goto check_next_peer; - } - peers[num++] = possiblePeers[i]; - if (num >= count) - return num; -check_next_peer: - continue; - } - - return num; -} - void Topology::main() throw() { @@ -404,14 +306,6 @@ void Topology::main() } } } - { - Mutex::Lock _l(_multicastGroupMembers_m); - for(std::map< uint64_t,std::map< MulticastGroup,std::map< Address,uint64_t > > >::iterator mgm(_multicastGroupMembers.begin());mgm!=_multicastGroupMembers.end();) { - if (_r->nc->hasNetwork(mgm->first)) - ++mgm; - else _multicastGroupMembers.erase(mgm++); - } - } break; case _PeerDeepVerifyJob::EXIT_THREAD: TRACE("thread terminating..."); diff --git a/node/Topology.hpp b/node/Topology.hpp index 7baebad0..19e41e08 100644 --- a/node/Topology.hpp +++ b/node/Topology.hpp @@ -167,30 +167,6 @@ public: void clean(); /** - * Pick peers for multicast propagation - * - * @param nwid Network ID - * @param exclude Peer to exclude or zero address for none - * @param propagationBloom Propgation bloom filter - * @param propagationBloomSize Size of propagation bloom filter in BITS - * @param count Number of peers desired (propagation breadth) - * @param mg Multicast group - * @param peers Array to receive peers (must be at least [count]) - * @return Number of peers actually picked - */ - unsigned int pickMulticastPropagationPeers(uint64_t nwid,const Address &exclude,const void *propagationBloom,unsigned int propagationBloomSize,unsigned int count,const MulticastGroup &mg,SharedPtr<Peer> *peers); - - /** - * Add or update last 'like' time for an address's membership in a multicast group - * - * @param nwid Network ID - * @param mg Multicast group - * @param addr ZeroTier address - * @param now Current time - */ - void likesMulticastGroup(uint64_t nwid,const MulticastGroup &mg,const Address &addr,uint64_t now); - - /** * Apply a function or function object to all peers * * @param f Function to apply @@ -359,10 +335,6 @@ private: KISSDB _dbm; Mutex _dbm_m; - - // Multicast group members by network ID, then multicast group - std::map< uint64_t,std::map< MulticastGroup,std::map< Address,uint64_t > > > _multicastGroupMembers; - Mutex _multicastGroupMembers_m; }; } // namespace ZeroTier |