diff options
Diffstat (limited to 'lib/mqueue.c')
-rw-r--r-- | lib/mqueue.c | 161 |
1 files changed, 129 insertions, 32 deletions
diff --git a/lib/mqueue.c b/lib/mqueue.c index de1d654c..a6dca32f 100644 --- a/lib/mqueue.c +++ b/lib/mqueue.c @@ -155,6 +155,7 @@ mqueue_init_new(mqueue_queue mq, enum mqueue_queue_type type) qpt_mutex_init_new(&mq->mutex, qpt_mutex_quagga) ; /* head, tail and tail_priority set NULL already */ + /* count set zero already */ /* waiters set zero already */ mq->type = type ; @@ -162,8 +163,8 @@ mqueue_init_new(mqueue_queue mq, enum mqueue_queue_type type) { case mqt_cond_unicast: case mqt_cond_broadcast: - if (qpthreads_enabled) - qpt_cond_init_new(&mq->kick.cond.wait_here, qpt_cond_quagga) ; + qpt_cond_init_new(&mq->kick.cond.wait_here, qpt_cond_quagga) ; + if (MQUEUE_DEFAULT_INTERVAL != 0) { mq->kick.cond.interval = MQUEUE_DEFAULT_INTERVAL ; @@ -185,6 +186,62 @@ mqueue_init_new(mqueue_queue mq, enum mqueue_queue_type type) } ; /*------------------------------------------------------------------------------ + * Empty message queue -- by revoking everything. + * + * Leaves queue ready for continued use with all existing settings. + * + * If there were any waiters, they are still waiting. + */ +extern void +mqueue_empty(mqueue_queue mq) +{ + mqueue_revoke(mq, NULL) ; + + assert((mq->head == NULL) && (mq->count == 0)) ; +} ; + +/*------------------------------------------------------------------------------ + * Reset message queue -- empty it out by revoking everything. + * + * Frees the structure if required, and returns NULL. + * Otherwise zeroises the structure, and returns address of same. + * + * NB: there MUST NOT be ANY waiters ! + */ +extern mqueue_queue +mqueue_reset(mqueue_queue mq, int free_structure) +{ + mqueue_empty(mq) ; + + passert(mq->waiters == 0) ; + + qpt_mutex_destroy_keep(&mq->mutex) ; + + switch (mq->type) + { + case mqt_cond_unicast: + case mqt_cond_broadcast: + qpt_cond_destroy_keep(&mq->kick.cond.wait_here) ; + break; + + case mqt_signal_unicast: + case mqt_signal_broadcast: + passert(mq->kick.signal.head == NULL) ; + break; + + default: + zabort("Invalid mqueue queue type") ; + } ; + + if (free_structure) + XFREE(MTYPE_MQUEUE_QUEUE, mq) ; /* sets mq == NULL */ + else + memset(mq, 0, sizeof(struct mqueue_queue)) ; + + return mq ; +} ; + +/*------------------------------------------------------------------------------ * Initialise new Local Message Queue, if required (lmq == NULL) allocating it. * * Returns address of Local Message Queue @@ -426,38 +483,45 @@ mqueue_enqueue(mqueue_queue mq, mqueue_block mqb, int priority) if (mq->head == NULL) { + assert(mq->count == 0) ; mqb->next = NULL ; mq->head = mqb ; mq->tail_priority = priority ? mqb : NULL ; mq->tail = mqb ; } - else if (priority) - { - mqueue_block after = mq->tail_priority ; - if (after == NULL) - { - mqb->next = mq->head ; - mq->head = mqb ; - /* mq non-empty, enchain at head, therefore tail unaffected */ - } - else - { - mqb->next = after->next ; - after->next = mqb ; - /* if only have priority messages then fix tail */ - if (mq->tail == after) - mq->tail = mqb; - } - mq->tail_priority = mqb ; - } else { - dassert(mq->tail != NULL) ; - mqb->next = NULL ; - mq->tail->next = mqb ; - mq->tail = mqb ; + assert(mq->count > 0) ; + if (priority) + { + mqueue_block after = mq->tail_priority ; + if (after == NULL) + { + mqb->next = mq->head ; + mq->head = mqb ; + /* mq non-empty, enchain at head, therefore tail unaffected */ + } + else + { + mqb->next = after->next ; + after->next = mqb ; + /* if only have priority messages then fix tail */ + if (mq->tail == after) + mq->tail = mqb; + } + mq->tail_priority = mqb ; + } + else + { + dassert(mq->tail != NULL) ; + mqb->next = NULL ; + mq->tail->next = mqb ; + mq->tail = mqb ; + } ; } ; + ++mq->count ; + if (mq->waiters != 0) { dassert(qpthreads_enabled) ; /* waiters == 0 if !qpthreads_enabled */ @@ -527,7 +591,7 @@ mqueue_dequeue(mqueue_queue mq, int wait, void* arg) mqueue_thread_signal mtsig ; qtime_mono_t timeout_time ; - qpt_mutex_lock(&mq->mutex) ; + qpt_mutex_lock(&mq->mutex) ; /*<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<*/ while (1) { @@ -535,6 +599,8 @@ mqueue_dequeue(mqueue_queue mq, int wait, void* arg) if (mqb != NULL) break ; /* Easy if queue not empty */ + assert(mq->count == 0) ; + if (!wait || !qpthreads_enabled) goto done ; /* Easy if not waiting ! mqb == NULL */ /* Short circuit if !qpthreads_enabled */ @@ -600,6 +666,9 @@ mqueue_dequeue(mqueue_queue mq, int wait, void* arg) /* Have something to pull off the queue */ + assert(mq->count > 0) ; + --mq->count ; + mq->head = mqb->next ; /* fix tails if at tail */ @@ -609,7 +678,7 @@ mqueue_dequeue(mqueue_queue mq, int wait, void* arg) mq->tail_priority = NULL ; done: - qpt_mutex_unlock(&mq->mutex) ; + qpt_mutex_unlock(&mq->mutex) ; /*>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>*/ return mqb ; } ; @@ -633,7 +702,7 @@ mqueue_revoke(mqueue_queue mq, void* arg0) mqueue_block mqb ; mqueue_block prev ; - qpt_mutex_lock(&mq->mutex) ; + qpt_mutex_lock(&mq->mutex) ; /*<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<*/ prev = NULL ; while (1) @@ -648,6 +717,8 @@ mqueue_revoke(mqueue_queue mq, void* arg0) if ((arg0 == NULL) || (arg0 == mqb->arg0)) { + assert(mq->count > 0) ; + if (prev == NULL) mq->head = mqb->next ; else @@ -659,6 +730,8 @@ mqueue_revoke(mqueue_queue mq, void* arg0) if (mqb == mq->tail_priority) mq->tail_priority = prev ; + --mq->count ; + qpt_mutex_unlock(&mq->mutex) ; mqb_dispatch_destroy(mqb) ; qpt_mutex_lock(&mq->mutex) ; @@ -667,7 +740,7 @@ mqueue_revoke(mqueue_queue mq, void* arg0) prev = mqb ; } ; - qpt_mutex_unlock(&mq->mutex) ; + qpt_mutex_unlock(&mq->mutex) ; /*>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>*/ } ; /*------------------------------------------------------------------------------ @@ -752,7 +825,8 @@ mqueue_local_dequeue(mqueue_local_queue lmq) * Message queue signal handling */ -/* Initialise a message queue signal structure (struct mqueue_thread_signal). +/*------------------------------------------------------------------------------ + * Initialise a message queue signal structure (struct mqueue_thread_signal). * Allocate one if required. * * If !pthreads_enabled, then this structure is entirely redundant, but there @@ -778,7 +852,29 @@ mqueue_thread_signal_init(mqueue_thread_signal mqt, qpt_thread_t thread, return mqt ; } ; -/* Signal the first 'n' threads on the to be signalled list. +/*------------------------------------------------------------------------------ + * Reset a message queue signal structure and release if required. + * + * NB: MUST NOT be queued as a waiter anywhere !! + * + * Frees the structure if required, and returns NULL. + * Otherwise zeroises the structure, and returns address of same. + */ +mqueue_thread_signal +mqueue_thread_signal_reset(mqueue_thread_signal mqt, int free_structure) +{ + passert(mqt->prev == NULL) ; + + if (free_structure) + XFREE(MTYPE_MQUEUE_THREAD_SIGNAL, mqt) ; /* sets mqt = NULL */ + else + memset(mqt, 0, sizeof(struct mqueue_thread_signal)) ; + + return mqt ; +} ; + +/*------------------------------------------------------------------------------ + * Signal the first 'n' threads on the to be signalled list. * * Removes the threads from the list and reduces the waiters count. * @@ -802,7 +898,8 @@ mqueue_kick_signal(mqueue_queue mq, unsigned n) } ; } ; -/* Remove given signal from given message queue. +/*------------------------------------------------------------------------------ + * Remove given signal from given message queue. * * NB: sets the prev entry in the mqueue_thread_signal block to NULL, so that * the thread can tell that its signal has been kicked. |