summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--lib/mqueue.c184
-rw-r--r--lib/mqueue.h181
2 files changed, 273 insertions, 92 deletions
diff --git a/lib/mqueue.c b/lib/mqueue.c
index 5fe892c2..7b93e4b0 100644
--- a/lib/mqueue.c
+++ b/lib/mqueue.c
@@ -31,6 +31,9 @@
* A message queue carries messages from one or more qpthreads to one or more
* other qpthreads.
*
+ * If !qpthreads_enabled, then a message queue hold messages for the program
+ * to consume later. There are never any waiters. Timeouts are ignored.
+ *
* A message queue has one ordinary priority queue and one high priority
* queue.
*
@@ -71,12 +74,13 @@
* * context -- identifies the context of the message (see revoke)
*
* * action -- void action(mqueue_block) message dispatch
- * * arg_0 -- *void/uintptr_t/intptr_t ) standard arguments
- * * arg_1 -- *void/uintptr_t/intptr_t )
+ * * arg0 -- *void/uintptr_t/intptr_t ) standard arguments
+ * * arg1 -- *void/uintptr_t/intptr_t )
*
- * (see struct mqueue_block).
+ * There are set/get functions for action/arg0/arg1 -- users should not poke
+ * around inside the structure.
*
- * To send a message, first allocate a message block (see mqueue_block_new),
+ * To send a message, first allocate a message block (see mqb_init_new),
* then fill in the arguments and enqueue it.
*
*
@@ -92,6 +96,9 @@
*
* For mqt_cond_xxx type queues, sets the default timeout interval and the
* initial timeout time to now + that interval.
+ *
+ * NB: once any message queue has been enabled, it is TOO LATE to enable
+ * qpthreads.
*/
mqueue_queue
@@ -102,7 +109,8 @@ mqueue_init_new(mqueue_queue mq, enum mqueue_queue_type type)
else
memset(mq, 0, sizeof(struct mqueue_queue)) ;
- qpt_mutex_init(&mq->mutex, qpt_mutex_quagga) ;
+ if (qpt_freeze_qpthreads_enabled())
+ qpt_mutex_init_new(&mq->mutex, qpt_mutex_quagga) ;
/* head, tail and tail_priority set NULL already */
/* waiters set zero already */
@@ -112,7 +120,8 @@ mqueue_init_new(mqueue_queue mq, enum mqueue_queue_type type)
{
case mqt_cond_unicast:
case mqt_cond_broadcast:
- qpt_cond_init(&mq->kick.cond.wait_here, qpt_cond_quagga) ;
+ if (qpthreads_enabled)
+ qpt_cond_init_new(&mq->kick.cond.wait_here, qpt_cond_quagga) ;
if (MQUEUE_DEFAULT_INTERVAL != 0)
{
mq->kick.cond.interval = MQUEUE_DEFAULT_INTERVAL ;
@@ -136,6 +145,9 @@ mqueue_init_new(mqueue_queue mq, enum mqueue_queue_type type)
/* Set new timeout interval (or unset by setting <= 0)
*
* Sets the next timeout to be the time now + new interval (or never).
+ *
+ * This is a waste of time if !qpthreads_enabled, but does no harm. The
+ * timeout is ignored.
*/
void
mqueue_set_timeout_interval(mqueue_queue mq, qtime_t interval)
@@ -146,7 +158,7 @@ mqueue_set_timeout_interval(mqueue_queue mq, qtime_t interval)
(mq->type == mqt_cond_broadcast) ) ;
mq->kick.cond.interval = interval ;
- mq->kick.cond.timeout = (interval > 0) ? qt_get_monotonic() + interval
+ mq->kick.cond.timeout = (interval > 0) ? qt_add_monotonic(interval)
: 0 ;
qpt_mutex_unlock(&mq->mutex) ;
} ;
@@ -160,55 +172,57 @@ mqueue_set_timeout_interval(mqueue_queue mq, qtime_t interval)
* mqueue_initialise MUST be called before the first message block is allocated.
*/
-static pthread_mutex_t* p_mb_mutex ; /* NULL => no mutex (yet) */
-static pthread_mutex_t mb_mutex ;
+static pthread_mutex_t mqb_mutex ;
-#define MB_LOT_SIZE 256
+#define MQB_LOT_SIZE 256
-static mqueue_block mb_lot_list = NULL ;
-static mqueue_block mb_free_list = NULL ;
+static mqueue_block mqb_lot_list = NULL ;
+static mqueue_block mqb_free_list = NULL ;
static mqueue_block mqueue_block_new_lot(void) ;
-/* Get an empty message block
+/* Initialise message block (allocate if required) and set action and context.
*/
mqueue_block
-mqueue_block_new(void)
+mqb_init_new(mqueue_block mqb, mqueue_action action, mqb_context_t context)
{
- mqueue_block mb ;
+ if (mqb == NULL)
+ {
+ qpt_mutex_lock(&mqb_mutex) ;
- qpt_mutex_lock(&mb_mutex) ;
+ mqb = mqb_free_list ;
+ if (mqb == NULL)
+ mqb = mqueue_block_new_lot() ;
- mb = mb_free_list ;
- if (mb == NULL)
- mb = mqueue_block_new_lot() ;
+ mqb_free_list = mqb->next ;
- mb_free_list = mb->next ;
+ qpt_mutex_unlock(&mqb_mutex) ;
+ } ;
- qpt_mutex_unlock(&mb_mutex) ;
+ memset(mqb, 0, sizeof(struct mqueue_block)) ;
- memset(mb, 0, sizeof(struct mqueue_block)) ;
+ mqb->action = action ;
+ mqb->context = context ;
- return mb ;
+ return mqb ;
} ;
/* Free message block when done with it.
*/
void
-mqueue_block_free(mqueue_block mb)
+mqb_free(mqueue_block mqb)
{
- qpt_mutex_lock(&mb_mutex) ;
+ qpt_mutex_lock(&mqb_mutex) ;
- mb->next = mb_free_list ;
- mb_free_list = mb ;
+ mqb->next = mqb_free_list ;
+ mqb_free_list = mqb ;
- qpt_mutex_unlock(&mb_mutex) ;
+ qpt_mutex_unlock(&mqb_mutex) ;
} ;
/* Make a new lot of empty message_block structures.
*
- * NB: caller MUST hold the mb_mutex.
- *
+ * NB: caller MUST hold the mqb_mutex.
*/
static mqueue_block
mqueue_block_new_lot(void)
@@ -216,12 +230,12 @@ mqueue_block_new_lot(void)
mqueue_block first, last, this ;
mqueue_block new = XCALLOC(MTYPE_MQUEUE_BLOCKS,
- SIZE(struct mqueue_block, MB_LOT_SIZE)) ;
+ SIZE(struct mqueue_block, MQB_LOT_SIZE)) ;
first = &new[1] ;
- last = &new[MB_LOT_SIZE - 1] ;
+ last = &new[MQB_LOT_SIZE - 1] ;
- new->next = mb_lot_list ; /* add to list of lots */
- mb_lot_list = new ;
+ new->next = mqb_lot_list ; /* add to list of lots */
+ mqb_lot_list = new ;
/* String all the new message_blocks together. */
this = last ;
@@ -232,17 +246,17 @@ mqueue_block_new_lot(void)
} ;
assert(this == first) ;
- last->next = mb_free_list ; /* point last at old free list */
- mb_free_list = first ; /* new blocks at head of free list */
+ last->next = mqb_free_list ; /* point last at old free list */
+ mqb_free_list = first ; /* new blocks at head of free list */
- return mb_free_list ;
+ return mqb_free_list ;
} ;
/*==============================================================================
* Enqueue and dequeue messages.
*/
-static void mqueue_kick_signal(mqueue_queue mq, int n) ;
+static void mqueue_kick_signal(mqueue_queue mq, unsigned n) ;
static void mqueue_dequeue_signal(mqueue_queue mq, mqueue_thread_signal mtsig) ;
/* Enqueue message.
@@ -264,45 +278,50 @@ static void mqueue_dequeue_signal(mqueue_queue mq, mqueue_thread_signal mtsig) ;
*
* for a signal type message queue, each message that arrives will kick one
* waiter.
+ *
+ * NB: this works perfectly well if !qpthreads enabled. Of course, there can
+ * never be any waiters... so no kicking is ever done.
*/
void
-mqueue_enqueue(mqueue_queue mq, mqueue_block mb, int priority)
+mqueue_enqueue(mqueue_queue mq, mqueue_block mqb, int priority)
{
qpt_mutex_lock(&mq->mutex) ;
if (mq->head == NULL)
{
- mb->next = NULL ;
- mq->head = mb ;
- mq->tail_priority = priority ? mb : NULL ;
- mq->tail = mb ;
+ mqb->next = NULL ;
+ mq->head = mqb ;
+ mq->tail_priority = priority ? mqb : NULL ;
+ mq->tail = mqb ;
}
else if (priority)
{
mqueue_block after = mq->tail_priority ;
if (after == NULL)
{
- mb->next = mq->head ;
- mq->head = mb ;
+ mqb->next = mq->head ;
+ mq->head = mqb ;
}
else
{
- mb->next = after->next ;
- after->next = mb ;
+ mqb->next = after->next ;
+ after->next = mqb ;
}
- mq->tail_priority = mb ;
+ mq->tail_priority = mqb ;
}
else
{
dassert(mq->tail != NULL) ;
- mb->next = NULL ;
- mq->tail->next = mb ;
- mq->tail = mb ;
+ mqb->next = NULL ;
+ mq->tail->next = mqb ;
+ mq->tail = mqb ;
} ;
if (mq->waiters != 0)
{
+ dassert(qpthreads_enabled) ; /* waiters == 0 if !qpthreads_enabled */
+
switch (mq->type)
{
case mqt_cond_unicast:
@@ -335,8 +354,8 @@ mqueue_enqueue(mqueue_queue mq, mqueue_block mb, int priority)
/* Dequeue message.
*
- * If the queue is empty and wait != 0, will wait for a message. In which
- * case for:
+ * If the queue is empty and wait != 0 (and qpthreads_enabled), will wait for a
+ * message. In which case for:
*
* * mqt_cond_xxxx type message queues, will wait on the condition variable,
* and may timeout.
@@ -350,13 +369,19 @@ mqueue_enqueue(mqueue_queue mq, mqueue_block mb, int priority)
* * mqt_signal_xxxx type message queues, will register the given signal
* (mtsig argument MUST be provided), and return immediately.
*
+ * NB: if !qpthreads_enabled, will not wait on the queue. No how.
+ *
+ * Note this means that waiters == 0 all the time if !qpthreads_enabled !
+ *
+ * NB: the argument is ignored if !wait or !qpthreads_enabled, so may be NULL.
+ *
* Returns a message block if one is available. (And not otherwise.)
*/
mqueue_block
mqueue_dequeue(mqueue_queue mq, int wait, void* arg)
{
- mqueue_block mb ;
+ mqueue_block mqb ;
mqueue_thread_signal last ;
mqueue_thread_signal mtsig ;
@@ -366,12 +391,13 @@ mqueue_dequeue(mqueue_queue mq, int wait, void* arg)
while (1)
{
- mb = mq->head ;
- if (mb != NULL)
+ mqb = mq->head ;
+ if (mqb != NULL)
break ; /* Easy if queue not empty */
- if (!wait)
- goto done ; /* Easy if not waiting ! mb == NULL */
+ if (!wait || !qpthreads_enabled)
+ goto done ; /* Easy if not waiting ! mqb == NULL */
+ /* Short circuit if !qpthreads_enabled */
++mq->waiters ; /* Another waiter */
@@ -401,7 +427,7 @@ mqueue_dequeue(mqueue_queue mq, int wait, void* arg)
mq->kick.cond.timeout = timeout_time ;
} ;
- goto done ; /* immediate return. mb == NULL */
+ goto done ; /* immediate return. mqb == NULL */
} ;
} ;
break ;
@@ -419,13 +445,13 @@ mqueue_dequeue(mqueue_queue mq, int wait, void* arg)
}
else
{
- last->next = mtsig ;
+ last->next = mtsig ;
mtsig->prev = last ;
}
mtsig->next = NULL ;
mq->kick.signal.tail = mtsig ;
- goto done ; /* BUT do not wait ! mb == NULL */
+ goto done ; /* BUT do not wait ! mqb == NULL */
default:
zabort("Invalid mqueue queue type") ;
@@ -434,25 +460,30 @@ mqueue_dequeue(mqueue_queue mq, int wait, void* arg)
/* Have something to pull off the queue */
- mq->head = mb->next ;
- if (mb == mq->tail_priority)
+ mq->head = mqb->next ;
+ if (mqb == mq->tail_priority)
mq->tail_priority = NULL ;
done:
qpt_mutex_unlock(&mq->mutex) ;
- return mb ;
+ return mqb ;
} ;
-/* No longer waiting for a signal.
+/* No longer waiting for a signal -- does nothing if !qpthreads_enabled.
+ *
+ * Returns true <=> signal has been kicked
*
- * Returns true <=> signal has been kicked.
+ * (Signal will never be kicked if !qpthreads_enabled.)
*/
int
mqueue_done_waiting(mqueue_queue mq, mqueue_thread_signal mtsig)
{
int kicked ;
+ if (!qpthreads_enabled)
+ return 0 ;
+
qpt_mutex_lock(&mq->mutex) ;
dassert( (mq->type == mqt_signal_unicast) ||
@@ -482,6 +513,9 @@ mqueue_done_waiting(mqueue_queue mq, mqueue_thread_signal mtsig)
/* Initialise a message queue signal structure (struct mqueue_thread_signal).
* Allocate one if required.
*
+ * If !pthreads_enabled, then this structure is entirely redundant, but there
+ * is no harm in creating it -- but the signal will never be used.
+ *
* Returns address of the structure.
*/
mqueue_thread_signal
@@ -506,16 +540,19 @@ mqueue_thread_signal_init(mqueue_thread_signal mqt, qpt_thread_t thread,
*
* Removes the threads from the list and reduces the waiters count.
*
+ * NB: must be qpthreads_enabled with at least 'n' waiters.
+ *
* NB: sets the prev entry in the mqueue_thread_signal block to NULL, so that
* the thread can tell that its signal has been kicked.
*
* NB: *** MUST own the mqueue_queue mutex. ***
*/
static void
-mqueue_kick_signal(mqueue_queue mq, int n)
+mqueue_kick_signal(mqueue_queue mq, unsigned n)
{
mqueue_thread_signal mtsig ;
+ dassert( (qpthreads_enabled) && (mq->waiters >= n) ) ;
while (n--)
{
mqueue_dequeue_signal(mq, mtsig = mq->kick.signal.head) ;
@@ -525,6 +562,9 @@ mqueue_kick_signal(mqueue_queue mq, int n)
/* Remove given signal from given message queue.
*
+ * NB: sets the prev entry in the mqueue_thread_signal block to NULL, so that
+ * the thread can tell that its signal has been kicked.
+ *
* NB: *** MUST own the mqueue_queue mutex. ***
*/
static void
@@ -533,8 +573,6 @@ mqueue_dequeue_signal(mqueue_queue mq, mqueue_thread_signal mtsig)
mqueue_thread_signal next ;
mqueue_thread_signal prev ;
- dassert((mq->kick.signal.head != NULL) && (mq->waiters != 0)) ;
-
next = mtsig->next ;
prev = mtsig->prev ;
@@ -565,11 +603,13 @@ mqueue_dequeue_signal(mqueue_queue mq, mqueue_thread_signal mtsig)
*
* Must be called before any qpt_threads are started.
*
+ * Freezes qpthreads_enabled.
+ *
* TODO: how do we shut down message queue handling ?
*/
void
-mqueue_initialise(int qpthreads)
+mqueue_initialise(void)
{
- if (qpthreads)
- p_mb_mutex = qpt_mutex_init(&mb_mutex, qpt_mutex_quagga) ;
+ if (qpthreads_enabled_freeze)
+ qpt_mutex_init_new(&mqb_mutex, qpt_mutex_quagga) ;
} ;
diff --git a/lib/mqueue.h b/lib/mqueue.h
index 5c10911b..6fb519d4 100644
--- a/lib/mqueue.h
+++ b/lib/mqueue.h
@@ -25,34 +25,43 @@
#include "qpthreads.h"
#include "qtime.h"
+#ifndef Inline
+#define Inline static inline
+#endif
+
/*==============================================================================
*/
typedef struct mqueue_block* mqueue_block ;
-typedef uint32_t mqueue_flags_t ;
-typedef uint32_t mqueue_context_t ;
+typedef uint32_t mqb_flags_t ;
+typedef uint32_t mqb_context_t ;
+
+typedef void* mqb_ptr_t ;
+typedef intptr_t mqb_int_t ;
+typedef uintptr_t mqb_uint_t ;
+
typedef union
{
- void* p ;
- uintptr_t u ;
- intptr_t i ;
-} mqueue_arg_t ;
+ mqb_ptr_t p ;
+ mqb_int_t i ;
+ mqb_uint_t u ;
+} mqb_arg_t ;
-typedef void mqueue_action(mqueue_block) ;
+typedef void mqueue_action(mqueue_block mqb) ;
struct mqueue_block
{
- mqueue_block next ; /* single linked list -- see ... */
+ mqueue_block next ; /* single linked list */
mqueue_action* action ; /* for message dispatch */
- mqueue_flags_t flags ; /* for message handler */
+ mqb_flags_t flags ; /* for message handler */
- mqueue_context_t context ; /* for message revoke */
+ mqb_context_t context ; /* for message revoke */
- mqueue_arg_t arg0 ; /* may be pointer to more data or integer */
- mqueue_arg_t arg1 ; /* may be pointer to more data or integer */
+ mqb_arg_t arg0 ; /* may be pointer or integer */
+ mqb_arg_t arg1 ; /* may be pointer or integer */
} ;
typedef struct mqueue_thread_signal* mqueue_thread_signal ;
@@ -91,11 +100,11 @@ typedef struct mqueue_queue* mqueue_queue ;
struct mqueue_queue
{
- qpt_mutex_t mutex ;
+ qpt_mutex_t mutex ;
- mqueue_block head ; /* NULL => list is empty */
- mqueue_block tail_priority ; /* last priority message (if any & not empty) */
- mqueue_block tail ; /* last message (if not empty) */
+ mqueue_block head ; /* NULL => list is empty */
+ mqueue_block tail_priority ; /* last priority message (if any & not empty) */
+ mqueue_block tail ; /* last message (if not empty) */
enum mqueue_queue_type type ;
@@ -112,7 +121,7 @@ struct mqueue_queue
*/
void
-mqueue_initialise(int qpthreads) ;
+mqueue_initialise(void) ;
mqueue_queue
mqueue_init_new(mqueue_queue mq, enum mqueue_queue_type type) ;
@@ -124,13 +133,15 @@ mqueue_thread_signal
mqueue_thread_signal_init(mqueue_thread_signal mqt, qpt_thread_t thread,
int signum) ;
mqueue_block
-mqueue_block_new(void) ;
+mqb_init_new(mqueue_block mqb, mqueue_action action, mqb_context_t context) ;
+
+#define mqb_new(action, context) mqb_init_new(NULL, action, context)
void
-mqueue_block_free(mqueue_block mb) ;
+mqb_free(mqueue_block mqb) ;
void
-mqueue_enqueue(mqueue_queue mq, mqueue_block mb, int priority) ;
+mqueue_enqueue(mqueue_queue mq, mqueue_block mqb, int priority) ;
mqueue_block
mqueue_dequeue(mqueue_queue mq, int wait, void* arg) ;
@@ -138,4 +149,134 @@ mqueue_dequeue(mqueue_queue mq, int wait, void* arg) ;
int
mqueue_done_waiting(mqueue_queue mq, mqueue_thread_signal mtsig) ;
+/*==============================================================================
+ * Access functions for mqueue_block fields -- mqb_set_xxx/mqb_get_xxx
+ *
+ * Users should not poke around inside the mqueue_block structure.
+ */
+
+Inline void mqb_set_action(mqueue_block mqb, mqueue_action action) ;
+Inline void mqb_set_context(mqueue_block mqb, mqb_context_t context) ;
+
+Inline void mqb_set_arg0_p(mqueue_block mqb, mqb_ptr_t p) ;
+Inline void mqb_set_arg0_i(mqueue_block mqb, mqb_int_t i) ;
+Inline void mqb_set_arg0_u(mqueue_block mqb, mqb_uint_t u) ;
+Inline void mqb_set_arg1_p(mqueue_block mqb, mqb_ptr_t p) ;
+Inline void mqb_set_arg1_i(mqueue_block mqb, mqb_int_t i) ;
+Inline void mqb_set_arg1_u(mqueue_block mqb, mqb_uint_t u) ;
+
+Inline void mqb_dispatch(mqueue_block mqb) ;
+Inline mqb_context_t mqb_qet_context(mqueue_block mqb) ;
+
+Inline mqb_ptr_t mqb_get_arg0_p(mqueue_block mqb) ;
+Inline mqb_int_t mqb_get_arg0_i(mqueue_block mqb) ;
+Inline mqb_uint_t mqb_get_arg0_u(mqueue_block mqb) ;
+Inline mqb_ptr_t mqb_get_arg1_p(mqueue_block mqb) ;
+Inline mqb_int_t mqb_get_arg1_i(mqueue_block mqb) ;
+Inline mqb_uint_t mqb_get_arg1_u(mqueue_block mqb) ;
+
+/*==============================================================================
+ * The Inline functions.
+ */
+
+/* Set operations. */
+
+Inline void
+mqb_set_action(mqueue_block mqb, mqueue_action action)
+{
+ mqb->action = action ;
+} ;
+
+Inline void
+mqb_set_context(mqueue_block mqb, mqb_context_t context)
+{
+ mqb->context = context ;
+} ;
+
+Inline void
+mqb_set_arg0_p(mqueue_block mqb, mqb_ptr_t p)
+{
+ mqb->arg0.p = p ;
+} ;
+
+Inline void
+mqb_set_arg0_i(mqueue_block mqb, mqb_int_t i)
+{
+ mqb->arg0.i = i ;
+} ;
+
+Inline void
+mqb_set_arg0_u(mqueue_block mqb, mqb_uint_t u)
+{
+ mqb->arg0.u = u ;
+} ;
+
+Inline void
+mqb_set_arg1_p(mqueue_block mqb, mqb_ptr_t p)
+{
+ mqb->arg1.p = p ;
+} ;
+
+Inline void
+mqb_set_arg1_i(mqueue_block mqb, mqb_int_t i)
+{
+ mqb->arg1.i = i ;
+} ;
+
+Inline void
+mqb_set_arg1_u(mqueue_block mqb, mqb_uint_t u)
+{
+ mqb->arg1.u = u ;
+} ;
+
+/* Get operations */
+
+Inline void
+mqb_dispatch(mqueue_block mqb)
+{
+ mqb->action(mqb) ;
+} ;
+
+Inline mqb_context_t
+mqb_qet_context(mqueue_block mqb)
+{
+ return mqb->context ;
+} ;
+
+Inline mqb_ptr_t
+mqb_get_arg0_p(mqueue_block mqb)
+{
+ return mqb->arg0.p ;
+} ;
+
+Inline mqb_int_t
+mqb_get_arg0_i(mqueue_block mqb)
+{
+ return mqb->arg0.i ;
+} ;
+
+Inline mqb_uint_t
+mqb_get_arg0_u(mqueue_block mqb)
+{
+ return mqb->arg0.u ;
+} ;
+
+Inline mqb_ptr_t
+mqb_get_arg1_p(mqueue_block mqb)
+{
+ return mqb->arg1.p ;
+} ;
+
+Inline mqb_int_t
+mqb_get_arg1_i(mqueue_block mqb)
+{
+ return mqb->arg1.i ;
+} ;
+
+Inline mqb_uint_t
+mqb_get_arg1_u(mqueue_block mqb)
+{
+ return mqb->arg1.u ;
+} ;
+
#endif /* _ZEBRA_MQUEUE_H */