diff options
Diffstat (limited to 'Source/charon/threads')
-rw-r--r-- | Source/charon/threads/receiver.c | 132 | ||||
-rw-r--r-- | Source/charon/threads/receiver.h | 47 | ||||
-rw-r--r-- | Source/charon/threads/scheduler.c | 120 | ||||
-rw-r--r-- | Source/charon/threads/scheduler.h | 50 | ||||
-rw-r--r-- | Source/charon/threads/sender.c | 127 | ||||
-rw-r--r-- | Source/charon/threads/sender.h | 47 | ||||
-rw-r--r-- | Source/charon/threads/thread_pool.c | 378 | ||||
-rw-r--r-- | Source/charon/threads/thread_pool.h | 69 |
8 files changed, 970 insertions, 0 deletions
diff --git a/Source/charon/threads/receiver.c b/Source/charon/threads/receiver.c new file mode 100644 index 000000000..ba7e229b0 --- /dev/null +++ b/Source/charon/threads/receiver.c @@ -0,0 +1,132 @@ +/** + * @file receiver.c + * + * @brief Implements the Receiver Thread encapsulated in the receiver_t object + * + */ + +/* + * Copyright (C) 2005 Jan Hutter, Martin Willi + * Hochschule fuer Technik Rapperswil + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +#include <stdlib.h> +#include <pthread.h> + +#include "receiver.h" + +#include <globals.h> +#include <network/socket.h> +#include <network/packet.h> +#include <queues/job_queue.h> +#include <queues/jobs/job.h> +#include <utils/allocator.h> +#include <utils/logger_manager.h> + +/** + * Private data of a receiver object + */ +typedef struct private_receiver_s private_receiver_t; + +struct private_receiver_s { + /** + * Public part of a receiver object + */ + receiver_t public; + + /** + * Assigned thread to the receiver_t object + */ + pthread_t assigned_thread; + /** + * logger for the receiver + */ + logger_t *logger; + + +}; + +/** + * Thread function started at creation of the receiver object + * + * @param this assigned receiver object + * @return SUCCESS if thread_function ended successfully, FAILED otherwise + */ +static void receiver_thread_function(private_receiver_t * this) +{ + packet_t * current_packet; + job_t *current_job; + /* cancellation disabled by default */ + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); + while (1) + { + while (global_socket->receive(global_socket,¤t_packet) == SUCCESS) + { + this->logger->log(this->logger, CONTROL, "creating job from packet"); + current_job = (job_t *) incoming_packet_job_create(current_packet); + if (current_job == NULL) + { + this->logger->log(this->logger, ERROR, "job creation failed"); + } + + if (global_job_queue->add(global_job_queue,current_job) != SUCCESS) + { + this->logger->log(this->logger, ERROR, "job queueing failed"); + } + + } + /* bad bad, rebuild the socket ? */ + this->logger->log(this->logger, ERROR, "receiving from socket failed!"); + } +} + +/** + * Implementation of receiver_t's destroy function + */ +static status_t destroy(private_receiver_t *this) +{ + this->logger->log(this->logger, CONTROL | MORE, "Going to terminate receiver thread"); + pthread_cancel(this->assigned_thread); + + pthread_join(this->assigned_thread, NULL); + this->logger->log(this->logger, CONTROL | MORE, "Receiver thread terminated"); + + global_logger_manager->destroy_logger(global_logger_manager, this->logger); + + allocator_free(this); + return SUCCESS; +} + + +receiver_t * receiver_create() +{ + private_receiver_t *this = allocator_alloc_thing(private_receiver_t); + + this->public.destroy = (status_t(*)(receiver_t*)) destroy; + + this->logger = global_logger_manager->create_logger(global_logger_manager, RECEIVER, NULL); + if (this->logger == NULL) + { + allocator_free(this); + } + + if (pthread_create(&(this->assigned_thread), NULL, (void*(*)(void*))receiver_thread_function, this) != 0) + { + /* thread could not be created */ + global_logger_manager->destroy_logger(global_logger_manager, this->logger); + allocator_free(this); + return NULL; + } + + return &(this->public); +} diff --git a/Source/charon/threads/receiver.h b/Source/charon/threads/receiver.h new file mode 100644 index 000000000..49f71be40 --- /dev/null +++ b/Source/charon/threads/receiver.h @@ -0,0 +1,47 @@ +/** + * @file receiver.h + * + * @brief Implements the Receiver Thread encapsulated in the receiver_t object + * + */ + +/* + * Copyright (C) 2005 Jan Hutter, Martin Willi + * Hochschule fuer Technik Rapperswil + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +#ifndef RECEIVER_H_ +#define RECEIVER_H_ + +#include <types.h> + +/** + * @brief A Receiver object which receives packets on the socket and adds them to the job-queue + */ +typedef struct receiver_s receiver_t; + +struct receiver_s { + + /** + * @brief Destroys a receiver object + * + * @param receiver receiver object + * @return SUCCESSFUL if succeeded, FAILED otherwise + */ + status_t (*destroy) (receiver_t *receiver); +}; + + +receiver_t * receiver_create(); + +#endif /*RECEIVER_H_*/ diff --git a/Source/charon/threads/scheduler.c b/Source/charon/threads/scheduler.c new file mode 100644 index 000000000..d7f0694e7 --- /dev/null +++ b/Source/charon/threads/scheduler.c @@ -0,0 +1,120 @@ +/** + * @file scheduler.c + * + * @brief implements the scheduler, looks for jobs in event-queue + * + */ + +/* + * Copyright (C) 2005 Jan Hutter, Martin Willi + * Hochschule fuer Technik Rapperswil + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +#include <stdlib.h> +#include <pthread.h> + +#include "scheduler.h" + +#include <globals.h> +#include <definitions.h> +#include <utils/allocator.h> +#include <utils/logger_manager.h> +#include <queues/job_queue.h> + +/** + * Private data of a scheduler object + */ +typedef struct private_scheduler_s private_scheduler_t; + +struct private_scheduler_s { + /** + * Public part of a scheduler object + */ + scheduler_t public; + + /** + * Assigned thread to the scheduler_t object + */ + pthread_t assigned_thread; + + /** + * logger for this scheduler + */ + logger_t *logger; + +}; + +/** + * Thread function started at creation of the scheduler object + * + * @param this assigned scheduler object + * @return SUCCESS if thread_function ended successfully, FAILED otherwise + */ +static void scheduler_thread_function(private_scheduler_t * this) +{ + /* cancellation disabled by default */ + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); + job_t *current_job; + + for (;;) + { + this->logger->log(this->logger, CONTROL|MORE, "waiting for next event..."); + /* get a job, this block until one is available */ + global_event_queue->get(global_event_queue, ¤t_job); + /* queue the job in the job queue, workers will eat them */ + global_job_queue->add(global_job_queue, current_job); + this->logger->log(this->logger, CONTROL, "got event, added job %s to job-queue.", mapping_find(job_type_m, current_job->get_type(current_job))); + } +} + +/** + * Implementation of scheduler_t's destroy function + */ +static status_t destroy(private_scheduler_t *this) +{ + this->logger->log(this->logger, CONTROL | MORE, "Going to terminate scheduler thread"); + pthread_cancel(this->assigned_thread); + + pthread_join(this->assigned_thread, NULL); + this->logger->log(this->logger, CONTROL | MORE, "Scheduler thread terminated"); + + global_logger_manager->destroy_logger(global_logger_manager, this->logger); + + allocator_free(this); + return SUCCESS; +} + + +scheduler_t * scheduler_create() +{ + private_scheduler_t *this = allocator_alloc_thing(private_scheduler_t); + + this->public.destroy = (status_t(*)(scheduler_t*)) destroy; + + this->logger = global_logger_manager->create_logger(global_logger_manager, SCHEDULER, NULL); + if (this->logger == NULL) + { + allocator_free(this); + return NULL; + } + + if (pthread_create(&(this->assigned_thread), NULL, (void*(*)(void*))scheduler_thread_function, this) != 0) + { + /* thread could not be created */ + global_logger_manager->destroy_logger(global_logger_manager, this->logger); + allocator_free(this); + return NULL; + } + + return &(this->public); +} diff --git a/Source/charon/threads/scheduler.h b/Source/charon/threads/scheduler.h new file mode 100644 index 000000000..8aa8fbbef --- /dev/null +++ b/Source/charon/threads/scheduler.h @@ -0,0 +1,50 @@ +/** + * @file scheduler.h + * + * @brief implements the scheduler, looks for jobs in event-queue + * + */ + +/* + * Copyright (C) 2005 Jan Hutter, Martin Willi + * Hochschule fuer Technik Rapperswil + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +#ifndef SCHEDULER_H_ +#define SCHEDULER_H_ + +#include <types.h> + +/** + * @brief The scheduler, looks for timed events in event-queue and adds them + * to the job-queue. + * + * Starts a thread which does the work, since event-queue is blocking + */ +typedef struct scheduler_s scheduler_t; + +struct scheduler_s { + + /** + * @brief Destroys a scheduler object + * + * @param scheduler scheduler object + * @return SUCCESSFUL if succeeded, FAILED otherwise + */ + status_t (*destroy) (scheduler_t *scheduler); +}; + + +scheduler_t * scheduler_create(); + +#endif /*SCHEDULER_H_*/ diff --git a/Source/charon/threads/sender.c b/Source/charon/threads/sender.c new file mode 100644 index 000000000..d2376962f --- /dev/null +++ b/Source/charon/threads/sender.c @@ -0,0 +1,127 @@ +/** + * @file sender.c + * + * @brief Implements the Sender Thread encapsulated in the sender_t object + * + */ + +/* + * Copyright (C) 2005 Jan Hutter, Martin Willi + * Hochschule fuer Technik Rapperswil + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +#include <stdlib.h> +#include <pthread.h> + +#include "sender.h" + +#include <globals.h> +#include <network/socket.h> +#include <network/packet.h> +#include <queues/send_queue.h> +#include <utils/allocator.h> +#include <utils/logger_manager.h> + +/** + * Private data of a sender object + */ +typedef struct private_sender_s private_sender_t; + +struct private_sender_s { + /** + * Public part of a sender object + */ + sender_t public; + + /** + * Assigned thread to the sender_t object + */ + pthread_t assigned_thread; + + /** + * logger for this sender + */ + logger_t *logger; + +}; + +/** + * Thread function started at creation of the sender object + * + * @param this assigned sender object + * @return SUCCESS if thread_function ended successfully, FAILED otherwise + */ +static void sender_thread_function(private_sender_t * this) +{ + packet_t * current_packet; + status_t status; + + /* cancellation disabled by default */ + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); + + while (1) + { + while (global_send_queue->get(global_send_queue,¤t_packet) == SUCCESS) + { + this->logger->log(this->logger, CONTROL|MORE, "got a packet, sending it"); + status = global_socket->send(global_socket,current_packet); + if (status != SUCCESS) + { + this->logger->log(this->logger, ERROR, "sending failed, socket returned %s", + mapping_find(status_m, status)); + } + current_packet->destroy(current_packet); + } + } +} + +/** + * Implementation of sender_t's destroy function + */ +static status_t destroy(private_sender_t *this) +{ + this->logger->log(this->logger, CONTROL | MORE, "Going to terminate sender thread"); + pthread_cancel(this->assigned_thread); + + pthread_join(this->assigned_thread, NULL); + this->logger->log(this->logger, CONTROL | MORE, "Sender thread terminated"); + + global_logger_manager->destroy_logger(global_logger_manager, this->logger); + + allocator_free(this); + return SUCCESS; +} + + +sender_t * sender_create() +{ + private_sender_t *this = allocator_alloc_thing(private_sender_t); + + this->public.destroy = (status_t(*)(sender_t*)) destroy; + + this->logger = global_logger_manager->create_logger(global_logger_manager, SENDER, NULL); + if (this->logger == NULL) + { + allocator_free(this); + return NULL; + } + + if (pthread_create(&(this->assigned_thread), NULL, (void*(*)(void*))sender_thread_function, this) != 0) + { + /* thread could not be created */ + allocator_free(this); + return NULL; + } + + return &(this->public); +} diff --git a/Source/charon/threads/sender.h b/Source/charon/threads/sender.h new file mode 100644 index 000000000..386e429ee --- /dev/null +++ b/Source/charon/threads/sender.h @@ -0,0 +1,47 @@ +/** + * @file sender.h + * + * @brief Implements the Sender Thread encapsulated in the sender_t object + * + */ + +/* + * Copyright (C) 2005 Jan Hutter, Martin Willi + * Hochschule fuer Technik Rapperswil + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +#ifndef SENDER_H_ +#define SENDER_H_ + +#include <types.h> + +/** + * @brief A Sender object which sends packets on the socket + */ +typedef struct sender_s sender_t; + +struct sender_s { + + /** + * @brief Destroys a sender object + * + * @param sender sender object + * @return SUCCESSFUL if succeeded, FAILED otherwise + */ + status_t (*destroy) (sender_t *sender); +}; + + +sender_t * sender_create(); + +#endif /*SENDER_H_*/ diff --git a/Source/charon/threads/thread_pool.c b/Source/charon/threads/thread_pool.c new file mode 100644 index 000000000..037222810 --- /dev/null +++ b/Source/charon/threads/thread_pool.c @@ -0,0 +1,378 @@ +/** + * @file thread_pool.c + * + * @brief Thread pool with some threads processing the job_queue. + * + */ + +/* + * Copyright (C) 2005 Jan Hutter, Martin Willi + * Hochschule fuer Technik Rapperswil + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +#include <stdlib.h> +#include <pthread.h> +#include <string.h> +#include <errno.h> + +#include "thread_pool.h" + +#include <globals.h> +#include <queues/job_queue.h> +#include <queues/jobs/delete_ike_sa_job.h> +#include <queues/jobs/incoming_packet_job.h> +#include <queues/jobs/initiate_ike_sa_job.h> +#include <utils/allocator.h> +#include <utils/logger.h> + +/** + * @brief structure with private members for thread_pool_t + */ +typedef struct private_thread_pool_s private_thread_pool_t; + +struct private_thread_pool_s { + /** + * inclusion of public members + */ + thread_pool_t public; + /** + * @brief Processing function of a worker thread + * + * @param this private_thread_pool_t-Object + */ + void (*function) (private_thread_pool_t *this); + /** + * number of running threads + */ + size_t pool_size; + /** + * array of thread ids + */ + pthread_t *threads; + /** + * logger of the threadpool + */ + logger_t *pool_logger; + /** + * logger of the threadpool + */ + logger_t *worker_logger; +} ; + + + +/** + * implements private_thread_pool_t.function + */ +static void job_processing(private_thread_pool_t *this) +{ + + /* cancellation disabled by default */ + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); + this->worker_logger->log(this->worker_logger, CONTROL, "started working"); + + for (;;) { + job_t *job; + job_type_t job_type; + + global_job_queue->get(global_job_queue, &job); + job_type = job->get_type(job); + this->worker_logger->log(this->worker_logger, CONTROL|MORE, "got a job of type %s", mapping_find(job_type_m,job_type)); + + /* process them here */ + switch (job_type) + { + case INCOMING_PACKET: + { + packet_t *packet; + message_t *message; + ike_sa_t *ike_sa; + ike_sa_id_t *ike_sa_id; + status_t status; + incoming_packet_job_t *incoming_packet_job = (incoming_packet_job_t *)job; + + + if (incoming_packet_job->get_packet(incoming_packet_job,&packet) != SUCCESS) + { + this->worker_logger->log(this->worker_logger, ERROR, "packet in job %s could not be retrieved!", + mapping_find(job_type_m,job_type)); + break; + } + + message = message_create_from_packet(packet); + if (message == NULL) + { + this->worker_logger->log(this->worker_logger, ERROR, "message could not be created from packet!", + mapping_find(job_type_m,job_type)); + packet->destroy(packet); + break; + } + + status = message->parse_header(message); + if (status != SUCCESS) + { + this->worker_logger->log(this->worker_logger, ERROR, "message header could not be verified!"); + message->destroy(message); + break; + } + + this->worker_logger->log(this->worker_logger, CONTROL|MOST, "message is a %s %s", + mapping_find(exchange_type_m, message->get_exchange_type(message)), + message->get_request(message) ? "request" : "reply"); + + if ((message->get_major_version(message) != IKE_MAJOR_VERSION) || + (message->get_minor_version(message) != IKE_MINOR_VERSION)) + { + this->worker_logger->log(this->worker_logger, ERROR, "IKE version %d.%d not supported", + message->get_major_version(message), + message->get_minor_version(message)); + /* Todo send notify */ + } + + status = message->get_ike_sa_id(message, &ike_sa_id); + if (status != SUCCESS) + { + this->worker_logger->log(this->worker_logger, ERROR, "IKE SA ID of message could not be created!"); + message->destroy(message); + break; + } + + ike_sa_id->switch_initiator(ike_sa_id); + + this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking out IKE SA %lld:%lld, role %s", + ike_sa_id->get_initiator_spi(ike_sa_id), + ike_sa_id->get_responder_spi(ike_sa_id), + ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder"); + + status = global_ike_sa_manager->checkout(global_ike_sa_manager,ike_sa_id, &ike_sa); + if (status != SUCCESS) + { + this->worker_logger->log(this->worker_logger, ERROR, "IKE SA could not be checked out"); + ike_sa_id->destroy(ike_sa_id); + message->destroy(message); + break; + } + + status = ike_sa->process_message(ike_sa, message); + if (status != SUCCESS) + { + this->worker_logger->log(this->worker_logger, ERROR, "message could not be processed by IKE SA"); + } + + this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking in IKE SA %lld:%lld, role %s", + ike_sa_id->get_initiator_spi(ike_sa_id), + ike_sa_id->get_responder_spi(ike_sa_id), + ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder"); + ike_sa_id->destroy(ike_sa_id); + + status = global_ike_sa_manager->checkin(global_ike_sa_manager, ike_sa); + if (status != SUCCESS) + { + this->worker_logger->log(this->worker_logger, ERROR, "checkin of IKE SA failed"); + } + message->destroy(message); + break; + } + case INITIATE_IKE_SA: + { + /* + * Initiatie an IKE_SA: + * - is defined by a name of a configuration + * - create an empty IKE_SA via manager + * - call initiate_connection on this sa + */ + initiate_ike_sa_job_t *initiate_job; + ike_sa_id_t *ike_sa_id; + ike_sa_t *ike_sa; + status_t status; + + initiate_job = (initiate_ike_sa_job_t *)job; + + ike_sa_id = ike_sa_id_create(0, 0, TRUE); + if (ike_sa_id == NULL) + { + this->worker_logger->log(this->worker_logger, ERROR, "%s by creating ike_sa_id_t, job rejected.", + mapping_find(status_m, status)); + break; + } + + this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking out IKE SA %lld:%lld, role %s", + ike_sa_id->get_initiator_spi(ike_sa_id), + ike_sa_id->get_responder_spi(ike_sa_id), + ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder"); + + status = global_ike_sa_manager->checkout(global_ike_sa_manager, ike_sa_id, &ike_sa); + ike_sa_id->destroy(ike_sa_id); + if (status != SUCCESS) + { + this->worker_logger->log(this->worker_logger, ERROR, "%s by checking out new IKE_SA, job rejected.", + mapping_find(status_m, status)); + break; + } + + + this->worker_logger->log(this->worker_logger, CONTROL|MOST, "initializing connection \"%s\"", + initiate_job->get_configuration_name(initiate_job)); + status = ike_sa->initialize_connection(ike_sa, initiate_job->get_configuration_name(initiate_job)); + if (status != SUCCESS) + { + this->worker_logger->log(this->worker_logger, ERROR, "%s by initialize_conection, job and rejected, IKE_SA deleted.", + mapping_find(status_m, status)); + global_ike_sa_manager->checkin_and_delete(global_ike_sa_manager, ike_sa); + break; + } + + this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking in IKE SA"); + status = global_ike_sa_manager->checkin(global_ike_sa_manager, ike_sa); + if (status != SUCCESS) + { + this->worker_logger->log(this->worker_logger, ERROR, "%s could not checkin IKE_SA.", + mapping_find(status_m, status)); + } + break; + } + case RETRANSMIT_REQUEST: + { + this->worker_logger->log(this->worker_logger, ERROR, "job of type %s not supported!", mapping_find(job_type_m,job_type)); + break; + } + + case DELETE_IKE_SA: + { + delete_ike_sa_job_t *delete_ike_sa_job = (delete_ike_sa_job_t*) job; + ike_sa_id_t *ike_sa_id = delete_ike_sa_job->get_ike_sa_id(delete_ike_sa_job); + status_t status; + + + this->worker_logger->log(this->worker_logger, CONTROL|MOST, "deleting IKE SA %lld:%lld, role %s", + ike_sa_id->get_initiator_spi(ike_sa_id), + ike_sa_id->get_responder_spi(ike_sa_id), + ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder"); + + status = global_ike_sa_manager->delete(global_ike_sa_manager, ike_sa_id); + if (status != SUCCESS) + { + this->worker_logger->log(this->worker_logger, ERROR, "could not delete IKE_SA (%s)", + mapping_find(status_m, status)); + } + break; + + } + } + job->destroy(job); + } + +} + +/** + * implementation of thread_pool_t.get_pool_size + */ +static size_t get_pool_size(private_thread_pool_t *this) +{ + return this->pool_size; +} + +/** + * Implementation of thread_pool_t.destroy + */ +static status_t destroy(private_thread_pool_t *this) +{ + int current; + /* flag thread for termination */ + for (current = 0; current < this->pool_size; current++) { + this->pool_logger->log(this->pool_logger, CONTROL, "cancelling thread %u", this->threads[current]); + pthread_cancel(this->threads[current]); + } + + /* wait for all threads */ + for (current = 0; current < this->pool_size; current++) { + pthread_join(this->threads[current], NULL); + this->pool_logger->log(this->pool_logger, CONTROL, "thread %u terminated", this->threads[current]); + } + + /* free mem */ + global_logger_manager->destroy_logger(global_logger_manager, this->pool_logger); + global_logger_manager->destroy_logger(global_logger_manager, this->worker_logger); + allocator_free(this->threads); + allocator_free(this); + return SUCCESS; +} + +#include <stdio.h> + +/* + * see header + */ +thread_pool_t *thread_pool_create(size_t pool_size) +{ + int current; + + private_thread_pool_t *this = allocator_alloc_thing(private_thread_pool_t); + + /* fill in public fields */ + this->public.destroy = (status_t(*)(thread_pool_t*))destroy; + this->public.get_pool_size = (size_t(*)(thread_pool_t*))get_pool_size; + + this->function = job_processing; + this->pool_size = pool_size; + + this->threads = allocator_alloc(sizeof(pthread_t) * pool_size); + if (this->threads == NULL) + { + allocator_free(this); + return NULL; + } + this->pool_logger = global_logger_manager->create_logger(global_logger_manager,THREAD_POOL,NULL); + if (this->threads == NULL) + { + allocator_free(this); + allocator_free(this->threads); + return NULL; + } + this->worker_logger = global_logger_manager->create_logger(global_logger_manager,WORKER,NULL); + if (this->threads == NULL) + { + global_logger_manager->destroy_logger(global_logger_manager, this->pool_logger); + allocator_free(this); + allocator_free(this->threads); + return NULL; + } + + /* try to create as many threads as possible, up tu pool_size */ + for (current = 0; current < pool_size; current++) + { + if (pthread_create(&(this->threads[current]), NULL, (void*(*)(void*))this->function, this) == 0) + { + this->pool_logger->log(this->pool_logger, CONTROL, "thread %u created", this->threads[current]); + } + else + { + /* creation failed, is it the first one? */ + if (current == 0) + { + this->pool_logger->log(this->pool_logger, ERROR, "could not create any thread: %s\n", strerror(errno)); + global_logger_manager->destroy_logger(global_logger_manager, this->pool_logger); + global_logger_manager->destroy_logger(global_logger_manager, this->worker_logger); + allocator_free(this->threads); + allocator_free(this); + return NULL; + } + /* not all threads could be created, but at least one :-/ */ + this->pool_logger->log(this->pool_logger, CONTROL, "could only create %d from requested %d threads: %s\n", current, pool_size, strerror(errno)); + + this->pool_size = current; + return (thread_pool_t*)this; + } + } + return (thread_pool_t*)this; +} diff --git a/Source/charon/threads/thread_pool.h b/Source/charon/threads/thread_pool.h new file mode 100644 index 000000000..54022e4b4 --- /dev/null +++ b/Source/charon/threads/thread_pool.h @@ -0,0 +1,69 @@ +/** + * @file thread_pool.h + * + * @brief Thread pool with some threads processing the job_queue + * + */ + +/* + * Copyright (C) 2005 Jan Hutter, Martin Willi + * Hochschule fuer Technik Rapperswil + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +#ifndef THREAD_POOL_H_ +#define THREAD_POOL_H_ + +#include <stdlib.h> + +#include <types.h> + +/** + * @brief A thread_pool contains a pool of threads processing the job queue. + * + * Current implementation uses as many threads as specified in constructor. + * A more improved version would dynamically increase thread count if necessary... + */ +typedef struct thread_pool_s thread_pool_t; + +struct thread_pool_s { + /** + * @brief return currently instanciated threads + * + * @param thread_pool thread_pool_t object + * @return size of thread pool + */ + size_t (*get_pool_size) (thread_pool_t *thread_pool); + /** + * @brief destroy pool + * + * sends cancellation request to all threads and AWAITS their termination. + * + * @param thread_pool thread_pool_t object + * @return + * - SUCCESS in any case + */ + status_t (*destroy) (thread_pool_t *thread_pool); +}; + +/** + * @brief Create the thread pool using using pool_size of threads + * + * @param pool_size desired pool size + * @return + * - NULL if no threads could be created + * - thread_pool if one ore more threads could be instanciated + */ +thread_pool_t *thread_pool_create(size_t pool_size); + + +#endif /*THREAD_POOL_H_*/ |