From b1b0cce396a039bc7cbd6a5203cf51ac688fb089 Mon Sep 17 00:00:00 2001 From: Tobias Brunner Date: Fri, 28 Jun 2013 16:46:12 +0200 Subject: processor: Simplified the main loop --- src/libstrongswan/processing/processor.c | 236 +++++++++++++++++-------------- 1 file changed, 127 insertions(+), 109 deletions(-) (limited to 'src/libstrongswan/processing/processor.c') diff --git a/src/libstrongswan/processing/processor.c b/src/libstrongswan/processing/processor.c index d2540706e..605a7af75 100644 --- a/src/libstrongswan/processing/processor.c +++ b/src/libstrongswan/processing/processor.c @@ -1,7 +1,7 @@ /* * Copyright (C) 2005-2011 Martin Willi * Copyright (C) 2011 revosec AG - * Copyright (C) 2008-2012 Tobias Brunner + * Copyright (C) 2008-2013 Tobias Brunner * Copyright (C) 2005 Jan Hutter * Hochschule fuer Technik Rapperswil * @@ -180,132 +180,150 @@ static u_int get_idle_threads_nolock(private_processor_t *this) } /** - * Process queued jobs, called by the worker threads + * Get a job from any job queue, starting with the highest priority. + * + * this->mutex is expected to be locked. */ -static void process_jobs(worker_thread_t *worker) +static bool get_job(private_processor_t *this, worker_thread_t *worker) { - private_processor_t *this = worker->processor; + int i, reserved = 0, idle; - /* worker threads are not cancelable by default */ - thread_cancelability(FALSE); + idle = get_idle_threads_nolock(this); - DBG2(DBG_JOB, "started worker thread %.2u", thread_current_id()); + for (i = 0; i < JOB_PRIO_MAX; i++) + { + if (reserved && reserved >= idle) + { + DBG2(DBG_JOB, "delaying %N priority jobs: %d threads idle, " + "but %d reserved for higher priorities", + job_priority_names, i, idle, reserved); + /* wait until a job of higher priority gets queued */ + return FALSE; + } + if (this->working_threads[i] < this->prio_threads[i]) + { + reserved += this->prio_threads[i] - this->working_threads[i]; + } + if (this->jobs[i]->remove_first(this->jobs[i], + (void**)&worker->job) == SUCCESS) + { + worker->priority = i; + return TRUE; + } + } + return FALSE; +} - this->mutex->lock(this->mutex); +/** + * Process a single job (provided in worker->job, worker->priority is also + * expected to be set) + * + * this->mutex is expected to be locked. + */ +static void process_job(private_processor_t *this, worker_thread_t *worker) +{ + job_t *to_destroy = NULL; + job_requeue_t requeue; + + this->working_threads[worker->priority]++; + worker->job->status = JOB_STATUS_EXECUTING; + this->mutex->unlock(this->mutex); + /* canceled threads are restarted to get a constant pool */ + thread_cleanup_push((thread_cleanup_t)restart, worker); while (TRUE) { - int i, reserved, idle; - -recheck_queues: - if (this->desired_threads < this->total_threads) + requeue = worker->job->execute(worker->job); + if (requeue.type != JOB_REQUEUE_TYPE_DIRECT) { break; } - idle = get_idle_threads_nolock(this); - reserved = 0; - - for (i = 0; i < JOB_PRIO_MAX; i++) + else if (!worker->job->cancel) + { /* only allow cancelable jobs to requeue directly */ + requeue.type = JOB_REQUEUE_TYPE_FAIR; + break; + } + } + thread_cleanup_pop(FALSE); + this->mutex->lock(this->mutex); + this->working_threads[worker->priority]--; + if (worker->job->status == JOB_STATUS_CANCELED) + { /* job was canceled via a custom cancel() method or did not + * use JOB_REQUEUE_TYPE_DIRECT */ + to_destroy = worker->job; + } + else + { + switch (requeue.type) { - job_t *to_destroy = NULL; - job_requeue_t requeue; - - if (reserved && reserved >= idle) - { - DBG2(DBG_JOB, "delaying %N priority jobs: %d threads idle, " - "but %d reserved for higher priorities", - job_priority_names, i, idle, reserved); - /* go and wait until a job of higher priority gets queued */ - break; - } - if (this->working_threads[i] < this->prio_threads[i]) - { - reserved += this->prio_threads[i] - this->working_threads[i]; - } - if (this->jobs[i]->remove_first(this->jobs[i], - (void**)&worker->job) != SUCCESS) - { /* check next priority queue for a job */ - continue; - } - this->working_threads[i]++; - worker->job->status = JOB_STATUS_EXECUTING; - worker->priority = i; - this->mutex->unlock(this->mutex); - /* canceled threads are restarted to get a constant pool */ - thread_cleanup_push((thread_cleanup_t)restart, worker); - while (TRUE) - { - requeue = worker->job->execute(worker->job); - if (requeue.type != JOB_REQUEUE_TYPE_DIRECT) - { - break; - } - else if (!worker->job->cancel) - { /* only allow cancelable jobs to requeue directly */ - requeue.type = JOB_REQUEUE_TYPE_FAIR; - break; - } - } - thread_cleanup_pop(FALSE); - this->mutex->lock(this->mutex); - this->working_threads[i]--; - if (worker->job->status == JOB_STATUS_CANCELED) - { /* job was canceled via a custom cancel() method or did not - * use JOB_REQUEUE_TYPE_DIRECT */ + case JOB_REQUEUE_TYPE_NONE: + worker->job->status = JOB_STATUS_DONE; to_destroy = worker->job; - } - else - { - switch (requeue.type) + break; + case JOB_REQUEUE_TYPE_FAIR: + worker->job->status = JOB_STATUS_QUEUED; + this->jobs[worker->priority]->insert_last( + this->jobs[worker->priority], worker->job); + this->job_added->signal(this->job_added); + break; + case JOB_REQUEUE_TYPE_SCHEDULE: + /* scheduler_t does not hold its lock when queuing jobs + * so this should be safe without unlocking our mutex */ + switch (requeue.schedule) { - case JOB_REQUEUE_TYPE_NONE: - worker->job->status = JOB_STATUS_DONE; - to_destroy = worker->job; - break; - case JOB_REQUEUE_TYPE_FAIR: - worker->job->status = JOB_STATUS_QUEUED; - this->jobs[i]->insert_last(this->jobs[i], - worker->job); - this->job_added->signal(this->job_added); + case JOB_SCHEDULE: + lib->scheduler->schedule_job(lib->scheduler, + worker->job, requeue.time.rel); break; - case JOB_REQUEUE_TYPE_SCHEDULE: - /* scheduler_t does not hold its lock when queuing jobs - * so this should be safe without unlocking our mutex */ - switch (requeue.schedule) - { - case JOB_SCHEDULE: - lib->scheduler->schedule_job(lib->scheduler, - worker->job, requeue.time.rel); - break; - case JOB_SCHEDULE_MS: - lib->scheduler->schedule_job_ms(lib->scheduler, - worker->job, requeue.time.rel); - break; - case JOB_SCHEDULE_TV: - lib->scheduler->schedule_job_tv(lib->scheduler, - worker->job, requeue.time.abs); - break; - } + case JOB_SCHEDULE_MS: + lib->scheduler->schedule_job_ms(lib->scheduler, + worker->job, requeue.time.rel); break; - default: + case JOB_SCHEDULE_TV: + lib->scheduler->schedule_job_tv(lib->scheduler, + worker->job, requeue.time.abs); break; } - } - /* unset the current job to avoid interference with cancel() when - * destroying the job below */ - worker->job = NULL; - - if (to_destroy) - { /* release mutex to avoid deadlocks if the same lock is required - * during queue_job() and in the destructor called here */ - this->mutex->unlock(this->mutex); - to_destroy->destroy(to_destroy); - this->mutex->lock(this->mutex); - } - /* check the priority queues for another job from the beginning */ - goto recheck_queues; + break; + default: + break; + } + } + /* unset the current job to avoid interference with cancel() when + * destroying the job below */ + worker->job = NULL; + + if (to_destroy) + { /* release mutex to avoid deadlocks if the same lock is required + * during queue_job() and in the destructor called here */ + this->mutex->unlock(this->mutex); + to_destroy->destroy(to_destroy); + this->mutex->lock(this->mutex); + } +} + +/** + * Process queued jobs, called by the worker threads + */ +static void process_jobs(worker_thread_t *worker) +{ + private_processor_t *this = worker->processor; + + /* worker threads are not cancelable by default */ + thread_cancelability(FALSE); + + DBG2(DBG_JOB, "started worker thread %.2u", thread_current_id()); + + this->mutex->lock(this->mutex); + while (this->desired_threads >= this->total_threads) + { + if (get_job(this, worker)) + { + process_job(this, worker); + } + else + { + this->job_added->wait(this->job_added, this->mutex); } - /* wait until a job gets queued */ - this->job_added->wait(this->job_added, this->mutex); } this->total_threads--; this->thread_terminated->signal(this->thread_terminated); -- cgit v1.2.3