diff options
author | Yves-Alexis Perez <corsac@debian.org> | 2013-08-25 15:37:26 +0200 |
---|---|---|
committer | Yves-Alexis Perez <corsac@debian.org> | 2013-08-25 15:37:26 +0200 |
commit | 6b99c8d9cff7b3e8ae8f3204b99e7ea40f791349 (patch) | |
tree | 009fc492961e13860d2a4bc2de8caf2bbe2975e7 /src/libstrongswan/processing/processor.c | |
parent | c83921a2b566aa9d55d8ccc7258f04fca6292ee6 (diff) | |
download | vyos-strongswan-6b99c8d9cff7b3e8ae8f3204b99e7ea40f791349.tar.gz vyos-strongswan-6b99c8d9cff7b3e8ae8f3204b99e7ea40f791349.zip |
Imported Upstream version 5.1.0
Diffstat (limited to 'src/libstrongswan/processing/processor.c')
-rw-r--r-- | src/libstrongswan/processing/processor.c | 251 |
1 files changed, 161 insertions, 90 deletions
diff --git a/src/libstrongswan/processing/processor.c b/src/libstrongswan/processing/processor.c index 934636fc0..adbd95685 100644 --- a/src/libstrongswan/processing/processor.c +++ b/src/libstrongswan/processing/processor.c @@ -1,7 +1,7 @@ /* * Copyright (C) 2005-2011 Martin Willi * Copyright (C) 2011 revosec AG - * Copyright (C) 2008-2012 Tobias Brunner + * Copyright (C) 2008-2013 Tobias Brunner * Copyright (C) 2005 Jan Hutter * Hochschule fuer Technik Rapperswil * @@ -123,6 +123,7 @@ static void process_jobs(worker_thread_t *worker); static void restart(worker_thread_t *worker) { private_processor_t *this = worker->processor; + job_t *job; DBG2(DBG_JOB, "terminated worker thread %.2u", thread_current_id()); @@ -130,8 +131,15 @@ static void restart(worker_thread_t *worker) /* cleanup worker thread */ this->working_threads[worker->priority]--; worker->job->status = JOB_STATUS_CANCELED; - worker->job->destroy(worker->job); + job = worker->job; + /* unset the job before releasing the mutex, otherwise cancel() might + * interfere */ worker->job = NULL; + /* release mutex to avoid deadlocks if the same lock is required + * during queue_job() and in the destructor called here */ + this->mutex->unlock(this->mutex); + job->destroy(job); + this->mutex->lock(this->mutex); /* respawn thread if required */ if (this->desired_threads >= this->total_threads) @@ -172,112 +180,150 @@ static u_int get_idle_threads_nolock(private_processor_t *this) } /** - * Process queued jobs, called by the worker threads + * Get a job from any job queue, starting with the highest priority. + * + * this->mutex is expected to be locked. */ -static void process_jobs(worker_thread_t *worker) +static bool get_job(private_processor_t *this, worker_thread_t *worker) { - private_processor_t *this = worker->processor; + int i, reserved = 0, idle; - /* worker threads are not cancelable by default */ - thread_cancelability(FALSE); - - DBG2(DBG_JOB, "started worker thread %.2u", thread_current_id()); + idle = get_idle_threads_nolock(this); - this->mutex->lock(this->mutex); - while (this->desired_threads >= this->total_threads) + for (i = 0; i < JOB_PRIO_MAX; i++) { - int i, reserved = 0, idle; + if (reserved && reserved >= idle) + { + DBG2(DBG_JOB, "delaying %N priority jobs: %d threads idle, " + "but %d reserved for higher priorities", + job_priority_names, i, idle, reserved); + /* wait until a job of higher priority gets queued */ + return FALSE; + } + if (this->working_threads[i] < this->prio_threads[i]) + { + reserved += this->prio_threads[i] - this->working_threads[i]; + } + if (this->jobs[i]->remove_first(this->jobs[i], + (void**)&worker->job) == SUCCESS) + { + worker->priority = i; + return TRUE; + } + } + return FALSE; +} - idle = get_idle_threads_nolock(this); +/** + * Process a single job (provided in worker->job, worker->priority is also + * expected to be set) + * + * this->mutex is expected to be locked. + */ +static void process_job(private_processor_t *this, worker_thread_t *worker) +{ + job_t *to_destroy = NULL; + job_requeue_t requeue; - for (i = 0; i < JOB_PRIO_MAX; i++) + this->working_threads[worker->priority]++; + worker->job->status = JOB_STATUS_EXECUTING; + this->mutex->unlock(this->mutex); + /* canceled threads are restarted to get a constant pool */ + thread_cleanup_push((thread_cleanup_t)restart, worker); + while (TRUE) + { + requeue = worker->job->execute(worker->job); + if (requeue.type != JOB_REQUEUE_TYPE_DIRECT) { - if (reserved && reserved >= idle) - { - DBG2(DBG_JOB, "delaying %N priority jobs: %d threads idle, " - "but %d reserved for higher priorities", - job_priority_names, i, idle, reserved); + break; + } + else if (!worker->job->cancel) + { /* only allow cancelable jobs to requeue directly */ + requeue.type = JOB_REQUEUE_TYPE_FAIR; + break; + } + } + thread_cleanup_pop(FALSE); + this->mutex->lock(this->mutex); + this->working_threads[worker->priority]--; + if (worker->job->status == JOB_STATUS_CANCELED) + { /* job was canceled via a custom cancel() method or did not + * use JOB_REQUEUE_TYPE_DIRECT */ + to_destroy = worker->job; + } + else + { + switch (requeue.type) + { + case JOB_REQUEUE_TYPE_NONE: + worker->job->status = JOB_STATUS_DONE; + to_destroy = worker->job; break; - } - if (this->working_threads[i] < this->prio_threads[i]) - { - reserved += this->prio_threads[i] - this->working_threads[i]; - } - if (this->jobs[i]->remove_first(this->jobs[i], - (void**)&worker->job) == SUCCESS) - { - job_requeue_t requeue; - - this->working_threads[i]++; - worker->job->status = JOB_STATUS_EXECUTING; - worker->priority = i; - this->mutex->unlock(this->mutex); - /* canceled threads are restarted to get a constant pool */ - thread_cleanup_push((thread_cleanup_t)restart, worker); - while (TRUE) - { - requeue = worker->job->execute(worker->job); - if (requeue.type != JOB_REQUEUE_TYPE_DIRECT) - { - break; - } - else if (!worker->job->cancel) - { /* only allow cancelable jobs to requeue directly */ - requeue.type = JOB_REQUEUE_TYPE_FAIR; - break; - } - } - thread_cleanup_pop(FALSE); - this->mutex->lock(this->mutex); - this->working_threads[i]--; - if (worker->job->status == JOB_STATUS_CANCELED) - { /* job was canceled via a custom cancel() method or did not - * use JOB_REQUEUE_TYPE_DIRECT */ - worker->job->destroy(worker->job); - break; - } - switch (requeue.type) + case JOB_REQUEUE_TYPE_FAIR: + worker->job->status = JOB_STATUS_QUEUED; + this->jobs[worker->priority]->insert_last( + this->jobs[worker->priority], worker->job); + this->job_added->signal(this->job_added); + break; + case JOB_REQUEUE_TYPE_SCHEDULE: + /* scheduler_t does not hold its lock when queuing jobs + * so this should be safe without unlocking our mutex */ + switch (requeue.schedule) { - case JOB_REQUEUE_TYPE_NONE: - worker->job->status = JOB_STATUS_DONE; - worker->job->destroy(worker->job); + case JOB_SCHEDULE: + lib->scheduler->schedule_job(lib->scheduler, + worker->job, requeue.time.rel); break; - case JOB_REQUEUE_TYPE_FAIR: - worker->job->status = JOB_STATUS_QUEUED; - this->jobs[i]->insert_last(this->jobs[i], - worker->job); - this->job_added->signal(this->job_added); + case JOB_SCHEDULE_MS: + lib->scheduler->schedule_job_ms(lib->scheduler, + worker->job, requeue.time.rel); break; - case JOB_REQUEUE_TYPE_SCHEDULE: - /* scheduler_t does not hold its lock when queeuing jobs - * so this should be safe without unlocking our mutex */ - switch (requeue.schedule) - { - case JOB_SCHEDULE: - lib->scheduler->schedule_job(lib->scheduler, - worker->job, requeue.time.rel); - break; - case JOB_SCHEDULE_MS: - lib->scheduler->schedule_job_ms(lib->scheduler, - worker->job, requeue.time.rel); - break; - case JOB_SCHEDULE_TV: - lib->scheduler->schedule_job_tv(lib->scheduler, - worker->job, requeue.time.abs); - break; - } - break; - default: + case JOB_SCHEDULE_TV: + lib->scheduler->schedule_job_tv(lib->scheduler, + worker->job, requeue.time.abs); break; } break; - } + default: + break; + } + } + /* unset the current job to avoid interference with cancel() when + * destroying the job below */ + worker->job = NULL; + + if (to_destroy) + { /* release mutex to avoid deadlocks if the same lock is required + * during queue_job() and in the destructor called here */ + this->mutex->unlock(this->mutex); + to_destroy->destroy(to_destroy); + this->mutex->lock(this->mutex); + } +} + +/** + * Process queued jobs, called by the worker threads + */ +static void process_jobs(worker_thread_t *worker) +{ + private_processor_t *this = worker->processor; + + /* worker threads are not cancelable by default */ + thread_cancelability(FALSE); + + DBG2(DBG_JOB, "started worker thread %.2u", thread_current_id()); + + this->mutex->lock(this->mutex); + while (this->desired_threads >= this->total_threads) + { + if (get_job(this, worker)) + { + process_job(this, worker); } - if (!worker->job) + else { this->job_added->wait(this->job_added, this->mutex); } - worker->job = NULL; } this->total_threads--; this->thread_terminated->signal(this->thread_terminated); @@ -355,6 +401,31 @@ METHOD(processor_t, queue_job, void, this->mutex->unlock(this->mutex); } +METHOD(processor_t, execute_job, void, + private_processor_t *this, job_t *job) +{ + job_priority_t prio; + bool queued = FALSE; + + this->mutex->lock(this->mutex); + if (this->desired_threads && get_idle_threads_nolock(this)) + { + prio = sane_prio(job->get_priority(job)); + job->status = JOB_STATUS_QUEUED; + /* insert job in front to execute it immediately */ + this->jobs[prio]->insert_first(this->jobs[prio], job); + queued = TRUE; + } + this->job_added->signal(this->job_added); + this->mutex->unlock(this->mutex); + + if (!queued) + { + job->execute(job); + job->destroy(job); + } +} + METHOD(processor_t, set_threads, void, private_processor_t *this, u_int count) { @@ -460,6 +531,7 @@ processor_t *processor_create() .get_working_threads = _get_working_threads, .get_job_load = _get_job_load, .queue_job = _queue_job, + .execute_job = _execute_job, .set_threads = _set_threads, .cancel = _cancel, .destroy = _destroy, @@ -479,4 +551,3 @@ processor_t *processor_create() return &this->public; } - |