author    Adam Ierymenko <adam.ierymenko@gmail.com>  2014-11-21 10:50:27 -0800
committer Adam Ierymenko <adam.ierymenko@gmail.com>  2014-11-21 10:50:27 -0800
commit    7619b0ecbdb3485ff5f2531cea4b660e793b5467 (patch)
tree      75f4e0e6b855f53f54bf36b05112e33f071b8d19
parent    0c85b4ef5f406c51454b33ce22f0ce1585dbfb93 (diff)
download  infinitytier-7619b0ecbdb3485ff5f2531cea4b660e793b5467.tar.gz
          infinitytier-7619b0ecbdb3485ff5f2531cea4b660e793b5467.zip
Send multicasts in random order.
This should not affect most users, but on large networks it should make service announcements work a lot better. It is the result of a prolonged discussion with a user about the visibility of game servers on a large network. The old multicast algorithm was de facto randomized by its distributed nature, while the new algorithm is more deterministic. This restores some randomization beyond limit-overflow conditions. It won't affect small networks at all.
-rw-r--r--  node/IncomingPacket.cpp    6
-rw-r--r--  node/Multicaster.cpp     242
-rw-r--r--  node/Multicaster.hpp      22
3 files changed, 127 insertions, 143 deletions
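
The heart of the change is in Multicaster::send() below: before anything is transmitted, member indexes are shuffled with a Fisher-Yates pass so recipients are visited in random order. A minimal standalone sketch of that permutation step, assuming std::mt19937 as a stand-in for the node's internal PRNG (RR->prng):

#include <random>
#include <vector>

// Build an identity index array, then shuffle it top-down (Fisher-Yates),
// mirroring the permutation loop added to Multicaster::send() below.
// std::mt19937 stands in for RR->prng, which is internal to the node.
static std::vector<unsigned long> randomSendOrder(unsigned long memberCount)
{
	std::vector<unsigned long> indexes(memberCount);
	for(unsigned long i=0;i<memberCount;++i)
		indexes[i] = i;
	if (memberCount < 2)
		return indexes; // nothing to shuffle; send() guards with members.empty()
	std::mt19937 prng{std::random_device{}()};
	for(unsigned long i=memberCount-1;i>0;--i) {
		unsigned long j = (unsigned long)(prng() % (i + 1));
		unsigned long tmp = indexes[j];
		indexes[j] = indexes[i];
		indexes[i] = tmp;
	}
	return indexes;
}

send() then walks the shuffled indexes until the recipient limit is reached, so which members fall past the limit varies from send to send instead of always being the same tail of the list.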
diff --git a/node/IncomingPacket.cpp b/node/IncomingPacket.cpp
index efda5370..ca72687e 100644
--- a/node/IncomingPacket.cpp
+++ b/node/IncomingPacket.cpp
@@ -306,7 +306,7 @@ bool IncomingPacket::_doOK(const RuntimeEnvironment *RR,const SharedPtr<Peer> &p
TRACE("%s(%s): OK(MULTICAST_GATHER) %.16llx/%s length %u",source().toString().c_str(),_remoteAddress.toString().c_str(),nwid,mg.toString().c_str(),size());
unsigned int count = at<uint16_t>(ZT_PROTO_VERB_MULTICAST_GATHER__OK__IDX_GATHER_RESULTS + 4);
- RR->mc->addMultiple(Utils::now(),nwid,mg,peer->address(),field(ZT_PROTO_VERB_MULTICAST_GATHER__OK__IDX_GATHER_RESULTS + 6,count * 5),count,at<uint32_t>(ZT_PROTO_VERB_MULTICAST_GATHER__OK__IDX_GATHER_RESULTS));
+ RR->mc->addMultiple(Utils::now(),nwid,mg,field(ZT_PROTO_VERB_MULTICAST_GATHER__OK__IDX_GATHER_RESULTS + 6,count * 5),count,at<uint32_t>(ZT_PROTO_VERB_MULTICAST_GATHER__OK__IDX_GATHER_RESULTS));
} break;
case Packet::VERB_MULTICAST_FRAME: {
@@ -332,7 +332,7 @@ bool IncomingPacket::_doOK(const RuntimeEnvironment *RR,const SharedPtr<Peer> &p
offset += ZT_PROTO_VERB_MULTICAST_FRAME__OK__IDX_COM_AND_GATHER_RESULTS;
unsigned int totalKnown = at<uint32_t>(offset); offset += 4;
unsigned int count = at<uint16_t>(offset); offset += 2;
- RR->mc->addMultiple(Utils::now(),nwid,mg,peer->address(),field(offset,count * 5),count,totalKnown);
+ RR->mc->addMultiple(Utils::now(),nwid,mg,field(offset,count * 5),count,totalKnown);
}
} break;
@@ -655,7 +655,7 @@ bool IncomingPacket::_doMULTICAST_LIKE(const RuntimeEnvironment *RR,const Shared
// Iterate through 18-byte network,MAC,ADI tuples
for(unsigned int ptr=ZT_PACKET_IDX_PAYLOAD;ptr<size();ptr+=18)
- RR->mc->add(now,at<uint64_t>(ptr),MulticastGroup(MAC(field(ptr + 8,6),6),at<uint32_t>(ptr + 14)),Address(),peer->address());
+ RR->mc->add(now,at<uint64_t>(ptr),MulticastGroup(MAC(field(ptr + 8,6),6),at<uint32_t>(ptr + 14)),peer->address());
peer->received(RR,_fromSock,_remoteAddress,hops(),packetId(),Packet::VERB_MULTICAST_LIKE,0,Packet::VERB_NOP,now);
} catch (std::exception &ex) {
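
The MULTICAST_LIKE loop above consumes packed 18-byte records: an 8-byte network ID, a 6-byte MAC, and a 4-byte ADI, all big-endian. A hedged sketch of that wire layout as a standalone decoder (LikeTuple and decodeLikes() are illustrative names, not the node's API; the real code feeds the fields straight into MulticastGroup and Multicaster::add()):

#include <cstdint>
#include <cstring>

// One decoded network,MAC,ADI tuple from a MULTICAST_LIKE payload.
struct LikeTuple { uint64_t nwid; uint8_t mac[6]; uint32_t adi; };

// Walk the payload in 18-byte steps, exactly as the loop above does:
// network ID at +0 (8 bytes), MAC at +8 (6 bytes), ADI at +14 (4 bytes).
static unsigned int decodeLikes(const uint8_t *payload,unsigned int len,LikeTuple *out,unsigned int max)
{
	unsigned int n = 0;
	for(unsigned int ptr=0;((ptr + 18) <= len)&&(n < max);ptr+=18) {
		uint64_t nwid = 0;
		for(int i=0;i<8;++i) nwid = (nwid << 8) | (uint64_t)payload[ptr + i];
		out[n].nwid = nwid;
		std::memcpy(out[n].mac,payload + ptr + 8,6);
		uint32_t adi = 0;
		for(int i=0;i<4;++i) adi = (adi << 8) | (uint32_t)payload[ptr + 14 + i];
		out[n].adi = adi;
		++n;
	}
	return n;
}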
diff --git a/node/Multicaster.cpp b/node/Multicaster.cpp
index aa9fef7b..6c098cc1 100644
--- a/node/Multicaster.cpp
+++ b/node/Multicaster.cpp
@@ -52,18 +52,16 @@ Multicaster::~Multicaster()
{
}
-void Multicaster::addMultiple(uint64_t now,uint64_t nwid,const MulticastGroup &mg,const Address &learnedFrom,const void *addresses,unsigned int count,unsigned int totalKnown)
+void Multicaster::addMultiple(uint64_t now,uint64_t nwid,const MulticastGroup &mg,const void *addresses,unsigned int count,unsigned int totalKnown)
{
const unsigned char *p = (const unsigned char *)addresses;
const unsigned char *e = p + (5 * count);
Mutex::Lock _l(_groups_m);
MulticastGroupStatus &gs = _groups[std::pair<uint64_t,MulticastGroup>(nwid,mg)];
while (p != e) {
- _add(now,nwid,mg,gs,learnedFrom,Address(p,5));
+ _add(now,nwid,mg,gs,Address(p,5));
p += 5;
}
- if (RR->topology->isSupernode(learnedFrom))
- gs.totalKnownMembers = totalKnown;
}
unsigned int Multicaster::gather(const Address &queryingPeer,uint64_t nwid,const MulticastGroup &mg,Packet &appendTo,unsigned int limit) const
@@ -160,123 +158,145 @@ void Multicaster::send(
const void *data,
unsigned int len)
{
+ unsigned long idxbuf[8194];
+ unsigned long *indexes = idxbuf;
+
Mutex::Lock _l(_groups_m);
MulticastGroupStatus &gs = _groups[std::pair<uint64_t,MulticastGroup>(nwid,mg)];
- if (gs.members.size() >= limit) {
- // If we already have enough members, just send and we're done. We can
- // skip the TX queue and skip the overhead of maintaining a send log by
- // using sendOnly().
- OutboundMulticast out;
-
- out.init(
- RR,
- now,
- nwid,
- com,
- limit,
- 0,
- src,
- mg,
- etherType,
- data,
- len);
+ if (!gs.members.empty()) {
+ // Use a stack-allocated buffer unless this multicast group is ridiculously huge
+ if (gs.members.size() > 8194)
+ indexes = new unsigned long[gs.members.size()];
+
+ // Generate a random permutation of member indexes
+ for(unsigned long i=0;i<gs.members.size();++i)
+ indexes[i] = i;
+ for(unsigned long i=gs.members.size()-1;i>0;--i) {
+ unsigned long j = RR->prng->next32() % (i + 1);
+ unsigned long tmp = indexes[j];
+ indexes[j] = indexes[i];
+ indexes[i] = tmp;
+ }
- unsigned int count = 0;
+ if (gs.members.size() >= limit) {
+ // If we already have enough members, just send and we're done. We can
+ // skip the TX queue and skip the overhead of maintaining a send log by
+ // using sendOnly().
+ OutboundMulticast out;
+
+ out.init(
+ RR,
+ now,
+ nwid,
+ com,
+ limit,
+ 2, // we'll still gather a little from peers to keep multicast list fresh
+ src,
+ mg,
+ etherType,
+ data,
+ len);
+
+ unsigned int count = 0;
+
+ for(std::vector<Address>::const_iterator ast(alwaysSendTo.begin());ast!=alwaysSendTo.end();++ast) {
+ { // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
+ SharedPtr<Peer> p(RR->topology->getPeer(*ast));
+ if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1))
+ continue;
+ }
- for(std::vector<Address>::const_iterator ast(alwaysSendTo.begin());ast!=alwaysSendTo.end();++ast) {
- { // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
- SharedPtr<Peer> p(RR->topology->getPeer(*ast));
- if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1))
- continue;
+ out.sendOnly(RR,*ast);
+ if (++count >= limit)
+ break;
}
- out.sendOnly(RR,*ast);
- if (++count >= limit)
- break;
- }
+ unsigned long idx = 0;
+ while (count < limit) {
+ const MulticastGroupMember &m = gs.members[indexes[idx++]];
- if (count < limit) {
- for(std::vector<MulticastGroupMember>::const_reverse_iterator m(gs.members.rbegin());m!=gs.members.rend();++m) {
{ // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
- SharedPtr<Peer> p(RR->topology->getPeer(m->address));
+ SharedPtr<Peer> p(RR->topology->getPeer(m.address));
if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1))
continue;
}
- if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),m->address) == alwaysSendTo.end()) {
- out.sendOnly(RR,m->address);
- if (++count >= limit)
- break;
+ if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),m.address) == alwaysSendTo.end()) {
+ out.sendOnly(RR,m.address);
+ ++count;
}
}
- }
- } else {
- unsigned int gatherLimit = (limit - (unsigned int)gs.members.size()) + 1;
-
- if ((now - gs.lastExplicitGather) >= ZT_MULTICAST_EXPLICIT_GATHER_DELAY) {
- gs.lastExplicitGather = now;
- SharedPtr<Peer> sn(RR->topology->getBestSupernode());
- if (sn) {
- TRACE(">>MC GATHER up to %u in %.16llx/%s",gatherLimit,nwid,mg.toString().c_str());
-
- Packet outp(sn->address(),RR->identity.address(),Packet::VERB_MULTICAST_GATHER);
- outp.append(nwid);
- outp.append((uint8_t)0);
- mg.mac().appendTo(outp);
- outp.append((uint32_t)mg.adi());
- outp.append((uint32_t)gatherLimit); // +1 just means we'll have an extra in the queue if available
- outp.armor(sn->key(),true);
- sn->send(RR,outp.data(),outp.size(),now);
+ } else {
+ unsigned int gatherLimit = (limit - (unsigned int)gs.members.size()) + 1;
+
+ if ((now - gs.lastExplicitGather) >= ZT_MULTICAST_EXPLICIT_GATHER_DELAY) {
+ gs.lastExplicitGather = now;
+ SharedPtr<Peer> sn(RR->topology->getBestSupernode());
+ if (sn) {
+ TRACE(">>MC GATHER up to %u in %.16llx/%s",gatherLimit,nwid,mg.toString().c_str());
+
+ Packet outp(sn->address(),RR->identity.address(),Packet::VERB_MULTICAST_GATHER);
+ outp.append(nwid);
+ outp.append((uint8_t)0);
+ mg.mac().appendTo(outp);
+ outp.append((uint32_t)mg.adi());
+ outp.append((uint32_t)gatherLimit); // +1 just means we'll have an extra in the queue if available
+ outp.armor(sn->key(),true);
+ sn->send(RR,outp.data(),outp.size(),now);
+ }
+ gatherLimit = 0; // don't need to gather from peers this time since we consulted the core
}
- gatherLimit = 0; // implicit not needed
- }
- gs.txQueue.push_back(OutboundMulticast());
- OutboundMulticast &out = gs.txQueue.back();
-
- out.init(
- RR,
- now,
- nwid,
- com,
- limit,
- gatherLimit,
- src,
- mg,
- etherType,
- data,
- len);
-
- unsigned int count = 0;
+ gs.txQueue.push_back(OutboundMulticast());
+ OutboundMulticast &out = gs.txQueue.back();
+
+ out.init(
+ RR,
+ now,
+ nwid,
+ com,
+ limit,
+ gatherLimit,
+ src,
+ mg,
+ etherType,
+ data,
+ len);
+
+ unsigned int count = 0;
+
+ for(std::vector<Address>::const_iterator ast(alwaysSendTo.begin());ast!=alwaysSendTo.end();++ast) {
+ { // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
+ SharedPtr<Peer> p(RR->topology->getPeer(*ast));
+ if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1))
+ continue;
+ }
- for(std::vector<Address>::const_iterator ast(alwaysSendTo.begin());ast!=alwaysSendTo.end();++ast) {
- { // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
- SharedPtr<Peer> p(RR->topology->getPeer(*ast));
- if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1))
- continue;
+ out.sendAndLog(RR,*ast);
+ if (++count >= limit)
+ break;
}
- out.sendAndLog(RR,*ast);
- if (++count >= limit)
- break;
- }
+ unsigned long idx = 0;
+ while ((count < limit)&&(idx < gs.members.size())) {
+ const MulticastGroupMember &m = gs.members[indexes[idx++]];
- if (count < limit) {
- for(std::vector<MulticastGroupMember>::const_reverse_iterator m(gs.members.rbegin());m!=gs.members.rend();++m) {
{ // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
- SharedPtr<Peer> p(RR->topology->getPeer(m->address));
+ SharedPtr<Peer> p(RR->topology->getPeer(m.address));
if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1))
continue;
}
- if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),m->address) == alwaysSendTo.end()) {
- out.sendAndLog(RR,m->address);
- if (++count >= limit)
- break;
+ if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),m.address) == alwaysSendTo.end()) {
+ out.sendAndLog(RR,m.address);
+ ++count;
}
}
}
+
+ if (indexes != idxbuf)
+ delete [] indexes;
}
// DEPRECATED / LEGACY / TODO:
@@ -344,25 +364,6 @@ void Multicaster::clean(uint64_t now)
while (reader != mm->second.members.end()) {
if ((now - reader->timestamp) < ZT_MULTICAST_LIKE_EXPIRE) {
*writer = *reader;
-
- /* We rank in ascending order of most recent relevant activity. For peers we've learned
- * about by direct LIKEs, we do this in order of their own activity. For indirectly
- * acquired peers we do this minus a constant to place these categorically below directly
- * learned peers. For peers with no active Peer record, we use the time we last learned
- * about them minus one day (a large constant) to put these at the bottom of the list.
- * List is sorted in ascending order of rank and multicasts are sent last-to-first. */
- if (writer->learnedFrom != writer->address) {
- SharedPtr<Peer> p(RR->topology->getPeer(writer->learnedFrom));
- if (p)
- writer->rank = (RR->topology->amSupernode() ? p->lastDirectReceive() : p->lastUnicastFrame()) - ZT_MULTICAST_LIKE_EXPIRE;
- else writer->rank = writer->timestamp - (86400000 + ZT_MULTICAST_LIKE_EXPIRE);
- } else {
- SharedPtr<Peer> p(RR->topology->getPeer(writer->address));
- if (p)
- writer->rank = (RR->topology->amSupernode() ? p->lastDirectReceive() : p->lastUnicastFrame());
- else writer->rank = writer->timestamp - 86400000;
- }
-
++writer;
++count;
}
@@ -370,12 +371,9 @@ void Multicaster::clean(uint64_t now)
}
if (count) {
- // There are remaining members, so re-sort them by rank and resize the vector
- std::sort(mm->second.members.begin(),writer); // sorts in ascending order of rank
- mm->second.members.resize(count); // trim off the ones we cut, after writer
+ mm->second.members.resize(count);
++mm;
} else if (mm->second.txQueue.empty()) {
- // There are no remaining members and no pending multicasts, so erase the entry
_groups.erase(mm++);
} else {
mm->second.members.clear();
@@ -384,7 +382,7 @@ void Multicaster::clean(uint64_t now)
}
}
-void Multicaster::_add(uint64_t now,uint64_t nwid,const MulticastGroup &mg,MulticastGroupStatus &gs,const Address &learnedFrom,const Address &member)
+void Multicaster::_add(uint64_t now,uint64_t nwid,const MulticastGroup &mg,MulticastGroupStatus &gs,const Address &member)
{
// assumes _groups_m is locked
@@ -392,20 +390,14 @@ void Multicaster::_add(uint64_t now,uint64_t nwid,const MulticastGroup &mg,Multi
if (member == RR->identity.address())
return;
- // Update timestamp and learnedFrom if existing
for(std::vector<MulticastGroupMember>::iterator m(gs.members.begin());m!=gs.members.end();++m) {
if (m->address == member) {
- if (m->learnedFrom != member) // once we learn it directly, remember this forever
- m->learnedFrom = learnedFrom;
m->timestamp = now;
return;
}
}
- // If not existing, add to end of list (highest priority) -- these will
- // be resorted on next clean(). In the future we might want to insert
- // this somewhere else but we'll try this for now.
- gs.members.push_back(MulticastGroupMember(member,learnedFrom,now));
+ gs.members.push_back(MulticastGroupMember(member,now));
//TRACE("..MC %s joined multicast group %.16llx/%s via %s",member.toString().c_str(),nwid,mg.toString().c_str(),((learnedFrom) ? learnedFrom.toString().c_str() : "(direct)"));
@@ -414,9 +406,9 @@ void Multicaster::_add(uint64_t now,uint64_t nwid,const MulticastGroup &mg,Multi
SharedPtr<Peer> p(RR->topology->getPeer(member));
if ((!p)||(!p->remoteVersionKnown())||(p->remoteVersionMajor() >= 1)) {
for(std::list<OutboundMulticast>::iterator tx(gs.txQueue.begin());tx!=gs.txQueue.end();) {
- if (tx->atLimit())
+ if (tx->atLimit()) {
gs.txQueue.erase(tx++);
- else {
+ } else {
tx->sendIfNew(RR,member);
if (tx->atLimit())
gs.txQueue.erase(tx++);
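
One implementation note on the send() changes above: the shuffled index array lives in a stack buffer and only falls back to the heap for very large groups, so the common path never allocates. A generic sketch of that idiom (the 8194 threshold is taken from the diff; the surrounding function is illustrative):

#include <cstddef>

// Stack-first buffer with heap fallback, as in Multicaster::send().
// Most groups fit in idxbuf, so the common case never allocates.
void visitMembers(std::size_t memberCount)
{
	unsigned long idxbuf[8194];
	unsigned long *indexes = idxbuf;
	if (memberCount > 8194)
		indexes = new unsigned long[memberCount]; // rare: huge group

	for(std::size_t i=0;i<memberCount;++i)
		indexes[i] = (unsigned long)i; // fill; the real code shuffles and sends

	if (indexes != idxbuf)
		delete [] indexes; // free only the heap fallback
}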
diff --git a/node/Multicaster.hpp b/node/Multicaster.hpp
index edfb62c6..3aebe57f 100644
--- a/node/Multicaster.hpp
+++ b/node/Multicaster.hpp
@@ -59,23 +59,17 @@ private:
struct MulticastGroupMember
{
MulticastGroupMember() {}
- MulticastGroupMember(const Address &a,const Address &lf,uint64_t ts) : address(a),learnedFrom(lf),timestamp(ts),rank(0) {}
+ MulticastGroupMember(const Address &a,uint64_t ts) : address(a),timestamp(ts) {}
Address address;
- Address learnedFrom;
- uint64_t timestamp; // time of last LIKE/OK(GATHER)
- uint64_t rank; // used by sorting algorithm in clean()
-
- // for sorting in ascending order of rank
- inline bool operator<(const MulticastGroupMember &m) const throw() { return (rank < m.rank); }
+ uint64_t timestamp; // time of last notification
};
struct MulticastGroupStatus
{
- MulticastGroupStatus() : lastExplicitGather(0),totalKnownMembers(0) {}
+ MulticastGroupStatus() : lastExplicitGather(0) {}
uint64_t lastExplicitGather;
- unsigned int totalKnownMembers; // 0 if unknown
std::list<OutboundMulticast> txQueue; // pending outbound multicasts
std::vector<MulticastGroupMember> members; // members of this group
};
@@ -90,13 +84,12 @@ public:
* @param now Current time
* @param nwid Network ID
* @param mg Multicast group
- * @param learnedFrom Address from which we learned this member
* @param member New member address
*/
- inline void add(uint64_t now,uint64_t nwid,const MulticastGroup &mg,const Address &learnedFrom,const Address &member)
+ inline void add(uint64_t now,uint64_t nwid,const MulticastGroup &mg,const Address &member)
{
Mutex::Lock _l(_groups_m);
- _add(now,nwid,mg,_groups[std::pair<uint64_t,MulticastGroup>(nwid,mg)],learnedFrom,member);
+ _add(now,nwid,mg,_groups[std::pair<uint64_t,MulticastGroup>(nwid,mg)],member);
}
/**
@@ -107,12 +100,11 @@ public:
* @param now Current time
* @param nwid Network ID
* @param mg Multicast group
- * @param learnedFrom Peer from which we received this list
* @param addresses Raw binary addresses in big-endian format, as a series of 5-byte fields
* @param count Number of addresses
* @param totalKnown Total number of known addresses as reported by peer
*/
- void addMultiple(uint64_t now,uint64_t nwid,const MulticastGroup &mg,const Address &learnedFrom,const void *addresses,unsigned int count,unsigned int totalKnown);
+ void addMultiple(uint64_t now,uint64_t nwid,const MulticastGroup &mg,const void *addresses,unsigned int count,unsigned int totalKnown);
/**
* Append gather results to a packet by choosing registered multicast recipients at random
@@ -177,7 +169,7 @@ public:
void clean(uint64_t now);
private:
- void _add(uint64_t now,uint64_t nwid,const MulticastGroup &mg,MulticastGroupStatus &gs,const Address &learnedFrom,const Address &member);
+ void _add(uint64_t now,uint64_t nwid,const MulticastGroup &mg,MulticastGroupStatus &gs,const Address &member);
const RuntimeEnvironment *RR;
std::map< std::pair<uint64_t,MulticastGroup>,MulticastGroupStatus > _groups;
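
Finally, a hedged sketch of what a caller hands to the slimmed-down addMultiple(): addresses travel as packed 5-byte big-endian fields, per the doc comment above. packAddresses() is a hypothetical helper; only the wire layout comes from the source.

#include <cstdint>
#include <vector>

// Pack 40-bit addresses into the consecutive 5-byte big-endian fields
// that Multicaster::addMultiple() expects. Hypothetical helper.
static std::vector<uint8_t> packAddresses(const std::vector<uint64_t> &addrs)
{
	std::vector<uint8_t> out;
	out.reserve(addrs.size() * 5);
	for(std::vector<uint64_t>::const_iterator a(addrs.begin());a!=addrs.end();++a) {
		for(int shift=32;shift>=0;shift-=8)
			out.push_back((uint8_t)((*a >> shift) & 0xff));
	}
	return out;
}

// Usage (values hypothetical):
//   std::vector<uint64_t> members; members.push_back(0x89e74ceee5ULL);
//   std::vector<uint8_t> raw(packAddresses(members));
//   mc->addMultiple(now,nwid,mg,&raw[0],(unsigned int)members.size(),totalKnown);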