diff options
Diffstat (limited to 'node')
-rw-r--r-- | node/Node.cpp | 33 | ||||
-rw-r--r-- | node/NodeConfig.cpp | 2 | ||||
-rw-r--r-- | node/RuntimeEnvironment.hpp | 4 | ||||
-rw-r--r-- | node/SocketManager.cpp | 60 | ||||
-rw-r--r-- | node/TcpSocket.cpp | 4 |
5 files changed, 60 insertions, 43 deletions
diff --git a/node/Node.cpp b/node/Node.cpp index 95e4bb1d..839ae082 100644 --- a/node/Node.cpp +++ b/node/Node.cpp @@ -537,10 +537,10 @@ Node::ReasonForTermination Node::run() uint64_t lastClean = Utils::now(); // don't need to do this immediately uint64_t lastNetworkFingerprintCheck = 0; uint64_t lastMulticastCheck = 0; + long lastDelayDelta = 0; uint64_t networkConfigurationFingerprint = _r->sysEnv->getNetworkConfigurationFingerprint(_r->nc->networkTapDeviceNames()); - _r->timeOfLastNetworkEnvironmentChange = Utils::now(); - long lastDelayDelta = 0; + _r->timeOfLastResynchronize = Utils::now(); while (impl->reasonForTermination == NODE_RUNNING) { if (Utils::fileExists(shutdownIfUnreadablePath.c_str(),false)) { @@ -551,11 +551,13 @@ Node::ReasonForTermination Node::run() } uint64_t now = Utils::now(); + + // Did the user send SIGHUP or otherwise order network resync? (mostly for debugging) bool resynchronize = impl->resynchronize; + impl->resynchronize = false; if (resynchronize) { LOG("manual resynchronize ordered, resyncing with network"); } - impl->resynchronize = false; // If it looks like the computer slept and woke, resynchronize. if (lastDelayDelta >= ZT_SLEEP_WAKE_DETECTION_THRESHOLD) { @@ -571,15 +573,17 @@ Node::ReasonForTermination Node::run() if (fp != networkConfigurationFingerprint) { LOG("netconf fingerprint change: %.16llx != %.16llx, resyncing with network",networkConfigurationFingerprint,fp); networkConfigurationFingerprint = fp; - _r->timeOfLastNetworkEnvironmentChange = now; resynchronize = true; } } - // Ping supernodes separately for two reasons: (1) supernodes only ping each - // other, and (2) we still want to ping them first on resynchronize. Also ping - // more aggressively if nothing seems to be happening at all. - if ((resynchronize)||((now - lastSupernodePing) >= ZT_PEER_DIRECT_PING_DELAY)||((now - _r->timeOfLastPacketReceived) >= ZT_PING_UNANSWERED_AFTER)) { + if (resynchronize) + _r->timeOfLastResynchronize = now; + + /* Ping supernodes separately, and do so more aggressively if we haven't + * heard anything from anyone since our last resynchronize / startup. */ + if ( ((now - lastSupernodePing) >= ZT_PEER_DIRECT_PING_DELAY) || + ((_r->timeOfLastResynchronize > _r->timeOfLastPacketReceived) && ((now - lastSupernodePing) >= ZT_PING_UNANSWERED_AFTER)) ) { lastSupernodePing = now; std::vector< SharedPtr<Peer> > sns(_r->topology->supernodePeers()); TRACE("pinging %d supernodes",(int)sns.size()); @@ -595,8 +599,8 @@ Node::ReasonForTermination Node::run() _r->topology->eachPeer(Topology::ResetActivePeers(_r,now)); _r->sm->closeTcpSockets(); } else { - // Periodically check for changes in our local multicast subscriptions - // and broadcast those changes to directly connected peers. + /* Periodically check for changes in our local multicast subscriptions + * and broadcast those changes to directly connected peers. */ if ((now - lastMulticastCheck) >= ZT_MULTICAST_LOCAL_POLL_PERIOD) { lastMulticastCheck = now; try { @@ -615,8 +619,8 @@ Node::ReasonForTermination Node::run() } } - // Periodically ping all our non-stale direct peers unless we're a supernode. - // Supernodes only ping each other (which is done above). + /* Periodically ping all our non-stale direct peers unless we're a supernode. + * Supernodes only ping each other (which is done above). */ if (!_r->topology->amSupernode()) { if ((now - lastPingCheck) >= ZT_PING_CHECK_DELAY) { lastPingCheck = now; @@ -632,7 +636,7 @@ Node::ReasonForTermination Node::run() } } - // Periodically or on resynchronize update network configurations. + // Update network configurations when needed. if ((resynchronize)||((now - lastNetworkAutoconfCheck) >= ZT_NETWORK_AUTOCONF_CHECK_DELAY)) { lastNetworkAutoconfCheck = now; std::vector< SharedPtr<Network> > nets(_r->nc->networks()); @@ -642,8 +646,7 @@ Node::ReasonForTermination Node::run() } } - // Do periodic cleanup, flushes of stuff to disk, software update - // checks, etc. + // Do periodic tasks in submodules. if ((now - lastClean) >= ZT_DB_CLEAN_PERIOD) { lastClean = now; _r->mc->clean(); diff --git a/node/NodeConfig.cpp b/node/NodeConfig.cpp index 30438fc6..e2b1d974 100644 --- a/node/NodeConfig.cpp +++ b/node/NodeConfig.cpp @@ -210,7 +210,7 @@ void NodeConfig::_doCommand(IpcConnection *ipcc,const char *commandLine) // network environment changed and also less than ZT_PEER_LINK_ACTIVITY_TIMEOUT ago. bool isOnline = false; uint64_t now = Utils::now(); - uint64_t since = _r->timeOfLastNetworkEnvironmentChange; + uint64_t since = _r->timeOfLastResynchronize; std::vector< SharedPtr<Peer> > snp(_r->topology->supernodePeers()); for(std::vector< SharedPtr<Peer> >::const_iterator sn(snp.begin());sn!=snp.end();++sn) { uint64_t lastRec = (*sn)->lastDirectReceive(); diff --git a/node/RuntimeEnvironment.hpp b/node/RuntimeEnvironment.hpp index 29693c55..228679a1 100644 --- a/node/RuntimeEnvironment.hpp +++ b/node/RuntimeEnvironment.hpp @@ -64,7 +64,7 @@ class RuntimeEnvironment public: RuntimeEnvironment() : shutdownInProgress(false), - timeOfLastNetworkEnvironmentChange(0), + timeOfLastResynchronize(0), timeOfLastPacketReceived(0), log((Logger *)0), prng((CMWC4096 *)0), @@ -91,7 +91,7 @@ public: volatile bool shutdownInProgress; // Time network environment (e.g. fingerprint) last changed -- used to determine online-ness - volatile uint64_t timeOfLastNetworkEnvironmentChange; + volatile uint64_t timeOfLastResynchronize; // Time last packet was received -- from anywhere. This is updated in Peer::receive() // via an ugly const_cast<>. diff --git a/node/SocketManager.cpp b/node/SocketManager.cpp index 893e17d1..f81faf47 100644 --- a/node/SocketManager.cpp +++ b/node/SocketManager.cpp @@ -42,8 +42,13 @@ #include <sys/socket.h> #include <arpa/inet.h> #include <signal.h> +#include <netinet/in.h> +#include <netinet/tcp.h> #endif +// Uncomment to turn off TCP Nagle +//#define ZT_TCP_NODELAY + // Allow us to use the same value on Windows and *nix #ifndef INVALID_SOCKET #define INVALID_SOCKET (-1) @@ -58,8 +63,8 @@ namespace ZeroTier { #ifdef __WINDOWS__ -// hack from StackOverflow, behaves a bit like pipe() on *nix systems -static inline void __winpipe(SOCKET fds[2]) +// hack copied from StackOverflow, behaves a bit like pipe() on *nix systems +static inline void winPipeHack(SOCKET fds[2]) { struct sockaddr_in inaddr; struct sockaddr addr; @@ -98,10 +103,11 @@ SocketManager::SocketManager( FD_ZERO(&_readfds); FD_ZERO(&_writefds); + // Create a pipe or socket pair that can be used to interrupt select() #ifdef __WINDOWS__ { SOCKET tmps[2] = { INVALID_SOCKET,INVALID_SOCKET }; - __winpipe(tmps); + winPipeHack(tmps); _whackSendPipe = tmps[0]; _whackReceivePipe = tmps[1]; u_long iMode=1; @@ -129,15 +135,12 @@ SocketManager::SocketManager( _tcpV6ListenSocket = ::socket(AF_INET6,SOCK_STREAM,0); #ifdef __WINDOWS__ if (_tcpV6ListenSocket == INVALID_SOCKET) { - _closeSockets(); - throw std::runtime_error("unable to create IPv6 SOCK_STREAM socket"); - } #else if (_tcpV6ListenSocket <= 0) { +#endif _closeSockets(); throw std::runtime_error("unable to create IPv6 SOCK_STREAM socket"); } -#endif #ifdef __WINDOWS__ { @@ -178,15 +181,12 @@ SocketManager::SocketManager( _tcpV4ListenSocket = ::socket(AF_INET,SOCK_STREAM,0); #ifdef __WINDOWS__ if (_tcpV4ListenSocket == INVALID_SOCKET) { - _closeSockets(); - throw std::runtime_error("unable to create IPv4 SOCK_STREAM socket"); - } #else if (_tcpV4ListenSocket <= 0) { +#endif _closeSockets(); throw std::runtime_error("unable to create IPv4 SOCK_STREAM socket"); } -#endif #ifdef __WINDOWS__ { @@ -368,10 +368,10 @@ bool SocketManager::send(const InetAddress &to,bool tcp,const void *msg,unsigned ::closesocket(s); return false; } - { - u_long iMode=1; - ioctlsocket(s,FIONBIO,&iMode); - } + { u_long iMode=1; ioctlsocket(s,FIONBIO,&iMode); } +#ifdef ZT_TCP_NODELAY + { BOOL f = TRUE; setsockopt(s,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f)); } +#endif #else int s = ::socket(to.isV4() ? AF_INET : AF_INET6,SOCK_STREAM,0); if (s <= 0) @@ -381,6 +381,9 @@ bool SocketManager::send(const InetAddress &to,bool tcp,const void *msg,unsigned return false; } fcntl(s,F_SETFL,O_NONBLOCK); +#ifdef ZT_TCP_NODELAY + { int f = 1; setsockopt(s,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f)); } +#endif #endif bool connecting = false; @@ -405,6 +408,7 @@ bool SocketManager::send(const InetAddress &to,bool tcp,const void *msg,unsigned if (connecting) FD_SET(s,&_writefds); _fdSetLock.unlock(); + whack(); return true; } else if (to.isV4()) { @@ -453,11 +457,11 @@ void SocketManager::poll(unsigned long timeout) select(_nfds + 1,&rfds,&wfds,&efds,(timeout > 0) ? &tv : (struct timeval *)0); if (FD_ISSET(_whackReceivePipe,&rfds)) { - char tmp; + char tmp[16]; #ifdef __WINDOWS__ - ::recv(_whackReceivePipe,&tmp,1,0); + ::recv(_whackReceivePipe,&tmp,16,0); #else - ::read(_whackReceivePipe,&tmp,1); + ::read(_whackReceivePipe,&tmp,16); #endif } @@ -476,10 +480,15 @@ void SocketManager::poll(unsigned long timeout) try { _tcpSockets[fromia] = SharedPtr<Socket>(new TcpSocket(this,sockfd,false,fromia)); #ifdef __WINDOWS__ - u_long iMode=1; - ioctlsocket(sockfd,FIONBIO,&iMode); + { u_long iMode=1; ioctlsocket(sockfd,FIONBIO,&iMode); } +#ifdef ZT_TCP_NODELAY + { BOOL f = TRUE; setsockopt(sockfd,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f)); } +#endif #else fcntl(sockfd,F_SETFL,O_NONBLOCK); +#ifdef ZT_TCP_NODELAY + { int f = 1; setsockopt(sockfd,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f)); } +#endif #endif _fdSetLock.lock(); FD_SET(sockfd,&_readfds); @@ -509,10 +518,15 @@ void SocketManager::poll(unsigned long timeout) try { _tcpSockets[fromia] = SharedPtr<Socket>(new TcpSocket(this,sockfd,false,fromia)); #ifdef __WINDOWS__ - u_long iMode=1; - ioctlsocket(sockfd,FIONBIO,&iMode); + { u_long iMode=1; ioctlsocket(sockfd,FIONBIO,&iMode); } +#ifdef ZT_TCP_NODELAY + { BOOL f = TRUE; setsockopt(sockfd,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f)); } +#endif #else fcntl(sockfd,F_SETFL,O_NONBLOCK); +#ifdef ZT_TCP_NODELAY + { int f = 1; setsockopt(sockfd,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f)); } +#endif #endif _fdSetLock.lock(); FD_SET(sockfd,&_readfds); @@ -538,7 +552,7 @@ void SocketManager::poll(unsigned long timeout) bool closedSockets = false; { // grab copy of TCP sockets list because _tcpSockets[] might be changed in a handler Mutex::Lock _l2(_tcpSockets_m); - if (_tcpSockets.size()) { + if (!_tcpSockets.empty()) { ts.reserve(_tcpSockets.size()); uint64_t now = Utils::now(); for(std::map< InetAddress,SharedPtr<Socket> >::iterator s(_tcpSockets.begin());s!=_tcpSockets.end();) { diff --git a/node/TcpSocket.cpp b/node/TcpSocket.cpp index b56775d8..5d475c93 100644 --- a/node/TcpSocket.cpp +++ b/node/TcpSocket.cpp @@ -73,7 +73,7 @@ bool TcpSocket::send(const InetAddress &to,const void *msg,unsigned int msglen) Mutex::Lock _l(_writeLock); - bool outputWasEnqueued = (_outptr != 0); + bool writeInProgress = ((_outptr != 0)||(_connecting)); // Ensure that _outbuf is large enough unsigned int newptr = _outptr + 5 + msglen; @@ -102,7 +102,7 @@ bool TcpSocket::send(const InetAddress &to,const void *msg,unsigned int msglen) for(unsigned int i=0;i<msglen;++i) _outbuf[_outptr++] = ((const unsigned char *)msg)[i]; - if (!outputWasEnqueued) { + if (!writeInProgress) { // If no output was enqueued before this, try to send() it and then // start a queued write if any remains after that. |