diff options
author | Martin Willi <martin@revosec.ch> | 2013-07-01 12:18:15 +0200 |
---|---|---|
committer | Martin Willi <martin@revosec.ch> | 2013-07-18 16:00:29 +0200 |
commit | 73da4ed849afca1aa3c0b8f13d703796a98184d4 (patch) | |
tree | 441ab49710d1d517e2928e871069651c568c8217 | |
parent | e11c02c8f1591fef64200137c24598fba9d488a9 (diff) | |
download | strongswan-73da4ed849afca1aa3c0b8f13d703796a98184d4.tar.bz2 strongswan-73da4ed849afca1aa3c0b8f13d703796a98184d4.tar.xz |
load-tester: use a stream service to dispatch control connections
-rw-r--r-- | src/libcharon/plugins/load_tester/load_tester.c | 2 | ||||
-rw-r--r-- | src/libcharon/plugins/load_tester/load_tester_control.c | 118 |
2 files changed, 27 insertions, 93 deletions
diff --git a/src/libcharon/plugins/load_tester/load_tester.c b/src/libcharon/plugins/load_tester/load_tester.c index f7361e606..b7b971ee8 100644 --- a/src/libcharon/plugins/load_tester/load_tester.c +++ b/src/libcharon/plugins/load_tester/load_tester.c @@ -35,7 +35,7 @@ static FILE* make_connection() addr.sun_family = AF_UNIX; strcpy(addr.sun_path, LOAD_TESTER_SOCKET); - fd = socket(AF_UNIX, SOCK_SEQPACKET, 0); + fd = socket(AF_UNIX, SOCK_STREAM, 0); if (fd < 0) { fprintf(stderr, "opening socket failed: %s\n", strerror(errno)); diff --git a/src/libcharon/plugins/load_tester/load_tester_control.c b/src/libcharon/plugins/load_tester/load_tester_control.c index 3c82b5c30..f9ec9142f 100644 --- a/src/libcharon/plugins/load_tester/load_tester_control.c +++ b/src/libcharon/plugins/load_tester/load_tester_control.c @@ -43,9 +43,9 @@ struct private_load_tester_control_t { load_tester_control_t public; /** - * Load tester unix socket file descriptor + * Load tester control stream service */ - int socket; + stream_service_t *service; }; /** @@ -85,48 +85,6 @@ struct init_listener_t { }; /** - * Open load-tester listening socket - */ -static bool open_socket(private_load_tester_control_t *this) -{ - struct sockaddr_un addr; - mode_t old; - - addr.sun_family = AF_UNIX; - strcpy(addr.sun_path, LOAD_TESTER_SOCKET); - - this->socket = socket(AF_UNIX, SOCK_SEQPACKET, 0); - if (this->socket == -1) - { - DBG1(DBG_CFG, "creating load-tester socket failed"); - return FALSE; - } - unlink(addr.sun_path); - old = umask(~(S_IRWXU | S_IRWXG)); - if (bind(this->socket, (struct sockaddr*)&addr, sizeof(addr)) < 0) - { - DBG1(DBG_CFG, "binding load-tester socket failed: %s", strerror(errno)); - close(this->socket); - return FALSE; - } - umask(old); - if (chown(addr.sun_path, lib->caps->get_uid(lib->caps), - lib->caps->get_gid(lib->caps)) != 0) - { - DBG1(DBG_CFG, "changing load-tester socket permissions failed: %s", - strerror(errno)); - } - if (listen(this->socket, 10) < 0) - { - DBG1(DBG_CFG, "listening on load-tester socket failed: %s", strerror(errno)); - close(this->socket); - unlink(addr.sun_path); - return FALSE; - } - return TRUE; -} - -/** * Hashtable hash function */ static u_int hash(uintptr_t id) @@ -215,9 +173,9 @@ static bool initiate_cb(init_listener_t *this, debug_t group, level_t level, } /** - * Initiate load-test, write progress to stream + * Accept connections, initiate load-test, write progress to stream */ -static job_requeue_t initiate(FILE *stream) +static bool on_accept(private_load_tester_control_t *this, stream_t *io) { init_listener_t *listener; enumerator_t *enumerator; @@ -225,15 +183,23 @@ static job_requeue_t initiate(FILE *stream) child_cfg_t *child_cfg; u_int i, count, failed = 0, delay = 0; char buf[16] = ""; + FILE *stream; + stream = io->get_file(io); + if (!stream) + { + return FALSE; + } fflush(stream); if (fgets(buf, sizeof(buf), stream) == NULL) { - return JOB_REQUEUE_NONE; + fclose(stream); + return FALSE; } if (sscanf(buf, "%u %u", &count, &delay) < 1) { - return JOB_REQUEUE_NONE; + fclose(stream); + return FALSE; } INIT(listener, @@ -308,50 +274,15 @@ static job_requeue_t initiate(FILE *stream) free(listener); fprintf(stream, "\n"); + fclose(stream); - return JOB_REQUEUE_NONE; -} - -/** - * Accept load-tester control connections, dispatch - */ -static job_requeue_t receive(private_load_tester_control_t *this) -{ - struct sockaddr_un addr; - int fd, len = sizeof(addr); - bool oldstate; - FILE *stream; - - oldstate = thread_cancelability(TRUE); - fd = accept(this->socket, (struct sockaddr*)&addr, &len); - thread_cancelability(oldstate); - - if (fd != -1) - { - stream = fdopen(fd, "r+"); - if (stream) - { - DBG1(DBG_CFG, "client connected"); - lib->processor->queue_job(lib->processor, - (job_t*)callback_job_create_with_prio( - (callback_job_cb_t)initiate, stream, (void*)fclose, - (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL)); - } - else - { - close(fd); - } - } - return JOB_REQUEUE_FAIR; + return FALSE; } METHOD(load_tester_control_t, destroy, void, private_load_tester_control_t *this) { - if (this->socket != -1) - { - close(this->socket); - } + DESTROY_IF(this->service); free(this); } @@ -361,6 +292,7 @@ METHOD(load_tester_control_t, destroy, void, load_tester_control_t *load_tester_control_create() { private_load_tester_control_t *this; + char *uri; INIT(this, .public = { @@ -368,16 +300,18 @@ load_tester_control_t *load_tester_control_create() }, ); - if (open_socket(this)) + uri = lib->settings->get_str(lib->settings, + "%s.plugins.load-tester.socket", "unix://" LOAD_TESTER_SOCKET, + charon->name); + this->service = lib->streams->create_service(lib->streams, uri, 10); + if (this->service) { - lib->processor->queue_job(lib->processor, (job_t*) - callback_job_create_with_prio((callback_job_cb_t)receive, this, NULL, - (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL)); + this->service->on_accept(this->service, (stream_service_cb_t)on_accept, + this, JOB_PRIO_CRITICAL, 0); } else { - this->socket = -1; + DBG1(DBG_CFG, "creating load-tester control socket failed"); } - return &this->public; } |