summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
author/C=EU/ST=EU/CN=Pablo Neira Ayuso/emailAddress=pablo@netfilter.org </C=EU/ST=EU/CN=Pablo Neira Ayuso/emailAddress=pablo@netfilter.org>2008-04-26 16:07:00 +0000
committer/C=EU/ST=EU/CN=Pablo Neira Ayuso/emailAddress=pablo@netfilter.org </C=EU/ST=EU/CN=Pablo Neira Ayuso/emailAddress=pablo@netfilter.org>2008-04-26 16:07:00 +0000
commit96213d5f0821aee2fe52459ab2cd54569e50cf85 (patch)
treeb3bb54fd34a8c9e2353d8e8cdf1572e0e1384bf3 /src
parent07a3a6fe92c98e251a464a5744421ce211030003 (diff)
downloadconntrack-tools-96213d5f0821aee2fe52459ab2cd54569e50cf85.tar.gz
conntrack-tools-96213d5f0821aee2fe52459ab2cd54569e50cf85.zip
rework of the FT-FW approach
Diffstat (limited to 'src')
-rw-r--r--src/network.c43
-rw-r--r--src/queue.c2
-rw-r--r--src/sync-ftfw.c237
-rw-r--r--src/sync-mode.c46
4 files changed, 233 insertions, 95 deletions
diff --git a/src/network.c b/src/network.c
index 92999a1..d7ab415 100644
--- a/src/network.c
+++ b/src/network.c
@@ -33,13 +33,14 @@ static size_t __do_send(struct mcast_sock *m, void *data, size_t len)
#undef _TEST_DROP
#ifdef _TEST_DROP
- static int drop = 0;
- if (++drop >= 10) {
+#define DROP_RATE .25
+
+ /* simulate message omission with a certain probability */
+ if ((random() & 0x7FFFFFFF) < 0x80000000 * DROP_RATE) {
printf("drop sq: %u fl:%u len:%u\n",
ntohl(net->seq), ntohs(net->flags),
ntohs(net->len));
- drop = 0;
return 0;
}
#endif
@@ -57,7 +58,6 @@ static size_t __do_prepare(struct mcast_sock *m, void *data, size_t len)
if (!seq_set) {
seq_set = 1;
cur_seq = time(NULL);
- net->flags |= NET_F_HELLO;
}
net->len = len;
net->seq = cur_seq++;
@@ -181,9 +181,6 @@ int handle_netmsg(struct nethdr *net)
HDR_NETWORK2HOST(net);
- if (IS_HELLO(net))
- STATE_SYNC(last_seq_recv) = net->seq - 1;
-
if (IS_CTL(net))
return 0;
@@ -198,37 +195,51 @@ int handle_netmsg(struct nethdr *net)
return 0;
}
+static int local_seq_set = 0;
+
+/* this function only tracks, it does not update the last sequence received */
int mcast_track_seq(uint32_t seq, uint32_t *exp_seq)
{
- static int local_seq_set = 0;
- int ret = 1;
+ int ret = SEQ_UNKNOWN;
/* netlink sequence tracking initialization */
if (!local_seq_set) {
- local_seq_set = 1;
+ ret = SEQ_UNSET;
goto out;
}
/* fast path: we received the correct sequence */
- if (seq == STATE_SYNC(last_seq_recv)+1)
+ if (seq == STATE_SYNC(last_seq_recv)+1) {
+ ret = SEQ_IN_SYNC;
goto out;
+ }
/* out of sequence: some messages got lost */
if (after(seq, STATE_SYNC(last_seq_recv)+1)) {
STATE_SYNC(packets_lost) += seq-STATE_SYNC(last_seq_recv)+1;
- ret = 0;
+ ret = SEQ_AFTER;
goto out;
}
/* out of sequence: replayed/delayed packet? */
if (before(seq, STATE_SYNC(last_seq_recv)+1))
- dlog(LOG_WARNING, "delayed packet? exp=%u rcv=%u",
- STATE_SYNC(last_seq_recv)+1, seq);
+ ret = SEQ_BEFORE;
out:
*exp_seq = STATE_SYNC(last_seq_recv)+1;
- /* update expected sequence */
- STATE_SYNC(last_seq_recv) = seq;
return ret;
}
+
+void mcast_track_update_seq(uint32_t seq)
+{
+ if (!local_seq_set)
+ local_seq_set = 1;
+
+ STATE_SYNC(last_seq_recv) = seq;
+}
+
+int mcast_track_is_seq_set()
+{
+ return local_seq_set;
+}
diff --git a/src/queue.c b/src/queue.c
index 7b20e83..cdd70ae 100644
--- a/src/queue.c
+++ b/src/queue.c
@@ -93,7 +93,7 @@ retry:
goto err;
}
- list_add(&n->head, &b->head);
+ list_add_tail(&n->head, &b->head);
b->cur_size += size;
b->num_elems++;
diff --git a/src/sync-ftfw.c b/src/sync-ftfw.c
index cac25d0..0b98513 100644
--- a/src/sync-ftfw.c
+++ b/src/sync-ftfw.c
@@ -34,11 +34,26 @@
#define dp(...)
#endif
+#if 0
+#define dprint printf
+#else
+#define dprint(...)
+#endif
+
static LIST_HEAD(rs_list);
static LIST_HEAD(tx_list);
+static unsigned int rs_list_len;
static unsigned int tx_list_len;
static struct queue *rs_queue;
static struct queue *tx_queue;
+static uint32_t exp_seq;
+static uint32_t window;
+static uint32_t ack_from;
+static int ack_from_set = 0;
+static struct alarm_block alive_alarm;
+
+/* XXX: alive message expiration configurable */
+#define ALIVE_INT 1
struct cache_ftfw {
struct list_head rs_list;
@@ -64,6 +79,7 @@ static void cache_ftfw_del(struct us_conntrack *u, void *data)
/* no need for list_del_init since the entry is destroyed */
list_del(&cn->rs_list);
+ rs_list_len--;
}
static struct cache_extra cache_ftfw_extra = {
@@ -83,15 +99,57 @@ static void tx_queue_add_ctlmsg(uint32_t flags, uint32_t from, uint32_t to)
queue_add(tx_queue, &ack, NETHDR_ACK_SIZ);
}
-static struct alarm_block alive_alarm;
+static void ftfw_run(void);
+/* this function is called from the alarm framework */
static void do_alive_alarm(struct alarm_block *a, void *data)
{
- tx_queue_add_ctlmsg(NET_F_ALIVE, 0, 0);
+ if (ack_from_set && mcast_track_is_seq_set()) {
+ /* exp_seq contains the last update received */
+ dprint("send ALIVE ACK (from=%u, to=%u)\n",
+ ack_from, STATE_SYNC(last_seq_recv));
+ tx_queue_add_ctlmsg(NET_F_ACK,
+ ack_from,
+ STATE_SYNC(last_seq_recv));
+ ack_from_set = 0;
+ } else
+ tx_queue_add_ctlmsg(NET_F_ALIVE, 0, 0);
+
+ /* TODO: no need for buffered send, extracted from run_sync() */
+ ftfw_run();
+ mcast_buffered_pending_netmsg(STATE_SYNC(mcast_client));
+}
- add_alarm(&alive_alarm, 1, 0);
+#undef _SIGNAL_DEBUG
+#ifdef _SIGNAL_DEBUG
+
+static int rs_dump(void *data1, const void *data2)
+{
+ struct nethdr_ack *net = data1;
+
+ dprint("in RS queue -> seq:%u flags:%u\n", net->seq, net->flags);
+
+ return 0;
}
+#include <signal.h>
+
+static void my_dump(int foo)
+{
+ struct cache_ftfw *cn, *tmp;
+
+ list_for_each_entry_safe(cn, tmp, &rs_list, rs_list) {
+ struct us_conntrack *u;
+
+ u = cache_get_conntrack(STATE_SYNC(internal), cn);
+ dprint("in RS list -> seq:%u\n", cn->seq);
+ }
+
+ queue_iterate(rs_queue, NULL, rs_dump);
+}
+
+#endif
+
static int ftfw_init(void)
{
tx_queue = queue_create(CONFIG(resend_queue_size));
@@ -106,12 +164,15 @@ static int ftfw_init(void)
return -1;
}
- INIT_LIST_HEAD(&tx_list);
- INIT_LIST_HEAD(&rs_list);
-
- /* XXX: alive message expiration configurable */
init_alarm(&alive_alarm, NULL, do_alive_alarm);
- add_alarm(&alive_alarm, 1, 0);
+ add_alarm(&alive_alarm, ALIVE_INT, 0);
+
+ /* set ack window size */
+ window = CONFIG(window_size);
+
+#ifdef _SIGNAL_DEBUG
+ signal(SIGUSR1, my_dump);
+#endif
return 0;
}
@@ -128,7 +189,7 @@ static int do_cache_to_tx(void *data1, void *data2)
struct cache_ftfw *cn = cache_get_extra(STATE_SYNC(internal), u);
/* add to tx list */
- list_add(&cn->tx_list, &tx_list);
+ list_add_tail(&cn->tx_list, &tx_list);
tx_list_len++;
return 0;
@@ -157,13 +218,14 @@ static int ftfw_local(int fd, int type, void *data)
static int rs_queue_to_tx(void *data1, const void *data2)
{
- struct nethdr *net = data1;
+ struct nethdr_ack *net = data1;
const struct nethdr_ack *nack = data2;
if (between(net->seq, nack->from, nack->to)) {
dp("rs_queue_to_tx sq: %u fl:%u len:%u\n",
net->seq, net->flags, net->len);
queue_add(tx_queue, net, net->len);
+ queue_del(rs_queue, net);
}
return 0;
}
@@ -182,18 +244,20 @@ static int rs_queue_empty(void *data1, const void *data2)
static void rs_list_to_tx(struct cache *c, unsigned int from, unsigned int to)
{
- struct cache_ftfw *cn;
+ struct cache_ftfw *cn, *tmp;
- list_for_each_entry(cn, &rs_list, rs_list) {
+ list_for_each_entry_safe(cn, tmp, &rs_list, rs_list) {
struct us_conntrack *u;
u = cache_get_conntrack(STATE_SYNC(internal), cn);
if (between(cn->seq, from, to)) {
dp("resending nack'ed (oldseq=%u)\n", cn->seq);
- list_add(&cn->tx_list, &tx_list);
+ list_del_init(&cn->rs_list);
+ rs_list_len--;
+ list_add_tail(&cn->tx_list, &tx_list);
tx_list_len++;
- }
- }
+ }
+ }
}
static void rs_list_empty(struct cache *c, unsigned int from, unsigned int to)
@@ -207,54 +271,113 @@ static void rs_list_empty(struct cache *c, unsigned int from, unsigned int to)
if (between(cn->seq, from, to)) {
dp("queue: deleting from queue (seq=%u)\n", cn->seq);
list_del_init(&cn->rs_list);
- }
+ rs_list_len--;
+ }
}
}
-static int ftfw_recv(const struct nethdr *net)
+static int digest_msg(const struct nethdr *net)
{
- static unsigned int window = 0;
- unsigned int exp_seq;
+ if (IS_DATA(net))
+ return MSG_DATA;
- if (window == 0)
- window = CONFIG(window_size);
+ else if (IS_ACK(net)) {
+ const struct nethdr_ack *h = (const struct nethdr_ack *) net;
- if (!mcast_track_seq(net->seq, &exp_seq)) {
- dp("OOS: sending nack (seq=%u)\n", exp_seq);
- tx_queue_add_ctlmsg(NET_F_NACK, exp_seq, net->seq-1);
- window = CONFIG(window_size);
- } else {
- /* received a window, send an acknowledgement */
- if (--window == 0) {
- dp("sending ack (seq=%u)\n", net->seq);
- tx_queue_add_ctlmsg(NET_F_ACK,
- net->seq - CONFIG(window_size),
- net->seq);
- }
- }
+ dprint("ACK(%u): from seq=%u to seq=%u\n",
+ h->seq, h->from, h->to);
+ rs_list_empty(STATE_SYNC(internal), h->from, h->to);
+ queue_iterate(rs_queue, h, rs_queue_empty);
+ return MSG_CTL;
- if (IS_NACK(net)) {
+ } else if (IS_NACK(net)) {
const struct nethdr_ack *nack = (const struct nethdr_ack *) net;
- dp("NACK: from seq=%u to seq=%u\n", nack->from, nack->to);
+ dprint("NACK(%u): from seq=%u to seq=%u\n",
+ nack->seq, nack->from, nack->to);
rs_list_to_tx(STATE_SYNC(internal), nack->from, nack->to);
queue_iterate(rs_queue, nack, rs_queue_to_tx);
- return 1;
+ return MSG_CTL;
+
} else if (IS_RESYNC(net)) {
dp("RESYNC ALL\n");
cache_iterate(STATE_SYNC(internal), NULL, do_cache_to_tx);
- return 1;
- } else if (IS_ACK(net)) {
- const struct nethdr_ack *h = (const struct nethdr_ack *) net;
+ return MSG_CTL;
- dp("ACK: from seq=%u to seq=%u\n", h->from, h->to);
- rs_list_empty(STATE_SYNC(internal), h->from, h->to);
- queue_iterate(rs_queue, h, rs_queue_empty);
- return 1;
} else if (IS_ALIVE(net))
- return 1;
+ return MSG_CTL;
- return 0;
+ return MSG_BAD;
+}
+
+static int ftfw_recv(const struct nethdr *net)
+{
+ int ret = MSG_DATA;
+
+ switch (mcast_track_seq(net->seq, &exp_seq)) {
+ case SEQ_AFTER:
+ ret = digest_msg(net);
+ if (ret == MSG_BAD) {
+ ret = MSG_BAD;
+ goto out;
+ }
+
+ if (ack_from_set) {
+ tx_queue_add_ctlmsg(NET_F_ACK, ack_from, exp_seq-1);
+ dprint("OFS send half ACK: from seq=%u to seq=%u\n",
+ ack_from, exp_seq-1);
+ ack_from_set = 0;
+ }
+
+ tx_queue_add_ctlmsg(NET_F_NACK, exp_seq, net->seq-1);
+ dprint("OFS send NACK: from seq=%u to seq=%u\n",
+ exp_seq, net->seq-1);
+
+ /* count this message as part of the new window */
+ window = CONFIG(window_size) - 1;
+ ack_from = net->seq;
+ ack_from_set = 1;
+ break;
+
+ case SEQ_BEFORE:
+ /* we don't accept delayed packets */
+ dlog(LOG_WARNING, "Received seq=%u before expected seq=%u",
+ net->seq, exp_seq);
+ dlog(LOG_WARNING, "Probably the other node has come back"
+ "to life but you forgot to add "
+ "conntrackd -r to your scripts");
+ ret = MSG_DROP;
+ break;
+
+ case SEQ_UNSET:
+ case SEQ_IN_SYNC:
+ ret = digest_msg(net);
+ if (ret == MSG_BAD) {
+ ret = MSG_BAD;
+ goto out;
+ }
+
+ if (!ack_from_set) {
+ ack_from_set = 1;
+ ack_from = net->seq;
+ }
+
+ if (--window <= 0) {
+ /* received a window, send an acknowledgement */
+ dprint("OFS send ACK: from seq=%u to seq=%u\n",
+ ack_from, net->seq);
+
+ tx_queue_add_ctlmsg(NET_F_ACK, ack_from, net->seq);
+ window = CONFIG(window_size);
+ ack_from_set = 0;
+ }
+ }
+
+out:
+ if ((ret == MSG_DATA || ret == MSG_CTL))
+ mcast_track_update_seq(net->seq);
+
+ return ret;
}
static void ftfw_send(struct nethdr *net, struct us_conntrack *u)
@@ -270,11 +393,14 @@ static void ftfw_send(struct nethdr *net, struct us_conntrack *u)
cn = (struct cache_ftfw *)
cache_get_extra(STATE_SYNC(internal), u);
- if (!list_empty(&cn->rs_list))
- list_del(&cn->rs_list);
+ if (!list_empty(&cn->rs_list)) {
+ list_del_init(&cn->rs_list);
+ rs_list_len--;
+ }
cn->seq = net->seq;
- list_add(&cn->rs_list, &rs_list);
+ list_add_tail(&cn->rs_list, &rs_list);
+ rs_list_len++;
break;
case NFCT_Q_DESTROY:
queue_add(rs_queue, net, net->len);
@@ -294,7 +420,7 @@ static int tx_queue_xmit(void *data1, const void *data2)
HDR_NETWORK2HOST(net);
if (IS_DATA(net) || IS_ACK(net) || IS_NACK(net)) {
- dp("-> back_to_tx_queue sq: %u fl:%u len:%u\n",
+ dprint("tx_queue -> to_rs_queue sq: %u fl:%u len:%u\n",
net->seq, net->flags, net->len);
queue_add(rs_queue, net, net->len);
}
@@ -317,8 +443,7 @@ static int tx_list_xmit(struct list_head *i, struct us_conntrack *u)
tx_list_len--;
ret = mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net, len);
- if (STATE_SYNC(sync)->send)
- STATE_SYNC(sync)->send(net, u);
+ ftfw_send(net, u);
return ret;
}
@@ -337,6 +462,14 @@ static void ftfw_run(void)
u = cache_get_conntrack(STATE_SYNC(internal), cn);
tx_list_xmit(&cn->tx_list, u);
}
+
+ /* reset alive alarm */
+ add_alarm(&alive_alarm, 1, 0);
+
+ dprint("tx_list_len:%u tx_queue_len:%u "
+ "rs_list_len: %u rs_queue_len:%u\n",
+ tx_list_len, queue_len(tx_queue),
+ rs_list_len, queue_len(rs_queue));
}
struct sync_mode sync_ftfw = {
diff --git a/src/sync-mode.c b/src/sync-mode.c
index 3851e4a..cbb4769 100644
--- a/src/sync-mode.c
+++ b/src/sync-mode.c
@@ -42,8 +42,18 @@ static void do_mcast_handler_step(struct nethdr *net)
struct nf_conntrack *ct = (struct nf_conntrack *)(void*) __ct;
struct us_conntrack *u;
- if (STATE_SYNC(sync)->recv(net))
- return;
+ switch (STATE_SYNC(sync)->recv(net)) {
+ case MSG_DATA:
+ break;
+ case MSG_DROP:
+ case MSG_CTL:
+ return;
+ case MSG_BAD:
+ STATE(malformed)++;
+ return;
+ default:
+ break;
+ }
memset(ct, 0, sizeof(__ct));
@@ -211,14 +221,15 @@ static int register_fds_sync(struct fds *fds)
static void run_sync(fd_set *readfds)
{
/* multicast packet has been received */
- if (FD_ISSET(STATE_SYNC(mcast_server->fd), readfds))
+ if (FD_ISSET(STATE_SYNC(mcast_server->fd), readfds)) {
mcast_handler();
- if (STATE_SYNC(sync)->run)
- STATE_SYNC(sync)->run();
+ if (STATE_SYNC(sync)->run)
+ STATE_SYNC(sync)->run();
- /* flush pending messages */
- mcast_buffered_pending_netmsg(STATE_SYNC(mcast_client));
+ /* flush pending messages */
+ mcast_buffered_pending_netmsg(STATE_SYNC(mcast_client));
+ }
}
static void kill_sync(void)
@@ -358,16 +369,8 @@ static int purge_step(void *data1, void *data2)
ret = nfct_query(h, NFCT_Q_GET, u->ct);
if (ret == -1 && errno == ENOENT) {
- size_t len;
- struct nethdr *net = BUILD_NETMSG(u->ct, NFCT_Q_DESTROY);
-
debug_ct(u->ct, "overrun purge resync");
-
- len = prepare_send_netmsg(STATE_SYNC(mcast_client), net);
- mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net, len);
- if (STATE_SYNC(sync)->send)
- STATE_SYNC(sync)->send(net, u);
-
+ mcast_send_sync(u, u->ct, NFCT_Q_DESTROY);
cache_del(STATE_SYNC(internal), u->ct);
}
@@ -402,16 +405,8 @@ static int overrun_sync(enum nf_conntrack_msg_type type,
if (!cache_test(STATE_SYNC(internal), ct)) {
if ((u = cache_update_force(STATE_SYNC(internal), ct))) {
- size_t len;
-
debug_ct(u->ct, "overrun resync");
-
- struct nethdr *net = BUILD_NETMSG(u->ct, NFCT_Q_UPDATE);
- len = prepare_send_netmsg(STATE_SYNC(mcast_client),net);
- mcast_buffered_send_netmsg(STATE_SYNC(mcast_client),
- net, len);
- if (STATE_SYNC(sync)->send)
- STATE_SYNC(sync)->send(net, u);
+ mcast_send_sync(u, u->ct, NFCT_Q_UPDATE);
}
}
@@ -437,7 +432,6 @@ retry:
} else {
if (errno == EEXIST) {
cache_del(STATE_SYNC(internal), ct);
- mcast_send_sync(NULL, ct, NFCT_Q_DESTROY);
goto retry;
}