summaryrefslogtreecommitdiff
path: root/node/Multicaster.cpp
diff options
context:
space:
mode:
authorAdam Ierymenko <adam.ierymenko@gmail.com>2014-09-30 16:28:25 -0700
committerAdam Ierymenko <adam.ierymenko@gmail.com>2014-09-30 16:28:25 -0700
commit2659427864aee89977a58440705f7069c0e6c639 (patch)
tree21cd30ae78ed7b1df414f877d55a9243cfa342e8 /node/Multicaster.cpp
parent8607aa7c3c0a24c3161b605e5195bfd5a7ad1258 (diff)
downloadinfinitytier-2659427864aee89977a58440705f7069c0e6c639.tar.gz
infinitytier-2659427864aee89977a58440705f7069c0e6c639.zip
Multicaster needs to be global, not per-network, and a bunch of other stuff.
Diffstat (limited to 'node/Multicaster.cpp')
-rw-r--r--node/Multicaster.cpp78
1 files changed, 72 insertions, 6 deletions
diff --git a/node/Multicaster.cpp b/node/Multicaster.cpp
index 9cd8f8bf..fe590fed 100644
--- a/node/Multicaster.cpp
+++ b/node/Multicaster.cpp
@@ -34,6 +34,7 @@
#include "Switch.hpp"
#include "Packet.hpp"
#include "Peer.hpp"
+#include "CMWC4096.hpp"
#include "CertificateOfMembership.hpp"
#include "RuntimeEnvironment.hpp"
@@ -47,10 +48,75 @@ Multicaster::~Multicaster()
{
}
-void Multicaster::send(const RuntimeEnvironment *RR,uint64_t nwid,const CertificateOfMembership *com,unsigned int limit,uint64_t now,const MulticastGroup &mg,const MAC &src,unsigned int etherType,const void *data,unsigned int len)
+unsigned int Multicaster::gather(const RuntimeEnvironment *RR,uint64_t nwid,MulticastGroup &mg,Packet &appendTo,unsigned int limit) const
+{
+ unsigned char *p;
+ unsigned int n = 0,i,rptr;
+ uint64_t a,done[(ZT_PROTO_MAX_PACKET_LENGTH / 5) + 1];
+
+ Mutex::Lock _l(_groups_m);
+
+ std::map< std::pair<uint64_t,MulticastGroup>,MulticastGroupStatus >::const_iterator gs(_groups.find(std::pair<uint64_t,MulticastGroup>(nwid,mg)));
+ if ((gs == _groups.end())||(gs->second.members.empty())) {
+ appendTo.append((uint32_t)0);
+ appendTo.append((uint16_t)0);
+ return 0;
+ }
+
+ if (limit > gs->second.members.size())
+ limit = (unsigned int)gs->second.members.size();
+ if (limit > 0xffff) // sanity check -- this won't fit in a packet anyway
+ limit = 0xffff;
+
+ appendTo.append((uint32_t)gs->second.members.size());
+ unsigned int nAt = appendTo.size();
+ appendTo.append((uint16_t)0); // set to n later
+
+ while ((n < limit)&&((appendTo.size() + ZT_ADDRESS_LENGTH) <= ZT_PROTO_MAX_PACKET_LENGTH)) {
+ // Pick a member at random -- if we've already picked it,
+ // keep circling the buffer until we find one we haven't.
+ // This won't loop forever since limit <= members.size().
+ rptr = (unsigned int)RR->prng->next32();
+restart_member_scan:
+ a = gs->second.members[rptr % (unsigned int)gs->second.members.size()].address.toInt();
+ for(i=0;i<n;++i) {
+ if (done[i] == a) {
+ ++rptr;
+ goto restart_member_scan;
+ }
+ }
+
+ // Log that we've picked this one
+ done[n++] = a;
+
+ // Append to packet
+ p = (unsigned char *)appendTo.appendField(ZT_ADDRESS_LENGTH);
+ *(p++) = (unsigned char)((a >> 32) & 0xff);
+ *(p++) = (unsigned char)((a >> 24) & 0xff);
+ *(p++) = (unsigned char)((a >> 16) & 0xff);
+ *(p++) = (unsigned char)((a >> 8) & 0xff);
+ *p = (unsigned char)(a & 0xff);
+ }
+
+ appendTo.setAt(nAt,(uint16_t)n);
+
+ return n;
+}
+
+void Multicaster::send(
+ const RuntimeEnvironment *RR,
+ const CertificateOfMembership *com,
+ unsigned int limit,
+ uint64_t now,
+ uint64_t nwid,
+ const MulticastGroup &mg,
+ const MAC &src,
+ unsigned int etherType,
+ const void *data,
+ unsigned int len)
{
Mutex::Lock _l(_groups_m);
- MulticastGroupStatus &gs = _groups[mg];
+ MulticastGroupStatus &gs = _groups[std::pair<uint64_t,MulticastGroup>(nwid,mg)];
if (gs.members.size() >= limit) {
// If we already have enough members, just send and we're done -- no need for TX queue
@@ -95,13 +161,13 @@ void Multicaster::send(const RuntimeEnvironment *RR,uint64_t nwid,const Certific
}
}
-void Multicaster::clean(const RuntimeEnvironment *RR,uint64_t now,unsigned int limit)
+void Multicaster::clean(const RuntimeEnvironment *RR,uint64_t now)
{
Mutex::Lock _l(_groups_m);
- for(std::map< MulticastGroup,MulticastGroupStatus >::iterator mm(_groups.begin());mm!=_groups.end();) {
+ for(std::map< std::pair<uint64_t,MulticastGroup>,MulticastGroupStatus >::iterator mm(_groups.begin());mm!=_groups.end();) {
// Remove expired outgoing multicasts from multicast TX queue
for(std::list<OutboundMulticast>::iterator tx(mm->second.txQueue.begin());tx!=mm->second.txQueue.end();) {
- if ((tx->expired(now))||(tx->sentToCount() >= limit))
+ if (tx->expired(now))
mm->second.txQueue.erase(tx++);
else ++tx;
}
@@ -152,7 +218,7 @@ void Multicaster::clean(const RuntimeEnvironment *RR,uint64_t now,unsigned int l
}
}
-void Multicaster::_add(const RuntimeEnvironment *RR,uint64_t now,MulticastGroupStatus &gs,const Address &learnedFrom,const Address &member)
+void Multicaster::_add(uint64_t now,MulticastGroupStatus &gs,const Address &learnedFrom,const Address &member)
{
// assumes _groups_m is locked