diff options
Diffstat (limited to 'src/libstrongswan/networking/streams/stream_service.c')
-rw-r--r-- | src/libstrongswan/networking/streams/stream_service.c | 88 |
1 files changed, 79 insertions, 9 deletions
diff --git a/src/libstrongswan/networking/streams/stream_service.c b/src/libstrongswan/networking/streams/stream_service.c index 7358c580e..09138c76a 100644 --- a/src/libstrongswan/networking/streams/stream_service.c +++ b/src/libstrongswan/networking/streams/stream_service.c @@ -68,6 +68,11 @@ struct private_stream_service_t { u_int active; /** + * Currently running jobs + */ + u_int running; + + /** * mutex to lock active counter */ mutex_t *mutex; @@ -76,8 +81,29 @@ struct private_stream_service_t { * Condvar to wait for callback termination */ condvar_t *condvar; + + /** + * TRUE when the service is terminated + */ + bool terminated; + + /** + * Reference counter + */ + refcount_t ref; }; +static void destroy_service(private_stream_service_t *this) +{ + if (ref_put(&this->ref)) + { + close(this->fd); + this->mutex->destroy(this->mutex); + this->condvar->destroy(this->condvar); + free(this); + } +} + /** * Data to pass to async accept job */ @@ -93,6 +119,11 @@ typedef struct { } async_data_t; /** + * Forward declaration + */ +static bool watch(private_stream_service_t *this, int fd, watcher_event_t event); + +/** * Clean up accept data */ static void destroy_async_data(async_data_t *data) @@ -100,14 +131,15 @@ static void destroy_async_data(async_data_t *data) private_stream_service_t *this = data->this; this->mutex->lock(this->mutex); - if (this->active-- == this->cncrncy) + if (this->active-- == this->cncrncy && !this->terminated) { /* leaving concurrency limit, restart accept()ing. */ - this->public.on_accept(&this->public, this->cb, this->data, - this->prio, this->cncrncy); + lib->watcher->add(lib->watcher, this->fd, + WATCHER_READ, (watcher_cb_t)watch, this); } this->condvar->signal(this->condvar); this->mutex->unlock(this->mutex); + destroy_service(this); if (data->fd != -1) { @@ -117,19 +149,45 @@ static void destroy_async_data(async_data_t *data) } /** + * Reduce running counter + */ +CALLBACK(reduce_running, void, + async_data_t *data) +{ + private_stream_service_t *this = data->this; + + this->mutex->lock(this->mutex); + this->running--; + this->condvar->signal(this->condvar); + this->mutex->unlock(this->mutex); +} + +/** * Async processing of accepted connection */ static job_requeue_t accept_async(async_data_t *data) { + private_stream_service_t *this = data->this; stream_t *stream; + this->mutex->lock(this->mutex); + if (this->terminated) + { + this->mutex->unlock(this->mutex); + return JOB_REQUEUE_NONE; + } + this->running++; + this->mutex->unlock(this->mutex); + stream = stream_create_from_fd(data->fd); if (stream) { /* FD is now owned by stream, don't close it during cleanup */ data->fd = -1; + thread_cleanup_push(reduce_running, data); thread_cleanup_push((void*)stream->destroy, stream); thread_cleanup_pop(!data->cb(data->data, stream)); + thread_cleanup_pop(TRUE); } return JOB_REQUEUE_NONE; } @@ -149,7 +207,7 @@ static bool watch(private_stream_service_t *this, int fd, watcher_event_t event) .this = this, ); - if (data->fd != -1) + if (data->fd != -1 && !this->terminated) { this->mutex->lock(this->mutex); if (++this->active == this->cncrncy) @@ -158,6 +216,7 @@ static bool watch(private_stream_service_t *this, int fd, watcher_event_t event) keep = FALSE; } this->mutex->unlock(this->mutex); + ref_get(&this->ref); lib->processor->queue_job(lib->processor, (job_t*)callback_job_create_with_prio((void*)accept_async, data, @@ -177,6 +236,12 @@ METHOD(stream_service_t, on_accept, void, { this->mutex->lock(this->mutex); + if (this->terminated) + { + this->mutex->unlock(this->mutex); + return; + } + /* wait for all callbacks to return */ while (this->active) { @@ -208,11 +273,15 @@ METHOD(stream_service_t, on_accept, void, METHOD(stream_service_t, destroy, void, private_stream_service_t *this) { - on_accept(this, NULL, NULL, this->prio, this->cncrncy); - close(this->fd); - this->mutex->destroy(this->mutex); - this->condvar->destroy(this->condvar); - free(this); + this->mutex->lock(this->mutex); + lib->watcher->remove(lib->watcher, this->fd); + this->terminated = TRUE; + while (this->running) + { + this->condvar->wait(this->condvar, this->mutex); + } + this->mutex->unlock(this->mutex); + destroy_service(this); } /** @@ -231,6 +300,7 @@ stream_service_t *stream_service_create_from_fd(int fd) .prio = JOB_PRIO_MEDIUM, .mutex = mutex_create(MUTEX_TYPE_RECURSIVE), .condvar = condvar_create(CONDVAR_TYPE_DEFAULT), + .ref = 1, ); return &this->public; |