diff options
Diffstat (limited to 'osnet/Wire.hpp')
-rw-r--r-- | osnet/Wire.hpp | 414 |
1 files changed, 251 insertions, 163 deletions
diff --git a/osnet/Wire.hpp b/osnet/Wire.hpp index f674b5d3..6a1a2fb3 100644 --- a/osnet/Wire.hpp +++ b/osnet/Wire.hpp @@ -48,39 +48,67 @@ #include <netinet/tcp.h> #endif +#include <list> + #if defined(_WIN32) || defined(_WIN64) -#define ZT_SELECTWIRE_SOCKFD_TYPE SOCKET -#define ZT_SELECTWIRE_SOCKFD_NULL (INVALID_SOCKET) -#define ZT_SELECTWIRE_SOCKFD_VALID(s) ((s) != INVALID_SOCKET) -#define ZT_SELECTWIRE_CLOSE_SOCKET(s) ::closesocket(s) -#define ZT_SELECTWIRE_MAX_SOCKETS (FD_SETSIZE) -#define ZT_SELECTWIRE_SOCKADDR_STORAGE_TYPE struct sockaddr_storage +#define ZT_WIRE_SOCKFD_TYPE SOCKET +#define ZT_WIRE_SOCKFD_NULL (INVALID_SOCKET) +#define ZT_WIRE_SOCKFD_VALID(s) ((s) != INVALID_SOCKET) +#define ZT_WIRE_CLOSE_SOCKET(s) ::closesocket(s) +#define ZT_WIRE_MAX_SOCKETS (FD_SETSIZE) +#define ZT_WIRE_SOCKADDR_STORAGE_TYPE struct sockaddr_storage #else -#define ZT_SELECTWIRE_SOCKFD_TYPE int -#define ZT_SELECTWIRE_SOCKFD_NULL (-1) -#define ZT_SELECTWIRE_SOCKFD_VALID(s) ((s) > -1) -#define ZT_SELECTWIRE_CLOSE_SOCKET(s) ::close(s) -#define ZT_SELECTWIRE_MAX_SOCKETS (FD_SETSIZE) -#define ZT_SELECTWIRE_SOCKADDR_STORAGE_TYPE struct sockaddr_storage +#define ZT_WIRE_SOCKFD_TYPE int +#define ZT_WIRE_SOCKFD_NULL (-1) +#define ZT_WIRE_SOCKFD_VALID(s) ((s) > -1) +#define ZT_WIRE_CLOSE_SOCKET(s) ::close(s) +#define ZT_WIRE_MAX_SOCKETS (FD_SETSIZE) +#define ZT_WIRE_SOCKADDR_STORAGE_TYPE struct sockaddr_storage #endif namespace ZeroTier { /** - * Wire implementation using select() for *nix or Windows + * Opaque socket type + */ +typedef const void * WireSocket; + +/** + * Simple templated non-blocking sockets implementation + * + * Yes there is boost::asio and libuv, but I like small binaries and I hate + * build dependencies. * * This implementation takes four functions or function objects as template * paramters: * - * ON_DATAGRAM_FUNCTION(const void *sock,void **uptr,const struct sockaddr *from,void *data,unsigned long len) - * ON_TCP_CONNECT_FUNCTION(const void *sock,void **uptr,bool success) - * ON_TCP_ACCEPT_FUNCTION(const void *sockL,const void *sockN,void **uptrL,void **uptrN,const struct sockaddr *from) - * ON_TCP_CLOSE_FUNCTION(const void *sock,void *uptr) - * ON_TCP_DATA_FUNCTION(const void *sock,void **uptr,void *data,unsigned long len) - * ON_TCP_WRITABLE_FUNCTION(const void *sock,void **uptr) + * ON_DATAGRAM_FUNCTION(WireSocket *sock,void **uptr,const struct sockaddr *from,void *data,unsigned long len) + * ON_TCP_CONNECT_FUNCTION(WireSocket *sock,void **uptr,bool success) + * ON_TCP_ACCEPT_FUNCTION(WireSocket *sockL,WireSocket *sockN,void **uptrL,void **uptrN,const struct sockaddr *from) + * ON_TCP_CLOSE_FUNCTION(WireSocket *sock,void **uptr) + * ON_TCP_DATA_FUNCTION(WireSocket *sock,void **uptr,void *data,unsigned long len) + * ON_TCP_WRITABLE_FUNCTION(WireSocket *sock,void **uptr) * * These templates typically refer to function objects. Templates are used to - * avoid the call overhead of indirection. + * avoid the call overhead of indirection, which is surprisingly high for high + * bandwidth applications pushing a lot of packets. + * + * The 'sock' pointer above is an opaque pointer to a socket. Each socket + * has a 'uptr' user-settable/modifiable pointer associated with it, which + * can be set on bind/connect calls and is passed as a void ** to permit + * resetting at any time. The ACCEPT handler takes two sets of sock and + * uptr: sockL and uptrL for the listen socket, and sockN and uptrN for + * the new TCP connection socket that has just been created. + * + * Handlers are always called. On outgoing TCP connection, CONNECT is always + * called on either success or failure followed by DATA and/or WRITABLE as + * indicated. On socket close, handlers are called unless close() is told + * explicitly not to call handlers. It is safe to close a socket within a + * handler, and in that case close() can be told not to call handlers to + * prevent recursion. + * + * This isn't thread-safe with the exception of whack(), which is safe to + * call from another thread to abort poll(). */ template < @@ -117,36 +145,31 @@ public: _tcpDataHandler(tcpDataHandler), _tcpWritableHandler(tcpWritableHandler) { - for(unsigned lont i=0;i<ZT_SELECTWIRE_MAX_SOCKETS;++i) - _socks[i].type = ZT_WIRE_SOCKET_NULL; - FD_ZERO(&_readfds); FD_ZERO(&_writefds); FD_ZERO(&_exceptfds); #if defined(_WIN32) || defined(_WIN64) SOCKET pipes[2]; - _winPipeHack(pipes); + this->_winPipeHack(pipes); #else int pipes[2]; if (::pipe(pipes)) throw std::runtime_error("unable to create pipes for select() abort"); #endif + _nfds = (pipes[0] > pipes[1]) ? (long)pipes[0] : (long)pipes[1]; _whackReceiveSocket = pipes[0]; _whackSendSocket = pipes[1]; - _noDelay = noDelay; } ~Wire() { - for(unsigned long i=0;i<_nsocks;++i) { - if (_socks[i].type != ZT_WIRE_SOCKET_NULL) - this->close(_socks[i],true); - } - ZT_SELECTWIRE_CLOSE_SOCKET(_whackReceiveSocket); - ZT_SELECTWIRE_CLOSE_SOCKET(_whackSendSocket); + while (!_socks.empty()) + this->close((WireSocket *)&(_socks.front()),true); + ZT_WIRE_CLOSE_SOCKET(_whackReceiveSocket); + ZT_WIRE_CLOSE_SOCKET(_whackSendSocket); } /** @@ -157,90 +180,112 @@ public: #if defined(_WIN32) || defined(_WIN64) ::send(_whackSendSocket,(const char *)this,1,0); #else - ::write(_whackSendSocket,(const void *)this,1); + ::write(_whackSendSocket,(WireSocket *)this,1); #endif } /** + * @return Number of open sockets + */ + inline unsigned long count() const throw() { return _socks.size(); } + + /** + * @return Maximum number of sockets allowed + */ + inline unsigned long maxCount() const throw() { return ZT_WIRE_MAX_SOCKETS; } + + /** * Bind a UDP socket * * @param localAddress Local endpoint address and port * @param uptr Initial value of user pointer associated with this socket - * @return Socket (as opaque pointer) or NULL on failure + * @param bufferSize Desired socket receive/send buffer size -- will set as close to this as possible (0 to accept default) + * @return Socket or NULL on failure to bind */ - inline const void *udpBind(const struct sockaddr *localAddress,void *uptr) + inline WireSocket *udpBind(const struct sockaddr *localAddress,void *uptr,int bufferSize) { - ZT_SELECTWIRE_SOCKFD_TYPE s = ::socket(AF_INET6,SOCK_DGRAM,0); - if (!ZT_SELECTWIRE_SOCKFD_VALID(s)) - return (const void *)0; - - int bs = 262144; - while (bs >= 65536) { - int tmpbs = bs; - if (setsockopt(s,SOL_SOCKET,SO_RCVBUF,(const char *)&tmpbs,sizeof(tmpbs)) == 0) - break; - bs -= 16384; - } - bs = 262144; - while (bs >= 65536) { - int tmpbs = bs; - if (setsockopt(s,SOL_SOCKET,SO_SNDBUF,(const char *)&tmpbs,sizeof(tmpbs)) == 0) - break; - bs -= 16384; + if (_socks.size() >= ZT_WIRE_MAX_SOCKETS) + return (WireSocket *)0; + + ZT_WIRE_SOCKFD_TYPE s = ::socket(localAddress->sa_family,SOCK_DGRAM,0); + if (!ZT_WIRE_SOCKFD_VALID(s)) + return (WireSocket *)0; + + if (bufferSize > 0) { + int bs = bufferSize; + while (bs >= 65536) { + int tmpbs = bs; + if (setsockopt(s,SOL_SOCKET,SO_RCVBUF,(const char *)&tmpbs,sizeof(tmpbs)) == 0) + break; + bs -= 16384; + } + bs = bufferSize; + while (bs >= 65536) { + int tmpbs = bs; + if (setsockopt(s,SOL_SOCKET,SO_SNDBUF,(const char *)&tmpbs,sizeof(tmpbs)) == 0) + break; + bs -= 16384; + } } + #if defined(_WIN32) || defined(_WIN64) - BOOL f; - if (localAddress->ss_family == AF_INET6) { - f = TRUE; setsockopt(s,IPPROTO_IPV6,IPV6_V6ONLY,(const char *)&f,sizeof(f)); - f = FALSE; setsockopt(s,IPPROTO_IPV6,IPV6_DONTFRAG,(const char *)&f,sizeof(f)); + { + BOOL f; + if (localAddress->sa_family == AF_INET6) { + f = TRUE; setsockopt(s,IPPROTO_IPV6,IPV6_V6ONLY,(const char *)&f,sizeof(f)); + f = FALSE; setsockopt(s,IPPROTO_IPV6,IPV6_DONTFRAG,(const char *)&f,sizeof(f)); + } + f = FALSE; setsockopt(s,SOL_SOCKET,SO_REUSEADDR,(const char *)&f,sizeof(f)); + f = TRUE; setsockopt(s,SOL_SOCKET,SO_BROADCAST,(const char *)&f,sizeof(f)); } - f = FALSE; setsockopt(s,SOL_SOCKET,SO_REUSEADDR,(const char *)&f,sizeof(f)); - f = TRUE; setsockopt(s,SOL_SOCKET,SO_BROADCAST,(const char *)&f,sizeof(f)); #else // not Windows - int f; - if (localAddress->ss_family == AF_INET6) { - f = 1; setsockopt(s,IPPROTO_IPV6,IPV6_V6ONLY,(void *)&f,sizeof(f)); + { + int f; + if (localAddress->sa_family == AF_INET6) { + f = 1; setsockopt(s,IPPROTO_IPV6,IPV6_V6ONLY,(void *)&f,sizeof(f)); #ifdef IPV6_MTU_DISCOVER - f = 0; setsockopt(s,IPPROTO_IPV6,IPV6_MTU_DISCOVER,&f,sizeof(f)); + f = 0; setsockopt(s,IPPROTO_IPV6,IPV6_MTU_DISCOVER,&f,sizeof(f)); #endif - } - f = 0; setsockopt(s,SOL_SOCKET,SO_REUSEADDR,(void *)&f,sizeof(f)); - f = 1; setsockopt(s,SOL_SOCKET,SO_BROADCAST,(void *)&f,sizeof(f)); + } + f = 0; setsockopt(s,SOL_SOCKET,SO_REUSEADDR,(void *)&f,sizeof(f)); + f = 1; setsockopt(s,SOL_SOCKET,SO_BROADCAST,(void *)&f,sizeof(f)); #ifdef IP_DONTFRAG - f = 0; setsockopt(s,IPPROTO_IP,IP_DONTFRAG,&f,sizeof(f)); + f = 0; setsockopt(s,IPPROTO_IP,IP_DONTFRAG,&f,sizeof(f)); #endif #ifdef IP_MTU_DISCOVER - f = 0; setsockopt(s,IPPROTO_IP,IP_MTU_DISCOVER,&f,sizeof(f)); + f = 0; setsockopt(s,IPPROTO_IP,IP_MTU_DISCOVER,&f,sizeof(f)); #endif + } #endif // Windows or not - if (::bind(s,localAddress,(localAddress->ss_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in))) { - ZT_SELECTWIRE_CLOSE_SOCKET(s); - return (const void *)0; + if (::bind(s,localAddress,(localAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in))) { + ZT_WIRE_CLOSE_SOCKET(s); + return (WireSocket *)0; } #if defined(_WIN32) || defined(_WIN64) - u_long iMode=1; - ioctlsocket(s,FIONBIO,&iMode); + { u_long iMode=1; ioctlsocket(s,FIONBIO,&iMode); } #else fcntl(s,F_SETFL,O_NONBLOCK); #endif - for(unsigned long i=0;i<ZT_SELECTWIRE_MAX_SOCKETS;++i) { - if (_socks[i].type == ZT_WIRE_SOCKET_NULL) { - if ((long)s > _nfds) - _nfds = (long)s; - FD_SET(s,&_readfds); - _socks[i].type = ZT_WIRE_SOCKET_UDP; - _socks[i].sock = s; - _socks[i].uptr = uptr; - memcpy(&(_socks[i].saddr),localAddress,sizeof(struct sockaddr_storage)); - return (const void *)&(_socks[i]); - } + try { + _socks.push_back(WireSocketImpl()); + } catch ( ... ) { + ZT_WIRE_CLOSE_SOCKET(s); + return (WireSocket *)0; } + WireSocketImpl &sws = _socks.back(); - ZT_SELECTWIRE_CLOSE_SOCKET(s); - return (const void *)0; + if ((long)s > _nfds) + _nfds = (long)s; + FD_SET(s,&_readfds); + sws.type = ZT_WIRE_SOCKET_UDP; + sws.sock = s; + sws.uptr = uptr; + memcpy(&(sws.saddr),localAddress,sizeof(struct sockaddr_storage)); + + return (WireSocket *)&sws; } /** @@ -253,23 +298,53 @@ public: * @param len Length of packet * @return True if packet appears to have been sent successfully */ - inline bool udpSend(const void *sock,const struct sockaddr *addr,unsigned int addrlen,const void *data,unsigned long len) + inline bool udpSend(WireSocket *sock,const struct sockaddr *addr,unsigned int addrlen,WireSocket *data,unsigned long len) { - WireSocket &sws = *(const_cast <WireSocket *>(reinterpret_cast<const WireSocket *>(sock))); + WireSocketImpl &sws = *(const_cast <WireSocketImpl *>(reinterpret_cast<const WireSocketImpl *>(sock))); return ((long)::sendto(sws.sock,data,len,0,addr,(socklen_t)addrlen) == (long)len); } - inline const void *tcpListen(const struct sockaddr *localAddress,void *uptr) + /** + * Bind a local listen socket to listen for new TCP connections + * + * @param localAddress Local address and port + * @param uptr Initial value of uptr for new socket + * @return Socket or NULL on failure to bind + */ + inline WireSocket *tcpListen(const struct sockaddr *localAddress,void *uptr) { + if (_socks.size() >= ZT_WIRE_MAX_SOCKETS) + return (WireSocket *)0; } - inline const void *tcpConnect(const struct sockaddr *remoteAddress,void *uptr) + /** + * Start a non-blocking connect; CONNECT handler is called on success or failure + * + * Note that if NULL is returned here, the handler is not called. Such + * a return would indicate failure to allocate the socket, too many + * open sockets, etc. + * + * @param remoteAddress Remote address + * @param uptr Initial value of uptr for new socket + * @return New socket or NULL on failure + */ + inline WireSocket *tcpConnect(const struct sockaddr *remoteAddress,void *uptr) { + if (_socks.size() >= ZT_WIRE_MAX_SOCKETS) + return (WireSocket *)0; } - inline unsigned long tcpSend(const void *sock,const void *data,unsigned long len) + /** + * Attempt to send data to a TCP connection (non-blocking) + * + * @param sock An open TCP socket (other socket types will fail) + * @param data Data to send + * @param len Length of data + * @return Number of bytes actually sent or 0 on failure + */ + inline unsigned long tcpSend(WireSocket *sock,WireSocket *data,unsigned long len) { - WireSocket &sws = *(const_cast <WireSocket *>(reinterpret_cast<const WireSocket *>(sock))); + WireSocketImpl &sws = *(const_cast <WireSocketImpl *>(reinterpret_cast<const WireSocketImpl *>(sock))); long n = ::send(sws.sock,data,len,0); return ((n > 0) ? (unsigned long)n : 0); } @@ -277,12 +352,16 @@ public: /** * Set whether we want to be notified via the TCP writability handler when a socket is writable * + * Call whack() if this is being done from another thread and you want + * it to take effect immediately. Otherwise it is only guaranteed to + * take effect on the next poll(). + * * @param sock TCP connection socket (other types are not valid) * @param notifyWritable Want writable notifications? */ - inline const void tcpSetNotifyWritable(const void *sock,bool notifyWritable) + inline const void tcpSetNotifyWritable(WireSocket *sock,bool notifyWritable) { - WireSocket &sws = *(const_cast <WireSocket *>(reinterpret_cast<const WireSocket *>(sock))); + WireSocketImpl &sws = *(const_cast <WireSocketImpl *>(reinterpret_cast<const WireSocketImpl *>(sock))); if (notifyWritable) { FD_SET(sws.sock,&_writefds); } else { @@ -290,6 +369,15 @@ public: } } + /** + * Wait for activity and handle one or more events + * + * Note that this is not guaranteed to wait up to 'timeout' even + * if nothing happens, as whack() or other events such as signals + * may cause premature termination. + * + * @param timeout Timeout in milliseconds or 0 for none (forever) + */ inline void poll(unsigned long timeout) { char buf[131072]; @@ -314,89 +402,83 @@ public: #endif } - for(unsigned long i=0;i<ZT_SELECTWIRE_MAX_SOCKETS;++i) { - switch (_socks[i].type) { + for(std::list<WireSocketImpl>::iterator s(_socks.begin());s!=_socks.end();++s) { + switch (s->type) { case ZT_WIRE_SOCKET_TCP_OUT_PENDING: - if (FD_ISSET(_socks[i].sock,&efds)) - this->close((const void *)&(_socks[i]),true); - else if (FD_ISSET(_socks[i].sock,&wfds)) { + if (FD_ISSET(s->sock,&efds)) + this->close((WireSocket *)&(_socks[i]),true); + else if (FD_ISSET(s->sock,&wfds)) { socklen_t slen = sizeof(ss); - if (::getpeername(_socks[i].sock,(strut sockaddr *)&ss,&slen) != 0) - this->close((const void *)&(_socks[i]),true); + if (::getpeername(s->sock,(strut sockaddr *)&ss,&slen) != 0) + this->close((WireSocket *)&(_socks[i]),true); else { - _socks[i].type = ZT_WIRE_SOCKET_TCP_OUT_CONNECTED; - FD_SET(_socks[i].sock,&_readfds); - FD_CLR(_socks[i].sock,&_writefds); - FD_CLR(_socks[i].sock,&_exceptfds); + s->type = ZT_WIRE_SOCKET_TCP_OUT_CONNECTED; + FD_SET(s->sock,&_readfds); + FD_CLR(s->sock,&_writefds); + FD_CLR(s->sock,&_exceptfds); try { - _tcpConnectHandler((const void *)&(_socks[i]),&(_socks[i].uptr),true); + _tcpConnectHandler((WireSocket *)&(_socks[i]),&(s->uptr),true); } catch ( ... ) {} } } break; case ZT_WIRE_SOCKET_TCP_OUT_CONNECTED: case ZT_WIRE_SOCKET_TCP_IN: - if (FD_ISSET(_socks[i].sock,&rfds)) { - long n = (long)::recv(_socks[i].sock,buf,sizeof(buf),0); + if (FD_ISSET(s->sock,&rfds)) { + long n = (long)::recv(s->sock,buf,sizeof(buf),0); if (n <= 0) { - this->close((const void *)&(_socks[i]),true); + this->close((WireSocket *)&(_socks[i]),true); } else { try { - _tcpDataHandler((const void *)&(_socks[i]),&(_socks[i].uptr),(void *)buf,(unsigned long)n); + _tcpDataHandler((WireSocket *)&(_socks[i]),&(s->uptr),(void *)buf,(unsigned long)n); } catch ( ... ) {} } } - if ((FD_ISSET(_socks[i].sock,&wfds))&&(FD_ISSET(_socks[i].sock,&_writefds))) { + if ((FD_ISSET(s->sock,&wfds))&&(FD_ISSET(s->sock,&_writefds))) { try { - _tcpWritableHandler((const void *)&(_socks[i]),&(_socks[i].uptr)); + _tcpWritableHandler((WireSocket *)&(_socks[i]),&(s->uptr)); } catch ( ... ) {} } break; case ZT_WIRE_SOCKET_TCP_LISTEN: - if (FD_ISSET(_socks[i].sock,&rfds)) { + if (FD_ISSET(s->sock,&rfds)) { memset(&ss,0,sizeof(ss)); socklen_t slen = sizeof(ss); - ZT_SELECTWIRE_SOCKFD_TYPE s = ::accept(_socks[i].sock,(struct sockaddr *)&ss,&slen); - if (ZT_SELECTWIRE_SOCKFD_VALID(s)) { + ZT_WIRE_SOCKFD_TYPE newSock = ::accept(s->sock,(struct sockaddr *)&ss,&slen); + if (ZT_WIRE_SOCKFD_VALID(newSock)) { + if (_socks.size() >= ZT_WIRE_MAX_SOCKETS) { + ZT_WIRE_CLOSE_SOCKET(newSock); + } else { #if defined(_WIN32) || defined(_WIN64) - if (_noDelay) { BOOL f = TRUE; setsockopt(s,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f)); } - u_long iMode=1; - ioctlsocket(s,FIONBIO,&iMode); + { BOOL f = (_noDelay ? TRUE : FALSE); setsockopt(newSock,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f)); } + { u_long iMode=1; ioctlsocket(newSock,FIONBIO,&iMode); } #else - if (_noDelay) { int f = 1; setsockopt(s,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f)); } - fcntl(s,F_SETFL,O_NONBLOCK); + { int f = (_noDelay ? 1 : 0); setsockopt(newSock,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f)); } + fcntl(newSock,F_SETFL,O_NONBLOCK); #endif - bool haveSlot = false; - for(unsigned long k=0;k<ZT_SELECTWIRE_MAX_SOCKETS;++k) { - if (_socks[k].type == ZT_WIRE_SOCKET_NULL) { - FD_SET(s,&_readfds); - haveSlot = true; - if ((long)s > _nfds) - _nfds = (long)s; - FD_SET(s,&_readfds); - _socks[k].type = ZT_WIRE_SOCKET_UDP; - _socks[k].sock = s; - _socks[k].uptr = (void *)0; - memcpy(&(_socks[k].saddr),&ss,sizeof(struct sockaddr_storage)); - try { - _tcpAcceptHandler((const void *)&(_socks[i]),(const void *)&(_socks[k]),&(_socks[i].uptr),&(_socks[k].uptr),(const struct sockaddr *)&(_socks[k].saddr)); - } catch ( ... ) {} - break; - } - } - if (!haveSlot) - ZT_SELECTWIRE_CLOSE_SOCKET(s); + _socks.push_back(WireSocketImpl()); + WireSocketImpl &sws = _socks.back(); + FD_SET(newSock,&_readfds); + if ((long)newSock > _nfds) + _nfds = (long)newSock; + sws.type = ZT_WIRE_SOCKET_UDP; + sws.sock = s; + sws.uptr = (void *)0; + memcpy(&(sws.saddr),&ss,sizeof(struct sockaddr_storage)); + try { + _tcpAcceptHandler((WireSocket *)&(*s),(WireSocket *)&(_socks.back()),&(s->uptr),&(sws.uptr),(const struct sockaddr *)&(sws.saddr)); + } catch ( ... ) {} } } break; case ZT_WIRE_SOCKET_UDP: - if (FD_ISSET(_socks[i].sock,&rfds)) { + if (FD_ISSET(s->sock,&rfds)) { memset(&ss,0,sizeof(ss)); socklen_t slen = sizeof(ss); - long n = (long)::recvfrom(_socks[i].sock,buf,sizeof(buf),0,(struct sockaddr *)&ss,&slen); + long n = (long)::recvfrom(s->sock,buf,sizeof(buf),0,(struct sockaddr *)&ss,&slen); if (n > 0) { try { - _dgHandler((const void *)&(_socks[i]),&(_socks[i].uptr),(const struct sockaddr *)&ss,(void *)buf,(unsigned long)n); + _dgHandler((WireSocket *)&(_socks[i]),&(s->uptr),(const struct sockaddr *)&ss,(void *)buf,(unsigned long)n); } catch ( ... ) {} } } @@ -407,24 +489,23 @@ public: } } - inline void close(const void *sock,bool callHandlers) + inline void close(WireSocket *sock,bool callHandlers) { - WireSocket &sws = *(const_cast <WireSocket *>(reinterpret_cast<const WireSocket *>(sock))); - - const WireSocketType oldType = sws.type; - sws.type = ZT_WIRE_SOCKET_NULL; + if (!sock) + return; + WireSocketImpl &sws = *(const_cast <WireSocketImpl *>(reinterpret_cast<const WireSocketImpl *>(sock))); FD_CLR(sws.sock,&_readfds); FD_CLR(sws.sock,&_writefds); FD_CLR(sws.sock,&_exceptfds); - ZT_SELECTWIRE_CLOSE_SOCKET(sws.sock); + ZT_WIRE_CLOSE_SOCKET(sws.sock); - switch(oldType) { + switch(sws.type) { case ZT_WIRE_SOCKET_TCP_OUT_PENDING: if (callHandlers) { try { - _tcpConnectHandler((const void *)&sws,&(sws.uptr),false); + _tcpConnectHandler(sock,&(sws.uptr),false); } catch ( ... ) {} } break; @@ -432,7 +513,7 @@ public: case ZT_WIRE_SOCKET_TCP_IN: if (callHandlers) { try { - _tcpCloseHandler((const void *)&sws,sws.uptr); + _tcpCloseHandler(sock,&(sws.uptr)); } catch ( ... ) {} } break; @@ -442,12 +523,21 @@ public: if ((long)sws.sock >= _nfds) { long nfds = (long)_whackSendSocket; - for(unsigned long i=0;i<ZT_SELECTWIRE_MAX_SOCKETS;++i) { - if ((_socks[i].type != ZT_WIRE_SOCKET_NULL)&&((long)_socks[i].sock > nfds)) - nfds = (long)_socks[i].sock; + if ((long)_whackReceiveSocket > nfds) + nfds = (long)_whackReceiveSocket; + for(std::list<WireSocketImpl>::iterator s(_socks.begin());s!=_socks.end();++s) { + if ((long)s->sock > nfds) + nfds = (long)s->sock; } _nfds = nfds; } + + for(std::list<WireSocketImpl>::iterator s(_socks.begin());s!=_socks.end();++s) { + if (&(*s) == sock) { + _socks.erase(s); + break; + } + } } private: @@ -487,19 +577,18 @@ private: ZT_WIRE_SOCKET_TCP_IN = 0x02, ZT_WIRE_SOCKET_TCP_LISTEN = 0x03, // isTCP() == ((type & 0x03) != 0) ZT_WIRE_SOCKET_RAW = 0x04, - ZT_WIRE_SOCKET_UDP = 0x05, - ZT_WIRE_SOCKET_NULL = 0x06 + ZT_WIRE_SOCKET_UDP = 0x05 }; - struct WireSocket + struct WireSocketImpl { WireSocketType type; - ZT_SELECTWIRE_SOCKFD_TYPE sock; + ZT_WIRE_SOCKFD_TYPE sock; void *uptr; // user-settable pointer - ZT_SELECTWIRE_SOCKADDR_STORAGE_TYPE saddr; // from address for TCP_IN, local address otherwise + ZT_WIRE_SOCKADDR_STORAGE_TYPE saddr; // from address for TCP_IN, local address otherwise }; - inline bool _isTCP(const WireSocket &sws) const throw() { return ((((unsigned int)sws.type) & 0x03) != 0); } + inline bool _isTCP(const WireSocketImpl &sws) const throw() { return ((((unsigned int)sws.type) & 0x03) != 0); } ON_DATAGRAM_FUNCTION _dgHandler; ON_TCP_CONNECT_FUNCTION _tcpConnectHandler; @@ -508,13 +597,12 @@ private: ON_TCP_DATA_FUNCTION _tcpDataHandler; ON_TCP_WRITABLE_FUNCTION _tcpWritableHandler; - WireSocket _socks[ZT_SELECTWIRE_MAX_SOCKETS]; - + std::list<WireSocketImpl> _socks; fd_set _readfds,_writefds,_exceptfds; long _nfds; - ZT_SELECTWIRE_SOCKFD_TYPE _whackReceiveSocket; - ZT_SELECTWIRE_SOCKFD_TYPE _whackSendSocket; + ZT_WIRE_SOCKFD_TYPE _whackReceiveSocket; + ZT_WIRE_SOCKFD_TYPE _whackSendSocket; bool _noDelay; }; |