summaryrefslogtreecommitdiff
path: root/src/sync-mode.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync-mode.c')
-rw-r--r--src/sync-mode.c265
1 files changed, 210 insertions, 55 deletions
diff --git a/src/sync-mode.c b/src/sync-mode.c
index 3fa0d11..2505631 100644
--- a/src/sync-mode.c
+++ b/src/sync-mode.c
@@ -1,6 +1,7 @@
/*
- * (C) 2006-2007 by Pablo Neira Ayuso <pablo@netfilter.org>
- *
+ * (C) 2006-2011 by Pablo Neira Ayuso <pablo@netfilter.org>
+ * (C) 2011 by Vyatta Inc. <http://www.vyatta.com>
+ *
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
@@ -40,11 +41,47 @@
#include <net/if.h>
#include <fcntl.h>
+static struct nf_conntrack *msg2ct_alloc(struct nethdr *net, size_t remain)
+{
+ struct nf_conntrack *ct;
+
+ /* TODO: add stats on ENOMEM errors in the future. */
+ ct = nfct_new();
+ if (ct == NULL)
+ return NULL;
+
+ if (msg2ct(ct, net, remain) == -1) {
+ STATE_SYNC(error).msg_rcv_malformed++;
+ STATE_SYNC(error).msg_rcv_bad_payload++;
+ nfct_destroy(ct);
+ return NULL;
+ }
+ return ct;
+}
+
+static struct nf_expect *msg2exp_alloc(struct nethdr *net, size_t remain)
+{
+ struct nf_expect *exp;
+
+ /* TODO: add stats on ENOMEM errors in the future. */
+ exp = nfexp_new();
+ if (exp == NULL)
+ return NULL;
+
+ if (msg2exp(exp, net, remain) == -1) {
+ STATE_SYNC(error).msg_rcv_malformed++;
+ STATE_SYNC(error).msg_rcv_bad_payload++;
+ nfexp_destroy(exp);
+ return NULL;
+ }
+ return exp;
+}
+
static void
do_channel_handler_step(int i, struct nethdr *net, size_t remain)
{
- char __ct[nfct_maxsize()];
- struct nf_conntrack *ct = (struct nf_conntrack *)(void*) __ct;
+ struct nf_conntrack *ct = NULL;
+ struct nf_expect *exp = NULL;
if (net->version != CONNTRACKD_PROTOCOL_VERSION) {
STATE_SYNC(error).msg_rcv_malformed++;
@@ -74,29 +111,53 @@ do_channel_handler_step(int i, struct nethdr *net, size_t remain)
STATE_SYNC(error).msg_rcv_bad_type++;
return;
}
- memset(ct, 0, sizeof(__ct));
-
- if (parse_payload(ct, net, remain) == -1) {
- STATE_SYNC(error).msg_rcv_malformed++;
- STATE_SYNC(error).msg_rcv_bad_payload++;
- return;
- }
switch(net->type) {
- case NET_T_STATE_NEW:
- STATE_SYNC(external)->new(ct);
+ case NET_T_STATE_CT_NEW:
+ ct = msg2ct_alloc(net, remain);
+ if (ct == NULL)
+ return;
+ STATE_SYNC(external)->ct.new(ct);
+ break;
+ case NET_T_STATE_CT_UPD:
+ ct = msg2ct_alloc(net, remain);
+ if (ct == NULL)
+ return;
+ STATE_SYNC(external)->ct.upd(ct);
break;
- case NET_T_STATE_UPD:
- STATE_SYNC(external)->update(ct);
+ case NET_T_STATE_CT_DEL:
+ ct = msg2ct_alloc(net, remain);
+ if (ct == NULL)
+ return;
+ STATE_SYNC(external)->ct.del(ct);
break;
- case NET_T_STATE_DEL:
- STATE_SYNC(external)->destroy(ct);
+ case NET_T_STATE_EXP_NEW:
+ exp = msg2exp_alloc(net, remain);
+ if (exp == NULL)
+ return;
+ STATE_SYNC(external)->exp.new(exp);
+ break;
+ case NET_T_STATE_EXP_UPD:
+ exp = msg2exp_alloc(net, remain);
+ if (exp == NULL)
+ return;
+ STATE_SYNC(external)->exp.upd(exp);
+ break;
+ case NET_T_STATE_EXP_DEL:
+ exp = msg2exp_alloc(net, remain);
+ if (exp == NULL)
+ return;
+ STATE_SYNC(external)->exp.del(exp);
break;
default:
STATE_SYNC(error).msg_rcv_malformed++;
STATE_SYNC(error).msg_rcv_bad_type++;
break;
}
+ if (ct != NULL)
+ nfct_destroy(ct);
+ if (exp != NULL)
+ nfexp_destroy(exp);
}
static char __net[65536]; /* XXX: maximum MTU for IPv4 */
@@ -247,7 +308,7 @@ static void do_reset_cache_alarm(struct alarm_block *a, void *data)
exit(EXIT_SUCCESS);
}
/* this is not required if events don't get lost */
- STATE(mode)->internal->flush();
+ STATE(mode)->internal->ct.flush();
}
static int init_sync(void)
@@ -330,7 +391,7 @@ static int init_sync(void)
STATE(fds)) == -1)
return -1;
- STATE_SYNC(commit).h = nfct_open(CONNTRACK, 0);
+ STATE_SYNC(commit).h = nfct_open(CONFIG(netlink).subsys_id, 0);
if (STATE_SYNC(commit).h == NULL) {
dlog(LOG_ERR, "can't create handler to commit");
return -1;
@@ -346,6 +407,7 @@ static int init_sync(void)
STATE(fds)) == -1) {
return -1;
}
+ STATE_SYNC(commit).clientfd = -1;
init_alarm(&STATE_SYNC(reset_cache_alarm), NULL, do_reset_cache_alarm);
@@ -380,8 +442,30 @@ static void run_sync(fd_set *readfds)
interface_handler();
if (FD_ISSET(get_read_evfd(STATE_SYNC(commit).evfd), readfds)) {
+ int ret;
+
read_evfd(STATE_SYNC(commit).evfd);
- STATE_SYNC(external)->commit(STATE_SYNC(commit).h, 0);
+
+ ret = STATE_SYNC(commit).rq[0].cb(STATE_SYNC(commit).h, 0);
+ if (ret == 0) {
+ /* we still have things in the callback queue. */
+ if (STATE_SYNC(commit).rq[1].cb) {
+ int fd = STATE_SYNC(commit).clientfd;
+
+ STATE_SYNC(commit).rq[0].cb =
+ STATE_SYNC(commit).rq[1].cb;
+
+ STATE_SYNC(commit).rq[1].cb = NULL;
+
+ STATE_SYNC(commit).clientfd = -1;
+ STATE_SYNC(commit).rq[0].cb(
+ STATE_SYNC(commit).h, fd);
+ } else {
+ /* Close the client socket now, we're done. */
+ close(STATE_SYNC(commit).clientfd);
+ STATE_SYNC(commit).clientfd = -1;
+ }
+ }
}
/* flush pending messages */
@@ -458,48 +542,62 @@ static void dump_stats_sync_extended(int fd)
send(fd, buf, size, 0);
}
+static int local_commit(int fd)
+{
+ int ret;
+
+ /* delete the reset alarm if any before committing */
+ del_alarm(&STATE_SYNC(reset_cache_alarm));
+
+ ret = STATE_SYNC(commit).rq[0].cb(STATE_SYNC(commit).h, fd);
+ if (ret == -1) {
+ dlog(LOG_NOTICE, "commit already in progress, skipping");
+ ret = LOCAL_RET_OK;
+ } else if (ret == 0) {
+ /* we've finished the commit. */
+ ret = LOCAL_RET_OK;
+ } else {
+ /* Keep open the client, we want synchronous commit. */
+ ret = LOCAL_RET_STOLEN;
+ }
+ return ret;
+}
+
/* handler for requests coming via UNIX socket */
static int local_handler_sync(int fd, int type, void *data)
{
int ret = LOCAL_RET_OK;
switch(type) {
- case DUMP_INTERNAL:
- ret = fork_process_new(CTD_PROC_ANY, 0, NULL, NULL);
- if (ret == 0) {
- STATE(mode)->internal->dump(fd, NFCT_O_PLAIN);
+ case CT_DUMP_INTERNAL:
+ if (fork_process_new(CTD_PROC_ANY, 0, NULL, NULL) == 0) {
+ STATE(mode)->internal->ct.dump(fd, NFCT_O_PLAIN);
exit(EXIT_SUCCESS);
}
break;
- case DUMP_EXTERNAL:
- ret = fork_process_new(CTD_PROC_ANY, 0, NULL, NULL);
- if (ret == 0) {
- STATE_SYNC(external)->dump(fd, NFCT_O_PLAIN);
+ case CT_DUMP_EXTERNAL:
+ if (fork_process_new(CTD_PROC_ANY, 0, NULL, NULL) == 0) {
+ STATE_SYNC(external)->ct.dump(fd, NFCT_O_PLAIN);
exit(EXIT_SUCCESS);
}
break;
- case DUMP_INT_XML:
- ret = fork_process_new(CTD_PROC_ANY, 0, NULL, NULL);
- if (ret == 0) {
- STATE(mode)->internal->dump(fd, NFCT_O_XML);
+ case CT_DUMP_INT_XML:
+ if (fork_process_new(CTD_PROC_ANY, 0, NULL, NULL) == 0) {
+ STATE(mode)->internal->ct.dump(fd, NFCT_O_XML);
exit(EXIT_SUCCESS);
}
break;
- case DUMP_EXT_XML:
- ret = fork_process_new(CTD_PROC_ANY, 0, NULL, NULL);
- if (ret == 0) {
- STATE_SYNC(external)->dump(fd, NFCT_O_XML);
+ case CT_DUMP_EXT_XML:
+ if (fork_process_new(CTD_PROC_ANY, 0, NULL, NULL) == 0) {
+ STATE_SYNC(external)->ct.dump(fd, NFCT_O_XML);
exit(EXIT_SUCCESS);
}
break;
- case COMMIT:
- /* delete the reset alarm if any before committing */
- del_alarm(&STATE_SYNC(reset_cache_alarm));
-
- dlog(LOG_NOTICE, "committing external cache");
- STATE_SYNC(external)->commit(STATE_SYNC(commit).h, fd);
- /* Keep the client socket open, we want synchronous commits. */
- ret = LOCAL_RET_STOLEN;
+ case CT_COMMIT:
+ dlog(LOG_NOTICE, "committing conntrack cache");
+ STATE_SYNC(commit).rq[0].cb = STATE_SYNC(external)->ct.commit;
+ STATE_SYNC(commit).rq[1].cb = NULL;
+ ret = local_commit(fd);
break;
case RESET_TIMERS:
if (!alarm_pending(&STATE_SYNC(reset_cache_alarm))) {
@@ -509,29 +607,29 @@ static int local_handler_sync(int fd, int type, void *data)
CONFIG(purge_timeout), 0);
}
break;
- case FLUSH_CACHE:
+ case CT_FLUSH_CACHE:
/* inmediate flush, remove pending flush scheduled if any */
del_alarm(&STATE_SYNC(reset_cache_alarm));
dlog(LOG_NOTICE, "flushing caches");
- STATE(mode)->internal->flush();
- STATE_SYNC(external)->flush();
+ STATE(mode)->internal->ct.flush();
+ STATE_SYNC(external)->ct.flush();
break;
- case FLUSH_INT_CACHE:
+ case CT_FLUSH_INT_CACHE:
/* inmediate flush, remove pending flush scheduled if any */
del_alarm(&STATE_SYNC(reset_cache_alarm));
dlog(LOG_NOTICE, "flushing internal cache");
- STATE(mode)->internal->flush();
+ STATE(mode)->internal->ct.flush();
break;
- case FLUSH_EXT_CACHE:
+ case CT_FLUSH_EXT_CACHE:
dlog(LOG_NOTICE, "flushing external cache");
- STATE_SYNC(external)->flush();
+ STATE_SYNC(external)->ct.flush();
break;
case KILL:
killer(0);
break;
case STATS:
- STATE(mode)->internal->stats(fd);
- STATE_SYNC(external)->stats(fd);
+ STATE(mode)->internal->ct.stats(fd);
+ STATE_SYNC(external)->ct.stats(fd);
dump_traffic_stats(fd);
multichannel_stats(STATE_SYNC(channel), fd);
dump_stats_sync(fd);
@@ -541,8 +639,8 @@ static int local_handler_sync(int fd, int type, void *data)
multichannel_stats(STATE_SYNC(channel), fd);
break;
case STATS_CACHE:
- STATE(mode)->internal->stats_ext(fd);
- STATE_SYNC(external)->stats_ext(fd);
+ STATE(mode)->internal->ct.stats_ext(fd);
+ STATE_SYNC(external)->ct.stats_ext(fd);
break;
case STATS_LINK:
multichannel_stats_extended(STATE_SYNC(channel),
@@ -551,6 +649,63 @@ static int local_handler_sync(int fd, int type, void *data)
case STATS_QUEUE:
queue_stats_show(fd);
break;
+ case EXP_STATS:
+ if (!(CONFIG(flags) & CTD_EXPECT))
+ break;
+
+ STATE(mode)->internal->exp.stats(fd);
+ STATE_SYNC(external)->exp.stats(fd);
+ dump_traffic_stats(fd);
+ multichannel_stats(STATE_SYNC(channel), fd);
+ dump_stats_sync(fd);
+ break;
+ case EXP_DUMP_INTERNAL:
+ if (!(CONFIG(flags) & CTD_EXPECT))
+ break;
+
+ if (fork_process_new(CTD_PROC_ANY, 0, NULL, NULL) == 0) {
+ STATE(mode)->internal->exp.dump(fd, NFCT_O_PLAIN);
+ exit(EXIT_SUCCESS);
+ }
+ break;
+ case EXP_DUMP_EXTERNAL:
+ if (!(CONFIG(flags) & CTD_EXPECT))
+ break;
+
+ if (fork_process_new(CTD_PROC_ANY, 0, NULL, NULL) == 0) {
+ STATE_SYNC(external)->exp.dump(fd, NFCT_O_PLAIN);
+ exit(EXIT_SUCCESS);
+ }
+ break;
+ case EXP_COMMIT:
+ if (!(CONFIG(flags) & CTD_EXPECT))
+ break;
+
+ dlog(LOG_NOTICE, "committing expectation cache");
+ STATE_SYNC(commit).rq[0].cb = STATE_SYNC(external)->exp.commit;
+ STATE_SYNC(commit).rq[1].cb = NULL;
+ local_commit(fd);
+ break;
+ case ALL_FLUSH_CACHE:
+ dlog(LOG_NOTICE, "flushing caches");
+ STATE(mode)->internal->ct.flush();
+ STATE_SYNC(external)->ct.flush();
+ if (CONFIG(flags) & CTD_EXPECT) {
+ STATE(mode)->internal->exp.flush();
+ STATE_SYNC(external)->exp.flush();
+ }
+ break;
+ case ALL_COMMIT:
+ dlog(LOG_NOTICE, "committing all external caches");
+ STATE_SYNC(commit).rq[0].cb = STATE_SYNC(external)->ct.commit;
+ if (CONFIG(flags) & CTD_EXPECT) {
+ STATE_SYNC(commit).rq[1].cb =
+ STATE_SYNC(external)->exp.commit;
+ } else {
+ STATE_SYNC(commit).rq[1].cb = NULL;
+ }
+ local_commit(fd);
+ break;
default:
if (STATE_SYNC(sync)->local)
ret = STATE_SYNC(sync)->local(fd, type, data);