diff options
Diffstat (limited to 'service')
-rw-r--r-- | service/ControlPlane.cpp | 16 | ||||
-rw-r--r-- | service/OneService.cpp | 128 | ||||
-rw-r--r-- | service/OneService.hpp | 5 | ||||
-rw-r--r-- | service/README.md | 1 |
4 files changed, 101 insertions, 49 deletions
diff --git a/service/ControlPlane.cpp b/service/ControlPlane.cpp index 2e8290ed..71b3fd3f 100644 --- a/service/ControlPlane.cpp +++ b/service/ControlPlane.cpp @@ -360,18 +360,20 @@ unsigned int ControlPlane::handleRequest( _node->status(&status); Utils::snprintf(json,sizeof(json), "{\n" - "\t\"address\":\"%.10llx\",\n" - "\t\"publicIdentity\":\"%s\",\n" - "\t\"online\":%s,\n" - "\t\"versionMajor\":%d,\n" - "\t\"versionMinor\":%d,\n" - "\t\"versionRev\":%d,\n" - "\t\"version\":\"%d.%d.%d\",\n" + "\t\"address\": \"%.10llx\",\n" + "\t\"publicIdentity\": \"%s\",\n" + "\t\"online\": %s,\n" + "\t\"tcpFallbackActive\": %s,\n" + "\t\"versionMajor\": %d,\n" + "\t\"versionMinor\": %d,\n" + "\t\"versionRev\": %d,\n" + "\t\"version\": \"%d.%d.%d\",\n" "\t\"clock\": %llu\n" "}\n", status.address, status.publicIdentity, (status.online) ? "true" : "false", + (_svc->tcpFallbackActive()) ? "true" : "false", ZEROTIER_ONE_VERSION_MAJOR, ZEROTIER_ONE_VERSION_MINOR, ZEROTIER_ONE_VERSION_REVISION, diff --git a/service/OneService.cpp b/service/OneService.cpp index c0b47af1..797825a7 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -58,6 +58,16 @@ #include "OneService.hpp" #include "ControlPlane.hpp" +/** + * Uncomment to enable UDP breakage switch + * + * If this is defined, the presence of a file called /tmp/ZT_BREAK_UDP + * will cause direct UDP TX/RX to stop working. This can be used to + * test TCP tunneling fallback and other robustness features. Deleting + * this file will cause it to start working again. + */ +//#define ZT_BREAK_UDP + #ifdef ZT_ENABLE_NETWORK_CONTROLLER #include "../controller/SqliteNetworkController.hpp" #else @@ -369,6 +379,7 @@ struct TcpConnection std::string body; std::string writeBuf; + Mutex writeBuf_m; }; class OneServiceImpl : public OneService @@ -380,12 +391,13 @@ public: #ifdef ZT_ENABLE_NETWORK_CONTROLLER _controller((_homePath + ZT_PATH_SEPARATOR_S + ZT1_CONTROLLER_DB_PATH).c_str()), #endif - _phy(this,true), + _phy(this,false), _overrideRootTopology((overrideRootTopology) ? overrideRootTopology : ""), _node((Node *)0), _controlPlane((ControlPlane *)0), _lastDirectReceiveFromGlobal(0), _lastSendToGlobal(0), + _lastRestart(0), _nextBackgroundTaskDeadline(0), _tcpFallbackTunnel((TcpConnection *)0), _termReason(ONE_STILL_RUNNING), @@ -481,6 +493,8 @@ public: } _nextBackgroundTaskDeadline = 0; + uint64_t clockShouldBe = OSUtils::now(); + _lastRestart = clockShouldBe; uint64_t lastTapMulticastGroupCheck = 0; uint64_t lastTcpFallbackResolve = 0; #ifdef ZT_AUTO_UPDATE @@ -496,13 +510,18 @@ public: break; } else _run_m.unlock(); - uint64_t dl = _nextBackgroundTaskDeadline; uint64_t now = OSUtils::now(); + + uint64_t dl = _nextBackgroundTaskDeadline; if (dl <= now) { _node->processBackgroundTasks(now,&_nextBackgroundTaskDeadline); dl = _nextBackgroundTaskDeadline; } + // Attempt to detect sleep/wake events by detecting delay overruns + if ((now > clockShouldBe)&&((now - clockShouldBe) > 2000)) + _lastRestart = now; + #ifdef ZT_AUTO_UPDATE if ((now - lastSoftwareUpdateCheck) >= ZT_AUTO_UPDATE_CHECK_PERIOD) { lastSoftwareUpdateCheck = OSUtils::now(); @@ -532,6 +551,7 @@ public: } const unsigned long delay = (dl > now) ? (unsigned long)(dl - now) : 100; + clockShouldBe = now + (uint64_t)delay; _phy.poll(delay); } } catch (std::exception &exc) { @@ -585,6 +605,11 @@ public: return std::string(); } + virtual bool tcpFallbackActive() const + { + return (_tcpFallbackTunnel != (TcpConnection *)0); + } + virtual void terminate() { _run_m.lock(); @@ -597,7 +622,11 @@ public: inline void phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *from,void *data,unsigned long len) { - if (reinterpret_cast<const InetAddress *>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL) +#ifdef ZT_BREAK_UDP + if (OSUtils::fileExists("/tmp/ZT_BREAK_UDP")) + return; +#endif + if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) _lastDirectReceiveFromGlobal = OSUtils::now(); ZT1_ResultCode rc = _node->processWirePacket( OSUtils::now(), @@ -624,7 +653,7 @@ public: TcpConnection *tc = &(_tcpConnections[sock]); tc->type = TcpConnection::TCP_TUNNEL_OUTGOING; - tc->shouldKeepAlive = true; // unused + tc->shouldKeepAlive = true; tc->parent = this; tc->sock = sock; // from and parser are not used @@ -634,8 +663,6 @@ public: tc->writeBuf = ""; *uptr = (void *)tc; - _tcpFallbackTunnel = tc; - // Send "hello" message tc->writeBuf.push_back((char)0x17); tc->writeBuf.push_back((char)0x03); @@ -647,6 +674,8 @@ public: tc->writeBuf.push_back((char)((ZEROTIER_ONE_VERSION_REVISION >> 8) & 0xff)); tc->writeBuf.push_back((char)(ZEROTIER_ONE_VERSION_REVISION & 0xff)); _phy.tcpSetNotifyWritable(sock,true); + + _tcpFallbackTunnel = tc; } inline void phyOnTcpAccept(PhySocket *sockL,PhySocket *sockN,void **uptrL,void **uptrN,const struct sockaddr *from) @@ -699,11 +728,7 @@ public: case TcpConnection::TCP_TUNNEL_OUTGOING: tc->body.append((const char *)data,len); - if (tc->body.length() > 65535) { - // sanity limit -- a message will never be this big since mlen is 16-bit - _phy.close(sock); - return; - } else if (tc->body.length() >= 5) { + while (tc->body.length() >= 5) { const char *data = tc->body.data(); const unsigned long mlen = ( ((((unsigned long)data[3]) & 0xff) << 8) | (((unsigned long)data[4]) & 0xff) ); if (tc->body.length() >= (mlen + 5)) { @@ -768,7 +793,7 @@ public: if (tc->body.length() > (mlen + 5)) tc->body = tc->body.substr(mlen + 5); else tc->body = ""; - } + } else break; } break; @@ -778,18 +803,23 @@ public: inline void phyOnTcpWritable(PhySocket *sock,void **uptr) { TcpConnection *tc = reinterpret_cast<TcpConnection *>(*uptr); - if (tc->writeBuf.length()) { + Mutex::Lock _l(tc->writeBuf_m); + if (tc->writeBuf.length() > 0) { long sent = (long)_phy.tcpSend(sock,tc->writeBuf.data(),(unsigned long)tc->writeBuf.length(),true); if (sent > 0) { tc->lastActivity = OSUtils::now(); - if ((unsigned long)sent == (unsigned long)tc->writeBuf.length()) { + if ((unsigned long)sent >= (unsigned long)tc->writeBuf.length()) { tc->writeBuf = ""; _phy.tcpSetNotifyWritable(sock,false); if (!tc->shouldKeepAlive) _phy.close(sock); // will call close handler to delete from _tcpConnections - } else tc->writeBuf = tc->writeBuf.substr(sent); + } else { + tc->writeBuf = tc->writeBuf.substr(sent); + } } - } else _phy.tcpSetNotifyWritable(sock,false); // sanity check... shouldn't happen + } else { + _phy.tcpSetNotifyWritable(sock,false); + } } inline int nodeVirtualNetworkConfigFunction(uint64_t nwid,enum ZT1_VirtualNetworkConfigOperation op,const ZT1_VirtualNetworkConfig *nwc) @@ -939,24 +969,26 @@ public: int result = -1; switch(addr->ss_family) { case AF_INET: - //if (_v4UdpSocket) - //result = (_phy.udpSend(_v4UdpSocket,(const struct sockaddr *)addr,data,len) ? 0 : -1); +#ifdef ZT_BREAK_UDP + if (!OSUtils::fileExists("/tmp/ZT_BREAK_UDP")) { +#endif + if (_v4UdpSocket) + result = ((_phy.udpSend(_v4UdpSocket,(const struct sockaddr *)addr,data,len) != 0) ? 0 : -1); +#ifdef ZT_BREAK_UDP + } +#endif - if (reinterpret_cast<const InetAddress *>(addr)->ipScope() == InetAddress::IP_SCOPE_GLOBAL) { +#ifdef ZT1_TCP_FALLBACK_RELAY + // TCP fallback tunnel support + if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(addr)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) { uint64_t now = OSUtils::now(); - /* Engage TCP fallback if we've sent something more than PING_CHECK_INTERVAL ago, - * less than twice the fallback period ago, and haven't heard anything in the fallback - * timeout period. In that case our packets to the global IP scope are probably - * being blocked, so start trying to tunnel them out. - * - * Note that we *always* send them as plain UDP above. This way if we ever get unblocked - * or if our connectivity changes for the better, we will instantly "fail forward" back - * to plain vanilla UDP and abandon the tunnel. - * - * TCP fallback is currently only supported for tunneling to IPv4 addresses. */ - if (((now - _lastSendToGlobal) < ZT1_TCP_FALLBACK_AFTER)&&((now - _lastSendToGlobal) > ZT_PING_CHECK_INVERVAL)&&((now - _lastDirectReceiveFromGlobal) > ZT1_TCP_FALLBACK_AFTER)) { + // Engage TCP tunnel fallback if we haven't received anything valid from a global + // IP address in ZT1_TCP_FALLBACK_AFTER milliseconds. If we do start getting + // valid direct traffic we'll stop using it and close the socket after a while. + if (((now - _lastDirectReceiveFromGlobal) > ZT1_TCP_FALLBACK_AFTER)&&((now - _lastRestart) > ZT1_TCP_FALLBACK_AFTER)) { if (_tcpFallbackTunnel) { + Mutex::Lock _l(_tcpFallbackTunnel->writeBuf_m); if (!_tcpFallbackTunnel->writeBuf.length()) _phy.tcpSetNotifyWritable(_tcpFallbackTunnel->sock,true); unsigned long mlen = len + 7; @@ -969,7 +1001,8 @@ public: _tcpFallbackTunnel->writeBuf.append(reinterpret_cast<const char *>(reinterpret_cast<const void *>(&(reinterpret_cast<const struct sockaddr_in *>(addr)->sin_addr.s_addr))),4); _tcpFallbackTunnel->writeBuf.append(reinterpret_cast<const char *>(reinterpret_cast<const void *>(&(reinterpret_cast<const struct sockaddr_in *>(addr)->sin_port))),2); _tcpFallbackTunnel->writeBuf.append((const char *)data,len); - } else { + result = 0; + } else if (((now - _lastSendToGlobal) < ZT1_TCP_FALLBACK_AFTER)&&((now - _lastSendToGlobal) > (ZT_PING_CHECK_INVERVAL / 2))) { std::vector<InetAddress> tunnelIps(_tcpFallbackResolver.get()); if (tunnelIps.empty()) { if (!_tcpFallbackResolver.running()) @@ -985,11 +1018,18 @@ public: _lastSendToGlobal = now; } +#endif // ZT1_TCP_FALLBACK_RELAY break; case AF_INET6: +#ifdef ZT_BREAK_UDP + if (!OSUtils::fileExists("/tmp/ZT_BREAK_UDP")) { +#endif if (_v6UdpSocket) - result = (_phy.udpSend(_v6UdpSocket,(const struct sockaddr *)addr,data,len) ? 0 : -1); + result = ((_phy.udpSend(_v6UdpSocket,(const struct sockaddr *)addr,data,len) != 0) ? 0 : -1); +#ifdef ZT_BREAK_UDP + } +#endif break; default: return -1; @@ -1039,16 +1079,19 @@ public: } Utils::snprintf(tmpn,sizeof(tmpn),"HTTP/1.1 %.3u %s\r\nCache-Control: no-cache\r\nPragma: no-cache\r\n",scode,scodestr); - tc->writeBuf.assign(tmpn); - tc->writeBuf.append("Content-Type: "); - tc->writeBuf.append(contentType); - Utils::snprintf(tmpn,sizeof(tmpn),"\r\nContent-Length: %lu\r\n",(unsigned long)data.length()); - tc->writeBuf.append(tmpn); - if (!tc->shouldKeepAlive) - tc->writeBuf.append("Connection: close\r\n"); - tc->writeBuf.append("\r\n"); - if (tc->parser.method != HTTP_HEAD) - tc->writeBuf.append(data); + { + Mutex::Lock _l(tc->writeBuf_m); + tc->writeBuf.assign(tmpn); + tc->writeBuf.append("Content-Type: "); + tc->writeBuf.append(contentType); + Utils::snprintf(tmpn,sizeof(tmpn),"\r\nContent-Length: %lu\r\n",(unsigned long)data.length()); + tc->writeBuf.append(tmpn); + if (!tc->shouldKeepAlive) + tc->writeBuf.append("Connection: close\r\n"); + tc->writeBuf.append("\r\n"); + if (tc->parser.method != HTTP_HEAD) + tc->writeBuf.append(data); + } _phy.tcpSetNotifyWritable(tc->sock,true); } @@ -1092,6 +1135,7 @@ private: ControlPlane *_controlPlane; uint64_t _lastDirectReceiveFromGlobal; uint64_t _lastSendToGlobal; + uint64_t _lastRestart; volatile uint64_t _nextBackgroundTaskDeadline; std::map< uint64_t,EthernetTap * > _taps; diff --git a/service/OneService.hpp b/service/OneService.hpp index aea314f5..7964958c 100644 --- a/service/OneService.hpp +++ b/service/OneService.hpp @@ -127,6 +127,11 @@ public: virtual std::string portDeviceName(uint64_t nwid) const = 0; /** + * @return True if TCP fallback is currently active + */ + virtual bool tcpFallbackActive() const = 0; + + /** * Terminate background service (can be called from other threads) */ virtual void terminate() = 0; diff --git a/service/README.md b/service/README.md index 1900a612..acad97a1 100644 --- a/service/README.md +++ b/service/README.md @@ -26,6 +26,7 @@ A *jsonp* URL argument may be supplied to request JSONP encapsulation. A JSONP r <tr><td>address</td><td>string</td><td>10-digit hexadecimal ZeroTier address of this node</td><td>no</td></tr> <tr><td>publicIdentity</td><td>string</td><td>Full public ZeroTier identity of this node</td><td>no</td></tr> <tr><td>online</td><td>boolean</td><td>Does this node appear to have upstream network access?</td><td>no</td></tr> +<tr><td>tcpFallbackActive</td><td>boolean</td><td>Is TCP fallback mode active?</td><td>no</td></tr> <tr><td>versionMajor</td><td>integer</td><td>ZeroTier major version</td><td>no</td></tr> <tr><td>versionMinor</td><td>integer</td><td>ZeroTier minor version</td><td>no</td></tr> <tr><td>versionRev</td><td>integer</td><td>ZeroTier revision</td><td>no</td></tr> |