summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPablo Neira Ayuso <pablo@netfilter.org>2009-09-23 18:12:37 +0200
committerPablo Neira Ayuso <pablo@netfilter.org>2009-09-23 18:12:37 +0200
commit6360f319362fd13c86c3387a4bac57665d5ecd73 (patch)
treeef6b98cd0893dfbf7a53c2807d59db1a34dde10d
parent90bbd8b34565ff5106dde34e0798c5e33fb4b786 (diff)
downloadconntrack-tools-6360f319362fd13c86c3387a4bac57665d5ecd73.tar.gz
conntrack-tools-6360f319362fd13c86c3387a4bac57665d5ecd73.zip
conntrackd: add retention queue for TCP errors
Under stress, the TCP stack may return EAGAIN if there is not space left in the sender buffer. We also enqueue any other error. Signed-off-by: Pablo Neira Ayuso <pablo@netfilter.org>
-rw-r--r--include/channel.h6
-rw-r--r--include/conntrackd.h3
-rw-r--r--include/queue.h3
-rw-r--r--src/channel.c119
-rw-r--r--src/read_config_lex.l1
-rw-r--r--src/read_config_yy.y18
-rw-r--r--src/sync-mode.c5
7 files changed, 144 insertions, 11 deletions
diff --git a/include/channel.h b/include/channel.h
index d06e510..9b5fad8 100644
--- a/include/channel.h
+++ b/include/channel.h
@@ -34,7 +34,8 @@ struct tcp_channel {
#define CHANNEL_F_DEFAULT (1 << 0)
#define CHANNEL_F_BUFFERED (1 << 1)
#define CHANNEL_F_STREAM (1 << 2)
-#define CHANNEL_F_MAX (1 << 3)
+#define CHANNEL_F_ERRORS (1 << 3)
+#define CHANNEL_F_MAX (1 << 4)
union channel_type_conf {
struct mcast_conf mcast;
@@ -78,7 +79,8 @@ struct channel {
void *data;
};
-void channel_init(void);
+int channel_init(void);
+void channel_end(void);
struct channel *channel_open(struct channel_conf *conf);
void channel_close(struct channel *c);
diff --git a/include/conntrackd.h b/include/conntrackd.h
index ce8f9d4..7737532 100644
--- a/include/conntrackd.h
+++ b/include/conntrackd.h
@@ -96,6 +96,9 @@ struct ct_conf {
int filter_from_kernelspace;
int event_iterations_limit;
struct {
+ int error_queue_length;
+ } channelc;
+ struct {
int external_cache_disable;
} sync;
struct {
diff --git a/include/queue.h b/include/queue.h
index cca9cba..188e106 100644
--- a/include/queue.h
+++ b/include/queue.h
@@ -13,7 +13,8 @@ struct queue_node {
enum {
Q_ELEM_OBJ = 0,
- Q_ELEM_CTL = 1
+ Q_ELEM_CTL = 1,
+ Q_ELEM_ERR = 2,
};
void queue_node_init(struct queue_node *n, int type);
diff --git a/src/channel.c b/src/channel.c
index c442b0b..818bb01 100644
--- a/src/channel.c
+++ b/src/channel.c
@@ -13,20 +13,36 @@
#include <string.h>
#include <sys/ioctl.h>
#include <netinet/in.h>
+#include <errno.h>
+#include "conntrackd.h"
#include "channel.h"
#include "network.h"
+#include "queue.h"
static struct channel_ops *ops[CHANNEL_MAX];
extern struct channel_ops channel_mcast;
extern struct channel_ops channel_udp;
extern struct channel_ops channel_tcp;
-void channel_init(void)
+static struct queue *errorq;
+
+int channel_init(void)
{
ops[CHANNEL_MCAST] = &channel_mcast;
ops[CHANNEL_UDP] = &channel_udp;
ops[CHANNEL_TCP] = &channel_tcp;
+
+ errorq = queue_create("errorq", CONFIG(channelc).error_queue_length, 0);
+ if (errorq == NULL) {
+ return -1;
+ }
+ return 0;
+}
+
+void channel_end(void)
+{
+ queue_destroy(errorq);
}
struct channel_buffer {
@@ -133,9 +149,79 @@ channel_close(struct channel *c)
free(c);
}
+struct channel_error {
+ char *data;
+ int len;
+};
+
+static void channel_enqueue_errors(struct channel *c)
+{
+ struct queue_object *qobj;
+ struct channel_error *error;
+
+ qobj = queue_object_new(Q_ELEM_ERR, sizeof(struct channel_error));
+ if (qobj == NULL)
+ return;
+
+ error = (struct channel_error *)qobj->data;
+ error->len = c->buffer->len;
+
+ error->data = malloc(c->buffer->len);
+ if (error->data == NULL) {
+ queue_object_free(qobj);
+ return;
+ }
+ memcpy(error->data, c->buffer->data, c->buffer->len);
+ if (queue_add(errorq, &qobj->qnode) < 0) {
+ if (errno == ENOSPC) {
+ struct queue_node *tail;
+ struct channel_error *tmp;
+
+ tail = queue_del_head(errorq);
+ tmp = queue_node_data(tail);
+ free(tmp->data);
+ queue_object_free((struct queue_object *)tail);
+
+ queue_add(errorq, &qobj->qnode);
+ }
+ }
+}
+
+static int channel_handle_error_step(struct queue_node *n, const void *data2)
+{
+ struct channel_error *error;
+ const struct channel *c = data2;
+ int ret;
+
+ error = queue_node_data(n);
+ ret = c->ops->send(c->data, error->data, error->len);
+ if (ret != -1) {
+ /* Success. Delete it from the error queue. */
+ queue_del(n);
+ free(error->data);
+ queue_object_free((struct queue_object *)n);
+ } else {
+ /* We failed to deliver, give up now, try later. */
+ return 1;
+ }
+ return 0;
+}
+
+static int channel_handle_errors(struct channel *c)
+{
+ /* there are pending errors that we have to handle. */
+ if (c->channel_flags & CHANNEL_F_ERRORS && queue_len(errorq) > 0) {
+ queue_iterate(errorq, c, channel_handle_error_step);
+ return queue_len(errorq) > 0;
+ }
+ return 0;
+}
+
int channel_send(struct channel *c, const struct nethdr *net)
{
- int ret = 0, len = ntohs(net->len);
+ int ret = 0, len = ntohs(net->len), pending_errors;
+
+ pending_errors = channel_handle_errors(c);
if (!(c->channel_flags & CHANNEL_F_BUFFERED)) {
c->ops->send(c->data, net, len);
@@ -146,7 +232,19 @@ retry:
memcpy(c->buffer->data + c->buffer->len, net, len);
c->buffer->len += len;
} else {
- c->ops->send(c->data, c->buffer->data, c->buffer->len);
+ /* We've got pending packets to deliver, enqueue this
+ * packet to avoid possible re-ordering. */
+ if (pending_errors) {
+ channel_enqueue_errors(c);
+ } else {
+ ret = c->ops->send(c->data, c->buffer->data,
+ c->buffer->len);
+ if (ret == -1 &&
+ (c->channel_flags & CHANNEL_F_ERRORS)) {
+ /* Give it another chance to deliver. */
+ channel_enqueue_errors(c);
+ }
+ }
ret = 1;
c->buffer->len = 0;
goto retry;
@@ -156,10 +254,23 @@ retry:
int channel_send_flush(struct channel *c)
{
+ int ret, pending_errors;
+
+ pending_errors = channel_handle_errors(c);
+
if (!(c->channel_flags & CHANNEL_F_BUFFERED) || c->buffer->len == 0)
return 0;
- c->ops->send(c->data, c->buffer->data, c->buffer->len);
+ /* We still have pending errors to deliver, avoid any re-ordering. */
+ if (pending_errors) {
+ channel_enqueue_errors(c);
+ } else {
+ ret = c->ops->send(c->data, c->buffer->data, c->buffer->len);
+ if (ret == -1 && (c->channel_flags & CHANNEL_F_ERRORS)) {
+ /* Give it another chance to deliver it. */
+ channel_enqueue_errors(c);
+ }
+ }
c->buffer->len = 0;
return 1;
}
diff --git a/src/read_config_lex.l b/src/read_config_lex.l
index 9c53c6c..b4be6f0 100644
--- a/src/read_config_lex.l
+++ b/src/read_config_lex.l
@@ -137,6 +137,7 @@ notrack [N|n][O|o][T|t][R|r][A|a][C|c][K|k]
"Priority" { return T_PRIO; }
"NetlinkEventsReliable" { return T_NETLINK_EVENTS_RELIABLE; }
"DisableExternalCache" { return T_DISABLE_EXTERNAL_CACHE; }
+"ErrorQueueLength" { return T_ERROR_QUEUE_LENGTH; }
{is_on} { return T_ON; }
{is_off} { return T_OFF; }
diff --git a/src/read_config_yy.y b/src/read_config_yy.y
index 0804689..5075cf0 100644
--- a/src/read_config_yy.y
+++ b/src/read_config_yy.y
@@ -72,7 +72,7 @@ static void __max_dedicated_links_reached(void);
%token T_FROM T_USERSPACE T_KERNELSPACE T_EVENT_ITER_LIMIT T_DEFAULT
%token T_NETLINK_OVERRUN_RESYNC T_NICE T_IPV4_DEST_ADDR T_IPV6_DEST_ADDR
%token T_SCHEDULER T_TYPE T_PRIO T_NETLINK_EVENTS_RELIABLE
-%token T_DISABLE_EXTERNAL_CACHE
+%token T_DISABLE_EXTERNAL_CACHE T_ERROR_QUEUE_LENGTH
%token <string> T_IP T_PATH_VAL
%token <val> T_NUMBER
@@ -584,7 +584,8 @@ tcp_line : T_TCP '{' tcp_options '}'
conf.channel_type_global = CHANNEL_TCP;
conf.channel[conf.channel_num].channel_type = CHANNEL_TCP;
conf.channel[conf.channel_num].channel_flags = CHANNEL_F_BUFFERED |
- CHANNEL_F_STREAM;
+ CHANNEL_F_STREAM |
+ CHANNEL_F_ERRORS;
conf.channel_num++;
};
@@ -600,7 +601,8 @@ tcp_line : T_TCP T_DEFAULT '{' tcp_options '}'
conf.channel[conf.channel_num].channel_type = CHANNEL_TCP;
conf.channel[conf.channel_num].channel_flags = CHANNEL_F_DEFAULT |
CHANNEL_F_BUFFERED |
- CHANNEL_F_STREAM;
+ CHANNEL_F_STREAM |
+ CHANNEL_F_ERRORS;
conf.channel_default = conf.channel_num;
conf.channel_num++;
};
@@ -709,6 +711,12 @@ tcp_option: T_CHECKSUM T_OFF
conf.channel[conf.channel_num].u.tcp.checksum = 1;
};
+tcp_option: T_ERROR_QUEUE_LENGTH T_NUMBER
+{
+ __max_dedicated_links_reached();
+ CONFIG(channelc).error_queue_length = $2;
+};
+
hashsize : T_HASHSIZE T_NUMBER
{
conf.hashsize = $2;
@@ -1583,5 +1591,9 @@ init_config(char *filename)
if (CONFIG(nl_overrun_resync) == 0)
CONFIG(nl_overrun_resync) = 30;
+ /* default to 128 elements in the channel error queue */
+ if (CONFIG(channelc).error_queue_length == 0)
+ CONFIG(channelc).error_queue_length = 128;
+
return 0;
}
diff --git a/src/sync-mode.c b/src/sync-mode.c
index 6781f10..63fae68 100644
--- a/src/sync-mode.c
+++ b/src/sync-mode.c
@@ -295,7 +295,8 @@ static int init_sync(void)
if (STATE_SYNC(external)->init() == -1)
return -1;
- channel_init();
+ if (channel_init() == -1)
+ return -1;
/* channel to send events on the wire */
STATE_SYNC(channel) =
@@ -397,6 +398,8 @@ static void kill_sync(void)
queue_destroy(STATE_SYNC(tx_queue));
+ channel_end();
+
origin_unregister(STATE_SYNC(commit).h);
nfct_close(STATE_SYNC(commit).h);
destroy_evfd(STATE_SYNC(commit).evfd);