summaryrefslogtreecommitdiff
path: root/node/Multicaster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'node/Multicaster.cpp')
-rw-r--r--node/Multicaster.cpp127
1 files changed, 75 insertions, 52 deletions
diff --git a/node/Multicaster.cpp b/node/Multicaster.cpp
index e1d4567a..f8d58501 100644
--- a/node/Multicaster.cpp
+++ b/node/Multicaster.cpp
@@ -34,8 +34,8 @@ namespace ZeroTier {
Multicaster::Multicaster(const RuntimeEnvironment *renv) :
RR(renv),
- _groups(1024),
- _groups_m()
+ _groups(256),
+ _gatherAuth(256)
{
}
@@ -152,10 +152,10 @@ std::vector<Address> Multicaster::getMembers(uint64_t nwid,const MulticastGroup
}
void Multicaster::send(
- const CertificateOfMembership *com,
unsigned int limit,
uint64_t now,
uint64_t nwid,
+ bool disableCompression,
const std::vector<Address> &alwaysSendTo,
const MulticastGroup &mg,
const MAC &src,
@@ -194,7 +194,7 @@ void Multicaster::send(
RR,
now,
nwid,
- com,
+ disableCompression,
limit,
1, // we'll still gather a little from peers to keep multicast list fresh
src,
@@ -226,35 +226,38 @@ void Multicaster::send(
if ((gs.members.empty())||((now - gs.lastExplicitGather) >= ZT_MULTICAST_EXPLICIT_GATHER_DELAY)) {
gs.lastExplicitGather = now;
- SharedPtr<Peer> explicitGatherPeers[2];
- explicitGatherPeers[0] = RR->topology->getBestRoot();
- const Address nwidc(Network::controllerFor(nwid));
- if (nwidc != RR->identity.address())
- explicitGatherPeers[1] = RR->topology->getPeer(nwidc);
- for(unsigned int k=0;k<2;++k) {
- const SharedPtr<Peer> &p = explicitGatherPeers[k];
- if (!p)
- continue;
- //TRACE(">>MC upstream GATHER up to %u for group %.16llx/%s",gatherLimit,nwid,mg.toString().c_str());
-
- const CertificateOfMembership *com = (CertificateOfMembership *)0;
- {
- SharedPtr<Network> nw(RR->node->network(nwid));
- if ((nw)&&(nw->hasConfig())&&(nw->config().com)&&(nw->config().isPrivate())&&(p->needsOurNetworkMembershipCertificate(nwid,now,true)))
- com = &(nw->config().com);
+
+ Address explicitGatherPeers[16];
+ unsigned int numExplicitGatherPeers = 0;
+ SharedPtr<Peer> bestRoot(RR->topology->getUpstreamPeer());
+ if (bestRoot)
+ explicitGatherPeers[numExplicitGatherPeers++] = bestRoot->address();
+ explicitGatherPeers[numExplicitGatherPeers++] = Network::controllerFor(nwid);
+ SharedPtr<Network> network(RR->node->network(nwid));
+ if (network) {
+ std::vector<Address> anchors(network->config().anchors());
+ for(std::vector<Address>::const_iterator a(anchors.begin());a!=anchors.end();++a) {
+ if (*a != RR->identity.address()) {
+ explicitGatherPeers[numExplicitGatherPeers++] = *a;
+ if (numExplicitGatherPeers == 16)
+ break;
+ }
}
+ }
- Packet outp(p->address(),RR->identity.address(),Packet::VERB_MULTICAST_GATHER);
+ for(unsigned int k=0;k<numExplicitGatherPeers;++k) {
+ const CertificateOfMembership *com = (network) ? ((network->config().com) ? &(network->config().com) : (const CertificateOfMembership *)0) : (const CertificateOfMembership *)0;
+ Packet outp(explicitGatherPeers[k],RR->identity.address(),Packet::VERB_MULTICAST_GATHER);
outp.append(nwid);
- outp.append((uint8_t)(com ? 0x01 : 0x00));
+ outp.append((uint8_t)((com) ? 0x01 : 0x00));
mg.mac().appendTo(outp);
outp.append((uint32_t)mg.adi());
outp.append((uint32_t)gatherLimit);
if (com)
com->serialize(outp);
- RR->sw->send(outp,true,0);
+ RR->node->expectReplyTo(outp.packetId());
+ RR->sw->send(outp,true);
}
- gatherLimit = 0;
}
gs.txQueue.push_back(OutboundMulticast());
@@ -264,7 +267,7 @@ void Multicaster::send(
RR,
now,
nwid,
- com,
+ disableCompression,
limit,
gatherLimit,
src,
@@ -301,42 +304,62 @@ void Multicaster::send(
void Multicaster::clean(uint64_t now)
{
- Mutex::Lock _l(_groups_m);
-
- Multicaster::Key *k = (Multicaster::Key *)0;
- MulticastGroupStatus *s = (MulticastGroupStatus *)0;
- Hashtable<Multicaster::Key,MulticastGroupStatus>::Iterator mm(_groups);
- while (mm.next(k,s)) {
- for(std::list<OutboundMulticast>::iterator tx(s->txQueue.begin());tx!=s->txQueue.end();) {
- if ((tx->expired(now))||(tx->atLimit()))
- s->txQueue.erase(tx++);
- else ++tx;
- }
+ {
+ Mutex::Lock _l(_groups_m);
+ Multicaster::Key *k = (Multicaster::Key *)0;
+ MulticastGroupStatus *s = (MulticastGroupStatus *)0;
+ Hashtable<Multicaster::Key,MulticastGroupStatus>::Iterator mm(_groups);
+ while (mm.next(k,s)) {
+ for(std::list<OutboundMulticast>::iterator tx(s->txQueue.begin());tx!=s->txQueue.end();) {
+ if ((tx->expired(now))||(tx->atLimit()))
+ s->txQueue.erase(tx++);
+ else ++tx;
+ }
- unsigned long count = 0;
- {
- std::vector<MulticastGroupMember>::iterator reader(s->members.begin());
- std::vector<MulticastGroupMember>::iterator writer(reader);
- while (reader != s->members.end()) {
- if ((now - reader->timestamp) < ZT_MULTICAST_LIKE_EXPIRE) {
- *writer = *reader;
- ++writer;
- ++count;
+ unsigned long count = 0;
+ {
+ std::vector<MulticastGroupMember>::iterator reader(s->members.begin());
+ std::vector<MulticastGroupMember>::iterator writer(reader);
+ while (reader != s->members.end()) {
+ if ((now - reader->timestamp) < ZT_MULTICAST_LIKE_EXPIRE) {
+ *writer = *reader;
+ ++writer;
+ ++count;
+ }
+ ++reader;
}
- ++reader;
+ }
+
+ if (count) {
+ s->members.resize(count);
+ } else if (s->txQueue.empty()) {
+ _groups.erase(*k);
+ } else {
+ s->members.clear();
}
}
+ }
- if (count) {
- s->members.resize(count);
- } else if (s->txQueue.empty()) {
- _groups.erase(*k);
- } else {
- s->members.clear();
+ {
+ Mutex::Lock _l(_gatherAuth_m);
+ _GatherAuthKey *k = (_GatherAuthKey *)0;
+ uint64_t *ts = NULL;
+ Hashtable<_GatherAuthKey,uint64_t>::Iterator i(_gatherAuth);
+ while (i.next(k,ts)) {
+ if ((now - *ts) >= ZT_MULTICAST_CREDENTIAL_EXPIRATON)
+ _gatherAuth.erase(*k);
}
}
}
+void Multicaster::addCredential(const CertificateOfMembership &com,bool alreadyValidated)
+{
+ if ((alreadyValidated)||(com.verify(RR) == 0)) {
+ Mutex::Lock _l(_gatherAuth_m);
+ _gatherAuth[_GatherAuthKey(com.networkId(),com.issuedTo())] = RR->node->now();
+ }
+}
+
void Multicaster::_add(uint64_t now,uint64_t nwid,const MulticastGroup &mg,MulticastGroupStatus &gs,const Address &member)
{
// assumes _groups_m is locked