diff options
Diffstat (limited to 'service')
-rw-r--r-- | service/ControlPlane.cpp | 16 | ||||
-rw-r--r-- | service/OneService.cpp | 221 | ||||
-rw-r--r-- | service/OneService.hpp | 5 | ||||
-rw-r--r-- | service/README.md | 1 |
4 files changed, 186 insertions, 57 deletions
diff --git a/service/ControlPlane.cpp b/service/ControlPlane.cpp index 2e8290ed..71b3fd3f 100644 --- a/service/ControlPlane.cpp +++ b/service/ControlPlane.cpp @@ -360,18 +360,20 @@ unsigned int ControlPlane::handleRequest( _node->status(&status); Utils::snprintf(json,sizeof(json), "{\n" - "\t\"address\":\"%.10llx\",\n" - "\t\"publicIdentity\":\"%s\",\n" - "\t\"online\":%s,\n" - "\t\"versionMajor\":%d,\n" - "\t\"versionMinor\":%d,\n" - "\t\"versionRev\":%d,\n" - "\t\"version\":\"%d.%d.%d\",\n" + "\t\"address\": \"%.10llx\",\n" + "\t\"publicIdentity\": \"%s\",\n" + "\t\"online\": %s,\n" + "\t\"tcpFallbackActive\": %s,\n" + "\t\"versionMajor\": %d,\n" + "\t\"versionMinor\": %d,\n" + "\t\"versionRev\": %d,\n" + "\t\"version\": \"%d.%d.%d\",\n" "\t\"clock\": %llu\n" "}\n", status.address, status.publicIdentity, (status.online) ? "true" : "false", + (_svc->tcpFallbackActive()) ? "true" : "false", ZEROTIER_ONE_VERSION_MAJOR, ZEROTIER_ONE_VERSION_MINOR, ZEROTIER_ONE_VERSION_REVISION, diff --git a/service/OneService.cpp b/service/OneService.cpp index 3f45cd1e..9a72c240 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -58,6 +58,16 @@ #include "OneService.hpp" #include "ControlPlane.hpp" +/** + * Uncomment to enable UDP breakage switch + * + * If this is defined, the presence of a file called /tmp/ZT_BREAK_UDP + * will cause direct UDP TX/RX to stop working. This can be used to + * test TCP tunneling fallback and other robustness features. Deleting + * this file will cause it to start working again. + */ +//#define ZT_BREAK_UDP + #ifdef ZT_ENABLE_NETWORK_CONTROLLER #include "../controller/SqliteNetworkController.hpp" #else @@ -103,12 +113,16 @@ namespace ZeroTier { typedef BSDEthernetTap EthernetTap; } // Path under ZT1 home for controller database if controller is enabled #define ZT1_CONTROLLER_DB_PATH "controller.db" -// TCP fallback relay host +// TCP fallback relay host -- geo-distributed using Amazon Route53 geo-aware DNS #define ZT1_TCP_FALLBACK_RELAY "tcp-fallback.zerotier.com" +#define ZT1_TCP_FALLBACK_RELAY_PORT 443 // Frequency at which we re-resolve the TCP fallback relay #define ZT1_TCP_FALLBACK_RERESOLVE_DELAY 86400000 +// Attempt to engage TCP fallback after this many ms of no reply to packets sent to global-scope IPs +#define ZT1_TCP_FALLBACK_AFTER 60000 + namespace ZeroTier { namespace { @@ -365,6 +379,7 @@ struct TcpConnection std::string body; std::string writeBuf; + Mutex writeBuf_m; }; class OneServiceImpl : public OneService @@ -376,11 +391,15 @@ public: #ifdef ZT_ENABLE_NETWORK_CONTROLLER _controller((_homePath + ZT_PATH_SEPARATOR_S + ZT1_CONTROLLER_DB_PATH).c_str()), #endif - _phy(this,true), + _phy(this,false), _overrideRootTopology((overrideRootTopology) ? overrideRootTopology : ""), _node((Node *)0), _controlPlane((ControlPlane *)0), + _lastDirectReceiveFromGlobal(0), + _lastSendToGlobal(0), + _lastRestart(0), _nextBackgroundTaskDeadline(0), + _tcpFallbackTunnel((TcpConnection *)0), _termReason(ONE_STILL_RUNNING), _run(true) { @@ -474,6 +493,8 @@ public: } _nextBackgroundTaskDeadline = 0; + uint64_t clockShouldBe = OSUtils::now(); + _lastRestart = clockShouldBe; uint64_t lastTapMulticastGroupCheck = 0; uint64_t lastTcpFallbackResolve = 0; #ifdef ZT_AUTO_UPDATE @@ -489,13 +510,18 @@ public: break; } else _run_m.unlock(); - uint64_t dl = _nextBackgroundTaskDeadline; uint64_t now = OSUtils::now(); + + uint64_t dl = _nextBackgroundTaskDeadline; if (dl <= now) { _node->processBackgroundTasks(now,&_nextBackgroundTaskDeadline); dl = _nextBackgroundTaskDeadline; } + // Attempt to detect sleep/wake events by detecting delay overruns + if ((now > clockShouldBe)&&((now - clockShouldBe) > 2000)) + _lastRestart = now; + #ifdef ZT_AUTO_UPDATE if ((now - lastSoftwareUpdateCheck) >= ZT_AUTO_UPDATE_CHECK_PERIOD) { lastSoftwareUpdateCheck = OSUtils::now(); @@ -508,6 +534,9 @@ public: _tcpFallbackResolver.resolveNow(); } + if ((_tcpFallbackTunnel)&&((now - _lastDirectReceiveFromGlobal) < (ZT1_TCP_FALLBACK_AFTER / 2))) + _phy.close(_tcpFallbackTunnel->sock); + if ((now - lastTapMulticastGroupCheck) >= ZT_TAP_CHECK_MULTICAST_INTERVAL) { lastTapMulticastGroupCheck = now; Mutex::Lock _l(_taps_m); @@ -522,6 +551,7 @@ public: } const unsigned long delay = (dl > now) ? (unsigned long)(dl - now) : 100; + clockShouldBe = now + (uint64_t)delay; _phy.poll(delay); } } catch (std::exception &exc) { @@ -535,8 +565,8 @@ public: } try { - while (!_tcpConections.empty()) - _phy.close(_tcpConections.begin()->first); + while (!_tcpConnections.empty()) + _phy.close((*_tcpConnections.begin())->sock); } catch ( ... ) {} { @@ -575,6 +605,11 @@ public: return std::string(); } + virtual bool tcpFallbackActive() const + { + return (_tcpFallbackTunnel != (TcpConnection *)0); + } + virtual void terminate() { _run_m.lock(); @@ -587,6 +622,12 @@ public: inline void phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *from,void *data,unsigned long len) { +#ifdef ZT_BREAK_UDP + if (OSUtils::fileExists("/tmp/ZT_BREAK_UDP")) + return; +#endif + if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) + _lastDirectReceiveFromGlobal = OSUtils::now(); ZT1_ResultCode rc = _node->processWirePacket( OSUtils::now(), (const struct sockaddr_storage *)from, // Phy<> uses sockaddr_storage, so it'll always be that big @@ -610,9 +651,11 @@ public: // Outgoing TCP connections are always TCP fallback tunnel connections. - TcpConnection *tc = &(_tcpConections[sock]); + TcpConnection *tc = new TcpConnection(); + _tcpConnections.insert(tc); + tc->type = TcpConnection::TCP_TUNNEL_OUTGOING; - tc->shouldKeepAlive = true; // unused + tc->shouldKeepAlive = true; tc->parent = this; tc->sock = sock; // from and parser are not used @@ -633,13 +676,17 @@ public: tc->writeBuf.push_back((char)((ZEROTIER_ONE_VERSION_REVISION >> 8) & 0xff)); tc->writeBuf.push_back((char)(ZEROTIER_ONE_VERSION_REVISION & 0xff)); _phy.tcpSetNotifyWritable(sock,true); + + _tcpFallbackTunnel = tc; } inline void phyOnTcpAccept(PhySocket *sockL,PhySocket *sockN,void **uptrL,void **uptrN,const struct sockaddr *from) { // Incoming TCP connections are HTTP JSON API requests. - TcpConnection *tc = &(_tcpConections[sockN]); + TcpConnection *tc = new TcpConnection(); + _tcpConnections.insert(tc); + tc->type = TcpConnection::TCP_HTTP_INCOMING; tc->shouldKeepAlive = true; tc->parent = this; @@ -661,7 +708,13 @@ public: inline void phyOnTcpClose(PhySocket *sock,void **uptr) { - _tcpConections.erase(sock); + TcpConnection *tc = (TcpConnection *)*uptr; + if (tc) { + if (tc == _tcpFallbackTunnel) + _tcpFallbackTunnel = (TcpConnection *)0; + _tcpConnections.erase(tc); + delete tc; + } } inline void phyOnTcpData(PhySocket *sock,void **uptr,void *data,unsigned long len) @@ -680,11 +733,7 @@ public: case TcpConnection::TCP_TUNNEL_OUTGOING: tc->body.append((const char *)data,len); - if (tc->body.length() > 65535) { - // sanity limit -- a message will never be this big since mlen is 16-bit - _phy.close(sock); - return; - } else if (tc->body.length() >= 5) { + while (tc->body.length() >= 5) { const char *data = tc->body.data(); const unsigned long mlen = ( ((((unsigned long)data[3]) & 0xff) << 8) | (((unsigned long)data[4]) & 0xff) ); if (tc->body.length() >= (mlen + 5)) { @@ -726,28 +775,30 @@ public: return; } - ZT1_ResultCode rc = _node->processWirePacket( - OSUtils::now(), - (const struct sockaddr_storage *)&from, // Phy<> uses sockaddr_storage, so it'll always be that big - data, - plen, - &_nextBackgroundTaskDeadline); - if (ZT1_ResultCode_isFatal(rc)) { - char tmp[256]; - Utils::snprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc); - Mutex::Lock _l(_termReason_m); - _termReason = ONE_UNRECOVERABLE_ERROR; - _fatalErrorMessage = tmp; - this->terminate(); - _phy.close(sock); - return; + if (from) { + ZT1_ResultCode rc = _node->processWirePacket( + OSUtils::now(), + reinterpret_cast<struct sockaddr_storage *>(&from), + data, + plen, + &_nextBackgroundTaskDeadline); + if (ZT1_ResultCode_isFatal(rc)) { + char tmp[256]; + Utils::snprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc); + Mutex::Lock _l(_termReason_m); + _termReason = ONE_UNRECOVERABLE_ERROR; + _fatalErrorMessage = tmp; + this->terminate(); + _phy.close(sock); + return; + } } } if (tc->body.length() > (mlen + 5)) tc->body = tc->body.substr(mlen + 5); else tc->body = ""; - } + } else break; } break; @@ -757,18 +808,23 @@ public: inline void phyOnTcpWritable(PhySocket *sock,void **uptr) { TcpConnection *tc = reinterpret_cast<TcpConnection *>(*uptr); - if (tc->writeBuf.length()) { + Mutex::Lock _l(tc->writeBuf_m); + if (tc->writeBuf.length() > 0) { long sent = (long)_phy.tcpSend(sock,tc->writeBuf.data(),(unsigned long)tc->writeBuf.length(),true); if (sent > 0) { tc->lastActivity = OSUtils::now(); - if ((unsigned long)sent == (unsigned long)tc->writeBuf.length()) { + if ((unsigned long)sent >= (unsigned long)tc->writeBuf.length()) { tc->writeBuf = ""; _phy.tcpSetNotifyWritable(sock,false); if (!tc->shouldKeepAlive) - _phy.close(sock); // will call close handler to delete from _tcpConections - } else tc->writeBuf = tc->writeBuf.substr(sent); + _phy.close(sock); // will call close handler to delete from _tcpConnections + } else { + tc->writeBuf = tc->writeBuf.substr(sent); + } } - } else _phy.tcpSetNotifyWritable(sock,false); // sanity check... shouldn't happen + } else { + _phy.tcpSetNotifyWritable(sock,false); + } } inline int nodeVirtualNetworkConfigFunction(uint64_t nwid,enum ZT1_VirtualNetworkConfigOperation op,const ZT1_VirtualNetworkConfig *nwc) @@ -915,17 +971,75 @@ public: inline int nodeWirePacketSendFunction(const struct sockaddr_storage *addr,const void *data,unsigned int len) { + int result = -1; switch(addr->ss_family) { case AF_INET: +#ifdef ZT_BREAK_UDP + if (!OSUtils::fileExists("/tmp/ZT_BREAK_UDP")) { +#endif if (_v4UdpSocket) - return (_phy.udpSend(_v4UdpSocket,(const struct sockaddr *)addr,data,len) ? 0 : -1); + result = ((_phy.udpSend(_v4UdpSocket,(const struct sockaddr *)addr,data,len) != 0) ? 0 : -1); +#ifdef ZT_BREAK_UDP + } +#endif + +#ifdef ZT1_TCP_FALLBACK_RELAY + // TCP fallback tunnel support + if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(addr)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) { + uint64_t now = OSUtils::now(); + + // Engage TCP tunnel fallback if we haven't received anything valid from a global + // IP address in ZT1_TCP_FALLBACK_AFTER milliseconds. If we do start getting + // valid direct traffic we'll stop using it and close the socket after a while. + if (((now - _lastDirectReceiveFromGlobal) > ZT1_TCP_FALLBACK_AFTER)&&((now - _lastRestart) > ZT1_TCP_FALLBACK_AFTER)) { + if (_tcpFallbackTunnel) { + Mutex::Lock _l(_tcpFallbackTunnel->writeBuf_m); + if (!_tcpFallbackTunnel->writeBuf.length()) + _phy.tcpSetNotifyWritable(_tcpFallbackTunnel->sock,true); + unsigned long mlen = len + 7; + _tcpFallbackTunnel->writeBuf.push_back((char)0x17); + _tcpFallbackTunnel->writeBuf.push_back((char)0x03); + _tcpFallbackTunnel->writeBuf.push_back((char)0x03); // fake TLS 1.2 header + _tcpFallbackTunnel->writeBuf.push_back((char)((mlen >> 8) & 0xff)); + _tcpFallbackTunnel->writeBuf.push_back((char)(mlen & 0xff)); + _tcpFallbackTunnel->writeBuf.push_back((char)4); // IPv4 + _tcpFallbackTunnel->writeBuf.append(reinterpret_cast<const char *>(reinterpret_cast<const void *>(&(reinterpret_cast<const struct sockaddr_in *>(addr)->sin_addr.s_addr))),4); + _tcpFallbackTunnel->writeBuf.append(reinterpret_cast<const char *>(reinterpret_cast<const void *>(&(reinterpret_cast<const struct sockaddr_in *>(addr)->sin_port))),2); + _tcpFallbackTunnel->writeBuf.append((const char *)data,len); + result = 0; + } else if (((now - _lastSendToGlobal) < ZT1_TCP_FALLBACK_AFTER)&&((now - _lastSendToGlobal) > (ZT_PING_CHECK_INVERVAL / 2))) { + std::vector<InetAddress> tunnelIps(_tcpFallbackResolver.get()); + if (tunnelIps.empty()) { + if (!_tcpFallbackResolver.running()) + _tcpFallbackResolver.resolveNow(); + } else { + bool connected = false; + InetAddress addr(tunnelIps[(unsigned long)now % tunnelIps.size()]); + addr.setPort(ZT1_TCP_FALLBACK_RELAY_PORT); + _phy.tcpConnect(reinterpret_cast<const struct sockaddr *>(&addr),connected); + } + } + } + + _lastSendToGlobal = now; + } +#endif // ZT1_TCP_FALLBACK_RELAY + break; case AF_INET6: +#ifdef ZT_BREAK_UDP + if (!OSUtils::fileExists("/tmp/ZT_BREAK_UDP")) { +#endif if (_v6UdpSocket) - return (_phy.udpSend(_v6UdpSocket,(const struct sockaddr *)addr,data,len) ? 0 : -1); + result = ((_phy.udpSend(_v6UdpSocket,(const struct sockaddr *)addr,data,len) != 0) ? 0 : -1); +#ifdef ZT_BREAK_UDP + } +#endif break; + default: + return -1; } - return -1; + return result; } inline void nodeVirtualNetworkFrameFunction(uint64_t nwid,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len) @@ -970,16 +1084,19 @@ public: } Utils::snprintf(tmpn,sizeof(tmpn),"HTTP/1.1 %.3u %s\r\nCache-Control: no-cache\r\nPragma: no-cache\r\n",scode,scodestr); - tc->writeBuf.assign(tmpn); - tc->writeBuf.append("Content-Type: "); - tc->writeBuf.append(contentType); - Utils::snprintf(tmpn,sizeof(tmpn),"\r\nContent-Length: %lu\r\n",(unsigned long)data.length()); - tc->writeBuf.append(tmpn); - if (!tc->shouldKeepAlive) - tc->writeBuf.append("Connection: close\r\n"); - tc->writeBuf.append("\r\n"); - if (tc->parser.method != HTTP_HEAD) - tc->writeBuf.append(data); + { + Mutex::Lock _l(tc->writeBuf_m); + tc->writeBuf.assign(tmpn); + tc->writeBuf.append("Content-Type: "); + tc->writeBuf.append(contentType); + Utils::snprintf(tmpn,sizeof(tmpn),"\r\nContent-Length: %lu\r\n",(unsigned long)data.length()); + tc->writeBuf.append(tmpn); + if (!tc->shouldKeepAlive) + tc->writeBuf.append("Connection: close\r\n"); + tc->writeBuf.append("\r\n"); + if (tc->parser.method != HTTP_HEAD) + tc->writeBuf.append(data); + } _phy.tcpSetNotifyWritable(tc->sock,true); } @@ -987,7 +1104,7 @@ public: inline void onHttpResponseFromClient(TcpConnection *tc) { if (!tc->shouldKeepAlive) - _phy.close(tc->sock); // will call close handler, which deletes from _tcpConections + _phy.close(tc->sock); // will call close handler, which deletes from _tcpConnections } private: @@ -1021,13 +1138,17 @@ private: PhySocket *_v4TcpListenSocket; PhySocket *_v6TcpListenSocket; ControlPlane *_controlPlane; + uint64_t _lastDirectReceiveFromGlobal; + uint64_t _lastSendToGlobal; + uint64_t _lastRestart; volatile uint64_t _nextBackgroundTaskDeadline; std::map< uint64_t,EthernetTap * > _taps; std::map< uint64_t,std::vector<InetAddress> > _tapAssignedIps; // ZeroTier assigned IPs, not user or dhcp assigned Mutex _taps_m; - std::map< PhySocket *,TcpConnection > _tcpConections; // no mutex for this since it's done in the main loop thread only + std::set< TcpConnection * > _tcpConnections; // no mutex for this since it's done in the main loop thread only + TcpConnection *_tcpFallbackTunnel; ReasonForTermination _termReason; std::string _fatalErrorMessage; diff --git a/service/OneService.hpp b/service/OneService.hpp index aea314f5..7964958c 100644 --- a/service/OneService.hpp +++ b/service/OneService.hpp @@ -127,6 +127,11 @@ public: virtual std::string portDeviceName(uint64_t nwid) const = 0; /** + * @return True if TCP fallback is currently active + */ + virtual bool tcpFallbackActive() const = 0; + + /** * Terminate background service (can be called from other threads) */ virtual void terminate() = 0; diff --git a/service/README.md b/service/README.md index 1900a612..acad97a1 100644 --- a/service/README.md +++ b/service/README.md @@ -26,6 +26,7 @@ A *jsonp* URL argument may be supplied to request JSONP encapsulation. A JSONP r <tr><td>address</td><td>string</td><td>10-digit hexadecimal ZeroTier address of this node</td><td>no</td></tr> <tr><td>publicIdentity</td><td>string</td><td>Full public ZeroTier identity of this node</td><td>no</td></tr> <tr><td>online</td><td>boolean</td><td>Does this node appear to have upstream network access?</td><td>no</td></tr> +<tr><td>tcpFallbackActive</td><td>boolean</td><td>Is TCP fallback mode active?</td><td>no</td></tr> <tr><td>versionMajor</td><td>integer</td><td>ZeroTier major version</td><td>no</td></tr> <tr><td>versionMinor</td><td>integer</td><td>ZeroTier minor version</td><td>no</td></tr> <tr><td>versionRev</td><td>integer</td><td>ZeroTier revision</td><td>no</td></tr> |