diff options
-rw-r--r-- | netcon/NetconEthernetTap.cpp | 140 | ||||
-rw-r--r-- | netcon/NetconEthernetTap.hpp | 16 |
2 files changed, 81 insertions, 75 deletions
diff --git a/netcon/NetconEthernetTap.cpp b/netcon/NetconEthernetTap.cpp index 9c88737f..ffbbd626 100644 --- a/netcon/NetconEthernetTap.cpp +++ b/netcon/NetconEthernetTap.cpp @@ -292,6 +292,9 @@ void NetconEthernetTap::threadMain() { uint64_t prev_tcp_time = 0, prev_status_time = 0, prev_etharp_time = 0; + Mutex::Lock _l(_tcpconns_m); + _tcpconns_m.unlock(); + // Main timer loop while (_run) { uint64_t now = OSUtils::now(); @@ -307,13 +310,16 @@ void NetconEthernetTap::threadMain() prev_status_time = now; status_remaining = STATUS_TMR_INTERVAL - since_status; - + _tcpconns_m.lock(); for(size_t i=0;i<_TcpConnections.size();++i) { if(!_TcpConnections[i]->sock) continue; // Skip, this is a pending connection int fd = _phy.getDescriptor(_TcpConnections[i]->sock); dwr(MSG_DEBUG," tap_thread(): tcp\\jobs = {%d, %d}\n", _TcpConnections.size(), jobmap.size()); + + dwr(MSG_DEBUG," tap_thread(): sock=%x, pcb->state=%d\n", _TcpConnections[i]->sock, _TcpConnections[i]->pcb->state); + fcntl(fd, F_SETFL, O_NONBLOCK); unsigned char tmpbuf[BUF_SZ]; @@ -329,12 +335,14 @@ void NetconEthernetTap::threadMain() phyOnUnixData(_TcpConnections[i]->sock,_phy.getuptr(_TcpConnections[i]->sock),&tmpbuf,BUF_SZ); } } + _tcpconns_m.unlock(); } // Main TCP/ETHARP timer section if (since_tcp >= ZT_LWIP_TCP_TIMER_INTERVAL) { prev_tcp_time = now; lwipstack->tcp_tmr(); // Makeshift poll + _tcpconns_m.lock(); for(size_t i=0;i<_TcpConnections.size();++i) { if(_TcpConnections[i]->txsz > 0){ lwipstack->_lock.lock(); @@ -342,6 +350,7 @@ void NetconEthernetTap::threadMain() lwipstack->_lock.unlock(); } } + _tcpconns_m.unlock(); } else { tcp_remaining = ZT_LWIP_TCP_TIMER_INTERVAL - since_tcp; } @@ -367,7 +376,6 @@ void NetconEthernetTap::phyOnTcpWritable(PhySocket *sock,void **uptr) {} TcpConnection *NetconEthernetTap::getConnection(PhySocket *sock) { - Mutex::Lock _l(_tcpconns_m); for(size_t i=0;i<_TcpConnections.size();++i) { if(_TcpConnections[i]->sock == sock) return _TcpConnections[i]; @@ -375,27 +383,10 @@ TcpConnection *NetconEthernetTap::getConnection(PhySocket *sock) return NULL; } -TcpConnection *NetconEthernetTap::addConnection(TcpConnection *conn) -{ - Mutex::Lock _l(_tcpconns_m); - _TcpConnections.push_back(conn); - return conn; -} - -void NetconEthernetTap::removeConnection(TcpConnection *conn) -{ - Mutex::Lock _l(_tcpconns_m); - for(size_t i=0;i<_TcpConnections.size();++i) { - if(_TcpConnections[i] == conn){ - _TcpConnections.erase(_TcpConnections.begin() + i); - delete conn; - return; - } - } -} - void NetconEthernetTap::closeConnection(PhySocket *sock) { + // Here we assume _tcpconns_m is already locked by caller + dwr(MSG_DEBUG,"closeConnection(%x)\n",sock); if(!sock) { dwr(MSG_DEBUG," closeConnection(): invalid PhySocket\n"); @@ -405,16 +396,30 @@ void NetconEthernetTap::closeConnection(PhySocket *sock) if(!conn) return; if(conn->pcb) { - if(conn->pcb->state == SYN_SENT || conn->pcb->state == CLOSE_WAIT) { + dwr(MSG_DEBUG," closeConnection(): PCB->state = %d\n", conn->pcb->state); + if(conn->pcb->state == SYN_SENT) { dwr(MSG_DEBUG," closeConnection(): invalid PCB state for this operation. ignoring.\n"); return; } - dwr(MSG_DEBUG," closeConnection(): PCB->state = %d\n", conn->pcb->state); - if(lwipstack->_tcp_close(conn->pcb) != ERR_OK) { + if(lwipstack->_tcp_close(conn->pcb) == ERR_OK) { + // Unregister callbacks for this PCB + lwipstack->_tcp_arg(conn->pcb, NULL); + lwipstack->_tcp_recv(conn->pcb, NULL); + lwipstack->_tcp_err(conn->pcb, NULL); + lwipstack->_tcp_sent(conn->pcb, NULL); + lwipstack->_tcp_poll(conn->pcb, NULL, 1); + } + else { dwr(MSG_ERROR," closeConnection(): error while calling tcp_close()\n"); } } - removeConnection(conn); + for(size_t i=0;i<_TcpConnections.size();++i) { + if(_TcpConnections[i] == conn){ + _TcpConnections.erase(_TcpConnections.begin() + i); + delete conn; + break; + } + } if(!sock) return; close(_phy.getDescriptor(sock)); @@ -422,13 +427,15 @@ void NetconEthernetTap::closeConnection(PhySocket *sock) } void NetconEthernetTap::phyOnUnixClose(PhySocket *sock,void **uptr) { + Mutex::Lock _l(_tcpconns_m); closeConnection(sock); } void NetconEthernetTap::phyOnUnixWritable(PhySocket *sock,void **uptr) { + Mutex::Lock _l(_tcpconns_m); + Mutex::Lock _l2(_rx_buf_m); TcpConnection *conn = getConnection(sock); - Mutex::Lock _l(_rx_buf_m); int len = conn->rxsz; int n = _phy.streamSend(conn->sock, conn->rxbuf, len); if(n > 0) { @@ -596,6 +603,7 @@ void NetconEthernetTap::phyOnUnixData(PhySocket *sock,void **uptr,void *data,uns default: break; } + Mutex::Lock _l(_tcpconns_m); closeConnection(sockdata.first); // close RPC after sending retval, no longer needed jobmap.erase(CANARY_num); return; @@ -636,6 +644,8 @@ void NetconEthernetTap::unloadRPC(void *data, pid_t &pid, pid_t &tid, err_t NetconEthernetTap::nc_accept(void *arg, struct tcp_pcb *newPCB, err_t err) { Larg *l = (Larg*)arg; + Mutex::Lock _l(l->tap->_tcpconns_m); + TcpConnection *conn = l->conn; NetconEthernetTap *tap = l->tap; @@ -655,15 +665,12 @@ err_t NetconEthernetTap::nc_accept(void *arg, struct tcp_pcb *newPCB, err_t err) } // create and populate new TcpConnection TcpConnection *newTcpConn = new TcpConnection(); - tap->addConnection(newTcpConn); + l->tap->_TcpConnections.push_back(newTcpConn); newTcpConn->pcb = newPCB; newTcpConn->sock = tap->_phy.wrapSocket(fds[0], newTcpConn); if(sock_fd_write(fd, fds[1]) < 0) return -1; - else { - //close(fds[1]); // close other end of socketpair - } tap->lwipstack->_tcp_arg(newPCB, new Larg(tap, newTcpConn)); tap->lwipstack->_tcp_recv(newPCB, nc_recved); tap->lwipstack->_tcp_err(newPCB, nc_err); @@ -686,21 +693,20 @@ err_t NetconEthernetTap::nc_recved(void *arg, struct tcp_pcb *PCB, struct pbuf * int tot = 0; struct pbuf* q = p; + Mutex::Lock _l(l->tap->_tcpconns_m); + if(!l->conn) { dwr(MSG_ERROR," nc_recved(): no connection\n"); return ERR_OK; } if(p == NULL) { - if(l->conn && !l->conn->listening) { - dwr(MSG_INFO," nc_recved(): closing connection\n"); - if(l->tap->lwipstack->_tcp_close(l->conn->pcb) != ERR_OK) - dwr(MSG_ERROR," nc_recved(): error while calling tcp_close()\n"); + if(l->conn->pcb->state == CLOSE_WAIT){ l->tap->closeConnection(l->conn->sock); return ERR_ABRT; } return err; } - Mutex::Lock _l(l->tap->_rx_buf_m); + Mutex::Lock _l2(l->tap->_rx_buf_m); // Cycle through pbufs and write them to the RX buffer // The RX buffer will be emptied via phyOnUnixWritable() while(p != NULL) { @@ -721,10 +727,39 @@ err_t NetconEthernetTap::nc_recved(void *arg, struct tcp_pcb *PCB, struct pbuf * return ERR_OK; } +err_t NetconEthernetTap::nc_sent(void* arg, struct tcp_pcb *PCB, u16_t len) +{ + Larg *l = (Larg*)arg; + Mutex::Lock _l(l->tap->_tcpconns_m); + if(l && l->conn && len) { + float max = (float)DEFAULT_BUF_SZ; + if(l->conn->txsz < max / 2) { + l->tap->_phy.setNotifyReadable(l->conn->sock, true); + l->tap->_phy.whack(); + } + } + return ERR_OK; +} + +err_t NetconEthernetTap::nc_connected(void *arg, struct tcp_pcb *PCB, err_t err) +{ + Larg *l = (Larg*)arg; + if(l && l->conn) + l->tap->sendReturnValue(l->tap->_phy.getDescriptor(l->conn->rpcSock), ERR_OK); + return ERR_OK; +} + +err_t NetconEthernetTap::nc_poll(void* arg, struct tcp_pcb *PCB) +{ + return ERR_OK; +} + void NetconEthernetTap::nc_err(void *arg, err_t err) { dwr(MSG_DEBUG,"nc_err() = %d\n", err); Larg *l = (Larg*)arg; + Mutex::Lock _l(l->tap->_tcpconns_m); + if(!l->conn) dwr(MSG_ERROR,"nc_err(): connection is NULL!\n"); int fd = l->tap->_phy.getDescriptor(l->conn->sock); @@ -796,37 +831,13 @@ void NetconEthernetTap::nc_err(void *arg, err_t err) l->tap->closeConnection(l->conn); } -err_t NetconEthernetTap::nc_poll(void* arg, struct tcp_pcb *PCB) -{ - return ERR_OK; -} - -err_t NetconEthernetTap::nc_sent(void* arg, struct tcp_pcb *PCB, u16_t len) -{ - Larg *l = (Larg*)arg; - if(l->conn && len) { - float max = (float)DEFAULT_BUF_SZ; - if(l->conn->txsz < max / 2) { - l->tap->_phy.setNotifyReadable(l->conn->sock, true); - l->tap->_phy.whack(); - } - } - return ERR_OK; -} - -err_t NetconEthernetTap::nc_connected(void *arg, struct tcp_pcb *PCB, err_t err) -{ - Larg *l = (Larg*)arg; - l->tap->sendReturnValue(l->tap->_phy.getDescriptor(l->conn->rpcSock), ERR_OK); - return ERR_OK; -} - /*------------------------------------------------------------------------------ ----------------------------- RPC Handler functions ---------------------------- ------------------------------------------------------------------------------*/ void NetconEthernetTap::handleGetsockname(PhySocket *sock, PhySocket *rpcSock, void **uptr, struct getsockname_st *getsockname_rpc) { + Mutex::Lock _l(_tcpconns_m); TcpConnection *conn = getConnection(sock); char retmsg[sizeof(struct sockaddr_storage)]; memset(&retmsg, 0, sizeof(retmsg)); @@ -837,6 +848,7 @@ void NetconEthernetTap::handleGetsockname(PhySocket *sock, PhySocket *rpcSock, v void NetconEthernetTap::handleBind(PhySocket *sock, PhySocket *rpcSock, void **uptr, struct bind_st *bind_rpc) { + Mutex::Lock _l(_tcpconns_m); struct sockaddr_in *rawAddr = (struct sockaddr_in *) &bind_rpc->addr; int port = lwipstack->ntohs(rawAddr->sin_port); ip_addr_t connAddr; @@ -878,6 +890,7 @@ void NetconEthernetTap::handleBind(PhySocket *sock, PhySocket *rpcSock, void **u void NetconEthernetTap::handleListen(PhySocket *sock, PhySocket *rpcSock, void **uptr, struct listen_st *listen_rpc) { + Mutex::Lock _l(_tcpconns_m); TcpConnection *conn = getConnection(sock); if(!conn){ dwr(MSG_ERROR," handleListen(): unable to locate TcpConnection.\n"); @@ -913,13 +926,15 @@ void NetconEthernetTap::handleListen(PhySocket *sock, PhySocket *rpcSock, void * TcpConnection * NetconEthernetTap::handleSocket(PhySocket *sock, void **uptr, struct socket_st* socket_rpc) { + Mutex::Lock _l(_tcpconns_m); struct tcp_pcb *newPCB = lwipstack->tcp_new(); if(newPCB != NULL) { TcpConnection *newConn = new TcpConnection(); *uptr = newConn; newConn->sock = sock; newConn->pcb = newPCB; - return addConnection(newConn);; + _TcpConnections.push_back(newConn); + return newConn; } dwr(MSG_ERROR," handleSocket(): Memory not available for new PCB\n"); sendReturnValue(_phy.getDescriptor(sock), -1, ENOMEM); @@ -928,9 +943,10 @@ TcpConnection * NetconEthernetTap::handleSocket(PhySocket *sock, void **uptr, st void NetconEthernetTap::handleConnect(PhySocket *sock, PhySocket *rpcSock, TcpConnection *conn, struct connect_st* connect_rpc) { + Mutex::Lock _l(_tcpconns_m); struct sockaddr_in *rawAddr = (struct sockaddr_in *) &connect_rpc->__addr; int port = lwipstack->ntohs(rawAddr->sin_port); - ip_addr_t connAddr = convert_ip((struct sockaddr_in *)&connect_rpc->__addr); + ip_addr_t connAddr = convert_ip(rawAddr); if(conn != NULL) { lwipstack->tcp_sent(conn->pcb, nc_sent); diff --git a/netcon/NetconEthernetTap.hpp b/netcon/NetconEthernetTap.hpp index 673863b3..4689bf03 100644 --- a/netcon/NetconEthernetTap.hpp +++ b/netcon/NetconEthernetTap.hpp @@ -56,9 +56,9 @@ struct connect_st; struct getsockname_st; struct accept_st; -#define APPLICATION_POLL_FREQ 50 +#define APPLICATION_POLL_FREQ 2 #define ZT_LWIP_TCP_TIMER_INTERVAL 5 -#define STATUS_TMR_INTERVAL 1000 // How often we check connection statuses (in ms) +#define STATUS_TMR_INTERVAL 500 // How often we check connection statuses (in ms) #define DEFAULT_BUF_SZ 1024 * 1024 * 2 namespace ZeroTier { @@ -71,7 +71,7 @@ class LWIPStack; */ struct TcpConnection { - bool listening; + bool listening, closing; int pid, txsz, rxsz; PhySocket *rpcSock, *sock; struct tcp_pcb *pcb; @@ -410,16 +410,6 @@ private: * Returns a pointer to a TcpConnection associated with a given PhySocket */ TcpConnection *getConnection(PhySocket *sock); - - /* - * Safely adds a new TcpConnection to _TcpConnections - */ - TcpConnection *addConnection(TcpConnection *conn); - - /* - * Safely removes a TcpConnection from _TcpConnections - */ - void removeConnection(TcpConnection *conn); /* * Closes a TcpConnection, associated LWIP PCB strcuture, |