diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/channel.c | 5 | ||||
| -rw-r--r-- | src/channel_tcp.c | 1 | ||||
| -rw-r--r-- | src/fds.c | 61 | ||||
| -rw-r--r-- | src/main.c | 2 | ||||
| -rw-r--r-- | src/multichannel.c | 7 | ||||
| -rw-r--r-- | src/run.c | 251 | ||||
| -rw-r--r-- | src/stats-mode.c | 1 | ||||
| -rw-r--r-- | src/sync-mode.c | 146 | ||||
| -rw-r--r-- | src/tcp.c | 1 | 
9 files changed, 236 insertions, 239 deletions
| diff --git a/src/channel.c b/src/channel.c index 818bb01..8b7c319 100644 --- a/src/channel.c +++ b/src/channel.c @@ -310,3 +310,8 @@ int channel_accept(struct channel *c)  {  	return c->ops->accept(c);  } + +int channel_type(struct channel *c) +{ +	return c->ops->type; +} diff --git a/src/channel_tcp.c b/src/channel_tcp.c index f132840..a84603c 100644 --- a/src/channel_tcp.c +++ b/src/channel_tcp.c @@ -137,6 +137,7 @@ channel_tcp_accept(struct channel *c)  struct channel_ops channel_tcp = {  	.headersiz	= 40, /* IP header (20 bytes) + TCP header 20 (bytes) */ +	.type		= CHANNEL_T_STREAM,  	.open		= channel_tcp_open,  	.close		= channel_tcp_close,  	.send		= channel_tcp_send, @@ -1,5 +1,5 @@  /* - * (C) 2006-2008 by Pablo Neira Ayuso <pablo@netfilter.org> + * (C) 2006-2012 by Pablo Neira Ayuso <pablo@netfilter.org>   *   * This program is free software; you can redistribute it and/or modify   * it under the terms of the GNU General Public License as published by @@ -14,9 +14,16 @@   * You should have received a copy of the GNU General Public License   * along with this program; if not, write to the Free Software   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * + * Part of this code has been sponsored by Vyatta Inc. <http://www.vyatta.com>   */  #include <stdlib.h>  #include <string.h> +#include <errno.h> +#include <signal.h> + +#include "conntrackd.h" +#include "date.h"  #include "fds.h"  struct fds *create_fds(void) @@ -44,7 +51,7 @@ void destroy_fds(struct fds *fds)  	free(fds);  } -int register_fd(int fd, struct fds *fds) +int register_fd(int fd, void (*cb)(void *data), void *data, struct fds *fds)  {  	struct fds_item *item; @@ -58,7 +65,10 @@ int register_fd(int fd, struct fds *fds)  		return -1;  	item->fd = fd; -	list_add(&item->head, &fds->list); +	item->cb = cb; +	item->data = data; +	/* Order matters: the descriptors are served in FIFO basis. */ +	list_add_tail(&item->head, &fds->list);  	return 0;  } @@ -92,3 +102,48 @@ int unregister_fd(int fd, struct fds *fds)  	return 0;  } +static void select_main_step(struct timeval *next_alarm) +{ +	int ret; +	fd_set readfds = STATE(fds)->readfds; +	struct fds_item *cur, *tmp; + +	ret = select(STATE(fds)->maxfd + 1, &readfds, NULL, NULL, next_alarm); +	if (ret == -1) { +		/* interrupted syscall, retry */ +		if (errno == EINTR) +			return; + +		STATE(stats).select_failed++; +		return; +	} + +	/* signals are racy */ +	sigprocmask(SIG_BLOCK, &STATE(block), NULL); + +	list_for_each_entry_safe(cur, tmp, &STATE(fds)->list, head) { +		if (FD_ISSET(cur->fd, &readfds)) +			cur->cb(cur->data); +	} + +	sigprocmask(SIG_UNBLOCK, &STATE(block), NULL); +} + +void __attribute__((noreturn)) select_main_loop(void) +{ +	struct timeval next_alarm; +	struct timeval *next = NULL; + +	while(1) { +		do_gettimeofday(); + +		sigprocmask(SIG_BLOCK, &STATE(block), NULL); +		if (next != NULL && !timerisset(next)) +			next = do_alarm_run(&next_alarm); +		else +			next = get_next_alarm_run(&next_alarm); +		sigprocmask(SIG_UNBLOCK, &STATE(block), NULL); + +		select_main_step(next); +	} +} @@ -406,6 +406,6 @@ int main(int argc, char *argv[])  	/*  	 * run main process  	 */ -	run(); +	select_main_loop();  	return 0;  } diff --git a/src/multichannel.c b/src/multichannel.c index de69d5c..952b567 100644 --- a/src/multichannel.c +++ b/src/multichannel.c @@ -109,8 +109,9 @@ void multichannel_set_current_channel(struct multichannel *m, int i)  	m->current = m->channel[i];  } -void multichannel_change_current_channel(struct multichannel *m, int i) +void +multichannel_change_current_channel(struct multichannel *m, struct channel *c)  { -	if (m->current != m->channel[i]) -		m->current = m->channel[i]; +	if (m->current != c) +		m->current = c;  } @@ -1,5 +1,5 @@  /* - * (C) 2006-2011 by Pablo Neira Ayuso <pablo@netfilter.org> + * (C) 2006-2012 by Pablo Neira Ayuso <pablo@netfilter.org>   * (C) 2011 by Vyatta Inc. <http://www.vyatta.com>   *   * This program is free software; you can redistribute it and/or modify @@ -16,7 +16,7 @@   * along with this program; if not, write to the Free Software   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.   * - * Description: run and init functions + * Part of this code has been sponsored by Vyatta Inc. <http://www.vyatta.com>   */  #include "conntrackd.h" @@ -460,6 +460,87 @@ static int exp_get_handler(enum nf_conntrack_msg_type type,  	return NFCT_CB_CONTINUE;  } +/* order received via UNIX socket */ +static void local_cb(void *data) +{ +	do_local_server_step(&STATE(local), NULL, local_handler); +} + +/* we have received an event from ctnetlink */ +static void event_cb(void *data) +{ +	int ret; + +	ret = nfct_catch(STATE(event)); +	/* reset event iteration limit counter */ +	STATE(event_iterations_limit) = CONFIG(event_iterations_limit); +	if (ret == -1) { +		switch(errno) { +		case ENOBUFS: +			/* We have hit ENOBUFS, it's likely that we are +			 * losing events. Two possible situations may +			 * trigger this error: +			 * +			 * 1) The netlink receiver buffer is too small: +			 *    increasing the netlink buffer size should +			 *    be enough. However, some event messages +			 *    got lost. We have to resync ourselves +			 *    with the kernel table conntrack table to +			 *    resolve the inconsistency. +			 * +			 * 2) The receiver is too slow to process the +			 *    netlink messages so that the queue gets +			 *    full quickly. This generally happens +			 *    if the system is under heavy workload +			 *    (busy CPU). In this case, increasing the +			 *    size of the netlink receiver buffer +			 *    would not help anymore since we would +			 *    be delaying the overrun. Moreover, we +			 *    should avoid resynchronizations. We +			 *    should do our best here and keep +			 *    replicating as much states as possible. +			 *    If workload lowers at some point, +			 *    we resync ourselves. +			 */ +			nl_resize_socket_buffer(STATE(event)); +			if (CONFIG(nl_overrun_resync) > 0 && +			    STATE(mode)->internal->flags & INTERNAL_F_RESYNC) { +				add_alarm(&STATE(resync_alarm), +					  CONFIG(nl_overrun_resync),0); +			} +			STATE(stats).nl_catch_event_failed++; +			STATE(stats).nl_overrun++; +			break; +		case ENOENT: +			/* +			 * We received a message from another +			 * netfilter subsystem that we are not +			 * interested in. Just ignore it. +			 */ +		break; +		case EAGAIN: +			/* No more events to receive, try later. */ +			break; +		default: +			STATE(stats).nl_catch_event_failed++; +			break; +		} +	} +} + +/* we previously requested a resync due to buffer overrun. */ +static void resync_cb(void *data) +{ +	nfct_catch(STATE(resync)); +	if (STATE(mode)->internal->ct.purge) +		STATE(mode)->internal->ct.purge(); +} + +static void poll_cb(void *data) +{ +	nfct_catch(STATE(resync)); +} +  int  init(void)  { @@ -493,7 +574,7 @@ init(void)  		dlog(LOG_ERR, "can't open unix socket!");  		return -1;  	} -	register_fd(STATE(local).fd, STATE(fds)); +	register_fd(STATE(local).fd, local_cb, NULL, STATE(fds));  	/* resynchronize (like 'dump' socket) but it also purges old entries */  	STATE(resync) = nfct_open(CONFIG(netlink).subsys_id, 0); @@ -507,7 +588,13 @@ init(void)  			       NFCT_T_ALL,  			       STATE(mode)->internal->ct.resync,  			       NULL); -	register_fd(nfct_fd(STATE(resync)), STATE(fds)); +	if (CONFIG(flags) & CTD_POLL) { +		register_fd(nfct_fd(STATE(resync)), poll_cb, +				NULL, STATE(fds)); +	} else { +		register_fd(nfct_fd(STATE(resync)), resync_cb, +				NULL, STATE(fds)); +	}  	fcntl(nfct_fd(STATE(resync)), F_SETFL, O_NONBLOCK);  	if (STATE(mode)->internal->flags & INTERNAL_F_POPULATE) { @@ -590,7 +677,7 @@ init(void)  			nfexp_callback_register2(STATE(event), NFCT_T_ALL,  						 exp_event_handler, NULL);  		} -		register_fd(nfct_fd(STATE(event)), STATE(fds)); +		register_fd(nfct_fd(STATE(event)), event_cb, NULL, STATE(fds));  	}  	/* Signals handling */ @@ -618,157 +705,3 @@ init(void)  	return 0;  } - -static void run_events(struct timeval *next_alarm) -{ -	int ret; -	fd_set readfds = STATE(fds)->readfds; - -	ret = select(STATE(fds)->maxfd + 1, &readfds, NULL, NULL, next_alarm); -	if (ret == -1) { -		/* interrupted syscall, retry */ -		if (errno == EINTR) -			return; - -		STATE(stats).select_failed++; -		return; -	} - -	/* signals are racy */ -	sigprocmask(SIG_BLOCK, &STATE(block), NULL); - -	/* order received via UNIX socket */ -	if (FD_ISSET(STATE(local).fd, &readfds)) -		do_local_server_step(&STATE(local), NULL, local_handler); - -	/* we have receive an event from ctnetlink */ -	if (FD_ISSET(nfct_fd(STATE(event)), &readfds)) { -		ret = nfct_catch(STATE(event)); -		/* reset event iteration limit counter */ -		STATE(event_iterations_limit) = CONFIG(event_iterations_limit); -		if (ret == -1) { -		switch(errno) { -		case ENOBUFS: -			/* We have hit ENOBUFS, it's likely that we are -			 * losing events. Two possible situations may -			 * trigger this error: -			 * -			 * 1) The netlink receiver buffer is too small: -			 *    increasing the netlink buffer size should -			 *    be enough. However, some event messages -			 *    got lost. We have to resync ourselves -			 *    with the kernel table conntrack table to -			 *    resolve the inconsistency.  -			 * -			 * 2) The receiver is too slow to process the -			 *    netlink messages so that the queue gets -			 *    full quickly. This generally happens -			 *    if the system is under heavy workload -			 *    (busy CPU). In this case, increasing the -			 *    size of the netlink receiver buffer -			 *    would not help anymore since we would -			 *    be delaying the overrun. Moreover, we -			 *    should avoid resynchronizations. We  -			 *    should do our best here and keep -			 *    replicating as much states as possible. -			 *    If workload lowers at some point, -			 *    we resync ourselves. -			 */ -			nl_resize_socket_buffer(STATE(event)); -			if (CONFIG(nl_overrun_resync) > 0 && -			    STATE(mode)->internal->flags & INTERNAL_F_RESYNC) { -				add_alarm(&STATE(resync_alarm), -					  CONFIG(nl_overrun_resync),0); -			} -			STATE(stats).nl_catch_event_failed++; -			STATE(stats).nl_overrun++; -			break; -		case ENOENT: -			/* -			 * We received a message from another -			 * netfilter subsystem that we are not -			 * interested in. Just ignore it. -			 */ -			break; -		case EAGAIN: -			/* No more events to receive, try later. */ -			break; -		default: -			STATE(stats).nl_catch_event_failed++; -			break; -		} -		} -	} -	/* we previously requested a resync due to buffer overrun. */ -	if (FD_ISSET(nfct_fd(STATE(resync)), &readfds)) { -		nfct_catch(STATE(resync)); -		if (STATE(mode)->internal->ct.purge) -			STATE(mode)->internal->ct.purge(); -	} - -	if (STATE(mode)->run) -		STATE(mode)->run(&readfds); - -	sigprocmask(SIG_UNBLOCK, &STATE(block), NULL); -} - -static void run_polling(struct timeval *next_alarm) -{ -	int ret; -	fd_set readfds = STATE(fds)->readfds; - -	ret = select(STATE(fds)->maxfd + 1, &readfds, NULL, NULL, next_alarm); -	if (ret == -1) { -		/* interrupted syscall, retry */ -		if (errno == EINTR) -			return; - -		STATE(stats).select_failed++; -		return; -	} - -	/* signals are racy */ -	sigprocmask(SIG_BLOCK, &STATE(block), NULL); - -	/* order received via UNIX socket */ -	if (FD_ISSET(STATE(local).fd, &readfds)) -		do_local_server_step(&STATE(local), NULL, local_handler); - -	/* we requested a dump from the kernel via polling_alarm */ -	if (FD_ISSET(nfct_fd(STATE(resync)), &readfds)) -		nfct_catch(STATE(resync)); - -	if (STATE(mode)->run) -		STATE(mode)->run(&readfds); - -	sigprocmask(SIG_UNBLOCK, &STATE(block), NULL); -} - -static void __attribute__((noreturn)) -do_run(void (*run_step)(struct timeval *next_alarm)) -{ -	struct timeval next_alarm;  -	struct timeval *next = NULL; - -	while(1) { -		do_gettimeofday(); - -		sigprocmask(SIG_BLOCK, &STATE(block), NULL); -		if (next != NULL && !timerisset(next)) -			next = do_alarm_run(&next_alarm); -		else -			next = get_next_alarm_run(&next_alarm); -		sigprocmask(SIG_UNBLOCK, &STATE(block), NULL); - -		run_step(next); -	} -} - -void run(void) -{ -	if (CONFIG(flags) & CTD_POLL) { -		do_run(run_polling); -	} else { -		do_run(run_events); -	} -} diff --git a/src/stats-mode.c b/src/stats-mode.c index b768033..6b7f08d 100644 --- a/src/stats-mode.c +++ b/src/stats-mode.c @@ -201,7 +201,6 @@ static struct internal_handler internal_cache_stats = {  struct ct_mode stats_mode = {  	.init 			= init_stats, -	.run			= NULL,  	.local			= local_handler_stats,  	.kill			= kill_stats,  	.internal		= &internal_cache_stats, diff --git a/src/sync-mode.c b/src/sync-mode.c index 17f866a..be6366d 100644 --- a/src/sync-mode.c +++ b/src/sync-mode.c @@ -1,5 +1,5 @@  /* - * (C) 2006-2011 by Pablo Neira Ayuso <pablo@netfilter.org> + * (C) 2006-2012 by Pablo Neira Ayuso <pablo@netfilter.org>   * (C) 2011 by Vyatta Inc. <http://www.vyatta.com>   *   * This program is free software; you can redistribute it and/or modify @@ -78,7 +78,7 @@ static struct nf_expect *msg2exp_alloc(struct nethdr *net, size_t remain)  }  static void -do_channel_handler_step(int i, struct nethdr *net, size_t remain) +do_channel_handler_step(struct channel *c, struct nethdr *net, size_t remain)  {  	struct nf_conntrack *ct = NULL;  	struct nf_expect *exp = NULL; @@ -91,10 +91,10 @@ do_channel_handler_step(int i, struct nethdr *net, size_t remain)  	switch (STATE_SYNC(sync)->recv(net)) {  	case MSG_DATA: -		multichannel_change_current_channel(STATE_SYNC(channel), i); +		multichannel_change_current_channel(STATE_SYNC(channel), c);  		break;  	case MSG_CTL: -		multichannel_change_current_channel(STATE_SYNC(channel), i); +		multichannel_change_current_channel(STATE_SYNC(channel), c);  		return;  	case MSG_BAD:  		STATE_SYNC(error).msg_rcv_malformed++; @@ -175,7 +175,7 @@ static int channel_stream(struct channel *m, const char *ptr, ssize_t remain)  }  /* handler for messages received */ -static int channel_handler_routine(struct channel *m, int i) +static int channel_handler_routine(struct channel *m)  {  	ssize_t numbytes;  	ssize_t remain, pending = cur - __net; @@ -242,7 +242,7 @@ static int channel_handler_routine(struct channel *m, int i)  		HDR_NETWORK2HOST(net); -		do_channel_handler_step(i, net, remain); +		do_channel_handler_step(m, net, remain);  		ptr += net->len;  		remain -= net->len;  	} @@ -250,12 +250,13 @@ static int channel_handler_routine(struct channel *m, int i)  }  /* handler for messages received */ -static void channel_handler(struct channel *m, int i) +static void channel_handler(void *data)  { +	struct channel *c = data;  	int k;  	for (k=0; k<CONFIG(event_iterations_limit); k++) { -		if (channel_handler_routine(m, i) == -1) { +		if (channel_handler_routine(c) == -1) {  			break;  		}  	} @@ -284,7 +285,7 @@ static void interface_candidate(void)  	dlog(LOG_ERR, "no dedicated links available!");  } -static void interface_handler(void) +static void interface_handler(void *data)  {  	int idx = multichannel_get_current_ifindex(STATE_SYNC(channel));  	unsigned int flags; @@ -311,6 +312,53 @@ static void do_reset_cache_alarm(struct alarm_block *a, void *data)  	STATE(mode)->internal->ct.flush();  } +static void commit_cb(void *data) +{ +	int ret; + +	read_evfd(STATE_SYNC(commit).evfd); + +	ret = STATE_SYNC(commit).rq[0].cb(STATE_SYNC(commit).h, 0); +	if (ret == 0) { +		/* we still have things in the callback queue. */ +		if (STATE_SYNC(commit).rq[1].cb) { +			int fd = STATE_SYNC(commit).clientfd; + +			STATE_SYNC(commit).rq[0].cb = +				STATE_SYNC(commit).rq[1].cb; + +			STATE_SYNC(commit).rq[1].cb = NULL; + +			STATE_SYNC(commit).clientfd = -1; +			STATE_SYNC(commit).rq[0].cb(STATE_SYNC(commit).h, fd); +		} else { +			/* Close the client socket now, we're done. */ +			close(STATE_SYNC(commit).clientfd); +			STATE_SYNC(commit).clientfd = -1; +		} +	} +} + +static void channel_accept_cb(void *data) +{ +	struct channel *c = data; +	int fd; + +	fd = channel_accept(data); +	if (fd < 0) +		return; + +	register_fd(fd, channel_handler, c, STATE(fds)); +} + +static void tx_queue_cb(void *data) +{ +	STATE_SYNC(sync)->xmit(); + +	/* flush pending messages */ +	multichannel_send_flush(STATE_SYNC(channel)); +} +  static int init_sync(void)  {  	int i; @@ -370,8 +418,19 @@ static int init_sync(void)  	for (i=0; i<STATE_SYNC(channel)->channel_num; i++) {  		int fd = channel_get_fd(STATE_SYNC(channel)->channel[i]);  		fcntl(fd, F_SETFL, O_NONBLOCK); -		if (register_fd(fd, STATE(fds)) == -1) -			return -1; + +		switch(channel_type(STATE_SYNC(channel)->channel[i])) { +		case CHANNEL_T_STREAM: +			register_fd(fd, channel_accept_cb, +					STATE_SYNC(channel)->channel[i], +					STATE(fds)); +			break; +		case CHANNEL_T_DATAGRAM: +			register_fd(fd, channel_handler, +					STATE_SYNC(channel)->channel[i], +					STATE(fds)); +			break; +		}  	}  	STATE_SYNC(interface) = nl_init_interface_handler(); @@ -379,7 +438,8 @@ static int init_sync(void)  		dlog(LOG_ERR, "can't open interface watcher");  		return -1;  	} -	if (register_fd(nlif_fd(STATE_SYNC(interface)), STATE(fds)) == -1) +	if (register_fd(nlif_fd(STATE_SYNC(interface)), +			interface_handler, NULL, STATE(fds)) == -1)  		return -1;  	STATE_SYNC(tx_queue) = queue_create("txqueue", INT_MAX, QUEUE_F_EVFD); @@ -387,8 +447,8 @@ static int init_sync(void)  		dlog(LOG_ERR, "cannot create tx queue");  		return -1;  	} -	if (register_fd(queue_get_eventfd(STATE_SYNC(tx_queue)),  -							STATE(fds)) == -1) +	if (register_fd(queue_get_eventfd(STATE_SYNC(tx_queue)), +			tx_queue_cb, NULL, STATE(fds)) == -1)  		return -1;  	STATE_SYNC(commit).h = nfct_open(CONFIG(netlink).subsys_id, 0); @@ -404,7 +464,7 @@ static int init_sync(void)  		return -1;  	}  	if (register_fd(get_read_evfd(STATE_SYNC(commit).evfd), -							STATE(fds)) == -1) { +				commit_cb, NULL, STATE(fds)) == -1) {  		return -1;  	}  	STATE_SYNC(commit).clientfd = -1; @@ -417,61 +477,6 @@ static int init_sync(void)  	return 0;  } -static void channel_check(struct channel *c, int i, fd_set *readfds) -{ -	/* In case that this channel is connection-oriented. */ -	if (channel_accept_isset(c, readfds)) -		channel_accept(c); - -	/* For data handling. */ -	if (channel_isset(c, readfds)) -		channel_handler(c, i); -} - -static void run_sync(fd_set *readfds) -{ -	int i; - -	for (i=0; i<STATE_SYNC(channel)->channel_num; i++) -		channel_check(STATE_SYNC(channel)->channel[i], i, readfds); - -	if (FD_ISSET(queue_get_eventfd(STATE_SYNC(tx_queue)), readfds)) -		STATE_SYNC(sync)->xmit(); - -	if (FD_ISSET(nlif_fd(STATE_SYNC(interface)), readfds)) -		interface_handler(); - -	if (FD_ISSET(get_read_evfd(STATE_SYNC(commit).evfd), readfds)) { -		int ret; - -		read_evfd(STATE_SYNC(commit).evfd); - -		ret = STATE_SYNC(commit).rq[0].cb(STATE_SYNC(commit).h, 0); -		if (ret == 0) { -			/* we still have things in the callback queue. */ -			if (STATE_SYNC(commit).rq[1].cb) { -				int fd = STATE_SYNC(commit).clientfd; - -				STATE_SYNC(commit).rq[0].cb = -					STATE_SYNC(commit).rq[1].cb; - -				STATE_SYNC(commit).rq[1].cb = NULL; - -				STATE_SYNC(commit).clientfd = -1; -				STATE_SYNC(commit).rq[0].cb( -					STATE_SYNC(commit).h, fd); -			} else { -				/* Close the client socket now, we're done. */ -				close(STATE_SYNC(commit).clientfd); -				STATE_SYNC(commit).clientfd = -1; -			} -		} -	} - -	/* flush pending messages */ -	multichannel_send_flush(STATE_SYNC(channel)); -} -  static void kill_sync(void)  {  	STATE(mode)->internal->close(); @@ -747,7 +752,6 @@ static int local_handler_sync(int fd, int type, void *data)  struct ct_mode sync_mode = {  	.init 			= init_sync, -	.run			= run_sync,  	.local			= local_handler_sync,  	.kill			= kill_sync,  	/* the internal handler is set in run-time. */ @@ -264,7 +264,6 @@ int tcp_accept(struct tcp_sock *m)  		m->client_fd = ret;  		m->state = TCP_SERVER_CONNECTED; -		register_fd(m->client_fd, STATE(fds));  	}  	return m->client_fd;  } | 
