diff options
Diffstat (limited to 'src/libstrongswan/processing')
| -rw-r--r-- | src/libstrongswan/processing/jobs/callback_job.c | 188 | ||||
| -rw-r--r-- | src/libstrongswan/processing/jobs/callback_job.h | 66 | ||||
| -rw-r--r-- | src/libstrongswan/processing/jobs/job.h | 107 | ||||
| -rw-r--r-- | src/libstrongswan/processing/processor.c | 346 | ||||
| -rw-r--r-- | src/libstrongswan/processing/processor.h | 24 | ||||
| -rw-r--r-- | src/libstrongswan/processing/scheduler.c | 16 | ||||
| -rw-r--r-- | src/libstrongswan/processing/watcher.c | 462 | ||||
| -rw-r--r-- | src/libstrongswan/processing/watcher.h | 101 |
8 files changed, 994 insertions, 316 deletions
diff --git a/src/libstrongswan/processing/jobs/callback_job.c b/src/libstrongswan/processing/jobs/callback_job.c index 13f22e69c..8258ccb33 100644 --- a/src/libstrongswan/processing/jobs/callback_job.c +++ b/src/libstrongswan/processing/jobs/callback_job.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 2009 Tobias Brunner + * Copyright (C) 2009-2012 Tobias Brunner * Copyright (C) 2007-2011 Martin Willi * Copyright (C) 2011 revosec AG * Hochschule fuer Technik Rapperswil @@ -17,12 +17,11 @@ #include "callback_job.h" -#include <semaphore.h> - #include <threading/thread.h> #include <threading/condvar.h> +#include <threading/semaphore.h> #include <threading/mutex.h> -#include <utils/linked_list.h> +#include <collections/linked_list.h> typedef struct private_callback_job_t private_callback_job_t; @@ -52,42 +51,9 @@ struct private_callback_job_t { callback_job_cleanup_t cleanup; /** - * thread of the job, if running - */ - thread_t *thread; - - /** - * mutex to access jobs interna - */ - mutex_t *mutex; - - /** - * list of associated child jobs - */ - linked_list_t *children; - - /** - * parent of this job, or NULL + * cancel function */ - private_callback_job_t *parent; - - /** - * TRUE if the job got cancelled - */ - bool cancelled; - - /** - * condvar to synchronize the cancellation/destruction of the job - */ - condvar_t *destroyable; - - /** - * semaphore to synchronize the termination of the assigned thread. - * - * separately allocated during cancellation, so that we can wait on it - * without risking that it gets freed too early during destruction. - */ - sem_t *terminated; + callback_job_cancel_t cancel; /** * Priority of this job @@ -95,141 +61,26 @@ struct private_callback_job_t { job_priority_t prio; }; -/** - * unregister a child from its parent, if any. - * note: this->mutex has to be locked - */ -static void unregister(private_callback_job_t *this) -{ - if (this->parent) - { - this->parent->mutex->lock(this->parent->mutex); - if (this->parent->cancelled && !this->cancelled) - { - /* if the parent has been cancelled but we have not yet, we do not - * unregister until we got cancelled by the parent. */ - this->parent->mutex->unlock(this->parent->mutex); - this->destroyable->wait(this->destroyable, this->mutex); - this->parent->mutex->lock(this->parent->mutex); - } - this->parent->children->remove(this->parent->children, this, NULL); - this->parent->mutex->unlock(this->parent->mutex); - this->parent = NULL; - } -} - METHOD(job_t, destroy, void, private_callback_job_t *this) { - this->mutex->lock(this->mutex); - unregister(this); if (this->cleanup) { this->cleanup(this->data); } - if (this->terminated) - { - sem_post(this->terminated); - } - this->children->destroy(this->children); - this->destroyable->destroy(this->destroyable); - this->mutex->unlock(this->mutex); - this->mutex->destroy(this->mutex); free(this); } -METHOD(callback_job_t, cancel, void, +METHOD(job_t, execute, job_requeue_t, private_callback_job_t *this) { - callback_job_t *child; - sem_t *terminated = NULL; - - this->mutex->lock(this->mutex); - this->cancelled = TRUE; - /* terminate children */ - while (this->children->get_first(this->children, (void**)&child) == SUCCESS) - { - this->mutex->unlock(this->mutex); - child->cancel(child); - this->mutex->lock(this->mutex); - } - if (this->thread) - { - /* terminate the thread, if there is currently one executing the job. - * we wait for its termination using a semaphore */ - this->thread->cancel(this->thread); - terminated = this->terminated = malloc_thing(sem_t); - sem_init(terminated, 0, 0); - } - else - { - /* if the job is currently queued, it gets terminated later. - * we can't wait, because it might not get executed at all. - * we also unregister the queued job manually from its parent (the - * others get unregistered during destruction) */ - unregister(this); - } - this->destroyable->signal(this->destroyable); - this->mutex->unlock(this->mutex); - - if (terminated) - { - sem_wait(terminated); - sem_destroy(terminated); - free(terminated); - } + return this->callback(this->data); } -METHOD(job_t, execute, void, +METHOD(job_t, cancel, bool, private_callback_job_t *this) { - bool cleanup = FALSE, requeue = FALSE; - - thread_cleanup_push((thread_cleanup_t)destroy, this); - - this->mutex->lock(this->mutex); - this->thread = thread_current(); - this->mutex->unlock(this->mutex); - - while (TRUE) - { - this->mutex->lock(this->mutex); - if (this->cancelled) - { - this->mutex->unlock(this->mutex); - cleanup = TRUE; - break; - } - this->mutex->unlock(this->mutex); - switch (this->callback(this->data)) - { - case JOB_REQUEUE_DIRECT: - continue; - case JOB_REQUEUE_FAIR: - { - requeue = TRUE; - break; - } - case JOB_REQUEUE_NONE: - default: - { - cleanup = TRUE; - break; - } - } - break; - } - this->mutex->lock(this->mutex); - this->thread = NULL; - this->mutex->unlock(this->mutex); - /* manually create a cancellation point to avoid that a cancelled thread - * goes back into the thread pool */ - thread_cancellation_point(); - if (requeue) - { - lib->processor->queue_job(lib->processor, &this->public.job); - } - thread_cleanup_pop(cleanup); + return this->cancel(this->data); } METHOD(job_t, get_priority, job_priority_t, @@ -242,8 +93,8 @@ METHOD(job_t, get_priority, job_priority_t, * Described in header. */ callback_job_t *callback_job_create_with_prio(callback_job_cb_t cb, void *data, - callback_job_cleanup_t cleanup, callback_job_t *parent, - job_priority_t prio) + callback_job_cleanup_t cleanup, callback_job_cancel_t cancel, + job_priority_t prio) { private_callback_job_t *this; @@ -254,24 +105,17 @@ callback_job_t *callback_job_create_with_prio(callback_job_cb_t cb, void *data, .get_priority = _get_priority, .destroy = _destroy, }, - .cancel = _cancel, }, - .mutex = mutex_create(MUTEX_TYPE_DEFAULT), .callback = cb, .data = data, .cleanup = cleanup, - .children = linked_list_create(), - .parent = (private_callback_job_t*)parent, - .destroyable = condvar_create(CONDVAR_TYPE_DEFAULT), + .cancel = cancel, .prio = prio, ); - /* register us at parent */ - if (parent) + if (cancel) { - this->parent->mutex->lock(this->parent->mutex); - this->parent->children->insert_last(this->parent->children, this); - this->parent->mutex->unlock(this->parent->mutex); + this->public.job.cancel = _cancel; } return &this->public; @@ -282,8 +126,8 @@ callback_job_t *callback_job_create_with_prio(callback_job_cb_t cb, void *data, */ callback_job_t *callback_job_create(callback_job_cb_t cb, void *data, callback_job_cleanup_t cleanup, - callback_job_t *parent) + callback_job_cancel_t cancel) { - return callback_job_create_with_prio(cb, data, cleanup, parent, + return callback_job_create_with_prio(cb, data, cleanup, cancel, JOB_PRIO_MEDIUM); } diff --git a/src/libstrongswan/processing/jobs/callback_job.h b/src/libstrongswan/processing/jobs/callback_job.h index 3e92b01c0..6f2e39eb8 100644 --- a/src/libstrongswan/processing/jobs/callback_job.h +++ b/src/libstrongswan/processing/jobs/callback_job.h @@ -1,4 +1,5 @@ /* + * Copyright (C) 2012 Tobias Brunner * Copyright (C) 2007-2011 Martin Willi * Copyright (C) 2011 revosec AG * Hochschule fuer Technik Rapperswil @@ -27,33 +28,6 @@ typedef struct callback_job_t callback_job_t; #include <library.h> #include <processing/jobs/job.h> - -typedef enum job_requeue_t job_requeue_t; - -/** - * Job requeueing policy. - * - * The job requeueing policy defines how a job is handled when the callback - * function returns. - */ -enum job_requeue_t { - - /** - * Do not requeue job, destroy it - */ - JOB_REQUEUE_NONE, - - /** - * Reque the job fairly, meaning it has to requeue as any other job - */ - JOB_REQUEUE_FAIR, - - /** - * Reexecute the job directly, without the need of requeueing it - */ - JOB_REQUEUE_DIRECT, -}; - /** * The callback function to use for the callback job. * @@ -73,11 +47,22 @@ typedef job_requeue_t (*callback_job_cb_t)(void *data); * to supply to the constructor. * * @param data param supplied to job - * @return requeing policy how to requeue the job */ typedef void (*callback_job_cleanup_t)(void *data); /** + * Cancellation function to use for the callback job. + * + * Optional function to be called when a job has to be canceled. + * + * See job_t.cancel() for details on the return value. + * + * @param data param supplied to job + * @return TRUE if canceled, FALSE to explicitly cancel the thread + */ +typedef bool (*callback_job_cancel_t)(void *data); + +/** * Class representing an callback Job. * * This is a special job which allows a simple callback function to @@ -91,14 +76,6 @@ struct callback_job_t { */ job_t job; - /** - * Cancel the job's thread and wait for its termination. - * - * This only works reliably for jobs that always use JOB_REQUEUE_FAIR or - * JOB_REQUEUE_DIRECT, otherwise the job may already be destroyed when - * cancel is called. - */ - void (*cancel)(callback_job_t *this); }; /** @@ -106,19 +83,20 @@ struct callback_job_t { * * The cleanup function is called when the job gets destroyed to destroy * the associated data. - * If parent is not NULL, the specified job gets an association. Whenever - * the parent gets cancelled (or runs out), all of its children are cancelled, - * too. + * + * The cancel function is optional and should only be provided if the callback + * function calls potentially blocking functions and/or always returns + * JOB_REQUEUE_DIRECT. * * @param cb callback to call from the processor * @param data user data to supply to callback * @param cleanup destructor for data on destruction, or NULL - * @param parent parent of this job + * @param cancel function to cancel the job, or NULL * @return callback_job_t object */ callback_job_t *callback_job_create(callback_job_cb_t cb, void *data, callback_job_cleanup_t cleanup, - callback_job_t *parent); + callback_job_cancel_t cancel); /** * Creates a callback job, with priority. @@ -128,12 +106,12 @@ callback_job_t *callback_job_create(callback_job_cb_t cb, void *data, * @param cb callback to call from the processor * @param data user data to supply to callback * @param cleanup destructor for data on destruction, or NULL - * @param parent parent of this job + * @param cancel function to cancel the job, or NULL * @param prio job priority * @return callback_job_t object */ callback_job_t *callback_job_create_with_prio(callback_job_cb_t cb, void *data, - callback_job_cleanup_t cleanup, callback_job_t *parent, - job_priority_t prio); + callback_job_cleanup_t cleanup, callback_job_cancel_t cancel, + job_priority_t prio); #endif /** CALLBACK_JOB_H_ @}*/ diff --git a/src/libstrongswan/processing/jobs/job.h b/src/libstrongswan/processing/jobs/job.h index d25cee03e..64454718a 100644 --- a/src/libstrongswan/processing/jobs/job.h +++ b/src/libstrongswan/processing/jobs/job.h @@ -1,4 +1,5 @@ /* + * Copyright (C) 2012 Tobias Brunner * Copyright (C) 2005-2006 Martin Willi * Copyright (C) 2005 Jan Hutter * Hochschule fuer Technik Rapperswil @@ -24,6 +25,9 @@ typedef struct job_t job_t; typedef enum job_priority_t job_priority_t; +typedef enum job_status_t job_status_t; +typedef enum job_requeue_type_t job_requeue_type_t; +typedef struct job_requeue_t job_requeue_t; #include <library.h> @@ -48,18 +52,107 @@ enum job_priority_t { extern enum_name_t *job_priority_names; /** + * Job status + */ +enum job_status_t { + /** The job is queued and has not yet been executed */ + JOB_STATUS_QUEUED = 0, + /** During execution */ + JOB_STATUS_EXECUTING, + /** If the job got canceled */ + JOB_STATUS_CANCELED, + /** The job was executed successfully */ + JOB_STATUS_DONE, +}; + +/** + * How a job is handled after is has been executed. + */ +enum job_requeue_type_t { + /** Do not requeue job, destroy it */ + JOB_REQUEUE_TYPE_NONE = 0, + /** Requeue the job fairly, i.e. it is inserted at the end of the queue */ + JOB_REQUEUE_TYPE_FAIR, + /** Reexecute the job directly, without the need of requeueing it */ + JOB_REQUEUE_TYPE_DIRECT, + /** Rescheduled the job via scheduler_t */ + JOB_REQUEUE_TYPE_SCHEDULE, +}; + +/** + * Job requeueing policy. + * + * The job requeueing policy defines how a job is handled after it has been + * executed. + */ +struct job_requeue_t { + /** How to handle the job after executing it */ + job_requeue_type_t type; + /** How to reschedule the job, if so */ + enum { + JOB_SCHEDULE, + JOB_SCHEDULE_MS, + JOB_SCHEDULE_TV, + } schedule; + /** Time to reschedule the job */ + union { + u_int32_t rel; + timeval_t abs; + } time; +}; + +/** + * Helper macros to easily define requeueing policies. + */ +#define __JOB_REQUEUE(t) (job_requeue_t){ .type = t } +#define JOB_REQUEUE_NONE __JOB_REQUEUE(JOB_REQUEUE_TYPE_NONE) +#define JOB_REQUEUE_FAIR __JOB_REQUEUE(JOB_REQUEUE_TYPE_FAIR) +#define JOB_REQUEUE_DIRECT __JOB_REQUEUE(JOB_REQUEUE_TYPE_DIRECT) +#define __JOB_RESCHEDULE(t, ...) (job_requeue_t){ .type = JOB_REQUEUE_TYPE_SCHEDULE, .schedule = t, { __VA_ARGS__ } } +#define JOB_RESCHEDULE(s) __JOB_RESCHEDULE(JOB_SCHEDULE, .rel = s) +#define JOB_RESCHEDULE_MS(ms) __JOB_RESCHEDULE(JOB_SCHEDULE_MS, .rel = ms) +#define JOB_RESCHEDULE_TV(tv) __JOB_RESCHEDULE(JOB_SCHEDULE_TV, .abs = tv) + +/** * Job interface as it is stored in the job queue. */ struct job_t { /** + * Status of this job, is modified exclusively by the processor/scheduler + */ + job_status_t status; + + /** * Execute a job. * * The processing facility executes a job using this method. Jobs are - * one-shot, they destroy themself after execution, so don't use a job - * once it has been executed. + * one-shot, they are destroyed after execution (depending on the return + * value here), so don't use a job once it has been queued. + * + * @return policy how to requeue the job + */ + job_requeue_t (*execute) (job_t *this); + + /** + * Cancel a job. + * + * Implementing this method is optional. It allows potentially blocking + * jobs to be canceled during shutdown. + * + * If no special action is to be taken simply return FALSE then the thread + * executing the job will be canceled. If TRUE is returned the job is + * expected to return from execute() itself (i.e. the thread won't be + * canceled explicitly and can still be joined later). + * Jobs that return FALSE have to make sure they provide the appropriate + * cancellation points. + * + * @note Regular jobs that do not block MUST NOT implement this method. + * @note This method could be called even before execute() has been called. + * + * @return FALSE to cancel the thread, TRUE if canceled otherwise */ - void (*execute) (job_t *this); + bool (*cancel)(job_t *this); /** * Get the priority of a job. @@ -71,10 +164,12 @@ struct job_t { /** * Destroy a job. * - * Is only called whenever a job was not executed (e.g. due daemon shutdown). - * After execution, jobs destroy themself. + * Is called after a job is executed or got canceled. It is also called + * for queued jobs that were never executed. + * + * Use the status of a job to decide what to do during destruction. */ - void (*destroy) (job_t *this); + void (*destroy)(job_t *this); }; #endif /** JOB_H_ @}*/ diff --git a/src/libstrongswan/processing/processor.c b/src/libstrongswan/processing/processor.c index 222f1a535..adbd95685 100644 --- a/src/libstrongswan/processing/processor.c +++ b/src/libstrongswan/processing/processor.c @@ -1,7 +1,7 @@ /* * Copyright (C) 2005-2011 Martin Willi * Copyright (C) 2011 revosec AG - * Copyright (C) 2008-2011 Tobias Brunner + * Copyright (C) 2008-2013 Tobias Brunner * Copyright (C) 2005 Jan Hutter * Hochschule fuer Technik Rapperswil * @@ -22,12 +22,12 @@ #include "processor.h" -#include <debug.h> +#include <utils/debug.h> #include <threading/thread.h> #include <threading/condvar.h> #include <threading/mutex.h> #include <threading/thread_value.h> -#include <utils/linked_list.h> +#include <collections/linked_list.h> typedef struct private_processor_t private_processor_t; @@ -58,7 +58,7 @@ struct private_processor_t { /** * All threads managed in the pool (including threads that have been - * cancelled, this allows to join them during destruction) + * canceled, this allows to join them later), as worker_thread_t */ linked_list_t *threads; @@ -73,11 +73,6 @@ struct private_processor_t { int prio_threads[JOB_PRIO_MAX]; /** - * Priority of the job executed by a thread - */ - thread_value_t *priority; - - /** * access to job lists is locked through this mutex */ mutex_t *mutex; @@ -93,39 +88,79 @@ struct private_processor_t { condvar_t *thread_terminated; }; -static void process_jobs(private_processor_t *this); +/** + * Worker thread + */ +typedef struct { + + /** + * Reference to the processor + */ + private_processor_t *processor; + + /** + * The actual thread + */ + thread_t *thread; + + /** + * Job currently being executed by this worker thread + */ + job_t *job; + + /** + * Priority of the current job + */ + job_priority_t priority; + +} worker_thread_t; + +static void process_jobs(worker_thread_t *worker); /** * restart a terminated thread */ -static void restart(private_processor_t *this) +static void restart(worker_thread_t *worker) { - thread_t *thread; + private_processor_t *this = worker->processor; + job_t *job; DBG2(DBG_JOB, "terminated worker thread %.2u", thread_current_id()); - /* respawn thread if required */ this->mutex->lock(this->mutex); - if (this->desired_threads < this->total_threads || - (thread = thread_create((thread_main_t)process_jobs, this)) == NULL) - { - this->total_threads--; - this->thread_terminated->signal(this->thread_terminated); - } - else - { - this->threads->insert_last(this->threads, thread); - } + /* cleanup worker thread */ + this->working_threads[worker->priority]--; + worker->job->status = JOB_STATUS_CANCELED; + job = worker->job; + /* unset the job before releasing the mutex, otherwise cancel() might + * interfere */ + worker->job = NULL; + /* release mutex to avoid deadlocks if the same lock is required + * during queue_job() and in the destructor called here */ this->mutex->unlock(this->mutex); -} - -/** - * Decrement working thread count of a priority class - */ -static void decrement_working_threads(private_processor_t *this) -{ + job->destroy(job); this->mutex->lock(this->mutex); - this->working_threads[(intptr_t)this->priority->get(this->priority)]--; + + /* respawn thread if required */ + if (this->desired_threads >= this->total_threads) + { + worker_thread_t *new_worker; + + INIT(new_worker, + .processor = this, + ); + new_worker->thread = thread_create((thread_main_t)process_jobs, + new_worker); + if (new_worker->thread) + { + this->threads->insert_last(this->threads, new_worker); + this->mutex->unlock(this->mutex); + return; + } + free(new_worker); + } + this->total_threads--; + this->thread_terminated->signal(this->thread_terminated); this->mutex->unlock(this->mutex); } @@ -145,11 +180,135 @@ static u_int get_idle_threads_nolock(private_processor_t *this) } /** + * Get a job from any job queue, starting with the highest priority. + * + * this->mutex is expected to be locked. + */ +static bool get_job(private_processor_t *this, worker_thread_t *worker) +{ + int i, reserved = 0, idle; + + idle = get_idle_threads_nolock(this); + + for (i = 0; i < JOB_PRIO_MAX; i++) + { + if (reserved && reserved >= idle) + { + DBG2(DBG_JOB, "delaying %N priority jobs: %d threads idle, " + "but %d reserved for higher priorities", + job_priority_names, i, idle, reserved); + /* wait until a job of higher priority gets queued */ + return FALSE; + } + if (this->working_threads[i] < this->prio_threads[i]) + { + reserved += this->prio_threads[i] - this->working_threads[i]; + } + if (this->jobs[i]->remove_first(this->jobs[i], + (void**)&worker->job) == SUCCESS) + { + worker->priority = i; + return TRUE; + } + } + return FALSE; +} + +/** + * Process a single job (provided in worker->job, worker->priority is also + * expected to be set) + * + * this->mutex is expected to be locked. + */ +static void process_job(private_processor_t *this, worker_thread_t *worker) +{ + job_t *to_destroy = NULL; + job_requeue_t requeue; + + this->working_threads[worker->priority]++; + worker->job->status = JOB_STATUS_EXECUTING; + this->mutex->unlock(this->mutex); + /* canceled threads are restarted to get a constant pool */ + thread_cleanup_push((thread_cleanup_t)restart, worker); + while (TRUE) + { + requeue = worker->job->execute(worker->job); + if (requeue.type != JOB_REQUEUE_TYPE_DIRECT) + { + break; + } + else if (!worker->job->cancel) + { /* only allow cancelable jobs to requeue directly */ + requeue.type = JOB_REQUEUE_TYPE_FAIR; + break; + } + } + thread_cleanup_pop(FALSE); + this->mutex->lock(this->mutex); + this->working_threads[worker->priority]--; + if (worker->job->status == JOB_STATUS_CANCELED) + { /* job was canceled via a custom cancel() method or did not + * use JOB_REQUEUE_TYPE_DIRECT */ + to_destroy = worker->job; + } + else + { + switch (requeue.type) + { + case JOB_REQUEUE_TYPE_NONE: + worker->job->status = JOB_STATUS_DONE; + to_destroy = worker->job; + break; + case JOB_REQUEUE_TYPE_FAIR: + worker->job->status = JOB_STATUS_QUEUED; + this->jobs[worker->priority]->insert_last( + this->jobs[worker->priority], worker->job); + this->job_added->signal(this->job_added); + break; + case JOB_REQUEUE_TYPE_SCHEDULE: + /* scheduler_t does not hold its lock when queuing jobs + * so this should be safe without unlocking our mutex */ + switch (requeue.schedule) + { + case JOB_SCHEDULE: + lib->scheduler->schedule_job(lib->scheduler, + worker->job, requeue.time.rel); + break; + case JOB_SCHEDULE_MS: + lib->scheduler->schedule_job_ms(lib->scheduler, + worker->job, requeue.time.rel); + break; + case JOB_SCHEDULE_TV: + lib->scheduler->schedule_job_tv(lib->scheduler, + worker->job, requeue.time.abs); + break; + } + break; + default: + break; + } + } + /* unset the current job to avoid interference with cancel() when + * destroying the job below */ + worker->job = NULL; + + if (to_destroy) + { /* release mutex to avoid deadlocks if the same lock is required + * during queue_job() and in the destructor called here */ + this->mutex->unlock(this->mutex); + to_destroy->destroy(to_destroy); + this->mutex->lock(this->mutex); + } +} + +/** * Process queued jobs, called by the worker threads */ -static void process_jobs(private_processor_t *this) +static void process_jobs(worker_thread_t *worker) { - /* worker threads are not cancellable by default */ + private_processor_t *this = worker->processor; + + /* worker threads are not cancelable by default */ thread_cancelability(FALSE); DBG2(DBG_JOB, "started worker thread %.2u", thread_current_id()); @@ -157,43 +316,11 @@ static void process_jobs(private_processor_t *this) this->mutex->lock(this->mutex); while (this->desired_threads >= this->total_threads) { - job_t *job = NULL; - int i, reserved = 0, idle; - - idle = get_idle_threads_nolock(this); - - for (i = 0; i < JOB_PRIO_MAX; i++) + if (get_job(this, worker)) { - if (reserved && reserved >= idle) - { - DBG2(DBG_JOB, "delaying %N priority jobs: %d threads idle, " - "but %d reserved for higher priorities", - job_priority_names, i, idle, reserved); - break; - } - if (this->working_threads[i] < this->prio_threads[i]) - { - reserved += this->prio_threads[i] - this->working_threads[i]; - } - if (this->jobs[i]->remove_first(this->jobs[i], - (void**)&job) == SUCCESS) - { - this->working_threads[i]++; - this->mutex->unlock(this->mutex); - this->priority->set(this->priority, (void*)(intptr_t)i); - /* terminated threads are restarted to get a constant pool */ - thread_cleanup_push((thread_cleanup_t)restart, this); - thread_cleanup_push((thread_cleanup_t)decrement_working_threads, - this); - job->execute(job); - thread_cleanup_pop(FALSE); - thread_cleanup_pop(FALSE); - this->mutex->lock(this->mutex); - this->working_threads[i]--; - break; - } + process_job(this, worker); } - if (!job) + else { this->job_added->wait(this->job_added, this->mutex); } @@ -266,31 +393,65 @@ METHOD(processor_t, queue_job, void, job_priority_t prio; prio = sane_prio(job->get_priority(job)); + job->status = JOB_STATUS_QUEUED; + this->mutex->lock(this->mutex); this->jobs[prio]->insert_last(this->jobs[prio], job); this->job_added->signal(this->job_added); this->mutex->unlock(this->mutex); } +METHOD(processor_t, execute_job, void, + private_processor_t *this, job_t *job) +{ + job_priority_t prio; + bool queued = FALSE; + + this->mutex->lock(this->mutex); + if (this->desired_threads && get_idle_threads_nolock(this)) + { + prio = sane_prio(job->get_priority(job)); + job->status = JOB_STATUS_QUEUED; + /* insert job in front to execute it immediately */ + this->jobs[prio]->insert_first(this->jobs[prio], job); + queued = TRUE; + } + this->job_added->signal(this->job_added); + this->mutex->unlock(this->mutex); + + if (!queued) + { + job->execute(job); + job->destroy(job); + } +} + METHOD(processor_t, set_threads, void, private_processor_t *this, u_int count) { this->mutex->lock(this->mutex); if (count > this->total_threads) { /* increase thread count */ + worker_thread_t *worker; int i; - thread_t *current; this->desired_threads = count; DBG1(DBG_JOB, "spawning %d worker threads", count - this->total_threads); for (i = this->total_threads; i < count; i++) { - current = thread_create((thread_main_t)process_jobs, this); - if (current) + INIT(worker, + .processor = this, + ); + worker->thread = thread_create((thread_main_t)process_jobs, worker); + if (worker->thread) { - this->threads->insert_last(this->threads, current); + this->threads->insert_last(this->threads, worker); this->total_threads++; } + else + { + free(worker); + } } } else if (count < this->total_threads) @@ -301,26 +462,49 @@ METHOD(processor_t, set_threads, void, this->mutex->unlock(this->mutex); } -METHOD(processor_t, destroy, void, +METHOD(processor_t, cancel, void, private_processor_t *this) { - thread_t *current; - int i; + enumerator_t *enumerator; + worker_thread_t *worker; - set_threads(this, 0); this->mutex->lock(this->mutex); + this->desired_threads = 0; + /* cancel potentially blocking jobs */ + enumerator = this->threads->create_enumerator(this->threads); + while (enumerator->enumerate(enumerator, (void**)&worker)) + { + if (worker->job && worker->job->cancel) + { + worker->job->status = JOB_STATUS_CANCELED; + if (!worker->job->cancel(worker->job)) + { /* job requests to be canceled explicitly, otherwise we assume + * the thread terminates itself and can be joined */ + worker->thread->cancel(worker->thread); + } + } + } + enumerator->destroy(enumerator); while (this->total_threads > 0) { this->job_added->broadcast(this->job_added); this->thread_terminated->wait(this->thread_terminated, this->mutex); } while (this->threads->remove_first(this->threads, - (void**)¤t) == SUCCESS) + (void**)&worker) == SUCCESS) { - current->join(current); + worker->thread->join(worker->thread); + free(worker); } this->mutex->unlock(this->mutex); - this->priority->destroy(this->priority); +} + +METHOD(processor_t, destroy, void, + private_processor_t *this) +{ + int i; + + cancel(this); this->thread_terminated->destroy(this->thread_terminated); this->job_added->destroy(this->job_added); this->mutex->destroy(this->mutex); @@ -347,11 +531,12 @@ processor_t *processor_create() .get_working_threads = _get_working_threads, .get_job_load = _get_job_load, .queue_job = _queue_job, + .execute_job = _execute_job, .set_threads = _set_threads, + .cancel = _cancel, .destroy = _destroy, }, .threads = linked_list_create(), - .priority = thread_value_create(NULL), .mutex = mutex_create(MUTEX_TYPE_DEFAULT), .job_added = condvar_create(CONDVAR_TYPE_DEFAULT), .thread_terminated = condvar_create(CONDVAR_TYPE_DEFAULT), @@ -366,4 +551,3 @@ processor_t *processor_create() return &this->public; } - diff --git a/src/libstrongswan/processing/processor.h b/src/libstrongswan/processing/processor.h index 5db42c04c..f96530e54 100644 --- a/src/libstrongswan/processing/processor.h +++ b/src/libstrongswan/processing/processor.h @@ -1,4 +1,5 @@ /* + * Copyright (C) 2012 Tobias Brunner * Copyright (C) 2005-2007 Martin Willi * Copyright (C) 2005 Jan Hutter * Hochschule fuer Technik Rapperswil @@ -51,7 +52,7 @@ struct processor_t { /** * Get the number of threads currently working, per priority class. * - * @param prioritiy to check + * @param priority to check * @return number of threads in priority working */ u_int (*get_working_threads)(processor_t *this, job_priority_t prio); @@ -74,18 +75,35 @@ struct processor_t { void (*queue_job) (processor_t *this, job_t *job); /** + * Directly execute a job with an idle worker thread. + * + * If no idle thread is available, the job gets executed by the calling + * thread. + * + * @param job job, gets destroyed + */ + void (*execute_job)(processor_t *this, job_t *job); + + /** * Set the number of threads to use in the processor. * * If the number of threads is smaller than number of currently running * threads, thread count is decreased. Use 0 to disable the processor. - * This call blocks if it decreases thread count until threads have - * terminated, so make sure there are not too many blocking jobs. + * + * This call does not block and wait for threads to terminate if the number + * of threads is reduced. Instead use cancel() for that during shutdown. * * @param count number of threads to allocate */ void (*set_threads)(processor_t *this, u_int count); /** + * Sets the number of threads to 0 and cancels all blocking jobs, then waits + * for all threads to be terminated. + */ + void (*cancel)(processor_t *this); + + /** * Destroy a processor object. */ void (*destroy) (processor_t *processor); diff --git a/src/libstrongswan/processing/scheduler.c b/src/libstrongswan/processing/scheduler.c index f3cc1164a..3f1598fc4 100644 --- a/src/libstrongswan/processing/scheduler.c +++ b/src/libstrongswan/processing/scheduler.c @@ -19,7 +19,7 @@ #include "scheduler.h" -#include <debug.h> +#include <utils/debug.h> #include <processing/processor.h> #include <processing/jobs/callback_job.h> #include <threading/thread.h> @@ -68,11 +68,6 @@ struct private_scheduler_t { scheduler_t public; /** - * Job which queues scheduled jobs to the processor. - */ - callback_job_t *job; - - /** * The heap in which the events are stored. */ event_t **heap; @@ -250,6 +245,7 @@ METHOD(scheduler_t, schedule_job_tv, void, event = malloc_thing(event_t); event->job = job; + event->job->status = JOB_STATUS_QUEUED; event->time = tv; this->mutex->lock(this->mutex); @@ -308,7 +304,6 @@ METHOD(scheduler_t, destroy, void, private_scheduler_t *this) { event_t *event; - this->job->cancel(this->job); this->condvar->destroy(this->condvar); this->mutex->destroy(this->mutex); while ((event = remove_event(this)) != NULL) @@ -325,6 +320,7 @@ METHOD(scheduler_t, destroy, void, scheduler_t * scheduler_create() { private_scheduler_t *this; + callback_job_t *job; INIT(this, .public = { @@ -341,9 +337,9 @@ scheduler_t * scheduler_create() this->heap = (event_t**)calloc(this->heap_size + 1, sizeof(event_t*)); - this->job = callback_job_create_with_prio((callback_job_cb_t)schedule, - this, NULL, NULL, JOB_PRIO_CRITICAL); - lib->processor->queue_job(lib->processor, (job_t*)this->job); + job = callback_job_create_with_prio((callback_job_cb_t)schedule, this, + NULL, return_false, JOB_PRIO_CRITICAL); + lib->processor->queue_job(lib->processor, (job_t*)job); return &this->public; } diff --git a/src/libstrongswan/processing/watcher.c b/src/libstrongswan/processing/watcher.c new file mode 100644 index 000000000..3009be608 --- /dev/null +++ b/src/libstrongswan/processing/watcher.c @@ -0,0 +1,462 @@ +/* + * Copyright (C) 2013 Martin Willi + * Copyright (C) 2013 revosec AG + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +#include "watcher.h" + +#include <library.h> +#include <threading/thread.h> +#include <threading/mutex.h> +#include <threading/condvar.h> +#include <collections/linked_list.h> +#include <processing/jobs/callback_job.h> + +#include <unistd.h> +#include <errno.h> +#include <sys/select.h> +#include <fcntl.h> + +typedef struct private_watcher_t private_watcher_t; + +/** + * Private data of an watcher_t object. + */ +struct private_watcher_t { + + /** + * Public watcher_t interface. + */ + watcher_t public; + + /** + * List of registered FDs, as entry_t + */ + linked_list_t *fds; + + /** + * Lock to access FD list + */ + mutex_t *mutex; + + /** + * Condvar to signal completion of callback + */ + condvar_t *condvar; + + /** + * Notification pipe to signal watcher thread + */ + int notify[2]; + + /** + * List of callback jobs to process by watcher thread, as job_t + */ + linked_list_t *jobs; +}; + +/** + * Entry for a registered file descriptor + */ +typedef struct { + /** file descriptor */ + int fd; + /** events to watch */ + watcher_event_t events; + /** registered callback function */ + watcher_cb_t cb; + /** user data to pass to callback */ + void *data; + /** callback(s) currently active? */ + int in_callback; +} entry_t; + +/** + * Data we pass on for an async notification + */ +typedef struct { + /** file descriptor */ + int fd; + /** event type */ + watcher_event_t event; + /** registered callback function */ + watcher_cb_t cb; + /** user data to pass to callback */ + void *data; + /** keep registered? */ + bool keep; + /** reference to watcher */ + private_watcher_t *this; +} notify_data_t; + +/** + * Notify watcher thread about changes + */ +static void update(private_watcher_t *this) +{ + char buf[1] = { 'u' }; + + if (this->notify[1] != -1) + { + ignore_result(write(this->notify[1], buf, sizeof(buf))); + } +} + +/** + * Cleanup function if callback gets cancelled + */ +static void unregister(notify_data_t *data) +{ + /* if a thread processing a callback gets cancelled, we mark the entry + * as cancelled, like the callback would return FALSE. This is required + * to not queue this watcher again if all threads have been gone. */ + data->keep = FALSE; +} + + /** + * Execute callback of registered FD, asynchronous + */ +static job_requeue_t notify_async(notify_data_t *data) +{ + thread_cleanup_push((void*)unregister, data); + data->keep = data->cb(data->data, data->fd, data->event); + thread_cleanup_pop(FALSE); + return JOB_REQUEUE_NONE; +} + +/** + * Clean up notification data, reactivate FD + */ +static void notify_end(notify_data_t *data) +{ + private_watcher_t *this = data->this; + enumerator_t *enumerator; + entry_t *entry; + + /* reactivate the disabled entry */ + this->mutex->lock(this->mutex); + enumerator = this->fds->create_enumerator(this->fds); + while (enumerator->enumerate(enumerator, &entry)) + { + if (entry->fd == data->fd) + { + if (!data->keep) + { + entry->events &= ~data->event; + if (!entry->events) + { + this->fds->remove_at(this->fds, enumerator); + free(entry); + break; + } + } + entry->in_callback--; + break; + } + } + enumerator->destroy(enumerator); + + update(this); + this->condvar->broadcast(this->condvar); + this->mutex->unlock(this->mutex); + + free(data); +} + +/** + * Execute the callback for a registered FD + */ +static void notify(private_watcher_t *this, entry_t *entry, + watcher_event_t event) +{ + notify_data_t *data; + + /* get a copy of entry for async job, but with specific event */ + INIT(data, + .fd = entry->fd, + .event = event, + .cb = entry->cb, + .data = entry->data, + .keep = TRUE, + .this = this, + ); + + /* deactivate entry, so we can select() other FDs even if the async + * processing did not handle the event yet */ + entry->in_callback++; + + this->jobs->insert_last(this->jobs, + callback_job_create_with_prio((void*)notify_async, data, + (void*)notify_end, (callback_job_cancel_t)return_false, + JOB_PRIO_CRITICAL)); +} + +/** + * Thread cancellation function for watcher thread + */ +static void activate_all(private_watcher_t *this) +{ + enumerator_t *enumerator; + entry_t *entry; + + /* When the watcher thread gets cancelled, we have to reactivate any entry + * and signal threads in remove() to go on. */ + + this->mutex->lock(this->mutex); + enumerator = this->fds->create_enumerator(this->fds); + while (enumerator->enumerate(enumerator, &entry)) + { + entry->in_callback = 0; + } + enumerator->destroy(enumerator); + this->condvar->broadcast(this->condvar); + this->mutex->unlock(this->mutex); +} + +/** + * Dispatching function + */ +static job_requeue_t watch(private_watcher_t *this) +{ + enumerator_t *enumerator; + entry_t *entry; + fd_set rd, wr, ex; + int maxfd = 0, res; + + FD_ZERO(&rd); + FD_ZERO(&wr); + FD_ZERO(&ex); + + this->mutex->lock(this->mutex); + if (this->fds->get_count(this->fds) == 0) + { + this->mutex->unlock(this->mutex); + return JOB_REQUEUE_NONE; + } + + if (this->notify[0] != -1) + { + FD_SET(this->notify[0], &rd); + maxfd = this->notify[0]; + } + + enumerator = this->fds->create_enumerator(this->fds); + while (enumerator->enumerate(enumerator, &entry)) + { + if (!entry->in_callback) + { + if (entry->events & WATCHER_READ) + { + DBG3(DBG_JOB, " watching %d for reading", entry->fd); + FD_SET(entry->fd, &rd); + } + if (entry->events & WATCHER_WRITE) + { + DBG3(DBG_JOB, " watching %d for writing", entry->fd); + FD_SET(entry->fd, &wr); + } + if (entry->events & WATCHER_EXCEPT) + { + DBG3(DBG_JOB, " watching %d for exceptions", entry->fd); + FD_SET(entry->fd, &ex); + } + maxfd = max(maxfd, entry->fd); + } + } + enumerator->destroy(enumerator); + this->mutex->unlock(this->mutex); + + while (TRUE) + { + char buf[1]; + bool old; + job_t *job; + + DBG2(DBG_JOB, "watcher going to select()"); + thread_cleanup_push((void*)activate_all, this); + old = thread_cancelability(TRUE); + res = select(maxfd + 1, &rd, &wr, &ex, NULL); + thread_cancelability(old); + thread_cleanup_pop(FALSE); + if (res > 0) + { + if (this->notify[0] != -1 && FD_ISSET(this->notify[0], &rd)) + { + DBG2(DBG_JOB, "watcher got notification, rebuilding"); + while (read(this->notify[0], buf, sizeof(buf)) > 0); + return JOB_REQUEUE_DIRECT; + } + + this->mutex->lock(this->mutex); + enumerator = this->fds->create_enumerator(this->fds); + while (enumerator->enumerate(enumerator, &entry)) + { + if (FD_ISSET(entry->fd, &rd) && (entry->events & WATCHER_READ)) + { + DBG2(DBG_JOB, "watched FD %d ready to read", entry->fd); + notify(this, entry, WATCHER_READ); + } + if (FD_ISSET(entry->fd, &wr) && (entry->events & WATCHER_WRITE)) + { + DBG2(DBG_JOB, "watched FD %d ready to write", entry->fd); + notify(this, entry, WATCHER_WRITE); + } + if (FD_ISSET(entry->fd, &ex) && (entry->events & WATCHER_EXCEPT)) + { + DBG2(DBG_JOB, "watched FD %d has exception", entry->fd); + notify(this, entry, WATCHER_EXCEPT); + } + } + enumerator->destroy(enumerator); + this->mutex->unlock(this->mutex); + + if (this->jobs->get_count(this->jobs)) + { + while (this->jobs->remove_first(this->jobs, + (void**)&job) == SUCCESS) + { + lib->processor->execute_job(lib->processor, job); + } + /* we temporarily disable a notified FD, rebuild FDSET */ + return JOB_REQUEUE_DIRECT; + } + } + else + { + DBG1(DBG_JOB, "watcher select() error: %s", strerror(errno)); + } + } +} + +METHOD(watcher_t, add, void, + private_watcher_t *this, int fd, watcher_event_t events, + watcher_cb_t cb, void *data) +{ + entry_t *entry; + + INIT(entry, + .fd = fd, + .events = events, + .cb = cb, + .data = data, + ); + + this->mutex->lock(this->mutex); + this->fds->insert_last(this->fds, entry); + if (this->fds->get_count(this->fds) == 1) + { + lib->processor->queue_job(lib->processor, + (job_t*)callback_job_create_with_prio((void*)watch, this, + NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL)); + } + else + { + update(this); + } + this->mutex->unlock(this->mutex); +} + +METHOD(watcher_t, remove_, void, + private_watcher_t *this, int fd) +{ + enumerator_t *enumerator; + entry_t *entry; + + this->mutex->lock(this->mutex); + while (TRUE) + { + bool is_in_callback = FALSE; + + enumerator = this->fds->create_enumerator(this->fds); + while (enumerator->enumerate(enumerator, &entry)) + { + if (entry->fd == fd) + { + if (entry->in_callback) + { + is_in_callback = TRUE; + break; + } + this->fds->remove_at(this->fds, enumerator); + free(entry); + } + } + enumerator->destroy(enumerator); + if (!is_in_callback) + { + break; + } + this->condvar->wait(this->condvar, this->mutex); + } + + update(this); + this->mutex->unlock(this->mutex); +} + +METHOD(watcher_t, destroy, void, + private_watcher_t *this) +{ + this->mutex->destroy(this->mutex); + this->condvar->destroy(this->condvar); + this->fds->destroy(this->fds); + if (this->notify[0] != -1) + { + close(this->notify[0]); + } + if (this->notify[1] != -1) + { + close(this->notify[1]); + } + this->jobs->destroy(this->jobs); + free(this); +} + +/** + * See header + */ +watcher_t *watcher_create() +{ + private_watcher_t *this; + int flags; + + INIT(this, + .public = { + .add = _add, + .remove = _remove_, + .destroy = _destroy, + }, + .fds = linked_list_create(), + .mutex = mutex_create(MUTEX_TYPE_DEFAULT), + .condvar = condvar_create(CONDVAR_TYPE_DEFAULT), + .jobs = linked_list_create(), + .notify = {-1, -1}, + ); + + if (pipe(this->notify) == 0) + { + /* use non-blocking I/O on read-end of notify pipe */ + flags = fcntl(this->notify[0], F_GETFL); + if (flags == -1 || + fcntl(this->notify[0], F_SETFL, flags | O_NONBLOCK) == -1) + { + DBG1(DBG_LIB, "setting watcher notify pipe read-end non-blocking " + "failed: %s", strerror(errno)); + } + } + else + { + DBG1(DBG_LIB, "creating watcher notify pipe failed: %s", + strerror(errno)); + } + return &this->public; +} diff --git a/src/libstrongswan/processing/watcher.h b/src/libstrongswan/processing/watcher.h new file mode 100644 index 000000000..6e158cec2 --- /dev/null +++ b/src/libstrongswan/processing/watcher.h @@ -0,0 +1,101 @@ +/* + * Copyright (C) 2013 Martin Willi + * Copyright (C) 2013 revosec AG + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +/** + * @defgroup watcher watcher + * @{ @ingroup processor + */ + +#ifndef WATCHER_H_ +#define WATCHER_H_ + +typedef struct watcher_t watcher_t; +typedef enum watcher_event_t watcher_event_t; + +#include <library.h> + +/** + * Callback function to register for file descriptor events. + * + * The callback is executed asynchronously using a thread from the pool. + * Monitoring of fd is temporarily suspended to avoid additional events while + * it is processed asynchronously. To allow concurrent events, one can quickly + * process it (using a read/write) and return from the callback. This will + * re-enable the event, while the data read can be processed in another + * asynchronous job. + * + * On Linux, even if select() marks an FD as "ready", a subsequent read/write + * can block. It is therefore highly recommended to use non-blocking I/O + * and handle EAGAIN/EWOULDBLOCK gracefully. + * + * @param data user data passed during registration + * @param fd file descriptor the event occurred on + * @param event type of event + * @return TRUE to keep watching event, FALSE to unregister fd for event + */ +typedef bool (*watcher_cb_t)(void *data, int fd, watcher_event_t event); + +/** + * What events to watch for a file descriptor. + */ +enum watcher_event_t { + WATCHER_READ = (1<<0), + WATCHER_WRITE = (1<<1), + WATCHER_EXCEPT = (1<<2), +}; + +/** + * Watch multiple file descriptors using select(). + */ +struct watcher_t { + + /** + * Start watching a new file descriptor. + * + * Multiple callbacks can be registered for the same file descriptor, and + * all of them get notified. Such callbacks are executed concurrently. + * + * @param fd file descriptor to start watching + * @param events ORed set of events to watch + * @param cb callback function to invoke on events + * @param data data to pass to cb() + */ + void (*add)(watcher_t *this, int fd, watcher_event_t events, + watcher_cb_t cb, void *data); + + /** + * Stop watching a previously registered file descriptor. + * + * This call blocks until any active callback for this FD returns. All + * callbacks registered for that FD get unregistered. + * + * @param fd file descriptor to stop watching + */ + void (*remove)(watcher_t *this, int fd); + + /** + * Destroy a watcher_t. + */ + void (*destroy)(watcher_t *this); +}; + +/** + * Create a watcher instance. + * + * @return watcher + */ +watcher_t *watcher_create(); + +#endif /** WATCHER_H_ @}*/ |
