From 1250135046b96f2778bda51517c8a722171a6c16 Mon Sep 17 00:00:00 2001 From: Pablo Neira Ayuso Date: Tue, 24 Apr 2012 10:55:33 +0200 Subject: conntrackd: generalize file descriptor infrastructure This patch generalizes the select-based file descriptor infrastructure by allowing you to register file descriptors and its callbacks. Instead of hardcoding the descriptors that needs to be checked. Now, struct fds_item contains a callback and pointer to data that is passed to it: struct fds_item { struct list_head head; int fd; + void (*cb)(void *data); + void *data; }; Then, we check which ones are active in the select_main_step() function: list_for_each_entry(cur, &STATE(fds)->list, head) { if (FD_ISSET(cur->fd, &readfds)) cur->cb(cur->data); } And it invoked the corresponding callback. I had to slightly modify the channel infrastructure to fit it into the changes. This modularity is required for the upcoming cthelper support. Signed-off-by: Pablo Neira Ayuso --- src/channel.c | 5 ++ src/channel_tcp.c | 1 + src/fds.c | 61 ++++++++++++- src/main.c | 2 +- src/multichannel.c | 7 +- src/run.c | 251 ++++++++++++++++++++--------------------------------- src/stats-mode.c | 1 - src/sync-mode.c | 146 ++++++++++++++++--------------- src/tcp.c | 1 - 9 files changed, 236 insertions(+), 239 deletions(-) (limited to 'src') diff --git a/src/channel.c b/src/channel.c index 818bb01..8b7c319 100644 --- a/src/channel.c +++ b/src/channel.c @@ -310,3 +310,8 @@ int channel_accept(struct channel *c) { return c->ops->accept(c); } + +int channel_type(struct channel *c) +{ + return c->ops->type; +} diff --git a/src/channel_tcp.c b/src/channel_tcp.c index f132840..a84603c 100644 --- a/src/channel_tcp.c +++ b/src/channel_tcp.c @@ -137,6 +137,7 @@ channel_tcp_accept(struct channel *c) struct channel_ops channel_tcp = { .headersiz = 40, /* IP header (20 bytes) + TCP header 20 (bytes) */ + .type = CHANNEL_T_STREAM, .open = channel_tcp_open, .close = channel_tcp_close, .send = channel_tcp_send, diff --git a/src/fds.c b/src/fds.c index 347eee1..0b95437 100644 --- a/src/fds.c +++ b/src/fds.c @@ -1,5 +1,5 @@ /* - * (C) 2006-2008 by Pablo Neira Ayuso + * (C) 2006-2012 by Pablo Neira Ayuso * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -14,9 +14,16 @@ * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * + * Part of this code has been sponsored by Vyatta Inc. */ #include #include +#include +#include + +#include "conntrackd.h" +#include "date.h" #include "fds.h" struct fds *create_fds(void) @@ -44,7 +51,7 @@ void destroy_fds(struct fds *fds) free(fds); } -int register_fd(int fd, struct fds *fds) +int register_fd(int fd, void (*cb)(void *data), void *data, struct fds *fds) { struct fds_item *item; @@ -58,7 +65,10 @@ int register_fd(int fd, struct fds *fds) return -1; item->fd = fd; - list_add(&item->head, &fds->list); + item->cb = cb; + item->data = data; + /* Order matters: the descriptors are served in FIFO basis. */ + list_add_tail(&item->head, &fds->list); return 0; } @@ -92,3 +102,48 @@ int unregister_fd(int fd, struct fds *fds) return 0; } +static void select_main_step(struct timeval *next_alarm) +{ + int ret; + fd_set readfds = STATE(fds)->readfds; + struct fds_item *cur, *tmp; + + ret = select(STATE(fds)->maxfd + 1, &readfds, NULL, NULL, next_alarm); + if (ret == -1) { + /* interrupted syscall, retry */ + if (errno == EINTR) + return; + + STATE(stats).select_failed++; + return; + } + + /* signals are racy */ + sigprocmask(SIG_BLOCK, &STATE(block), NULL); + + list_for_each_entry_safe(cur, tmp, &STATE(fds)->list, head) { + if (FD_ISSET(cur->fd, &readfds)) + cur->cb(cur->data); + } + + sigprocmask(SIG_UNBLOCK, &STATE(block), NULL); +} + +void __attribute__((noreturn)) select_main_loop(void) +{ + struct timeval next_alarm; + struct timeval *next = NULL; + + while(1) { + do_gettimeofday(); + + sigprocmask(SIG_BLOCK, &STATE(block), NULL); + if (next != NULL && !timerisset(next)) + next = do_alarm_run(&next_alarm); + else + next = get_next_alarm_run(&next_alarm); + sigprocmask(SIG_UNBLOCK, &STATE(block), NULL); + + select_main_step(next); + } +} diff --git a/src/main.c b/src/main.c index f7803fd..26f6c14 100644 --- a/src/main.c +++ b/src/main.c @@ -406,6 +406,6 @@ int main(int argc, char *argv[]) /* * run main process */ - run(); + select_main_loop(); return 0; } diff --git a/src/multichannel.c b/src/multichannel.c index de69d5c..952b567 100644 --- a/src/multichannel.c +++ b/src/multichannel.c @@ -109,8 +109,9 @@ void multichannel_set_current_channel(struct multichannel *m, int i) m->current = m->channel[i]; } -void multichannel_change_current_channel(struct multichannel *m, int i) +void +multichannel_change_current_channel(struct multichannel *m, struct channel *c) { - if (m->current != m->channel[i]) - m->current = m->channel[i]; + if (m->current != c) + m->current = c; } diff --git a/src/run.c b/src/run.c index 26c1783..171dee5 100644 --- a/src/run.c +++ b/src/run.c @@ -1,5 +1,5 @@ /* - * (C) 2006-2011 by Pablo Neira Ayuso + * (C) 2006-2012 by Pablo Neira Ayuso * (C) 2011 by Vyatta Inc. * * This program is free software; you can redistribute it and/or modify @@ -16,7 +16,7 @@ * along with this program; if not, write to the Free Software * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. * - * Description: run and init functions + * Part of this code has been sponsored by Vyatta Inc. */ #include "conntrackd.h" @@ -460,6 +460,87 @@ static int exp_get_handler(enum nf_conntrack_msg_type type, return NFCT_CB_CONTINUE; } +/* order received via UNIX socket */ +static void local_cb(void *data) +{ + do_local_server_step(&STATE(local), NULL, local_handler); +} + +/* we have received an event from ctnetlink */ +static void event_cb(void *data) +{ + int ret; + + ret = nfct_catch(STATE(event)); + /* reset event iteration limit counter */ + STATE(event_iterations_limit) = CONFIG(event_iterations_limit); + if (ret == -1) { + switch(errno) { + case ENOBUFS: + /* We have hit ENOBUFS, it's likely that we are + * losing events. Two possible situations may + * trigger this error: + * + * 1) The netlink receiver buffer is too small: + * increasing the netlink buffer size should + * be enough. However, some event messages + * got lost. We have to resync ourselves + * with the kernel table conntrack table to + * resolve the inconsistency. + * + * 2) The receiver is too slow to process the + * netlink messages so that the queue gets + * full quickly. This generally happens + * if the system is under heavy workload + * (busy CPU). In this case, increasing the + * size of the netlink receiver buffer + * would not help anymore since we would + * be delaying the overrun. Moreover, we + * should avoid resynchronizations. We + * should do our best here and keep + * replicating as much states as possible. + * If workload lowers at some point, + * we resync ourselves. + */ + nl_resize_socket_buffer(STATE(event)); + if (CONFIG(nl_overrun_resync) > 0 && + STATE(mode)->internal->flags & INTERNAL_F_RESYNC) { + add_alarm(&STATE(resync_alarm), + CONFIG(nl_overrun_resync),0); + } + STATE(stats).nl_catch_event_failed++; + STATE(stats).nl_overrun++; + break; + case ENOENT: + /* + * We received a message from another + * netfilter subsystem that we are not + * interested in. Just ignore it. + */ + break; + case EAGAIN: + /* No more events to receive, try later. */ + break; + default: + STATE(stats).nl_catch_event_failed++; + break; + } + } +} + +/* we previously requested a resync due to buffer overrun. */ +static void resync_cb(void *data) +{ + nfct_catch(STATE(resync)); + if (STATE(mode)->internal->ct.purge) + STATE(mode)->internal->ct.purge(); +} + +static void poll_cb(void *data) +{ + nfct_catch(STATE(resync)); +} + int init(void) { @@ -493,7 +574,7 @@ init(void) dlog(LOG_ERR, "can't open unix socket!"); return -1; } - register_fd(STATE(local).fd, STATE(fds)); + register_fd(STATE(local).fd, local_cb, NULL, STATE(fds)); /* resynchronize (like 'dump' socket) but it also purges old entries */ STATE(resync) = nfct_open(CONFIG(netlink).subsys_id, 0); @@ -507,7 +588,13 @@ init(void) NFCT_T_ALL, STATE(mode)->internal->ct.resync, NULL); - register_fd(nfct_fd(STATE(resync)), STATE(fds)); + if (CONFIG(flags) & CTD_POLL) { + register_fd(nfct_fd(STATE(resync)), poll_cb, + NULL, STATE(fds)); + } else { + register_fd(nfct_fd(STATE(resync)), resync_cb, + NULL, STATE(fds)); + } fcntl(nfct_fd(STATE(resync)), F_SETFL, O_NONBLOCK); if (STATE(mode)->internal->flags & INTERNAL_F_POPULATE) { @@ -590,7 +677,7 @@ init(void) nfexp_callback_register2(STATE(event), NFCT_T_ALL, exp_event_handler, NULL); } - register_fd(nfct_fd(STATE(event)), STATE(fds)); + register_fd(nfct_fd(STATE(event)), event_cb, NULL, STATE(fds)); } /* Signals handling */ @@ -618,157 +705,3 @@ init(void) return 0; } - -static void run_events(struct timeval *next_alarm) -{ - int ret; - fd_set readfds = STATE(fds)->readfds; - - ret = select(STATE(fds)->maxfd + 1, &readfds, NULL, NULL, next_alarm); - if (ret == -1) { - /* interrupted syscall, retry */ - if (errno == EINTR) - return; - - STATE(stats).select_failed++; - return; - } - - /* signals are racy */ - sigprocmask(SIG_BLOCK, &STATE(block), NULL); - - /* order received via UNIX socket */ - if (FD_ISSET(STATE(local).fd, &readfds)) - do_local_server_step(&STATE(local), NULL, local_handler); - - /* we have receive an event from ctnetlink */ - if (FD_ISSET(nfct_fd(STATE(event)), &readfds)) { - ret = nfct_catch(STATE(event)); - /* reset event iteration limit counter */ - STATE(event_iterations_limit) = CONFIG(event_iterations_limit); - if (ret == -1) { - switch(errno) { - case ENOBUFS: - /* We have hit ENOBUFS, it's likely that we are - * losing events. Two possible situations may - * trigger this error: - * - * 1) The netlink receiver buffer is too small: - * increasing the netlink buffer size should - * be enough. However, some event messages - * got lost. We have to resync ourselves - * with the kernel table conntrack table to - * resolve the inconsistency. - * - * 2) The receiver is too slow to process the - * netlink messages so that the queue gets - * full quickly. This generally happens - * if the system is under heavy workload - * (busy CPU). In this case, increasing the - * size of the netlink receiver buffer - * would not help anymore since we would - * be delaying the overrun. Moreover, we - * should avoid resynchronizations. We - * should do our best here and keep - * replicating as much states as possible. - * If workload lowers at some point, - * we resync ourselves. - */ - nl_resize_socket_buffer(STATE(event)); - if (CONFIG(nl_overrun_resync) > 0 && - STATE(mode)->internal->flags & INTERNAL_F_RESYNC) { - add_alarm(&STATE(resync_alarm), - CONFIG(nl_overrun_resync),0); - } - STATE(stats).nl_catch_event_failed++; - STATE(stats).nl_overrun++; - break; - case ENOENT: - /* - * We received a message from another - * netfilter subsystem that we are not - * interested in. Just ignore it. - */ - break; - case EAGAIN: - /* No more events to receive, try later. */ - break; - default: - STATE(stats).nl_catch_event_failed++; - break; - } - } - } - /* we previously requested a resync due to buffer overrun. */ - if (FD_ISSET(nfct_fd(STATE(resync)), &readfds)) { - nfct_catch(STATE(resync)); - if (STATE(mode)->internal->ct.purge) - STATE(mode)->internal->ct.purge(); - } - - if (STATE(mode)->run) - STATE(mode)->run(&readfds); - - sigprocmask(SIG_UNBLOCK, &STATE(block), NULL); -} - -static void run_polling(struct timeval *next_alarm) -{ - int ret; - fd_set readfds = STATE(fds)->readfds; - - ret = select(STATE(fds)->maxfd + 1, &readfds, NULL, NULL, next_alarm); - if (ret == -1) { - /* interrupted syscall, retry */ - if (errno == EINTR) - return; - - STATE(stats).select_failed++; - return; - } - - /* signals are racy */ - sigprocmask(SIG_BLOCK, &STATE(block), NULL); - - /* order received via UNIX socket */ - if (FD_ISSET(STATE(local).fd, &readfds)) - do_local_server_step(&STATE(local), NULL, local_handler); - - /* we requested a dump from the kernel via polling_alarm */ - if (FD_ISSET(nfct_fd(STATE(resync)), &readfds)) - nfct_catch(STATE(resync)); - - if (STATE(mode)->run) - STATE(mode)->run(&readfds); - - sigprocmask(SIG_UNBLOCK, &STATE(block), NULL); -} - -static void __attribute__((noreturn)) -do_run(void (*run_step)(struct timeval *next_alarm)) -{ - struct timeval next_alarm; - struct timeval *next = NULL; - - while(1) { - do_gettimeofday(); - - sigprocmask(SIG_BLOCK, &STATE(block), NULL); - if (next != NULL && !timerisset(next)) - next = do_alarm_run(&next_alarm); - else - next = get_next_alarm_run(&next_alarm); - sigprocmask(SIG_UNBLOCK, &STATE(block), NULL); - - run_step(next); - } -} - -void run(void) -{ - if (CONFIG(flags) & CTD_POLL) { - do_run(run_polling); - } else { - do_run(run_events); - } -} diff --git a/src/stats-mode.c b/src/stats-mode.c index b768033..6b7f08d 100644 --- a/src/stats-mode.c +++ b/src/stats-mode.c @@ -201,7 +201,6 @@ static struct internal_handler internal_cache_stats = { struct ct_mode stats_mode = { .init = init_stats, - .run = NULL, .local = local_handler_stats, .kill = kill_stats, .internal = &internal_cache_stats, diff --git a/src/sync-mode.c b/src/sync-mode.c index 10fdb9e..71c320c 100644 --- a/src/sync-mode.c +++ b/src/sync-mode.c @@ -1,5 +1,5 @@ /* - * (C) 2006-2011 by Pablo Neira Ayuso + * (C) 2006-2012 by Pablo Neira Ayuso * (C) 2011 by Vyatta Inc. * * This program is free software; you can redistribute it and/or modify @@ -78,7 +78,7 @@ static struct nf_expect *msg2exp_alloc(struct nethdr *net, size_t remain) } static void -do_channel_handler_step(int i, struct nethdr *net, size_t remain) +do_channel_handler_step(struct channel *c, struct nethdr *net, size_t remain) { struct nf_conntrack *ct = NULL; struct nf_expect *exp = NULL; @@ -91,10 +91,10 @@ do_channel_handler_step(int i, struct nethdr *net, size_t remain) switch (STATE_SYNC(sync)->recv(net)) { case MSG_DATA: - multichannel_change_current_channel(STATE_SYNC(channel), i); + multichannel_change_current_channel(STATE_SYNC(channel), c); break; case MSG_CTL: - multichannel_change_current_channel(STATE_SYNC(channel), i); + multichannel_change_current_channel(STATE_SYNC(channel), c); return; case MSG_BAD: STATE_SYNC(error).msg_rcv_malformed++; @@ -175,7 +175,7 @@ static int channel_stream(struct channel *m, const char *ptr, ssize_t remain) } /* handler for messages received */ -static int channel_handler_routine(struct channel *m, int i) +static int channel_handler_routine(struct channel *m) { ssize_t numbytes; ssize_t remain, pending = cur - __net; @@ -242,7 +242,7 @@ static int channel_handler_routine(struct channel *m, int i) HDR_NETWORK2HOST(net); - do_channel_handler_step(i, net, remain); + do_channel_handler_step(m, net, remain); ptr += net->len; remain -= net->len; } @@ -250,12 +250,13 @@ static int channel_handler_routine(struct channel *m, int i) } /* handler for messages received */ -static void channel_handler(struct channel *m, int i) +static void channel_handler(void *data) { + struct channel *c = data; int k; for (k=0; kinternal->ct.flush(); } +static void commit_cb(void *data) +{ + int ret; + + read_evfd(STATE_SYNC(commit).evfd); + + ret = STATE_SYNC(commit).rq[0].cb(STATE_SYNC(commit).h, 0); + if (ret == 0) { + /* we still have things in the callback queue. */ + if (STATE_SYNC(commit).rq[1].cb) { + int fd = STATE_SYNC(commit).clientfd; + + STATE_SYNC(commit).rq[0].cb = + STATE_SYNC(commit).rq[1].cb; + + STATE_SYNC(commit).rq[1].cb = NULL; + + STATE_SYNC(commit).clientfd = -1; + STATE_SYNC(commit).rq[0].cb(STATE_SYNC(commit).h, fd); + } else { + /* Close the client socket now, we're done. */ + close(STATE_SYNC(commit).clientfd); + STATE_SYNC(commit).clientfd = -1; + } + } +} + +static void channel_accept_cb(void *data) +{ + struct channel *c = data; + int fd; + + fd = channel_accept(data); + if (fd < 0) + return; + + register_fd(fd, channel_handler, c, STATE(fds)); +} + +static void tx_queue_cb(void *data) +{ + STATE_SYNC(sync)->xmit(); + + /* flush pending messages */ + multichannel_send_flush(STATE_SYNC(channel)); +} + static int init_sync(void) { int i; @@ -370,8 +418,19 @@ static int init_sync(void) for (i=0; ichannel_num; i++) { int fd = channel_get_fd(STATE_SYNC(channel)->channel[i]); fcntl(fd, F_SETFL, O_NONBLOCK); - if (register_fd(fd, STATE(fds)) == -1) - return -1; + + switch(channel_type(STATE_SYNC(channel)->channel[i])) { + case CHANNEL_T_STREAM: + register_fd(fd, channel_accept_cb, + STATE_SYNC(channel)->channel[i], + STATE(fds)); + break; + case CHANNEL_T_DATAGRAM: + register_fd(fd, channel_handler, + STATE_SYNC(channel)->channel[i], + STATE(fds)); + break; + } } STATE_SYNC(interface) = nl_init_interface_handler(); @@ -379,7 +438,8 @@ static int init_sync(void) dlog(LOG_ERR, "can't open interface watcher"); return -1; } - if (register_fd(nlif_fd(STATE_SYNC(interface)), STATE(fds)) == -1) + if (register_fd(nlif_fd(STATE_SYNC(interface)), + interface_handler, NULL, STATE(fds)) == -1) return -1; STATE_SYNC(tx_queue) = queue_create("txqueue", INT_MAX, QUEUE_F_EVFD); @@ -387,8 +447,8 @@ static int init_sync(void) dlog(LOG_ERR, "cannot create tx queue"); return -1; } - if (register_fd(queue_get_eventfd(STATE_SYNC(tx_queue)), - STATE(fds)) == -1) + if (register_fd(queue_get_eventfd(STATE_SYNC(tx_queue)), + tx_queue_cb, NULL, STATE(fds)) == -1) return -1; STATE_SYNC(commit).h = nfct_open(CONFIG(netlink).subsys_id, 0); @@ -404,7 +464,7 @@ static int init_sync(void) return -1; } if (register_fd(get_read_evfd(STATE_SYNC(commit).evfd), - STATE(fds)) == -1) { + commit_cb, NULL, STATE(fds)) == -1) { return -1; } STATE_SYNC(commit).clientfd = -1; @@ -417,61 +477,6 @@ static int init_sync(void) return 0; } -static void channel_check(struct channel *c, int i, fd_set *readfds) -{ - /* In case that this channel is connection-oriented. */ - if (channel_accept_isset(c, readfds)) - channel_accept(c); - - /* For data handling. */ - if (channel_isset(c, readfds)) - channel_handler(c, i); -} - -static void run_sync(fd_set *readfds) -{ - int i; - - for (i=0; ichannel_num; i++) - channel_check(STATE_SYNC(channel)->channel[i], i, readfds); - - if (FD_ISSET(queue_get_eventfd(STATE_SYNC(tx_queue)), readfds)) - STATE_SYNC(sync)->xmit(); - - if (FD_ISSET(nlif_fd(STATE_SYNC(interface)), readfds)) - interface_handler(); - - if (FD_ISSET(get_read_evfd(STATE_SYNC(commit).evfd), readfds)) { - int ret; - - read_evfd(STATE_SYNC(commit).evfd); - - ret = STATE_SYNC(commit).rq[0].cb(STATE_SYNC(commit).h, 0); - if (ret == 0) { - /* we still have things in the callback queue. */ - if (STATE_SYNC(commit).rq[1].cb) { - int fd = STATE_SYNC(commit).clientfd; - - STATE_SYNC(commit).rq[0].cb = - STATE_SYNC(commit).rq[1].cb; - - STATE_SYNC(commit).rq[1].cb = NULL; - - STATE_SYNC(commit).clientfd = -1; - STATE_SYNC(commit).rq[0].cb( - STATE_SYNC(commit).h, fd); - } else { - /* Close the client socket now, we're done. */ - close(STATE_SYNC(commit).clientfd); - STATE_SYNC(commit).clientfd = -1; - } - } - } - - /* flush pending messages */ - multichannel_send_flush(STATE_SYNC(channel)); -} - static void kill_sync(void) { STATE(mode)->internal->close(); @@ -729,7 +734,6 @@ static int local_handler_sync(int fd, int type, void *data) struct ct_mode sync_mode = { .init = init_sync, - .run = run_sync, .local = local_handler_sync, .kill = kill_sync, /* the internal handler is set in run-time. */ diff --git a/src/tcp.c b/src/tcp.c index f6b05ef..af27c46 100644 --- a/src/tcp.c +++ b/src/tcp.c @@ -264,7 +264,6 @@ int tcp_accept(struct tcp_sock *m) m->client_fd = ret; m->state = TCP_SERVER_CONNECTED; - register_fd(m->client_fd, STATE(fds)); } return m->client_fd; } -- cgit v1.2.3