summaryrefslogtreecommitdiff
path: root/node/Multicaster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'node/Multicaster.cpp')
-rw-r--r--node/Multicaster.cpp267
1 files changed, 182 insertions, 85 deletions
diff --git a/node/Multicaster.cpp b/node/Multicaster.cpp
index e1d4567a..753e4ee0 100644
--- a/node/Multicaster.cpp
+++ b/node/Multicaster.cpp
@@ -1,6 +1,6 @@
/*
* ZeroTier One - Network Virtualization Everywhere
- * Copyright (C) 2011-2016 ZeroTier, Inc. https://www.zerotier.com/
+ * Copyright (C) 2011-2018 ZeroTier, Inc. https://www.zerotier.com/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@@ -14,13 +14,20 @@
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * --
+ *
+ * You can be released from the requirements of the license by purchasing
+ * a commercial license. Buying such a license is mandatory as soon as you
+ * develop commercial closed-source software that incorporates or links
+ * directly against ZeroTier software without disclosing the source code
+ * of your own application.
*/
#include <algorithm>
#include "Constants.hpp"
#include "RuntimeEnvironment.hpp"
-#include "SharedPtr.hpp"
#include "Multicaster.hpp"
#include "Topology.hpp"
#include "Switch.hpp"
@@ -29,13 +36,14 @@
#include "C25519.hpp"
#include "CertificateOfMembership.hpp"
#include "Node.hpp"
+#include "Network.hpp"
namespace ZeroTier {
Multicaster::Multicaster(const RuntimeEnvironment *renv) :
RR(renv),
- _groups(1024),
- _groups_m()
+ _groups(256),
+ _gatherAuth(256)
{
}
@@ -43,14 +51,14 @@ Multicaster::~Multicaster()
{
}
-void Multicaster::addMultiple(uint64_t now,uint64_t nwid,const MulticastGroup &mg,const void *addresses,unsigned int count,unsigned int totalKnown)
+void Multicaster::addMultiple(void *tPtr,int64_t now,uint64_t nwid,const MulticastGroup &mg,const void *addresses,unsigned int count,unsigned int totalKnown)
{
const unsigned char *p = (const unsigned char *)addresses;
const unsigned char *e = p + (5 * count);
Mutex::Lock _l(_groups_m);
MulticastGroupStatus &gs = _groups[Multicaster::Key(nwid,mg)];
while (p != e) {
- _add(now,nwid,mg,gs,Address(p,5));
+ _add(tPtr,now,nwid,mg,gs,Address(p,5));
p += 5;
}
}
@@ -103,7 +111,7 @@ unsigned int Multicaster::gather(const Address &queryingPeer,uint64_t nwid,const
// Members are returned in random order so that repeated gather queries
// will return different subsets of a large multicast group.
k = 0;
- while ((added < limit)&&(k < s->members.size())&&((appendTo.size() + ZT_ADDRESS_LENGTH) <= ZT_UDP_DEFAULT_PAYLOAD_MTU)) {
+ while ((added < limit)&&(k < s->members.size())&&((appendTo.size() + ZT_ADDRESS_LENGTH) <= ZT_PROTO_MAX_PACKET_LENGTH)) {
rptr = (unsigned int)RR->node->prng();
restart_member_scan:
@@ -131,8 +139,6 @@ restart_member_scan:
appendTo.setAt(totalAt,(uint32_t)totalKnown);
appendTo.setAt(addedAt,(uint16_t)added);
- //TRACE("..MC Multicaster::gather() attached %u of %u peers for %.16llx/%s (2)",n,(unsigned int)(gs->second.members.size() - skipped),nwid,mg.toString().c_str());
-
return added;
}
@@ -152,23 +158,67 @@ std::vector<Address> Multicaster::getMembers(uint64_t nwid,const MulticastGroup
}
void Multicaster::send(
- const CertificateOfMembership *com,
- unsigned int limit,
- uint64_t now,
- uint64_t nwid,
- const std::vector<Address> &alwaysSendTo,
+ void *tPtr,
+ int64_t now,
+ const SharedPtr<Network> &network,
+ const Address &origin,
const MulticastGroup &mg,
const MAC &src,
unsigned int etherType,
const void *data,
unsigned int len)
{
- unsigned long idxbuf[8194];
+ unsigned long idxbuf[4096];
unsigned long *indexes = idxbuf;
+ // If we're in hub-and-spoke designated multicast replication mode, see if we
+ // have a multicast replicator active. If so, pick the best and send it
+ // there. If we are a multicast replicator or if none are alive, fall back
+ // to sender replication. Note that bridges do not do this since this would
+ // break bridge route learning. This is sort of an edge case limitation of
+ // the current protocol and could be fixed, but fixing it would add more
+ // complexity than the fix is probably worth. Bridges are generally high
+ // bandwidth nodes.
+ if (!network->config().isActiveBridge(RR->identity.address())) {
+ Address multicastReplicators[ZT_MAX_NETWORK_SPECIALISTS];
+ const unsigned int multicastReplicatorCount = network->config().multicastReplicators(multicastReplicators);
+ if (multicastReplicatorCount) {
+ if (std::find(multicastReplicators,multicastReplicators + multicastReplicatorCount,RR->identity.address()) == (multicastReplicators + multicastReplicatorCount)) {
+ SharedPtr<Peer> bestMulticastReplicator;
+ SharedPtr<Path> bestMulticastReplicatorPath;
+ unsigned int bestMulticastReplicatorLatency = 0xffff;
+ for(unsigned int i=0;i<multicastReplicatorCount;++i) {
+ const SharedPtr<Peer> p(RR->topology->getPeerNoCache(multicastReplicators[i]));
+ if ((p)&&(p->isAlive(now))) {
+ const SharedPtr<Path> pp(p->getBestPath(now,false));
+ if ((pp)&&(pp->latency() < bestMulticastReplicatorLatency)) {
+ bestMulticastReplicatorLatency = pp->latency();
+ bestMulticastReplicatorPath = pp;
+ bestMulticastReplicator = p;
+ }
+ }
+ }
+ if (bestMulticastReplicator) {
+ Packet outp(bestMulticastReplicator->address(),RR->identity.address(),Packet::VERB_MULTICAST_FRAME);
+ outp.append((uint64_t)network->id());
+ outp.append((uint8_t)0x0c); // includes source MAC | please replicate
+ ((src) ? src : MAC(RR->identity.address(),network->id())).appendTo(outp);
+ mg.mac().appendTo(outp);
+ outp.append((uint32_t)mg.adi());
+ outp.append((uint16_t)etherType);
+ outp.append(data,len);
+ if (!network->config().disableCompression()) outp.compress();
+ outp.armor(bestMulticastReplicator->key(),true);
+ bestMulticastReplicatorPath->send(RR,tPtr,outp.data(),outp.size(),now);
+ return;
+ }
+ }
+ }
+ }
+
try {
Mutex::Lock _l(_groups_m);
- MulticastGroupStatus &gs = _groups[Multicaster::Key(nwid,mg)];
+ MulticastGroupStatus &gs = _groups[Multicaster::Key(network->id(),mg)];
if (!gs.members.empty()) {
// Allocate a memory buffer if group is monstrous
@@ -186,6 +236,10 @@ void Multicaster::send(
}
}
+ Address activeBridges[ZT_MAX_NETWORK_SPECIALISTS];
+ const unsigned int activeBridgeCount = network->config().activeBridges(activeBridges);
+ const unsigned int limit = network->config().multicastLimit;
+
if (gs.members.size() >= limit) {
// Skip queue if we already have enough members to complete the send operation
OutboundMulticast out;
@@ -193,8 +247,8 @@ void Multicaster::send(
out.init(
RR,
now,
- nwid,
- com,
+ network->id(),
+ network->config().disableCompression(),
limit,
1, // we'll still gather a little from peers to keep multicast list fresh
src,
@@ -205,9 +259,9 @@ void Multicaster::send(
unsigned int count = 0;
- for(std::vector<Address>::const_iterator ast(alwaysSendTo.begin());ast!=alwaysSendTo.end();++ast) {
- if (*ast != RR->identity.address()) {
- out.sendOnly(RR,*ast); // optimization: don't use dedup log if it's a one-pass send
+ for(unsigned int i=0;i<activeBridgeCount;++i) {
+ if ((activeBridges[i] != RR->identity.address())&&(activeBridges[i] != origin)) {
+ out.sendOnly(RR,tPtr,activeBridges[i]); // optimization: don't use dedup log if it's a one-pass send
if (++count >= limit)
break;
}
@@ -215,46 +269,68 @@ void Multicaster::send(
unsigned long idx = 0;
while ((count < limit)&&(idx < gs.members.size())) {
- Address ma(gs.members[indexes[idx++]].address);
- if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),ma) == alwaysSendTo.end()) {
- out.sendOnly(RR,ma); // optimization: don't use dedup log if it's a one-pass send
+ const Address ma(gs.members[indexes[idx++]].address);
+ if ((std::find(activeBridges,activeBridges + activeBridgeCount,ma) == (activeBridges + activeBridgeCount))&&(ma != origin)) {
+ out.sendOnly(RR,tPtr,ma); // optimization: don't use dedup log if it's a one-pass send
++count;
}
}
} else {
- unsigned int gatherLimit = (limit - (unsigned int)gs.members.size()) + 1;
+ const unsigned int gatherLimit = (limit - (unsigned int)gs.members.size()) + 1;
if ((gs.members.empty())||((now - gs.lastExplicitGather) >= ZT_MULTICAST_EXPLICIT_GATHER_DELAY)) {
gs.lastExplicitGather = now;
- SharedPtr<Peer> explicitGatherPeers[2];
- explicitGatherPeers[0] = RR->topology->getBestRoot();
- const Address nwidc(Network::controllerFor(nwid));
- if (nwidc != RR->identity.address())
- explicitGatherPeers[1] = RR->topology->getPeer(nwidc);
- for(unsigned int k=0;k<2;++k) {
- const SharedPtr<Peer> &p = explicitGatherPeers[k];
- if (!p)
- continue;
- //TRACE(">>MC upstream GATHER up to %u for group %.16llx/%s",gatherLimit,nwid,mg.toString().c_str());
-
- const CertificateOfMembership *com = (CertificateOfMembership *)0;
- {
- SharedPtr<Network> nw(RR->node->network(nwid));
- if ((nw)&&(nw->hasConfig())&&(nw->config().com)&&(nw->config().isPrivate())&&(p->needsOurNetworkMembershipCertificate(nwid,now,true)))
- com = &(nw->config().com);
+
+ Address explicitGatherPeers[16];
+ unsigned int numExplicitGatherPeers = 0;
+
+ SharedPtr<Peer> bestRoot(RR->topology->getUpstreamPeer());
+ if (bestRoot)
+ explicitGatherPeers[numExplicitGatherPeers++] = bestRoot->address();
+
+ explicitGatherPeers[numExplicitGatherPeers++] = network->controller();
+
+ Address ac[ZT_MAX_NETWORK_SPECIALISTS];
+ const unsigned int accnt = network->config().alwaysContactAddresses(ac);
+ unsigned int shuffled[ZT_MAX_NETWORK_SPECIALISTS];
+ for(unsigned int i=0;i<accnt;++i)
+ shuffled[i] = i;
+ for(unsigned int i=0,k=accnt>>1;i<k;++i) {
+ const uint64_t x = RR->node->prng();
+ const unsigned int x1 = shuffled[(unsigned int)x % accnt];
+ const unsigned int x2 = shuffled[(unsigned int)(x >> 32) % accnt];
+ const unsigned int tmp = shuffled[x1];
+ shuffled[x1] = shuffled[x2];
+ shuffled[x2] = tmp;
+ }
+ for(unsigned int i=0;i<accnt;++i) {
+ explicitGatherPeers[numExplicitGatherPeers++] = ac[shuffled[i]];
+ if (numExplicitGatherPeers == 16)
+ break;
+ }
+
+ std::vector<Address> anchors(network->config().anchors());
+ for(std::vector<Address>::const_iterator a(anchors.begin());a!=anchors.end();++a) {
+ if (*a != RR->identity.address()) {
+ explicitGatherPeers[numExplicitGatherPeers++] = *a;
+ if (numExplicitGatherPeers == 16)
+ break;
}
+ }
- Packet outp(p->address(),RR->identity.address(),Packet::VERB_MULTICAST_GATHER);
- outp.append(nwid);
- outp.append((uint8_t)(com ? 0x01 : 0x00));
+ for(unsigned int k=0;k<numExplicitGatherPeers;++k) {
+ const CertificateOfMembership *com = (network) ? ((network->config().com) ? &(network->config().com) : (const CertificateOfMembership *)0) : (const CertificateOfMembership *)0;
+ Packet outp(explicitGatherPeers[k],RR->identity.address(),Packet::VERB_MULTICAST_GATHER);
+ outp.append(network->id());
+ outp.append((uint8_t)((com) ? 0x01 : 0x00));
mg.mac().appendTo(outp);
outp.append((uint32_t)mg.adi());
outp.append((uint32_t)gatherLimit);
if (com)
com->serialize(outp);
- RR->sw->send(outp,true,0);
+ RR->node->expectReplyTo(outp.packetId());
+ RR->sw->send(tPtr,outp,true);
}
- gatherLimit = 0;
}
gs.txQueue.push_back(OutboundMulticast());
@@ -263,8 +339,8 @@ void Multicaster::send(
out.init(
RR,
now,
- nwid,
- com,
+ network->id(),
+ network->config().disableCompression(),
limit,
gatherLimit,
src,
@@ -273,11 +349,14 @@ void Multicaster::send(
data,
len);
+ if (origin)
+ out.logAsSent(origin);
+
unsigned int count = 0;
- for(std::vector<Address>::const_iterator ast(alwaysSendTo.begin());ast!=alwaysSendTo.end();++ast) {
- if (*ast != RR->identity.address()) {
- out.sendAndLog(RR,*ast);
+ for(unsigned int i=0;i<activeBridgeCount;++i) {
+ if (activeBridges[i] != RR->identity.address()) {
+ out.sendAndLog(RR,tPtr,activeBridges[i]);
if (++count >= limit)
break;
}
@@ -286,8 +365,8 @@ void Multicaster::send(
unsigned long idx = 0;
while ((count < limit)&&(idx < gs.members.size())) {
Address ma(gs.members[indexes[idx++]].address);
- if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),ma) == alwaysSendTo.end()) {
- out.sendAndLog(RR,ma);
+ if (std::find(activeBridges,activeBridges + activeBridgeCount,ma) == (activeBridges + activeBridgeCount)) {
+ out.sendAndLog(RR,tPtr,ma);
++count;
}
}
@@ -299,45 +378,65 @@ void Multicaster::send(
delete [] indexes;
}
-void Multicaster::clean(uint64_t now)
+void Multicaster::clean(int64_t now)
{
- Mutex::Lock _l(_groups_m);
-
- Multicaster::Key *k = (Multicaster::Key *)0;
- MulticastGroupStatus *s = (MulticastGroupStatus *)0;
- Hashtable<Multicaster::Key,MulticastGroupStatus>::Iterator mm(_groups);
- while (mm.next(k,s)) {
- for(std::list<OutboundMulticast>::iterator tx(s->txQueue.begin());tx!=s->txQueue.end();) {
- if ((tx->expired(now))||(tx->atLimit()))
- s->txQueue.erase(tx++);
- else ++tx;
- }
+ {
+ Mutex::Lock _l(_groups_m);
+ Multicaster::Key *k = (Multicaster::Key *)0;
+ MulticastGroupStatus *s = (MulticastGroupStatus *)0;
+ Hashtable<Multicaster::Key,MulticastGroupStatus>::Iterator mm(_groups);
+ while (mm.next(k,s)) {
+ for(std::list<OutboundMulticast>::iterator tx(s->txQueue.begin());tx!=s->txQueue.end();) {
+ if ((tx->expired(now))||(tx->atLimit()))
+ s->txQueue.erase(tx++);
+ else ++tx;
+ }
- unsigned long count = 0;
- {
- std::vector<MulticastGroupMember>::iterator reader(s->members.begin());
- std::vector<MulticastGroupMember>::iterator writer(reader);
- while (reader != s->members.end()) {
- if ((now - reader->timestamp) < ZT_MULTICAST_LIKE_EXPIRE) {
- *writer = *reader;
- ++writer;
- ++count;
+ unsigned long count = 0;
+ {
+ std::vector<MulticastGroupMember>::iterator reader(s->members.begin());
+ std::vector<MulticastGroupMember>::iterator writer(reader);
+ while (reader != s->members.end()) {
+ if ((now - reader->timestamp) < ZT_MULTICAST_LIKE_EXPIRE) {
+ *writer = *reader;
+ ++writer;
+ ++count;
+ }
+ ++reader;
}
- ++reader;
+ }
+
+ if (count) {
+ s->members.resize(count);
+ } else if (s->txQueue.empty()) {
+ _groups.erase(*k);
+ } else {
+ s->members.clear();
}
}
+ }
- if (count) {
- s->members.resize(count);
- } else if (s->txQueue.empty()) {
- _groups.erase(*k);
- } else {
- s->members.clear();
+ {
+ Mutex::Lock _l(_gatherAuth_m);
+ _GatherAuthKey *k = (_GatherAuthKey *)0;
+ uint64_t *ts = NULL;
+ Hashtable<_GatherAuthKey,uint64_t>::Iterator i(_gatherAuth);
+ while (i.next(k,ts)) {
+ if ((now - *ts) >= ZT_MULTICAST_CREDENTIAL_EXPIRATON)
+ _gatherAuth.erase(*k);
}
}
}
-void Multicaster::_add(uint64_t now,uint64_t nwid,const MulticastGroup &mg,MulticastGroupStatus &gs,const Address &member)
+void Multicaster::addCredential(void *tPtr,const CertificateOfMembership &com,bool alreadyValidated)
+{
+ if ((alreadyValidated)||(com.verify(RR,tPtr) == 0)) {
+ Mutex::Lock _l(_gatherAuth_m);
+ _gatherAuth[_GatherAuthKey(com.networkId(),com.issuedTo())] = RR->node->now();
+ }
+}
+
+void Multicaster::_add(void *tPtr,int64_t now,uint64_t nwid,const MulticastGroup &mg,MulticastGroupStatus &gs,const Address &member)
{
// assumes _groups_m is locked
@@ -354,13 +453,11 @@ void Multicaster::_add(uint64_t now,uint64_t nwid,const MulticastGroup &mg,Multi
gs.members.push_back(MulticastGroupMember(member,now));
- //TRACE("..MC %s joined multicast group %.16llx/%s via %s",member.toString().c_str(),nwid,mg.toString().c_str(),((learnedFrom) ? learnedFrom.toString().c_str() : "(direct)"));
-
for(std::list<OutboundMulticast>::iterator tx(gs.txQueue.begin());tx!=gs.txQueue.end();) {
if (tx->atLimit())
gs.txQueue.erase(tx++);
else {
- tx->sendIfNew(RR,member);
+ tx->sendIfNew(RR,tPtr,member);
if (tx->atLimit())
gs.txQueue.erase(tx++);
else ++tx;