diff options
author | Martin Willi <martin@strongswan.org> | 2005-11-25 13:42:58 +0000 |
---|---|---|
committer | Martin Willi <martin@strongswan.org> | 2005-11-25 13:42:58 +0000 |
commit | ca76df97365ec3421719bfbf9ecd3f33939ad108 (patch) | |
tree | 98f14e2944590fd642f3bfab0e03000ed64cde65 /Source/charon | |
parent | 2a336de4cd0d2f9cbb2547dbe7b4ff278446b89a (diff) | |
download | strongswan-ca76df97365ec3421719bfbf9ecd3f33939ad108.tar.bz2 strongswan-ca76df97365ec3421719bfbf9ecd3f33939ad108.tar.xz |
- documentation and cleanup of threads package
Diffstat (limited to 'Source/charon')
-rw-r--r-- | Source/charon/definitions.h | 9 | ||||
-rw-r--r-- | Source/charon/threads/receiver.c | 25 | ||||
-rw-r--r-- | Source/charon/threads/receiver.h | 30 | ||||
-rw-r--r-- | Source/charon/threads/scheduler.c | 26 | ||||
-rw-r--r-- | Source/charon/threads/scheduler.h | 26 | ||||
-rw-r--r-- | Source/charon/threads/sender.c | 27 | ||||
-rw-r--r-- | Source/charon/threads/sender.h | 23 | ||||
-rw-r--r-- | Source/charon/threads/thread_pool.c | 356 | ||||
-rw-r--r-- | Source/charon/threads/thread_pool.h | 22 |
9 files changed, 336 insertions, 208 deletions
diff --git a/Source/charon/definitions.h b/Source/charon/definitions.h index d170cef09..03fd74b29 100644 --- a/Source/charon/definitions.h +++ b/Source/charon/definitions.h @@ -110,6 +110,15 @@ * * Generic helper classes. */ + +/** + * @addtogroup threads + * + * Threaded classes, which will do their + * job alone. + */ + + /** * macro gives back larger of two values diff --git a/Source/charon/threads/receiver.c b/Source/charon/threads/receiver.c index da751dca9..b46949bf2 100644 --- a/Source/charon/threads/receiver.c +++ b/Source/charon/threads/receiver.c @@ -1,7 +1,7 @@ /** * @file receiver.c * - * @brief Implements the Receiver Thread encapsulated in the receiver_t object + * @brief Implementation of receiver_t. * */ @@ -44,6 +44,13 @@ struct private_receiver_t { * Public part of a receiver object */ receiver_t public; + + /** + * @brief Thread function started at creation of the receiver object. + * + * @param this assigned receiver object + */ + void (*receive_packets) (private_receiver_t *this); /** * Assigned thread to the receiver_t object @@ -58,12 +65,9 @@ struct private_receiver_t { }; /** - * Thread function started at creation of the receiver object - * - * @param this assigned receiver object - * @return SUCCESS if thread_function ended successfully, FAILED otherwise + * implements private_receiver_t.receive_packets */ -static void receiver_thread_function(private_receiver_t * this) +static void receive_packets(private_receiver_t * this) { packet_t * current_packet; job_t *current_job; @@ -108,12 +112,15 @@ static status_t destroy(private_receiver_t *this) return SUCCESS; } - +/* + * see header + */ receiver_t * receiver_create() { private_receiver_t *this = allocator_alloc_thing(private_receiver_t); this->public.destroy = (status_t(*)(receiver_t*)) destroy; + this->receive_packets = receive_packets; this->logger = global_logger_manager->create_logger(global_logger_manager, RECEIVER, NULL); if (this->logger == NULL) @@ -121,9 +128,9 @@ receiver_t * receiver_create() allocator_free(this); } - if (pthread_create(&(this->assigned_thread), NULL, (void*(*)(void*))receiver_thread_function, this) != 0) + if (pthread_create(&(this->assigned_thread), NULL, (void*(*)(void*))this->receive_packets, this) != 0) { - /* thread could not be created */ + this->logger->log(this->logger, ERROR, "Receiver thread could not be started"); global_logger_manager->destroy_logger(global_logger_manager, this->logger); allocator_free(this); return NULL; diff --git a/Source/charon/threads/receiver.h b/Source/charon/threads/receiver.h index 2f330adbf..9b02165d3 100644 --- a/Source/charon/threads/receiver.h +++ b/Source/charon/threads/receiver.h @@ -1,7 +1,7 @@ /** * @file receiver.h * - * @brief Implements the Receiver Thread encapsulated in the receiver_t object + * @brief Interface of receiver_t. * */ @@ -28,20 +28,38 @@ typedef struct receiver_t receiver_t; /** - * @brief A Receiver object which receives packets on the socket and adds them to the job-queue + * @brief Receives packets from the socket and adds them to the job queue. + * + * The receiver starts a thread, wich reads on the blocking socket. If + * there is data available, a packet_t is created from the data, wrapped + * in an incoming_packet_job_t and added to the job queue. + * + * @ingroup threads */ struct receiver_t { /** - * @brief Destroys a receiver object + * @brief Destroys a receiver_t * - * @param receiver receiver object - * @return SUCCESSFUL if succeeded, FAILED otherwise + * @param receiver receiver object + * @return + * - SUCCESS in any case */ status_t (*destroy) (receiver_t *receiver); }; - +/** + * @brief Create a receiver. + * + * The receiver thread will start working, get data + * from the socket and add those packets to the job queue. + * + * @return + * - created receiver_t, or + * - NULL of thread could not be started + * + * @ingroup threads + */ receiver_t * receiver_create(); #endif /*RECEIVER_H_*/ diff --git a/Source/charon/threads/scheduler.c b/Source/charon/threads/scheduler.c index 789e39aa6..8349d3611 100644 --- a/Source/charon/threads/scheduler.c +++ b/Source/charon/threads/scheduler.c @@ -1,7 +1,7 @@ /** * @file scheduler.c * - * @brief implements the scheduler, looks for jobs in event-queue + * @brief Implementation of scheduler_t. * */ @@ -41,6 +41,16 @@ struct private_scheduler_t { * Public part of a scheduler object */ scheduler_t public; + + + /** + * @brief Get events from the event queue and add them to to job queue. + * + * Thread function started at creation of the scheduler object. + * + * @param this assigned scheduler object + */ + void (*get_events) (private_scheduler_t *this); /** * Assigned thread to the scheduler_t object @@ -55,12 +65,9 @@ struct private_scheduler_t { }; /** - * Thread function started at creation of the scheduler object - * - * @param this assigned scheduler object - * @return SUCCESS if thread_function ended successfully, FAILED otherwise + * implements private_scheduler_t.get_events */ -static void scheduler_thread_function(private_scheduler_t * this) +static void get_events(private_scheduler_t * this) { /* cancellation disabled by default */ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); @@ -73,7 +80,8 @@ static void scheduler_thread_function(private_scheduler_t * this) global_event_queue->get(global_event_queue, ¤t_job); /* queue the job in the job queue, workers will eat them */ global_job_queue->add(global_job_queue, current_job); - this->logger->log(this->logger, CONTROL, "got event, added job %s to job-queue.", mapping_find(job_type_m, current_job->get_type(current_job))); + this->logger->log(this->logger, CONTROL, "got event, added job %s to job-queue.", + mapping_find(job_type_m, current_job->get_type(current_job))); } } @@ -100,6 +108,7 @@ scheduler_t * scheduler_create() private_scheduler_t *this = allocator_alloc_thing(private_scheduler_t); this->public.destroy = (status_t(*)(scheduler_t*)) destroy; + this->get_events = get_events; this->logger = global_logger_manager->create_logger(global_logger_manager, SCHEDULER, NULL); if (this->logger == NULL) @@ -108,9 +117,10 @@ scheduler_t * scheduler_create() return NULL; } - if (pthread_create(&(this->assigned_thread), NULL, (void*(*)(void*))scheduler_thread_function, this) != 0) + if (pthread_create(&(this->assigned_thread), NULL, (void*(*)(void*))this->get_events, this) != 0) { /* thread could not be created */ + this->logger->log(this->logger, ERROR, "Scheduler thread could not be created!"); global_logger_manager->destroy_logger(global_logger_manager, this->logger); allocator_free(this); return NULL; diff --git a/Source/charon/threads/scheduler.h b/Source/charon/threads/scheduler.h index 1bd9e62cd..9eeccbbf9 100644 --- a/Source/charon/threads/scheduler.h +++ b/Source/charon/threads/scheduler.h @@ -1,7 +1,7 @@ /** * @file scheduler.h * - * @brief implements the scheduler, looks for jobs in event-queue + * @brief Interface of scheduler_t. * */ @@ -31,20 +31,34 @@ typedef struct scheduler_t scheduler_t; * @brief The scheduler, looks for timed events in event-queue and adds them * to the job-queue. * - * Starts a thread which does the work, since event-queue is blocking + * Starts a thread which does the work, since event-queue is blocking. + * + * @ingroup threads */ struct scheduler_t { /** - * @brief Destroys a scheduler object + * @brief Destroys a scheduler object. * - * @param scheduler scheduler object - * @return SUCCESSFUL if succeeded, FAILED otherwise + * @param scheduler scheduler object + * @return + * - SUCCESS in any case */ status_t (*destroy) (scheduler_t *scheduler); }; - +/** + * @brief Create a scheduler with its thread. + * + * The thread will start to get jobs form the event queue + * and adds them to the job queue. + * + * @return + * - the created scheduler_t instance, or + * - NULL if thread could not be started + * + * @ingroup threads + */ scheduler_t * scheduler_create(); #endif /*SCHEDULER_H_*/ diff --git a/Source/charon/threads/sender.c b/Source/charon/threads/sender.c index 94ebda37c..5ad00e00f 100644 --- a/Source/charon/threads/sender.c +++ b/Source/charon/threads/sender.c @@ -1,7 +1,7 @@ /** * @file sender.c * - * @brief Implements the Sender Thread encapsulated in the sender_t object + * @brief Implementation of sender_t. * */ @@ -49,6 +49,13 @@ struct private_sender_t { pthread_t assigned_thread; /** + * @brief The threads function, sends out packets. + * + * @param this assigned sender object + */ + void (*send_packets) (private_sender_t * this); + + /** * logger for this sender */ logger_t *logger; @@ -56,12 +63,9 @@ struct private_sender_t { }; /** - * Thread function started at creation of the sender object - * - * @param this assigned sender object - * @return SUCCESS if thread_function ended successfully, FAILED otherwise + * implements private_sender_t.send_packets */ -static void sender_thread_function(private_sender_t * this) +static void send_packets(private_sender_t * this) { packet_t * current_packet; status_t status; @@ -86,7 +90,7 @@ static void sender_thread_function(private_sender_t * this) } /** - * Implementation of sender_t's destroy function + * implements sender_t.destroy */ static status_t destroy(private_sender_t *this) { @@ -102,11 +106,14 @@ static status_t destroy(private_sender_t *this) return SUCCESS; } - +/* + * see header + */ sender_t * sender_create() { private_sender_t *this = allocator_alloc_thing(private_sender_t); + this->send_packets = send_packets; this->public.destroy = (status_t(*)(sender_t*)) destroy; this->logger = global_logger_manager->create_logger(global_logger_manager, SENDER, NULL); @@ -116,9 +123,9 @@ sender_t * sender_create() return NULL; } - if (pthread_create(&(this->assigned_thread), NULL, (void*(*)(void*))sender_thread_function, this) != 0) + if (pthread_create(&(this->assigned_thread), NULL, (void*(*)(void*))this->send_packets, this) != 0) { - /* thread could not be created */ + this->logger->log(this->logger, ERROR, "Sender thread could not be created"); allocator_free(this); return NULL; } diff --git a/Source/charon/threads/sender.h b/Source/charon/threads/sender.h index c57213ce3..5ddad80d5 100644 --- a/Source/charon/threads/sender.h +++ b/Source/charon/threads/sender.h @@ -1,7 +1,7 @@ /** * @file sender.h * - * @brief Implements the Sender Thread encapsulated in the sender_t object + * @brief Interface of sender_t. * */ @@ -28,20 +28,35 @@ typedef struct sender_t sender_t; /** - * @brief A Sender object which sends packets on the socket + * @brief Sends packets over the socket. + * + * @ingroup threads */ struct sender_t { /** * @brief Destroys a sender object * - * @param sender sender object - * @return SUCCESSFUL if succeeded, FAILED otherwise + * @param sender sender object + * @return + * - SUCCESS in any case */ status_t (*destroy) (sender_t *sender); }; +/** + * @brief Create the sender thread. + * + * The thread will start to work, getting packets + * from the send queue and sends them out. + * + * @return + * - created sender_t, or + * - NULL of thread could not be started + * + * @ingroup threads + */ sender_t * sender_create(); #endif /*SENDER_H_*/ diff --git a/Source/charon/threads/thread_pool.c b/Source/charon/threads/thread_pool.c index 1ed3a20b5..ee8d50bc7 100644 --- a/Source/charon/threads/thread_pool.c +++ b/Source/charon/threads/thread_pool.c @@ -1,7 +1,7 @@ /** * @file thread_pool.c * - * @brief Thread pool with some threads processing the job_queue. + * @brief Implementation of thread_pool_t. * */ @@ -38,19 +38,45 @@ typedef struct private_thread_pool_t private_thread_pool_t; /** - * @brief structure with private members for thread_pool_t + * @brief Structure with private members for thread_pool_t. */ struct private_thread_pool_t { /** * inclusion of public members */ thread_pool_t public; + + /** + * @brief Main processing functino for worker threads. + * + * Gets a job from the job queue and calls corresponding + * function for processing. + * + * @param this private_thread_pool_t-Object + */ + void (*process_jobs) (private_thread_pool_t *this); + + /** + * @brief Process a INCOMING_PACKET_JOB. + * + * @param this private_thread_pool_t-Object + */ + void (*process_incoming_packet_job) (private_thread_pool_t *this, incoming_packet_job_t *job); + + /** + * @brief Process a INITIATE_IKE_SA_JOB. + * + * @param this private_thread_pool_t-Object + */ + void (*process_initiate_ike_sa_job) (private_thread_pool_t *this, initiate_ike_sa_job_t *job); + /** - * @brief Processing function of a worker thread + * @brief Process a DELETE_IKE_SA_JOB. * * @param this private_thread_pool_t-Object */ - void (*function) (private_thread_pool_t *this); + void (*process_delete_ike_sa_job) (private_thread_pool_t *this, delete_ike_sa_job_t *job); + /** * number of running threads */ @@ -64,7 +90,7 @@ struct private_thread_pool_t { */ logger_t *pool_logger; /** - * logger of the threadpool + * logger of the worker threads */ logger_t *worker_logger; } ; @@ -74,9 +100,8 @@ struct private_thread_pool_t { /** * implements private_thread_pool_t.function */ -static void job_processing(private_thread_pool_t *this) +static void process_jobs(private_thread_pool_t *this) { - /* cancellation disabled by default */ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); this->worker_logger->log(this->worker_logger, CONTROL, "started working"); @@ -87,180 +112,196 @@ static void job_processing(private_thread_pool_t *this) global_job_queue->get(global_job_queue, &job); job_type = job->get_type(job); - this->worker_logger->log(this->worker_logger, CONTROL|MORE, "got a job of type %s", mapping_find(job_type_m,job_type)); + this->worker_logger->log(this->worker_logger, CONTROL|MORE, "got a job of type %s", + mapping_find(job_type_m,job_type)); - /* process them here */ switch (job_type) { case INCOMING_PACKET: { - packet_t *packet; - message_t *message; - ike_sa_t *ike_sa; - ike_sa_id_t *ike_sa_id; - status_t status; - incoming_packet_job_t *incoming_packet_job = (incoming_packet_job_t *)job; + this->process_incoming_packet_job(this, (incoming_packet_job_t*)job); + break; + } + case INITIATE_IKE_SA: + { + this->process_initiate_ike_sa_job(this, (initiate_ike_sa_job_t*)job); + break; + } + case DELETE_IKE_SA: + { + this->process_delete_ike_sa_job(this, (delete_ike_sa_job_t*)job); + break; + } + default: + { + this->worker_logger->log(this->worker_logger, ERROR, "job of type %s not supported!", + mapping_find(job_type_m,job_type)); + break; + } + } + job->destroy(job); + } +} + +/** + * implementation of private_thread_pool_t.process_incoming_packet_job + */ +void process_incoming_packet_job(private_thread_pool_t *this, incoming_packet_job_t *job) +{ + packet_t *packet; + message_t *message; + ike_sa_t *ike_sa; + ike_sa_id_t *ike_sa_id; + status_t status; - if (incoming_packet_job->get_packet(incoming_packet_job,&packet) != SUCCESS) - { - this->worker_logger->log(this->worker_logger, ERROR, "packet in job %s could not be retrieved!", - mapping_find(job_type_m,job_type)); - break; - } + if (job->get_packet(job,&packet) != SUCCESS) + { + this->worker_logger->log(this->worker_logger, ERROR, "packet in job could not be retrieved!"); + return; + } - message = message_create_from_packet(packet); - if (message == NULL) - { - this->worker_logger->log(this->worker_logger, ERROR, "message could not be created from packet!", - mapping_find(job_type_m,job_type)); - packet->destroy(packet); - break; - } + message = message_create_from_packet(packet); + if (message == NULL) + { + this->worker_logger->log(this->worker_logger, ERROR, "message could not be created from packet!"); + packet->destroy(packet); + return; + } - status = message->parse_header(message); - if (status != SUCCESS) - { - this->worker_logger->log(this->worker_logger, ERROR, "message header could not be verified!"); - message->destroy(message); - break; - } + status = message->parse_header(message); + if (status != SUCCESS) + { + this->worker_logger->log(this->worker_logger, ERROR, "message header could not be verified!"); + message->destroy(message); + return; + } - this->worker_logger->log(this->worker_logger, CONTROL|MOST, "message is a %s %s", - mapping_find(exchange_type_m, message->get_exchange_type(message)), - message->get_request(message) ? "request" : "reply"); + this->worker_logger->log(this->worker_logger, CONTROL|MOST, "message is a %s %s", + mapping_find(exchange_type_m, message->get_exchange_type(message)), + message->get_request(message) ? "request" : "reply"); - if ((message->get_major_version(message) != IKE_MAJOR_VERSION) || - (message->get_minor_version(message) != IKE_MINOR_VERSION)) - { - this->worker_logger->log(this->worker_logger, ERROR, "IKE version %d.%d not supported", - message->get_major_version(message), - message->get_minor_version(message)); - /* Todo send notify */ - } + if ((message->get_major_version(message) != IKE_MAJOR_VERSION) || + (message->get_minor_version(message) != IKE_MINOR_VERSION)) + { + this->worker_logger->log(this->worker_logger, ERROR, "IKE version %d.%d not supported", + message->get_major_version(message), + message->get_minor_version(message)); + /* Todo send notify */ + } - status = message->get_ike_sa_id(message, &ike_sa_id); - if (status != SUCCESS) - { - this->worker_logger->log(this->worker_logger, ERROR, "IKE SA ID of message could not be created!"); - message->destroy(message); - break; - } + status = message->get_ike_sa_id(message, &ike_sa_id); + if (status != SUCCESS) + { + this->worker_logger->log(this->worker_logger, ERROR, "IKE SA ID of message could not be created!"); + message->destroy(message); + return; + } - ike_sa_id->switch_initiator(ike_sa_id); + ike_sa_id->switch_initiator(ike_sa_id); - this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking out IKE SA %lld:%lld, role %s", - ike_sa_id->get_initiator_spi(ike_sa_id), - ike_sa_id->get_responder_spi(ike_sa_id), - ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder"); + this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking out IKE SA %lld:%lld, role %s", + ike_sa_id->get_initiator_spi(ike_sa_id), + ike_sa_id->get_responder_spi(ike_sa_id), + ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder"); - status = global_ike_sa_manager->checkout(global_ike_sa_manager,ike_sa_id, &ike_sa); - if (status != SUCCESS) - { - this->worker_logger->log(this->worker_logger, ERROR, "IKE SA could not be checked out"); - ike_sa_id->destroy(ike_sa_id); - message->destroy(message); - break; - } + status = global_ike_sa_manager->checkout(global_ike_sa_manager,ike_sa_id, &ike_sa); + if (status != SUCCESS) + { + this->worker_logger->log(this->worker_logger, ERROR, "IKE SA could not be checked out"); + ike_sa_id->destroy(ike_sa_id); + message->destroy(message); + return; + } - status = ike_sa->process_message(ike_sa, message); - if (status != SUCCESS) - { - this->worker_logger->log(this->worker_logger, ERROR, "message could not be processed by IKE SA"); - } + status = ike_sa->process_message(ike_sa, message); + if (status != SUCCESS) + { + this->worker_logger->log(this->worker_logger, ERROR, "message could not be processed by IKE SA"); + } - this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking in IKE SA %lld:%lld, role %s", - ike_sa_id->get_initiator_spi(ike_sa_id), - ike_sa_id->get_responder_spi(ike_sa_id), - ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder"); - ike_sa_id->destroy(ike_sa_id); + this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking in IKE SA %lld:%lld, role %s", + ike_sa_id->get_initiator_spi(ike_sa_id), + ike_sa_id->get_responder_spi(ike_sa_id), + ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder"); + ike_sa_id->destroy(ike_sa_id); - status = global_ike_sa_manager->checkin(global_ike_sa_manager, ike_sa); - if (status != SUCCESS) - { - this->worker_logger->log(this->worker_logger, ERROR, "checkin of IKE SA failed"); - } - message->destroy(message); - break; - } - case INITIATE_IKE_SA: - { - /* - * Initiatie an IKE_SA: - * - is defined by a name of a configuration - * - create an empty IKE_SA via manager - * - call initiate_connection on this sa - */ - initiate_ike_sa_job_t *initiate_job; - ike_sa_t *ike_sa; - status_t status; - - initiate_job = (initiate_ike_sa_job_t *)job; + status = global_ike_sa_manager->checkin(global_ike_sa_manager, ike_sa); + if (status != SUCCESS) + { + this->worker_logger->log(this->worker_logger, ERROR, "checkin of IKE SA failed"); + } + message->destroy(message); +} + +/** + * implementation of private_thread_pool_t.process_initiate_ike_sa_job + */ +void process_initiate_ike_sa_job(private_thread_pool_t *this, initiate_ike_sa_job_t *job) +{ + /* + * Initiatie an IKE_SA: + * - is defined by a name of a configuration + * - create an empty IKE_SA via manager + * - call initiate_connection on this sa + */ + ike_sa_t *ike_sa; + status_t status; + - this->worker_logger->log(this->worker_logger, CONTROL|MOST, "create and checking out IKE SA"); - - status = global_ike_sa_manager->create_and_checkout(global_ike_sa_manager, &ike_sa); - if (status != SUCCESS) - { - this->worker_logger->log(this->worker_logger, ERROR, "%s by checking out new IKE_SA, job rejected.", - mapping_find(status_m, status)); - break; - } + this->worker_logger->log(this->worker_logger, CONTROL|MOST, "create and checking out IKE SA"); + status = global_ike_sa_manager->create_and_checkout(global_ike_sa_manager, &ike_sa); + if (status != SUCCESS) + { + this->worker_logger->log(this->worker_logger, ERROR, "%s by checking out new IKE_SA, job rejected.", + mapping_find(status_m, status)); + return; + } - this->worker_logger->log(this->worker_logger, CONTROL|MOST, "initializing connection \"%s\"", - initiate_job->get_configuration_name(initiate_job)); - status = ike_sa->initialize_connection(ike_sa, initiate_job->get_configuration_name(initiate_job)); - if (status != SUCCESS) - { - this->worker_logger->log(this->worker_logger, ERROR, "%s by initialize_conection, job and rejected, IKE_SA deleted.", - mapping_find(status_m, status)); - global_ike_sa_manager->checkin_and_delete(global_ike_sa_manager, ike_sa); - break; - } - this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking in IKE SA"); - status = global_ike_sa_manager->checkin(global_ike_sa_manager, ike_sa); - if (status != SUCCESS) - { - this->worker_logger->log(this->worker_logger, ERROR, "%s could not checkin IKE_SA.", - mapping_find(status_m, status)); - } - break; - } - case RETRANSMIT_REQUEST: - { - this->worker_logger->log(this->worker_logger, ERROR, "job of type %s not supported!", mapping_find(job_type_m,job_type)); - break; - } - - case DELETE_IKE_SA: - { - delete_ike_sa_job_t *delete_ike_sa_job = (delete_ike_sa_job_t*) job; - ike_sa_id_t *ike_sa_id = delete_ike_sa_job->get_ike_sa_id(delete_ike_sa_job); - status_t status; - - - this->worker_logger->log(this->worker_logger, CONTROL|MOST, "deleting IKE SA %lld:%lld, role %s", - ike_sa_id->get_initiator_spi(ike_sa_id), - ike_sa_id->get_responder_spi(ike_sa_id), - ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder"); - - status = global_ike_sa_manager->delete(global_ike_sa_manager, ike_sa_id); - if (status != SUCCESS) - { - this->worker_logger->log(this->worker_logger, ERROR, "could not delete IKE_SA (%s)", - mapping_find(status_m, status)); - } - break; + this->worker_logger->log(this->worker_logger, CONTROL|MOST, "initializing connection \"%s\"", + job->get_configuration_name(job)); + status = ike_sa->initialize_connection(ike_sa, job->get_configuration_name(job)); + if (status != SUCCESS) + { + this->worker_logger->log(this->worker_logger, ERROR, "%s by initialize_conection, job and rejected, IKE_SA deleted.", + mapping_find(status_m, status)); + global_ike_sa_manager->checkin_and_delete(global_ike_sa_manager, ike_sa); + return; + } - } - } - job->destroy(job); + this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking in IKE SA"); + status = global_ike_sa_manager->checkin(global_ike_sa_manager, ike_sa); + if (status != SUCCESS) + { + this->worker_logger->log(this->worker_logger, ERROR, "%s could not checkin IKE_SA.", + mapping_find(status_m, status)); } +} +/** + * implementation of private_thread_pool_t.process_delete_ike_sa_job + */ +void process_delete_ike_sa_job(private_thread_pool_t *this, delete_ike_sa_job_t *job) +{ + status_t status; + ike_sa_id_t *ike_sa_id = job->get_ike_sa_id(job); + + this->worker_logger->log(this->worker_logger, CONTROL|MOST, "deleting IKE SA %lld:%lld, role %s", + ike_sa_id->get_initiator_spi(ike_sa_id), + ike_sa_id->get_responder_spi(ike_sa_id), + ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder"); + + status = global_ike_sa_manager->delete(global_ike_sa_manager, ike_sa_id); + if (status != SUCCESS) + { + this->worker_logger->log(this->worker_logger, ERROR, "could not delete IKE_SA (%s)", + mapping_find(status_m, status)); + } } + /** * implementation of thread_pool_t.get_pool_size */ @@ -310,7 +351,10 @@ thread_pool_t *thread_pool_create(size_t pool_size) this->public.destroy = (status_t(*)(thread_pool_t*))destroy; this->public.get_pool_size = (size_t(*)(thread_pool_t*))get_pool_size; - this->function = job_processing; + this->process_jobs = process_jobs; + this->process_initiate_ike_sa_job = process_initiate_ike_sa_job; + this->process_delete_ike_sa_job = process_delete_ike_sa_job; + this->process_incoming_packet_job = process_incoming_packet_job; this->pool_size = pool_size; this->threads = allocator_alloc(sizeof(pthread_t) * pool_size); @@ -338,7 +382,7 @@ thread_pool_t *thread_pool_create(size_t pool_size) /* try to create as many threads as possible, up tu pool_size */ for (current = 0; current < pool_size; current++) { - if (pthread_create(&(this->threads[current]), NULL, (void*(*)(void*))this->function, this) == 0) + if (pthread_create(&(this->threads[current]), NULL, (void*(*)(void*))this->process_jobs, this) == 0) { this->pool_logger->log(this->pool_logger, CONTROL, "thread %u created", this->threads[current]); } diff --git a/Source/charon/threads/thread_pool.h b/Source/charon/threads/thread_pool.h index 609fb04d2..aac803ab2 100644 --- a/Source/charon/threads/thread_pool.h +++ b/Source/charon/threads/thread_pool.h @@ -1,7 +1,7 @@ /** * @file thread_pool.h * - * @brief Thread pool with some threads processing the job_queue + * @brief Interface for thread_pool_t. * */ @@ -34,18 +34,20 @@ typedef struct thread_pool_t thread_pool_t; * @brief A thread_pool contains a pool of threads processing the job queue. * * Current implementation uses as many threads as specified in constructor. - * A more improved version would dynamically increase thread count if necessary... + * A more improved version would dynamically increase thread count if necessary. + * + * @ingroup threads */ struct thread_pool_t { /** - * @brief return currently instanciated threads + * @brief Return currently instanciated threads. * * @param thread_pool thread_pool_t object * @return size of thread pool */ size_t (*get_pool_size) (thread_pool_t *thread_pool); /** - * @brief destroy pool + * @brief Destroy a thread_pool_t. * * sends cancellation request to all threads and AWAITS their termination. * @@ -57,12 +59,14 @@ struct thread_pool_t { }; /** - * @brief Create the thread pool using using pool_size of threads - * + * @brief Create the thread pool using using pool_size of threads. + * * @param pool_size desired pool size - * @return - * - NULL if no threads could be created - * - thread_pool if one ore more threads could be instanciated + * @return + * - thread_pool_t if one ore more threads could be started, or + * - NULL if no threads could be created + * + * @ingroup threads */ thread_pool_t *thread_pool_create(size_t pool_size); |