aboutsummaryrefslogtreecommitdiffstats
path: root/src/charon/bus/bus.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/charon/bus/bus.c')
-rw-r--r--src/charon/bus/bus.c274
1 files changed, 267 insertions, 7 deletions
diff --git a/src/charon/bus/bus.c b/src/charon/bus/bus.c
index 1e5ff9857..028fd37c9 100644
--- a/src/charon/bus/bus.c
+++ b/src/charon/bus/bus.c
@@ -22,6 +22,102 @@
#include "bus.h"
+#include <pthread.h>
+
+ENUM(signal_names, SIG_ANY, SIG_MAX,
+ /** should not get printed */
+ "SIG_ANY",
+ /** debugging message types */
+ "DMN",
+ "MGR",
+ "IKE",
+ "CHD",
+ "JOB",
+ "CFG",
+ "KNL",
+ "NET",
+ "ENC",
+ "LIB",
+ /** should not get printed */
+ "SIG_DBG_MAX",
+ /** all level0 signals are AUDIT signals */
+ "AUD",
+ "AUD",
+ "AUD",
+ "AUD",
+ "AUD",
+ "AUD",
+ "AUD",
+ "AUD",
+ "AUD",
+ "AUD",
+ "AUD",
+ "AUD",
+ /** should not get printed */
+ "SIG_MAX",
+);
+
+typedef struct active_listener_t active_listener_t;
+
+/**
+ * information for a active listener
+ */
+struct active_listener_t {
+
+ /**
+ * associated thread
+ */
+ pthread_t id;
+
+ /**
+ * condvar to wait for a signal
+ */
+ pthread_cond_t cond;
+
+ /**
+ * state of the thread
+ */
+ enum {
+ /** not registered, do not wait for thread */
+ UNREGISTERED,
+ /** registered, if a signal occurs, wait until it is LISTENING */
+ REGISTERED,
+ /** listening, deliver signal */
+ LISTENING,
+ } state;
+
+ /**
+ * currently processed signals type
+ */
+ signal_t signal;
+
+ /**
+ * verbosity level of the signal
+ */
+ level_t level;
+
+ /**
+ * current processed signals thread number
+ */
+ int thread;
+
+ /**
+ * currently processed signals ike_sa
+ */
+ ike_sa_t *ike_sa;
+
+ /**
+ * currently processed signals format string
+ */
+ char *format;
+
+ /**
+ * currently processed signals format varargs
+ */
+ va_list args;
+
+};
+
typedef struct private_bus_t private_bus_t;
/**
@@ -39,6 +135,16 @@ struct private_bus_t {
linked_list_t *listeners;
/**
+ * List of active listeners with listener_state TRUE
+ */
+ linked_list_t *active_listeners;
+
+ /**
+ * mutex to synchronize active listeners
+ */
+ pthread_mutex_t mutex;
+
+ /**
* Thread local storage for a unique, simple thread ID
*/
pthread_key_t thread_id;
@@ -76,10 +182,96 @@ static int get_thread_number(private_bus_t *this)
*/
static void add_listener(private_bus_t *this, bus_listener_t *listener)
{
+ pthread_mutex_lock(&this->mutex);
this->listeners->insert_last(this->listeners, (void*)listener);
+ pthread_mutex_unlock(&this->mutex);
+}
+
+/**
+ * Get the listener object for the calling thread
+ */
+static active_listener_t *get_active_listener(private_bus_t *this)
+{
+ active_listener_t *current, *found = NULL;
+ iterator_t *iterator;
+
+ /* if the thread was here once before, we have a active_listener record */
+ iterator = this->active_listeners->create_iterator(this->active_listeners, TRUE);
+ while (iterator->iterate(iterator, (void**)&current))
+ {
+ if (current->id == pthread_self())
+ {
+ found = current;
+ break;
+ }
+ }
+ iterator->destroy(iterator);
+
+ if (found == NULL)
+ {
+ /* create a new object for a never-seen thread */
+ found = malloc_thing(active_listener_t);
+ found->id = pthread_self();
+ pthread_cond_init(&found->cond, NULL);
+ this->active_listeners->insert_last(this->active_listeners, found);
+ }
+
+ return found;
+}
+
+/**
+ * Implementation of bus_t.listen.
+ */
+static signal_t listen_(private_bus_t *this, level_t *level, int *thread,
+ ike_sa_t **ike_sa, char** format, va_list* args)
+{
+ active_listener_t *listener;
+
+ pthread_mutex_lock(&this->mutex);
+ listener = get_active_listener(this);
+ /* go "listening", say hello to a thread which have a signal for us */
+ listener->state = LISTENING;
+ pthread_cond_broadcast(&listener->cond);
+ /* wait until it has us delivered a signal, and go back to "registered" */
+ pthread_cond_wait(&listener->cond, &this->mutex);
+ pthread_mutex_unlock(&this->mutex);
+
+ /* return signal values */
+ *level = listener->level;
+ *thread = listener->thread;
+ *ike_sa = listener->ike_sa;
+ *format = listener->format;
+ *args = listener->args;
+
+ return listener->signal;
}
/**
+ * Implementation of bus_t.set_listen_state.
+ */
+static void set_listen_state(private_bus_t *this, bool active)
+{
+ active_listener_t *listener;
+
+ pthread_mutex_lock(&this->mutex);
+
+ listener = get_active_listener(this);
+ if (active)
+ {
+ listener->state = REGISTERED;
+ }
+ else
+ {
+ listener->state = UNREGISTERED;
+ /* say hello to signal omitter; we are finished processing the signal */
+ pthread_cond_signal(&listener->cond);
+ }
+
+ pthread_mutex_unlock(&this->mutex);
+}
+
+
+/**
* Implementation of bus_t.set_sa.
*/
static void set_sa(private_bus_t *this, ike_sa_t *ike_sa)
@@ -88,28 +280,83 @@ static void set_sa(private_bus_t *this, ike_sa_t *ike_sa)
}
/**
- * Implementation of bus_t.signal.
+ * Implementation of bus_t.vsignal.
*/
-static void signal_(private_bus_t *this, signal_t signal, level_t condition,
- char* format, ...)
+static void vsignal(private_bus_t *this, signal_t signal, level_t level,
+ char* format, va_list args)
{
iterator_t *iterator;
bus_listener_t *listener;
- va_list args;
+ active_listener_t *active_listener;
ike_sa_t *ike_sa;
int thread;
ike_sa = pthread_getspecific(this->thread_sa);
thread = get_thread_number(this);
- va_start(args, format);
+ pthread_mutex_lock(&this->mutex);
+
+ /* do the job for all passive bus_listeners */
iterator = this->listeners->create_iterator(this->listeners, TRUE);
while (iterator->iterate(iterator, (void**)&listener))
{
- listener->signal(listener, thread, ike_sa,
- signal, condition, format, args);
+ va_list args_copy;
+
+ va_copy(args_copy, args);
+ listener->signal(listener, signal, level, thread, ike_sa, format, args_copy);
+ va_end(args_copy);
}
iterator->destroy(iterator);
+
+ /* wake up all active listeners */
+ iterator = this->active_listeners->create_iterator(this->active_listeners, TRUE);
+ while (iterator->iterate(iterator, (void**)&active_listener))
+ {
+ /* wait until it is back */
+ while (active_listener->state == REGISTERED)
+ {
+ pthread_cond_wait(&active_listener->cond, &this->mutex);
+ }
+ /* if thread is listening now, give it the signal to process */
+ if (active_listener->state == LISTENING)
+ {
+ active_listener->level = level;
+ active_listener->thread = thread;
+ active_listener->ike_sa = ike_sa;
+ active_listener->signal = signal;
+ active_listener->format = format;
+ va_copy(active_listener->args, args);
+ active_listener->state = REGISTERED;
+ pthread_cond_signal(&active_listener->cond);
+ }
+ }
+
+ /* we must wait now until all are not in state REGISTERED,
+ * as they may still use our arguments */
+ iterator->reset(iterator);
+ while (iterator->iterate(iterator, (void**)&active_listener))
+ {
+ while (active_listener->state == REGISTERED)
+ {
+ pthread_cond_wait(&active_listener->cond, &this->mutex);
+ }
+ va_end(active_listener->args);
+ }
+ iterator->destroy(iterator);
+
+ pthread_mutex_unlock(&this->mutex);
+}
+
+/**
+ * Implementation of bus_t.signal.
+ */
+static void signal_(private_bus_t *this, signal_t signal, level_t level,
+ char* format, ...)
+{
+ va_list args;
+
+ va_start(args, format);
+ vsignal(this, signal, level, format, args);
va_end(args);
}
@@ -118,6 +365,14 @@ static void signal_(private_bus_t *this, signal_t signal, level_t condition,
*/
static void destroy(private_bus_t *this)
{
+ active_listener_t *listener;
+ while (this->active_listeners->remove_last(this->active_listeners,
+ (void**)&listener) == SUCCESS)
+ {
+ free(listener);
+ }
+
+ this->active_listeners->destroy(this->active_listeners);
this->listeners->destroy(this->listeners);
free(this);
}
@@ -130,11 +385,16 @@ bus_t *bus_create()
private_bus_t *this = malloc_thing(private_bus_t);
this->public.add_listener = (void(*)(bus_t*,bus_listener_t*))add_listener;
+ this->public.listen = (signal_t(*)(bus_t*,level_t*,int*,ike_sa_t**,char**,va_list*))listen_;
+ this->public.set_listen_state = (void(*)(bus_t*,bool))set_listen_state;
this->public.set_sa = (void(*)(bus_t*,ike_sa_t*))set_sa;
this->public.signal = (void(*)(bus_t*,signal_t,level_t,char*,...))signal_;
+ this->public.vsignal = (void(*)(bus_t*,signal_t,level_t,char*,va_list))vsignal;
this->public.destroy = (void(*)(bus_t*)) destroy;
this->listeners = linked_list_create();
+ this->active_listeners = linked_list_create();
+ pthread_mutex_init(&this->mutex, NULL);
pthread_key_create(&this->thread_id, NULL);
pthread_key_create(&this->thread_sa, NULL);