diff options
Diffstat (limited to 'Source/charon/threads')
-rw-r--r-- | Source/charon/threads/thread_pool.c | 103 |
1 files changed, 86 insertions, 17 deletions
diff --git a/Source/charon/threads/thread_pool.c b/Source/charon/threads/thread_pool.c index f6f5278f5..649679fc3 100644 --- a/Source/charon/threads/thread_pool.c +++ b/Source/charon/threads/thread_pool.c @@ -33,6 +33,7 @@ #include <queues/jobs/delete_ike_sa_job.h> #include <queues/jobs/incoming_packet_job.h> #include <queues/jobs/initiate_ike_sa_job.h> +#include <queues/jobs/retransmit_request_job.h> #include <utils/allocator.h> #include <utils/logger.h> @@ -58,27 +59,38 @@ struct private_thread_pool_t { void (*process_jobs) (private_thread_pool_t *this); /** - * @brief Process a INCOMING_PACKET_JOB. + * @brief Process a INCOMING_PACKET job. * - * @param this private_thread_pool_t-Object + * @param this private_thread_pool_t object + * @param job incoming_packet_job_t object */ void (*process_incoming_packet_job) (private_thread_pool_t *this, incoming_packet_job_t *job); /** - * @brief Process a INITIATE_IKE_SA_JOB. + * @brief Process a INITIATE_IKE_SA job. * - * @param this private_thread_pool_t-Object + * @param this private_thread_pool_t object + * @param job initiate_ike_sa_job_t object */ void (*process_initiate_ike_sa_job) (private_thread_pool_t *this, initiate_ike_sa_job_t *job); /** - * @brief Process a DELETE_IKE_SA_JOB. + * @brief Process a DELETE_IKE_SA job. * - * @param this private_thread_pool_t-Object + * @param this private_thread_pool_t object + * @param job delete_ike_sa_job_t object */ void (*process_delete_ike_sa_job) (private_thread_pool_t *this, delete_ike_sa_job_t *job); /** + * @brief Process a RETRANSMIT_REQUEST job. + * + * @param this private_thread_pool_t object + * @param job retransmit_request_job_t object + */ + void (*process_retransmit_request_job) (private_thread_pool_t *this, retransmit_request_job_t *job); + + /** * number of running threads */ size_t pool_size; @@ -99,10 +111,8 @@ struct private_thread_pool_t { logger_t *worker_logger; } ; - - /** - * implements private_thread_pool_t.function + * Implementation of private_thread_pool_t.process_jobs. */ static void process_jobs(private_thread_pool_t *this) { @@ -117,7 +127,7 @@ static void process_jobs(private_thread_pool_t *this) job = charon->job_queue->get(charon->job_queue); job_type = job->get_type(job); - this->worker_logger->log(this->worker_logger, CONTROL|MORE, "got a job of type %s", + this->worker_logger->log(this->worker_logger, CONTROL|MORE, "Process job of type %s", mapping_find(job_type_m,job_type)); switch (job_type) @@ -125,31 +135,44 @@ static void process_jobs(private_thread_pool_t *this) case INCOMING_PACKET: { this->process_incoming_packet_job(this, (incoming_packet_job_t*)job); + job->destroy(job); break; } case INITIATE_IKE_SA: { this->process_initiate_ike_sa_job(this, (initiate_ike_sa_job_t*)job); + job->destroy(job); break; } case DELETE_IKE_SA: { this->process_delete_ike_sa_job(this, (delete_ike_sa_job_t*)job); + job->destroy(job); + break; + } + case RETRANSMIT_REQUEST: + { + this->process_retransmit_request_job(this, (retransmit_request_job_t*)job); + job->destroy(job); break; } default: { this->worker_logger->log(this->worker_logger, ERROR, "job of type %s not supported!", mapping_find(job_type_m,job_type)); + job->destroy(job); break; } } - job->destroy(job); + + this->worker_logger->log(this->worker_logger, CONTROL|MORE, "Processing of job finished"); + + } } /** - * implementation of private_thread_pool_t.process_incoming_packet_job + * Implementation of private_thread_pool_t.process_incoming_packet_job. */ static void process_incoming_packet_job(private_thread_pool_t *this, incoming_packet_job_t *job) { @@ -240,7 +263,7 @@ static void process_incoming_packet_job(private_thread_pool_t *this, incoming_pa } /** - * implementation of private_thread_pool_t.process_initiate_ike_sa_job + * Implementation of private_thread_pool_t.process_initiate_ike_sa_job. */ static void process_initiate_ike_sa_job(private_thread_pool_t *this, initiate_ike_sa_job_t *job) { @@ -279,7 +302,7 @@ static void process_initiate_ike_sa_job(private_thread_pool_t *this, initiate_ik } /** - * implementation of private_thread_pool_t.process_delete_ike_sa_job + * Implementation of private_thread_pool_t.process_delete_ike_sa_job. */ static void process_delete_ike_sa_job(private_thread_pool_t *this, delete_ike_sa_job_t *job) { @@ -299,9 +322,54 @@ static void process_delete_ike_sa_job(private_thread_pool_t *this, delete_ike_sa } } +/** + * Implementation of private_thread_pool_t.process_retransmit_request_job. + */ +static void process_retransmit_request_job(private_thread_pool_t *this, retransmit_request_job_t *job) +{ + status_t status; + ike_sa_id_t *ike_sa_id = job->get_ike_sa_id(job); + u_int32_t message_id = job->get_message_id(job); + ike_sa_t *ike_sa; + + this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking out IKE SA %lld:%lld, role %s", + ike_sa_id->get_initiator_spi(ike_sa_id), + ike_sa_id->get_responder_spi(ike_sa_id), + ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder"); + + status = charon->ike_sa_manager->checkout(charon->ike_sa_manager,ike_sa_id, &ike_sa); + if (status != SUCCESS) + { + this->worker_logger->log(this->worker_logger, ERROR, "IKE SA could not be checked out. Allready deleted?"); + return; + } + + status = ike_sa->retransmit_request(ike_sa, message_id); + + if (status != SUCCESS) + { + this->worker_logger->log(this->worker_logger, CONTROL | MOST, "Message does'nt have to be retransmitted"); + } + + this->worker_logger->log(this->worker_logger, CONTROL|MOST, "Checkin IKE SA %lld:%lld, role %s", + ike_sa_id->get_initiator_spi(ike_sa_id), + ike_sa_id->get_responder_spi(ike_sa_id), + ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder"); + + status = charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa); + if (status != SUCCESS) + { + this->worker_logger->log(this->worker_logger, ERROR, "Checkin of IKE SA failed!"); + } +/* + u_int32_t message_id = message->get_message_id(message); + retransmit_request_job_t *new_job = retransmit_request_job_create(message_id,ike_sa_id); + charon->event_queue->add_relative(charon->event_queue,(job_t *) new_job,5000);*/ + +} /** - * implementation of thread_pool_t.get_pool_size + * Implementation of thread_pool_t.get_pool_size. */ static size_t get_pool_size(private_thread_pool_t *this) { @@ -309,7 +377,7 @@ static size_t get_pool_size(private_thread_pool_t *this) } /** - * Implementation of thread_pool_t.destroy + * Implementation of thread_pool_t.destroy. */ static void destroy(private_thread_pool_t *this) { @@ -334,7 +402,7 @@ static void destroy(private_thread_pool_t *this) } /* - * see header + * Described in header. */ thread_pool_t *thread_pool_create(size_t pool_size) { @@ -350,6 +418,7 @@ thread_pool_t *thread_pool_create(size_t pool_size) this->process_initiate_ike_sa_job = process_initiate_ike_sa_job; this->process_delete_ike_sa_job = process_delete_ike_sa_job; this->process_incoming_packet_job = process_incoming_packet_job; + this->process_retransmit_request_job = process_retransmit_request_job; this->pool_size = pool_size; this->threads = allocator_alloc(sizeof(pthread_t) * pool_size); |