diff options
author | Tobias Brunner <tobias@strongswan.org> | 2012-06-19 10:45:17 +0200 |
---|---|---|
committer | Tobias Brunner <tobias@strongswan.org> | 2012-06-25 17:10:28 +0200 |
commit | 7fec83af28f233a02b7ae08c6fd4de65799cb6b4 (patch) | |
tree | c1093faa3be57635e0cb8baf112c6edfaa3001e9 | |
parent | 18d21a57df6085e1738a92d5a352ec17d314a753 (diff) | |
download | strongswan-7fec83af28f233a02b7ae08c6fd4de65799cb6b4.tar.bz2 strongswan-7fec83af28f233a02b7ae08c6fd4de65799cb6b4.tar.xz |
Give processor_t more control over the lifecycle of a job
Jobs are now destroyed by the processor, but they are allowed to
reschedule themselves. That is, parts of the reschedule functionality
already provided by callback_job_t is moved to the processor. Not yet
fully supported is JOB_REQUEUE_DIRECT and canceling jobs.
Note: job_t.destroy() is now called not only for queued jobs but also
after execution or cancellation of jobs. job_t.status can be used to
decide what to do in said method.
26 files changed, 236 insertions, 169 deletions
diff --git a/src/libcharon/control/controller.c b/src/libcharon/control/controller.c index c23bf044e..be691178d 100644 --- a/src/libcharon/control/controller.c +++ b/src/libcharon/control/controller.c @@ -290,7 +290,8 @@ METHOD(listener_t, child_state_change, bool, METHOD(job_t, recheckin, void, interface_job_t *job) { - if (job->listener.ike_sa) + if (job->public.status == JOB_STATUS_QUEUED && + job->listener.ike_sa) { charon->ike_sa_manager->checkin(charon->ike_sa_manager, job->listener.ike_sa); @@ -304,7 +305,7 @@ METHOD(controller_t, create_ike_sa_enumerator, enumerator_t*, wait); } -METHOD(job_t, initiate_execute, void, +METHOD(job_t, initiate_execute, job_requeue_t, interface_job_t *job) { ike_sa_t *ike_sa; @@ -322,7 +323,7 @@ METHOD(job_t, initiate_execute, void, charon->ike_sa_manager, IKE_ANY, TRUE); DESTROY_IF(listener->ike_sa); listener->status = FAILED; - return; + return JOB_REQUEUE_NONE; } listener->ike_sa = ike_sa; @@ -343,6 +344,7 @@ METHOD(job_t, initiate_execute, void, ike_sa); listener->status = FAILED; } + return JOB_REQUEUE_NONE; } METHOD(controller_t, initiate, status_t, @@ -389,7 +391,7 @@ METHOD(controller_t, initiate, status_t, return job.listener.status; } -METHOD(job_t, terminate_ike_execute, void, +METHOD(job_t, terminate_ike_execute, job_requeue_t, interface_job_t *job) { interface_listener_t *listener = &job->listener; @@ -409,6 +411,7 @@ METHOD(job_t, terminate_ike_execute, void, ike_sa); listener->status = SUCCESS; } + return JOB_REQUEUE_NONE; } METHOD(controller_t, terminate_ike, status_t, @@ -466,7 +469,7 @@ METHOD(controller_t, terminate_ike, status_t, return job.listener.status; } -METHOD(job_t, terminate_child_execute, void, +METHOD(job_t, terminate_child_execute, job_requeue_t, interface_job_t *job) { interface_listener_t *listener = &job->listener; @@ -486,6 +489,7 @@ METHOD(job_t, terminate_child_execute, void, ike_sa); listener->status = FAILED; } + return JOB_REQUEUE_NONE; } METHOD(controller_t, terminate_child, status_t, diff --git a/src/libcharon/processing/jobs/acquire_job.c b/src/libcharon/processing/jobs/acquire_job.c index 2d836b002..207f534ba 100644 --- a/src/libcharon/processing/jobs/acquire_job.c +++ b/src/libcharon/processing/jobs/acquire_job.c @@ -53,12 +53,12 @@ METHOD(job_t, destroy, void, free(this); } -METHOD(job_t, execute, void, +METHOD(job_t, execute, job_requeue_t, private_acquire_job_t *this) { charon->traps->acquire(charon->traps, this->reqid, this->src_ts, this->dst_ts); - destroy(this); + return JOB_REQUEUE_NONE; } METHOD(job_t, get_priority, job_priority_t, diff --git a/src/libcharon/processing/jobs/adopt_children_job.c b/src/libcharon/processing/jobs/adopt_children_job.c index 4ba6e872b..df5b70c0f 100644 --- a/src/libcharon/processing/jobs/adopt_children_job.c +++ b/src/libcharon/processing/jobs/adopt_children_job.c @@ -43,7 +43,7 @@ METHOD(job_t, destroy, void, free(this); } -METHOD(job_t, execute, void, +METHOD(job_t, execute, job_requeue_t, private_adopt_children_job_t *this) { identification_t *my_id, *other_id, *xauth; @@ -146,7 +146,7 @@ METHOD(job_t, execute, void, } children->destroy_offset(children, offsetof(child_sa_t, destroy)); } - destroy(this); + return JOB_REQUEUE_NONE; } METHOD(job_t, get_priority, job_priority_t, diff --git a/src/libcharon/processing/jobs/delete_child_sa_job.c b/src/libcharon/processing/jobs/delete_child_sa_job.c index ac1dfd663..9afbac02b 100644 --- a/src/libcharon/processing/jobs/delete_child_sa_job.c +++ b/src/libcharon/processing/jobs/delete_child_sa_job.c @@ -57,7 +57,7 @@ METHOD(job_t, destroy, void, free(this); } -METHOD(job_t, execute, void, +METHOD(job_t, execute, job_requeue_t, private_delete_child_sa_job_t *this) { ike_sa_t *ike_sa; @@ -75,7 +75,7 @@ METHOD(job_t, execute, void, charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa); } - destroy(this); + return JOB_REQUEUE_NONE; } METHOD(job_t, get_priority, job_priority_t, diff --git a/src/libcharon/processing/jobs/delete_ike_sa_job.c b/src/libcharon/processing/jobs/delete_ike_sa_job.c index c29b72230..08b41af8c 100644 --- a/src/libcharon/processing/jobs/delete_ike_sa_job.c +++ b/src/libcharon/processing/jobs/delete_ike_sa_job.c @@ -48,7 +48,7 @@ METHOD(job_t, destroy, void, free(this); } -METHOD(job_t, execute, void, +METHOD(job_t, execute, job_requeue_t, private_delete_ike_sa_job_t *this) { ike_sa_t *ike_sa; @@ -60,7 +60,7 @@ METHOD(job_t, execute, void, if (ike_sa->get_state(ike_sa) == IKE_PASSIVE) { charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa); - return destroy(this); + return JOB_REQUEUE_NONE; } if (this->delete_if_established) { @@ -89,7 +89,7 @@ METHOD(job_t, execute, void, } } } - destroy(this); + return JOB_REQUEUE_NONE; } METHOD(job_t, get_priority, job_priority_t, diff --git a/src/libcharon/processing/jobs/dpd_timeout_job.c b/src/libcharon/processing/jobs/dpd_timeout_job.c index 4f427f1d3..91a76bbaf 100644 --- a/src/libcharon/processing/jobs/dpd_timeout_job.c +++ b/src/libcharon/processing/jobs/dpd_timeout_job.c @@ -51,7 +51,7 @@ METHOD(job_t, destroy, void, free(this); } -METHOD(job_t, execute, void, +METHOD(job_t, execute, job_requeue_t, private_dpd_timeout_job_t *this) { time_t use_time, current; @@ -86,7 +86,7 @@ METHOD(job_t, execute, void, charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa); } } - destroy(this); + return JOB_REQUEUE_NONE; } METHOD(job_t, get_priority, job_priority_t, diff --git a/src/libcharon/processing/jobs/inactivity_job.c b/src/libcharon/processing/jobs/inactivity_job.c index 55fc0093a..fb80ae6c9 100644 --- a/src/libcharon/processing/jobs/inactivity_job.c +++ b/src/libcharon/processing/jobs/inactivity_job.c @@ -51,7 +51,7 @@ METHOD(job_t, destroy, void, free(this); } -METHOD(job_t, execute, void, +METHOD(job_t, execute, job_requeue_t, private_inactivity_job_t *this) { ike_sa_t *ike_sa; @@ -121,10 +121,11 @@ METHOD(job_t, execute, void, charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa); } } - if (!rescheduled) + if (rescheduled) { - destroy(this); + return JOB_REQUEUE_SCHEDULED; } + return JOB_REQUEUE_NONE; } METHOD(job_t, get_priority, job_priority_t, diff --git a/src/libcharon/processing/jobs/initiate_mediation_job.c b/src/libcharon/processing/jobs/initiate_mediation_job.c index 610998d88..17ab83053 100644 --- a/src/libcharon/processing/jobs/initiate_mediation_job.c +++ b/src/libcharon/processing/jobs/initiate_mediation_job.c @@ -64,7 +64,7 @@ static bool initiate_callback(private_initiate_mediation_job_t *this, return TRUE; } -METHOD(job_t, initiate, void, +METHOD(job_t, initiate, job_requeue_t, private_initiate_mediation_job_t *this) { ike_sa_t *mediated_sa, *mediation_sa; @@ -93,8 +93,7 @@ METHOD(job_t, initiate, void, mediated_cfg->destroy(mediated_cfg); mediation_cfg->destroy(mediation_cfg); enumerator->destroy(enumerator); - destroy(this); - return; + return JOB_REQUEUE_NONE; } enumerator->destroy(enumerator); @@ -115,8 +114,7 @@ METHOD(job_t, initiate, void, charon->ike_sa_manager->checkin( charon->ike_sa_manager, mediated_sa); } - destroy(this); - return; + return JOB_REQUEUE_NONE; } /* we need an additional reference because initiate consumes one */ mediation_cfg->get_ref(mediation_cfg); @@ -134,8 +132,7 @@ METHOD(job_t, initiate, void, charon->ike_sa_manager->checkin_and_destroy( charon->ike_sa_manager, mediated_sa); } - destroy(this); - return; + return JOB_REQUEUE_NONE; } mediation_cfg->destroy(mediation_cfg); @@ -157,18 +154,17 @@ METHOD(job_t, initiate, void, charon->ike_sa_manager->checkin_and_destroy( charon->ike_sa_manager, mediated_sa); } - destroy(this); - return; + return JOB_REQUEUE_NONE; } charon->ike_sa_manager->checkin(charon->ike_sa_manager, mediation_sa); } mediated_cfg->destroy(mediated_cfg); } - destroy(this); + return JOB_REQUEUE_NONE; } -METHOD(job_t, reinitiate, void, +METHOD(job_t, reinitiate, job_requeue_t, private_initiate_mediation_job_t *this) { ike_sa_t *mediated_sa, *mediation_sa; @@ -205,8 +201,7 @@ METHOD(job_t, reinitiate, void, charon->ike_sa_manager, mediated_sa); } - destroy(this); - return; + return JOB_REQUEUE_NONE; } charon->ike_sa_manager->checkin(charon->ike_sa_manager, mediation_sa); @@ -214,7 +209,7 @@ METHOD(job_t, reinitiate, void, mediated_cfg->destroy(mediated_cfg); } - destroy(this); + return JOB_REQUEUE_NONE; } METHOD(job_t, get_priority, job_priority_t, diff --git a/src/libcharon/processing/jobs/mediation_job.c b/src/libcharon/processing/jobs/mediation_job.c index 6f02f2a0a..759aad003 100644 --- a/src/libcharon/processing/jobs/mediation_job.c +++ b/src/libcharon/processing/jobs/mediation_job.c @@ -77,7 +77,7 @@ METHOD(job_t, destroy, void, free(this); } -METHOD(job_t, execute, void, +METHOD(job_t, execute, job_requeue_t, private_mediation_job_t *this) { ike_sa_id_t *target_sa_id; @@ -98,8 +98,7 @@ METHOD(job_t, execute, void, DBG1(DBG_JOB, "callback for '%Y' to '%Y' failed", this->source, this->target); charon->ike_sa_manager->checkin(charon->ike_sa_manager, target_sa); - destroy(this); - return; + return JOB_REQUEUE_NONE; } } else @@ -112,8 +111,7 @@ METHOD(job_t, execute, void, this->source, this->target); charon->ike_sa_manager->checkin(charon->ike_sa_manager, target_sa); /* FIXME: notify the initiator */ - destroy(this); - return; + return JOB_REQUEUE_NONE; } } @@ -130,7 +128,7 @@ METHOD(job_t, execute, void, DBG1(DBG_JOB, "mediation between '%Y' and '%Y' failed: " "peer is not online anymore", this->source, this->target); } - destroy(this); + return JOB_REQUEUE_NONE; } METHOD(job_t, get_priority, job_priority_t, diff --git a/src/libcharon/processing/jobs/migrate_job.c b/src/libcharon/processing/jobs/migrate_job.c index eb10e2e46..45bac7cf8 100644 --- a/src/libcharon/processing/jobs/migrate_job.c +++ b/src/libcharon/processing/jobs/migrate_job.c @@ -67,7 +67,7 @@ METHOD(job_t, destroy, void, free(this); } -METHOD(job_t, execute, void, +METHOD(job_t, execute, job_requeue_t, private_migrate_job_t *this) { ike_sa_t *ike_sa = NULL; @@ -117,7 +117,7 @@ METHOD(job_t, execute, void, { DBG1(DBG_JOB, "no CHILD_SA found with reqid {%d}", this->reqid); } - destroy(this); + return JOB_REQUEUE_NONE; } METHOD(job_t, get_priority, job_priority_t, diff --git a/src/libcharon/processing/jobs/process_message_job.c b/src/libcharon/processing/jobs/process_message_job.c index a4924d001..71a2cb45d 100644 --- a/src/libcharon/processing/jobs/process_message_job.c +++ b/src/libcharon/processing/jobs/process_message_job.c @@ -42,7 +42,7 @@ METHOD(job_t, destroy, void, free(this); } -METHOD(job_t, execute, void, +METHOD(job_t, execute, job_requeue_t, private_process_message_job_t *this) { ike_sa_t *ike_sa; @@ -59,8 +59,7 @@ METHOD(job_t, execute, void, this->message->get_source(this->message), this->message->get_destination(this->message)); charon->connect_manager->process_check(charon->connect_manager, this->message); - destroy(this); - return; + return JOB_REQUEUE_NONE; } #endif /* ME */ @@ -81,7 +80,7 @@ METHOD(job_t, execute, void, charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa); } } - destroy(this); + return JOB_REQUEUE_NONE; } METHOD(job_t, get_priority, job_priority_t, diff --git a/src/libcharon/processing/jobs/rekey_child_sa_job.c b/src/libcharon/processing/jobs/rekey_child_sa_job.c index 5855f1bc9..1bf8dc0cb 100644 --- a/src/libcharon/processing/jobs/rekey_child_sa_job.c +++ b/src/libcharon/processing/jobs/rekey_child_sa_job.c @@ -51,7 +51,7 @@ METHOD(job_t, destroy, void, free(this); } -METHOD(job_t, execute, void, +METHOD(job_t, execute, job_requeue_t, private_rekey_child_sa_job_t *this) { ike_sa_t *ike_sa; @@ -68,7 +68,7 @@ METHOD(job_t, execute, void, ike_sa->rekey_child_sa(ike_sa, this->protocol, this->spi); charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa); } - destroy(this); + return JOB_REQUEUE_NONE; } METHOD(job_t, get_priority, job_priority_t, diff --git a/src/libcharon/processing/jobs/rekey_ike_sa_job.c b/src/libcharon/processing/jobs/rekey_ike_sa_job.c index 5366195fd..712c7c2c1 100644 --- a/src/libcharon/processing/jobs/rekey_ike_sa_job.c +++ b/src/libcharon/processing/jobs/rekey_ike_sa_job.c @@ -46,7 +46,7 @@ METHOD(job_t, destroy, void, free(this); } -METHOD(job_t, execute, void, +METHOD(job_t, execute, job_requeue_t, private_rekey_ike_sa_job_t *this) { ike_sa_t *ike_sa; @@ -78,7 +78,7 @@ METHOD(job_t, execute, void, charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa); } } - destroy(this); + return JOB_REQUEUE_NONE; } METHOD(job_t, get_priority, job_priority_t, diff --git a/src/libcharon/processing/jobs/retransmit_job.c b/src/libcharon/processing/jobs/retransmit_job.c index 050f7005a..48c326804 100644 --- a/src/libcharon/processing/jobs/retransmit_job.c +++ b/src/libcharon/processing/jobs/retransmit_job.c @@ -47,7 +47,7 @@ METHOD(job_t, destroy, void, free(this); } -METHOD(job_t, execute, void, +METHOD(job_t, execute, job_requeue_t, private_retransmit_job_t *this) { ike_sa_t *ike_sa; @@ -67,7 +67,7 @@ METHOD(job_t, execute, void, charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa); } } - destroy(this); + return JOB_REQUEUE_NONE; } METHOD(job_t, get_priority, job_priority_t, diff --git a/src/libcharon/processing/jobs/retry_initiate_job.c b/src/libcharon/processing/jobs/retry_initiate_job.c index e6da3e362..1cdc3058a 100644 --- a/src/libcharon/processing/jobs/retry_initiate_job.c +++ b/src/libcharon/processing/jobs/retry_initiate_job.c @@ -41,7 +41,7 @@ METHOD(job_t, destroy, void, free(this); } -METHOD(job_t, execute, void, +METHOD(job_t, execute, job_requeue_t, private_retry_initiate_job_t *this) { ike_sa_t *ike_sa; @@ -64,7 +64,7 @@ METHOD(job_t, execute, void, charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa); } } - destroy(this); + return JOB_REQUEUE_NONE; } METHOD(job_t, get_priority, job_priority_t, diff --git a/src/libcharon/processing/jobs/roam_job.c b/src/libcharon/processing/jobs/roam_job.c index 951ac5ad3..0af4c6c39 100644 --- a/src/libcharon/processing/jobs/roam_job.c +++ b/src/libcharon/processing/jobs/roam_job.c @@ -44,7 +44,7 @@ METHOD(job_t, destroy, void, free(this); } -METHOD(job_t, execute, void, +METHOD(job_t, execute, job_requeue_t, private_roam_job_t *this) { ike_sa_t *ike_sa; @@ -82,8 +82,7 @@ METHOD(job_t, execute, void, id->destroy(id); } list->destroy(list); - - destroy(this); + return JOB_REQUEUE_NONE; } METHOD(job_t, get_priority, job_priority_t, diff --git a/src/libcharon/processing/jobs/send_dpd_job.c b/src/libcharon/processing/jobs/send_dpd_job.c index ab00d013d..d2f38b803 100644 --- a/src/libcharon/processing/jobs/send_dpd_job.c +++ b/src/libcharon/processing/jobs/send_dpd_job.c @@ -45,7 +45,7 @@ METHOD(job_t, destroy, void, free(this); } -METHOD(job_t, execute, void, +METHOD(job_t, execute, job_requeue_t, private_send_dpd_job_t *this) { ike_sa_t *ike_sa; @@ -63,7 +63,7 @@ METHOD(job_t, execute, void, charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa); } } - destroy(this); + return JOB_REQUEUE_NONE; } METHOD(job_t, get_priority, job_priority_t, diff --git a/src/libcharon/processing/jobs/send_keepalive_job.c b/src/libcharon/processing/jobs/send_keepalive_job.c index 5e128d478..3e3477679 100644 --- a/src/libcharon/processing/jobs/send_keepalive_job.c +++ b/src/libcharon/processing/jobs/send_keepalive_job.c @@ -45,7 +45,7 @@ METHOD(job_t, destroy, void, free(this); } -METHOD(job_t, execute, void, +METHOD(job_t, execute, job_requeue_t, private_send_keepalive_job_t *this) { ike_sa_t *ike_sa; @@ -57,7 +57,7 @@ METHOD(job_t, execute, void, ike_sa->send_keepalive(ike_sa); charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa); } - destroy(this); + return JOB_REQUEUE_NONE; } METHOD(job_t, get_priority, job_priority_t, diff --git a/src/libcharon/processing/jobs/start_action_job.c b/src/libcharon/processing/jobs/start_action_job.c index 294ac154a..bc4aaf6d6 100644 --- a/src/libcharon/processing/jobs/start_action_job.c +++ b/src/libcharon/processing/jobs/start_action_job.c @@ -36,7 +36,7 @@ METHOD(job_t, destroy, void, free(this); } -METHOD(job_t, execute, void, +METHOD(job_t, execute, job_requeue_t, private_start_action_job_t *this) { enumerator_t *enumerator, *children; @@ -83,7 +83,7 @@ METHOD(job_t, execute, void, children->destroy(children); } enumerator->destroy(enumerator); - destroy(this); + return JOB_REQUEUE_NONE; } METHOD(job_t, get_priority, job_priority_t, diff --git a/src/libcharon/processing/jobs/update_sa_job.c b/src/libcharon/processing/jobs/update_sa_job.c index c4f6e4782..694318522 100644 --- a/src/libcharon/processing/jobs/update_sa_job.c +++ b/src/libcharon/processing/jobs/update_sa_job.c @@ -50,7 +50,7 @@ METHOD(job_t, destroy, void, free(this); } -METHOD(job_t, execute, void, +METHOD(job_t, execute, job_requeue_t, private_update_sa_job_t *this) { ike_sa_t *ike_sa; @@ -71,7 +71,7 @@ METHOD(job_t, execute, void, } charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa); } - destroy(this); + return JOB_REQUEUE_NONE; } METHOD(job_t, get_priority, job_priority_t, diff --git a/src/libstrongswan/processing/jobs/callback_job.c b/src/libstrongswan/processing/jobs/callback_job.c index 452c07ce6..86d5228bf 100644 --- a/src/libstrongswan/processing/jobs/callback_job.c +++ b/src/libstrongswan/processing/jobs/callback_job.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 2009 Tobias Brunner + * Copyright (C) 2009-2012 Tobias Brunner * Copyright (C) 2007-2011 Martin Willi * Copyright (C) 2011 revosec AG * Hochschule fuer Technik Rapperswil @@ -56,7 +56,7 @@ struct private_callback_job_t { thread_t *thread; /** - * mutex to access jobs interna + * mutex to access private job data */ mutex_t *mutex; @@ -71,9 +71,9 @@ struct private_callback_job_t { private_callback_job_t *parent; /** - * TRUE if the job got cancelled + * TRUE if the job got canceled */ - bool cancelled; + bool canceled; /** * condvar to synchronize the cancellation/destruction of the job @@ -103,10 +103,10 @@ static void unregister(private_callback_job_t *this) if (this->parent) { this->parent->mutex->lock(this->parent->mutex); - if (this->parent->cancelled && !this->cancelled) + if (this->parent->canceled && !this->canceled) { - /* if the parent has been cancelled but we have not yet, we do not - * unregister until we got cancelled by the parent. */ + /* if the parent has been canceled but we have not yet, we do not + * unregister until we got canceled by the parent. */ this->parent->mutex->unlock(this->parent->mutex); this->destroyable->wait(this->destroyable, this->mutex); this->parent->mutex->lock(this->parent->mutex); @@ -144,7 +144,7 @@ METHOD(callback_job_t, cancel, void, semaphore_t *terminated = NULL; this->mutex->lock(this->mutex); - this->cancelled = TRUE; + this->canceled = TRUE; /* terminate children */ while (this->children->get_first(this->children, (void**)&child) == SUCCESS) { @@ -177,12 +177,10 @@ METHOD(callback_job_t, cancel, void, } } -METHOD(job_t, execute, void, +METHOD(job_t, execute, job_requeue_t, private_callback_job_t *this) { - bool cleanup = FALSE, requeue = FALSE; - - thread_cleanup_push((thread_cleanup_t)destroy, this); + bool requeue = FALSE; this->mutex->lock(this->mutex); this->thread = thread_current(); @@ -191,10 +189,9 @@ METHOD(job_t, execute, void, while (TRUE) { this->mutex->lock(this->mutex); - if (this->cancelled) + if (this->canceled) { this->mutex->unlock(this->mutex); - cleanup = TRUE; break; } this->mutex->unlock(this->mutex); @@ -210,7 +207,6 @@ METHOD(job_t, execute, void, case JOB_REQUEUE_NONE: default: { - cleanup = TRUE; break; } } @@ -219,14 +215,10 @@ METHOD(job_t, execute, void, this->mutex->lock(this->mutex); this->thread = NULL; this->mutex->unlock(this->mutex); - /* manually create a cancellation point to avoid that a cancelled thread - * goes back into the thread pool */ + /* manually create a cancellation point to avoid that a canceled thread + * goes back into the thread pool at all */ thread_cancellation_point(); - if (requeue) - { - lib->processor->queue_job(lib->processor, &this->public.job); - } - thread_cleanup_pop(cleanup); + return requeue ? JOB_REQUEUE_FAIR : JOB_REQUEUE_NONE; } METHOD(job_t, get_priority, job_priority_t, diff --git a/src/libstrongswan/processing/jobs/callback_job.h b/src/libstrongswan/processing/jobs/callback_job.h index 3e92b01c0..ebe5c9c2b 100644 --- a/src/libstrongswan/processing/jobs/callback_job.h +++ b/src/libstrongswan/processing/jobs/callback_job.h @@ -27,33 +27,6 @@ typedef struct callback_job_t callback_job_t; #include <library.h> #include <processing/jobs/job.h> - -typedef enum job_requeue_t job_requeue_t; - -/** - * Job requeueing policy. - * - * The job requeueing policy defines how a job is handled when the callback - * function returns. - */ -enum job_requeue_t { - - /** - * Do not requeue job, destroy it - */ - JOB_REQUEUE_NONE, - - /** - * Reque the job fairly, meaning it has to requeue as any other job - */ - JOB_REQUEUE_FAIR, - - /** - * Reexecute the job directly, without the need of requeueing it - */ - JOB_REQUEUE_DIRECT, -}; - /** * The callback function to use for the callback job. * diff --git a/src/libstrongswan/processing/jobs/job.h b/src/libstrongswan/processing/jobs/job.h index d25cee03e..c3e640065 100644 --- a/src/libstrongswan/processing/jobs/job.h +++ b/src/libstrongswan/processing/jobs/job.h @@ -1,4 +1,5 @@ /* + * Copyright (C) 2012 Tobias Brunner * Copyright (C) 2005-2006 Martin Willi * Copyright (C) 2005 Jan Hutter * Hochschule fuer Technik Rapperswil @@ -24,6 +25,8 @@ typedef struct job_t job_t; typedef enum job_priority_t job_priority_t; +typedef enum job_requeue_t job_requeue_t; +typedef enum job_status_t job_status_t; #include <library.h> @@ -48,18 +51,56 @@ enum job_priority_t { extern enum_name_t *job_priority_names; /** + * Job requeueing policy. + * + * The job requeueing policy defines how a job is handled after it has been + * executed. + */ +enum job_requeue_t { + /** Do not requeue job, destroy it */ + JOB_REQUEUE_NONE = 0, + /** Requeue the job fairly, i.e. it is inserted at the end of the queue */ + JOB_REQUEUE_FAIR, + /** Reexecute the job directly, without the need of requeueing it */ + JOB_REQUEUE_DIRECT, + /** For jobs that rescheduled themselves via scheduler_t */ + JOB_REQUEUE_SCHEDULED, +}; + +/** + * Job status + */ +enum job_status_t { + /** The job is queued and has not yet been executed */ + JOB_STATUS_QUEUED = 0, + /** During execution */ + JOB_STATUS_EXECUTING, + /** If the job got canceled */ + JOB_STATUS_CANCELED, + /** The job was executed successfully */ + JOB_STATUS_DONE, +}; + +/** * Job interface as it is stored in the job queue. */ struct job_t { /** + * Status of this job, is modified exclusively by the processor/scheduler + */ + job_status_t status; + + /** * Execute a job. * * The processing facility executes a job using this method. Jobs are - * one-shot, they destroy themself after execution, so don't use a job - * once it has been executed. + * one-shot, they are destroyed after execution (depending on the return + * value here), so don't use a job once it has been queued. + * + * @return policy how to requeue the job */ - void (*execute) (job_t *this); + job_requeue_t (*execute) (job_t *this); /** * Get the priority of a job. @@ -71,8 +112,10 @@ struct job_t { /** * Destroy a job. * - * Is only called whenever a job was not executed (e.g. due daemon shutdown). - * After execution, jobs destroy themself. + * Is called after a job is executed or got canceled. It is also called + * for queued jobs that were never executed. + * + * Use the status of a job to decide what to do during destruction. */ void (*destroy) (job_t *this); }; diff --git a/src/libstrongswan/processing/processor.c b/src/libstrongswan/processing/processor.c index 222f1a535..69838aa13 100644 --- a/src/libstrongswan/processing/processor.c +++ b/src/libstrongswan/processing/processor.c @@ -1,7 +1,7 @@ /* * Copyright (C) 2005-2011 Martin Willi * Copyright (C) 2011 revosec AG - * Copyright (C) 2008-2011 Tobias Brunner + * Copyright (C) 2008-2012 Tobias Brunner * Copyright (C) 2005 Jan Hutter * Hochschule fuer Technik Rapperswil * @@ -58,7 +58,7 @@ struct private_processor_t { /** * All threads managed in the pool (including threads that have been - * cancelled, this allows to join them during destruction) + * canceled, this allows to join them later), as worker_thread_t */ linked_list_t *threads; @@ -73,11 +73,6 @@ struct private_processor_t { int prio_threads[JOB_PRIO_MAX]; /** - * Priority of the job executed by a thread - */ - thread_value_t *priority; - - /** * access to job lists is locked through this mutex */ mutex_t *mutex; @@ -93,39 +88,72 @@ struct private_processor_t { condvar_t *thread_terminated; }; -static void process_jobs(private_processor_t *this); /** - * restart a terminated thread + * Worker thread */ -static void restart(private_processor_t *this) -{ +typedef struct { + + /** + * Reference to the processor + */ + private_processor_t *processor; + + /** + * The actual thread + */ thread_t *thread; - DBG2(DBG_JOB, "terminated worker thread %.2u", thread_current_id()); + /** + * Job currently being executed by this worker thread + */ + job_t *job; - /* respawn thread if required */ - this->mutex->lock(this->mutex); - if (this->desired_threads < this->total_threads || - (thread = thread_create((thread_main_t)process_jobs, this)) == NULL) - { - this->total_threads--; - this->thread_terminated->signal(this->thread_terminated); - } - else - { - this->threads->insert_last(this->threads, thread); - } - this->mutex->unlock(this->mutex); -} + /** + * Priority of the current job + */ + job_priority_t priority; + +} worker_thread_t; + +static void process_jobs(worker_thread_t *worker); /** - * Decrement working thread count of a priority class + * restart a terminated thread */ -static void decrement_working_threads(private_processor_t *this) +static void restart(worker_thread_t *worker) { + private_processor_t *this = worker->processor; + + DBG2(DBG_JOB, "terminated worker thread %.2u", thread_current_id()); + this->mutex->lock(this->mutex); - this->working_threads[(intptr_t)this->priority->get(this->priority)]--; + /* cleanup worker thread */ + this->working_threads[worker->priority]--; + worker->job->status = JOB_STATUS_CANCELED; + worker->job->destroy(worker->job); + worker->job = NULL; + + /* respawn thread if required */ + if (this->desired_threads >= this->total_threads) + { + worker_thread_t *new_worker; + + INIT(new_worker, + .processor = this, + ); + new_worker->thread = thread_create((thread_main_t)process_jobs, + new_worker); + if (new_worker->thread) + { + this->threads->insert_last(this->threads, new_worker); + this->mutex->unlock(this->mutex); + return; + } + free(new_worker); + } + this->total_threads--; + this->thread_terminated->signal(this->thread_terminated); this->mutex->unlock(this->mutex); } @@ -147,9 +175,11 @@ static u_int get_idle_threads_nolock(private_processor_t *this) /** * Process queued jobs, called by the worker threads */ -static void process_jobs(private_processor_t *this) +static void process_jobs(worker_thread_t *worker) { - /* worker threads are not cancellable by default */ + private_processor_t *this = worker->processor; + + /* worker threads are not cancelable by default */ thread_cancelability(FALSE); DBG2(DBG_JOB, "started worker thread %.2u", thread_current_id()); @@ -157,7 +187,6 @@ static void process_jobs(private_processor_t *this) this->mutex->lock(this->mutex); while (this->desired_threads >= this->total_threads) { - job_t *job = NULL; int i, reserved = 0, idle; idle = get_idle_threads_nolock(this); @@ -176,27 +205,52 @@ static void process_jobs(private_processor_t *this) reserved += this->prio_threads[i] - this->working_threads[i]; } if (this->jobs[i]->remove_first(this->jobs[i], - (void**)&job) == SUCCESS) + (void**)&worker->job) == SUCCESS) { + job_requeue_t requeue; + this->working_threads[i]++; + worker->job->status = JOB_STATUS_EXECUTING; + worker->priority = i; this->mutex->unlock(this->mutex); - this->priority->set(this->priority, (void*)(intptr_t)i); - /* terminated threads are restarted to get a constant pool */ - thread_cleanup_push((thread_cleanup_t)restart, this); - thread_cleanup_push((thread_cleanup_t)decrement_working_threads, - this); - job->execute(job); - thread_cleanup_pop(FALSE); + /* canceled threads are restarted to get a constant pool */ + thread_cleanup_push((thread_cleanup_t)restart, worker); + while (TRUE) + { + requeue = worker->job->execute(worker->job); + if (requeue != JOB_REQUEUE_DIRECT) + { + break; + } + } thread_cleanup_pop(FALSE); this->mutex->lock(this->mutex); this->working_threads[i]--; + switch (requeue) + { + case JOB_REQUEUE_NONE: + worker->job->status = JOB_STATUS_DONE; + worker->job->destroy(worker->job); + break; + case JOB_REQUEUE_FAIR: + worker->job->status = JOB_STATUS_QUEUED; + this->jobs[i]->insert_last(this->jobs[i], worker->job); + this->job_added->signal(this->job_added); + break; + case JOB_REQUEUE_SCHEDULED: + worker->job->status = JOB_STATUS_QUEUED; + /* fall-through */ + default: + break; + } break; } } - if (!job) + if (!worker->job) { this->job_added->wait(this->job_added, this->mutex); } + worker->job = NULL; } this->total_threads--; this->thread_terminated->signal(this->thread_terminated); @@ -266,6 +320,8 @@ METHOD(processor_t, queue_job, void, job_priority_t prio; prio = sane_prio(job->get_priority(job)); + job->status = JOB_STATUS_QUEUED; + this->mutex->lock(this->mutex); this->jobs[prio]->insert_last(this->jobs[prio], job); this->job_added->signal(this->job_added); @@ -278,19 +334,26 @@ METHOD(processor_t, set_threads, void, this->mutex->lock(this->mutex); if (count > this->total_threads) { /* increase thread count */ + worker_thread_t *worker; int i; - thread_t *current; this->desired_threads = count; DBG1(DBG_JOB, "spawning %d worker threads", count - this->total_threads); for (i = this->total_threads; i < count; i++) { - current = thread_create((thread_main_t)process_jobs, this); - if (current) + INIT(worker, + .processor = this, + ); + worker->thread = thread_create((thread_main_t)process_jobs, worker); + if (worker->thread) { - this->threads->insert_last(this->threads, current); + this->threads->insert_last(this->threads, worker); this->total_threads++; } + else + { + free(worker); + } } } else if (count < this->total_threads) @@ -304,7 +367,7 @@ METHOD(processor_t, set_threads, void, METHOD(processor_t, destroy, void, private_processor_t *this) { - thread_t *current; + worker_thread_t *worker; int i; set_threads(this, 0); @@ -315,12 +378,12 @@ METHOD(processor_t, destroy, void, this->thread_terminated->wait(this->thread_terminated, this->mutex); } while (this->threads->remove_first(this->threads, - (void**)¤t) == SUCCESS) + (void**)&worker) == SUCCESS) { - current->join(current); + worker->thread->join(worker->thread); + free(worker); } this->mutex->unlock(this->mutex); - this->priority->destroy(this->priority); this->thread_terminated->destroy(this->thread_terminated); this->job_added->destroy(this->job_added); this->mutex->destroy(this->mutex); @@ -351,7 +414,6 @@ processor_t *processor_create() .destroy = _destroy, }, .threads = linked_list_create(), - .priority = thread_value_create(NULL), .mutex = mutex_create(MUTEX_TYPE_DEFAULT), .job_added = condvar_create(CONDVAR_TYPE_DEFAULT), .thread_terminated = condvar_create(CONDVAR_TYPE_DEFAULT), diff --git a/src/libstrongswan/processing/processor.h b/src/libstrongswan/processing/processor.h index 5db42c04c..05e88a2cf 100644 --- a/src/libstrongswan/processing/processor.h +++ b/src/libstrongswan/processing/processor.h @@ -51,7 +51,7 @@ struct processor_t { /** * Get the number of threads currently working, per priority class. * - * @param prioritiy to check + * @param priority to check * @return number of threads in priority working */ u_int (*get_working_threads)(processor_t *this, job_priority_t prio); diff --git a/src/libstrongswan/processing/scheduler.c b/src/libstrongswan/processing/scheduler.c index f3cc1164a..979a7139f 100644 --- a/src/libstrongswan/processing/scheduler.c +++ b/src/libstrongswan/processing/scheduler.c @@ -250,6 +250,7 @@ METHOD(scheduler_t, schedule_job_tv, void, event = malloc_thing(event_t); event->job = job; + event->job->status = JOB_STATUS_QUEUED; event->time = tv; this->mutex->lock(this->mutex); |