diff options
-rw-r--r-- | src/charon/processing/jobs/callback_job.c | 148 | ||||
-rw-r--r-- | src/charon/processing/jobs/callback_job.h | 5 |
2 files changed, 106 insertions, 47 deletions
diff --git a/src/charon/processing/jobs/callback_job.c b/src/charon/processing/jobs/callback_job.c index 145481c0b..7e35dcdcb 100644 --- a/src/charon/processing/jobs/callback_job.c +++ b/src/charon/processing/jobs/callback_job.c @@ -1,4 +1,5 @@ /* + * Copyright (C) 2009 Tobias Brunner * Copyright (C) 2007 Martin Willi * Hochschule fuer Technik Rapperswil * @@ -15,9 +16,11 @@ #include "callback_job.h" -#include <pthread.h> +#include <semaphore.h> #include <daemon.h> +#include <threading/thread.h> +#include <threading/condvar.h> #include <threading/mutex.h> typedef struct private_callback_job_t private_callback_job_t; @@ -47,9 +50,9 @@ struct private_callback_job_t { callback_job_cleanup_t cleanup; /** - * thread ID of the job, if running + * thread of the job, if running */ - pthread_t thread; + thread_t *thread; /** * mutex to access jobs interna @@ -65,45 +68,69 @@ struct private_callback_job_t { * parent of this job, or NULL */ private_callback_job_t *parent; -}; -/** - * Implements job_t.destroy. - */ -static void destroy(private_callback_job_t *this) -{ - if (this->cleanup) - { - this->cleanup(this->data); - } - this->children->destroy(this->children); - this->mutex->destroy(this->mutex); - free(this); -} + /** + * TRUE if the job got cancelled + */ + bool cancelled; + + /** + * condvar to synchronize the cancellation/destruction of the job + */ + condvar_t *destroyable; + + /** + * semaphore to synchronize the termination of the assigned thread. + * + * separately allocated during cancellation, so that we can wait on it + * without risking that it gets freed too early during destruction. + */ + sem_t *terminated; +}; /** * unregister a child from its parent, if any. + * note: this->mutex has to be locked */ static void unregister(private_callback_job_t *this) { if (this->parent) { - iterator_t *iterator; - private_callback_job_t *child; - this->parent->mutex->lock(this->parent->mutex); - iterator = this->parent->children->create_iterator(this->parent->children, TRUE); - while (iterator->iterate(iterator, (void**)&child)) + if (this->parent->cancelled && !this->cancelled) { - if (child == this) - { - iterator->remove(iterator); - break; - } + /* if the parent has been cancelled but we have not yet, we do not + * unregister until we got cancelled by the parent. */ + this->parent->mutex->unlock(this->parent->mutex); + this->destroyable->wait(this->destroyable, this->mutex); + this->parent->mutex->lock(this->parent->mutex); } - iterator->destroy(iterator); + this->parent->children->remove(this->parent->children, this, NULL); this->parent->mutex->unlock(this->parent->mutex); + this->parent = NULL; + } +} + +/** + * Implements job_t.destroy. + */ +static void destroy(private_callback_job_t *this) +{ + this->mutex->lock(this->mutex); + unregister(this); + if (this->cleanup) + { + this->cleanup(this->data); } + if (this->terminated) + { + sem_post(this->terminated); + } + this->children->destroy(this->children); + this->destroyable->destroy(this->destroyable); + this->mutex->unlock(this->mutex); + this->mutex->destroy(this->mutex); + free(this); } /** @@ -111,23 +138,43 @@ static void unregister(private_callback_job_t *this) */ static void cancel(private_callback_job_t *this) { - pthread_t thread; + callback_job_t *child; + sem_t *terminated = NULL; this->mutex->lock(this->mutex); - thread = this->thread; - - /* terminate its children */ - this->children->invoke_offset(this->children, offsetof(callback_job_t, cancel)); + this->cancelled = TRUE; + /* terminate children */ + while (this->children->get_first(this->children, (void**)&child) == SUCCESS) + { + this->mutex->unlock(this->mutex); + child->cancel(child); + this->mutex->lock(this->mutex); + } + if (this->thread) + { + /* terminate the thread, if there is currently one executing the job. + * we wait for its termination using a semaphore */ + this->thread->cancel(this->thread); + terminated = this->terminated = malloc_thing(sem_t); + sem_init(terminated, 0, 0); + } + else + { + /* if the job is currently queued, it gets terminated later. + * we can't wait, because it might not get executed at all. + * we also unregister the queued job manually from its parent (the + * others get unregistered during destruction) */ + unregister(this); + } + this->destroyable->signal(this->destroyable); this->mutex->unlock(this->mutex); - /* terminate thread */ - if (thread) + if (terminated) { - pthread_cancel(thread); - pthread_join(thread, NULL); + sem_wait(terminated); + sem_destroy(terminated); + free(terminated); } - /* avoid later execution of a cancelled job */ - this->callback = NULL; } /** @@ -137,18 +184,22 @@ static void execute(private_callback_job_t *this) { bool cleanup = FALSE; + thread_cleanup_push((thread_cleanup_t)destroy, this); + this->mutex->lock(this->mutex); - this->thread = pthread_self(); + this->thread = thread_current(); this->mutex->unlock(this->mutex); - pthread_cleanup_push((void*)destroy, this); while (TRUE) { - if (this->callback == NULL) + this->mutex->lock(this->mutex); + if (this->cancelled) { + this->mutex->unlock(this->mutex); cleanup = TRUE; break; } + this->mutex->unlock(this->mutex); switch (this->callback(this->data)) { case JOB_REQUEUE_DIRECT: @@ -168,9 +219,13 @@ static void execute(private_callback_job_t *this) } break; } - this->thread = 0; - unregister(this); - pthread_cleanup_pop(cleanup); + this->mutex->lock(this->mutex); + this->thread = NULL; + this->mutex->unlock(this->mutex); + /* manually create a cancellation point to avoid that a cancelled thread + * goes back into the thread pool */ + thread_cancellation_point(); + thread_cleanup_pop(cleanup); } /* @@ -195,6 +250,9 @@ callback_job_t *callback_job_create(callback_job_cb_t cb, void *data, this->thread = 0; this->children = linked_list_create(); this->parent = (private_callback_job_t*)parent; + this->cancelled = FALSE; + this->destroyable = condvar_create(CONDVAR_TYPE_DEFAULT); + this->terminated = NULL; /* register us at parent */ if (parent) diff --git a/src/charon/processing/jobs/callback_job.h b/src/charon/processing/jobs/callback_job.h index 5435bc09c..62da1edd1 100644 --- a/src/charon/processing/jobs/callback_job.h +++ b/src/charon/processing/jobs/callback_job.h @@ -90,8 +90,9 @@ struct callback_job_t { job_t job_interface; /** - * Cancel the jobs thread and wait for its termination. - */ + * Cancel the job's thread and wait for its termination. This only works + * reliably for jobs that always use JOB_REQUEUE_FAIR or JOB_REQUEUE_DIRECT, + * otherwise the job may already be destroyed when cancel is called. */ void (*cancel)(callback_job_t *this); }; |