diff options
Diffstat (limited to 'src/libstrongswan/processing/processor.c')
-rw-r--r-- | src/libstrongswan/processing/processor.c | 73 |
1 files changed, 57 insertions, 16 deletions
diff --git a/src/libstrongswan/processing/processor.c b/src/libstrongswan/processing/processor.c index 46c49a993..1df8d0a18 100644 --- a/src/libstrongswan/processing/processor.c +++ b/src/libstrongswan/processing/processor.c @@ -51,9 +51,9 @@ struct private_processor_t { u_int desired_threads; /** - * Number of threads waiting for work + * Number of threads currently working, for each priority */ - u_int idle_threads; + u_int working_threads[JOB_PRIO_MAX]; /** * All threads managed in the pool (including threads that have been @@ -114,6 +114,29 @@ static void restart(private_processor_t *this) } /** + * Decrement working thread count of a priority class + */ +static void decrement_working_threads(u_int *working_threads) +{ + (*working_threads)--; +} + +/** + * Get number of idle threads, non-locking variant + */ +static u_int get_idle_threads_nolock(private_processor_t *this) +{ + u_int count, i; + + count = this->total_threads; + for (i = 0; i < JOB_PRIO_MAX; i++) + { + count -= this->working_threads[i]; + } + return count; +} + +/** * Process queued jobs, called by the worker threads */ static void process_jobs(private_processor_t *this) @@ -127,37 +150,43 @@ static void process_jobs(private_processor_t *this) while (this->desired_threads >= this->total_threads) { job_t *job = NULL; - int i, prio_sum = 0; + int i, reserved = 0, idle; + + idle = get_idle_threads_nolock(this); for (i = 0; i < JOB_PRIO_MAX; i++) { - if (prio_sum && prio_sum >= this->idle_threads) + if (reserved && reserved >= idle) { DBG2(DBG_JOB, "delaying %N priority jobs: %d threads idle, " "but %d reserved for higher priorities", - job_priority_names, i, this->idle_threads, prio_sum); + job_priority_names, i, idle, reserved); break; } - prio_sum += this->prio_threads[i]; + if (this->working_threads[i] < this->prio_threads[i]) + { + reserved += this->prio_threads[i] - this->working_threads[i]; + } if (this->jobs[i]->remove_first(this->jobs[i], (void**)&job) == SUCCESS) { + this->working_threads[i]++; + this->mutex->unlock(this->mutex); + thread_cleanup_push((thread_cleanup_t)decrement_working_threads, + &this->working_threads[i]); + /* terminated threads are restarted to get a constant pool */ + thread_cleanup_push((thread_cleanup_t)restart, this); + job->execute(job); + thread_cleanup_pop(FALSE); + this->mutex->lock(this->mutex); + thread_cleanup_pop(TRUE); break; } } if (!job) { - this->idle_threads++; this->job_added->wait(this->job_added, this->mutex); - this->idle_threads--; - continue; } - this->mutex->unlock(this->mutex); - /* terminated threads are restarted, so we have a constant pool */ - thread_cleanup_push((thread_cleanup_t)restart, this); - job->execute(job); - thread_cleanup_pop(FALSE); - this->mutex->lock(this->mutex); } this->total_threads--; this->thread_terminated->signal(this->thread_terminated); @@ -181,7 +210,7 @@ METHOD(processor_t, get_idle_threads, u_int, u_int count; this->mutex->lock(this->mutex); - count = this->idle_threads; + count = get_idle_threads_nolock(this); this->mutex->unlock(this->mutex); return count; } @@ -198,6 +227,17 @@ static job_priority_t sane_prio(job_priority_t prio) return prio; } +METHOD(processor_t, get_working_threads, u_int, + private_processor_t *this, job_priority_t prio) +{ + u_int count; + + this->mutex->lock(this->mutex); + count = this->working_threads[sane_prio(prio)]; + this->mutex->unlock(this->mutex); + return count; +} + METHOD(processor_t, get_job_load, u_int, private_processor_t *this, job_priority_t prio) { @@ -293,6 +333,7 @@ processor_t *processor_create() .public = { .get_total_threads = _get_total_threads, .get_idle_threads = _get_idle_threads, + .get_working_threads = _get_working_threads, .get_job_load = _get_job_load, .queue_job = _queue_job, .set_threads = _set_threads, |