diff options
| author | Joseph Henry <josephjah@gmail.com> | 2016-01-11 10:12:59 -0800 |
|---|---|---|
| committer | Joseph Henry <josephjah@gmail.com> | 2016-01-11 10:12:59 -0800 |
| commit | 3e65ecb93d62d628b99d68fec8b1ec94993f4f09 (patch) | |
| tree | 763cf3956766a28f6c767c067eabec49e039ecff /netcon/NetconEthernetTap.cpp | |
| parent | ff9317365a1d9498076b0971f0209fc2aec5c142 (diff) | |
| download | infinitytier-3e65ecb93d62d628b99d68fec8b1ec94993f4f09.tar.gz infinitytier-3e65ecb93d62d628b99d68fec8b1ec94993f4f09.zip | |
Stateless RPC rework
Diffstat (limited to 'netcon/NetconEthernetTap.cpp')
| -rw-r--r-- | netcon/NetconEthernetTap.cpp | 1189 |
1 files changed, 496 insertions, 693 deletions
diff --git a/netcon/NetconEthernetTap.cpp b/netcon/NetconEthernetTap.cpp index 0b74d5dc..8cf5f3ff 100644 --- a/netcon/NetconEthernetTap.cpp +++ b/netcon/NetconEthernetTap.cpp @@ -29,6 +29,10 @@ #include <utility> #include <dlfcn.h> #include <sys/poll.h> +#include <stdint.h> +#include <utility> +#include <string> +#include <sys/resource.h> #include "NetconEthernetTap.hpp" @@ -52,8 +56,8 @@ #define APPLICATION_POLL_FREQ 20 #define ZT_LWIP_TCP_TIMER_INTERVAL 5 -#define STATUS_TMR_INTERVAL 3000 // How often we check connection statuses -#define DEFAULT_READ_BUFFER_SIZE 1024 * 63 +#define STATUS_TMR_INTERVAL 1000 // How often we check connection statuses (in ms) +#define DEFAULT_READ_BUFFER_SIZE 1024 * 1024 namespace ZeroTier { @@ -118,21 +122,18 @@ static err_t low_level_output(struct netif *netif, struct pbuf *p) class TcpConnection { public: - int perceived_fd; - int their_fd; - bool pending; - bool listening; - int pid; - unsigned long written; - unsigned long acked; + uint64_t accept_token; + + bool pending, listening; + int pid, idx; + unsigned long written, acked; - PhySocket *rpcSock; - PhySocket *dataSock; + PhySocket *rpcsock; + PhySocket *sock; struct tcp_pcb *pcb; struct sockaddr_storage *addr; unsigned char buf[DEFAULT_READ_BUFFER_SIZE]; - int idx; }; /* @@ -180,7 +181,7 @@ NetconEthernetTap::NetconEthernetTap( lwipstack->lwip_init(); _unixListenSocket = _phy.unixListen(sockPath,(void *)this); - dwr(MSG_INFO, " NetconEthernetTap initialized!\n", _phy.getDescriptor(_unixListenSocket)); + dwr(MSG_INFO," NetconEthernetTap initialized!\n", _phy.getDescriptor(_unixListenSocket)); if (!_unixListenSocket) throw std::runtime_error(std::string("unable to bind to ")+sockPath); _thread = Thread::start(this); @@ -277,7 +278,7 @@ void NetconEthernetTap::put(const MAC &from,const MAC &to,unsigned int etherType // First pbuf gets ethernet header at start q = p; if (q->len < sizeof(ethhdr)) { - dwr(MSG_ERROR, "_put(): Dropped packet: first pbuf smaller than ethernet header\n"); + dwr(MSG_ERROR,"_put(): Dropped packet: first pbuf smaller than ethernet header\n"); return; } memcpy(q->payload,ðhdr,sizeof(ethhdr)); @@ -290,14 +291,14 @@ void NetconEthernetTap::put(const MAC &from,const MAC &to,unsigned int etherType dataptr += q->len; } } else { - dwr(MSG_ERROR, "put(): Dropped packet: no pbufs available\n"); + dwr(MSG_ERROR,"put(): Dropped packet: no pbufs available\n"); return; } { Mutex::Lock _l2(lwipstack->_lock); if(interface.input(p, &interface) != ERR_OK) { - dwr(MSG_ERROR, "put(): Error while RXing packet (netif->input)\n"); + dwr(MSG_ERROR,"put(): Error while RXing packet (netif->input)\n"); } } } @@ -335,190 +336,23 @@ void NetconEthernetTap::scanMulticastGroups(std::vector<MulticastGroup> &added,s _multicastGroups.swap(newGroups); } -TcpConnection *NetconEthernetTap::getConnectionByTheirFD(PhySocket *sock, int fd) +TcpConnection *NetconEthernetTap::getConnection(PhySocket *sock) { for(size_t i=0; i<tcp_connections.size(); i++) { - if(tcp_connections[i]->perceived_fd == fd && tcp_connections[i]->rpcSock == sock) + if(tcp_connections[i]->sock == sock) return tcp_connections[i]; } return NULL; } -/* - * Dumps service state in 80x25 when debug mode is off - */ -void NetconEthernetTap::compact_dump() -{ - /* - clearscreen(); - gotoxy(0,0); - - fprintf(stderr, "ZeroTier - Network Containers Service [State Dump]\n\r"); - fprintf(stderr, " RPC Sockets = %d\n\r", rpc_sockets.size()); - fprintf(stderr, " TCP Connections = %d\n\r", tcp_connections.size()); - - for(size_t i=0; i<rpc_sockets.size(); i++) { - int rpc_fd = _phy.getDescriptor(rpc_sockets[i]); - char buf[80]; - int pid = pidmap[rpc_sockets[i]]; - memset(&buf, '\0', 80); - get_path_from_pid(buf, pid); - fprintf(stderr, "\n Client(addr=0x%x, rpc=%d, pid=%d) %s\n", rpc_sockets[i], rpc_fd, pid, buf); - for(size_t j=0; j<tcp_connections.size(); j++) { - memset(&buf, '\0', 80); - get_path_from_pid(buf, tcp_connections[j]->pid); - if(tcp_connections[j]->rpcSock==rpc_sockets[i]) { - fprintf(stderr, "\t\tpath\t\t= %s\n", buf); - } - } - } - */ - for(size_t i=0; i<rpc_sockets.size(); i++) { - fprintf(stderr, "\n\n\nrpc(%d)\n", _phy.getDescriptor(rpc_sockets[i])); - for(size_t j=0; j<tcp_connections.size(); j++) { - if(_phy.getDescriptor(tcp_connections[j]->rpcSock) == _phy.getDescriptor(rpc_sockets[i])) - fprintf(stderr, "\t(%d) ----> (%d)\n\n", _phy.getDescriptor(tcp_connections[j]->dataSock), tcp_connections[j]->perceived_fd); - } - } -} - -/* - * Dumps service state - */ -void NetconEthernetTap::dump() -{ - fprintf(stderr, "\n\n---\n\ndie(): BEGIN SERVICE STATE DUMP\n"); - fprintf(stderr, "*** IF YOU SEE THIS, EMAIL THE DUMP TEXT TO joseph.henry@zerotier.com ***\n"); - fprintf(stderr, " tcp_conns = %lu, rpc_socks = %lu\n", tcp_connections.size(), rpc_sockets.size()); - - // TODO: Add logic to detect bad mapping conditions - for(size_t i=0; i<rpc_sockets.size(); i++) { - for(size_t j=0; j<rpc_sockets.size(); j++) { - if(j != i && rpc_sockets[i] == rpc_sockets[j]) { - fprintf(stderr, "Duplicate PhySockets found! (0x%p)\n", rpc_sockets[i]); - } - } - } - - // Dump the state of the service mapping - for(size_t i=0; i<rpc_sockets.size(); i++) { - int rpc_fd = _phy.getDescriptor(rpc_sockets[i]); - char buf[80]; - int pid = pidmap[rpc_sockets[i]]; - get_path_from_pid(buf, pid); - - fprintf(stderr, "\nClient(addr=0x%p, rpc=%d, pid=%d) %s\n", rpc_sockets[i], rpc_fd, pid, buf); - for(size_t j=0; j<tcp_connections.size(); j++) { - get_path_from_pid(buf, tcp_connections[j]->pid); - if(tcp_connections[j]->rpcSock==rpc_sockets[i]){ - fprintf(stderr, " |\n"); - fprintf(stderr, " |-Connection(0x%p):\n", tcp_connections[j]); - fprintf(stderr, " | path\t\t\t= %s\n", buf); - fprintf(stderr, " | perceived_fd\t\t= %d\t(fd)\n", tcp_connections[j]->perceived_fd); - fprintf(stderr, " | their_fd\t\t= %d\t(fd)\n", tcp_connections[j]->their_fd); - fprintf(stderr, " | dataSock(0x%p)\t= %d\t(fd)\n", tcp_connections[j]->dataSock, _phy.getDescriptor(tcp_connections[j]->dataSock)); - fprintf(stderr, " | rpcSock(0x%p)\t= %d\t(fd)\n", tcp_connections[j]->rpcSock, _phy.getDescriptor(tcp_connections[j]->rpcSock)); - fprintf(stderr, " | pending\t\t= %d\n", tcp_connections[j]->pending); - fprintf(stderr, " | listening\t\t= %d\n", tcp_connections[j]->listening); - fprintf(stderr, " \\------pcb(0x%p)->state\t= %d\n", tcp_connections[j]->pcb, tcp_connections[j]->pcb->state); - } - } - } - fprintf(stderr, "\n\ndie(): END SERVICE STATE DUMP\n\n---\n\n"); -} - -/* - * Dumps service state and then exits - */ -void NetconEthernetTap::die(int exret) { - dump(); - exit(exret); -} - -/* - * Closes a TcpConnection and associated LWIP PCB strcuture. - */ -void NetconEthernetTap::closeConnection(TcpConnection *conn) -{ - if(!conn) - return; - dwr(MSG_DEBUG, " closeConnection(%x, %d)\n", conn->pcb, _phy.getDescriptor(conn->dataSock)); - if(lwipstack->_tcp_close(conn->pcb) != ERR_OK) { - dwr(MSG_ERROR, " closeConnection(): Error while calling tcp_close()\n"); - exit(0); - } - else { - if(conn->dataSock) { - close(_phy.getDescriptor(conn->dataSock)); - _phy.close(conn->dataSock,false); - } - /* Eventually we might want to use a map here instead */ - for(int i=0; i<tcp_connections.size(); i++) { - if(tcp_connections[i] == conn) { - tcp_connections.erase(tcp_connections.begin() + i); - delete conn; - break; - } - } - } -} - -/* - * Close a single RPC connection and associated PhySocket - */ -void NetconEthernetTap::closeClient(PhySocket *sock) -{ - for(size_t i=0; i<rpc_sockets.size(); i++) { - if(rpc_sockets[i] == sock){ - rpc_sockets.erase(rpc_sockets.begin() + i); - break; - } - } - close(_phy.getDescriptor(sock)); - _phy.close(sock); -} - -/* - * Close all RPC and TCP connections - */ -void NetconEthernetTap::closeAll() -{ - while(rpc_sockets.size()) - closeClient(rpc_sockets.front()); - while(tcp_connections.size()) - closeConnection(tcp_connections.front()); -} - -#include <sys/resource.h> - void NetconEthernetTap::threadMain() throw() { + dwr(MSG_DEBUG, "MEMP_NUM_REASSDATA = %d\n", MEMP_NUM_REASSDATA); uint64_t prev_tcp_time = 0; uint64_t prev_status_time = 0; uint64_t prev_etharp_time = 0; -/* - fprintf(stderr, "- MEM_SIZE = %dM\n", MEM_SIZE / (1024*1024)); - fprintf(stderr, "- PBUF_POOL_SIZE = %d\n", PBUF_POOL_SIZE); - fprintf(stderr, "- PBUF_POOL_BUFSIZE = %d\n", PBUF_POOL_BUFSIZE); - fprintf(stderr, "- MEMP_NUM_PBUF = %d\n", MEMP_NUM_PBUF); - fprintf(stderr, "- MEMP_NUM_TCP_PCB = %d\n", MEMP_NUM_TCP_PCB); - fprintf(stderr, "- MEMP_NUM_TCP_PCB_LISTEN = %d\n", MEMP_NUM_TCP_PCB_LISTEN); - fprintf(stderr, "- MEMP_NUM_TCP_SEG = %d\n\n", MEMP_NUM_TCP_SEG); - - fprintf(stderr, "- TCP_SND_BUF = %dK\n", TCP_SND_BUF / 1024); - fprintf(stderr, "- TCP_SND_QUEUELEN = %d\n\n", TCP_SND_QUEUELEN); - - fprintf(stderr, "- TCP_WND = %d\n", TCP_WND); - fprintf(stderr, "- TCP_MSS = %d\n", TCP_MSS); - fprintf(stderr, "- TCP_MAXRTX = %d\n", TCP_MAXRTX); - fprintf(stderr, "- IP_REASSEMBLY = %d\n\n", IP_REASSEMBLY); - fprintf(stderr, "- ARP_TMR_INTERVAL = %d\n", ARP_TMR_INTERVAL); - fprintf(stderr, "- TCP_TMR_INTERVAL = %d\n", TCP_TMR_INTERVAL); - fprintf(stderr, "- IP_TMR_INTERVAL = %d\n", IP_TMR_INTERVAL); -*/ - // Main timer loop while (_run) { uint64_t now = OSUtils::now(); @@ -531,58 +365,39 @@ void NetconEthernetTap::threadMain() // Connection prunning if (since_status >= STATUS_TMR_INTERVAL) { - //compact_dump(); prev_status_time = now; status_remaining = STATUS_TMR_INTERVAL - since_status; - if(rpc_sockets.size() || tcp_connections.size()) { - - // dump(); - // Here we will periodically check the list of rpc_sockets for those that - // do not currently have any data connection associated with them. If they are - // unused, then we will try to read from them, if they fail, we can safely assume - // that the client has closed their end and we can close ours - for(size_t i = 0; i<tcp_connections.size(); i++) { - if(tcp_connections[i]->listening) { - char c; - if (read(_phy.getDescriptor(tcp_connections[i]->dataSock), &c, 1) < 0) { - // Still in listening state - } - else if (read(_phy.getDescriptor(tcp_connections[i]->rpcSock), &c, 1) < 0) { - // Still in listening state - } - else { - // Here we should handle the case there there is incoming data (?) - dwr(MSG_DEBUG, " tap_thread(): Listening socketpair closed. Removing RPC connection (%d)\n", - _phy.getDescriptor(tcp_connections[i]->dataSock)); - closeConnection(tcp_connections[i]); - } - } - } - } - for(size_t i=0, associated = 0; i<rpc_sockets.size(); i++, associated = 0) { - for(size_t j=0; j<tcp_connections.size(); j++) { - if (tcp_connections[j]->rpcSock == rpc_sockets[i]) - associated++; - } - if(!associated){ - // No TCP connections are associated, this is a candidate for removal - int fd = _phy.getDescriptor(rpc_sockets[i]); - fcntl(fd, F_SETFL, O_NONBLOCK); - unsigned char tmpbuf[BUF_SZ]; - int n; - if((n = read(fd,&tmpbuf,BUF_SZ)) < 0) { - dwr(MSG_DEBUG, " tap_thread(): closing RPC (%d)\n", _phy.getDescriptor(rpc_sockets[i])); - closeClient(rpc_sockets[i]); - } - // < 0 is failure - // 0 nothing to read, RPC still active - // > 0 RPC data read, handle it - else if (n > 0) { - // Handle RPC call, this is rare - dwr(MSG_DEBUG, " tap_thread(): RPC read during connection check (%d bytes)\n", n); - phyOnUnixData(rpc_sockets[i],_phy.getuptr(rpc_sockets[i]),&tmpbuf,BUF_SZ); + + dwr(MSG_DEBUG," tap_thread(): tcp\\jobs\\socks = {%d, %d, %d}\n", tcp_connections.size(), jobmap.size(), sockmap.size()); + for(size_t i=0; i<tcp_connections.size(); i++) { + + // No TCP connections are associated, this is a candidate for removal + if(!tcp_connections[i]->sock) + continue; // Skip, this is a pending connection + int fd = _phy.getDescriptor(tcp_connections[i]->sock); + + if(tcp_connections[i]->idx > 0){ + dwr(MSG_DEBUG, "----------------------writing from poll\n"); + lwipstack->_lock.lock(); + handle_write(tcp_connections[i]); + lwipstack->_lock.unlock(); } + + fcntl(fd, F_SETFL, O_NONBLOCK); + unsigned char tmpbuf[BUF_SZ]; + int n; + + if((n = read(fd,&tmpbuf,BUF_SZ)) < 0 && errno != EAGAIN) { + dwr(MSG_DEBUG," tap_thread(): closing sock (%x)\n", tcp_connections[i]->sock); + closeConnection(tcp_connections[i]->sock); } + // < 0 is failure + // 0 nothing to read, RPC still active + // > 0 RPC data read, handle it + else if (n > 0) { + dwr(MSG_DEBUG," tap_thread(): data read during connection check (%d bytes)\n", n); + phyOnUnixData(tcp_connections[i]->sock,_phy.getuptr(tcp_connections[i]->sock),&tmpbuf,BUF_SZ); + } } } // Main TCP/ETHARP timer section @@ -600,7 +415,6 @@ void NetconEthernetTap::threadMain() } _phy.poll((unsigned long)std::min(tcp_remaining,etharp_remaining)); } - closeAll(); dlclose(lwipstack->_libref); } @@ -612,10 +426,37 @@ void NetconEthernetTap::phyOnTcpClose(PhySocket *sock,void **uptr) {} void NetconEthernetTap::phyOnTcpData(PhySocket *sock,void **uptr,void *data,unsigned long len) {} void NetconEthernetTap::phyOnTcpWritable(PhySocket *sock,void **uptr) {} +/* + * Closes a TcpConnection and associated LWIP PCB strcuture. + */ +void NetconEthernetTap::closeConnection(PhySocket *sock) +{ + TcpConnection *conn = getConnection(sock); + if(conn) { + if(!conn->pcb) + return; + // tell LWIP to close the associated PCB + if(conn->pcb->state != CLOSED && lwipstack->_tcp_close(conn->pcb) != ERR_OK) { + dwr(MSG_ERROR," closeConnection(): Error while calling tcp_close()\n"); + } + // remove from connection list + for(size_t i=0; i<tcp_connections.size(); i++) { + if(tcp_connections[i]->sock == sock){ + tcp_connections.erase(tcp_connections.begin() + i); + //delete conn; + break; + } + } + } + if(!sock) + return; + close(_phy.getDescriptor(sock)); // close underlying fd + _phy.close(sock, false); // close PhySocket +} + void NetconEthernetTap::phyOnUnixClose(PhySocket *sock,void **uptr) { - dwr(MSG_DEBUG, " phyOnUnixClose(sock=0x%x, uptr=0x%x): fd = %d\n", sock, uptr, _phy.getDescriptor(sock)); - TcpConnection *conn = (TcpConnection*)*uptr; - closeConnection(conn); + dwr(MSG_DEBUG,"\nphyOnUnixClose(): close connection = %x\n", sock); + closeConnection(sock); } /* @@ -623,129 +464,224 @@ void NetconEthernetTap::phyOnUnixClose(PhySocket *sock,void **uptr) { */ void NetconEthernetTap::phyOnFileDescriptorActivity(PhySocket *sock,void **uptr,bool readable,bool writable) { - if(readable) { - TcpConnection *conn = (TcpConnection*)*uptr; - if(conn->dataSock) { // Sometimes a connection may be closed via nc_recved, check first - lwipstack->_lock.lock(); - handle_write(conn); - lwipstack->_lock.unlock(); - } - } - else { - dwr(MSG_ERROR, "phyOnFileDescriptorActivity(): PhySocket not readable\n"); - } + dwr(MSG_DEBUG,"\nphyOnFileDescriptorActivity(): new connection = %x\n", sock); } /* * Add a new PhySocket for the client connections */ void NetconEthernetTap::phyOnUnixAccept(PhySocket *sockL,PhySocket *sockN,void **uptrL,void **uptrN) { - dwr(MSG_DEBUG, " phyOnUnixAccept(): accepting new connection\n"); - if(find(rpc_sockets.begin(), rpc_sockets.end(), sockN) != rpc_sockets.end()){ - dwr(MSG_ERROR, " phyOnUnixAccept(): SockN (0x%x) already exists!\n", sockN); - return; - } - rpc_sockets.push_back(sockN); + dwr(MSG_DEBUG,"\nphyOnUnixAccept(): new connection = %x\n", sockN); +} + +/* Unpacks the buffer from an RPC command */ +void NetconEthernetTap::unload_rpc(void *data, pid_t &pid, pid_t &tid, + int &rpc_count, char (timestamp[20]), char (magic[sizeof(uint64_t)]), char &cmd, void* &payload) +{ + unsigned char *buf = (unsigned char*)data; + memcpy(&pid, &buf[IDX_PID], sizeof(pid_t)); + memcpy(&tid, &buf[IDX_TID], sizeof(pid_t)); + memcpy(&rpc_count, &buf[IDX_COUNT], sizeof(int)); + memcpy(timestamp, &buf[IDX_TIME], 20); + memcpy(&cmd, &buf[IDX_PAYLOAD], sizeof(char)); + memcpy(magic, &buf[IDX_PAYLOAD+1], MAGIC_SIZE); } /* * Processes incoming data on a client-specific RPC connection */ void NetconEthernetTap::phyOnUnixData(PhySocket *sock,void **uptr,void *data,unsigned long len) -{ +{ + //usleep(5000); + //dwr(MSG_DEBUG,"\n\n\n<%x> phyOnUnixData(): len = %d\n", sock, len); + uint64_t magic_num; pid_t pid, tid; int rpc_count; - char cmd, timestamp[20]; + char cmd, timestamp[20], magic[MAGIC_SIZE]; void *payload; - unload_rpc(data, pid, tid, rpc_count, timestamp, cmd, payload); - dwr(MSG_DEBUG, "\n\nRPC: (pid=%d, tid=%d, rpc_count=%d, timestamp=%s, cmd=%d\n", pid, tid, rpc_count, timestamp, cmd); unsigned char *buf = (unsigned char*)data; + std::pair<PhySocket*, void*> sockdata; + PhySocket *streamsock, *rpcsock; + bool found_sock = false, found_job = false; + + + TcpConnection *conn; + int max_sndbuf = (float)TCP_SND_BUF; + int wlen = len; + + // RPC + if(buf[IDX_SIGNAL_BYTE] == 'R') { + unload_rpc(data, pid, tid, rpc_count, timestamp, magic, cmd, payload); + memcpy(&magic_num, magic, MAGIC_SIZE); + dwr(MSG_DEBUG," <%x> RPC: (pid=%d, tid=%d, rpc_count=%d, timestamp=%s, cmd=%d)\n", sock, pid, tid, rpc_count, timestamp, cmd); + if(cmd == RPC_SOCKET) { + dwr(MSG_DEBUG," <%x> RPC_SOCKET\n", sock); + // Create new lwip socket and associate it with this sock + struct socket_st socket_rpc; + memcpy(&socket_rpc, &buf[IDX_PAYLOAD+STRUCT_IDX], sizeof(struct socket_st)); + TcpConnection * new_conn; + if((new_conn = handle_socket(sock, uptr, &socket_rpc))) { + pidmap[sock] = pid; + new_conn->pid = pid; + } + return; // Don't close the socket, we'll use this later for data + } + else { // All RPCs other than RPC_SOCKET + streamsock = sockmap[magic_num]; + if(streamsock){ // We found a pre-existing stream socket for this RPC + sockmap[magic_num] = NULL; + found_sock = true; + dwr(MSG_DEBUG," <%x> found_sock\n", sock); + } + else { + // No matching stream has been encountered, create jobmap entry + dwr(MSG_DEBUG," <%x> creating jobmap (cmd=%d) entry for %llu\n", sock, cmd, magic_num); + jobmap[magic_num] = std::make_pair<PhySocket*, void*>(sock, data); + } + } + } + + // STREAM + else { + int data_start = -1, data_end = -1, token_pos = -1, padding_pos = -1; + char padding[] = {0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89}; + dwr(MSG_DEBUG," <%x> stream data, len = %d\n", sock, len); + // Look for padding + std::string padding_pattern(padding, padding+MAGIC_PADDING_SIZE); + std::string buffer(buf, buf + len); + padding_pos = buffer.find(padding_pattern); + token_pos = padding_pos-MAGIC_SIZE; + dwr(MSG_DEBUG, " <%x> padding_pos = %d\n", sock, padding_pos); + // Grab token, next we'll use it to look up an RPC job + if(token_pos > -1) { + dwr(MSG_DEBUG, " <%x> token_pos = %d, GRABBING TOKEN\n", sock, token_pos); + memcpy(&magic_num, buf+token_pos, MAGIC_SIZE); + if(magic_num != 0) { // TODO: Added to address magic_num==0 bug, last seeen 20160108 + // Find job + sockdata = jobmap[magic_num]; + if(!sockdata.first) { // Stream before RPC + dwr(MSG_DEBUG," <%x> creating sockmap entry for %llu\n", sock, magic_num); + sockmap[magic_num] = sock; + } + else { + dwr(MSG_DEBUG," <%x> found_job\n", sock); + found_job = true; + } + } + } - switch(cmd) - { - case RPC_SOCKET: - dwr(MSG_DEBUG, "RPC_SOCKET\n"); - struct socket_st socket_rpc; - memcpy(&socket_rpc, &buf[IDX_PAYLOAD+1], sizeof(struct socket_st)); - - if(rpc_count==rpc_counter) { - dwr(MSG_ERROR, "Detected repeat RPC.\n"); - //return; - } - else { - rpc_counter = rpc_count; - } + conn = getConnection(sock); + if(!conn) + return; - TcpConnection * new_conn; - if((new_conn = handle_socket(sock, uptr, &socket_rpc))) { - pidmap[sock] = pid; - new_conn->pid = pid; + if(padding_pos == -1) // [DATA] + { + dwr(MSG_DEBUG, "copy everything... wlen = %d, conn = %x, conn->buf = %x, buf = %x\n", wlen, conn, conn->buf, buf); + dwr(MSG_DEBUG, " copy everything... conn->idx = %d, sizeof(conn->buf) = %d\n", conn->idx, sizeof(conn->buf)); + memcpy(&conn->buf[conn->idx], buf, wlen); + dwr(MSG_DEBUG, "finished\n"); } - break; - case RPC_LISTEN: - dwr(MSG_DEBUG, "RPC_LISTEN\n"); - struct listen_st listen_rpc; - memcpy(&listen_rpc, &buf[IDX_PAYLOAD+1], sizeof(struct listen_st)); - handle_listen(sock, uptr, &listen_rpc); - break; - case RPC_BIND: - dwr(MSG_DEBUG, "RPC_BIND\n"); - struct bind_st bind_rpc; - memcpy(&bind_rpc, &buf[IDX_PAYLOAD+1], sizeof(struct bind_st)); - handle_bind(sock, uptr, &bind_rpc); - break; - case RPC_CONNECT: - dwr(MSG_DEBUG, "RPC_CONNECT\n"); - struct connect_st connect_rpc; - memcpy(&connect_rpc, &buf[IDX_PAYLOAD+1], sizeof(struct connect_st)); - handle_connect(sock, uptr, &connect_rpc); - break; - case RPC_MAP: - dwr(MSG_DEBUG, "RPC_MAP (len = %d)\n", len); - int newfd; - memcpy(&newfd, &buf[IDX_PAYLOAD+1], sizeof(int)); - handle_retval(sock, uptr, rpc_count, newfd); - break; - case RPC_MAP_REQ: - dwr(MSG_DEBUG, "RPC_MAP_REQ\n"); - handle_map_request(sock, uptr, buf); - break; - case RPC_GETSOCKNAME: - dwr(MSG_DEBUG, "RPC_GETSOCKNAME\n"); - struct getsockname_st getsockname_rpc; - memcpy(&getsockname_rpc, &buf[IDX_PAYLOAD+1], sizeof(struct getsockname_st)); - handle_getsockname(sock, uptr, &getsockname_rpc); - break; - default: - dwr(MSG_ERROR, "POSSIBLE RPC CORRUPTION. TRY AGAIN!\n"); - break; + else { // Padding found, implies a token is present + + dwr(MSG_DEBUG, " <%x> token_pos = %d, GRABBING DATA\n", sock, token_pos); + + // [TOKEN] + if(len == TOKEN_SIZE && token_pos == 0) { + wlen = 0; // Nothing to write + } + else { + // [TOKEN] + [DATA] + if(len > TOKEN_SIZE && token_pos == 0) { + wlen = len - TOKEN_SIZE; + data_start = padding_pos+MAGIC_PADDING_SIZE; + memcpy((&conn->buf)+conn->idx, buf+data_start, wlen); + dwr(MSG_DEBUG," wlen = %d, data_start = %d\n", wlen, data_start); + } + // [DATA] + [TOKEN] + if(len > TOKEN_SIZE && token_pos > 0 && token_pos == len - TOKEN_SIZE) { + wlen = len - TOKEN_SIZE; + data_start = 0; + memcpy((&conn->buf)+conn->idx, buf+data_start, wlen); + dwr(MSG_DEBUG," wlen = %d, data_start = %d\n", wlen, data_start); + } + // [DATA] + [TOKEN] + [DATA] + if(len > TOKEN_SIZE && token_pos > 0 && len > (token_pos + TOKEN_SIZE)) { + wlen = len - TOKEN_SIZE; + data_start = 0; + data_end = padding_pos-MAGIC_SIZE; + memcpy((&conn->buf)+conn->idx, buf+data_start, (data_end-data_start)+1); + memcpy((&conn->buf)+conn->idx, buf+(padding_pos+MAGIC_PADDING_SIZE), len-(token_pos+TOKEN_SIZE)); + dwr(MSG_DEBUG," wlen = %d, data_start = %d, data_end = %d\n", wlen, data_start, data_end); + } + } + } + lwipstack->_lock.lock(); + conn->idx += wlen; + handle_write(conn); + lwipstack->_lock.unlock(); } -} -/* - * Send a 'retval' and 'errno' to the client for an RPC over connection->rpcSock - */ -int NetconEthernetTap::send_return_value(TcpConnection *conn, int retval, int _errno = 0) -{ - if(conn) { - int n = send_return_value(_phy.getDescriptor(conn->rpcSock), retval, _errno); - if(n > 0) - conn->pending = false; - else { - dwr(MSG_ERROR, " Unable to send return value to the intercept. Closing connection\n"); - closeConnection(conn); + if(found_job) { + rpcsock = sockdata.first; + buf = (unsigned char*)sockdata.second; + } + else if(found_sock) { + rpcsock = sock; + sock = streamsock; + } + + // Process RPC if we have a corresponding jobmap/sockmap entry + if(found_job || found_sock) + { + conn = getConnection(sock); + unload_rpc(buf, pid, tid, rpc_count, timestamp, magic, cmd, payload); + dwr(MSG_DEBUG," <%x> RPC: (pid=, tid=, rpc_count=, timestamp=, cmd=%d)\n", sock, /*pid, tid, rpc_count, timestamp, */cmd); + + switch(cmd) { + case RPC_BIND: + dwr(MSG_DEBUG," <%x> RPC_BIND\n", sock); + struct bind_st bind_rpc; + memcpy(&bind_rpc, &buf[IDX_PAYLOAD+STRUCT_IDX], sizeof(struct bind_st)); + handle_bind(sock, rpcsock, uptr, &bind_rpc); + break; + case RPC_LISTEN: + dwr(MSG_DEBUG," <%x> RPC_LISTEN\n", sock); + struct listen_st listen_rpc; + memcpy(&listen_rpc, &buf[IDX_PAYLOAD+STRUCT_IDX], sizeof(struct listen_st)); + handle_listen(sock, rpcsock, uptr, &listen_rpc); + break; + case RPC_GETSOCKNAME: + dwr(MSG_DEBUG," <%x> RPC_GETSOCKNAME\n", sock); + struct getsockname_st getsockname_rpc; + memcpy(&getsockname_rpc, &buf[IDX_PAYLOAD+STRUCT_IDX], sizeof(struct getsockname_st)); + handle_getsockname(sock, rpcsock, uptr, &getsockname_rpc); + break; + case RPC_CONNECT: + dwr(MSG_DEBUG," <%x> RPC_CONNECT\n", sock); + struct connect_st connect_rpc; + memcpy(&connect_rpc, &buf[IDX_PAYLOAD+STRUCT_IDX], sizeof(struct connect_st)); + handle_connect(sock, rpcsock, conn, &connect_rpc); + return; // Keep open RPC, we'll use it once in nc_connected to send retval + default: + break; } - return n; + closeConnection(sockdata.first); // close RPC after sending retval, no longer needed + jobmap.erase(magic_num); + sockmap.erase(magic_num); + return; } - return -1; +} + +int NetconEthernetTap::send_return_value(PhySocket *sock, int retval, int _errno = 0){ + return send_return_value(_phy.getDescriptor(sock), retval, _errno); } int NetconEthernetTap::send_return_value(int fd, int retval, int _errno = 0) { - dwr(MSG_DEBUG, " send_return_value(): fd = %d, retval = %d, errno = %d\n", fd, retval, _errno); + dwr(MSG_DEBUG," send_return_value(): fd = %d, retval = %d, errno = %d\n", fd, retval, _errno); int sz = sizeof(char) + sizeof(retval) + sizeof(errno); char retmsg[sz]; - memset(&retmsg, '\0', sizeof(retmsg)); + memset(&retmsg, 0, sizeof(retmsg)); retmsg[0]=RPC_RETVAL; memcpy(&retmsg[1], &retval, sizeof(retval)); memcpy(&retmsg[1]+sizeof(retval), &_errno, sizeof(_errno)); @@ -802,52 +738,52 @@ int NetconEthernetTap::send_return_value(int fd, int retval, int _errno = 0) */ err_t NetconEthernetTap::nc_accept(void *arg, struct tcp_pcb *newpcb, err_t err) { - dwr(MSG_DEBUG, " nc_accept()\n"); + dwr(MSG_DEBUG," nc_accept()\n"); Larg *l = (Larg*)arg; TcpConnection *conn = l->conn; NetconEthernetTap *tap = l->tap; - int listening_fd = tap->_phy.getDescriptor(conn->dataSock); - - if(conn) { - ZT_PHY_SOCKFD_TYPE fds[2]; - if(socketpair(PF_LOCAL, SOCK_STREAM, 0, fds) < 0) { - if(errno < 0) { - l->tap->send_return_value(conn, -1, errno); - dwr(MSG_ERROR, " nc_accept(): unable to create socketpair\n"); - return ERR_MEM; + + if(!conn->sock) + return -1; + int listening_fd = tap->_phy.getDescriptor(conn->sock); + + if(conn) { + // create new socketpair + ZT_PHY_SOCKFD_TYPE fds[2]; + if(socketpair(PF_LOCAL, SOCK_STREAM, 0, fds) < 0) { + if(errno < 0) { + l->tap->send_return_value(conn, -1, errno); + dwr(MSG_ERROR," nc_accept(): unable to create socketpair\n"); + return ERR_MEM; + } } - } - TcpConnection *new_tcp_conn = new TcpConnection(); - new_tcp_conn->dataSock = tap->_phy.wrapSocket(fds[0], new_tcp_conn); - new_tcp_conn->rpcSock = conn->rpcSock; - new_tcp_conn->pcb = newpcb; - new_tcp_conn->their_fd = fds[1]; - tap->tcp_connections.push_back(new_tcp_conn); - dwr(MSG_DEBUG, " nc_accept(): socketpair = {%d, %d}\n", fds[0], fds[1]); - int send_fd = tap->_phy.getDescriptor(conn->rpcSock); - -dwr(MSG_DEBUG, " nc_accept(): sending %d via %d\n", fds[1], listening_fd); - - if(sock_fd_write(listening_fd, fds[1]) < 0){ - dwr(MSG_ERROR, " nc_accept(%d): error writing signal byte (listen_fd = %d, perceived_fd = %d)\n", listening_fd, send_fd, fds[1]); - return -1; - } - else { - close(fds[1]); // close other end of socketpair - new_tcp_conn->pending = true; - } - tap->lwipstack->_tcp_arg(newpcb, new Larg(tap, new_tcp_conn)); - tap->lwipstack->_tcp_recv(newpcb, nc_recved); - tap->lwipstack->_tcp_err(newpcb, nc_err); - tap->lwipstack->_tcp_sent(newpcb, nc_sent); - tap->lwipstack->_tcp_poll(newpcb, nc_poll, 1); - tcp_accepted(conn->pcb); // Let lwIP know that it can queue additional incoming connections - return ERR_OK; - } - else { - dwr(MSG_ERROR, " nc_accept(%d): can't locate Connection object for PCB.\n", listening_fd); - } - return -1; + // create and populate new TcpConnection + TcpConnection *new_tcp_conn = new TcpConnection(); + tap->tcp_connections.push_back(new_tcp_conn); + new_tcp_conn->pcb = newpcb; + new_tcp_conn->sock = tap->_phy.wrapSocket(fds[0], new_tcp_conn); + + if(sock_fd_write(listening_fd, fds[1]) < 0) + return -1; + else { + //close(fds[1]); // close other end of socketpair + new_tcp_conn->pending = true; + } + tap->lwipstack->_tcp_arg(newpcb, new Larg(tap, new_tcp_conn)); + tap->lwipstack->_tcp_recv(newpcb, nc_recved); + tap->lwipstack->_tcp_err(newpcb, nc_err); + tap->lwipstack->_tcp_sent(newpcb, nc_sent); + tap->lwipstack->_tcp_poll(newpcb, nc_poll, 1); + if(conn->pcb->state == LISTEN) { + dwr(MSG_DEBUG," nc_accept(): Can't call tcp_accept() on LISTEN socket (pcb = %x)\n", conn->pcb); + return ERR_OK; // TODO: Verify this is correct + } + tcp_accepted(conn->pcb); // Let lwIP know that it can queue additional incoming connections + return ERR_OK; + } + else + dwr(MSG_ERROR," nc_accept(%d): can't locate Connection object for PCB.\n", listening_fd); + return -1; } /* @@ -865,44 +801,46 @@ dwr(MSG_DEBUG, " nc_accept(): sending %d via %d\n", fds[1], listening_fd); */ err_t NetconEthernetTap::nc_recved(void *arg, struct tcp_pcb *tpcb, struct pbuf *p, err_t err) { - dwr(MSG_DEBUG, " nc_recved()\n"); + dwr(MSG_DEBUG," nc_recved()\n"); Larg *l = (Larg*)arg; int n; - struct pbuf* q = p; + struct pbuf* q = p; - if(!l->conn) { - dwr(MSG_ERROR, " nc_recved(): no connection object\n"); - return ERR_OK; - } - if(p == NULL) { - if(l->conn && !l->conn->listening) { - dwr(MSG_INFO, " nc_recved(): closing connection\n"); - l->tap->closeConnection(l->conn); - return ERR_ABRT; - } - else { - dwr(MSG_ERROR, " nc_recved(): can't locate connection via (arg)\n"); - } - return err; - } - q = p; - while(p != NULL) { // Cycle through pbufs and write them to the socket - if(p->len <= 0) - break; - if((n = l->tap->_phy.streamSend(l->conn->dataSock,p->payload, p->len)) > 0) { - if(n < p->len) { - dwr(MSG_INFO, " nc_recved(): unable to write entire pbuf to buffer\n"); - } - l->tap->lwipstack->_tcp_recved(tpcb, n); // TODO: would it be more efficient to call this once at the end? - dwr(MSG_DEBUG, " nc_recved(): wrote %d bytes to (%d)\n", n, l->tap->_phy.getDescriptor(l->conn->dataSock)); - } - else { - dwr(MSG_INFO, " nc_recved(): No data written to intercept buffer (%d)\n", l->tap->_phy.getDescriptor(l->conn->dataSock)); - } - p = p->next; - } - l->tap->lwipstack->_pbuf_free(q); // free pbufs - return ERR_OK; + if(!l->conn) { + dwr(MSG_ERROR," nc_recved(): no connection\n"); + return ERR_OK; + } + if(p == NULL) { + if(l->conn && !l->conn->listening) { + dwr(MSG_INFO," nc_recved(): closing connection\n"); + //if(l->tap->lwipstack->_tcp_close(l->conn->pcb) != ERR_OK) { + // dwr(MSG_ERROR," closeConnection(): Error while calling tcp_close()\n"); + //} + l->tap->closeConnection(l->conn->sock); + return ERR_ABRT; + } + else { + dwr(MSG_ERROR," nc_recved(): invalid connection/state\n"); + } + return err; + } + q = p; + while(p != NULL) { // Cycle through pbufs and write them to the socket + if(p->len <= 0) + break; + if((n = l->tap->_phy.streamSend(l->conn->sock,p->payload, p->len)) > 0) { + if(n < p->len) { + dwr(MSG_INFO," nc_recved(): unable to write entire pbuf to stream\n"); + } + l->tap->lwipstack->_tcp_recved(tpcb, n); + dwr(MSG_DEBUG," nc_recved(): wrote %d bytes to <%x>\n", n, l->conn->sock); + } + else + dwr(MSG_INFO," nc_recved(): No data written to stream <%d>\n", l->conn->sock); + p = p->next; + } + l->tap->lwipstack->_pbuf_free(q); // free pbufs + return ERR_OK; } /* @@ -917,79 +855,76 @@ err_t NetconEthernetTap::nc_recved(void *arg, struct tcp_pcb *tpcb, struct pbuf */ void NetconEthernetTap::nc_err(void *arg, err_t err) { - dwr(MSG_DEBUG, "nc_err()\n"); + dwr(MSG_DEBUG,"nc_err() = %d\n", err); Larg *l = (Larg*)arg; if(!l->conn) - dwr(MSG_ERROR, "nc_err(): Connection is NULL!\n"); - - if(l->conn->listening) - return; + dwr(MSG_ERROR,"nc_err(): Connection is NULL!\n"); switch(err) { case ERR_MEM: - dwr(MSG_ERROR, "nc_err(): ERR_MEM->ENOMEM\n"); - l->tap->send_return_value(l->conn, -1, ENOMEM); + dwr(MSG_ERROR,"nc_err(): ERR_MEM->ENOMEM\n"); + l->tap->send_return_value(l->tap->_phy.getDescriptor(l->conn->sock), -1, ENOMEM); break; case ERR_BUF: - dwr(MSG_ERROR, "nc_err(): ERR_BUF->ENOBUFS\n"); - l->tap->send_return_value(l->conn, -1, ENOBUFS); + dwr(MSG_ERROR,"nc_err(): ERR_BUF->ENOBUFS\n"); + l->tap->send_return_value(l->tap->_phy.getDescriptor(l->conn->sock), -1, ENOBUFS); break; case ERR_TIMEOUT: - dwr(MSG_ERROR, "nc_err(): ERR_TIMEOUT->ETIMEDOUT\n"); - l->tap->send_return_value(l->conn, -1, ETIMEDOUT); + dwr(MSG_ERROR,"nc_err(): ERR_TIMEOUT->ETIMEDOUT\n"); + l->tap->send_return_value(l->tap->_phy.getDescriptor(l->conn->sock), -1, ETIMEDOUT); break; case ERR_RTE: - dwr(MSG_ERROR, "nc_err(): ERR_RTE->ENETUNREACH\n"); - l->tap->send_return_value(l->conn, -1, ENETUNREACH); + dwr(MSG_ERROR,"nc_err(): ERR_RTE->ENETUNREACH\n"); + l->tap->send_return_value(l->tap->_phy.getDescriptor(l->conn->sock), -1, ENETUNREACH); break; case ERR_INPROGRESS: - dwr(MSG_ERROR, "nc_err(): ERR_INPROGRESS->EINPROGRESS\n"); - l->tap->send_return_value(l->conn, -1, EINPROGRESS); + dwr(MSG_ERROR,"nc_err(): ERR_INPROGRESS->EINPROGRESS\n"); + l->tap->send_return_value(l->tap->_phy.getDescriptor(l->conn->sock), -1, EINPROGRESS); break; case ERR_VAL: - dwr(MSG_ERROR, "nc_err(): ERR_VAL->EINVAL\n"); - l->tap->send_return_value(l->conn, -1, EINVAL); + dwr(MSG_ERROR,"nc_err(): ERR_VAL->EINVAL\n"); + l->tap->send_return_value(l->tap->_phy.getDescriptor(l->conn->sock), -1, EINVAL); break; case ERR_WOULDBLOCK: - dwr(MSG_ERROR, "nc_err(): ERR_WOULDBLOCK->EWOULDBLOCK\n"); - l->tap->send_return_value(l->conn, -1, EWOULDBLOCK); + dwr(MSG_ERROR,"nc_err(): ERR_WOULDBLOCK->EWOULDBLOCK\n"); + l->tap->send_return_value(l->tap->_phy.getDescriptor(l->conn->sock), -1, EWOULDBLOCK); break; case ERR_USE: - dwr(MSG_ERROR, "nc_err(): ERR_USE->EADDRINUSE\n"); - l->tap->send_return_value(l->conn, -1, EADDRINUSE); + dwr(MSG_ERROR,"nc_err(): ERR_USE->EADDRINUSE\n"); + l->tap->send_return_value(l->tap->_phy.getDescriptor(l->conn->sock), -1, EADDRINUSE); break; case ERR_ISCONN: - dwr(MSG_ERROR, "nc_err(): ERR_ISCONN->EISCONN\n"); - l->tap->send_return_value(l->conn, -1, EISCONN); + dwr(MSG_ERROR,"nc_err(): ERR_ISCONN->EISCONN\n"); + l->tap->send_return_value(l->tap->_phy.getDescriptor(l->conn->sock), -1, EISCONN); break; case ERR_ABRT: - dwr(MSG_ERROR, "nc_err(): ERR_ABRT->ECONNREFUSED\n"); - l->tap->send_return_value(l->conn, -1, ECONNREFUSED); + dwr(MSG_ERROR,"nc_err(): ERR_ABRT->ECONNREFUSED\n"); + l->tap->send_return_value(l->tap->_phy.getDescriptor(l->conn->sock), -1, ECONNREFUSED); break; // FIXME: Below are errors which don't have a standard errno correlate case ERR_RST: - l->tap->send_return_value(l->conn, -1, -1); + l->tap->send_return_value(l->tap->_phy.getDescriptor(l->conn->sock), -1, -1); break; case ERR_CLSD: - l->tap->send_return_value(l->conn, -1, -1); + l->tap->send_return_value(l->tap->_phy.getDescriptor(l->conn->sock), -1, -1); break; case ERR_CONN: - l->tap->send_return_value(l->conn, -1, -1); + l->tap->send_return_value(l->tap->_phy.getDescriptor(l->conn->sock), -1, -1); break; case ERR_ARG: - l->tap->send_return_value(l->conn, -1, -1); + l->tap->send_return_value(l->tap->_phy.getDescriptor(l->conn->sock), -1, -1); break; case ERR_IF: - l->tap->send_return_value(l->conn, -1, -1); + l->tap->send_return_value(l->tap->_phy.getDescriptor(l->conn->sock), -1, -1); break; default: break; } - dwr(MSG_ERROR, "nc_err(): closing connection\n"); - l->tap->closeConnection(l->conn); + //dwr(MSG_ERROR,"nc_err(): closing connection\n"); + //l->tap->closeConnection(l->conn); } /* @@ -1022,8 +957,9 @@ err_t NetconEthernetTap::nc_sent(void* arg, struct tcp_pcb *tpcb, u16_t len) { Larg *l = (Larg*)arg; if(len) { + dwr(MSG_DEBUG,"nc_sent(ACKED): len = %d\n",len); l->conn->acked+=len; - l->tap->_phy.setNotifyReadable(l->conn->dataSock, true); + l->tap->_phy.setNotifyReadable(l->conn->sock, true); l->tap->_phy.whack(); } return ERR_OK; @@ -1041,9 +977,9 @@ err_t NetconEthernetTap::nc_sent(void* arg, struct tcp_pcb *tpcb, u16_t len) */ err_t NetconEthernetTap::nc_connected(void *arg, struct tcp_pcb *tpcb, err_t err) { - dwr(MSG_DEBUG, " nc_connected()\n"); + dwr(MSG_DEBUG," nc_connected()\n"); Larg *l = (Larg*)arg; - l->tap->send_return_value(l->conn, ERR_OK); + l->tap->send_return_value(l->tap->_phy.getDescriptor(l->conn->rpcsock), ERR_OK); return ERR_OK; } @@ -1051,126 +987,16 @@ err_t NetconEthernetTap::nc_connected(void *arg, struct tcp_pcb *tpcb, err_t err ----------------------------- RPC Handler functions ---------------------------- ------------------------------------------------------------------------------*/ -/* Unpacks the buffer from an RPC command */ -void NetconEthernetTap::unload_rpc(void *data, pid_t &pid, pid_t &tid, int &rpc_count, char (timestamp[20]), char &cmd, void* &payload) -{ - unsigned char *buf = (unsigned char*)data; - memcpy(&pid, &buf[IDX_PID], sizeof(pid_t)); - memcpy(&tid, &buf[IDX_TID], sizeof(pid_t)); - memcpy(&rpc_count, &buf[IDX_COUNT], sizeof(int)); - memcpy(timestamp, &buf[IDX_TIME], 20); - memcpy(&cmd, &buf[IDX_PAYLOAD], sizeof(char)); -} - -/* - Responds to a request from the [intercept] to determine whether a local socket is - mapped to this service. In other words, how do the intercept's overridden calls - tell the difference between regular AF_LOCAL sockets and one of our socketpairs - that is used to communicate over the network? -*/ -void NetconEthernetTap::handle_map_request(PhySocket *sock, void **uptr, unsigned char* buf) -{ - dwr(4, " handle_map_request()\n"); - TcpConnection *conn = (TcpConnection*)*uptr; - int req_fd; - memcpy(&req_fd, &buf[IDX_PAYLOAD+1], sizeof(req_fd)); - for(size_t i=0; i<tcp_connections.size(); i++) { - if(tcp_connections[i]->rpcSock == conn->rpcSock && tcp_connections[i]->perceived_fd == req_fd){ - send_return_value(conn, 1, ERR_OK); // True - dwr(MSG_DEBUG, " handle_map_request(their=%d): MAPPED (to %d)\n", req_fd, - _phy.getDescriptor(tcp_connections[i]->dataSock)); - return; - } - } - send_return_value(conn, 0, ERR_OK); // False - dwr(MSG_DEBUG, " handle_map_request(their=%d): NOT MAPPED\n", req_fd); -} - -/** - * Handles a return value (client's perceived fd) and completes a mapping - * so that we know what connection an RPC call should be associated with. - * - * @param PhySocket associated with this RPC connection - * @param structure containing the data and parameters for this client's RPC - * - */ -void NetconEthernetTap::handle_retval(PhySocket *sock, void **uptr, int rpc_count, int newfd) -{ - dwr(MSG_DEBUG, " handle_retval()\n"); - TcpConnection *conn = (TcpConnection*)*uptr; - if(!conn->pending){ - send_return_value(conn, -1, -1); - return; - } - conn->pending = false; - conn->perceived_fd = newfd; - if(rpc_count==rpc_counter) { - dwr(MSG_ERROR, " handle_retval(): Detected repeat RPC.\n"); - send_return_value(conn, -1, -1); - //return; - } - else - rpc_counter = rpc_count; - - dwr(MSG_DEBUG, " handle_retval(): CONN:%x - Mapping [our=%d -> their=%d]\n",conn, - _phy.getDescriptor(conn->dataSock), conn->perceived_fd); - - /* Check for pre-existing connection for this socket --- - This block is in response to interesting behaviour from redis-server. A - socket is created, setsockopt is called and the socket is set to IPV6 but fails (for now), - then it is closed and re-opened and consequently remapped. With two pipes mapped - to the same socket, makes it possible that we write to the wrong pipe and fail. So - this block merely searches for a possible duplicate mapping and erases it - */ - for(size_t i=0; i<tcp_connections.size(); i++) { - if(tcp_connections[i] == conn) - continue; - if(tcp_connections[i]->rpcSock == conn->rpcSock) { - if(tcp_connections[i]->perceived_fd == conn->perceived_fd) { - int n; - if((n = send(_phy.getDescriptor(tcp_connections[i]->dataSock), "z", 1, MSG_NOSIGNAL)) < 0) { - dwr(MSG_DEBUG, " handle_retval(): CONN:%x - Socket (%d) already mapped (originally CONN:%x)\n", conn, tcp_connections[i]->perceived_fd, tcp_connections[i]); - closeConnection(tcp_connections[i]); - } - else { - dwr(MSG_ERROR, " handle_retval(): CONN:%x - This socket is mapped to two different pipes (?). Exiting.\n", conn); - //die(0); // FIXME: Print service mapping state and exit - } - } - } - } - send_return_value(conn, ERR_OK, ERR_OK); // Success -} - - /* Return the address that the socket is bound to */ -void NetconEthernetTap::handle_getsockname(PhySocket *sock, void **uptr, struct getsockname_st *getsockname_rpc) +void NetconEthernetTap::handle_getsockname(PhySocket *sock, PhySocket *rpcsock, void **uptr, struct getsockname_st *getsockname_rpc) { - TcpConnection *conn = getConnectionByTheirFD(sock, getsockname_rpc->sockfd); - dwr(MSG_DEBUG, " handle_getsockname(): sockfd = %d\n", getsockname_rpc->sockfd); - dwr(MSG_DEBUG, " handle_getsockname(): conn = 0x%x\n", conn); - - /* - if(!conn){ - return; - } - struct sockaddr_in * myaddr = (struct sockaddr_in*)conn->addr; - int port = myaddr->sin_port; - int ip = myaddr->sin_addr.s_addr; - unsigned char d[4]; - d[0] = ip & 0xFF; - d[1] = (ip >> 8) & 0xFF; - d[2] = (ip >> 16) & 0xFF; - d[3] = (ip >> 24) & 0xFF; - dwr(MSG_ERROR, " handle_getsockname(): addr = %d.%d.%d.%d: %d\n", d[0],d[1],d[2],d[3], lwipstack->ntohs(port)); - */ + TcpConnection *conn = getConnection(sock); // Assemble address "command" to send to intercept char retmsg[sizeof(struct sockaddr_storage)]; memset(&retmsg, 0, sizeof(retmsg)); if ((conn)&&(conn->addr)) memcpy(&retmsg, conn->addr, sizeof(struct sockaddr_storage)); - int n = write(_phy.getDescriptor(conn->rpcSock), &retmsg, sizeof(struct sockaddr_storage)); - dwr(MSG_DEBUG, " handle_getsockname(): wrote %d bytes\n", n); + int n = write(_phy.getDescriptor(rpcsock), &retmsg, sizeof(struct sockaddr_storage)); } /* @@ -1207,52 +1033,49 @@ void NetconEthernetTap::handle_getsockname(PhySocket *sock, void **uptr, struct [-] EROFS - The socket inode would reside on a read-only file system. */ -void NetconEthernetTap::handle_bind(PhySocket *sock, void **uptr, struct bind_st *bind_rpc) +void NetconEthernetTap::handle_bind(PhySocket *sock, PhySocket *rpcsock, void **uptr, struct bind_st *bind_rpc) { + struct sockaddr_in *connaddr; + connaddr = (struct sockaddr_in *) &bind_rpc->addr; + int conn_port = lwipstack->ntohs(connaddr->sin_port); + ip_addr_t conn_addr; + conn_addr.addr = *((u32_t *)_ips[0].rawIpData()); + TcpConnection *conn = getConnection(sock); + dwr(MSG_DEBUG," handle_bind(%d)\n", bind_rpc->sockfd); - struct sockaddr_in *connaddr; - connaddr = (struct sockaddr_in *) &bind_rpc->addr; - int conn_port = lwipstack->ntohs(connaddr->sin_port); - ip_addr_t conn_addr; - conn_addr.addr = *((u32_t *)_ips[0].rawIpData()); - TcpConnection *conn = getConnectionByTheirFD(sock, bind_rpc->sockfd); - - dwr(MSG_DEBUG, " handle_bind(%d)\n", bind_rpc->sockfd); - - if(conn) { - if(conn->pcb->state == CLOSED){ - int err = lwipstack->tcp_bind(conn->pcb, &conn_addr, conn_port); - - int ip = connaddr->sin_addr.s_addr; + if(conn) { + if(conn->pcb->state == CLOSED){ + int err = lwipstack->tcp_bind(conn->pcb, &conn_addr, conn_port); + int ip = connaddr->sin_addr.s_addr; unsigned char d[4]; d[0] = ip & 0xFF; d[1] = (ip >> 8) & 0xFF; d[2] = (ip >> 16) & 0xFF; d[3] = (ip >> 24) & 0xFF; - dwr(MSG_DEBUG, " handle_bind(): %d.%d.%d.%d : %d\n", d[0],d[1],d[2],d[3], conn_port); + dwr(MSG_DEBUG," handle_bind(): %d.%d.%d.%d : %d\n", d[0],d[1],d[2],d[3], conn_port); if(err != ERR_OK) { - dwr(MSG_ERROR, " handle_bind(): err = %d\n", err); + dwr(MSG_ERROR," handle_bind(): err = %d\n", err); if(err == ERR_USE) - send_return_value(conn, -1, EADDRINUSE); + send_return_value(rpcsock, -1, EADDRINUSE); if(err == ERR_MEM) - send_return_value(conn, -1, ENOMEM); + send_return_value(rpcsock, -1, ENOMEM); if(err == ERR_BUF) - send_return_value(conn, -1, ENOMEM); // FIXME: Closest match + send_return_value(rpcsock, -1, ENOMEM); } else { conn->addr = (struct sockaddr_storage *) &bind_rpc->addr; - send_return_value(conn, ERR_OK, ERR_OK); // Success + send_return_value(rpcsock, ERR_OK, ERR_OK); // Success } - } - else { - dwr(MSG_ERROR, " handle_bind(): PCB (%x) not in CLOSED state. Ignoring BIND request.\n", conn->pcb); - send_return_value(conn, -1, EINVAL); } - } - else { - dwr(MSG_ERROR, " handle_bind(): can't locate connection for PCB\n"); - send_return_value(conn, -1, EBADF); + else { + dwr(MSG_ERROR," handle_bind(): PCB (%x) not in CLOSED state. Ignoring BIND request.\n", conn->pcb); + send_return_value(rpcsock, -1, EINVAL); + } + } + else { + dwr(MSG_ERROR," handle_bind(): can't locate connection for PCB\n"); + send_return_value(rpcsock, -1, EBADF); } } @@ -1275,22 +1098,20 @@ void NetconEthernetTap::handle_bind(PhySocket *sock, void **uptr, struct bind_st [I] EOPNOTSUPP - The socket is not of a type that supports the listen() operation. */ -void NetconEthernetTap::handle_listen(PhySocket *sock, void **uptr, struct listen_st *listen_rpc) +void NetconEthernetTap::handle_listen(PhySocket *sock, PhySocket *rpcsock, void **uptr, struct listen_st *listen_rpc) { - dwr(3, " handle_listen(their=%d):\n", listen_rpc->sockfd); - TcpConnection *conn = getConnectionByTheirFD(sock, listen_rpc->sockfd); + dwr(MSG_DEBUG," handle_listen(their=%d):\n", listen_rpc->sockfd); + TcpConnection *conn = getConnection(sock); if(!conn){ - dwr(MSG_ERROR, " handle_listen(): unable to locate connection object\n"); - send_return_value(conn, -1, EBADF); + dwr(MSG_ERROR," handle_listen(): unable to locate connection object\n"); + send_return_value(rpcsock, -1, EBADF); + return; + } + if(conn->pcb->state == LISTEN) { + dwr(MSG_ERROR," handle_listen(): PCB is already in listening state.\n"); + send_return_value(rpcsock, ERR_OK, ERR_OK); return; } - dwr(3, " handle_listen(our=%d -> their=%d)\n", _phy.getDescriptor(conn->dataSock), conn->perceived_fd); - - if(conn->pcb->state == LISTEN) { - dwr(MSG_ERROR, " handle_listen(): PCB is already in listening state.\n"); - send_return_value(conn, ERR_OK, ERR_OK); - return; - } struct tcp_pcb* listening_pcb; #ifdef TCP_LISTEN_BACKLOG @@ -1299,19 +1120,19 @@ void NetconEthernetTap::handle_listen(PhySocket *sock, void **uptr, struct liste listening_pcb = lwipstack->tcp_listen(conn->pcb); #endif - if(listening_pcb != NULL) { - conn->pcb = listening_pcb; - lwipstack->tcp_accept(listening_pcb, nc_accept); + if(listening_pcb != NULL) { + conn->pcb = listening_pcb; + lwipstack->tcp_accept(listening_pcb, nc_accept); lwipstack->tcp_arg(listening_pcb, new Larg(this, conn)); /* we need to wait for the client to send us the fd allocated on their end for this listening socket */ - fcntl(_phy.getDescriptor(conn->dataSock), F_SETFL, O_NONBLOCK); + fcntl(_phy.getDescriptor(conn->sock), F_SETFL, O_NONBLOCK); conn->listening = true; conn->pending = true; - send_return_value(conn, ERR_OK, ERR_OK); + send_return_value(rpcsock, ERR_OK, ERR_OK); return; - } - send_return_value(conn, -1, -1); + } + send_return_value(rpcsock, -1, -1); } /* @@ -1344,34 +1165,19 @@ void NetconEthernetTap::handle_listen(PhySocket *sock, void **uptr, struct liste */ TcpConnection * NetconEthernetTap::handle_socket(PhySocket *sock, void **uptr, struct socket_st* socket_rpc) { - int rpc_fd = _phy.getDescriptor(sock); struct tcp_pcb *newpcb = lwipstack->tcp_new(); - dwr(MSG_DEBUG, " handle_socket(): pcb=%x\n", newpcb); + dwr(MSG_DEBUG," handle_socket(): pcb=%x\n", newpcb); if(newpcb != NULL) { - ZT_PHY_SOCKFD_TYPE fds[2]; - if(socketpair(PF_LOCAL, SOCK_STREAM, 0, fds) < 0) { - if(errno < 0) { - send_return_value(rpc_fd, -1, errno); - return NULL; - } - } - dwr(MSG_DEBUG, " handle_socket(): socketpair = {%d, %d}\n", fds[0], fds[1]); - TcpConnection *new_conn = new TcpConnection(); - new_conn->dataSock = _phy.wrapSocket(fds[0], new_conn); - *uptr = new_conn; - new_conn->rpcSock = sock; - new_conn->pcb = newpcb; - new_conn->their_fd = fds[1]; - tcp_connections.push_back(new_conn); - sock_fd_write(_phy.getDescriptor(sock), fds[1]); - close(fds[1]); // close other end of socketpair - // Once the client tells us what its fd is on the other end, we can then complete the mapping - new_conn->pending = true; + TcpConnection *new_conn = new TcpConnection(); + *uptr = new_conn; + new_conn->sock = sock; + new_conn->pcb = newpcb; + tcp_connections.push_back(new_conn); + new_conn->pending = true; return new_conn; } - sock_fd_write(rpc_fd, -1); // Send a bad fd, to signal error - dwr(MSG_ERROR, " handle_socket(): Memory not available for new PCB\n"); - send_return_value(rpc_fd, -1, ENOMEM); + dwr(MSG_ERROR," handle_socket(): Memory not available for new PCB\n"); + send_return_value(_phy.getDescriptor(sock), -1, ENOMEM); return NULL; } @@ -1407,13 +1213,11 @@ TcpConnection * NetconEthernetTap::handle_socket(PhySocket *sock, void **uptr, s [X] ETIMEDOUT - Timeout while attempting connection. [X] EINVAL - Invalid argument, SVr4, generally makes sense to set this - * */ -void NetconEthernetTap::handle_connect(PhySocket *sock, void **uptr, struct connect_st* connect_rpc) +void NetconEthernetTap::handle_connect(PhySocket *sock, PhySocket *rpcsock, TcpConnection *conn, struct connect_st* connect_rpc) { - dwr(MSG_DEBUG, " handle_connect()\n"); - TcpConnection *conn = (TcpConnection*)*uptr; + dwr(MSG_DEBUG," handle_connect()\n"); struct sockaddr_in *connaddr; connaddr = (struct sockaddr_in *) &connect_rpc->__addr; int conn_port = lwipstack->ntohs(connaddr->sin_port); @@ -1431,27 +1235,26 @@ void NetconEthernetTap::handle_connect(PhySocket *sock, void **uptr, struct conn if((err = lwipstack->tcp_connect(conn->pcb,&conn_addr,conn_port, nc_connected)) < 0) { if(err == ERR_ISCONN) { - send_return_value(conn, -1, EISCONN); // Already in connected state + send_return_value(rpcsock, -1, EISCONN); // Already in connected state return; } if(err == ERR_USE) { - send_return_value(conn, -1, EADDRINUSE); // Already in use + send_return_value(rpcsock, -1, EADDRINUSE); // Already in use return; } if(err == ERR_VAL) { - send_return_value(conn, -1, EINVAL); // Invalid ipaddress parameter + send_return_value(rpcsock, -1, EINVAL); // Invalid ipaddress parameter return; } if(err == ERR_RTE) { - send_return_value(conn, -1, ENETUNREACH); // No route to host + send_return_value(rpcsock, -1, ENETUNREACH); // No route to host return; } if(err == ERR_BUF) { - send_return_value(conn, -1, EAGAIN); // No more ports available + send_return_value(rpcsock, -1, EAGAIN); // No more ports available return; } - if(err == ERR_MEM) - { + if(err == ERR_MEM) { /* Can occur for the following reasons: tcp_enqueue_flags() 1) tcp_enqueue_flags is always called with either SYN or FIN in flags. @@ -1463,7 +1266,7 @@ void NetconEthernetTap::handle_connect(PhySocket *sock, void **uptr, struct conn 3) Cannot allocate new TCP segment */ - send_return_value(conn, -1, EAGAIN); // FIXME: Doesn't describe the problem well, but closest match + send_return_value(rpcsock, -1, EAGAIN); // FIXME: Doesn't describe the problem well, but closest match return; } @@ -1474,79 +1277,79 @@ void NetconEthernetTap::handle_connect(PhySocket *sock, void **uptr, struct conn // that's it! // - Most instances of a retval for a connect() should happen // in the nc_connect() and nc_err() callbacks! - dwr(MSG_ERROR, " handle_connect(): unable to connect\n"); - send_return_value(conn, -1, EAGAIN); + dwr(MSG_ERROR," handle_connect(): unable to connect\n"); + send_return_value(rpcsock, -1, EAGAIN); } // Everything seems to be ok, but we don't have enough info to retval conn->pending=true; conn->listening=true; - send_return_value(conn, -1); + conn->rpcsock=rpcsock; // used for return value from lwip CB } else { - dwr(MSG_ERROR, " handle_connect(): could not locate PCB based on their fd\n"); - send_return_value(conn, -1, EBADF); + dwr(MSG_ERROR," handle_connect(): could not locate PCB based on their fd\n"); + send_return_value(rpcsock, -1, EBADF); } } + void NetconEthernetTap::handle_write(TcpConnection *conn) { - float max = (float)TCP_SND_BUF; int r; - if(!conn) { - dwr(MSG_ERROR, " handle_write(): could not locate connection for this fd\n"); + dwr(MSG_ERROR," handle_write(): could not locate connection for this fd\n"); return; } - if(conn->idx < max) { - if(!conn->pcb) { - dwr(MSG_ERROR, " handle_write(): conn->pcb == NULL. Failed to write.\n"); - return; - } - int sndbuf = conn->pcb->snd_buf; // How much we are currently allowed to write to the connection - /* PCB send buffer is full,turn off readability notifications for the - corresponding PhySocket until nc_sent() is called and confirms that there is - now space on the buffer */ - if(sndbuf == 0) { - _phy.setNotifyReadable(conn->dataSock, false); - return; - } - if(!conn->listening) + dwr(MSG_DEBUG,"conn->idx = %d, TCP_SND_BUF = %d\n", conn->idx, TCP_SND_BUF); + if(!conn->pcb) { + dwr(MSG_ERROR," handle_write(): conn->pcb == NULL. Failed to write.\n"); + return; + } + int sndbuf = conn->pcb->snd_buf; // How much we are currently allowed to write to the connection + /* PCB send buffer is full,turn off readability notifications for the + corresponding PhySocket until nc_sent() is called and confirms that there is + now space on the buffer */ + if(sndbuf == 0) { + _phy.setNotifyReadable(conn->sock, false); + return; + } + if(!conn->listening) + lwipstack->_tcp_output(conn->pcb); + if(conn->sock && !conn->listening) { + + r = conn->idx < sndbuf ? conn->idx : sndbuf; + dwr(MSG_DEBUG,"handle_write(): r = %d\n", r); + /* Writes data pulled from the client's socket buffer to LWIP. This merely sends the + * data to LWIP to be enqueued and eventually sent to the network. */ + if(r > 0) { + int sz; + // NOTE: this assumes that lwipstack->_lock is locked, either + // because we are in a callback or have locked it manually. + int err = lwipstack->_tcp_write(conn->pcb, &conn->buf, r, TCP_WRITE_FLAG_COPY); lwipstack->_tcp_output(conn->pcb); - - if(conn->dataSock && !conn->listening) { - int read_fd = _phy.getDescriptor(conn->dataSock); - if((r = recvfrom(read_fd, (&conn->buf)+conn->idx, sndbuf, MSG_DONTWAIT, NULL, NULL)) > 0) { - conn->idx += r; - /* Writes data pulled from the client's socket buffer to LWIP. This merely sends the - * data to LWIP to be enqueued and eventually sent to the network. */ - if(r > 0) { - int sz; - // NOTE: this assumes that lwipstack->_lock is locked, either - // because we are in a callback or have locked it manually. - int err = lwipstack->_tcp_write(conn->pcb, &conn->buf, r, TCP_WRITE_FLAG_COPY); - lwipstack->_tcp_output(conn->pcb); - if(err != ERR_OK) { - dwr(MSG_ERROR, " handle_write(): error while writing to PCB, (err = %d)\n", err); - return; - } - else { - sz = (conn->idx)-r; - if(sz) { - memmove(&conn->buf, (conn->buf+r), sz); - } - conn->idx -= r; - conn->written+=r; - return; - } - } - else { - dwr(MSG_INFO, " handle_write(): LWIP stack full\n"); - return; + if(err != ERR_OK) { + dwr(MSG_ERROR," handle_write(): error while writing to PCB, (err = %d)\n", err); + if(err == -1) + dwr(MSG_DEBUG," handle_write(): possibly out of memory\n"); + return; + } + else { + sz = (conn->idx)-r; + if(sz) { + memmove(&conn->buf, (conn->buf+r), sz); } + conn->idx -= r; + conn->written+=r; + return; } } + else { + dwr(MSG_INFO," handle_write(): LWIP stack full\n"); + return; + } } } + + } // namespace ZeroTier |
