From 3947807b1ffc844f62eeec7dd0fe552d280fe807 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Thu, 27 Aug 2015 15:36:13 -0700 Subject: A simple and fast Hashtable, tested but not yet integrated with anything. --- selftest.cpp | 116 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 115 insertions(+), 1 deletion(-) (limited to 'selftest.cpp') diff --git a/selftest.cpp b/selftest.cpp index 714964cb..8af3fc0b 100644 --- a/selftest.cpp +++ b/selftest.cpp @@ -36,6 +36,7 @@ #include #include "node/Constants.hpp" +#include "node/Hashtable.hpp" #include "node/RuntimeEnvironment.hpp" #include "node/InetAddress.hpp" #include "node/Utils.hpp" @@ -578,6 +579,119 @@ static int testPacket() static int testOther() { + std::cout << "[other] Testing Hashtable... "; std::cout.flush(); + { + Hashtable ht(128); + std::map ref; // assume std::map works correctly :) + for(int x=0;x<2;++x) { + for(int i=0;i<25000;++i) { + uint64_t k = rand(); + while ((k == 0)||(ref.count(k) > 0)) + ++k; + std::string v; + for(int j=0;j<(int)(k % 64);++j) + v.push_back("0123456789"[rand() % 10]); + ht.set(k,v); + ref[k] = v; + } + if (ht.size() != ref.size()) { + std::cout << "FAILED! (size mismatch)" << std::endl; + return -1; + } + for(std::map::iterator i(ref.begin());i!=ref.end();++i) { + std::string *v = ht.get(i->first); + if (!v) { + std::cout << "FAILED! (key not found)" << std::endl; + return -1; + } + if (*v != i->second) { + std::cout << "FAILED! (key not equal)" << std::endl; + return -1; + } + } + { + uint64_t *k; + std::string *v; + Hashtable::Iterator i(ht); + unsigned long ic = 0; + while (i.next(k,v)) { + if (ref[*k] != *v) { + std::cout << "FAILED! (iterate)" << std::endl; + return -1; + } + ++ic; + } + if (ic != ht.size()) { + std::cout << "FAILED! (iterate coverage)" << std::endl; + return -1; + } + } + for(std::map::iterator i(ref.begin());i!=ref.end();) { + if (!ht.get(i->first)) { + std::cout << "FAILED! (erase, check if exists)" << std::endl; + return -1; + } + ht.erase(i->first); + if (ht.get(i->first)) { + std::cout << "FAILED! (erase, check if erased)" << std::endl; + return -1; + } + ref.erase(i++); + if (ht.size() != ref.size()) { + std::cout << "FAILED! (erase, size)" << std::endl; + return -1; + } + } + if (!ht.empty()) { + std::cout << "FAILED! (erase, empty)" << std::endl; + return -1; + } + for(int i=0;i<10000;++i) { + uint64_t k = rand(); + while ((k == 0)||(ref.count(k) > 0)) + ++k; + std::string v; + for(int j=0;j<(int)(k % 64);++j) + v.push_back("0123456789"[rand() % 10]); + ht.set(k,v); + ref[k] = v; + } + if (ht.size() != ref.size()) { + std::cout << "FAILED! (second populate)" << std::endl; + return -1; + } + ht.clear(); + ref.clear(); + if (ht.size() != ref.size()) { + std::cout << "FAILED! (clear)" << std::endl; + return -1; + } + for(int i=0;i<10000;++i) { + uint64_t k = rand(); + while ((k == 0)||(ref.count(k) > 0)) + ++k; + std::string v; + for(int j=0;j<(int)(k % 64);++j) + v.push_back("0123456789"[rand() % 10]); + ht.set(k,v); + ref[k] = v; + } + { + Hashtable::Iterator i(ht); + uint64_t *k; + std::string *v; + while (i.next(k,v)) + ht.erase(*k); + } + ref.clear(); + if (ht.size() != ref.size()) { + std::cout << "FAILED! (clear by iterate, " << ht.size() << ")" << std::endl; + return -1; + } + } + } + std::cout << "PASS" << std::endl; + std::cout << "[other] Testing hex encode/decode... "; std::cout.flush(); for(unsigned int k=0;k<1000;++k) { unsigned int flen = (rand() % 8194) + 1; @@ -909,9 +1023,9 @@ int main(int argc,char **argv) srand((unsigned int)time(0)); r |= testSqliteNetworkController(); + r |= testOther(); r |= testCrypto(); r |= testPacket(); - r |= testOther(); r |= testIdentity(); r |= testCertificate(); r |= testPhy(); -- cgit v1.2.3 From 4838cbc350a7608ebe345a821ef32bb01a8aeca7 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Wed, 2 Sep 2015 14:32:01 -0700 Subject: Unix domain sockets in Phy<> --- osdep/Phy.hpp | 187 +++++++++++++++++++++++++++++++++++++++++++++---- selftest.cpp | 7 ++ service/OneService.cpp | 5 ++ 3 files changed, 186 insertions(+), 13 deletions(-) (limited to 'selftest.cpp') diff --git a/osdep/Phy.hpp b/osdep/Phy.hpp index 2ea68b9d..eca4109f 100644 --- a/osdep/Phy.hpp +++ b/osdep/Phy.hpp @@ -93,6 +93,10 @@ typedef void PhySocket; * phyOnTcpClose(PhySocket *sock,void **uptr) * phyOnTcpData(PhySocket *sock,void **uptr,void *data,unsigned long len) * phyOnTcpWritable(PhySocket *sock,void **uptr) + * phyOnUnixAccept(PhySocket *sockL,PhySocket *sockN,void **uptrL,void **uptrN) + * phyOnUnixClose(PhySocket *sock,void **uptr) + * phyOnUnixData(PhySocket *sock,void **uptr,void *data,unsigned long len) + * phyOnUnixWritable(PhySocket *sock,void **uptr) * * These templates typically refer to function objects. Templates are used to * avoid the call overhead of indirection, which is surprisingly high for high @@ -105,6 +109,9 @@ typedef void PhySocket; * uptr: sockL and uptrL for the listen socket, and sockN and uptrN for * the new TCP connection socket that has just been created. * + * Note that phyOnUnix*() are only required and will only be used on systems + * that support Unix domain sockets. + * * Handlers are always called. On outgoing TCP connection, CONNECT is always * called on either success or failure followed by DATA and/or WRITABLE as * indicated. On socket close, handlers are called unless close() is told @@ -129,7 +136,9 @@ private: ZT_PHY_SOCKET_TCP_IN = 0x03, ZT_PHY_SOCKET_TCP_LISTEN = 0x04, ZT_PHY_SOCKET_RAW = 0x05, - ZT_PHY_SOCKET_UDP = 0x06 + ZT_PHY_SOCKET_UDP = 0x06, + ZT_PHY_SOCKET_UNIX_IN = 0x07, + ZT_PHY_SOCKET_UNIX_LISTEN = 0x08 }; struct PhySocketImpl @@ -358,6 +367,64 @@ public: #endif } +#ifdef __UNIX_LIKE__ + /** + * Listen for connections on a Unix domain socket + * + * @param path Path to Unix domain socket + * @param uptr Arbitrary pointer to associate + * @return PhySocket or NULL if cannot bind + */ + inline PhySocket *unixListen(const char *path,void *uptr = (void *)0) + { + struct sockaddr_un sun; + + if (_socks.size() >= ZT_PHY_MAX_SOCKETS) + return (PhySocket *)0; + + memset(&sun,0,sizeof(sun)); + sun.sun_family = AF_UNIX; + if (strlen(path) >= sizeof(sun.sun_path)) + return (PhySocket *)0; + strcpy(sun.sun_path,path); + + ZT_PHY_SOCKFD_TYPE s = ::socket(PF_UNIX,SOCK_STREAM,0); + if (!ZT_PHY_SOCKFD_VALID(s)) + return (PhySocket *)0; + + ::fcntl(s,F_SETFL,O_NONBLOCK); + + ::unlink(path); + if (::bind(s,(struct sockaddr *)&sun,sizeof(struct sockaddr_un)) != 0) { + ZT_PHY_CLOSE_SOCKET(s); + return (PhySocket *)0; + } + if (::listen(s,128) != 0) { + ZT_PHY_CLOSE_SOCKET(s); + return (PhySocket *)0; + } + + try { + _socks.push_back(PhySocketImpl()); + } catch ( ... ) { + ZT_PHY_CLOSE_SOCKET(s); + return (PhySocket *)0; + } + PhySocketImpl &sws = _socks.back(); + + if ((long)s > _nfds) + _nfds = (long)s; + FD_SET(s,&_readfds); + sws.type = ZT_PHY_SOCKET_UNIX_LISTEN; + sws.sock = s; + sws.uptr = uptr; + memset(&(sws.saddr),0,sizeof(struct sockaddr_storage)); + memcpy(&(sws.saddr),&sun,sizeof(struct sockaddr_un)); + + return (PhySocket *)&sws; + } +#endif // __UNIX_LIKE__ + /** * Bind a local listen socket to listen for new TCP connections * @@ -573,6 +640,45 @@ public: return n; } +#ifdef __UNIX_LIKE__ + /** + * Attempt to send data to a Unix domain socket connection (non-blocking) + * + * If -1 is returned, the socket should no longer be used as it is now + * destroyed. If callCloseHandler is true, the close handler will be + * called before the function returns. + * + * @param sock An open Unix socket (other socket types will fail) + * @param data Data to send + * @param len Length of data + * @param callCloseHandler If true, call close handler on socket closing failure condition (default: true) + * @return Number of bytes actually sent or -1 on fatal error (socket closure) + */ + inline long unixSend(PhySocket *sock,const void *data,unsigned long len,bool callCloseHandler = true) + { + PhySocketImpl &sws = *(reinterpret_cast(sock)); + long n = (long)::write(sws.sock,data,len); + if (n < 0) { + switch(errno) { +#ifdef EAGAIN + case EAGAIN: +#endif +#if defined(EWOULDBLOCK) && ( !defined(EAGAIN) || (EWOULDBLOCK != EAGAIN) ) + case EWOULDBLOCK: +#endif +#ifdef EINTR + case EINTR: +#endif + return 0; + default: + this->close(sock,callCloseHandler); + return -1; + } + } + return n; + } +#endif // __UNIX_LIKE__ + /** * Set whether we want to be notified via the TCP writability handler when a socket is writable * @@ -727,6 +833,56 @@ public: } break; + case ZT_PHY_SOCKET_UNIX_IN: { +#ifdef __UNIX_LIKE__ + ZT_PHY_SOCKFD_TYPE sock = s->sock; // if closed, s->sock becomes invalid as s is no longer dereferencable + if (FD_ISSET(sock,&rfds)) { + long n = (long)::read(sock,buf,sizeof(buf)); + if (n <= 0) { + this->close((PhySocket *)&(*s),true); + } else { + try { + _handler->phyOnUnixData((PhySocket *)&(*s),&(s->uptr),(void *)buf,(unsigned long)n); + } catch ( ... ) {} + } + } + if ((FD_ISSET(sock,&wfds))&&(FD_ISSET(sock,&_writefds))) { + try { + _handler->phyOnUnixWritable((PhySocket *)&(*s),&(s->uptr)); + } catch ( ... ) {} + } +#endif // __UNIX_LIKE__ + } break; + + case ZT_PHY_SOCKET_UNIX_LISTEN: +#ifdef __UNIX_LIKE__ + if (FD_ISSET(s->sock,&rfds)) { + memset(&ss,0,sizeof(ss)); + socklen_t slen = sizeof(ss); + ZT_PHY_SOCKFD_TYPE newSock = ::accept(s->sock,(struct sockaddr *)&ss,&slen); + if (ZT_PHY_SOCKFD_VALID(newSock)) { + if (_socks.size() >= ZT_PHY_MAX_SOCKETS) { + ZT_PHY_CLOSE_SOCKET(newSock); + } else { + fcntl(newSock,F_SETFL,O_NONBLOCK); + _socks.push_back(PhySocketImpl()); + PhySocketImpl &sws = _socks.back(); + FD_SET(newSock,&_readfds); + if ((long)newSock > _nfds) + _nfds = (long)newSock; + sws.type = ZT_PHY_SOCKET_UNIX_IN; + sws.sock = newSock; + sws.uptr = (void *)0; + memcpy(&(sws.saddr),&ss,sizeof(struct sockaddr_storage)); + try { + _handler->phyOnUnixAccept((PhySocket *)&(*s),(PhySocket *)&(_socks.back()),&(s->uptr),&(sws.uptr)); + } catch ( ... ) {} + } + } + } +#endif // __UNIX_LIKE__ + break; + default: break; @@ -758,24 +914,29 @@ public: ZT_PHY_CLOSE_SOCKET(sws.sock); - switch(sws.type) { - case ZT_PHY_SOCKET_TCP_OUT_PENDING: - if (callHandlers) { + if (callHandlers) { + switch(sws.type) { + case ZT_PHY_SOCKET_TCP_OUT_PENDING: try { _handler->phyOnTcpConnect(sock,&(sws.uptr),false); } catch ( ... ) {} - } - break; - case ZT_PHY_SOCKET_TCP_OUT_CONNECTED: - case ZT_PHY_SOCKET_TCP_IN: - if (callHandlers) { + break; + case ZT_PHY_SOCKET_TCP_OUT_CONNECTED: + case ZT_PHY_SOCKET_TCP_IN: try { _handler->phyOnTcpClose(sock,&(sws.uptr)); } catch ( ... ) {} - } - break; - default: - break; + break; + case ZT_PHY_SOCKET_UNIX_IN: +#ifdef __UNIX_LIKE__ + try { + _handler->phyOnUnixClose(sock,&(sws.uptr)); + } catch ( ... ) {} +#endif // __UNIX_LIKE__ + break; + default: + break; + } } // Causes entry to be deleted from list in poll(), ignored elsewhere diff --git a/selftest.cpp b/selftest.cpp index 8af3fc0b..15afe52e 100644 --- a/selftest.cpp +++ b/selftest.cpp @@ -791,6 +791,13 @@ struct TestPhyHandlers testPhyInstance->close(sock,true); } } + +#ifdef __UNIX_LIKE__ + inline void phyOnUnixAccept(PhySocket *sockL,PhySocket *sockN,void **uptrL,void **uptrN) {} + inline void phyOnUnixClose(PhySocket *sock,void **uptr) {} + inline void phyOnUnixData(PhySocket *sock,void **uptr,void *data,unsigned long len) {} + inline void phyOnUnixWritable(PhySocket *sock,void **uptr) {} +#endif // __UNIX_LIKE__ }; static int testPhy() { diff --git a/service/OneService.cpp b/service/OneService.cpp index 670d5641..f7ea2130 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -924,6 +924,11 @@ public: } } + inline void phyOnUnixAccept(PhySocket *sockL,PhySocket *sockN,void **uptrL,void **uptrN) {} + inline void phyOnUnixClose(PhySocket *sock,void **uptr) {} + inline void phyOnUnixData(PhySocket *sock,void **uptr,void *data,unsigned long len) {} + inline void phyOnUnixWritable(PhySocket *sock,void **uptr) {} + inline int nodeVirtualNetworkConfigFunction(uint64_t nwid,enum ZT1_VirtualNetworkConfigOperation op,const ZT1_VirtualNetworkConfig *nwc) { Mutex::Lock _l(_taps_m); -- cgit v1.2.3 From da9a720c3fc2d69e35f393fbb96a716599ac0a6f Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Thu, 3 Sep 2015 17:33:06 -0700 Subject: Hash table bug fix, and add copy constructor and assignment operator for principle of least surprise. --- node/Hashtable.hpp | 74 +++++++++++++++++++++++++++++++++++++++++++++++++----- selftest.cpp | 39 ++++++++++++++++++++++++---- 2 files changed, 102 insertions(+), 11 deletions(-) (limited to 'selftest.cpp') diff --git a/node/Hashtable.hpp b/node/Hashtable.hpp index c997d54f..5076751d 100644 --- a/node/Hashtable.hpp +++ b/node/Hashtable.hpp @@ -29,6 +29,7 @@ #include #include +#include namespace ZeroTier { @@ -47,9 +48,11 @@ private: { _Bucket(const K &k,const V &v) : k(k),v(v) {} _Bucket(const K &k) : k(k),v() {} + _Bucket(const _Bucket &b) : k(b.k),v(b.v) {} + inline _Bucket &operator=(const _Bucket &b) { k = b.k; v = b.v; return *this; } K k; V v; - _Bucket *next; + _Bucket *next; // must be set manually for each _Bucket }; public: @@ -115,12 +118,47 @@ public: _t[i] = (_Bucket *)0; } + Hashtable(const Hashtable &ht) : + _t(reinterpret_cast<_Bucket **>(::malloc(sizeof(_Bucket *) * ht._bc))), + _bc(ht._bc), + _s(ht._s) + { + if (!_t) + throw std::bad_alloc(); + for(unsigned long i=0;i<_bc;++i) + _t[i] = (_Bucket *)0; + for(unsigned long i=0;i<_bc;++i) { + const _Bucket *b = ht._t[i]; + while (b) { + _Bucket *nb = new _Bucket(*b); + nb->next = _t[i]; + _t[i] = nb; + b = b->next; + } + } + } + ~Hashtable() { - clear(); + this->clear(); ::free(_t); } + inline Hashtable &operator=(const Hashtable &ht) + { + this->clear(); + if (ht._s) { + for(unsigned long i=0;iset(b->k,b->v); + b = b->next; + } + } + } + return *this; + } + /** * Erase all entries */ @@ -140,6 +178,24 @@ public: } } + /** + * @return Vector of all keys + */ + inline typename std::vector keys() + { + typename std::vector k; + if (_s) { + for(unsigned long i=0;i<_bc;++i) { + _Bucket *b = _t[i]; + while (b) { + k.push_back(b->k); + b = b->next; + } + } + } + return k; + } + /** * @param k Key * @return Pointer to value or NULL if not found @@ -187,7 +243,8 @@ public: */ inline V &set(const K &k,const V &v) { - const unsigned long bidx = _hc(k) % _bc; + const unsigned long h = _hc(k); + unsigned long bidx = h % _bc; _Bucket *b = _t[bidx]; while (b) { @@ -198,8 +255,10 @@ public: b = b->next; } - if (_s >= _bc) + if (_s >= _bc) { _grow(); + bidx = h % _bc; + } b = new _Bucket(k,v); b->next = _t[bidx]; @@ -215,7 +274,8 @@ public: */ inline V &operator[](const K &k) { - const unsigned long bidx = _hc(k) % _bc; + const unsigned long h = _hc(k); + unsigned long bidx = h % _bc; _Bucket *b = _t[bidx]; while (b) { @@ -224,8 +284,10 @@ public: b = b->next; } - if (_s >= _bc) + if (_s >= _bc) { _grow(); + bidx = h % _bc; + } b = new _Bucket(k); b->next = _t[bidx]; diff --git a/selftest.cpp b/selftest.cpp index 15afe52e..5e3b620b 100644 --- a/selftest.cpp +++ b/selftest.cpp @@ -581,31 +581,60 @@ static int testOther() { std::cout << "[other] Testing Hashtable... "; std::cout.flush(); { - Hashtable ht(128); + Hashtable ht; + Hashtable ht2; std::map ref; // assume std::map works correctly :) for(int x=0;x<2;++x) { for(int i=0;i<25000;++i) { uint64_t k = rand(); while ((k == 0)||(ref.count(k) > 0)) ++k; - std::string v; + std::string v("!"); for(int j=0;j<(int)(k % 64);++j) v.push_back("0123456789"[rand() % 10]); ht.set(k,v); ref[k] = v; } if (ht.size() != ref.size()) { - std::cout << "FAILED! (size mismatch)" << std::endl; + std::cout << "FAILED! (size mismatch, original)" << std::endl; + return -1; + } + ht2 = ht; + Hashtable ht3(ht2); + if (ht2.size() != ref.size()) { + std::cout << "FAILED! (size mismatch, assigned)" << std::endl; + return -1; + } + if (ht3.size() != ref.size()) { + std::cout << "FAILED! (size mismatch, copied)" << std::endl; return -1; } for(std::map::iterator i(ref.begin());i!=ref.end();++i) { std::string *v = ht.get(i->first); if (!v) { - std::cout << "FAILED! (key not found)" << std::endl; + std::cout << "FAILED! (key " << i->first << " not found, original)" << std::endl; + return -1; + } + if (*v != i->second) { + std::cout << "FAILED! (key " << i->first << " not equal, original)" << std::endl; + return -1; + } + v = ht2.get(i->first); + if (!v) { + std::cout << "FAILED! (key " << i->first << " not found, assigned)" << std::endl; + return -1; + } + if (*v != i->second) { + std::cout << "FAILED! (key " << i->first << " not equal, assigned)" << std::endl; + return -1; + } + v = ht3.get(i->first); + if (!v) { + std::cout << "FAILED! (key " << i->first << " not found, copied)" << std::endl; return -1; } if (*v != i->second) { - std::cout << "FAILED! (key not equal)" << std::endl; + std::cout << "FAILED! (key " << i->first << " not equal, copied)" << std::endl; return -1; } } -- cgit v1.2.3 From 9a723be263d9307b2bf9d2efca3db9e8c12e6a92 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Thu, 10 Sep 2015 13:18:57 -0700 Subject: Add socketpair support to Phy. --- osdep/Phy.hpp | 115 +++++++++++++++++++++++++++++++++++++++++++++++-- selftest.cpp | 3 ++ service/OneService.cpp | 3 ++ 3 files changed, 117 insertions(+), 4 deletions(-) (limited to 'selftest.cpp') diff --git a/osdep/Phy.hpp b/osdep/Phy.hpp index eca4109f..1a07fa0a 100644 --- a/osdep/Phy.hpp +++ b/osdep/Phy.hpp @@ -46,6 +46,7 @@ #define ZT_PHY_SOCKFD_VALID(s) ((s) != INVALID_SOCKET) #define ZT_PHY_CLOSE_SOCKET(s) ::closesocket(s) #define ZT_PHY_MAX_SOCKETS (FD_SETSIZE) +#define ZT_PHY_MAX_INTERCEPTS ZT_PHY_MAX_SOCKETS #define ZT_PHY_SOCKADDR_STORAGE_TYPE struct sockaddr_storage #else // not Windows @@ -58,6 +59,7 @@ #include #include #include +#include #include #include #include @@ -67,8 +69,14 @@ #define ZT_PHY_SOCKFD_VALID(s) ((s) > -1) #define ZT_PHY_CLOSE_SOCKET(s) ::close(s) #define ZT_PHY_MAX_SOCKETS (FD_SETSIZE) +#define ZT_PHY_MAX_INTERCEPTS ZT_PHY_MAX_SOCKETS #define ZT_PHY_SOCKADDR_STORAGE_TYPE struct sockaddr_storage +#if defined(__linux__) || defined(linux) || defined(__LINUX__) || defined(__linux) +#define ZT_PHY_HAVE_EVENTFD 1 +#include +#endif + #endif // Windows or not namespace ZeroTier { @@ -87,16 +95,24 @@ typedef void PhySocket; * This class is templated on a pointer to a handler class which must * implement the following functions: * + * For all platforms: + * * phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *from,void *data,unsigned long len) * phyOnTcpConnect(PhySocket *sock,void **uptr,bool success) * phyOnTcpAccept(PhySocket *sockL,PhySocket *sockN,void **uptrL,void **uptrN,const struct sockaddr *from) * phyOnTcpClose(PhySocket *sock,void **uptr) * phyOnTcpData(PhySocket *sock,void **uptr,void *data,unsigned long len) * phyOnTcpWritable(PhySocket *sock,void **uptr) + * + * On Linux/OSX/Unix only (not required/used on Windows or elsewhere): + * * phyOnUnixAccept(PhySocket *sockL,PhySocket *sockN,void **uptrL,void **uptrN) * phyOnUnixClose(PhySocket *sock,void **uptr) * phyOnUnixData(PhySocket *sock,void **uptr,void *data,unsigned long len) * phyOnUnixWritable(PhySocket *sock,void **uptr) + * phyOnSocketPairEndpointClose(PhySocket *sock,void **uptr) + * phyOnSocketPairEndpointData(PhySocket *sock,void **uptr,void *data,unsigned long len) + * phyOnSocketPairEndpointWritable(PhySocket *sock,void **uptr) * * These templates typically refer to function objects. Templates are used to * avoid the call overhead of indirection, which is surprisingly high for high @@ -109,9 +125,6 @@ typedef void PhySocket; * uptr: sockL and uptrL for the listen socket, and sockN and uptrN for * the new TCP connection socket that has just been created. * - * Note that phyOnUnix*() are only required and will only be used on systems - * that support Unix domain sockets. - * * Handlers are always called. On outgoing TCP connection, CONNECT is always * called on either success or failure followed by DATA and/or WRITABLE as * indicated. On socket close, handlers are called unless close() is told @@ -138,7 +151,8 @@ private: ZT_PHY_SOCKET_RAW = 0x05, ZT_PHY_SOCKET_UDP = 0x06, ZT_PHY_SOCKET_UNIX_IN = 0x07, - ZT_PHY_SOCKET_UNIX_LISTEN = 0x08 + ZT_PHY_SOCKET_UNIX_LISTEN = 0x08, + ZT_PHY_SOCKET_PAIR_ENDPOINT = 0x09 }; struct PhySocketImpl @@ -226,8 +240,17 @@ public: ZT_PHY_CLOSE_SOCKET(_whackSendSocket); } + /** + * @param s Socket object + * @return Underlying OS-type (usually int or long) file descriptor associated with object + */ + static inline ZT_PHY_SOCKFD_TYPE getDescriptor(PhySocket *s) throw() { return reinterpret_cast(s)->sock; } + /** * Cause poll() to stop waiting immediately + * + * This can be used to reset the polling loop after changes that require + * attention, or to shut down a background thread that is waiting, etc. */ inline void whack() { @@ -248,6 +271,58 @@ public: */ inline unsigned long maxCount() const throw() { return ZT_PHY_MAX_SOCKETS; } +#ifdef __UNIX_LIKE__ + /** + * Create a two-way socket pair + * + * This uses socketpair() to create a local domain pair. The returned + * PhySocket holds the local side of the socket pair, while the + * supplied fd variable is set to the descriptor for the remote side. + * + * The local side is set to O_NONBLOCK to work with our poll loop, but + * the remote descriptor is left untouched. It's up to the caller to + * set any required fcntl(), ioctl(), or setsockopt() settings there. + * It's also up to the caller to close the remote descriptor when + * done, if necessary. + * + * @param remoteSocketDescriptor Result parameter set to remote end of socket pair's socket FD + * @param uptr Pointer to associate with local side of socket pair + * @return PhySocket for local side of socket pair + */ + inline PhySocket *createSocketPair(ZT_PHY_SOCKFD_TYPE &remoteSocketDescriptor,void *uptr = (void *)0) + { + if (_socks.size() >= ZT_PHY_MAX_SOCKETS) + return (PhySocket *)0; + + int fd[2]; fd[0] = -1; fd[1] = -1; + if ((::socketpair(PF_LOCAL,SOCK_STREAM,0,fd) != 0)||(fd[0] <= 0)||(fd[1] <= 0)) + return (PhySocket *)0; + fcntl(fd[0],F_SETFL,O_NONBLOCK); + + try { + _socks.push_back(PhySocketImpl()); + } catch ( ... ) { + ZT_PHY_CLOSE_SOCKET(fd[0]); + ZT_PHY_CLOSE_SOCKET(fd[1]); + return (PhySocket *)0; + } + PhySocketImpl &sws = _socks.back(); + + if ((long)fd[0] > _nfds) + _nfds = (long)fd[0]; + FD_SET(fd[0],&_readfds); + sws.type = ZT_PHY_SOCKET_PAIR_ENDPOINT; + sws.sock = fd[0]; + sws.uptr = uptr; + memset(&(sws.saddr),0,sizeof(struct sockaddr_storage)); + // no sockaddr for this socket type, leave saddr null + + remoteSocketDescriptor = fd[1]; + + return (PhySocket *)&sws; + } +#endif // __UNIX_LIKE__ + /** * Bind a UDP socket * @@ -883,6 +958,27 @@ public: #endif // __UNIX_LIKE__ break; + case ZT_PHY_SOCKET_PAIR_ENDPOINT: { +#ifdef __UNIX_LIKE__ + ZT_PHY_SOCKFD_TYPE sock = s->sock; // if closed, s->sock becomes invalid as s is no longer dereferencable + if (FD_ISSET(sock,&rfds)) { + long n = (long)::read(sock,buf,sizeof(buf)); + if (n <= 0) { + this->close((PhySocket *)&(*s),true); + } else { + try { + _handler->phyOnSocketPairEndpointData((PhySocket *)&(*s),&(s->uptr),(void *)buf,(unsigned long)n); + } catch ( ... ) {} + } + } + if ((FD_ISSET(sock,&wfds))&&(FD_ISSET(sock,&_writefds))) { + try { + _handler->phyOnSocketPairEndpointWritable((PhySocket *)&(*s),&(s->uptr)); + } catch ( ... ) {} + } +#endif // __UNIX_LIKE__ + } break; + default: break; @@ -914,6 +1010,11 @@ public: ZT_PHY_CLOSE_SOCKET(sws.sock); +#ifdef __UNIX_LIKE__ + if (sws.type == ZT_PHY_SOCKET_UNIX_LISTEN) + ::unlink(((struct sockaddr_un *)(&(sws.saddr)))->sun_path); +#endif // __UNIX_LIKE__ + if (callHandlers) { switch(sws.type) { case ZT_PHY_SOCKET_TCP_OUT_PENDING: @@ -934,6 +1035,12 @@ public: } catch ( ... ) {} #endif // __UNIX_LIKE__ break; + case ZT_PHY_SOCKET_PAIR_ENDPOINT: +#ifdef __UNIX_LIKE__ + try { + _handler->phyOnSocketPairEndpointClose(sock,&(sws.uptr)); + } catch ( ... ) {} +#endif // __UNIX_LIKE__ default: break; } diff --git a/selftest.cpp b/selftest.cpp index 5e3b620b..a664ca8e 100644 --- a/selftest.cpp +++ b/selftest.cpp @@ -826,6 +826,9 @@ struct TestPhyHandlers inline void phyOnUnixClose(PhySocket *sock,void **uptr) {} inline void phyOnUnixData(PhySocket *sock,void **uptr,void *data,unsigned long len) {} inline void phyOnUnixWritable(PhySocket *sock,void **uptr) {} + inline void phyOnSocketPairEndpointClose(PhySocket *sock,void **uptr) {} + inline void phyOnSocketPairEndpointData(PhySocket *sock,void **uptr,void *data,unsigned long len) {} + inline void phyOnSocketPairEndpointWritable(PhySocket *sock,void **uptr) {} #endif // __UNIX_LIKE__ }; static int testPhy() diff --git a/service/OneService.cpp b/service/OneService.cpp index f7ea2130..8085c9b4 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -928,6 +928,9 @@ public: inline void phyOnUnixClose(PhySocket *sock,void **uptr) {} inline void phyOnUnixData(PhySocket *sock,void **uptr,void *data,unsigned long len) {} inline void phyOnUnixWritable(PhySocket *sock,void **uptr) {} + inline void phyOnSocketPairEndpointClose(PhySocket *sock,void **uptr) {} + inline void phyOnSocketPairEndpointData(PhySocket *sock,void **uptr,void *data,unsigned long len) {} + inline void phyOnSocketPairEndpointWritable(PhySocket *sock,void **uptr) {} inline int nodeVirtualNetworkConfigFunction(uint64_t nwid,enum ZT1_VirtualNetworkConfigOperation op,const ZT1_VirtualNetworkConfig *nwc) { -- cgit v1.2.3 From 9dc2ef554997f5598c9cf2c4d3ca041c3152a962 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Thu, 10 Sep 2015 15:55:48 -0700 Subject: Rename some stuff in Phy since it can be used with any stream socket. --- selftest.cpp | 4 ++-- service/OneService.cpp | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) (limited to 'selftest.cpp') diff --git a/selftest.cpp b/selftest.cpp index a664ca8e..b899ee5a 100644 --- a/selftest.cpp +++ b/selftest.cpp @@ -795,7 +795,7 @@ struct TestPhyHandlers { ++phyTestTcpAcceptCount; *uptrN = new std::string(ZT_TEST_PHY_TCP_MESSAGE_SIZE,(char)0xff); - testPhyInstance->tcpSetNotifyWritable(sockN,true); + testPhyInstance->setNotifyWritable(sockN,true); } inline void phyOnTcpClose(PhySocket *sock,void **uptr) @@ -812,7 +812,7 @@ struct TestPhyHandlers { std::string *testMessage = (std::string *)*uptr; if ((testMessage)&&(testMessage->length() > 0)) { - long sent = testPhyInstance->tcpSend(sock,(const void *)testMessage->data(),(unsigned long)testMessage->length(),true); + long sent = testPhyInstance->streamSend(sock,(const void *)testMessage->data(),(unsigned long)testMessage->length(),true); if (sent > 0) testMessage->erase(0,sent); } diff --git a/service/OneService.cpp b/service/OneService.cpp index 8085c9b4..e8b8ba60 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -772,7 +772,7 @@ public: tc->writeBuf.push_back((char)ZEROTIER_ONE_VERSION_MINOR); tc->writeBuf.push_back((char)((ZEROTIER_ONE_VERSION_REVISION >> 8) & 0xff)); tc->writeBuf.push_back((char)(ZEROTIER_ONE_VERSION_REVISION & 0xff)); - _phy.tcpSetNotifyWritable(sock,true); + _phy.setNotifyWritable(sock,true); _tcpFallbackTunnel = tc; } @@ -907,12 +907,12 @@ public: TcpConnection *tc = reinterpret_cast(*uptr); Mutex::Lock _l(tc->writeBuf_m); if (tc->writeBuf.length() > 0) { - long sent = (long)_phy.tcpSend(sock,tc->writeBuf.data(),(unsigned long)tc->writeBuf.length(),true); + long sent = (long)_phy.streamSend(sock,tc->writeBuf.data(),(unsigned long)tc->writeBuf.length(),true); if (sent > 0) { tc->lastActivity = OSUtils::now(); if ((unsigned long)sent >= (unsigned long)tc->writeBuf.length()) { tc->writeBuf = ""; - _phy.tcpSetNotifyWritable(sock,false); + _phy.setNotifyWritable(sock,false); if (!tc->shouldKeepAlive) _phy.close(sock); // will call close handler to delete from _tcpConnections } else { @@ -920,7 +920,7 @@ public: } } } else { - _phy.tcpSetNotifyWritable(sock,false); + _phy.setNotifyWritable(sock,false); } } @@ -1111,7 +1111,7 @@ public: if (_tcpFallbackTunnel) { Mutex::Lock _l(_tcpFallbackTunnel->writeBuf_m); if (!_tcpFallbackTunnel->writeBuf.length()) - _phy.tcpSetNotifyWritable(_tcpFallbackTunnel->sock,true); + _phy.setNotifyWritable(_tcpFallbackTunnel->sock,true); unsigned long mlen = len + 7; _tcpFallbackTunnel->writeBuf.push_back((char)0x17); _tcpFallbackTunnel->writeBuf.push_back((char)0x03); @@ -1214,7 +1214,7 @@ public: tc->writeBuf.append(data); } - _phy.tcpSetNotifyWritable(tc->sock,true); + _phy.setNotifyWritable(tc->sock,true); } inline void onHttpResponseFromClient(TcpConnection *tc) -- cgit v1.2.3