diff options
-rw-r--r-- | include/cache.h | 7 | ||||
-rw-r--r-- | include/sync.h | 2 | ||||
-rw-r--r-- | src/cache.c | 59 | ||||
-rw-r--r-- | src/sync-alarm.c | 75 | ||||
-rw-r--r-- | src/sync-ftfw.c | 70 | ||||
-rw-r--r-- | src/sync-mode.c | 23 | ||||
-rw-r--r-- | src/sync-notrack.c | 12 |
7 files changed, 152 insertions, 96 deletions
diff --git a/include/cache.h b/include/cache.h index fd8e05f..03b6822 100644 --- a/include/cache.h +++ b/include/cache.h @@ -4,7 +4,6 @@ #include <stdint.h> #include <stddef.h> #include "hash.h" -#include "alarm.h" /* cache features */ enum { @@ -36,7 +35,7 @@ struct cache_object { struct nf_conntrack *ct; struct cache *cache; int status; - struct alarm_block alarm; + int refcnt; char data[0]; }; @@ -106,12 +105,14 @@ void cache_destroy(struct cache *e); struct cache_object *cache_object_new(struct cache *c, struct nf_conntrack *ct); void cache_object_free(struct cache_object *obj); +void cache_object_get(struct cache_object *obj); +int cache_object_put(struct cache_object *obj); +void cache_object_set_status(struct cache_object *obj, int status); int cache_add(struct cache *c, struct cache_object *obj, int id); void cache_update(struct cache *c, struct cache_object *obj, int id, struct nf_conntrack *ct); struct cache_object *cache_update_force(struct cache *c, struct nf_conntrack *ct); void cache_del(struct cache *c, struct cache_object *obj); -int cache_del_timer(struct cache *c, struct cache_object *obj, int timeout); struct cache_object *cache_find(struct cache *c, struct nf_conntrack *ct, int *pos); void cache_stats(const struct cache *c, int fd); void cache_stats_extended(const struct cache *c, int fd); diff --git a/include/sync.h b/include/sync.h index bced1cc..51f8f5b 100644 --- a/include/sync.h +++ b/include/sync.h @@ -17,7 +17,7 @@ struct sync_mode { void (*kill)(void); int (*local)(int fd, int type, void *data); int (*recv)(const struct nethdr *net); - void (*send)(struct nethdr *net, struct cache_object *obj); + void (*enqueue)(struct cache_object *obj, int type); void (*xmit)(void); }; diff --git a/src/cache.c b/src/cache.c index c46498b..1e08a33 100644 --- a/src/cache.c +++ b/src/cache.c @@ -174,8 +174,6 @@ void cache_destroy(struct cache *c) free(c); } -static void __del_timeout(struct alarm_block *a, void *data); - struct cache_object *cache_object_new(struct cache *c, struct nf_conntrack *ct) { struct cache_object *obj; @@ -187,7 +185,6 @@ struct cache_object *cache_object_new(struct cache *c, struct nf_conntrack *ct) return NULL; } obj->cache = c; - init_alarm(&obj->alarm, obj, __del_timeout); if ((obj->ct = nfct_new()) == NULL) { free(obj); @@ -207,6 +204,30 @@ void cache_object_free(struct cache_object *obj) free(obj); } +int cache_object_put(struct cache_object *obj) +{ + if (--obj->refcnt == 0) { + cache_del(obj->cache, obj); + cache_object_free(obj); + return 1; + } + return 0; +} + +void cache_object_get(struct cache_object *obj) +{ + obj->refcnt++; +} + +void cache_object_set_status(struct cache_object *obj, int status) +{ + if (status == C_OBJ_DEAD) { + obj->cache->stats.del_ok++; + obj->cache->stats.active--; + } + obj->status = status; +} + static int __add(struct cache *c, struct cache_object *obj, int id) { int ret; @@ -227,6 +248,7 @@ static int __add(struct cache *c, struct cache_object *obj, int id) c->stats.active++; obj->status = C_OBJ_NEW; + obj->refcnt++; return 0; } @@ -292,7 +314,6 @@ void cache_del(struct cache *c, struct cache_object *obj) c->stats.del_ok++; c->stats.active--; } - del_alarm(&obj->alarm); __del(c, obj); } @@ -322,36 +343,6 @@ cache_update_force(struct cache *c, struct nf_conntrack *ct) return obj; } -static void __del_timeout(struct alarm_block *a, void *data) -{ - struct cache_object *obj = (struct cache_object *) data; - __del(obj->cache, obj); - cache_object_free(obj); -} - -int cache_del_timer(struct cache *c, struct cache_object *obj, int timeout) -{ - if (timeout <= 0) { - cache_del(c, obj); - cache_object_free(obj); - return 1; - } - if (obj->status != C_OBJ_DEAD) { - obj->status = C_OBJ_DEAD; - add_alarm(&obj->alarm, timeout, 0); - /* - * increase stats even if this entry was not really - * removed yet. We do not want to make people think - * that the replication protocol does not work - * properly. - */ - c->stats.del_ok++; - c->stats.active--; - return 1; - } - return 0; -} - struct cache_object * cache_find(struct cache *c, struct nf_conntrack *ct, int *id) { diff --git a/src/sync-alarm.c b/src/sync-alarm.c index 34937fe..a2f17ac 100644 --- a/src/sync-alarm.c +++ b/src/sync-alarm.c @@ -21,14 +21,21 @@ #include "network.h" #include "alarm.h" #include "cache.h" +#include "queue.h" #include "debug.h" #include <stdlib.h> #include <string.h> +struct cache_alarm { + struct queue_node qnode; + struct alarm_block alarm; +}; + +static void alarm_enqueue(struct cache_object *obj, int query); + static void refresher(struct alarm_block *a, void *data) { - struct nethdr *net; struct cache_object *obj = data; debug_ct(obj->ct, "persistence update"); @@ -37,36 +44,37 @@ static void refresher(struct alarm_block *a, void *data) random() % CONFIG(refresh) + 1, ((random() % 5 + 1) * 200000) - 1); - net = BUILD_NETMSG(obj->ct, NET_T_STATE_UPD); - mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); + alarm_enqueue(obj, NET_T_STATE_UPD); } static void cache_alarm_add(struct cache_object *obj, void *data) { - struct alarm_block *a = data; + struct cache_alarm *ca = data; - init_alarm(a, obj, refresher); - add_alarm(a, + queue_node_init(&ca->qnode, Q_ELEM_OBJ); + init_alarm(&ca->alarm, obj, refresher); + add_alarm(&ca->alarm, random() % CONFIG(refresh) + 1, ((random() % 5 + 1) * 200000) - 1); } static void cache_alarm_update(struct cache_object *obj, void *data) { - struct alarm_block *a = data; - add_alarm(a, + struct cache_alarm *ca = data; + add_alarm(&ca->alarm, random() % CONFIG(refresh) + 1, ((random() % 5 + 1) * 200000) - 1); } static void cache_alarm_destroy(struct cache_object *obj, void *data) { - struct alarm_block *a = data; - del_alarm(a); + struct cache_alarm *ca = data; + queue_del(&ca->qnode); + del_alarm(&ca->alarm); } static struct cache_extra cache_alarm_extra = { - .size = sizeof(struct alarm_block), + .size = sizeof(struct cache_alarm), .add = cache_alarm_add, .update = cache_alarm_update, .destroy = cache_alarm_destroy @@ -102,9 +110,54 @@ static int alarm_recv(const struct nethdr *net) return 0; } +static void alarm_enqueue(struct cache_object *obj, int query) +{ + struct cache_alarm *ca = cache_get_extra(STATE_SYNC(internal), obj); + if (queue_add(STATE_SYNC(tx_queue), &ca->qnode)) + cache_object_get(obj); +} + +static int tx_queue_xmit(struct queue_node *n, const void *data) +{ + struct nethdr *net; + + queue_del(n); + + switch(n->type) { + case Q_ELEM_CTL: + net = queue_node_data(n); + nethdr_set_ctl(net); + HDR_HOST2NETWORK(net); + mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); + queue_object_free((struct queue_object *)n); + break; + case Q_ELEM_OBJ: { + struct cache_alarm *ca; + struct cache_object *obj; + int type; + + ca = (struct cache_alarm *)n; + obj = cache_data_get_object(STATE_SYNC(internal), ca); + type = object_status_to_network_type(obj->status); + net = BUILD_NETMSG(obj->ct, type); + mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); + cache_object_put(obj); + break; + } + } + return 0; +} + +static void alarm_xmit(void) +{ + queue_iterate(STATE_SYNC(tx_queue), NULL, tx_queue_xmit); +} + struct sync_mode sync_alarm = { .internal_cache_flags = LIFETIME, .external_cache_flags = TIMER | LIFETIME, .internal_cache_extra = &cache_alarm_extra, .recv = alarm_recv, + .enqueue = alarm_enqueue, + .xmit = alarm_xmit, }; diff --git a/src/sync-ftfw.c b/src/sync-ftfw.c index d544a7b..a287ecd 100644 --- a/src/sync-ftfw.c +++ b/src/sync-ftfw.c @@ -168,11 +168,13 @@ static int do_cache_to_tx(void *data1, void *data2) struct cache_object *obj = data2; struct cache_ftfw *cn = cache_get_extra(STATE_SYNC(internal), obj); - if (queue_in(rs_queue, &cn->qnode)) + if (queue_in(rs_queue, &cn->qnode)) { queue_del(&cn->qnode); - - queue_add(STATE_SYNC(tx_queue), &cn->qnode); - + queue_add(STATE_SYNC(tx_queue), &cn->qnode); + } else { + if (queue_add(STATE_SYNC(tx_queue), &cn->qnode)) + cache_object_get(obj); + } return 0; } @@ -278,16 +280,15 @@ static int rs_queue_empty(struct queue_node *n, const void *data) { const struct nethdr_ack *h = data; - if (h == NULL) { - dp("inconditional remove from queue (seq=%u)\n", net->seq); - queue_del(n); - return 0; - } - switch(n->type) { case Q_ELEM_CTL: { struct nethdr_ack *net = queue_node_data(n); + if (h == NULL) { + queue_del(n); + queue_object_free((struct queue_object *)n); + return 0; + } if (before(net->seq, h->from)) return 0; /* continue */ else if (after(net->seq, h->to)) @@ -300,8 +301,15 @@ static int rs_queue_empty(struct queue_node *n, const void *data) } case Q_ELEM_OBJ: { struct cache_ftfw *cn; + struct cache_object *obj; cn = (struct cache_ftfw *) n; + if (h == NULL) { + queue_del(n); + obj = cache_data_get_object(STATE_SYNC(internal), cn); + cache_object_put(obj); + return 0; + } if (before(cn->seq, h->from)) return 0; else if (after(cn->seq, h->to)) @@ -309,6 +317,8 @@ static int rs_queue_empty(struct queue_node *n, const void *data) dp("queue: deleting from queue (seq=%u)\n", cn->seq); queue_del(n); + obj = cache_data_get_object(STATE_SYNC(internal), cn); + cache_object_put(obj); break; } } @@ -441,28 +451,6 @@ out: return ret; } -static void ftfw_send(struct nethdr *net, struct cache_object *obj) -{ - struct cache_ftfw *cn; - - switch(net->type) { - case NET_T_STATE_NEW: - case NET_T_STATE_UPD: - case NET_T_STATE_DEL: - cn = (struct cache_ftfw *) - cache_get_extra(STATE_SYNC(internal), obj); - - if (queue_in(rs_queue, &cn->qnode)) - queue_del(&cn->qnode); - - nethdr_set_hello(net); - - cn->seq = ntohl(net->seq); - queue_add(rs_queue, &cn->qnode); - break; - } -} - static int tx_queue_xmit(struct queue_node *n, const void *data) { queue_del(n); @@ -511,7 +499,9 @@ static int tx_queue_xmit(struct queue_node *n, const void *data) ntohl(net->seq), net->flags, ntohs(net->len)); mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); - ftfw_send(net, obj); + cn->seq = ntohl(net->seq); + queue_add(rs_queue, &cn->qnode); + /* we release the object once we get the acknowlegment */ break; } } @@ -527,6 +517,18 @@ static void ftfw_xmit(void) queue_len(tx_queue), queue_len(rs_queue)); } +static void ftfw_enqueue(struct cache_object *obj, int type) +{ + struct cache_ftfw *cn = cache_get_extra(STATE_SYNC(internal), obj); + if (queue_in(rs_queue, &cn->qnode)) { + queue_del(&cn->qnode); + queue_add(STATE_SYNC(tx_queue), &cn->qnode); + } else { + if (queue_add(STATE_SYNC(tx_queue), &cn->qnode)) + cache_object_get(obj); + } +} + struct sync_mode sync_ftfw = { .internal_cache_flags = LIFETIME, .external_cache_flags = LIFETIME, @@ -535,6 +537,6 @@ struct sync_mode sync_ftfw = { .kill = ftfw_kill, .local = ftfw_local, .recv = ftfw_recv, - .send = ftfw_send, + .enqueue = ftfw_enqueue, .xmit = ftfw_xmit, }; diff --git a/src/sync-mode.c b/src/sync-mode.c index 5ae9062..00e2f7b 100644 --- a/src/sync-mode.c +++ b/src/sync-mode.c @@ -442,14 +442,7 @@ static void dump_sync(struct nf_conntrack *ct) static void mcast_send_sync(struct cache_object *obj, int query) { - struct nethdr *net; - - net = BUILD_NETMSG(obj->ct, query); - - if (STATE_SYNC(sync)->send) - STATE_SYNC(sync)->send(net, obj); - - mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); + STATE_SYNC(sync)->enqueue(obj, query); } static int purge_step(void *data1, void *data2) @@ -461,8 +454,11 @@ static int purge_step(void *data1, void *data2) ret = nfct_query(h, NFCT_Q_GET, obj->ct); if (ret == -1 && errno == ENOENT) { debug_ct(obj->ct, "overrun purge resync"); - mcast_send_sync(obj, NET_T_STATE_DEL); - cache_del_timer(STATE_SYNC(internal), obj, CONFIG(del_timeout)); + if (obj->status != C_OBJ_DEAD) { + cache_object_set_status(obj, C_OBJ_DEAD); + mcast_send_sync(obj, NET_T_STATE_DEL); + cache_object_put(obj); + } } return 0; @@ -552,8 +548,11 @@ static int event_destroy_sync(struct nf_conntrack *ct) debug_ct(ct, "can't destroy"); return 0; } - mcast_send_sync(obj, NET_T_STATE_DEL); - cache_del_timer(STATE_SYNC(internal), obj, CONFIG(del_timeout)); + if (obj->status != C_OBJ_DEAD) { + cache_object_set_status(obj, C_OBJ_DEAD); + mcast_send_sync(obj, NET_T_STATE_DEL); + cache_object_put(obj); + } debug_ct(ct, "internal destroy"); return 1; } diff --git a/src/sync-notrack.c b/src/sync-notrack.c index 4ded298..3b547ee 100644 --- a/src/sync-notrack.c +++ b/src/sync-notrack.c @@ -71,7 +71,8 @@ static int do_cache_to_tx(void *data1, void *data2) { struct cache_object *obj = data2; struct cache_notrack *cn = cache_get_extra(STATE_SYNC(internal), obj); - queue_add(STATE_SYNC(tx_queue), &cn->qnode); + if (queue_add(STATE_SYNC(tx_queue), &cn->qnode)) + cache_object_get(obj); return 0; } @@ -152,6 +153,7 @@ static int tx_queue_xmit(struct queue_node *n, const void *data2) mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); queue_del(n); + cache_object_put(obj); break; } } @@ -163,11 +165,19 @@ static void notrack_xmit(void) queue_iterate(STATE_SYNC(tx_queue), NULL, tx_queue_xmit); } +static void notrack_enqueue(struct cache_object *obj, int query) +{ + struct cache_notrack *cn = cache_get_extra(STATE_SYNC(internal), obj); + if (queue_add(STATE_SYNC(tx_queue), &cn->qnode)) + cache_object_get(obj); +} + struct sync_mode sync_notrack = { .internal_cache_flags = LIFETIME, .external_cache_flags = LIFETIME, .internal_cache_extra = &cache_notrack_extra, .local = notrack_local, .recv = notrack_recv, + .enqueue = notrack_enqueue, .xmit = notrack_xmit, }; |