aboutsummaryrefslogtreecommitdiffstats
path: root/src/libstrongswan/processing/processor.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstrongswan/processing/processor.c')
-rw-r--r--src/libstrongswan/processing/processor.c162
1 files changed, 112 insertions, 50 deletions
diff --git a/src/libstrongswan/processing/processor.c b/src/libstrongswan/processing/processor.c
index 222f1a535..69838aa13 100644
--- a/src/libstrongswan/processing/processor.c
+++ b/src/libstrongswan/processing/processor.c
@@ -1,7 +1,7 @@
/*
* Copyright (C) 2005-2011 Martin Willi
* Copyright (C) 2011 revosec AG
- * Copyright (C) 2008-2011 Tobias Brunner
+ * Copyright (C) 2008-2012 Tobias Brunner
* Copyright (C) 2005 Jan Hutter
* Hochschule fuer Technik Rapperswil
*
@@ -58,7 +58,7 @@ struct private_processor_t {
/**
* All threads managed in the pool (including threads that have been
- * cancelled, this allows to join them during destruction)
+ * canceled, this allows to join them later), as worker_thread_t
*/
linked_list_t *threads;
@@ -73,11 +73,6 @@ struct private_processor_t {
int prio_threads[JOB_PRIO_MAX];
/**
- * Priority of the job executed by a thread
- */
- thread_value_t *priority;
-
- /**
* access to job lists is locked through this mutex
*/
mutex_t *mutex;
@@ -93,39 +88,72 @@ struct private_processor_t {
condvar_t *thread_terminated;
};
-static void process_jobs(private_processor_t *this);
/**
- * restart a terminated thread
+ * Worker thread
*/
-static void restart(private_processor_t *this)
-{
+typedef struct {
+
+ /**
+ * Reference to the processor
+ */
+ private_processor_t *processor;
+
+ /**
+ * The actual thread
+ */
thread_t *thread;
- DBG2(DBG_JOB, "terminated worker thread %.2u", thread_current_id());
+ /**
+ * Job currently being executed by this worker thread
+ */
+ job_t *job;
- /* respawn thread if required */
- this->mutex->lock(this->mutex);
- if (this->desired_threads < this->total_threads ||
- (thread = thread_create((thread_main_t)process_jobs, this)) == NULL)
- {
- this->total_threads--;
- this->thread_terminated->signal(this->thread_terminated);
- }
- else
- {
- this->threads->insert_last(this->threads, thread);
- }
- this->mutex->unlock(this->mutex);
-}
+ /**
+ * Priority of the current job
+ */
+ job_priority_t priority;
+
+} worker_thread_t;
+
+static void process_jobs(worker_thread_t *worker);
/**
- * Decrement working thread count of a priority class
+ * restart a terminated thread
*/
-static void decrement_working_threads(private_processor_t *this)
+static void restart(worker_thread_t *worker)
{
+ private_processor_t *this = worker->processor;
+
+ DBG2(DBG_JOB, "terminated worker thread %.2u", thread_current_id());
+
this->mutex->lock(this->mutex);
- this->working_threads[(intptr_t)this->priority->get(this->priority)]--;
+ /* cleanup worker thread */
+ this->working_threads[worker->priority]--;
+ worker->job->status = JOB_STATUS_CANCELED;
+ worker->job->destroy(worker->job);
+ worker->job = NULL;
+
+ /* respawn thread if required */
+ if (this->desired_threads >= this->total_threads)
+ {
+ worker_thread_t *new_worker;
+
+ INIT(new_worker,
+ .processor = this,
+ );
+ new_worker->thread = thread_create((thread_main_t)process_jobs,
+ new_worker);
+ if (new_worker->thread)
+ {
+ this->threads->insert_last(this->threads, new_worker);
+ this->mutex->unlock(this->mutex);
+ return;
+ }
+ free(new_worker);
+ }
+ this->total_threads--;
+ this->thread_terminated->signal(this->thread_terminated);
this->mutex->unlock(this->mutex);
}
@@ -147,9 +175,11 @@ static u_int get_idle_threads_nolock(private_processor_t *this)
/**
* Process queued jobs, called by the worker threads
*/
-static void process_jobs(private_processor_t *this)
+static void process_jobs(worker_thread_t *worker)
{
- /* worker threads are not cancellable by default */
+ private_processor_t *this = worker->processor;
+
+ /* worker threads are not cancelable by default */
thread_cancelability(FALSE);
DBG2(DBG_JOB, "started worker thread %.2u", thread_current_id());
@@ -157,7 +187,6 @@ static void process_jobs(private_processor_t *this)
this->mutex->lock(this->mutex);
while (this->desired_threads >= this->total_threads)
{
- job_t *job = NULL;
int i, reserved = 0, idle;
idle = get_idle_threads_nolock(this);
@@ -176,27 +205,52 @@ static void process_jobs(private_processor_t *this)
reserved += this->prio_threads[i] - this->working_threads[i];
}
if (this->jobs[i]->remove_first(this->jobs[i],
- (void**)&job) == SUCCESS)
+ (void**)&worker->job) == SUCCESS)
{
+ job_requeue_t requeue;
+
this->working_threads[i]++;
+ worker->job->status = JOB_STATUS_EXECUTING;
+ worker->priority = i;
this->mutex->unlock(this->mutex);
- this->priority->set(this->priority, (void*)(intptr_t)i);
- /* terminated threads are restarted to get a constant pool */
- thread_cleanup_push((thread_cleanup_t)restart, this);
- thread_cleanup_push((thread_cleanup_t)decrement_working_threads,
- this);
- job->execute(job);
- thread_cleanup_pop(FALSE);
+ /* canceled threads are restarted to get a constant pool */
+ thread_cleanup_push((thread_cleanup_t)restart, worker);
+ while (TRUE)
+ {
+ requeue = worker->job->execute(worker->job);
+ if (requeue != JOB_REQUEUE_DIRECT)
+ {
+ break;
+ }
+ }
thread_cleanup_pop(FALSE);
this->mutex->lock(this->mutex);
this->working_threads[i]--;
+ switch (requeue)
+ {
+ case JOB_REQUEUE_NONE:
+ worker->job->status = JOB_STATUS_DONE;
+ worker->job->destroy(worker->job);
+ break;
+ case JOB_REQUEUE_FAIR:
+ worker->job->status = JOB_STATUS_QUEUED;
+ this->jobs[i]->insert_last(this->jobs[i], worker->job);
+ this->job_added->signal(this->job_added);
+ break;
+ case JOB_REQUEUE_SCHEDULED:
+ worker->job->status = JOB_STATUS_QUEUED;
+ /* fall-through */
+ default:
+ break;
+ }
break;
}
}
- if (!job)
+ if (!worker->job)
{
this->job_added->wait(this->job_added, this->mutex);
}
+ worker->job = NULL;
}
this->total_threads--;
this->thread_terminated->signal(this->thread_terminated);
@@ -266,6 +320,8 @@ METHOD(processor_t, queue_job, void,
job_priority_t prio;
prio = sane_prio(job->get_priority(job));
+ job->status = JOB_STATUS_QUEUED;
+
this->mutex->lock(this->mutex);
this->jobs[prio]->insert_last(this->jobs[prio], job);
this->job_added->signal(this->job_added);
@@ -278,19 +334,26 @@ METHOD(processor_t, set_threads, void,
this->mutex->lock(this->mutex);
if (count > this->total_threads)
{ /* increase thread count */
+ worker_thread_t *worker;
int i;
- thread_t *current;
this->desired_threads = count;
DBG1(DBG_JOB, "spawning %d worker threads", count - this->total_threads);
for (i = this->total_threads; i < count; i++)
{
- current = thread_create((thread_main_t)process_jobs, this);
- if (current)
+ INIT(worker,
+ .processor = this,
+ );
+ worker->thread = thread_create((thread_main_t)process_jobs, worker);
+ if (worker->thread)
{
- this->threads->insert_last(this->threads, current);
+ this->threads->insert_last(this->threads, worker);
this->total_threads++;
}
+ else
+ {
+ free(worker);
+ }
}
}
else if (count < this->total_threads)
@@ -304,7 +367,7 @@ METHOD(processor_t, set_threads, void,
METHOD(processor_t, destroy, void,
private_processor_t *this)
{
- thread_t *current;
+ worker_thread_t *worker;
int i;
set_threads(this, 0);
@@ -315,12 +378,12 @@ METHOD(processor_t, destroy, void,
this->thread_terminated->wait(this->thread_terminated, this->mutex);
}
while (this->threads->remove_first(this->threads,
- (void**)&current) == SUCCESS)
+ (void**)&worker) == SUCCESS)
{
- current->join(current);
+ worker->thread->join(worker->thread);
+ free(worker);
}
this->mutex->unlock(this->mutex);
- this->priority->destroy(this->priority);
this->thread_terminated->destroy(this->thread_terminated);
this->job_added->destroy(this->job_added);
this->mutex->destroy(this->mutex);
@@ -351,7 +414,6 @@ processor_t *processor_create()
.destroy = _destroy,
},
.threads = linked_list_create(),
- .priority = thread_value_create(NULL),
.mutex = mutex_create(MUTEX_TYPE_DEFAULT),
.job_added = condvar_create(CONDVAR_TYPE_DEFAULT),
.thread_terminated = condvar_create(CONDVAR_TYPE_DEFAULT),