diff options
author | /C=EU/ST=EU/CN=Pablo Neira Ayuso/emailAddress=pablo@netfilter.org </C=EU/ST=EU/CN=Pablo Neira Ayuso/emailAddress=pablo@netfilter.org> | 2008-04-26 16:07:00 +0000 |
---|---|---|
committer | /C=EU/ST=EU/CN=Pablo Neira Ayuso/emailAddress=pablo@netfilter.org </C=EU/ST=EU/CN=Pablo Neira Ayuso/emailAddress=pablo@netfilter.org> | 2008-04-26 16:07:00 +0000 |
commit | 96213d5f0821aee2fe52459ab2cd54569e50cf85 (patch) | |
tree | b3bb54fd34a8c9e2353d8e8cdf1572e0e1384bf3 /src/sync-ftfw.c | |
parent | 07a3a6fe92c98e251a464a5744421ce211030003 (diff) | |
download | conntrack-tools-96213d5f0821aee2fe52459ab2cd54569e50cf85.tar.gz conntrack-tools-96213d5f0821aee2fe52459ab2cd54569e50cf85.zip |
rework of the FT-FW approach
Diffstat (limited to 'src/sync-ftfw.c')
-rw-r--r-- | src/sync-ftfw.c | 237 |
1 files changed, 185 insertions, 52 deletions
diff --git a/src/sync-ftfw.c b/src/sync-ftfw.c index cac25d0..0b98513 100644 --- a/src/sync-ftfw.c +++ b/src/sync-ftfw.c @@ -34,11 +34,26 @@ #define dp(...) #endif +#if 0 +#define dprint printf +#else +#define dprint(...) +#endif + static LIST_HEAD(rs_list); static LIST_HEAD(tx_list); +static unsigned int rs_list_len; static unsigned int tx_list_len; static struct queue *rs_queue; static struct queue *tx_queue; +static uint32_t exp_seq; +static uint32_t window; +static uint32_t ack_from; +static int ack_from_set = 0; +static struct alarm_block alive_alarm; + +/* XXX: alive message expiration configurable */ +#define ALIVE_INT 1 struct cache_ftfw { struct list_head rs_list; @@ -64,6 +79,7 @@ static void cache_ftfw_del(struct us_conntrack *u, void *data) /* no need for list_del_init since the entry is destroyed */ list_del(&cn->rs_list); + rs_list_len--; } static struct cache_extra cache_ftfw_extra = { @@ -83,15 +99,57 @@ static void tx_queue_add_ctlmsg(uint32_t flags, uint32_t from, uint32_t to) queue_add(tx_queue, &ack, NETHDR_ACK_SIZ); } -static struct alarm_block alive_alarm; +static void ftfw_run(void); +/* this function is called from the alarm framework */ static void do_alive_alarm(struct alarm_block *a, void *data) { - tx_queue_add_ctlmsg(NET_F_ALIVE, 0, 0); + if (ack_from_set && mcast_track_is_seq_set()) { + /* exp_seq contains the last update received */ + dprint("send ALIVE ACK (from=%u, to=%u)\n", + ack_from, STATE_SYNC(last_seq_recv)); + tx_queue_add_ctlmsg(NET_F_ACK, + ack_from, + STATE_SYNC(last_seq_recv)); + ack_from_set = 0; + } else + tx_queue_add_ctlmsg(NET_F_ALIVE, 0, 0); + + /* TODO: no need for buffered send, extracted from run_sync() */ + ftfw_run(); + mcast_buffered_pending_netmsg(STATE_SYNC(mcast_client)); +} - add_alarm(&alive_alarm, 1, 0); +#undef _SIGNAL_DEBUG +#ifdef _SIGNAL_DEBUG + +static int rs_dump(void *data1, const void *data2) +{ + struct nethdr_ack *net = data1; + + dprint("in RS queue -> seq:%u flags:%u\n", net->seq, net->flags); + + return 0; } +#include <signal.h> + +static void my_dump(int foo) +{ + struct cache_ftfw *cn, *tmp; + + list_for_each_entry_safe(cn, tmp, &rs_list, rs_list) { + struct us_conntrack *u; + + u = cache_get_conntrack(STATE_SYNC(internal), cn); + dprint("in RS list -> seq:%u\n", cn->seq); + } + + queue_iterate(rs_queue, NULL, rs_dump); +} + +#endif + static int ftfw_init(void) { tx_queue = queue_create(CONFIG(resend_queue_size)); @@ -106,12 +164,15 @@ static int ftfw_init(void) return -1; } - INIT_LIST_HEAD(&tx_list); - INIT_LIST_HEAD(&rs_list); - - /* XXX: alive message expiration configurable */ init_alarm(&alive_alarm, NULL, do_alive_alarm); - add_alarm(&alive_alarm, 1, 0); + add_alarm(&alive_alarm, ALIVE_INT, 0); + + /* set ack window size */ + window = CONFIG(window_size); + +#ifdef _SIGNAL_DEBUG + signal(SIGUSR1, my_dump); +#endif return 0; } @@ -128,7 +189,7 @@ static int do_cache_to_tx(void *data1, void *data2) struct cache_ftfw *cn = cache_get_extra(STATE_SYNC(internal), u); /* add to tx list */ - list_add(&cn->tx_list, &tx_list); + list_add_tail(&cn->tx_list, &tx_list); tx_list_len++; return 0; @@ -157,13 +218,14 @@ static int ftfw_local(int fd, int type, void *data) static int rs_queue_to_tx(void *data1, const void *data2) { - struct nethdr *net = data1; + struct nethdr_ack *net = data1; const struct nethdr_ack *nack = data2; if (between(net->seq, nack->from, nack->to)) { dp("rs_queue_to_tx sq: %u fl:%u len:%u\n", net->seq, net->flags, net->len); queue_add(tx_queue, net, net->len); + queue_del(rs_queue, net); } return 0; } @@ -182,18 +244,20 @@ static int rs_queue_empty(void *data1, const void *data2) static void rs_list_to_tx(struct cache *c, unsigned int from, unsigned int to) { - struct cache_ftfw *cn; + struct cache_ftfw *cn, *tmp; - list_for_each_entry(cn, &rs_list, rs_list) { + list_for_each_entry_safe(cn, tmp, &rs_list, rs_list) { struct us_conntrack *u; u = cache_get_conntrack(STATE_SYNC(internal), cn); if (between(cn->seq, from, to)) { dp("resending nack'ed (oldseq=%u)\n", cn->seq); - list_add(&cn->tx_list, &tx_list); + list_del_init(&cn->rs_list); + rs_list_len--; + list_add_tail(&cn->tx_list, &tx_list); tx_list_len++; - } - } + } + } } static void rs_list_empty(struct cache *c, unsigned int from, unsigned int to) @@ -207,54 +271,113 @@ static void rs_list_empty(struct cache *c, unsigned int from, unsigned int to) if (between(cn->seq, from, to)) { dp("queue: deleting from queue (seq=%u)\n", cn->seq); list_del_init(&cn->rs_list); - } + rs_list_len--; + } } } -static int ftfw_recv(const struct nethdr *net) +static int digest_msg(const struct nethdr *net) { - static unsigned int window = 0; - unsigned int exp_seq; + if (IS_DATA(net)) + return MSG_DATA; - if (window == 0) - window = CONFIG(window_size); + else if (IS_ACK(net)) { + const struct nethdr_ack *h = (const struct nethdr_ack *) net; - if (!mcast_track_seq(net->seq, &exp_seq)) { - dp("OOS: sending nack (seq=%u)\n", exp_seq); - tx_queue_add_ctlmsg(NET_F_NACK, exp_seq, net->seq-1); - window = CONFIG(window_size); - } else { - /* received a window, send an acknowledgement */ - if (--window == 0) { - dp("sending ack (seq=%u)\n", net->seq); - tx_queue_add_ctlmsg(NET_F_ACK, - net->seq - CONFIG(window_size), - net->seq); - } - } + dprint("ACK(%u): from seq=%u to seq=%u\n", + h->seq, h->from, h->to); + rs_list_empty(STATE_SYNC(internal), h->from, h->to); + queue_iterate(rs_queue, h, rs_queue_empty); + return MSG_CTL; - if (IS_NACK(net)) { + } else if (IS_NACK(net)) { const struct nethdr_ack *nack = (const struct nethdr_ack *) net; - dp("NACK: from seq=%u to seq=%u\n", nack->from, nack->to); + dprint("NACK(%u): from seq=%u to seq=%u\n", + nack->seq, nack->from, nack->to); rs_list_to_tx(STATE_SYNC(internal), nack->from, nack->to); queue_iterate(rs_queue, nack, rs_queue_to_tx); - return 1; + return MSG_CTL; + } else if (IS_RESYNC(net)) { dp("RESYNC ALL\n"); cache_iterate(STATE_SYNC(internal), NULL, do_cache_to_tx); - return 1; - } else if (IS_ACK(net)) { - const struct nethdr_ack *h = (const struct nethdr_ack *) net; + return MSG_CTL; - dp("ACK: from seq=%u to seq=%u\n", h->from, h->to); - rs_list_empty(STATE_SYNC(internal), h->from, h->to); - queue_iterate(rs_queue, h, rs_queue_empty); - return 1; } else if (IS_ALIVE(net)) - return 1; + return MSG_CTL; - return 0; + return MSG_BAD; +} + +static int ftfw_recv(const struct nethdr *net) +{ + int ret = MSG_DATA; + + switch (mcast_track_seq(net->seq, &exp_seq)) { + case SEQ_AFTER: + ret = digest_msg(net); + if (ret == MSG_BAD) { + ret = MSG_BAD; + goto out; + } + + if (ack_from_set) { + tx_queue_add_ctlmsg(NET_F_ACK, ack_from, exp_seq-1); + dprint("OFS send half ACK: from seq=%u to seq=%u\n", + ack_from, exp_seq-1); + ack_from_set = 0; + } + + tx_queue_add_ctlmsg(NET_F_NACK, exp_seq, net->seq-1); + dprint("OFS send NACK: from seq=%u to seq=%u\n", + exp_seq, net->seq-1); + + /* count this message as part of the new window */ + window = CONFIG(window_size) - 1; + ack_from = net->seq; + ack_from_set = 1; + break; + + case SEQ_BEFORE: + /* we don't accept delayed packets */ + dlog(LOG_WARNING, "Received seq=%u before expected seq=%u", + net->seq, exp_seq); + dlog(LOG_WARNING, "Probably the other node has come back" + "to life but you forgot to add " + "conntrackd -r to your scripts"); + ret = MSG_DROP; + break; + + case SEQ_UNSET: + case SEQ_IN_SYNC: + ret = digest_msg(net); + if (ret == MSG_BAD) { + ret = MSG_BAD; + goto out; + } + + if (!ack_from_set) { + ack_from_set = 1; + ack_from = net->seq; + } + + if (--window <= 0) { + /* received a window, send an acknowledgement */ + dprint("OFS send ACK: from seq=%u to seq=%u\n", + ack_from, net->seq); + + tx_queue_add_ctlmsg(NET_F_ACK, ack_from, net->seq); + window = CONFIG(window_size); + ack_from_set = 0; + } + } + +out: + if ((ret == MSG_DATA || ret == MSG_CTL)) + mcast_track_update_seq(net->seq); + + return ret; } static void ftfw_send(struct nethdr *net, struct us_conntrack *u) @@ -270,11 +393,14 @@ static void ftfw_send(struct nethdr *net, struct us_conntrack *u) cn = (struct cache_ftfw *) cache_get_extra(STATE_SYNC(internal), u); - if (!list_empty(&cn->rs_list)) - list_del(&cn->rs_list); + if (!list_empty(&cn->rs_list)) { + list_del_init(&cn->rs_list); + rs_list_len--; + } cn->seq = net->seq; - list_add(&cn->rs_list, &rs_list); + list_add_tail(&cn->rs_list, &rs_list); + rs_list_len++; break; case NFCT_Q_DESTROY: queue_add(rs_queue, net, net->len); @@ -294,7 +420,7 @@ static int tx_queue_xmit(void *data1, const void *data2) HDR_NETWORK2HOST(net); if (IS_DATA(net) || IS_ACK(net) || IS_NACK(net)) { - dp("-> back_to_tx_queue sq: %u fl:%u len:%u\n", + dprint("tx_queue -> to_rs_queue sq: %u fl:%u len:%u\n", net->seq, net->flags, net->len); queue_add(rs_queue, net, net->len); } @@ -317,8 +443,7 @@ static int tx_list_xmit(struct list_head *i, struct us_conntrack *u) tx_list_len--; ret = mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net, len); - if (STATE_SYNC(sync)->send) - STATE_SYNC(sync)->send(net, u); + ftfw_send(net, u); return ret; } @@ -337,6 +462,14 @@ static void ftfw_run(void) u = cache_get_conntrack(STATE_SYNC(internal), cn); tx_list_xmit(&cn->tx_list, u); } + + /* reset alive alarm */ + add_alarm(&alive_alarm, 1, 0); + + dprint("tx_list_len:%u tx_queue_len:%u " + "rs_list_len: %u rs_queue_len:%u\n", + tx_list_len, queue_len(tx_queue), + rs_list_len, queue_len(rs_queue)); } struct sync_mode sync_ftfw = { |