diff options
author | Adam Ierymenko <adam.ierymenko@gmail.com> | 2013-08-08 17:20:35 -0400 |
---|---|---|
committer | Adam Ierymenko <adam.ierymenko@gmail.com> | 2013-08-08 17:20:35 -0400 |
commit | 3af55f4423ab527a7366a56d552a4641560bc6f2 (patch) | |
tree | 1dfa888032963a036575656832099d630eb33a75 /node | |
parent | 95c0790a88711ba1c3821df200e10c6841c3a0a9 (diff) | |
download | infinitytier-3af55f4423ab527a7366a56d552a4641560bc6f2.tar.gz infinitytier-3af55f4423ab527a7366a56d552a4641560bc6f2.zip |
Add RateLimiter for rate limiting multicast, not tested yet.
Diffstat (limited to 'node')
-rw-r--r-- | node/Constants.hpp | 15 | ||||
-rw-r--r-- | node/Multicaster.hpp | 4 | ||||
-rw-r--r-- | node/Network.hpp | 23 | ||||
-rw-r--r-- | node/PacketDecoder.cpp | 180 | ||||
-rw-r--r-- | node/RateLimiter.hpp | 125 | ||||
-rw-r--r-- | node/Utils.hpp | 11 |
6 files changed, 268 insertions, 90 deletions
diff --git a/node/Constants.hpp b/node/Constants.hpp index 49322030..386a1508 100644 --- a/node/Constants.hpp +++ b/node/Constants.hpp @@ -267,6 +267,21 @@ error_no_ZT_ARCH_defined; #define ZT_MULTICAST_LOCAL_POLL_PERIOD 10000 /** + * Default bytes per second limit for multicasts per peer on a network + */ +#define ZT_MULTICAST_DEFAULT_BYTES_PER_SECOND 50.0 + +/** + * Default balance preload for multicast rate limiters on a network + */ +#define ZT_MULTICAST_DEFAULT_RATE_PRELOAD 25.0 + +/** + * Absolute maximum balance for multicast rate limiters + */ +#define ZT_MULTICAST_DEFAULT_RATE_MAX 75.0 + +/** * Delay between scans of the topology active peer DB for peers that need ping */ #define ZT_PING_CHECK_DELAY 7000 diff --git a/node/Multicaster.hpp b/node/Multicaster.hpp index 229b3c06..9c0795bd 100644 --- a/node/Multicaster.hpp +++ b/node/Multicaster.hpp @@ -63,6 +63,9 @@ namespace ZeroTier { * This is written as a generic class so that it can be mocked and tested * in simulation. It also always takes 'now' as an argument, permitting * running in simulated time. + * + * This does not handle network permission or rate limiting, only the + * propagation algorithm. */ class Multicaster { @@ -328,6 +331,7 @@ private: // Address and time of last LIKE typedef std::pair<Address,uint64_t> MulticastMembership; + // Network : MulticastGroup -> vector<Address : time of last LIKE> std::map< MulticastChannel,std::vector<MulticastMembership> > _multicastMemberships; Mutex _multicastMemberships_m; }; diff --git a/node/Network.hpp b/node/Network.hpp index b42e09c1..7945569c 100644 --- a/node/Network.hpp +++ b/node/Network.hpp @@ -47,6 +47,7 @@ #include "Dictionary.hpp" #include "Identity.hpp" #include "InetAddress.hpp" +#include "RateLimiter.hpp" namespace ZeroTier { @@ -426,6 +427,25 @@ public: */ Status status() const; + /** + * Invoke multicast rate limiter gate for a given address + * + * @param addr Address to check + * @param bytes Bytes address wishes to send us / propagate + * @return True if allowed, false if overshot rate limit + */ + inline bool multicastRateGate(const Address &addr,unsigned int bytes) + { + Mutex::Lock _l(_lock); + std::map<Address,RateLimiter>::iterator rl(_multicastRateLimiters.find(addr)); + if (rl == _multicastRateLimiters.end()) { + RateLimiter &newrl = _multicastRateLimiters[addr]; + newrl.init(ZT_MULTICAST_DEFAULT_BYTES_PER_SECOND,ZT_MULTICAST_DEFAULT_RATE_PRELOAD,ZT_MULTICAST_DEFAULT_RATE_MAX); + return newrl.gate((double)bytes); + } + return rl->second.gate((double)bytes); + } + private: static void _CBhandleTapData(void *arg,const MAC &from,const MAC &to,unsigned int etherType,const Buffer<4096> &data); void _restoreState(); @@ -439,6 +459,9 @@ private: // Membership certificates supplied by peers std::map<Address,Certificate> _membershipCertificates; + // Rate limiters for each multicasting peer + std::map<Address,RateLimiter> _multicastRateLimiters; + // Configuration from network master node Config _configuration; Certificate _myCertificate; diff --git a/node/PacketDecoder.cpp b/node/PacketDecoder.cpp index e09c5894..1481711b 100644 --- a/node/PacketDecoder.cpp +++ b/node/PacketDecoder.cpp @@ -494,106 +494,106 @@ bool PacketDecoder::_doMULTICAST_FRAME(const RuntimeEnvironment *_r,const Shared { try { SharedPtr<Network> network(_r->nc->network(at<uint64_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_NETWORK_ID))); - if (network) { - if (network->isAllowed(source())) { - if (size() > ZT_PROTO_VERB_MULTICAST_FRAME_IDX_PAYLOAD) { - - Address originalSubmitterAddress(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SUBMITTER_ADDRESS,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); - MAC fromMac(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SOURCE_MAC,6)); - MulticastGroup mg(MAC(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_DESTINATION_MAC,6)),at<uint32_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_ADI)); - unsigned int hops = (*this)[ZT_PROTO_VERB_MULTICAST_FRAME_IDX_HOP_COUNT]; - unsigned int etherType = at<uint16_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_ETHERTYPE); - unsigned int datalen = at<uint16_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_PAYLOAD_LENGTH); - unsigned int signaturelen = at<uint16_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SIGNATURE_LENGTH); - unsigned char *dataAndSignature = field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_PAYLOAD,datalen + signaturelen); - - uint64_t mccrc = Multicaster::computeMulticastDedupCrc(network->id(),fromMac,mg,etherType,dataAndSignature,datalen); - uint64_t now = Utils::now(); - bool isDuplicate = _r->multicaster->checkDuplicate(mccrc,now); - - if (originalSubmitterAddress == _r->identity.address()) { - // Technically should not happen, since the original submitter is - // excluded from consideration as a propagation recipient. - TRACE("dropped boomerang MULTICAST_FRAME received from %s(%s)",source().toString().c_str(),_remoteAddress.toString().c_str()); - } else if ((!isDuplicate)||(_r->topology->amSupernode())) { - // - // If I am a supernode, I will repeatedly propagate duplicates. That's - // because supernodes are used to bridge sparse multicast groups. Non- - // supernodes will ignore duplicates completely. - // - // TODO: supernodes should keep a local bloom filter too and OR it with - // the bloom from the packet in order to pick different recipients each - // time a multicast returns to them for repropagation. - // - - SharedPtr<Peer> originalSubmitter(_r->topology->getPeer(originalSubmitterAddress)); - if (!originalSubmitter) { - TRACE("requesting WHOIS on original multicast frame submitter %s",originalSubmitterAddress.toString().c_str()); - _r->sw->requestWhois(originalSubmitterAddress); - _step = DECODE_WAITING_FOR_MULTICAST_FRAME_ORIGINAL_SENDER_LOOKUP; - return false; // try again if/when we get OK(WHOIS) - } else if (Multicaster::verifyMulticastPacket(originalSubmitter->identity(),network->id(),fromMac,mg,etherType,dataAndSignature,datalen,dataAndSignature + datalen,signaturelen)) { - _r->multicaster->addToDedupHistory(mccrc,now); - - // Even if we are a supernode, we still don't repeatedly inject - // duplicates into our own tap. - if (!isDuplicate) - network->tap().put(fromMac,mg.mac(),etherType,dataAndSignature,datalen); - - if (++hops < ZT_MULTICAST_PROPAGATION_DEPTH) { - Address upstream(source()); // save this since we mangle it - - Multicaster::MulticastBloomFilter bloom(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_BLOOM_FILTER,ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES)); - SharedPtr<Peer> propPeers[ZT_MULTICAST_PROPAGATION_BREADTH]; - unsigned int np = _r->multicaster->pickNextPropagationPeers( - *(_r->prng), - *(_r->topology), - network->id(), - mg, - originalSubmitterAddress, - upstream, - bloom, - ZT_MULTICAST_PROPAGATION_BREADTH, - propPeers, - now); - - // In a bit of a hack, we re-use this packet to repeat it - // to our multicast propagation recipients. Afterwords we - // return true just to be sure this is the end of this - // packet's life cycle, since it is now mangled. - - setSource(_r->identity.address()); - (*this)[ZT_PROTO_VERB_MULTICAST_FRAME_IDX_HOP_COUNT] = hops; - memcpy(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_BLOOM_FILTER,ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES),bloom.data(),ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES); - compress(); - - for(unsigned int i=0;i<np;++i) { - //TRACE("propagating multicast from original node %s: %s -> %s",originalSubmitterAddress.toString().c_str(),upstream.toString().c_str(),propPeers[i]->address().toString().c_str()); - // Re-use this packet to re-send multicast frame to everyone - // downstream from us. - newInitializationVector(); - setDestination(propPeers[i]->address()); - _r->sw->send(*this,true); - } - - return true; - } else { - //TRACE("terminating MULTICAST_FRAME propagation from %s(%s): max depth reached",source().toString().c_str(),_remoteAddress.toString().c_str()); + if ((network)&&(network->isAllowed(source()))) { + Address originalSubmitterAddress(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SUBMITTER_ADDRESS,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); + MAC fromMac(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SOURCE_MAC,6)); + MulticastGroup mg(MAC(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_DESTINATION_MAC,6)),at<uint32_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_ADI)); + unsigned int hops = (*this)[ZT_PROTO_VERB_MULTICAST_FRAME_IDX_HOP_COUNT]; + unsigned int etherType = at<uint16_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_ETHERTYPE); + unsigned int datalen = at<uint16_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_PAYLOAD_LENGTH); + unsigned int signaturelen = at<uint16_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SIGNATURE_LENGTH); + unsigned char *dataAndSignature = field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_PAYLOAD,datalen + signaturelen); + + uint64_t mccrc = Multicaster::computeMulticastDedupCrc(network->id(),fromMac,mg,etherType,dataAndSignature,datalen); + uint64_t now = Utils::now(); + bool isDuplicate = _r->multicaster->checkDuplicate(mccrc,now); + + if (originalSubmitterAddress == _r->identity.address()) { + // Technically should not happen, since the original submitter is + // excluded from consideration as a propagation recipient. + TRACE("dropped boomerang MULTICAST_FRAME received from %s(%s)",source().toString().c_str(),_remoteAddress.toString().c_str()); + } else if ((!isDuplicate)||(_r->topology->amSupernode())) { + // + // If I am a supernode, I will repeatedly propagate duplicates. That's + // because supernodes are used to bridge sparse multicast groups. Non- + // supernodes will ignore duplicates completely. + // + // TODO: supernodes should keep a local bloom filter too and OR it with + // the bloom from the packet in order to pick different recipients each + // time a multicast returns to them for repropagation. + // + + SharedPtr<Peer> originalSubmitter(_r->topology->getPeer(originalSubmitterAddress)); + if (!originalSubmitter) { + TRACE("requesting WHOIS on original multicast frame submitter %s",originalSubmitterAddress.toString().c_str()); + _r->sw->requestWhois(originalSubmitterAddress); + _step = DECODE_WAITING_FOR_MULTICAST_FRAME_ORIGINAL_SENDER_LOOKUP; + return false; // try again if/when we get OK(WHOIS) + } else if (Multicaster::verifyMulticastPacket(originalSubmitter->identity(),network->id(),fromMac,mg,etherType,dataAndSignature,datalen,dataAndSignature + datalen,signaturelen)) { + // In checking the multicast rate, we don't re-check if this is + // a duplicate. That's because if isDuplicate is true it means + // we're a supernode and it's a second pass relay. + if ((isDuplicate)||(network->multicastRateGate(originalSubmitter->address(),datalen))) { + _r->multicaster->addToDedupHistory(mccrc,now); + + // Even if we are a supernode, we still don't repeatedly inject + // duplicates into our own tap. + if (!isDuplicate) + network->tap().put(fromMac,mg.mac(),etherType,dataAndSignature,datalen); + + if (++hops < ZT_MULTICAST_PROPAGATION_DEPTH) { + Address upstream(source()); // save this since we mangle it + + Multicaster::MulticastBloomFilter bloom(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_BLOOM_FILTER,ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES)); + SharedPtr<Peer> propPeers[ZT_MULTICAST_PROPAGATION_BREADTH]; + unsigned int np = _r->multicaster->pickNextPropagationPeers( + *(_r->prng), + *(_r->topology), + network->id(), + mg, + originalSubmitterAddress, + upstream, + bloom, + ZT_MULTICAST_PROPAGATION_BREADTH, + propPeers, + now); + + // In a bit of a hack, we re-use this packet to repeat it + // to our multicast propagation recipients. Afterwords we + // return true just to be sure this is the end of this + // packet's life cycle, since it is now mangled. + + setSource(_r->identity.address()); + (*this)[ZT_PROTO_VERB_MULTICAST_FRAME_IDX_HOP_COUNT] = hops; + memcpy(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_BLOOM_FILTER,ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES),bloom.data(),ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES); + compress(); + + for(unsigned int i=0;i<np;++i) { + //TRACE("propagating multicast from original node %s: %s -> %s",originalSubmitterAddress.toString().c_str(),upstream.toString().c_str(),propPeers[i]->address().toString().c_str()); + // Re-use this packet to re-send multicast frame to everyone + // downstream from us. + newInitializationVector(); + setDestination(propPeers[i]->address()); + _r->sw->send(*this,true); } + + // Return here just to be safe, since this packet's state is no + // longer valid. + return true; } else { - LOG("rejected MULTICAST_FRAME from %s(%s) due to failed signature check (falsely claims origin %s)",source().toString().c_str(),_remoteAddress.toString().c_str(),originalSubmitterAddress.toString().c_str()); + //TRACE("terminating MULTICAST_FRAME propagation from %s(%s): max depth reached",source().toString().c_str(),_remoteAddress.toString().c_str()); } } else { - TRACE("dropped redundant MULTICAST_FRAME from %s(%s)",source().toString().c_str(),_remoteAddress.toString().c_str()); + LOG("dropped MULTICAST_FRAME from original sender %s: rate limit overrun",originalSubmitter->address().toString().c_str()); } } else { - TRACE("dropped MULTICAST_FRAME from %s(%s): invalid short packet",source().toString().c_str(),_remoteAddress.toString().c_str()); + TRACE("rejected MULTICAST_FRAME forwarded by %s(%s): failed signature check (falsely claims origin %s)",source().toString().c_str(),_remoteAddress.toString().c_str(),originalSubmitterAddress.toString().c_str()); } } else { - TRACE("dropped MULTICAST_FRAME from %s(%s): not a member of closed network %llu",source().toString().c_str(),_remoteAddress.toString().c_str(),network->id()); + TRACE("dropped duplicate MULTICAST_FRAME from %s(%s)",source().toString().c_str(),_remoteAddress.toString().c_str()); } } else { - TRACE("dropped MULTICAST_FRAME from %s(%s): network %llu unknown or we are not a member",source().toString().c_str(),_remoteAddress.toString().c_str(),at<uint64_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_NETWORK_ID)); + TRACE("dropped MULTICAST_FRAME from %s(%s): network %.16llx unknown or sender not allowed",source().toString().c_str(),_remoteAddress.toString().c_str(),(unsigned long long)network->id()); } } catch (std::exception &ex) { TRACE("dropped MULTICAST_FRAME from %s(%s): unexpected exception: %s",source().toString().c_str(),_remoteAddress.toString().c_str(),ex.what()); diff --git a/node/RateLimiter.hpp b/node/RateLimiter.hpp new file mode 100644 index 00000000..7f7dfbdb --- /dev/null +++ b/node/RateLimiter.hpp @@ -0,0 +1,125 @@ +/* + * ZeroTier One - Global Peer to Peer Ethernet + * Copyright (C) 2012-2013 ZeroTier Networks LLC + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * -- + * + * ZeroTier may be used and distributed under the terms of the GPLv3, which + * are available at: http://www.gnu.org/licenses/gpl-3.0.html + * + * If you would like to embed ZeroTier into a commercial application or + * redistribute it in a modified binary form, please contact ZeroTier Networks + * LLC. Start here: http://www.zerotier.com/ + */ + +#ifndef _ZT_RATELIMITER_HPP +#define _ZT_RATELIMITER_HPP + +#include <math.h> +#include "Utils.hpp" + +namespace ZeroTier { + +/** + * Burstable rate limiter + * + * This limits a transfer rate to a maximum bytes per second using an + * accounting method based on a balance rather than accumulating an + * average rate. The result is a burstable rate limit rather than a + * continuous rate limit; the link being limited may use all its balance + * at once or slowly over time. Balance constantly replenishes over time + * up to a configurable maximum balance. + */ +class RateLimiter +{ +public: + /** + * Create an uninitialized rate limiter + * + * init() must be called before this is used. + */ + RateLimiter() throw() {} + + /** + * @param bytesPerSecond Bytes per second to permit (average) + * @param preload Initial balance to place in account + * @param max Maximum balance to permit to ever accrue (max burst) + */ + RateLimiter(double bytesPerSecond,double preload,double max) + throw() + { + init(bytesPerSecond,preload,max); + } + + /** + * Initialize or re-initialize rate limiter + * + * @param bytesPerSecond Bytes per second to permit (average) + * @param preload Initial balance to place in account + * @param max Maximum balance to permit to ever accrue (max burst) + */ + inline void init(double bytesPerSecond,double preload,double max) + throw() + { + _bytesPerSecond = bytesPerSecond; + _lastTime = Utils::nowf(); + _balance = preload; + _max = max; + } + + /** + * Update balance based on current clock + * + * This can be called at any time to check the current balance without + * affecting the behavior of gate(). + * + * @return New balance + */ + inline double updateBalance() + throw() + { + double now = Utils::nowf(); + double b = _balance = fmin(_max,_balance + (_bytesPerSecond * (now - _lastTime))); + _lastTime = now; + return b; + } + + /** + * Test balance and update / deduct if there is enough to transfer 'bytes' + * + * @param bytes Number of bytes that we wish to transfer + * @return True if balance was sufficient (balance is updated), false if not (balance unchanged) + */ + inline bool gate(double bytes) + throw() + { + if (updateBalance() >= bytes) { + _balance -= bytes; + return true; + } + return false; + } + +private: + double _bytesPerSecond; + double _lastTime; + double _balance; + double _max; +}; + +} // namespace ZeroTier + +#endif diff --git a/node/Utils.hpp b/node/Utils.hpp index 872201f8..8f5e7006 100644 --- a/node/Utils.hpp +++ b/node/Utils.hpp @@ -348,6 +348,17 @@ public: }; /** + * @return Current time in seconds since epoch, to the highest available resolution + */ + static inline double nowf() + throw() + { + struct timeval tv; + gettimeofday(&tv,(struct timezone *)0); + return ( ((double)tv.tv_sec) + (((double)tv.tv_usec) / 1000000.0) ); + } + + /** * Read the full contents of a file into a string buffer * * The buffer isn't cleared, so if it already contains data the file's data will |