summaryrefslogtreecommitdiff
path: root/src/libstrongswan/processing/processor.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstrongswan/processing/processor.c')
-rw-r--r--src/libstrongswan/processing/processor.c154
1 files changed, 128 insertions, 26 deletions
diff --git a/src/libstrongswan/processing/processor.c b/src/libstrongswan/processing/processor.c
index de556f86b..222f1a535 100644
--- a/src/libstrongswan/processing/processor.c
+++ b/src/libstrongswan/processing/processor.c
@@ -1,6 +1,7 @@
/*
* Copyright (C) 2005-2011 Martin Willi
* Copyright (C) 2011 revosec AG
+ * Copyright (C) 2008-2011 Tobias Brunner
* Copyright (C) 2005 Jan Hutter
* Hochschule fuer Technik Rapperswil
*
@@ -25,15 +26,16 @@
#include <threading/thread.h>
#include <threading/condvar.h>
#include <threading/mutex.h>
+#include <threading/thread_value.h>
#include <utils/linked_list.h>
-
typedef struct private_processor_t private_processor_t;
/**
* Private data of processor_t class.
*/
struct private_processor_t {
+
/**
* Public processor_t interface.
*/
@@ -50,9 +52,9 @@ struct private_processor_t {
u_int desired_threads;
/**
- * Number of threads waiting for work
+ * Number of threads currently working, for each priority
*/
- u_int idle_threads;
+ u_int working_threads[JOB_PRIO_MAX];
/**
* All threads managed in the pool (including threads that have been
@@ -61,12 +63,22 @@ struct private_processor_t {
linked_list_t *threads;
/**
- * The jobs are stored in a linked list
+ * A list of queued jobs for each priority
+ */
+ linked_list_t *jobs[JOB_PRIO_MAX];
+
+ /**
+ * Threads reserved for each priority
+ */
+ int prio_threads[JOB_PRIO_MAX];
+
+ /**
+ * Priority of the job executed by a thread
*/
- linked_list_t *list;
+ thread_value_t *priority;
/**
- * access to linked_list is locked through this mutex
+ * access to job lists is locked through this mutex
*/
mutex_t *mutex;
@@ -90,7 +102,7 @@ static void restart(private_processor_t *this)
{
thread_t *thread;
- DBG2(DBG_JOB, "terminated worker thread, ID: %u", thread_current_id());
+ DBG2(DBG_JOB, "terminated worker thread %.2u", thread_current_id());
/* respawn thread if required */
this->mutex->lock(this->mutex);
@@ -108,6 +120,31 @@ static void restart(private_processor_t *this)
}
/**
+ * Decrement working thread count of a priority class
+ */
+static void decrement_working_threads(private_processor_t *this)
+{
+ this->mutex->lock(this->mutex);
+ this->working_threads[(intptr_t)this->priority->get(this->priority)]--;
+ this->mutex->unlock(this->mutex);
+}
+
+/**
+ * Get number of idle threads, non-locking variant
+ */
+static u_int get_idle_threads_nolock(private_processor_t *this)
+{
+ u_int count, i;
+
+ count = this->total_threads;
+ for (i = 0; i < JOB_PRIO_MAX; i++)
+ {
+ count -= this->working_threads[i];
+ }
+ return count;
+}
+
+/**
* Process queued jobs, called by the worker threads
*/
static void process_jobs(private_processor_t *this)
@@ -115,27 +152,51 @@ static void process_jobs(private_processor_t *this)
/* worker threads are not cancellable by default */
thread_cancelability(FALSE);
- DBG2(DBG_JOB, "started worker thread, ID: %u", thread_current_id());
+ DBG2(DBG_JOB, "started worker thread %.2u", thread_current_id());
this->mutex->lock(this->mutex);
while (this->desired_threads >= this->total_threads)
{
- job_t *job;
+ job_t *job = NULL;
+ int i, reserved = 0, idle;
- if (this->list->get_count(this->list) == 0)
+ idle = get_idle_threads_nolock(this);
+
+ for (i = 0; i < JOB_PRIO_MAX; i++)
+ {
+ if (reserved && reserved >= idle)
+ {
+ DBG2(DBG_JOB, "delaying %N priority jobs: %d threads idle, "
+ "but %d reserved for higher priorities",
+ job_priority_names, i, idle, reserved);
+ break;
+ }
+ if (this->working_threads[i] < this->prio_threads[i])
+ {
+ reserved += this->prio_threads[i] - this->working_threads[i];
+ }
+ if (this->jobs[i]->remove_first(this->jobs[i],
+ (void**)&job) == SUCCESS)
+ {
+ this->working_threads[i]++;
+ this->mutex->unlock(this->mutex);
+ this->priority->set(this->priority, (void*)(intptr_t)i);
+ /* terminated threads are restarted to get a constant pool */
+ thread_cleanup_push((thread_cleanup_t)restart, this);
+ thread_cleanup_push((thread_cleanup_t)decrement_working_threads,
+ this);
+ job->execute(job);
+ thread_cleanup_pop(FALSE);
+ thread_cleanup_pop(FALSE);
+ this->mutex->lock(this->mutex);
+ this->working_threads[i]--;
+ break;
+ }
+ }
+ if (!job)
{
- this->idle_threads++;
this->job_added->wait(this->job_added, this->mutex);
- this->idle_threads--;
- continue;
}
- this->list->remove_first(this->list, (void**)&job);
- this->mutex->unlock(this->mutex);
- /* terminated threads are restarted, so we have a constant pool */
- thread_cleanup_push((thread_cleanup_t)restart, this);
- job->execute(job);
- thread_cleanup_pop(FALSE);
- this->mutex->lock(this->mutex);
}
this->total_threads--;
this->thread_terminated->signal(this->thread_terminated);
@@ -159,18 +220,42 @@ METHOD(processor_t, get_idle_threads, u_int,
u_int count;
this->mutex->lock(this->mutex);
- count = this->idle_threads;
+ count = get_idle_threads_nolock(this);
+ this->mutex->unlock(this->mutex);
+ return count;
+}
+
+/**
+ * Check priority bounds
+ */
+static job_priority_t sane_prio(job_priority_t prio)
+{
+ if ((int)prio < 0 || prio >= JOB_PRIO_MAX)
+ {
+ return JOB_PRIO_MAX - 1;
+ }
+ return prio;
+}
+
+METHOD(processor_t, get_working_threads, u_int,
+ private_processor_t *this, job_priority_t prio)
+{
+ u_int count;
+
+ this->mutex->lock(this->mutex);
+ count = this->working_threads[sane_prio(prio)];
this->mutex->unlock(this->mutex);
return count;
}
METHOD(processor_t, get_job_load, u_int,
- private_processor_t *this)
+ private_processor_t *this, job_priority_t prio)
{
u_int load;
+ prio = sane_prio(prio);
this->mutex->lock(this->mutex);
- load = this->list->get_count(this->list);
+ load = this->jobs[prio]->get_count(this->jobs[prio]);
this->mutex->unlock(this->mutex);
return load;
}
@@ -178,8 +263,11 @@ METHOD(processor_t, get_job_load, u_int,
METHOD(processor_t, queue_job, void,
private_processor_t *this, job_t *job)
{
+ job_priority_t prio;
+
+ prio = sane_prio(job->get_priority(job));
this->mutex->lock(this->mutex);
- this->list->insert_last(this->list, job);
+ this->jobs[prio]->insert_last(this->jobs[prio], job);
this->job_added->signal(this->job_added);
this->mutex->unlock(this->mutex);
}
@@ -217,6 +305,7 @@ METHOD(processor_t, destroy, void,
private_processor_t *this)
{
thread_t *current;
+ int i;
set_threads(this, 0);
this->mutex->lock(this->mutex);
@@ -231,10 +320,14 @@ METHOD(processor_t, destroy, void,
current->join(current);
}
this->mutex->unlock(this->mutex);
+ this->priority->destroy(this->priority);
this->thread_terminated->destroy(this->thread_terminated);
this->job_added->destroy(this->job_added);
this->mutex->destroy(this->mutex);
- this->list->destroy_offset(this->list, offsetof(job_t, destroy));
+ for (i = 0; i < JOB_PRIO_MAX; i++)
+ {
+ this->jobs[i]->destroy_offset(this->jobs[i], offsetof(job_t, destroy));
+ }
this->threads->destroy(this->threads);
free(this);
}
@@ -245,22 +338,31 @@ METHOD(processor_t, destroy, void,
processor_t *processor_create()
{
private_processor_t *this;
+ int i;
INIT(this,
.public = {
.get_total_threads = _get_total_threads,
.get_idle_threads = _get_idle_threads,
+ .get_working_threads = _get_working_threads,
.get_job_load = _get_job_load,
.queue_job = _queue_job,
.set_threads = _set_threads,
.destroy = _destroy,
},
- .list = linked_list_create(),
.threads = linked_list_create(),
+ .priority = thread_value_create(NULL),
.mutex = mutex_create(MUTEX_TYPE_DEFAULT),
.job_added = condvar_create(CONDVAR_TYPE_DEFAULT),
.thread_terminated = condvar_create(CONDVAR_TYPE_DEFAULT),
);
+ for (i = 0; i < JOB_PRIO_MAX; i++)
+ {
+ this->jobs[i] = linked_list_create();
+ this->prio_threads[i] = lib->settings->get_int(lib->settings,
+ "libstrongswan.processor.priority_threads.%N", 0,
+ job_priority_names, i);
+ }
return &this->public;
}