diff options
Diffstat (limited to 'src/libstrongswan/processing/processor.c')
-rw-r--r-- | src/libstrongswan/processing/processor.c | 154 |
1 files changed, 128 insertions, 26 deletions
diff --git a/src/libstrongswan/processing/processor.c b/src/libstrongswan/processing/processor.c index de556f86b..222f1a535 100644 --- a/src/libstrongswan/processing/processor.c +++ b/src/libstrongswan/processing/processor.c @@ -1,6 +1,7 @@ /* * Copyright (C) 2005-2011 Martin Willi * Copyright (C) 2011 revosec AG + * Copyright (C) 2008-2011 Tobias Brunner * Copyright (C) 2005 Jan Hutter * Hochschule fuer Technik Rapperswil * @@ -25,15 +26,16 @@ #include <threading/thread.h> #include <threading/condvar.h> #include <threading/mutex.h> +#include <threading/thread_value.h> #include <utils/linked_list.h> - typedef struct private_processor_t private_processor_t; /** * Private data of processor_t class. */ struct private_processor_t { + /** * Public processor_t interface. */ @@ -50,9 +52,9 @@ struct private_processor_t { u_int desired_threads; /** - * Number of threads waiting for work + * Number of threads currently working, for each priority */ - u_int idle_threads; + u_int working_threads[JOB_PRIO_MAX]; /** * All threads managed in the pool (including threads that have been @@ -61,12 +63,22 @@ struct private_processor_t { linked_list_t *threads; /** - * The jobs are stored in a linked list + * A list of queued jobs for each priority + */ + linked_list_t *jobs[JOB_PRIO_MAX]; + + /** + * Threads reserved for each priority + */ + int prio_threads[JOB_PRIO_MAX]; + + /** + * Priority of the job executed by a thread */ - linked_list_t *list; + thread_value_t *priority; /** - * access to linked_list is locked through this mutex + * access to job lists is locked through this mutex */ mutex_t *mutex; @@ -90,7 +102,7 @@ static void restart(private_processor_t *this) { thread_t *thread; - DBG2(DBG_JOB, "terminated worker thread, ID: %u", thread_current_id()); + DBG2(DBG_JOB, "terminated worker thread %.2u", thread_current_id()); /* respawn thread if required */ this->mutex->lock(this->mutex); @@ -108,6 +120,31 @@ static void restart(private_processor_t *this) } /** + * Decrement working thread count of a priority class + */ +static void decrement_working_threads(private_processor_t *this) +{ + this->mutex->lock(this->mutex); + this->working_threads[(intptr_t)this->priority->get(this->priority)]--; + this->mutex->unlock(this->mutex); +} + +/** + * Get number of idle threads, non-locking variant + */ +static u_int get_idle_threads_nolock(private_processor_t *this) +{ + u_int count, i; + + count = this->total_threads; + for (i = 0; i < JOB_PRIO_MAX; i++) + { + count -= this->working_threads[i]; + } + return count; +} + +/** * Process queued jobs, called by the worker threads */ static void process_jobs(private_processor_t *this) @@ -115,27 +152,51 @@ static void process_jobs(private_processor_t *this) /* worker threads are not cancellable by default */ thread_cancelability(FALSE); - DBG2(DBG_JOB, "started worker thread, ID: %u", thread_current_id()); + DBG2(DBG_JOB, "started worker thread %.2u", thread_current_id()); this->mutex->lock(this->mutex); while (this->desired_threads >= this->total_threads) { - job_t *job; + job_t *job = NULL; + int i, reserved = 0, idle; - if (this->list->get_count(this->list) == 0) + idle = get_idle_threads_nolock(this); + + for (i = 0; i < JOB_PRIO_MAX; i++) + { + if (reserved && reserved >= idle) + { + DBG2(DBG_JOB, "delaying %N priority jobs: %d threads idle, " + "but %d reserved for higher priorities", + job_priority_names, i, idle, reserved); + break; + } + if (this->working_threads[i] < this->prio_threads[i]) + { + reserved += this->prio_threads[i] - this->working_threads[i]; + } + if (this->jobs[i]->remove_first(this->jobs[i], + (void**)&job) == SUCCESS) + { + this->working_threads[i]++; + this->mutex->unlock(this->mutex); + this->priority->set(this->priority, (void*)(intptr_t)i); + /* terminated threads are restarted to get a constant pool */ + thread_cleanup_push((thread_cleanup_t)restart, this); + thread_cleanup_push((thread_cleanup_t)decrement_working_threads, + this); + job->execute(job); + thread_cleanup_pop(FALSE); + thread_cleanup_pop(FALSE); + this->mutex->lock(this->mutex); + this->working_threads[i]--; + break; + } + } + if (!job) { - this->idle_threads++; this->job_added->wait(this->job_added, this->mutex); - this->idle_threads--; - continue; } - this->list->remove_first(this->list, (void**)&job); - this->mutex->unlock(this->mutex); - /* terminated threads are restarted, so we have a constant pool */ - thread_cleanup_push((thread_cleanup_t)restart, this); - job->execute(job); - thread_cleanup_pop(FALSE); - this->mutex->lock(this->mutex); } this->total_threads--; this->thread_terminated->signal(this->thread_terminated); @@ -159,18 +220,42 @@ METHOD(processor_t, get_idle_threads, u_int, u_int count; this->mutex->lock(this->mutex); - count = this->idle_threads; + count = get_idle_threads_nolock(this); + this->mutex->unlock(this->mutex); + return count; +} + +/** + * Check priority bounds + */ +static job_priority_t sane_prio(job_priority_t prio) +{ + if ((int)prio < 0 || prio >= JOB_PRIO_MAX) + { + return JOB_PRIO_MAX - 1; + } + return prio; +} + +METHOD(processor_t, get_working_threads, u_int, + private_processor_t *this, job_priority_t prio) +{ + u_int count; + + this->mutex->lock(this->mutex); + count = this->working_threads[sane_prio(prio)]; this->mutex->unlock(this->mutex); return count; } METHOD(processor_t, get_job_load, u_int, - private_processor_t *this) + private_processor_t *this, job_priority_t prio) { u_int load; + prio = sane_prio(prio); this->mutex->lock(this->mutex); - load = this->list->get_count(this->list); + load = this->jobs[prio]->get_count(this->jobs[prio]); this->mutex->unlock(this->mutex); return load; } @@ -178,8 +263,11 @@ METHOD(processor_t, get_job_load, u_int, METHOD(processor_t, queue_job, void, private_processor_t *this, job_t *job) { + job_priority_t prio; + + prio = sane_prio(job->get_priority(job)); this->mutex->lock(this->mutex); - this->list->insert_last(this->list, job); + this->jobs[prio]->insert_last(this->jobs[prio], job); this->job_added->signal(this->job_added); this->mutex->unlock(this->mutex); } @@ -217,6 +305,7 @@ METHOD(processor_t, destroy, void, private_processor_t *this) { thread_t *current; + int i; set_threads(this, 0); this->mutex->lock(this->mutex); @@ -231,10 +320,14 @@ METHOD(processor_t, destroy, void, current->join(current); } this->mutex->unlock(this->mutex); + this->priority->destroy(this->priority); this->thread_terminated->destroy(this->thread_terminated); this->job_added->destroy(this->job_added); this->mutex->destroy(this->mutex); - this->list->destroy_offset(this->list, offsetof(job_t, destroy)); + for (i = 0; i < JOB_PRIO_MAX; i++) + { + this->jobs[i]->destroy_offset(this->jobs[i], offsetof(job_t, destroy)); + } this->threads->destroy(this->threads); free(this); } @@ -245,22 +338,31 @@ METHOD(processor_t, destroy, void, processor_t *processor_create() { private_processor_t *this; + int i; INIT(this, .public = { .get_total_threads = _get_total_threads, .get_idle_threads = _get_idle_threads, + .get_working_threads = _get_working_threads, .get_job_load = _get_job_load, .queue_job = _queue_job, .set_threads = _set_threads, .destroy = _destroy, }, - .list = linked_list_create(), .threads = linked_list_create(), + .priority = thread_value_create(NULL), .mutex = mutex_create(MUTEX_TYPE_DEFAULT), .job_added = condvar_create(CONDVAR_TYPE_DEFAULT), .thread_terminated = condvar_create(CONDVAR_TYPE_DEFAULT), ); + for (i = 0; i < JOB_PRIO_MAX; i++) + { + this->jobs[i] = linked_list_create(); + this->prio_threads[i] = lib->settings->get_int(lib->settings, + "libstrongswan.processor.priority_threads.%N", 0, + job_priority_names, i); + } return &this->public; } |