diff options
author | Joseph Henry <josephjah@gmail.com> | 2016-01-14 18:59:08 -0800 |
---|---|---|
committer | Joseph Henry <josephjah@gmail.com> | 2016-01-14 18:59:08 -0800 |
commit | 7cb08630d036a43a236d46b4107a3cd42a57cc7e (patch) | |
tree | 9434eeaba4a296504086a7881f359cdf42bd26f5 /netcon | |
parent | 45f0737a002d4888a613119c92f5446db71e46cf (diff) | |
download | infinitytier-7cb08630d036a43a236d46b4107a3cd42a57cc7e.tar.gz infinitytier-7cb08630d036a43a236d46b4107a3cd42a57cc7e.zip |
Fixed RX race condition
Diffstat (limited to 'netcon')
-rw-r--r-- | netcon/NetconEthernetTap.cpp | 90 | ||||
-rw-r--r-- | netcon/NetconEthernetTap.hpp | 2 | ||||
-rw-r--r-- | netcon/common.inc.c | 2 |
3 files changed, 49 insertions, 45 deletions
diff --git a/netcon/NetconEthernetTap.cpp b/netcon/NetconEthernetTap.cpp index f520d754..65fc10b1 100644 --- a/netcon/NetconEthernetTap.cpp +++ b/netcon/NetconEthernetTap.cpp @@ -56,8 +56,8 @@ #define APPLICATION_POLL_FREQ 2 #define ZT_LWIP_TCP_TIMER_INTERVAL 5 -#define STATUS_TMR_INTERVAL 250 // How often we check connection statuses (in ms) -#define DEFAULT_READ_BUFFER_SIZE 1024 * 1024 * 2 +#define STATUS_TMR_INTERVAL 500 // How often we check connection statuses (in ms) +#define DEFAULT_BUFFER_SIZE 1024 * 256 namespace ZeroTier { @@ -116,13 +116,13 @@ class TcpConnection public: bool pending, listening; - int pid, txidx, rxidx; + int pid, txsz, rxsz; PhySocket *rpcsock; PhySocket *sock; struct tcp_pcb *pcb; struct sockaddr_storage *addr; - unsigned char txbuf[DEFAULT_READ_BUFFER_SIZE]; - unsigned char rxbuf[DEFAULT_READ_BUFFER_SIZE]; + unsigned char txbuf[DEFAULT_BUFFER_SIZE]; + unsigned char rxbuf[DEFAULT_BUFFER_SIZE]; }; /* @@ -348,10 +348,8 @@ void NetconEthernetTap::threadMain() prev_status_time = now; status_remaining = STATUS_TMR_INTERVAL - since_status; - - dwr(MSG_DEBUG_EXTRA," tap_thread(): tcp\\jobs = {%d, %d}\n", tcp_connections.size(), jobmap.size()); + dwr(MSG_DEBUG," tap_thread(): tcp\\jobs = {%d, %d}\n", tcp_connections.size(), jobmap.size()); for(size_t i=0; i<tcp_connections.size(); i++) { - // No TCP connections are associated, this is a candidate for removal if(!tcp_connections[i]->sock) continue; // Skip, this is a pending connection @@ -360,7 +358,6 @@ void NetconEthernetTap::threadMain() fcntl(fd, F_SETFL, O_NONBLOCK); unsigned char tmpbuf[BUF_SZ]; int n = read(fd,&tmpbuf,BUF_SZ); - dwr(MSG_DEBUG_EXTRA," tap_thread(): <%x> conn->txidx = %d\n", tcp_connections[i]->sock, tcp_connections[i]->txidx); if(tcp_connections[i]->pcb->state == SYN_SENT) { dwr(MSG_DEBUG_EXTRA," tap_thread(): <%x> state = SYN_SENT, candidate for removal\n", tcp_connections[i]->sock); } @@ -382,7 +379,7 @@ void NetconEthernetTap::threadMain() // Makeshift poll for(size_t i=0; i<tcp_connections.size(); i++) { - if(tcp_connections[i]->txidx > 0){ + if(tcp_connections[i]->txsz > 0){ lwipstack->_lock.lock(); handle_write(tcp_connections[i]); lwipstack->_lock.unlock(); @@ -473,7 +470,6 @@ void NetconEthernetTap::closeConnection(PhySocket *sock) * Signals us to close the TcpConnection associated with this PhySocket */ void NetconEthernetTap::phyOnUnixClose(PhySocket *sock,void **uptr) { - dwr(MSG_DEBUG,"\nphyOnUnixClose(): close connection = %x\n", sock); closeConnection(sock); } @@ -511,24 +507,30 @@ void NetconEthernetTap::unload_rpc(void *data, pid_t &pid, pid_t &tid, */ void NetconEthernetTap::phyOnUnixWritable(PhySocket *sock,void **uptr) { + Mutex::Lock _l(_rx_buf_m); TcpConnection *conn = getConnection(sock); - int len = conn->rxidx; + float max = (float)DEFAULT_BUFFER_SIZE; + fprintf(stderr, " ---------------- { TX: %.3f | RX: %.3f } \n", (float)conn->txsz / max, (float)conn->rxsz / max); + + int len = conn->rxsz; int n = _phy.streamSend(conn->sock, conn->rxbuf, len); if(n > 0) { if(n < len) { - dwr(MSG_INFO,"\n phyOnUnixWritable(): unable to write entire \"block\" to stream\n"); + dwr(MSG_ERROR,"\n phyOnUnixWritable(): unable to write entire \"block\" to stream\n"); } - memcpy(conn->rxbuf, conn->rxbuf+n, conn->rxidx-n); - conn->rxidx -= n; + memcpy(conn->rxbuf, conn->rxbuf+n, conn->rxsz-n); + conn->rxsz -= n; + //fprintf(stderr,"RX <--- %d bytes (sz = %d)\n", n, conn->rxsz); lwipstack->_tcp_recved(conn->pcb, n); - if(conn->rxidx == 0) + if(conn->rxsz == 0){ _phy.setNotifyWritable(conn->sock, false); // Nothing more to be notified about - dwr(MSG_DEBUG," phyOnUnixWritable(): wrote %d bytes from RX buffer to <%x> (idx = %d)\n", n, conn->sock, conn->rxidx); + } + dwr(MSG_ERROR," phyOnUnixWritable(): wrote { %d / %d } bytes from RX buffer to <%x> (sz = %d)\n", n, len, conn->sock, conn->rxsz); } else { perror("\n"); fprintf(stderr, "errno = %d\n", errno); - dwr(MSG_INFO," phyOnUnixWritable(): No data written to stream <%x>\n", conn->sock); + dwr(MSG_ERROR," phyOnUnixWritable(): No data written to stream <%x>\n", conn->sock); } } @@ -605,11 +607,15 @@ void NetconEthernetTap::phyOnUnixData(PhySocket *sock,void **uptr,void *data,uns } conn = getConnection(sock); + + float max = (float)DEFAULT_BUFFER_SIZE; + fprintf(stderr, " ---------------- { TX: %.3f | RX: %.3f } \n", (float)conn->txsz / max, (float)conn->rxsz / max); + if(!conn) return; if(padding_pos == -1) { // [DATA] - memcpy(&conn->txbuf[conn->txidx], buf, wlen); + memcpy(&conn->txbuf[conn->txsz], buf, wlen); } else { // Padding found, implies a token is present // [TOKEN] @@ -621,30 +627,30 @@ void NetconEthernetTap::phyOnUnixData(PhySocket *sock,void **uptr,void *data,uns if(len > TOKEN_SIZE && token_pos == 0) { wlen = len - TOKEN_SIZE; data_start = padding_pos+CANARY_PADDING_SIZE; - memcpy((&conn->txbuf)+conn->txidx, buf+data_start, wlen); + memcpy((&conn->txbuf)+conn->txsz, buf+data_start, wlen); } // [DATA] + [TOKEN] if(len > TOKEN_SIZE && token_pos > 0 && token_pos == len - TOKEN_SIZE) { wlen = len - TOKEN_SIZE; data_start = 0; - memcpy((&conn->txbuf)+conn->txidx, buf+data_start, wlen); + memcpy((&conn->txbuf)+conn->txsz, buf+data_start, wlen); } // [DATA] + [TOKEN] + [DATA] if(len > TOKEN_SIZE && token_pos > 0 && len > (token_pos + TOKEN_SIZE)) { wlen = len - TOKEN_SIZE; data_start = 0; data_end = padding_pos-CANARY_SIZE; - memcpy((&conn->txbuf)+conn->txidx, buf+data_start, (data_end-data_start)+1); - memcpy((&conn->txbuf)+conn->txidx, buf+(padding_pos+CANARY_PADDING_SIZE), len-(token_pos+TOKEN_SIZE)); + memcpy((&conn->txbuf)+conn->txsz, buf+data_start, (data_end-data_start)+1); + memcpy((&conn->txbuf)+conn->txsz, buf+(padding_pos+CANARY_PADDING_SIZE), len-(token_pos+TOKEN_SIZE)); } } } // Write data from stream - if(conn->txidx > (DEFAULT_READ_BUFFER_SIZE / 2)) { + if(conn->txsz > (DEFAULT_BUFFER_SIZE / 2)) { _phy.setNotifyReadable(sock, false); } lwipstack->_lock.lock(); - conn->txidx += wlen; + conn->txsz += wlen; handle_write(conn); lwipstack->_lock.unlock(); } @@ -843,29 +849,25 @@ err_t NetconEthernetTap::nc_recved(void *arg, struct tcp_pcb *tpcb, struct pbuf l->tap->closeConnection(l->conn->sock); return ERR_ABRT; } - else { - //dwr(MSG_ERROR," nc_recved(): invalid connection/state\n"); - } return err; } + Mutex::Lock _l(l->tap->_rx_buf_m); // Cycle through pbufs and write them to the RX buffer // The RX buffer will be emptied via phyOnUnixWritable() while(p != NULL) { if(p->len <= 0) break; - int avail = DEFAULT_READ_BUFFER_SIZE - l->conn->rxidx; + int avail = DEFAULT_BUFFER_SIZE - l->conn->rxsz; int len = p->len; - if(avail < len) { - dwr(MSG_DEBUG," nc_recv(): not enough room (%d bytes) on RX buffer\n", avail); - exit(1); - } - memcpy(l->conn->rxbuf + (l->conn->rxidx), p->payload, len); - l->conn->rxidx += len; + if(avail < len) + dwr(MSG_ERROR," nc_recv(): not enough room (%d bytes) on RX buffer\n", avail); + memcpy(l->conn->rxbuf + (l->conn->rxsz), p->payload, len); + l->conn->rxsz += len; l->tap->_phy.setNotifyWritable(l->conn->sock, true); // Signal that we're interested in knowing when we can write p = p->next; tot += len; } - dwr(MSG_DEBUG," nc_recv(): wrote %d bytes to RX buffer for <%x> (idx = %d)\n", tot, l->conn->sock, l->conn->rxidx); + dwr(MSG_ERROR," nc_recv(): wrote %d bytes to RX buffer for <%x> (sz = %d)\n", tot, l->conn->sock, l->conn->rxsz); l->tap->lwipstack->_pbuf_free(q); return ERR_OK; } @@ -984,7 +986,8 @@ err_t NetconEthernetTap::nc_sent(void* arg, struct tcp_pcb *tpcb, u16_t len) { Larg *l = (Larg*)arg; if(len) { - if(l->conn->txidx < DEFAULT_READ_BUFFER_SIZE / 2) { + float max = (float)DEFAULT_BUFFER_SIZE; + if(l->conn->txsz < max / 2) { l->tap->_phy.setNotifyReadable(l->conn->sock, true); l->tap->_phy.whack(); } @@ -1339,7 +1342,7 @@ void NetconEthernetTap::handle_connect(PhySocket *sock, PhySocket *rpcsock, TcpC */ void NetconEthernetTap::handle_write(TcpConnection *conn) { - dwr(MSG_DEBUG_EXTRA,"handle_write(): conn->txidx = %d, conn->sock = %x\n", conn->txidx, conn->sock); + dwr(MSG_DEBUG_EXTRA,"handle_write(): conn->txsz = %d, conn->sock = %x\n", conn->txsz, conn->sock); if(!conn) { dwr(MSG_ERROR," handle_write(): invalid connection\n"); return; @@ -1357,15 +1360,15 @@ void NetconEthernetTap::handle_write(TcpConnection *conn) _phy.setNotifyReadable(conn->sock, false); return; } - if(conn->txidx <= 0) { - dwr(MSG_DEBUG,"handle_write(): conn->txidx <= 0, nothing in buffer to write\n"); + if(conn->txsz <= 0) { + dwr(MSG_DEBUG,"handle_write(): conn->txsz <= 0, nothing in buffer to write\n"); return; } if(!conn->listening) lwipstack->_tcp_output(conn->pcb); if(conn->sock) { - r = conn->txidx < sndbuf ? conn->txidx : sndbuf; + r = conn->txsz < sndbuf ? conn->txsz : sndbuf; dwr(MSG_DEBUG,"handle_write(): r = %d, sndbuf = %d\n", r, sndbuf); /* Writes data pulled from the client's socket buffer to LWIP. This merely sends the * data to LWIP to be enqueued and eventually sent to the network. */ @@ -1381,10 +1384,11 @@ void NetconEthernetTap::handle_write(TcpConnection *conn) return; } else { - sz = (conn->txidx)-r; + sz = (conn->txsz)-r; if(sz) memmove(&conn->txbuf, (conn->txbuf+r), sz); - conn->txidx -= r; + conn->txsz -= r; + //fprintf(stderr," TX ---> %d bytes (sz = %d)\n", r, conn->txsz); return; } } diff --git a/netcon/NetconEthernetTap.hpp b/netcon/NetconEthernetTap.hpp index bdb4b6c8..5de458c2 100644 --- a/netcon/NetconEthernetTap.hpp +++ b/netcon/NetconEthernetTap.hpp @@ -174,7 +174,7 @@ private: Mutex _multicastGroups_m; std::vector<InetAddress> _ips; - Mutex _ips_m, _tcpconns_m; + Mutex _ips_m, _tcpconns_m, _rx_buf_m, _tx_buf_m; unsigned int _mtu; volatile bool _enabled; diff --git a/netcon/common.inc.c b/netcon/common.inc.c index 60d2ab3b..858a2195 100644 --- a/netcon/common.inc.c +++ b/netcon/common.inc.c @@ -42,7 +42,7 @@ #ifndef _COMMON_H #define _COMMON_H 1 -#define DEBUG_LEVEL 3 +#define DEBUG_LEVEL 0 #define MSG_WARNING 4 #define MSG_ERROR 1 // Errors |