summaryrefslogtreecommitdiff
path: root/src/sync-ftfw.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync-ftfw.c')
-rw-r--r--src/sync-ftfw.c70
1 files changed, 36 insertions, 34 deletions
diff --git a/src/sync-ftfw.c b/src/sync-ftfw.c
index d544a7b..a287ecd 100644
--- a/src/sync-ftfw.c
+++ b/src/sync-ftfw.c
@@ -168,11 +168,13 @@ static int do_cache_to_tx(void *data1, void *data2)
struct cache_object *obj = data2;
struct cache_ftfw *cn = cache_get_extra(STATE_SYNC(internal), obj);
- if (queue_in(rs_queue, &cn->qnode))
+ if (queue_in(rs_queue, &cn->qnode)) {
queue_del(&cn->qnode);
-
- queue_add(STATE_SYNC(tx_queue), &cn->qnode);
-
+ queue_add(STATE_SYNC(tx_queue), &cn->qnode);
+ } else {
+ if (queue_add(STATE_SYNC(tx_queue), &cn->qnode))
+ cache_object_get(obj);
+ }
return 0;
}
@@ -278,16 +280,15 @@ static int rs_queue_empty(struct queue_node *n, const void *data)
{
const struct nethdr_ack *h = data;
- if (h == NULL) {
- dp("inconditional remove from queue (seq=%u)\n", net->seq);
- queue_del(n);
- return 0;
- }
-
switch(n->type) {
case Q_ELEM_CTL: {
struct nethdr_ack *net = queue_node_data(n);
+ if (h == NULL) {
+ queue_del(n);
+ queue_object_free((struct queue_object *)n);
+ return 0;
+ }
if (before(net->seq, h->from))
return 0; /* continue */
else if (after(net->seq, h->to))
@@ -300,8 +301,15 @@ static int rs_queue_empty(struct queue_node *n, const void *data)
}
case Q_ELEM_OBJ: {
struct cache_ftfw *cn;
+ struct cache_object *obj;
cn = (struct cache_ftfw *) n;
+ if (h == NULL) {
+ queue_del(n);
+ obj = cache_data_get_object(STATE_SYNC(internal), cn);
+ cache_object_put(obj);
+ return 0;
+ }
if (before(cn->seq, h->from))
return 0;
else if (after(cn->seq, h->to))
@@ -309,6 +317,8 @@ static int rs_queue_empty(struct queue_node *n, const void *data)
dp("queue: deleting from queue (seq=%u)\n", cn->seq);
queue_del(n);
+ obj = cache_data_get_object(STATE_SYNC(internal), cn);
+ cache_object_put(obj);
break;
}
}
@@ -441,28 +451,6 @@ out:
return ret;
}
-static void ftfw_send(struct nethdr *net, struct cache_object *obj)
-{
- struct cache_ftfw *cn;
-
- switch(net->type) {
- case NET_T_STATE_NEW:
- case NET_T_STATE_UPD:
- case NET_T_STATE_DEL:
- cn = (struct cache_ftfw *)
- cache_get_extra(STATE_SYNC(internal), obj);
-
- if (queue_in(rs_queue, &cn->qnode))
- queue_del(&cn->qnode);
-
- nethdr_set_hello(net);
-
- cn->seq = ntohl(net->seq);
- queue_add(rs_queue, &cn->qnode);
- break;
- }
-}
-
static int tx_queue_xmit(struct queue_node *n, const void *data)
{
queue_del(n);
@@ -511,7 +499,9 @@ static int tx_queue_xmit(struct queue_node *n, const void *data)
ntohl(net->seq), net->flags, ntohs(net->len));
mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net);
- ftfw_send(net, obj);
+ cn->seq = ntohl(net->seq);
+ queue_add(rs_queue, &cn->qnode);
+ /* we release the object once we get the acknowlegment */
break;
}
}
@@ -527,6 +517,18 @@ static void ftfw_xmit(void)
queue_len(tx_queue), queue_len(rs_queue));
}
+static void ftfw_enqueue(struct cache_object *obj, int type)
+{
+ struct cache_ftfw *cn = cache_get_extra(STATE_SYNC(internal), obj);
+ if (queue_in(rs_queue, &cn->qnode)) {
+ queue_del(&cn->qnode);
+ queue_add(STATE_SYNC(tx_queue), &cn->qnode);
+ } else {
+ if (queue_add(STATE_SYNC(tx_queue), &cn->qnode))
+ cache_object_get(obj);
+ }
+}
+
struct sync_mode sync_ftfw = {
.internal_cache_flags = LIFETIME,
.external_cache_flags = LIFETIME,
@@ -535,6 +537,6 @@ struct sync_mode sync_ftfw = {
.kill = ftfw_kill,
.local = ftfw_local,
.recv = ftfw_recv,
- .send = ftfw_send,
+ .enqueue = ftfw_enqueue,
.xmit = ftfw_xmit,
};