diff options
Diffstat (limited to 'src/charon/processing/scheduler.c')
-rw-r--r-- | src/charon/processing/scheduler.c | 214 |
1 files changed, 186 insertions, 28 deletions
diff --git a/src/charon/processing/scheduler.c b/src/charon/processing/scheduler.c index 7249e43e6..2706585b0 100644 --- a/src/charon/processing/scheduler.c +++ b/src/charon/processing/scheduler.c @@ -23,12 +23,39 @@ #include <stdlib.h> #include <pthread.h> +#include <sys/time.h> #include "scheduler.h" #include <daemon.h> -#include <processing/job_queue.h> +#include <processing/processor.h> +#include <processing/jobs/callback_job.h> +typedef struct event_t event_t; + +/** + * Event containing a job and a schedule time + */ +struct event_t { + /** + * Time to fire the event. + */ + timeval_t time; + + /** + * Every event has its assigned job. + */ + job_t *job; +}; + +/** + * destroy an event and its job + */ +static void event_destroy(event_t *event) +{ + event->job->destroy(event->job); + free(event); +} typedef struct private_scheduler_t private_scheduler_t; @@ -42,36 +69,164 @@ struct private_scheduler_t { scheduler_t public; /** - * Assigned thread. + * Job wich schedules */ - pthread_t assigned_thread; + callback_job_t *job; + + /** + * The jobs are scheduled in a list. + */ + linked_list_t *list; + + /** + * Exclusive access to list + */ + pthread_mutex_t mutex; + + /** + * Condvar to wait for next job. + */ + pthread_cond_t condvar; }; /** - * Implementation of private_scheduler_t.get_events. + * Returns the difference of two timeval structs in milliseconds + */ +static long time_difference(timeval_t *end, timeval_t *start) +{ + time_t s; + suseconds_t us; + + s = end->tv_sec - start->tv_sec; + us = end->tv_usec - start->tv_usec; + return (s * 1000 + us/1000); +} + +/** + * Get events from the queue and pass it to the processor */ -static void get_events(private_scheduler_t * this) +static job_requeue_t schedule(private_scheduler_t * this) { - job_t *current_job; + timespec_t timeout; + timeval_t now; + event_t *event; + long difference; + int oldstate; + bool timed = FALSE; - /* cancellation disabled by default */ - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); + DBG2(DBG_JOB, "waiting for next event..."); + pthread_mutex_lock(&this->mutex); - DBG1(DBG_JOB, "scheduler thread running, thread_ID: %06u", - (int)pthread_self()); + gettimeofday(&now, NULL); + + if (this->list->get_count(this->list) > 0) + { + this->list->get_first(this->list, (void **)&event); + difference = time_difference(&now, &event->time); + if (difference > 0) + { + DBG2(DBG_JOB, "got event, queueing job for execution"); + this->list->remove_first(this->list, (void **)&event); + pthread_mutex_unlock(&this->mutex); + charon->processor->queue_job(charon->processor, event->job); + free(event); + return JOB_REQUEUE_DIRECT; + } + timeout.tv_sec = event->time.tv_sec; + timeout.tv_nsec = event->time.tv_usec * 1000; + timed = TRUE; + } + pthread_cleanup_push((void*)pthread_mutex_unlock, &this->mutex); + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate); + + if (timed) + { + pthread_cond_timedwait(&this->condvar, &this->mutex, &timeout); + } + else + { + pthread_cond_wait(&this->condvar, &this->mutex); + } + pthread_setcancelstate(oldstate, NULL); + pthread_cleanup_pop(0); + + pthread_mutex_unlock(&this->mutex); + return JOB_REQUEUE_DIRECT; +} - charon->drop_capabilities(charon, TRUE); +/** + * Implements scheduler_t.get_job_load + */ +static u_int get_job_load(private_scheduler_t *this) +{ + int count; + pthread_mutex_lock(&this->mutex); + count = this->list->get_count(this->list); + pthread_mutex_unlock(&this->mutex); + return count; +} - while (TRUE) +/** + * Implements scheduler_t.schedule_job. + */ +static void schedule_job(private_scheduler_t *this, job_t *job, u_int32_t time) +{ + timeval_t now; + event_t *event, *current; + iterator_t *iterator; + time_t s; + suseconds_t us; + + event = malloc_thing(event_t); + event->job = job; + + /* calculate absolute time */ + s = time / 1000; + us = (time - s * 1000) * 1000; + gettimeofday(&now, NULL); + event->time.tv_usec = (now.tv_usec + us) % 1000000; + event->time.tv_sec = now.tv_sec + (now.tv_usec + us)/1000000 + s; + + pthread_mutex_lock(&this->mutex); + while(TRUE) { - DBG2(DBG_JOB, "waiting for next event..."); - /* get a job, this block until one is available */ - current_job = charon->event_queue->get(charon->event_queue); - /* queue the job in the job queue, workers will eat them */ - DBG2(DBG_JOB, "got event, adding job %N to job-queue", - job_type_names, current_job->get_type(current_job)); - charon->job_queue->add(charon->job_queue, current_job); + if (this->list->get_count(this->list) == 0) + { + this->list->insert_first(this->list,event); + break; + } + + this->list->get_last(this->list, (void**)¤t); + if (time_difference(&event->time, ¤t->time) >= 0) + { /* new event has to be fired after the last event in list */ + this->list->insert_last(this->list, event); + break; + } + + this->list->get_first(this->list, (void**)¤t); + if (time_difference(&event->time, ¤t->time) < 0) + { /* new event has to be fired before the first event in list */ + this->list->insert_first(this->list, event); + break; + } + + iterator = this->list->create_iterator(this->list, TRUE); + /* first element has not to be checked (already done) */ + iterator->iterate(iterator, (void**)¤t); + while(iterator->iterate(iterator, (void**)¤t)) + { + if (time_difference(&event->time, ¤t->time) <= 0) + { + /* new event has to be fired before the current event in list */ + iterator->insert_before(iterator, event); + break; + } + } + iterator->destroy(iterator); + break; } + pthread_cond_signal(&this->condvar); + pthread_mutex_unlock(&this->mutex); } /** @@ -79,8 +234,8 @@ static void get_events(private_scheduler_t * this) */ static void destroy(private_scheduler_t *this) { - pthread_cancel(this->assigned_thread); - pthread_join(this->assigned_thread, NULL); + this->job->cancel(this->job); + this->list->destroy_function(this->list, (void*)event_destroy); free(this); } @@ -91,14 +246,17 @@ scheduler_t * scheduler_create() { private_scheduler_t *this = malloc_thing(private_scheduler_t); + this->public.get_job_load = (u_int (*) (scheduler_t *this)) get_job_load; + this->public.schedule_job = (void (*) (scheduler_t *this, job_t *job, u_int32_t ms)) schedule_job; this->public.destroy = (void(*)(scheduler_t*)) destroy; - if (pthread_create(&(this->assigned_thread), NULL, (void*(*)(void*))get_events, this) != 0) - { - /* thread could not be created */ - free(this); - charon->kill(charon, "unable to create scheduler thread"); - } + this->list = linked_list_create(); + pthread_mutex_init(&this->mutex, NULL); + pthread_cond_init(&this->condvar, NULL); + + this->job = callback_job_create((callback_job_cb_t)schedule, this, NULL, NULL); + charon->processor->queue_job(charon->processor, (job_t*)this->job); - return &(this->public); + return &this->public; } + |