summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPablo Neira Ayuso <pablo@netfilter.org>2009-01-17 18:03:50 +0100
committerPablo Neira Ayuso <pablo@netfilter.org>2009-01-17 18:03:50 +0100
commitb1d00262f999a597fa24af3298195db9cf52b790 (patch)
tree1d4219c35ff62fa663339e0c6a514f954f60f8b4
parent786f37040cdcb64b24eb0b437307ed5e208f717f (diff)
downloadconntrack-tools-b1d00262f999a597fa24af3298195db9cf52b790.tar.gz
conntrack-tools-b1d00262f999a597fa24af3298195db9cf52b790.zip
sync: enqueue state updates to tx_queue
With this patch, all the states updates are enqueued in the tx_queue. Thus, there's a single output path. This patch adds a simple refcounting mechanism to note when an object is sitting in the txqueue. This patch also removes the alarm that is required by the ftfw approach. Signed-off-by: Pablo Neira Ayuso <pablo@netfilter.org>
-rw-r--r--include/cache.h7
-rw-r--r--include/sync.h2
-rw-r--r--src/cache.c59
-rw-r--r--src/sync-alarm.c75
-rw-r--r--src/sync-ftfw.c70
-rw-r--r--src/sync-mode.c23
-rw-r--r--src/sync-notrack.c12
7 files changed, 152 insertions, 96 deletions
diff --git a/include/cache.h b/include/cache.h
index fd8e05f..03b6822 100644
--- a/include/cache.h
+++ b/include/cache.h
@@ -4,7 +4,6 @@
#include <stdint.h>
#include <stddef.h>
#include "hash.h"
-#include "alarm.h"
/* cache features */
enum {
@@ -36,7 +35,7 @@ struct cache_object {
struct nf_conntrack *ct;
struct cache *cache;
int status;
- struct alarm_block alarm;
+ int refcnt;
char data[0];
};
@@ -106,12 +105,14 @@ void cache_destroy(struct cache *e);
struct cache_object *cache_object_new(struct cache *c, struct nf_conntrack *ct);
void cache_object_free(struct cache_object *obj);
+void cache_object_get(struct cache_object *obj);
+int cache_object_put(struct cache_object *obj);
+void cache_object_set_status(struct cache_object *obj, int status);
int cache_add(struct cache *c, struct cache_object *obj, int id);
void cache_update(struct cache *c, struct cache_object *obj, int id, struct nf_conntrack *ct);
struct cache_object *cache_update_force(struct cache *c, struct nf_conntrack *ct);
void cache_del(struct cache *c, struct cache_object *obj);
-int cache_del_timer(struct cache *c, struct cache_object *obj, int timeout);
struct cache_object *cache_find(struct cache *c, struct nf_conntrack *ct, int *pos);
void cache_stats(const struct cache *c, int fd);
void cache_stats_extended(const struct cache *c, int fd);
diff --git a/include/sync.h b/include/sync.h
index bced1cc..51f8f5b 100644
--- a/include/sync.h
+++ b/include/sync.h
@@ -17,7 +17,7 @@ struct sync_mode {
void (*kill)(void);
int (*local)(int fd, int type, void *data);
int (*recv)(const struct nethdr *net);
- void (*send)(struct nethdr *net, struct cache_object *obj);
+ void (*enqueue)(struct cache_object *obj, int type);
void (*xmit)(void);
};
diff --git a/src/cache.c b/src/cache.c
index c46498b..1e08a33 100644
--- a/src/cache.c
+++ b/src/cache.c
@@ -174,8 +174,6 @@ void cache_destroy(struct cache *c)
free(c);
}
-static void __del_timeout(struct alarm_block *a, void *data);
-
struct cache_object *cache_object_new(struct cache *c, struct nf_conntrack *ct)
{
struct cache_object *obj;
@@ -187,7 +185,6 @@ struct cache_object *cache_object_new(struct cache *c, struct nf_conntrack *ct)
return NULL;
}
obj->cache = c;
- init_alarm(&obj->alarm, obj, __del_timeout);
if ((obj->ct = nfct_new()) == NULL) {
free(obj);
@@ -207,6 +204,30 @@ void cache_object_free(struct cache_object *obj)
free(obj);
}
+int cache_object_put(struct cache_object *obj)
+{
+ if (--obj->refcnt == 0) {
+ cache_del(obj->cache, obj);
+ cache_object_free(obj);
+ return 1;
+ }
+ return 0;
+}
+
+void cache_object_get(struct cache_object *obj)
+{
+ obj->refcnt++;
+}
+
+void cache_object_set_status(struct cache_object *obj, int status)
+{
+ if (status == C_OBJ_DEAD) {
+ obj->cache->stats.del_ok++;
+ obj->cache->stats.active--;
+ }
+ obj->status = status;
+}
+
static int __add(struct cache *c, struct cache_object *obj, int id)
{
int ret;
@@ -227,6 +248,7 @@ static int __add(struct cache *c, struct cache_object *obj, int id)
c->stats.active++;
obj->status = C_OBJ_NEW;
+ obj->refcnt++;
return 0;
}
@@ -292,7 +314,6 @@ void cache_del(struct cache *c, struct cache_object *obj)
c->stats.del_ok++;
c->stats.active--;
}
- del_alarm(&obj->alarm);
__del(c, obj);
}
@@ -322,36 +343,6 @@ cache_update_force(struct cache *c, struct nf_conntrack *ct)
return obj;
}
-static void __del_timeout(struct alarm_block *a, void *data)
-{
- struct cache_object *obj = (struct cache_object *) data;
- __del(obj->cache, obj);
- cache_object_free(obj);
-}
-
-int cache_del_timer(struct cache *c, struct cache_object *obj, int timeout)
-{
- if (timeout <= 0) {
- cache_del(c, obj);
- cache_object_free(obj);
- return 1;
- }
- if (obj->status != C_OBJ_DEAD) {
- obj->status = C_OBJ_DEAD;
- add_alarm(&obj->alarm, timeout, 0);
- /*
- * increase stats even if this entry was not really
- * removed yet. We do not want to make people think
- * that the replication protocol does not work
- * properly.
- */
- c->stats.del_ok++;
- c->stats.active--;
- return 1;
- }
- return 0;
-}
-
struct cache_object *
cache_find(struct cache *c, struct nf_conntrack *ct, int *id)
{
diff --git a/src/sync-alarm.c b/src/sync-alarm.c
index 34937fe..a2f17ac 100644
--- a/src/sync-alarm.c
+++ b/src/sync-alarm.c
@@ -21,14 +21,21 @@
#include "network.h"
#include "alarm.h"
#include "cache.h"
+#include "queue.h"
#include "debug.h"
#include <stdlib.h>
#include <string.h>
+struct cache_alarm {
+ struct queue_node qnode;
+ struct alarm_block alarm;
+};
+
+static void alarm_enqueue(struct cache_object *obj, int query);
+
static void refresher(struct alarm_block *a, void *data)
{
- struct nethdr *net;
struct cache_object *obj = data;
debug_ct(obj->ct, "persistence update");
@@ -37,36 +44,37 @@ static void refresher(struct alarm_block *a, void *data)
random() % CONFIG(refresh) + 1,
((random() % 5 + 1) * 200000) - 1);
- net = BUILD_NETMSG(obj->ct, NET_T_STATE_UPD);
- mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net);
+ alarm_enqueue(obj, NET_T_STATE_UPD);
}
static void cache_alarm_add(struct cache_object *obj, void *data)
{
- struct alarm_block *a = data;
+ struct cache_alarm *ca = data;
- init_alarm(a, obj, refresher);
- add_alarm(a,
+ queue_node_init(&ca->qnode, Q_ELEM_OBJ);
+ init_alarm(&ca->alarm, obj, refresher);
+ add_alarm(&ca->alarm,
random() % CONFIG(refresh) + 1,
((random() % 5 + 1) * 200000) - 1);
}
static void cache_alarm_update(struct cache_object *obj, void *data)
{
- struct alarm_block *a = data;
- add_alarm(a,
+ struct cache_alarm *ca = data;
+ add_alarm(&ca->alarm,
random() % CONFIG(refresh) + 1,
((random() % 5 + 1) * 200000) - 1);
}
static void cache_alarm_destroy(struct cache_object *obj, void *data)
{
- struct alarm_block *a = data;
- del_alarm(a);
+ struct cache_alarm *ca = data;
+ queue_del(&ca->qnode);
+ del_alarm(&ca->alarm);
}
static struct cache_extra cache_alarm_extra = {
- .size = sizeof(struct alarm_block),
+ .size = sizeof(struct cache_alarm),
.add = cache_alarm_add,
.update = cache_alarm_update,
.destroy = cache_alarm_destroy
@@ -102,9 +110,54 @@ static int alarm_recv(const struct nethdr *net)
return 0;
}
+static void alarm_enqueue(struct cache_object *obj, int query)
+{
+ struct cache_alarm *ca = cache_get_extra(STATE_SYNC(internal), obj);
+ if (queue_add(STATE_SYNC(tx_queue), &ca->qnode))
+ cache_object_get(obj);
+}
+
+static int tx_queue_xmit(struct queue_node *n, const void *data)
+{
+ struct nethdr *net;
+
+ queue_del(n);
+
+ switch(n->type) {
+ case Q_ELEM_CTL:
+ net = queue_node_data(n);
+ nethdr_set_ctl(net);
+ HDR_HOST2NETWORK(net);
+ mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net);
+ queue_object_free((struct queue_object *)n);
+ break;
+ case Q_ELEM_OBJ: {
+ struct cache_alarm *ca;
+ struct cache_object *obj;
+ int type;
+
+ ca = (struct cache_alarm *)n;
+ obj = cache_data_get_object(STATE_SYNC(internal), ca);
+ type = object_status_to_network_type(obj->status);
+ net = BUILD_NETMSG(obj->ct, type);
+ mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net);
+ cache_object_put(obj);
+ break;
+ }
+ }
+ return 0;
+}
+
+static void alarm_xmit(void)
+{
+ queue_iterate(STATE_SYNC(tx_queue), NULL, tx_queue_xmit);
+}
+
struct sync_mode sync_alarm = {
.internal_cache_flags = LIFETIME,
.external_cache_flags = TIMER | LIFETIME,
.internal_cache_extra = &cache_alarm_extra,
.recv = alarm_recv,
+ .enqueue = alarm_enqueue,
+ .xmit = alarm_xmit,
};
diff --git a/src/sync-ftfw.c b/src/sync-ftfw.c
index d544a7b..a287ecd 100644
--- a/src/sync-ftfw.c
+++ b/src/sync-ftfw.c
@@ -168,11 +168,13 @@ static int do_cache_to_tx(void *data1, void *data2)
struct cache_object *obj = data2;
struct cache_ftfw *cn = cache_get_extra(STATE_SYNC(internal), obj);
- if (queue_in(rs_queue, &cn->qnode))
+ if (queue_in(rs_queue, &cn->qnode)) {
queue_del(&cn->qnode);
-
- queue_add(STATE_SYNC(tx_queue), &cn->qnode);
-
+ queue_add(STATE_SYNC(tx_queue), &cn->qnode);
+ } else {
+ if (queue_add(STATE_SYNC(tx_queue), &cn->qnode))
+ cache_object_get(obj);
+ }
return 0;
}
@@ -278,16 +280,15 @@ static int rs_queue_empty(struct queue_node *n, const void *data)
{
const struct nethdr_ack *h = data;
- if (h == NULL) {
- dp("inconditional remove from queue (seq=%u)\n", net->seq);
- queue_del(n);
- return 0;
- }
-
switch(n->type) {
case Q_ELEM_CTL: {
struct nethdr_ack *net = queue_node_data(n);
+ if (h == NULL) {
+ queue_del(n);
+ queue_object_free((struct queue_object *)n);
+ return 0;
+ }
if (before(net->seq, h->from))
return 0; /* continue */
else if (after(net->seq, h->to))
@@ -300,8 +301,15 @@ static int rs_queue_empty(struct queue_node *n, const void *data)
}
case Q_ELEM_OBJ: {
struct cache_ftfw *cn;
+ struct cache_object *obj;
cn = (struct cache_ftfw *) n;
+ if (h == NULL) {
+ queue_del(n);
+ obj = cache_data_get_object(STATE_SYNC(internal), cn);
+ cache_object_put(obj);
+ return 0;
+ }
if (before(cn->seq, h->from))
return 0;
else if (after(cn->seq, h->to))
@@ -309,6 +317,8 @@ static int rs_queue_empty(struct queue_node *n, const void *data)
dp("queue: deleting from queue (seq=%u)\n", cn->seq);
queue_del(n);
+ obj = cache_data_get_object(STATE_SYNC(internal), cn);
+ cache_object_put(obj);
break;
}
}
@@ -441,28 +451,6 @@ out:
return ret;
}
-static void ftfw_send(struct nethdr *net, struct cache_object *obj)
-{
- struct cache_ftfw *cn;
-
- switch(net->type) {
- case NET_T_STATE_NEW:
- case NET_T_STATE_UPD:
- case NET_T_STATE_DEL:
- cn = (struct cache_ftfw *)
- cache_get_extra(STATE_SYNC(internal), obj);
-
- if (queue_in(rs_queue, &cn->qnode))
- queue_del(&cn->qnode);
-
- nethdr_set_hello(net);
-
- cn->seq = ntohl(net->seq);
- queue_add(rs_queue, &cn->qnode);
- break;
- }
-}
-
static int tx_queue_xmit(struct queue_node *n, const void *data)
{
queue_del(n);
@@ -511,7 +499,9 @@ static int tx_queue_xmit(struct queue_node *n, const void *data)
ntohl(net->seq), net->flags, ntohs(net->len));
mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net);
- ftfw_send(net, obj);
+ cn->seq = ntohl(net->seq);
+ queue_add(rs_queue, &cn->qnode);
+ /* we release the object once we get the acknowlegment */
break;
}
}
@@ -527,6 +517,18 @@ static void ftfw_xmit(void)
queue_len(tx_queue), queue_len(rs_queue));
}
+static void ftfw_enqueue(struct cache_object *obj, int type)
+{
+ struct cache_ftfw *cn = cache_get_extra(STATE_SYNC(internal), obj);
+ if (queue_in(rs_queue, &cn->qnode)) {
+ queue_del(&cn->qnode);
+ queue_add(STATE_SYNC(tx_queue), &cn->qnode);
+ } else {
+ if (queue_add(STATE_SYNC(tx_queue), &cn->qnode))
+ cache_object_get(obj);
+ }
+}
+
struct sync_mode sync_ftfw = {
.internal_cache_flags = LIFETIME,
.external_cache_flags = LIFETIME,
@@ -535,6 +537,6 @@ struct sync_mode sync_ftfw = {
.kill = ftfw_kill,
.local = ftfw_local,
.recv = ftfw_recv,
- .send = ftfw_send,
+ .enqueue = ftfw_enqueue,
.xmit = ftfw_xmit,
};
diff --git a/src/sync-mode.c b/src/sync-mode.c
index 5ae9062..00e2f7b 100644
--- a/src/sync-mode.c
+++ b/src/sync-mode.c
@@ -442,14 +442,7 @@ static void dump_sync(struct nf_conntrack *ct)
static void mcast_send_sync(struct cache_object *obj, int query)
{
- struct nethdr *net;
-
- net = BUILD_NETMSG(obj->ct, query);
-
- if (STATE_SYNC(sync)->send)
- STATE_SYNC(sync)->send(net, obj);
-
- mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net);
+ STATE_SYNC(sync)->enqueue(obj, query);
}
static int purge_step(void *data1, void *data2)
@@ -461,8 +454,11 @@ static int purge_step(void *data1, void *data2)
ret = nfct_query(h, NFCT_Q_GET, obj->ct);
if (ret == -1 && errno == ENOENT) {
debug_ct(obj->ct, "overrun purge resync");
- mcast_send_sync(obj, NET_T_STATE_DEL);
- cache_del_timer(STATE_SYNC(internal), obj, CONFIG(del_timeout));
+ if (obj->status != C_OBJ_DEAD) {
+ cache_object_set_status(obj, C_OBJ_DEAD);
+ mcast_send_sync(obj, NET_T_STATE_DEL);
+ cache_object_put(obj);
+ }
}
return 0;
@@ -552,8 +548,11 @@ static int event_destroy_sync(struct nf_conntrack *ct)
debug_ct(ct, "can't destroy");
return 0;
}
- mcast_send_sync(obj, NET_T_STATE_DEL);
- cache_del_timer(STATE_SYNC(internal), obj, CONFIG(del_timeout));
+ if (obj->status != C_OBJ_DEAD) {
+ cache_object_set_status(obj, C_OBJ_DEAD);
+ mcast_send_sync(obj, NET_T_STATE_DEL);
+ cache_object_put(obj);
+ }
debug_ct(ct, "internal destroy");
return 1;
}
diff --git a/src/sync-notrack.c b/src/sync-notrack.c
index 4ded298..3b547ee 100644
--- a/src/sync-notrack.c
+++ b/src/sync-notrack.c
@@ -71,7 +71,8 @@ static int do_cache_to_tx(void *data1, void *data2)
{
struct cache_object *obj = data2;
struct cache_notrack *cn = cache_get_extra(STATE_SYNC(internal), obj);
- queue_add(STATE_SYNC(tx_queue), &cn->qnode);
+ if (queue_add(STATE_SYNC(tx_queue), &cn->qnode))
+ cache_object_get(obj);
return 0;
}
@@ -152,6 +153,7 @@ static int tx_queue_xmit(struct queue_node *n, const void *data2)
mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net);
queue_del(n);
+ cache_object_put(obj);
break;
}
}
@@ -163,11 +165,19 @@ static void notrack_xmit(void)
queue_iterate(STATE_SYNC(tx_queue), NULL, tx_queue_xmit);
}
+static void notrack_enqueue(struct cache_object *obj, int query)
+{
+ struct cache_notrack *cn = cache_get_extra(STATE_SYNC(internal), obj);
+ if (queue_add(STATE_SYNC(tx_queue), &cn->qnode))
+ cache_object_get(obj);
+}
+
struct sync_mode sync_notrack = {
.internal_cache_flags = LIFETIME,
.external_cache_flags = LIFETIME,
.internal_cache_extra = &cache_notrack_extra,
.local = notrack_local,
.recv = notrack_recv,
+ .enqueue = notrack_enqueue,
.xmit = notrack_xmit,
};