diff options
Diffstat (limited to 'src/charon/processing/processor.c')
-rw-r--r-- | src/charon/processing/processor.c | 34 |
1 files changed, 24 insertions, 10 deletions
diff --git a/src/charon/processing/processor.c b/src/charon/processing/processor.c index 58203afa3..d5774af26 100644 --- a/src/charon/processing/processor.c +++ b/src/charon/processing/processor.c @@ -54,6 +54,12 @@ struct private_processor_t { u_int idle_threads; /** + * All threads managed in the pool (including threads that have been + * cancelled, this allows to join them during destruction) + */ + linked_list_t *threads; + + /** * The jobs are stored in a linked list */ linked_list_t *list; @@ -83,17 +89,19 @@ static void restart(private_processor_t *this) { thread_t *thread; + DBG2(DBG_JOB, "terminated worker thread, ID: %u", thread_current_id()); + /* respawn thread if required */ this->mutex->lock(this->mutex); - if (this->desired_threads == 0 || + if (this->desired_threads < this->total_threads || (thread = thread_create((thread_main_t)process_jobs, this)) == NULL) { this->total_threads--; - this->thread_terminated->broadcast(this->thread_terminated); + this->thread_terminated->signal(this->thread_terminated); } else { - thread->detach(thread); + this->threads->insert_last(this->threads, thread); } this->mutex->unlock(this->mutex); } @@ -103,11 +111,10 @@ static void restart(private_processor_t *this) */ static void process_jobs(private_processor_t *this) { - bool oldstate; + /* worker threads are not cancellable by default */ + thread_cancelability(FALSE); - oldstate = thread_cancelability(FALSE); - - DBG2(DBG_JOB, "started worker thread, thread_ID: %u", thread_current_id()); + DBG2(DBG_JOB, "started worker thread, ID: %u", thread_current_id()); this->mutex->lock(this->mutex); while (this->desired_threads >= this->total_threads) @@ -129,9 +136,8 @@ static void process_jobs(private_processor_t *this) thread_cleanup_pop(FALSE); this->mutex->lock(this->mutex); } - this->total_threads--; - this->thread_terminated->signal(this->thread_terminated); this->mutex->unlock(this->mutex); + restart(this); } /** @@ -199,7 +205,7 @@ static void set_threads(private_processor_t *this, u_int count) current = thread_create((thread_main_t)process_jobs, this); if (current) { - current->detach(current); + this->threads->insert_last(this->threads, current); this->total_threads++; } } @@ -217,6 +223,7 @@ static void set_threads(private_processor_t *this, u_int count) */ static void destroy(private_processor_t *this) { + thread_t *current; set_threads(this, 0); this->mutex->lock(this->mutex); while (this->total_threads > 0) @@ -224,11 +231,17 @@ static void destroy(private_processor_t *this) this->job_added->broadcast(this->job_added); this->thread_terminated->wait(this->thread_terminated, this->mutex); } + while (this->threads->remove_first(this->threads, + (void**)¤t) == SUCCESS) + { + current->join(current); + } this->mutex->unlock(this->mutex); this->thread_terminated->destroy(this->thread_terminated); this->job_added->destroy(this->job_added); this->mutex->destroy(this->mutex); this->list->destroy_offset(this->list, offsetof(job_t, destroy)); + this->threads->destroy(this->threads); free(this); } @@ -247,6 +260,7 @@ processor_t *processor_create(size_t pool_size) this->public.destroy = (void(*)(processor_t*))destroy; this->list = linked_list_create(); + this->threads = linked_list_create(); this->mutex = mutex_create(MUTEX_TYPE_DEFAULT); this->job_added = condvar_create(CONDVAR_TYPE_DEFAULT); this->thread_terminated = condvar_create(CONDVAR_TYPE_DEFAULT); |