From 5cae7eea451f2b7d65b5892e2c1dafc70f8b836e Mon Sep 17 00:00:00 2001 From: Chris Hall Date: Sun, 13 Feb 2011 23:11:45 +0000 Subject: Second tranche of updates for pipework branch. modified: bgpd/bgp_connection.c modified: bgpd/bgp_debug.c modified: bgpd/bgp_engine.h modified: bgpd/bgp_main.c modified: bgpd/bgp_packet.c modified: bgpd/bgp_peer.c modified: bgpd/bgp_route.c modified: bgpd/bgp_routemap.c modified: bgpd/bgp_session.c modified: bgpd/bgp_vty.c modified: bgpd/bgpd.c modified: bgpd/bgpd.h modified: configure.ac modified: isisd/dict.h modified: isisd/isis_misc.c modified: isisd/isis_routemap.c modified: isisd/isis_spf.c modified: lib/Makefile.am modified: lib/command.c modified: lib/command.h modified: lib/command_execute.h modified: lib/command_parse.c modified: lib/command_parse.h modified: lib/command_queue.c modified: lib/command_queue.h modified: lib/elstring.h modified: lib/heap.c modified: lib/if.c modified: lib/if.h modified: lib/keychain.c modified: lib/keystroke.c modified: lib/keystroke.h modified: lib/list_util.c modified: lib/list_util.h modified: lib/log.c modified: lib/log.h modified: lib/memory.c modified: lib/memory.h modified: lib/memtypes.c modified: lib/misc.h modified: lib/mqueue.c modified: lib/mqueue.h deleted: lib/node_type.h modified: lib/pthread_safe.c modified: lib/qfstring.c modified: lib/qiovec.c modified: lib/qiovec.h modified: lib/qpath.c modified: lib/qpnexus.c modified: lib/qpnexus.h modified: lib/qpselect.c modified: lib/qpthreads.h modified: lib/qstring.c modified: lib/qstring.h modified: lib/qtime.c modified: lib/qtime.h modified: lib/qtimers.c modified: lib/qtimers.h modified: lib/routemap.c modified: lib/symtab.h modified: lib/thread.h deleted: lib/uty.h modified: lib/vector.c modified: lib/vector.h modified: lib/version.h.in modified: lib/vio_fifo.c modified: lib/vio_fifo.h modified: lib/vio_lines.c modified: lib/vio_lines.h modified: lib/vty.c modified: lib/vty.h modified: lib/vty_cli.c modified: lib/vty_cli.h modified: lib/vty_io.c modified: lib/vty_io.h modified: lib/vty_io_basic.c modified: lib/vty_io_basic.h modified: lib/vty_io_file.c modified: lib/vty_io_file.h modified: lib/vty_io_shell.c modified: lib/vty_io_term.c modified: lib/vty_io_term.h modified: lib/vty_local.h modified: lib/vty_pipe.c modified: lib/workqueue.h modified: lib/zebra.h modified: ospf6d/ospf6_lsa.c modified: ripngd/ripngd.c modified: tests/test-list_util.c modified: tests/test-vector.c modified: vtysh/vtysh.c modified: vtysh/vtysh_config.c --- lib/mqueue.c | 79 +++++++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 51 insertions(+), 28 deletions(-) (limited to 'lib/mqueue.c') diff --git a/lib/mqueue.c b/lib/mqueue.c index 8fa9fbd5..c7037a64 100644 --- a/lib/mqueue.c +++ b/lib/mqueue.c @@ -254,7 +254,7 @@ mqueue_init_new(mqueue_queue mq, enum mqueue_queue_type type) extern void mqueue_empty(mqueue_queue mq) { - mqueue_revoke(mq, NULL) ; + mqueue_revoke(mq, NULL, 0) ; assert((mq->head == NULL) && (mq->count == 0)) ; } ; @@ -268,8 +268,11 @@ mqueue_empty(mqueue_queue mq) * NB: there MUST NOT be ANY waiters ! */ extern mqueue_queue -mqueue_reset(mqueue_queue mq, int free_structure) +mqueue_reset(mqueue_queue mq, free_keep_b free_structure) { + if (mq == NULL) + return NULL ; + mqueue_empty(mq) ; passert(mq->waiters == 0) ; @@ -327,13 +330,10 @@ mqueue_local_init_new(mqueue_local_queue lmq) * * Dequeues entries and dispatches them "mqb_destroy", to empty the queue. * - * See: mqueue_local_reset_keep(lmq) - * mqueue_local_reset_free(lmq) - * * Returns address of Local Message Queue */ extern mqueue_local_queue -mqueue_local_reset(mqueue_local_queue lmq, int free_structure) +mqueue_local_reset(mqueue_local_queue lmq, free_keep_b free_structure) { mqueue_block mqb ; @@ -533,7 +533,7 @@ static void mqueue_dequeue_signal(mqueue_queue mq, mqueue_thread_signal mtsig) ; * never be any waiters... so no kicking is ever done. */ extern void -mqueue_enqueue(mqueue_queue mq, mqueue_block mqb, enum mqb_rank priority) +mqueue_enqueue(mqueue_queue mq, mqueue_block mqb, mqb_rank_b priority) { if (mq == NULL) return mqb_dispatch_destroy(mqb) ; @@ -755,35 +755,41 @@ done: * * Revokes by calling mqb_dispatch_destroy(). * - * During a revoke() operation more items may be enqueued, but no other mqueue - * operations may be performed. Enqueued items may promptly be revoked, except - * for priority items if the revoke operation has already moved past the last - * priority item. + * NB: for safety, holds the queue locked for the duration of the revoke + * operation. + * + * If the destroy code can handle it, this means that can revoke stuff + * from one thread even though it is usually only dequeued by another. + * + * The danger is that if queues get very long, and many revokes happen, + * may (a) spend a lot of time scanning the message queue, which stops + * other threads as soon as they try to enqueue anything, and (b) if this + * happens a lot, could end up in an O(n^2) thing scanning the message + * queue once for each revoked object type. * * If mq is NULL, does nothing. + * + * If num > 0, stops after revoking that many messages. + * + * Returns: number of messages revoked. */ -extern void -mqueue_revoke(mqueue_queue mq, void* arg0) +extern int +mqueue_revoke(mqueue_queue mq, void* arg0, int num) { mqueue_block mqb ; mqueue_block prev ; + int did ; if (mq == NULL) - return ; + return 0 ; qpt_mutex_lock(&mq->mutex) ; /*<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<*/ + did = 0 ; prev = NULL ; - while (1) + mqb = mq->head ; + while (mqb != NULL) { - if (prev == NULL) - mqb = mq->head ; - else - mqb = prev->next ; - - if (mqb == NULL) - break ; - if ((arg0 == NULL) || (arg0 == mqb->arg0)) { assert(mq->count > 0) ; @@ -800,16 +806,30 @@ mqueue_revoke(mqueue_queue mq, void* arg0) mq->tail_priority = prev ; --mq->count ; + ++did ; + + mqb_dispatch_destroy(mqb) ; - qpt_mutex_unlock(&mq->mutex) ; - mqb_dispatch_destroy(mqb) ; - qpt_mutex_lock(&mq->mutex) ; + if (num == 1) + break ; + else if (num > 1) + --num ; + + if (prev == NULL) + mqb = mq->head ; + else + mqb = prev->next ; } else - prev = mqb ; + { + prev = mqb ; + mqb = mqb->next ; + } ; } ; qpt_mutex_unlock(&mq->mutex) ; /*>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>*/ + + return did ; } ; /*------------------------------------------------------------------------------ @@ -930,8 +950,11 @@ mqueue_thread_signal_init(mqueue_thread_signal mqt, qpt_thread_t thread, * Otherwise zeroises the structure, and returns address of same. */ extern mqueue_thread_signal -mqueue_thread_signal_reset(mqueue_thread_signal mqt, int free_structure) +mqueue_thread_signal_reset(mqueue_thread_signal mqt, free_keep_b free_structure) { + if (mqt == NULL) + return NULL ; + passert(mqt->prev == NULL) ; if (free_structure) -- cgit v1.2.3