/*
 * (C) 2006-2011 by Pablo Neira Ayuso <pablo@netfilter.org>
 * (C) 2011 by Vyatta Inc. <http://www.vyatta.com>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#include "conntrackd.h"
#include "sync.h"
#include "queue.h"
#include "network.h"
#include "alarm.h"
#include "log.h"
#include "cache.h"
#include "fds.h"

#include <string.h>
#include <errno.h>

#if 0 
#define dp printf
#else
#define dp(...)
#endif

struct queue *rs_queue;
static uint32_t exp_seq;
static uint32_t window;
static uint32_t ack_from;
static int ack_from_set = 0;
static struct alarm_block alive_alarm;

enum {
	HELLO_INIT,
	HELLO_SAY,
	HELLO_DONE,
};
static int hello_state = HELLO_INIT;
static int say_hello_back;

/* XXX: alive message expiration configurable */
#define ALIVE_INT 1

struct cache_ftfw {
	struct queue_node	qnode;
	uint32_t 		seq;
};

static void cache_ftfw_add(struct cache_object *obj, void *data)
{
	struct cache_ftfw *cn = data;
	/* These nodes are not inserted in the list */
	queue_node_init(&cn->qnode, Q_ELEM_OBJ);
}

static void cache_ftfw_del(struct cache_object *obj, void *data)
{
	struct cache_ftfw *cn = data;
	queue_del(&cn->qnode);
}

static struct cache_extra cache_ftfw_extra = {
	.size 		= sizeof(struct cache_ftfw),
	.add		= cache_ftfw_add,
	.destroy	= cache_ftfw_del
};

static void nethdr_set_hello(struct nethdr *net)
{
	switch(hello_state) {
	case HELLO_INIT:
		hello_state = HELLO_SAY;
		/* fall through */
	case HELLO_SAY:
		net->flags |= NET_F_HELLO;
		break;
	}
	if (say_hello_back) {
		net->flags |= NET_F_HELLO_BACK;
		say_hello_back = 0;
	}
}

static void tx_queue_add_ctlmsg(uint32_t flags, uint32_t from, uint32_t to)
{
	struct queue_object *qobj;
	struct nethdr_ack *ack;

	qobj = queue_object_new(Q_ELEM_CTL, sizeof(struct nethdr_ack));
	if (qobj == NULL)
		return;

	ack		= (struct nethdr_ack *)qobj->data;
	ack->type 	= NET_T_CTL;
	ack->flags	= flags;
	ack->from	= from;
	ack->to		= to;

	if (queue_add(STATE_SYNC(tx_queue), &qobj->qnode) < 0)
		queue_object_free(qobj);
}

static void tx_queue_add_ctlmsg2(uint32_t flags)
{
	struct queue_object *qobj;
	struct nethdr *ctl;

	qobj = queue_object_new(Q_ELEM_CTL, sizeof(struct nethdr_ack));
	if (qobj == NULL)
		return;

	ctl		= (struct nethdr *)qobj->data;
	ctl->type 	= NET_T_CTL;
	ctl->flags	= flags;

	if (queue_add(STATE_SYNC(tx_queue), &qobj->qnode) < 0)
		queue_object_free(qobj);
}

/* this function is called from the alarm framework */
static void do_alive_alarm(struct alarm_block *a, void *data)
{
	if (ack_from_set && nethdr_track_is_seq_set()) {
		/* exp_seq contains the last update received */
		tx_queue_add_ctlmsg(NET_F_ACK,
				    ack_from,
				    STATE_SYNC(last_seq_recv));
		ack_from_set = 0;
	} else
		tx_queue_add_ctlmsg2(NET_F_ALIVE);

	add_alarm(&alive_alarm, ALIVE_INT, 0);
}

static int ftfw_init(void)
{
	rs_queue = queue_create("rsqueue", CONFIG(resend_queue_size), 0);
	if (rs_queue == NULL) {
		dlog(LOG_ERR, "cannot create rs queue");
		return -1;
	}

	init_alarm(&alive_alarm, NULL, do_alive_alarm);
	add_alarm(&alive_alarm, ALIVE_INT, 0);

	/* set ack window size */
	window = CONFIG(window_size);

	return 0;
}

static void ftfw_kill(void)
{
	queue_destroy(rs_queue);
}

static int do_cache_to_tx(void *data1, void *data2)
{
	struct cache_object *obj = data2;
	struct cache_ftfw *cn =
		cache_get_extra(STATE(mode)->internal->ct.data, obj);

	if (queue_in(rs_queue, &cn->qnode)) {
		queue_del(&cn->qnode);
		queue_add(STATE_SYNC(tx_queue), &cn->qnode);
	} else {
		if (queue_add(STATE_SYNC(tx_queue), &cn->qnode) > 0)
			cache_object_get(obj);
	}
	return 0;
}

static int rs_queue_dump(struct queue_node *n, const void *data2)
{
	const int *fd = data2;
	char buf[512];
	int size;

	switch(n->type) {
		case Q_ELEM_CTL: {
			struct nethdr *net = queue_node_data(n);
			size = sprintf(buf, "control -> seq:%u flags:%u\n",
					    net->seq, net->flags);
			break;
		}
		case Q_ELEM_OBJ: {
			struct cache_ftfw *cn = (struct cache_ftfw *) n;
			size = sprintf(buf, "object -> seq:%u\n", cn->seq);
		break;
		}
		default:
			return 0;
	}
	send(*fd, buf, size, 0);
	return 0;
}

static void ftfw_local_queue(int fd)
{
	char buf[512];
	int size;

	size = sprintf(buf, "resent queue (len=%u)\n", queue_len(rs_queue));
	send(fd, buf, size, 0);
	queue_iterate(rs_queue, &fd, rs_queue_dump);
}

static int ftfw_local(int fd, int type, void *data)
{
	int ret = LOCAL_RET_OK;

	switch(type) {
	case REQUEST_DUMP:
		dlog(LOG_NOTICE, "request resync");
		tx_queue_add_ctlmsg(NET_F_RESYNC, 0, 0);
		break;
	case SEND_BULK:
		dlog(LOG_NOTICE, "sending bulk update");
		cache_iterate(STATE(mode)->internal->ct.data,
			      NULL, do_cache_to_tx);
		break;
	case STATS_RSQUEUE:
		ftfw_local_queue(fd);
		break;
	}

	return ret;
}

static int rs_queue_to_tx(struct queue_node *n, const void *data)
{
	const struct nethdr_ack *nack = data;

	switch(n->type) {
	case Q_ELEM_CTL: {
		struct nethdr_ack *net = queue_node_data(n);

		if (before(net->seq, nack->from))
			return 0;	/* continue */
		else if (after(net->seq, nack->to))
			return 1;	/* break */

		dp("rs_queue_to_tx sq: %u fl:%u len:%u\n",
			net->seq, net->flags, net->len);

		queue_del(n);
		queue_add(STATE_SYNC(tx_queue), n);
		break;
	}
	case Q_ELEM_OBJ: {
		struct cache_ftfw *cn;

		cn = (struct cache_ftfw *) n;
		if (before(cn->seq, nack->from))
			return 0;
		else if (after(cn->seq, nack->to))
			return 1;

		dp("resending nack'ed (oldseq=%u)\n", cn->seq);

		queue_del(n);
		queue_add(STATE_SYNC(tx_queue), n);
		break;
	}
	}
	return 0;
}

static int rs_queue_empty(struct queue_node *n, const void *data)
{
	const struct nethdr_ack *h = data;

	switch(n->type) {
	case Q_ELEM_CTL: {
		struct nethdr_ack *net = queue_node_data(n);

		if (h == NULL) {
			queue_del(n);
			queue_object_free((struct queue_object *)n);
			return 0;
		}
		if (before(net->seq, h->from))
			return 0;	/* continue */
		else if (after(net->seq, h->to))
			return 1;	/* break */

		dp("remove from queue (seq=%u)\n", net->seq);
		queue_del(n);
		queue_object_free((struct queue_object *)n);
		break;
	}
	case Q_ELEM_OBJ: {
		struct cache_ftfw *cn;
		struct cache_object *obj;

		cn = (struct cache_ftfw *) n;
		if (h == NULL) {
			queue_del(n);
			obj = cache_data_get_object(STATE(mode)->internal->ct.data, cn);
			cache_object_put(obj);
			return 0;
		}
		if (before(cn->seq, h->from))
			return 0;
		else if (after(cn->seq, h->to))
			return 1;

		dp("queue: deleting from queue (seq=%u)\n", cn->seq);
		queue_del(n);
		obj = cache_data_get_object(STATE(mode)->internal->ct.data, cn);
		cache_object_put(obj);
		break;
	}
	}
	return 0;
}

static int digest_msg(const struct nethdr *net)
{
	if (IS_DATA(net))
		return MSG_DATA;

	else if (IS_ACK(net)) {
		const struct nethdr_ack *h = (const struct nethdr_ack *) net;

		if (before(h->to, h->from))
			return MSG_BAD;

		queue_iterate(rs_queue, h, rs_queue_empty);
		return MSG_CTL;

	} else if (IS_NACK(net)) {
		const struct nethdr_ack *nack = (const struct nethdr_ack *) net;

		if (before(nack->to, nack->from))
			return MSG_BAD;

		queue_iterate(rs_queue, nack, rs_queue_to_tx);
		return MSG_CTL;

	} else if (IS_RESYNC(net)) {
		dp("RESYNC ALL\n");
		cache_iterate(STATE(mode)->internal->ct.data, NULL, do_cache_to_tx);
		return MSG_CTL;

	} else if (IS_ALIVE(net))
		return MSG_CTL;

	return MSG_BAD;
}

static int digest_hello(const struct nethdr *net)
{
	int ret = 0;

	if (IS_HELLO(net)) {
		say_hello_back = 1;
		ret = 1;
	}
	if (IS_HELLO_BACK(net)) {
		/* this is a hello back for a requested hello */
		if (hello_state == HELLO_SAY)
			hello_state = HELLO_DONE;
	}

	return ret;
}

static int ftfw_recv(const struct nethdr *net)
{
	int ret = MSG_DATA;

	if (digest_hello(net)) {
		/* we have received a hello while we had data to acknowledge.
		 * reset the window, the other doesn't know anthing about it. */
		if (ack_from_set && before(net->seq, ack_from)) {
			window = CONFIG(window_size) - 1;
			ack_from = net->seq;
		}

		/* XXX: flush the resend queues since the other does not 
		 * know anything about that data, we are unreliable until 
		 * the helloing finishes */
		queue_iterate(rs_queue, NULL, rs_queue_empty);

		goto bypass;
	}

	switch (nethdr_track_seq(net->seq, &exp_seq)) {
	case SEQ_AFTER:
		ret = digest_msg(net);
		if (ret == MSG_BAD) {
			ret = MSG_BAD;
			goto out;
		}

		if (ack_from_set) {
			tx_queue_add_ctlmsg(NET_F_ACK, ack_from, exp_seq-1);
			ack_from_set = 0;
		}

		tx_queue_add_ctlmsg(NET_F_NACK, exp_seq, net->seq-1);

		/* count this message as part of the new window */
		window = CONFIG(window_size) - 1;
		ack_from = net->seq;
		ack_from_set = 1;
		break;

	case SEQ_BEFORE:
		/* we don't accept delayed packets */
		ret = MSG_DROP;
		break;

	case SEQ_UNSET:
	case SEQ_IN_SYNC:
bypass:
		ret = digest_msg(net);
		if (ret == MSG_BAD) {
			ret = MSG_BAD;
			goto out;
		}

		if (!ack_from_set) {
			ack_from_set = 1;
			ack_from = net->seq;
		}

		if (--window <= 0) {
			/* received a window, send an acknowledgement */
			tx_queue_add_ctlmsg(NET_F_ACK, ack_from, net->seq);
			window = CONFIG(window_size);
			ack_from_set = 0;
		}
	}

out:
	if ((ret == MSG_DATA || ret == MSG_CTL))
		nethdr_track_update_seq(net->seq);

	return ret;
}

static void rs_queue_purge_full(void)
{
	struct queue_node *n;

	n = queue_del_head(rs_queue);
	switch(n->type) {
	case Q_ELEM_CTL: {
		struct queue_object *qobj = (struct queue_object *)n;
		queue_object_free(qobj);
		break;
	}
	case Q_ELEM_OBJ: {
		struct cache_ftfw *cn;
		struct cache_object *obj;

		cn = (struct cache_ftfw *)n;
		obj = cache_data_get_object(STATE(mode)->internal->ct.data, cn);
		cache_object_put(obj);
		break;
	}
	}
}

static int tx_queue_xmit(struct queue_node *n, const void *data)
{
	queue_del(n);

	switch(n->type) {
	case Q_ELEM_CTL: {
		struct nethdr *net = queue_node_data(n);

		nethdr_set_hello(net);

		if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net)) {
			nethdr_set_ack(net);
		} else {
			nethdr_set_ctl(net);
		}
		HDR_HOST2NETWORK(net);

		dp("tx_queue sq: %u fl:%u len:%u\n",
	               ntohl(net->seq), net->flags, ntohs(net->len));

		multichannel_send(STATE_SYNC(channel), net);
		HDR_NETWORK2HOST(net);

		if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net)) {
			if (queue_add(rs_queue, n) < 0) {
				if (errno == ENOSPC) {
					rs_queue_purge_full();
					queue_add(rs_queue, n);
				}
			}
		} else
			queue_object_free((struct queue_object *)n);
		break;
	}
	case Q_ELEM_OBJ: {
		struct cache_ftfw *cn;
		struct cache_object *obj;
		int type;
		struct nethdr *net;

		cn = (struct cache_ftfw *)n;
		obj = cache_data_get_object(STATE(mode)->internal->ct.data, cn);
		type = object_status_to_network_type(obj);
		net = obj->cache->ops->build_msg(obj, type);
		nethdr_set_hello(net);

		dp("tx_list sq: %u fl:%u len:%u\n",
	                ntohl(net->seq), net->flags, ntohs(net->len));

		multichannel_send(STATE_SYNC(channel), net);
		cn->seq = ntohl(net->seq);
		if (queue_add(rs_queue, &cn->qnode) < 0) {
			if (errno == ENOSPC) {
				rs_queue_purge_full();
				queue_add(rs_queue, &cn->qnode);
			}
		}
		/* we release the object once we get the acknowlegment */
		break;
	}
	}

	return 0;
}

static void ftfw_xmit(void)
{
	queue_iterate(STATE_SYNC(tx_queue), NULL, tx_queue_xmit);
	add_alarm(&alive_alarm, ALIVE_INT, 0);
	dp("tx_queue_len:%u rs_queue_len:%u\n", 
		queue_len(tx_queue), queue_len(rs_queue));
}

static void ftfw_enqueue(struct cache_object *obj, int type)
{
	struct cache_ftfw *cn =
		cache_get_extra(STATE(mode)->internal->ct.data, obj);
	if (queue_in(rs_queue, &cn->qnode)) {
		queue_del(&cn->qnode);
		queue_add(STATE_SYNC(tx_queue), &cn->qnode);
	} else {
		if (queue_add(STATE_SYNC(tx_queue), &cn->qnode) > 0)
			cache_object_get(obj);
	}
}

struct sync_mode sync_ftfw = {
	.internal_cache_flags	= NO_FEATURES,
	.external_cache_flags	= NO_FEATURES,
	.internal_cache_extra	= &cache_ftfw_extra,
	.init			= ftfw_init,
	.kill			= ftfw_kill,
	.local			= ftfw_local,
	.recv			= ftfw_recv,
	.enqueue		= ftfw_enqueue,
	.xmit			= ftfw_xmit,
};