summaryrefslogtreecommitdiff
path: root/src/libstrongswan/processing
diff options
context:
space:
mode:
authorYves-Alexis Perez <corsac@debian.org>2013-01-02 14:18:20 +0100
committerYves-Alexis Perez <corsac@debian.org>2013-01-02 14:18:20 +0100
commitc1343b3278cdf99533b7902744d15969f9d6fdc1 (patch)
treed5ed3dc5677a59260ec41cd39bb284d3e94c91b3 /src/libstrongswan/processing
parentb34738ed08c2227300d554b139e2495ca5da97d6 (diff)
downloadvyos-strongswan-c1343b3278cdf99533b7902744d15969f9d6fdc1.tar.gz
vyos-strongswan-c1343b3278cdf99533b7902744d15969f9d6fdc1.zip
Imported Upstream version 5.0.1
Diffstat (limited to 'src/libstrongswan/processing')
-rw-r--r--src/libstrongswan/processing/jobs/callback_job.c186
-rw-r--r--src/libstrongswan/processing/jobs/callback_job.h66
-rw-r--r--src/libstrongswan/processing/jobs/job.h107
-rw-r--r--src/libstrongswan/processing/processor.c219
-rw-r--r--src/libstrongswan/processing/processor.h14
-rw-r--r--src/libstrongswan/processing/scheduler.c14
6 files changed, 320 insertions, 286 deletions
diff --git a/src/libstrongswan/processing/jobs/callback_job.c b/src/libstrongswan/processing/jobs/callback_job.c
index 13f22e69c..a5ddc8ff6 100644
--- a/src/libstrongswan/processing/jobs/callback_job.c
+++ b/src/libstrongswan/processing/jobs/callback_job.c
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2009 Tobias Brunner
+ * Copyright (C) 2009-2012 Tobias Brunner
* Copyright (C) 2007-2011 Martin Willi
* Copyright (C) 2011 revosec AG
* Hochschule fuer Technik Rapperswil
@@ -17,10 +17,9 @@
#include "callback_job.h"
-#include <semaphore.h>
-
#include <threading/thread.h>
#include <threading/condvar.h>
+#include <threading/semaphore.h>
#include <threading/mutex.h>
#include <utils/linked_list.h>
@@ -52,42 +51,9 @@ struct private_callback_job_t {
callback_job_cleanup_t cleanup;
/**
- * thread of the job, if running
- */
- thread_t *thread;
-
- /**
- * mutex to access jobs interna
- */
- mutex_t *mutex;
-
- /**
- * list of associated child jobs
- */
- linked_list_t *children;
-
- /**
- * parent of this job, or NULL
+ * cancel function
*/
- private_callback_job_t *parent;
-
- /**
- * TRUE if the job got cancelled
- */
- bool cancelled;
-
- /**
- * condvar to synchronize the cancellation/destruction of the job
- */
- condvar_t *destroyable;
-
- /**
- * semaphore to synchronize the termination of the assigned thread.
- *
- * separately allocated during cancellation, so that we can wait on it
- * without risking that it gets freed too early during destruction.
- */
- sem_t *terminated;
+ callback_job_cancel_t cancel;
/**
* Priority of this job
@@ -95,141 +61,26 @@ struct private_callback_job_t {
job_priority_t prio;
};
-/**
- * unregister a child from its parent, if any.
- * note: this->mutex has to be locked
- */
-static void unregister(private_callback_job_t *this)
-{
- if (this->parent)
- {
- this->parent->mutex->lock(this->parent->mutex);
- if (this->parent->cancelled && !this->cancelled)
- {
- /* if the parent has been cancelled but we have not yet, we do not
- * unregister until we got cancelled by the parent. */
- this->parent->mutex->unlock(this->parent->mutex);
- this->destroyable->wait(this->destroyable, this->mutex);
- this->parent->mutex->lock(this->parent->mutex);
- }
- this->parent->children->remove(this->parent->children, this, NULL);
- this->parent->mutex->unlock(this->parent->mutex);
- this->parent = NULL;
- }
-}
-
METHOD(job_t, destroy, void,
private_callback_job_t *this)
{
- this->mutex->lock(this->mutex);
- unregister(this);
if (this->cleanup)
{
this->cleanup(this->data);
}
- if (this->terminated)
- {
- sem_post(this->terminated);
- }
- this->children->destroy(this->children);
- this->destroyable->destroy(this->destroyable);
- this->mutex->unlock(this->mutex);
- this->mutex->destroy(this->mutex);
free(this);
}
-METHOD(callback_job_t, cancel, void,
+METHOD(job_t, execute, job_requeue_t,
private_callback_job_t *this)
{
- callback_job_t *child;
- sem_t *terminated = NULL;
-
- this->mutex->lock(this->mutex);
- this->cancelled = TRUE;
- /* terminate children */
- while (this->children->get_first(this->children, (void**)&child) == SUCCESS)
- {
- this->mutex->unlock(this->mutex);
- child->cancel(child);
- this->mutex->lock(this->mutex);
- }
- if (this->thread)
- {
- /* terminate the thread, if there is currently one executing the job.
- * we wait for its termination using a semaphore */
- this->thread->cancel(this->thread);
- terminated = this->terminated = malloc_thing(sem_t);
- sem_init(terminated, 0, 0);
- }
- else
- {
- /* if the job is currently queued, it gets terminated later.
- * we can't wait, because it might not get executed at all.
- * we also unregister the queued job manually from its parent (the
- * others get unregistered during destruction) */
- unregister(this);
- }
- this->destroyable->signal(this->destroyable);
- this->mutex->unlock(this->mutex);
-
- if (terminated)
- {
- sem_wait(terminated);
- sem_destroy(terminated);
- free(terminated);
- }
+ return this->callback(this->data);
}
-METHOD(job_t, execute, void,
+METHOD(job_t, cancel, bool,
private_callback_job_t *this)
{
- bool cleanup = FALSE, requeue = FALSE;
-
- thread_cleanup_push((thread_cleanup_t)destroy, this);
-
- this->mutex->lock(this->mutex);
- this->thread = thread_current();
- this->mutex->unlock(this->mutex);
-
- while (TRUE)
- {
- this->mutex->lock(this->mutex);
- if (this->cancelled)
- {
- this->mutex->unlock(this->mutex);
- cleanup = TRUE;
- break;
- }
- this->mutex->unlock(this->mutex);
- switch (this->callback(this->data))
- {
- case JOB_REQUEUE_DIRECT:
- continue;
- case JOB_REQUEUE_FAIR:
- {
- requeue = TRUE;
- break;
- }
- case JOB_REQUEUE_NONE:
- default:
- {
- cleanup = TRUE;
- break;
- }
- }
- break;
- }
- this->mutex->lock(this->mutex);
- this->thread = NULL;
- this->mutex->unlock(this->mutex);
- /* manually create a cancellation point to avoid that a cancelled thread
- * goes back into the thread pool */
- thread_cancellation_point();
- if (requeue)
- {
- lib->processor->queue_job(lib->processor, &this->public.job);
- }
- thread_cleanup_pop(cleanup);
+ return this->cancel(this->data);
}
METHOD(job_t, get_priority, job_priority_t,
@@ -242,8 +93,8 @@ METHOD(job_t, get_priority, job_priority_t,
* Described in header.
*/
callback_job_t *callback_job_create_with_prio(callback_job_cb_t cb, void *data,
- callback_job_cleanup_t cleanup, callback_job_t *parent,
- job_priority_t prio)
+ callback_job_cleanup_t cleanup, callback_job_cancel_t cancel,
+ job_priority_t prio)
{
private_callback_job_t *this;
@@ -254,24 +105,17 @@ callback_job_t *callback_job_create_with_prio(callback_job_cb_t cb, void *data,
.get_priority = _get_priority,
.destroy = _destroy,
},
- .cancel = _cancel,
},
- .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
.callback = cb,
.data = data,
.cleanup = cleanup,
- .children = linked_list_create(),
- .parent = (private_callback_job_t*)parent,
- .destroyable = condvar_create(CONDVAR_TYPE_DEFAULT),
+ .cancel = cancel,
.prio = prio,
);
- /* register us at parent */
- if (parent)
+ if (cancel)
{
- this->parent->mutex->lock(this->parent->mutex);
- this->parent->children->insert_last(this->parent->children, this);
- this->parent->mutex->unlock(this->parent->mutex);
+ this->public.job.cancel = _cancel;
}
return &this->public;
@@ -282,8 +126,8 @@ callback_job_t *callback_job_create_with_prio(callback_job_cb_t cb, void *data,
*/
callback_job_t *callback_job_create(callback_job_cb_t cb, void *data,
callback_job_cleanup_t cleanup,
- callback_job_t *parent)
+ callback_job_cancel_t cancel)
{
- return callback_job_create_with_prio(cb, data, cleanup, parent,
+ return callback_job_create_with_prio(cb, data, cleanup, cancel,
JOB_PRIO_MEDIUM);
}
diff --git a/src/libstrongswan/processing/jobs/callback_job.h b/src/libstrongswan/processing/jobs/callback_job.h
index 3e92b01c0..6f2e39eb8 100644
--- a/src/libstrongswan/processing/jobs/callback_job.h
+++ b/src/libstrongswan/processing/jobs/callback_job.h
@@ -1,4 +1,5 @@
/*
+ * Copyright (C) 2012 Tobias Brunner
* Copyright (C) 2007-2011 Martin Willi
* Copyright (C) 2011 revosec AG
* Hochschule fuer Technik Rapperswil
@@ -27,33 +28,6 @@ typedef struct callback_job_t callback_job_t;
#include <library.h>
#include <processing/jobs/job.h>
-
-typedef enum job_requeue_t job_requeue_t;
-
-/**
- * Job requeueing policy.
- *
- * The job requeueing policy defines how a job is handled when the callback
- * function returns.
- */
-enum job_requeue_t {
-
- /**
- * Do not requeue job, destroy it
- */
- JOB_REQUEUE_NONE,
-
- /**
- * Reque the job fairly, meaning it has to requeue as any other job
- */
- JOB_REQUEUE_FAIR,
-
- /**
- * Reexecute the job directly, without the need of requeueing it
- */
- JOB_REQUEUE_DIRECT,
-};
-
/**
* The callback function to use for the callback job.
*
@@ -73,11 +47,22 @@ typedef job_requeue_t (*callback_job_cb_t)(void *data);
* to supply to the constructor.
*
* @param data param supplied to job
- * @return requeing policy how to requeue the job
*/
typedef void (*callback_job_cleanup_t)(void *data);
/**
+ * Cancellation function to use for the callback job.
+ *
+ * Optional function to be called when a job has to be canceled.
+ *
+ * See job_t.cancel() for details on the return value.
+ *
+ * @param data param supplied to job
+ * @return TRUE if canceled, FALSE to explicitly cancel the thread
+ */
+typedef bool (*callback_job_cancel_t)(void *data);
+
+/**
* Class representing an callback Job.
*
* This is a special job which allows a simple callback function to
@@ -91,14 +76,6 @@ struct callback_job_t {
*/
job_t job;
- /**
- * Cancel the job's thread and wait for its termination.
- *
- * This only works reliably for jobs that always use JOB_REQUEUE_FAIR or
- * JOB_REQUEUE_DIRECT, otherwise the job may already be destroyed when
- * cancel is called.
- */
- void (*cancel)(callback_job_t *this);
};
/**
@@ -106,19 +83,20 @@ struct callback_job_t {
*
* The cleanup function is called when the job gets destroyed to destroy
* the associated data.
- * If parent is not NULL, the specified job gets an association. Whenever
- * the parent gets cancelled (or runs out), all of its children are cancelled,
- * too.
+ *
+ * The cancel function is optional and should only be provided if the callback
+ * function calls potentially blocking functions and/or always returns
+ * JOB_REQUEUE_DIRECT.
*
* @param cb callback to call from the processor
* @param data user data to supply to callback
* @param cleanup destructor for data on destruction, or NULL
- * @param parent parent of this job
+ * @param cancel function to cancel the job, or NULL
* @return callback_job_t object
*/
callback_job_t *callback_job_create(callback_job_cb_t cb, void *data,
callback_job_cleanup_t cleanup,
- callback_job_t *parent);
+ callback_job_cancel_t cancel);
/**
* Creates a callback job, with priority.
@@ -128,12 +106,12 @@ callback_job_t *callback_job_create(callback_job_cb_t cb, void *data,
* @param cb callback to call from the processor
* @param data user data to supply to callback
* @param cleanup destructor for data on destruction, or NULL
- * @param parent parent of this job
+ * @param cancel function to cancel the job, or NULL
* @param prio job priority
* @return callback_job_t object
*/
callback_job_t *callback_job_create_with_prio(callback_job_cb_t cb, void *data,
- callback_job_cleanup_t cleanup, callback_job_t *parent,
- job_priority_t prio);
+ callback_job_cleanup_t cleanup, callback_job_cancel_t cancel,
+ job_priority_t prio);
#endif /** CALLBACK_JOB_H_ @}*/
diff --git a/src/libstrongswan/processing/jobs/job.h b/src/libstrongswan/processing/jobs/job.h
index d25cee03e..64454718a 100644
--- a/src/libstrongswan/processing/jobs/job.h
+++ b/src/libstrongswan/processing/jobs/job.h
@@ -1,4 +1,5 @@
/*
+ * Copyright (C) 2012 Tobias Brunner
* Copyright (C) 2005-2006 Martin Willi
* Copyright (C) 2005 Jan Hutter
* Hochschule fuer Technik Rapperswil
@@ -24,6 +25,9 @@
typedef struct job_t job_t;
typedef enum job_priority_t job_priority_t;
+typedef enum job_status_t job_status_t;
+typedef enum job_requeue_type_t job_requeue_type_t;
+typedef struct job_requeue_t job_requeue_t;
#include <library.h>
@@ -48,18 +52,107 @@ enum job_priority_t {
extern enum_name_t *job_priority_names;
/**
+ * Job status
+ */
+enum job_status_t {
+ /** The job is queued and has not yet been executed */
+ JOB_STATUS_QUEUED = 0,
+ /** During execution */
+ JOB_STATUS_EXECUTING,
+ /** If the job got canceled */
+ JOB_STATUS_CANCELED,
+ /** The job was executed successfully */
+ JOB_STATUS_DONE,
+};
+
+/**
+ * How a job is handled after is has been executed.
+ */
+enum job_requeue_type_t {
+ /** Do not requeue job, destroy it */
+ JOB_REQUEUE_TYPE_NONE = 0,
+ /** Requeue the job fairly, i.e. it is inserted at the end of the queue */
+ JOB_REQUEUE_TYPE_FAIR,
+ /** Reexecute the job directly, without the need of requeueing it */
+ JOB_REQUEUE_TYPE_DIRECT,
+ /** Rescheduled the job via scheduler_t */
+ JOB_REQUEUE_TYPE_SCHEDULE,
+};
+
+/**
+ * Job requeueing policy.
+ *
+ * The job requeueing policy defines how a job is handled after it has been
+ * executed.
+ */
+struct job_requeue_t {
+ /** How to handle the job after executing it */
+ job_requeue_type_t type;
+ /** How to reschedule the job, if so */
+ enum {
+ JOB_SCHEDULE,
+ JOB_SCHEDULE_MS,
+ JOB_SCHEDULE_TV,
+ } schedule;
+ /** Time to reschedule the job */
+ union {
+ u_int32_t rel;
+ timeval_t abs;
+ } time;
+};
+
+/**
+ * Helper macros to easily define requeueing policies.
+ */
+#define __JOB_REQUEUE(t) (job_requeue_t){ .type = t }
+#define JOB_REQUEUE_NONE __JOB_REQUEUE(JOB_REQUEUE_TYPE_NONE)
+#define JOB_REQUEUE_FAIR __JOB_REQUEUE(JOB_REQUEUE_TYPE_FAIR)
+#define JOB_REQUEUE_DIRECT __JOB_REQUEUE(JOB_REQUEUE_TYPE_DIRECT)
+#define __JOB_RESCHEDULE(t, ...) (job_requeue_t){ .type = JOB_REQUEUE_TYPE_SCHEDULE, .schedule = t, { __VA_ARGS__ } }
+#define JOB_RESCHEDULE(s) __JOB_RESCHEDULE(JOB_SCHEDULE, .rel = s)
+#define JOB_RESCHEDULE_MS(ms) __JOB_RESCHEDULE(JOB_SCHEDULE_MS, .rel = ms)
+#define JOB_RESCHEDULE_TV(tv) __JOB_RESCHEDULE(JOB_SCHEDULE_TV, .abs = tv)
+
+/**
* Job interface as it is stored in the job queue.
*/
struct job_t {
/**
+ * Status of this job, is modified exclusively by the processor/scheduler
+ */
+ job_status_t status;
+
+ /**
* Execute a job.
*
* The processing facility executes a job using this method. Jobs are
- * one-shot, they destroy themself after execution, so don't use a job
- * once it has been executed.
+ * one-shot, they are destroyed after execution (depending on the return
+ * value here), so don't use a job once it has been queued.
+ *
+ * @return policy how to requeue the job
+ */
+ job_requeue_t (*execute) (job_t *this);
+
+ /**
+ * Cancel a job.
+ *
+ * Implementing this method is optional. It allows potentially blocking
+ * jobs to be canceled during shutdown.
+ *
+ * If no special action is to be taken simply return FALSE then the thread
+ * executing the job will be canceled. If TRUE is returned the job is
+ * expected to return from execute() itself (i.e. the thread won't be
+ * canceled explicitly and can still be joined later).
+ * Jobs that return FALSE have to make sure they provide the appropriate
+ * cancellation points.
+ *
+ * @note Regular jobs that do not block MUST NOT implement this method.
+ * @note This method could be called even before execute() has been called.
+ *
+ * @return FALSE to cancel the thread, TRUE if canceled otherwise
*/
- void (*execute) (job_t *this);
+ bool (*cancel)(job_t *this);
/**
* Get the priority of a job.
@@ -71,10 +164,12 @@ struct job_t {
/**
* Destroy a job.
*
- * Is only called whenever a job was not executed (e.g. due daemon shutdown).
- * After execution, jobs destroy themself.
+ * Is called after a job is executed or got canceled. It is also called
+ * for queued jobs that were never executed.
+ *
+ * Use the status of a job to decide what to do during destruction.
*/
- void (*destroy) (job_t *this);
+ void (*destroy)(job_t *this);
};
#endif /** JOB_H_ @}*/
diff --git a/src/libstrongswan/processing/processor.c b/src/libstrongswan/processing/processor.c
index 222f1a535..5b7fd467c 100644
--- a/src/libstrongswan/processing/processor.c
+++ b/src/libstrongswan/processing/processor.c
@@ -1,7 +1,7 @@
/*
* Copyright (C) 2005-2011 Martin Willi
* Copyright (C) 2011 revosec AG
- * Copyright (C) 2008-2011 Tobias Brunner
+ * Copyright (C) 2008-2012 Tobias Brunner
* Copyright (C) 2005 Jan Hutter
* Hochschule fuer Technik Rapperswil
*
@@ -58,7 +58,7 @@ struct private_processor_t {
/**
* All threads managed in the pool (including threads that have been
- * cancelled, this allows to join them during destruction)
+ * canceled, this allows to join them later), as worker_thread_t
*/
linked_list_t *threads;
@@ -73,11 +73,6 @@ struct private_processor_t {
int prio_threads[JOB_PRIO_MAX];
/**
- * Priority of the job executed by a thread
- */
- thread_value_t *priority;
-
- /**
* access to job lists is locked through this mutex
*/
mutex_t *mutex;
@@ -93,39 +88,71 @@ struct private_processor_t {
condvar_t *thread_terminated;
};
-static void process_jobs(private_processor_t *this);
+/**
+ * Worker thread
+ */
+typedef struct {
+
+ /**
+ * Reference to the processor
+ */
+ private_processor_t *processor;
+
+ /**
+ * The actual thread
+ */
+ thread_t *thread;
+
+ /**
+ * Job currently being executed by this worker thread
+ */
+ job_t *job;
+
+ /**
+ * Priority of the current job
+ */
+ job_priority_t priority;
+
+} worker_thread_t;
+
+static void process_jobs(worker_thread_t *worker);
/**
* restart a terminated thread
*/
-static void restart(private_processor_t *this)
+static void restart(worker_thread_t *worker)
{
- thread_t *thread;
+ private_processor_t *this = worker->processor;
DBG2(DBG_JOB, "terminated worker thread %.2u", thread_current_id());
- /* respawn thread if required */
this->mutex->lock(this->mutex);
- if (this->desired_threads < this->total_threads ||
- (thread = thread_create((thread_main_t)process_jobs, this)) == NULL)
- {
- this->total_threads--;
- this->thread_terminated->signal(this->thread_terminated);
- }
- else
+ /* cleanup worker thread */
+ this->working_threads[worker->priority]--;
+ worker->job->status = JOB_STATUS_CANCELED;
+ worker->job->destroy(worker->job);
+ worker->job = NULL;
+
+ /* respawn thread if required */
+ if (this->desired_threads >= this->total_threads)
{
- this->threads->insert_last(this->threads, thread);
+ worker_thread_t *new_worker;
+
+ INIT(new_worker,
+ .processor = this,
+ );
+ new_worker->thread = thread_create((thread_main_t)process_jobs,
+ new_worker);
+ if (new_worker->thread)
+ {
+ this->threads->insert_last(this->threads, new_worker);
+ this->mutex->unlock(this->mutex);
+ return;
+ }
+ free(new_worker);
}
- this->mutex->unlock(this->mutex);
-}
-
-/**
- * Decrement working thread count of a priority class
- */
-static void decrement_working_threads(private_processor_t *this)
-{
- this->mutex->lock(this->mutex);
- this->working_threads[(intptr_t)this->priority->get(this->priority)]--;
+ this->total_threads--;
+ this->thread_terminated->signal(this->thread_terminated);
this->mutex->unlock(this->mutex);
}
@@ -147,9 +174,11 @@ static u_int get_idle_threads_nolock(private_processor_t *this)
/**
* Process queued jobs, called by the worker threads
*/
-static void process_jobs(private_processor_t *this)
+static void process_jobs(worker_thread_t *worker)
{
- /* worker threads are not cancellable by default */
+ private_processor_t *this = worker->processor;
+
+ /* worker threads are not cancelable by default */
thread_cancelability(FALSE);
DBG2(DBG_JOB, "started worker thread %.2u", thread_current_id());
@@ -157,7 +186,6 @@ static void process_jobs(private_processor_t *this)
this->mutex->lock(this->mutex);
while (this->desired_threads >= this->total_threads)
{
- job_t *job = NULL;
int i, reserved = 0, idle;
idle = get_idle_threads_nolock(this);
@@ -176,27 +204,80 @@ static void process_jobs(private_processor_t *this)
reserved += this->prio_threads[i] - this->working_threads[i];
}
if (this->jobs[i]->remove_first(this->jobs[i],
- (void**)&job) == SUCCESS)
+ (void**)&worker->job) == SUCCESS)
{
+ job_requeue_t requeue;
+
this->working_threads[i]++;
+ worker->job->status = JOB_STATUS_EXECUTING;
+ worker->priority = i;
this->mutex->unlock(this->mutex);
- this->priority->set(this->priority, (void*)(intptr_t)i);
- /* terminated threads are restarted to get a constant pool */
- thread_cleanup_push((thread_cleanup_t)restart, this);
- thread_cleanup_push((thread_cleanup_t)decrement_working_threads,
- this);
- job->execute(job);
- thread_cleanup_pop(FALSE);
+ /* canceled threads are restarted to get a constant pool */
+ thread_cleanup_push((thread_cleanup_t)restart, worker);
+ while (TRUE)
+ {
+ requeue = worker->job->execute(worker->job);
+ if (requeue.type != JOB_REQUEUE_TYPE_DIRECT)
+ {
+ break;
+ }
+ else if (!worker->job->cancel)
+ { /* only allow cancelable jobs to requeue directly */
+ requeue.type = JOB_REQUEUE_TYPE_FAIR;
+ break;
+ }
+ }
thread_cleanup_pop(FALSE);
this->mutex->lock(this->mutex);
this->working_threads[i]--;
+ if (worker->job->status == JOB_STATUS_CANCELED)
+ { /* job was canceled via a custom cancel() method or did not
+ * use JOB_REQUEUE_TYPE_DIRECT */
+ worker->job->destroy(worker->job);
+ break;
+ }
+ switch (requeue.type)
+ {
+ case JOB_REQUEUE_TYPE_NONE:
+ worker->job->status = JOB_STATUS_DONE;
+ worker->job->destroy(worker->job);
+ break;
+ case JOB_REQUEUE_TYPE_FAIR:
+ worker->job->status = JOB_STATUS_QUEUED;
+ this->jobs[i]->insert_last(this->jobs[i],
+ worker->job);
+ this->job_added->signal(this->job_added);
+ break;
+ case JOB_REQUEUE_TYPE_SCHEDULE:
+ /* scheduler_t does not hold its lock when queeuing jobs
+ * so this should be safe without unlocking our mutex */
+ switch (requeue.schedule)
+ {
+ case JOB_SCHEDULE:
+ lib->scheduler->schedule_job(lib->scheduler,
+ worker->job, requeue.time.rel);
+ break;
+ case JOB_SCHEDULE_MS:
+ lib->scheduler->schedule_job_ms(lib->scheduler,
+ worker->job, requeue.time.rel);
+ break;
+ case JOB_SCHEDULE_TV:
+ lib->scheduler->schedule_job_tv(lib->scheduler,
+ worker->job, requeue.time.abs);
+ break;
+ }
+ break;
+ default:
+ break;
+ }
break;
}
}
- if (!job)
+ if (!worker->job)
{
this->job_added->wait(this->job_added, this->mutex);
}
+ worker->job = NULL;
}
this->total_threads--;
this->thread_terminated->signal(this->thread_terminated);
@@ -266,6 +347,8 @@ METHOD(processor_t, queue_job, void,
job_priority_t prio;
prio = sane_prio(job->get_priority(job));
+ job->status = JOB_STATUS_QUEUED;
+
this->mutex->lock(this->mutex);
this->jobs[prio]->insert_last(this->jobs[prio], job);
this->job_added->signal(this->job_added);
@@ -278,19 +361,26 @@ METHOD(processor_t, set_threads, void,
this->mutex->lock(this->mutex);
if (count > this->total_threads)
{ /* increase thread count */
+ worker_thread_t *worker;
int i;
- thread_t *current;
this->desired_threads = count;
DBG1(DBG_JOB, "spawning %d worker threads", count - this->total_threads);
for (i = this->total_threads; i < count; i++)
{
- current = thread_create((thread_main_t)process_jobs, this);
- if (current)
+ INIT(worker,
+ .processor = this,
+ );
+ worker->thread = thread_create((thread_main_t)process_jobs, worker);
+ if (worker->thread)
{
- this->threads->insert_last(this->threads, current);
+ this->threads->insert_last(this->threads, worker);
this->total_threads++;
}
+ else
+ {
+ free(worker);
+ }
}
}
else if (count < this->total_threads)
@@ -301,26 +391,49 @@ METHOD(processor_t, set_threads, void,
this->mutex->unlock(this->mutex);
}
-METHOD(processor_t, destroy, void,
+METHOD(processor_t, cancel, void,
private_processor_t *this)
{
- thread_t *current;
- int i;
+ enumerator_t *enumerator;
+ worker_thread_t *worker;
- set_threads(this, 0);
this->mutex->lock(this->mutex);
+ this->desired_threads = 0;
+ /* cancel potentially blocking jobs */
+ enumerator = this->threads->create_enumerator(this->threads);
+ while (enumerator->enumerate(enumerator, (void**)&worker))
+ {
+ if (worker->job && worker->job->cancel)
+ {
+ worker->job->status = JOB_STATUS_CANCELED;
+ if (!worker->job->cancel(worker->job))
+ { /* job requests to be canceled explicitly, otherwise we assume
+ * the thread terminates itself and can be joined */
+ worker->thread->cancel(worker->thread);
+ }
+ }
+ }
+ enumerator->destroy(enumerator);
while (this->total_threads > 0)
{
this->job_added->broadcast(this->job_added);
this->thread_terminated->wait(this->thread_terminated, this->mutex);
}
while (this->threads->remove_first(this->threads,
- (void**)&current) == SUCCESS)
+ (void**)&worker) == SUCCESS)
{
- current->join(current);
+ worker->thread->join(worker->thread);
+ free(worker);
}
this->mutex->unlock(this->mutex);
- this->priority->destroy(this->priority);
+}
+
+METHOD(processor_t, destroy, void,
+ private_processor_t *this)
+{
+ int i;
+
+ cancel(this);
this->thread_terminated->destroy(this->thread_terminated);
this->job_added->destroy(this->job_added);
this->mutex->destroy(this->mutex);
@@ -348,10 +461,10 @@ processor_t *processor_create()
.get_job_load = _get_job_load,
.queue_job = _queue_job,
.set_threads = _set_threads,
+ .cancel = _cancel,
.destroy = _destroy,
},
.threads = linked_list_create(),
- .priority = thread_value_create(NULL),
.mutex = mutex_create(MUTEX_TYPE_DEFAULT),
.job_added = condvar_create(CONDVAR_TYPE_DEFAULT),
.thread_terminated = condvar_create(CONDVAR_TYPE_DEFAULT),
diff --git a/src/libstrongswan/processing/processor.h b/src/libstrongswan/processing/processor.h
index 5db42c04c..94860f5d3 100644
--- a/src/libstrongswan/processing/processor.h
+++ b/src/libstrongswan/processing/processor.h
@@ -1,4 +1,5 @@
/*
+ * Copyright (C) 2012 Tobias Brunner
* Copyright (C) 2005-2007 Martin Willi
* Copyright (C) 2005 Jan Hutter
* Hochschule fuer Technik Rapperswil
@@ -51,7 +52,7 @@ struct processor_t {
/**
* Get the number of threads currently working, per priority class.
*
- * @param prioritiy to check
+ * @param priority to check
* @return number of threads in priority working
*/
u_int (*get_working_threads)(processor_t *this, job_priority_t prio);
@@ -78,14 +79,21 @@ struct processor_t {
*
* If the number of threads is smaller than number of currently running
* threads, thread count is decreased. Use 0 to disable the processor.
- * This call blocks if it decreases thread count until threads have
- * terminated, so make sure there are not too many blocking jobs.
+ *
+ * This call does not block and wait for threads to terminate if the number
+ * of threads is reduced. Instead use cancel() for that during shutdown.
*
* @param count number of threads to allocate
*/
void (*set_threads)(processor_t *this, u_int count);
/**
+ * Sets the number of threads to 0 and cancels all blocking jobs, then waits
+ * for all threads to be terminated.
+ */
+ void (*cancel)(processor_t *this);
+
+ /**
* Destroy a processor object.
*/
void (*destroy) (processor_t *processor);
diff --git a/src/libstrongswan/processing/scheduler.c b/src/libstrongswan/processing/scheduler.c
index f3cc1164a..c97dbc4be 100644
--- a/src/libstrongswan/processing/scheduler.c
+++ b/src/libstrongswan/processing/scheduler.c
@@ -68,11 +68,6 @@ struct private_scheduler_t {
scheduler_t public;
/**
- * Job which queues scheduled jobs to the processor.
- */
- callback_job_t *job;
-
- /**
* The heap in which the events are stored.
*/
event_t **heap;
@@ -250,6 +245,7 @@ METHOD(scheduler_t, schedule_job_tv, void,
event = malloc_thing(event_t);
event->job = job;
+ event->job->status = JOB_STATUS_QUEUED;
event->time = tv;
this->mutex->lock(this->mutex);
@@ -308,7 +304,6 @@ METHOD(scheduler_t, destroy, void,
private_scheduler_t *this)
{
event_t *event;
- this->job->cancel(this->job);
this->condvar->destroy(this->condvar);
this->mutex->destroy(this->mutex);
while ((event = remove_event(this)) != NULL)
@@ -325,6 +320,7 @@ METHOD(scheduler_t, destroy, void,
scheduler_t * scheduler_create()
{
private_scheduler_t *this;
+ callback_job_t *job;
INIT(this,
.public = {
@@ -341,9 +337,9 @@ scheduler_t * scheduler_create()
this->heap = (event_t**)calloc(this->heap_size + 1, sizeof(event_t*));
- this->job = callback_job_create_with_prio((callback_job_cb_t)schedule,
- this, NULL, NULL, JOB_PRIO_CRITICAL);
- lib->processor->queue_job(lib->processor, (job_t*)this->job);
+ job = callback_job_create_with_prio((callback_job_cb_t)schedule, this,
+ NULL, return_false, JOB_PRIO_CRITICAL);
+ lib->processor->queue_job(lib->processor, (job_t*)job);
return &this->public;
}