summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPablo Neira Ayuso <pablo@netfilter.org>2009-01-17 17:54:57 +0100
committerPablo Neira Ayuso <pablo@netfilter.org>2009-01-17 17:54:57 +0100
commit786f37040cdcb64b24eb0b437307ed5e208f717f (patch)
tree4441b2aa4e376c30e30a416f87e06597d87516c0
parent4ec9fc2bcceb4e609c43af1a2ecf8d1d87b55d5c (diff)
downloadconntrack-tools-786f37040cdcb64b24eb0b437307ed5e208f717f.tar.gz
conntrack-tools-786f37040cdcb64b24eb0b437307ed5e208f717f.zip
sync: add generic tx_queue for all synchronization modes
This patch adds a generic tx queue for all synchronization modes. Signed-off-by: Pablo Neira Ayuso <pablo@netfilter.org>
-rw-r--r--include/conntrackd.h1
-rw-r--r--include/sync.h3
-rw-r--r--src/sync-ftfw.c37
-rw-r--r--src/sync-mode.c16
-rw-r--r--src/sync-notrack.c37
5 files changed, 30 insertions, 64 deletions
diff --git a/include/conntrackd.h b/include/conntrackd.h
index 8cb520d..3637e2c 100644
--- a/include/conntrackd.h
+++ b/include/conntrackd.h
@@ -150,6 +150,7 @@ struct ct_sync_state {
struct mcast_sock *mcast_server; /* multicast socket: incoming */
struct mcast_sock *mcast_client; /* multicast socket: outgoing */
+ struct queue *tx_queue;
struct sync_mode *sync; /* sync mode */
diff --git a/include/sync.h b/include/sync.h
index 9a9540c..bced1cc 100644
--- a/include/sync.h
+++ b/include/sync.h
@@ -18,8 +18,7 @@ struct sync_mode {
int (*local)(int fd, int type, void *data);
int (*recv)(const struct nethdr *net);
void (*send)(struct nethdr *net, struct cache_object *obj);
- void (*run)(fd_set *readfds);
- int (*register_fds)(struct fds *fds);
+ void (*xmit)(void);
};
extern struct sync_mode sync_alarm;
diff --git a/src/sync-ftfw.c b/src/sync-ftfw.c
index 565a4bc..d544a7b 100644
--- a/src/sync-ftfw.c
+++ b/src/sync-ftfw.c
@@ -34,7 +34,6 @@
#define dp(...)
#endif
-struct queue *tx_queue;
struct queue *rs_queue;
static uint32_t exp_seq;
static uint32_t window;
@@ -108,7 +107,7 @@ static void tx_queue_add_ctlmsg(uint32_t flags, uint32_t from, uint32_t to)
ack->from = from;
ack->to = to;
- queue_add(tx_queue, &qobj->qnode);
+ queue_add(STATE_SYNC(tx_queue), &qobj->qnode);
}
static void tx_queue_add_ctlmsg2(uint32_t flags)
@@ -124,7 +123,7 @@ static void tx_queue_add_ctlmsg2(uint32_t flags)
ctl->type = NET_T_CTL;
ctl->flags = flags;
- queue_add(tx_queue, &qobj->qnode);
+ queue_add(STATE_SYNC(tx_queue), &qobj->qnode);
}
/* this function is called from the alarm framework */
@@ -144,11 +143,6 @@ static void do_alive_alarm(struct alarm_block *a, void *data)
static int ftfw_init(void)
{
- tx_queue = queue_create(INT_MAX, QUEUE_F_EVFD);
- if (tx_queue == NULL) {
- dlog(LOG_ERR, "cannot create tx queue");
- return -1;
- }
rs_queue = queue_create(INT_MAX, 0);
if (rs_queue == NULL) {
dlog(LOG_ERR, "cannot create rs queue");
@@ -167,7 +161,6 @@ static int ftfw_init(void)
static void ftfw_kill(void)
{
queue_destroy(rs_queue);
- queue_destroy(tx_queue);
}
static int do_cache_to_tx(void *data1, void *data2)
@@ -178,7 +171,7 @@ static int do_cache_to_tx(void *data1, void *data2)
if (queue_in(rs_queue, &cn->qnode))
queue_del(&cn->qnode);
- queue_add(tx_queue, &cn->qnode);
+ queue_add(STATE_SYNC(tx_queue), &cn->qnode);
return 0;
}
@@ -259,7 +252,7 @@ static int rs_queue_to_tx(struct queue_node *n, const void *data)
net->seq, net->flags, net->len);
queue_del(n);
- queue_add(tx_queue, n);
+ queue_add(STATE_SYNC(tx_queue), n);
break;
}
case Q_ELEM_OBJ: {
@@ -274,7 +267,7 @@ static int rs_queue_to_tx(struct queue_node *n, const void *data)
dp("resending nack'ed (oldseq=%u)\n", cn->seq);
queue_del(n);
- queue_add(tx_queue, n);
+ queue_add(STATE_SYNC(tx_queue), n);
break;
}
}
@@ -526,19 +519,12 @@ static int tx_queue_xmit(struct queue_node *n, const void *data)
return 0;
}
-static void ftfw_run(fd_set *readfds)
+static void ftfw_xmit(void)
{
- if (FD_ISSET(queue_get_eventfd(tx_queue), readfds)) {
- queue_iterate(tx_queue, NULL, tx_queue_xmit);
- add_alarm(&alive_alarm, 1, 0);
- dp("tx_queue_len:%u rs_queue_len:%u\n",
- queue_len(tx_queue), queue_len(rs_queue));
- }
-}
-
-static int ftfw_register_fds(struct fds *fds)
-{
- return register_fd(queue_get_eventfd(tx_queue), fds);
+ queue_iterate(STATE_SYNC(tx_queue), NULL, tx_queue_xmit);
+ add_alarm(&alive_alarm, ALIVE_INT, 0);
+ dp("tx_queue_len:%u rs_queue_len:%u\n",
+ queue_len(tx_queue), queue_len(rs_queue));
}
struct sync_mode sync_ftfw = {
@@ -550,6 +536,5 @@ struct sync_mode sync_ftfw = {
.local = ftfw_local,
.recv = ftfw_recv,
.send = ftfw_send,
- .run = ftfw_run,
- .register_fds = ftfw_register_fds,
+ .xmit = ftfw_xmit,
};
diff --git a/src/sync-mode.c b/src/sync-mode.c
index 711f71b..5ae9062 100644
--- a/src/sync-mode.c
+++ b/src/sync-mode.c
@@ -26,6 +26,7 @@
#include "fds.h"
#include "event.h"
#include "debug.h"
+#include "queue.h"
#include <errno.h>
#include <unistd.h>
@@ -242,6 +243,12 @@ static int init_sync(void)
return -1;
}
+ STATE_SYNC(tx_queue) = queue_create(INT_MAX, QUEUE_F_EVFD);
+ if (STATE_SYNC(tx_queue) == NULL) {
+ dlog(LOG_ERR, "cannot create tx queue");
+ return -1;
+ }
+
/* initialization of multicast sequence generation */
STATE_SYNC(last_seq_sent) = time(NULL);
@@ -253,8 +260,8 @@ static int register_fds_sync(struct fds *fds)
if (register_fd(STATE_SYNC(mcast_server->fd), fds) == -1)
return -1;
- if (STATE_SYNC(sync)->register_fds)
- return STATE_SYNC(sync)->register_fds(fds);
+ if (register_fd(queue_get_eventfd(STATE_SYNC(tx_queue)), fds) == -1)
+ return -1;
return 0;
}
@@ -265,8 +272,8 @@ static void run_sync(fd_set *readfds)
if (FD_ISSET(STATE_SYNC(mcast_server->fd), readfds))
mcast_handler();
- if (STATE_SYNC(sync)->run)
- STATE_SYNC(sync)->run(readfds);
+ if (FD_ISSET(queue_get_eventfd(STATE_SYNC(tx_queue)), readfds))
+ STATE_SYNC(sync)->xmit();
/* flush pending messages */
mcast_buffered_pending_netmsg(STATE_SYNC(mcast_client));
@@ -281,6 +288,7 @@ static void kill_sync(void)
mcast_client_destroy(STATE_SYNC(mcast_client));
mcast_buffered_destroy();
+ queue_destroy(STATE_SYNC(tx_queue));
if (STATE_SYNC(sync)->kill)
STATE_SYNC(sync)->kill();
diff --git a/src/sync-notrack.c b/src/sync-notrack.c
index 40cc199..4ded298 100644
--- a/src/sync-notrack.c
+++ b/src/sync-notrack.c
@@ -27,8 +27,6 @@
#include <string.h>
-static struct queue *tx_queue;
-
struct cache_notrack {
struct queue_node qnode;
};
@@ -66,30 +64,14 @@ static void tx_queue_add_ctlmsg(uint32_t flags, uint32_t from, uint32_t to)
ack->from = from;
ack->to = to;
- queue_add(tx_queue, &qobj->qnode);
-}
-
-static int notrack_init(void)
-{
- tx_queue = queue_create(INT_MAX, QUEUE_F_EVFD);
- if (tx_queue == NULL) {
- dlog(LOG_ERR, "cannot create tx queue");
- return -1;
- }
-
- return 0;
-}
-
-static void notrack_kill(void)
-{
- queue_destroy(tx_queue);
+ queue_add(STATE_SYNC(tx_queue), &qobj->qnode);
}
static int do_cache_to_tx(void *data1, void *data2)
{
struct cache_object *obj = data2;
struct cache_notrack *cn = cache_get_extra(STATE_SYNC(internal), obj);
- queue_add(tx_queue, &cn->qnode);
+ queue_add(STATE_SYNC(tx_queue), &cn->qnode);
return 0;
}
@@ -176,25 +158,16 @@ static int tx_queue_xmit(struct queue_node *n, const void *data2)
return 0;
}
-static void notrack_run(fd_set *readfds)
-{
- if (FD_ISSET(queue_get_eventfd(tx_queue), readfds))
- queue_iterate(tx_queue, NULL, tx_queue_xmit);
-}
-
-static int notrack_register_fds(struct fds *fds)
+static void notrack_xmit(void)
{
- return register_fd(queue_get_eventfd(tx_queue), fds);
+ queue_iterate(STATE_SYNC(tx_queue), NULL, tx_queue_xmit);
}
struct sync_mode sync_notrack = {
.internal_cache_flags = LIFETIME,
.external_cache_flags = LIFETIME,
.internal_cache_extra = &cache_notrack_extra,
- .init = notrack_init,
- .kill = notrack_kill,
.local = notrack_local,
.recv = notrack_recv,
- .run = notrack_run,
- .register_fds = notrack_register_fds,
+ .xmit = notrack_xmit,
};