/*
 * (C) 2006-2012 by Pablo Neira Ayuso <pablo@netfilter.org>
 * (C) 2011 by Vyatta Inc. <http://www.vyatta.com>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#include "sync.h"
#include "netlink.h"
#include "traffic_stats.h"
#include "log.h"
#include "cache.h"
#include "conntrackd.h"
#include "network.h"
#include "fds.h"
#include "event.h"
#include "queue.h"
#include "process.h"
#include "origin.h"
#include "internal.h"
#include "external.h"

#include <errno.h>
#include <unistd.h>
#include <time.h>
#include <string.h>
#include <stdlib.h>
#include <limits.h>
#include <net/if.h>
#include <fcntl.h>

static struct nf_conntrack *msg2ct_alloc(struct nethdr *net, size_t remain)
{
	struct nf_conntrack *ct;

	/* TODO: add stats on ENOMEM errors in the future. */
	ct = nfct_new();
	if (ct == NULL)
		return NULL;

	if (msg2ct(ct, net, remain) == -1) {
		STATE_SYNC(error).msg_rcv_malformed++;
		STATE_SYNC(error).msg_rcv_bad_payload++;
		nfct_destroy(ct);
		return NULL;
	}
	return ct;
}

static struct nf_expect *msg2exp_alloc(struct nethdr *net, size_t remain)
{
	struct nf_expect *exp;

	/* TODO: add stats on ENOMEM errors in the future. */
	exp = nfexp_new();
	if (exp == NULL)
		return NULL;

	if (msg2exp(exp, net, remain) == -1) {
		STATE_SYNC(error).msg_rcv_malformed++;
		STATE_SYNC(error).msg_rcv_bad_payload++;
		nfexp_destroy(exp);
		return NULL;
	}
	return exp;
}

static void
do_channel_handler_step(struct channel *c, struct nethdr *net, size_t remain)
{
	struct nf_conntrack *ct = NULL;
	struct nf_expect *exp = NULL;

	if (net->version != CONNTRACKD_PROTOCOL_VERSION) {
		STATE_SYNC(error).msg_rcv_malformed++;
		STATE_SYNC(error).msg_rcv_bad_version++;
		return;
	}

	switch (STATE_SYNC(sync)->recv(net)) {
	case MSG_DATA:
		multichannel_change_current_channel(STATE_SYNC(channel), c);
		break;
	case MSG_CTL:
		multichannel_change_current_channel(STATE_SYNC(channel), c);
		return;
	case MSG_BAD:
		STATE_SYNC(error).msg_rcv_malformed++;
		STATE_SYNC(error).msg_rcv_bad_header++;
		return;
	case MSG_DROP:
		return;
	default:
		break;
	}

	if (net->type > NET_T_STATE_MAX) {
		STATE_SYNC(error).msg_rcv_malformed++;
		STATE_SYNC(error).msg_rcv_bad_type++;
		return;
	}

	switch(net->type) {
	case NET_T_STATE_CT_NEW:
		ct = msg2ct_alloc(net, remain);
		if (ct == NULL)
			return;
		STATE_SYNC(external)->ct.new(ct);
		break;
	case NET_T_STATE_CT_UPD:
		ct = msg2ct_alloc(net, remain);
		if (ct == NULL)
			return;
		STATE_SYNC(external)->ct.upd(ct);
		break;
	case NET_T_STATE_CT_DEL:
		ct = msg2ct_alloc(net, remain);
		if (ct == NULL)
			return;
		STATE_SYNC(external)->ct.del(ct);
		break;
	case NET_T_STATE_EXP_NEW:
		exp = msg2exp_alloc(net, remain);
		if (exp == NULL)
			return;
		STATE_SYNC(external)->exp.new(exp);
		break;
	case NET_T_STATE_EXP_UPD:
		exp = msg2exp_alloc(net, remain);
		if (exp == NULL)
			return;
		STATE_SYNC(external)->exp.upd(exp);
		break;
	case NET_T_STATE_EXP_DEL:
		exp = msg2exp_alloc(net, remain);
		if (exp == NULL)
			return;
		STATE_SYNC(external)->exp.del(exp);
		break;
	default:
		STATE_SYNC(error).msg_rcv_malformed++;
		STATE_SYNC(error).msg_rcv_bad_type++;
		break;
	}
	if (ct != NULL)
		nfct_destroy(ct);
	if (exp != NULL)
		nfexp_destroy(exp);
}

static char __net[65536];		/* XXX: maximum MTU for IPv4 */
static char *cur = __net;

static int channel_stream(struct channel *m, const char *ptr, ssize_t remain)
{
	if (m->channel_flags & CHANNEL_F_STREAM) {
		/* truncated data. */
		memcpy(__net, ptr, remain);
		cur = __net + remain;
		return 1;
	}
	return 0;
}

/* handler for messages received */
static int channel_handler_routine(struct channel *m)
{
	ssize_t numbytes;
	ssize_t remain, pending = cur - __net;
	char *ptr = __net;

	numbytes = channel_recv(m, cur, sizeof(__net) - pending);
	if (numbytes <= 0)
		return -1;

	remain = numbytes;
	if (pending) {
		remain += pending;
		cur = __net;
	}

	while (remain > 0) {
		struct nethdr *net = (struct nethdr *) ptr;
		int len;

		if (remain < NETHDR_SIZ) {
			if (!channel_stream(m, ptr, remain)) {
				STATE_SYNC(error).msg_rcv_malformed++;
				STATE_SYNC(error).msg_rcv_truncated++;
			}
			break;
		}

		len = ntohs(net->len);
		if (len <= 0) {
			STATE_SYNC(error).msg_rcv_malformed++;
			STATE_SYNC(error).msg_rcv_bad_size++;
			break;
		}

		if (len > remain) {
			if (!channel_stream(m, ptr, remain)) {
				STATE_SYNC(error).msg_rcv_malformed++;
				STATE_SYNC(error).msg_rcv_bad_size++;
			}
			break;
		}

		if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net)) {
			if (remain < NETHDR_ACK_SIZ) {
				if (!channel_stream(m, ptr, remain)) {
					STATE_SYNC(error).msg_rcv_malformed++;
					STATE_SYNC(error).msg_rcv_truncated++;
				}
				break;
			}

			if (len < NETHDR_ACK_SIZ) {
				STATE_SYNC(error).msg_rcv_malformed++;
				STATE_SYNC(error).msg_rcv_bad_size++;
				break;
			}
		} else {
			if (len < NETHDR_SIZ) {
				STATE_SYNC(error).msg_rcv_malformed++;
				STATE_SYNC(error).msg_rcv_bad_size++;
				break;
			}
		}

		HDR_NETWORK2HOST(net);

		do_channel_handler_step(m, net, remain);
		ptr += net->len;
		remain -= net->len;
	}
	return 0;
}

/* handler for messages received */
static void channel_handler(void *data)
{
	struct channel *c = data;
	int k;

	for (k=0; k<CONFIG(event_iterations_limit); k++) {
		if (channel_handler_routine(c) == -1) {
			break;
		}
	}
}

/* select a new interface candidate in a round robin basis */
static void interface_candidate(void)
{
	int i, idx;
	unsigned int flags;
	char buf[IFNAMSIZ];

	for (i=0; i<STATE_SYNC(channel)->channel_num; i++) {
		idx = multichannel_get_ifindex(STATE_SYNC(channel), i);
		if (idx == multichannel_get_current_ifindex(STATE_SYNC(channel)))
			continue;
		nlif_get_ifflags(STATE_SYNC(interface), idx, &flags);
		if (flags & (IFF_RUNNING | IFF_UP)) {
			multichannel_set_current_channel(STATE_SYNC(channel), i);
			dlog(LOG_NOTICE, "device `%s' becomes "
					 "dedicated link", 
					 if_indextoname(idx, buf));
			return;
		}
	}
	dlog(LOG_ERR, "no dedicated links available!");
}

static void interface_handler(void *data)
{
	int idx = multichannel_get_current_ifindex(STATE_SYNC(channel));
	unsigned int flags;

	nlif_catch(STATE_SYNC(interface));
	nlif_get_ifflags(STATE_SYNC(interface), idx, &flags);
	if (!(flags & IFF_RUNNING) || !(flags & IFF_UP))
		interface_candidate();
}

static void do_reset_cache_alarm(struct alarm_block *a, void *data)
{
	STATE(stats).nl_kernel_table_flush++;
	dlog(LOG_NOTICE, "flushing kernel conntrack table (scheduled)");

	/* fork a child process that performs the flush operation,
	 * meanwhile the parent process handles events. */
	if (fork_process_new(CTD_PROC_FLUSH, CTD_PROC_F_EXCL,
			     NULL, NULL) == 0) {
		nl_flush_conntrack_table_selective();
		exit(EXIT_SUCCESS);
	}
	/* this is not required if events don't get lost */
	STATE(mode)->internal->ct.flush();
}

static void commit_cb(void *data)
{
	int ret;

	read_evfd(STATE_SYNC(commit).evfd);

	ret = STATE_SYNC(commit).rq[0].cb(STATE_SYNC(commit).h, 0);
	if (ret == 0) {
		/* we still have things in the callback queue. */
		if (STATE_SYNC(commit).rq[1].cb) {
			int fd = STATE_SYNC(commit).clientfd;

			STATE_SYNC(commit).rq[0].cb =
				STATE_SYNC(commit).rq[1].cb;

			STATE_SYNC(commit).rq[1].cb = NULL;

			STATE_SYNC(commit).clientfd = -1;
			STATE_SYNC(commit).rq[0].cb(STATE_SYNC(commit).h, fd);
		} else {
			/* Close the client socket now, we're done. */
			close(STATE_SYNC(commit).clientfd);
			STATE_SYNC(commit).clientfd = -1;
		}
	}
}

static void channel_accept_cb(void *data)
{
	struct channel *c = data;
	int fd;

	fd = channel_accept(data);
	if (fd < 0)
		return;

	register_fd(fd, channel_handler, c, STATE(fds));
}

static void tx_queue_cb(void *data)
{
	STATE_SYNC(sync)->xmit();

	/* flush pending messages */
	multichannel_send_flush(STATE_SYNC(channel));
}

static int init_sync(void)
{
	int i;

	state.sync = malloc(sizeof(struct ct_sync_state));
	if (!state.sync) {
		dlog(LOG_ERR, "can't allocate memory for sync");
		return -1;
	}
	memset(state.sync, 0, sizeof(struct ct_sync_state));

	if (CONFIG(flags) & CTD_SYNC_FTFW)
		STATE_SYNC(sync) = &sync_ftfw;
	else if (CONFIG(flags) & CTD_SYNC_ALARM)
		STATE_SYNC(sync) = &sync_alarm;
	else if (CONFIG(flags) & CTD_SYNC_NOTRACK)
		STATE_SYNC(sync) = &sync_notrack;
	else {
		fprintf(stderr, "WARNING: No synchronization mode specified. "
				"Defaulting to FT-FW mode.\n");
		CONFIG(flags) |= CTD_SYNC_FTFW;
		STATE_SYNC(sync) = &sync_ftfw;
	}

	if (STATE_SYNC(sync)->init)
		STATE_SYNC(sync)->init();

	if (CONFIG(sync).internal_cache_disable == 0) {
		STATE(mode)->internal = &internal_cache;
	} else {
		STATE(mode)->internal = &internal_bypass;
		dlog(LOG_NOTICE, "disabling internal cache");

	}
	if (STATE(mode)->internal->init() == -1)
		return -1;

	if (CONFIG(sync).external_cache_disable == 0) {
		STATE_SYNC(external) = &external_cache;
	} else {
		STATE_SYNC(external) = &external_inject;
		dlog(LOG_NOTICE, "disabling external cache");
	}
	if (STATE_SYNC(external)->init() == -1)
		return -1;

	if (channel_init() == -1)
		return -1;

	/* channel to send events on the wire */
	STATE_SYNC(channel) =
		multichannel_open(CONFIG(channel), CONFIG(channel_num));
	if (STATE_SYNC(channel) == NULL) {
		dlog(LOG_ERR, "can't open channel socket");
		return -1;
	}
	for (i=0; i<STATE_SYNC(channel)->channel_num; i++) {
		int fd = channel_get_fd(STATE_SYNC(channel)->channel[i]);
		fcntl(fd, F_SETFL, O_NONBLOCK);

		switch(channel_type(STATE_SYNC(channel)->channel[i])) {
		case CHANNEL_T_STREAM:
			register_fd(fd, channel_accept_cb,
					STATE_SYNC(channel)->channel[i],
					STATE(fds));
			break;
		case CHANNEL_T_DATAGRAM:
			register_fd(fd, channel_handler,
					STATE_SYNC(channel)->channel[i],
					STATE(fds));
			break;
		}
	}

	STATE_SYNC(interface) = nl_init_interface_handler();
	if (!STATE_SYNC(interface)) {
		dlog(LOG_ERR, "can't open interface watcher");
		return -1;
	}
	if (register_fd(nlif_fd(STATE_SYNC(interface)),
			interface_handler, NULL, STATE(fds)) == -1)
		return -1;

	STATE_SYNC(tx_queue) = queue_create("txqueue", INT_MAX, QUEUE_F_EVFD);
	if (STATE_SYNC(tx_queue) == NULL) {
		dlog(LOG_ERR, "cannot create tx queue");
		return -1;
	}
	if (register_fd(queue_get_eventfd(STATE_SYNC(tx_queue)),
			tx_queue_cb, NULL, STATE(fds)) == -1)
		return -1;

	STATE_SYNC(commit).h = nfct_open(CONFIG(netlink).subsys_id, 0);
	if (STATE_SYNC(commit).h == NULL) {
		dlog(LOG_ERR, "can't create handler to commit");
		return -1;
	}
	origin_register(STATE_SYNC(commit).h, CTD_ORIGIN_COMMIT);

	STATE_SYNC(commit).evfd = create_evfd();
	if (STATE_SYNC(commit).evfd == NULL) {
		dlog(LOG_ERR, "can't create eventfd to commit");
		return -1;
	}
	if (register_fd(get_read_evfd(STATE_SYNC(commit).evfd),
				commit_cb, NULL, STATE(fds)) == -1) {
		return -1;
	}
	STATE_SYNC(commit).clientfd = -1;

	init_alarm(&STATE_SYNC(reset_cache_alarm), NULL, do_reset_cache_alarm);

	/* initialization of message sequence generation */
	STATE_SYNC(last_seq_sent) = time(NULL);

	return 0;
}

static void kill_sync(void)
{
	STATE(mode)->internal->close();
	STATE_SYNC(external)->close();

	multichannel_close(STATE_SYNC(channel));

	nlif_close(STATE_SYNC(interface));

	queue_destroy(STATE_SYNC(tx_queue));

	channel_end();

	origin_unregister(STATE_SYNC(commit).h);
	nfct_close(STATE_SYNC(commit).h);
	destroy_evfd(STATE_SYNC(commit).evfd);

	if (STATE_SYNC(sync)->kill)
		STATE_SYNC(sync)->kill();
}

static void dump_stats_sync(int fd)
{
	char buf[512];
	int size;

	size = sprintf(buf, "message tracking:\n"
			    "%20llu Malformed msgs "
			    "%20llu Lost msgs\n\n",
			(unsigned long long)STATE_SYNC(error).msg_rcv_malformed,
			(unsigned long long)STATE_SYNC(error).msg_rcv_lost);

	send(fd, buf, size, 0);
}

static void dump_stats_sync_extended(int fd)
{
	char buf[512];
	int size;

	size = snprintf(buf, sizeof(buf),
			"network statistics:\n"
			"\trecv:\n"
			"\t\tMalformed messages:\t%20llu\n"
			"\t\tWrong protocol version:\t%20u\n"
			"\t\tMalformed header:\t%20u\n"
			"\t\tMalformed payload:\t%20u\n"
			"\t\tBad message type:\t%20u\n"
			"\t\tTruncated message:\t%20u\n"
			"\t\tBad message size:\t%20u\n"
			"\tsend:\n"
			"\t\tMalformed messages:\t%20u\n\n"
			"sequence tracking statistics:\n"
			"\trecv:\n"
			"\t\tPackets lost:\t\t%20llu\n"
			"\t\tPackets before:\t\t%20llu\n\n",
			(unsigned long long)STATE_SYNC(error).msg_rcv_malformed,
			STATE_SYNC(error).msg_rcv_bad_version,
			STATE_SYNC(error).msg_rcv_bad_header,
			STATE_SYNC(error).msg_rcv_bad_payload,
			STATE_SYNC(error).msg_rcv_bad_type,
			STATE_SYNC(error).msg_rcv_truncated,
			STATE_SYNC(error).msg_rcv_bad_size,
			STATE_SYNC(error).msg_snd_malformed,
			(unsigned long long)STATE_SYNC(error).msg_rcv_lost,
			(unsigned long long)STATE_SYNC(error).msg_rcv_before);

	send(fd, buf, size, 0);
}

static int local_commit(int fd)
{
	int ret;

	/* delete the reset alarm if any before committing */
	del_alarm(&STATE_SYNC(reset_cache_alarm));

	ret = STATE_SYNC(commit).rq[0].cb(STATE_SYNC(commit).h, fd);
	if (ret == -1) {
		dlog(LOG_NOTICE, "commit already in progress, skipping");
		ret = LOCAL_RET_OK;
	} else if (ret == 0) {
		/* we've finished the commit. */
		ret = LOCAL_RET_OK;
	} else {
		/* Keep open the client, we want synchronous commit. */
		ret = LOCAL_RET_STOLEN;
	}
	return ret;
}

/* handler for requests coming via UNIX socket */
static int local_handler_sync(int fd, int type, void *data)
{
	int ret = LOCAL_RET_OK;

	switch(type) {
	case CT_DUMP_INTERNAL:
		if (fork_process_new(CTD_PROC_ANY, 0, NULL, NULL) == 0) {
			STATE(mode)->internal->ct.dump(fd, NFCT_O_PLAIN);
			exit(EXIT_SUCCESS);
		}
		break;
	case CT_DUMP_EXTERNAL:
		if (fork_process_new(CTD_PROC_ANY, 0, NULL, NULL) == 0) {
			STATE_SYNC(external)->ct.dump(fd, NFCT_O_PLAIN);
			exit(EXIT_SUCCESS);
		} 
		break;
	case CT_DUMP_INT_XML:
		if (fork_process_new(CTD_PROC_ANY, 0, NULL, NULL) == 0) {
			STATE(mode)->internal->ct.dump(fd, NFCT_O_XML);
			exit(EXIT_SUCCESS);
		}
		break;
	case CT_DUMP_EXT_XML:
		if (fork_process_new(CTD_PROC_ANY, 0, NULL, NULL) == 0) {
			STATE_SYNC(external)->ct.dump(fd, NFCT_O_XML);
			exit(EXIT_SUCCESS);
		}
		break;
	case CT_COMMIT:
		dlog(LOG_NOTICE, "committing conntrack cache");
		STATE_SYNC(commit).rq[0].cb = STATE_SYNC(external)->ct.commit;
		STATE_SYNC(commit).rq[1].cb = NULL;
		ret = local_commit(fd);
		break;
	case RESET_TIMERS:
		if (!alarm_pending(&STATE_SYNC(reset_cache_alarm))) {
			dlog(LOG_NOTICE, "flushing conntrack table in %d secs",
					 CONFIG(purge_timeout));
			add_alarm(&STATE_SYNC(reset_cache_alarm),
				  CONFIG(purge_timeout), 0);
		}
		break;
	case CT_FLUSH_CACHE:
		/* if we're still committing, abort this command */
		if (STATE_SYNC(commit).clientfd != -1) {
			dlog(LOG_ERR, "ignoring flush command, "
				      "commit still in progress");
			break;
		}
		/* inmediate flush, remove pending flush scheduled if any */
		del_alarm(&STATE_SYNC(reset_cache_alarm));
		dlog(LOG_NOTICE, "flushing caches");
		STATE(mode)->internal->ct.flush();
		STATE_SYNC(external)->ct.flush();
		break;
	case CT_FLUSH_INT_CACHE:
		/* inmediate flush, remove pending flush scheduled if any */
		del_alarm(&STATE_SYNC(reset_cache_alarm));
		dlog(LOG_NOTICE, "flushing internal cache");
		STATE(mode)->internal->ct.flush();
		break;
	case CT_FLUSH_EXT_CACHE:
		/* if we're still committing, abort this command */
		if (STATE_SYNC(commit).clientfd != -1) {
			dlog(LOG_ERR, "ignoring flush command, "
				      "commit still in progress");
			break;
		}
		dlog(LOG_NOTICE, "flushing external cache");
		STATE_SYNC(external)->ct.flush();
		break;
	case STATS:
		STATE(mode)->internal->ct.stats(fd);
		STATE_SYNC(external)->ct.stats(fd);
		dump_traffic_stats(fd);
		multichannel_stats(STATE_SYNC(channel), fd);
		dump_stats_sync(fd);
		break;
	case STATS_NETWORK:
		dump_stats_sync_extended(fd);
		multichannel_stats(STATE_SYNC(channel), fd);
		break;
	case STATS_CACHE:
		STATE(mode)->internal->ct.stats_ext(fd);
		STATE_SYNC(external)->ct.stats_ext(fd);
		break;
	case STATS_LINK:
		multichannel_stats_extended(STATE_SYNC(channel),
					    STATE_SYNC(interface), fd);
		break;
	case STATS_QUEUE:
		queue_stats_show(fd);
		break;
	case EXP_STATS:
		if (!(CONFIG(flags) & CTD_EXPECT))
			break;

		STATE(mode)->internal->exp.stats(fd);
		STATE_SYNC(external)->exp.stats(fd);
		dump_traffic_stats(fd);
		multichannel_stats(STATE_SYNC(channel), fd);
		dump_stats_sync(fd);
		break;
	case EXP_DUMP_INTERNAL:
		if (!(CONFIG(flags) & CTD_EXPECT))
			break;

		if (fork_process_new(CTD_PROC_ANY, 0, NULL, NULL) == 0) {
			STATE(mode)->internal->exp.dump(fd, NFCT_O_PLAIN);
			exit(EXIT_SUCCESS);
		}
		break;
	case EXP_DUMP_EXTERNAL:
		if (!(CONFIG(flags) & CTD_EXPECT))
			break;

		if (fork_process_new(CTD_PROC_ANY, 0, NULL, NULL) == 0) {
			STATE_SYNC(external)->exp.dump(fd, NFCT_O_PLAIN);
			exit(EXIT_SUCCESS);
		}
		break;
	case EXP_COMMIT:
		if (!(CONFIG(flags) & CTD_EXPECT))
			break;

		dlog(LOG_NOTICE, "committing expectation cache");
		STATE_SYNC(commit).rq[0].cb = STATE_SYNC(external)->exp.commit;
		STATE_SYNC(commit).rq[1].cb = NULL;
		ret = local_commit(fd);
		break;
	case ALL_FLUSH_CACHE:
		/* if we're still committing, abort this command */
		if (STATE_SYNC(commit).clientfd != -1) {
			dlog(LOG_ERR, "ignoring flush command, "
				      "commit still in progress");
			break;
		}
		dlog(LOG_NOTICE, "flushing caches");
		STATE(mode)->internal->ct.flush();
		STATE_SYNC(external)->ct.flush();
		if (CONFIG(flags) & CTD_EXPECT) {
			STATE(mode)->internal->exp.flush();
			STATE_SYNC(external)->exp.flush();
		}
		break;
	case ALL_COMMIT:
		dlog(LOG_NOTICE, "committing all external caches");
		STATE_SYNC(commit).rq[0].cb = STATE_SYNC(external)->ct.commit;
		if (CONFIG(flags) & CTD_EXPECT) {
			STATE_SYNC(commit).rq[1].cb =
				STATE_SYNC(external)->exp.commit;
		} else {
			STATE_SYNC(commit).rq[1].cb = NULL;
		}
		ret = local_commit(fd);
		break;
	case EXP_DUMP_INT_XML:
		if (fork_process_new(CTD_PROC_ANY, 0, NULL, NULL) == 0) {
			STATE(mode)->internal->exp.dump(fd, NFCT_O_XML);
			exit(EXIT_SUCCESS);
		}
		break;
	case EXP_DUMP_EXT_XML:
		if (fork_process_new(CTD_PROC_ANY, 0, NULL, NULL) == 0) {
			STATE_SYNC(external)->exp.dump(fd, NFCT_O_XML);
			exit(EXIT_SUCCESS);
		}
		break;
	default:
		if (STATE_SYNC(sync)->local)
			ret = STATE_SYNC(sync)->local(fd, type, data);
		break;
	}

	return ret;
}

struct ct_mode sync_mode = {
	.init 			= init_sync,
	.local			= local_handler_sync,
	.kill			= kill_sync,
	/* the internal handler is set in run-time. */
};