summaryrefslogtreecommitdiff
path: root/node/Node.cpp
diff options
context:
space:
mode:
authorAdam Ierymenko <adam.ierymenko@gmail.com>2018-01-25 07:11:59 -0500
committerAdam Ierymenko <adam.ierymenko@gmail.com>2018-01-25 07:11:59 -0500
commit4419734a7db21cd6a9ee8aba911f220cbecba085 (patch)
tree93bda7e55ef906db246434509eb5334513c27b87 /node/Node.cpp
parentf821db29f34d040d59b6118164bf3c7242959a0e (diff)
downloadinfinitytier-4419734a7db21cd6a9ee8aba911f220cbecba085.tar.gz
infinitytier-4419734a7db21cd6a9ee8aba911f220cbecba085.zip
Implement continuous contacting of designated anchors and multicast replicators - GitHub issue #666
Diffstat (limited to 'node/Node.cpp')
-rw-r--r--node/Node.cpp104
1 files changed, 56 insertions, 48 deletions
diff --git a/node/Node.cpp b/node/Node.cpp
index af03669c..46081adb 100644
--- a/node/Node.cpp
+++ b/node/Node.cpp
@@ -176,31 +176,25 @@ ZT_ResultCode Node::processVirtualNetworkFrame(
class _PingPeersThatNeedPing
{
public:
- _PingPeersThatNeedPing(const RuntimeEnvironment *renv,void *tPtr,Hashtable< Address,std::vector<InetAddress> > &upstreamsToContact,int64_t now) :
- lastReceiveFromUpstream(0),
+ _PingPeersThatNeedPing(const RuntimeEnvironment *renv,void *tPtr,Hashtable< Address,std::vector<InetAddress> > &alwaysContact,int64_t now) :
RR(renv),
_tPtr(tPtr),
- _upstreamsToContact(upstreamsToContact),
+ _alwaysContact(alwaysContact),
_now(now),
_bestCurrentUpstream(RR->topology->getUpstreamPeer())
{
}
- int64_t lastReceiveFromUpstream; // tracks last time we got a packet from an 'upstream' peer like a root or a relay
-
inline void operator()(Topology &t,const SharedPtr<Peer> &p)
{
- const std::vector<InetAddress> *const upstreamStableEndpoints = _upstreamsToContact.get(p->address());
- if (upstreamStableEndpoints) {
- // Upstreams must be pinged constantly over both IPv4 and IPv6 to allow
- // them to perform three way handshake introductions for both stacks.
-
+ const std::vector<InetAddress> *const alwaysContactEndpoints = _alwaysContact.get(p->address());
+ if (alwaysContactEndpoints) {
const unsigned int sent = p->doPingAndKeepalive(_tPtr,_now);
bool contacted = (sent != 0);
if ((sent & 0x1) == 0) { // bit 0x1 == IPv4 sent
- for(unsigned long k=0,ptr=(unsigned long)RR->node->prng();k<(unsigned long)upstreamStableEndpoints->size();++k) {
- const InetAddress &addr = (*upstreamStableEndpoints)[ptr++ % upstreamStableEndpoints->size()];
+ for(unsigned long k=0,ptr=(unsigned long)RR->node->prng();k<(unsigned long)alwaysContactEndpoints->size();++k) {
+ const InetAddress &addr = (*alwaysContactEndpoints)[ptr++ % alwaysContactEndpoints->size()];
if (addr.ss_family == AF_INET) {
p->sendHELLO(_tPtr,-1,addr,_now);
contacted = true;
@@ -210,8 +204,8 @@ public:
}
if ((sent & 0x2) == 0) { // bit 0x2 == IPv6 sent
- for(unsigned long k=0,ptr=(unsigned long)RR->node->prng();k<(unsigned long)upstreamStableEndpoints->size();++k) {
- const InetAddress &addr = (*upstreamStableEndpoints)[ptr++ % upstreamStableEndpoints->size()];
+ for(unsigned long k=0,ptr=(unsigned long)RR->node->prng();k<(unsigned long)alwaysContactEndpoints->size();++k) {
+ const InetAddress &addr = (*alwaysContactEndpoints)[ptr++ % alwaysContactEndpoints->size()];
if (addr.ss_family == AF_INET6) {
p->sendHELLO(_tPtr,-1,addr,_now);
contacted = true;
@@ -220,19 +214,14 @@ public:
}
}
- // If we have no memoized addresses for this upstream peer, attempt to contact
- // it indirectly so we will be introduced.
if ((!contacted)&&(_bestCurrentUpstream)) {
const SharedPtr<Path> up(_bestCurrentUpstream->getBestPath(_now,true));
if (up)
p->sendHELLO(_tPtr,up->localSocket(),up->address(),_now);
}
- lastReceiveFromUpstream = std::max(p->lastReceive(),lastReceiveFromUpstream);
-
- _upstreamsToContact.erase(p->address()); // after this we'll WHOIS all upstreams that remain
+ _alwaysContact.erase(p->address()); // after this we'll WHOIS all upstreams that remain
} else if (p->isActive(_now)) {
- // Regular non-upstream nodes get pinged if they appear active.
p->doPingAndKeepalive(_tPtr,_now);
}
}
@@ -240,7 +229,7 @@ public:
private:
const RuntimeEnvironment *RR;
void *_tPtr;
- Hashtable< Address,std::vector<InetAddress> > &_upstreamsToContact;
+ Hashtable< Address,std::vector<InetAddress> > &_alwaysContact;
const int64_t _now;
const SharedPtr<Peer> _bestCurrentUpstream;
};
@@ -256,41 +245,60 @@ ZT_ResultCode Node::processBackgroundTasks(void *tptr,int64_t now,volatile int64
try {
_lastPingCheck = now;
- // Do pings and keepalives
- Hashtable< Address,std::vector<InetAddress> > upstreamsToContact;
- RR->topology->getUpstreamsToContact(upstreamsToContact);
- _PingPeersThatNeedPing pfunc(RR,tptr,upstreamsToContact,now);
- RR->topology->eachPeer<_PingPeersThatNeedPing &>(pfunc);
-
- // Run WHOIS to create Peer for any upstreams we could not contact (including pending moon seeds)
- Hashtable< Address,std::vector<InetAddress> >::Iterator i(upstreamsToContact);
- Address *upstreamAddress = (Address *)0;
- std::vector<InetAddress> *upstreamStableEndpoints = (std::vector<InetAddress> *)0;
- while (i.next(upstreamAddress,upstreamStableEndpoints))
- RR->sw->requestWhois(tptr,now,*upstreamAddress);
+ // Get designated VL1 upstreams
+ Hashtable< Address,std::vector<InetAddress> > alwaysContact;
+ RR->topology->getUpstreamsToContact(alwaysContact);
- // Get networks that need config without leaving mutex locked
+ // Check last receive time on designated upstreams to see if we seem to be online
+ int64_t lastReceivedFromUpstream = 0;
{
- std::vector< std::pair< SharedPtr<Network>,bool > > nwl;
- {
- Mutex::Lock _l(_networks_m);
- nwl.reserve(_networks.size()+1);
- Hashtable< uint64_t,SharedPtr<Network> >::Iterator i(_networks);
- uint64_t *k = (uint64_t *)0;
- SharedPtr<Network> *v = (SharedPtr<Network> *)0;
- while (i.next(k,v))
- nwl.push_back( std::pair< SharedPtr<Network>,bool >(*v,(((now - (*v)->lastConfigUpdate()) >= ZT_NETWORK_AUTOCONF_DELAY)||(!(*v)->hasConfig()))) );
+ Hashtable< Address,std::vector<InetAddress> >::Iterator i(alwaysContact);
+ Address *upstreamAddress = (Address *)0;
+ std::vector<InetAddress> *upstreamStableEndpoints = (std::vector<InetAddress> *)0;
+ while (i.next(upstreamAddress,upstreamStableEndpoints)) {
+ SharedPtr<Peer> p(RR->topology->getPeerNoCache(*upstreamAddress));
+ if (p)
+ lastReceivedFromUpstream = std::max(p->lastReceive(),lastReceivedFromUpstream);
}
- for(std::vector< std::pair< SharedPtr<Network>,bool > >::const_iterator n(nwl.begin());n!=nwl.end();++n) {
- if (n->second)
- n->first->requestConfiguration(tptr);
- n->first->sendUpdatesToMembers(tptr);
+ }
+
+ // Get peers we should stay connected to according to network configs
+ // Also get networks and whether they need config
+ std::vector< std::pair< SharedPtr<Network>,bool > > networkConfigNeeded;
+ {
+ Mutex::Lock l(_networks_m);
+ Hashtable< uint64_t,SharedPtr<Network> >::Iterator i(_networks);
+ uint64_t *nwid = (uint64_t *)0;
+ SharedPtr<Network> *network = (SharedPtr<Network> *)0;
+ while (i.next(nwid,network)) {
+ (*network)->config().getAlwaysContactAddresses(alwaysContact);
+ networkConfigNeeded.push_back( std::pair< SharedPtr<Network>,bool >(*network,(((now - (*network)->lastConfigUpdate()) >= ZT_NETWORK_AUTOCONF_DELAY)||(!(*network)->hasConfig()))) );
}
}
+ // Ping active peers, upstreams, and others that we should always contact
+ _PingPeersThatNeedPing pfunc(RR,tptr,alwaysContact,now);
+ RR->topology->eachPeer<_PingPeersThatNeedPing &>(pfunc);
+
+ // Run WHOIS to create Peer for alwaysContact addresses that could not be contacted
+ {
+ Hashtable< Address,std::vector<InetAddress> >::Iterator i(alwaysContact);
+ Address *upstreamAddress = (Address *)0;
+ std::vector<InetAddress> *upstreamStableEndpoints = (std::vector<InetAddress> *)0;
+ while (i.next(upstreamAddress,upstreamStableEndpoints))
+ RR->sw->requestWhois(tptr,now,*upstreamAddress);
+ }
+
+ // Refresh network config or broadcast network updates to members as needed
+ for(std::vector< std::pair< SharedPtr<Network>,bool > >::const_iterator n(networkConfigNeeded.begin());n!=networkConfigNeeded.end();++n) {
+ if (n->second)
+ n->first->requestConfiguration(tptr);
+ n->first->sendUpdatesToMembers(tptr);
+ }
+
// Update online status, post status change as event
const bool oldOnline = _online;
- _online = (((now - pfunc.lastReceiveFromUpstream) < ZT_PEER_ACTIVITY_TIMEOUT)||(RR->topology->amRoot()));
+ _online = (((now - lastReceivedFromUpstream) < ZT_PEER_ACTIVITY_TIMEOUT)||(RR->topology->amUpstream()));
if (oldOnline != _online)
postEvent(tptr,_online ? ZT_EVENT_ONLINE : ZT_EVENT_OFFLINE);
} catch ( ... ) {