diff options
Diffstat (limited to 'src/libstrongswan/processing')
-rw-r--r-- | src/libstrongswan/processing/jobs/callback_job.c | 171 | ||||
-rw-r--r-- | src/libstrongswan/processing/jobs/callback_job.h | 39 | ||||
-rw-r--r-- | src/libstrongswan/processing/jobs/job.h | 22 | ||||
-rw-r--r-- | src/libstrongswan/processing/processor.c | 73 | ||||
-rw-r--r-- | src/libstrongswan/processing/processor.h | 12 | ||||
-rw-r--r-- | src/libstrongswan/processing/scheduler.c | 13 |
6 files changed, 124 insertions, 206 deletions
diff --git a/src/libstrongswan/processing/jobs/callback_job.c b/src/libstrongswan/processing/jobs/callback_job.c index 86d5228bf..a5ddc8ff6 100644 --- a/src/libstrongswan/processing/jobs/callback_job.c +++ b/src/libstrongswan/processing/jobs/callback_job.c @@ -51,42 +51,9 @@ struct private_callback_job_t { callback_job_cleanup_t cleanup; /** - * thread of the job, if running + * cancel function */ - thread_t *thread; - - /** - * mutex to access private job data - */ - mutex_t *mutex; - - /** - * list of associated child jobs - */ - linked_list_t *children; - - /** - * parent of this job, or NULL - */ - private_callback_job_t *parent; - - /** - * TRUE if the job got canceled - */ - bool canceled; - - /** - * condvar to synchronize the cancellation/destruction of the job - */ - condvar_t *destroyable; - - /** - * semaphore to synchronize the termination of the assigned thread. - * - * separately created during cancellation, so that we can wait on it - * without risking that it gets destroyed too early during destruction. - */ - semaphore_t *terminated; + callback_job_cancel_t cancel; /** * Priority of this job @@ -94,131 +61,26 @@ struct private_callback_job_t { job_priority_t prio; }; -/** - * unregister a child from its parent, if any. - * note: this->mutex has to be locked - */ -static void unregister(private_callback_job_t *this) -{ - if (this->parent) - { - this->parent->mutex->lock(this->parent->mutex); - if (this->parent->canceled && !this->canceled) - { - /* if the parent has been canceled but we have not yet, we do not - * unregister until we got canceled by the parent. */ - this->parent->mutex->unlock(this->parent->mutex); - this->destroyable->wait(this->destroyable, this->mutex); - this->parent->mutex->lock(this->parent->mutex); - } - this->parent->children->remove(this->parent->children, this, NULL); - this->parent->mutex->unlock(this->parent->mutex); - this->parent = NULL; - } -} - METHOD(job_t, destroy, void, private_callback_job_t *this) { - this->mutex->lock(this->mutex); - unregister(this); if (this->cleanup) { this->cleanup(this->data); } - if (this->terminated) - { - this->terminated->post(this->terminated); - } - this->children->destroy(this->children); - this->destroyable->destroy(this->destroyable); - this->mutex->unlock(this->mutex); - this->mutex->destroy(this->mutex); free(this); } -METHOD(callback_job_t, cancel, void, +METHOD(job_t, execute, job_requeue_t, private_callback_job_t *this) { - callback_job_t *child; - semaphore_t *terminated = NULL; - - this->mutex->lock(this->mutex); - this->canceled = TRUE; - /* terminate children */ - while (this->children->get_first(this->children, (void**)&child) == SUCCESS) - { - this->mutex->unlock(this->mutex); - child->cancel(child); - this->mutex->lock(this->mutex); - } - if (this->thread) - { - /* terminate the thread, if there is currently one executing the job. - * we wait for its termination using a semaphore */ - this->thread->cancel(this->thread); - terminated = this->terminated = semaphore_create(0); - } - else - { - /* if the job is currently queued, it gets terminated later. - * we can't wait, because it might not get executed at all. - * we also unregister the queued job manually from its parent (the - * others get unregistered during destruction) */ - unregister(this); - } - this->destroyable->signal(this->destroyable); - this->mutex->unlock(this->mutex); - - if (terminated) - { - terminated->wait(terminated); - terminated->destroy(terminated); - } + return this->callback(this->data); } -METHOD(job_t, execute, job_requeue_t, +METHOD(job_t, cancel, bool, private_callback_job_t *this) { - bool requeue = FALSE; - - this->mutex->lock(this->mutex); - this->thread = thread_current(); - this->mutex->unlock(this->mutex); - - while (TRUE) - { - this->mutex->lock(this->mutex); - if (this->canceled) - { - this->mutex->unlock(this->mutex); - break; - } - this->mutex->unlock(this->mutex); - switch (this->callback(this->data)) - { - case JOB_REQUEUE_DIRECT: - continue; - case JOB_REQUEUE_FAIR: - { - requeue = TRUE; - break; - } - case JOB_REQUEUE_NONE: - default: - { - break; - } - } - break; - } - this->mutex->lock(this->mutex); - this->thread = NULL; - this->mutex->unlock(this->mutex); - /* manually create a cancellation point to avoid that a canceled thread - * goes back into the thread pool at all */ - thread_cancellation_point(); - return requeue ? JOB_REQUEUE_FAIR : JOB_REQUEUE_NONE; + return this->cancel(this->data); } METHOD(job_t, get_priority, job_priority_t, @@ -231,8 +93,8 @@ METHOD(job_t, get_priority, job_priority_t, * Described in header. */ callback_job_t *callback_job_create_with_prio(callback_job_cb_t cb, void *data, - callback_job_cleanup_t cleanup, callback_job_t *parent, - job_priority_t prio) + callback_job_cleanup_t cleanup, callback_job_cancel_t cancel, + job_priority_t prio) { private_callback_job_t *this; @@ -243,24 +105,17 @@ callback_job_t *callback_job_create_with_prio(callback_job_cb_t cb, void *data, .get_priority = _get_priority, .destroy = _destroy, }, - .cancel = _cancel, }, - .mutex = mutex_create(MUTEX_TYPE_DEFAULT), .callback = cb, .data = data, .cleanup = cleanup, - .children = linked_list_create(), - .parent = (private_callback_job_t*)parent, - .destroyable = condvar_create(CONDVAR_TYPE_DEFAULT), + .cancel = cancel, .prio = prio, ); - /* register us at parent */ - if (parent) + if (cancel) { - this->parent->mutex->lock(this->parent->mutex); - this->parent->children->insert_last(this->parent->children, this); - this->parent->mutex->unlock(this->parent->mutex); + this->public.job.cancel = _cancel; } return &this->public; @@ -271,8 +126,8 @@ callback_job_t *callback_job_create_with_prio(callback_job_cb_t cb, void *data, */ callback_job_t *callback_job_create(callback_job_cb_t cb, void *data, callback_job_cleanup_t cleanup, - callback_job_t *parent) + callback_job_cancel_t cancel) { - return callback_job_create_with_prio(cb, data, cleanup, parent, + return callback_job_create_with_prio(cb, data, cleanup, cancel, JOB_PRIO_MEDIUM); } diff --git a/src/libstrongswan/processing/jobs/callback_job.h b/src/libstrongswan/processing/jobs/callback_job.h index ebe5c9c2b..6f2e39eb8 100644 --- a/src/libstrongswan/processing/jobs/callback_job.h +++ b/src/libstrongswan/processing/jobs/callback_job.h @@ -1,4 +1,5 @@ /* + * Copyright (C) 2012 Tobias Brunner * Copyright (C) 2007-2011 Martin Willi * Copyright (C) 2011 revosec AG * Hochschule fuer Technik Rapperswil @@ -46,11 +47,22 @@ typedef job_requeue_t (*callback_job_cb_t)(void *data); * to supply to the constructor. * * @param data param supplied to job - * @return requeing policy how to requeue the job */ typedef void (*callback_job_cleanup_t)(void *data); /** + * Cancellation function to use for the callback job. + * + * Optional function to be called when a job has to be canceled. + * + * See job_t.cancel() for details on the return value. + * + * @param data param supplied to job + * @return TRUE if canceled, FALSE to explicitly cancel the thread + */ +typedef bool (*callback_job_cancel_t)(void *data); + +/** * Class representing an callback Job. * * This is a special job which allows a simple callback function to @@ -64,14 +76,6 @@ struct callback_job_t { */ job_t job; - /** - * Cancel the job's thread and wait for its termination. - * - * This only works reliably for jobs that always use JOB_REQUEUE_FAIR or - * JOB_REQUEUE_DIRECT, otherwise the job may already be destroyed when - * cancel is called. - */ - void (*cancel)(callback_job_t *this); }; /** @@ -79,19 +83,20 @@ struct callback_job_t { * * The cleanup function is called when the job gets destroyed to destroy * the associated data. - * If parent is not NULL, the specified job gets an association. Whenever - * the parent gets cancelled (or runs out), all of its children are cancelled, - * too. + * + * The cancel function is optional and should only be provided if the callback + * function calls potentially blocking functions and/or always returns + * JOB_REQUEUE_DIRECT. * * @param cb callback to call from the processor * @param data user data to supply to callback * @param cleanup destructor for data on destruction, or NULL - * @param parent parent of this job + * @param cancel function to cancel the job, or NULL * @return callback_job_t object */ callback_job_t *callback_job_create(callback_job_cb_t cb, void *data, callback_job_cleanup_t cleanup, - callback_job_t *parent); + callback_job_cancel_t cancel); /** * Creates a callback job, with priority. @@ -101,12 +106,12 @@ callback_job_t *callback_job_create(callback_job_cb_t cb, void *data, * @param cb callback to call from the processor * @param data user data to supply to callback * @param cleanup destructor for data on destruction, or NULL - * @param parent parent of this job + * @param cancel function to cancel the job, or NULL * @param prio job priority * @return callback_job_t object */ callback_job_t *callback_job_create_with_prio(callback_job_cb_t cb, void *data, - callback_job_cleanup_t cleanup, callback_job_t *parent, - job_priority_t prio); + callback_job_cleanup_t cleanup, callback_job_cancel_t cancel, + job_priority_t prio); #endif /** CALLBACK_JOB_H_ @}*/ diff --git a/src/libstrongswan/processing/jobs/job.h b/src/libstrongswan/processing/jobs/job.h index c3e640065..43bb5430e 100644 --- a/src/libstrongswan/processing/jobs/job.h +++ b/src/libstrongswan/processing/jobs/job.h @@ -103,6 +103,26 @@ struct job_t { job_requeue_t (*execute) (job_t *this); /** + * Cancel a job. + * + * Implementing this method is optional. It allows potentially blocking + * jobs to be canceled during shutdown. + * + * If no special action is to be taken simply return FALSE then the thread + * executing the job will be canceled. If TRUE is returned the job is + * expected to return from execute() itself (i.e. the thread won't be + * canceled explicitly and can still be joined later). + * Jobs that return FALSE have to make sure they provide the appropriate + * cancellation points. + * + * @note Regular jobs that do not block MUST NOT implement this method. + * @note This method could be called even before execute() has been called. + * + * @return FALSE to cancel the thread, TRUE if canceled otherwise + */ + bool (*cancel)(job_t *this); + + /** * Get the priority of a job. * * @return job priority @@ -117,7 +137,7 @@ struct job_t { * * Use the status of a job to decide what to do during destruction. */ - void (*destroy) (job_t *this); + void (*destroy)(job_t *this); }; #endif /** JOB_H_ @}*/ diff --git a/src/libstrongswan/processing/processor.c b/src/libstrongswan/processing/processor.c index 69838aa13..0f0c192d2 100644 --- a/src/libstrongswan/processing/processor.c +++ b/src/libstrongswan/processing/processor.c @@ -88,7 +88,6 @@ struct private_processor_t { condvar_t *thread_terminated; }; - /** * Worker thread */ @@ -222,26 +221,38 @@ static void process_jobs(worker_thread_t *worker) { break; } + else if (!worker->job->cancel) + { /* only allow cancelable jobs to requeue directly */ + requeue = JOB_REQUEUE_FAIR; + break; + } } thread_cleanup_pop(FALSE); this->mutex->lock(this->mutex); this->working_threads[i]--; - switch (requeue) + if (worker->job->status == JOB_STATUS_CANCELED) + { /* job was canceled via a custom cancel() method or did not + * use JOB_REQUEUE_TYPE_DIRECT */ + worker->job->destroy(worker->job); + } + else { - case JOB_REQUEUE_NONE: - worker->job->status = JOB_STATUS_DONE; - worker->job->destroy(worker->job); - break; - case JOB_REQUEUE_FAIR: - worker->job->status = JOB_STATUS_QUEUED; - this->jobs[i]->insert_last(this->jobs[i], worker->job); - this->job_added->signal(this->job_added); - break; - case JOB_REQUEUE_SCHEDULED: - worker->job->status = JOB_STATUS_QUEUED; - /* fall-through */ - default: - break; + switch (requeue) + { + case JOB_REQUEUE_NONE: + worker->job->status = JOB_STATUS_DONE; + worker->job->destroy(worker->job); + break; + case JOB_REQUEUE_FAIR: + worker->job->status = JOB_STATUS_QUEUED; + this->jobs[i]->insert_last(this->jobs[i], + worker->job); + this->job_added->signal(this->job_added); + break; + case JOB_REQUEUE_SCHEDULED: + default: + break; + } } break; } @@ -364,14 +375,29 @@ METHOD(processor_t, set_threads, void, this->mutex->unlock(this->mutex); } -METHOD(processor_t, destroy, void, +METHOD(processor_t, cancel, void, private_processor_t *this) { + enumerator_t *enumerator; worker_thread_t *worker; - int i; - set_threads(this, 0); this->mutex->lock(this->mutex); + this->desired_threads = 0; + /* cancel potentially blocking jobs */ + enumerator = this->threads->create_enumerator(this->threads); + while (enumerator->enumerate(enumerator, (void**)&worker)) + { + if (worker->job && worker->job->cancel) + { + worker->job->status = JOB_STATUS_CANCELED; + if (!worker->job->cancel(worker->job)) + { /* job requests to be canceled explicitly, otherwise we assume + * the thread terminates itself and can be joined */ + worker->thread->cancel(worker->thread); + } + } + } + enumerator->destroy(enumerator); while (this->total_threads > 0) { this->job_added->broadcast(this->job_added); @@ -384,6 +410,14 @@ METHOD(processor_t, destroy, void, free(worker); } this->mutex->unlock(this->mutex); +} + +METHOD(processor_t, destroy, void, + private_processor_t *this) +{ + int i; + + cancel(this); this->thread_terminated->destroy(this->thread_terminated); this->job_added->destroy(this->job_added); this->mutex->destroy(this->mutex); @@ -411,6 +445,7 @@ processor_t *processor_create() .get_job_load = _get_job_load, .queue_job = _queue_job, .set_threads = _set_threads, + .cancel = _cancel, .destroy = _destroy, }, .threads = linked_list_create(), diff --git a/src/libstrongswan/processing/processor.h b/src/libstrongswan/processing/processor.h index 05e88a2cf..94860f5d3 100644 --- a/src/libstrongswan/processing/processor.h +++ b/src/libstrongswan/processing/processor.h @@ -1,4 +1,5 @@ /* + * Copyright (C) 2012 Tobias Brunner * Copyright (C) 2005-2007 Martin Willi * Copyright (C) 2005 Jan Hutter * Hochschule fuer Technik Rapperswil @@ -78,14 +79,21 @@ struct processor_t { * * If the number of threads is smaller than number of currently running * threads, thread count is decreased. Use 0 to disable the processor. - * This call blocks if it decreases thread count until threads have - * terminated, so make sure there are not too many blocking jobs. + * + * This call does not block and wait for threads to terminate if the number + * of threads is reduced. Instead use cancel() for that during shutdown. * * @param count number of threads to allocate */ void (*set_threads)(processor_t *this, u_int count); /** + * Sets the number of threads to 0 and cancels all blocking jobs, then waits + * for all threads to be terminated. + */ + void (*cancel)(processor_t *this); + + /** * Destroy a processor object. */ void (*destroy) (processor_t *processor); diff --git a/src/libstrongswan/processing/scheduler.c b/src/libstrongswan/processing/scheduler.c index 979a7139f..c97dbc4be 100644 --- a/src/libstrongswan/processing/scheduler.c +++ b/src/libstrongswan/processing/scheduler.c @@ -68,11 +68,6 @@ struct private_scheduler_t { scheduler_t public; /** - * Job which queues scheduled jobs to the processor. - */ - callback_job_t *job; - - /** * The heap in which the events are stored. */ event_t **heap; @@ -309,7 +304,6 @@ METHOD(scheduler_t, destroy, void, private_scheduler_t *this) { event_t *event; - this->job->cancel(this->job); this->condvar->destroy(this->condvar); this->mutex->destroy(this->mutex); while ((event = remove_event(this)) != NULL) @@ -326,6 +320,7 @@ METHOD(scheduler_t, destroy, void, scheduler_t * scheduler_create() { private_scheduler_t *this; + callback_job_t *job; INIT(this, .public = { @@ -342,9 +337,9 @@ scheduler_t * scheduler_create() this->heap = (event_t**)calloc(this->heap_size + 1, sizeof(event_t*)); - this->job = callback_job_create_with_prio((callback_job_cb_t)schedule, - this, NULL, NULL, JOB_PRIO_CRITICAL); - lib->processor->queue_job(lib->processor, (job_t*)this->job); + job = callback_job_create_with_prio((callback_job_cb_t)schedule, this, + NULL, return_false, JOB_PRIO_CRITICAL); + lib->processor->queue_job(lib->processor, (job_t*)job); return &this->public; } |