summaryrefslogtreecommitdiff
path: root/src/libstrongswan/processing
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstrongswan/processing')
-rw-r--r--src/libstrongswan/processing/processor.c12
-rw-r--r--src/libstrongswan/processing/watcher.c118
2 files changed, 110 insertions, 20 deletions
diff --git a/src/libstrongswan/processing/processor.c b/src/libstrongswan/processing/processor.c
index 012b169e3..27e5ab5f6 100644
--- a/src/libstrongswan/processing/processor.c
+++ b/src/libstrongswan/processing/processor.c
@@ -467,6 +467,8 @@ METHOD(processor_t, cancel, void,
{
enumerator_t *enumerator;
worker_thread_t *worker;
+ job_t *job;
+ int i;
this->mutex->lock(this->mutex);
this->desired_threads = 0;
@@ -496,6 +498,14 @@ METHOD(processor_t, cancel, void,
worker->thread->join(worker->thread);
free(worker);
}
+ for (i = 0; i < JOB_PRIO_MAX; i++)
+ {
+ while (this->jobs[i]->remove_first(this->jobs[i],
+ (void**)&job) == SUCCESS)
+ {
+ job->destroy(job);
+ }
+ }
this->mutex->unlock(this->mutex);
}
@@ -510,7 +520,7 @@ METHOD(processor_t, destroy, void,
this->mutex->destroy(this->mutex);
for (i = 0; i < JOB_PRIO_MAX; i++)
{
- this->jobs[i]->destroy_offset(this->jobs[i], offsetof(job_t, destroy));
+ this->jobs[i]->destroy(this->jobs[i]);
}
this->threads->destroy(this->threads);
free(this);
diff --git a/src/libstrongswan/processing/watcher.c b/src/libstrongswan/processing/watcher.c
index cc3c3a788..3518dfdae 100644
--- a/src/libstrongswan/processing/watcher.c
+++ b/src/libstrongswan/processing/watcher.c
@@ -24,7 +24,9 @@
#include <unistd.h>
#include <errno.h>
+#ifndef WIN32
#include <sys/select.h>
+#endif
#include <fcntl.h>
typedef struct private_watcher_t private_watcher_t;
@@ -50,6 +52,11 @@ struct private_watcher_t {
bool pending;
/**
+ * Is watcher running?
+ */
+ bool running;
+
+ /**
* Lock to access FD list
*/
mutex_t *mutex;
@@ -114,7 +121,14 @@ static void update(private_watcher_t *this)
this->pending = TRUE;
if (this->notify[1] != -1)
{
- ignore_result(write(this->notify[1], buf, sizeof(buf)));
+#ifdef WIN32
+ if (send(this->notify[1], buf, sizeof(buf), 0) == -1)
+#else
+ if (write(this->notify[1], buf, sizeof(buf)) == -1)
+#endif
+ {
+ DBG1(DBG_JOB, "notifying watcher failed: %s", strerror(errno));
+ }
}
}
@@ -225,6 +239,7 @@ static void activate_all(private_watcher_t *this)
entry->in_callback = 0;
}
enumerator->destroy(enumerator);
+ this->running = FALSE;
this->condvar->broadcast(this->condvar);
this->mutex->unlock(this->mutex);
}
@@ -238,14 +253,17 @@ static job_requeue_t watch(private_watcher_t *this)
entry_t *entry;
fd_set rd, wr, ex;
int maxfd = 0, res;
+ bool rebuild = FALSE;
FD_ZERO(&rd);
FD_ZERO(&wr);
FD_ZERO(&ex);
this->mutex->lock(this->mutex);
+
if (this->fds->get_count(this->fds) == 0)
{
+ this->running = FALSE;
this->mutex->unlock(this->mutex);
return JOB_REQUEUE_NONE;
}
@@ -282,25 +300,44 @@ static job_requeue_t watch(private_watcher_t *this)
enumerator->destroy(enumerator);
this->mutex->unlock(this->mutex);
- while (TRUE)
+ while (!rebuild)
{
char buf[1];
bool old;
+ ssize_t len;
job_t *job;
DBG2(DBG_JOB, "watcher going to select()");
thread_cleanup_push((void*)activate_all, this);
old = thread_cancelability(TRUE);
+
res = select(maxfd + 1, &rd, &wr, &ex, NULL);
thread_cancelability(old);
thread_cleanup_pop(FALSE);
+
if (res > 0)
{
if (this->notify[0] != -1 && FD_ISSET(this->notify[0], &rd))
{
- DBG2(DBG_JOB, "watcher got notification, rebuilding");
- while (read(this->notify[0], buf, sizeof(buf)) > 0);
+ while (TRUE)
+ {
+#ifdef WIN32
+ len = recv(this->notify[0], buf, sizeof(buf), 0);
+#else
+ len = read(this->notify[0], buf, sizeof(buf));
+#endif
+ if (len == -1)
+ {
+ if (errno != EAGAIN && errno != EWOULDBLOCK)
+ {
+ DBG1(DBG_JOB, "reading watcher notify failed: %s",
+ strerror(errno));
+ }
+ break;
+ }
+ }
this->pending = FALSE;
+ DBG2(DBG_JOB, "watcher got notification, rebuilding");
return JOB_REQUEUE_DIRECT;
}
@@ -308,6 +345,11 @@ static job_requeue_t watch(private_watcher_t *this)
enumerator = this->fds->create_enumerator(this->fds);
while (enumerator->enumerate(enumerator, &entry))
{
+ if (entry->in_callback)
+ {
+ rebuild = TRUE;
+ break;
+ }
if (FD_ISSET(entry->fd, &rd) && (entry->events & WATCHER_READ))
{
DBG2(DBG_JOB, "watched FD %d ready to read", entry->fd);
@@ -347,6 +389,7 @@ static job_requeue_t watch(private_watcher_t *this)
return JOB_REQUEUE_DIRECT;
}
}
+ return JOB_REQUEUE_DIRECT;
}
METHOD(watcher_t, add, void,
@@ -364,8 +407,9 @@ METHOD(watcher_t, add, void,
this->mutex->lock(this->mutex);
this->fds->insert_last(this->fds, entry);
- if (this->fds->get_count(this->fds) == 1)
+ if (!this->running)
{
+ this->running = TRUE;
lib->processor->queue_job(lib->processor,
(job_t*)callback_job_create_with_prio((void*)watch, this,
NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
@@ -393,7 +437,7 @@ METHOD(watcher_t, remove_, void,
{
if (entry->fd == fd)
{
- if (entry->in_callback)
+ if (this->running && entry->in_callback)
{
is_in_callback = TRUE;
break;
@@ -432,13 +476,60 @@ METHOD(watcher_t, destroy, void,
free(this);
}
+#ifdef WIN32
+
+/**
+ * Create notify pipe with a TCP socketpair
+ */
+static bool create_notify(private_watcher_t *this)
+{
+ u_long on = 1;
+
+ if (socketpair(AF_INET, SOCK_STREAM, 0, this->notify) == 0)
+ {
+ /* use non-blocking I/O on read-end of notify pipe */
+ if (ioctlsocket(this->notify[0], FIONBIO, &on) == 0)
+ {
+ return TRUE;
+ }
+ DBG1(DBG_LIB, "setting watcher notify pipe read-end non-blocking "
+ "failed: %s", strerror(errno));
+ }
+ return FALSE;
+}
+
+#else /* !WIN32 */
+
+/**
+ * Create a notify pipe with a one-directional pipe
+ */
+static bool create_notify(private_watcher_t *this)
+{
+ int flags;
+
+ if (pipe(this->notify) == 0)
+ {
+ /* use non-blocking I/O on read-end of notify pipe */
+ flags = fcntl(this->notify[0], F_GETFL);
+ if (flags != -1 &&
+ fcntl(this->notify[0], F_SETFL, flags | O_NONBLOCK) != -1)
+ {
+ return TRUE;
+ }
+ DBG1(DBG_LIB, "setting watcher notify pipe read-end non-blocking "
+ "failed: %s", strerror(errno));
+ }
+ return FALSE;
+}
+
+#endif /* !WIN32 */
+
/**
* See header
*/
watcher_t *watcher_create()
{
private_watcher_t *this;
- int flags;
INIT(this,
.public = {
@@ -453,18 +544,7 @@ watcher_t *watcher_create()
.notify = {-1, -1},
);
- if (pipe(this->notify) == 0)
- {
- /* use non-blocking I/O on read-end of notify pipe */
- flags = fcntl(this->notify[0], F_GETFL);
- if (flags == -1 ||
- fcntl(this->notify[0], F_SETFL, flags | O_NONBLOCK) == -1)
- {
- DBG1(DBG_LIB, "setting watcher notify pipe read-end non-blocking "
- "failed: %s", strerror(errno));
- }
- }
- else
+ if (!create_notify(this))
{
DBG1(DBG_LIB, "creating watcher notify pipe failed: %s",
strerror(errno));