diff options
Diffstat (limited to 'src/sync-mode.c')
-rw-r--r-- | src/sync-mode.c | 173 |
1 files changed, 96 insertions, 77 deletions
diff --git a/src/sync-mode.c b/src/sync-mode.c index 10fdb9e..e69ecfe 100644 --- a/src/sync-mode.c +++ b/src/sync-mode.c @@ -1,5 +1,5 @@ /* - * (C) 2006-2011 by Pablo Neira Ayuso <pablo@netfilter.org> + * (C) 2006-2012 by Pablo Neira Ayuso <pablo@netfilter.org> * (C) 2011 by Vyatta Inc. <http://www.vyatta.com> * * This program is free software; you can redistribute it and/or modify @@ -78,7 +78,7 @@ static struct nf_expect *msg2exp_alloc(struct nethdr *net, size_t remain) } static void -do_channel_handler_step(int i, struct nethdr *net, size_t remain) +do_channel_handler_step(struct channel *c, struct nethdr *net, size_t remain) { struct nf_conntrack *ct = NULL; struct nf_expect *exp = NULL; @@ -91,10 +91,10 @@ do_channel_handler_step(int i, struct nethdr *net, size_t remain) switch (STATE_SYNC(sync)->recv(net)) { case MSG_DATA: - multichannel_change_current_channel(STATE_SYNC(channel), i); + multichannel_change_current_channel(STATE_SYNC(channel), c); break; case MSG_CTL: - multichannel_change_current_channel(STATE_SYNC(channel), i); + multichannel_change_current_channel(STATE_SYNC(channel), c); return; case MSG_BAD: STATE_SYNC(error).msg_rcv_malformed++; @@ -175,7 +175,7 @@ static int channel_stream(struct channel *m, const char *ptr, ssize_t remain) } /* handler for messages received */ -static int channel_handler_routine(struct channel *m, int i) +static int channel_handler_routine(struct channel *m) { ssize_t numbytes; ssize_t remain, pending = cur - __net; @@ -242,7 +242,7 @@ static int channel_handler_routine(struct channel *m, int i) HDR_NETWORK2HOST(net); - do_channel_handler_step(i, net, remain); + do_channel_handler_step(m, net, remain); ptr += net->len; remain -= net->len; } @@ -250,12 +250,13 @@ static int channel_handler_routine(struct channel *m, int i) } /* handler for messages received */ -static void channel_handler(struct channel *m, int i) +static void channel_handler(void *data) { + struct channel *c = data; int k; for (k=0; k<CONFIG(event_iterations_limit); k++) { - if (channel_handler_routine(m, i) == -1) { + if (channel_handler_routine(c) == -1) { break; } } @@ -284,7 +285,7 @@ static void interface_candidate(void) dlog(LOG_ERR, "no dedicated links available!"); } -static void interface_handler(void) +static void interface_handler(void *data) { int idx = multichannel_get_current_ifindex(STATE_SYNC(channel)); unsigned int flags; @@ -304,13 +305,60 @@ static void do_reset_cache_alarm(struct alarm_block *a, void *data) * meanwhile the parent process handles events. */ if (fork_process_new(CTD_PROC_FLUSH, CTD_PROC_F_EXCL, NULL, NULL) == 0) { - nl_flush_conntrack_table(STATE(flush)); + nl_flush_conntrack_table_selective(); exit(EXIT_SUCCESS); } /* this is not required if events don't get lost */ STATE(mode)->internal->ct.flush(); } +static void commit_cb(void *data) +{ + int ret; + + read_evfd(STATE_SYNC(commit).evfd); + + ret = STATE_SYNC(commit).rq[0].cb(STATE_SYNC(commit).h, 0); + if (ret == 0) { + /* we still have things in the callback queue. */ + if (STATE_SYNC(commit).rq[1].cb) { + int fd = STATE_SYNC(commit).clientfd; + + STATE_SYNC(commit).rq[0].cb = + STATE_SYNC(commit).rq[1].cb; + + STATE_SYNC(commit).rq[1].cb = NULL; + + STATE_SYNC(commit).clientfd = -1; + STATE_SYNC(commit).rq[0].cb(STATE_SYNC(commit).h, fd); + } else { + /* Close the client socket now, we're done. */ + close(STATE_SYNC(commit).clientfd); + STATE_SYNC(commit).clientfd = -1; + } + } +} + +static void channel_accept_cb(void *data) +{ + struct channel *c = data; + int fd; + + fd = channel_accept(data); + if (fd < 0) + return; + + register_fd(fd, channel_handler, c, STATE(fds)); +} + +static void tx_queue_cb(void *data) +{ + STATE_SYNC(sync)->xmit(); + + /* flush pending messages */ + multichannel_send_flush(STATE_SYNC(channel)); +} + static int init_sync(void) { int i; @@ -370,8 +418,19 @@ static int init_sync(void) for (i=0; i<STATE_SYNC(channel)->channel_num; i++) { int fd = channel_get_fd(STATE_SYNC(channel)->channel[i]); fcntl(fd, F_SETFL, O_NONBLOCK); - if (register_fd(fd, STATE(fds)) == -1) - return -1; + + switch(channel_type(STATE_SYNC(channel)->channel[i])) { + case CHANNEL_T_STREAM: + register_fd(fd, channel_accept_cb, + STATE_SYNC(channel)->channel[i], + STATE(fds)); + break; + case CHANNEL_T_DATAGRAM: + register_fd(fd, channel_handler, + STATE_SYNC(channel)->channel[i], + STATE(fds)); + break; + } } STATE_SYNC(interface) = nl_init_interface_handler(); @@ -379,7 +438,8 @@ static int init_sync(void) dlog(LOG_ERR, "can't open interface watcher"); return -1; } - if (register_fd(nlif_fd(STATE_SYNC(interface)), STATE(fds)) == -1) + if (register_fd(nlif_fd(STATE_SYNC(interface)), + interface_handler, NULL, STATE(fds)) == -1) return -1; STATE_SYNC(tx_queue) = queue_create("txqueue", INT_MAX, QUEUE_F_EVFD); @@ -387,8 +447,8 @@ static int init_sync(void) dlog(LOG_ERR, "cannot create tx queue"); return -1; } - if (register_fd(queue_get_eventfd(STATE_SYNC(tx_queue)), - STATE(fds)) == -1) + if (register_fd(queue_get_eventfd(STATE_SYNC(tx_queue)), + tx_queue_cb, NULL, STATE(fds)) == -1) return -1; STATE_SYNC(commit).h = nfct_open(CONFIG(netlink).subsys_id, 0); @@ -404,7 +464,7 @@ static int init_sync(void) return -1; } if (register_fd(get_read_evfd(STATE_SYNC(commit).evfd), - STATE(fds)) == -1) { + commit_cb, NULL, STATE(fds)) == -1) { return -1; } STATE_SYNC(commit).clientfd = -1; @@ -417,61 +477,6 @@ static int init_sync(void) return 0; } -static void channel_check(struct channel *c, int i, fd_set *readfds) -{ - /* In case that this channel is connection-oriented. */ - if (channel_accept_isset(c, readfds)) - channel_accept(c); - - /* For data handling. */ - if (channel_isset(c, readfds)) - channel_handler(c, i); -} - -static void run_sync(fd_set *readfds) -{ - int i; - - for (i=0; i<STATE_SYNC(channel)->channel_num; i++) - channel_check(STATE_SYNC(channel)->channel[i], i, readfds); - - if (FD_ISSET(queue_get_eventfd(STATE_SYNC(tx_queue)), readfds)) - STATE_SYNC(sync)->xmit(); - - if (FD_ISSET(nlif_fd(STATE_SYNC(interface)), readfds)) - interface_handler(); - - if (FD_ISSET(get_read_evfd(STATE_SYNC(commit).evfd), readfds)) { - int ret; - - read_evfd(STATE_SYNC(commit).evfd); - - ret = STATE_SYNC(commit).rq[0].cb(STATE_SYNC(commit).h, 0); - if (ret == 0) { - /* we still have things in the callback queue. */ - if (STATE_SYNC(commit).rq[1].cb) { - int fd = STATE_SYNC(commit).clientfd; - - STATE_SYNC(commit).rq[0].cb = - STATE_SYNC(commit).rq[1].cb; - - STATE_SYNC(commit).rq[1].cb = NULL; - - STATE_SYNC(commit).clientfd = -1; - STATE_SYNC(commit).rq[0].cb( - STATE_SYNC(commit).h, fd); - } else { - /* Close the client socket now, we're done. */ - close(STATE_SYNC(commit).clientfd); - STATE_SYNC(commit).clientfd = -1; - } - } - } - - /* flush pending messages */ - multichannel_send_flush(STATE_SYNC(channel)); -} - static void kill_sync(void) { STATE(mode)->internal->close(); @@ -608,6 +613,12 @@ static int local_handler_sync(int fd, int type, void *data) } break; case CT_FLUSH_CACHE: + /* if we're still committing, abort this command */ + if (STATE_SYNC(commit).clientfd != -1) { + dlog(LOG_ERR, "ignoring flush command, " + "commit still in progress"); + break; + } /* inmediate flush, remove pending flush scheduled if any */ del_alarm(&STATE_SYNC(reset_cache_alarm)); dlog(LOG_NOTICE, "flushing caches"); @@ -621,12 +632,15 @@ static int local_handler_sync(int fd, int type, void *data) STATE(mode)->internal->ct.flush(); break; case CT_FLUSH_EXT_CACHE: + /* if we're still committing, abort this command */ + if (STATE_SYNC(commit).clientfd != -1) { + dlog(LOG_ERR, "ignoring flush command, " + "commit still in progress"); + break; + } dlog(LOG_NOTICE, "flushing external cache"); STATE_SYNC(external)->ct.flush(); break; - case KILL: - killer(0); - break; case STATS: STATE(mode)->internal->ct.stats(fd); STATE_SYNC(external)->ct.stats(fd); @@ -684,9 +698,15 @@ static int local_handler_sync(int fd, int type, void *data) dlog(LOG_NOTICE, "committing expectation cache"); STATE_SYNC(commit).rq[0].cb = STATE_SYNC(external)->exp.commit; STATE_SYNC(commit).rq[1].cb = NULL; - local_commit(fd); + ret = local_commit(fd); break; case ALL_FLUSH_CACHE: + /* if we're still committing, abort this command */ + if (STATE_SYNC(commit).clientfd != -1) { + dlog(LOG_ERR, "ignoring flush command, " + "commit still in progress"); + break; + } dlog(LOG_NOTICE, "flushing caches"); STATE(mode)->internal->ct.flush(); STATE_SYNC(external)->ct.flush(); @@ -704,7 +724,7 @@ static int local_handler_sync(int fd, int type, void *data) } else { STATE_SYNC(commit).rq[1].cb = NULL; } - local_commit(fd); + ret = local_commit(fd); break; case EXP_DUMP_INT_XML: if (fork_process_new(CTD_PROC_ANY, 0, NULL, NULL) == 0) { @@ -729,7 +749,6 @@ static int local_handler_sync(int fd, int type, void *data) struct ct_mode sync_mode = { .init = init_sync, - .run = run_sync, .local = local_handler_sync, .kill = kill_sync, /* the internal handler is set in run-time. */ |