diff options
author | Yves-Alexis Perez <corsac@corsac.net> | 2012-06-28 21:16:07 +0200 |
---|---|---|
committer | Yves-Alexis Perez <corsac@corsac.net> | 2012-06-28 21:16:07 +0200 |
commit | b34738ed08c2227300d554b139e2495ca5da97d6 (patch) | |
tree | 62f33b52820f2e49f0e53c0f8c636312037c8054 /src/libstrongswan/processing | |
parent | 0a9d51a49042a68daa15b0c74a2b7f152f52606b (diff) | |
download | vyos-strongswan-b34738ed08c2227300d554b139e2495ca5da97d6.tar.gz vyos-strongswan-b34738ed08c2227300d554b139e2495ca5da97d6.zip |
Imported Upstream version 4.6.4
Diffstat (limited to 'src/libstrongswan/processing')
-rw-r--r-- | src/libstrongswan/processing/jobs/callback_job.c | 31 | ||||
-rw-r--r-- | src/libstrongswan/processing/jobs/callback_job.h | 16 | ||||
-rw-r--r-- | src/libstrongswan/processing/jobs/job.c | 23 | ||||
-rw-r--r-- | src/libstrongswan/processing/jobs/job.h | 32 | ||||
-rw-r--r-- | src/libstrongswan/processing/processor.c | 154 | ||||
-rw-r--r-- | src/libstrongswan/processing/processor.h | 15 | ||||
-rw-r--r-- | src/libstrongswan/processing/scheduler.c | 3 | ||||
-rw-r--r-- | src/libstrongswan/processing/scheduler.h | 2 |
8 files changed, 239 insertions, 37 deletions
diff --git a/src/libstrongswan/processing/jobs/callback_job.c b/src/libstrongswan/processing/jobs/callback_job.c index 0043a9cdb..13f22e69c 100644 --- a/src/libstrongswan/processing/jobs/callback_job.c +++ b/src/libstrongswan/processing/jobs/callback_job.c @@ -62,7 +62,7 @@ struct private_callback_job_t { mutex_t *mutex; /** - * list of asociated child jobs + * list of associated child jobs */ linked_list_t *children; @@ -88,6 +88,11 @@ struct private_callback_job_t { * without risking that it gets freed too early during destruction. */ sem_t *terminated; + + /** + * Priority of this job + */ + job_priority_t prio; }; /** @@ -227,12 +232,18 @@ METHOD(job_t, execute, void, thread_cleanup_pop(cleanup); } +METHOD(job_t, get_priority, job_priority_t, + private_callback_job_t *this) +{ + return this->prio; +} + /* * Described in header. */ -callback_job_t *callback_job_create(callback_job_cb_t cb, void *data, - callback_job_cleanup_t cleanup, - callback_job_t *parent) +callback_job_t *callback_job_create_with_prio(callback_job_cb_t cb, void *data, + callback_job_cleanup_t cleanup, callback_job_t *parent, + job_priority_t prio) { private_callback_job_t *this; @@ -240,6 +251,7 @@ callback_job_t *callback_job_create(callback_job_cb_t cb, void *data, .public = { .job = { .execute = _execute, + .get_priority = _get_priority, .destroy = _destroy, }, .cancel = _cancel, @@ -251,6 +263,7 @@ callback_job_t *callback_job_create(callback_job_cb_t cb, void *data, .children = linked_list_create(), .parent = (private_callback_job_t*)parent, .destroyable = condvar_create(CONDVAR_TYPE_DEFAULT), + .prio = prio, ); /* register us at parent */ @@ -264,3 +277,13 @@ callback_job_t *callback_job_create(callback_job_cb_t cb, void *data, return &this->public; } +/* + * Described in header. + */ +callback_job_t *callback_job_create(callback_job_cb_t cb, void *data, + callback_job_cleanup_t cleanup, + callback_job_t *parent) +{ + return callback_job_create_with_prio(cb, data, cleanup, parent, + JOB_PRIO_MEDIUM); +} diff --git a/src/libstrongswan/processing/jobs/callback_job.h b/src/libstrongswan/processing/jobs/callback_job.h index 1eb5664d3..3e92b01c0 100644 --- a/src/libstrongswan/processing/jobs/callback_job.h +++ b/src/libstrongswan/processing/jobs/callback_job.h @@ -120,4 +120,20 @@ callback_job_t *callback_job_create(callback_job_cb_t cb, void *data, callback_job_cleanup_t cleanup, callback_job_t *parent); +/** + * Creates a callback job, with priority. + * + * Same as callback_job_create(), but with different priorities than default. + * + * @param cb callback to call from the processor + * @param data user data to supply to callback + * @param cleanup destructor for data on destruction, or NULL + * @param parent parent of this job + * @param prio job priority + * @return callback_job_t object + */ +callback_job_t *callback_job_create_with_prio(callback_job_cb_t cb, void *data, + callback_job_cleanup_t cleanup, callback_job_t *parent, + job_priority_t prio); + #endif /** CALLBACK_JOB_H_ @}*/ diff --git a/src/libstrongswan/processing/jobs/job.c b/src/libstrongswan/processing/jobs/job.c new file mode 100644 index 000000000..ccb897173 --- /dev/null +++ b/src/libstrongswan/processing/jobs/job.c @@ -0,0 +1,23 @@ +/* + * Copyright (C) 2011 Martin Willi + * Copyright (C) 2011 revosec AG + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +#include "job.h" + +ENUM(job_priority_names, JOB_PRIO_CRITICAL, JOB_PRIO_LOW, + "critical", + "high", + "medium", + "low", +); diff --git a/src/libstrongswan/processing/jobs/job.h b/src/libstrongswan/processing/jobs/job.h index 0f1c16ebe..d25cee03e 100644 --- a/src/libstrongswan/processing/jobs/job.h +++ b/src/libstrongswan/processing/jobs/job.h @@ -23,11 +23,32 @@ #define JOB_H_ typedef struct job_t job_t; +typedef enum job_priority_t job_priority_t; #include <library.h> /** - * Job-Interface as it is stored in the job queue. + * Priority classes of jobs + */ +enum job_priority_t { + /** Critical infrastructure jobs that should always been served */ + JOB_PRIO_CRITICAL = 0, + /** Short jobs executed with highest priority */ + JOB_PRIO_HIGH, + /** Default job priority */ + JOB_PRIO_MEDIUM, + /** Low priority jobs with thread blocking operations */ + JOB_PRIO_LOW, + JOB_PRIO_MAX +}; + +/** + * Enum names for job priorities + */ +extern enum_name_t *job_priority_names; + +/** + * Job interface as it is stored in the job queue. */ struct job_t { @@ -41,12 +62,19 @@ struct job_t { void (*execute) (job_t *this); /** + * Get the priority of a job. + * + * @return job priority + */ + job_priority_t (*get_priority)(job_t *this); + + /** * Destroy a job. * * Is only called whenever a job was not executed (e.g. due daemon shutdown). * After execution, jobs destroy themself. */ - void (*destroy) (job_t *job); + void (*destroy) (job_t *this); }; #endif /** JOB_H_ @}*/ diff --git a/src/libstrongswan/processing/processor.c b/src/libstrongswan/processing/processor.c index de556f86b..222f1a535 100644 --- a/src/libstrongswan/processing/processor.c +++ b/src/libstrongswan/processing/processor.c @@ -1,6 +1,7 @@ /* * Copyright (C) 2005-2011 Martin Willi * Copyright (C) 2011 revosec AG + * Copyright (C) 2008-2011 Tobias Brunner * Copyright (C) 2005 Jan Hutter * Hochschule fuer Technik Rapperswil * @@ -25,15 +26,16 @@ #include <threading/thread.h> #include <threading/condvar.h> #include <threading/mutex.h> +#include <threading/thread_value.h> #include <utils/linked_list.h> - typedef struct private_processor_t private_processor_t; /** * Private data of processor_t class. */ struct private_processor_t { + /** * Public processor_t interface. */ @@ -50,9 +52,9 @@ struct private_processor_t { u_int desired_threads; /** - * Number of threads waiting for work + * Number of threads currently working, for each priority */ - u_int idle_threads; + u_int working_threads[JOB_PRIO_MAX]; /** * All threads managed in the pool (including threads that have been @@ -61,12 +63,22 @@ struct private_processor_t { linked_list_t *threads; /** - * The jobs are stored in a linked list + * A list of queued jobs for each priority + */ + linked_list_t *jobs[JOB_PRIO_MAX]; + + /** + * Threads reserved for each priority + */ + int prio_threads[JOB_PRIO_MAX]; + + /** + * Priority of the job executed by a thread */ - linked_list_t *list; + thread_value_t *priority; /** - * access to linked_list is locked through this mutex + * access to job lists is locked through this mutex */ mutex_t *mutex; @@ -90,7 +102,7 @@ static void restart(private_processor_t *this) { thread_t *thread; - DBG2(DBG_JOB, "terminated worker thread, ID: %u", thread_current_id()); + DBG2(DBG_JOB, "terminated worker thread %.2u", thread_current_id()); /* respawn thread if required */ this->mutex->lock(this->mutex); @@ -108,6 +120,31 @@ static void restart(private_processor_t *this) } /** + * Decrement working thread count of a priority class + */ +static void decrement_working_threads(private_processor_t *this) +{ + this->mutex->lock(this->mutex); + this->working_threads[(intptr_t)this->priority->get(this->priority)]--; + this->mutex->unlock(this->mutex); +} + +/** + * Get number of idle threads, non-locking variant + */ +static u_int get_idle_threads_nolock(private_processor_t *this) +{ + u_int count, i; + + count = this->total_threads; + for (i = 0; i < JOB_PRIO_MAX; i++) + { + count -= this->working_threads[i]; + } + return count; +} + +/** * Process queued jobs, called by the worker threads */ static void process_jobs(private_processor_t *this) @@ -115,27 +152,51 @@ static void process_jobs(private_processor_t *this) /* worker threads are not cancellable by default */ thread_cancelability(FALSE); - DBG2(DBG_JOB, "started worker thread, ID: %u", thread_current_id()); + DBG2(DBG_JOB, "started worker thread %.2u", thread_current_id()); this->mutex->lock(this->mutex); while (this->desired_threads >= this->total_threads) { - job_t *job; + job_t *job = NULL; + int i, reserved = 0, idle; - if (this->list->get_count(this->list) == 0) + idle = get_idle_threads_nolock(this); + + for (i = 0; i < JOB_PRIO_MAX; i++) + { + if (reserved && reserved >= idle) + { + DBG2(DBG_JOB, "delaying %N priority jobs: %d threads idle, " + "but %d reserved for higher priorities", + job_priority_names, i, idle, reserved); + break; + } + if (this->working_threads[i] < this->prio_threads[i]) + { + reserved += this->prio_threads[i] - this->working_threads[i]; + } + if (this->jobs[i]->remove_first(this->jobs[i], + (void**)&job) == SUCCESS) + { + this->working_threads[i]++; + this->mutex->unlock(this->mutex); + this->priority->set(this->priority, (void*)(intptr_t)i); + /* terminated threads are restarted to get a constant pool */ + thread_cleanup_push((thread_cleanup_t)restart, this); + thread_cleanup_push((thread_cleanup_t)decrement_working_threads, + this); + job->execute(job); + thread_cleanup_pop(FALSE); + thread_cleanup_pop(FALSE); + this->mutex->lock(this->mutex); + this->working_threads[i]--; + break; + } + } + if (!job) { - this->idle_threads++; this->job_added->wait(this->job_added, this->mutex); - this->idle_threads--; - continue; } - this->list->remove_first(this->list, (void**)&job); - this->mutex->unlock(this->mutex); - /* terminated threads are restarted, so we have a constant pool */ - thread_cleanup_push((thread_cleanup_t)restart, this); - job->execute(job); - thread_cleanup_pop(FALSE); - this->mutex->lock(this->mutex); } this->total_threads--; this->thread_terminated->signal(this->thread_terminated); @@ -159,18 +220,42 @@ METHOD(processor_t, get_idle_threads, u_int, u_int count; this->mutex->lock(this->mutex); - count = this->idle_threads; + count = get_idle_threads_nolock(this); + this->mutex->unlock(this->mutex); + return count; +} + +/** + * Check priority bounds + */ +static job_priority_t sane_prio(job_priority_t prio) +{ + if ((int)prio < 0 || prio >= JOB_PRIO_MAX) + { + return JOB_PRIO_MAX - 1; + } + return prio; +} + +METHOD(processor_t, get_working_threads, u_int, + private_processor_t *this, job_priority_t prio) +{ + u_int count; + + this->mutex->lock(this->mutex); + count = this->working_threads[sane_prio(prio)]; this->mutex->unlock(this->mutex); return count; } METHOD(processor_t, get_job_load, u_int, - private_processor_t *this) + private_processor_t *this, job_priority_t prio) { u_int load; + prio = sane_prio(prio); this->mutex->lock(this->mutex); - load = this->list->get_count(this->list); + load = this->jobs[prio]->get_count(this->jobs[prio]); this->mutex->unlock(this->mutex); return load; } @@ -178,8 +263,11 @@ METHOD(processor_t, get_job_load, u_int, METHOD(processor_t, queue_job, void, private_processor_t *this, job_t *job) { + job_priority_t prio; + + prio = sane_prio(job->get_priority(job)); this->mutex->lock(this->mutex); - this->list->insert_last(this->list, job); + this->jobs[prio]->insert_last(this->jobs[prio], job); this->job_added->signal(this->job_added); this->mutex->unlock(this->mutex); } @@ -217,6 +305,7 @@ METHOD(processor_t, destroy, void, private_processor_t *this) { thread_t *current; + int i; set_threads(this, 0); this->mutex->lock(this->mutex); @@ -231,10 +320,14 @@ METHOD(processor_t, destroy, void, current->join(current); } this->mutex->unlock(this->mutex); + this->priority->destroy(this->priority); this->thread_terminated->destroy(this->thread_terminated); this->job_added->destroy(this->job_added); this->mutex->destroy(this->mutex); - this->list->destroy_offset(this->list, offsetof(job_t, destroy)); + for (i = 0; i < JOB_PRIO_MAX; i++) + { + this->jobs[i]->destroy_offset(this->jobs[i], offsetof(job_t, destroy)); + } this->threads->destroy(this->threads); free(this); } @@ -245,22 +338,31 @@ METHOD(processor_t, destroy, void, processor_t *processor_create() { private_processor_t *this; + int i; INIT(this, .public = { .get_total_threads = _get_total_threads, .get_idle_threads = _get_idle_threads, + .get_working_threads = _get_working_threads, .get_job_load = _get_job_load, .queue_job = _queue_job, .set_threads = _set_threads, .destroy = _destroy, }, - .list = linked_list_create(), .threads = linked_list_create(), + .priority = thread_value_create(NULL), .mutex = mutex_create(MUTEX_TYPE_DEFAULT), .job_added = condvar_create(CONDVAR_TYPE_DEFAULT), .thread_terminated = condvar_create(CONDVAR_TYPE_DEFAULT), ); + for (i = 0; i < JOB_PRIO_MAX; i++) + { + this->jobs[i] = linked_list_create(); + this->prio_threads[i] = lib->settings->get_int(lib->settings, + "libstrongswan.processor.priority_threads.%N", 0, + job_priority_names, i); + } return &this->public; } diff --git a/src/libstrongswan/processing/processor.h b/src/libstrongswan/processing/processor.h index bebbe3a15..5db42c04c 100644 --- a/src/libstrongswan/processing/processor.h +++ b/src/libstrongswan/processing/processor.h @@ -42,18 +42,27 @@ struct processor_t { u_int (*get_total_threads) (processor_t *this); /** - * Get the number of threads currently waiting. + * Get the number of threads currently waiting for work. * * @return number of idle threads */ u_int (*get_idle_threads) (processor_t *this); /** - * Get the number of queued jobs. + * Get the number of threads currently working, per priority class. * + * @param prioritiy to check + * @return number of threads in priority working + */ + u_int (*get_working_threads)(processor_t *this, job_priority_t prio); + + /** + * Get the number of queued jobs for a specified priority. + * + * @param prio priority class to get job load for * @return number of items in queue */ - u_int (*get_job_load) (processor_t *this); + u_int (*get_job_load) (processor_t *this, job_priority_t prio); /** * Adds a job to the queue. diff --git a/src/libstrongswan/processing/scheduler.c b/src/libstrongswan/processing/scheduler.c index 7d9bcd70f..f3cc1164a 100644 --- a/src/libstrongswan/processing/scheduler.c +++ b/src/libstrongswan/processing/scheduler.c @@ -341,7 +341,8 @@ scheduler_t * scheduler_create() this->heap = (event_t**)calloc(this->heap_size + 1, sizeof(event_t*)); - this->job = callback_job_create((callback_job_cb_t)schedule, this, NULL, NULL); + this->job = callback_job_create_with_prio((callback_job_cb_t)schedule, + this, NULL, NULL, JOB_PRIO_CRITICAL); lib->processor->queue_job(lib->processor, (job_t*)this->job); return &this->public; diff --git a/src/libstrongswan/processing/scheduler.h b/src/libstrongswan/processing/scheduler.h index f2c72550f..abbf74e2c 100644 --- a/src/libstrongswan/processing/scheduler.h +++ b/src/libstrongswan/processing/scheduler.h @@ -35,7 +35,7 @@ typedef struct scheduler_t scheduler_t; * based data structure that satisfies the following property: if B is a child * node of A, then key(A) >= (or <=) key(B). So either the element with the * greatest (max-heap) or the smallest (min-heap) key is the root of the heap. - * We use a min-heap whith the key being the absolute unix time at which an + * We use a min-heap with the key being the absolute unix time at which an * event is scheduled. So the root is always the event that will fire next. * * An earlier implementation of the scheduler used a sorted linked list to store |