aboutsummaryrefslogtreecommitdiffstats
path: root/src/libstrongswan/networking/streams/stream_service.c
diff options
context:
space:
mode:
authorTobias Brunner <tobias@strongswan.org>2014-07-21 12:23:37 +0200
committerTobias Brunner <tobias@strongswan.org>2014-09-09 10:58:59 +0200
commitcee338eccd57b4c2bbcdc899637cbbbbafa608c5 (patch)
treec60fbcf7617097e1ce88d3d05cc16599c6fa8282 /src/libstrongswan/networking/streams/stream_service.c
parent10859adfd4268a819766b436e9b5f40bc69c54bb (diff)
downloadstrongswan-cee338eccd57b4c2bbcdc899637cbbbbafa608c5.tar.bz2
strongswan-cee338eccd57b4c2bbcdc899637cbbbbafa608c5.tar.xz
stream-service: Prevent race conditions due to blocking call to destroy()
In the previous implementation queued jobs could prevent a service from getting destroyed. This could have lead to a deadlock when the processor is cancelled. Now destroy() still blocks, but waits only for actually running tasks. The service instance is reference counted so that queued jobs can safely be destroyed.
Diffstat (limited to 'src/libstrongswan/networking/streams/stream_service.c')
-rw-r--r--src/libstrongswan/networking/streams/stream_service.c67
1 files changed, 62 insertions, 5 deletions
diff --git a/src/libstrongswan/networking/streams/stream_service.c b/src/libstrongswan/networking/streams/stream_service.c
index 6ce37e887..09138c76a 100644
--- a/src/libstrongswan/networking/streams/stream_service.c
+++ b/src/libstrongswan/networking/streams/stream_service.c
@@ -68,6 +68,11 @@ struct private_stream_service_t {
u_int active;
/**
+ * Currently running jobs
+ */
+ u_int running;
+
+ /**
* mutex to lock active counter
*/
mutex_t *mutex;
@@ -81,8 +86,24 @@ struct private_stream_service_t {
* TRUE when the service is terminated
*/
bool terminated;
+
+ /**
+ * Reference counter
+ */
+ refcount_t ref;
};
+static void destroy_service(private_stream_service_t *this)
+{
+ if (ref_put(&this->ref))
+ {
+ close(this->fd);
+ this->mutex->destroy(this->mutex);
+ this->condvar->destroy(this->condvar);
+ free(this);
+ }
+}
+
/**
* Data to pass to async accept job
*/
@@ -118,6 +139,7 @@ static void destroy_async_data(async_data_t *data)
}
this->condvar->signal(this->condvar);
this->mutex->unlock(this->mutex);
+ destroy_service(this);
if (data->fd != -1)
{
@@ -127,19 +149,45 @@ static void destroy_async_data(async_data_t *data)
}
/**
+ * Reduce running counter
+ */
+CALLBACK(reduce_running, void,
+ async_data_t *data)
+{
+ private_stream_service_t *this = data->this;
+
+ this->mutex->lock(this->mutex);
+ this->running--;
+ this->condvar->signal(this->condvar);
+ this->mutex->unlock(this->mutex);
+}
+
+/**
* Async processing of accepted connection
*/
static job_requeue_t accept_async(async_data_t *data)
{
+ private_stream_service_t *this = data->this;
stream_t *stream;
+ this->mutex->lock(this->mutex);
+ if (this->terminated)
+ {
+ this->mutex->unlock(this->mutex);
+ return JOB_REQUEUE_NONE;
+ }
+ this->running++;
+ this->mutex->unlock(this->mutex);
+
stream = stream_create_from_fd(data->fd);
if (stream)
{
/* FD is now owned by stream, don't close it during cleanup */
data->fd = -1;
+ thread_cleanup_push(reduce_running, data);
thread_cleanup_push((void*)stream->destroy, stream);
thread_cleanup_pop(!data->cb(data->data, stream));
+ thread_cleanup_pop(TRUE);
}
return JOB_REQUEUE_NONE;
}
@@ -168,6 +216,7 @@ static bool watch(private_stream_service_t *this, int fd, watcher_event_t event)
keep = FALSE;
}
this->mutex->unlock(this->mutex);
+ ref_get(&this->ref);
lib->processor->queue_job(lib->processor,
(job_t*)callback_job_create_with_prio((void*)accept_async, data,
@@ -187,6 +236,12 @@ METHOD(stream_service_t, on_accept, void,
{
this->mutex->lock(this->mutex);
+ if (this->terminated)
+ {
+ this->mutex->unlock(this->mutex);
+ return;
+ }
+
/* wait for all callbacks to return */
while (this->active)
{
@@ -219,13 +274,14 @@ METHOD(stream_service_t, destroy, void,
private_stream_service_t *this)
{
this->mutex->lock(this->mutex);
+ lib->watcher->remove(lib->watcher, this->fd);
this->terminated = TRUE;
+ while (this->running)
+ {
+ this->condvar->wait(this->condvar, this->mutex);
+ }
this->mutex->unlock(this->mutex);
- on_accept(this, NULL, NULL, this->prio, this->cncrncy);
- close(this->fd);
- this->mutex->destroy(this->mutex);
- this->condvar->destroy(this->condvar);
- free(this);
+ destroy_service(this);
}
/**
@@ -244,6 +300,7 @@ stream_service_t *stream_service_create_from_fd(int fd)
.prio = JOB_PRIO_MEDIUM,
.mutex = mutex_create(MUTEX_TYPE_RECURSIVE),
.condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
+ .ref = 1,
);
return &this->public;