diff options
Diffstat (limited to 'src/charon/processing/processor.c')
-rw-r--r-- | src/charon/processing/processor.c | 87 |
1 files changed, 54 insertions, 33 deletions
diff --git a/src/charon/processing/processor.c b/src/charon/processing/processor.c index 4a3943323..d5774af26 100644 --- a/src/charon/processing/processor.c +++ b/src/charon/processing/processor.c @@ -13,16 +13,17 @@ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * for more details. */ - + #include <stdlib.h> -#include <pthread.h> #include <string.h> #include <errno.h> #include "processor.h" #include <daemon.h> -#include <utils/mutex.h> +#include <threading/thread.h> +#include <threading/condvar.h> +#include <threading/mutex.h> #include <utils/linked_list.h> @@ -41,22 +42,28 @@ struct private_processor_t { * Number of running threads */ u_int total_threads; - + /** * Desired number of threads */ u_int desired_threads; - + /** * Number of threads waiting for work */ u_int idle_threads; /** + * All threads managed in the pool (including threads that have been + * cancelled, this allows to join them during destruction) + */ + linked_list_t *threads; + + /** * The jobs are stored in a linked list */ linked_list_t *list; - + /** * access to linked_list is locked through this mutex */ @@ -66,7 +73,7 @@ struct private_processor_t { * Condvar to wait for new jobs */ condvar_t *job_added; - + /** * Condvar to wait for terminated threads */ @@ -80,17 +87,23 @@ static void process_jobs(private_processor_t *this); */ static void restart(private_processor_t *this) { - pthread_t thread; - + thread_t *thread; + + DBG2(DBG_JOB, "terminated worker thread, ID: %u", thread_current_id()); + /* respawn thread if required */ - if (this->desired_threads == 0 || - pthread_create(&thread, NULL, (void*)process_jobs, this) != 0) + this->mutex->lock(this->mutex); + if (this->desired_threads < this->total_threads || + (thread = thread_create((thread_main_t)process_jobs, this)) == NULL) { - this->mutex->lock(this->mutex); this->total_threads--; - this->thread_terminated->broadcast(this->thread_terminated); - this->mutex->unlock(this->mutex); + this->thread_terminated->signal(this->thread_terminated); } + else + { + this->threads->insert_last(this->threads, thread); + } + this->mutex->unlock(this->mutex); } /** @@ -98,17 +111,16 @@ static void restart(private_processor_t *this) */ static void process_jobs(private_processor_t *this) { - int oldstate; - - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate); - - DBG2(DBG_JOB, "started worker thread, thread_ID: %06u", (int)pthread_self()); - + /* worker threads are not cancellable by default */ + thread_cancelability(FALSE); + + DBG2(DBG_JOB, "started worker thread, ID: %u", thread_current_id()); + this->mutex->lock(this->mutex); while (this->desired_threads >= this->total_threads) { job_t *job; - + if (this->list->get_count(this->list) == 0) { this->idle_threads++; @@ -119,14 +131,13 @@ static void process_jobs(private_processor_t *this) this->list->remove_first(this->list, (void**)&job); this->mutex->unlock(this->mutex); /* terminated threads are restarted, so we have a constant pool */ - pthread_cleanup_push((void*)restart, this); + thread_cleanup_push((thread_cleanup_t)restart, this); job->execute(job); - pthread_cleanup_pop(0); + thread_cleanup_pop(FALSE); this->mutex->lock(this->mutex); } - this->total_threads--; - this->thread_terminated->signal(this->thread_terminated); this->mutex->unlock(this->mutex); + restart(this); } /** @@ -136,7 +147,7 @@ static u_int get_total_threads(private_processor_t *this) { u_int count; this->mutex->lock(this->mutex); - count = this->total_threads; + count = this->total_threads; this->mutex->unlock(this->mutex); return count; } @@ -175,7 +186,7 @@ static void queue_job(private_processor_t *this, job_t *job) this->job_added->signal(this->job_added); this->mutex->unlock(this->mutex); } - + /** * Implementation of processor_t.set_threads. */ @@ -185,14 +196,16 @@ static void set_threads(private_processor_t *this, u_int count) if (count > this->total_threads) { /* increase thread count */ int i; - pthread_t current; - + thread_t *current; + this->desired_threads = count; DBG1(DBG_JOB, "spawning %d worker threads", count - this->total_threads); for (i = this->total_threads; i < count; i++) { - if (pthread_create(¤t, NULL, (void*)process_jobs, this) == 0) + current = thread_create((thread_main_t)process_jobs, this); + if (current) { + this->threads->insert_last(this->threads, current); this->total_threads++; } } @@ -210,6 +223,7 @@ static void set_threads(private_processor_t *this, u_int count) */ static void destroy(private_processor_t *this) { + thread_t *current; set_threads(this, 0); this->mutex->lock(this->mutex); while (this->total_threads > 0) @@ -217,11 +231,17 @@ static void destroy(private_processor_t *this) this->job_added->broadcast(this->job_added); this->thread_terminated->wait(this->thread_terminated, this->mutex); } + while (this->threads->remove_first(this->threads, + (void**)¤t) == SUCCESS) + { + current->join(current); + } this->mutex->unlock(this->mutex); this->thread_terminated->destroy(this->thread_terminated); this->job_added->destroy(this->job_added); this->mutex->destroy(this->mutex); this->list->destroy_offset(this->list, offsetof(job_t, destroy)); + this->threads->destroy(this->threads); free(this); } @@ -231,22 +251,23 @@ static void destroy(private_processor_t *this) processor_t *processor_create(size_t pool_size) { private_processor_t *this = malloc_thing(private_processor_t); - + this->public.get_total_threads = (u_int(*)(processor_t*))get_total_threads; this->public.get_idle_threads = (u_int(*)(processor_t*))get_idle_threads; this->public.get_job_load = (u_int(*)(processor_t*))get_job_load; this->public.queue_job = (void(*)(processor_t*, job_t*))queue_job; this->public.set_threads = (void(*)(processor_t*, u_int))set_threads; this->public.destroy = (void(*)(processor_t*))destroy; - + this->list = linked_list_create(); + this->threads = linked_list_create(); this->mutex = mutex_create(MUTEX_TYPE_DEFAULT); this->job_added = condvar_create(CONDVAR_TYPE_DEFAULT); this->thread_terminated = condvar_create(CONDVAR_TYPE_DEFAULT); this->total_threads = 0; this->desired_threads = 0; this->idle_threads = 0; - + return &this->public; } |