diff options
Diffstat (limited to 'osdep/Phy.hpp')
-rw-r--r-- | osdep/Phy.hpp | 358 |
1 files changed, 337 insertions, 21 deletions
diff --git a/osdep/Phy.hpp b/osdep/Phy.hpp index 2ea68b9d..1ba6c40b 100644 --- a/osdep/Phy.hpp +++ b/osdep/Phy.hpp @@ -46,6 +46,7 @@ #define ZT_PHY_SOCKFD_VALID(s) ((s) != INVALID_SOCKET) #define ZT_PHY_CLOSE_SOCKET(s) ::closesocket(s) #define ZT_PHY_MAX_SOCKETS (FD_SETSIZE) +#define ZT_PHY_MAX_INTERCEPTS ZT_PHY_MAX_SOCKETS #define ZT_PHY_SOCKADDR_STORAGE_TYPE struct sockaddr_storage #else // not Windows @@ -58,15 +59,23 @@ #include <sys/types.h> #include <sys/select.h> #include <sys/socket.h> +#include <sys/un.h> #include <arpa/inet.h> #include <netinet/in.h> #include <netinet/tcp.h> +#if defined(__linux__) || defined(linux) || defined(__LINUX__) || defined(__linux) +#ifndef IPV6_DONTFRAG +#define IPV6_DONTFRAG 62 +#endif +#endif + #define ZT_PHY_SOCKFD_TYPE int #define ZT_PHY_SOCKFD_NULL (-1) #define ZT_PHY_SOCKFD_VALID(s) ((s) > -1) #define ZT_PHY_CLOSE_SOCKET(s) ::close(s) #define ZT_PHY_MAX_SOCKETS (FD_SETSIZE) +#define ZT_PHY_MAX_INTERCEPTS ZT_PHY_MAX_SOCKETS #define ZT_PHY_SOCKADDR_STORAGE_TYPE struct sockaddr_storage #endif // Windows or not @@ -87,12 +96,22 @@ typedef void PhySocket; * This class is templated on a pointer to a handler class which must * implement the following functions: * + * For all platforms: + * * phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *from,void *data,unsigned long len) * phyOnTcpConnect(PhySocket *sock,void **uptr,bool success) * phyOnTcpAccept(PhySocket *sockL,PhySocket *sockN,void **uptrL,void **uptrN,const struct sockaddr *from) * phyOnTcpClose(PhySocket *sock,void **uptr) * phyOnTcpData(PhySocket *sock,void **uptr,void *data,unsigned long len) * phyOnTcpWritable(PhySocket *sock,void **uptr) + * phyOnFileDescriptorActivity(PhySocket *sock,void **uptr,bool readable,bool writable) + * + * On Linux/OSX/Unix only (not required/used on Windows or elsewhere): + * + * phyOnUnixAccept(PhySocket *sockL,PhySocket *sockN,void **uptrL,void **uptrN) + * phyOnUnixClose(PhySocket *sock,void **uptr) + * phyOnUnixData(PhySocket *sock,void **uptr,void *data,unsigned long len) + * phyOnUnixWritable(PhySocket *sock,void **uptr) * * These templates typically refer to function objects. Templates are used to * avoid the call overhead of indirection, which is surprisingly high for high @@ -128,8 +147,10 @@ private: ZT_PHY_SOCKET_TCP_OUT_CONNECTED = 0x02, ZT_PHY_SOCKET_TCP_IN = 0x03, ZT_PHY_SOCKET_TCP_LISTEN = 0x04, - ZT_PHY_SOCKET_RAW = 0x05, - ZT_PHY_SOCKET_UDP = 0x06 + ZT_PHY_SOCKET_UDP = 0x05, + ZT_PHY_SOCKET_FD = 0x06, + ZT_PHY_SOCKET_UNIX_IN = 0x07, + ZT_PHY_SOCKET_UNIX_LISTEN = 0x08 }; struct PhySocketImpl @@ -218,7 +239,16 @@ public: } /** + * @param s Socket object + * @return Underlying OS-type (usually int or long) file descriptor associated with object + */ + static inline ZT_PHY_SOCKFD_TYPE getDescriptor(PhySocket *s) throw() { return reinterpret_cast<PhySocketImpl *>(s)->sock; } + + /** * Cause poll() to stop waiting immediately + * + * This can be used to reset the polling loop after changes that require + * attention, or to shut down a background thread that is waiting, etc. */ inline void whack() { @@ -240,6 +270,48 @@ public: inline unsigned long maxCount() const throw() { return ZT_PHY_MAX_SOCKETS; } /** + * Wrap a raw file descriptor in a PhySocket structure + * + * This can be used to select/poll on a raw file descriptor as part of this + * class's I/O loop. By default the fd is set for read notification but + * this can be controlled with setNotifyReadable(). When any detected + * condition is present, the phyOnFileDescriptorActivity() callback is + * called with one or both of its arguments 'true'. + * + * The Phy<>::close() method *must* be called when you're done with this + * file descriptor to remove it from the select/poll set, but unlike other + * types of sockets Phy<> does not actually close the underlying fd or + * otherwise manage its life cycle. There is also no close notification + * callback for this fd, since Phy<> doesn't actually perform reading or + * writing or detect error conditions. This is only useful for adding a + * file descriptor to Phy<> to select/poll on it. + * + * @param fd Raw file descriptor + * @param uptr User pointer to supply to callbacks + * @return PhySocket wrapping fd or NULL on failure (out of memory or too many sockets) + */ + inline PhySocket *wrapSocket(ZT_PHY_SOCKFD_TYPE fd,void *uptr = (void *)0) + { + if (_socks.size() >= ZT_PHY_MAX_SOCKETS) + return (PhySocket *)0; + try { + _socks.push_back(PhySocketImpl()); + } catch ( ... ) { + return (PhySocket *)0; + } + PhySocketImpl &sws = _socks.back(); + if ((long)fd > _nfds) + _nfds = (long)fd; + FD_SET(fd,&_readfds); + sws.type = ZT_PHY_SOCKET_FD; + sws.sock = fd; + sws.uptr = uptr; + memset(&(sws.saddr),0,sizeof(struct sockaddr_storage)); + // no sockaddr for this socket type, leave saddr null + return (PhySocket *)&sws; + } + + /** * Bind a UDP socket * * @param localAddress Local endpoint address and port @@ -291,6 +363,9 @@ public: #ifdef IPV6_MTU_DISCOVER f = 0; setsockopt(s,IPPROTO_IPV6,IPV6_MTU_DISCOVER,&f,sizeof(f)); #endif +#ifdef IPV6_DONTFRAG + f = 0; setsockopt(s,IPPROTO_IPV6,IPV6_DONTFRAG,&f,sizeof(f)); +#endif } f = 0; setsockopt(s,SOL_SOCKET,SO_REUSEADDR,(void *)&f,sizeof(f)); f = 1; setsockopt(s,SOL_SOCKET,SO_BROADCAST,(void *)&f,sizeof(f)); @@ -340,6 +415,24 @@ public: } /** + * Set the IP TTL for the next outgoing packet (for IPv4 UDP sockets only) + * + * @param ttl New TTL (0 or >255 will set it to 255) + * @return True on success + */ + inline bool setIp4UdpTtl(PhySocket *sock,unsigned int ttl) + { + PhySocketImpl &sws = *(reinterpret_cast<PhySocketImpl *>(sock)); +#if defined(_WIN32) || defined(_WIN64) + DWORD tmp = ((ttl == 0)||(ttl > 255)) ? 255 : (DWORD)ttl; + return (::setsockopt(sws.sock,IPPROTO_IP,IP_TTL,(const char *)&tmp,sizeof(tmp)) == 0); +#else + int tmp = ((ttl == 0)||(ttl > 255)) ? 255 : (int)ttl; + return (::setsockopt(sws.sock,IPPROTO_IP,IP_TTL,(void *)&tmp,sizeof(tmp)) == 0); +#endif + } + + /** * Send a UDP packet * * @param sock UDP socket @@ -358,6 +451,64 @@ public: #endif } +#ifdef __UNIX_LIKE__ + /** + * Listen for connections on a Unix domain socket + * + * @param path Path to Unix domain socket + * @param uptr Arbitrary pointer to associate + * @return PhySocket or NULL if cannot bind + */ + inline PhySocket *unixListen(const char *path,void *uptr = (void *)0) + { + struct sockaddr_un sun; + + if (_socks.size() >= ZT_PHY_MAX_SOCKETS) + return (PhySocket *)0; + + memset(&sun,0,sizeof(sun)); + sun.sun_family = AF_UNIX; + if (strlen(path) >= sizeof(sun.sun_path)) + return (PhySocket *)0; + strcpy(sun.sun_path,path); + + ZT_PHY_SOCKFD_TYPE s = ::socket(PF_UNIX,SOCK_STREAM,0); + if (!ZT_PHY_SOCKFD_VALID(s)) + return (PhySocket *)0; + + ::fcntl(s,F_SETFL,O_NONBLOCK); + + ::unlink(path); + if (::bind(s,(struct sockaddr *)&sun,sizeof(struct sockaddr_un)) != 0) { + ZT_PHY_CLOSE_SOCKET(s); + return (PhySocket *)0; + } + if (::listen(s,128) != 0) { + ZT_PHY_CLOSE_SOCKET(s); + return (PhySocket *)0; + } + + try { + _socks.push_back(PhySocketImpl()); + } catch ( ... ) { + ZT_PHY_CLOSE_SOCKET(s); + return (PhySocket *)0; + } + PhySocketImpl &sws = _socks.back(); + + if ((long)s > _nfds) + _nfds = (long)s; + FD_SET(s,&_readfds); + sws.type = ZT_PHY_SOCKET_UNIX_LISTEN; + sws.sock = s; + sws.uptr = uptr; + memset(&(sws.saddr),0,sizeof(struct sockaddr_storage)); + memcpy(&(sws.saddr),&sun,sizeof(struct sockaddr_un)); + + return (PhySocket *)&sws; + } +#endif // __UNIX_LIKE__ + /** * Bind a local listen socket to listen for new TCP connections * @@ -523,19 +674,51 @@ public: } /** - * Attempt to send data to a TCP connection (non-blocking) + * Try to set buffer sizes as close to the given value as possible + * + * This will try the specified value and then lower values in 16K increments + * until one works. + * + * @param sock Socket + * @param bufferSize Desired buffer sizes + */ + inline void setBufferSizes(const PhySocket *sock,int bufferSize) + { + PhySocketImpl &sws = *(reinterpret_cast<PhySocketImpl *>(sock)); + if (bufferSize > 0) { + int bs = bufferSize; + while (bs >= 65536) { + int tmpbs = bs; + if (::setsockopt(sws.sock,SOL_SOCKET,SO_RCVBUF,(const char *)&tmpbs,sizeof(tmpbs)) == 0) + break; + bs -= 16384; + } + bs = bufferSize; + while (bs >= 65536) { + int tmpbs = bs; + if (::setsockopt(sws.sock,SOL_SOCKET,SO_SNDBUF,(const char *)&tmpbs,sizeof(tmpbs)) == 0) + break; + bs -= 16384; + } + } + } + + /** + * Attempt to send data to a stream socket (non-blocking) * * If -1 is returned, the socket should no longer be used as it is now * destroyed. If callCloseHandler is true, the close handler will be * called before the function returns. * - * @param sock An open TCP socket (other socket types will fail) + * This can be used with TCP, Unix, or socket pair sockets. + * + * @param sock An open stream socket (other socket types will fail) * @param data Data to send * @param len Length of data * @param callCloseHandler If true, call close handler on socket closing failure condition (default: true) * @return Number of bytes actually sent or -1 on fatal error (socket closure) */ - inline long tcpSend(PhySocket *sock,const void *data,unsigned long len,bool callCloseHandler = true) + inline long streamSend(PhySocket *sock,const void *data,unsigned long len,bool callCloseHandler = true) { PhySocketImpl &sws = *(reinterpret_cast<PhySocketImpl *>(sock)); #if defined(_WIN32) || defined(_WIN64) @@ -573,17 +756,58 @@ public: return n; } +#ifdef __UNIX_LIKE__ /** - * Set whether we want to be notified via the TCP writability handler when a socket is writable + * Attempt to send data to a Unix domain socket connection (non-blocking) + * + * If -1 is returned, the socket should no longer be used as it is now + * destroyed. If callCloseHandler is true, the close handler will be + * called before the function returns. + * + * @param sock An open Unix socket (other socket types will fail) + * @param data Data to send + * @param len Length of data + * @param callCloseHandler If true, call close handler on socket closing failure condition (default: true) + * @return Number of bytes actually sent or -1 on fatal error (socket closure) + */ + inline long unixSend(PhySocket *sock,const void *data,unsigned long len,bool callCloseHandler = true) + { + PhySocketImpl &sws = *(reinterpret_cast<PhySocketImpl *>(sock)); + long n = (long)::write(sws.sock,data,len); + if (n < 0) { + switch(errno) { +#ifdef EAGAIN + case EAGAIN: +#endif +#if defined(EWOULDBLOCK) && ( !defined(EAGAIN) || (EWOULDBLOCK != EAGAIN) ) + case EWOULDBLOCK: +#endif +#ifdef EINTR + case EINTR: +#endif + return 0; + default: + this->close(sock,callCloseHandler); + return -1; + } + } + return n; + } +#endif // __UNIX_LIKE__ + + /** + * For streams, sets whether we want to be notified that the socket is writable + * + * This can be used with TCP, Unix, or socket pair sockets. * * Call whack() if this is being done from another thread and you want * it to take effect immediately. Otherwise it is only guaranteed to * take effect on the next poll(). * - * @param sock TCP connection socket (other types are not valid) + * @param sock Stream connection socket * @param notifyWritable Want writable notifications? */ - inline const void tcpSetNotifyWritable(PhySocket *sock,bool notifyWritable) + inline const void setNotifyWritable(PhySocket *sock,bool notifyWritable) { PhySocketImpl &sws = *(reinterpret_cast<PhySocketImpl *>(sock)); if (notifyWritable) { @@ -594,6 +818,26 @@ public: } /** + * Set whether we want to be notified that a socket is readable + * + * This is primarily for raw sockets added with wrapSocket(). It could be + * used with others, but doing so would essentially lock them and prevent + * data from being read from them until this is set to 'true' again. + * + * @param sock Socket to modify + * @param notifyReadable True if socket should be monitored for readability + */ + inline const void setNotifyReadable(PhySocket *sock,bool notifyReadable) + { + PhySocketImpl &sws = *(reinterpret_cast<PhySocketImpl *>(sock)); + if (notifyReadable) { + FD_SET(sws.sock,&_readfds); + } else { + FD_CLR(sws.sock,&_readfds); + } + } + + /** * Wait for activity and handle one or more events * * Note that this is not guaranteed to wait up to 'timeout' even @@ -727,6 +971,67 @@ public: } break; + case ZT_PHY_SOCKET_UNIX_IN: { +#ifdef __UNIX_LIKE__ + ZT_PHY_SOCKFD_TYPE sock = s->sock; // if closed, s->sock becomes invalid as s is no longer dereferencable + if (FD_ISSET(sock,&rfds)) { + long n = (long)::read(sock,buf,sizeof(buf)); + if (n <= 0) { + this->close((PhySocket *)&(*s),true); + } else { + try { + _handler->phyOnUnixData((PhySocket *)&(*s),&(s->uptr),(void *)buf,(unsigned long)n); + } catch ( ... ) {} + } + } + if ((FD_ISSET(sock,&wfds))&&(FD_ISSET(sock,&_writefds))) { + try { + //_handler->phyOnUnixWritable((PhySocket *)&(*s),&(s->uptr)); + } catch ( ... ) {} + } +#endif // __UNIX_LIKE__ + } break; + + case ZT_PHY_SOCKET_UNIX_LISTEN: +#ifdef __UNIX_LIKE__ + if (FD_ISSET(s->sock,&rfds)) { + memset(&ss,0,sizeof(ss)); + socklen_t slen = sizeof(ss); + ZT_PHY_SOCKFD_TYPE newSock = ::accept(s->sock,(struct sockaddr *)&ss,&slen); + if (ZT_PHY_SOCKFD_VALID(newSock)) { + if (_socks.size() >= ZT_PHY_MAX_SOCKETS) { + ZT_PHY_CLOSE_SOCKET(newSock); + } else { + fcntl(newSock,F_SETFL,O_NONBLOCK); + _socks.push_back(PhySocketImpl()); + PhySocketImpl &sws = _socks.back(); + FD_SET(newSock,&_readfds); + if ((long)newSock > _nfds) + _nfds = (long)newSock; + sws.type = ZT_PHY_SOCKET_UNIX_IN; + sws.sock = newSock; + sws.uptr = (void *)0; + memcpy(&(sws.saddr),&ss,sizeof(struct sockaddr_storage)); + try { + _handler->phyOnUnixAccept((PhySocket *)&(*s),(PhySocket *)&(_socks.back()),&(s->uptr),&(sws.uptr)); + } catch ( ... ) {} + } + } + } +#endif // __UNIX_LIKE__ + break; + + case ZT_PHY_SOCKET_FD: { + ZT_PHY_SOCKFD_TYPE sock = s->sock; + const bool readable = ((FD_ISSET(sock,&rfds))&&(FD_ISSET(sock,&_readfds))); + const bool writable = ((FD_ISSET(sock,&wfds))&&(FD_ISSET(sock,&_writefds))); + if ((readable)||(writable)) { + try { + _handler->phyOnFileDescriptorActivity((PhySocket *)&(*s),&(s->uptr),readable,writable); + } catch ( ... ) {} + } + } break; + default: break; @@ -756,26 +1061,37 @@ public: FD_CLR(sws.sock,&_exceptfds); #endif - ZT_PHY_CLOSE_SOCKET(sws.sock); + if (sws.type != ZT_PHY_SOCKET_FD) + ZT_PHY_CLOSE_SOCKET(sws.sock); + +#ifdef __UNIX_LIKE__ + if (sws.type == ZT_PHY_SOCKET_UNIX_LISTEN) + ::unlink(((struct sockaddr_un *)(&(sws.saddr)))->sun_path); +#endif // __UNIX_LIKE__ - switch(sws.type) { - case ZT_PHY_SOCKET_TCP_OUT_PENDING: - if (callHandlers) { + if (callHandlers) { + switch(sws.type) { + case ZT_PHY_SOCKET_TCP_OUT_PENDING: try { _handler->phyOnTcpConnect(sock,&(sws.uptr),false); } catch ( ... ) {} - } - break; - case ZT_PHY_SOCKET_TCP_OUT_CONNECTED: - case ZT_PHY_SOCKET_TCP_IN: - if (callHandlers) { + break; + case ZT_PHY_SOCKET_TCP_OUT_CONNECTED: + case ZT_PHY_SOCKET_TCP_IN: try { _handler->phyOnTcpClose(sock,&(sws.uptr)); } catch ( ... ) {} - } - break; - default: - break; + break; + case ZT_PHY_SOCKET_UNIX_IN: +#ifdef __UNIX_LIKE__ + try { + _handler->phyOnUnixClose(sock,&(sws.uptr)); + } catch ( ... ) {} +#endif // __UNIX_LIKE__ + break; + default: + break; + } } // Causes entry to be deleted from list in poll(), ignored elsewhere |