diff options
| -rw-r--r-- | include/conntrackd.h | 1 | ||||
| -rw-r--r-- | include/queue.h | 51 | ||||
| -rw-r--r-- | include/sync.h | 6 | ||||
| -rw-r--r-- | src/event.c | 20 | ||||
| -rw-r--r-- | src/queue.c | 124 | ||||
| -rw-r--r-- | src/sync-ftfw.c | 367 | ||||
| -rw-r--r-- | src/sync-mode.c | 20 | ||||
| -rw-r--r-- | src/sync-notrack.c | 114 | 
8 files changed, 360 insertions, 343 deletions
diff --git a/include/conntrackd.h b/include/conntrackd.h index 67397b8..8cb520d 100644 --- a/include/conntrackd.h +++ b/include/conntrackd.h @@ -150,7 +150,6 @@ struct ct_sync_state {  	struct mcast_sock *mcast_server;  /* multicast socket: incoming */  	struct mcast_sock *mcast_client;  /* multicast socket: outgoing  */ -	struct evfd *evfd;		  /* event fd */  	struct sync_mode *sync;		/* sync mode */ diff --git a/include/queue.h b/include/queue.h index 5a9cf39..ef56323 100644 --- a/include/queue.h +++ b/include/queue.h @@ -1,28 +1,53 @@  #ifndef _QUEUE_H_  #define _QUEUE_H_ +#include <stdint.h>  #include "linux_list.h" -struct queue { -	size_t max_size; -	size_t cur_size; -	unsigned int num_elems; -	struct list_head head; +struct queue_node { +	struct list_head	head; +	uint32_t		type; +	struct queue		*owner; +	size_t 			size;  }; -struct queue_node { -	struct list_head head; -	size_t size; -	char data[0]; +enum { +	Q_ELEM_OBJ = 0, +	Q_ELEM_CTL = 1 +}; + +void queue_node_init(struct queue_node *n, int type); +void *queue_node_data(struct queue_node *n); + +struct queue_object { +	struct queue_node	qnode; +	char			data[0];  }; -struct queue *queue_create(size_t max_size); +struct queue_object *queue_object_new(int type, size_t size); +void queue_object_free(struct queue_object *obj); + +struct evfd; + +struct queue { +	unsigned int		max_elems; +	unsigned int		num_elems; +	uint32_t		flags; +	struct list_head	head; +	struct evfd		*evfd; +}; + +#define QUEUE_F_EVFD (1U << 0) + +struct queue *queue_create(int max_objects, unsigned int flags);  void queue_destroy(struct queue *b);  unsigned int queue_len(const struct queue *b); -int queue_add(struct queue *b, const void *data, size_t size); -void queue_del(struct queue *b, void *data); +int queue_add(struct queue *b, struct queue_node *n); +int queue_del(struct queue_node *n); +int queue_in(struct queue *b, struct queue_node *n);  void queue_iterate(struct queue *b,  		   const void *data, -		   int (*iterate)(void *data1, const void *data2)); +		   int (*iterate)(struct queue_node *n, const void *data2)); +int queue_get_eventfd(struct queue *b);  #endif diff --git a/include/sync.h b/include/sync.h index 60c9fae..9a9540c 100644 --- a/include/sync.h +++ b/include/sync.h @@ -1,8 +1,11 @@  #ifndef _SYNC_HOOKS_H_  #define _SYNC_HOOKS_H_ +#include <sys/select.h> +  struct nethdr;  struct cache_object; +struct fds;  struct sync_mode {  	int internal_cache_flags; @@ -15,7 +18,8 @@ struct sync_mode {  	int  (*local)(int fd, int type, void *data);  	int  (*recv)(const struct nethdr *net);  	void (*send)(struct nethdr *net, struct cache_object *obj); -	void (*run)(void); +	void (*run)(fd_set *readfds); +	int (*register_fds)(struct fds *fds);  };  extern struct sync_mode sync_alarm; diff --git a/src/event.c b/src/event.c index ed78835..d1dfe72 100644 --- a/src/event.c +++ b/src/event.c @@ -17,6 +17,8 @@   */  #include <unistd.h>  #include <stdlib.h> +#include <unistd.h> +#include <fcntl.h>  #include "event.h" @@ -37,6 +39,7 @@ struct evfd *create_evfd(void)  		free(e);  		return NULL;  	} +	fcntl(e->fds[0], F_SETFL, O_NONBLOCK);  	return e;  } @@ -55,19 +58,20 @@ int get_read_evfd(struct evfd *evfd)  int write_evfd(struct evfd *evfd)  { -	int data = 0; +	int data = 0, ret = 0; -	if (evfd->read) -		return 0; +	if (evfd->read == 0) +		ret = write(evfd->fds[1], &data, sizeof(data)); +	evfd->read++; -	evfd->read = 1; -	return write(evfd->fds[1], &data, sizeof(data)); +	return ret;  }  int read_evfd(struct evfd *evfd)  { -	int data; +	int data, ret = 0; -	evfd->read = 0; -	return read(evfd->fds[0], &data, sizeof(data)); +	if (--evfd->read == 0) +		ret = read(evfd->fds[0], &data, sizeof(data)); +	return ret;  } diff --git a/src/queue.c b/src/queue.c index cdd70ae..cffcc93 100644 --- a/src/queue.c +++ b/src/queue.c @@ -1,5 +1,5 @@  /* - * (C) 2006-2008 by Pablo Neira Ayuso <pablo@netfilter.org> + * (C) 2006-2009 by Pablo Neira Ayuso <pablo@netfilter.org>   *    * This program is free software; you can redistribute it and/or modify   * it under the terms of the GNU General Public License as published by @@ -17,110 +17,122 @@   */  #include "queue.h" +#include "event.h"  #include <errno.h>  #include <stdlib.h>  #include <string.h> -struct queue *queue_create(size_t max_size) +struct queue *queue_create(int max_objects, unsigned int flags)  {  	struct queue *b; -	b = malloc(sizeof(struct queue)); +	b = calloc(sizeof(struct queue), 1);  	if (b == NULL)  		return NULL; -	memset(b, 0, sizeof(struct queue)); -	b->max_size = max_size; +	b->max_elems = max_objects;  	INIT_LIST_HEAD(&b->head); +	b->flags = flags; + +	if (flags & QUEUE_F_EVFD) { +		b->evfd = create_evfd(); +		if (b->evfd == NULL) { +			free(b); +			return NULL; +		} +	}  	return b;  }  void queue_destroy(struct queue *b)  { -	struct list_head *i, *tmp; -	struct queue_node *node; - -	/* XXX: set cur_size and num_elems */ -	list_for_each_safe(i, tmp, &b->head) { -		node = (struct queue_node *) i; -		list_del(i); -		free(node); -	} +	if (b->flags & QUEUE_F_EVFD) +		destroy_evfd(b->evfd);  	free(b);  } -static struct queue_node *queue_node_create(const void *data, size_t size) +void queue_node_init(struct queue_node *n, int type)  { -	struct queue_node *n; +	INIT_LIST_HEAD(&n->head); +	n->type = type; +} -	n = malloc(sizeof(struct queue_node) + size); -	if (n == NULL) +void *queue_node_data(struct queue_node *n) +{ +	return ((char *)n) + sizeof(struct queue_node); +} + +struct queue_object *queue_object_new(int type, size_t size) +{ +	struct queue_object *obj; + +	obj = calloc(sizeof(struct queue_object) + size, 1); +	if (obj == NULL)  		return NULL; -	n->size = size; -	memcpy(n->data, data, size); +	obj->qnode.size = size; +	queue_node_init(&obj->qnode, type); -	return n; +	return obj;  } -int queue_add(struct queue *b, const void *data, size_t size) +void queue_object_free(struct queue_object *obj)  { -	int ret = 0; -	struct queue_node *n; - -	/* does it fit this queue? */ -	if (size > b->max_size) { -		errno = ENOSPC; -		ret = -1; -		goto err; -	} +	free(obj); +} -retry: -	/* queue is full: kill the oldest entry */ -	if (b->cur_size + size > b->max_size) { -		n = (struct queue_node *) b->head.prev; -		list_del(b->head.prev); -		b->cur_size -= n->size; -		free(n); -		goto retry; -	} +int queue_add(struct queue *b, struct queue_node *n) +{ +	if (!list_empty(&n->head)) +		return 0; -	n = queue_node_create(data, size); -	if (n == NULL) { -		ret = -1; -		goto err; +	if (b->num_elems >= b->max_elems) { +		errno = ENOSPC; +		return -1;  	} - +	n->owner = b;  	list_add_tail(&n->head, &b->head); -	b->cur_size += size;  	b->num_elems++; +	if (b->evfd) +		write_evfd(b->evfd); +	return 1; +} -err: -	return ret; +int queue_del(struct queue_node *n) +{ +	if (list_empty(&n->head)) +		return 0; + +	list_del_init(&n->head); +	n->owner->num_elems--; +	if (n->owner->evfd) +		read_evfd(n->owner->evfd); +	n->owner = NULL; +	return 1;  } -void queue_del(struct queue *b, void *data) +int queue_in(struct queue *b, struct queue_node *n)  { -	struct queue_node *n = container_of(data, struct queue_node, data);  +	return b == n->owner; +} -	list_del(&n->head); -	b->cur_size -= n->size; -	b->num_elems--; -	free(n); +int queue_get_eventfd(struct queue *b) +{ +	return get_read_evfd(b->evfd);  }  void queue_iterate(struct queue *b,   		   const void *data,  -		   int (*iterate)(void *data1, const void *data2)) +		   int (*iterate)(struct queue_node *n, const void *data2))  {  	struct list_head *i, *tmp;  	struct queue_node *n;  	list_for_each_safe(i, tmp, &b->head) {  		n = (struct queue_node *) i; -		if (iterate(n->data, data)) +		if (iterate(n, data))  			break;  	}  } diff --git a/src/sync-ftfw.c b/src/sync-ftfw.c index bddc18c..bb53849 100644 --- a/src/sync-ftfw.c +++ b/src/sync-ftfw.c @@ -24,7 +24,7 @@  #include "alarm.h"  #include "log.h"  #include "cache.h" -#include "event.h" +#include "fds.h"  #include <string.h> @@ -34,12 +34,8 @@  #define dp(...)  #endif -static LIST_HEAD(rs_list); -static LIST_HEAD(tx_list); -static unsigned int rs_list_len; -static unsigned int tx_list_len; -static struct queue *rs_queue; -static struct queue *tx_queue; +struct queue *tx_queue; +struct queue *rs_queue;  static uint32_t exp_seq;  static uint32_t window;  static uint32_t ack_from; @@ -58,8 +54,7 @@ static int say_hello_back;  #define ALIVE_INT 1  struct cache_ftfw { -	struct list_head 	rs_list; -	struct list_head	tx_list; +	struct queue_node	qnode;  	uint32_t 		seq;  }; @@ -67,24 +62,13 @@ static void cache_ftfw_add(struct cache_object *obj, void *data)  {  	struct cache_ftfw *cn = data;  	/* These nodes are not inserted in the list */ -	INIT_LIST_HEAD(&cn->rs_list); -	INIT_LIST_HEAD(&cn->tx_list); +	queue_node_init(&cn->qnode, Q_ELEM_OBJ);  }  static void cache_ftfw_del(struct cache_object *obj, void *data)  {  	struct cache_ftfw *cn = data; - -	/* this node is already out of the list */ -	if (!list_empty(&cn->rs_list)) { -		/* no need for list_del_init since the entry is destroyed */ -		list_del(&cn->rs_list); -		rs_list_len--; -	} -	if (!list_empty(&cn->tx_list)) { -		list_del(&cn->tx_list); -		tx_list_len--; -	} +	queue_del(&cn->qnode);  }  static struct cache_extra cache_ftfw_extra = { @@ -95,54 +79,64 @@ static struct cache_extra cache_ftfw_extra = {  static void tx_queue_add_ctlmsg(uint32_t flags, uint32_t from, uint32_t to)  { -	struct nethdr_ack ack = { -		.type  = NET_T_CTL, -		.flags = flags, -		.from  = from, -		.to    = to, -	}; +	struct queue_object *qobj; +	struct nethdr_ack *ack; + +	qobj = queue_object_new(Q_ELEM_CTL, sizeof(struct nethdr_ack)); +	if (qobj == NULL) +		return; + +	ack		= (struct nethdr_ack *)qobj->data; +	ack->type 	= NET_T_CTL; +	ack->flags	= flags; +	ack->from	= from; +	ack->to		= to;  	switch(hello_state) {  	case HELLO_INIT:  		hello_state = HELLO_SAY;  		/* fall through */  	case HELLO_SAY: -		ack.flags |= NET_F_HELLO; +		ack->flags |= NET_F_HELLO;  		break;  	}  	if (say_hello_back) { -		ack.flags |= NET_F_HELLO_BACK; +		ack->flags |= NET_F_HELLO_BACK;  		say_hello_back = 0;  	} -	queue_add(tx_queue, &ack, NETHDR_ACK_SIZ); -	write_evfd(STATE_SYNC(evfd)); +	queue_add(tx_queue, &qobj->qnode);  }  static void tx_queue_add_ctlmsg2(uint32_t flags)  { -	struct nethdr ctl = { -		.type  = NET_T_CTL, -		.flags = flags, -	}; +	struct queue_object *qobj; +	struct nethdr *ctl; + +	qobj = queue_object_new(Q_ELEM_CTL, sizeof(struct nethdr_ack)); +	if (qobj == NULL) +		return; + +	ctl		= (struct nethdr *)qobj->data; +	ctl->type 	= NET_T_CTL; +	ctl->flags	= flags;  	switch(hello_state) {  	case HELLO_INIT:  		hello_state = HELLO_SAY;  		/* fall through */  	case HELLO_SAY: -		ctl.flags |= NET_F_HELLO; +		ctl->flags |= NET_F_HELLO;  		break;  	}  	if (say_hello_back) { -		ctl.flags |= NET_F_HELLO_BACK; +		ctl->flags |= NET_F_HELLO_BACK;  		say_hello_back = 0;  	} -	queue_add(tx_queue, &ctl, NETHDR_SIZ); -	write_evfd(STATE_SYNC(evfd)); +	queue_add(tx_queue, &qobj->qnode);  }  /* this function is called from the alarm framework */ @@ -156,17 +150,18 @@ static void do_alive_alarm(struct alarm_block *a, void *data)  		ack_from_set = 0;  	} else  		tx_queue_add_ctlmsg2(NET_F_ALIVE); + +	add_alarm(&alive_alarm, ALIVE_INT, 0);  }  static int ftfw_init(void)  { -	tx_queue = queue_create(CONFIG(resend_queue_size)); +	tx_queue = queue_create(INT_MAX, QUEUE_F_EVFD);  	if (tx_queue == NULL) {  		dlog(LOG_ERR, "cannot create tx queue");  		return -1;  	} - -	rs_queue = queue_create(CONFIG(resend_queue_size)); +	rs_queue = queue_create(INT_MAX, 0);  	if (rs_queue == NULL) {  		dlog(LOG_ERR, "cannot create rs queue");  		return -1; @@ -192,45 +187,47 @@ static int do_cache_to_tx(void *data1, void *data2)  	struct cache_object *obj = data2;  	struct cache_ftfw *cn = cache_get_extra(STATE_SYNC(internal), obj); -	/* repeated request for resync? */ -	if (!list_empty(&cn->tx_list)) -		return 0; +	if (queue_in(rs_queue, &cn->qnode)) +		queue_del(&cn->qnode); -	/* add to tx list */ -	list_add_tail(&cn->tx_list, &tx_list); -	tx_list_len++; -	write_evfd(STATE_SYNC(evfd)); +	queue_add(tx_queue, &cn->qnode);  	return 0;  } -static int debug_rs_queue_dump_step(void *data1, const void *data2) +static int rs_queue_dump(struct queue_node *n, const void *data2)  { -	struct nethdr_ack *net = data1;  	const int *fd = data2;  	char buf[512];  	int size; -	size = sprintf(buf, "seq:%u flags:%u\n", net->seq, net->flags); +	switch(n->type) { +		case Q_ELEM_CTL: { +			struct nethdr *net = queue_node_data(n); +			size = sprintf(buf, "control -> seq:%u flags:%u\n", +					    net->seq, net->flags); +			break; +		} +		case Q_ELEM_OBJ: { +			struct cache_ftfw *cn = (struct cache_ftfw *) n; +			size = sprintf(buf, "object -> seq:%u\n", cn->seq); +		break; +		} +		default: +			return 0; +	}  	send(*fd, buf, size, 0);  	return 0;  }  static void debug_rs_dump(int fd)  { -	struct cache_ftfw *cn, *tmp;  	char buf[512];  	int size; -	size = sprintf(buf, "resent list (len=%u):\n", rs_list_len); -	send(fd, buf, size, 0); -	list_for_each_entry_safe(cn, tmp, &rs_list, rs_list) { -		size = sprintf(buf, "seq:%u\n", cn->seq); -		send(fd, buf, size, 0); -	} -	size = sprintf(buf, "\nresent queue (len=%u):\n", queue_len(rs_queue)); +	size = sprintf(buf, "resent queue (len=%u):\n", queue_len(rs_queue));  	send(fd, buf, size, 0); -	queue_iterate(rs_queue, &fd, debug_rs_queue_dump_step); +	queue_iterate(rs_queue, &fd, rs_queue_dump);  }  static int ftfw_local(int fd, int type, void *data) @@ -257,87 +254,84 @@ static int ftfw_local(int fd, int type, void *data)  	return ret;  } -static int rs_queue_to_tx(void *data1, const void *data2) +static int rs_queue_to_tx(struct queue_node *n, const void *data)  { -	struct nethdr_ack *net = data1; -	const struct nethdr_ack *nack = data2; - -	if (before(net->seq, nack->from)) -		return 0;	/* continue */ -	else if (after(net->seq, nack->to)) -		return 1;	/* break */ - -	dp("rs_queue_to_tx sq: %u fl:%u len:%u\n", -		net->seq, net->flags, net->len); -	queue_add(tx_queue, net, net->len); -	write_evfd(STATE_SYNC(evfd)); -	queue_del(rs_queue, net); -	return 0; -} +	const struct nethdr_ack *nack = data; -static int rs_queue_empty(void *data1, const void *data2) -{ -	struct nethdr *net = data1; -	const struct nethdr_ack *h = data2; +	switch(n->type) { +	case Q_ELEM_CTL: { +		struct nethdr_ack *net = queue_node_data(n); -	if (h == NULL) { -		dp("inconditional remove from queue (seq=%u)\n", net->seq); -		queue_del(rs_queue, data1); -		return 0; +		if (before(net->seq, nack->from)) +			return 0;	/* continue */ +		else if (after(net->seq, nack->to)) +			return 1;	/* break */ + +		dp("rs_queue_to_tx sq: %u fl:%u len:%u\n", +			net->seq, net->flags, net->len); + +		queue_del(n); +		queue_add(tx_queue, n); +		break;  	} +	case Q_ELEM_OBJ: { +		struct cache_ftfw *cn; -	if (before(net->seq, h->from)) -		return 0;	/* continue */ -	else if (after(net->seq, h->to)) -		return 1;	/* break */ +		cn = (struct cache_ftfw *) n; +		if (before(cn->seq, nack->from)) +			return 0; +		else if (after(cn->seq, nack->to)) +			return 1; -	dp("remove from queue (seq=%u)\n", net->seq); -	queue_del(rs_queue, data1); +		dp("resending nack'ed (oldseq=%u)\n", cn->seq); + +		queue_del(n); +		queue_add(tx_queue, n); +		break; +	} +	}  	return 0;  } -static void rs_list_to_tx(struct cache *c, unsigned int from, unsigned int to) +static int rs_queue_empty(struct queue_node *n, const void *data)  { -	struct cache_ftfw *cn, *tmp; +	const struct nethdr_ack *h = data; -	list_for_each_entry_safe(cn, tmp, &rs_list, rs_list) { -		struct cache_object *obj;; -		 -		obj = cache_data_get_object(STATE_SYNC(internal), cn); -		if (before(cn->seq, from)) -			continue; -		else if (after(cn->seq, to)) -			break; +	if (h == NULL) { +		dp("inconditional remove from queue (seq=%u)\n", net->seq); +		queue_del(n); +		return 0; +	} -		dp("resending nack'ed (oldseq=%u)\n", cn->seq); -		list_del_init(&cn->rs_list); -		rs_list_len--; -		/* we received a request for resync before this nack? */ -		if (list_empty(&cn->tx_list)) { -			list_add_tail(&cn->tx_list, &tx_list); -			tx_list_len++; -		} -		write_evfd(STATE_SYNC(evfd)); -	}  -} +	switch(n->type) { +	case Q_ELEM_CTL: { +		struct nethdr_ack *net = queue_node_data(n); -static void rs_list_empty(struct cache *c, unsigned int from, unsigned int to) -{ -	struct cache_ftfw *cn, *tmp; +		if (before(net->seq, h->from)) +			return 0;	/* continue */ +		else if (after(net->seq, h->to)) +			return 1;	/* break */ -	list_for_each_entry_safe(cn, tmp, &rs_list, rs_list) { -		struct cache_object *obj; +		dp("remove from queue (seq=%u)\n", net->seq); +		queue_del(n); +		queue_object_free((struct queue_object *)n); +		break; +	} +	case Q_ELEM_OBJ: { +		struct cache_ftfw *cn; -		obj = cache_data_get_object(STATE_SYNC(internal), cn); -		if (before(cn->seq, from)) -			continue; -		else if (after(cn->seq, to)) -			break; +		cn = (struct cache_ftfw *) n; +		if (before(cn->seq, h->from)) +			return 0; +		else if (after(cn->seq, h->to)) +			return 1;  		dp("queue: deleting from queue (seq=%u)\n", cn->seq); -		list_del_init(&cn->rs_list); -		rs_list_len--; +		queue_del(n); +		break;  	} +	} +	return 0;  }  static int digest_msg(const struct nethdr *net) @@ -351,7 +345,6 @@ static int digest_msg(const struct nethdr *net)  		if (before(h->to, h->from))  			return MSG_BAD; -		rs_list_empty(STATE_SYNC(internal), h->from, h->to);  		queue_iterate(rs_queue, h, rs_queue_empty);  		return MSG_CTL; @@ -361,7 +354,6 @@ static int digest_msg(const struct nethdr *net)  		if (before(nack->to, nack->from))  			return MSG_BAD; -		rs_list_to_tx(STATE_SYNC(internal), nack->from, nack->to);  		queue_iterate(rs_queue, nack, rs_queue_to_tx);  		return MSG_CTL; @@ -409,7 +401,6 @@ static int ftfw_recv(const struct nethdr *net)  		 * know anything about that data, we are unreliable until   		 * the helloing finishes */  		queue_iterate(rs_queue, NULL, rs_queue_empty); -		rs_list_empty(STATE_SYNC(internal), 0, ~0U);  		goto bypass;  	} @@ -480,10 +471,8 @@ static void ftfw_send(struct nethdr *net, struct cache_object *obj)  		cn = (struct cache_ftfw *)   			cache_get_extra(STATE_SYNC(internal), obj); -		if (!list_empty(&cn->rs_list)) { -			list_del_init(&cn->rs_list); -			rs_list_len--; -		} +		if (queue_in(rs_queue, &cn->qnode)) +			queue_del(&cn->qnode);  		switch(hello_state) {  		case HELLO_INIT: @@ -500,82 +489,77 @@ static void ftfw_send(struct nethdr *net, struct cache_object *obj)  		}  		cn->seq = ntohl(net->seq); -		list_add_tail(&cn->rs_list, &rs_list); -		rs_list_len++; +		queue_add(rs_queue, &cn->qnode);  		break;  	}  } -static int tx_queue_xmit(void *data1, const void *data2) +static int tx_queue_xmit(struct queue_node *n, const void *data)  { -	struct nethdr *net = data1; - -	if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net)) { -		nethdr_set_ack(net); -	} else if (IS_ALIVE(net)) { -		nethdr_set_ctl(net); -	} else { -		STATE_SYNC(error).msg_snd_malformed++; -		return 0; -	} -	HDR_HOST2NETWORK(net); - -	dp("tx_queue sq: %u fl:%u len:%u\n", -               ntohl(net->seq), net->flags, ntohs(net->len)); - -	mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); -	HDR_NETWORK2HOST(net); +	switch(n->type) { +	case Q_ELEM_CTL: { +		struct nethdr *net = queue_node_data(n); + +		if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net)) { +			nethdr_set_ack(net); +		} else if (IS_ALIVE(net)) { +			nethdr_set_ctl(net); +		} else { +			STATE_SYNC(error).msg_snd_malformed++; +			return 0; +		} +		HDR_HOST2NETWORK(net); -	if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net)) -		queue_add(rs_queue, net, net->len); +		dp("tx_queue sq: %u fl:%u len:%u\n", +	               ntohl(net->seq), net->flags, ntohs(net->len)); -	queue_del(tx_queue, net); +		mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); +		HDR_NETWORK2HOST(net); -	return 0; -} - -static int tx_list_xmit(struct list_head *i, struct cache_object *obj, int type) -{ -	int ret; -	struct nethdr *net = BUILD_NETMSG(obj->ct, type); +		queue_del(n); +		if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net)) +			queue_add(rs_queue, n); +		else +			queue_object_free((struct queue_object *)n); +		break; +	} +	case Q_ELEM_OBJ: { +		struct cache_ftfw *cn; +		struct cache_object *obj; +		int type; +		struct nethdr *net; -	dp("tx_list sq: %u fl:%u len:%u\n", -                ntohl(net->seq), net->flags, ntohs(net->len)); +		cn = (struct cache_ftfw *)n; +		obj = cache_data_get_object(STATE_SYNC(internal), cn); +		type = object_status_to_network_type(obj->status); +		net = BUILD_NETMSG(obj->ct, type); -	list_del_init(i); -	tx_list_len--; +		dp("tx_list sq: %u fl:%u len:%u\n", +	                ntohl(net->seq), net->flags, ntohs(net->len)); -	ret = mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); -	ftfw_send(net, obj); +		queue_del(n); +		mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); +		ftfw_send(net, obj); +		break; +	} +	} -	return ret; +	return 0;  } -static void ftfw_run(void) +static void ftfw_run(fd_set *readfds)  { -	struct cache_ftfw *cn, *tmp; - -	/* send messages in the tx_queue */ -	queue_iterate(tx_queue, NULL, tx_queue_xmit); - -	/* send conntracks in the tx_list */ -	list_for_each_entry_safe(cn, tmp, &tx_list, tx_list) { -		struct cache_object *obj; - -		obj = cache_data_get_object(STATE_SYNC(internal), cn); -		if (alarm_pending(&obj->alarm)) -			tx_list_xmit(&cn->tx_list, obj, NET_T_STATE_DEL); -		else -			tx_list_xmit(&cn->tx_list, obj, NET_T_STATE_UPD); +	if (FD_ISSET(queue_get_eventfd(tx_queue), readfds)) { +		queue_iterate(tx_queue, NULL, tx_queue_xmit); +		add_alarm(&alive_alarm, 1, 0); +		dp("tx_queue_len:%u rs_queue_len:%u\n", +		   queue_len(tx_queue), queue_len(rs_queue));  	} +} -	/* reset alive alarm */ -	add_alarm(&alive_alarm, 1, 0); - -	dp("tx_list_len:%u tx_queue_len:%u " -	   "rs_list_len: %u rs_queue_len:%u\n", -	   tx_list_len, queue_len(tx_queue), -	   rs_list_len, queue_len(rs_queue)); +static int ftfw_register_fds(struct fds *fds) +{ +	return register_fd(queue_get_eventfd(tx_queue), fds);  }  struct sync_mode sync_ftfw = { @@ -588,4 +572,5 @@ struct sync_mode sync_ftfw = {  	.recv			= ftfw_recv,  	.send			= ftfw_send,  	.run			= ftfw_run, +	.register_fds		= ftfw_register_fds,  }; diff --git a/src/sync-mode.c b/src/sync-mode.c index 368984f..711f71b 100644 --- a/src/sync-mode.c +++ b/src/sync-mode.c @@ -242,12 +242,6 @@ static int init_sync(void)  		return -1;  	} -	STATE_SYNC(evfd) = create_evfd(); -	if (STATE_SYNC(evfd) == NULL) { -		dlog(LOG_ERR, "cannot open evfd"); -		return -1; -	} -  	/* initialization of multicast sequence generation */  	STATE_SYNC(last_seq_sent) = time(NULL); @@ -259,7 +253,10 @@ static int register_fds_sync(struct fds *fds)  	if (register_fd(STATE_SYNC(mcast_server->fd), fds) == -1)  		return -1; -	return register_fd(get_read_evfd(STATE_SYNC(evfd)), fds); +	if (STATE_SYNC(sync)->register_fds) +		return STATE_SYNC(sync)->register_fds(fds); + +	return 0;  }  static void run_sync(fd_set *readfds) @@ -268,11 +265,8 @@ static void run_sync(fd_set *readfds)  	if (FD_ISSET(STATE_SYNC(mcast_server->fd), readfds))  		mcast_handler(); -	if (FD_ISSET(get_read_evfd(STATE_SYNC(evfd)), readfds) &&  -	    STATE_SYNC(sync)->run) { -	    	read_evfd(STATE_SYNC(evfd)); -		STATE_SYNC(sync)->run(); -	} +	if (STATE_SYNC(sync)->run) +		STATE_SYNC(sync)->run(readfds);  	/* flush pending messages */  	mcast_buffered_pending_netmsg(STATE_SYNC(mcast_client)); @@ -286,8 +280,6 @@ static void kill_sync(void)  	mcast_server_destroy(STATE_SYNC(mcast_server));  	mcast_client_destroy(STATE_SYNC(mcast_client)); -	destroy_evfd(STATE_SYNC(evfd)); -  	mcast_buffered_destroy();  	if (STATE_SYNC(sync)->kill) diff --git a/src/sync-notrack.c b/src/sync-notrack.c index 2d3783e..40cc199 100644 --- a/src/sync-notrack.c +++ b/src/sync-notrack.c @@ -23,32 +23,26 @@  #include "network.h"  #include "log.h"  #include "cache.h" -#include "event.h" +#include "fds.h"  #include <string.h> -static LIST_HEAD(tx_list); -static unsigned int tx_list_len;  static struct queue *tx_queue;  struct cache_notrack { -	struct list_head	tx_list; +	struct queue_node	qnode;  };  static void cache_notrack_add(struct cache_object *obj, void *data)  {  	struct cache_notrack *cn = data; -	INIT_LIST_HEAD(&cn->tx_list); +	queue_node_init(&cn->qnode, Q_ELEM_OBJ);  }  static void cache_notrack_del(struct cache_object *obj, void *data)  {  	struct cache_notrack *cn = data; - -	if (!list_empty(&cn->tx_list)) { -		list_del(&cn->tx_list); -		tx_list_len--; -	} +	queue_del(&cn->qnode);  }  static struct cache_extra cache_notrack_extra = { @@ -59,20 +53,25 @@ static struct cache_extra cache_notrack_extra = {  static void tx_queue_add_ctlmsg(uint32_t flags, uint32_t from, uint32_t to)  { -	struct nethdr_ack ack = { -		.type  = NET_T_CTL, -		.flags = flags, -		.from  = from, -		.to    = to, -	}; - -	queue_add(tx_queue, &ack, NETHDR_ACK_SIZ); -	write_evfd(STATE_SYNC(evfd)); +	struct queue_object *qobj; +	struct nethdr_ack *ack; + +	qobj = queue_object_new(Q_ELEM_CTL, sizeof(struct nethdr_ack)); +	if (qobj == NULL) +		return; + +	ack		= (struct nethdr_ack *)qobj->data; +        ack->type	= NET_T_CTL; +	ack->flags	= flags; +	ack->from	= from; +	ack->to		= to; + +	queue_add(tx_queue, &qobj->qnode);  }  static int notrack_init(void)  { -	tx_queue = queue_create(~0U); +	tx_queue = queue_create(INT_MAX, QUEUE_F_EVFD);  	if (tx_queue == NULL) {  		dlog(LOG_ERR, "cannot create tx queue");  		return -1; @@ -90,16 +89,7 @@ static int do_cache_to_tx(void *data1, void *data2)  {  	struct cache_object *obj = data2;  	struct cache_notrack *cn = cache_get_extra(STATE_SYNC(internal), obj); - -	if (!list_empty(&cn->tx_list)) -		return 0; - -	/* add to tx list */ -	list_add_tail(&cn->tx_list, &tx_list); -	tx_list_len++; - -	write_evfd(STATE_SYNC(evfd)); - +	queue_add(tx_queue, &cn->qnode);  	return 0;  } @@ -152,44 +142,49 @@ static int notrack_recv(const struct nethdr *net)  	return ret;  } -static int tx_queue_xmit(void *data1, const void *data2) +static int tx_queue_xmit(struct queue_node *n, const void *data2)  { -	struct nethdr *net = data1; -	nethdr_set_ack(net); -	HDR_HOST2NETWORK(net); -	mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); -	queue_del(tx_queue, net); +	switch (n->type) { +	case Q_ELEM_CTL: { +		struct nethdr *net = queue_node_data(n); +		if (IS_RESYNC(net)) +			nethdr_set_ack(net); +		else +			nethdr_set_ctl(net); +		HDR_HOST2NETWORK(net); +		mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); +		queue_del(n); +		queue_object_free((struct queue_object *)n); +		break; +	} +	case Q_ELEM_OBJ: { +		struct cache_ftfw *cn; +		struct cache_object *obj; +		int type; +		struct nethdr *net; + +		cn = (struct cache_ftfw *)n; +		obj = cache_data_get_object(STATE_SYNC(internal), cn); +		type = object_status_to_network_type(obj->status);; +		net = BUILD_NETMSG(obj->ct, type); +		mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); +		queue_del(n); +		break; +	} +	}  	return 0;  } -static int tx_list_xmit(struct list_head *i, struct cache_object *obj, int type) +static void notrack_run(fd_set *readfds)  { -	int ret; -	struct nethdr *net = BUILD_NETMSG(obj->ct, type); - -	list_del_init(i); -	tx_list_len--; - -	ret = mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); - -	return ret; +	if (FD_ISSET(queue_get_eventfd(tx_queue), readfds)) +		queue_iterate(tx_queue, NULL, tx_queue_xmit);  } -static void notrack_run(void) +static int notrack_register_fds(struct fds *fds)  { -	struct cache_notrack *cn, *tmp; - -	/* send messages in the tx_queue */ -	queue_iterate(tx_queue, NULL, tx_queue_xmit); - -	/* send conntracks in the tx_list */ -	list_for_each_entry_safe(cn, tmp, &tx_list, tx_list) { -		struct cache_object *obj; - -		obj = cache_data_get_object(STATE_SYNC(internal), cn); -		tx_list_xmit(&cn->tx_list, obj, NET_T_STATE_UPD); -	} +	return register_fd(queue_get_eventfd(tx_queue), fds);  }  struct sync_mode sync_notrack = { @@ -201,4 +196,5 @@ struct sync_mode sync_notrack = {  	.local			= notrack_local,  	.recv			= notrack_recv,  	.run			= notrack_run, +	.register_fds		= notrack_register_fds,  };  | 
