From f03fd5799738699f2410c3961fbd5a08c8b1b568 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Fri, 26 Jan 2018 20:38:44 -0500 Subject: Clean up some multicast code. --- node/Multicaster.cpp | 82 ++++++++++++++++++++++++++++++++-------------------- 1 file changed, 51 insertions(+), 31 deletions(-) (limited to 'node/Multicaster.cpp') diff --git a/node/Multicaster.cpp b/node/Multicaster.cpp index d5bdfdf6..e0fa0255 100644 --- a/node/Multicaster.cpp +++ b/node/Multicaster.cpp @@ -28,7 +28,6 @@ #include "Constants.hpp" #include "RuntimeEnvironment.hpp" -#include "SharedPtr.hpp" #include "Multicaster.hpp" #include "Topology.hpp" #include "Switch.hpp" @@ -37,6 +36,7 @@ #include "C25519.hpp" #include "CertificateOfMembership.hpp" #include "Node.hpp" +#include "Network.hpp" namespace ZeroTier { @@ -159,11 +159,8 @@ std::vector
Multicaster::getMembers(uint64_t nwid,const MulticastGroup void Multicaster::send( void *tPtr, - unsigned int limit, int64_t now, - uint64_t nwid, - bool disableCompression, - const std::vector
&alwaysSendTo, + const SharedPtr &network, const MulticastGroup &mg, const MAC &src, unsigned int etherType, @@ -175,7 +172,7 @@ void Multicaster::send( try { Mutex::Lock _l(_groups_m); - MulticastGroupStatus &gs = _groups[Multicaster::Key(nwid,mg)]; + MulticastGroupStatus &gs = _groups[Multicaster::Key(network->id(),mg)]; if (!gs.members.empty()) { // Allocate a memory buffer if group is monstrous @@ -193,6 +190,10 @@ void Multicaster::send( } } + Address activeBridges[ZT_MAX_NETWORK_SPECIALISTS]; + const unsigned int activeBridgeCount = network->config().activeBridges(activeBridges); + const unsigned int limit = network->config().multicastLimit; + if (gs.members.size() >= limit) { // Skip queue if we already have enough members to complete the send operation OutboundMulticast out; @@ -200,8 +201,8 @@ void Multicaster::send( out.init( RR, now, - nwid, - disableCompression, + network->id(), + network->config().disableCompression(), limit, 1, // we'll still gather a little from peers to keep multicast list fresh src, @@ -212,9 +213,9 @@ void Multicaster::send( unsigned int count = 0; - for(std::vector
::const_iterator ast(alwaysSendTo.begin());ast!=alwaysSendTo.end();++ast) { - if (*ast != RR->identity.address()) { - out.sendOnly(RR,tPtr,*ast); // optimization: don't use dedup log if it's a one-pass send + for(unsigned int i=0;iidentity.address()) { + out.sendOnly(RR,tPtr,activeBridges[i]); // optimization: don't use dedup log if it's a one-pass send if (++count >= limit) break; } @@ -222,40 +223,59 @@ void Multicaster::send( unsigned long idx = 0; while ((count < limit)&&(idx < gs.members.size())) { - Address ma(gs.members[indexes[idx++]].address); - if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),ma) == alwaysSendTo.end()) { + const Address ma(gs.members[indexes[idx++]].address); + if (std::find(activeBridges,activeBridges + activeBridgeCount,ma) == (activeBridges + activeBridgeCount)) { out.sendOnly(RR,tPtr,ma); // optimization: don't use dedup log if it's a one-pass send ++count; } } } else { - unsigned int gatherLimit = (limit - (unsigned int)gs.members.size()) + 1; + const unsigned int gatherLimit = (limit - (unsigned int)gs.members.size()) + 1; if ((gs.members.empty())||((now - gs.lastExplicitGather) >= ZT_MULTICAST_EXPLICIT_GATHER_DELAY)) { gs.lastExplicitGather = now; Address explicitGatherPeers[16]; unsigned int numExplicitGatherPeers = 0; + SharedPtr bestRoot(RR->topology->getUpstreamPeer()); if (bestRoot) explicitGatherPeers[numExplicitGatherPeers++] = bestRoot->address(); - explicitGatherPeers[numExplicitGatherPeers++] = Network::controllerFor(nwid); - SharedPtr network(RR->node->network(nwid)); - if (network) { - std::vector
anchors(network->config().anchors()); - for(std::vector
::const_iterator a(anchors.begin());a!=anchors.end();++a) { - if (*a != RR->identity.address()) { - explicitGatherPeers[numExplicitGatherPeers++] = *a; - if (numExplicitGatherPeers == 16) - break; - } + + explicitGatherPeers[numExplicitGatherPeers++] = network->controller(); + + Address ac[ZT_MAX_NETWORK_SPECIALISTS]; + const unsigned int accnt = network->config().alwaysContactAddresses(ac); + unsigned int shuffled[ZT_MAX_NETWORK_SPECIALISTS]; + for(unsigned int i=0;i>1;inode->prng(); + const unsigned int x1 = shuffled[(unsigned int)x % accnt]; + const unsigned int x2 = shuffled[(unsigned int)(x >> 32) % accnt]; + const unsigned int tmp = shuffled[x1]; + shuffled[x1] = shuffled[x2]; + shuffled[x2] = tmp; + } + for(unsigned int i=0;i anchors(network->config().anchors()); + for(std::vector
::const_iterator a(anchors.begin());a!=anchors.end();++a) { + if (*a != RR->identity.address()) { + explicitGatherPeers[numExplicitGatherPeers++] = *a; + if (numExplicitGatherPeers == 16) + break; } } for(unsigned int k=0;kconfig().com) ? &(network->config().com) : (const CertificateOfMembership *)0) : (const CertificateOfMembership *)0; Packet outp(explicitGatherPeers[k],RR->identity.address(),Packet::VERB_MULTICAST_GATHER); - outp.append(nwid); + outp.append(network->id()); outp.append((uint8_t)((com) ? 0x01 : 0x00)); mg.mac().appendTo(outp); outp.append((uint32_t)mg.adi()); @@ -273,8 +293,8 @@ void Multicaster::send( out.init( RR, now, - nwid, - disableCompression, + network->id(), + network->config().disableCompression(), limit, gatherLimit, src, @@ -285,9 +305,9 @@ void Multicaster::send( unsigned int count = 0; - for(std::vector
::const_iterator ast(alwaysSendTo.begin());ast!=alwaysSendTo.end();++ast) { - if (*ast != RR->identity.address()) { - out.sendAndLog(RR,tPtr,*ast); + for(unsigned int i=0;iidentity.address()) { + out.sendAndLog(RR,tPtr,activeBridges[i]); if (++count >= limit) break; } @@ -296,7 +316,7 @@ void Multicaster::send( unsigned long idx = 0; while ((count < limit)&&(idx < gs.members.size())) { Address ma(gs.members[indexes[idx++]].address); - if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),ma) == alwaysSendTo.end()) { + if (std::find(activeBridges,activeBridges + activeBridgeCount,ma) == (activeBridges + activeBridgeCount)) { out.sendAndLog(RR,tPtr,ma); ++count; } -- cgit v1.2.3