diff options
-rw-r--r-- | include/channel.h | 6 | ||||
-rw-r--r-- | include/conntrackd.h | 3 | ||||
-rw-r--r-- | include/queue.h | 3 | ||||
-rw-r--r-- | src/channel.c | 119 | ||||
-rw-r--r-- | src/read_config_lex.l | 1 | ||||
-rw-r--r-- | src/read_config_yy.y | 18 | ||||
-rw-r--r-- | src/sync-mode.c | 5 |
7 files changed, 144 insertions, 11 deletions
diff --git a/include/channel.h b/include/channel.h index d06e510..9b5fad8 100644 --- a/include/channel.h +++ b/include/channel.h @@ -34,7 +34,8 @@ struct tcp_channel { #define CHANNEL_F_DEFAULT (1 << 0) #define CHANNEL_F_BUFFERED (1 << 1) #define CHANNEL_F_STREAM (1 << 2) -#define CHANNEL_F_MAX (1 << 3) +#define CHANNEL_F_ERRORS (1 << 3) +#define CHANNEL_F_MAX (1 << 4) union channel_type_conf { struct mcast_conf mcast; @@ -78,7 +79,8 @@ struct channel { void *data; }; -void channel_init(void); +int channel_init(void); +void channel_end(void); struct channel *channel_open(struct channel_conf *conf); void channel_close(struct channel *c); diff --git a/include/conntrackd.h b/include/conntrackd.h index ce8f9d4..7737532 100644 --- a/include/conntrackd.h +++ b/include/conntrackd.h @@ -96,6 +96,9 @@ struct ct_conf { int filter_from_kernelspace; int event_iterations_limit; struct { + int error_queue_length; + } channelc; + struct { int external_cache_disable; } sync; struct { diff --git a/include/queue.h b/include/queue.h index cca9cba..188e106 100644 --- a/include/queue.h +++ b/include/queue.h @@ -13,7 +13,8 @@ struct queue_node { enum { Q_ELEM_OBJ = 0, - Q_ELEM_CTL = 1 + Q_ELEM_CTL = 1, + Q_ELEM_ERR = 2, }; void queue_node_init(struct queue_node *n, int type); diff --git a/src/channel.c b/src/channel.c index c442b0b..818bb01 100644 --- a/src/channel.c +++ b/src/channel.c @@ -13,20 +13,36 @@ #include <string.h> #include <sys/ioctl.h> #include <netinet/in.h> +#include <errno.h> +#include "conntrackd.h" #include "channel.h" #include "network.h" +#include "queue.h" static struct channel_ops *ops[CHANNEL_MAX]; extern struct channel_ops channel_mcast; extern struct channel_ops channel_udp; extern struct channel_ops channel_tcp; -void channel_init(void) +static struct queue *errorq; + +int channel_init(void) { ops[CHANNEL_MCAST] = &channel_mcast; ops[CHANNEL_UDP] = &channel_udp; ops[CHANNEL_TCP] = &channel_tcp; + + errorq = queue_create("errorq", CONFIG(channelc).error_queue_length, 0); + if (errorq == NULL) { + return -1; + } + return 0; +} + +void channel_end(void) +{ + queue_destroy(errorq); } struct channel_buffer { @@ -133,9 +149,79 @@ channel_close(struct channel *c) free(c); } +struct channel_error { + char *data; + int len; +}; + +static void channel_enqueue_errors(struct channel *c) +{ + struct queue_object *qobj; + struct channel_error *error; + + qobj = queue_object_new(Q_ELEM_ERR, sizeof(struct channel_error)); + if (qobj == NULL) + return; + + error = (struct channel_error *)qobj->data; + error->len = c->buffer->len; + + error->data = malloc(c->buffer->len); + if (error->data == NULL) { + queue_object_free(qobj); + return; + } + memcpy(error->data, c->buffer->data, c->buffer->len); + if (queue_add(errorq, &qobj->qnode) < 0) { + if (errno == ENOSPC) { + struct queue_node *tail; + struct channel_error *tmp; + + tail = queue_del_head(errorq); + tmp = queue_node_data(tail); + free(tmp->data); + queue_object_free((struct queue_object *)tail); + + queue_add(errorq, &qobj->qnode); + } + } +} + +static int channel_handle_error_step(struct queue_node *n, const void *data2) +{ + struct channel_error *error; + const struct channel *c = data2; + int ret; + + error = queue_node_data(n); + ret = c->ops->send(c->data, error->data, error->len); + if (ret != -1) { + /* Success. Delete it from the error queue. */ + queue_del(n); + free(error->data); + queue_object_free((struct queue_object *)n); + } else { + /* We failed to deliver, give up now, try later. */ + return 1; + } + return 0; +} + +static int channel_handle_errors(struct channel *c) +{ + /* there are pending errors that we have to handle. */ + if (c->channel_flags & CHANNEL_F_ERRORS && queue_len(errorq) > 0) { + queue_iterate(errorq, c, channel_handle_error_step); + return queue_len(errorq) > 0; + } + return 0; +} + int channel_send(struct channel *c, const struct nethdr *net) { - int ret = 0, len = ntohs(net->len); + int ret = 0, len = ntohs(net->len), pending_errors; + + pending_errors = channel_handle_errors(c); if (!(c->channel_flags & CHANNEL_F_BUFFERED)) { c->ops->send(c->data, net, len); @@ -146,7 +232,19 @@ retry: memcpy(c->buffer->data + c->buffer->len, net, len); c->buffer->len += len; } else { - c->ops->send(c->data, c->buffer->data, c->buffer->len); + /* We've got pending packets to deliver, enqueue this + * packet to avoid possible re-ordering. */ + if (pending_errors) { + channel_enqueue_errors(c); + } else { + ret = c->ops->send(c->data, c->buffer->data, + c->buffer->len); + if (ret == -1 && + (c->channel_flags & CHANNEL_F_ERRORS)) { + /* Give it another chance to deliver. */ + channel_enqueue_errors(c); + } + } ret = 1; c->buffer->len = 0; goto retry; @@ -156,10 +254,23 @@ retry: int channel_send_flush(struct channel *c) { + int ret, pending_errors; + + pending_errors = channel_handle_errors(c); + if (!(c->channel_flags & CHANNEL_F_BUFFERED) || c->buffer->len == 0) return 0; - c->ops->send(c->data, c->buffer->data, c->buffer->len); + /* We still have pending errors to deliver, avoid any re-ordering. */ + if (pending_errors) { + channel_enqueue_errors(c); + } else { + ret = c->ops->send(c->data, c->buffer->data, c->buffer->len); + if (ret == -1 && (c->channel_flags & CHANNEL_F_ERRORS)) { + /* Give it another chance to deliver it. */ + channel_enqueue_errors(c); + } + } c->buffer->len = 0; return 1; } diff --git a/src/read_config_lex.l b/src/read_config_lex.l index 9c53c6c..b4be6f0 100644 --- a/src/read_config_lex.l +++ b/src/read_config_lex.l @@ -137,6 +137,7 @@ notrack [N|n][O|o][T|t][R|r][A|a][C|c][K|k] "Priority" { return T_PRIO; } "NetlinkEventsReliable" { return T_NETLINK_EVENTS_RELIABLE; } "DisableExternalCache" { return T_DISABLE_EXTERNAL_CACHE; } +"ErrorQueueLength" { return T_ERROR_QUEUE_LENGTH; } {is_on} { return T_ON; } {is_off} { return T_OFF; } diff --git a/src/read_config_yy.y b/src/read_config_yy.y index 0804689..5075cf0 100644 --- a/src/read_config_yy.y +++ b/src/read_config_yy.y @@ -72,7 +72,7 @@ static void __max_dedicated_links_reached(void); %token T_FROM T_USERSPACE T_KERNELSPACE T_EVENT_ITER_LIMIT T_DEFAULT %token T_NETLINK_OVERRUN_RESYNC T_NICE T_IPV4_DEST_ADDR T_IPV6_DEST_ADDR %token T_SCHEDULER T_TYPE T_PRIO T_NETLINK_EVENTS_RELIABLE -%token T_DISABLE_EXTERNAL_CACHE +%token T_DISABLE_EXTERNAL_CACHE T_ERROR_QUEUE_LENGTH %token <string> T_IP T_PATH_VAL %token <val> T_NUMBER @@ -584,7 +584,8 @@ tcp_line : T_TCP '{' tcp_options '}' conf.channel_type_global = CHANNEL_TCP; conf.channel[conf.channel_num].channel_type = CHANNEL_TCP; conf.channel[conf.channel_num].channel_flags = CHANNEL_F_BUFFERED | - CHANNEL_F_STREAM; + CHANNEL_F_STREAM | + CHANNEL_F_ERRORS; conf.channel_num++; }; @@ -600,7 +601,8 @@ tcp_line : T_TCP T_DEFAULT '{' tcp_options '}' conf.channel[conf.channel_num].channel_type = CHANNEL_TCP; conf.channel[conf.channel_num].channel_flags = CHANNEL_F_DEFAULT | CHANNEL_F_BUFFERED | - CHANNEL_F_STREAM; + CHANNEL_F_STREAM | + CHANNEL_F_ERRORS; conf.channel_default = conf.channel_num; conf.channel_num++; }; @@ -709,6 +711,12 @@ tcp_option: T_CHECKSUM T_OFF conf.channel[conf.channel_num].u.tcp.checksum = 1; }; +tcp_option: T_ERROR_QUEUE_LENGTH T_NUMBER +{ + __max_dedicated_links_reached(); + CONFIG(channelc).error_queue_length = $2; +}; + hashsize : T_HASHSIZE T_NUMBER { conf.hashsize = $2; @@ -1583,5 +1591,9 @@ init_config(char *filename) if (CONFIG(nl_overrun_resync) == 0) CONFIG(nl_overrun_resync) = 30; + /* default to 128 elements in the channel error queue */ + if (CONFIG(channelc).error_queue_length == 0) + CONFIG(channelc).error_queue_length = 128; + return 0; } diff --git a/src/sync-mode.c b/src/sync-mode.c index 6781f10..63fae68 100644 --- a/src/sync-mode.c +++ b/src/sync-mode.c @@ -295,7 +295,8 @@ static int init_sync(void) if (STATE_SYNC(external)->init() == -1) return -1; - channel_init(); + if (channel_init() == -1) + return -1; /* channel to send events on the wire */ STATE_SYNC(channel) = @@ -397,6 +398,8 @@ static void kill_sync(void) queue_destroy(STATE_SYNC(tx_queue)); + channel_end(); + origin_unregister(STATE_SYNC(commit).h); nfct_close(STATE_SYNC(commit).h); destroy_evfd(STATE_SYNC(commit).evfd); |