diff options
Diffstat (limited to 'src/libstrongswan/processing/processor.c')
-rw-r--r-- | src/libstrongswan/processing/processor.c | 73 |
1 files changed, 54 insertions, 19 deletions
diff --git a/src/libstrongswan/processing/processor.c b/src/libstrongswan/processing/processor.c index 69838aa13..0f0c192d2 100644 --- a/src/libstrongswan/processing/processor.c +++ b/src/libstrongswan/processing/processor.c @@ -88,7 +88,6 @@ struct private_processor_t { condvar_t *thread_terminated; }; - /** * Worker thread */ @@ -222,26 +221,38 @@ static void process_jobs(worker_thread_t *worker) { break; } + else if (!worker->job->cancel) + { /* only allow cancelable jobs to requeue directly */ + requeue = JOB_REQUEUE_FAIR; + break; + } } thread_cleanup_pop(FALSE); this->mutex->lock(this->mutex); this->working_threads[i]--; - switch (requeue) + if (worker->job->status == JOB_STATUS_CANCELED) + { /* job was canceled via a custom cancel() method or did not + * use JOB_REQUEUE_TYPE_DIRECT */ + worker->job->destroy(worker->job); + } + else { - case JOB_REQUEUE_NONE: - worker->job->status = JOB_STATUS_DONE; - worker->job->destroy(worker->job); - break; - case JOB_REQUEUE_FAIR: - worker->job->status = JOB_STATUS_QUEUED; - this->jobs[i]->insert_last(this->jobs[i], worker->job); - this->job_added->signal(this->job_added); - break; - case JOB_REQUEUE_SCHEDULED: - worker->job->status = JOB_STATUS_QUEUED; - /* fall-through */ - default: - break; + switch (requeue) + { + case JOB_REQUEUE_NONE: + worker->job->status = JOB_STATUS_DONE; + worker->job->destroy(worker->job); + break; + case JOB_REQUEUE_FAIR: + worker->job->status = JOB_STATUS_QUEUED; + this->jobs[i]->insert_last(this->jobs[i], + worker->job); + this->job_added->signal(this->job_added); + break; + case JOB_REQUEUE_SCHEDULED: + default: + break; + } } break; } @@ -364,14 +375,29 @@ METHOD(processor_t, set_threads, void, this->mutex->unlock(this->mutex); } -METHOD(processor_t, destroy, void, +METHOD(processor_t, cancel, void, private_processor_t *this) { + enumerator_t *enumerator; worker_thread_t *worker; - int i; - set_threads(this, 0); this->mutex->lock(this->mutex); + this->desired_threads = 0; + /* cancel potentially blocking jobs */ + enumerator = this->threads->create_enumerator(this->threads); + while (enumerator->enumerate(enumerator, (void**)&worker)) + { + if (worker->job && worker->job->cancel) + { + worker->job->status = JOB_STATUS_CANCELED; + if (!worker->job->cancel(worker->job)) + { /* job requests to be canceled explicitly, otherwise we assume + * the thread terminates itself and can be joined */ + worker->thread->cancel(worker->thread); + } + } + } + enumerator->destroy(enumerator); while (this->total_threads > 0) { this->job_added->broadcast(this->job_added); @@ -384,6 +410,14 @@ METHOD(processor_t, destroy, void, free(worker); } this->mutex->unlock(this->mutex); +} + +METHOD(processor_t, destroy, void, + private_processor_t *this) +{ + int i; + + cancel(this); this->thread_terminated->destroy(this->thread_terminated); this->job_added->destroy(this->job_added); this->mutex->destroy(this->mutex); @@ -411,6 +445,7 @@ processor_t *processor_create() .get_job_load = _get_job_load, .queue_job = _queue_job, .set_threads = _set_threads, + .cancel = _cancel, .destroy = _destroy, }, .threads = linked_list_create(), |