diff options
-rw-r--r-- | netcon/NetconEthernetTap.cpp | 56 | ||||
-rw-r--r-- | netcon/NetconEthernetTap.hpp | 3 |
2 files changed, 23 insertions, 36 deletions
diff --git a/netcon/NetconEthernetTap.cpp b/netcon/NetconEthernetTap.cpp index ffbbd626..c3132dd8 100644 --- a/netcon/NetconEthernetTap.cpp +++ b/netcon/NetconEthernetTap.cpp @@ -292,9 +292,6 @@ void NetconEthernetTap::threadMain() { uint64_t prev_tcp_time = 0, prev_status_time = 0, prev_etharp_time = 0; - Mutex::Lock _l(_tcpconns_m); - _tcpconns_m.unlock(); - // Main timer loop while (_run) { uint64_t now = OSUtils::now(); @@ -303,23 +300,17 @@ void NetconEthernetTap::threadMain() uint64_t since_status = now - prev_status_time; uint64_t tcp_remaining = ZT_LWIP_TCP_TIMER_INTERVAL; uint64_t etharp_remaining = ARP_TMR_INTERVAL; - uint64_t status_remaining = STATUS_TMR_INTERVAL; // Connection prunning if (since_status >= STATUS_TMR_INTERVAL) { prev_status_time = now; - status_remaining = STATUS_TMR_INTERVAL - since_status; - _tcpconns_m.lock(); for(size_t i=0;i<_TcpConnections.size();++i) { if(!_TcpConnections[i]->sock) - continue; // Skip, this is a pending connection + continue; int fd = _phy.getDescriptor(_TcpConnections[i]->sock); dwr(MSG_DEBUG," tap_thread(): tcp\\jobs = {%d, %d}\n", _TcpConnections.size(), jobmap.size()); - - dwr(MSG_DEBUG," tap_thread(): sock=%x, pcb->state=%d\n", _TcpConnections[i]->sock, _TcpConnections[i]->pcb->state); - fcntl(fd, F_SETFL, O_NONBLOCK); unsigned char tmpbuf[BUF_SZ]; @@ -332,17 +323,15 @@ void NetconEthernetTap::threadMain() closeConnection(_TcpConnections[i]->sock); } else if (n > 0) { dwr(MSG_DEBUG," tap_thread(): data read during connection check (%d bytes)\n", n); - phyOnUnixData(_TcpConnections[i]->sock,_phy.getuptr(_TcpConnections[i]->sock),&tmpbuf,BUF_SZ); - } + phyOnUnixData(_TcpConnections[i]->sock,_phy.getuptr(_TcpConnections[i]->sock),&tmpbuf,n); + } } - _tcpconns_m.unlock(); } // Main TCP/ETHARP timer section if (since_tcp >= ZT_LWIP_TCP_TIMER_INTERVAL) { prev_tcp_time = now; lwipstack->tcp_tmr(); // Makeshift poll - _tcpconns_m.lock(); for(size_t i=0;i<_TcpConnections.size();++i) { if(_TcpConnections[i]->txsz > 0){ lwipstack->_lock.lock(); @@ -350,7 +339,6 @@ void NetconEthernetTap::threadMain() lwipstack->_lock.unlock(); } } - _tcpconns_m.unlock(); } else { tcp_remaining = ZT_LWIP_TCP_TIMER_INTERVAL - since_tcp; } @@ -386,8 +374,6 @@ TcpConnection *NetconEthernetTap::getConnection(PhySocket *sock) void NetconEthernetTap::closeConnection(PhySocket *sock) { // Here we assume _tcpconns_m is already locked by caller - - dwr(MSG_DEBUG,"closeConnection(%x)\n",sock); if(!sock) { dwr(MSG_DEBUG," closeConnection(): invalid PhySocket\n"); return; @@ -395,10 +381,10 @@ void NetconEthernetTap::closeConnection(PhySocket *sock) TcpConnection *conn = getConnection(sock); if(!conn) return; - if(conn->pcb) { - dwr(MSG_DEBUG," closeConnection(): PCB->state = %d\n", conn->pcb->state); + if(conn->pcb && conn->pcb->state != CLOSED) { + dwr(MSG_DEBUG," closeConnection(%x): PCB->state = %d\n", sock, conn->pcb->state); if(conn->pcb->state == SYN_SENT) { - dwr(MSG_DEBUG," closeConnection(): invalid PCB state for this operation. ignoring.\n"); + dwr(MSG_DEBUG," closeConnection(%x): invalid PCB state for this operation. ignoring.\n", sock); return; } if(lwipstack->_tcp_close(conn->pcb) == ERR_OK) { @@ -410,7 +396,7 @@ void NetconEthernetTap::closeConnection(PhySocket *sock) lwipstack->_tcp_poll(conn->pcb, NULL, 1); } else { - dwr(MSG_ERROR," closeConnection(): error while calling tcp_close()\n"); + dwr(MSG_ERROR," closeConnection(%x): error while calling tcp_close()\n", sock); } } for(size_t i=0;i<_TcpConnections.size();++i) { @@ -446,8 +432,8 @@ void NetconEthernetTap::phyOnUnixWritable(PhySocket *sock,void **uptr) memcpy(conn->rxbuf, conn->rxbuf+n, len-n); conn->rxsz -= n; float max = (float)DEFAULT_BUF_SZ; - dwr(MSG_TRANSFER," <--- RX :: { TX: %.3f%% | RX: %.3f%% } :: %d bytes\n", - (float)conn->txsz / max, (float)conn->rxsz / max, n); + dwr(MSG_TRANSFER," <--- RX :: {TX: %.3f%%, RX: %.3f%%, sock=%x} :: %d bytes\n", + (float)conn->txsz / max, (float)conn->rxsz / max, sock, n); lwipstack->_tcp_recved(conn->pcb, n); if(conn->rxsz == 0){ _phy.setNotifyWritable(conn->sock, false); // Nothing more to be notified about @@ -567,7 +553,6 @@ void NetconEthernetTap::phyOnUnixData(PhySocket *sock,void **uptr,void *data,uns rpcSock = sockdata.first; buf = (unsigned char*)sockdata.second; } - // Process RPC if we have a corresponding jobmap entry if(foundJob) { unloadRPC(buf, pid, tid, rpcCount, timestamp, CANARY, cmd, payload); @@ -576,25 +561,21 @@ void NetconEthernetTap::phyOnUnixData(PhySocket *sock,void **uptr,void *data,uns switch(cmd) { case RPC_BIND: - dwr(MSG_DEBUG," <%x> RPC_BIND\n", sock); struct bind_st bind_rpc; memcpy(&bind_rpc, &buf[IDX_PAYLOAD+STRUCT_IDX], sizeof(struct bind_st)); handleBind(sock, rpcSock, uptr, &bind_rpc); break; case RPC_LISTEN: - dwr(MSG_DEBUG," <%x> RPC_LISTEN\n", sock); struct listen_st listen_rpc; memcpy(&listen_rpc, &buf[IDX_PAYLOAD+STRUCT_IDX], sizeof(struct listen_st)); handleListen(sock, rpcSock, uptr, &listen_rpc); break; case RPC_GETSOCKNAME: - dwr(MSG_DEBUG," <%x> RPC_GETSOCKNAME\n", sock); struct getsockname_st getsockname_rpc; memcpy(&getsockname_rpc, &buf[IDX_PAYLOAD+STRUCT_IDX], sizeof(struct getsockname_st)); handleGetsockname(sock, rpcSock, uptr, &getsockname_rpc); break; case RPC_CONNECT: - dwr(MSG_DEBUG," <%x> RPC_CONNECT\n", sock); struct connect_st connect_rpc; memcpy(&connect_rpc, &buf[IDX_PAYLOAD+STRUCT_IDX], sizeof(struct connect_st)); handleConnect(sock, rpcSock, conn, &connect_rpc); @@ -731,9 +712,11 @@ err_t NetconEthernetTap::nc_sent(void* arg, struct tcp_pcb *PCB, u16_t len) { Larg *l = (Larg*)arg; Mutex::Lock _l(l->tap->_tcpconns_m); - if(l && l->conn && len) { - float max = (float)DEFAULT_BUF_SZ; - if(l->conn->txsz < max / 2) { + if(l->conn->probation && l->conn->txsz == 0){ + l->conn->probation = false; // TX buffer now empty, removing from probation + } + if(l && l->conn && len && !l->conn->probation) { + if(l->conn->txsz < (float)DEFAULT_BUF_SOFTMAX) { l->tap->_phy.setNotifyReadable(l->conn->sock, true); l->tap->_phy.whack(); } @@ -1033,8 +1016,11 @@ void NetconEthernetTap::handleWrite(TcpConnection *conn) /* PCB send buffer is full, turn off readability notifications for the corresponding PhySocket until nc_sent() is called and confirms that there is now space on the buffer */ - dwr(MSG_DEBUG," handleWrite(): sndbuf == 0, LWIP stack is full\n"); - _phy.setNotifyReadable(conn->sock, false); + if(!conn->probation) { + dwr(MSG_DEBUG," handleWrite(): sndbuf == 0, LWIP stack is full\n"); + _phy.setNotifyReadable(conn->sock, false); + conn->probation = true; + } return; } if(conn->txsz <= 0) @@ -1061,8 +1047,8 @@ void NetconEthernetTap::handleWrite(TcpConnection *conn) conn->txsz -= r; float max = (float)DEFAULT_BUF_SZ; - dwr(MSG_TRANSFER," TX ---> :: { TX: %.3f%% | RX: %.3f%% } :: %d bytes\n", - (float)conn->txsz / max, (float)conn->rxsz / max, r); + dwr(MSG_TRANSFER," TX ---> :: {TX: %.3f%%, RX: %.3f%%, sock=%x} :: %d bytes\n", + (float)conn->txsz / max, (float)conn->rxsz / max, conn->sock, r); return; } } diff --git a/netcon/NetconEthernetTap.hpp b/netcon/NetconEthernetTap.hpp index 4689bf03..f15f8671 100644 --- a/netcon/NetconEthernetTap.hpp +++ b/netcon/NetconEthernetTap.hpp @@ -60,6 +60,7 @@ struct accept_st; #define ZT_LWIP_TCP_TIMER_INTERVAL 5 #define STATUS_TMR_INTERVAL 500 // How often we check connection statuses (in ms) #define DEFAULT_BUF_SZ 1024 * 1024 * 2 +#define DEFAULT_BUF_SOFTMAX DEFAULT_BUF_SZ / 2 namespace ZeroTier { @@ -71,7 +72,7 @@ class LWIPStack; */ struct TcpConnection { - bool listening, closing; + bool listening, probation; int pid, txsz, rxsz; PhySocket *rpcSock, *sock; struct tcp_pcb *pcb; |