diff options
Diffstat (limited to 'src/libstrongswan/processing')
-rw-r--r-- | src/libstrongswan/processing/processor.c | 12 | ||||
-rw-r--r-- | src/libstrongswan/processing/watcher.c | 118 |
2 files changed, 110 insertions, 20 deletions
diff --git a/src/libstrongswan/processing/processor.c b/src/libstrongswan/processing/processor.c index 012b169e3..27e5ab5f6 100644 --- a/src/libstrongswan/processing/processor.c +++ b/src/libstrongswan/processing/processor.c @@ -467,6 +467,8 @@ METHOD(processor_t, cancel, void, { enumerator_t *enumerator; worker_thread_t *worker; + job_t *job; + int i; this->mutex->lock(this->mutex); this->desired_threads = 0; @@ -496,6 +498,14 @@ METHOD(processor_t, cancel, void, worker->thread->join(worker->thread); free(worker); } + for (i = 0; i < JOB_PRIO_MAX; i++) + { + while (this->jobs[i]->remove_first(this->jobs[i], + (void**)&job) == SUCCESS) + { + job->destroy(job); + } + } this->mutex->unlock(this->mutex); } @@ -510,7 +520,7 @@ METHOD(processor_t, destroy, void, this->mutex->destroy(this->mutex); for (i = 0; i < JOB_PRIO_MAX; i++) { - this->jobs[i]->destroy_offset(this->jobs[i], offsetof(job_t, destroy)); + this->jobs[i]->destroy(this->jobs[i]); } this->threads->destroy(this->threads); free(this); diff --git a/src/libstrongswan/processing/watcher.c b/src/libstrongswan/processing/watcher.c index cc3c3a788..3518dfdae 100644 --- a/src/libstrongswan/processing/watcher.c +++ b/src/libstrongswan/processing/watcher.c @@ -24,7 +24,9 @@ #include <unistd.h> #include <errno.h> +#ifndef WIN32 #include <sys/select.h> +#endif #include <fcntl.h> typedef struct private_watcher_t private_watcher_t; @@ -50,6 +52,11 @@ struct private_watcher_t { bool pending; /** + * Is watcher running? + */ + bool running; + + /** * Lock to access FD list */ mutex_t *mutex; @@ -114,7 +121,14 @@ static void update(private_watcher_t *this) this->pending = TRUE; if (this->notify[1] != -1) { - ignore_result(write(this->notify[1], buf, sizeof(buf))); +#ifdef WIN32 + if (send(this->notify[1], buf, sizeof(buf), 0) == -1) +#else + if (write(this->notify[1], buf, sizeof(buf)) == -1) +#endif + { + DBG1(DBG_JOB, "notifying watcher failed: %s", strerror(errno)); + } } } @@ -225,6 +239,7 @@ static void activate_all(private_watcher_t *this) entry->in_callback = 0; } enumerator->destroy(enumerator); + this->running = FALSE; this->condvar->broadcast(this->condvar); this->mutex->unlock(this->mutex); } @@ -238,14 +253,17 @@ static job_requeue_t watch(private_watcher_t *this) entry_t *entry; fd_set rd, wr, ex; int maxfd = 0, res; + bool rebuild = FALSE; FD_ZERO(&rd); FD_ZERO(&wr); FD_ZERO(&ex); this->mutex->lock(this->mutex); + if (this->fds->get_count(this->fds) == 0) { + this->running = FALSE; this->mutex->unlock(this->mutex); return JOB_REQUEUE_NONE; } @@ -282,25 +300,44 @@ static job_requeue_t watch(private_watcher_t *this) enumerator->destroy(enumerator); this->mutex->unlock(this->mutex); - while (TRUE) + while (!rebuild) { char buf[1]; bool old; + ssize_t len; job_t *job; DBG2(DBG_JOB, "watcher going to select()"); thread_cleanup_push((void*)activate_all, this); old = thread_cancelability(TRUE); + res = select(maxfd + 1, &rd, &wr, &ex, NULL); thread_cancelability(old); thread_cleanup_pop(FALSE); + if (res > 0) { if (this->notify[0] != -1 && FD_ISSET(this->notify[0], &rd)) { - DBG2(DBG_JOB, "watcher got notification, rebuilding"); - while (read(this->notify[0], buf, sizeof(buf)) > 0); + while (TRUE) + { +#ifdef WIN32 + len = recv(this->notify[0], buf, sizeof(buf), 0); +#else + len = read(this->notify[0], buf, sizeof(buf)); +#endif + if (len == -1) + { + if (errno != EAGAIN && errno != EWOULDBLOCK) + { + DBG1(DBG_JOB, "reading watcher notify failed: %s", + strerror(errno)); + } + break; + } + } this->pending = FALSE; + DBG2(DBG_JOB, "watcher got notification, rebuilding"); return JOB_REQUEUE_DIRECT; } @@ -308,6 +345,11 @@ static job_requeue_t watch(private_watcher_t *this) enumerator = this->fds->create_enumerator(this->fds); while (enumerator->enumerate(enumerator, &entry)) { + if (entry->in_callback) + { + rebuild = TRUE; + break; + } if (FD_ISSET(entry->fd, &rd) && (entry->events & WATCHER_READ)) { DBG2(DBG_JOB, "watched FD %d ready to read", entry->fd); @@ -347,6 +389,7 @@ static job_requeue_t watch(private_watcher_t *this) return JOB_REQUEUE_DIRECT; } } + return JOB_REQUEUE_DIRECT; } METHOD(watcher_t, add, void, @@ -364,8 +407,9 @@ METHOD(watcher_t, add, void, this->mutex->lock(this->mutex); this->fds->insert_last(this->fds, entry); - if (this->fds->get_count(this->fds) == 1) + if (!this->running) { + this->running = TRUE; lib->processor->queue_job(lib->processor, (job_t*)callback_job_create_with_prio((void*)watch, this, NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL)); @@ -393,7 +437,7 @@ METHOD(watcher_t, remove_, void, { if (entry->fd == fd) { - if (entry->in_callback) + if (this->running && entry->in_callback) { is_in_callback = TRUE; break; @@ -432,13 +476,60 @@ METHOD(watcher_t, destroy, void, free(this); } +#ifdef WIN32 + +/** + * Create notify pipe with a TCP socketpair + */ +static bool create_notify(private_watcher_t *this) +{ + u_long on = 1; + + if (socketpair(AF_INET, SOCK_STREAM, 0, this->notify) == 0) + { + /* use non-blocking I/O on read-end of notify pipe */ + if (ioctlsocket(this->notify[0], FIONBIO, &on) == 0) + { + return TRUE; + } + DBG1(DBG_LIB, "setting watcher notify pipe read-end non-blocking " + "failed: %s", strerror(errno)); + } + return FALSE; +} + +#else /* !WIN32 */ + +/** + * Create a notify pipe with a one-directional pipe + */ +static bool create_notify(private_watcher_t *this) +{ + int flags; + + if (pipe(this->notify) == 0) + { + /* use non-blocking I/O on read-end of notify pipe */ + flags = fcntl(this->notify[0], F_GETFL); + if (flags != -1 && + fcntl(this->notify[0], F_SETFL, flags | O_NONBLOCK) != -1) + { + return TRUE; + } + DBG1(DBG_LIB, "setting watcher notify pipe read-end non-blocking " + "failed: %s", strerror(errno)); + } + return FALSE; +} + +#endif /* !WIN32 */ + /** * See header */ watcher_t *watcher_create() { private_watcher_t *this; - int flags; INIT(this, .public = { @@ -453,18 +544,7 @@ watcher_t *watcher_create() .notify = {-1, -1}, ); - if (pipe(this->notify) == 0) - { - /* use non-blocking I/O on read-end of notify pipe */ - flags = fcntl(this->notify[0], F_GETFL); - if (flags == -1 || - fcntl(this->notify[0], F_SETFL, flags | O_NONBLOCK) == -1) - { - DBG1(DBG_LIB, "setting watcher notify pipe read-end non-blocking " - "failed: %s", strerror(errno)); - } - } - else + if (!create_notify(this)) { DBG1(DBG_LIB, "creating watcher notify pipe failed: %s", strerror(errno)); |