aboutsummaryrefslogtreecommitdiffstats
path: root/src/libstrongswan/processing/processor.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstrongswan/processing/processor.c')
-rw-r--r--src/libstrongswan/processing/processor.c73
1 files changed, 54 insertions, 19 deletions
diff --git a/src/libstrongswan/processing/processor.c b/src/libstrongswan/processing/processor.c
index 69838aa13..0f0c192d2 100644
--- a/src/libstrongswan/processing/processor.c
+++ b/src/libstrongswan/processing/processor.c
@@ -88,7 +88,6 @@ struct private_processor_t {
condvar_t *thread_terminated;
};
-
/**
* Worker thread
*/
@@ -222,26 +221,38 @@ static void process_jobs(worker_thread_t *worker)
{
break;
}
+ else if (!worker->job->cancel)
+ { /* only allow cancelable jobs to requeue directly */
+ requeue = JOB_REQUEUE_FAIR;
+ break;
+ }
}
thread_cleanup_pop(FALSE);
this->mutex->lock(this->mutex);
this->working_threads[i]--;
- switch (requeue)
+ if (worker->job->status == JOB_STATUS_CANCELED)
+ { /* job was canceled via a custom cancel() method or did not
+ * use JOB_REQUEUE_TYPE_DIRECT */
+ worker->job->destroy(worker->job);
+ }
+ else
{
- case JOB_REQUEUE_NONE:
- worker->job->status = JOB_STATUS_DONE;
- worker->job->destroy(worker->job);
- break;
- case JOB_REQUEUE_FAIR:
- worker->job->status = JOB_STATUS_QUEUED;
- this->jobs[i]->insert_last(this->jobs[i], worker->job);
- this->job_added->signal(this->job_added);
- break;
- case JOB_REQUEUE_SCHEDULED:
- worker->job->status = JOB_STATUS_QUEUED;
- /* fall-through */
- default:
- break;
+ switch (requeue)
+ {
+ case JOB_REQUEUE_NONE:
+ worker->job->status = JOB_STATUS_DONE;
+ worker->job->destroy(worker->job);
+ break;
+ case JOB_REQUEUE_FAIR:
+ worker->job->status = JOB_STATUS_QUEUED;
+ this->jobs[i]->insert_last(this->jobs[i],
+ worker->job);
+ this->job_added->signal(this->job_added);
+ break;
+ case JOB_REQUEUE_SCHEDULED:
+ default:
+ break;
+ }
}
break;
}
@@ -364,14 +375,29 @@ METHOD(processor_t, set_threads, void,
this->mutex->unlock(this->mutex);
}
-METHOD(processor_t, destroy, void,
+METHOD(processor_t, cancel, void,
private_processor_t *this)
{
+ enumerator_t *enumerator;
worker_thread_t *worker;
- int i;
- set_threads(this, 0);
this->mutex->lock(this->mutex);
+ this->desired_threads = 0;
+ /* cancel potentially blocking jobs */
+ enumerator = this->threads->create_enumerator(this->threads);
+ while (enumerator->enumerate(enumerator, (void**)&worker))
+ {
+ if (worker->job && worker->job->cancel)
+ {
+ worker->job->status = JOB_STATUS_CANCELED;
+ if (!worker->job->cancel(worker->job))
+ { /* job requests to be canceled explicitly, otherwise we assume
+ * the thread terminates itself and can be joined */
+ worker->thread->cancel(worker->thread);
+ }
+ }
+ }
+ enumerator->destroy(enumerator);
while (this->total_threads > 0)
{
this->job_added->broadcast(this->job_added);
@@ -384,6 +410,14 @@ METHOD(processor_t, destroy, void,
free(worker);
}
this->mutex->unlock(this->mutex);
+}
+
+METHOD(processor_t, destroy, void,
+ private_processor_t *this)
+{
+ int i;
+
+ cancel(this);
this->thread_terminated->destroy(this->thread_terminated);
this->job_added->destroy(this->job_added);
this->mutex->destroy(this->mutex);
@@ -411,6 +445,7 @@ processor_t *processor_create()
.get_job_load = _get_job_load,
.queue_job = _queue_job,
.set_threads = _set_threads,
+ .cancel = _cancel,
.destroy = _destroy,
},
.threads = linked_list_create(),