aboutsummaryrefslogtreecommitdiffstats
path: root/src/libstrongswan/networking/streams/stream_service.c
diff options
context:
space:
mode:
authorMartin Willi <martin@revosec.ch>2013-06-28 11:50:59 +0200
committerMartin Willi <martin@revosec.ch>2013-07-18 16:00:28 +0200
commit70d1ccec963e14c755683cefe33af90f51035560 (patch)
tree08bcc1fbcaa4fe7bb30eba82e35f5043684d3e43 /src/libstrongswan/networking/streams/stream_service.c
parentdb0e160ba28c0ec355f76f88033b0a3a2277deaa (diff)
downloadstrongswan-70d1ccec963e14c755683cefe33af90f51035560.tar.bz2
strongswan-70d1ccec963e14c755683cefe33af90f51035560.tar.xz
stream: add a concurrency option to services, limiting parallel callbacks
Diffstat (limited to 'src/libstrongswan/networking/streams/stream_service.c')
-rw-r--r--src/libstrongswan/networking/streams/stream_service.c67
1 files changed, 64 insertions, 3 deletions
diff --git a/src/libstrongswan/networking/streams/stream_service.c b/src/libstrongswan/networking/streams/stream_service.c
index 5f2905146..34d45a067 100644
--- a/src/libstrongswan/networking/streams/stream_service.c
+++ b/src/libstrongswan/networking/streams/stream_service.c
@@ -15,6 +15,8 @@
#include <library.h>
#include <threading/thread.h>
+#include <threading/mutex.h>
+#include <threading/condvar.h>
#include <processing/jobs/callback_job.h>
#include <errno.h>
@@ -54,6 +56,26 @@ struct private_stream_service_t {
* Job priority to invoke callback with
*/
job_priority_t prio;
+
+ /**
+ * Maximum number of parallel callback invocations
+ */
+ u_int cncrncy;
+
+ /**
+ * Currently active jobs
+ */
+ u_int active;
+
+ /**
+ * mutex to lock active counter
+ */
+ mutex_t *mutex;
+
+ /**
+ * Condvar to wait for callback termination
+ */
+ condvar_t *condvar;
};
/**
@@ -66,6 +88,8 @@ typedef struct {
void *data;
/** accepted connection */
int fd;
+ /** reference to stream service */
+ private_stream_service_t *this;
} async_data_t;
/**
@@ -73,6 +97,18 @@ typedef struct {
*/
static void destroy_async_data(async_data_t *data)
{
+ private_stream_service_t *this = data->this;
+
+ this->mutex->lock(this->mutex);
+ if (this->active-- == this->cncrncy)
+ {
+ /* leaving concurrency limit, restart accept()ing. */
+ this->public.on_accept(&this->public, this->cb, this->data,
+ this->prio, this->cncrncy);
+ }
+ this->condvar->signal(this->condvar);
+ this->mutex->unlock(this->mutex);
+
close(data->fd);
free(data);
}
@@ -100,15 +136,25 @@ static job_requeue_t accept_async(async_data_t *data)
static bool watch(private_stream_service_t *this, int fd, watcher_event_t event)
{
async_data_t *data;
+ bool keep = TRUE;
INIT(data,
.cb = this->cb,
.data = this->data,
.fd = accept(fd, NULL, NULL),
+ .this = this,
);
if (data->fd != -1)
{
+ this->mutex->lock(this->mutex);
+ if (++this->active == this->cncrncy)
+ {
+ /* concurrency limit reached, stop accept()ing new connections */
+ keep = FALSE;
+ }
+ this->mutex->unlock(this->mutex);
+
lib->processor->queue_job(lib->processor,
(job_t*)callback_job_create_with_prio((void*)accept_async, data,
(void*)destroy_async_data, NULL, this->prio));
@@ -117,13 +163,21 @@ static bool watch(private_stream_service_t *this, int fd, watcher_event_t event)
{
free(data);
}
- return TRUE;
+ return keep;
}
METHOD(stream_service_t, on_accept, void,
private_stream_service_t *this, stream_service_cb_t cb, void *data,
- job_priority_t prio)
+ job_priority_t prio, u_int cncrncy)
{
+ this->mutex->lock(this->mutex);
+
+ /* wait for all callbacks to return */
+ while (this->active)
+ {
+ this->condvar->wait(this->condvar, this->mutex);
+ }
+
if (this->cb)
{
lib->watcher->remove(lib->watcher, this->fd);
@@ -135,19 +189,24 @@ METHOD(stream_service_t, on_accept, void,
{
this->prio = prio;
}
+ this->cncrncy = cncrncy;
if (this->cb)
{
lib->watcher->add(lib->watcher, this->fd,
WATCHER_READ, (watcher_cb_t)watch, this);
}
+
+ this->mutex->unlock(this->mutex);
}
METHOD(stream_service_t, destroy, void,
private_stream_service_t *this)
{
- on_accept(this, NULL, NULL, this->prio);
+ on_accept(this, NULL, NULL, this->prio, this->cncrncy);
close(this->fd);
+ this->mutex->destroy(this->mutex);
+ this->condvar->destroy(this->condvar);
free(this);
}
@@ -165,6 +224,8 @@ stream_service_t *stream_service_create_from_fd(int fd)
},
.fd = fd,
.prio = JOB_PRIO_MEDIUM,
+ .mutex = mutex_create(MUTEX_TYPE_RECURSIVE),
+ .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
);
return &this->public;