diff options
Diffstat (limited to 'src/libstrongswan/processing/processor.c')
-rw-r--r-- | src/libstrongswan/processing/processor.c | 346 |
1 files changed, 265 insertions, 81 deletions
diff --git a/src/libstrongswan/processing/processor.c b/src/libstrongswan/processing/processor.c index 222f1a535..adbd95685 100644 --- a/src/libstrongswan/processing/processor.c +++ b/src/libstrongswan/processing/processor.c @@ -1,7 +1,7 @@ /* * Copyright (C) 2005-2011 Martin Willi * Copyright (C) 2011 revosec AG - * Copyright (C) 2008-2011 Tobias Brunner + * Copyright (C) 2008-2013 Tobias Brunner * Copyright (C) 2005 Jan Hutter * Hochschule fuer Technik Rapperswil * @@ -22,12 +22,12 @@ #include "processor.h" -#include <debug.h> +#include <utils/debug.h> #include <threading/thread.h> #include <threading/condvar.h> #include <threading/mutex.h> #include <threading/thread_value.h> -#include <utils/linked_list.h> +#include <collections/linked_list.h> typedef struct private_processor_t private_processor_t; @@ -58,7 +58,7 @@ struct private_processor_t { /** * All threads managed in the pool (including threads that have been - * cancelled, this allows to join them during destruction) + * canceled, this allows to join them later), as worker_thread_t */ linked_list_t *threads; @@ -73,11 +73,6 @@ struct private_processor_t { int prio_threads[JOB_PRIO_MAX]; /** - * Priority of the job executed by a thread - */ - thread_value_t *priority; - - /** * access to job lists is locked through this mutex */ mutex_t *mutex; @@ -93,39 +88,79 @@ struct private_processor_t { condvar_t *thread_terminated; }; -static void process_jobs(private_processor_t *this); +/** + * Worker thread + */ +typedef struct { + + /** + * Reference to the processor + */ + private_processor_t *processor; + + /** + * The actual thread + */ + thread_t *thread; + + /** + * Job currently being executed by this worker thread + */ + job_t *job; + + /** + * Priority of the current job + */ + job_priority_t priority; + +} worker_thread_t; + +static void process_jobs(worker_thread_t *worker); /** * restart a terminated thread */ -static void restart(private_processor_t *this) +static void restart(worker_thread_t *worker) { - thread_t *thread; + private_processor_t *this = worker->processor; + job_t *job; DBG2(DBG_JOB, "terminated worker thread %.2u", thread_current_id()); - /* respawn thread if required */ this->mutex->lock(this->mutex); - if (this->desired_threads < this->total_threads || - (thread = thread_create((thread_main_t)process_jobs, this)) == NULL) - { - this->total_threads--; - this->thread_terminated->signal(this->thread_terminated); - } - else - { - this->threads->insert_last(this->threads, thread); - } + /* cleanup worker thread */ + this->working_threads[worker->priority]--; + worker->job->status = JOB_STATUS_CANCELED; + job = worker->job; + /* unset the job before releasing the mutex, otherwise cancel() might + * interfere */ + worker->job = NULL; + /* release mutex to avoid deadlocks if the same lock is required + * during queue_job() and in the destructor called here */ this->mutex->unlock(this->mutex); -} - -/** - * Decrement working thread count of a priority class - */ -static void decrement_working_threads(private_processor_t *this) -{ + job->destroy(job); this->mutex->lock(this->mutex); - this->working_threads[(intptr_t)this->priority->get(this->priority)]--; + + /* respawn thread if required */ + if (this->desired_threads >= this->total_threads) + { + worker_thread_t *new_worker; + + INIT(new_worker, + .processor = this, + ); + new_worker->thread = thread_create((thread_main_t)process_jobs, + new_worker); + if (new_worker->thread) + { + this->threads->insert_last(this->threads, new_worker); + this->mutex->unlock(this->mutex); + return; + } + free(new_worker); + } + this->total_threads--; + this->thread_terminated->signal(this->thread_terminated); this->mutex->unlock(this->mutex); } @@ -145,11 +180,135 @@ static u_int get_idle_threads_nolock(private_processor_t *this) } /** + * Get a job from any job queue, starting with the highest priority. + * + * this->mutex is expected to be locked. + */ +static bool get_job(private_processor_t *this, worker_thread_t *worker) +{ + int i, reserved = 0, idle; + + idle = get_idle_threads_nolock(this); + + for (i = 0; i < JOB_PRIO_MAX; i++) + { + if (reserved && reserved >= idle) + { + DBG2(DBG_JOB, "delaying %N priority jobs: %d threads idle, " + "but %d reserved for higher priorities", + job_priority_names, i, idle, reserved); + /* wait until a job of higher priority gets queued */ + return FALSE; + } + if (this->working_threads[i] < this->prio_threads[i]) + { + reserved += this->prio_threads[i] - this->working_threads[i]; + } + if (this->jobs[i]->remove_first(this->jobs[i], + (void**)&worker->job) == SUCCESS) + { + worker->priority = i; + return TRUE; + } + } + return FALSE; +} + +/** + * Process a single job (provided in worker->job, worker->priority is also + * expected to be set) + * + * this->mutex is expected to be locked. + */ +static void process_job(private_processor_t *this, worker_thread_t *worker) +{ + job_t *to_destroy = NULL; + job_requeue_t requeue; + + this->working_threads[worker->priority]++; + worker->job->status = JOB_STATUS_EXECUTING; + this->mutex->unlock(this->mutex); + /* canceled threads are restarted to get a constant pool */ + thread_cleanup_push((thread_cleanup_t)restart, worker); + while (TRUE) + { + requeue = worker->job->execute(worker->job); + if (requeue.type != JOB_REQUEUE_TYPE_DIRECT) + { + break; + } + else if (!worker->job->cancel) + { /* only allow cancelable jobs to requeue directly */ + requeue.type = JOB_REQUEUE_TYPE_FAIR; + break; + } + } + thread_cleanup_pop(FALSE); + this->mutex->lock(this->mutex); + this->working_threads[worker->priority]--; + if (worker->job->status == JOB_STATUS_CANCELED) + { /* job was canceled via a custom cancel() method or did not + * use JOB_REQUEUE_TYPE_DIRECT */ + to_destroy = worker->job; + } + else + { + switch (requeue.type) + { + case JOB_REQUEUE_TYPE_NONE: + worker->job->status = JOB_STATUS_DONE; + to_destroy = worker->job; + break; + case JOB_REQUEUE_TYPE_FAIR: + worker->job->status = JOB_STATUS_QUEUED; + this->jobs[worker->priority]->insert_last( + this->jobs[worker->priority], worker->job); + this->job_added->signal(this->job_added); + break; + case JOB_REQUEUE_TYPE_SCHEDULE: + /* scheduler_t does not hold its lock when queuing jobs + * so this should be safe without unlocking our mutex */ + switch (requeue.schedule) + { + case JOB_SCHEDULE: + lib->scheduler->schedule_job(lib->scheduler, + worker->job, requeue.time.rel); + break; + case JOB_SCHEDULE_MS: + lib->scheduler->schedule_job_ms(lib->scheduler, + worker->job, requeue.time.rel); + break; + case JOB_SCHEDULE_TV: + lib->scheduler->schedule_job_tv(lib->scheduler, + worker->job, requeue.time.abs); + break; + } + break; + default: + break; + } + } + /* unset the current job to avoid interference with cancel() when + * destroying the job below */ + worker->job = NULL; + + if (to_destroy) + { /* release mutex to avoid deadlocks if the same lock is required + * during queue_job() and in the destructor called here */ + this->mutex->unlock(this->mutex); + to_destroy->destroy(to_destroy); + this->mutex->lock(this->mutex); + } +} + +/** * Process queued jobs, called by the worker threads */ -static void process_jobs(private_processor_t *this) +static void process_jobs(worker_thread_t *worker) { - /* worker threads are not cancellable by default */ + private_processor_t *this = worker->processor; + + /* worker threads are not cancelable by default */ thread_cancelability(FALSE); DBG2(DBG_JOB, "started worker thread %.2u", thread_current_id()); @@ -157,43 +316,11 @@ static void process_jobs(private_processor_t *this) this->mutex->lock(this->mutex); while (this->desired_threads >= this->total_threads) { - job_t *job = NULL; - int i, reserved = 0, idle; - - idle = get_idle_threads_nolock(this); - - for (i = 0; i < JOB_PRIO_MAX; i++) + if (get_job(this, worker)) { - if (reserved && reserved >= idle) - { - DBG2(DBG_JOB, "delaying %N priority jobs: %d threads idle, " - "but %d reserved for higher priorities", - job_priority_names, i, idle, reserved); - break; - } - if (this->working_threads[i] < this->prio_threads[i]) - { - reserved += this->prio_threads[i] - this->working_threads[i]; - } - if (this->jobs[i]->remove_first(this->jobs[i], - (void**)&job) == SUCCESS) - { - this->working_threads[i]++; - this->mutex->unlock(this->mutex); - this->priority->set(this->priority, (void*)(intptr_t)i); - /* terminated threads are restarted to get a constant pool */ - thread_cleanup_push((thread_cleanup_t)restart, this); - thread_cleanup_push((thread_cleanup_t)decrement_working_threads, - this); - job->execute(job); - thread_cleanup_pop(FALSE); - thread_cleanup_pop(FALSE); - this->mutex->lock(this->mutex); - this->working_threads[i]--; - break; - } + process_job(this, worker); } - if (!job) + else { this->job_added->wait(this->job_added, this->mutex); } @@ -266,31 +393,65 @@ METHOD(processor_t, queue_job, void, job_priority_t prio; prio = sane_prio(job->get_priority(job)); + job->status = JOB_STATUS_QUEUED; + this->mutex->lock(this->mutex); this->jobs[prio]->insert_last(this->jobs[prio], job); this->job_added->signal(this->job_added); this->mutex->unlock(this->mutex); } +METHOD(processor_t, execute_job, void, + private_processor_t *this, job_t *job) +{ + job_priority_t prio; + bool queued = FALSE; + + this->mutex->lock(this->mutex); + if (this->desired_threads && get_idle_threads_nolock(this)) + { + prio = sane_prio(job->get_priority(job)); + job->status = JOB_STATUS_QUEUED; + /* insert job in front to execute it immediately */ + this->jobs[prio]->insert_first(this->jobs[prio], job); + queued = TRUE; + } + this->job_added->signal(this->job_added); + this->mutex->unlock(this->mutex); + + if (!queued) + { + job->execute(job); + job->destroy(job); + } +} + METHOD(processor_t, set_threads, void, private_processor_t *this, u_int count) { this->mutex->lock(this->mutex); if (count > this->total_threads) { /* increase thread count */ + worker_thread_t *worker; int i; - thread_t *current; this->desired_threads = count; DBG1(DBG_JOB, "spawning %d worker threads", count - this->total_threads); for (i = this->total_threads; i < count; i++) { - current = thread_create((thread_main_t)process_jobs, this); - if (current) + INIT(worker, + .processor = this, + ); + worker->thread = thread_create((thread_main_t)process_jobs, worker); + if (worker->thread) { - this->threads->insert_last(this->threads, current); + this->threads->insert_last(this->threads, worker); this->total_threads++; } + else + { + free(worker); + } } } else if (count < this->total_threads) @@ -301,26 +462,49 @@ METHOD(processor_t, set_threads, void, this->mutex->unlock(this->mutex); } -METHOD(processor_t, destroy, void, +METHOD(processor_t, cancel, void, private_processor_t *this) { - thread_t *current; - int i; + enumerator_t *enumerator; + worker_thread_t *worker; - set_threads(this, 0); this->mutex->lock(this->mutex); + this->desired_threads = 0; + /* cancel potentially blocking jobs */ + enumerator = this->threads->create_enumerator(this->threads); + while (enumerator->enumerate(enumerator, (void**)&worker)) + { + if (worker->job && worker->job->cancel) + { + worker->job->status = JOB_STATUS_CANCELED; + if (!worker->job->cancel(worker->job)) + { /* job requests to be canceled explicitly, otherwise we assume + * the thread terminates itself and can be joined */ + worker->thread->cancel(worker->thread); + } + } + } + enumerator->destroy(enumerator); while (this->total_threads > 0) { this->job_added->broadcast(this->job_added); this->thread_terminated->wait(this->thread_terminated, this->mutex); } while (this->threads->remove_first(this->threads, - (void**)¤t) == SUCCESS) + (void**)&worker) == SUCCESS) { - current->join(current); + worker->thread->join(worker->thread); + free(worker); } this->mutex->unlock(this->mutex); - this->priority->destroy(this->priority); +} + +METHOD(processor_t, destroy, void, + private_processor_t *this) +{ + int i; + + cancel(this); this->thread_terminated->destroy(this->thread_terminated); this->job_added->destroy(this->job_added); this->mutex->destroy(this->mutex); @@ -347,11 +531,12 @@ processor_t *processor_create() .get_working_threads = _get_working_threads, .get_job_load = _get_job_load, .queue_job = _queue_job, + .execute_job = _execute_job, .set_threads = _set_threads, + .cancel = _cancel, .destroy = _destroy, }, .threads = linked_list_create(), - .priority = thread_value_create(NULL), .mutex = mutex_create(MUTEX_TYPE_DEFAULT), .job_added = condvar_create(CONDVAR_TYPE_DEFAULT), .thread_terminated = condvar_create(CONDVAR_TYPE_DEFAULT), @@ -366,4 +551,3 @@ processor_t *processor_create() return &this->public; } - |