diff options
Diffstat (limited to 'src/libcharon/plugins/ha/ha_segments.c')
-rw-r--r-- | src/libcharon/plugins/ha/ha_segments.c | 132 |
1 files changed, 105 insertions, 27 deletions
diff --git a/src/libcharon/plugins/ha/ha_segments.c b/src/libcharon/plugins/ha/ha_segments.c index c5a180683..cab38c63d 100644 --- a/src/libcharon/plugins/ha/ha_segments.c +++ b/src/libcharon/plugins/ha/ha_segments.c @@ -17,7 +17,7 @@ #include <threading/mutex.h> #include <threading/condvar.h> -#include <utils/linked_list.h> +#include <collections/linked_list.h> #include <threading/thread.h> #include <processing/jobs/callback_job.h> @@ -62,11 +62,6 @@ struct private_ha_segments_t { condvar_t *condvar; /** - * Job checking for heartbeats - */ - callback_job_t *job; - - /** * Total number of ClusterIP segments */ u_int count; @@ -82,6 +77,11 @@ struct private_ha_segments_t { u_int node; /** + * Are we checking for heartbeats? + */ + bool heartbeat_active; + + /** * Interval we send hearbeats */ int heartbeat_delay; @@ -90,6 +90,11 @@ struct private_ha_segments_t { * Timeout for heartbeats received from other node */ int heartbeat_timeout; + + /** + * Interval to check for autobalance, 0 to disable + */ + int autobalance; }; /** @@ -237,7 +242,7 @@ METHOD(listener_t, alert_hook, bool, { if (alert == ALERT_SHUTDOWN_SIGNAL) { - if (this->job) + if (this->heartbeat_active) { DBG1(DBG_CFG, "HA heartbeat active, dropping all segments"); deactivate(this, 0, TRUE); @@ -269,7 +274,7 @@ static job_requeue_t watchdog(private_ha_segments_t *this) DBG1(DBG_CFG, "no heartbeat received, taking all segments"); activate(this, 0, TRUE); /* disable heartbeat detection util we get one */ - this->job = NULL; + this->heartbeat_active = FALSE; return JOB_REQUEUE_NONE; } return JOB_REQUEUE_DIRECT; @@ -280,20 +285,22 @@ static job_requeue_t watchdog(private_ha_segments_t *this) */ static void start_watchdog(private_ha_segments_t *this) { - this->job = callback_job_create_with_prio((callback_job_cb_t)watchdog, - this, NULL, NULL, JOB_PRIO_CRITICAL); - lib->processor->queue_job(lib->processor, (job_t*)this->job); + this->heartbeat_active = TRUE; + lib->processor->queue_job(lib->processor, + (job_t*)callback_job_create_with_prio((callback_job_cb_t)watchdog, this, + NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL)); } METHOD(ha_segments_t, handle_status, void, private_ha_segments_t *this, segment_mask_t mask) { - segment_mask_t missing; + segment_mask_t missing, twice; int i; this->mutex->lock(this->mutex); missing = ~(this->active | mask); + twice = this->active & mask; for (i = 1; i <= this->count; i++) { @@ -310,12 +317,25 @@ METHOD(ha_segments_t, handle_status, void, enable_disable(this, i, FALSE, TRUE); } } + if (twice & SEGMENTS_BIT(i)) + { + if (this->node == i % 2) + { + DBG1(DBG_CFG, "HA segment %d was handled twice, taking", i); + enable_disable(this, i, TRUE, TRUE); + } + else + { + DBG1(DBG_CFG, "HA segment %d was handled twice, dropping", i); + enable_disable(this, i, FALSE, TRUE); + } + } } - this->mutex->unlock(this->mutex); this->condvar->signal(this->condvar); + this->mutex->unlock(this->mutex); - if (!this->job) + if (!this->heartbeat_active) { DBG1(DBG_CFG, "received heartbeat, reenabling watchdog"); start_watchdog(this); @@ -332,6 +352,7 @@ static job_requeue_t send_status(private_ha_segments_t *this) message = ha_message_create(HA_STATUS); + this->mutex->lock(this->mutex); for (i = 1; i <= this->count; i++) { if (this->active & SEGMENTS_BIT(i)) @@ -339,17 +360,71 @@ static job_requeue_t send_status(private_ha_segments_t *this) message->add_attribute(message, HA_SEGMENT, i); } } + this->mutex->unlock(this->mutex); this->socket->push(this->socket, message); message->destroy(message); /* schedule next invocation */ - lib->scheduler->schedule_job_ms(lib->scheduler, (job_t*) - callback_job_create((callback_job_cb_t) - send_status, this, NULL, NULL), - this->heartbeat_delay); + return JOB_RESCHEDULE_MS(this->heartbeat_delay); +} + +/** + * Start the heartbeat sending task + */ +static void start_heartbeat(private_ha_segments_t *this) +{ + lib->processor->queue_job(lib->processor, + (job_t*)callback_job_create_with_prio((callback_job_cb_t)send_status, + this, NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL)); +} + +/** + * Take a segment if we are handling less than half of segments + */ +static job_requeue_t autobalance(private_ha_segments_t *this) +{ + int i, active = 0; + + this->mutex->lock(this->mutex); - return JOB_REQUEUE_NONE; + for (i = 1; i <= this->count; i++) + { + if (this->active & SEGMENTS_BIT(i)) + { + active++; + } + } + if (active < this->count / 2) + { + for (i = 1; i <= this->count; i++) + { + if (!(this->active & SEGMENTS_BIT(i))) + { + DBG1(DBG_CFG, "autobalancing HA (%d/%d active), taking %d", + active, this->count, i); + enable_disable(this, i, TRUE, TRUE); + /* we claim only one in each interval */ + break; + } + } + } + + this->mutex->unlock(this->mutex); + + return JOB_RESCHEDULE(this->autobalance); +} + +/** + * Schedule autobalancing + */ +static void start_autobalance(private_ha_segments_t *this) +{ + DBG1(DBG_CFG, "scheduling HA autobalance every %ds", this->autobalance); + lib->scheduler->schedule_job(lib->scheduler, + (job_t*)callback_job_create_with_prio((callback_job_cb_t)autobalance, + this, NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL), + this->autobalance); } METHOD(ha_segments_t, is_active, bool, @@ -361,10 +436,6 @@ METHOD(ha_segments_t, is_active, bool, METHOD(ha_segments_t, destroy, void, private_ha_segments_t *this) { - if (this->job) - { - this->job->cancel(this->job); - } this->mutex->destroy(this->mutex); this->condvar->destroy(this->condvar); free(this); @@ -398,19 +469,26 @@ ha_segments_t *ha_segments_create(ha_socket_t *socket, ha_kernel_t *kernel, .mutex = mutex_create(MUTEX_TYPE_DEFAULT), .condvar = condvar_create(CONDVAR_TYPE_DEFAULT), .heartbeat_delay = lib->settings->get_int(lib->settings, - "charon.plugins.ha.heartbeat_delay", DEFAULT_HEARTBEAT_DELAY), + "%s.plugins.ha.heartbeat_delay", DEFAULT_HEARTBEAT_DELAY, + charon->name), .heartbeat_timeout = lib->settings->get_int(lib->settings, - "charon.plugins.ha.heartbeat_timeout", DEFAULT_HEARTBEAT_TIMEOUT), + "%s.plugins.ha.heartbeat_timeout", DEFAULT_HEARTBEAT_TIMEOUT, + charon->name), + .autobalance = lib->settings->get_int(lib->settings, + "%s.plugins.ha.autobalance", 0, charon->name), ); if (monitor) { DBG1(DBG_CFG, "starting HA heartbeat, delay %dms, timeout %dms", this->heartbeat_delay, this->heartbeat_timeout); - send_status(this); + start_heartbeat(this); start_watchdog(this); } + if (this->autobalance) + { + start_autobalance(this); + } return &this->public; } - |