author | Adam Ierymenko <adam.ierymenko@zerotier.com> | 2018-04-25 06:39:02 -0700
committer | GitHub <noreply@github.com> | 2018-04-25 06:39:02 -0700
commit | 42ec780a6f6eedef4d8b1d8218bd72fc6ed75cc0 (patch)
tree | 7bf86c4d92d6a0f77eced79bfc33313c62c7b6dd /node/Multicaster.cpp
parent | 18c9dc8a0649c866eff9f299f20fa5b19c502e52 (diff)
parent | 4608880fb06700822d01e9e5d6729fcdeb82b64b (diff)
download | infinitytier-42ec780a6f6eedef4d8b1d8218bd72fc6ed75cc0.tar.gz
download | infinitytier-42ec780a6f6eedef4d8b1d8218bd72fc6ed75cc0.zip
Merge branch 'dev' into netbsd-support
Diffstat (limited to 'node/Multicaster.cpp')
-rw-r--r-- | node/Multicaster.cpp | 267 |
1 file changed, 182 insertions, 85 deletions
diff --git a/node/Multicaster.cpp b/node/Multicaster.cpp
index e1d4567a..753e4ee0 100644
--- a/node/Multicaster.cpp
+++ b/node/Multicaster.cpp
@@ -1,6 +1,6 @@
 /*
  * ZeroTier One - Network Virtualization Everywhere
- * Copyright (C) 2011-2016 ZeroTier, Inc. https://www.zerotier.com/
+ * Copyright (C) 2011-2018 ZeroTier, Inc. https://www.zerotier.com/
  *
  * This program is free software: you can redistribute it and/or modify
  * it under the terms of the GNU General Public License as published by
@@ -14,13 +14,20 @@
  *
  * You should have received a copy of the GNU General Public License
  * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * --
+ *
+ * You can be released from the requirements of the license by purchasing
+ * a commercial license. Buying such a license is mandatory as soon as you
+ * develop commercial closed-source software that incorporates or links
+ * directly against ZeroTier software without disclosing the source code
+ * of your own application.
  */

 #include <algorithm>

 #include "Constants.hpp"
 #include "RuntimeEnvironment.hpp"
-#include "SharedPtr.hpp"
 #include "Multicaster.hpp"
 #include "Topology.hpp"
 #include "Switch.hpp"
@@ -29,13 +36,14 @@
 #include "C25519.hpp"
 #include "CertificateOfMembership.hpp"
 #include "Node.hpp"
+#include "Network.hpp"

 namespace ZeroTier {

 Multicaster::Multicaster(const RuntimeEnvironment *renv) :
 	RR(renv),
-	_groups(1024),
-	_groups_m()
+	_groups(256),
+	_gatherAuth(256)
 {
 }

@@ -43,14 +51,14 @@ Multicaster::~Multicaster()
 {
 }

-void Multicaster::addMultiple(uint64_t now,uint64_t nwid,const MulticastGroup &mg,const void *addresses,unsigned int count,unsigned int totalKnown)
+void Multicaster::addMultiple(void *tPtr,int64_t now,uint64_t nwid,const MulticastGroup &mg,const void *addresses,unsigned int count,unsigned int totalKnown)
 {
 	const unsigned char *p = (const unsigned char *)addresses;
 	const unsigned char *e = p + (5 * count);
 	Mutex::Lock _l(_groups_m);
 	MulticastGroupStatus &gs = _groups[Multicaster::Key(nwid,mg)];
 	while (p != e) {
-		_add(now,nwid,mg,gs,Address(p,5));
+		_add(tPtr,now,nwid,mg,gs,Address(p,5));
 		p += 5;
 	}
 }
@@ -103,7 +111,7 @@ unsigned int Multicaster::gather(const Address &queryingPeer,uint64_t nwid,const
 			// Members are returned in random order so that repeated gather queries
 			// will return different subsets of a large multicast group.
 			k = 0;
-			while ((added < limit)&&(k < s->members.size())&&((appendTo.size() + ZT_ADDRESS_LENGTH) <= ZT_UDP_DEFAULT_PAYLOAD_MTU)) {
+			while ((added < limit)&&(k < s->members.size())&&((appendTo.size() + ZT_ADDRESS_LENGTH) <= ZT_PROTO_MAX_PACKET_LENGTH)) {
 				rptr = (unsigned int)RR->node->prng();

 restart_member_scan:
@@ -131,8 +139,6 @@ restart_member_scan:
 	appendTo.setAt(totalAt,(uint32_t)totalKnown);
 	appendTo.setAt(addedAt,(uint16_t)added);

-	//TRACE("..MC Multicaster::gather() attached %u of %u peers for %.16llx/%s (2)",n,(unsigned int)(gs->second.members.size() - skipped),nwid,mg.toString().c_str());
-
 	return added;
 }

@@ -152,23 +158,67 @@ std::vector<Address> Multicaster::getMembers(uint64_t nwid,const MulticastGroup
 }

 void Multicaster::send(
-	const CertificateOfMembership *com,
-	unsigned int limit,
-	uint64_t now,
-	uint64_t nwid,
-	const std::vector<Address> &alwaysSendTo,
+	void *tPtr,
+	int64_t now,
+	const SharedPtr<Network> &network,
+	const Address &origin,
 	const MulticastGroup &mg,
 	const MAC &src,
 	unsigned int etherType,
 	const void *data,
 	unsigned int len)
 {
-	unsigned long idxbuf[8194];
+	unsigned long idxbuf[4096];
 	unsigned long *indexes = idxbuf;

+	// If we're in hub-and-spoke designated multicast replication mode, see if we
+	// have a multicast replicator active. If so, pick the best and send it
+	// there. If we are a multicast replicator or if none are alive, fall back
+	// to sender replication. Note that bridges do not do this since this would
+	// break bridge route learning. This is sort of an edge case limitation of
+	// the current protocol and could be fixed, but fixing it would add more
+	// complexity than the fix is probably worth. Bridges are generally high
+	// bandwidth nodes.
+	if (!network->config().isActiveBridge(RR->identity.address())) {
+		Address multicastReplicators[ZT_MAX_NETWORK_SPECIALISTS];
+		const unsigned int multicastReplicatorCount = network->config().multicastReplicators(multicastReplicators);
+		if (multicastReplicatorCount) {
+			if (std::find(multicastReplicators,multicastReplicators + multicastReplicatorCount,RR->identity.address()) == (multicastReplicators + multicastReplicatorCount)) {
+				SharedPtr<Peer> bestMulticastReplicator;
+				SharedPtr<Path> bestMulticastReplicatorPath;
+				unsigned int bestMulticastReplicatorLatency = 0xffff;
+				for(unsigned int i=0;i<multicastReplicatorCount;++i) {
+					const SharedPtr<Peer> p(RR->topology->getPeerNoCache(multicastReplicators[i]));
+					if ((p)&&(p->isAlive(now))) {
+						const SharedPtr<Path> pp(p->getBestPath(now,false));
+						if ((pp)&&(pp->latency() < bestMulticastReplicatorLatency)) {
+							bestMulticastReplicatorLatency = pp->latency();
+							bestMulticastReplicatorPath = pp;
+							bestMulticastReplicator = p;
+						}
+					}
+				}
+				if (bestMulticastReplicator) {
+					Packet outp(bestMulticastReplicator->address(),RR->identity.address(),Packet::VERB_MULTICAST_FRAME);
+					outp.append((uint64_t)network->id());
+					outp.append((uint8_t)0x0c); // includes source MAC | please replicate
+					((src) ? src : MAC(RR->identity.address(),network->id())).appendTo(outp);
+					mg.mac().appendTo(outp);
+					outp.append((uint32_t)mg.adi());
+					outp.append((uint16_t)etherType);
+					outp.append(data,len);
+					if (!network->config().disableCompression()) outp.compress();
+					outp.armor(bestMulticastReplicator->key(),true);
+					bestMulticastReplicatorPath->send(RR,tPtr,outp.data(),outp.size(),now);
+					return;
+				}
+			}
+		}
+	}
+
 	try {
 		Mutex::Lock _l(_groups_m);
-		MulticastGroupStatus &gs = _groups[Multicaster::Key(nwid,mg)];
+		MulticastGroupStatus &gs = _groups[Multicaster::Key(network->id(),mg)];

 		if (!gs.members.empty()) {
 			// Allocate a memory buffer if group is monstrous
@@ -186,6 +236,10 @@
 			}
 		}

+		Address activeBridges[ZT_MAX_NETWORK_SPECIALISTS];
+		const unsigned int activeBridgeCount = network->config().activeBridges(activeBridges);
+		const unsigned int limit = network->config().multicastLimit;
+
 		if (gs.members.size() >= limit) {
 			// Skip queue if we already have enough members to complete the send operation
 			OutboundMulticast out;
@@ -193,8 +247,8 @@
 			out.init(
 				RR,
 				now,
-				nwid,
-				com,
+				network->id(),
+				network->config().disableCompression(),
 				limit,
 				1, // we'll still gather a little from peers to keep multicast list fresh
 				src,
@@ -205,9 +259,9 @@

 			unsigned int count = 0;

-			for(std::vector<Address>::const_iterator ast(alwaysSendTo.begin());ast!=alwaysSendTo.end();++ast) {
-				if (*ast != RR->identity.address()) {
-					out.sendOnly(RR,*ast); // optimization: don't use dedup log if it's a one-pass send
+			for(unsigned int i=0;i<activeBridgeCount;++i) {
+				if ((activeBridges[i] != RR->identity.address())&&(activeBridges[i] != origin)) {
+					out.sendOnly(RR,tPtr,activeBridges[i]); // optimization: don't use dedup log if it's a one-pass send
 					if (++count >= limit)
 						break;
 				}
@@ -215,46 +269,68 @@

 			unsigned long idx = 0;
 			while ((count < limit)&&(idx < gs.members.size())) {
-				Address ma(gs.members[indexes[idx++]].address);
-				if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),ma) == alwaysSendTo.end()) {
-					out.sendOnly(RR,ma); // optimization: don't use dedup log if it's a one-pass send
+				const Address ma(gs.members[indexes[idx++]].address);
+				if ((std::find(activeBridges,activeBridges + activeBridgeCount,ma) == (activeBridges + activeBridgeCount))&&(ma != origin)) {
+					out.sendOnly(RR,tPtr,ma); // optimization: don't use dedup log if it's a one-pass send
 					++count;
 				}
 			}
 		} else {
-			unsigned int gatherLimit = (limit - (unsigned int)gs.members.size()) + 1;
+			const unsigned int gatherLimit = (limit - (unsigned int)gs.members.size()) + 1;

 			if ((gs.members.empty())||((now - gs.lastExplicitGather) >= ZT_MULTICAST_EXPLICIT_GATHER_DELAY)) {
 				gs.lastExplicitGather = now;
-				SharedPtr<Peer> explicitGatherPeers[2];
-				explicitGatherPeers[0] = RR->topology->getBestRoot();
-				const Address nwidc(Network::controllerFor(nwid));
-				if (nwidc != RR->identity.address())
-					explicitGatherPeers[1] = RR->topology->getPeer(nwidc);
-				for(unsigned int k=0;k<2;++k) {
-					const SharedPtr<Peer> &p = explicitGatherPeers[k];
-					if (!p)
-						continue;
-					//TRACE(">>MC upstream GATHER up to %u for group %.16llx/%s",gatherLimit,nwid,mg.toString().c_str());
-
-					const CertificateOfMembership *com = (CertificateOfMembership *)0;
-					{
-						SharedPtr<Network> nw(RR->node->network(nwid));
-						if ((nw)&&(nw->hasConfig())&&(nw->config().com)&&(nw->config().isPrivate())&&(p->needsOurNetworkMembershipCertificate(nwid,now,true)))
-							com = &(nw->config().com);
+
+				Address explicitGatherPeers[16];
+				unsigned int numExplicitGatherPeers = 0;
+
+				SharedPtr<Peer> bestRoot(RR->topology->getUpstreamPeer());
+				if (bestRoot)
+					explicitGatherPeers[numExplicitGatherPeers++] = bestRoot->address();
+
+				explicitGatherPeers[numExplicitGatherPeers++] = network->controller();
+
+				Address ac[ZT_MAX_NETWORK_SPECIALISTS];
+				const unsigned int accnt = network->config().alwaysContactAddresses(ac);
+				unsigned int shuffled[ZT_MAX_NETWORK_SPECIALISTS];
+				for(unsigned int i=0;i<accnt;++i)
+					shuffled[i] = i;
+				for(unsigned int i=0,k=accnt>>1;i<k;++i) {
+					const uint64_t x = RR->node->prng();
+					const unsigned int x1 = shuffled[(unsigned int)x % accnt];
+					const unsigned int x2 = shuffled[(unsigned int)(x >> 32) % accnt];
+					const unsigned int tmp = shuffled[x1];
+					shuffled[x1] = shuffled[x2];
+					shuffled[x2] = tmp;
+				}
+				for(unsigned int i=0;i<accnt;++i) {
+					explicitGatherPeers[numExplicitGatherPeers++] = ac[shuffled[i]];
+					if (numExplicitGatherPeers == 16)
+						break;
+				}
+
+				std::vector<Address> anchors(network->config().anchors());
+				for(std::vector<Address>::const_iterator a(anchors.begin());a!=anchors.end();++a) {
+					if (*a != RR->identity.address()) {
+						explicitGatherPeers[numExplicitGatherPeers++] = *a;
+						if (numExplicitGatherPeers == 16)
+							break;
 					}
+				}

-					Packet outp(p->address(),RR->identity.address(),Packet::VERB_MULTICAST_GATHER);
-					outp.append(nwid);
-					outp.append((uint8_t)(com ? 0x01 : 0x00));
+				for(unsigned int k=0;k<numExplicitGatherPeers;++k) {
+					const CertificateOfMembership *com = (network) ? ((network->config().com) ? &(network->config().com) : (const CertificateOfMembership *)0) : (const CertificateOfMembership *)0;
+					Packet outp(explicitGatherPeers[k],RR->identity.address(),Packet::VERB_MULTICAST_GATHER);
+					outp.append(network->id());
+					outp.append((uint8_t)((com) ? 0x01 : 0x00));
 					mg.mac().appendTo(outp);
 					outp.append((uint32_t)mg.adi());
 					outp.append((uint32_t)gatherLimit);
 					if (com)
 						com->serialize(outp);
-					RR->sw->send(outp,true,0);
+					RR->node->expectReplyTo(outp.packetId());
+					RR->sw->send(tPtr,outp,true);
 				}
-				gatherLimit = 0;
 			}

 			gs.txQueue.push_back(OutboundMulticast());
@@ -263,8 +339,8 @@
 			out.init(
 				RR,
 				now,
-				nwid,
-				com,
+				network->id(),
+				network->config().disableCompression(),
 				limit,
 				gatherLimit,
 				src,
@@ -273,11 +349,14 @@
 				data,
 				len);

+			if (origin)
+				out.logAsSent(origin);
+
 			unsigned int count = 0;

-			for(std::vector<Address>::const_iterator ast(alwaysSendTo.begin());ast!=alwaysSendTo.end();++ast) {
-				if (*ast != RR->identity.address()) {
-					out.sendAndLog(RR,*ast);
+			for(unsigned int i=0;i<activeBridgeCount;++i) {
+				if (activeBridges[i] != RR->identity.address()) {
+					out.sendAndLog(RR,tPtr,activeBridges[i]);
 					if (++count >= limit)
 						break;
 				}
@@ -286,8 +365,8 @@
 			unsigned long idx = 0;
 			while ((count < limit)&&(idx < gs.members.size())) {
 				Address ma(gs.members[indexes[idx++]].address);
-				if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),ma) == alwaysSendTo.end()) {
-					out.sendAndLog(RR,ma);
+				if (std::find(activeBridges,activeBridges + activeBridgeCount,ma) == (activeBridges + activeBridgeCount)) {
+					out.sendAndLog(RR,tPtr,ma);
 					++count;
 				}
 			}
@@ -299,45 +378,65 @@
 		delete [] indexes;
 }

-void Multicaster::clean(uint64_t now)
+void Multicaster::clean(int64_t now)
 {
-	Mutex::Lock _l(_groups_m);
-
-	Multicaster::Key *k = (Multicaster::Key *)0;
-	MulticastGroupStatus *s = (MulticastGroupStatus *)0;
-	Hashtable<Multicaster::Key,MulticastGroupStatus>::Iterator mm(_groups);
-	while (mm.next(k,s)) {
-		for(std::list<OutboundMulticast>::iterator tx(s->txQueue.begin());tx!=s->txQueue.end();) {
-			if ((tx->expired(now))||(tx->atLimit()))
-				s->txQueue.erase(tx++);
-			else ++tx;
-		}
+	{
+		Mutex::Lock _l(_groups_m);
+		Multicaster::Key *k = (Multicaster::Key *)0;
+		MulticastGroupStatus *s = (MulticastGroupStatus *)0;
+		Hashtable<Multicaster::Key,MulticastGroupStatus>::Iterator mm(_groups);
+		while (mm.next(k,s)) {
+			for(std::list<OutboundMulticast>::iterator tx(s->txQueue.begin());tx!=s->txQueue.end();) {
+				if ((tx->expired(now))||(tx->atLimit()))
+					s->txQueue.erase(tx++);
+				else ++tx;
+			}

-		unsigned long count = 0;
-		{
-			std::vector<MulticastGroupMember>::iterator reader(s->members.begin());
-			std::vector<MulticastGroupMember>::iterator writer(reader);
-			while (reader != s->members.end()) {
-				if ((now - reader->timestamp) < ZT_MULTICAST_LIKE_EXPIRE) {
-					*writer = *reader;
-					++writer;
-					++count;
+			unsigned long count = 0;
+			{
+				std::vector<MulticastGroupMember>::iterator reader(s->members.begin());
+				std::vector<MulticastGroupMember>::iterator writer(reader);
+				while (reader != s->members.end()) {
+					if ((now - reader->timestamp) < ZT_MULTICAST_LIKE_EXPIRE) {
+						*writer = *reader;
+						++writer;
+						++count;
+					}
+					++reader;
 				}
-				++reader;
-			}
+			}
+
+			if (count) {
+				s->members.resize(count);
+			} else if (s->txQueue.empty()) {
+				_groups.erase(*k);
+			} else {
+				s->members.clear();
+			}
 		}
+	}

-		if (count) {
-			s->members.resize(count);
-		} else if (s->txQueue.empty()) {
-			_groups.erase(*k);
-		} else {
-			s->members.clear();
+	{
+		Mutex::Lock _l(_gatherAuth_m);
+		_GatherAuthKey *k = (_GatherAuthKey *)0;
+		uint64_t *ts = NULL;
+		Hashtable<_GatherAuthKey,uint64_t>::Iterator i(_gatherAuth);
+		while (i.next(k,ts)) {
+			if ((now - *ts) >= ZT_MULTICAST_CREDENTIAL_EXPIRATON)
+				_gatherAuth.erase(*k);
 		}
 	}
 }

-void Multicaster::_add(uint64_t now,uint64_t nwid,const MulticastGroup &mg,MulticastGroupStatus &gs,const Address &member)
+void Multicaster::addCredential(void *tPtr,const CertificateOfMembership &com,bool alreadyValidated)
+{
+	if ((alreadyValidated)||(com.verify(RR,tPtr) == 0)) {
+		Mutex::Lock _l(_gatherAuth_m);
+		_gatherAuth[_GatherAuthKey(com.networkId(),com.issuedTo())] = RR->node->now();
+	}
+}
+
+void Multicaster::_add(void *tPtr,int64_t now,uint64_t nwid,const MulticastGroup &mg,MulticastGroupStatus &gs,const Address &member)
 {
 	// assumes _groups_m is locked

@@ -354,13 +453,11 @@ void Multicaster::_add(uint64_t now,uint64_t nwid,const MulticastGroup &mg,Multi

 	gs.members.push_back(MulticastGroupMember(member,now));

-	//TRACE("..MC %s joined multicast group %.16llx/%s via %s",member.toString().c_str(),nwid,mg.toString().c_str(),((learnedFrom) ? learnedFrom.toString().c_str() : "(direct)"));
-
 	for(std::list<OutboundMulticast>::iterator tx(gs.txQueue.begin());tx!=gs.txQueue.end();) {
 		if (tx->atLimit())
 			gs.txQueue.erase(tx++);
 		else {
-			tx->sendIfNew(RR,member);
+			tx->sendIfNew(RR,tPtr,member);
 			if (tx->atLimit())
 				gs.txQueue.erase(tx++);
 			else ++tx;
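For readers skimming the commit, the core of the new hub-and-spoke path in Multicaster::send() is a simple selection loop: among the network's configured multicast replicators, pick the alive peer whose best path has the lowest latency and hand the frame to it, otherwise fall back to sender-side replication. The sketch below illustrates that selection pattern in isolation; it is not part of the commit, and Candidate/pickBestReplicator are hypothetical stand-ins for ZeroTier's Peer and Path objects.

```cpp
#include <cstddef>
#include <cstdint>
#include <limits>
#include <vector>

// Hypothetical, simplified stand-in for a replicator candidate (the real code
// works with SharedPtr<Peer>/SharedPtr<Path> and queries latency live).
struct Candidate {
	uint64_t address;       // ZeroTier node address of the candidate replicator
	bool alive;             // candidate currently has a live session
	unsigned int latencyMs; // latency of the candidate's best path
};

// Return the index of the lowest-latency alive candidate, or -1 if none is
// usable; the caller then falls back to sender-side replication.
static int pickBestReplicator(const std::vector<Candidate> &candidates)
{
	int best = -1;
	unsigned int bestLatency = std::numeric_limits<unsigned int>::max();
	for (std::size_t i = 0; i < candidates.size(); ++i) {
		if (candidates[i].alive && (candidates[i].latencyMs < bestLatency)) {
			bestLatency = candidates[i].latencyMs;
			best = (int)i;
		}
	}
	return best;
}
```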