summaryrefslogtreecommitdiff
path: root/src/libstrongswan/processing/processor.c
diff options
context:
space:
mode:
authorYves-Alexis Perez <corsac@debian.org>2013-08-25 15:37:26 +0200
committerYves-Alexis Perez <corsac@debian.org>2013-08-25 15:37:26 +0200
commit6b99c8d9cff7b3e8ae8f3204b99e7ea40f791349 (patch)
tree009fc492961e13860d2a4bc2de8caf2bbe2975e7 /src/libstrongswan/processing/processor.c
parentc83921a2b566aa9d55d8ccc7258f04fca6292ee6 (diff)
downloadvyos-strongswan-6b99c8d9cff7b3e8ae8f3204b99e7ea40f791349.tar.gz
vyos-strongswan-6b99c8d9cff7b3e8ae8f3204b99e7ea40f791349.zip
Imported Upstream version 5.1.0
Diffstat (limited to 'src/libstrongswan/processing/processor.c')
-rw-r--r--src/libstrongswan/processing/processor.c251
1 files changed, 161 insertions, 90 deletions
diff --git a/src/libstrongswan/processing/processor.c b/src/libstrongswan/processing/processor.c
index 934636fc0..adbd95685 100644
--- a/src/libstrongswan/processing/processor.c
+++ b/src/libstrongswan/processing/processor.c
@@ -1,7 +1,7 @@
/*
* Copyright (C) 2005-2011 Martin Willi
* Copyright (C) 2011 revosec AG
- * Copyright (C) 2008-2012 Tobias Brunner
+ * Copyright (C) 2008-2013 Tobias Brunner
* Copyright (C) 2005 Jan Hutter
* Hochschule fuer Technik Rapperswil
*
@@ -123,6 +123,7 @@ static void process_jobs(worker_thread_t *worker);
static void restart(worker_thread_t *worker)
{
private_processor_t *this = worker->processor;
+ job_t *job;
DBG2(DBG_JOB, "terminated worker thread %.2u", thread_current_id());
@@ -130,8 +131,15 @@ static void restart(worker_thread_t *worker)
/* cleanup worker thread */
this->working_threads[worker->priority]--;
worker->job->status = JOB_STATUS_CANCELED;
- worker->job->destroy(worker->job);
+ job = worker->job;
+ /* unset the job before releasing the mutex, otherwise cancel() might
+ * interfere */
worker->job = NULL;
+ /* release mutex to avoid deadlocks if the same lock is required
+ * during queue_job() and in the destructor called here */
+ this->mutex->unlock(this->mutex);
+ job->destroy(job);
+ this->mutex->lock(this->mutex);
/* respawn thread if required */
if (this->desired_threads >= this->total_threads)
@@ -172,112 +180,150 @@ static u_int get_idle_threads_nolock(private_processor_t *this)
}
/**
- * Process queued jobs, called by the worker threads
+ * Get a job from any job queue, starting with the highest priority.
+ *
+ * this->mutex is expected to be locked.
*/
-static void process_jobs(worker_thread_t *worker)
+static bool get_job(private_processor_t *this, worker_thread_t *worker)
{
- private_processor_t *this = worker->processor;
+ int i, reserved = 0, idle;
- /* worker threads are not cancelable by default */
- thread_cancelability(FALSE);
-
- DBG2(DBG_JOB, "started worker thread %.2u", thread_current_id());
+ idle = get_idle_threads_nolock(this);
- this->mutex->lock(this->mutex);
- while (this->desired_threads >= this->total_threads)
+ for (i = 0; i < JOB_PRIO_MAX; i++)
{
- int i, reserved = 0, idle;
+ if (reserved && reserved >= idle)
+ {
+ DBG2(DBG_JOB, "delaying %N priority jobs: %d threads idle, "
+ "but %d reserved for higher priorities",
+ job_priority_names, i, idle, reserved);
+ /* wait until a job of higher priority gets queued */
+ return FALSE;
+ }
+ if (this->working_threads[i] < this->prio_threads[i])
+ {
+ reserved += this->prio_threads[i] - this->working_threads[i];
+ }
+ if (this->jobs[i]->remove_first(this->jobs[i],
+ (void**)&worker->job) == SUCCESS)
+ {
+ worker->priority = i;
+ return TRUE;
+ }
+ }
+ return FALSE;
+}
- idle = get_idle_threads_nolock(this);
+/**
+ * Process a single job (provided in worker->job, worker->priority is also
+ * expected to be set)
+ *
+ * this->mutex is expected to be locked.
+ */
+static void process_job(private_processor_t *this, worker_thread_t *worker)
+{
+ job_t *to_destroy = NULL;
+ job_requeue_t requeue;
- for (i = 0; i < JOB_PRIO_MAX; i++)
+ this->working_threads[worker->priority]++;
+ worker->job->status = JOB_STATUS_EXECUTING;
+ this->mutex->unlock(this->mutex);
+ /* canceled threads are restarted to get a constant pool */
+ thread_cleanup_push((thread_cleanup_t)restart, worker);
+ while (TRUE)
+ {
+ requeue = worker->job->execute(worker->job);
+ if (requeue.type != JOB_REQUEUE_TYPE_DIRECT)
{
- if (reserved && reserved >= idle)
- {
- DBG2(DBG_JOB, "delaying %N priority jobs: %d threads idle, "
- "but %d reserved for higher priorities",
- job_priority_names, i, idle, reserved);
+ break;
+ }
+ else if (!worker->job->cancel)
+ { /* only allow cancelable jobs to requeue directly */
+ requeue.type = JOB_REQUEUE_TYPE_FAIR;
+ break;
+ }
+ }
+ thread_cleanup_pop(FALSE);
+ this->mutex->lock(this->mutex);
+ this->working_threads[worker->priority]--;
+ if (worker->job->status == JOB_STATUS_CANCELED)
+ { /* job was canceled via a custom cancel() method or did not
+ * use JOB_REQUEUE_TYPE_DIRECT */
+ to_destroy = worker->job;
+ }
+ else
+ {
+ switch (requeue.type)
+ {
+ case JOB_REQUEUE_TYPE_NONE:
+ worker->job->status = JOB_STATUS_DONE;
+ to_destroy = worker->job;
break;
- }
- if (this->working_threads[i] < this->prio_threads[i])
- {
- reserved += this->prio_threads[i] - this->working_threads[i];
- }
- if (this->jobs[i]->remove_first(this->jobs[i],
- (void**)&worker->job) == SUCCESS)
- {
- job_requeue_t requeue;
-
- this->working_threads[i]++;
- worker->job->status = JOB_STATUS_EXECUTING;
- worker->priority = i;
- this->mutex->unlock(this->mutex);
- /* canceled threads are restarted to get a constant pool */
- thread_cleanup_push((thread_cleanup_t)restart, worker);
- while (TRUE)
- {
- requeue = worker->job->execute(worker->job);
- if (requeue.type != JOB_REQUEUE_TYPE_DIRECT)
- {
- break;
- }
- else if (!worker->job->cancel)
- { /* only allow cancelable jobs to requeue directly */
- requeue.type = JOB_REQUEUE_TYPE_FAIR;
- break;
- }
- }
- thread_cleanup_pop(FALSE);
- this->mutex->lock(this->mutex);
- this->working_threads[i]--;
- if (worker->job->status == JOB_STATUS_CANCELED)
- { /* job was canceled via a custom cancel() method or did not
- * use JOB_REQUEUE_TYPE_DIRECT */
- worker->job->destroy(worker->job);
- break;
- }
- switch (requeue.type)
+ case JOB_REQUEUE_TYPE_FAIR:
+ worker->job->status = JOB_STATUS_QUEUED;
+ this->jobs[worker->priority]->insert_last(
+ this->jobs[worker->priority], worker->job);
+ this->job_added->signal(this->job_added);
+ break;
+ case JOB_REQUEUE_TYPE_SCHEDULE:
+ /* scheduler_t does not hold its lock when queuing jobs
+ * so this should be safe without unlocking our mutex */
+ switch (requeue.schedule)
{
- case JOB_REQUEUE_TYPE_NONE:
- worker->job->status = JOB_STATUS_DONE;
- worker->job->destroy(worker->job);
+ case JOB_SCHEDULE:
+ lib->scheduler->schedule_job(lib->scheduler,
+ worker->job, requeue.time.rel);
break;
- case JOB_REQUEUE_TYPE_FAIR:
- worker->job->status = JOB_STATUS_QUEUED;
- this->jobs[i]->insert_last(this->jobs[i],
- worker->job);
- this->job_added->signal(this->job_added);
+ case JOB_SCHEDULE_MS:
+ lib->scheduler->schedule_job_ms(lib->scheduler,
+ worker->job, requeue.time.rel);
break;
- case JOB_REQUEUE_TYPE_SCHEDULE:
- /* scheduler_t does not hold its lock when queeuing jobs
- * so this should be safe without unlocking our mutex */
- switch (requeue.schedule)
- {
- case JOB_SCHEDULE:
- lib->scheduler->schedule_job(lib->scheduler,
- worker->job, requeue.time.rel);
- break;
- case JOB_SCHEDULE_MS:
- lib->scheduler->schedule_job_ms(lib->scheduler,
- worker->job, requeue.time.rel);
- break;
- case JOB_SCHEDULE_TV:
- lib->scheduler->schedule_job_tv(lib->scheduler,
- worker->job, requeue.time.abs);
- break;
- }
- break;
- default:
+ case JOB_SCHEDULE_TV:
+ lib->scheduler->schedule_job_tv(lib->scheduler,
+ worker->job, requeue.time.abs);
break;
}
break;
- }
+ default:
+ break;
+ }
+ }
+ /* unset the current job to avoid interference with cancel() when
+ * destroying the job below */
+ worker->job = NULL;
+
+ if (to_destroy)
+ { /* release mutex to avoid deadlocks if the same lock is required
+ * during queue_job() and in the destructor called here */
+ this->mutex->unlock(this->mutex);
+ to_destroy->destroy(to_destroy);
+ this->mutex->lock(this->mutex);
+ }
+}
+
+/**
+ * Process queued jobs, called by the worker threads
+ */
+static void process_jobs(worker_thread_t *worker)
+{
+ private_processor_t *this = worker->processor;
+
+ /* worker threads are not cancelable by default */
+ thread_cancelability(FALSE);
+
+ DBG2(DBG_JOB, "started worker thread %.2u", thread_current_id());
+
+ this->mutex->lock(this->mutex);
+ while (this->desired_threads >= this->total_threads)
+ {
+ if (get_job(this, worker))
+ {
+ process_job(this, worker);
}
- if (!worker->job)
+ else
{
this->job_added->wait(this->job_added, this->mutex);
}
- worker->job = NULL;
}
this->total_threads--;
this->thread_terminated->signal(this->thread_terminated);
@@ -355,6 +401,31 @@ METHOD(processor_t, queue_job, void,
this->mutex->unlock(this->mutex);
}
+METHOD(processor_t, execute_job, void,
+ private_processor_t *this, job_t *job)
+{
+ job_priority_t prio;
+ bool queued = FALSE;
+
+ this->mutex->lock(this->mutex);
+ if (this->desired_threads && get_idle_threads_nolock(this))
+ {
+ prio = sane_prio(job->get_priority(job));
+ job->status = JOB_STATUS_QUEUED;
+ /* insert job in front to execute it immediately */
+ this->jobs[prio]->insert_first(this->jobs[prio], job);
+ queued = TRUE;
+ }
+ this->job_added->signal(this->job_added);
+ this->mutex->unlock(this->mutex);
+
+ if (!queued)
+ {
+ job->execute(job);
+ job->destroy(job);
+ }
+}
+
METHOD(processor_t, set_threads, void,
private_processor_t *this, u_int count)
{
@@ -460,6 +531,7 @@ processor_t *processor_create()
.get_working_threads = _get_working_threads,
.get_job_load = _get_job_load,
.queue_job = _queue_job,
+ .execute_job = _execute_job,
.set_threads = _set_threads,
.cancel = _cancel,
.destroy = _destroy,
@@ -479,4 +551,3 @@ processor_t *processor_create()
return &this->public;
}
-