diff options
author | Tobias Brunner <tobias@strongswan.org> | 2012-06-19 13:29:09 +0200 |
---|---|---|
committer | Tobias Brunner <tobias@strongswan.org> | 2012-06-25 17:38:59 +0200 |
commit | 26d77eb3e61b2ff929dff96bbb53a5d22d76ce4f (patch) | |
tree | 5d5b7bd7bb2cd6a721c65b82f8ce2a8b3f4fdf22 | |
parent | 7fec83af28f233a02b7ae08c6fd4de65799cb6b4 (diff) | |
download | strongswan-26d77eb3e61b2ff929dff96bbb53a5d22d76ce4f.tar.bz2 strongswan-26d77eb3e61b2ff929dff96bbb53a5d22d76ce4f.tar.xz |
Centralized thread cancellation in processor_t
This ensures that no threads are active when plugins and the rest of the
daemon are unloaded.
callback_job_t was simplified a lot in the process as its main
functionality is now contained in processor_t. The parent-child
relationships were abandoned as these were only needed to simplify job
cancellation.
31 files changed, 261 insertions, 475 deletions
diff --git a/src/charon-nm/nm/nm_backend.c b/src/charon-nm/nm/nm_backend.c index 19382a028..19d095935 100644 --- a/src/charon-nm/nm/nm_backend.c +++ b/src/charon-nm/nm/nm_backend.c @@ -67,6 +67,22 @@ static job_requeue_t run(nm_backend_t *this) return JOB_REQUEUE_NONE; } +/** + * Cancel the GLib Main Event Loop + */ +static bool cancel(nm_backend_t *this) +{ + if (this->loop) + { + if (g_main_loop_is_running(this->loop)) + { + g_main_loop_quit(this->loop); + } + g_main_loop_unref(this->loop); + } + return TRUE; +} + /* * see header file */ @@ -78,14 +94,6 @@ void nm_backend_deinit() { return; } - if (this->loop) - { - if (g_main_loop_is_running(this->loop)) - { - g_main_loop_quit(this->loop); - } - g_main_loop_unref(this->loop); - } if (this->plugin) { g_object_unref(this->plugin); @@ -132,8 +140,8 @@ bool nm_backend_init() charon->keep_cap(charon, CAP_DAC_OVERRIDE); lib->processor->queue_job(lib->processor, - (job_t*)callback_job_create_with_prio((callback_job_cb_t)run, - this, NULL, NULL, JOB_PRIO_CRITICAL)); + (job_t*)callback_job_create_with_prio((callback_job_cb_t)run, this, + NULL, (callback_job_cancel_t)cancel, JOB_PRIO_CRITICAL)); return TRUE; } diff --git a/src/libcharon/daemon.c b/src/libcharon/daemon.c index 60a32339d..8a4b3906f 100644 --- a/src/libcharon/daemon.c +++ b/src/libcharon/daemon.c @@ -113,6 +113,10 @@ static void destroy(private_daemon_t *this) { this->public.sender->flush(this->public.sender); } + + /* cancel all threads and wait for their termination */ + lib->processor->cancel(lib->processor); + DESTROY_IF(this->public.receiver); #ifdef ME DESTROY_IF(this->public.connect_manager); @@ -120,7 +124,6 @@ static void destroy(private_daemon_t *this) #endif /* ME */ /* make sure the cache is clear before unloading plugins */ lib->credmgr->flush_cache(lib->credmgr, CERT_ANY); - /* unload plugins to release threads */ lib->plugins->unload(lib->plugins); #ifdef CAPABILITIES_LIBCAP cap_free(this->caps); diff --git a/src/libcharon/network/receiver.c b/src/libcharon/network/receiver.c index 1e00eb239..f0cb0b2d1 100644 --- a/src/libcharon/network/receiver.c +++ b/src/libcharon/network/receiver.c @@ -55,11 +55,6 @@ struct private_receiver_t { receiver_t public; /** - * Threads job receiving packets - */ - callback_job_t *job; - - /** * current secret to use for cookie calculation */ char secret[SECRET_LENGTH]; @@ -393,8 +388,6 @@ static job_requeue_t receive_packets(private_receiver_t *this) status = charon->socket->receive(charon->socket, &packet); if (status == NOT_SUPPORTED) { - /* the processor destroys this job */ - this->job = NULL; return JOB_REQUEUE_NONE; } else if (status != SUCCESS) @@ -504,10 +497,6 @@ static job_requeue_t receive_packets(private_receiver_t *this) METHOD(receiver_t, destroy, void, private_receiver_t *this) { - if (this->job) - { - this->job->cancel(this->job); - } this->rng->destroy(this->rng); this->hasher->destroy(this->hasher); free(this); @@ -568,9 +557,9 @@ receiver_t *receiver_create() this->rng->get_bytes(this->rng, SECRET_LENGTH, this->secret); memcpy(this->secret_old, this->secret, SECRET_LENGTH); - this->job = callback_job_create_with_prio((callback_job_cb_t)receive_packets, - this, NULL, NULL, JOB_PRIO_CRITICAL); - lib->processor->queue_job(lib->processor, (job_t*)this->job); + lib->processor->queue_job(lib->processor, + (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive_packets, + this, NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL)); return &this->public; } diff --git a/src/libcharon/network/sender.c b/src/libcharon/network/sender.c index dc2641f7f..75635d2e3 100644 --- a/src/libcharon/network/sender.c +++ b/src/libcharon/network/sender.c @@ -39,11 +39,6 @@ struct private_sender_t { sender_t public; /** - * Sender threads job. - */ - callback_job_t *job; - - /** * The packets are stored in a linked list */ linked_list_t *list; @@ -164,7 +159,6 @@ METHOD(sender_t, flush, void, METHOD(sender_t, destroy, void, private_sender_t *this) { - this->job->cancel(this->job); this->list->destroy_offset(this->list, offsetof(packet_t, destroy)); this->got->destroy(this->got); this->sent->destroy(this->sent); @@ -189,8 +183,6 @@ sender_t * sender_create() .mutex = mutex_create(MUTEX_TYPE_DEFAULT), .got = condvar_create(CONDVAR_TYPE_DEFAULT), .sent = condvar_create(CONDVAR_TYPE_DEFAULT), - .job = callback_job_create_with_prio((callback_job_cb_t)send_packets, - this, NULL, NULL, JOB_PRIO_CRITICAL), .send_delay = lib->settings->get_int(lib->settings, "%s.send_delay", 0, charon->name), .send_delay_type = lib->settings->get_int(lib->settings, @@ -201,7 +193,9 @@ sender_t * sender_create() "%s.send_delay_response", TRUE, charon->name), ); - lib->processor->queue_job(lib->processor, (job_t*)this->job); + lib->processor->queue_job(lib->processor, + (job_t*)callback_job_create_with_prio((callback_job_cb_t)send_packets, + this, NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL)); return &this->public; } diff --git a/src/libcharon/plugins/android/android_service.c b/src/libcharon/plugins/android/android_service.c index f4e7e5097..6ca7407ca 100644 --- a/src/libcharon/plugins/android/android_service.c +++ b/src/libcharon/plugins/android/android_service.c @@ -42,11 +42,6 @@ struct private_android_service_t { ike_sa_t *ike_sa; /** - * job that handles requests from the Android control socket - */ - callback_job_t *job; - - /** * android credentials */ android_creds_t *creds; @@ -384,9 +379,9 @@ android_service_t *android_service_create(android_creds_t *creds) } charon->bus->add_listener(charon->bus, &this->public.listener); - this->job = callback_job_create((callback_job_cb_t)initiate, this, - NULL, NULL); - lib->processor->queue_job(lib->processor, (job_t*)this->job); + lib->processor->queue_job(lib->processor, + (job_t*)callback_job_create((callback_job_cb_t)initiate, this, + NULL, NULL)); return &this->public; } diff --git a/src/libcharon/plugins/dhcp/dhcp_socket.c b/src/libcharon/plugins/dhcp/dhcp_socket.c index 091d91c2d..b4e9af788 100644 --- a/src/libcharon/plugins/dhcp/dhcp_socket.c +++ b/src/libcharon/plugins/dhcp/dhcp_socket.c @@ -105,11 +105,6 @@ struct private_dhcp_socket_t { * DHCP server address, or broadcast */ host_t *dst; - - /** - * Callback job receiving DHCP responses - */ - callback_job_t *job; }; /** @@ -613,10 +608,6 @@ static job_requeue_t receive_dhcp(private_dhcp_socket_t *this) METHOD(dhcp_socket_t, destroy, void, private_dhcp_socket_t *this) { - if (this->job) - { - this->job->cancel(this->job); - } while (this->waiting) { this->condvar->signal(this->condvar); @@ -761,9 +752,9 @@ dhcp_socket_t *dhcp_socket_create() return NULL; } - this->job = callback_job_create_with_prio((callback_job_cb_t)receive_dhcp, - this, NULL, NULL, JOB_PRIO_CRITICAL); - lib->processor->queue_job(lib->processor, (job_t*)this->job); + lib->processor->queue_job(lib->processor, + (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive_dhcp, + this, NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL)); return &this->public; } diff --git a/src/libcharon/plugins/duplicheck/duplicheck_notify.c b/src/libcharon/plugins/duplicheck/duplicheck_notify.c index b86f1ef3d..3da640efe 100644 --- a/src/libcharon/plugins/duplicheck/duplicheck_notify.c +++ b/src/libcharon/plugins/duplicheck/duplicheck_notify.c @@ -43,11 +43,6 @@ struct private_duplicheck_notify_t { duplicheck_notify_t public; /** - * Callback job dispatching connections - */ - callback_job_t *job; - - /** * Mutex to lock list */ mutex_t *mutex; @@ -167,10 +162,6 @@ METHOD(duplicheck_notify_t, destroy, void, enumerator_t *enumerator; uintptr_t fd; - if (this->job) - { - this->job->cancel(this->job); - } enumerator = this->connected->create_enumerator(this->connected); while (enumerator->enumerate(enumerator, &fd)) { @@ -203,9 +194,9 @@ duplicheck_notify_t *duplicheck_notify_create() destroy(this); return NULL; } - this->job = callback_job_create_with_prio((callback_job_cb_t)receive, - this, NULL, NULL, JOB_PRIO_CRITICAL); - lib->processor->queue_job(lib->processor, (job_t*)this->job); + lib->processor->queue_job(lib->processor, + (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive, this, + NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL)); return &this->public; } diff --git a/src/libcharon/plugins/eap_radius/eap_radius_dae.c b/src/libcharon/plugins/eap_radius/eap_radius_dae.c index 967c731bf..80da99a0f 100644 --- a/src/libcharon/plugins/eap_radius/eap_radius_dae.c +++ b/src/libcharon/plugins/eap_radius/eap_radius_dae.c @@ -53,11 +53,6 @@ struct private_eap_radius_dae_t { int fd; /** - * Listen job - */ - callback_job_t *job; - - /** * RADIUS shared secret for DAE exchanges */ chunk_t secret; @@ -481,10 +476,6 @@ static bool open_socket(private_eap_radius_dae_t *this) METHOD(eap_radius_dae_t, destroy, void, private_eap_radius_dae_t *this) { - if (this->job) - { - this->job->cancel(this->job); - } if (this->fd != -1) { close(this->fd); @@ -538,9 +529,9 @@ eap_radius_dae_t *eap_radius_dae_create(eap_radius_accounting_t *accounting) return NULL; } - this->job = callback_job_create_with_prio((callback_job_cb_t)receive, - this, NULL, NULL, JOB_PRIO_CRITICAL); - lib->processor->queue_job(lib->processor, (job_t*)this->job); + lib->processor->queue_job(lib->processor, + (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive, + this, NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL)); return &this->public; } diff --git a/src/libcharon/plugins/farp/farp_spoofer.c b/src/libcharon/plugins/farp/farp_spoofer.c index 587a3a74e..52b037c19 100644 --- a/src/libcharon/plugins/farp/farp_spoofer.c +++ b/src/libcharon/plugins/farp/farp_spoofer.c @@ -45,11 +45,6 @@ struct private_farp_spoofer_t { farp_listener_t *listener; /** - * Callback job to read ARP requests - */ - callback_job_t *job; - - /** * RAW socket for ARP requests */ int skt; @@ -135,7 +130,6 @@ static job_requeue_t receive_arp(private_farp_spoofer_t *this) METHOD(farp_spoofer_t, destroy, void, private_farp_spoofer_t *this) { - this->job->cancel(this->job); close(this->skt); free(this); } @@ -189,9 +183,9 @@ farp_spoofer_t *farp_spoofer_create(farp_listener_t *listener) return NULL; } - this->job = callback_job_create_with_prio((callback_job_cb_t)receive_arp, - this, NULL, NULL, JOB_PRIO_CRITICAL); - lib->processor->queue_job(lib->processor, (job_t*)this->job); + lib->processor->queue_job(lib->processor, + (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive_arp, + this, NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL)); return &this->public; } diff --git a/src/libcharon/plugins/ha/ha_ctl.c b/src/libcharon/plugins/ha/ha_ctl.c index 9c99807ed..32f7d04d9 100644 --- a/src/libcharon/plugins/ha/ha_ctl.c +++ b/src/libcharon/plugins/ha/ha_ctl.c @@ -48,11 +48,6 @@ struct private_ha_ctl_t { * Resynchronization message cache */ ha_cache_t *cache; - - /** - * FIFO reader thread - */ - callback_job_t *job; }; /** @@ -105,7 +100,6 @@ static job_requeue_t dispatch_fifo(private_ha_ctl_t *this) METHOD(ha_ctl_t, destroy, void, private_ha_ctl_t *this) { - this->job->cancel(this->job); free(this); } @@ -141,9 +135,9 @@ ha_ctl_t *ha_ctl_create(ha_segments_t *segments, ha_cache_t *cache) strerror(errno)); } - this->job = callback_job_create_with_prio((callback_job_cb_t)dispatch_fifo, - this, NULL, NULL, JOB_PRIO_CRITICAL); - lib->processor->queue_job(lib->processor, (job_t*)this->job); + lib->processor->queue_job(lib->processor, + (job_t*)callback_job_create_with_prio((callback_job_cb_t)dispatch_fifo, + this, NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL)); return &this->public; } diff --git a/src/libcharon/plugins/ha/ha_dispatcher.c b/src/libcharon/plugins/ha/ha_dispatcher.c index de5253b37..98055fa7e 100644 --- a/src/libcharon/plugins/ha/ha_dispatcher.c +++ b/src/libcharon/plugins/ha/ha_dispatcher.c @@ -58,11 +58,6 @@ struct private_ha_dispatcher_t { * HA enabled pool */ ha_attribute_t *attr; - - /** - * Dispatcher job - */ - callback_job_t *job; }; /** @@ -1032,7 +1027,6 @@ static job_requeue_t dispatch(private_ha_dispatcher_t *this) METHOD(ha_dispatcher_t, destroy, void, private_ha_dispatcher_t *this) { - this->job->cancel(this->job); free(this); } @@ -1056,9 +1050,9 @@ ha_dispatcher_t *ha_dispatcher_create(ha_socket_t *socket, .kernel = kernel, .attr = attr, ); - this->job = callback_job_create_with_prio((callback_job_cb_t)dispatch, - this, NULL, NULL, JOB_PRIO_CRITICAL); - lib->processor->queue_job(lib->processor, (job_t*)this->job); + lib->processor->queue_job(lib->processor, + (job_t*)callback_job_create_with_prio((callback_job_cb_t)dispatch, this, + NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL)); return &this->public; } diff --git a/src/libcharon/plugins/ha/ha_segments.c b/src/libcharon/plugins/ha/ha_segments.c index f947970b5..01792624c 100644 --- a/src/libcharon/plugins/ha/ha_segments.c +++ b/src/libcharon/plugins/ha/ha_segments.c @@ -62,11 +62,6 @@ struct private_ha_segments_t { condvar_t *condvar; /** - * Job checking for heartbeats - */ - callback_job_t *job; - - /** * Total number of ClusterIP segments */ u_int count; @@ -82,6 +77,11 @@ struct private_ha_segments_t { u_int node; /** + * Are we checking for heartbeats? + */ + bool heartbeat_active; + + /** * Interval we send hearbeats */ int heartbeat_delay; @@ -237,7 +237,7 @@ METHOD(listener_t, alert_hook, bool, { if (alert == ALERT_SHUTDOWN_SIGNAL) { - if (this->job) + if (this->heartbeat_active) { DBG1(DBG_CFG, "HA heartbeat active, dropping all segments"); deactivate(this, 0, TRUE); @@ -269,7 +269,7 @@ static job_requeue_t watchdog(private_ha_segments_t *this) DBG1(DBG_CFG, "no heartbeat received, taking all segments"); activate(this, 0, TRUE); /* disable heartbeat detection util we get one */ - this->job = NULL; + this->heartbeat_active = FALSE; return JOB_REQUEUE_NONE; } return JOB_REQUEUE_DIRECT; @@ -280,9 +280,10 @@ static job_requeue_t watchdog(private_ha_segments_t *this) */ static void start_watchdog(private_ha_segments_t *this) { - this->job = callback_job_create_with_prio((callback_job_cb_t)watchdog, - this, NULL, NULL, JOB_PRIO_CRITICAL); - lib->processor->queue_job(lib->processor, (job_t*)this->job); + this->heartbeat_active = TRUE; + lib->processor->queue_job(lib->processor, + (job_t*)callback_job_create_with_prio((callback_job_cb_t)watchdog, this, + NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL)); } METHOD(ha_segments_t, handle_status, void, @@ -312,10 +313,10 @@ METHOD(ha_segments_t, handle_status, void, } } - this->mutex->unlock(this->mutex); this->condvar->signal(this->condvar); + this->mutex->unlock(this->mutex); - if (!this->job) + if (!this->heartbeat_active) { DBG1(DBG_CFG, "received heartbeat, reenabling watchdog"); start_watchdog(this); @@ -361,10 +362,6 @@ METHOD(ha_segments_t, is_active, bool, METHOD(ha_segments_t, destroy, void, private_ha_segments_t *this) { - if (this->job) - { - this->job->cancel(this->job); - } this->mutex->destroy(this->mutex); this->condvar->destroy(this->condvar); free(this); diff --git a/src/libcharon/plugins/maemo/maemo_service.c b/src/libcharon/plugins/maemo/maemo_service.c index 0e834918e..b5f50f1c6 100644 --- a/src/libcharon/plugins/maemo/maemo_service.c +++ b/src/libcharon/plugins/maemo/maemo_service.c @@ -431,8 +431,10 @@ static job_requeue_t run(private_maemo_service_t *this) return JOB_REQUEUE_NONE; } -METHOD(maemo_service_t, destroy, void, - private_maemo_service_t *this) +/** + * Cancel the GLib Main Event Loop + */ +static bool cancel(private_maemo_service_t *this) { if (this->loop) { @@ -442,6 +444,12 @@ METHOD(maemo_service_t, destroy, void, } g_main_loop_unref(this->loop); } + return TRUE; +} + +METHOD(maemo_service_t, destroy, void, + private_maemo_service_t *this) +{ if (this->context) { osso_rpc_unset_cb_f(this->context, @@ -510,8 +518,8 @@ maemo_service_t *maemo_service_create() } lib->processor->queue_job(lib->processor, - (job_t*)callback_job_create_with_prio((callback_job_cb_t)run, - this, NULL, NULL, JOB_PRIO_CRITICAL)); + (job_t*)callback_job_create_with_prio((callback_job_cb_t)run, this, + NULL, (callback_job_cancel_t)cancel, JOB_PRIO_CRITICAL)); return &this->public; } diff --git a/src/libcharon/plugins/smp/smp.c b/src/libcharon/plugins/smp/smp.c index 7fdbdbae4..870f0a022 100644 --- a/src/libcharon/plugins/smp/smp.c +++ b/src/libcharon/plugins/smp/smp.c @@ -49,11 +49,6 @@ struct private_smp_t { * XML unix socket fd */ int socket; - - /** - * job accepting stroke messages - */ - callback_job_t *job; }; ENUM(ike_sa_state_lower_names, IKE_CREATED, IKE_DELETING, @@ -704,7 +699,8 @@ static job_requeue_t dispatch(private_smp_t *this) fdp = malloc_thing(int); *fdp = fd; - job = callback_job_create((callback_job_cb_t)process, fdp, free, this->job); + job = callback_job_create((callback_job_cb_t)process, fdp, free, + (callback_job_cancel_t)return_false); lib->processor->queue_job(lib->processor, (job_t*)job); return JOB_REQUEUE_DIRECT; @@ -719,7 +715,6 @@ METHOD(plugin_t, get_name, char*, METHOD(plugin_t, destroy, void, private_smp_t *this) { - this->job->cancel(this->job); close(this->socket); free(this); } @@ -775,9 +770,9 @@ plugin_t *smp_plugin_create() return NULL; } - this->job = callback_job_create_with_prio((callback_job_cb_t)dispatch, - this, NULL, NULL, JOB_PRIO_CRITICAL); - lib->processor->queue_job(lib->processor, (job_t*)this->job); + lib->processor->queue_job(lib->processor, + (job_t*)callback_job_create_with_prio((callback_job_cb_t)dispatch, this, + NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL)); return &this->public.plugin; } diff --git a/src/libcharon/plugins/stroke/stroke_socket.c b/src/libcharon/plugins/stroke/stroke_socket.c index daf244e74..e2865a640 100644 --- a/src/libcharon/plugins/stroke/stroke_socket.c +++ b/src/libcharon/plugins/stroke/stroke_socket.c @@ -64,16 +64,6 @@ struct private_stroke_socket_t { int socket; /** - * job accepting stroke messages - */ - callback_job_t *receiver; - - /** - * job handling stroke messages - */ - callback_job_t *handler; - - /** * queued stroke commands */ linked_list_t *commands; @@ -702,7 +692,7 @@ static job_requeue_t handle(private_stroke_socket_t *this) this->handling++; thread_cleanup_pop(TRUE); job = callback_job_create_with_prio((callback_job_cb_t)process, ctx, - (void*)stroke_job_context_destroy, this->handler, JOB_PRIO_HIGH); + (void*)stroke_job_context_destroy, NULL, JOB_PRIO_HIGH); lib->processor->queue_job(lib->processor, (job_t*)job); return JOB_REQUEUE_DIRECT; } @@ -787,8 +777,6 @@ static bool open_socket(private_stroke_socket_t *this) METHOD(stroke_socket_t, destroy, void, private_stroke_socket_t *this) { - this->handler->cancel(this->handler); - this->receiver->cancel(this->receiver); this->commands->destroy_function(this->commands, (void*)stroke_job_context_destroy); this->condvar->destroy(this->condvar); this->mutex->destroy(this->mutex); @@ -843,13 +831,13 @@ stroke_socket_t *stroke_socket_create() charon->backends->add_backend(charon->backends, &this->config->backend); hydra->attributes->add_provider(hydra->attributes, &this->attribute->provider); - this->receiver = callback_job_create_with_prio((callback_job_cb_t)receive, - this, NULL, NULL, JOB_PRIO_CRITICAL); - lib->processor->queue_job(lib->processor, (job_t*)this->receiver); + lib->processor->queue_job(lib->processor, + (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive, this, + NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL)); - this->handler = callback_job_create_with_prio((callback_job_cb_t)handle, - this, NULL, NULL, JOB_PRIO_CRITICAL); - lib->processor->queue_job(lib->processor, (job_t*)this->handler); + lib->processor->queue_job(lib->processor, + (job_t*)callback_job_create_with_prio((callback_job_cb_t)handle, this, + NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL)); return &this->public; } diff --git a/src/libcharon/plugins/tnc_pdp/tnc_pdp.c b/src/libcharon/plugins/tnc_pdp/tnc_pdp.c index c5509a245..7e2e667f9 100644 --- a/src/libcharon/plugins/tnc_pdp/tnc_pdp.c +++ b/src/libcharon/plugins/tnc_pdp/tnc_pdp.c @@ -67,11 +67,6 @@ struct private_tnc_pdp_t { int ipv6; /** - * Callback job dispatching commands - */ - callback_job_t *job; - - /** * RADIUS shared secret */ chunk_t secret; @@ -546,10 +541,6 @@ static job_requeue_t receive(private_tnc_pdp_t *this) METHOD(tnc_pdp_t, destroy, void, private_tnc_pdp_t *this) { - if (this->job) - { - this->job->cancel(this->job); - } if (this->ipv4) { close(this->ipv4); @@ -639,9 +630,9 @@ tnc_pdp_t *tnc_pdp_create(u_int16_t port) } DBG1(DBG_IKE, "eap method %N selected", eap_type_names, this->type); - this->job = callback_job_create_with_prio((callback_job_cb_t)receive, - this, NULL, NULL, JOB_PRIO_CRITICAL); - lib->processor->queue_job(lib->processor, (job_t*)this->job); + lib->processor->queue_job(lib->processor, + (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive, this, + NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL)); return &this->public; } diff --git a/src/libcharon/plugins/uci/uci_control.c b/src/libcharon/plugins/uci/uci_control.c index 87d0f8603..53221b786 100644 --- a/src/libcharon/plugins/uci/uci_control.c +++ b/src/libcharon/plugins/uci/uci_control.c @@ -42,11 +42,6 @@ struct private_uci_control_t { * Public part */ uci_control_t public; - - /** - * Job - */ - callback_job_t *job; }; /** @@ -269,7 +264,6 @@ static job_requeue_t receive(private_uci_control_t *this) METHOD(uci_control_t, destroy, void, private_uci_control_t *this) { - this->job->cancel(this->job); unlink(FIFO_FILE); free(this); } @@ -295,9 +289,10 @@ uci_control_t *uci_control_create() } else { - this->job = callback_job_create_with_prio((callback_job_cb_t)receive, - this, NULL, NULL, JOB_PRIO_CRITICAL); - lib->processor->queue_job(lib->processor, (job_t*)this->job); + lib->processor->queue_job(lib->processor, + (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive, + this, NULL, (callback_job_cancel_t)return_false, + JOB_PRIO_CRITICAL)); } return &this->public; } diff --git a/src/libcharon/plugins/whitelist/whitelist_control.c b/src/libcharon/plugins/whitelist/whitelist_control.c index 202c9a418..0c20bd1aa 100644 --- a/src/libcharon/plugins/whitelist/whitelist_control.c +++ b/src/libcharon/plugins/whitelist/whitelist_control.c @@ -49,11 +49,6 @@ struct private_whitelist_control_t { * Whitelist unix socket file descriptor */ int socket; - - /** - * Callback job dispatching commands - */ - callback_job_t *job; }; /** @@ -200,7 +195,6 @@ static job_requeue_t receive(private_whitelist_control_t *this) METHOD(whitelist_control_t, destroy, void, private_whitelist_control_t *this) { - this->job->cancel(this->job); close(this->socket); free(this); } @@ -225,9 +219,9 @@ whitelist_control_t *whitelist_control_create(whitelist_listener_t *listener) return NULL; } - this->job = callback_job_create_with_prio((callback_job_cb_t)receive, - this, NULL, NULL, JOB_PRIO_CRITICAL); - lib->processor->queue_job(lib->processor, (job_t*)this->job); + lib->processor->queue_job(lib->processor, + (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive, this, + NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL)); return &this->public; } diff --git a/src/libcharon/sa/ikev2/connect_manager.c b/src/libcharon/sa/ikev2/connect_manager.c index a8366e953..75bb8f7ea 100644 --- a/src/libcharon/sa/ikev2/connect_manager.c +++ b/src/libcharon/sa/ikev2/connect_manager.c @@ -919,8 +919,10 @@ static void update_checklist_state(private_connect_manager_t *this, &checklist->connect_id); callback_data_t *data = callback_data_create(this, checklist->connect_id); - job_t *job = (job_t*)callback_job_create((callback_job_cb_t)initiator_finish, data, (callback_job_cleanup_t)callback_data_destroy, NULL); - lib->scheduler->schedule_job_ms(lib->scheduler, job, ME_WAIT_TO_FINISH); + lib->scheduler->schedule_job_ms(lib->scheduler, + (job_t*)callback_job_create((callback_job_cb_t)initiator_finish, + data, (callback_job_cleanup_t)callback_data_destroy, NULL), + ME_WAIT_TO_FINISH); checklist->is_finishing = TRUE; } @@ -1007,8 +1009,12 @@ retransmit_end: */ static void queue_retransmission(private_connect_manager_t *this, check_list_t *checklist, endpoint_pair_t *pair) { - callback_data_t *data = retransmit_data_create(this, checklist->connect_id, pair->id); - job_t *job = (job_t*)callback_job_create((callback_job_cb_t)retransmit, data, (callback_job_cleanup_t)callback_data_destroy, NULL); + callback_data_t *data; + job_t *job; + + data = retransmit_data_create(this, checklist->connect_id, pair->id); + job = (job_t*)callback_job_create((callback_job_cb_t)retransmit, data, + (callback_job_cleanup_t)callback_data_destroy, NULL); u_int32_t retransmission = pair->retransmitted + 1; u_int32_t rto = ME_INTERVAL; @@ -1155,10 +1161,12 @@ static job_requeue_t sender(callback_data_t *data) /** * Schedules checks for a checklist (time in ms) */ -static void schedule_checks(private_connect_manager_t *this, check_list_t *checklist, u_int32_t time) +static void schedule_checks(private_connect_manager_t *this, + check_list_t *checklist, u_int32_t time) { callback_data_t *data = callback_data_create(this, checklist->connect_id); - checklist->sender = (job_t*)callback_job_create((callback_job_cb_t)sender, data, (callback_job_cleanup_t)callback_data_destroy, NULL); + checklist->sender = (job_t*)callback_job_create((callback_job_cb_t)sender, + data, (callback_job_cleanup_t)callback_data_destroy, NULL); lib->scheduler->schedule_job_ms(lib->scheduler, checklist->sender, time); } @@ -1210,12 +1218,15 @@ static void finish_checks(private_connect_manager_t *this, check_list_t *checkli if (get_initiated_by_ids(this, checklist->initiator.id, checklist->responder.id, &initiated) == SUCCESS) { + callback_job_t *job; + remove_checklist(this, checklist); remove_initiated(this, initiated); initiate_data_t *data = initiate_data_create(checklist, initiated); - job_t *job = (job_t*)callback_job_create((callback_job_cb_t)initiate_mediated, data, (callback_job_cleanup_t)initiate_data_destroy, NULL); - lib->processor->queue_job(lib->processor, job); + job = callback_job_create((callback_job_cb_t)initiate_mediated, + data, (callback_job_cleanup_t)initiate_data_destroy, NULL); + lib->processor->queue_job(lib->processor, (job_t*)job); return; } else diff --git a/src/libhydra/plugins/kernel_klips/kernel_klips_ipsec.c b/src/libhydra/plugins/kernel_klips/kernel_klips_ipsec.c index ceff8cdc9..b8d44d686 100644 --- a/src/libhydra/plugins/kernel_klips/kernel_klips_ipsec.c +++ b/src/libhydra/plugins/kernel_klips/kernel_klips_ipsec.c @@ -138,11 +138,6 @@ struct private_kernel_klips_ipsec_t linked_list_t *ipsec_devices; /** - * job receiving PF_KEY events - */ - callback_job_t *job; - - /** * mutex to lock access to the PF_KEY socket */ mutex_t *mutex_pfkey; @@ -2552,10 +2547,6 @@ METHOD(kernel_ipsec_t, bypass_socket, bool, METHOD(kernel_ipsec_t, destroy, void, private_kernel_klips_ipsec_t *this) { - if (this->job) - { - this->job->cancel(this->job); - } if (this->socket > 0) { close(this->socket); @@ -2639,9 +2630,9 @@ kernel_klips_ipsec_t *kernel_klips_ipsec_create() return NULL; } - this->job = callback_job_create_with_prio((callback_job_cb_t)receive_events, - this, NULL, NULL, JOB_PRIO_CRITICAL); - lib->processor->queue_job(lib->processor, (job_t*)this->job); + lib->processor->queue_job(lib->processor, + (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive_events, + this, NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL)); return &this->public; } diff --git a/src/libhydra/plugins/kernel_netlink/kernel_netlink_ipsec.c b/src/libhydra/plugins/kernel_netlink/kernel_netlink_ipsec.c index 63f8894a3..b46450c38 100644 --- a/src/libhydra/plugins/kernel_netlink/kernel_netlink_ipsec.c +++ b/src/libhydra/plugins/kernel_netlink/kernel_netlink_ipsec.c @@ -269,11 +269,6 @@ struct private_kernel_netlink_ipsec_t { hashtable_t *sas; /** - * Job receiving netlink events - */ - callback_job_t *job; - - /** * Netlink xfrm socket (IPsec) */ netlink_socket_t *socket_xfrm; @@ -2618,10 +2613,6 @@ METHOD(kernel_ipsec_t, destroy, void, enumerator_t *enumerator; policy_entry_t *policy; - if (this->job) - { - this->job->cancel(this->job); - } if (this->socket_xfrm_events > 0) { close(this->socket_xfrm_events); @@ -2730,9 +2721,10 @@ kernel_netlink_ipsec_t *kernel_netlink_ipsec_create() destroy(this); return NULL; } - this->job = callback_job_create_with_prio((callback_job_cb_t)receive_events, - this, NULL, NULL, JOB_PRIO_CRITICAL); - lib->processor->queue_job(lib->processor, (job_t*)this->job); + lib->processor->queue_job(lib->processor, + (job_t*)callback_job_create_with_prio( + (callback_job_cb_t)receive_events, this, NULL, + (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL)); } return &this->public; diff --git a/src/libhydra/plugins/kernel_netlink/kernel_netlink_net.c b/src/libhydra/plugins/kernel_netlink/kernel_netlink_net.c index 2f2167adf..fc59c8abe 100644 --- a/src/libhydra/plugins/kernel_netlink/kernel_netlink_net.c +++ b/src/libhydra/plugins/kernel_netlink/kernel_netlink_net.c @@ -254,11 +254,6 @@ struct private_kernel_netlink_net_t { linked_list_t *ifaces; /** - * job receiving netlink events - */ - callback_job_t *job; - - /** * netlink rt socket (routing) */ netlink_socket_t *socket; @@ -1794,10 +1789,6 @@ METHOD(kernel_net_t, destroy, void, manage_rule(this, RTM_DELRULE, AF_INET6, this->routing_table, this->routing_table_prio); } - if (this->job) - { - this->job->cancel(this->job); - } if (this->socket_events > 0) { close(this->socket_events); @@ -1923,9 +1914,10 @@ kernel_netlink_net_t *kernel_netlink_net_create() return NULL; } - this->job = callback_job_create_with_prio((callback_job_cb_t)receive_events, - this, NULL, NULL, JOB_PRIO_CRITICAL); - lib->processor->queue_job(lib->processor, (job_t*)this->job); + lib->processor->queue_job(lib->processor, + (job_t*)callback_job_create_with_prio( + (callback_job_cb_t)receive_events, this, NULL, + (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL)); } if (init_address_list(this) != SUCCESS) diff --git a/src/libhydra/plugins/kernel_pfkey/kernel_pfkey_ipsec.c b/src/libhydra/plugins/kernel_pfkey/kernel_pfkey_ipsec.c index c39ebbd9a..dfe10f93f 100644 --- a/src/libhydra/plugins/kernel_pfkey/kernel_pfkey_ipsec.c +++ b/src/libhydra/plugins/kernel_pfkey/kernel_pfkey_ipsec.c @@ -173,11 +173,6 @@ struct private_kernel_pfkey_ipsec_t bool install_routes; /** - * job receiving PF_KEY events - */ - callback_job_t *job; - - /** * mutex to lock access to the PF_KEY socket */ mutex_t *mutex_pfkey; @@ -2496,10 +2491,6 @@ METHOD(kernel_ipsec_t, bypass_socket, bool, METHOD(kernel_ipsec_t, destroy, void, private_kernel_pfkey_ipsec_t *this) { - if (this->job) - { - this->job->cancel(this->job); - } if (this->socket > 0) { close(this->socket); @@ -2592,9 +2583,10 @@ kernel_pfkey_ipsec_t *kernel_pfkey_ipsec_create() return NULL; } - this->job = callback_job_create_with_prio((callback_job_cb_t)receive_events, - this, NULL, NULL, JOB_PRIO_CRITICAL); - lib->processor->queue_job(lib->processor, (job_t*)this->job); + lib->processor->queue_job(lib->processor, + (job_t*)callback_job_create_with_prio( + (callback_job_cb_t)receive_events, this, NULL, + (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL)); } return &this->public; diff --git a/src/libhydra/plugins/kernel_pfroute/kernel_pfroute_net.c b/src/libhydra/plugins/kernel_pfroute/kernel_pfroute_net.c index bd6941754..918fdd3c7 100644 --- a/src/libhydra/plugins/kernel_pfroute/kernel_pfroute_net.c +++ b/src/libhydra/plugins/kernel_pfroute/kernel_pfroute_net.c @@ -120,11 +120,6 @@ struct private_kernel_pfroute_net_t linked_list_t *ifaces; /** - * job receiving PF_ROUTE events - */ - callback_job_t *job; - - /** * mutex to lock access to the PF_ROUTE socket */ mutex_t *mutex_pfroute; @@ -648,10 +643,6 @@ static status_t init_address_list(private_kernel_pfroute_net_t *this) METHOD(kernel_net_t, destroy, void, private_kernel_pfroute_net_t *this) { - if (this->job) - { - this->job->cancel(this->job); - } if (this->socket > 0) { close(this->socket); @@ -718,9 +709,10 @@ kernel_pfroute_net_t *kernel_pfroute_net_create() return NULL; } - this->job = callback_job_create_with_prio((callback_job_cb_t)receive_events, - this, NULL, NULL, JOB_PRIO_CRITICAL); - lib->processor->queue_job(lib->processor, (job_t*)this->job); + lib->processor->queue_job(lib->processor, + (job_t*)callback_job_create_with_prio( + (callback_job_cb_t)receive_events, this, NULL, + (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL)); } if (init_address_list(this) != SUCCESS) diff --git a/src/libstrongswan/plugins/pkcs11/pkcs11_manager.c b/src/libstrongswan/plugins/pkcs11/pkcs11_manager.c index 5b321b26e..83c383671 100644 --- a/src/libstrongswan/plugins/pkcs11/pkcs11_manager.c +++ b/src/libstrongswan/plugins/pkcs11/pkcs11_manager.c @@ -61,8 +61,6 @@ typedef struct { char *path; /* loaded library */ pkcs11_library_t *lib; - /* event dispatcher job */ - callback_job_t *job; } lib_entry_t; /** @@ -70,10 +68,6 @@ typedef struct { */ static void lib_entry_destroy(lib_entry_t *entry) { - if (entry->job) - { - entry->job->cancel(entry->job); - } entry->lib->destroy(entry->lib); free(entry); } @@ -202,14 +196,6 @@ static job_requeue_t dispatch_slot_events(lib_entry_t *entry) } /** - * End dispatching, unset job - */ -static void end_dispatch(lib_entry_t *entry) -{ - entry->job = NULL; -} - -/** * Get the slot list of a library */ static CK_SLOT_ID_PTR get_slot_list(pkcs11_library_t *p11, CK_ULONG *out) @@ -384,9 +370,9 @@ pkcs11_manager_t *pkcs11_manager_create(pkcs11_manager_token_event_t cb, while (enumerator->enumerate(enumerator, &entry)) { query_slots(entry); - entry->job = callback_job_create_with_prio((void*)dispatch_slot_events, - entry, (void*)end_dispatch, NULL, JOB_PRIO_CRITICAL); - lib->processor->queue_job(lib->processor, (job_t*)entry->job); + lib->processor->queue_job(lib->processor, + (job_t*)callback_job_create_with_prio((void*)dispatch_slot_events, + entry, NULL, (void*)return_false, JOB_PRIO_CRITICAL)); } enumerator->destroy(enumerator); diff --git a/src/libstrongswan/processing/jobs/callback_job.c b/src/libstrongswan/processing/jobs/callback_job.c index 86d5228bf..a5ddc8ff6 100644 --- a/src/libstrongswan/processing/jobs/callback_job.c +++ b/src/libstrongswan/processing/jobs/callback_job.c @@ -51,42 +51,9 @@ struct private_callback_job_t { callback_job_cleanup_t cleanup; /** - * thread of the job, if running + * cancel function */ - thread_t *thread; - - /** - * mutex to access private job data - */ - mutex_t *mutex; - - /** - * list of associated child jobs - */ - linked_list_t *children; - - /** - * parent of this job, or NULL - */ - private_callback_job_t *parent; - - /** - * TRUE if the job got canceled - */ - bool canceled; - - /** - * condvar to synchronize the cancellation/destruction of the job - */ - condvar_t *destroyable; - - /** - * semaphore to synchronize the termination of the assigned thread. - * - * separately created during cancellation, so that we can wait on it - * without risking that it gets destroyed too early during destruction. - */ - semaphore_t *terminated; + callback_job_cancel_t cancel; /** * Priority of this job @@ -94,131 +61,26 @@ struct private_callback_job_t { job_priority_t prio; }; -/** - * unregister a child from its parent, if any. - * note: this->mutex has to be locked - */ -static void unregister(private_callback_job_t *this) -{ - if (this->parent) - { - this->parent->mutex->lock(this->parent->mutex); - if (this->parent->canceled && !this->canceled) - { - /* if the parent has been canceled but we have not yet, we do not - * unregister until we got canceled by the parent. */ - this->parent->mutex->unlock(this->parent->mutex); - this->destroyable->wait(this->destroyable, this->mutex); - this->parent->mutex->lock(this->parent->mutex); - } - this->parent->children->remove(this->parent->children, this, NULL); - this->parent->mutex->unlock(this->parent->mutex); - this->parent = NULL; - } -} - METHOD(job_t, destroy, void, private_callback_job_t *this) { - this->mutex->lock(this->mutex); - unregister(this); if (this->cleanup) { this->cleanup(this->data); } - if (this->terminated) - { - this->terminated->post(this->terminated); - } - this->children->destroy(this->children); - this->destroyable->destroy(this->destroyable); - this->mutex->unlock(this->mutex); - this->mutex->destroy(this->mutex); free(this); } -METHOD(callback_job_t, cancel, void, +METHOD(job_t, execute, job_requeue_t, private_callback_job_t *this) { - callback_job_t *child; - semaphore_t *terminated = NULL; - - this->mutex->lock(this->mutex); - this->canceled = TRUE; - /* terminate children */ - while (this->children->get_first(this->children, (void**)&child) == SUCCESS) - { - this->mutex->unlock(this->mutex); - child->cancel(child); - this->mutex->lock(this->mutex); - } - if (this->thread) - { - /* terminate the thread, if there is currently one executing the job. - * we wait for its termination using a semaphore */ - this->thread->cancel(this->thread); - terminated = this->terminated = semaphore_create(0); - } - else - { - /* if the job is currently queued, it gets terminated later. - * we can't wait, because it might not get executed at all. - * we also unregister the queued job manually from its parent (the - * others get unregistered during destruction) */ - unregister(this); - } - this->destroyable->signal(this->destroyable); - this->mutex->unlock(this->mutex); - - if (terminated) - { - terminated->wait(terminated); - terminated->destroy(terminated); - } + return this->callback(this->data); } -METHOD(job_t, execute, job_requeue_t, +METHOD(job_t, cancel, bool, private_callback_job_t *this) { - bool requeue = FALSE; - - this->mutex->lock(this->mutex); - this->thread = thread_current(); - this->mutex->unlock(this->mutex); - - while (TRUE) - { - this->mutex->lock(this->mutex); - if (this->canceled) - { - this->mutex->unlock(this->mutex); - break; - } - this->mutex->unlock(this->mutex); - switch (this->callback(this->data)) - { - case JOB_REQUEUE_DIRECT: - continue; - case JOB_REQUEUE_FAIR: - { - requeue = TRUE; - break; - } - case JOB_REQUEUE_NONE: - default: - { - break; - } - } - break; - } - this->mutex->lock(this->mutex); - this->thread = NULL; - this->mutex->unlock(this->mutex); - /* manually create a cancellation point to avoid that a canceled thread - * goes back into the thread pool at all */ - thread_cancellation_point(); - return requeue ? JOB_REQUEUE_FAIR : JOB_REQUEUE_NONE; + return this->cancel(this->data); } METHOD(job_t, get_priority, job_priority_t, @@ -231,8 +93,8 @@ METHOD(job_t, get_priority, job_priority_t, * Described in header. */ callback_job_t *callback_job_create_with_prio(callback_job_cb_t cb, void *data, - callback_job_cleanup_t cleanup, callback_job_t *parent, - job_priority_t prio) + callback_job_cleanup_t cleanup, callback_job_cancel_t cancel, + job_priority_t prio) { private_callback_job_t *this; @@ -243,24 +105,17 @@ callback_job_t *callback_job_create_with_prio(callback_job_cb_t cb, void *data, .get_priority = _get_priority, .destroy = _destroy, }, - .cancel = _cancel, }, - .mutex = mutex_create(MUTEX_TYPE_DEFAULT), .callback = cb, .data = data, .cleanup = cleanup, - .children = linked_list_create(), - .parent = (private_callback_job_t*)parent, - .destroyable = condvar_create(CONDVAR_TYPE_DEFAULT), + .cancel = cancel, .prio = prio, ); - /* register us at parent */ - if (parent) + if (cancel) { - this->parent->mutex->lock(this->parent->mutex); - this->parent->children->insert_last(this->parent->children, this); - this->parent->mutex->unlock(this->parent->mutex); + this->public.job.cancel = _cancel; } return &this->public; @@ -271,8 +126,8 @@ callback_job_t *callback_job_create_with_prio(callback_job_cb_t cb, void *data, */ callback_job_t *callback_job_create(callback_job_cb_t cb, void *data, callback_job_cleanup_t cleanup, - callback_job_t *parent) + callback_job_cancel_t cancel) { - return callback_job_create_with_prio(cb, data, cleanup, parent, + return callback_job_create_with_prio(cb, data, cleanup, cancel, JOB_PRIO_MEDIUM); } diff --git a/src/libstrongswan/processing/jobs/callback_job.h b/src/libstrongswan/processing/jobs/callback_job.h index ebe5c9c2b..6f2e39eb8 100644 --- a/src/libstrongswan/processing/jobs/callback_job.h +++ b/src/libstrongswan/processing/jobs/callback_job.h @@ -1,4 +1,5 @@ /* + * Copyright (C) 2012 Tobias Brunner * Copyright (C) 2007-2011 Martin Willi * Copyright (C) 2011 revosec AG * Hochschule fuer Technik Rapperswil @@ -46,11 +47,22 @@ typedef job_requeue_t (*callback_job_cb_t)(void *data); * to supply to the constructor. * * @param data param supplied to job - * @return requeing policy how to requeue the job */ typedef void (*callback_job_cleanup_t)(void *data); /** + * Cancellation function to use for the callback job. + * + * Optional function to be called when a job has to be canceled. + * + * See job_t.cancel() for details on the return value. + * + * @param data param supplied to job + * @return TRUE if canceled, FALSE to explicitly cancel the thread + */ +typedef bool (*callback_job_cancel_t)(void *data); + +/** * Class representing an callback Job. * * This is a special job which allows a simple callback function to @@ -64,14 +76,6 @@ struct callback_job_t { */ job_t job; - /** - * Cancel the job's thread and wait for its termination. - * - * This only works reliably for jobs that always use JOB_REQUEUE_FAIR or - * JOB_REQUEUE_DIRECT, otherwise the job may already be destroyed when - * cancel is called. - */ - void (*cancel)(callback_job_t *this); }; /** @@ -79,19 +83,20 @@ struct callback_job_t { * * The cleanup function is called when the job gets destroyed to destroy * the associated data. - * If parent is not NULL, the specified job gets an association. Whenever - * the parent gets cancelled (or runs out), all of its children are cancelled, - * too. + * + * The cancel function is optional and should only be provided if the callback + * function calls potentially blocking functions and/or always returns + * JOB_REQUEUE_DIRECT. * * @param cb callback to call from the processor * @param data user data to supply to callback * @param cleanup destructor for data on destruction, or NULL - * @param parent parent of this job + * @param cancel function to cancel the job, or NULL * @return callback_job_t object */ callback_job_t *callback_job_create(callback_job_cb_t cb, void *data, callback_job_cleanup_t cleanup, - callback_job_t *parent); + callback_job_cancel_t cancel); /** * Creates a callback job, with priority. @@ -101,12 +106,12 @@ callback_job_t *callback_job_create(callback_job_cb_t cb, void *data, * @param cb callback to call from the processor * @param data user data to supply to callback * @param cleanup destructor for data on destruction, or NULL - * @param parent parent of this job + * @param cancel function to cancel the job, or NULL * @param prio job priority * @return callback_job_t object */ callback_job_t *callback_job_create_with_prio(callback_job_cb_t cb, void *data, - callback_job_cleanup_t cleanup, callback_job_t *parent, - job_priority_t prio); + callback_job_cleanup_t cleanup, callback_job_cancel_t cancel, + job_priority_t prio); #endif /** CALLBACK_JOB_H_ @}*/ diff --git a/src/libstrongswan/processing/jobs/job.h b/src/libstrongswan/processing/jobs/job.h index c3e640065..43bb5430e 100644 --- a/src/libstrongswan/processing/jobs/job.h +++ b/src/libstrongswan/processing/jobs/job.h @@ -103,6 +103,26 @@ struct job_t { job_requeue_t (*execute) (job_t *this); /** + * Cancel a job. + * + * Implementing this method is optional. It allows potentially blocking + * jobs to be canceled during shutdown. + * + * If no special action is to be taken simply return FALSE then the thread + * executing the job will be canceled. If TRUE is returned the job is + * expected to return from execute() itself (i.e. the thread won't be + * canceled explicitly and can still be joined later). + * Jobs that return FALSE have to make sure they provide the appropriate + * cancellation points. + * + * @note Regular jobs that do not block MUST NOT implement this method. + * @note This method could be called even before execute() has been called. + * + * @return FALSE to cancel the thread, TRUE if canceled otherwise + */ + bool (*cancel)(job_t *this); + + /** * Get the priority of a job. * * @return job priority @@ -117,7 +137,7 @@ struct job_t { * * Use the status of a job to decide what to do during destruction. */ - void (*destroy) (job_t *this); + void (*destroy)(job_t *this); }; #endif /** JOB_H_ @}*/ diff --git a/src/libstrongswan/processing/processor.c b/src/libstrongswan/processing/processor.c index 69838aa13..0f0c192d2 100644 --- a/src/libstrongswan/processing/processor.c +++ b/src/libstrongswan/processing/processor.c @@ -88,7 +88,6 @@ struct private_processor_t { condvar_t *thread_terminated; }; - /** * Worker thread */ @@ -222,26 +221,38 @@ static void process_jobs(worker_thread_t *worker) { break; } + else if (!worker->job->cancel) + { /* only allow cancelable jobs to requeue directly */ + requeue = JOB_REQUEUE_FAIR; + break; + } } thread_cleanup_pop(FALSE); this->mutex->lock(this->mutex); this->working_threads[i]--; - switch (requeue) + if (worker->job->status == JOB_STATUS_CANCELED) + { /* job was canceled via a custom cancel() method or did not + * use JOB_REQUEUE_TYPE_DIRECT */ + worker->job->destroy(worker->job); + } + else { - case JOB_REQUEUE_NONE: - worker->job->status = JOB_STATUS_DONE; - worker->job->destroy(worker->job); - break; - case JOB_REQUEUE_FAIR: - worker->job->status = JOB_STATUS_QUEUED; - this->jobs[i]->insert_last(this->jobs[i], worker->job); - this->job_added->signal(this->job_added); - break; - case JOB_REQUEUE_SCHEDULED: - worker->job->status = JOB_STATUS_QUEUED; - /* fall-through */ - default: - break; + switch (requeue) + { + case JOB_REQUEUE_NONE: + worker->job->status = JOB_STATUS_DONE; + worker->job->destroy(worker->job); + break; + case JOB_REQUEUE_FAIR: + worker->job->status = JOB_STATUS_QUEUED; + this->jobs[i]->insert_last(this->jobs[i], + worker->job); + this->job_added->signal(this->job_added); + break; + case JOB_REQUEUE_SCHEDULED: + default: + break; + } } break; } @@ -364,14 +375,29 @@ METHOD(processor_t, set_threads, void, this->mutex->unlock(this->mutex); } -METHOD(processor_t, destroy, void, +METHOD(processor_t, cancel, void, private_processor_t *this) { + enumerator_t *enumerator; worker_thread_t *worker; - int i; - set_threads(this, 0); this->mutex->lock(this->mutex); + this->desired_threads = 0; + /* cancel potentially blocking jobs */ + enumerator = this->threads->create_enumerator(this->threads); + while (enumerator->enumerate(enumerator, (void**)&worker)) + { + if (worker->job && worker->job->cancel) + { + worker->job->status = JOB_STATUS_CANCELED; + if (!worker->job->cancel(worker->job)) + { /* job requests to be canceled explicitly, otherwise we assume + * the thread terminates itself and can be joined */ + worker->thread->cancel(worker->thread); + } + } + } + enumerator->destroy(enumerator); while (this->total_threads > 0) { this->job_added->broadcast(this->job_added); @@ -384,6 +410,14 @@ METHOD(processor_t, destroy, void, free(worker); } this->mutex->unlock(this->mutex); +} + +METHOD(processor_t, destroy, void, + private_processor_t *this) +{ + int i; + + cancel(this); this->thread_terminated->destroy(this->thread_terminated); this->job_added->destroy(this->job_added); this->mutex->destroy(this->mutex); @@ -411,6 +445,7 @@ processor_t *processor_create() .get_job_load = _get_job_load, .queue_job = _queue_job, .set_threads = _set_threads, + .cancel = _cancel, .destroy = _destroy, }, .threads = linked_list_create(), diff --git a/src/libstrongswan/processing/processor.h b/src/libstrongswan/processing/processor.h index 05e88a2cf..94860f5d3 100644 --- a/src/libstrongswan/processing/processor.h +++ b/src/libstrongswan/processing/processor.h @@ -1,4 +1,5 @@ /* + * Copyright (C) 2012 Tobias Brunner * Copyright (C) 2005-2007 Martin Willi * Copyright (C) 2005 Jan Hutter * Hochschule fuer Technik Rapperswil @@ -78,14 +79,21 @@ struct processor_t { * * If the number of threads is smaller than number of currently running * threads, thread count is decreased. Use 0 to disable the processor. - * This call blocks if it decreases thread count until threads have - * terminated, so make sure there are not too many blocking jobs. + * + * This call does not block and wait for threads to terminate if the number + * of threads is reduced. Instead use cancel() for that during shutdown. * * @param count number of threads to allocate */ void (*set_threads)(processor_t *this, u_int count); /** + * Sets the number of threads to 0 and cancels all blocking jobs, then waits + * for all threads to be terminated. + */ + void (*cancel)(processor_t *this); + + /** * Destroy a processor object. */ void (*destroy) (processor_t *processor); diff --git a/src/libstrongswan/processing/scheduler.c b/src/libstrongswan/processing/scheduler.c index 979a7139f..c97dbc4be 100644 --- a/src/libstrongswan/processing/scheduler.c +++ b/src/libstrongswan/processing/scheduler.c @@ -68,11 +68,6 @@ struct private_scheduler_t { scheduler_t public; /** - * Job which queues scheduled jobs to the processor. - */ - callback_job_t *job; - - /** * The heap in which the events are stored. */ event_t **heap; @@ -309,7 +304,6 @@ METHOD(scheduler_t, destroy, void, private_scheduler_t *this) { event_t *event; - this->job->cancel(this->job); this->condvar->destroy(this->condvar); this->mutex->destroy(this->mutex); while ((event = remove_event(this)) != NULL) @@ -326,6 +320,7 @@ METHOD(scheduler_t, destroy, void, scheduler_t * scheduler_create() { private_scheduler_t *this; + callback_job_t *job; INIT(this, .public = { @@ -342,9 +337,9 @@ scheduler_t * scheduler_create() this->heap = (event_t**)calloc(this->heap_size + 1, sizeof(event_t*)); - this->job = callback_job_create_with_prio((callback_job_cb_t)schedule, - this, NULL, NULL, JOB_PRIO_CRITICAL); - lib->processor->queue_job(lib->processor, (job_t*)this->job); + job = callback_job_create_with_prio((callback_job_cb_t)schedule, this, + NULL, return_false, JOB_PRIO_CRITICAL); + lib->processor->queue_job(lib->processor, (job_t*)job); return &this->public; } |