summaryrefslogtreecommitdiff
path: root/src/charon/processing/processor.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/charon/processing/processor.c')
-rw-r--r--src/charon/processing/processor.c87
1 files changed, 54 insertions, 33 deletions
diff --git a/src/charon/processing/processor.c b/src/charon/processing/processor.c
index 4a3943323..d5774af26 100644
--- a/src/charon/processing/processor.c
+++ b/src/charon/processing/processor.c
@@ -13,16 +13,17 @@
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*/
-
+
#include <stdlib.h>
-#include <pthread.h>
#include <string.h>
#include <errno.h>
#include "processor.h"
#include <daemon.h>
-#include <utils/mutex.h>
+#include <threading/thread.h>
+#include <threading/condvar.h>
+#include <threading/mutex.h>
#include <utils/linked_list.h>
@@ -41,22 +42,28 @@ struct private_processor_t {
* Number of running threads
*/
u_int total_threads;
-
+
/**
* Desired number of threads
*/
u_int desired_threads;
-
+
/**
* Number of threads waiting for work
*/
u_int idle_threads;
/**
+ * All threads managed in the pool (including threads that have been
+ * cancelled, this allows to join them during destruction)
+ */
+ linked_list_t *threads;
+
+ /**
* The jobs are stored in a linked list
*/
linked_list_t *list;
-
+
/**
* access to linked_list is locked through this mutex
*/
@@ -66,7 +73,7 @@ struct private_processor_t {
* Condvar to wait for new jobs
*/
condvar_t *job_added;
-
+
/**
* Condvar to wait for terminated threads
*/
@@ -80,17 +87,23 @@ static void process_jobs(private_processor_t *this);
*/
static void restart(private_processor_t *this)
{
- pthread_t thread;
-
+ thread_t *thread;
+
+ DBG2(DBG_JOB, "terminated worker thread, ID: %u", thread_current_id());
+
/* respawn thread if required */
- if (this->desired_threads == 0 ||
- pthread_create(&thread, NULL, (void*)process_jobs, this) != 0)
+ this->mutex->lock(this->mutex);
+ if (this->desired_threads < this->total_threads ||
+ (thread = thread_create((thread_main_t)process_jobs, this)) == NULL)
{
- this->mutex->lock(this->mutex);
this->total_threads--;
- this->thread_terminated->broadcast(this->thread_terminated);
- this->mutex->unlock(this->mutex);
+ this->thread_terminated->signal(this->thread_terminated);
}
+ else
+ {
+ this->threads->insert_last(this->threads, thread);
+ }
+ this->mutex->unlock(this->mutex);
}
/**
@@ -98,17 +111,16 @@ static void restart(private_processor_t *this)
*/
static void process_jobs(private_processor_t *this)
{
- int oldstate;
-
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate);
-
- DBG2(DBG_JOB, "started worker thread, thread_ID: %06u", (int)pthread_self());
-
+ /* worker threads are not cancellable by default */
+ thread_cancelability(FALSE);
+
+ DBG2(DBG_JOB, "started worker thread, ID: %u", thread_current_id());
+
this->mutex->lock(this->mutex);
while (this->desired_threads >= this->total_threads)
{
job_t *job;
-
+
if (this->list->get_count(this->list) == 0)
{
this->idle_threads++;
@@ -119,14 +131,13 @@ static void process_jobs(private_processor_t *this)
this->list->remove_first(this->list, (void**)&job);
this->mutex->unlock(this->mutex);
/* terminated threads are restarted, so we have a constant pool */
- pthread_cleanup_push((void*)restart, this);
+ thread_cleanup_push((thread_cleanup_t)restart, this);
job->execute(job);
- pthread_cleanup_pop(0);
+ thread_cleanup_pop(FALSE);
this->mutex->lock(this->mutex);
}
- this->total_threads--;
- this->thread_terminated->signal(this->thread_terminated);
this->mutex->unlock(this->mutex);
+ restart(this);
}
/**
@@ -136,7 +147,7 @@ static u_int get_total_threads(private_processor_t *this)
{
u_int count;
this->mutex->lock(this->mutex);
- count = this->total_threads;
+ count = this->total_threads;
this->mutex->unlock(this->mutex);
return count;
}
@@ -175,7 +186,7 @@ static void queue_job(private_processor_t *this, job_t *job)
this->job_added->signal(this->job_added);
this->mutex->unlock(this->mutex);
}
-
+
/**
* Implementation of processor_t.set_threads.
*/
@@ -185,14 +196,16 @@ static void set_threads(private_processor_t *this, u_int count)
if (count > this->total_threads)
{ /* increase thread count */
int i;
- pthread_t current;
-
+ thread_t *current;
+
this->desired_threads = count;
DBG1(DBG_JOB, "spawning %d worker threads", count - this->total_threads);
for (i = this->total_threads; i < count; i++)
{
- if (pthread_create(&current, NULL, (void*)process_jobs, this) == 0)
+ current = thread_create((thread_main_t)process_jobs, this);
+ if (current)
{
+ this->threads->insert_last(this->threads, current);
this->total_threads++;
}
}
@@ -210,6 +223,7 @@ static void set_threads(private_processor_t *this, u_int count)
*/
static void destroy(private_processor_t *this)
{
+ thread_t *current;
set_threads(this, 0);
this->mutex->lock(this->mutex);
while (this->total_threads > 0)
@@ -217,11 +231,17 @@ static void destroy(private_processor_t *this)
this->job_added->broadcast(this->job_added);
this->thread_terminated->wait(this->thread_terminated, this->mutex);
}
+ while (this->threads->remove_first(this->threads,
+ (void**)&current) == SUCCESS)
+ {
+ current->join(current);
+ }
this->mutex->unlock(this->mutex);
this->thread_terminated->destroy(this->thread_terminated);
this->job_added->destroy(this->job_added);
this->mutex->destroy(this->mutex);
this->list->destroy_offset(this->list, offsetof(job_t, destroy));
+ this->threads->destroy(this->threads);
free(this);
}
@@ -231,22 +251,23 @@ static void destroy(private_processor_t *this)
processor_t *processor_create(size_t pool_size)
{
private_processor_t *this = malloc_thing(private_processor_t);
-
+
this->public.get_total_threads = (u_int(*)(processor_t*))get_total_threads;
this->public.get_idle_threads = (u_int(*)(processor_t*))get_idle_threads;
this->public.get_job_load = (u_int(*)(processor_t*))get_job_load;
this->public.queue_job = (void(*)(processor_t*, job_t*))queue_job;
this->public.set_threads = (void(*)(processor_t*, u_int))set_threads;
this->public.destroy = (void(*)(processor_t*))destroy;
-
+
this->list = linked_list_create();
+ this->threads = linked_list_create();
this->mutex = mutex_create(MUTEX_TYPE_DEFAULT);
this->job_added = condvar_create(CONDVAR_TYPE_DEFAULT);
this->thread_terminated = condvar_create(CONDVAR_TYPE_DEFAULT);
this->total_threads = 0;
this->desired_threads = 0;
this->idle_threads = 0;
-
+
return &this->public;
}