summaryrefslogtreecommitdiff
path: root/src/libstrongswan/processing/watcher.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstrongswan/processing/watcher.c')
-rw-r--r--src/libstrongswan/processing/watcher.c462
1 files changed, 462 insertions, 0 deletions
diff --git a/src/libstrongswan/processing/watcher.c b/src/libstrongswan/processing/watcher.c
new file mode 100644
index 000000000..3009be608
--- /dev/null
+++ b/src/libstrongswan/processing/watcher.c
@@ -0,0 +1,462 @@
+/*
+ * Copyright (C) 2013 Martin Willi
+ * Copyright (C) 2013 revosec AG
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * for more details.
+ */
+
+#include "watcher.h"
+
+#include <library.h>
+#include <threading/thread.h>
+#include <threading/mutex.h>
+#include <threading/condvar.h>
+#include <collections/linked_list.h>
+#include <processing/jobs/callback_job.h>
+
+#include <unistd.h>
+#include <errno.h>
+#include <sys/select.h>
+#include <fcntl.h>
+
+typedef struct private_watcher_t private_watcher_t;
+
+/**
+ * Private data of an watcher_t object.
+ */
+struct private_watcher_t {
+
+ /**
+ * Public watcher_t interface.
+ */
+ watcher_t public;
+
+ /**
+ * List of registered FDs, as entry_t
+ */
+ linked_list_t *fds;
+
+ /**
+ * Lock to access FD list
+ */
+ mutex_t *mutex;
+
+ /**
+ * Condvar to signal completion of callback
+ */
+ condvar_t *condvar;
+
+ /**
+ * Notification pipe to signal watcher thread
+ */
+ int notify[2];
+
+ /**
+ * List of callback jobs to process by watcher thread, as job_t
+ */
+ linked_list_t *jobs;
+};
+
+/**
+ * Entry for a registered file descriptor
+ */
+typedef struct {
+ /** file descriptor */
+ int fd;
+ /** events to watch */
+ watcher_event_t events;
+ /** registered callback function */
+ watcher_cb_t cb;
+ /** user data to pass to callback */
+ void *data;
+ /** callback(s) currently active? */
+ int in_callback;
+} entry_t;
+
+/**
+ * Data we pass on for an async notification
+ */
+typedef struct {
+ /** file descriptor */
+ int fd;
+ /** event type */
+ watcher_event_t event;
+ /** registered callback function */
+ watcher_cb_t cb;
+ /** user data to pass to callback */
+ void *data;
+ /** keep registered? */
+ bool keep;
+ /** reference to watcher */
+ private_watcher_t *this;
+} notify_data_t;
+
+/**
+ * Notify watcher thread about changes
+ */
+static void update(private_watcher_t *this)
+{
+ char buf[1] = { 'u' };
+
+ if (this->notify[1] != -1)
+ {
+ ignore_result(write(this->notify[1], buf, sizeof(buf)));
+ }
+}
+
+/**
+ * Cleanup function if callback gets cancelled
+ */
+static void unregister(notify_data_t *data)
+{
+ /* if a thread processing a callback gets cancelled, we mark the entry
+ * as cancelled, like the callback would return FALSE. This is required
+ * to not queue this watcher again if all threads have been gone. */
+ data->keep = FALSE;
+}
+
+ /**
+ * Execute callback of registered FD, asynchronous
+ */
+static job_requeue_t notify_async(notify_data_t *data)
+{
+ thread_cleanup_push((void*)unregister, data);
+ data->keep = data->cb(data->data, data->fd, data->event);
+ thread_cleanup_pop(FALSE);
+ return JOB_REQUEUE_NONE;
+}
+
+/**
+ * Clean up notification data, reactivate FD
+ */
+static void notify_end(notify_data_t *data)
+{
+ private_watcher_t *this = data->this;
+ enumerator_t *enumerator;
+ entry_t *entry;
+
+ /* reactivate the disabled entry */
+ this->mutex->lock(this->mutex);
+ enumerator = this->fds->create_enumerator(this->fds);
+ while (enumerator->enumerate(enumerator, &entry))
+ {
+ if (entry->fd == data->fd)
+ {
+ if (!data->keep)
+ {
+ entry->events &= ~data->event;
+ if (!entry->events)
+ {
+ this->fds->remove_at(this->fds, enumerator);
+ free(entry);
+ break;
+ }
+ }
+ entry->in_callback--;
+ break;
+ }
+ }
+ enumerator->destroy(enumerator);
+
+ update(this);
+ this->condvar->broadcast(this->condvar);
+ this->mutex->unlock(this->mutex);
+
+ free(data);
+}
+
+/**
+ * Execute the callback for a registered FD
+ */
+static void notify(private_watcher_t *this, entry_t *entry,
+ watcher_event_t event)
+{
+ notify_data_t *data;
+
+ /* get a copy of entry for async job, but with specific event */
+ INIT(data,
+ .fd = entry->fd,
+ .event = event,
+ .cb = entry->cb,
+ .data = entry->data,
+ .keep = TRUE,
+ .this = this,
+ );
+
+ /* deactivate entry, so we can select() other FDs even if the async
+ * processing did not handle the event yet */
+ entry->in_callback++;
+
+ this->jobs->insert_last(this->jobs,
+ callback_job_create_with_prio((void*)notify_async, data,
+ (void*)notify_end, (callback_job_cancel_t)return_false,
+ JOB_PRIO_CRITICAL));
+}
+
+/**
+ * Thread cancellation function for watcher thread
+ */
+static void activate_all(private_watcher_t *this)
+{
+ enumerator_t *enumerator;
+ entry_t *entry;
+
+ /* When the watcher thread gets cancelled, we have to reactivate any entry
+ * and signal threads in remove() to go on. */
+
+ this->mutex->lock(this->mutex);
+ enumerator = this->fds->create_enumerator(this->fds);
+ while (enumerator->enumerate(enumerator, &entry))
+ {
+ entry->in_callback = 0;
+ }
+ enumerator->destroy(enumerator);
+ this->condvar->broadcast(this->condvar);
+ this->mutex->unlock(this->mutex);
+}
+
+/**
+ * Dispatching function
+ */
+static job_requeue_t watch(private_watcher_t *this)
+{
+ enumerator_t *enumerator;
+ entry_t *entry;
+ fd_set rd, wr, ex;
+ int maxfd = 0, res;
+
+ FD_ZERO(&rd);
+ FD_ZERO(&wr);
+ FD_ZERO(&ex);
+
+ this->mutex->lock(this->mutex);
+ if (this->fds->get_count(this->fds) == 0)
+ {
+ this->mutex->unlock(this->mutex);
+ return JOB_REQUEUE_NONE;
+ }
+
+ if (this->notify[0] != -1)
+ {
+ FD_SET(this->notify[0], &rd);
+ maxfd = this->notify[0];
+ }
+
+ enumerator = this->fds->create_enumerator(this->fds);
+ while (enumerator->enumerate(enumerator, &entry))
+ {
+ if (!entry->in_callback)
+ {
+ if (entry->events & WATCHER_READ)
+ {
+ DBG3(DBG_JOB, " watching %d for reading", entry->fd);
+ FD_SET(entry->fd, &rd);
+ }
+ if (entry->events & WATCHER_WRITE)
+ {
+ DBG3(DBG_JOB, " watching %d for writing", entry->fd);
+ FD_SET(entry->fd, &wr);
+ }
+ if (entry->events & WATCHER_EXCEPT)
+ {
+ DBG3(DBG_JOB, " watching %d for exceptions", entry->fd);
+ FD_SET(entry->fd, &ex);
+ }
+ maxfd = max(maxfd, entry->fd);
+ }
+ }
+ enumerator->destroy(enumerator);
+ this->mutex->unlock(this->mutex);
+
+ while (TRUE)
+ {
+ char buf[1];
+ bool old;
+ job_t *job;
+
+ DBG2(DBG_JOB, "watcher going to select()");
+ thread_cleanup_push((void*)activate_all, this);
+ old = thread_cancelability(TRUE);
+ res = select(maxfd + 1, &rd, &wr, &ex, NULL);
+ thread_cancelability(old);
+ thread_cleanup_pop(FALSE);
+ if (res > 0)
+ {
+ if (this->notify[0] != -1 && FD_ISSET(this->notify[0], &rd))
+ {
+ DBG2(DBG_JOB, "watcher got notification, rebuilding");
+ while (read(this->notify[0], buf, sizeof(buf)) > 0);
+ return JOB_REQUEUE_DIRECT;
+ }
+
+ this->mutex->lock(this->mutex);
+ enumerator = this->fds->create_enumerator(this->fds);
+ while (enumerator->enumerate(enumerator, &entry))
+ {
+ if (FD_ISSET(entry->fd, &rd) && (entry->events & WATCHER_READ))
+ {
+ DBG2(DBG_JOB, "watched FD %d ready to read", entry->fd);
+ notify(this, entry, WATCHER_READ);
+ }
+ if (FD_ISSET(entry->fd, &wr) && (entry->events & WATCHER_WRITE))
+ {
+ DBG2(DBG_JOB, "watched FD %d ready to write", entry->fd);
+ notify(this, entry, WATCHER_WRITE);
+ }
+ if (FD_ISSET(entry->fd, &ex) && (entry->events & WATCHER_EXCEPT))
+ {
+ DBG2(DBG_JOB, "watched FD %d has exception", entry->fd);
+ notify(this, entry, WATCHER_EXCEPT);
+ }
+ }
+ enumerator->destroy(enumerator);
+ this->mutex->unlock(this->mutex);
+
+ if (this->jobs->get_count(this->jobs))
+ {
+ while (this->jobs->remove_first(this->jobs,
+ (void**)&job) == SUCCESS)
+ {
+ lib->processor->execute_job(lib->processor, job);
+ }
+ /* we temporarily disable a notified FD, rebuild FDSET */
+ return JOB_REQUEUE_DIRECT;
+ }
+ }
+ else
+ {
+ DBG1(DBG_JOB, "watcher select() error: %s", strerror(errno));
+ }
+ }
+}
+
+METHOD(watcher_t, add, void,
+ private_watcher_t *this, int fd, watcher_event_t events,
+ watcher_cb_t cb, void *data)
+{
+ entry_t *entry;
+
+ INIT(entry,
+ .fd = fd,
+ .events = events,
+ .cb = cb,
+ .data = data,
+ );
+
+ this->mutex->lock(this->mutex);
+ this->fds->insert_last(this->fds, entry);
+ if (this->fds->get_count(this->fds) == 1)
+ {
+ lib->processor->queue_job(lib->processor,
+ (job_t*)callback_job_create_with_prio((void*)watch, this,
+ NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
+ }
+ else
+ {
+ update(this);
+ }
+ this->mutex->unlock(this->mutex);
+}
+
+METHOD(watcher_t, remove_, void,
+ private_watcher_t *this, int fd)
+{
+ enumerator_t *enumerator;
+ entry_t *entry;
+
+ this->mutex->lock(this->mutex);
+ while (TRUE)
+ {
+ bool is_in_callback = FALSE;
+
+ enumerator = this->fds->create_enumerator(this->fds);
+ while (enumerator->enumerate(enumerator, &entry))
+ {
+ if (entry->fd == fd)
+ {
+ if (entry->in_callback)
+ {
+ is_in_callback = TRUE;
+ break;
+ }
+ this->fds->remove_at(this->fds, enumerator);
+ free(entry);
+ }
+ }
+ enumerator->destroy(enumerator);
+ if (!is_in_callback)
+ {
+ break;
+ }
+ this->condvar->wait(this->condvar, this->mutex);
+ }
+
+ update(this);
+ this->mutex->unlock(this->mutex);
+}
+
+METHOD(watcher_t, destroy, void,
+ private_watcher_t *this)
+{
+ this->mutex->destroy(this->mutex);
+ this->condvar->destroy(this->condvar);
+ this->fds->destroy(this->fds);
+ if (this->notify[0] != -1)
+ {
+ close(this->notify[0]);
+ }
+ if (this->notify[1] != -1)
+ {
+ close(this->notify[1]);
+ }
+ this->jobs->destroy(this->jobs);
+ free(this);
+}
+
+/**
+ * See header
+ */
+watcher_t *watcher_create()
+{
+ private_watcher_t *this;
+ int flags;
+
+ INIT(this,
+ .public = {
+ .add = _add,
+ .remove = _remove_,
+ .destroy = _destroy,
+ },
+ .fds = linked_list_create(),
+ .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
+ .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
+ .jobs = linked_list_create(),
+ .notify = {-1, -1},
+ );
+
+ if (pipe(this->notify) == 0)
+ {
+ /* use non-blocking I/O on read-end of notify pipe */
+ flags = fcntl(this->notify[0], F_GETFL);
+ if (flags == -1 ||
+ fcntl(this->notify[0], F_SETFL, flags | O_NONBLOCK) == -1)
+ {
+ DBG1(DBG_LIB, "setting watcher notify pipe read-end non-blocking "
+ "failed: %s", strerror(errno));
+ }
+ }
+ else
+ {
+ DBG1(DBG_LIB, "creating watcher notify pipe failed: %s",
+ strerror(errno));
+ }
+ return &this->public;
+}