diff options
Diffstat (limited to 'node/Multicaster.cpp')
-rw-r--r-- | node/Multicaster.cpp | 127 |
1 files changed, 75 insertions, 52 deletions
diff --git a/node/Multicaster.cpp b/node/Multicaster.cpp index e1d4567a..f8d58501 100644 --- a/node/Multicaster.cpp +++ b/node/Multicaster.cpp @@ -34,8 +34,8 @@ namespace ZeroTier { Multicaster::Multicaster(const RuntimeEnvironment *renv) : RR(renv), - _groups(1024), - _groups_m() + _groups(256), + _gatherAuth(256) { } @@ -152,10 +152,10 @@ std::vector<Address> Multicaster::getMembers(uint64_t nwid,const MulticastGroup } void Multicaster::send( - const CertificateOfMembership *com, unsigned int limit, uint64_t now, uint64_t nwid, + bool disableCompression, const std::vector<Address> &alwaysSendTo, const MulticastGroup &mg, const MAC &src, @@ -194,7 +194,7 @@ void Multicaster::send( RR, now, nwid, - com, + disableCompression, limit, 1, // we'll still gather a little from peers to keep multicast list fresh src, @@ -226,35 +226,38 @@ void Multicaster::send( if ((gs.members.empty())||((now - gs.lastExplicitGather) >= ZT_MULTICAST_EXPLICIT_GATHER_DELAY)) { gs.lastExplicitGather = now; - SharedPtr<Peer> explicitGatherPeers[2]; - explicitGatherPeers[0] = RR->topology->getBestRoot(); - const Address nwidc(Network::controllerFor(nwid)); - if (nwidc != RR->identity.address()) - explicitGatherPeers[1] = RR->topology->getPeer(nwidc); - for(unsigned int k=0;k<2;++k) { - const SharedPtr<Peer> &p = explicitGatherPeers[k]; - if (!p) - continue; - //TRACE(">>MC upstream GATHER up to %u for group %.16llx/%s",gatherLimit,nwid,mg.toString().c_str()); - - const CertificateOfMembership *com = (CertificateOfMembership *)0; - { - SharedPtr<Network> nw(RR->node->network(nwid)); - if ((nw)&&(nw->hasConfig())&&(nw->config().com)&&(nw->config().isPrivate())&&(p->needsOurNetworkMembershipCertificate(nwid,now,true))) - com = &(nw->config().com); + + Address explicitGatherPeers[16]; + unsigned int numExplicitGatherPeers = 0; + SharedPtr<Peer> bestRoot(RR->topology->getUpstreamPeer()); + if (bestRoot) + explicitGatherPeers[numExplicitGatherPeers++] = bestRoot->address(); + explicitGatherPeers[numExplicitGatherPeers++] = Network::controllerFor(nwid); + SharedPtr<Network> network(RR->node->network(nwid)); + if (network) { + std::vector<Address> anchors(network->config().anchors()); + for(std::vector<Address>::const_iterator a(anchors.begin());a!=anchors.end();++a) { + if (*a != RR->identity.address()) { + explicitGatherPeers[numExplicitGatherPeers++] = *a; + if (numExplicitGatherPeers == 16) + break; + } } + } - Packet outp(p->address(),RR->identity.address(),Packet::VERB_MULTICAST_GATHER); + for(unsigned int k=0;k<numExplicitGatherPeers;++k) { + const CertificateOfMembership *com = (network) ? ((network->config().com) ? &(network->config().com) : (const CertificateOfMembership *)0) : (const CertificateOfMembership *)0; + Packet outp(explicitGatherPeers[k],RR->identity.address(),Packet::VERB_MULTICAST_GATHER); outp.append(nwid); - outp.append((uint8_t)(com ? 0x01 : 0x00)); + outp.append((uint8_t)((com) ? 0x01 : 0x00)); mg.mac().appendTo(outp); outp.append((uint32_t)mg.adi()); outp.append((uint32_t)gatherLimit); if (com) com->serialize(outp); - RR->sw->send(outp,true,0); + RR->node->expectReplyTo(outp.packetId()); + RR->sw->send(outp,true); } - gatherLimit = 0; } gs.txQueue.push_back(OutboundMulticast()); @@ -264,7 +267,7 @@ void Multicaster::send( RR, now, nwid, - com, + disableCompression, limit, gatherLimit, src, @@ -301,42 +304,62 @@ void Multicaster::send( void Multicaster::clean(uint64_t now) { - Mutex::Lock _l(_groups_m); - - Multicaster::Key *k = (Multicaster::Key *)0; - MulticastGroupStatus *s = (MulticastGroupStatus *)0; - Hashtable<Multicaster::Key,MulticastGroupStatus>::Iterator mm(_groups); - while (mm.next(k,s)) { - for(std::list<OutboundMulticast>::iterator tx(s->txQueue.begin());tx!=s->txQueue.end();) { - if ((tx->expired(now))||(tx->atLimit())) - s->txQueue.erase(tx++); - else ++tx; - } + { + Mutex::Lock _l(_groups_m); + Multicaster::Key *k = (Multicaster::Key *)0; + MulticastGroupStatus *s = (MulticastGroupStatus *)0; + Hashtable<Multicaster::Key,MulticastGroupStatus>::Iterator mm(_groups); + while (mm.next(k,s)) { + for(std::list<OutboundMulticast>::iterator tx(s->txQueue.begin());tx!=s->txQueue.end();) { + if ((tx->expired(now))||(tx->atLimit())) + s->txQueue.erase(tx++); + else ++tx; + } - unsigned long count = 0; - { - std::vector<MulticastGroupMember>::iterator reader(s->members.begin()); - std::vector<MulticastGroupMember>::iterator writer(reader); - while (reader != s->members.end()) { - if ((now - reader->timestamp) < ZT_MULTICAST_LIKE_EXPIRE) { - *writer = *reader; - ++writer; - ++count; + unsigned long count = 0; + { + std::vector<MulticastGroupMember>::iterator reader(s->members.begin()); + std::vector<MulticastGroupMember>::iterator writer(reader); + while (reader != s->members.end()) { + if ((now - reader->timestamp) < ZT_MULTICAST_LIKE_EXPIRE) { + *writer = *reader; + ++writer; + ++count; + } + ++reader; } - ++reader; + } + + if (count) { + s->members.resize(count); + } else if (s->txQueue.empty()) { + _groups.erase(*k); + } else { + s->members.clear(); } } + } - if (count) { - s->members.resize(count); - } else if (s->txQueue.empty()) { - _groups.erase(*k); - } else { - s->members.clear(); + { + Mutex::Lock _l(_gatherAuth_m); + _GatherAuthKey *k = (_GatherAuthKey *)0; + uint64_t *ts = NULL; + Hashtable<_GatherAuthKey,uint64_t>::Iterator i(_gatherAuth); + while (i.next(k,ts)) { + if ((now - *ts) >= ZT_MULTICAST_CREDENTIAL_EXPIRATON) + _gatherAuth.erase(*k); } } } +void Multicaster::addCredential(const CertificateOfMembership &com,bool alreadyValidated) +{ + if ((alreadyValidated)||(com.verify(RR) == 0)) { + Mutex::Lock _l(_gatherAuth_m); + _gatherAuth[_GatherAuthKey(com.networkId(),com.issuedTo())] = RR->node->now(); + } +} + void Multicaster::_add(uint64_t now,uint64_t nwid,const MulticastGroup &mg,MulticastGroupStatus &gs,const Address &member) { // assumes _groups_m is locked |