inline unsigned int pickNextPropagationPeers(
CMWC4096 &prng,
T &topology,
uint64_t nwid,
const MulticastGroup &mg,
const Address &originalSubmitter,
const Address &upstream,
MulticastBloomFilter &bf,
unsigned int max,
P *peers,
uint64_t now)
{
typename std::set< P,_PeerPropagationPrioritySortOrder > toConsider;
// Pick up to ZT_MULTICAST_PICK_MAX_SAMPLE_SIZE peers that have
// subscribed to this channel and that are not in bloom filter.
// Pick randomly from subscribers, but place into a set that is
// sorted in descending order of time of most recent unicast
// frame transfer. (Implicit social ordering.) Also ignore original
// submitter and upstream, since we know these have seen this
// message.
{
Mutex::Lock _l(_multicastMemberships_m);
std::map< MulticastChannel,std::vector >::iterator mm(_multicastMemberships.find(MulticastChannel(nwid,mg)));
if ((mm != _multicastMemberships.end())&&(!mm->second.empty())) {
for(unsigned int stries=0;striessecond[prng.next32() % mm->second.size()];
if (((now - m.second) < ZT_MULTICAST_LIKE_EXPIRE)&&(!bf.contains(m.first.sum()))&&(m.first != originalSubmitter)&&(m.first != upstream)) {
P peer(topology.getPeer(m.first));
if (peer)
toConsider.insert(peer);
}
}
}
}
// The first peers in toConsider will be the 'best'
unsigned int chosen = 0;
for(typename std::set< P,_PeerPropagationPrioritySortOrder >::iterator i(toConsider.begin());((i!=toConsider.end())&&(chosen < max));++i)
bf.set((peers[chosen++] = *i)->address().sum());
// Add a supernode if there are fewer than the desired
// number of recipients. Note that we do not use the bloom
// filter to track visits to supernodes, intentionally
// allowing multicasts to ping pong between supernodes.
// Supernodes propagate even messages they've already seen,
// while regular nodes do not. Thus this ping-ponging will
// cause the supernodes to pick new starting points for
// peer to peer graph traversal multiple times. It's a
// simple, stateless way to increase supernode-driven
// propagation of a multicast in the event that peer to
// peer connectivity for its group is sparse.
if (chosen < max) {
Address avoid[2];
avoid[0] = originalSubmitter;
avoid[1] = upstream;
P peer = topology.getBestSupernode(avoid,2,true);
if (peer)
peers[chosen++] = peer;
}
return chosen;
}
private:
// Sort order for chosen propagation peers
template
struct _PeerPropagationPrioritySortOrder
{
inline bool operator()(const P &p1,const P &p2) const
{
return (p1->lastUnicastFrame() > p2->lastUnicastFrame());
}
};
static inline void _hashMulticastPacketForSig(uint64_t nwid,const MAC &from,const MulticastGroup &to,unsigned int etherType,const void *data,unsigned int len,unsigned char *digest)
throw()
{
unsigned char zero = 0;
SHA256_CTX sha;
SHA256_Init(&sha);
uint64_t _nwid = Utils::hton(nwid);
SHA256_Update(&sha,(unsigned char *)&_nwid,sizeof(_nwid));
SHA256_Update(&sha,&zero,1);
SHA256_Update(&sha,(unsigned char *)from.data,6);
SHA256_Update(&sha,&zero,1);
SHA256_Update(&sha,(unsigned char *)to.mac().data,6);
SHA256_Update(&sha,&zero,1);
uint32_t _adi = Utils::hton(to.adi());
SHA256_Update(&sha,(unsigned char *)&_adi,sizeof(_adi));
SHA256_Update(&sha,&zero,1);
uint16_t _etype = Utils::hton((uint16_t)etherType);
SHA256_Update(&sha,(unsigned char *)&_etype,sizeof(_etype));
SHA256_Update(&sha,&zero,1);
SHA256_Update(&sha,(unsigned char *)data,len);
SHA256_Final(digest,&sha);
}
// ring buffer: [0] - CRC, [1] - timestamp
uint64_t _multicastHistory[ZT_MULTICAST_DEDUP_HISTORY_LENGTH][2];
volatile unsigned int _multicastHistoryPtr;
// A multicast channel, essentially a pub/sub channel. It consists of a
// network ID and a multicast group within that network.
typedef std::pair MulticastChannel;
// Address and time of last LIKE
typedef std::pair MulticastMembership;
// Network : MulticastGroup -> vector
std::map< MulticastChannel,std::vector > _multicastMemberships;
Mutex _multicastMemberships_m;
};
} // namespace ZeroTier
#endif