summaryrefslogtreecommitdiff
path: root/osnet
diff options
context:
space:
mode:
authorAdam Ierymenko <adam.ierymenko@gmail.com>2014-10-21 15:18:50 -0700
committerAdam Ierymenko <adam.ierymenko@gmail.com>2014-10-21 15:18:50 -0700
commit2436e22f46b97b281e5fcf861ba091c9cb70a76f (patch)
tree874024c3033711788ea0e6733f1f909c4a2e65ba /osnet
parent128a13107023075a8167bfdfb8ed9d404bd1dccd (diff)
downloadinfinitytier-2436e22f46b97b281e5fcf861ba091c9cb70a76f.tar.gz
infinitytier-2436e22f46b97b281e5fcf861ba091c9cb70a76f.zip
More work on abstracting socket manager.
Diffstat (limited to 'osnet')
-rw-r--r--osnet/NativeSocketManager.cpp33
-rw-r--r--osnet/NativeSocketManager.hpp29
2 files changed, 17 insertions, 45 deletions
diff --git a/osnet/NativeSocketManager.cpp b/osnet/NativeSocketManager.cpp
index a85394ed..70c4df73 100644
--- a/osnet/NativeSocketManager.cpp
+++ b/osnet/NativeSocketManager.cpp
@@ -79,9 +79,6 @@ public:
NativeSocket(const Type &t,int s) : Socket(t),_sock(s) {}
int _sock;
#endif
-
- virtual bool notifyAvailableForRead(const SharedPtr<Socket> &self,NativeSocketManager *sm) = 0;
- virtual bool notifyAvailableForWrite(const SharedPtr<Socket> &self,NativeSocketManager *sm) = 0;
};
/**
@@ -122,7 +119,7 @@ public:
}
}
- virtual bool notifyAvailableForRead(const SharedPtr<Socket> &self,NativeSocketManager *sm)
+ inline bool notifyAvailableForRead(const SharedPtr<Socket> &self,NativeSocketManager *sm,void (*handler)(const SharedPtr<Socket> &,void *,const InetAddress &,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> &),void *arg)
{
Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> buf;
InetAddress from;
@@ -130,12 +127,14 @@ public:
int n = (int)recvfrom(_sock,(char *)(buf.data()),ZT_SOCKET_MAX_MESSAGE_LEN,0,from.saddr(),&salen);
if (n > 0) {
buf.setSize((unsigned int)n);
- sm->handleReceivedPacket(self,from,buf);
+ try {
+ handler(self,arg,from,buf);
+ } catch ( ... ) {} // handlers should not throw
}
return true;
}
- virtual bool notifyAvailableForWrite(const SharedPtr<Socket> &self,NativeSocketManager *sm)
+ inline bool notifyAvailableForWrite(const SharedPtr<Socket> &self,NativeSocketManager *sm)
{
return true;
}
@@ -226,7 +225,7 @@ public:
return true;
}
- virtual bool notifyAvailableForRead(const SharedPtr<Socket> &self,NativeSocketManager *sm)
+ inline bool notifyAvailableForRead(const SharedPtr<Socket> &self,NativeSocketManager *sm,void (*handler)(const SharedPtr<Socket> &,void *,const InetAddress &,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> &),void *arg)
{
unsigned char buf[65536];
@@ -251,7 +250,7 @@ public:
Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> data(_inbuf + 5,pl - 5);
memmove(_inbuf,_inbuf + pl,p -= pl);
try {
- sm->handleReceivedPacket(self,_remote,data);
+ handler(self,arg,_remote,data);
} catch ( ... ) {} // handlers should not throw
pl = 0;
}
@@ -261,7 +260,7 @@ public:
return true;
}
- virtual bool notifyAvailableForWrite(const SharedPtr<Socket> &self,NativeSocketManager *sm)
+ inline bool notifyAvailableForWrite(const SharedPtr<Socket> &self,NativeSocketManager *sm)
{
Mutex::Lock _l(_writeLock);
@@ -343,12 +342,8 @@ static inline void winPipeHack(SOCKET fds[2])
}
#endif
-NativeSocketManager::NativeSocketManager(
- int localUdpPort,
- int localTcpPort,
- void (*packetHandler)(const SharedPtr<Socket> &,void *,const InetAddress &,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> &),
- void *arg) :
- SocketManager(packetHandler,arg),
+NativeSocketManager::NativeSocketManager(int localUdpPort,int localTcpPort) :
+ SocketManager(),
_whackSendPipe(INVALID_SOCKET),
_whackReceivePipe(INVALID_SOCKET),
_tcpV4ListenSocket(INVALID_SOCKET),
@@ -707,7 +702,7 @@ bool NativeSocketManager::send(const InetAddress &to,bool tcp,bool autoConnectTc
return false;
}
-void NativeSocketManager::poll(unsigned long timeout)
+void NativeSocketManager::poll(unsigned long timeout,void (*handler)(const SharedPtr<Socket> &,void *,const InetAddress &,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> &),void *arg)
{
fd_set rfds,wfds,efds;
struct timeval tv;
@@ -834,11 +829,11 @@ void NativeSocketManager::poll(unsigned long timeout)
{
NativeUdpSocket *usock = (NativeUdpSocket *)_udpV4Socket.ptr();
if ((usock)&&(FD_ISSET(usock->_sock,&rfds))) {
- usock->notifyAvailableForRead(_udpV4Socket,this);
+ usock->notifyAvailableForRead(_udpV4Socket,this,handler,arg);
}
usock = (NativeUdpSocket *)_udpV6Socket.ptr();
if ((usock)&&(FD_ISSET(usock->_sock,&rfds))) {
- usock->notifyAvailableForRead(_udpV6Socket,this);
+ usock->notifyAvailableForRead(_udpV6Socket,this,handler,arg);
}
}
@@ -885,7 +880,7 @@ void NativeSocketManager::poll(unsigned long timeout)
}
}
if (FD_ISSET(tsock->_sock,&rfds)) {
- if (!tsock->notifyAvailableForRead(*s,this)) {
+ if (!tsock->notifyAvailableForRead(*s,this,handler,arg)) {
{
Mutex::Lock _l2(_tcpSockets_m);
_tcpSockets.erase(tsock->_remote);
diff --git a/osnet/NativeSocketManager.hpp b/osnet/NativeSocketManager.hpp
index d6b014cb..64ac5d9c 100644
--- a/osnet/NativeSocketManager.hpp
+++ b/osnet/NativeSocketManager.hpp
@@ -58,9 +58,7 @@ class NativeUdpSocket;
class NativeTcpSocket;
/**
- * Socket I/O multiplexer
- *
- * This wraps select(), epoll(), etc. and handles creation of Sockets.
+ * Native socket manager for Unix and Windows
*/
class NativeSocketManager : public SocketManager
{
@@ -68,36 +66,15 @@ class NativeSocketManager : public SocketManager
friend class NativeTcpSocket;
public:
- /**
- * @param localUdpPort Local UDP port to bind or 0 for no UDP support
- * @param localTcpPort Local TCP port to listen to or 0 for no incoming TCP connect support
- * @param packetHandler Function to call when packets are received by a socket
- * @param arg Second argument to packetHandler()
- * @throws std::runtime_error Could not bind local port(s) or open socket(s)
- */
- NativeSocketManager(
- int localUdpPort,
- int localTcpPort,
- void (*packetHandler)(const SharedPtr<Socket> &,void *,const InetAddress &,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> &),
- void *arg);
-
+ NativeSocketManager(int localUdpPort,int localTcpPort);
virtual ~NativeSocketManager();
virtual bool send(const InetAddress &to,bool tcp,bool autoConnectTcp,const void *msg,unsigned int msglen);
- virtual void poll(unsigned long timeout);
+ virtual void poll(unsigned long timeout,void (*handler)(const SharedPtr<Socket> &,void *,const InetAddress &,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> &),void *arg);
virtual void whack();
virtual void closeTcpSockets();
private:
- // Called by socket implementations when a packet is received
- inline void handleReceivedPacket(const SharedPtr<Socket> &sock,const InetAddress &from,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> &data)
- throw()
- {
- try {
- _packetHandler(sock,_arg,from,data);
- } catch ( ... ) {} // handlers shouldn't throw
- }
-
// Used by TcpSocket to register/unregister for write availability notification
void _startNotifyWrite(const NativeSocket *sock);
void _stopNotifyWrite(const NativeSocket *sock);