summaryrefslogtreecommitdiff
path: root/src/libstrongswan/networking/streams/stream_service.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstrongswan/networking/streams/stream_service.c')
-rw-r--r--src/libstrongswan/networking/streams/stream_service.c88
1 files changed, 79 insertions, 9 deletions
diff --git a/src/libstrongswan/networking/streams/stream_service.c b/src/libstrongswan/networking/streams/stream_service.c
index 7358c580e..09138c76a 100644
--- a/src/libstrongswan/networking/streams/stream_service.c
+++ b/src/libstrongswan/networking/streams/stream_service.c
@@ -68,6 +68,11 @@ struct private_stream_service_t {
u_int active;
/**
+ * Currently running jobs
+ */
+ u_int running;
+
+ /**
* mutex to lock active counter
*/
mutex_t *mutex;
@@ -76,8 +81,29 @@ struct private_stream_service_t {
* Condvar to wait for callback termination
*/
condvar_t *condvar;
+
+ /**
+ * TRUE when the service is terminated
+ */
+ bool terminated;
+
+ /**
+ * Reference counter
+ */
+ refcount_t ref;
};
+static void destroy_service(private_stream_service_t *this)
+{
+ if (ref_put(&this->ref))
+ {
+ close(this->fd);
+ this->mutex->destroy(this->mutex);
+ this->condvar->destroy(this->condvar);
+ free(this);
+ }
+}
+
/**
* Data to pass to async accept job
*/
@@ -93,6 +119,11 @@ typedef struct {
} async_data_t;
/**
+ * Forward declaration
+ */
+static bool watch(private_stream_service_t *this, int fd, watcher_event_t event);
+
+/**
* Clean up accept data
*/
static void destroy_async_data(async_data_t *data)
@@ -100,14 +131,15 @@ static void destroy_async_data(async_data_t *data)
private_stream_service_t *this = data->this;
this->mutex->lock(this->mutex);
- if (this->active-- == this->cncrncy)
+ if (this->active-- == this->cncrncy && !this->terminated)
{
/* leaving concurrency limit, restart accept()ing. */
- this->public.on_accept(&this->public, this->cb, this->data,
- this->prio, this->cncrncy);
+ lib->watcher->add(lib->watcher, this->fd,
+ WATCHER_READ, (watcher_cb_t)watch, this);
}
this->condvar->signal(this->condvar);
this->mutex->unlock(this->mutex);
+ destroy_service(this);
if (data->fd != -1)
{
@@ -117,19 +149,45 @@ static void destroy_async_data(async_data_t *data)
}
/**
+ * Reduce running counter
+ */
+CALLBACK(reduce_running, void,
+ async_data_t *data)
+{
+ private_stream_service_t *this = data->this;
+
+ this->mutex->lock(this->mutex);
+ this->running--;
+ this->condvar->signal(this->condvar);
+ this->mutex->unlock(this->mutex);
+}
+
+/**
* Async processing of accepted connection
*/
static job_requeue_t accept_async(async_data_t *data)
{
+ private_stream_service_t *this = data->this;
stream_t *stream;
+ this->mutex->lock(this->mutex);
+ if (this->terminated)
+ {
+ this->mutex->unlock(this->mutex);
+ return JOB_REQUEUE_NONE;
+ }
+ this->running++;
+ this->mutex->unlock(this->mutex);
+
stream = stream_create_from_fd(data->fd);
if (stream)
{
/* FD is now owned by stream, don't close it during cleanup */
data->fd = -1;
+ thread_cleanup_push(reduce_running, data);
thread_cleanup_push((void*)stream->destroy, stream);
thread_cleanup_pop(!data->cb(data->data, stream));
+ thread_cleanup_pop(TRUE);
}
return JOB_REQUEUE_NONE;
}
@@ -149,7 +207,7 @@ static bool watch(private_stream_service_t *this, int fd, watcher_event_t event)
.this = this,
);
- if (data->fd != -1)
+ if (data->fd != -1 && !this->terminated)
{
this->mutex->lock(this->mutex);
if (++this->active == this->cncrncy)
@@ -158,6 +216,7 @@ static bool watch(private_stream_service_t *this, int fd, watcher_event_t event)
keep = FALSE;
}
this->mutex->unlock(this->mutex);
+ ref_get(&this->ref);
lib->processor->queue_job(lib->processor,
(job_t*)callback_job_create_with_prio((void*)accept_async, data,
@@ -177,6 +236,12 @@ METHOD(stream_service_t, on_accept, void,
{
this->mutex->lock(this->mutex);
+ if (this->terminated)
+ {
+ this->mutex->unlock(this->mutex);
+ return;
+ }
+
/* wait for all callbacks to return */
while (this->active)
{
@@ -208,11 +273,15 @@ METHOD(stream_service_t, on_accept, void,
METHOD(stream_service_t, destroy, void,
private_stream_service_t *this)
{
- on_accept(this, NULL, NULL, this->prio, this->cncrncy);
- close(this->fd);
- this->mutex->destroy(this->mutex);
- this->condvar->destroy(this->condvar);
- free(this);
+ this->mutex->lock(this->mutex);
+ lib->watcher->remove(lib->watcher, this->fd);
+ this->terminated = TRUE;
+ while (this->running)
+ {
+ this->condvar->wait(this->condvar, this->mutex);
+ }
+ this->mutex->unlock(this->mutex);
+ destroy_service(this);
}
/**
@@ -231,6 +300,7 @@ stream_service_t *stream_service_create_from_fd(int fd)
.prio = JOB_PRIO_MEDIUM,
.mutex = mutex_create(MUTEX_TYPE_RECURSIVE),
.condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
+ .ref = 1,
);
return &this->public;