summaryrefslogtreecommitdiff
path: root/src/charon/processing/jobs/callback_job.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/charon/processing/jobs/callback_job.c')
-rw-r--r--src/charon/processing/jobs/callback_job.c178
1 files changed, 121 insertions, 57 deletions
diff --git a/src/charon/processing/jobs/callback_job.c b/src/charon/processing/jobs/callback_job.c
index f4beb5abd..7e35dcdcb 100644
--- a/src/charon/processing/jobs/callback_job.c
+++ b/src/charon/processing/jobs/callback_job.c
@@ -1,4 +1,5 @@
/*
+ * Copyright (C) 2009 Tobias Brunner
* Copyright (C) 2007 Martin Willi
* Hochschule fuer Technik Rapperswil
*
@@ -12,13 +13,15 @@
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*/
-
+
#include "callback_job.h"
-#include <pthread.h>
+#include <semaphore.h>
#include <daemon.h>
-#include <utils/mutex.h>
+#include <threading/thread.h>
+#include <threading/condvar.h>
+#include <threading/mutex.h>
typedef struct private_callback_job_t private_callback_job_t;
@@ -30,7 +33,7 @@ struct private_callback_job_t {
* Public callback_job_t interface.
*/
callback_job_t public;
-
+
/**
* Callback to call on execution
*/
@@ -40,70 +43,94 @@ struct private_callback_job_t {
* parameter to supply to callback
*/
void *data;
-
+
/**
* cleanup function for data
*/
callback_job_cleanup_t cleanup;
-
+
/**
- * thread ID of the job, if running
+ * thread of the job, if running
*/
- pthread_t thread;
-
+ thread_t *thread;
+
/**
* mutex to access jobs interna
*/
mutex_t *mutex;
-
+
/**
* list of asociated child jobs
*/
linked_list_t *children;
-
+
/**
* parent of this job, or NULL
*/
private_callback_job_t *parent;
-};
-/**
- * Implements job_t.destroy.
- */
-static void destroy(private_callback_job_t *this)
-{
- if (this->cleanup)
- {
- this->cleanup(this->data);
- }
- this->children->destroy(this->children);
- this->mutex->destroy(this->mutex);
- free(this);
-}
+ /**
+ * TRUE if the job got cancelled
+ */
+ bool cancelled;
+
+ /**
+ * condvar to synchronize the cancellation/destruction of the job
+ */
+ condvar_t *destroyable;
+
+ /**
+ * semaphore to synchronize the termination of the assigned thread.
+ *
+ * separately allocated during cancellation, so that we can wait on it
+ * without risking that it gets freed too early during destruction.
+ */
+ sem_t *terminated;
+};
/**
* unregister a child from its parent, if any.
+ * note: this->mutex has to be locked
*/
static void unregister(private_callback_job_t *this)
{
if (this->parent)
{
- iterator_t *iterator;
- private_callback_job_t *child;
-
this->parent->mutex->lock(this->parent->mutex);
- iterator = this->parent->children->create_iterator(this->parent->children, TRUE);
- while (iterator->iterate(iterator, (void**)&child))
+ if (this->parent->cancelled && !this->cancelled)
{
- if (child == this)
- {
- iterator->remove(iterator);
- break;
- }
+ /* if the parent has been cancelled but we have not yet, we do not
+ * unregister until we got cancelled by the parent. */
+ this->parent->mutex->unlock(this->parent->mutex);
+ this->destroyable->wait(this->destroyable, this->mutex);
+ this->parent->mutex->lock(this->parent->mutex);
}
- iterator->destroy(iterator);
+ this->parent->children->remove(this->parent->children, this, NULL);
this->parent->mutex->unlock(this->parent->mutex);
+ this->parent = NULL;
+ }
+}
+
+/**
+ * Implements job_t.destroy.
+ */
+static void destroy(private_callback_job_t *this)
+{
+ this->mutex->lock(this->mutex);
+ unregister(this);
+ if (this->cleanup)
+ {
+ this->cleanup(this->data);
}
+ if (this->terminated)
+ {
+ sem_post(this->terminated);
+ }
+ this->children->destroy(this->children);
+ this->destroyable->destroy(this->destroyable);
+ this->mutex->unlock(this->mutex);
+ this->mutex->destroy(this->mutex);
+ free(this);
}
/**
@@ -111,20 +138,42 @@ static void unregister(private_callback_job_t *this)
*/
static void cancel(private_callback_job_t *this)
{
- pthread_t thread;
-
+ callback_job_t *child;
+ sem_t *terminated = NULL;
+
this->mutex->lock(this->mutex);
- thread = this->thread;
-
- /* terminate its children */
- this->children->invoke_offset(this->children, offsetof(callback_job_t, cancel));
+ this->cancelled = TRUE;
+ /* terminate children */
+ while (this->children->get_first(this->children, (void**)&child) == SUCCESS)
+ {
+ this->mutex->unlock(this->mutex);
+ child->cancel(child);
+ this->mutex->lock(this->mutex);
+ }
+ if (this->thread)
+ {
+ /* terminate the thread, if there is currently one executing the job.
+ * we wait for its termination using a semaphore */
+ this->thread->cancel(this->thread);
+ terminated = this->terminated = malloc_thing(sem_t);
+ sem_init(terminated, 0, 0);
+ }
+ else
+ {
+ /* if the job is currently queued, it gets terminated later.
+ * we can't wait, because it might not get executed at all.
+ * we also unregister the queued job manually from its parent (the
+ * others get unregistered during destruction) */
+ unregister(this);
+ }
+ this->destroyable->signal(this->destroyable);
this->mutex->unlock(this->mutex);
-
- /* terminate thread */
- if (thread)
+
+ if (terminated)
{
- pthread_cancel(thread);
- pthread_join(thread, NULL);
+ sem_wait(terminated);
+ sem_destroy(terminated);
+ free(terminated);
}
}
@@ -135,20 +184,28 @@ static void execute(private_callback_job_t *this)
{
bool cleanup = FALSE;
+ thread_cleanup_push((thread_cleanup_t)destroy, this);
+
this->mutex->lock(this->mutex);
- this->thread = pthread_self();
+ this->thread = thread_current();
this->mutex->unlock(this->mutex);
-
- pthread_cleanup_push((void*)destroy, this);
+
while (TRUE)
{
+ this->mutex->lock(this->mutex);
+ if (this->cancelled)
+ {
+ this->mutex->unlock(this->mutex);
+ cleanup = TRUE;
+ break;
+ }
+ this->mutex->unlock(this->mutex);
switch (this->callback(this->data))
{
case JOB_REQUEUE_DIRECT:
continue;
case JOB_REQUEUE_FAIR:
{
- this->thread = 0;
charon->processor->queue_job(charon->processor,
&this->public.job_interface);
break;
@@ -156,15 +213,19 @@ static void execute(private_callback_job_t *this)
case JOB_REQUEUE_NONE:
default:
{
- this->thread = 0;
cleanup = TRUE;
break;
}
}
break;
}
- unregister(this);
- pthread_cleanup_pop(cleanup);
+ this->mutex->lock(this->mutex);
+ this->thread = NULL;
+ this->mutex->unlock(this->mutex);
+ /* manually create a cancellation point to avoid that a cancelled thread
+ * goes back into the thread pool */
+ thread_cancellation_point();
+ thread_cleanup_pop(cleanup);
}
/*
@@ -175,7 +236,7 @@ callback_job_t *callback_job_create(callback_job_cb_t cb, void *data,
callback_job_t *parent)
{
private_callback_job_t *this = malloc_thing(private_callback_job_t);
-
+
/* interface functions */
this->public.job_interface.execute = (void (*) (job_t *)) execute;
this->public.job_interface.destroy = (void (*) (job_t *)) destroy;
@@ -189,7 +250,10 @@ callback_job_t *callback_job_create(callback_job_cb_t cb, void *data,
this->thread = 0;
this->children = linked_list_create();
this->parent = (private_callback_job_t*)parent;
-
+ this->cancelled = FALSE;
+ this->destroyable = condvar_create(CONDVAR_TYPE_DEFAULT);
+ this->terminated = NULL;
+
/* register us at parent */
if (parent)
{
@@ -197,7 +261,7 @@ callback_job_t *callback_job_create(callback_job_cb_t cb, void *data,
this->parent->children->insert_last(this->parent->children, this);
this->parent->mutex->unlock(this->parent->mutex);
}
-
+
return &this->public;
}