summaryrefslogtreecommitdiff
path: root/node
diff options
context:
space:
mode:
Diffstat (limited to 'node')
-rw-r--r--node/Multicaster.hpp64
-rw-r--r--node/PacketDecoder.cpp184
-rw-r--r--node/Switch.cpp2
3 files changed, 167 insertions, 83 deletions
diff --git a/node/Multicaster.hpp b/node/Multicaster.hpp
index fa0062b0..41fa7ff5 100644
--- a/node/Multicaster.hpp
+++ b/node/Multicaster.hpp
@@ -206,7 +206,7 @@ public:
}
/**
- * Choose peers to send a propagating multicast to
+ * Choose peers for multicast propagation via random selection
*
* @param prng Random source
* @param topology Topology object or mock thereof
@@ -223,7 +223,67 @@ public:
* @tparam P Type of peers, which is SharedPtr<Peer> in running code or a mock in simulation (mock must behave like a pointer type)
*/
template<typename T,typename P>
- inline unsigned int pickNextPropagationPeers(
+ inline unsigned int pickRandomPropagationPeers(
+ CMWC4096 &prng,
+ T &topology,
+ uint64_t nwid,
+ const MulticastGroup &mg,
+ const Address &originalSubmitter,
+ const Address &upstream,
+ MulticastBloomFilter &bf,
+ unsigned int max,
+ P *peers,
+ uint64_t now)
+ {
+ unsigned int chosen = 0;
+ Mutex::Lock _l(_multicastMemberships_m);
+ std::map< MulticastChannel,std::vector<MulticastMembership> >::iterator mm(_multicastMemberships.find(MulticastChannel(nwid,mg)));
+ if ((mm != _multicastMemberships.end())&&(!mm->second.empty())) {
+ for(unsigned int stries=0;((stries<ZT_MULTICAST_PICK_MAX_SAMPLE_SIZE)&&(chosen < max));++stries) {
+ MulticastMembership &m = mm->second[prng.next32() % mm->second.size()];
+ unsigned int sum = m.first.sum();
+ if (
+ ((now - m.second) < ZT_MULTICAST_LIKE_EXPIRE)&& /* LIKE is not expired */
+ (!bf.contains(sum))&& /* Not in propagation bloom */
+ (m.first != originalSubmitter)&& /* Not the original submitter */
+ (m.first != upstream) ) { /* Not where the frame came from */
+ P peer(topology.getPeer(m.first));
+ if (peer) {
+ unsigned int chk = 0;
+ while (chk < chosen) {
+ if (peers[chk++] == peer)
+ break;
+ }
+ if (chk == chosen) { /* not already picked */
+ peers[chosen++] = peer;
+ bf.set(sum);
+ }
+ }
+ }
+ }
+ }
+ return 0;
+ }
+
+ /**
+ * Choose peers for multicast propagation via implicit social switching
+ *
+ * @param prng Random source
+ * @param topology Topology object or mock thereof
+ * @param nwid Network ID
+ * @param mg Multicast group
+ * @param originalSubmitter Original submitter of multicast message to network
+ * @param upstream Address from which message originated, or null (0) address if none
+ * @param bf Bloom filter, updated in place with sums of addresses in chosen peers and/or decay
+ * @param max Maximum number of peers to pick
+ * @param peers Array of objects of type P to fill with up to [max] peers
+ * @param now Current timestamp
+ * @return Number of peers actually stored in peers array
+ * @tparam T Type of topology, which is Topology in running code or a mock in simulation
+ * @tparam P Type of peers, which is SharedPtr<Peer> in running code or a mock in simulation (mock must behave like a pointer type)
+ */
+ template<typename T,typename P>
+ inline unsigned int pickSocialPropagationPeers(
CMWC4096 &prng,
T &topology,
uint64_t nwid,
diff --git a/node/PacketDecoder.cpp b/node/PacketDecoder.cpp
index 14f0d502..b4bc21e0 100644
--- a/node/PacketDecoder.cpp
+++ b/node/PacketDecoder.cpp
@@ -496,6 +496,24 @@ bool PacketDecoder::_doMULTICAST_FRAME(const RuntimeEnvironment *_r,const Shared
SharedPtr<Network> network(_r->nc->network(at<uint64_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_NETWORK_ID)));
if ((network)&&(network->isAllowed(source()))) {
Address originalSubmitterAddress(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SUBMITTER_ADDRESS,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH);
+
+ if (originalSubmitterAddress.isReserved()) {
+ TRACE("dropped MULTICAST_FRAME from original submitter %s, received from %s(%s): invalid original submitter address",originalSubmitterAddress.toString().c_str(),source().toString().c_str(),_remoteAddress.toString().c_str());
+ return true;
+ }
+ if (originalSubmitterAddress == _r->identity.address()) {
+ TRACE("dropped MULTICAST_FRAME from original submitter %s, received from %s(%s): boomerang!",originalSubmitterAddress.toString().c_str(),source().toString().c_str(),_remoteAddress.toString().c_str());
+ return true;
+ }
+
+ SharedPtr<Peer> originalSubmitter(_r->topology->getPeer(originalSubmitterAddress));
+ if (!originalSubmitter) {
+ TRACE("requesting WHOIS on original multicast frame submitter %s",originalSubmitterAddress.toString().c_str());
+ _r->sw->requestWhois(originalSubmitterAddress);
+ _step = DECODE_WAITING_FOR_MULTICAST_FRAME_ORIGINAL_SENDER_LOOKUP;
+ return false; // try again if/when we get OK(WHOIS)
+ }
+
MAC fromMac(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SOURCE_MAC,6));
MulticastGroup mg(MAC(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_DESTINATION_MAC,6)),at<uint32_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_ADI));
unsigned int hops = (*this)[ZT_PROTO_VERB_MULTICAST_FRAME_IDX_HOP_COUNT];
@@ -504,94 +522,100 @@ bool PacketDecoder::_doMULTICAST_FRAME(const RuntimeEnvironment *_r,const Shared
unsigned int signaturelen = at<uint16_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SIGNATURE_LENGTH);
unsigned char *dataAndSignature = field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_PAYLOAD,datalen + signaturelen);
+ if (!Multicaster::verifyMulticastPacket(originalSubmitter->identity(),network->id(),fromMac,mg,etherType,dataAndSignature,datalen,dataAndSignature + datalen,signaturelen)) {
+ LOG("dropped MULTICAST_FRAME from original submitter %s, received from %s(%s): FAILED SIGNATURE CHECK (spoofed original submitter?)",originalSubmitterAddress.toString().c_str(),source().toString().c_str(),_remoteAddress.toString().c_str());
+ return true;
+ }
+
+ if (++hops >= ZT_MULTICAST_PROPAGATION_DEPTH) {
+ TRACE("dropped MULTICAST_FRAME from original submitter %s, received from %s(%s): max depth reached",originalSubmitterAddress.toString().c_str(),source().toString().c_str(),_remoteAddress.toString().c_str());
+ return true;
+ }
+
uint64_t mccrc = Multicaster::computeMulticastDedupCrc(network->id(),fromMac,mg,etherType,dataAndSignature,datalen);
uint64_t now = Utils::now();
bool isDuplicate = _r->multicaster->checkDuplicate(mccrc,now);
- if (originalSubmitterAddress == _r->identity.address()) {
- // Technically should not happen, since the original submitter is
- // excluded from consideration as a propagation recipient.
- TRACE("dropped boomerang MULTICAST_FRAME received from %s(%s)",source().toString().c_str(),_remoteAddress.toString().c_str());
- } else if ((!isDuplicate)||(_r->topology->amSupernode())) {
- //
- // If I am a supernode, I will repeatedly propagate duplicates. That's
- // because supernodes are used to bridge sparse multicast groups. Non-
- // supernodes will ignore duplicates completely.
- //
- // TODO: supernodes should keep a local bloom filter too and OR it with
- // the bloom from the packet in order to pick different recipients each
- // time a multicast returns to them for repropagation.
- //
-
- SharedPtr<Peer> originalSubmitter(_r->topology->getPeer(originalSubmitterAddress));
- if (!originalSubmitter) {
- TRACE("requesting WHOIS on original multicast frame submitter %s",originalSubmitterAddress.toString().c_str());
- _r->sw->requestWhois(originalSubmitterAddress);
- _step = DECODE_WAITING_FOR_MULTICAST_FRAME_ORIGINAL_SENDER_LOOKUP;
- return false; // try again if/when we get OK(WHOIS)
- } else if (Multicaster::verifyMulticastPacket(originalSubmitter->identity(),network->id(),fromMac,mg,etherType,dataAndSignature,datalen,dataAndSignature + datalen,signaturelen)) {
- // In checking the multicast rate, we don't re-check if this is
- // a duplicate. That's because if isDuplicate is true it means
- // we're a supernode and it's a second pass relay.
- if ((isDuplicate)||(network->multicastRateGate(originalSubmitter->address(),datalen))) {
- _r->multicaster->addToDedupHistory(mccrc,now);
-
- // Even if we are a supernode, we still don't repeatedly inject
- // duplicates into our own tap.
- if (!isDuplicate)
- network->tap().put(fromMac,mg.mac(),etherType,dataAndSignature,datalen);
-
- if (++hops < ZT_MULTICAST_PROPAGATION_DEPTH) {
- Address upstream(source()); // save this since we mangle it
-
- Multicaster::MulticastBloomFilter bloom(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_BLOOM_FILTER,ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES));
- SharedPtr<Peer> propPeers[ZT_MULTICAST_PROPAGATION_BREADTH];
- unsigned int np = _r->multicaster->pickNextPropagationPeers(
- *(_r->prng),
- *(_r->topology),
- network->id(),
- mg,
- originalSubmitterAddress,
- upstream,
- bloom,
- ZT_MULTICAST_PROPAGATION_BREADTH,
- propPeers,
- now);
-
- // In a bit of a hack, we re-use this packet to repeat it
- // to our multicast propagation recipients. Afterwords we
- // return true just to be sure this is the end of this
- // packet's life cycle, since it is now mangled.
-
- setSource(_r->identity.address());
- (*this)[ZT_PROTO_VERB_MULTICAST_FRAME_IDX_HOP_COUNT] = hops;
- memcpy(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_BLOOM_FILTER,ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES),bloom.data(),ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES);
- compress();
-
- for(unsigned int i=0;i<np;++i) {
- //TRACE("propagating multicast from original node %s: %s -> %s",originalSubmitterAddress.toString().c_str(),upstream.toString().c_str(),propPeers[i]->address().toString().c_str());
- // Re-use this packet to re-send multicast frame to everyone
- // downstream from us.
- newInitializationVector();
- setDestination(propPeers[i]->address());
- _r->sw->send(*this,true);
- }
-
- // Return here just to be safe, since this packet's state is no
- // longer valid.
- return true;
- } else {
- //TRACE("terminating MULTICAST_FRAME propagation from %s(%s): max depth reached",source().toString().c_str(),_remoteAddress.toString().c_str());
- }
- } else {
- LOG("dropped MULTICAST_FRAME from original sender %s: rate limit overrun",originalSubmitter->address().toString().c_str());
- }
+ if (!isDuplicate) {
+ if (network->multicastRateGate(originalSubmitterAddress,datalen)) {
+ network->tap().put(fromMac,mg.mac(),etherType,dataAndSignature,datalen);
} else {
- TRACE("rejected MULTICAST_FRAME forwarded by %s(%s): failed signature check (falsely claims origin %s)",source().toString().c_str(),_remoteAddress.toString().c_str(),originalSubmitterAddress.toString().c_str());
+ TRACE("dropped MULTICAST_FRAME from original submitter %s, received from %s(%s): sender rate limit exceeded",originalSubmitterAddress.toString().c_str(),source().toString().c_str(),_remoteAddress.toString().c_str());
+ return true;
}
+
+ /* It's important that we do this *after* rate limit checking,
+ * otherwise supernodes could be used to execute a flood by
+ * first bouncing a multicast off a supernode and then flooding
+ * it with retransmits. */
+ _r->multicaster->addToDedupHistory(mccrc,now);
+ }
+
+ Address upstream(source()); // save this since we might mangle it below
+ Multicaster::MulticastBloomFilter bloom(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_BLOOM_FILTER,ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES));
+ SharedPtr<Peer> propPeers[ZT_MULTICAST_PROPAGATION_BREADTH];
+ unsigned int np = 0;
+
+ if (_r->topology->amSupernode()) {
+ /* Supernodes behave differently here from ordinary nodes, as their
+ * role in the network is to bridge gaps between unconnected islands
+ * in a multicast propagation graph. Instead of using the ordinary
+ * multicast peer picker, supernodes propagate to random unvisited
+ * peers. They will also repeatedly propagate duplicate multicasts to
+ * new peers, while regular nodes simply discard them. This allows
+ * such gaps to be bridged more than once by ping-ponging off the
+ * same supernode -- a simple way to implement this without requiring
+ * that supernodes maintain a lot of state at the cost of a small
+ * amount of bandwidth. */
+ np = _r->multicaster->pickRandomPropagationPeers(
+ *(_r->prng),
+ *(_r->topology),
+ network->id(),
+ mg,
+ originalSubmitterAddress,
+ upstream,
+ bloom,
+ ZT_MULTICAST_PROPAGATION_BREADTH,
+ propPeers,
+ now);
+ } else if (isDuplicate) {
+ TRACE("dropped MULTICAST_FRAME from original submitter %s, received from %s(%s): duplicate",originalSubmitterAddress.toString().c_str(),source().toString().c_str(),_remoteAddress.toString().c_str());
+ return true;
} else {
- TRACE("dropped duplicate MULTICAST_FRAME from %s(%s)",source().toString().c_str(),_remoteAddress.toString().c_str());
+ /* Regular peers only propagate non-duplicate packets, and do so
+ * according to ordinary propagation priority rules. */
+ np = _r->multicaster->pickSocialPropagationPeers(
+ *(_r->prng),
+ *(_r->topology),
+ network->id(),
+ mg,
+ originalSubmitterAddress,
+ upstream,
+ bloom,
+ ZT_MULTICAST_PROPAGATION_BREADTH,
+ propPeers,
+ now);
+ }
+
+ /* Re-use *this* packet to repeat it to our propagation
+ * recipients, which invalidates its current contents and
+ * state. */
+
+ if (np) {
+ setSource(_r->identity.address());
+ (*this)[ZT_PROTO_VERB_MULTICAST_FRAME_IDX_HOP_COUNT] = hops;
+ memcpy(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_BLOOM_FILTER,ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES),bloom.data(),ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES);
+ compress();
+ for(unsigned int i=0;i<np;++i) {
+ newInitializationVector();
+ setDestination(propPeers[i]->address());
+ _r->sw->send(*this,true);
+ }
}
+
+ /* Just to be safe, return true here to terminate processing as we
+ * have thoroughly destroyed our state by doing the above. */
+ return true;
} else {
TRACE("dropped MULTICAST_FRAME from %s(%s): network %.16llx unknown or sender not allowed",source().toString().c_str(),_remoteAddress.toString().c_str(),(unsigned long long)network->id());
}
diff --git a/node/Switch.cpp b/node/Switch.cpp
index 04984f48..aed37307 100644
--- a/node/Switch.cpp
+++ b/node/Switch.cpp
@@ -108,7 +108,7 @@ void Switch::onLocalEthernet(const SharedPtr<Network> &network,const MAC &from,c
Multicaster::MulticastBloomFilter bloom;
SharedPtr<Peer> propPeers[ZT_MULTICAST_PROPAGATION_BREADTH];
- unsigned int np = _r->multicaster->pickNextPropagationPeers(
+ unsigned int np = _r->multicaster->pickSocialPropagationPeers(
*(_r->prng),
*(_r->topology),
network->id(),