diff options
| author | /C=EU/ST=EU/CN=Pablo Neira Ayuso/emailAddress=pablo@netfilter.org </C=EU/ST=EU/CN=Pablo Neira Ayuso/emailAddress=pablo@netfilter.org> | 2007-07-09 19:11:53 +0000 | 
|---|---|---|
| committer | /C=EU/ST=EU/CN=Pablo Neira Ayuso/emailAddress=pablo@netfilter.org </C=EU/ST=EU/CN=Pablo Neira Ayuso/emailAddress=pablo@netfilter.org> | 2007-07-09 19:11:53 +0000 | 
| commit | 96084e1a1f2e0a49c961bbddb9fffd2e03bfae3f (patch) | |
| tree | c078d88b157faa7c5ce76bc4591205756f09742b /src | |
| parent | 4df0be6fbf6a47905e0edf11c13b49ea0eacee5b (diff) | |
| download | conntrack-tools-96084e1a1f2e0a49c961bbddb9fffd2e03bfae3f.tar.gz conntrack-tools-96084e1a1f2e0a49c961bbddb9fffd2e03bfae3f.zip | |
- conntrack-tools requires libnetfilter_conntrack >= 0.0.81 
- add len field to nethdr
- implement buffered send/recv to batch messages
- stop using netlink format for network messages: use a similar TLV-based format instead
- reduce the size of synchronization messages by up to 60%
- introduce periodic alive messages for sync-nack protocol
- timeslice alarm implementation: remove alarm pthread, remove locking
- simplify debugging functions: use nfct_snprintf instead
- remove major use of libnfnetlink functions: use libnetfilter_conntrack API
- deprecate conntrackd -F, use conntrack -F instead
- major rework of the network infrastructure: much simpler, less messy
Diffstat (limited to 'src')
| -rw-r--r-- | src/Makefile.am | 7 | ||||
| -rw-r--r-- | src/alarm.c | 37 | ||||
| -rw-r--r-- | src/buffer.c | 26 | ||||
| -rw-r--r-- | src/build.c | 113 | ||||
| -rw-r--r-- | src/cache.c | 40 | ||||
| -rw-r--r-- | src/cache_iterators.c | 54 | ||||
| -rw-r--r-- | src/cache_timer.c | 2 | ||||
| -rw-r--r-- | src/lock.c | 32 | ||||
| -rw-r--r-- | src/main.c | 1 | ||||
| -rw-r--r-- | src/mcast.c | 1 | ||||
| -rw-r--r-- | src/netlink.c | 137 | ||||
| -rw-r--r-- | src/network.c | 238 | ||||
| -rw-r--r-- | src/parse.c | 76 | ||||
| -rw-r--r-- | src/proxy.c | 124 | ||||
| -rw-r--r-- | src/run.c | 72 | ||||
| -rw-r--r-- | src/state_helper.c | 2 | ||||
| -rw-r--r-- | src/stats-mode.c | 10 | ||||
| -rw-r--r-- | src/sync-mode.c | 152 | ||||
| -rw-r--r-- | src/sync-nack.c | 260 | ||||
| -rw-r--r-- | src/sync-notrack.c | 6 | ||||
| -rw-r--r-- | src/timer.c | 75 | 
21 files changed, 744 insertions, 721 deletions
| diff --git a/src/Makefile.am b/src/Makefile.am index 8647d04..d71e23c 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -10,7 +10,7 @@ conntrack_SOURCES = conntrack.c  conntrack_LDADD = ../extensions/libct_proto_tcp.la ../extensions/libct_proto_udp.la ../extensions/libct_proto_icmp.la  conntrackd_SOURCES = alarm.c main.c run.c hash.c buffer.c \ -		    local.c log.c mcast.c netlink.c proxy.c lock.c \ +		    local.c log.c mcast.c netlink.c \  		    ignore_pool.c \  		    cache.c cache_iterators.c \  		    cache_lifetime.c cache_timer.c \ @@ -18,9 +18,10 @@ conntrackd_SOURCES = alarm.c main.c run.c hash.c buffer.c \  		    traffic_stats.c stats-mode.c \  		    network.c \  		    state_helper.c state_helper_tcp.c \ +		    timer.c \ +		    build.c parse.c \  		    read_config_yy.y read_config_lex.l -conntrackd_LDFLAGS = $(all_libraries) -lnfnetlink -lnetfilter_conntrack \ -		     -lpthread +conntrackd_LDFLAGS = $(all_libraries) -lnfnetlink -lnetfilter_conntrack  EXTRA_DIST = read_config_yy.h diff --git a/src/alarm.c b/src/alarm.c index 1a465c2..b4db167 100644 --- a/src/alarm.c +++ b/src/alarm.c @@ -22,17 +22,13 @@  #include "conntrackd.h"  #include "alarm.h"  #include "jhash.h" -#include <pthread.h>  #include <time.h>  #include <errno.h>  /* alarm cascade */ -#define ALARM_CASCADE_SIZE     10 +#define ALARM_CASCADE_SIZE     STEPS_PER_SECONDS  static struct list_head *alarm_cascade; -/* thread stuff */ -static pthread_t alarm_thread; -  struct alarm_list *create_alarm()  {	  	return (struct alarm_list *) malloc(sizeof(struct alarm_list)); @@ -86,24 +82,11 @@ int mod_alarm(struct alarm_list *alarm, unsigned long expires)  	return 0;  } -void __run_alarms() +void do_alarm_run(int step)  {  	struct list_head *i, *tmp;  	struct alarm_list *t; -	struct timespec req = {0, 1000000000 / ALARM_CASCADE_SIZE}; -	struct timespec rem; -	static int step = 0; - -retry: -	if (nanosleep(&req, &rem) == -1) { -		/* interrupted syscall: retry with remaining time */ -		if (errno == 
EINTR) { -			memcpy(&req, &rem, sizeof(struct timespec)); -			goto retry; -		} -	} -	lock();  	list_for_each_safe(i, tmp, &alarm_cascade[step]) {  		t = (struct alarm_list *) i; @@ -111,17 +94,9 @@ retry:  		if (t->expires == 0)  			t->function(t, t->data);  	} -	step = (step + 1) < ALARM_CASCADE_SIZE ? step + 1 : 0; -	unlock(); -} - -void *run_alarms(void *foo) -{ -	while(1) -		__run_alarms();  } -int create_alarm_thread() +int init_alarm_scheduler()  {  	int i; @@ -132,10 +107,10 @@ int create_alarm_thread()  	for (i=0; i<ALARM_CASCADE_SIZE; i++)  		INIT_LIST_HEAD(&alarm_cascade[i]); -	return pthread_create(&alarm_thread, NULL, run_alarms, NULL); +	return 0;  } -int destroy_alarm_thread() +void destroy_alarm_scheduler()  { -	return pthread_cancel(alarm_thread); +	free(alarm_cascade);  } diff --git a/src/buffer.c b/src/buffer.c index fa0b859..23f7797 100644 --- a/src/buffer.c +++ b/src/buffer.c @@ -29,7 +29,6 @@ struct buffer *buffer_create(size_t max_size)  	b->max_size = max_size;  	INIT_LIST_HEAD(&b->head); -	pthread_mutex_init(&b->lock, NULL);  	return b;  } @@ -39,14 +38,12 @@ void buffer_destroy(struct buffer *b)  	struct list_head *i, *tmp;  	struct buffer_node *node; -	pthread_mutex_lock(&b->lock); +	/* XXX: set cur_size and num_elems */  	list_for_each_safe(i, tmp, &b->head) {  		node = (struct buffer_node *) i;  		list_del(i);  		free(node);  	} -	pthread_mutex_unlock(&b->lock); -	pthread_mutex_destroy(&b->lock);  	free(b);  } @@ -70,8 +67,6 @@ int buffer_add(struct buffer *b, const void *data, size_t size)  	int ret = 0;  	struct buffer_node *n; -	pthread_mutex_lock(&b->lock); -  	/* does it fit this buffer? 
*/  	if (size > b->max_size) {  		errno = ENOSPC; @@ -97,28 +92,22 @@ retry:  	list_add(&n->head, &b->head);  	b->cur_size += size; +	b->num_elems++;  err: -	pthread_mutex_unlock(&b->lock);  	return ret;  } -void __buffer_del(struct buffer *b, void *data) +void buffer_del(struct buffer *b, void *data)  {  	struct buffer_node *n = container_of(data, struct buffer_node, data);   	list_del(&n->head);  	b->cur_size -= n->size; +	b->num_elems--;  	free(n);  } -void buffer_del(struct buffer *b, void *data) -{ -	pthread_mutex_lock(&b->lock); -	buffer_del(b, data); -	pthread_mutex_unlock(&b->lock); -} -  void buffer_iterate(struct buffer *b,   		    void *data,   		    int (*iterate)(void *data1, void *data2)) @@ -126,11 +115,14 @@ void buffer_iterate(struct buffer *b,  	struct list_head *i, *tmp;  	struct buffer_node *n; -	pthread_mutex_lock(&b->lock);  	list_for_each_safe(i, tmp, &b->head) {  		n = (struct buffer_node *) i;  		if (iterate(n->data, data))  			break;  	} -	pthread_mutex_unlock(&b->lock); +} + +unsigned int buffer_len(struct buffer *b) +{ +	return b->num_elems;  } diff --git a/src/build.c b/src/build.c new file mode 100644 index 0000000..b77dbc2 --- /dev/null +++ b/src/build.c @@ -0,0 +1,113 @@ +/* + * (C) 2006-2007 by Pablo Neira Ayuso <pablo@netfilter.org> + *  + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. 
+ * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include <string.h> +#include <libnetfilter_conntrack/libnetfilter_conntrack.h> +#include "network.h" + +static void addattr(struct netpld *pld, int attr, const void *data, int len) +{ +	struct netattr *nta; +	int tlen = NTA_LENGTH(len); + +	nta = PLD_TAIL(pld); +	nta->nta_attr = htons(attr); +	nta->nta_len = htons(len); +	memcpy(NTA_DATA(nta), data, len); +	pld->len += NTA_ALIGN(tlen); +} + +static void __build_u8(const struct nf_conntrack *ct, +		       struct netpld *pld, +		       int attr) +{ +	u_int8_t data = nfct_get_attr_u8(ct, attr); +	addattr(pld, attr, &data, sizeof(u_int8_t)); +} + +static void __build_u16(const struct nf_conntrack *ct, +			struct netpld *pld, +			int attr) +{ +	u_int16_t data = nfct_get_attr_u16(ct, attr); +	data = htons(data); +	addattr(pld, attr, &data, sizeof(u_int16_t)); +} + +static void __build_u32(const struct nf_conntrack *ct,  +			struct netpld *pld, +			int attr) +{ +	u_int32_t data = nfct_get_attr_u32(ct, attr); +	data = htonl(data); +	addattr(pld, attr, &data, sizeof(u_int32_t)); +} + +/* XXX: IPv6 and ICMP not supported */ +void build_netpld(struct nf_conntrack *ct, struct netpld *pld, int query) +{ +	/* undo NAT */ +	if (nfct_getobjopt(ct, NFCT_GOPT_IS_SNAT)) +		nfct_setobjopt(ct, NFCT_SOPT_UNDO_SNAT); +	if (nfct_getobjopt(ct, NFCT_GOPT_IS_DNAT)) +		nfct_setobjopt(ct, NFCT_SOPT_UNDO_DNAT); +	if (nfct_getobjopt(ct, NFCT_GOPT_IS_SPAT)) +		nfct_setobjopt(ct, NFCT_SOPT_UNDO_SPAT); +	if (nfct_getobjopt(ct, NFCT_GOPT_IS_DPAT)) +		nfct_setobjopt(ct, NFCT_SOPT_UNDO_DPAT); + +	/* build message */ +	if (nfct_attr_is_set(ct, ATTR_IPV4_SRC)) +		__build_u32(ct, pld, ATTR_IPV4_SRC); +	if (nfct_attr_is_set(ct, ATTR_IPV4_DST)) +		__build_u32(ct, pld, ATTR_IPV4_DST); +	if (nfct_attr_is_set(ct, ATTR_L3PROTO)) +		__build_u8(ct, pld, 
ATTR_L3PROTO); +	if (nfct_attr_is_set(ct, ATTR_PORT_SRC)) +		__build_u16(ct, pld, ATTR_PORT_SRC); +	if (nfct_attr_is_set(ct, ATTR_PORT_DST)) +		__build_u16(ct, pld, ATTR_PORT_DST); +	if (nfct_attr_is_set(ct, ATTR_L4PROTO)) { +		u_int8_t proto; + +		__build_u8(ct, pld, ATTR_L4PROTO); +		proto = nfct_get_attr_u8(ct, ATTR_L4PROTO); +		if (proto == IPPROTO_TCP) { +			if (nfct_attr_is_set(ct, ATTR_TCP_STATE)) +				__build_u8(ct, pld, ATTR_TCP_STATE); +		} +	} +	if (nfct_attr_is_set(ct, ATTR_SNAT_IPV4)) +		__build_u32(ct, pld, ATTR_SNAT_IPV4); +	if (nfct_attr_is_set(ct, ATTR_DNAT_IPV4)) +		__build_u32(ct, pld, ATTR_DNAT_IPV4); +	if (nfct_attr_is_set(ct, ATTR_SNAT_PORT)) +		__build_u16(ct, pld, ATTR_SNAT_PORT); +	if (nfct_attr_is_set(ct, ATTR_DNAT_PORT)) +		__build_u16(ct, pld, ATTR_DNAT_PORT); +	if (nfct_attr_is_set(ct, ATTR_TIMEOUT)) +		__build_u32(ct, pld, ATTR_TIMEOUT); +	if (nfct_attr_is_set(ct, ATTR_MARK)) +		__build_u32(ct, pld, ATTR_MARK); +	if (nfct_attr_is_set(ct, ATTR_STATUS)) +		__build_u32(ct, pld, ATTR_STATUS); + +	pld->query = query; + +	PLD_HOST2NETWORK(pld); +} diff --git a/src/cache.c b/src/cache.c index 3bf331c..1e20d95 100644 --- a/src/cache.c +++ b/src/cache.c @@ -193,9 +193,7 @@ struct cache *cache_create(char *name,  void cache_destroy(struct cache *c)  { -	lock();  	hashtable_destroy(c->h); -	unlock();  	free(c->features);  	free(c->feature_offset);  	free(c); @@ -237,7 +235,7 @@ static struct us_conntrack *__add(struct cache *c, struct nf_conntrack *ct)  	return NULL;  } -struct us_conntrack *__cache_add(struct cache *c, struct nf_conntrack *ct) +struct us_conntrack *cache_add(struct cache *c, struct nf_conntrack *ct)  {  	struct us_conntrack *u; @@ -252,17 +250,6 @@ struct us_conntrack *__cache_add(struct cache *c, struct nf_conntrack *ct)  	return NULL;  } -struct us_conntrack *cache_add(struct cache *c, struct nf_conntrack *ct) -{ -	struct us_conntrack *u; - -	lock(); -	u = __cache_add(c, ct); -	unlock(); - -	return u; -} -  static struct 
us_conntrack *__update(struct cache *c, struct nf_conntrack *ct)  {  	size_t size = c->h->datasize; @@ -317,9 +304,7 @@ struct us_conntrack *cache_update(struct cache *c, struct nf_conntrack *ct)  {  	struct us_conntrack *u; -	lock();  	u = __cache_update(c, ct); -	unlock();  	return u;  } @@ -329,19 +314,15 @@ struct us_conntrack *cache_update_force(struct cache *c,  {  	struct us_conntrack *u; -	lock();  	if ((u = __update(c, ct)) != NULL) {  		c->upd_ok++; -		unlock();  		return u;  	}  	if ((u = __add(c, ct)) != NULL) {  		c->add_ok++; -		unlock();  		return u;  	}  	c->add_fail++; -	unlock();  	return NULL;  } @@ -354,9 +335,7 @@ int cache_test(struct cache *c, struct nf_conntrack *ct)  	u->ct = ct; -	lock();  	ret = hashtable_test(c->h, u); -	unlock();  	return ret != NULL;  } @@ -390,7 +369,7 @@ static int __del(struct cache *c, struct nf_conntrack *ct)  	return 0;  } -int __cache_del(struct cache *c, struct nf_conntrack *ct) +int cache_del(struct cache *c, struct nf_conntrack *ct)  {  	if (__del(c, ct)) {  		c->del_ok++; @@ -401,17 +380,6 @@ int __cache_del(struct cache *c, struct nf_conntrack *ct)  	return 0;  } -int cache_del(struct cache *c, struct nf_conntrack *ct) -{ -	int ret; - -	lock(); -	ret = __cache_del(c, ct); -	unlock(); - -	return ret; -} -  struct us_conntrack *cache_get_conntrack(struct cache *c, void *data)  {  	return data - c->extra_offset; @@ -427,7 +395,6 @@ void cache_stats(struct cache *c, int fd)  	char buf[512];  	int size; -	lock();  	size = sprintf(buf, "cache %s:\n"  			    "current active connections:\t%12u\n"  			    "connections created:\t\t%12u\tfailed:\t%12u\n" @@ -441,7 +408,6 @@ void cache_stats(struct cache *c, int fd)  						 c->upd_fail,  						 c->del_ok,  						 c->del_fail); -	unlock();  	send(fd, buf, size, 0);  } @@ -449,7 +415,5 @@ void cache_iterate(struct cache *c,  		   void *data,   		   int (*iterate)(void *data1, void *data2))  { -	lock();  	hashtable_iterate(c->h, data, iterate); -	unlock();  } diff --git 
a/src/cache_iterators.c b/src/cache_iterators.c index 446cac8..1d1b2e8 100644 --- a/src/cache_iterators.c +++ b/src/cache_iterators.c @@ -71,37 +71,25 @@ void cache_dump(struct cache *c, int fd, int type)  		.type	= type  	}; -	/* does not require locking: called inside fork() */  	hashtable_iterate(c->h, (void *) &tmp, do_dump);  } +/* no need to clone, called from child process */  static int do_commit(void *data1, void *data2)  {  	int ret;  	struct cache *c = data1;  	struct us_conntrack *u = data2; -	struct nf_conntrack *ct; -	char buf[4096]; -	struct nlmsghdr *nlh = (struct nlmsghdr *)buf; - -	ct = nfct_clone(u->ct); -	if (ct == NULL) -		return 0; +	struct nf_conntrack *ct = u->ct; +	/* XXX: related connections */  	if (nfct_attr_is_set(ct, ATTR_STATUS)) {  		u_int32_t status = nfct_get_attr_u32(ct, ATTR_STATUS);  		status &= ~IPS_EXPECTED;  		nfct_set_attr_u32(ct, ATTR_STATUS, status);  	} -	if (nfct_getobjopt(ct, NFCT_GOPT_IS_SNAT)) -		nfct_setobjopt(ct, NFCT_SOPT_UNDO_SNAT); -	if (nfct_getobjopt(ct, NFCT_GOPT_IS_DNAT)) -		nfct_setobjopt(ct, NFCT_SOPT_UNDO_DNAT); -	if (nfct_getobjopt(ct, NFCT_GOPT_IS_SPAT)) -		nfct_setobjopt(ct, NFCT_SOPT_UNDO_SPAT); -	if (nfct_getobjopt(ct, NFCT_GOPT_IS_DPAT)) -		nfct_setobjopt(ct, NFCT_SOPT_UNDO_DPAT); +	nfct_setobjopt(ct, NFCT_SOPT_SETUP_REPLY);          /*   	 * Set a reduced timeout for candidate-to-be-committed @@ -109,20 +97,12 @@ static int do_commit(void *data1, void *data2)  	 */  	nfct_set_attr_u32(ct, ATTR_TIMEOUT, CONFIG(commit_timeout)); -        ret = nfct_build_query(STATE(subsys_dump), -			       NFCT_Q_CREATE_UPDATE, -			       ct, -			       nlh, -			       sizeof(buf)); - -	free(ct); -  	if (ret == -1) {  		dlog(STATE(log), "failed to build: %s", strerror(errno));  		return 0;  	} -	ret = nfnl_query(STATE(dump), nlh); +	ret = nfct_query(STATE(dump), NFCT_Q_CREATE_UPDATE, ct);  	if (ret == -1) {  		switch(errno) {  			case EEXIST: @@ -146,7 +126,6 @@ void cache_commit(struct cache *c)  	unsigned int 
commit_exist = c->commit_exist;  	unsigned int commit_fail = c->commit_fail; -	/* does not require locking: called inside fork() */  	hashtable_iterate(c->h, c, do_commit);  	/* calculate new entries committed */ @@ -187,30 +166,7 @@ static int do_flush(void *data1, void *data2)  void cache_flush(struct cache *c)  { -	lock();  	hashtable_iterate(c->h, c, do_flush);  	hashtable_flush(c->h);  	c->flush++; -	unlock(); -} - -#include "sync.h" -#include "network.h" - -static int do_bulk(void *data1, void *data2) -{ -	int ret; -	struct us_conntrack *u = data2; - -	mcast_build_send_update(u); - -	/* keep iterating even if we have found errors */ -	return 0; -} - -void cache_bulk(struct cache *c) -{ -	lock(); -	hashtable_iterate(c->h, NULL, do_bulk); -	unlock();  } diff --git a/src/cache_timer.c b/src/cache_timer.c index 213b59a..f3940f3 100644 --- a/src/cache_timer.c +++ b/src/cache_timer.c @@ -27,7 +27,7 @@ static void timeout(struct alarm_list *a, void *data)  	struct us_conntrack *u = data;  	debug_ct(u->ct, "expired timeout"); -	__cache_del(u->cache, u->ct); +	cache_del(u->cache, u->ct);  }  static void timer_add(struct us_conntrack *u, void *data) @@ -1,32 +0,0 @@ -/* - * (C) 2006 by Pablo Neira Ayuso <pablo@netfilter.org> - *  - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. 
- */ - -#include <stdio.h> -#include <pthread.h> - -static pthread_mutex_t global_lock = PTHREAD_MUTEX_INITIALIZER; - -void lock() -{ -	pthread_mutex_lock(&global_lock); -} - -void unlock() -{ -	pthread_mutex_unlock(&global_lock); -} @@ -187,6 +187,7 @@ int main(int argc, char *argv[])  		case 'F':  			set_operation_mode(&type, REQUEST, argv);  			action = FLUSH_MASTER; +			break;  		case 'f':  			set_operation_mode(&type, REQUEST, argv);  			action = FLUSH_CACHE; diff --git a/src/mcast.c b/src/mcast.c index 85992fb..6193a59 100644 --- a/src/mcast.c +++ b/src/mcast.c @@ -87,7 +87,6 @@ struct mcast_sock *mcast_server_create(struct mcast_conf *conf)  		return NULL;  	} -  	switch(conf->ipproto) {  	case AF_INET:  		if (setsockopt(m->fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, diff --git a/src/netlink.c b/src/netlink.c index 5f7cbeb..be5f82e 100644 --- a/src/netlink.c +++ b/src/netlink.c @@ -52,19 +52,10 @@ int ignore_conntrack(struct nf_conntrack *ct)  	return 0;  } -static int nl_event_handler(struct nlmsghdr *nlh, -			    struct nfattr *nfa[], -			    void *data) +static int event_handler(enum nf_conntrack_msg_type type, +			 struct nf_conntrack *ct, +			 void *data)  { -	char tmp[1024]; -	struct nf_conntrack *ct = (struct nf_conntrack *) tmp; -	int type; - -	memset(tmp, 0, sizeof(tmp)); - -	if ((type = nfct_parse_conntrack(NFCT_T_ALL, nlh, ct)) == NFCT_T_ERROR) -		return NFCT_CB_STOP; -  	/*   	 * Ignore this conntrack: it talks about a  	 * connection that is not interesting for us. 
@@ -74,13 +65,13 @@ static int nl_event_handler(struct nlmsghdr *nlh,  	switch(type) {  	case NFCT_T_NEW: -		STATE(mode)->event_new(ct, nlh); +		STATE(mode)->event_new(ct);  		break;  	case NFCT_T_UPDATE: -		STATE(mode)->event_upd(ct, nlh); +		STATE(mode)->event_upd(ct);  		break;  	case NFCT_T_DESTROY: -		if (STATE(mode)->event_dst(ct, nlh)) +		if (STATE(mode)->event_dst(ct))  			update_traffic_stats(ct);  		break;  	default: @@ -88,30 +79,31 @@ static int nl_event_handler(struct nlmsghdr *nlh,  		break;  	} -	return NFCT_CB_STOP; +	return NFCT_CB_CONTINUE;  } +#include <sys/types.h> +#include <sys/socket.h> +#include <sys/fcntl.h> +  int nl_init_event_handler(void)  { -	struct nfnl_callback cb_events = { -		.call		= nl_event_handler, -		.attr_count	= CTA_MAX -	}; - -	/* open event netlink socket */ -	STATE(event) = nfnl_open(); +	STATE(event) = nfct_open(CONNTRACK, NFCT_ALL_CT_GROUPS);  	if (!STATE(event))  		return -1; +	fcntl(nfct_fd(STATE(event)), F_SETFL, O_NONBLOCK); +  	/* set up socket buffer size */  	if (CONFIG(netlink_buffer_size)) -		nfnl_rcvbufsiz(STATE(event), CONFIG(netlink_buffer_size)); +		nfnl_rcvbufsiz(nfct_nfnlh(STATE(event)),  +			       CONFIG(netlink_buffer_size));  	else {  		socklen_t socklen = sizeof(unsigned int);  		unsigned int read_size;  		/* get current buffer size */ -		getsockopt(nfnl_fd(STATE(event)), SOL_SOCKET, +		getsockopt(nfct_fd(STATE(event)), SOL_SOCKET,  			   SO_RCVBUF, &read_size, &socklen);  		CONFIG(netlink_buffer_size) = read_size; @@ -122,40 +114,16 @@ int nl_init_event_handler(void)  		CONFIG(netlink_buffer_size_max_grown) =   					CONFIG(netlink_buffer_size); -	/* open event subsystem */ -	STATE(subsys_event) = nfnl_subsys_open(STATE(event), -					       NFNL_SUBSYS_CTNETLINK, -					       IPCTNL_MSG_MAX, -					       NFCT_ALL_CT_GROUPS); -	if (STATE(subsys_event) == NULL) -		return -1; - -	/* register callback for new and update events */ -	nfnl_callback_register(STATE(subsys_event), -			       IPCTNL_MSG_CT_NEW, 
-			       &cb_events); - -	/* register callback for delete events */ -	nfnl_callback_register(STATE(subsys_event), -			       IPCTNL_MSG_CT_DELETE, -			       &cb_events); +	/* register callback for events */ +	nfct_callback_register(STATE(event), NFCT_T_ALL, event_handler, NULL);  	return 0;  } -static int nl_dump_handler(struct nlmsghdr *nlh, -			   struct nfattr *nfa[], -			   void *data) +static int dump_handler(enum nf_conntrack_msg_type type, +			struct nf_conntrack *ct, +			void *data)  { -	char buf[1024]; -	struct nf_conntrack *ct = (struct nf_conntrack *) buf; -	int type; - -	memset(buf, 0, sizeof(buf)); - -	if ((type = nfct_parse_conntrack(NFCT_T_ALL, nlh, ct)) == NFCT_T_ERROR) -		return NFCT_CB_CONTINUE; -  	/*   	 * Ignore this conntrack: it talks about a  	 * connection that is not interesting for us. @@ -165,7 +133,7 @@ static int nl_dump_handler(struct nlmsghdr *nlh,  	switch(type) {  	case NFCT_T_UPDATE: -		STATE(mode)->dump(ct, nlh); +		STATE(mode)->dump(ct);  		break;  	default:  		dlog(STATE(log), "received unknown msg from ctnetlink"); @@ -176,30 +144,15 @@ static int nl_dump_handler(struct nlmsghdr *nlh,  int nl_init_dump_handler(void)  { -	struct nfnl_callback cb_dump = { -		.call		= nl_dump_handler, -		.attr_count	= CTA_MAX -	}; -  	/* open dump netlink socket */ -	STATE(dump) = nfnl_open(); +	STATE(dump) = nfct_open(CONNTRACK, 0);  	if (!STATE(dump))  		return -1; -	/* open dump subsystem */ -	STATE(subsys_dump) = nfnl_subsys_open(STATE(dump), -					      NFNL_SUBSYS_CTNETLINK, -					      IPCTNL_MSG_MAX, -					      0); -	if (STATE(subsys_dump) == NULL) -		return -1; -  	/* register callback for dumped entries */ -	nfnl_callback_register(STATE(subsys_dump), -			       IPCTNL_MSG_CT_NEW, -			       &cb_dump); +	nfct_callback_register(STATE(dump), NFCT_T_ALL, dump_handler, NULL); -	if (nl_dump_conntrack_table(STATE(dump), STATE(subsys_dump)) == -1) +	if (nl_dump_conntrack_table() == -1)  		return -1;  	return 0; @@ -207,7 +160,7 @@ int 
nl_init_dump_handler(void)  static int warned = 0; -void nl_resize_socket_buffer(struct nfnl_handle *h) +void nl_resize_socket_buffer(struct nfct_handle *h)  {  	unsigned int s = CONFIG(netlink_buffer_size) * 2; @@ -228,44 +181,14 @@ void nl_resize_socket_buffer(struct nfnl_handle *h)  		warned = 1;  	} -	CONFIG(netlink_buffer_size) = nfnl_rcvbufsiz(h, s); +	CONFIG(netlink_buffer_size) = nfnl_rcvbufsiz(nfct_nfnlh(h), s);  	/* notify the sysadmin */  	dlog(STATE(log), "netlink socket buffer size has been set to %u bytes",   			  CONFIG(netlink_buffer_size));  } -int nl_dump_conntrack_table(struct nfnl_handle *h,  -			    struct nfnl_subsys_handle *subsys) +int nl_dump_conntrack_table(void)  { -	struct nfnlhdr req; - -	memset(&req, 0, sizeof(req)); -	nfct_build_query(subsys,  -			 NFCT_Q_DUMP,  -			 &CONFIG(family),  -			 &req,  -			 sizeof(req)); - -	if (nfnl_query(h, &req.nlh) == -1) -		return -1; - -	return 0; -} - -int nl_flush_master_conntrack_table(void) -{ -	struct nfnlhdr req; - -	memset(&req, 0, sizeof(req)); -	nfct_build_query(STATE(subsys_dump),  -			 NFCT_Q_FLUSH,  -			 &CONFIG(family),  -			 &req,  -			 sizeof(req)); - -	if (nfnl_query(STATE(dump), &req.nlh) == -1) -		return -1; - -	return 0; +	return nfct_query(STATE(dump), NFCT_Q_DUMP, &CONFIG(family));  } diff --git a/src/network.c b/src/network.c index 159bdf3..d162839 100644 --- a/src/network.c +++ b/src/network.c @@ -18,190 +18,159 @@  #include "conntrackd.h"  #include "network.h" +#include "us-conntrack.h" +#include "sync.h"  static unsigned int seq_set, cur_seq; -static int send_netmsg(struct mcast_sock *m, void *data, unsigned int len) +static int __do_send(struct mcast_sock *m, void *data, int len)  {  	struct nethdr *net = data; -	if (!seq_set) { -		seq_set = 1; -		cur_seq = time(NULL); -		net->flags |= NET_F_HELLO; -	} - -	net->flags = htons(net->flags); -	net->seq = htonl(cur_seq++); -  #undef _TEST_DROP  #ifdef _TEST_DROP  	static int drop = 0; -        if (++drop > 10) { +	if (++drop >= 
10) { +		printf("drop sq: %u fl:%u len:%u\n", +			ntohl(net->seq), ntohs(net->flags), +			ntohs(net->len));  		drop = 0; -		printf("dropping resend (seq=%u)\n", ntohl(net->seq));  		return 0;  	}  #endif +	debug("send sq: %u fl:%u len:%u\n", +		ntohl(net->seq), ntohs(net->flags), +		ntohs(net->len)); +  	return mcast_send(m, net, len);  } -int mcast_send_netmsg(struct mcast_sock *m, void *data) +static int __do_prepare(struct mcast_sock *m, void *data, int len)  { -	struct nlmsghdr *nlh = data + NETHDR_SIZ; -	unsigned int len = nlh->nlmsg_len + NETHDR_SIZ;  	struct nethdr *net = data; -	if (nlh_host2network(nlh) == -1) -		return -1; +	if (!seq_set) { +		seq_set = 1; +		cur_seq = time(NULL); +		net->flags |= NET_F_HELLO; +	} +	net->len = len; +	net->seq = cur_seq++; +	HDR_HOST2NETWORK(net); -	return send_netmsg(m, data, len); +	return len;  } -int mcast_resend_netmsg(struct mcast_sock *m, void *data) +static int __prepare_ctl(struct mcast_sock *m, void *data)  { -	struct nethdr *net = data; -	struct nlmsghdr *nlh = data + NETHDR_SIZ; -	unsigned int len; +	struct nethdr_ack *nack = (struct nethdr_ack *) data; -	net->flags = ntohs(net->flags); +	return __do_prepare(m, data, NETHDR_ACK_SIZ); +} -	if (net->flags & NET_F_NACK || net->flags & NET_F_ACK) -		len = NETHDR_ACK_SIZ; -	else -		len = ntohl(nlh->nlmsg_len) + NETHDR_SIZ; +static int __prepare_data(struct mcast_sock *m, void *data) +{ +	struct nethdr *net = (struct nethdr *) data; +	struct netpld *pld = NETHDR_DATA(net); -	return send_netmsg(m, data, len); +	return __do_prepare(m, data, ntohs(pld->len) + NETPLD_SIZ + NETHDR_SIZ);  } -int mcast_send_error(struct mcast_sock *m, void *data) +int prepare_send_netmsg(struct mcast_sock *m, void *data)  { -	struct nethdr *net = data; -	unsigned int len = NETHDR_SIZ; +	int ret = 0; +	struct nethdr *net = (struct nethdr *) data; -	if (net->flags & NET_F_NACK || net->flags & NET_F_ACK) { -		struct nethdr_ack *nack = (struct nethdr_ack *) net; -		nack->from = 
htonl(nack->from); -		nack->to = htonl(nack->to); -		len = NETHDR_ACK_SIZ; -	} +	if (IS_DATA(net)) +		ret = __prepare_data(m, data); +	else if (IS_CTL(net)) +		ret = __prepare_ctl(m, data); -	return send_netmsg(m, data, len); +	return ret;  } -#include "us-conntrack.h" -#include "sync.h" +static int tx_buflen = 0; +/* XXX: use buffer size of interface MTU */ +static char __tx_buf[1460], *tx_buf = __tx_buf; -static int __build_send(struct us_conntrack *u, int type, int query) +/* return 0 if it is not sent, otherwise return 1 */ +int mcast_buffered_send_netmsg(struct mcast_sock *m, void *data, int len)  { -	char __net[4096]; -	struct nethdr *net = (struct nethdr *) __net; +	int ret = 0; +	struct nethdr *net = data; -	if (!state_helper_verdict(type, u->ct)) -		return 0; +retry: +	if (tx_buflen + len < sizeof(__tx_buf)) { +		memcpy(__tx_buf + tx_buflen, net, len); +		tx_buflen += len; +	} else { +		__do_send(m, tx_buf, tx_buflen); +		ret = 1; +		tx_buflen = 0; +		goto retry; +	} -	int ret = build_network_msg(query, -				    STATE(subsys_event), -				    u->ct, -				    __net, -				    sizeof(__net)); +	return ret; +} -	if (ret == -1) -		return -1; +int mcast_buffered_pending_netmsg(struct mcast_sock *m) +{ +	int ret; + +	if (tx_buflen == 0) +		return 0; -	mcast_send_netmsg(STATE_SYNC(mcast_client), __net); -	if (STATE_SYNC(sync)->send) -		STATE_SYNC(sync)->send(type, net, u); +	ret = __do_send(m, tx_buf, tx_buflen); +	tx_buflen = 0; -	return 0; +	return ret;  } -int mcast_build_send_update(struct us_conntrack *u) +int mcast_send_netmsg(struct mcast_sock *m, void *data)  { -	return __build_send(u, NFCT_T_UPDATE, NFCT_Q_UPDATE); +	int ret; +	int len = prepare_send_netmsg(m, data); + +	ret = mcast_buffered_send_netmsg(m, data, len); +	mcast_buffered_pending_netmsg(m); + +	return ret;  } -int mcast_build_send_destroy(struct us_conntrack *u) +void build_netmsg(struct nf_conntrack *ct, int query, struct nethdr *net)  { -	return __build_send(u, NFCT_T_DESTROY, 
NFCT_Q_DESTROY); +	struct netpld *pld = NETHDR_DATA(net); + +	build_netpld(ct, pld, query);  } -int mcast_recv_netmsg(struct mcast_sock *m, void *data, int len) +int handle_netmsg(struct nethdr *net)  {  	int ret; -	struct nethdr *net = data; -	struct nlmsghdr *nlh = data + NETHDR_SIZ; -	struct nfgenmsg *nfhdr; - -	ret = mcast_recv(m, net, len); -	if (ret <= 0) -		return ret; +	struct netpld *pld = NETHDR_DATA(net);  	/* message too small: no room for the header */ -	if (ret < NETHDR_SIZ) +	if (ntohs(net->len) < NETHDR_ACK_SIZ)  		return -1; -	if (ntohs(net->flags) & NET_F_HELLO) -		STATE_SYNC(last_seq_recv) = ntohl(net->seq) - 1; +	HDR_NETWORK2HOST(net); -	if (ntohs(net->flags) & NET_F_NACK || ntohs(net->flags) & NET_F_ACK) { -		struct nethdr_ack *nack = (struct nethdr_ack *) net; +	if (IS_HELLO(net)) +		STATE_SYNC(last_seq_recv) = net->seq - 1; -		/* message too small: no room for the header */ -		if (ret < NETHDR_ACK_SIZ) -			return -1; - -		/* host byte order conversion */ -		net->flags = ntohs(net->flags); -		net->seq = ntohl(net->seq); - -		/* acknowledgement conversion */ -		nack->from = ntohl(nack->from); -		nack->to = ntohl(nack->to); - -		return ret; -	} - -	if (ntohs(net->flags) & NET_F_RESYNC) { -		/* host byte order conversion */ -		net->flags = ntohs(net->flags); -		net->seq = ntohl(net->seq); - -		return ret; -	} +	if (IS_CTL(net)) +		return 0;  	/* information received is too small */ -	if (ret < NLMSG_SPACE(sizeof(struct nfgenmsg))) -		return -1; - -	/* information received and message length does not match */ -	if (ret != ntohl(nlh->nlmsg_len) + NETHDR_SIZ) -		return -1; - -	/* this message does not come from ctnetlink */ -	if (NFNL_SUBSYS_ID(ntohs(nlh->nlmsg_type)) != NFNL_SUBSYS_CTNETLINK) +	if (net->len < sizeof(struct netpld))  		return -1; -	nfhdr = NLMSG_DATA(nlh); - -	/* only AF_INET and AF_INET6 are supported */ -	if (nfhdr->nfgen_family != AF_INET && -	    nfhdr->nfgen_family != AF_INET6) -		return -1; - -	/* only process message coming 
from nfnetlink v0 */ -	if (nfhdr->version != NFNETLINK_V0) -		return -1; - -	/* host byte order conversion */ -	net->flags = ntohs(net->flags); -	net->seq = ntohl(net->seq); - -	if (nlh_network2host(nlh) == -1) +	/* size mismatch! */ +	if (net->len < ntohs(pld->len) + NETHDR_SIZ)  		return -1; -	return ret; +	return 0;  }  int mcast_track_seq(u_int32_t seq, u_int32_t *exp_seq) @@ -238,30 +207,3 @@ out:  	return ret;  } - -int build_network_msg(const int msg_type, -		      struct nfnl_subsys_handle *ssh,  -		      struct nf_conntrack *ct, -		      void *buffer, -		      unsigned int size) -{ -	memset(buffer, 0, size); -	buffer += NETHDR_SIZ; -	size -= NETHDR_SIZ; -	return nfct_build_query(ssh, msg_type, ct, buffer, size); -} - -unsigned int parse_network_msg(struct nf_conntrack *ct,  -			       const struct nlmsghdr *nlh) -{ -	/*  -	 * The parsing of netlink messages going through network is  -	 * similar to the one that is done for messages coming from -	 * kernel, therefore do not replicate more code and use the -	 * function provided in the libraries. -	 * -	 * Yup, this is a hack 8) -	 */ -	return nfct_parse_conntrack(NFCT_T_ALL, nlh, ct); -} - diff --git a/src/parse.c b/src/parse.c new file mode 100644 index 0000000..81b70c4 --- /dev/null +++ b/src/parse.c @@ -0,0 +1,76 @@ +/* + * (C) 2006-2007 by Pablo Neira Ayuso <pablo@netfilter.org> + *  + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. 
+ * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include <string.h> +#include <libnetfilter_conntrack/libnetfilter_conntrack.h> +#include "network.h" + +static int parse_u8(struct nf_conntrack *ct, int attr, void *data) +{ +	u_int8_t *value = (u_int8_t *) data; +	nfct_set_attr_u8(ct, attr, *value); +} + +static int parse_u16(struct nf_conntrack *ct, int attr, void *data) +{ +	u_int16_t *value = (u_int16_t *) data; +	nfct_set_attr_u16(ct, attr, ntohs(*value)); +} + +static int parse_u32(struct nf_conntrack *ct, int attr, void *data) +{ +	u_int32_t *value = (u_int32_t *) data; +	nfct_set_attr_u32(ct, attr, ntohl(*value)); +} + +typedef int (*parse)(struct nf_conntrack *ct, int attr, void *data); + +parse h[ATTR_MAX] = { +	[ATTR_IPV4_SRC]		= parse_u32, +	[ATTR_IPV4_DST]		= parse_u32, +	[ATTR_L3PROTO]		= parse_u8, +	[ATTR_PORT_SRC]		= parse_u16, +	[ATTR_PORT_DST]		= parse_u16, +	[ATTR_L4PROTO]		= parse_u8, +	[ATTR_TCP_STATE]	= parse_u8, +	[ATTR_SNAT_IPV4]	= parse_u32, +	[ATTR_DNAT_IPV4]	= parse_u32, +	[ATTR_SNAT_PORT]	= parse_u16, +	[ATTR_DNAT_PORT]	= parse_u16, +	[ATTR_TIMEOUT]		= parse_u32, +	[ATTR_MARK]		= parse_u32, +	[ATTR_STATUS]		= parse_u32, +}; + +void parse_netpld(struct nf_conntrack *ct, struct netpld *pld, int *query) +{ +	int len; +	struct netattr *attr; + +	PLD_NETWORK2HOST(pld); +	len = pld->len; +	attr = PLD_DATA(pld); + +	while (len > 0) { +		ATTR_NETWORK2HOST(attr); +		h[attr->nta_attr](ct, attr->nta_attr, NTA_DATA(attr)); +		attr = NTA_NEXT(attr, len); +	} + +	*query = pld->query; +} diff --git a/src/proxy.c b/src/proxy.c deleted file mode 100644 index b9bb04e..0000000 --- a/src/proxy.c +++ /dev/null @@ -1,124 +0,0 @@ -/* - * (C) 2006 by Pablo Neira Ayuso <pablo@netfilter.org> - *  - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU 
General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#include <libnfnetlink/libnfnetlink.h> -#include <libnetfilter_conntrack/libnetfilter_conntrack.h> - -#if 0 -#define dprintf printf -#else -#define dprintf -#endif - -int nlh_payload_host2network(struct nfattr *nfa, int len) -{ -	struct nfattr *__nfa; - -	while (NFA_OK(nfa, len)) { - -		dprintf("type=%d nfalen=%d len=%d [%s]\n",  -			nfa->nfa_type & 0x7fff, -			nfa->nfa_len, len, -			nfa->nfa_type & NFNL_NFA_NEST ? 
"NEST":""); - -		if (nfa->nfa_type & NFNL_NFA_NEST) { -			if (NFA_PAYLOAD(nfa) > len) -				return -1; - -			if (nlh_payload_host2network(NFA_DATA(nfa),  -						     NFA_PAYLOAD(nfa)) == -1) -				return -1; -		} - -		__nfa = NFA_NEXT(nfa, len); - -		nfa->nfa_type = htons(nfa->nfa_type); -		nfa->nfa_len  = htons(nfa->nfa_len); - -		nfa = __nfa;  -	} -	return 0; -} - -int nlh_host2network(struct nlmsghdr *nlh) -{ -	struct nfgenmsg *nfhdr = NLMSG_DATA(nlh); -	struct nfattr *cda[CTA_MAX]; -	unsigned int min_len = NLMSG_SPACE(sizeof(struct nfgenmsg)); -	unsigned int len = nlh->nlmsg_len - NLMSG_ALIGN(min_len); - -	nlh->nlmsg_len   = htonl(nlh->nlmsg_len); -	nlh->nlmsg_type  = htons(nlh->nlmsg_type); -	nlh->nlmsg_flags = htons(nlh->nlmsg_flags); -	nlh->nlmsg_seq   = htonl(nlh->nlmsg_seq); -	nlh->nlmsg_pid   = htonl(nlh->nlmsg_pid); - -	nfhdr->res_id    = htons(nfhdr->res_id); - -	return nlh_payload_host2network(NFM_NFA(NLMSG_DATA(nlh)), len); -} - -int nlh_payload_network2host(struct nfattr *nfa, int len) -{ -	nfa->nfa_type = ntohs(nfa->nfa_type); -	nfa->nfa_len  = ntohs(nfa->nfa_len); - -	while(NFA_OK(nfa, len)) { - -                dprintf("type=%d nfalen=%d len=%d [%s]\n",  -		        nfa->nfa_type & 0x7fff,  -		        nfa->nfa_len, len,  -		        nfa->nfa_type & NFNL_NFA_NEST ? 
"NEST":""); - -		if (nfa->nfa_type & NFNL_NFA_NEST) { -			if (NFA_PAYLOAD(nfa) > len) -				return -1; - -			if (nlh_payload_network2host(NFA_DATA(nfa), -						     NFA_PAYLOAD(nfa)) == -1) -				return -1; -		} - -		nfa = NFA_NEXT(nfa,len); - -		if (len < NFA_LENGTH(0)) -			break; - -		nfa->nfa_type = ntohs(nfa->nfa_type); -		nfa->nfa_len  = ntohs(nfa->nfa_len); -	} -	return 0; -} - -int nlh_network2host(struct nlmsghdr *nlh) -{ -	struct nfgenmsg *nfhdr = NLMSG_DATA(nlh); -	struct nfattr *cda[CTA_MAX]; -	unsigned int min_len = NLMSG_SPACE(sizeof(struct nfgenmsg)); -	unsigned int len = ntohl(nlh->nlmsg_len) - NLMSG_ALIGN(min_len); - -	nlh->nlmsg_len   = ntohl(nlh->nlmsg_len); -	nlh->nlmsg_type  = ntohs(nlh->nlmsg_type); -	nlh->nlmsg_flags = ntohs(nlh->nlmsg_flags); -	nlh->nlmsg_seq   = ntohl(nlh->nlmsg_seq); -	nlh->nlmsg_pid   = ntohl(nlh->nlmsg_pid); - -	nfhdr->res_id    = ntohs(nfhdr->res_id); - -	return nlh_payload_network2host(NFM_NFA(NLMSG_DATA(nlh)), len); -} @@ -24,20 +24,21 @@  #include "us-conntrack.h"  #include <signal.h>  #include <stdlib.h> +#include <unistd.h> +#include "timer.h"  void killer(int foo)  {  	/* no signals while handling signals */  	sigprocmask(SIG_BLOCK, &STATE(block), NULL); -	nfnl_subsys_close(STATE(subsys_event)); -	nfnl_subsys_close(STATE(subsys_dump)); -	nfnl_close(STATE(event)); -	nfnl_close(STATE(dump)); +	nfct_close(STATE(event)); +	nfct_close(STATE(dump));  	ignore_pool_destroy(STATE(ignore_pool));  	local_server_destroy(STATE(local));  	STATE(mode)->kill(); +	destroy_alarm_scheduler();          unlink(CONFIG(lockfile));  	dlog(STATE(log), "------- shutdown received ----");  	close_log(STATE(log)); @@ -69,12 +70,16 @@ void local_handler(int fd, void *data)  	switch(type) {  	case FLUSH_MASTER: -		dlog(STATE(log), "[REQ] flushing master table"); -		nl_flush_master_conntrack_table(); +		dlog(STATE(log), "[DEPRECATED] `conntrackd -F' is deprecated. 
" +				 "Use conntrack -F instead."); +		if (fork() == 0) { +			execlp("conntrack", "conntrack", "-F", NULL); +			exit(EXIT_SUCCESS); +		}  		return;  	case RESYNC_MASTER:  		dlog(STATE(log), "[REQ] resync with master table"); -		nl_dump_conntrack_table(STATE(dump), STATE(subsys_dump)); +		nl_dump_conntrack_table();  		return;  	} @@ -104,6 +109,11 @@ int init(int mode)  		return -1;  	} +        if (init_alarm_scheduler() == -1) { +		dlog(STATE(log), "[FAIL] can't initialize alarm scheduler"); +		return -1; +	} +  	/* local UNIX socket */  	STATE(local) = local_server_create(&CONFIG(local));  	if (!STATE(local)) { @@ -147,22 +157,20 @@ int init(int mode)  	return 0;  } -#define POLL_NSECS 1 - -static void __run(void) +static void __run(long credit, int step)  {  	int max, ret;  	fd_set readfds;  	struct timeval tv = { -		.tv_sec         = POLL_NSECS, -		.tv_usec        = 0 +		.tv_sec         = 0, +		.tv_usec        = credit,  	};  	FD_ZERO(&readfds);  	FD_SET(STATE(local), &readfds); -	FD_SET(nfnl_fd(STATE(event)), &readfds); +	FD_SET(nfct_fd(STATE(event)), &readfds); -	max = MAX(STATE(local), nfnl_fd(STATE(event))); +	max = MAX(STATE(local), nfct_fd(STATE(event)));  	if (STATE(mode)->add_fds_to_set)  		max = MAX(max, STATE(mode)->add_fds_to_set(&readfds)); @@ -185,8 +193,8 @@ static void __run(void)  		do_local_server_step(STATE(local), NULL, local_handler);  	/* conntrack event has happened */ -	if (FD_ISSET(nfnl_fd(STATE(event)), &readfds)) { -		ret = nfnl_catch(STATE(event)); +	if (FD_ISSET(nfct_fd(STATE(event)), &readfds)) { +		while ((ret = nfct_catch(STATE(event))) != -1);  		if (ret == -1) {  			switch(errno) {  			case ENOBUFS: @@ -197,6 +205,7 @@ static void __run(void)  				 * size and resync with master conntrack table.  				 */  				nl_resize_socket_buffer(STATE(event)); +				/* XXX: schedule overrun call via alarm */  				STATE(mode)->overrun();  				break;  			case ENOENT: @@ -206,6 +215,8 @@ static void __run(void)  				 * interested in. 
Just ignore it.  				 */  				break; +			case EAGAIN: +				break;  			default:  				dlog(STATE(log), "event catch says: %s",  						  strerror(errno)); @@ -214,14 +225,35 @@ static void __run(void)  		}  	} -	if (STATE(mode)->step) -		STATE(mode)->step(&readfds); +	if (STATE(mode)->run) +		STATE(mode)->run(&readfds, step);  	sigprocmask(SIG_UNBLOCK, &STATE(block), NULL);  }  void run(void)  { -	while(1) -		__run(); +	int step = 0; +	struct timer timer; + +	timer_init(&timer); + +	while(1) { +		timer_start(&timer); +		__run(GET_CREDITS(timer), step); +		timer_stop(&timer); + +		if (timer_adjust_credit(&timer)) { +			timer_start(&timer); +			sigprocmask(SIG_BLOCK, &STATE(block), NULL); +			do_alarm_run(step); +			sigprocmask(SIG_UNBLOCK, &STATE(block), NULL); +			timer_stop(&timer); + +			if (timer_adjust_credit(&timer)) +				dlog(STATE(log), "alarm run takes too long!"); + +			step = (step + 1) < STEPS_PER_SECONDS ? step + 1 : 0; +		} +	}  } diff --git a/src/state_helper.c b/src/state_helper.c index 81b0d09..eba9d8f 100644 --- a/src/state_helper.c +++ b/src/state_helper.c @@ -25,7 +25,7 @@ int state_helper_verdict(int type, struct nf_conntrack *ct)  {  	u_int8_t l4proto; -        if (type == NFCT_T_DESTROY) +        if (type == NFCT_Q_DESTROY)  		return ST_H_REPLICATE;  	l4proto = nfct_get_attr_u8(ct, ATTR_ORIG_L4PROTO); diff --git a/src/stats-mode.c b/src/stats-mode.c index 92794cd..65bab1b 100644 --- a/src/stats-mode.c +++ b/src/stats-mode.c @@ -86,7 +86,7 @@ static int local_handler_stats(int fd, int type, void *data)  	return ret;  } -static void dump_stats(struct nf_conntrack *ct, struct nlmsghdr *nlh) +static void dump_stats(struct nf_conntrack *ct)  {  	if (cache_update_force(STATE_STATS(cache), ct))  		debug_ct(ct, "resync entry"); @@ -137,7 +137,7 @@ static void overrun_stats()  	nfct_close(h);  } -static void event_new_stats(struct nf_conntrack *ct, struct nlmsghdr *nlh) +static void event_new_stats(struct nf_conntrack *ct)  {  	if 
(cache_add(STATE_STATS(cache), ct)) {  		debug_ct(ct, "cache new"); @@ -150,7 +150,7 @@ static void event_new_stats(struct nf_conntrack *ct, struct nlmsghdr *nlh)  	}  } -static void event_update_stats(struct nf_conntrack *ct, struct nlmsghdr *nlh) +static void event_update_stats(struct nf_conntrack *ct)  {  	if (!cache_update_force(STATE_STATS(cache), ct)) {  		debug_ct(ct, "can't update"); @@ -159,7 +159,7 @@ static void event_update_stats(struct nf_conntrack *ct, struct nlmsghdr *nlh)  	debug_ct(ct, "update");  } -static int event_destroy_stats(struct nf_conntrack *ct, struct nlmsghdr *nlh) +static int event_destroy_stats(struct nf_conntrack *ct)  {  	if (cache_del(STATE_STATS(cache), ct)) {  		debug_ct(ct, "cache destroy"); @@ -173,7 +173,7 @@ static int event_destroy_stats(struct nf_conntrack *ct, struct nlmsghdr *nlh)  struct ct_mode stats_mode = {  	.init 			= init_stats,  	.add_fds_to_set 	= NULL, -	.step			= NULL, +	.run			= NULL,  	.local			= local_handler_stats,  	.kill			= kill_stats,  	.dump			= dump_stats, diff --git a/src/sync-mode.c b/src/sync-mode.c index 38ab016..f30cb95 100644 --- a/src/sync-mode.c +++ b/src/sync-mode.c @@ -27,43 +27,27 @@  #include <sys/select.h>  #include "sync.h"  #include "network.h" +#include "buffer.h" +#include "debug.h" -/* handler for multicast messages received */ -static void mcast_handler() +static void do_mcast_handler_step(struct nethdr *net)  { -	int ret; -	unsigned int type; -	char __net[4096]; -	struct nethdr *net = (struct nethdr *) __net; -	struct nlmsghdr *nlh = (struct nlmsghdr *) (__net + NETHDR_SIZ); +	unsigned int query; +	struct netpld *pld = NETHDR_DATA(net);  	char __ct[nfct_maxsize()];  	struct nf_conntrack *ct = (struct nf_conntrack *) __ct;  	struct us_conntrack *u = NULL; -	ret = mcast_recv_netmsg(STATE_SYNC(mcast_server), net, sizeof(__net)); -	if (ret <= 0) { -		STATE(malformed)++; -		return; -	} -  	if (STATE_SYNC(sync)->recv(net))  		return;  	memset(ct, 0, sizeof(__ct)); -	if ((type = 
parse_network_msg(ct, nlh)) == NFCT_T_ERROR) { -		STATE(malformed)++; -		return; -	} +	/* XXX: check for malformed */ +	parse_netpld(ct, pld, &query); -	nfct_attr_unset(ct, ATTR_TIMEOUT); -	nfct_attr_unset(ct, ATTR_ORIG_COUNTER_BYTES); -	nfct_attr_unset(ct, ATTR_ORIG_COUNTER_PACKETS); -	nfct_attr_unset(ct, ATTR_REPL_COUNTER_BYTES); -	nfct_attr_unset(ct, ATTR_REPL_COUNTER_PACKETS); - -	switch(type) { -	case NFCT_T_NEW: +	switch(query) { +	case NFCT_Q_CREATE:  retry:		  		if ((u = cache_add(STATE_SYNC(external), ct))) {  			debug_ct(u->ct, "external new"); @@ -80,24 +64,57 @@ retry:  			debug_ct(ct, "can't add");  		}  		break; -	case NFCT_T_UPDATE: +	case NFCT_Q_UPDATE:  		if ((u = cache_update_force(STATE_SYNC(external), ct))) {  			debug_ct(u->ct, "external update");  		} else  			debug_ct(ct, "can't update");  		break; -	case NFCT_T_DESTROY: +	case NFCT_Q_DESTROY:  		if (cache_del(STATE_SYNC(external), ct))  			debug_ct(ct, "external destroy");  		else  			debug_ct(ct, "can't destroy");  		break;  	default: -		dlog(STATE(log), "mcast received unknown msg type %d\n", type); +		dlog(STATE(log), "mcast received unknown query %d\n", query);  		break;  	}  } +/* handler for multicast messages received */ +static void mcast_handler() +{ +	int numbytes, remain; +	char __net[4096], *ptr = __net; + +	numbytes = mcast_recv(STATE_SYNC(mcast_server), __net, sizeof(__net)); +	if (numbytes <= 0) +		return; + +	remain = numbytes; +	while (remain > 0) { +		struct nethdr *net = (struct nethdr *) ptr; + +		if (ntohs(net->len) > remain) { +			dlog(STATE(log), "fragmented messages"); +			break; +		} + +		debug("recv sq: %u fl:%u len:%u (rem:%d)\n",  +			ntohl(net->seq), ntohs(net->flags), +			ntohs(net->len), remain); + +		if (handle_netmsg(net) == -1) { +			STATE(malformed)++; +			return; +		} +		do_mcast_handler_step(net); +		ptr += net->len; +		remain -= net->len; +	} +} +  static int init_sync(void)  {  	int ret; @@ -159,11 +176,6 @@ static int init_sync(void)  	/* 
initialization of multicast sequence generation */  	STATE_SYNC(last_seq_sent) = time(NULL); -	if (create_alarm_thread() == -1) { -		dlog(STATE(log), "[FAIL] can't initialize alarm thread"); -		return -1; -	} -  	return 0;  } @@ -174,11 +186,17 @@ static int add_fds_to_set_sync(fd_set *readfds)  	return STATE_SYNC(mcast_server->fd);  } -static void step_sync(fd_set *readfds) +static void run_sync(fd_set *readfds, int step)  {  	/* multicast packet has been received */  	if (FD_ISSET(STATE_SYNC(mcast_server->fd), readfds))  		mcast_handler(); + +	if (STATE_SYNC(sync)->run) +		STATE_SYNC(sync)->run(step); + +	/* flush pending messages */ +	mcast_buffered_pending_netmsg(STATE_SYNC(mcast_client));  }  static void kill_sync() @@ -189,8 +207,6 @@ static void kill_sync()  	mcast_server_destroy(STATE_SYNC(mcast_server));  	mcast_client_destroy(STATE_SYNC(mcast_client)); -	destroy_alarm_thread(); -  	if (STATE_SYNC(sync)->kill)  		STATE_SYNC(sync)->kill();  } @@ -267,10 +283,6 @@ static int local_handler_sync(int fd, int type, void *data)  				     STATE_SYNC(mcast_server));  		dump_stats_sync(fd);  		break; -	case SEND_BULK: -		dlog(STATE(log), "[REQ] sending bulk update"); -		cache_bulk(STATE_SYNC(internal)); -		break;  	default:  		if (STATE_SYNC(sync)->local)  			ret = STATE_SYNC(sync)->local(fd, type, data); @@ -280,7 +292,7 @@ static int local_handler_sync(int fd, int type, void *data)  	return ret;  } -static void dump_sync(struct nf_conntrack *ct, struct nlmsghdr *nlh) +static void dump_sync(struct nf_conntrack *ct)  {  	/* This is required by kernels < 2.6.20 */  	nfct_attr_unset(ct, ATTR_TIMEOUT); @@ -294,23 +306,21 @@ static void dump_sync(struct nf_conntrack *ct, struct nlmsghdr *nlh)  		debug_ct(ct, "resync");  } -static void mcast_send_sync(struct nlmsghdr *nlh, -			    struct us_conntrack *u, +static void mcast_send_sync(struct us_conntrack *u,  			    struct nf_conntrack *ct, -			    int type) +			    int query)  { -	char __net[4096]; -	struct nethdr *net = 
(struct nethdr *) __net; - -	memset(__net, 0, sizeof(__net)); +	int len; +	struct nethdr *net; -	if (!state_helper_verdict(type, ct)) +	if (!state_helper_verdict(query, ct))  		return; -	memcpy(__net + NETHDR_SIZ, nlh, nlh->nlmsg_len); -	mcast_send_netmsg(STATE_SYNC(mcast_client), net); +	net = BUILD_NETMSG(ct, query); +	len = prepare_send_netmsg(STATE_SYNC(mcast_client), net); +	mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net, len);  	if (STATE_SYNC(sync)->send) -		STATE_SYNC(sync)->send(type, net, u); +		STATE_SYNC(sync)->send(net, u);  }  static int overrun_cb(enum nf_conntrack_msg_type type, @@ -332,8 +342,16 @@ static int overrun_cb(enum nf_conntrack_msg_type type,  	if (!cache_test(STATE_SYNC(internal), ct)) {  		if ((u = cache_update_force(STATE_SYNC(internal), ct))) { +			int len; +  			debug_ct(u->ct, "overrun resync"); -			mcast_build_send_update(u); + +			struct nethdr *net = BUILD_NETMSG(u->ct, NFCT_Q_UPDATE); +			len = prepare_send_netmsg(STATE_SYNC(mcast_client),net); +			mcast_buffered_send_netmsg(STATE_SYNC(mcast_client),  +						   net, len); +			if (STATE_SYNC(sync)->send) +				STATE_SYNC(sync)->send(net, u);  		}  	} @@ -348,9 +366,17 @@ static int overrun_purge_step(void *data1, void *data2)  	ret = nfct_query(h, NFCT_Q_GET, u->ct);  	if (ret == -1 && errno == ENOENT) { +		int len; +		struct nethdr *net = BUILD_NETMSG(u->ct, NFCT_Q_DESTROY); +  		debug_ct(u->ct, "overrun purge resync"); -		mcast_build_send_destroy(u); -		__cache_del(STATE_SYNC(internal), u->ct); + +	        len = prepare_send_netmsg(STATE_SYNC(mcast_client), net); +	        mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net, len); +		if (STATE_SYNC(sync)->send) +			STATE_SYNC(sync)->send(net, u); + +		cache_del(STATE_SYNC(internal), u->ct);  	}  	return 0; @@ -382,7 +408,7 @@ static void overrun_sync()  	nfct_close(h);  } -static void event_new_sync(struct nf_conntrack *ct, struct nlmsghdr *nlh) +static void event_new_sync(struct nf_conntrack *ct)  {  	struct 
us_conntrack *u; @@ -394,12 +420,12 @@ static void event_new_sync(struct nf_conntrack *ct, struct nlmsghdr *nlh)  	nfct_attr_unset(ct, ATTR_TIMEOUT);  retry:  	if ((u = cache_add(STATE_SYNC(internal), ct))) { -		mcast_send_sync(nlh, u, ct, NFCT_T_NEW); +		mcast_send_sync(u, ct, NFCT_Q_CREATE);  		debug_ct(u->ct, "internal new");  	} else {  		if (errno == EEXIST) {  			cache_del(STATE_SYNC(internal), ct); -			mcast_send_sync(nlh, NULL, ct, NFCT_T_DESTROY); +			mcast_send_sync(NULL, ct, NFCT_Q_DESTROY);  			goto retry;  		} @@ -409,7 +435,7 @@ retry:  	}  } -static void event_update_sync(struct nf_conntrack *ct, struct nlmsghdr *nlh) +static void event_update_sync(struct nf_conntrack *ct)  {  	struct us_conntrack *u; @@ -420,15 +446,15 @@ static void event_update_sync(struct nf_conntrack *ct, struct nlmsghdr *nlh)  		return;  	}  	debug_ct(u->ct, "internal update"); -	mcast_send_sync(nlh, u, ct, NFCT_T_UPDATE); +	mcast_send_sync(u, ct, NFCT_Q_UPDATE);  } -static int event_destroy_sync(struct nf_conntrack *ct, struct nlmsghdr *nlh) +static int event_destroy_sync(struct nf_conntrack *ct)  {  	nfct_attr_unset(ct, ATTR_TIMEOUT);  	if (cache_del(STATE_SYNC(internal), ct)) { -		mcast_send_sync(nlh, NULL, ct, NFCT_T_DESTROY); +		mcast_send_sync(NULL, ct, NFCT_Q_DESTROY);  		debug_ct(ct, "internal destroy");  	} else  		debug_ct(ct, "can't destroy"); @@ -437,7 +463,7 @@ static int event_destroy_sync(struct nf_conntrack *ct, struct nlmsghdr *nlh)  struct ct_mode sync_mode = {  	.init 			= init_sync,  	.add_fds_to_set 	= add_fds_to_set_sync, -	.step			= step_sync, +	.run			= run_sync,  	.local			= local_handler_sync,  	.kill			= kill_sync,  	.dump			= dump_sync, diff --git a/src/sync-nack.c b/src/sync-nack.c index 20ad1f4..dbda0a7 100644 --- a/src/sync-nack.c +++ b/src/sync-nack.c @@ -24,6 +24,7 @@  #include "buffer.h"  #include "debug.h"  #include "network.h" +#include "alarm.h"  #include <libnfnetlink/libnfnetlink.h>  #include 
<libnetfilter_conntrack/libnetfilter_conntrack.h> @@ -33,28 +34,34 @@  #define dp  #endif -static LIST_HEAD(queue); +static LIST_HEAD(rs_list); +static LIST_HEAD(tx_list); +static unsigned int tx_list_len; +static struct buffer *rs_queue; +static struct buffer *tx_queue;  struct cache_nack { -	struct list_head 	head; +	struct list_head 	rs_list; +	struct list_head	tx_list;  	u_int32_t 		seq;  };  static void cache_nack_add(struct us_conntrack *u, void *data)  {  	struct cache_nack *cn = data; -	INIT_LIST_HEAD(&cn->head); +	INIT_LIST_HEAD(&cn->rs_list); +	INIT_LIST_HEAD(&cn->tx_list);  }  static void cache_nack_del(struct us_conntrack *u, void *data)  {  	struct cache_nack *cn = data; -	if (cn->head.next == &cn->head && -	    cn->head.prev == &cn->head) +	if (cn->rs_list.next == &cn->rs_list && +	    cn->rs_list.prev == &cn->rs_list)  	    	return; -	list_del(&cn->head); +	list_del(&cn->rs_list);  }  static struct cache_extra cache_nack_extra = { @@ -65,19 +72,31 @@ static struct cache_extra cache_nack_extra = {  static int nack_init()  { -	STATE_SYNC(buffer) = buffer_create(CONFIG(resend_buffer_size)); -	if (STATE_SYNC(buffer) == NULL) +	tx_queue = buffer_create(CONFIG(resend_buffer_size)); +	if (tx_queue == NULL) { +		dlog(STATE(log), "[FAIL] cannot create tx buffer");  		return -1; +	} + +	rs_queue = buffer_create(CONFIG(resend_buffer_size)); +	if (rs_queue == NULL) { +		dlog(STATE(log), "[FAIL] cannot create rs buffer"); +		return -1; +	} + +	INIT_LIST_HEAD(&tx_list); +	INIT_LIST_HEAD(&rs_list);  	return 0;  }  static void nack_kill()  { -	buffer_destroy(STATE_SYNC(buffer)); +	buffer_destroy(rs_queue); +	buffer_destroy(tx_queue);  } -static void mcast_send_control(u_int32_t flags, u_int32_t from, u_int32_t to) +static void tx_queue_add_ctlmsg(u_int32_t flags, u_int32_t from, u_int32_t to)  {  	struct nethdr_ack ack = {  		.flags = flags, @@ -85,8 +104,19 @@ static void mcast_send_control(u_int32_t flags, u_int32_t from, u_int32_t to)  		.to    = to,  	}; -	
mcast_send_error(STATE_SYNC(mcast_client), &ack); -	buffer_add(STATE_SYNC(buffer), &ack, NETHDR_ACK_SIZ); +	buffer_add(tx_queue, &ack, NETHDR_ACK_SIZ); +} + +static int do_cache_to_tx(void *data1, void *data2) +{ +	struct us_conntrack *u = data2; +	struct cache_nack *cn = cache_get_extra(STATE_SYNC(internal), u); + +	/* add to tx list */ +	list_add(&cn->tx_list, &tx_list); +	tx_list_len++; + +	return 0;  }  static int nack_local(int fd, int type, void *data) @@ -94,85 +124,78 @@ static int nack_local(int fd, int type, void *data)  	int ret = 1;  	switch(type) { -		case REQUEST_DUMP: -			mcast_send_control(NET_F_RESYNC, 0, 0); -			dlog(STATE(log), "[REQ] request resync"); -			break; -		default: -			ret = 0; -			break; +	case REQUEST_DUMP: +		dlog(STATE(log), "[REQ] request resync"); +		tx_queue_add_ctlmsg(NET_F_RESYNC, 0, 0); +		break; +	case SEND_BULK: +		dlog(STATE(log), "[REQ] sending bulk update"); +		cache_iterate(STATE_SYNC(internal), NULL, do_cache_to_tx); +		break; +	default: +		ret = 0; +		break;  	}  	return ret;  } -static int buffer_compare(void *data1, void *data2) +static int rs_queue_to_tx(void *data1, void *data2)  {  	struct nethdr *net = data1;  	struct nethdr_ack *nack = data2; -	struct nlmsghdr *nlh = data1 + NETHDR_SIZ; - -	unsigned old_seq = ntohl(net->seq); -	if (between(ntohl(net->seq), nack->from, nack->to)) { -		if (mcast_resend_netmsg(STATE_SYNC(mcast_client), net)) -			dp("resend destroy (old seq=%u) (seq=%u)\n",  -			   old_seq, ntohl(net->seq)); +	if (between(net->seq, nack->from, nack->to)) { +		dp("rs_queue_to_tx sq: %u fl:%u len:%u\n", +			net->seq, net->flags, net->len); +		buffer_add(tx_queue, net, net->len);  	}  	return 0;  } -static int buffer_remove(void *data1, void *data2) +static int rs_queue_empty(void *data1, void *data2)  {  	struct nethdr *net = data1;  	struct nethdr_ack *h = data2; -	if (between(ntohl(net->seq), h->from, h->to)) { -		dp("remove from buffer (seq=%u)\n", ntohl(net->seq)); -		
__buffer_del(STATE_SYNC(buffer), data1); +	if (between(net->seq, h->from, h->to)) { +		dp("remove from buffer (seq=%u)\n", net->seq); +		buffer_del(rs_queue, data1);  	}  	return 0;  } -static void queue_resend(struct cache *c, unsigned int from, unsigned int to) +static void rs_list_to_tx(struct cache *c, unsigned int from, unsigned int to)  {  	struct list_head *n;  	struct us_conntrack *u; -	list_for_each(n, &queue) { +	list_for_each(n, &rs_list) {  		struct cache_nack *cn = (struct cache_nack *) n;  		struct us_conntrack *u;  		u = cache_get_conntrack(STATE_SYNC(internal), cn); -  		if (between(cn->seq, from, to)) { -			debug_ct(u->ct, "resend nack"); -			dp("resending nack'ed (oldseq=%u) ", cn->seq); - -			if (mcast_build_send_update(u) == -1) -				continue; - -			dp("(newseq=%u)\n", cn->seq); +			dp("resending nack'ed (oldseq=%u)\n", cn->seq); +			list_add(&cn->tx_list, &tx_list); +			tx_list_len++;  		}   	}  } -static void queue_empty(struct cache *c, unsigned int from, unsigned int to) +static void rs_list_empty(struct cache *c, unsigned int from, unsigned int to)  {  	struct list_head *n, *tmp; -	struct us_conntrack *u; -	dp("ACK from %u to %u\n", from, to); -	list_for_each_safe(n, tmp, &queue) { +	list_for_each_safe(n, tmp, &rs_list) {  		struct cache_nack *cn = (struct cache_nack *) n; +		struct us_conntrack *u;  		u = cache_get_conntrack(STATE_SYNC(internal), cn);  		if (between(cn->seq, from, to)) { -			dp("remove %u\n", cn->seq); -			debug_ct(u->ct, "ack received: empty queue");  			dp("queue: deleting from queue (seq=%u)\n", cn->seq); -			list_del(&cn->head); -			INIT_LIST_HEAD(&cn->head); +			list_del(&cn->rs_list); +			INIT_LIST_HEAD(&cn->rs_list);  		}   	}  } @@ -187,73 +210,149 @@ static int nack_recv(const struct nethdr *net)  	if (!mcast_track_seq(net->seq, &exp_seq)) {  		dp("OOS: sending nack (seq=%u)\n", exp_seq); -		mcast_send_control(NET_F_NACK, exp_seq, net->seq - 1); +		tx_queue_add_ctlmsg(NET_F_NACK, exp_seq, net->seq-1);  		window = 
CONFIG(window_size);  	} else {  		/* received a window, send an acknowledgement */  		if (--window == 0) {  			dp("sending ack (seq=%u)\n", net->seq); -			mcast_send_control(NET_F_ACK,  -					   net->seq - CONFIG(window_size),  -					   net->seq); +			tx_queue_add_ctlmsg(NET_F_ACK,  +					    net->seq - CONFIG(window_size),  +					    net->seq);  		}  	} -	if (net->flags & NET_F_NACK) { +	if (IS_NACK(net)) {  		struct nethdr_ack *nack = (struct nethdr_ack *) net;  		dp("NACK: from seq=%u to seq=%u\n", nack->from, nack->to); -		queue_resend(STATE_SYNC(internal), nack->from, nack->to); -		buffer_iterate(STATE_SYNC(buffer), nack, buffer_compare); +		rs_list_to_tx(STATE_SYNC(internal), nack->from, nack->to); +		buffer_iterate(rs_queue, nack, rs_queue_to_tx);  		return 1; -	} else if (net->flags & NET_F_RESYNC) { +	} else if (IS_RESYNC(net)) {  		dp("RESYNC ALL\n"); -		cache_bulk(STATE_SYNC(internal)); +		cache_iterate(STATE_SYNC(internal), NULL, do_cache_to_tx);  		return 1; -	} else if (net->flags & NET_F_ACK) { +	} else if (IS_ACK(net)) {  		struct nethdr_ack *h = (struct nethdr_ack *) net;  		dp("ACK: from seq=%u to seq=%u\n", h->from, h->to); -		queue_empty(STATE_SYNC(internal), h->from, h->to); -		buffer_iterate(STATE_SYNC(buffer), h, buffer_remove); +		rs_list_empty(STATE_SYNC(internal), h->from, h->to); +		buffer_iterate(rs_queue, h, rs_queue_empty); +		return 1; +	} else if (IS_ALIVE(net))  		return 1; -	}  	return 0;  } -static void nack_send(int type,  -		      const struct nethdr *net, -		      struct us_conntrack *u) +static void nack_send(struct nethdr *net, struct us_conntrack *u)  { -	int size = NETHDR_SIZ; - 	struct nlmsghdr *nlh = (struct nlmsghdr *) ((void *) net + size); +	struct netpld *pld = NETHDR_DATA(net);  	struct cache_nack *cn; -  -	size += ntohl(nlh->nlmsg_len); -	switch(type) { -	case NFCT_T_NEW: -	case NFCT_T_UPDATE: +	HDR_NETWORK2HOST(net); + +	switch(ntohs(pld->query)) { +	case NFCT_Q_CREATE: +	case NFCT_Q_UPDATE:  		cn = (struct 
cache_nack *)   			cache_get_extra(STATE_SYNC(internal), u); -		if (cn->head.next == &cn->head && -		    cn->head.prev == &cn->head) +		if (cn->rs_list.next == &cn->rs_list && +		    cn->rs_list.prev == &cn->rs_list)  		    	goto insert; -		list_del(&cn->head); -		INIT_LIST_HEAD(&cn->head); +		list_del(&cn->rs_list); +		INIT_LIST_HEAD(&cn->rs_list);  insert: -		cn->seq = ntohl(net->seq); -		list_add(&cn->head, &queue); +		cn->seq = net->seq; +		list_add(&cn->rs_list, &rs_list);  		break; -	case NFCT_T_DESTROY: -		buffer_add(STATE_SYNC(buffer), net, size); +	case NFCT_Q_DESTROY: +		buffer_add(rs_queue, net, net->len);  		break;  	}  } +static int tx_queue_xmit(void *data1, void *data2) +{ +	struct nethdr *net = data1; +	int len = prepare_send_netmsg(STATE_SYNC(mcast_client), net); + +	dp("tx_queue sq: %u fl:%u len:%u\n", +               ntohl(net->seq), ntohs(net->flags), ntohs(net->len)); + +	mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net, len); +	HDR_NETWORK2HOST(net); + +	if (IS_DATA(net) || IS_ACK(net) || IS_NACK(net)) { +		dp("-> back_to_tx_queue sq: %u fl:%u len:%u\n", +        	       net->seq, net->flags, net->len); +		buffer_add(rs_queue, net, net->len); +	} +	buffer_del(tx_queue, net); + +	return 0; +} + +static int tx_list_xmit(struct list_head *i, struct us_conntrack *u) +{ +	int ret; +	struct nethdr *net = BUILD_NETMSG(u->ct, NFCT_Q_UPDATE); +	int len = prepare_send_netmsg(STATE_SYNC(mcast_client), net); + +	dp("tx_list sq: %u fl:%u len:%u\n", +                ntohl(net->seq), ntohs(net->flags), +                ntohs(net->len)); + +	list_del(i); +	INIT_LIST_HEAD(i); +	tx_list_len--; + +	ret = mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net, len); +	if (STATE_SYNC(sync)->send) +		STATE_SYNC(sync)->send(net, u); + +	return ret; +} + +static struct alarm_list alive_alarm; + +static void do_alive_alarm(struct alarm_list *a, void *data) +{ +	del_alarm(a); +	tx_queue_add_ctlmsg(NET_F_ALIVE, 0, 0); +} + +static void nack_run(int step) +{ +	
struct list_head *i, *tmp; + +	/* send messages in the tx_queue */ +	buffer_iterate(tx_queue, NULL, tx_queue_xmit); + +	/* send conntracks in the tx_list */ +	list_for_each_safe(i, tmp, &tx_list) { +		struct cache_nack *cn; +		struct us_conntrack *u; + +		cn = container_of(i, struct cache_nack, tx_list); +		u = cache_get_conntrack(STATE_SYNC(internal), cn); +		tx_list_xmit(i, u); +	} + +	if (alive_alarm.expires > 0) +		mod_alarm(&alive_alarm, 1); +	else { +		init_alarm(&alive_alarm); +		/* XXX: alive message expiration configurable */ +		set_alarm_expiration(&alive_alarm, 1); +		set_alarm_function(&alive_alarm, do_alive_alarm); +		add_alarm(&alive_alarm); +	} +} +  struct sync_mode nack = {  	.internal_cache_flags	= LIFETIME,  	.external_cache_flags	= LIFETIME, @@ -263,4 +362,5 @@ struct sync_mode nack = {  	.local			= nack_local,  	.recv			= nack_recv,  	.send			= nack_send, +	.run			= nack_run,  }; diff --git a/src/sync-notrack.c b/src/sync-notrack.c index 1d6eba8..8588ecf 100644 --- a/src/sync-notrack.c +++ b/src/sync-notrack.c @@ -24,12 +24,16 @@  static void refresher(struct alarm_list *a, void *data)  { +	int len; +	struct nethdr *net;  	struct us_conntrack *u = data;  	debug_ct(u->ct, "persistence update");  	a->expires = random() % CONFIG(refresh) + 1; -	mcast_build_send_update(u); +	net = BUILD_NETMSG(u->ct, NFCT_Q_UPDATE); +	len = prepare_send_netmsg(STATE_SYNC(mcast_client), net); +	mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net, len);  }  static void cache_notrack_add(struct us_conntrack *u, void *data) diff --git a/src/timer.c b/src/timer.c new file mode 100644 index 0000000..b85c286 --- /dev/null +++ b/src/timer.c @@ -0,0 +1,75 @@ +/* + * (C) 2006-2007 by Pablo Neira Ayuso <pablo@netfilter.org> + *  + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later 
version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+#include <stdlib.h>
+#include <assert.h>
+#include <string.h>
+#include "conntrackd.h"
+#include "timer.h"
+
+/*
+ * Credit, in microseconds, granted to a timer for one timeslice.
+ * The original author notes this as a 200 ms timeslice; the actual
+ * value depends on STEPS_PER_SECONDS, which is defined elsewhere.
+ */
+#define TIMESLICE_CREDIT (1000000 / STEPS_PER_SECONDS)
+
+/* Zero every field of the timer and grant it one full timeslice of credit. */
+void timer_init(struct timer *timer)
+{
+	memset(timer, 0, sizeof(*timer));
+	timer->credits = TIMESLICE_CREDIT;
+}
+
+/* Record the current time of day as the start of the measured interval. */
+void timer_start(struct timer *timer)
+{
+	gettimeofday(&timer->start, NULL);
+}
+
+/*
+ * Store (stop - start) in diff, normalised so that tv_usec is not negative.
+ *
+ * Returns 1 if the resulting tv_sec is negative, 0 otherwise.
+ */
+static int timeval_subtract(struct timeval *diff,
+			    struct timeval *start,
+			    struct timeval *stop)
+{
+	/* a negative microsecond difference borrows one second */
+	int borrow = stop->tv_usec < start->tv_usec;
+
+	diff->tv_sec = stop->tv_sec - start->tv_sec - borrow;
+	diff->tv_usec = stop->tv_usec - start->tv_usec + (borrow ? 1000000 : 0);
+
+	return diff->tv_sec < 0;
+}
+
+/*
+ * Record the current time of day as the end of the measured interval and
+ * store the elapsed time (stop - start) in timer->diff.
+ */
+void timer_stop(struct timer *timer)
+{
+	gettimeofday(&timer->stop, NULL);
+	/* the sign indication returned by timeval_subtract() is not used here */
+	(void) timeval_subtract(&timer->diff, &timer->start, &timer->stop);
+}
+
+/*
+ * Charge the last measured interval (timer->diff) against the credit.
+ *
+ * Returns 1 when the credit was refilled, either because the interval has a
+ * non-zero seconds part or because the credit dropped below zero; returns 0
+ * while some credit is still left.
+ */
+int timer_adjust_credit(struct timer *timer)
+{
+	/* any non-zero seconds part: start over with a full timeslice */
+	if (timer->diff.tv_sec != 0) {
+		timer->credits = TIMESLICE_CREDIT;
+		return 1;
+	}
+
+	timer->credits -= timer->diff.tv_usec;
+	if (timer->credits >= 0)
+		return 0;
+
+	/* overdrawn: carry the debt into the next timeslice ... */
+	timer->credits += TIMESLICE_CREDIT;
+	/* ... unless the debt exceeds a whole timeslice, then just reset */
+	if (timer->credits < 0)
+		timer->credits = TIMESLICE_CREDIT;
+
+	return 1;
+} |
