aboutsummaryrefslogtreecommitdiffstats
path: root/Source/charon/threads/thread_pool.c
diff options
context:
space:
mode:
authorMartin Willi <martin@strongswan.org>2005-11-25 13:42:58 +0000
committerMartin Willi <martin@strongswan.org>2005-11-25 13:42:58 +0000
commitca76df97365ec3421719bfbf9ecd3f33939ad108 (patch)
tree98f14e2944590fd642f3bfab0e03000ed64cde65 /Source/charon/threads/thread_pool.c
parent2a336de4cd0d2f9cbb2547dbe7b4ff278446b89a (diff)
downloadstrongswan-ca76df97365ec3421719bfbf9ecd3f33939ad108.tar.bz2
strongswan-ca76df97365ec3421719bfbf9ecd3f33939ad108.tar.xz
- documentation and cleanup of threads package
Diffstat (limited to 'Source/charon/threads/thread_pool.c')
-rw-r--r--Source/charon/threads/thread_pool.c356
1 files changed, 200 insertions, 156 deletions
diff --git a/Source/charon/threads/thread_pool.c b/Source/charon/threads/thread_pool.c
index 1ed3a20b5..ee8d50bc7 100644
--- a/Source/charon/threads/thread_pool.c
+++ b/Source/charon/threads/thread_pool.c
@@ -1,7 +1,7 @@
/**
* @file thread_pool.c
*
- * @brief Thread pool with some threads processing the job_queue.
+ * @brief Implementation of thread_pool_t.
*
*/
@@ -38,19 +38,45 @@
typedef struct private_thread_pool_t private_thread_pool_t;
/**
- * @brief structure with private members for thread_pool_t
+ * @brief Structure with private members for thread_pool_t.
*/
struct private_thread_pool_t {
/**
* inclusion of public members
*/
thread_pool_t public;
+
+ /**
+ * @brief Main processing functino for worker threads.
+ *
+ * Gets a job from the job queue and calls corresponding
+ * function for processing.
+ *
+ * @param this private_thread_pool_t-Object
+ */
+ void (*process_jobs) (private_thread_pool_t *this);
+
+ /**
+ * @brief Process a INCOMING_PACKET_JOB.
+ *
+ * @param this private_thread_pool_t-Object
+ */
+ void (*process_incoming_packet_job) (private_thread_pool_t *this, incoming_packet_job_t *job);
+
+ /**
+ * @brief Process a INITIATE_IKE_SA_JOB.
+ *
+ * @param this private_thread_pool_t-Object
+ */
+ void (*process_initiate_ike_sa_job) (private_thread_pool_t *this, initiate_ike_sa_job_t *job);
+
/**
- * @brief Processing function of a worker thread
+ * @brief Process a DELETE_IKE_SA_JOB.
*
* @param this private_thread_pool_t-Object
*/
- void (*function) (private_thread_pool_t *this);
+ void (*process_delete_ike_sa_job) (private_thread_pool_t *this, delete_ike_sa_job_t *job);
+
/**
* number of running threads
*/
@@ -64,7 +90,7 @@ struct private_thread_pool_t {
*/
logger_t *pool_logger;
/**
- * logger of the threadpool
+ * logger of the worker threads
*/
logger_t *worker_logger;
} ;
@@ -74,9 +100,8 @@ struct private_thread_pool_t {
/**
* implements private_thread_pool_t.function
*/
-static void job_processing(private_thread_pool_t *this)
+static void process_jobs(private_thread_pool_t *this)
{
-
/* cancellation disabled by default */
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
this->worker_logger->log(this->worker_logger, CONTROL, "started working");
@@ -87,180 +112,196 @@ static void job_processing(private_thread_pool_t *this)
global_job_queue->get(global_job_queue, &job);
job_type = job->get_type(job);
- this->worker_logger->log(this->worker_logger, CONTROL|MORE, "got a job of type %s", mapping_find(job_type_m,job_type));
+ this->worker_logger->log(this->worker_logger, CONTROL|MORE, "got a job of type %s",
+ mapping_find(job_type_m,job_type));
- /* process them here */
switch (job_type)
{
case INCOMING_PACKET:
{
- packet_t *packet;
- message_t *message;
- ike_sa_t *ike_sa;
- ike_sa_id_t *ike_sa_id;
- status_t status;
- incoming_packet_job_t *incoming_packet_job = (incoming_packet_job_t *)job;
+ this->process_incoming_packet_job(this, (incoming_packet_job_t*)job);
+ break;
+ }
+ case INITIATE_IKE_SA:
+ {
+ this->process_initiate_ike_sa_job(this, (initiate_ike_sa_job_t*)job);
+ break;
+ }
+ case DELETE_IKE_SA:
+ {
+ this->process_delete_ike_sa_job(this, (delete_ike_sa_job_t*)job);
+ break;
+ }
+ default:
+ {
+ this->worker_logger->log(this->worker_logger, ERROR, "job of type %s not supported!",
+ mapping_find(job_type_m,job_type));
+ break;
+ }
+ }
+ job->destroy(job);
+ }
+}
+
+/**
+ * implementation of private_thread_pool_t.process_incoming_packet_job
+ */
+void process_incoming_packet_job(private_thread_pool_t *this, incoming_packet_job_t *job)
+{
+ packet_t *packet;
+ message_t *message;
+ ike_sa_t *ike_sa;
+ ike_sa_id_t *ike_sa_id;
+ status_t status;
- if (incoming_packet_job->get_packet(incoming_packet_job,&packet) != SUCCESS)
- {
- this->worker_logger->log(this->worker_logger, ERROR, "packet in job %s could not be retrieved!",
- mapping_find(job_type_m,job_type));
- break;
- }
+ if (job->get_packet(job,&packet) != SUCCESS)
+ {
+ this->worker_logger->log(this->worker_logger, ERROR, "packet in job could not be retrieved!");
+ return;
+ }
- message = message_create_from_packet(packet);
- if (message == NULL)
- {
- this->worker_logger->log(this->worker_logger, ERROR, "message could not be created from packet!",
- mapping_find(job_type_m,job_type));
- packet->destroy(packet);
- break;
- }
+ message = message_create_from_packet(packet);
+ if (message == NULL)
+ {
+ this->worker_logger->log(this->worker_logger, ERROR, "message could not be created from packet!");
+ packet->destroy(packet);
+ return;
+ }
- status = message->parse_header(message);
- if (status != SUCCESS)
- {
- this->worker_logger->log(this->worker_logger, ERROR, "message header could not be verified!");
- message->destroy(message);
- break;
- }
+ status = message->parse_header(message);
+ if (status != SUCCESS)
+ {
+ this->worker_logger->log(this->worker_logger, ERROR, "message header could not be verified!");
+ message->destroy(message);
+ return;
+ }
- this->worker_logger->log(this->worker_logger, CONTROL|MOST, "message is a %s %s",
- mapping_find(exchange_type_m, message->get_exchange_type(message)),
- message->get_request(message) ? "request" : "reply");
+ this->worker_logger->log(this->worker_logger, CONTROL|MOST, "message is a %s %s",
+ mapping_find(exchange_type_m, message->get_exchange_type(message)),
+ message->get_request(message) ? "request" : "reply");
- if ((message->get_major_version(message) != IKE_MAJOR_VERSION) ||
- (message->get_minor_version(message) != IKE_MINOR_VERSION))
- {
- this->worker_logger->log(this->worker_logger, ERROR, "IKE version %d.%d not supported",
- message->get_major_version(message),
- message->get_minor_version(message));
- /* Todo send notify */
- }
+ if ((message->get_major_version(message) != IKE_MAJOR_VERSION) ||
+ (message->get_minor_version(message) != IKE_MINOR_VERSION))
+ {
+ this->worker_logger->log(this->worker_logger, ERROR, "IKE version %d.%d not supported",
+ message->get_major_version(message),
+ message->get_minor_version(message));
+ /* Todo send notify */
+ }
- status = message->get_ike_sa_id(message, &ike_sa_id);
- if (status != SUCCESS)
- {
- this->worker_logger->log(this->worker_logger, ERROR, "IKE SA ID of message could not be created!");
- message->destroy(message);
- break;
- }
+ status = message->get_ike_sa_id(message, &ike_sa_id);
+ if (status != SUCCESS)
+ {
+ this->worker_logger->log(this->worker_logger, ERROR, "IKE SA ID of message could not be created!");
+ message->destroy(message);
+ return;
+ }
- ike_sa_id->switch_initiator(ike_sa_id);
+ ike_sa_id->switch_initiator(ike_sa_id);
- this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking out IKE SA %lld:%lld, role %s",
- ike_sa_id->get_initiator_spi(ike_sa_id),
- ike_sa_id->get_responder_spi(ike_sa_id),
- ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
+ this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking out IKE SA %lld:%lld, role %s",
+ ike_sa_id->get_initiator_spi(ike_sa_id),
+ ike_sa_id->get_responder_spi(ike_sa_id),
+ ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
- status = global_ike_sa_manager->checkout(global_ike_sa_manager,ike_sa_id, &ike_sa);
- if (status != SUCCESS)
- {
- this->worker_logger->log(this->worker_logger, ERROR, "IKE SA could not be checked out");
- ike_sa_id->destroy(ike_sa_id);
- message->destroy(message);
- break;
- }
+ status = global_ike_sa_manager->checkout(global_ike_sa_manager,ike_sa_id, &ike_sa);
+ if (status != SUCCESS)
+ {
+ this->worker_logger->log(this->worker_logger, ERROR, "IKE SA could not be checked out");
+ ike_sa_id->destroy(ike_sa_id);
+ message->destroy(message);
+ return;
+ }
- status = ike_sa->process_message(ike_sa, message);
- if (status != SUCCESS)
- {
- this->worker_logger->log(this->worker_logger, ERROR, "message could not be processed by IKE SA");
- }
+ status = ike_sa->process_message(ike_sa, message);
+ if (status != SUCCESS)
+ {
+ this->worker_logger->log(this->worker_logger, ERROR, "message could not be processed by IKE SA");
+ }
- this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking in IKE SA %lld:%lld, role %s",
- ike_sa_id->get_initiator_spi(ike_sa_id),
- ike_sa_id->get_responder_spi(ike_sa_id),
- ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
- ike_sa_id->destroy(ike_sa_id);
+ this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking in IKE SA %lld:%lld, role %s",
+ ike_sa_id->get_initiator_spi(ike_sa_id),
+ ike_sa_id->get_responder_spi(ike_sa_id),
+ ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
+ ike_sa_id->destroy(ike_sa_id);
- status = global_ike_sa_manager->checkin(global_ike_sa_manager, ike_sa);
- if (status != SUCCESS)
- {
- this->worker_logger->log(this->worker_logger, ERROR, "checkin of IKE SA failed");
- }
- message->destroy(message);
- break;
- }
- case INITIATE_IKE_SA:
- {
- /*
- * Initiatie an IKE_SA:
- * - is defined by a name of a configuration
- * - create an empty IKE_SA via manager
- * - call initiate_connection on this sa
- */
- initiate_ike_sa_job_t *initiate_job;
- ike_sa_t *ike_sa;
- status_t status;
-
- initiate_job = (initiate_ike_sa_job_t *)job;
+ status = global_ike_sa_manager->checkin(global_ike_sa_manager, ike_sa);
+ if (status != SUCCESS)
+ {
+ this->worker_logger->log(this->worker_logger, ERROR, "checkin of IKE SA failed");
+ }
+ message->destroy(message);
+}
+
+/**
+ * implementation of private_thread_pool_t.process_initiate_ike_sa_job
+ */
+void process_initiate_ike_sa_job(private_thread_pool_t *this, initiate_ike_sa_job_t *job)
+{
+ /*
+ * Initiatie an IKE_SA:
+ * - is defined by a name of a configuration
+ * - create an empty IKE_SA via manager
+ * - call initiate_connection on this sa
+ */
+ ike_sa_t *ike_sa;
+ status_t status;
+
- this->worker_logger->log(this->worker_logger, CONTROL|MOST, "create and checking out IKE SA");
-
- status = global_ike_sa_manager->create_and_checkout(global_ike_sa_manager, &ike_sa);
- if (status != SUCCESS)
- {
- this->worker_logger->log(this->worker_logger, ERROR, "%s by checking out new IKE_SA, job rejected.",
- mapping_find(status_m, status));
- break;
- }
+ this->worker_logger->log(this->worker_logger, CONTROL|MOST, "create and checking out IKE SA");
+ status = global_ike_sa_manager->create_and_checkout(global_ike_sa_manager, &ike_sa);
+ if (status != SUCCESS)
+ {
+ this->worker_logger->log(this->worker_logger, ERROR, "%s by checking out new IKE_SA, job rejected.",
+ mapping_find(status_m, status));
+ return;
+ }
- this->worker_logger->log(this->worker_logger, CONTROL|MOST, "initializing connection \"%s\"",
- initiate_job->get_configuration_name(initiate_job));
- status = ike_sa->initialize_connection(ike_sa, initiate_job->get_configuration_name(initiate_job));
- if (status != SUCCESS)
- {
- this->worker_logger->log(this->worker_logger, ERROR, "%s by initialize_conection, job and rejected, IKE_SA deleted.",
- mapping_find(status_m, status));
- global_ike_sa_manager->checkin_and_delete(global_ike_sa_manager, ike_sa);
- break;
- }
- this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking in IKE SA");
- status = global_ike_sa_manager->checkin(global_ike_sa_manager, ike_sa);
- if (status != SUCCESS)
- {
- this->worker_logger->log(this->worker_logger, ERROR, "%s could not checkin IKE_SA.",
- mapping_find(status_m, status));
- }
- break;
- }
- case RETRANSMIT_REQUEST:
- {
- this->worker_logger->log(this->worker_logger, ERROR, "job of type %s not supported!", mapping_find(job_type_m,job_type));
- break;
- }
-
- case DELETE_IKE_SA:
- {
- delete_ike_sa_job_t *delete_ike_sa_job = (delete_ike_sa_job_t*) job;
- ike_sa_id_t *ike_sa_id = delete_ike_sa_job->get_ike_sa_id(delete_ike_sa_job);
- status_t status;
-
-
- this->worker_logger->log(this->worker_logger, CONTROL|MOST, "deleting IKE SA %lld:%lld, role %s",
- ike_sa_id->get_initiator_spi(ike_sa_id),
- ike_sa_id->get_responder_spi(ike_sa_id),
- ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
-
- status = global_ike_sa_manager->delete(global_ike_sa_manager, ike_sa_id);
- if (status != SUCCESS)
- {
- this->worker_logger->log(this->worker_logger, ERROR, "could not delete IKE_SA (%s)",
- mapping_find(status_m, status));
- }
- break;
+ this->worker_logger->log(this->worker_logger, CONTROL|MOST, "initializing connection \"%s\"",
+ job->get_configuration_name(job));
+ status = ike_sa->initialize_connection(ike_sa, job->get_configuration_name(job));
+ if (status != SUCCESS)
+ {
+ this->worker_logger->log(this->worker_logger, ERROR, "%s by initialize_conection, job and rejected, IKE_SA deleted.",
+ mapping_find(status_m, status));
+ global_ike_sa_manager->checkin_and_delete(global_ike_sa_manager, ike_sa);
+ return;
+ }
- }
- }
- job->destroy(job);
+ this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking in IKE SA");
+ status = global_ike_sa_manager->checkin(global_ike_sa_manager, ike_sa);
+ if (status != SUCCESS)
+ {
+ this->worker_logger->log(this->worker_logger, ERROR, "%s could not checkin IKE_SA.",
+ mapping_find(status_m, status));
}
+}
+/**
+ * implementation of private_thread_pool_t.process_delete_ike_sa_job
+ */
+void process_delete_ike_sa_job(private_thread_pool_t *this, delete_ike_sa_job_t *job)
+{
+ status_t status;
+ ike_sa_id_t *ike_sa_id = job->get_ike_sa_id(job);
+
+ this->worker_logger->log(this->worker_logger, CONTROL|MOST, "deleting IKE SA %lld:%lld, role %s",
+ ike_sa_id->get_initiator_spi(ike_sa_id),
+ ike_sa_id->get_responder_spi(ike_sa_id),
+ ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
+
+ status = global_ike_sa_manager->delete(global_ike_sa_manager, ike_sa_id);
+ if (status != SUCCESS)
+ {
+ this->worker_logger->log(this->worker_logger, ERROR, "could not delete IKE_SA (%s)",
+ mapping_find(status_m, status));
+ }
}
+
/**
* implementation of thread_pool_t.get_pool_size
*/
@@ -310,7 +351,10 @@ thread_pool_t *thread_pool_create(size_t pool_size)
this->public.destroy = (status_t(*)(thread_pool_t*))destroy;
this->public.get_pool_size = (size_t(*)(thread_pool_t*))get_pool_size;
- this->function = job_processing;
+ this->process_jobs = process_jobs;
+ this->process_initiate_ike_sa_job = process_initiate_ike_sa_job;
+ this->process_delete_ike_sa_job = process_delete_ike_sa_job;
+ this->process_incoming_packet_job = process_incoming_packet_job;
this->pool_size = pool_size;
this->threads = allocator_alloc(sizeof(pthread_t) * pool_size);
@@ -338,7 +382,7 @@ thread_pool_t *thread_pool_create(size_t pool_size)
/* try to create as many threads as possible, up tu pool_size */
for (current = 0; current < pool_size; current++)
{
- if (pthread_create(&(this->threads[current]), NULL, (void*(*)(void*))this->function, this) == 0)
+ if (pthread_create(&(this->threads[current]), NULL, (void*(*)(void*))this->process_jobs, this) == 0)
{
this->pool_logger->log(this->pool_logger, CONTROL, "thread %u created", this->threads[current]);
}