summaryrefslogtreecommitdiff
path: root/src/sync-ftfw.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync-ftfw.c')
-rw-r--r--src/sync-ftfw.c367
1 files changed, 176 insertions, 191 deletions
diff --git a/src/sync-ftfw.c b/src/sync-ftfw.c
index bddc18c..bb53849 100644
--- a/src/sync-ftfw.c
+++ b/src/sync-ftfw.c
@@ -24,7 +24,7 @@
#include "alarm.h"
#include "log.h"
#include "cache.h"
-#include "event.h"
+#include "fds.h"
#include <string.h>
@@ -34,12 +34,8 @@
#define dp(...)
#endif
-static LIST_HEAD(rs_list);
-static LIST_HEAD(tx_list);
-static unsigned int rs_list_len;
-static unsigned int tx_list_len;
-static struct queue *rs_queue;
-static struct queue *tx_queue;
+struct queue *tx_queue;
+struct queue *rs_queue;
static uint32_t exp_seq;
static uint32_t window;
static uint32_t ack_from;
@@ -58,8 +54,7 @@ static int say_hello_back;
#define ALIVE_INT 1
struct cache_ftfw {
- struct list_head rs_list;
- struct list_head tx_list;
+ struct queue_node qnode;
uint32_t seq;
};
@@ -67,24 +62,13 @@ static void cache_ftfw_add(struct cache_object *obj, void *data)
{
struct cache_ftfw *cn = data;
/* These nodes are not inserted in the list */
- INIT_LIST_HEAD(&cn->rs_list);
- INIT_LIST_HEAD(&cn->tx_list);
+ queue_node_init(&cn->qnode, Q_ELEM_OBJ);
}
static void cache_ftfw_del(struct cache_object *obj, void *data)
{
struct cache_ftfw *cn = data;
-
- /* this node is already out of the list */
- if (!list_empty(&cn->rs_list)) {
- /* no need for list_del_init since the entry is destroyed */
- list_del(&cn->rs_list);
- rs_list_len--;
- }
- if (!list_empty(&cn->tx_list)) {
- list_del(&cn->tx_list);
- tx_list_len--;
- }
+ queue_del(&cn->qnode);
}
static struct cache_extra cache_ftfw_extra = {
@@ -95,54 +79,64 @@ static struct cache_extra cache_ftfw_extra = {
static void tx_queue_add_ctlmsg(uint32_t flags, uint32_t from, uint32_t to)
{
- struct nethdr_ack ack = {
- .type = NET_T_CTL,
- .flags = flags,
- .from = from,
- .to = to,
- };
+ struct queue_object *qobj;
+ struct nethdr_ack *ack;
+
+ qobj = queue_object_new(Q_ELEM_CTL, sizeof(struct nethdr_ack));
+ if (qobj == NULL)
+ return;
+
+ ack = (struct nethdr_ack *)qobj->data;
+ ack->type = NET_T_CTL;
+ ack->flags = flags;
+ ack->from = from;
+ ack->to = to;
switch(hello_state) {
case HELLO_INIT:
hello_state = HELLO_SAY;
/* fall through */
case HELLO_SAY:
- ack.flags |= NET_F_HELLO;
+ ack->flags |= NET_F_HELLO;
break;
}
if (say_hello_back) {
- ack.flags |= NET_F_HELLO_BACK;
+ ack->flags |= NET_F_HELLO_BACK;
say_hello_back = 0;
}
- queue_add(tx_queue, &ack, NETHDR_ACK_SIZ);
- write_evfd(STATE_SYNC(evfd));
+ queue_add(tx_queue, &qobj->qnode);
}
static void tx_queue_add_ctlmsg2(uint32_t flags)
{
- struct nethdr ctl = {
- .type = NET_T_CTL,
- .flags = flags,
- };
+ struct queue_object *qobj;
+ struct nethdr *ctl;
+
+ qobj = queue_object_new(Q_ELEM_CTL, sizeof(struct nethdr_ack));
+ if (qobj == NULL)
+ return;
+
+ ctl = (struct nethdr *)qobj->data;
+ ctl->type = NET_T_CTL;
+ ctl->flags = flags;
switch(hello_state) {
case HELLO_INIT:
hello_state = HELLO_SAY;
/* fall through */
case HELLO_SAY:
- ctl.flags |= NET_F_HELLO;
+ ctl->flags |= NET_F_HELLO;
break;
}
if (say_hello_back) {
- ctl.flags |= NET_F_HELLO_BACK;
+ ctl->flags |= NET_F_HELLO_BACK;
say_hello_back = 0;
}
- queue_add(tx_queue, &ctl, NETHDR_SIZ);
- write_evfd(STATE_SYNC(evfd));
+ queue_add(tx_queue, &qobj->qnode);
}
/* this function is called from the alarm framework */
@@ -156,17 +150,18 @@ static void do_alive_alarm(struct alarm_block *a, void *data)
ack_from_set = 0;
} else
tx_queue_add_ctlmsg2(NET_F_ALIVE);
+
+ add_alarm(&alive_alarm, ALIVE_INT, 0);
}
static int ftfw_init(void)
{
- tx_queue = queue_create(CONFIG(resend_queue_size));
+ tx_queue = queue_create(INT_MAX, QUEUE_F_EVFD);
if (tx_queue == NULL) {
dlog(LOG_ERR, "cannot create tx queue");
return -1;
}
-
- rs_queue = queue_create(CONFIG(resend_queue_size));
+ rs_queue = queue_create(INT_MAX, 0);
if (rs_queue == NULL) {
dlog(LOG_ERR, "cannot create rs queue");
return -1;
@@ -192,45 +187,47 @@ static int do_cache_to_tx(void *data1, void *data2)
struct cache_object *obj = data2;
struct cache_ftfw *cn = cache_get_extra(STATE_SYNC(internal), obj);
- /* repeated request for resync? */
- if (!list_empty(&cn->tx_list))
- return 0;
+ if (queue_in(rs_queue, &cn->qnode))
+ queue_del(&cn->qnode);
- /* add to tx list */
- list_add_tail(&cn->tx_list, &tx_list);
- tx_list_len++;
- write_evfd(STATE_SYNC(evfd));
+ queue_add(tx_queue, &cn->qnode);
return 0;
}
-static int debug_rs_queue_dump_step(void *data1, const void *data2)
+static int rs_queue_dump(struct queue_node *n, const void *data2)
{
- struct nethdr_ack *net = data1;
const int *fd = data2;
char buf[512];
int size;
- size = sprintf(buf, "seq:%u flags:%u\n", net->seq, net->flags);
+ switch(n->type) {
+ case Q_ELEM_CTL: {
+ struct nethdr *net = queue_node_data(n);
+ size = sprintf(buf, "control -> seq:%u flags:%u\n",
+ net->seq, net->flags);
+ break;
+ }
+ case Q_ELEM_OBJ: {
+ struct cache_ftfw *cn = (struct cache_ftfw *) n;
+ size = sprintf(buf, "object -> seq:%u\n", cn->seq);
+ break;
+ }
+ default:
+ return 0;
+ }
send(*fd, buf, size, 0);
return 0;
}
static void debug_rs_dump(int fd)
{
- struct cache_ftfw *cn, *tmp;
char buf[512];
int size;
- size = sprintf(buf, "resent list (len=%u):\n", rs_list_len);
- send(fd, buf, size, 0);
- list_for_each_entry_safe(cn, tmp, &rs_list, rs_list) {
- size = sprintf(buf, "seq:%u\n", cn->seq);
- send(fd, buf, size, 0);
- }
- size = sprintf(buf, "\nresent queue (len=%u):\n", queue_len(rs_queue));
+ size = sprintf(buf, "resent queue (len=%u):\n", queue_len(rs_queue));
send(fd, buf, size, 0);
- queue_iterate(rs_queue, &fd, debug_rs_queue_dump_step);
+ queue_iterate(rs_queue, &fd, rs_queue_dump);
}
static int ftfw_local(int fd, int type, void *data)
@@ -257,87 +254,84 @@ static int ftfw_local(int fd, int type, void *data)
return ret;
}
-static int rs_queue_to_tx(void *data1, const void *data2)
+static int rs_queue_to_tx(struct queue_node *n, const void *data)
{
- struct nethdr_ack *net = data1;
- const struct nethdr_ack *nack = data2;
-
- if (before(net->seq, nack->from))
- return 0; /* continue */
- else if (after(net->seq, nack->to))
- return 1; /* break */
-
- dp("rs_queue_to_tx sq: %u fl:%u len:%u\n",
- net->seq, net->flags, net->len);
- queue_add(tx_queue, net, net->len);
- write_evfd(STATE_SYNC(evfd));
- queue_del(rs_queue, net);
- return 0;
-}
+ const struct nethdr_ack *nack = data;
-static int rs_queue_empty(void *data1, const void *data2)
-{
- struct nethdr *net = data1;
- const struct nethdr_ack *h = data2;
+ switch(n->type) {
+ case Q_ELEM_CTL: {
+ struct nethdr_ack *net = queue_node_data(n);
- if (h == NULL) {
- dp("inconditional remove from queue (seq=%u)\n", net->seq);
- queue_del(rs_queue, data1);
- return 0;
+ if (before(net->seq, nack->from))
+ return 0; /* continue */
+ else if (after(net->seq, nack->to))
+ return 1; /* break */
+
+ dp("rs_queue_to_tx sq: %u fl:%u len:%u\n",
+ net->seq, net->flags, net->len);
+
+ queue_del(n);
+ queue_add(tx_queue, n);
+ break;
}
+ case Q_ELEM_OBJ: {
+ struct cache_ftfw *cn;
- if (before(net->seq, h->from))
- return 0; /* continue */
- else if (after(net->seq, h->to))
- return 1; /* break */
+ cn = (struct cache_ftfw *) n;
+ if (before(cn->seq, nack->from))
+ return 0;
+ else if (after(cn->seq, nack->to))
+ return 1;
- dp("remove from queue (seq=%u)\n", net->seq);
- queue_del(rs_queue, data1);
+ dp("resending nack'ed (oldseq=%u)\n", cn->seq);
+
+ queue_del(n);
+ queue_add(tx_queue, n);
+ break;
+ }
+ }
return 0;
}
-static void rs_list_to_tx(struct cache *c, unsigned int from, unsigned int to)
+static int rs_queue_empty(struct queue_node *n, const void *data)
{
- struct cache_ftfw *cn, *tmp;
+ const struct nethdr_ack *h = data;
- list_for_each_entry_safe(cn, tmp, &rs_list, rs_list) {
- struct cache_object *obj;;
-
- obj = cache_data_get_object(STATE_SYNC(internal), cn);
- if (before(cn->seq, from))
- continue;
- else if (after(cn->seq, to))
- break;
+ if (h == NULL) {
+ dp("inconditional remove from queue (seq=%u)\n", net->seq);
+ queue_del(n);
+ return 0;
+ }
- dp("resending nack'ed (oldseq=%u)\n", cn->seq);
- list_del_init(&cn->rs_list);
- rs_list_len--;
- /* we received a request for resync before this nack? */
- if (list_empty(&cn->tx_list)) {
- list_add_tail(&cn->tx_list, &tx_list);
- tx_list_len++;
- }
- write_evfd(STATE_SYNC(evfd));
- }
-}
+ switch(n->type) {
+ case Q_ELEM_CTL: {
+ struct nethdr_ack *net = queue_node_data(n);
-static void rs_list_empty(struct cache *c, unsigned int from, unsigned int to)
-{
- struct cache_ftfw *cn, *tmp;
+ if (before(net->seq, h->from))
+ return 0; /* continue */
+ else if (after(net->seq, h->to))
+ return 1; /* break */
- list_for_each_entry_safe(cn, tmp, &rs_list, rs_list) {
- struct cache_object *obj;
+ dp("remove from queue (seq=%u)\n", net->seq);
+ queue_del(n);
+ queue_object_free((struct queue_object *)n);
+ break;
+ }
+ case Q_ELEM_OBJ: {
+ struct cache_ftfw *cn;
- obj = cache_data_get_object(STATE_SYNC(internal), cn);
- if (before(cn->seq, from))
- continue;
- else if (after(cn->seq, to))
- break;
+ cn = (struct cache_ftfw *) n;
+ if (before(cn->seq, h->from))
+ return 0;
+ else if (after(cn->seq, h->to))
+ return 1;
dp("queue: deleting from queue (seq=%u)\n", cn->seq);
- list_del_init(&cn->rs_list);
- rs_list_len--;
+ queue_del(n);
+ break;
}
+ }
+ return 0;
}
static int digest_msg(const struct nethdr *net)
@@ -351,7 +345,6 @@ static int digest_msg(const struct nethdr *net)
if (before(h->to, h->from))
return MSG_BAD;
- rs_list_empty(STATE_SYNC(internal), h->from, h->to);
queue_iterate(rs_queue, h, rs_queue_empty);
return MSG_CTL;
@@ -361,7 +354,6 @@ static int digest_msg(const struct nethdr *net)
if (before(nack->to, nack->from))
return MSG_BAD;
- rs_list_to_tx(STATE_SYNC(internal), nack->from, nack->to);
queue_iterate(rs_queue, nack, rs_queue_to_tx);
return MSG_CTL;
@@ -409,7 +401,6 @@ static int ftfw_recv(const struct nethdr *net)
* know anything about that data, we are unreliable until
* the helloing finishes */
queue_iterate(rs_queue, NULL, rs_queue_empty);
- rs_list_empty(STATE_SYNC(internal), 0, ~0U);
goto bypass;
}
@@ -480,10 +471,8 @@ static void ftfw_send(struct nethdr *net, struct cache_object *obj)
cn = (struct cache_ftfw *)
cache_get_extra(STATE_SYNC(internal), obj);
- if (!list_empty(&cn->rs_list)) {
- list_del_init(&cn->rs_list);
- rs_list_len--;
- }
+ if (queue_in(rs_queue, &cn->qnode))
+ queue_del(&cn->qnode);
switch(hello_state) {
case HELLO_INIT:
@@ -500,82 +489,77 @@ static void ftfw_send(struct nethdr *net, struct cache_object *obj)
}
cn->seq = ntohl(net->seq);
- list_add_tail(&cn->rs_list, &rs_list);
- rs_list_len++;
+ queue_add(rs_queue, &cn->qnode);
break;
}
}
-static int tx_queue_xmit(void *data1, const void *data2)
+static int tx_queue_xmit(struct queue_node *n, const void *data)
{
- struct nethdr *net = data1;
-
- if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net)) {
- nethdr_set_ack(net);
- } else if (IS_ALIVE(net)) {
- nethdr_set_ctl(net);
- } else {
- STATE_SYNC(error).msg_snd_malformed++;
- return 0;
- }
- HDR_HOST2NETWORK(net);
-
- dp("tx_queue sq: %u fl:%u len:%u\n",
- ntohl(net->seq), net->flags, ntohs(net->len));
-
- mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net);
- HDR_NETWORK2HOST(net);
+ switch(n->type) {
+ case Q_ELEM_CTL: {
+ struct nethdr *net = queue_node_data(n);
+
+ if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net)) {
+ nethdr_set_ack(net);
+ } else if (IS_ALIVE(net)) {
+ nethdr_set_ctl(net);
+ } else {
+ STATE_SYNC(error).msg_snd_malformed++;
+ return 0;
+ }
+ HDR_HOST2NETWORK(net);
- if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net))
- queue_add(rs_queue, net, net->len);
+ dp("tx_queue sq: %u fl:%u len:%u\n",
+ ntohl(net->seq), net->flags, ntohs(net->len));
- queue_del(tx_queue, net);
+ mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net);
+ HDR_NETWORK2HOST(net);
- return 0;
-}
-
-static int tx_list_xmit(struct list_head *i, struct cache_object *obj, int type)
-{
- int ret;
- struct nethdr *net = BUILD_NETMSG(obj->ct, type);
+ queue_del(n);
+ if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net))
+ queue_add(rs_queue, n);
+ else
+ queue_object_free((struct queue_object *)n);
+ break;
+ }
+ case Q_ELEM_OBJ: {
+ struct cache_ftfw *cn;
+ struct cache_object *obj;
+ int type;
+ struct nethdr *net;
- dp("tx_list sq: %u fl:%u len:%u\n",
- ntohl(net->seq), net->flags, ntohs(net->len));
+ cn = (struct cache_ftfw *)n;
+ obj = cache_data_get_object(STATE_SYNC(internal), cn);
+ type = object_status_to_network_type(obj->status);
+ net = BUILD_NETMSG(obj->ct, type);
- list_del_init(i);
- tx_list_len--;
+ dp("tx_list sq: %u fl:%u len:%u\n",
+ ntohl(net->seq), net->flags, ntohs(net->len));
- ret = mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net);
- ftfw_send(net, obj);
+ queue_del(n);
+ mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net);
+ ftfw_send(net, obj);
+ break;
+ }
+ }
- return ret;
+ return 0;
}
-static void ftfw_run(void)
+static void ftfw_run(fd_set *readfds)
{
- struct cache_ftfw *cn, *tmp;
-
- /* send messages in the tx_queue */
- queue_iterate(tx_queue, NULL, tx_queue_xmit);
-
- /* send conntracks in the tx_list */
- list_for_each_entry_safe(cn, tmp, &tx_list, tx_list) {
- struct cache_object *obj;
-
- obj = cache_data_get_object(STATE_SYNC(internal), cn);
- if (alarm_pending(&obj->alarm))
- tx_list_xmit(&cn->tx_list, obj, NET_T_STATE_DEL);
- else
- tx_list_xmit(&cn->tx_list, obj, NET_T_STATE_UPD);
+ if (FD_ISSET(queue_get_eventfd(tx_queue), readfds)) {
+ queue_iterate(tx_queue, NULL, tx_queue_xmit);
+ add_alarm(&alive_alarm, 1, 0);
+ dp("tx_queue_len:%u rs_queue_len:%u\n",
+ queue_len(tx_queue), queue_len(rs_queue));
}
+}
- /* reset alive alarm */
- add_alarm(&alive_alarm, 1, 0);
-
- dp("tx_list_len:%u tx_queue_len:%u "
- "rs_list_len: %u rs_queue_len:%u\n",
- tx_list_len, queue_len(tx_queue),
- rs_list_len, queue_len(rs_queue));
+static int ftfw_register_fds(struct fds *fds)
+{
+ return register_fd(queue_get_eventfd(tx_queue), fds);
}
struct sync_mode sync_ftfw = {
@@ -588,4 +572,5 @@ struct sync_mode sync_ftfw = {
.recv = ftfw_recv,
.send = ftfw_send,
.run = ftfw_run,
+ .register_fds = ftfw_register_fds,
};