summaryrefslogtreecommitdiffstats
path: root/lib/mqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'lib/mqueue.c')
-rw-r--r--lib/mqueue.c43
1 files changed, 24 insertions, 19 deletions
diff --git a/lib/mqueue.c b/lib/mqueue.c
index c7037a64..8cef1ad9 100644
--- a/lib/mqueue.c
+++ b/lib/mqueue.c
@@ -211,7 +211,7 @@ mqueue_init_new(mqueue_queue mq, enum mqueue_queue_type type)
memset(mq, 0, sizeof(struct mqueue_queue)) ;
if (qpt_freeze_qpthreads_enabled())
- qpt_mutex_init_new(&mq->mutex, qpt_mutex_quagga) ;
+ qpt_mutex_init_new(mq->mutex, qpt_mutex_quagga) ;
/* head, tail and tail_priority set NULL already */
/* count set zero already */
@@ -222,7 +222,7 @@ mqueue_init_new(mqueue_queue mq, enum mqueue_queue_type type)
{
case mqt_cond_unicast:
case mqt_cond_broadcast:
- qpt_cond_init_new(&mq->kick.cond.wait_here, qpt_cond_quagga) ;
+ qpt_cond_init_new(mq->kick.cond.wait_here, qpt_cond_quagga) ;
if (MQUEUE_DEFAULT_INTERVAL != 0)
{
@@ -277,13 +277,13 @@ mqueue_reset(mqueue_queue mq, free_keep_b free_structure)
passert(mq->waiters == 0) ;
- qpt_mutex_destroy_keep(&mq->mutex) ;
+ qpt_mutex_destroy_keep(mq->mutex) ;
switch (mq->type)
{
case mqt_cond_unicast:
case mqt_cond_broadcast:
- qpt_cond_destroy_keep(&mq->kick.cond.wait_here) ;
+ qpt_cond_destroy_keep(mq->kick.cond.wait_here) ;
break;
case mqt_signal_unicast:
@@ -362,7 +362,7 @@ mqueue_local_reset(mqueue_local_queue lmq, free_keep_b free_structure)
extern void
mqueue_set_timeout_interval(mqueue_queue mq, qtime_t interval)
{
- qpt_mutex_lock(&mq->mutex) ;
+ qpt_mutex_lock(mq->mutex) ;
dassert( (mq->type == mqt_cond_unicast) ||
(mq->type == mqt_cond_broadcast) ) ;
@@ -370,7 +370,7 @@ mqueue_set_timeout_interval(mqueue_queue mq, qtime_t interval)
mq->kick.cond.interval = interval ;
mq->kick.cond.timeout = (interval > 0) ? qt_add_monotonic(interval)
: 0 ;
- qpt_mutex_unlock(&mq->mutex) ;
+ qpt_mutex_unlock(mq->mutex) ;
} ;
/*==============================================================================
@@ -484,9 +484,12 @@ mqb_re_init(mqueue_block mqb, mqueue_action action, void* arg0)
* NB: it is the caller's responsibility to free the value of any argument that
* requires it.
*/
-extern void
+extern mqueue_block
mqb_free(mqueue_block mqb)
{
+ if (mqb == NULL)
+ return NULL ;
+
if (mqb->argv != NULL)
XFREE(MTYPE_MQUEUE_BLOCK_ARGV, mqb->argv) ;
@@ -497,6 +500,8 @@ mqb_free(mqueue_block mqb)
++mqb_free_count ;
qpt_mutex_unlock(&mqb_mutex) ; /*>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>*/
+
+ return NULL ;
} ;
/*==============================================================================
@@ -538,7 +543,7 @@ mqueue_enqueue(mqueue_queue mq, mqueue_block mqb, mqb_rank_b priority)
if (mq == NULL)
return mqb_dispatch_destroy(mqb) ;
- qpt_mutex_lock(&mq->mutex) ;
+ qpt_mutex_lock(mq->mutex) ;
if (mq->head == NULL)
{
@@ -588,12 +593,12 @@ mqueue_enqueue(mqueue_queue mq, mqueue_block mqb, mqb_rank_b priority)
switch (mq->type)
{
case mqt_cond_unicast:
- qpt_cond_signal(&mq->kick.cond.wait_here) ;
+ qpt_cond_signal(mq->kick.cond.wait_here) ;
--mq->waiters ;
break ;
case mqt_cond_broadcast:
- qpt_cond_broadcast(&mq->kick.cond.wait_here) ;
+ qpt_cond_broadcast(mq->kick.cond.wait_here) ;
mq->waiters = 0 ;
break ;
@@ -612,7 +617,7 @@ mqueue_enqueue(mqueue_queue mq, mqueue_block mqb, mqb_rank_b priority)
} ;
} ;
- qpt_mutex_unlock(&mq->mutex) ;
+ qpt_mutex_unlock(mq->mutex) ;
} ;
/*------------------------------------------------------------------------------
@@ -655,7 +660,7 @@ mqueue_dequeue(mqueue_queue mq, int wait, void* arg)
if (mq == NULL)
return NULL ;
- qpt_mutex_lock(&mq->mutex) ; /*<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<*/
+ qpt_mutex_lock(mq->mutex) ; /*<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<*/
while (1)
{
@@ -676,13 +681,13 @@ mqueue_dequeue(mqueue_queue mq, int wait, void* arg)
case mqt_cond_unicast: /* Now wait here */
case mqt_cond_broadcast:
if ((arg == NULL) && (mq->kick.cond.interval <= 0))
- qpt_cond_wait(&mq->kick.cond.wait_here, &mq->mutex) ;
+ qpt_cond_wait(mq->kick.cond.wait_here, mq->mutex) ;
else
{
timeout_time = (arg != NULL) ? *(qtime_mono_t*)arg
: mq->kick.cond.timeout ;
- if (qpt_cond_timedwait(&mq->kick.cond.wait_here, &mq->mutex,
+ if (qpt_cond_timedwait(mq->kick.cond.wait_here, mq->mutex,
timeout_time) == 0)
{
/* Timed out -- update timeout time, if required */
@@ -742,7 +747,7 @@ mqueue_dequeue(mqueue_queue mq, int wait, void* arg)
mq->tail_priority = NULL ;
done:
- qpt_mutex_unlock(&mq->mutex) ; /*>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>*/
+ qpt_mutex_unlock(mq->mutex) ; /*>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>*/
return mqb ;
} ;
@@ -783,7 +788,7 @@ mqueue_revoke(mqueue_queue mq, void* arg0, int num)
if (mq == NULL)
return 0 ;
- qpt_mutex_lock(&mq->mutex) ; /*<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<*/
+ qpt_mutex_lock(mq->mutex) ; /*<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<*/
did = 0 ;
prev = NULL ;
@@ -827,7 +832,7 @@ mqueue_revoke(mqueue_queue mq, void* arg0, int num)
} ;
} ;
- qpt_mutex_unlock(&mq->mutex) ; /*>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>*/
+ qpt_mutex_unlock(mq->mutex) ; /*>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>*/
return did ;
} ;
@@ -847,7 +852,7 @@ mqueue_done_waiting(mqueue_queue mq, mqueue_thread_signal mtsig)
if (!qpthreads_enabled)
return 0 ;
- qpt_mutex_lock(&mq->mutex) ;
+ qpt_mutex_lock(mq->mutex) ;
dassert( (mq->type == mqt_signal_unicast) ||
(mq->type == mqt_signal_broadcast) ) ;
@@ -864,7 +869,7 @@ mqueue_done_waiting(mqueue_queue mq, mqueue_thread_signal mtsig)
if (!kicked)
mqueue_dequeue_signal(mq, mtsig) ;
- qpt_mutex_unlock(&mq->mutex) ;
+ qpt_mutex_unlock(mq->mutex) ;
return kicked ;
} ;