diff options
Diffstat (limited to 'src/sync-ftfw.c')
-rw-r--r-- | src/sync-ftfw.c | 70 |
1 files changed, 36 insertions, 34 deletions
diff --git a/src/sync-ftfw.c b/src/sync-ftfw.c index d544a7b..a287ecd 100644 --- a/src/sync-ftfw.c +++ b/src/sync-ftfw.c @@ -168,11 +168,13 @@ static int do_cache_to_tx(void *data1, void *data2) struct cache_object *obj = data2; struct cache_ftfw *cn = cache_get_extra(STATE_SYNC(internal), obj); - if (queue_in(rs_queue, &cn->qnode)) + if (queue_in(rs_queue, &cn->qnode)) { queue_del(&cn->qnode); - - queue_add(STATE_SYNC(tx_queue), &cn->qnode); - + queue_add(STATE_SYNC(tx_queue), &cn->qnode); + } else { + if (queue_add(STATE_SYNC(tx_queue), &cn->qnode)) + cache_object_get(obj); + } return 0; } @@ -278,16 +280,15 @@ static int rs_queue_empty(struct queue_node *n, const void *data) { const struct nethdr_ack *h = data; - if (h == NULL) { - dp("inconditional remove from queue (seq=%u)\n", net->seq); - queue_del(n); - return 0; - } - switch(n->type) { case Q_ELEM_CTL: { struct nethdr_ack *net = queue_node_data(n); + if (h == NULL) { + queue_del(n); + queue_object_free((struct queue_object *)n); + return 0; + } if (before(net->seq, h->from)) return 0; /* continue */ else if (after(net->seq, h->to)) @@ -300,8 +301,15 @@ static int rs_queue_empty(struct queue_node *n, const void *data) } case Q_ELEM_OBJ: { struct cache_ftfw *cn; + struct cache_object *obj; cn = (struct cache_ftfw *) n; + if (h == NULL) { + queue_del(n); + obj = cache_data_get_object(STATE_SYNC(internal), cn); + cache_object_put(obj); + return 0; + } if (before(cn->seq, h->from)) return 0; else if (after(cn->seq, h->to)) @@ -309,6 +317,8 @@ static int rs_queue_empty(struct queue_node *n, const void *data) dp("queue: deleting from queue (seq=%u)\n", cn->seq); queue_del(n); + obj = cache_data_get_object(STATE_SYNC(internal), cn); + cache_object_put(obj); break; } } @@ -441,28 +451,6 @@ out: return ret; } -static void ftfw_send(struct nethdr *net, struct cache_object *obj) -{ - struct cache_ftfw *cn; - - switch(net->type) { - case NET_T_STATE_NEW: - case NET_T_STATE_UPD: - case NET_T_STATE_DEL: - cn = (struct cache_ftfw *) - cache_get_extra(STATE_SYNC(internal), obj); - - if (queue_in(rs_queue, &cn->qnode)) - queue_del(&cn->qnode); - - nethdr_set_hello(net); - - cn->seq = ntohl(net->seq); - queue_add(rs_queue, &cn->qnode); - break; - } -} - static int tx_queue_xmit(struct queue_node *n, const void *data) { queue_del(n); @@ -511,7 +499,9 @@ static int tx_queue_xmit(struct queue_node *n, const void *data) ntohl(net->seq), net->flags, ntohs(net->len)); mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); - ftfw_send(net, obj); + cn->seq = ntohl(net->seq); + queue_add(rs_queue, &cn->qnode); + /* we release the object once we get the acknowlegment */ break; } } @@ -527,6 +517,18 @@ static void ftfw_xmit(void) queue_len(tx_queue), queue_len(rs_queue)); } +static void ftfw_enqueue(struct cache_object *obj, int type) +{ + struct cache_ftfw *cn = cache_get_extra(STATE_SYNC(internal), obj); + if (queue_in(rs_queue, &cn->qnode)) { + queue_del(&cn->qnode); + queue_add(STATE_SYNC(tx_queue), &cn->qnode); + } else { + if (queue_add(STATE_SYNC(tx_queue), &cn->qnode)) + cache_object_get(obj); + } +} + struct sync_mode sync_ftfw = { .internal_cache_flags = LIFETIME, .external_cache_flags = LIFETIME, @@ -535,6 +537,6 @@ struct sync_mode sync_ftfw = { .kill = ftfw_kill, .local = ftfw_local, .recv = ftfw_recv, - .send = ftfw_send, + .enqueue = ftfw_enqueue, .xmit = ftfw_xmit, }; |