diff options
-rw-r--r-- | node/Node.cpp | 12 | ||||
-rw-r--r-- | node/Node.hpp | 7 | ||||
-rw-r--r-- | node/RuntimeEnvironment.hpp | 4 | ||||
-rw-r--r-- | node/SocketManager.hpp | 15 | ||||
-rw-r--r-- | osnet/NativeSocketManager.cpp | 33 | ||||
-rw-r--r-- | osnet/NativeSocketManager.hpp | 29 |
6 files changed, 32 insertions, 68 deletions
diff --git a/node/Node.cpp b/node/Node.cpp index 1633d416..5990b83a 100644 --- a/node/Node.cpp +++ b/node/Node.cpp @@ -84,8 +84,6 @@ struct _NodeImpl { RuntimeEnvironment renv; - unsigned int udpPort,tcpPort; - std::string reasonForTerminationStr; volatile Node::ReasonForTermination reasonForTermination; @@ -112,7 +110,6 @@ struct _NodeImpl delete renv.updater; renv.updater = (SoftwareUpdater *)0; delete renv.nc; renv.nc = (NodeConfig *)0; // shut down all networks, close taps, etc. delete renv.topology; renv.topology = (Topology *)0; // now we no longer need routing info - delete renv.sm; renv.sm = (SocketManager *)0; // close all sockets delete renv.sw; renv.sw = (Switch *)0; // order matters less from here down delete renv.mc; renv.mc = (Multicaster *)0; delete renv.antiRec; renv.antiRec = (AntiRecursion *)0; @@ -222,8 +219,7 @@ Node::Node( const char *hp, EthernetTapFactory *tf, RoutingTable *rt, - unsigned int udpPort, - unsigned int tcpPort, + SocketManager *sm, bool resetIdentity, const char *overrideRootTopology) throw() : _impl(new _NodeImpl) @@ -236,6 +232,7 @@ Node::Node( impl->renv.tapFactory = tf; impl->renv.routingTable = rt; + impl->renv.sm = sm; if (resetIdentity) { // Forget identity and peer database, peer keys, etc. @@ -255,8 +252,6 @@ Node::Node( } } - impl->udpPort = udpPort & 0xffff; - impl->tcpPort = tcpPort & 0xffff; impl->reasonForTermination = Node::NODE_RUNNING; impl->started = false; impl->running = false; @@ -400,7 +395,6 @@ Node::ReasonForTermination Node::run() RR->antiRec = new AntiRecursion(); RR->mc = new Multicaster(RR); RR->sw = new Switch(RR); - RR->sm = new SocketManager(impl->udpPort,impl->tcpPort,&_CBztTraffic,RR); RR->topology = new Topology(RR); try { RR->nc = new NodeConfig(RR); @@ -666,7 +660,7 @@ Node::ReasonForTermination Node::run() try { unsigned long delay = std::min((unsigned long)ZT_MAX_SERVICE_LOOP_INTERVAL,RR->sw->doTimerTasks()); uint64_t start = Utils::now(); - RR->sm->poll(delay); + RR->sm->poll(delay,&_CBztTraffic,RR); lastDelayDelta = (long)(Utils::now() - start) - (long)delay; // used to detect sleep/wake } catch (std::exception &exc) { LOG("unexpected exception running Switch doTimerTasks: %s",exc.what()); diff --git a/node/Node.hpp b/node/Node.hpp index 259cdea2..1b338b22 100644 --- a/node/Node.hpp +++ b/node/Node.hpp @@ -36,6 +36,7 @@ namespace ZeroTier { class EthernetTapFactory; class RoutingTable; +class SocketManager; /** * A ZeroTier One node @@ -85,8 +86,7 @@ public: * @param hp Home directory path or NULL for system-wide default for this platform * @param tf Ethernet tap factory for platform network stack * @param rt Routing table interface for platform network stack - * @param udpPort UDP port or 0 to disable - * @param tcpPort TCP port or 0 to disable + * @param sm Socket manager for physical network I/O * @param resetIdentity If true, delete identity before starting and regenerate * @param overrideRootTopology Override root topology with this dictionary (in string serialized format) and do not update (default: NULL for none) */ @@ -94,8 +94,7 @@ public: const char *hp, EthernetTapFactory *tf, RoutingTable *rt, - unsigned int udpPort, - unsigned int tcpPort, + SocketManager *sm, bool resetIdentity, const char *overrideRootTopology = (const char *)0) throw(); diff --git a/node/RuntimeEnvironment.hpp b/node/RuntimeEnvironment.hpp index 26c8da76..1061c452 100644 --- a/node/RuntimeEnvironment.hpp +++ b/node/RuntimeEnvironment.hpp @@ -75,13 +75,13 @@ public: timeOfLastPacketReceived(0), tapFactory((EthernetTapFactory *)0), routingTable((RoutingTable *)0), + sm((SocketManager *)0), log((Logger *)0), prng((CMWC4096 *)0), http((HttpClient *)0), antiRec((AntiRecursion *)0), mc((Multicaster *)0), sw((Switch *)0), - sm((SocketManager *)0), topology((Topology *)0), nc((NodeConfig *)0), node((Node *)0), @@ -117,6 +117,7 @@ public: // These are passed in from outside and are not created or deleted by the ZeroTier node core EthernetTapFactory *tapFactory; RoutingTable *routingTable; + SocketManager *sm; /* * Order matters a bit here. These are constructed in this order @@ -132,7 +133,6 @@ public: AntiRecursion *antiRec; Multicaster *mc; Switch *sw; - SocketManager *sm; Topology *topology; NodeConfig *nc; Node *node; diff --git a/node/SocketManager.hpp b/node/SocketManager.hpp index 3ac53e3d..29cc94bf 100644 --- a/node/SocketManager.hpp +++ b/node/SocketManager.hpp @@ -48,9 +48,7 @@ namespace ZeroTier { class SocketManager : NonCopyable { public: - SocketManager(void (*packetHandler)(const SharedPtr<Socket> &,void *,const InetAddress &,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> &),void *arg) : - _packetHandler(packetHandler), - _arg(arg) {} + SocketManager() {} virtual ~SocketManager() {} /** @@ -87,8 +85,13 @@ public: * If called concurrently, one will block until the other completes. * * @param timeout Timeout in milliseconds, may return sooner if whack() is called + * @param handler Packet data handler + * @param arg Void argument to packet data handler */ - virtual void poll(unsigned long timeout) = 0; + virtual void poll( + unsigned long timeout, + void (*handler)(const SharedPtr<Socket> &,void *,const InetAddress &,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> &), + void *arg); /** * Cause current or next blocking poll() operation to timeout immediately @@ -99,10 +102,6 @@ public: * Close TCP sockets */ virtual void closeTcpSockets() = 0; - -protected: - void (*_packetHandler)(const SharedPtr<Socket> &,void *,const InetAddress &,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> &); - void *_arg; }; } // namespace ZeroTier diff --git a/osnet/NativeSocketManager.cpp b/osnet/NativeSocketManager.cpp index a85394ed..70c4df73 100644 --- a/osnet/NativeSocketManager.cpp +++ b/osnet/NativeSocketManager.cpp @@ -79,9 +79,6 @@ public: NativeSocket(const Type &t,int s) : Socket(t),_sock(s) {} int _sock; #endif - - virtual bool notifyAvailableForRead(const SharedPtr<Socket> &self,NativeSocketManager *sm) = 0; - virtual bool notifyAvailableForWrite(const SharedPtr<Socket> &self,NativeSocketManager *sm) = 0; }; /** @@ -122,7 +119,7 @@ public: } } - virtual bool notifyAvailableForRead(const SharedPtr<Socket> &self,NativeSocketManager *sm) + inline bool notifyAvailableForRead(const SharedPtr<Socket> &self,NativeSocketManager *sm,void (*handler)(const SharedPtr<Socket> &,void *,const InetAddress &,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> &),void *arg) { Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> buf; InetAddress from; @@ -130,12 +127,14 @@ public: int n = (int)recvfrom(_sock,(char *)(buf.data()),ZT_SOCKET_MAX_MESSAGE_LEN,0,from.saddr(),&salen); if (n > 0) { buf.setSize((unsigned int)n); - sm->handleReceivedPacket(self,from,buf); + try { + handler(self,arg,from,buf); + } catch ( ... ) {} // handlers should not throw } return true; } - virtual bool notifyAvailableForWrite(const SharedPtr<Socket> &self,NativeSocketManager *sm) + inline bool notifyAvailableForWrite(const SharedPtr<Socket> &self,NativeSocketManager *sm) { return true; } @@ -226,7 +225,7 @@ public: return true; } - virtual bool notifyAvailableForRead(const SharedPtr<Socket> &self,NativeSocketManager *sm) + inline bool notifyAvailableForRead(const SharedPtr<Socket> &self,NativeSocketManager *sm,void (*handler)(const SharedPtr<Socket> &,void *,const InetAddress &,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> &),void *arg) { unsigned char buf[65536]; @@ -251,7 +250,7 @@ public: Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> data(_inbuf + 5,pl - 5); memmove(_inbuf,_inbuf + pl,p -= pl); try { - sm->handleReceivedPacket(self,_remote,data); + handler(self,arg,_remote,data); } catch ( ... ) {} // handlers should not throw pl = 0; } @@ -261,7 +260,7 @@ public: return true; } - virtual bool notifyAvailableForWrite(const SharedPtr<Socket> &self,NativeSocketManager *sm) + inline bool notifyAvailableForWrite(const SharedPtr<Socket> &self,NativeSocketManager *sm) { Mutex::Lock _l(_writeLock); @@ -343,12 +342,8 @@ static inline void winPipeHack(SOCKET fds[2]) } #endif -NativeSocketManager::NativeSocketManager( - int localUdpPort, - int localTcpPort, - void (*packetHandler)(const SharedPtr<Socket> &,void *,const InetAddress &,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> &), - void *arg) : - SocketManager(packetHandler,arg), +NativeSocketManager::NativeSocketManager(int localUdpPort,int localTcpPort) : + SocketManager(), _whackSendPipe(INVALID_SOCKET), _whackReceivePipe(INVALID_SOCKET), _tcpV4ListenSocket(INVALID_SOCKET), @@ -707,7 +702,7 @@ bool NativeSocketManager::send(const InetAddress &to,bool tcp,bool autoConnectTc return false; } -void NativeSocketManager::poll(unsigned long timeout) +void NativeSocketManager::poll(unsigned long timeout,void (*handler)(const SharedPtr<Socket> &,void *,const InetAddress &,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> &),void *arg) { fd_set rfds,wfds,efds; struct timeval tv; @@ -834,11 +829,11 @@ void NativeSocketManager::poll(unsigned long timeout) { NativeUdpSocket *usock = (NativeUdpSocket *)_udpV4Socket.ptr(); if ((usock)&&(FD_ISSET(usock->_sock,&rfds))) { - usock->notifyAvailableForRead(_udpV4Socket,this); + usock->notifyAvailableForRead(_udpV4Socket,this,handler,arg); } usock = (NativeUdpSocket *)_udpV6Socket.ptr(); if ((usock)&&(FD_ISSET(usock->_sock,&rfds))) { - usock->notifyAvailableForRead(_udpV6Socket,this); + usock->notifyAvailableForRead(_udpV6Socket,this,handler,arg); } } @@ -885,7 +880,7 @@ void NativeSocketManager::poll(unsigned long timeout) } } if (FD_ISSET(tsock->_sock,&rfds)) { - if (!tsock->notifyAvailableForRead(*s,this)) { + if (!tsock->notifyAvailableForRead(*s,this,handler,arg)) { { Mutex::Lock _l2(_tcpSockets_m); _tcpSockets.erase(tsock->_remote); diff --git a/osnet/NativeSocketManager.hpp b/osnet/NativeSocketManager.hpp index d6b014cb..64ac5d9c 100644 --- a/osnet/NativeSocketManager.hpp +++ b/osnet/NativeSocketManager.hpp @@ -58,9 +58,7 @@ class NativeUdpSocket; class NativeTcpSocket; /** - * Socket I/O multiplexer - * - * This wraps select(), epoll(), etc. and handles creation of Sockets. + * Native socket manager for Unix and Windows */ class NativeSocketManager : public SocketManager { @@ -68,36 +66,15 @@ class NativeSocketManager : public SocketManager friend class NativeTcpSocket; public: - /** - * @param localUdpPort Local UDP port to bind or 0 for no UDP support - * @param localTcpPort Local TCP port to listen to or 0 for no incoming TCP connect support - * @param packetHandler Function to call when packets are received by a socket - * @param arg Second argument to packetHandler() - * @throws std::runtime_error Could not bind local port(s) or open socket(s) - */ - NativeSocketManager( - int localUdpPort, - int localTcpPort, - void (*packetHandler)(const SharedPtr<Socket> &,void *,const InetAddress &,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> &), - void *arg); - + NativeSocketManager(int localUdpPort,int localTcpPort); virtual ~NativeSocketManager(); virtual bool send(const InetAddress &to,bool tcp,bool autoConnectTcp,const void *msg,unsigned int msglen); - virtual void poll(unsigned long timeout); + virtual void poll(unsigned long timeout,void (*handler)(const SharedPtr<Socket> &,void *,const InetAddress &,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> &),void *arg); virtual void whack(); virtual void closeTcpSockets(); private: - // Called by socket implementations when a packet is received - inline void handleReceivedPacket(const SharedPtr<Socket> &sock,const InetAddress &from,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> &data) - throw() - { - try { - _packetHandler(sock,_arg,from,data); - } catch ( ... ) {} // handlers shouldn't throw - } - // Used by TcpSocket to register/unregister for write availability notification void _startNotifyWrite(const NativeSocket *sock); void _stopNotifyWrite(const NativeSocket *sock); |