diff options
Diffstat (limited to 'netcon')
-rw-r--r-- | netcon/NetconEthernetTap.cpp | 91 | ||||
-rw-r--r-- | netcon/NetconEthernetTap.hpp | 1 | ||||
-rw-r--r-- | netcon/common.inc.c | 2 |
3 files changed, 43 insertions, 51 deletions
diff --git a/netcon/NetconEthernetTap.cpp b/netcon/NetconEthernetTap.cpp index 7895df0d..23f86d5e 100644 --- a/netcon/NetconEthernetTap.cpp +++ b/netcon/NetconEthernetTap.cpp @@ -56,8 +56,8 @@ #define APPLICATION_POLL_FREQ 20 #define ZT_LWIP_TCP_TIMER_INTERVAL 5 -#define STATUS_TMR_INTERVAL 1000 // How often we check connection statuses (in ms) -#define DEFAULT_READ_BUFFER_SIZE 1024 * 1024 +#define STATUS_TMR_INTERVAL 10000 // How often we check connection statuses (in ms) +#define DEFAULT_READ_BUFFER_SIZE 1024 * 1024 * 5 namespace ZeroTier { @@ -348,7 +348,6 @@ TcpConnection *NetconEthernetTap::getConnection(PhySocket *sock) void NetconEthernetTap::threadMain() throw() { - dwr(MSG_DEBUG, "MEMP_NUM_REASSDATA = %d\n", MEMP_NUM_REASSDATA); uint64_t prev_tcp_time = 0; uint64_t prev_status_time = 0; uint64_t prev_etharp_time = 0; @@ -364,11 +363,12 @@ void NetconEthernetTap::threadMain() uint64_t status_remaining = STATUS_TMR_INTERVAL; // Connection prunning + /* if (since_status >= STATUS_TMR_INTERVAL) { prev_status_time = now; status_remaining = STATUS_TMR_INTERVAL - since_status; - dwr(MSG_DEBUG," tap_thread(): tcp\\jobs\\socks = {%d, %d, %d}\n", tcp_connections.size(), jobmap.size(), sockmap.size()); + dwr(MSG_DEBUG," tap_thread(): tcp\\jobs = {%d, %d}\n", tcp_connections.size(), jobmap.size()); for(size_t i=0; i<tcp_connections.size(); i++) { // No TCP connections are associated, this is a candidate for removal @@ -376,13 +376,6 @@ void NetconEthernetTap::threadMain() continue; // Skip, this is a pending connection int fd = _phy.getDescriptor(tcp_connections[i]->sock); - if(tcp_connections[i]->idx > 0){ - dwr(MSG_DEBUG, "writing from poll\n"); - lwipstack->_lock.lock(); - handle_write(tcp_connections[i]); - lwipstack->_lock.unlock(); - } - fcntl(fd, F_SETFL, O_NONBLOCK); unsigned char tmpbuf[BUF_SZ]; int n; @@ -399,11 +392,23 @@ void NetconEthernetTap::threadMain() phyOnUnixData(tcp_connections[i]->sock,_phy.getuptr(tcp_connections[i]->sock),&tmpbuf,BUF_SZ); } } - } + }*/ // Main TCP/ETHARP timer section if (since_tcp >= ZT_LWIP_TCP_TIMER_INTERVAL) { prev_tcp_time = now; lwipstack->tcp_tmr(); + + // Makeshift poll + + for(size_t i=0; i<tcp_connections.size(); i++) { + if(tcp_connections[i]->idx > 0){ + //dwr(MSG_DEBUG, "writing from poll\n"); + lwipstack->_lock.lock(); + handle_write(tcp_connections[i]); + lwipstack->_lock.unlock(); + } + } + } else { tcp_remaining = ZT_LWIP_TCP_TIMER_INTERVAL - since_tcp; } @@ -463,8 +468,7 @@ void NetconEthernetTap::phyOnUnixClose(PhySocket *sock,void **uptr) { /* * Handles data on a client's data buffer. Data is sent to LWIP to be enqueued. */ -void NetconEthernetTap::phyOnFileDescriptorActivity(PhySocket *sock,void **uptr,bool readable,bool writable) -{ +void NetconEthernetTap::phyOnFileDescriptorActivity(PhySocket *sock,void **uptr,bool readable,bool writable) { dwr(MSG_DEBUG,"\nphyOnFileDescriptorActivity(): new connection = %x\n", sock); } @@ -502,11 +506,9 @@ void NetconEthernetTap::phyOnUnixData(PhySocket *sock,void **uptr,void *data,uns unsigned char *buf = (unsigned char*)data; std::pair<PhySocket*, void*> sockdata; PhySocket *streamsock, *rpcsock; - bool found_sock = false, found_job = false; - + bool found_job = false; TcpConnection *conn; - int max_sndbuf = (float)TCP_SND_BUF; int wlen = len; // RPC @@ -527,17 +529,9 @@ void NetconEthernetTap::phyOnUnixData(PhySocket *sock,void **uptr,void *data,uns //return; // Don't close the socket, we'll use this later for data } else { // All RPCs other than RPC_SOCKET - streamsock = sockmap[magic_num]; - if(streamsock){ // We found a pre-existing stream socket for this RPC - sockmap[magic_num] = NULL; - found_sock = true; - dwr(MSG_DEBUG," <%x> found_sock\n", sock); - } - else { - // No matching stream has been encountered, create jobmap entry - dwr(MSG_DEBUG," <%x> creating jobmap (cmd=%d) entry for %llu\n", sock, cmd, magic_num); - jobmap[magic_num] = std::make_pair<PhySocket*, void*>(sock, data); - } + // No matching stream has been encountered, create jobmap entry + dwr(MSG_DEBUG," <%x> creating jobmap (cmd=%d) entry for %llu\n", sock, cmd, magic_num); + jobmap[magic_num] = std::make_pair<PhySocket*, void*>(sock, data); } write(_phy.getDescriptor(sock), "z", 1); // RPC ACK byte to maintain RPC->Stream order } @@ -555,14 +549,13 @@ void NetconEthernetTap::phyOnUnixData(PhySocket *sock,void **uptr,void *data,uns dwr(MSG_DEBUG, " <%x> padding_pos = %d\n", sock, padding_pos); // Grab token, next we'll use it to look up an RPC job if(token_pos > -1) { - dwr(MSG_DEBUG, " <%x> token_pos = %d, GRABBING TOKEN\n", sock, token_pos); memcpy(&magic_num, buf+token_pos, MAGIC_SIZE); if(magic_num != 0) { // TODO: Added to address magic_num==0 bug, last seeen 20160108 // Find job sockdata = jobmap[magic_num]; if(!sockdata.first) { // Stream before RPC - dwr(MSG_DEBUG," <%x> creating sockmap entry for %llu\n", sock, magic_num); - sockmap[magic_num] = sock; + dwr(MSG_DEBUG," <%x> unable to locate job entry for %llu\n", sock, magic_num); + return; } else found_job = true; @@ -587,14 +580,12 @@ void NetconEthernetTap::phyOnUnixData(PhySocket *sock,void **uptr,void *data,uns wlen = len - TOKEN_SIZE; data_start = padding_pos+MAGIC_PADDING_SIZE; memcpy((&conn->buf)+conn->idx, buf+data_start, wlen); - dwr(MSG_DEBUG," wlen = %d, data_start = %d\n", wlen, data_start); } // [DATA] + [TOKEN] if(len > TOKEN_SIZE && token_pos > 0 && token_pos == len - TOKEN_SIZE) { wlen = len - TOKEN_SIZE; data_start = 0; memcpy((&conn->buf)+conn->idx, buf+data_start, wlen); - dwr(MSG_DEBUG," wlen = %d, data_start = %d\n", wlen, data_start); } // [DATA] + [TOKEN] + [DATA] if(len > TOKEN_SIZE && token_pos > 0 && len > (token_pos + TOKEN_SIZE)) { @@ -603,10 +594,15 @@ void NetconEthernetTap::phyOnUnixData(PhySocket *sock,void **uptr,void *data,uns data_end = padding_pos-MAGIC_SIZE; memcpy((&conn->buf)+conn->idx, buf+data_start, (data_end-data_start)+1); memcpy((&conn->buf)+conn->idx, buf+(padding_pos+MAGIC_PADDING_SIZE), len-(token_pos+TOKEN_SIZE)); - dwr(MSG_DEBUG," wlen = %d, data_start = %d, data_end = %d\n", wlen, data_start, data_end); } } } + + // Write data from stream + if(conn->idx > (DEFAULT_READ_BUFFER_SIZE / 2)) { + dwr(MSG_DEBUG,"Buffer near full. Slowing\n"); + _phy.setNotifyReadable(sock, false); + } lwipstack->_lock.lock(); conn->idx += wlen; handle_write(conn); @@ -617,17 +613,12 @@ void NetconEthernetTap::phyOnUnixData(PhySocket *sock,void **uptr,void *data,uns rpcsock = sockdata.first; buf = (unsigned char*)sockdata.second; } - else if(found_sock) { - rpcsock = sock; - sock = streamsock; - } - // Process RPC if we have a corresponding jobmap/sockmap entry - if(found_job || found_sock) - { + // Process RPC if we have a corresponding jobmap entry + if(found_job) { conn = getConnection(sock); unload_rpc(buf, pid, tid, rpc_count, timestamp, magic, cmd, payload); - dwr(MSG_DEBUG," <%x> RPC: (pid=, tid=, rpc_count=, timestamp=, cmd=%d)\n", sock, /*pid, tid, rpc_count, timestamp, */cmd); + //dwr(MSG_DEBUG," <%x> RPC: (pid=, tid=, rpc_count=, timestamp=, cmd=%d)\n", sock, /*pid, tid, rpc_count, timestamp, */cmd); switch(cmd) { case RPC_BIND: @@ -659,7 +650,6 @@ void NetconEthernetTap::phyOnUnixData(PhySocket *sock,void **uptr,void *data,uns } closeConnection(sockdata.first); // close RPC after sending retval, no longer needed jobmap.erase(magic_num); - sockmap.erase(magic_num); return; } } @@ -949,10 +939,13 @@ err_t NetconEthernetTap::nc_sent(void* arg, struct tcp_pcb *tpcb, u16_t len) { Larg *l = (Larg*)arg; if(len) { - dwr(MSG_DEBUG,"nc_sent(ACKED): len = %d\n",len); - l->conn->acked+=len; - l->tap->_phy.setNotifyReadable(l->conn->sock, true); - l->tap->_phy.whack(); + //dwr(MSG_DEBUG,"nc_sent(ACKED): len = %d\n",len); + //l->conn->acked+=len; + if(l->conn->idx < DEFAULT_READ_BUFFER_SIZE / 2) + { + l->tap->_phy.setNotifyReadable(l->conn->sock, true); + l->tap->_phy.whack(); + } } return ERR_OK; } @@ -1292,7 +1285,7 @@ void NetconEthernetTap::handle_write(TcpConnection *conn) dwr(MSG_ERROR," handle_write(): could not locate connection for this fd\n"); return; } - dwr(MSG_DEBUG,"conn->idx = %d, TCP_SND_BUF = %d\n", conn->idx, TCP_SND_BUF); + //dwr(MSG_DEBUG,"conn->idx = %d, TCP_SND_BUF = %d\n", conn->idx, TCP_SND_BUF); if(!conn->pcb) { dwr(MSG_ERROR," handle_write(): conn->pcb == NULL. Failed to write.\n"); return; @@ -1310,7 +1303,7 @@ void NetconEthernetTap::handle_write(TcpConnection *conn) if(conn->sock && !conn->listening) { r = conn->idx < sndbuf ? conn->idx : sndbuf; - dwr(MSG_DEBUG,"handle_write(): r = %d\n", r); + //dwr(MSG_DEBUG,"handle_write(): r = %d\n", r); /* Writes data pulled from the client's socket buffer to LWIP. This merely sends the * data to LWIP to be enqueued and eventually sent to the network. */ if(r > 0) { diff --git a/netcon/NetconEthernetTap.hpp b/netcon/NetconEthernetTap.hpp index e0519bfa..525b20cf 100644 --- a/netcon/NetconEthernetTap.hpp +++ b/netcon/NetconEthernetTap.hpp @@ -158,7 +158,6 @@ private: std::map<PhySocket*, pid_t> pidmap; std::map<uint64_t, std::pair<PhySocket*, void*> > jobmap; - std::map<uint64_t, PhySocket*> sockmap; pid_t rpc_counter; netif interface; diff --git a/netcon/common.inc.c b/netcon/common.inc.c index 60d2ab3b..669dff22 100644 --- a/netcon/common.inc.c +++ b/netcon/common.inc.c @@ -42,7 +42,7 @@ #ifndef _COMMON_H #define _COMMON_H 1 -#define DEBUG_LEVEL 3 +#define DEBUG_LEVEL 1 #define MSG_WARNING 4 #define MSG_ERROR 1 // Errors |