summaryrefslogtreecommitdiff
path: root/src/charon/processing/scheduler.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/charon/processing/scheduler.c')
-rw-r--r--src/charon/processing/scheduler.c214
1 files changed, 186 insertions, 28 deletions
diff --git a/src/charon/processing/scheduler.c b/src/charon/processing/scheduler.c
index 7249e43e6..2706585b0 100644
--- a/src/charon/processing/scheduler.c
+++ b/src/charon/processing/scheduler.c
@@ -23,12 +23,39 @@
#include <stdlib.h>
#include <pthread.h>
+#include <sys/time.h>
#include "scheduler.h"
#include <daemon.h>
-#include <processing/job_queue.h>
+#include <processing/processor.h>
+#include <processing/jobs/callback_job.h>
+typedef struct event_t event_t;
+
+/**
+ * Event containing a job and a schedule time
+ */
+struct event_t {
+ /**
+ * Time to fire the event.
+ */
+ timeval_t time;
+
+ /**
+ * Every event has its assigned job.
+ */
+ job_t *job;
+};
+
+/**
+ * destroy an event and its job
+ */
+static void event_destroy(event_t *event)
+{
+ event->job->destroy(event->job);
+ free(event);
+}
typedef struct private_scheduler_t private_scheduler_t;
@@ -42,36 +69,164 @@ struct private_scheduler_t {
scheduler_t public;
/**
- * Assigned thread.
+ * Job wich schedules
*/
- pthread_t assigned_thread;
+ callback_job_t *job;
+
+ /**
+ * The jobs are scheduled in a list.
+ */
+ linked_list_t *list;
+
+ /**
+ * Exclusive access to list
+ */
+ pthread_mutex_t mutex;
+
+ /**
+ * Condvar to wait for next job.
+ */
+ pthread_cond_t condvar;
};
/**
- * Implementation of private_scheduler_t.get_events.
+ * Returns the difference of two timeval structs in milliseconds
+ */
+static long time_difference(timeval_t *end, timeval_t *start)
+{
+ time_t s;
+ suseconds_t us;
+
+ s = end->tv_sec - start->tv_sec;
+ us = end->tv_usec - start->tv_usec;
+ return (s * 1000 + us/1000);
+}
+
+/**
+ * Get events from the queue and pass it to the processor
*/
-static void get_events(private_scheduler_t * this)
+static job_requeue_t schedule(private_scheduler_t * this)
{
- job_t *current_job;
+ timespec_t timeout;
+ timeval_t now;
+ event_t *event;
+ long difference;
+ int oldstate;
+ bool timed = FALSE;
- /* cancellation disabled by default */
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+ DBG2(DBG_JOB, "waiting for next event...");
+ pthread_mutex_lock(&this->mutex);
- DBG1(DBG_JOB, "scheduler thread running, thread_ID: %06u",
- (int)pthread_self());
+ gettimeofday(&now, NULL);
+
+ if (this->list->get_count(this->list) > 0)
+ {
+ this->list->get_first(this->list, (void **)&event);
+ difference = time_difference(&now, &event->time);
+ if (difference > 0)
+ {
+ DBG2(DBG_JOB, "got event, queueing job for execution");
+ this->list->remove_first(this->list, (void **)&event);
+ pthread_mutex_unlock(&this->mutex);
+ charon->processor->queue_job(charon->processor, event->job);
+ free(event);
+ return JOB_REQUEUE_DIRECT;
+ }
+ timeout.tv_sec = event->time.tv_sec;
+ timeout.tv_nsec = event->time.tv_usec * 1000;
+ timed = TRUE;
+ }
+ pthread_cleanup_push((void*)pthread_mutex_unlock, &this->mutex);
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
+
+ if (timed)
+ {
+ pthread_cond_timedwait(&this->condvar, &this->mutex, &timeout);
+ }
+ else
+ {
+ pthread_cond_wait(&this->condvar, &this->mutex);
+ }
+ pthread_setcancelstate(oldstate, NULL);
+ pthread_cleanup_pop(0);
+
+ pthread_mutex_unlock(&this->mutex);
+ return JOB_REQUEUE_DIRECT;
+}
- charon->drop_capabilities(charon, TRUE);
+/**
+ * Implements scheduler_t.get_job_load
+ */
+static u_int get_job_load(private_scheduler_t *this)
+{
+ int count;
+ pthread_mutex_lock(&this->mutex);
+ count = this->list->get_count(this->list);
+ pthread_mutex_unlock(&this->mutex);
+ return count;
+}
- while (TRUE)
+/**
+ * Implements scheduler_t.schedule_job.
+ */
+static void schedule_job(private_scheduler_t *this, job_t *job, u_int32_t time)
+{
+ timeval_t now;
+ event_t *event, *current;
+ iterator_t *iterator;
+ time_t s;
+ suseconds_t us;
+
+ event = malloc_thing(event_t);
+ event->job = job;
+
+ /* calculate absolute time */
+ s = time / 1000;
+ us = (time - s * 1000) * 1000;
+ gettimeofday(&now, NULL);
+ event->time.tv_usec = (now.tv_usec + us) % 1000000;
+ event->time.tv_sec = now.tv_sec + (now.tv_usec + us)/1000000 + s;
+
+ pthread_mutex_lock(&this->mutex);
+ while(TRUE)
{
- DBG2(DBG_JOB, "waiting for next event...");
- /* get a job, this block until one is available */
- current_job = charon->event_queue->get(charon->event_queue);
- /* queue the job in the job queue, workers will eat them */
- DBG2(DBG_JOB, "got event, adding job %N to job-queue",
- job_type_names, current_job->get_type(current_job));
- charon->job_queue->add(charon->job_queue, current_job);
+ if (this->list->get_count(this->list) == 0)
+ {
+ this->list->insert_first(this->list,event);
+ break;
+ }
+
+ this->list->get_last(this->list, (void**)&current);
+ if (time_difference(&event->time, &current->time) >= 0)
+ { /* new event has to be fired after the last event in list */
+ this->list->insert_last(this->list, event);
+ break;
+ }
+
+ this->list->get_first(this->list, (void**)&current);
+ if (time_difference(&event->time, &current->time) < 0)
+ { /* new event has to be fired before the first event in list */
+ this->list->insert_first(this->list, event);
+ break;
+ }
+
+ iterator = this->list->create_iterator(this->list, TRUE);
+ /* first element has not to be checked (already done) */
+ iterator->iterate(iterator, (void**)&current);
+ while(iterator->iterate(iterator, (void**)&current))
+ {
+ if (time_difference(&event->time, &current->time) <= 0)
+ {
+ /* new event has to be fired before the current event in list */
+ iterator->insert_before(iterator, event);
+ break;
+ }
+ }
+ iterator->destroy(iterator);
+ break;
}
+ pthread_cond_signal(&this->condvar);
+ pthread_mutex_unlock(&this->mutex);
}
/**
@@ -79,8 +234,8 @@ static void get_events(private_scheduler_t * this)
*/
static void destroy(private_scheduler_t *this)
{
- pthread_cancel(this->assigned_thread);
- pthread_join(this->assigned_thread, NULL);
+ this->job->cancel(this->job);
+ this->list->destroy_function(this->list, (void*)event_destroy);
free(this);
}
@@ -91,14 +246,17 @@ scheduler_t * scheduler_create()
{
private_scheduler_t *this = malloc_thing(private_scheduler_t);
+ this->public.get_job_load = (u_int (*) (scheduler_t *this)) get_job_load;
+ this->public.schedule_job = (void (*) (scheduler_t *this, job_t *job, u_int32_t ms)) schedule_job;
this->public.destroy = (void(*)(scheduler_t*)) destroy;
- if (pthread_create(&(this->assigned_thread), NULL, (void*(*)(void*))get_events, this) != 0)
- {
- /* thread could not be created */
- free(this);
- charon->kill(charon, "unable to create scheduler thread");
- }
+ this->list = linked_list_create();
+ pthread_mutex_init(&this->mutex, NULL);
+ pthread_cond_init(&this->condvar, NULL);
+
+ this->job = callback_job_create((callback_job_cb_t)schedule, this, NULL, NULL);
+ charon->processor->queue_job(charon->processor, (job_t*)this->job);
- return &(this->public);
+ return &this->public;
}
+