summaryrefslogtreecommitdiff
path: root/node
diff options
context:
space:
mode:
authorAdam Ierymenko <adam.ierymenko@gmail.com>2013-08-08 17:20:35 -0400
committerAdam Ierymenko <adam.ierymenko@gmail.com>2013-08-08 17:20:35 -0400
commit3af55f4423ab527a7366a56d552a4641560bc6f2 (patch)
tree1dfa888032963a036575656832099d630eb33a75 /node
parent95c0790a88711ba1c3821df200e10c6841c3a0a9 (diff)
downloadinfinitytier-3af55f4423ab527a7366a56d552a4641560bc6f2.tar.gz
infinitytier-3af55f4423ab527a7366a56d552a4641560bc6f2.zip
Add RateLimiter for rate limiting multicast, not tested yet.
Diffstat (limited to 'node')
-rw-r--r--node/Constants.hpp15
-rw-r--r--node/Multicaster.hpp4
-rw-r--r--node/Network.hpp23
-rw-r--r--node/PacketDecoder.cpp180
-rw-r--r--node/RateLimiter.hpp125
-rw-r--r--node/Utils.hpp11
6 files changed, 268 insertions, 90 deletions
diff --git a/node/Constants.hpp b/node/Constants.hpp
index 49322030..386a1508 100644
--- a/node/Constants.hpp
+++ b/node/Constants.hpp
@@ -267,6 +267,21 @@ error_no_ZT_ARCH_defined;
#define ZT_MULTICAST_LOCAL_POLL_PERIOD 10000
/**
+ * Default bytes per second limit for multicasts per peer on a network
+ */
+#define ZT_MULTICAST_DEFAULT_BYTES_PER_SECOND 50.0
+
+/**
+ * Default balance preload for multicast rate limiters on a network
+ */
+#define ZT_MULTICAST_DEFAULT_RATE_PRELOAD 25.0
+
+/**
+ * Absolute maximum balance for multicast rate limiters
+ */
+#define ZT_MULTICAST_DEFAULT_RATE_MAX 75.0
+
+/**
* Delay between scans of the topology active peer DB for peers that need ping
*/
#define ZT_PING_CHECK_DELAY 7000
diff --git a/node/Multicaster.hpp b/node/Multicaster.hpp
index 229b3c06..9c0795bd 100644
--- a/node/Multicaster.hpp
+++ b/node/Multicaster.hpp
@@ -63,6 +63,9 @@ namespace ZeroTier {
* This is written as a generic class so that it can be mocked and tested
* in simulation. It also always takes 'now' as an argument, permitting
* running in simulated time.
+ *
+ * This does not handle network permission or rate limiting, only the
+ * propagation algorithm.
*/
class Multicaster
{
@@ -328,6 +331,7 @@ private:
// Address and time of last LIKE
typedef std::pair<Address,uint64_t> MulticastMembership;
+ // Network : MulticastGroup -> vector<Address : time of last LIKE>
std::map< MulticastChannel,std::vector<MulticastMembership> > _multicastMemberships;
Mutex _multicastMemberships_m;
};
diff --git a/node/Network.hpp b/node/Network.hpp
index b42e09c1..7945569c 100644
--- a/node/Network.hpp
+++ b/node/Network.hpp
@@ -47,6 +47,7 @@
#include "Dictionary.hpp"
#include "Identity.hpp"
#include "InetAddress.hpp"
+#include "RateLimiter.hpp"
namespace ZeroTier {
@@ -426,6 +427,25 @@ public:
*/
Status status() const;
+ /**
+ * Invoke multicast rate limiter gate for a given address
+ *
+ * @param addr Address to check
+ * @param bytes Bytes address wishes to send us / propagate
+ * @return True if allowed, false if overshot rate limit
+ */
+ inline bool multicastRateGate(const Address &addr,unsigned int bytes)
+ {
+ Mutex::Lock _l(_lock);
+ std::map<Address,RateLimiter>::iterator rl(_multicastRateLimiters.find(addr));
+ if (rl == _multicastRateLimiters.end()) {
+ RateLimiter &newrl = _multicastRateLimiters[addr];
+ newrl.init(ZT_MULTICAST_DEFAULT_BYTES_PER_SECOND,ZT_MULTICAST_DEFAULT_RATE_PRELOAD,ZT_MULTICAST_DEFAULT_RATE_MAX);
+ return newrl.gate((double)bytes);
+ }
+ return rl->second.gate((double)bytes);
+ }
+
private:
static void _CBhandleTapData(void *arg,const MAC &from,const MAC &to,unsigned int etherType,const Buffer<4096> &data);
void _restoreState();
@@ -439,6 +459,9 @@ private:
// Membership certificates supplied by peers
std::map<Address,Certificate> _membershipCertificates;
+ // Rate limiters for each multicasting peer
+ std::map<Address,RateLimiter> _multicastRateLimiters;
+
// Configuration from network master node
Config _configuration;
Certificate _myCertificate;
diff --git a/node/PacketDecoder.cpp b/node/PacketDecoder.cpp
index e09c5894..1481711b 100644
--- a/node/PacketDecoder.cpp
+++ b/node/PacketDecoder.cpp
@@ -494,106 +494,106 @@ bool PacketDecoder::_doMULTICAST_FRAME(const RuntimeEnvironment *_r,const Shared
{
try {
SharedPtr<Network> network(_r->nc->network(at<uint64_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_NETWORK_ID)));
- if (network) {
- if (network->isAllowed(source())) {
- if (size() > ZT_PROTO_VERB_MULTICAST_FRAME_IDX_PAYLOAD) {
-
- Address originalSubmitterAddress(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SUBMITTER_ADDRESS,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH);
- MAC fromMac(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SOURCE_MAC,6));
- MulticastGroup mg(MAC(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_DESTINATION_MAC,6)),at<uint32_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_ADI));
- unsigned int hops = (*this)[ZT_PROTO_VERB_MULTICAST_FRAME_IDX_HOP_COUNT];
- unsigned int etherType = at<uint16_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_ETHERTYPE);
- unsigned int datalen = at<uint16_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_PAYLOAD_LENGTH);
- unsigned int signaturelen = at<uint16_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SIGNATURE_LENGTH);
- unsigned char *dataAndSignature = field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_PAYLOAD,datalen + signaturelen);
-
- uint64_t mccrc = Multicaster::computeMulticastDedupCrc(network->id(),fromMac,mg,etherType,dataAndSignature,datalen);
- uint64_t now = Utils::now();
- bool isDuplicate = _r->multicaster->checkDuplicate(mccrc,now);
-
- if (originalSubmitterAddress == _r->identity.address()) {
- // Technically should not happen, since the original submitter is
- // excluded from consideration as a propagation recipient.
- TRACE("dropped boomerang MULTICAST_FRAME received from %s(%s)",source().toString().c_str(),_remoteAddress.toString().c_str());
- } else if ((!isDuplicate)||(_r->topology->amSupernode())) {
- //
- // If I am a supernode, I will repeatedly propagate duplicates. That's
- // because supernodes are used to bridge sparse multicast groups. Non-
- // supernodes will ignore duplicates completely.
- //
- // TODO: supernodes should keep a local bloom filter too and OR it with
- // the bloom from the packet in order to pick different recipients each
- // time a multicast returns to them for repropagation.
- //
-
- SharedPtr<Peer> originalSubmitter(_r->topology->getPeer(originalSubmitterAddress));
- if (!originalSubmitter) {
- TRACE("requesting WHOIS on original multicast frame submitter %s",originalSubmitterAddress.toString().c_str());
- _r->sw->requestWhois(originalSubmitterAddress);
- _step = DECODE_WAITING_FOR_MULTICAST_FRAME_ORIGINAL_SENDER_LOOKUP;
- return false; // try again if/when we get OK(WHOIS)
- } else if (Multicaster::verifyMulticastPacket(originalSubmitter->identity(),network->id(),fromMac,mg,etherType,dataAndSignature,datalen,dataAndSignature + datalen,signaturelen)) {
- _r->multicaster->addToDedupHistory(mccrc,now);
-
- // Even if we are a supernode, we still don't repeatedly inject
- // duplicates into our own tap.
- if (!isDuplicate)
- network->tap().put(fromMac,mg.mac(),etherType,dataAndSignature,datalen);
-
- if (++hops < ZT_MULTICAST_PROPAGATION_DEPTH) {
- Address upstream(source()); // save this since we mangle it
-
- Multicaster::MulticastBloomFilter bloom(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_BLOOM_FILTER,ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES));
- SharedPtr<Peer> propPeers[ZT_MULTICAST_PROPAGATION_BREADTH];
- unsigned int np = _r->multicaster->pickNextPropagationPeers(
- *(_r->prng),
- *(_r->topology),
- network->id(),
- mg,
- originalSubmitterAddress,
- upstream,
- bloom,
- ZT_MULTICAST_PROPAGATION_BREADTH,
- propPeers,
- now);
-
- // In a bit of a hack, we re-use this packet to repeat it
- // to our multicast propagation recipients. Afterwords we
- // return true just to be sure this is the end of this
- // packet's life cycle, since it is now mangled.
-
- setSource(_r->identity.address());
- (*this)[ZT_PROTO_VERB_MULTICAST_FRAME_IDX_HOP_COUNT] = hops;
- memcpy(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_BLOOM_FILTER,ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES),bloom.data(),ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES);
- compress();
-
- for(unsigned int i=0;i<np;++i) {
- //TRACE("propagating multicast from original node %s: %s -> %s",originalSubmitterAddress.toString().c_str(),upstream.toString().c_str(),propPeers[i]->address().toString().c_str());
- // Re-use this packet to re-send multicast frame to everyone
- // downstream from us.
- newInitializationVector();
- setDestination(propPeers[i]->address());
- _r->sw->send(*this,true);
- }
-
- return true;
- } else {
- //TRACE("terminating MULTICAST_FRAME propagation from %s(%s): max depth reached",source().toString().c_str(),_remoteAddress.toString().c_str());
+ if ((network)&&(network->isAllowed(source()))) {
+ Address originalSubmitterAddress(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SUBMITTER_ADDRESS,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH);
+ MAC fromMac(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SOURCE_MAC,6));
+ MulticastGroup mg(MAC(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_DESTINATION_MAC,6)),at<uint32_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_ADI));
+ unsigned int hops = (*this)[ZT_PROTO_VERB_MULTICAST_FRAME_IDX_HOP_COUNT];
+ unsigned int etherType = at<uint16_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_ETHERTYPE);
+ unsigned int datalen = at<uint16_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_PAYLOAD_LENGTH);
+ unsigned int signaturelen = at<uint16_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SIGNATURE_LENGTH);
+ unsigned char *dataAndSignature = field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_PAYLOAD,datalen + signaturelen);
+
+ uint64_t mccrc = Multicaster::computeMulticastDedupCrc(network->id(),fromMac,mg,etherType,dataAndSignature,datalen);
+ uint64_t now = Utils::now();
+ bool isDuplicate = _r->multicaster->checkDuplicate(mccrc,now);
+
+ if (originalSubmitterAddress == _r->identity.address()) {
+ // Technically should not happen, since the original submitter is
+ // excluded from consideration as a propagation recipient.
+ TRACE("dropped boomerang MULTICAST_FRAME received from %s(%s)",source().toString().c_str(),_remoteAddress.toString().c_str());
+ } else if ((!isDuplicate)||(_r->topology->amSupernode())) {
+ //
+ // If I am a supernode, I will repeatedly propagate duplicates. That's
+ // because supernodes are used to bridge sparse multicast groups. Non-
+ // supernodes will ignore duplicates completely.
+ //
+ // TODO: supernodes should keep a local bloom filter too and OR it with
+ // the bloom from the packet in order to pick different recipients each
+ // time a multicast returns to them for repropagation.
+ //
+
+ SharedPtr<Peer> originalSubmitter(_r->topology->getPeer(originalSubmitterAddress));
+ if (!originalSubmitter) {
+ TRACE("requesting WHOIS on original multicast frame submitter %s",originalSubmitterAddress.toString().c_str());
+ _r->sw->requestWhois(originalSubmitterAddress);
+ _step = DECODE_WAITING_FOR_MULTICAST_FRAME_ORIGINAL_SENDER_LOOKUP;
+ return false; // try again if/when we get OK(WHOIS)
+ } else if (Multicaster::verifyMulticastPacket(originalSubmitter->identity(),network->id(),fromMac,mg,etherType,dataAndSignature,datalen,dataAndSignature + datalen,signaturelen)) {
+ // In checking the multicast rate, we don't re-check if this is
+ // a duplicate. That's because if isDuplicate is true it means
+ // we're a supernode and it's a second pass relay.
+ if ((isDuplicate)||(network->multicastRateGate(originalSubmitter->address(),datalen))) {
+ _r->multicaster->addToDedupHistory(mccrc,now);
+
+ // Even if we are a supernode, we still don't repeatedly inject
+ // duplicates into our own tap.
+ if (!isDuplicate)
+ network->tap().put(fromMac,mg.mac(),etherType,dataAndSignature,datalen);
+
+ if (++hops < ZT_MULTICAST_PROPAGATION_DEPTH) {
+ Address upstream(source()); // save this since we mangle it
+
+ Multicaster::MulticastBloomFilter bloom(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_BLOOM_FILTER,ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES));
+ SharedPtr<Peer> propPeers[ZT_MULTICAST_PROPAGATION_BREADTH];
+ unsigned int np = _r->multicaster->pickNextPropagationPeers(
+ *(_r->prng),
+ *(_r->topology),
+ network->id(),
+ mg,
+ originalSubmitterAddress,
+ upstream,
+ bloom,
+ ZT_MULTICAST_PROPAGATION_BREADTH,
+ propPeers,
+ now);
+
+ // In a bit of a hack, we re-use this packet to repeat it
+ // to our multicast propagation recipients. Afterwords we
+ // return true just to be sure this is the end of this
+ // packet's life cycle, since it is now mangled.
+
+ setSource(_r->identity.address());
+ (*this)[ZT_PROTO_VERB_MULTICAST_FRAME_IDX_HOP_COUNT] = hops;
+ memcpy(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_BLOOM_FILTER,ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES),bloom.data(),ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES);
+ compress();
+
+ for(unsigned int i=0;i<np;++i) {
+ //TRACE("propagating multicast from original node %s: %s -> %s",originalSubmitterAddress.toString().c_str(),upstream.toString().c_str(),propPeers[i]->address().toString().c_str());
+ // Re-use this packet to re-send multicast frame to everyone
+ // downstream from us.
+ newInitializationVector();
+ setDestination(propPeers[i]->address());
+ _r->sw->send(*this,true);
}
+
+ // Return here just to be safe, since this packet's state is no
+ // longer valid.
+ return true;
} else {
- LOG("rejected MULTICAST_FRAME from %s(%s) due to failed signature check (falsely claims origin %s)",source().toString().c_str(),_remoteAddress.toString().c_str(),originalSubmitterAddress.toString().c_str());
+ //TRACE("terminating MULTICAST_FRAME propagation from %s(%s): max depth reached",source().toString().c_str(),_remoteAddress.toString().c_str());
}
} else {
- TRACE("dropped redundant MULTICAST_FRAME from %s(%s)",source().toString().c_str(),_remoteAddress.toString().c_str());
+ LOG("dropped MULTICAST_FRAME from original sender %s: rate limit overrun",originalSubmitter->address().toString().c_str());
}
} else {
- TRACE("dropped MULTICAST_FRAME from %s(%s): invalid short packet",source().toString().c_str(),_remoteAddress.toString().c_str());
+ TRACE("rejected MULTICAST_FRAME forwarded by %s(%s): failed signature check (falsely claims origin %s)",source().toString().c_str(),_remoteAddress.toString().c_str(),originalSubmitterAddress.toString().c_str());
}
} else {
- TRACE("dropped MULTICAST_FRAME from %s(%s): not a member of closed network %llu",source().toString().c_str(),_remoteAddress.toString().c_str(),network->id());
+ TRACE("dropped duplicate MULTICAST_FRAME from %s(%s)",source().toString().c_str(),_remoteAddress.toString().c_str());
}
} else {
- TRACE("dropped MULTICAST_FRAME from %s(%s): network %llu unknown or we are not a member",source().toString().c_str(),_remoteAddress.toString().c_str(),at<uint64_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_NETWORK_ID));
+ TRACE("dropped MULTICAST_FRAME from %s(%s): network %.16llx unknown or sender not allowed",source().toString().c_str(),_remoteAddress.toString().c_str(),(unsigned long long)network->id());
}
} catch (std::exception &ex) {
TRACE("dropped MULTICAST_FRAME from %s(%s): unexpected exception: %s",source().toString().c_str(),_remoteAddress.toString().c_str(),ex.what());
diff --git a/node/RateLimiter.hpp b/node/RateLimiter.hpp
new file mode 100644
index 00000000..7f7dfbdb
--- /dev/null
+++ b/node/RateLimiter.hpp
@@ -0,0 +1,125 @@
+/*
+ * ZeroTier One - Global Peer to Peer Ethernet
+ * Copyright (C) 2012-2013 ZeroTier Networks LLC
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * --
+ *
+ * ZeroTier may be used and distributed under the terms of the GPLv3, which
+ * are available at: http://www.gnu.org/licenses/gpl-3.0.html
+ *
+ * If you would like to embed ZeroTier into a commercial application or
+ * redistribute it in a modified binary form, please contact ZeroTier Networks
+ * LLC. Start here: http://www.zerotier.com/
+ */
+
+#ifndef _ZT_RATELIMITER_HPP
+#define _ZT_RATELIMITER_HPP
+
+#include <math.h>
+#include "Utils.hpp"
+
+namespace ZeroTier {
+
+/**
+ * Burstable rate limiter
+ *
+ * This limits a transfer rate to a maximum bytes per second using an
+ * accounting method based on a balance rather than accumulating an
+ * average rate. The result is a burstable rate limit rather than a
+ * continuous rate limit; the link being limited may use all its balance
+ * at once or slowly over time. Balance constantly replenishes over time
+ * up to a configurable maximum balance.
+ */
+class RateLimiter
+{
+public:
+ /**
+ * Create an uninitialized rate limiter
+ *
+ * init() must be called before this is used.
+ */
+ RateLimiter() throw() {}
+
+ /**
+ * @param bytesPerSecond Bytes per second to permit (average)
+ * @param preload Initial balance to place in account
+ * @param max Maximum balance to permit to ever accrue (max burst)
+ */
+ RateLimiter(double bytesPerSecond,double preload,double max)
+ throw()
+ {
+ init(bytesPerSecond,preload,max);
+ }
+
+ /**
+ * Initialize or re-initialize rate limiter
+ *
+ * @param bytesPerSecond Bytes per second to permit (average)
+ * @param preload Initial balance to place in account
+ * @param max Maximum balance to permit to ever accrue (max burst)
+ */
+ inline void init(double bytesPerSecond,double preload,double max)
+ throw()
+ {
+ _bytesPerSecond = bytesPerSecond;
+ _lastTime = Utils::nowf();
+ _balance = preload;
+ _max = max;
+ }
+
+ /**
+ * Update balance based on current clock
+ *
+ * This can be called at any time to check the current balance without
+ * affecting the behavior of gate().
+ *
+ * @return New balance
+ */
+ inline double updateBalance()
+ throw()
+ {
+ double now = Utils::nowf();
+ double b = _balance = fmin(_max,_balance + (_bytesPerSecond * (now - _lastTime)));
+ _lastTime = now;
+ return b;
+ }
+
+ /**
+ * Test balance and update / deduct if there is enough to transfer 'bytes'
+ *
+ * @param bytes Number of bytes that we wish to transfer
+ * @return True if balance was sufficient (balance is updated), false if not (balance unchanged)
+ */
+ inline bool gate(double bytes)
+ throw()
+ {
+ if (updateBalance() >= bytes) {
+ _balance -= bytes;
+ return true;
+ }
+ return false;
+ }
+
+private:
+ double _bytesPerSecond;
+ double _lastTime;
+ double _balance;
+ double _max;
+};
+
+} // namespace ZeroTier
+
+#endif
diff --git a/node/Utils.hpp b/node/Utils.hpp
index 872201f8..8f5e7006 100644
--- a/node/Utils.hpp
+++ b/node/Utils.hpp
@@ -348,6 +348,17 @@ public:
};
/**
+ * @return Current time in seconds since epoch, to the highest available resolution
+ */
+ static inline double nowf()
+ throw()
+ {
+ struct timeval tv;
+ gettimeofday(&tv,(struct timezone *)0);
+ return ( ((double)tv.tv_sec) + (((double)tv.tv_usec) / 1000000.0) );
+ }
+
+ /**
* Read the full contents of a file into a string buffer
*
* The buffer isn't cleared, so if it already contains data the file's data will