diff options
author | Chris Hall (GMCH) <chris.hall@highwayman.com> | 2009-12-03 20:10:30 +0000 |
---|---|---|
committer | Chris Hall (GMCH) <chris.hall@highwayman.com> | 2009-12-03 20:10:30 +0000 |
commit | 1a720bbef1c1bbe6bf29abe34b736e077e8dd864 (patch) | |
tree | ba93418726f6aab3084d570d9893ab29cb306e58 | |
parent | 606acaa3264d6868ad06d1874137c6aa81ad14e5 (diff) | |
download | quagga-1a720bbef1c1bbe6bf29abe34b736e077e8dd864.tar.bz2 quagga-1a720bbef1c1bbe6bf29abe34b736e077e8dd864.tar.xz |
Initial commit of lib/mqueue.c & .h
Adds message queue structure to manage the passing of messages
between qpthreads.
-rw-r--r-- | lib/Makefile.am | 4 | ||||
-rw-r--r-- | lib/memory.h | 6 | ||||
-rw-r--r-- | lib/memtypes.c | 3 | ||||
-rw-r--r-- | lib/mqueue.c | 567 | ||||
-rw-r--r-- | lib/mqueue.h | 140 |
5 files changed, 716 insertions, 4 deletions
diff --git a/lib/Makefile.am b/lib/Makefile.am index 08f834ae..1a5c73d9 100644 --- a/lib/Makefile.am +++ b/lib/Makefile.am @@ -13,7 +13,7 @@ libzebra_la_SOURCES = \ filter.c routemap.c distribute.c stream.c str.c log.c plist.c \ zclient.c sockopt.c smux.c md5.c if_rmap.c keychain.c privs.c \ sigevent.c pqueue.c jhash.c memtypes.c workqueue.c symtab.c heap.c \ - qtime.c qpthreads.c + qtime.c qpthreads.c mqueue.c BUILT_SOURCES = memtypes.h route_types.h @@ -29,7 +29,7 @@ pkginclude_HEADERS = \ plist.h zclient.h sockopt.h smux.h md5.h if_rmap.h keychain.h \ privs.h sigevent.h pqueue.h jhash.h zassert.h memtypes.h \ workqueue.h route_types.h symtab.h heap.h \ - qtime.h qpthreads.h + qtime.h qpthreads.h mqueue.h EXTRA_DIST = regex.c regex-gnu.h memtypes.awk route_types.awk route_types.txt diff --git a/lib/memory.h b/lib/memory.h index 037efef2..fd9f1b97 100644 --- a/lib/memory.h +++ b/lib/memory.h @@ -21,6 +21,8 @@ Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA #ifndef _ZEBRA_MEMORY_H #define _ZEBRA_MEMORY_H +#include <stddef.h> + /* For pretty printing of memory allocate information. */ struct memory_list { @@ -33,10 +35,10 @@ struct mlist { const char *name; }; -#include "lib/memtypes.h" - extern struct mlist mlists[]; +#include "lib/memtypes.h" + /* #define MEMORY_LOG */ #ifdef MEMORY_LOG #define XMALLOC(mtype, size) \ diff --git a/lib/memtypes.c b/lib/memtypes.c index fb2557c8..18d8b8f8 100644 --- a/lib/memtypes.c +++ b/lib/memtypes.c @@ -32,6 +32,9 @@ struct memory_list memory_list_lib[] = { MTYPE_QPT_THREAD_ATTR, "qpt thread attributes" }, { MTYPE_QPT_MUTEX, "qpt mutex" }, { MTYPE_QPT_COND, "qpt condition variable" }, + { MTYPE_MQUEUE_QUEUE, "Mqueue queue structure" }, + { MTYPE_MQUEUE_BLOCKS, "Mqueue message blocks" }, + { MTYPE_MQUEUE_THREAD_SIGNAL, "Mqueue thread signal" }, { MTYPE_VTY, "VTY" }, { MTYPE_VTY_OUT_BUF, "VTY output buffer" }, { MTYPE_VTY_HIST, "VTY history" }, diff --git a/lib/mqueue.c b/lib/mqueue.c new file mode 100644 index 00000000..b07dce6b --- /dev/null +++ b/lib/mqueue.c @@ -0,0 +1,567 @@ +/* Message Queue data structure -- functions + * Copyright (C) 2009 Chris Hall (GMCH), Highwayman + * + * This file is part of GNU Zebra. + * + * GNU Zebra is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * GNU Zebra is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with GNU Zebra; see the file COPYING. If not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#include <string.h> + +#include "memory.h" +#include "mqueue.h" +#include "zassert.h" + +/*============================================================================== + * These message queues are designed for inter-qpthread communication. + * + * A message queue carries messages from one or more qpthreads to one or more + * other qpthreads. + * + * A message queue has one ordinary priority queue and one high priority + * queue. + * + * There are four types of queue, depending on how qpthreads wait and how they + * are woken up: + * + * mqt_cond_unicast -- wait on condition variable, one waiter kicked + * mqt_cond_broadcast -- wait on condition variable, all waiters kicked + * mqt_signal_unicast -- wait for signal, one waiter kicked + * mqt_signal_broadcast -- wait for signal, all waiters kicked + * + * For condition variables there is a timeout mechanism so that waiters + * are woken up at least every now and then. The message queue maintains + * current timeout time and timeout interval variables. Each time a waiter + * waits it will do: + * + * next = now + interval + * if (now > current) || (next <= current) + * current = next and return (don't wait) + * else + * wait until current + * + * If the wait times out, the current is set to current + interval. + * + * The effect of this is to return at least at regular intervals from the wait, + * provided the queue is waited on within the timeout period. If the queue is + * waited on after the current timeout time, it returns immediately, updating + * the current timeout time -- the "clock" slips. + * + * There is a default timeout period. The period may be set "infinite". + * + * For waiters kicked by signal, the wait does not occur within the message + * queue code, but the need for a signal is recorded in the message queue. + * + * Messages take the form of a small block of information which contains: + * + * * flags -- used by the message handler + * * context -- identifies the context of the message (see revoke) + * + * * action -- void action(mqueue_block) message dispatch + * * arg_0 -- *void/uintptr_t/intptr_t ) standard arguments + * * arg_1 -- *void/uintptr_t/intptr_t ) + * + * (see struct mqueue_block). + * + * To send a message, first allocate a message block (see mqueue_block_new), + * then fill in the arguments and enqueue it. + * + * + */ + +/*============================================================================== + * Initialisation etc. for Message Queues. + * + * TODO: how to shut down a message queue... for reset/exit ? + */ + +/* Initialise new Message Queue, if required (mq == NULL) allocating it. + * + * For mqt_cond_xxx type queues, sets the default timeout interval and the + * initial timeout time to now + that interval. + */ + +mqueue_queue +mqueue_init_new(mqueue_queue mq, enum mqueue_queue_type type) +{ + if (mq == NULL) + mq = XCALLOC(MTYPE_MQUEUE_QUEUE, sizeof(struct mqueue_queue)) ; + else + memset(mq, 0, sizeof(struct mqueue_queue)) ; + + qpt_mutex_init(&mq->mutex, qpt_mutex_quagga) ; + + /* head, tail and tail_priority set NULL already */ + /* waiters set zero already */ + + mq->type = type ; + switch (type) + { + case mqt_cond_unicast: + case mqt_cond_broadcast: + qpt_cond_init(&mq->kick.cond.wait_here, qpt_cond_quagga) ; + if (MQUEUE_DEFAULT_INTERVAL != 0) + { + mq->kick.cond.interval = MQUEUE_DEFAULT_INTERVAL ; + mq->kick.cond.timeout = + qpt_cond_get_timeout_time(MQUEUE_DEFAULT_INTERVAL) ; + } ; + break; + + case mqt_signal_unicast: + case mqt_signal_broadcast: + /* head/tail pointers set NULL already */ + break; + + default: + zabort("Invalid mqueue queue type") ; + } ; + + return mq ; +} ; + +/* Set new timeout interval (or unset by setting <= 0) + * + * Sets the next timeout to be the time now + new interval (or never). + */ +void +mqueue_set_timeout_interval(mqueue_queue mq, qtime_t interval) +{ + qpt_mutex_lock(&mq->mutex) ; + + dassert( (mq->type == mqt_cond_unicast) || + (mq->type == mqt_cond_broadcast) ) ; + + mq->kick.cond.interval = interval ; + mq->kick.cond.timeout = (interval > 0) ? qpt_cond_get_timeout_time(interval) + : 0 ; + qpt_mutex_unlock(&mq->mutex) ; +} ; + +/*============================================================================== + * Message Block memory management. + * + * Allocates message_block structures in lots of 256. Uses first message_block + * in each lot to keep track of the lots. + * + * mqueue_initialise MUST be called before the first message block is allocated. + */ + +static pthread_mutex_t* p_mb_mutex ; /* NULL => no mutex (yet) */ +static pthread_mutex_t mb_mutex ; + +#define MB_LOT_SIZE 256 + +static mqueue_block mb_lot_list = NULL ; +static mqueue_block mb_free_list = NULL ; + +static mqueue_block mqueue_block_new_lot(void) ; + +/* Get an empty message block + */ +mqueue_block +mqueue_block_new(void) +{ + mqueue_block mb ; + + qpt_mutex_lock(&mb_mutex) ; + + mb = mb_free_list ; + if (mb == NULL) + mb = mqueue_block_new_lot() ; + + mb_free_list = mb->next ; + + qpt_mutex_unlock(&mb_mutex) ; + + memset(mb, 0, sizeof(struct mqueue_block)) ; + + return mb ; +} ; + +/* Free message block when done with it. + */ +void +mqueue_block_free(mqueue_block mb) +{ + qpt_mutex_lock(&mb_mutex) ; + + mb->next = mb_free_list ; + mb_free_list = mb ; + + qpt_mutex_unlock(&mb_mutex) ; +} ; + +/* Make a new lot of empty message_block structures. + * + * NB: caller MUST hold the mb_mutex. + * + */ +static mqueue_block +mqueue_block_new_lot(void) +{ + mqueue_block first, last, this ; + + mqueue_block new = XCALLOC(MTYPE_MQUEUE_BLOCKS, + SIZE(struct mqueue_block, MB_LOT_SIZE)) ; + first = &new[1] ; + last = &new[MB_LOT_SIZE - 1] ; + + new->next = mb_lot_list ; /* add to list of lots */ + mb_lot_list = new ; + + /* String all the new message_blocks together. */ + this = last ; + while (this > first) + { + mqueue_block prev = this-- ; + this->next = prev ; + } ; + assert(this == first) ; + + last->next = mb_free_list ; /* point last at old free list */ + mb_free_list = first ; /* new blocks at head of free list */ + + return mb_free_list ; +} ; + +/*============================================================================== + * Enqueue and dequeue messages. + */ + +static void mqueue_kick_signal(mqueue_queue mq, int n) ; +static void mqueue_dequeue_signal(mqueue_queue mq, mqueue_thread_signal mtsig) ; + +/* Enqueue message. + * + * If priority != 0, will enqueue after any previously enqueued priority + * messages. + * + * If there are any waiters, then we kick one or all of them. + * + * Note that we decrement or zero the waiters count here -- because if the + * waiter did it, they might not run before something else is enqueued. + * Similarly, if the kick uses a signal, the signal block is dequeued here. + * + * The waiter count is only incremented when a dequeue is attempted and the + * queue is empty, then: + * + * for a broadcast type message queue, the first message that arrives will + * kick all the waiters into action. + * + * for a signal type message queue, each message that arrives will kick one + * waiter. + */ + +void +mqueue_enqueue(mqueue_queue mq, mqueue_block mb, int priority) +{ + qpt_mutex_lock(&mq->mutex) ; + + if (mq->head == NULL) + { + mb->next = NULL ; + mq->head = mb ; + mq->tail_priority = priority ? mb : NULL ; + mq->tail = mb ; + } + else if (priority) + { + mqueue_block after = mq->tail_priority ; + if (after == NULL) + { + mb->next = mq->head ; + mq->head = mb ; + } + else + { + mb->next = after->next ; + after->next = mb ; + } + mq->tail_priority = mb ; + } + else + { + dassert(mq->tail != NULL) ; + mb->next = NULL ; + mq->tail->next = mb ; + mq->tail = mb ; + } ; + + if (mq->waiters != 0) + { + switch (mq->type) + { + case mqt_cond_unicast: + qpt_cond_signal(&mq->kick.cond.wait_here) ; + --mq->waiters ; + break ; + + case mqt_cond_broadcast: + qpt_cond_broadcast(&mq->kick.cond.wait_here) ; + mq->waiters = 0 ; + break ; + + case mqt_signal_unicast: + mqueue_kick_signal(mq, 1) ; /* pick off first and kick it (MUST be */ + /* one) and decrement the waiters count */ + break ; + + case mqt_signal_broadcast: + mqueue_kick_signal(mq, mq->waiters) ; + dassert(mq->kick.signal.head == NULL) ; + break; + + default: + zabort("Invalid mqueue queue type") ; + } ; + } ; + + qpt_mutex_unlock(&mq->mutex) ; +} ; + +/* Dequeue message. + * + * If the queue is empty and wait != 0, will wait for a message. In which + * case for: + * + * * mqt_cond_xxxx type message queues, will wait on the condition variable, + * and may time-out. (mtsig argument MUST be NULL.) + * + * * mqt_signal_xxxx type message queues, will register the given signal + * (mtsig argument MUST be provided), and return immediately. + * + * Returns a message block if one is available. (And not otherwise.) + */ + +mqueue_block +mqueue_dequeue(mqueue_queue mq, int wait, mqueue_thread_signal mtsig) +{ + mqueue_block mb ; + mqueue_thread_signal last ; + + qpt_mutex_lock(&mq->mutex) ; + + while (1) + { + mb = mq->head ; + if (mb != NULL) + break ; /* Easy if queue not empty */ + + if (!wait) + goto done ; /* Easy if not waiting ! mb == NULL */ + + ++mq->waiters ; /* Another waiter */ + + switch (mq->type) + { + case mqt_cond_unicast: /* Now wait here */ + case mqt_cond_broadcast: + dassert(mtsig == NULL) ; + + if (mq->kick.cond.interval <= 0) + qpt_cond_wait(&mq->kick.cond.wait_here, &mq->mutex) ; + else + { + qtime_t now = qpt_cond_get_timeout_time(0) ; + +#if QPT_COND_CLOCK_MONOTONIC + dassert(now >= (mq->kick.cond.timeout - mq->kick.cond.interval)) ; + if (now > mq->kick.cond.timeout) +#else + if ( (now > mq->kick.cond.timeout) + || (now < (mq->kick.cond.timeout - mq->kick.cond.interval)) ) +#endif + { + /* the "clock" has slipped. Reset it and return now. */ + mq->kick.cond.timeout = now + mq->kick.cond.interval ; + goto done ; /* immediate return. mb == NULL */ + } + else + { + if (qpt_cond_timedwait(&mq->kick.cond.wait_here, &mq->mutex, + mq->kick.cond.timeout) == 0) + mq->kick.cond.timeout += mq->kick.cond.interval ; + } ; + } ; + break ; + + case mqt_signal_unicast: /* Register desire for signal */ + case mqt_signal_broadcast: + dassert(mtsig != NULL) ; + + last = mq->kick.signal.tail ; + if (last == NULL) + { + mq->kick.signal.head = mtsig ; + mtsig->prev = (void*)mq ; + } + else + { + last->next = mtsig ; + mtsig->prev = last ; + } + mtsig->next = NULL ; + mq->kick.signal.tail = mtsig ; + + goto done ; /* BUT do not wait ! mb == NULL */ + + default: + zabort("Invalid mqueue queue type") ; + } ; + } ; + + /* Have something to pull off the queue */ + + mq->head = mb->next ; + if (mb == mq->tail_priority) + mq->tail_priority = NULL ; + +done: + qpt_mutex_unlock(&mq->mutex) ; + + return mb ; +} ; + +/* No longer waiting for a signal. + * + * Returns true <=> signal has been kicked. + */ +int +mqueue_done_waiting(mqueue_queue mq, mqueue_thread_signal mtsig) +{ + int kicked ; + + qpt_mutex_lock(&mq->mutex) ; + + dassert( (mq->type == mqt_signal_unicast) || + (mq->type == mqt_signal_broadcast) ) ; + dassert(mtsig != NULL) ; + + /* When the thread is signalled, the prev entry is set NULL and the */ + /* waiters count is decremented. */ + /* */ + /* So, only need to do something here if the prev is not NULL (ie the */ + /* mqueue_thread_signal is still on the list. */ + + kicked = (mtsig->prev == NULL) ; + + if (!kicked) + mqueue_dequeue_signal(mq, mtsig) ; + + qpt_mutex_unlock(&mq->mutex) ; + + return kicked ; +} ; + +/*============================================================================== + * Message queue signal handling + */ + +/* Initialise a message queue signal structure (struct mqueue_thread_signal). + * Allocate one if required. + * + * Returns address of the structure. + */ +mqueue_thread_signal +mqueue_thread_signal_init(mqueue_thread_signal mqt, qpt_thread_t thread, + int signum) +{ + if (mqt == NULL) + mqt = XCALLOC(MTYPE_MQUEUE_THREAD_SIGNAL, + sizeof(struct mqueue_thread_signal)) ; + else + memset(mqt, 0, sizeof(struct mqueue_thread_signal)) ; + + /* next and prev fields set to NULL already. */ + + mqt->qpthread = thread ; + mqt->signum = signum ; + + return mqt ; +} ; + +/* Signal the first 'n' threads on the to be signalled list. + * + * Removes the threads from the list and reduces the waiters count. + * + * NB: sets the prev entry in the mqueue_thread_signal block to NULL, so that + * the thread can tell that its signal has been kicked. + * + * NB: *** MUST own the mqueue_queue mutex. *** + */ +static void +mqueue_kick_signal(mqueue_queue mq, int n) +{ + mqueue_thread_signal mtsig ; + + while (n--) + { + mqueue_dequeue_signal(mq, mtsig = mq->kick.signal.head) ; + qpt_thread_signal(mtsig->qpthread, mtsig->signum) ; + } ; +} ; + +/* Remove given signal from given message queue. + * + * NB: *** MUST own the mqueue_queue mutex. *** + */ +static void +mqueue_dequeue_signal(mqueue_queue mq, mqueue_thread_signal mtsig) +{ + mqueue_thread_signal next ; + mqueue_thread_signal prev ; + + dassert((mq->kick.signal.head != NULL) && (mq->waiters != 0)) ; + + next = mtsig->next ; + prev = mtsig->prev ; + + if (prev == (void*)mq) /* marker for head of list */ + { + dassert(mq->kick.signal.head == mtsig) ; + mq->kick.signal.head = next ; + } + else + { + dassert((prev != NULL) && (prev->next == mtsig)) ; + prev->next = next ; + } ; + + if (next != NULL) + next->prev = prev ; + + mtsig->next = NULL ; + mtsig->prev = NULL ; /* essential to show signal kicked */ + --mq->waiters ; /* one fewer waiter */ + + dassert( ((mq->kick.signal.head == NULL) && (mq->waiters == 0)) || + ((mq->kick.signal.head != NULL) && (mq->waiters != 0)) ) ; +} ; + +/*============================================================================== + * Initialise Message Queue handling + * + * Must be called before any qpt_threads are started. + * + * TODO: how do we shut down message queue handling ? + */ +void +mqueue_initialise(int qpthreads) +{ + if (qpthreads) + p_mb_mutex = qpt_mutex_init(&mb_mutex, qpt_mutex_quagga) ; +} ; diff --git a/lib/mqueue.h b/lib/mqueue.h new file mode 100644 index 00000000..aff3e154 --- /dev/null +++ b/lib/mqueue.h @@ -0,0 +1,140 @@ +/* Message Queue data structure -- header + * Copyright (C) 2009 Chris Hall (GMCH), Highwayman + * + * This file is part of GNU Zebra. + * + * GNU Zebra is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * GNU Zebra is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with GNU Zebra; see the file COPYING. If not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#ifndef _ZEBRA_MQUEUE_H +#define _ZEBRA_MQUEUE_H + +#include "qpthreads.h" + +/*============================================================================== + */ + +typedef struct mqueue_block* mqueue_block ; + +typedef uint32_t mqueue_flags_t ; +typedef uint32_t mqueue_context_t ; +typedef union +{ + void* p ; + uintptr_t u ; + intptr_t i ; +} mqueue_arg_t ; + +typedef void mqueue_action(mqueue_block) ; + +struct mqueue_block +{ + mqueue_block next ; /* single linked list -- see ... */ + + mqueue_action* action ; /* for message dispatch */ + + mqueue_flags_t flags ; /* for message handler */ + + mqueue_context_t context ; /* for message revoke */ + + mqueue_arg_t arg0 ; /* may be pointer to more data or integer */ + mqueue_arg_t arg1 ; /* may be pointer to more data or integer */ +} ; + +typedef struct mqueue_thread_signal* mqueue_thread_signal ; + +struct mqueue_thread_signal { + mqueue_thread_signal next ; /* NULL => last on list */ + mqueue_thread_signal prev ; /* NULL => NOT on list -- vital ! */ + + qpt_thread_t qpthread ; /* qpthread to kick */ + int signum ; /* signal to kick with */ +} ; + +enum mqueue_queue_type { + mqt_cond_unicast, /* use qpt_cond_signal to kick the queue */ + mqt_cond_broadcast, /* use qpt_cond_broadcast to kick the queue */ + mqt_signal_unicast, /* use single qpt_signal to kick the queue */ + mqt_signal_broadcast, /* use multiple qpt_signal to kick the queue */ +}; + +#ifndef MQUEUE_DEFAULT_INTERVAL +# define MQUEUE_DEFAULT_INTERVAL QTIME(5) +#endif + +struct mqueue_queue_cond { + qpt_cond_t wait_here ; + qtime_t timeout ; + qtime_t interval ; +} ; + +struct mqueue_queue_signal { + mqueue_thread_signal head ; /* NULL => list is empty */ + mqueue_thread_signal tail ; +}; + +typedef struct mqueue_queue* mqueue_queue ; + +struct mqueue_queue +{ + qpt_mutex_t mutex ; + + mqueue_block head ; /* NULL => list is empty */ + mqueue_block tail_priority ; /* last priority message (if any & not empty) */ + mqueue_block tail ; /* last message (if not empty) */ + + enum mqueue_queue_type type ; + + unsigned waiters ; + + union { + struct mqueue_queue_cond cond ; + struct mqueue_queue_signal signal ; + } kick ; +} ; + +/*============================================================================== + * Functions + */ + +void +mqueue_initialise(int qpthreads) ; + +mqueue_queue +mqueue_init_new(mqueue_queue mq, enum mqueue_queue_type type) ; + +void +mqueue_set_timeout_interval(mqueue_queue mq, qtime_t interval) ; + +mqueue_thread_signal +mqueue_thread_signal_init(mqueue_thread_signal mqt, qpt_thread_t thread, + int signum) ; +mqueue_block +mqueue_block_new(void) ; + +void +mqueue_block_free(mqueue_block mb) ; + +void +mqueue_enqueue(mqueue_queue mq, mqueue_block mb, int priority) ; + +mqueue_block +mqueue_dequeue(mqueue_queue mq, int wait, mqueue_thread_signal mtsig) ; + +int +mqueue_done_waiting(mqueue_queue mq, mqueue_thread_signal mtsig) ; + +#endif /* _ZEBRA_MQUEUE_H */ |