summaryrefslogtreecommitdiffstats
path: root/lib/qpnexus.c
diff options
context:
space:
mode:
Diffstat (limited to 'lib/qpnexus.c')
-rw-r--r--lib/qpnexus.c327
1 files changed, 327 insertions, 0 deletions
diff --git a/lib/qpnexus.c b/lib/qpnexus.c
new file mode 100644
index 00000000..3b014be2
--- /dev/null
+++ b/lib/qpnexus.c
@@ -0,0 +1,327 @@
+/* Quagga Pthreads support -- header
+ * Copyright (C) 2009 Chris Hall (GMCH), Highwayman
+ *
+ * This file is part of GNU Zebra.
+ *
+ * GNU Zebra is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 2, or (at your
+ * option) any later version.
+ *
+ * GNU Zebra is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Zebra; see the file COPYING. If not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#include <zebra.h>
+#include <stdbool.h>
+
+#include "qpnexus.h"
+#include "memory.h"
+#include "thread.h"
+#include "sigevent.h"
+
+/* prototypes */
+static void* qpn_start(void* arg);
+static void qpn_in_thread_init(qpn_nexus qpn);
+
+/*==============================================================================
+ * Quagga Nexus Interface -- qpn_xxxx
+ *
+
+ */
+
+/*==============================================================================
+ * Initialisation, add hook, free etc.
+ *
+ */
+
+/*------------------------------------------------------------------------------
+ * Initialise a nexus -- allocating it if required.
+ *
+ * If main_thread is set then no new thread will be created
+ * when qpn_exec() is called, instead the finite state machine will be
+ * run in the calling thread. The main thread will only block the
+ * message queue's signal. Non-main threads will block most signals.
+ *
+ * Returns the qpn_nexus.
+ */
+extern qpn_nexus
+qpn_init_new(qpn_nexus qpn, bool main_thread)
+{
+ if (qpn == NULL)
+ qpn = XCALLOC(MTYPE_QPN_NEXUS, sizeof(struct qpn_nexus)) ;
+ else
+ memset(qpn, 0, sizeof(struct qpn_nexus)) ;
+
+ qpn->selection = qps_selection_init_new(qpn->selection);
+ qpn->pile = qtimer_pile_init_new(qpn->pile);
+ qpn->queue = mqueue_init_new(qpn->queue, mqt_signal_unicast);
+ qpn->main_thread = main_thread;
+ qpn->start = qpn_start;
+
+ if (main_thread)
+ qpn->thread_id = qpt_thread_self();
+
+ return qpn;
+}
+
+/*------------------------------------------------------------------------------
+ * Add a hook function to the given nexus.
+ */
+extern void
+qpn_add_hook_function(qpn_hook_list list, void* hook)
+{
+ passert(list->count < qpn_hooks_max) ;
+ list->hooks[list->count++] = hook ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * Reset given nexus and, if required, free the nexus structure.
+ *
+ * Free timers, selection, message queue and its thread signal.
+ *
+ * Leaves all pointers to these things NULL -- which generally means that the
+ * object is empty or otherwise out of action.
+ */
+extern qpn_nexus
+qpn_reset(qpn_nexus qpn, bool free_structure)
+{
+ qps_file qf;
+ qtimer qtr;
+
+ if (qpn == NULL)
+ return NULL;
+
+ /* timers and the pile */
+ if (qpn->pile != NULL)
+ {
+ while ((qtr = qtimer_pile_ream(qpn->pile, 1)))
+ qtimer_free(qtr);
+ qpn->pile = NULL ;
+ }
+
+ /* files and selection */
+ if (qpn->selection != NULL)
+ {
+ while ((qf = qps_selection_ream(qpn->selection, 1)))
+ qps_file_free(qf);
+ qpn->selection = NULL ;
+ }
+
+ if (qpn->queue != NULL)
+ qpn->queue = mqueue_reset(qpn->queue, 1);
+
+ if (qpn->mts != NULL)
+ qpn->mts = mqueue_thread_signal_reset(qpn->mts, 1);
+
+ if (free_structure)
+ XFREE(MTYPE_QPN_NEXUS, qpn) ; /* sets qpn = NULL */
+
+ return qpn ;
+} ;
+
+/*==============================================================================
+ * Execution of a nexus
+ */
+
+/*------------------------------------------------------------------------------
+ * If not main qpthread create new qpthread.
+ *
+ * For all qpthreads: start the thread !
+ */
+extern void
+qpn_exec(qpn_nexus qpn)
+{
+ if (qpn->main_thread)
+ qpn->start(qpn);
+ else
+ qpt_thread_create(qpn->start, qpn, NULL) ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * Pthread routine
+ *
+ * Processes:
+ *
+ * 1) Main thread only -- signals.
+ *
+ * 2) High priority pending work -- event hooks.
+ *
+ * 3) Messages coming from other pthreads -- mqueue_queue.
+ *
+ * 4) All priority pending work -- event hooks.
+ *
+ * 5) I/O -- qpselect
+ *
+ * This deals with all active sockets for read/write/connect/accept.
+ *
+ * Each time a socket is readable, one message is read and dispatched.
+ * The pselect timeout is set to be when the next timer is due.
+ *
+ * 6) Timers -- qtimers
+ *
+ */
+static void*
+qpn_start(void* arg)
+{
+ qpn_nexus qpn = arg;
+ mqueue_block mqb;
+ int actions;
+ qtime_mono_t now ;
+ qtime_t max_wait ;
+ unsigned i;
+ unsigned done ;
+ unsigned wait ;
+
+ /* now in our thread, complete initialisation */
+ qpn_in_thread_init(qpn);
+
+ /* custom in-thread initialization */
+ for (i = 0; i < qpn->in_thread_init.count ;)
+ ((qpn_init_function*)(qpn->in_thread_init.hooks[i++]))() ;
+
+ /* Until required to terminate, loop */
+ done = 1 ;
+ while (!qpn->terminate)
+ {
+ /* Signals are highest priority -- only execute for main thread */
+ if (qpn->main_thread)
+ done |= quagga_sigevent_process() ;
+
+ /* Foreground hooks, if any. */
+ for (i = 0; i < qpn->foreground.count ;)
+ done |= ((qpn_hook_function*)(qpn->foreground.hooks[i++]))() ;
+
+ /* take stuff from the message queue
+ *
+ * If nothing done the last time around the loop then may wait this
+ * time if the queue is empty first time through.
+ */
+ wait = (done == 0) ; /* may wait this time only if nothing
+ found to do on the last pass */
+ done = 0 ;
+ do
+ {
+ mqb = mqueue_dequeue(qpn->queue, wait, qpn->mts) ;
+ if (mqb == NULL)
+ break;
+
+ mqb_dispatch(mqb, mqb_action);
+
+ ++done ; /* done another */
+ wait = 0 ; /* done something, so turn off wait */
+ } while (done < 200) ;
+
+ /* block for some input, output, signal or timeout
+ *
+ * wait will be true iff did nothing the last time round the loop, and
+ * not found anything to be done up to this point either.
+ */
+ if (wait)
+ max_wait = qtimer_pile_top_wait(qpn->pile, QTIME(MAX_PSELECT_WAIT)) ;
+ else
+ max_wait = 0 ;
+
+ actions = qps_pselect(qpn->selection, max_wait) ;
+ done |= actions ;
+
+ if (wait)
+ mqueue_done_waiting(qpn->queue, qpn->mts);
+
+ /* process I/O actions */
+ while (actions)
+ actions = qps_dispatch_next(qpn->selection) ;
+
+ /* process timers */
+ now = qt_get_monotonic() ;
+ while (qtimer_pile_dispatch_next(qpn->pile, now))
+ done = 1 ;
+
+ /* If nothing done in this pass, see if anything in the background */
+ if (done == 0)
+ for (i = 0; i < qpn->background.count ; ++i)
+ done |= ((qpn_hook_function*)(qpn->background.hooks[i]))() ;
+ } ;
+
+ /* custom in-thread finalization */
+ for (i = qpn->in_thread_final.count; i > 0 ;)
+ ((qpn_init_function*)(qpn->in_thread_final.hooks[--i]))() ;
+
+ return NULL;
+}
+
+/*------------------------------------------------------------------------------
+ * Now running in our thread, do common initialisation
+ */
+static void
+qpn_in_thread_init(qpn_nexus qpn)
+{
+ sigset_t newmask;
+
+ qpn->thread_id = qpt_thread_self();
+
+ if (qpn->main_thread)
+ {
+ /* Main thread, block the message queue's signal */
+ sigemptyset (&newmask);
+ sigaddset (&newmask, SIGMQUEUE);
+ }
+ else
+ {
+ /*
+ * Not main thread. Block most signals, but be careful not to
+ * defer SIGTRAP because doing so breaks gdb, at least on
+ * NetBSD 2.0. Avoid asking to block SIGKILL, just because
+ * we shouldn't be able to do so. Avoid blocking SIGFPE,
+ * SIGILL, SIGSEGV, SIGBUS as this is undefined by POSIX.
+ * Don't block SIGPIPE so that is gets ignored on this thread.
+ */
+ sigfillset (&newmask);
+ sigdelset (&newmask, SIGTRAP);
+ sigdelset (&newmask, SIGKILL);
+ sigdelset (&newmask, SIGPIPE);
+ sigdelset (&newmask, SIGFPE);
+ sigdelset (&newmask, SIGILL);
+ sigdelset (&newmask, SIGSEGV);
+ sigdelset (&newmask, SIGBUS);
+ }
+
+ if (qpthreads_enabled)
+ qpt_thread_sigmask(SIG_BLOCK, &newmask, NULL);
+ else
+ {
+ if (sigprocmask(SIG_BLOCK, &newmask, NULL) != 0)
+ zabort_errno("sigprocmask failed") ;
+ }
+
+ /* Now we have thread_id and mask, prep for using message queue. */
+ if (qpn->queue != NULL)
+ qpn->mts = mqueue_thread_signal_init(qpn->mts, qpn->thread_id, SIGMQUEUE);
+ if (qpn->selection != NULL)
+ qps_set_signal(qpn->selection, SIGMQUEUE, newmask);
+}
+
+/*------------------------------------------------------------------------------
+ * Ask the thread to terminate itself quickly and cleanly.
+ *
+ * Does nothing if terminate already set.
+ */
+void
+qpn_terminate(qpn_nexus qpn)
+{
+ if (!qpn->terminate)
+ {
+ qpn->terminate = true ;
+
+ /* wake up any pselect */
+ if (qpthreads_enabled)
+ qpt_thread_signal(qpn->thread_id, SIGMQUEUE);
+ } ;
+}