summaryrefslogtreecommitdiff
path: root/src/sync-mode.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync-mode.c')
-rw-r--r--src/sync-mode.c173
1 files changed, 96 insertions, 77 deletions
diff --git a/src/sync-mode.c b/src/sync-mode.c
index 10fdb9e..e69ecfe 100644
--- a/src/sync-mode.c
+++ b/src/sync-mode.c
@@ -1,5 +1,5 @@
/*
- * (C) 2006-2011 by Pablo Neira Ayuso <pablo@netfilter.org>
+ * (C) 2006-2012 by Pablo Neira Ayuso <pablo@netfilter.org>
* (C) 2011 by Vyatta Inc. <http://www.vyatta.com>
*
* This program is free software; you can redistribute it and/or modify
@@ -78,7 +78,7 @@ static struct nf_expect *msg2exp_alloc(struct nethdr *net, size_t remain)
}
static void
-do_channel_handler_step(int i, struct nethdr *net, size_t remain)
+do_channel_handler_step(struct channel *c, struct nethdr *net, size_t remain)
{
struct nf_conntrack *ct = NULL;
struct nf_expect *exp = NULL;
@@ -91,10 +91,10 @@ do_channel_handler_step(int i, struct nethdr *net, size_t remain)
switch (STATE_SYNC(sync)->recv(net)) {
case MSG_DATA:
- multichannel_change_current_channel(STATE_SYNC(channel), i);
+ multichannel_change_current_channel(STATE_SYNC(channel), c);
break;
case MSG_CTL:
- multichannel_change_current_channel(STATE_SYNC(channel), i);
+ multichannel_change_current_channel(STATE_SYNC(channel), c);
return;
case MSG_BAD:
STATE_SYNC(error).msg_rcv_malformed++;
@@ -175,7 +175,7 @@ static int channel_stream(struct channel *m, const char *ptr, ssize_t remain)
}
/* handler for messages received */
-static int channel_handler_routine(struct channel *m, int i)
+static int channel_handler_routine(struct channel *m)
{
ssize_t numbytes;
ssize_t remain, pending = cur - __net;
@@ -242,7 +242,7 @@ static int channel_handler_routine(struct channel *m, int i)
HDR_NETWORK2HOST(net);
- do_channel_handler_step(i, net, remain);
+ do_channel_handler_step(m, net, remain);
ptr += net->len;
remain -= net->len;
}
@@ -250,12 +250,13 @@ static int channel_handler_routine(struct channel *m, int i)
}
/* handler for messages received */
-static void channel_handler(struct channel *m, int i)
+static void channel_handler(void *data)
{
+ struct channel *c = data;
int k;
for (k=0; k<CONFIG(event_iterations_limit); k++) {
- if (channel_handler_routine(m, i) == -1) {
+ if (channel_handler_routine(c) == -1) {
break;
}
}
@@ -284,7 +285,7 @@ static void interface_candidate(void)
dlog(LOG_ERR, "no dedicated links available!");
}
-static void interface_handler(void)
+static void interface_handler(void *data)
{
int idx = multichannel_get_current_ifindex(STATE_SYNC(channel));
unsigned int flags;
@@ -304,13 +305,60 @@ static void do_reset_cache_alarm(struct alarm_block *a, void *data)
* meanwhile the parent process handles events. */
if (fork_process_new(CTD_PROC_FLUSH, CTD_PROC_F_EXCL,
NULL, NULL) == 0) {
- nl_flush_conntrack_table(STATE(flush));
+ nl_flush_conntrack_table_selective();
exit(EXIT_SUCCESS);
}
/* this is not required if events don't get lost */
STATE(mode)->internal->ct.flush();
}
+static void commit_cb(void *data)
+{
+ int ret;
+
+ read_evfd(STATE_SYNC(commit).evfd);
+
+ ret = STATE_SYNC(commit).rq[0].cb(STATE_SYNC(commit).h, 0);
+ if (ret == 0) {
+ /* we still have things in the callback queue. */
+ if (STATE_SYNC(commit).rq[1].cb) {
+ int fd = STATE_SYNC(commit).clientfd;
+
+ STATE_SYNC(commit).rq[0].cb =
+ STATE_SYNC(commit).rq[1].cb;
+
+ STATE_SYNC(commit).rq[1].cb = NULL;
+
+ STATE_SYNC(commit).clientfd = -1;
+ STATE_SYNC(commit).rq[0].cb(STATE_SYNC(commit).h, fd);
+ } else {
+ /* Close the client socket now, we're done. */
+ close(STATE_SYNC(commit).clientfd);
+ STATE_SYNC(commit).clientfd = -1;
+ }
+ }
+}
+
+static void channel_accept_cb(void *data)
+{
+ struct channel *c = data;
+ int fd;
+
+ fd = channel_accept(data);
+ if (fd < 0)
+ return;
+
+ register_fd(fd, channel_handler, c, STATE(fds));
+}
+
+static void tx_queue_cb(void *data)
+{
+ STATE_SYNC(sync)->xmit();
+
+ /* flush pending messages */
+ multichannel_send_flush(STATE_SYNC(channel));
+}
+
static int init_sync(void)
{
int i;
@@ -370,8 +418,19 @@ static int init_sync(void)
for (i=0; i<STATE_SYNC(channel)->channel_num; i++) {
int fd = channel_get_fd(STATE_SYNC(channel)->channel[i]);
fcntl(fd, F_SETFL, O_NONBLOCK);
- if (register_fd(fd, STATE(fds)) == -1)
- return -1;
+
+ switch(channel_type(STATE_SYNC(channel)->channel[i])) {
+ case CHANNEL_T_STREAM:
+ register_fd(fd, channel_accept_cb,
+ STATE_SYNC(channel)->channel[i],
+ STATE(fds));
+ break;
+ case CHANNEL_T_DATAGRAM:
+ register_fd(fd, channel_handler,
+ STATE_SYNC(channel)->channel[i],
+ STATE(fds));
+ break;
+ }
}
STATE_SYNC(interface) = nl_init_interface_handler();
@@ -379,7 +438,8 @@ static int init_sync(void)
dlog(LOG_ERR, "can't open interface watcher");
return -1;
}
- if (register_fd(nlif_fd(STATE_SYNC(interface)), STATE(fds)) == -1)
+ if (register_fd(nlif_fd(STATE_SYNC(interface)),
+ interface_handler, NULL, STATE(fds)) == -1)
return -1;
STATE_SYNC(tx_queue) = queue_create("txqueue", INT_MAX, QUEUE_F_EVFD);
@@ -387,8 +447,8 @@ static int init_sync(void)
dlog(LOG_ERR, "cannot create tx queue");
return -1;
}
- if (register_fd(queue_get_eventfd(STATE_SYNC(tx_queue)),
- STATE(fds)) == -1)
+ if (register_fd(queue_get_eventfd(STATE_SYNC(tx_queue)),
+ tx_queue_cb, NULL, STATE(fds)) == -1)
return -1;
STATE_SYNC(commit).h = nfct_open(CONFIG(netlink).subsys_id, 0);
@@ -404,7 +464,7 @@ static int init_sync(void)
return -1;
}
if (register_fd(get_read_evfd(STATE_SYNC(commit).evfd),
- STATE(fds)) == -1) {
+ commit_cb, NULL, STATE(fds)) == -1) {
return -1;
}
STATE_SYNC(commit).clientfd = -1;
@@ -417,61 +477,6 @@ static int init_sync(void)
return 0;
}
-static void channel_check(struct channel *c, int i, fd_set *readfds)
-{
- /* In case that this channel is connection-oriented. */
- if (channel_accept_isset(c, readfds))
- channel_accept(c);
-
- /* For data handling. */
- if (channel_isset(c, readfds))
- channel_handler(c, i);
-}
-
-static void run_sync(fd_set *readfds)
-{
- int i;
-
- for (i=0; i<STATE_SYNC(channel)->channel_num; i++)
- channel_check(STATE_SYNC(channel)->channel[i], i, readfds);
-
- if (FD_ISSET(queue_get_eventfd(STATE_SYNC(tx_queue)), readfds))
- STATE_SYNC(sync)->xmit();
-
- if (FD_ISSET(nlif_fd(STATE_SYNC(interface)), readfds))
- interface_handler();
-
- if (FD_ISSET(get_read_evfd(STATE_SYNC(commit).evfd), readfds)) {
- int ret;
-
- read_evfd(STATE_SYNC(commit).evfd);
-
- ret = STATE_SYNC(commit).rq[0].cb(STATE_SYNC(commit).h, 0);
- if (ret == 0) {
- /* we still have things in the callback queue. */
- if (STATE_SYNC(commit).rq[1].cb) {
- int fd = STATE_SYNC(commit).clientfd;
-
- STATE_SYNC(commit).rq[0].cb =
- STATE_SYNC(commit).rq[1].cb;
-
- STATE_SYNC(commit).rq[1].cb = NULL;
-
- STATE_SYNC(commit).clientfd = -1;
- STATE_SYNC(commit).rq[0].cb(
- STATE_SYNC(commit).h, fd);
- } else {
- /* Close the client socket now, we're done. */
- close(STATE_SYNC(commit).clientfd);
- STATE_SYNC(commit).clientfd = -1;
- }
- }
- }
-
- /* flush pending messages */
- multichannel_send_flush(STATE_SYNC(channel));
-}
-
static void kill_sync(void)
{
STATE(mode)->internal->close();
@@ -608,6 +613,12 @@ static int local_handler_sync(int fd, int type, void *data)
}
break;
case CT_FLUSH_CACHE:
+ /* if we're still committing, abort this command */
+ if (STATE_SYNC(commit).clientfd != -1) {
+ dlog(LOG_ERR, "ignoring flush command, "
+ "commit still in progress");
+ break;
+ }
/* inmediate flush, remove pending flush scheduled if any */
del_alarm(&STATE_SYNC(reset_cache_alarm));
dlog(LOG_NOTICE, "flushing caches");
@@ -621,12 +632,15 @@ static int local_handler_sync(int fd, int type, void *data)
STATE(mode)->internal->ct.flush();
break;
case CT_FLUSH_EXT_CACHE:
+ /* if we're still committing, abort this command */
+ if (STATE_SYNC(commit).clientfd != -1) {
+ dlog(LOG_ERR, "ignoring flush command, "
+ "commit still in progress");
+ break;
+ }
dlog(LOG_NOTICE, "flushing external cache");
STATE_SYNC(external)->ct.flush();
break;
- case KILL:
- killer(0);
- break;
case STATS:
STATE(mode)->internal->ct.stats(fd);
STATE_SYNC(external)->ct.stats(fd);
@@ -684,9 +698,15 @@ static int local_handler_sync(int fd, int type, void *data)
dlog(LOG_NOTICE, "committing expectation cache");
STATE_SYNC(commit).rq[0].cb = STATE_SYNC(external)->exp.commit;
STATE_SYNC(commit).rq[1].cb = NULL;
- local_commit(fd);
+ ret = local_commit(fd);
break;
case ALL_FLUSH_CACHE:
+ /* if we're still committing, abort this command */
+ if (STATE_SYNC(commit).clientfd != -1) {
+ dlog(LOG_ERR, "ignoring flush command, "
+ "commit still in progress");
+ break;
+ }
dlog(LOG_NOTICE, "flushing caches");
STATE(mode)->internal->ct.flush();
STATE_SYNC(external)->ct.flush();
@@ -704,7 +724,7 @@ static int local_handler_sync(int fd, int type, void *data)
} else {
STATE_SYNC(commit).rq[1].cb = NULL;
}
- local_commit(fd);
+ ret = local_commit(fd);
break;
case EXP_DUMP_INT_XML:
if (fork_process_new(CTD_PROC_ANY, 0, NULL, NULL) == 0) {
@@ -729,7 +749,6 @@ static int local_handler_sync(int fd, int type, void *data)
struct ct_mode sync_mode = {
.init = init_sync,
- .run = run_sync,
.local = local_handler_sync,
.kill = kill_sync,
/* the internal handler is set in run-time. */