aboutsummaryrefslogtreecommitdiffstats
path: root/src/libstrongswan/processing/processor.c
diff options
context:
space:
mode:
authorMartin Willi <martin@revosec.ch>2011-05-04 15:32:31 +0200
committerMartin Willi <martin@revosec.ch>2011-05-16 15:24:14 +0200
commit877fdcf0b89a9eb242f5eb8a0615e449154e6765 (patch)
treefafaf0d96bcb83ea5f64c24f047b7a2178d2a38a /src/libstrongswan/processing/processor.c
parenta694b481eed8ffbbbfde899dfdee54a394ec49d8 (diff)
downloadstrongswan-877fdcf0b89a9eb242f5eb8a0615e449154e6765.tar.bz2
strongswan-877fdcf0b89a9eb242f5eb8a0615e449154e6765.tar.xz
Count number of threads active in each class, and reserve threads only if none active
Diffstat (limited to 'src/libstrongswan/processing/processor.c')
-rw-r--r--src/libstrongswan/processing/processor.c73
1 files changed, 57 insertions, 16 deletions
diff --git a/src/libstrongswan/processing/processor.c b/src/libstrongswan/processing/processor.c
index 46c49a993..1df8d0a18 100644
--- a/src/libstrongswan/processing/processor.c
+++ b/src/libstrongswan/processing/processor.c
@@ -51,9 +51,9 @@ struct private_processor_t {
u_int desired_threads;
/**
- * Number of threads waiting for work
+ * Number of threads currently working, for each priority
*/
- u_int idle_threads;
+ u_int working_threads[JOB_PRIO_MAX];
/**
* All threads managed in the pool (including threads that have been
@@ -114,6 +114,29 @@ static void restart(private_processor_t *this)
}
/**
+ * Decrement working thread count of a priority class
+ */
+static void decrement_working_threads(u_int *working_threads)
+{
+ (*working_threads)--;
+}
+
+/**
+ * Get number of idle threads, non-locking variant
+ */
+static u_int get_idle_threads_nolock(private_processor_t *this)
+{
+ u_int count, i;
+
+ count = this->total_threads;
+ for (i = 0; i < JOB_PRIO_MAX; i++)
+ {
+ count -= this->working_threads[i];
+ }
+ return count;
+}
+
+/**
* Process queued jobs, called by the worker threads
*/
static void process_jobs(private_processor_t *this)
@@ -127,37 +150,43 @@ static void process_jobs(private_processor_t *this)
while (this->desired_threads >= this->total_threads)
{
job_t *job = NULL;
- int i, prio_sum = 0;
+ int i, reserved = 0, idle;
+
+ idle = get_idle_threads_nolock(this);
for (i = 0; i < JOB_PRIO_MAX; i++)
{
- if (prio_sum && prio_sum >= this->idle_threads)
+ if (reserved && reserved >= idle)
{
DBG2(DBG_JOB, "delaying %N priority jobs: %d threads idle, "
"but %d reserved for higher priorities",
- job_priority_names, i, this->idle_threads, prio_sum);
+ job_priority_names, i, idle, reserved);
break;
}
- prio_sum += this->prio_threads[i];
+ if (this->working_threads[i] < this->prio_threads[i])
+ {
+ reserved += this->prio_threads[i] - this->working_threads[i];
+ }
if (this->jobs[i]->remove_first(this->jobs[i],
(void**)&job) == SUCCESS)
{
+ this->working_threads[i]++;
+ this->mutex->unlock(this->mutex);
+ thread_cleanup_push((thread_cleanup_t)decrement_working_threads,
+ &this->working_threads[i]);
+ /* terminated threads are restarted to get a constant pool */
+ thread_cleanup_push((thread_cleanup_t)restart, this);
+ job->execute(job);
+ thread_cleanup_pop(FALSE);
+ this->mutex->lock(this->mutex);
+ thread_cleanup_pop(TRUE);
break;
}
}
if (!job)
{
- this->idle_threads++;
this->job_added->wait(this->job_added, this->mutex);
- this->idle_threads--;
- continue;
}
- this->mutex->unlock(this->mutex);
- /* terminated threads are restarted, so we have a constant pool */
- thread_cleanup_push((thread_cleanup_t)restart, this);
- job->execute(job);
- thread_cleanup_pop(FALSE);
- this->mutex->lock(this->mutex);
}
this->total_threads--;
this->thread_terminated->signal(this->thread_terminated);
@@ -181,7 +210,7 @@ METHOD(processor_t, get_idle_threads, u_int,
u_int count;
this->mutex->lock(this->mutex);
- count = this->idle_threads;
+ count = get_idle_threads_nolock(this);
this->mutex->unlock(this->mutex);
return count;
}
@@ -198,6 +227,17 @@ static job_priority_t sane_prio(job_priority_t prio)
return prio;
}
+METHOD(processor_t, get_working_threads, u_int,
+ private_processor_t *this, job_priority_t prio)
+{
+ u_int count;
+
+ this->mutex->lock(this->mutex);
+ count = this->working_threads[sane_prio(prio)];
+ this->mutex->unlock(this->mutex);
+ return count;
+}
+
METHOD(processor_t, get_job_load, u_int,
private_processor_t *this, job_priority_t prio)
{
@@ -293,6 +333,7 @@ processor_t *processor_create()
.public = {
.get_total_threads = _get_total_threads,
.get_idle_threads = _get_idle_threads,
+ .get_working_threads = _get_working_threads,
.get_job_load = _get_job_load,
.queue_job = _queue_job,
.set_threads = _set_threads,