diff options
author | Rene Mayrhofer <rene@mayrhofer.eu.org> | 2010-11-28 12:11:49 +0000 |
---|---|---|
committer | Rene Mayrhofer <rene@mayrhofer.eu.org> | 2010-11-28 12:11:49 +0000 |
commit | 7b8b352039efd78338a7bf451a0550644ec8a8da (patch) | |
tree | 62e0548df49dfb3ddacc8cac4309fa10f7b42610 /src/libstrongswan/processing | |
parent | 9587b8e553eda7b1b6fd48c77ebe4592e1e3532a (diff) | |
download | vyos-strongswan-7b8b352039efd78338a7bf451a0550644ec8a8da.tar.gz vyos-strongswan-7b8b352039efd78338a7bf451a0550644ec8a8da.zip |
New upstream version.
Diffstat (limited to 'src/libstrongswan/processing')
-rw-r--r-- | src/libstrongswan/processing/jobs/callback_job.c | 271 | ||||
-rw-r--r-- | src/libstrongswan/processing/jobs/callback_job.h | 118 | ||||
-rw-r--r-- | src/libstrongswan/processing/jobs/job.h | 52 | ||||
-rw-r--r-- | src/libstrongswan/processing/processor.c | 273 | ||||
-rw-r--r-- | src/libstrongswan/processing/processor.h | 94 | ||||
-rw-r--r-- | src/libstrongswan/processing/scheduler.c | 358 | ||||
-rw-r--r-- | src/libstrongswan/processing/scheduler.h | 130 |
7 files changed, 1296 insertions, 0 deletions
diff --git a/src/libstrongswan/processing/jobs/callback_job.c b/src/libstrongswan/processing/jobs/callback_job.c new file mode 100644 index 000000000..556cbd907 --- /dev/null +++ b/src/libstrongswan/processing/jobs/callback_job.c @@ -0,0 +1,271 @@ +/* + * Copyright (C) 2009 Tobias Brunner + * Copyright (C) 2007 Martin Willi + * Hochschule fuer Technik Rapperswil + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +#include "callback_job.h" + +#include <semaphore.h> + +#include <threading/thread.h> +#include <threading/condvar.h> +#include <threading/mutex.h> +#include <utils/linked_list.h> + +typedef struct private_callback_job_t private_callback_job_t; + +/** + * Private data of an callback_job_t Object. + */ +struct private_callback_job_t { + /** + * Public callback_job_t interface. + */ + callback_job_t public; + + /** + * Callback to call on execution + */ + callback_job_cb_t callback; + + /** + * parameter to supply to callback + */ + void *data; + + /** + * cleanup function for data + */ + callback_job_cleanup_t cleanup; + + /** + * thread of the job, if running + */ + thread_t *thread; + + /** + * mutex to access jobs interna + */ + mutex_t *mutex; + + /** + * list of asociated child jobs + */ + linked_list_t *children; + + /** + * parent of this job, or NULL + */ + private_callback_job_t *parent; + + /** + * TRUE if the job got cancelled + */ + bool cancelled; + + /** + * condvar to synchronize the cancellation/destruction of the job + */ + condvar_t *destroyable; + + /** + * semaphore to synchronize the termination of the assigned thread. + * + * separately allocated during cancellation, so that we can wait on it + * without risking that it gets freed too early during destruction. + */ + sem_t *terminated; +}; + +/** + * unregister a child from its parent, if any. + * note: this->mutex has to be locked + */ +static void unregister(private_callback_job_t *this) +{ + if (this->parent) + { + this->parent->mutex->lock(this->parent->mutex); + if (this->parent->cancelled && !this->cancelled) + { + /* if the parent has been cancelled but we have not yet, we do not + * unregister until we got cancelled by the parent. */ + this->parent->mutex->unlock(this->parent->mutex); + this->destroyable->wait(this->destroyable, this->mutex); + this->parent->mutex->lock(this->parent->mutex); + } + this->parent->children->remove(this->parent->children, this, NULL); + this->parent->mutex->unlock(this->parent->mutex); + this->parent = NULL; + } +} + +/** + * Implements job_t.destroy. + */ +static void destroy(private_callback_job_t *this) +{ + this->mutex->lock(this->mutex); + unregister(this); + if (this->cleanup) + { + this->cleanup(this->data); + } + if (this->terminated) + { + sem_post(this->terminated); + } + this->children->destroy(this->children); + this->destroyable->destroy(this->destroyable); + this->mutex->unlock(this->mutex); + this->mutex->destroy(this->mutex); + free(this); +} + +/** + * Implementation of callback_job_t.cancel. + */ +static void cancel(private_callback_job_t *this) +{ + callback_job_t *child; + sem_t *terminated = NULL; + + this->mutex->lock(this->mutex); + this->cancelled = TRUE; + /* terminate children */ + while (this->children->get_first(this->children, (void**)&child) == SUCCESS) + { + this->mutex->unlock(this->mutex); + child->cancel(child); + this->mutex->lock(this->mutex); + } + if (this->thread) + { + /* terminate the thread, if there is currently one executing the job. + * we wait for its termination using a semaphore */ + this->thread->cancel(this->thread); + terminated = this->terminated = malloc_thing(sem_t); + sem_init(terminated, 0, 0); + } + else + { + /* if the job is currently queued, it gets terminated later. + * we can't wait, because it might not get executed at all. + * we also unregister the queued job manually from its parent (the + * others get unregistered during destruction) */ + unregister(this); + } + this->destroyable->signal(this->destroyable); + this->mutex->unlock(this->mutex); + + if (terminated) + { + sem_wait(terminated); + sem_destroy(terminated); + free(terminated); + } +} + +/** + * Implementation of job_t.execute. + */ +static void execute(private_callback_job_t *this) +{ + bool cleanup = FALSE, requeue = FALSE; + + thread_cleanup_push((thread_cleanup_t)destroy, this); + + this->mutex->lock(this->mutex); + this->thread = thread_current(); + this->mutex->unlock(this->mutex); + + while (TRUE) + { + this->mutex->lock(this->mutex); + if (this->cancelled) + { + this->mutex->unlock(this->mutex); + cleanup = TRUE; + break; + } + this->mutex->unlock(this->mutex); + switch (this->callback(this->data)) + { + case JOB_REQUEUE_DIRECT: + continue; + case JOB_REQUEUE_FAIR: + { + requeue = TRUE; + break; + } + case JOB_REQUEUE_NONE: + default: + { + cleanup = TRUE; + break; + } + } + break; + } + this->mutex->lock(this->mutex); + this->thread = NULL; + this->mutex->unlock(this->mutex); + /* manually create a cancellation point to avoid that a cancelled thread + * goes back into the thread pool */ + thread_cancellation_point(); + if (requeue) + { + lib->processor->queue_job(lib->processor, + &this->public.job_interface); + } + thread_cleanup_pop(cleanup); +} + +/* + * Described in header. + */ +callback_job_t *callback_job_create(callback_job_cb_t cb, void *data, + callback_job_cleanup_t cleanup, + callback_job_t *parent) +{ + private_callback_job_t *this = malloc_thing(private_callback_job_t); + + /* interface functions */ + this->public.job_interface.execute = (void (*) (job_t *)) execute; + this->public.job_interface.destroy = (void (*) (job_t *)) destroy; + this->public.cancel = (void(*)(callback_job_t*))cancel; + + /* private variables */ + this->mutex = mutex_create(MUTEX_TYPE_DEFAULT); + this->callback = cb; + this->data = data; + this->cleanup = cleanup; + this->thread = 0; + this->children = linked_list_create(); + this->parent = (private_callback_job_t*)parent; + this->cancelled = FALSE; + this->destroyable = condvar_create(CONDVAR_TYPE_DEFAULT); + this->terminated = NULL; + + /* register us at parent */ + if (parent) + { + this->parent->mutex->lock(this->parent->mutex); + this->parent->children->insert_last(this->parent->children, this); + this->parent->mutex->unlock(this->parent->mutex); + } + + return &this->public; +} + diff --git a/src/libstrongswan/processing/jobs/callback_job.h b/src/libstrongswan/processing/jobs/callback_job.h new file mode 100644 index 000000000..62da1edd1 --- /dev/null +++ b/src/libstrongswan/processing/jobs/callback_job.h @@ -0,0 +1,118 @@ +/* + * Copyright (C) 2007 Martin Willi + * Hochschule fuer Technik Rapperswil + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +/** + * @defgroup callback_job callback_job + * @{ @ingroup jobs + */ + +#ifndef CALLBACK_JOB_H_ +#define CALLBACK_JOB_H_ + +typedef struct callback_job_t callback_job_t; + +#include <library.h> +#include <processing/jobs/job.h> + + +typedef enum job_requeue_t job_requeue_t; + +/** + * Job requeueing policy + * + * The job requeueing policy defines how a job is handled when the callback + * function returns. + */ +enum job_requeue_t { + + /** + * Do not requeue job, destroy it + */ + JOB_REQUEUE_NONE, + + /** + * Reque the job fairly, meaning it has to requeue as any other job + */ + JOB_REQUEUE_FAIR, + + /** + * Reexecute the job directly, without the need of requeueing it + */ + JOB_REQUEUE_DIRECT, +}; + +/** + * The callback function to use for the callback job. + * + * This is the function to use as callback for a callback job. It receives + * a parameter supplied to the callback jobs constructor. + * + * @param data param supplied to job + * @return requeing policy how to requeue the job + */ +typedef job_requeue_t (*callback_job_cb_t)(void *data); + +/** + * Cleanup function to use for data cleanup. + * + * The callback has an optional user argument which receives data. However, + * this data may be cleaned up if it is allocated. This is the function + * to supply to the constructor. + * + * @param data param supplied to job + * @return requeing policy how to requeue the job + */ +typedef void (*callback_job_cleanup_t)(void *data); + +/** + * Class representing an callback Job. + * + * This is a special job which allows a simple callback function to + * be executed by a thread of the thread pool. This allows simple execution + * of asynchronous methods, without to manage threads. + */ +struct callback_job_t { + /** + * The job_t interface. + */ + job_t job_interface; + + /** + * Cancel the job's thread and wait for its termination. This only works + * reliably for jobs that always use JOB_REQUEUE_FAIR or JOB_REQUEUE_DIRECT, + * otherwise the job may already be destroyed when cancel is called. */ + void (*cancel)(callback_job_t *this); +}; + +/** + * Creates a callback job. + * + * The cleanup function is called when the job gets destroyed to destroy + * the associated data. + * If parent is not NULL, the specified job gets an association. Whenever + * the parent gets cancelled (or runs out), all of its children are cancelled, + * too. + * + * @param cb callback to call from the processor + * @param data user data to supply to callback + * @param cleanup destructor for data on destruction, or NULL + * @param parent parent of this job + * @return callback_job_t object + */ +callback_job_t *callback_job_create(callback_job_cb_t cb, void *data, + callback_job_cleanup_t cleanup, + callback_job_t *parent); + +#endif /** CALLBACK_JOB_H_ @}*/ diff --git a/src/libstrongswan/processing/jobs/job.h b/src/libstrongswan/processing/jobs/job.h new file mode 100644 index 000000000..0f1c16ebe --- /dev/null +++ b/src/libstrongswan/processing/jobs/job.h @@ -0,0 +1,52 @@ +/* + * Copyright (C) 2005-2006 Martin Willi + * Copyright (C) 2005 Jan Hutter + * Hochschule fuer Technik Rapperswil + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +/** + * @defgroup job job + * @{ @ingroup jobs + */ + +#ifndef JOB_H_ +#define JOB_H_ + +typedef struct job_t job_t; + +#include <library.h> + +/** + * Job-Interface as it is stored in the job queue. + */ +struct job_t { + + /** + * Execute a job. + * + * The processing facility executes a job using this method. Jobs are + * one-shot, they destroy themself after execution, so don't use a job + * once it has been executed. + */ + void (*execute) (job_t *this); + + /** + * Destroy a job. + * + * Is only called whenever a job was not executed (e.g. due daemon shutdown). + * After execution, jobs destroy themself. + */ + void (*destroy) (job_t *job); +}; + +#endif /** JOB_H_ @}*/ diff --git a/src/libstrongswan/processing/processor.c b/src/libstrongswan/processing/processor.c new file mode 100644 index 000000000..2a44f61e8 --- /dev/null +++ b/src/libstrongswan/processing/processor.c @@ -0,0 +1,273 @@ +/* + * Copyright (C) 2005-2007 Martin Willi + * Copyright (C) 2005 Jan Hutter + * Hochschule fuer Technik Rapperswil + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +#include <stdlib.h> +#include <string.h> +#include <errno.h> + +#include "processor.h" + +#include <debug.h> +#include <threading/thread.h> +#include <threading/condvar.h> +#include <threading/mutex.h> +#include <utils/linked_list.h> + + +typedef struct private_processor_t private_processor_t; + +/** + * Private data of processor_t class. + */ +struct private_processor_t { + /** + * Public processor_t interface. + */ + processor_t public; + + /** + * Number of running threads + */ + u_int total_threads; + + /** + * Desired number of threads + */ + u_int desired_threads; + + /** + * Number of threads waiting for work + */ + u_int idle_threads; + + /** + * All threads managed in the pool (including threads that have been + * cancelled, this allows to join them during destruction) + */ + linked_list_t *threads; + + /** + * The jobs are stored in a linked list + */ + linked_list_t *list; + + /** + * access to linked_list is locked through this mutex + */ + mutex_t *mutex; + + /** + * Condvar to wait for new jobs + */ + condvar_t *job_added; + + /** + * Condvar to wait for terminated threads + */ + condvar_t *thread_terminated; +}; + +static void process_jobs(private_processor_t *this); + +/** + * restart a terminated thread + */ +static void restart(private_processor_t *this) +{ + thread_t *thread; + + DBG2(DBG_JOB, "terminated worker thread, ID: %u", thread_current_id()); + + /* respawn thread if required */ + this->mutex->lock(this->mutex); + if (this->desired_threads < this->total_threads || + (thread = thread_create((thread_main_t)process_jobs, this)) == NULL) + { + this->total_threads--; + this->thread_terminated->signal(this->thread_terminated); + } + else + { + this->threads->insert_last(this->threads, thread); + } + this->mutex->unlock(this->mutex); +} + +/** + * Process queued jobs, called by the worker threads + */ +static void process_jobs(private_processor_t *this) +{ + /* worker threads are not cancellable by default */ + thread_cancelability(FALSE); + + DBG2(DBG_JOB, "started worker thread, ID: %u", thread_current_id()); + + this->mutex->lock(this->mutex); + while (this->desired_threads >= this->total_threads) + { + job_t *job; + + if (this->list->get_count(this->list) == 0) + { + this->idle_threads++; + this->job_added->wait(this->job_added, this->mutex); + this->idle_threads--; + continue; + } + this->list->remove_first(this->list, (void**)&job); + this->mutex->unlock(this->mutex); + /* terminated threads are restarted, so we have a constant pool */ + thread_cleanup_push((thread_cleanup_t)restart, this); + job->execute(job); + thread_cleanup_pop(FALSE); + this->mutex->lock(this->mutex); + } + this->mutex->unlock(this->mutex); + restart(this); +} + +/** + * Implementation of processor_t.get_total_threads. + */ +static u_int get_total_threads(private_processor_t *this) +{ + u_int count; + this->mutex->lock(this->mutex); + count = this->total_threads; + this->mutex->unlock(this->mutex); + return count; +} + +/** + * Implementation of processor_t.get_idle_threads. + */ +static u_int get_idle_threads(private_processor_t *this) +{ + u_int count; + this->mutex->lock(this->mutex); + count = this->idle_threads; + this->mutex->unlock(this->mutex); + return count; +} + +/** + * implements processor_t.get_job_load + */ +static u_int get_job_load(private_processor_t *this) +{ + u_int load; + this->mutex->lock(this->mutex); + load = this->list->get_count(this->list); + this->mutex->unlock(this->mutex); + return load; +} + +/** + * implements function processor_t.queue_job + */ +static void queue_job(private_processor_t *this, job_t *job) +{ + this->mutex->lock(this->mutex); + this->list->insert_last(this->list, job); + this->job_added->signal(this->job_added); + this->mutex->unlock(this->mutex); +} + +/** + * Implementation of processor_t.set_threads. + */ +static void set_threads(private_processor_t *this, u_int count) +{ + this->mutex->lock(this->mutex); + if (count > this->total_threads) + { /* increase thread count */ + int i; + thread_t *current; + + this->desired_threads = count; + DBG1(DBG_JOB, "spawning %d worker threads", count - this->total_threads); + for (i = this->total_threads; i < count; i++) + { + current = thread_create((thread_main_t)process_jobs, this); + if (current) + { + this->threads->insert_last(this->threads, current); + this->total_threads++; + } + } + } + else if (count < this->total_threads) + { /* decrease thread count */ + this->desired_threads = count; + } + this->job_added->broadcast(this->job_added); + this->mutex->unlock(this->mutex); +} + +/** + * Implementation of processor_t.destroy. + */ +static void destroy(private_processor_t *this) +{ + thread_t *current; + set_threads(this, 0); + this->mutex->lock(this->mutex); + while (this->total_threads > 0) + { + this->job_added->broadcast(this->job_added); + this->thread_terminated->wait(this->thread_terminated, this->mutex); + } + while (this->threads->remove_first(this->threads, + (void**)¤t) == SUCCESS) + { + current->join(current); + } + this->mutex->unlock(this->mutex); + this->thread_terminated->destroy(this->thread_terminated); + this->job_added->destroy(this->job_added); + this->mutex->destroy(this->mutex); + this->list->destroy_offset(this->list, offsetof(job_t, destroy)); + this->threads->destroy(this->threads); + free(this); +} + +/* + * Described in header. + */ +processor_t *processor_create(size_t pool_size) +{ + private_processor_t *this = malloc_thing(private_processor_t); + + this->public.get_total_threads = (u_int(*)(processor_t*))get_total_threads; + this->public.get_idle_threads = (u_int(*)(processor_t*))get_idle_threads; + this->public.get_job_load = (u_int(*)(processor_t*))get_job_load; + this->public.queue_job = (void(*)(processor_t*, job_t*))queue_job; + this->public.set_threads = (void(*)(processor_t*, u_int))set_threads; + this->public.destroy = (void(*)(processor_t*))destroy; + + this->list = linked_list_create(); + this->threads = linked_list_create(); + this->mutex = mutex_create(MUTEX_TYPE_DEFAULT); + this->job_added = condvar_create(CONDVAR_TYPE_DEFAULT); + this->thread_terminated = condvar_create(CONDVAR_TYPE_DEFAULT); + this->total_threads = 0; + this->desired_threads = 0; + this->idle_threads = 0; + + return &this->public; +} + diff --git a/src/libstrongswan/processing/processor.h b/src/libstrongswan/processing/processor.h new file mode 100644 index 000000000..bebbe3a15 --- /dev/null +++ b/src/libstrongswan/processing/processor.h @@ -0,0 +1,94 @@ +/* + * Copyright (C) 2005-2007 Martin Willi + * Copyright (C) 2005 Jan Hutter + * Hochschule fuer Technik Rapperswil + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +/** + * @defgroup processor processor + * @{ @ingroup processing + */ + +#ifndef PROCESSOR_H_ +#define PROCESSOR_H_ + +typedef struct processor_t processor_t; + +#include <stdlib.h> + +#include <library.h> +#include <processing/jobs/job.h> + +/** + * The processor uses threads to process queued jobs. + */ +struct processor_t { + + /** + * Get the total number of threads used by the processor. + * + * @return size of thread pool + */ + u_int (*get_total_threads) (processor_t *this); + + /** + * Get the number of threads currently waiting. + * + * @return number of idle threads + */ + u_int (*get_idle_threads) (processor_t *this); + + /** + * Get the number of queued jobs. + * + * @return number of items in queue + */ + u_int (*get_job_load) (processor_t *this); + + /** + * Adds a job to the queue. + * + * This function is non blocking and adds a job_t to the queue. + * + * @param job job to add to the queue + */ + void (*queue_job) (processor_t *this, job_t *job); + + /** + * Set the number of threads to use in the processor. + * + * If the number of threads is smaller than number of currently running + * threads, thread count is decreased. Use 0 to disable the processor. + * This call blocks if it decreases thread count until threads have + * terminated, so make sure there are not too many blocking jobs. + * + * @param count number of threads to allocate + */ + void (*set_threads)(processor_t *this, u_int count); + + /** + * Destroy a processor object. + */ + void (*destroy) (processor_t *processor); +}; + +/** + * Create the thread pool without any threads. + * + * Use the set_threads method to start processing jobs. + * + * @return processor_t object + */ +processor_t *processor_create(); + +#endif /** PROCESSOR_H_ @}*/ diff --git a/src/libstrongswan/processing/scheduler.c b/src/libstrongswan/processing/scheduler.c new file mode 100644 index 000000000..e23f04598 --- /dev/null +++ b/src/libstrongswan/processing/scheduler.c @@ -0,0 +1,358 @@ +/* + * Copyright (C) 2008 Tobias Brunner + * Copyright (C) 2005-2006 Martin Willi + * Copyright (C) 2005 Jan Hutter + * Hochschule fuer Technik Rapperswil + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +#include <stdlib.h> + +#include "scheduler.h" + +#include <debug.h> +#include <processing/processor.h> +#include <processing/jobs/callback_job.h> +#include <threading/thread.h> +#include <threading/condvar.h> +#include <threading/mutex.h> + +/* the initial size of the heap */ +#define HEAP_SIZE_DEFAULT 64 + +typedef struct event_t event_t; + +/** + * Event containing a job and a schedule time + */ +struct event_t { + /** + * Time to fire the event. + */ + timeval_t time; + + /** + * Every event has its assigned job. + */ + job_t *job; +}; + +/** + * destroy an event and its job + */ +static void event_destroy(event_t *event) +{ + event->job->destroy(event->job); + free(event); +} + +typedef struct private_scheduler_t private_scheduler_t; + +/** + * Private data of a scheduler_t object. + */ +struct private_scheduler_t { + + /** + * Public part of a scheduler_t object. + */ + scheduler_t public; + + /** + * Job which queues scheduled jobs to the processor. + */ + callback_job_t *job; + + /** + * The heap in which the events are stored. + */ + event_t **heap; + + /** + * The size of the heap. + */ + u_int heap_size; + + /** + * The number of scheduled events. + */ + u_int event_count; + + /** + * Exclusive access to list + */ + mutex_t *mutex; + + /** + * Condvar to wait for next job. + */ + condvar_t *condvar; +}; + +/** + * Comparse two timevals, return >0 if a > b, <0 if a < b and =0 if equal + */ +static int timeval_cmp(timeval_t *a, timeval_t *b) +{ + if (a->tv_sec > b->tv_sec) + { + return 1; + } + if (a->tv_sec < b->tv_sec) + { + return -1; + } + if (a->tv_usec > b->tv_usec) + { + return 1; + } + if (a->tv_usec < b->tv_usec) + { + return -1; + } + return 0; +} + +/** + * Returns the top event without removing it. Returns NULL if the heap is empty. + */ +static event_t *peek_event(private_scheduler_t *this) +{ + return this->event_count > 0 ? this->heap[1] : NULL; +} + +/** + * Removes the top event from the heap and returns it. Returns NULL if the heap + * is empty. + */ +static event_t *remove_event(private_scheduler_t *this) +{ + event_t *event, *top; + if (!this->event_count) + { + return NULL; + } + + /* store the value to return */ + event = this->heap[1]; + /* move the bottom event to the top */ + top = this->heap[1] = this->heap[this->event_count]; + + if (--this->event_count > 1) + { + /* seep down the top event */ + u_int position = 1; + while ((position << 1) <= this->event_count) + { + u_int child = position << 1; + + if ((child + 1) <= this->event_count && + timeval_cmp(&this->heap[child + 1]->time, + &this->heap[child]->time) < 0) + { + /* the "right" child is smaller */ + child++; + } + + if (timeval_cmp(&top->time, &this->heap[child]->time) <= 0) + { + /* the top event fires before the smaller of the two children, + * stop */ + break; + } + + /* swap with the smaller child */ + this->heap[position] = this->heap[child]; + position = child; + } + this->heap[position] = top; + } + return event; +} + +/** + * Get events from the queue and pass it to the processor + */ +static job_requeue_t schedule(private_scheduler_t * this) +{ + timeval_t now; + event_t *event; + bool timed = FALSE, oldstate; + + this->mutex->lock(this->mutex); + + time_monotonic(&now); + + if ((event = peek_event(this)) != NULL) + { + if (timeval_cmp(&now, &event->time) >= 0) + { + remove_event(this); + this->mutex->unlock(this->mutex); + DBG2(DBG_JOB, "got event, queuing job for execution"); + lib->processor->queue_job(lib->processor, event->job); + free(event); + return JOB_REQUEUE_DIRECT; + } + timersub(&event->time, &now, &now); + if (now.tv_sec) + { + DBG2(DBG_JOB, "next event in %ds %dms, waiting", + now.tv_sec, now.tv_usec/1000); + } + else + { + DBG2(DBG_JOB, "next event in %dms, waiting", now.tv_usec/1000); + } + timed = TRUE; + } + thread_cleanup_push((thread_cleanup_t)this->mutex->unlock, this->mutex); + oldstate = thread_cancelability(TRUE); + + if (timed) + { + this->condvar->timed_wait_abs(this->condvar, this->mutex, event->time); + } + else + { + DBG2(DBG_JOB, "no events, waiting"); + this->condvar->wait(this->condvar, this->mutex); + } + thread_cancelability(oldstate); + thread_cleanup_pop(TRUE); + return JOB_REQUEUE_DIRECT; +} + +/** + * Implements scheduler_t.get_job_load + */ +static u_int get_job_load(private_scheduler_t *this) +{ + int count; + this->mutex->lock(this->mutex); + count = this->event_count; + this->mutex->unlock(this->mutex); + return count; +} + +/** + * Implements scheduler_t.schedule_job_tv. + */ +static void schedule_job_tv(private_scheduler_t *this, job_t *job, timeval_t tv) +{ + event_t *event; + u_int position; + + event = malloc_thing(event_t); + event->job = job; + event->time = tv; + + this->mutex->lock(this->mutex); + + this->event_count++; + if (this->event_count > this->heap_size) + { + /* double the size of the heap */ + this->heap_size <<= 1; + this->heap = (event_t**)realloc(this->heap, + (this->heap_size + 1) * sizeof(event_t*)); + } + /* "put" the event to the bottom */ + position = this->event_count; + + /* then bubble it up */ + while (position > 1 && timeval_cmp(&this->heap[position >> 1]->time, + &event->time) > 0) + { + /* parent has to be fired after the new event, move up */ + this->heap[position] = this->heap[position >> 1]; + position >>= 1; + } + this->heap[position] = event; + + this->condvar->signal(this->condvar); + this->mutex->unlock(this->mutex); +} + +/** + * Implements scheduler_t.schedule_job. + */ +static void schedule_job(private_scheduler_t *this, job_t *job, u_int32_t s) +{ + timeval_t tv; + + time_monotonic(&tv); + tv.tv_sec += s; + + schedule_job_tv(this, job, tv); +} + +/** + * Implements scheduler_t.schedule_job_ms. + */ +static void schedule_job_ms(private_scheduler_t *this, job_t *job, u_int32_t ms) +{ + timeval_t tv, add; + + time_monotonic(&tv); + add.tv_sec = ms / 1000; + add.tv_usec = (ms % 1000) * 1000; + + timeradd(&tv, &add, &tv); + + schedule_job_tv(this, job, tv); +} + +/** + * Implementation of scheduler_t.destroy. + */ +static void destroy(private_scheduler_t *this) +{ + event_t *event; + this->job->cancel(this->job); + this->condvar->destroy(this->condvar); + this->mutex->destroy(this->mutex); + while ((event = remove_event(this)) != NULL) + { + event_destroy(event); + } + free(this->heap); + free(this); +} + +/* + * Described in header. + */ +scheduler_t * scheduler_create() +{ + private_scheduler_t *this = malloc_thing(private_scheduler_t); + + this->public.get_job_load = (u_int (*) (scheduler_t *this)) get_job_load; + this->public.schedule_job = (void (*) (scheduler_t *this, job_t *job, u_int32_t s)) schedule_job; + this->public.schedule_job_ms = (void (*) (scheduler_t *this, job_t *job, u_int32_t ms)) schedule_job_ms; + this->public.schedule_job_tv = (void (*) (scheduler_t *this, job_t *job, timeval_t tv)) schedule_job_tv; + this->public.destroy = (void(*)(scheduler_t*)) destroy; + + /* Note: the root of the heap is at index 1 */ + this->event_count = 0; + this->heap_size = HEAP_SIZE_DEFAULT; + this->heap = (event_t**)calloc(this->heap_size + 1, sizeof(event_t*)); + + this->mutex = mutex_create(MUTEX_TYPE_DEFAULT); + this->condvar = condvar_create(CONDVAR_TYPE_DEFAULT); + + this->job = callback_job_create((callback_job_cb_t)schedule, this, NULL, NULL); + lib->processor->queue_job(lib->processor, (job_t*)this->job); + + return &this->public; +} + diff --git a/src/libstrongswan/processing/scheduler.h b/src/libstrongswan/processing/scheduler.h new file mode 100644 index 000000000..f2c72550f --- /dev/null +++ b/src/libstrongswan/processing/scheduler.h @@ -0,0 +1,130 @@ +/* + * Copyright (C) 2009 Tobias Brunner + * Copyright (C) 2005-2007 Martin Willi + * Copyright (C) 2005 Jan Hutter + * Hochschule fuer Technik Rapperswil + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +/** + * @defgroup scheduler scheduler + * @{ @ingroup processing + */ + +#ifndef SCHEDULER_H_ +#define SCHEDULER_H_ + +typedef struct scheduler_t scheduler_t; + +#include <library.h> +#include <processing/jobs/job.h> + +/** + * The scheduler queues timed events which are then passed to the processor. + * + * The scheduler is implemented as a heap. A heap is a special kind of tree- + * based data structure that satisfies the following property: if B is a child + * node of A, then key(A) >= (or <=) key(B). So either the element with the + * greatest (max-heap) or the smallest (min-heap) key is the root of the heap. + * We use a min-heap whith the key being the absolute unix time at which an + * event is scheduled. So the root is always the event that will fire next. + * + * An earlier implementation of the scheduler used a sorted linked list to store + * the events. That had the advantage that removing the next event was extremely + * fast, also, adding an event scheduled before or after all other events was + * equally fast (all in O(1)). The problem was, though, that adding an event + * in-between got slower, as the number of events grew larger (O(n)). + * For each connection there could be several events: IKE-rekey, NAT-keepalive, + * retransmissions, expire (half-open), and others. So a gateway that probably + * has to handle thousands of concurrent connnections has to be able to queue a + * large number of events as fast as possible. Locking makes this even worse, to + * provide thread-safety, no events can be processed, while an event is queued, + * so making the insertion fast is even more important. + * + * That's the advantage of the heap. Adding an element to the heap can be + * achieved in O(log n) - on the other hand, removing the root node also + * requires O(log n) operations. Consider 10000 queued events. Inserting a new + * event in the list implementation required up to 10000 comparisons. In the + * heap implementation, the worst case is about 13.3 comparisons. That's a + * drastic improvement. + * + * The implementation itself uses a binary tree mapped to a one-based array to + * store the elements. This reduces storage overhead and simplifies navigation: + * the children of the node at position n are at position 2n and 2n+1 (likewise + * the parent node of the node at position n is at position [n/2]). Thus, + * navigating up and down the tree is reduced to simple index computations. + * + * Adding an element to the heap works as follows: The heap is always filled + * from left to right, until a row is full, then the next row is filled. Mapped + * to an array this gets as simple as putting the new element to the first free + * position. In a one-based array that position equals the number of elements + * currently stored in the heap. Then the heap property has to be restored, i.e. + * the new element has to be "bubbled up" the tree until the parent node's key + * is smaller or the element got the new root of the tree. + * + * Removing the next event from the heap works similarly. The event itself is + * the root node and stored at position 1 of the array. After removing it, the + * root has to be replaced and the heap property has to be restored. This is + * done by moving the bottom element (last row, rightmost element) to the root + * and then "seep it down" by swapping it with child nodes until none of the + * children has a smaller key or it is again a leaf node. + */ +struct scheduler_t { + + /** + * Adds a event to the queue, using a relative time offset in s. + * + * @param job job to schedule + * @param time relative time to schedule job, in s + */ + void (*schedule_job) (scheduler_t *this, job_t *job, u_int32_t s); + + /** + * Adds a event to the queue, using a relative time offset in ms. + * + * @param job job to schedule + * @param time relative time to schedule job, in ms + */ + void (*schedule_job_ms) (scheduler_t *this, job_t *job, u_int32_t ms); + + /** + * Adds a event to the queue, using an absolut time. + * + * The passed timeval should be calculated based on the time_monotonic() + * function. + * + * @param job job to schedule + * @param time absolut time to schedule job + */ + void (*schedule_job_tv) (scheduler_t *this, job_t *job, timeval_t tv); + + /** + * Returns number of jobs scheduled. + * + * @return number of scheduled jobs + */ + u_int (*get_job_load) (scheduler_t *this); + + /** + * Destroys a scheduler object. + */ + void (*destroy) (scheduler_t *this); +}; + +/** + * Create a scheduler. + * + * @return scheduler_t object + */ +scheduler_t *scheduler_create(void); + +#endif /** SCHEDULER_H_ @}*/ |