aboutsummaryrefslogtreecommitdiffstats
path: root/src/charon/processing/processor.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/charon/processing/processor.c')
-rw-r--r--src/charon/processing/processor.c34
1 files changed, 24 insertions, 10 deletions
diff --git a/src/charon/processing/processor.c b/src/charon/processing/processor.c
index 58203afa3..d5774af26 100644
--- a/src/charon/processing/processor.c
+++ b/src/charon/processing/processor.c
@@ -54,6 +54,12 @@ struct private_processor_t {
u_int idle_threads;
/**
+ * All threads managed in the pool (including threads that have been
+ * cancelled, this allows to join them during destruction)
+ */
+ linked_list_t *threads;
+
+ /**
* The jobs are stored in a linked list
*/
linked_list_t *list;
@@ -83,17 +89,19 @@ static void restart(private_processor_t *this)
{
thread_t *thread;
+ DBG2(DBG_JOB, "terminated worker thread, ID: %u", thread_current_id());
+
/* respawn thread if required */
this->mutex->lock(this->mutex);
- if (this->desired_threads == 0 ||
+ if (this->desired_threads < this->total_threads ||
(thread = thread_create((thread_main_t)process_jobs, this)) == NULL)
{
this->total_threads--;
- this->thread_terminated->broadcast(this->thread_terminated);
+ this->thread_terminated->signal(this->thread_terminated);
}
else
{
- thread->detach(thread);
+ this->threads->insert_last(this->threads, thread);
}
this->mutex->unlock(this->mutex);
}
@@ -103,11 +111,10 @@ static void restart(private_processor_t *this)
*/
static void process_jobs(private_processor_t *this)
{
- bool oldstate;
+ /* worker threads are not cancellable by default */
+ thread_cancelability(FALSE);
- oldstate = thread_cancelability(FALSE);
-
- DBG2(DBG_JOB, "started worker thread, thread_ID: %u", thread_current_id());
+ DBG2(DBG_JOB, "started worker thread, ID: %u", thread_current_id());
this->mutex->lock(this->mutex);
while (this->desired_threads >= this->total_threads)
@@ -129,9 +136,8 @@ static void process_jobs(private_processor_t *this)
thread_cleanup_pop(FALSE);
this->mutex->lock(this->mutex);
}
- this->total_threads--;
- this->thread_terminated->signal(this->thread_terminated);
this->mutex->unlock(this->mutex);
+ restart(this);
}
/**
@@ -199,7 +205,7 @@ static void set_threads(private_processor_t *this, u_int count)
current = thread_create((thread_main_t)process_jobs, this);
if (current)
{
- current->detach(current);
+ this->threads->insert_last(this->threads, current);
this->total_threads++;
}
}
@@ -217,6 +223,7 @@ static void set_threads(private_processor_t *this, u_int count)
*/
static void destroy(private_processor_t *this)
{
+ thread_t *current;
set_threads(this, 0);
this->mutex->lock(this->mutex);
while (this->total_threads > 0)
@@ -224,11 +231,17 @@ static void destroy(private_processor_t *this)
this->job_added->broadcast(this->job_added);
this->thread_terminated->wait(this->thread_terminated, this->mutex);
}
+ while (this->threads->remove_first(this->threads,
+ (void**)&current) == SUCCESS)
+ {
+ current->join(current);
+ }
this->mutex->unlock(this->mutex);
this->thread_terminated->destroy(this->thread_terminated);
this->job_added->destroy(this->job_added);
this->mutex->destroy(this->mutex);
this->list->destroy_offset(this->list, offsetof(job_t, destroy));
+ this->threads->destroy(this->threads);
free(this);
}
@@ -247,6 +260,7 @@ processor_t *processor_create(size_t pool_size)
this->public.destroy = (void(*)(processor_t*))destroy;
this->list = linked_list_create();
+ this->threads = linked_list_create();
this->mutex = mutex_create(MUTEX_TYPE_DEFAULT);
this->job_added = condvar_create(CONDVAR_TYPE_DEFAULT);
this->thread_terminated = condvar_create(CONDVAR_TYPE_DEFAULT);