summaryrefslogtreecommitdiff
path: root/node/Peer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'node/Peer.cpp')
-rw-r--r--node/Peer.cpp203
1 files changed, 126 insertions, 77 deletions
diff --git a/node/Peer.cpp b/node/Peer.cpp
index fa7b3aa4..9d0d78e5 100644
--- a/node/Peer.cpp
+++ b/node/Peer.cpp
@@ -35,6 +35,7 @@
#include "AntiRecursion.hpp"
#include "SelfAwareness.hpp"
#include "Cluster.hpp"
+#include "Packet.hpp"
#include <algorithm>
@@ -54,7 +55,7 @@ Peer::Peer(const Identity &myIdentity,const Identity &peerIdentity)
_lastAnnouncedTo(0),
_lastPathConfirmationSent(0),
_lastDirectPathPushSent(0),
- _lastDirectPathPushReceived(0),
+ _lastDirectPathPushReceive(0),
_lastPathSort(0),
_vProto(0),
_vMajor(0),
@@ -63,6 +64,7 @@ Peer::Peer(const Identity &myIdentity,const Identity &peerIdentity)
_id(peerIdentity),
_numPaths(0),
_latency(0),
+ _directPathPushCutoffCount(0),
_networkComs(4),
_lastPushedComs(4)
{
@@ -80,85 +82,130 @@ void Peer::received(
uint64_t inRePacketId,
Packet::Verb inReVerb)
{
+#ifdef ZT_ENABLE_CLUSTER
+ InetAddress redirectTo;
+ if ((RR->cluster)&&(hops == 0)) {
+ // Note: findBetterEndpoint() is first since we still want to check
+ // for a better endpoint even if we don't actually send a redirect.
+ if ( (RR->cluster->findBetterEndpoint(redirectTo,_id.address(),remoteAddr,false)) && (verb != Packet::VERB_OK)&&(verb != Packet::VERB_ERROR)&&(verb != Packet::VERB_RENDEZVOUS)&&(verb != Packet::VERB_PUSH_DIRECT_PATHS) ) {
+ if (_vProto >= 5) {
+ // For newer peers we can send a more idiomatic verb: PUSH_DIRECT_PATHS.
+ Packet outp(_id.address(),RR->identity.address(),Packet::VERB_PUSH_DIRECT_PATHS);
+ outp.append((uint16_t)1); // count == 1
+ outp.append((uint8_t)0); // no flags
+ outp.append((uint16_t)0); // no extensions
+ if (redirectTo.ss_family == AF_INET) {
+ outp.append((uint8_t)4);
+ outp.append((uint8_t)6);
+ outp.append(redirectTo.rawIpData(),4);
+ } else {
+ outp.append((uint8_t)6);
+ outp.append((uint8_t)18);
+ outp.append(redirectTo.rawIpData(),16);
+ }
+ outp.append((uint16_t)redirectTo.port());
+ outp.armor(_key,true);
+ RR->antiRec->logOutgoingZT(outp.data(),outp.size());
+ RR->node->putPacket(localAddr,remoteAddr,outp.data(),outp.size());
+ } else {
+ // For older peers we use RENDEZVOUS to coax them into contacting us elsewhere.
+ Packet outp(_id.address(),RR->identity.address(),Packet::VERB_RENDEZVOUS);
+ outp.append((uint8_t)0); // no flags
+ RR->identity.address().appendTo(outp);
+ outp.append((uint16_t)redirectTo.port());
+ if (redirectTo.ss_family == AF_INET) {
+ outp.append((uint8_t)4);
+ outp.append(redirectTo.rawIpData(),4);
+ } else {
+ outp.append((uint8_t)16);
+ outp.append(redirectTo.rawIpData(),16);
+ }
+ outp.armor(_key,true);
+ RR->antiRec->logOutgoingZT(outp.data(),outp.size());
+ RR->node->putPacket(localAddr,remoteAddr,outp.data(),outp.size());
+ }
+ }
+ }
+#endif
+
const uint64_t now = RR->node->now();
bool needMulticastGroupAnnounce = false;
bool pathIsConfirmed = false;
- {
+ { // begin _lock
Mutex::Lock _l(_lock);
_lastReceive = now;
+ if ((verb == Packet::VERB_FRAME)||(verb == Packet::VERB_EXT_FRAME))
+ _lastUnicastFrame = now;
+ else if (verb == Packet::VERB_MULTICAST_FRAME)
+ _lastMulticastFrame = now;
- if (!hops) {
- /* Learn new paths from direct (hops == 0) packets */
- {
- unsigned int np = _numPaths;
- for(unsigned int p=0;p<np;++p) {
- if ((_paths[p].address() == remoteAddr)&&(_paths[p].localAddress() == localAddr)) {
- _paths[p].received(now);
- pathIsConfirmed = true;
- break;
- }
+#ifdef ZT_ENABLE_CLUSTER
+ // If we think this peer belongs elsewhere, don't learn this path or
+ // do other connection init stuff.
+ if (redirectTo)
+ return;
+#endif
+
+ if ((now - _lastAnnouncedTo) >= ((ZT_MULTICAST_LIKE_EXPIRE / 2) - 1000)) {
+ _lastAnnouncedTo = now;
+ needMulticastGroupAnnounce = true;
+ }
+
+ if (hops == 0) {
+ unsigned int np = _numPaths;
+ for(unsigned int p=0;p<np;++p) {
+ if ((_paths[p].address() == remoteAddr)&&(_paths[p].localAddress() == localAddr)) {
+ _paths[p].received(now);
+ pathIsConfirmed = true;
+ break;
}
+ }
- if (!pathIsConfirmed) {
- if ((verb == Packet::VERB_OK)&&((inReVerb == Packet::VERB_HELLO)||(inReVerb == Packet::VERB_ECHO))) {
-
- // Learn paths if they've been confirmed via a HELLO or an ECHO
- RemotePath *slot = (RemotePath *)0;
- if (np < ZT_MAX_PEER_NETWORK_PATHS) {
- slot = &(_paths[np++]);
- } else {
- uint64_t slotLRmin = 0xffffffffffffffffULL;
- for(unsigned int p=0;p<ZT_MAX_PEER_NETWORK_PATHS;++p) {
- if (_paths[p].lastReceived() <= slotLRmin) {
- slotLRmin = _paths[p].lastReceived();
- slot = &(_paths[p]);
- }
- }
- }
- if (slot) {
- *slot = RemotePath(localAddr,remoteAddr);
- slot->received(now);
- _numPaths = np;
- pathIsConfirmed = true;
- _sortPaths(now);
- }
+ if (!pathIsConfirmed) {
+ if ((verb == Packet::VERB_OK)||(RR->topology->amRoot())) {
+ Path *slot = (Path *)0;
+ if (np < ZT_MAX_PEER_NETWORK_PATHS) {
+ slot = &(_paths[np++]);
} else {
-
- /* If this path is not known, send a HELLO. We don't learn
- * paths without confirming that a bidirectional link is in
- * fact present, but any packet that decodes and authenticates
- * correctly is considered valid. */
- if ((now - _lastPathConfirmationSent) >= ZT_MIN_PATH_CONFIRMATION_INTERVAL) {
- _lastPathConfirmationSent = now;
- TRACE("got %s via unknown path %s(%s), confirming...",Packet::verbString(verb),_id.address().toString().c_str(),remoteAddr.toString().c_str());
- attemptToContactAt(RR,localAddr,remoteAddr,now);
+ uint64_t slotLRmin = 0xffffffffffffffffULL;
+ for(unsigned int p=0;p<ZT_MAX_PEER_NETWORK_PATHS;++p) {
+ if (_paths[p].lastReceived() <= slotLRmin) {
+ slotLRmin = _paths[p].lastReceived();
+ slot = &(_paths[p]);
+ }
}
+ }
+ if (slot) {
+ *slot = Path(localAddr,remoteAddr);
+ slot->received(now);
+ _numPaths = np;
+ pathIsConfirmed = true;
+ _sortPaths(now);
+ }
+
+ } else {
+ /* If this path is not known, send a HELLO. We don't learn
+ * paths without confirming that a bidirectional link is in
+ * fact present, but any packet that decodes and authenticates
+ * correctly is considered valid. */
+ if ((now - _lastPathConfirmationSent) >= ZT_MIN_PATH_CONFIRMATION_INTERVAL) {
+ _lastPathConfirmationSent = now;
+ TRACE("got %s via unknown path %s(%s), confirming...",Packet::verbString(verb),_id.address().toString().c_str(),remoteAddr.toString().c_str());
+ attemptToContactAt(RR,localAddr,remoteAddr,now);
}
+
}
}
}
-
- if ((now - _lastAnnouncedTo) >= ((ZT_MULTICAST_LIKE_EXPIRE / 2) - 1000)) {
- _lastAnnouncedTo = now;
- needMulticastGroupAnnounce = true;
- }
-
- if ((verb == Packet::VERB_FRAME)||(verb == Packet::VERB_EXT_FRAME))
- _lastUnicastFrame = now;
- else if (verb == Packet::VERB_MULTICAST_FRAME)
- _lastMulticastFrame = now;
- }
+ } // end _lock
#ifdef ZT_ENABLE_CLUSTER
- if ((pathIsConfirmed)&&(RR->cluster)) {
- // Either shuttle this peer off somewhere else or report to other members that we have it
- if (!RR->cluster->redirectPeer(_id.address(),remoteAddr,false))
- RR->cluster->replicateHavePeer(_id);
- }
+ if ((RR->cluster)&&(pathIsConfirmed))
+ RR->cluster->replicateHavePeer(_id,remoteAddr);
#endif
if (needMulticastGroupAnnounce) {
@@ -190,7 +237,7 @@ void Peer::attemptToContactAt(const RuntimeEnvironment *RR,const InetAddress &lo
bool Peer::doPingAndKeepalive(const RuntimeEnvironment *RR,uint64_t now,int inetAddressFamily)
{
- RemotePath *p = (RemotePath *)0;
+ Path *p = (Path *)0;
Mutex::Lock _l(_lock);
if (inetAddressFamily != 0) {
@@ -201,16 +248,16 @@ bool Peer::doPingAndKeepalive(const RuntimeEnvironment *RR,uint64_t now,int inet
if (p) {
if ((now - p->lastReceived()) >= ZT_PEER_DIRECT_PING_DELAY) {
- TRACE("PING %s(%s) after %llums/%llums send/receive inactivity",_id.address().toString().c_str(),p->address().toString().c_str(),now - p->lastSend(),now - p->lastReceived());
+ //TRACE("PING %s(%s) after %llums/%llums send/receive inactivity",_id.address().toString().c_str(),p->address().toString().c_str(),now - p->lastSend(),now - p->lastReceived());
attemptToContactAt(RR,p->localAddress(),p->address(),now);
p->sent(now);
} else if (((now - p->lastSend()) >= ZT_NAT_KEEPALIVE_DELAY)&&(!p->reliable())) {
- TRACE("NAT keepalive %s(%s) after %llums/%llums send/receive inactivity",_id.address().toString().c_str(),p->address().toString().c_str(),now - p->lastSend(),now - p->lastReceived());
+ //TRACE("NAT keepalive %s(%s) after %llums/%llums send/receive inactivity",_id.address().toString().c_str(),p->address().toString().c_str(),now - p->lastSend(),now - p->lastReceived());
_natKeepaliveBuf += (uint32_t)((now * 0x9e3779b1) >> 1); // tumble this around to send constantly varying (meaningless) payloads
RR->node->putPacket(p->localAddress(),p->address(),&_natKeepaliveBuf,sizeof(_natKeepaliveBuf));
p->sent(now);
} else {
- TRACE("no PING or NAT keepalive: addr==%s reliable==%d %llums/%llums send/receive inactivity",p->address().toString().c_str(),(int)p->reliable(),now - p->lastSend(),now - p->lastReceived());
+ //TRACE("no PING or NAT keepalive: addr==%s reliable==%d %llums/%llums send/receive inactivity",p->address().toString().c_str(),(int)p->reliable(),now - p->lastSend(),now - p->lastReceived());
}
return true;
}
@@ -218,7 +265,7 @@ bool Peer::doPingAndKeepalive(const RuntimeEnvironment *RR,uint64_t now,int inet
return false;
}
-void Peer::pushDirectPaths(const RuntimeEnvironment *RR,RemotePath *path,uint64_t now,bool force)
+void Peer::pushDirectPaths(const RuntimeEnvironment *RR,Path *path,uint64_t now,bool force)
{
#ifdef ZT_ENABLE_CLUSTER
// Cluster mode disables normal PUSH_DIRECT_PATHS in favor of cluster-based peer redirection
@@ -231,23 +278,23 @@ void Peer::pushDirectPaths(const RuntimeEnvironment *RR,RemotePath *path,uint64_
if (((now - _lastDirectPathPushSent) >= ZT_DIRECT_PATH_PUSH_INTERVAL)||(force)) {
_lastDirectPathPushSent = now;
- std::vector<Path> dps(RR->node->directPaths());
+ std::vector<InetAddress> dps(RR->node->directPaths());
if (dps.empty())
return;
#ifdef ZT_TRACE
{
std::string ps;
- for(std::vector<Path>::const_iterator p(dps.begin());p!=dps.end();++p) {
+ for(std::vector<InetAddress>::const_iterator p(dps.begin());p!=dps.end();++p) {
if (ps.length() > 0)
ps.push_back(',');
- ps.append(p->address().toString());
+ ps.append(p->toString());
}
TRACE("pushing %u direct paths to %s: %s",(unsigned int)dps.size(),_id.address().toString().c_str(),ps.c_str());
}
#endif
- std::vector<Path>::const_iterator p(dps.begin());
+ std::vector<InetAddress>::const_iterator p(dps.begin());
while (p != dps.end()) {
Packet outp(_id.address(),RR->identity.address(),Packet::VERB_PUSH_DIRECT_PATHS);
outp.addSize(2); // leave room for count
@@ -255,7 +302,7 @@ void Peer::pushDirectPaths(const RuntimeEnvironment *RR,RemotePath *path,uint64_
unsigned int count = 0;
while ((p != dps.end())&&((outp.size() + 24) < ZT_PROTO_MAX_PACKET_LENGTH)) {
uint8_t addressType = 4;
- switch(p->address().ss_family) {
+ switch(p->ss_family) {
case AF_INET:
break;
case AF_INET6:
@@ -267,6 +314,7 @@ void Peer::pushDirectPaths(const RuntimeEnvironment *RR,RemotePath *path,uint64_
}
uint8_t flags = 0;
+ /* TODO: path trust is not implemented yet
switch(p->trust()) {
default:
break;
@@ -277,13 +325,14 @@ void Peer::pushDirectPaths(const RuntimeEnvironment *RR,RemotePath *path,uint64_
flags |= (0x04 | 0x08); // no encryption, no authentication (redundant but go ahead and set both)
break;
}
+ */
outp.append(flags);
outp.append((uint16_t)0); // no extensions
outp.append(addressType);
outp.append((uint8_t)((addressType == 4) ? 6 : 18));
- outp.append(p->address().rawIpData(),((addressType == 4) ? 4 : 16));
- outp.append((uint16_t)p->address().port());
+ outp.append(p->rawIpData(),((addressType == 4) ? 4 : 16));
+ outp.append((uint16_t)p->port());
++count;
++p;
@@ -456,7 +505,7 @@ struct _SortPathsByQuality
{
uint64_t _now;
_SortPathsByQuality(const uint64_t now) : _now(now) {}
- inline bool operator()(const RemotePath &a,const RemotePath &b) const
+ inline bool operator()(const Path &a,const Path &b) const
{
const uint64_t qa = (
((uint64_t)a.active(_now) << 63) |
@@ -476,7 +525,7 @@ void Peer::_sortPaths(const uint64_t now)
std::sort(&(_paths[0]),&(_paths[_numPaths]),_SortPathsByQuality(now));
}
-RemotePath *Peer::_getBestPath(const uint64_t now)
+Path *Peer::_getBestPath(const uint64_t now)
{
// assumes _lock is locked
if ((now - _lastPathSort) >= ZT_PEER_PATH_SORT_INTERVAL)
@@ -488,10 +537,10 @@ RemotePath *Peer::_getBestPath(const uint64_t now)
if (_paths[0].active(now))
return &(_paths[0]);
}
- return (RemotePath *)0;
+ return (Path *)0;
}
-RemotePath *Peer::_getBestPath(const uint64_t now,int inetAddressFamily)
+Path *Peer::_getBestPath(const uint64_t now,int inetAddressFamily)
{
// assumes _lock is locked
if ((now - _lastPathSort) >= ZT_PEER_PATH_SORT_INTERVAL)
@@ -503,7 +552,7 @@ RemotePath *Peer::_getBestPath(const uint64_t now,int inetAddressFamily)
}
_sortPaths(now);
}
- return (RemotePath *)0;
+ return (Path *)0;
}
} // namespace ZeroTier