aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMartin Willi <martin@strongswan.org>2007-06-11 10:57:19 +0000
committerMartin Willi <martin@strongswan.org>2007-06-11 10:57:19 +0000
commit9fe1a1ca7617bb562750864aae1892ece1a6a1e6 (patch)
tree057d73714d52c09c40950927fede15e73cd6793b
parentaca0317d92c4141e1b48c7081f39d8646bd4767d (diff)
downloadstrongswan-9fe1a1ca7617bb562750864aae1892ece1a6a1e6.tar.bz2
strongswan-9fe1a1ca7617bb562750864aae1892ece1a6a1e6.tar.xz
introduced callback_job:
simple asynchronous method invocation use daemons thread pool for all threads proper cancellation and cleanups cancellation mechanism to dynamically unload multithreaded code unified event_queue and scheduler => scheduler unified job_queue and thread_pool => processor removed job_type_t, not really needed fixes here, there and everywhere
-rw-r--r--src/charon/Makefile.am7
-rw-r--r--src/charon/bus/bus.c32
-rwxr-xr-xsrc/charon/control/interfaces/stroke_interface.c182
-rw-r--r--src/charon/control/interfaces/xml_interface.c211
-rw-r--r--src/charon/daemon.c160
-rw-r--r--src/charon/daemon.h32
-rw-r--r--src/charon/kernel/kernel_interface.c177
-rw-r--r--src/charon/network/receiver.c164
-rw-r--r--src/charon/network/sender.c69
-rw-r--r--src/charon/processing/event_queue.c290
-rw-r--r--src/charon/processing/event_queue.h118
-rw-r--r--src/charon/processing/job_queue.c139
-rw-r--r--src/charon/processing/job_queue.h100
-rw-r--r--src/charon/processing/jobs/acquire_job.c33
-rw-r--r--src/charon/processing/jobs/callback_job.c213
-rw-r--r--src/charon/processing/jobs/callback_job.h126
-rw-r--r--src/charon/processing/jobs/delete_child_sa_job.c34
-rw-r--r--src/charon/processing/jobs/delete_ike_sa_job.c24
-rw-r--r--src/charon/processing/jobs/job.c39
-rw-r--r--src/charon/processing/jobs/job.h118
-rw-r--r--src/charon/processing/jobs/process_message_job.c23
-rw-r--r--src/charon/processing/jobs/rekey_child_sa_job.c32
-rw-r--r--src/charon/processing/jobs/rekey_ike_sa_job.c56
-rw-r--r--src/charon/processing/jobs/retransmit_job.c23
-rw-r--r--src/charon/processing/jobs/send_dpd_job.c45
-rw-r--r--src/charon/processing/jobs/send_keepalive_job.c32
-rw-r--r--src/charon/processing/processor.c233
-rw-r--r--src/charon/processing/processor.h109
-rw-r--r--src/charon/processing/scheduler.c214
-rw-r--r--src/charon/processing/scheduler.h45
-rw-r--r--src/charon/processing/thread_pool.c183
-rw-r--r--src/charon/processing/thread_pool.h87
-rw-r--r--src/charon/sa/ike_sa.c26
-rw-r--r--src/charon/sa/task_manager.c2
-rw-r--r--src/charon/sa/tasks/child_rekey.c2
-rw-r--r--src/charon/sa/tasks/ike_rekey.c4
36 files changed, 1606 insertions, 1778 deletions
diff --git a/src/charon/Makefile.am b/src/charon/Makefile.am
index a64d9fa70..13a5ad253 100644
--- a/src/charon/Makefile.am
+++ b/src/charon/Makefile.am
@@ -48,12 +48,11 @@ network/packet.c network/packet.h \
network/receiver.c network/receiver.h \
network/sender.c network/sender.h \
network/socket.c network/socket.h \
-processing/event_queue.c processing/event_queue.h \
-processing/job_queue.c processing/job_queue.h \
+processing/jobs/job.h \
processing/jobs/acquire_job.c processing/jobs/acquire_job.h \
+processing/jobs/callback_job.c processing/jobs/callback_job.h \
processing/jobs/delete_child_sa_job.c processing/jobs/delete_child_sa_job.h \
processing/jobs/delete_ike_sa_job.c processing/jobs/delete_ike_sa_job.h \
-processing/jobs/job.c processing/jobs/job.h \
processing/jobs/process_message_job.c processing/jobs/process_message_job.h \
processing/jobs/rekey_child_sa_job.c processing/jobs/rekey_child_sa_job.h \
processing/jobs/rekey_ike_sa_job.c processing/jobs/rekey_ike_sa_job.h \
@@ -61,7 +60,7 @@ processing/jobs/retransmit_job.c processing/jobs/retransmit_job.h \
processing/jobs/send_dpd_job.c processing/jobs/send_dpd_job.h \
processing/jobs/send_keepalive_job.c processing/jobs/send_keepalive_job.h \
processing/scheduler.c processing/scheduler.h \
-processing/thread_pool.c processing/thread_pool.h \
+processing/processor.c processing/processor.h \
sa/authenticators/authenticator.c sa/authenticators/authenticator.h \
sa/authenticators/eap_authenticator.c sa/authenticators/eap_authenticator.h \
sa/authenticators/eap/eap_method.c sa/authenticators/eap/eap_method.h \
diff --git a/src/charon/bus/bus.c b/src/charon/bus/bus.c
index 5f46cd29e..d1984d7f7 100644
--- a/src/charon/bus/bus.c
+++ b/src/charon/bus/bus.c
@@ -238,30 +238,13 @@ static active_listener_t *get_active_listener(private_bus_t *this)
return found;
}
-typedef struct cancel_info_t cancel_info_t;
-
-/**
- * cancellation info to cancel a listening operation cleanly
- */
-struct cancel_info_t {
- /**
- * mutex to unlock on cancellation
- */
- pthread_mutex_t *mutex;
-
- /**
- * listener to unregister
- */
- active_listener_t *listener;
-};
-
/**
* disable a listener to cleanly clean up
*/
-static void unregister(cancel_info_t *info)
+static void unregister(active_listener_t *listener)
{
- info->listener->state = UNREGISTERED;
- pthread_mutex_unlock(info->mutex);
+ listener->state = UNREGISTERED;
+ pthread_cond_broadcast(&listener->cond);
}
/**
@@ -272,7 +255,6 @@ static signal_t listen_(private_bus_t *this, level_t *level, int *thread,
{
active_listener_t *listener;
int oldstate;
- cancel_info_t info;
pthread_mutex_lock(&this->mutex);
listener = get_active_listener(this);
@@ -281,13 +263,13 @@ static signal_t listen_(private_bus_t *this, level_t *level, int *thread,
pthread_cond_broadcast(&listener->cond);
/* wait until it has us delivered a signal, and go back to "registered".
* we allow cancellation here, but must cleanly disable the listener. */
- info.mutex = &this->mutex;
- info.listener = listener;
- pthread_cleanup_push((void*)unregister, &info);
+ pthread_cleanup_push((void*)pthread_mutex_unlock, &this->mutex);
+ pthread_cleanup_push((void*)unregister, listener);
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
pthread_cond_wait(&listener->cond, &this->mutex);
pthread_setcancelstate(oldstate, NULL);
pthread_cleanup_pop(0);
+ pthread_cleanup_pop(0);
pthread_mutex_unlock(&this->mutex);
@@ -320,7 +302,7 @@ static void set_listen_state(private_bus_t *this, bool active)
{
listener->state = UNREGISTERED;
/* say hello to signal emitter; we are finished processing the signal */
- pthread_cond_signal(&listener->cond);
+ pthread_cond_broadcast(&listener->cond);
}
pthread_mutex_unlock(&this->mutex);
diff --git a/src/charon/control/interfaces/stroke_interface.c b/src/charon/control/interfaces/stroke_interface.c
index 6e3427e8e..045d588f2 100755
--- a/src/charon/control/interfaces/stroke_interface.c
+++ b/src/charon/control/interfaces/stroke_interface.c
@@ -43,6 +43,7 @@
#include <control/interface_manager.h>
#include <control/interfaces/interface.h>
#include <utils/leak_detective.h>
+#include <processing/jobs/callback_job.h>
#define IKE_PORT 500
#define PATH_BUF 256
@@ -69,9 +70,9 @@ struct private_stroke_interface_t {
int socket;
/**
- * Thread which reads from the Socket
+ * job accepting stroke messages
*/
- pthread_t threads[STROKE_THREADS];
+ callback_job_t *job;
};
typedef struct stroke_log_info_t stroke_log_info_t;
@@ -224,8 +225,7 @@ static void pop_end(stroke_msg_t *msg, const char* label, stroke_end_t *end)
/**
* Add a connection to the configuration list
*/
-static void stroke_add_conn(private_stroke_interface_t *this,
- stroke_msg_t *msg, FILE *out)
+static void stroke_add_conn(stroke_msg_t *msg, FILE *out)
{
ike_cfg_t *ike_cfg;
peer_cfg_t *peer_cfg;
@@ -628,8 +628,7 @@ destroy_hosts:
/**
* Delete a connection from the list
*/
-static void stroke_del_conn(private_stroke_interface_t *this,
- stroke_msg_t *msg, FILE *out)
+static void stroke_del_conn(stroke_msg_t *msg, FILE *out)
{
iterator_t *peer_iter, *child_iter;
peer_cfg_t *peer, *child;
@@ -747,8 +746,7 @@ static peer_cfg_t *get_peer_cfg_by_name(char *name)
/**
* initiate a connection by name
*/
-static void stroke_initiate(private_stroke_interface_t *this,
- stroke_msg_t *msg, FILE *out)
+static void stroke_initiate(stroke_msg_t *msg, FILE *out)
{
peer_cfg_t *peer_cfg;
child_cfg_t *child_cfg;
@@ -781,7 +779,6 @@ static void stroke_initiate(private_stroke_interface_t *this,
info.out = out;
info.level = msg->output_verbosity;
-
charon->interfaces->initiate(charon->interfaces, peer_cfg, child_cfg,
(interface_manager_cb_t)stroke_log, &info);
}
@@ -789,8 +786,7 @@ static void stroke_initiate(private_stroke_interface_t *this,
/**
* route a policy (install SPD entries)
*/
-static void stroke_route(private_stroke_interface_t *this,
- stroke_msg_t *msg, FILE *out)
+static void stroke_route(stroke_msg_t *msg, FILE *out)
{
peer_cfg_t *peer_cfg;
child_cfg_t *child_cfg;
@@ -830,8 +826,7 @@ static void stroke_route(private_stroke_interface_t *this,
/**
* unroute a policy
*/
-static void stroke_unroute(private_stroke_interface_t *this,
- stroke_msg_t *msg, FILE *out)
+static void stroke_unroute(stroke_msg_t *msg, FILE *out)
{
char *name;
ike_sa_t *ike_sa;
@@ -874,8 +869,7 @@ static void stroke_unroute(private_stroke_interface_t *this,
/**
* terminate a connection by name
*/
-static void stroke_terminate(private_stroke_interface_t *this,
- stroke_msg_t *msg, FILE *out)
+static void stroke_terminate(stroke_msg_t *msg, FILE *out)
{
char *string, *pos = NULL, *name = NULL;
u_int32_t id = 0;
@@ -979,8 +973,7 @@ static void stroke_terminate(private_stroke_interface_t *this,
/**
* Add a ca information record to the cainfo list
*/
-static void stroke_add_ca(private_stroke_interface_t *this,
- stroke_msg_t *msg, FILE *out)
+static void stroke_add_ca(stroke_msg_t *msg, FILE *out)
{
x509_t *cacert;
ca_info_t *ca_info;
@@ -1047,8 +1040,7 @@ static void stroke_add_ca(private_stroke_interface_t *this,
/**
* Delete a ca information record from the cainfo list
*/
-static void stroke_del_ca(private_stroke_interface_t *this,
- stroke_msg_t *msg, FILE *out)
+static void stroke_del_ca(stroke_msg_t *msg, FILE *out)
{
status_t status;
@@ -1194,8 +1186,7 @@ static void log_child_sa(FILE *out, child_sa_t *child_sa, bool all)
/**
* show status of daemon
*/
-static void stroke_status(private_stroke_interface_t *this,
- stroke_msg_t *msg, FILE *out, bool all)
+static void stroke_status(stroke_msg_t *msg, FILE *out, bool all)
{
iterator_t *iterator, *children;
linked_list_t *list;
@@ -1218,12 +1209,12 @@ static void stroke_status(private_stroke_interface_t *this,
fprintf(out, "Performance:\n");
fprintf(out, " worker threads: %d idle of %d,",
- charon->thread_pool->get_idle_threads(charon->thread_pool),
- charon->thread_pool->get_pool_size(charon->thread_pool));
+ charon->processor->get_idle_threads(charon->processor),
+ charon->processor->get_total_threads(charon->processor));
fprintf(out, " job queue load: %d,",
- charon->job_queue->get_count(charon->job_queue));
+ charon->processor->get_job_load(charon->processor));
fprintf(out, " scheduled events: %d\n",
- charon->event_queue->get_count(charon->event_queue));
+ charon->scheduler->get_job_load(charon->scheduler));
list = charon->kernel_interface->create_address_list(charon->kernel_interface);
fprintf(out, "Listening on %d IP addresses:\n", list->get_count(list));
@@ -1312,8 +1303,8 @@ static void stroke_status(private_stroke_interface_t *this,
/**
* list all authority certificates matching a specified flag
*/
-static void list_auth_certificates(private_stroke_interface_t *this, u_int flag,
- const char *label, bool utc, FILE *out)
+static void list_auth_certificates(u_int flag, const char *label,
+ bool utc, FILE *out)
{
bool first = TRUE;
x509_t *cert;
@@ -1341,8 +1332,7 @@ static void list_auth_certificates(private_stroke_interface_t *this, u_int flag
/**
* list various information
*/
-static void stroke_list(private_stroke_interface_t *this,
- stroke_msg_t *msg, FILE *out)
+static void stroke_list(stroke_msg_t *msg, FILE *out)
{
iterator_t *iterator;
@@ -1372,15 +1362,15 @@ static void stroke_list(private_stroke_interface_t *this,
}
if (msg->list.flags & LIST_CACERTS)
{
- list_auth_certificates(this, AUTH_CA, "CA", msg->list.utc, out);
+ list_auth_certificates(AUTH_CA, "CA", msg->list.utc, out);
}
if (msg->list.flags & LIST_OCSPCERTS)
{
- list_auth_certificates(this, AUTH_OCSP, "OCSP", msg->list.utc, out);
+ list_auth_certificates(AUTH_OCSP, "OCSP", msg->list.utc, out);
}
if (msg->list.flags & LIST_AACERTS)
{
- list_auth_certificates(this, AUTH_AA, "AA", msg->list.utc, out);
+ list_auth_certificates(AUTH_AA, "AA", msg->list.utc, out);
}
if (msg->list.flags & LIST_CAINFOS)
{
@@ -1453,8 +1443,7 @@ static void stroke_list(private_stroke_interface_t *this,
/**
* reread various information
*/
-static void stroke_reread(private_stroke_interface_t *this,
- stroke_msg_t *msg, FILE *out)
+static void stroke_reread(stroke_msg_t *msg, FILE *out)
{
if (msg->reread.flags & REREAD_CACERTS)
{
@@ -1473,8 +1462,7 @@ static void stroke_reread(private_stroke_interface_t *this,
/**
* purge various information
*/
-static void stroke_purge(private_stroke_interface_t *this,
- stroke_msg_t *msg, FILE *out)
+static void stroke_purge(stroke_msg_t *msg, FILE *out)
{
if (msg->purge.flags & PURGE_OCSP)
{
@@ -1510,8 +1498,7 @@ signal_t get_signal_from_logtype(char *type)
/**
* set the verbosity debug output
*/
-static void stroke_loglevel(private_stroke_interface_t *this,
- stroke_msg_t *msg, FILE *out)
+static void stroke_loglevel(stroke_msg_t *msg, FILE *out)
{
signal_t signal;
@@ -1533,20 +1520,22 @@ static void stroke_loglevel(private_stroke_interface_t *this,
/**
* process a stroke request from the socket pointed by "fd"
*/
-static void stroke_process(private_stroke_interface_t *this, int strokefd)
+static job_requeue_t stroke_process(int *fdp)
{
stroke_msg_t *msg;
u_int16_t msg_length;
ssize_t bytes_read;
FILE *out;
+ int strokefd = *fdp;
/* peek the length */
bytes_read = recv(strokefd, &msg_length, sizeof(msg_length), MSG_PEEK);
if (bytes_read != sizeof(msg_length))
{
- DBG1(DBG_CFG, "reading length of stroke message failed");
+ DBG1(DBG_CFG, "reading length of stroke message failed: %s",
+ strerror(errno));
close(strokefd);
- return;
+ return JOB_REQUEUE_NONE;
}
/* read message */
@@ -1556,105 +1545,107 @@ static void stroke_process(private_stroke_interface_t *this, int strokefd)
{
DBG1(DBG_CFG, "reading stroke message failed: %s", strerror(errno));
close(strokefd);
- return;
+ return JOB_REQUEUE_NONE;
}
- out = fdopen(dup(strokefd), "w");
+ out = fdopen(strokefd, "w");
if (out == NULL)
{
DBG1(DBG_CFG, "opening stroke output channel failed: %s", strerror(errno));
close(strokefd);
free(msg);
- return;
+ return JOB_REQUEUE_NONE;
}
DBG3(DBG_CFG, "stroke message %b", (void*)msg, msg_length);
+ /* the stroke_* functions are blocking, as they listen on the bus. Add
+ * cancellation handlers. */
+ pthread_cleanup_push((void*)fclose, out);
+ pthread_cleanup_push(free, msg);
+
switch (msg->type)
{
case STR_INITIATE:
- stroke_initiate(this, msg, out);
+ stroke_initiate(msg, out);
break;
case STR_ROUTE:
- stroke_route(this, msg, out);
+ stroke_route(msg, out);
break;
case STR_UNROUTE:
- stroke_unroute(this, msg, out);
+ stroke_unroute(msg, out);
break;
case STR_TERMINATE:
- stroke_terminate(this, msg, out);
+ stroke_terminate(msg, out);
break;
case STR_STATUS:
- stroke_status(this, msg, out, FALSE);
+ stroke_status(msg, out, FALSE);
break;
case STR_STATUS_ALL:
- stroke_status(this, msg, out, TRUE);
+ stroke_status(msg, out, TRUE);
break;
case STR_ADD_CONN:
- stroke_add_conn(this, msg, out);
+ stroke_add_conn(msg, out);
break;
case STR_DEL_CONN:
- stroke_del_conn(this, msg, out);
+ stroke_del_conn(msg, out);
break;
case STR_ADD_CA:
- stroke_add_ca(this, msg, out);
+ stroke_add_ca(msg, out);
break;
case STR_DEL_CA:
- stroke_del_ca(this, msg, out);
+ stroke_del_ca(msg, out);
break;
case STR_LOGLEVEL:
- stroke_loglevel(this, msg, out);
+ stroke_loglevel(msg, out);
break;
case STR_LIST:
- stroke_list(this, msg, out);
+ stroke_list(msg, out);
break;
case STR_REREAD:
- stroke_reread(this, msg, out);
+ stroke_reread(msg, out);
break;
case STR_PURGE:
- stroke_purge(this, msg, out);
+ stroke_purge(msg, out);
break;
default:
DBG1(DBG_CFG, "received unknown stroke");
}
- fclose(out);
- close(strokefd);
- free(msg);
+ /* remove and execute cancellation handlers */
+ pthread_cleanup_pop(1);
+ pthread_cleanup_pop(1);
+
+ return JOB_REQUEUE_NONE;
}
+
/**
* Implementation of private_stroke_interface_t.stroke_receive.
*/
-static void stroke_receive(private_stroke_interface_t *this)
+static job_requeue_t stroke_receive(private_stroke_interface_t *this)
{
struct sockaddr_un strokeaddr;
int strokeaddrlen = sizeof(strokeaddr);
+ int strokefd, *fdp;
int oldstate;
- int strokefd;
+ callback_job_t *job;
- charon->drop_capabilities(charon, TRUE);
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
+ strokefd = accept(this->socket, (struct sockaddr *)&strokeaddr, &strokeaddrlen);
+ pthread_setcancelstate(oldstate, NULL);
- /* ignore sigpipe. writing over the pipe back to the console
- * only fails if SIGPIPE is ignored. */
- signal(SIGPIPE, SIG_IGN);
-
- /* disable cancellation by default */
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
-
- while (TRUE)
+ if (strokefd < 0)
{
- /* wait for connections, but allow thread to terminate */
- pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
- strokefd = accept(this->socket, (struct sockaddr *)&strokeaddr, &strokeaddrlen);
- pthread_setcancelstate(oldstate, NULL);
-
- if (strokefd < 0)
- {
- DBG1(DBG_CFG, "accepting stroke connection failed: %s", strerror(errno));
- continue;
- }
- stroke_process(this, strokefd);
+ DBG1(DBG_CFG, "accepting stroke connection failed: %s", strerror(errno));
+ return JOB_REQUEUE_FAIR;
}
+
+ fdp = malloc_thing(int);
+ *fdp = strokefd;
+ job = callback_job_create((callback_job_cb_t)stroke_process, fdp, free, this->job);
+ charon->processor->queue_job(charon->processor, (job_t*)job);
+
+ return JOB_REQUEUE_FAIR;
}
/**
@@ -1662,17 +1653,9 @@ static void stroke_receive(private_stroke_interface_t *this)
*/
static void destroy(private_stroke_interface_t *this)
{
- int i;
-
- for (i = 0; i < STROKE_THREADS; i++)
- {
- pthread_cancel(this->threads[i]);
- pthread_join(this->threads[i], NULL);
- }
-
- close(this->socket);
- unlink(socket_addr.sun_path);
+ this->job->cancel(this->job);
free(this);
+ unlink(socket_addr.sun_path);
}
/*
@@ -1682,7 +1665,6 @@ interface_t *interface_create()
{
private_stroke_interface_t *this = malloc_thing(private_stroke_interface_t);
mode_t old;
- int i;
/* public functions */
this->public.interface.destroy = (void (*)(interface_t*))destroy;
@@ -1715,14 +1697,10 @@ interface_t *interface_create()
return NULL;
}
- /* start threads reading from the socket */
- for (i = 0; i < STROKE_THREADS; i++)
- {
- if (pthread_create(&this->threads[i], NULL, (void*(*)(void*))stroke_receive, this) != 0)
- {
- charon->kill(charon, "unable to create stroke thread");
- }
- }
+ this->job = callback_job_create((callback_job_cb_t)stroke_receive,
+ this, NULL, NULL);
+ charon->processor->queue_job(charon->processor, (job_t*)this->job);
return &this->public.interface;
}
+
diff --git a/src/charon/control/interfaces/xml_interface.c b/src/charon/control/interfaces/xml_interface.c
index e570f2543..8dd614493 100644
--- a/src/charon/control/interfaces/xml_interface.c
+++ b/src/charon/control/interfaces/xml_interface.c
@@ -24,10 +24,24 @@
#include "xml_interface.h"
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <unistd.h>
+#include <errno.h>
+#include <pthread.h>
+#include <signal.h>
+#include <libxml/xmlreader.h>
+#include <libxml/xmlwriter.h>
+
#include <library.h>
#include <daemon.h>
+static struct sockaddr_un socket_addr = { AF_UNIX, "/var/run/charon.xml"};
+
+
typedef struct private_xml_interface_t private_xml_interface_t;
/**
@@ -39,14 +53,171 @@ struct private_xml_interface_t {
* Public part of xml_t object.
*/
xml_interface_t public;
+
+ /**
+ * XML unix socket fd
+ */
+ int socket;
+
+ /**
+ * thread receiving messages
+ */
+ pthread_t thread;
};
+static void get(private_xml_interface_t *this,
+ xmlTextReaderPtr reader, xmlTextWriterPtr writer)
+{
+
+ if (/* <GetResponse> */
+ xmlTextWriterStartElement(writer, "GetResponse") < 0 ||
+ /* <Status Code="200"><Message/></Status> */
+ xmlTextWriterStartElement(writer, "Status") < 0 ||
+ xmlTextWriterWriteAttribute(writer, "Code", "200") < 0 ||
+ xmlTextWriterStartElement(writer, "Message") < 0 ||
+ xmlTextWriterEndElement(writer) < 0 ||
+ xmlTextWriterEndElement(writer) < 0 ||
+ /* <ConnectionList/> */
+ xmlTextWriterStartElement(writer, "ConnectionList") < 0 ||
+ xmlTextWriterEndElement(writer) < 0 ||
+ /* </GetResponse> */
+ xmlTextWriterEndElement(writer) < 0)
+ {
+ DBG1(DBG_CFG, "error writing XML document (GetResponse)");
+ }
+
+
+/*
+ DBG1(DBG_CFG, "%d %d %s %d %d %s",
+ xmlTextReaderDepth(reader),
+ ,
+ xmlTextReaderConstName(reader),
+ xmlTextReaderIsEmptyElement(reader),
+ xmlTextReaderHasValue(reader),
+ xmlTextReaderConstValue(reader));
+ */
+}
+
+static void receive(private_xml_interface_t *this)
+{
+ charon->drop_capabilities(charon, TRUE);
+
+ /* disable cancellation by default */
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+
+ while (TRUE)
+ {
+ struct sockaddr_un strokeaddr;
+ int strokeaddrlen = sizeof(strokeaddr);
+ int oldstate;
+ int fd;
+ char buffer[4096];
+ size_t len;
+
+ /* wait for connections, but allow thread to terminate */
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
+ fd = accept(this->socket, (struct sockaddr *)&strokeaddr, &strokeaddrlen);
+ pthread_setcancelstate(oldstate, NULL);
+
+ if (fd < 0)
+ {
+ DBG1(DBG_CFG, "accepting SMP XML socket failed: %s", strerror(errno));
+ continue;
+ }
+ DBG2(DBG_CFG, "SMP XML connection opened");
+ while (TRUE)
+ {
+ xmlTextReaderPtr reader;
+ xmlTextWriterPtr writer;
+
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
+ len = read(fd, buffer, sizeof(buffer));
+ pthread_setcancelstate(oldstate, NULL);
+ if (len <= 0)
+ {
+ close(fd);
+ DBG2(DBG_CFG, "SMP XML connection closed");
+ break;
+ }
+
+ reader = xmlReaderForMemory(buffer, len, NULL, NULL, 0);
+ if (reader == NULL)
+ {
+ DBG1(DBG_CFG, "opening SMP XML reader failed");
+ continue;
+ }
+
+ writer = xmlNewTextWriter(xmlOutputBufferCreateFd(fd, NULL));
+ if (writer == NULL)
+ {
+ xmlFreeTextReader(reader);
+ DBG1(DBG_CFG, "opening SMP XML writer failed");
+ continue;
+ }
+
+ /* create the standard message parts */
+ if (xmlTextWriterStartDocument(writer, NULL, NULL, NULL) < 0 ||
+ /* <SMPMessage xmlns="http://www.strongswan.org/smp/1.0"> */
+ xmlTextWriterStartElement(writer, "SMPMessage") < 0 ||
+ xmlTextWriterWriteAttribute(writer, "xmlns",
+ "http://www.strongswan.org/smp/1.0") < 0 ||
+ /* <Body> */
+ xmlTextWriterStartElement(writer, "Body") < 0)
+ {
+ xmlFreeTextReader(reader);
+ xmlFreeTextWriter(writer);
+ DBG1(DBG_CFG, "creating SMP XML message failed");
+ continue;
+ }
+
+ while (TRUE)
+ {
+ switch (xmlTextReaderRead(reader))
+ {
+ case 1:
+ {
+ if (xmlTextReaderNodeType(reader) ==
+ XML_READER_TYPE_ELEMENT)
+ {
+ if (streq(xmlTextReaderConstName(reader), "GetRequest"))
+ {
+ get(this, reader, writer);
+ break;
+ }
+ }
+ continue;
+ }
+ case 0:
+ /* end of XML */
+ break;
+ default:
+ DBG1(DBG_CFG, "parsing SMP XML message failed");
+ break;
+ }
+ xmlFreeTextReader(reader);
+ break;
+ }
+ /* write </Body></SMPMessage> and close document */
+ if (xmlTextWriterEndDocument(writer) < 0)
+ {
+ DBG1(DBG_CFG, "completing SMP XML message failed");
+ }
+ xmlFreeTextWriter(writer);
+ /* write a newline to indicate end of xml */
+ write(fd, "\n", 1);
+ }
+ }
+}
/**
* Implementation of itnerface_t.destroy.
*/
static void destroy(private_xml_interface_t *this)
{
+ pthread_cancel(this->thread);
+ pthread_join(this->thread, NULL);
+ close(this->socket);
+ unlink(socket_addr.sun_path);
free(this);
}
@@ -56,8 +227,46 @@ static void destroy(private_xml_interface_t *this)
interface_t *interface_create()
{
private_xml_interface_t *this = malloc_thing(private_xml_interface_t);
+ mode_t old;
- this->public.interface.destroy = (void (*)(xml_interface_t*))destroy;
+ this->public.interface.destroy = (void (*)(interface_t*))destroy;
+
+ /* set up unix socket */
+ this->socket = socket(AF_UNIX, SOCK_STREAM, 0);
+ if (this->socket == -1)
+ {
+ DBG1(DBG_CFG, "could not create XML socket");
+ free(this);
+ return NULL;
+ }
+
+ old = umask(~S_IRWXU);
+ if (bind(this->socket, (struct sockaddr *)&socket_addr, sizeof(socket_addr)) < 0)
+ {
+ DBG1(DBG_CFG, "could not bind XML socket: %s", strerror(errno));
+ close(this->socket);
+ free(this);
+ return NULL;
+ }
+ umask(old);
+
+ if (listen(this->socket, 0) < 0)
+ {
+ DBG1(DBG_CFG, "could not listen on XML socket: %s", strerror(errno));
+ close(this->socket);
+ free(this);
+ return NULL;
+ }
+
+ if (pthread_create(&this->thread, NULL, (void*(*)(void*))receive, this) != 0)
+ {
+ DBG1(DBG_CFG, "could not create XML socket thread: %s", strerror(errno));
+ close(this->socket);
+ unlink(socket_addr.sun_path);
+ free(this);
+ return NULL;
+ }
return &this->public.interface;
}
+
diff --git a/src/charon/daemon.c b/src/charon/daemon.c
index 62e29b365..aa81442cd 100644
--- a/src/charon/daemon.c
+++ b/src/charon/daemon.c
@@ -115,25 +115,26 @@ static void dbg_stderr(int level, char *fmt, ...)
*/
static void run(private_daemon_t *this)
{
- /* reselect signals for this thread */
- sigemptyset(&(this->signal_set));
- sigaddset(&(this->signal_set), SIGINT);
- sigaddset(&(this->signal_set), SIGHUP);
- sigaddset(&(this->signal_set), SIGTERM);
- pthread_sigmask(SIG_BLOCK, &(this->signal_set), 0);
-
- while(TRUE)
+ sigset_t set;
+
+ /* handle SIGINT, SIGHUP ans SIGTERM in this handler */
+ sigemptyset(&set);
+ sigaddset(&set, SIGINT);
+ sigaddset(&set, SIGHUP);
+ sigaddset(&set, SIGTERM);
+
+ while (TRUE)
{
- int signal_number;
+ int sig;
int error;
- error = sigwait(&(this->signal_set), &signal_number);
- if(error)
+ error = sigwait(&set, &sig);
+ if (error)
{
DBG1(DBG_DMN, "error %d while waiting for a signal", error);
return;
}
- switch (signal_number)
+ switch (sig)
{
case SIGHUP:
{
@@ -146,11 +147,13 @@ static void run(private_daemon_t *this)
return;
}
case SIGTERM:
+ {
DBG1(DBG_DMN, "signal of type SIGTERM received. Shutting down");
return;
+ }
default:
{
- DBG1(DBG_DMN, "unknown signal %d received. Ignored", signal_number);
+ DBG1(DBG_DMN, "unknown signal %d received. Ignored", sig);
break;
}
}
@@ -162,33 +165,22 @@ static void run(private_daemon_t *this)
*/
static void destroy(private_daemon_t *this)
{
- /* destruction is a non trivial task, we need to follow
- * a strict order to prevent threading issues!
- * Kill active threads first, except the sender, as
- * the killed IKE_SA want to send delete messages.
- */
- /* we don't want to receive anything anymore... */
- DESTROY_IF(this->public.receiver);
- /* ignore all incoming user requests */
- DESTROY_IF(this->public.interfaces);
- /* stop scheduing jobs */
- DESTROY_IF(this->public.scheduler);
- /* stop processing jobs */
- DESTROY_IF(this->public.thread_pool);
- /* shut down manager with all IKE SAs */
+ /* terminate all idle threads */
+ this->public.processor->set_threads(this->public.processor, 0);
+ /* close all IKE_SAs */
DESTROY_IF(this->public.ike_sa_manager);
- /* all child SAs should be down now, so kill kernel interface */
- DESTROY_IF(this->public.kernel_interface);
- /* destroy other infrastructure */
- DESTROY_IF(this->public.job_queue);
- DESTROY_IF(this->public.event_queue);
- DESTROY_IF(this->public.credentials);
+ DESTROY_IF(this->public.scheduler);
+ DESTROY_IF(this->public.interfaces);
DESTROY_IF(this->public.backends);
- /* we hope the sender could send the outstanding deletes, but
- * we shut down here at any cost */
+ DESTROY_IF(this->public.credentials);
+ DESTROY_IF(this->public.kernel_interface);
DESTROY_IF(this->public.sender);
+ DESTROY_IF(this->public.receiver);
DESTROY_IF(this->public.socket);
- /* before destroying bus with its listeners, rehook library logs */
+ /* wait until all threads are gone */
+ DESTROY_IF(this->public.processor);
+
+ /* rehook library logging, shutdown logging */
dbg = dbg_stderr;
DESTROY_IF(this->public.bus);
DESTROY_IF(this->public.outlog);
@@ -197,7 +189,6 @@ static void destroy(private_daemon_t *this)
free(this);
}
-
/**
* Enforce daemon shutdown, with a given reason to do so.
*/
@@ -228,6 +219,7 @@ static void drop_capabilities(private_daemon_t *this, bool full)
{
struct __user_cap_header_struct hdr;
struct __user_cap_data_struct data;
+
/* CAP_NET_ADMIN is needed to use netlink */
u_int32_t keep = (1<<CAP_NET_ADMIN);
@@ -242,11 +234,11 @@ static void drop_capabilities(private_daemon_t *this, bool full)
}
else
{
- /* CAP_NET_BIND_SERVICE to bind services below port 1024,
- * CAP_NET_RAW to create RAW sockets.
- * CAP_DAC_READ_SEARCH is needed to read ipsec.secrets */
+ /* CAP_NET_BIND_SERVICE to bind services below port 1024 */
keep |= (1<<CAP_NET_BIND_SERVICE);
+ /* CAP_NET_RAW to create RAW sockets */
keep |= (1<<CAP_NET_RAW);
+ /* CAP_DAC_READ_SEARCH to read ipsec.secrets */
keep |= (1<<CAP_DAC_READ_SEARCH);
}
@@ -257,7 +249,7 @@ static void drop_capabilities(private_daemon_t *this, bool full)
if (capset(&hdr, &data))
{
- kill_daemon(this, "unable to drop threads capabilities");
+ kill_daemon(this, "unable to drop daemon capabilities");
}
}
@@ -266,7 +258,6 @@ static void drop_capabilities(private_daemon_t *this, bool full)
*/
static void initialize(private_daemon_t *this, bool syslog, level_t levels[])
{
- credential_store_t* credentials;
signal_t signal;
/* for uncritical pseudo random numbers */
@@ -298,38 +289,32 @@ static void initialize(private_daemon_t *this, bool syslog, level_t levels[])
DBG1(DBG_DMN, "starting charon (strongSwan Version %s)", VERSION);
- this->public.socket = socket_create(IKEV2_UDP_PORT, IKEV2_NATT_PORT);
this->public.ike_sa_manager = ike_sa_manager_create();
- this->public.job_queue = job_queue_create();
- this->public.event_queue = event_queue_create();
- this->public.credentials = (credential_store_t*)local_credential_store_create();
- this->public.backends = backend_manager_create();
-
- /* initialize fetcher_t class */
- fetcher_initialize();
+ this->public.processor = processor_create();
+ this->public.scheduler = scheduler_create();
/* load secrets, ca certificates and crls */
- credentials = this->public.credentials;
- credentials->load_ca_certificates(credentials);
- credentials->load_aa_certificates(credentials);
- credentials->load_attr_certificates(credentials);
- credentials->load_ocsp_certificates(credentials);
- credentials->load_crls(credentials);
- credentials->load_secrets(credentials);
-
- /* start building threads, we are multi-threaded NOW */
+ this->public.credentials = (credential_store_t*)local_credential_store_create();
+ this->public.credentials->load_ca_certificates(this->public.credentials);
+ this->public.credentials->load_aa_certificates(this->public.credentials);
+ this->public.credentials->load_attr_certificates(this->public.credentials);
+ this->public.credentials->load_ocsp_certificates(this->public.credentials);
+ this->public.credentials->load_crls(this->public.credentials);
+ this->public.credentials->load_secrets(this->public.credentials);
+
this->public.interfaces = interface_manager_create();
+ this->public.backends = backend_manager_create();
+ this->public.kernel_interface = kernel_interface_create();
+ this->public.socket = socket_create(IKEV2_UDP_PORT, IKEV2_NATT_PORT);
this->public.sender = sender_create();
this->public.receiver = receiver_create();
- this->public.scheduler = scheduler_create();
- this->public.kernel_interface = kernel_interface_create();
- this->public.thread_pool = thread_pool_create(NUMBER_OF_WORKING_THREADS);
+
}
/**
* Handle SIGSEGV/SIGILL signals raised by threads
*/
-void signal_handler(int signal)
+static void segv_handler(int signal)
{
#ifdef HAVE_BACKTRACE
void *array[20];
@@ -340,7 +325,7 @@ void signal_handler(int signal)
size = backtrace(array, 20);
strings = backtrace_symbols(array, size);
- DBG1(DBG_DMN, "thread %u received %s. Dumping %d frames from stack:",
+ DBG1(DBG_JOB, "thread %u received %s. Dumping %d frames from stack:",
pthread_self(), signal == SIGSEGV ? "SIGSEGV" : "SIGILL", size);
for (i = 0; i < size; i++)
@@ -352,7 +337,7 @@ void signal_handler(int signal)
DBG1(DBG_DMN, "thread %u received %s",
pthread_self(), signal == SIGSEGV ? "SIGSEGV" : "SIGILL");
#endif /* HAVE_BACKTRACE */
- DBG1(DBG_DMN, "killing ourself hard after SIGSEGV");
+ DBG1(DBG_DMN, "killing ourself, received critical signal");
raise(SIGKILL);
}
@@ -361,25 +346,22 @@ void signal_handler(int signal)
*/
private_daemon_t *daemon_create(void)
{
- private_daemon_t *this = malloc_thing(private_daemon_t);
struct sigaction action;
+ private_daemon_t *this = malloc_thing(private_daemon_t);
/* assign methods */
this->public.kill = (void (*) (daemon_t*,char*))kill_daemon;
- this->public.drop_capabilities = (void(*)(daemon_t*,bool))drop_capabilities;
/* NULL members for clean destruction */
this->public.socket = NULL;
this->public.ike_sa_manager = NULL;
- this->public.job_queue = NULL;
- this->public.event_queue = NULL;
this->public.credentials = NULL;
this->public.backends = NULL;
this->public.sender= NULL;
this->public.receiver = NULL;
this->public.scheduler = NULL;
this->public.kernel_interface = NULL;
- this->public.thread_pool = NULL;
+ this->public.processor = NULL;
this->public.interfaces = NULL;
this->public.bus = NULL;
this->public.outlog = NULL;
@@ -388,20 +370,19 @@ private_daemon_t *daemon_create(void)
this->main_thread_id = pthread_self();
- /* setup signal handling for all threads */
- sigemptyset(&(this->signal_set));
- sigaddset(&(this->signal_set), SIGSEGV);
- sigaddset(&(this->signal_set), SIGINT);
- sigaddset(&(this->signal_set), SIGHUP);
- sigaddset(&(this->signal_set), SIGTERM);
- pthread_sigmask(SIG_BLOCK, &(this->signal_set), 0);
-
- /* setup SIGSEGV handler for all threads */
- action.sa_handler = signal_handler;
- action.sa_mask = this->signal_set;
+ /* add handler for SEGV and ILL,
+ * add handler for USR1 (cancellation).
+ * INT, TERM and HUP are handled by sigwait() in run() */
+ action.sa_handler = segv_handler;
action.sa_flags = 0;
+ sigemptyset(&action.sa_mask);
+ sigaddset(&action.sa_mask, SIGINT);
+ sigaddset(&action.sa_mask, SIGTERM);
+ sigaddset(&action.sa_mask, SIGHUP);
sigaction(SIGSEGV, &action, NULL);
sigaction(SIGILL, &action, NULL);
+ pthread_sigmask(SIG_SETMASK, &action.sa_mask, 0);
+
return this;
}
@@ -450,10 +431,12 @@ int main(int argc, char *argv[])
level_t levels[DBG_MAX];
int signal;
- prctl(PR_SET_KEEPCAPS, 1);
+ private_charon = daemon_create();
+ charon = (daemon_t*)private_charon;
- /* drop the capabilities we won't need at all */
- drop_capabilities(NULL, FALSE);
+ /* drop the capabilities we won't need for initialization */
+ prctl(PR_SET_KEEPCAPS, 1);
+ drop_capabilities(private_charon, FALSE);
/* use CTRL loglevel for default */
for (signal = 0; signal < DBG_MAX; signal++)
@@ -522,13 +505,11 @@ int main(int argc, char *argv[])
}
break;
}
-
- private_charon = daemon_create();
- charon = (daemon_t*)private_charon;
/* initialize daemon */
initialize(private_charon, use_syslog, levels);
-
+ /* initialize fetcher_t class */
+ fetcher_initialize();
/* load pluggable EAP modules */
eap_method_load(eapdir);
@@ -562,6 +543,9 @@ int main(int argc, char *argv[])
/* drop additional capabilites (bind & root) */
drop_capabilities(private_charon, TRUE);
+ /* start the engine, go multithreaded */
+ charon->processor->set_threads(charon->processor, WORKER_THREADS);
+
/* run daemon */
run(private_charon);
diff --git a/src/charon/daemon.h b/src/charon/daemon.h
index 640bc6a09..534ae748e 100644
--- a/src/charon/daemon.h
+++ b/src/charon/daemon.h
@@ -33,9 +33,7 @@ typedef struct daemon_t daemon_t;
#include <network/receiver.h>
#include <network/socket.h>
#include <processing/scheduler.h>
-#include <processing/thread_pool.h>
-#include <processing/job_queue.h>
-#include <processing/event_queue.h>
+#include <processing/processor.h>
#include <kernel/kernel_interface.h>
#include <control/interface_manager.h>
#include <bus/bus.h>
@@ -234,12 +232,9 @@ typedef struct daemon_t daemon_t;
/**
* @brief Number of threads in the thread pool.
*
- * There are several other threads, this defines
- * only the number of threads in thread_pool_t.
- *
* @ingroup charon
*/
-#define NUMBER_OF_WORKING_THREADS 4
+#define WORKER_THREADS 16
/**
* UDP Port on which the daemon will listen for incoming traffic.
@@ -338,20 +333,11 @@ typedef struct daemon_t daemon_t;
* @ingroup charon
*/
struct daemon_t {
+
/**
* A socket_t instance.
*/
socket_t *socket;
-
- /**
- * A job_queue_t instance.
- */
- job_queue_t *job_queue;
-
- /**
- * A event_queue_t instance.
- */
- event_queue_t *event_queue;
/**
* A ike_sa_manager_t instance.
@@ -384,9 +370,9 @@ struct daemon_t {
scheduler_t *scheduler;
/**
- * The Thread pool managing the worker threads.
+ * Job processing using a thread pool.
*/
- thread_pool_t *thread_pool;
+ processor_t *processor;
/**
* The signaling bus.
@@ -419,14 +405,6 @@ struct daemon_t {
interface_manager_t *interfaces;
/**
- * @brief Let the calling thread drop its capabilities.
- *
- * @param this calling daemon
- * @param full TRUE to drop as many as possible
- */
- void (*drop_capabilities) (daemon_t *this, bool full);
-
- /**
* @brief Shut down the daemon.
*
* @param this the daemon to kill
diff --git a/src/charon/kernel/kernel_interface.c b/src/charon/kernel/kernel_interface.c
index d82783b03..c1764b5a5 100644
--- a/src/charon/kernel/kernel_interface.c
+++ b/src/charon/kernel/kernel_interface.c
@@ -48,6 +48,7 @@
#include <processing/jobs/delete_child_sa_job.h>
#include <processing/jobs/rekey_child_sa_job.h>
#include <processing/jobs/acquire_job.h>
+#include <processing/jobs/callback_job.h>
/** kernel level protocol identifiers */
#define KERNEL_ESP 50
@@ -156,7 +157,7 @@ char* lookup_algorithm(kernel_algorithm_t *kernel_algo,
}
kernel_algo++;
}
- return NULL;
+ return NULL;
}
typedef struct route_entry_t route_entry_t;
@@ -297,6 +298,11 @@ struct private_kernel_interface_t {
* Mutex to lock access to vips.
*/
pthread_mutex_t vips_mutex;
+
+ /**
+ * job receiving xfrm events
+ */
+ callback_job_t *job;
/**
* netlink xfrm socket to receive acquire and expire events
@@ -312,11 +318,6 @@ struct private_kernel_interface_t {
* Netlink rt socket (routing)
*/
int socket_rt;
-
- /**
- * Thread receiving events from kernel
- */
- pthread_t event_thread;
};
/**
@@ -444,99 +445,98 @@ static void add_attribute(struct nlmsghdr *hdr, int rta_type, chunk_t data,
/**
* Receives events from kernel
*/
-static void receive_events(private_kernel_interface_t *this)
+static job_requeue_t receive_events(private_kernel_interface_t *this)
{
- charon->drop_capabilities(charon, TRUE);
+ unsigned char response[512];
+ struct nlmsghdr *hdr;
+ struct sockaddr_nl addr;
+ socklen_t addr_len = sizeof(addr);
+ int len, oldstate;
+
+ hdr = (struct nlmsghdr*)response;
+
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
+ len = recvfrom(this->socket_xfrm_events, response, sizeof(response), 0,
+ (struct sockaddr*)&addr, &addr_len);
+ pthread_setcancelstate(oldstate, NULL);
+
+ if (len < 0)
+ {
+ if (errno == EINTR)
+ { /* interrupted, try again */
+ return JOB_REQUEUE_DIRECT;
+ }
+ charon->kill(charon, "unable to receive netlink events");
+ }
+
+ if (!NLMSG_OK(hdr, len))
+ {
+ /* bad netlink message */
+ return JOB_REQUEUE_DIRECT;
+ }
- while(TRUE)
+ if (addr.nl_pid != 0)
{
- unsigned char response[512];
- struct nlmsghdr *hdr;
- struct sockaddr_nl addr;
- socklen_t addr_len = sizeof(addr);
- int len;
-
- hdr = (struct nlmsghdr*)response;
- len = recvfrom(this->socket_xfrm_events, response, sizeof(response),
- 0, (struct sockaddr*)&addr, &addr_len);
- if (len < 0)
+ /* not from kernel. not interested, try another one */
+ return JOB_REQUEUE_DIRECT;
+ }
+
+ /* we handle ACQUIRE and EXPIRE messages directly */
+ if (hdr->nlmsg_type == XFRM_MSG_ACQUIRE)
+ {
+ u_int32_t reqid = 0;
+ job_t *job;
+ struct rtattr *rtattr = XFRM_RTA(hdr, struct xfrm_user_acquire);
+ size_t rtsize = XFRM_PAYLOAD(hdr, struct xfrm_user_tmpl);
+ if (RTA_OK(rtattr, rtsize))
{
- if (errno == EINTR)
+ if (rtattr->rta_type == XFRMA_TMPL)
{
- /* interrupted, try again */
- continue;
+ struct xfrm_user_tmpl* tmpl = (struct xfrm_user_tmpl*)RTA_DATA(rtattr);
+ reqid = tmpl->reqid;
}
- charon->kill(charon, "unable to receive netlink events");
}
-
- if (!NLMSG_OK(hdr, len))
+ if (reqid == 0)
{
- /* bad netlink message */
- continue;
+ DBG1(DBG_KNL, "received a XFRM_MSG_ACQUIRE, but no reqid found");
}
-
- if (addr.nl_pid != 0)
+ else
{
- /* not from kernel. not interested, try another one */
- continue;
+ DBG2(DBG_KNL, "received a XFRM_MSG_ACQUIRE");
+ DBG1(DBG_KNL, "creating acquire job for CHILD_SA with reqid %d",
+ reqid);
+ job = (job_t*)acquire_job_create(reqid);
+ charon->processor->queue_job(charon->processor, job);
}
+ }
+ else if (hdr->nlmsg_type == XFRM_MSG_EXPIRE)
+ {
+ job_t *job;
+ protocol_id_t protocol;
+ u_int32_t spi, reqid;
+ struct xfrm_user_expire *expire;
+
+ expire = (struct xfrm_user_expire*)NLMSG_DATA(hdr);
+ protocol = expire->state.id.proto == KERNEL_ESP ?
+ PROTO_ESP : PROTO_AH;
+ spi = expire->state.id.spi;
+ reqid = expire->state.reqid;
- /* we handle ACQUIRE and EXPIRE messages directly */
- if (hdr->nlmsg_type == XFRM_MSG_ACQUIRE)
+ DBG2(DBG_KNL, "received a XFRM_MSG_EXPIRE");
+ DBG1(DBG_KNL, "creating %s job for %N CHILD_SA 0x%x (reqid %d)",
+ expire->hard ? "delete" : "rekey", protocol_id_names,
+ protocol, ntohl(spi), reqid);
+ if (expire->hard)
{
- u_int32_t reqid = 0;
- job_t *job;
- struct rtattr *rtattr = XFRM_RTA(hdr, struct xfrm_user_acquire);
- size_t rtsize = XFRM_PAYLOAD(hdr, struct xfrm_user_tmpl);
- if (RTA_OK(rtattr, rtsize))
- {
- if (rtattr->rta_type == XFRMA_TMPL)
- {
- struct xfrm_user_tmpl* tmpl = (struct xfrm_user_tmpl*)RTA_DATA(rtattr);
- reqid = tmpl->reqid;
- }
- }
- if (reqid == 0)
- {
- DBG1(DBG_KNL, "received a XFRM_MSG_ACQUIRE, but no reqid found");
- }
- else
- {
- DBG2(DBG_KNL, "received a XFRM_MSG_ACQUIRE");
- DBG1(DBG_KNL, "creating acquire job for CHILD_SA with reqid %d",
- reqid);
- job = (job_t*)acquire_job_create(reqid);
- charon->job_queue->add(charon->job_queue, job);
- }
+ job = (job_t*)delete_child_sa_job_create(reqid, protocol, spi);
}
- else if (hdr->nlmsg_type == XFRM_MSG_EXPIRE)
+ else
{
- job_t *job;
- protocol_id_t protocol;
- u_int32_t spi, reqid;
- struct xfrm_user_expire *expire;
-
- expire = (struct xfrm_user_expire*)NLMSG_DATA(hdr);
- protocol = expire->state.id.proto == KERNEL_ESP ?
- PROTO_ESP : PROTO_AH;
- spi = expire->state.id.spi;
- reqid = expire->state.reqid;
-
- DBG2(DBG_KNL, "received a XFRM_MSG_EXPIRE");
- DBG1(DBG_KNL, "creating %s job for %N CHILD_SA 0x%x (reqid %d)",
- expire->hard ? "delete" : "rekey", protocol_id_names,
- protocol, ntohl(spi), reqid);
- if (expire->hard)
- {
- job = (job_t*)delete_child_sa_job_create(reqid, protocol, spi);
- }
- else
- {
- job = (job_t*)rekey_child_sa_job_create(reqid, protocol, spi);
- }
- charon->job_queue->add(charon->job_queue, job);
+ job = (job_t*)rekey_child_sa_job_create(reqid, protocol, spi);
}
+ charon->processor->queue_job(charon->processor, job);
}
+ return JOB_REQUEUE_DIRECT;
}
/**
@@ -1880,8 +1880,7 @@ static status_t del_policy(private_kernel_interface_t *this,
*/
static void destroy(private_kernel_interface_t *this)
{
- pthread_cancel(this->event_thread);
- pthread_join(this->event_thread, NULL);
+ this->job->cancel(this->job);
close(this->socket_xfrm_events);
close(this->socket_xfrm);
close(this->socket_rt);
@@ -1961,14 +1960,10 @@ kernel_interface_t *kernel_interface_create()
charon->kill(charon, "unable to bind XFRM event socket");
}
- /* create a thread receiving ACQUIRE & EXPIRE events */
- if (pthread_create(&this->event_thread, NULL,
- (void*(*)(void*))receive_events, this))
- {
- charon->kill(charon, "unable to create xfrm event dispatcher thread");
- }
+ this->job = callback_job_create((callback_job_cb_t)receive_events,
+ this, NULL, NULL);
+ charon->processor->queue_job(charon->processor, (job_t*)this->job);
return &this->public;
}
-/* vim: set ts=4 sw=4 noet: */
diff --git a/src/charon/network/receiver.c b/src/charon/network/receiver.c
index 9b4bf71ac..1de1dd3d2 100644
--- a/src/charon/network/receiver.c
+++ b/src/charon/network/receiver.c
@@ -30,9 +30,9 @@
#include <daemon.h>
#include <network/socket.h>
#include <network/packet.h>
-#include <processing/job_queue.h>
#include <processing/jobs/job.h>
#include <processing/jobs/process_message_job.h>
+#include <processing/jobs/callback_job.h>
/** length of the full cookie, including time (u_int32_t + SHA1()) */
#define COOKIE_LENGTH 24
@@ -56,12 +56,17 @@ struct private_receiver_t {
/**
* Public part of a receiver_t object.
*/
- receiver_t public;
+ receiver_t public;
+
+ /**
+ * Threads job receiving packets
+ */
+ callback_job_t *job;
- /**
- * Assigned thread.
- */
- pthread_t assigned_thread;
+ /**
+ * Assigned thread.
+ */
+ pthread_t assigned_thread;
/**
* current secret to use for cookie calculation
@@ -245,94 +250,84 @@ static bool peer_to_aggressive(private_receiver_t *this, message_t *message)
/**
* Implementation of receiver_t.receive_packets.
*/
-static void receive_packets(private_receiver_t *this)
+static job_requeue_t receive_packets(private_receiver_t *this)
{
packet_t *packet;
message_t *message;
job_t *job;
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
- DBG1(DBG_NET, "receiver thread running, thread_ID: %06u",
- (int)pthread_self());
+ /* read in a packet */
+ if (charon->socket->receive(charon->socket, &packet) != SUCCESS)
+ {
+ DBG2(DBG_NET, "receiving from socket failed!");
+ return JOB_REQUEUE_FAIR;
+ }
- charon->drop_capabilities(charon, TRUE);
+ /* parse message header */
+ message = message_create_from_packet(packet);
+ if (message->parse_header(message) != SUCCESS)
+ {
+ DBG1(DBG_NET, "received invalid IKE header from %H - ignored",
+ packet->get_source(packet));
+ message->destroy(message);
+ return JOB_REQUEUE_DIRECT;
+ }
- while (TRUE)
+ /* check IKE major version */
+ if (message->get_major_version(message) != IKE_MAJOR_VERSION)
{
- /* read in a packet */
- if (charon->socket->receive(charon->socket, &packet) != SUCCESS)
- {
- DBG2(DBG_NET, "receiving from socket failed!");
- /* try again after a delay */
- sleep(1);
- continue;
- }
-
- /* parse message header */
- message = message_create_from_packet(packet);
- if (message->parse_header(message) != SUCCESS)
+ DBG1(DBG_NET, "received unsupported IKE version %d.%d from %H, "
+ "sending INVALID_MAJOR_VERSION", message->get_major_version(message),
+ message->get_minor_version(message), packet->get_source(packet));
+ send_notify(message, INVALID_MAJOR_VERSION, chunk_empty);
+ message->destroy(message);
+ return JOB_REQUEUE_DIRECT;
+ }
+
+ if (message->get_request(message) &&
+ message->get_exchange_type(message) == IKE_SA_INIT)
+ {
+ /* check for cookies */
+ if (cookie_required(this, message))
{
- DBG1(DBG_NET, "received invalid IKE header from %H - ignored",
- packet->get_source(packet));
+ u_int32_t now = time(NULL);
+ chunk_t cookie = cookie_build(this, message, now - this->secret_offset,
+ chunk_from_thing(this->secret));
+
+ DBG2(DBG_NET, "received packet from: %#H to %#H",
+ message->get_source(message),
+ message->get_destination(message));
+ DBG2(DBG_NET, "sending COOKIE notify to %H",
+ message->get_source(message));
+ send_notify(message, COOKIE, cookie);
+ chunk_free(&cookie);
+ if (++this->secret_used > COOKIE_REUSE)
+ {
+ /* create new cookie */
+ DBG1(DBG_NET, "generating new cookie secret after %d uses",
+ this->secret_used);
+ memcpy(this->secret_old, this->secret, SECRET_LENGTH);
+ this->randomizer->get_pseudo_random_bytes(this->randomizer,
+ SECRET_LENGTH, this->secret);
+ this->secret_switch = now;
+ this->secret_used = 0;
+ }
message->destroy(message);
- continue;
+ return JOB_REQUEUE_DIRECT;
}
- /* check IKE major version */
- if (message->get_major_version(message) != IKE_MAJOR_VERSION)
+ /* check if peer has not too many IKE_SAs half open */
+ if (peer_to_aggressive(this, message))
{
- DBG1(DBG_NET, "received unsupported IKE version %d.%d from %H, "
- "sending INVALID_MAJOR_VERSION", message->get_major_version(message),
- message->get_minor_version(message), packet->get_source(packet));
- send_notify(message, INVALID_MAJOR_VERSION, chunk_empty);
+ DBG1(DBG_NET, "ignoring IKE_SA setup from %H, "
+ "peer to aggressive", message->get_source(message));
message->destroy(message);
- continue;
- }
-
- if (message->get_request(message) &&
- message->get_exchange_type(message) == IKE_SA_INIT)
- {
- /* check for cookies */
- if (cookie_required(this, message))
- {
- u_int32_t now = time(NULL);
- chunk_t cookie = cookie_build(this, message, now - this->secret_offset,
- chunk_from_thing(this->secret));
-
- DBG2(DBG_NET, "received packet from: %#H to %#H",
- message->get_source(message),
- message->get_destination(message));
- DBG2(DBG_NET, "sending COOKIE notify to %H",
- message->get_source(message));
- send_notify(message, COOKIE, cookie);
- chunk_free(&cookie);
- if (++this->secret_used > COOKIE_REUSE)
- {
- /* create new cookie */
- DBG1(DBG_NET, "generating new cookie secret after %d uses",
- this->secret_used);
- memcpy(this->secret_old, this->secret, SECRET_LENGTH);
- this->randomizer->get_pseudo_random_bytes(this->randomizer,
- SECRET_LENGTH, this->secret);
- this->secret_switch = now;
- this->secret_used = 0;
- }
- message->destroy(message);
- continue;
- }
-
- /* check if peer has not too many IKE_SAs half open */
- if (peer_to_aggressive(this, message))
- {
- DBG1(DBG_NET, "ignoring IKE_SA setup from %H, "
- "peer to aggressive", message->get_source(message));
- message->destroy(message);
- continue;
- }
+ return JOB_REQUEUE_DIRECT;
}
- job = (job_t *)process_message_job_create(message);
- charon->job_queue->add(charon->job_queue, job);
}
+ job = (job_t*)process_message_job_create(message);
+ charon->processor->queue_job(charon->processor, job);
+ return JOB_REQUEUE_DIRECT;
}
/**
@@ -340,8 +335,7 @@ static void receive_packets(private_receiver_t *this)
*/
static void destroy(private_receiver_t *this)
{
- pthread_cancel(this->assigned_thread);
- pthread_join(this->assigned_thread, NULL);
+ this->job->cancel(this->job);
this->randomizer->destroy(this->randomizer);
this->hasher->destroy(this->hasher);
free(this);
@@ -366,12 +360,10 @@ receiver_t *receiver_create()
this->secret);
memcpy(this->secret_old, this->secret, SECRET_LENGTH);
- if (pthread_create(&this->assigned_thread, NULL,
- (void*)receive_packets, this) != 0)
- {
- free(this);
- charon->kill(charon, "unable to create receiver thread");
- }
+ this->job = callback_job_create((callback_job_cb_t)receive_packets,
+ this, NULL, NULL);
+ charon->processor->queue_job(charon->processor, (job_t*)this->job);
return &this->public;
}
+
diff --git a/src/charon/network/sender.c b/src/charon/network/sender.c
index 933b8c192..f934dc509 100644
--- a/src/charon/network/sender.c
+++ b/src/charon/network/sender.c
@@ -28,6 +28,7 @@
#include <daemon.h>
#include <network/socket.h>
+#include <processing/jobs/callback_job.h>
typedef struct private_sender_t private_sender_t;
@@ -39,12 +40,12 @@ struct private_sender_t {
/**
* Public part of a sender_t object.
*/
- sender_t public;
+ sender_t public;
- /**
- * Assigned thread.
- */
- pthread_t assigned_thread;
+ /**
+ * Sender threads job.
+ */
+ callback_job_t *job;
/**
* The packets are stored in a linked list
@@ -82,37 +83,29 @@ static void send_(private_sender_t *this, packet_t *packet)
/**
* Implementation of private_sender_t.send_packets.
*/
-static void send_packets(private_sender_t * this)
+static job_requeue_t send_packets(private_sender_t * this)
{
- /* cancellation disabled by default */
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
- DBG1(DBG_NET, "sender thread running, thread_ID: %06u", (int)pthread_self());
+ packet_t *packet;
+ int oldstate;
- charon->drop_capabilities(charon, TRUE);
-
- while (TRUE)
+ pthread_mutex_lock(&this->mutex);
+ while (this->list->get_count(this->list) == 0)
{
- packet_t *packet;
- int oldstate;
-
- pthread_mutex_lock(&this->mutex);
- /* go to wait while no packets available */
- while (this->list->get_count(this->list) == 0)
- {
- /* add cleanup handler, wait for packet, remove cleanup handler */
- pthread_cleanup_push((void(*)(void*))pthread_mutex_unlock, (void*)&this->mutex);
- pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
- pthread_cond_wait(&this->condvar, &this->mutex);
-
- pthread_setcancelstate(oldstate, NULL);
- pthread_cleanup_pop(0);
- }
- this->list->remove_first(this->list, (void**)&packet);
- pthread_mutex_unlock(&this->mutex);
+ /* add cleanup handler, wait for packet, remove cleanup handler */
+ pthread_cleanup_push((void(*)(void*))pthread_mutex_unlock, (void*)&this->mutex);
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
+
+ pthread_cond_wait(&this->condvar, &this->mutex);
- charon->socket->send(charon->socket, packet);
- packet->destroy(packet);
+ pthread_setcancelstate(oldstate, NULL);
+ pthread_cleanup_pop(0);
}
+ this->list->remove_first(this->list, (void**)&packet);
+ pthread_mutex_unlock(&this->mutex);
+
+ charon->socket->send(charon->socket, packet);
+ packet->destroy(packet);
+ return JOB_REQUEUE_DIRECT;
}
/**
@@ -125,8 +118,7 @@ static void destroy(private_sender_t *this)
{
sched_yield();
}
- pthread_cancel(this->assigned_thread);
- pthread_join(this->assigned_thread, NULL);
+ this->job->cancel(this->job);
this->list->destroy(this->list);
free(this);
}
@@ -145,11 +137,10 @@ sender_t * sender_create()
pthread_mutex_init(&this->mutex, NULL);
pthread_cond_init(&this->condvar, NULL);
- if (pthread_create(&this->assigned_thread, NULL,
- (void*)send_packets, this) != 0)
- {
- charon->kill(charon, "unable to create sender thread");
- }
+ this->job = callback_job_create((callback_job_cb_t)send_packets,
+ this, NULL, NULL);
+ charon->processor->queue_job(charon->processor, (job_t*)this->job);
- return &(this->public);
+ return &this->public;
}
+
diff --git a/src/charon/processing/event_queue.c b/src/charon/processing/event_queue.c
deleted file mode 100644
index 40bcb1ed8..000000000
--- a/src/charon/processing/event_queue.c
+++ /dev/null
@@ -1,290 +0,0 @@
-/**
- * @file event_queue.c
- *
- * @brief Implementation of event_queue_t
- *
- */
-
-/*
- * Copyright (C) 2005-2006 Martin Willi
- * Copyright (C) 2005 Jan Hutter
- * Hochschule fuer Technik Rapperswil
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the
- * Free Software Foundation; either version 2 of the License, or (at your
- * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
- * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * for more details.
- */
-
-#include <pthread.h>
-#include <stdlib.h>
-
-#include "event_queue.h"
-
-#include <library.h>
-#include <utils/linked_list.h>
-
-
-
-typedef struct event_t event_t;
-
-/**
- * Event containing a job and a schedule time
- */
-struct event_t {
- /**
- * Time to fire the event.
- */
- timeval_t time;
-
- /**
- * Every event has its assigned job.
- */
- job_t * job;
-};
-
-/**
- * destroy an event and its job
- */
-static void event_destroy(event_t *event)
-{
- event->job->destroy(event->job);
- free(event);
-}
-
-typedef struct private_event_queue_t private_event_queue_t;
-
-/**
- * Private Variables and Functions of event_queue_t class.
- */
-struct private_event_queue_t {
- /**
- * Public part.
- */
- event_queue_t public;
-
- /**
- * The events are stored in a linked list of type linked_list_t.
- */
- linked_list_t *list;
-
- /**
- * Access to linked_list is locked through this mutex.
- */
- pthread_mutex_t mutex;
-
- /**
- * If the queue is empty or an event has not to be fired
- * a thread has to wait.
- *
- * This condvar is used to wake up such a thread.
- */
- pthread_cond_t condvar;
-};
-
-/**
- * Returns the difference of to timeval structs in milliseconds
- */
-static long time_difference(struct timeval *end_time, struct timeval *start_time)
-{
- time_t s;
- suseconds_t us;
-
- s = (end_time->tv_sec - start_time->tv_sec);
- us = (end_time->tv_usec - start_time->tv_usec);
- return ((s * 1000) + us/1000);
-}
-
-/**
- * Implements event_queue_t.get_count
- */
-static int get_count(private_event_queue_t *this)
-{
- int count;
- pthread_mutex_lock(&(this->mutex));
- count = this->list->get_count(this->list);
- pthread_mutex_unlock(&(this->mutex));
- return count;
-}
-
-/**
- * Implements event_queue_t.get
- */
-static job_t *get(private_event_queue_t *this)
-{
- timespec_t timeout;
- timeval_t current_time;
- event_t * next_event;
- job_t *job;
- int oldstate;
-
- pthread_mutex_lock(&(this->mutex));
-
- while (TRUE)
- {
- while(this->list->get_count(this->list) == 0)
- {
- /* add mutex unlock handler for cancellation, enable cancellation */
- pthread_cleanup_push((void(*)(void*))pthread_mutex_unlock, (void*)&(this->mutex));
- pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
-
- pthread_cond_wait( &(this->condvar), &(this->mutex));
-
- /* reset cancellation, remove mutex-unlock handler (without executing) */
- pthread_setcancelstate(oldstate, NULL);
- pthread_cleanup_pop(0);
- }
-
- this->list->get_first(this->list, (void **)&next_event);
-
- gettimeofday(&current_time, NULL);
- long difference = time_difference(&current_time,&(next_event->time));
- if (difference <= 0)
- {
- timeout.tv_sec = next_event->time.tv_sec;
- timeout.tv_nsec = next_event->time.tv_usec * 1000;
-
- /* add mutex unlock handler for cancellation, enable cancellation */
- pthread_cleanup_push((void(*)(void*))pthread_mutex_unlock, (void*)&(this->mutex));
- pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
-
- pthread_cond_timedwait(&(this->condvar), &(this->mutex), &timeout);
-
- /* reset cancellation, remove mutex-unlock handler (without executing) */
- pthread_setcancelstate(oldstate, NULL);
- pthread_cleanup_pop(0);
- }
- else
- {
- /* event available */
- this->list->remove_first(this->list, (void **)&next_event);
- job = next_event->job;
- free(next_event);
- break;
- }
- }
- pthread_cond_signal( &(this->condvar));
- pthread_mutex_unlock(&(this->mutex));
-
- return job;
-}
-
-/**
- * Implements function add_absolute of event_queue_t.
- * See #event_queue_s.add_absolute for description.
- */
-static void add_absolute(private_event_queue_t *this, job_t *job, timeval_t time)
-{
- event_t *event;
- event_t *current_event;
- iterator_t *iterator;
-
- /* create event */
- event = malloc_thing(event_t);
- event->time = time;
- event->job = job;
-
- pthread_mutex_lock(&(this->mutex));
-
- /* while just used to break out */
- while(TRUE)
- {
- if (this->list->get_count(this->list) == 0)
- {
- this->list->insert_first(this->list,event);
- break;
- }
-
- /* check last entry */
- this->list->get_last(this->list,(void **) &current_event);
-
- if (time_difference(&(event->time), &(current_event->time)) >= 0)
- {
- /* my event has to be fired after the last event in list */
- this->list->insert_last(this->list,event);
- break;
- }
-
- /* check first entry */
- this->list->get_first(this->list,(void **) &current_event);
-
- if (time_difference(&(event->time), &(current_event->time)) < 0)
- {
- /* my event has to be fired before the first event in list */
- this->list->insert_first(this->list,event);
- break;
- }
-
- iterator = this->list->create_iterator(this->list,TRUE);
- iterator->iterate(iterator, (void**)&current_event);
- /* first element has not to be checked (already done) */
- while(iterator->iterate(iterator, (void**)&current_event))
- {
- if (time_difference(&(event->time), &(current_event->time)) <= 0)
- {
- /* my event has to be fired before the current event in list */
- iterator->insert_before(iterator,event);
- break;
- }
- }
- iterator->destroy(iterator);
- break;
- }
-
- pthread_cond_signal( &(this->condvar));
- pthread_mutex_unlock(&(this->mutex));
-}
-
-/**
- * Implements event_queue_t.add_relative.
- */
-static void add_relative(event_queue_t *this, job_t *job, u_int32_t ms)
-{
- timeval_t current_time;
- timeval_t time;
-
- time_t s = ms / 1000;
- suseconds_t us = (ms - s * 1000) * 1000;
-
- gettimeofday(&current_time, NULL);
-
- time.tv_usec = (current_time.tv_usec + us) % 1000000;
- time.tv_sec = current_time.tv_sec + (current_time.tv_usec + us)/1000000 + s;
-
- this->add_absolute(this, job, time);
-}
-
-
-/**
- * Implements event_queue_t.destroy.
- */
-static void event_queue_destroy(private_event_queue_t *this)
-{
- this->list->destroy_function(this->list, (void*)event_destroy);
- free(this);
-}
-
-/*
- * Documented in header
- */
-event_queue_t *event_queue_create()
-{
- private_event_queue_t *this = malloc_thing(private_event_queue_t);
-
- this->public.get_count = (int (*) (event_queue_t *event_queue)) get_count;
- this->public.get = (job_t *(*) (event_queue_t *event_queue)) get;
- this->public.add_absolute = (void (*) (event_queue_t *event_queue, job_t *job, timeval_t time)) add_absolute;
- this->public.add_relative = (void (*) (event_queue_t *event_queue, job_t *job, u_int32_t ms)) add_relative;
- this->public.destroy = (void (*) (event_queue_t *event_queue)) event_queue_destroy;
-
- this->list = linked_list_create();
- pthread_mutex_init(&(this->mutex), NULL);
- pthread_cond_init(&(this->condvar), NULL);
-
- return (&this->public);
-}
diff --git a/src/charon/processing/event_queue.h b/src/charon/processing/event_queue.h
deleted file mode 100644
index c85286bf2..000000000
--- a/src/charon/processing/event_queue.h
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * @file event_queue.h
- *
- * @brief Interface of job_queue_t.
- *
- */
-
-/*
- * Copyright (C) 2005-2006 Martin Willi
- * Copyright (C) 2005 Jan Hutter
- * Hochschule fuer Technik Rapperswil
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the
- * Free Software Foundation; either version 2 of the License, or (at your
- * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
- * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * for more details.
- */
-
-#ifndef EVENT_QUEUE_H_
-#define EVENT_QUEUE_H_
-
-typedef struct event_queue_t event_queue_t;
-
-#include <sys/time.h>
-
-#include <library.h>
-#include <processing/jobs/job.h>
-
-/**
- * @brief Event-Queue used to store timed events.
- *
- * Added events are sorted. The get method blocks until
- * the time is elapsed to process the next event. The get
- * method is called from the scheduler_t thread, which
- * will add the jobs to to job_queue_t for further processing.
- *
- * Although the event-queue is based on a linked_list_t
- * all access functions are thread-save implemented.
- *
- * @b Constructors:
- * - event_queue_create()
- *
- * @ingroup processing
- */
-struct event_queue_t {
-
- /**
- * @brief Returns number of events in queue.
- *
- * @param event_queue calling object
- * @return number of events in queue
- */
- int (*get_count) (event_queue_t *event_queue);
-
- /**
- * @brief Get the next job from the event-queue.
- *
- * If no event is pending, this function blocks until a job can be returned.
- *
- * @param event_queue calling object
- * @param[out] job pointer to a job pointer where to job is returned to
- * @return next job
- */
- job_t *(*get) (event_queue_t *event_queue);
-
- /**
- * @brief Adds a event to the queue, using a relative time.
- *
- * This function is non blocking and adds a job_t at a specific time to the list.
- * The specific job object has to get destroyed by the thread which
- * removes the job.
- *
- * @param event_queue calling object
- * @param[in] job job to add to the queue (job is not copied)
- * @param[in] time relative time, when the event has to get fired
- */
- void (*add_relative) (event_queue_t *event_queue, job_t *job, u_int32_t ms);
-
- /**
- * @brief Adds a event to the queue, using an absolute time.
- *
- * This function is non blocking and adds a job_t at a specific time to the list.
- * The specific job object has to get destroyed by the thread which
- * removes the job.
- *
- * @param event_queue calling object
- * @param[in] job job to add to the queue (job is not copied)
- * @param[in] time absolute time, when the event has to get fired
- */
- void (*add_absolute) (event_queue_t *event_queue, job_t *job, timeval_t time);
-
- /**
- * @brief Destroys a event_queue object.
- *
- * @warning The caller of this function has to make sure
- * that no thread is going to add or get an event from the event_queue
- * after calling this function.
- *
- * @param event_queue calling object
- */
- void (*destroy) (event_queue_t *event_queue);
-};
-
-/**
- * @brief Creates an empty event_queue.
- *
- * @returns event_queue_t object
- *
- * @ingroup processing
- */
-event_queue_t *event_queue_create(void);
-
-#endif /*EVENT_QUEUE_H_*/
diff --git a/src/charon/processing/job_queue.c b/src/charon/processing/job_queue.c
deleted file mode 100644
index 2310ca6ff..000000000
--- a/src/charon/processing/job_queue.c
+++ /dev/null
@@ -1,139 +0,0 @@
-/**
- * @file job_queue.c
- *
- * @brief Implementation of job_queue_t
- *
- */
-
-/*
- * Copyright (C) 2005-2006 Martin Willi
- * Copyright (C) 2005 Jan Hutter
- * Hochschule fuer Technik Rapperswil
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the
- * Free Software Foundation; either version 2 of the License, or (at your
- * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
- * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * for more details.
- */
-
-#include <stdlib.h>
-#include <pthread.h>
-
-#include "job_queue.h"
-
-#include <utils/linked_list.h>
-
-
-typedef struct private_job_queue_t private_job_queue_t;
-
-/**
- * @brief Private Variables and Functions of job_queue class
- *
- */
-struct private_job_queue_t {
-
- /**
- * public members
- */
- job_queue_t public;
-
- /**
- * The jobs are stored in a linked list
- */
- linked_list_t *list;
-
- /**
- * access to linked_list is locked through this mutex
- */
- pthread_mutex_t mutex;
-
- /**
- * If the queue is empty a thread has to wait
- * This condvar is used to wake up such a thread
- */
- pthread_cond_t condvar;
-};
-
-
-/**
- * implements job_queue_t.get_count
- */
-static int get_count(private_job_queue_t *this)
-{
- int count;
- pthread_mutex_lock(&(this->mutex));
- count = this->list->get_count(this->list);
- pthread_mutex_unlock(&(this->mutex));
- return count;
-}
-
-/**
- * implements job_queue_t.get
- */
-static job_t *get(private_job_queue_t *this)
-{
- int oldstate;
- job_t *job;
- pthread_mutex_lock(&(this->mutex));
- /* go to wait while no jobs available */
- while(this->list->get_count(this->list) == 0)
- {
- /* add mutex unlock handler for cancellation, enable cancellation */
- pthread_cleanup_push((void(*)(void*))pthread_mutex_unlock, (void*)&(this->mutex));
- pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
-
- pthread_cond_wait( &(this->condvar), &(this->mutex));
-
- /* reset cancellation, remove mutex-unlock handler (without executing) */
- pthread_setcancelstate(oldstate, NULL);
- pthread_cleanup_pop(0);
- }
- this->list->remove_first(this->list, (void **)&job);
- pthread_mutex_unlock(&(this->mutex));
- return job;
-}
-
-/**
- * implements function job_queue_t.add
- */
-static void add(private_job_queue_t *this, job_t *job)
-{
- pthread_mutex_lock(&(this->mutex));
- this->list->insert_last(this->list,job);
- pthread_cond_signal( &(this->condvar));
- pthread_mutex_unlock(&(this->mutex));
-}
-
-/**
- * implements job_queue_t.destroy
- */
-static void job_queue_destroy (private_job_queue_t *this)
-{
- this->list->destroy_offset(this->list, offsetof(job_t, destroy));
- free(this);
-}
-
-/*
- *
- * Documented in header
- */
-job_queue_t *job_queue_create(void)
-{
- private_job_queue_t *this = malloc_thing(private_job_queue_t);
-
- this->public.get_count = (int(*)(job_queue_t*))get_count;
- this->public.get = (job_t*(*)(job_queue_t*))get;
- this->public.add = (void(*)(job_queue_t*, job_t*))add;
- this->public.destroy = (void(*)(job_queue_t*))job_queue_destroy;
-
- this->list = linked_list_create();
- pthread_mutex_init(&(this->mutex), NULL);
- pthread_cond_init(&(this->condvar), NULL);
-
- return (&this->public);
-}
diff --git a/src/charon/processing/job_queue.h b/src/charon/processing/job_queue.h
deleted file mode 100644
index 9b58588ae..000000000
--- a/src/charon/processing/job_queue.h
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * @file job_queue.h
- *
- * @brief Interface of job_queue_t.
- *
- */
-
-/*
- * Copyright (C) 2005-2006 Martin Willi
- * Copyright (C) 2005 Jan Hutter
- * Hochschule fuer Technik Rapperswil
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the
- * Free Software Foundation; either version 2 of the License, or (at your
- * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
- * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * for more details.
- */
-
-#ifndef JOB_QUEUE_H_
-#define JOB_QUEUE_H_
-
-typedef struct job_queue_t job_queue_t;
-
-#include <library.h>
-#include <processing/jobs/job.h>
-
-/**
- * @brief The job queue stores jobs, which will be processed by the thread_pool_t.
- *
- * Jobs are added from various sources, from the threads and
- * from the event_queue_t.
- * Although the job-queue is based on a linked_list_t
- * all access functions are thread-save implemented.
- *
- * @b Constructors:
- * - job_queue_create()
- *
- * @ingroup processing
- */
-struct job_queue_t {
-
- /**
- * @brief Returns number of jobs in queue.
- *
- * @param job_queue_t calling object
- * @returns number of items in queue
- */
- int (*get_count) (job_queue_t *job_queue);
-
- /**
- * @brief Get the next job from the queue.
- *
- * If the queue is empty, this function blocks until a job can be returned.
- * After using, the returned job has to get destroyed by the caller.
- *
- * @param job_queue_t calling object
- * @param[out] job pointer to a job pointer where to job is returned to
- * @return next job
- */
- job_t *(*get) (job_queue_t *job_queue);
-
- /**
- * @brief Adds a job to the queue.
- *
- * This function is non blocking and adds a job_t to the list.
- * The specific job object has to get destroyed by the thread which
- * removes the job.
- *
- * @param job_queue_t calling object
- * @param job job to add to the queue (job is not copied)
- */
- void (*add) (job_queue_t *job_queue, job_t *job);
-
- /**
- * @brief Destroys a job_queue object.
- *
- * @warning The caller of this function has to make sure
- * that no thread is going to add or get a job from the job_queue
- * after calling this function.
- *
- * @param job_queue_t calling object
- */
- void (*destroy) (job_queue_t *job_queue);
-};
-
-/**
- * @brief Creates an empty job_queue.
- *
- * @return job_queue_t object
- *
- * @ingroup processing
- */
-job_queue_t *job_queue_create(void);
-
-#endif /*JOB_QUEUE_H_*/
diff --git a/src/charon/processing/jobs/acquire_job.c b/src/charon/processing/jobs/acquire_job.c
index b4ffb258d..48a77f558 100644
--- a/src/charon/processing/jobs/acquire_job.c
+++ b/src/charon/processing/jobs/acquire_job.c
@@ -43,17 +43,17 @@ struct private_acquire_job_t {
};
/**
- * Implementation of job_t.get_type.
+ * Implementation of job_t.destroy.
*/
-static job_type_t get_type(private_acquire_job_t *this)
+static void destroy(private_acquire_job_t *this)
{
- return ACQUIRE;
+ free(this);
}
/**
* Implementation of job_t.execute.
*/
-static status_t execute(private_acquire_job_t *this)
+static void execute(private_acquire_job_t *this)
{
ike_sa_t *ike_sa;
@@ -63,20 +63,14 @@ static status_t execute(private_acquire_job_t *this)
{
DBG2(DBG_JOB, "CHILD_SA with reqid %d not found for acquiring",
this->reqid);
- return DESTROY_ME;
}
- ike_sa->acquire(ike_sa, this->reqid);
-
- charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
- return DESTROY_ME;
-}
-
-/**
- * Implementation of job_t.destroy.
- */
-static void destroy(private_acquire_job_t *this)
-{
- free(this);
+ else
+ {
+ ike_sa->acquire(ike_sa, this->reqid);
+
+ charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
+ }
+ destroy(this);
}
/*
@@ -87,12 +81,11 @@ acquire_job_t *acquire_job_create(u_int32_t reqid)
private_acquire_job_t *this = malloc_thing(private_acquire_job_t);
/* interface functions */
- this->public.job_interface.get_type = (job_type_t (*) (job_t *)) get_type;
- this->public.job_interface.execute = (status_t (*) (job_t *)) execute;
+ this->public.job_interface.execute = (void (*) (job_t *)) execute;
this->public.job_interface.destroy = (void (*)(job_t*)) destroy;
/* private variables */
this->reqid = reqid;
- return &(this->public);
+ return &this->public;
}
diff --git a/src/charon/processing/jobs/callback_job.c b/src/charon/processing/jobs/callback_job.c
new file mode 100644
index 000000000..86aa93c6c
--- /dev/null
+++ b/src/charon/processing/jobs/callback_job.c
@@ -0,0 +1,213 @@
+/**
+ * @file callback_job.c
+ *
+ * @brief Implementation of callback_job_t.
+ *
+ */
+
+/*
+ * Copyright (C) 2007 Martin Willi
+ * Hochschule fuer Technik Rapperswil
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * for more details.
+ */
+
+#include "callback_job.h"
+
+#include <daemon.h>
+
+typedef struct private_callback_job_t private_callback_job_t;
+
+/**
+ * Private data of an callback_job_t Object.
+ */
+struct private_callback_job_t {
+ /**
+ * Public callback_job_t interface.
+ */
+ callback_job_t public;
+
+ /**
+ * Callback to call on execution
+ */
+ callback_job_cb_t callback;
+
+ /**
+ * parameter to supply to callback
+ */
+ void *data;
+
+ /**
+ * cleanup function for data
+ */
+ callback_job_cleanup_t cleanup;
+
+ /**
+ * thread ID of the job, if running
+ */
+ pthread_t thread;
+
+ /**
+ * mutex to synchronize thread startup/cancellation
+ */
+ pthread_mutex_t mutex;
+
+ /**
+ * condvar to synchronize thread startup/cancellation
+ */
+ pthread_cond_t condvar;
+
+ /**
+ * list of asociated child jobs
+ */
+ linked_list_t *children;
+
+ /**
+ * parent of this job, or NULL
+ */
+ private_callback_job_t *parent;
+};
+
+/**
+ * Implements job_t.destroy.
+ */
+static void destroy(private_callback_job_t *this)
+{
+ if (this->cleanup)
+ {
+ this->cleanup(this->data);
+ }
+ this->children->destroy(this->children);
+ free(this);
+}
+
+/**
+ * unregister a child from its parent, if any.
+ */
+static void unregister(private_callback_job_t *this)
+{
+ if (this->parent)
+ {
+ iterator_t *iterator;
+ private_callback_job_t *child;
+
+ pthread_mutex_lock(&this->parent->mutex);
+ iterator = this->parent->children->create_iterator(this->parent->children, TRUE);
+ while (iterator->iterate(iterator, (void**)&child))
+ {
+ if (child == this)
+ {
+ iterator->remove(iterator);
+ break;
+ }
+ }
+ iterator->destroy(iterator);
+ pthread_mutex_unlock(&this->parent->mutex);
+ }
+}
+
+/**
+ * Implementation of callback_job_t.cancel.
+ */
+static void cancel(private_callback_job_t *this)
+{
+ pthread_t thread;
+
+ /* wait until thread has started */
+ pthread_mutex_lock(&this->mutex);
+ while (this->thread == 0)
+ {
+ pthread_cond_wait(&this->condvar, &this->mutex);
+ }
+ thread = this->thread;
+
+ /* terminate its children */
+ this->children->invoke(this->children, offsetof(callback_job_t, cancel));
+ pthread_mutex_unlock(&this->mutex);
+
+ /* terminate thread */
+ pthread_cancel(thread);
+ pthread_join(thread, NULL);
+}
+
+/**
+ * Implementation of job_t.execute.
+ */
+static void execute(private_callback_job_t *this)
+{
+ bool cleanup = FALSE;
+
+ pthread_mutex_lock(&this->mutex);
+ this->thread = pthread_self();
+ pthread_cond_signal(&this->condvar);
+ pthread_mutex_unlock(&this->mutex);
+
+ pthread_cleanup_push((void*)destroy, this);
+ while (TRUE)
+ {
+ switch (this->callback(this->data))
+ {
+ case JOB_REQUEUE_DIRECT:
+ continue;
+ case JOB_REQUEUE_FAIR:
+ {
+ charon->processor->queue_job(charon->processor,
+ &this->public.job_interface);
+ break;
+ }
+ case JOB_REQUEUE_NONE:
+ default:
+ {
+ cleanup = TRUE;
+ break;
+ }
+ }
+ break;
+ }
+ unregister(this);
+ pthread_cleanup_pop(cleanup);
+}
+
+/*
+ * Described in header.
+ */
+callback_job_t *callback_job_create(callback_job_cb_t cb, void *data,
+ callback_job_cleanup_t cleanup,
+ callback_job_t *parent)
+{
+ private_callback_job_t *this = malloc_thing(private_callback_job_t);
+
+ /* interface functions */
+ this->public.job_interface.execute = (void (*) (job_t *)) execute;
+ this->public.job_interface.destroy = (void (*) (job_t *)) destroy;
+ this->public.cancel = (void(*)(callback_job_t*))cancel;
+
+ /* private variables */
+ pthread_mutex_init(&this->mutex, NULL);
+ pthread_cond_init(&this->condvar, NULL);
+ this->callback = cb;
+ this->data = data;
+ this->cleanup = cleanup;
+ this->thread = 0;
+ this->children = linked_list_create();
+ this->parent = (private_callback_job_t*)parent;
+
+ /* register us at parent */
+ if (parent)
+ {
+ pthread_mutex_lock(&this->parent->mutex);
+ this->parent->children->insert_last(this->parent->children, this);
+ pthread_mutex_unlock(&this->parent->mutex);
+ }
+
+ return &this->public;
+}
+
diff --git a/src/charon/processing/jobs/callback_job.h b/src/charon/processing/jobs/callback_job.h
new file mode 100644
index 000000000..5450cb61b
--- /dev/null
+++ b/src/charon/processing/jobs/callback_job.h
@@ -0,0 +1,126 @@
+/**
+ * @file callback_job.h
+ *
+ * @brief Interface of callback_job_t.
+ *
+ */
+
+/*
+ * Copyright (C) 2007 Martin Willi
+ * Hochschule fuer Technik Rapperswil
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * for more details.
+ */
+
+#ifndef CALLBACK_JOB_H_
+#define CALLBACK_JOB_H_
+
+typedef struct callback_job_t callback_job_t;
+
+#include <library.h>
+#include <processing/jobs/job.h>
+
+
+typedef enum job_requeue_t job_requeue_t;
+
+/**
+ * @brief Job requeueing policy
+ */
+enum job_requeue_t {
+
+ /**
+ * Do not requeue job, destroy it
+ */
+ JOB_REQUEUE_NONE,
+
+ /**
+ * Reque the job farly, meaning it has to queue as any other job
+ */
+ JOB_REQUEUE_FAIR,
+
+ /**
+ * Reexecute the job directly, without the need of requeing it
+ */
+ JOB_REQUEUE_DIRECT,
+};
+
+/**
+ * @brief The callback function to use for the callback job.
+ *
+ * This is the function to use as callback for a callback job. It receives
+ * a parameter supplied to the callback jobs constructor.
+ *
+ * @param data param supplied to job
+ * @return requeing policy how to requeue the job
+ */
+typedef job_requeue_t (*callback_job_cb_t)(void *data);
+
+/**
+ * @brief Cleanup function to use for data cleanup.
+ *
+ * The callback has an optional user argument which receives data. However,
+ * this data may be cleaned up if it is allocated. This is the function
+ * to supply to the constructor.
+ *
+ * @param data param supplied to job
+ * @return requeing policy how to requeue the job
+ */
+typedef void (*callback_job_cleanup_t)(void *data);
+
+/**
+ * @brief Class representing an callback Job.
+ *
+ * This is a special job which allows a simple callback function to
+ * be executed by a thread of the thread pool. This allows simple execution
+ * of asynchronous methods, without to manage threads.
+ *
+ * @b Constructors:
+ * - callback_job_create()
+ *
+ * @ingroup jobs
+ */
+struct callback_job_t {
+ /**
+ * The job_t interface.
+ */
+ job_t job_interface;
+
+ /**
+ * @brief Cancel the jobs thread and wait for its termination.
+ *
+ * @param this calling object
+ */
+ void (*cancel)(callback_job_t *this);
+};
+
+/**
+ * @brief Creates a callback job.
+ *
+ * The cleanup function is called when the job gets destroyed to destroy
+ * the associated data.
+ * If parent is not NULL, the specified job gets an association. Whenever
+ * the parent gets cancelled (or runs out), all of its children are cancelled,
+ * too.
+ *
+ * @param cb callback to call from the processor
+ * @param data user data to supply to callback
+ * @param cleanup destructor for data on destruction, or NULL
+ * @param parent parent of this job
+ * @return callback_job_t object
+ *
+ * @ingroup jobs
+ */
+callback_job_t *callback_job_create(callback_job_cb_t cb, void *data,
+ callback_job_cleanup_t cleanup,
+ callback_job_t *parent);
+
+#endif /* CALLBACK_JOB_H_ */
+
diff --git a/src/charon/processing/jobs/delete_child_sa_job.c b/src/charon/processing/jobs/delete_child_sa_job.c
index f694696b0..23f330293 100644
--- a/src/charon/processing/jobs/delete_child_sa_job.c
+++ b/src/charon/processing/jobs/delete_child_sa_job.c
@@ -54,17 +54,17 @@ struct private_delete_child_sa_job_t {
};
/**
- * Implementation of job_t.get_type.
+ * Implementation of job_t.destroy.
*/
-static job_type_t get_type(private_delete_child_sa_job_t *this)
+static void destroy(private_delete_child_sa_job_t *this)
{
- return DELETE_CHILD_SA;
+ free(this);
}
/**
* Implementation of job_t.execute.
*/
-static status_t execute(private_delete_child_sa_job_t *this)
+static void execute(private_delete_child_sa_job_t *this)
{
ike_sa_t *ike_sa;
@@ -74,20 +74,14 @@ static status_t execute(private_delete_child_sa_job_t *this)
{
DBG1(DBG_JOB, "CHILD_SA with reqid %d not found for delete",
this->reqid);
- return DESTROY_ME;
}
- ike_sa->delete_child_sa(ike_sa, this->protocol, this->spi);
-
- charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
- return DESTROY_ME;
-}
-
-/**
- * Implementation of job_t.destroy.
- */
-static void destroy(private_delete_child_sa_job_t *this)
-{
- free(this);
+ else
+ {
+ ike_sa->delete_child_sa(ike_sa, this->protocol, this->spi);
+
+ charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
+ }
+ destroy(this);
}
/*
@@ -100,8 +94,7 @@ delete_child_sa_job_t *delete_child_sa_job_create(u_int32_t reqid,
private_delete_child_sa_job_t *this = malloc_thing(private_delete_child_sa_job_t);
/* interface functions */
- this->public.job_interface.get_type = (job_type_t (*) (job_t *)) get_type;
- this->public.job_interface.execute = (status_t (*) (job_t *)) execute;
+ this->public.job_interface.execute = (void (*) (job_t *)) execute;
this->public.job_interface.destroy = (void (*)(job_t*)) destroy;
/* private variables */
@@ -109,5 +102,6 @@ delete_child_sa_job_t *delete_child_sa_job_create(u_int32_t reqid,
this->protocol = protocol;
this->spi = spi;
- return &(this->public);
+ return &this->public;
}
+
diff --git a/src/charon/processing/jobs/delete_ike_sa_job.c b/src/charon/processing/jobs/delete_ike_sa_job.c
index 706155aa6..8d8c0cf36 100644
--- a/src/charon/processing/jobs/delete_ike_sa_job.c
+++ b/src/charon/processing/jobs/delete_ike_sa_job.c
@@ -47,18 +47,20 @@ struct private_delete_ike_sa_job_t {
bool delete_if_established;
};
+
/**
- * Implements job_t.get_type.
+ * Implements job_t.destroy.
*/
-static job_type_t get_type(private_delete_ike_sa_job_t *this)
+static void destroy(private_delete_ike_sa_job_t *this)
{
- return DELETE_IKE_SA;
+ this->ike_sa_id->destroy(this->ike_sa_id);
+ free(this);
}
/**
* Implementation of job_t.execute.
*/
-static status_t execute(private_delete_ike_sa_job_t *this)
+static void execute(private_delete_ike_sa_job_t *this)
{
ike_sa_t *ike_sa;
@@ -93,16 +95,7 @@ static status_t execute(private_delete_ike_sa_job_t *this)
}
}
}
- return DESTROY_ME;
-}
-
-/**
- * Implements job_t.destroy.
- */
-static void destroy(private_delete_ike_sa_job_t *this)
-{
- this->ike_sa_id->destroy(this->ike_sa_id);
- free(this);
+ destroy(this);
}
/*
@@ -114,8 +107,7 @@ delete_ike_sa_job_t *delete_ike_sa_job_create(ike_sa_id_t *ike_sa_id,
private_delete_ike_sa_job_t *this = malloc_thing(private_delete_ike_sa_job_t);
/* interface functions */
- this->public.job_interface.get_type = (job_type_t (*) (job_t *)) get_type;
- this->public.job_interface.execute = (status_t (*) (job_t *)) execute;
+ this->public.job_interface.execute = (void (*) (job_t *)) execute;
this->public.job_interface.destroy = (void (*)(job_t *)) destroy;;
/* private variables */
diff --git a/src/charon/processing/jobs/job.c b/src/charon/processing/jobs/job.c
deleted file mode 100644
index d32d1bc61..000000000
--- a/src/charon/processing/jobs/job.c
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * @file job.c
- *
- * @brief Interface additions to job_t.
- *
- */
-
-/*
- * Copyright (C) 2005-2006 Martin Willi
- * Copyright (C) 2005 Jan Hutter
- * Hochschule fuer Technik Rapperswil
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the
- * Free Software Foundation; either version 2 of the License, or (at your
- * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
- * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * for more details.
- */
-
-
-#include "job.h"
-
-ENUM(job_type_names, PROCESS_MESSAGE, SEND_DPD,
- "PROCESS_MESSAGE",
- "RETRANSMIT",
- "INITIATE",
- "ROUTE",
- "ACQUIRE",
- "DELETE_IKE_SA",
- "DELETE_CHILD_SA",
- "REKEY_CHILD_SA",
- "REKEY_IKE_SA",
- "SEND_KEEPALIVE",
- "SEND_DPD",
-);
diff --git a/src/charon/processing/jobs/job.h b/src/charon/processing/jobs/job.h
index 28632672d..1826c53b4 100644
--- a/src/charon/processing/jobs/job.h
+++ b/src/charon/processing/jobs/job.h
@@ -24,108 +24,14 @@
#ifndef JOB_H_
#define JOB_H_
-typedef enum job_type_t job_type_t;
typedef struct job_t job_t;
#include <library.h>
-/**
- * @brief Definition of the various job types.
- *
- * @ingroup jobs
- */
-enum job_type_t {
- /**
- * Process an incoming IKEv2-Message.
- *
- * Job is implemented in class process_message_job_t
- */
- PROCESS_MESSAGE,
-
- /**
- * Retransmit an IKEv2-Message.
- *
- * Job is implemented in class retransmit_job_t
- */
- RETRANSMIT,
-
- /**
- * Set up a CHILD_SA, optional with an IKE_SA.
- *
- * Job is implemented in class initiate_job_t
- */
- INITIATE,
-
- /**
- * Install SPD entries.
- *
- * Job is implemented in class route_job_t
- */
- ROUTE,
-
- /**
- * React on a acquire message from the kernel (e.g. setup CHILD_SA)
- *
- * Job is implemented in class acquire_job_t
- */
- ACQUIRE,
-
- /**
- * Delete an IKE_SA.
- *
- * Job is implemented in class delete_ike_sa_job_t
- */
- DELETE_IKE_SA,
-
- /**
- * Delete a CHILD_SA.
- *
- * Job is implemented in class delete_child_sa_job_t
- */
- DELETE_CHILD_SA,
-
- /**
- * Rekey a CHILD_SA.
- *
- * Job is implemented in class rekey_child_sa_job_t
- */
- REKEY_CHILD_SA,
-
- /**
- * Rekey an IKE_SA.
- *
- * Job is implemented in class rekey_ike_sa_job_t
- */
- REKEY_IKE_SA,
-
- /**
- * Send a keepalive packet.
- *
- * Job is implemented in class type send_keepalive_job_t
- */
- SEND_KEEPALIVE,
-
- /**
- * Send a DPD packet.
- *
- * Job is implemented in class type send_dpd_job_t
- */
- SEND_DPD
-};
-
-/**
- * enum name for job_type_t
- *
- * @ingroup jobs
- */
-extern enum_name_t *job_type_names;
-
/**
* @brief Job-Interface as it is stored in the job queue.
*
- * A job consists of a job-type and one or more assigned values.
- *
* @b Constructors:
* - None, use specific implementation of the interface.
*
@@ -134,32 +40,26 @@ extern enum_name_t *job_type_names;
struct job_t {
/**
- * @brief get type of job.
- *
- * @param this calling object
- * @return type of this job
- */
- job_type_t (*get_type) (job_t *this);
-
- /**
* @brief Execute a job.
*
- * Call the internall job routine to process the
- * job. If this method returns DESTROY_ME, the job
- * must be destroyed by the caller.
+ * The processing facility executes a job using this method. Jobs are
+ * one-shot, they destroy themself after execution, so don't use a job
+ * once it has been executed.
*
* @param this calling object
- * @return status of job execution
*/
- status_t (*execute) (job_t *this);
+ void (*execute) (job_t *this);
/**
- * @brief Destroys a job_t object
+ * @brief Destroy a job.
+ *
+ * Is only called whenever a job was not executed (e.g. due daemon shutdown).
+ * After execution, jobs destroy themself.
*
* @param job_t calling object
*/
void (*destroy) (job_t *job);
};
-
#endif /* JOB_H_ */
+
diff --git a/src/charon/processing/jobs/process_message_job.c b/src/charon/processing/jobs/process_message_job.c
index ee7484bbd..6a0921248 100644
--- a/src/charon/processing/jobs/process_message_job.c
+++ b/src/charon/processing/jobs/process_message_job.c
@@ -44,17 +44,18 @@ struct private_process_message_job_t {
};
/**
- * Implements job_t.get_type.
+ * Implements job_t.destroy.
*/
-static job_type_t get_type(private_process_message_job_t *this)
+static void destroy(private_process_message_job_t *this)
{
- return PROCESS_MESSAGE;
+ this->message->destroy(this->message);
+ free(this);
}
/**
* Implementation of job_t.execute.
*/
-static status_t execute(private_process_message_job_t *this)
+static void execute(private_process_message_job_t *this)
{
ike_sa_t *ike_sa;
@@ -75,16 +76,7 @@ static status_t execute(private_process_message_job_t *this)
charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
}
}
- return DESTROY_ME;
-}
-
-/**
- * Implements job_t.destroy.
- */
-static void destroy(private_process_message_job_t *this)
-{
- this->message->destroy(this->message);
- free(this);
+ destroy(this);
}
/*
@@ -95,8 +87,7 @@ process_message_job_t *process_message_job_create(message_t *message)
private_process_message_job_t *this = malloc_thing(private_process_message_job_t);
/* interface functions */
- this->public.job_interface.get_type = (job_type_t (*) (job_t *)) get_type;
- this->public.job_interface.execute = (status_t (*) (job_t *)) execute;
+ this->public.job_interface.execute = (void (*) (job_t *)) execute;
this->public.job_interface.destroy = (void(*)(job_t*))destroy;
/* private variables */
diff --git a/src/charon/processing/jobs/rekey_child_sa_job.c b/src/charon/processing/jobs/rekey_child_sa_job.c
index 3422b614d..f754e5a1f 100644
--- a/src/charon/processing/jobs/rekey_child_sa_job.c
+++ b/src/charon/processing/jobs/rekey_child_sa_job.c
@@ -53,17 +53,17 @@ struct private_rekey_child_sa_job_t {
};
/**
- * Implementation of job_t.get_type.
+ * Implementation of job_t.destroy.
*/
-static job_type_t get_type(private_rekey_child_sa_job_t *this)
+static void destroy(private_rekey_child_sa_job_t *this)
{
- return REKEY_CHILD_SA;
+ free(this);
}
/**
* Implementation of job_t.execute.
*/
-static status_t execute(private_rekey_child_sa_job_t *this)
+static void execute(private_rekey_child_sa_job_t *this)
{
ike_sa_t *ike_sa;
@@ -73,20 +73,13 @@ static status_t execute(private_rekey_child_sa_job_t *this)
{
DBG2(DBG_JOB, "CHILD_SA with reqid %d not found for rekeying",
this->reqid);
- return DESTROY_ME;
}
- ike_sa->rekey_child_sa(ike_sa, this->protocol, this->spi);
-
- charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
- return DESTROY_ME;
-}
-
-/**
- * Implementation of job_t.destroy.
- */
-static void destroy(private_rekey_child_sa_job_t *this)
-{
- free(this);
+ else
+ {
+ ike_sa->rekey_child_sa(ike_sa, this->protocol, this->spi);
+ charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
+ }
+ destroy(this);
}
/*
@@ -99,8 +92,7 @@ rekey_child_sa_job_t *rekey_child_sa_job_create(u_int32_t reqid,
private_rekey_child_sa_job_t *this = malloc_thing(private_rekey_child_sa_job_t);
/* interface functions */
- this->public.job_interface.get_type = (job_type_t (*) (job_t *)) get_type;
- this->public.job_interface.execute = (status_t (*) (job_t *)) execute;
+ this->public.job_interface.execute = (void (*) (job_t *)) execute;
this->public.job_interface.destroy = (void (*)(job_t*)) destroy;
/* private variables */
@@ -108,5 +100,5 @@ rekey_child_sa_job_t *rekey_child_sa_job_create(u_int32_t reqid,
this->protocol = protocol;
this->spi = spi;
- return &(this->public);
+ return &this->public;
}
diff --git a/src/charon/processing/jobs/rekey_ike_sa_job.c b/src/charon/processing/jobs/rekey_ike_sa_job.c
index f6c058634..25785221d 100644
--- a/src/charon/processing/jobs/rekey_ike_sa_job.c
+++ b/src/charon/processing/jobs/rekey_ike_sa_job.c
@@ -48,17 +48,18 @@ struct private_rekey_ike_sa_job_t {
};
/**
- * Implementation of job_t.get_type.
+ * Implementation of job_t.destroy.
*/
-static job_type_t get_type(private_rekey_ike_sa_job_t *this)
+static void destroy(private_rekey_ike_sa_job_t *this)
{
- return REKEY_IKE_SA;
+ this->ike_sa_id->destroy(this->ike_sa_id);
+ free(this);
}
/**
* Implementation of job_t.execute.
*/
-static status_t execute(private_rekey_ike_sa_job_t *this)
+static void execute(private_rekey_ike_sa_job_t *this)
{
ike_sa_t *ike_sa;
status_t status = SUCCESS;
@@ -68,36 +69,28 @@ static status_t execute(private_rekey_ike_sa_job_t *this)
if (ike_sa == NULL)
{
DBG2(DBG_JOB, "IKE_SA to rekey not found");
- return DESTROY_ME;
- }
-
- if (this->reauth)
- {
- ike_sa->reestablish(ike_sa);
}
else
{
- status = ike_sa->rekey(ike_sa);
- }
-
- if (status == DESTROY_ME)
- {
- charon->ike_sa_manager->checkin_and_destroy(charon->ike_sa_manager, ike_sa);
- }
- else
- {
- charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
+ if (this->reauth)
+ {
+ ike_sa->reestablish(ike_sa);
+ }
+ else
+ {
+ status = ike_sa->rekey(ike_sa);
+ }
+
+ if (status == DESTROY_ME)
+ {
+ charon->ike_sa_manager->checkin_and_destroy(charon->ike_sa_manager, ike_sa);
+ }
+ else
+ {
+ charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
+ }
}
- return DESTROY_ME;
-}
-
-/**
- * Implementation of job_t.destroy.
- */
-static void destroy(private_rekey_ike_sa_job_t *this)
-{
- this->ike_sa_id->destroy(this->ike_sa_id);
- free(this);
+ destroy(this);
}
/*
@@ -108,8 +101,7 @@ rekey_ike_sa_job_t *rekey_ike_sa_job_create(ike_sa_id_t *ike_sa_id, bool reauth)
private_rekey_ike_sa_job_t *this = malloc_thing(private_rekey_ike_sa_job_t);
/* interface functions */
- this->public.job_interface.get_type = (job_type_t (*) (job_t *)) get_type;
- this->public.job_interface.execute = (status_t (*) (job_t *)) execute;
+ this->public.job_interface.execute = (void (*) (job_t *)) execute;
this->public.job_interface.destroy = (void (*)(job_t*)) destroy;
/* private variables */
diff --git a/src/charon/processing/jobs/retransmit_job.c b/src/charon/processing/jobs/retransmit_job.c
index 5bfa20dfd..8c15aa651 100644
--- a/src/charon/processing/jobs/retransmit_job.c
+++ b/src/charon/processing/jobs/retransmit_job.c
@@ -48,17 +48,18 @@ struct private_retransmit_job_t {
};
/**
- * Implements job_t.get_type.
+ * Implements job_t.destroy.
*/
-static job_type_t get_type(private_retransmit_job_t *this)
+static void destroy(private_retransmit_job_t *this)
{
- return RETRANSMIT;
+ this->ike_sa_id->destroy(this->ike_sa_id);
+ free(this);
}
/**
* Implementation of job_t.execute.
*/
-static status_t execute(private_retransmit_job_t *this)
+static void execute(private_retransmit_job_t *this)
{
ike_sa_t *ike_sa;
@@ -77,16 +78,7 @@ static status_t execute(private_retransmit_job_t *this)
charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
}
}
- return DESTROY_ME;
-}
-
-/**
- * Implements job_t.destroy.
- */
-static void destroy(private_retransmit_job_t *this)
-{
- this->ike_sa_id->destroy(this->ike_sa_id);
- free(this);
+ destroy(this);
}
/*
@@ -97,8 +89,7 @@ retransmit_job_t *retransmit_job_create(u_int32_t message_id,ike_sa_id_t *ike_sa
private_retransmit_job_t *this = malloc_thing(private_retransmit_job_t);
/* interface functions */
- this->public.job_interface.get_type = (job_type_t (*) (job_t *)) get_type;
- this->public.job_interface.execute = (status_t (*) (job_t *)) execute;
+ this->public.job_interface.execute = (void (*) (job_t *)) execute;
this->public.job_interface.destroy = (void (*) (job_t *)) destroy;
/* private variables */
diff --git a/src/charon/processing/jobs/send_dpd_job.c b/src/charon/processing/jobs/send_dpd_job.c
index 7294d78d5..cb259d9c6 100644
--- a/src/charon/processing/jobs/send_dpd_job.c
+++ b/src/charon/processing/jobs/send_dpd_job.c
@@ -47,45 +47,35 @@ struct private_send_dpd_job_t {
};
/**
- * Implements send_dpd_job_t.get_type.
+ * Implements job_t.destroy.
*/
-static job_type_t get_type(private_send_dpd_job_t *this)
+static void destroy(private_send_dpd_job_t *this)
{
- return SEND_DPD;
+ this->ike_sa_id->destroy(this->ike_sa_id);
+ free(this);
}
/**
* Implementation of job_t.execute.
*/
-static status_t execute(private_send_dpd_job_t *this)
+static void execute(private_send_dpd_job_t *this)
{
ike_sa_t *ike_sa;
ike_sa = charon->ike_sa_manager->checkout(charon->ike_sa_manager,
this->ike_sa_id);
- if (ike_sa == NULL)
- {
- return DESTROY_ME;
- }
-
- if (ike_sa->send_dpd(ike_sa) == DESTROY_ME)
- {
- charon->ike_sa_manager->checkin_and_destroy(charon->ike_sa_manager, ike_sa);
- }
- else
+ if (ike_sa)
{
- charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
+ if (ike_sa->send_dpd(ike_sa) == DESTROY_ME)
+ {
+ charon->ike_sa_manager->checkin_and_destroy(charon->ike_sa_manager, ike_sa);
+ }
+ else
+ {
+ charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
+ }
}
- return DESTROY_ME;
-}
-
-/**
- * Implements job_t.destroy.
- */
-static void destroy(private_send_dpd_job_t *this)
-{
- this->ike_sa_id->destroy(this->ike_sa_id);
- free(this);
+ destroy(this);
}
/*
@@ -96,9 +86,8 @@ send_dpd_job_t *send_dpd_job_create(ike_sa_id_t *ike_sa_id)
private_send_dpd_job_t *this = malloc_thing(private_send_dpd_job_t);
/* interface functions */
- this->public.job_interface.get_type = (job_type_t (*) (job_t *)) get_type;
this->public.job_interface.destroy = (void (*) (job_t *)) destroy;
- this->public.job_interface.execute = (status_t (*) (job_t *)) execute;
+ this->public.job_interface.execute = (void (*) (job_t *)) execute;
/* public functions */
this->public.destroy = (void (*)(send_dpd_job_t *)) destroy;
@@ -106,5 +95,5 @@ send_dpd_job_t *send_dpd_job_create(ike_sa_id_t *ike_sa_id)
/* private variables */
this->ike_sa_id = ike_sa_id->clone(ike_sa_id);
- return &(this->public);
+ return &this->public;
}
diff --git a/src/charon/processing/jobs/send_keepalive_job.c b/src/charon/processing/jobs/send_keepalive_job.c
index 1c1cb288e..6d529e1b3 100644
--- a/src/charon/processing/jobs/send_keepalive_job.c
+++ b/src/charon/processing/jobs/send_keepalive_job.c
@@ -47,38 +47,29 @@ struct private_send_keepalive_job_t {
};
/**
- * Implements send_keepalive_job_t.get_type.
+ * Implements job_t.destroy.
*/
-static job_type_t get_type(private_send_keepalive_job_t *this)
+static void destroy(private_send_keepalive_job_t *this)
{
- return SEND_KEEPALIVE;
+ this->ike_sa_id->destroy(this->ike_sa_id);
+ free(this);
}
/**
* Implementation of job_t.execute.
*/
-static status_t execute(private_send_keepalive_job_t *this)
+static void execute(private_send_keepalive_job_t *this)
{
ike_sa_t *ike_sa;
ike_sa = charon->ike_sa_manager->checkout(charon->ike_sa_manager,
this->ike_sa_id);
- if (ike_sa == NULL)
+ if (ike_sa)
{
- return DESTROY_ME;
+ ike_sa->send_keepalive(ike_sa);
+ charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
}
- ike_sa->send_keepalive(ike_sa);
- charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
- return DESTROY_ME;
-}
-
-/**
- * Implements job_t.destroy.
- */
-static void destroy(private_send_keepalive_job_t *this)
-{
- this->ike_sa_id->destroy(this->ike_sa_id);
- free(this);
+ destroy(this);
}
/*
@@ -89,9 +80,8 @@ send_keepalive_job_t *send_keepalive_job_create(ike_sa_id_t *ike_sa_id)
private_send_keepalive_job_t *this = malloc_thing(private_send_keepalive_job_t);
/* interface functions */
- this->public.job_interface.get_type = (job_type_t (*) (job_t *)) get_type;
this->public.job_interface.destroy = (void (*) (job_t *)) destroy;
- this->public.job_interface.execute = (status_t (*) (job_t *)) execute;
+ this->public.job_interface.execute = (void (*) (job_t *)) execute;
/* public functions */
this->public.destroy = (void (*)(send_keepalive_job_t *)) destroy;
@@ -99,5 +89,5 @@ send_keepalive_job_t *send_keepalive_job_create(ike_sa_id_t *ike_sa_id)
/* private variables */
this->ike_sa_id = ike_sa_id->clone(ike_sa_id);
- return &(this->public);
+ return &this->public;
}
diff --git a/src/charon/processing/processor.c b/src/charon/processing/processor.c
new file mode 100644
index 000000000..b3815eeb1
--- /dev/null
+++ b/src/charon/processing/processor.c
@@ -0,0 +1,233 @@
+/**
+ * @file processor.c
+ *
+ * @brief Implementation of processor_t.
+ *
+ */
+
+/*
+ * Copyright (C) 2005-2007 Martin Willi
+ * Copyright (C) 2005 Jan Hutter
+ * Hochschule fuer Technik Rapperswil
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * for more details.
+ */
+
+#include <stdlib.h>
+#include <pthread.h>
+#include <string.h>
+#include <errno.h>
+
+#include "processor.h"
+
+#include <daemon.h>
+#include <utils/linked_list.h>
+
+
+typedef struct private_processor_t private_processor_t;
+
+/**
+ * @brief Private data of processor_t class.
+ */
+struct private_processor_t {
+ /**
+ * Public processor_t interface.
+ */
+ processor_t public;
+
+ /**
+ * Number of running threads
+ */
+ u_int total_threads;
+
+ /**
+ * Desired number of threads
+ */
+ u_int desired_threads;
+
+ /**
+ * Number of threads waiting for work
+ */
+ u_int idle_threads;
+
+ /**
+ * The jobs are stored in a linked list
+ */
+ linked_list_t *list;
+
+ /**
+ * access to linked_list is locked through this mutex
+ */
+ pthread_mutex_t mutex;
+
+ /**
+ * Condvar to wait for new jobs
+ */
+ pthread_cond_t condvar;
+};
+
+static void process_jobs(private_processor_t *this);
+
+/**
+ * restart a terminated thread
+ */
+static void restart(private_processor_t *this)
+{
+ pthread_t thread;
+
+ if (pthread_create(&thread, NULL, (void*)process_jobs, this) != 0)
+ {
+ this->total_threads--;
+ }
+}
+
+/**
+ * Process queued jobs, called by the worker threads
+ */
+static void process_jobs(private_processor_t *this)
+{
+ int oldstate;
+
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate);
+
+ DBG2(DBG_JOB, "started worker thread, thread_ID: %06u", (int)pthread_self());
+
+ pthread_mutex_lock(&this->mutex);
+ while (this->desired_threads >= this->total_threads)
+ {
+ job_t *job;
+
+ if (this->list->get_count(this->list) == 0)
+ {
+ this->idle_threads++;
+ pthread_cond_wait(&this->condvar, &this->mutex);
+ this->idle_threads--;
+ continue;
+ }
+ this->list->remove_first(this->list, (void**)&job);
+ pthread_mutex_unlock(&this->mutex);
+ /* terminated threads are restarted, so we have a constant pool */
+ pthread_cleanup_push((void*)restart, this);
+ job->execute(job);
+ pthread_cleanup_pop(0);
+ pthread_mutex_lock(&this->mutex);
+ }
+ this->total_threads--;
+ pthread_cond_broadcast(&this->condvar);
+ pthread_mutex_unlock(&this->mutex);
+}
+
+/**
+ * Implementation of processor_t.get_total_threads.
+ */
+static u_int get_total_threads(private_processor_t *this)
+{
+ return this->total_threads;
+}
+
+/**
+ * Implementation of processor_t.get_idle_threads.
+ */
+static u_int get_idle_threads(private_processor_t *this)
+{
+ return this->idle_threads;
+}
+
+/**
+ * implements processor_t.get_job_load
+ */
+static u_int get_job_load(private_processor_t *this)
+{
+ u_int load;
+ pthread_mutex_lock(&this->mutex);
+ load = this->list->get_count(this->list);
+ pthread_mutex_unlock(&this->mutex);
+ return load;
+}
+
+/**
+ * implements function processor_t.queue_job
+ */
+static void queue_job(private_processor_t *this, job_t *job)
+{
+ pthread_mutex_lock(&this->mutex);
+ this->list->insert_last(this->list, job);
+ pthread_mutex_unlock(&this->mutex);
+ pthread_cond_signal(&this->condvar);
+}
+
+/**
+ * Implementation of processor_t.set_threads.
+ */
+static void set_threads(private_processor_t *this, u_int count)
+{
+ pthread_mutex_lock(&this->mutex);
+ if (count > this->total_threads)
+ { /* increase thread count */
+ int i;
+ pthread_t current;
+
+ this->desired_threads = count;
+ DBG1(DBG_JOB, "spawning %d worker threads", count - this->total_threads);
+ for (i = this->total_threads; i < count; i++)
+ {
+ if (pthread_create(&current, NULL, (void*)process_jobs, this) == 0)
+ {
+ this->total_threads++;
+ }
+ }
+ }
+ else if (count < this->total_threads)
+ { /* decrease thread count */
+ this->desired_threads = count;
+ }
+ pthread_mutex_unlock(&this->mutex);
+}
+
+/**
+ * Implementation of processor_t.destroy.
+ */
+static void destroy(private_processor_t *this)
+{
+ set_threads(this, 0);
+ while (this->total_threads > 0)
+ {
+ pthread_cond_broadcast(&this->condvar);
+ pthread_cond_wait(&this->condvar, &this->mutex);
+ }
+ this->list->destroy_offset(this->list, offsetof(job_t, destroy));
+ free(this);
+}
+
+/*
+ * Described in header.
+ */
+processor_t *processor_create(size_t pool_size)
+{
+ private_processor_t *this = malloc_thing(private_processor_t);
+
+ this->public.get_total_threads = (u_int(*)(processor_t*))get_total_threads;
+ this->public.get_idle_threads = (u_int(*)(processor_t*))get_idle_threads;
+ this->public.get_job_load = (u_int(*)(processor_t*))get_job_load;
+ this->public.queue_job = (void(*)(processor_t*, job_t*))queue_job;
+ this->public.set_threads = (void(*)(processor_t*, u_int))set_threads;
+ this->public.destroy = (void(*)(processor_t*))destroy;
+
+ this->list = linked_list_create();
+ pthread_mutex_init(&this->mutex, NULL);
+ pthread_cond_init(&this->condvar, NULL);
+ this->total_threads = 0;
+ this->desired_threads = 0;
+ this->idle_threads = 0;
+
+ return &this->public;
+}
+
diff --git a/src/charon/processing/processor.h b/src/charon/processing/processor.h
new file mode 100644
index 000000000..6763e27d0
--- /dev/null
+++ b/src/charon/processing/processor.h
@@ -0,0 +1,109 @@
+/**
+ * @file processor.h
+ *
+ * @brief Interface of processor_t.
+ *
+ */
+
+/*
+ * Copyright (C) 2005-2007 Martin Willi
+ * Copyright (C) 2005 Jan Hutter
+ * Hochschule fuer Technik Rapperswil
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * for more details.
+ */
+
+#ifndef PROCESSOR_H_
+#define PROCESSOR_H_
+
+typedef struct processor_t processor_t;
+
+#include <stdlib.h>
+
+#include <library.h>
+#include <processing/jobs/job.h>
+
+/**
+ * @brief The processor uses threads to process queued jobs.
+ *
+ * @b Constructors:
+ * - processor_create()
+ *
+ * @ingroup processing
+ */
+struct processor_t {
+
+ /**
+ * @brief Get the total number of threads used by the processor.
+ *
+ * @param this calling object
+ * @return size of thread pool
+ */
+ u_int (*get_total_threads) (processor_t *this);
+
+ /**
+ * @brief Get the number of threads currently waiting.
+ *
+ * @param this calling object
+ * @return number of idle threads
+ */
+ u_int (*get_idle_threads) (processor_t *this);
+
+ /**
+ * @brief Get the number of queued jobs.
+ *
+ * @param this calling object
+ * @returns number of items in queue
+ */
+ u_int (*get_job_load) (processor_t *this);
+
+ /**
+ * @brief Adds a job to the queue.
+ *
+ * This function is non blocking and adds a job_t to the queue.
+ *
+ * @param this calling object
+ * @param job job to add to the queue
+ */
+ void (*queue_job) (processor_t *this, job_t *job);
+
+ /**
+ * @brief Set the number of threads to use in the processor.
+ *
+ * If the number of threads is smaller than number of currently running
+ * threads, thread count is decreased. Use 0 to disable the processor.
+ * This call blocks if it decreases thread count until threads have
+ * terminated, so make sure there are not too many blocking jobs.
+ *
+ * @param this calling object
+ * @param count number of threads to allocate
+ */
+ void (*set_threads)(processor_t *this, u_int count);
+
+ /**
+ * @brief Destroy a processor object.
+ *
+ * @param processor calling object
+ */
+ void (*destroy) (processor_t *processor);
+};
+
+/**
+ * @brief Create the thread pool without any threads.
+ *
+ * @return processor_t object
+ *
+ * @ingroup processing
+ */
+processor_t *processor_create();
+
+#endif /*PROCESSOR_H_*/
+
diff --git a/src/charon/processing/scheduler.c b/src/charon/processing/scheduler.c
index 7249e43e6..2706585b0 100644
--- a/src/charon/processing/scheduler.c
+++ b/src/charon/processing/scheduler.c
@@ -23,12 +23,39 @@
#include <stdlib.h>
#include <pthread.h>
+#include <sys/time.h>
#include "scheduler.h"
#include <daemon.h>
-#include <processing/job_queue.h>
+#include <processing/processor.h>
+#include <processing/jobs/callback_job.h>
+typedef struct event_t event_t;
+
+/**
+ * Event containing a job and a schedule time
+ */
+struct event_t {
+ /**
+ * Time to fire the event.
+ */
+ timeval_t time;
+
+ /**
+ * Every event has its assigned job.
+ */
+ job_t *job;
+};
+
+/**
+ * destroy an event and its job
+ */
+static void event_destroy(event_t *event)
+{
+ event->job->destroy(event->job);
+ free(event);
+}
typedef struct private_scheduler_t private_scheduler_t;
@@ -42,36 +69,164 @@ struct private_scheduler_t {
scheduler_t public;
/**
- * Assigned thread.
+ * Job wich schedules
*/
- pthread_t assigned_thread;
+ callback_job_t *job;
+
+ /**
+ * The jobs are scheduled in a list.
+ */
+ linked_list_t *list;
+
+ /**
+ * Exclusive access to list
+ */
+ pthread_mutex_t mutex;
+
+ /**
+ * Condvar to wait for next job.
+ */
+ pthread_cond_t condvar;
};
/**
- * Implementation of private_scheduler_t.get_events.
+ * Returns the difference of two timeval structs in milliseconds
+ */
+static long time_difference(timeval_t *end, timeval_t *start)
+{
+ time_t s;
+ suseconds_t us;
+
+ s = end->tv_sec - start->tv_sec;
+ us = end->tv_usec - start->tv_usec;
+ return (s * 1000 + us/1000);
+}
+
+/**
+ * Get events from the queue and pass it to the processor
*/
-static void get_events(private_scheduler_t * this)
+static job_requeue_t schedule(private_scheduler_t * this)
{
- job_t *current_job;
+ timespec_t timeout;
+ timeval_t now;
+ event_t *event;
+ long difference;
+ int oldstate;
+ bool timed = FALSE;
- /* cancellation disabled by default */
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+ DBG2(DBG_JOB, "waiting for next event...");
+ pthread_mutex_lock(&this->mutex);
- DBG1(DBG_JOB, "scheduler thread running, thread_ID: %06u",
- (int)pthread_self());
+ gettimeofday(&now, NULL);
+
+ if (this->list->get_count(this->list) > 0)
+ {
+ this->list->get_first(this->list, (void **)&event);
+ difference = time_difference(&now, &event->time);
+ if (difference > 0)
+ {
+ DBG2(DBG_JOB, "got event, queueing job for execution");
+ this->list->remove_first(this->list, (void **)&event);
+ pthread_mutex_unlock(&this->mutex);
+ charon->processor->queue_job(charon->processor, event->job);
+ free(event);
+ return JOB_REQUEUE_DIRECT;
+ }
+ timeout.tv_sec = event->time.tv_sec;
+ timeout.tv_nsec = event->time.tv_usec * 1000;
+ timed = TRUE;
+ }
+ pthread_cleanup_push((void*)pthread_mutex_unlock, &this->mutex);
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
+
+ if (timed)
+ {
+ pthread_cond_timedwait(&this->condvar, &this->mutex, &timeout);
+ }
+ else
+ {
+ pthread_cond_wait(&this->condvar, &this->mutex);
+ }
+ pthread_setcancelstate(oldstate, NULL);
+ pthread_cleanup_pop(0);
+
+ pthread_mutex_unlock(&this->mutex);
+ return JOB_REQUEUE_DIRECT;
+}
- charon->drop_capabilities(charon, TRUE);
+/**
+ * Implements scheduler_t.get_job_load
+ */
+static u_int get_job_load(private_scheduler_t *this)
+{
+ int count;
+ pthread_mutex_lock(&this->mutex);
+ count = this->list->get_count(this->list);
+ pthread_mutex_unlock(&this->mutex);
+ return count;
+}
- while (TRUE)
+/**
+ * Implements scheduler_t.schedule_job.
+ */
+static void schedule_job(private_scheduler_t *this, job_t *job, u_int32_t time)
+{
+ timeval_t now;
+ event_t *event, *current;
+ iterator_t *iterator;
+ time_t s;
+ suseconds_t us;
+
+ event = malloc_thing(event_t);
+ event->job = job;
+
+ /* calculate absolute time */
+ s = time / 1000;
+ us = (time - s * 1000) * 1000;
+ gettimeofday(&now, NULL);
+ event->time.tv_usec = (now.tv_usec + us) % 1000000;
+ event->time.tv_sec = now.tv_sec + (now.tv_usec + us)/1000000 + s;
+
+ pthread_mutex_lock(&this->mutex);
+ while(TRUE)
{
- DBG2(DBG_JOB, "waiting for next event...");
- /* get a job, this block until one is available */
- current_job = charon->event_queue->get(charon->event_queue);
- /* queue the job in the job queue, workers will eat them */
- DBG2(DBG_JOB, "got event, adding job %N to job-queue",
- job_type_names, current_job->get_type(current_job));
- charon->job_queue->add(charon->job_queue, current_job);
+ if (this->list->get_count(this->list) == 0)
+ {
+ this->list->insert_first(this->list,event);
+ break;
+ }
+
+ this->list->get_last(this->list, (void**)&current);
+ if (time_difference(&event->time, &current->time) >= 0)
+ { /* new event has to be fired after the last event in list */
+ this->list->insert_last(this->list, event);
+ break;
+ }
+
+ this->list->get_first(this->list, (void**)&current);
+ if (time_difference(&event->time, &current->time) < 0)
+ { /* new event has to be fired before the first event in list */
+ this->list->insert_first(this->list, event);
+ break;
+ }
+
+ iterator = this->list->create_iterator(this->list, TRUE);
+ /* first element has not to be checked (already done) */
+ iterator->iterate(iterator, (void**)&current);
+ while(iterator->iterate(iterator, (void**)&current))
+ {
+ if (time_difference(&event->time, &current->time) <= 0)
+ {
+ /* new event has to be fired before the current event in list */
+ iterator->insert_before(iterator, event);
+ break;
+ }
+ }
+ iterator->destroy(iterator);
+ break;
}
+ pthread_cond_signal(&this->condvar);
+ pthread_mutex_unlock(&this->mutex);
}
/**
@@ -79,8 +234,8 @@ static void get_events(private_scheduler_t * this)
*/
static void destroy(private_scheduler_t *this)
{
- pthread_cancel(this->assigned_thread);
- pthread_join(this->assigned_thread, NULL);
+ this->job->cancel(this->job);
+ this->list->destroy_function(this->list, (void*)event_destroy);
free(this);
}
@@ -91,14 +246,17 @@ scheduler_t * scheduler_create()
{
private_scheduler_t *this = malloc_thing(private_scheduler_t);
+ this->public.get_job_load = (u_int (*) (scheduler_t *this)) get_job_load;
+ this->public.schedule_job = (void (*) (scheduler_t *this, job_t *job, u_int32_t ms)) schedule_job;
this->public.destroy = (void(*)(scheduler_t*)) destroy;
- if (pthread_create(&(this->assigned_thread), NULL, (void*(*)(void*))get_events, this) != 0)
- {
- /* thread could not be created */
- free(this);
- charon->kill(charon, "unable to create scheduler thread");
- }
+ this->list = linked_list_create();
+ pthread_mutex_init(&this->mutex, NULL);
+ pthread_cond_init(&this->condvar, NULL);
+
+ this->job = callback_job_create((callback_job_cb_t)schedule, this, NULL, NULL);
+ charon->processor->queue_job(charon->processor, (job_t*)this->job);
- return &(this->public);
+ return &this->public;
}
+
diff --git a/src/charon/processing/scheduler.h b/src/charon/processing/scheduler.h
index bea93e7c9..7bde6e638 100644
--- a/src/charon/processing/scheduler.h
+++ b/src/charon/processing/scheduler.h
@@ -6,7 +6,7 @@
*/
/*
- * Copyright (C) 2005-2006 Martin Willi
+ * Copyright (C) 2005-2007 Martin Willi
* Copyright (C) 2005 Jan Hutter
* Hochschule fuer Technik Rapperswil
*
@@ -27,14 +27,12 @@
typedef struct scheduler_t scheduler_t;
#include <library.h>
+#include <processing/jobs/job.h>
/**
- * @brief The scheduler thread is responsible for timed events.
+ * @brief The scheduler queues and executes timed events.
*
- * The scheduler thread takes out jobs from the event-queue and adds them
- * to the job-queue.
- *
- * Starts a thread which does the work, since event-queue is blocking.
+ * The scheduler stores timed events and passes them to the processor.
*
* @b Constructors:
* - scheduler_create()
@@ -44,25 +42,40 @@ typedef struct scheduler_t scheduler_t;
struct scheduler_t {
/**
+ * @brief Adds a event to the queue, using a relative time offset.
+ *
+ * Schedules a job for execution using a relative time offset.
+ *
+ * @param this calling object
+ * @param job job to schedule
+ * @param time relative to to schedule job (in ms)
+ */
+ void (*schedule_job) (scheduler_t *this, job_t *job, u_int32_t time);
+
+ /**
+ * @brief Returns number of jobs scheduled.
+ *
+ * @param this calling object
+ * @return number of scheduled jobs
+ */
+ u_int (*get_job_load) (scheduler_t *this);
+
+ /**
* @brief Destroys a scheduler object.
*
- * @param scheduler calling object
+ * @param this calling object
*/
- void (*destroy) (scheduler_t *scheduler);
+ void (*destroy) (scheduler_t *this);
};
/**
- * @brief Create a scheduler with its associated thread.
- *
- * The thread will start to get jobs form the event queue
- * and adds them to the job queue.
+ * @brief Create a scheduler.
*
- * @return
- * - scheduler_t object
- * - NULL if thread could not be started
+ * @return scheduler_t object
*
* @ingroup processing
*/
-scheduler_t * scheduler_create(void);
+scheduler_t *scheduler_create(void);
#endif /*SCHEDULER_H_*/
+
diff --git a/src/charon/processing/thread_pool.c b/src/charon/processing/thread_pool.c
deleted file mode 100644
index a9891da15..000000000
--- a/src/charon/processing/thread_pool.c
+++ /dev/null
@@ -1,183 +0,0 @@
-/**
- * @file thread_pool.c
- *
- * @brief Implementation of thread_pool_t.
- *
- */
-
-/*
- * Copyright (C) 2005-2006 Martin Willi
- * Copyright (C) 2005 Jan Hutter
- * Hochschule fuer Technik Rapperswil
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the
- * Free Software Foundation; either version 2 of the License, or (at your
- * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
- * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * for more details.
- */
-
-#include <stdlib.h>
-#include <pthread.h>
-#include <string.h>
-#include <errno.h>
-
-#include "thread_pool.h"
-
-#include <daemon.h>
-#include <processing/job_queue.h>
-
-
-typedef struct private_thread_pool_t private_thread_pool_t;
-
-/**
- * @brief Private data of thread_pool_t class.
- */
-struct private_thread_pool_t {
- /**
- * Public thread_pool_t interface.
- */
- thread_pool_t public;
-
- /**
- * Number of running threads.
- */
- u_int pool_size;
-
- /**
- * Number of threads waiting for work
- */
- u_int idle_threads;
-
- /**
- * Array of thread ids.
- */
- pthread_t *threads;
-};
-
-/**
- * Implementation of private_thread_pool_t.process_jobs.
- */
-static void process_jobs(private_thread_pool_t *this)
-{
- job_t *job;
- status_t status;
-
- /* cancellation disabled by default */
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
-
- DBG1(DBG_JOB, "worker thread running, thread_ID: %06u",
- (int)pthread_self());
-
- charon->drop_capabilities(charon, TRUE);
-
- while (TRUE)
- {
- /* TODO: should be atomic, but is not mission critical */
- this->idle_threads++;
- job = charon->job_queue->get(charon->job_queue);
- this->idle_threads--;
-
- status = job->execute(job);
-
- if (status == DESTROY_ME)
- {
- job->destroy(job);
- }
- }
-}
-
-/**
- * Implementation of thread_pool_t.get_pool_size.
- */
-static u_int get_pool_size(private_thread_pool_t *this)
-{
- return this->pool_size;
-}
-
-/**
- * Implementation of thread_pool_t.get_idle_threads.
- */
-static u_int get_idle_threads(private_thread_pool_t *this)
-{
- return this->idle_threads;
-}
-
-/**
- * Implementation of thread_pool_t.destroy.
- */
-static void destroy(private_thread_pool_t *this)
-{
- int current;
- /* flag thread for termination */
- for (current = 0; current < this->pool_size; current++)
- {
- DBG1(DBG_JOB, "cancelling worker thread #%d", current+1);
- pthread_cancel(this->threads[current]);
- }
-
- /* wait for all threads */
- for (current = 0; current < this->pool_size; current++) {
- if (pthread_join(this->threads[current], NULL) == 0)
- {
- DBG1(DBG_JOB, "worker thread #%d terminated", current+1);
- }
- else
- {
- DBG1(DBG_JOB, "could not terminate worker thread #%d", current+1);
- }
- }
-
- /* free mem */
- free(this->threads);
- free(this);
-}
-
-/*
- * Described in header.
- */
-thread_pool_t *thread_pool_create(size_t pool_size)
-{
- int current;
- private_thread_pool_t *this = malloc_thing(private_thread_pool_t);
-
- /* fill in public fields */
- this->public.destroy = (void(*)(thread_pool_t*))destroy;
- this->public.get_pool_size = (u_int(*)(thread_pool_t*))get_pool_size;
- this->public.get_idle_threads = (u_int(*)(thread_pool_t*))get_idle_threads;
-
- /* initialize member */
- this->pool_size = pool_size;
- this->idle_threads = 0;
- this->threads = malloc(sizeof(pthread_t) * pool_size);
-
- /* try to create as many threads as possible, up to pool_size */
- for (current = 0; current < pool_size; current++)
- {
- if (pthread_create(&(this->threads[current]), NULL,
- (void*(*)(void*))process_jobs, this) == 0)
- {
- DBG1(DBG_JOB, "created worker thread #%d", current+1);
- }
- else
- {
- /* creation failed, is it the first one? */
- if (current == 0)
- {
- free(this->threads);
- free(this);
- charon->kill(charon, "could not create any worker threads");
- }
- /* not all threads could be created, but at least one :-/ */
- DBG1(DBG_JOB, "could only create %d from requested %d threads!",
- current, pool_size);
- this->pool_size = current;
- break;
- }
- }
- return (thread_pool_t*)this;
-}
diff --git a/src/charon/processing/thread_pool.h b/src/charon/processing/thread_pool.h
deleted file mode 100644
index 09a6312a8..000000000
--- a/src/charon/processing/thread_pool.h
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * @file thread_pool.h
- *
- * @brief Interface of thread_pool_t.
- *
- */
-
-/*
- * Copyright (C) 2005-2006 Martin Willi
- * Copyright (C) 2005 Jan Hutter
- * Hochschule fuer Technik Rapperswil
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the
- * Free Software Foundation; either version 2 of the License, or (at your
- * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
- * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * for more details.
- */
-
-#ifndef THREAD_POOL_H_
-#define THREAD_POOL_H_
-
-typedef struct thread_pool_t thread_pool_t;
-
-#include <stdlib.h>
-
-#include <library.h>
-
-/**
- * @brief A thread_pool consists of a pool of threads processing jobs from the job queue.
- *
- * Current implementation uses as many threads as specified in constructor.
- * A more improved version would dynamically increase thread count if necessary.
- *
- * @b Constructors:
- * - thread_pool_create()
- *
- * @todo Add support for dynamic thread handling
- *
- * @ingroup processing
- */
-struct thread_pool_t {
-
- /**
- * @brief Return currently instanciated thread count.
- *
- * @param thread_pool calling object
- * @return size of thread pool
- */
- u_int (*get_pool_size) (thread_pool_t *thread_pool);
-
- /**
- * @brief Get the number of threads currently waiting for work.
- *
- * @param thread_pool calling object
- * @return number of idle threads
- */
- u_int (*get_idle_threads) (thread_pool_t *thread_pool);
-
- /**
- * @brief Destroy a thread_pool_t object.
- *
- * Sends cancellation request to all threads and AWAITS their termination.
- *
- * @param thread_pool calling object
- */
- void (*destroy) (thread_pool_t *thread_pool);
-};
-
-/**
- * @brief Create the thread pool using using pool_size of threads.
- *
- * @param pool_size desired pool size
- * @return
- * - thread_pool_t object if one ore more threads could be started, or
- * - NULL if no threads could be created
- *
- * @ingroup processing
- */
-thread_pool_t *thread_pool_create(size_t pool_size);
-
-
-#endif /*THREAD_POOL_H_*/
diff --git a/src/charon/sa/ike_sa.c b/src/charon/sa/ike_sa.c
index 8b4b53e10..5fc6a9625 100644
--- a/src/charon/sa/ike_sa.c
+++ b/src/charon/sa/ike_sa.c
@@ -442,8 +442,8 @@ static status_t send_dpd(private_ike_sa_t *this)
}
/* recheck in "interval" seconds */
job = send_dpd_job_create(this->ike_sa_id);
- charon->event_queue->add_relative(charon->event_queue, (job_t*)job,
- (delay - diff) * 1000);
+ charon->scheduler->schedule_job(charon->scheduler, (job_t*)job,
+ (delay - diff) * 1000);
return SUCCESS;
}
@@ -477,8 +477,8 @@ static void send_keepalive(private_ike_sa_t *this)
diff = 0;
}
job = send_keepalive_job_create(this->ike_sa_id);
- charon->event_queue->add_relative(charon->event_queue, (job_t*)job,
- (KEEPALIVE_INTERVAL - diff) * 1000);
+ charon->scheduler->schedule_job(charon->scheduler, (job_t*)job,
+ (KEEPALIVE_INTERVAL - diff) * 1000);
}
/**
@@ -524,16 +524,16 @@ static void set_state(private_ike_sa_t *this, ike_sa_state_t state)
{
this->time.rekey = now + soft;
job = (job_t*)rekey_ike_sa_job_create(this->ike_sa_id, reauth);
- charon->event_queue->add_relative(charon->event_queue, job,
- soft * 1000);
+ charon->scheduler->schedule_job(charon->scheduler, job,
+ soft * 1000);
}
if (hard)
{
this->time.delete = now + hard;
job = (job_t*)delete_ike_sa_job_create(this->ike_sa_id, TRUE);
- charon->event_queue->add_relative(charon->event_queue, job,
- hard * 1000);
+ charon->scheduler->schedule_job(charon->scheduler, job,
+ hard * 1000);
}
}
break;
@@ -542,8 +542,8 @@ static void set_state(private_ike_sa_t *this, ike_sa_state_t state)
{
/* delete may fail if a packet gets lost, so set a timeout */
job_t *job = (job_t*)delete_ike_sa_job_create(this->ike_sa_id, TRUE);
- charon->event_queue->add_relative(charon->event_queue, job,
- HALF_OPEN_IKE_SA_TIMEOUT);
+ charon->scheduler->schedule_job(charon->scheduler, job,
+ HALF_OPEN_IKE_SA_TIMEOUT);
break;
}
default:
@@ -761,8 +761,8 @@ static status_t process_message(private_ike_sa_t *this, message_t *message)
}
/* add a timeout if peer does not establish it completely */
job = (job_t*)delete_ike_sa_job_create(this->ike_sa_id, FALSE);
- charon->event_queue->add_relative(charon->event_queue, job,
- HALF_OPEN_IKE_SA_TIMEOUT);
+ charon->scheduler->schedule_job(charon->scheduler, job,
+ HALF_OPEN_IKE_SA_TIMEOUT);
}
/* check if message is trustworthy, and update host information */
@@ -1625,7 +1625,7 @@ static void reestablish(private_ike_sa_t *this)
charon->ike_sa_manager->checkin(charon->ike_sa_manager, &other->public);
job = (job_t*)delete_ike_sa_job_create(this->ike_sa_id, TRUE);
- charon->job_queue->add(charon->job_queue, job);
+ charon->processor->queue_job(charon->processor, job);
}
/**
diff --git a/src/charon/sa/task_manager.c b/src/charon/sa/task_manager.c
index e67508ed1..3f13dcef5 100644
--- a/src/charon/sa/task_manager.c
+++ b/src/charon/sa/task_manager.c
@@ -235,7 +235,7 @@ static status_t retransmit(private_task_manager_t *this, u_int32_t message_id)
this->initiating.packet->clone(this->initiating.packet));
job = (job_t*)retransmit_job_create(this->initiating.mid,
this->ike_sa->get_id(this->ike_sa));
- charon->event_queue->add_relative(charon->event_queue, job, timeout);
+ charon->scheduler->schedule_job(charon->scheduler, job, timeout);
}
return SUCCESS;
}
diff --git a/src/charon/sa/tasks/child_rekey.c b/src/charon/sa/tasks/child_rekey.c
index 4f3c69034..3667d8fad 100644
--- a/src/charon/sa/tasks/child_rekey.c
+++ b/src/charon/sa/tasks/child_rekey.c
@@ -206,7 +206,7 @@ static status_t process_i(private_child_rekey_t *this, message_t *message)
DBG1(DBG_IKE, "CHILD_SA rekeying failed, "
"trying again in %d seconds", retry);
this->child_sa->set_state(this->child_sa, CHILD_INSTALLED);
- charon->event_queue->add_relative(charon->event_queue, job, retry * 1000);
+ charon->scheduler->schedule_job(charon->scheduler, job, retry * 1000);
}
return SUCCESS;
}
diff --git a/src/charon/sa/tasks/ike_rekey.c b/src/charon/sa/tasks/ike_rekey.c
index d54fc3524..60cb1e63c 100644
--- a/src/charon/sa/tasks/ike_rekey.c
+++ b/src/charon/sa/tasks/ike_rekey.c
@@ -180,7 +180,7 @@ static status_t process_i(private_ike_rekey_t *this, message_t *message)
DBG1(DBG_IKE, "IKE_SA rekeying failed, "
"trying again in %d seconds", retry);
this->ike_sa->set_state(this->ike_sa, IKE_ESTABLISHED);
- charon->event_queue->add_relative(charon->event_queue, job, retry * 1000);
+ charon->scheduler->schedule_job(charon->scheduler, job, retry * 1000);
}
return SUCCESS;
case NEED_MORE:
@@ -231,7 +231,7 @@ static status_t process_i(private_ike_rekey_t *this, message_t *message)
}
job = (job_t*)delete_ike_sa_job_create(to_delete, TRUE);
- charon->job_queue->add(charon->job_queue, job);
+ charon->processor->queue_job(charon->processor, job);
return SUCCESS;
}