From b1d00262f999a597fa24af3298195db9cf52b790 Mon Sep 17 00:00:00 2001 From: Pablo Neira Ayuso Date: Sat, 17 Jan 2009 18:03:50 +0100 Subject: sync: enqueue state updates to tx_queue With this patch, all the states updates are enqueued in the tx_queue. Thus, there's a single output path. This patch adds a simple refcounting mechanism to note when an object is sitting in the txqueue. This patch also removes the alarm that is required by the ftfw approach. Signed-off-by: Pablo Neira Ayuso --- src/sync-alarm.c | 75 +++++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 64 insertions(+), 11 deletions(-) (limited to 'src/sync-alarm.c') diff --git a/src/sync-alarm.c b/src/sync-alarm.c index 34937fe..a2f17ac 100644 --- a/src/sync-alarm.c +++ b/src/sync-alarm.c @@ -21,14 +21,21 @@ #include "network.h" #include "alarm.h" #include "cache.h" +#include "queue.h" #include "debug.h" #include #include +struct cache_alarm { + struct queue_node qnode; + struct alarm_block alarm; +}; + +static void alarm_enqueue(struct cache_object *obj, int query); + static void refresher(struct alarm_block *a, void *data) { - struct nethdr *net; struct cache_object *obj = data; debug_ct(obj->ct, "persistence update"); @@ -37,36 +44,37 @@ static void refresher(struct alarm_block *a, void *data) random() % CONFIG(refresh) + 1, ((random() % 5 + 1) * 200000) - 1); - net = BUILD_NETMSG(obj->ct, NET_T_STATE_UPD); - mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); + alarm_enqueue(obj, NET_T_STATE_UPD); } static void cache_alarm_add(struct cache_object *obj, void *data) { - struct alarm_block *a = data; + struct cache_alarm *ca = data; - init_alarm(a, obj, refresher); - add_alarm(a, + queue_node_init(&ca->qnode, Q_ELEM_OBJ); + init_alarm(&ca->alarm, obj, refresher); + add_alarm(&ca->alarm, random() % CONFIG(refresh) + 1, ((random() % 5 + 1) * 200000) - 1); } static void cache_alarm_update(struct cache_object *obj, void *data) { - struct alarm_block *a = data; - add_alarm(a, + struct cache_alarm *ca = data; + add_alarm(&ca->alarm, random() % CONFIG(refresh) + 1, ((random() % 5 + 1) * 200000) - 1); } static void cache_alarm_destroy(struct cache_object *obj, void *data) { - struct alarm_block *a = data; - del_alarm(a); + struct cache_alarm *ca = data; + queue_del(&ca->qnode); + del_alarm(&ca->alarm); } static struct cache_extra cache_alarm_extra = { - .size = sizeof(struct alarm_block), + .size = sizeof(struct cache_alarm), .add = cache_alarm_add, .update = cache_alarm_update, .destroy = cache_alarm_destroy @@ -102,9 +110,54 @@ static int alarm_recv(const struct nethdr *net) return 0; } +static void alarm_enqueue(struct cache_object *obj, int query) +{ + struct cache_alarm *ca = cache_get_extra(STATE_SYNC(internal), obj); + if (queue_add(STATE_SYNC(tx_queue), &ca->qnode)) + cache_object_get(obj); +} + +static int tx_queue_xmit(struct queue_node *n, const void *data) +{ + struct nethdr *net; + + queue_del(n); + + switch(n->type) { + case Q_ELEM_CTL: + net = queue_node_data(n); + nethdr_set_ctl(net); + HDR_HOST2NETWORK(net); + mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); + queue_object_free((struct queue_object *)n); + break; + case Q_ELEM_OBJ: { + struct cache_alarm *ca; + struct cache_object *obj; + int type; + + ca = (struct cache_alarm *)n; + obj = cache_data_get_object(STATE_SYNC(internal), ca); + type = object_status_to_network_type(obj->status); + net = BUILD_NETMSG(obj->ct, type); + mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); + cache_object_put(obj); + break; + } + } + return 0; +} + +static void alarm_xmit(void) +{ + queue_iterate(STATE_SYNC(tx_queue), NULL, tx_queue_xmit); +} + struct sync_mode sync_alarm = { .internal_cache_flags = LIFETIME, .external_cache_flags = TIMER | LIFETIME, .internal_cache_extra = &cache_alarm_extra, .recv = alarm_recv, + .enqueue = alarm_enqueue, + .xmit = alarm_xmit, }; -- cgit v1.2.3