summaryrefslogtreecommitdiff
path: root/src/charon/processing
diff options
context:
space:
mode:
Diffstat (limited to 'src/charon/processing')
-rw-r--r--src/charon/processing/jobs/acquire_job.c12
-rw-r--r--src/charon/processing/jobs/acquire_job.h4
-rw-r--r--src/charon/processing/jobs/callback_job.c178
-rw-r--r--src/charon/processing/jobs/callback_job.h13
-rw-r--r--src/charon/processing/jobs/delete_child_sa_job.c20
-rw-r--r--src/charon/processing/jobs/delete_child_sa_job.h6
-rw-r--r--src/charon/processing/jobs/delete_ike_sa_job.c14
-rw-r--r--src/charon/processing/jobs/delete_ike_sa_job.h8
-rw-r--r--src/charon/processing/jobs/inactivity_job.c150
-rw-r--r--src/charon/processing/jobs/inactivity_job.h53
-rw-r--r--src/charon/processing/jobs/initiate_mediation_job.c78
-rw-r--r--src/charon/processing/jobs/initiate_mediation_job.h11
-rw-r--r--src/charon/processing/jobs/job.h2
-rw-r--r--src/charon/processing/jobs/mediation_job.c34
-rw-r--r--src/charon/processing/jobs/mediation_job.h10
-rw-r--r--src/charon/processing/jobs/migrate_job.c12
-rw-r--r--src/charon/processing/jobs/migrate_job.h18
-rw-r--r--src/charon/processing/jobs/process_message_job.c12
-rw-r--r--src/charon/processing/jobs/process_message_job.h2
-rw-r--r--src/charon/processing/jobs/rekey_child_sa_job.c20
-rw-r--r--src/charon/processing/jobs/rekey_child_sa_job.h2
-rw-r--r--src/charon/processing/jobs/rekey_ike_sa_job.c16
-rw-r--r--src/charon/processing/jobs/rekey_ike_sa_job.h2
-rw-r--r--src/charon/processing/jobs/retransmit_job.c10
-rw-r--r--src/charon/processing/jobs/retransmit_job.h2
-rw-r--r--src/charon/processing/jobs/roam_job.c16
-rw-r--r--src/charon/processing/jobs/roam_job.h4
-rw-r--r--src/charon/processing/jobs/send_dpd_job.c12
-rw-r--r--src/charon/processing/jobs/send_dpd_job.h4
-rw-r--r--src/charon/processing/jobs/send_keepalive_job.c10
-rw-r--r--src/charon/processing/jobs/send_keepalive_job.h4
-rw-r--r--src/charon/processing/jobs/update_sa_job.c14
-rw-r--r--src/charon/processing/jobs/update_sa_job.h2
-rw-r--r--src/charon/processing/processor.c87
-rw-r--r--src/charon/processing/processor.h16
-rw-r--r--src/charon/processing/scheduler.c86
-rw-r--r--src/charon/processing/scheduler.h15
37 files changed, 627 insertions, 332 deletions
diff --git a/src/charon/processing/jobs/acquire_job.c b/src/charon/processing/jobs/acquire_job.c
index 90b221b84..45ace9312 100644
--- a/src/charon/processing/jobs/acquire_job.c
+++ b/src/charon/processing/jobs/acquire_job.c
@@ -28,17 +28,17 @@ struct private_acquire_job_t {
* Public acquire_job_t interface.
*/
acquire_job_t public;
-
+
/**
* reqid of the child to rekey
*/
u_int32_t reqid;
-
+
/**
* acquired source traffic selector
*/
traffic_selector_t *src_ts;
-
+
/**
* acquired destination traffic selector
*/
@@ -73,14 +73,14 @@ acquire_job_t *acquire_job_create(u_int32_t reqid,
traffic_selector_t *dst_ts)
{
private_acquire_job_t *this = malloc_thing(private_acquire_job_t);
-
+
this->public.job_interface.execute = (void (*) (job_t *)) execute;
this->public.job_interface.destroy = (void (*)(job_t*)) destroy;
-
+
this->reqid = reqid;
this->src_ts = src_ts;
this->dst_ts = dst_ts;
-
+
return &this->public;
}
diff --git a/src/charon/processing/jobs/acquire_job.h b/src/charon/processing/jobs/acquire_job.h
index a78e5274d..eff79a9b0 100644
--- a/src/charon/processing/jobs/acquire_job.h
+++ b/src/charon/processing/jobs/acquire_job.h
@@ -24,12 +24,12 @@
typedef struct acquire_job_t acquire_job_t;
#include <library.h>
-#include <config/traffic_selector.h>
+#include <selectors/traffic_selector.h>
#include <processing/jobs/job.h>
/**
* Class representing an ACQUIRE Job.
- *
+ *
* This job initiates a CHILD SA on kernel request.
*/
struct acquire_job_t {
diff --git a/src/charon/processing/jobs/callback_job.c b/src/charon/processing/jobs/callback_job.c
index f4beb5abd..7e35dcdcb 100644
--- a/src/charon/processing/jobs/callback_job.c
+++ b/src/charon/processing/jobs/callback_job.c
@@ -1,4 +1,5 @@
/*
+ * Copyright (C) 2009 Tobias Brunner
* Copyright (C) 2007 Martin Willi
* Hochschule fuer Technik Rapperswil
*
@@ -12,13 +13,15 @@
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*/
-
+
#include "callback_job.h"
-#include <pthread.h>
+#include <semaphore.h>
#include <daemon.h>
-#include <utils/mutex.h>
+#include <threading/thread.h>
+#include <threading/condvar.h>
+#include <threading/mutex.h>
typedef struct private_callback_job_t private_callback_job_t;
@@ -30,7 +33,7 @@ struct private_callback_job_t {
* Public callback_job_t interface.
*/
callback_job_t public;
-
+
/**
* Callback to call on execution
*/
@@ -40,70 +43,94 @@ struct private_callback_job_t {
* parameter to supply to callback
*/
void *data;
-
+
/**
* cleanup function for data
*/
callback_job_cleanup_t cleanup;
-
+
/**
- * thread ID of the job, if running
+ * thread of the job, if running
*/
- pthread_t thread;
-
+ thread_t *thread;
+
/**
* mutex to access jobs interna
*/
mutex_t *mutex;
-
+
/**
* list of asociated child jobs
*/
linked_list_t *children;
-
+
/**
* parent of this job, or NULL
*/
private_callback_job_t *parent;
-};
-/**
- * Implements job_t.destroy.
- */
-static void destroy(private_callback_job_t *this)
-{
- if (this->cleanup)
- {
- this->cleanup(this->data);
- }
- this->children->destroy(this->children);
- this->mutex->destroy(this->mutex);
- free(this);
-}
+ /**
+ * TRUE if the job got cancelled
+ */
+ bool cancelled;
+
+ /**
+ * condvar to synchronize the cancellation/destruction of the job
+ */
+ condvar_t *destroyable;
+
+ /**
+ * semaphore to synchronize the termination of the assigned thread.
+ *
+ * separately allocated during cancellation, so that we can wait on it
+ * without risking that it gets freed too early during destruction.
+ */
+ sem_t *terminated;
+};
/**
* unregister a child from its parent, if any.
+ * note: this->mutex has to be locked
*/
static void unregister(private_callback_job_t *this)
{
if (this->parent)
{
- iterator_t *iterator;
- private_callback_job_t *child;
-
this->parent->mutex->lock(this->parent->mutex);
- iterator = this->parent->children->create_iterator(this->parent->children, TRUE);
- while (iterator->iterate(iterator, (void**)&child))
+ if (this->parent->cancelled && !this->cancelled)
{
- if (child == this)
- {
- iterator->remove(iterator);
- break;
- }
+ /* if the parent has been cancelled but we have not yet, we do not
+ * unregister until we got cancelled by the parent. */
+ this->parent->mutex->unlock(this->parent->mutex);
+ this->destroyable->wait(this->destroyable, this->mutex);
+ this->parent->mutex->lock(this->parent->mutex);
}
- iterator->destroy(iterator);
+ this->parent->children->remove(this->parent->children, this, NULL);
this->parent->mutex->unlock(this->parent->mutex);
+ this->parent = NULL;
+ }
+}
+
+/**
+ * Implements job_t.destroy.
+ */
+static void destroy(private_callback_job_t *this)
+{
+ this->mutex->lock(this->mutex);
+ unregister(this);
+ if (this->cleanup)
+ {
+ this->cleanup(this->data);
}
+ if (this->terminated)
+ {
+ sem_post(this->terminated);
+ }
+ this->children->destroy(this->children);
+ this->destroyable->destroy(this->destroyable);
+ this->mutex->unlock(this->mutex);
+ this->mutex->destroy(this->mutex);
+ free(this);
}
/**
@@ -111,20 +138,42 @@ static void unregister(private_callback_job_t *this)
*/
static void cancel(private_callback_job_t *this)
{
- pthread_t thread;
-
+ callback_job_t *child;
+ sem_t *terminated = NULL;
+
this->mutex->lock(this->mutex);
- thread = this->thread;
-
- /* terminate its children */
- this->children->invoke_offset(this->children, offsetof(callback_job_t, cancel));
+ this->cancelled = TRUE;
+ /* terminate children */
+ while (this->children->get_first(this->children, (void**)&child) == SUCCESS)
+ {
+ this->mutex->unlock(this->mutex);
+ child->cancel(child);
+ this->mutex->lock(this->mutex);
+ }
+ if (this->thread)
+ {
+ /* terminate the thread, if there is currently one executing the job.
+ * we wait for its termination using a semaphore */
+ this->thread->cancel(this->thread);
+ terminated = this->terminated = malloc_thing(sem_t);
+ sem_init(terminated, 0, 0);
+ }
+ else
+ {
+ /* if the job is currently queued, it gets terminated later.
+ * we can't wait, because it might not get executed at all.
+ * we also unregister the queued job manually from its parent (the
+ * others get unregistered during destruction) */
+ unregister(this);
+ }
+ this->destroyable->signal(this->destroyable);
this->mutex->unlock(this->mutex);
-
- /* terminate thread */
- if (thread)
+
+ if (terminated)
{
- pthread_cancel(thread);
- pthread_join(thread, NULL);
+ sem_wait(terminated);
+ sem_destroy(terminated);
+ free(terminated);
}
}
@@ -135,20 +184,28 @@ static void execute(private_callback_job_t *this)
{
bool cleanup = FALSE;
+ thread_cleanup_push((thread_cleanup_t)destroy, this);
+
this->mutex->lock(this->mutex);
- this->thread = pthread_self();
+ this->thread = thread_current();
this->mutex->unlock(this->mutex);
-
- pthread_cleanup_push((void*)destroy, this);
+
while (TRUE)
{
+ this->mutex->lock(this->mutex);
+ if (this->cancelled)
+ {
+ this->mutex->unlock(this->mutex);
+ cleanup = TRUE;
+ break;
+ }
+ this->mutex->unlock(this->mutex);
switch (this->callback(this->data))
{
case JOB_REQUEUE_DIRECT:
continue;
case JOB_REQUEUE_FAIR:
{
- this->thread = 0;
charon->processor->queue_job(charon->processor,
&this->public.job_interface);
break;
@@ -156,15 +213,19 @@ static void execute(private_callback_job_t *this)
case JOB_REQUEUE_NONE:
default:
{
- this->thread = 0;
cleanup = TRUE;
break;
}
}
break;
}
- unregister(this);
- pthread_cleanup_pop(cleanup);
+ this->mutex->lock(this->mutex);
+ this->thread = NULL;
+ this->mutex->unlock(this->mutex);
+ /* manually create a cancellation point to avoid that a cancelled thread
+ * goes back into the thread pool */
+ thread_cancellation_point();
+ thread_cleanup_pop(cleanup);
}
/*
@@ -175,7 +236,7 @@ callback_job_t *callback_job_create(callback_job_cb_t cb, void *data,
callback_job_t *parent)
{
private_callback_job_t *this = malloc_thing(private_callback_job_t);
-
+
/* interface functions */
this->public.job_interface.execute = (void (*) (job_t *)) execute;
this->public.job_interface.destroy = (void (*) (job_t *)) destroy;
@@ -189,7 +250,10 @@ callback_job_t *callback_job_create(callback_job_cb_t cb, void *data,
this->thread = 0;
this->children = linked_list_create();
this->parent = (private_callback_job_t*)parent;
-
+ this->cancelled = FALSE;
+ this->destroyable = condvar_create(CONDVAR_TYPE_DEFAULT);
+ this->terminated = NULL;
+
/* register us at parent */
if (parent)
{
@@ -197,7 +261,7 @@ callback_job_t *callback_job_create(callback_job_cb_t cb, void *data,
this->parent->children->insert_last(this->parent->children, this);
this->parent->mutex->unlock(this->parent->mutex);
}
-
+
return &this->public;
}
diff --git a/src/charon/processing/jobs/callback_job.h b/src/charon/processing/jobs/callback_job.h
index 2bb209cb7..62da1edd1 100644
--- a/src/charon/processing/jobs/callback_job.h
+++ b/src/charon/processing/jobs/callback_job.h
@@ -41,12 +41,12 @@ enum job_requeue_t {
* Do not requeue job, destroy it
*/
JOB_REQUEUE_NONE,
-
+
/**
* Reque the job fairly, meaning it has to requeue as any other job
*/
JOB_REQUEUE_FAIR,
-
+
/**
* Reexecute the job directly, without the need of requeueing it
*/
@@ -88,10 +88,11 @@ struct callback_job_t {
* The job_t interface.
*/
job_t job_interface;
-
+
/**
- * Cancel the jobs thread and wait for its termination.
- */
+ * Cancel the job's thread and wait for its termination. This only works
+ * reliably for jobs that always use JOB_REQUEUE_FAIR or JOB_REQUEUE_DIRECT,
+ * otherwise the job may already be destroyed when cancel is called. */
void (*cancel)(callback_job_t *this);
};
@@ -103,7 +104,7 @@ struct callback_job_t {
* If parent is not NULL, the specified job gets an association. Whenever
* the parent gets cancelled (or runs out), all of its children are cancelled,
* too.
- *
+ *
* @param cb callback to call from the processor
* @param data user data to supply to callback
* @param cleanup destructor for data on destruction, or NULL
diff --git a/src/charon/processing/jobs/delete_child_sa_job.c b/src/charon/processing/jobs/delete_child_sa_job.c
index 206f07617..ca55721f2 100644
--- a/src/charon/processing/jobs/delete_child_sa_job.c
+++ b/src/charon/processing/jobs/delete_child_sa_job.c
@@ -29,17 +29,17 @@ struct private_delete_child_sa_job_t {
* Public delete_child_sa_job_t interface.
*/
delete_child_sa_job_t public;
-
+
/**
* reqid of the CHILD_SA
*/
u_int32_t reqid;
-
+
/**
* protocol of the CHILD_SA (ESP/AH)
*/
protocol_id_t protocol;
-
+
/**
* inbound SPI of the CHILD_SA
*/
@@ -60,7 +60,7 @@ static void destroy(private_delete_child_sa_job_t *this)
static void execute(private_delete_child_sa_job_t *this)
{
ike_sa_t *ike_sa;
-
+
ike_sa = charon->ike_sa_manager->checkout_by_id(charon->ike_sa_manager,
this->reqid, TRUE);
if (ike_sa == NULL)
@@ -71,7 +71,7 @@ static void execute(private_delete_child_sa_job_t *this)
else
{
ike_sa->delete_child_sa(ike_sa, this->protocol, this->spi);
-
+
charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
}
destroy(this);
@@ -80,21 +80,21 @@ static void execute(private_delete_child_sa_job_t *this)
/*
* Described in header
*/
-delete_child_sa_job_t *delete_child_sa_job_create(u_int32_t reqid,
- protocol_id_t protocol,
+delete_child_sa_job_t *delete_child_sa_job_create(u_int32_t reqid,
+ protocol_id_t protocol,
u_int32_t spi)
{
private_delete_child_sa_job_t *this = malloc_thing(private_delete_child_sa_job_t);
-
+
/* interface functions */
this->public.job_interface.execute = (void (*) (job_t *)) execute;
this->public.job_interface.destroy = (void (*)(job_t*)) destroy;
-
+
/* private variables */
this->reqid = reqid;
this->protocol = protocol;
this->spi = spi;
-
+
return &this->public;
}
diff --git a/src/charon/processing/jobs/delete_child_sa_job.h b/src/charon/processing/jobs/delete_child_sa_job.h
index 9bf6ee423..662a7b7c7 100644
--- a/src/charon/processing/jobs/delete_child_sa_job.h
+++ b/src/charon/processing/jobs/delete_child_sa_job.h
@@ -31,7 +31,7 @@ typedef struct delete_child_sa_job_t delete_child_sa_job_t;
/**
* Class representing an DELETE_CHILD_SA Job.
- *
+ *
* This job initiates the delete of a CHILD SA.
*/
struct delete_child_sa_job_t {
@@ -52,8 +52,8 @@ struct delete_child_sa_job_t {
* @param spi security parameter index of the CHILD_SA
* @return delete_child_sa_job_t object
*/
-delete_child_sa_job_t *delete_child_sa_job_create(u_int32_t reqid,
- protocol_id_t protocol,
+delete_child_sa_job_t *delete_child_sa_job_create(u_int32_t reqid,
+ protocol_id_t protocol,
u_int32_t spi);
#endif /** DELETE_CHILD_SA_JOB_H_ @}*/
diff --git a/src/charon/processing/jobs/delete_ike_sa_job.c b/src/charon/processing/jobs/delete_ike_sa_job.c
index 6d4639fad..dffd08ba3 100644
--- a/src/charon/processing/jobs/delete_ike_sa_job.c
+++ b/src/charon/processing/jobs/delete_ike_sa_job.c
@@ -28,12 +28,12 @@ struct private_delete_ike_sa_job_t {
* public delete_ike_sa_job_t interface
*/
delete_ike_sa_job_t public;
-
+
/**
* ID of the ike_sa to delete
*/
ike_sa_id_t *ike_sa_id;
-
+
/**
* Should the IKE_SA be deleted if it is in ESTABLISHED state?
*/
@@ -56,7 +56,7 @@ static void destroy(private_delete_ike_sa_job_t *this)
static void execute(private_delete_ike_sa_job_t *this)
{
ike_sa_t *ike_sa;
-
+
ike_sa = charon->ike_sa_manager->checkout(charon->ike_sa_manager,
this->ike_sa_id);
if (ike_sa)
@@ -99,18 +99,18 @@ static void execute(private_delete_ike_sa_job_t *this)
/*
* Described in header
*/
-delete_ike_sa_job_t *delete_ike_sa_job_create(ike_sa_id_t *ike_sa_id,
+delete_ike_sa_job_t *delete_ike_sa_job_create(ike_sa_id_t *ike_sa_id,
bool delete_if_established)
{
private_delete_ike_sa_job_t *this = malloc_thing(private_delete_ike_sa_job_t);
-
+
/* interface functions */
this->public.job_interface.execute = (void (*) (job_t *)) execute;
this->public.job_interface.destroy = (void (*)(job_t *)) destroy;;
-
+
/* private variables */
this->ike_sa_id = ike_sa_id->clone(ike_sa_id);
this->delete_if_established = delete_if_established;
-
+
return &(this->public);
}
diff --git a/src/charon/processing/jobs/delete_ike_sa_job.h b/src/charon/processing/jobs/delete_ike_sa_job.h
index 8209977f9..f641deea3 100644
--- a/src/charon/processing/jobs/delete_ike_sa_job.h
+++ b/src/charon/processing/jobs/delete_ike_sa_job.h
@@ -18,7 +18,7 @@
* @defgroup delete_child_sa_job delete_child_sa_job
* @{ @ingroup jobs
*/
-
+
#ifndef DELETE_IKE_SA_JOB_H_
#define DELETE_IKE_SA_JOB_H_
@@ -32,12 +32,12 @@ typedef struct delete_ike_sa_job_t delete_ike_sa_job_t;
/**
* Class representing an DELETE_IKE_SA Job.
*
- * This job is responsible for deleting established or half open IKE_SAs.
+ * This job is responsible for deleting established or half open IKE_SAs.
* A half open IKE_SA is every IKE_SA which hasn't reache the SA_ESTABLISHED
* state.
*/
struct delete_ike_sa_job_t {
-
+
/**
* The job_t interface.
*/
@@ -46,7 +46,7 @@ struct delete_ike_sa_job_t {
/**
* Creates a job of type DELETE_IKE_SA.
- *
+ *
* @param ike_sa_id id of the IKE_SA to delete
* @param delete_if_established should the IKE_SA be deleted if it is established?
* @return created delete_ike_sa_job_t object
diff --git a/src/charon/processing/jobs/inactivity_job.c b/src/charon/processing/jobs/inactivity_job.c
new file mode 100644
index 000000000..13fc5e3d0
--- /dev/null
+++ b/src/charon/processing/jobs/inactivity_job.c
@@ -0,0 +1,150 @@
+/*
+ * Copyright (C) 2010 Martin Willi
+ * Hochschule fuer Technik Rapperswil
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * for more details.
+ */
+
+#include "inactivity_job.h"
+
+#include <daemon.h>
+
+typedef struct private_inactivity_job_t private_inactivity_job_t;
+
+/**
+ * Private data of an inactivity_job_t object.
+ */
+struct private_inactivity_job_t {
+
+ /**
+ * Public inactivity_job_t interface.
+ */
+ inactivity_job_t public;
+
+ /**
+ * Reqid of CHILD_SA to check
+ */
+ u_int32_t reqid;
+
+ /**
+ * Inactivity timeout
+ */
+ u_int32_t timeout;
+
+ /**
+ * Close IKE_SA if last remaining CHILD inactive?
+ */
+ bool close_ike;
+};
+
+METHOD(job_t, destroy, void,
+ private_inactivity_job_t *this)
+{
+ free(this);
+}
+
+METHOD(job_t, execute, void,
+ private_inactivity_job_t *this)
+{
+ ike_sa_t *ike_sa;
+ bool rescheduled = FALSE;
+
+ ike_sa = charon->ike_sa_manager->checkout_by_id(charon->ike_sa_manager,
+ this->reqid, TRUE);
+ if (ike_sa)
+ {
+ iterator_t *iterator;
+ child_sa_t *child_sa;
+ u_int32_t delete = 0;
+ protocol_id_t proto = 0;
+ int children = 0;
+ status_t status = SUCCESS;
+
+ iterator = ike_sa->create_child_sa_iterator(ike_sa);
+ while (iterator->iterate(iterator, (void**)&child_sa))
+ {
+ if (child_sa->get_reqid(child_sa) == this->reqid)
+ {
+ time_t in, out, diff;
+
+ child_sa->get_usestats(child_sa, TRUE, &in, NULL);
+ child_sa->get_usestats(child_sa, FALSE, &out, NULL);
+
+ diff = time_monotonic(NULL) - max(in, out);
+
+ if (diff >= this->timeout)
+ {
+ delete = child_sa->get_spi(child_sa, TRUE);
+ proto = child_sa->get_protocol(child_sa);
+ }
+ else
+ {
+ charon->scheduler->schedule_job(charon->scheduler,
+ &this->public.job_interface, this->timeout - diff);
+ rescheduled = TRUE;
+ }
+ }
+ children++;
+ }
+ iterator->destroy(iterator);
+
+ if (delete)
+ {
+ if (children == 1 && this->close_ike)
+ {
+ DBG1(DBG_JOB, "deleting IKE_SA after %d seconds "
+ "of CHILD_SA inactivity", this->timeout);
+ status = ike_sa->delete(ike_sa);
+ }
+ else
+ {
+ DBG1(DBG_JOB, "deleting CHILD_SA after %d seconds "
+ "of inactivity", this->timeout);
+ status = ike_sa->delete_child_sa(ike_sa, proto, delete);
+ }
+ }
+ if (status == DESTROY_ME)
+ {
+ charon->ike_sa_manager->checkin_and_destroy(charon->ike_sa_manager,
+ ike_sa);
+ }
+ else
+ {
+ charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
+ }
+ }
+ if (!rescheduled)
+ {
+ destroy(this);
+ }
+}
+
+/**
+ * See header
+ */
+inactivity_job_t *inactivity_job_create(u_int32_t reqid, u_int32_t timeout,
+ bool close_ike)
+{
+ private_inactivity_job_t *this;
+
+ INIT(this,
+ .public.job_interface = {
+ .execute = _execute,
+ .destroy = _destroy,
+ },
+ .reqid = reqid,
+ .timeout = timeout,
+ .close_ike = close_ike,
+ );
+
+ return &this->public;
+}
+
diff --git a/src/charon/processing/jobs/inactivity_job.h b/src/charon/processing/jobs/inactivity_job.h
new file mode 100644
index 000000000..9c9daced8
--- /dev/null
+++ b/src/charon/processing/jobs/inactivity_job.h
@@ -0,0 +1,53 @@
+/*
+ * Copyright (C) 2010 Martin Willi
+ * Hochschule fuer Technik Rapperswil
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * for more details.
+ */
+
+/**
+ * @defgroup inactivity_job inactivity_job
+ * @{ @ingroup jobs
+ */
+
+#ifndef INACTIVITY_JOB_H_
+#define INACTIVITY_JOB_H_
+
+#include <library.h>
+#include <processing/jobs/job.h>
+
+typedef struct inactivity_job_t inactivity_job_t;
+
+/**
+ * Job checking for inactivity of CHILD_SA to close them.
+ *
+ * The inactivity job reschedules itself to check CHILD_SAs prediodically.
+ */
+struct inactivity_job_t {
+
+ /**
+ * Implements job_t.
+ */
+ job_t job_interface;
+};
+
+/**
+ * Create a inactivity_job instance.
+ *
+ * @param reqid reqid of CHILD_SA to check for inactivity
+ * @param timeout inactivity timeout in s
+ * @param close_ike close IKE_SA if the last remaining CHILD_SA is inactive?
+ * @return inactivity checking job
+ */
+inactivity_job_t *inactivity_job_create(u_int32_t reqid, u_int32_t timeout,
+ bool close_ike);
+
+#endif /** INACTIVITY_JOB_H_ @}*/
diff --git a/src/charon/processing/jobs/initiate_mediation_job.c b/src/charon/processing/jobs/initiate_mediation_job.c
index 157d84341..ffe8755e2 100644
--- a/src/charon/processing/jobs/initiate_mediation_job.c
+++ b/src/charon/processing/jobs/initiate_mediation_job.c
@@ -29,12 +29,12 @@ struct private_initiate_mediation_job_t {
* public initiate_mediation_job_t interface
*/
initiate_mediation_job_t public;
-
+
/**
* ID of the IKE_SA of the mediated connection.
*/
ike_sa_id_t *mediated_sa_id;
-
+
/**
* ID of the IKE_SA of the mediation connection.
*/
@@ -68,26 +68,27 @@ static bool initiate_callback(private_initiate_mediation_job_t *this,
/**
* Implementation of job_t.execute.
- */
+ */
static void initiate(private_initiate_mediation_job_t *this)
{
ike_sa_t *mediated_sa, *mediation_sa;
peer_cfg_t *mediated_cfg, *mediation_cfg;
enumerator_t *enumerator;
auth_cfg_t *auth_cfg;
-
+
mediated_sa = charon->ike_sa_manager->checkout(charon->ike_sa_manager,
this->mediated_sa_id);
if (mediated_sa)
{
+ DBG1(DBG_IKE, "initiating mediation connection");
mediated_cfg = mediated_sa->get_peer_cfg(mediated_sa);
- mediated_cfg->get_ref(mediated_cfg);
-
+ mediated_cfg->get_ref(mediated_cfg);
+
charon->ike_sa_manager->checkin(charon->ike_sa_manager, mediated_sa);
-
+
mediation_cfg = mediated_cfg->get_mediated_by(mediated_cfg);
mediation_cfg->get_ref(mediation_cfg);
-
+
enumerator = mediation_cfg->create_auth_cfg_enumerator(mediation_cfg,
TRUE);
if (!enumerator->enumerate(enumerator, &auth_cfg) ||
@@ -99,7 +100,8 @@ static void initiate(private_initiate_mediation_job_t *this)
destroy(this);
return;
}
-
+ enumerator->destroy(enumerator);
+
if (charon->connect_manager->check_and_register(charon->connect_manager,
auth_cfg->get(auth_cfg, AUTH_RULE_IDENTITY),
mediated_cfg->get_peer_id(mediated_cfg),
@@ -107,7 +109,7 @@ static void initiate(private_initiate_mediation_job_t *this)
{
mediated_cfg->destroy(mediated_cfg);
mediation_cfg->destroy(mediation_cfg);
-
+
mediated_sa = charon->ike_sa_manager->checkout(
charon->ike_sa_manager, this->mediated_sa_id);
if (mediated_sa)
@@ -121,7 +123,7 @@ static void initiate(private_initiate_mediation_job_t *this)
return;
}
/* we need an additional reference because initiate consumes one */
- mediation_cfg->get_ref(mediation_cfg);
+ mediation_cfg->get_ref(mediation_cfg);
if (charon->controller->initiate(charon->controller, mediation_cfg,
NULL, (controller_cb_t)initiate_callback, this) != SUCCESS)
@@ -142,8 +144,7 @@ static void initiate(private_initiate_mediation_job_t *this)
mediation_cfg->destroy(mediation_cfg);
mediation_sa = charon->ike_sa_manager->checkout(charon->ike_sa_manager,
- this->mediation_sa_id);
-
+ this->mediation_sa_id);
if (mediation_sa)
{
if (mediation_sa->initiate_mediation(mediation_sa,
@@ -163,10 +164,9 @@ static void initiate(private_initiate_mediation_job_t *this)
destroy(this);
return;
}
-
- charon->ike_sa_manager->checkin(charon->ike_sa_manager, mediation_sa);
+ charon->ike_sa_manager->checkin(charon->ike_sa_manager,
+ mediation_sa);
}
-
mediated_cfg->destroy(mediated_cfg);
}
destroy(this);
@@ -174,44 +174,50 @@ static void initiate(private_initiate_mediation_job_t *this)
/**
* Implementation of job_t.execute.
- */
+ */
static void reinitiate(private_initiate_mediation_job_t *this)
{
ike_sa_t *mediated_sa, *mediation_sa;
peer_cfg_t *mediated_cfg;
-
+
mediated_sa = charon->ike_sa_manager->checkout(charon->ike_sa_manager,
- this->mediated_sa_id);
+ this->mediated_sa_id);
if (mediated_sa)
{
mediated_cfg = mediated_sa->get_peer_cfg(mediated_sa);
mediated_cfg->get_ref(mediated_cfg);
charon->ike_sa_manager->checkin(charon->ike_sa_manager, mediated_sa);
-
+
mediation_sa = charon->ike_sa_manager->checkout(charon->ike_sa_manager,
- this->mediation_sa_id);
+ this->mediation_sa_id);
if (mediation_sa)
{
- if (mediation_sa->initiate_mediation(mediation_sa, mediated_cfg) != SUCCESS)
+ if (mediation_sa->initiate_mediation(mediation_sa,
+ mediated_cfg) != SUCCESS)
{
DBG1(DBG_JOB, "initiating mediated connection '%s' failed",
- mediated_cfg->get_name(mediated_cfg));
+ mediated_cfg->get_name(mediated_cfg));
mediated_cfg->destroy(mediated_cfg);
- charon->ike_sa_manager->checkin_and_destroy(charon->ike_sa_manager, mediation_sa);
+ charon->ike_sa_manager->checkin_and_destroy(
+ charon->ike_sa_manager,
+ mediation_sa);
mediated_sa = charon->ike_sa_manager->checkout(
- charon->ike_sa_manager, this->mediated_sa_id);
+ charon->ike_sa_manager,
+ this->mediated_sa_id);
if (mediated_sa)
{
DBG1(DBG_IKE, "establishing mediation connection failed");
charon->ike_sa_manager->checkin_and_destroy(
- charon->ike_sa_manager, mediated_sa);
+ charon->ike_sa_manager,
+ mediated_sa);
}
destroy(this);
return;
}
- charon->ike_sa_manager->checkin(charon->ike_sa_manager, mediation_sa);
+ charon->ike_sa_manager->checkin(charon->ike_sa_manager,
+ mediation_sa);
}
-
+
mediated_cfg->destroy(mediated_cfg);
}
destroy(this);
@@ -223,10 +229,10 @@ static void reinitiate(private_initiate_mediation_job_t *this)
static private_initiate_mediation_job_t *initiate_mediation_job_create_empty()
{
private_initiate_mediation_job_t *this = malloc_thing(private_initiate_mediation_job_t);
-
+
/* interface functions */
this->public.job_interface.destroy = (void (*) (job_t *)) destroy;
-
+
/* private variables */
this->mediation_sa_id = NULL;
this->mediated_sa_id = NULL;
@@ -240,9 +246,9 @@ static private_initiate_mediation_job_t *initiate_mediation_job_create_empty()
initiate_mediation_job_t *initiate_mediation_job_create(ike_sa_id_t *ike_sa_id)
{
private_initiate_mediation_job_t *this = initiate_mediation_job_create_empty();
-
+
this->public.job_interface.execute = (void (*) (job_t *)) initiate;
-
+
this->mediated_sa_id = ike_sa_id->clone(ike_sa_id);
return &this->public;
@@ -255,11 +261,11 @@ initiate_mediation_job_t *reinitiate_mediation_job_create(ike_sa_id_t *mediation
ike_sa_id_t *mediated_sa_id)
{
private_initiate_mediation_job_t *this = initiate_mediation_job_create_empty();
-
+
this->public.job_interface.execute = (void (*) (job_t *)) reinitiate;
-
+
this->mediation_sa_id = mediation_sa_id->clone(mediation_sa_id);
this->mediated_sa_id = mediated_sa_id->clone(mediated_sa_id);
-
- return &this->public;
+
+ return &this->public;
}
diff --git a/src/charon/processing/jobs/initiate_mediation_job.h b/src/charon/processing/jobs/initiate_mediation_job.h
index 084e1b9fd..fddb1dd7b 100644
--- a/src/charon/processing/jobs/initiate_mediation_job.h
+++ b/src/charon/processing/jobs/initiate_mediation_job.h
@@ -28,7 +28,7 @@ typedef struct initiate_mediation_job_t initiate_mediation_job_t;
/**
* Class representing a INITIATE_MEDIATION Job.
- *
+ *
* This job will initiate a mediation on behalf of a mediated connection.
* If required the mediation connection is established.
*/
@@ -41,7 +41,7 @@ struct initiate_mediation_job_t {
/**
* Creates a job of type INITIATE_MEDIATION.
- *
+ *
* @param ike_sa_id identification of the ike_sa as ike_sa_id_t object (gets cloned)
* @return job object
*/
@@ -50,12 +50,13 @@ initiate_mediation_job_t *initiate_mediation_job_create(ike_sa_id_t *ike_sa_id);
/**
* Creates a special job of type INITIATE_MEDIATION that reinitiates a
* specific connection.
- *
+ *
* @param mediation_sa_id identification of the mediation sa (gets cloned)
* @param mediated_sa_id identification of the mediated sa (gets cloned)
* @return job object
*/
-initiate_mediation_job_t *reinitiate_mediation_job_create(ike_sa_id_t *mediation_sa_id,
- ike_sa_id_t *mediated_sa_id);
+initiate_mediation_job_t *reinitiate_mediation_job_create(
+ ike_sa_id_t *mediation_sa_id,
+ ike_sa_id_t *mediated_sa_id);
#endif /** INITIATE_MEDIATION_JOB_H_ @}*/
diff --git a/src/charon/processing/jobs/job.h b/src/charon/processing/jobs/job.h
index acc88b124..0f1c16ebe 100644
--- a/src/charon/processing/jobs/job.h
+++ b/src/charon/processing/jobs/job.h
@@ -33,7 +33,7 @@ struct job_t {
/**
* Execute a job.
- *
+ *
* The processing facility executes a job using this method. Jobs are
* one-shot, they destroy themself after execution, so don't use a job
* once it has been executed.
diff --git a/src/charon/processing/jobs/mediation_job.c b/src/charon/processing/jobs/mediation_job.c
index cf522faff..b5b8af3b3 100644
--- a/src/charon/processing/jobs/mediation_job.c
+++ b/src/charon/processing/jobs/mediation_job.c
@@ -29,37 +29,37 @@ struct private_mediation_job_t {
* public mediation_job_t interface
*/
mediation_job_t public;
-
+
/**
* ID of target peer.
*/
identification_t *target;
-
+
/**
* ID of the source peer.
*/
identification_t *source;
-
+
/**
* ME_CONNECTID
*/
chunk_t connect_id;
-
+
/**
* ME_CONNECTKEY
*/
chunk_t connect_key;
-
+
/**
* Submitted endpoints
*/
linked_list_t *endpoints;
-
+
/**
* Is this a callback job?
*/
bool callback;
-
+
/**
* Is this a response?
*/
@@ -81,13 +81,13 @@ static void destroy(private_mediation_job_t *this)
/**
* Implementation of job_t.execute.
- */
+ */
static void execute(private_mediation_job_t *this)
{
ike_sa_id_t *target_sa_id;
-
+
target_sa_id = charon->mediation_manager->check(charon->mediation_manager, this->target);
-
+
if (target_sa_id)
{
ike_sa_t *target_sa = charon->ike_sa_manager->checkout(charon->ike_sa_manager,
@@ -120,7 +120,7 @@ static void execute(private_mediation_job_t *this)
return;
}
}
-
+
charon->ike_sa_manager->checkin(charon->ike_sa_manager, target_sa);
}
else
@@ -143,11 +143,11 @@ static void execute(private_mediation_job_t *this)
static private_mediation_job_t *mediation_job_create_empty()
{
private_mediation_job_t *this = malloc_thing(private_mediation_job_t);
-
+
/* interface functions */
this->public.job_interface.execute = (void (*) (job_t *)) execute;
this->public.job_interface.destroy = (void (*) (job_t *)) destroy;
-
+
/* private variables */
this->target = NULL;
this->source = NULL;
@@ -156,7 +156,7 @@ static private_mediation_job_t *mediation_job_create_empty()
this->connect_key = chunk_empty;
this->endpoints = NULL;
this->response = FALSE;
-
+
return this;
}
@@ -175,7 +175,7 @@ mediation_job_t *mediation_job_create(identification_t *peer_id,
this->connect_key = chunk_clone(connect_key);
this->endpoints = endpoints->clone_offset(endpoints, offsetof(endpoint_notify_t, clone));
this->response = response;
-
+
return &this->public;
}
@@ -186,10 +186,10 @@ mediation_job_t *mediation_callback_job_create(identification_t *requester,
identification_t *peer_id)
{
private_mediation_job_t *this = mediation_job_create_empty();
-
+
this->target = requester->clone(requester);
this->source = peer_id->clone(peer_id);
this->callback = TRUE;
-
+
return &this->public;
}
diff --git a/src/charon/processing/jobs/mediation_job.h b/src/charon/processing/jobs/mediation_job.h
index 583ea8230..0574c65eb 100644
--- a/src/charon/processing/jobs/mediation_job.h
+++ b/src/charon/processing/jobs/mediation_job.h
@@ -30,7 +30,7 @@ typedef struct mediation_job_t mediation_job_t;
/**
* Class representing a MEDIATION Job.
- *
+ *
* This job handles the mediation on the mediation server.
*/
struct mediation_job_t {
@@ -42,9 +42,9 @@ struct mediation_job_t {
/**
* Creates a job of type MEDIATION.
- *
+ *
* Parameters get cloned.
- *
+ *
* @param peer_id ID of the requested peer
* @param requester ID of the requesting peer
* @param connect_id content of ME_CONNECTID (could be NULL)
@@ -61,9 +61,9 @@ mediation_job_t *mediation_job_create(identification_t *peer_id,
/**
* Creates a special job of type MEDIATION that is used to send a callback
* notification to a peer.
- *
+ *
* Parameters get cloned.
- *
+ *
* @param requester ID of the waiting peer
* @param peer_id ID of the requested peer
* @return job object
diff --git a/src/charon/processing/jobs/migrate_job.c b/src/charon/processing/jobs/migrate_job.c
index a57d0478b..05f47340c 100644
--- a/src/charon/processing/jobs/migrate_job.c
+++ b/src/charon/processing/jobs/migrate_job.c
@@ -30,7 +30,7 @@ struct private_migrate_job_t {
* Public migrate_job_t interface.
*/
migrate_job_t public;
-
+
/**
* reqid of the CHILD_SA if it already exists
*/
@@ -75,7 +75,7 @@ static void destroy(private_migrate_job_t *this)
static void execute(private_migrate_job_t *this)
{
ike_sa_t *ike_sa = NULL;
-
+
if (this->reqid)
{
ike_sa = charon->ike_sa_manager->checkout_by_id(charon->ike_sa_manager,
@@ -110,7 +110,7 @@ static void execute(private_migrate_job_t *this)
if (child_sa->update(child_sa, this->local, this->remote,
ike_sa->get_virtual_ip(ike_sa, TRUE),
- ike_sa->has_condition(ike_sa, COND_NAT_ANY)) == NOT_SUPPORTED)
+ ike_sa->has_condition(ike_sa, COND_NAT_ANY)) == NOT_SUPPORTED)
{
ike_sa->rekey_child_sa(ike_sa, child_sa->get_protocol(child_sa),
child_sa->get_spi(child_sa, TRUE));
@@ -134,17 +134,17 @@ migrate_job_t *migrate_job_create(u_int32_t reqid,
host_t *local, host_t *remote)
{
private_migrate_job_t *this = malloc_thing(private_migrate_job_t);
-
+
/* interface functions */
this->public.job_interface.execute = (void (*) (job_t *)) execute;
this->public.job_interface.destroy = (void (*)(job_t*)) destroy;
-
+
/* private variables */
this->reqid = reqid;
this->src_ts = (dir == POLICY_OUT) ? src_ts : dst_ts;
this->dst_ts = (dir == POLICY_OUT) ? dst_ts : src_ts;
this->local = local;
this->remote = remote;
-
+
return &this->public;
}
diff --git a/src/charon/processing/jobs/migrate_job.h b/src/charon/processing/jobs/migrate_job.h
index 672a09b0a..de313d517 100644
--- a/src/charon/processing/jobs/migrate_job.h
+++ b/src/charon/processing/jobs/migrate_job.h
@@ -25,14 +25,14 @@ typedef struct migrate_job_t migrate_job_t;
#include <library.h>
#include <utils/host.h>
-#include <config/traffic_selector.h>
+#include <selectors/traffic_selector.h>
#include <kernel/kernel_ipsec.h>
#include <processing/jobs/job.h>
/**
* Class representing a MIGRATE Job.
- *
- * This job sets a routed CHILD_SA for an existing IPsec policy.
+ *
+ * This job sets a routed CHILD_SA for an existing IPsec policy.
*/
struct migrate_job_t {
/**
@@ -49,15 +49,13 @@ struct migrate_job_t {
* @param reqid reqid of the CHILD_SA to acquire
* @param src_ts source traffic selector to be used in the policy
* @param dst_ts destination traffic selector to be used in the policy
- * @param dir direction of the policy (in|out)
- * @param local local host address to be used in the IKE_SA
- * @param remote remote host address to be used in the IKE_SA
+ * @param dir direction of the policy (in|out)
+ * @param local local host address to be used in the IKE_SA
+ * @param remote remote host address to be used in the IKE_SA
* @return migrate_job_t object
*/
migrate_job_t *migrate_job_create(u_int32_t reqid,
- traffic_selector_t *src_ts,
- traffic_selector_t *dst_ts,
- policy_dir_t dir,
- host_t *local, host_t *remote);
+ traffic_selector_t *src_ts, traffic_selector_t *dst_ts,
+ policy_dir_t dir, host_t *local, host_t *remote);
#endif /** MIGRATE_JOB_H_ @}*/
diff --git a/src/charon/processing/jobs/process_message_job.c b/src/charon/processing/jobs/process_message_job.c
index 1f0b3e287..a47d48e38 100644
--- a/src/charon/processing/jobs/process_message_job.c
+++ b/src/charon/processing/jobs/process_message_job.c
@@ -28,7 +28,7 @@ struct private_process_message_job_t {
* public process_message_job_t interface
*/
process_message_job_t public;
-
+
/**
* Message associated with this job
*/
@@ -50,9 +50,9 @@ static void destroy(private_process_message_job_t *this)
static void execute(private_process_message_job_t *this)
{
ike_sa_t *ike_sa;
-
+
#ifdef ME
- /* if this is an unencrypted INFORMATIONAL exchange it is likely a
+ /* if this is an unencrypted INFORMATIONAL exchange it is likely a
* connectivity check. */
if (this->message->get_exchange_type(this->message) == INFORMATIONAL &&
this->message->get_first_payload_type(this->message) != ENCRYPTED)
@@ -67,7 +67,7 @@ static void execute(private_process_message_job_t *this)
return;
}
#endif /* ME */
-
+
ike_sa = charon->ike_sa_manager->checkout_by_message(charon->ike_sa_manager,
this->message);
if (ike_sa)
@@ -98,9 +98,9 @@ process_message_job_t *process_message_job_create(message_t *message)
/* interface functions */
this->public.job_interface.execute = (void (*) (job_t *)) execute;
this->public.job_interface.destroy = (void(*)(job_t*))destroy;
-
+
/* private variables */
this->message = message;
-
+
return &(this->public);
}
diff --git a/src/charon/processing/jobs/process_message_job.h b/src/charon/processing/jobs/process_message_job.h
index b01d388f9..5e3f44d1f 100644
--- a/src/charon/processing/jobs/process_message_job.h
+++ b/src/charon/processing/jobs/process_message_job.h
@@ -40,7 +40,7 @@ struct process_message_job_t {
/**
* Creates a job of type PROCESS_MESSAGE.
- *
+ *
* @param message message to process
* @return created process_message_job_t object
*/
diff --git a/src/charon/processing/jobs/rekey_child_sa_job.c b/src/charon/processing/jobs/rekey_child_sa_job.c
index 17fcf641b..b797d181e 100644
--- a/src/charon/processing/jobs/rekey_child_sa_job.c
+++ b/src/charon/processing/jobs/rekey_child_sa_job.c
@@ -28,17 +28,17 @@ struct private_rekey_child_sa_job_t {
* Public rekey_child_sa_job_t interface.
*/
rekey_child_sa_job_t public;
-
+
/**
* reqid of the child to rekey
*/
u_int32_t reqid;
-
+
/**
* protocol of the CHILD_SA (ESP/AH)
*/
protocol_id_t protocol;
-
+
/**
* inbound SPI of the CHILD_SA
*/
@@ -59,7 +59,7 @@ static void destroy(private_rekey_child_sa_job_t *this)
static void execute(private_rekey_child_sa_job_t *this)
{
ike_sa_t *ike_sa;
-
+
ike_sa = charon->ike_sa_manager->checkout_by_id(charon->ike_sa_manager,
this->reqid, TRUE);
if (ike_sa == NULL)
@@ -69,7 +69,7 @@ static void execute(private_rekey_child_sa_job_t *this)
}
else
{
- ike_sa->rekey_child_sa(ike_sa, this->protocol, this->spi);
+ ike_sa->rekey_child_sa(ike_sa, this->protocol, this->spi);
charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
}
destroy(this);
@@ -78,20 +78,20 @@ static void execute(private_rekey_child_sa_job_t *this)
/*
* Described in header
*/
-rekey_child_sa_job_t *rekey_child_sa_job_create(u_int32_t reqid,
- protocol_id_t protocol,
+rekey_child_sa_job_t *rekey_child_sa_job_create(u_int32_t reqid,
+ protocol_id_t protocol,
u_int32_t spi)
{
private_rekey_child_sa_job_t *this = malloc_thing(private_rekey_child_sa_job_t);
-
+
/* interface functions */
this->public.job_interface.execute = (void (*) (job_t *)) execute;
this->public.job_interface.destroy = (void (*)(job_t*)) destroy;
-
+
/* private variables */
this->reqid = reqid;
this->protocol = protocol;
this->spi = spi;
-
+
return &this->public;
}
diff --git a/src/charon/processing/jobs/rekey_child_sa_job.h b/src/charon/processing/jobs/rekey_child_sa_job.h
index 2e2eef361..62887d6b9 100644
--- a/src/charon/processing/jobs/rekey_child_sa_job.h
+++ b/src/charon/processing/jobs/rekey_child_sa_job.h
@@ -51,7 +51,7 @@ struct rekey_child_sa_job_t {
* @param spi security parameter index of the CHILD_SA
* @return rekey_child_sa_job_t object
*/
-rekey_child_sa_job_t *rekey_child_sa_job_create(u_int32_t reqid,
+rekey_child_sa_job_t *rekey_child_sa_job_create(u_int32_t reqid,
protocol_id_t protocol,
u_int32_t spi);
#endif /** REKEY_CHILD_SA_JOB_H_ @}*/
diff --git a/src/charon/processing/jobs/rekey_ike_sa_job.c b/src/charon/processing/jobs/rekey_ike_sa_job.c
index 1ceb1e144..5ec0b1b88 100644
--- a/src/charon/processing/jobs/rekey_ike_sa_job.c
+++ b/src/charon/processing/jobs/rekey_ike_sa_job.c
@@ -12,7 +12,7 @@
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*/
-
+
#include "rekey_ike_sa_job.h"
#include <daemon.h>
@@ -27,12 +27,12 @@ struct private_rekey_ike_sa_job_t {
* Public rekey_ike_sa_job_t interface.
*/
rekey_ike_sa_job_t public;
-
+
/**
* ID of the IKE_SA to rekey
*/
ike_sa_id_t *ike_sa_id;
-
+
/**
* force reauthentication of the peer (full IKE_SA setup)
*/
@@ -55,7 +55,7 @@ static void execute(private_rekey_ike_sa_job_t *this)
{
ike_sa_t *ike_sa;
status_t status = SUCCESS;
-
+
ike_sa = charon->ike_sa_manager->checkout(charon->ike_sa_manager,
this->ike_sa_id);
if (ike_sa == NULL)
@@ -72,7 +72,7 @@ static void execute(private_rekey_ike_sa_job_t *this)
{
status = ike_sa->rekey(ike_sa);
}
-
+
if (status == DESTROY_ME)
{
charon->ike_sa_manager->checkin_and_destroy(charon->ike_sa_manager, ike_sa);
@@ -91,14 +91,14 @@ static void execute(private_rekey_ike_sa_job_t *this)
rekey_ike_sa_job_t *rekey_ike_sa_job_create(ike_sa_id_t *ike_sa_id, bool reauth)
{
private_rekey_ike_sa_job_t *this = malloc_thing(private_rekey_ike_sa_job_t);
-
+
/* interface functions */
this->public.job_interface.execute = (void (*) (job_t *)) execute;
this->public.job_interface.destroy = (void (*)(job_t*)) destroy;
-
+
/* private variables */
this->ike_sa_id = ike_sa_id->clone(ike_sa_id);
this->reauth = reauth;
-
+
return &(this->public);
}
diff --git a/src/charon/processing/jobs/rekey_ike_sa_job.h b/src/charon/processing/jobs/rekey_ike_sa_job.h
index 0d830e134..a5c1028aa 100644
--- a/src/charon/processing/jobs/rekey_ike_sa_job.h
+++ b/src/charon/processing/jobs/rekey_ike_sa_job.h
@@ -29,7 +29,7 @@ typedef struct rekey_ike_sa_job_t rekey_ike_sa_job_t;
/**
* Class representing an REKEY_IKE_SA Job.
- *
+ *
* This job initiates the rekeying of an IKE_SA.
*/
struct rekey_ike_sa_job_t {
diff --git a/src/charon/processing/jobs/retransmit_job.c b/src/charon/processing/jobs/retransmit_job.c
index 122cad853..fc787f208 100644
--- a/src/charon/processing/jobs/retransmit_job.c
+++ b/src/charon/processing/jobs/retransmit_job.c
@@ -13,7 +13,7 @@
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*/
-
+
#include "retransmit_job.h"
#include <daemon.h>
@@ -28,7 +28,7 @@ struct private_retransmit_job_t {
* Public retransmit_job_t interface.
*/
retransmit_job_t public;
-
+
/**
* Message ID of the request to resend.
*/
@@ -55,7 +55,7 @@ static void destroy(private_retransmit_job_t *this)
static void execute(private_retransmit_job_t *this)
{
ike_sa_t *ike_sa;
-
+
ike_sa = charon->ike_sa_manager->checkout(charon->ike_sa_manager,
this->ike_sa_id);
if (ike_sa)
@@ -80,7 +80,7 @@ static void execute(private_retransmit_job_t *this)
retransmit_job_t *retransmit_job_create(u_int32_t message_id,ike_sa_id_t *ike_sa_id)
{
private_retransmit_job_t *this = malloc_thing(private_retransmit_job_t);
-
+
/* interface functions */
this->public.job_interface.execute = (void (*) (job_t *)) execute;
this->public.job_interface.destroy = (void (*) (job_t *)) destroy;
@@ -88,6 +88,6 @@ retransmit_job_t *retransmit_job_create(u_int32_t message_id,ike_sa_id_t *ike_sa
/* private variables */
this->message_id = message_id;
this->ike_sa_id = ike_sa_id->clone(ike_sa_id);
-
+
return &this->public;
}
diff --git a/src/charon/processing/jobs/retransmit_job.h b/src/charon/processing/jobs/retransmit_job.h
index 4c9bea1c8..c8c13479b 100644
--- a/src/charon/processing/jobs/retransmit_job.h
+++ b/src/charon/processing/jobs/retransmit_job.h
@@ -44,7 +44,7 @@ struct retransmit_job_t {
/**
* Creates a job of type retransmit.
- *
+ *
* @param message_id message_id of the request to resend
* @param ike_sa_id identification of the ike_sa as ike_sa_id_t
* @return retransmit_job_t object
diff --git a/src/charon/processing/jobs/roam_job.c b/src/charon/processing/jobs/roam_job.c
index c01f83248..adc884a8a 100644
--- a/src/charon/processing/jobs/roam_job.c
+++ b/src/charon/processing/jobs/roam_job.c
@@ -31,7 +31,7 @@ struct private_roam_job_t {
* public roam_job_t interface
*/
roam_job_t public;
-
+
/**
* has the address list changed, or the routing only?
*/
@@ -47,16 +47,16 @@ static void destroy(private_roam_job_t *this)
}
/**
- * Implementation of job_t.execute.
- */
+ * Implementation of job_t.execute.
+ */
static void execute(private_roam_job_t *this)
{
ike_sa_t *ike_sa;
linked_list_t *list;
ike_sa_id_t *id;
enumerator_t *enumerator;
-
- /* enumerator over all IKE_SAs gives us no way to checkin_and_destroy
+
+ /* enumerator over all IKE_SAs gives us no way to checkin_and_destroy
* after a DESTROY_ME, so we check out each available IKE_SA by hand. */
list = linked_list_create();
enumerator = charon->ike_sa_manager->create_enumerator(charon->ike_sa_manager);
@@ -66,7 +66,7 @@ static void execute(private_roam_job_t *this)
list->insert_last(list, id->clone(id));
}
enumerator->destroy(enumerator);
-
+
while (list->remove_last(list, (void**)&id) == SUCCESS)
{
ike_sa = charon->ike_sa_manager->checkout(charon->ike_sa_manager, id);
@@ -95,10 +95,10 @@ static void execute(private_roam_job_t *this)
roam_job_t *roam_job_create(bool address)
{
private_roam_job_t *this = malloc_thing(private_roam_job_t);
-
+
this->public.job_interface.execute = (void (*) (job_t *)) execute;
this->public.job_interface.destroy = (void (*) (job_t *)) destroy;
-
+
this->address = address;
return &this->public;
diff --git a/src/charon/processing/jobs/roam_job.h b/src/charon/processing/jobs/roam_job.h
index 7bb1227f5..55bdf2b28 100644
--- a/src/charon/processing/jobs/roam_job.h
+++ b/src/charon/processing/jobs/roam_job.h
@@ -29,7 +29,7 @@ typedef struct roam_job_t roam_job_t;
/**
* A job to inform IKE_SAs about changed local address setup.
- *
+ *
* If a local address appears or disappears, the kernel fires this job to
* update all IKE_SAs.
*/
@@ -43,7 +43,7 @@ struct roam_job_t {
/**
* Creates a job to inform IKE_SAs about an updated address list.
- *
+ *
* @param address TRUE if address list changed, FALSE if routing changed
* @return initiate_ike_sa_job_t object
*/
diff --git a/src/charon/processing/jobs/send_dpd_job.c b/src/charon/processing/jobs/send_dpd_job.c
index c6e81a56f..1c2da52b8 100644
--- a/src/charon/processing/jobs/send_dpd_job.c
+++ b/src/charon/processing/jobs/send_dpd_job.c
@@ -31,7 +31,7 @@ struct private_send_dpd_job_t {
* public send_dpd_job_t interface
*/
send_dpd_job_t public;
-
+
/**
* ID of the IKE_SA which the message belongs to.
*/
@@ -48,12 +48,12 @@ static void destroy(private_send_dpd_job_t *this)
}
/**
- * Implementation of job_t.execute.
- */
+ * Implementation of job_t.execute.
+ */
static void execute(private_send_dpd_job_t *this)
{
ike_sa_t *ike_sa;
-
+
ike_sa = charon->ike_sa_manager->checkout(charon->ike_sa_manager,
this->ike_sa_id);
if (ike_sa)
@@ -76,11 +76,11 @@ static void execute(private_send_dpd_job_t *this)
send_dpd_job_t *send_dpd_job_create(ike_sa_id_t *ike_sa_id)
{
private_send_dpd_job_t *this = malloc_thing(private_send_dpd_job_t);
-
+
/* interface functions */
this->public.job_interface.execute = (void (*) (job_t *)) execute;
this->public.job_interface.destroy = (void (*) (job_t *)) destroy;
-
+
/* private variables */
this->ike_sa_id = ike_sa_id->clone(ike_sa_id);
diff --git a/src/charon/processing/jobs/send_dpd_job.h b/src/charon/processing/jobs/send_dpd_job.h
index 91556a9d1..8078a38bc 100644
--- a/src/charon/processing/jobs/send_dpd_job.h
+++ b/src/charon/processing/jobs/send_dpd_job.h
@@ -29,7 +29,7 @@ typedef struct send_dpd_job_t send_dpd_job_t;
/**
* Class representing a SEND_DPD Job.
- *
+ *
* Job to periodically send a Dead Peer Detection (DPD) request,
* ie. an IKE request with no payloads other than the encrypted payload
* required by the syntax.
@@ -43,7 +43,7 @@ struct send_dpd_job_t {
/**
* Creates a job of type SEND_DPD.
- *
+ *
* @param ike_sa_id identification of the ike_sa as ike_sa_id_t object (gets cloned)
* @return initiate_ike_sa_job_t object
*/
diff --git a/src/charon/processing/jobs/send_keepalive_job.c b/src/charon/processing/jobs/send_keepalive_job.c
index 5d3cfb530..3d02cea2e 100644
--- a/src/charon/processing/jobs/send_keepalive_job.c
+++ b/src/charon/processing/jobs/send_keepalive_job.c
@@ -31,7 +31,7 @@ struct private_send_keepalive_job_t {
* public send_keepalive_job_t interface
*/
send_keepalive_job_t public;
-
+
/**
* ID of the IKE_SA which the message belongs to.
*/
@@ -49,11 +49,11 @@ static void destroy(private_send_keepalive_job_t *this)
/**
* Implementation of job_t.execute.
- */
+ */
static void execute(private_send_keepalive_job_t *this)
{
ike_sa_t *ike_sa;
-
+
ike_sa = charon->ike_sa_manager->checkout(charon->ike_sa_manager,
this->ike_sa_id);
if (ike_sa)
@@ -70,11 +70,11 @@ static void execute(private_send_keepalive_job_t *this)
send_keepalive_job_t *send_keepalive_job_create(ike_sa_id_t *ike_sa_id)
{
private_send_keepalive_job_t *this = malloc_thing(private_send_keepalive_job_t);
-
+
/* interface functions */
this->public.job_interface.execute = (void (*) (job_t *)) execute;
this->public.job_interface.destroy = (void (*) (job_t *)) destroy;
-
+
/* private variables */
this->ike_sa_id = ike_sa_id->clone(ike_sa_id);
diff --git a/src/charon/processing/jobs/send_keepalive_job.h b/src/charon/processing/jobs/send_keepalive_job.h
index f92e6217a..cda83cd7e 100644
--- a/src/charon/processing/jobs/send_keepalive_job.h
+++ b/src/charon/processing/jobs/send_keepalive_job.h
@@ -29,7 +29,7 @@ typedef struct send_keepalive_job_t send_keepalive_job_t;
/**
* Class representing a SEND_KEEPALIVE Job.
- *
+ *
* This job will send a NAT keepalive packet if the IKE SA is still alive,
* and reinsert itself into the event queue.
*/
@@ -42,7 +42,7 @@ struct send_keepalive_job_t {
/**
* Creates a job of type SEND_KEEPALIVE.
- *
+ *
* @param ike_sa_id identification of the ike_sa as ike_sa_id_t object (gets cloned)
* @return initiate_ike_sa_job_t object
*/
diff --git a/src/charon/processing/jobs/update_sa_job.c b/src/charon/processing/jobs/update_sa_job.c
index 5e6c83942..17dce2548 100644
--- a/src/charon/processing/jobs/update_sa_job.c
+++ b/src/charon/processing/jobs/update_sa_job.c
@@ -31,12 +31,12 @@ struct private_update_sa_job_t {
* public update_sa_job_t interface
*/
update_sa_job_t public;
-
+
/**
* reqid of the CHILD_SA
*/
u_int32_t reqid;
-
+
/**
* New SA address and port
*/
@@ -53,12 +53,12 @@ static void destroy(private_update_sa_job_t *this)
}
/**
- * Implementation of job_t.execute.
- */
+ * Implementation of job_t.execute.
+ */
static void execute(private_update_sa_job_t *this)
{
ike_sa_t *ike_sa;
-
+
ike_sa = charon->ike_sa_manager->checkout_by_id(charon->ike_sa_manager,
this->reqid, TRUE);
if (ike_sa == NULL)
@@ -84,10 +84,10 @@ static void execute(private_update_sa_job_t *this)
update_sa_job_t *update_sa_job_create(u_int32_t reqid, host_t *new)
{
private_update_sa_job_t *this = malloc_thing(private_update_sa_job_t);
-
+
this->public.job_interface.execute = (void (*) (job_t *)) execute;
this->public.job_interface.destroy = (void (*) (job_t *)) destroy;
-
+
this->reqid = reqid;
this->new = new;
diff --git a/src/charon/processing/jobs/update_sa_job.h b/src/charon/processing/jobs/update_sa_job.h
index 93262d46f..11d1ac9b6 100644
--- a/src/charon/processing/jobs/update_sa_job.h
+++ b/src/charon/processing/jobs/update_sa_job.h
@@ -40,7 +40,7 @@ struct update_sa_job_t {
/**
* Creates a job to update IKE and CHILD_SA addresses.
- *
+ *
* @param reqid reqid of the CHILD_SA
* @param new new address and port
* @return update_sa_job_t object
diff --git a/src/charon/processing/processor.c b/src/charon/processing/processor.c
index 4a3943323..d5774af26 100644
--- a/src/charon/processing/processor.c
+++ b/src/charon/processing/processor.c
@@ -13,16 +13,17 @@
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*/
-
+
#include <stdlib.h>
-#include <pthread.h>
#include <string.h>
#include <errno.h>
#include "processor.h"
#include <daemon.h>
-#include <utils/mutex.h>
+#include <threading/thread.h>
+#include <threading/condvar.h>
+#include <threading/mutex.h>
#include <utils/linked_list.h>
@@ -41,22 +42,28 @@ struct private_processor_t {
* Number of running threads
*/
u_int total_threads;
-
+
/**
* Desired number of threads
*/
u_int desired_threads;
-
+
/**
* Number of threads waiting for work
*/
u_int idle_threads;
/**
+ * All threads managed in the pool (including threads that have been
+ * cancelled, this allows to join them during destruction)
+ */
+ linked_list_t *threads;
+
+ /**
* The jobs are stored in a linked list
*/
linked_list_t *list;
-
+
/**
* access to linked_list is locked through this mutex
*/
@@ -66,7 +73,7 @@ struct private_processor_t {
* Condvar to wait for new jobs
*/
condvar_t *job_added;
-
+
/**
* Condvar to wait for terminated threads
*/
@@ -80,17 +87,23 @@ static void process_jobs(private_processor_t *this);
*/
static void restart(private_processor_t *this)
{
- pthread_t thread;
-
+ thread_t *thread;
+
+ DBG2(DBG_JOB, "terminated worker thread, ID: %u", thread_current_id());
+
/* respawn thread if required */
- if (this->desired_threads == 0 ||
- pthread_create(&thread, NULL, (void*)process_jobs, this) != 0)
+ this->mutex->lock(this->mutex);
+ if (this->desired_threads < this->total_threads ||
+ (thread = thread_create((thread_main_t)process_jobs, this)) == NULL)
{
- this->mutex->lock(this->mutex);
this->total_threads--;
- this->thread_terminated->broadcast(this->thread_terminated);
- this->mutex->unlock(this->mutex);
+ this->thread_terminated->signal(this->thread_terminated);
}
+ else
+ {
+ this->threads->insert_last(this->threads, thread);
+ }
+ this->mutex->unlock(this->mutex);
}
/**
@@ -98,17 +111,16 @@ static void restart(private_processor_t *this)
*/
static void process_jobs(private_processor_t *this)
{
- int oldstate;
-
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate);
-
- DBG2(DBG_JOB, "started worker thread, thread_ID: %06u", (int)pthread_self());
-
+ /* worker threads are not cancellable by default */
+ thread_cancelability(FALSE);
+
+ DBG2(DBG_JOB, "started worker thread, ID: %u", thread_current_id());
+
this->mutex->lock(this->mutex);
while (this->desired_threads >= this->total_threads)
{
job_t *job;
-
+
if (this->list->get_count(this->list) == 0)
{
this->idle_threads++;
@@ -119,14 +131,13 @@ static void process_jobs(private_processor_t *this)
this->list->remove_first(this->list, (void**)&job);
this->mutex->unlock(this->mutex);
/* terminated threads are restarted, so we have a constant pool */
- pthread_cleanup_push((void*)restart, this);
+ thread_cleanup_push((thread_cleanup_t)restart, this);
job->execute(job);
- pthread_cleanup_pop(0);
+ thread_cleanup_pop(FALSE);
this->mutex->lock(this->mutex);
}
- this->total_threads--;
- this->thread_terminated->signal(this->thread_terminated);
this->mutex->unlock(this->mutex);
+ restart(this);
}
/**
@@ -136,7 +147,7 @@ static u_int get_total_threads(private_processor_t *this)
{
u_int count;
this->mutex->lock(this->mutex);
- count = this->total_threads;
+ count = this->total_threads;
this->mutex->unlock(this->mutex);
return count;
}
@@ -175,7 +186,7 @@ static void queue_job(private_processor_t *this, job_t *job)
this->job_added->signal(this->job_added);
this->mutex->unlock(this->mutex);
}
-
+
/**
* Implementation of processor_t.set_threads.
*/
@@ -185,14 +196,16 @@ static void set_threads(private_processor_t *this, u_int count)
if (count > this->total_threads)
{ /* increase thread count */
int i;
- pthread_t current;
-
+ thread_t *current;
+
this->desired_threads = count;
DBG1(DBG_JOB, "spawning %d worker threads", count - this->total_threads);
for (i = this->total_threads; i < count; i++)
{
- if (pthread_create(&current, NULL, (void*)process_jobs, this) == 0)
+ current = thread_create((thread_main_t)process_jobs, this);
+ if (current)
{
+ this->threads->insert_last(this->threads, current);
this->total_threads++;
}
}
@@ -210,6 +223,7 @@ static void set_threads(private_processor_t *this, u_int count)
*/
static void destroy(private_processor_t *this)
{
+ thread_t *current;
set_threads(this, 0);
this->mutex->lock(this->mutex);
while (this->total_threads > 0)
@@ -217,11 +231,17 @@ static void destroy(private_processor_t *this)
this->job_added->broadcast(this->job_added);
this->thread_terminated->wait(this->thread_terminated, this->mutex);
}
+ while (this->threads->remove_first(this->threads,
+ (void**)&current) == SUCCESS)
+ {
+ current->join(current);
+ }
this->mutex->unlock(this->mutex);
this->thread_terminated->destroy(this->thread_terminated);
this->job_added->destroy(this->job_added);
this->mutex->destroy(this->mutex);
this->list->destroy_offset(this->list, offsetof(job_t, destroy));
+ this->threads->destroy(this->threads);
free(this);
}
@@ -231,22 +251,23 @@ static void destroy(private_processor_t *this)
processor_t *processor_create(size_t pool_size)
{
private_processor_t *this = malloc_thing(private_processor_t);
-
+
this->public.get_total_threads = (u_int(*)(processor_t*))get_total_threads;
this->public.get_idle_threads = (u_int(*)(processor_t*))get_idle_threads;
this->public.get_job_load = (u_int(*)(processor_t*))get_job_load;
this->public.queue_job = (void(*)(processor_t*, job_t*))queue_job;
this->public.set_threads = (void(*)(processor_t*, u_int))set_threads;
this->public.destroy = (void(*)(processor_t*))destroy;
-
+
this->list = linked_list_create();
+ this->threads = linked_list_create();
this->mutex = mutex_create(MUTEX_TYPE_DEFAULT);
this->job_added = condvar_create(CONDVAR_TYPE_DEFAULT);
this->thread_terminated = condvar_create(CONDVAR_TYPE_DEFAULT);
this->total_threads = 0;
this->desired_threads = 0;
this->idle_threads = 0;
-
+
return &this->public;
}
diff --git a/src/charon/processing/processor.h b/src/charon/processing/processor.h
index e56e69382..5bf8cf573 100644
--- a/src/charon/processing/processor.h
+++ b/src/charon/processing/processor.h
@@ -33,21 +33,21 @@ typedef struct processor_t processor_t;
* The processor uses threads to process queued jobs.
*/
struct processor_t {
-
+
/**
* Get the total number of threads used by the processor.
- *
+ *
* @return size of thread pool
*/
u_int (*get_total_threads) (processor_t *this);
-
+
/**
* Get the number of threads currently waiting.
- *
+ *
* @return number of idle threads
*/
u_int (*get_idle_threads) (processor_t *this);
-
+
/**
* Get the number of queued jobs.
*
@@ -60,10 +60,10 @@ struct processor_t {
*
* This function is non blocking and adds a job_t to the queue.
*
- * @param job job to add to the queue
+ * @param job job to add to the queue
*/
void (*queue_job) (processor_t *this, job_t *job);
-
+
/**
* Set the number of threads to use in the processor.
*
@@ -75,7 +75,7 @@ struct processor_t {
* @param count number of threads to allocate
*/
void (*set_threads)(processor_t *this, u_int count);
-
+
/**
* Destroy a processor object.
*/
diff --git a/src/charon/processing/scheduler.c b/src/charon/processing/scheduler.c
index 1f59205af..345af502a 100644
--- a/src/charon/processing/scheduler.c
+++ b/src/charon/processing/scheduler.c
@@ -16,14 +16,15 @@
*/
#include <stdlib.h>
-#include <pthread.h>
#include "scheduler.h"
#include <daemon.h>
#include <processing/processor.h>
#include <processing/jobs/callback_job.h>
-#include <utils/mutex.h>
+#include <threading/thread.h>
+#include <threading/condvar.h>
+#include <threading/mutex.h>
/* the initial size of the heap */
#define HEAP_SIZE_DEFAULT 64
@@ -38,7 +39,7 @@ struct event_t {
* Time to fire the event.
*/
timeval_t time;
-
+
/**
* Every event has its assigned job.
*/
@@ -60,37 +61,37 @@ typedef struct private_scheduler_t private_scheduler_t;
* Private data of a scheduler_t object.
*/
struct private_scheduler_t {
-
+
/**
* Public part of a scheduler_t object.
*/
scheduler_t public;
-
+
/**
* Job which queues scheduled jobs to the processor.
*/
callback_job_t *job;
-
+
/**
* The heap in which the events are stored.
*/
event_t **heap;
-
+
/**
* The size of the heap.
*/
u_int heap_size;
-
+
/**
* The number of scheduled events.
*/
u_int event_count;
-
+
/**
* Exclusive access to list
*/
mutex_t *mutex;
-
+
/**
* Condvar to wait for next job.
*/
@@ -140,12 +141,12 @@ static event_t *remove_event(private_scheduler_t *this)
{
return NULL;
}
-
+
/* store the value to return */
event = this->heap[1];
/* move the bottom event to the top */
top = this->heap[1] = this->heap[this->event_count];
-
+
if (--this->event_count > 1)
{
/* seep down the top event */
@@ -153,7 +154,7 @@ static event_t *remove_event(private_scheduler_t *this)
while ((position << 1) <= this->event_count)
{
u_int child = position << 1;
-
+
if ((child + 1) <= this->event_count &&
timeval_cmp(&this->heap[child + 1]->time,
&this->heap[child]->time) < 0)
@@ -161,14 +162,14 @@ static event_t *remove_event(private_scheduler_t *this)
/* the "right" child is smaller */
child++;
}
-
+
if (timeval_cmp(&top->time, &this->heap[child]->time) <= 0)
{
/* the top event fires before the smaller of the two children,
* stop */
break;
}
-
+
/* swap with the smaller child */
this->heap[position] = this->heap[child];
position = child;
@@ -185,13 +186,12 @@ static job_requeue_t schedule(private_scheduler_t * this)
{
timeval_t now;
event_t *event;
- int oldstate;
- bool timed = FALSE;
-
+ bool timed = FALSE, oldstate;
+
this->mutex->lock(this->mutex);
-
- gettimeofday(&now, NULL);
-
+
+ time_monotonic(&now);
+
if ((event = peek_event(this)) != NULL)
{
if (timeval_cmp(&now, &event->time) >= 0)
@@ -215,9 +215,9 @@ static job_requeue_t schedule(private_scheduler_t * this)
}
timed = TRUE;
}
- pthread_cleanup_push((void*)this->mutex->unlock, this->mutex);
- pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
-
+ thread_cleanup_push((thread_cleanup_t)this->mutex->unlock, this->mutex);
+ oldstate = thread_cancelability(TRUE);
+
if (timed)
{
this->condvar->timed_wait_abs(this->condvar, this->mutex, event->time);
@@ -227,8 +227,8 @@ static job_requeue_t schedule(private_scheduler_t * this)
DBG2(DBG_JOB, "no events, waiting");
this->condvar->wait(this->condvar, this->mutex);
}
- pthread_setcancelstate(oldstate, NULL);
- pthread_cleanup_pop(TRUE);
+ thread_cancelability(oldstate);
+ thread_cleanup_pop(TRUE);
return JOB_REQUEUE_DIRECT;
}
@@ -251,13 +251,13 @@ static void schedule_job_tv(private_scheduler_t *this, job_t *job, timeval_t tv)
{
event_t *event;
u_int position;
-
+
event = malloc_thing(event_t);
event->job = job;
event->time = tv;
-
+
this->mutex->lock(this->mutex);
-
+
this->event_count++;
if (this->event_count > this->heap_size)
{
@@ -268,7 +268,7 @@ static void schedule_job_tv(private_scheduler_t *this, job_t *job, timeval_t tv)
}
/* "put" the event to the bottom */
position = this->event_count;
-
+
/* then bubble it up */
while (position > 1 && timeval_cmp(&this->heap[position >> 1]->time,
&event->time) > 0)
@@ -278,7 +278,7 @@ static void schedule_job_tv(private_scheduler_t *this, job_t *job, timeval_t tv)
position >>= 1;
}
this->heap[position] = event;
-
+
this->condvar->signal(this->condvar);
this->mutex->unlock(this->mutex);
}
@@ -289,10 +289,10 @@ static void schedule_job_tv(private_scheduler_t *this, job_t *job, timeval_t tv)
static void schedule_job(private_scheduler_t *this, job_t *job, u_int32_t s)
{
timeval_t tv;
-
- gettimeofday(&tv, NULL);
+
+ time_monotonic(&tv);
tv.tv_sec += s;
-
+
schedule_job_tv(this, job, tv);
}
@@ -302,13 +302,13 @@ static void schedule_job(private_scheduler_t *this, job_t *job, u_int32_t s)
static void schedule_job_ms(private_scheduler_t *this, job_t *job, u_int32_t ms)
{
timeval_t tv, add;
-
- gettimeofday(&tv, NULL);
+
+ time_monotonic(&tv);
add.tv_sec = ms / 1000;
add.tv_usec = (ms % 1000) * 1000;
-
+
timeradd(&tv, &add, &tv);
-
+
schedule_job_tv(this, job, tv);
}
@@ -335,24 +335,24 @@ static void destroy(private_scheduler_t *this)
scheduler_t * scheduler_create()
{
private_scheduler_t *this = malloc_thing(private_scheduler_t);
-
+
this->public.get_job_load = (u_int (*) (scheduler_t *this)) get_job_load;
this->public.schedule_job = (void (*) (scheduler_t *this, job_t *job, u_int32_t s)) schedule_job;
this->public.schedule_job_ms = (void (*) (scheduler_t *this, job_t *job, u_int32_t ms)) schedule_job_ms;
this->public.schedule_job_tv = (void (*) (scheduler_t *this, job_t *job, timeval_t tv)) schedule_job_tv;
this->public.destroy = (void(*)(scheduler_t*)) destroy;
-
+
/* Note: the root of the heap is at index 1 */
this->event_count = 0;
this->heap_size = HEAP_SIZE_DEFAULT;
this->heap = (event_t**)calloc(this->heap_size + 1, sizeof(event_t*));
-
+
this->mutex = mutex_create(MUTEX_TYPE_DEFAULT);
this->condvar = condvar_create(CONDVAR_TYPE_DEFAULT);
-
+
this->job = callback_job_create((callback_job_cb_t)schedule, this, NULL, NULL);
charon->processor->queue_job(charon->processor, (job_t*)this->job);
-
+
return &this->public;
}
diff --git a/src/charon/processing/scheduler.h b/src/charon/processing/scheduler.h
index 502f70b33..5f5d2a563 100644
--- a/src/charon/processing/scheduler.h
+++ b/src/charon/processing/scheduler.h
@@ -25,8 +25,6 @@
typedef struct scheduler_t scheduler_t;
-#include <sys/time.h>
-
#include <library.h>
#include <processing/jobs/job.h>
@@ -81,7 +79,7 @@ typedef struct scheduler_t scheduler_t;
* children has a smaller key or it is again a leaf node.
*/
struct scheduler_t {
-
+
/**
* Adds a event to the queue, using a relative time offset in s.
*
@@ -89,7 +87,7 @@ struct scheduler_t {
* @param time relative time to schedule job, in s
*/
void (*schedule_job) (scheduler_t *this, job_t *job, u_int32_t s);
-
+
/**
* Adds a event to the queue, using a relative time offset in ms.
*
@@ -97,22 +95,25 @@ struct scheduler_t {
* @param time relative time to schedule job, in ms
*/
void (*schedule_job_ms) (scheduler_t *this, job_t *job, u_int32_t ms);
-
+
/**
* Adds a event to the queue, using an absolut time.
*
+ * The passed timeval should be calculated based on the time_monotonic()
+ * function.
+ *
* @param job job to schedule
* @param time absolut time to schedule job
*/
void (*schedule_job_tv) (scheduler_t *this, job_t *job, timeval_t tv);
-
+
/**
* Returns number of jobs scheduled.
*
* @return number of scheduled jobs
*/
u_int (*get_job_load) (scheduler_t *this);
-
+
/**
* Destroys a scheduler object.
*/