From 651794575c844fe25a717d77bd088c51383067f0 Mon Sep 17 00:00:00 2001 From: Pablo Neira Ayuso Date: Sun, 19 Jul 2009 15:28:34 +0200 Subject: conntrackd: rework commit not to fork a child process This patch reworks the commit phase to avoid the forking. This is particularly useful in active-active setups in which one node has to commit the external cache while it is receiving new entries to be added in the external cache. This results in really high commit times due to the penalty of the copy-on-write that fork performs. The default number of steps in one run loop is limited to 64 by now. No option to tune this parameter is still available via the configuration file. Signed-off-by: Pablo Neira Ayuso --- include/cache.h | 2 +- include/conntrackd.h | 22 ++++++++++++-- src/cache_iterators.c | 79 +++++++++++++++++++++++++++++++++++++-------------- src/read_config_yy.y | 5 ++++ src/sync-mode.c | 38 ++++++++++++++++--------- 5 files changed, 109 insertions(+), 37 deletions(-) diff --git a/include/cache.h b/include/cache.h index 109e6aa..7e61085 100644 --- a/include/cache.h +++ b/include/cache.h @@ -120,7 +120,7 @@ void cache_iterate_limit(struct cache *c, void *data, uint32_t from, uint32_t st struct nfct_handle; void cache_dump(struct cache *c, int fd, int type); -void cache_commit(struct cache *c, struct nfct_handle *h); +void cache_commit(struct cache *c, struct nfct_handle *h, int clientfd); void cache_flush(struct cache *c); void cache_bulk(struct cache *c); diff --git a/include/conntrackd.h b/include/conntrackd.h index 417bac6..12fd17f 100644 --- a/include/conntrackd.h +++ b/include/conntrackd.h @@ -95,6 +95,9 @@ struct ct_conf { int poll_kernel_secs; int filter_from_kernelspace; int event_iterations_limit; + struct { + int commit_steps; + } general; struct { int type; int prio; @@ -168,12 +171,27 @@ struct ct_sync_state { struct cache *internal; /* internal events cache (netlink) */ struct cache *external; /* external events cache (mcast) */ - struct nfct_handle *commit; - struct multichannel *channel; struct nlif_handle *interface; struct queue *tx_queue; +#define COMMIT_STATE_INACTIVE 0 +#define COMMIT_STATE_MASTER 1 +#define COMMIT_STATE_RELATED 2 + + struct { + int state; + int clientfd; + struct nfct_handle *h; + struct evfd *evfd; + int current; + struct { + int ok; + int fail; + struct timeval start; + } stats; + } commit; + struct alarm_block reset_cache_alarm; struct sync_mode *sync; /* sync mode */ diff --git a/src/cache_iterators.c b/src/cache_iterators.c index b6688e9..c7183fd 100644 --- a/src/cache_iterators.c +++ b/src/cache_iterators.c @@ -21,6 +21,7 @@ #include "log.h" #include "conntrackd.h" #include "netlink.h" +#include "event.h" #include #include @@ -174,37 +175,73 @@ static int do_commit_master(void *data, void *n) return 0; } -/* no need to clone, called from child process */ -void cache_commit(struct cache *c, struct nfct_handle *h) +void cache_commit(struct cache *c, struct nfct_handle *h, int clientfd) { - unsigned int commit_ok = c->stats.commit_ok; - unsigned int commit_fail = c->stats.commit_fail; + unsigned int commit_ok, commit_fail; struct __commit_container tmp = { .h = h, .c = c, }; - struct timeval commit_start, commit_stop, res; + struct timeval commit_stop, res; - gettimeofday(&commit_start, NULL); - /* commit master conntrack first, then related ones */ - hashtable_iterate(c->h, &tmp, do_commit_master); - hashtable_iterate(c->h, &tmp, do_commit_related); - gettimeofday(&commit_stop, NULL); - timersub(&commit_stop, &commit_start, &res); + switch(STATE_SYNC(commit).state) { + case COMMIT_STATE_INACTIVE: + gettimeofday(&STATE_SYNC(commit).stats.start, NULL); + STATE_SYNC(commit).stats.ok = c->stats.commit_ok; + STATE_SYNC(commit).stats.fail = c->stats.commit_fail; + STATE_SYNC(commit).clientfd = clientfd; + case COMMIT_STATE_MASTER: + STATE_SYNC(commit).current = + hashtable_iterate_limit(c->h, &tmp, + STATE_SYNC(commit).current, + CONFIG(general).commit_steps, + do_commit_master); + if (STATE_SYNC(commit).current < CONFIG(hashsize)) { + STATE_SYNC(commit).state = COMMIT_STATE_MASTER; + /* give it another step as soon as possible */ + write_evfd(STATE_SYNC(commit).evfd); + return; + } + STATE_SYNC(commit).current = 0; + STATE_SYNC(commit).state = COMMIT_STATE_RELATED; + case COMMIT_STATE_RELATED: + STATE_SYNC(commit).current = + hashtable_iterate_limit(c->h, &tmp, + STATE_SYNC(commit).current, + CONFIG(general).commit_steps, + do_commit_related); + if (STATE_SYNC(commit).current < CONFIG(hashsize)) { + STATE_SYNC(commit).state = COMMIT_STATE_RELATED; + /* give it another step as soon as possible */ + write_evfd(STATE_SYNC(commit).evfd); + return; + } + /* calculate the time that commit has taken */ + gettimeofday(&commit_stop, NULL); + timersub(&commit_stop, &STATE_SYNC(commit).stats.start, &res); + + /* calculate new entries committed */ + commit_ok = c->stats.commit_ok - STATE_SYNC(commit).stats.ok; + commit_fail = + c->stats.commit_fail - STATE_SYNC(commit).stats.fail; - /* calculate new entries committed */ - commit_ok = c->stats.commit_ok - commit_ok; - commit_fail = c->stats.commit_fail - commit_fail; + /* log results */ + dlog(LOG_NOTICE, "Committed %u new entries", commit_ok); - /* log results */ - dlog(LOG_NOTICE, "Committed %u new entries", commit_ok); + if (commit_fail) + dlog(LOG_NOTICE, "%u entries can't be " + "committed", commit_fail); - if (commit_fail) - dlog(LOG_NOTICE, "%u entries can't be " - "committed", commit_fail); + dlog(LOG_NOTICE, "commit has taken %lu.%06lu seconds", + res.tv_sec, res.tv_usec); - dlog(LOG_NOTICE, "commit has taken %lu.%06lu seconds", - res.tv_sec, res.tv_usec); + /* prepare the state machine for new commits */ + STATE_SYNC(commit).current = 0; + STATE_SYNC(commit).state = COMMIT_STATE_INACTIVE; + + /* Close the client socket now that we're done. */ + close(STATE_SYNC(commit).clientfd); + } } static int do_flush(void *data, void *n) diff --git a/src/read_config_yy.y b/src/read_config_yy.y index cab7799..0e9b99b 100644 --- a/src/read_config_yy.y +++ b/src/read_config_yy.y @@ -1379,6 +1379,11 @@ init_config(char *filename) if (CONFIG(event_iterations_limit) == 0) CONFIG(event_iterations_limit) = 100; + /* default number of bucket of the hashtable that are committed in + one run loop. XXX: no option available to tune this value yet. */ + if (CONFIG(general).commit_steps == 0) + CONFIG(general).commit_steps = 64; + /* if overrun, automatically resync with kernel after 30 seconds */ if (CONFIG(nl_overrun_resync) == 0) CONFIG(nl_overrun_resync) = 30; diff --git a/src/sync-mode.c b/src/sync-mode.c index b0e2b02..7853d91 100644 --- a/src/sync-mode.c +++ b/src/sync-mode.c @@ -298,12 +298,22 @@ static int init_sync(void) STATE(fds)) == -1) return -1; - STATE_SYNC(commit) = nfct_open(CONNTRACK, 0); - if (STATE_SYNC(commit) == NULL) { + STATE_SYNC(commit).h = nfct_open(CONNTRACK, 0); + if (STATE_SYNC(commit).h == NULL) { dlog(LOG_ERR, "can't create handler to commit"); return -1; } - origin_register(STATE_SYNC(commit), CTD_ORIGIN_COMMIT); + origin_register(STATE_SYNC(commit).h, CTD_ORIGIN_COMMIT); + + STATE_SYNC(commit).evfd = create_evfd(); + if (STATE_SYNC(commit).evfd == NULL) { + dlog(LOG_ERR, "can't create eventfd to commit"); + return -1; + } + if (register_fd(get_read_evfd(STATE_SYNC(commit).evfd), + STATE(fds)) == -1) { + return -1; + } init_alarm(&STATE_SYNC(reset_cache_alarm), NULL, do_reset_cache_alarm); @@ -329,6 +339,11 @@ static void run_sync(fd_set *readfds) if (FD_ISSET(nlif_fd(STATE_SYNC(interface)), readfds)) interface_handler(); + if (FD_ISSET(get_read_evfd(STATE_SYNC(commit).evfd), readfds)) { + read_evfd(STATE_SYNC(commit).evfd); + cache_commit(STATE_SYNC(external), STATE_SYNC(commit).h, 0); + } + /* flush pending messages */ multichannel_send_flush(STATE_SYNC(channel)); } @@ -344,8 +359,9 @@ static void kill_sync(void) queue_destroy(STATE_SYNC(tx_queue)); - origin_unregister(STATE_SYNC(commit)); - nfct_close(STATE_SYNC(commit)); + origin_unregister(STATE_SYNC(commit).h); + nfct_close(STATE_SYNC(commit).h); + destroy_evfd(STATE_SYNC(commit).evfd); if (STATE_SYNC(sync)->kill) STATE_SYNC(sync)->kill(); @@ -438,14 +454,10 @@ static int local_handler_sync(int fd, int type, void *data) /* delete the reset alarm if any before committing */ del_alarm(&STATE_SYNC(reset_cache_alarm)); - /* fork new process and insert it the process list */ - ret = fork_process_new(CTD_PROC_COMMIT, CTD_PROC_F_EXCL, - NULL, NULL); - if (ret == 0) { - dlog(LOG_NOTICE, "committing external cache"); - cache_commit(STATE_SYNC(external), STATE_SYNC(commit)); - exit(EXIT_SUCCESS); - } + dlog(LOG_NOTICE, "committing external cache"); + cache_commit(STATE_SYNC(external), STATE_SYNC(commit).h, fd); + /* Keep the client socket open, we want synchronous commits. */ + ret = LOCAL_RET_STOLEN; break; case RESET_TIMERS: if (!alarm_pending(&STATE_SYNC(reset_cache_alarm))) { -- cgit v1.2.3