summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPablo Neira Ayuso <pablo@netfilter.org>2012-04-24 10:55:33 +0200
committerPablo Neira Ayuso <pablo@netfilter.org>2012-05-28 12:34:32 +0200
commit1250135046b96f2778bda51517c8a722171a6c16 (patch)
tree84344a92d70fe82b717be1c5344f10eddd295351
parentfcd6f78d277113628205789c8aba9ab1f5152fc4 (diff)
downloadconntrack-tools-1250135046b96f2778bda51517c8a722171a6c16.tar.gz
conntrack-tools-1250135046b96f2778bda51517c8a722171a6c16.zip
conntrackd: generalize file descriptor infrastructure
This patch generalizes the select-based file descriptor infrastructure by allowing you to register file descriptors and its callbacks. Instead of hardcoding the descriptors that needs to be checked. Now, struct fds_item contains a callback and pointer to data that is passed to it: struct fds_item { struct list_head head; int fd; + void (*cb)(void *data); + void *data; }; Then, we check which ones are active in the select_main_step() function: list_for_each_entry(cur, &STATE(fds)->list, head) { if (FD_ISSET(cur->fd, &readfds)) cur->cb(cur->data); } And it invoked the corresponding callback. I had to slightly modify the channel infrastructure to fit it into the changes. This modularity is required for the upcoming cthelper support. Signed-off-by: Pablo Neira Ayuso <pablo@netfilter.org>
-rw-r--r--include/channel.h11
-rw-r--r--include/conntrackd.h3
-rw-r--r--include/fds.h4
-rw-r--r--src/channel.c5
-rw-r--r--src/channel_tcp.c1
-rw-r--r--src/fds.c61
-rw-r--r--src/main.c2
-rw-r--r--src/multichannel.c7
-rw-r--r--src/run.c251
-rw-r--r--src/stats-mode.c1
-rw-r--r--src/sync-mode.c146
-rw-r--r--src/tcp.c1
12 files changed, 249 insertions, 244 deletions
diff --git a/include/channel.h b/include/channel.h
index 9b5fad8..46a354f 100644
--- a/include/channel.h
+++ b/include/channel.h
@@ -35,7 +35,8 @@ struct tcp_channel {
#define CHANNEL_F_BUFFERED (1 << 1)
#define CHANNEL_F_STREAM (1 << 2)
#define CHANNEL_F_ERRORS (1 << 3)
-#define CHANNEL_F_MAX (1 << 4)
+#define CHANNEL_F_ACCEPT (1 << 4)
+#define CHANNEL_F_MAX (1 << 5)
union channel_type_conf {
struct mcast_conf mcast;
@@ -52,8 +53,12 @@ struct channel_conf {
struct nlif_handle;
+#define CHANNEL_T_DATAGRAM 0
+#define CHANNEL_T_STREAM 1
+
struct channel_ops {
int headersiz;
+ int type;
void * (*open)(void *conf);
void (*close)(void *channel);
int (*send)(void *channel, const void *data, int len);
@@ -97,6 +102,8 @@ void channel_stats(struct channel *c, int fd);
void channel_stats_extended(struct channel *c, int active,
struct nlif_handle *h, int fd);
+int channel_type(struct channel *c);
+
#define MULTICHANNEL_MAX 4
struct multichannel {
@@ -119,6 +126,6 @@ void multichannel_stats_extended(struct multichannel *m,
int multichannel_get_ifindex(struct multichannel *m, int i);
int multichannel_get_current_ifindex(struct multichannel *m);
void multichannel_set_current_channel(struct multichannel *m, int i);
-void multichannel_change_current_channel(struct multichannel *m, int i);
+void multichannel_change_current_channel(struct multichannel *m, struct channel *c);
#endif /* _CHANNEL_H_ */
diff --git a/include/conntrackd.h b/include/conntrackd.h
index 9359dfa..0e203e7 100644
--- a/include/conntrackd.h
+++ b/include/conntrackd.h
@@ -264,7 +264,6 @@ extern struct ct_general_state st;
struct ct_mode {
struct internal_handler *internal;
int (*init)(void);
- void (*run)(fd_set *readfds);
int (*local)(int fd, int type, void *data);
void (*kill)(void);
};
@@ -278,7 +277,7 @@ extern struct ct_mode stats_mode;
/* These live in run.c */
void killer(int foo);
int init(void);
-void run(void);
+void select_main_loop(void);
/* from read_config_yy.c */
int
diff --git a/include/fds.h b/include/fds.h
index f3728d7..ed0c8be 100644
--- a/include/fds.h
+++ b/include/fds.h
@@ -12,11 +12,13 @@ struct fds {
struct fds_item {
struct list_head head;
int fd;
+ void (*cb)(void *data);
+ void *data;
};
struct fds *create_fds(void);
void destroy_fds(struct fds *);
-int register_fd(int fd, struct fds *fds);
+int register_fd(int fd, void (*cb)(void *data), void *data, struct fds *fds);
int unregister_fd(int fd, struct fds *fds);
#endif
diff --git a/src/channel.c b/src/channel.c
index 818bb01..8b7c319 100644
--- a/src/channel.c
+++ b/src/channel.c
@@ -310,3 +310,8 @@ int channel_accept(struct channel *c)
{
return c->ops->accept(c);
}
+
+int channel_type(struct channel *c)
+{
+ return c->ops->type;
+}
diff --git a/src/channel_tcp.c b/src/channel_tcp.c
index f132840..a84603c 100644
--- a/src/channel_tcp.c
+++ b/src/channel_tcp.c
@@ -137,6 +137,7 @@ channel_tcp_accept(struct channel *c)
struct channel_ops channel_tcp = {
.headersiz = 40, /* IP header (20 bytes) + TCP header 20 (bytes) */
+ .type = CHANNEL_T_STREAM,
.open = channel_tcp_open,
.close = channel_tcp_close,
.send = channel_tcp_send,
diff --git a/src/fds.c b/src/fds.c
index 347eee1..0b95437 100644
--- a/src/fds.c
+++ b/src/fds.c
@@ -1,5 +1,5 @@
/*
- * (C) 2006-2008 by Pablo Neira Ayuso <pablo@netfilter.org>
+ * (C) 2006-2012 by Pablo Neira Ayuso <pablo@netfilter.org>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@@ -14,9 +14,16 @@
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ * Part of this code has been sponsored by Vyatta Inc. <http://www.vyatta.com>
*/
#include <stdlib.h>
#include <string.h>
+#include <errno.h>
+#include <signal.h>
+
+#include "conntrackd.h"
+#include "date.h"
#include "fds.h"
struct fds *create_fds(void)
@@ -44,7 +51,7 @@ void destroy_fds(struct fds *fds)
free(fds);
}
-int register_fd(int fd, struct fds *fds)
+int register_fd(int fd, void (*cb)(void *data), void *data, struct fds *fds)
{
struct fds_item *item;
@@ -58,7 +65,10 @@ int register_fd(int fd, struct fds *fds)
return -1;
item->fd = fd;
- list_add(&item->head, &fds->list);
+ item->cb = cb;
+ item->data = data;
+ /* Order matters: the descriptors are served in FIFO basis. */
+ list_add_tail(&item->head, &fds->list);
return 0;
}
@@ -92,3 +102,48 @@ int unregister_fd(int fd, struct fds *fds)
return 0;
}
+static void select_main_step(struct timeval *next_alarm)
+{
+ int ret;
+ fd_set readfds = STATE(fds)->readfds;
+ struct fds_item *cur, *tmp;
+
+ ret = select(STATE(fds)->maxfd + 1, &readfds, NULL, NULL, next_alarm);
+ if (ret == -1) {
+ /* interrupted syscall, retry */
+ if (errno == EINTR)
+ return;
+
+ STATE(stats).select_failed++;
+ return;
+ }
+
+ /* signals are racy */
+ sigprocmask(SIG_BLOCK, &STATE(block), NULL);
+
+ list_for_each_entry_safe(cur, tmp, &STATE(fds)->list, head) {
+ if (FD_ISSET(cur->fd, &readfds))
+ cur->cb(cur->data);
+ }
+
+ sigprocmask(SIG_UNBLOCK, &STATE(block), NULL);
+}
+
+void __attribute__((noreturn)) select_main_loop(void)
+{
+ struct timeval next_alarm;
+ struct timeval *next = NULL;
+
+ while(1) {
+ do_gettimeofday();
+
+ sigprocmask(SIG_BLOCK, &STATE(block), NULL);
+ if (next != NULL && !timerisset(next))
+ next = do_alarm_run(&next_alarm);
+ else
+ next = get_next_alarm_run(&next_alarm);
+ sigprocmask(SIG_UNBLOCK, &STATE(block), NULL);
+
+ select_main_step(next);
+ }
+}
diff --git a/src/main.c b/src/main.c
index f7803fd..26f6c14 100644
--- a/src/main.c
+++ b/src/main.c
@@ -406,6 +406,6 @@ int main(int argc, char *argv[])
/*
* run main process
*/
- run();
+ select_main_loop();
return 0;
}
diff --git a/src/multichannel.c b/src/multichannel.c
index de69d5c..952b567 100644
--- a/src/multichannel.c
+++ b/src/multichannel.c
@@ -109,8 +109,9 @@ void multichannel_set_current_channel(struct multichannel *m, int i)
m->current = m->channel[i];
}
-void multichannel_change_current_channel(struct multichannel *m, int i)
+void
+multichannel_change_current_channel(struct multichannel *m, struct channel *c)
{
- if (m->current != m->channel[i])
- m->current = m->channel[i];
+ if (m->current != c)
+ m->current = c;
}
diff --git a/src/run.c b/src/run.c
index 26c1783..171dee5 100644
--- a/src/run.c
+++ b/src/run.c
@@ -1,5 +1,5 @@
/*
- * (C) 2006-2011 by Pablo Neira Ayuso <pablo@netfilter.org>
+ * (C) 2006-2012 by Pablo Neira Ayuso <pablo@netfilter.org>
* (C) 2011 by Vyatta Inc. <http://www.vyatta.com>
*
* This program is free software; you can redistribute it and/or modify
@@ -16,7 +16,7 @@
* along with this program; if not, write to the Free Software
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*
- * Description: run and init functions
+ * Part of this code has been sponsored by Vyatta Inc. <http://www.vyatta.com>
*/
#include "conntrackd.h"
@@ -460,6 +460,87 @@ static int exp_get_handler(enum nf_conntrack_msg_type type,
return NFCT_CB_CONTINUE;
}
+/* order received via UNIX socket */
+static void local_cb(void *data)
+{
+ do_local_server_step(&STATE(local), NULL, local_handler);
+}
+
+/* we have received an event from ctnetlink */
+static void event_cb(void *data)
+{
+ int ret;
+
+ ret = nfct_catch(STATE(event));
+ /* reset event iteration limit counter */
+ STATE(event_iterations_limit) = CONFIG(event_iterations_limit);
+ if (ret == -1) {
+ switch(errno) {
+ case ENOBUFS:
+ /* We have hit ENOBUFS, it's likely that we are
+ * losing events. Two possible situations may
+ * trigger this error:
+ *
+ * 1) The netlink receiver buffer is too small:
+ * increasing the netlink buffer size should
+ * be enough. However, some event messages
+ * got lost. We have to resync ourselves
+ * with the kernel table conntrack table to
+ * resolve the inconsistency.
+ *
+ * 2) The receiver is too slow to process the
+ * netlink messages so that the queue gets
+ * full quickly. This generally happens
+ * if the system is under heavy workload
+ * (busy CPU). In this case, increasing the
+ * size of the netlink receiver buffer
+ * would not help anymore since we would
+ * be delaying the overrun. Moreover, we
+ * should avoid resynchronizations. We
+ * should do our best here and keep
+ * replicating as much states as possible.
+ * If workload lowers at some point,
+ * we resync ourselves.
+ */
+ nl_resize_socket_buffer(STATE(event));
+ if (CONFIG(nl_overrun_resync) > 0 &&
+ STATE(mode)->internal->flags & INTERNAL_F_RESYNC) {
+ add_alarm(&STATE(resync_alarm),
+ CONFIG(nl_overrun_resync),0);
+ }
+ STATE(stats).nl_catch_event_failed++;
+ STATE(stats).nl_overrun++;
+ break;
+ case ENOENT:
+ /*
+ * We received a message from another
+ * netfilter subsystem that we are not
+ * interested in. Just ignore it.
+ */
+ break;
+ case EAGAIN:
+ /* No more events to receive, try later. */
+ break;
+ default:
+ STATE(stats).nl_catch_event_failed++;
+ break;
+ }
+ }
+}
+
+/* we previously requested a resync due to buffer overrun. */
+static void resync_cb(void *data)
+{
+ nfct_catch(STATE(resync));
+ if (STATE(mode)->internal->ct.purge)
+ STATE(mode)->internal->ct.purge();
+}
+
+static void poll_cb(void *data)
+{
+ nfct_catch(STATE(resync));
+}
+
int
init(void)
{
@@ -493,7 +574,7 @@ init(void)
dlog(LOG_ERR, "can't open unix socket!");
return -1;
}
- register_fd(STATE(local).fd, STATE(fds));
+ register_fd(STATE(local).fd, local_cb, NULL, STATE(fds));
/* resynchronize (like 'dump' socket) but it also purges old entries */
STATE(resync) = nfct_open(CONFIG(netlink).subsys_id, 0);
@@ -507,7 +588,13 @@ init(void)
NFCT_T_ALL,
STATE(mode)->internal->ct.resync,
NULL);
- register_fd(nfct_fd(STATE(resync)), STATE(fds));
+ if (CONFIG(flags) & CTD_POLL) {
+ register_fd(nfct_fd(STATE(resync)), poll_cb,
+ NULL, STATE(fds));
+ } else {
+ register_fd(nfct_fd(STATE(resync)), resync_cb,
+ NULL, STATE(fds));
+ }
fcntl(nfct_fd(STATE(resync)), F_SETFL, O_NONBLOCK);
if (STATE(mode)->internal->flags & INTERNAL_F_POPULATE) {
@@ -590,7 +677,7 @@ init(void)
nfexp_callback_register2(STATE(event), NFCT_T_ALL,
exp_event_handler, NULL);
}
- register_fd(nfct_fd(STATE(event)), STATE(fds));
+ register_fd(nfct_fd(STATE(event)), event_cb, NULL, STATE(fds));
}
/* Signals handling */
@@ -618,157 +705,3 @@ init(void)
return 0;
}
-
-static void run_events(struct timeval *next_alarm)
-{
- int ret;
- fd_set readfds = STATE(fds)->readfds;
-
- ret = select(STATE(fds)->maxfd + 1, &readfds, NULL, NULL, next_alarm);
- if (ret == -1) {
- /* interrupted syscall, retry */
- if (errno == EINTR)
- return;
-
- STATE(stats).select_failed++;
- return;
- }
-
- /* signals are racy */
- sigprocmask(SIG_BLOCK, &STATE(block), NULL);
-
- /* order received via UNIX socket */
- if (FD_ISSET(STATE(local).fd, &readfds))
- do_local_server_step(&STATE(local), NULL, local_handler);
-
- /* we have receive an event from ctnetlink */
- if (FD_ISSET(nfct_fd(STATE(event)), &readfds)) {
- ret = nfct_catch(STATE(event));
- /* reset event iteration limit counter */
- STATE(event_iterations_limit) = CONFIG(event_iterations_limit);
- if (ret == -1) {
- switch(errno) {
- case ENOBUFS:
- /* We have hit ENOBUFS, it's likely that we are
- * losing events. Two possible situations may
- * trigger this error:
- *
- * 1) The netlink receiver buffer is too small:
- * increasing the netlink buffer size should
- * be enough. However, some event messages
- * got lost. We have to resync ourselves
- * with the kernel table conntrack table to
- * resolve the inconsistency.
- *
- * 2) The receiver is too slow to process the
- * netlink messages so that the queue gets
- * full quickly. This generally happens
- * if the system is under heavy workload
- * (busy CPU). In this case, increasing the
- * size of the netlink receiver buffer
- * would not help anymore since we would
- * be delaying the overrun. Moreover, we
- * should avoid resynchronizations. We
- * should do our best here and keep
- * replicating as much states as possible.
- * If workload lowers at some point,
- * we resync ourselves.
- */
- nl_resize_socket_buffer(STATE(event));
- if (CONFIG(nl_overrun_resync) > 0 &&
- STATE(mode)->internal->flags & INTERNAL_F_RESYNC) {
- add_alarm(&STATE(resync_alarm),
- CONFIG(nl_overrun_resync),0);
- }
- STATE(stats).nl_catch_event_failed++;
- STATE(stats).nl_overrun++;
- break;
- case ENOENT:
- /*
- * We received a message from another
- * netfilter subsystem that we are not
- * interested in. Just ignore it.
- */
- break;
- case EAGAIN:
- /* No more events to receive, try later. */
- break;
- default:
- STATE(stats).nl_catch_event_failed++;
- break;
- }
- }
- }
- /* we previously requested a resync due to buffer overrun. */
- if (FD_ISSET(nfct_fd(STATE(resync)), &readfds)) {
- nfct_catch(STATE(resync));
- if (STATE(mode)->internal->ct.purge)
- STATE(mode)->internal->ct.purge();
- }
-
- if (STATE(mode)->run)
- STATE(mode)->run(&readfds);
-
- sigprocmask(SIG_UNBLOCK, &STATE(block), NULL);
-}
-
-static void run_polling(struct timeval *next_alarm)
-{
- int ret;
- fd_set readfds = STATE(fds)->readfds;
-
- ret = select(STATE(fds)->maxfd + 1, &readfds, NULL, NULL, next_alarm);
- if (ret == -1) {
- /* interrupted syscall, retry */
- if (errno == EINTR)
- return;
-
- STATE(stats).select_failed++;
- return;
- }
-
- /* signals are racy */
- sigprocmask(SIG_BLOCK, &STATE(block), NULL);
-
- /* order received via UNIX socket */
- if (FD_ISSET(STATE(local).fd, &readfds))
- do_local_server_step(&STATE(local), NULL, local_handler);
-
- /* we requested a dump from the kernel via polling_alarm */
- if (FD_ISSET(nfct_fd(STATE(resync)), &readfds))
- nfct_catch(STATE(resync));
-
- if (STATE(mode)->run)
- STATE(mode)->run(&readfds);
-
- sigprocmask(SIG_UNBLOCK, &STATE(block), NULL);
-}
-
-static void __attribute__((noreturn))
-do_run(void (*run_step)(struct timeval *next_alarm))
-{
- struct timeval next_alarm;
- struct timeval *next = NULL;
-
- while(1) {
- do_gettimeofday();
-
- sigprocmask(SIG_BLOCK, &STATE(block), NULL);
- if (next != NULL && !timerisset(next))
- next = do_alarm_run(&next_alarm);
- else
- next = get_next_alarm_run(&next_alarm);
- sigprocmask(SIG_UNBLOCK, &STATE(block), NULL);
-
- run_step(next);
- }
-}
-
-void run(void)
-{
- if (CONFIG(flags) & CTD_POLL) {
- do_run(run_polling);
- } else {
- do_run(run_events);
- }
-}
diff --git a/src/stats-mode.c b/src/stats-mode.c
index b768033..6b7f08d 100644
--- a/src/stats-mode.c
+++ b/src/stats-mode.c
@@ -201,7 +201,6 @@ static struct internal_handler internal_cache_stats = {
struct ct_mode stats_mode = {
.init = init_stats,
- .run = NULL,
.local = local_handler_stats,
.kill = kill_stats,
.internal = &internal_cache_stats,
diff --git a/src/sync-mode.c b/src/sync-mode.c
index 10fdb9e..71c320c 100644
--- a/src/sync-mode.c
+++ b/src/sync-mode.c
@@ -1,5 +1,5 @@
/*
- * (C) 2006-2011 by Pablo Neira Ayuso <pablo@netfilter.org>
+ * (C) 2006-2012 by Pablo Neira Ayuso <pablo@netfilter.org>
* (C) 2011 by Vyatta Inc. <http://www.vyatta.com>
*
* This program is free software; you can redistribute it and/or modify
@@ -78,7 +78,7 @@ static struct nf_expect *msg2exp_alloc(struct nethdr *net, size_t remain)
}
static void
-do_channel_handler_step(int i, struct nethdr *net, size_t remain)
+do_channel_handler_step(struct channel *c, struct nethdr *net, size_t remain)
{
struct nf_conntrack *ct = NULL;
struct nf_expect *exp = NULL;
@@ -91,10 +91,10 @@ do_channel_handler_step(int i, struct nethdr *net, size_t remain)
switch (STATE_SYNC(sync)->recv(net)) {
case MSG_DATA:
- multichannel_change_current_channel(STATE_SYNC(channel), i);
+ multichannel_change_current_channel(STATE_SYNC(channel), c);
break;
case MSG_CTL:
- multichannel_change_current_channel(STATE_SYNC(channel), i);
+ multichannel_change_current_channel(STATE_SYNC(channel), c);
return;
case MSG_BAD:
STATE_SYNC(error).msg_rcv_malformed++;
@@ -175,7 +175,7 @@ static int channel_stream(struct channel *m, const char *ptr, ssize_t remain)
}
/* handler for messages received */
-static int channel_handler_routine(struct channel *m, int i)
+static int channel_handler_routine(struct channel *m)
{
ssize_t numbytes;
ssize_t remain, pending = cur - __net;
@@ -242,7 +242,7 @@ static int channel_handler_routine(struct channel *m, int i)
HDR_NETWORK2HOST(net);
- do_channel_handler_step(i, net, remain);
+ do_channel_handler_step(m, net, remain);
ptr += net->len;
remain -= net->len;
}
@@ -250,12 +250,13 @@ static int channel_handler_routine(struct channel *m, int i)
}
/* handler for messages received */
-static void channel_handler(struct channel *m, int i)
+static void channel_handler(void *data)
{
+ struct channel *c = data;
int k;
for (k=0; k<CONFIG(event_iterations_limit); k++) {
- if (channel_handler_routine(m, i) == -1) {
+ if (channel_handler_routine(c) == -1) {
break;
}
}
@@ -284,7 +285,7 @@ static void interface_candidate(void)
dlog(LOG_ERR, "no dedicated links available!");
}
-static void interface_handler(void)
+static void interface_handler(void *data)
{
int idx = multichannel_get_current_ifindex(STATE_SYNC(channel));
unsigned int flags;
@@ -311,6 +312,53 @@ static void do_reset_cache_alarm(struct alarm_block *a, void *data)
STATE(mode)->internal->ct.flush();
}
+static void commit_cb(void *data)
+{
+ int ret;
+
+ read_evfd(STATE_SYNC(commit).evfd);
+
+ ret = STATE_SYNC(commit).rq[0].cb(STATE_SYNC(commit).h, 0);
+ if (ret == 0) {
+ /* we still have things in the callback queue. */
+ if (STATE_SYNC(commit).rq[1].cb) {
+ int fd = STATE_SYNC(commit).clientfd;
+
+ STATE_SYNC(commit).rq[0].cb =
+ STATE_SYNC(commit).rq[1].cb;
+
+ STATE_SYNC(commit).rq[1].cb = NULL;
+
+ STATE_SYNC(commit).clientfd = -1;
+ STATE_SYNC(commit).rq[0].cb(STATE_SYNC(commit).h, fd);
+ } else {
+ /* Close the client socket now, we're done. */
+ close(STATE_SYNC(commit).clientfd);
+ STATE_SYNC(commit).clientfd = -1;
+ }
+ }
+}
+
+static void channel_accept_cb(void *data)
+{
+ struct channel *c = data;
+ int fd;
+
+ fd = channel_accept(data);
+ if (fd < 0)
+ return;
+
+ register_fd(fd, channel_handler, c, STATE(fds));
+}
+
+static void tx_queue_cb(void *data)
+{
+ STATE_SYNC(sync)->xmit();
+
+ /* flush pending messages */
+ multichannel_send_flush(STATE_SYNC(channel));
+}
+
static int init_sync(void)
{
int i;
@@ -370,8 +418,19 @@ static int init_sync(void)
for (i=0; i<STATE_SYNC(channel)->channel_num; i++) {
int fd = channel_get_fd(STATE_SYNC(channel)->channel[i]);
fcntl(fd, F_SETFL, O_NONBLOCK);
- if (register_fd(fd, STATE(fds)) == -1)
- return -1;
+
+ switch(channel_type(STATE_SYNC(channel)->channel[i])) {
+ case CHANNEL_T_STREAM:
+ register_fd(fd, channel_accept_cb,
+ STATE_SYNC(channel)->channel[i],
+ STATE(fds));
+ break;
+ case CHANNEL_T_DATAGRAM:
+ register_fd(fd, channel_handler,
+ STATE_SYNC(channel)->channel[i],
+ STATE(fds));
+ break;
+ }
}
STATE_SYNC(interface) = nl_init_interface_handler();
@@ -379,7 +438,8 @@ static int init_sync(void)
dlog(LOG_ERR, "can't open interface watcher");
return -1;
}
- if (register_fd(nlif_fd(STATE_SYNC(interface)), STATE(fds)) == -1)
+ if (register_fd(nlif_fd(STATE_SYNC(interface)),
+ interface_handler, NULL, STATE(fds)) == -1)
return -1;
STATE_SYNC(tx_queue) = queue_create("txqueue", INT_MAX, QUEUE_F_EVFD);
@@ -387,8 +447,8 @@ static int init_sync(void)
dlog(LOG_ERR, "cannot create tx queue");
return -1;
}
- if (register_fd(queue_get_eventfd(STATE_SYNC(tx_queue)),
- STATE(fds)) == -1)
+ if (register_fd(queue_get_eventfd(STATE_SYNC(tx_queue)),
+ tx_queue_cb, NULL, STATE(fds)) == -1)
return -1;
STATE_SYNC(commit).h = nfct_open(CONFIG(netlink).subsys_id, 0);
@@ -404,7 +464,7 @@ static int init_sync(void)
return -1;
}
if (register_fd(get_read_evfd(STATE_SYNC(commit).evfd),
- STATE(fds)) == -1) {
+ commit_cb, NULL, STATE(fds)) == -1) {
return -1;
}
STATE_SYNC(commit).clientfd = -1;
@@ -417,61 +477,6 @@ static int init_sync(void)
return 0;
}
-static void channel_check(struct channel *c, int i, fd_set *readfds)
-{
- /* In case that this channel is connection-oriented. */
- if (channel_accept_isset(c, readfds))
- channel_accept(c);
-
- /* For data handling. */
- if (channel_isset(c, readfds))
- channel_handler(c, i);
-}
-
-static void run_sync(fd_set *readfds)
-{
- int i;
-
- for (i=0; i<STATE_SYNC(channel)->channel_num; i++)
- channel_check(STATE_SYNC(channel)->channel[i], i, readfds);
-
- if (FD_ISSET(queue_get_eventfd(STATE_SYNC(tx_queue)), readfds))
- STATE_SYNC(sync)->xmit();
-
- if (FD_ISSET(nlif_fd(STATE_SYNC(interface)), readfds))
- interface_handler();
-
- if (FD_ISSET(get_read_evfd(STATE_SYNC(commit).evfd), readfds)) {
- int ret;
-
- read_evfd(STATE_SYNC(commit).evfd);
-
- ret = STATE_SYNC(commit).rq[0].cb(STATE_SYNC(commit).h, 0);
- if (ret == 0) {
- /* we still have things in the callback queue. */
- if (STATE_SYNC(commit).rq[1].cb) {
- int fd = STATE_SYNC(commit).clientfd;
-
- STATE_SYNC(commit).rq[0].cb =
- STATE_SYNC(commit).rq[1].cb;
-
- STATE_SYNC(commit).rq[1].cb = NULL;
-
- STATE_SYNC(commit).clientfd = -1;
- STATE_SYNC(commit).rq[0].cb(
- STATE_SYNC(commit).h, fd);
- } else {
- /* Close the client socket now, we're done. */
- close(STATE_SYNC(commit).clientfd);
- STATE_SYNC(commit).clientfd = -1;
- }
- }
- }
-
- /* flush pending messages */
- multichannel_send_flush(STATE_SYNC(channel));
-}
-
static void kill_sync(void)
{
STATE(mode)->internal->close();
@@ -729,7 +734,6 @@ static int local_handler_sync(int fd, int type, void *data)
struct ct_mode sync_mode = {
.init = init_sync,
- .run = run_sync,
.local = local_handler_sync,
.kill = kill_sync,
/* the internal handler is set in run-time. */
diff --git a/src/tcp.c b/src/tcp.c
index f6b05ef..af27c46 100644
--- a/src/tcp.c
+++ b/src/tcp.c
@@ -264,7 +264,6 @@ int tcp_accept(struct tcp_sock *m)
m->client_fd = ret;
m->state = TCP_SERVER_CONNECTED;
- register_fd(m->client_fd, STATE(fds));
}
return m->client_fd;
}