summaryrefslogtreecommitdiff
path: root/src/libstrongswan/processing
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstrongswan/processing')
-rw-r--r--src/libstrongswan/processing/processor.h2
-rw-r--r--src/libstrongswan/processing/scheduler.c19
-rw-r--r--src/libstrongswan/processing/scheduler.h7
-rw-r--r--src/libstrongswan/processing/watcher.c113
4 files changed, 94 insertions, 47 deletions
diff --git a/src/libstrongswan/processing/processor.h b/src/libstrongswan/processing/processor.h
index f96530e54..ee08870fb 100644
--- a/src/libstrongswan/processing/processor.h
+++ b/src/libstrongswan/processing/processor.h
@@ -23,6 +23,8 @@
#ifndef PROCESSOR_H_
#define PROCESSOR_H_
+#include <utils/utils.h>
+
typedef struct processor_t processor_t;
#include <stdlib.h>
diff --git a/src/libstrongswan/processing/scheduler.c b/src/libstrongswan/processing/scheduler.c
index 3f1598fc4..d90852561 100644
--- a/src/libstrongswan/processing/scheduler.c
+++ b/src/libstrongswan/processing/scheduler.c
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2008 Tobias Brunner
+ * Copyright (C) 2008-2015 Tobias Brunner
* Copyright (C) 2005-2006 Martin Willi
* Copyright (C) 2005 Jan Hutter
* Hochschule fuer Technik Rapperswil
@@ -300,16 +300,26 @@ METHOD(scheduler_t, schedule_job_ms, void,
schedule_job_tv(this, job, tv);
}
-METHOD(scheduler_t, destroy, void,
+METHOD(scheduler_t, flush, void,
private_scheduler_t *this)
{
event_t *event;
- this->condvar->destroy(this->condvar);
- this->mutex->destroy(this->mutex);
+
+ this->mutex->lock(this->mutex);
while ((event = remove_event(this)) != NULL)
{
event_destroy(event);
}
+ this->condvar->signal(this->condvar);
+ this->mutex->unlock(this->mutex);
+}
+
+METHOD(scheduler_t, destroy, void,
+ private_scheduler_t *this)
+{
+ flush(this);
+ this->condvar->destroy(this->condvar);
+ this->mutex->destroy(this->mutex);
free(this->heap);
free(this);
}
@@ -328,6 +338,7 @@ scheduler_t * scheduler_create()
.schedule_job = _schedule_job,
.schedule_job_ms = _schedule_job_ms,
.schedule_job_tv = _schedule_job_tv,
+ .flush = _flush,
.destroy = _destroy,
},
.heap_size = HEAP_SIZE_DEFAULT,
diff --git a/src/libstrongswan/processing/scheduler.h b/src/libstrongswan/processing/scheduler.h
index abbf74e2c..7f91fcc59 100644
--- a/src/libstrongswan/processing/scheduler.h
+++ b/src/libstrongswan/processing/scheduler.h
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2009 Tobias Brunner
+ * Copyright (C) 2009-2015 Tobias Brunner
* Copyright (C) 2005-2007 Martin Willi
* Copyright (C) 2005 Jan Hutter
* Hochschule fuer Technik Rapperswil
@@ -115,6 +115,11 @@ struct scheduler_t {
u_int (*get_job_load) (scheduler_t *this);
/**
+ * Remove all scheduled jobs.
+ */
+ void (*flush)(scheduler_t *this);
+
+ /**
* Destroys a scheduler object.
*/
void (*destroy) (scheduler_t *this);
diff --git a/src/libstrongswan/processing/watcher.c b/src/libstrongswan/processing/watcher.c
index d4de2a907..5b94208bf 100644
--- a/src/libstrongswan/processing/watcher.c
+++ b/src/libstrongswan/processing/watcher.c
@@ -24,9 +24,6 @@
#include <unistd.h>
#include <errno.h>
-#ifndef WIN32
-#include <sys/select.h>
-#endif
#include <fcntl.h>
typedef struct private_watcher_t private_watcher_t;
@@ -121,11 +118,7 @@ static void update(private_watcher_t *this)
this->pending = TRUE;
if (this->notify[1] != -1)
{
-#ifdef WIN32
- if (send(this->notify[1], buf, sizeof(buf), 0) == -1)
-#else
if (write(this->notify[1], buf, sizeof(buf)) == -1)
-#endif
{
DBG1(DBG_JOB, "notifying watcher failed: %s", strerror(errno));
}
@@ -245,23 +238,57 @@ static void activate_all(private_watcher_t *this)
}
/**
+ * Find flagged revents in a pollfd set by fd
+ */
+static int find_revents(struct pollfd *pfd, int count, int fd)
+{
+ int i;
+
+ for (i = 0; i < count; i++)
+ {
+ if (pfd[i].fd == fd)
+ {
+ return pfd[i].revents;
+ }
+ }
+ return 0;
+}
+
+/**
+ * Check if entry is waiting for a specific event, and if it got signaled
+ */
+static bool entry_ready(entry_t *entry, watcher_event_t event, int revents)
+{
+ if (entry->events & event)
+ {
+ switch (event)
+ {
+ case WATCHER_READ:
+ return (revents & (POLLIN | POLLHUP | POLLNVAL)) != 0;
+ case WATCHER_WRITE:
+ return (revents & (POLLOUT | POLLHUP | POLLNVAL)) != 0;
+ case WATCHER_EXCEPT:
+ return (revents & (POLLERR | POLLHUP | POLLNVAL)) != 0;
+ }
+ }
+ return FALSE;
+}
+
+/**
* Dispatching function
*/
static job_requeue_t watch(private_watcher_t *this)
{
enumerator_t *enumerator;
entry_t *entry;
- fd_set rd, wr, ex;
- int maxfd = 0, res;
+ struct pollfd *pfd;
+ int count = 0, res;
bool rebuild = FALSE;
- FD_ZERO(&rd);
- FD_ZERO(&wr);
- FD_ZERO(&ex);
-
this->mutex->lock(this->mutex);
- if (this->fds->get_count(this->fds) == 0)
+ count = this->fds->get_count(this->fds);
+ if (count == 0)
{
this->state = WATCHER_STOPPED;
this->mutex->unlock(this->mutex);
@@ -272,33 +299,34 @@ static job_requeue_t watch(private_watcher_t *this)
this->state = WATCHER_RUNNING;
}
- if (this->notify[0] != -1)
- {
- FD_SET(this->notify[0], &rd);
- maxfd = this->notify[0];
- }
+ pfd = alloca(sizeof(*pfd) * (count + 1));
+ pfd[0].fd = this->notify[0];
+ pfd[0].events = POLLIN;
+ count = 1;
enumerator = this->fds->create_enumerator(this->fds);
while (enumerator->enumerate(enumerator, &entry))
{
if (!entry->in_callback)
{
+ pfd[count].fd = entry->fd;
+ pfd[count].events = 0;
if (entry->events & WATCHER_READ)
{
DBG3(DBG_JOB, " watching %d for reading", entry->fd);
- FD_SET(entry->fd, &rd);
+ pfd[count].events |= POLLIN;
}
if (entry->events & WATCHER_WRITE)
{
DBG3(DBG_JOB, " watching %d for writing", entry->fd);
- FD_SET(entry->fd, &wr);
+ pfd[count].events |= POLLOUT;
}
if (entry->events & WATCHER_EXCEPT)
{
DBG3(DBG_JOB, " watching %d for exceptions", entry->fd);
- FD_SET(entry->fd, &ex);
+ pfd[count].events |= POLLERR;
}
- maxfd = max(maxfd, entry->fd);
+ count++;
}
}
enumerator->destroy(enumerator);
@@ -306,30 +334,27 @@ static job_requeue_t watch(private_watcher_t *this)
while (!rebuild)
{
+ int revents;
char buf[1];
bool old;
ssize_t len;
job_t *job;
- DBG2(DBG_JOB, "watcher going to select()");
+ DBG2(DBG_JOB, "watcher going to poll() %d fds", count);
thread_cleanup_push((void*)activate_all, this);
old = thread_cancelability(TRUE);
- res = select(maxfd + 1, &rd, &wr, &ex, NULL);
+ res = poll(pfd, count, -1);
thread_cancelability(old);
thread_cleanup_pop(FALSE);
if (res > 0)
{
- if (this->notify[0] != -1 && FD_ISSET(this->notify[0], &rd))
+ if (pfd[0].revents & POLLIN)
{
while (TRUE)
{
-#ifdef WIN32
- len = recv(this->notify[0], buf, sizeof(buf), 0);
-#else
len = read(this->notify[0], buf, sizeof(buf));
-#endif
if (len == -1)
{
if (errno != EAGAIN && errno != EWOULDBLOCK)
@@ -354,21 +379,25 @@ static job_requeue_t watch(private_watcher_t *this)
rebuild = TRUE;
break;
}
- if (FD_ISSET(entry->fd, &rd) && (entry->events & WATCHER_READ))
- {
- DBG2(DBG_JOB, "watched FD %d ready to read", entry->fd);
- notify(this, entry, WATCHER_READ);
- }
- if (FD_ISSET(entry->fd, &wr) && (entry->events & WATCHER_WRITE))
- {
- DBG2(DBG_JOB, "watched FD %d ready to write", entry->fd);
- notify(this, entry, WATCHER_WRITE);
- }
- if (FD_ISSET(entry->fd, &ex) && (entry->events & WATCHER_EXCEPT))
+ revents = find_revents(pfd, count, entry->fd);
+ if (entry_ready(entry, WATCHER_EXCEPT, revents))
{
DBG2(DBG_JOB, "watched FD %d has exception", entry->fd);
notify(this, entry, WATCHER_EXCEPT);
}
+ else
+ {
+ if (entry_ready(entry, WATCHER_READ, revents))
+ {
+ DBG2(DBG_JOB, "watched FD %d ready to read", entry->fd);
+ notify(this, entry, WATCHER_READ);
+ }
+ if (entry_ready(entry, WATCHER_WRITE, revents))
+ {
+ DBG2(DBG_JOB, "watched FD %d ready to write", entry->fd);
+ notify(this, entry, WATCHER_WRITE);
+ }
+ }
}
enumerator->destroy(enumerator);
this->mutex->unlock(this->mutex);
@@ -388,7 +417,7 @@ static job_requeue_t watch(private_watcher_t *this)
{
if (!this->pending && errno != EINTR)
{ /* complain only if no pending updates */
- DBG1(DBG_JOB, "watcher select() error: %s", strerror(errno));
+ DBG1(DBG_JOB, "watcher poll() error: %s", strerror(errno));
}
return JOB_REQUEUE_DIRECT;
}