diff options
author | Adam Ierymenko <adam.ierymenko@gmail.com> | 2014-10-21 14:26:10 -0700 |
---|---|---|
committer | Adam Ierymenko <adam.ierymenko@gmail.com> | 2014-10-21 14:26:10 -0700 |
commit | 128a13107023075a8167bfdfb8ed9d404bd1dccd (patch) | |
tree | a217f9837bd07daf2722d5e00c5b40ebe29c1220 /osnet | |
parent | 6bc9a938cfde3ff6caabc0a38f6e6d4650e3c384 (diff) | |
download | infinitytier-128a13107023075a8167bfdfb8ed9d404bd1dccd.tar.gz infinitytier-128a13107023075a8167bfdfb8ed9d404bd1dccd.zip |
About halfway there in refactoring to support pluggable SocketManager.
Diffstat (limited to 'osnet')
-rw-r--r-- | osnet/NativeSocketManager.cpp | 370 | ||||
-rw-r--r-- | osnet/NativeSocketManager.hpp | 96 |
2 files changed, 345 insertions, 121 deletions
diff --git a/osnet/NativeSocketManager.cpp b/osnet/NativeSocketManager.cpp index 8e3ee9e0..a85394ed 100644 --- a/osnet/NativeSocketManager.cpp +++ b/osnet/NativeSocketManager.cpp @@ -25,6 +25,8 @@ * LLC. Start here: http://www.zerotier.com/ */ +/* Native SocketManager for Windows and Unix */ + #include <stdio.h> #include <string.h> #include <stdlib.h> @@ -32,9 +34,10 @@ #include <time.h> #include <sys/types.h> -#include "SocketManager.hpp" -#include "UdpSocket.hpp" -#include "TcpSocket.hpp" +#include <algorithm> + +#include "../node/Constants.hpp" +#include "NativeSocketManager.hpp" #ifndef __WINDOWS__ #include <errno.h> @@ -44,7 +47,7 @@ #include <signal.h> #include <netinet/in.h> #include <netinet/tcp.h> -#endif +#endif // !__WINDOWS__ // Uncomment to turn off TCP Nagle //#define ZT_TCP_NODELAY @@ -62,6 +65,259 @@ namespace ZeroTier { +////////////////////////////////////////////////////////////////////////////// +// Socket implementations +////////////////////////////////////////////////////////////////////////////// + +class NativeSocket : public Socket +{ +public: +#ifdef __WINDOWS__ + NativeSocket(const Type &t,SOCKET s) : Socket(t),_sock(s) {} + SOCKET _sock; +#else + NativeSocket(const Type &t,int s) : Socket(t),_sock(s) {} + int _sock; +#endif + + virtual bool notifyAvailableForRead(const SharedPtr<Socket> &self,NativeSocketManager *sm) = 0; + virtual bool notifyAvailableForWrite(const SharedPtr<Socket> &self,NativeSocketManager *sm) = 0; +}; + +/** + * Native UDP socket + */ +class NativeUdpSocket : public NativeSocket +{ +public: +#ifdef __WINDOWS__ + NativeUdpSocket(Type t,SOCKET s) : NativeSocket(t,s) {} +#else + NativeUdpSocket(Type t,int s) : NativeSocket(t,s) {} +#endif + + virtual ~NativeUdpSocket() + { +#ifdef __WINDOWS__ + ::closesocket(_sock); +#else + ::close(_sock); +#endif + } + + virtual bool send(const InetAddress &to,const void *msg,unsigned int msglen) + { + if (to.isV6()) { +#ifdef __WINDOWS__ + return ((int)sendto(_sock,(const char *)msg,msglen,0,to.saddr(),to.saddrLen()) == (int)msglen); +#else + return ((int)sendto(_sock,msg,msglen,0,to.saddr(),to.saddrLen()) == (int)msglen); +#endif + } else { +#ifdef __WINDOWS__ + return ((int)sendto(_sock,(const char *)msg,msglen,0,to.saddr(),to.saddrLen()) == (int)msglen); +#else + return ((int)sendto(_sock,msg,msglen,0,to.saddr(),to.saddrLen()) == (int)msglen); +#endif + } + } + + virtual bool notifyAvailableForRead(const SharedPtr<Socket> &self,NativeSocketManager *sm) + { + Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> buf; + InetAddress from; + socklen_t salen = from.saddrSpaceLen(); + int n = (int)recvfrom(_sock,(char *)(buf.data()),ZT_SOCKET_MAX_MESSAGE_LEN,0,from.saddr(),&salen); + if (n > 0) { + buf.setSize((unsigned int)n); + sm->handleReceivedPacket(self,from,buf); + } + return true; + } + + virtual bool notifyAvailableForWrite(const SharedPtr<Socket> &self,NativeSocketManager *sm) + { + return true; + } +}; + +/** + * A TCP socket encapsulating ZeroTier packets over a TCP stream connection + * + * This implements a simple packet encapsulation that is designed to look like + * a TLS connection. It's not a TLS connection, but it sends TLS format record + * headers. It could be extended in the future to implement a fake TLS + * handshake. + * + * At the moment, each packet is just made to look like TLS application data: + * <[1] TLS content type> - currently 0x17 for "application data" + * <[1] TLS major version> - currently 0x03 for TLS 1.2 + * <[1] TLS minor version> - currently 0x03 for TLS 1.2 + * <[2] payload length> - 16-bit length of payload in bytes + * <[...] payload> - Message payload + * + * The primary purpose of TCP sockets is to work over ports like HTTPS(443), + * allowing users behind particularly fascist firewalls to at least reach + * ZeroTier's supernodes. UDP is the preferred method of communication as + * encapsulating L2 and L3 protocols over TCP is inherently inefficient + * due to double-ACKs. So TCP is only used as a fallback. + */ +class NativeTcpSocket : public NativeSocket +{ +public: +#ifdef __WINDOWS__ + NativeTcpSocket(NativeSocketManager *sm,SOCKET s,Socket::Type t,bool c,const InetAddress &r) : +#else + NativeTcpSocket(NativeSocketManager *sm,int s,Socket::Type t,bool c,const InetAddress &r) : +#endif + NativeSocket(t,s), + _lastActivity(Utils::now()), + _sm(sm), + _inptr(0), + _outptr(0), + _connecting(c), + _remote(r) {} + + virtual ~NativeTcpSocket() + { +#ifdef __WINDOWS__ + ::closesocket(_sock); +#else + ::close(_sock); +#endif + } + + virtual bool send(const InetAddress &to,const void *msg,unsigned int msglen) + { + if (msglen > ZT_SOCKET_MAX_MESSAGE_LEN) + return false; // message too big + if (!msglen) + return true; // sanity check + + Mutex::Lock _l(_writeLock); + + bool writeInProgress = ((_outptr != 0)||(_connecting)); + + if ((_outptr + 5 + msglen) > (unsigned int)sizeof(_outbuf)) + return false; + + _outbuf[_outptr++] = 0x17; // look like TLS data + _outbuf[_outptr++] = 0x03; + _outbuf[_outptr++] = 0x03; // look like TLS 1.2 + _outbuf[_outptr++] = (unsigned char)((msglen >> 8) & 0xff); + _outbuf[_outptr++] = (unsigned char)(msglen & 0xff); + for(unsigned int i=0;i<msglen;++i) + _outbuf[_outptr++] = ((const unsigned char *)msg)[i]; + + if (!writeInProgress) { + // If no output was enqueued before this, try to send() it and then + // start a queued write if any remains after that. + + int n = (int)::send(_sock,(const char *)_outbuf,_outptr,0); + if (n > 0) + memmove(_outbuf,_outbuf + (unsigned int)n,_outptr -= (unsigned int)n); + + if (_outptr) { + _sm->_startNotifyWrite(this); + _sm->whack(); + } + } // else just leave in _outbuf[] to get written when stream is available for write + + return true; + } + + virtual bool notifyAvailableForRead(const SharedPtr<Socket> &self,NativeSocketManager *sm) + { + unsigned char buf[65536]; + + int n = (int)::recv(_sock,(char *)buf,sizeof(buf),0); + if (n <= 0) + return false; // read error, stream probably closed + + unsigned int p = _inptr,pl = 0; + for(int k=0;k<n;++k) { + _inbuf[p++] = buf[k]; + if (p >= (int)sizeof(_inbuf)) + return false; // read overrun, packet too large or invalid + + if ((!pl)&&(p >= 5)) { + if (_inbuf[0] == 0x17) { + // fake TLS data frame, next two bytes are TLS version and are ignored + pl = (((unsigned int)_inbuf[3] << 8) | (unsigned int)_inbuf[4]) + 5; + } else return false; // in the future we may support fake TLS handshakes + } + + if ((pl)&&(p >= pl)) { + Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> data(_inbuf + 5,pl - 5); + memmove(_inbuf,_inbuf + pl,p -= pl); + try { + sm->handleReceivedPacket(self,_remote,data); + } catch ( ... ) {} // handlers should not throw + pl = 0; + } + } + _inptr = p; + + return true; + } + + virtual bool notifyAvailableForWrite(const SharedPtr<Socket> &self,NativeSocketManager *sm) + { + Mutex::Lock _l(_writeLock); + + if (_connecting) + _connecting = false; + + if (_outptr) { + int n = (int)::send(_sock,(const char *)_outbuf,_outptr,0); +#ifdef __WINDOWS__ + if (n == SOCKET_ERROR) { + switch(WSAGetLastError()) { + case WSAEINTR: + case WSAEWOULDBLOCK: + break; + default: + return false; + } +#else + if (n <= 0) { + switch(errno) { +#ifdef EAGAIN + case EAGAIN: +#endif +#if defined(EWOULDBLOCK) && ( !defined(EAGAIN) || (EWOULDBLOCK != EAGAIN) ) + case EWOULDBLOCK: +#endif +#ifdef EINTR + case EINTR: +#endif + break; + default: + return false; + } +#endif + } else memmove(_outbuf,_outbuf + (unsigned int)n,_outptr -= (unsigned int)n); + } + + if (!_outptr) + sm->_stopNotifyWrite(this); + + return true; + } + + unsigned char _inbuf[ZT_SOCKET_MAX_MESSAGE_LEN]; + unsigned char _outbuf[ZT_SOCKET_MAX_MESSAGE_LEN * 4]; + uint64_t _lastActivity; // updated whenever data is received, checked directly by SocketManager for stale TCP cleanup + NativeSocketManager *_sm; + unsigned int _inptr; + unsigned int _outptr; + bool _connecting; // manipulated directly by SocketManager, true if connect() is in progress + InetAddress _remote; + Mutex _writeLock; +}; + +////////////////////////////////////////////////////////////////////////////// + #ifdef __WINDOWS__ // hack copied from StackOverflow, behaves a bit like pipe() on *nix systems static inline void winPipeHack(SOCKET fds[2]) @@ -87,18 +343,17 @@ static inline void winPipeHack(SOCKET fds[2]) } #endif -SocketManager::SocketManager( +NativeSocketManager::NativeSocketManager( int localUdpPort, int localTcpPort, void (*packetHandler)(const SharedPtr<Socket> &,void *,const InetAddress &,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> &), void *arg) : + SocketManager(packetHandler,arg), _whackSendPipe(INVALID_SOCKET), _whackReceivePipe(INVALID_SOCKET), _tcpV4ListenSocket(INVALID_SOCKET), _tcpV6ListenSocket(INVALID_SOCKET), - _nfds(0), - _packetHandler(packetHandler), - _arg(arg) + _nfds(0) { FD_ZERO(&_readfds); FD_ZERO(&_writefds); @@ -278,7 +533,7 @@ SocketManager::SocketManager( throw std::runtime_error("unable to bind to port"); } - _udpV6Socket = SharedPtr<Socket>(new UdpSocket(Socket::ZT_SOCKET_TYPE_UDP_V6,s)); + _udpV6Socket = SharedPtr<Socket>(new NativeUdpSocket(Socket::ZT_SOCKET_TYPE_UDP_V6,s)); #ifdef __WINDOWS__ u_long iMode=1; ioctlsocket(s,FIONBIO,&iMode); @@ -348,7 +603,7 @@ SocketManager::SocketManager( throw std::runtime_error("unable to bind to port"); } - _udpV4Socket = SharedPtr<Socket>(new UdpSocket(Socket::ZT_SOCKET_TYPE_UDP_V4,s)); + _udpV4Socket = SharedPtr<Socket>(new NativeUdpSocket(Socket::ZT_SOCKET_TYPE_UDP_V4,s)); #ifdef __WINDOWS__ u_long iMode=1; ioctlsocket(s,FIONBIO,&iMode); @@ -362,13 +617,13 @@ SocketManager::SocketManager( _updateNfds(); } -SocketManager::~SocketManager() +NativeSocketManager::~NativeSocketManager() { Mutex::Lock _l(_pollLock); _closeSockets(); } -bool SocketManager::send(const InetAddress &to,bool tcp,bool autoConnectTcp,const void *msg,unsigned int msglen) +bool NativeSocketManager::send(const InetAddress &to,bool tcp,bool autoConnectTcp,const void *msg,unsigned int msglen) { if (tcp) { SharedPtr<Socket> ts; @@ -418,7 +673,7 @@ bool SocketManager::send(const InetAddress &to,bool tcp,bool autoConnectTcp,cons } else connecting = true; } - ts = SharedPtr<Socket>(new TcpSocket(this,s,Socket::ZT_SOCKET_TYPE_TCP_OUT,connecting,to)); + ts = SharedPtr<Socket>(new NativeTcpSocket(this,s,Socket::ZT_SOCKET_TYPE_TCP_OUT,connecting,to)); if (!ts->send(to,msg,msglen)) { _fdSetLock.lock(); FD_CLR(s,&_readfds); @@ -452,7 +707,7 @@ bool SocketManager::send(const InetAddress &to,bool tcp,bool autoConnectTcp,cons return false; } -void SocketManager::poll(unsigned long timeout) +void NativeSocketManager::poll(unsigned long timeout) { fd_set rfds,wfds,efds; struct timeval tv; @@ -508,7 +763,7 @@ void SocketManager::poll(unsigned long timeout) InetAddress fromia((const struct sockaddr *)&from); Mutex::Lock _l2(_tcpSockets_m); try { - _tcpSockets[fromia] = SharedPtr<Socket>(new TcpSocket(this,sockfd,Socket::ZT_SOCKET_TYPE_TCP_IN,false,fromia)); + _tcpSockets[fromia] = SharedPtr<Socket>(new NativeTcpSocket(this,sockfd,Socket::ZT_SOCKET_TYPE_TCP_IN,false,fromia)); #ifdef __WINDOWS__ { u_long iMode=1; ioctlsocket(sockfd,FIONBIO,&iMode); } #ifdef ZT_TCP_NODELAY @@ -548,7 +803,7 @@ void SocketManager::poll(unsigned long timeout) InetAddress fromia((const struct sockaddr *)&from); Mutex::Lock _l2(_tcpSockets_m); try { - _tcpSockets[fromia] = SharedPtr<Socket>(new TcpSocket(this,sockfd,Socket::ZT_SOCKET_TYPE_TCP_IN,false,fromia)); + _tcpSockets[fromia] = SharedPtr<Socket>(new NativeTcpSocket(this,sockfd,Socket::ZT_SOCKET_TYPE_TCP_IN,false,fromia)); #ifdef __WINDOWS__ { u_long iMode=1; ioctlsocket(sockfd,FIONBIO,&iMode); } #ifdef ZT_TCP_NODELAY @@ -576,11 +831,15 @@ void SocketManager::poll(unsigned long timeout) } } - if ((_udpV4Socket)&&(FD_ISSET(_udpV4Socket->_sock,&rfds))) { - _udpV4Socket->notifyAvailableForRead(_udpV4Socket,this); - } - if ((_udpV6Socket)&&(FD_ISSET(_udpV6Socket->_sock,&rfds))) { - _udpV6Socket->notifyAvailableForRead(_udpV6Socket,this); + { + NativeUdpSocket *usock = (NativeUdpSocket *)_udpV4Socket.ptr(); + if ((usock)&&(FD_ISSET(usock->_sock,&rfds))) { + usock->notifyAvailableForRead(_udpV4Socket,this); + } + usock = (NativeUdpSocket *)_udpV6Socket.ptr(); + if ((usock)&&(FD_ISSET(usock->_sock,&rfds))) { + usock->notifyAvailableForRead(_udpV6Socket,this); + } } bool closedSockets = false; @@ -590,17 +849,18 @@ void SocketManager::poll(unsigned long timeout) ts.reserve(_tcpSockets.size()); uint64_t now = Utils::now(); for(std::map< InetAddress,SharedPtr<Socket> >::iterator s(_tcpSockets.begin());s!=_tcpSockets.end();) { + NativeTcpSocket *tsock = (NativeTcpSocket *)s->second.ptr(); #ifdef __WINDOWS__ - if ( ((now - ((TcpSocket *)s->second.ptr())->_lastActivity) < ZT_TCP_TUNNEL_ACTIVITY_TIMEOUT) && (! ((((TcpSocket *)s->second.ptr())->_connecting)&&(FD_ISSET(s->second->_sock,&efds))) ) ) { + if ( ((now - tsock->_lastActivity) < ZT_TCP_TUNNEL_ACTIVITY_TIMEOUT) && (! ((tsock->_connecting)&&(FD_ISSET(tsock->_sock,&efds))) ) ) { #else - if ((now - ((TcpSocket *)s->second.ptr())->_lastActivity) < ZT_TCP_TUNNEL_ACTIVITY_TIMEOUT) { + if ((now - tsock->_lastActivity) < ZT_TCP_TUNNEL_ACTIVITY_TIMEOUT) { #endif ts.push_back(s->second); ++s; } else { _fdSetLock.lock(); - FD_CLR(s->second->_sock,&_readfds); - FD_CLR(s->second->_sock,&_writefds); + FD_CLR(tsock->_sock,&_readfds); + FD_CLR(tsock->_sock,&_writefds); _fdSetLock.unlock(); _tcpSockets.erase(s++); closedSockets = true; @@ -609,29 +869,30 @@ void SocketManager::poll(unsigned long timeout) } } for(std::vector< SharedPtr<Socket> >::iterator s(ts.begin());s!=ts.end();++s) { - if (FD_ISSET((*s)->_sock,&wfds)) { - if (!(*s)->notifyAvailableForWrite(*s,this)) { + NativeTcpSocket *tsock = (NativeTcpSocket *)s->ptr(); + if (FD_ISSET(tsock->_sock,&wfds)) { + if (!tsock->notifyAvailableForWrite(*s,this)) { { Mutex::Lock _l2(_tcpSockets_m); - _tcpSockets.erase(((TcpSocket *)s->ptr())->_remote); + _tcpSockets.erase(tsock->_remote); } _fdSetLock.lock(); - FD_CLR((*s)->_sock,&_readfds); - FD_CLR((*s)->_sock,&_writefds); + FD_CLR(tsock->_sock,&_readfds); + FD_CLR(tsock->_sock,&_writefds); _fdSetLock.unlock(); closedSockets = true; continue; } } - if (FD_ISSET((*s)->_sock,&rfds)) { - if (!(*s)->notifyAvailableForRead(*s,this)) { + if (FD_ISSET(tsock->_sock,&rfds)) { + if (!tsock->notifyAvailableForRead(*s,this)) { { Mutex::Lock _l2(_tcpSockets_m); - _tcpSockets.erase(((TcpSocket *)s->ptr())->_remote); + _tcpSockets.erase(tsock->_remote); } _fdSetLock.lock(); - FD_CLR((*s)->_sock,&_readfds); - FD_CLR((*s)->_sock,&_writefds); + FD_CLR(tsock->_sock,&_readfds); + FD_CLR(tsock->_sock,&_writefds); _fdSetLock.unlock(); closedSockets = true; continue; @@ -642,7 +903,7 @@ void SocketManager::poll(unsigned long timeout) _updateNfds(); } -void SocketManager::whack() +void NativeSocketManager::whack() { _whackSendPipe_m.lock(); #ifdef __WINDOWS__ @@ -653,14 +914,14 @@ void SocketManager::whack() _whackSendPipe_m.unlock(); } -void SocketManager::closeTcpSockets() +void NativeSocketManager::closeTcpSockets() { { Mutex::Lock _l2(_tcpSockets_m); _fdSetLock.lock(); for(std::map< InetAddress,SharedPtr<Socket> >::iterator s(_tcpSockets.begin());s!=_tcpSockets.end();++s) { - FD_CLR(s->second->_sock,&_readfds); - FD_CLR(s->second->_sock,&_writefds); + FD_CLR(((NativeTcpSocket *)s->second.ptr())->_sock,&_readfds); + FD_CLR(((NativeTcpSocket *)s->second.ptr())->_sock,&_writefds); } _fdSetLock.unlock(); _tcpSockets.clear(); @@ -668,8 +929,21 @@ void SocketManager::closeTcpSockets() _updateNfds(); } -void SocketManager::_closeSockets() - throw() +void NativeSocketManager::_startNotifyWrite(const NativeSocket *sock) +{ + _fdSetLock.lock(); + FD_SET(sock->_sock,&_writefds); + _fdSetLock.unlock(); +} + +void NativeSocketManager::_stopNotifyWrite(const NativeSocket *sock) +{ + _fdSetLock.lock(); + FD_CLR(sock->_sock,&_writefds); + _fdSetLock.unlock(); +} + +void NativeSocketManager::_closeSockets() { #ifdef __WINDOWS__ if (_whackSendPipe != INVALID_SOCKET) @@ -692,7 +966,7 @@ void SocketManager::_closeSockets() #endif } -void SocketManager::_updateNfds() +void NativeSocketManager::_updateNfds() { #ifdef __WINDOWS__ SOCKET nfds = _whackSendPipe; @@ -705,14 +979,14 @@ void SocketManager::_updateNfds() nfds = _tcpV4ListenSocket; if (_tcpV6ListenSocket > nfds) nfds = _tcpV6ListenSocket; - if ((_udpV4Socket)&&(_udpV4Socket->_sock > nfds)) - nfds = _udpV4Socket->_sock; - if ((_udpV6Socket)&&(_udpV6Socket->_sock > nfds)) - nfds = _udpV6Socket->_sock; + if ((_udpV4Socket)&&(((NativeUdpSocket *)_udpV4Socket.ptr())->_sock > nfds)) + nfds = ((NativeUdpSocket *)_udpV4Socket.ptr())->_sock; + if ((_udpV6Socket)&&(((NativeUdpSocket *)_udpV6Socket.ptr())->_sock > nfds)) + nfds = ((NativeUdpSocket *)_udpV6Socket.ptr())->_sock; Mutex::Lock _l(_tcpSockets_m); for(std::map< InetAddress,SharedPtr<Socket> >::const_iterator s(_tcpSockets.begin());s!=_tcpSockets.end();++s) { - if (s->second->_sock > nfds) - nfds = s->second->_sock; + if (((NativeTcpSocket *)s->second.ptr())->_sock > nfds) + nfds = ((NativeTcpSocket *)s->second.ptr())->_sock; } _nfds = (int)nfds; } diff --git a/osnet/NativeSocketManager.hpp b/osnet/NativeSocketManager.hpp index 81d2e780..d6b014cb 100644 --- a/osnet/NativeSocketManager.hpp +++ b/osnet/NativeSocketManager.hpp @@ -25,8 +25,8 @@ * LLC. Start here: http://www.zerotier.com/ */ -#ifndef ZT_SOCKETMANAGER_HPP -#define ZT_SOCKETMANAGER_HPP +#ifndef ZT_NATIVESOCKETMANAGER_HPP +#define ZT_NATIVESOCKETMANAGER_HPP #include <stdio.h> #include <stdlib.h> @@ -34,14 +34,11 @@ #include <map> #include <stdexcept> -#include "Constants.hpp" - -#include "SharedPtr.hpp" -#include "InetAddress.hpp" -#include "Socket.hpp" -#include "Mutex.hpp" -#include "NonCopyable.hpp" -#include "Buffer.hpp" +#include "../node/Constants.hpp" +#include "../node/SharedPtr.hpp" +#include "../node/Mutex.hpp" +#include "../node/SocketManager.hpp" +#include "../node/Socket.hpp" #ifdef __WINDOWS__ #include <WinSock2.h> @@ -56,16 +53,19 @@ namespace ZeroTier { +class NativeSocket; +class NativeUdpSocket; +class NativeTcpSocket; + /** * Socket I/O multiplexer * * This wraps select(), epoll(), etc. and handles creation of Sockets. */ -class SocketManager : NonCopyable +class NativeSocketManager : public SocketManager { - friend class Socket; - friend class UdpSocket; - friend class TcpSocket; + friend class NativeUdpSocket; + friend class NativeTcpSocket; public: /** @@ -75,52 +75,18 @@ public: * @param arg Second argument to packetHandler() * @throws std::runtime_error Could not bind local port(s) or open socket(s) */ - SocketManager( + NativeSocketManager( int localUdpPort, int localTcpPort, void (*packetHandler)(const SharedPtr<Socket> &,void *,const InetAddress &,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> &), void *arg); - ~SocketManager(); - - /** - * Send a message to a remote peer - * - * @param to Destination address - * @param tcp Use TCP? - * @param autoConnectTcp If true, automatically initiate TCP connection if there is none - * @param msg Message to send - * @param msglen Length of message - */ - bool send(const InetAddress &to,bool tcp,bool autoConnectTcp,const void *msg,unsigned int msglen); - - /** - * Send a message to a remote peer via UDP (shortcut for setting both TCP params to false in send) - * - * @param to Destination address - * @param msg Message to send - * @param msglen Length of message - */ - inline bool sendUdp(const InetAddress &to,const void *msg,unsigned int msglen) { return send(to,false,false,msg,msglen); } + virtual ~NativeSocketManager(); - /** - * Perform I/O polling operation (e.g. select()) - * - * If called concurrently, one will block until the other completes. - * - * @param timeout Timeout in milliseconds, may return sooner if whack() is called - */ - void poll(unsigned long timeout); - - /** - * Cause current or next blocking poll() operation to timeout immediately - */ - void whack(); - - /** - * Close TCP sockets - */ - void closeTcpSockets(); + virtual bool send(const InetAddress &to,bool tcp,bool autoConnectTcp,const void *msg,unsigned int msglen); + virtual void poll(unsigned long timeout); + virtual void whack(); + virtual void closeTcpSockets(); private: // Called by socket implementations when a packet is received @@ -133,24 +99,11 @@ private: } // Used by TcpSocket to register/unregister for write availability notification - inline void startNotifyWrite(const Socket *sock) - throw() - { - _fdSetLock.lock(); - FD_SET(sock->_sock,&_writefds); - _fdSetLock.unlock(); - } - inline void stopNotifyWrite(const Socket *sock) - throw() - { - _fdSetLock.lock(); - FD_CLR(sock->_sock,&_writefds); - _fdSetLock.unlock(); - } + void _startNotifyWrite(const NativeSocket *sock); + void _stopNotifyWrite(const NativeSocket *sock); // Called in SocketManager destructor or in constructor cleanup before exception throwing - void _closeSockets() - throw(); + void _closeSockets(); // Called in SocketManager to recompute _nfds for select() based implementation void _updateNfds(); @@ -179,9 +132,6 @@ private: std::map< InetAddress,SharedPtr<Socket> > _tcpSockets; Mutex _tcpSockets_m; - void (*_packetHandler)(const SharedPtr<Socket> &,void *,const InetAddress &,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> &); - void *_arg; - Mutex _pollLock; }; |