diff options
Diffstat (limited to 'lib/mqueue.c')
-rw-r--r-- | lib/mqueue.c | 79 |
1 files changed, 51 insertions, 28 deletions
diff --git a/lib/mqueue.c b/lib/mqueue.c index 8fa9fbd5..c7037a64 100644 --- a/lib/mqueue.c +++ b/lib/mqueue.c @@ -254,7 +254,7 @@ mqueue_init_new(mqueue_queue mq, enum mqueue_queue_type type) extern void mqueue_empty(mqueue_queue mq) { - mqueue_revoke(mq, NULL) ; + mqueue_revoke(mq, NULL, 0) ; assert((mq->head == NULL) && (mq->count == 0)) ; } ; @@ -268,8 +268,11 @@ mqueue_empty(mqueue_queue mq) * NB: there MUST NOT be ANY waiters ! */ extern mqueue_queue -mqueue_reset(mqueue_queue mq, int free_structure) +mqueue_reset(mqueue_queue mq, free_keep_b free_structure) { + if (mq == NULL) + return NULL ; + mqueue_empty(mq) ; passert(mq->waiters == 0) ; @@ -327,13 +330,10 @@ mqueue_local_init_new(mqueue_local_queue lmq) * * Dequeues entries and dispatches them "mqb_destroy", to empty the queue. * - * See: mqueue_local_reset_keep(lmq) - * mqueue_local_reset_free(lmq) - * * Returns address of Local Message Queue */ extern mqueue_local_queue -mqueue_local_reset(mqueue_local_queue lmq, int free_structure) +mqueue_local_reset(mqueue_local_queue lmq, free_keep_b free_structure) { mqueue_block mqb ; @@ -533,7 +533,7 @@ static void mqueue_dequeue_signal(mqueue_queue mq, mqueue_thread_signal mtsig) ; * never be any waiters... so no kicking is ever done. */ extern void -mqueue_enqueue(mqueue_queue mq, mqueue_block mqb, enum mqb_rank priority) +mqueue_enqueue(mqueue_queue mq, mqueue_block mqb, mqb_rank_b priority) { if (mq == NULL) return mqb_dispatch_destroy(mqb) ; @@ -755,35 +755,41 @@ done: * * Revokes by calling mqb_dispatch_destroy(). * - * During a revoke() operation more items may be enqueued, but no other mqueue - * operations may be performed. Enqueued items may promptly be revoked, except - * for priority items if the revoke operation has already moved past the last - * priority item. + * NB: for safety, holds the queue locked for the duration of the revoke + * operation. + * + * If the destroy code can handle it, this means that can revoke stuff + * from one thread even though it is usually only dequeued by another. + * + * The danger is that if queues get very long, and many revokes happen, + * may (a) spend a lot of time scanning the message queue, which stops + * other threads as soon as they try to enqueue anything, and (b) if this + * happens a lot, could end up in an O(n^2) thing scanning the message + * queue once for each revoked object type. * * If mq is NULL, does nothing. + * + * If num > 0, stops after revoking that many messages. + * + * Returns: number of messages revoked. */ -extern void -mqueue_revoke(mqueue_queue mq, void* arg0) +extern int +mqueue_revoke(mqueue_queue mq, void* arg0, int num) { mqueue_block mqb ; mqueue_block prev ; + int did ; if (mq == NULL) - return ; + return 0 ; qpt_mutex_lock(&mq->mutex) ; /*<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<*/ + did = 0 ; prev = NULL ; - while (1) + mqb = mq->head ; + while (mqb != NULL) { - if (prev == NULL) - mqb = mq->head ; - else - mqb = prev->next ; - - if (mqb == NULL) - break ; - if ((arg0 == NULL) || (arg0 == mqb->arg0)) { assert(mq->count > 0) ; @@ -800,16 +806,30 @@ mqueue_revoke(mqueue_queue mq, void* arg0) mq->tail_priority = prev ; --mq->count ; + ++did ; + + mqb_dispatch_destroy(mqb) ; - qpt_mutex_unlock(&mq->mutex) ; - mqb_dispatch_destroy(mqb) ; - qpt_mutex_lock(&mq->mutex) ; + if (num == 1) + break ; + else if (num > 1) + --num ; + + if (prev == NULL) + mqb = mq->head ; + else + mqb = prev->next ; } else - prev = mqb ; + { + prev = mqb ; + mqb = mqb->next ; + } ; } ; qpt_mutex_unlock(&mq->mutex) ; /*>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>*/ + + return did ; } ; /*------------------------------------------------------------------------------ @@ -930,8 +950,11 @@ mqueue_thread_signal_init(mqueue_thread_signal mqt, qpt_thread_t thread, * Otherwise zeroises the structure, and returns address of same. */ extern mqueue_thread_signal -mqueue_thread_signal_reset(mqueue_thread_signal mqt, int free_structure) +mqueue_thread_signal_reset(mqueue_thread_signal mqt, free_keep_b free_structure) { + if (mqt == NULL) + return NULL ; + passert(mqt->prev == NULL) ; if (free_structure) |