aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMartin Willi <martin@revosec.ch>2013-07-01 12:18:15 +0200
committerMartin Willi <martin@revosec.ch>2013-07-18 16:00:29 +0200
commit73da4ed849afca1aa3c0b8f13d703796a98184d4 (patch)
tree441ab49710d1d517e2928e871069651c568c8217
parente11c02c8f1591fef64200137c24598fba9d488a9 (diff)
downloadstrongswan-73da4ed849afca1aa3c0b8f13d703796a98184d4.tar.bz2
strongswan-73da4ed849afca1aa3c0b8f13d703796a98184d4.tar.xz
load-tester: use a stream service to dispatch control connections
-rw-r--r--src/libcharon/plugins/load_tester/load_tester.c2
-rw-r--r--src/libcharon/plugins/load_tester/load_tester_control.c118
2 files changed, 27 insertions, 93 deletions
diff --git a/src/libcharon/plugins/load_tester/load_tester.c b/src/libcharon/plugins/load_tester/load_tester.c
index f7361e606..b7b971ee8 100644
--- a/src/libcharon/plugins/load_tester/load_tester.c
+++ b/src/libcharon/plugins/load_tester/load_tester.c
@@ -35,7 +35,7 @@ static FILE* make_connection()
addr.sun_family = AF_UNIX;
strcpy(addr.sun_path, LOAD_TESTER_SOCKET);
- fd = socket(AF_UNIX, SOCK_SEQPACKET, 0);
+ fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (fd < 0)
{
fprintf(stderr, "opening socket failed: %s\n", strerror(errno));
diff --git a/src/libcharon/plugins/load_tester/load_tester_control.c b/src/libcharon/plugins/load_tester/load_tester_control.c
index 3c82b5c30..f9ec9142f 100644
--- a/src/libcharon/plugins/load_tester/load_tester_control.c
+++ b/src/libcharon/plugins/load_tester/load_tester_control.c
@@ -43,9 +43,9 @@ struct private_load_tester_control_t {
load_tester_control_t public;
/**
- * Load tester unix socket file descriptor
+ * Load tester control stream service
*/
- int socket;
+ stream_service_t *service;
};
/**
@@ -85,48 +85,6 @@ struct init_listener_t {
};
/**
- * Open load-tester listening socket
- */
-static bool open_socket(private_load_tester_control_t *this)
-{
- struct sockaddr_un addr;
- mode_t old;
-
- addr.sun_family = AF_UNIX;
- strcpy(addr.sun_path, LOAD_TESTER_SOCKET);
-
- this->socket = socket(AF_UNIX, SOCK_SEQPACKET, 0);
- if (this->socket == -1)
- {
- DBG1(DBG_CFG, "creating load-tester socket failed");
- return FALSE;
- }
- unlink(addr.sun_path);
- old = umask(~(S_IRWXU | S_IRWXG));
- if (bind(this->socket, (struct sockaddr*)&addr, sizeof(addr)) < 0)
- {
- DBG1(DBG_CFG, "binding load-tester socket failed: %s", strerror(errno));
- close(this->socket);
- return FALSE;
- }
- umask(old);
- if (chown(addr.sun_path, lib->caps->get_uid(lib->caps),
- lib->caps->get_gid(lib->caps)) != 0)
- {
- DBG1(DBG_CFG, "changing load-tester socket permissions failed: %s",
- strerror(errno));
- }
- if (listen(this->socket, 10) < 0)
- {
- DBG1(DBG_CFG, "listening on load-tester socket failed: %s", strerror(errno));
- close(this->socket);
- unlink(addr.sun_path);
- return FALSE;
- }
- return TRUE;
-}
-
-/**
* Hashtable hash function
*/
static u_int hash(uintptr_t id)
@@ -215,9 +173,9 @@ static bool initiate_cb(init_listener_t *this, debug_t group, level_t level,
}
/**
- * Initiate load-test, write progress to stream
+ * Accept connections, initiate load-test, write progress to stream
*/
-static job_requeue_t initiate(FILE *stream)
+static bool on_accept(private_load_tester_control_t *this, stream_t *io)
{
init_listener_t *listener;
enumerator_t *enumerator;
@@ -225,15 +183,23 @@ static job_requeue_t initiate(FILE *stream)
child_cfg_t *child_cfg;
u_int i, count, failed = 0, delay = 0;
char buf[16] = "";
+ FILE *stream;
+ stream = io->get_file(io);
+ if (!stream)
+ {
+ return FALSE;
+ }
fflush(stream);
if (fgets(buf, sizeof(buf), stream) == NULL)
{
- return JOB_REQUEUE_NONE;
+ fclose(stream);
+ return FALSE;
}
if (sscanf(buf, "%u %u", &count, &delay) < 1)
{
- return JOB_REQUEUE_NONE;
+ fclose(stream);
+ return FALSE;
}
INIT(listener,
@@ -308,50 +274,15 @@ static job_requeue_t initiate(FILE *stream)
free(listener);
fprintf(stream, "\n");
+ fclose(stream);
- return JOB_REQUEUE_NONE;
-}
-
-/**
- * Accept load-tester control connections, dispatch
- */
-static job_requeue_t receive(private_load_tester_control_t *this)
-{
- struct sockaddr_un addr;
- int fd, len = sizeof(addr);
- bool oldstate;
- FILE *stream;
-
- oldstate = thread_cancelability(TRUE);
- fd = accept(this->socket, (struct sockaddr*)&addr, &len);
- thread_cancelability(oldstate);
-
- if (fd != -1)
- {
- stream = fdopen(fd, "r+");
- if (stream)
- {
- DBG1(DBG_CFG, "client connected");
- lib->processor->queue_job(lib->processor,
- (job_t*)callback_job_create_with_prio(
- (callback_job_cb_t)initiate, stream, (void*)fclose,
- (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
- }
- else
- {
- close(fd);
- }
- }
- return JOB_REQUEUE_FAIR;
+ return FALSE;
}
METHOD(load_tester_control_t, destroy, void,
private_load_tester_control_t *this)
{
- if (this->socket != -1)
- {
- close(this->socket);
- }
+ DESTROY_IF(this->service);
free(this);
}
@@ -361,6 +292,7 @@ METHOD(load_tester_control_t, destroy, void,
load_tester_control_t *load_tester_control_create()
{
private_load_tester_control_t *this;
+ char *uri;
INIT(this,
.public = {
@@ -368,16 +300,18 @@ load_tester_control_t *load_tester_control_create()
},
);
- if (open_socket(this))
+ uri = lib->settings->get_str(lib->settings,
+ "%s.plugins.load-tester.socket", "unix://" LOAD_TESTER_SOCKET,
+ charon->name);
+ this->service = lib->streams->create_service(lib->streams, uri, 10);
+ if (this->service)
{
- lib->processor->queue_job(lib->processor, (job_t*)
- callback_job_create_with_prio((callback_job_cb_t)receive, this, NULL,
- (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
+ this->service->on_accept(this->service, (stream_service_cb_t)on_accept,
+ this, JOB_PRIO_CRITICAL, 0);
}
else
{
- this->socket = -1;
+ DBG1(DBG_CFG, "creating load-tester control socket failed");
}
-
return &this->public;
}