summaryrefslogtreecommitdiff
path: root/src/libstrongswan/processing/jobs
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstrongswan/processing/jobs')
-rw-r--r--src/libstrongswan/processing/jobs/callback_job.c186
-rw-r--r--src/libstrongswan/processing/jobs/callback_job.h66
-rw-r--r--src/libstrongswan/processing/jobs/job.h107
3 files changed, 138 insertions, 221 deletions
diff --git a/src/libstrongswan/processing/jobs/callback_job.c b/src/libstrongswan/processing/jobs/callback_job.c
index 13f22e69c..a5ddc8ff6 100644
--- a/src/libstrongswan/processing/jobs/callback_job.c
+++ b/src/libstrongswan/processing/jobs/callback_job.c
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2009 Tobias Brunner
+ * Copyright (C) 2009-2012 Tobias Brunner
* Copyright (C) 2007-2011 Martin Willi
* Copyright (C) 2011 revosec AG
* Hochschule fuer Technik Rapperswil
@@ -17,10 +17,9 @@
#include "callback_job.h"
-#include <semaphore.h>
-
#include <threading/thread.h>
#include <threading/condvar.h>
+#include <threading/semaphore.h>
#include <threading/mutex.h>
#include <utils/linked_list.h>
@@ -52,42 +51,9 @@ struct private_callback_job_t {
callback_job_cleanup_t cleanup;
/**
- * thread of the job, if running
- */
- thread_t *thread;
-
- /**
- * mutex to access jobs interna
- */
- mutex_t *mutex;
-
- /**
- * list of associated child jobs
- */
- linked_list_t *children;
-
- /**
- * parent of this job, or NULL
+ * cancel function
*/
- private_callback_job_t *parent;
-
- /**
- * TRUE if the job got cancelled
- */
- bool cancelled;
-
- /**
- * condvar to synchronize the cancellation/destruction of the job
- */
- condvar_t *destroyable;
-
- /**
- * semaphore to synchronize the termination of the assigned thread.
- *
- * separately allocated during cancellation, so that we can wait on it
- * without risking that it gets freed too early during destruction.
- */
- sem_t *terminated;
+ callback_job_cancel_t cancel;
/**
* Priority of this job
@@ -95,141 +61,26 @@ struct private_callback_job_t {
job_priority_t prio;
};
-/**
- * unregister a child from its parent, if any.
- * note: this->mutex has to be locked
- */
-static void unregister(private_callback_job_t *this)
-{
- if (this->parent)
- {
- this->parent->mutex->lock(this->parent->mutex);
- if (this->parent->cancelled && !this->cancelled)
- {
- /* if the parent has been cancelled but we have not yet, we do not
- * unregister until we got cancelled by the parent. */
- this->parent->mutex->unlock(this->parent->mutex);
- this->destroyable->wait(this->destroyable, this->mutex);
- this->parent->mutex->lock(this->parent->mutex);
- }
- this->parent->children->remove(this->parent->children, this, NULL);
- this->parent->mutex->unlock(this->parent->mutex);
- this->parent = NULL;
- }
-}
-
METHOD(job_t, destroy, void,
private_callback_job_t *this)
{
- this->mutex->lock(this->mutex);
- unregister(this);
if (this->cleanup)
{
this->cleanup(this->data);
}
- if (this->terminated)
- {
- sem_post(this->terminated);
- }
- this->children->destroy(this->children);
- this->destroyable->destroy(this->destroyable);
- this->mutex->unlock(this->mutex);
- this->mutex->destroy(this->mutex);
free(this);
}
-METHOD(callback_job_t, cancel, void,
+METHOD(job_t, execute, job_requeue_t,
private_callback_job_t *this)
{
- callback_job_t *child;
- sem_t *terminated = NULL;
-
- this->mutex->lock(this->mutex);
- this->cancelled = TRUE;
- /* terminate children */
- while (this->children->get_first(this->children, (void**)&child) == SUCCESS)
- {
- this->mutex->unlock(this->mutex);
- child->cancel(child);
- this->mutex->lock(this->mutex);
- }
- if (this->thread)
- {
- /* terminate the thread, if there is currently one executing the job.
- * we wait for its termination using a semaphore */
- this->thread->cancel(this->thread);
- terminated = this->terminated = malloc_thing(sem_t);
- sem_init(terminated, 0, 0);
- }
- else
- {
- /* if the job is currently queued, it gets terminated later.
- * we can't wait, because it might not get executed at all.
- * we also unregister the queued job manually from its parent (the
- * others get unregistered during destruction) */
- unregister(this);
- }
- this->destroyable->signal(this->destroyable);
- this->mutex->unlock(this->mutex);
-
- if (terminated)
- {
- sem_wait(terminated);
- sem_destroy(terminated);
- free(terminated);
- }
+ return this->callback(this->data);
}
-METHOD(job_t, execute, void,
+METHOD(job_t, cancel, bool,
private_callback_job_t *this)
{
- bool cleanup = FALSE, requeue = FALSE;
-
- thread_cleanup_push((thread_cleanup_t)destroy, this);
-
- this->mutex->lock(this->mutex);
- this->thread = thread_current();
- this->mutex->unlock(this->mutex);
-
- while (TRUE)
- {
- this->mutex->lock(this->mutex);
- if (this->cancelled)
- {
- this->mutex->unlock(this->mutex);
- cleanup = TRUE;
- break;
- }
- this->mutex->unlock(this->mutex);
- switch (this->callback(this->data))
- {
- case JOB_REQUEUE_DIRECT:
- continue;
- case JOB_REQUEUE_FAIR:
- {
- requeue = TRUE;
- break;
- }
- case JOB_REQUEUE_NONE:
- default:
- {
- cleanup = TRUE;
- break;
- }
- }
- break;
- }
- this->mutex->lock(this->mutex);
- this->thread = NULL;
- this->mutex->unlock(this->mutex);
- /* manually create a cancellation point to avoid that a cancelled thread
- * goes back into the thread pool */
- thread_cancellation_point();
- if (requeue)
- {
- lib->processor->queue_job(lib->processor, &this->public.job);
- }
- thread_cleanup_pop(cleanup);
+ return this->cancel(this->data);
}
METHOD(job_t, get_priority, job_priority_t,
@@ -242,8 +93,8 @@ METHOD(job_t, get_priority, job_priority_t,
* Described in header.
*/
callback_job_t *callback_job_create_with_prio(callback_job_cb_t cb, void *data,
- callback_job_cleanup_t cleanup, callback_job_t *parent,
- job_priority_t prio)
+ callback_job_cleanup_t cleanup, callback_job_cancel_t cancel,
+ job_priority_t prio)
{
private_callback_job_t *this;
@@ -254,24 +105,17 @@ callback_job_t *callback_job_create_with_prio(callback_job_cb_t cb, void *data,
.get_priority = _get_priority,
.destroy = _destroy,
},
- .cancel = _cancel,
},
- .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
.callback = cb,
.data = data,
.cleanup = cleanup,
- .children = linked_list_create(),
- .parent = (private_callback_job_t*)parent,
- .destroyable = condvar_create(CONDVAR_TYPE_DEFAULT),
+ .cancel = cancel,
.prio = prio,
);
- /* register us at parent */
- if (parent)
+ if (cancel)
{
- this->parent->mutex->lock(this->parent->mutex);
- this->parent->children->insert_last(this->parent->children, this);
- this->parent->mutex->unlock(this->parent->mutex);
+ this->public.job.cancel = _cancel;
}
return &this->public;
@@ -282,8 +126,8 @@ callback_job_t *callback_job_create_with_prio(callback_job_cb_t cb, void *data,
*/
callback_job_t *callback_job_create(callback_job_cb_t cb, void *data,
callback_job_cleanup_t cleanup,
- callback_job_t *parent)
+ callback_job_cancel_t cancel)
{
- return callback_job_create_with_prio(cb, data, cleanup, parent,
+ return callback_job_create_with_prio(cb, data, cleanup, cancel,
JOB_PRIO_MEDIUM);
}
diff --git a/src/libstrongswan/processing/jobs/callback_job.h b/src/libstrongswan/processing/jobs/callback_job.h
index 3e92b01c0..6f2e39eb8 100644
--- a/src/libstrongswan/processing/jobs/callback_job.h
+++ b/src/libstrongswan/processing/jobs/callback_job.h
@@ -1,4 +1,5 @@
/*
+ * Copyright (C) 2012 Tobias Brunner
* Copyright (C) 2007-2011 Martin Willi
* Copyright (C) 2011 revosec AG
* Hochschule fuer Technik Rapperswil
@@ -27,33 +28,6 @@ typedef struct callback_job_t callback_job_t;
#include <library.h>
#include <processing/jobs/job.h>
-
-typedef enum job_requeue_t job_requeue_t;
-
-/**
- * Job requeueing policy.
- *
- * The job requeueing policy defines how a job is handled when the callback
- * function returns.
- */
-enum job_requeue_t {
-
- /**
- * Do not requeue job, destroy it
- */
- JOB_REQUEUE_NONE,
-
- /**
- * Reque the job fairly, meaning it has to requeue as any other job
- */
- JOB_REQUEUE_FAIR,
-
- /**
- * Reexecute the job directly, without the need of requeueing it
- */
- JOB_REQUEUE_DIRECT,
-};
-
/**
* The callback function to use for the callback job.
*
@@ -73,11 +47,22 @@ typedef job_requeue_t (*callback_job_cb_t)(void *data);
* to supply to the constructor.
*
* @param data param supplied to job
- * @return requeing policy how to requeue the job
*/
typedef void (*callback_job_cleanup_t)(void *data);
/**
+ * Cancellation function to use for the callback job.
+ *
+ * Optional function to be called when a job has to be canceled.
+ *
+ * See job_t.cancel() for details on the return value.
+ *
+ * @param data param supplied to job
+ * @return TRUE if canceled, FALSE to explicitly cancel the thread
+ */
+typedef bool (*callback_job_cancel_t)(void *data);
+
+/**
* Class representing an callback Job.
*
* This is a special job which allows a simple callback function to
@@ -91,14 +76,6 @@ struct callback_job_t {
*/
job_t job;
- /**
- * Cancel the job's thread and wait for its termination.
- *
- * This only works reliably for jobs that always use JOB_REQUEUE_FAIR or
- * JOB_REQUEUE_DIRECT, otherwise the job may already be destroyed when
- * cancel is called.
- */
- void (*cancel)(callback_job_t *this);
};
/**
@@ -106,19 +83,20 @@ struct callback_job_t {
*
* The cleanup function is called when the job gets destroyed to destroy
* the associated data.
- * If parent is not NULL, the specified job gets an association. Whenever
- * the parent gets cancelled (or runs out), all of its children are cancelled,
- * too.
+ *
+ * The cancel function is optional and should only be provided if the callback
+ * function calls potentially blocking functions and/or always returns
+ * JOB_REQUEUE_DIRECT.
*
* @param cb callback to call from the processor
* @param data user data to supply to callback
* @param cleanup destructor for data on destruction, or NULL
- * @param parent parent of this job
+ * @param cancel function to cancel the job, or NULL
* @return callback_job_t object
*/
callback_job_t *callback_job_create(callback_job_cb_t cb, void *data,
callback_job_cleanup_t cleanup,
- callback_job_t *parent);
+ callback_job_cancel_t cancel);
/**
* Creates a callback job, with priority.
@@ -128,12 +106,12 @@ callback_job_t *callback_job_create(callback_job_cb_t cb, void *data,
* @param cb callback to call from the processor
* @param data user data to supply to callback
* @param cleanup destructor for data on destruction, or NULL
- * @param parent parent of this job
+ * @param cancel function to cancel the job, or NULL
* @param prio job priority
* @return callback_job_t object
*/
callback_job_t *callback_job_create_with_prio(callback_job_cb_t cb, void *data,
- callback_job_cleanup_t cleanup, callback_job_t *parent,
- job_priority_t prio);
+ callback_job_cleanup_t cleanup, callback_job_cancel_t cancel,
+ job_priority_t prio);
#endif /** CALLBACK_JOB_H_ @}*/
diff --git a/src/libstrongswan/processing/jobs/job.h b/src/libstrongswan/processing/jobs/job.h
index d25cee03e..64454718a 100644
--- a/src/libstrongswan/processing/jobs/job.h
+++ b/src/libstrongswan/processing/jobs/job.h
@@ -1,4 +1,5 @@
/*
+ * Copyright (C) 2012 Tobias Brunner
* Copyright (C) 2005-2006 Martin Willi
* Copyright (C) 2005 Jan Hutter
* Hochschule fuer Technik Rapperswil
@@ -24,6 +25,9 @@
typedef struct job_t job_t;
typedef enum job_priority_t job_priority_t;
+typedef enum job_status_t job_status_t;
+typedef enum job_requeue_type_t job_requeue_type_t;
+typedef struct job_requeue_t job_requeue_t;
#include <library.h>
@@ -48,18 +52,107 @@ enum job_priority_t {
extern enum_name_t *job_priority_names;
/**
+ * Job status
+ */
+enum job_status_t {
+ /** The job is queued and has not yet been executed */
+ JOB_STATUS_QUEUED = 0,
+ /** During execution */
+ JOB_STATUS_EXECUTING,
+ /** If the job got canceled */
+ JOB_STATUS_CANCELED,
+ /** The job was executed successfully */
+ JOB_STATUS_DONE,
+};
+
+/**
+ * How a job is handled after is has been executed.
+ */
+enum job_requeue_type_t {
+ /** Do not requeue job, destroy it */
+ JOB_REQUEUE_TYPE_NONE = 0,
+ /** Requeue the job fairly, i.e. it is inserted at the end of the queue */
+ JOB_REQUEUE_TYPE_FAIR,
+ /** Reexecute the job directly, without the need of requeueing it */
+ JOB_REQUEUE_TYPE_DIRECT,
+ /** Rescheduled the job via scheduler_t */
+ JOB_REQUEUE_TYPE_SCHEDULE,
+};
+
+/**
+ * Job requeueing policy.
+ *
+ * The job requeueing policy defines how a job is handled after it has been
+ * executed.
+ */
+struct job_requeue_t {
+ /** How to handle the job after executing it */
+ job_requeue_type_t type;
+ /** How to reschedule the job, if so */
+ enum {
+ JOB_SCHEDULE,
+ JOB_SCHEDULE_MS,
+ JOB_SCHEDULE_TV,
+ } schedule;
+ /** Time to reschedule the job */
+ union {
+ u_int32_t rel;
+ timeval_t abs;
+ } time;
+};
+
+/**
+ * Helper macros to easily define requeueing policies.
+ */
+#define __JOB_REQUEUE(t) (job_requeue_t){ .type = t }
+#define JOB_REQUEUE_NONE __JOB_REQUEUE(JOB_REQUEUE_TYPE_NONE)
+#define JOB_REQUEUE_FAIR __JOB_REQUEUE(JOB_REQUEUE_TYPE_FAIR)
+#define JOB_REQUEUE_DIRECT __JOB_REQUEUE(JOB_REQUEUE_TYPE_DIRECT)
+#define __JOB_RESCHEDULE(t, ...) (job_requeue_t){ .type = JOB_REQUEUE_TYPE_SCHEDULE, .schedule = t, { __VA_ARGS__ } }
+#define JOB_RESCHEDULE(s) __JOB_RESCHEDULE(JOB_SCHEDULE, .rel = s)
+#define JOB_RESCHEDULE_MS(ms) __JOB_RESCHEDULE(JOB_SCHEDULE_MS, .rel = ms)
+#define JOB_RESCHEDULE_TV(tv) __JOB_RESCHEDULE(JOB_SCHEDULE_TV, .abs = tv)
+
+/**
* Job interface as it is stored in the job queue.
*/
struct job_t {
/**
+ * Status of this job, is modified exclusively by the processor/scheduler
+ */
+ job_status_t status;
+
+ /**
* Execute a job.
*
* The processing facility executes a job using this method. Jobs are
- * one-shot, they destroy themself after execution, so don't use a job
- * once it has been executed.
+ * one-shot, they are destroyed after execution (depending on the return
+ * value here), so don't use a job once it has been queued.
+ *
+ * @return policy how to requeue the job
+ */
+ job_requeue_t (*execute) (job_t *this);
+
+ /**
+ * Cancel a job.
+ *
+ * Implementing this method is optional. It allows potentially blocking
+ * jobs to be canceled during shutdown.
+ *
+ * If no special action is to be taken simply return FALSE then the thread
+ * executing the job will be canceled. If TRUE is returned the job is
+ * expected to return from execute() itself (i.e. the thread won't be
+ * canceled explicitly and can still be joined later).
+ * Jobs that return FALSE have to make sure they provide the appropriate
+ * cancellation points.
+ *
+ * @note Regular jobs that do not block MUST NOT implement this method.
+ * @note This method could be called even before execute() has been called.
+ *
+ * @return FALSE to cancel the thread, TRUE if canceled otherwise
*/
- void (*execute) (job_t *this);
+ bool (*cancel)(job_t *this);
/**
* Get the priority of a job.
@@ -71,10 +164,12 @@ struct job_t {
/**
* Destroy a job.
*
- * Is only called whenever a job was not executed (e.g. due daemon shutdown).
- * After execution, jobs destroy themself.
+ * Is called after a job is executed or got canceled. It is also called
+ * for queued jobs that were never executed.
+ *
+ * Use the status of a job to decide what to do during destruction.
*/
- void (*destroy) (job_t *this);
+ void (*destroy)(job_t *this);
};
#endif /** JOB_H_ @}*/