diff options
author | Rene Mayrhofer <rene@mayrhofer.eu.org> | 2010-11-28 12:11:49 +0000 |
---|---|---|
committer | Rene Mayrhofer <rene@mayrhofer.eu.org> | 2010-11-28 12:11:49 +0000 |
commit | 7b8b352039efd78338a7bf451a0550644ec8a8da (patch) | |
tree | 62e0548df49dfb3ddacc8cac4309fa10f7b42610 /src/libcharon/processing | |
parent | 9587b8e553eda7b1b6fd48c77ebe4592e1e3532a (diff) | |
download | vyos-strongswan-7b8b352039efd78338a7bf451a0550644ec8a8da.tar.gz vyos-strongswan-7b8b352039efd78338a7bf451a0550644ec8a8da.zip |
New upstream version.
Diffstat (limited to 'src/libcharon/processing')
23 files changed, 21 insertions, 1315 deletions
diff --git a/src/libcharon/processing/jobs/acquire_job.h b/src/libcharon/processing/jobs/acquire_job.h index eff79a9b0..2b5bf4805 100644 --- a/src/libcharon/processing/jobs/acquire_job.h +++ b/src/libcharon/processing/jobs/acquire_job.h @@ -15,7 +15,7 @@ /** * @defgroup acquire_job acquire_job - * @{ @ingroup jobs + * @{ @ingroup cjobs */ #ifndef ACQUIRE_JOB_H_ diff --git a/src/libcharon/processing/jobs/callback_job.c b/src/libcharon/processing/jobs/callback_job.c deleted file mode 100644 index 45e49112e..000000000 --- a/src/libcharon/processing/jobs/callback_job.c +++ /dev/null @@ -1,271 +0,0 @@ -/* - * Copyright (C) 2009 Tobias Brunner - * Copyright (C) 2007 Martin Willi - * Hochschule fuer Technik Rapperswil - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY - * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * for more details. - */ - -#include "callback_job.h" - -#include <semaphore.h> - -#include <daemon.h> -#include <threading/thread.h> -#include <threading/condvar.h> -#include <threading/mutex.h> - -typedef struct private_callback_job_t private_callback_job_t; - -/** - * Private data of an callback_job_t Object. - */ -struct private_callback_job_t { - /** - * Public callback_job_t interface. - */ - callback_job_t public; - - /** - * Callback to call on execution - */ - callback_job_cb_t callback; - - /** - * parameter to supply to callback - */ - void *data; - - /** - * cleanup function for data - */ - callback_job_cleanup_t cleanup; - - /** - * thread of the job, if running - */ - thread_t *thread; - - /** - * mutex to access jobs interna - */ - mutex_t *mutex; - - /** - * list of asociated child jobs - */ - linked_list_t *children; - - /** - * parent of this job, or NULL - */ - private_callback_job_t *parent; - - /** - * TRUE if the job got cancelled - */ - bool cancelled; - - /** - * condvar to synchronize the cancellation/destruction of the job - */ - condvar_t *destroyable; - - /** - * semaphore to synchronize the termination of the assigned thread. - * - * separately allocated during cancellation, so that we can wait on it - * without risking that it gets freed too early during destruction. - */ - sem_t *terminated; -}; - -/** - * unregister a child from its parent, if any. - * note: this->mutex has to be locked - */ -static void unregister(private_callback_job_t *this) -{ - if (this->parent) - { - this->parent->mutex->lock(this->parent->mutex); - if (this->parent->cancelled && !this->cancelled) - { - /* if the parent has been cancelled but we have not yet, we do not - * unregister until we got cancelled by the parent. */ - this->parent->mutex->unlock(this->parent->mutex); - this->destroyable->wait(this->destroyable, this->mutex); - this->parent->mutex->lock(this->parent->mutex); - } - this->parent->children->remove(this->parent->children, this, NULL); - this->parent->mutex->unlock(this->parent->mutex); - this->parent = NULL; - } -} - -/** - * Implements job_t.destroy. - */ -static void destroy(private_callback_job_t *this) -{ - this->mutex->lock(this->mutex); - unregister(this); - if (this->cleanup) - { - this->cleanup(this->data); - } - if (this->terminated) - { - sem_post(this->terminated); - } - this->children->destroy(this->children); - this->destroyable->destroy(this->destroyable); - this->mutex->unlock(this->mutex); - this->mutex->destroy(this->mutex); - free(this); -} - -/** - * Implementation of callback_job_t.cancel. - */ -static void cancel(private_callback_job_t *this) -{ - callback_job_t *child; - sem_t *terminated = NULL; - - this->mutex->lock(this->mutex); - this->cancelled = TRUE; - /* terminate children */ - while (this->children->get_first(this->children, (void**)&child) == SUCCESS) - { - this->mutex->unlock(this->mutex); - child->cancel(child); - this->mutex->lock(this->mutex); - } - if (this->thread) - { - /* terminate the thread, if there is currently one executing the job. - * we wait for its termination using a semaphore */ - this->thread->cancel(this->thread); - terminated = this->terminated = malloc_thing(sem_t); - sem_init(terminated, 0, 0); - } - else - { - /* if the job is currently queued, it gets terminated later. - * we can't wait, because it might not get executed at all. - * we also unregister the queued job manually from its parent (the - * others get unregistered during destruction) */ - unregister(this); - } - this->destroyable->signal(this->destroyable); - this->mutex->unlock(this->mutex); - - if (terminated) - { - sem_wait(terminated); - sem_destroy(terminated); - free(terminated); - } -} - -/** - * Implementation of job_t.execute. - */ -static void execute(private_callback_job_t *this) -{ - bool cleanup = FALSE, requeue = FALSE; - - thread_cleanup_push((thread_cleanup_t)destroy, this); - - this->mutex->lock(this->mutex); - this->thread = thread_current(); - this->mutex->unlock(this->mutex); - - while (TRUE) - { - this->mutex->lock(this->mutex); - if (this->cancelled) - { - this->mutex->unlock(this->mutex); - cleanup = TRUE; - break; - } - this->mutex->unlock(this->mutex); - switch (this->callback(this->data)) - { - case JOB_REQUEUE_DIRECT: - continue; - case JOB_REQUEUE_FAIR: - { - requeue = TRUE; - break; - } - case JOB_REQUEUE_NONE: - default: - { - cleanup = TRUE; - break; - } - } - break; - } - this->mutex->lock(this->mutex); - this->thread = NULL; - this->mutex->unlock(this->mutex); - /* manually create a cancellation point to avoid that a cancelled thread - * goes back into the thread pool */ - thread_cancellation_point(); - if (requeue) - { - charon->processor->queue_job(charon->processor, - &this->public.job_interface); - } - thread_cleanup_pop(cleanup); -} - -/* - * Described in header. - */ -callback_job_t *callback_job_create(callback_job_cb_t cb, void *data, - callback_job_cleanup_t cleanup, - callback_job_t *parent) -{ - private_callback_job_t *this = malloc_thing(private_callback_job_t); - - /* interface functions */ - this->public.job_interface.execute = (void (*) (job_t *)) execute; - this->public.job_interface.destroy = (void (*) (job_t *)) destroy; - this->public.cancel = (void(*)(callback_job_t*))cancel; - - /* private variables */ - this->mutex = mutex_create(MUTEX_TYPE_DEFAULT); - this->callback = cb; - this->data = data; - this->cleanup = cleanup; - this->thread = 0; - this->children = linked_list_create(); - this->parent = (private_callback_job_t*)parent; - this->cancelled = FALSE; - this->destroyable = condvar_create(CONDVAR_TYPE_DEFAULT); - this->terminated = NULL; - - /* register us at parent */ - if (parent) - { - this->parent->mutex->lock(this->parent->mutex); - this->parent->children->insert_last(this->parent->children, this); - this->parent->mutex->unlock(this->parent->mutex); - } - - return &this->public; -} - diff --git a/src/libcharon/processing/jobs/callback_job.h b/src/libcharon/processing/jobs/callback_job.h deleted file mode 100644 index 62da1edd1..000000000 --- a/src/libcharon/processing/jobs/callback_job.h +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright (C) 2007 Martin Willi - * Hochschule fuer Technik Rapperswil - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY - * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * for more details. - */ - -/** - * @defgroup callback_job callback_job - * @{ @ingroup jobs - */ - -#ifndef CALLBACK_JOB_H_ -#define CALLBACK_JOB_H_ - -typedef struct callback_job_t callback_job_t; - -#include <library.h> -#include <processing/jobs/job.h> - - -typedef enum job_requeue_t job_requeue_t; - -/** - * Job requeueing policy - * - * The job requeueing policy defines how a job is handled when the callback - * function returns. - */ -enum job_requeue_t { - - /** - * Do not requeue job, destroy it - */ - JOB_REQUEUE_NONE, - - /** - * Reque the job fairly, meaning it has to requeue as any other job - */ - JOB_REQUEUE_FAIR, - - /** - * Reexecute the job directly, without the need of requeueing it - */ - JOB_REQUEUE_DIRECT, -}; - -/** - * The callback function to use for the callback job. - * - * This is the function to use as callback for a callback job. It receives - * a parameter supplied to the callback jobs constructor. - * - * @param data param supplied to job - * @return requeing policy how to requeue the job - */ -typedef job_requeue_t (*callback_job_cb_t)(void *data); - -/** - * Cleanup function to use for data cleanup. - * - * The callback has an optional user argument which receives data. However, - * this data may be cleaned up if it is allocated. This is the function - * to supply to the constructor. - * - * @param data param supplied to job - * @return requeing policy how to requeue the job - */ -typedef void (*callback_job_cleanup_t)(void *data); - -/** - * Class representing an callback Job. - * - * This is a special job which allows a simple callback function to - * be executed by a thread of the thread pool. This allows simple execution - * of asynchronous methods, without to manage threads. - */ -struct callback_job_t { - /** - * The job_t interface. - */ - job_t job_interface; - - /** - * Cancel the job's thread and wait for its termination. This only works - * reliably for jobs that always use JOB_REQUEUE_FAIR or JOB_REQUEUE_DIRECT, - * otherwise the job may already be destroyed when cancel is called. */ - void (*cancel)(callback_job_t *this); -}; - -/** - * Creates a callback job. - * - * The cleanup function is called when the job gets destroyed to destroy - * the associated data. - * If parent is not NULL, the specified job gets an association. Whenever - * the parent gets cancelled (or runs out), all of its children are cancelled, - * too. - * - * @param cb callback to call from the processor - * @param data user data to supply to callback - * @param cleanup destructor for data on destruction, or NULL - * @param parent parent of this job - * @return callback_job_t object - */ -callback_job_t *callback_job_create(callback_job_cb_t cb, void *data, - callback_job_cleanup_t cleanup, - callback_job_t *parent); - -#endif /** CALLBACK_JOB_H_ @}*/ diff --git a/src/libcharon/processing/jobs/delete_child_sa_job.h b/src/libcharon/processing/jobs/delete_child_sa_job.h index 662a7b7c7..fc0e2b518 100644 --- a/src/libcharon/processing/jobs/delete_child_sa_job.h +++ b/src/libcharon/processing/jobs/delete_child_sa_job.h @@ -15,7 +15,7 @@ /** * @defgroup delete_child_sa_job delete_child_sa_job - * @{ @ingroup jobs + * @{ @ingroup cjobs */ #ifndef DELETE_CHILD_SA_JOB_H_ diff --git a/src/libcharon/processing/jobs/delete_ike_sa_job.h b/src/libcharon/processing/jobs/delete_ike_sa_job.h index f641deea3..ae06b9cfc 100644 --- a/src/libcharon/processing/jobs/delete_ike_sa_job.h +++ b/src/libcharon/processing/jobs/delete_ike_sa_job.h @@ -16,7 +16,7 @@ /** * @defgroup delete_child_sa_job delete_child_sa_job - * @{ @ingroup jobs + * @{ @ingroup cjobs */ #ifndef DELETE_IKE_SA_JOB_H_ diff --git a/src/libcharon/processing/jobs/inactivity_job.c b/src/libcharon/processing/jobs/inactivity_job.c index 13fc5e3d0..1371000eb 100644 --- a/src/libcharon/processing/jobs/inactivity_job.c +++ b/src/libcharon/processing/jobs/inactivity_job.c @@ -87,7 +87,7 @@ METHOD(job_t, execute, void, } else { - charon->scheduler->schedule_job(charon->scheduler, + lib->scheduler->schedule_job(lib->scheduler, &this->public.job_interface, this->timeout - diff); rescheduled = TRUE; } @@ -136,9 +136,11 @@ inactivity_job_t *inactivity_job_create(u_int32_t reqid, u_int32_t timeout, private_inactivity_job_t *this; INIT(this, - .public.job_interface = { - .execute = _execute, - .destroy = _destroy, + .public = { + .job_interface = { + .execute = _execute, + .destroy = _destroy, + }, }, .reqid = reqid, .timeout = timeout, diff --git a/src/libcharon/processing/jobs/inactivity_job.h b/src/libcharon/processing/jobs/inactivity_job.h index 9c9daced8..890f7704b 100644 --- a/src/libcharon/processing/jobs/inactivity_job.h +++ b/src/libcharon/processing/jobs/inactivity_job.h @@ -15,7 +15,7 @@ /** * @defgroup inactivity_job inactivity_job - * @{ @ingroup jobs + * @{ @ingroup cjobs */ #ifndef INACTIVITY_JOB_H_ diff --git a/src/libcharon/processing/jobs/initiate_mediation_job.h b/src/libcharon/processing/jobs/initiate_mediation_job.h index fddb1dd7b..d105de2b9 100644 --- a/src/libcharon/processing/jobs/initiate_mediation_job.h +++ b/src/libcharon/processing/jobs/initiate_mediation_job.h @@ -15,7 +15,7 @@ /** * @defgroup initiate_mediation_job initiate_mediation_job - * @{ @ingroup jobs + * @{ @ingroup cjobs */ #ifndef INITIATE_MEDIATION_JOB_H_ diff --git a/src/libcharon/processing/jobs/job.h b/src/libcharon/processing/jobs/job.h deleted file mode 100644 index 0f1c16ebe..000000000 --- a/src/libcharon/processing/jobs/job.h +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright (C) 2005-2006 Martin Willi - * Copyright (C) 2005 Jan Hutter - * Hochschule fuer Technik Rapperswil - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY - * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * for more details. - */ - -/** - * @defgroup job job - * @{ @ingroup jobs - */ - -#ifndef JOB_H_ -#define JOB_H_ - -typedef struct job_t job_t; - -#include <library.h> - -/** - * Job-Interface as it is stored in the job queue. - */ -struct job_t { - - /** - * Execute a job. - * - * The processing facility executes a job using this method. Jobs are - * one-shot, they destroy themself after execution, so don't use a job - * once it has been executed. - */ - void (*execute) (job_t *this); - - /** - * Destroy a job. - * - * Is only called whenever a job was not executed (e.g. due daemon shutdown). - * After execution, jobs destroy themself. - */ - void (*destroy) (job_t *job); -}; - -#endif /** JOB_H_ @}*/ diff --git a/src/libcharon/processing/jobs/mediation_job.h b/src/libcharon/processing/jobs/mediation_job.h index 0574c65eb..41485cbc6 100644 --- a/src/libcharon/processing/jobs/mediation_job.h +++ b/src/libcharon/processing/jobs/mediation_job.h @@ -15,7 +15,7 @@ /** * @defgroup mediation_job mediation_job - * @{ @ingroup jobs + * @{ @ingroup cjobs */ #ifndef MEDIATION_JOB_H_ diff --git a/src/libcharon/processing/jobs/migrate_job.h b/src/libcharon/processing/jobs/migrate_job.h index de313d517..09679c734 100644 --- a/src/libcharon/processing/jobs/migrate_job.h +++ b/src/libcharon/processing/jobs/migrate_job.h @@ -15,7 +15,7 @@ /** * @defgroup migrate_job migrate_job - * @{ @ingroup jobs + * @{ @ingroup cjobs */ #ifndef MIGRATE_JOB_H_ diff --git a/src/libcharon/processing/jobs/process_message_job.h b/src/libcharon/processing/jobs/process_message_job.h index 5e3f44d1f..2c42aa577 100644 --- a/src/libcharon/processing/jobs/process_message_job.h +++ b/src/libcharon/processing/jobs/process_message_job.h @@ -16,7 +16,7 @@ /** * @defgroup process_message_job process_message_job - * @{ @ingroup jobs + * @{ @ingroup cjobs */ #ifndef PROCESS_MESSAGE_JOB_H_ diff --git a/src/libcharon/processing/jobs/rekey_child_sa_job.h b/src/libcharon/processing/jobs/rekey_child_sa_job.h index 62887d6b9..fcbe65a06 100644 --- a/src/libcharon/processing/jobs/rekey_child_sa_job.h +++ b/src/libcharon/processing/jobs/rekey_child_sa_job.h @@ -15,7 +15,7 @@ /** * @defgroup rekey_child_sa_job rekey_child_sa_job - * @{ @ingroup jobs + * @{ @ingroup cjobs */ #ifndef REKEY_CHILD_SA_JOB_H_ diff --git a/src/libcharon/processing/jobs/rekey_ike_sa_job.h b/src/libcharon/processing/jobs/rekey_ike_sa_job.h index a5c1028aa..3e3e13d00 100644 --- a/src/libcharon/processing/jobs/rekey_ike_sa_job.h +++ b/src/libcharon/processing/jobs/rekey_ike_sa_job.h @@ -15,7 +15,7 @@ /** * @defgroup rekey_ike_sa_job rekey_ike_sa_job - * @{ @ingroup jobs + * @{ @ingroup cjobs */ #ifndef REKEY_IKE_SA_JOB_H_ diff --git a/src/libcharon/processing/jobs/retransmit_job.h b/src/libcharon/processing/jobs/retransmit_job.h index c8c13479b..c4545534b 100644 --- a/src/libcharon/processing/jobs/retransmit_job.h +++ b/src/libcharon/processing/jobs/retransmit_job.h @@ -16,7 +16,7 @@ /** * @defgroup retransmit_job retransmit_job - * @{ @ingroup jobs + * @{ @ingroup cjobs */ #ifndef RETRANSMIT_JOB_H_ diff --git a/src/libcharon/processing/jobs/roam_job.h b/src/libcharon/processing/jobs/roam_job.h index 55bdf2b28..acfb8bed8 100644 --- a/src/libcharon/processing/jobs/roam_job.h +++ b/src/libcharon/processing/jobs/roam_job.h @@ -15,7 +15,7 @@ /** * @defgroup roam_job roam_job - * @{ @ingroup jobs + * @{ @ingroup cjobs */ #ifndef ROAM_JOB_H_ diff --git a/src/libcharon/processing/jobs/send_dpd_job.h b/src/libcharon/processing/jobs/send_dpd_job.h index 8078a38bc..bd2728b9a 100644 --- a/src/libcharon/processing/jobs/send_dpd_job.h +++ b/src/libcharon/processing/jobs/send_dpd_job.h @@ -15,7 +15,7 @@ /** * @defgroup send_dpd_job send_dpd_job - * @{ @ingroup jobs + * @{ @ingroup cjobs */ #ifndef SEND_DPD_JOB_H_ diff --git a/src/libcharon/processing/jobs/send_keepalive_job.h b/src/libcharon/processing/jobs/send_keepalive_job.h index cda83cd7e..acf6d11aa 100644 --- a/src/libcharon/processing/jobs/send_keepalive_job.h +++ b/src/libcharon/processing/jobs/send_keepalive_job.h @@ -15,7 +15,7 @@ /** * @defgroup send_keepalive_job send_keepalive_job - * @{ @ingroup jobs + * @{ @ingroup cjobs */ #ifndef SEND_KEEPALIVE_JOB_H_ diff --git a/src/libcharon/processing/jobs/update_sa_job.h b/src/libcharon/processing/jobs/update_sa_job.h index 11d1ac9b6..e2344fcc4 100644 --- a/src/libcharon/processing/jobs/update_sa_job.h +++ b/src/libcharon/processing/jobs/update_sa_job.h @@ -15,7 +15,7 @@ /** * @defgroup update_sa_job update_sa_job - * @{ @ingroup jobs + * @{ @ingroup cjobs */ #ifndef UPDATE_SA_JOB_H_ diff --git a/src/libcharon/processing/processor.c b/src/libcharon/processing/processor.c deleted file mode 100644 index d5774af26..000000000 --- a/src/libcharon/processing/processor.c +++ /dev/null @@ -1,273 +0,0 @@ -/* - * Copyright (C) 2005-2007 Martin Willi - * Copyright (C) 2005 Jan Hutter - * Hochschule fuer Technik Rapperswil - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY - * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * for more details. - */ - -#include <stdlib.h> -#include <string.h> -#include <errno.h> - -#include "processor.h" - -#include <daemon.h> -#include <threading/thread.h> -#include <threading/condvar.h> -#include <threading/mutex.h> -#include <utils/linked_list.h> - - -typedef struct private_processor_t private_processor_t; - -/** - * Private data of processor_t class. - */ -struct private_processor_t { - /** - * Public processor_t interface. - */ - processor_t public; - - /** - * Number of running threads - */ - u_int total_threads; - - /** - * Desired number of threads - */ - u_int desired_threads; - - /** - * Number of threads waiting for work - */ - u_int idle_threads; - - /** - * All threads managed in the pool (including threads that have been - * cancelled, this allows to join them during destruction) - */ - linked_list_t *threads; - - /** - * The jobs are stored in a linked list - */ - linked_list_t *list; - - /** - * access to linked_list is locked through this mutex - */ - mutex_t *mutex; - - /** - * Condvar to wait for new jobs - */ - condvar_t *job_added; - - /** - * Condvar to wait for terminated threads - */ - condvar_t *thread_terminated; -}; - -static void process_jobs(private_processor_t *this); - -/** - * restart a terminated thread - */ -static void restart(private_processor_t *this) -{ - thread_t *thread; - - DBG2(DBG_JOB, "terminated worker thread, ID: %u", thread_current_id()); - - /* respawn thread if required */ - this->mutex->lock(this->mutex); - if (this->desired_threads < this->total_threads || - (thread = thread_create((thread_main_t)process_jobs, this)) == NULL) - { - this->total_threads--; - this->thread_terminated->signal(this->thread_terminated); - } - else - { - this->threads->insert_last(this->threads, thread); - } - this->mutex->unlock(this->mutex); -} - -/** - * Process queued jobs, called by the worker threads - */ -static void process_jobs(private_processor_t *this) -{ - /* worker threads are not cancellable by default */ - thread_cancelability(FALSE); - - DBG2(DBG_JOB, "started worker thread, ID: %u", thread_current_id()); - - this->mutex->lock(this->mutex); - while (this->desired_threads >= this->total_threads) - { - job_t *job; - - if (this->list->get_count(this->list) == 0) - { - this->idle_threads++; - this->job_added->wait(this->job_added, this->mutex); - this->idle_threads--; - continue; - } - this->list->remove_first(this->list, (void**)&job); - this->mutex->unlock(this->mutex); - /* terminated threads are restarted, so we have a constant pool */ - thread_cleanup_push((thread_cleanup_t)restart, this); - job->execute(job); - thread_cleanup_pop(FALSE); - this->mutex->lock(this->mutex); - } - this->mutex->unlock(this->mutex); - restart(this); -} - -/** - * Implementation of processor_t.get_total_threads. - */ -static u_int get_total_threads(private_processor_t *this) -{ - u_int count; - this->mutex->lock(this->mutex); - count = this->total_threads; - this->mutex->unlock(this->mutex); - return count; -} - -/** - * Implementation of processor_t.get_idle_threads. - */ -static u_int get_idle_threads(private_processor_t *this) -{ - u_int count; - this->mutex->lock(this->mutex); - count = this->idle_threads; - this->mutex->unlock(this->mutex); - return count; -} - -/** - * implements processor_t.get_job_load - */ -static u_int get_job_load(private_processor_t *this) -{ - u_int load; - this->mutex->lock(this->mutex); - load = this->list->get_count(this->list); - this->mutex->unlock(this->mutex); - return load; -} - -/** - * implements function processor_t.queue_job - */ -static void queue_job(private_processor_t *this, job_t *job) -{ - this->mutex->lock(this->mutex); - this->list->insert_last(this->list, job); - this->job_added->signal(this->job_added); - this->mutex->unlock(this->mutex); -} - -/** - * Implementation of processor_t.set_threads. - */ -static void set_threads(private_processor_t *this, u_int count) -{ - this->mutex->lock(this->mutex); - if (count > this->total_threads) - { /* increase thread count */ - int i; - thread_t *current; - - this->desired_threads = count; - DBG1(DBG_JOB, "spawning %d worker threads", count - this->total_threads); - for (i = this->total_threads; i < count; i++) - { - current = thread_create((thread_main_t)process_jobs, this); - if (current) - { - this->threads->insert_last(this->threads, current); - this->total_threads++; - } - } - } - else if (count < this->total_threads) - { /* decrease thread count */ - this->desired_threads = count; - } - this->job_added->broadcast(this->job_added); - this->mutex->unlock(this->mutex); -} - -/** - * Implementation of processor_t.destroy. - */ -static void destroy(private_processor_t *this) -{ - thread_t *current; - set_threads(this, 0); - this->mutex->lock(this->mutex); - while (this->total_threads > 0) - { - this->job_added->broadcast(this->job_added); - this->thread_terminated->wait(this->thread_terminated, this->mutex); - } - while (this->threads->remove_first(this->threads, - (void**)¤t) == SUCCESS) - { - current->join(current); - } - this->mutex->unlock(this->mutex); - this->thread_terminated->destroy(this->thread_terminated); - this->job_added->destroy(this->job_added); - this->mutex->destroy(this->mutex); - this->list->destroy_offset(this->list, offsetof(job_t, destroy)); - this->threads->destroy(this->threads); - free(this); -} - -/* - * Described in header. - */ -processor_t *processor_create(size_t pool_size) -{ - private_processor_t *this = malloc_thing(private_processor_t); - - this->public.get_total_threads = (u_int(*)(processor_t*))get_total_threads; - this->public.get_idle_threads = (u_int(*)(processor_t*))get_idle_threads; - this->public.get_job_load = (u_int(*)(processor_t*))get_job_load; - this->public.queue_job = (void(*)(processor_t*, job_t*))queue_job; - this->public.set_threads = (void(*)(processor_t*, u_int))set_threads; - this->public.destroy = (void(*)(processor_t*))destroy; - - this->list = linked_list_create(); - this->threads = linked_list_create(); - this->mutex = mutex_create(MUTEX_TYPE_DEFAULT); - this->job_added = condvar_create(CONDVAR_TYPE_DEFAULT); - this->thread_terminated = condvar_create(CONDVAR_TYPE_DEFAULT); - this->total_threads = 0; - this->desired_threads = 0; - this->idle_threads = 0; - - return &this->public; -} - diff --git a/src/libcharon/processing/processor.h b/src/libcharon/processing/processor.h deleted file mode 100644 index 5bf8cf573..000000000 --- a/src/libcharon/processing/processor.h +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Copyright (C) 2005-2007 Martin Willi - * Copyright (C) 2005 Jan Hutter - * Hochschule fuer Technik Rapperswil - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY - * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * for more details. - */ - -/** - * @defgroup processor processor - * @{ @ingroup processing - */ - -#ifndef PROCESSOR_H_ -#define PROCESSOR_H_ - -typedef struct processor_t processor_t; - -#include <stdlib.h> - -#include <library.h> -#include <processing/jobs/job.h> - -/** - * The processor uses threads to process queued jobs. - */ -struct processor_t { - - /** - * Get the total number of threads used by the processor. - * - * @return size of thread pool - */ - u_int (*get_total_threads) (processor_t *this); - - /** - * Get the number of threads currently waiting. - * - * @return number of idle threads - */ - u_int (*get_idle_threads) (processor_t *this); - - /** - * Get the number of queued jobs. - * - * @returns number of items in queue - */ - u_int (*get_job_load) (processor_t *this); - - /** - * Adds a job to the queue. - * - * This function is non blocking and adds a job_t to the queue. - * - * @param job job to add to the queue - */ - void (*queue_job) (processor_t *this, job_t *job); - - /** - * Set the number of threads to use in the processor. - * - * If the number of threads is smaller than number of currently running - * threads, thread count is decreased. Use 0 to disable the processor. - * This call blocks if it decreases thread count until threads have - * terminated, so make sure there are not too many blocking jobs. - * - * @param count number of threads to allocate - */ - void (*set_threads)(processor_t *this, u_int count); - - /** - * Destroy a processor object. - */ - void (*destroy) (processor_t *processor); -}; - -/** - * Create the thread pool without any threads. - * - * Use the set_threads method to start processing jobs. - * - * @return processor_t object - */ -processor_t *processor_create(); - -#endif /** PROCESSOR_H_ @}*/ diff --git a/src/libcharon/processing/scheduler.c b/src/libcharon/processing/scheduler.c deleted file mode 100644 index 345af502a..000000000 --- a/src/libcharon/processing/scheduler.c +++ /dev/null @@ -1,358 +0,0 @@ -/* - * Copyright (C) 2008 Tobias Brunner - * Copyright (C) 2005-2006 Martin Willi - * Copyright (C) 2005 Jan Hutter - * Hochschule fuer Technik Rapperswil - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY - * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * for more details. - */ - -#include <stdlib.h> - -#include "scheduler.h" - -#include <daemon.h> -#include <processing/processor.h> -#include <processing/jobs/callback_job.h> -#include <threading/thread.h> -#include <threading/condvar.h> -#include <threading/mutex.h> - -/* the initial size of the heap */ -#define HEAP_SIZE_DEFAULT 64 - -typedef struct event_t event_t; - -/** - * Event containing a job and a schedule time - */ -struct event_t { - /** - * Time to fire the event. - */ - timeval_t time; - - /** - * Every event has its assigned job. - */ - job_t *job; -}; - -/** - * destroy an event and its job - */ -static void event_destroy(event_t *event) -{ - event->job->destroy(event->job); - free(event); -} - -typedef struct private_scheduler_t private_scheduler_t; - -/** - * Private data of a scheduler_t object. - */ -struct private_scheduler_t { - - /** - * Public part of a scheduler_t object. - */ - scheduler_t public; - - /** - * Job which queues scheduled jobs to the processor. - */ - callback_job_t *job; - - /** - * The heap in which the events are stored. - */ - event_t **heap; - - /** - * The size of the heap. - */ - u_int heap_size; - - /** - * The number of scheduled events. - */ - u_int event_count; - - /** - * Exclusive access to list - */ - mutex_t *mutex; - - /** - * Condvar to wait for next job. - */ - condvar_t *condvar; -}; - -/** - * Comparse two timevals, return >0 if a > b, <0 if a < b and =0 if equal - */ -static int timeval_cmp(timeval_t *a, timeval_t *b) -{ - if (a->tv_sec > b->tv_sec) - { - return 1; - } - if (a->tv_sec < b->tv_sec) - { - return -1; - } - if (a->tv_usec > b->tv_usec) - { - return 1; - } - if (a->tv_usec < b->tv_usec) - { - return -1; - } - return 0; -} - -/** - * Returns the top event without removing it. Returns NULL if the heap is empty. - */ -static event_t *peek_event(private_scheduler_t *this) -{ - return this->event_count > 0 ? this->heap[1] : NULL; -} - -/** - * Removes the top event from the heap and returns it. Returns NULL if the heap - * is empty. - */ -static event_t *remove_event(private_scheduler_t *this) -{ - event_t *event, *top; - if (!this->event_count) - { - return NULL; - } - - /* store the value to return */ - event = this->heap[1]; - /* move the bottom event to the top */ - top = this->heap[1] = this->heap[this->event_count]; - - if (--this->event_count > 1) - { - /* seep down the top event */ - u_int position = 1; - while ((position << 1) <= this->event_count) - { - u_int child = position << 1; - - if ((child + 1) <= this->event_count && - timeval_cmp(&this->heap[child + 1]->time, - &this->heap[child]->time) < 0) - { - /* the "right" child is smaller */ - child++; - } - - if (timeval_cmp(&top->time, &this->heap[child]->time) <= 0) - { - /* the top event fires before the smaller of the two children, - * stop */ - break; - } - - /* swap with the smaller child */ - this->heap[position] = this->heap[child]; - position = child; - } - this->heap[position] = top; - } - return event; -} - -/** - * Get events from the queue and pass it to the processor - */ -static job_requeue_t schedule(private_scheduler_t * this) -{ - timeval_t now; - event_t *event; - bool timed = FALSE, oldstate; - - this->mutex->lock(this->mutex); - - time_monotonic(&now); - - if ((event = peek_event(this)) != NULL) - { - if (timeval_cmp(&now, &event->time) >= 0) - { - remove_event(this); - this->mutex->unlock(this->mutex); - DBG2(DBG_JOB, "got event, queuing job for execution"); - charon->processor->queue_job(charon->processor, event->job); - free(event); - return JOB_REQUEUE_DIRECT; - } - timersub(&event->time, &now, &now); - if (now.tv_sec) - { - DBG2(DBG_JOB, "next event in %ds %dms, waiting", - now.tv_sec, now.tv_usec/1000); - } - else - { - DBG2(DBG_JOB, "next event in %dms, waiting", now.tv_usec/1000); - } - timed = TRUE; - } - thread_cleanup_push((thread_cleanup_t)this->mutex->unlock, this->mutex); - oldstate = thread_cancelability(TRUE); - - if (timed) - { - this->condvar->timed_wait_abs(this->condvar, this->mutex, event->time); - } - else - { - DBG2(DBG_JOB, "no events, waiting"); - this->condvar->wait(this->condvar, this->mutex); - } - thread_cancelability(oldstate); - thread_cleanup_pop(TRUE); - return JOB_REQUEUE_DIRECT; -} - -/** - * Implements scheduler_t.get_job_load - */ -static u_int get_job_load(private_scheduler_t *this) -{ - int count; - this->mutex->lock(this->mutex); - count = this->event_count; - this->mutex->unlock(this->mutex); - return count; -} - -/** - * Implements scheduler_t.schedule_job_tv. - */ -static void schedule_job_tv(private_scheduler_t *this, job_t *job, timeval_t tv) -{ - event_t *event; - u_int position; - - event = malloc_thing(event_t); - event->job = job; - event->time = tv; - - this->mutex->lock(this->mutex); - - this->event_count++; - if (this->event_count > this->heap_size) - { - /* double the size of the heap */ - this->heap_size <<= 1; - this->heap = (event_t**)realloc(this->heap, - (this->heap_size + 1) * sizeof(event_t*)); - } - /* "put" the event to the bottom */ - position = this->event_count; - - /* then bubble it up */ - while (position > 1 && timeval_cmp(&this->heap[position >> 1]->time, - &event->time) > 0) - { - /* parent has to be fired after the new event, move up */ - this->heap[position] = this->heap[position >> 1]; - position >>= 1; - } - this->heap[position] = event; - - this->condvar->signal(this->condvar); - this->mutex->unlock(this->mutex); -} - -/** - * Implements scheduler_t.schedule_job. - */ -static void schedule_job(private_scheduler_t *this, job_t *job, u_int32_t s) -{ - timeval_t tv; - - time_monotonic(&tv); - tv.tv_sec += s; - - schedule_job_tv(this, job, tv); -} - -/** - * Implements scheduler_t.schedule_job_ms. - */ -static void schedule_job_ms(private_scheduler_t *this, job_t *job, u_int32_t ms) -{ - timeval_t tv, add; - - time_monotonic(&tv); - add.tv_sec = ms / 1000; - add.tv_usec = (ms % 1000) * 1000; - - timeradd(&tv, &add, &tv); - - schedule_job_tv(this, job, tv); -} - -/** - * Implementation of scheduler_t.destroy. - */ -static void destroy(private_scheduler_t *this) -{ - event_t *event; - this->job->cancel(this->job); - this->condvar->destroy(this->condvar); - this->mutex->destroy(this->mutex); - while ((event = remove_event(this)) != NULL) - { - event_destroy(event); - } - free(this->heap); - free(this); -} - -/* - * Described in header. - */ -scheduler_t * scheduler_create() -{ - private_scheduler_t *this = malloc_thing(private_scheduler_t); - - this->public.get_job_load = (u_int (*) (scheduler_t *this)) get_job_load; - this->public.schedule_job = (void (*) (scheduler_t *this, job_t *job, u_int32_t s)) schedule_job; - this->public.schedule_job_ms = (void (*) (scheduler_t *this, job_t *job, u_int32_t ms)) schedule_job_ms; - this->public.schedule_job_tv = (void (*) (scheduler_t *this, job_t *job, timeval_t tv)) schedule_job_tv; - this->public.destroy = (void(*)(scheduler_t*)) destroy; - - /* Note: the root of the heap is at index 1 */ - this->event_count = 0; - this->heap_size = HEAP_SIZE_DEFAULT; - this->heap = (event_t**)calloc(this->heap_size + 1, sizeof(event_t*)); - - this->mutex = mutex_create(MUTEX_TYPE_DEFAULT); - this->condvar = condvar_create(CONDVAR_TYPE_DEFAULT); - - this->job = callback_job_create((callback_job_cb_t)schedule, this, NULL, NULL); - charon->processor->queue_job(charon->processor, (job_t*)this->job); - - return &this->public; -} - diff --git a/src/libcharon/processing/scheduler.h b/src/libcharon/processing/scheduler.h deleted file mode 100644 index 5f5d2a563..000000000 --- a/src/libcharon/processing/scheduler.h +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Copyright (C) 2009 Tobias Brunner - * Copyright (C) 2005-2007 Martin Willi - * Copyright (C) 2005 Jan Hutter - * Hochschule fuer Technik Rapperswil - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY - * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * for more details. - */ - -/** - * @defgroup scheduler scheduler - * @{ @ingroup processing - */ - -#ifndef SCHEDULER_H_ -#define SCHEDULER_H_ - -typedef struct scheduler_t scheduler_t; - -#include <library.h> -#include <processing/jobs/job.h> - -/** - * The scheduler queues timed events which are then passed to the processor. - * - * The scheduler is implemented as a heap. A heap is a special kind of tree- - * based data structure that satisfies the following property: if B is a child - * node of A, then key(A) >= (or <=) key(B). So either the element with the - * greatest (max-heap) or the smallest (min-heap) key is the root of the heap. - * We use a min-heap whith the key being the absolute unix time at which an - * event is scheduled. So the root is always the event that will fire next. - * - * An earlier implementation of the scheduler used a sorted linked list to store - * the events. That had the advantage that removing the next event was extremely - * fast, also, adding an event scheduled before or after all other events was - * equally fast (all in O(1)). The problem was, though, that adding an event - * in-between got slower, as the number of events grew larger (O(n)). - * For each connection there could be several events: IKE-rekey, NAT-keepalive, - * retransmissions, expire (half-open), and others. So a gateway that probably - * has to handle thousands of concurrent connnections has to be able to queue a - * large number of events as fast as possible. Locking makes this even worse, to - * provide thread-safety, no events can be processed, while an event is queued, - * so making the insertion fast is even more important. - * - * That's the advantage of the heap. Adding an element to the heap can be - * achieved in O(log n) - on the other hand, removing the root node also - * requires O(log n) operations. Consider 10000 queued events. Inserting a new - * event in the list implementation required up to 10000 comparisons. In the - * heap implementation, the worst case is about 13.3 comparisons. That's a - * drastic improvement. - * - * The implementation itself uses a binary tree mapped to a one-based array to - * store the elements. This reduces storage overhead and simplifies navigation: - * the children of the node at position n are at position 2n and 2n+1 (likewise - * the parent node of the node at position n is at position [n/2]). Thus, - * navigating up and down the tree is reduced to simple index computations. - * - * Adding an element to the heap works as follows: The heap is always filled - * from left to right, until a row is full, then the next row is filled. Mapped - * to an array this gets as simple as putting the new element to the first free - * position. In a one-based array that position equals the number of elements - * currently stored in the heap. Then the heap property has to be restored, i.e. - * the new element has to be "bubbled up" the tree until the parent node's key - * is smaller or the element got the new root of the tree. - * - * Removing the next event from the heap works similarly. The event itself is - * the root node and stored at position 1 of the array. After removing it, the - * root has to be replaced and the heap property has to be restored. This is - * done by moving the bottom element (last row, rightmost element) to the root - * and then "seep it down" by swapping it with child nodes until none of the - * children has a smaller key or it is again a leaf node. - */ -struct scheduler_t { - - /** - * Adds a event to the queue, using a relative time offset in s. - * - * @param job job to schedule - * @param time relative time to schedule job, in s - */ - void (*schedule_job) (scheduler_t *this, job_t *job, u_int32_t s); - - /** - * Adds a event to the queue, using a relative time offset in ms. - * - * @param job job to schedule - * @param time relative time to schedule job, in ms - */ - void (*schedule_job_ms) (scheduler_t *this, job_t *job, u_int32_t ms); - - /** - * Adds a event to the queue, using an absolut time. - * - * The passed timeval should be calculated based on the time_monotonic() - * function. - * - * @param job job to schedule - * @param time absolut time to schedule job - */ - void (*schedule_job_tv) (scheduler_t *this, job_t *job, timeval_t tv); - - /** - * Returns number of jobs scheduled. - * - * @return number of scheduled jobs - */ - u_int (*get_job_load) (scheduler_t *this); - - /** - * Destroys a scheduler object. - */ - void (*destroy) (scheduler_t *this); -}; - -/** - * Create a scheduler. - * - * @return scheduler_t object - */ -scheduler_t *scheduler_create(void); - -#endif /** SCHEDULER_H_ @}*/ |