summaryrefslogtreecommitdiffstats
path: root/lib/mqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'lib/mqueue.c')
-rw-r--r--lib/mqueue.c1319
1 files changed, 1319 insertions, 0 deletions
diff --git a/lib/mqueue.c b/lib/mqueue.c
new file mode 100644
index 00000000..8fa9fbd5
--- /dev/null
+++ b/lib/mqueue.c
@@ -0,0 +1,1319 @@
+/* Message Queue data structure -- functions
+ * Copyright (C) 2009 Chris Hall (GMCH), Highwayman
+ *
+ * This file is part of GNU Zebra.
+ *
+ * GNU Zebra is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 2, or (at your
+ * option) any later version.
+ *
+ * GNU Zebra is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Zebra; see the file COPYING. If not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#include <string.h>
+
+#include "memory.h"
+#include "mqueue.h"
+#include "zassert.h"
+
+/*==============================================================================
+ * These message queues are designed for inter-qpthread communication.
+ *
+ * A message queue carries messages from one or more qpthreads to one or more
+ * other qpthreads.
+ *
+ * If !qpthreads_enabled, then a message queue holds messages for the program
+ * to consume later. There are never any waiters. Timeouts are ignored.
+ *
+ * A message queue has one ordinary priority queue and one high priority
+ * queue.
+ *
+ * There are four types of queue, depending on how qpthreads wait and how they
+ * are woken up:
+ *
+ * mqt_cond_unicast -- wait on condition variable, one waiter kicked
+ * mqt_cond_broadcast -- wait on condition variable, all waiters kicked
+ * mqt_signal_unicast -- wait for signal, one waiter kicked
+ * mqt_signal_broadcast -- wait for signal, all waiters kicked
+ *
+ * For condition variables there is a timeout mechanism so that waiters
+ * are woken up at least every now and then. The message queue maintains
+ * a timeout time and a timeout interval. The timeout time is a qtime_mono_t
+ * time -- so is monotonic.
+ *
+ * When waiting, an explicit timeout may be given, otherwise the stored timeout
+ * will be used:
+ *
+ * wait until explicit/stored timeout
+ * if times out and there is a stored interval:
+ * new stored timeout = stored timeout + stored interval
+ * if new stored timeout < time now
+ * new stored timeout = time now + stored interval
+ *
+ * Left to its own devices, this will produce a regular timeout every interval,
+ * assuming that the queue is waited on within the interval. Otherwise the
+ * "clock" will slip.
+ *
+ * There is a default timeout period. The period may be set "infinite".
+ *
+ * For waiters kicked by signal, the wait does not occur within the message
+ * queue code, but the need for a signal is recorded in the message queue.
+ *
+ *------------------------------------------------------------------------------
+ * Message Blocks and Arguments
+ *
+ * Messages take the form of a small block of information which contain:
+ *
+ * * action -- void action(mqueue_block) message dispatch
+ * * arg0 -- void* argument
+ * * struct args -- embedded argument structure
+ * * argv -- optional array of union of: *void/uintptr_t/intptr_t
+ *
+ * There are set/get functions for action/arguments -- users should not poke
+ * around inside the structure.
+ *
+ * To send a message, first initialise/allocate a message block
+ * (see mqb_init_new), then fill in the arguments and enqueue it.
+ *
+ * NB: arg0 is expected to be used as the "context" for the message -- to
+ * point to some data common to both ends of the conversation.
+ *
+ * For specific revoke, arg0 is assumed to identify the messages to be
+ * revoked.
+ *
+ * NB: the struct args is expected to be a modest sized structure, carrying
+ * the key elements of the message.
+ *
+ * Some other structure must be overlaid on this, in the same way by sender
+ * and receiver of the message. So:
+ *
+ * mqueue_block mqb = mqb_init_new(NULL, arg0, action_func) ;
+ *
+ * struct my_message* args = mqb_get_args(mqb) ;
+ *
+ * allocates mqueue block, filling in arg0 and the action func. Then
+ * args can be used to fill in a "struct my_message" form of args.
+ *
+ * NB: the sizeof(struct my_message) MUST BE <= mqb_args_size_max !!!
+ *
+ * The argv is an optional, flexible list/array of optional array of
+ * union of: *void/uintptr_t/intptr_t -- see mqb_arg_t et al.
+ *
+ * May set any number of arguments in argv.
+ *
+ * A count of arguments is maintained, and is the highest index set + 1. That
+ * count can be fetched. (So there is no need to maintain it separately.)
+ *
+ * May get any argument by its index -- but it is a fatal error to attempt to
+ * access a non-existent argument (one beyond the known count).
+ *
+ * There is support for pushing values onto the argv "list" and for iterating
+ * along the "list". May also push and pop entire arrays of items.
+ *
+ *==============================================================================
+ * Local Queues
+ *
+ * A local queue may be used within a thread to requeue messages for later
+ * processing.
+ *
+ * Local queues are simple FIFO queues.
+ */
+
+/*==============================================================================
+ * Message Block allocation statics
+ *
+ * Once a message block is allocated it is not deallocated, but kept ready
+ * for future use.
+ *
+ * Keeps a count of free message blocks. (Could at some later date reduce the
+ * number of free message blocks if it is known that some burst of messages has
+ * now passed.)
+ */
+
+static pthread_mutex_t mqb_mutex ; /* for allocation of mqueue blocks */
+
+static mqueue_block mqb_free_list = NULL ;
+static unsigned mqb_free_count = 0 ;
+
+/*==============================================================================
+ * Initialise and shut down Message Queue and Message Block handling
+ */
+
+/*------------------------------------------------------------------------------
+ * Initialise Message Queue handling.
+ *
+ * Must be called before any qpt_threads are started.
+ *
+ * Freezes qpthreads_enabled.
+ */
+extern void
+mqueue_initialise(void)
+{
+ if (qpthreads_enabled_freeze)
+ qpt_mutex_init_new(&mqb_mutex, qpt_mutex_quagga) ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * Shut down Message Queue handling.
+ *
+ * Release all resources used.
+ *
+ * NB: all pthreads must have stopped -- mutex must be free and no further
+ * uses may be made.
+ */
+extern void
+mqueue_finish(void)
+{
+ mqueue_block mqb ;
+
+ while ((mqb = mqb_free_list) != NULL)
+ {
+ assert(mqb_free_count != 0) ;
+ mqb_free_count-- ;
+ mqb_free_list = mqb->next ;
+ XFREE(MTYPE_MQUEUE_BLOCK, mqb) ;
+ } ;
+
+ assert(mqb_free_count == 0) ;
+
+ qpt_mutex_destroy_keep(&mqb_mutex) ;
+} ;
+
+/*==============================================================================
+ * Initialisation etc. for Message Queue
+ *
+ */
+
+/*------------------------------------------------------------------------------
+ * Initialise new Message Queue, if required (mq == NULL) allocating it.
+ *
+ * For mqt_cond_xxx type queues, sets the default timeout interval and the
+ * initial timeout time to now + that interval.
+ *
+ * NB: once any message queue has been initialised, it is TOO LATE to enable
+ * qpthreads.
+ */
+extern mqueue_queue
+mqueue_init_new(mqueue_queue mq, enum mqueue_queue_type type)
+{
+ if (mq == NULL)
+ mq = XCALLOC(MTYPE_MQUEUE_QUEUE, sizeof(struct mqueue_queue)) ;
+ else
+ memset(mq, 0, sizeof(struct mqueue_queue)) ;
+
+ if (qpt_freeze_qpthreads_enabled())
+ qpt_mutex_init_new(&mq->mutex, qpt_mutex_quagga) ;
+
+ /* head, tail and tail_priority set NULL already */
+ /* count set zero already */
+ /* waiters set zero already */
+
+ mq->type = type ;
+ switch (type)
+ {
+ case mqt_cond_unicast:
+ case mqt_cond_broadcast:
+ qpt_cond_init_new(&mq->kick.cond.wait_here, qpt_cond_quagga) ;
+
+ if (MQUEUE_DEFAULT_INTERVAL != 0)
+ {
+ mq->kick.cond.interval = MQUEUE_DEFAULT_INTERVAL ;
+ mq->kick.cond.timeout = qt_get_monotonic()
+ + MQUEUE_DEFAULT_INTERVAL ;
+ } ;
+ break;
+
+ case mqt_signal_unicast:
+ case mqt_signal_broadcast:
+ /* head/tail pointers set NULL already */
+ break;
+
+ default:
+ zabort("Invalid mqueue queue type") ;
+ } ;
+
+ return mq ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * Empty message queue -- by revoking everything.
+ *
+ * Leaves queue ready for continued use with all existing settings.
+ *
+ * If there were any waiters, they are still waiting.
+ */
+extern void
+mqueue_empty(mqueue_queue mq)
+{
+ mqueue_revoke(mq, NULL) ;
+
+ assert((mq->head == NULL) && (mq->count == 0)) ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * Reset message queue -- empty it out by revoking everything.
+ *
+ * Frees the structure if required, and returns NULL.
+ * Otherwise zeroises the structure, and returns address of same.
+ *
+ * NB: there MUST NOT be ANY waiters !
+ */
+extern mqueue_queue
+mqueue_reset(mqueue_queue mq, int free_structure)
+{
+ mqueue_empty(mq) ;
+
+ passert(mq->waiters == 0) ;
+
+ qpt_mutex_destroy_keep(&mq->mutex) ;
+
+ switch (mq->type)
+ {
+ case mqt_cond_unicast:
+ case mqt_cond_broadcast:
+ qpt_cond_destroy_keep(&mq->kick.cond.wait_here) ;
+ break;
+
+ case mqt_signal_unicast:
+ case mqt_signal_broadcast:
+ passert(mq->kick.signal.head == NULL) ;
+ break;
+
+ default:
+ zabort("Invalid mqueue queue type") ;
+ } ;
+
+ if (free_structure)
+ XFREE(MTYPE_MQUEUE_QUEUE, mq) ; /* sets mq == NULL */
+ else
+ memset(mq, 0, sizeof(struct mqueue_queue)) ;
+
+ return mq ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * Initialise new Local Message Queue, if required (lmq == NULL) allocating it.
+ *
+ * Returns address of Local Message Queue
+ */
+extern mqueue_local_queue
+mqueue_local_init_new(mqueue_local_queue lmq)
+{
+ if (lmq == NULL)
+ lmq = XCALLOC(MTYPE_MQUEUE_QUEUE, sizeof(struct mqueue_local_queue)) ;
+ else
+ memset(lmq, 0, sizeof(struct mqueue_local_queue)) ;
+
+ /* Zeroising the structure is enough to initialise:
+ *
+ * * head -- NULL
+ * * tail -- NULL
+ */
+
+ return lmq ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * Reset Local Message Queue, and if required free it.
+ *
+ * Dequeues entries and dispatches them "mqb_destroy", to empty the queue.
+ *
+ * See: mqueue_local_reset_keep(lmq)
+ * mqueue_local_reset_free(lmq)
+ *
+ * Returns address of Local Message Queue
+ */
+extern mqueue_local_queue
+mqueue_local_reset(mqueue_local_queue lmq, int free_structure)
+{
+ mqueue_block mqb ;
+
+ while ((mqb = lmq->head) != NULL)
+ {
+ lmq->head = mqb->next ;
+ mqb_dispatch_destroy(mqb) ;
+ } ;
+
+ if (free_structure)
+ XFREE(MTYPE_MQUEUE_QUEUE, lmq) ; /* sets lmq = NULL */
+ else
+ memset(lmq, 0, sizeof(struct mqueue_local_queue)) ;
+
+ return lmq ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * Set new timeout interval (or unset by setting <= 0)
+ *
+ * Sets the next timeout to be the time now + new interval (or never).
+ *
+ * This is a waste of time if !qpthreads_enabled, but does no harm. The
+ * timeout is ignored.
+ */
+extern void
+mqueue_set_timeout_interval(mqueue_queue mq, qtime_t interval)
+{
+ qpt_mutex_lock(&mq->mutex) ;
+
+ dassert( (mq->type == mqt_cond_unicast) ||
+ (mq->type == mqt_cond_broadcast) ) ;
+
+ mq->kick.cond.interval = interval ;
+ mq->kick.cond.timeout = (interval > 0) ? qt_add_monotonic(interval)
+ : 0 ;
+ qpt_mutex_unlock(&mq->mutex) ;
+} ;
+
+/*==============================================================================
+ * Message Block memory management.
+ *
+ * Allocates message block structures when required.
+ *
+ * Places those structures on the free list when they are freed.
+ *
+ * Keeps a count of free structures. (Could at some later date reduce the
+ * number of free structures if it is known that some burst of messages has
+ * now passed.)
+ *
+ * mqueue_initialise MUST be called before the first message block is allocated.
+ */
+
+inline static size_t mqb_argv_size(mqb_index_t alloc)
+{
+ return alloc * sizeof(mqb_arg_t) ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * Initialise message block (allocate if required) and set action & arg0.
+ *
+ * Zeroises the struct args.
+ *
+ * Returns address of message block.
+ */
+extern mqueue_block
+mqb_init_new(mqueue_block mqb, mqueue_action action, void* arg0)
+{
+ if (mqb == NULL)
+ {
+ qpt_mutex_lock(&mqb_mutex) ; /*<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<*/
+
+ mqb = mqb_free_list ;
+ if (mqb == NULL)
+ {
+ dassert(mqb_free_count == 0) ;
+ mqb = XMALLOC(MTYPE_MQUEUE_BLOCK, sizeof(struct mqueue_block)) ;
+ }
+ else
+ {
+ dassert(mqb_free_count >= 0) ;
+ mqb_free_list = mqb->next ;
+ --mqb_free_count ;
+ } ;
+
+ qpt_mutex_unlock(&mqb_mutex) ; /*>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>*/
+ } ;
+
+ memset(mqb, 0, sizeof(struct mqueue_block)) ;
+
+ mqb->action = action ;
+ mqb->arg0 = arg0 ;
+
+ /* Zeroising the mqb sets:
+ *
+ * next -- NULL
+ *
+ * args -- zeroised
+ *
+ * argv -- NULL -- empty list/array
+ *
+ * argv_count -- 0 -- empty
+ * argv_alloc -- 0 -- nothing allocated
+ * argv_next -- 0 -- iterator reset
+ */
+
+ return mqb ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * Re-initialise message block (or allocate if required) and set action & arg0.
+ *
+ * NB: preserves any existing argv, but empties it.
+ *
+ * NB: it is the caller's responsibility to free the value of any argument that
+ * requires it.
+ */
+extern mqueue_block
+mqb_re_init(mqueue_block mqb, mqueue_action action, void* arg0)
+{
+ mqb_index_t argv_alloc ;
+ mqb_arg_t* argv ;
+
+ /* Exactly mqb_init_new if mqb is NULL */
+ if (mqb == NULL)
+ return mqb_init_new(NULL, action, arg0) ;
+
+ /* Otherwise, need to put argv to one side first */
+ argv = mqb->argv ;
+ argv_alloc = mqb->argv_alloc ;
+
+ mqb_init_new(mqb, action, arg0) ;
+
+ /* Now zeroize the argv, and restore it */
+ memset(argv, 0, mqb_argv_size(argv_alloc)) ;
+
+ mqb->argv = argv ;
+ mqb->argv_alloc = argv_alloc ;
+
+ return mqb ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * Free message block when done with it.
+ *
+ * Frees any argv argument vector.
+ *
+ * NB: it is the caller's responsibility to free the value of any argument that
+ * requires it.
+ */
+extern void
+mqb_free(mqueue_block mqb)
+{
+ if (mqb->argv != NULL)
+ XFREE(MTYPE_MQUEUE_BLOCK_ARGV, mqb->argv) ;
+
+ qpt_mutex_lock(&mqb_mutex) ; /*<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<*/
+
+ mqb->next = mqb_free_list ;
+ mqb_free_list = mqb ;
+ ++mqb_free_count ;
+
+ qpt_mutex_unlock(&mqb_mutex) ; /*>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>*/
+} ;
+
+/*==============================================================================
+ * Enqueue and dequeue messages.
+ */
+
+static void mqueue_kick_signal(mqueue_queue mq, unsigned n) ;
+static void mqueue_dequeue_signal(mqueue_queue mq, mqueue_thread_signal mtsig) ;
+
+/*------------------------------------------------------------------------------
+ * Enqueue message.
+ *
+ * If priority, will enqueue after any previously enqueued priority
+ * messages. (See enum: mqb_priority and mqb_ordinary.)
+ *
+ * If there are any waiters, then we kick one or all of them.
+ *
+ * Note that we decrement or zero the waiters count here -- because if the
+ * waiter did it, they might not run before something else is enqueued.
+ * Similarly, if the kick uses a signal, the signal block is dequeued here.
+ *
+ * The waiter count is only incremented when a dequeue is attempted and the
+ * queue is empty, then:
+ *
+ * for a broadcast type message queue, the first message that arrives will
+ * kick all the waiters into action.
+ *
+ * for a signal type message queue, each message that arrives will kick one
+ * waiter.
+ *
+ * If mq is NULL, the message is not queued but is immediately destroyed.
+ *
+ * NB: this works perfectly well if !qpthreads enabled. Of course, there can
+ * never be any waiters... so no kicking is ever done.
+ */
+extern void
+mqueue_enqueue(mqueue_queue mq, mqueue_block mqb, enum mqb_rank priority)
+{
+ if (mq == NULL)
+ return mqb_dispatch_destroy(mqb) ;
+
+ qpt_mutex_lock(&mq->mutex) ;
+
+ if (mq->head == NULL)
+ {
+ assert(mq->count == 0) ;
+ mqb->next = NULL ;
+ mq->head = mqb ;
+ mq->tail_priority = priority ? mqb : NULL ;
+ mq->tail = mqb ;
+ }
+ else
+ {
+ assert(mq->count > 0) ;
+ if (priority)
+ {
+ mqueue_block after = mq->tail_priority ;
+ if (after == NULL)
+ {
+ mqb->next = mq->head ;
+ mq->head = mqb ;
+ /* mq non-empty, enchain at head, therefore tail unaffected */
+ }
+ else
+ {
+ mqb->next = after->next ;
+ after->next = mqb ;
+ /* if only have priority messages then fix tail */
+ if (mq->tail == after)
+ mq->tail = mqb;
+ }
+ mq->tail_priority = mqb ;
+ }
+ else
+ {
+ dassert(mq->tail != NULL) ;
+ mqb->next = NULL ;
+ mq->tail->next = mqb ;
+ mq->tail = mqb ;
+ } ;
+ } ;
+
+ ++mq->count ;
+
+ if (mq->waiters != 0)
+ {
+ dassert(qpthreads_enabled) ; /* waiters == 0 if !qpthreads_enabled */
+
+ switch (mq->type)
+ {
+ case mqt_cond_unicast:
+ qpt_cond_signal(&mq->kick.cond.wait_here) ;
+ --mq->waiters ;
+ break ;
+
+ case mqt_cond_broadcast:
+ qpt_cond_broadcast(&mq->kick.cond.wait_here) ;
+ mq->waiters = 0 ;
+ break ;
+
+ case mqt_signal_unicast:
+ mqueue_kick_signal(mq, 1) ; /* pick off first and kick it (MUST be */
+ /* one) and decrement the waiters count */
+ break ;
+
+ case mqt_signal_broadcast:
+ mqueue_kick_signal(mq, mq->waiters) ;
+ dassert(mq->kick.signal.head == NULL) ;
+ break;
+
+ default:
+ zabort("Invalid mqueue queue type") ;
+ } ;
+ } ;
+
+ qpt_mutex_unlock(&mq->mutex) ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * Dequeue message.
+ *
+ * If the queue is empty and wait != 0 (and qpthreads_enabled), will wait for a
+ * message. In which case for:
+ *
+ * * mqt_cond_xxxx type message queues, will wait on the condition variable,
+ * and may timeout.
+ *
+ * If the argument is NULL, uses the already set up timeout, if there is
+ * one.
+ *
+ * If the argument is not NULL, it is a pointer to a qtime_mono_t time,
+ * to be used as the new timeout time.
+ *
+ * * mqt_signal_xxxx type message queues, will register the given signal
+ * (mtsig argument MUST be provided), and return immediately.
+ *
+ * NB: if !qpthreads_enabled, will not wait on the queue. No how.
+ *
+ * Note this means that waiters == 0 all the time if !qpthreads_enabled !
+ *
+ * NB: the argument is ignored if !wait or !qpthreads_enabled, so may be NULL.
+ *
+ * Returns a message block if one is available. (And not otherwise.)
+ *
+ * NB: if mq is NULL, returns NULL -- nothing available
+ */
+extern mqueue_block
+mqueue_dequeue(mqueue_queue mq, int wait, void* arg)
+{
+ mqueue_block mqb ;
+ mqueue_thread_signal last ;
+
+ mqueue_thread_signal mtsig ;
+ qtime_mono_t timeout_time ;
+
+ if (mq == NULL)
+ return NULL ;
+
+ qpt_mutex_lock(&mq->mutex) ; /*<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<*/
+
+ while (1)
+ {
+ mqb = mq->head ;
+ if (mqb != NULL)
+ break ; /* Easy if queue not empty */
+
+ assert(mq->count == 0) ;
+
+ if (!wait || !qpthreads_enabled)
+ goto done ; /* Easy if not waiting ! mqb == NULL */
+ /* Short circuit if !qpthreads_enabled */
+
+ ++mq->waiters ; /* Another waiter */
+
+ switch (mq->type)
+ {
+ case mqt_cond_unicast: /* Now wait here */
+ case mqt_cond_broadcast:
+ if ((arg == NULL) && (mq->kick.cond.interval <= 0))
+ qpt_cond_wait(&mq->kick.cond.wait_here, &mq->mutex) ;
+ else
+ {
+ timeout_time = (arg != NULL) ? *(qtime_mono_t*)arg
+ : mq->kick.cond.timeout ;
+
+ if (qpt_cond_timedwait(&mq->kick.cond.wait_here, &mq->mutex,
+ timeout_time) == 0)
+ {
+ /* Timed out -- update timeout time, if required */
+ if (mq->kick.cond.interval > 0)
+ {
+ qtime_mono_t now = qt_get_monotonic() ;
+ timeout_time = mq->kick.cond.timeout
+ + mq->kick.cond.interval ;
+ if (timeout_time < now)
+ timeout_time = now + mq->kick.cond.interval ;
+
+ mq->kick.cond.timeout = timeout_time ;
+ } ;
+
+ goto done ; /* immediate return. mqb == NULL */
+ } ;
+ } ;
+ break ;
+
+ case mqt_signal_unicast: /* Register desire for signal */
+ case mqt_signal_broadcast:
+ mtsig = arg ;
+ dassert(mtsig != NULL) ;
+
+ if (mq->kick.signal.head == NULL)
+ {
+ mq->kick.signal.head = mtsig ;
+ mtsig->prev = (void*)mq ;
+ }
+ else
+ {
+ last = mq->kick.signal.tail ;
+ last->next = mtsig ;
+ mtsig->prev = last ;
+ }
+ mtsig->next = NULL ;
+ mq->kick.signal.tail = mtsig ;
+
+ goto done ; /* BUT do not wait ! mqb == NULL */
+
+ default:
+ zabort("Invalid mqueue queue type") ;
+ } ;
+ } ;
+
+ /* Have something to pull off the queue */
+
+ assert(mq->count > 0) ;
+ --mq->count ;
+
+ mq->head = mqb->next ;
+
+ /* fix tails if at tail */
+ if (mqb == mq->tail)
+ mq->tail = NULL ;
+ if (mqb == mq->tail_priority)
+ mq->tail_priority = NULL ;
+
+done:
+ qpt_mutex_unlock(&mq->mutex) ; /*>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>*/
+
+ return mqb ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * Revoke message(s)
+ *
+ * Revokes all messages, or only messages whose arg0 matches the given value.
+ * (If the given value is NULL revokes everything.)
+ *
+ * Revokes by calling mqb_dispatch_destroy().
+ *
+ * During a revoke() operation more items may be enqueued, but no other mqueue
+ * operations may be performed. Enqueued items may promptly be revoked, except
+ * for priority items if the revoke operation has already moved past the last
+ * priority item.
+ *
+ * If mq is NULL, does nothing.
+ */
+extern void
+mqueue_revoke(mqueue_queue mq, void* arg0)
+{
+ mqueue_block mqb ;
+ mqueue_block prev ;
+
+ if (mq == NULL)
+ return ;
+
+ qpt_mutex_lock(&mq->mutex) ; /*<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<*/
+
+ prev = NULL ;
+ while (1)
+ {
+ if (prev == NULL)
+ mqb = mq->head ;
+ else
+ mqb = prev->next ;
+
+ if (mqb == NULL)
+ break ;
+
+ if ((arg0 == NULL) || (arg0 == mqb->arg0))
+ {
+ assert(mq->count > 0) ;
+
+ if (prev == NULL)
+ mq->head = mqb->next ;
+ else
+ prev->next = mqb->next ;
+
+ if (mqb == mq->tail)
+ mq->tail = prev ;
+
+ if (mqb == mq->tail_priority)
+ mq->tail_priority = prev ;
+
+ --mq->count ;
+
+ qpt_mutex_unlock(&mq->mutex) ;
+ mqb_dispatch_destroy(mqb) ;
+ qpt_mutex_lock(&mq->mutex) ;
+ }
+ else
+ prev = mqb ;
+ } ;
+
+ qpt_mutex_unlock(&mq->mutex) ; /*>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>*/
+} ;
+
+/*------------------------------------------------------------------------------
+ * No longer waiting for a signal -- does nothing if !qpthreads_enabled.
+ *
+ * Returns true <=> signal has been kicked
+ *
+ * (Signal will never be kicked if !qpthreads_enabled.)
+ */
+extern int
+mqueue_done_waiting(mqueue_queue mq, mqueue_thread_signal mtsig)
+{
+ int kicked ;
+
+ if (!qpthreads_enabled)
+ return 0 ;
+
+ qpt_mutex_lock(&mq->mutex) ;
+
+ dassert( (mq->type == mqt_signal_unicast) ||
+ (mq->type == mqt_signal_broadcast) ) ;
+ dassert(mtsig != NULL) ;
+
+ /* When the thread is signalled, the prev entry is set NULL and the */
+ /* waiters count is decremented. */
+ /* */
+ /* So, only need to do something here if the prev is not NULL (ie the */
+ /* mqueue_thread_signal is still on the list. */
+
+ kicked = (mtsig->prev == NULL) ;
+
+ if (!kicked)
+ mqueue_dequeue_signal(mq, mtsig) ;
+
+ qpt_mutex_unlock(&mq->mutex) ;
+
+ return kicked ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * Enqueue message on local queue -- at tail
+ */
+extern void
+mqueue_local_enqueue(mqueue_local_queue lmq, mqueue_block mqb)
+{
+ if (lmq->head == NULL)
+ lmq->head = mqb ;
+ else
+ lmq->tail->next = mqb ;
+ lmq->tail = mqb ;
+ mqb->next = NULL ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * Enqueue message on local queue -- at head
+ */
+extern void
+mqueue_local_enqueue_head(mqueue_local_queue lmq, mqueue_block mqb)
+{
+ if (lmq->head == NULL)
+ lmq->tail = mqb ;
+
+ mqb->next = lmq->head ;
+ lmq->head = mqb ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * Dequeue message from local queue -- returns NULL if empty
+ */
+extern mqueue_block
+mqueue_local_dequeue(mqueue_local_queue lmq)
+{
+ mqueue_block mqb = lmq->head ;
+
+ if (mqb != NULL)
+ lmq->head = mqb->next ;
+
+ return mqb ;
+} ;
+
+/*==============================================================================
+ * Message queue signal handling
+ */
+
+/*------------------------------------------------------------------------------
+ * Initialise a message queue signal structure (struct mqueue_thread_signal).
+ * Allocate one if required.
+ *
+ * If !pthreads_enabled, then this structure is entirely redundant, but there
+ * is no harm in creating it -- but the signal will never be used.
+ *
+ * Returns address of the structure.
+ */
+extern mqueue_thread_signal
+mqueue_thread_signal_init(mqueue_thread_signal mqt, qpt_thread_t thread,
+ int signum)
+{
+ if (mqt == NULL)
+ mqt = XCALLOC(MTYPE_MQUEUE_THREAD_SIGNAL,
+ sizeof(struct mqueue_thread_signal)) ;
+ else
+ memset(mqt, 0, sizeof(struct mqueue_thread_signal)) ;
+
+ /* next and prev fields set to NULL already. */
+
+ mqt->qpthread = thread ;
+ mqt->signum = signum ;
+
+ return mqt ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * Reset a message queue signal structure and release if required.
+ *
+ * NB: MUST NOT be queued as a waiter anywhere !!
+ *
+ * Frees the structure if required, and returns NULL.
+ * Otherwise zeroises the structure, and returns address of same.
+ */
+extern mqueue_thread_signal
+mqueue_thread_signal_reset(mqueue_thread_signal mqt, int free_structure)
+{
+ passert(mqt->prev == NULL) ;
+
+ if (free_structure)
+ XFREE(MTYPE_MQUEUE_THREAD_SIGNAL, mqt) ; /* sets mqt = NULL */
+ else
+ memset(mqt, 0, sizeof(struct mqueue_thread_signal)) ;
+
+ return mqt ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * Signal the first 'n' threads on the to be signalled list.
+ *
+ * Removes the threads from the list and reduces the waiters count.
+ *
+ * NB: must be qpthreads_enabled with at least 'n' waiters.
+ *
+ * NB: sets the prev entry in the mqueue_thread_signal block to NULL, so that
+ * the thread can tell that its signal has been kicked.
+ *
+ * NB: *** MUST own the mqueue_queue mutex. ***
+ */
+static void
+mqueue_kick_signal(mqueue_queue mq, unsigned n)
+{
+ mqueue_thread_signal mtsig ;
+
+ dassert( (qpthreads_enabled) && (mq->waiters >= n) ) ;
+ while (n--)
+ {
+ mqueue_dequeue_signal(mq, mtsig = mq->kick.signal.head) ;
+ qpt_thread_signal(mtsig->qpthread, mtsig->signum) ;
+ } ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * Remove given signal from given message queue.
+ *
+ * NB: sets the prev entry in the mqueue_thread_signal block to NULL, so that
+ * the thread can tell that its signal has been kicked.
+ *
+ * NB: *** MUST own the mqueue_queue mutex. ***
+ */
+static void
+mqueue_dequeue_signal(mqueue_queue mq, mqueue_thread_signal mtsig)
+{
+ mqueue_thread_signal next ;
+ mqueue_thread_signal prev ;
+
+ next = mtsig->next ;
+ prev = mtsig->prev ;
+
+ if (prev == (void*)mq) /* marker for head of list */
+ {
+ dassert(mq->kick.signal.head == mtsig) ;
+ mq->kick.signal.head = next ;
+ }
+ else
+ {
+ dassert((prev != NULL) && (prev->next == mtsig)) ;
+ prev->next = next ;
+ } ;
+
+ if (next != NULL)
+ next->prev = prev ;
+
+ mtsig->next = NULL ;
+ mtsig->prev = NULL ; /* essential to show signal kicked */
+ --mq->waiters ; /* one fewer waiter */
+
+ dassert( ((mq->kick.signal.head == NULL) && (mq->waiters == 0)) ||
+ ((mq->kick.signal.head != NULL) && (mq->waiters != 0)) ) ;
+} ;
+
+/*==============================================================================
+ * Message Queue Block Argument Handling
+ */
+
+static void mqb_argv_extend(mqueue_block mqb, mqb_index_t iv) ;
+
+/*------------------------------------------------------------------------------
+ * Get pointer to argv[iv] -- extending if required
+ */
+inline static mqb_arg_t*
+mqb_p_arg_set(mqueue_block mqb, mqb_index_t iv)
+{
+ if (iv >= mqb->argv_count)
+ {
+ if (iv >= mqb->argv_alloc)
+ mqb_argv_extend(mqb, iv) ;
+ mqb->argv_count = iv + 1 ;
+ } ;
+
+ return &mqb->argv[iv] ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * Set pointer argv[iv] to given value.
+ */
+extern void
+mqb_set_argv_p(mqueue_block mqb, mqb_index_t iv, mqb_ptr_t p)
+{
+ mqb_arg_t* p_arg = mqb_p_arg_set(mqb, iv) ;
+ p_arg->p = p ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * Set integer argv[iv] to given value.
+ */
+extern void
+mqb_set_argv_i(mqueue_block mqb, mqb_index_t iv, mqb_int_t i)
+{
+ mqb_arg_t* p_arg = mqb_p_arg_set(mqb, iv) ;
+ p_arg->i = i ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * Set unsigned integer argv[iv] to given value.
+ */
+extern void
+mqb_set_argv_u(mqueue_block mqb, mqb_index_t iv, mqb_uint_t u)
+{
+ mqb_arg_t* p_arg = mqb_p_arg_set(mqb, iv) ;
+ p_arg->u = u ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * Set size of argv[].
+ *
+ * This is entirely optional, and may be used to ensure that at least the given
+ * number of elements have been allocated.
+ *
+ * Does not change the "count". Will not reduce the allocated size.
+ *
+ * Just avoids repeated extensions of argv if it is known that it will become
+ * large.
+ */
+extern void
+mqb_set_argv_size(mqueue_block mqb, unsigned n)
+{
+ if (n > mqb->argv_alloc)
+ mqb_argv_extend(mqb, n - 1) ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * Push a pointer onto the argv "list"
+ */
+extern void
+mqb_push_argv_p(mqueue_block mqb, mqb_ptr_t p)
+{
+ mqb_arg_t* p_arg ;
+
+ p_arg = mqb_p_arg_set(mqb, mqb->argv_count) ;
+ p_arg->p = p ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * Push an integer onto the argv "list"
+ */
+extern void
+mqb_push_argv_i(mqueue_block mqb, mqb_int_t i)
+{
+ mqb_arg_t* p_arg ;
+
+ p_arg = mqb_p_arg_set(mqb, mqb->argv_count) ;
+ p_arg->i = i ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * Push an unsigned integer onto the argv "list"
+ */
+extern void
+mqb_push_argv_u(mqueue_block mqb, mqb_uint_t u)
+{
+ mqb_arg_t* p_arg ;
+
+ p_arg = mqb_p_arg_set(mqb, mqb->argv_count) ;
+ p_arg->u = u ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * Push an array of 'n' void* pointers onto the argv "list"
+ */
+extern void
+mqb_push_argv_array(mqueue_block mqb, unsigned n, void** array)
+{
+ mqb_index_t iv ;
+
+ /* need do nothing if n == 0, get out now to avoid edge cases */
+ if (n == 0)
+ return ;
+
+ /* make sure we are allocated upto and including the last array item */
+ iv = mqb->argv_count ;
+ mqb_set_argv_size(mqb, iv + n - 1) ;
+
+ /* require that mqb_ptr_t values exactly fill mqb_arg_t entries */
+ /* and that mqb_ptr_t values are exactly same as void* values */
+ CONFIRM(sizeof(mqb_ptr_t) == sizeof(mqb_arg_t)) ;
+ CONFIRM(sizeof(mqb_ptr_t) == sizeof(void*)) ;
+
+ /* copy the pointers */
+ memcpy(&mqb->argv[iv], array, sizeof(void*) * n) ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * Get pointer to argv[iv] -- which MUST exist
+ *
+ * NB: it is a FATAL error to reference an argument beyond the last one set.
+ */
+inline static mqb_arg_t*
+mqb_p_arg_get(mqueue_block mqb, mqb_index_t iv)
+{
+ if (iv >= mqb->argv_count)
+ zabort("invalid message block argument index") ;
+
+ return &mqb->argv[iv] ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * Get pointer value of argv[iv]
+ *
+ * NB: it is a FATAL error to reference an argument beyond the last one set.
+ *
+ * mqb_get_argv_count() returns the number of arguments set in argv.
+ */
+extern mqb_ptr_t
+mqb_get_argv_p(mqueue_block mqb, mqb_index_t iv)
+{
+ mqb_arg_t* p_arg = mqb_p_arg_get(mqb, iv) ;
+ return p_arg->p ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * Get integer value of argv[iv]
+ *
+ * NB: it is a FATAL error to reference an argument beyond the last one set.
+ *
+ * mqb_get_argv_count() returns the number of arguments set in argv.
+ */
+extern mqb_int_t
+mqb_get_argv_i(mqueue_block mqb, mqb_index_t iv)
+{
+ mqb_arg_t* p_arg = mqb_p_arg_get(mqb, iv) ;
+ return p_arg->i ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * Get unsigned integer value of argv[iv]
+ *
+ * NB: it is a FATAL error to reference an argument beyond the last one set.
+ *
+ * mqb_get_argv_count() returns the number of arguments set in argv.
+ */
+extern mqb_uint_t
+mqb_get_argv_u(mqueue_block mqb, mqb_index_t iv)
+{
+ mqb_arg_t* p_arg = mqb_p_arg_get(mqb, iv) ;
+ return p_arg->u ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * Get pointer value of next argv "list" argument -- if any.
+ *
+ * There is a "next" counter in the message queue block, which is reset when
+ * the mqb is initialised or re-initialised, and by mqb_reset_argv_next().
+ *
+ * NB: returns NULL if there is no "list" or if already at the end of same.
+ */
+extern mqb_ptr_t
+mqb_next_argv_p(mqueue_block mqb)
+{
+ if (mqb->argv_next >= mqb->argv_count)
+ return NULL ;
+
+ return mqb_get_argv_p(mqb, mqb->argv_next++) ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * Get integer value of next argv "list" argument -- if any.
+ *
+ * There is a "next" counter in the message queue block, which is reset when
+ * the mqb is initialised or re-initialised, and by mqb_reset_argv_next().
+ *
+ * NB: returns 0 if there is no "list" or if already at the end of same.
+ */
+extern mqb_int_t
+mqb_next_argv_i(mqueue_block mqb)
+{
+ if (mqb->argv_next >= mqb->argv_count)
+ return 0 ;
+
+ return mqb_get_argv_i(mqb, mqb->argv_next++) ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * Get unsigned integer value of next argv "list" argument -- if any.
+ *
+ * There is a "next" counter in the message queue block, which is reset when
+ * the mqb is initialised or re-initialised, and by mqb_reset_argv_next().
+ *
+ * NB: returns 0 if there is no "list" or if already at the end of same.
+ */
+extern mqb_uint_t
+mqb_next_argv_u(mqueue_block mqb)
+{
+ if (mqb->argv_next >= mqb->argv_count)
+ return 0 ;
+
+ return mqb_get_argv_u(mqb, mqb->argv_next++) ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * Pop an array of 'n' void* pointers from the argv "list"
+ *
+ * There is a "next" counter in the message queue block, which is reset when
+ * the mqb is initialised or re-initialised, and by mqb_reset_argv_next().
+ *
+ * Treats from "next" to the end of the "list" as an array of void* pointers.
+ *
+ * Creates a temporary void* [] array (MTYPE_TMP), which caller must free.
+ *
+ * NB: returns NULL if there is no "list" or if already at the end of same.
+ */
+extern void**
+mqb_pop_argv_array(mqueue_block mqb)
+{
+ void** array ;
+ unsigned n ;
+
+ mqb_index_t iv = mqb->argv_next ;
+
+ /* worry about state of "next" and get out if nothing to do. */
+ if (iv >= mqb->argv_count)
+ return NULL ;
+
+ /* work out how much to pop and update "next" */
+ n = mqb->argv_count - iv ;
+
+ mqb->argv_next = mqb->argv_count ;
+
+ /* construct target array */
+ array = XMALLOC(MTYPE_TMP, sizeof(void*) * n) ;
+
+ /* require that mqb_ptr_t values exactly fill mqb_arg_t entries */
+ /* and that mqb_ptr_t values are exactly same as void* values */
+ CONFIRM(sizeof(mqb_ptr_t) == sizeof(mqb_arg_t)) ;
+ CONFIRM(sizeof(mqb_ptr_t) == sizeof(void*)) ;
+
+ /* now transfer pointers to the array */
+ memcpy(array, mqb->argv + iv, sizeof(void*) * n) ;
+
+ return array ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * Extend the argv to include at least given iv.
+ *
+ * The number of argv slots allocated is arranged to be a multiple of
+ * mqb_argv_size_unit.
+ *
+ * Ensures that newly created slots are zeroised.
+ */
+static void
+mqb_argv_extend(mqueue_block mqb, mqb_index_t iv)
+{
+ mqb_index_t need ; /* total slots required */
+ mqb_index_t have ;
+
+ have = mqb->argv_alloc ;
+ assert(have <= iv) ;
+
+ need = ((iv / mqb_argv_size_unit) + 1) * mqb_argv_size_unit ;
+
+ if (mqb->argv == NULL)
+ mqb->argv = XCALLOC(MTYPE_MQUEUE_BLOCK_ARGV, mqb_argv_size(need)) ;
+ else
+ {
+ mqb->argv = XREALLOC(MTYPE_MQUEUE_BLOCK_ARGV, mqb->argv,
+ mqb_argv_size(need)) ;
+ memset(&mqb->argv[have], 0, mqb_argv_size(need - have)) ;
+ } ;
+
+ mqb->argv_alloc = need ;
+} ;