diff options
author | Adam Ierymenko <adam.ierymenko@zerotier.com> | 2014-03-25 08:37:08 -0700 |
---|---|---|
committer | Adam Ierymenko <adam.ierymenko@zerotier.com> | 2014-03-25 08:37:08 -0700 |
commit | 328be8f8461462ad88a639822b7df32f7b92211f (patch) | |
tree | 69c1d4dec9b0f0c885f5c3c922e7c0bada582181 /node | |
parent | d9b91758e044a70242dee1697345cd2ee4a096d4 (diff) | |
download | infinitytier-328be8f8461462ad88a639822b7df32f7b92211f.tar.gz infinitytier-328be8f8461462ad88a639822b7df32f7b92211f.zip |
Make Windows build, add (untested) Windows named pipe based IPC code.
Diffstat (limited to 'node')
-rw-r--r-- | node/IpcConnection.cpp | 55 | ||||
-rw-r--r-- | node/IpcConnection.hpp | 13 | ||||
-rw-r--r-- | node/IpcListener.cpp | 41 | ||||
-rw-r--r-- | node/IpcListener.hpp | 8 | ||||
-rw-r--r-- | node/SocketManager.cpp | 99 | ||||
-rw-r--r-- | node/SocketManager.hpp | 68 | ||||
-rw-r--r-- | node/TcpSocket.cpp | 6 | ||||
-rw-r--r-- | node/UdpSocket.cpp | 2 |
8 files changed, 206 insertions, 86 deletions
diff --git a/node/IpcConnection.cpp b/node/IpcConnection.cpp index 292403c0..f0e09449 100644 --- a/node/IpcConnection.cpp +++ b/node/IpcConnection.cpp @@ -35,10 +35,7 @@ #include "IpcConnection.hpp" -#ifdef __WINDOWS__ -#include <WinSock2.h> -#include <Windows.h> -#else +#ifndef __WINDOWS__ #include <sys/socket.h> #include <sys/un.h> #include <unistd.h> @@ -49,9 +46,18 @@ namespace ZeroTier { IpcConnection::IpcConnection(const char *endpoint,void (*commandHandler)(void *,IpcConnection *,IpcConnection::EventType,const char *),void *arg) : _handler(commandHandler), _arg(arg), +#ifdef __WINDOWS__ + _sock(INVALID_HANDLE_VALUE) +#else _sock(0) +#endif { #ifdef __WINDOWS__ + _sock = CreateFileA(endpoint,GENERIC_READ|GENERIC_WRITE,FILE_SHARE_READ|FILE_SHARE_WRITE|FILE_SHARE_DELETE,NULL,OPEN_EXISTING,0,NULL); + if (_sock == INVALID_HANDLE_VALUE) + throw std::runtime_error("IPC endpoint unreachable"); + DWORD pipeMode = PIPE_READMODE_BYTE; + SetNamedPipeHandleState(_sock,&pipeMode,NULL,NULL); #else struct sockaddr_un unaddr; unaddr.sun_family = AF_UNIX; @@ -71,7 +77,11 @@ IpcConnection::IpcConnection(const char *endpoint,void (*commandHandler)(void *, Thread::start(this); } +#ifdef __WINDOWS__ +IpcConnection::IpcConnection(HANDLE s,void (*commandHandler)(void *,IpcConnection *,IpcConnection::EventType,const char *),void *arg) : +#else IpcConnection::IpcConnection(int s,void (*commandHandler)(void *,IpcConnection *,IpcConnection::EventType,const char *),void *arg) : +#endif _handler(commandHandler), _arg(arg), _sock(s) @@ -81,17 +91,22 @@ IpcConnection::IpcConnection(int s,void (*commandHandler)(void *,IpcConnection * IpcConnection::~IpcConnection() { + _writeLock.lock(); #ifdef __WINDOWS__ + HANDLE s = _sock; + _sock = INVALID_HANDLE_VALUE; + if (s != INVALID_HANDLE_VALUE) { + CloseHandle(s); + } #else - _writeLock.lock(); int s = _sock; _sock = 0; if (s > 0) { ::shutdown(s,SHUT_RDWR); ::close(s); } - _writeLock.unlock(); #endif + _writeLock.unlock(); } void IpcConnection::printf(const char *format,...) @@ -112,6 +127,8 @@ void IpcConnection::printf(const char *format,...) return; #ifdef __WINDOWS__ + DWORD bsent = 0; + WriteFile(_sock,tmp,n,&bsent,NULL); #else ::write(_sock,tmp,n); #endif @@ -120,21 +137,34 @@ void IpcConnection::printf(const char *format,...) void IpcConnection::threadMain() throw() { -#ifdef __WINDOWS__ -#else char tmp[65536]; char linebuf[65536]; unsigned int lineptr = 0; +#ifdef __WINDOWS__ + HANDLE s; + DWORD n,i; +#else int s,n,i; +#endif char c; for(;;) { +#ifdef __WINDOWS__ + s = _sock; + if (s == INVALID_HANDLE_VALUE) + break; + if (!ReadFile(s,tmp,sizeof(tmp),&n,NULL)) + break; + if (n < 0) + break; +#else s = _sock; if (s <= 0) break; n = (int)::read(s,tmp,sizeof(tmp)); if (n <= 0) break; +#endif for(i=0;i<n;++i) { c = (linebuf[lineptr] = tmp[i]); if ((c == '\r')||(c == '\n')||(lineptr == (sizeof(linebuf) - 1))) { @@ -149,15 +179,20 @@ void IpcConnection::threadMain() { _writeLock.lock(); - int s = _sock; + s = _sock; +#ifdef __WINDOWS__ + _sock = INVALID_HANDLE_VALUE; + if (s != INVALID_HANDLE_VALUE) + CloseHandle(s); +#else _sock = 0; if (s > 0) ::close(s); +#endif _writeLock.unlock(); } _handler(_arg,this,IPC_EVENT_CONNECTION_CLOSED,(const char *)0); -#endif } } // namespace ZeroTier diff --git a/node/IpcConnection.hpp b/node/IpcConnection.hpp index d1294d36..b73198cd 100644 --- a/node/IpcConnection.hpp +++ b/node/IpcConnection.hpp @@ -33,6 +33,11 @@ #include "NonCopyable.hpp" #include "Mutex.hpp" +#ifdef __WINDOWS__ +#include <WinSock2.h> +#include <Windows.h> +#endif + namespace ZeroTier { class IpcListener; @@ -74,11 +79,19 @@ public: private: // Used by IpcListener to construct incoming connections +#ifdef __WINDOWS__ + IpcConnection(HANDLE s,void (*commandHandler)(void *,IpcConnection *,IpcConnection::EventType,const char *),void *arg); +#else IpcConnection(int s,void (*commandHandler)(void *,IpcConnection *,IpcConnection::EventType,const char *),void *arg); +#endif void (*_handler)(void *,IpcConnection *,IpcConnection::EventType,const char *); void *_arg; +#ifdef __WINDOWS__ + volatile HANDLE _sock; +#else volatile int _sock; +#endif Mutex _writeLock; }; diff --git a/node/IpcListener.cpp b/node/IpcListener.cpp index 306231a1..589c11ef 100644 --- a/node/IpcListener.cpp +++ b/node/IpcListener.cpp @@ -32,10 +32,7 @@ #include "IpcListener.hpp" -#ifdef __WINDOWS__ -#include <WinSock2.h> -#include <Windows.h> -#else +#ifndef __WINDOWS__ #include <sys/socket.h> #include <sys/un.h> #include <sys/stat.h> @@ -49,10 +46,14 @@ IpcListener::IpcListener(const char *ep,void (*commandHandler)(void *,IpcConnect _endpoint(ep), _handler(commandHandler), _arg(arg), - _sock(0) -{ #ifdef __WINDOWS__ + _sock(INVALID_HANDLE_VALUE), + _die(false) #else + _sock(0) +#endif +{ +#ifndef __WINDOWS__ struct sockaddr_un unaddr; unaddr.sun_family = AF_UNIX; strncpy(unaddr.sun_path,_endpoint.c_str(),sizeof(unaddr.sun_path)); @@ -93,6 +94,14 @@ IpcListener::IpcListener(const char *ep,void (*commandHandler)(void *,IpcConnect IpcListener::~IpcListener() { #ifdef __WINDOWS__ + _sock_m.lock(); + _die = true; + HANDLE s = _sock; + _sock = INVALID_HANDLE_VALUE; + if (s != INVALID_HANDLE_VALUE) + CloseHandle(s); + _sock_m.unlock(); + Thread::join(_thread); #else int s = _sock; _sock = 0; @@ -109,6 +118,26 @@ void IpcListener::threadMain() throw() { #ifdef __WINDOWS__ + HANDLE s; + while (!_die) { + { + Mutex::Lock _l(_sock_m); + s = _sock = CreateNamedPipeA(_endpoint.c_str(),PIPE_ACCESS_DUPLEX,PIPE_READMODE_BYTE|PIPE_TYPE_BYTE|PIPE_WAIT,PIPE_UNLIMITED_INSTANCES,4096,4096,0,NULL); + } + if (s != INVALID_HANDLE_VALUE) { + if ((ConnectNamedPipe(s,NULL))||(GetLastError() == ERROR_PIPE_CONNECTED)) { + Mutex::Lock _l(_sock_m); + try { + if (s != INVALID_HANDLE_VALUE) + _handler(_arg,new IpcConnection(s,_handler,_arg),IpcConnection::IPC_EVENT_NEW_CONNECTION,(const char *)0); + } catch ( ... ) {} // handlers should not throw + } else { + Mutex::Lock _l(_sock_m); + CloseHandle(s); + _sock = INVALID_HANDLE_VALUE; + } + } + } #else struct sockaddr_un unaddr; socklen_t socklen; diff --git a/node/IpcListener.hpp b/node/IpcListener.hpp index 00a29b5d..60376c90 100644 --- a/node/IpcListener.hpp +++ b/node/IpcListener.hpp @@ -37,6 +37,8 @@ #include <stdexcept> #ifdef __WINDOWS__ +#include <WinSock2.h> +#include <Windows.h> #define ZT_IPC_ENDPOINT_BASE "\\\\.\\pipe\\ZeroTierOne-" #else #define ZT_IPC_ENDPOINT_BASE "/tmp/.ZeroTierOne-" @@ -80,7 +82,13 @@ private: std::string _endpoint; void (*_handler)(void *,IpcConnection *,IpcConnection::EventType,const char *); void *_arg; +#ifdef __WINDOWS__ + volatile HANDLE _sock; + volatile bool _die; + Mutex _sock_m; +#else volatile int _sock; +#endif Thread _thread; }; diff --git a/node/SocketManager.cpp b/node/SocketManager.cpp index dc3e1a04..cd7c646f 100644 --- a/node/SocketManager.cpp +++ b/node/SocketManager.cpp @@ -104,6 +104,8 @@ SocketManager::SocketManager( __winpipe(tmps); _whackSendPipe = tmps[0]; _whackReceivePipe = tmps[1]; + u_long iMode=1; + ioctlsocket(tmps[1],FIONBIO,&iMode); } #else { @@ -112,9 +114,9 @@ SocketManager::SocketManager( throw std::runtime_error("pipe() failed"); _whackSendPipe = tmpfds[1]; _whackReceivePipe = tmpfds[0]; + fcntl(_whackReceivePipe,F_SETFL,O_NONBLOCK); } #endif - fcntl(_whackReceivePipe,F_SETFL,O_NONBLOCK); FD_SET(_whackReceivePipe,&_readfds); if (localTcpPort > 0) { @@ -142,15 +144,17 @@ SocketManager::SocketManager( BOOL f; f = TRUE; ::setsockopt(_tcpV6ListenSocket,IPPROTO_IPV6,IPV6_V6ONLY,(const char *)&f,sizeof(f)); f = TRUE; ::setsockopt(_tcpV6ListenSocket,SOL_SOCKET,SO_REUSEADDR,(const char *)&f,sizeof(f)); + u_long iMode=1; + ioctlsocket(_tcpV6ListenSocket,FIONBIO,&iMode); } #else { int f; f = 1; ::setsockopt(_tcpV6ListenSocket,IPPROTO_IPV6,IPV6_V6ONLY,(void *)&f,sizeof(f)); f = 1; ::setsockopt(_tcpV6ListenSocket,SOL_SOCKET,SO_REUSEADDR,(void *)&f,sizeof(f)); + fcntl(_tcpV6ListenSocket,F_SETFL,O_NONBLOCK); } #endif - fcntl(_tcpV6ListenSocket,F_SETFL,O_NONBLOCK); struct sockaddr_in6 sin6; memset(&sin6,0,sizeof(sin6)); @@ -187,13 +191,15 @@ SocketManager::SocketManager( #ifdef __WINDOWS__ { BOOL f = TRUE; ::setsockopt(_tcpV4ListenSocket,SOL_SOCKET,SO_REUSEADDR,(const char *)&f,sizeof(f)); + u_long iMode=1; + ioctlsocket(_tcpV4ListenSocket,FIONBIO,&iMode); } #else { int f = 1; ::setsockopt(_tcpV4ListenSocket,SOL_SOCKET,SO_REUSEADDR,(void *)&f,sizeof(f)); + fcntl(_tcpV4ListenSocket,F_SETFL,O_NONBLOCK); } #endif - fcntl(_tcpV4ListenSocket,F_SETFL,O_NONBLOCK); struct sockaddr_in sin4; memset(&sin4,0,sizeof(sin4)); @@ -269,7 +275,12 @@ SocketManager::SocketManager( } _udpV6Socket = SharedPtr<Socket>(new UdpSocket(Socket::ZT_SOCKET_TYPE_UDP_V6,s)); +#ifdef __WINDOWS__ + u_long iMode=1; + ioctlsocket(s,FIONBIO,&iMode); +#else fcntl(s,F_SETFL,O_NONBLOCK); +#endif FD_SET(s,&_readfds); } @@ -317,7 +328,12 @@ SocketManager::SocketManager( } _udpV4Socket = SharedPtr<Socket>(new UdpSocket(Socket::ZT_SOCKET_TYPE_UDP_V4,s)); +#ifdef __WINDOWS__ + u_long iMode=1; + ioctlsocket(s,FIONBIO,&iMode); +#else fcntl(s,F_SETFL,O_NONBLOCK); +#endif FD_SET(s,&_readfds); } } @@ -352,6 +368,10 @@ bool SocketManager::send(const InetAddress &to,bool tcp,const void *msg,unsigned ::closesocket(s); return false; } + { + u_long iMode=1; + ioctlsocket(s,FIONBIO,&iMode); + } #else int s = ::socket(to.isV4() ? AF_INET : AF_INET6,SOCK_STREAM,0); if (s <= 0) @@ -360,8 +380,8 @@ bool SocketManager::send(const InetAddress &to,bool tcp,const void *msg,unsigned ::close(s); return false; } -#endif fcntl(s,F_SETFL,O_NONBLOCK); +#endif bool connecting = false; if (connect(s,to.saddr(),to.saddrLen())) { @@ -455,14 +475,16 @@ void SocketManager::poll(unsigned long timeout) Mutex::Lock _l2(_tcpSockets_m); try { _tcpSockets[fromia] = SharedPtr<Socket>(new TcpSocket(this,sockfd,false,fromia)); - +#ifdef __WINDOWS__ + u_long iMode=1; + ioctlsocket(sockfd,FIONBIO,&iMode); +#else fcntl(sockfd,F_SETFL,O_NONBLOCK); - +#endif _fdSetLock.lock(); FD_SET(sockfd,&_readfds); _fdSetLock.unlock(); - - if (sockfd > _nfds) + if ((int)sockfd > (int)_nfds) _nfds = sockfd; } catch ( ... ) { CLOSE_SOCKET(sockfd); @@ -486,14 +508,16 @@ void SocketManager::poll(unsigned long timeout) Mutex::Lock _l2(_tcpSockets_m); try { _tcpSockets[fromia] = SharedPtr<Socket>(new TcpSocket(this,sockfd,false,fromia)); - +#ifdef __WINDOWS__ + u_long iMode=1; + ioctlsocket(sockfd,FIONBIO,&iMode); +#else fcntl(sockfd,F_SETFL,O_NONBLOCK); - +#endif _fdSetLock.lock(); FD_SET(sockfd,&_readfds); _fdSetLock.unlock(); - - if (sockfd > _nfds) + if ((int)sockfd > (int)_nfds) _nfds = sockfd; } catch ( ... ) { CLOSE_SOCKET(sockfd); @@ -570,7 +594,7 @@ void SocketManager::whack() { _whackSendPipe_m.lock(); #ifdef __WINDOWS__ - ::send(_whackSendPipe,(const void *)this,1,0); + ::send(_whackSendPipe,(const char *)this,1,0); #else ::write(_whackSendPipe,(const void *)this,1); // data is arbitrary, just send a byte #endif @@ -592,4 +616,53 @@ void SocketManager::closeTcpSockets() _updateNfds(); } +void SocketManager::_closeSockets() + throw() +{ +#ifdef __WINDOWS__ + if (_whackSendPipe != INVALID_SOCKET) + ::closesocket(_whackSendPipe); + if (_whackReceivePipe != INVALID_SOCKET) + ::closesocket(_whackReceivePipe); + if (_tcpV4ListenSocket != INVALID_SOCKET) + ::closesocket(_tcpV4ListenSocket); + if (_tcpV6ListenSocket != INVALID_SOCKET) + ::closesocket(_tcpV6ListenSocket); +#else + if (_whackSendPipe > 0) + ::close(_whackSendPipe); + if (_whackReceivePipe > 0) + ::close(_whackReceivePipe); + if (_tcpV4ListenSocket > 0) + ::close(_tcpV4ListenSocket); + if (_tcpV4ListenSocket > 0) + ::close(_tcpV6ListenSocket); +#endif +} + +void SocketManager::_updateNfds() +{ +#ifdef __WINDOWS__ + SOCKET nfds = _whackSendPipe; +#else + int nfds = _whackSendPipe; +#endif + if (_whackReceivePipe > nfds) + nfds = _whackReceivePipe; + if (_tcpV4ListenSocket > nfds) + nfds = _tcpV4ListenSocket; + if (_tcpV6ListenSocket > nfds) + nfds = _tcpV6ListenSocket; + if ((_udpV4Socket)&&(_udpV4Socket->_sock > nfds)) + nfds = _udpV4Socket->_sock; + if ((_udpV6Socket)&&(_udpV6Socket->_sock > nfds)) + nfds = _udpV6Socket->_sock; + Mutex::Lock _l(_tcpSockets_m); + for(std::map< InetAddress,SharedPtr<Socket> >::const_iterator s(_tcpSockets.begin());s!=_tcpSockets.end();++s) { + if (s->second->_sock > nfds) + nfds = s->second->_sock; + } + _nfds = nfds; +} + } // namespace ZeroTier diff --git a/node/SocketManager.hpp b/node/SocketManager.hpp index dbc4e686..003c35eb 100644 --- a/node/SocketManager.hpp +++ b/node/SocketManager.hpp @@ -30,16 +30,6 @@ #include <stdio.h> #include <stdlib.h> -#ifdef __WINDOWS__ -#include <WinSock2.h> -#include <WS2tcpip.h> -#include <Windows.h> -#else -#include <unistd.h> -#include <sys/time.h> -#include <sys/types.h> -#include <sys/select.h> -#endif #include <map> #include <stdexcept> @@ -52,6 +42,17 @@ #include "NonCopyable.hpp" #include "Buffer.hpp" +#ifdef __WINDOWS__ +#include <WinSock2.h> +#include <WS2tcpip.h> +#include <Windows.h> +#else +#include <unistd.h> +#include <sys/time.h> +#include <sys/types.h> +#include <sys/select.h> +#endif + namespace ZeroTier { /** @@ -150,50 +151,11 @@ private: } // Called in SocketManager destructor or in constructor cleanup before exception throwing - inline void _closeSockets() - throw() - { -#ifdef __WINDOWS__ - if (_whackSendPipe != INVALID_SOCKET) - ::closesocket(_whackSendPipe); - if (_whackReceivePipe != INVALID_SOCKET) - ::closesocket(_whackReceivePipe); - if (_tcpV4ListenSocket != INVALID_SOCKET) - ::closesocket(s); - if (_tcpV6ListenSocket != INVALID_SOCKET) - ::closesocket(s); -#else - if (_whackSendPipe > 0) - ::close(_whackSendPipe); - if (_whackReceivePipe > 0) - ::close(_whackReceivePipe); - if (_tcpV4ListenSocket > 0) - ::close(_tcpV4ListenSocket); - if (_tcpV4ListenSocket > 0) - ::close(_tcpV6ListenSocket); -#endif - } + void _closeSockets() + throw(); - inline void _updateNfds() - { - int nfds = _whackSendPipe; - if (_whackReceivePipe > nfds) - nfds = _whackReceivePipe; - if (_tcpV4ListenSocket > nfds) - nfds = _tcpV4ListenSocket; - if (_tcpV6ListenSocket > nfds) - nfds = _tcpV6ListenSocket; - if ((_udpV4Socket)&&(_udpV4Socket->_sock > nfds)) - nfds = _udpV4Socket->_sock; - if ((_udpV6Socket)&&(_udpV6Socket->_sock > nfds)) - nfds = _udpV6Socket->_sock; - Mutex::Lock _l(_tcpSockets_m); - for(std::map< InetAddress,SharedPtr<Socket> >::const_iterator s(_tcpSockets.begin());s!=_tcpSockets.end();++s) { - if (s->second->_sock > nfds) - nfds = s->second->_sock; - } - _nfds = nfds; - } + // Called in SocketManager to recompute _nfds for select() based implementation + void _updateNfds(); #ifdef __WINDOWS__ SOCKET _whackSendPipe; diff --git a/node/TcpSocket.cpp b/node/TcpSocket.cpp index dce7d3a7..a422dec6 100644 --- a/node/TcpSocket.cpp +++ b/node/TcpSocket.cpp @@ -106,7 +106,7 @@ bool TcpSocket::send(const InetAddress &to,const void *msg,unsigned int msglen) // If no output was enqueued before this, try to send() it and then // start a queued write if any remains after that. - int n = (int)::send(_sock,_outbuf,_outptr,0); + int n = (int)::send(_sock,(const char *)_outbuf,_outptr,0); if (n > 0) memmove(_outbuf,_outbuf + (unsigned int)n,_outptr -= (unsigned int)n); @@ -125,7 +125,7 @@ bool TcpSocket::notifyAvailableForRead(const SharedPtr<Socket> &self,SocketManag // will not be called concurrently since only SocketManager::poll() calls this - int n = (int)::recv(_sock,buf,sizeof(buf),0); + int n = (int)::recv(_sock,(char *)buf,sizeof(buf),0); if (n <= 0) return false; // read error, stream probably closed @@ -163,7 +163,7 @@ bool TcpSocket::notifyAvailableForWrite(const SharedPtr<Socket> &self,SocketMana _connecting = false; if (_outptr) { - int n = (int)::send(_sock,_outbuf,_outptr,0); + int n = (int)::send(_sock,(const char *)_outbuf,_outptr,0); if (n < 0) { switch(errno) { #ifdef EBADF diff --git a/node/UdpSocket.cpp b/node/UdpSocket.cpp index 8730428f..8fce9588 100644 --- a/node/UdpSocket.cpp +++ b/node/UdpSocket.cpp @@ -94,7 +94,7 @@ bool UdpSocket::notifyAvailableForRead(const SharedPtr<Socket> &self,SocketManag Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> buf; InetAddress from; socklen_t salen = from.saddrSpaceLen(); - int n = (int)recvfrom(_sock,buf.data(),ZT_SOCKET_MAX_MESSAGE_LEN,0,from.saddr(),&salen); + int n = (int)recvfrom(_sock,(char *)(buf.data()),ZT_SOCKET_MAX_MESSAGE_LEN,0,from.saddr(),&salen); if (n > 0) { buf.setSize((unsigned int)n); sm->handleReceivedPacket(self,from,buf); |