aboutsummaryrefslogtreecommitdiffstats
path: root/src/charon/threads/thread_pool.c
diff options
context:
space:
mode:
authorMartin Willi <martin@strongswan.org>2006-05-10 08:02:49 +0000
committerMartin Willi <martin@strongswan.org>2006-05-10 08:02:49 +0000
commitb8577029d1d37b798907f0418ddb1445e13c3c44 (patch)
tree6bc5eaa1db913fdf6b6541a177607a7aa79f4dfc /src/charon/threads/thread_pool.c
parent95806de938a287ca71b28fa07016c9785130c1da (diff)
downloadstrongswan-b8577029d1d37b798907f0418ddb1445e13c3c44.tar.bz2
strongswan-b8577029d1d37b798907f0418ddb1445e13c3c44.tar.xz
Diffstat (limited to 'src/charon/threads/thread_pool.c')
-rw-r--r--src/charon/threads/thread_pool.c623
1 files changed, 623 insertions, 0 deletions
diff --git a/src/charon/threads/thread_pool.c b/src/charon/threads/thread_pool.c
new file mode 100644
index 000000000..4482e795f
--- /dev/null
+++ b/src/charon/threads/thread_pool.c
@@ -0,0 +1,623 @@
+/**
+ * @file thread_pool.c
+ *
+ * @brief Implementation of thread_pool_t.
+ *
+ */
+
+/*
+ * Copyright (C) 2005 Jan Hutter, Martin Willi
+ * Hochschule fuer Technik Rapperswil
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * for more details.
+ */
+
+#include <stdlib.h>
+#include <pthread.h>
+#include <string.h>
+#include <errno.h>
+
+#include "thread_pool.h"
+
+#include <daemon.h>
+#include <queues/job_queue.h>
+#include <queues/jobs/delete_half_open_ike_sa_job.h>
+#include <queues/jobs/delete_established_ike_sa_job.h>
+#include <queues/jobs/incoming_packet_job.h>
+#include <queues/jobs/initiate_ike_sa_job.h>
+#include <queues/jobs/retransmit_request_job.h>
+#include <encoding/payloads/notify_payload.h>
+#include <utils/logger.h>
+
+
+typedef struct private_thread_pool_t private_thread_pool_t;
+
+/**
+ * @brief Private data of thread_pool_t class.
+ */
+struct private_thread_pool_t {
+ /**
+ * Public thread_pool_t interface.
+ */
+ thread_pool_t public;
+
+ /**
+ * @brief Main processing function for worker threads.
+ *
+ * Gets a job from the job queue and calls corresponding
+ * function for processing.
+ *
+ * @param this calling object
+ */
+ void (*process_jobs) (private_thread_pool_t *this);
+
+ /**
+ * @brief Process a INCOMING_PACKET job.
+ *
+ * @param this calling object
+ * @param job incoming_packet_job_t object
+ */
+ void (*process_incoming_packet_job) (private_thread_pool_t *this, incoming_packet_job_t *job);
+
+ /**
+ * @brief Process a INITIATE_IKE_SA job.
+ *
+ * @param this calling object
+ * @param job initiate_ike_sa_job_t object
+ */
+ void (*process_initiate_ike_sa_job) (private_thread_pool_t *this, initiate_ike_sa_job_t *job);
+
+ /**
+ * @brief Process a DELETE_HALF_OPEN_IKE_SA job.
+ *
+ * @param this calling object
+ * @param job delete__half_open_ike_sa_job_t object
+ */
+ void (*process_delete_half_open_ike_sa_job) (private_thread_pool_t *this, delete_half_open_ike_sa_job_t *job);
+
+ /**
+ * @brief Process a DELETE_ESTABLISHED_IKE_SA job.
+ *
+ * @param this calling object
+ * @param job delete_established_ike_sa_job_t object
+ */
+ void (*process_delete_established_ike_sa_job) (private_thread_pool_t *this, delete_established_ike_sa_job_t *job);
+
+ /**
+ * @brief Process a RETRANSMIT_REQUEST job.
+ *
+ * @param this calling object
+ * @param job retransmit_request_job_t object
+ */
+ void (*process_retransmit_request_job) (private_thread_pool_t *this, retransmit_request_job_t *job);
+
+ /**
+ * Creates a job of type DELETE_HALF_OPEN_IKE_SA.
+ *
+ * This job is used to delete IKE_SA's which are still in state INITIATOR_INIT,
+ * RESPONDER_INIT, IKE_AUTH_REQUESTED, IKE_INIT_REQUESTED or IKE_INIT_RESPONDED.
+ *
+ * @param ike_sa_id ID of IKE_SA to delete
+ * @param delay Delay in ms after a half open IKE_SA gets deleted!
+ */
+ void (*create_delete_half_open_ike_sa_job) (private_thread_pool_t *this,ike_sa_id_t *ike_sa_id, u_int32_t delay);
+
+ /**
+ * Number of running threads.
+ */
+ size_t pool_size;
+
+ /**
+ * Array of thread ids.
+ */
+ pthread_t *threads;
+
+ /**
+ * Logger of the thread pool.
+ */
+ logger_t *pool_logger;
+
+ /**
+ * Logger of the worker threads.
+ */
+ logger_t *worker_logger;
+} ;
+
+/**
+ * Implementation of private_thread_pool_t.process_jobs.
+ */
+static void process_jobs(private_thread_pool_t *this)
+{
+ job_t *job;
+ job_type_t job_type;
+ timeval_t start_time;
+ timeval_t end_time;
+
+ /* cancellation disabled by default */
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+
+ this->worker_logger->log(this->worker_logger, CONTROL, "Worker thread running, thread_id: %u", (int)pthread_self());
+
+ for (;;) {
+
+ job = charon->job_queue->get(charon->job_queue);
+ job_type = job->get_type(job);
+ this->worker_logger->log(this->worker_logger, CONTROL|LEVEL2, "Process job of type %s",
+ mapping_find(job_type_m,job_type));
+ gettimeofday(&start_time,NULL);
+ switch (job_type)
+ {
+ case INCOMING_PACKET:
+ {
+ this->process_incoming_packet_job(this, (incoming_packet_job_t*)job);
+ job->destroy(job);
+ break;
+ }
+ case INITIATE_IKE_SA:
+ {
+ this->process_initiate_ike_sa_job(this, (initiate_ike_sa_job_t*)job);
+ job->destroy(job);
+ break;
+ }
+ case DELETE_HALF_OPEN_IKE_SA:
+ {
+ this->process_delete_half_open_ike_sa_job(this, (delete_half_open_ike_sa_job_t*)job);
+ job->destroy(job);
+ break;
+ }
+ case DELETE_ESTABLISHED_IKE_SA:
+ {
+ this->process_delete_established_ike_sa_job(this, (delete_established_ike_sa_job_t*)job);
+ job->destroy(job);
+ break;
+ }
+ case RETRANSMIT_REQUEST:
+ {
+ this->process_retransmit_request_job(this, (retransmit_request_job_t*)job);
+ break;
+ }
+ default:
+ {
+ this->worker_logger->log(this->worker_logger, ERROR, "Job of type %s not supported!",
+ mapping_find(job_type_m,job_type));
+ job->destroy(job);
+ break;
+ }
+ }
+ gettimeofday(&end_time,NULL);
+
+ this->worker_logger->log(this->worker_logger, CONTROL | LEVEL2, "Processed job of type %s in %d us",
+ mapping_find(job_type_m,job_type),
+ (((end_time.tv_sec - start_time.tv_sec) * 1000000) + (end_time.tv_usec - start_time.tv_usec)));
+
+
+ }
+}
+
+/**
+ * Implementation of private_thread_pool_t.process_incoming_packet_job.
+ */
+static void process_incoming_packet_job(private_thread_pool_t *this, incoming_packet_job_t *job)
+{
+ packet_t *packet;
+ message_t *message;
+ ike_sa_t *ike_sa;
+ ike_sa_id_t *ike_sa_id;
+ status_t status;
+
+
+ packet = job->get_packet(job);
+
+ message = message_create_from_packet(packet);
+
+ status = message->parse_header(message);
+ if (status != SUCCESS)
+ {
+ this->worker_logger->log(this->worker_logger, ERROR, "Message header could not be verified!");
+ message->destroy(message);
+ return;
+ }
+
+ this->worker_logger->log(this->worker_logger, CONTROL|LEVEL2, "Message is a %s %s",
+ mapping_find(exchange_type_m, message->get_exchange_type(message)),
+ message->get_request(message) ? "request" : "reply");
+
+ if ((message->get_major_version(message) != IKE_MAJOR_VERSION) ||
+ (message->get_minor_version(message) != IKE_MINOR_VERSION))
+ {
+ this->worker_logger->log(this->worker_logger, ERROR | LEVEL2, "IKE version %d.%d not supported",
+ message->get_major_version(message),
+ message->get_minor_version(message));
+ /*
+ * This check is not handled in state_t object of IKE_SA to increase speed.
+ */
+ if ((message->get_exchange_type(message) == IKE_SA_INIT) && (message->get_request(message)))
+ {
+ message_t *response;
+ message->get_ike_sa_id(message, &ike_sa_id);
+ ike_sa_id->switch_initiator(ike_sa_id);
+ response = message_create_notify_reply(message->get_destination(message),
+ message->get_source(message),
+ IKE_SA_INIT,
+ FALSE,ike_sa_id,INVALID_MAJOR_VERSION);
+ message->destroy(message);
+ ike_sa_id->destroy(ike_sa_id);
+ status = response->generate(response, NULL, NULL, &packet);
+ if (status != SUCCESS)
+ {
+ this->worker_logger->log(this->worker_logger, ERROR, "Could not generate packet from message");
+ response->destroy(response);
+ return;
+ }
+ this->worker_logger->log(this->worker_logger, ERROR, "Send notify reply of type INVALID_MAJOR_VERSION");
+ charon->send_queue->add(charon->send_queue, packet);
+ response->destroy(response);
+ return;
+ }
+ message->destroy(message);
+ return;
+ }
+
+ message->get_ike_sa_id(message, &ike_sa_id);
+
+ ike_sa_id->switch_initiator(ike_sa_id);
+
+ this->worker_logger->log(this->worker_logger, CONTROL|LEVEL3, "Checking out IKE SA %lld:%lld, role %s",
+ ike_sa_id->get_initiator_spi(ike_sa_id),
+ ike_sa_id->get_responder_spi(ike_sa_id),
+ ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
+
+ status = charon->ike_sa_manager->checkout(charon->ike_sa_manager,ike_sa_id, &ike_sa);
+ if ((status != SUCCESS) && (status != CREATED))
+ {
+ this->worker_logger->log(this->worker_logger, ERROR, "IKE SA could not be checked out");
+ ike_sa_id->destroy(ike_sa_id);
+ message->destroy(message);
+
+ /*
+ * TODO send notify reply of type INVALID_IKE_SPI if SPI could not be found ?
+ */
+
+ return;
+ }
+
+ if (status == CREATED)
+ {
+ this->worker_logger->log(this->worker_logger, CONTROL|LEVEL3,
+ "Create Job to delete half open IKE_SA.");
+ this->create_delete_half_open_ike_sa_job(this,ike_sa_id,
+ charon->configuration->get_half_open_ike_sa_timeout(charon->configuration));
+ }
+
+ status = ike_sa->process_message(ike_sa, message);
+
+ this->worker_logger->log(this->worker_logger, CONTROL|LEVEL3, "%s IKE SA %lld:%lld, role %s",
+ (status == DELETE_ME) ? "Checkin and delete" : "Checkin",
+ ike_sa_id->get_initiator_spi(ike_sa_id),
+ ike_sa_id->get_responder_spi(ike_sa_id),
+ ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
+ ike_sa_id->destroy(ike_sa_id);
+
+ if (status == DELETE_ME)
+ {
+ status = charon->ike_sa_manager->checkin_and_delete(charon->ike_sa_manager, ike_sa);
+ }
+ else
+ {
+ status = charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
+ }
+
+ if (status != SUCCESS)
+ {
+ this->worker_logger->log(this->worker_logger, ERROR, "Checkin of IKE SA failed!");
+ }
+ message->destroy(message);
+}
+
+/**
+ * Implementation of private_thread_pool_t.process_initiate_ike_sa_job.
+ */
+static void process_initiate_ike_sa_job(private_thread_pool_t *this, initiate_ike_sa_job_t *job)
+{
+ /*
+ * Initiatie an IKE_SA:
+ * - is defined by a name of a configuration
+ * - create an empty IKE_SA via manager
+ * - call initiate_connection on this sa
+ */
+ ike_sa_t *ike_sa;
+ status_t status;
+
+
+ this->worker_logger->log(this->worker_logger, CONTROL|LEVEL2, "Creating and checking out IKE SA");
+ charon->ike_sa_manager->create_and_checkout(charon->ike_sa_manager, &ike_sa);
+
+ status = ike_sa->initiate_connection(ike_sa, job->get_connection(job));
+ if (status != SUCCESS)
+ {
+ this->worker_logger->log(this->worker_logger, ERROR, "Initiation returned %s, going to delete IKE_SA.",
+ mapping_find(status_m, status));
+ charon->ike_sa_manager->checkin_and_delete(charon->ike_sa_manager, ike_sa);
+ return;
+ }
+
+ this->worker_logger->log(this->worker_logger, CONTROL|LEVEL3, "Create Job to delete half open IKE_SA.");
+ this->create_delete_half_open_ike_sa_job(this,ike_sa->get_id(ike_sa),
+ charon->configuration->get_half_open_ike_sa_timeout(charon->configuration));
+
+ this->worker_logger->log(this->worker_logger, CONTROL|LEVEL2, "Checking in IKE SA");
+ status = charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
+ if (status != SUCCESS)
+ {
+ this->worker_logger->log(this->worker_logger, ERROR, "Could not checkin IKE_SA (%s)",
+ mapping_find(status_m, status));
+ }
+}
+
+/**
+ * Implementation of private_thread_pool_t.process_delete_ike_sa_job.
+ */
+static void process_delete_half_open_ike_sa_job(private_thread_pool_t *this, delete_half_open_ike_sa_job_t *job)
+{
+ ike_sa_id_t *ike_sa_id = job->get_ike_sa_id(job);
+ ike_sa_t *ike_sa;
+ status_t status;
+ status = charon->ike_sa_manager->checkout(charon->ike_sa_manager,ike_sa_id, &ike_sa);
+ if ((status != SUCCESS) && (status != CREATED))
+ {
+ this->worker_logger->log(this->worker_logger, CONTROL | LEVEL3, "IKE SA seems to be allready deleted and so doesn't have to be deleted");
+ return;
+ }
+
+
+ switch (ike_sa->get_state(ike_sa))
+ {
+ case INITIATOR_INIT:
+ case RESPONDER_INIT:
+ case IKE_SA_INIT_REQUESTED:
+ case IKE_SA_INIT_RESPONDED:
+ case IKE_AUTH_REQUESTED:
+ {
+ /* IKE_SA is half open and gets deleted! */
+ status = charon->ike_sa_manager->checkin_and_delete(charon->ike_sa_manager, ike_sa);
+ if (status != SUCCESS)
+ {
+ this->worker_logger->log(this->worker_logger, ERROR, "Could not checkin and delete checked out IKE_SA!");
+ }
+ break;
+ }
+ default:
+ {
+ /* IKE_SA is established and so is not getting deleted! */
+ status = charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
+ if (status != SUCCESS)
+ {
+ this->worker_logger->log(this->worker_logger, ERROR, "Could not checkin a checked out IKE_SA!");
+ }
+ break;
+ }
+ }
+}
+
+/**
+ * Implementation of private_thread_pool_t.process_delete_established_ike_sa_job.
+ */
+static void process_delete_established_ike_sa_job(private_thread_pool_t *this, delete_established_ike_sa_job_t *job)
+{
+ ike_sa_id_t *ike_sa_id = job->get_ike_sa_id(job);
+ ike_sa_t *ike_sa;
+ status_t status;
+ status = charon->ike_sa_manager->checkout(charon->ike_sa_manager,ike_sa_id, &ike_sa);
+ if ((status != SUCCESS) && (status != CREATED))
+ {
+ this->worker_logger->log(this->worker_logger, CONTROL | LEVEL3, "IKE SA seems to be allready deleted and so doesn't have to be deleted");
+ return;
+ }
+
+ switch (ike_sa->get_state(ike_sa))
+ {
+ case INITIATOR_INIT:
+ case RESPONDER_INIT:
+ case IKE_SA_INIT_REQUESTED:
+ case IKE_SA_INIT_RESPONDED:
+ case IKE_AUTH_REQUESTED:
+ {
+ break;
+ }
+ default:
+ {
+ this->worker_logger->log(this->worker_logger, CONTROL, "Send delete request for IKE_SA.");
+ ike_sa->send_delete_ike_sa_request(ike_sa);
+ break;
+ }
+ }
+ this->worker_logger->log(this->worker_logger, CONTROL, "Delete established IKE_SA.");
+ status = charon->ike_sa_manager->checkin_and_delete(charon->ike_sa_manager, ike_sa);
+ if (status != SUCCESS)
+ {
+ this->worker_logger->log(this->worker_logger, ERROR, "Could not checkin and delete checked out IKE_SA!");
+ }
+}
+
+
+/**
+ * Implementation of private_thread_pool_t.process_retransmit_request_job.
+ */
+static void process_retransmit_request_job(private_thread_pool_t *this, retransmit_request_job_t *job)
+{
+
+ ike_sa_id_t *ike_sa_id = job->get_ike_sa_id(job);
+ u_int32_t message_id = job->get_message_id(job);
+ bool stop_retransmitting = FALSE;
+ u_int32_t timeout;
+ ike_sa_t *ike_sa;
+ status_t status;
+
+ this->worker_logger->log(this->worker_logger, CONTROL|LEVEL2, "Checking out IKE SA %lld:%lld, role %s",
+ ike_sa_id->get_initiator_spi(ike_sa_id),
+ ike_sa_id->get_responder_spi(ike_sa_id),
+ ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
+
+ status = charon->ike_sa_manager->checkout(charon->ike_sa_manager,ike_sa_id, &ike_sa);
+ if ((status != SUCCESS) && (status != CREATED))
+ {
+ job->destroy(job);
+ this->worker_logger->log(this->worker_logger, ERROR, "IKE SA could not be checked out. Allready deleted?");
+ return;
+ }
+
+ status = ike_sa->retransmit_request(ike_sa, message_id);
+
+ if (status != SUCCESS)
+ {
+ this->worker_logger->log(this->worker_logger, CONTROL | LEVEL3, "Message doesn't have to be retransmitted");
+ stop_retransmitting = TRUE;
+ }
+
+ this->worker_logger->log(this->worker_logger, CONTROL|LEVEL2, "Checkin IKE SA %lld:%lld, role %s",
+ ike_sa_id->get_initiator_spi(ike_sa_id),
+ ike_sa_id->get_responder_spi(ike_sa_id),
+ ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
+
+ status = charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
+ if (status != SUCCESS)
+ {
+ this->worker_logger->log(this->worker_logger, ERROR, "Checkin of IKE SA failed!");
+ }
+
+ if (stop_retransmitting)
+ {
+ job->destroy(job);
+ return;
+ }
+
+ job->increase_retransmit_count(job);
+ status = charon->configuration->get_retransmit_timeout (charon->configuration,job->get_retransmit_count(job),&timeout);
+ if (status != SUCCESS)
+ {
+ this->worker_logger->log(this->worker_logger, CONTROL | LEVEL2, "Message will not be anymore retransmitted");
+ job->destroy(job);
+ /*
+ * TODO delete IKE_SA ?
+ */
+ return;
+ }
+ charon->event_queue->add_relative(charon->event_queue,(job_t *) job,timeout);
+}
+
+
+
+/**
+ * Implementation of private_thread_pool_t.create_delete_half_open_ike_sa_job.
+ */
+static void create_delete_half_open_ike_sa_job(private_thread_pool_t *this,ike_sa_id_t *ike_sa_id, u_int32_t delay)
+{
+ job_t *delete_job;
+
+ this->worker_logger->log(this->worker_logger, CONTROL | LEVEL2, "Going to create job to delete half open IKE_SA in %d ms", delay);
+
+ delete_job = (job_t *) delete_half_open_ike_sa_job_create(ike_sa_id);
+ charon->event_queue->add_relative(charon->event_queue,delete_job, delay);
+}
+
+
+/**
+ * Implementation of thread_pool_t.get_pool_size.
+ */
+static size_t get_pool_size(private_thread_pool_t *this)
+{
+ return this->pool_size;
+}
+
+/**
+ * Implementation of thread_pool_t.destroy.
+ */
+static void destroy(private_thread_pool_t *this)
+{
+ int current;
+ /* flag thread for termination */
+ for (current = 0; current < this->pool_size; current++) {
+ this->pool_logger->log(this->pool_logger, CONTROL, "cancelling worker thread #%d", current+1);
+ pthread_cancel(this->threads[current]);
+ }
+
+ /* wait for all threads */
+ for (current = 0; current < this->pool_size; current++) {
+ if (pthread_join(this->threads[current], NULL) == 0)
+ {
+ this->pool_logger->log(this->pool_logger, CONTROL, "worker thread #%d terminated", current+1);
+ }
+ else
+ {
+ this->pool_logger->log(this->pool_logger, ERROR, "could not terminate worker thread #%d", current+1);
+ }
+ }
+
+ /* free mem */
+ free(this->threads);
+ free(this);
+}
+
+/*
+ * Described in header.
+ */
+thread_pool_t *thread_pool_create(size_t pool_size)
+{
+ int current;
+
+ private_thread_pool_t *this = malloc_thing(private_thread_pool_t);
+
+ /* fill in public fields */
+ this->public.destroy = (void(*)(thread_pool_t*))destroy;
+ this->public.get_pool_size = (size_t(*)(thread_pool_t*))get_pool_size;
+
+ this->process_jobs = process_jobs;
+ this->process_initiate_ike_sa_job = process_initiate_ike_sa_job;
+ this->process_delete_half_open_ike_sa_job = process_delete_half_open_ike_sa_job;
+ this->process_delete_established_ike_sa_job = process_delete_established_ike_sa_job;
+ this->process_incoming_packet_job = process_incoming_packet_job;
+ this->process_retransmit_request_job = process_retransmit_request_job;
+ this->create_delete_half_open_ike_sa_job = create_delete_half_open_ike_sa_job;
+
+ this->pool_size = pool_size;
+
+ this->threads = malloc(sizeof(pthread_t) * pool_size);
+
+ this->pool_logger = logger_manager->get_logger(logger_manager, THREAD_POOL);
+
+ this->worker_logger = logger_manager->get_logger(logger_manager, WORKER);
+
+ /* try to create as many threads as possible, up tu pool_size */
+ for (current = 0; current < pool_size; current++)
+ {
+ if (pthread_create(&(this->threads[current]), NULL, (void*(*)(void*))this->process_jobs, this) == 0)
+ {
+ this->pool_logger->log(this->pool_logger, CONTROL, "Created worker thread #%d", current+1);
+ }
+ else
+ {
+ /* creation failed, is it the first one? */
+ if (current == 0)
+ {
+ this->pool_logger->log(this->pool_logger, ERROR, "Could not create any thread");
+ free(this->threads);
+ free(this);
+ return NULL;
+ }
+ /* not all threads could be created, but at least one :-/ */
+ this->pool_logger->log(this->pool_logger, ERROR, "Could only create %d from requested %d threads!", current, pool_size);
+
+ this->pool_size = current;
+ return (thread_pool_t*)this;
+ }
+ }
+ return (thread_pool_t*)this;
+}