summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
author/C=EU/ST=EU/CN=Pablo Neira Ayuso/emailAddress=pablo@netfilter.org </C=EU/ST=EU/CN=Pablo Neira Ayuso/emailAddress=pablo@netfilter.org>2007-07-09 19:11:53 +0000
committer/C=EU/ST=EU/CN=Pablo Neira Ayuso/emailAddress=pablo@netfilter.org </C=EU/ST=EU/CN=Pablo Neira Ayuso/emailAddress=pablo@netfilter.org>2007-07-09 19:11:53 +0000
commit96084e1a1f2e0a49c961bbddb9fffd2e03bfae3f (patch)
treec078d88b157faa7c5ce76bc4591205756f09742b /src
parent4df0be6fbf6a47905e0edf11c13b49ea0eacee5b (diff)
downloadconntrack-tools-96084e1a1f2e0a49c961bbddb9fffd2e03bfae3f.tar.gz
conntrack-tools-96084e1a1f2e0a49c961bbddb9fffd2e03bfae3f.zip
- conntrack-tools requires libnetfilter_conntrack >= 0.0.81
- add len field to nethdr - implement buffered send/recv to batch messages - stop using netlink format for network messages: use similar TLV-based format - reduce synchronization messages size up to 60% - introduce periodic alive messages for sync-nack protocol - timeslice alarm implementation: remove alarm pthread, remove locking - simplify debugging functions: use nfct_snprintf instead - remove major use of libnfnetlink functions: use libnetfilter_conntrack API - deprecate conntrackd -F, use conntrack -F instead - major rework of the network infrastructure: much simple, less messy
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.am7
-rw-r--r--src/alarm.c37
-rw-r--r--src/buffer.c26
-rw-r--r--src/build.c113
-rw-r--r--src/cache.c40
-rw-r--r--src/cache_iterators.c54
-rw-r--r--src/cache_timer.c2
-rw-r--r--src/lock.c32
-rw-r--r--src/main.c1
-rw-r--r--src/mcast.c1
-rw-r--r--src/netlink.c137
-rw-r--r--src/network.c238
-rw-r--r--src/parse.c76
-rw-r--r--src/proxy.c124
-rw-r--r--src/run.c72
-rw-r--r--src/state_helper.c2
-rw-r--r--src/stats-mode.c10
-rw-r--r--src/sync-mode.c152
-rw-r--r--src/sync-nack.c260
-rw-r--r--src/sync-notrack.c6
-rw-r--r--src/timer.c75
21 files changed, 744 insertions, 721 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 8647d04..d71e23c 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -10,7 +10,7 @@ conntrack_SOURCES = conntrack.c
conntrack_LDADD = ../extensions/libct_proto_tcp.la ../extensions/libct_proto_udp.la ../extensions/libct_proto_icmp.la
conntrackd_SOURCES = alarm.c main.c run.c hash.c buffer.c \
- local.c log.c mcast.c netlink.c proxy.c lock.c \
+ local.c log.c mcast.c netlink.c \
ignore_pool.c \
cache.c cache_iterators.c \
cache_lifetime.c cache_timer.c \
@@ -18,9 +18,10 @@ conntrackd_SOURCES = alarm.c main.c run.c hash.c buffer.c \
traffic_stats.c stats-mode.c \
network.c \
state_helper.c state_helper_tcp.c \
+ timer.c \
+ build.c parse.c \
read_config_yy.y read_config_lex.l
-conntrackd_LDFLAGS = $(all_libraries) -lnfnetlink -lnetfilter_conntrack \
- -lpthread
+conntrackd_LDFLAGS = $(all_libraries) -lnfnetlink -lnetfilter_conntrack
EXTRA_DIST = read_config_yy.h
diff --git a/src/alarm.c b/src/alarm.c
index 1a465c2..b4db167 100644
--- a/src/alarm.c
+++ b/src/alarm.c
@@ -22,17 +22,13 @@
#include "conntrackd.h"
#include "alarm.h"
#include "jhash.h"
-#include <pthread.h>
#include <time.h>
#include <errno.h>
/* alarm cascade */
-#define ALARM_CASCADE_SIZE 10
+#define ALARM_CASCADE_SIZE STEPS_PER_SECONDS
static struct list_head *alarm_cascade;
-/* thread stuff */
-static pthread_t alarm_thread;
-
struct alarm_list *create_alarm()
{
return (struct alarm_list *) malloc(sizeof(struct alarm_list));
@@ -86,24 +82,11 @@ int mod_alarm(struct alarm_list *alarm, unsigned long expires)
return 0;
}
-void __run_alarms()
+void do_alarm_run(int step)
{
struct list_head *i, *tmp;
struct alarm_list *t;
- struct timespec req = {0, 1000000000 / ALARM_CASCADE_SIZE};
- struct timespec rem;
- static int step = 0;
-
-retry:
- if (nanosleep(&req, &rem) == -1) {
- /* interrupted syscall: retry with remaining time */
- if (errno == EINTR) {
- memcpy(&req, &rem, sizeof(struct timespec));
- goto retry;
- }
- }
- lock();
list_for_each_safe(i, tmp, &alarm_cascade[step]) {
t = (struct alarm_list *) i;
@@ -111,17 +94,9 @@ retry:
if (t->expires == 0)
t->function(t, t->data);
}
- step = (step + 1) < ALARM_CASCADE_SIZE ? step + 1 : 0;
- unlock();
-}
-
-void *run_alarms(void *foo)
-{
- while(1)
- __run_alarms();
}
-int create_alarm_thread()
+int init_alarm_scheduler()
{
int i;
@@ -132,10 +107,10 @@ int create_alarm_thread()
for (i=0; i<ALARM_CASCADE_SIZE; i++)
INIT_LIST_HEAD(&alarm_cascade[i]);
- return pthread_create(&alarm_thread, NULL, run_alarms, NULL);
+ return 0;
}
-int destroy_alarm_thread()
+void destroy_alarm_scheduler()
{
- return pthread_cancel(alarm_thread);
+ free(alarm_cascade);
}
diff --git a/src/buffer.c b/src/buffer.c
index fa0b859..23f7797 100644
--- a/src/buffer.c
+++ b/src/buffer.c
@@ -29,7 +29,6 @@ struct buffer *buffer_create(size_t max_size)
b->max_size = max_size;
INIT_LIST_HEAD(&b->head);
- pthread_mutex_init(&b->lock, NULL);
return b;
}
@@ -39,14 +38,12 @@ void buffer_destroy(struct buffer *b)
struct list_head *i, *tmp;
struct buffer_node *node;
- pthread_mutex_lock(&b->lock);
+ /* XXX: set cur_size and num_elems */
list_for_each_safe(i, tmp, &b->head) {
node = (struct buffer_node *) i;
list_del(i);
free(node);
}
- pthread_mutex_unlock(&b->lock);
- pthread_mutex_destroy(&b->lock);
free(b);
}
@@ -70,8 +67,6 @@ int buffer_add(struct buffer *b, const void *data, size_t size)
int ret = 0;
struct buffer_node *n;
- pthread_mutex_lock(&b->lock);
-
/* does it fit this buffer? */
if (size > b->max_size) {
errno = ENOSPC;
@@ -97,28 +92,22 @@ retry:
list_add(&n->head, &b->head);
b->cur_size += size;
+ b->num_elems++;
err:
- pthread_mutex_unlock(&b->lock);
return ret;
}
-void __buffer_del(struct buffer *b, void *data)
+void buffer_del(struct buffer *b, void *data)
{
struct buffer_node *n = container_of(data, struct buffer_node, data);
list_del(&n->head);
b->cur_size -= n->size;
+ b->num_elems--;
free(n);
}
-void buffer_del(struct buffer *b, void *data)
-{
- pthread_mutex_lock(&b->lock);
- buffer_del(b, data);
- pthread_mutex_unlock(&b->lock);
-}
-
void buffer_iterate(struct buffer *b,
void *data,
int (*iterate)(void *data1, void *data2))
@@ -126,11 +115,14 @@ void buffer_iterate(struct buffer *b,
struct list_head *i, *tmp;
struct buffer_node *n;
- pthread_mutex_lock(&b->lock);
list_for_each_safe(i, tmp, &b->head) {
n = (struct buffer_node *) i;
if (iterate(n->data, data))
break;
}
- pthread_mutex_unlock(&b->lock);
+}
+
+unsigned int buffer_len(struct buffer *b)
+{
+ return b->num_elems;
}
diff --git a/src/build.c b/src/build.c
new file mode 100644
index 0000000..b77dbc2
--- /dev/null
+++ b/src/build.c
@@ -0,0 +1,113 @@
+/*
+ * (C) 2006-2007 by Pablo Neira Ayuso <pablo@netfilter.org>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#include <string.h>
+#include <libnetfilter_conntrack/libnetfilter_conntrack.h>
+#include "network.h"
+
+static void addattr(struct netpld *pld, int attr, const void *data, int len)
+{
+ struct netattr *nta;
+ int tlen = NTA_LENGTH(len);
+
+ nta = PLD_TAIL(pld);
+ nta->nta_attr = htons(attr);
+ nta->nta_len = htons(len);
+ memcpy(NTA_DATA(nta), data, len);
+ pld->len += NTA_ALIGN(tlen);
+}
+
+static void __build_u8(const struct nf_conntrack *ct,
+ struct netpld *pld,
+ int attr)
+{
+ u_int8_t data = nfct_get_attr_u8(ct, attr);
+ addattr(pld, attr, &data, sizeof(u_int8_t));
+}
+
+static void __build_u16(const struct nf_conntrack *ct,
+ struct netpld *pld,
+ int attr)
+{
+ u_int16_t data = nfct_get_attr_u16(ct, attr);
+ data = htons(data);
+ addattr(pld, attr, &data, sizeof(u_int16_t));
+}
+
+static void __build_u32(const struct nf_conntrack *ct,
+ struct netpld *pld,
+ int attr)
+{
+ u_int32_t data = nfct_get_attr_u32(ct, attr);
+ data = htonl(data);
+ addattr(pld, attr, &data, sizeof(u_int32_t));
+}
+
+/* XXX: IPv6 and ICMP not supported */
+void build_netpld(struct nf_conntrack *ct, struct netpld *pld, int query)
+{
+ /* undo NAT */
+ if (nfct_getobjopt(ct, NFCT_GOPT_IS_SNAT))
+ nfct_setobjopt(ct, NFCT_SOPT_UNDO_SNAT);
+ if (nfct_getobjopt(ct, NFCT_GOPT_IS_DNAT))
+ nfct_setobjopt(ct, NFCT_SOPT_UNDO_DNAT);
+ if (nfct_getobjopt(ct, NFCT_GOPT_IS_SPAT))
+ nfct_setobjopt(ct, NFCT_SOPT_UNDO_SPAT);
+ if (nfct_getobjopt(ct, NFCT_GOPT_IS_DPAT))
+ nfct_setobjopt(ct, NFCT_SOPT_UNDO_DPAT);
+
+ /* build message */
+ if (nfct_attr_is_set(ct, ATTR_IPV4_SRC))
+ __build_u32(ct, pld, ATTR_IPV4_SRC);
+ if (nfct_attr_is_set(ct, ATTR_IPV4_DST))
+ __build_u32(ct, pld, ATTR_IPV4_DST);
+ if (nfct_attr_is_set(ct, ATTR_L3PROTO))
+ __build_u8(ct, pld, ATTR_L3PROTO);
+ if (nfct_attr_is_set(ct, ATTR_PORT_SRC))
+ __build_u16(ct, pld, ATTR_PORT_SRC);
+ if (nfct_attr_is_set(ct, ATTR_PORT_DST))
+ __build_u16(ct, pld, ATTR_PORT_DST);
+ if (nfct_attr_is_set(ct, ATTR_L4PROTO)) {
+ u_int8_t proto;
+
+ __build_u8(ct, pld, ATTR_L4PROTO);
+ proto = nfct_get_attr_u8(ct, ATTR_L4PROTO);
+ if (proto == IPPROTO_TCP) {
+ if (nfct_attr_is_set(ct, ATTR_TCP_STATE))
+ __build_u8(ct, pld, ATTR_TCP_STATE);
+ }
+ }
+ if (nfct_attr_is_set(ct, ATTR_SNAT_IPV4))
+ __build_u32(ct, pld, ATTR_SNAT_IPV4);
+ if (nfct_attr_is_set(ct, ATTR_DNAT_IPV4))
+ __build_u32(ct, pld, ATTR_DNAT_IPV4);
+ if (nfct_attr_is_set(ct, ATTR_SNAT_PORT))
+ __build_u16(ct, pld, ATTR_SNAT_PORT);
+ if (nfct_attr_is_set(ct, ATTR_DNAT_PORT))
+ __build_u16(ct, pld, ATTR_DNAT_PORT);
+ if (nfct_attr_is_set(ct, ATTR_TIMEOUT))
+ __build_u32(ct, pld, ATTR_TIMEOUT);
+ if (nfct_attr_is_set(ct, ATTR_MARK))
+ __build_u32(ct, pld, ATTR_MARK);
+ if (nfct_attr_is_set(ct, ATTR_STATUS))
+ __build_u32(ct, pld, ATTR_STATUS);
+
+ pld->query = query;
+
+ PLD_HOST2NETWORK(pld);
+}
diff --git a/src/cache.c b/src/cache.c
index 3bf331c..1e20d95 100644
--- a/src/cache.c
+++ b/src/cache.c
@@ -193,9 +193,7 @@ struct cache *cache_create(char *name,
void cache_destroy(struct cache *c)
{
- lock();
hashtable_destroy(c->h);
- unlock();
free(c->features);
free(c->feature_offset);
free(c);
@@ -237,7 +235,7 @@ static struct us_conntrack *__add(struct cache *c, struct nf_conntrack *ct)
return NULL;
}
-struct us_conntrack *__cache_add(struct cache *c, struct nf_conntrack *ct)
+struct us_conntrack *cache_add(struct cache *c, struct nf_conntrack *ct)
{
struct us_conntrack *u;
@@ -252,17 +250,6 @@ struct us_conntrack *__cache_add(struct cache *c, struct nf_conntrack *ct)
return NULL;
}
-struct us_conntrack *cache_add(struct cache *c, struct nf_conntrack *ct)
-{
- struct us_conntrack *u;
-
- lock();
- u = __cache_add(c, ct);
- unlock();
-
- return u;
-}
-
static struct us_conntrack *__update(struct cache *c, struct nf_conntrack *ct)
{
size_t size = c->h->datasize;
@@ -317,9 +304,7 @@ struct us_conntrack *cache_update(struct cache *c, struct nf_conntrack *ct)
{
struct us_conntrack *u;
- lock();
u = __cache_update(c, ct);
- unlock();
return u;
}
@@ -329,19 +314,15 @@ struct us_conntrack *cache_update_force(struct cache *c,
{
struct us_conntrack *u;
- lock();
if ((u = __update(c, ct)) != NULL) {
c->upd_ok++;
- unlock();
return u;
}
if ((u = __add(c, ct)) != NULL) {
c->add_ok++;
- unlock();
return u;
}
c->add_fail++;
- unlock();
return NULL;
}
@@ -354,9 +335,7 @@ int cache_test(struct cache *c, struct nf_conntrack *ct)
u->ct = ct;
- lock();
ret = hashtable_test(c->h, u);
- unlock();
return ret != NULL;
}
@@ -390,7 +369,7 @@ static int __del(struct cache *c, struct nf_conntrack *ct)
return 0;
}
-int __cache_del(struct cache *c, struct nf_conntrack *ct)
+int cache_del(struct cache *c, struct nf_conntrack *ct)
{
if (__del(c, ct)) {
c->del_ok++;
@@ -401,17 +380,6 @@ int __cache_del(struct cache *c, struct nf_conntrack *ct)
return 0;
}
-int cache_del(struct cache *c, struct nf_conntrack *ct)
-{
- int ret;
-
- lock();
- ret = __cache_del(c, ct);
- unlock();
-
- return ret;
-}
-
struct us_conntrack *cache_get_conntrack(struct cache *c, void *data)
{
return data - c->extra_offset;
@@ -427,7 +395,6 @@ void cache_stats(struct cache *c, int fd)
char buf[512];
int size;
- lock();
size = sprintf(buf, "cache %s:\n"
"current active connections:\t%12u\n"
"connections created:\t\t%12u\tfailed:\t%12u\n"
@@ -441,7 +408,6 @@ void cache_stats(struct cache *c, int fd)
c->upd_fail,
c->del_ok,
c->del_fail);
- unlock();
send(fd, buf, size, 0);
}
@@ -449,7 +415,5 @@ void cache_iterate(struct cache *c,
void *data,
int (*iterate)(void *data1, void *data2))
{
- lock();
hashtable_iterate(c->h, data, iterate);
- unlock();
}
diff --git a/src/cache_iterators.c b/src/cache_iterators.c
index 446cac8..1d1b2e8 100644
--- a/src/cache_iterators.c
+++ b/src/cache_iterators.c
@@ -71,37 +71,25 @@ void cache_dump(struct cache *c, int fd, int type)
.type = type
};
- /* does not require locking: called inside fork() */
hashtable_iterate(c->h, (void *) &tmp, do_dump);
}
+/* no need to clone, called from child process */
static int do_commit(void *data1, void *data2)
{
int ret;
struct cache *c = data1;
struct us_conntrack *u = data2;
- struct nf_conntrack *ct;
- char buf[4096];
- struct nlmsghdr *nlh = (struct nlmsghdr *)buf;
-
- ct = nfct_clone(u->ct);
- if (ct == NULL)
- return 0;
+ struct nf_conntrack *ct = u->ct;
+ /* XXX: related connections */
if (nfct_attr_is_set(ct, ATTR_STATUS)) {
u_int32_t status = nfct_get_attr_u32(ct, ATTR_STATUS);
status &= ~IPS_EXPECTED;
nfct_set_attr_u32(ct, ATTR_STATUS, status);
}
- if (nfct_getobjopt(ct, NFCT_GOPT_IS_SNAT))
- nfct_setobjopt(ct, NFCT_SOPT_UNDO_SNAT);
- if (nfct_getobjopt(ct, NFCT_GOPT_IS_DNAT))
- nfct_setobjopt(ct, NFCT_SOPT_UNDO_DNAT);
- if (nfct_getobjopt(ct, NFCT_GOPT_IS_SPAT))
- nfct_setobjopt(ct, NFCT_SOPT_UNDO_SPAT);
- if (nfct_getobjopt(ct, NFCT_GOPT_IS_DPAT))
- nfct_setobjopt(ct, NFCT_SOPT_UNDO_DPAT);
+ nfct_setobjopt(ct, NFCT_SOPT_SETUP_REPLY);
/*
* Set a reduced timeout for candidate-to-be-committed
@@ -109,20 +97,12 @@ static int do_commit(void *data1, void *data2)
*/
nfct_set_attr_u32(ct, ATTR_TIMEOUT, CONFIG(commit_timeout));
- ret = nfct_build_query(STATE(subsys_dump),
- NFCT_Q_CREATE_UPDATE,
- ct,
- nlh,
- sizeof(buf));
-
- free(ct);
-
if (ret == -1) {
dlog(STATE(log), "failed to build: %s", strerror(errno));
return 0;
}
- ret = nfnl_query(STATE(dump), nlh);
+ ret = nfct_query(STATE(dump), NFCT_Q_CREATE_UPDATE, ct);
if (ret == -1) {
switch(errno) {
case EEXIST:
@@ -146,7 +126,6 @@ void cache_commit(struct cache *c)
unsigned int commit_exist = c->commit_exist;
unsigned int commit_fail = c->commit_fail;
- /* does not require locking: called inside fork() */
hashtable_iterate(c->h, c, do_commit);
/* calculate new entries committed */
@@ -187,30 +166,7 @@ static int do_flush(void *data1, void *data2)
void cache_flush(struct cache *c)
{
- lock();
hashtable_iterate(c->h, c, do_flush);
hashtable_flush(c->h);
c->flush++;
- unlock();
-}
-
-#include "sync.h"
-#include "network.h"
-
-static int do_bulk(void *data1, void *data2)
-{
- int ret;
- struct us_conntrack *u = data2;
-
- mcast_build_send_update(u);
-
- /* keep iterating even if we have found errors */
- return 0;
-}
-
-void cache_bulk(struct cache *c)
-{
- lock();
- hashtable_iterate(c->h, NULL, do_bulk);
- unlock();
}
diff --git a/src/cache_timer.c b/src/cache_timer.c
index 213b59a..f3940f3 100644
--- a/src/cache_timer.c
+++ b/src/cache_timer.c
@@ -27,7 +27,7 @@ static void timeout(struct alarm_list *a, void *data)
struct us_conntrack *u = data;
debug_ct(u->ct, "expired timeout");
- __cache_del(u->cache, u->ct);
+ cache_del(u->cache, u->ct);
}
static void timer_add(struct us_conntrack *u, void *data)
diff --git a/src/lock.c b/src/lock.c
index cd68baf..e69de29 100644
--- a/src/lock.c
+++ b/src/lock.c
@@ -1,32 +0,0 @@
-/*
- * (C) 2006 by Pablo Neira Ayuso <pablo@netfilter.org>
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
- */
-
-#include <stdio.h>
-#include <pthread.h>
-
-static pthread_mutex_t global_lock = PTHREAD_MUTEX_INITIALIZER;
-
-void lock()
-{
- pthread_mutex_lock(&global_lock);
-}
-
-void unlock()
-{
- pthread_mutex_unlock(&global_lock);
-}
diff --git a/src/main.c b/src/main.c
index a039793..007b76e 100644
--- a/src/main.c
+++ b/src/main.c
@@ -187,6 +187,7 @@ int main(int argc, char *argv[])
case 'F':
set_operation_mode(&type, REQUEST, argv);
action = FLUSH_MASTER;
+ break;
case 'f':
set_operation_mode(&type, REQUEST, argv);
action = FLUSH_CACHE;
diff --git a/src/mcast.c b/src/mcast.c
index 85992fb..6193a59 100644
--- a/src/mcast.c
+++ b/src/mcast.c
@@ -87,7 +87,6 @@ struct mcast_sock *mcast_server_create(struct mcast_conf *conf)
return NULL;
}
-
switch(conf->ipproto) {
case AF_INET:
if (setsockopt(m->fd, IPPROTO_IP, IP_ADD_MEMBERSHIP,
diff --git a/src/netlink.c b/src/netlink.c
index 5f7cbeb..be5f82e 100644
--- a/src/netlink.c
+++ b/src/netlink.c
@@ -52,19 +52,10 @@ int ignore_conntrack(struct nf_conntrack *ct)
return 0;
}
-static int nl_event_handler(struct nlmsghdr *nlh,
- struct nfattr *nfa[],
- void *data)
+static int event_handler(enum nf_conntrack_msg_type type,
+ struct nf_conntrack *ct,
+ void *data)
{
- char tmp[1024];
- struct nf_conntrack *ct = (struct nf_conntrack *) tmp;
- int type;
-
- memset(tmp, 0, sizeof(tmp));
-
- if ((type = nfct_parse_conntrack(NFCT_T_ALL, nlh, ct)) == NFCT_T_ERROR)
- return NFCT_CB_STOP;
-
/*
* Ignore this conntrack: it talks about a
* connection that is not interesting for us.
@@ -74,13 +65,13 @@ static int nl_event_handler(struct nlmsghdr *nlh,
switch(type) {
case NFCT_T_NEW:
- STATE(mode)->event_new(ct, nlh);
+ STATE(mode)->event_new(ct);
break;
case NFCT_T_UPDATE:
- STATE(mode)->event_upd(ct, nlh);
+ STATE(mode)->event_upd(ct);
break;
case NFCT_T_DESTROY:
- if (STATE(mode)->event_dst(ct, nlh))
+ if (STATE(mode)->event_dst(ct))
update_traffic_stats(ct);
break;
default:
@@ -88,30 +79,31 @@ static int nl_event_handler(struct nlmsghdr *nlh,
break;
}
- return NFCT_CB_STOP;
+ return NFCT_CB_CONTINUE;
}
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/fcntl.h>
+
int nl_init_event_handler(void)
{
- struct nfnl_callback cb_events = {
- .call = nl_event_handler,
- .attr_count = CTA_MAX
- };
-
- /* open event netlink socket */
- STATE(event) = nfnl_open();
+ STATE(event) = nfct_open(CONNTRACK, NFCT_ALL_CT_GROUPS);
if (!STATE(event))
return -1;
+ fcntl(nfct_fd(STATE(event)), F_SETFL, O_NONBLOCK);
+
/* set up socket buffer size */
if (CONFIG(netlink_buffer_size))
- nfnl_rcvbufsiz(STATE(event), CONFIG(netlink_buffer_size));
+ nfnl_rcvbufsiz(nfct_nfnlh(STATE(event)),
+ CONFIG(netlink_buffer_size));
else {
socklen_t socklen = sizeof(unsigned int);
unsigned int read_size;
/* get current buffer size */
- getsockopt(nfnl_fd(STATE(event)), SOL_SOCKET,
+ getsockopt(nfct_fd(STATE(event)), SOL_SOCKET,
SO_RCVBUF, &read_size, &socklen);
CONFIG(netlink_buffer_size) = read_size;
@@ -122,40 +114,16 @@ int nl_init_event_handler(void)
CONFIG(netlink_buffer_size_max_grown) =
CONFIG(netlink_buffer_size);
- /* open event subsystem */
- STATE(subsys_event) = nfnl_subsys_open(STATE(event),
- NFNL_SUBSYS_CTNETLINK,
- IPCTNL_MSG_MAX,
- NFCT_ALL_CT_GROUPS);
- if (STATE(subsys_event) == NULL)
- return -1;
-
- /* register callback for new and update events */
- nfnl_callback_register(STATE(subsys_event),
- IPCTNL_MSG_CT_NEW,
- &cb_events);
-
- /* register callback for delete events */
- nfnl_callback_register(STATE(subsys_event),
- IPCTNL_MSG_CT_DELETE,
- &cb_events);
+ /* register callback for events */
+ nfct_callback_register(STATE(event), NFCT_T_ALL, event_handler, NULL);
return 0;
}
-static int nl_dump_handler(struct nlmsghdr *nlh,
- struct nfattr *nfa[],
- void *data)
+static int dump_handler(enum nf_conntrack_msg_type type,
+ struct nf_conntrack *ct,
+ void *data)
{
- char buf[1024];
- struct nf_conntrack *ct = (struct nf_conntrack *) buf;
- int type;
-
- memset(buf, 0, sizeof(buf));
-
- if ((type = nfct_parse_conntrack(NFCT_T_ALL, nlh, ct)) == NFCT_T_ERROR)
- return NFCT_CB_CONTINUE;
-
/*
* Ignore this conntrack: it talks about a
* connection that is not interesting for us.
@@ -165,7 +133,7 @@ static int nl_dump_handler(struct nlmsghdr *nlh,
switch(type) {
case NFCT_T_UPDATE:
- STATE(mode)->dump(ct, nlh);
+ STATE(mode)->dump(ct);
break;
default:
dlog(STATE(log), "received unknown msg from ctnetlink");
@@ -176,30 +144,15 @@ static int nl_dump_handler(struct nlmsghdr *nlh,
int nl_init_dump_handler(void)
{
- struct nfnl_callback cb_dump = {
- .call = nl_dump_handler,
- .attr_count = CTA_MAX
- };
-
/* open dump netlink socket */
- STATE(dump) = nfnl_open();
+ STATE(dump) = nfct_open(CONNTRACK, 0);
if (!STATE(dump))
return -1;
- /* open dump subsystem */
- STATE(subsys_dump) = nfnl_subsys_open(STATE(dump),
- NFNL_SUBSYS_CTNETLINK,
- IPCTNL_MSG_MAX,
- 0);
- if (STATE(subsys_dump) == NULL)
- return -1;
-
/* register callback for dumped entries */
- nfnl_callback_register(STATE(subsys_dump),
- IPCTNL_MSG_CT_NEW,
- &cb_dump);
+ nfct_callback_register(STATE(dump), NFCT_T_ALL, dump_handler, NULL);
- if (nl_dump_conntrack_table(STATE(dump), STATE(subsys_dump)) == -1)
+ if (nl_dump_conntrack_table() == -1)
return -1;
return 0;
@@ -207,7 +160,7 @@ int nl_init_dump_handler(void)
static int warned = 0;
-void nl_resize_socket_buffer(struct nfnl_handle *h)
+void nl_resize_socket_buffer(struct nfct_handle *h)
{
unsigned int s = CONFIG(netlink_buffer_size) * 2;
@@ -228,44 +181,14 @@ void nl_resize_socket_buffer(struct nfnl_handle *h)
warned = 1;
}
- CONFIG(netlink_buffer_size) = nfnl_rcvbufsiz(h, s);
+ CONFIG(netlink_buffer_size) = nfnl_rcvbufsiz(nfct_nfnlh(h), s);
/* notify the sysadmin */
dlog(STATE(log), "netlink socket buffer size has been set to %u bytes",
CONFIG(netlink_buffer_size));
}
-int nl_dump_conntrack_table(struct nfnl_handle *h,
- struct nfnl_subsys_handle *subsys)
+int nl_dump_conntrack_table(void)
{
- struct nfnlhdr req;
-
- memset(&req, 0, sizeof(req));
- nfct_build_query(subsys,
- NFCT_Q_DUMP,
- &CONFIG(family),
- &req,
- sizeof(req));
-
- if (nfnl_query(h, &req.nlh) == -1)
- return -1;
-
- return 0;
-}
-
-int nl_flush_master_conntrack_table(void)
-{
- struct nfnlhdr req;
-
- memset(&req, 0, sizeof(req));
- nfct_build_query(STATE(subsys_dump),
- NFCT_Q_FLUSH,
- &CONFIG(family),
- &req,
- sizeof(req));
-
- if (nfnl_query(STATE(dump), &req.nlh) == -1)
- return -1;
-
- return 0;
+ return nfct_query(STATE(dump), NFCT_Q_DUMP, &CONFIG(family));
}
diff --git a/src/network.c b/src/network.c
index 159bdf3..d162839 100644
--- a/src/network.c
+++ b/src/network.c
@@ -18,190 +18,159 @@
#include "conntrackd.h"
#include "network.h"
+#include "us-conntrack.h"
+#include "sync.h"
static unsigned int seq_set, cur_seq;
-static int send_netmsg(struct mcast_sock *m, void *data, unsigned int len)
+static int __do_send(struct mcast_sock *m, void *data, int len)
{
struct nethdr *net = data;
- if (!seq_set) {
- seq_set = 1;
- cur_seq = time(NULL);
- net->flags |= NET_F_HELLO;
- }
-
- net->flags = htons(net->flags);
- net->seq = htonl(cur_seq++);
-
#undef _TEST_DROP
#ifdef _TEST_DROP
static int drop = 0;
- if (++drop > 10) {
+ if (++drop >= 10) {
+ printf("drop sq: %u fl:%u len:%u\n",
+ ntohl(net->seq), ntohs(net->flags),
+ ntohs(net->len));
drop = 0;
- printf("dropping resend (seq=%u)\n", ntohl(net->seq));
return 0;
}
#endif
+ debug("send sq: %u fl:%u len:%u\n",
+ ntohl(net->seq), ntohs(net->flags),
+ ntohs(net->len));
+
return mcast_send(m, net, len);
}
-int mcast_send_netmsg(struct mcast_sock *m, void *data)
+static int __do_prepare(struct mcast_sock *m, void *data, int len)
{
- struct nlmsghdr *nlh = data + NETHDR_SIZ;
- unsigned int len = nlh->nlmsg_len + NETHDR_SIZ;
struct nethdr *net = data;
- if (nlh_host2network(nlh) == -1)
- return -1;
+ if (!seq_set) {
+ seq_set = 1;
+ cur_seq = time(NULL);
+ net->flags |= NET_F_HELLO;
+ }
+ net->len = len;
+ net->seq = cur_seq++;
+ HDR_HOST2NETWORK(net);
- return send_netmsg(m, data, len);
+ return len;
}
-int mcast_resend_netmsg(struct mcast_sock *m, void *data)
+static int __prepare_ctl(struct mcast_sock *m, void *data)
{
- struct nethdr *net = data;
- struct nlmsghdr *nlh = data + NETHDR_SIZ;
- unsigned int len;
+ struct nethdr_ack *nack = (struct nethdr_ack *) data;
- net->flags = ntohs(net->flags);
+ return __do_prepare(m, data, NETHDR_ACK_SIZ);
+}
- if (net->flags & NET_F_NACK || net->flags & NET_F_ACK)
- len = NETHDR_ACK_SIZ;
- else
- len = ntohl(nlh->nlmsg_len) + NETHDR_SIZ;
+static int __prepare_data(struct mcast_sock *m, void *data)
+{
+ struct nethdr *net = (struct nethdr *) data;
+ struct netpld *pld = NETHDR_DATA(net);
- return send_netmsg(m, data, len);
+ return __do_prepare(m, data, ntohs(pld->len) + NETPLD_SIZ + NETHDR_SIZ);
}
-int mcast_send_error(struct mcast_sock *m, void *data)
+int prepare_send_netmsg(struct mcast_sock *m, void *data)
{
- struct nethdr *net = data;
- unsigned int len = NETHDR_SIZ;
+ int ret = 0;
+ struct nethdr *net = (struct nethdr *) data;
- if (net->flags & NET_F_NACK || net->flags & NET_F_ACK) {
- struct nethdr_ack *nack = (struct nethdr_ack *) net;
- nack->from = htonl(nack->from);
- nack->to = htonl(nack->to);
- len = NETHDR_ACK_SIZ;
- }
+ if (IS_DATA(net))
+ ret = __prepare_data(m, data);
+ else if (IS_CTL(net))
+ ret = __prepare_ctl(m, data);
- return send_netmsg(m, data, len);
+ return ret;
}
-#include "us-conntrack.h"
-#include "sync.h"
+static int tx_buflen = 0;
+/* XXX: use buffer size of interface MTU */
+static char __tx_buf[1460], *tx_buf = __tx_buf;
-static int __build_send(struct us_conntrack *u, int type, int query)
+/* return 0 if it is not sent, otherwise return 1 */
+int mcast_buffered_send_netmsg(struct mcast_sock *m, void *data, int len)
{
- char __net[4096];
- struct nethdr *net = (struct nethdr *) __net;
+ int ret = 0;
+ struct nethdr *net = data;
- if (!state_helper_verdict(type, u->ct))
- return 0;
+retry:
+ if (tx_buflen + len < sizeof(__tx_buf)) {
+ memcpy(__tx_buf + tx_buflen, net, len);
+ tx_buflen += len;
+ } else {
+ __do_send(m, tx_buf, tx_buflen);
+ ret = 1;
+ tx_buflen = 0;
+ goto retry;
+ }
- int ret = build_network_msg(query,
- STATE(subsys_event),
- u->ct,
- __net,
- sizeof(__net));
+ return ret;
+}
- if (ret == -1)
- return -1;
+int mcast_buffered_pending_netmsg(struct mcast_sock *m)
+{
+ int ret;
+
+ if (tx_buflen == 0)
+ return 0;
- mcast_send_netmsg(STATE_SYNC(mcast_client), __net);
- if (STATE_SYNC(sync)->send)
- STATE_SYNC(sync)->send(type, net, u);
+ ret = __do_send(m, tx_buf, tx_buflen);
+ tx_buflen = 0;
- return 0;
+ return ret;
}
-int mcast_build_send_update(struct us_conntrack *u)
+int mcast_send_netmsg(struct mcast_sock *m, void *data)
{
- return __build_send(u, NFCT_T_UPDATE, NFCT_Q_UPDATE);
+ int ret;
+ int len = prepare_send_netmsg(m, data);
+
+ ret = mcast_buffered_send_netmsg(m, data, len);
+ mcast_buffered_pending_netmsg(m);
+
+ return ret;
}
-int mcast_build_send_destroy(struct us_conntrack *u)
+void build_netmsg(struct nf_conntrack *ct, int query, struct nethdr *net)
{
- return __build_send(u, NFCT_T_DESTROY, NFCT_Q_DESTROY);
+ struct netpld *pld = NETHDR_DATA(net);
+
+ build_netpld(ct, pld, query);
}
-int mcast_recv_netmsg(struct mcast_sock *m, void *data, int len)
+int handle_netmsg(struct nethdr *net)
{
int ret;
- struct nethdr *net = data;
- struct nlmsghdr *nlh = data + NETHDR_SIZ;
- struct nfgenmsg *nfhdr;
-
- ret = mcast_recv(m, net, len);
- if (ret <= 0)
- return ret;
+ struct netpld *pld = NETHDR_DATA(net);
/* message too small: no room for the header */
- if (ret < NETHDR_SIZ)
+ if (ntohs(net->len) < NETHDR_ACK_SIZ)
return -1;
- if (ntohs(net->flags) & NET_F_HELLO)
- STATE_SYNC(last_seq_recv) = ntohl(net->seq) - 1;
+ HDR_NETWORK2HOST(net);
- if (ntohs(net->flags) & NET_F_NACK || ntohs(net->flags) & NET_F_ACK) {
- struct nethdr_ack *nack = (struct nethdr_ack *) net;
+ if (IS_HELLO(net))
+ STATE_SYNC(last_seq_recv) = net->seq - 1;
- /* message too small: no room for the header */
- if (ret < NETHDR_ACK_SIZ)
- return -1;
-
- /* host byte order conversion */
- net->flags = ntohs(net->flags);
- net->seq = ntohl(net->seq);
-
- /* acknowledgement conversion */
- nack->from = ntohl(nack->from);
- nack->to = ntohl(nack->to);
-
- return ret;
- }
-
- if (ntohs(net->flags) & NET_F_RESYNC) {
- /* host byte order conversion */
- net->flags = ntohs(net->flags);
- net->seq = ntohl(net->seq);
-
- return ret;
- }
+ if (IS_CTL(net))
+ return 0;
/* information received is too small */
- if (ret < NLMSG_SPACE(sizeof(struct nfgenmsg)))
- return -1;
-
- /* information received and message length does not match */
- if (ret != ntohl(nlh->nlmsg_len) + NETHDR_SIZ)
- return -1;
-
- /* this message does not come from ctnetlink */
- if (NFNL_SUBSYS_ID(ntohs(nlh->nlmsg_type)) != NFNL_SUBSYS_CTNETLINK)
+ if (net->len < sizeof(struct netpld))
return -1;
- nfhdr = NLMSG_DATA(nlh);
-
- /* only AF_INET and AF_INET6 are supported */
- if (nfhdr->nfgen_family != AF_INET &&
- nfhdr->nfgen_family != AF_INET6)
- return -1;
-
- /* only process message coming from nfnetlink v0 */
- if (nfhdr->version != NFNETLINK_V0)
- return -1;
-
- /* host byte order conversion */
- net->flags = ntohs(net->flags);
- net->seq = ntohl(net->seq);
-
- if (nlh_network2host(nlh) == -1)
+ /* size mismatch! */
+ if (net->len < ntohs(pld->len) + NETHDR_SIZ)
return -1;
- return ret;
+ return 0;
}
int mcast_track_seq(u_int32_t seq, u_int32_t *exp_seq)
@@ -238,30 +207,3 @@ out:
return ret;
}
-
-int build_network_msg(const int msg_type,
- struct nfnl_subsys_handle *ssh,
- struct nf_conntrack *ct,
- void *buffer,
- unsigned int size)
-{
- memset(buffer, 0, size);
- buffer += NETHDR_SIZ;
- size -= NETHDR_SIZ;
- return nfct_build_query(ssh, msg_type, ct, buffer, size);
-}
-
-unsigned int parse_network_msg(struct nf_conntrack *ct,
- const struct nlmsghdr *nlh)
-{
- /*
- * The parsing of netlink messages going through network is
- * similar to the one that is done for messages coming from
- * kernel, therefore do not replicate more code and use the
- * function provided in the libraries.
- *
- * Yup, this is a hack 8)
- */
- return nfct_parse_conntrack(NFCT_T_ALL, nlh, ct);
-}
-
diff --git a/src/parse.c b/src/parse.c
new file mode 100644
index 0000000..81b70c4
--- /dev/null
+++ b/src/parse.c
@@ -0,0 +1,76 @@
+/*
+ * (C) 2006-2007 by Pablo Neira Ayuso <pablo@netfilter.org>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#include <string.h>
+#include <libnetfilter_conntrack/libnetfilter_conntrack.h>
+#include "network.h"
+
+static int parse_u8(struct nf_conntrack *ct, int attr, void *data)
+{
+ u_int8_t *value = (u_int8_t *) data;
+ nfct_set_attr_u8(ct, attr, *value);
+}
+
+static int parse_u16(struct nf_conntrack *ct, int attr, void *data)
+{
+ u_int16_t *value = (u_int16_t *) data;
+ nfct_set_attr_u16(ct, attr, ntohs(*value));
+}
+
+static int parse_u32(struct nf_conntrack *ct, int attr, void *data)
+{
+ u_int32_t *value = (u_int32_t *) data;
+ nfct_set_attr_u32(ct, attr, ntohl(*value));
+}
+
+typedef int (*parse)(struct nf_conntrack *ct, int attr, void *data);
+
+parse h[ATTR_MAX] = {
+ [ATTR_IPV4_SRC] = parse_u32,
+ [ATTR_IPV4_DST] = parse_u32,
+ [ATTR_L3PROTO] = parse_u8,
+ [ATTR_PORT_SRC] = parse_u16,
+ [ATTR_PORT_DST] = parse_u16,
+ [ATTR_L4PROTO] = parse_u8,
+ [ATTR_TCP_STATE] = parse_u8,
+ [ATTR_SNAT_IPV4] = parse_u32,
+ [ATTR_DNAT_IPV4] = parse_u32,
+ [ATTR_SNAT_PORT] = parse_u16,
+ [ATTR_DNAT_PORT] = parse_u16,
+ [ATTR_TIMEOUT] = parse_u32,
+ [ATTR_MARK] = parse_u32,
+ [ATTR_STATUS] = parse_u32,
+};
+
+void parse_netpld(struct nf_conntrack *ct, struct netpld *pld, int *query)
+{
+ int len;
+ struct netattr *attr;
+
+ PLD_NETWORK2HOST(pld);
+ len = pld->len;
+ attr = PLD_DATA(pld);
+
+ while (len > 0) {
+ ATTR_NETWORK2HOST(attr);
+ h[attr->nta_attr](ct, attr->nta_attr, NTA_DATA(attr));
+ attr = NTA_NEXT(attr, len);
+ }
+
+ *query = pld->query;
+}
diff --git a/src/proxy.c b/src/proxy.c
deleted file mode 100644
index b9bb04e..0000000
--- a/src/proxy.c
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * (C) 2006 by Pablo Neira Ayuso <pablo@netfilter.org>
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
- */
-
-#include <libnfnetlink/libnfnetlink.h>
-#include <libnetfilter_conntrack/libnetfilter_conntrack.h>
-
-#if 0
-#define dprintf printf
-#else
-#define dprintf
-#endif
-
-int nlh_payload_host2network(struct nfattr *nfa, int len)
-{
- struct nfattr *__nfa;
-
- while (NFA_OK(nfa, len)) {
-
- dprintf("type=%d nfalen=%d len=%d [%s]\n",
- nfa->nfa_type & 0x7fff,
- nfa->nfa_len, len,
- nfa->nfa_type & NFNL_NFA_NEST ? "NEST":"");
-
- if (nfa->nfa_type & NFNL_NFA_NEST) {
- if (NFA_PAYLOAD(nfa) > len)
- return -1;
-
- if (nlh_payload_host2network(NFA_DATA(nfa),
- NFA_PAYLOAD(nfa)) == -1)
- return -1;
- }
-
- __nfa = NFA_NEXT(nfa, len);
-
- nfa->nfa_type = htons(nfa->nfa_type);
- nfa->nfa_len = htons(nfa->nfa_len);
-
- nfa = __nfa;
- }
- return 0;
-}
-
-int nlh_host2network(struct nlmsghdr *nlh)
-{
- struct nfgenmsg *nfhdr = NLMSG_DATA(nlh);
- struct nfattr *cda[CTA_MAX];
- unsigned int min_len = NLMSG_SPACE(sizeof(struct nfgenmsg));
- unsigned int len = nlh->nlmsg_len - NLMSG_ALIGN(min_len);
-
- nlh->nlmsg_len = htonl(nlh->nlmsg_len);
- nlh->nlmsg_type = htons(nlh->nlmsg_type);
- nlh->nlmsg_flags = htons(nlh->nlmsg_flags);
- nlh->nlmsg_seq = htonl(nlh->nlmsg_seq);
- nlh->nlmsg_pid = htonl(nlh->nlmsg_pid);
-
- nfhdr->res_id = htons(nfhdr->res_id);
-
- return nlh_payload_host2network(NFM_NFA(NLMSG_DATA(nlh)), len);
-}
-
-int nlh_payload_network2host(struct nfattr *nfa, int len)
-{
- nfa->nfa_type = ntohs(nfa->nfa_type);
- nfa->nfa_len = ntohs(nfa->nfa_len);
-
- while(NFA_OK(nfa, len)) {
-
- dprintf("type=%d nfalen=%d len=%d [%s]\n",
- nfa->nfa_type & 0x7fff,
- nfa->nfa_len, len,
- nfa->nfa_type & NFNL_NFA_NEST ? "NEST":"");
-
- if (nfa->nfa_type & NFNL_NFA_NEST) {
- if (NFA_PAYLOAD(nfa) > len)
- return -1;
-
- if (nlh_payload_network2host(NFA_DATA(nfa),
- NFA_PAYLOAD(nfa)) == -1)
- return -1;
- }
-
- nfa = NFA_NEXT(nfa,len);
-
- if (len < NFA_LENGTH(0))
- break;
-
- nfa->nfa_type = ntohs(nfa->nfa_type);
- nfa->nfa_len = ntohs(nfa->nfa_len);
- }
- return 0;
-}
-
-int nlh_network2host(struct nlmsghdr *nlh)
-{
- struct nfgenmsg *nfhdr = NLMSG_DATA(nlh);
- struct nfattr *cda[CTA_MAX];
- unsigned int min_len = NLMSG_SPACE(sizeof(struct nfgenmsg));
- unsigned int len = ntohl(nlh->nlmsg_len) - NLMSG_ALIGN(min_len);
-
- nlh->nlmsg_len = ntohl(nlh->nlmsg_len);
- nlh->nlmsg_type = ntohs(nlh->nlmsg_type);
- nlh->nlmsg_flags = ntohs(nlh->nlmsg_flags);
- nlh->nlmsg_seq = ntohl(nlh->nlmsg_seq);
- nlh->nlmsg_pid = ntohl(nlh->nlmsg_pid);
-
- nfhdr->res_id = ntohs(nfhdr->res_id);
-
- return nlh_payload_network2host(NFM_NFA(NLMSG_DATA(nlh)), len);
-}
diff --git a/src/run.c b/src/run.c
index 0173c9f..644f82e 100644
--- a/src/run.c
+++ b/src/run.c
@@ -24,20 +24,21 @@
#include "us-conntrack.h"
#include <signal.h>
#include <stdlib.h>
+#include <unistd.h>
+#include "timer.h"
void killer(int foo)
{
/* no signals while handling signals */
sigprocmask(SIG_BLOCK, &STATE(block), NULL);
- nfnl_subsys_close(STATE(subsys_event));
- nfnl_subsys_close(STATE(subsys_dump));
- nfnl_close(STATE(event));
- nfnl_close(STATE(dump));
+ nfct_close(STATE(event));
+ nfct_close(STATE(dump));
ignore_pool_destroy(STATE(ignore_pool));
local_server_destroy(STATE(local));
STATE(mode)->kill();
+ destroy_alarm_scheduler();
unlink(CONFIG(lockfile));
dlog(STATE(log), "------- shutdown received ----");
close_log(STATE(log));
@@ -69,12 +70,16 @@ void local_handler(int fd, void *data)
switch(type) {
case FLUSH_MASTER:
- dlog(STATE(log), "[REQ] flushing master table");
- nl_flush_master_conntrack_table();
+ dlog(STATE(log), "[DEPRECATED] `conntrackd -F' is deprecated. "
+ "Use conntrack -F instead.");
+ if (fork() == 0) {
+ execlp("conntrack", "conntrack", "-F", NULL);
+ exit(EXIT_SUCCESS);
+ }
return;
case RESYNC_MASTER:
dlog(STATE(log), "[REQ] resync with master table");
- nl_dump_conntrack_table(STATE(dump), STATE(subsys_dump));
+ nl_dump_conntrack_table();
return;
}
@@ -104,6 +109,11 @@ int init(int mode)
return -1;
}
+ if (init_alarm_scheduler() == -1) {
+ dlog(STATE(log), "[FAIL] can't initialize alarm scheduler");
+ return -1;
+ }
+
/* local UNIX socket */
STATE(local) = local_server_create(&CONFIG(local));
if (!STATE(local)) {
@@ -147,22 +157,20 @@ int init(int mode)
return 0;
}
-#define POLL_NSECS 1
-
-static void __run(void)
+static void __run(long credit, int step)
{
int max, ret;
fd_set readfds;
struct timeval tv = {
- .tv_sec = POLL_NSECS,
- .tv_usec = 0
+ .tv_sec = 0,
+ .tv_usec = credit,
};
FD_ZERO(&readfds);
FD_SET(STATE(local), &readfds);
- FD_SET(nfnl_fd(STATE(event)), &readfds);
+ FD_SET(nfct_fd(STATE(event)), &readfds);
- max = MAX(STATE(local), nfnl_fd(STATE(event)));
+ max = MAX(STATE(local), nfct_fd(STATE(event)));
if (STATE(mode)->add_fds_to_set)
max = MAX(max, STATE(mode)->add_fds_to_set(&readfds));
@@ -185,8 +193,8 @@ static void __run(void)
do_local_server_step(STATE(local), NULL, local_handler);
/* conntrack event has happened */
- if (FD_ISSET(nfnl_fd(STATE(event)), &readfds)) {
- ret = nfnl_catch(STATE(event));
+ if (FD_ISSET(nfct_fd(STATE(event)), &readfds)) {
+ while ((ret = nfct_catch(STATE(event))) != -1);
if (ret == -1) {
switch(errno) {
case ENOBUFS:
@@ -197,6 +205,7 @@ static void __run(void)
* size and resync with master conntrack table.
*/
nl_resize_socket_buffer(STATE(event));
+ /* XXX: schedule overrun call via alarm */
STATE(mode)->overrun();
break;
case ENOENT:
@@ -206,6 +215,8 @@ static void __run(void)
* interested in. Just ignore it.
*/
break;
+ case EAGAIN:
+ break;
default:
dlog(STATE(log), "event catch says: %s",
strerror(errno));
@@ -214,14 +225,35 @@ static void __run(void)
}
}
- if (STATE(mode)->step)
- STATE(mode)->step(&readfds);
+ if (STATE(mode)->run)
+ STATE(mode)->run(&readfds, step);
sigprocmask(SIG_UNBLOCK, &STATE(block), NULL);
}
void run(void)
{
- while(1)
- __run();
+ int step = 0;
+ struct timer timer;
+
+ timer_init(&timer);
+
+ while(1) {
+ timer_start(&timer);
+ __run(GET_CREDITS(timer), step);
+ timer_stop(&timer);
+
+ if (timer_adjust_credit(&timer)) {
+ timer_start(&timer);
+ sigprocmask(SIG_BLOCK, &STATE(block), NULL);
+ do_alarm_run(step);
+ sigprocmask(SIG_UNBLOCK, &STATE(block), NULL);
+ timer_stop(&timer);
+
+ if (timer_adjust_credit(&timer))
+ dlog(STATE(log), "alarm run takes too long!");
+
+ step = (step + 1) < STEPS_PER_SECONDS ? step + 1 : 0;
+ }
+ }
}
diff --git a/src/state_helper.c b/src/state_helper.c
index 81b0d09..eba9d8f 100644
--- a/src/state_helper.c
+++ b/src/state_helper.c
@@ -25,7 +25,7 @@ int state_helper_verdict(int type, struct nf_conntrack *ct)
{
u_int8_t l4proto;
- if (type == NFCT_T_DESTROY)
+ if (type == NFCT_Q_DESTROY)
return ST_H_REPLICATE;
l4proto = nfct_get_attr_u8(ct, ATTR_ORIG_L4PROTO);
diff --git a/src/stats-mode.c b/src/stats-mode.c
index 92794cd..65bab1b 100644
--- a/src/stats-mode.c
+++ b/src/stats-mode.c
@@ -86,7 +86,7 @@ static int local_handler_stats(int fd, int type, void *data)
return ret;
}
-static void dump_stats(struct nf_conntrack *ct, struct nlmsghdr *nlh)
+static void dump_stats(struct nf_conntrack *ct)
{
if (cache_update_force(STATE_STATS(cache), ct))
debug_ct(ct, "resync entry");
@@ -137,7 +137,7 @@ static void overrun_stats()
nfct_close(h);
}
-static void event_new_stats(struct nf_conntrack *ct, struct nlmsghdr *nlh)
+static void event_new_stats(struct nf_conntrack *ct)
{
if (cache_add(STATE_STATS(cache), ct)) {
debug_ct(ct, "cache new");
@@ -150,7 +150,7 @@ static void event_new_stats(struct nf_conntrack *ct, struct nlmsghdr *nlh)
}
}
-static void event_update_stats(struct nf_conntrack *ct, struct nlmsghdr *nlh)
+static void event_update_stats(struct nf_conntrack *ct)
{
if (!cache_update_force(STATE_STATS(cache), ct)) {
debug_ct(ct, "can't update");
@@ -159,7 +159,7 @@ static void event_update_stats(struct nf_conntrack *ct, struct nlmsghdr *nlh)
debug_ct(ct, "update");
}
-static int event_destroy_stats(struct nf_conntrack *ct, struct nlmsghdr *nlh)
+static int event_destroy_stats(struct nf_conntrack *ct)
{
if (cache_del(STATE_STATS(cache), ct)) {
debug_ct(ct, "cache destroy");
@@ -173,7 +173,7 @@ static int event_destroy_stats(struct nf_conntrack *ct, struct nlmsghdr *nlh)
struct ct_mode stats_mode = {
.init = init_stats,
.add_fds_to_set = NULL,
- .step = NULL,
+ .run = NULL,
.local = local_handler_stats,
.kill = kill_stats,
.dump = dump_stats,
diff --git a/src/sync-mode.c b/src/sync-mode.c
index 38ab016..f30cb95 100644
--- a/src/sync-mode.c
+++ b/src/sync-mode.c
@@ -27,43 +27,27 @@
#include <sys/select.h>
#include "sync.h"
#include "network.h"
+#include "buffer.h"
+#include "debug.h"
-/* handler for multicast messages received */
-static void mcast_handler()
+static void do_mcast_handler_step(struct nethdr *net)
{
- int ret;
- unsigned int type;
- char __net[4096];
- struct nethdr *net = (struct nethdr *) __net;
- struct nlmsghdr *nlh = (struct nlmsghdr *) (__net + NETHDR_SIZ);
+ unsigned int query;
+ struct netpld *pld = NETHDR_DATA(net);
char __ct[nfct_maxsize()];
struct nf_conntrack *ct = (struct nf_conntrack *) __ct;
struct us_conntrack *u = NULL;
- ret = mcast_recv_netmsg(STATE_SYNC(mcast_server), net, sizeof(__net));
- if (ret <= 0) {
- STATE(malformed)++;
- return;
- }
-
if (STATE_SYNC(sync)->recv(net))
return;
memset(ct, 0, sizeof(__ct));
- if ((type = parse_network_msg(ct, nlh)) == NFCT_T_ERROR) {
- STATE(malformed)++;
- return;
- }
+ /* XXX: check for malformed */
+ parse_netpld(ct, pld, &query);
- nfct_attr_unset(ct, ATTR_TIMEOUT);
- nfct_attr_unset(ct, ATTR_ORIG_COUNTER_BYTES);
- nfct_attr_unset(ct, ATTR_ORIG_COUNTER_PACKETS);
- nfct_attr_unset(ct, ATTR_REPL_COUNTER_BYTES);
- nfct_attr_unset(ct, ATTR_REPL_COUNTER_PACKETS);
-
- switch(type) {
- case NFCT_T_NEW:
+ switch(query) {
+ case NFCT_Q_CREATE:
retry:
if ((u = cache_add(STATE_SYNC(external), ct))) {
debug_ct(u->ct, "external new");
@@ -80,24 +64,57 @@ retry:
debug_ct(ct, "can't add");
}
break;
- case NFCT_T_UPDATE:
+ case NFCT_Q_UPDATE:
if ((u = cache_update_force(STATE_SYNC(external), ct))) {
debug_ct(u->ct, "external update");
} else
debug_ct(ct, "can't update");
break;
- case NFCT_T_DESTROY:
+ case NFCT_Q_DESTROY:
if (cache_del(STATE_SYNC(external), ct))
debug_ct(ct, "external destroy");
else
debug_ct(ct, "can't destroy");
break;
default:
- dlog(STATE(log), "mcast received unknown msg type %d\n", type);
+ dlog(STATE(log), "mcast received unknown query %d\n", query);
break;
}
}
+/* handler for multicast messages received */
+static void mcast_handler()
+{
+ int numbytes, remain;
+ char __net[4096], *ptr = __net;
+
+ numbytes = mcast_recv(STATE_SYNC(mcast_server), __net, sizeof(__net));
+ if (numbytes <= 0)
+ return;
+
+ remain = numbytes;
+ while (remain > 0) {
+ struct nethdr *net = (struct nethdr *) ptr;
+
+ if (ntohs(net->len) > remain) {
+ dlog(STATE(log), "fragmented messages");
+ break;
+ }
+
+ debug("recv sq: %u fl:%u len:%u (rem:%d)\n",
+ ntohl(net->seq), ntohs(net->flags),
+ ntohs(net->len), remain);
+
+ if (handle_netmsg(net) == -1) {
+ STATE(malformed)++;
+ return;
+ }
+ do_mcast_handler_step(net);
+ ptr += net->len;
+ remain -= net->len;
+ }
+}
+
static int init_sync(void)
{
int ret;
@@ -159,11 +176,6 @@ static int init_sync(void)
/* initialization of multicast sequence generation */
STATE_SYNC(last_seq_sent) = time(NULL);
- if (create_alarm_thread() == -1) {
- dlog(STATE(log), "[FAIL] can't initialize alarm thread");
- return -1;
- }
-
return 0;
}
@@ -174,11 +186,17 @@ static int add_fds_to_set_sync(fd_set *readfds)
return STATE_SYNC(mcast_server->fd);
}
-static void step_sync(fd_set *readfds)
+static void run_sync(fd_set *readfds, int step)
{
/* multicast packet has been received */
if (FD_ISSET(STATE_SYNC(mcast_server->fd), readfds))
mcast_handler();
+
+ if (STATE_SYNC(sync)->run)
+ STATE_SYNC(sync)->run(step);
+
+ /* flush pending messages */
+ mcast_buffered_pending_netmsg(STATE_SYNC(mcast_client));
}
static void kill_sync()
@@ -189,8 +207,6 @@ static void kill_sync()
mcast_server_destroy(STATE_SYNC(mcast_server));
mcast_client_destroy(STATE_SYNC(mcast_client));
- destroy_alarm_thread();
-
if (STATE_SYNC(sync)->kill)
STATE_SYNC(sync)->kill();
}
@@ -267,10 +283,6 @@ static int local_handler_sync(int fd, int type, void *data)
STATE_SYNC(mcast_server));
dump_stats_sync(fd);
break;
- case SEND_BULK:
- dlog(STATE(log), "[REQ] sending bulk update");
- cache_bulk(STATE_SYNC(internal));
- break;
default:
if (STATE_SYNC(sync)->local)
ret = STATE_SYNC(sync)->local(fd, type, data);
@@ -280,7 +292,7 @@ static int local_handler_sync(int fd, int type, void *data)
return ret;
}
-static void dump_sync(struct nf_conntrack *ct, struct nlmsghdr *nlh)
+static void dump_sync(struct nf_conntrack *ct)
{
/* This is required by kernels < 2.6.20 */
nfct_attr_unset(ct, ATTR_TIMEOUT);
@@ -294,23 +306,21 @@ static void dump_sync(struct nf_conntrack *ct, struct nlmsghdr *nlh)
debug_ct(ct, "resync");
}
-static void mcast_send_sync(struct nlmsghdr *nlh,
- struct us_conntrack *u,
+static void mcast_send_sync(struct us_conntrack *u,
struct nf_conntrack *ct,
- int type)
+ int query)
{
- char __net[4096];
- struct nethdr *net = (struct nethdr *) __net;
-
- memset(__net, 0, sizeof(__net));
+ int len;
+ struct nethdr *net;
- if (!state_helper_verdict(type, ct))
+ if (!state_helper_verdict(query, ct))
return;
- memcpy(__net + NETHDR_SIZ, nlh, nlh->nlmsg_len);
- mcast_send_netmsg(STATE_SYNC(mcast_client), net);
+ net = BUILD_NETMSG(ct, query);
+ len = prepare_send_netmsg(STATE_SYNC(mcast_client), net);
+ mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net, len);
if (STATE_SYNC(sync)->send)
- STATE_SYNC(sync)->send(type, net, u);
+ STATE_SYNC(sync)->send(net, u);
}
static int overrun_cb(enum nf_conntrack_msg_type type,
@@ -332,8 +342,16 @@ static int overrun_cb(enum nf_conntrack_msg_type type,
if (!cache_test(STATE_SYNC(internal), ct)) {
if ((u = cache_update_force(STATE_SYNC(internal), ct))) {
+ int len;
+
debug_ct(u->ct, "overrun resync");
- mcast_build_send_update(u);
+
+ struct nethdr *net = BUILD_NETMSG(u->ct, NFCT_Q_UPDATE);
+ len = prepare_send_netmsg(STATE_SYNC(mcast_client),net);
+ mcast_buffered_send_netmsg(STATE_SYNC(mcast_client),
+ net, len);
+ if (STATE_SYNC(sync)->send)
+ STATE_SYNC(sync)->send(net, u);
}
}
@@ -348,9 +366,17 @@ static int overrun_purge_step(void *data1, void *data2)
ret = nfct_query(h, NFCT_Q_GET, u->ct);
if (ret == -1 && errno == ENOENT) {
+ int len;
+ struct nethdr *net = BUILD_NETMSG(u->ct, NFCT_Q_DESTROY);
+
debug_ct(u->ct, "overrun purge resync");
- mcast_build_send_destroy(u);
- __cache_del(STATE_SYNC(internal), u->ct);
+
+ len = prepare_send_netmsg(STATE_SYNC(mcast_client), net);
+ mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net, len);
+ if (STATE_SYNC(sync)->send)
+ STATE_SYNC(sync)->send(net, u);
+
+ cache_del(STATE_SYNC(internal), u->ct);
}
return 0;
@@ -382,7 +408,7 @@ static void overrun_sync()
nfct_close(h);
}
-static void event_new_sync(struct nf_conntrack *ct, struct nlmsghdr *nlh)
+static void event_new_sync(struct nf_conntrack *ct)
{
struct us_conntrack *u;
@@ -394,12 +420,12 @@ static void event_new_sync(struct nf_conntrack *ct, struct nlmsghdr *nlh)
nfct_attr_unset(ct, ATTR_TIMEOUT);
retry:
if ((u = cache_add(STATE_SYNC(internal), ct))) {
- mcast_send_sync(nlh, u, ct, NFCT_T_NEW);
+ mcast_send_sync(u, ct, NFCT_Q_CREATE);
debug_ct(u->ct, "internal new");
} else {
if (errno == EEXIST) {
cache_del(STATE_SYNC(internal), ct);
- mcast_send_sync(nlh, NULL, ct, NFCT_T_DESTROY);
+ mcast_send_sync(NULL, ct, NFCT_Q_DESTROY);
goto retry;
}
@@ -409,7 +435,7 @@ retry:
}
}
-static void event_update_sync(struct nf_conntrack *ct, struct nlmsghdr *nlh)
+static void event_update_sync(struct nf_conntrack *ct)
{
struct us_conntrack *u;
@@ -420,15 +446,15 @@ static void event_update_sync(struct nf_conntrack *ct, struct nlmsghdr *nlh)
return;
}
debug_ct(u->ct, "internal update");
- mcast_send_sync(nlh, u, ct, NFCT_T_UPDATE);
+ mcast_send_sync(u, ct, NFCT_Q_UPDATE);
}
-static int event_destroy_sync(struct nf_conntrack *ct, struct nlmsghdr *nlh)
+static int event_destroy_sync(struct nf_conntrack *ct)
{
nfct_attr_unset(ct, ATTR_TIMEOUT);
if (cache_del(STATE_SYNC(internal), ct)) {
- mcast_send_sync(nlh, NULL, ct, NFCT_T_DESTROY);
+ mcast_send_sync(NULL, ct, NFCT_Q_DESTROY);
debug_ct(ct, "internal destroy");
} else
debug_ct(ct, "can't destroy");
@@ -437,7 +463,7 @@ static int event_destroy_sync(struct nf_conntrack *ct, struct nlmsghdr *nlh)
struct ct_mode sync_mode = {
.init = init_sync,
.add_fds_to_set = add_fds_to_set_sync,
- .step = step_sync,
+ .run = run_sync,
.local = local_handler_sync,
.kill = kill_sync,
.dump = dump_sync,
diff --git a/src/sync-nack.c b/src/sync-nack.c
index 20ad1f4..dbda0a7 100644
--- a/src/sync-nack.c
+++ b/src/sync-nack.c
@@ -24,6 +24,7 @@
#include "buffer.h"
#include "debug.h"
#include "network.h"
+#include "alarm.h"
#include <libnfnetlink/libnfnetlink.h>
#include <libnetfilter_conntrack/libnetfilter_conntrack.h>
@@ -33,28 +34,34 @@
#define dp
#endif
-static LIST_HEAD(queue);
+static LIST_HEAD(rs_list);
+static LIST_HEAD(tx_list);
+static unsigned int tx_list_len;
+static struct buffer *rs_queue;
+static struct buffer *tx_queue;
struct cache_nack {
- struct list_head head;
+ struct list_head rs_list;
+ struct list_head tx_list;
u_int32_t seq;
};
static void cache_nack_add(struct us_conntrack *u, void *data)
{
struct cache_nack *cn = data;
- INIT_LIST_HEAD(&cn->head);
+ INIT_LIST_HEAD(&cn->rs_list);
+ INIT_LIST_HEAD(&cn->tx_list);
}
static void cache_nack_del(struct us_conntrack *u, void *data)
{
struct cache_nack *cn = data;
- if (cn->head.next == &cn->head &&
- cn->head.prev == &cn->head)
+ if (cn->rs_list.next == &cn->rs_list &&
+ cn->rs_list.prev == &cn->rs_list)
return;
- list_del(&cn->head);
+ list_del(&cn->rs_list);
}
static struct cache_extra cache_nack_extra = {
@@ -65,19 +72,31 @@ static struct cache_extra cache_nack_extra = {
static int nack_init()
{
- STATE_SYNC(buffer) = buffer_create(CONFIG(resend_buffer_size));
- if (STATE_SYNC(buffer) == NULL)
+ tx_queue = buffer_create(CONFIG(resend_buffer_size));
+ if (tx_queue == NULL) {
+ dlog(STATE(log), "[FAIL] cannot create tx buffer");
return -1;
+ }
+
+ rs_queue = buffer_create(CONFIG(resend_buffer_size));
+ if (rs_queue == NULL) {
+ dlog(STATE(log), "[FAIL] cannot create rs buffer");
+ return -1;
+ }
+
+ INIT_LIST_HEAD(&tx_list);
+ INIT_LIST_HEAD(&rs_list);
return 0;
}
static void nack_kill()
{
- buffer_destroy(STATE_SYNC(buffer));
+ buffer_destroy(rs_queue);
+ buffer_destroy(tx_queue);
}
-static void mcast_send_control(u_int32_t flags, u_int32_t from, u_int32_t to)
+static void tx_queue_add_ctlmsg(u_int32_t flags, u_int32_t from, u_int32_t to)
{
struct nethdr_ack ack = {
.flags = flags,
@@ -85,8 +104,19 @@ static void mcast_send_control(u_int32_t flags, u_int32_t from, u_int32_t to)
.to = to,
};
- mcast_send_error(STATE_SYNC(mcast_client), &ack);
- buffer_add(STATE_SYNC(buffer), &ack, NETHDR_ACK_SIZ);
+ buffer_add(tx_queue, &ack, NETHDR_ACK_SIZ);
+}
+
+static int do_cache_to_tx(void *data1, void *data2)
+{
+ struct us_conntrack *u = data2;
+ struct cache_nack *cn = cache_get_extra(STATE_SYNC(internal), u);
+
+ /* add to tx list */
+ list_add(&cn->tx_list, &tx_list);
+ tx_list_len++;
+
+ return 0;
}
static int nack_local(int fd, int type, void *data)
@@ -94,85 +124,78 @@ static int nack_local(int fd, int type, void *data)
int ret = 1;
switch(type) {
- case REQUEST_DUMP:
- mcast_send_control(NET_F_RESYNC, 0, 0);
- dlog(STATE(log), "[REQ] request resync");
- break;
- default:
- ret = 0;
- break;
+ case REQUEST_DUMP:
+ dlog(STATE(log), "[REQ] request resync");
+ tx_queue_add_ctlmsg(NET_F_RESYNC, 0, 0);
+ break;
+ case SEND_BULK:
+ dlog(STATE(log), "[REQ] sending bulk update");
+ cache_iterate(STATE_SYNC(internal), NULL, do_cache_to_tx);
+ break;
+ default:
+ ret = 0;
+ break;
}
return ret;
}
-static int buffer_compare(void *data1, void *data2)
+static int rs_queue_to_tx(void *data1, void *data2)
{
struct nethdr *net = data1;
struct nethdr_ack *nack = data2;
- struct nlmsghdr *nlh = data1 + NETHDR_SIZ;
-
- unsigned old_seq = ntohl(net->seq);
- if (between(ntohl(net->seq), nack->from, nack->to)) {
- if (mcast_resend_netmsg(STATE_SYNC(mcast_client), net))
- dp("resend destroy (old seq=%u) (seq=%u)\n",
- old_seq, ntohl(net->seq));
+ if (between(net->seq, nack->from, nack->to)) {
+ dp("rs_queue_to_tx sq: %u fl:%u len:%u\n",
+ net->seq, net->flags, net->len);
+ buffer_add(tx_queue, net, net->len);
}
return 0;
}
-static int buffer_remove(void *data1, void *data2)
+static int rs_queue_empty(void *data1, void *data2)
{
struct nethdr *net = data1;
struct nethdr_ack *h = data2;
- if (between(ntohl(net->seq), h->from, h->to)) {
- dp("remove from buffer (seq=%u)\n", ntohl(net->seq));
- __buffer_del(STATE_SYNC(buffer), data1);
+ if (between(net->seq, h->from, h->to)) {
+ dp("remove from buffer (seq=%u)\n", net->seq);
+ buffer_del(rs_queue, data1);
}
return 0;
}
-static void queue_resend(struct cache *c, unsigned int from, unsigned int to)
+static void rs_list_to_tx(struct cache *c, unsigned int from, unsigned int to)
{
struct list_head *n;
struct us_conntrack *u;
- list_for_each(n, &queue) {
+ list_for_each(n, &rs_list) {
struct cache_nack *cn = (struct cache_nack *) n;
struct us_conntrack *u;
u = cache_get_conntrack(STATE_SYNC(internal), cn);
-
if (between(cn->seq, from, to)) {
- debug_ct(u->ct, "resend nack");
- dp("resending nack'ed (oldseq=%u) ", cn->seq);
-
- if (mcast_build_send_update(u) == -1)
- continue;
-
- dp("(newseq=%u)\n", cn->seq);
+ dp("resending nack'ed (oldseq=%u)\n", cn->seq);
+ list_add(&cn->tx_list, &tx_list);
+ tx_list_len++;
}
}
}
-static void queue_empty(struct cache *c, unsigned int from, unsigned int to)
+static void rs_list_empty(struct cache *c, unsigned int from, unsigned int to)
{
struct list_head *n, *tmp;
- struct us_conntrack *u;
- dp("ACK from %u to %u\n", from, to);
- list_for_each_safe(n, tmp, &queue) {
+ list_for_each_safe(n, tmp, &rs_list) {
struct cache_nack *cn = (struct cache_nack *) n;
+ struct us_conntrack *u;
u = cache_get_conntrack(STATE_SYNC(internal), cn);
if (between(cn->seq, from, to)) {
- dp("remove %u\n", cn->seq);
- debug_ct(u->ct, "ack received: empty queue");
dp("queue: deleting from queue (seq=%u)\n", cn->seq);
- list_del(&cn->head);
- INIT_LIST_HEAD(&cn->head);
+ list_del(&cn->rs_list);
+ INIT_LIST_HEAD(&cn->rs_list);
}
}
}
@@ -187,73 +210,149 @@ static int nack_recv(const struct nethdr *net)
if (!mcast_track_seq(net->seq, &exp_seq)) {
dp("OOS: sending nack (seq=%u)\n", exp_seq);
- mcast_send_control(NET_F_NACK, exp_seq, net->seq - 1);
+ tx_queue_add_ctlmsg(NET_F_NACK, exp_seq, net->seq-1);
window = CONFIG(window_size);
} else {
/* received a window, send an acknowledgement */
if (--window == 0) {
dp("sending ack (seq=%u)\n", net->seq);
- mcast_send_control(NET_F_ACK,
- net->seq - CONFIG(window_size),
- net->seq);
+ tx_queue_add_ctlmsg(NET_F_ACK,
+ net->seq - CONFIG(window_size),
+ net->seq);
}
}
- if (net->flags & NET_F_NACK) {
+ if (IS_NACK(net)) {
struct nethdr_ack *nack = (struct nethdr_ack *) net;
dp("NACK: from seq=%u to seq=%u\n", nack->from, nack->to);
- queue_resend(STATE_SYNC(internal), nack->from, nack->to);
- buffer_iterate(STATE_SYNC(buffer), nack, buffer_compare);
+ rs_list_to_tx(STATE_SYNC(internal), nack->from, nack->to);
+ buffer_iterate(rs_queue, nack, rs_queue_to_tx);
return 1;
- } else if (net->flags & NET_F_RESYNC) {
+ } else if (IS_RESYNC(net)) {
dp("RESYNC ALL\n");
- cache_bulk(STATE_SYNC(internal));
+ cache_iterate(STATE_SYNC(internal), NULL, do_cache_to_tx);
return 1;
- } else if (net->flags & NET_F_ACK) {
+ } else if (IS_ACK(net)) {
struct nethdr_ack *h = (struct nethdr_ack *) net;
dp("ACK: from seq=%u to seq=%u\n", h->from, h->to);
- queue_empty(STATE_SYNC(internal), h->from, h->to);
- buffer_iterate(STATE_SYNC(buffer), h, buffer_remove);
+ rs_list_empty(STATE_SYNC(internal), h->from, h->to);
+ buffer_iterate(rs_queue, h, rs_queue_empty);
+ return 1;
+ } else if (IS_ALIVE(net))
return 1;
- }
return 0;
}
-static void nack_send(int type,
- const struct nethdr *net,
- struct us_conntrack *u)
+static void nack_send(struct nethdr *net, struct us_conntrack *u)
{
- int size = NETHDR_SIZ;
- struct nlmsghdr *nlh = (struct nlmsghdr *) ((void *) net + size);
+ struct netpld *pld = NETHDR_DATA(net);
struct cache_nack *cn;
-
- size += ntohl(nlh->nlmsg_len);
- switch(type) {
- case NFCT_T_NEW:
- case NFCT_T_UPDATE:
+ HDR_NETWORK2HOST(net);
+
+ switch(ntohs(pld->query)) {
+ case NFCT_Q_CREATE:
+ case NFCT_Q_UPDATE:
cn = (struct cache_nack *)
cache_get_extra(STATE_SYNC(internal), u);
- if (cn->head.next == &cn->head &&
- cn->head.prev == &cn->head)
+ if (cn->rs_list.next == &cn->rs_list &&
+ cn->rs_list.prev == &cn->rs_list)
goto insert;
- list_del(&cn->head);
- INIT_LIST_HEAD(&cn->head);
+ list_del(&cn->rs_list);
+ INIT_LIST_HEAD(&cn->rs_list);
insert:
- cn->seq = ntohl(net->seq);
- list_add(&cn->head, &queue);
+ cn->seq = net->seq;
+ list_add(&cn->rs_list, &rs_list);
break;
- case NFCT_T_DESTROY:
- buffer_add(STATE_SYNC(buffer), net, size);
+ case NFCT_Q_DESTROY:
+ buffer_add(rs_queue, net, net->len);
break;
}
}
+static int tx_queue_xmit(void *data1, void *data2)
+{
+ struct nethdr *net = data1;
+ int len = prepare_send_netmsg(STATE_SYNC(mcast_client), net);
+
+ dp("tx_queue sq: %u fl:%u len:%u\n",
+ ntohl(net->seq), ntohs(net->flags), ntohs(net->len));
+
+ mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net, len);
+ HDR_NETWORK2HOST(net);
+
+ if (IS_DATA(net) || IS_ACK(net) || IS_NACK(net)) {
+ dp("-> back_to_tx_queue sq: %u fl:%u len:%u\n",
+ net->seq, net->flags, net->len);
+ buffer_add(rs_queue, net, net->len);
+ }
+ buffer_del(tx_queue, net);
+
+ return 0;
+}
+
+static int tx_list_xmit(struct list_head *i, struct us_conntrack *u)
+{
+ int ret;
+ struct nethdr *net = BUILD_NETMSG(u->ct, NFCT_Q_UPDATE);
+ int len = prepare_send_netmsg(STATE_SYNC(mcast_client), net);
+
+ dp("tx_list sq: %u fl:%u len:%u\n",
+ ntohl(net->seq), ntohs(net->flags),
+ ntohs(net->len));
+
+ list_del(i);
+ INIT_LIST_HEAD(i);
+ tx_list_len--;
+
+ ret = mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net, len);
+ if (STATE_SYNC(sync)->send)
+ STATE_SYNC(sync)->send(net, u);
+
+ return ret;
+}
+
+static struct alarm_list alive_alarm;
+
+static void do_alive_alarm(struct alarm_list *a, void *data)
+{
+ del_alarm(a);
+ tx_queue_add_ctlmsg(NET_F_ALIVE, 0, 0);
+}
+
+static void nack_run(int step)
+{
+ struct list_head *i, *tmp;
+
+ /* send messages in the tx_queue */
+ buffer_iterate(tx_queue, NULL, tx_queue_xmit);
+
+ /* send conntracks in the tx_list */
+ list_for_each_safe(i, tmp, &tx_list) {
+ struct cache_nack *cn;
+ struct us_conntrack *u;
+
+ cn = container_of(i, struct cache_nack, tx_list);
+ u = cache_get_conntrack(STATE_SYNC(internal), cn);
+ tx_list_xmit(i, u);
+ }
+
+ if (alive_alarm.expires > 0)
+ mod_alarm(&alive_alarm, 1);
+ else {
+ init_alarm(&alive_alarm);
+ /* XXX: alive message expiration configurable */
+ set_alarm_expiration(&alive_alarm, 1);
+ set_alarm_function(&alive_alarm, do_alive_alarm);
+ add_alarm(&alive_alarm);
+ }
+}
+
struct sync_mode nack = {
.internal_cache_flags = LIFETIME,
.external_cache_flags = LIFETIME,
@@ -263,4 +362,5 @@ struct sync_mode nack = {
.local = nack_local,
.recv = nack_recv,
.send = nack_send,
+ .run = nack_run,
};
diff --git a/src/sync-notrack.c b/src/sync-notrack.c
index 1d6eba8..8588ecf 100644
--- a/src/sync-notrack.c
+++ b/src/sync-notrack.c
@@ -24,12 +24,16 @@
static void refresher(struct alarm_list *a, void *data)
{
+ int len;
+ struct nethdr *net;
struct us_conntrack *u = data;
debug_ct(u->ct, "persistence update");
a->expires = random() % CONFIG(refresh) + 1;
- mcast_build_send_update(u);
+ net = BUILD_NETMSG(u->ct, NFCT_Q_UPDATE);
+ len = prepare_send_netmsg(STATE_SYNC(mcast_client), net);
+ mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net, len);
}
static void cache_notrack_add(struct us_conntrack *u, void *data)
diff --git a/src/timer.c b/src/timer.c
new file mode 100644
index 0000000..b85c286
--- /dev/null
+++ b/src/timer.c
@@ -0,0 +1,75 @@
+/*
+ * (C) 2006-2007 by Pablo Neira Ayuso <pablo@netfilter.org>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+#include <stdlib.h>
+#include <assert.h>
+#include <string.h>
+#include "conntrackd.h"
+#include "timer.h"
+
+#define TIMESLICE_CREDIT (1000000 / STEPS_PER_SECONDS) /* 200 ms timeslice */
+
+void timer_init(struct timer *timer)
+{
+ memset(timer, 0, sizeof(struct timer));
+ timer->credits = TIMESLICE_CREDIT;
+}
+
+void timer_start(struct timer *timer)
+{
+ gettimeofday(&timer->start, NULL);
+}
+
+static int timeval_subtract(struct timeval *diff,
+ struct timeval *start,
+ struct timeval *stop)
+{
+ diff->tv_sec = stop->tv_sec - start->tv_sec;
+ diff->tv_usec = stop->tv_usec - start->tv_usec;
+
+ if (diff->tv_usec < 0) {
+ diff->tv_usec += 1000000;
+ diff->tv_sec--;
+ }
+
+ /* Return 1 if result is negative. */
+ return diff->tv_sec < 0;
+}
+
+void timer_stop(struct timer *timer)
+{
+ gettimeofday(&timer->stop, NULL);
+ timeval_subtract(&timer->diff, &timer->start, &timer->stop);
+}
+
+int timer_adjust_credit(struct timer *timer)
+{
+ if (timer->diff.tv_sec != 0) {
+ timer->credits = TIMESLICE_CREDIT;
+ return 1;
+ }
+
+ timer->credits -= timer->diff.tv_usec;
+
+ if (timer->credits < 0) {
+ timer->credits += TIMESLICE_CREDIT;
+ if (timer->credits < 0)
+ timer->credits = TIMESLICE_CREDIT;
+ return 1;
+ }
+ return 0;
+}