aboutsummaryrefslogtreecommitdiffstats
path: root/src/charon/control/interfaces
diff options
context:
space:
mode:
authorMartin Willi <martin@strongswan.org>2007-06-11 10:57:19 +0000
committerMartin Willi <martin@strongswan.org>2007-06-11 10:57:19 +0000
commit9fe1a1ca7617bb562750864aae1892ece1a6a1e6 (patch)
tree057d73714d52c09c40950927fede15e73cd6793b /src/charon/control/interfaces
parentaca0317d92c4141e1b48c7081f39d8646bd4767d (diff)
downloadstrongswan-9fe1a1ca7617bb562750864aae1892ece1a6a1e6.tar.bz2
strongswan-9fe1a1ca7617bb562750864aae1892ece1a6a1e6.tar.xz
introduced callback_job:
simple asynchronous method invocation use daemons thread pool for all threads proper cancellation and cleanups cancellation mechanism to dynamically unload multithreaded code unified event_queue and scheduler => scheduler unified job_queue and thread_pool => processor removed job_type_t, not really needed fixes here, there and everywhere
Diffstat (limited to 'src/charon/control/interfaces')
-rwxr-xr-xsrc/charon/control/interfaces/stroke_interface.c182
-rw-r--r--src/charon/control/interfaces/xml_interface.c211
2 files changed, 290 insertions, 103 deletions
diff --git a/src/charon/control/interfaces/stroke_interface.c b/src/charon/control/interfaces/stroke_interface.c
index 6e3427e8e..045d588f2 100755
--- a/src/charon/control/interfaces/stroke_interface.c
+++ b/src/charon/control/interfaces/stroke_interface.c
@@ -43,6 +43,7 @@
#include <control/interface_manager.h>
#include <control/interfaces/interface.h>
#include <utils/leak_detective.h>
+#include <processing/jobs/callback_job.h>
#define IKE_PORT 500
#define PATH_BUF 256
@@ -69,9 +70,9 @@ struct private_stroke_interface_t {
int socket;
/**
- * Thread which reads from the Socket
+ * job accepting stroke messages
*/
- pthread_t threads[STROKE_THREADS];
+ callback_job_t *job;
};
typedef struct stroke_log_info_t stroke_log_info_t;
@@ -224,8 +225,7 @@ static void pop_end(stroke_msg_t *msg, const char* label, stroke_end_t *end)
/**
* Add a connection to the configuration list
*/
-static void stroke_add_conn(private_stroke_interface_t *this,
- stroke_msg_t *msg, FILE *out)
+static void stroke_add_conn(stroke_msg_t *msg, FILE *out)
{
ike_cfg_t *ike_cfg;
peer_cfg_t *peer_cfg;
@@ -628,8 +628,7 @@ destroy_hosts:
/**
* Delete a connection from the list
*/
-static void stroke_del_conn(private_stroke_interface_t *this,
- stroke_msg_t *msg, FILE *out)
+static void stroke_del_conn(stroke_msg_t *msg, FILE *out)
{
iterator_t *peer_iter, *child_iter;
peer_cfg_t *peer, *child;
@@ -747,8 +746,7 @@ static peer_cfg_t *get_peer_cfg_by_name(char *name)
/**
* initiate a connection by name
*/
-static void stroke_initiate(private_stroke_interface_t *this,
- stroke_msg_t *msg, FILE *out)
+static void stroke_initiate(stroke_msg_t *msg, FILE *out)
{
peer_cfg_t *peer_cfg;
child_cfg_t *child_cfg;
@@ -781,7 +779,6 @@ static void stroke_initiate(private_stroke_interface_t *this,
info.out = out;
info.level = msg->output_verbosity;
-
charon->interfaces->initiate(charon->interfaces, peer_cfg, child_cfg,
(interface_manager_cb_t)stroke_log, &info);
}
@@ -789,8 +786,7 @@ static void stroke_initiate(private_stroke_interface_t *this,
/**
* route a policy (install SPD entries)
*/
-static void stroke_route(private_stroke_interface_t *this,
- stroke_msg_t *msg, FILE *out)
+static void stroke_route(stroke_msg_t *msg, FILE *out)
{
peer_cfg_t *peer_cfg;
child_cfg_t *child_cfg;
@@ -830,8 +826,7 @@ static void stroke_route(private_stroke_interface_t *this,
/**
* unroute a policy
*/
-static void stroke_unroute(private_stroke_interface_t *this,
- stroke_msg_t *msg, FILE *out)
+static void stroke_unroute(stroke_msg_t *msg, FILE *out)
{
char *name;
ike_sa_t *ike_sa;
@@ -874,8 +869,7 @@ static void stroke_unroute(private_stroke_interface_t *this,
/**
* terminate a connection by name
*/
-static void stroke_terminate(private_stroke_interface_t *this,
- stroke_msg_t *msg, FILE *out)
+static void stroke_terminate(stroke_msg_t *msg, FILE *out)
{
char *string, *pos = NULL, *name = NULL;
u_int32_t id = 0;
@@ -979,8 +973,7 @@ static void stroke_terminate(private_stroke_interface_t *this,
/**
* Add a ca information record to the cainfo list
*/
-static void stroke_add_ca(private_stroke_interface_t *this,
- stroke_msg_t *msg, FILE *out)
+static void stroke_add_ca(stroke_msg_t *msg, FILE *out)
{
x509_t *cacert;
ca_info_t *ca_info;
@@ -1047,8 +1040,7 @@ static void stroke_add_ca(private_stroke_interface_t *this,
/**
* Delete a ca information record from the cainfo list
*/
-static void stroke_del_ca(private_stroke_interface_t *this,
- stroke_msg_t *msg, FILE *out)
+static void stroke_del_ca(stroke_msg_t *msg, FILE *out)
{
status_t status;
@@ -1194,8 +1186,7 @@ static void log_child_sa(FILE *out, child_sa_t *child_sa, bool all)
/**
* show status of daemon
*/
-static void stroke_status(private_stroke_interface_t *this,
- stroke_msg_t *msg, FILE *out, bool all)
+static void stroke_status(stroke_msg_t *msg, FILE *out, bool all)
{
iterator_t *iterator, *children;
linked_list_t *list;
@@ -1218,12 +1209,12 @@ static void stroke_status(private_stroke_interface_t *this,
fprintf(out, "Performance:\n");
fprintf(out, " worker threads: %d idle of %d,",
- charon->thread_pool->get_idle_threads(charon->thread_pool),
- charon->thread_pool->get_pool_size(charon->thread_pool));
+ charon->processor->get_idle_threads(charon->processor),
+ charon->processor->get_total_threads(charon->processor));
fprintf(out, " job queue load: %d,",
- charon->job_queue->get_count(charon->job_queue));
+ charon->processor->get_job_load(charon->processor));
fprintf(out, " scheduled events: %d\n",
- charon->event_queue->get_count(charon->event_queue));
+ charon->scheduler->get_job_load(charon->scheduler));
list = charon->kernel_interface->create_address_list(charon->kernel_interface);
fprintf(out, "Listening on %d IP addresses:\n", list->get_count(list));
@@ -1312,8 +1303,8 @@ static void stroke_status(private_stroke_interface_t *this,
/**
* list all authority certificates matching a specified flag
*/
-static void list_auth_certificates(private_stroke_interface_t *this, u_int flag,
- const char *label, bool utc, FILE *out)
+static void list_auth_certificates(u_int flag, const char *label,
+ bool utc, FILE *out)
{
bool first = TRUE;
x509_t *cert;
@@ -1341,8 +1332,7 @@ static void list_auth_certificates(private_stroke_interface_t *this, u_int flag
/**
* list various information
*/
-static void stroke_list(private_stroke_interface_t *this,
- stroke_msg_t *msg, FILE *out)
+static void stroke_list(stroke_msg_t *msg, FILE *out)
{
iterator_t *iterator;
@@ -1372,15 +1362,15 @@ static void stroke_list(private_stroke_interface_t *this,
}
if (msg->list.flags & LIST_CACERTS)
{
- list_auth_certificates(this, AUTH_CA, "CA", msg->list.utc, out);
+ list_auth_certificates(AUTH_CA, "CA", msg->list.utc, out);
}
if (msg->list.flags & LIST_OCSPCERTS)
{
- list_auth_certificates(this, AUTH_OCSP, "OCSP", msg->list.utc, out);
+ list_auth_certificates(AUTH_OCSP, "OCSP", msg->list.utc, out);
}
if (msg->list.flags & LIST_AACERTS)
{
- list_auth_certificates(this, AUTH_AA, "AA", msg->list.utc, out);
+ list_auth_certificates(AUTH_AA, "AA", msg->list.utc, out);
}
if (msg->list.flags & LIST_CAINFOS)
{
@@ -1453,8 +1443,7 @@ static void stroke_list(private_stroke_interface_t *this,
/**
* reread various information
*/
-static void stroke_reread(private_stroke_interface_t *this,
- stroke_msg_t *msg, FILE *out)
+static void stroke_reread(stroke_msg_t *msg, FILE *out)
{
if (msg->reread.flags & REREAD_CACERTS)
{
@@ -1473,8 +1462,7 @@ static void stroke_reread(private_stroke_interface_t *this,
/**
* purge various information
*/
-static void stroke_purge(private_stroke_interface_t *this,
- stroke_msg_t *msg, FILE *out)
+static void stroke_purge(stroke_msg_t *msg, FILE *out)
{
if (msg->purge.flags & PURGE_OCSP)
{
@@ -1510,8 +1498,7 @@ signal_t get_signal_from_logtype(char *type)
/**
* set the verbosity debug output
*/
-static void stroke_loglevel(private_stroke_interface_t *this,
- stroke_msg_t *msg, FILE *out)
+static void stroke_loglevel(stroke_msg_t *msg, FILE *out)
{
signal_t signal;
@@ -1533,20 +1520,22 @@ static void stroke_loglevel(private_stroke_interface_t *this,
/**
* process a stroke request from the socket pointed by "fd"
*/
-static void stroke_process(private_stroke_interface_t *this, int strokefd)
+static job_requeue_t stroke_process(int *fdp)
{
stroke_msg_t *msg;
u_int16_t msg_length;
ssize_t bytes_read;
FILE *out;
+ int strokefd = *fdp;
/* peek the length */
bytes_read = recv(strokefd, &msg_length, sizeof(msg_length), MSG_PEEK);
if (bytes_read != sizeof(msg_length))
{
- DBG1(DBG_CFG, "reading length of stroke message failed");
+ DBG1(DBG_CFG, "reading length of stroke message failed: %s",
+ strerror(errno));
close(strokefd);
- return;
+ return JOB_REQUEUE_NONE;
}
/* read message */
@@ -1556,105 +1545,107 @@ static void stroke_process(private_stroke_interface_t *this, int strokefd)
{
DBG1(DBG_CFG, "reading stroke message failed: %s", strerror(errno));
close(strokefd);
- return;
+ return JOB_REQUEUE_NONE;
}
- out = fdopen(dup(strokefd), "w");
+ out = fdopen(strokefd, "w");
if (out == NULL)
{
DBG1(DBG_CFG, "opening stroke output channel failed: %s", strerror(errno));
close(strokefd);
free(msg);
- return;
+ return JOB_REQUEUE_NONE;
}
DBG3(DBG_CFG, "stroke message %b", (void*)msg, msg_length);
+ /* the stroke_* functions are blocking, as they listen on the bus. Add
+ * cancellation handlers. */
+ pthread_cleanup_push((void*)fclose, out);
+ pthread_cleanup_push(free, msg);
+
switch (msg->type)
{
case STR_INITIATE:
- stroke_initiate(this, msg, out);
+ stroke_initiate(msg, out);
break;
case STR_ROUTE:
- stroke_route(this, msg, out);
+ stroke_route(msg, out);
break;
case STR_UNROUTE:
- stroke_unroute(this, msg, out);
+ stroke_unroute(msg, out);
break;
case STR_TERMINATE:
- stroke_terminate(this, msg, out);
+ stroke_terminate(msg, out);
break;
case STR_STATUS:
- stroke_status(this, msg, out, FALSE);
+ stroke_status(msg, out, FALSE);
break;
case STR_STATUS_ALL:
- stroke_status(this, msg, out, TRUE);
+ stroke_status(msg, out, TRUE);
break;
case STR_ADD_CONN:
- stroke_add_conn(this, msg, out);
+ stroke_add_conn(msg, out);
break;
case STR_DEL_CONN:
- stroke_del_conn(this, msg, out);
+ stroke_del_conn(msg, out);
break;
case STR_ADD_CA:
- stroke_add_ca(this, msg, out);
+ stroke_add_ca(msg, out);
break;
case STR_DEL_CA:
- stroke_del_ca(this, msg, out);
+ stroke_del_ca(msg, out);
break;
case STR_LOGLEVEL:
- stroke_loglevel(this, msg, out);
+ stroke_loglevel(msg, out);
break;
case STR_LIST:
- stroke_list(this, msg, out);
+ stroke_list(msg, out);
break;
case STR_REREAD:
- stroke_reread(this, msg, out);
+ stroke_reread(msg, out);
break;
case STR_PURGE:
- stroke_purge(this, msg, out);
+ stroke_purge(msg, out);
break;
default:
DBG1(DBG_CFG, "received unknown stroke");
}
- fclose(out);
- close(strokefd);
- free(msg);
+ /* remove and execute cancellation handlers */
+ pthread_cleanup_pop(1);
+ pthread_cleanup_pop(1);
+
+ return JOB_REQUEUE_NONE;
}
+
/**
* Implementation of private_stroke_interface_t.stroke_receive.
*/
-static void stroke_receive(private_stroke_interface_t *this)
+static job_requeue_t stroke_receive(private_stroke_interface_t *this)
{
struct sockaddr_un strokeaddr;
int strokeaddrlen = sizeof(strokeaddr);
+ int strokefd, *fdp;
int oldstate;
- int strokefd;
+ callback_job_t *job;
- charon->drop_capabilities(charon, TRUE);
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
+ strokefd = accept(this->socket, (struct sockaddr *)&strokeaddr, &strokeaddrlen);
+ pthread_setcancelstate(oldstate, NULL);
- /* ignore sigpipe. writing over the pipe back to the console
- * only fails if SIGPIPE is ignored. */
- signal(SIGPIPE, SIG_IGN);
-
- /* disable cancellation by default */
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
-
- while (TRUE)
+ if (strokefd < 0)
{
- /* wait for connections, but allow thread to terminate */
- pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
- strokefd = accept(this->socket, (struct sockaddr *)&strokeaddr, &strokeaddrlen);
- pthread_setcancelstate(oldstate, NULL);
-
- if (strokefd < 0)
- {
- DBG1(DBG_CFG, "accepting stroke connection failed: %s", strerror(errno));
- continue;
- }
- stroke_process(this, strokefd);
+ DBG1(DBG_CFG, "accepting stroke connection failed: %s", strerror(errno));
+ return JOB_REQUEUE_FAIR;
}
+
+ fdp = malloc_thing(int);
+ *fdp = strokefd;
+ job = callback_job_create((callback_job_cb_t)stroke_process, fdp, free, this->job);
+ charon->processor->queue_job(charon->processor, (job_t*)job);
+
+ return JOB_REQUEUE_FAIR;
}
/**
@@ -1662,17 +1653,9 @@ static void stroke_receive(private_stroke_interface_t *this)
*/
static void destroy(private_stroke_interface_t *this)
{
- int i;
-
- for (i = 0; i < STROKE_THREADS; i++)
- {
- pthread_cancel(this->threads[i]);
- pthread_join(this->threads[i], NULL);
- }
-
- close(this->socket);
- unlink(socket_addr.sun_path);
+ this->job->cancel(this->job);
free(this);
+ unlink(socket_addr.sun_path);
}
/*
@@ -1682,7 +1665,6 @@ interface_t *interface_create()
{
private_stroke_interface_t *this = malloc_thing(private_stroke_interface_t);
mode_t old;
- int i;
/* public functions */
this->public.interface.destroy = (void (*)(interface_t*))destroy;
@@ -1715,14 +1697,10 @@ interface_t *interface_create()
return NULL;
}
- /* start threads reading from the socket */
- for (i = 0; i < STROKE_THREADS; i++)
- {
- if (pthread_create(&this->threads[i], NULL, (void*(*)(void*))stroke_receive, this) != 0)
- {
- charon->kill(charon, "unable to create stroke thread");
- }
- }
+ this->job = callback_job_create((callback_job_cb_t)stroke_receive,
+ this, NULL, NULL);
+ charon->processor->queue_job(charon->processor, (job_t*)this->job);
return &this->public.interface;
}
+
diff --git a/src/charon/control/interfaces/xml_interface.c b/src/charon/control/interfaces/xml_interface.c
index e570f2543..8dd614493 100644
--- a/src/charon/control/interfaces/xml_interface.c
+++ b/src/charon/control/interfaces/xml_interface.c
@@ -24,10 +24,24 @@
#include "xml_interface.h"
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <unistd.h>
+#include <errno.h>
+#include <pthread.h>
+#include <signal.h>
+#include <libxml/xmlreader.h>
+#include <libxml/xmlwriter.h>
+
#include <library.h>
#include <daemon.h>
+static struct sockaddr_un socket_addr = { AF_UNIX, "/var/run/charon.xml"};
+
+
typedef struct private_xml_interface_t private_xml_interface_t;
/**
@@ -39,14 +53,171 @@ struct private_xml_interface_t {
* Public part of xml_t object.
*/
xml_interface_t public;
+
+ /**
+ * XML unix socket fd
+ */
+ int socket;
+
+ /**
+ * thread receiving messages
+ */
+ pthread_t thread;
};
+static void get(private_xml_interface_t *this,
+ xmlTextReaderPtr reader, xmlTextWriterPtr writer)
+{
+
+ if (/* <GetResponse> */
+ xmlTextWriterStartElement(writer, "GetResponse") < 0 ||
+ /* <Status Code="200"><Message/></Status> */
+ xmlTextWriterStartElement(writer, "Status") < 0 ||
+ xmlTextWriterWriteAttribute(writer, "Code", "200") < 0 ||
+ xmlTextWriterStartElement(writer, "Message") < 0 ||
+ xmlTextWriterEndElement(writer) < 0 ||
+ xmlTextWriterEndElement(writer) < 0 ||
+ /* <ConnectionList/> */
+ xmlTextWriterStartElement(writer, "ConnectionList") < 0 ||
+ xmlTextWriterEndElement(writer) < 0 ||
+ /* </GetResponse> */
+ xmlTextWriterEndElement(writer) < 0)
+ {
+ DBG1(DBG_CFG, "error writing XML document (GetResponse)");
+ }
+
+
+/*
+ DBG1(DBG_CFG, "%d %d %s %d %d %s",
+ xmlTextReaderDepth(reader),
+ ,
+ xmlTextReaderConstName(reader),
+ xmlTextReaderIsEmptyElement(reader),
+ xmlTextReaderHasValue(reader),
+ xmlTextReaderConstValue(reader));
+ */
+}
+
+static void receive(private_xml_interface_t *this)
+{
+ charon->drop_capabilities(charon, TRUE);
+
+ /* disable cancellation by default */
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+
+ while (TRUE)
+ {
+ struct sockaddr_un strokeaddr;
+ int strokeaddrlen = sizeof(strokeaddr);
+ int oldstate;
+ int fd;
+ char buffer[4096];
+ size_t len;
+
+ /* wait for connections, but allow thread to terminate */
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
+ fd = accept(this->socket, (struct sockaddr *)&strokeaddr, &strokeaddrlen);
+ pthread_setcancelstate(oldstate, NULL);
+
+ if (fd < 0)
+ {
+ DBG1(DBG_CFG, "accepting SMP XML socket failed: %s", strerror(errno));
+ continue;
+ }
+ DBG2(DBG_CFG, "SMP XML connection opened");
+ while (TRUE)
+ {
+ xmlTextReaderPtr reader;
+ xmlTextWriterPtr writer;
+
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
+ len = read(fd, buffer, sizeof(buffer));
+ pthread_setcancelstate(oldstate, NULL);
+ if (len <= 0)
+ {
+ close(fd);
+ DBG2(DBG_CFG, "SMP XML connection closed");
+ break;
+ }
+
+ reader = xmlReaderForMemory(buffer, len, NULL, NULL, 0);
+ if (reader == NULL)
+ {
+ DBG1(DBG_CFG, "opening SMP XML reader failed");
+ continue;
+ }
+
+ writer = xmlNewTextWriter(xmlOutputBufferCreateFd(fd, NULL));
+ if (writer == NULL)
+ {
+ xmlFreeTextReader(reader);
+ DBG1(DBG_CFG, "opening SMP XML writer failed");
+ continue;
+ }
+
+ /* create the standard message parts */
+ if (xmlTextWriterStartDocument(writer, NULL, NULL, NULL) < 0 ||
+ /* <SMPMessage xmlns="http://www.strongswan.org/smp/1.0"> */
+ xmlTextWriterStartElement(writer, "SMPMessage") < 0 ||
+ xmlTextWriterWriteAttribute(writer, "xmlns",
+ "http://www.strongswan.org/smp/1.0") < 0 ||
+ /* <Body> */
+ xmlTextWriterStartElement(writer, "Body") < 0)
+ {
+ xmlFreeTextReader(reader);
+ xmlFreeTextWriter(writer);
+ DBG1(DBG_CFG, "creating SMP XML message failed");
+ continue;
+ }
+
+ while (TRUE)
+ {
+ switch (xmlTextReaderRead(reader))
+ {
+ case 1:
+ {
+ if (xmlTextReaderNodeType(reader) ==
+ XML_READER_TYPE_ELEMENT)
+ {
+ if (streq(xmlTextReaderConstName(reader), "GetRequest"))
+ {
+ get(this, reader, writer);
+ break;
+ }
+ }
+ continue;
+ }
+ case 0:
+ /* end of XML */
+ break;
+ default:
+ DBG1(DBG_CFG, "parsing SMP XML message failed");
+ break;
+ }
+ xmlFreeTextReader(reader);
+ break;
+ }
+ /* write </Body></SMPMessage> and close document */
+ if (xmlTextWriterEndDocument(writer) < 0)
+ {
+ DBG1(DBG_CFG, "completing SMP XML message failed");
+ }
+ xmlFreeTextWriter(writer);
+ /* write a newline to indicate end of xml */
+ write(fd, "\n", 1);
+ }
+ }
+}
/**
* Implementation of itnerface_t.destroy.
*/
static void destroy(private_xml_interface_t *this)
{
+ pthread_cancel(this->thread);
+ pthread_join(this->thread, NULL);
+ close(this->socket);
+ unlink(socket_addr.sun_path);
free(this);
}
@@ -56,8 +227,46 @@ static void destroy(private_xml_interface_t *this)
interface_t *interface_create()
{
private_xml_interface_t *this = malloc_thing(private_xml_interface_t);
+ mode_t old;
- this->public.interface.destroy = (void (*)(xml_interface_t*))destroy;
+ this->public.interface.destroy = (void (*)(interface_t*))destroy;
+
+ /* set up unix socket */
+ this->socket = socket(AF_UNIX, SOCK_STREAM, 0);
+ if (this->socket == -1)
+ {
+ DBG1(DBG_CFG, "could not create XML socket");
+ free(this);
+ return NULL;
+ }
+
+ old = umask(~S_IRWXU);
+ if (bind(this->socket, (struct sockaddr *)&socket_addr, sizeof(socket_addr)) < 0)
+ {
+ DBG1(DBG_CFG, "could not bind XML socket: %s", strerror(errno));
+ close(this->socket);
+ free(this);
+ return NULL;
+ }
+ umask(old);
+
+ if (listen(this->socket, 0) < 0)
+ {
+ DBG1(DBG_CFG, "could not listen on XML socket: %s", strerror(errno));
+ close(this->socket);
+ free(this);
+ return NULL;
+ }
+
+ if (pthread_create(&this->thread, NULL, (void*(*)(void*))receive, this) != 0)
+ {
+ DBG1(DBG_CFG, "could not create XML socket thread: %s", strerror(errno));
+ close(this->socket);
+ unlink(socket_addr.sun_path);
+ free(this);
+ return NULL;
+ }
return &this->public.interface;
}
+