diff options
-rw-r--r-- | node/Multicaster.cpp | 206 |
1 files changed, 103 insertions, 103 deletions
diff --git a/node/Multicaster.cpp b/node/Multicaster.cpp index 6c098cc1..2785fcc4 100644 --- a/node/Multicaster.cpp +++ b/node/Multicaster.cpp @@ -178,127 +178,127 @@ void Multicaster::send( indexes[j] = indexes[i]; indexes[i] = tmp; } + } - if (gs.members.size() >= limit) { - // If we already have enough members, just send and we're done. We can - // skip the TX queue and skip the overhead of maintaining a send log by - // using sendOnly(). - OutboundMulticast out; - - out.init( - RR, - now, - nwid, - com, - limit, - 2, // we'll still gather a little from peers to keep multicast list fresh - src, - mg, - etherType, - data, - len); - - unsigned int count = 0; - - for(std::vector<Address>::const_iterator ast(alwaysSendTo.begin());ast!=alwaysSendTo.end();++ast) { - { // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version) - SharedPtr<Peer> p(RR->topology->getPeer(*ast)); - if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1)) - continue; - } + if (gs.members.size() >= limit) { + // If we already have enough members, just send and we're done. We can + // skip the TX queue and skip the overhead of maintaining a send log by + // using sendOnly(). + OutboundMulticast out; + + out.init( + RR, + now, + nwid, + com, + limit, + 2, // we'll still gather a little from peers to keep multicast list fresh + src, + mg, + etherType, + data, + len); + + unsigned int count = 0; - out.sendOnly(RR,*ast); - if (++count >= limit) - break; + for(std::vector<Address>::const_iterator ast(alwaysSendTo.begin());ast!=alwaysSendTo.end();++ast) { + { // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version) + SharedPtr<Peer> p(RR->topology->getPeer(*ast)); + if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1)) + continue; } - unsigned long idx = 0; - while (count < limit) { - const MulticastGroupMember &m = gs.members[indexes[idx++]]; + out.sendOnly(RR,*ast); + if (++count >= limit) + break; + } - { // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version) - SharedPtr<Peer> p(RR->topology->getPeer(m.address)); - if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1)) - continue; - } + unsigned long idx = 0; + while (count < limit) { + const MulticastGroupMember &m = gs.members[indexes[idx++]]; - if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),m.address) == alwaysSendTo.end()) { - out.sendOnly(RR,m.address); - ++count; - } + { // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version) + SharedPtr<Peer> p(RR->topology->getPeer(m.address)); + if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1)) + continue; } - } else { - unsigned int gatherLimit = (limit - (unsigned int)gs.members.size()) + 1; - - if ((now - gs.lastExplicitGather) >= ZT_MULTICAST_EXPLICIT_GATHER_DELAY) { - gs.lastExplicitGather = now; - SharedPtr<Peer> sn(RR->topology->getBestSupernode()); - if (sn) { - TRACE(">>MC GATHER up to %u in %.16llx/%s",gatherLimit,nwid,mg.toString().c_str()); - - Packet outp(sn->address(),RR->identity.address(),Packet::VERB_MULTICAST_GATHER); - outp.append(nwid); - outp.append((uint8_t)0); - mg.mac().appendTo(outp); - outp.append((uint32_t)mg.adi()); - outp.append((uint32_t)gatherLimit); // +1 just means we'll have an extra in the queue if available - outp.armor(sn->key(),true); - sn->send(RR,outp.data(),outp.size(),now); - } - gatherLimit = 0; // don't need to gather from peers this time since we consulted the core + + if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),m.address) == alwaysSendTo.end()) { + out.sendOnly(RR,m.address); + ++count; + } + } + } else { + unsigned int gatherLimit = (limit - (unsigned int)gs.members.size()) + 1; + + if ((now - gs.lastExplicitGather) >= ZT_MULTICAST_EXPLICIT_GATHER_DELAY) { + gs.lastExplicitGather = now; + SharedPtr<Peer> sn(RR->topology->getBestSupernode()); + if (sn) { + TRACE(">>MC GATHER up to %u in %.16llx/%s",gatherLimit,nwid,mg.toString().c_str()); + + Packet outp(sn->address(),RR->identity.address(),Packet::VERB_MULTICAST_GATHER); + outp.append(nwid); + outp.append((uint8_t)0); + mg.mac().appendTo(outp); + outp.append((uint32_t)mg.adi()); + outp.append((uint32_t)gatherLimit); // +1 just means we'll have an extra in the queue if available + outp.armor(sn->key(),true); + sn->send(RR,outp.data(),outp.size(),now); } + gatherLimit = 0; // don't need to gather from peers this time since we consulted the core + } - gs.txQueue.push_back(OutboundMulticast()); - OutboundMulticast &out = gs.txQueue.back(); - - out.init( - RR, - now, - nwid, - com, - limit, - gatherLimit, - src, - mg, - etherType, - data, - len); - - unsigned int count = 0; - - for(std::vector<Address>::const_iterator ast(alwaysSendTo.begin());ast!=alwaysSendTo.end();++ast) { - { // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version) - SharedPtr<Peer> p(RR->topology->getPeer(*ast)); - if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1)) - continue; - } + gs.txQueue.push_back(OutboundMulticast()); + OutboundMulticast &out = gs.txQueue.back(); + + out.init( + RR, + now, + nwid, + com, + limit, + gatherLimit, + src, + mg, + etherType, + data, + len); + + unsigned int count = 0; - out.sendAndLog(RR,*ast); - if (++count >= limit) - break; + for(std::vector<Address>::const_iterator ast(alwaysSendTo.begin());ast!=alwaysSendTo.end();++ast) { + { // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version) + SharedPtr<Peer> p(RR->topology->getPeer(*ast)); + if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1)) + continue; } - unsigned long idx = 0; - while ((count < limit)&&(idx < gs.members.size())) { - const MulticastGroupMember &m = gs.members[indexes[idx++]]; + out.sendAndLog(RR,*ast); + if (++count >= limit) + break; + } - { // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version) - SharedPtr<Peer> p(RR->topology->getPeer(m.address)); - if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1)) - continue; - } + unsigned long idx = 0; + while ((count < limit)&&(idx < gs.members.size())) { + const MulticastGroupMember &m = gs.members[indexes[idx++]]; - if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),m.address) == alwaysSendTo.end()) { - out.sendAndLog(RR,m.address); - ++count; - } + { // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version) + SharedPtr<Peer> p(RR->topology->getPeer(m.address)); + if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1)) + continue; } - } - if (indexes != idxbuf) - delete [] indexes; + if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),m.address) == alwaysSendTo.end()) { + out.sendAndLog(RR,m.address); + ++count; + } + } } + if (indexes != idxbuf) + delete [] indexes; + // DEPRECATED / LEGACY / TODO: // Currently we also always send a legacy P5_MULTICAST_FRAME packet to our // supernode. Our supernode then takes care of relaying it down to <1.0.0 |