From daaec84c6be11b57572ff57c97efd993385890fd Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Wed, 26 Mar 2014 15:35:15 -0700 Subject: Add TCP channel support for supernode list, make Peer pick the first path if all paths are equally dead. --- node/Defaults.cpp | 17 +++++++++-------- node/Defaults.hpp | 4 ++-- node/Peer.cpp | 28 ++++++++++++++++++---------- node/Switch.cpp | 4 ++-- node/Topology.cpp | 9 ++++----- node/Topology.hpp | 13 ++----------- node/UdpSocket.cpp | 9 +++++++++ 7 files changed, 46 insertions(+), 38 deletions(-) diff --git a/node/Defaults.cpp b/node/Defaults.cpp index 3e0727da..44d5aa05 100644 --- a/node/Defaults.cpp +++ b/node/Defaults.cpp @@ -43,11 +43,11 @@ namespace ZeroTier { const Defaults ZT_DEFAULTS; -static inline std::map< Identity,std::vector > _mkSupernodeMap() +static inline std::map< Identity,std::vector< std::pair > > _mkSupernodeMap() { - std::map< Identity,std::vector > sn; + std::map< Identity,std::vector< std::pair > > sn; Identity id; - std::vector addrs; + std::vector< std::pair > addrs; // Nothing special about a supernode... except that they are // designated as such and trusted to provide WHOIS lookup. @@ -56,35 +56,36 @@ static inline std::map< Identity,std::vector > _mkSupernodeMap() addrs.clear(); if (!id.fromString("8acf059fe3:0:482f6ee5dfe902319b419de5bdc765209c0ecda38c4d6e4fcf0d33658398b4527dcd22f93112fb9befd02fd78bf7261b333fc105d192a623ca9e50fc60b374a5")) throw std::runtime_error("invalid identity in Defaults"); - addrs.push_back(InetAddress("162.243.77.111",ZT_DEFAULT_PORT)); + addrs.push_back(std::pair(InetAddress("162.243.77.111",ZT_DEFAULT_PORT),false)); + addrs.push_back(std::pair(InetAddress("162.243.77.111",443),true)); sn[id] = addrs; // nyarlathotep.zerotier.com - San Francisco, California, USA addrs.clear(); if (!id.fromString("7e19876aba:0:2a6e2b2318930f60eb097f70d0f4b028b2cd6d3d0c63c014b9039ff35390e41181f216fb2e6fa8d95c1ee9667156411905c3dccfea78d8c6dfafba688170b3fa")) throw std::runtime_error("invalid identity in Defaults"); - addrs.push_back(InetAddress("198.199.97.220",ZT_DEFAULT_PORT)); + addrs.push_back(std::pair(InetAddress("198.199.97.220",ZT_DEFAULT_PORT),false)); sn[id] = addrs; // shub-niggurath.zerotier.com - Amsterdam, Netherlands addrs.clear(); if (!id.fromString("36f63d6574:0:67a776487a1a99b32f413329f2b67c43fbf6152e42c6b66e89043e69d93e48314c7d709b58a83016bd2612dd89400b856e18c553da94892f7d3ca16bf2c92c24")) throw std::runtime_error("invalid identity in Defaults"); - addrs.push_back(InetAddress("198.211.127.172",ZT_DEFAULT_PORT)); + addrs.push_back(std::pair(InetAddress("198.211.127.172",ZT_DEFAULT_PORT),false)); sn[id] = addrs; // mi-go.zerotier.com - Singapore addrs.clear(); if (!id.fromString("abbb7f4622:0:89d2c6b2062b10f4ce314dfcb914c082566247090a6f74c8ba1c15c63b205f540758f0abae85287397152c9d8cf463cfe51e7a480946cd6a31495b24ca13253c")) throw std::runtime_error("invalid identity in Defaults"); - addrs.push_back(InetAddress("128.199.254.204",ZT_DEFAULT_PORT)); + addrs.push_back(std::pair(InetAddress("128.199.254.204",ZT_DEFAULT_PORT),false)); sn[id] = addrs; // shoggoth.zerotier.com - Tokyo, Japan addrs.clear(); if (!id.fromString("48e8f875cb:0:5ca54f55e1094f65589f3e6d74158b6964d418ddac3570757128f1c6a2498322d92fcdcd47de459f4d1f9b38df2afd0c7b3fc247ba3d773c38ba35288f24988e")) throw std::runtime_error("invalid identity in Defaults"); - addrs.push_back(InetAddress("108.61.200.101",ZT_DEFAULT_PORT)); + addrs.push_back(std::pair(InetAddress("108.61.200.101",ZT_DEFAULT_PORT),false)); sn[id] = addrs; return sn; diff --git a/node/Defaults.hpp b/node/Defaults.hpp index 0c640df7..50c2dce5 100644 --- a/node/Defaults.hpp +++ b/node/Defaults.hpp @@ -64,9 +64,9 @@ public: const std::string defaultHomePath; /** - * Supernodes on the ZeroTier network + * Supernodes on the ZeroTier network (identity, address/tcp?) */ - const std::map< Identity,std::vector > supernodes; + const std::map< Identity,std::vector< std::pair > > supernodes; /** * Identities permitted to sign software updates diff --git a/node/Peer.cpp b/node/Peer.cpp index b9b9b0c7..dcc8d4ea 100644 --- a/node/Peer.cpp +++ b/node/Peer.cpp @@ -91,6 +91,7 @@ void Peer::receive( } // Announce multicast LIKEs to peers to whom we have a direct link + // Lock can't be locked here or it'll recurse and deadlock. if ((now - _lastAnnouncedTo) >= ((ZT_MULTICAST_LIKE_EXPIRE / 2) - 1000)) { _lastAnnouncedTo = now; _r->sw->announceMulticastGroups(SharedPtr(this)); @@ -107,12 +108,14 @@ bool Peer::send(const RuntimeEnvironment *_r,const void *data,unsigned int len,u { Mutex::Lock _l(_lock); - if (_paths.empty()) + std::vector::iterator p(_paths.begin()); + if (p == _paths.end()) { + TRACE("send to %s failed: no paths available",_id.address().toString().c_str()); return false; - - uint64_t bestPathLastReceived = 0; - std::vector::iterator bestPath; - for(std::vector::iterator p(_paths.begin());p!=_paths.end();++p) { + } + uint64_t bestPathLastReceived = p->lastReceived(); + std::vector::iterator bestPath = p; + while (++p != _paths.end()) { uint64_t lr = p->lastReceived(); if (lr >= bestPathLastReceived) { bestPathLastReceived = lr; @@ -120,6 +123,8 @@ bool Peer::send(const RuntimeEnvironment *_r,const void *data,unsigned int len,u } } + TRACE("send to %s: using path: %s",_id.address().toString().c_str(),bestPath->toString().c_str()); + if (_r->sm->send(bestPath->address(),bestPath->tcp(),data,len)) { bestPath->sent(now); return true; @@ -147,19 +152,22 @@ bool Peer::sendPing(const RuntimeEnvironment *_r,uint64_t now,bool firstSinceRes SharedPtr self(this); Mutex::Lock _l(_lock); - bool allPingsUnanswered; + bool pingTcp; if (!firstSinceReset) { - allPingsUnanswered = true; + // Do not use TCP if one of our UDP endpoints has answered recently. + pingTcp = true; for(std::vector::iterator p(_paths.begin());p!=_paths.end();++p) { if (!p->pingUnanswered(now)) { - allPingsUnanswered = false; + pingTcp = false; break; } } - } else allPingsUnanswered = false; + } else pingTcp = false; + + TRACE("PING %s (pingTcp==%d)",_id.address().toString().c_str(),(int)pingTcp); for(std::vector::iterator p(_paths.begin());p!=_paths.end();++p) { - if ((allPingsUnanswered)||(!p->tcp())) { + if ((pingTcp)||(!p->tcp())) { if (_r->sw->sendHELLO(self,p->address(),p->tcp())) { p->sent(now); p->pinged(now); diff --git a/node/Switch.cpp b/node/Switch.cpp index 2e021a5f..0e58c744 100644 --- a/node/Switch.cpp +++ b/node/Switch.cpp @@ -752,13 +752,13 @@ bool Switch::_trySend(const Packet &packet,bool encrypt) } } -#ifdef ZT_TRACE +/* #ifdef ZT_TRACE if (via != peer) { TRACE(">> %s to %s via %s (%d)",Packet::verbString(packet.verb()),peer->address().toString().c_str(),via->address().toString().c_str(),(int)packet.size()); } else { TRACE(">> %s to %s (%d)",Packet::verbString(packet.verb()),peer->address().toString().c_str(),(int)packet.size()); } -#endif +#endif */ return true; } diff --git a/node/Topology.cpp b/node/Topology.cpp index 2211d126..75784849 100644 --- a/node/Topology.cpp +++ b/node/Topology.cpp @@ -50,7 +50,7 @@ Topology::~Topology() _dumpPeers(); } -void Topology::setSupernodes(const std::map< Identity,std::vector > &sn) +void Topology::setSupernodes(const std::map< Identity,std::vector< std::pair > > &sn) { Mutex::Lock _l(_supernodes_m); @@ -59,14 +59,13 @@ void Topology::setSupernodes(const std::map< Identity,std::vector > _supernodePeers.clear(); uint64_t now = Utils::now(); - for(std::map< Identity,std::vector >::const_iterator i(sn.begin());i!=sn.end();++i) { + for(std::map< Identity,std::vector< std::pair > >::const_iterator i(sn.begin());i!=sn.end();++i) { if (i->first != _r->identity) { SharedPtr p(getPeer(i->first.address())); if (!p) p = addPeer(SharedPtr(new Peer(_r->identity,i->first))); - for(std::vector::const_iterator j(i->second.begin());j!=i->second.end();++j) { - p->addPath(Path(*j,false,true)); - } + for(std::vector< std::pair >::const_iterator j(i->second.begin());j!=i->second.end();++j) + p->addPath(Path(j->first,j->second,true)); p->use(now); _supernodePeers.push_back(p); } diff --git a/node/Topology.hpp b/node/Topology.hpp index 0b72197d..bd39d0ec 100644 --- a/node/Topology.hpp +++ b/node/Topology.hpp @@ -63,7 +63,7 @@ public: * * @param sn Supernodes for this network */ - void setSupernodes(const std::map< Identity,std::vector > &sn); + void setSupernodes(const std::map< Identity,std::vector< std::pair > > &sn); /** * Add a peer to database @@ -103,15 +103,6 @@ public: */ void saveIdentity(const Identity &id); - /** - * @return Current network supernodes - */ - inline std::map< Identity,std::vector > supernodes() const - { - Mutex::Lock _l(_supernodes_m); - return _supernodes; - } - /** * @return Vector of peers that are supernodes */ @@ -313,7 +304,7 @@ private: std::map< Address,SharedPtr > _activePeers; Mutex _activePeers_m; - std::map< Identity,std::vector > _supernodes; + std::map< Identity,std::vector< std::pair > > _supernodes; std::set< Address > _supernodeAddresses; std::vector< SharedPtr > _supernodePeers; Mutex _supernodes_m; diff --git a/node/UdpSocket.cpp b/node/UdpSocket.cpp index 8fce9588..115298e9 100644 --- a/node/UdpSocket.cpp +++ b/node/UdpSocket.cpp @@ -48,6 +48,9 @@ #include #endif +// Uncomment to intentionally break UDP in order to test TCP fallback +#define ZT_BREAK_UDP + namespace ZeroTier { UdpSocket::~UdpSocket() @@ -66,6 +69,9 @@ bool UdpSocket::send(const InetAddress &to,const void *msg,unsigned int msglen) bool UdpSocket::sendWithHopLimit(const InetAddress &to,const void *msg,unsigned int msglen,int hopLimit) { +#ifdef ZT_BREAK_UDP + return true; +#else if (hopLimit <= 0) hopLimit = 255; if (to.isV6()) { @@ -87,6 +93,7 @@ bool UdpSocket::sendWithHopLimit(const InetAddress &to,const void *msg,unsigned return ((int)sendto(_sock,msg,msglen,0,to.saddr(),to.saddrLen()) == (int)msglen); #endif } +#endif } bool UdpSocket::notifyAvailableForRead(const SharedPtr &self,SocketManager *sm) @@ -97,7 +104,9 @@ bool UdpSocket::notifyAvailableForRead(const SharedPtr &self,SocketManag int n = (int)recvfrom(_sock,(char *)(buf.data()),ZT_SOCKET_MAX_MESSAGE_LEN,0,from.saddr(),&salen); if (n > 0) { buf.setSize((unsigned int)n); +#ifndef ZT_BREAK_UDP sm->handleReceivedPacket(self,from,buf); +#endif } return true; } -- cgit v1.2.3 From 04169b51505bc41f70ad7b8797e8e4d5376bb9c7 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Wed, 26 Mar 2014 15:44:24 -0700 Subject: If I want it to pick the first, actually picking the first is helpful. --- node/Peer.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/Peer.cpp b/node/Peer.cpp index dcc8d4ea..f8ff9ba6 100644 --- a/node/Peer.cpp +++ b/node/Peer.cpp @@ -117,7 +117,7 @@ bool Peer::send(const RuntimeEnvironment *_r,const void *data,unsigned int len,u std::vector::iterator bestPath = p; while (++p != _paths.end()) { uint64_t lr = p->lastReceived(); - if (lr >= bestPathLastReceived) { + if (lr > bestPathLastReceived) { bestPathLastReceived = lr; bestPath = p; } -- cgit v1.2.3 From e6b23059aca4947c8c4638c5d5e0abdba3b2b7b7 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Wed, 26 Mar 2014 16:44:58 -0700 Subject: Change the way TCP failover is invoked. --- node/Path.hpp | 13 ------------- node/Peer.cpp | 16 +++++++++++----- 2 files changed, 11 insertions(+), 18 deletions(-) diff --git a/node/Path.hpp b/node/Path.hpp index 960b08b3..09ed1f9e 100644 --- a/node/Path.hpp +++ b/node/Path.hpp @@ -87,19 +87,6 @@ public: return ((_addr)&&((_fixed)||((now - _lastReceived) < ZT_PEER_PATH_ACTIVITY_TIMEOUT))); } - /** - * @return True if it appears that a ping has gone unanswered - */ - inline bool pingUnanswered(uint64_t now) const - throw() - { - uint64_t lp = _lastPing; - uint64_t lr = _lastReceived; - if (lp) - return ((lr < lp)&&((lp - lr) > ZT_PING_UNANSWERED_AFTER)); - return false; - } - /** * @return Human-readable address and other information about this path, some computed as of current time */ diff --git a/node/Peer.cpp b/node/Peer.cpp index f8ff9ba6..0cd82909 100644 --- a/node/Peer.cpp +++ b/node/Peer.cpp @@ -150,18 +150,24 @@ bool Peer::sendPing(const RuntimeEnvironment *_r,uint64_t now,bool firstSinceRes { bool sent = false; SharedPtr self(this); + Mutex::Lock _l(_lock); + // NOTE: this will never ping a peer that has *only* TCP paths. Right + // now there's never such a thing as TCP is only for failover. + bool pingTcp; if (!firstSinceReset) { // Do not use TCP if one of our UDP endpoints has answered recently. - pingTcp = true; + uint64_t lastPing = 0; + uint64_t lastDirectReceive = 0; + for(std::vector::iterator p(_paths.begin());p!=_paths.end();++p) { - if (!p->pingUnanswered(now)) { - pingTcp = false; - break; - } + lastPing = std::max(lastPing,p->lastPing()); + lastDirectReceive = std::max(lastDirectReceive,p->lastReceived()); } + + pingTcp = ( (lastDirectReceive < lastPing) && ((lastPing - lastDirectReceive) >= ZT_PING_UNANSWERED_AFTER) ); } else pingTcp = false; TRACE("PING %s (pingTcp==%d)",_id.address().toString().c_str(),(int)pingTcp); -- cgit v1.2.3 From 2ac56fd120c3c9dd924fccc169beed2d51658ba9 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Wed, 26 Mar 2014 17:59:45 -0700 Subject: Fix TCP connection accumulation problem, still having issues with TCP tunneling. --- node/InetAddress.cpp | 32 +++++++++++++++++--------------- node/Node.cpp | 5 +++-- node/Peer.cpp | 2 ++ node/RuntimeEnvironment.hpp | 5 +++++ node/SocketManager.cpp | 10 +++++----- node/TcpSocket.cpp | 30 ++++++++++-------------------- node/Utils.cpp | 4 ++-- 7 files changed, 44 insertions(+), 44 deletions(-) diff --git a/node/InetAddress.cpp b/node/InetAddress.cpp index 349646cb..0a7dd88c 100644 --- a/node/InetAddress.cpp +++ b/node/InetAddress.cpp @@ -141,27 +141,29 @@ bool InetAddress::operator==(const InetAddress &a) const return (!memcmp(_sa.sin6.sin6_addr.s6_addr,a._sa.sin6.sin6_addr.s6_addr,sizeof(_sa.sin6.sin6_addr.s6_addr))); } return false; - } else if (!_sa.saddr.sa_family) - return (!a._sa.saddr.sa_family); - return (!memcmp(&_sa,&a._sa,sizeof(_sa))); + } else return (memcmp(&_sa,&a._sa,sizeof(_sa)) == 0); } bool InetAddress::operator<(const InetAddress &a) const throw() { - if (_sa.saddr.sa_family == AF_INET) { - if (a._sa.saddr.sa_family == AF_INET) - return ((ntohl(_sa.sin.sin_addr.s_addr < ntohl(a._sa.sin.sin_addr.s_addr)))||((_sa.sin.sin_addr.s_addr == a._sa.sin.sin_addr.s_addr)&&(ntohs(_sa.sin.sin_port) < ntohs(a._sa.sin.sin_port)))); - else if (a._sa.saddr.sa_family == AF_INET6) - return true; - } else if (_sa.saddr.sa_family == AF_INET6) { - if (a._sa.saddr.sa_family == AF_INET6) { - int cmp = memcmp(_sa.sin6.sin6_addr.s6_addr,a._sa.sin6.sin6_addr.s6_addr,16); - return ((cmp < 0)||((!cmp)&&(ntohs(_sa.sin6.sin6_port) < ntohs(a._sa.sin6.sin6_port)))); - } else if (a._sa.saddr.sa_family == AF_INET) - return false; + if (_sa.saddr.sa_family < a._sa.saddr.sa_family) + return true; + else if (_sa.saddr.sa_family == a._sa.saddr.sa_family) { + if (_sa.saddr.sa_family == AF_INET) { + unsigned long x = ntohl(_sa.sin.sin_addr.s_addr); + unsigned long y = ntohl(a._sa.sin.sin_addr.s_addr); + if (x == y) + return (ntohs(_sa.sin.sin_port) < ntohs(a._sa.sin.sin_port)); + else return (x < y); + } else if (_sa.saddr.sa_family == AF_INET6) { + int cmp = (int)memcmp(_sa.sin6.sin6_addr.s6_addr,a._sa.sin6.sin6_addr.s6_addr,16); + if (cmp == 0) + return (ntohs(_sa.sin6.sin6_port) < ntohs(a._sa.sin6.sin6_port)); + else return (cmp < 0); + } else return (memcmp(&_sa,&a._sa,sizeof(_sa)) < 0); } - return (_sa.saddr.sa_family < a._sa.saddr.sa_family); + return false; } } // namespace ZeroTier diff --git a/node/Node.cpp b/node/Node.cpp index 20a049bc..95e4bb1d 100644 --- a/node/Node.cpp +++ b/node/Node.cpp @@ -577,8 +577,9 @@ Node::ReasonForTermination Node::run() } // Ping supernodes separately for two reasons: (1) supernodes only ping each - // other, and (2) we still want to ping them first on resynchronize. - if ((resynchronize)||((now - lastSupernodePing) >= ZT_PEER_DIRECT_PING_DELAY)) { + // other, and (2) we still want to ping them first on resynchronize. Also ping + // more aggressively if nothing seems to be happening at all. + if ((resynchronize)||((now - lastSupernodePing) >= ZT_PEER_DIRECT_PING_DELAY)||((now - _r->timeOfLastPacketReceived) >= ZT_PING_UNANSWERED_AFTER)) { lastSupernodePing = now; std::vector< SharedPtr > sns(_r->topology->supernodePeers()); TRACE("pinging %d supernodes",(int)sns.size()); diff --git a/node/Peer.cpp b/node/Peer.cpp index 0cd82909..fa8ab3e8 100644 --- a/node/Peer.cpp +++ b/node/Peer.cpp @@ -69,6 +69,8 @@ void Peer::receive( Packet::Verb inReVerb, uint64_t now) { + *((const_cast(&(_r->timeOfLastPacketReceived)))) = now; + if (!hops) { // direct packet { Mutex::Lock _l(_lock); diff --git a/node/RuntimeEnvironment.hpp b/node/RuntimeEnvironment.hpp index ffa6cbdb..29693c55 100644 --- a/node/RuntimeEnvironment.hpp +++ b/node/RuntimeEnvironment.hpp @@ -65,6 +65,7 @@ public: RuntimeEnvironment() : shutdownInProgress(false), timeOfLastNetworkEnvironmentChange(0), + timeOfLastPacketReceived(0), log((Logger *)0), prng((CMWC4096 *)0), mc((Multicaster *)0), @@ -92,6 +93,10 @@ public: // Time network environment (e.g. fingerprint) last changed -- used to determine online-ness volatile uint64_t timeOfLastNetworkEnvironmentChange; + // Time last packet was received -- from anywhere. This is updated in Peer::receive() + // via an ugly const_cast<>. + volatile uint64_t timeOfLastPacketReceived; + /* * Order matters a bit here. These are constructed in this order * and then deleted in the opposite order on Node exit. The order ensures diff --git a/node/SocketManager.cpp b/node/SocketManager.cpp index 2f6eb4fb..893e17d1 100644 --- a/node/SocketManager.cpp +++ b/node/SocketManager.cpp @@ -395,17 +395,17 @@ bool SocketManager::send(const InetAddress &to,bool tcp,const void *msg,unsigned if (!ts->send(to,msg,msglen)) return false; + { + Mutex::Lock _l(_tcpSockets_m); + _tcpSockets[to] = ts; + } + _fdSetLock.lock(); FD_SET(s,&_readfds); if (connecting) FD_SET(s,&_writefds); _fdSetLock.unlock(); - { - Mutex::Lock _l(_tcpSockets_m); - _tcpSockets[to] = ts; - } - return true; } else if (to.isV4()) { if (_udpV4Socket) diff --git a/node/TcpSocket.cpp b/node/TcpSocket.cpp index a422dec6..b56775d8 100644 --- a/node/TcpSocket.cpp +++ b/node/TcpSocket.cpp @@ -164,32 +164,22 @@ bool TcpSocket::notifyAvailableForWrite(const SharedPtr &self,SocketMana if (_outptr) { int n = (int)::send(_sock,(const char *)_outbuf,_outptr,0); - if (n < 0) { + if (n <= 0) { switch(errno) { -#ifdef EBADF - case EBADF: +#ifdef EAGAIN + case EAGAIN: #endif -#ifdef EINVAL - case EINVAL: +#if defined(EWOULDBLOCK) && ( !defined(EAGAIN) || (EWOULDBLOCK != EAGAIN) ) + case EWOULDBLOCK: #endif -#ifdef ENOTSOCK - case ENOTSOCK: +#ifdef EINTR + case EINTR: #endif -#ifdef ECONNRESET - case ECONNRESET: -#endif -#ifdef EPIPE - case EPIPE: -#endif -#ifdef ENETDOWN - case ENETDOWN: -#endif - return false; - default: break; + default: + return false; } - } else if (n > 0) - memmove(_outbuf,_outbuf + (unsigned int)n,_outptr -= (unsigned int)n); + } else memmove(_outbuf,_outbuf + (unsigned int)n,_outptr -= (unsigned int)n); } if (!_outptr) diff --git a/node/Utils.cpp b/node/Utils.cpp index fe153af2..95b66337 100644 --- a/node/Utils.cpp +++ b/node/Utils.cpp @@ -207,11 +207,11 @@ void Utils::getSecureRandom(void *buf,unsigned int bytes) { int fd = ::open("/dev/urandom",O_RDONLY); if (fd < 0) { - fprintf(stderr,"FATAL ERROR: unable to open /dev/urandom%s",ZT_EOL_S); + fprintf(stderr,"FATAL ERROR: unable to open /dev/urandom (%d)"ZT_EOL_S,errno); exit(-1); } if ((int)::read(fd,randbuf,sizeof(randbuf)) != (int)sizeof(randbuf)) { - fprintf(stderr,"FATAL ERROR: unable to read from /dev/urandom%s",ZT_EOL_S); + fprintf(stderr,"FATAL ERROR: unable to read from /dev/urandom"ZT_EOL_S); exit(-1); } ::close(fd); -- cgit v1.2.3 From c231510f8b813eb73750f266dd18e836bc02d9cc Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Thu, 27 Mar 2014 17:02:52 -0700 Subject: More TCP-related fixes and tweaks to ping timing, resynchronize, and startup. --- node/Node.cpp | 33 +++++++++++++------------ node/NodeConfig.cpp | 2 +- node/RuntimeEnvironment.hpp | 4 +-- node/SocketManager.cpp | 60 ++++++++++++++++++++++++++++----------------- node/TcpSocket.cpp | 4 +-- 5 files changed, 60 insertions(+), 43 deletions(-) diff --git a/node/Node.cpp b/node/Node.cpp index 95e4bb1d..839ae082 100644 --- a/node/Node.cpp +++ b/node/Node.cpp @@ -537,10 +537,10 @@ Node::ReasonForTermination Node::run() uint64_t lastClean = Utils::now(); // don't need to do this immediately uint64_t lastNetworkFingerprintCheck = 0; uint64_t lastMulticastCheck = 0; + long lastDelayDelta = 0; uint64_t networkConfigurationFingerprint = _r->sysEnv->getNetworkConfigurationFingerprint(_r->nc->networkTapDeviceNames()); - _r->timeOfLastNetworkEnvironmentChange = Utils::now(); - long lastDelayDelta = 0; + _r->timeOfLastResynchronize = Utils::now(); while (impl->reasonForTermination == NODE_RUNNING) { if (Utils::fileExists(shutdownIfUnreadablePath.c_str(),false)) { @@ -551,11 +551,13 @@ Node::ReasonForTermination Node::run() } uint64_t now = Utils::now(); + + // Did the user send SIGHUP or otherwise order network resync? (mostly for debugging) bool resynchronize = impl->resynchronize; + impl->resynchronize = false; if (resynchronize) { LOG("manual resynchronize ordered, resyncing with network"); } - impl->resynchronize = false; // If it looks like the computer slept and woke, resynchronize. if (lastDelayDelta >= ZT_SLEEP_WAKE_DETECTION_THRESHOLD) { @@ -571,15 +573,17 @@ Node::ReasonForTermination Node::run() if (fp != networkConfigurationFingerprint) { LOG("netconf fingerprint change: %.16llx != %.16llx, resyncing with network",networkConfigurationFingerprint,fp); networkConfigurationFingerprint = fp; - _r->timeOfLastNetworkEnvironmentChange = now; resynchronize = true; } } - // Ping supernodes separately for two reasons: (1) supernodes only ping each - // other, and (2) we still want to ping them first on resynchronize. Also ping - // more aggressively if nothing seems to be happening at all. - if ((resynchronize)||((now - lastSupernodePing) >= ZT_PEER_DIRECT_PING_DELAY)||((now - _r->timeOfLastPacketReceived) >= ZT_PING_UNANSWERED_AFTER)) { + if (resynchronize) + _r->timeOfLastResynchronize = now; + + /* Ping supernodes separately, and do so more aggressively if we haven't + * heard anything from anyone since our last resynchronize / startup. */ + if ( ((now - lastSupernodePing) >= ZT_PEER_DIRECT_PING_DELAY) || + ((_r->timeOfLastResynchronize > _r->timeOfLastPacketReceived) && ((now - lastSupernodePing) >= ZT_PING_UNANSWERED_AFTER)) ) { lastSupernodePing = now; std::vector< SharedPtr > sns(_r->topology->supernodePeers()); TRACE("pinging %d supernodes",(int)sns.size()); @@ -595,8 +599,8 @@ Node::ReasonForTermination Node::run() _r->topology->eachPeer(Topology::ResetActivePeers(_r,now)); _r->sm->closeTcpSockets(); } else { - // Periodically check for changes in our local multicast subscriptions - // and broadcast those changes to directly connected peers. + /* Periodically check for changes in our local multicast subscriptions + * and broadcast those changes to directly connected peers. */ if ((now - lastMulticastCheck) >= ZT_MULTICAST_LOCAL_POLL_PERIOD) { lastMulticastCheck = now; try { @@ -615,8 +619,8 @@ Node::ReasonForTermination Node::run() } } - // Periodically ping all our non-stale direct peers unless we're a supernode. - // Supernodes only ping each other (which is done above). + /* Periodically ping all our non-stale direct peers unless we're a supernode. + * Supernodes only ping each other (which is done above). */ if (!_r->topology->amSupernode()) { if ((now - lastPingCheck) >= ZT_PING_CHECK_DELAY) { lastPingCheck = now; @@ -632,7 +636,7 @@ Node::ReasonForTermination Node::run() } } - // Periodically or on resynchronize update network configurations. + // Update network configurations when needed. if ((resynchronize)||((now - lastNetworkAutoconfCheck) >= ZT_NETWORK_AUTOCONF_CHECK_DELAY)) { lastNetworkAutoconfCheck = now; std::vector< SharedPtr > nets(_r->nc->networks()); @@ -642,8 +646,7 @@ Node::ReasonForTermination Node::run() } } - // Do periodic cleanup, flushes of stuff to disk, software update - // checks, etc. + // Do periodic tasks in submodules. if ((now - lastClean) >= ZT_DB_CLEAN_PERIOD) { lastClean = now; _r->mc->clean(); diff --git a/node/NodeConfig.cpp b/node/NodeConfig.cpp index 30438fc6..e2b1d974 100644 --- a/node/NodeConfig.cpp +++ b/node/NodeConfig.cpp @@ -210,7 +210,7 @@ void NodeConfig::_doCommand(IpcConnection *ipcc,const char *commandLine) // network environment changed and also less than ZT_PEER_LINK_ACTIVITY_TIMEOUT ago. bool isOnline = false; uint64_t now = Utils::now(); - uint64_t since = _r->timeOfLastNetworkEnvironmentChange; + uint64_t since = _r->timeOfLastResynchronize; std::vector< SharedPtr > snp(_r->topology->supernodePeers()); for(std::vector< SharedPtr >::const_iterator sn(snp.begin());sn!=snp.end();++sn) { uint64_t lastRec = (*sn)->lastDirectReceive(); diff --git a/node/RuntimeEnvironment.hpp b/node/RuntimeEnvironment.hpp index 29693c55..228679a1 100644 --- a/node/RuntimeEnvironment.hpp +++ b/node/RuntimeEnvironment.hpp @@ -64,7 +64,7 @@ class RuntimeEnvironment public: RuntimeEnvironment() : shutdownInProgress(false), - timeOfLastNetworkEnvironmentChange(0), + timeOfLastResynchronize(0), timeOfLastPacketReceived(0), log((Logger *)0), prng((CMWC4096 *)0), @@ -91,7 +91,7 @@ public: volatile bool shutdownInProgress; // Time network environment (e.g. fingerprint) last changed -- used to determine online-ness - volatile uint64_t timeOfLastNetworkEnvironmentChange; + volatile uint64_t timeOfLastResynchronize; // Time last packet was received -- from anywhere. This is updated in Peer::receive() // via an ugly const_cast<>. diff --git a/node/SocketManager.cpp b/node/SocketManager.cpp index 893e17d1..f81faf47 100644 --- a/node/SocketManager.cpp +++ b/node/SocketManager.cpp @@ -42,8 +42,13 @@ #include #include #include +#include +#include #endif +// Uncomment to turn off TCP Nagle +//#define ZT_TCP_NODELAY + // Allow us to use the same value on Windows and *nix #ifndef INVALID_SOCKET #define INVALID_SOCKET (-1) @@ -58,8 +63,8 @@ namespace ZeroTier { #ifdef __WINDOWS__ -// hack from StackOverflow, behaves a bit like pipe() on *nix systems -static inline void __winpipe(SOCKET fds[2]) +// hack copied from StackOverflow, behaves a bit like pipe() on *nix systems +static inline void winPipeHack(SOCKET fds[2]) { struct sockaddr_in inaddr; struct sockaddr addr; @@ -98,10 +103,11 @@ SocketManager::SocketManager( FD_ZERO(&_readfds); FD_ZERO(&_writefds); + // Create a pipe or socket pair that can be used to interrupt select() #ifdef __WINDOWS__ { SOCKET tmps[2] = { INVALID_SOCKET,INVALID_SOCKET }; - __winpipe(tmps); + winPipeHack(tmps); _whackSendPipe = tmps[0]; _whackReceivePipe = tmps[1]; u_long iMode=1; @@ -129,15 +135,12 @@ SocketManager::SocketManager( _tcpV6ListenSocket = ::socket(AF_INET6,SOCK_STREAM,0); #ifdef __WINDOWS__ if (_tcpV6ListenSocket == INVALID_SOCKET) { - _closeSockets(); - throw std::runtime_error("unable to create IPv6 SOCK_STREAM socket"); - } #else if (_tcpV6ListenSocket <= 0) { +#endif _closeSockets(); throw std::runtime_error("unable to create IPv6 SOCK_STREAM socket"); } -#endif #ifdef __WINDOWS__ { @@ -178,15 +181,12 @@ SocketManager::SocketManager( _tcpV4ListenSocket = ::socket(AF_INET,SOCK_STREAM,0); #ifdef __WINDOWS__ if (_tcpV4ListenSocket == INVALID_SOCKET) { - _closeSockets(); - throw std::runtime_error("unable to create IPv4 SOCK_STREAM socket"); - } #else if (_tcpV4ListenSocket <= 0) { +#endif _closeSockets(); throw std::runtime_error("unable to create IPv4 SOCK_STREAM socket"); } -#endif #ifdef __WINDOWS__ { @@ -368,10 +368,10 @@ bool SocketManager::send(const InetAddress &to,bool tcp,const void *msg,unsigned ::closesocket(s); return false; } - { - u_long iMode=1; - ioctlsocket(s,FIONBIO,&iMode); - } + { u_long iMode=1; ioctlsocket(s,FIONBIO,&iMode); } +#ifdef ZT_TCP_NODELAY + { BOOL f = TRUE; setsockopt(s,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f)); } +#endif #else int s = ::socket(to.isV4() ? AF_INET : AF_INET6,SOCK_STREAM,0); if (s <= 0) @@ -381,6 +381,9 @@ bool SocketManager::send(const InetAddress &to,bool tcp,const void *msg,unsigned return false; } fcntl(s,F_SETFL,O_NONBLOCK); +#ifdef ZT_TCP_NODELAY + { int f = 1; setsockopt(s,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f)); } +#endif #endif bool connecting = false; @@ -405,6 +408,7 @@ bool SocketManager::send(const InetAddress &to,bool tcp,const void *msg,unsigned if (connecting) FD_SET(s,&_writefds); _fdSetLock.unlock(); + whack(); return true; } else if (to.isV4()) { @@ -453,11 +457,11 @@ void SocketManager::poll(unsigned long timeout) select(_nfds + 1,&rfds,&wfds,&efds,(timeout > 0) ? &tv : (struct timeval *)0); if (FD_ISSET(_whackReceivePipe,&rfds)) { - char tmp; + char tmp[16]; #ifdef __WINDOWS__ - ::recv(_whackReceivePipe,&tmp,1,0); + ::recv(_whackReceivePipe,&tmp,16,0); #else - ::read(_whackReceivePipe,&tmp,1); + ::read(_whackReceivePipe,&tmp,16); #endif } @@ -476,10 +480,15 @@ void SocketManager::poll(unsigned long timeout) try { _tcpSockets[fromia] = SharedPtr(new TcpSocket(this,sockfd,false,fromia)); #ifdef __WINDOWS__ - u_long iMode=1; - ioctlsocket(sockfd,FIONBIO,&iMode); + { u_long iMode=1; ioctlsocket(sockfd,FIONBIO,&iMode); } +#ifdef ZT_TCP_NODELAY + { BOOL f = TRUE; setsockopt(sockfd,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f)); } +#endif #else fcntl(sockfd,F_SETFL,O_NONBLOCK); +#ifdef ZT_TCP_NODELAY + { int f = 1; setsockopt(sockfd,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f)); } +#endif #endif _fdSetLock.lock(); FD_SET(sockfd,&_readfds); @@ -509,10 +518,15 @@ void SocketManager::poll(unsigned long timeout) try { _tcpSockets[fromia] = SharedPtr(new TcpSocket(this,sockfd,false,fromia)); #ifdef __WINDOWS__ - u_long iMode=1; - ioctlsocket(sockfd,FIONBIO,&iMode); + { u_long iMode=1; ioctlsocket(sockfd,FIONBIO,&iMode); } +#ifdef ZT_TCP_NODELAY + { BOOL f = TRUE; setsockopt(sockfd,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f)); } +#endif #else fcntl(sockfd,F_SETFL,O_NONBLOCK); +#ifdef ZT_TCP_NODELAY + { int f = 1; setsockopt(sockfd,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f)); } +#endif #endif _fdSetLock.lock(); FD_SET(sockfd,&_readfds); @@ -538,7 +552,7 @@ void SocketManager::poll(unsigned long timeout) bool closedSockets = false; { // grab copy of TCP sockets list because _tcpSockets[] might be changed in a handler Mutex::Lock _l2(_tcpSockets_m); - if (_tcpSockets.size()) { + if (!_tcpSockets.empty()) { ts.reserve(_tcpSockets.size()); uint64_t now = Utils::now(); for(std::map< InetAddress,SharedPtr >::iterator s(_tcpSockets.begin());s!=_tcpSockets.end();) { diff --git a/node/TcpSocket.cpp b/node/TcpSocket.cpp index b56775d8..5d475c93 100644 --- a/node/TcpSocket.cpp +++ b/node/TcpSocket.cpp @@ -73,7 +73,7 @@ bool TcpSocket::send(const InetAddress &to,const void *msg,unsigned int msglen) Mutex::Lock _l(_writeLock); - bool outputWasEnqueued = (_outptr != 0); + bool writeInProgress = ((_outptr != 0)||(_connecting)); // Ensure that _outbuf is large enough unsigned int newptr = _outptr + 5 + msglen; @@ -102,7 +102,7 @@ bool TcpSocket::send(const InetAddress &to,const void *msg,unsigned int msglen) for(unsigned int i=0;i Date: Thu, 27 Mar 2014 17:42:02 -0700 Subject: Finally fixed TCP sockets. --- node/SocketManager.cpp | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/node/SocketManager.cpp b/node/SocketManager.cpp index f81faf47..659f448c 100644 --- a/node/SocketManager.cpp +++ b/node/SocketManager.cpp @@ -395,8 +395,13 @@ bool SocketManager::send(const InetAddress &to,bool tcp,const void *msg,unsigned } ts = SharedPtr(new TcpSocket(this,s,connecting,to)); - if (!ts->send(to,msg,msglen)) + if (!ts->send(to,msg,msglen)) { + _fdSetLock.lock(); + FD_CLR(s,&_readfds); + FD_CLR(s,&_writefds); + _fdSetLock.unlock(); return false; + } { Mutex::Lock _l(_tcpSockets_m); @@ -408,6 +413,8 @@ bool SocketManager::send(const InetAddress &to,bool tcp,const void *msg,unsigned if (connecting) FD_SET(s,&_writefds); _fdSetLock.unlock(); + + _updateNfds(); whack(); return true; -- cgit v1.2.3