diff options
Diffstat (limited to 'src/libstrongswan/processing')
-rw-r--r-- | src/libstrongswan/processing/jobs/callback_job.c | 59 | ||||
-rw-r--r-- | src/libstrongswan/processing/jobs/callback_job.h | 17 | ||||
-rw-r--r-- | src/libstrongswan/processing/processor.c | 80 | ||||
-rw-r--r-- | src/libstrongswan/processing/scheduler.c | 57 |
4 files changed, 99 insertions, 114 deletions
diff --git a/src/libstrongswan/processing/jobs/callback_job.c b/src/libstrongswan/processing/jobs/callback_job.c index 556cbd907..0043a9cdb 100644 --- a/src/libstrongswan/processing/jobs/callback_job.c +++ b/src/libstrongswan/processing/jobs/callback_job.c @@ -1,6 +1,7 @@ /* * Copyright (C) 2009 Tobias Brunner - * Copyright (C) 2007 Martin Willi + * Copyright (C) 2007-2011 Martin Willi + * Copyright (C) 2011 revosec AG * Hochschule fuer Technik Rapperswil * * This program is free software; you can redistribute it and/or modify it @@ -29,6 +30,7 @@ typedef struct private_callback_job_t private_callback_job_t; * Private data of an callback_job_t Object. */ struct private_callback_job_t { + /** * Public callback_job_t interface. */ @@ -111,10 +113,8 @@ static void unregister(private_callback_job_t *this) } } -/** - * Implements job_t.destroy. - */ -static void destroy(private_callback_job_t *this) +METHOD(job_t, destroy, void, + private_callback_job_t *this) { this->mutex->lock(this->mutex); unregister(this); @@ -133,10 +133,8 @@ static void destroy(private_callback_job_t *this) free(this); } -/** - * Implementation of callback_job_t.cancel. - */ -static void cancel(private_callback_job_t *this) +METHOD(callback_job_t, cancel, void, + private_callback_job_t *this) { callback_job_t *child; sem_t *terminated = NULL; @@ -177,10 +175,8 @@ static void cancel(private_callback_job_t *this) } } -/** - * Implementation of job_t.execute. - */ -static void execute(private_callback_job_t *this) +METHOD(job_t, execute, void, + private_callback_job_t *this) { bool cleanup = FALSE, requeue = FALSE; @@ -226,8 +222,7 @@ static void execute(private_callback_job_t *this) thread_cancellation_point(); if (requeue) { - lib->processor->queue_job(lib->processor, - &this->public.job_interface); + lib->processor->queue_job(lib->processor, &this->public.job); } thread_cleanup_pop(cleanup); } @@ -239,24 +234,24 @@ callback_job_t *callback_job_create(callback_job_cb_t cb, void *data, callback_job_cleanup_t cleanup, callback_job_t *parent) { - private_callback_job_t *this = malloc_thing(private_callback_job_t); - - /* interface functions */ - this->public.job_interface.execute = (void (*) (job_t *)) execute; - this->public.job_interface.destroy = (void (*) (job_t *)) destroy; - this->public.cancel = (void(*)(callback_job_t*))cancel; + private_callback_job_t *this; - /* private variables */ - this->mutex = mutex_create(MUTEX_TYPE_DEFAULT); - this->callback = cb; - this->data = data; - this->cleanup = cleanup; - this->thread = 0; - this->children = linked_list_create(); - this->parent = (private_callback_job_t*)parent; - this->cancelled = FALSE; - this->destroyable = condvar_create(CONDVAR_TYPE_DEFAULT); - this->terminated = NULL; + INIT(this, + .public = { + .job = { + .execute = _execute, + .destroy = _destroy, + }, + .cancel = _cancel, + }, + .mutex = mutex_create(MUTEX_TYPE_DEFAULT), + .callback = cb, + .data = data, + .cleanup = cleanup, + .children = linked_list_create(), + .parent = (private_callback_job_t*)parent, + .destroyable = condvar_create(CONDVAR_TYPE_DEFAULT), + ); /* register us at parent */ if (parent) diff --git a/src/libstrongswan/processing/jobs/callback_job.h b/src/libstrongswan/processing/jobs/callback_job.h index 62da1edd1..1eb5664d3 100644 --- a/src/libstrongswan/processing/jobs/callback_job.h +++ b/src/libstrongswan/processing/jobs/callback_job.h @@ -1,5 +1,6 @@ /* - * Copyright (C) 2007 Martin Willi + * Copyright (C) 2007-2011 Martin Willi + * Copyright (C) 2011 revosec AG * Hochschule fuer Technik Rapperswil * * This program is free software; you can redistribute it and/or modify it @@ -30,7 +31,7 @@ typedef struct callback_job_t callback_job_t; typedef enum job_requeue_t job_requeue_t; /** - * Job requeueing policy + * Job requeueing policy. * * The job requeueing policy defines how a job is handled when the callback * function returns. @@ -84,15 +85,19 @@ typedef void (*callback_job_cleanup_t)(void *data); * of asynchronous methods, without to manage threads. */ struct callback_job_t { + /** * The job_t interface. */ - job_t job_interface; + job_t job; /** - * Cancel the job's thread and wait for its termination. This only works - * reliably for jobs that always use JOB_REQUEUE_FAIR or JOB_REQUEUE_DIRECT, - * otherwise the job may already be destroyed when cancel is called. */ + * Cancel the job's thread and wait for its termination. + * + * This only works reliably for jobs that always use JOB_REQUEUE_FAIR or + * JOB_REQUEUE_DIRECT, otherwise the job may already be destroyed when + * cancel is called. + */ void (*cancel)(callback_job_t *this); }; diff --git a/src/libstrongswan/processing/processor.c b/src/libstrongswan/processing/processor.c index 723aec908..de556f86b 100644 --- a/src/libstrongswan/processing/processor.c +++ b/src/libstrongswan/processing/processor.c @@ -1,5 +1,6 @@ /* - * Copyright (C) 2005-2007 Martin Willi + * Copyright (C) 2005-2011 Martin Willi + * Copyright (C) 2011 revosec AG * Copyright (C) 2005 Jan Hutter * Hochschule fuer Technik Rapperswil * @@ -136,50 +137,46 @@ static void process_jobs(private_processor_t *this) thread_cleanup_pop(FALSE); this->mutex->lock(this->mutex); } + this->total_threads--; + this->thread_terminated->signal(this->thread_terminated); this->mutex->unlock(this->mutex); - restart(this); } -/** - * Implementation of processor_t.get_total_threads. - */ -static u_int get_total_threads(private_processor_t *this) +METHOD(processor_t, get_total_threads, u_int, + private_processor_t *this) { u_int count; + this->mutex->lock(this->mutex); count = this->total_threads; this->mutex->unlock(this->mutex); return count; } -/** - * Implementation of processor_t.get_idle_threads. - */ -static u_int get_idle_threads(private_processor_t *this) +METHOD(processor_t, get_idle_threads, u_int, + private_processor_t *this) { u_int count; + this->mutex->lock(this->mutex); count = this->idle_threads; this->mutex->unlock(this->mutex); return count; } -/** - * implements processor_t.get_job_load - */ -static u_int get_job_load(private_processor_t *this) +METHOD(processor_t, get_job_load, u_int, + private_processor_t *this) { u_int load; + this->mutex->lock(this->mutex); load = this->list->get_count(this->list); this->mutex->unlock(this->mutex); return load; } -/** - * implements function processor_t.queue_job - */ -static void queue_job(private_processor_t *this, job_t *job) +METHOD(processor_t, queue_job, void, + private_processor_t *this, job_t *job) { this->mutex->lock(this->mutex); this->list->insert_last(this->list, job); @@ -187,10 +184,8 @@ static void queue_job(private_processor_t *this, job_t *job) this->mutex->unlock(this->mutex); } -/** - * Implementation of processor_t.set_threads. - */ -static void set_threads(private_processor_t *this, u_int count) +METHOD(processor_t, set_threads, void, + private_processor_t *this, u_int count) { this->mutex->lock(this->mutex); if (count > this->total_threads) @@ -218,12 +213,11 @@ static void set_threads(private_processor_t *this, u_int count) this->mutex->unlock(this->mutex); } -/** - * Implementation of processor_t.destroy. - */ -static void destroy(private_processor_t *this) +METHOD(processor_t, destroy, void, + private_processor_t *this) { thread_t *current; + set_threads(this, 0); this->mutex->lock(this->mutex); while (this->total_threads > 0) @@ -250,23 +244,23 @@ static void destroy(private_processor_t *this) */ processor_t *processor_create() { - private_processor_t *this = malloc_thing(private_processor_t); - - this->public.get_total_threads = (u_int(*)(processor_t*))get_total_threads; - this->public.get_idle_threads = (u_int(*)(processor_t*))get_idle_threads; - this->public.get_job_load = (u_int(*)(processor_t*))get_job_load; - this->public.queue_job = (void(*)(processor_t*, job_t*))queue_job; - this->public.set_threads = (void(*)(processor_t*, u_int))set_threads; - this->public.destroy = (void(*)(processor_t*))destroy; - - this->list = linked_list_create(); - this->threads = linked_list_create(); - this->mutex = mutex_create(MUTEX_TYPE_DEFAULT); - this->job_added = condvar_create(CONDVAR_TYPE_DEFAULT); - this->thread_terminated = condvar_create(CONDVAR_TYPE_DEFAULT); - this->total_threads = 0; - this->desired_threads = 0; - this->idle_threads = 0; + private_processor_t *this; + + INIT(this, + .public = { + .get_total_threads = _get_total_threads, + .get_idle_threads = _get_idle_threads, + .get_job_load = _get_job_load, + .queue_job = _queue_job, + .set_threads = _set_threads, + .destroy = _destroy, + }, + .list = linked_list_create(), + .threads = linked_list_create(), + .mutex = mutex_create(MUTEX_TYPE_DEFAULT), + .job_added = condvar_create(CONDVAR_TYPE_DEFAULT), + .thread_terminated = condvar_create(CONDVAR_TYPE_DEFAULT), + ); return &this->public; } diff --git a/src/libstrongswan/processing/scheduler.c b/src/libstrongswan/processing/scheduler.c index e23f04598..7d9bcd70f 100644 --- a/src/libstrongswan/processing/scheduler.c +++ b/src/libstrongswan/processing/scheduler.c @@ -232,10 +232,8 @@ static job_requeue_t schedule(private_scheduler_t * this) return JOB_REQUEUE_DIRECT; } -/** - * Implements scheduler_t.get_job_load - */ -static u_int get_job_load(private_scheduler_t *this) +METHOD(scheduler_t, get_job_load, u_int, + private_scheduler_t *this) { int count; this->mutex->lock(this->mutex); @@ -244,10 +242,8 @@ static u_int get_job_load(private_scheduler_t *this) return count; } -/** - * Implements scheduler_t.schedule_job_tv. - */ -static void schedule_job_tv(private_scheduler_t *this, job_t *job, timeval_t tv) +METHOD(scheduler_t, schedule_job_tv, void, + private_scheduler_t *this, job_t *job, timeval_t tv) { event_t *event; u_int position; @@ -283,10 +279,8 @@ static void schedule_job_tv(private_scheduler_t *this, job_t *job, timeval_t tv) this->mutex->unlock(this->mutex); } -/** - * Implements scheduler_t.schedule_job. - */ -static void schedule_job(private_scheduler_t *this, job_t *job, u_int32_t s) +METHOD(scheduler_t, schedule_job, void, + private_scheduler_t *this, job_t *job, u_int32_t s) { timeval_t tv; @@ -296,10 +290,8 @@ static void schedule_job(private_scheduler_t *this, job_t *job, u_int32_t s) schedule_job_tv(this, job, tv); } -/** - * Implements scheduler_t.schedule_job_ms. - */ -static void schedule_job_ms(private_scheduler_t *this, job_t *job, u_int32_t ms) +METHOD(scheduler_t, schedule_job_ms, void, + private_scheduler_t *this, job_t *job, u_int32_t ms) { timeval_t tv, add; @@ -312,10 +304,8 @@ static void schedule_job_ms(private_scheduler_t *this, job_t *job, u_int32_t ms) schedule_job_tv(this, job, tv); } -/** - * Implementation of scheduler_t.destroy. - */ -static void destroy(private_scheduler_t *this) +METHOD(scheduler_t, destroy, void, + private_scheduler_t *this) { event_t *event; this->job->cancel(this->job); @@ -334,22 +324,23 @@ static void destroy(private_scheduler_t *this) */ scheduler_t * scheduler_create() { - private_scheduler_t *this = malloc_thing(private_scheduler_t); - - this->public.get_job_load = (u_int (*) (scheduler_t *this)) get_job_load; - this->public.schedule_job = (void (*) (scheduler_t *this, job_t *job, u_int32_t s)) schedule_job; - this->public.schedule_job_ms = (void (*) (scheduler_t *this, job_t *job, u_int32_t ms)) schedule_job_ms; - this->public.schedule_job_tv = (void (*) (scheduler_t *this, job_t *job, timeval_t tv)) schedule_job_tv; - this->public.destroy = (void(*)(scheduler_t*)) destroy; + private_scheduler_t *this; + + INIT(this, + .public = { + .get_job_load = _get_job_load, + .schedule_job = _schedule_job, + .schedule_job_ms = _schedule_job_ms, + .schedule_job_tv = _schedule_job_tv, + .destroy = _destroy, + }, + .heap_size = HEAP_SIZE_DEFAULT, + .mutex = mutex_create(MUTEX_TYPE_DEFAULT), + .condvar = condvar_create(CONDVAR_TYPE_DEFAULT), + ); - /* Note: the root of the heap is at index 1 */ - this->event_count = 0; - this->heap_size = HEAP_SIZE_DEFAULT; this->heap = (event_t**)calloc(this->heap_size + 1, sizeof(event_t*)); - this->mutex = mutex_create(MUTEX_TYPE_DEFAULT); - this->condvar = condvar_create(CONDVAR_TYPE_DEFAULT); - this->job = callback_job_create((callback_job_cb_t)schedule, this, NULL, NULL); lib->processor->queue_job(lib->processor, (job_t*)this->job); |