diff options
author | Martin Willi <martin@revosec.ch> | 2013-06-28 11:50:59 +0200 |
---|---|---|
committer | Martin Willi <martin@revosec.ch> | 2013-07-18 16:00:28 +0200 |
commit | 70d1ccec963e14c755683cefe33af90f51035560 (patch) | |
tree | 08bcc1fbcaa4fe7bb30eba82e35f5043684d3e43 /src/libstrongswan/networking/streams/stream_service.c | |
parent | db0e160ba28c0ec355f76f88033b0a3a2277deaa (diff) | |
download | strongswan-70d1ccec963e14c755683cefe33af90f51035560.tar.bz2 strongswan-70d1ccec963e14c755683cefe33af90f51035560.tar.xz |
stream: add a concurrency option to services, limiting parallel callbacks
Diffstat (limited to 'src/libstrongswan/networking/streams/stream_service.c')
-rw-r--r-- | src/libstrongswan/networking/streams/stream_service.c | 67 |
1 files changed, 64 insertions, 3 deletions
diff --git a/src/libstrongswan/networking/streams/stream_service.c b/src/libstrongswan/networking/streams/stream_service.c index 5f2905146..34d45a067 100644 --- a/src/libstrongswan/networking/streams/stream_service.c +++ b/src/libstrongswan/networking/streams/stream_service.c @@ -15,6 +15,8 @@ #include <library.h> #include <threading/thread.h> +#include <threading/mutex.h> +#include <threading/condvar.h> #include <processing/jobs/callback_job.h> #include <errno.h> @@ -54,6 +56,26 @@ struct private_stream_service_t { * Job priority to invoke callback with */ job_priority_t prio; + + /** + * Maximum number of parallel callback invocations + */ + u_int cncrncy; + + /** + * Currently active jobs + */ + u_int active; + + /** + * mutex to lock active counter + */ + mutex_t *mutex; + + /** + * Condvar to wait for callback termination + */ + condvar_t *condvar; }; /** @@ -66,6 +88,8 @@ typedef struct { void *data; /** accepted connection */ int fd; + /** reference to stream service */ + private_stream_service_t *this; } async_data_t; /** @@ -73,6 +97,18 @@ typedef struct { */ static void destroy_async_data(async_data_t *data) { + private_stream_service_t *this = data->this; + + this->mutex->lock(this->mutex); + if (this->active-- == this->cncrncy) + { + /* leaving concurrency limit, restart accept()ing. */ + this->public.on_accept(&this->public, this->cb, this->data, + this->prio, this->cncrncy); + } + this->condvar->signal(this->condvar); + this->mutex->unlock(this->mutex); + close(data->fd); free(data); } @@ -100,15 +136,25 @@ static job_requeue_t accept_async(async_data_t *data) static bool watch(private_stream_service_t *this, int fd, watcher_event_t event) { async_data_t *data; + bool keep = TRUE; INIT(data, .cb = this->cb, .data = this->data, .fd = accept(fd, NULL, NULL), + .this = this, ); if (data->fd != -1) { + this->mutex->lock(this->mutex); + if (++this->active == this->cncrncy) + { + /* concurrency limit reached, stop accept()ing new connections */ + keep = FALSE; + } + this->mutex->unlock(this->mutex); + lib->processor->queue_job(lib->processor, (job_t*)callback_job_create_with_prio((void*)accept_async, data, (void*)destroy_async_data, NULL, this->prio)); @@ -117,13 +163,21 @@ static bool watch(private_stream_service_t *this, int fd, watcher_event_t event) { free(data); } - return TRUE; + return keep; } METHOD(stream_service_t, on_accept, void, private_stream_service_t *this, stream_service_cb_t cb, void *data, - job_priority_t prio) + job_priority_t prio, u_int cncrncy) { + this->mutex->lock(this->mutex); + + /* wait for all callbacks to return */ + while (this->active) + { + this->condvar->wait(this->condvar, this->mutex); + } + if (this->cb) { lib->watcher->remove(lib->watcher, this->fd); @@ -135,19 +189,24 @@ METHOD(stream_service_t, on_accept, void, { this->prio = prio; } + this->cncrncy = cncrncy; if (this->cb) { lib->watcher->add(lib->watcher, this->fd, WATCHER_READ, (watcher_cb_t)watch, this); } + + this->mutex->unlock(this->mutex); } METHOD(stream_service_t, destroy, void, private_stream_service_t *this) { - on_accept(this, NULL, NULL, this->prio); + on_accept(this, NULL, NULL, this->prio, this->cncrncy); close(this->fd); + this->mutex->destroy(this->mutex); + this->condvar->destroy(this->condvar); free(this); } @@ -165,6 +224,8 @@ stream_service_t *stream_service_create_from_fd(int fd) }, .fd = fd, .prio = JOB_PRIO_MEDIUM, + .mutex = mutex_create(MUTEX_TYPE_RECURSIVE), + .condvar = condvar_create(CONDVAR_TYPE_DEFAULT), ); return &this->public; |