diff options
-rw-r--r-- | node/Constants.hpp | 5 | ||||
-rw-r--r-- | node/Socket.hpp | 2 | ||||
-rw-r--r-- | node/SocketManager.cpp | 15 | ||||
-rw-r--r-- | node/SocketManager.hpp | 1 | ||||
-rw-r--r-- | node/TcpSocket.cpp | 198 | ||||
-rw-r--r-- | node/TcpSocket.hpp | 29 |
6 files changed, 228 insertions, 22 deletions
diff --git a/node/Constants.hpp b/node/Constants.hpp index 90cea3ad..bcefe7f9 100644 --- a/node/Constants.hpp +++ b/node/Constants.hpp @@ -305,6 +305,11 @@ error_no_byte_order_defined; #define ZT_PEER_LINK_ACTIVITY_TIMEOUT ((ZT_PEER_DIRECT_PING_DELAY * 2) + 1000) /** + * Close TCP tunnels if unused for this long + */ +#define ZT_TCP_TUNNEL_ACTIVITY_TIMEOUT ZT_PEER_LINK_ACTIVITY_TIMEOUT + +/** * Stop relaying via peers that have not responded to direct sends in this long */ #define ZT_PEER_RELAY_CONVERSATION_LATENCY_THRESHOLD 10000 diff --git a/node/Socket.hpp b/node/Socket.hpp index 4eef01b3..ff185e6f 100644 --- a/node/Socket.hpp +++ b/node/Socket.hpp @@ -43,7 +43,7 @@ /** * Maximum discrete message length supported by all socket types */ -#define ZT_SOCKET_MAX_MESSAGE_LEN 4096 +#define ZT_SOCKET_MAX_MESSAGE_LEN 2048 namespace ZeroTier { diff --git a/node/SocketManager.cpp b/node/SocketManager.cpp index a9783082..5dbbf6d7 100644 --- a/node/SocketManager.cpp +++ b/node/SocketManager.cpp @@ -351,8 +351,9 @@ bool SocketManager::sendFirewallOpener(const InetAddress &to,int hopLimit) void SocketManager::poll(unsigned long timeout) { - fd_set rfds,wfds,nfds; + fd_set rfds,wfds,efds; struct timeval tv; + std::vector< SharedPtr<Socket> > ts; #ifdef __WINDOWS__ SOCKET sockfd; #else @@ -365,11 +366,11 @@ void SocketManager::poll(unsigned long timeout) memcpy(&rfds,&_readfds,sizeof(rfds)); memcpy(&wfds,&_writefds,sizeof(wfds)); _fdSetLock.unlock(); - FD_ZERO(&nfds); + FD_ZERO(&efds); tv.tv_sec = (long)(timeout / 1000); tv.tv_usec = (long)((timeout % 1000) * 1000); - select(_nfds,&rfds,&wfds,&nfds,(timeout > 0) ? &tv : (struct timeval *)0); + select(_nfds,&rfds,&wfds,&efds,(timeout > 0) ? &tv : (struct timeval *)0); if (FD_ISSET(_whackReceivePipe,&rfds)) { char tmp[32]; @@ -391,7 +392,7 @@ void SocketManager::poll(unsigned long timeout) #endif InetAddress fromia((const struct sockaddr *)&from); Mutex::Lock _l2(_tcpSockets_m); - _tcpSockets[fromia] = SharedPtr<Socket>(new TcpSocket(sockfd,false,fromia)); + _tcpSockets[fromia] = SharedPtr<Socket>(new TcpSocket(this,sockfd,false,fromia)); _fdSetLock.lock(); FD_SET(sockfd,&_readfds); _fdSetLock.unlock(); @@ -408,7 +409,7 @@ void SocketManager::poll(unsigned long timeout) #endif InetAddress fromia((const struct sockaddr *)&from); Mutex::Lock _l2(_tcpSockets_m); - _tcpSockets[fromia] = SharedPtr<Socket>(new TcpSocket(sockfd,false,fromia)); + _tcpSockets[fromia] = SharedPtr<Socket>(new TcpSocket(this,sockfd,false,fromia)); _fdSetLock.lock(); FD_SET(sockfd,&_readfds); _fdSetLock.unlock(); @@ -420,13 +421,13 @@ void SocketManager::poll(unsigned long timeout) if ((_udpV6Socket)&&(FD_ISSET(_udpV6Socket->_sock,&rfds))) _udpV6Socket->notifyAvailableForRead(_udpV6Socket,this); - std::vector< SharedPtr<Socket> > ts; { // grab copy of TCP sockets list because _tcpSockets[] might be changed in a handler Mutex::Lock _l2(_tcpSockets_m); if (_tcpSockets.size()) { ts.reserve(_tcpSockets.size()); + uint64_t now = Utils::now(); for(std::map< InetAddress,SharedPtr<Socket> >::iterator s(_tcpSockets.begin());s!=_tcpSockets.end();) { - if (true) { // TODO: TCP expiration check + if ((now - ((TcpSocket *)s->second.get())->_lastActivity) < ZT_TCP_TUNNEL_ACTIVITY_TIMEOUT) { ts.push_back(s->second); ++s; } else { diff --git a/node/SocketManager.hpp b/node/SocketManager.hpp index 3a42897c..9f980e37 100644 --- a/node/SocketManager.hpp +++ b/node/SocketManager.hpp @@ -144,6 +144,7 @@ private: _fdSetLock.unlock(); } + // Called in SocketManager destructor or in constructor cleanup before exception throwing inline void _closeSockets() throw() { diff --git a/node/TcpSocket.cpp b/node/TcpSocket.cpp new file mode 100644 index 00000000..83169d12 --- /dev/null +++ b/node/TcpSocket.cpp @@ -0,0 +1,198 @@ +/* + * ZeroTier One - Global Peer to Peer Ethernet + * Copyright (C) 2011-2014 ZeroTier Networks LLC + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * -- + * + * ZeroTier may be used and distributed under the terms of the GPLv3, which + * are available at: http://www.gnu.org/licenses/gpl-3.0.html + * + * If you would like to embed ZeroTier into a commercial application or + * redistribute it in a modified binary form, please contact ZeroTier Networks + * LLC. Start here: http://www.zerotier.com/ + */ + +#include <stdio.h> +#include <string.h> +#include <stdlib.h> +#include <fcntl.h> +#include <time.h> +#include <errno.h> +#include <sys/types.h> + +#include "Constants.hpp" +#include "TcpSocket.hpp" +#include "SocketManager.hpp" + +#ifdef __WINDOWS__ +#include <WinSock2.h> +#include <WS2tcpip.h> +#include <Windows.h> +#else +#include <unistd.h> +#include <sys/socket.h> +#include <arpa/inet.h> +#include <signal.h> +#endif + +#define ZT_TCP_MAX_SENDQ_LENGTH (ZT_SOCKET_MAX_MESSAGE_LEN * 8) + +namespace ZeroTier { + +TcpSocket::~TcpSocket() +{ +#ifdef __WINDOWS__ + ::closesocket(_sock); +#else + ::close(_sock); +#endif + + if (_outbuf) + ::free(_outbuf); +} + +bool TcpSocket::send(const InetAddress &to,const void *msg,unsigned int msglen) +{ + if (msglen > ZT_SOCKET_MAX_MESSAGE_LEN) + return false; // message too big + if (!msglen) + return true; // sanity check + + Mutex::Lock _l(_writeLock); + + bool outputWasEnqueued = (_outptr != 0); + + // Ensure that _outbuf is large enough + unsigned int newptr = _outptr + 5 + msglen; + if (newptr > _outbufsize) { + unsigned int newbufsize = _outbufsize; + while (newbufsize < newptr) + newbufsize += ZT_SOCKET_MAX_MESSAGE_LEN; + if (newbufsize > ZT_TCP_MAX_SENDQ_LENGTH) + return false; // cannot send, outbuf full + unsigned char *newbuf = (unsigned char *)::malloc(newbufsize); + if (!newbuf) + return false; // cannot send, no memory + _outbufsize = newbufsize; + if (_outbuf) { + memcpy(newbuf,_outbuf,_outptr); + ::free(_outbuf); + } + _outbuf = newbuf; + } + + _outbuf[_outptr++] = 0x17; // look like TLS data + _outbuf[_outptr++] = 0x03; + _outbuf[_outptr++] = 0x03; // look like TLS 1.2 + _outbuf[_outptr++] = (unsigned char)((msglen >> 8) & 0xff); + _outbuf[_outptr++] = (unsigned char)(msglen & 0xff); + for(unsigned int i=0;i<msglen;++i) + _outbuf[_outptr++] = ((const unsigned char *)msg)[i]; + + if (!outputWasEnqueued) { + // If no output was enqueued before this, try to send() it and then + // start a queued write if any remains after that. + + int n = (int)::send(_sock,_outbuf,_outptr,0); + if (n > 0) + memmove(_outbuf,_outbuf + (unsigned int)n,_outptr -= (unsigned int)n); + + if (_outptr) { + _sm->startNotifyWrite(this); + _sm->whack(); + } + } // else just leave in _outbuf[] to get written when stream is available for write + + return true; +} + +bool TcpSocket::notifyAvailableForRead(const SharedPtr<Socket> &self,SocketManager *sm) +{ + unsigned char buf[65536]; + + // will not be called concurrently since only SocketManager::poll() calls this + + int n = (int)::recv(_sock,buf,sizeof(buf),0); + if (n <= 0) + return false; // read error, stream probably closed + + unsigned int p = _inptr,pl = 0; + for(int k=0;k<n;++k) { + _inbuf[p++] = buf[k]; + if (p >= (int)sizeof(_inbuf)) + return false; // read overrun, packet too large or invalid + + if ((!pl)&&(p >= 5)) { + if (_inbuf[0] == 0x17) { + // fake TLS data frame, next two bytes are TLS version and are ignored + pl = (((unsigned int)_inbuf[3] << 8) | (unsigned int)_inbuf[4]) + 5; + } else return false; // in the future we may support fake TLS handshakes + } + + if ((pl)&&(p >= pl)) { + Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> data(_inbuf + 5,pl - 5); + sm->handleReceivedPacket(self,_remote,data); + memmove(_inbuf,_inbuf + pl,p - pl); + p -= pl; + pl = 0; + } + } + _inptr = p; + + return true; +} + +bool TcpSocket::notifyAvailableForWrite(const SharedPtr<Socket> &self,SocketManager *sm) +{ + Mutex::Lock _l(_writeLock); + + if (_outptr) { + int n = (int)::send(_sock,_outbuf,_outptr,0); + if (n < 0) { + switch(errno) { +#ifdef EBADF + case EBADF: +#endif +#ifdef EINVAL + case EINVAL: +#endif +#ifdef ENOTSOCK + case ENOTSOCK: +#endif +#ifdef ECONNRESET + case ECONNRESET: +#endif +#ifdef EPIPE + case EPIPE: +#endif +#ifdef ENETDOWN + case ENETDOWN: +#endif + return false; + default: + break; + } + } else if (n > 0) + memmove(_outbuf,_outbuf + (unsigned int)n,_outptr -= (unsigned int)n); + } + + if (!_outptr) + sm->stopNotifyWrite(this); + + return true; +} + +} // namespace ZeroTier diff --git a/node/TcpSocket.hpp b/node/TcpSocket.hpp index e73d7ab3..0c24808d 100644 --- a/node/TcpSocket.hpp +++ b/node/TcpSocket.hpp @@ -35,9 +35,6 @@ #include "Utils.hpp" #include "Socket.hpp" -#define ZT_TCP_SENDQ_LENGTH 4096 -#define ZT_TCP_MAX_MESSAGE_LENGTH 2048 - namespace ZeroTier { class SocketManager; @@ -74,30 +71,34 @@ public: protected: #ifdef __WINDOWS__ - TcpSocket(SOCKET s,bool c,const InetAddress &r) : + TcpSocket(SocketManager *sm,SOCKET s,bool c,const InetAddress &r) : #else - TcpSocket(int s,bool c,const InetAddress &r) : + TcpSocket(SocketManager *sm,int s,bool c,const InetAddress &r) : #endif Socket(Socket::ZT_SOCKET_TYPE_TCP,s), - _lastReceivedData(Utils::now()), - _inptr(0), + _lastActivity(Utils::now()), + _sm(sm), + _outbuf((unsigned char *)0), _outptr(0), + _outbufsize(0), + _inptr(0), _connecting(c), - _remote(r), - _lock() {} + _remote(r) {} virtual bool notifyAvailableForRead(const SharedPtr<Socket> &self,SocketManager *sm); virtual bool notifyAvailableForWrite(const SharedPtr<Socket> &self,SocketManager *sm); private: - unsigned char _outbuf[ZT_TCP_SENDQ_LENGTH]; - unsigned char _inbuf[ZT_TCP_MAX_MESSAGE_LENGTH]; - uint64_t _lastReceivedData; // updated whenever data is received, checked directly by SocketManager for stale TCP cleanup - unsigned int _inptr; + unsigned char _inbuf[ZT_SOCKET_MAX_MESSAGE_LEN]; + uint64_t _lastActivity; // updated whenever data is received, checked directly by SocketManager for stale TCP cleanup + SocketManager *_sm; + unsigned char *_outbuf; unsigned int _outptr; + unsigned int _outbufsize; + unsigned int _inptr; bool _connecting; // manipulated directly by SocketManager, true if connect() is in progress InetAddress _remote; - Mutex _lock; + Mutex _writeLock; }; } // namespace ZeroTier |