aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/libstrongswan/networking/streams/stream_service.c67
1 files changed, 62 insertions, 5 deletions
diff --git a/src/libstrongswan/networking/streams/stream_service.c b/src/libstrongswan/networking/streams/stream_service.c
index 6ce37e887..09138c76a 100644
--- a/src/libstrongswan/networking/streams/stream_service.c
+++ b/src/libstrongswan/networking/streams/stream_service.c
@@ -68,6 +68,11 @@ struct private_stream_service_t {
u_int active;
/**
+ * Currently running jobs
+ */
+ u_int running;
+
+ /**
* mutex to lock active counter
*/
mutex_t *mutex;
@@ -81,8 +86,24 @@ struct private_stream_service_t {
* TRUE when the service is terminated
*/
bool terminated;
+
+ /**
+ * Reference counter
+ */
+ refcount_t ref;
};
+static void destroy_service(private_stream_service_t *this)
+{
+ if (ref_put(&this->ref))
+ {
+ close(this->fd);
+ this->mutex->destroy(this->mutex);
+ this->condvar->destroy(this->condvar);
+ free(this);
+ }
+}
+
/**
* Data to pass to async accept job
*/
@@ -118,6 +139,7 @@ static void destroy_async_data(async_data_t *data)
}
this->condvar->signal(this->condvar);
this->mutex->unlock(this->mutex);
+ destroy_service(this);
if (data->fd != -1)
{
@@ -127,19 +149,45 @@ static void destroy_async_data(async_data_t *data)
}
/**
+ * Reduce running counter
+ */
+CALLBACK(reduce_running, void,
+ async_data_t *data)
+{
+ private_stream_service_t *this = data->this;
+
+ this->mutex->lock(this->mutex);
+ this->running--;
+ this->condvar->signal(this->condvar);
+ this->mutex->unlock(this->mutex);
+}
+
+/**
* Async processing of accepted connection
*/
static job_requeue_t accept_async(async_data_t *data)
{
+ private_stream_service_t *this = data->this;
stream_t *stream;
+ this->mutex->lock(this->mutex);
+ if (this->terminated)
+ {
+ this->mutex->unlock(this->mutex);
+ return JOB_REQUEUE_NONE;
+ }
+ this->running++;
+ this->mutex->unlock(this->mutex);
+
stream = stream_create_from_fd(data->fd);
if (stream)
{
/* FD is now owned by stream, don't close it during cleanup */
data->fd = -1;
+ thread_cleanup_push(reduce_running, data);
thread_cleanup_push((void*)stream->destroy, stream);
thread_cleanup_pop(!data->cb(data->data, stream));
+ thread_cleanup_pop(TRUE);
}
return JOB_REQUEUE_NONE;
}
@@ -168,6 +216,7 @@ static bool watch(private_stream_service_t *this, int fd, watcher_event_t event)
keep = FALSE;
}
this->mutex->unlock(this->mutex);
+ ref_get(&this->ref);
lib->processor->queue_job(lib->processor,
(job_t*)callback_job_create_with_prio((void*)accept_async, data,
@@ -187,6 +236,12 @@ METHOD(stream_service_t, on_accept, void,
{
this->mutex->lock(this->mutex);
+ if (this->terminated)
+ {
+ this->mutex->unlock(this->mutex);
+ return;
+ }
+
/* wait for all callbacks to return */
while (this->active)
{
@@ -219,13 +274,14 @@ METHOD(stream_service_t, destroy, void,
private_stream_service_t *this)
{
this->mutex->lock(this->mutex);
+ lib->watcher->remove(lib->watcher, this->fd);
this->terminated = TRUE;
+ while (this->running)
+ {
+ this->condvar->wait(this->condvar, this->mutex);
+ }
this->mutex->unlock(this->mutex);
- on_accept(this, NULL, NULL, this->prio, this->cncrncy);
- close(this->fd);
- this->mutex->destroy(this->mutex);
- this->condvar->destroy(this->condvar);
- free(this);
+ destroy_service(this);
}
/**
@@ -244,6 +300,7 @@ stream_service_t *stream_service_create_from_fd(int fd)
.prio = JOB_PRIO_MEDIUM,
.mutex = mutex_create(MUTEX_TYPE_RECURSIVE),
.condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
+ .ref = 1,
);
return &this->public;