From e1882b614b48bf0c2a68223b3ee2fb338dbcb8f6 Mon Sep 17 00:00:00 2001
From: Adam Ierymenko
Date: Wed, 1 Oct 2014 14:05:25 -0700
Subject: Some cleanup, Multicaster now sends multicasts as it gets additional members.

---
 node/Multicaster.cpp | 47 ++++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 40 insertions(+), 7 deletions(-)

(limited to 'node/Multicaster.cpp')

diff --git a/node/Multicaster.cpp b/node/Multicaster.cpp
index fe590fed..a3e600d9 100644
--- a/node/Multicaster.cpp
+++ b/node/Multicaster.cpp
@@ -40,7 +40,8 @@ namespace ZeroTier {
 
-Multicaster::Multicaster()
+Multicaster::Multicaster(const RuntimeEnvironment *renv) :
+	RR(renv)
 {
 }
 
 Multicaster::~Multicaster()
 {
@@ -104,9 +105,9 @@ restart_member_scan:
 }
 
 void Multicaster::send(
-	const RuntimeEnvironment *RR,
 	const CertificateOfMembership *com,
 	unsigned int limit,
+	unsigned int gatherLimit,
 	uint64_t now,
 	uint64_t nwid,
 	const MulticastGroup &mg,
@@ -122,7 +123,19 @@
 		// If we already have enough members, just send and we're done -- no need for TX queue
 		OutboundMulticast out;
 
-		out.init(now,RR->identity.address(),nwid,com,ZT_MULTICAST_DEFAULT_IMPLICIT_GATHER,src,mg,etherType,data,len);
+		out.init(
+			now,
+			RR->identity.address(),
+			nwid,
+			com,
+			limit,
+			gatherLimit,
+			src,
+			mg,
+			etherType,
+			data,
+			len);
+
 		unsigned int count = 0;
 		for(std::vector<MulticastGroupMember>::const_reverse_iterator m(gs.members.rbegin());m!=gs.members.rend();++m) {
 			out.sendOnly(*(RR->sw),m->address); // sendOnly() avoids overhead of creating sent log since we're going to discard this immediately
@@ -134,7 +147,19 @@
 
 		gs.txQueue.push_back(OutboundMulticast());
 		OutboundMulticast &out = gs.txQueue.back();
 
-		out.init(now,RR->identity.address(),nwid,com,ZT_MULTICAST_DEFAULT_IMPLICIT_GATHER,src,mg,etherType,data,len);
+		out.init(
+			now,
+			RR->identity.address(),
+			nwid,
+			com,
+			limit,
+			gatherLimit,
+			src,
+			mg,
+			etherType,
+			data,
+			len);
+
 		for(std::vector<MulticastGroupMember>::const_reverse_iterator m(gs.members.rbegin());m!=gs.members.rend();++m)
 			out.sendAndLog(*(RR->sw),m->address);
@@ -161,13 +186,13 @@
 	}
 }
 
-void Multicaster::clean(const RuntimeEnvironment *RR,uint64_t now)
+void Multicaster::clean(uint64_t now)
 {
 	Mutex::Lock _l(_groups_m);
 	for(std::map< std::pair<uint64_t,MulticastGroup>,MulticastGroupStatus >::iterator mm(_groups.begin());mm!=_groups.end();) {
 		// Remove expired outgoing multicasts from multicast TX queue
 		for(std::list<OutboundMulticast>::iterator tx(mm->second.txQueue.begin());tx!=mm->second.txQueue.end();) {
-			if (tx->expired(now))
+			if ((tx->expired(now))||(tx->atLimit()))
 				mm->second.txQueue.erase(tx++);
 			else ++tx;
 		}
@@ -218,7 +243,7 @@
 	}
 }
 
-void Multicaster::_add(uint64_t now,MulticastGroupStatus &gs,const Address &learnedFrom,const Address &member)
+void Multicaster::_add(uint64_t now,uint64_t nwid,MulticastGroupStatus &gs,const Address &learnedFrom,const Address &member)
 {
 	// assumes _groups_m is locked
 
@@ -236,6 +261,14 @@
 	// be resorted on next clean(). In the future we might want to insert
 	// this somewhere else but we'll try this for now.
 	gs.members.push_back(MulticastGroupMember(member,learnedFrom,now));
+
+	// Try to send to any outgoing multicasts that are waiting for more recipients
+	for(std::list<OutboundMulticast>::iterator tx(gs.txQueue.begin());tx!=gs.txQueue.end();) {
+		tx->sendIfNew(*(RR->sw),member);
+		if (tx->atLimit())
+			gs.txQueue.erase(tx++);
+		else ++tx;
+	}
 }
 
 } // namespace ZeroTier
-- 
cgit v1.2.3
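
The behavioral change is easier to see outside the diff: a queued outbound multicast now carries an explicit recipient limit and a log of members it has already been sent to, and when Multicaster::_add() learns a new member it feeds that member to every multicast still waiting in txQueue, retiring any entry that reaches its limit. Below is a minimal, self-contained C++ sketch of that pattern. It is not ZeroTier's actual API: PendingMulticast, GroupStatus, and the std::function send hook are hypothetical stand-ins for OutboundMulticast, MulticastGroupStatus, and the RR->sw packet switch.

// Sketch only (assumed names, not ZeroTier's API): flushing a queued
// multicast to members as they are learned.
#include <cstdint>
#include <functional>
#include <list>
#include <set>
#include <vector>

struct PendingMulticast {
	unsigned int limit = 0;             // analogous to the new 'limit' argument to out.init()
	std::set<uint64_t> sentTo;          // send log, as kept by sendAndLog()/sendIfNew()
	std::function<void(uint64_t)> send; // transport hook (stands in for the packet switch)

	// Like OutboundMulticast::sendIfNew(): send only to members not yet sent to.
	void sendIfNew(uint64_t member)
	{
		if (sentTo.insert(member).second)
			send(member);
	}

	// Like OutboundMulticast::atLimit(): true once the recipient cap is reached.
	bool atLimit() const { return sentTo.size() >= limit; }
};

struct GroupStatus {
	std::vector<uint64_t> members;
	std::list<PendingMulticast> txQueue;

	// Mirrors the new tail of Multicaster::_add(): on learning a member, feed
	// every queued multicast and drop any entry that has reached its limit,
	// using the same erase(it++) idiom for std::list as the patch.
	void addMember(uint64_t member)
	{
		members.push_back(member);
		for (std::list<PendingMulticast>::iterator tx(txQueue.begin()); tx != txQueue.end();) {
			tx->sendIfNew(member);
			if (tx->atLimit())
				txQueue.erase(tx++);
			else
				++tx;
		}
	}
};

This also explains the matching change in clean(): a queued multicast that is atLimit() is erased immediately rather than held until it expires, since it can never gain another recipient.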