summaryrefslogtreecommitdiffstats
path: root/lib/mqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'lib/mqueue.c')
-rw-r--r--lib/mqueue.c615
1 files changed, 615 insertions, 0 deletions
diff --git a/lib/mqueue.c b/lib/mqueue.c
new file mode 100644
index 00000000..7b93e4b0
--- /dev/null
+++ b/lib/mqueue.c
@@ -0,0 +1,615 @@
+/* Message Queue data structure -- functions
+ * Copyright (C) 2009 Chris Hall (GMCH), Highwayman
+ *
+ * This file is part of GNU Zebra.
+ *
+ * GNU Zebra is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 2, or (at your
+ * option) any later version.
+ *
+ * GNU Zebra is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Zebra; see the file COPYING. If not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#include <string.h>
+
+#include "memory.h"
+#include "mqueue.h"
+#include "zassert.h"
+
+/*==============================================================================
+ * These message queues are designed for inter-qpthread communication.
+ *
+ * A message queue carries messages from one or more qpthreads to one or more
+ * other qpthreads.
+ *
+ * If !qpthreads_enabled, then a message queue hold messages for the program
+ * to consume later. There are never any waiters. Timeouts are ignored.
+ *
+ * A message queue has one ordinary priority queue and one high priority
+ * queue.
+ *
+ * There are four types of queue, depending on how qpthreads wait and how they
+ * are woken up:
+ *
+ * mqt_cond_unicast -- wait on condition variable, one waiter kicked
+ * mqt_cond_broadcast -- wait on condition variable, all waiters kicked
+ * mqt_signal_unicast -- wait for signal, one waiter kicked
+ * mqt_signal_broadcast -- wait for signal, all waiters kicked
+ *
+ * For condition variables there is a timeout mechanism so that waiters
+ * are woken up at least every now and then. The message queue maintains
+ * a timeout time and a timeout interval. The timeout time is a qtime_mono_t
+ * time -- so is monotonic.
+ *
+ * When waiting, an explicit timeout may be given, otherwise the stored timeout
+ * will be used:
+ *
+ * wait until explicit/stored timeout
+ * if times out and there is a stored interval:
+ * new stored timeout = stored timeout + stored interval
+ * if new stored timeout < time now
+ * new stored timeout = time now + stored interval
+ *
+ * Left to its own devices, this will produce a regular timeout every interval,
+ * assuming that the queue is waited on within the interval. Otherwise the
+ * "clock" will slip.
+ *
+ * There is a default timeout period. The period may be set "infinite".
+ *
+ * For waiters kicked by signal, the wait does not occur within the message
+ * queue code, but the need for a signal is recorded in the message queue.
+ *
+ * Messages take the form of a small block of information which contains:
+ *
+ * * flags -- used by the message handler
+ * * context -- identifies the context of the message (see revoke)
+ *
+ * * action -- void action(mqueue_block) message dispatch
+ * * arg0 -- *void/uintptr_t/intptr_t ) standard arguments
+ * * arg1 -- *void/uintptr_t/intptr_t )
+ *
+ * There are set/get functions for action/arg0/arg1 -- users should not poke
+ * around inside the structure.
+ *
+ * To send a message, first allocate a message block (see mqb_init_new),
+ * then fill in the arguments and enqueue it.
+ *
+ *
+ */
+
+/*==============================================================================
+ * Initialisation etc. for Message Queues.
+ *
+ * TODO: how to shut down a message queue... for reset/exit ?
+ */
+
+/* Initialise new Message Queue, if required (mq == NULL) allocating it.
+ *
+ * For mqt_cond_xxx type queues, sets the default timeout interval and the
+ * initial timeout time to now + that interval.
+ *
+ * NB: once any message queue has been enabled, it is TOO LATE to enable
+ * qpthreads.
+ */
+
+mqueue_queue
+mqueue_init_new(mqueue_queue mq, enum mqueue_queue_type type)
+{
+ if (mq == NULL)
+ mq = XCALLOC(MTYPE_MQUEUE_QUEUE, sizeof(struct mqueue_queue)) ;
+ else
+ memset(mq, 0, sizeof(struct mqueue_queue)) ;
+
+ if (qpt_freeze_qpthreads_enabled())
+ qpt_mutex_init_new(&mq->mutex, qpt_mutex_quagga) ;
+
+ /* head, tail and tail_priority set NULL already */
+ /* waiters set zero already */
+
+ mq->type = type ;
+ switch (type)
+ {
+ case mqt_cond_unicast:
+ case mqt_cond_broadcast:
+ if (qpthreads_enabled)
+ qpt_cond_init_new(&mq->kick.cond.wait_here, qpt_cond_quagga) ;
+ if (MQUEUE_DEFAULT_INTERVAL != 0)
+ {
+ mq->kick.cond.interval = MQUEUE_DEFAULT_INTERVAL ;
+ mq->kick.cond.timeout = qt_get_monotonic()
+ + MQUEUE_DEFAULT_INTERVAL ;
+ } ;
+ break;
+
+ case mqt_signal_unicast:
+ case mqt_signal_broadcast:
+ /* head/tail pointers set NULL already */
+ break;
+
+ default:
+ zabort("Invalid mqueue queue type") ;
+ } ;
+
+ return mq ;
+} ;
+
+/* Set new timeout interval (or unset by setting <= 0)
+ *
+ * Sets the next timeout to be the time now + new interval (or never).
+ *
+ * This is a waste of time if !qpthreads_enabled, but does no harm. The
+ * timeout is ignored.
+ */
+void
+mqueue_set_timeout_interval(mqueue_queue mq, qtime_t interval)
+{
+ qpt_mutex_lock(&mq->mutex) ;
+
+ dassert( (mq->type == mqt_cond_unicast) ||
+ (mq->type == mqt_cond_broadcast) ) ;
+
+ mq->kick.cond.interval = interval ;
+ mq->kick.cond.timeout = (interval > 0) ? qt_add_monotonic(interval)
+ : 0 ;
+ qpt_mutex_unlock(&mq->mutex) ;
+} ;
+
+/*==============================================================================
+ * Message Block memory management.
+ *
+ * Allocates message_block structures in lots of 256. Uses first message_block
+ * in each lot to keep track of the lots.
+ *
+ * mqueue_initialise MUST be called before the first message block is allocated.
+ */
+
+static pthread_mutex_t mqb_mutex ;
+
+#define MQB_LOT_SIZE 256
+
+static mqueue_block mqb_lot_list = NULL ;
+static mqueue_block mqb_free_list = NULL ;
+
+static mqueue_block mqueue_block_new_lot(void) ;
+
+/* Initialise message block (allocate if required) and set action and context.
+ */
+mqueue_block
+mqb_init_new(mqueue_block mqb, mqueue_action action, mqb_context_t context)
+{
+ if (mqb == NULL)
+ {
+ qpt_mutex_lock(&mqb_mutex) ;
+
+ mqb = mqb_free_list ;
+ if (mqb == NULL)
+ mqb = mqueue_block_new_lot() ;
+
+ mqb_free_list = mqb->next ;
+
+ qpt_mutex_unlock(&mqb_mutex) ;
+ } ;
+
+ memset(mqb, 0, sizeof(struct mqueue_block)) ;
+
+ mqb->action = action ;
+ mqb->context = context ;
+
+ return mqb ;
+} ;
+
+/* Free message block when done with it.
+ */
+void
+mqb_free(mqueue_block mqb)
+{
+ qpt_mutex_lock(&mqb_mutex) ;
+
+ mqb->next = mqb_free_list ;
+ mqb_free_list = mqb ;
+
+ qpt_mutex_unlock(&mqb_mutex) ;
+} ;
+
+/* Make a new lot of empty message_block structures.
+ *
+ * NB: caller MUST hold the mqb_mutex.
+ */
+static mqueue_block
+mqueue_block_new_lot(void)
+{
+ mqueue_block first, last, this ;
+
+ mqueue_block new = XCALLOC(MTYPE_MQUEUE_BLOCKS,
+ SIZE(struct mqueue_block, MQB_LOT_SIZE)) ;
+ first = &new[1] ;
+ last = &new[MQB_LOT_SIZE - 1] ;
+
+ new->next = mqb_lot_list ; /* add to list of lots */
+ mqb_lot_list = new ;
+
+ /* String all the new message_blocks together. */
+ this = last ;
+ while (this > first)
+ {
+ mqueue_block prev = this-- ;
+ this->next = prev ;
+ } ;
+ assert(this == first) ;
+
+ last->next = mqb_free_list ; /* point last at old free list */
+ mqb_free_list = first ; /* new blocks at head of free list */
+
+ return mqb_free_list ;
+} ;
+
+/*==============================================================================
+ * Enqueue and dequeue messages.
+ */
+
+static void mqueue_kick_signal(mqueue_queue mq, unsigned n) ;
+static void mqueue_dequeue_signal(mqueue_queue mq, mqueue_thread_signal mtsig) ;
+
+/* Enqueue message.
+ *
+ * If priority != 0, will enqueue after any previously enqueued priority
+ * messages.
+ *
+ * If there are any waiters, then we kick one or all of them.
+ *
+ * Note that we decrement or zero the waiters count here -- because if the
+ * waiter did it, they might not run before something else is enqueued.
+ * Similarly, if the kick uses a signal, the signal block is dequeued here.
+ *
+ * The waiter count is only incremented when a dequeue is attempted and the
+ * queue is empty, then:
+ *
+ * for a broadcast type message queue, the first message that arrives will
+ * kick all the waiters into action.
+ *
+ * for a signal type message queue, each message that arrives will kick one
+ * waiter.
+ *
+ * NB: this works perfectly well if !qpthreads enabled. Of course, there can
+ * never be any waiters... so no kicking is ever done.
+ */
+
+void
+mqueue_enqueue(mqueue_queue mq, mqueue_block mqb, int priority)
+{
+ qpt_mutex_lock(&mq->mutex) ;
+
+ if (mq->head == NULL)
+ {
+ mqb->next = NULL ;
+ mq->head = mqb ;
+ mq->tail_priority = priority ? mqb : NULL ;
+ mq->tail = mqb ;
+ }
+ else if (priority)
+ {
+ mqueue_block after = mq->tail_priority ;
+ if (after == NULL)
+ {
+ mqb->next = mq->head ;
+ mq->head = mqb ;
+ }
+ else
+ {
+ mqb->next = after->next ;
+ after->next = mqb ;
+ }
+ mq->tail_priority = mqb ;
+ }
+ else
+ {
+ dassert(mq->tail != NULL) ;
+ mqb->next = NULL ;
+ mq->tail->next = mqb ;
+ mq->tail = mqb ;
+ } ;
+
+ if (mq->waiters != 0)
+ {
+ dassert(qpthreads_enabled) ; /* waiters == 0 if !qpthreads_enabled */
+
+ switch (mq->type)
+ {
+ case mqt_cond_unicast:
+ qpt_cond_signal(&mq->kick.cond.wait_here) ;
+ --mq->waiters ;
+ break ;
+
+ case mqt_cond_broadcast:
+ qpt_cond_broadcast(&mq->kick.cond.wait_here) ;
+ mq->waiters = 0 ;
+ break ;
+
+ case mqt_signal_unicast:
+ mqueue_kick_signal(mq, 1) ; /* pick off first and kick it (MUST be */
+ /* one) and decrement the waiters count */
+ break ;
+
+ case mqt_signal_broadcast:
+ mqueue_kick_signal(mq, mq->waiters) ;
+ dassert(mq->kick.signal.head == NULL) ;
+ break;
+
+ default:
+ zabort("Invalid mqueue queue type") ;
+ } ;
+ } ;
+
+ qpt_mutex_unlock(&mq->mutex) ;
+} ;
+
+/* Dequeue message.
+ *
+ * If the queue is empty and wait != 0 (and qpthreads_enabled), will wait for a
+ * message. In which case for:
+ *
+ * * mqt_cond_xxxx type message queues, will wait on the condition variable,
+ * and may timeout.
+ *
+ * If the argument is NULL, uses the already set up timeout, if there is
+ * one.
+ *
+ * If the argument is not NULL, it is a pointer to a qtime_mono_t time,
+ * to be used as the new timeout time.
+ *
+ * * mqt_signal_xxxx type message queues, will register the given signal
+ * (mtsig argument MUST be provided), and return immediately.
+ *
+ * NB: if !qpthreads_enabled, will not wait on the queue. No how.
+ *
+ * Note this means that waiters == 0 all the time if !qpthreads_enabled !
+ *
+ * NB: the argument is ignored if !wait or !qpthreads_enabled, so may be NULL.
+ *
+ * Returns a message block if one is available. (And not otherwise.)
+ */
+
+mqueue_block
+mqueue_dequeue(mqueue_queue mq, int wait, void* arg)
+{
+ mqueue_block mqb ;
+ mqueue_thread_signal last ;
+
+ mqueue_thread_signal mtsig ;
+ qtime_mono_t timeout_time ;
+
+ qpt_mutex_lock(&mq->mutex) ;
+
+ while (1)
+ {
+ mqb = mq->head ;
+ if (mqb != NULL)
+ break ; /* Easy if queue not empty */
+
+ if (!wait || !qpthreads_enabled)
+ goto done ; /* Easy if not waiting ! mqb == NULL */
+ /* Short circuit if !qpthreads_enabled */
+
+ ++mq->waiters ; /* Another waiter */
+
+ switch (mq->type)
+ {
+ case mqt_cond_unicast: /* Now wait here */
+ case mqt_cond_broadcast:
+ if ((arg == NULL) && (mq->kick.cond.interval <= 0))
+ qpt_cond_wait(&mq->kick.cond.wait_here, &mq->mutex) ;
+ else
+ {
+ timeout_time = (arg != NULL) ? *(qtime_mono_t*)arg
+ : mq->kick.cond.timeout ;
+
+ if (qpt_cond_timedwait(&mq->kick.cond.wait_here, &mq->mutex,
+ timeout_time) == 0)
+ {
+ /* Timed out -- update timeout time, if required */
+ if (mq->kick.cond.interval > 0)
+ {
+ qtime_mono_t now = qt_get_monotonic() ;
+ timeout_time = mq->kick.cond.timeout
+ + mq->kick.cond.interval ;
+ if (timeout_time < now)
+ timeout_time = now + mq->kick.cond.interval ;
+
+ mq->kick.cond.timeout = timeout_time ;
+ } ;
+
+ goto done ; /* immediate return. mqb == NULL */
+ } ;
+ } ;
+ break ;
+
+ case mqt_signal_unicast: /* Register desire for signal */
+ case mqt_signal_broadcast:
+ mtsig = arg ;
+ dassert(mtsig != NULL) ;
+
+ last = mq->kick.signal.tail ;
+ if (last == NULL)
+ {
+ mq->kick.signal.head = mtsig ;
+ mtsig->prev = (void*)mq ;
+ }
+ else
+ {
+ last->next = mtsig ;
+ mtsig->prev = last ;
+ }
+ mtsig->next = NULL ;
+ mq->kick.signal.tail = mtsig ;
+
+ goto done ; /* BUT do not wait ! mqb == NULL */
+
+ default:
+ zabort("Invalid mqueue queue type") ;
+ } ;
+ } ;
+
+ /* Have something to pull off the queue */
+
+ mq->head = mqb->next ;
+ if (mqb == mq->tail_priority)
+ mq->tail_priority = NULL ;
+
+done:
+ qpt_mutex_unlock(&mq->mutex) ;
+
+ return mqb ;
+} ;
+
+/* No longer waiting for a signal -- does nothing if !qpthreads_enabled.
+ *
+ * Returns true <=> signal has been kicked
+ *
+ * (Signal will never be kicked if !qpthreads_enabled.)
+ */
+int
+mqueue_done_waiting(mqueue_queue mq, mqueue_thread_signal mtsig)
+{
+ int kicked ;
+
+ if (!qpthreads_enabled)
+ return 0 ;
+
+ qpt_mutex_lock(&mq->mutex) ;
+
+ dassert( (mq->type == mqt_signal_unicast) ||
+ (mq->type == mqt_signal_broadcast) ) ;
+ dassert(mtsig != NULL) ;
+
+ /* When the thread is signalled, the prev entry is set NULL and the */
+ /* waiters count is decremented. */
+ /* */
+ /* So, only need to do something here if the prev is not NULL (ie the */
+ /* mqueue_thread_signal is still on the list. */
+
+ kicked = (mtsig->prev == NULL) ;
+
+ if (!kicked)
+ mqueue_dequeue_signal(mq, mtsig) ;
+
+ qpt_mutex_unlock(&mq->mutex) ;
+
+ return kicked ;
+} ;
+
+/*==============================================================================
+ * Message queue signal handling
+ */
+
+/* Initialise a message queue signal structure (struct mqueue_thread_signal).
+ * Allocate one if required.
+ *
+ * If !pthreads_enabled, then this structure is entirely redundant, but there
+ * is no harm in creating it -- but the signal will never be used.
+ *
+ * Returns address of the structure.
+ */
+mqueue_thread_signal
+mqueue_thread_signal_init(mqueue_thread_signal mqt, qpt_thread_t thread,
+ int signum)
+{
+ if (mqt == NULL)
+ mqt = XCALLOC(MTYPE_MQUEUE_THREAD_SIGNAL,
+ sizeof(struct mqueue_thread_signal)) ;
+ else
+ memset(mqt, 0, sizeof(struct mqueue_thread_signal)) ;
+
+ /* next and prev fields set to NULL already. */
+
+ mqt->qpthread = thread ;
+ mqt->signum = signum ;
+
+ return mqt ;
+} ;
+
+/* Signal the first 'n' threads on the to be signalled list.
+ *
+ * Removes the threads from the list and reduces the waiters count.
+ *
+ * NB: must be qpthreads_enabled with at least 'n' waiters.
+ *
+ * NB: sets the prev entry in the mqueue_thread_signal block to NULL, so that
+ * the thread can tell that its signal has been kicked.
+ *
+ * NB: *** MUST own the mqueue_queue mutex. ***
+ */
+static void
+mqueue_kick_signal(mqueue_queue mq, unsigned n)
+{
+ mqueue_thread_signal mtsig ;
+
+ dassert( (qpthreads_enabled) && (mq->waiters >= n) ) ;
+ while (n--)
+ {
+ mqueue_dequeue_signal(mq, mtsig = mq->kick.signal.head) ;
+ qpt_thread_signal(mtsig->qpthread, mtsig->signum) ;
+ } ;
+} ;
+
+/* Remove given signal from given message queue.
+ *
+ * NB: sets the prev entry in the mqueue_thread_signal block to NULL, so that
+ * the thread can tell that its signal has been kicked.
+ *
+ * NB: *** MUST own the mqueue_queue mutex. ***
+ */
+static void
+mqueue_dequeue_signal(mqueue_queue mq, mqueue_thread_signal mtsig)
+{
+ mqueue_thread_signal next ;
+ mqueue_thread_signal prev ;
+
+ next = mtsig->next ;
+ prev = mtsig->prev ;
+
+ if (prev == (void*)mq) /* marker for head of list */
+ {
+ dassert(mq->kick.signal.head == mtsig) ;
+ mq->kick.signal.head = next ;
+ }
+ else
+ {
+ dassert((prev != NULL) && (prev->next == mtsig)) ;
+ prev->next = next ;
+ } ;
+
+ if (next != NULL)
+ next->prev = prev ;
+
+ mtsig->next = NULL ;
+ mtsig->prev = NULL ; /* essential to show signal kicked */
+ --mq->waiters ; /* one fewer waiter */
+
+ dassert( ((mq->kick.signal.head == NULL) && (mq->waiters == 0)) ||
+ ((mq->kick.signal.head != NULL) && (mq->waiters != 0)) ) ;
+} ;
+
+/*==============================================================================
+ * Initialise Message Queue handling
+ *
+ * Must be called before any qpt_threads are started.
+ *
+ * Freezes qpthreads_enabled.
+ *
+ * TODO: how do we shut down message queue handling ?
+ */
+void
+mqueue_initialise(void)
+{
+ if (qpthreads_enabled_freeze)
+ qpt_mutex_init_new(&mqb_mutex, qpt_mutex_quagga) ;
+} ;