diff options
author | Martin Willi <martin@strongswan.org> | 2005-11-25 13:42:58 +0000 |
---|---|---|
committer | Martin Willi <martin@strongswan.org> | 2005-11-25 13:42:58 +0000 |
commit | ca76df97365ec3421719bfbf9ecd3f33939ad108 (patch) | |
tree | 98f14e2944590fd642f3bfab0e03000ed64cde65 /Source/charon/threads/thread_pool.c | |
parent | 2a336de4cd0d2f9cbb2547dbe7b4ff278446b89a (diff) | |
download | strongswan-ca76df97365ec3421719bfbf9ecd3f33939ad108.tar.bz2 strongswan-ca76df97365ec3421719bfbf9ecd3f33939ad108.tar.xz |
- documentation and cleanup of threads package
Diffstat (limited to 'Source/charon/threads/thread_pool.c')
-rw-r--r-- | Source/charon/threads/thread_pool.c | 356 |
1 files changed, 200 insertions, 156 deletions
diff --git a/Source/charon/threads/thread_pool.c b/Source/charon/threads/thread_pool.c index 1ed3a20b5..ee8d50bc7 100644 --- a/Source/charon/threads/thread_pool.c +++ b/Source/charon/threads/thread_pool.c @@ -1,7 +1,7 @@ /** * @file thread_pool.c * - * @brief Thread pool with some threads processing the job_queue. + * @brief Implementation of thread_pool_t. * */ @@ -38,19 +38,45 @@ typedef struct private_thread_pool_t private_thread_pool_t; /** - * @brief structure with private members for thread_pool_t + * @brief Structure with private members for thread_pool_t. */ struct private_thread_pool_t { /** * inclusion of public members */ thread_pool_t public; + + /** + * @brief Main processing functino for worker threads. + * + * Gets a job from the job queue and calls corresponding + * function for processing. + * + * @param this private_thread_pool_t-Object + */ + void (*process_jobs) (private_thread_pool_t *this); + + /** + * @brief Process a INCOMING_PACKET_JOB. + * + * @param this private_thread_pool_t-Object + */ + void (*process_incoming_packet_job) (private_thread_pool_t *this, incoming_packet_job_t *job); + + /** + * @brief Process a INITIATE_IKE_SA_JOB. + * + * @param this private_thread_pool_t-Object + */ + void (*process_initiate_ike_sa_job) (private_thread_pool_t *this, initiate_ike_sa_job_t *job); + /** - * @brief Processing function of a worker thread + * @brief Process a DELETE_IKE_SA_JOB. * * @param this private_thread_pool_t-Object */ - void (*function) (private_thread_pool_t *this); + void (*process_delete_ike_sa_job) (private_thread_pool_t *this, delete_ike_sa_job_t *job); + /** * number of running threads */ @@ -64,7 +90,7 @@ struct private_thread_pool_t { */ logger_t *pool_logger; /** - * logger of the threadpool + * logger of the worker threads */ logger_t *worker_logger; } ; @@ -74,9 +100,8 @@ struct private_thread_pool_t { /** * implements private_thread_pool_t.function */ -static void job_processing(private_thread_pool_t *this) +static void process_jobs(private_thread_pool_t *this) { - /* cancellation disabled by default */ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); this->worker_logger->log(this->worker_logger, CONTROL, "started working"); @@ -87,180 +112,196 @@ static void job_processing(private_thread_pool_t *this) global_job_queue->get(global_job_queue, &job); job_type = job->get_type(job); - this->worker_logger->log(this->worker_logger, CONTROL|MORE, "got a job of type %s", mapping_find(job_type_m,job_type)); + this->worker_logger->log(this->worker_logger, CONTROL|MORE, "got a job of type %s", + mapping_find(job_type_m,job_type)); - /* process them here */ switch (job_type) { case INCOMING_PACKET: { - packet_t *packet; - message_t *message; - ike_sa_t *ike_sa; - ike_sa_id_t *ike_sa_id; - status_t status; - incoming_packet_job_t *incoming_packet_job = (incoming_packet_job_t *)job; + this->process_incoming_packet_job(this, (incoming_packet_job_t*)job); + break; + } + case INITIATE_IKE_SA: + { + this->process_initiate_ike_sa_job(this, (initiate_ike_sa_job_t*)job); + break; + } + case DELETE_IKE_SA: + { + this->process_delete_ike_sa_job(this, (delete_ike_sa_job_t*)job); + break; + } + default: + { + this->worker_logger->log(this->worker_logger, ERROR, "job of type %s not supported!", + mapping_find(job_type_m,job_type)); + break; + } + } + job->destroy(job); + } +} + +/** + * implementation of private_thread_pool_t.process_incoming_packet_job + */ +void process_incoming_packet_job(private_thread_pool_t *this, incoming_packet_job_t *job) +{ + packet_t *packet; + message_t *message; + ike_sa_t *ike_sa; + ike_sa_id_t *ike_sa_id; + status_t status; - if (incoming_packet_job->get_packet(incoming_packet_job,&packet) != SUCCESS) - { - this->worker_logger->log(this->worker_logger, ERROR, "packet in job %s could not be retrieved!", - mapping_find(job_type_m,job_type)); - break; - } + if (job->get_packet(job,&packet) != SUCCESS) + { + this->worker_logger->log(this->worker_logger, ERROR, "packet in job could not be retrieved!"); + return; + } - message = message_create_from_packet(packet); - if (message == NULL) - { - this->worker_logger->log(this->worker_logger, ERROR, "message could not be created from packet!", - mapping_find(job_type_m,job_type)); - packet->destroy(packet); - break; - } + message = message_create_from_packet(packet); + if (message == NULL) + { + this->worker_logger->log(this->worker_logger, ERROR, "message could not be created from packet!"); + packet->destroy(packet); + return; + } - status = message->parse_header(message); - if (status != SUCCESS) - { - this->worker_logger->log(this->worker_logger, ERROR, "message header could not be verified!"); - message->destroy(message); - break; - } + status = message->parse_header(message); + if (status != SUCCESS) + { + this->worker_logger->log(this->worker_logger, ERROR, "message header could not be verified!"); + message->destroy(message); + return; + } - this->worker_logger->log(this->worker_logger, CONTROL|MOST, "message is a %s %s", - mapping_find(exchange_type_m, message->get_exchange_type(message)), - message->get_request(message) ? "request" : "reply"); + this->worker_logger->log(this->worker_logger, CONTROL|MOST, "message is a %s %s", + mapping_find(exchange_type_m, message->get_exchange_type(message)), + message->get_request(message) ? "request" : "reply"); - if ((message->get_major_version(message) != IKE_MAJOR_VERSION) || - (message->get_minor_version(message) != IKE_MINOR_VERSION)) - { - this->worker_logger->log(this->worker_logger, ERROR, "IKE version %d.%d not supported", - message->get_major_version(message), - message->get_minor_version(message)); - /* Todo send notify */ - } + if ((message->get_major_version(message) != IKE_MAJOR_VERSION) || + (message->get_minor_version(message) != IKE_MINOR_VERSION)) + { + this->worker_logger->log(this->worker_logger, ERROR, "IKE version %d.%d not supported", + message->get_major_version(message), + message->get_minor_version(message)); + /* Todo send notify */ + } - status = message->get_ike_sa_id(message, &ike_sa_id); - if (status != SUCCESS) - { - this->worker_logger->log(this->worker_logger, ERROR, "IKE SA ID of message could not be created!"); - message->destroy(message); - break; - } + status = message->get_ike_sa_id(message, &ike_sa_id); + if (status != SUCCESS) + { + this->worker_logger->log(this->worker_logger, ERROR, "IKE SA ID of message could not be created!"); + message->destroy(message); + return; + } - ike_sa_id->switch_initiator(ike_sa_id); + ike_sa_id->switch_initiator(ike_sa_id); - this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking out IKE SA %lld:%lld, role %s", - ike_sa_id->get_initiator_spi(ike_sa_id), - ike_sa_id->get_responder_spi(ike_sa_id), - ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder"); + this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking out IKE SA %lld:%lld, role %s", + ike_sa_id->get_initiator_spi(ike_sa_id), + ike_sa_id->get_responder_spi(ike_sa_id), + ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder"); - status = global_ike_sa_manager->checkout(global_ike_sa_manager,ike_sa_id, &ike_sa); - if (status != SUCCESS) - { - this->worker_logger->log(this->worker_logger, ERROR, "IKE SA could not be checked out"); - ike_sa_id->destroy(ike_sa_id); - message->destroy(message); - break; - } + status = global_ike_sa_manager->checkout(global_ike_sa_manager,ike_sa_id, &ike_sa); + if (status != SUCCESS) + { + this->worker_logger->log(this->worker_logger, ERROR, "IKE SA could not be checked out"); + ike_sa_id->destroy(ike_sa_id); + message->destroy(message); + return; + } - status = ike_sa->process_message(ike_sa, message); - if (status != SUCCESS) - { - this->worker_logger->log(this->worker_logger, ERROR, "message could not be processed by IKE SA"); - } + status = ike_sa->process_message(ike_sa, message); + if (status != SUCCESS) + { + this->worker_logger->log(this->worker_logger, ERROR, "message could not be processed by IKE SA"); + } - this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking in IKE SA %lld:%lld, role %s", - ike_sa_id->get_initiator_spi(ike_sa_id), - ike_sa_id->get_responder_spi(ike_sa_id), - ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder"); - ike_sa_id->destroy(ike_sa_id); + this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking in IKE SA %lld:%lld, role %s", + ike_sa_id->get_initiator_spi(ike_sa_id), + ike_sa_id->get_responder_spi(ike_sa_id), + ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder"); + ike_sa_id->destroy(ike_sa_id); - status = global_ike_sa_manager->checkin(global_ike_sa_manager, ike_sa); - if (status != SUCCESS) - { - this->worker_logger->log(this->worker_logger, ERROR, "checkin of IKE SA failed"); - } - message->destroy(message); - break; - } - case INITIATE_IKE_SA: - { - /* - * Initiatie an IKE_SA: - * - is defined by a name of a configuration - * - create an empty IKE_SA via manager - * - call initiate_connection on this sa - */ - initiate_ike_sa_job_t *initiate_job; - ike_sa_t *ike_sa; - status_t status; - - initiate_job = (initiate_ike_sa_job_t *)job; + status = global_ike_sa_manager->checkin(global_ike_sa_manager, ike_sa); + if (status != SUCCESS) + { + this->worker_logger->log(this->worker_logger, ERROR, "checkin of IKE SA failed"); + } + message->destroy(message); +} + +/** + * implementation of private_thread_pool_t.process_initiate_ike_sa_job + */ +void process_initiate_ike_sa_job(private_thread_pool_t *this, initiate_ike_sa_job_t *job) +{ + /* + * Initiatie an IKE_SA: + * - is defined by a name of a configuration + * - create an empty IKE_SA via manager + * - call initiate_connection on this sa + */ + ike_sa_t *ike_sa; + status_t status; + - this->worker_logger->log(this->worker_logger, CONTROL|MOST, "create and checking out IKE SA"); - - status = global_ike_sa_manager->create_and_checkout(global_ike_sa_manager, &ike_sa); - if (status != SUCCESS) - { - this->worker_logger->log(this->worker_logger, ERROR, "%s by checking out new IKE_SA, job rejected.", - mapping_find(status_m, status)); - break; - } + this->worker_logger->log(this->worker_logger, CONTROL|MOST, "create and checking out IKE SA"); + status = global_ike_sa_manager->create_and_checkout(global_ike_sa_manager, &ike_sa); + if (status != SUCCESS) + { + this->worker_logger->log(this->worker_logger, ERROR, "%s by checking out new IKE_SA, job rejected.", + mapping_find(status_m, status)); + return; + } - this->worker_logger->log(this->worker_logger, CONTROL|MOST, "initializing connection \"%s\"", - initiate_job->get_configuration_name(initiate_job)); - status = ike_sa->initialize_connection(ike_sa, initiate_job->get_configuration_name(initiate_job)); - if (status != SUCCESS) - { - this->worker_logger->log(this->worker_logger, ERROR, "%s by initialize_conection, job and rejected, IKE_SA deleted.", - mapping_find(status_m, status)); - global_ike_sa_manager->checkin_and_delete(global_ike_sa_manager, ike_sa); - break; - } - this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking in IKE SA"); - status = global_ike_sa_manager->checkin(global_ike_sa_manager, ike_sa); - if (status != SUCCESS) - { - this->worker_logger->log(this->worker_logger, ERROR, "%s could not checkin IKE_SA.", - mapping_find(status_m, status)); - } - break; - } - case RETRANSMIT_REQUEST: - { - this->worker_logger->log(this->worker_logger, ERROR, "job of type %s not supported!", mapping_find(job_type_m,job_type)); - break; - } - - case DELETE_IKE_SA: - { - delete_ike_sa_job_t *delete_ike_sa_job = (delete_ike_sa_job_t*) job; - ike_sa_id_t *ike_sa_id = delete_ike_sa_job->get_ike_sa_id(delete_ike_sa_job); - status_t status; - - - this->worker_logger->log(this->worker_logger, CONTROL|MOST, "deleting IKE SA %lld:%lld, role %s", - ike_sa_id->get_initiator_spi(ike_sa_id), - ike_sa_id->get_responder_spi(ike_sa_id), - ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder"); - - status = global_ike_sa_manager->delete(global_ike_sa_manager, ike_sa_id); - if (status != SUCCESS) - { - this->worker_logger->log(this->worker_logger, ERROR, "could not delete IKE_SA (%s)", - mapping_find(status_m, status)); - } - break; + this->worker_logger->log(this->worker_logger, CONTROL|MOST, "initializing connection \"%s\"", + job->get_configuration_name(job)); + status = ike_sa->initialize_connection(ike_sa, job->get_configuration_name(job)); + if (status != SUCCESS) + { + this->worker_logger->log(this->worker_logger, ERROR, "%s by initialize_conection, job and rejected, IKE_SA deleted.", + mapping_find(status_m, status)); + global_ike_sa_manager->checkin_and_delete(global_ike_sa_manager, ike_sa); + return; + } - } - } - job->destroy(job); + this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking in IKE SA"); + status = global_ike_sa_manager->checkin(global_ike_sa_manager, ike_sa); + if (status != SUCCESS) + { + this->worker_logger->log(this->worker_logger, ERROR, "%s could not checkin IKE_SA.", + mapping_find(status_m, status)); } +} +/** + * implementation of private_thread_pool_t.process_delete_ike_sa_job + */ +void process_delete_ike_sa_job(private_thread_pool_t *this, delete_ike_sa_job_t *job) +{ + status_t status; + ike_sa_id_t *ike_sa_id = job->get_ike_sa_id(job); + + this->worker_logger->log(this->worker_logger, CONTROL|MOST, "deleting IKE SA %lld:%lld, role %s", + ike_sa_id->get_initiator_spi(ike_sa_id), + ike_sa_id->get_responder_spi(ike_sa_id), + ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder"); + + status = global_ike_sa_manager->delete(global_ike_sa_manager, ike_sa_id); + if (status != SUCCESS) + { + this->worker_logger->log(this->worker_logger, ERROR, "could not delete IKE_SA (%s)", + mapping_find(status_m, status)); + } } + /** * implementation of thread_pool_t.get_pool_size */ @@ -310,7 +351,10 @@ thread_pool_t *thread_pool_create(size_t pool_size) this->public.destroy = (status_t(*)(thread_pool_t*))destroy; this->public.get_pool_size = (size_t(*)(thread_pool_t*))get_pool_size; - this->function = job_processing; + this->process_jobs = process_jobs; + this->process_initiate_ike_sa_job = process_initiate_ike_sa_job; + this->process_delete_ike_sa_job = process_delete_ike_sa_job; + this->process_incoming_packet_job = process_incoming_packet_job; this->pool_size = pool_size; this->threads = allocator_alloc(sizeof(pthread_t) * pool_size); @@ -338,7 +382,7 @@ thread_pool_t *thread_pool_create(size_t pool_size) /* try to create as many threads as possible, up tu pool_size */ for (current = 0; current < pool_size; current++) { - if (pthread_create(&(this->threads[current]), NULL, (void*(*)(void*))this->function, this) == 0) + if (pthread_create(&(this->threads[current]), NULL, (void*(*)(void*))this->process_jobs, this) == 0) { this->pool_logger->log(this->pool_logger, CONTROL, "thread %u created", this->threads[current]); } |