summaryrefslogtreecommitdiff
path: root/node
diff options
context:
space:
mode:
Diffstat (limited to 'node')
-rw-r--r--node/Constants.hpp8
-rw-r--r--node/Multicaster.cpp63
-rw-r--r--node/Multicaster.hpp3
3 files changed, 38 insertions, 36 deletions
diff --git a/node/Constants.hpp b/node/Constants.hpp
index 7fd7be35..5e052bcf 100644
--- a/node/Constants.hpp
+++ b/node/Constants.hpp
@@ -243,15 +243,9 @@
#define ZT_MULTICAST_EXPLICIT_GATHER_DELAY (ZT_MULTICAST_LIKE_EXPIRE / 10)
/**
- * Minimum delay between implicit gathers via MULTICAST_FRAME
- */
-#define ZT_MULTICAST_IMPLICIT_GATHER_DELAY 30000
-
-/**
* Timeout for outgoing multicasts
*
- * Attempts will be made to gather recipients and send until we reach
- * the limit or sending times out.
+ * This is how long we wait for explicit or implicit gather results.
*/
#define ZT_MULTICAST_TRANSMIT_TIMEOUT 5000
diff --git a/node/Multicaster.cpp b/node/Multicaster.cpp
index a9bfbbed..94c6a45b 100644
--- a/node/Multicaster.cpp
+++ b/node/Multicaster.cpp
@@ -56,7 +56,7 @@ unsigned int Multicaster::gather(const Address &queryingPeer,uint64_t nwid,const
{
unsigned char *p;
unsigned int added = 0,i,k,rptr,totalKnown = 0;
- uint64_t a,picked[(ZT_PROTO_MAX_PACKET_LENGTH / 5) + 1];
+ uint64_t a,picked[(ZT_PROTO_MAX_PACKET_LENGTH / 5) + 2];
if (!limit)
return 0;
@@ -88,6 +88,7 @@ unsigned int Multicaster::gather(const Address &queryingPeer,uint64_t nwid,const
k = 0;
while ((added < limit)&&(k < gs->second.members.size())&&((appendTo.size() + ZT_ADDRESS_LENGTH) <= ZT_PROTO_MAX_PACKET_LENGTH)) {
rptr = (unsigned int)RR->prng->next32();
+
restart_member_scan:
a = gs->second.members[rptr % (unsigned int)gs->second.members.size()].address.toInt();
for(i=0;i<k;++i) {
@@ -176,22 +177,25 @@ void Multicaster::send(
continue;
}
- if (count++ >= limit)
- break;
out.sendOnly(RR,*ast);
+ if (++count >= limit)
+ break;
}
- for(std::vector<MulticastGroupMember>::const_reverse_iterator m(gs.members.rbegin());m!=gs.members.rend();++m) {
- { // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
- SharedPtr<Peer> p(RR->topology->getPeer(m->address));
- if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1))
- continue;
- }
+ if (count < limit) {
+ for(std::vector<MulticastGroupMember>::const_reverse_iterator m(gs.members.rbegin());m!=gs.members.rend();++m) {
+ { // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
+ SharedPtr<Peer> p(RR->topology->getPeer(m->address));
+ if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1))
+ continue;
+ }
- if (count++ >= limit)
- break;
- if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),m->address) == alwaysSendTo.end())
- out.sendOnly(RR,m->address);
+ if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),m->address) == alwaysSendTo.end()) {
+ out.sendOnly(RR,m->address);
+ if (++count >= limit)
+ break;
+ }
+ }
}
} else {
unsigned int gatherLimit = (limit - (unsigned int)gs.members.size()) + 1;
@@ -212,10 +216,6 @@ void Multicaster::send(
sn->send(RR,outp.data(),outp.size(),now);
}
gatherLimit = 0; // implicit not needed
- } else if ((now - gs.lastImplicitGather) > ZT_MULTICAST_IMPLICIT_GATHER_DELAY) {
- gs.lastImplicitGather = now;
- } else {
- gatherLimit = 0;
}
gs.txQueue.push_back(OutboundMulticast());
@@ -234,6 +234,8 @@ void Multicaster::send(
data,
len);
+ unsigned int count = 0;
+
for(std::vector<Address>::const_iterator ast(alwaysSendTo.begin());ast!=alwaysSendTo.end();++ast) {
{ // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
SharedPtr<Peer> p(RR->topology->getPeer(*ast));
@@ -242,17 +244,24 @@ void Multicaster::send(
}
out.sendAndLog(RR,*ast);
+ if (++count >= limit)
+ break;
}
- for(std::vector<MulticastGroupMember>::const_reverse_iterator m(gs.members.rbegin());m!=gs.members.rend();++m) {
- { // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
- SharedPtr<Peer> p(RR->topology->getPeer(m->address));
- if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1))
- continue;
- }
+ if (count < limit) {
+ for(std::vector<MulticastGroupMember>::const_reverse_iterator m(gs.members.rbegin());m!=gs.members.rend();++m) {
+ { // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
+ SharedPtr<Peer> p(RR->topology->getPeer(m->address));
+ if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1))
+ continue;
+ }
- if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),m->address) == alwaysSendTo.end())
- out.sendAndLog(RR,m->address);
+ if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),m->address) == alwaysSendTo.end()) {
+ out.sendAndLog(RR,m->address);
+ if (++count >= limit)
+ break;
+ }
+ }
}
}
@@ -331,12 +340,12 @@ void Multicaster::clean(uint64_t now)
if (writer->learnedFrom) {
SharedPtr<Peer> p(RR->topology->getPeer(writer->learnedFrom));
if (p)
- writer->rank = p->lastUnicastFrame() - ZT_MULTICAST_LIKE_EXPIRE;
+ writer->rank = (RR->topology->amSupernode() ? p->lastDirectReceive() : p->lastUnicastFrame()) - ZT_MULTICAST_LIKE_EXPIRE;
else writer->rank = writer->timestamp - (86400000 + ZT_MULTICAST_LIKE_EXPIRE);
} else {
SharedPtr<Peer> p(RR->topology->getPeer(writer->address));
if (p)
- writer->rank = p->lastUnicastFrame();
+ writer->rank = (RR->topology->amSupernode() ? p->lastDirectReceive() : p->lastUnicastFrame());
else writer->rank = writer->timestamp - 86400000;
}
diff --git a/node/Multicaster.hpp b/node/Multicaster.hpp
index ed9c0d30..9951775e 100644
--- a/node/Multicaster.hpp
+++ b/node/Multicaster.hpp
@@ -72,10 +72,9 @@ private:
struct MulticastGroupStatus
{
- MulticastGroupStatus() : lastExplicitGather(0),lastImplicitGather(0) {}
+ MulticastGroupStatus() : lastExplicitGather(0) {}
uint64_t lastExplicitGather;
- uint64_t lastImplicitGather;
std::list<OutboundMulticast> txQueue; // pending outbound multicasts
std::vector<MulticastGroupMember> members; // members of this group
};