diff options
Diffstat (limited to 'src/charon/threads')
-rw-r--r-- | src/charon/threads/kernel_interface.c | 82 | ||||
-rw-r--r-- | src/charon/threads/kernel_interface.h | 32 | ||||
-rw-r--r-- | src/charon/threads/thread_pool.c | 430 |
3 files changed, 97 insertions, 447 deletions
diff --git a/src/charon/threads/kernel_interface.c b/src/charon/threads/kernel_interface.c index 679cf69ee..d88fa3f29 100644 --- a/src/charon/threads/kernel_interface.c +++ b/src/charon/threads/kernel_interface.c @@ -36,6 +36,8 @@ #include <daemon.h> #include <utils/linked_list.h> +#include <queues/jobs/delete_child_sa_job.h> +#include <queues/jobs/rekey_child_sa_job.h> #define KERNEL_ESP 50 @@ -101,6 +103,8 @@ struct netlink_message_t { struct xfrm_userpolicy_id policy_id; /** message for policy installation */ struct xfrm_userpolicy_info policy; + /** expire message sent from kernel */ + struct xfrm_user_expire expire; }; u_int8_t data[XFRM_DATA_LENGTH]; }; @@ -205,7 +209,6 @@ mapping_t kernel_integrity_algs_m[] = { {MAPPING_END, NULL} }; - /** * Implementation of kernel_interface_t.get_spi. */ @@ -272,6 +275,8 @@ static status_t add_sa( private_kernel_interface_t *this, u_int32_t spi, int protocol, u_int32_t reqid, + u_int64_t expire_soft, + u_int64_t expire_hard, encryption_algorithm_t enc_alg, chunk_t encryption_key, integrity_algorithm_t int_alg, @@ -296,10 +301,16 @@ static status_t add_sa( private_kernel_interface_t *this, request.sa.mode = TRUE; /* tunnel mode */ request.sa.replay_window = 32; request.sa.reqid = reqid; + /* we currently do not expire SAs by volume/packet count */ request.sa.lft.soft_byte_limit = XFRM_INF; - request.sa.lft.soft_packet_limit = XFRM_INF; request.sa.lft.hard_byte_limit = XFRM_INF; + request.sa.lft.soft_packet_limit = XFRM_INF; request.sa.lft.hard_packet_limit = XFRM_INF; + /* we use lifetimes since added, not since used */ + request.sa.lft.soft_add_expires_seconds = expire_soft; + request.sa.lft.hard_add_expires_seconds = expire_hard; + request.sa.lft.soft_use_expires_seconds = 0; + request.sa.lft.hard_use_expires_seconds = 0; request.hdr.nlmsg_len = NLMSG_ALIGN(NLMSG_LENGTH(sizeof(request.sa))); @@ -435,10 +446,15 @@ static status_t add_policy(private_kernel_interface_t *this, request.policy.action = XFRM_POLICY_ALLOW; request.policy.share = XFRM_SHARE_ANY; + /* policies currently don't expire */ request.policy.lft.soft_byte_limit = XFRM_INF; request.policy.lft.soft_packet_limit = XFRM_INF; request.policy.lft.hard_byte_limit = XFRM_INF; request.policy.lft.hard_packet_limit = XFRM_INF; + request.sa.lft.soft_add_expires_seconds = 0; + request.sa.lft.hard_add_expires_seconds = 0; + request.sa.lft.soft_use_expires_seconds = 0; + request.sa.lft.hard_use_expires_seconds = 0; if (esp || ah) { @@ -645,30 +661,58 @@ static void receive_messages(private_kernel_interface_t *this) /* not from kernel. not interested, try another one */ continue; } + /* good message, handle it */ break; } - /* got a valid message. - * requests are handled on our own, - * responses are listed for the requesters + /* we handle ACQUIRE and EXPIRE messages directly */ - if (response.hdr.nlmsg_flags & NLM_F_REQUEST) + if (response.hdr.nlmsg_type == XFRM_MSG_ACQUIRE) + { + this->logger->log(this->logger, CONTROL, + "Received a XFRM_MSG_ACQUIRE. Ignored"); + } + else if (response.hdr.nlmsg_type == XFRM_MSG_EXPIRE) { - /* handle request */ + job_t *job; + this->logger->log(this->logger, CONTROL|LEVEL1, + "Received a XFRM_MSG_EXPIRE"); + this->logger->log(this->logger, CONTROL|LEVEL0, + "creating %s job for CHILD_SA with reqid %d", + response.expire.hard ? "delete" : "rekey", + response.expire.state.reqid); + if (response.expire.hard) + { + job = (job_t*)delete_child_sa_job_create( + response.expire.state.reqid); + } + else + { + job = (job_t*)rekey_child_sa_job_create( + response.expire.state.reqid); + } + charon->job_queue->add(charon->job_queue, job); } - else + /* NLMSG_ERROR is send back for acknowledge (or on error), an + * XFRM_MSG_NEWSA is returned when we alloc spis. + * list these responses for the sender + */ + else if (response.hdr.nlmsg_type == NLMSG_ERROR || + response.hdr.nlmsg_type == XFRM_MSG_NEWSA) { - /* add response to queue */ + /* add response to queue */ listed_response = malloc(sizeof(response)); memcpy(listed_response, &response, sizeof(response)); - + pthread_mutex_lock(&(this->mutex)); this->responses->insert_last(this->responses, (void*)listed_response); pthread_mutex_unlock(&(this->mutex)); /* signal ALL waiting threads */ pthread_cond_broadcast(&(this->condvar)); } - /* get the next one */ + /* we are not interested in anything other. + * anyway, move on to the next message */ + continue; } } @@ -689,11 +733,12 @@ static void destroy(private_kernel_interface_t *this) */ kernel_interface_t *kernel_interface_create() { + struct sockaddr_nl addr; private_kernel_interface_t *this = malloc_thing(private_kernel_interface_t); /* public functions */ this->public.get_spi = (status_t(*)(kernel_interface_t*,host_t*,host_t*,protocol_id_t,u_int32_t,u_int32_t*))get_spi; - this->public.add_sa = (status_t(*)(kernel_interface_t *,host_t*,host_t*,u_int32_t,protocol_id_t,u_int32_t,encryption_algorithm_t,chunk_t,integrity_algorithm_t,chunk_t,bool))add_sa; + this->public.add_sa = (status_t(*)(kernel_interface_t *,host_t*,host_t*,u_int32_t,protocol_id_t,u_int32_t,u_int64_t,u_int64_t,encryption_algorithm_t,chunk_t,integrity_algorithm_t,chunk_t,bool))add_sa; this->public.add_policy = (status_t(*)(kernel_interface_t*,host_t*, host_t*,host_t*,host_t*,u_int8_t,u_int8_t,int,int,bool,bool,u_int32_t))add_policy; this->public.del_sa = (status_t(*)(kernel_interface_t*,host_t*,u_int32_t,protocol_id_t))del_sa; this->public.del_policy = (status_t(*)(kernel_interface_t*,host_t*,host_t*,host_t*,host_t*,u_int8_t,u_int8_t,int,int))del_policy; @@ -709,6 +754,8 @@ kernel_interface_t *kernel_interface_create() pthread_mutex_init(&(this->mutex),NULL); pthread_cond_init(&(this->condvar),NULL); this->seq = 0; + + /* open netlink socket */ this->socket = socket(PF_NETLINK, SOCK_RAW, NETLINK_XFRM); if (this->socket <= 0) { @@ -716,6 +763,17 @@ kernel_interface_t *kernel_interface_create() free(this); charon->kill(charon, "Unable to create netlink socket"); } + /* bind the socket and reqister for ACQUIRE & EXPIRE */ + addr.nl_family = AF_NETLINK; + addr.nl_pid = getpid(); + addr.nl_groups = XFRMGRP_ACQUIRE | XFRMGRP_EXPIRE; + if (bind(this->socket, (struct sockaddr*)&addr, sizeof(addr)) != 0) + { + this->responses->destroy(this->responses); + close(this->socket); + free(this); + charon->kill(charon, "Unable to bind netlink socket"); + } if (pthread_create(&(this->thread), NULL, (void*(*)(void*))this->receive_messages, this) != 0) { diff --git a/src/charon/threads/kernel_interface.h b/src/charon/threads/kernel_interface.h index b3ca13faa..6c9a181ed 100644 --- a/src/charon/threads/kernel_interface.h +++ b/src/charon/threads/kernel_interface.h @@ -68,30 +68,34 @@ struct kernel_interface_t { * * add_sa() may update an already allocated * SPI (via get_spi). In this case, the replace - * flag must be set. + * flag must be set. * This function does install a single SA for a * single protocol in one direction. * - * @param this calling object - * @param src source address for this SA - * @param dst destination address for this SA - * @param spi SPI allocated by us or remote peer - * @param protocol protocol for this SA (ESP/AH) - * @param reqid unique ID for this SA - * @param enc_alg Algorithm to use for encryption (ESP only) - * @param enc_key Key to use for encryption - * @param int_alg Algorithm to use for integrity protection - * @param int_key Key for integrity protection - * @param replace Should an already installed SA be updated? + * @param this calling object + * @param src source address for this SA + * @param dst destination address for this SA + * @param spi SPI allocated by us or remote peer + * @param protocol protocol for this SA (ESP/AH) + * @param reqid unique ID for this SA + * @param expire_soft lifetime in seconds before rekeying + * @param expire_hard lieftime in seconds before delete + * @param enc_alg Algorithm to use for encryption (ESP only) + * @param enc_key Key to use for encryption + * @param int_alg Algorithm to use for integrity protection + * @param int_key Key for integrity protection + * @param replace Should an already installed SA be updated? * @return - * - SUCCESS - * - FAILED if kernel comm failed + * - SUCCESS + * - FAILED if kernel comm failed */ status_t (*add_sa)(kernel_interface_t *this, host_t *src, host_t *dst, u_int32_t spi, protocol_id_t protocol, u_int32_t reqid, + u_int64_t expire_soft, + u_int64_t expire_hard, encryption_algorithm_t enc_alg, chunk_t enc_key, integrity_algorithm_t int_alg, diff --git a/src/charon/threads/thread_pool.c b/src/charon/threads/thread_pool.c index 51d29c222..83771ceb5 100644 --- a/src/charon/threads/thread_pool.c +++ b/src/charon/threads/thread_pool.c @@ -48,69 +48,8 @@ struct private_thread_pool_t { * Public thread_pool_t interface. */ thread_pool_t public; - - /** - * @brief Main processing function for worker threads. - * - * Gets a job from the job queue and calls corresponding - * function for processing. - * - * @param this calling object - */ - void (*process_jobs) (private_thread_pool_t *this); - - /** - * @brief Process a INCOMING_PACKET job. - * - * @param this calling object - * @param job incoming_packet_job_t object - */ - void (*process_incoming_packet_job) (private_thread_pool_t *this, incoming_packet_job_t *job); - - /** - * @brief Process a INITIATE_IKE_SA job. - * - * @param this calling object - * @param job initiate_ike_sa_job_t object - */ - void (*process_initiate_ike_sa_job) (private_thread_pool_t *this, initiate_ike_sa_job_t *job); - - /** - * @brief Process a DELETE_HALF_OPEN_IKE_SA job. - * - * @param this calling object - * @param job delete__half_open_ike_sa_job_t object - */ - void (*process_delete_half_open_ike_sa_job) (private_thread_pool_t *this, delete_half_open_ike_sa_job_t *job); - - /** - * @brief Process a DELETE_ESTABLISHED_IKE_SA job. - * - * @param this calling object - * @param job delete_established_ike_sa_job_t object - */ - void (*process_delete_established_ike_sa_job) (private_thread_pool_t *this, delete_established_ike_sa_job_t *job); /** - * @brief Process a RETRANSMIT_REQUEST job. - * - * @param this calling object - * @param job retransmit_request_job_t object - */ - void (*process_retransmit_request_job) (private_thread_pool_t *this, retransmit_request_job_t *job); - - /** - * Creates a job of type DELETE_HALF_OPEN_IKE_SA. - * - * This job is used to delete IKE_SA's which are still in state INITIATOR_INIT, - * RESPONDER_INIT, IKE_AUTH_REQUESTED, IKE_INIT_REQUESTED or IKE_INIT_RESPONDED. - * - * @param ike_sa_id ID of IKE_SA to delete - * @param delay Delay in ms after a half open IKE_SA gets deleted! - */ - void (*create_delete_half_open_ike_sa_job) (private_thread_pool_t *this,ike_sa_id_t *ike_sa_id, u_int32_t delay); - - /** * Number of running threads. */ size_t pool_size; @@ -137,367 +76,27 @@ struct private_thread_pool_t { static void process_jobs(private_thread_pool_t *this) { job_t *job; - job_type_t job_type; - timeval_t start_time; - timeval_t end_time; + status_t status; /* cancellation disabled by default */ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); this->worker_logger->log(this->worker_logger, CONTROL, "worker thread running, thread_ID: %06d", (int)pthread_self()); - - for (;;) + + while (TRUE) { job = charon->job_queue->get(charon->job_queue); - job_type = job->get_type(job); - this->worker_logger->log(this->worker_logger, CONTROL|LEVEL2, "Process job of type %s", - mapping_find(job_type_m,job_type)); - gettimeofday(&start_time,NULL); - switch (job_type) - { - case INCOMING_PACKET: - { - this->process_incoming_packet_job(this, (incoming_packet_job_t*)job); - job->destroy(job); - break; - } - case INITIATE_IKE_SA: - { - this->process_initiate_ike_sa_job(this, (initiate_ike_sa_job_t*)job); - job->destroy(job); - break; - } - case DELETE_HALF_OPEN_IKE_SA: - { - this->process_delete_half_open_ike_sa_job(this, (delete_half_open_ike_sa_job_t*)job); - job->destroy(job); - break; - } - case DELETE_ESTABLISHED_IKE_SA: - { - this->process_delete_established_ike_sa_job(this, (delete_established_ike_sa_job_t*)job); - job->destroy(job); - break; - } - case RETRANSMIT_REQUEST: - { - this->process_retransmit_request_job(this, (retransmit_request_job_t*)job); - break; - } - default: - { - this->worker_logger->log(this->worker_logger, ERROR, "Job of type %s not supported!", - mapping_find(job_type_m,job_type)); - job->destroy(job); - break; - } - } - gettimeofday(&end_time,NULL); - this->worker_logger->log(this->worker_logger, CONTROL | LEVEL2, "Processed job of type %s in %d us", - mapping_find(job_type_m,job_type), - (((end_time.tv_sec - start_time.tv_sec) * 1000000) + (end_time.tv_usec - start_time.tv_usec))); - - - } -} - -/** - * Implementation of private_thread_pool_t.process_incoming_packet_job. - */ -static void process_incoming_packet_job(private_thread_pool_t *this, incoming_packet_job_t *job) -{ - packet_t *packet; - message_t *message; - ike_sa_t *ike_sa; - ike_sa_id_t *ike_sa_id; - status_t status; - - packet = job->get_packet(job); - - message = message_create_from_packet(packet); - status = message->parse_header(message); - if (status != SUCCESS) - { - this->worker_logger->log(this->worker_logger, ERROR, "Message header could not be verified!"); - message->destroy(message); - return; - } - - this->worker_logger->log(this->worker_logger, CONTROL|LEVEL2, "Message is a %s %s", - mapping_find(exchange_type_m, message->get_exchange_type(message)), - message->get_request(message) ? "request" : "reply"); - - if ((message->get_major_version(message) != IKE_MAJOR_VERSION) || - (message->get_minor_version(message) != IKE_MINOR_VERSION)) - { - this->worker_logger->log(this->worker_logger, ERROR | LEVEL2, - "IKE version %d.%d not supported", - message->get_major_version(message), - message->get_minor_version(message)); - if ((message->get_exchange_type(message) == IKE_SA_INIT) && (message->get_request(message))) - { - message_t *response; - message->get_ike_sa_id(message, &ike_sa_id); - ike_sa_id->switch_initiator(ike_sa_id); - response = message_create_notify_reply(message->get_destination(message), - message->get_source(message), - IKE_SA_INIT, FALSE, ike_sa_id, - INVALID_MAJOR_VERSION); - message->destroy(message); - ike_sa_id->destroy(ike_sa_id); - status = response->generate(response, NULL, NULL, &packet); - if (status != SUCCESS) - { - this->worker_logger->log(this->worker_logger, ERROR, "Could not generate packet from message"); - response->destroy(response); - return; - } - this->worker_logger->log(this->worker_logger, ERROR, "Send notify reply of type INVALID_MAJOR_VERSION"); - charon->send_queue->add(charon->send_queue, packet); - response->destroy(response); - return; - } - message->destroy(message); - return; - } - - message->get_ike_sa_id(message, &ike_sa_id); - - ike_sa_id->switch_initiator(ike_sa_id); - - this->worker_logger->log(this->worker_logger, CONTROL|LEVEL3, "Checking out IKE SA %lld:%lld, role %s", - ike_sa_id->get_initiator_spi(ike_sa_id), - ike_sa_id->get_responder_spi(ike_sa_id), - ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder"); - - status = charon->ike_sa_manager->checkout(charon->ike_sa_manager,ike_sa_id, &ike_sa); - if ((status != SUCCESS) && (status != CREATED)) - { - this->worker_logger->log(this->worker_logger, ERROR, "IKE SA could not be checked out"); - ike_sa_id->destroy(ike_sa_id); - message->destroy(message); + status = job->execute(job); - /* TODO: send notify reply of type INVALID_IKE_SPI if SPI could not be found ? */ - return; - } - - if (status == CREATED) - { - this->worker_logger->log(this->worker_logger, CONTROL|LEVEL3, - "Create Job to delete half open IKE_SA."); - this->create_delete_half_open_ike_sa_job(this,ike_sa_id, - charon->configuration->get_half_open_ike_sa_timeout(charon->configuration)); - } - - status = ike_sa->process_message(ike_sa, message); - - this->worker_logger->log(this->worker_logger, CONTROL|LEVEL3, "%s IKE SA %lld:%lld, role %s", - (status == DESTROY_ME) ? "Checkin and delete" : "Checkin", - ike_sa_id->get_initiator_spi(ike_sa_id), - ike_sa_id->get_responder_spi(ike_sa_id), - ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder"); - ike_sa_id->destroy(ike_sa_id); - - if (status == DESTROY_ME) - { - status = charon->ike_sa_manager->checkin_and_destroy(charon->ike_sa_manager, ike_sa); - } - else - { - status = charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa); - } - - if (status != SUCCESS) - { - this->worker_logger->log(this->worker_logger, ERROR, "Checkin of IKE SA failed!"); - } - message->destroy(message); -} - -/** - * Implementation of private_thread_pool_t.process_initiate_ike_sa_job. - */ -static void process_initiate_ike_sa_job(private_thread_pool_t *this, initiate_ike_sa_job_t *job) -{ - /* - * Initiatie an IKE_SA: - * - is defined by a name of a configuration - * - create an empty IKE_SA via manager - * - call initiate_connection on this sa - */ - ike_sa_t *ike_sa; - status_t status; - - - this->worker_logger->log(this->worker_logger, CONTROL|LEVEL2, "Creating and checking out IKE SA"); - charon->ike_sa_manager->create_and_checkout(charon->ike_sa_manager, &ike_sa); - - status = ike_sa->initiate_connection(ike_sa, job->get_connection(job)); - if (status != SUCCESS) - { - this->worker_logger->log(this->worker_logger, ERROR, "Initiation returned %s, going to delete IKE_SA.", - mapping_find(status_m, status)); - charon->ike_sa_manager->checkin_and_destroy(charon->ike_sa_manager, ike_sa); - return; - } - - this->worker_logger->log(this->worker_logger, CONTROL|LEVEL3, "Create Job to delete half open IKE_SA."); - this->create_delete_half_open_ike_sa_job(this,ike_sa->get_id(ike_sa), - charon->configuration->get_half_open_ike_sa_timeout(charon->configuration)); - - this->worker_logger->log(this->worker_logger, CONTROL|LEVEL2, "Checking in IKE SA"); - status = charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa); - if (status != SUCCESS) - { - this->worker_logger->log(this->worker_logger, ERROR, "Could not checkin IKE_SA (%s)", - mapping_find(status_m, status)); - } -} - -/** - * Implementation of private_thread_pool_t.process_delete_ike_sa_job. - */ -static void process_delete_half_open_ike_sa_job(private_thread_pool_t *this, delete_half_open_ike_sa_job_t *job) -{ - ike_sa_id_t *ike_sa_id = job->get_ike_sa_id(job); - ike_sa_t *ike_sa; - status_t status; - status = charon->ike_sa_manager->checkout(charon->ike_sa_manager,ike_sa_id, &ike_sa); - if ((status != SUCCESS) && (status != CREATED)) - { - this->worker_logger->log(this->worker_logger, CONTROL | LEVEL3, "IKE SA seems to be already deleted"); - return; - } - - switch (ike_sa->get_state(ike_sa)) - { - case INITIATOR_INIT: - case RESPONDER_INIT: - case IKE_SA_INIT_REQUESTED: - case IKE_SA_INIT_RESPONDED: - case IKE_AUTH_REQUESTED: - case DELETE_REQUESTED: - { - /* IKE_SA is half open and gets deleted! */ - status = charon->ike_sa_manager->checkin_and_destroy(charon->ike_sa_manager, ike_sa); - if (status != SUCCESS) - { - this->worker_logger->log(this->worker_logger, ERROR, "Could not checkin and delete checked out IKE_SA!"); - } - break; - } - default: + if (status == DESTROY_ME) { - /* IKE_SA is established and so is not getting deleted! */ - status = charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa); - if (status != SUCCESS) - { - this->worker_logger->log(this->worker_logger, ERROR, "Could not checkin a checked out IKE_SA!"); - } - break; + job->destroy(job); } } } /** - * Implementation of private_thread_pool_t.process_delete_established_ike_sa_job. - */ -static void process_delete_established_ike_sa_job(private_thread_pool_t *this, delete_established_ike_sa_job_t *job) -{ - ike_sa_id_t *ike_sa_id = job->get_ike_sa_id(job); - ike_sa_t *ike_sa; - status_t status; - status = charon->ike_sa_manager->delete(charon->ike_sa_manager, ike_sa_id); - if (status != SUCCESS) - { - this->worker_logger->log(this->worker_logger, CONTROL, "IKE SA didn't exist anymore"); - return; - } -} - -/** - * Implementation of private_thread_pool_t.process_retransmit_request_job. - */ -static void process_retransmit_request_job(private_thread_pool_t *this, retransmit_request_job_t *job) -{ - - ike_sa_id_t *ike_sa_id = job->get_ike_sa_id(job); - u_int32_t message_id = job->get_message_id(job); - bool stop_retransmitting = FALSE; - u_int32_t timeout; - ike_sa_t *ike_sa; - status_t status; - - this->worker_logger->log(this->worker_logger, CONTROL|LEVEL2, "Checking out IKE SA %lld:%lld, role %s", - ike_sa_id->get_initiator_spi(ike_sa_id), - ike_sa_id->get_responder_spi(ike_sa_id), - ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder"); - - status = charon->ike_sa_manager->checkout(charon->ike_sa_manager,ike_sa_id, &ike_sa); - if ((status != SUCCESS) && (status != CREATED)) - { - job->destroy(job); - this->worker_logger->log(this->worker_logger, ERROR|LEVEL1, "IKE SA could not be checked out. Already deleted?"); - return; - } - - status = ike_sa->retransmit_request(ike_sa, message_id); - - if (status != SUCCESS) - { - this->worker_logger->log(this->worker_logger, CONTROL|LEVEL3, "Message doesn't have to be retransmitted"); - stop_retransmitting = TRUE; - } - - this->worker_logger->log(this->worker_logger, CONTROL|LEVEL2, "Checkin IKE SA %lld:%lld, role %s", - ike_sa_id->get_initiator_spi(ike_sa_id), - ike_sa_id->get_responder_spi(ike_sa_id), - ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder"); - - status = charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa); - if (status != SUCCESS) - { - this->worker_logger->log(this->worker_logger, ERROR, "Checkin of IKE SA failed!"); - } - - if (stop_retransmitting) - { - job->destroy(job); - return; - } - - job->increase_retransmit_count(job); - status = charon->configuration->get_retransmit_timeout (charon->configuration,job->get_retransmit_count(job),&timeout); - if (status != SUCCESS) - { - this->worker_logger->log(this->worker_logger, CONTROL | LEVEL2, "Message will not be anymore retransmitted"); - job->destroy(job); - /* - * TODO delete IKE_SA ? - */ - return; - } - charon->event_queue->add_relative(charon->event_queue,(job_t *) job,timeout); -} - - - -/** - * Implementation of private_thread_pool_t.create_delete_half_open_ike_sa_job. - */ -static void create_delete_half_open_ike_sa_job(private_thread_pool_t *this,ike_sa_id_t *ike_sa_id, u_int32_t delay) -{ - job_t *delete_job; - - this->worker_logger->log(this->worker_logger, CONTROL | LEVEL2, "Going to create job to delete half open IKE_SA in %d ms", delay); - - delete_job = (job_t *) delete_half_open_ike_sa_job_create(ike_sa_id); - charon->event_queue->add_relative(charon->event_queue,delete_job, delay); -} - - -/** * Implementation of thread_pool_t.get_pool_size. */ static size_t get_pool_size(private_thread_pool_t *this) @@ -528,7 +127,7 @@ static void destroy(private_thread_pool_t *this) this->pool_logger->log(this->pool_logger, ERROR, "could not terminate worker thread #%d", current+1); } } - + /* free mem */ free(this->threads); free(this); @@ -540,33 +139,22 @@ static void destroy(private_thread_pool_t *this) thread_pool_t *thread_pool_create(size_t pool_size) { int current; - private_thread_pool_t *this = malloc_thing(private_thread_pool_t); /* fill in public fields */ this->public.destroy = (void(*)(thread_pool_t*))destroy; this->public.get_pool_size = (size_t(*)(thread_pool_t*))get_pool_size; - this->process_jobs = process_jobs; - this->process_initiate_ike_sa_job = process_initiate_ike_sa_job; - this->process_delete_half_open_ike_sa_job = process_delete_half_open_ike_sa_job; - this->process_delete_established_ike_sa_job = process_delete_established_ike_sa_job; - this->process_incoming_packet_job = process_incoming_packet_job; - this->process_retransmit_request_job = process_retransmit_request_job; - this->create_delete_half_open_ike_sa_job = create_delete_half_open_ike_sa_job; - + /* initialze memeber */ this->pool_size = pool_size; - this->threads = malloc(sizeof(pthread_t) * pool_size); - this->pool_logger = logger_manager->get_logger(logger_manager, THREAD_POOL); - this->worker_logger = logger_manager->get_logger(logger_manager, WORKER); /* try to create as many threads as possible, up tu pool_size */ for (current = 0; current < pool_size; current++) { - if (pthread_create(&(this->threads[current]), NULL, (void*(*)(void*))this->process_jobs, this) == 0) + if (pthread_create(&(this->threads[current]), NULL, (void*(*)(void*))process_jobs, this) == 0) { this->pool_logger->log(this->pool_logger, CONTROL, "created worker thread #%d", current+1); } |