diff options
Diffstat (limited to 'lib/mqueue.c')
-rw-r--r-- | lib/mqueue.c | 184 |
1 files changed, 112 insertions, 72 deletions
diff --git a/lib/mqueue.c b/lib/mqueue.c index 5fe892c2..7b93e4b0 100644 --- a/lib/mqueue.c +++ b/lib/mqueue.c @@ -31,6 +31,9 @@ * A message queue carries messages from one or more qpthreads to one or more * other qpthreads. * + * If !qpthreads_enabled, then a message queue hold messages for the program + * to consume later. There are never any waiters. Timeouts are ignored. + * * A message queue has one ordinary priority queue and one high priority * queue. * @@ -71,12 +74,13 @@ * * context -- identifies the context of the message (see revoke) * * * action -- void action(mqueue_block) message dispatch - * * arg_0 -- *void/uintptr_t/intptr_t ) standard arguments - * * arg_1 -- *void/uintptr_t/intptr_t ) + * * arg0 -- *void/uintptr_t/intptr_t ) standard arguments + * * arg1 -- *void/uintptr_t/intptr_t ) * - * (see struct mqueue_block). + * There are set/get functions for action/arg0/arg1 -- users should not poke + * around inside the structure. * - * To send a message, first allocate a message block (see mqueue_block_new), + * To send a message, first allocate a message block (see mqb_init_new), * then fill in the arguments and enqueue it. * * @@ -92,6 +96,9 @@ * * For mqt_cond_xxx type queues, sets the default timeout interval and the * initial timeout time to now + that interval. + * + * NB: once any message queue has been enabled, it is TOO LATE to enable + * qpthreads. */ mqueue_queue @@ -102,7 +109,8 @@ mqueue_init_new(mqueue_queue mq, enum mqueue_queue_type type) else memset(mq, 0, sizeof(struct mqueue_queue)) ; - qpt_mutex_init(&mq->mutex, qpt_mutex_quagga) ; + if (qpt_freeze_qpthreads_enabled()) + qpt_mutex_init_new(&mq->mutex, qpt_mutex_quagga) ; /* head, tail and tail_priority set NULL already */ /* waiters set zero already */ @@ -112,7 +120,8 @@ mqueue_init_new(mqueue_queue mq, enum mqueue_queue_type type) { case mqt_cond_unicast: case mqt_cond_broadcast: - qpt_cond_init(&mq->kick.cond.wait_here, qpt_cond_quagga) ; + if (qpthreads_enabled) + qpt_cond_init_new(&mq->kick.cond.wait_here, qpt_cond_quagga) ; if (MQUEUE_DEFAULT_INTERVAL != 0) { mq->kick.cond.interval = MQUEUE_DEFAULT_INTERVAL ; @@ -136,6 +145,9 @@ mqueue_init_new(mqueue_queue mq, enum mqueue_queue_type type) /* Set new timeout interval (or unset by setting <= 0) * * Sets the next timeout to be the time now + new interval (or never). + * + * This is a waste of time if !qpthreads_enabled, but does no harm. The + * timeout is ignored. */ void mqueue_set_timeout_interval(mqueue_queue mq, qtime_t interval) @@ -146,7 +158,7 @@ mqueue_set_timeout_interval(mqueue_queue mq, qtime_t interval) (mq->type == mqt_cond_broadcast) ) ; mq->kick.cond.interval = interval ; - mq->kick.cond.timeout = (interval > 0) ? qt_get_monotonic() + interval + mq->kick.cond.timeout = (interval > 0) ? qt_add_monotonic(interval) : 0 ; qpt_mutex_unlock(&mq->mutex) ; } ; @@ -160,55 +172,57 @@ mqueue_set_timeout_interval(mqueue_queue mq, qtime_t interval) * mqueue_initialise MUST be called before the first message block is allocated. */ -static pthread_mutex_t* p_mb_mutex ; /* NULL => no mutex (yet) */ -static pthread_mutex_t mb_mutex ; +static pthread_mutex_t mqb_mutex ; -#define MB_LOT_SIZE 256 +#define MQB_LOT_SIZE 256 -static mqueue_block mb_lot_list = NULL ; -static mqueue_block mb_free_list = NULL ; +static mqueue_block mqb_lot_list = NULL ; +static mqueue_block mqb_free_list = NULL ; static mqueue_block mqueue_block_new_lot(void) ; -/* Get an empty message block +/* Initialise message block (allocate if required) and set action and context. */ mqueue_block -mqueue_block_new(void) +mqb_init_new(mqueue_block mqb, mqueue_action action, mqb_context_t context) { - mqueue_block mb ; + if (mqb == NULL) + { + qpt_mutex_lock(&mqb_mutex) ; - qpt_mutex_lock(&mb_mutex) ; + mqb = mqb_free_list ; + if (mqb == NULL) + mqb = mqueue_block_new_lot() ; - mb = mb_free_list ; - if (mb == NULL) - mb = mqueue_block_new_lot() ; + mqb_free_list = mqb->next ; - mb_free_list = mb->next ; + qpt_mutex_unlock(&mqb_mutex) ; + } ; - qpt_mutex_unlock(&mb_mutex) ; + memset(mqb, 0, sizeof(struct mqueue_block)) ; - memset(mb, 0, sizeof(struct mqueue_block)) ; + mqb->action = action ; + mqb->context = context ; - return mb ; + return mqb ; } ; /* Free message block when done with it. */ void -mqueue_block_free(mqueue_block mb) +mqb_free(mqueue_block mqb) { - qpt_mutex_lock(&mb_mutex) ; + qpt_mutex_lock(&mqb_mutex) ; - mb->next = mb_free_list ; - mb_free_list = mb ; + mqb->next = mqb_free_list ; + mqb_free_list = mqb ; - qpt_mutex_unlock(&mb_mutex) ; + qpt_mutex_unlock(&mqb_mutex) ; } ; /* Make a new lot of empty message_block structures. * - * NB: caller MUST hold the mb_mutex. - * + * NB: caller MUST hold the mqb_mutex. */ static mqueue_block mqueue_block_new_lot(void) @@ -216,12 +230,12 @@ mqueue_block_new_lot(void) mqueue_block first, last, this ; mqueue_block new = XCALLOC(MTYPE_MQUEUE_BLOCKS, - SIZE(struct mqueue_block, MB_LOT_SIZE)) ; + SIZE(struct mqueue_block, MQB_LOT_SIZE)) ; first = &new[1] ; - last = &new[MB_LOT_SIZE - 1] ; + last = &new[MQB_LOT_SIZE - 1] ; - new->next = mb_lot_list ; /* add to list of lots */ - mb_lot_list = new ; + new->next = mqb_lot_list ; /* add to list of lots */ + mqb_lot_list = new ; /* String all the new message_blocks together. */ this = last ; @@ -232,17 +246,17 @@ mqueue_block_new_lot(void) } ; assert(this == first) ; - last->next = mb_free_list ; /* point last at old free list */ - mb_free_list = first ; /* new blocks at head of free list */ + last->next = mqb_free_list ; /* point last at old free list */ + mqb_free_list = first ; /* new blocks at head of free list */ - return mb_free_list ; + return mqb_free_list ; } ; /*============================================================================== * Enqueue and dequeue messages. */ -static void mqueue_kick_signal(mqueue_queue mq, int n) ; +static void mqueue_kick_signal(mqueue_queue mq, unsigned n) ; static void mqueue_dequeue_signal(mqueue_queue mq, mqueue_thread_signal mtsig) ; /* Enqueue message. @@ -264,45 +278,50 @@ static void mqueue_dequeue_signal(mqueue_queue mq, mqueue_thread_signal mtsig) ; * * for a signal type message queue, each message that arrives will kick one * waiter. + * + * NB: this works perfectly well if !qpthreads enabled. Of course, there can + * never be any waiters... so no kicking is ever done. */ void -mqueue_enqueue(mqueue_queue mq, mqueue_block mb, int priority) +mqueue_enqueue(mqueue_queue mq, mqueue_block mqb, int priority) { qpt_mutex_lock(&mq->mutex) ; if (mq->head == NULL) { - mb->next = NULL ; - mq->head = mb ; - mq->tail_priority = priority ? mb : NULL ; - mq->tail = mb ; + mqb->next = NULL ; + mq->head = mqb ; + mq->tail_priority = priority ? mqb : NULL ; + mq->tail = mqb ; } else if (priority) { mqueue_block after = mq->tail_priority ; if (after == NULL) { - mb->next = mq->head ; - mq->head = mb ; + mqb->next = mq->head ; + mq->head = mqb ; } else { - mb->next = after->next ; - after->next = mb ; + mqb->next = after->next ; + after->next = mqb ; } - mq->tail_priority = mb ; + mq->tail_priority = mqb ; } else { dassert(mq->tail != NULL) ; - mb->next = NULL ; - mq->tail->next = mb ; - mq->tail = mb ; + mqb->next = NULL ; + mq->tail->next = mqb ; + mq->tail = mqb ; } ; if (mq->waiters != 0) { + dassert(qpthreads_enabled) ; /* waiters == 0 if !qpthreads_enabled */ + switch (mq->type) { case mqt_cond_unicast: @@ -335,8 +354,8 @@ mqueue_enqueue(mqueue_queue mq, mqueue_block mb, int priority) /* Dequeue message. * - * If the queue is empty and wait != 0, will wait for a message. In which - * case for: + * If the queue is empty and wait != 0 (and qpthreads_enabled), will wait for a + * message. In which case for: * * * mqt_cond_xxxx type message queues, will wait on the condition variable, * and may timeout. @@ -350,13 +369,19 @@ mqueue_enqueue(mqueue_queue mq, mqueue_block mb, int priority) * * mqt_signal_xxxx type message queues, will register the given signal * (mtsig argument MUST be provided), and return immediately. * + * NB: if !qpthreads_enabled, will not wait on the queue. No how. + * + * Note this means that waiters == 0 all the time if !qpthreads_enabled ! + * + * NB: the argument is ignored if !wait or !qpthreads_enabled, so may be NULL. + * * Returns a message block if one is available. (And not otherwise.) */ mqueue_block mqueue_dequeue(mqueue_queue mq, int wait, void* arg) { - mqueue_block mb ; + mqueue_block mqb ; mqueue_thread_signal last ; mqueue_thread_signal mtsig ; @@ -366,12 +391,13 @@ mqueue_dequeue(mqueue_queue mq, int wait, void* arg) while (1) { - mb = mq->head ; - if (mb != NULL) + mqb = mq->head ; + if (mqb != NULL) break ; /* Easy if queue not empty */ - if (!wait) - goto done ; /* Easy if not waiting ! mb == NULL */ + if (!wait || !qpthreads_enabled) + goto done ; /* Easy if not waiting ! mqb == NULL */ + /* Short circuit if !qpthreads_enabled */ ++mq->waiters ; /* Another waiter */ @@ -401,7 +427,7 @@ mqueue_dequeue(mqueue_queue mq, int wait, void* arg) mq->kick.cond.timeout = timeout_time ; } ; - goto done ; /* immediate return. mb == NULL */ + goto done ; /* immediate return. mqb == NULL */ } ; } ; break ; @@ -419,13 +445,13 @@ mqueue_dequeue(mqueue_queue mq, int wait, void* arg) } else { - last->next = mtsig ; + last->next = mtsig ; mtsig->prev = last ; } mtsig->next = NULL ; mq->kick.signal.tail = mtsig ; - goto done ; /* BUT do not wait ! mb == NULL */ + goto done ; /* BUT do not wait ! mqb == NULL */ default: zabort("Invalid mqueue queue type") ; @@ -434,25 +460,30 @@ mqueue_dequeue(mqueue_queue mq, int wait, void* arg) /* Have something to pull off the queue */ - mq->head = mb->next ; - if (mb == mq->tail_priority) + mq->head = mqb->next ; + if (mqb == mq->tail_priority) mq->tail_priority = NULL ; done: qpt_mutex_unlock(&mq->mutex) ; - return mb ; + return mqb ; } ; -/* No longer waiting for a signal. +/* No longer waiting for a signal -- does nothing if !qpthreads_enabled. + * + * Returns true <=> signal has been kicked * - * Returns true <=> signal has been kicked. + * (Signal will never be kicked if !qpthreads_enabled.) */ int mqueue_done_waiting(mqueue_queue mq, mqueue_thread_signal mtsig) { int kicked ; + if (!qpthreads_enabled) + return 0 ; + qpt_mutex_lock(&mq->mutex) ; dassert( (mq->type == mqt_signal_unicast) || @@ -482,6 +513,9 @@ mqueue_done_waiting(mqueue_queue mq, mqueue_thread_signal mtsig) /* Initialise a message queue signal structure (struct mqueue_thread_signal). * Allocate one if required. * + * If !pthreads_enabled, then this structure is entirely redundant, but there + * is no harm in creating it -- but the signal will never be used. + * * Returns address of the structure. */ mqueue_thread_signal @@ -506,16 +540,19 @@ mqueue_thread_signal_init(mqueue_thread_signal mqt, qpt_thread_t thread, * * Removes the threads from the list and reduces the waiters count. * + * NB: must be qpthreads_enabled with at least 'n' waiters. + * * NB: sets the prev entry in the mqueue_thread_signal block to NULL, so that * the thread can tell that its signal has been kicked. * * NB: *** MUST own the mqueue_queue mutex. *** */ static void -mqueue_kick_signal(mqueue_queue mq, int n) +mqueue_kick_signal(mqueue_queue mq, unsigned n) { mqueue_thread_signal mtsig ; + dassert( (qpthreads_enabled) && (mq->waiters >= n) ) ; while (n--) { mqueue_dequeue_signal(mq, mtsig = mq->kick.signal.head) ; @@ -525,6 +562,9 @@ mqueue_kick_signal(mqueue_queue mq, int n) /* Remove given signal from given message queue. * + * NB: sets the prev entry in the mqueue_thread_signal block to NULL, so that + * the thread can tell that its signal has been kicked. + * * NB: *** MUST own the mqueue_queue mutex. *** */ static void @@ -533,8 +573,6 @@ mqueue_dequeue_signal(mqueue_queue mq, mqueue_thread_signal mtsig) mqueue_thread_signal next ; mqueue_thread_signal prev ; - dassert((mq->kick.signal.head != NULL) && (mq->waiters != 0)) ; - next = mtsig->next ; prev = mtsig->prev ; @@ -565,11 +603,13 @@ mqueue_dequeue_signal(mqueue_queue mq, mqueue_thread_signal mtsig) * * Must be called before any qpt_threads are started. * + * Freezes qpthreads_enabled. + * * TODO: how do we shut down message queue handling ? */ void -mqueue_initialise(int qpthreads) +mqueue_initialise(void) { - if (qpthreads) - p_mb_mutex = qpt_mutex_init(&mb_mutex, qpt_mutex_quagga) ; + if (qpthreads_enabled_freeze) + qpt_mutex_init_new(&mqb_mutex, qpt_mutex_quagga) ; } ; |