diff options
Diffstat (limited to 'src/charon/processing')
-rw-r--r-- | src/charon/processing/jobs/callback_job.c | 26 | ||||
-rw-r--r-- | src/charon/processing/processor.c | 66 |
2 files changed, 49 insertions, 43 deletions
diff --git a/src/charon/processing/jobs/callback_job.c b/src/charon/processing/jobs/callback_job.c index 9cc4eeae6..b38094565 100644 --- a/src/charon/processing/jobs/callback_job.c +++ b/src/charon/processing/jobs/callback_job.c @@ -20,6 +20,7 @@ #include <pthread.h> #include <daemon.h> +#include <utils/mutex.h> typedef struct private_callback_job_t private_callback_job_t; @@ -51,12 +52,12 @@ struct private_callback_job_t { * thread ID of the job, if running */ pthread_t thread; - + /** * mutex to access jobs interna */ - pthread_mutex_t mutex; - + mutex_t *mutex; + /** * list of asociated child jobs */ @@ -78,6 +79,7 @@ static void destroy(private_callback_job_t *this) this->cleanup(this->data); } this->children->destroy(this->children); + this->mutex->destroy(this->mutex); free(this); } @@ -91,7 +93,7 @@ static void unregister(private_callback_job_t *this) iterator_t *iterator; private_callback_job_t *child; - pthread_mutex_lock(&this->parent->mutex); + this->parent->mutex->lock(this->parent->mutex); iterator = this->parent->children->create_iterator(this->parent->children, TRUE); while (iterator->iterate(iterator, (void**)&child)) { @@ -102,7 +104,7 @@ static void unregister(private_callback_job_t *this) } } iterator->destroy(iterator); - pthread_mutex_unlock(&this->parent->mutex); + this->parent->mutex->unlock(this->parent->mutex); } } @@ -113,12 +115,12 @@ static void cancel(private_callback_job_t *this) { pthread_t thread; - pthread_mutex_lock(&this->mutex); + this->mutex->lock(this->mutex); thread = this->thread; /* terminate its children */ this->children->invoke_offset(this->children, offsetof(callback_job_t, cancel)); - pthread_mutex_unlock(&this->mutex); + this->mutex->unlock(this->mutex); /* terminate thread */ if (thread) @@ -135,9 +137,9 @@ static void execute(private_callback_job_t *this) { bool cleanup = FALSE; - pthread_mutex_lock(&this->mutex); + this->mutex->lock(this->mutex); this->thread = pthread_self(); - pthread_mutex_unlock(&this->mutex); + this->mutex->unlock(this->mutex); pthread_cleanup_push((void*)destroy, this); while (TRUE) @@ -182,7 +184,7 @@ callback_job_t *callback_job_create(callback_job_cb_t cb, void *data, this->public.cancel = (void(*)(callback_job_t*))cancel; /* private variables */ - pthread_mutex_init(&this->mutex, NULL); + this->mutex = mutex_create(MUTEX_DEFAULT); this->callback = cb; this->data = data; this->cleanup = cleanup; @@ -193,9 +195,9 @@ callback_job_t *callback_job_create(callback_job_cb_t cb, void *data, /* register us at parent */ if (parent) { - pthread_mutex_lock(&this->parent->mutex); + this->parent->mutex->lock(this->parent->mutex); this->parent->children->insert_last(this->parent->children, this); - pthread_mutex_unlock(&this->parent->mutex); + this->parent->mutex->unlock(this->parent->mutex); } return &this->public; diff --git a/src/charon/processing/processor.c b/src/charon/processing/processor.c index 336a28b45..a5f1833c0 100644 --- a/src/charon/processing/processor.c +++ b/src/charon/processing/processor.c @@ -24,6 +24,7 @@ #include "processor.h" #include <daemon.h> +#include <utils/mutex.h> #include <utils/linked_list.h> @@ -61,17 +62,17 @@ struct private_processor_t { /** * access to linked_list is locked through this mutex */ - pthread_mutex_t mutex; + mutex_t *mutex; /** * Condvar to wait for new jobs */ - pthread_cond_t jobadded; + condvar_t *job_added; /** * Condvar to wait for terminated threads */ - pthread_cond_t threadterminated; + condvar_t *thread_terminated; }; static void process_jobs(private_processor_t *this); @@ -85,10 +86,10 @@ static void restart(private_processor_t *this) if (pthread_create(&thread, NULL, (void*)process_jobs, this) != 0) { - pthread_mutex_lock(&this->mutex); + this->mutex->lock(this->mutex); this->total_threads--; - pthread_cond_broadcast(&this->threadterminated); - pthread_mutex_unlock(&this->mutex); + this->thread_terminated->broadcast(this->thread_terminated); + this->mutex->unlock(this->mutex); } } @@ -103,7 +104,7 @@ static void process_jobs(private_processor_t *this) DBG2(DBG_JOB, "started worker thread, thread_ID: %06u", (int)pthread_self()); - pthread_mutex_lock(&this->mutex); + this->mutex->lock(this->mutex); while (this->desired_threads >= this->total_threads) { job_t *job; @@ -111,21 +112,21 @@ static void process_jobs(private_processor_t *this) if (this->list->get_count(this->list) == 0) { this->idle_threads++; - pthread_cond_wait(&this->jobadded, &this->mutex); + this->job_added->wait(this->job_added, this->mutex); this->idle_threads--; continue; } this->list->remove_first(this->list, (void**)&job); - pthread_mutex_unlock(&this->mutex); + this->mutex->unlock(this->mutex); /* terminated threads are restarted, so we have a constant pool */ pthread_cleanup_push((void*)restart, this); job->execute(job); pthread_cleanup_pop(0); - pthread_mutex_lock(&this->mutex); + this->mutex->lock(this->mutex); } this->total_threads--; - pthread_cond_signal(&this->threadterminated); - pthread_mutex_unlock(&this->mutex); + this->thread_terminated->signal(this->thread_terminated); + this->mutex->unlock(this->mutex); } /** @@ -134,9 +135,9 @@ static void process_jobs(private_processor_t *this) static u_int get_total_threads(private_processor_t *this) { u_int count; - pthread_mutex_lock(&this->mutex); + this->mutex->lock(this->mutex); count = this->total_threads; - pthread_mutex_unlock(&this->mutex); + this->mutex->unlock(this->mutex); return count; } @@ -146,9 +147,9 @@ static u_int get_total_threads(private_processor_t *this) static u_int get_idle_threads(private_processor_t *this) { u_int count; - pthread_mutex_lock(&this->mutex); + this->mutex->lock(this->mutex); count = this->idle_threads; - pthread_mutex_unlock(&this->mutex); + this->mutex->unlock(this->mutex); return count; } @@ -158,9 +159,9 @@ static u_int get_idle_threads(private_processor_t *this) static u_int get_job_load(private_processor_t *this) { u_int load; - pthread_mutex_lock(&this->mutex); + this->mutex->lock(this->mutex); load = this->list->get_count(this->list); - pthread_mutex_unlock(&this->mutex); + this->mutex->unlock(this->mutex); return load; } @@ -169,10 +170,10 @@ static u_int get_job_load(private_processor_t *this) */ static void queue_job(private_processor_t *this, job_t *job) { - pthread_mutex_lock(&this->mutex); + this->mutex->lock(this->mutex); this->list->insert_last(this->list, job); - pthread_cond_signal(&this->jobadded); - pthread_mutex_unlock(&this->mutex); + this->job_added->signal(this->job_added); + this->mutex->unlock(this->mutex); } /** @@ -180,7 +181,7 @@ static void queue_job(private_processor_t *this, job_t *job) */ static void set_threads(private_processor_t *this, u_int count) { - pthread_mutex_lock(&this->mutex); + this->mutex->lock(this->mutex); if (count > this->total_threads) { /* increase thread count */ int i; @@ -200,8 +201,8 @@ static void set_threads(private_processor_t *this, u_int count) { /* decrease thread count */ this->desired_threads = count; } - pthread_cond_broadcast(&this->jobadded); - pthread_mutex_unlock(&this->mutex); + this->job_added->broadcast(this->job_added); + this->mutex->unlock(this->mutex); } /** @@ -210,13 +211,16 @@ static void set_threads(private_processor_t *this, u_int count) static void destroy(private_processor_t *this) { set_threads(this, 0); - pthread_mutex_lock(&this->mutex); + this->mutex->lock(this->mutex); while (this->total_threads > 0) { - pthread_cond_broadcast(&this->jobadded); - pthread_cond_wait(&this->threadterminated, &this->mutex); + this->job_added->broadcast(this->job_added); + this->thread_terminated->wait(this->thread_terminated, this->mutex); } - pthread_mutex_unlock(&this->mutex); + this->mutex->unlock(this->mutex); + this->thread_terminated->destroy(this->thread_terminated); + this->job_added->destroy(this->job_added); + this->mutex->destroy(this->mutex); this->list->destroy_offset(this->list, offsetof(job_t, destroy)); free(this); } @@ -236,9 +240,9 @@ processor_t *processor_create(size_t pool_size) this->public.destroy = (void(*)(processor_t*))destroy; this->list = linked_list_create(); - pthread_mutex_init(&this->mutex, NULL); - pthread_cond_init(&this->jobadded, NULL); - pthread_cond_init(&this->threadterminated, NULL); + this->mutex = mutex_create(MUTEX_DEFAULT); + this->job_added = condvar_create(CONDVAR_DEFAULT); + this->thread_terminated = condvar_create(CONDVAR_DEFAULT); this->total_threads = 0; this->desired_threads = 0; this->idle_threads = 0; |