diff options
author | Chris Hall <GMCH@hestia.halldom.com> | 2010-02-01 17:37:31 +0000 |
---|---|---|
committer | Chris Hall <GMCH@hestia.halldom.com> | 2010-02-01 17:37:31 +0000 |
commit | 6fded334f74bbb3271c2293636677db27f86ec42 (patch) | |
tree | a10c84b7830c7c3f8fbe6d591ff4f68da2682c0f /lib | |
parent | b5be77ccc56a4345fd18c97f2daa7cf4342a2438 (diff) | |
download | quagga-6fded334f74bbb3271c2293636677db27f86ec42.tar.bz2 quagga-6fded334f74bbb3271c2293636677db27f86ec42.tar.xz |
Added count to mqueue structure and implemented resets.
modified: bgpd/bgp_peer.c -- collecting notification
modified: lib/mqueue.c
modified: lib/mqueue.h
Diffstat (limited to 'lib')
-rw-r--r-- | lib/mqueue.c | 161 | ||||
-rw-r--r-- | lib/mqueue.h | 25 |
2 files changed, 150 insertions, 36 deletions
diff --git a/lib/mqueue.c b/lib/mqueue.c index de1d654c..a6dca32f 100644 --- a/lib/mqueue.c +++ b/lib/mqueue.c @@ -155,6 +155,7 @@ mqueue_init_new(mqueue_queue mq, enum mqueue_queue_type type) qpt_mutex_init_new(&mq->mutex, qpt_mutex_quagga) ; /* head, tail and tail_priority set NULL already */ + /* count set zero already */ /* waiters set zero already */ mq->type = type ; @@ -162,8 +163,8 @@ mqueue_init_new(mqueue_queue mq, enum mqueue_queue_type type) { case mqt_cond_unicast: case mqt_cond_broadcast: - if (qpthreads_enabled) - qpt_cond_init_new(&mq->kick.cond.wait_here, qpt_cond_quagga) ; + qpt_cond_init_new(&mq->kick.cond.wait_here, qpt_cond_quagga) ; + if (MQUEUE_DEFAULT_INTERVAL != 0) { mq->kick.cond.interval = MQUEUE_DEFAULT_INTERVAL ; @@ -185,6 +186,62 @@ mqueue_init_new(mqueue_queue mq, enum mqueue_queue_type type) } ; /*------------------------------------------------------------------------------ + * Empty message queue -- by revoking everything. + * + * Leaves queue ready for continued use with all existing settings. + * + * If there were any waiters, they are still waiting. + */ +extern void +mqueue_empty(mqueue_queue mq) +{ + mqueue_revoke(mq, NULL) ; + + assert((mq->head == NULL) && (mq->count == 0)) ; +} ; + +/*------------------------------------------------------------------------------ + * Reset message queue -- empty it out by revoking everything. + * + * Frees the structure if required, and returns NULL. + * Otherwise zeroises the structure, and returns address of same. + * + * NB: there MUST NOT be ANY waiters ! + */ +extern mqueue_queue +mqueue_reset(mqueue_queue mq, int free_structure) +{ + mqueue_empty(mq) ; + + passert(mq->waiters == 0) ; + + qpt_mutex_destroy_keep(&mq->mutex) ; + + switch (mq->type) + { + case mqt_cond_unicast: + case mqt_cond_broadcast: + qpt_cond_destroy_keep(&mq->kick.cond.wait_here) ; + break; + + case mqt_signal_unicast: + case mqt_signal_broadcast: + passert(mq->kick.signal.head == NULL) ; + break; + + default: + zabort("Invalid mqueue queue type") ; + } ; + + if (free_structure) + XFREE(MTYPE_MQUEUE_QUEUE, mq) ; /* sets mq == NULL */ + else + memset(mq, 0, sizeof(struct mqueue_queue)) ; + + return mq ; +} ; + +/*------------------------------------------------------------------------------ * Initialise new Local Message Queue, if required (lmq == NULL) allocating it. * * Returns address of Local Message Queue @@ -426,38 +483,45 @@ mqueue_enqueue(mqueue_queue mq, mqueue_block mqb, int priority) if (mq->head == NULL) { + assert(mq->count == 0) ; mqb->next = NULL ; mq->head = mqb ; mq->tail_priority = priority ? mqb : NULL ; mq->tail = mqb ; } - else if (priority) - { - mqueue_block after = mq->tail_priority ; - if (after == NULL) - { - mqb->next = mq->head ; - mq->head = mqb ; - /* mq non-empty, enchain at head, therefore tail unaffected */ - } - else - { - mqb->next = after->next ; - after->next = mqb ; - /* if only have priority messages then fix tail */ - if (mq->tail == after) - mq->tail = mqb; - } - mq->tail_priority = mqb ; - } else { - dassert(mq->tail != NULL) ; - mqb->next = NULL ; - mq->tail->next = mqb ; - mq->tail = mqb ; + assert(mq->count > 0) ; + if (priority) + { + mqueue_block after = mq->tail_priority ; + if (after == NULL) + { + mqb->next = mq->head ; + mq->head = mqb ; + /* mq non-empty, enchain at head, therefore tail unaffected */ + } + else + { + mqb->next = after->next ; + after->next = mqb ; + /* if only have priority messages then fix tail */ + if (mq->tail == after) + mq->tail = mqb; + } + mq->tail_priority = mqb ; + } + else + { + dassert(mq->tail != NULL) ; + mqb->next = NULL ; + mq->tail->next = mqb ; + mq->tail = mqb ; + } ; } ; + ++mq->count ; + if (mq->waiters != 0) { dassert(qpthreads_enabled) ; /* waiters == 0 if !qpthreads_enabled */ @@ -527,7 +591,7 @@ mqueue_dequeue(mqueue_queue mq, int wait, void* arg) mqueue_thread_signal mtsig ; qtime_mono_t timeout_time ; - qpt_mutex_lock(&mq->mutex) ; + qpt_mutex_lock(&mq->mutex) ; /*<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<*/ while (1) { @@ -535,6 +599,8 @@ mqueue_dequeue(mqueue_queue mq, int wait, void* arg) if (mqb != NULL) break ; /* Easy if queue not empty */ + assert(mq->count == 0) ; + if (!wait || !qpthreads_enabled) goto done ; /* Easy if not waiting ! mqb == NULL */ /* Short circuit if !qpthreads_enabled */ @@ -600,6 +666,9 @@ mqueue_dequeue(mqueue_queue mq, int wait, void* arg) /* Have something to pull off the queue */ + assert(mq->count > 0) ; + --mq->count ; + mq->head = mqb->next ; /* fix tails if at tail */ @@ -609,7 +678,7 @@ mqueue_dequeue(mqueue_queue mq, int wait, void* arg) mq->tail_priority = NULL ; done: - qpt_mutex_unlock(&mq->mutex) ; + qpt_mutex_unlock(&mq->mutex) ; /*>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>*/ return mqb ; } ; @@ -633,7 +702,7 @@ mqueue_revoke(mqueue_queue mq, void* arg0) mqueue_block mqb ; mqueue_block prev ; - qpt_mutex_lock(&mq->mutex) ; + qpt_mutex_lock(&mq->mutex) ; /*<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<*/ prev = NULL ; while (1) @@ -648,6 +717,8 @@ mqueue_revoke(mqueue_queue mq, void* arg0) if ((arg0 == NULL) || (arg0 == mqb->arg0)) { + assert(mq->count > 0) ; + if (prev == NULL) mq->head = mqb->next ; else @@ -659,6 +730,8 @@ mqueue_revoke(mqueue_queue mq, void* arg0) if (mqb == mq->tail_priority) mq->tail_priority = prev ; + --mq->count ; + qpt_mutex_unlock(&mq->mutex) ; mqb_dispatch_destroy(mqb) ; qpt_mutex_lock(&mq->mutex) ; @@ -667,7 +740,7 @@ mqueue_revoke(mqueue_queue mq, void* arg0) prev = mqb ; } ; - qpt_mutex_unlock(&mq->mutex) ; + qpt_mutex_unlock(&mq->mutex) ; /*>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>*/ } ; /*------------------------------------------------------------------------------ @@ -752,7 +825,8 @@ mqueue_local_dequeue(mqueue_local_queue lmq) * Message queue signal handling */ -/* Initialise a message queue signal structure (struct mqueue_thread_signal). +/*------------------------------------------------------------------------------ + * Initialise a message queue signal structure (struct mqueue_thread_signal). * Allocate one if required. * * If !pthreads_enabled, then this structure is entirely redundant, but there @@ -778,7 +852,29 @@ mqueue_thread_signal_init(mqueue_thread_signal mqt, qpt_thread_t thread, return mqt ; } ; -/* Signal the first 'n' threads on the to be signalled list. +/*------------------------------------------------------------------------------ + * Reset a message queue signal structure and release if required. + * + * NB: MUST NOT be queued as a waiter anywhere !! + * + * Frees the structure if required, and returns NULL. + * Otherwise zeroises the structure, and returns address of same. + */ +mqueue_thread_signal +mqueue_thread_signal_reset(mqueue_thread_signal mqt, int free_structure) +{ + passert(mqt->prev == NULL) ; + + if (free_structure) + XFREE(MTYPE_MQUEUE_THREAD_SIGNAL, mqt) ; /* sets mqt = NULL */ + else + memset(mqt, 0, sizeof(struct mqueue_thread_signal)) ; + + return mqt ; +} ; + +/*------------------------------------------------------------------------------ + * Signal the first 'n' threads on the to be signalled list. * * Removes the threads from the list and reduces the waiters count. * @@ -802,7 +898,8 @@ mqueue_kick_signal(mqueue_queue mq, unsigned n) } ; } ; -/* Remove given signal from given message queue. +/*------------------------------------------------------------------------------ + * Remove given signal from given message queue. * * NB: sets the prev entry in the mqueue_thread_signal block to NULL, so that * the thread can tell that its signal has been kicked. diff --git a/lib/mqueue.h b/lib/mqueue.h index 2bd96896..d8790246 100644 --- a/lib/mqueue.h +++ b/lib/mqueue.h @@ -161,11 +161,13 @@ typedef struct mqueue_queue* mqueue_queue ; struct mqueue_queue { - qpt_mutex_t mutex ; + qpt_mutex_t mutex ; - mqueue_block head ; /* NULL => list is empty */ - mqueue_block tail_priority ; /* last priority message (if any & not empty) */ - mqueue_block tail ; /* last message (if not empty) */ + mqueue_block head ; /* NULL => list is empty */ + mqueue_block tail_priority ; /* last priority message (if any & not empty) */ + mqueue_block tail ; /* last message (if not empty) */ + + unsigned count ; /* of items on the queue */ enum mqueue_queue_type type ; @@ -195,6 +197,15 @@ mqueue_initialise(void) ; extern mqueue_queue mqueue_init_new(mqueue_queue mq, enum mqueue_queue_type type) ; +extern void +mqueue_empty(mqueue_queue mq) ; + +extern mqueue_queue +mqueue_reset(mqueue_queue mq, int free_structure) ; + +#define mqueue_reset_keep(mq) mqueue_reset(mq, 0) +#define mqueue_reset_free(mq) mqueue_reset(mq, 1) + extern mqueue_local_queue mqueue_local_init_new(mqueue_local_queue lmq) ; @@ -210,6 +221,12 @@ mqueue_set_timeout_interval(mqueue_queue mq, qtime_t interval) ; extern mqueue_thread_signal mqueue_thread_signal_init(mqueue_thread_signal mqt, qpt_thread_t thread, int signum) ; +mqueue_thread_signal +mqueue_thread_signal_reset(mqueue_thread_signal mqt, int free_structure) ; + +#define mqueue_thread_signal_reset_keep(mqt) mqueue_thread_signal_reset(mqt, 0) +#define mqueue_thread_signal_reset_free(mqt) mqueue_thread_signal_reset(mqt, 1) + extern mqueue_block mqb_init_new(mqueue_block mqb, mqueue_action action, void* arg0) ; |