diff options
Diffstat (limited to 'lib/mqueue.c')
-rw-r--r-- | lib/mqueue.c | 103 |
1 files changed, 96 insertions, 7 deletions
diff --git a/lib/mqueue.c b/lib/mqueue.c index 5afe902a..1e5e8be5 100644 --- a/lib/mqueue.c +++ b/lib/mqueue.c @@ -85,6 +85,13 @@ * For specific revoke, arg0 is assumed to identify the messages to be * revoked. * + *============================================================================== + * Local Queues + * + * A local queue may be used within a thread to requeue messages for later + * processing. + * + * Local queues are very simple FIFO queues. */ /*============================================================================== @@ -93,7 +100,8 @@ * TODO: how to shut down a message queue... for reset/exit ? */ -/* Initialise new Message Queue, if required (mq == NULL) allocating it. +/*------------------------------------------------------------------------------ + * Initialise new Message Queue, if required (mq == NULL) allocating it. * * For mqt_cond_xxx type queues, sets the default timeout interval and the * initial timeout time to now + that interval. @@ -143,7 +151,59 @@ mqueue_init_new(mqueue_queue mq, enum mqueue_queue_type type) return mq ; } ; -/* Set new timeout interval (or unset by setting <= 0) +/*------------------------------------------------------------------------------ + * Initialise new Local Message Queue, if required (lmq == NULL) allocating it. + * + * Returns address of Local Message Queue + */ +extern mqueue_local_queue +mqueue_local_init_new(mqueue_local_queue lmq) +{ + if (lmq == NULL) + lmq = XCALLOC(MTYPE_MQUEUE_QUEUE, sizeof(struct mqueue_local_queue)) ; + else + memset(lmq, 0, sizeof(struct mqueue_local_queue)) ; + + /* Zeroising the structure is enough to initialise: + * + * * head -- NULL + * * tail -- NULL + */ + + return lmq ; +} ; + +/*------------------------------------------------------------------------------ + * Reset Local Message Queue, and if required free it. + * + * Dequeues entries and dispatches them "mqb_revoke", to empty the queue. + * + * See: mqueue_local_reset_keep(lmq) + * mqueue_local_reset_free(lmq) + * + * Returns address of Local Message Queue + */ +extern mqueue_local_queue +mqueue_local_reset(mqueue_local_queue lmq, int free_structure) +{ + mqueue_block mqb ; + + while ((mqb = lmq->head) != NULL) + { + lmq->head = mqb->next ; + mqueue_dispatch_destroy(mqb) ; + } ; + + if (free_structure) + XFREE(MTYPE_MQUEUE_QUEUE, lmq) ; /* sets lmq = NULL */ + else + memset(lmq, 0, sizeof(struct mqueue_local_queue)) ; + + return lmq ; +} ; + +/*------------------------------------------------------------------------------ + * Set new timeout interval (or unset by setting <= 0) * * Sets the next timeout to be the time now + new interval (or never). * @@ -260,7 +320,8 @@ mqueue_block_new_lot(void) static void mqueue_kick_signal(mqueue_queue mq, unsigned n) ; static void mqueue_dequeue_signal(mqueue_queue mq, mqueue_thread_signal mtsig) ; -/* Enqueue message. +/*------------------------------------------------------------------------------ + * Enqueue message. * * If priority != 0, will enqueue after any previously enqueued priority * messages. @@ -283,7 +344,6 @@ static void mqueue_dequeue_signal(mqueue_queue mq, mqueue_thread_signal mtsig) ; * NB: this works perfectly well if !qpthreads enabled. Of course, there can * never be any waiters... so no kicking is ever done. */ - void mqueue_enqueue(mqueue_queue mq, mqueue_block mqb, int priority) { @@ -353,7 +413,8 @@ mqueue_enqueue(mqueue_queue mq, mqueue_block mqb, int priority) qpt_mutex_unlock(&mq->mutex) ; } ; -/* Dequeue message. +/*------------------------------------------------------------------------------ + * Dequeue message. * * If the queue is empty and wait != 0 (and qpthreads_enabled), will wait for a * message. In which case for: @@ -378,7 +439,6 @@ mqueue_enqueue(mqueue_queue mq, mqueue_block mqb, int priority) * * Returns a message block if one is available. (And not otherwise.) */ - mqueue_block mqueue_dequeue(mqueue_queue mq, int wait, void* arg) { @@ -471,7 +531,8 @@ done: return mqb ; } ; -/* No longer waiting for a signal -- does nothing if !qpthreads_enabled. +/*------------------------------------------------------------------------------ + * No longer waiting for a signal -- does nothing if !qpthreads_enabled. * * Returns true <=> signal has been kicked * @@ -507,6 +568,34 @@ mqueue_done_waiting(mqueue_queue mq, mqueue_thread_signal mtsig) return kicked ; } ; +/*------------------------------------------------------------------------------ + * Enqueue message on local queue + */ +extern void +mqueue_local_enqueue(mqueue_local_queue lmq, mqueue_block mqb) +{ + if (lmq->head == NULL) + lmq->head = mqb ; + else + lmq->tail->next = mqb ; + lmq->tail = mqb ; + mqb->next = NULL ; +} ; + +/*------------------------------------------------------------------------------ + * Dequeue message from local queue -- returns NULL if empty + */ +extern mqueue_block +mqueue_local_dequeue(mqueue_local_queue lmq) +{ + mqueue_block mqb = lmq->head ; + + if (mqb != NULL) + lmq->head = mqb->next ; + + return mqb ; +} ; + /*============================================================================== * Message queue signal handling */ |