diff options
author | Martin Willi <martin@strongswan.org> | 2007-06-11 10:57:19 +0000 |
---|---|---|
committer | Martin Willi <martin@strongswan.org> | 2007-06-11 10:57:19 +0000 |
commit | 9fe1a1ca7617bb562750864aae1892ece1a6a1e6 (patch) | |
tree | 057d73714d52c09c40950927fede15e73cd6793b | |
parent | aca0317d92c4141e1b48c7081f39d8646bd4767d (diff) | |
download | strongswan-9fe1a1ca7617bb562750864aae1892ece1a6a1e6.tar.bz2 strongswan-9fe1a1ca7617bb562750864aae1892ece1a6a1e6.tar.xz |
introduced callback_job:
simple asynchronous method invocation
use daemons thread pool for all threads
proper cancellation and cleanups
cancellation mechanism to dynamically unload multithreaded code
unified event_queue and scheduler => scheduler
unified job_queue and thread_pool => processor
removed job_type_t, not really needed
fixes here, there and everywhere
36 files changed, 1606 insertions, 1778 deletions
diff --git a/src/charon/Makefile.am b/src/charon/Makefile.am index a64d9fa70..13a5ad253 100644 --- a/src/charon/Makefile.am +++ b/src/charon/Makefile.am @@ -48,12 +48,11 @@ network/packet.c network/packet.h \ network/receiver.c network/receiver.h \ network/sender.c network/sender.h \ network/socket.c network/socket.h \ -processing/event_queue.c processing/event_queue.h \ -processing/job_queue.c processing/job_queue.h \ +processing/jobs/job.h \ processing/jobs/acquire_job.c processing/jobs/acquire_job.h \ +processing/jobs/callback_job.c processing/jobs/callback_job.h \ processing/jobs/delete_child_sa_job.c processing/jobs/delete_child_sa_job.h \ processing/jobs/delete_ike_sa_job.c processing/jobs/delete_ike_sa_job.h \ -processing/jobs/job.c processing/jobs/job.h \ processing/jobs/process_message_job.c processing/jobs/process_message_job.h \ processing/jobs/rekey_child_sa_job.c processing/jobs/rekey_child_sa_job.h \ processing/jobs/rekey_ike_sa_job.c processing/jobs/rekey_ike_sa_job.h \ @@ -61,7 +60,7 @@ processing/jobs/retransmit_job.c processing/jobs/retransmit_job.h \ processing/jobs/send_dpd_job.c processing/jobs/send_dpd_job.h \ processing/jobs/send_keepalive_job.c processing/jobs/send_keepalive_job.h \ processing/scheduler.c processing/scheduler.h \ -processing/thread_pool.c processing/thread_pool.h \ +processing/processor.c processing/processor.h \ sa/authenticators/authenticator.c sa/authenticators/authenticator.h \ sa/authenticators/eap_authenticator.c sa/authenticators/eap_authenticator.h \ sa/authenticators/eap/eap_method.c sa/authenticators/eap/eap_method.h \ diff --git a/src/charon/bus/bus.c b/src/charon/bus/bus.c index 5f46cd29e..d1984d7f7 100644 --- a/src/charon/bus/bus.c +++ b/src/charon/bus/bus.c @@ -238,30 +238,13 @@ static active_listener_t *get_active_listener(private_bus_t *this) return found; } -typedef struct cancel_info_t cancel_info_t; - -/** - * cancellation info to cancel a listening operation cleanly - */ -struct cancel_info_t { - /** - * mutex to unlock on cancellation - */ - pthread_mutex_t *mutex; - - /** - * listener to unregister - */ - active_listener_t *listener; -}; - /** * disable a listener to cleanly clean up */ -static void unregister(cancel_info_t *info) +static void unregister(active_listener_t *listener) { - info->listener->state = UNREGISTERED; - pthread_mutex_unlock(info->mutex); + listener->state = UNREGISTERED; + pthread_cond_broadcast(&listener->cond); } /** @@ -272,7 +255,6 @@ static signal_t listen_(private_bus_t *this, level_t *level, int *thread, { active_listener_t *listener; int oldstate; - cancel_info_t info; pthread_mutex_lock(&this->mutex); listener = get_active_listener(this); @@ -281,13 +263,13 @@ static signal_t listen_(private_bus_t *this, level_t *level, int *thread, pthread_cond_broadcast(&listener->cond); /* wait until it has us delivered a signal, and go back to "registered". * we allow cancellation here, but must cleanly disable the listener. */ - info.mutex = &this->mutex; - info.listener = listener; - pthread_cleanup_push((void*)unregister, &info); + pthread_cleanup_push((void*)pthread_mutex_unlock, &this->mutex); + pthread_cleanup_push((void*)unregister, listener); pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate); pthread_cond_wait(&listener->cond, &this->mutex); pthread_setcancelstate(oldstate, NULL); pthread_cleanup_pop(0); + pthread_cleanup_pop(0); pthread_mutex_unlock(&this->mutex); @@ -320,7 +302,7 @@ static void set_listen_state(private_bus_t *this, bool active) { listener->state = UNREGISTERED; /* say hello to signal emitter; we are finished processing the signal */ - pthread_cond_signal(&listener->cond); + pthread_cond_broadcast(&listener->cond); } pthread_mutex_unlock(&this->mutex); diff --git a/src/charon/control/interfaces/stroke_interface.c b/src/charon/control/interfaces/stroke_interface.c index 6e3427e8e..045d588f2 100755 --- a/src/charon/control/interfaces/stroke_interface.c +++ b/src/charon/control/interfaces/stroke_interface.c @@ -43,6 +43,7 @@ #include <control/interface_manager.h> #include <control/interfaces/interface.h> #include <utils/leak_detective.h> +#include <processing/jobs/callback_job.h> #define IKE_PORT 500 #define PATH_BUF 256 @@ -69,9 +70,9 @@ struct private_stroke_interface_t { int socket; /** - * Thread which reads from the Socket + * job accepting stroke messages */ - pthread_t threads[STROKE_THREADS]; + callback_job_t *job; }; typedef struct stroke_log_info_t stroke_log_info_t; @@ -224,8 +225,7 @@ static void pop_end(stroke_msg_t *msg, const char* label, stroke_end_t *end) /** * Add a connection to the configuration list */ -static void stroke_add_conn(private_stroke_interface_t *this, - stroke_msg_t *msg, FILE *out) +static void stroke_add_conn(stroke_msg_t *msg, FILE *out) { ike_cfg_t *ike_cfg; peer_cfg_t *peer_cfg; @@ -628,8 +628,7 @@ destroy_hosts: /** * Delete a connection from the list */ -static void stroke_del_conn(private_stroke_interface_t *this, - stroke_msg_t *msg, FILE *out) +static void stroke_del_conn(stroke_msg_t *msg, FILE *out) { iterator_t *peer_iter, *child_iter; peer_cfg_t *peer, *child; @@ -747,8 +746,7 @@ static peer_cfg_t *get_peer_cfg_by_name(char *name) /** * initiate a connection by name */ -static void stroke_initiate(private_stroke_interface_t *this, - stroke_msg_t *msg, FILE *out) +static void stroke_initiate(stroke_msg_t *msg, FILE *out) { peer_cfg_t *peer_cfg; child_cfg_t *child_cfg; @@ -781,7 +779,6 @@ static void stroke_initiate(private_stroke_interface_t *this, info.out = out; info.level = msg->output_verbosity; - charon->interfaces->initiate(charon->interfaces, peer_cfg, child_cfg, (interface_manager_cb_t)stroke_log, &info); } @@ -789,8 +786,7 @@ static void stroke_initiate(private_stroke_interface_t *this, /** * route a policy (install SPD entries) */ -static void stroke_route(private_stroke_interface_t *this, - stroke_msg_t *msg, FILE *out) +static void stroke_route(stroke_msg_t *msg, FILE *out) { peer_cfg_t *peer_cfg; child_cfg_t *child_cfg; @@ -830,8 +826,7 @@ static void stroke_route(private_stroke_interface_t *this, /** * unroute a policy */ -static void stroke_unroute(private_stroke_interface_t *this, - stroke_msg_t *msg, FILE *out) +static void stroke_unroute(stroke_msg_t *msg, FILE *out) { char *name; ike_sa_t *ike_sa; @@ -874,8 +869,7 @@ static void stroke_unroute(private_stroke_interface_t *this, /** * terminate a connection by name */ -static void stroke_terminate(private_stroke_interface_t *this, - stroke_msg_t *msg, FILE *out) +static void stroke_terminate(stroke_msg_t *msg, FILE *out) { char *string, *pos = NULL, *name = NULL; u_int32_t id = 0; @@ -979,8 +973,7 @@ static void stroke_terminate(private_stroke_interface_t *this, /** * Add a ca information record to the cainfo list */ -static void stroke_add_ca(private_stroke_interface_t *this, - stroke_msg_t *msg, FILE *out) +static void stroke_add_ca(stroke_msg_t *msg, FILE *out) { x509_t *cacert; ca_info_t *ca_info; @@ -1047,8 +1040,7 @@ static void stroke_add_ca(private_stroke_interface_t *this, /** * Delete a ca information record from the cainfo list */ -static void stroke_del_ca(private_stroke_interface_t *this, - stroke_msg_t *msg, FILE *out) +static void stroke_del_ca(stroke_msg_t *msg, FILE *out) { status_t status; @@ -1194,8 +1186,7 @@ static void log_child_sa(FILE *out, child_sa_t *child_sa, bool all) /** * show status of daemon */ -static void stroke_status(private_stroke_interface_t *this, - stroke_msg_t *msg, FILE *out, bool all) +static void stroke_status(stroke_msg_t *msg, FILE *out, bool all) { iterator_t *iterator, *children; linked_list_t *list; @@ -1218,12 +1209,12 @@ static void stroke_status(private_stroke_interface_t *this, fprintf(out, "Performance:\n"); fprintf(out, " worker threads: %d idle of %d,", - charon->thread_pool->get_idle_threads(charon->thread_pool), - charon->thread_pool->get_pool_size(charon->thread_pool)); + charon->processor->get_idle_threads(charon->processor), + charon->processor->get_total_threads(charon->processor)); fprintf(out, " job queue load: %d,", - charon->job_queue->get_count(charon->job_queue)); + charon->processor->get_job_load(charon->processor)); fprintf(out, " scheduled events: %d\n", - charon->event_queue->get_count(charon->event_queue)); + charon->scheduler->get_job_load(charon->scheduler)); list = charon->kernel_interface->create_address_list(charon->kernel_interface); fprintf(out, "Listening on %d IP addresses:\n", list->get_count(list)); @@ -1312,8 +1303,8 @@ static void stroke_status(private_stroke_interface_t *this, /** * list all authority certificates matching a specified flag */ -static void list_auth_certificates(private_stroke_interface_t *this, u_int flag, - const char *label, bool utc, FILE *out) +static void list_auth_certificates(u_int flag, const char *label, + bool utc, FILE *out) { bool first = TRUE; x509_t *cert; @@ -1341,8 +1332,7 @@ static void list_auth_certificates(private_stroke_interface_t *this, u_int flag /** * list various information */ -static void stroke_list(private_stroke_interface_t *this, - stroke_msg_t *msg, FILE *out) +static void stroke_list(stroke_msg_t *msg, FILE *out) { iterator_t *iterator; @@ -1372,15 +1362,15 @@ static void stroke_list(private_stroke_interface_t *this, } if (msg->list.flags & LIST_CACERTS) { - list_auth_certificates(this, AUTH_CA, "CA", msg->list.utc, out); + list_auth_certificates(AUTH_CA, "CA", msg->list.utc, out); } if (msg->list.flags & LIST_OCSPCERTS) { - list_auth_certificates(this, AUTH_OCSP, "OCSP", msg->list.utc, out); + list_auth_certificates(AUTH_OCSP, "OCSP", msg->list.utc, out); } if (msg->list.flags & LIST_AACERTS) { - list_auth_certificates(this, AUTH_AA, "AA", msg->list.utc, out); + list_auth_certificates(AUTH_AA, "AA", msg->list.utc, out); } if (msg->list.flags & LIST_CAINFOS) { @@ -1453,8 +1443,7 @@ static void stroke_list(private_stroke_interface_t *this, /** * reread various information */ -static void stroke_reread(private_stroke_interface_t *this, - stroke_msg_t *msg, FILE *out) +static void stroke_reread(stroke_msg_t *msg, FILE *out) { if (msg->reread.flags & REREAD_CACERTS) { @@ -1473,8 +1462,7 @@ static void stroke_reread(private_stroke_interface_t *this, /** * purge various information */ -static void stroke_purge(private_stroke_interface_t *this, - stroke_msg_t *msg, FILE *out) +static void stroke_purge(stroke_msg_t *msg, FILE *out) { if (msg->purge.flags & PURGE_OCSP) { @@ -1510,8 +1498,7 @@ signal_t get_signal_from_logtype(char *type) /** * set the verbosity debug output */ -static void stroke_loglevel(private_stroke_interface_t *this, - stroke_msg_t *msg, FILE *out) +static void stroke_loglevel(stroke_msg_t *msg, FILE *out) { signal_t signal; @@ -1533,20 +1520,22 @@ static void stroke_loglevel(private_stroke_interface_t *this, /** * process a stroke request from the socket pointed by "fd" */ -static void stroke_process(private_stroke_interface_t *this, int strokefd) +static job_requeue_t stroke_process(int *fdp) { stroke_msg_t *msg; u_int16_t msg_length; ssize_t bytes_read; FILE *out; + int strokefd = *fdp; /* peek the length */ bytes_read = recv(strokefd, &msg_length, sizeof(msg_length), MSG_PEEK); if (bytes_read != sizeof(msg_length)) { - DBG1(DBG_CFG, "reading length of stroke message failed"); + DBG1(DBG_CFG, "reading length of stroke message failed: %s", + strerror(errno)); close(strokefd); - return; + return JOB_REQUEUE_NONE; } /* read message */ @@ -1556,105 +1545,107 @@ static void stroke_process(private_stroke_interface_t *this, int strokefd) { DBG1(DBG_CFG, "reading stroke message failed: %s", strerror(errno)); close(strokefd); - return; + return JOB_REQUEUE_NONE; } - out = fdopen(dup(strokefd), "w"); + out = fdopen(strokefd, "w"); if (out == NULL) { DBG1(DBG_CFG, "opening stroke output channel failed: %s", strerror(errno)); close(strokefd); free(msg); - return; + return JOB_REQUEUE_NONE; } DBG3(DBG_CFG, "stroke message %b", (void*)msg, msg_length); + /* the stroke_* functions are blocking, as they listen on the bus. Add + * cancellation handlers. */ + pthread_cleanup_push((void*)fclose, out); + pthread_cleanup_push(free, msg); + switch (msg->type) { case STR_INITIATE: - stroke_initiate(this, msg, out); + stroke_initiate(msg, out); break; case STR_ROUTE: - stroke_route(this, msg, out); + stroke_route(msg, out); break; case STR_UNROUTE: - stroke_unroute(this, msg, out); + stroke_unroute(msg, out); break; case STR_TERMINATE: - stroke_terminate(this, msg, out); + stroke_terminate(msg, out); break; case STR_STATUS: - stroke_status(this, msg, out, FALSE); + stroke_status(msg, out, FALSE); break; case STR_STATUS_ALL: - stroke_status(this, msg, out, TRUE); + stroke_status(msg, out, TRUE); break; case STR_ADD_CONN: - stroke_add_conn(this, msg, out); + stroke_add_conn(msg, out); break; case STR_DEL_CONN: - stroke_del_conn(this, msg, out); + stroke_del_conn(msg, out); break; case STR_ADD_CA: - stroke_add_ca(this, msg, out); + stroke_add_ca(msg, out); break; case STR_DEL_CA: - stroke_del_ca(this, msg, out); + stroke_del_ca(msg, out); break; case STR_LOGLEVEL: - stroke_loglevel(this, msg, out); + stroke_loglevel(msg, out); break; case STR_LIST: - stroke_list(this, msg, out); + stroke_list(msg, out); break; case STR_REREAD: - stroke_reread(this, msg, out); + stroke_reread(msg, out); break; case STR_PURGE: - stroke_purge(this, msg, out); + stroke_purge(msg, out); break; default: DBG1(DBG_CFG, "received unknown stroke"); } - fclose(out); - close(strokefd); - free(msg); + /* remove and execute cancellation handlers */ + pthread_cleanup_pop(1); + pthread_cleanup_pop(1); + + return JOB_REQUEUE_NONE; } + /** * Implementation of private_stroke_interface_t.stroke_receive. */ -static void stroke_receive(private_stroke_interface_t *this) +static job_requeue_t stroke_receive(private_stroke_interface_t *this) { struct sockaddr_un strokeaddr; int strokeaddrlen = sizeof(strokeaddr); + int strokefd, *fdp; int oldstate; - int strokefd; + callback_job_t *job; - charon->drop_capabilities(charon, TRUE); + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate); + strokefd = accept(this->socket, (struct sockaddr *)&strokeaddr, &strokeaddrlen); + pthread_setcancelstate(oldstate, NULL); - /* ignore sigpipe. writing over the pipe back to the console - * only fails if SIGPIPE is ignored. */ - signal(SIGPIPE, SIG_IGN); - - /* disable cancellation by default */ - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); - - while (TRUE) + if (strokefd < 0) { - /* wait for connections, but allow thread to terminate */ - pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate); - strokefd = accept(this->socket, (struct sockaddr *)&strokeaddr, &strokeaddrlen); - pthread_setcancelstate(oldstate, NULL); - - if (strokefd < 0) - { - DBG1(DBG_CFG, "accepting stroke connection failed: %s", strerror(errno)); - continue; - } - stroke_process(this, strokefd); + DBG1(DBG_CFG, "accepting stroke connection failed: %s", strerror(errno)); + return JOB_REQUEUE_FAIR; } + + fdp = malloc_thing(int); + *fdp = strokefd; + job = callback_job_create((callback_job_cb_t)stroke_process, fdp, free, this->job); + charon->processor->queue_job(charon->processor, (job_t*)job); + + return JOB_REQUEUE_FAIR; } /** @@ -1662,17 +1653,9 @@ static void stroke_receive(private_stroke_interface_t *this) */ static void destroy(private_stroke_interface_t *this) { - int i; - - for (i = 0; i < STROKE_THREADS; i++) - { - pthread_cancel(this->threads[i]); - pthread_join(this->threads[i], NULL); - } - - close(this->socket); - unlink(socket_addr.sun_path); + this->job->cancel(this->job); free(this); + unlink(socket_addr.sun_path); } /* @@ -1682,7 +1665,6 @@ interface_t *interface_create() { private_stroke_interface_t *this = malloc_thing(private_stroke_interface_t); mode_t old; - int i; /* public functions */ this->public.interface.destroy = (void (*)(interface_t*))destroy; @@ -1715,14 +1697,10 @@ interface_t *interface_create() return NULL; } - /* start threads reading from the socket */ - for (i = 0; i < STROKE_THREADS; i++) - { - if (pthread_create(&this->threads[i], NULL, (void*(*)(void*))stroke_receive, this) != 0) - { - charon->kill(charon, "unable to create stroke thread"); - } - } + this->job = callback_job_create((callback_job_cb_t)stroke_receive, + this, NULL, NULL); + charon->processor->queue_job(charon->processor, (job_t*)this->job); return &this->public.interface; } + diff --git a/src/charon/control/interfaces/xml_interface.c b/src/charon/control/interfaces/xml_interface.c index e570f2543..8dd614493 100644 --- a/src/charon/control/interfaces/xml_interface.c +++ b/src/charon/control/interfaces/xml_interface.c @@ -24,10 +24,24 @@ #include "xml_interface.h" +#include <sys/types.h> +#include <sys/stat.h> +#include <sys/socket.h> +#include <sys/un.h> +#include <unistd.h> +#include <errno.h> +#include <pthread.h> +#include <signal.h> +#include <libxml/xmlreader.h> +#include <libxml/xmlwriter.h> + #include <library.h> #include <daemon.h> +static struct sockaddr_un socket_addr = { AF_UNIX, "/var/run/charon.xml"}; + + typedef struct private_xml_interface_t private_xml_interface_t; /** @@ -39,14 +53,171 @@ struct private_xml_interface_t { * Public part of xml_t object. */ xml_interface_t public; + + /** + * XML unix socket fd + */ + int socket; + + /** + * thread receiving messages + */ + pthread_t thread; }; +static void get(private_xml_interface_t *this, + xmlTextReaderPtr reader, xmlTextWriterPtr writer) +{ + + if (/* <GetResponse> */ + xmlTextWriterStartElement(writer, "GetResponse") < 0 || + /* <Status Code="200"><Message/></Status> */ + xmlTextWriterStartElement(writer, "Status") < 0 || + xmlTextWriterWriteAttribute(writer, "Code", "200") < 0 || + xmlTextWriterStartElement(writer, "Message") < 0 || + xmlTextWriterEndElement(writer) < 0 || + xmlTextWriterEndElement(writer) < 0 || + /* <ConnectionList/> */ + xmlTextWriterStartElement(writer, "ConnectionList") < 0 || + xmlTextWriterEndElement(writer) < 0 || + /* </GetResponse> */ + xmlTextWriterEndElement(writer) < 0) + { + DBG1(DBG_CFG, "error writing XML document (GetResponse)"); + } + + +/* + DBG1(DBG_CFG, "%d %d %s %d %d %s", + xmlTextReaderDepth(reader), + , + xmlTextReaderConstName(reader), + xmlTextReaderIsEmptyElement(reader), + xmlTextReaderHasValue(reader), + xmlTextReaderConstValue(reader)); + */ +} + +static void receive(private_xml_interface_t *this) +{ + charon->drop_capabilities(charon, TRUE); + + /* disable cancellation by default */ + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); + + while (TRUE) + { + struct sockaddr_un strokeaddr; + int strokeaddrlen = sizeof(strokeaddr); + int oldstate; + int fd; + char buffer[4096]; + size_t len; + + /* wait for connections, but allow thread to terminate */ + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate); + fd = accept(this->socket, (struct sockaddr *)&strokeaddr, &strokeaddrlen); + pthread_setcancelstate(oldstate, NULL); + + if (fd < 0) + { + DBG1(DBG_CFG, "accepting SMP XML socket failed: %s", strerror(errno)); + continue; + } + DBG2(DBG_CFG, "SMP XML connection opened"); + while (TRUE) + { + xmlTextReaderPtr reader; + xmlTextWriterPtr writer; + + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate); + len = read(fd, buffer, sizeof(buffer)); + pthread_setcancelstate(oldstate, NULL); + if (len <= 0) + { + close(fd); + DBG2(DBG_CFG, "SMP XML connection closed"); + break; + } + + reader = xmlReaderForMemory(buffer, len, NULL, NULL, 0); + if (reader == NULL) + { + DBG1(DBG_CFG, "opening SMP XML reader failed"); + continue; + } + + writer = xmlNewTextWriter(xmlOutputBufferCreateFd(fd, NULL)); + if (writer == NULL) + { + xmlFreeTextReader(reader); + DBG1(DBG_CFG, "opening SMP XML writer failed"); + continue; + } + + /* create the standard message parts */ + if (xmlTextWriterStartDocument(writer, NULL, NULL, NULL) < 0 || + /* <SMPMessage xmlns="http://www.strongswan.org/smp/1.0"> */ + xmlTextWriterStartElement(writer, "SMPMessage") < 0 || + xmlTextWriterWriteAttribute(writer, "xmlns", + "http://www.strongswan.org/smp/1.0") < 0 || + /* <Body> */ + xmlTextWriterStartElement(writer, "Body") < 0) + { + xmlFreeTextReader(reader); + xmlFreeTextWriter(writer); + DBG1(DBG_CFG, "creating SMP XML message failed"); + continue; + } + + while (TRUE) + { + switch (xmlTextReaderRead(reader)) + { + case 1: + { + if (xmlTextReaderNodeType(reader) == + XML_READER_TYPE_ELEMENT) + { + if (streq(xmlTextReaderConstName(reader), "GetRequest")) + { + get(this, reader, writer); + break; + } + } + continue; + } + case 0: + /* end of XML */ + break; + default: + DBG1(DBG_CFG, "parsing SMP XML message failed"); + break; + } + xmlFreeTextReader(reader); + break; + } + /* write </Body></SMPMessage> and close document */ + if (xmlTextWriterEndDocument(writer) < 0) + { + DBG1(DBG_CFG, "completing SMP XML message failed"); + } + xmlFreeTextWriter(writer); + /* write a newline to indicate end of xml */ + write(fd, "\n", 1); + } + } +} /** * Implementation of itnerface_t.destroy. */ static void destroy(private_xml_interface_t *this) { + pthread_cancel(this->thread); + pthread_join(this->thread, NULL); + close(this->socket); + unlink(socket_addr.sun_path); free(this); } @@ -56,8 +227,46 @@ static void destroy(private_xml_interface_t *this) interface_t *interface_create() { private_xml_interface_t *this = malloc_thing(private_xml_interface_t); + mode_t old; - this->public.interface.destroy = (void (*)(xml_interface_t*))destroy; + this->public.interface.destroy = (void (*)(interface_t*))destroy; + + /* set up unix socket */ + this->socket = socket(AF_UNIX, SOCK_STREAM, 0); + if (this->socket == -1) + { + DBG1(DBG_CFG, "could not create XML socket"); + free(this); + return NULL; + } + + old = umask(~S_IRWXU); + if (bind(this->socket, (struct sockaddr *)&socket_addr, sizeof(socket_addr)) < 0) + { + DBG1(DBG_CFG, "could not bind XML socket: %s", strerror(errno)); + close(this->socket); + free(this); + return NULL; + } + umask(old); + + if (listen(this->socket, 0) < 0) + { + DBG1(DBG_CFG, "could not listen on XML socket: %s", strerror(errno)); + close(this->socket); + free(this); + return NULL; + } + + if (pthread_create(&this->thread, NULL, (void*(*)(void*))receive, this) != 0) + { + DBG1(DBG_CFG, "could not create XML socket thread: %s", strerror(errno)); + close(this->socket); + unlink(socket_addr.sun_path); + free(this); + return NULL; + } return &this->public.interface; } + diff --git a/src/charon/daemon.c b/src/charon/daemon.c index 62e29b365..aa81442cd 100644 --- a/src/charon/daemon.c +++ b/src/charon/daemon.c @@ -115,25 +115,26 @@ static void dbg_stderr(int level, char *fmt, ...) */ static void run(private_daemon_t *this) { - /* reselect signals for this thread */ - sigemptyset(&(this->signal_set)); - sigaddset(&(this->signal_set), SIGINT); - sigaddset(&(this->signal_set), SIGHUP); - sigaddset(&(this->signal_set), SIGTERM); - pthread_sigmask(SIG_BLOCK, &(this->signal_set), 0); - - while(TRUE) + sigset_t set; + + /* handle SIGINT, SIGHUP ans SIGTERM in this handler */ + sigemptyset(&set); + sigaddset(&set, SIGINT); + sigaddset(&set, SIGHUP); + sigaddset(&set, SIGTERM); + + while (TRUE) { - int signal_number; + int sig; int error; - error = sigwait(&(this->signal_set), &signal_number); - if(error) + error = sigwait(&set, &sig); + if (error) { DBG1(DBG_DMN, "error %d while waiting for a signal", error); return; } - switch (signal_number) + switch (sig) { case SIGHUP: { @@ -146,11 +147,13 @@ static void run(private_daemon_t *this) return; } case SIGTERM: + { DBG1(DBG_DMN, "signal of type SIGTERM received. Shutting down"); return; + } default: { - DBG1(DBG_DMN, "unknown signal %d received. Ignored", signal_number); + DBG1(DBG_DMN, "unknown signal %d received. Ignored", sig); break; } } @@ -162,33 +165,22 @@ static void run(private_daemon_t *this) */ static void destroy(private_daemon_t *this) { - /* destruction is a non trivial task, we need to follow - * a strict order to prevent threading issues! - * Kill active threads first, except the sender, as - * the killed IKE_SA want to send delete messages. - */ - /* we don't want to receive anything anymore... */ - DESTROY_IF(this->public.receiver); - /* ignore all incoming user requests */ - DESTROY_IF(this->public.interfaces); - /* stop scheduing jobs */ - DESTROY_IF(this->public.scheduler); - /* stop processing jobs */ - DESTROY_IF(this->public.thread_pool); - /* shut down manager with all IKE SAs */ + /* terminate all idle threads */ + this->public.processor->set_threads(this->public.processor, 0); + /* close all IKE_SAs */ DESTROY_IF(this->public.ike_sa_manager); - /* all child SAs should be down now, so kill kernel interface */ - DESTROY_IF(this->public.kernel_interface); - /* destroy other infrastructure */ - DESTROY_IF(this->public.job_queue); - DESTROY_IF(this->public.event_queue); - DESTROY_IF(this->public.credentials); + DESTROY_IF(this->public.scheduler); + DESTROY_IF(this->public.interfaces); DESTROY_IF(this->public.backends); - /* we hope the sender could send the outstanding deletes, but - * we shut down here at any cost */ + DESTROY_IF(this->public.credentials); + DESTROY_IF(this->public.kernel_interface); DESTROY_IF(this->public.sender); + DESTROY_IF(this->public.receiver); DESTROY_IF(this->public.socket); - /* before destroying bus with its listeners, rehook library logs */ + /* wait until all threads are gone */ + DESTROY_IF(this->public.processor); + + /* rehook library logging, shutdown logging */ dbg = dbg_stderr; DESTROY_IF(this->public.bus); DESTROY_IF(this->public.outlog); @@ -197,7 +189,6 @@ static void destroy(private_daemon_t *this) free(this); } - /** * Enforce daemon shutdown, with a given reason to do so. */ @@ -228,6 +219,7 @@ static void drop_capabilities(private_daemon_t *this, bool full) { struct __user_cap_header_struct hdr; struct __user_cap_data_struct data; + /* CAP_NET_ADMIN is needed to use netlink */ u_int32_t keep = (1<<CAP_NET_ADMIN); @@ -242,11 +234,11 @@ static void drop_capabilities(private_daemon_t *this, bool full) } else { - /* CAP_NET_BIND_SERVICE to bind services below port 1024, - * CAP_NET_RAW to create RAW sockets. - * CAP_DAC_READ_SEARCH is needed to read ipsec.secrets */ + /* CAP_NET_BIND_SERVICE to bind services below port 1024 */ keep |= (1<<CAP_NET_BIND_SERVICE); + /* CAP_NET_RAW to create RAW sockets */ keep |= (1<<CAP_NET_RAW); + /* CAP_DAC_READ_SEARCH to read ipsec.secrets */ keep |= (1<<CAP_DAC_READ_SEARCH); } @@ -257,7 +249,7 @@ static void drop_capabilities(private_daemon_t *this, bool full) if (capset(&hdr, &data)) { - kill_daemon(this, "unable to drop threads capabilities"); + kill_daemon(this, "unable to drop daemon capabilities"); } } @@ -266,7 +258,6 @@ static void drop_capabilities(private_daemon_t *this, bool full) */ static void initialize(private_daemon_t *this, bool syslog, level_t levels[]) { - credential_store_t* credentials; signal_t signal; /* for uncritical pseudo random numbers */ @@ -298,38 +289,32 @@ static void initialize(private_daemon_t *this, bool syslog, level_t levels[]) DBG1(DBG_DMN, "starting charon (strongSwan Version %s)", VERSION); - this->public.socket = socket_create(IKEV2_UDP_PORT, IKEV2_NATT_PORT); this->public.ike_sa_manager = ike_sa_manager_create(); - this->public.job_queue = job_queue_create(); - this->public.event_queue = event_queue_create(); - this->public.credentials = (credential_store_t*)local_credential_store_create(); - this->public.backends = backend_manager_create(); - - /* initialize fetcher_t class */ - fetcher_initialize(); + this->public.processor = processor_create(); + this->public.scheduler = scheduler_create(); /* load secrets, ca certificates and crls */ - credentials = this->public.credentials; - credentials->load_ca_certificates(credentials); - credentials->load_aa_certificates(credentials); - credentials->load_attr_certificates(credentials); - credentials->load_ocsp_certificates(credentials); - credentials->load_crls(credentials); - credentials->load_secrets(credentials); - - /* start building threads, we are multi-threaded NOW */ + this->public.credentials = (credential_store_t*)local_credential_store_create(); + this->public.credentials->load_ca_certificates(this->public.credentials); + this->public.credentials->load_aa_certificates(this->public.credentials); + this->public.credentials->load_attr_certificates(this->public.credentials); + this->public.credentials->load_ocsp_certificates(this->public.credentials); + this->public.credentials->load_crls(this->public.credentials); + this->public.credentials->load_secrets(this->public.credentials); + this->public.interfaces = interface_manager_create(); + this->public.backends = backend_manager_create(); + this->public.kernel_interface = kernel_interface_create(); + this->public.socket = socket_create(IKEV2_UDP_PORT, IKEV2_NATT_PORT); this->public.sender = sender_create(); this->public.receiver = receiver_create(); - this->public.scheduler = scheduler_create(); - this->public.kernel_interface = kernel_interface_create(); - this->public.thread_pool = thread_pool_create(NUMBER_OF_WORKING_THREADS); + } /** * Handle SIGSEGV/SIGILL signals raised by threads */ -void signal_handler(int signal) +static void segv_handler(int signal) { #ifdef HAVE_BACKTRACE void *array[20]; @@ -340,7 +325,7 @@ void signal_handler(int signal) size = backtrace(array, 20); strings = backtrace_symbols(array, size); - DBG1(DBG_DMN, "thread %u received %s. Dumping %d frames from stack:", + DBG1(DBG_JOB, "thread %u received %s. Dumping %d frames from stack:", pthread_self(), signal == SIGSEGV ? "SIGSEGV" : "SIGILL", size); for (i = 0; i < size; i++) @@ -352,7 +337,7 @@ void signal_handler(int signal) DBG1(DBG_DMN, "thread %u received %s", pthread_self(), signal == SIGSEGV ? "SIGSEGV" : "SIGILL"); #endif /* HAVE_BACKTRACE */ - DBG1(DBG_DMN, "killing ourself hard after SIGSEGV"); + DBG1(DBG_DMN, "killing ourself, received critical signal"); raise(SIGKILL); } @@ -361,25 +346,22 @@ void signal_handler(int signal) */ private_daemon_t *daemon_create(void) { - private_daemon_t *this = malloc_thing(private_daemon_t); struct sigaction action; + private_daemon_t *this = malloc_thing(private_daemon_t); /* assign methods */ this->public.kill = (void (*) (daemon_t*,char*))kill_daemon; - this->public.drop_capabilities = (void(*)(daemon_t*,bool))drop_capabilities; /* NULL members for clean destruction */ this->public.socket = NULL; this->public.ike_sa_manager = NULL; - this->public.job_queue = NULL; - this->public.event_queue = NULL; this->public.credentials = NULL; this->public.backends = NULL; this->public.sender= NULL; this->public.receiver = NULL; this->public.scheduler = NULL; this->public.kernel_interface = NULL; - this->public.thread_pool = NULL; + this->public.processor = NULL; this->public.interfaces = NULL; this->public.bus = NULL; this->public.outlog = NULL; @@ -388,20 +370,19 @@ private_daemon_t *daemon_create(void) this->main_thread_id = pthread_self(); - /* setup signal handling for all threads */ - sigemptyset(&(this->signal_set)); - sigaddset(&(this->signal_set), SIGSEGV); - sigaddset(&(this->signal_set), SIGINT); - sigaddset(&(this->signal_set), SIGHUP); - sigaddset(&(this->signal_set), SIGTERM); - pthread_sigmask(SIG_BLOCK, &(this->signal_set), 0); - - /* setup SIGSEGV handler for all threads */ - action.sa_handler = signal_handler; - action.sa_mask = this->signal_set; + /* add handler for SEGV and ILL, + * add handler for USR1 (cancellation). + * INT, TERM and HUP are handled by sigwait() in run() */ + action.sa_handler = segv_handler; action.sa_flags = 0; + sigemptyset(&action.sa_mask); + sigaddset(&action.sa_mask, SIGINT); + sigaddset(&action.sa_mask, SIGTERM); + sigaddset(&action.sa_mask, SIGHUP); sigaction(SIGSEGV, &action, NULL); sigaction(SIGILL, &action, NULL); + pthread_sigmask(SIG_SETMASK, &action.sa_mask, 0); + return this; } @@ -450,10 +431,12 @@ int main(int argc, char *argv[]) level_t levels[DBG_MAX]; int signal; - prctl(PR_SET_KEEPCAPS, 1); + private_charon = daemon_create(); + charon = (daemon_t*)private_charon; - /* drop the capabilities we won't need at all */ - drop_capabilities(NULL, FALSE); + /* drop the capabilities we won't need for initialization */ + prctl(PR_SET_KEEPCAPS, 1); + drop_capabilities(private_charon, FALSE); /* use CTRL loglevel for default */ for (signal = 0; signal < DBG_MAX; signal++) @@ -522,13 +505,11 @@ int main(int argc, char *argv[]) } break; } - - private_charon = daemon_create(); - charon = (daemon_t*)private_charon; /* initialize daemon */ initialize(private_charon, use_syslog, levels); - + /* initialize fetcher_t class */ + fetcher_initialize(); /* load pluggable EAP modules */ eap_method_load(eapdir); @@ -562,6 +543,9 @@ int main(int argc, char *argv[]) /* drop additional capabilites (bind & root) */ drop_capabilities(private_charon, TRUE); + /* start the engine, go multithreaded */ + charon->processor->set_threads(charon->processor, WORKER_THREADS); + /* run daemon */ run(private_charon); diff --git a/src/charon/daemon.h b/src/charon/daemon.h index 640bc6a09..534ae748e 100644 --- a/src/charon/daemon.h +++ b/src/charon/daemon.h @@ -33,9 +33,7 @@ typedef struct daemon_t daemon_t; #include <network/receiver.h> #include <network/socket.h> #include <processing/scheduler.h> -#include <processing/thread_pool.h> -#include <processing/job_queue.h> -#include <processing/event_queue.h> +#include <processing/processor.h> #include <kernel/kernel_interface.h> #include <control/interface_manager.h> #include <bus/bus.h> @@ -234,12 +232,9 @@ typedef struct daemon_t daemon_t; /** * @brief Number of threads in the thread pool. * - * There are several other threads, this defines - * only the number of threads in thread_pool_t. - * * @ingroup charon */ -#define NUMBER_OF_WORKING_THREADS 4 +#define WORKER_THREADS 16 /** * UDP Port on which the daemon will listen for incoming traffic. @@ -338,20 +333,11 @@ typedef struct daemon_t daemon_t; * @ingroup charon */ struct daemon_t { + /** * A socket_t instance. */ socket_t *socket; - - /** - * A job_queue_t instance. - */ - job_queue_t *job_queue; - - /** - * A event_queue_t instance. - */ - event_queue_t *event_queue; /** * A ike_sa_manager_t instance. @@ -384,9 +370,9 @@ struct daemon_t { scheduler_t *scheduler; /** - * The Thread pool managing the worker threads. + * Job processing using a thread pool. */ - thread_pool_t *thread_pool; + processor_t *processor; /** * The signaling bus. @@ -419,14 +405,6 @@ struct daemon_t { interface_manager_t *interfaces; /** - * @brief Let the calling thread drop its capabilities. - * - * @param this calling daemon - * @param full TRUE to drop as many as possible - */ - void (*drop_capabilities) (daemon_t *this, bool full); - - /** * @brief Shut down the daemon. * * @param this the daemon to kill diff --git a/src/charon/kernel/kernel_interface.c b/src/charon/kernel/kernel_interface.c index d82783b03..c1764b5a5 100644 --- a/src/charon/kernel/kernel_interface.c +++ b/src/charon/kernel/kernel_interface.c @@ -48,6 +48,7 @@ #include <processing/jobs/delete_child_sa_job.h> #include <processing/jobs/rekey_child_sa_job.h> #include <processing/jobs/acquire_job.h> +#include <processing/jobs/callback_job.h> /** kernel level protocol identifiers */ #define KERNEL_ESP 50 @@ -156,7 +157,7 @@ char* lookup_algorithm(kernel_algorithm_t *kernel_algo, } kernel_algo++; } - return NULL; + return NULL; } typedef struct route_entry_t route_entry_t; @@ -297,6 +298,11 @@ struct private_kernel_interface_t { * Mutex to lock access to vips. */ pthread_mutex_t vips_mutex; + + /** + * job receiving xfrm events + */ + callback_job_t *job; /** * netlink xfrm socket to receive acquire and expire events @@ -312,11 +318,6 @@ struct private_kernel_interface_t { * Netlink rt socket (routing) */ int socket_rt; - - /** - * Thread receiving events from kernel - */ - pthread_t event_thread; }; /** @@ -444,99 +445,98 @@ static void add_attribute(struct nlmsghdr *hdr, int rta_type, chunk_t data, /** * Receives events from kernel */ -static void receive_events(private_kernel_interface_t *this) +static job_requeue_t receive_events(private_kernel_interface_t *this) { - charon->drop_capabilities(charon, TRUE); + unsigned char response[512]; + struct nlmsghdr *hdr; + struct sockaddr_nl addr; + socklen_t addr_len = sizeof(addr); + int len, oldstate; + + hdr = (struct nlmsghdr*)response; + + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate); + len = recvfrom(this->socket_xfrm_events, response, sizeof(response), 0, + (struct sockaddr*)&addr, &addr_len); + pthread_setcancelstate(oldstate, NULL); + + if (len < 0) + { + if (errno == EINTR) + { /* interrupted, try again */ + return JOB_REQUEUE_DIRECT; + } + charon->kill(charon, "unable to receive netlink events"); + } + + if (!NLMSG_OK(hdr, len)) + { + /* bad netlink message */ + return JOB_REQUEUE_DIRECT; + } - while(TRUE) + if (addr.nl_pid != 0) { - unsigned char response[512]; - struct nlmsghdr *hdr; - struct sockaddr_nl addr; - socklen_t addr_len = sizeof(addr); - int len; - - hdr = (struct nlmsghdr*)response; - len = recvfrom(this->socket_xfrm_events, response, sizeof(response), - 0, (struct sockaddr*)&addr, &addr_len); - if (len < 0) + /* not from kernel. not interested, try another one */ + return JOB_REQUEUE_DIRECT; + } + + /* we handle ACQUIRE and EXPIRE messages directly */ + if (hdr->nlmsg_type == XFRM_MSG_ACQUIRE) + { + u_int32_t reqid = 0; + job_t *job; + struct rtattr *rtattr = XFRM_RTA(hdr, struct xfrm_user_acquire); + size_t rtsize = XFRM_PAYLOAD(hdr, struct xfrm_user_tmpl); + if (RTA_OK(rtattr, rtsize)) { - if (errno == EINTR) + if (rtattr->rta_type == XFRMA_TMPL) { - /* interrupted, try again */ - continue; + struct xfrm_user_tmpl* tmpl = (struct xfrm_user_tmpl*)RTA_DATA(rtattr); + reqid = tmpl->reqid; } - charon->kill(charon, "unable to receive netlink events"); } - - if (!NLMSG_OK(hdr, len)) + if (reqid == 0) { - /* bad netlink message */ - continue; + DBG1(DBG_KNL, "received a XFRM_MSG_ACQUIRE, but no reqid found"); } - - if (addr.nl_pid != 0) + else { - /* not from kernel. not interested, try another one */ - continue; + DBG2(DBG_KNL, "received a XFRM_MSG_ACQUIRE"); + DBG1(DBG_KNL, "creating acquire job for CHILD_SA with reqid %d", + reqid); + job = (job_t*)acquire_job_create(reqid); + charon->processor->queue_job(charon->processor, job); } + } + else if (hdr->nlmsg_type == XFRM_MSG_EXPIRE) + { + job_t *job; + protocol_id_t protocol; + u_int32_t spi, reqid; + struct xfrm_user_expire *expire; + + expire = (struct xfrm_user_expire*)NLMSG_DATA(hdr); + protocol = expire->state.id.proto == KERNEL_ESP ? + PROTO_ESP : PROTO_AH; + spi = expire->state.id.spi; + reqid = expire->state.reqid; - /* we handle ACQUIRE and EXPIRE messages directly */ - if (hdr->nlmsg_type == XFRM_MSG_ACQUIRE) + DBG2(DBG_KNL, "received a XFRM_MSG_EXPIRE"); + DBG1(DBG_KNL, "creating %s job for %N CHILD_SA 0x%x (reqid %d)", + expire->hard ? "delete" : "rekey", protocol_id_names, + protocol, ntohl(spi), reqid); + if (expire->hard) { - u_int32_t reqid = 0; - job_t *job; - struct rtattr *rtattr = XFRM_RTA(hdr, struct xfrm_user_acquire); - size_t rtsize = XFRM_PAYLOAD(hdr, struct xfrm_user_tmpl); - if (RTA_OK(rtattr, rtsize)) - { - if (rtattr->rta_type == XFRMA_TMPL) - { - struct xfrm_user_tmpl* tmpl = (struct xfrm_user_tmpl*)RTA_DATA(rtattr); - reqid = tmpl->reqid; - } - } - if (reqid == 0) - { - DBG1(DBG_KNL, "received a XFRM_MSG_ACQUIRE, but no reqid found"); - } - else - { - DBG2(DBG_KNL, "received a XFRM_MSG_ACQUIRE"); - DBG1(DBG_KNL, "creating acquire job for CHILD_SA with reqid %d", - reqid); - job = (job_t*)acquire_job_create(reqid); - charon->job_queue->add(charon->job_queue, job); - } + job = (job_t*)delete_child_sa_job_create(reqid, protocol, spi); } - else if (hdr->nlmsg_type == XFRM_MSG_EXPIRE) + else { - job_t *job; - protocol_id_t protocol; - u_int32_t spi, reqid; - struct xfrm_user_expire *expire; - - expire = (struct xfrm_user_expire*)NLMSG_DATA(hdr); - protocol = expire->state.id.proto == KERNEL_ESP ? - PROTO_ESP : PROTO_AH; - spi = expire->state.id.spi; - reqid = expire->state.reqid; - - DBG2(DBG_KNL, "received a XFRM_MSG_EXPIRE"); - DBG1(DBG_KNL, "creating %s job for %N CHILD_SA 0x%x (reqid %d)", - expire->hard ? "delete" : "rekey", protocol_id_names, - protocol, ntohl(spi), reqid); - if (expire->hard) - { - job = (job_t*)delete_child_sa_job_create(reqid, protocol, spi); - } - else - { - job = (job_t*)rekey_child_sa_job_create(reqid, protocol, spi); - } - charon->job_queue->add(charon->job_queue, job); + job = (job_t*)rekey_child_sa_job_create(reqid, protocol, spi); } + charon->processor->queue_job(charon->processor, job); } + return JOB_REQUEUE_DIRECT; } /** @@ -1880,8 +1880,7 @@ static status_t del_policy(private_kernel_interface_t *this, */ static void destroy(private_kernel_interface_t *this) { - pthread_cancel(this->event_thread); - pthread_join(this->event_thread, NULL); + this->job->cancel(this->job); close(this->socket_xfrm_events); close(this->socket_xfrm); close(this->socket_rt); @@ -1961,14 +1960,10 @@ kernel_interface_t *kernel_interface_create() charon->kill(charon, "unable to bind XFRM event socket"); } - /* create a thread receiving ACQUIRE & EXPIRE events */ - if (pthread_create(&this->event_thread, NULL, - (void*(*)(void*))receive_events, this)) - { - charon->kill(charon, "unable to create xfrm event dispatcher thread"); - } + this->job = callback_job_create((callback_job_cb_t)receive_events, + this, NULL, NULL); + charon->processor->queue_job(charon->processor, (job_t*)this->job); return &this->public; } -/* vim: set ts=4 sw=4 noet: */ diff --git a/src/charon/network/receiver.c b/src/charon/network/receiver.c index 9b4bf71ac..1de1dd3d2 100644 --- a/src/charon/network/receiver.c +++ b/src/charon/network/receiver.c @@ -30,9 +30,9 @@ #include <daemon.h> #include <network/socket.h> #include <network/packet.h> -#include <processing/job_queue.h> #include <processing/jobs/job.h> #include <processing/jobs/process_message_job.h> +#include <processing/jobs/callback_job.h> /** length of the full cookie, including time (u_int32_t + SHA1()) */ #define COOKIE_LENGTH 24 @@ -56,12 +56,17 @@ struct private_receiver_t { /** * Public part of a receiver_t object. */ - receiver_t public; + receiver_t public; + + /** + * Threads job receiving packets + */ + callback_job_t *job; - /** - * Assigned thread. - */ - pthread_t assigned_thread; + /** + * Assigned thread. + */ + pthread_t assigned_thread; /** * current secret to use for cookie calculation @@ -245,94 +250,84 @@ static bool peer_to_aggressive(private_receiver_t *this, message_t *message) /** * Implementation of receiver_t.receive_packets. */ -static void receive_packets(private_receiver_t *this) +static job_requeue_t receive_packets(private_receiver_t *this) { packet_t *packet; message_t *message; job_t *job; - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); - DBG1(DBG_NET, "receiver thread running, thread_ID: %06u", - (int)pthread_self()); + /* read in a packet */ + if (charon->socket->receive(charon->socket, &packet) != SUCCESS) + { + DBG2(DBG_NET, "receiving from socket failed!"); + return JOB_REQUEUE_FAIR; + } - charon->drop_capabilities(charon, TRUE); + /* parse message header */ + message = message_create_from_packet(packet); + if (message->parse_header(message) != SUCCESS) + { + DBG1(DBG_NET, "received invalid IKE header from %H - ignored", + packet->get_source(packet)); + message->destroy(message); + return JOB_REQUEUE_DIRECT; + } - while (TRUE) + /* check IKE major version */ + if (message->get_major_version(message) != IKE_MAJOR_VERSION) { - /* read in a packet */ - if (charon->socket->receive(charon->socket, &packet) != SUCCESS) - { - DBG2(DBG_NET, "receiving from socket failed!"); - /* try again after a delay */ - sleep(1); - continue; - } - - /* parse message header */ - message = message_create_from_packet(packet); - if (message->parse_header(message) != SUCCESS) + DBG1(DBG_NET, "received unsupported IKE version %d.%d from %H, " + "sending INVALID_MAJOR_VERSION", message->get_major_version(message), + message->get_minor_version(message), packet->get_source(packet)); + send_notify(message, INVALID_MAJOR_VERSION, chunk_empty); + message->destroy(message); + return JOB_REQUEUE_DIRECT; + } + + if (message->get_request(message) && + message->get_exchange_type(message) == IKE_SA_INIT) + { + /* check for cookies */ + if (cookie_required(this, message)) { - DBG1(DBG_NET, "received invalid IKE header from %H - ignored", - packet->get_source(packet)); + u_int32_t now = time(NULL); + chunk_t cookie = cookie_build(this, message, now - this->secret_offset, + chunk_from_thing(this->secret)); + + DBG2(DBG_NET, "received packet from: %#H to %#H", + message->get_source(message), + message->get_destination(message)); + DBG2(DBG_NET, "sending COOKIE notify to %H", + message->get_source(message)); + send_notify(message, COOKIE, cookie); + chunk_free(&cookie); + if (++this->secret_used > COOKIE_REUSE) + { + /* create new cookie */ + DBG1(DBG_NET, "generating new cookie secret after %d uses", + this->secret_used); + memcpy(this->secret_old, this->secret, SECRET_LENGTH); + this->randomizer->get_pseudo_random_bytes(this->randomizer, + SECRET_LENGTH, this->secret); + this->secret_switch = now; + this->secret_used = 0; + } message->destroy(message); - continue; + return JOB_REQUEUE_DIRECT; } - /* check IKE major version */ - if (message->get_major_version(message) != IKE_MAJOR_VERSION) + /* check if peer has not too many IKE_SAs half open */ + if (peer_to_aggressive(this, message)) { - DBG1(DBG_NET, "received unsupported IKE version %d.%d from %H, " - "sending INVALID_MAJOR_VERSION", message->get_major_version(message), - message->get_minor_version(message), packet->get_source(packet)); - send_notify(message, INVALID_MAJOR_VERSION, chunk_empty); + DBG1(DBG_NET, "ignoring IKE_SA setup from %H, " + "peer to aggressive", message->get_source(message)); message->destroy(message); - continue; - } - - if (message->get_request(message) && - message->get_exchange_type(message) == IKE_SA_INIT) - { - /* check for cookies */ - if (cookie_required(this, message)) - { - u_int32_t now = time(NULL); - chunk_t cookie = cookie_build(this, message, now - this->secret_offset, - chunk_from_thing(this->secret)); - - DBG2(DBG_NET, "received packet from: %#H to %#H", - message->get_source(message), - message->get_destination(message)); - DBG2(DBG_NET, "sending COOKIE notify to %H", - message->get_source(message)); - send_notify(message, COOKIE, cookie); - chunk_free(&cookie); - if (++this->secret_used > COOKIE_REUSE) - { - /* create new cookie */ - DBG1(DBG_NET, "generating new cookie secret after %d uses", - this->secret_used); - memcpy(this->secret_old, this->secret, SECRET_LENGTH); - this->randomizer->get_pseudo_random_bytes(this->randomizer, - SECRET_LENGTH, this->secret); - this->secret_switch = now; - this->secret_used = 0; - } - message->destroy(message); - continue; - } - - /* check if peer has not too many IKE_SAs half open */ - if (peer_to_aggressive(this, message)) - { - DBG1(DBG_NET, "ignoring IKE_SA setup from %H, " - "peer to aggressive", message->get_source(message)); - message->destroy(message); - continue; - } + return JOB_REQUEUE_DIRECT; } - job = (job_t *)process_message_job_create(message); - charon->job_queue->add(charon->job_queue, job); } + job = (job_t*)process_message_job_create(message); + charon->processor->queue_job(charon->processor, job); + return JOB_REQUEUE_DIRECT; } /** @@ -340,8 +335,7 @@ static void receive_packets(private_receiver_t *this) */ static void destroy(private_receiver_t *this) { - pthread_cancel(this->assigned_thread); - pthread_join(this->assigned_thread, NULL); + this->job->cancel(this->job); this->randomizer->destroy(this->randomizer); this->hasher->destroy(this->hasher); free(this); @@ -366,12 +360,10 @@ receiver_t *receiver_create() this->secret); memcpy(this->secret_old, this->secret, SECRET_LENGTH); - if (pthread_create(&this->assigned_thread, NULL, - (void*)receive_packets, this) != 0) - { - free(this); - charon->kill(charon, "unable to create receiver thread"); - } + this->job = callback_job_create((callback_job_cb_t)receive_packets, + this, NULL, NULL); + charon->processor->queue_job(charon->processor, (job_t*)this->job); return &this->public; } + diff --git a/src/charon/network/sender.c b/src/charon/network/sender.c index 933b8c192..f934dc509 100644 --- a/src/charon/network/sender.c +++ b/src/charon/network/sender.c @@ -28,6 +28,7 @@ #include <daemon.h> #include <network/socket.h> +#include <processing/jobs/callback_job.h> typedef struct private_sender_t private_sender_t; @@ -39,12 +40,12 @@ struct private_sender_t { /** * Public part of a sender_t object. */ - sender_t public; + sender_t public; - /** - * Assigned thread. - */ - pthread_t assigned_thread; + /** + * Sender threads job. + */ + callback_job_t *job; /** * The packets are stored in a linked list @@ -82,37 +83,29 @@ static void send_(private_sender_t *this, packet_t *packet) /** * Implementation of private_sender_t.send_packets. */ -static void send_packets(private_sender_t * this) +static job_requeue_t send_packets(private_sender_t * this) { - /* cancellation disabled by default */ - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); - DBG1(DBG_NET, "sender thread running, thread_ID: %06u", (int)pthread_self()); + packet_t *packet; + int oldstate; - charon->drop_capabilities(charon, TRUE); - - while (TRUE) + pthread_mutex_lock(&this->mutex); + while (this->list->get_count(this->list) == 0) { - packet_t *packet; - int oldstate; - - pthread_mutex_lock(&this->mutex); - /* go to wait while no packets available */ - while (this->list->get_count(this->list) == 0) - { - /* add cleanup handler, wait for packet, remove cleanup handler */ - pthread_cleanup_push((void(*)(void*))pthread_mutex_unlock, (void*)&this->mutex); - pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate); - pthread_cond_wait(&this->condvar, &this->mutex); - - pthread_setcancelstate(oldstate, NULL); - pthread_cleanup_pop(0); - } - this->list->remove_first(this->list, (void**)&packet); - pthread_mutex_unlock(&this->mutex); + /* add cleanup handler, wait for packet, remove cleanup handler */ + pthread_cleanup_push((void(*)(void*))pthread_mutex_unlock, (void*)&this->mutex); + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate); + + pthread_cond_wait(&this->condvar, &this->mutex); - charon->socket->send(charon->socket, packet); - packet->destroy(packet); + pthread_setcancelstate(oldstate, NULL); + pthread_cleanup_pop(0); } + this->list->remove_first(this->list, (void**)&packet); + pthread_mutex_unlock(&this->mutex); + + charon->socket->send(charon->socket, packet); + packet->destroy(packet); + return JOB_REQUEUE_DIRECT; } /** @@ -125,8 +118,7 @@ static void destroy(private_sender_t *this) { sched_yield(); } - pthread_cancel(this->assigned_thread); - pthread_join(this->assigned_thread, NULL); + this->job->cancel(this->job); this->list->destroy(this->list); free(this); } @@ -145,11 +137,10 @@ sender_t * sender_create() pthread_mutex_init(&this->mutex, NULL); pthread_cond_init(&this->condvar, NULL); - if (pthread_create(&this->assigned_thread, NULL, - (void*)send_packets, this) != 0) - { - charon->kill(charon, "unable to create sender thread"); - } + this->job = callback_job_create((callback_job_cb_t)send_packets, + this, NULL, NULL); + charon->processor->queue_job(charon->processor, (job_t*)this->job); - return &(this->public); + return &this->public; } + diff --git a/src/charon/processing/event_queue.c b/src/charon/processing/event_queue.c deleted file mode 100644 index 40bcb1ed8..000000000 --- a/src/charon/processing/event_queue.c +++ /dev/null @@ -1,290 +0,0 @@ -/** - * @file event_queue.c - * - * @brief Implementation of event_queue_t - * - */ - -/* - * Copyright (C) 2005-2006 Martin Willi - * Copyright (C) 2005 Jan Hutter - * Hochschule fuer Technik Rapperswil - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY - * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * for more details. - */ - -#include <pthread.h> -#include <stdlib.h> - -#include "event_queue.h" - -#include <library.h> -#include <utils/linked_list.h> - - - -typedef struct event_t event_t; - -/** - * Event containing a job and a schedule time - */ -struct event_t { - /** - * Time to fire the event. - */ - timeval_t time; - - /** - * Every event has its assigned job. - */ - job_t * job; -}; - -/** - * destroy an event and its job - */ -static void event_destroy(event_t *event) -{ - event->job->destroy(event->job); - free(event); -} - -typedef struct private_event_queue_t private_event_queue_t; - -/** - * Private Variables and Functions of event_queue_t class. - */ -struct private_event_queue_t { - /** - * Public part. - */ - event_queue_t public; - - /** - * The events are stored in a linked list of type linked_list_t. - */ - linked_list_t *list; - - /** - * Access to linked_list is locked through this mutex. - */ - pthread_mutex_t mutex; - - /** - * If the queue is empty or an event has not to be fired - * a thread has to wait. - * - * This condvar is used to wake up such a thread. - */ - pthread_cond_t condvar; -}; - -/** - * Returns the difference of to timeval structs in milliseconds - */ -static long time_difference(struct timeval *end_time, struct timeval *start_time) -{ - time_t s; - suseconds_t us; - - s = (end_time->tv_sec - start_time->tv_sec); - us = (end_time->tv_usec - start_time->tv_usec); - return ((s * 1000) + us/1000); -} - -/** - * Implements event_queue_t.get_count - */ -static int get_count(private_event_queue_t *this) -{ - int count; - pthread_mutex_lock(&(this->mutex)); - count = this->list->get_count(this->list); - pthread_mutex_unlock(&(this->mutex)); - return count; -} - -/** - * Implements event_queue_t.get - */ -static job_t *get(private_event_queue_t *this) -{ - timespec_t timeout; - timeval_t current_time; - event_t * next_event; - job_t *job; - int oldstate; - - pthread_mutex_lock(&(this->mutex)); - - while (TRUE) - { - while(this->list->get_count(this->list) == 0) - { - /* add mutex unlock handler for cancellation, enable cancellation */ - pthread_cleanup_push((void(*)(void*))pthread_mutex_unlock, (void*)&(this->mutex)); - pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate); - - pthread_cond_wait( &(this->condvar), &(this->mutex)); - - /* reset cancellation, remove mutex-unlock handler (without executing) */ - pthread_setcancelstate(oldstate, NULL); - pthread_cleanup_pop(0); - } - - this->list->get_first(this->list, (void **)&next_event); - - gettimeofday(¤t_time, NULL); - long difference = time_difference(¤t_time,&(next_event->time)); - if (difference <= 0) - { - timeout.tv_sec = next_event->time.tv_sec; - timeout.tv_nsec = next_event->time.tv_usec * 1000; - - /* add mutex unlock handler for cancellation, enable cancellation */ - pthread_cleanup_push((void(*)(void*))pthread_mutex_unlock, (void*)&(this->mutex)); - pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate); - - pthread_cond_timedwait(&(this->condvar), &(this->mutex), &timeout); - - /* reset cancellation, remove mutex-unlock handler (without executing) */ - pthread_setcancelstate(oldstate, NULL); - pthread_cleanup_pop(0); - } - else - { - /* event available */ - this->list->remove_first(this->list, (void **)&next_event); - job = next_event->job; - free(next_event); - break; - } - } - pthread_cond_signal( &(this->condvar)); - pthread_mutex_unlock(&(this->mutex)); - - return job; -} - -/** - * Implements function add_absolute of event_queue_t. - * See #event_queue_s.add_absolute for description. - */ -static void add_absolute(private_event_queue_t *this, job_t *job, timeval_t time) -{ - event_t *event; - event_t *current_event; - iterator_t *iterator; - - /* create event */ - event = malloc_thing(event_t); - event->time = time; - event->job = job; - - pthread_mutex_lock(&(this->mutex)); - - /* while just used to break out */ - while(TRUE) - { - if (this->list->get_count(this->list) == 0) - { - this->list->insert_first(this->list,event); - break; - } - - /* check last entry */ - this->list->get_last(this->list,(void **) ¤t_event); - - if (time_difference(&(event->time), &(current_event->time)) >= 0) - { - /* my event has to be fired after the last event in list */ - this->list->insert_last(this->list,event); - break; - } - - /* check first entry */ - this->list->get_first(this->list,(void **) ¤t_event); - - if (time_difference(&(event->time), &(current_event->time)) < 0) - { - /* my event has to be fired before the first event in list */ - this->list->insert_first(this->list,event); - break; - } - - iterator = this->list->create_iterator(this->list,TRUE); - iterator->iterate(iterator, (void**)¤t_event); - /* first element has not to be checked (already done) */ - while(iterator->iterate(iterator, (void**)¤t_event)) - { - if (time_difference(&(event->time), &(current_event->time)) <= 0) - { - /* my event has to be fired before the current event in list */ - iterator->insert_before(iterator,event); - break; - } - } - iterator->destroy(iterator); - break; - } - - pthread_cond_signal( &(this->condvar)); - pthread_mutex_unlock(&(this->mutex)); -} - -/** - * Implements event_queue_t.add_relative. - */ -static void add_relative(event_queue_t *this, job_t *job, u_int32_t ms) -{ - timeval_t current_time; - timeval_t time; - - time_t s = ms / 1000; - suseconds_t us = (ms - s * 1000) * 1000; - - gettimeofday(¤t_time, NULL); - - time.tv_usec = (current_time.tv_usec + us) % 1000000; - time.tv_sec = current_time.tv_sec + (current_time.tv_usec + us)/1000000 + s; - - this->add_absolute(this, job, time); -} - - -/** - * Implements event_queue_t.destroy. - */ -static void event_queue_destroy(private_event_queue_t *this) -{ - this->list->destroy_function(this->list, (void*)event_destroy); - free(this); -} - -/* - * Documented in header - */ -event_queue_t *event_queue_create() -{ - private_event_queue_t *this = malloc_thing(private_event_queue_t); - - this->public.get_count = (int (*) (event_queue_t *event_queue)) get_count; - this->public.get = (job_t *(*) (event_queue_t *event_queue)) get; - this->public.add_absolute = (void (*) (event_queue_t *event_queue, job_t *job, timeval_t time)) add_absolute; - this->public.add_relative = (void (*) (event_queue_t *event_queue, job_t *job, u_int32_t ms)) add_relative; - this->public.destroy = (void (*) (event_queue_t *event_queue)) event_queue_destroy; - - this->list = linked_list_create(); - pthread_mutex_init(&(this->mutex), NULL); - pthread_cond_init(&(this->condvar), NULL); - - return (&this->public); -} diff --git a/src/charon/processing/event_queue.h b/src/charon/processing/event_queue.h deleted file mode 100644 index c85286bf2..000000000 --- a/src/charon/processing/event_queue.h +++ /dev/null @@ -1,118 +0,0 @@ -/** - * @file event_queue.h - * - * @brief Interface of job_queue_t. - * - */ - -/* - * Copyright (C) 2005-2006 Martin Willi - * Copyright (C) 2005 Jan Hutter - * Hochschule fuer Technik Rapperswil - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY - * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * for more details. - */ - -#ifndef EVENT_QUEUE_H_ -#define EVENT_QUEUE_H_ - -typedef struct event_queue_t event_queue_t; - -#include <sys/time.h> - -#include <library.h> -#include <processing/jobs/job.h> - -/** - * @brief Event-Queue used to store timed events. - * - * Added events are sorted. The get method blocks until - * the time is elapsed to process the next event. The get - * method is called from the scheduler_t thread, which - * will add the jobs to to job_queue_t for further processing. - * - * Although the event-queue is based on a linked_list_t - * all access functions are thread-save implemented. - * - * @b Constructors: - * - event_queue_create() - * - * @ingroup processing - */ -struct event_queue_t { - - /** - * @brief Returns number of events in queue. - * - * @param event_queue calling object - * @return number of events in queue - */ - int (*get_count) (event_queue_t *event_queue); - - /** - * @brief Get the next job from the event-queue. - * - * If no event is pending, this function blocks until a job can be returned. - * - * @param event_queue calling object - * @param[out] job pointer to a job pointer where to job is returned to - * @return next job - */ - job_t *(*get) (event_queue_t *event_queue); - - /** - * @brief Adds a event to the queue, using a relative time. - * - * This function is non blocking and adds a job_t at a specific time to the list. - * The specific job object has to get destroyed by the thread which - * removes the job. - * - * @param event_queue calling object - * @param[in] job job to add to the queue (job is not copied) - * @param[in] time relative time, when the event has to get fired - */ - void (*add_relative) (event_queue_t *event_queue, job_t *job, u_int32_t ms); - - /** - * @brief Adds a event to the queue, using an absolute time. - * - * This function is non blocking and adds a job_t at a specific time to the list. - * The specific job object has to get destroyed by the thread which - * removes the job. - * - * @param event_queue calling object - * @param[in] job job to add to the queue (job is not copied) - * @param[in] time absolute time, when the event has to get fired - */ - void (*add_absolute) (event_queue_t *event_queue, job_t *job, timeval_t time); - - /** - * @brief Destroys a event_queue object. - * - * @warning The caller of this function has to make sure - * that no thread is going to add or get an event from the event_queue - * after calling this function. - * - * @param event_queue calling object - */ - void (*destroy) (event_queue_t *event_queue); -}; - -/** - * @brief Creates an empty event_queue. - * - * @returns event_queue_t object - * - * @ingroup processing - */ -event_queue_t *event_queue_create(void); - -#endif /*EVENT_QUEUE_H_*/ diff --git a/src/charon/processing/job_queue.c b/src/charon/processing/job_queue.c deleted file mode 100644 index 2310ca6ff..000000000 --- a/src/charon/processing/job_queue.c +++ /dev/null @@ -1,139 +0,0 @@ -/** - * @file job_queue.c - * - * @brief Implementation of job_queue_t - * - */ - -/* - * Copyright (C) 2005-2006 Martin Willi - * Copyright (C) 2005 Jan Hutter - * Hochschule fuer Technik Rapperswil - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY - * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * for more details. - */ - -#include <stdlib.h> -#include <pthread.h> - -#include "job_queue.h" - -#include <utils/linked_list.h> - - -typedef struct private_job_queue_t private_job_queue_t; - -/** - * @brief Private Variables and Functions of job_queue class - * - */ -struct private_job_queue_t { - - /** - * public members - */ - job_queue_t public; - - /** - * The jobs are stored in a linked list - */ - linked_list_t *list; - - /** - * access to linked_list is locked through this mutex - */ - pthread_mutex_t mutex; - - /** - * If the queue is empty a thread has to wait - * This condvar is used to wake up such a thread - */ - pthread_cond_t condvar; -}; - - -/** - * implements job_queue_t.get_count - */ -static int get_count(private_job_queue_t *this) -{ - int count; - pthread_mutex_lock(&(this->mutex)); - count = this->list->get_count(this->list); - pthread_mutex_unlock(&(this->mutex)); - return count; -} - -/** - * implements job_queue_t.get - */ -static job_t *get(private_job_queue_t *this) -{ - int oldstate; - job_t *job; - pthread_mutex_lock(&(this->mutex)); - /* go to wait while no jobs available */ - while(this->list->get_count(this->list) == 0) - { - /* add mutex unlock handler for cancellation, enable cancellation */ - pthread_cleanup_push((void(*)(void*))pthread_mutex_unlock, (void*)&(this->mutex)); - pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate); - - pthread_cond_wait( &(this->condvar), &(this->mutex)); - - /* reset cancellation, remove mutex-unlock handler (without executing) */ - pthread_setcancelstate(oldstate, NULL); - pthread_cleanup_pop(0); - } - this->list->remove_first(this->list, (void **)&job); - pthread_mutex_unlock(&(this->mutex)); - return job; -} - -/** - * implements function job_queue_t.add - */ -static void add(private_job_queue_t *this, job_t *job) -{ - pthread_mutex_lock(&(this->mutex)); - this->list->insert_last(this->list,job); - pthread_cond_signal( &(this->condvar)); - pthread_mutex_unlock(&(this->mutex)); -} - -/** - * implements job_queue_t.destroy - */ -static void job_queue_destroy (private_job_queue_t *this) -{ - this->list->destroy_offset(this->list, offsetof(job_t, destroy)); - free(this); -} - -/* - * - * Documented in header - */ -job_queue_t *job_queue_create(void) -{ - private_job_queue_t *this = malloc_thing(private_job_queue_t); - - this->public.get_count = (int(*)(job_queue_t*))get_count; - this->public.get = (job_t*(*)(job_queue_t*))get; - this->public.add = (void(*)(job_queue_t*, job_t*))add; - this->public.destroy = (void(*)(job_queue_t*))job_queue_destroy; - - this->list = linked_list_create(); - pthread_mutex_init(&(this->mutex), NULL); - pthread_cond_init(&(this->condvar), NULL); - - return (&this->public); -} diff --git a/src/charon/processing/job_queue.h b/src/charon/processing/job_queue.h deleted file mode 100644 index 9b58588ae..000000000 --- a/src/charon/processing/job_queue.h +++ /dev/null @@ -1,100 +0,0 @@ -/** - * @file job_queue.h - * - * @brief Interface of job_queue_t. - * - */ - -/* - * Copyright (C) 2005-2006 Martin Willi - * Copyright (C) 2005 Jan Hutter - * Hochschule fuer Technik Rapperswil - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY - * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * for more details. - */ - -#ifndef JOB_QUEUE_H_ -#define JOB_QUEUE_H_ - -typedef struct job_queue_t job_queue_t; - -#include <library.h> -#include <processing/jobs/job.h> - -/** - * @brief The job queue stores jobs, which will be processed by the thread_pool_t. - * - * Jobs are added from various sources, from the threads and - * from the event_queue_t. - * Although the job-queue is based on a linked_list_t - * all access functions are thread-save implemented. - * - * @b Constructors: - * - job_queue_create() - * - * @ingroup processing - */ -struct job_queue_t { - - /** - * @brief Returns number of jobs in queue. - * - * @param job_queue_t calling object - * @returns number of items in queue - */ - int (*get_count) (job_queue_t *job_queue); - - /** - * @brief Get the next job from the queue. - * - * If the queue is empty, this function blocks until a job can be returned. - * After using, the returned job has to get destroyed by the caller. - * - * @param job_queue_t calling object - * @param[out] job pointer to a job pointer where to job is returned to - * @return next job - */ - job_t *(*get) (job_queue_t *job_queue); - - /** - * @brief Adds a job to the queue. - * - * This function is non blocking and adds a job_t to the list. - * The specific job object has to get destroyed by the thread which - * removes the job. - * - * @param job_queue_t calling object - * @param job job to add to the queue (job is not copied) - */ - void (*add) (job_queue_t *job_queue, job_t *job); - - /** - * @brief Destroys a job_queue object. - * - * @warning The caller of this function has to make sure - * that no thread is going to add or get a job from the job_queue - * after calling this function. - * - * @param job_queue_t calling object - */ - void (*destroy) (job_queue_t *job_queue); -}; - -/** - * @brief Creates an empty job_queue. - * - * @return job_queue_t object - * - * @ingroup processing - */ -job_queue_t *job_queue_create(void); - -#endif /*JOB_QUEUE_H_*/ diff --git a/src/charon/processing/jobs/acquire_job.c b/src/charon/processing/jobs/acquire_job.c index b4ffb258d..48a77f558 100644 --- a/src/charon/processing/jobs/acquire_job.c +++ b/src/charon/processing/jobs/acquire_job.c @@ -43,17 +43,17 @@ struct private_acquire_job_t { }; /** - * Implementation of job_t.get_type. + * Implementation of job_t.destroy. */ -static job_type_t get_type(private_acquire_job_t *this) +static void destroy(private_acquire_job_t *this) { - return ACQUIRE; + free(this); } /** * Implementation of job_t.execute. */ -static status_t execute(private_acquire_job_t *this) +static void execute(private_acquire_job_t *this) { ike_sa_t *ike_sa; @@ -63,20 +63,14 @@ static status_t execute(private_acquire_job_t *this) { DBG2(DBG_JOB, "CHILD_SA with reqid %d not found for acquiring", this->reqid); - return DESTROY_ME; } - ike_sa->acquire(ike_sa, this->reqid); - - charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa); - return DESTROY_ME; -} - -/** - * Implementation of job_t.destroy. - */ -static void destroy(private_acquire_job_t *this) -{ - free(this); + else + { + ike_sa->acquire(ike_sa, this->reqid); + + charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa); + } + destroy(this); } /* @@ -87,12 +81,11 @@ acquire_job_t *acquire_job_create(u_int32_t reqid) private_acquire_job_t *this = malloc_thing(private_acquire_job_t); /* interface functions */ - this->public.job_interface.get_type = (job_type_t (*) (job_t *)) get_type; - this->public.job_interface.execute = (status_t (*) (job_t *)) execute; + this->public.job_interface.execute = (void (*) (job_t *)) execute; this->public.job_interface.destroy = (void (*)(job_t*)) destroy; /* private variables */ this->reqid = reqid; - return &(this->public); + return &this->public; } diff --git a/src/charon/processing/jobs/callback_job.c b/src/charon/processing/jobs/callback_job.c new file mode 100644 index 000000000..86aa93c6c --- /dev/null +++ b/src/charon/processing/jobs/callback_job.c @@ -0,0 +1,213 @@ +/** + * @file callback_job.c + * + * @brief Implementation of callback_job_t. + * + */ + +/* + * Copyright (C) 2007 Martin Willi + * Hochschule fuer Technik Rapperswil + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +#include "callback_job.h" + +#include <daemon.h> + +typedef struct private_callback_job_t private_callback_job_t; + +/** + * Private data of an callback_job_t Object. + */ +struct private_callback_job_t { + /** + * Public callback_job_t interface. + */ + callback_job_t public; + + /** + * Callback to call on execution + */ + callback_job_cb_t callback; + + /** + * parameter to supply to callback + */ + void *data; + + /** + * cleanup function for data + */ + callback_job_cleanup_t cleanup; + + /** + * thread ID of the job, if running + */ + pthread_t thread; + + /** + * mutex to synchronize thread startup/cancellation + */ + pthread_mutex_t mutex; + + /** + * condvar to synchronize thread startup/cancellation + */ + pthread_cond_t condvar; + + /** + * list of asociated child jobs + */ + linked_list_t *children; + + /** + * parent of this job, or NULL + */ + private_callback_job_t *parent; +}; + +/** + * Implements job_t.destroy. + */ +static void destroy(private_callback_job_t *this) +{ + if (this->cleanup) + { + this->cleanup(this->data); + } + this->children->destroy(this->children); + free(this); +} + +/** + * unregister a child from its parent, if any. + */ +static void unregister(private_callback_job_t *this) +{ + if (this->parent) + { + iterator_t *iterator; + private_callback_job_t *child; + + pthread_mutex_lock(&this->parent->mutex); + iterator = this->parent->children->create_iterator(this->parent->children, TRUE); + while (iterator->iterate(iterator, (void**)&child)) + { + if (child == this) + { + iterator->remove(iterator); + break; + } + } + iterator->destroy(iterator); + pthread_mutex_unlock(&this->parent->mutex); + } +} + +/** + * Implementation of callback_job_t.cancel. + */ +static void cancel(private_callback_job_t *this) +{ + pthread_t thread; + + /* wait until thread has started */ + pthread_mutex_lock(&this->mutex); + while (this->thread == 0) + { + pthread_cond_wait(&this->condvar, &this->mutex); + } + thread = this->thread; + + /* terminate its children */ + this->children->invoke(this->children, offsetof(callback_job_t, cancel)); + pthread_mutex_unlock(&this->mutex); + + /* terminate thread */ + pthread_cancel(thread); + pthread_join(thread, NULL); +} + +/** + * Implementation of job_t.execute. + */ +static void execute(private_callback_job_t *this) +{ + bool cleanup = FALSE; + + pthread_mutex_lock(&this->mutex); + this->thread = pthread_self(); + pthread_cond_signal(&this->condvar); + pthread_mutex_unlock(&this->mutex); + + pthread_cleanup_push((void*)destroy, this); + while (TRUE) + { + switch (this->callback(this->data)) + { + case JOB_REQUEUE_DIRECT: + continue; + case JOB_REQUEUE_FAIR: + { + charon->processor->queue_job(charon->processor, + &this->public.job_interface); + break; + } + case JOB_REQUEUE_NONE: + default: + { + cleanup = TRUE; + break; + } + } + break; + } + unregister(this); + pthread_cleanup_pop(cleanup); +} + +/* + * Described in header. + */ +callback_job_t *callback_job_create(callback_job_cb_t cb, void *data, + callback_job_cleanup_t cleanup, + callback_job_t *parent) +{ + private_callback_job_t *this = malloc_thing(private_callback_job_t); + + /* interface functions */ + this->public.job_interface.execute = (void (*) (job_t *)) execute; + this->public.job_interface.destroy = (void (*) (job_t *)) destroy; + this->public.cancel = (void(*)(callback_job_t*))cancel; + + /* private variables */ + pthread_mutex_init(&this->mutex, NULL); + pthread_cond_init(&this->condvar, NULL); + this->callback = cb; + this->data = data; + this->cleanup = cleanup; + this->thread = 0; + this->children = linked_list_create(); + this->parent = (private_callback_job_t*)parent; + + /* register us at parent */ + if (parent) + { + pthread_mutex_lock(&this->parent->mutex); + this->parent->children->insert_last(this->parent->children, this); + pthread_mutex_unlock(&this->parent->mutex); + } + + return &this->public; +} + diff --git a/src/charon/processing/jobs/callback_job.h b/src/charon/processing/jobs/callback_job.h new file mode 100644 index 000000000..5450cb61b --- /dev/null +++ b/src/charon/processing/jobs/callback_job.h @@ -0,0 +1,126 @@ +/** + * @file callback_job.h + * + * @brief Interface of callback_job_t. + * + */ + +/* + * Copyright (C) 2007 Martin Willi + * Hochschule fuer Technik Rapperswil + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +#ifndef CALLBACK_JOB_H_ +#define CALLBACK_JOB_H_ + +typedef struct callback_job_t callback_job_t; + +#include <library.h> +#include <processing/jobs/job.h> + + +typedef enum job_requeue_t job_requeue_t; + +/** + * @brief Job requeueing policy + */ +enum job_requeue_t { + + /** + * Do not requeue job, destroy it + */ + JOB_REQUEUE_NONE, + + /** + * Reque the job farly, meaning it has to queue as any other job + */ + JOB_REQUEUE_FAIR, + + /** + * Reexecute the job directly, without the need of requeing it + */ + JOB_REQUEUE_DIRECT, +}; + +/** + * @brief The callback function to use for the callback job. + * + * This is the function to use as callback for a callback job. It receives + * a parameter supplied to the callback jobs constructor. + * + * @param data param supplied to job + * @return requeing policy how to requeue the job + */ +typedef job_requeue_t (*callback_job_cb_t)(void *data); + +/** + * @brief Cleanup function to use for data cleanup. + * + * The callback has an optional user argument which receives data. However, + * this data may be cleaned up if it is allocated. This is the function + * to supply to the constructor. + * + * @param data param supplied to job + * @return requeing policy how to requeue the job + */ +typedef void (*callback_job_cleanup_t)(void *data); + +/** + * @brief Class representing an callback Job. + * + * This is a special job which allows a simple callback function to + * be executed by a thread of the thread pool. This allows simple execution + * of asynchronous methods, without to manage threads. + * + * @b Constructors: + * - callback_job_create() + * + * @ingroup jobs + */ +struct callback_job_t { + /** + * The job_t interface. + */ + job_t job_interface; + + /** + * @brief Cancel the jobs thread and wait for its termination. + * + * @param this calling object + */ + void (*cancel)(callback_job_t *this); +}; + +/** + * @brief Creates a callback job. + * + * The cleanup function is called when the job gets destroyed to destroy + * the associated data. + * If parent is not NULL, the specified job gets an association. Whenever + * the parent gets cancelled (or runs out), all of its children are cancelled, + * too. + * + * @param cb callback to call from the processor + * @param data user data to supply to callback + * @param cleanup destructor for data on destruction, or NULL + * @param parent parent of this job + * @return callback_job_t object + * + * @ingroup jobs + */ +callback_job_t *callback_job_create(callback_job_cb_t cb, void *data, + callback_job_cleanup_t cleanup, + callback_job_t *parent); + +#endif /* CALLBACK_JOB_H_ */ + diff --git a/src/charon/processing/jobs/delete_child_sa_job.c b/src/charon/processing/jobs/delete_child_sa_job.c index f694696b0..23f330293 100644 --- a/src/charon/processing/jobs/delete_child_sa_job.c +++ b/src/charon/processing/jobs/delete_child_sa_job.c @@ -54,17 +54,17 @@ struct private_delete_child_sa_job_t { }; /** - * Implementation of job_t.get_type. + * Implementation of job_t.destroy. */ -static job_type_t get_type(private_delete_child_sa_job_t *this) +static void destroy(private_delete_child_sa_job_t *this) { - return DELETE_CHILD_SA; + free(this); } /** * Implementation of job_t.execute. */ -static status_t execute(private_delete_child_sa_job_t *this) +static void execute(private_delete_child_sa_job_t *this) { ike_sa_t *ike_sa; @@ -74,20 +74,14 @@ static status_t execute(private_delete_child_sa_job_t *this) { DBG1(DBG_JOB, "CHILD_SA with reqid %d not found for delete", this->reqid); - return DESTROY_ME; } - ike_sa->delete_child_sa(ike_sa, this->protocol, this->spi); - - charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa); - return DESTROY_ME; -} - -/** - * Implementation of job_t.destroy. - */ -static void destroy(private_delete_child_sa_job_t *this) -{ - free(this); + else + { + ike_sa->delete_child_sa(ike_sa, this->protocol, this->spi); + + charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa); + } + destroy(this); } /* @@ -100,8 +94,7 @@ delete_child_sa_job_t *delete_child_sa_job_create(u_int32_t reqid, private_delete_child_sa_job_t *this = malloc_thing(private_delete_child_sa_job_t); /* interface functions */ - this->public.job_interface.get_type = (job_type_t (*) (job_t *)) get_type; - this->public.job_interface.execute = (status_t (*) (job_t *)) execute; + this->public.job_interface.execute = (void (*) (job_t *)) execute; this->public.job_interface.destroy = (void (*)(job_t*)) destroy; /* private variables */ @@ -109,5 +102,6 @@ delete_child_sa_job_t *delete_child_sa_job_create(u_int32_t reqid, this->protocol = protocol; this->spi = spi; - return &(this->public); + return &this->public; } + diff --git a/src/charon/processing/jobs/delete_ike_sa_job.c b/src/charon/processing/jobs/delete_ike_sa_job.c index 706155aa6..8d8c0cf36 100644 --- a/src/charon/processing/jobs/delete_ike_sa_job.c +++ b/src/charon/processing/jobs/delete_ike_sa_job.c @@ -47,18 +47,20 @@ struct private_delete_ike_sa_job_t { bool delete_if_established; }; + /** - * Implements job_t.get_type. + * Implements job_t.destroy. */ -static job_type_t get_type(private_delete_ike_sa_job_t *this) +static void destroy(private_delete_ike_sa_job_t *this) { - return DELETE_IKE_SA; + this->ike_sa_id->destroy(this->ike_sa_id); + free(this); } /** * Implementation of job_t.execute. */ -static status_t execute(private_delete_ike_sa_job_t *this) +static void execute(private_delete_ike_sa_job_t *this) { ike_sa_t *ike_sa; @@ -93,16 +95,7 @@ static status_t execute(private_delete_ike_sa_job_t *this) } } } - return DESTROY_ME; -} - -/** - * Implements job_t.destroy. - */ -static void destroy(private_delete_ike_sa_job_t *this) -{ - this->ike_sa_id->destroy(this->ike_sa_id); - free(this); + destroy(this); } /* @@ -114,8 +107,7 @@ delete_ike_sa_job_t *delete_ike_sa_job_create(ike_sa_id_t *ike_sa_id, private_delete_ike_sa_job_t *this = malloc_thing(private_delete_ike_sa_job_t); /* interface functions */ - this->public.job_interface.get_type = (job_type_t (*) (job_t *)) get_type; - this->public.job_interface.execute = (status_t (*) (job_t *)) execute; + this->public.job_interface.execute = (void (*) (job_t *)) execute; this->public.job_interface.destroy = (void (*)(job_t *)) destroy;; /* private variables */ diff --git a/src/charon/processing/jobs/job.c b/src/charon/processing/jobs/job.c deleted file mode 100644 index d32d1bc61..000000000 --- a/src/charon/processing/jobs/job.c +++ /dev/null @@ -1,39 +0,0 @@ -/** - * @file job.c - * - * @brief Interface additions to job_t. - * - */ - -/* - * Copyright (C) 2005-2006 Martin Willi - * Copyright (C) 2005 Jan Hutter - * Hochschule fuer Technik Rapperswil - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY - * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * for more details. - */ - - -#include "job.h" - -ENUM(job_type_names, PROCESS_MESSAGE, SEND_DPD, - "PROCESS_MESSAGE", - "RETRANSMIT", - "INITIATE", - "ROUTE", - "ACQUIRE", - "DELETE_IKE_SA", - "DELETE_CHILD_SA", - "REKEY_CHILD_SA", - "REKEY_IKE_SA", - "SEND_KEEPALIVE", - "SEND_DPD", -); diff --git a/src/charon/processing/jobs/job.h b/src/charon/processing/jobs/job.h index 28632672d..1826c53b4 100644 --- a/src/charon/processing/jobs/job.h +++ b/src/charon/processing/jobs/job.h @@ -24,108 +24,14 @@ #ifndef JOB_H_ #define JOB_H_ -typedef enum job_type_t job_type_t; typedef struct job_t job_t; #include <library.h> -/** - * @brief Definition of the various job types. - * - * @ingroup jobs - */ -enum job_type_t { - /** - * Process an incoming IKEv2-Message. - * - * Job is implemented in class process_message_job_t - */ - PROCESS_MESSAGE, - - /** - * Retransmit an IKEv2-Message. - * - * Job is implemented in class retransmit_job_t - */ - RETRANSMIT, - - /** - * Set up a CHILD_SA, optional with an IKE_SA. - * - * Job is implemented in class initiate_job_t - */ - INITIATE, - - /** - * Install SPD entries. - * - * Job is implemented in class route_job_t - */ - ROUTE, - - /** - * React on a acquire message from the kernel (e.g. setup CHILD_SA) - * - * Job is implemented in class acquire_job_t - */ - ACQUIRE, - - /** - * Delete an IKE_SA. - * - * Job is implemented in class delete_ike_sa_job_t - */ - DELETE_IKE_SA, - - /** - * Delete a CHILD_SA. - * - * Job is implemented in class delete_child_sa_job_t - */ - DELETE_CHILD_SA, - - /** - * Rekey a CHILD_SA. - * - * Job is implemented in class rekey_child_sa_job_t - */ - REKEY_CHILD_SA, - - /** - * Rekey an IKE_SA. - * - * Job is implemented in class rekey_ike_sa_job_t - */ - REKEY_IKE_SA, - - /** - * Send a keepalive packet. - * - * Job is implemented in class type send_keepalive_job_t - */ - SEND_KEEPALIVE, - - /** - * Send a DPD packet. - * - * Job is implemented in class type send_dpd_job_t - */ - SEND_DPD -}; - -/** - * enum name for job_type_t - * - * @ingroup jobs - */ -extern enum_name_t *job_type_names; - /** * @brief Job-Interface as it is stored in the job queue. * - * A job consists of a job-type and one or more assigned values. - * * @b Constructors: * - None, use specific implementation of the interface. * @@ -134,32 +40,26 @@ extern enum_name_t *job_type_names; struct job_t { /** - * @brief get type of job. - * - * @param this calling object - * @return type of this job - */ - job_type_t (*get_type) (job_t *this); - - /** * @brief Execute a job. * - * Call the internall job routine to process the - * job. If this method returns DESTROY_ME, the job - * must be destroyed by the caller. + * The processing facility executes a job using this method. Jobs are + * one-shot, they destroy themself after execution, so don't use a job + * once it has been executed. * * @param this calling object - * @return status of job execution */ - status_t (*execute) (job_t *this); + void (*execute) (job_t *this); /** - * @brief Destroys a job_t object + * @brief Destroy a job. + * + * Is only called whenever a job was not executed (e.g. due daemon shutdown). + * After execution, jobs destroy themself. * * @param job_t calling object */ void (*destroy) (job_t *job); }; - #endif /* JOB_H_ */ + diff --git a/src/charon/processing/jobs/process_message_job.c b/src/charon/processing/jobs/process_message_job.c index ee7484bbd..6a0921248 100644 --- a/src/charon/processing/jobs/process_message_job.c +++ b/src/charon/processing/jobs/process_message_job.c @@ -44,17 +44,18 @@ struct private_process_message_job_t { }; /** - * Implements job_t.get_type. + * Implements job_t.destroy. */ -static job_type_t get_type(private_process_message_job_t *this) +static void destroy(private_process_message_job_t *this) { - return PROCESS_MESSAGE; + this->message->destroy(this->message); + free(this); } /** * Implementation of job_t.execute. */ -static status_t execute(private_process_message_job_t *this) +static void execute(private_process_message_job_t *this) { ike_sa_t *ike_sa; @@ -75,16 +76,7 @@ static status_t execute(private_process_message_job_t *this) charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa); } } - return DESTROY_ME; -} - -/** - * Implements job_t.destroy. - */ -static void destroy(private_process_message_job_t *this) -{ - this->message->destroy(this->message); - free(this); + destroy(this); } /* @@ -95,8 +87,7 @@ process_message_job_t *process_message_job_create(message_t *message) private_process_message_job_t *this = malloc_thing(private_process_message_job_t); /* interface functions */ - this->public.job_interface.get_type = (job_type_t (*) (job_t *)) get_type; - this->public.job_interface.execute = (status_t (*) (job_t *)) execute; + this->public.job_interface.execute = (void (*) (job_t *)) execute; this->public.job_interface.destroy = (void(*)(job_t*))destroy; /* private variables */ diff --git a/src/charon/processing/jobs/rekey_child_sa_job.c b/src/charon/processing/jobs/rekey_child_sa_job.c index 3422b614d..f754e5a1f 100644 --- a/src/charon/processing/jobs/rekey_child_sa_job.c +++ b/src/charon/processing/jobs/rekey_child_sa_job.c @@ -53,17 +53,17 @@ struct private_rekey_child_sa_job_t { }; /** - * Implementation of job_t.get_type. + * Implementation of job_t.destroy. */ -static job_type_t get_type(private_rekey_child_sa_job_t *this) +static void destroy(private_rekey_child_sa_job_t *this) { - return REKEY_CHILD_SA; + free(this); } /** * Implementation of job_t.execute. */ -static status_t execute(private_rekey_child_sa_job_t *this) +static void execute(private_rekey_child_sa_job_t *this) { ike_sa_t *ike_sa; @@ -73,20 +73,13 @@ static status_t execute(private_rekey_child_sa_job_t *this) { DBG2(DBG_JOB, "CHILD_SA with reqid %d not found for rekeying", this->reqid); - return DESTROY_ME; } - ike_sa->rekey_child_sa(ike_sa, this->protocol, this->spi); - - charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa); - return DESTROY_ME; -} - -/** - * Implementation of job_t.destroy. - */ -static void destroy(private_rekey_child_sa_job_t *this) -{ - free(this); + else + { + ike_sa->rekey_child_sa(ike_sa, this->protocol, this->spi); + charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa); + } + destroy(this); } /* @@ -99,8 +92,7 @@ rekey_child_sa_job_t *rekey_child_sa_job_create(u_int32_t reqid, private_rekey_child_sa_job_t *this = malloc_thing(private_rekey_child_sa_job_t); /* interface functions */ - this->public.job_interface.get_type = (job_type_t (*) (job_t *)) get_type; - this->public.job_interface.execute = (status_t (*) (job_t *)) execute; + this->public.job_interface.execute = (void (*) (job_t *)) execute; this->public.job_interface.destroy = (void (*)(job_t*)) destroy; /* private variables */ @@ -108,5 +100,5 @@ rekey_child_sa_job_t *rekey_child_sa_job_create(u_int32_t reqid, this->protocol = protocol; this->spi = spi; - return &(this->public); + return &this->public; } diff --git a/src/charon/processing/jobs/rekey_ike_sa_job.c b/src/charon/processing/jobs/rekey_ike_sa_job.c index f6c058634..25785221d 100644 --- a/src/charon/processing/jobs/rekey_ike_sa_job.c +++ b/src/charon/processing/jobs/rekey_ike_sa_job.c @@ -48,17 +48,18 @@ struct private_rekey_ike_sa_job_t { }; /** - * Implementation of job_t.get_type. + * Implementation of job_t.destroy. */ -static job_type_t get_type(private_rekey_ike_sa_job_t *this) +static void destroy(private_rekey_ike_sa_job_t *this) { - return REKEY_IKE_SA; + this->ike_sa_id->destroy(this->ike_sa_id); + free(this); } /** * Implementation of job_t.execute. */ -static status_t execute(private_rekey_ike_sa_job_t *this) +static void execute(private_rekey_ike_sa_job_t *this) { ike_sa_t *ike_sa; status_t status = SUCCESS; @@ -68,36 +69,28 @@ static status_t execute(private_rekey_ike_sa_job_t *this) if (ike_sa == NULL) { DBG2(DBG_JOB, "IKE_SA to rekey not found"); - return DESTROY_ME; - } - - if (this->reauth) - { - ike_sa->reestablish(ike_sa); } else { - status = ike_sa->rekey(ike_sa); - } - - if (status == DESTROY_ME) - { - charon->ike_sa_manager->checkin_and_destroy(charon->ike_sa_manager, ike_sa); - } - else - { - charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa); + if (this->reauth) + { + ike_sa->reestablish(ike_sa); + } + else + { + status = ike_sa->rekey(ike_sa); + } + + if (status == DESTROY_ME) + { + charon->ike_sa_manager->checkin_and_destroy(charon->ike_sa_manager, ike_sa); + } + else + { + charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa); + } } - return DESTROY_ME; -} - -/** - * Implementation of job_t.destroy. - */ -static void destroy(private_rekey_ike_sa_job_t *this) -{ - this->ike_sa_id->destroy(this->ike_sa_id); - free(this); + destroy(this); } /* @@ -108,8 +101,7 @@ rekey_ike_sa_job_t *rekey_ike_sa_job_create(ike_sa_id_t *ike_sa_id, bool reauth) private_rekey_ike_sa_job_t *this = malloc_thing(private_rekey_ike_sa_job_t); /* interface functions */ - this->public.job_interface.get_type = (job_type_t (*) (job_t *)) get_type; - this->public.job_interface.execute = (status_t (*) (job_t *)) execute; + this->public.job_interface.execute = (void (*) (job_t *)) execute; this->public.job_interface.destroy = (void (*)(job_t*)) destroy; /* private variables */ diff --git a/src/charon/processing/jobs/retransmit_job.c b/src/charon/processing/jobs/retransmit_job.c index 5bfa20dfd..8c15aa651 100644 --- a/src/charon/processing/jobs/retransmit_job.c +++ b/src/charon/processing/jobs/retransmit_job.c @@ -48,17 +48,18 @@ struct private_retransmit_job_t { }; /** - * Implements job_t.get_type. + * Implements job_t.destroy. */ -static job_type_t get_type(private_retransmit_job_t *this) +static void destroy(private_retransmit_job_t *this) { - return RETRANSMIT; + this->ike_sa_id->destroy(this->ike_sa_id); + free(this); } /** * Implementation of job_t.execute. */ -static status_t execute(private_retransmit_job_t *this) +static void execute(private_retransmit_job_t *this) { ike_sa_t *ike_sa; @@ -77,16 +78,7 @@ static status_t execute(private_retransmit_job_t *this) charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa); } } - return DESTROY_ME; -} - -/** - * Implements job_t.destroy. - */ -static void destroy(private_retransmit_job_t *this) -{ - this->ike_sa_id->destroy(this->ike_sa_id); - free(this); + destroy(this); } /* @@ -97,8 +89,7 @@ retransmit_job_t *retransmit_job_create(u_int32_t message_id,ike_sa_id_t *ike_sa private_retransmit_job_t *this = malloc_thing(private_retransmit_job_t); /* interface functions */ - this->public.job_interface.get_type = (job_type_t (*) (job_t *)) get_type; - this->public.job_interface.execute = (status_t (*) (job_t *)) execute; + this->public.job_interface.execute = (void (*) (job_t *)) execute; this->public.job_interface.destroy = (void (*) (job_t *)) destroy; /* private variables */ diff --git a/src/charon/processing/jobs/send_dpd_job.c b/src/charon/processing/jobs/send_dpd_job.c index 7294d78d5..cb259d9c6 100644 --- a/src/charon/processing/jobs/send_dpd_job.c +++ b/src/charon/processing/jobs/send_dpd_job.c @@ -47,45 +47,35 @@ struct private_send_dpd_job_t { }; /** - * Implements send_dpd_job_t.get_type. + * Implements job_t.destroy. */ -static job_type_t get_type(private_send_dpd_job_t *this) +static void destroy(private_send_dpd_job_t *this) { - return SEND_DPD; + this->ike_sa_id->destroy(this->ike_sa_id); + free(this); } /** * Implementation of job_t.execute. */ -static status_t execute(private_send_dpd_job_t *this) +static void execute(private_send_dpd_job_t *this) { ike_sa_t *ike_sa; ike_sa = charon->ike_sa_manager->checkout(charon->ike_sa_manager, this->ike_sa_id); - if (ike_sa == NULL) - { - return DESTROY_ME; - } - - if (ike_sa->send_dpd(ike_sa) == DESTROY_ME) - { - charon->ike_sa_manager->checkin_and_destroy(charon->ike_sa_manager, ike_sa); - } - else + if (ike_sa) { - charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa); + if (ike_sa->send_dpd(ike_sa) == DESTROY_ME) + { + charon->ike_sa_manager->checkin_and_destroy(charon->ike_sa_manager, ike_sa); + } + else + { + charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa); + } } - return DESTROY_ME; -} - -/** - * Implements job_t.destroy. - */ -static void destroy(private_send_dpd_job_t *this) -{ - this->ike_sa_id->destroy(this->ike_sa_id); - free(this); + destroy(this); } /* @@ -96,9 +86,8 @@ send_dpd_job_t *send_dpd_job_create(ike_sa_id_t *ike_sa_id) private_send_dpd_job_t *this = malloc_thing(private_send_dpd_job_t); /* interface functions */ - this->public.job_interface.get_type = (job_type_t (*) (job_t *)) get_type; this->public.job_interface.destroy = (void (*) (job_t *)) destroy; - this->public.job_interface.execute = (status_t (*) (job_t *)) execute; + this->public.job_interface.execute = (void (*) (job_t *)) execute; /* public functions */ this->public.destroy = (void (*)(send_dpd_job_t *)) destroy; @@ -106,5 +95,5 @@ send_dpd_job_t *send_dpd_job_create(ike_sa_id_t *ike_sa_id) /* private variables */ this->ike_sa_id = ike_sa_id->clone(ike_sa_id); - return &(this->public); + return &this->public; } diff --git a/src/charon/processing/jobs/send_keepalive_job.c b/src/charon/processing/jobs/send_keepalive_job.c index 1c1cb288e..6d529e1b3 100644 --- a/src/charon/processing/jobs/send_keepalive_job.c +++ b/src/charon/processing/jobs/send_keepalive_job.c @@ -47,38 +47,29 @@ struct private_send_keepalive_job_t { }; /** - * Implements send_keepalive_job_t.get_type. + * Implements job_t.destroy. */ -static job_type_t get_type(private_send_keepalive_job_t *this) +static void destroy(private_send_keepalive_job_t *this) { - return SEND_KEEPALIVE; + this->ike_sa_id->destroy(this->ike_sa_id); + free(this); } /** * Implementation of job_t.execute. */ -static status_t execute(private_send_keepalive_job_t *this) +static void execute(private_send_keepalive_job_t *this) { ike_sa_t *ike_sa; ike_sa = charon->ike_sa_manager->checkout(charon->ike_sa_manager, this->ike_sa_id); - if (ike_sa == NULL) + if (ike_sa) { - return DESTROY_ME; + ike_sa->send_keepalive(ike_sa); + charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa); } - ike_sa->send_keepalive(ike_sa); - charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa); - return DESTROY_ME; -} - -/** - * Implements job_t.destroy. - */ -static void destroy(private_send_keepalive_job_t *this) -{ - this->ike_sa_id->destroy(this->ike_sa_id); - free(this); + destroy(this); } /* @@ -89,9 +80,8 @@ send_keepalive_job_t *send_keepalive_job_create(ike_sa_id_t *ike_sa_id) private_send_keepalive_job_t *this = malloc_thing(private_send_keepalive_job_t); /* interface functions */ - this->public.job_interface.get_type = (job_type_t (*) (job_t *)) get_type; this->public.job_interface.destroy = (void (*) (job_t *)) destroy; - this->public.job_interface.execute = (status_t (*) (job_t *)) execute; + this->public.job_interface.execute = (void (*) (job_t *)) execute; /* public functions */ this->public.destroy = (void (*)(send_keepalive_job_t *)) destroy; @@ -99,5 +89,5 @@ send_keepalive_job_t *send_keepalive_job_create(ike_sa_id_t *ike_sa_id) /* private variables */ this->ike_sa_id = ike_sa_id->clone(ike_sa_id); - return &(this->public); + return &this->public; } diff --git a/src/charon/processing/processor.c b/src/charon/processing/processor.c new file mode 100644 index 000000000..b3815eeb1 --- /dev/null +++ b/src/charon/processing/processor.c @@ -0,0 +1,233 @@ +/** + * @file processor.c + * + * @brief Implementation of processor_t. + * + */ + +/* + * Copyright (C) 2005-2007 Martin Willi + * Copyright (C) 2005 Jan Hutter + * Hochschule fuer Technik Rapperswil + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +#include <stdlib.h> +#include <pthread.h> +#include <string.h> +#include <errno.h> + +#include "processor.h" + +#include <daemon.h> +#include <utils/linked_list.h> + + +typedef struct private_processor_t private_processor_t; + +/** + * @brief Private data of processor_t class. + */ +struct private_processor_t { + /** + * Public processor_t interface. + */ + processor_t public; + + /** + * Number of running threads + */ + u_int total_threads; + + /** + * Desired number of threads + */ + u_int desired_threads; + + /** + * Number of threads waiting for work + */ + u_int idle_threads; + + /** + * The jobs are stored in a linked list + */ + linked_list_t *list; + + /** + * access to linked_list is locked through this mutex + */ + pthread_mutex_t mutex; + + /** + * Condvar to wait for new jobs + */ + pthread_cond_t condvar; +}; + +static void process_jobs(private_processor_t *this); + +/** + * restart a terminated thread + */ +static void restart(private_processor_t *this) +{ + pthread_t thread; + + if (pthread_create(&thread, NULL, (void*)process_jobs, this) != 0) + { + this->total_threads--; + } +} + +/** + * Process queued jobs, called by the worker threads + */ +static void process_jobs(private_processor_t *this) +{ + int oldstate; + + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate); + + DBG2(DBG_JOB, "started worker thread, thread_ID: %06u", (int)pthread_self()); + + pthread_mutex_lock(&this->mutex); + while (this->desired_threads >= this->total_threads) + { + job_t *job; + + if (this->list->get_count(this->list) == 0) + { + this->idle_threads++; + pthread_cond_wait(&this->condvar, &this->mutex); + this->idle_threads--; + continue; + } + this->list->remove_first(this->list, (void**)&job); + pthread_mutex_unlock(&this->mutex); + /* terminated threads are restarted, so we have a constant pool */ + pthread_cleanup_push((void*)restart, this); + job->execute(job); + pthread_cleanup_pop(0); + pthread_mutex_lock(&this->mutex); + } + this->total_threads--; + pthread_cond_broadcast(&this->condvar); + pthread_mutex_unlock(&this->mutex); +} + +/** + * Implementation of processor_t.get_total_threads. + */ +static u_int get_total_threads(private_processor_t *this) +{ + return this->total_threads; +} + +/** + * Implementation of processor_t.get_idle_threads. + */ +static u_int get_idle_threads(private_processor_t *this) +{ + return this->idle_threads; +} + +/** + * implements processor_t.get_job_load + */ +static u_int get_job_load(private_processor_t *this) +{ + u_int load; + pthread_mutex_lock(&this->mutex); + load = this->list->get_count(this->list); + pthread_mutex_unlock(&this->mutex); + return load; +} + +/** + * implements function processor_t.queue_job + */ +static void queue_job(private_processor_t *this, job_t *job) +{ + pthread_mutex_lock(&this->mutex); + this->list->insert_last(this->list, job); + pthread_mutex_unlock(&this->mutex); + pthread_cond_signal(&this->condvar); +} + +/** + * Implementation of processor_t.set_threads. + */ +static void set_threads(private_processor_t *this, u_int count) +{ + pthread_mutex_lock(&this->mutex); + if (count > this->total_threads) + { /* increase thread count */ + int i; + pthread_t current; + + this->desired_threads = count; + DBG1(DBG_JOB, "spawning %d worker threads", count - this->total_threads); + for (i = this->total_threads; i < count; i++) + { + if (pthread_create(¤t, NULL, (void*)process_jobs, this) == 0) + { + this->total_threads++; + } + } + } + else if (count < this->total_threads) + { /* decrease thread count */ + this->desired_threads = count; + } + pthread_mutex_unlock(&this->mutex); +} + +/** + * Implementation of processor_t.destroy. + */ +static void destroy(private_processor_t *this) +{ + set_threads(this, 0); + while (this->total_threads > 0) + { + pthread_cond_broadcast(&this->condvar); + pthread_cond_wait(&this->condvar, &this->mutex); + } + this->list->destroy_offset(this->list, offsetof(job_t, destroy)); + free(this); +} + +/* + * Described in header. + */ +processor_t *processor_create(size_t pool_size) +{ + private_processor_t *this = malloc_thing(private_processor_t); + + this->public.get_total_threads = (u_int(*)(processor_t*))get_total_threads; + this->public.get_idle_threads = (u_int(*)(processor_t*))get_idle_threads; + this->public.get_job_load = (u_int(*)(processor_t*))get_job_load; + this->public.queue_job = (void(*)(processor_t*, job_t*))queue_job; + this->public.set_threads = (void(*)(processor_t*, u_int))set_threads; + this->public.destroy = (void(*)(processor_t*))destroy; + + this->list = linked_list_create(); + pthread_mutex_init(&this->mutex, NULL); + pthread_cond_init(&this->condvar, NULL); + this->total_threads = 0; + this->desired_threads = 0; + this->idle_threads = 0; + + return &this->public; +} + diff --git a/src/charon/processing/processor.h b/src/charon/processing/processor.h new file mode 100644 index 000000000..6763e27d0 --- /dev/null +++ b/src/charon/processing/processor.h @@ -0,0 +1,109 @@ +/** + * @file processor.h + * + * @brief Interface of processor_t. + * + */ + +/* + * Copyright (C) 2005-2007 Martin Willi + * Copyright (C) 2005 Jan Hutter + * Hochschule fuer Technik Rapperswil + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +#ifndef PROCESSOR_H_ +#define PROCESSOR_H_ + +typedef struct processor_t processor_t; + +#include <stdlib.h> + +#include <library.h> +#include <processing/jobs/job.h> + +/** + * @brief The processor uses threads to process queued jobs. + * + * @b Constructors: + * - processor_create() + * + * @ingroup processing + */ +struct processor_t { + + /** + * @brief Get the total number of threads used by the processor. + * + * @param this calling object + * @return size of thread pool + */ + u_int (*get_total_threads) (processor_t *this); + + /** + * @brief Get the number of threads currently waiting. + * + * @param this calling object + * @return number of idle threads + */ + u_int (*get_idle_threads) (processor_t *this); + + /** + * @brief Get the number of queued jobs. + * + * @param this calling object + * @returns number of items in queue + */ + u_int (*get_job_load) (processor_t *this); + + /** + * @brief Adds a job to the queue. + * + * This function is non blocking and adds a job_t to the queue. + * + * @param this calling object + * @param job job to add to the queue + */ + void (*queue_job) (processor_t *this, job_t *job); + + /** + * @brief Set the number of threads to use in the processor. + * + * If the number of threads is smaller than number of currently running + * threads, thread count is decreased. Use 0 to disable the processor. + * This call blocks if it decreases thread count until threads have + * terminated, so make sure there are not too many blocking jobs. + * + * @param this calling object + * @param count number of threads to allocate + */ + void (*set_threads)(processor_t *this, u_int count); + + /** + * @brief Destroy a processor object. + * + * @param processor calling object + */ + void (*destroy) (processor_t *processor); +}; + +/** + * @brief Create the thread pool without any threads. + * + * @return processor_t object + * + * @ingroup processing + */ +processor_t *processor_create(); + +#endif /*PROCESSOR_H_*/ + diff --git a/src/charon/processing/scheduler.c b/src/charon/processing/scheduler.c index 7249e43e6..2706585b0 100644 --- a/src/charon/processing/scheduler.c +++ b/src/charon/processing/scheduler.c @@ -23,12 +23,39 @@ #include <stdlib.h> #include <pthread.h> +#include <sys/time.h> #include "scheduler.h" #include <daemon.h> -#include <processing/job_queue.h> +#include <processing/processor.h> +#include <processing/jobs/callback_job.h> +typedef struct event_t event_t; + +/** + * Event containing a job and a schedule time + */ +struct event_t { + /** + * Time to fire the event. + */ + timeval_t time; + + /** + * Every event has its assigned job. + */ + job_t *job; +}; + +/** + * destroy an event and its job + */ +static void event_destroy(event_t *event) +{ + event->job->destroy(event->job); + free(event); +} typedef struct private_scheduler_t private_scheduler_t; @@ -42,36 +69,164 @@ struct private_scheduler_t { scheduler_t public; /** - * Assigned thread. + * Job wich schedules */ - pthread_t assigned_thread; + callback_job_t *job; + + /** + * The jobs are scheduled in a list. + */ + linked_list_t *list; + + /** + * Exclusive access to list + */ + pthread_mutex_t mutex; + + /** + * Condvar to wait for next job. + */ + pthread_cond_t condvar; }; /** - * Implementation of private_scheduler_t.get_events. + * Returns the difference of two timeval structs in milliseconds + */ +static long time_difference(timeval_t *end, timeval_t *start) +{ + time_t s; + suseconds_t us; + + s = end->tv_sec - start->tv_sec; + us = end->tv_usec - start->tv_usec; + return (s * 1000 + us/1000); +} + +/** + * Get events from the queue and pass it to the processor */ -static void get_events(private_scheduler_t * this) +static job_requeue_t schedule(private_scheduler_t * this) { - job_t *current_job; + timespec_t timeout; + timeval_t now; + event_t *event; + long difference; + int oldstate; + bool timed = FALSE; - /* cancellation disabled by default */ - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); + DBG2(DBG_JOB, "waiting for next event..."); + pthread_mutex_lock(&this->mutex); - DBG1(DBG_JOB, "scheduler thread running, thread_ID: %06u", - (int)pthread_self()); + gettimeofday(&now, NULL); + + if (this->list->get_count(this->list) > 0) + { + this->list->get_first(this->list, (void **)&event); + difference = time_difference(&now, &event->time); + if (difference > 0) + { + DBG2(DBG_JOB, "got event, queueing job for execution"); + this->list->remove_first(this->list, (void **)&event); + pthread_mutex_unlock(&this->mutex); + charon->processor->queue_job(charon->processor, event->job); + free(event); + return JOB_REQUEUE_DIRECT; + } + timeout.tv_sec = event->time.tv_sec; + timeout.tv_nsec = event->time.tv_usec * 1000; + timed = TRUE; + } + pthread_cleanup_push((void*)pthread_mutex_unlock, &this->mutex); + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate); + + if (timed) + { + pthread_cond_timedwait(&this->condvar, &this->mutex, &timeout); + } + else + { + pthread_cond_wait(&this->condvar, &this->mutex); + } + pthread_setcancelstate(oldstate, NULL); + pthread_cleanup_pop(0); + + pthread_mutex_unlock(&this->mutex); + return JOB_REQUEUE_DIRECT; +} - charon->drop_capabilities(charon, TRUE); +/** + * Implements scheduler_t.get_job_load + */ +static u_int get_job_load(private_scheduler_t *this) +{ + int count; + pthread_mutex_lock(&this->mutex); + count = this->list->get_count(this->list); + pthread_mutex_unlock(&this->mutex); + return count; +} - while (TRUE) +/** + * Implements scheduler_t.schedule_job. + */ +static void schedule_job(private_scheduler_t *this, job_t *job, u_int32_t time) +{ + timeval_t now; + event_t *event, *current; + iterator_t *iterator; + time_t s; + suseconds_t us; + + event = malloc_thing(event_t); + event->job = job; + + /* calculate absolute time */ + s = time / 1000; + us = (time - s * 1000) * 1000; + gettimeofday(&now, NULL); + event->time.tv_usec = (now.tv_usec + us) % 1000000; + event->time.tv_sec = now.tv_sec + (now.tv_usec + us)/1000000 + s; + + pthread_mutex_lock(&this->mutex); + while(TRUE) { - DBG2(DBG_JOB, "waiting for next event..."); - /* get a job, this block until one is available */ - current_job = charon->event_queue->get(charon->event_queue); - /* queue the job in the job queue, workers will eat them */ - DBG2(DBG_JOB, "got event, adding job %N to job-queue", - job_type_names, current_job->get_type(current_job)); - charon->job_queue->add(charon->job_queue, current_job); + if (this->list->get_count(this->list) == 0) + { + this->list->insert_first(this->list,event); + break; + } + + this->list->get_last(this->list, (void**)¤t); + if (time_difference(&event->time, ¤t->time) >= 0) + { /* new event has to be fired after the last event in list */ + this->list->insert_last(this->list, event); + break; + } + + this->list->get_first(this->list, (void**)¤t); + if (time_difference(&event->time, ¤t->time) < 0) + { /* new event has to be fired before the first event in list */ + this->list->insert_first(this->list, event); + break; + } + + iterator = this->list->create_iterator(this->list, TRUE); + /* first element has not to be checked (already done) */ + iterator->iterate(iterator, (void**)¤t); + while(iterator->iterate(iterator, (void**)¤t)) + { + if (time_difference(&event->time, ¤t->time) <= 0) + { + /* new event has to be fired before the current event in list */ + iterator->insert_before(iterator, event); + break; + } + } + iterator->destroy(iterator); + break; } + pthread_cond_signal(&this->condvar); + pthread_mutex_unlock(&this->mutex); } /** @@ -79,8 +234,8 @@ static void get_events(private_scheduler_t * this) */ static void destroy(private_scheduler_t *this) { - pthread_cancel(this->assigned_thread); - pthread_join(this->assigned_thread, NULL); + this->job->cancel(this->job); + this->list->destroy_function(this->list, (void*)event_destroy); free(this); } @@ -91,14 +246,17 @@ scheduler_t * scheduler_create() { private_scheduler_t *this = malloc_thing(private_scheduler_t); + this->public.get_job_load = (u_int (*) (scheduler_t *this)) get_job_load; + this->public.schedule_job = (void (*) (scheduler_t *this, job_t *job, u_int32_t ms)) schedule_job; this->public.destroy = (void(*)(scheduler_t*)) destroy; - if (pthread_create(&(this->assigned_thread), NULL, (void*(*)(void*))get_events, this) != 0) - { - /* thread could not be created */ - free(this); - charon->kill(charon, "unable to create scheduler thread"); - } + this->list = linked_list_create(); + pthread_mutex_init(&this->mutex, NULL); + pthread_cond_init(&this->condvar, NULL); + + this->job = callback_job_create((callback_job_cb_t)schedule, this, NULL, NULL); + charon->processor->queue_job(charon->processor, (job_t*)this->job); - return &(this->public); + return &this->public; } + diff --git a/src/charon/processing/scheduler.h b/src/charon/processing/scheduler.h index bea93e7c9..7bde6e638 100644 --- a/src/charon/processing/scheduler.h +++ b/src/charon/processing/scheduler.h @@ -6,7 +6,7 @@ */ /* - * Copyright (C) 2005-2006 Martin Willi + * Copyright (C) 2005-2007 Martin Willi * Copyright (C) 2005 Jan Hutter * Hochschule fuer Technik Rapperswil * @@ -27,14 +27,12 @@ typedef struct scheduler_t scheduler_t; #include <library.h> +#include <processing/jobs/job.h> /** - * @brief The scheduler thread is responsible for timed events. + * @brief The scheduler queues and executes timed events. * - * The scheduler thread takes out jobs from the event-queue and adds them - * to the job-queue. - * - * Starts a thread which does the work, since event-queue is blocking. + * The scheduler stores timed events and passes them to the processor. * * @b Constructors: * - scheduler_create() @@ -44,25 +42,40 @@ typedef struct scheduler_t scheduler_t; struct scheduler_t { /** + * @brief Adds a event to the queue, using a relative time offset. + * + * Schedules a job for execution using a relative time offset. + * + * @param this calling object + * @param job job to schedule + * @param time relative to to schedule job (in ms) + */ + void (*schedule_job) (scheduler_t *this, job_t *job, u_int32_t time); + + /** + * @brief Returns number of jobs scheduled. + * + * @param this calling object + * @return number of scheduled jobs + */ + u_int (*get_job_load) (scheduler_t *this); + + /** * @brief Destroys a scheduler object. * - * @param scheduler calling object + * @param this calling object */ - void (*destroy) (scheduler_t *scheduler); + void (*destroy) (scheduler_t *this); }; /** - * @brief Create a scheduler with its associated thread. - * - * The thread will start to get jobs form the event queue - * and adds them to the job queue. + * @brief Create a scheduler. * - * @return - * - scheduler_t object - * - NULL if thread could not be started + * @return scheduler_t object * * @ingroup processing */ -scheduler_t * scheduler_create(void); +scheduler_t *scheduler_create(void); #endif /*SCHEDULER_H_*/ + diff --git a/src/charon/processing/thread_pool.c b/src/charon/processing/thread_pool.c deleted file mode 100644 index a9891da15..000000000 --- a/src/charon/processing/thread_pool.c +++ /dev/null @@ -1,183 +0,0 @@ -/** - * @file thread_pool.c - * - * @brief Implementation of thread_pool_t. - * - */ - -/* - * Copyright (C) 2005-2006 Martin Willi - * Copyright (C) 2005 Jan Hutter - * Hochschule fuer Technik Rapperswil - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY - * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * for more details. - */ - -#include <stdlib.h> -#include <pthread.h> -#include <string.h> -#include <errno.h> - -#include "thread_pool.h" - -#include <daemon.h> -#include <processing/job_queue.h> - - -typedef struct private_thread_pool_t private_thread_pool_t; - -/** - * @brief Private data of thread_pool_t class. - */ -struct private_thread_pool_t { - /** - * Public thread_pool_t interface. - */ - thread_pool_t public; - - /** - * Number of running threads. - */ - u_int pool_size; - - /** - * Number of threads waiting for work - */ - u_int idle_threads; - - /** - * Array of thread ids. - */ - pthread_t *threads; -}; - -/** - * Implementation of private_thread_pool_t.process_jobs. - */ -static void process_jobs(private_thread_pool_t *this) -{ - job_t *job; - status_t status; - - /* cancellation disabled by default */ - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); - - DBG1(DBG_JOB, "worker thread running, thread_ID: %06u", - (int)pthread_self()); - - charon->drop_capabilities(charon, TRUE); - - while (TRUE) - { - /* TODO: should be atomic, but is not mission critical */ - this->idle_threads++; - job = charon->job_queue->get(charon->job_queue); - this->idle_threads--; - - status = job->execute(job); - - if (status == DESTROY_ME) - { - job->destroy(job); - } - } -} - -/** - * Implementation of thread_pool_t.get_pool_size. - */ -static u_int get_pool_size(private_thread_pool_t *this) -{ - return this->pool_size; -} - -/** - * Implementation of thread_pool_t.get_idle_threads. - */ -static u_int get_idle_threads(private_thread_pool_t *this) -{ - return this->idle_threads; -} - -/** - * Implementation of thread_pool_t.destroy. - */ -static void destroy(private_thread_pool_t *this) -{ - int current; - /* flag thread for termination */ - for (current = 0; current < this->pool_size; current++) - { - DBG1(DBG_JOB, "cancelling worker thread #%d", current+1); - pthread_cancel(this->threads[current]); - } - - /* wait for all threads */ - for (current = 0; current < this->pool_size; current++) { - if (pthread_join(this->threads[current], NULL) == 0) - { - DBG1(DBG_JOB, "worker thread #%d terminated", current+1); - } - else - { - DBG1(DBG_JOB, "could not terminate worker thread #%d", current+1); - } - } - - /* free mem */ - free(this->threads); - free(this); -} - -/* - * Described in header. - */ -thread_pool_t *thread_pool_create(size_t pool_size) -{ - int current; - private_thread_pool_t *this = malloc_thing(private_thread_pool_t); - - /* fill in public fields */ - this->public.destroy = (void(*)(thread_pool_t*))destroy; - this->public.get_pool_size = (u_int(*)(thread_pool_t*))get_pool_size; - this->public.get_idle_threads = (u_int(*)(thread_pool_t*))get_idle_threads; - - /* initialize member */ - this->pool_size = pool_size; - this->idle_threads = 0; - this->threads = malloc(sizeof(pthread_t) * pool_size); - - /* try to create as many threads as possible, up to pool_size */ - for (current = 0; current < pool_size; current++) - { - if (pthread_create(&(this->threads[current]), NULL, - (void*(*)(void*))process_jobs, this) == 0) - { - DBG1(DBG_JOB, "created worker thread #%d", current+1); - } - else - { - /* creation failed, is it the first one? */ - if (current == 0) - { - free(this->threads); - free(this); - charon->kill(charon, "could not create any worker threads"); - } - /* not all threads could be created, but at least one :-/ */ - DBG1(DBG_JOB, "could only create %d from requested %d threads!", - current, pool_size); - this->pool_size = current; - break; - } - } - return (thread_pool_t*)this; -} diff --git a/src/charon/processing/thread_pool.h b/src/charon/processing/thread_pool.h deleted file mode 100644 index 09a6312a8..000000000 --- a/src/charon/processing/thread_pool.h +++ /dev/null @@ -1,87 +0,0 @@ -/** - * @file thread_pool.h - * - * @brief Interface of thread_pool_t. - * - */ - -/* - * Copyright (C) 2005-2006 Martin Willi - * Copyright (C) 2005 Jan Hutter - * Hochschule fuer Technik Rapperswil - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY - * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * for more details. - */ - -#ifndef THREAD_POOL_H_ -#define THREAD_POOL_H_ - -typedef struct thread_pool_t thread_pool_t; - -#include <stdlib.h> - -#include <library.h> - -/** - * @brief A thread_pool consists of a pool of threads processing jobs from the job queue. - * - * Current implementation uses as many threads as specified in constructor. - * A more improved version would dynamically increase thread count if necessary. - * - * @b Constructors: - * - thread_pool_create() - * - * @todo Add support for dynamic thread handling - * - * @ingroup processing - */ -struct thread_pool_t { - - /** - * @brief Return currently instanciated thread count. - * - * @param thread_pool calling object - * @return size of thread pool - */ - u_int (*get_pool_size) (thread_pool_t *thread_pool); - - /** - * @brief Get the number of threads currently waiting for work. - * - * @param thread_pool calling object - * @return number of idle threads - */ - u_int (*get_idle_threads) (thread_pool_t *thread_pool); - - /** - * @brief Destroy a thread_pool_t object. - * - * Sends cancellation request to all threads and AWAITS their termination. - * - * @param thread_pool calling object - */ - void (*destroy) (thread_pool_t *thread_pool); -}; - -/** - * @brief Create the thread pool using using pool_size of threads. - * - * @param pool_size desired pool size - * @return - * - thread_pool_t object if one ore more threads could be started, or - * - NULL if no threads could be created - * - * @ingroup processing - */ -thread_pool_t *thread_pool_create(size_t pool_size); - - -#endif /*THREAD_POOL_H_*/ diff --git a/src/charon/sa/ike_sa.c b/src/charon/sa/ike_sa.c index 8b4b53e10..5fc6a9625 100644 --- a/src/charon/sa/ike_sa.c +++ b/src/charon/sa/ike_sa.c @@ -442,8 +442,8 @@ static status_t send_dpd(private_ike_sa_t *this) } /* recheck in "interval" seconds */ job = send_dpd_job_create(this->ike_sa_id); - charon->event_queue->add_relative(charon->event_queue, (job_t*)job, - (delay - diff) * 1000); + charon->scheduler->schedule_job(charon->scheduler, (job_t*)job, + (delay - diff) * 1000); return SUCCESS; } @@ -477,8 +477,8 @@ static void send_keepalive(private_ike_sa_t *this) diff = 0; } job = send_keepalive_job_create(this->ike_sa_id); - charon->event_queue->add_relative(charon->event_queue, (job_t*)job, - (KEEPALIVE_INTERVAL - diff) * 1000); + charon->scheduler->schedule_job(charon->scheduler, (job_t*)job, + (KEEPALIVE_INTERVAL - diff) * 1000); } /** @@ -524,16 +524,16 @@ static void set_state(private_ike_sa_t *this, ike_sa_state_t state) { this->time.rekey = now + soft; job = (job_t*)rekey_ike_sa_job_create(this->ike_sa_id, reauth); - charon->event_queue->add_relative(charon->event_queue, job, - soft * 1000); + charon->scheduler->schedule_job(charon->scheduler, job, + soft * 1000); } if (hard) { this->time.delete = now + hard; job = (job_t*)delete_ike_sa_job_create(this->ike_sa_id, TRUE); - charon->event_queue->add_relative(charon->event_queue, job, - hard * 1000); + charon->scheduler->schedule_job(charon->scheduler, job, + hard * 1000); } } break; @@ -542,8 +542,8 @@ static void set_state(private_ike_sa_t *this, ike_sa_state_t state) { /* delete may fail if a packet gets lost, so set a timeout */ job_t *job = (job_t*)delete_ike_sa_job_create(this->ike_sa_id, TRUE); - charon->event_queue->add_relative(charon->event_queue, job, - HALF_OPEN_IKE_SA_TIMEOUT); + charon->scheduler->schedule_job(charon->scheduler, job, + HALF_OPEN_IKE_SA_TIMEOUT); break; } default: @@ -761,8 +761,8 @@ static status_t process_message(private_ike_sa_t *this, message_t *message) } /* add a timeout if peer does not establish it completely */ job = (job_t*)delete_ike_sa_job_create(this->ike_sa_id, FALSE); - charon->event_queue->add_relative(charon->event_queue, job, - HALF_OPEN_IKE_SA_TIMEOUT); + charon->scheduler->schedule_job(charon->scheduler, job, + HALF_OPEN_IKE_SA_TIMEOUT); } /* check if message is trustworthy, and update host information */ @@ -1625,7 +1625,7 @@ static void reestablish(private_ike_sa_t *this) charon->ike_sa_manager->checkin(charon->ike_sa_manager, &other->public); job = (job_t*)delete_ike_sa_job_create(this->ike_sa_id, TRUE); - charon->job_queue->add(charon->job_queue, job); + charon->processor->queue_job(charon->processor, job); } /** diff --git a/src/charon/sa/task_manager.c b/src/charon/sa/task_manager.c index e67508ed1..3f13dcef5 100644 --- a/src/charon/sa/task_manager.c +++ b/src/charon/sa/task_manager.c @@ -235,7 +235,7 @@ static status_t retransmit(private_task_manager_t *this, u_int32_t message_id) this->initiating.packet->clone(this->initiating.packet)); job = (job_t*)retransmit_job_create(this->initiating.mid, this->ike_sa->get_id(this->ike_sa)); - charon->event_queue->add_relative(charon->event_queue, job, timeout); + charon->scheduler->schedule_job(charon->scheduler, job, timeout); } return SUCCESS; } diff --git a/src/charon/sa/tasks/child_rekey.c b/src/charon/sa/tasks/child_rekey.c index 4f3c69034..3667d8fad 100644 --- a/src/charon/sa/tasks/child_rekey.c +++ b/src/charon/sa/tasks/child_rekey.c @@ -206,7 +206,7 @@ static status_t process_i(private_child_rekey_t *this, message_t *message) DBG1(DBG_IKE, "CHILD_SA rekeying failed, " "trying again in %d seconds", retry); this->child_sa->set_state(this->child_sa, CHILD_INSTALLED); - charon->event_queue->add_relative(charon->event_queue, job, retry * 1000); + charon->scheduler->schedule_job(charon->scheduler, job, retry * 1000); } return SUCCESS; } diff --git a/src/charon/sa/tasks/ike_rekey.c b/src/charon/sa/tasks/ike_rekey.c index d54fc3524..60cb1e63c 100644 --- a/src/charon/sa/tasks/ike_rekey.c +++ b/src/charon/sa/tasks/ike_rekey.c @@ -180,7 +180,7 @@ static status_t process_i(private_ike_rekey_t *this, message_t *message) DBG1(DBG_IKE, "IKE_SA rekeying failed, " "trying again in %d seconds", retry); this->ike_sa->set_state(this->ike_sa, IKE_ESTABLISHED); - charon->event_queue->add_relative(charon->event_queue, job, retry * 1000); + charon->scheduler->schedule_job(charon->scheduler, job, retry * 1000); } return SUCCESS; case NEED_MORE: @@ -231,7 +231,7 @@ static status_t process_i(private_ike_rekey_t *this, message_t *message) } job = (job_t*)delete_ike_sa_job_create(to_delete, TRUE); - charon->job_queue->add(charon->job_queue, job); + charon->processor->queue_job(charon->processor, job); return SUCCESS; } |