summaryrefslogtreecommitdiff
path: root/src/libstrongswan/processing
diff options
context:
space:
mode:
authorYves-Alexis Perez <corsac@corsac.net>2012-06-28 21:16:07 +0200
committerYves-Alexis Perez <corsac@corsac.net>2012-06-28 21:16:07 +0200
commita3b482a8facde4b453ad821bfe40effbe3d17903 (patch)
tree636f02074b05b7473f5db1fe60fa2bceb0094a62 /src/libstrongswan/processing
parentd816a1afbd841e9943bb439fe4e110b7c4970550 (diff)
parentb34738ed08c2227300d554b139e2495ca5da97d6 (diff)
downloadvyos-strongswan-a3b482a8facde4b453ad821bfe40effbe3d17903.tar.gz
vyos-strongswan-a3b482a8facde4b453ad821bfe40effbe3d17903.zip
Merge tag 'upstream/4.6.4'
Upstream version 4.6.4
Diffstat (limited to 'src/libstrongswan/processing')
-rw-r--r--src/libstrongswan/processing/jobs/callback_job.c31
-rw-r--r--src/libstrongswan/processing/jobs/callback_job.h16
-rw-r--r--src/libstrongswan/processing/jobs/job.c23
-rw-r--r--src/libstrongswan/processing/jobs/job.h32
-rw-r--r--src/libstrongswan/processing/processor.c154
-rw-r--r--src/libstrongswan/processing/processor.h15
-rw-r--r--src/libstrongswan/processing/scheduler.c3
-rw-r--r--src/libstrongswan/processing/scheduler.h2
8 files changed, 239 insertions, 37 deletions
diff --git a/src/libstrongswan/processing/jobs/callback_job.c b/src/libstrongswan/processing/jobs/callback_job.c
index 0043a9cdb..13f22e69c 100644
--- a/src/libstrongswan/processing/jobs/callback_job.c
+++ b/src/libstrongswan/processing/jobs/callback_job.c
@@ -62,7 +62,7 @@ struct private_callback_job_t {
mutex_t *mutex;
/**
- * list of asociated child jobs
+ * list of associated child jobs
*/
linked_list_t *children;
@@ -88,6 +88,11 @@ struct private_callback_job_t {
* without risking that it gets freed too early during destruction.
*/
sem_t *terminated;
+
+ /**
+ * Priority of this job
+ */
+ job_priority_t prio;
};
/**
@@ -227,12 +232,18 @@ METHOD(job_t, execute, void,
thread_cleanup_pop(cleanup);
}
+METHOD(job_t, get_priority, job_priority_t,
+ private_callback_job_t *this)
+{
+ return this->prio;
+}
+
/*
* Described in header.
*/
-callback_job_t *callback_job_create(callback_job_cb_t cb, void *data,
- callback_job_cleanup_t cleanup,
- callback_job_t *parent)
+callback_job_t *callback_job_create_with_prio(callback_job_cb_t cb, void *data,
+ callback_job_cleanup_t cleanup, callback_job_t *parent,
+ job_priority_t prio)
{
private_callback_job_t *this;
@@ -240,6 +251,7 @@ callback_job_t *callback_job_create(callback_job_cb_t cb, void *data,
.public = {
.job = {
.execute = _execute,
+ .get_priority = _get_priority,
.destroy = _destroy,
},
.cancel = _cancel,
@@ -251,6 +263,7 @@ callback_job_t *callback_job_create(callback_job_cb_t cb, void *data,
.children = linked_list_create(),
.parent = (private_callback_job_t*)parent,
.destroyable = condvar_create(CONDVAR_TYPE_DEFAULT),
+ .prio = prio,
);
/* register us at parent */
@@ -264,3 +277,13 @@ callback_job_t *callback_job_create(callback_job_cb_t cb, void *data,
return &this->public;
}
+/*
+ * Described in header.
+ */
+callback_job_t *callback_job_create(callback_job_cb_t cb, void *data,
+ callback_job_cleanup_t cleanup,
+ callback_job_t *parent)
+{
+ return callback_job_create_with_prio(cb, data, cleanup, parent,
+ JOB_PRIO_MEDIUM);
+}
diff --git a/src/libstrongswan/processing/jobs/callback_job.h b/src/libstrongswan/processing/jobs/callback_job.h
index 1eb5664d3..3e92b01c0 100644
--- a/src/libstrongswan/processing/jobs/callback_job.h
+++ b/src/libstrongswan/processing/jobs/callback_job.h
@@ -120,4 +120,20 @@ callback_job_t *callback_job_create(callback_job_cb_t cb, void *data,
callback_job_cleanup_t cleanup,
callback_job_t *parent);
+/**
+ * Creates a callback job, with priority.
+ *
+ * Same as callback_job_create(), but with different priorities than default.
+ *
+ * @param cb callback to call from the processor
+ * @param data user data to supply to callback
+ * @param cleanup destructor for data on destruction, or NULL
+ * @param parent parent of this job
+ * @param prio job priority
+ * @return callback_job_t object
+ */
+callback_job_t *callback_job_create_with_prio(callback_job_cb_t cb, void *data,
+ callback_job_cleanup_t cleanup, callback_job_t *parent,
+ job_priority_t prio);
+
#endif /** CALLBACK_JOB_H_ @}*/
diff --git a/src/libstrongswan/processing/jobs/job.c b/src/libstrongswan/processing/jobs/job.c
new file mode 100644
index 000000000..ccb897173
--- /dev/null
+++ b/src/libstrongswan/processing/jobs/job.c
@@ -0,0 +1,23 @@
+/*
+ * Copyright (C) 2011 Martin Willi
+ * Copyright (C) 2011 revosec AG
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * for more details.
+ */
+
+#include "job.h"
+
+ENUM(job_priority_names, JOB_PRIO_CRITICAL, JOB_PRIO_LOW,
+ "critical",
+ "high",
+ "medium",
+ "low",
+);
diff --git a/src/libstrongswan/processing/jobs/job.h b/src/libstrongswan/processing/jobs/job.h
index 0f1c16ebe..d25cee03e 100644
--- a/src/libstrongswan/processing/jobs/job.h
+++ b/src/libstrongswan/processing/jobs/job.h
@@ -23,11 +23,32 @@
#define JOB_H_
typedef struct job_t job_t;
+typedef enum job_priority_t job_priority_t;
#include <library.h>
/**
- * Job-Interface as it is stored in the job queue.
+ * Priority classes of jobs
+ */
+enum job_priority_t {
+ /** Critical infrastructure jobs that should always been served */
+ JOB_PRIO_CRITICAL = 0,
+ /** Short jobs executed with highest priority */
+ JOB_PRIO_HIGH,
+ /** Default job priority */
+ JOB_PRIO_MEDIUM,
+ /** Low priority jobs with thread blocking operations */
+ JOB_PRIO_LOW,
+ JOB_PRIO_MAX
+};
+
+/**
+ * Enum names for job priorities
+ */
+extern enum_name_t *job_priority_names;
+
+/**
+ * Job interface as it is stored in the job queue.
*/
struct job_t {
@@ -41,12 +62,19 @@ struct job_t {
void (*execute) (job_t *this);
/**
+ * Get the priority of a job.
+ *
+ * @return job priority
+ */
+ job_priority_t (*get_priority)(job_t *this);
+
+ /**
* Destroy a job.
*
* Is only called whenever a job was not executed (e.g. due daemon shutdown).
* After execution, jobs destroy themself.
*/
- void (*destroy) (job_t *job);
+ void (*destroy) (job_t *this);
};
#endif /** JOB_H_ @}*/
diff --git a/src/libstrongswan/processing/processor.c b/src/libstrongswan/processing/processor.c
index de556f86b..222f1a535 100644
--- a/src/libstrongswan/processing/processor.c
+++ b/src/libstrongswan/processing/processor.c
@@ -1,6 +1,7 @@
/*
* Copyright (C) 2005-2011 Martin Willi
* Copyright (C) 2011 revosec AG
+ * Copyright (C) 2008-2011 Tobias Brunner
* Copyright (C) 2005 Jan Hutter
* Hochschule fuer Technik Rapperswil
*
@@ -25,15 +26,16 @@
#include <threading/thread.h>
#include <threading/condvar.h>
#include <threading/mutex.h>
+#include <threading/thread_value.h>
#include <utils/linked_list.h>
-
typedef struct private_processor_t private_processor_t;
/**
* Private data of processor_t class.
*/
struct private_processor_t {
+
/**
* Public processor_t interface.
*/
@@ -50,9 +52,9 @@ struct private_processor_t {
u_int desired_threads;
/**
- * Number of threads waiting for work
+ * Number of threads currently working, for each priority
*/
- u_int idle_threads;
+ u_int working_threads[JOB_PRIO_MAX];
/**
* All threads managed in the pool (including threads that have been
@@ -61,12 +63,22 @@ struct private_processor_t {
linked_list_t *threads;
/**
- * The jobs are stored in a linked list
+ * A list of queued jobs for each priority
+ */
+ linked_list_t *jobs[JOB_PRIO_MAX];
+
+ /**
+ * Threads reserved for each priority
+ */
+ int prio_threads[JOB_PRIO_MAX];
+
+ /**
+ * Priority of the job executed by a thread
*/
- linked_list_t *list;
+ thread_value_t *priority;
/**
- * access to linked_list is locked through this mutex
+ * access to job lists is locked through this mutex
*/
mutex_t *mutex;
@@ -90,7 +102,7 @@ static void restart(private_processor_t *this)
{
thread_t *thread;
- DBG2(DBG_JOB, "terminated worker thread, ID: %u", thread_current_id());
+ DBG2(DBG_JOB, "terminated worker thread %.2u", thread_current_id());
/* respawn thread if required */
this->mutex->lock(this->mutex);
@@ -108,6 +120,31 @@ static void restart(private_processor_t *this)
}
/**
+ * Decrement working thread count of a priority class
+ */
+static void decrement_working_threads(private_processor_t *this)
+{
+ this->mutex->lock(this->mutex);
+ this->working_threads[(intptr_t)this->priority->get(this->priority)]--;
+ this->mutex->unlock(this->mutex);
+}
+
+/**
+ * Get number of idle threads, non-locking variant
+ */
+static u_int get_idle_threads_nolock(private_processor_t *this)
+{
+ u_int count, i;
+
+ count = this->total_threads;
+ for (i = 0; i < JOB_PRIO_MAX; i++)
+ {
+ count -= this->working_threads[i];
+ }
+ return count;
+}
+
+/**
* Process queued jobs, called by the worker threads
*/
static void process_jobs(private_processor_t *this)
@@ -115,27 +152,51 @@ static void process_jobs(private_processor_t *this)
/* worker threads are not cancellable by default */
thread_cancelability(FALSE);
- DBG2(DBG_JOB, "started worker thread, ID: %u", thread_current_id());
+ DBG2(DBG_JOB, "started worker thread %.2u", thread_current_id());
this->mutex->lock(this->mutex);
while (this->desired_threads >= this->total_threads)
{
- job_t *job;
+ job_t *job = NULL;
+ int i, reserved = 0, idle;
- if (this->list->get_count(this->list) == 0)
+ idle = get_idle_threads_nolock(this);
+
+ for (i = 0; i < JOB_PRIO_MAX; i++)
+ {
+ if (reserved && reserved >= idle)
+ {
+ DBG2(DBG_JOB, "delaying %N priority jobs: %d threads idle, "
+ "but %d reserved for higher priorities",
+ job_priority_names, i, idle, reserved);
+ break;
+ }
+ if (this->working_threads[i] < this->prio_threads[i])
+ {
+ reserved += this->prio_threads[i] - this->working_threads[i];
+ }
+ if (this->jobs[i]->remove_first(this->jobs[i],
+ (void**)&job) == SUCCESS)
+ {
+ this->working_threads[i]++;
+ this->mutex->unlock(this->mutex);
+ this->priority->set(this->priority, (void*)(intptr_t)i);
+ /* terminated threads are restarted to get a constant pool */
+ thread_cleanup_push((thread_cleanup_t)restart, this);
+ thread_cleanup_push((thread_cleanup_t)decrement_working_threads,
+ this);
+ job->execute(job);
+ thread_cleanup_pop(FALSE);
+ thread_cleanup_pop(FALSE);
+ this->mutex->lock(this->mutex);
+ this->working_threads[i]--;
+ break;
+ }
+ }
+ if (!job)
{
- this->idle_threads++;
this->job_added->wait(this->job_added, this->mutex);
- this->idle_threads--;
- continue;
}
- this->list->remove_first(this->list, (void**)&job);
- this->mutex->unlock(this->mutex);
- /* terminated threads are restarted, so we have a constant pool */
- thread_cleanup_push((thread_cleanup_t)restart, this);
- job->execute(job);
- thread_cleanup_pop(FALSE);
- this->mutex->lock(this->mutex);
}
this->total_threads--;
this->thread_terminated->signal(this->thread_terminated);
@@ -159,18 +220,42 @@ METHOD(processor_t, get_idle_threads, u_int,
u_int count;
this->mutex->lock(this->mutex);
- count = this->idle_threads;
+ count = get_idle_threads_nolock(this);
+ this->mutex->unlock(this->mutex);
+ return count;
+}
+
+/**
+ * Check priority bounds
+ */
+static job_priority_t sane_prio(job_priority_t prio)
+{
+ if ((int)prio < 0 || prio >= JOB_PRIO_MAX)
+ {
+ return JOB_PRIO_MAX - 1;
+ }
+ return prio;
+}
+
+METHOD(processor_t, get_working_threads, u_int,
+ private_processor_t *this, job_priority_t prio)
+{
+ u_int count;
+
+ this->mutex->lock(this->mutex);
+ count = this->working_threads[sane_prio(prio)];
this->mutex->unlock(this->mutex);
return count;
}
METHOD(processor_t, get_job_load, u_int,
- private_processor_t *this)
+ private_processor_t *this, job_priority_t prio)
{
u_int load;
+ prio = sane_prio(prio);
this->mutex->lock(this->mutex);
- load = this->list->get_count(this->list);
+ load = this->jobs[prio]->get_count(this->jobs[prio]);
this->mutex->unlock(this->mutex);
return load;
}
@@ -178,8 +263,11 @@ METHOD(processor_t, get_job_load, u_int,
METHOD(processor_t, queue_job, void,
private_processor_t *this, job_t *job)
{
+ job_priority_t prio;
+
+ prio = sane_prio(job->get_priority(job));
this->mutex->lock(this->mutex);
- this->list->insert_last(this->list, job);
+ this->jobs[prio]->insert_last(this->jobs[prio], job);
this->job_added->signal(this->job_added);
this->mutex->unlock(this->mutex);
}
@@ -217,6 +305,7 @@ METHOD(processor_t, destroy, void,
private_processor_t *this)
{
thread_t *current;
+ int i;
set_threads(this, 0);
this->mutex->lock(this->mutex);
@@ -231,10 +320,14 @@ METHOD(processor_t, destroy, void,
current->join(current);
}
this->mutex->unlock(this->mutex);
+ this->priority->destroy(this->priority);
this->thread_terminated->destroy(this->thread_terminated);
this->job_added->destroy(this->job_added);
this->mutex->destroy(this->mutex);
- this->list->destroy_offset(this->list, offsetof(job_t, destroy));
+ for (i = 0; i < JOB_PRIO_MAX; i++)
+ {
+ this->jobs[i]->destroy_offset(this->jobs[i], offsetof(job_t, destroy));
+ }
this->threads->destroy(this->threads);
free(this);
}
@@ -245,22 +338,31 @@ METHOD(processor_t, destroy, void,
processor_t *processor_create()
{
private_processor_t *this;
+ int i;
INIT(this,
.public = {
.get_total_threads = _get_total_threads,
.get_idle_threads = _get_idle_threads,
+ .get_working_threads = _get_working_threads,
.get_job_load = _get_job_load,
.queue_job = _queue_job,
.set_threads = _set_threads,
.destroy = _destroy,
},
- .list = linked_list_create(),
.threads = linked_list_create(),
+ .priority = thread_value_create(NULL),
.mutex = mutex_create(MUTEX_TYPE_DEFAULT),
.job_added = condvar_create(CONDVAR_TYPE_DEFAULT),
.thread_terminated = condvar_create(CONDVAR_TYPE_DEFAULT),
);
+ for (i = 0; i < JOB_PRIO_MAX; i++)
+ {
+ this->jobs[i] = linked_list_create();
+ this->prio_threads[i] = lib->settings->get_int(lib->settings,
+ "libstrongswan.processor.priority_threads.%N", 0,
+ job_priority_names, i);
+ }
return &this->public;
}
diff --git a/src/libstrongswan/processing/processor.h b/src/libstrongswan/processing/processor.h
index bebbe3a15..5db42c04c 100644
--- a/src/libstrongswan/processing/processor.h
+++ b/src/libstrongswan/processing/processor.h
@@ -42,18 +42,27 @@ struct processor_t {
u_int (*get_total_threads) (processor_t *this);
/**
- * Get the number of threads currently waiting.
+ * Get the number of threads currently waiting for work.
*
* @return number of idle threads
*/
u_int (*get_idle_threads) (processor_t *this);
/**
- * Get the number of queued jobs.
+ * Get the number of threads currently working, per priority class.
*
+ * @param prioritiy to check
+ * @return number of threads in priority working
+ */
+ u_int (*get_working_threads)(processor_t *this, job_priority_t prio);
+
+ /**
+ * Get the number of queued jobs for a specified priority.
+ *
+ * @param prio priority class to get job load for
* @return number of items in queue
*/
- u_int (*get_job_load) (processor_t *this);
+ u_int (*get_job_load) (processor_t *this, job_priority_t prio);
/**
* Adds a job to the queue.
diff --git a/src/libstrongswan/processing/scheduler.c b/src/libstrongswan/processing/scheduler.c
index 7d9bcd70f..f3cc1164a 100644
--- a/src/libstrongswan/processing/scheduler.c
+++ b/src/libstrongswan/processing/scheduler.c
@@ -341,7 +341,8 @@ scheduler_t * scheduler_create()
this->heap = (event_t**)calloc(this->heap_size + 1, sizeof(event_t*));
- this->job = callback_job_create((callback_job_cb_t)schedule, this, NULL, NULL);
+ this->job = callback_job_create_with_prio((callback_job_cb_t)schedule,
+ this, NULL, NULL, JOB_PRIO_CRITICAL);
lib->processor->queue_job(lib->processor, (job_t*)this->job);
return &this->public;
diff --git a/src/libstrongswan/processing/scheduler.h b/src/libstrongswan/processing/scheduler.h
index f2c72550f..abbf74e2c 100644
--- a/src/libstrongswan/processing/scheduler.h
+++ b/src/libstrongswan/processing/scheduler.h
@@ -35,7 +35,7 @@ typedef struct scheduler_t scheduler_t;
* based data structure that satisfies the following property: if B is a child
* node of A, then key(A) >= (or <=) key(B). So either the element with the
* greatest (max-heap) or the smallest (min-heap) key is the root of the heap.
- * We use a min-heap whith the key being the absolute unix time at which an
+ * We use a min-heap with the key being the absolute unix time at which an
* event is scheduled. So the root is always the event that will fire next.
*
* An earlier implementation of the scheduler used a sorted linked list to store