summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--node/IpcListener.cpp40
-rw-r--r--node/SocketManager.cpp14
-rw-r--r--node/SocketManager.hpp23
3 files changed, 56 insertions, 21 deletions
diff --git a/node/IpcListener.cpp b/node/IpcListener.cpp
index 94cf065f..306231a1 100644
--- a/node/IpcListener.cpp
+++ b/node/IpcListener.cpp
@@ -38,6 +38,8 @@
#else
#include <sys/socket.h>
#include <sys/un.h>
+#include <sys/stat.h>
+#include <sys/types.h>
#include <unistd.h>
#endif
@@ -56,29 +58,29 @@ IpcListener::IpcListener(const char *ep,void (*commandHandler)(void *,IpcConnect
strncpy(unaddr.sun_path,_endpoint.c_str(),sizeof(unaddr.sun_path));
unaddr.sun_path[sizeof(unaddr.sun_path) - 1] = (char)0;
- for(int tries=0;tries<3;++tries) {
- _sock = socket(AF_UNIX,SOCK_STREAM,0);
- if (_sock <= 0)
+ struct stat stattmp;
+ if (stat(_endpoint.c_str(),&stattmp)) {
+ int testSock = socket(AF_UNIX,SOCK_STREAM,0);
+ if (testSock <= 0)
throw std::runtime_error("unable to create socket of type AF_UNIX");
- if (bind(_sock,(struct sockaddr *)&unaddr,sizeof(unaddr))) {
- ::close(_sock);
- if (errno == EADDRINUSE) {
- int testSock = socket(AF_UNIX,SOCK_STREAM,0);
- if (testSock <= 0)
- throw std::runtime_error("unable to create socket of type AF_UNIX");
- if (connect(testSock,(struct sockaddr *)&unaddr,sizeof(unaddr))) {
- // error indicates nothing is listening on other end, so unlink and try again
- ::close(testSock);
- unlink(_endpoint.c_str());
- } else {
- // success means endpoint is being actively listened to by a process
- ::close(testSock);
- throw std::runtime_error("IPC endpoint address in use");
- }
- } else throw std::runtime_error("IPC endpoint could not be bound");
+ if (connect(testSock,(struct sockaddr *)&unaddr,sizeof(unaddr))) {
+ // error means nothing is listening, orphaned name
+ ::close(testSock);
+ } else {
+ // success means endpoint is being actively listened to by a process
+ ::close(testSock);
+ throw std::runtime_error("IPC endpoint address in use");
}
}
+ ::unlink(_endpoint.c_str());
+ _sock = socket(AF_UNIX,SOCK_STREAM,0);
+ if (_sock <= 0)
+ throw std::runtime_error("unable to create socket of type AF_UNIX");
+ if (bind(_sock,(struct sockaddr *)&unaddr,sizeof(unaddr))) {
+ ::close(_sock);
+ throw std::runtime_error("IPC endpoint could not be bound");
+ }
if (listen(_sock,8)) {
::close(_sock);
throw std::runtime_error("listen() failed for bound AF_UNIX socket");
diff --git a/node/SocketManager.cpp b/node/SocketManager.cpp
index 2c583f09..3c94f22c 100644
--- a/node/SocketManager.cpp
+++ b/node/SocketManager.cpp
@@ -316,6 +316,8 @@ SocketManager::SocketManager(
_udpV4Socket = SharedPtr<Socket>(new UdpSocket(Socket::ZT_SOCKET_TYPE_UDP_V4,s));
}
}
+
+ _updateNfds();
}
SocketManager::~SocketManager()
@@ -370,7 +372,7 @@ void SocketManager::poll(unsigned long timeout)
tv.tv_sec = (long)(timeout / 1000);
tv.tv_usec = (long)((timeout % 1000) * 1000);
- select(_nfds,&rfds,&wfds,&efds,(timeout > 0) ? &tv : (struct timeval *)0);
+ select(_nfds + 1,&rfds,&wfds,&efds,(timeout > 0) ? &tv : (struct timeval *)0);
if (FD_ISSET(_whackReceivePipe,&rfds)) {
char tmp[32];
@@ -396,6 +398,8 @@ void SocketManager::poll(unsigned long timeout)
_fdSetLock.lock();
FD_SET(sockfd,&_readfds);
_fdSetLock.unlock();
+ if (sockfd > _nfds)
+ _nfds = sockfd;
}
}
if ((_tcpV6ListenSocket != INVALID_SOCKET)&&(FD_ISSET(_tcpV6ListenSocket,&rfds))) {
@@ -413,6 +417,8 @@ void SocketManager::poll(unsigned long timeout)
_fdSetLock.lock();
FD_SET(sockfd,&_readfds);
_fdSetLock.unlock();
+ if (sockfd > _nfds)
+ _nfds = sockfd;
}
}
@@ -421,6 +427,7 @@ void SocketManager::poll(unsigned long timeout)
if ((_udpV6Socket)&&(FD_ISSET(_udpV6Socket->_sock,&rfds)))
_udpV6Socket->notifyAvailableForRead(_udpV6Socket,this);
+ bool closedSockets = false;
{ // grab copy of TCP sockets list because _tcpSockets[] might be changed in a handler
Mutex::Lock _l2(_tcpSockets_m);
if (_tcpSockets.size()) {
@@ -436,6 +443,7 @@ void SocketManager::poll(unsigned long timeout)
FD_CLR(s->second->_sock,&_writefds);
_fdSetLock.unlock();
_tcpSockets.erase(s++);
+ closedSockets = true;
}
}
}
@@ -451,6 +459,7 @@ void SocketManager::poll(unsigned long timeout)
FD_CLR((*s)->_sock,&_readfds);
FD_CLR((*s)->_sock,&_writefds);
_fdSetLock.unlock();
+ closedSockets = true;
continue;
}
}
@@ -464,10 +473,13 @@ void SocketManager::poll(unsigned long timeout)
FD_CLR((*s)->_sock,&_readfds);
FD_CLR((*s)->_sock,&_writefds);
_fdSetLock.unlock();
+ closedSockets = true;
continue;
}
}
}
+ if (closedSockets)
+ _updateNfds();
}
void SocketManager::whack()
diff --git a/node/SocketManager.hpp b/node/SocketManager.hpp
index 9f980e37..e6e87a1f 100644
--- a/node/SocketManager.hpp
+++ b/node/SocketManager.hpp
@@ -169,6 +169,27 @@ private:
#endif
}
+ inline void _updateNfds()
+ {
+ int nfds = _whackSendPipe;
+ if (_whackReceivePipe > nfds)
+ nfds = _whackReceivePipe;
+ if (_tcpV4ListenSocket > nfds)
+ nfds = _tcpV4ListenSocket;
+ if (_tcpV6ListenSocket > nfds)
+ nfds = _tcpV6ListenSocket;
+ if ((_udpV4Socket)&&(_udpV4Socket->_sock > nfds))
+ nfds = _udpV4Socket->_sock;
+ if ((_udpV6Socket)&&(_udpV6Socket->_sock > nfds))
+ nfds = _udpV6Socket->_sock;
+ Mutex::Lock _l(_tcpSockets_m);
+ for(std::map< InetAddress,SharedPtr<Socket> >::const_iterator s(_tcpSockets.begin());s!=_tcpSockets.end();++s) {
+ if (s->second->_sock > nfds)
+ nfds = s->second->_sock;
+ }
+ _nfds = nfds;
+ }
+
#ifdef __WINDOWS__
SOCKET _whackSendPipe;
SOCKET _whackReceivePipe;
@@ -187,7 +208,7 @@ private:
fd_set _readfds;
fd_set _writefds;
- int _nfds;
+ volatile int _nfds;
Mutex _fdSetLock;
std::map< InetAddress,SharedPtr<Socket> > _tcpSockets;