diff options
Diffstat (limited to 'lib/mqueue.c')
-rw-r--r-- | lib/mqueue.c | 43 |
1 files changed, 24 insertions, 19 deletions
diff --git a/lib/mqueue.c b/lib/mqueue.c index c7037a64..8cef1ad9 100644 --- a/lib/mqueue.c +++ b/lib/mqueue.c @@ -211,7 +211,7 @@ mqueue_init_new(mqueue_queue mq, enum mqueue_queue_type type) memset(mq, 0, sizeof(struct mqueue_queue)) ; if (qpt_freeze_qpthreads_enabled()) - qpt_mutex_init_new(&mq->mutex, qpt_mutex_quagga) ; + qpt_mutex_init_new(mq->mutex, qpt_mutex_quagga) ; /* head, tail and tail_priority set NULL already */ /* count set zero already */ @@ -222,7 +222,7 @@ mqueue_init_new(mqueue_queue mq, enum mqueue_queue_type type) { case mqt_cond_unicast: case mqt_cond_broadcast: - qpt_cond_init_new(&mq->kick.cond.wait_here, qpt_cond_quagga) ; + qpt_cond_init_new(mq->kick.cond.wait_here, qpt_cond_quagga) ; if (MQUEUE_DEFAULT_INTERVAL != 0) { @@ -277,13 +277,13 @@ mqueue_reset(mqueue_queue mq, free_keep_b free_structure) passert(mq->waiters == 0) ; - qpt_mutex_destroy_keep(&mq->mutex) ; + qpt_mutex_destroy_keep(mq->mutex) ; switch (mq->type) { case mqt_cond_unicast: case mqt_cond_broadcast: - qpt_cond_destroy_keep(&mq->kick.cond.wait_here) ; + qpt_cond_destroy_keep(mq->kick.cond.wait_here) ; break; case mqt_signal_unicast: @@ -362,7 +362,7 @@ mqueue_local_reset(mqueue_local_queue lmq, free_keep_b free_structure) extern void mqueue_set_timeout_interval(mqueue_queue mq, qtime_t interval) { - qpt_mutex_lock(&mq->mutex) ; + qpt_mutex_lock(mq->mutex) ; dassert( (mq->type == mqt_cond_unicast) || (mq->type == mqt_cond_broadcast) ) ; @@ -370,7 +370,7 @@ mqueue_set_timeout_interval(mqueue_queue mq, qtime_t interval) mq->kick.cond.interval = interval ; mq->kick.cond.timeout = (interval > 0) ? qt_add_monotonic(interval) : 0 ; - qpt_mutex_unlock(&mq->mutex) ; + qpt_mutex_unlock(mq->mutex) ; } ; /*============================================================================== @@ -484,9 +484,12 @@ mqb_re_init(mqueue_block mqb, mqueue_action action, void* arg0) * NB: it is the caller's responsibility to free the value of any argument that * requires it. */ -extern void +extern mqueue_block mqb_free(mqueue_block mqb) { + if (mqb == NULL) + return NULL ; + if (mqb->argv != NULL) XFREE(MTYPE_MQUEUE_BLOCK_ARGV, mqb->argv) ; @@ -497,6 +500,8 @@ mqb_free(mqueue_block mqb) ++mqb_free_count ; qpt_mutex_unlock(&mqb_mutex) ; /*>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>*/ + + return NULL ; } ; /*============================================================================== @@ -538,7 +543,7 @@ mqueue_enqueue(mqueue_queue mq, mqueue_block mqb, mqb_rank_b priority) if (mq == NULL) return mqb_dispatch_destroy(mqb) ; - qpt_mutex_lock(&mq->mutex) ; + qpt_mutex_lock(mq->mutex) ; if (mq->head == NULL) { @@ -588,12 +593,12 @@ mqueue_enqueue(mqueue_queue mq, mqueue_block mqb, mqb_rank_b priority) switch (mq->type) { case mqt_cond_unicast: - qpt_cond_signal(&mq->kick.cond.wait_here) ; + qpt_cond_signal(mq->kick.cond.wait_here) ; --mq->waiters ; break ; case mqt_cond_broadcast: - qpt_cond_broadcast(&mq->kick.cond.wait_here) ; + qpt_cond_broadcast(mq->kick.cond.wait_here) ; mq->waiters = 0 ; break ; @@ -612,7 +617,7 @@ mqueue_enqueue(mqueue_queue mq, mqueue_block mqb, mqb_rank_b priority) } ; } ; - qpt_mutex_unlock(&mq->mutex) ; + qpt_mutex_unlock(mq->mutex) ; } ; /*------------------------------------------------------------------------------ @@ -655,7 +660,7 @@ mqueue_dequeue(mqueue_queue mq, int wait, void* arg) if (mq == NULL) return NULL ; - qpt_mutex_lock(&mq->mutex) ; /*<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<*/ + qpt_mutex_lock(mq->mutex) ; /*<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<*/ while (1) { @@ -676,13 +681,13 @@ mqueue_dequeue(mqueue_queue mq, int wait, void* arg) case mqt_cond_unicast: /* Now wait here */ case mqt_cond_broadcast: if ((arg == NULL) && (mq->kick.cond.interval <= 0)) - qpt_cond_wait(&mq->kick.cond.wait_here, &mq->mutex) ; + qpt_cond_wait(mq->kick.cond.wait_here, mq->mutex) ; else { timeout_time = (arg != NULL) ? *(qtime_mono_t*)arg : mq->kick.cond.timeout ; - if (qpt_cond_timedwait(&mq->kick.cond.wait_here, &mq->mutex, + if (qpt_cond_timedwait(mq->kick.cond.wait_here, mq->mutex, timeout_time) == 0) { /* Timed out -- update timeout time, if required */ @@ -742,7 +747,7 @@ mqueue_dequeue(mqueue_queue mq, int wait, void* arg) mq->tail_priority = NULL ; done: - qpt_mutex_unlock(&mq->mutex) ; /*>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>*/ + qpt_mutex_unlock(mq->mutex) ; /*>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>*/ return mqb ; } ; @@ -783,7 +788,7 @@ mqueue_revoke(mqueue_queue mq, void* arg0, int num) if (mq == NULL) return 0 ; - qpt_mutex_lock(&mq->mutex) ; /*<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<*/ + qpt_mutex_lock(mq->mutex) ; /*<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<*/ did = 0 ; prev = NULL ; @@ -827,7 +832,7 @@ mqueue_revoke(mqueue_queue mq, void* arg0, int num) } ; } ; - qpt_mutex_unlock(&mq->mutex) ; /*>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>*/ + qpt_mutex_unlock(mq->mutex) ; /*>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>*/ return did ; } ; @@ -847,7 +852,7 @@ mqueue_done_waiting(mqueue_queue mq, mqueue_thread_signal mtsig) if (!qpthreads_enabled) return 0 ; - qpt_mutex_lock(&mq->mutex) ; + qpt_mutex_lock(mq->mutex) ; dassert( (mq->type == mqt_signal_unicast) || (mq->type == mqt_signal_broadcast) ) ; @@ -864,7 +869,7 @@ mqueue_done_waiting(mqueue_queue mq, mqueue_thread_signal mtsig) if (!kicked) mqueue_dequeue_signal(mq, mtsig) ; - qpt_mutex_unlock(&mq->mutex) ; + qpt_mutex_unlock(mq->mutex) ; return kicked ; } ; |