aboutsummaryrefslogtreecommitdiffstats
path: root/src/charon/processing
diff options
context:
space:
mode:
Diffstat (limited to 'src/charon/processing')
-rw-r--r--src/charon/processing/jobs/callback_job.c26
-rw-r--r--src/charon/processing/processor.c66
2 files changed, 49 insertions, 43 deletions
diff --git a/src/charon/processing/jobs/callback_job.c b/src/charon/processing/jobs/callback_job.c
index 9cc4eeae6..b38094565 100644
--- a/src/charon/processing/jobs/callback_job.c
+++ b/src/charon/processing/jobs/callback_job.c
@@ -20,6 +20,7 @@
#include <pthread.h>
#include <daemon.h>
+#include <utils/mutex.h>
typedef struct private_callback_job_t private_callback_job_t;
@@ -51,12 +52,12 @@ struct private_callback_job_t {
* thread ID of the job, if running
*/
pthread_t thread;
-
+
/**
* mutex to access jobs interna
*/
- pthread_mutex_t mutex;
-
+ mutex_t *mutex;
+
/**
* list of asociated child jobs
*/
@@ -78,6 +79,7 @@ static void destroy(private_callback_job_t *this)
this->cleanup(this->data);
}
this->children->destroy(this->children);
+ this->mutex->destroy(this->mutex);
free(this);
}
@@ -91,7 +93,7 @@ static void unregister(private_callback_job_t *this)
iterator_t *iterator;
private_callback_job_t *child;
- pthread_mutex_lock(&this->parent->mutex);
+ this->parent->mutex->lock(this->parent->mutex);
iterator = this->parent->children->create_iterator(this->parent->children, TRUE);
while (iterator->iterate(iterator, (void**)&child))
{
@@ -102,7 +104,7 @@ static void unregister(private_callback_job_t *this)
}
}
iterator->destroy(iterator);
- pthread_mutex_unlock(&this->parent->mutex);
+ this->parent->mutex->unlock(this->parent->mutex);
}
}
@@ -113,12 +115,12 @@ static void cancel(private_callback_job_t *this)
{
pthread_t thread;
- pthread_mutex_lock(&this->mutex);
+ this->mutex->lock(this->mutex);
thread = this->thread;
/* terminate its children */
this->children->invoke_offset(this->children, offsetof(callback_job_t, cancel));
- pthread_mutex_unlock(&this->mutex);
+ this->mutex->unlock(this->mutex);
/* terminate thread */
if (thread)
@@ -135,9 +137,9 @@ static void execute(private_callback_job_t *this)
{
bool cleanup = FALSE;
- pthread_mutex_lock(&this->mutex);
+ this->mutex->lock(this->mutex);
this->thread = pthread_self();
- pthread_mutex_unlock(&this->mutex);
+ this->mutex->unlock(this->mutex);
pthread_cleanup_push((void*)destroy, this);
while (TRUE)
@@ -182,7 +184,7 @@ callback_job_t *callback_job_create(callback_job_cb_t cb, void *data,
this->public.cancel = (void(*)(callback_job_t*))cancel;
/* private variables */
- pthread_mutex_init(&this->mutex, NULL);
+ this->mutex = mutex_create(MUTEX_DEFAULT);
this->callback = cb;
this->data = data;
this->cleanup = cleanup;
@@ -193,9 +195,9 @@ callback_job_t *callback_job_create(callback_job_cb_t cb, void *data,
/* register us at parent */
if (parent)
{
- pthread_mutex_lock(&this->parent->mutex);
+ this->parent->mutex->lock(this->parent->mutex);
this->parent->children->insert_last(this->parent->children, this);
- pthread_mutex_unlock(&this->parent->mutex);
+ this->parent->mutex->unlock(this->parent->mutex);
}
return &this->public;
diff --git a/src/charon/processing/processor.c b/src/charon/processing/processor.c
index 336a28b45..a5f1833c0 100644
--- a/src/charon/processing/processor.c
+++ b/src/charon/processing/processor.c
@@ -24,6 +24,7 @@
#include "processor.h"
#include <daemon.h>
+#include <utils/mutex.h>
#include <utils/linked_list.h>
@@ -61,17 +62,17 @@ struct private_processor_t {
/**
* access to linked_list is locked through this mutex
*/
- pthread_mutex_t mutex;
+ mutex_t *mutex;
/**
* Condvar to wait for new jobs
*/
- pthread_cond_t jobadded;
+ condvar_t *job_added;
/**
* Condvar to wait for terminated threads
*/
- pthread_cond_t threadterminated;
+ condvar_t *thread_terminated;
};
static void process_jobs(private_processor_t *this);
@@ -85,10 +86,10 @@ static void restart(private_processor_t *this)
if (pthread_create(&thread, NULL, (void*)process_jobs, this) != 0)
{
- pthread_mutex_lock(&this->mutex);
+ this->mutex->lock(this->mutex);
this->total_threads--;
- pthread_cond_broadcast(&this->threadterminated);
- pthread_mutex_unlock(&this->mutex);
+ this->thread_terminated->broadcast(this->thread_terminated);
+ this->mutex->unlock(this->mutex);
}
}
@@ -103,7 +104,7 @@ static void process_jobs(private_processor_t *this)
DBG2(DBG_JOB, "started worker thread, thread_ID: %06u", (int)pthread_self());
- pthread_mutex_lock(&this->mutex);
+ this->mutex->lock(this->mutex);
while (this->desired_threads >= this->total_threads)
{
job_t *job;
@@ -111,21 +112,21 @@ static void process_jobs(private_processor_t *this)
if (this->list->get_count(this->list) == 0)
{
this->idle_threads++;
- pthread_cond_wait(&this->jobadded, &this->mutex);
+ this->job_added->wait(this->job_added, this->mutex);
this->idle_threads--;
continue;
}
this->list->remove_first(this->list, (void**)&job);
- pthread_mutex_unlock(&this->mutex);
+ this->mutex->unlock(this->mutex);
/* terminated threads are restarted, so we have a constant pool */
pthread_cleanup_push((void*)restart, this);
job->execute(job);
pthread_cleanup_pop(0);
- pthread_mutex_lock(&this->mutex);
+ this->mutex->lock(this->mutex);
}
this->total_threads--;
- pthread_cond_signal(&this->threadterminated);
- pthread_mutex_unlock(&this->mutex);
+ this->thread_terminated->signal(this->thread_terminated);
+ this->mutex->unlock(this->mutex);
}
/**
@@ -134,9 +135,9 @@ static void process_jobs(private_processor_t *this)
static u_int get_total_threads(private_processor_t *this)
{
u_int count;
- pthread_mutex_lock(&this->mutex);
+ this->mutex->lock(this->mutex);
count = this->total_threads;
- pthread_mutex_unlock(&this->mutex);
+ this->mutex->unlock(this->mutex);
return count;
}
@@ -146,9 +147,9 @@ static u_int get_total_threads(private_processor_t *this)
static u_int get_idle_threads(private_processor_t *this)
{
u_int count;
- pthread_mutex_lock(&this->mutex);
+ this->mutex->lock(this->mutex);
count = this->idle_threads;
- pthread_mutex_unlock(&this->mutex);
+ this->mutex->unlock(this->mutex);
return count;
}
@@ -158,9 +159,9 @@ static u_int get_idle_threads(private_processor_t *this)
static u_int get_job_load(private_processor_t *this)
{
u_int load;
- pthread_mutex_lock(&this->mutex);
+ this->mutex->lock(this->mutex);
load = this->list->get_count(this->list);
- pthread_mutex_unlock(&this->mutex);
+ this->mutex->unlock(this->mutex);
return load;
}
@@ -169,10 +170,10 @@ static u_int get_job_load(private_processor_t *this)
*/
static void queue_job(private_processor_t *this, job_t *job)
{
- pthread_mutex_lock(&this->mutex);
+ this->mutex->lock(this->mutex);
this->list->insert_last(this->list, job);
- pthread_cond_signal(&this->jobadded);
- pthread_mutex_unlock(&this->mutex);
+ this->job_added->signal(this->job_added);
+ this->mutex->unlock(this->mutex);
}
/**
@@ -180,7 +181,7 @@ static void queue_job(private_processor_t *this, job_t *job)
*/
static void set_threads(private_processor_t *this, u_int count)
{
- pthread_mutex_lock(&this->mutex);
+ this->mutex->lock(this->mutex);
if (count > this->total_threads)
{ /* increase thread count */
int i;
@@ -200,8 +201,8 @@ static void set_threads(private_processor_t *this, u_int count)
{ /* decrease thread count */
this->desired_threads = count;
}
- pthread_cond_broadcast(&this->jobadded);
- pthread_mutex_unlock(&this->mutex);
+ this->job_added->broadcast(this->job_added);
+ this->mutex->unlock(this->mutex);
}
/**
@@ -210,13 +211,16 @@ static void set_threads(private_processor_t *this, u_int count)
static void destroy(private_processor_t *this)
{
set_threads(this, 0);
- pthread_mutex_lock(&this->mutex);
+ this->mutex->lock(this->mutex);
while (this->total_threads > 0)
{
- pthread_cond_broadcast(&this->jobadded);
- pthread_cond_wait(&this->threadterminated, &this->mutex);
+ this->job_added->broadcast(this->job_added);
+ this->thread_terminated->wait(this->thread_terminated, this->mutex);
}
- pthread_mutex_unlock(&this->mutex);
+ this->mutex->unlock(this->mutex);
+ this->thread_terminated->destroy(this->thread_terminated);
+ this->job_added->destroy(this->job_added);
+ this->mutex->destroy(this->mutex);
this->list->destroy_offset(this->list, offsetof(job_t, destroy));
free(this);
}
@@ -236,9 +240,9 @@ processor_t *processor_create(size_t pool_size)
this->public.destroy = (void(*)(processor_t*))destroy;
this->list = linked_list_create();
- pthread_mutex_init(&this->mutex, NULL);
- pthread_cond_init(&this->jobadded, NULL);
- pthread_cond_init(&this->threadterminated, NULL);
+ this->mutex = mutex_create(MUTEX_DEFAULT);
+ this->job_added = condvar_create(CONDVAR_DEFAULT);
+ this->thread_terminated = condvar_create(CONDVAR_DEFAULT);
this->total_threads = 0;
this->desired_threads = 0;
this->idle_threads = 0;