/* * (C) 2006-2008 by Pablo Neira Ayuso * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ #include "conntrackd.h" #include "sync.h" #include "us-conntrack.h" #include "queue.h" #include "debug.h" #include "network.h" #include "alarm.h" #include "log.h" #include "cache.h" #include "event.h" #include #if 0 #define dp printf #else #define dp(...) #endif static LIST_HEAD(rs_list); static LIST_HEAD(tx_list); static unsigned int rs_list_len; static unsigned int tx_list_len; static struct queue *rs_queue; static struct queue *tx_queue; static uint32_t exp_seq; static uint32_t window; static uint32_t ack_from; static int ack_from_set = 0; static struct alarm_block alive_alarm; enum { HELLO_INIT, HELLO_SAY, HELLO_DONE, }; static int hello_state = HELLO_INIT; static int say_hello_back; /* XXX: alive message expiration configurable */ #define ALIVE_INT 1 struct cache_ftfw { struct list_head rs_list; struct list_head tx_list; uint32_t seq; }; static void cache_ftfw_add(struct us_conntrack *u, void *data) { struct cache_ftfw *cn = data; /* These nodes are not inserted in the list */ INIT_LIST_HEAD(&cn->rs_list); INIT_LIST_HEAD(&cn->tx_list); } static void cache_ftfw_del(struct us_conntrack *u, void *data) { struct cache_ftfw *cn = data; /* this node is already out of the list */ if (!list_empty(&cn->rs_list)) { /* no need for list_del_init since the entry is destroyed */ list_del(&cn->rs_list); rs_list_len--; } if (!list_empty(&cn->tx_list)) { list_del(&cn->tx_list); tx_list_len--; } } static struct cache_extra cache_ftfw_extra = { .size = sizeof(struct cache_ftfw), .add = cache_ftfw_add, .destroy = cache_ftfw_del }; static void tx_queue_add_ctlmsg(uint32_t flags, uint32_t from, uint32_t to) { struct nethdr_ack ack = { .flags = flags, .from = from, .to = to, }; switch(hello_state) { case HELLO_INIT: hello_state = HELLO_SAY; /* fall through */ case HELLO_SAY: ack.flags |= NET_F_HELLO; break; } if (say_hello_back) { ack.flags |= NET_F_HELLO_BACK; say_hello_back = 0; } queue_add(tx_queue, &ack, NETHDR_ACK_SIZ); write_evfd(STATE_SYNC(evfd)); } /* this function is called from the alarm framework */ static void do_alive_alarm(struct alarm_block *a, void *data) { if (ack_from_set && mcast_track_is_seq_set()) { /* exp_seq contains the last update received */ tx_queue_add_ctlmsg(NET_F_ACK, ack_from, STATE_SYNC(last_seq_recv)); ack_from_set = 0; } else tx_queue_add_ctlmsg(NET_F_ALIVE, 0, 0); } static int ftfw_init(void) { tx_queue = queue_create(CONFIG(resend_queue_size)); if (tx_queue == NULL) { dlog(LOG_ERR, "cannot create tx queue"); return -1; } rs_queue = queue_create(CONFIG(resend_queue_size)); if (rs_queue == NULL) { dlog(LOG_ERR, "cannot create rs queue"); return -1; } init_alarm(&alive_alarm, NULL, do_alive_alarm); add_alarm(&alive_alarm, ALIVE_INT, 0); /* set ack window size */ window = CONFIG(window_size); return 0; } static void ftfw_kill(void) { queue_destroy(rs_queue); queue_destroy(tx_queue); } static int do_cache_to_tx(void *data1, void *data2) { struct us_conntrack *u = data2; struct cache_ftfw *cn = cache_get_extra(STATE_SYNC(internal), u); /* repeated request for resync? */ if (!list_empty(&cn->tx_list)) return 0; /* add to tx list */ list_add_tail(&cn->tx_list, &tx_list); tx_list_len++; write_evfd(STATE_SYNC(evfd)); return 0; } static int debug_rs_queue_dump_step(void *data1, const void *data2) { struct nethdr_ack *net = data1; const int *fd = data2; char buf[512]; int size; size = sprintf(buf, "seq:%u flags:%u\n", net->seq, net->flags); send(*fd, buf, size, 0); return 0; } static void debug_rs_dump(int fd) { struct cache_ftfw *cn, *tmp; char buf[512]; int size; size = sprintf(buf, "resent list (len=%u):\n", rs_list_len); send(fd, buf, size, 0); list_for_each_entry_safe(cn, tmp, &rs_list, rs_list) { struct us_conntrack *u; u = cache_get_conntrack(STATE_SYNC(internal), cn); size = sprintf(buf, "seq:%u\n", cn->seq); send(fd, buf, size, 0); } size = sprintf(buf, "\nresent queue (len=%u):\n", queue_len(rs_queue)); send(fd, buf, size, 0); queue_iterate(rs_queue, &fd, debug_rs_queue_dump_step); } static int ftfw_local(int fd, int type, void *data) { int ret = 1; switch(type) { case REQUEST_DUMP: dlog(LOG_NOTICE, "request resync"); tx_queue_add_ctlmsg(NET_F_RESYNC, 0, 0); break; case SEND_BULK: dlog(LOG_NOTICE, "sending bulk update"); cache_iterate(STATE_SYNC(internal), NULL, do_cache_to_tx); break; case DEBUG_INFO: debug_rs_dump(fd); break; default: ret = 0; break; } return ret; } static int rs_queue_to_tx(void *data1, const void *data2) { struct nethdr_ack *net = data1; const struct nethdr_ack *nack = data2; if (before(net->seq, nack->from)) return 0; /* continue */ else if (after(net->seq, nack->to)) return 1; /* break */ dp("rs_queue_to_tx sq: %u fl:%u len:%u\n", net->seq, net->flags, net->len); queue_add(tx_queue, net, net->len); write_evfd(STATE_SYNC(evfd)); queue_del(rs_queue, net); return 0; } static int rs_queue_empty(void *data1, const void *data2) { struct nethdr *net = data1; const struct nethdr_ack *h = data2; if (h == NULL) { dp("inconditional remove from queue (seq=%u)\n", net->seq); queue_del(rs_queue, data1); return 0; } if (before(net->seq, h->from)) return 0; /* continue */ else if (after(net->seq, h->to)) return 1; /* break */ dp("remove from queue (seq=%u)\n", net->seq); queue_del(rs_queue, data1); return 0; } static void rs_list_to_tx(struct cache *c, unsigned int from, unsigned int to) { struct cache_ftfw *cn, *tmp; list_for_each_entry_safe(cn, tmp, &rs_list, rs_list) { struct us_conntrack *u; u = cache_get_conntrack(STATE_SYNC(internal), cn); if (before(cn->seq, from)) continue; else if (after(cn->seq, to)) break; dp("resending nack'ed (oldseq=%u)\n", cn->seq); list_del_init(&cn->rs_list); rs_list_len--; /* we received a request for resync before this nack? */ if (list_empty(&cn->tx_list)) { list_add_tail(&cn->tx_list, &tx_list); tx_list_len++; } write_evfd(STATE_SYNC(evfd)); } } static void rs_list_empty(struct cache *c, unsigned int from, unsigned int to) { struct cache_ftfw *cn, *tmp; list_for_each_entry_safe(cn, tmp, &rs_list, rs_list) { struct us_conntrack *u; u = cache_get_conntrack(STATE_SYNC(internal), cn); if (before(cn->seq, from)) continue; else if (after(cn->seq, to)) break; dp("queue: deleting from queue (seq=%u)\n", cn->seq); list_del_init(&cn->rs_list); rs_list_len--; } } static int digest_msg(const struct nethdr *net) { if (IS_DATA(net)) return MSG_DATA; else if (IS_ACK(net)) { const struct nethdr_ack *h = (const struct nethdr_ack *) net; if (before(h->to, h->from)) return MSG_BAD; rs_list_empty(STATE_SYNC(internal), h->from, h->to); queue_iterate(rs_queue, h, rs_queue_empty); return MSG_CTL; } else if (IS_NACK(net)) { const struct nethdr_ack *nack = (const struct nethdr_ack *) net; if (before(nack->to, nack->from)) return MSG_BAD; rs_list_to_tx(STATE_SYNC(internal), nack->from, nack->to); queue_iterate(rs_queue, nack, rs_queue_to_tx); return MSG_CTL; } else if (IS_RESYNC(net)) { dp("RESYNC ALL\n"); cache_iterate(STATE_SYNC(internal), NULL, do_cache_to_tx); return MSG_CTL; } else if (IS_ALIVE(net)) return MSG_CTL; return MSG_BAD; } static int digest_hello(const struct nethdr *net) { int ret = 0; if (IS_HELLO(net)) { say_hello_back = 1; ret = 1; } if (IS_HELLO_BACK(net)) { /* this is a hello back for a requested hello */ if (hello_state == HELLO_SAY) hello_state = HELLO_DONE; } return ret; } static int ftfw_recv(const struct nethdr *net) { int ret = MSG_DATA; if (digest_hello(net)) { /* we have received a hello while we had data to acknowledge. * reset the window, the other doesn't know anthing about it. */ if (ack_from_set && before(net->seq, ack_from)) { window = CONFIG(window_size) - 1; ack_from = net->seq; } /* XXX: flush the resend queues since the other does not * know anything about that data, we are unreliable until * the helloing finishes */ queue_iterate(rs_queue, NULL, rs_queue_empty); rs_list_empty(STATE_SYNC(internal), 0, ~0U); goto bypass; } switch (mcast_track_seq(net->seq, &exp_seq)) { case SEQ_AFTER: ret = digest_msg(net); if (ret == MSG_BAD) { ret = MSG_BAD; goto out; } if (ack_from_set) { tx_queue_add_ctlmsg(NET_F_ACK, ack_from, exp_seq-1); ack_from_set = 0; } tx_queue_add_ctlmsg(NET_F_NACK, exp_seq, net->seq-1); /* count this message as part of the new window */ window = CONFIG(window_size) - 1; ack_from = net->seq; ack_from_set = 1; break; case SEQ_BEFORE: /* we don't accept delayed packets */ dlog(LOG_WARNING, "Received seq=%u before expected seq=%u", net->seq, exp_seq); ret = MSG_DROP; break; case SEQ_UNSET: case SEQ_IN_SYNC: bypass: ret = digest_msg(net); if (ret == MSG_BAD) { ret = MSG_BAD; goto out; } if (!ack_from_set) { ack_from_set = 1; ack_from = net->seq; } if (--window <= 0) { /* received a window, send an acknowledgement */ tx_queue_add_ctlmsg(NET_F_ACK, ack_from, net->seq); window = CONFIG(window_size); ack_from_set = 0; } } out: if ((ret == MSG_DATA || ret == MSG_CTL)) mcast_track_update_seq(net->seq); return ret; } static void ftfw_send(struct nethdr *net, struct us_conntrack *u) { struct cache_ftfw *cn; switch(net->type) { case NET_T_STATE_NEW: case NET_T_STATE_UPD: case NET_T_STATE_DEL: cn = (struct cache_ftfw *) cache_get_extra(STATE_SYNC(internal), u); if (!list_empty(&cn->rs_list)) { list_del_init(&cn->rs_list); rs_list_len--; } switch(hello_state) { case HELLO_INIT: hello_state = HELLO_SAY; /* fall through */ case HELLO_SAY: net->flags |= NET_F_HELLO; break; } if (say_hello_back) { net->flags |= NET_F_HELLO_BACK; say_hello_back = 0; } cn->seq = ntohl(net->seq); list_add_tail(&cn->rs_list, &rs_list); rs_list_len++; break; } } static int tx_queue_xmit(void *data1, const void *data2) { struct nethdr *net = data1; nethdr_set_ack(net); HDR_HOST2NETWORK(net); dp("tx_queue sq: %u fl:%u len:%u\n", ntohl(net->seq), net->flags, ntohs(net->len)); mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); HDR_NETWORK2HOST(net); if (IS_ACK(net) || IS_NACK(net)) queue_add(rs_queue, net, net->len); queue_del(tx_queue, net); return 0; } static int tx_list_xmit(struct list_head *i, struct us_conntrack *u, int type) { int ret; struct nethdr *net = BUILD_NETMSG(u->ct, type); dp("tx_list sq: %u fl:%u len:%u\n", ntohl(net->seq), net->flags, ntohs(net->len)); list_del_init(i); tx_list_len--; ret = mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); ftfw_send(net, u); return ret; } static void ftfw_run(void) { struct cache_ftfw *cn, *tmp; /* send messages in the tx_queue */ queue_iterate(tx_queue, NULL, tx_queue_xmit); /* send conntracks in the tx_list */ list_for_each_entry_safe(cn, tmp, &tx_list, tx_list) { struct us_conntrack *u; u = cache_get_conntrack(STATE_SYNC(internal), cn); if (alarm_pending(&u->alarm)) tx_list_xmit(&cn->tx_list, u, NET_T_STATE_DEL); else tx_list_xmit(&cn->tx_list, u, NET_T_STATE_UPD); } /* reset alive alarm */ add_alarm(&alive_alarm, 1, 0); dp("tx_list_len:%u tx_queue_len:%u " "rs_list_len: %u rs_queue_len:%u\n", tx_list_len, queue_len(tx_queue), rs_list_len, queue_len(rs_queue)); } struct sync_mode sync_ftfw = { .internal_cache_flags = LIFETIME, .external_cache_flags = LIFETIME, .internal_cache_extra = &cache_ftfw_extra, .init = ftfw_init, .kill = ftfw_kill, .local = ftfw_local, .recv = ftfw_recv, .send = ftfw_send, .run = ftfw_run, };