aboutsummaryrefslogtreecommitdiffstats
path: root/src/libcharon/plugins/stroke/stroke_socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/libcharon/plugins/stroke/stroke_socket.c')
-rw-r--r--src/libcharon/plugins/stroke/stroke_socket.c131
1 files changed, 112 insertions, 19 deletions
diff --git a/src/libcharon/plugins/stroke/stroke_socket.c b/src/libcharon/plugins/stroke/stroke_socket.c
index 21d15afe6..4956b011f 100644
--- a/src/libcharon/plugins/stroke/stroke_socket.c
+++ b/src/libcharon/plugins/stroke/stroke_socket.c
@@ -1,4 +1,5 @@
/*
+ * Copyright (C) 2011 Tobias Brunner
* Copyright (C) 2008 Martin Willi
* Hochschule fuer Technik Rapperswil
*
@@ -25,7 +26,10 @@
#include <hydra.h>
#include <daemon.h>
+#include <threading/mutex.h>
#include <threading/thread.h>
+#include <threading/condvar.h>
+#include <utils/linked_list.h>
#include <processing/jobs/callback_job.h>
#include "stroke_config.h"
@@ -35,6 +39,12 @@
#include "stroke_attribute.h"
#include "stroke_list.h"
+/**
+ * To avoid clogging the thread pool with (blocking) jobs, we limit the number
+ * of concurrently handled stroke commands.
+ */
+#define MAX_CONCURRENT_DEFAULT 4
+
typedef struct stroke_job_context_t stroke_job_context_t;
typedef struct private_stroke_socket_t private_stroke_socket_t;
@@ -56,7 +66,37 @@ struct private_stroke_socket_t {
/**
* job accepting stroke messages
*/
- callback_job_t *job;
+ callback_job_t *receiver;
+
+ /**
+ * job handling stroke messages
+ */
+ callback_job_t *handler;
+
+ /**
+ * queued stroke commands
+ */
+ linked_list_t *commands;
+
+ /**
+ * lock for command list
+ */
+ mutex_t *mutex;
+
+ /**
+ * condvar to signal the arrival or completion of commands
+ */
+ condvar_t *condvar;
+
+ /**
+ * the number of currently handled commands
+ */
+ u_int handling;
+
+ /**
+ * the maximum number of concurrently handled commands
+ */
+ u_int max_concurrent;
/**
* configuration backend
@@ -84,7 +124,7 @@ struct private_stroke_socket_t {
stroke_ca_t *ca;
/**
- * Status information logging
+ * status information logging
*/
stroke_list_t *list;
};
@@ -450,7 +490,7 @@ static void stroke_loglevel(private_stroke_socket_t *this,
msg->loglevel.level, msg->loglevel.type);
group = enum_from_name(debug_names, msg->loglevel.type);
- if (group < 0)
+ if ((int)group < 0)
{
fprintf(out, "invalid type (%s)!\n", msg->loglevel.type);
return;
@@ -492,6 +532,18 @@ static void stroke_job_context_destroy(stroke_job_context_t *this)
}
/**
+ * called to signal the completion of a command
+ */
+static inline job_requeue_t job_processed(private_stroke_socket_t *this)
+{
+ this->mutex->lock(this->mutex);
+ this->handling--;
+ this->condvar->signal(this->condvar);
+ this->mutex->unlock(this->mutex);
+ return JOB_REQUEUE_NONE;
+}
+
+/**
* process a stroke request from the socket pointed by "fd"
*/
static job_requeue_t process(stroke_job_context_t *ctx)
@@ -509,7 +561,7 @@ static job_requeue_t process(stroke_job_context_t *ctx)
{
DBG1(DBG_CFG, "reading length of stroke message failed: %s",
strerror(errno));
- return JOB_REQUEUE_NONE;
+ return job_processed(this);
}
/* read message */
@@ -518,14 +570,14 @@ static job_requeue_t process(stroke_job_context_t *ctx)
if (bytes_read != msg_length)
{
DBG1(DBG_CFG, "reading stroke message failed: %s", strerror(errno));
- return JOB_REQUEUE_NONE;
+ return job_processed(this);
}
out = fdopen(strokefd, "w+");
if (out == NULL)
{
DBG1(DBG_CFG, "opening stroke output channel failed: %s", strerror(errno));
- return JOB_REQUEUE_NONE;
+ return job_processed(this);
}
DBG3(DBG_CFG, "stroke message %b", (void*)msg, msg_length);
@@ -602,11 +654,38 @@ static job_requeue_t process(stroke_job_context_t *ctx)
fclose(out);
/* fclose() closes underlying FD */
ctx->fd = 0;
- return JOB_REQUEUE_NONE;
+ return job_processed(this);
}
/**
- * Implementation of private_stroke_socket_t.stroke_receive.
+ * Handle queued stroke commands
+ */
+static job_requeue_t handle(private_stroke_socket_t *this)
+{
+ stroke_job_context_t *ctx;
+ callback_job_t *job;
+ bool oldstate;
+
+ this->mutex->lock(this->mutex);
+ thread_cleanup_push((thread_cleanup_t)this->mutex->unlock, this->mutex);
+ oldstate = thread_cancelability(TRUE);
+ while (this->commands->get_count(this->commands) == 0 ||
+ this->handling >= this->max_concurrent)
+ {
+ this->condvar->wait(this->condvar, this->mutex);
+ }
+ thread_cancelability(oldstate);
+ this->commands->remove_first(this->commands, (void**)&ctx);
+ this->handling++;
+ thread_cleanup_pop(TRUE);
+ job = callback_job_create_with_prio((callback_job_cb_t)process, ctx,
+ (void*)stroke_job_context_destroy, this->handler, JOB_PRIO_HIGH);
+ lib->processor->queue_job(lib->processor, (job_t*)job);
+ return JOB_REQUEUE_DIRECT;
+}
+
+/**
+ * Accept stroke commands and queue them to be handled
*/
static job_requeue_t receive(private_stroke_socket_t *this)
{
@@ -614,7 +693,6 @@ static job_requeue_t receive(private_stroke_socket_t *this)
int strokeaddrlen = sizeof(strokeaddr);
int strokefd;
bool oldstate;
- callback_job_t *job;
stroke_job_context_t *ctx;
oldstate = thread_cancelability(TRUE);
@@ -627,17 +705,18 @@ static job_requeue_t receive(private_stroke_socket_t *this)
return JOB_REQUEUE_FAIR;
}
- ctx = malloc_thing(stroke_job_context_t);
- ctx->fd = strokefd;
- ctx->this = this;
- job = callback_job_create_with_prio((callback_job_cb_t)process,
- ctx, (void*)stroke_job_context_destroy, this->job, JOB_PRIO_HIGH);
- lib->processor->queue_job(lib->processor, (job_t*)job);
+ INIT(ctx,
+ .fd = strokefd,
+ .this = this,
+ );
+ this->mutex->lock(this->mutex);
+ this->commands->insert_last(this->commands, ctx);
+ this->condvar->signal(this->condvar);
+ this->mutex->unlock(this->mutex);
return JOB_REQUEUE_FAIR;
}
-
/**
* initialize and open stroke socket
*/
@@ -685,7 +764,11 @@ static bool open_socket(private_stroke_socket_t *this)
METHOD(stroke_socket_t, destroy, void,
private_stroke_socket_t *this)
{
- this->job->cancel(this->job);
+ this->handler->cancel(this->handler);
+ this->receiver->cancel(this->receiver);
+ this->commands->destroy_function(this->commands, (void*)stroke_job_context_destroy);
+ this->condvar->destroy(this->condvar);
+ this->mutex->destroy(this->mutex);
lib->credmgr->remove_set(lib->credmgr, &this->ca->set);
lib->credmgr->remove_set(lib->credmgr, &this->cred->set);
charon->backends->remove_backend(charon->backends, &this->config->backend);
@@ -725,14 +808,24 @@ stroke_socket_t *stroke_socket_create()
this->control = stroke_control_create();
this->list = stroke_list_create(this->attribute);
+ this->mutex = mutex_create(MUTEX_TYPE_DEFAULT);
+ this->condvar = condvar_create(CONDVAR_TYPE_DEFAULT);
+ this->commands = linked_list_create();
+ this->max_concurrent = lib->settings->get_int(lib->settings,
+ "charon.plugins.stroke.max_concurrent", MAX_CONCURRENT_DEFAULT);
+
lib->credmgr->add_set(lib->credmgr, &this->ca->set);
lib->credmgr->add_set(lib->credmgr, &this->cred->set);
charon->backends->add_backend(charon->backends, &this->config->backend);
hydra->attributes->add_provider(hydra->attributes, &this->attribute->provider);
- this->job = callback_job_create_with_prio((callback_job_cb_t)receive,
+ this->receiver = callback_job_create_with_prio((callback_job_cb_t)receive,
+ this, NULL, NULL, JOB_PRIO_CRITICAL);
+ lib->processor->queue_job(lib->processor, (job_t*)this->receiver);
+
+ this->handler = callback_job_create_with_prio((callback_job_cb_t)handle,
this, NULL, NULL, JOB_PRIO_CRITICAL);
- lib->processor->queue_job(lib->processor, (job_t*)this->job);
+ lib->processor->queue_job(lib->processor, (job_t*)this->handler);
return &this->public;
}