summaryrefslogtreecommitdiff
path: root/src/libcharon/plugins/ha/ha_segments.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/libcharon/plugins/ha/ha_segments.c')
-rw-r--r--src/libcharon/plugins/ha/ha_segments.c503
1 files changed, 503 insertions, 0 deletions
diff --git a/src/libcharon/plugins/ha/ha_segments.c b/src/libcharon/plugins/ha/ha_segments.c
new file mode 100644
index 000000000..2199671fc
--- /dev/null
+++ b/src/libcharon/plugins/ha/ha_segments.c
@@ -0,0 +1,503 @@
+/*
+ * Copyright (C) 2008 Martin Willi
+ * Hochschule fuer Technik Rapperswil
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * for more details.
+ */
+
+#include "ha_segments.h"
+
+#include <pthread.h>
+
+#include <threading/mutex.h>
+#include <threading/condvar.h>
+#include <utils/linked_list.h>
+#include <processing/jobs/callback_job.h>
+
+#define HEARTBEAT_DELAY 1000
+#define HEARTBEAT_TIMEOUT 2100
+
+typedef struct private_ha_segments_t private_ha_segments_t;
+
+/**
+ * Private data of an ha_segments_t object.
+ */
+struct private_ha_segments_t {
+
+ /**
+ * Public ha_segments_t interface.
+ */
+ ha_segments_t public;
+
+ /**
+ * communication socket
+ */
+ ha_socket_t *socket;
+
+ /**
+ * Sync tunnel, if any
+ */
+ ha_tunnel_t *tunnel;
+
+ /**
+ * Interface to control segments at kernel level
+ */
+ ha_kernel_t *kernel;
+
+ /**
+ * Mutex to lock segment manipulation
+ */
+ mutex_t *mutex;
+
+ /**
+ * Condvar to wait for heartbeats
+ */
+ condvar_t *condvar;
+
+ /**
+ * Job checking for heartbeats
+ */
+ callback_job_t *job;
+
+ /**
+ * Total number of ClusterIP segments
+ */
+ u_int count;
+
+ /**
+ * mask of active segments
+ */
+ segment_mask_t active;
+
+ /**
+ * Node number
+ */
+ u_int node;
+};
+
+/**
+ * Log currently active segments
+ */
+static void log_segments(private_ha_segments_t *this, bool activated,
+ u_int segment)
+{
+ char buf[64] = "none", *pos = buf;
+ int i;
+ bool first = TRUE;
+
+ for (i = 1; i <= this->count; i++)
+ {
+ if (this->active & SEGMENTS_BIT(i))
+ {
+ if (first)
+ {
+ first = FALSE;
+ }
+ else
+ {
+ pos += snprintf(pos, buf + sizeof(buf) - pos, ",");
+ }
+ pos += snprintf(pos, buf + sizeof(buf) - pos, "%d", i);
+ }
+ }
+ DBG1(DBG_CFG, "HA segment %d %sactivated, now active: %s",
+ segment, activated ? "" : "de", buf);
+}
+
+/**
+ * Enable/Disable a specific segment
+ */
+static void enable_disable(private_ha_segments_t *this, u_int segment,
+ bool enable, bool notify)
+{
+ ike_sa_t *ike_sa;
+ enumerator_t *enumerator;
+ ike_sa_state_t old, new;
+ ha_message_t *message = NULL;
+ ha_message_type_t type;
+ bool changes = FALSE;
+
+ if (segment > this->count)
+ {
+ return;
+ }
+
+ if (enable)
+ {
+ old = IKE_PASSIVE;
+ new = IKE_ESTABLISHED;
+ type = HA_SEGMENT_TAKE;
+ if (!(this->active & SEGMENTS_BIT(segment)))
+ {
+ this->active |= SEGMENTS_BIT(segment);
+ this->kernel->activate(this->kernel, segment);
+ changes = TRUE;
+ }
+ }
+ else
+ {
+ old = IKE_ESTABLISHED;
+ new = IKE_PASSIVE;
+ type = HA_SEGMENT_DROP;
+ if (this->active & SEGMENTS_BIT(segment))
+ {
+ this->active &= ~SEGMENTS_BIT(segment);
+ this->kernel->deactivate(this->kernel, segment);
+ changes = TRUE;
+ }
+ }
+
+ if (changes)
+ {
+ enumerator = charon->ike_sa_manager->create_enumerator(charon->ike_sa_manager);
+ while (enumerator->enumerate(enumerator, &ike_sa))
+ {
+ if (ike_sa->get_state(ike_sa) != old)
+ {
+ continue;
+ }
+ if (this->tunnel && this->tunnel->is_sa(this->tunnel, ike_sa))
+ {
+ continue;
+ }
+ if (this->kernel->in_segment(this->kernel,
+ ike_sa->get_other_host(ike_sa), segment))
+ {
+ ike_sa->set_state(ike_sa, new);
+ }
+ }
+ enumerator->destroy(enumerator);
+ log_segments(this, enable, segment);
+ }
+
+ if (notify)
+ {
+ message = ha_message_create(type);
+ message->add_attribute(message, HA_SEGMENT, segment);
+ this->socket->push(this->socket, message);
+ }
+}
+
+/**
+ * Enable/Disable all or a specific segment, do locking
+ */
+static void enable_disable_all(private_ha_segments_t *this, u_int segment,
+ bool enable, bool notify)
+{
+ int i;
+
+ this->mutex->lock(this->mutex);
+ if (segment == 0)
+ {
+ for (i = 1; i <= this->count; i++)
+ {
+ enable_disable(this, i, enable, notify);
+ }
+ }
+ else
+ {
+ enable_disable(this, segment, enable, notify);
+ }
+ this->mutex->unlock(this->mutex);
+}
+
+/**
+ * Implementation of ha_segments_t.activate
+ */
+static void activate(private_ha_segments_t *this, u_int segment, bool notify)
+{
+ enable_disable_all(this, segment, TRUE, notify);
+}
+
+/**
+ * Implementation of ha_segments_t.deactivate
+ */
+static void deactivate(private_ha_segments_t *this, u_int segment, bool notify)
+{
+ enable_disable_all(this, segment, FALSE, notify);
+}
+
+/**
+ * Rekey all children of an IKE_SA
+ */
+static status_t rekey_children(ike_sa_t *ike_sa)
+{
+ iterator_t *iterator;
+ child_sa_t *child_sa;
+ status_t status = SUCCESS;
+
+ iterator = ike_sa->create_child_sa_iterator(ike_sa);
+ while (iterator->iterate(iterator, (void**)&child_sa))
+ {
+ DBG1(DBG_CFG, "resyncing CHILD_SA");
+ status = ike_sa->rekey_child_sa(ike_sa, child_sa->get_protocol(child_sa),
+ child_sa->get_spi(child_sa, TRUE));
+ if (status == DESTROY_ME)
+ {
+ break;
+ }
+ }
+ iterator->destroy(iterator);
+ return status;
+}
+
+/**
+ * Implementation of ha_segments_t.resync
+ */
+static void resync(private_ha_segments_t *this, u_int segment)
+{
+ ike_sa_t *ike_sa;
+ enumerator_t *enumerator;
+ linked_list_t *list;
+ ike_sa_id_t *id;
+
+ list = linked_list_create();
+ this->mutex->lock(this->mutex);
+
+ if (segment > 0 && segment <= this->count)
+ {
+ DBG1(DBG_CFG, "resyncing HA segment %d", segment);
+
+ /* we do the actual rekeying in a seperate loop to avoid rekeying
+ * an SA twice. */
+ enumerator = charon->ike_sa_manager->create_enumerator(
+ charon->ike_sa_manager);
+ while (enumerator->enumerate(enumerator, &ike_sa))
+ {
+ if (ike_sa->get_state(ike_sa) == IKE_ESTABLISHED &&
+ this->kernel->in_segment(this->kernel,
+ ike_sa->get_other_host(ike_sa), segment))
+ {
+ id = ike_sa->get_id(ike_sa);
+ list->insert_last(list, id->clone(id));
+ }
+ }
+ enumerator->destroy(enumerator);
+ }
+ this->mutex->unlock(this->mutex);
+
+ while (list->remove_last(list, (void**)&id) == SUCCESS)
+ {
+ ike_sa = charon->ike_sa_manager->checkout(charon->ike_sa_manager, id);
+ id->destroy(id);
+ if (ike_sa)
+ {
+ DBG1(DBG_CFG, "resyncing IKE_SA");
+ if (ike_sa->rekey(ike_sa) != DESTROY_ME)
+ {
+ if (rekey_children(ike_sa) != DESTROY_ME)
+ {
+ charon->ike_sa_manager->checkin(
+ charon->ike_sa_manager, ike_sa);
+ continue;
+ }
+ }
+ charon->ike_sa_manager->checkin_and_destroy(
+ charon->ike_sa_manager, ike_sa);
+ }
+ }
+ list->destroy(list);
+}
+
+/**
+ * Implementation of listener_t.alert
+ */
+static bool alert_hook(private_ha_segments_t *this, ike_sa_t *ike_sa,
+ alert_t alert, va_list args)
+{
+ if (alert == ALERT_SHUTDOWN_SIGNAL)
+ {
+ deactivate(this, 0, TRUE);
+ }
+ return TRUE;
+}
+
+/**
+ * Request a resync of all segments
+ */
+static job_requeue_t request_resync(private_ha_segments_t *this)
+{
+ ha_message_t *message;
+ int i;
+
+ message = ha_message_create(HA_RESYNC);
+ for (i = 1; i <= this->count; i++)
+ {
+ message->add_attribute(message, HA_SEGMENT, i);
+ }
+ this->socket->push(this->socket, message);
+ return JOB_REQUEUE_NONE;
+}
+
+/**
+ * Monitor heartbeat activity of remote node
+ */
+static job_requeue_t watchdog(private_ha_segments_t *this)
+{
+ int oldstate;
+ bool timeout;
+
+ this->mutex->lock(this->mutex);
+ pthread_cleanup_push((void*)this->mutex->unlock, this->mutex);
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
+ timeout = this->condvar->timed_wait(this->condvar, this->mutex,
+ HEARTBEAT_TIMEOUT);
+ pthread_setcancelstate(oldstate, NULL);
+ pthread_cleanup_pop(TRUE);
+ if (timeout)
+ {
+ DBG1(DBG_CFG, "no heartbeat received, taking all segments");
+ activate(this, 0, TRUE);
+ /* disable heartbeat detection util we get one */
+ this->job = NULL;
+ return JOB_REQUEUE_NONE;
+ }
+ return JOB_REQUEUE_DIRECT;
+}
+
+/**
+ * Start the heartbeat detection thread
+ */
+static void start_watchdog(private_ha_segments_t *this)
+{
+ this->job = callback_job_create((callback_job_cb_t)watchdog,
+ this, NULL, NULL);
+ charon->processor->queue_job(charon->processor, (job_t*)this->job);
+}
+
+/**
+ * Implementation of ha_segments_t.handle_status
+ */
+static void handle_status(private_ha_segments_t *this, segment_mask_t mask)
+{
+ segment_mask_t missing;
+ int i;
+
+ this->mutex->lock(this->mutex);
+
+ missing = ~(this->active | mask);
+
+ for (i = 1; i <= this->count; i++)
+ {
+ if (missing & SEGMENTS_BIT(i))
+ {
+ if (this->node == i % 2)
+ {
+ DBG1(DBG_CFG, "HA segment %d was not handled, taking", i);
+ enable_disable(this, i, TRUE, TRUE);
+ }
+ else
+ {
+ DBG1(DBG_CFG, "HA segment %d was not handled, dropping", i);
+ enable_disable(this, i, FALSE, TRUE);
+ }
+ }
+ }
+
+ this->mutex->unlock(this->mutex);
+ this->condvar->signal(this->condvar);
+
+ if (!this->job)
+ {
+ DBG1(DBG_CFG, "received heartbeat, reenabling watchdog");
+ start_watchdog(this);
+ }
+}
+
+/**
+ * Send a status message with our active segments
+ */
+static job_requeue_t send_status(private_ha_segments_t *this)
+{
+ ha_message_t *message;
+ int i;
+
+ message = ha_message_create(HA_STATUS);
+
+ for (i = 1; i <= this->count; i++)
+ {
+ if (this->active & SEGMENTS_BIT(i))
+ {
+ message->add_attribute(message, HA_SEGMENT, i);
+ }
+ }
+
+ this->socket->push(this->socket, message);
+
+ /* schedule next invocation */
+ charon->scheduler->schedule_job_ms(charon->scheduler, (job_t*)
+ callback_job_create((callback_job_cb_t)
+ send_status, this, NULL, NULL),
+ HEARTBEAT_DELAY);
+
+ return JOB_REQUEUE_NONE;
+}
+
+/**
+ * Implementation of ha_segments_t.destroy.
+ */
+static void destroy(private_ha_segments_t *this)
+{
+ if (this->job)
+ {
+ this->job->cancel(this->job);
+ }
+ this->mutex->destroy(this->mutex);
+ this->condvar->destroy(this->condvar);
+ free(this);
+}
+
+/**
+ * See header
+ */
+ha_segments_t *ha_segments_create(ha_socket_t *socket, ha_kernel_t *kernel,
+ ha_tunnel_t *tunnel, u_int count, u_int node,
+ bool monitor, bool sync)
+{
+ private_ha_segments_t *this = malloc_thing(private_ha_segments_t);
+
+ memset(&this->public.listener, 0, sizeof(listener_t));
+ this->public.listener.alert = (bool(*)(listener_t*, ike_sa_t *, alert_t, va_list))alert_hook;
+ this->public.activate = (void(*)(ha_segments_t*, u_int segment,bool))activate;
+ this->public.deactivate = (void(*)(ha_segments_t*, u_int segment,bool))deactivate;
+ this->public.resync = (void(*)(ha_segments_t*, u_int segment))resync;
+ this->public.handle_status = (void(*)(ha_segments_t*, segment_mask_t mask))handle_status;
+ this->public.destroy = (void(*)(ha_segments_t*))destroy;
+
+ this->socket = socket;
+ this->tunnel = tunnel;
+ this->kernel = kernel;
+ this->mutex = mutex_create(MUTEX_TYPE_DEFAULT);
+ this->condvar = condvar_create(CONDVAR_TYPE_DEFAULT);
+ this->count = count;
+ this->node = node;
+ this->job = NULL;
+
+ /* initially all segments are deactivated */
+ this->active = 0;
+
+ if (monitor)
+ {
+ send_status(this);
+ start_watchdog(this);
+ }
+
+ if (sync)
+ {
+ /* request a resync as soon as we are up */
+ charon->processor->queue_job(charon->processor, (job_t*)
+ callback_job_create((callback_job_cb_t)request_resync,
+ this, NULL, NULL));
+ }
+
+ return &this->public;
+}
+