summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ChangeLog1
-rw-r--r--include/network.h38
-rw-r--r--src/network.c43
-rw-r--r--src/queue.c2
-rw-r--r--src/sync-ftfw.c237
-rw-r--r--src/sync-mode.c46
6 files changed, 257 insertions, 110 deletions
diff --git a/ChangeLog b/ChangeLog
index 045988a..db11bf3 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -24,6 +24,7 @@ o minor cleanups
o fix asymmetric path support (reported by Gary Richards)
o improve netlink overrun handling
o add more verbose error notification when we fail to inject a conntrack
+o rework of the FT-FW approach
version 0.9.6 (2008/03/08)
------------------------------
diff --git a/include/network.h b/include/network.h
index e4ebec4..0fa7b71 100644
--- a/include/network.h
+++ b/include/network.h
@@ -26,20 +26,18 @@ struct nethdr_ack {
#define NETHDR_ACK_SIZ sizeof(struct nethdr_ack)
enum {
- NET_F_HELLO_BIT = 0,
- NET_F_HELLO = (1 << NET_F_HELLO_BIT),
-
- NET_F_RESYNC_BIT = 1,
- NET_F_RESYNC = (1 << NET_F_RESYNC_BIT),
-
- NET_F_NACK_BIT = 2,
- NET_F_NACK = (1 << NET_F_NACK_BIT),
-
- NET_F_ACK_BIT = 3,
- NET_F_ACK = (1 << NET_F_ACK_BIT),
+ NET_F_UNUSED = (1 << 0),
+ NET_F_RESYNC = (1 << 1),
+ NET_F_NACK = (1 << 2),
+ NET_F_ACK = (1 << 3),
+ NET_F_ALIVE = (1 << 4),
+};
- NET_F_ALIVE_BIT = 4,
- NET_F_ALIVE = (1 << NET_F_ALIVE_BIT),
+enum {
+ MSG_DATA,
+ MSG_CTL,
+ MSG_DROP,
+ MSG_BAD,
};
#define BUILD_NETMSG(ct, query) \
@@ -57,7 +55,18 @@ void build_netmsg(struct nf_conntrack *ct, int query, struct nethdr *net);
size_t prepare_send_netmsg(struct mcast_sock *m, void *data);
int mcast_send_netmsg(struct mcast_sock *m, void *data);
int handle_netmsg(struct nethdr *net);
+
+enum {
+ SEQ_UNKNOWN,
+ SEQ_UNSET,
+ SEQ_IN_SYNC,
+ SEQ_AFTER,
+ SEQ_BEFORE,
+};
+
int mcast_track_seq(uint32_t seq, uint32_t *exp_seq);
+void mcast_track_update_seq(uint32_t seq);
+int mcast_track_is_seq_set(void);
struct mcast_conf;
@@ -66,13 +75,12 @@ void mcast_buffered_destroy(void);
int mcast_buffered_send_netmsg(struct mcast_sock *m, void *data, size_t len);
ssize_t mcast_buffered_pending_netmsg(struct mcast_sock *m);
-#define IS_DATA(x) ((x->flags & ~NET_F_HELLO) == 0)
+#define IS_DATA(x) (x->flags == 0)
#define IS_ACK(x) (x->flags & NET_F_ACK)
#define IS_NACK(x) (x->flags & NET_F_NACK)
#define IS_RESYNC(x) (x->flags & NET_F_RESYNC)
#define IS_ALIVE(x) (x->flags & NET_F_ALIVE)
#define IS_CTL(x) IS_ACK(x) || IS_NACK(x) || IS_RESYNC(x) || IS_ALIVE(x)
-#define IS_HELLO(x) (x->flags & NET_F_HELLO)
#define HDR_NETWORK2HOST(x) \
({ \
diff --git a/src/network.c b/src/network.c
index 92999a1..d7ab415 100644
--- a/src/network.c
+++ b/src/network.c
@@ -33,13 +33,14 @@ static size_t __do_send(struct mcast_sock *m, void *data, size_t len)
#undef _TEST_DROP
#ifdef _TEST_DROP
- static int drop = 0;
- if (++drop >= 10) {
+#define DROP_RATE .25
+
+ /* simulate message omission with a certain probability */
+ if ((random() & 0x7FFFFFFF) < 0x80000000 * DROP_RATE) {
printf("drop sq: %u fl:%u len:%u\n",
ntohl(net->seq), ntohs(net->flags),
ntohs(net->len));
- drop = 0;
return 0;
}
#endif
@@ -57,7 +58,6 @@ static size_t __do_prepare(struct mcast_sock *m, void *data, size_t len)
if (!seq_set) {
seq_set = 1;
cur_seq = time(NULL);
- net->flags |= NET_F_HELLO;
}
net->len = len;
net->seq = cur_seq++;
@@ -181,9 +181,6 @@ int handle_netmsg(struct nethdr *net)
HDR_NETWORK2HOST(net);
- if (IS_HELLO(net))
- STATE_SYNC(last_seq_recv) = net->seq - 1;
-
if (IS_CTL(net))
return 0;
@@ -198,37 +195,51 @@ int handle_netmsg(struct nethdr *net)
return 0;
}
+static int local_seq_set = 0;
+
+/* this function only tracks, it does not update the last sequence received */
int mcast_track_seq(uint32_t seq, uint32_t *exp_seq)
{
- static int local_seq_set = 0;
- int ret = 1;
+ int ret = SEQ_UNKNOWN;
/* netlink sequence tracking initialization */
if (!local_seq_set) {
- local_seq_set = 1;
+ ret = SEQ_UNSET;
goto out;
}
/* fast path: we received the correct sequence */
- if (seq == STATE_SYNC(last_seq_recv)+1)
+ if (seq == STATE_SYNC(last_seq_recv)+1) {
+ ret = SEQ_IN_SYNC;
goto out;
+ }
/* out of sequence: some messages got lost */
if (after(seq, STATE_SYNC(last_seq_recv)+1)) {
STATE_SYNC(packets_lost) += seq-STATE_SYNC(last_seq_recv)+1;
- ret = 0;
+ ret = SEQ_AFTER;
goto out;
}
/* out of sequence: replayed/delayed packet? */
if (before(seq, STATE_SYNC(last_seq_recv)+1))
- dlog(LOG_WARNING, "delayed packet? exp=%u rcv=%u",
- STATE_SYNC(last_seq_recv)+1, seq);
+ ret = SEQ_BEFORE;
out:
*exp_seq = STATE_SYNC(last_seq_recv)+1;
- /* update expected sequence */
- STATE_SYNC(last_seq_recv) = seq;
return ret;
}
+
+void mcast_track_update_seq(uint32_t seq)
+{
+ if (!local_seq_set)
+ local_seq_set = 1;
+
+ STATE_SYNC(last_seq_recv) = seq;
+}
+
+int mcast_track_is_seq_set()
+{
+ return local_seq_set;
+}
diff --git a/src/queue.c b/src/queue.c
index 7b20e83..cdd70ae 100644
--- a/src/queue.c
+++ b/src/queue.c
@@ -93,7 +93,7 @@ retry:
goto err;
}
- list_add(&n->head, &b->head);
+ list_add_tail(&n->head, &b->head);
b->cur_size += size;
b->num_elems++;
diff --git a/src/sync-ftfw.c b/src/sync-ftfw.c
index cac25d0..0b98513 100644
--- a/src/sync-ftfw.c
+++ b/src/sync-ftfw.c
@@ -34,11 +34,26 @@
#define dp(...)
#endif
+#if 0
+#define dprint printf
+#else
+#define dprint(...)
+#endif
+
static LIST_HEAD(rs_list);
static LIST_HEAD(tx_list);
+static unsigned int rs_list_len;
static unsigned int tx_list_len;
static struct queue *rs_queue;
static struct queue *tx_queue;
+static uint32_t exp_seq;
+static uint32_t window;
+static uint32_t ack_from;
+static int ack_from_set = 0;
+static struct alarm_block alive_alarm;
+
+/* XXX: alive message expiration configurable */
+#define ALIVE_INT 1
struct cache_ftfw {
struct list_head rs_list;
@@ -64,6 +79,7 @@ static void cache_ftfw_del(struct us_conntrack *u, void *data)
/* no need for list_del_init since the entry is destroyed */
list_del(&cn->rs_list);
+ rs_list_len--;
}
static struct cache_extra cache_ftfw_extra = {
@@ -83,15 +99,57 @@ static void tx_queue_add_ctlmsg(uint32_t flags, uint32_t from, uint32_t to)
queue_add(tx_queue, &ack, NETHDR_ACK_SIZ);
}
-static struct alarm_block alive_alarm;
+static void ftfw_run(void);
+/* this function is called from the alarm framework */
static void do_alive_alarm(struct alarm_block *a, void *data)
{
- tx_queue_add_ctlmsg(NET_F_ALIVE, 0, 0);
+ if (ack_from_set && mcast_track_is_seq_set()) {
+ /* exp_seq contains the last update received */
+ dprint("send ALIVE ACK (from=%u, to=%u)\n",
+ ack_from, STATE_SYNC(last_seq_recv));
+ tx_queue_add_ctlmsg(NET_F_ACK,
+ ack_from,
+ STATE_SYNC(last_seq_recv));
+ ack_from_set = 0;
+ } else
+ tx_queue_add_ctlmsg(NET_F_ALIVE, 0, 0);
+
+ /* TODO: no need for buffered send, extracted from run_sync() */
+ ftfw_run();
+ mcast_buffered_pending_netmsg(STATE_SYNC(mcast_client));
+}
- add_alarm(&alive_alarm, 1, 0);
+#undef _SIGNAL_DEBUG
+#ifdef _SIGNAL_DEBUG
+
+static int rs_dump(void *data1, const void *data2)
+{
+ struct nethdr_ack *net = data1;
+
+ dprint("in RS queue -> seq:%u flags:%u\n", net->seq, net->flags);
+
+ return 0;
}
+#include <signal.h>
+
+static void my_dump(int foo)
+{
+ struct cache_ftfw *cn, *tmp;
+
+ list_for_each_entry_safe(cn, tmp, &rs_list, rs_list) {
+ struct us_conntrack *u;
+
+ u = cache_get_conntrack(STATE_SYNC(internal), cn);
+ dprint("in RS list -> seq:%u\n", cn->seq);
+ }
+
+ queue_iterate(rs_queue, NULL, rs_dump);
+}
+
+#endif
+
static int ftfw_init(void)
{
tx_queue = queue_create(CONFIG(resend_queue_size));
@@ -106,12 +164,15 @@ static int ftfw_init(void)
return -1;
}
- INIT_LIST_HEAD(&tx_list);
- INIT_LIST_HEAD(&rs_list);
-
- /* XXX: alive message expiration configurable */
init_alarm(&alive_alarm, NULL, do_alive_alarm);
- add_alarm(&alive_alarm, 1, 0);
+ add_alarm(&alive_alarm, ALIVE_INT, 0);
+
+ /* set ack window size */
+ window = CONFIG(window_size);
+
+#ifdef _SIGNAL_DEBUG
+ signal(SIGUSR1, my_dump);
+#endif
return 0;
}
@@ -128,7 +189,7 @@ static int do_cache_to_tx(void *data1, void *data2)
struct cache_ftfw *cn = cache_get_extra(STATE_SYNC(internal), u);
/* add to tx list */
- list_add(&cn->tx_list, &tx_list);
+ list_add_tail(&cn->tx_list, &tx_list);
tx_list_len++;
return 0;
@@ -157,13 +218,14 @@ static int ftfw_local(int fd, int type, void *data)
static int rs_queue_to_tx(void *data1, const void *data2)
{
- struct nethdr *net = data1;
+ struct nethdr_ack *net = data1;
const struct nethdr_ack *nack = data2;
if (between(net->seq, nack->from, nack->to)) {
dp("rs_queue_to_tx sq: %u fl:%u len:%u\n",
net->seq, net->flags, net->len);
queue_add(tx_queue, net, net->len);
+ queue_del(rs_queue, net);
}
return 0;
}
@@ -182,18 +244,20 @@ static int rs_queue_empty(void *data1, const void *data2)
static void rs_list_to_tx(struct cache *c, unsigned int from, unsigned int to)
{
- struct cache_ftfw *cn;
+ struct cache_ftfw *cn, *tmp;
- list_for_each_entry(cn, &rs_list, rs_list) {
+ list_for_each_entry_safe(cn, tmp, &rs_list, rs_list) {
struct us_conntrack *u;
u = cache_get_conntrack(STATE_SYNC(internal), cn);
if (between(cn->seq, from, to)) {
dp("resending nack'ed (oldseq=%u)\n", cn->seq);
- list_add(&cn->tx_list, &tx_list);
+ list_del_init(&cn->rs_list);
+ rs_list_len--;
+ list_add_tail(&cn->tx_list, &tx_list);
tx_list_len++;
- }
- }
+ }
+ }
}
static void rs_list_empty(struct cache *c, unsigned int from, unsigned int to)
@@ -207,54 +271,113 @@ static void rs_list_empty(struct cache *c, unsigned int from, unsigned int to)
if (between(cn->seq, from, to)) {
dp("queue: deleting from queue (seq=%u)\n", cn->seq);
list_del_init(&cn->rs_list);
- }
+ rs_list_len--;
+ }
}
}
-static int ftfw_recv(const struct nethdr *net)
+static int digest_msg(const struct nethdr *net)
{
- static unsigned int window = 0;
- unsigned int exp_seq;
+ if (IS_DATA(net))
+ return MSG_DATA;
- if (window == 0)
- window = CONFIG(window_size);
+ else if (IS_ACK(net)) {
+ const struct nethdr_ack *h = (const struct nethdr_ack *) net;
- if (!mcast_track_seq(net->seq, &exp_seq)) {
- dp("OOS: sending nack (seq=%u)\n", exp_seq);
- tx_queue_add_ctlmsg(NET_F_NACK, exp_seq, net->seq-1);
- window = CONFIG(window_size);
- } else {
- /* received a window, send an acknowledgement */
- if (--window == 0) {
- dp("sending ack (seq=%u)\n", net->seq);
- tx_queue_add_ctlmsg(NET_F_ACK,
- net->seq - CONFIG(window_size),
- net->seq);
- }
- }
+ dprint("ACK(%u): from seq=%u to seq=%u\n",
+ h->seq, h->from, h->to);
+ rs_list_empty(STATE_SYNC(internal), h->from, h->to);
+ queue_iterate(rs_queue, h, rs_queue_empty);
+ return MSG_CTL;
- if (IS_NACK(net)) {
+ } else if (IS_NACK(net)) {
const struct nethdr_ack *nack = (const struct nethdr_ack *) net;
- dp("NACK: from seq=%u to seq=%u\n", nack->from, nack->to);
+ dprint("NACK(%u): from seq=%u to seq=%u\n",
+ nack->seq, nack->from, nack->to);
rs_list_to_tx(STATE_SYNC(internal), nack->from, nack->to);
queue_iterate(rs_queue, nack, rs_queue_to_tx);
- return 1;
+ return MSG_CTL;
+
} else if (IS_RESYNC(net)) {
dp("RESYNC ALL\n");
cache_iterate(STATE_SYNC(internal), NULL, do_cache_to_tx);
- return 1;
- } else if (IS_ACK(net)) {
- const struct nethdr_ack *h = (const struct nethdr_ack *) net;
+ return MSG_CTL;
- dp("ACK: from seq=%u to seq=%u\n", h->from, h->to);
- rs_list_empty(STATE_SYNC(internal), h->from, h->to);
- queue_iterate(rs_queue, h, rs_queue_empty);
- return 1;
} else if (IS_ALIVE(net))
- return 1;
+ return MSG_CTL;
- return 0;
+ return MSG_BAD;
+}
+
+static int ftfw_recv(const struct nethdr *net)
+{
+ int ret = MSG_DATA;
+
+ switch (mcast_track_seq(net->seq, &exp_seq)) {
+ case SEQ_AFTER:
+ ret = digest_msg(net);
+ if (ret == MSG_BAD) {
+ ret = MSG_BAD;
+ goto out;
+ }
+
+ if (ack_from_set) {
+ tx_queue_add_ctlmsg(NET_F_ACK, ack_from, exp_seq-1);
+ dprint("OFS send half ACK: from seq=%u to seq=%u\n",
+ ack_from, exp_seq-1);
+ ack_from_set = 0;
+ }
+
+ tx_queue_add_ctlmsg(NET_F_NACK, exp_seq, net->seq-1);
+ dprint("OFS send NACK: from seq=%u to seq=%u\n",
+ exp_seq, net->seq-1);
+
+ /* count this message as part of the new window */
+ window = CONFIG(window_size) - 1;
+ ack_from = net->seq;
+ ack_from_set = 1;
+ break;
+
+ case SEQ_BEFORE:
+ /* we don't accept delayed packets */
+ dlog(LOG_WARNING, "Received seq=%u before expected seq=%u",
+ net->seq, exp_seq);
+ dlog(LOG_WARNING, "Probably the other node has come back"
+ "to life but you forgot to add "
+ "conntrackd -r to your scripts");
+ ret = MSG_DROP;
+ break;
+
+ case SEQ_UNSET:
+ case SEQ_IN_SYNC:
+ ret = digest_msg(net);
+ if (ret == MSG_BAD) {
+ ret = MSG_BAD;
+ goto out;
+ }
+
+ if (!ack_from_set) {
+ ack_from_set = 1;
+ ack_from = net->seq;
+ }
+
+ if (--window <= 0) {
+ /* received a window, send an acknowledgement */
+ dprint("OFS send ACK: from seq=%u to seq=%u\n",
+ ack_from, net->seq);
+
+ tx_queue_add_ctlmsg(NET_F_ACK, ack_from, net->seq);
+ window = CONFIG(window_size);
+ ack_from_set = 0;
+ }
+ }
+
+out:
+ if ((ret == MSG_DATA || ret == MSG_CTL))
+ mcast_track_update_seq(net->seq);
+
+ return ret;
}
static void ftfw_send(struct nethdr *net, struct us_conntrack *u)
@@ -270,11 +393,14 @@ static void ftfw_send(struct nethdr *net, struct us_conntrack *u)
cn = (struct cache_ftfw *)
cache_get_extra(STATE_SYNC(internal), u);
- if (!list_empty(&cn->rs_list))
- list_del(&cn->rs_list);
+ if (!list_empty(&cn->rs_list)) {
+ list_del_init(&cn->rs_list);
+ rs_list_len--;
+ }
cn->seq = net->seq;
- list_add(&cn->rs_list, &rs_list);
+ list_add_tail(&cn->rs_list, &rs_list);
+ rs_list_len++;
break;
case NFCT_Q_DESTROY:
queue_add(rs_queue, net, net->len);
@@ -294,7 +420,7 @@ static int tx_queue_xmit(void *data1, const void *data2)
HDR_NETWORK2HOST(net);
if (IS_DATA(net) || IS_ACK(net) || IS_NACK(net)) {
- dp("-> back_to_tx_queue sq: %u fl:%u len:%u\n",
+ dprint("tx_queue -> to_rs_queue sq: %u fl:%u len:%u\n",
net->seq, net->flags, net->len);
queue_add(rs_queue, net, net->len);
}
@@ -317,8 +443,7 @@ static int tx_list_xmit(struct list_head *i, struct us_conntrack *u)
tx_list_len--;
ret = mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net, len);
- if (STATE_SYNC(sync)->send)
- STATE_SYNC(sync)->send(net, u);
+ ftfw_send(net, u);
return ret;
}
@@ -337,6 +462,14 @@ static void ftfw_run(void)
u = cache_get_conntrack(STATE_SYNC(internal), cn);
tx_list_xmit(&cn->tx_list, u);
}
+
+ /* reset alive alarm */
+ add_alarm(&alive_alarm, 1, 0);
+
+ dprint("tx_list_len:%u tx_queue_len:%u "
+ "rs_list_len: %u rs_queue_len:%u\n",
+ tx_list_len, queue_len(tx_queue),
+ rs_list_len, queue_len(rs_queue));
}
struct sync_mode sync_ftfw = {
diff --git a/src/sync-mode.c b/src/sync-mode.c
index 3851e4a..cbb4769 100644
--- a/src/sync-mode.c
+++ b/src/sync-mode.c
@@ -42,8 +42,18 @@ static void do_mcast_handler_step(struct nethdr *net)
struct nf_conntrack *ct = (struct nf_conntrack *)(void*) __ct;
struct us_conntrack *u;
- if (STATE_SYNC(sync)->recv(net))
- return;
+ switch (STATE_SYNC(sync)->recv(net)) {
+ case MSG_DATA:
+ break;
+ case MSG_DROP:
+ case MSG_CTL:
+ return;
+ case MSG_BAD:
+ STATE(malformed)++;
+ return;
+ default:
+ break;
+ }
memset(ct, 0, sizeof(__ct));
@@ -211,14 +221,15 @@ static int register_fds_sync(struct fds *fds)
static void run_sync(fd_set *readfds)
{
/* multicast packet has been received */
- if (FD_ISSET(STATE_SYNC(mcast_server->fd), readfds))
+ if (FD_ISSET(STATE_SYNC(mcast_server->fd), readfds)) {
mcast_handler();
- if (STATE_SYNC(sync)->run)
- STATE_SYNC(sync)->run();
+ if (STATE_SYNC(sync)->run)
+ STATE_SYNC(sync)->run();
- /* flush pending messages */
- mcast_buffered_pending_netmsg(STATE_SYNC(mcast_client));
+ /* flush pending messages */
+ mcast_buffered_pending_netmsg(STATE_SYNC(mcast_client));
+ }
}
static void kill_sync(void)
@@ -358,16 +369,8 @@ static int purge_step(void *data1, void *data2)
ret = nfct_query(h, NFCT_Q_GET, u->ct);
if (ret == -1 && errno == ENOENT) {
- size_t len;
- struct nethdr *net = BUILD_NETMSG(u->ct, NFCT_Q_DESTROY);
-
debug_ct(u->ct, "overrun purge resync");
-
- len = prepare_send_netmsg(STATE_SYNC(mcast_client), net);
- mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net, len);
- if (STATE_SYNC(sync)->send)
- STATE_SYNC(sync)->send(net, u);
-
+ mcast_send_sync(u, u->ct, NFCT_Q_DESTROY);
cache_del(STATE_SYNC(internal), u->ct);
}
@@ -402,16 +405,8 @@ static int overrun_sync(enum nf_conntrack_msg_type type,
if (!cache_test(STATE_SYNC(internal), ct)) {
if ((u = cache_update_force(STATE_SYNC(internal), ct))) {
- size_t len;
-
debug_ct(u->ct, "overrun resync");
-
- struct nethdr *net = BUILD_NETMSG(u->ct, NFCT_Q_UPDATE);
- len = prepare_send_netmsg(STATE_SYNC(mcast_client),net);
- mcast_buffered_send_netmsg(STATE_SYNC(mcast_client),
- net, len);
- if (STATE_SYNC(sync)->send)
- STATE_SYNC(sync)->send(net, u);
+ mcast_send_sync(u, u->ct, NFCT_Q_UPDATE);
}
}
@@ -437,7 +432,6 @@ retry:
} else {
if (errno == EEXIST) {
cache_del(STATE_SYNC(internal), ct);
- mcast_send_sync(NULL, ct, NFCT_Q_DESTROY);
goto retry;
}