summaryrefslogtreecommitdiff
path: root/src/channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/channel.c')
-rw-r--r--src/channel.c119
1 files changed, 115 insertions, 4 deletions
diff --git a/src/channel.c b/src/channel.c
index c442b0b..818bb01 100644
--- a/src/channel.c
+++ b/src/channel.c
@@ -13,20 +13,36 @@
#include <string.h>
#include <sys/ioctl.h>
#include <netinet/in.h>
+#include <errno.h>
+#include "conntrackd.h"
#include "channel.h"
#include "network.h"
+#include "queue.h"
static struct channel_ops *ops[CHANNEL_MAX];
extern struct channel_ops channel_mcast;
extern struct channel_ops channel_udp;
extern struct channel_ops channel_tcp;
-void channel_init(void)
+static struct queue *errorq;
+
+int channel_init(void)
{
ops[CHANNEL_MCAST] = &channel_mcast;
ops[CHANNEL_UDP] = &channel_udp;
ops[CHANNEL_TCP] = &channel_tcp;
+
+ errorq = queue_create("errorq", CONFIG(channelc).error_queue_length, 0);
+ if (errorq == NULL) {
+ return -1;
+ }
+ return 0;
+}
+
+void channel_end(void)
+{
+ queue_destroy(errorq);
}
struct channel_buffer {
@@ -133,9 +149,79 @@ channel_close(struct channel *c)
free(c);
}
+struct channel_error {
+ char *data;
+ int len;
+};
+
+static void channel_enqueue_errors(struct channel *c)
+{
+ struct queue_object *qobj;
+ struct channel_error *error;
+
+ qobj = queue_object_new(Q_ELEM_ERR, sizeof(struct channel_error));
+ if (qobj == NULL)
+ return;
+
+ error = (struct channel_error *)qobj->data;
+ error->len = c->buffer->len;
+
+ error->data = malloc(c->buffer->len);
+ if (error->data == NULL) {
+ queue_object_free(qobj);
+ return;
+ }
+ memcpy(error->data, c->buffer->data, c->buffer->len);
+ if (queue_add(errorq, &qobj->qnode) < 0) {
+ if (errno == ENOSPC) {
+ struct queue_node *tail;
+ struct channel_error *tmp;
+
+ tail = queue_del_head(errorq);
+ tmp = queue_node_data(tail);
+ free(tmp->data);
+ queue_object_free((struct queue_object *)tail);
+
+ queue_add(errorq, &qobj->qnode);
+ }
+ }
+}
+
+static int channel_handle_error_step(struct queue_node *n, const void *data2)
+{
+ struct channel_error *error;
+ const struct channel *c = data2;
+ int ret;
+
+ error = queue_node_data(n);
+ ret = c->ops->send(c->data, error->data, error->len);
+ if (ret != -1) {
+ /* Success. Delete it from the error queue. */
+ queue_del(n);
+ free(error->data);
+ queue_object_free((struct queue_object *)n);
+ } else {
+ /* We failed to deliver, give up now, try later. */
+ return 1;
+ }
+ return 0;
+}
+
+static int channel_handle_errors(struct channel *c)
+{
+ /* there are pending errors that we have to handle. */
+ if (c->channel_flags & CHANNEL_F_ERRORS && queue_len(errorq) > 0) {
+ queue_iterate(errorq, c, channel_handle_error_step);
+ return queue_len(errorq) > 0;
+ }
+ return 0;
+}
+
int channel_send(struct channel *c, const struct nethdr *net)
{
- int ret = 0, len = ntohs(net->len);
+ int ret = 0, len = ntohs(net->len), pending_errors;
+
+ pending_errors = channel_handle_errors(c);
if (!(c->channel_flags & CHANNEL_F_BUFFERED)) {
c->ops->send(c->data, net, len);
@@ -146,7 +232,19 @@ retry:
memcpy(c->buffer->data + c->buffer->len, net, len);
c->buffer->len += len;
} else {
- c->ops->send(c->data, c->buffer->data, c->buffer->len);
+ /* We've got pending packets to deliver, enqueue this
+ * packet to avoid possible re-ordering. */
+ if (pending_errors) {
+ channel_enqueue_errors(c);
+ } else {
+ ret = c->ops->send(c->data, c->buffer->data,
+ c->buffer->len);
+ if (ret == -1 &&
+ (c->channel_flags & CHANNEL_F_ERRORS)) {
+ /* Give it another chance to deliver. */
+ channel_enqueue_errors(c);
+ }
+ }
ret = 1;
c->buffer->len = 0;
goto retry;
@@ -156,10 +254,23 @@ retry:
int channel_send_flush(struct channel *c)
{
+ int ret, pending_errors;
+
+ pending_errors = channel_handle_errors(c);
+
if (!(c->channel_flags & CHANNEL_F_BUFFERED) || c->buffer->len == 0)
return 0;
- c->ops->send(c->data, c->buffer->data, c->buffer->len);
+ /* We still have pending errors to deliver, avoid any re-ordering. */
+ if (pending_errors) {
+ channel_enqueue_errors(c);
+ } else {
+ ret = c->ops->send(c->data, c->buffer->data, c->buffer->len);
+ if (ret == -1 && (c->channel_flags & CHANNEL_F_ERRORS)) {
+ /* Give it another chance to deliver it. */
+ channel_enqueue_errors(c);
+ }
+ }
c->buffer->len = 0;
return 1;
}