summaryrefslogtreecommitdiffstats
path: root/lib/mqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'lib/mqueue.c')
-rw-r--r--lib/mqueue.c161
1 files changed, 129 insertions, 32 deletions
diff --git a/lib/mqueue.c b/lib/mqueue.c
index de1d654c..a6dca32f 100644
--- a/lib/mqueue.c
+++ b/lib/mqueue.c
@@ -155,6 +155,7 @@ mqueue_init_new(mqueue_queue mq, enum mqueue_queue_type type)
qpt_mutex_init_new(&mq->mutex, qpt_mutex_quagga) ;
/* head, tail and tail_priority set NULL already */
+ /* count set zero already */
/* waiters set zero already */
mq->type = type ;
@@ -162,8 +163,8 @@ mqueue_init_new(mqueue_queue mq, enum mqueue_queue_type type)
{
case mqt_cond_unicast:
case mqt_cond_broadcast:
- if (qpthreads_enabled)
- qpt_cond_init_new(&mq->kick.cond.wait_here, qpt_cond_quagga) ;
+ qpt_cond_init_new(&mq->kick.cond.wait_here, qpt_cond_quagga) ;
+
if (MQUEUE_DEFAULT_INTERVAL != 0)
{
mq->kick.cond.interval = MQUEUE_DEFAULT_INTERVAL ;
@@ -185,6 +186,62 @@ mqueue_init_new(mqueue_queue mq, enum mqueue_queue_type type)
} ;
/*------------------------------------------------------------------------------
+ * Empty message queue -- by revoking everything.
+ *
+ * Leaves queue ready for continued use with all existing settings.
+ *
+ * If there were any waiters, they are still waiting.
+ */
+extern void
+mqueue_empty(mqueue_queue mq)
+{
+ mqueue_revoke(mq, NULL) ;
+
+ assert((mq->head == NULL) && (mq->count == 0)) ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * Reset message queue -- empty it out by revoking everything.
+ *
+ * Frees the structure if required, and returns NULL.
+ * Otherwise zeroises the structure, and returns address of same.
+ *
+ * NB: there MUST NOT be ANY waiters !
+ */
+extern mqueue_queue
+mqueue_reset(mqueue_queue mq, int free_structure)
+{
+ mqueue_empty(mq) ;
+
+ passert(mq->waiters == 0) ;
+
+ qpt_mutex_destroy_keep(&mq->mutex) ;
+
+ switch (mq->type)
+ {
+ case mqt_cond_unicast:
+ case mqt_cond_broadcast:
+ qpt_cond_destroy_keep(&mq->kick.cond.wait_here) ;
+ break;
+
+ case mqt_signal_unicast:
+ case mqt_signal_broadcast:
+ passert(mq->kick.signal.head == NULL) ;
+ break;
+
+ default:
+ zabort("Invalid mqueue queue type") ;
+ } ;
+
+ if (free_structure)
+ XFREE(MTYPE_MQUEUE_QUEUE, mq) ; /* sets mq == NULL */
+ else
+ memset(mq, 0, sizeof(struct mqueue_queue)) ;
+
+ return mq ;
+} ;
+
+/*------------------------------------------------------------------------------
* Initialise new Local Message Queue, if required (lmq == NULL) allocating it.
*
* Returns address of Local Message Queue
@@ -426,38 +483,45 @@ mqueue_enqueue(mqueue_queue mq, mqueue_block mqb, int priority)
if (mq->head == NULL)
{
+ assert(mq->count == 0) ;
mqb->next = NULL ;
mq->head = mqb ;
mq->tail_priority = priority ? mqb : NULL ;
mq->tail = mqb ;
}
- else if (priority)
- {
- mqueue_block after = mq->tail_priority ;
- if (after == NULL)
- {
- mqb->next = mq->head ;
- mq->head = mqb ;
- /* mq non-empty, enchain at head, therefore tail unaffected */
- }
- else
- {
- mqb->next = after->next ;
- after->next = mqb ;
- /* if only have priority messages then fix tail */
- if (mq->tail == after)
- mq->tail = mqb;
- }
- mq->tail_priority = mqb ;
- }
else
{
- dassert(mq->tail != NULL) ;
- mqb->next = NULL ;
- mq->tail->next = mqb ;
- mq->tail = mqb ;
+ assert(mq->count > 0) ;
+ if (priority)
+ {
+ mqueue_block after = mq->tail_priority ;
+ if (after == NULL)
+ {
+ mqb->next = mq->head ;
+ mq->head = mqb ;
+ /* mq non-empty, enchain at head, therefore tail unaffected */
+ }
+ else
+ {
+ mqb->next = after->next ;
+ after->next = mqb ;
+ /* if only have priority messages then fix tail */
+ if (mq->tail == after)
+ mq->tail = mqb;
+ }
+ mq->tail_priority = mqb ;
+ }
+ else
+ {
+ dassert(mq->tail != NULL) ;
+ mqb->next = NULL ;
+ mq->tail->next = mqb ;
+ mq->tail = mqb ;
+ } ;
} ;
+ ++mq->count ;
+
if (mq->waiters != 0)
{
dassert(qpthreads_enabled) ; /* waiters == 0 if !qpthreads_enabled */
@@ -527,7 +591,7 @@ mqueue_dequeue(mqueue_queue mq, int wait, void* arg)
mqueue_thread_signal mtsig ;
qtime_mono_t timeout_time ;
- qpt_mutex_lock(&mq->mutex) ;
+ qpt_mutex_lock(&mq->mutex) ; /*<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<*/
while (1)
{
@@ -535,6 +599,8 @@ mqueue_dequeue(mqueue_queue mq, int wait, void* arg)
if (mqb != NULL)
break ; /* Easy if queue not empty */
+ assert(mq->count == 0) ;
+
if (!wait || !qpthreads_enabled)
goto done ; /* Easy if not waiting ! mqb == NULL */
/* Short circuit if !qpthreads_enabled */
@@ -600,6 +666,9 @@ mqueue_dequeue(mqueue_queue mq, int wait, void* arg)
/* Have something to pull off the queue */
+ assert(mq->count > 0) ;
+ --mq->count ;
+
mq->head = mqb->next ;
/* fix tails if at tail */
@@ -609,7 +678,7 @@ mqueue_dequeue(mqueue_queue mq, int wait, void* arg)
mq->tail_priority = NULL ;
done:
- qpt_mutex_unlock(&mq->mutex) ;
+ qpt_mutex_unlock(&mq->mutex) ; /*>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>*/
return mqb ;
} ;
@@ -633,7 +702,7 @@ mqueue_revoke(mqueue_queue mq, void* arg0)
mqueue_block mqb ;
mqueue_block prev ;
- qpt_mutex_lock(&mq->mutex) ;
+ qpt_mutex_lock(&mq->mutex) ; /*<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<*/
prev = NULL ;
while (1)
@@ -648,6 +717,8 @@ mqueue_revoke(mqueue_queue mq, void* arg0)
if ((arg0 == NULL) || (arg0 == mqb->arg0))
{
+ assert(mq->count > 0) ;
+
if (prev == NULL)
mq->head = mqb->next ;
else
@@ -659,6 +730,8 @@ mqueue_revoke(mqueue_queue mq, void* arg0)
if (mqb == mq->tail_priority)
mq->tail_priority = prev ;
+ --mq->count ;
+
qpt_mutex_unlock(&mq->mutex) ;
mqb_dispatch_destroy(mqb) ;
qpt_mutex_lock(&mq->mutex) ;
@@ -667,7 +740,7 @@ mqueue_revoke(mqueue_queue mq, void* arg0)
prev = mqb ;
} ;
- qpt_mutex_unlock(&mq->mutex) ;
+ qpt_mutex_unlock(&mq->mutex) ; /*>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>*/
} ;
/*------------------------------------------------------------------------------
@@ -752,7 +825,8 @@ mqueue_local_dequeue(mqueue_local_queue lmq)
* Message queue signal handling
*/
-/* Initialise a message queue signal structure (struct mqueue_thread_signal).
+/*------------------------------------------------------------------------------
+ * Initialise a message queue signal structure (struct mqueue_thread_signal).
* Allocate one if required.
*
* If !pthreads_enabled, then this structure is entirely redundant, but there
@@ -778,7 +852,29 @@ mqueue_thread_signal_init(mqueue_thread_signal mqt, qpt_thread_t thread,
return mqt ;
} ;
-/* Signal the first 'n' threads on the to be signalled list.
+/*------------------------------------------------------------------------------
+ * Reset a message queue signal structure and release if required.
+ *
+ * NB: MUST NOT be queued as a waiter anywhere !!
+ *
+ * Frees the structure if required, and returns NULL.
+ * Otherwise zeroises the structure, and returns address of same.
+ */
+mqueue_thread_signal
+mqueue_thread_signal_reset(mqueue_thread_signal mqt, int free_structure)
+{
+ passert(mqt->prev == NULL) ;
+
+ if (free_structure)
+ XFREE(MTYPE_MQUEUE_THREAD_SIGNAL, mqt) ; /* sets mqt = NULL */
+ else
+ memset(mqt, 0, sizeof(struct mqueue_thread_signal)) ;
+
+ return mqt ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * Signal the first 'n' threads on the to be signalled list.
*
* Removes the threads from the list and reduces the waiters count.
*
@@ -802,7 +898,8 @@ mqueue_kick_signal(mqueue_queue mq, unsigned n)
} ;
} ;
-/* Remove given signal from given message queue.
+/*------------------------------------------------------------------------------
+ * Remove given signal from given message queue.
*
* NB: sets the prev entry in the mqueue_thread_signal block to NULL, so that
* the thread can tell that its signal has been kicked.