summaryrefslogtreecommitdiff
path: root/src/libstrongswan/processing
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstrongswan/processing')
-rw-r--r--src/libstrongswan/processing/processor.c251
-rw-r--r--src/libstrongswan/processing/processor.h10
-rw-r--r--src/libstrongswan/processing/watcher.c462
-rw-r--r--src/libstrongswan/processing/watcher.h101
4 files changed, 734 insertions, 90 deletions
diff --git a/src/libstrongswan/processing/processor.c b/src/libstrongswan/processing/processor.c
index 934636fc0..adbd95685 100644
--- a/src/libstrongswan/processing/processor.c
+++ b/src/libstrongswan/processing/processor.c
@@ -1,7 +1,7 @@
/*
* Copyright (C) 2005-2011 Martin Willi
* Copyright (C) 2011 revosec AG
- * Copyright (C) 2008-2012 Tobias Brunner
+ * Copyright (C) 2008-2013 Tobias Brunner
* Copyright (C) 2005 Jan Hutter
* Hochschule fuer Technik Rapperswil
*
@@ -123,6 +123,7 @@ static void process_jobs(worker_thread_t *worker);
static void restart(worker_thread_t *worker)
{
private_processor_t *this = worker->processor;
+ job_t *job;
DBG2(DBG_JOB, "terminated worker thread %.2u", thread_current_id());
@@ -130,8 +131,15 @@ static void restart(worker_thread_t *worker)
/* cleanup worker thread */
this->working_threads[worker->priority]--;
worker->job->status = JOB_STATUS_CANCELED;
- worker->job->destroy(worker->job);
+ job = worker->job;
+ /* unset the job before releasing the mutex, otherwise cancel() might
+ * interfere */
worker->job = NULL;
+ /* release mutex to avoid deadlocks if the same lock is required
+ * during queue_job() and in the destructor called here */
+ this->mutex->unlock(this->mutex);
+ job->destroy(job);
+ this->mutex->lock(this->mutex);
/* respawn thread if required */
if (this->desired_threads >= this->total_threads)
@@ -172,112 +180,150 @@ static u_int get_idle_threads_nolock(private_processor_t *this)
}
/**
- * Process queued jobs, called by the worker threads
+ * Get a job from any job queue, starting with the highest priority.
+ *
+ * this->mutex is expected to be locked.
*/
-static void process_jobs(worker_thread_t *worker)
+static bool get_job(private_processor_t *this, worker_thread_t *worker)
{
- private_processor_t *this = worker->processor;
+ int i, reserved = 0, idle;
- /* worker threads are not cancelable by default */
- thread_cancelability(FALSE);
-
- DBG2(DBG_JOB, "started worker thread %.2u", thread_current_id());
+ idle = get_idle_threads_nolock(this);
- this->mutex->lock(this->mutex);
- while (this->desired_threads >= this->total_threads)
+ for (i = 0; i < JOB_PRIO_MAX; i++)
{
- int i, reserved = 0, idle;
+ if (reserved && reserved >= idle)
+ {
+ DBG2(DBG_JOB, "delaying %N priority jobs: %d threads idle, "
+ "but %d reserved for higher priorities",
+ job_priority_names, i, idle, reserved);
+ /* wait until a job of higher priority gets queued */
+ return FALSE;
+ }
+ if (this->working_threads[i] < this->prio_threads[i])
+ {
+ reserved += this->prio_threads[i] - this->working_threads[i];
+ }
+ if (this->jobs[i]->remove_first(this->jobs[i],
+ (void**)&worker->job) == SUCCESS)
+ {
+ worker->priority = i;
+ return TRUE;
+ }
+ }
+ return FALSE;
+}
- idle = get_idle_threads_nolock(this);
+/**
+ * Process a single job (provided in worker->job, worker->priority is also
+ * expected to be set)
+ *
+ * this->mutex is expected to be locked.
+ */
+static void process_job(private_processor_t *this, worker_thread_t *worker)
+{
+ job_t *to_destroy = NULL;
+ job_requeue_t requeue;
- for (i = 0; i < JOB_PRIO_MAX; i++)
+ this->working_threads[worker->priority]++;
+ worker->job->status = JOB_STATUS_EXECUTING;
+ this->mutex->unlock(this->mutex);
+ /* canceled threads are restarted to get a constant pool */
+ thread_cleanup_push((thread_cleanup_t)restart, worker);
+ while (TRUE)
+ {
+ requeue = worker->job->execute(worker->job);
+ if (requeue.type != JOB_REQUEUE_TYPE_DIRECT)
{
- if (reserved && reserved >= idle)
- {
- DBG2(DBG_JOB, "delaying %N priority jobs: %d threads idle, "
- "but %d reserved for higher priorities",
- job_priority_names, i, idle, reserved);
+ break;
+ }
+ else if (!worker->job->cancel)
+ { /* only allow cancelable jobs to requeue directly */
+ requeue.type = JOB_REQUEUE_TYPE_FAIR;
+ break;
+ }
+ }
+ thread_cleanup_pop(FALSE);
+ this->mutex->lock(this->mutex);
+ this->working_threads[worker->priority]--;
+ if (worker->job->status == JOB_STATUS_CANCELED)
+ { /* job was canceled via a custom cancel() method or did not
+ * use JOB_REQUEUE_TYPE_DIRECT */
+ to_destroy = worker->job;
+ }
+ else
+ {
+ switch (requeue.type)
+ {
+ case JOB_REQUEUE_TYPE_NONE:
+ worker->job->status = JOB_STATUS_DONE;
+ to_destroy = worker->job;
break;
- }
- if (this->working_threads[i] < this->prio_threads[i])
- {
- reserved += this->prio_threads[i] - this->working_threads[i];
- }
- if (this->jobs[i]->remove_first(this->jobs[i],
- (void**)&worker->job) == SUCCESS)
- {
- job_requeue_t requeue;
-
- this->working_threads[i]++;
- worker->job->status = JOB_STATUS_EXECUTING;
- worker->priority = i;
- this->mutex->unlock(this->mutex);
- /* canceled threads are restarted to get a constant pool */
- thread_cleanup_push((thread_cleanup_t)restart, worker);
- while (TRUE)
- {
- requeue = worker->job->execute(worker->job);
- if (requeue.type != JOB_REQUEUE_TYPE_DIRECT)
- {
- break;
- }
- else if (!worker->job->cancel)
- { /* only allow cancelable jobs to requeue directly */
- requeue.type = JOB_REQUEUE_TYPE_FAIR;
- break;
- }
- }
- thread_cleanup_pop(FALSE);
- this->mutex->lock(this->mutex);
- this->working_threads[i]--;
- if (worker->job->status == JOB_STATUS_CANCELED)
- { /* job was canceled via a custom cancel() method or did not
- * use JOB_REQUEUE_TYPE_DIRECT */
- worker->job->destroy(worker->job);
- break;
- }
- switch (requeue.type)
+ case JOB_REQUEUE_TYPE_FAIR:
+ worker->job->status = JOB_STATUS_QUEUED;
+ this->jobs[worker->priority]->insert_last(
+ this->jobs[worker->priority], worker->job);
+ this->job_added->signal(this->job_added);
+ break;
+ case JOB_REQUEUE_TYPE_SCHEDULE:
+ /* scheduler_t does not hold its lock when queuing jobs
+ * so this should be safe without unlocking our mutex */
+ switch (requeue.schedule)
{
- case JOB_REQUEUE_TYPE_NONE:
- worker->job->status = JOB_STATUS_DONE;
- worker->job->destroy(worker->job);
+ case JOB_SCHEDULE:
+ lib->scheduler->schedule_job(lib->scheduler,
+ worker->job, requeue.time.rel);
break;
- case JOB_REQUEUE_TYPE_FAIR:
- worker->job->status = JOB_STATUS_QUEUED;
- this->jobs[i]->insert_last(this->jobs[i],
- worker->job);
- this->job_added->signal(this->job_added);
+ case JOB_SCHEDULE_MS:
+ lib->scheduler->schedule_job_ms(lib->scheduler,
+ worker->job, requeue.time.rel);
break;
- case JOB_REQUEUE_TYPE_SCHEDULE:
- /* scheduler_t does not hold its lock when queeuing jobs
- * so this should be safe without unlocking our mutex */
- switch (requeue.schedule)
- {
- case JOB_SCHEDULE:
- lib->scheduler->schedule_job(lib->scheduler,
- worker->job, requeue.time.rel);
- break;
- case JOB_SCHEDULE_MS:
- lib->scheduler->schedule_job_ms(lib->scheduler,
- worker->job, requeue.time.rel);
- break;
- case JOB_SCHEDULE_TV:
- lib->scheduler->schedule_job_tv(lib->scheduler,
- worker->job, requeue.time.abs);
- break;
- }
- break;
- default:
+ case JOB_SCHEDULE_TV:
+ lib->scheduler->schedule_job_tv(lib->scheduler,
+ worker->job, requeue.time.abs);
break;
}
break;
- }
+ default:
+ break;
+ }
+ }
+ /* unset the current job to avoid interference with cancel() when
+ * destroying the job below */
+ worker->job = NULL;
+
+ if (to_destroy)
+ { /* release mutex to avoid deadlocks if the same lock is required
+ * during queue_job() and in the destructor called here */
+ this->mutex->unlock(this->mutex);
+ to_destroy->destroy(to_destroy);
+ this->mutex->lock(this->mutex);
+ }
+}
+
+/**
+ * Process queued jobs, called by the worker threads
+ */
+static void process_jobs(worker_thread_t *worker)
+{
+ private_processor_t *this = worker->processor;
+
+ /* worker threads are not cancelable by default */
+ thread_cancelability(FALSE);
+
+ DBG2(DBG_JOB, "started worker thread %.2u", thread_current_id());
+
+ this->mutex->lock(this->mutex);
+ while (this->desired_threads >= this->total_threads)
+ {
+ if (get_job(this, worker))
+ {
+ process_job(this, worker);
}
- if (!worker->job)
+ else
{
this->job_added->wait(this->job_added, this->mutex);
}
- worker->job = NULL;
}
this->total_threads--;
this->thread_terminated->signal(this->thread_terminated);
@@ -355,6 +401,31 @@ METHOD(processor_t, queue_job, void,
this->mutex->unlock(this->mutex);
}
+METHOD(processor_t, execute_job, void,
+ private_processor_t *this, job_t *job)
+{
+ job_priority_t prio;
+ bool queued = FALSE;
+
+ this->mutex->lock(this->mutex);
+ if (this->desired_threads && get_idle_threads_nolock(this))
+ {
+ prio = sane_prio(job->get_priority(job));
+ job->status = JOB_STATUS_QUEUED;
+ /* insert job in front to execute it immediately */
+ this->jobs[prio]->insert_first(this->jobs[prio], job);
+ queued = TRUE;
+ }
+ this->job_added->signal(this->job_added);
+ this->mutex->unlock(this->mutex);
+
+ if (!queued)
+ {
+ job->execute(job);
+ job->destroy(job);
+ }
+}
+
METHOD(processor_t, set_threads, void,
private_processor_t *this, u_int count)
{
@@ -460,6 +531,7 @@ processor_t *processor_create()
.get_working_threads = _get_working_threads,
.get_job_load = _get_job_load,
.queue_job = _queue_job,
+ .execute_job = _execute_job,
.set_threads = _set_threads,
.cancel = _cancel,
.destroy = _destroy,
@@ -479,4 +551,3 @@ processor_t *processor_create()
return &this->public;
}
-
diff --git a/src/libstrongswan/processing/processor.h b/src/libstrongswan/processing/processor.h
index 94860f5d3..f96530e54 100644
--- a/src/libstrongswan/processing/processor.h
+++ b/src/libstrongswan/processing/processor.h
@@ -75,6 +75,16 @@ struct processor_t {
void (*queue_job) (processor_t *this, job_t *job);
/**
+ * Directly execute a job with an idle worker thread.
+ *
+ * If no idle thread is available, the job gets executed by the calling
+ * thread.
+ *
+ * @param job job, gets destroyed
+ */
+ void (*execute_job)(processor_t *this, job_t *job);
+
+ /**
* Set the number of threads to use in the processor.
*
* If the number of threads is smaller than number of currently running
diff --git a/src/libstrongswan/processing/watcher.c b/src/libstrongswan/processing/watcher.c
new file mode 100644
index 000000000..3009be608
--- /dev/null
+++ b/src/libstrongswan/processing/watcher.c
@@ -0,0 +1,462 @@
+/*
+ * Copyright (C) 2013 Martin Willi
+ * Copyright (C) 2013 revosec AG
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * for more details.
+ */
+
+#include "watcher.h"
+
+#include <library.h>
+#include <threading/thread.h>
+#include <threading/mutex.h>
+#include <threading/condvar.h>
+#include <collections/linked_list.h>
+#include <processing/jobs/callback_job.h>
+
+#include <unistd.h>
+#include <errno.h>
+#include <sys/select.h>
+#include <fcntl.h>
+
+typedef struct private_watcher_t private_watcher_t;
+
+/**
+ * Private data of an watcher_t object.
+ */
+struct private_watcher_t {
+
+ /**
+ * Public watcher_t interface.
+ */
+ watcher_t public;
+
+ /**
+ * List of registered FDs, as entry_t
+ */
+ linked_list_t *fds;
+
+ /**
+ * Lock to access FD list
+ */
+ mutex_t *mutex;
+
+ /**
+ * Condvar to signal completion of callback
+ */
+ condvar_t *condvar;
+
+ /**
+ * Notification pipe to signal watcher thread
+ */
+ int notify[2];
+
+ /**
+ * List of callback jobs to process by watcher thread, as job_t
+ */
+ linked_list_t *jobs;
+};
+
+/**
+ * Entry for a registered file descriptor
+ */
+typedef struct {
+ /** file descriptor */
+ int fd;
+ /** events to watch */
+ watcher_event_t events;
+ /** registered callback function */
+ watcher_cb_t cb;
+ /** user data to pass to callback */
+ void *data;
+ /** callback(s) currently active? */
+ int in_callback;
+} entry_t;
+
+/**
+ * Data we pass on for an async notification
+ */
+typedef struct {
+ /** file descriptor */
+ int fd;
+ /** event type */
+ watcher_event_t event;
+ /** registered callback function */
+ watcher_cb_t cb;
+ /** user data to pass to callback */
+ void *data;
+ /** keep registered? */
+ bool keep;
+ /** reference to watcher */
+ private_watcher_t *this;
+} notify_data_t;
+
+/**
+ * Notify watcher thread about changes
+ */
+static void update(private_watcher_t *this)
+{
+ char buf[1] = { 'u' };
+
+ if (this->notify[1] != -1)
+ {
+ ignore_result(write(this->notify[1], buf, sizeof(buf)));
+ }
+}
+
+/**
+ * Cleanup function if callback gets cancelled
+ */
+static void unregister(notify_data_t *data)
+{
+ /* if a thread processing a callback gets cancelled, we mark the entry
+ * as cancelled, like the callback would return FALSE. This is required
+ * to not queue this watcher again if all threads have been gone. */
+ data->keep = FALSE;
+}
+
+ /**
+ * Execute callback of registered FD, asynchronous
+ */
+static job_requeue_t notify_async(notify_data_t *data)
+{
+ thread_cleanup_push((void*)unregister, data);
+ data->keep = data->cb(data->data, data->fd, data->event);
+ thread_cleanup_pop(FALSE);
+ return JOB_REQUEUE_NONE;
+}
+
+/**
+ * Clean up notification data, reactivate FD
+ */
+static void notify_end(notify_data_t *data)
+{
+ private_watcher_t *this = data->this;
+ enumerator_t *enumerator;
+ entry_t *entry;
+
+ /* reactivate the disabled entry */
+ this->mutex->lock(this->mutex);
+ enumerator = this->fds->create_enumerator(this->fds);
+ while (enumerator->enumerate(enumerator, &entry))
+ {
+ if (entry->fd == data->fd)
+ {
+ if (!data->keep)
+ {
+ entry->events &= ~data->event;
+ if (!entry->events)
+ {
+ this->fds->remove_at(this->fds, enumerator);
+ free(entry);
+ break;
+ }
+ }
+ entry->in_callback--;
+ break;
+ }
+ }
+ enumerator->destroy(enumerator);
+
+ update(this);
+ this->condvar->broadcast(this->condvar);
+ this->mutex->unlock(this->mutex);
+
+ free(data);
+}
+
+/**
+ * Execute the callback for a registered FD
+ */
+static void notify(private_watcher_t *this, entry_t *entry,
+ watcher_event_t event)
+{
+ notify_data_t *data;
+
+ /* get a copy of entry for async job, but with specific event */
+ INIT(data,
+ .fd = entry->fd,
+ .event = event,
+ .cb = entry->cb,
+ .data = entry->data,
+ .keep = TRUE,
+ .this = this,
+ );
+
+ /* deactivate entry, so we can select() other FDs even if the async
+ * processing did not handle the event yet */
+ entry->in_callback++;
+
+ this->jobs->insert_last(this->jobs,
+ callback_job_create_with_prio((void*)notify_async, data,
+ (void*)notify_end, (callback_job_cancel_t)return_false,
+ JOB_PRIO_CRITICAL));
+}
+
+/**
+ * Thread cancellation function for watcher thread
+ */
+static void activate_all(private_watcher_t *this)
+{
+ enumerator_t *enumerator;
+ entry_t *entry;
+
+ /* When the watcher thread gets cancelled, we have to reactivate any entry
+ * and signal threads in remove() to go on. */
+
+ this->mutex->lock(this->mutex);
+ enumerator = this->fds->create_enumerator(this->fds);
+ while (enumerator->enumerate(enumerator, &entry))
+ {
+ entry->in_callback = 0;
+ }
+ enumerator->destroy(enumerator);
+ this->condvar->broadcast(this->condvar);
+ this->mutex->unlock(this->mutex);
+}
+
+/**
+ * Dispatching function
+ */
+static job_requeue_t watch(private_watcher_t *this)
+{
+ enumerator_t *enumerator;
+ entry_t *entry;
+ fd_set rd, wr, ex;
+ int maxfd = 0, res;
+
+ FD_ZERO(&rd);
+ FD_ZERO(&wr);
+ FD_ZERO(&ex);
+
+ this->mutex->lock(this->mutex);
+ if (this->fds->get_count(this->fds) == 0)
+ {
+ this->mutex->unlock(this->mutex);
+ return JOB_REQUEUE_NONE;
+ }
+
+ if (this->notify[0] != -1)
+ {
+ FD_SET(this->notify[0], &rd);
+ maxfd = this->notify[0];
+ }
+
+ enumerator = this->fds->create_enumerator(this->fds);
+ while (enumerator->enumerate(enumerator, &entry))
+ {
+ if (!entry->in_callback)
+ {
+ if (entry->events & WATCHER_READ)
+ {
+ DBG3(DBG_JOB, " watching %d for reading", entry->fd);
+ FD_SET(entry->fd, &rd);
+ }
+ if (entry->events & WATCHER_WRITE)
+ {
+ DBG3(DBG_JOB, " watching %d for writing", entry->fd);
+ FD_SET(entry->fd, &wr);
+ }
+ if (entry->events & WATCHER_EXCEPT)
+ {
+ DBG3(DBG_JOB, " watching %d for exceptions", entry->fd);
+ FD_SET(entry->fd, &ex);
+ }
+ maxfd = max(maxfd, entry->fd);
+ }
+ }
+ enumerator->destroy(enumerator);
+ this->mutex->unlock(this->mutex);
+
+ while (TRUE)
+ {
+ char buf[1];
+ bool old;
+ job_t *job;
+
+ DBG2(DBG_JOB, "watcher going to select()");
+ thread_cleanup_push((void*)activate_all, this);
+ old = thread_cancelability(TRUE);
+ res = select(maxfd + 1, &rd, &wr, &ex, NULL);
+ thread_cancelability(old);
+ thread_cleanup_pop(FALSE);
+ if (res > 0)
+ {
+ if (this->notify[0] != -1 && FD_ISSET(this->notify[0], &rd))
+ {
+ DBG2(DBG_JOB, "watcher got notification, rebuilding");
+ while (read(this->notify[0], buf, sizeof(buf)) > 0);
+ return JOB_REQUEUE_DIRECT;
+ }
+
+ this->mutex->lock(this->mutex);
+ enumerator = this->fds->create_enumerator(this->fds);
+ while (enumerator->enumerate(enumerator, &entry))
+ {
+ if (FD_ISSET(entry->fd, &rd) && (entry->events & WATCHER_READ))
+ {
+ DBG2(DBG_JOB, "watched FD %d ready to read", entry->fd);
+ notify(this, entry, WATCHER_READ);
+ }
+ if (FD_ISSET(entry->fd, &wr) && (entry->events & WATCHER_WRITE))
+ {
+ DBG2(DBG_JOB, "watched FD %d ready to write", entry->fd);
+ notify(this, entry, WATCHER_WRITE);
+ }
+ if (FD_ISSET(entry->fd, &ex) && (entry->events & WATCHER_EXCEPT))
+ {
+ DBG2(DBG_JOB, "watched FD %d has exception", entry->fd);
+ notify(this, entry, WATCHER_EXCEPT);
+ }
+ }
+ enumerator->destroy(enumerator);
+ this->mutex->unlock(this->mutex);
+
+ if (this->jobs->get_count(this->jobs))
+ {
+ while (this->jobs->remove_first(this->jobs,
+ (void**)&job) == SUCCESS)
+ {
+ lib->processor->execute_job(lib->processor, job);
+ }
+ /* we temporarily disable a notified FD, rebuild FDSET */
+ return JOB_REQUEUE_DIRECT;
+ }
+ }
+ else
+ {
+ DBG1(DBG_JOB, "watcher select() error: %s", strerror(errno));
+ }
+ }
+}
+
+METHOD(watcher_t, add, void,
+ private_watcher_t *this, int fd, watcher_event_t events,
+ watcher_cb_t cb, void *data)
+{
+ entry_t *entry;
+
+ INIT(entry,
+ .fd = fd,
+ .events = events,
+ .cb = cb,
+ .data = data,
+ );
+
+ this->mutex->lock(this->mutex);
+ this->fds->insert_last(this->fds, entry);
+ if (this->fds->get_count(this->fds) == 1)
+ {
+ lib->processor->queue_job(lib->processor,
+ (job_t*)callback_job_create_with_prio((void*)watch, this,
+ NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
+ }
+ else
+ {
+ update(this);
+ }
+ this->mutex->unlock(this->mutex);
+}
+
+METHOD(watcher_t, remove_, void,
+ private_watcher_t *this, int fd)
+{
+ enumerator_t *enumerator;
+ entry_t *entry;
+
+ this->mutex->lock(this->mutex);
+ while (TRUE)
+ {
+ bool is_in_callback = FALSE;
+
+ enumerator = this->fds->create_enumerator(this->fds);
+ while (enumerator->enumerate(enumerator, &entry))
+ {
+ if (entry->fd == fd)
+ {
+ if (entry->in_callback)
+ {
+ is_in_callback = TRUE;
+ break;
+ }
+ this->fds->remove_at(this->fds, enumerator);
+ free(entry);
+ }
+ }
+ enumerator->destroy(enumerator);
+ if (!is_in_callback)
+ {
+ break;
+ }
+ this->condvar->wait(this->condvar, this->mutex);
+ }
+
+ update(this);
+ this->mutex->unlock(this->mutex);
+}
+
+METHOD(watcher_t, destroy, void,
+ private_watcher_t *this)
+{
+ this->mutex->destroy(this->mutex);
+ this->condvar->destroy(this->condvar);
+ this->fds->destroy(this->fds);
+ if (this->notify[0] != -1)
+ {
+ close(this->notify[0]);
+ }
+ if (this->notify[1] != -1)
+ {
+ close(this->notify[1]);
+ }
+ this->jobs->destroy(this->jobs);
+ free(this);
+}
+
+/**
+ * See header
+ */
+watcher_t *watcher_create()
+{
+ private_watcher_t *this;
+ int flags;
+
+ INIT(this,
+ .public = {
+ .add = _add,
+ .remove = _remove_,
+ .destroy = _destroy,
+ },
+ .fds = linked_list_create(),
+ .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
+ .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
+ .jobs = linked_list_create(),
+ .notify = {-1, -1},
+ );
+
+ if (pipe(this->notify) == 0)
+ {
+ /* use non-blocking I/O on read-end of notify pipe */
+ flags = fcntl(this->notify[0], F_GETFL);
+ if (flags == -1 ||
+ fcntl(this->notify[0], F_SETFL, flags | O_NONBLOCK) == -1)
+ {
+ DBG1(DBG_LIB, "setting watcher notify pipe read-end non-blocking "
+ "failed: %s", strerror(errno));
+ }
+ }
+ else
+ {
+ DBG1(DBG_LIB, "creating watcher notify pipe failed: %s",
+ strerror(errno));
+ }
+ return &this->public;
+}
diff --git a/src/libstrongswan/processing/watcher.h b/src/libstrongswan/processing/watcher.h
new file mode 100644
index 000000000..6e158cec2
--- /dev/null
+++ b/src/libstrongswan/processing/watcher.h
@@ -0,0 +1,101 @@
+/*
+ * Copyright (C) 2013 Martin Willi
+ * Copyright (C) 2013 revosec AG
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * for more details.
+ */
+
+/**
+ * @defgroup watcher watcher
+ * @{ @ingroup processor
+ */
+
+#ifndef WATCHER_H_
+#define WATCHER_H_
+
+typedef struct watcher_t watcher_t;
+typedef enum watcher_event_t watcher_event_t;
+
+#include <library.h>
+
+/**
+ * Callback function to register for file descriptor events.
+ *
+ * The callback is executed asynchronously using a thread from the pool.
+ * Monitoring of fd is temporarily suspended to avoid additional events while
+ * it is processed asynchronously. To allow concurrent events, one can quickly
+ * process it (using a read/write) and return from the callback. This will
+ * re-enable the event, while the data read can be processed in another
+ * asynchronous job.
+ *
+ * On Linux, even if select() marks an FD as "ready", a subsequent read/write
+ * can block. It is therefore highly recommended to use non-blocking I/O
+ * and handle EAGAIN/EWOULDBLOCK gracefully.
+ *
+ * @param data user data passed during registration
+ * @param fd file descriptor the event occurred on
+ * @param event type of event
+ * @return TRUE to keep watching event, FALSE to unregister fd for event
+ */
+typedef bool (*watcher_cb_t)(void *data, int fd, watcher_event_t event);
+
+/**
+ * What events to watch for a file descriptor.
+ */
+enum watcher_event_t {
+ WATCHER_READ = (1<<0),
+ WATCHER_WRITE = (1<<1),
+ WATCHER_EXCEPT = (1<<2),
+};
+
+/**
+ * Watch multiple file descriptors using select().
+ */
+struct watcher_t {
+
+ /**
+ * Start watching a new file descriptor.
+ *
+ * Multiple callbacks can be registered for the same file descriptor, and
+ * all of them get notified. Such callbacks are executed concurrently.
+ *
+ * @param fd file descriptor to start watching
+ * @param events ORed set of events to watch
+ * @param cb callback function to invoke on events
+ * @param data data to pass to cb()
+ */
+ void (*add)(watcher_t *this, int fd, watcher_event_t events,
+ watcher_cb_t cb, void *data);
+
+ /**
+ * Stop watching a previously registered file descriptor.
+ *
+ * This call blocks until any active callback for this FD returns. All
+ * callbacks registered for that FD get unregistered.
+ *
+ * @param fd file descriptor to stop watching
+ */
+ void (*remove)(watcher_t *this, int fd);
+
+ /**
+ * Destroy a watcher_t.
+ */
+ void (*destroy)(watcher_t *this);
+};
+
+/**
+ * Create a watcher instance.
+ *
+ * @return watcher
+ */
+watcher_t *watcher_create();
+
+#endif /** WATCHER_H_ @}*/