summaryrefslogtreecommitdiffstats
path: root/lib/mqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'lib/mqueue.c')
-rw-r--r--lib/mqueue.c126
1 files changed, 76 insertions, 50 deletions
diff --git a/lib/mqueue.c b/lib/mqueue.c
index 8fa9fbd5..f816932b 100644
--- a/lib/mqueue.c
+++ b/lib/mqueue.c
@@ -18,12 +18,10 @@
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
-
-#include <string.h>
+#include "misc.h"
#include "memory.h"
#include "mqueue.h"
-#include "zassert.h"
/*==============================================================================
* These message queues are designed for inter-qpthread communication.
@@ -211,7 +209,7 @@ mqueue_init_new(mqueue_queue mq, enum mqueue_queue_type type)
memset(mq, 0, sizeof(struct mqueue_queue)) ;
if (qpt_freeze_qpthreads_enabled())
- qpt_mutex_init_new(&mq->mutex, qpt_mutex_quagga) ;
+ qpt_mutex_init_new(mq->mutex, qpt_mutex_quagga) ;
/* head, tail and tail_priority set NULL already */
/* count set zero already */
@@ -222,7 +220,7 @@ mqueue_init_new(mqueue_queue mq, enum mqueue_queue_type type)
{
case mqt_cond_unicast:
case mqt_cond_broadcast:
- qpt_cond_init_new(&mq->kick.cond.wait_here, qpt_cond_quagga) ;
+ qpt_cond_init_new(mq->kick.cond.wait_here, qpt_cond_quagga) ;
if (MQUEUE_DEFAULT_INTERVAL != 0)
{
@@ -254,7 +252,7 @@ mqueue_init_new(mqueue_queue mq, enum mqueue_queue_type type)
extern void
mqueue_empty(mqueue_queue mq)
{
- mqueue_revoke(mq, NULL) ;
+ mqueue_revoke(mq, NULL, 0) ;
assert((mq->head == NULL) && (mq->count == 0)) ;
} ;
@@ -268,19 +266,22 @@ mqueue_empty(mqueue_queue mq)
* NB: there MUST NOT be ANY waiters !
*/
extern mqueue_queue
-mqueue_reset(mqueue_queue mq, int free_structure)
+mqueue_reset(mqueue_queue mq, free_keep_b free_structure)
{
+ if (mq == NULL)
+ return NULL ;
+
mqueue_empty(mq) ;
passert(mq->waiters == 0) ;
- qpt_mutex_destroy_keep(&mq->mutex) ;
+ qpt_mutex_destroy_keep(mq->mutex) ;
switch (mq->type)
{
case mqt_cond_unicast:
case mqt_cond_broadcast:
- qpt_cond_destroy_keep(&mq->kick.cond.wait_here) ;
+ qpt_cond_destroy_keep(mq->kick.cond.wait_here) ;
break;
case mqt_signal_unicast:
@@ -327,13 +328,10 @@ mqueue_local_init_new(mqueue_local_queue lmq)
*
* Dequeues entries and dispatches them "mqb_destroy", to empty the queue.
*
- * See: mqueue_local_reset_keep(lmq)
- * mqueue_local_reset_free(lmq)
- *
* Returns address of Local Message Queue
*/
extern mqueue_local_queue
-mqueue_local_reset(mqueue_local_queue lmq, int free_structure)
+mqueue_local_reset(mqueue_local_queue lmq, free_keep_b free_structure)
{
mqueue_block mqb ;
@@ -362,7 +360,7 @@ mqueue_local_reset(mqueue_local_queue lmq, int free_structure)
extern void
mqueue_set_timeout_interval(mqueue_queue mq, qtime_t interval)
{
- qpt_mutex_lock(&mq->mutex) ;
+ qpt_mutex_lock(mq->mutex) ;
dassert( (mq->type == mqt_cond_unicast) ||
(mq->type == mqt_cond_broadcast) ) ;
@@ -370,7 +368,7 @@ mqueue_set_timeout_interval(mqueue_queue mq, qtime_t interval)
mq->kick.cond.interval = interval ;
mq->kick.cond.timeout = (interval > 0) ? qt_add_monotonic(interval)
: 0 ;
- qpt_mutex_unlock(&mq->mutex) ;
+ qpt_mutex_unlock(mq->mutex) ;
} ;
/*==============================================================================
@@ -484,9 +482,12 @@ mqb_re_init(mqueue_block mqb, mqueue_action action, void* arg0)
* NB: it is the caller's responsibility to free the value of any argument that
* requires it.
*/
-extern void
+extern mqueue_block
mqb_free(mqueue_block mqb)
{
+ if (mqb == NULL)
+ return NULL ;
+
if (mqb->argv != NULL)
XFREE(MTYPE_MQUEUE_BLOCK_ARGV, mqb->argv) ;
@@ -497,6 +498,8 @@ mqb_free(mqueue_block mqb)
++mqb_free_count ;
qpt_mutex_unlock(&mqb_mutex) ; /*>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>*/
+
+ return NULL ;
} ;
/*==============================================================================
@@ -533,12 +536,12 @@ static void mqueue_dequeue_signal(mqueue_queue mq, mqueue_thread_signal mtsig) ;
* never be any waiters... so no kicking is ever done.
*/
extern void
-mqueue_enqueue(mqueue_queue mq, mqueue_block mqb, enum mqb_rank priority)
+mqueue_enqueue(mqueue_queue mq, mqueue_block mqb, mqb_rank_b priority)
{
if (mq == NULL)
return mqb_dispatch_destroy(mqb) ;
- qpt_mutex_lock(&mq->mutex) ;
+ qpt_mutex_lock(mq->mutex) ;
if (mq->head == NULL)
{
@@ -588,12 +591,12 @@ mqueue_enqueue(mqueue_queue mq, mqueue_block mqb, enum mqb_rank priority)
switch (mq->type)
{
case mqt_cond_unicast:
- qpt_cond_signal(&mq->kick.cond.wait_here) ;
+ qpt_cond_signal(mq->kick.cond.wait_here) ;
--mq->waiters ;
break ;
case mqt_cond_broadcast:
- qpt_cond_broadcast(&mq->kick.cond.wait_here) ;
+ qpt_cond_broadcast(mq->kick.cond.wait_here) ;
mq->waiters = 0 ;
break ;
@@ -612,7 +615,7 @@ mqueue_enqueue(mqueue_queue mq, mqueue_block mqb, enum mqb_rank priority)
} ;
} ;
- qpt_mutex_unlock(&mq->mutex) ;
+ qpt_mutex_unlock(mq->mutex) ;
} ;
/*------------------------------------------------------------------------------
@@ -655,7 +658,7 @@ mqueue_dequeue(mqueue_queue mq, int wait, void* arg)
if (mq == NULL)
return NULL ;
- qpt_mutex_lock(&mq->mutex) ; /*<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<*/
+ qpt_mutex_lock(mq->mutex) ; /*<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<*/
while (1)
{
@@ -676,13 +679,13 @@ mqueue_dequeue(mqueue_queue mq, int wait, void* arg)
case mqt_cond_unicast: /* Now wait here */
case mqt_cond_broadcast:
if ((arg == NULL) && (mq->kick.cond.interval <= 0))
- qpt_cond_wait(&mq->kick.cond.wait_here, &mq->mutex) ;
+ qpt_cond_wait(mq->kick.cond.wait_here, mq->mutex) ;
else
{
timeout_time = (arg != NULL) ? *(qtime_mono_t*)arg
: mq->kick.cond.timeout ;
- if (qpt_cond_timedwait(&mq->kick.cond.wait_here, &mq->mutex,
+ if (qpt_cond_timedwait(mq->kick.cond.wait_here, mq->mutex,
timeout_time) == 0)
{
/* Timed out -- update timeout time, if required */
@@ -742,7 +745,7 @@ mqueue_dequeue(mqueue_queue mq, int wait, void* arg)
mq->tail_priority = NULL ;
done:
- qpt_mutex_unlock(&mq->mutex) ; /*>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>*/
+ qpt_mutex_unlock(mq->mutex) ; /*>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>*/
return mqb ;
} ;
@@ -755,35 +758,41 @@ done:
*
* Revokes by calling mqb_dispatch_destroy().
*
- * During a revoke() operation more items may be enqueued, but no other mqueue
- * operations may be performed. Enqueued items may promptly be revoked, except
- * for priority items if the revoke operation has already moved past the last
- * priority item.
+ * NB: for safety, holds the queue locked for the duration of the revoke
+ * operation.
+ *
+ * If the destroy code can handle it, this means that can revoke stuff
+ * from one thread even though it is usually only dequeued by another.
+ *
+ * The danger is that if queues get very long, and many revokes happen,
+ * may (a) spend a lot of time scanning the message queue, which stops
+ * other threads as soon as they try to enqueue anything, and (b) if this
+ * happens a lot, could end up in an O(n^2) thing scanning the message
+ * queue once for each revoked object type.
*
* If mq is NULL, does nothing.
+ *
+ * If num > 0, stops after revoking that many messages.
+ *
+ * Returns: number of messages revoked.
*/
-extern void
-mqueue_revoke(mqueue_queue mq, void* arg0)
+extern int
+mqueue_revoke(mqueue_queue mq, void* arg0, int num)
{
mqueue_block mqb ;
mqueue_block prev ;
+ int did ;
if (mq == NULL)
- return ;
+ return 0 ;
- qpt_mutex_lock(&mq->mutex) ; /*<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<*/
+ qpt_mutex_lock(mq->mutex) ; /*<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<*/
+ did = 0 ;
prev = NULL ;
- while (1)
+ mqb = mq->head ;
+ while (mqb != NULL)
{
- if (prev == NULL)
- mqb = mq->head ;
- else
- mqb = prev->next ;
-
- if (mqb == NULL)
- break ;
-
if ((arg0 == NULL) || (arg0 == mqb->arg0))
{
assert(mq->count > 0) ;
@@ -800,16 +809,30 @@ mqueue_revoke(mqueue_queue mq, void* arg0)
mq->tail_priority = prev ;
--mq->count ;
+ ++did ;
+
+ mqb_dispatch_destroy(mqb) ;
+
+ if (num == 1)
+ break ;
+ else if (num > 1)
+ --num ;
- qpt_mutex_unlock(&mq->mutex) ;
- mqb_dispatch_destroy(mqb) ;
- qpt_mutex_lock(&mq->mutex) ;
+ if (prev == NULL)
+ mqb = mq->head ;
+ else
+ mqb = prev->next ;
}
else
- prev = mqb ;
+ {
+ prev = mqb ;
+ mqb = mqb->next ;
+ } ;
} ;
- qpt_mutex_unlock(&mq->mutex) ; /*>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>*/
+ qpt_mutex_unlock(mq->mutex) ; /*>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>*/
+
+ return did ;
} ;
/*------------------------------------------------------------------------------
@@ -827,7 +850,7 @@ mqueue_done_waiting(mqueue_queue mq, mqueue_thread_signal mtsig)
if (!qpthreads_enabled)
return 0 ;
- qpt_mutex_lock(&mq->mutex) ;
+ qpt_mutex_lock(mq->mutex) ;
dassert( (mq->type == mqt_signal_unicast) ||
(mq->type == mqt_signal_broadcast) ) ;
@@ -844,7 +867,7 @@ mqueue_done_waiting(mqueue_queue mq, mqueue_thread_signal mtsig)
if (!kicked)
mqueue_dequeue_signal(mq, mtsig) ;
- qpt_mutex_unlock(&mq->mutex) ;
+ qpt_mutex_unlock(mq->mutex) ;
return kicked ;
} ;
@@ -930,8 +953,11 @@ mqueue_thread_signal_init(mqueue_thread_signal mqt, qpt_thread_t thread,
* Otherwise zeroises the structure, and returns address of same.
*/
extern mqueue_thread_signal
-mqueue_thread_signal_reset(mqueue_thread_signal mqt, int free_structure)
+mqueue_thread_signal_reset(mqueue_thread_signal mqt, free_keep_b free_structure)
{
+ if (mqt == NULL)
+ return NULL ;
+
passert(mqt->prev == NULL) ;
if (free_structure)