summaryrefslogtreecommitdiff
path: root/src/libcharon/plugins/ha/ha_segments.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/libcharon/plugins/ha/ha_segments.c')
-rw-r--r--src/libcharon/plugins/ha/ha_segments.c90
1 files changed, 87 insertions, 3 deletions
diff --git a/src/libcharon/plugins/ha/ha_segments.c b/src/libcharon/plugins/ha/ha_segments.c
index 688e09bdc..cab38c63d 100644
--- a/src/libcharon/plugins/ha/ha_segments.c
+++ b/src/libcharon/plugins/ha/ha_segments.c
@@ -90,6 +90,11 @@ struct private_ha_segments_t {
* Timeout for heartbeats received from other node
*/
int heartbeat_timeout;
+
+ /**
+ * Interval to check for autobalance, 0 to disable
+ */
+ int autobalance;
};
/**
@@ -289,12 +294,13 @@ static void start_watchdog(private_ha_segments_t *this)
METHOD(ha_segments_t, handle_status, void,
private_ha_segments_t *this, segment_mask_t mask)
{
- segment_mask_t missing;
+ segment_mask_t missing, twice;
int i;
this->mutex->lock(this->mutex);
missing = ~(this->active | mask);
+ twice = this->active & mask;
for (i = 1; i <= this->count; i++)
{
@@ -311,6 +317,19 @@ METHOD(ha_segments_t, handle_status, void,
enable_disable(this, i, FALSE, TRUE);
}
}
+ if (twice & SEGMENTS_BIT(i))
+ {
+ if (this->node == i % 2)
+ {
+ DBG1(DBG_CFG, "HA segment %d was handled twice, taking", i);
+ enable_disable(this, i, TRUE, TRUE);
+ }
+ else
+ {
+ DBG1(DBG_CFG, "HA segment %d was handled twice, dropping", i);
+ enable_disable(this, i, FALSE, TRUE);
+ }
+ }
}
this->condvar->signal(this->condvar);
@@ -333,6 +352,7 @@ static job_requeue_t send_status(private_ha_segments_t *this)
message = ha_message_create(HA_STATUS);
+ this->mutex->lock(this->mutex);
for (i = 1; i <= this->count; i++)
{
if (this->active & SEGMENTS_BIT(i))
@@ -340,6 +360,7 @@ static job_requeue_t send_status(private_ha_segments_t *this)
message->add_attribute(message, HA_SEGMENT, i);
}
}
+ this->mutex->unlock(this->mutex);
this->socket->push(this->socket, message);
message->destroy(message);
@@ -348,6 +369,64 @@ static job_requeue_t send_status(private_ha_segments_t *this)
return JOB_RESCHEDULE_MS(this->heartbeat_delay);
}
+/**
+ * Start the heartbeat sending task
+ */
+static void start_heartbeat(private_ha_segments_t *this)
+{
+ lib->processor->queue_job(lib->processor,
+ (job_t*)callback_job_create_with_prio((callback_job_cb_t)send_status,
+ this, NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
+}
+
+/**
+ * Take a segment if we are handling less than half of segments
+ */
+static job_requeue_t autobalance(private_ha_segments_t *this)
+{
+ int i, active = 0;
+
+ this->mutex->lock(this->mutex);
+
+ for (i = 1; i <= this->count; i++)
+ {
+ if (this->active & SEGMENTS_BIT(i))
+ {
+ active++;
+ }
+ }
+ if (active < this->count / 2)
+ {
+ for (i = 1; i <= this->count; i++)
+ {
+ if (!(this->active & SEGMENTS_BIT(i)))
+ {
+ DBG1(DBG_CFG, "autobalancing HA (%d/%d active), taking %d",
+ active, this->count, i);
+ enable_disable(this, i, TRUE, TRUE);
+ /* we claim only one in each interval */
+ break;
+ }
+ }
+ }
+
+ this->mutex->unlock(this->mutex);
+
+ return JOB_RESCHEDULE(this->autobalance);
+}
+
+/**
+ * Schedule autobalancing
+ */
+static void start_autobalance(private_ha_segments_t *this)
+{
+ DBG1(DBG_CFG, "scheduling HA autobalance every %ds", this->autobalance);
+ lib->scheduler->schedule_job(lib->scheduler,
+ (job_t*)callback_job_create_with_prio((callback_job_cb_t)autobalance,
+ this, NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL),
+ this->autobalance);
+}
+
METHOD(ha_segments_t, is_active, bool,
private_ha_segments_t *this, u_int segment)
{
@@ -395,16 +474,21 @@ ha_segments_t *ha_segments_create(ha_socket_t *socket, ha_kernel_t *kernel,
.heartbeat_timeout = lib->settings->get_int(lib->settings,
"%s.plugins.ha.heartbeat_timeout", DEFAULT_HEARTBEAT_TIMEOUT,
charon->name),
+ .autobalance = lib->settings->get_int(lib->settings,
+ "%s.plugins.ha.autobalance", 0, charon->name),
);
if (monitor)
{
DBG1(DBG_CFG, "starting HA heartbeat, delay %dms, timeout %dms",
this->heartbeat_delay, this->heartbeat_timeout);
- send_status(this);
+ start_heartbeat(this);
start_watchdog(this);
}
+ if (this->autobalance)
+ {
+ start_autobalance(this);
+ }
return &this->public;
}
-