diff options
Diffstat (limited to 'lib/mqueue.c')
-rw-r--r-- | lib/mqueue.c | 126 |
1 files changed, 76 insertions, 50 deletions
diff --git a/lib/mqueue.c b/lib/mqueue.c index 8fa9fbd5..f816932b 100644 --- a/lib/mqueue.c +++ b/lib/mqueue.c @@ -18,12 +18,10 @@ * Free Software Foundation, Inc., 59 Temple Place - Suite 330, * Boston, MA 02111-1307, USA. */ - -#include <string.h> +#include "misc.h" #include "memory.h" #include "mqueue.h" -#include "zassert.h" /*============================================================================== * These message queues are designed for inter-qpthread communication. @@ -211,7 +209,7 @@ mqueue_init_new(mqueue_queue mq, enum mqueue_queue_type type) memset(mq, 0, sizeof(struct mqueue_queue)) ; if (qpt_freeze_qpthreads_enabled()) - qpt_mutex_init_new(&mq->mutex, qpt_mutex_quagga) ; + qpt_mutex_init_new(mq->mutex, qpt_mutex_quagga) ; /* head, tail and tail_priority set NULL already */ /* count set zero already */ @@ -222,7 +220,7 @@ mqueue_init_new(mqueue_queue mq, enum mqueue_queue_type type) { case mqt_cond_unicast: case mqt_cond_broadcast: - qpt_cond_init_new(&mq->kick.cond.wait_here, qpt_cond_quagga) ; + qpt_cond_init_new(mq->kick.cond.wait_here, qpt_cond_quagga) ; if (MQUEUE_DEFAULT_INTERVAL != 0) { @@ -254,7 +252,7 @@ mqueue_init_new(mqueue_queue mq, enum mqueue_queue_type type) extern void mqueue_empty(mqueue_queue mq) { - mqueue_revoke(mq, NULL) ; + mqueue_revoke(mq, NULL, 0) ; assert((mq->head == NULL) && (mq->count == 0)) ; } ; @@ -268,19 +266,22 @@ mqueue_empty(mqueue_queue mq) * NB: there MUST NOT be ANY waiters ! */ extern mqueue_queue -mqueue_reset(mqueue_queue mq, int free_structure) +mqueue_reset(mqueue_queue mq, free_keep_b free_structure) { + if (mq == NULL) + return NULL ; + mqueue_empty(mq) ; passert(mq->waiters == 0) ; - qpt_mutex_destroy_keep(&mq->mutex) ; + qpt_mutex_destroy_keep(mq->mutex) ; switch (mq->type) { case mqt_cond_unicast: case mqt_cond_broadcast: - qpt_cond_destroy_keep(&mq->kick.cond.wait_here) ; + qpt_cond_destroy_keep(mq->kick.cond.wait_here) ; break; case mqt_signal_unicast: @@ -327,13 +328,10 @@ mqueue_local_init_new(mqueue_local_queue lmq) * * Dequeues entries and dispatches them "mqb_destroy", to empty the queue. * - * See: mqueue_local_reset_keep(lmq) - * mqueue_local_reset_free(lmq) - * * Returns address of Local Message Queue */ extern mqueue_local_queue -mqueue_local_reset(mqueue_local_queue lmq, int free_structure) +mqueue_local_reset(mqueue_local_queue lmq, free_keep_b free_structure) { mqueue_block mqb ; @@ -362,7 +360,7 @@ mqueue_local_reset(mqueue_local_queue lmq, int free_structure) extern void mqueue_set_timeout_interval(mqueue_queue mq, qtime_t interval) { - qpt_mutex_lock(&mq->mutex) ; + qpt_mutex_lock(mq->mutex) ; dassert( (mq->type == mqt_cond_unicast) || (mq->type == mqt_cond_broadcast) ) ; @@ -370,7 +368,7 @@ mqueue_set_timeout_interval(mqueue_queue mq, qtime_t interval) mq->kick.cond.interval = interval ; mq->kick.cond.timeout = (interval > 0) ? qt_add_monotonic(interval) : 0 ; - qpt_mutex_unlock(&mq->mutex) ; + qpt_mutex_unlock(mq->mutex) ; } ; /*============================================================================== @@ -484,9 +482,12 @@ mqb_re_init(mqueue_block mqb, mqueue_action action, void* arg0) * NB: it is the caller's responsibility to free the value of any argument that * requires it. */ -extern void +extern mqueue_block mqb_free(mqueue_block mqb) { + if (mqb == NULL) + return NULL ; + if (mqb->argv != NULL) XFREE(MTYPE_MQUEUE_BLOCK_ARGV, mqb->argv) ; @@ -497,6 +498,8 @@ mqb_free(mqueue_block mqb) ++mqb_free_count ; qpt_mutex_unlock(&mqb_mutex) ; /*>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>*/ + + return NULL ; } ; /*============================================================================== @@ -533,12 +536,12 @@ static void mqueue_dequeue_signal(mqueue_queue mq, mqueue_thread_signal mtsig) ; * never be any waiters... so no kicking is ever done. */ extern void -mqueue_enqueue(mqueue_queue mq, mqueue_block mqb, enum mqb_rank priority) +mqueue_enqueue(mqueue_queue mq, mqueue_block mqb, mqb_rank_b priority) { if (mq == NULL) return mqb_dispatch_destroy(mqb) ; - qpt_mutex_lock(&mq->mutex) ; + qpt_mutex_lock(mq->mutex) ; if (mq->head == NULL) { @@ -588,12 +591,12 @@ mqueue_enqueue(mqueue_queue mq, mqueue_block mqb, enum mqb_rank priority) switch (mq->type) { case mqt_cond_unicast: - qpt_cond_signal(&mq->kick.cond.wait_here) ; + qpt_cond_signal(mq->kick.cond.wait_here) ; --mq->waiters ; break ; case mqt_cond_broadcast: - qpt_cond_broadcast(&mq->kick.cond.wait_here) ; + qpt_cond_broadcast(mq->kick.cond.wait_here) ; mq->waiters = 0 ; break ; @@ -612,7 +615,7 @@ mqueue_enqueue(mqueue_queue mq, mqueue_block mqb, enum mqb_rank priority) } ; } ; - qpt_mutex_unlock(&mq->mutex) ; + qpt_mutex_unlock(mq->mutex) ; } ; /*------------------------------------------------------------------------------ @@ -655,7 +658,7 @@ mqueue_dequeue(mqueue_queue mq, int wait, void* arg) if (mq == NULL) return NULL ; - qpt_mutex_lock(&mq->mutex) ; /*<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<*/ + qpt_mutex_lock(mq->mutex) ; /*<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<*/ while (1) { @@ -676,13 +679,13 @@ mqueue_dequeue(mqueue_queue mq, int wait, void* arg) case mqt_cond_unicast: /* Now wait here */ case mqt_cond_broadcast: if ((arg == NULL) && (mq->kick.cond.interval <= 0)) - qpt_cond_wait(&mq->kick.cond.wait_here, &mq->mutex) ; + qpt_cond_wait(mq->kick.cond.wait_here, mq->mutex) ; else { timeout_time = (arg != NULL) ? *(qtime_mono_t*)arg : mq->kick.cond.timeout ; - if (qpt_cond_timedwait(&mq->kick.cond.wait_here, &mq->mutex, + if (qpt_cond_timedwait(mq->kick.cond.wait_here, mq->mutex, timeout_time) == 0) { /* Timed out -- update timeout time, if required */ @@ -742,7 +745,7 @@ mqueue_dequeue(mqueue_queue mq, int wait, void* arg) mq->tail_priority = NULL ; done: - qpt_mutex_unlock(&mq->mutex) ; /*>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>*/ + qpt_mutex_unlock(mq->mutex) ; /*>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>*/ return mqb ; } ; @@ -755,35 +758,41 @@ done: * * Revokes by calling mqb_dispatch_destroy(). * - * During a revoke() operation more items may be enqueued, but no other mqueue - * operations may be performed. Enqueued items may promptly be revoked, except - * for priority items if the revoke operation has already moved past the last - * priority item. + * NB: for safety, holds the queue locked for the duration of the revoke + * operation. + * + * If the destroy code can handle it, this means that can revoke stuff + * from one thread even though it is usually only dequeued by another. + * + * The danger is that if queues get very long, and many revokes happen, + * may (a) spend a lot of time scanning the message queue, which stops + * other threads as soon as they try to enqueue anything, and (b) if this + * happens a lot, could end up in an O(n^2) thing scanning the message + * queue once for each revoked object type. * * If mq is NULL, does nothing. + * + * If num > 0, stops after revoking that many messages. + * + * Returns: number of messages revoked. */ -extern void -mqueue_revoke(mqueue_queue mq, void* arg0) +extern int +mqueue_revoke(mqueue_queue mq, void* arg0, int num) { mqueue_block mqb ; mqueue_block prev ; + int did ; if (mq == NULL) - return ; + return 0 ; - qpt_mutex_lock(&mq->mutex) ; /*<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<*/ + qpt_mutex_lock(mq->mutex) ; /*<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<*/ + did = 0 ; prev = NULL ; - while (1) + mqb = mq->head ; + while (mqb != NULL) { - if (prev == NULL) - mqb = mq->head ; - else - mqb = prev->next ; - - if (mqb == NULL) - break ; - if ((arg0 == NULL) || (arg0 == mqb->arg0)) { assert(mq->count > 0) ; @@ -800,16 +809,30 @@ mqueue_revoke(mqueue_queue mq, void* arg0) mq->tail_priority = prev ; --mq->count ; + ++did ; + + mqb_dispatch_destroy(mqb) ; + + if (num == 1) + break ; + else if (num > 1) + --num ; - qpt_mutex_unlock(&mq->mutex) ; - mqb_dispatch_destroy(mqb) ; - qpt_mutex_lock(&mq->mutex) ; + if (prev == NULL) + mqb = mq->head ; + else + mqb = prev->next ; } else - prev = mqb ; + { + prev = mqb ; + mqb = mqb->next ; + } ; } ; - qpt_mutex_unlock(&mq->mutex) ; /*>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>*/ + qpt_mutex_unlock(mq->mutex) ; /*>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>*/ + + return did ; } ; /*------------------------------------------------------------------------------ @@ -827,7 +850,7 @@ mqueue_done_waiting(mqueue_queue mq, mqueue_thread_signal mtsig) if (!qpthreads_enabled) return 0 ; - qpt_mutex_lock(&mq->mutex) ; + qpt_mutex_lock(mq->mutex) ; dassert( (mq->type == mqt_signal_unicast) || (mq->type == mqt_signal_broadcast) ) ; @@ -844,7 +867,7 @@ mqueue_done_waiting(mqueue_queue mq, mqueue_thread_signal mtsig) if (!kicked) mqueue_dequeue_signal(mq, mtsig) ; - qpt_mutex_unlock(&mq->mutex) ; + qpt_mutex_unlock(mq->mutex) ; return kicked ; } ; @@ -930,8 +953,11 @@ mqueue_thread_signal_init(mqueue_thread_signal mqt, qpt_thread_t thread, * Otherwise zeroises the structure, and returns address of same. */ extern mqueue_thread_signal -mqueue_thread_signal_reset(mqueue_thread_signal mqt, int free_structure) +mqueue_thread_signal_reset(mqueue_thread_signal mqt, free_keep_b free_structure) { + if (mqt == NULL) + return NULL ; + passert(mqt->prev == NULL) ; if (free_structure) |