summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdam Ierymenko <adam.ierymenko@gmail.com>2014-10-01 14:05:25 -0700
committerAdam Ierymenko <adam.ierymenko@gmail.com>2014-10-01 14:05:25 -0700
commite1882b614b48bf0c2a68223b3ee2fb338dbcb8f6 (patch)
tree2447fa648e910c6ef9aa8d4b3147606b2bc18e91
parentae082c3cb8a1ba7f8efb95ce690b012ffa7a79cd (diff)
downloadinfinitytier-e1882b614b48bf0c2a68223b3ee2fb338dbcb8f6.tar.gz
infinitytier-e1882b614b48bf0c2a68223b3ee2fb338dbcb8f6.zip
Some cleanup, Multicaster now sends multicasts as it gets additional members.
-rw-r--r--node/Multicaster.cpp47
-rw-r--r--node/Multicaster.hpp16
-rw-r--r--node/Node.cpp6
-rw-r--r--node/OutboundMulticast.cpp16
-rw-r--r--node/OutboundMulticast.hpp7
-rw-r--r--node/Switch.cpp4
-rw-r--r--node/Topology.cpp5
-rw-r--r--node/Topology.hpp2
8 files changed, 76 insertions, 27 deletions
diff --git a/node/Multicaster.cpp b/node/Multicaster.cpp
index fe590fed..a3e600d9 100644
--- a/node/Multicaster.cpp
+++ b/node/Multicaster.cpp
@@ -40,7 +40,8 @@
namespace ZeroTier {
-Multicaster::Multicaster()
+Multicaster::Multicaster(const RuntimeEnvironment *renv) :
+ RR(renv)
{
}
@@ -104,9 +105,9 @@ restart_member_scan:
}
void Multicaster::send(
- const RuntimeEnvironment *RR,
const CertificateOfMembership *com,
unsigned int limit,
+ unsigned int gatherLimit,
uint64_t now,
uint64_t nwid,
const MulticastGroup &mg,
@@ -122,7 +123,19 @@ void Multicaster::send(
// If we already have enough members, just send and we're done -- no need for TX queue
OutboundMulticast out;
- out.init(now,RR->identity.address(),nwid,com,ZT_MULTICAST_DEFAULT_IMPLICIT_GATHER,src,mg,etherType,data,len);
+ out.init(
+ now,
+ RR->identity.address(),
+ nwid,
+ com,
+ limit,
+ gatherLimit,
+ src,
+ mg,
+ etherType,
+ data,
+ len);
+
unsigned int count = 0;
for(std::vector<MulticastGroupMember>::const_reverse_iterator m(gs.members.rbegin());m!=gs.members.rend();++m) {
out.sendOnly(*(RR->sw),m->address); // sendOnly() avoids overhead of creating sent log since we're going to discard this immediately
@@ -134,7 +147,19 @@ void Multicaster::send(
gs.txQueue.push_back(OutboundMulticast());
OutboundMulticast &out = gs.txQueue.back();
- out.init(now,RR->identity.address(),nwid,com,ZT_MULTICAST_DEFAULT_IMPLICIT_GATHER,src,mg,etherType,data,len);
+ out.init(
+ now,
+ RR->identity.address(),
+ nwid,
+ com,
+ limit,
+ gatherLimit,
+ src,
+ mg,
+ etherType,
+ data,
+ len);
+
for(std::vector<MulticastGroupMember>::const_reverse_iterator m(gs.members.rbegin());m!=gs.members.rend();++m)
out.sendAndLog(*(RR->sw),m->address);
@@ -161,13 +186,13 @@ void Multicaster::send(
}
}
-void Multicaster::clean(const RuntimeEnvironment *RR,uint64_t now)
+void Multicaster::clean(uint64_t now)
{
Mutex::Lock _l(_groups_m);
for(std::map< std::pair<uint64_t,MulticastGroup>,MulticastGroupStatus >::iterator mm(_groups.begin());mm!=_groups.end();) {
// Remove expired outgoing multicasts from multicast TX queue
for(std::list<OutboundMulticast>::iterator tx(mm->second.txQueue.begin());tx!=mm->second.txQueue.end();) {
- if (tx->expired(now))
+ if ((tx->expired(now))||(tx->atLimit()))
mm->second.txQueue.erase(tx++);
else ++tx;
}
@@ -218,7 +243,7 @@ void Multicaster::clean(const RuntimeEnvironment *RR,uint64_t now)
}
}
-void Multicaster::_add(uint64_t now,MulticastGroupStatus &gs,const Address &learnedFrom,const Address &member)
+void Multicaster::_add(uint64_t now,uint64_t nwid,MulticastGroupStatus &gs,const Address &learnedFrom,const Address &member)
{
// assumes _groups_m is locked
@@ -236,6 +261,14 @@ void Multicaster::_add(uint64_t now,MulticastGroupStatus &gs,const Address &lear
// be resorted on next clean(). In the future we might want to insert
// this somewhere else but we'll try this for now.
gs.members.push_back(MulticastGroupMember(member,learnedFrom,now));
+
+ // Try to send to any outgoing multicasts that are waiting for more recipients
+ for(std::list<OutboundMulticast>::iterator tx(gs.txQueue.begin());tx!=gs.txQueue.end();) {
+ tx->sendIfNew(*(RR->sw),member);
+ if (tx->atLimit())
+ gs.txQueue.erase(tx++);
+ else ++tx;
+ }
}
} // namespace ZeroTier
diff --git a/node/Multicaster.hpp b/node/Multicaster.hpp
index 5c858093..96207aa0 100644
--- a/node/Multicaster.hpp
+++ b/node/Multicaster.hpp
@@ -42,6 +42,7 @@
#include "OutboundMulticast.hpp"
#include "Utils.hpp"
#include "Mutex.hpp"
+#include "NonCopyable.hpp"
namespace ZeroTier {
@@ -52,7 +53,7 @@ class Packet;
/**
* Database of known multicast peers within a network
*/
-class Multicaster
+class Multicaster : NonCopyable
{
private:
struct MulticastGroupMember
@@ -79,7 +80,7 @@ private:
};
public:
- Multicaster();
+ Multicaster(const RuntimeEnvironment *renv);
~Multicaster();
/**
@@ -94,7 +95,7 @@ public:
inline void subscribe(uint64_t now,uint64_t nwid,const MulticastGroup &mg,const Address &learnedFrom,const Address &member)
{
Mutex::Lock _l(_groups_m);
- _add(now,_groups[std::pair<uint64_t,MulticastGroup>(nwid,mg)],learnedFrom,member);
+ _add(now,nwid,_groups[std::pair<uint64_t,MulticastGroup>(nwid,mg)],learnedFrom,member);
}
/**
@@ -120,10 +121,10 @@ public:
/**
* Send a multicast
*
- * @param RR Runtime environment
* @param nwid Network ID
* @param com Certificate of membership to include or NULL for none
* @param limit Multicast limit
+ * @param gatherLimit Limit to pass for implicit gather with MULTICAST_FRAME
* @param now Current time
* @param mg Multicast group
* @param from Source Ethernet MAC address
@@ -132,9 +133,9 @@ public:
* @param len Length of packet data
*/
void send(
- const RuntimeEnvironment *RR,
const CertificateOfMembership *com,
unsigned int limit,
+ unsigned int gatherLimit,
uint64_t now,
uint64_t nwid,
const MulticastGroup &mg,
@@ -149,11 +150,12 @@ public:
* @param RR Runtime environment
* @param now Current time
*/
- void clean(const RuntimeEnvironment *RR,uint64_t now);
+ void clean(uint64_t now);
private:
- void _add(uint64_t now,MulticastGroupStatus &gs,const Address &learnedFrom,const Address &member);
+ void _add(uint64_t now,uint64_t nwid,MulticastGroupStatus &gs,const Address &learnedFrom,const Address &member);
+ const RuntimeEnvironment *RR;
std::map< std::pair<uint64_t,MulticastGroup>,MulticastGroupStatus > _groups;
Mutex _groups_m;
};
diff --git a/node/Node.cpp b/node/Node.cpp
index 05a490b7..6682ad25 100644
--- a/node/Node.cpp
+++ b/node/Node.cpp
@@ -382,7 +382,7 @@ Node::ReasonForTermination Node::run()
RR->http = new HttpClient();
RR->antiRec = new AntiRecursion();
- RR->mc = new Multicaster();
+ RR->mc = new Multicaster(RR);
RR->sw = new Switch(RR);
RR->sm = new SocketManager(impl->udpPort,impl->tcpPort,&_CBztTraffic,RR);
RR->topology = new Topology(RR,Utils::fileExists((RR->homePath + ZT_PATH_SEPARATOR_S + "iddb.d").c_str()));
@@ -605,8 +605,8 @@ Node::ReasonForTermination Node::run()
// Do periodic tasks in submodules.
if ((now - lastClean) >= ZT_DB_CLEAN_PERIOD) {
lastClean = now;
- RR->topology->clean();
- RR->mc->clean(RR,now);
+ RR->topology->clean(now);
+ RR->mc->clean(now);
RR->nc->clean();
if (RR->updater)
RR->updater->checkIfMaxIntervalExceeded(now);
diff --git a/node/OutboundMulticast.cpp b/node/OutboundMulticast.cpp
index cd11a523..7b2e4386 100644
--- a/node/OutboundMulticast.cpp
+++ b/node/OutboundMulticast.cpp
@@ -32,12 +32,24 @@
namespace ZeroTier {
-void OutboundMulticast::init(uint64_t timestamp,const Address &self,uint64_t nwid,const CertificateOfMembership *com,unsigned int gatherLimit,const MAC &src,const MulticastGroup &dest,unsigned int etherType,const void *payload,unsigned int len)
+void OutboundMulticast::init(
+ uint64_t timestamp,
+ const Address &self,
+ uint64_t nwid,
+ const CertificateOfMembership *com,
+ unsigned int limit,
+ unsigned int gatherLimit,
+ const MAC &src,
+ const MulticastGroup &dest,
+ unsigned int etherType,
+ const void *payload,
+ unsigned int len)
{
_timestamp = timestamp;
_nwid = nwid;
_source = src;
_destination = dest;
+ _limit = limit;
_etherType = etherType;
_packet.setSource(self);
@@ -46,7 +58,7 @@ void OutboundMulticast::init(uint64_t timestamp,const Address &self,uint64_t nwi
self.appendTo(_packet);
_packet.append((uint64_t)nwid);
_packet.append((uint8_t)((com) ? 0x01 : 0x00));
- _packet.append((uint32_t)gatherLimit); // gather limit -- set before send, start with 0
+ _packet.append((uint32_t)gatherLimit);
if (com) com->serialize(_packet);
_packet.append((uint32_t)dest.adi());
dest.mac().appendTo(_packet);
diff --git a/node/OutboundMulticast.hpp b/node/OutboundMulticast.hpp
index 8d717fc1..548171ab 100644
--- a/node/OutboundMulticast.hpp
+++ b/node/OutboundMulticast.hpp
@@ -66,6 +66,7 @@ public:
* @param self My ZeroTier address
* @param nwid Network ID
* @param com Certificate of membership to attach or NULL to omit
+ * @param limit Multicast limit for desired number of packets to send
* @param gatherLimit Number to lazily/implicitly gather with this frame or 0 for none
* @param src Source MAC address of frame
* @param dest Destination multicast group (MAC + ADI)
@@ -79,6 +80,7 @@ public:
const Address &self,
uint64_t nwid,
const CertificateOfMembership *com,
+ unsigned int limit,
unsigned int gatherLimit,
const MAC &src,
const MulticastGroup &dest,
@@ -98,9 +100,9 @@ public:
inline bool expired(uint64_t now) const throw() { return ((now - _timestamp) >= ZT_MULTICAST_TRANSMIT_TIMEOUT); }
/**
- * @return Number of unique recipients to which this packet has already been sent
+ * @return True if this outbound multicast has been sent to enough peers
*/
- inline unsigned int sentToCount() const throw() { return (unsigned int)_alreadySentTo.size(); }
+ inline bool atLimit() const throw() { return (_alreadySentTo.size() > _limit); }
/**
* Just send without checking log
@@ -144,6 +146,7 @@ private:
uint64_t _nwid;
MAC _source;
MulticastGroup _destination;
+ unsigned int _limit;
unsigned int _etherType;
Packet _packet; // packet contains basic structure of MULTICAST_FRAME and payload, is re-used with new IV and addressing each time
std::vector<Address> _alreadySentTo;
diff --git a/node/Switch.cpp b/node/Switch.cpp
index 50e31fdc..c254c762 100644
--- a/node/Switch.cpp
+++ b/node/Switch.cpp
@@ -151,9 +151,9 @@ void Switch::onLocalEthernet(const SharedPtr<Network> &network,const MAC &from,c
TRACE("%s: MULTICAST %s -> %s %s %d",network->tapDeviceName().c_str(),from.toString().c_str(),mg.toString().c_str(),etherTypeName(etherType),(int)data.size());
RR->mc->send(
- RR,
((!nconf->isPublic())&&(nconf->com())) ? &(nconf->com()) : (const CertificateOfMembership *)0,
- network->wantMulticastGroup(mg) ? nconf->multicastLimit() : 0,
+ nconf->multicastLimit(),
+ network->wantMulticastGroup(mg) ? ZT_MULTICAST_DEFAULT_IMPLICIT_GATHER : 0,
now,
network->id(),
mg,
diff --git a/node/Topology.cpp b/node/Topology.cpp
index 8cfe571d..86c1befb 100644
--- a/node/Topology.cpp
+++ b/node/Topology.cpp
@@ -49,7 +49,7 @@ Topology::Topology(const RuntimeEnvironment *renv,bool enablePermanentIdCaching)
Topology::~Topology()
{
- clean();
+ clean(Utils::now());
_dumpPeers();
}
@@ -256,9 +256,8 @@ keep_searching_for_supernodes:
return bestSupernode;
}
-void Topology::clean()
+void Topology::clean(uint64_t now)
{
- uint64_t now = Utils::now();
Mutex::Lock _l(_activePeers_m);
Mutex::Lock _l2(_supernodes_m);
for(std::map< Address,SharedPtr<Peer> >::iterator p(_activePeers.begin());p!=_activePeers.end();) {
diff --git a/node/Topology.hpp b/node/Topology.hpp
index 45b0a693..8ab10074 100644
--- a/node/Topology.hpp
+++ b/node/Topology.hpp
@@ -186,7 +186,7 @@ public:
/**
* Clean and flush database
*/
- void clean();
+ void clean(uint64_t now);
/**
* Apply a function or function object to all peers