diff options
author | Joseph Henry <josephjah@gmail.com> | 2016-01-11 10:12:59 -0800 |
---|---|---|
committer | Joseph Henry <josephjah@gmail.com> | 2016-01-11 10:12:59 -0800 |
commit | 3e65ecb93d62d628b99d68fec8b1ec94993f4f09 (patch) | |
tree | 763cf3956766a28f6c767c067eabec49e039ecff /netcon | |
parent | ff9317365a1d9498076b0971f0209fc2aec5c142 (diff) | |
download | infinitytier-3e65ecb93d62d628b99d68fec8b1ec94993f4f09.tar.gz infinitytier-3e65ecb93d62d628b99d68fec8b1ec94993f4f09.zip |
Stateless RPC rework
Diffstat (limited to 'netcon')
-rw-r--r-- | netcon/Intercept.c | 262 | ||||
-rw-r--r-- | netcon/Intercept.h | 31 | ||||
-rw-r--r-- | netcon/NetconEthernetTap.cpp | 1189 | ||||
-rw-r--r-- | netcon/NetconEthernetTap.hpp | 35 | ||||
-rw-r--r-- | netcon/RPC.c | 114 | ||||
-rw-r--r-- | netcon/RPC.h | 28 | ||||
-rw-r--r-- | netcon/common.inc.c | 2 | ||||
-rw-r--r-- | netcon/docker-test/e5cd7a9e1c3511dd.conf | 0 |
8 files changed, 688 insertions, 973 deletions
diff --git a/netcon/Intercept.c b/netcon/Intercept.c index 5f6e37cb..e89f6a9f 100644 --- a/netcon/Intercept.c +++ b/netcon/Intercept.c @@ -58,40 +58,6 @@ #include "RPC.h" #include "common.inc.c" -/* Global Declarations */ -static int (*realconnect)(CONNECT_SIG); -static int (*realbind)(BIND_SIG); -static int (*realaccept)(ACCEPT_SIG); -static int (*reallisten)(LISTEN_SIG); -static int (*realsocket)(SOCKET_SIG); -static int (*realsetsockopt)(SETSOCKOPT_SIG); -static int (*realgetsockopt)(GETSOCKOPT_SIG); -static int (*realaccept4)(ACCEPT4_SIG); -static long (*realsyscall)(SYSCALL_SIG); -static int (*realclose)(CLOSE_SIG); -static int (*realclone)(CLONE_SIG); -static int (*realdup2)(DUP2_SIG); -static int (*realdup3)(DUP3_SIG); -static int (*realgetsockname)(GETSOCKNAME_SIG); - -/* Exported Function Prototypes */ -void my_init(void); -int connect(CONNECT_SIG); -int bind(BIND_SIG); -int accept(ACCEPT_SIG); -int listen(LISTEN_SIG); -int socket(SOCKET_SIG); -int setsockopt(SETSOCKOPT_SIG); -int getsockopt(GETSOCKOPT_SIG); -int accept4(ACCEPT4_SIG); -long syscall(SYSCALL_SIG); -int close(CLOSE_SIG); -int clone(CLONE_SIG); -int dup2(DUP2_SIG); -int dup3(DUP3_SIG); -int getsockname(GETSOCKNAME_SIG); - -static int init_service_connection(); static void load_symbols(void); static void set_up_intercept(); @@ -99,45 +65,34 @@ static void set_up_intercept(); ------------------- Intercept<--->Service Comm mechanisms ---------------------- ------------------------------------------------------------------------------*/ -static int rpcfd = -1; /* used for fd-transfers */ static int thispid = -1; -static int instance_count = 0; - -static int connected_to_service() { - return rpcfd == -1 ? 0 : 1; -} +char *network_pathname; /* Check whether the socket is mapped to the service or not. We need to know if this is a regular AF_LOCAL socket or an end of a socketpair that the service uses. 
We don't want to keep state in the intercept, so we simply ask the service via an RPC */ -static int is_mapped_to_service(int sockfd) -{ - if(rpcfd < 0) - return 0; /* no connection obviously implies no mapping */ - dwr(MSG_DEBUG,"is_mapped_to_service()\n"); - return rpc_send_command(RPC_MAP_REQ, rpcfd, &sockfd, sizeof(sockfd)); -} -/* Sets up the connection pipes and sockets to the service */ -static int init_service_connection() +static int connected_to_service(int sockfd) { - const char *network_id; - char rpcname[1024]; - network_id = getenv("ZT_NC_NETWORK"); - /* Do noting if not configured (sanity check -- should never get here in this case) */ - if (!network_id){ - fprintf(stderr, "init_service_connection(): ZT_NC_NETWORK not set.\n"); - exit(0); - } - if((rpcfd < 0 && instance_count==0) || thispid != getpid()) - rpc_mutex_init(); - - strncpy(rpcname,network_id,sizeof(rpcname)); - instance_count++; - return rpc_join(rpcname); + dwr(MSG_DEBUG_EXTRA,"connected_to_service():\n"); + socklen_t len; + struct sockaddr_storage addr; + len = sizeof addr; + struct sockaddr_un * addr_un; + getpeername(sockfd, (struct sockaddr*)&addr, &len); + if (addr.ss_family == AF_LOCAL || addr.ss_family == AF_LOCAL) { + addr_un = (struct sockaddr_un*)&addr; + if(strcmp(addr_un->sun_path, network_pathname) == 0) { + dwr(MSG_DEBUG_EXTRA,"connected_to_service(): Yes, %s\n", addr_un->sun_path); + return 1; + } + } + dwr(MSG_DEBUG_EXTRA,"connected_to_service(): Not connected to service\n"); + return 0; } + /*------------------------------------------------------------------------------ ------------------------ ctors and dtors (and friends) ------------------------ ------------------------------------------------------------------------------*/ @@ -154,7 +109,6 @@ static void load_symbols(void) dwr(MSG_DEBUG,"detected duplicate call to global constructor (pid=%d).\n", thispid); } thispid = getpid(); - realconnect = dlsym(RTLD_NEXT, "connect"); realbind = dlsym(RTLD_NEXT, "bind"); 
realaccept = dlsym(RTLD_NEXT, "accept"); @@ -179,9 +133,12 @@ static void _init(void) { set_up_intercept(); } /* get symbols and initialize mutexes */ static void set_up_intercept() { + network_pathname = getenv("ZT_NC_NETWORK"); + dwr(MSG_DEBUG,"Connecting to service at: %s\n", network_pathname); if (!getenv("ZT_NC_NETWORK")) return; /* Hook/intercept Posix net API symbols */ + rpc_mutex_init(); load_symbols(); } @@ -193,11 +150,10 @@ static void set_up_intercept() int setsockopt(SETSOCKOPT_SIG) { if(realsetsockopt == NULL){ - dwr(MSG_ERROR, "setsockopt(): SYMBOL NOT FOUND.\n"); + dwr(MSG_ERROR,"setsockopt(): SYMBOL NOT FOUND.\n"); return -1; } dwr(MSG_DEBUG,"setsockopt(%d)\n", socket); - /* return(realsetsockopt(socket, level, option_name, option_value, option_len)); */ if(level == SOL_IPV6 && option_name == IPV6_V6ONLY) return 0; @@ -209,9 +165,8 @@ int setsockopt(SETSOCKOPT_SIG) if(socket == STDIN_FILENO || socket == STDOUT_FILENO || socket == STDERR_FILENO) return(realsetsockopt(socket, level, option_name, option_value, option_len)); int err = realsetsockopt(socket, level, option_name, option_value, option_len); - if(err < 0){ + if(err < 0) perror("setsockopt():\n"); - } return 0; } @@ -223,19 +178,13 @@ int setsockopt(SETSOCKOPT_SIG) int getsockopt(GETSOCKOPT_SIG) { if(realgetsockopt == NULL){ - dwr(MSG_ERROR, "getsockopt(): SYMBOL NOT FOUND.\n"); + dwr(MSG_ERROR,"getsockopt(): SYMBOL NOT FOUND.\n"); return -1; } dwr(MSG_DEBUG,"getsockopt(%d)\n", sockfd); - - if(is_mapped_to_service(sockfd) <= 0) { // First, check if the service manages this + if(!connected_to_service(sockfd)) { return realgetsockopt(sockfd, level, optname, optval, optlen); } - - //int err = realgetsockopt(sockfd, level, optname, optval, optlen); - /* TODO: this condition will need a little more intelligence later on - -- we will need to know if this fd is a local we are spoofing, or a true local */ - if(optname == SO_TYPE) { int* val = (int*)optval; *val = 2; @@ -244,7 +193,6 @@ int 
getsockopt(GETSOCKOPT_SIG) return 0; } - /*------------------------------------------------------------------------------ ----------------------------------- socket() ----------------------------------- ------------------------------------------------------------------------------*/ @@ -255,9 +203,7 @@ int socket(SOCKET_SIG) { if(realsocket == NULL) set_up_intercept(); - dwr(MSG_DEBUG,"socket():\n"); - int newfd = -1; /* Check that type makes sense */ int flags = socket_type & ~SOCK_TYPE_MASK; if (flags & ~(SOCK_CLOEXEC | SOCK_NONBLOCK)) { @@ -275,7 +221,6 @@ int socket(SOCKET_SIG) return -1; } /* TODO: detect ENFILE condition */ - if(socket_family == AF_LOCAL || socket_family == AF_NETLINK || socket_family == AF_UNIX) { @@ -283,35 +228,14 @@ int socket(SOCKET_SIG) dwr(MSG_DEBUG,"realsocket() = %d\n", err); return err; } - - rpcfd = !connected_to_service() ? init_service_connection() : rpcfd; - if(rpcfd < 0) { - dwr(MSG_DEBUG,"BAD service connection. exiting.\n"); - exit(-1); - } - /* Assemble and send RPC */ struct socket_st rpc_st; rpc_st.socket_family = socket_family; rpc_st.socket_type = socket_type; rpc_st.protocol = protocol; rpc_st.__tid = syscall(SYS_gettid); - - newfd = rpc_send_command(RPC_SOCKET, rpcfd, &rpc_st, sizeof(struct socket_st)); - if(newfd > 0) - { - dwr(MSG_DEBUG,"sending fd = %d to Service over (%d)\n", newfd, rpcfd); - /* send our local-fd number back to service so - it can complete its mapping table entry */ - /* send fd mapping and get confirmation */ - if(rpc_send_command(RPC_MAP, rpcfd, &newfd, sizeof(newfd)) > -1) { - errno = ERR_OK; - dwr(MSG_DEBUG, "RXd fd confirmation. 
Mapped!\n"); - return newfd; /* Mapping complete, everything is OK */ - } - } - dwr(MSG_DEBUG,"Error while receiving new fd.\n"); - return -1; + /* -1 is passed since we we're generating the new socket in this call */ + return rpc_send_command(RPC_SOCKET, -1, &rpc_st, sizeof(struct socket_st)); } /*------------------------------------------------------------------------------ @@ -323,13 +247,12 @@ int socket(SOCKET_SIG) int connect(CONNECT_SIG) { if(realconnect == NULL){ - dwr(MSG_ERROR, "connect(): SYMBOL NOT FOUND.\n"); + dwr(MSG_ERROR,"connect(): SYMBOL NOT FOUND.\n"); return -1; } dwr(MSG_DEBUG,"connect(%d):\n", __fd); struct sockaddr_in *connaddr; connaddr = (struct sockaddr_in *) __addr; - /* Check that this is a valid fd */ if(fcntl(__fd, F_GETFD) < 0) { errno = EBADF; @@ -347,8 +270,6 @@ int connect(CONNECT_SIG) errno = EAFNOSUPPORT; return -1; } - /* FIXME: Check that address is in user space, return EFAULT ? */ - /* make sure we don't touch any standard outputs */ if(__fd == STDIN_FILENO || __fd == STDOUT_FILENO || __fd == STDERR_FILENO) return(realconnect(__fd, __addr, __len)); @@ -358,18 +279,15 @@ int connect(CONNECT_SIG) || connaddr->sin_family == AF_NETLINK || connaddr->sin_family == AF_UNIX)) { int err = realconnect(__fd, __addr, __len); - //perror("connect():"); return err; } - /* Assemble and send RPC */ struct connect_st rpc_st; rpc_st.__tid = syscall(SYS_gettid); rpc_st.__fd = __fd; memcpy(&rpc_st.__addr, __addr, sizeof(struct sockaddr_storage)); memcpy(&rpc_st.__len, &__len, sizeof(socklen_t)); - - return rpc_send_command(RPC_CONNECT, rpcfd, &rpc_st, sizeof(struct connect_st)); + return rpc_send_command(RPC_CONNECT, __fd, &rpc_st, sizeof(struct connect_st)); } /*------------------------------------------------------------------------------ @@ -381,7 +299,7 @@ int connect(CONNECT_SIG) int bind(BIND_SIG) { if(realbind == NULL){ - dwr(MSG_ERROR, "bind(): SYMBOL NOT FOUND.\n"); + dwr(MSG_ERROR,"bind(): SYMBOL NOT FOUND.\n"); return -1; } 
dwr(MSG_DEBUG,"bind(%d):\n", sockfd); @@ -397,11 +315,9 @@ int bind(BIND_SIG) errno = ENOTSOCK; return -1; } - /* make sure we don't touch any standard outputs */ if(sockfd == STDIN_FILENO || sockfd == STDOUT_FILENO || sockfd == STDERR_FILENO) return(realbind(sockfd, addr, addrlen)); - /* If local, just use normal syscall */ struct sockaddr_in *connaddr; connaddr = (struct sockaddr_in *)addr; @@ -413,7 +329,6 @@ int bind(BIND_SIG) dwr(MSG_DEBUG,"realbind, err = %d\n", err); return err; } - int port = connaddr->sin_port; int ip = connaddr->sin_addr.s_addr; unsigned char d[4]; @@ -421,28 +336,25 @@ int bind(BIND_SIG) d[1] = (ip >> 8) & 0xFF; d[2] = (ip >> 16) & 0xFF; d[3] = (ip >> 24) & 0xFF; - dwr(MSG_DEBUG, "bind(): %d.%d.%d.%d: %d\n", d[0],d[1],d[2],d[3], ntohs(port)); - + dwr(MSG_DEBUG,"bind(): %d.%d.%d.%d: %d\n", d[0],d[1],d[2],d[3], ntohs(port)); /* Assemble and send RPC */ struct bind_st rpc_st; rpc_st.sockfd = sockfd; rpc_st.__tid = syscall(SYS_gettid); memcpy(&rpc_st.addr, addr, sizeof(struct sockaddr_storage)); memcpy(&rpc_st.addrlen, &addrlen, sizeof(socklen_t)); - - return rpc_send_command(RPC_BIND, rpcfd, &rpc_st, sizeof(struct bind_st)); + return rpc_send_command(RPC_BIND, sockfd, &rpc_st, sizeof(struct bind_st)); } /*------------------------------------------------------------------------------ ----------------------------------- accept4() ---------------------------------- ------------------------------------------------------------------------------*/ - /* int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags */ int accept4(ACCEPT4_SIG) { if(realaccept4 == NULL){ - dwr(MSG_ERROR, "accept4(): SYMBOL NOT FOUND.\n"); + dwr(MSG_ERROR,"accept4(): SYMBOL NOT FOUND.\n"); return -1; } dwr(MSG_DEBUG,"accept4(%d):\n", sockfd); @@ -450,8 +362,7 @@ int accept4(ACCEPT4_SIG) fcntl(sockfd, F_SETFL, FD_CLOEXEC); if ((flags & SOCK_NONBLOCK)) fcntl(sockfd, F_SETFL, O_NONBLOCK); - int newfd = accept(sockfd, addr, addrlen); - return newfd; + return 
accept(sockfd, addr, addrlen); } /*------------------------------------------------------------------------------ @@ -463,7 +374,7 @@ int accept4(ACCEPT4_SIG) int accept(ACCEPT_SIG) { if(realaccept == NULL){ - dwr(MSG_ERROR, "accept(): SYMBOL NOT FOUND.\n"); + dwr(MSG_ERROR,"accept(): SYMBOL NOT FOUND.\n"); return -1; } dwr(MSG_DEBUG,"accept(%d):\n", sockfd); @@ -502,39 +413,27 @@ int accept(ACCEPT_SIG) dwr(MSG_DEBUG,"EINVAL\n"); return -1; } - /* redirect calls for standard I/O descriptors to kernel */ if(sockfd == STDIN_FILENO || sockfd == STDOUT_FILENO || sockfd == STDERR_FILENO){ dwr(MSG_DEBUG,"realaccept():\n"); return(realaccept(sockfd, addr, addrlen)); } - if(addr) addr->sa_family = AF_INET; - /* TODO: also get address info */ /* The following line is required for libuv/nodejs to accept connections properly, however, this has the side effect of causing certain webservers to max out the CPU in an accept loop */ //fcntl(sockfd, F_SETFL, SOCK_NONBLOCK); - int new_conn_socket = get_new_fd(sockfd); - - if(new_conn_socket > 0) - { - dwr(MSG_DEBUG, "accept(): RX: fd = (%d) over (%d)\n", new_conn_socket, rpcfd); - /* Send our local-fd number back to service so it can complete its mapping table */ - dwr(MSG_DEBUG, "accept(): sending perceived fd (%d) to service.\n", new_conn_socket); - rpc_send_command(RPC_MAP, rpcfd, &new_conn_socket, sizeof(new_conn_socket)); - dwr(MSG_DEBUG,"accept()=%d\n", new_conn_socket); + int new_fd = get_new_fd(sockfd); + if(new_fd > 0) { errno = ERR_OK; - return new_conn_socket; /* OK */ + return new_fd; } - dwr(MSG_DEBUG, "accept(): EAGAIN - Error reading signal byte from service"); errno = EAGAIN; return -EAGAIN; } - /*------------------------------------------------------------------------------ ------------------------------------- listen()---------------------------------- ------------------------------------------------------------------------------*/ @@ -543,7 +442,7 @@ int accept(ACCEPT_SIG) int listen(LISTEN_SIG) { if(reallisten == 
NULL){ - dwr(MSG_ERROR, "listen(): SYMBOL NOT FOUND.\n"); + dwr(MSG_ERROR,"listen(): SYMBOL NOT FOUND.\n"); return -1; } dwr(MSG_DEBUG,"listen(%d):\n", sockfd); @@ -565,25 +464,19 @@ int listen(LISTEN_SIG) errno = EOPNOTSUPP; return -1; } - /* make sure we don't touch any standard outputs */ if(sockfd == STDIN_FILENO || sockfd == STDOUT_FILENO || sockfd == STDERR_FILENO) return(reallisten(sockfd, backlog)); - if(is_mapped_to_service(sockfd) < 0) { - /* We now know this socket is not one of our socketpairs */ - int err = reallisten(sockfd, backlog); - dwr(MSG_DEBUG,"reallisten()=%d\n", err); - return err; + if(!connected_to_service(sockfd)) { + reallisten(sockfd, backlog); } - /* Assemble and send RPC */ struct listen_st rpc_st; rpc_st.sockfd = sockfd; rpc_st.backlog = backlog; rpc_st.__tid = syscall(SYS_gettid); - - return rpc_send_command(RPC_LISTEN, rpcfd, &rpc_st, sizeof(struct listen_st)); + return rpc_send_command(RPC_LISTEN, sockfd, &rpc_st, sizeof(struct listen_st)); } /*------------------------------------------------------------------------------ @@ -594,12 +487,12 @@ int listen(LISTEN_SIG) int clone(CLONE_SIG) { if(realclone == NULL){ - dwr(MSG_ERROR, "clone(): SYMBOL NOT FOUND.\n"); + dwr(MSG_ERROR,"clone(): SYMBOL NOT FOUND.\n"); return -1; } dwr(MSG_DEBUG,"clone()\n"); int err = realclone(fn, child_stack, flags, arg); - init_service_connection(); + set_up_intercept(); return err; } @@ -612,64 +505,26 @@ int close(CLOSE_SIG) { dwr(MSG_DEBUG, "close(%d)\n", fd); if(realclose == NULL) - init_service_connection(); - if(fd == rpcfd) - return -1; /* TODO: Ignore request to shut down our rpc fd, this is *almost always* safe */ - if(fd != STDIN_FILENO && fd != STDOUT_FILENO && fd != STDERR_FILENO) - return realclose(fd); - return -1; -} - -/*------------------------------------------------------------------------------ --------------------------------------- dup2() ---------------------------------- 
-------------------------------------------------------------------------------*/ - -/* int oldfd, int newfd */ -int dup2(DUP2_SIG) -{ - if(realdup2 == NULL){ - dwr(MSG_ERROR, "dup2(): SYMBOL NOT FOUND.\n"); - return -1; - } - dwr(MSG_DEBUG,"dup2(%d, %d)\n", oldfd, newfd); - if(oldfd == rpcfd) { - dwr(MSG_DEBUG,"client application attempted to dup2 RPC socket (%d). This is not allowed.\n", oldfd); - errno = EBADF; - return -1; - } - return realdup2(oldfd, newfd); -} - -/*------------------------------------------------------------------------------ --------------------------------------- dup3() ---------------------------------- -------------------------------------------------------------------------------*/ - -/* int oldfd, int newfd, int flags */ -int dup3(DUP3_SIG) -{ - if(realdup3 == NULL){ - dwr(MSG_ERROR, "dup3(): SYMBOL NOT FOUND.\n"); - return -1; - } - dwr(MSG_DEBUG,"dup3(%d, %d, %d)\n", oldfd, newfd, flags); - return realdup3(oldfd, newfd, flags); + set_up_intercept(); + return realclose(fd); } /*------------------------------------------------------------------------------ -------------------------------- getsockname() --------------------------------- ------------------------------------------------------------------------------*/ -/* define GETSOCKNAME_SIG int sockfd, struct sockaddr *addr, socklen_t *addrlen */ +/* int sockfd, struct sockaddr *addr, socklen_t *addrlen */ int getsockname(GETSOCKNAME_SIG) { if (realgetsockname == NULL) { - dwr(MSG_ERROR, "getsockname(): SYMBOL NOT FOUND. \n"); + dwr(MSG_ERROR,"getsockname(): SYMBOL NOT FOUND. \n"); return -1; } - dwr(MSG_DEBUG, "getsockname(%d)\n", sockfd); - if(!is_mapped_to_service(sockfd)) + dwr(MSG_DEBUG,"getsockname(%d)\n", sockfd); + if(connected_to_service(sockfd) == 0) { + dwr(MSG_DEBUG,"getsockname(): not used by service\n"); return realgetsockname(sockfd, addr, addrlen); - + } /* This is kind of a hack as it stands -- assumes sockaddr is sockaddr_in * and is an IPv4 address. 
*/ @@ -678,12 +533,15 @@ int getsockname(GETSOCKNAME_SIG) rpc_st.sockfd = sockfd; memcpy(&rpc_st.addr, addr, *addrlen); memcpy(&rpc_st.addrlen, &addrlen, sizeof(socklen_t)); - rpc_send_command(RPC_GETSOCKNAME, rpcfd, &rpc_st, sizeof(struct getsockname_st)); - + int rpcfd = rpc_send_command(RPC_GETSOCKNAME, sockfd, &rpc_st, sizeof(struct getsockname_st)); /* read address info from service */ char addrbuf[sizeof(struct sockaddr_storage)]; memset(&addrbuf, 0, sizeof(struct sockaddr_storage)); - read(rpcfd, &addrbuf, sizeof(struct sockaddr_storage)); + + if(rpcfd > -1) + if(read(rpcfd, &addrbuf, sizeof(struct sockaddr_storage)) > 0) + close(rpcfd); + struct sockaddr_storage sock_storage; memcpy(&sock_storage, addrbuf, sizeof(struct sockaddr_storage)); *addrlen = sizeof(struct sockaddr_in); @@ -697,9 +555,7 @@ int getsockname(GETSOCKNAME_SIG) ------------------------------------------------------------------------------*/ long syscall(SYSCALL_SIG){ - - //dwr(MSG_DEBUG_EXTRA,"syscall(%u, ...):\n", number); - + dwr(MSG_DEBUG_EXTRA,"syscall(%u, ...):\n", number); va_list ap; uintptr_t a,b,c,d,e,f; va_start(ap, number); diff --git a/netcon/Intercept.h b/netcon/Intercept.h index fab96f05..3a362022 100644 --- a/netcon/Intercept.h +++ b/netcon/Intercept.h @@ -53,4 +53,35 @@ #define DUP2_SIG int oldfd, int newfd #define DUP3_SIG int oldfd, int newfd, int flags +void my_init(void); +int connect(CONNECT_SIG); +int bind(BIND_SIG); +int accept(ACCEPT_SIG); +int listen(LISTEN_SIG); +int socket(SOCKET_SIG); +int setsockopt(SETSOCKOPT_SIG); +int getsockopt(GETSOCKOPT_SIG); +int accept4(ACCEPT4_SIG); +long syscall(SYSCALL_SIG); +int close(CLOSE_SIG); +int clone(CLONE_SIG); +int dup2(DUP2_SIG); +int dup3(DUP3_SIG); +int getsockname(GETSOCKNAME_SIG); + +static int (*realconnect)(CONNECT_SIG); +static int (*realbind)(BIND_SIG); +static int (*realaccept)(ACCEPT_SIG); +static int (*reallisten)(LISTEN_SIG); +static int (*realsocket)(SOCKET_SIG); +static int (*realsetsockopt)(SETSOCKOPT_SIG); 
+static int (*realgetsockopt)(GETSOCKOPT_SIG); +static int (*realaccept4)(ACCEPT4_SIG); +static long (*realsyscall)(SYSCALL_SIG); +static int (*realclose)(CLOSE_SIG); +static int (*realclone)(CLONE_SIG); +static int (*realdup2)(DUP2_SIG); +static int (*realdup3)(DUP3_SIG); +static int (*realgetsockname)(GETSOCKNAME_SIG); + #endif diff --git a/netcon/NetconEthernetTap.cpp b/netcon/NetconEthernetTap.cpp index 0b74d5dc..8cf5f3ff 100644 --- a/netcon/NetconEthernetTap.cpp +++ b/netcon/NetconEthernetTap.cpp @@ -29,6 +29,10 @@ #include <utility> #include <dlfcn.h> #include <sys/poll.h> +#include <stdint.h> +#include <utility> +#include <string> +#include <sys/resource.h> #include "NetconEthernetTap.hpp" @@ -52,8 +56,8 @@ #define APPLICATION_POLL_FREQ 20 #define ZT_LWIP_TCP_TIMER_INTERVAL 5 -#define STATUS_TMR_INTERVAL 3000 // How often we check connection statuses -#define DEFAULT_READ_BUFFER_SIZE 1024 * 63 +#define STATUS_TMR_INTERVAL 1000 // How often we check connection statuses (in ms) +#define DEFAULT_READ_BUFFER_SIZE 1024 * 1024 namespace ZeroTier { @@ -118,21 +122,18 @@ static err_t low_level_output(struct netif *netif, struct pbuf *p) class TcpConnection { public: - int perceived_fd; - int their_fd; - bool pending; - bool listening; - int pid; - unsigned long written; - unsigned long acked; + uint64_t accept_token; + + bool pending, listening; + int pid, idx; + unsigned long written, acked; - PhySocket *rpcSock; - PhySocket *dataSock; + PhySocket *rpcsock; + PhySocket *sock; struct tcp_pcb *pcb; struct sockaddr_storage *addr; unsigned char buf[DEFAULT_READ_BUFFER_SIZE]; - int idx; }; /* @@ -180,7 +181,7 @@ NetconEthernetTap::NetconEthernetTap( lwipstack->lwip_init(); _unixListenSocket = _phy.unixListen(sockPath,(void *)this); - dwr(MSG_INFO, " NetconEthernetTap initialized!\n", _phy.getDescriptor(_unixListenSocket)); + dwr(MSG_INFO," NetconEthernetTap initialized!\n", _phy.getDescriptor(_unixListenSocket)); if (!_unixListenSocket) throw 
std::runtime_error(std::string("unable to bind to ")+sockPath); _thread = Thread::start(this); @@ -277,7 +278,7 @@ void NetconEthernetTap::put(const MAC &from,const MAC &to,unsigned int etherType // First pbuf gets ethernet header at start q = p; if (q->len < sizeof(ethhdr)) { - dwr(MSG_ERROR, "_put(): Dropped packet: first pbuf smaller than ethernet header\n"); + dwr(MSG_ERROR,"_put(): Dropped packet: first pbuf smaller than ethernet header\n"); return; } memcpy(q->payload,&ethhdr,sizeof(ethhdr)); @@ -290,14 +291,14 @@ void NetconEthernetTap::put(const MAC &from,const MAC &to,unsigned int etherType dataptr += q->len; } } else { - dwr(MSG_ERROR, "put(): Dropped packet: no pbufs available\n"); + dwr(MSG_ERROR,"put(): Dropped packet: no pbufs available\n"); return; } { Mutex::Lock _l2(lwipstack->_lock); if(interface.input(p, &interface) != ERR_OK) { - dwr(MSG_ERROR, "put(): Error while RXing packet (netif->input)\n"); + dwr(MSG_ERROR,"put(): Error while RXing packet (netif->input)\n"); } } } @@ -335,190 +336,23 @@ void NetconEthernetTap::scanMulticastGroups(std::vector<MulticastGroup> &added,s _multicastGroups.swap(newGroups); } -TcpConnection *NetconEthernetTap::getConnectionByTheirFD(PhySocket *sock, int fd) +TcpConnection *NetconEthernetTap::getConnection(PhySocket *sock) { for(size_t i=0; i<tcp_connections.size(); i++) { - if(tcp_connections[i]->perceived_fd == fd && tcp_connections[i]->rpcSock == sock) + if(tcp_connections[i]->sock == sock) return tcp_connections[i]; } return NULL; } -/* - * Dumps service state in 80x25 when debug mode is off - */ -void NetconEthernetTap::compact_dump() -{ - /* - clearscreen(); - gotoxy(0,0); - - fprintf(stderr, "ZeroTier - Network Containers Service [State Dump]\n\r"); - fprintf(stderr, " RPC Sockets = %d\n\r", rpc_sockets.size()); - fprintf(stderr, " TCP Connections = %d\n\r", tcp_connections.size()); - - for(size_t i=0; i<rpc_sockets.size(); i++) { - int rpc_fd = _phy.getDescriptor(rpc_sockets[i]); - char buf[80]; - int pid =
pidmap[rpc_sockets[i]]; - memset(&buf, '\0', 80); - get_path_from_pid(buf, pid); - fprintf(stderr, "\n Client(addr=0x%x, rpc=%d, pid=%d) %s\n", rpc_sockets[i], rpc_fd, pid, buf); - for(size_t j=0; j<tcp_connections.size(); j++) { - memset(&buf, '\0', 80); - get_path_from_pid(buf, tcp_connections[j]->pid); - if(tcp_connections[j]->rpcSock==rpc_sockets[i]) { - fprintf(stderr, "\t\tpath\t\t= %s\n", buf); - } - } - } - */ - for(size_t i=0; i<rpc_sockets.size(); i++) { - fprintf(stderr, "\n\n\nrpc(%d)\n", _phy.getDescriptor(rpc_sockets[i])); - for(size_t j=0; j<tcp_connections.size(); j++) { - if(_phy.getDescriptor(tcp_connections[j]->rpcSock) == _phy.getDescriptor(rpc_sockets[i])) - fprintf(stderr, "\t(%d) ----> (%d)\n\n", _phy.getDescriptor(tcp_connections[j]->dataSock), tcp_connections[j]->perceived_fd); - } - } -} - -/* - * Dumps service state - */ -void NetconEthernetTap::dump() -{ - fprintf(stderr, "\n\n---\n\ndie(): BEGIN SERVICE STATE DUMP\n"); - fprintf(stderr, "*** IF YOU SEE THIS, EMAIL THE DUMP TEXT TO joseph.henry@zerotier.com ***\n"); - fprintf(stderr, " tcp_conns = %lu, rpc_socks = %lu\n", tcp_connections.size(), rpc_sockets.size()); - - // TODO: Add logic to detect bad mapping conditions - for(size_t i=0; i<rpc_sockets.size(); i++) { - for(size_t j=0; j<rpc_sockets.size(); j++) { - if(j != i && rpc_sockets[i] == rpc_sockets[j]) { - fprintf(stderr, "Duplicate PhySockets found! 
(0x%p)\n", rpc_sockets[i]); - } - } - } - - // Dump the state of the service mapping - for(size_t i=0; i<rpc_sockets.size(); i++) { - int rpc_fd = _phy.getDescriptor(rpc_sockets[i]); - char buf[80]; - int pid = pidmap[rpc_sockets[i]]; - get_path_from_pid(buf, pid); - - fprintf(stderr, "\nClient(addr=0x%p, rpc=%d, pid=%d) %s\n", rpc_sockets[i], rpc_fd, pid, buf); - for(size_t j=0; j<tcp_connections.size(); j++) { - get_path_from_pid(buf, tcp_connections[j]->pid); - if(tcp_connections[j]->rpcSock==rpc_sockets[i]){ - fprintf(stderr, " |\n"); - fprintf(stderr, " |-Connection(0x%p):\n", tcp_connections[j]); - fprintf(stderr, " | path\t\t\t= %s\n", buf); - fprintf(stderr, " | perceived_fd\t\t= %d\t(fd)\n", tcp_connections[j]->perceived_fd); - fprintf(stderr, " | their_fd\t\t= %d\t(fd)\n", tcp_connections[j]->their_fd); - fprintf(stderr, " | dataSock(0x%p)\t= %d\t(fd)\n", tcp_connections[j]->dataSock, _phy.getDescriptor(tcp_connections[j]->dataSock)); - fprintf(stderr, " | rpcSock(0x%p)\t= %d\t(fd)\n", tcp_connections[j]->rpcSock, _phy.getDescriptor(tcp_connections[j]->rpcSock)); - fprintf(stderr, " | pending\t\t= %d\n", tcp_connections[j]->pending); - fprintf(stderr, " | listening\t\t= %d\n", tcp_connections[j]->listening); - fprintf(stderr, " \\------pcb(0x%p)->state\t= %d\n", tcp_connections[j]->pcb, tcp_connections[j]->pcb->state); - } - } - } - fprintf(stderr, "\n\ndie(): END SERVICE STATE DUMP\n\n---\n\n"); -} - -/* - * Dumps service state and then exits - */ -void NetconEthernetTap::die(int exret) { - dump(); - exit(exret); -} - -/* - * Closes a TcpConnection and associated LWIP PCB strcuture. 
- */ -void NetconEthernetTap::closeConnection(TcpConnection *conn) -{ - if(!conn) - return; - dwr(MSG_DEBUG, " closeConnection(%x, %d)\n", conn->pcb, _phy.getDescriptor(conn->dataSock)); - if(lwipstack->_tcp_close(conn->pcb) != ERR_OK) { - dwr(MSG_ERROR, " closeConnection(): Error while calling tcp_close()\n"); - exit(0); - } - else { - if(conn->dataSock) { - close(_phy.getDescriptor(conn->dataSock)); - _phy.close(conn->dataSock,false); - } - /* Eventually we might want to use a map here instead */ - for(int i=0; i<tcp_connections.size(); i++) { - if(tcp_connections[i] == conn) { - tcp_connections.erase(tcp_connections.begin() + i); - delete conn; - break; - } - } - } -} - -/* - * Close a single RPC connection and associated PhySocket - */ -void NetconEthernetTap::closeClient(PhySocket *sock) -{ - for(size_t i=0; i<rpc_sockets.size(); i++) { - if(rpc_sockets[i] == sock){ - rpc_sockets.erase(rpc_sockets.begin() + i); - break; - } - } - close(_phy.getDescriptor(sock)); - _phy.close(sock); -} - -/* - * Close all RPC and TCP connections - */ -void NetconEthernetTap::closeAll() -{ - while(rpc_sockets.size()) - closeClient(rpc_sockets.front()); - while(tcp_connections.size()) - closeConnection(tcp_connections.front()); -} - -#include <sys/resource.h> - void NetconEthernetTap::threadMain() throw() { + dwr(MSG_DEBUG, "MEMP_NUM_REASSDATA = %d\n", MEMP_NUM_REASSDATA); uint64_t prev_tcp_time = 0; uint64_t prev_status_time = 0; uint64_t prev_etharp_time = 0; -/* - fprintf(stderr, "- MEM_SIZE = %dM\n", MEM_SIZE / (1024*1024)); - fprintf(stderr, "- PBUF_POOL_SIZE = %d\n", PBUF_POOL_SIZE); - fprintf(stderr, "- PBUF_POOL_BUFSIZE = %d\n", PBUF_POOL_BUFSIZE); - fprintf(stderr, "- MEMP_NUM_PBUF = %d\n", MEMP_NUM_PBUF); - fprintf(stderr, "- MEMP_NUM_TCP_PCB = %d\n", MEMP_NUM_TCP_PCB); - fprintf(stderr, "- MEMP_NUM_TCP_PCB_LISTEN = %d\n", MEMP_NUM_TCP_PCB_LISTEN); - fprintf(stderr, "- MEMP_NUM_TCP_SEG = %d\n\n", MEMP_NUM_TCP_SEG); - - fprintf(stderr, "- TCP_SND_BUF = %dK\n", 
TCP_SND_BUF / 1024); - fprintf(stderr, "- TCP_SND_QUEUELEN = %d\n\n", TCP_SND_QUEUELEN); - - fprintf(stderr, "- TCP_WND = %d\n", TCP_WND); - fprintf(stderr, "- TCP_MSS = %d\n", TCP_MSS); - fprintf(stderr, "- TCP_MAXRTX = %d\n", TCP_MAXRTX); - fprintf(stderr, "- IP_REASSEMBLY = %d\n\n", IP_REASSEMBLY); - fprintf(stderr, "- ARP_TMR_INTERVAL = %d\n", ARP_TMR_INTERVAL); - fprintf(stderr, "- TCP_TMR_INTERVAL = %d\n", TCP_TMR_INTERVAL); - fprintf(stderr, "- IP_TMR_INTERVAL = %d\n", IP_TMR_INTERVAL); -*/ - // Main timer loop while (_run) { uint64_t now = OSUtils::now(); @@ -531,58 +365,39 @@ void NetconEthernetTap::threadMain() // Connection prunning if (since_status >= STATUS_TMR_INTERVAL) { - //compact_dump(); prev_status_time = now; status_remaining = STATUS_TMR_INTERVAL - since_status; - if(rpc_sockets.size() || tcp_connections.size()) { - - // dump(); - // Here we will periodically check the list of rpc_sockets for those that - // do not currently have any data connection associated with them. If they are - // unused, then we will try to read from them, if they fail, we can safely assume - // that the client has closed their end and we can close ours - for(size_t i = 0; i<tcp_connections.size(); i++) { - if(tcp_connections[i]->listening) { - char c; - if (read(_phy.getDescriptor(tcp_connections[i]->dataSock), &c, 1) < 0) { - // Still in listening state - } - else if (read(_phy.getDescriptor(tcp_connections[i]->rpcSock), &c, 1) < 0) { - // Still in listening state - } - else { - // Here we should handle the case there there is incoming data (?) - dwr(MSG_DEBUG, " tap_thread(): Listening socketpair closed. 
Removing RPC connection (%d)\n", - _phy.getDescriptor(tcp_connections[i]->dataSock)); - closeConnection(tcp_connections[i]); - } - } - } - } - for(size_t i=0, associated = 0; i<rpc_sockets.size(); i++, associated = 0) { - for(size_t j=0; j<tcp_connections.size(); j++) { - if (tcp_connections[j]->rpcSock == rpc_sockets[i]) - associated++; - } - if(!associated){ - // No TCP connections are associated, this is a candidate for removal - int fd = _phy.getDescriptor(rpc_sockets[i]); - fcntl(fd, F_SETFL, O_NONBLOCK); - unsigned char tmpbuf[BUF_SZ]; - int n; - if((n = read(fd,&tmpbuf,BUF_SZ)) < 0) { - dwr(MSG_DEBUG, " tap_thread(): closing RPC (%d)\n", _phy.getDescriptor(rpc_sockets[i])); - closeClient(rpc_sockets[i]); - } - // < 0 is failure - // 0 nothing to read, RPC still active - // > 0 RPC data read, handle it - else if (n > 0) { - // Handle RPC call, this is rare - dwr(MSG_DEBUG, " tap_thread(): RPC read during connection check (%d bytes)\n", n); - phyOnUnixData(rpc_sockets[i],_phy.getuptr(rpc_sockets[i]),&tmpbuf,BUF_SZ); + + dwr(MSG_DEBUG," tap_thread(): tcp\\jobs\\socks = {%d, %d, %d}\n", tcp_connections.size(), jobmap.size(), sockmap.size()); + for(size_t i=0; i<tcp_connections.size(); i++) { + + // No TCP connections are associated, this is a candidate for removal + if(!tcp_connections[i]->sock) + continue; // Skip, this is a pending connection + int fd = _phy.getDescriptor(tcp_connections[i]->sock); + + if(tcp_connections[i]->idx > 0){ + dwr(MSG_DEBUG, "----------------------writing from poll\n"); + lwipstack->_lock.lock(); + handle_write(tcp_connections[i]); + lwipstack->_lock.unlock(); } + + fcntl(fd, F_SETFL, O_NONBLOCK); + unsigned char tmpbuf[BUF_SZ]; + int n; + + if((n = read(fd,&tmpbuf,BUF_SZ)) < 0 && errno != EAGAIN) { + dwr(MSG_DEBUG," tap_thread(): closing sock (%x)\n", tcp_connections[i]->sock); + closeConnection(tcp_connections[i]->sock); } + // < 0 is failure + // 0 nothing to read, RPC still active + // > 0 RPC data read, handle it + else if (n > 
0) { + dwr(MSG_DEBUG," tap_thread(): data read during connection check (%d bytes)\n", n); + phyOnUnixData(tcp_connections[i]->sock,_phy.getuptr(tcp_connections[i]->sock),&tmpbuf,BUF_SZ); + } } } // Main TCP/ETHARP timer section @@ -600,7 +415,6 @@ void NetconEthernetTap::threadMain() } _phy.poll((unsigned long)std::min(tcp_remaining,etharp_remaining)); } - closeAll(); dlclose(lwipstack->_libref); } @@ -612,10 +426,37 @@ void NetconEthernetTap::phyOnTcpClose(PhySocket *sock,void **uptr) {} void NetconEthernetTap::phyOnTcpData(PhySocket *sock,void **uptr,void *data,unsigned long len) {} void NetconEthernetTap::phyOnTcpWritable(PhySocket *sock,void **uptr) {} +/* + * Closes a TcpConnection and associated LWIP PCB strcuture. + */ +void NetconEthernetTap::closeConnection(PhySocket *sock) +{ + TcpConnection *conn = getConnection(sock); + if(conn) { + if(!conn->pcb) + return; + // tell LWIP to close the associated PCB + if(conn->pcb->state != CLOSED && lwipstack->_tcp_close(conn->pcb) != ERR_OK) { + dwr(MSG_ERROR," closeConnection(): Error while calling tcp_close()\n"); + } + // remove from connection list + for(size_t i=0; i<tcp_connections.size(); i++) { + if(tcp_connections[i]->sock == sock){ + tcp_connections.erase(tcp_connections.begin() + i); + //delete conn; + break; + } + } + } + if(!sock) + return; + close(_phy.getDescriptor(sock)); // close underlying fd + _phy.close(sock, false); // close PhySocket +} + void NetconEthernetTap::phyOnUnixClose(PhySocket *sock,void **uptr) { - dwr(MSG_DEBUG, " phyOnUnixClose(sock=0x%x, uptr=0x%x): fd = %d\n", sock, uptr, _phy.getDescriptor(sock)); - TcpConnection *conn = (TcpConnection*)*uptr; - closeConnection(conn); + dwr(MSG_DEBUG,"\nphyOnUnixClose(): close connection = %x\n", sock); + closeConnection(sock); } /* @@ -623,129 +464,224 @@ void NetconEthernetTap::phyOnUnixClose(PhySocket *sock,void **uptr) { */ void NetconEthernetTap::phyOnFileDescriptorActivity(PhySocket *sock,void **uptr,bool readable,bool writable) { - 
if(readable) { - TcpConnection *conn = (TcpConnection*)*uptr; - if(conn->dataSock) { // Sometimes a connection may be closed via nc_recved, check first - lwipstack->_lock.lock(); - handle_write(conn); - lwipstack->_lock.unlock(); - } - } - else { - dwr(MSG_ERROR, "phyOnFileDescriptorActivity(): PhySocket not readable\n"); - } + dwr(MSG_DEBUG,"\nphyOnFileDescriptorActivity(): new connection = %x\n", sock); } /* * Add a new PhySocket for the client connections */ void NetconEthernetTap::phyOnUnixAccept(PhySocket *sockL,PhySocket *sockN,void **uptrL,void **uptrN) { - dwr(MSG_DEBUG, " phyOnUnixAccept(): accepting new connection\n"); - if(find(rpc_sockets.begin(), rpc_sockets.end(), sockN) != rpc_sockets.end()){ - dwr(MSG_ERROR, " phyOnUnixAccept(): SockN (0x%x) already exists!\n", sockN); - return; - } - rpc_sockets.push_back(sockN); + dwr(MSG_DEBUG,"\nphyOnUnixAccept(): new connection = %x\n", sockN); +} + +/* Unpacks the buffer from an RPC command */ +void NetconEthernetTap::unload_rpc(void *data, pid_t &pid, pid_t &tid, + int &rpc_count, char (timestamp[20]), char (magic[sizeof(uint64_t)]), char &cmd, void* &payload) +{ + unsigned char *buf = (unsigned char*)data; + memcpy(&pid, &buf[IDX_PID], sizeof(pid_t)); + memcpy(&tid, &buf[IDX_TID], sizeof(pid_t)); + memcpy(&rpc_count, &buf[IDX_COUNT], sizeof(int)); + memcpy(timestamp, &buf[IDX_TIME], 20); + memcpy(&cmd, &buf[IDX_PAYLOAD], sizeof(char)); + memcpy(magic, &buf[IDX_PAYLOAD+1], MAGIC_SIZE); } /* * Processes incoming data on a client-specific RPC connection */ void NetconEthernetTap::phyOnUnixData(PhySocket *sock,void **uptr,void *data,unsigned long len) -{ +{ + //usleep(5000); + //dwr(MSG_DEBUG,"\n\n\n<%x> phyOnUnixData(): len = %d\n", sock, len); + uint64_t magic_num; pid_t pid, tid; int rpc_count; - char cmd, timestamp[20]; + char cmd, timestamp[20], magic[MAGIC_SIZE]; void *payload; - unload_rpc(data, pid, tid, rpc_count, timestamp, cmd, payload); - dwr(MSG_DEBUG, "\n\nRPC: (pid=%d, tid=%d, rpc_count=%d, 
timestamp=%s, cmd=%d\n", pid, tid, rpc_count, timestamp, cmd); unsigned char *buf = (unsigned char*)data; + std::pair<PhySocket*, void*> sockdata; + PhySocket *streamsock, *rpcsock; + bool found_sock = false, found_job = false; + + + TcpConnection *conn; + int max_sndbuf = (float)TCP_SND_BUF; + int wlen = len; + + // RPC + if(buf[IDX_SIGNAL_BYTE] == 'R') { + unload_rpc(data, pid, tid, rpc_count, timestamp, magic, cmd, payload); + memcpy(&magic_num, magic, MAGIC_SIZE); + dwr(MSG_DEBUG," <%x> RPC: (pid=%d, tid=%d, rpc_count=%d, timestamp=%s, cmd=%d)\n", sock, pid, tid, rpc_count, timestamp, cmd); + if(cmd == RPC_SOCKET) { + dwr(MSG_DEBUG," <%x> RPC_SOCKET\n", sock); + // Create new lwip socket and associate it with this sock + struct socket_st socket_rpc; + memcpy(&socket_rpc, &buf[IDX_PAYLOAD+STRUCT_IDX], sizeof(struct socket_st)); + TcpConnection * new_conn; + if((new_conn = handle_socket(sock, uptr, &socket_rpc))) { + pidmap[sock] = pid; + new_conn->pid = pid; + } + return; // Don't close the socket, we'll use this later for data + } + else { // All RPCs other than RPC_SOCKET + streamsock = sockmap[magic_num]; + if(streamsock){ // We found a pre-existing stream socket for this RPC + sockmap[magic_num] = NULL; + found_sock = true; + dwr(MSG_DEBUG," <%x> found_sock\n", sock); + } + else { + // No matching stream has been encountered, create jobmap entry + dwr(MSG_DEBUG," <%x> creating jobmap (cmd=%d) entry for %llu\n", sock, cmd, magic_num); + jobmap[magic_num] = std::make_pair<PhySocket*, void*>(sock, data); + } + } + } + + // STREAM + else { + int data_start = -1, data_end = -1, token_pos = -1, padding_pos = -1; + char padding[] = {0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89}; + dwr(MSG_DEBUG," <%x> stream data, len = %d\n", sock, len); + // Look for padding + std::string padding_pattern(padding, padding+MAGIC_PADDING_SIZE); + std::string buffer(buf, buf + len); + padding_pos = buffer.find(padding_pattern); + token_pos = padding_pos-MAGIC_SIZE; + dwr(MSG_DEBUG, " <%x> 
padding_pos = %d\n", sock, padding_pos); + // Grab token, next we'll use it to look up an RPC job + if(token_pos > -1) { + dwr(MSG_DEBUG, " <%x> token_pos = %d, GRABBING TOKEN\n", sock, token_pos); + memcpy(&magic_num, buf+token_pos, MAGIC_SIZE); + if(magic_num != 0) { // TODO: Added to address magic_num==0 bug, last seeen 20160108 + // Find job + sockdata = jobmap[magic_num]; + if(!sockdata.first) { // Stream before RPC + dwr(MSG_DEBUG," <%x> creating sockmap entry for %llu\n", sock, magic_num); + sockmap[magic_num] = sock; + } + else { + dwr(MSG_DEBUG," <%x> found_job\n", sock); + found_job = true; + } + } + } - switch(cmd) - { - case RPC_SOCKET: - dwr(MSG_DEBUG, "RPC_SOCKET\n"); - struct socket_st socket_rpc; - memcpy(&socket_rpc, &buf[IDX_PAYLOAD+1], sizeof(struct socket_st)); - - if(rpc_count==rpc_counter) { - dwr(MSG_ERROR, "Detected repeat RPC.\n"); - //return; - } - else { - rpc_counter = rpc_count; - } + conn = getConnection(sock); + if(!conn) + return; - TcpConnection * new_conn; - if((new_conn = handle_socket(sock, uptr, &socket_rpc))) { - pidmap[sock] = pid; - new_conn->pid = pid; + if(padding_pos == -1) // [DATA] + { + dwr(MSG_DEBUG, "copy everything... wlen = %d, conn = %x, conn->buf = %x, buf = %x\n", wlen, conn, conn->buf, buf); + dwr(MSG_DEBUG, " copy everything... 
conn->idx = %d, sizeof(conn->buf) = %d\n", conn->idx, sizeof(conn->buf)); + memcpy(&conn->buf[conn->idx], buf, wlen); + dwr(MSG_DEBUG, "finished\n"); } - break; - case RPC_LISTEN: - dwr(MSG_DEBUG, "RPC_LISTEN\n"); - struct listen_st listen_rpc; - memcpy(&listen_rpc, &buf[IDX_PAYLOAD+1], sizeof(struct listen_st)); - handle_listen(sock, uptr, &listen_rpc); - break; - case RPC_BIND: - dwr(MSG_DEBUG, "RPC_BIND\n"); - struct bind_st bind_rpc; - memcpy(&bind_rpc, &buf[IDX_PAYLOAD+1], sizeof(struct bind_st)); - handle_bind(sock, uptr, &bind_rpc); - break; - case RPC_CONNECT: - dwr(MSG_DEBUG, "RPC_CONNECT\n"); - struct connect_st connect_rpc; - memcpy(&connect_rpc, &buf[IDX_PAYLOAD+1], sizeof(struct connect_st)); - handle_connect(sock, uptr, &connect_rpc); - break; - case RPC_MAP: - dwr(MSG_DEBUG, "RPC_MAP (len = %d)\n", len); - int newfd; - memcpy(&newfd, &buf[IDX_PAYLOAD+1], sizeof(int)); - handle_retval(sock, uptr, rpc_count, newfd); - break; - case RPC_MAP_REQ: - dwr(MSG_DEBUG, "RPC_MAP_REQ\n"); - handle_map_request(sock, uptr, buf); - break; - case RPC_GETSOCKNAME: - dwr(MSG_DEBUG, "RPC_GETSOCKNAME\n"); - struct getsockname_st getsockname_rpc; - memcpy(&getsockname_rpc, &buf[IDX_PAYLOAD+1], sizeof(struct getsockname_st)); - handle_getsockname(sock, uptr, &getsockname_rpc); - break; - default: - dwr(MSG_ERROR, "POSSIBLE RPC CORRUPTION. 
TRY AGAIN!\n"); - break; + else { // Padding found, implies a token is present + + dwr(MSG_DEBUG, " <%x> token_pos = %d, GRABBING DATA\n", sock, token_pos); + + // [TOKEN] + if(len == TOKEN_SIZE && token_pos == 0) { + wlen = 0; // Nothing to write + } + else { + // [TOKEN] + [DATA] + if(len > TOKEN_SIZE && token_pos == 0) { + wlen = len - TOKEN_SIZE; + data_start = padding_pos+MAGIC_PADDING_SIZE; + memcpy((&conn->buf)+conn->idx, buf+data_start, wlen); + dwr(MSG_DEBUG," wlen = %d, data_start = %d\n", wlen, data_start); + } + // [DATA] + [TOKEN] + if(len > TOKEN_SIZE && token_pos > 0 && token_pos == len - TOKEN_SIZE) { + wlen = len - TOKEN_SIZE; + data_start = 0; + memcpy((&conn->buf)+conn->idx, buf+data_start, wlen); + dwr(MSG_DEBUG," wlen = %d, data_start = %d\n", wlen, data_start); + } + // [DATA] + [TOKEN] + [DATA] + if(len > TOKEN_SIZE && token_pos > 0 && len > (token_pos + TOKEN_SIZE)) { + wlen = len - TOKEN_SIZE; + data_start = 0; + data_end = padding_pos-MAGIC_SIZE; + memcpy((&conn->buf)+conn->idx, buf+data_start, (data_end-data_start)+1); + memcpy((&conn->buf)+conn->idx, buf+(padding_pos+MAGIC_PADDING_SIZE), len-(token_pos+TOKEN_SIZE)); + dwr(MSG_DEBUG," wlen = %d, data_start = %d, data_end = %d\n", wlen, data_start, data_end); + } + } + } + lwipstack->_lock.lock(); + conn->idx += wlen; + handle_write(conn); + lwipstack->_lock.unlock(); } -} -/* - * Send a 'retval' and 'errno' to the client for an RPC over connection->rpcSock - */ -int NetconEthernetTap::send_return_value(TcpConnection *conn, int retval, int _errno = 0) -{ - if(conn) { - int n = send_return_value(_phy.getDescriptor(conn->rpcSock), retval, _errno); - if(n > 0) - conn->pending = false; - else { - dwr(MSG_ERROR, " Unable to send return value to the intercept. 
Closing connection\n"); - closeConnection(conn); + if(found_job) { + rpcsock = sockdata.first; + buf = (unsigned char*)sockdata.second; + } + else if(found_sock) { + rpcsock = sock; + sock = streamsock; + } + + // Process RPC if we have a corresponding jobmap/sockmap entry + if(found_job || found_sock) + { + conn = getConnection(sock); + unload_rpc(buf, pid, tid, rpc_count, timestamp, magic, cmd, payload); + dwr(MSG_DEBUG," <%x> RPC: (pid=, tid=, rpc_count=, timestamp=, cmd=%d)\n", sock, /*pid, tid, rpc_count, timestamp, */cmd); + + switch(cmd) { + case RPC_BIND: + dwr(MSG_DEBUG," <%x> RPC_BIND\n", sock); + struct bind_st bind_rpc; + memcpy(&bind_rpc, &buf[IDX_PAYLOAD+STRUCT_IDX], sizeof(struct bind_st)); + handle_bind(sock, rpcsock, uptr, &bind_rpc); + break; + case RPC_LISTEN: + dwr(MSG_DEBUG," <%x> RPC_LISTEN\n", sock); + struct listen_st listen_rpc; + memcpy(&listen_rpc, &buf[IDX_PAYLOAD+STRUCT_IDX], sizeof(struct listen_st)); + handle_listen(sock, rpcsock, uptr, &listen_rpc); + break; + case RPC_GETSOCKNAME: + dwr(MSG_DEBUG," <%x> RPC_GETSOCKNAME\n", sock); + struct getsockname_st getsockname_rpc; + memcpy(&getsockname_rpc, &buf[IDX_PAYLOAD+STRUCT_IDX], sizeof(struct getsockname_st)); + handle_getsockname(sock, rpcsock, uptr, &getsockname_rpc); + break; + case RPC_CONNECT: + dwr(MSG_DEBUG," <%x> RPC_CONNECT\n", sock); + struct connect_st connect_rpc; + memcpy(&connect_rpc, &buf[IDX_PAYLOAD+STRUCT_IDX], sizeof(struct connect_st)); + handle_connect(sock, rpcsock, conn, &connect_rpc); + return; // Keep open RPC, we'll use it once in nc_connected to send retval + default: + break; } - return n; + closeConnection(sockdata.first); // close RPC after sending retval, no longer needed + jobmap.erase(magic_num); + sockmap.erase(magic_num); + return; } - return -1; +} + +int NetconEthernetTap::send_return_value(PhySocket *sock, int retval, int _errno = 0){ + return send_return_value(_phy.getDescriptor(sock), retval, _errno); } int NetconEthernetTap::send_return_value(int 
fd, int retval, int _errno = 0) { - dwr(MSG_DEBUG, " send_return_value(): fd = %d, retval = %d, errno = %d\n", fd, retval, _errno); + dwr(MSG_DEBUG," send_return_value(): fd = %d, retval = %d, errno = %d\n", fd, retval, _errno); int sz = sizeof(char) + sizeof(retval) + sizeof(errno); char retmsg[sz]; - memset(&retmsg, '\0', sizeof(retmsg)); + memset(&retmsg, 0, sizeof(retmsg)); retmsg[0]=RPC_RETVAL; memcpy(&retmsg[1], &retval, sizeof(retval)); memcpy(&retmsg[1]+sizeof(retval), &_errno, sizeof(_errno)); @@ -802,52 +738,52 @@ int NetconEthernetTap::send_return_value(int fd, int retval, int _errno = 0) */ err_t NetconEthernetTap::nc_accept(void *arg, struct tcp_pcb *newpcb, err_t err) { - dwr(MSG_DEBUG, " nc_accept()\n"); + dwr(MSG_DEBUG," nc_accept()\n"); Larg *l = (Larg*)arg; TcpConnection *conn = l->conn; NetconEthernetTap *tap = l->tap; - int listening_fd = tap->_phy.getDescriptor(conn->dataSock); - - if(conn) { - ZT_PHY_SOCKFD_TYPE fds[2]; - if(socketpair(PF_LOCAL, SOCK_STREAM, 0, fds) < 0) { - if(errno < 0) { - l->tap->send_return_value(conn, -1, errno); - dwr(MSG_ERROR, " nc_accept(): unable to create socketpair\n"); - return ERR_MEM; + + if(!conn->sock) + return -1; + int listening_fd = tap->_phy.getDescriptor(conn->sock); + + if(conn) { + // create new socketpair + ZT_PHY_SOCKFD_TYPE fds[2]; + if(socketpair(PF_LOCAL, SOCK_STREAM, 0, fds) < 0) { + if(errno < 0) { + l->tap->send_return_value(conn, -1, errno); + dwr(MSG_ERROR," nc_accept(): unable to create socketpair\n"); + return ERR_MEM; + } } - } - TcpConnection *new_tcp_conn = new TcpConnection(); - new_tcp_conn->dataSock = tap->_phy.wrapSocket(fds[0], new_tcp_conn); - new_tcp_conn->rpcSock = conn->rpcSock; - new_tcp_conn->pcb = newpcb; - new_tcp_conn->their_fd = fds[1]; - tap->tcp_connections.push_back(new_tcp_conn); - dwr(MSG_DEBUG, " nc_accept(): socketpair = {%d, %d}\n", fds[0], fds[1]); - int send_fd = tap->_phy.getDescriptor(conn->rpcSock); - -dwr(MSG_DEBUG, " nc_accept(): sending %d via %d\n", 
fds[1], listening_fd); - - if(sock_fd_write(listening_fd, fds[1]) < 0){ - dwr(MSG_ERROR, " nc_accept(%d): error writing signal byte (listen_fd = %d, perceived_fd = %d)\n", listening_fd, send_fd, fds[1]); - return -1; - } - else { - close(fds[1]); // close other end of socketpair - new_tcp_conn->pending = true; - } - tap->lwipstack->_tcp_arg(newpcb, new Larg(tap, new_tcp_conn)); - tap->lwipstack->_tcp_recv(newpcb, nc_recved); - tap->lwipstack->_tcp_err(newpcb, nc_err); - tap->lwipstack->_tcp_sent(newpcb, nc_sent); - tap->lwipstack->_tcp_poll(newpcb, nc_poll, 1); - tcp_accepted(conn->pcb); // Let lwIP know that it can queue additional incoming connections - return ERR_OK; - } - else { - dwr(MSG_ERROR, " nc_accept(%d): can't locate Connection object for PCB.\n", listening_fd); - } - return -1; + // create and populate new TcpConnection + TcpConnection *new_tcp_conn = new TcpConnection(); + tap->tcp_connections.push_back(new_tcp_conn); + new_tcp_conn->pcb = newpcb; + new_tcp_conn->sock = tap->_phy.wrapSocket(fds[0], new_tcp_conn); + + if(sock_fd_write(listening_fd, fds[1]) < 0) + return -1; + else { + //close(fds[1]); // close other end of socketpair + new_tcp_conn->pending = true; + } + tap->lwipstack->_tcp_arg(newpcb, new Larg(tap, new_tcp_conn)); + tap->lwipstack->_tcp_recv(newpcb, nc_recved); + tap->lwipstack->_tcp_err(newpcb, nc_err); + tap->lwipstack->_tcp_sent(newpcb, nc_sent); + tap->lwipstack->_tcp_poll(newpcb, nc_poll, 1); + if(conn->pcb->state == LISTEN) { + dwr(MSG_DEBUG," nc_accept(): Can't call tcp_accept() on LISTEN socket (pcb = %x)\n", conn->pcb); + return ERR_OK; // TODO: Verify this is correct + } + tcp_accepted(conn->pcb); // Let lwIP know that it can queue additional incoming connections + return ERR_OK; + } + else + dwr(MSG_ERROR," nc_accept(%d): can't locate Connection object for PCB.\n", listening_fd); + return -1; } /* @@ -865,44 +801,46 @@ dwr(MSG_DEBUG, " nc_accept(): sending %d via %d\n", fds[1], listening_fd); */ err_t 
NetconEthernetTap::nc_recved(void *arg, struct tcp_pcb *tpcb, struct pbuf *p, err_t err) { - dwr(MSG_DEBUG, " nc_recved()\n"); + dwr(MSG_DEBUG," nc_recved()\n"); Larg *l = (Larg*)arg; int n; - struct pbuf* q = p; + struct pbuf* q = p; - if(!l->conn) { - dwr(MSG_ERROR, " nc_recved(): no connection object\n"); - return ERR_OK; - } - if(p == NULL) { - if(l->conn && !l->conn->listening) { - dwr(MSG_INFO, " nc_recved(): closing connection\n"); - l->tap->closeConnection(l->conn); - return ERR_ABRT; - } - else { - dwr(MSG_ERROR, " nc_recved(): can't locate connection via (arg)\n"); - } - return err; - } - q = p; - while(p != NULL) { // Cycle through pbufs and write them to the socket - if(p->len <= 0) - break; - if((n = l->tap->_phy.streamSend(l->conn->dataSock,p->payload, p->len)) > 0) { - if(n < p->len) { - dwr(MSG_INFO, " nc_recved(): unable to write entire pbuf to buffer\n"); - } - l->tap->lwipstack->_tcp_recved(tpcb, n); // TODO: would it be more efficient to call this once at the end? - dwr(MSG_DEBUG, " nc_recved(): wrote %d bytes to (%d)\n", n, l->tap->_phy.getDescriptor(l->conn->dataSock)); - } - else { - dwr(MSG_INFO, " nc_recved(): No data written to intercept buffer (%d)\n", l->tap->_phy.getDescriptor(l->conn->dataSock)); - } - p = p->next; - } - l->tap->lwipstack->_pbuf_free(q); // free pbufs - return ERR_OK; + if(!l->conn) { + dwr(MSG_ERROR," nc_recved(): no connection\n"); + return ERR_OK; + } + if(p == NULL) { + if(l->conn && !l->conn->listening) { + dwr(MSG_INFO," nc_recved(): closing connection\n"); + //if(l->tap->lwipstack->_tcp_close(l->conn->pcb) != ERR_OK) { + // dwr(MSG_ERROR," closeConnection(): Error while calling tcp_close()\n"); + //} + l->tap->closeConnection(l->conn->sock); + return ERR_ABRT; + } + else { + dwr(MSG_ERROR," nc_recved(): invalid connection/state\n"); + } + return err; + } + q = p; + while(p != NULL) { // Cycle through pbufs and write them to the socket + if(p->len <= 0) + break; + if((n = 
l->tap->_phy.streamSend(l->conn->sock,p->payload, p->len)) > 0) { + if(n < p->len) { + dwr(MSG_INFO," nc_recved(): unable to write entire pbuf to stream\n"); + } + l->tap->lwipstack->_tcp_recved(tpcb, n); + dwr(MSG_DEBUG," nc_recved(): wrote %d bytes to <%x>\n", n, l->conn->sock); + } + else + dwr(MSG_INFO," nc_recved(): No data written to stream <%d>\n", l->conn->sock); + p = p->next; + } + l->tap->lwipstack->_pbuf_free(q); // free pbufs + return ERR_OK; } /* @@ -917,79 +855,76 @@ err_t NetconEthernetTap::nc_recved(void *arg, struct tcp_pcb *tpcb, struct pbuf */ void NetconEthernetTap::nc_err(void *arg, err_t err) { - dwr(MSG_DEBUG, "nc_err()\n"); + dwr(MSG_DEBUG,"nc_err() = %d\n", err); Larg *l = (Larg*)arg; if(!l->conn) - dwr(MSG_ERROR, "nc_err(): Connection is NULL!\n"); - - if(l->conn->listening) - return; + dwr(MSG_ERROR,"nc_err(): Connection is NULL!\n"); switch(err) { case ERR_MEM: - dwr(MSG_ERROR, "nc_err(): ERR_MEM->ENOMEM\n"); - l->tap->send_return_value(l->conn, -1, ENOMEM); + dwr(MSG_ERROR,"nc_err(): ERR_MEM->ENOMEM\n"); + l->tap->send_return_value(l->tap->_phy.getDescriptor(l->conn->sock), -1, ENOMEM); break; case ERR_BUF: - dwr(MSG_ERROR, "nc_err(): ERR_BUF->ENOBUFS\n"); - l->tap->send_return_value(l->conn, -1, ENOBUFS); + dwr(MSG_ERROR,"nc_err(): ERR_BUF->ENOBUFS\n"); + l->tap->send_return_value(l->tap->_phy.getDescriptor(l->conn->sock), -1, ENOBUFS); break; case ERR_TIMEOUT: - dwr(MSG_ERROR, "nc_err(): ERR_TIMEOUT->ETIMEDOUT\n"); - l->tap->send_return_value(l->conn, -1, ETIMEDOUT); + dwr(MSG_ERROR,"nc_err(): ERR_TIMEOUT->ETIMEDOUT\n"); + l->tap->send_return_value(l->tap->_phy.getDescriptor(l->conn->sock), -1, ETIMEDOUT); break; case ERR_RTE: - dwr(MSG_ERROR, "nc_err(): ERR_RTE->ENETUNREACH\n"); - l->tap->send_return_value(l->conn, -1, ENETUNREACH); + dwr(MSG_ERROR,"nc_err(): ERR_RTE->ENETUNREACH\n"); + l->tap->send_return_value(l->tap->_phy.getDescriptor(l->conn->sock), -1, ENETUNREACH); break; case ERR_INPROGRESS: - dwr(MSG_ERROR, "nc_err(): 
ERR_INPROGRESS->EINPROGRESS\n"); - l->tap->send_return_value(l->conn, -1, EINPROGRESS); + dwr(MSG_ERROR,"nc_err(): ERR_INPROGRESS->EINPROGRESS\n"); + l->tap->send_return_value(l->tap->_phy.getDescriptor(l->conn->sock), -1, EINPROGRESS); break; case ERR_VAL: - dwr(MSG_ERROR, "nc_err(): ERR_VAL->EINVAL\n"); - l->tap->send_return_value(l->conn, -1, EINVAL); + dwr(MSG_ERROR,"nc_err(): ERR_VAL->EINVAL\n"); + l->tap->send_return_value(l->tap->_phy.getDescriptor(l->conn->sock), -1, EINVAL); break; case ERR_WOULDBLOCK: - dwr(MSG_ERROR, "nc_err(): ERR_WOULDBLOCK->EWOULDBLOCK\n"); - l->tap->send_return_value(l->conn, -1, EWOULDBLOCK); + dwr(MSG_ERROR,"nc_err(): ERR_WOULDBLOCK->EWOULDBLOCK\n"); + l->tap->send_return_value(l->tap->_phy.getDescriptor(l->conn->sock), -1, EWOULDBLOCK); break; case ERR_USE: - dwr(MSG_ERROR, "nc_err(): ERR_USE->EADDRINUSE\n"); - l->tap->send_return_value(l->conn, -1, EADDRINUSE); + dwr(MSG_ERROR,"nc_err(): ERR_USE->EADDRINUSE\n"); + l->tap->send_return_value(l->tap->_phy.getDescriptor(l->conn->sock), -1, EADDRINUSE); break; case ERR_ISCONN: - dwr(MSG_ERROR, "nc_err(): ERR_ISCONN->EISCONN\n"); - l->tap->send_return_value(l->conn, -1, EISCONN); + dwr(MSG_ERROR,"nc_err(): ERR_ISCONN->EISCONN\n"); + l->tap->send_return_value(l->tap->_phy.getDescriptor(l->conn->sock), -1, EISCONN); break; case ERR_ABRT: - dwr(MSG_ERROR, "nc_err(): ERR_ABRT->ECONNREFUSED\n"); - l->tap->send_return_value(l->conn, -1, ECONNREFUSED); + dwr(MSG_ERROR,"nc_err(): ERR_ABRT->ECONNREFUSED\n"); + l->tap->send_return_value(l->tap->_phy.getDescriptor(l->conn->sock), -1, ECONNREFUSED); break; // FIXME: Below are errors which don't have a standard errno correlate case ERR_RST: - l->tap->send_return_value(l->conn, -1, -1); + l->tap->send_return_value(l->tap->_phy.getDescriptor(l->conn->sock), -1, -1); break; case ERR_CLSD: - l->tap->send_return_value(l->conn, -1, -1); + l->tap->send_return_value(l->tap->_phy.getDescriptor(l->conn->sock), -1, -1); break; case ERR_CONN: - 
l->tap->send_return_value(l->conn, -1, -1); + l->tap->send_return_value(l->tap->_phy.getDescriptor(l->conn->sock), -1, -1); break; case ERR_ARG: - l->tap->send_return_value(l->conn, -1, -1); + l->tap->send_return_value(l->tap->_phy.getDescriptor(l->conn->sock), -1, -1); break; case ERR_IF: - l->tap->send_return_value(l->conn, -1, -1); + l->tap->send_return_value(l->tap->_phy.getDescriptor(l->conn->sock), -1, -1); break; default: break; } - dwr(MSG_ERROR, "nc_err(): closing connection\n"); - l->tap->closeConnection(l->conn); + //dwr(MSG_ERROR,"nc_err(): closing connection\n"); + //l->tap->closeConnection(l->conn); } /* @@ -1022,8 +957,9 @@ err_t NetconEthernetTap::nc_sent(void* arg, struct tcp_pcb *tpcb, u16_t len) { Larg *l = (Larg*)arg; if(len) { + dwr(MSG_DEBUG,"nc_sent(ACKED): len = %d\n",len); l->conn->acked+=len; - l->tap->_phy.setNotifyReadable(l->conn->dataSock, true); + l->tap->_phy.setNotifyReadable(l->conn->sock, true); l->tap->_phy.whack(); } return ERR_OK; @@ -1041,9 +977,9 @@ err_t NetconEthernetTap::nc_sent(void* arg, struct tcp_pcb *tpcb, u16_t len) */ err_t NetconEthernetTap::nc_connected(void *arg, struct tcp_pcb *tpcb, err_t err) { - dwr(MSG_DEBUG, " nc_connected()\n"); + dwr(MSG_DEBUG," nc_connected()\n"); Larg *l = (Larg*)arg; - l->tap->send_return_value(l->conn, ERR_OK); + l->tap->send_return_value(l->tap->_phy.getDescriptor(l->conn->rpcsock), ERR_OK); return ERR_OK; } @@ -1051,126 +987,16 @@ err_t NetconEthernetTap::nc_connected(void *arg, struct tcp_pcb *tpcb, err_t err ----------------------------- RPC Handler functions ---------------------------- ------------------------------------------------------------------------------*/ -/* Unpacks the buffer from an RPC command */ -void NetconEthernetTap::unload_rpc(void *data, pid_t &pid, pid_t &tid, int &rpc_count, char (timestamp[20]), char &cmd, void* &payload) -{ - unsigned char *buf = (unsigned char*)data; - memcpy(&pid, &buf[IDX_PID], sizeof(pid_t)); - memcpy(&tid, &buf[IDX_TID], 
sizeof(pid_t)); - memcpy(&rpc_count, &buf[IDX_COUNT], sizeof(int)); - memcpy(timestamp, &buf[IDX_TIME], 20); - memcpy(&cmd, &buf[IDX_PAYLOAD], sizeof(char)); -} - -/* - Responds to a request from the [intercept] to determine whether a local socket is - mapped to this service. In other words, how do the intercept's overridden calls - tell the difference between regular AF_LOCAL sockets and one of our socketpairs - that is used to communicate over the network? -*/ -void NetconEthernetTap::handle_map_request(PhySocket *sock, void **uptr, unsigned char* buf) -{ - dwr(4, " handle_map_request()\n"); - TcpConnection *conn = (TcpConnection*)*uptr; - int req_fd; - memcpy(&req_fd, &buf[IDX_PAYLOAD+1], sizeof(req_fd)); - for(size_t i=0; i<tcp_connections.size(); i++) { - if(tcp_connections[i]->rpcSock == conn->rpcSock && tcp_connections[i]->perceived_fd == req_fd){ - send_return_value(conn, 1, ERR_OK); // True - dwr(MSG_DEBUG, " handle_map_request(their=%d): MAPPED (to %d)\n", req_fd, - _phy.getDescriptor(tcp_connections[i]->dataSock)); - return; - } - } - send_return_value(conn, 0, ERR_OK); // False - dwr(MSG_DEBUG, " handle_map_request(their=%d): NOT MAPPED\n", req_fd); -} - -/** - * Handles a return value (client's perceived fd) and completes a mapping - * so that we know what connection an RPC call should be associated with. 
- * - * @param PhySocket associated with this RPC connection - * @param structure containing the data and parameters for this client's RPC - * - */ -void NetconEthernetTap::handle_retval(PhySocket *sock, void **uptr, int rpc_count, int newfd) -{ - dwr(MSG_DEBUG, " handle_retval()\n"); - TcpConnection *conn = (TcpConnection*)*uptr; - if(!conn->pending){ - send_return_value(conn, -1, -1); - return; - } - conn->pending = false; - conn->perceived_fd = newfd; - if(rpc_count==rpc_counter) { - dwr(MSG_ERROR, " handle_retval(): Detected repeat RPC.\n"); - send_return_value(conn, -1, -1); - //return; - } - else - rpc_counter = rpc_count; - - dwr(MSG_DEBUG, " handle_retval(): CONN:%x - Mapping [our=%d -> their=%d]\n",conn, - _phy.getDescriptor(conn->dataSock), conn->perceived_fd); - - /* Check for pre-existing connection for this socket --- - This block is in response to interesting behaviour from redis-server. A - socket is created, setsockopt is called and the socket is set to IPV6 but fails (for now), - then it is closed and re-opened and consequently remapped. With two pipes mapped - to the same socket, makes it possible that we write to the wrong pipe and fail. So - this block merely searches for a possible duplicate mapping and erases it - */ - for(size_t i=0; i<tcp_connections.size(); i++) { - if(tcp_connections[i] == conn) - continue; - if(tcp_connections[i]->rpcSock == conn->rpcSock) { - if(tcp_connections[i]->perceived_fd == conn->perceived_fd) { - int n; - if((n = send(_phy.getDescriptor(tcp_connections[i]->dataSock), "z", 1, MSG_NOSIGNAL)) < 0) { - dwr(MSG_DEBUG, " handle_retval(): CONN:%x - Socket (%d) already mapped (originally CONN:%x)\n", conn, tcp_connections[i]->perceived_fd, tcp_connections[i]); - closeConnection(tcp_connections[i]); - } - else { - dwr(MSG_ERROR, " handle_retval(): CONN:%x - This socket is mapped to two different pipes (?). 
Exiting.\n", conn); - //die(0); // FIXME: Print service mapping state and exit - } - } - } - } - send_return_value(conn, ERR_OK, ERR_OK); // Success -} - - /* Return the address that the socket is bound to */ -void NetconEthernetTap::handle_getsockname(PhySocket *sock, void **uptr, struct getsockname_st *getsockname_rpc) +void NetconEthernetTap::handle_getsockname(PhySocket *sock, PhySocket *rpcsock, void **uptr, struct getsockname_st *getsockname_rpc) { - TcpConnection *conn = getConnectionByTheirFD(sock, getsockname_rpc->sockfd); - dwr(MSG_DEBUG, " handle_getsockname(): sockfd = %d\n", getsockname_rpc->sockfd); - dwr(MSG_DEBUG, " handle_getsockname(): conn = 0x%x\n", conn); - - /* - if(!conn){ - return; - } - struct sockaddr_in * myaddr = (struct sockaddr_in*)conn->addr; - int port = myaddr->sin_port; - int ip = myaddr->sin_addr.s_addr; - unsigned char d[4]; - d[0] = ip & 0xFF; - d[1] = (ip >> 8) & 0xFF; - d[2] = (ip >> 16) & 0xFF; - d[3] = (ip >> 24) & 0xFF; - dwr(MSG_ERROR, " handle_getsockname(): addr = %d.%d.%d.%d: %d\n", d[0],d[1],d[2],d[3], lwipstack->ntohs(port)); - */ + TcpConnection *conn = getConnection(sock); // Assemble address "command" to send to intercept char retmsg[sizeof(struct sockaddr_storage)]; memset(&retmsg, 0, sizeof(retmsg)); if ((conn)&&(conn->addr)) memcpy(&retmsg, conn->addr, sizeof(struct sockaddr_storage)); - int n = write(_phy.getDescriptor(conn->rpcSock), &retmsg, sizeof(struct sockaddr_storage)); - dwr(MSG_DEBUG, " handle_getsockname(): wrote %d bytes\n", n); + int n = write(_phy.getDescriptor(rpcsock), &retmsg, sizeof(struct sockaddr_storage)); } /* @@ -1207,52 +1033,49 @@ void NetconEthernetTap::handle_getsockname(PhySocket *sock, void **uptr, struct [-] EROFS - The socket inode would reside on a read-only file system. 
*/ -void NetconEthernetTap::handle_bind(PhySocket *sock, void **uptr, struct bind_st *bind_rpc) +void NetconEthernetTap::handle_bind(PhySocket *sock, PhySocket *rpcsock, void **uptr, struct bind_st *bind_rpc) { + struct sockaddr_in *connaddr; + connaddr = (struct sockaddr_in *) &bind_rpc->addr; + int conn_port = lwipstack->ntohs(connaddr->sin_port); + ip_addr_t conn_addr; + conn_addr.addr = *((u32_t *)_ips[0].rawIpData()); + TcpConnection *conn = getConnection(sock); + dwr(MSG_DEBUG," handle_bind(%d)\n", bind_rpc->sockfd); - struct sockaddr_in *connaddr; - connaddr = (struct sockaddr_in *) &bind_rpc->addr; - int conn_port = lwipstack->ntohs(connaddr->sin_port); - ip_addr_t conn_addr; - conn_addr.addr = *((u32_t *)_ips[0].rawIpData()); - TcpConnection *conn = getConnectionByTheirFD(sock, bind_rpc->sockfd); - - dwr(MSG_DEBUG, " handle_bind(%d)\n", bind_rpc->sockfd); - - if(conn) { - if(conn->pcb->state == CLOSED){ - int err = lwipstack->tcp_bind(conn->pcb, &conn_addr, conn_port); - - int ip = connaddr->sin_addr.s_addr; + if(conn) { + if(conn->pcb->state == CLOSED){ + int err = lwipstack->tcp_bind(conn->pcb, &conn_addr, conn_port); + int ip = connaddr->sin_addr.s_addr; unsigned char d[4]; d[0] = ip & 0xFF; d[1] = (ip >> 8) & 0xFF; d[2] = (ip >> 16) & 0xFF; d[3] = (ip >> 24) & 0xFF; - dwr(MSG_DEBUG, " handle_bind(): %d.%d.%d.%d : %d\n", d[0],d[1],d[2],d[3], conn_port); + dwr(MSG_DEBUG," handle_bind(): %d.%d.%d.%d : %d\n", d[0],d[1],d[2],d[3], conn_port); if(err != ERR_OK) { - dwr(MSG_ERROR, " handle_bind(): err = %d\n", err); + dwr(MSG_ERROR," handle_bind(): err = %d\n", err); if(err == ERR_USE) - send_return_value(conn, -1, EADDRINUSE); + send_return_value(rpcsock, -1, EADDRINUSE); if(err == ERR_MEM) - send_return_value(conn, -1, ENOMEM); + send_return_value(rpcsock, -1, ENOMEM); if(err == ERR_BUF) - send_return_value(conn, -1, ENOMEM); // FIXME: Closest match + send_return_value(rpcsock, -1, ENOMEM); } else { conn->addr = (struct sockaddr_storage *) &bind_rpc->addr; 
- send_return_value(conn, ERR_OK, ERR_OK); // Success + send_return_value(rpcsock, ERR_OK, ERR_OK); // Success } - } - else { - dwr(MSG_ERROR, " handle_bind(): PCB (%x) not in CLOSED state. Ignoring BIND request.\n", conn->pcb); - send_return_value(conn, -1, EINVAL); } - } - else { - dwr(MSG_ERROR, " handle_bind(): can't locate connection for PCB\n"); - send_return_value(conn, -1, EBADF); + else { + dwr(MSG_ERROR," handle_bind(): PCB (%x) not in CLOSED state. Ignoring BIND request.\n", conn->pcb); + send_return_value(rpcsock, -1, EINVAL); + } + } + else { + dwr(MSG_ERROR," handle_bind(): can't locate connection for PCB\n"); + send_return_value(rpcsock, -1, EBADF); } } @@ -1275,22 +1098,20 @@ void NetconEthernetTap::handle_bind(PhySocket *sock, void **uptr, struct bind_st [I] EOPNOTSUPP - The socket is not of a type that supports the listen() operation. */ -void NetconEthernetTap::handle_listen(PhySocket *sock, void **uptr, struct listen_st *listen_rpc) +void NetconEthernetTap::handle_listen(PhySocket *sock, PhySocket *rpcsock, void **uptr, struct listen_st *listen_rpc) { - dwr(3, " handle_listen(their=%d):\n", listen_rpc->sockfd); - TcpConnection *conn = getConnectionByTheirFD(sock, listen_rpc->sockfd); + dwr(MSG_DEBUG," handle_listen(their=%d):\n", listen_rpc->sockfd); + TcpConnection *conn = getConnection(sock); if(!conn){ - dwr(MSG_ERROR, " handle_listen(): unable to locate connection object\n"); - send_return_value(conn, -1, EBADF); + dwr(MSG_ERROR," handle_listen(): unable to locate connection object\n"); + send_return_value(rpcsock, -1, EBADF); + return; + } + if(conn->pcb->state == LISTEN) { + dwr(MSG_ERROR," handle_listen(): PCB is already in listening state.\n"); + send_return_value(rpcsock, ERR_OK, ERR_OK); return; } - dwr(3, " handle_listen(our=%d -> their=%d)\n", _phy.getDescriptor(conn->dataSock), conn->perceived_fd); - - if(conn->pcb->state == LISTEN) { - dwr(MSG_ERROR, " handle_listen(): PCB is already in listening state.\n"); - 
send_return_value(conn, ERR_OK, ERR_OK); - return; - } struct tcp_pcb* listening_pcb; #ifdef TCP_LISTEN_BACKLOG @@ -1299,19 +1120,19 @@ void NetconEthernetTap::handle_listen(PhySocket *sock, void **uptr, struct liste listening_pcb = lwipstack->tcp_listen(conn->pcb); #endif - if(listening_pcb != NULL) { - conn->pcb = listening_pcb; - lwipstack->tcp_accept(listening_pcb, nc_accept); + if(listening_pcb != NULL) { + conn->pcb = listening_pcb; + lwipstack->tcp_accept(listening_pcb, nc_accept); lwipstack->tcp_arg(listening_pcb, new Larg(this, conn)); /* we need to wait for the client to send us the fd allocated on their end for this listening socket */ - fcntl(_phy.getDescriptor(conn->dataSock), F_SETFL, O_NONBLOCK); + fcntl(_phy.getDescriptor(conn->sock), F_SETFL, O_NONBLOCK); conn->listening = true; conn->pending = true; - send_return_value(conn, ERR_OK, ERR_OK); + send_return_value(rpcsock, ERR_OK, ERR_OK); return; - } - send_return_value(conn, -1, -1); + } + send_return_value(rpcsock, -1, -1); } /* @@ -1344,34 +1165,19 @@ void NetconEthernetTap::handle_listen(PhySocket *sock, void **uptr, struct liste */ TcpConnection * NetconEthernetTap::handle_socket(PhySocket *sock, void **uptr, struct socket_st* socket_rpc) { - int rpc_fd = _phy.getDescriptor(sock); struct tcp_pcb *newpcb = lwipstack->tcp_new(); - dwr(MSG_DEBUG, " handle_socket(): pcb=%x\n", newpcb); + dwr(MSG_DEBUG," handle_socket(): pcb=%x\n", newpcb); if(newpcb != NULL) { - ZT_PHY_SOCKFD_TYPE fds[2]; - if(socketpair(PF_LOCAL, SOCK_STREAM, 0, fds) < 0) { - if(errno < 0) { - send_return_value(rpc_fd, -1, errno); - return NULL; - } - } - dwr(MSG_DEBUG, " handle_socket(): socketpair = {%d, %d}\n", fds[0], fds[1]); - TcpConnection *new_conn = new TcpConnection(); - new_conn->dataSock = _phy.wrapSocket(fds[0], new_conn); - *uptr = new_conn; - new_conn->rpcSock = sock; - new_conn->pcb = newpcb; - new_conn->their_fd = fds[1]; - tcp_connections.push_back(new_conn); - sock_fd_write(_phy.getDescriptor(sock), fds[1]); - 
close(fds[1]); // close other end of socketpair - // Once the client tells us what its fd is on the other end, we can then complete the mapping - new_conn->pending = true; + TcpConnection *new_conn = new TcpConnection(); + *uptr = new_conn; + new_conn->sock = sock; + new_conn->pcb = newpcb; + tcp_connections.push_back(new_conn); + new_conn->pending = true; return new_conn; } - sock_fd_write(rpc_fd, -1); // Send a bad fd, to signal error - dwr(MSG_ERROR, " handle_socket(): Memory not available for new PCB\n"); - send_return_value(rpc_fd, -1, ENOMEM); + dwr(MSG_ERROR," handle_socket(): Memory not available for new PCB\n"); + send_return_value(_phy.getDescriptor(sock), -1, ENOMEM); return NULL; } @@ -1407,13 +1213,11 @@ TcpConnection * NetconEthernetTap::handle_socket(PhySocket *sock, void **uptr, s [X] ETIMEDOUT - Timeout while attempting connection. [X] EINVAL - Invalid argument, SVr4, generally makes sense to set this - * */ -void NetconEthernetTap::handle_connect(PhySocket *sock, void **uptr, struct connect_st* connect_rpc) +void NetconEthernetTap::handle_connect(PhySocket *sock, PhySocket *rpcsock, TcpConnection *conn, struct connect_st* connect_rpc) { - dwr(MSG_DEBUG, " handle_connect()\n"); - TcpConnection *conn = (TcpConnection*)*uptr; + dwr(MSG_DEBUG," handle_connect()\n"); struct sockaddr_in *connaddr; connaddr = (struct sockaddr_in *) &connect_rpc->__addr; int conn_port = lwipstack->ntohs(connaddr->sin_port); @@ -1431,27 +1235,26 @@ void NetconEthernetTap::handle_connect(PhySocket *sock, void **uptr, struct conn if((err = lwipstack->tcp_connect(conn->pcb,&conn_addr,conn_port, nc_connected)) < 0) { if(err == ERR_ISCONN) { - send_return_value(conn, -1, EISCONN); // Already in connected state + send_return_value(rpcsock, -1, EISCONN); // Already in connected state return; } if(err == ERR_USE) { - send_return_value(conn, -1, EADDRINUSE); // Already in use + send_return_value(rpcsock, -1, EADDRINUSE); // Already in use return; } if(err == ERR_VAL) { - 
send_return_value(conn, -1, EINVAL); // Invalid ipaddress parameter + send_return_value(rpcsock, -1, EINVAL); // Invalid ipaddress parameter return; } if(err == ERR_RTE) { - send_return_value(conn, -1, ENETUNREACH); // No route to host + send_return_value(rpcsock, -1, ENETUNREACH); // No route to host return; } if(err == ERR_BUF) { - send_return_value(conn, -1, EAGAIN); // No more ports available + send_return_value(rpcsock, -1, EAGAIN); // No more ports available return; } - if(err == ERR_MEM) - { + if(err == ERR_MEM) { /* Can occur for the following reasons: tcp_enqueue_flags() 1) tcp_enqueue_flags is always called with either SYN or FIN in flags. @@ -1463,7 +1266,7 @@ void NetconEthernetTap::handle_connect(PhySocket *sock, void **uptr, struct conn 3) Cannot allocate new TCP segment */ - send_return_value(conn, -1, EAGAIN); // FIXME: Doesn't describe the problem well, but closest match + send_return_value(rpcsock, -1, EAGAIN); // FIXME: Doesn't describe the problem well, but closest match return; } @@ -1474,79 +1277,79 @@ void NetconEthernetTap::handle_connect(PhySocket *sock, void **uptr, struct conn // that's it! // - Most instances of a retval for a connect() should happen // in the nc_connect() and nc_err() callbacks! 
- dwr(MSG_ERROR, " handle_connect(): unable to connect\n"); - send_return_value(conn, -1, EAGAIN); + dwr(MSG_ERROR," handle_connect(): unable to connect\n"); + send_return_value(rpcsock, -1, EAGAIN); } // Everything seems to be ok, but we don't have enough info to retval conn->pending=true; conn->listening=true; - send_return_value(conn, -1); + conn->rpcsock=rpcsock; // used for return value from lwip CB } else { - dwr(MSG_ERROR, " handle_connect(): could not locate PCB based on their fd\n"); - send_return_value(conn, -1, EBADF); + dwr(MSG_ERROR," handle_connect(): could not locate PCB based on their fd\n"); + send_return_value(rpcsock, -1, EBADF); } } + void NetconEthernetTap::handle_write(TcpConnection *conn) { - float max = (float)TCP_SND_BUF; int r; - if(!conn) { - dwr(MSG_ERROR, " handle_write(): could not locate connection for this fd\n"); + dwr(MSG_ERROR," handle_write(): could not locate connection for this fd\n"); return; } - if(conn->idx < max) { - if(!conn->pcb) { - dwr(MSG_ERROR, " handle_write(): conn->pcb == NULL. Failed to write.\n"); - return; - } - int sndbuf = conn->pcb->snd_buf; // How much we are currently allowed to write to the connection - /* PCB send buffer is full,turn off readability notifications for the - corresponding PhySocket until nc_sent() is called and confirms that there is - now space on the buffer */ - if(sndbuf == 0) { - _phy.setNotifyReadable(conn->dataSock, false); - return; - } - if(!conn->listening) + dwr(MSG_DEBUG,"conn->idx = %d, TCP_SND_BUF = %d\n", conn->idx, TCP_SND_BUF); + if(!conn->pcb) { + dwr(MSG_ERROR," handle_write(): conn->pcb == NULL. 
Failed to write.\n"); + return; + } + int sndbuf = conn->pcb->snd_buf; // How much we are currently allowed to write to the connection + /* PCB send buffer is full,turn off readability notifications for the + corresponding PhySocket until nc_sent() is called and confirms that there is + now space on the buffer */ + if(sndbuf == 0) { + _phy.setNotifyReadable(conn->sock, false); + return; + } + if(!conn->listening) + lwipstack->_tcp_output(conn->pcb); + if(conn->sock && !conn->listening) { + + r = conn->idx < sndbuf ? conn->idx : sndbuf; + dwr(MSG_DEBUG,"handle_write(): r = %d\n", r); + /* Writes data pulled from the client's socket buffer to LWIP. This merely sends the + * data to LWIP to be enqueued and eventually sent to the network. */ + if(r > 0) { + int sz; + // NOTE: this assumes that lwipstack->_lock is locked, either + // because we are in a callback or have locked it manually. + int err = lwipstack->_tcp_write(conn->pcb, &conn->buf, r, TCP_WRITE_FLAG_COPY); lwipstack->_tcp_output(conn->pcb); - - if(conn->dataSock && !conn->listening) { - int read_fd = _phy.getDescriptor(conn->dataSock); - if((r = recvfrom(read_fd, (&conn->buf)+conn->idx, sndbuf, MSG_DONTWAIT, NULL, NULL)) > 0) { - conn->idx += r; - /* Writes data pulled from the client's socket buffer to LWIP. This merely sends the - * data to LWIP to be enqueued and eventually sent to the network. */ - if(r > 0) { - int sz; - // NOTE: this assumes that lwipstack->_lock is locked, either - // because we are in a callback or have locked it manually. 
- int err = lwipstack->_tcp_write(conn->pcb, &conn->buf, r, TCP_WRITE_FLAG_COPY); - lwipstack->_tcp_output(conn->pcb); - if(err != ERR_OK) { - dwr(MSG_ERROR, " handle_write(): error while writing to PCB, (err = %d)\n", err); - return; - } - else { - sz = (conn->idx)-r; - if(sz) { - memmove(&conn->buf, (conn->buf+r), sz); - } - conn->idx -= r; - conn->written+=r; - return; - } - } - else { - dwr(MSG_INFO, " handle_write(): LWIP stack full\n"); - return; + if(err != ERR_OK) { + dwr(MSG_ERROR," handle_write(): error while writing to PCB, (err = %d)\n", err); + if(err == -1) + dwr(MSG_DEBUG," handle_write(): possibly out of memory\n"); + return; + } + else { + sz = (conn->idx)-r; + if(sz) { + memmove(&conn->buf, (conn->buf+r), sz); } + conn->idx -= r; + conn->written+=r; + return; } } + else { + dwr(MSG_INFO," handle_write(): LWIP stack full\n"); + return; + } } } + + } // namespace ZeroTier diff --git a/netcon/NetconEthernetTap.hpp b/netcon/NetconEthernetTap.hpp index f18e0da8..e0519bfa 100644 --- a/netcon/NetconEthernetTap.hpp +++ b/netcon/NetconEthernetTap.hpp @@ -33,7 +33,9 @@ #include <string> #include <vector> +#include <utility> #include <stdexcept> +#include <stdint.h> #include "../node/Constants.hpp" #include "../node/MulticastGroup.hpp" @@ -110,18 +112,17 @@ private: static err_t nc_connected(void *arg, struct tcp_pcb *tpcb, err_t err); // RPC handlers (from NetconIntercept) - void unload_rpc(void *data, pid_t &pid, pid_t &tid, int &rpc_count, char (timestamp[20]), char &cmd, void* &payload); + void unload_rpc(void *data, pid_t &pid, pid_t &tid, + int &rpc_count, char (timestamp[20]), char (magic[sizeof(uint64_t)]), char &cmd, void* &payload); - void handle_getsockname(PhySocket *sock, void **uptr, struct getsockname_st *getsockname_rpc); - void handle_bind(PhySocket *sock, void **uptr, struct bind_st *bind_rpc); - void handle_listen(PhySocket *sock, void **uptr, struct listen_st *listen_rpc); - void handle_map_request(PhySocket *sock, void **uptr, unsigned 
char* buf); - void handle_retval(PhySocket *sock, void **uptr, int rpc_count, int newfd); + void handle_getsockname(PhySocket *sock, PhySocket *rpcsock, void **uptr, struct getsockname_st *getsockname_rpc); + void handle_bind(PhySocket *sock, PhySocket *rpcsock, void **uptr, struct bind_st *bind_rpc); + void handle_listen(PhySocket *sock, PhySocket *rpcsock, void **uptr, struct listen_st *listen_rpc); TcpConnection * handle_socket(PhySocket *sock, void **uptr, struct socket_st* socket_rpc); - void handle_connect(PhySocket *sock, void **uptr, struct connect_st* connect_rpc); + void handle_connect(PhySocket *sock, PhySocket *rpcsock, TcpConnection *conn, struct connect_st* connect_rpc); void handle_write(TcpConnection *conn); - int send_return_value(TcpConnection *conn, int retval, int _errno); + int send_return_value(PhySocket *sock, int retval, int _errno); int send_return_value(int fd, int retval, int _errno); void phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *from,void *data,unsigned long len); @@ -135,6 +136,9 @@ private: void phyOnUnixData(PhySocket *sock,void **uptr,void *data,unsigned long len); void phyOnFileDescriptorActivity(PhySocket *sock,void **uptr,bool readable,bool writable); + TcpConnection *getConnection(PhySocket *sock); + void closeConnection(PhySocket *sock); + ip_addr_t convert_ip(struct sockaddr_in * addr) { ip_addr_t conn_addr; @@ -147,23 +151,16 @@ private: return conn_addr; } - // Client helpers - TcpConnection *getConnectionByTheirFD(PhySocket *sock, int fd); - void closeConnection(TcpConnection *conn); - void closeAll(); - void closeClient(PhySocket *sock); - void compact_dump(); - void dump(); - void die(int exret); - Phy<NetconEthernetTap *> _phy; PhySocket *_unixListenSocket; std::vector<TcpConnection*> tcp_connections; - std::vector<PhySocket*> rpc_sockets; std::map<PhySocket*, pid_t> pidmap; - pid_t rpc_counter; + std::map<uint64_t, std::pair<PhySocket*, void*> > jobmap; + std::map<uint64_t, PhySocket*> sockmap; + 
+ pid_t rpc_counter; netif interface; MAC _mac; diff --git a/netcon/RPC.c b/netcon/RPC.c index 2b396183..5433bb12 100644 --- a/netcon/RPC.c +++ b/netcon/RPC.c @@ -5,6 +5,9 @@ #include <errno.h> #include <sys/syscall.h> +#include <fcntl.h> +#include <stdint.h> + #include <sys/socket.h> #include <strings.h> #include "RPC.h" @@ -14,18 +17,32 @@ static int instance_count; static int rpc_count; -static pthread_mutex_t lock; +static pthread_mutex_t lock; void rpc_mutex_init() { if(pthread_mutex_init(&lock, NULL) != 0) { fprintf(stderr, "error while initializing service call mutex\n"); } } - void rpc_mutex_destroy() { pthread_mutex_destroy(&lock); } +/* + * Reads a new file descriptor from the service + */ +int get_new_fd(int sock) +{ + char buf[BUF_SZ]; + int newfd; + ssize_t size = sock_fd_read(sock, buf, sizeof(buf), &newfd); + if(size > 0){ + return newfd; + } + fprintf(stderr, "get_new_fd(): Error, unable to read fd over (%d)\n", sock); + return -1; +} + /* * Reads a return value from the service and sets errno (if applicable) */ @@ -46,21 +63,6 @@ int get_retval(int rpc_sock) return -1; } -/* - * Reads a new file descriptor from the service - */ -int get_new_fd(int sock) -{ - char buf[BUF_SZ]; - int newfd; - ssize_t size = sock_fd_read(sock, buf, sizeof(buf), &newfd); - if(size > 0){ - return newfd; - } - fprintf(stderr, "get_new_fd(): Error, unable to read fd over (%d)\n", sock); - return -1; -} - int rpc_join(const char * sockname) { struct sockaddr_un addr; @@ -81,9 +83,9 @@ int rpc_join(const char * sockname) sleep(1); } else { - int newfd = dup2(sock, RPC_FD-instance_count); - close(sock); - return newfd; + //int newfd = dup2(sock, RPC_FD-instance_count); + //close(sock); + return sock; } attempts++; } @@ -93,14 +95,26 @@ int rpc_join(const char * sockname) /* * Send a command to the service */ -int rpc_send_command(int cmd, int rpc_sock, void *data, int len) +int rpc_send_command(int cmd, int forfd, void *data, int len) { - char cmdbuf[BUF_SZ]; - cmdbuf[0] = 
cmd; - memcpy(&cmdbuf[1], data, len); - pthread_mutex_lock(&lock); - char metabuf[BUF_SZ]; // portion of buffer which contains RPC metadata for debugging + char padding[] = {0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89}; + char cmdbuf[BUF_SZ], magic[TOKEN_SIZE], metabuf[BUF_SZ]; + memcpy(magic+MAGIC_SIZE, padding, TOKEN_SIZE); + uint64_t magic_num; + + // ephemeral RPC socket used only for this command + int rpc_sock = rpc_join("/root/dev/ztest/nc_e5cd7a9e1c3511dd"); + // Generate token + int fdrand = open("/dev/urandom", O_RDONLY); + read(fdrand, &magic, MAGIC_SIZE); + memcpy(&magic_num, magic, MAGIC_SIZE); + cmdbuf[CMD_ID_IDX] = cmd; + memcpy(&cmdbuf[MAGIC_IDX], &magic_num, MAGIC_SIZE); + memcpy(&cmdbuf[STRUCT_IDX], data, len); + + // Format: [sig_byte] + [cmd_id] + [magic] + [meta] + [payload] + #ifdef VERBOSE /* #define IDX_PID 0 @@ -119,43 +133,52 @@ int rpc_send_command(int cmd, int rpc_sock, void *data, int len) time_t timestamp; timestamp = time(NULL); strftime(timestring, sizeof(timestring), "%H:%M:%S", localtime(×tamp)); + + metabuf[IDX_SIGNAL_BYTE] = 'R'; + memcpy(&metabuf[IDX_PID], &pid, sizeof(pid_t) ); /* pid */ memcpy(&metabuf[IDX_TID], &tid, sizeof(pid_t) ); /* tid */ memcpy(&metabuf[IDX_COUNT], &rpc_count, sizeof(rpc_count) ); /* rpc_count */ memcpy(&metabuf[IDX_TIME], ×tring, 20 ); /* timestamp */ #endif /* Combine command flag+payload with RPC metadata */ - memcpy(&metabuf[IDX_PAYLOAD], cmdbuf, len); + memcpy(&metabuf[IDX_PAYLOAD], cmdbuf, len + 1 + MAGIC_SIZE); + + // Write RPC int n_write = write(rpc_sock, &metabuf, BUF_SZ); if(n_write < 0) { - fprintf(stderr, "Error writing command to service (CMD = %d)\n", cmdbuf[0]); + fprintf(stderr, "Error writing command to service (CMD = %d)\n", cmdbuf[CMD_ID_IDX]); errno = 0; } - + // Write token to corresponding data stream + if(n_write > 0 && forfd > -1){ + usleep(5000); + int w = send(forfd, &magic, TOKEN_SIZE, 0); + } + // Process response from service int ret = ERR_OK; if(n_write > 0) { - 
if(cmdbuf[0]==RPC_SOCKET) { - ret = get_new_fd(rpc_sock); + if(cmdbuf[CMD_ID_IDX]==RPC_SOCKET) { + pthread_mutex_unlock(&lock); + return rpc_sock; // Used as new socket } - if(cmdbuf[0]==RPC_MAP_REQ - || cmdbuf[0]==RPC_CONNECT - || cmdbuf[0]==RPC_BIND - || cmdbuf[0]==RPC_LISTEN - || cmdbuf[0]==RPC_MAP) { + if(cmdbuf[CMD_ID_IDX]==RPC_CONNECT + || cmdbuf[CMD_ID_IDX]==RPC_BIND + || cmdbuf[CMD_ID_IDX]==RPC_LISTEN) { ret = get_retval(rpc_sock); } - if(cmdbuf[0]==RPC_GETSOCKNAME) { - ret = n_write; + if(cmdbuf[CMD_ID_IDX]==RPC_GETSOCKNAME) { + pthread_mutex_unlock(&lock); + return rpc_sock; // Don't close rpc here, we'll use it to read getsockopt_st } } - else { + else ret = -1; - } + close(rpc_sock); // We're done with this RPC socket, close it (if type-R) pthread_mutex_unlock(&lock); return ret; } - /* * Send file descriptor */ @@ -166,21 +189,17 @@ ssize_t sock_fd_write(int sock, int fd) struct iovec iov; char buf = '\0'; int buflen = 1; - union { struct cmsghdr cmsghdr; char control[CMSG_SPACE(sizeof (int))]; } cmsgu; struct cmsghdr *cmsg; - iov.iov_base = &buf; iov.iov_len = buflen; - msg.msg_name = NULL; msg.msg_namelen = 0; msg.msg_iov = &iov; msg.msg_iovlen = 1; - if (fd != -1) { msg.msg_control = cmsgu.control; msg.msg_controllen = sizeof(cmsgu.control); @@ -193,13 +212,11 @@ ssize_t sock_fd_write(int sock, int fd) msg.msg_control = NULL; msg.msg_controllen = 0; } - size = sendmsg(sock, &msg, 0); if (size < 0) perror ("sendmsg"); return size; } - /* * Read a file descriptor */ @@ -214,10 +231,8 @@ ssize_t sock_fd_read(int sock, void *buf, ssize_t bufsize, int *fd) char control[CMSG_SPACE(sizeof (int))]; } cmsgu; struct cmsghdr *cmsg; - iov.iov_base = buf; iov.iov_len = bufsize; - msg.msg_name = NULL; msg.msg_namelen = 0; msg.msg_iov = &iov; @@ -239,7 +254,6 @@ ssize_t sock_fd_read(int sock, void *buf, ssize_t bufsize, int *fd) fprintf (stderr, "invalid cmsg_type %d\n",cmsg->cmsg_type); return -1; } - *fd = *((int *) CMSG_DATA(cmsg)); } else *fd = -1; } else { 
diff --git a/netcon/RPC.h b/netcon/RPC.h index ad8f54d9..ee8182e8 100644 --- a/netcon/RPC.h +++ b/netcon/RPC.h @@ -1,12 +1,25 @@ #ifndef __RPCLIB_H_ #define __RPCLIB_H_ -#define IDX_PID 0 -#define IDX_TID sizeof(pid_t) +#include <stdint.h> + +#define MAGIC_SIZE sizeof(uint64_t) +#define MAGIC_PADDING_SIZE 12 +#define TOKEN_SIZE MAGIC_SIZE+MAGIC_PADDING_SIZE + +// 1st section +#define IDX_SIGNAL_BYTE 0 +#define IDX_PID 1 +#define IDX_TID sizeof(pid_t) + 1 #define IDX_COUNT IDX_TID + sizeof(pid_t) #define IDX_TIME IDX_COUNT + sizeof(int) #define IDX_PAYLOAD IDX_TIME + 20 /* 20 being the length of the timestamp string */ +// 2nd section +#define CMD_ID_IDX 0 +#define MAGIC_IDX 1 +#define STRUCT_IDX MAGIC_IDX+MAGIC_SIZE + #define BUF_SZ 256 #define PAYLOAD_SZ 223 /* BUF_SZ-IDX_PAYLOAD */ @@ -37,18 +50,19 @@ extern "C" { #endif -void rpc_mutex_destroy(); -void rpc_mutex_init(); - int get_retval(int); -int get_new_fd(int); int rpc_join(const char * sockname); -int rpc_send_command(int cmd, int rpc_sock, void *data, int len); +int rpc_send_command(int cmd, int forfd, void *data, int len); +int get_new_fd(int sock); ssize_t sock_fd_write(int sock, int fd); ssize_t sock_fd_read(int sock, void *buf, ssize_t bufsize, int *fd); +void rpc_mutex_destroy(); +void rpc_mutex_init(); + + /* Structures used for sending commands via RPC mechanism */ struct bind_st { diff --git a/netcon/common.inc.c b/netcon/common.inc.c index 858a2195..60d2ab3b 100644 --- a/netcon/common.inc.c +++ b/netcon/common.inc.c @@ -42,7 +42,7 @@ #ifndef _COMMON_H #define _COMMON_H 1 -#define DEBUG_LEVEL 0 +#define DEBUG_LEVEL 3 #define MSG_WARNING 4 #define MSG_ERROR 1 // Errors diff --git a/netcon/docker-test/e5cd7a9e1c3511dd.conf b/netcon/docker-test/e5cd7a9e1c3511dd.conf new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/netcon/docker-test/e5cd7a9e1c3511dd.conf |