summaryrefslogtreecommitdiffstats
path: root/lib/mqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'lib/mqueue.c')
-rw-r--r--lib/mqueue.c79
1 files changed, 51 insertions, 28 deletions
diff --git a/lib/mqueue.c b/lib/mqueue.c
index 8fa9fbd5..c7037a64 100644
--- a/lib/mqueue.c
+++ b/lib/mqueue.c
@@ -254,7 +254,7 @@ mqueue_init_new(mqueue_queue mq, enum mqueue_queue_type type)
extern void
mqueue_empty(mqueue_queue mq)
{
- mqueue_revoke(mq, NULL) ;
+ mqueue_revoke(mq, NULL, 0) ;
assert((mq->head == NULL) && (mq->count == 0)) ;
} ;
@@ -268,8 +268,11 @@ mqueue_empty(mqueue_queue mq)
* NB: there MUST NOT be ANY waiters !
*/
extern mqueue_queue
-mqueue_reset(mqueue_queue mq, int free_structure)
+mqueue_reset(mqueue_queue mq, free_keep_b free_structure)
{
+ if (mq == NULL)
+ return NULL ;
+
mqueue_empty(mq) ;
passert(mq->waiters == 0) ;
@@ -327,13 +330,10 @@ mqueue_local_init_new(mqueue_local_queue lmq)
*
* Dequeues entries and dispatches them "mqb_destroy", to empty the queue.
*
- * See: mqueue_local_reset_keep(lmq)
- * mqueue_local_reset_free(lmq)
- *
* Returns address of Local Message Queue
*/
extern mqueue_local_queue
-mqueue_local_reset(mqueue_local_queue lmq, int free_structure)
+mqueue_local_reset(mqueue_local_queue lmq, free_keep_b free_structure)
{
mqueue_block mqb ;
@@ -533,7 +533,7 @@ static void mqueue_dequeue_signal(mqueue_queue mq, mqueue_thread_signal mtsig) ;
* never be any waiters... so no kicking is ever done.
*/
extern void
-mqueue_enqueue(mqueue_queue mq, mqueue_block mqb, enum mqb_rank priority)
+mqueue_enqueue(mqueue_queue mq, mqueue_block mqb, mqb_rank_b priority)
{
if (mq == NULL)
return mqb_dispatch_destroy(mqb) ;
@@ -755,35 +755,41 @@ done:
*
* Revokes by calling mqb_dispatch_destroy().
*
- * During a revoke() operation more items may be enqueued, but no other mqueue
- * operations may be performed. Enqueued items may promptly be revoked, except
- * for priority items if the revoke operation has already moved past the last
- * priority item.
+ * NB: for safety, holds the queue locked for the duration of the revoke
+ * operation.
+ *
+ * If the destroy code can handle it, this means that can revoke stuff
+ * from one thread even though it is usually only dequeued by another.
+ *
+ * The danger is that if queues get very long, and many revokes happen,
+ * may (a) spend a lot of time scanning the message queue, which stops
+ * other threads as soon as they try to enqueue anything, and (b) if this
+ * happens a lot, could end up in an O(n^2) thing scanning the message
+ * queue once for each revoked object type.
*
* If mq is NULL, does nothing.
+ *
+ * If num > 0, stops after revoking that many messages.
+ *
+ * Returns: number of messages revoked.
*/
-extern void
-mqueue_revoke(mqueue_queue mq, void* arg0)
+extern int
+mqueue_revoke(mqueue_queue mq, void* arg0, int num)
{
mqueue_block mqb ;
mqueue_block prev ;
+ int did ;
if (mq == NULL)
- return ;
+ return 0 ;
qpt_mutex_lock(&mq->mutex) ; /*<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<*/
+ did = 0 ;
prev = NULL ;
- while (1)
+ mqb = mq->head ;
+ while (mqb != NULL)
{
- if (prev == NULL)
- mqb = mq->head ;
- else
- mqb = prev->next ;
-
- if (mqb == NULL)
- break ;
-
if ((arg0 == NULL) || (arg0 == mqb->arg0))
{
assert(mq->count > 0) ;
@@ -800,16 +806,30 @@ mqueue_revoke(mqueue_queue mq, void* arg0)
mq->tail_priority = prev ;
--mq->count ;
+ ++did ;
+
+ mqb_dispatch_destroy(mqb) ;
- qpt_mutex_unlock(&mq->mutex) ;
- mqb_dispatch_destroy(mqb) ;
- qpt_mutex_lock(&mq->mutex) ;
+ if (num == 1)
+ break ;
+ else if (num > 1)
+ --num ;
+
+ if (prev == NULL)
+ mqb = mq->head ;
+ else
+ mqb = prev->next ;
}
else
- prev = mqb ;
+ {
+ prev = mqb ;
+ mqb = mqb->next ;
+ } ;
} ;
qpt_mutex_unlock(&mq->mutex) ; /*>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>*/
+
+ return did ;
} ;
/*------------------------------------------------------------------------------
@@ -930,8 +950,11 @@ mqueue_thread_signal_init(mqueue_thread_signal mqt, qpt_thread_t thread,
* Otherwise zeroises the structure, and returns address of same.
*/
extern mqueue_thread_signal
-mqueue_thread_signal_reset(mqueue_thread_signal mqt, int free_structure)
+mqueue_thread_signal_reset(mqueue_thread_signal mqt, free_keep_b free_structure)
{
+ if (mqt == NULL)
+ return NULL ;
+
passert(mqt->prev == NULL) ;
if (free_structure)