diff options
| -rw-r--r-- | include/cache.h | 7 | ||||
| -rw-r--r-- | include/sync.h | 2 | ||||
| -rw-r--r-- | src/cache.c | 59 | ||||
| -rw-r--r-- | src/sync-alarm.c | 75 | ||||
| -rw-r--r-- | src/sync-ftfw.c | 70 | ||||
| -rw-r--r-- | src/sync-mode.c | 23 | ||||
| -rw-r--r-- | src/sync-notrack.c | 12 | 
7 files changed, 152 insertions, 96 deletions
| diff --git a/include/cache.h b/include/cache.h index fd8e05f..03b6822 100644 --- a/include/cache.h +++ b/include/cache.h @@ -4,7 +4,6 @@  #include <stdint.h>  #include <stddef.h>  #include "hash.h" -#include "alarm.h"  /* cache features */  enum { @@ -36,7 +35,7 @@ struct cache_object {  	struct	nf_conntrack *ct;  	struct	cache *cache;  	int	status; -	struct	alarm_block alarm; +	int	refcnt;  	char	data[0];  }; @@ -106,12 +105,14 @@ void cache_destroy(struct cache *e);  struct cache_object *cache_object_new(struct cache *c, struct nf_conntrack *ct);  void cache_object_free(struct cache_object *obj); +void cache_object_get(struct cache_object *obj); +int cache_object_put(struct cache_object *obj); +void cache_object_set_status(struct cache_object *obj, int status);  int cache_add(struct cache *c, struct cache_object *obj, int id);  void cache_update(struct cache *c, struct cache_object *obj, int id, struct nf_conntrack *ct);  struct cache_object *cache_update_force(struct cache *c, struct nf_conntrack *ct);  void cache_del(struct cache *c, struct cache_object *obj); -int cache_del_timer(struct cache *c, struct cache_object *obj, int timeout);  struct cache_object *cache_find(struct cache *c, struct nf_conntrack *ct, int *pos);  void cache_stats(const struct cache *c, int fd);  void cache_stats_extended(const struct cache *c, int fd); diff --git a/include/sync.h b/include/sync.h index bced1cc..51f8f5b 100644 --- a/include/sync.h +++ b/include/sync.h @@ -17,7 +17,7 @@ struct sync_mode {  	void (*kill)(void);  	int  (*local)(int fd, int type, void *data);  	int  (*recv)(const struct nethdr *net); -	void (*send)(struct nethdr *net, struct cache_object *obj); +	void (*enqueue)(struct cache_object *obj, int type);  	void (*xmit)(void);  }; diff --git a/src/cache.c b/src/cache.c index c46498b..1e08a33 100644 --- a/src/cache.c +++ b/src/cache.c @@ -174,8 +174,6 @@ void cache_destroy(struct cache *c)  	free(c);  } -static void __del_timeout(struct alarm_block *a, void *data); -  struct cache_object *cache_object_new(struct cache *c, struct nf_conntrack *ct)  {  	struct cache_object *obj; @@ -187,7 +185,6 @@ struct cache_object *cache_object_new(struct cache *c, struct nf_conntrack *ct)  		return NULL;  	}  	obj->cache = c; -	init_alarm(&obj->alarm, obj, __del_timeout);  	if ((obj->ct = nfct_new()) == NULL) {  		free(obj); @@ -207,6 +204,30 @@ void cache_object_free(struct cache_object *obj)  	free(obj);  } +int cache_object_put(struct cache_object *obj) +{ +	if (--obj->refcnt == 0) { +		cache_del(obj->cache, obj); +		cache_object_free(obj); +		return 1; +	} +	return 0; +} + +void cache_object_get(struct cache_object *obj) +{ +	obj->refcnt++; +} + +void cache_object_set_status(struct cache_object *obj, int status) +{ +	if (status == C_OBJ_DEAD) { +		obj->cache->stats.del_ok++; +		obj->cache->stats.active--; +	} +	obj->status = status; +} +  static int __add(struct cache *c, struct cache_object *obj, int id)  {  	int ret; @@ -227,6 +248,7 @@ static int __add(struct cache *c, struct cache_object *obj, int id)  	c->stats.active++;  	obj->status = C_OBJ_NEW; +	obj->refcnt++;  	return 0;  } @@ -292,7 +314,6 @@ void cache_del(struct cache *c, struct cache_object *obj)  		c->stats.del_ok++;  		c->stats.active--;  	} -	del_alarm(&obj->alarm);  	__del(c, obj);  } @@ -322,36 +343,6 @@ cache_update_force(struct cache *c, struct nf_conntrack *ct)  	return obj;  } -static void __del_timeout(struct alarm_block *a, void *data) -{ -	struct cache_object *obj = (struct cache_object *) data; -	__del(obj->cache, obj); -	cache_object_free(obj); -} - -int cache_del_timer(struct cache *c, struct cache_object *obj, int timeout) -{ -	if (timeout <= 0) { -		cache_del(c, obj); -		cache_object_free(obj); -		return 1; -	} -	if (obj->status != C_OBJ_DEAD) { -		obj->status = C_OBJ_DEAD; -		add_alarm(&obj->alarm, timeout, 0); -		/* -		 * increase stats even if this entry was not really -		 * removed yet. We do not want to make people think -		 * that the replication protocol does not work  -		 * properly. -		 */ -		c->stats.del_ok++; -		c->stats.active--; -		return 1; -	} -	return 0; -} -  struct cache_object *  cache_find(struct cache *c, struct nf_conntrack *ct, int *id)  { diff --git a/src/sync-alarm.c b/src/sync-alarm.c index 34937fe..a2f17ac 100644 --- a/src/sync-alarm.c +++ b/src/sync-alarm.c @@ -21,14 +21,21 @@  #include "network.h"  #include "alarm.h"  #include "cache.h" +#include "queue.h"  #include "debug.h"  #include <stdlib.h>  #include <string.h> +struct cache_alarm { +	struct queue_node	qnode; +	struct alarm_block	alarm; +}; + +static void alarm_enqueue(struct cache_object *obj, int query); +  static void refresher(struct alarm_block *a, void *data)  { -	struct nethdr *net;  	struct cache_object *obj = data;  	debug_ct(obj->ct, "persistence update"); @@ -37,36 +44,37 @@ static void refresher(struct alarm_block *a, void *data)  		  random() % CONFIG(refresh) + 1,  		  ((random() % 5 + 1)  * 200000) - 1); -	net = BUILD_NETMSG(obj->ct, NET_T_STATE_UPD); -	mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); +	alarm_enqueue(obj, NET_T_STATE_UPD);  }  static void cache_alarm_add(struct cache_object *obj, void *data)  { -	struct alarm_block *a = data; +	struct cache_alarm *ca = data; -	init_alarm(a, obj, refresher); -	add_alarm(a, +	queue_node_init(&ca->qnode, Q_ELEM_OBJ); +	init_alarm(&ca->alarm, obj, refresher); +	add_alarm(&ca->alarm,  		  random() % CONFIG(refresh) + 1,  		  ((random() % 5 + 1)  * 200000) - 1);  }  static void cache_alarm_update(struct cache_object *obj, void *data)  { -	struct alarm_block *a = data; -	add_alarm(a,  +	struct cache_alarm *ca = data; +	add_alarm(&ca->alarm,   		  random() % CONFIG(refresh) + 1,  		  ((random() % 5 + 1)  * 200000) - 1);  }  static void cache_alarm_destroy(struct cache_object *obj, void *data)  { -	struct alarm_block *a = data; -	del_alarm(a); +	struct cache_alarm *ca = data; +	queue_del(&ca->qnode); +	del_alarm(&ca->alarm);  }  static struct cache_extra cache_alarm_extra = { -	.size 		= sizeof(struct alarm_block), +	.size 		= sizeof(struct cache_alarm),  	.add		= cache_alarm_add,  	.update		= cache_alarm_update,  	.destroy	= cache_alarm_destroy @@ -102,9 +110,54 @@ static int alarm_recv(const struct nethdr *net)  	return 0;  } +static void alarm_enqueue(struct cache_object *obj, int query) +{ +	struct cache_alarm *ca = cache_get_extra(STATE_SYNC(internal), obj); +	if (queue_add(STATE_SYNC(tx_queue), &ca->qnode)) +		cache_object_get(obj); +} + +static int tx_queue_xmit(struct queue_node *n, const void *data) +{ +	struct nethdr *net; + +	queue_del(n); + +	switch(n->type) { +	case Q_ELEM_CTL: +		net = queue_node_data(n); +		nethdr_set_ctl(net); +		HDR_HOST2NETWORK(net); +		mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); +		queue_object_free((struct queue_object *)n); +		break; +	case Q_ELEM_OBJ: { +		struct cache_alarm *ca; +		struct cache_object *obj; +		int type; + +		ca = (struct cache_alarm *)n; +		obj = cache_data_get_object(STATE_SYNC(internal), ca); +		type = object_status_to_network_type(obj->status); +		net = BUILD_NETMSG(obj->ct, type); +		mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); +		cache_object_put(obj); +		break; +	} +	} +	return 0; +} + +static void alarm_xmit(void) +{ +	queue_iterate(STATE_SYNC(tx_queue), NULL, tx_queue_xmit); +} +  struct sync_mode sync_alarm = {  	.internal_cache_flags	= LIFETIME,  	.external_cache_flags	= TIMER | LIFETIME,  	.internal_cache_extra	= &cache_alarm_extra,  	.recv 			= alarm_recv, +	.enqueue		= alarm_enqueue, +	.xmit			= alarm_xmit,  }; diff --git a/src/sync-ftfw.c b/src/sync-ftfw.c index d544a7b..a287ecd 100644 --- a/src/sync-ftfw.c +++ b/src/sync-ftfw.c @@ -168,11 +168,13 @@ static int do_cache_to_tx(void *data1, void *data2)  	struct cache_object *obj = data2;  	struct cache_ftfw *cn = cache_get_extra(STATE_SYNC(internal), obj); -	if (queue_in(rs_queue, &cn->qnode)) +	if (queue_in(rs_queue, &cn->qnode)) {  		queue_del(&cn->qnode); - -	queue_add(STATE_SYNC(tx_queue), &cn->qnode); - +		queue_add(STATE_SYNC(tx_queue), &cn->qnode); +	} else { +		if (queue_add(STATE_SYNC(tx_queue), &cn->qnode)) +			cache_object_get(obj); +	}  	return 0;  } @@ -278,16 +280,15 @@ static int rs_queue_empty(struct queue_node *n, const void *data)  {  	const struct nethdr_ack *h = data; -	if (h == NULL) { -		dp("inconditional remove from queue (seq=%u)\n", net->seq); -		queue_del(n); -		return 0; -	} -  	switch(n->type) {  	case Q_ELEM_CTL: {  		struct nethdr_ack *net = queue_node_data(n); +		if (h == NULL) { +			queue_del(n); +			queue_object_free((struct queue_object *)n); +			return 0; +		}  		if (before(net->seq, h->from))  			return 0;	/* continue */  		else if (after(net->seq, h->to)) @@ -300,8 +301,15 @@ static int rs_queue_empty(struct queue_node *n, const void *data)  	}  	case Q_ELEM_OBJ: {  		struct cache_ftfw *cn; +		struct cache_object *obj;  		cn = (struct cache_ftfw *) n; +		if (h == NULL) { +			queue_del(n); +			obj = cache_data_get_object(STATE_SYNC(internal), cn); +			cache_object_put(obj); +			return 0; +		}  		if (before(cn->seq, h->from))  			return 0;  		else if (after(cn->seq, h->to)) @@ -309,6 +317,8 @@ static int rs_queue_empty(struct queue_node *n, const void *data)  		dp("queue: deleting from queue (seq=%u)\n", cn->seq);  		queue_del(n); +		obj = cache_data_get_object(STATE_SYNC(internal), cn); +		cache_object_put(obj);  		break;  	}  	} @@ -441,28 +451,6 @@ out:  	return ret;  } -static void ftfw_send(struct nethdr *net, struct cache_object *obj) -{ -	struct cache_ftfw *cn; - -	switch(net->type) { -	case NET_T_STATE_NEW: -	case NET_T_STATE_UPD: -	case NET_T_STATE_DEL: -		cn = (struct cache_ftfw *)  -			cache_get_extra(STATE_SYNC(internal), obj); - -		if (queue_in(rs_queue, &cn->qnode)) -			queue_del(&cn->qnode); - -		nethdr_set_hello(net); - -		cn->seq = ntohl(net->seq); -		queue_add(rs_queue, &cn->qnode); -		break; -	} -} -  static int tx_queue_xmit(struct queue_node *n, const void *data)  {  	queue_del(n); @@ -511,7 +499,9 @@ static int tx_queue_xmit(struct queue_node *n, const void *data)  	                ntohl(net->seq), net->flags, ntohs(net->len));  		mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); -		ftfw_send(net, obj); +		cn->seq = ntohl(net->seq); +		queue_add(rs_queue, &cn->qnode); +		/* we release the object once we get the acknowlegment */  		break;  	}  	} @@ -527,6 +517,18 @@ static void ftfw_xmit(void)  		queue_len(tx_queue), queue_len(rs_queue));  } +static void ftfw_enqueue(struct cache_object *obj, int type) +{ +	struct cache_ftfw *cn = cache_get_extra(STATE_SYNC(internal), obj); +	if (queue_in(rs_queue, &cn->qnode)) { +		queue_del(&cn->qnode); +		queue_add(STATE_SYNC(tx_queue), &cn->qnode); +	} else { +		if (queue_add(STATE_SYNC(tx_queue), &cn->qnode)) +			cache_object_get(obj); +	} +} +  struct sync_mode sync_ftfw = {  	.internal_cache_flags	= LIFETIME,  	.external_cache_flags	= LIFETIME, @@ -535,6 +537,6 @@ struct sync_mode sync_ftfw = {  	.kill			= ftfw_kill,  	.local			= ftfw_local,  	.recv			= ftfw_recv, -	.send			= ftfw_send, +	.enqueue		= ftfw_enqueue,  	.xmit			= ftfw_xmit,  }; diff --git a/src/sync-mode.c b/src/sync-mode.c index 5ae9062..00e2f7b 100644 --- a/src/sync-mode.c +++ b/src/sync-mode.c @@ -442,14 +442,7 @@ static void dump_sync(struct nf_conntrack *ct)  static void mcast_send_sync(struct cache_object *obj, int query)  { -	struct nethdr *net; - -	net = BUILD_NETMSG(obj->ct, query); - -	if (STATE_SYNC(sync)->send) -		STATE_SYNC(sync)->send(net, obj); - -	mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); +	STATE_SYNC(sync)->enqueue(obj, query);  }  static int purge_step(void *data1, void *data2) @@ -461,8 +454,11 @@ static int purge_step(void *data1, void *data2)  	ret = nfct_query(h, NFCT_Q_GET, obj->ct);  	if (ret == -1 && errno == ENOENT) {  		debug_ct(obj->ct, "overrun purge resync"); -		mcast_send_sync(obj, NET_T_STATE_DEL); -		cache_del_timer(STATE_SYNC(internal), obj, CONFIG(del_timeout)); +		if (obj->status != C_OBJ_DEAD) { +			cache_object_set_status(obj, C_OBJ_DEAD); +			mcast_send_sync(obj, NET_T_STATE_DEL); +			cache_object_put(obj); +		}  	}  	return 0; @@ -552,8 +548,11 @@ static int event_destroy_sync(struct nf_conntrack *ct)  		debug_ct(ct, "can't destroy");  		return 0;  	} -	mcast_send_sync(obj, NET_T_STATE_DEL); -	cache_del_timer(STATE_SYNC(internal), obj, CONFIG(del_timeout)); +	if (obj->status != C_OBJ_DEAD) { +		cache_object_set_status(obj, C_OBJ_DEAD); +		mcast_send_sync(obj, NET_T_STATE_DEL); +		cache_object_put(obj); +	}  	debug_ct(ct, "internal destroy");  	return 1;  } diff --git a/src/sync-notrack.c b/src/sync-notrack.c index 4ded298..3b547ee 100644 --- a/src/sync-notrack.c +++ b/src/sync-notrack.c @@ -71,7 +71,8 @@ static int do_cache_to_tx(void *data1, void *data2)  {  	struct cache_object *obj = data2;  	struct cache_notrack *cn = cache_get_extra(STATE_SYNC(internal), obj); -	queue_add(STATE_SYNC(tx_queue), &cn->qnode); +	if (queue_add(STATE_SYNC(tx_queue), &cn->qnode)) +		cache_object_get(obj);  	return 0;  } @@ -152,6 +153,7 @@ static int tx_queue_xmit(struct queue_node *n, const void *data2)  		mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net);  		queue_del(n); +		cache_object_put(obj);  		break;  	}  	} @@ -163,11 +165,19 @@ static void notrack_xmit(void)  	queue_iterate(STATE_SYNC(tx_queue), NULL, tx_queue_xmit);  } +static void notrack_enqueue(struct cache_object *obj, int query) +{ +	struct cache_notrack *cn = cache_get_extra(STATE_SYNC(internal), obj); +	if (queue_add(STATE_SYNC(tx_queue), &cn->qnode)) +		cache_object_get(obj); +} +  struct sync_mode sync_notrack = {  	.internal_cache_flags	= LIFETIME,  	.external_cache_flags	= LIFETIME,  	.internal_cache_extra	= &cache_notrack_extra,  	.local			= notrack_local,  	.recv			= notrack_recv, +	.enqueue		= notrack_enqueue,  	.xmit			= notrack_xmit,  }; | 
