diff options
Diffstat (limited to 'src/sync-mode.c')
-rw-r--r-- | src/sync-mode.c | 65 |
1 files changed, 52 insertions, 13 deletions
diff --git a/src/sync-mode.c b/src/sync-mode.c index 174df80..6781f10 100644 --- a/src/sync-mode.c +++ b/src/sync-mode.c @@ -98,39 +98,70 @@ do_channel_handler_step(int i, struct nethdr *net, size_t remain) } } +static char __net[65536]; /* XXX: maximum MTU for IPv4 */ +static char *cur = __net; + +static int channel_stream(struct channel *m, const char *ptr, ssize_t remain) +{ + if (m->channel_flags & CHANNEL_F_STREAM) { + /* truncated data. */ + memcpy(__net, ptr, remain); + cur = __net + remain; + return 1; + } + return 0; +} + /* handler for messages received */ static int channel_handler_routine(struct channel *m, int i) { ssize_t numbytes; - ssize_t remain; - char __net[65536], *ptr = __net; /* XXX: maximum MTU for IPv4 */ + ssize_t remain, pending = cur - __net; + char *ptr = __net; - numbytes = channel_recv(m, __net, sizeof(__net)); + numbytes = channel_recv(m, cur, sizeof(__net) - pending); if (numbytes <= 0) return -1; remain = numbytes; + if (pending) { + remain += pending; + cur = __net; + } + while (remain > 0) { struct nethdr *net = (struct nethdr *) ptr; int len; if (remain < NETHDR_SIZ) { - STATE_SYNC(error).msg_rcv_malformed++; - STATE_SYNC(error).msg_rcv_truncated++; + if (!channel_stream(m, ptr, remain)) { + STATE_SYNC(error).msg_rcv_malformed++; + STATE_SYNC(error).msg_rcv_truncated++; + } break; } len = ntohs(net->len); - if (len > remain || len <= 0) { + if (len <= 0) { STATE_SYNC(error).msg_rcv_malformed++; STATE_SYNC(error).msg_rcv_bad_size++; break; } + if (len > remain) { + if (!channel_stream(m, ptr, remain)) { + STATE_SYNC(error).msg_rcv_malformed++; + STATE_SYNC(error).msg_rcv_bad_size++; + } + break; + } + if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net)) { if (remain < NETHDR_ACK_SIZ) { - STATE_SYNC(error).msg_rcv_malformed++; - STATE_SYNC(error).msg_rcv_truncated++; + if (!channel_stream(m, ptr, remain)) { + STATE_SYNC(error).msg_rcv_malformed++; + STATE_SYNC(error).msg_rcv_truncated++; + } break; } @@ -322,15 +353,23 @@ static int init_sync(void) return 0; } +static void channel_check(struct channel *c, int i, fd_set *readfds) +{ + /* In case that this channel is connection-oriented. */ + if (channel_accept_isset(c, readfds)) + channel_accept(c); + + /* For data handling. */ + if (channel_isset(c, readfds)) + channel_handler(c, i); +} + static void run_sync(fd_set *readfds) { int i; - for (i=0; i<STATE_SYNC(channel)->channel_num; i++) { - int fd = channel_get_fd(STATE_SYNC(channel)->channel[i]); - if (FD_ISSET(fd, readfds)) - channel_handler(STATE_SYNC(channel)->channel[i], i); - } + for (i=0; i<STATE_SYNC(channel)->channel_num; i++) + channel_check(STATE_SYNC(channel)->channel[i], i, readfds); if (FD_ISSET(queue_get_eventfd(STATE_SYNC(tx_queue)), readfds)) STATE_SYNC(sync)->xmit(); |