diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/mqueue.c | 184 | ||||
-rw-r--r-- | lib/mqueue.h | 181 | ||||
-rw-r--r-- | lib/qpthreads.c | 229 | ||||
-rw-r--r-- | lib/qpthreads.h | 166 |
4 files changed, 569 insertions, 191 deletions
diff --git a/lib/mqueue.c b/lib/mqueue.c index 5fe892c2..7b93e4b0 100644 --- a/lib/mqueue.c +++ b/lib/mqueue.c @@ -31,6 +31,9 @@ * A message queue carries messages from one or more qpthreads to one or more * other qpthreads. * + * If !qpthreads_enabled, then a message queue hold messages for the program + * to consume later. There are never any waiters. Timeouts are ignored. + * * A message queue has one ordinary priority queue and one high priority * queue. * @@ -71,12 +74,13 @@ * * context -- identifies the context of the message (see revoke) * * * action -- void action(mqueue_block) message dispatch - * * arg_0 -- *void/uintptr_t/intptr_t ) standard arguments - * * arg_1 -- *void/uintptr_t/intptr_t ) + * * arg0 -- *void/uintptr_t/intptr_t ) standard arguments + * * arg1 -- *void/uintptr_t/intptr_t ) * - * (see struct mqueue_block). + * There are set/get functions for action/arg0/arg1 -- users should not poke + * around inside the structure. * - * To send a message, first allocate a message block (see mqueue_block_new), + * To send a message, first allocate a message block (see mqb_init_new), * then fill in the arguments and enqueue it. * * @@ -92,6 +96,9 @@ * * For mqt_cond_xxx type queues, sets the default timeout interval and the * initial timeout time to now + that interval. + * + * NB: once any message queue has been enabled, it is TOO LATE to enable + * qpthreads. */ mqueue_queue @@ -102,7 +109,8 @@ mqueue_init_new(mqueue_queue mq, enum mqueue_queue_type type) else memset(mq, 0, sizeof(struct mqueue_queue)) ; - qpt_mutex_init(&mq->mutex, qpt_mutex_quagga) ; + if (qpt_freeze_qpthreads_enabled()) + qpt_mutex_init_new(&mq->mutex, qpt_mutex_quagga) ; /* head, tail and tail_priority set NULL already */ /* waiters set zero already */ @@ -112,7 +120,8 @@ mqueue_init_new(mqueue_queue mq, enum mqueue_queue_type type) { case mqt_cond_unicast: case mqt_cond_broadcast: - qpt_cond_init(&mq->kick.cond.wait_here, qpt_cond_quagga) ; + if (qpthreads_enabled) + qpt_cond_init_new(&mq->kick.cond.wait_here, qpt_cond_quagga) ; if (MQUEUE_DEFAULT_INTERVAL != 0) { mq->kick.cond.interval = MQUEUE_DEFAULT_INTERVAL ; @@ -136,6 +145,9 @@ mqueue_init_new(mqueue_queue mq, enum mqueue_queue_type type) /* Set new timeout interval (or unset by setting <= 0) * * Sets the next timeout to be the time now + new interval (or never). + * + * This is a waste of time if !qpthreads_enabled, but does no harm. The + * timeout is ignored. */ void mqueue_set_timeout_interval(mqueue_queue mq, qtime_t interval) @@ -146,7 +158,7 @@ mqueue_set_timeout_interval(mqueue_queue mq, qtime_t interval) (mq->type == mqt_cond_broadcast) ) ; mq->kick.cond.interval = interval ; - mq->kick.cond.timeout = (interval > 0) ? qt_get_monotonic() + interval + mq->kick.cond.timeout = (interval > 0) ? qt_add_monotonic(interval) : 0 ; qpt_mutex_unlock(&mq->mutex) ; } ; @@ -160,55 +172,57 @@ mqueue_set_timeout_interval(mqueue_queue mq, qtime_t interval) * mqueue_initialise MUST be called before the first message block is allocated. */ -static pthread_mutex_t* p_mb_mutex ; /* NULL => no mutex (yet) */ -static pthread_mutex_t mb_mutex ; +static pthread_mutex_t mqb_mutex ; -#define MB_LOT_SIZE 256 +#define MQB_LOT_SIZE 256 -static mqueue_block mb_lot_list = NULL ; -static mqueue_block mb_free_list = NULL ; +static mqueue_block mqb_lot_list = NULL ; +static mqueue_block mqb_free_list = NULL ; static mqueue_block mqueue_block_new_lot(void) ; -/* Get an empty message block +/* Initialise message block (allocate if required) and set action and context. */ mqueue_block -mqueue_block_new(void) +mqb_init_new(mqueue_block mqb, mqueue_action action, mqb_context_t context) { - mqueue_block mb ; + if (mqb == NULL) + { + qpt_mutex_lock(&mqb_mutex) ; - qpt_mutex_lock(&mb_mutex) ; + mqb = mqb_free_list ; + if (mqb == NULL) + mqb = mqueue_block_new_lot() ; - mb = mb_free_list ; - if (mb == NULL) - mb = mqueue_block_new_lot() ; + mqb_free_list = mqb->next ; - mb_free_list = mb->next ; + qpt_mutex_unlock(&mqb_mutex) ; + } ; - qpt_mutex_unlock(&mb_mutex) ; + memset(mqb, 0, sizeof(struct mqueue_block)) ; - memset(mb, 0, sizeof(struct mqueue_block)) ; + mqb->action = action ; + mqb->context = context ; - return mb ; + return mqb ; } ; /* Free message block when done with it. */ void -mqueue_block_free(mqueue_block mb) +mqb_free(mqueue_block mqb) { - qpt_mutex_lock(&mb_mutex) ; + qpt_mutex_lock(&mqb_mutex) ; - mb->next = mb_free_list ; - mb_free_list = mb ; + mqb->next = mqb_free_list ; + mqb_free_list = mqb ; - qpt_mutex_unlock(&mb_mutex) ; + qpt_mutex_unlock(&mqb_mutex) ; } ; /* Make a new lot of empty message_block structures. * - * NB: caller MUST hold the mb_mutex. - * + * NB: caller MUST hold the mqb_mutex. */ static mqueue_block mqueue_block_new_lot(void) @@ -216,12 +230,12 @@ mqueue_block_new_lot(void) mqueue_block first, last, this ; mqueue_block new = XCALLOC(MTYPE_MQUEUE_BLOCKS, - SIZE(struct mqueue_block, MB_LOT_SIZE)) ; + SIZE(struct mqueue_block, MQB_LOT_SIZE)) ; first = &new[1] ; - last = &new[MB_LOT_SIZE - 1] ; + last = &new[MQB_LOT_SIZE - 1] ; - new->next = mb_lot_list ; /* add to list of lots */ - mb_lot_list = new ; + new->next = mqb_lot_list ; /* add to list of lots */ + mqb_lot_list = new ; /* String all the new message_blocks together. */ this = last ; @@ -232,17 +246,17 @@ mqueue_block_new_lot(void) } ; assert(this == first) ; - last->next = mb_free_list ; /* point last at old free list */ - mb_free_list = first ; /* new blocks at head of free list */ + last->next = mqb_free_list ; /* point last at old free list */ + mqb_free_list = first ; /* new blocks at head of free list */ - return mb_free_list ; + return mqb_free_list ; } ; /*============================================================================== * Enqueue and dequeue messages. */ -static void mqueue_kick_signal(mqueue_queue mq, int n) ; +static void mqueue_kick_signal(mqueue_queue mq, unsigned n) ; static void mqueue_dequeue_signal(mqueue_queue mq, mqueue_thread_signal mtsig) ; /* Enqueue message. @@ -264,45 +278,50 @@ static void mqueue_dequeue_signal(mqueue_queue mq, mqueue_thread_signal mtsig) ; * * for a signal type message queue, each message that arrives will kick one * waiter. + * + * NB: this works perfectly well if !qpthreads enabled. Of course, there can + * never be any waiters... so no kicking is ever done. */ void -mqueue_enqueue(mqueue_queue mq, mqueue_block mb, int priority) +mqueue_enqueue(mqueue_queue mq, mqueue_block mqb, int priority) { qpt_mutex_lock(&mq->mutex) ; if (mq->head == NULL) { - mb->next = NULL ; - mq->head = mb ; - mq->tail_priority = priority ? mb : NULL ; - mq->tail = mb ; + mqb->next = NULL ; + mq->head = mqb ; + mq->tail_priority = priority ? mqb : NULL ; + mq->tail = mqb ; } else if (priority) { mqueue_block after = mq->tail_priority ; if (after == NULL) { - mb->next = mq->head ; - mq->head = mb ; + mqb->next = mq->head ; + mq->head = mqb ; } else { - mb->next = after->next ; - after->next = mb ; + mqb->next = after->next ; + after->next = mqb ; } - mq->tail_priority = mb ; + mq->tail_priority = mqb ; } else { dassert(mq->tail != NULL) ; - mb->next = NULL ; - mq->tail->next = mb ; - mq->tail = mb ; + mqb->next = NULL ; + mq->tail->next = mqb ; + mq->tail = mqb ; } ; if (mq->waiters != 0) { + dassert(qpthreads_enabled) ; /* waiters == 0 if !qpthreads_enabled */ + switch (mq->type) { case mqt_cond_unicast: @@ -335,8 +354,8 @@ mqueue_enqueue(mqueue_queue mq, mqueue_block mb, int priority) /* Dequeue message. * - * If the queue is empty and wait != 0, will wait for a message. In which - * case for: + * If the queue is empty and wait != 0 (and qpthreads_enabled), will wait for a + * message. In which case for: * * * mqt_cond_xxxx type message queues, will wait on the condition variable, * and may timeout. @@ -350,13 +369,19 @@ mqueue_enqueue(mqueue_queue mq, mqueue_block mb, int priority) * * mqt_signal_xxxx type message queues, will register the given signal * (mtsig argument MUST be provided), and return immediately. * + * NB: if !qpthreads_enabled, will not wait on the queue. No how. + * + * Note this means that waiters == 0 all the time if !qpthreads_enabled ! + * + * NB: the argument is ignored if !wait or !qpthreads_enabled, so may be NULL. + * * Returns a message block if one is available. (And not otherwise.) */ mqueue_block mqueue_dequeue(mqueue_queue mq, int wait, void* arg) { - mqueue_block mb ; + mqueue_block mqb ; mqueue_thread_signal last ; mqueue_thread_signal mtsig ; @@ -366,12 +391,13 @@ mqueue_dequeue(mqueue_queue mq, int wait, void* arg) while (1) { - mb = mq->head ; - if (mb != NULL) + mqb = mq->head ; + if (mqb != NULL) break ; /* Easy if queue not empty */ - if (!wait) - goto done ; /* Easy if not waiting ! mb == NULL */ + if (!wait || !qpthreads_enabled) + goto done ; /* Easy if not waiting ! mqb == NULL */ + /* Short circuit if !qpthreads_enabled */ ++mq->waiters ; /* Another waiter */ @@ -401,7 +427,7 @@ mqueue_dequeue(mqueue_queue mq, int wait, void* arg) mq->kick.cond.timeout = timeout_time ; } ; - goto done ; /* immediate return. mb == NULL */ + goto done ; /* immediate return. mqb == NULL */ } ; } ; break ; @@ -419,13 +445,13 @@ mqueue_dequeue(mqueue_queue mq, int wait, void* arg) } else { - last->next = mtsig ; + last->next = mtsig ; mtsig->prev = last ; } mtsig->next = NULL ; mq->kick.signal.tail = mtsig ; - goto done ; /* BUT do not wait ! mb == NULL */ + goto done ; /* BUT do not wait ! mqb == NULL */ default: zabort("Invalid mqueue queue type") ; @@ -434,25 +460,30 @@ mqueue_dequeue(mqueue_queue mq, int wait, void* arg) /* Have something to pull off the queue */ - mq->head = mb->next ; - if (mb == mq->tail_priority) + mq->head = mqb->next ; + if (mqb == mq->tail_priority) mq->tail_priority = NULL ; done: qpt_mutex_unlock(&mq->mutex) ; - return mb ; + return mqb ; } ; -/* No longer waiting for a signal. +/* No longer waiting for a signal -- does nothing if !qpthreads_enabled. + * + * Returns true <=> signal has been kicked * - * Returns true <=> signal has been kicked. + * (Signal will never be kicked if !qpthreads_enabled.) */ int mqueue_done_waiting(mqueue_queue mq, mqueue_thread_signal mtsig) { int kicked ; + if (!qpthreads_enabled) + return 0 ; + qpt_mutex_lock(&mq->mutex) ; dassert( (mq->type == mqt_signal_unicast) || @@ -482,6 +513,9 @@ mqueue_done_waiting(mqueue_queue mq, mqueue_thread_signal mtsig) /* Initialise a message queue signal structure (struct mqueue_thread_signal). * Allocate one if required. * + * If !pthreads_enabled, then this structure is entirely redundant, but there + * is no harm in creating it -- but the signal will never be used. + * * Returns address of the structure. */ mqueue_thread_signal @@ -506,16 +540,19 @@ mqueue_thread_signal_init(mqueue_thread_signal mqt, qpt_thread_t thread, * * Removes the threads from the list and reduces the waiters count. * + * NB: must be qpthreads_enabled with at least 'n' waiters. + * * NB: sets the prev entry in the mqueue_thread_signal block to NULL, so that * the thread can tell that its signal has been kicked. * * NB: *** MUST own the mqueue_queue mutex. *** */ static void -mqueue_kick_signal(mqueue_queue mq, int n) +mqueue_kick_signal(mqueue_queue mq, unsigned n) { mqueue_thread_signal mtsig ; + dassert( (qpthreads_enabled) && (mq->waiters >= n) ) ; while (n--) { mqueue_dequeue_signal(mq, mtsig = mq->kick.signal.head) ; @@ -525,6 +562,9 @@ mqueue_kick_signal(mqueue_queue mq, int n) /* Remove given signal from given message queue. * + * NB: sets the prev entry in the mqueue_thread_signal block to NULL, so that + * the thread can tell that its signal has been kicked. + * * NB: *** MUST own the mqueue_queue mutex. *** */ static void @@ -533,8 +573,6 @@ mqueue_dequeue_signal(mqueue_queue mq, mqueue_thread_signal mtsig) mqueue_thread_signal next ; mqueue_thread_signal prev ; - dassert((mq->kick.signal.head != NULL) && (mq->waiters != 0)) ; - next = mtsig->next ; prev = mtsig->prev ; @@ -565,11 +603,13 @@ mqueue_dequeue_signal(mqueue_queue mq, mqueue_thread_signal mtsig) * * Must be called before any qpt_threads are started. * + * Freezes qpthreads_enabled. + * * TODO: how do we shut down message queue handling ? */ void -mqueue_initialise(int qpthreads) +mqueue_initialise(void) { - if (qpthreads) - p_mb_mutex = qpt_mutex_init(&mb_mutex, qpt_mutex_quagga) ; + if (qpthreads_enabled_freeze) + qpt_mutex_init_new(&mqb_mutex, qpt_mutex_quagga) ; } ; diff --git a/lib/mqueue.h b/lib/mqueue.h index 5c10911b..6fb519d4 100644 --- a/lib/mqueue.h +++ b/lib/mqueue.h @@ -25,34 +25,43 @@ #include "qpthreads.h" #include "qtime.h" +#ifndef Inline +#define Inline static inline +#endif + /*============================================================================== */ typedef struct mqueue_block* mqueue_block ; -typedef uint32_t mqueue_flags_t ; -typedef uint32_t mqueue_context_t ; +typedef uint32_t mqb_flags_t ; +typedef uint32_t mqb_context_t ; + +typedef void* mqb_ptr_t ; +typedef intptr_t mqb_int_t ; +typedef uintptr_t mqb_uint_t ; + typedef union { - void* p ; - uintptr_t u ; - intptr_t i ; -} mqueue_arg_t ; + mqb_ptr_t p ; + mqb_int_t i ; + mqb_uint_t u ; +} mqb_arg_t ; -typedef void mqueue_action(mqueue_block) ; +typedef void mqueue_action(mqueue_block mqb) ; struct mqueue_block { - mqueue_block next ; /* single linked list -- see ... */ + mqueue_block next ; /* single linked list */ mqueue_action* action ; /* for message dispatch */ - mqueue_flags_t flags ; /* for message handler */ + mqb_flags_t flags ; /* for message handler */ - mqueue_context_t context ; /* for message revoke */ + mqb_context_t context ; /* for message revoke */ - mqueue_arg_t arg0 ; /* may be pointer to more data or integer */ - mqueue_arg_t arg1 ; /* may be pointer to more data or integer */ + mqb_arg_t arg0 ; /* may be pointer or integer */ + mqb_arg_t arg1 ; /* may be pointer or integer */ } ; typedef struct mqueue_thread_signal* mqueue_thread_signal ; @@ -91,11 +100,11 @@ typedef struct mqueue_queue* mqueue_queue ; struct mqueue_queue { - qpt_mutex_t mutex ; + qpt_mutex_t mutex ; - mqueue_block head ; /* NULL => list is empty */ - mqueue_block tail_priority ; /* last priority message (if any & not empty) */ - mqueue_block tail ; /* last message (if not empty) */ + mqueue_block head ; /* NULL => list is empty */ + mqueue_block tail_priority ; /* last priority message (if any & not empty) */ + mqueue_block tail ; /* last message (if not empty) */ enum mqueue_queue_type type ; @@ -112,7 +121,7 @@ struct mqueue_queue */ void -mqueue_initialise(int qpthreads) ; +mqueue_initialise(void) ; mqueue_queue mqueue_init_new(mqueue_queue mq, enum mqueue_queue_type type) ; @@ -124,13 +133,15 @@ mqueue_thread_signal mqueue_thread_signal_init(mqueue_thread_signal mqt, qpt_thread_t thread, int signum) ; mqueue_block -mqueue_block_new(void) ; +mqb_init_new(mqueue_block mqb, mqueue_action action, mqb_context_t context) ; + +#define mqb_new(action, context) mqb_init_new(NULL, action, context) void -mqueue_block_free(mqueue_block mb) ; +mqb_free(mqueue_block mqb) ; void -mqueue_enqueue(mqueue_queue mq, mqueue_block mb, int priority) ; +mqueue_enqueue(mqueue_queue mq, mqueue_block mqb, int priority) ; mqueue_block mqueue_dequeue(mqueue_queue mq, int wait, void* arg) ; @@ -138,4 +149,134 @@ mqueue_dequeue(mqueue_queue mq, int wait, void* arg) ; int mqueue_done_waiting(mqueue_queue mq, mqueue_thread_signal mtsig) ; +/*============================================================================== + * Access functions for mqueue_block fields -- mqb_set_xxx/mqb_get_xxx + * + * Users should not poke around inside the mqueue_block structure. + */ + +Inline void mqb_set_action(mqueue_block mqb, mqueue_action action) ; +Inline void mqb_set_context(mqueue_block mqb, mqb_context_t context) ; + +Inline void mqb_set_arg0_p(mqueue_block mqb, mqb_ptr_t p) ; +Inline void mqb_set_arg0_i(mqueue_block mqb, mqb_int_t i) ; +Inline void mqb_set_arg0_u(mqueue_block mqb, mqb_uint_t u) ; +Inline void mqb_set_arg1_p(mqueue_block mqb, mqb_ptr_t p) ; +Inline void mqb_set_arg1_i(mqueue_block mqb, mqb_int_t i) ; +Inline void mqb_set_arg1_u(mqueue_block mqb, mqb_uint_t u) ; + +Inline void mqb_dispatch(mqueue_block mqb) ; +Inline mqb_context_t mqb_qet_context(mqueue_block mqb) ; + +Inline mqb_ptr_t mqb_get_arg0_p(mqueue_block mqb) ; +Inline mqb_int_t mqb_get_arg0_i(mqueue_block mqb) ; +Inline mqb_uint_t mqb_get_arg0_u(mqueue_block mqb) ; +Inline mqb_ptr_t mqb_get_arg1_p(mqueue_block mqb) ; +Inline mqb_int_t mqb_get_arg1_i(mqueue_block mqb) ; +Inline mqb_uint_t mqb_get_arg1_u(mqueue_block mqb) ; + +/*============================================================================== + * The Inline functions. + */ + +/* Set operations. */ + +Inline void +mqb_set_action(mqueue_block mqb, mqueue_action action) +{ + mqb->action = action ; +} ; + +Inline void +mqb_set_context(mqueue_block mqb, mqb_context_t context) +{ + mqb->context = context ; +} ; + +Inline void +mqb_set_arg0_p(mqueue_block mqb, mqb_ptr_t p) +{ + mqb->arg0.p = p ; +} ; + +Inline void +mqb_set_arg0_i(mqueue_block mqb, mqb_int_t i) +{ + mqb->arg0.i = i ; +} ; + +Inline void +mqb_set_arg0_u(mqueue_block mqb, mqb_uint_t u) +{ + mqb->arg0.u = u ; +} ; + +Inline void +mqb_set_arg1_p(mqueue_block mqb, mqb_ptr_t p) +{ + mqb->arg1.p = p ; +} ; + +Inline void +mqb_set_arg1_i(mqueue_block mqb, mqb_int_t i) +{ + mqb->arg1.i = i ; +} ; + +Inline void +mqb_set_arg1_u(mqueue_block mqb, mqb_uint_t u) +{ + mqb->arg1.u = u ; +} ; + +/* Get operations */ + +Inline void +mqb_dispatch(mqueue_block mqb) +{ + mqb->action(mqb) ; +} ; + +Inline mqb_context_t +mqb_qet_context(mqueue_block mqb) +{ + return mqb->context ; +} ; + +Inline mqb_ptr_t +mqb_get_arg0_p(mqueue_block mqb) +{ + return mqb->arg0.p ; +} ; + +Inline mqb_int_t +mqb_get_arg0_i(mqueue_block mqb) +{ + return mqb->arg0.i ; +} ; + +Inline mqb_uint_t +mqb_get_arg0_u(mqueue_block mqb) +{ + return mqb->arg0.u ; +} ; + +Inline mqb_ptr_t +mqb_get_arg1_p(mqueue_block mqb) +{ + return mqb->arg1.p ; +} ; + +Inline mqb_int_t +mqb_get_arg1_i(mqueue_block mqb) +{ + return mqb->arg1.i ; +} ; + +Inline mqb_uint_t +mqb_get_arg1_u(mqueue_block mqb) +{ + return mqb->arg1.u ; +} ; + #endif /* _ZEBRA_MQUEUE_H */ diff --git a/lib/qpthreads.c b/lib/qpthreads.c index 5dd0cfc4..e7a8da2f 100644 --- a/lib/qpthreads.c +++ b/lib/qpthreads.c @@ -50,6 +50,21 @@ * * the ability to add any work-arounds which may be required if poorly * conforming pthreads implementations are encountered * + * Continued Working Without Pthreads + * ================================== + * + * A big Global Switch -- qpthreads_enabled -- is used to control whether the + * system is pthreaded or not. + * + * If this is never set, then the system runs without pthreads, and all the + * mutex and condition variable functions are NOPs. This allows, for example, + * mutex operations to be placed where they are needed for thread-safety, + * without affecting the code when running without pthreads. + * + * Before the first thread is created and before any mutexes or condition + * variables are initialised, the qpthreads_enabled MUST be set. And it MUST + * not be changed again ! + * * Pthread Requirements * ==================== * @@ -195,6 +210,79 @@ */ /*============================================================================== + * The Global Switch + * + * The state of the switch is: unset -- implicitly not enabled + * set_frozen -- implicitly not enabled & frozen + * set_disabled -- explicitly not enabled + * set_enabled -- explicitly set enabled + * + * "set_frozen" means that "qpthreads_freeze_enabled_state()" has been called, + * and the state was unset at the time. This means that some initialisation + * has been done on the basis of !qpthreads_enabled, and it is TOO LATE to + * enable qpthreads afterwards. + */ + +enum qpthreads_enabled_state +{ + qpt_state_unset = 0, + qpt_state_set_frozen = 1, + qpt_state_set_disabled = 2, + qpt_state_set_enabled = 3, +} ; + +static enum qpthreads_enabled_state qpthreads_enabled_state = qpt_state_unset ; + +int qpthreads_enabled_flag = 0 ; + +/* Function to set qpthreads_enabled, one way or the other. + * + * NB: can repeatedly set to the same state, but not change state once set. + */ +void +qpt_set_qpthreads_enabled(int how) +{ + + switch (qpthreads_enabled_state) + { + case qpt_state_unset: + break ; + case qpt_state_set_frozen: + if (how != 0) + zabort("Too late to enable qpthreads") ; + break ; + case qpt_state_set_disabled: + if (how != 0) + zabort("qpthreads_enabled is already set: cannot set enabled") ; + break ; + case qpt_state_set_enabled: + if (how == 0) + zabort("qpthreads_enabled is already set: cannot set disabled") ; + break ; + default: + break ; + } + + qpthreads_enabled_flag = (how != 0) ; + qpthreads_enabled_state = (how != 0) ? qpt_state_set_enabled + : qpt_state_set_disabled ; +} ; + +/* Get state of qpthreads_enabled, and freeze if not yet explictly set. + * + * Where some initialisation depends on the state of qpthreads_enabled(), this + * returns the state and freezes it if it is implicitly not enabled. + */ +extern int +qpt_freeze_qpthreads_enabled(void) +{ + if (qpthreads_enabled_state == qpt_state_unset) + qpthreads_enabled_state = qpt_state_set_frozen ; + + return qpthreads_enabled_flag ; +} ; + +/*============================================================================== * Thread creation and attributes. * * Threads may be created with a given set of attributes if required. @@ -234,15 +322,18 @@ * The scope, policy and priority arguments are use only if the corresponding * option is specified. * + * NB: FATAL error to attempt this is !qptthreads_enabled. + * * Returns the address of the qpt_thread_attr_t structure. */ qpt_thread_attr_t* qpt_thread_attr_init(qpt_thread_attr_t* attr, enum qpt_attr_options opts, - int scope, int policy, int priority) + int scope, int policy, int priority) { int err ; assert((opts & ~qpt_attr_known) == 0) ; + passert(qpthreads_enabled) ; /* Initialise thread attributes structure (allocating if required.) */ if (attr == NULL) @@ -309,6 +400,8 @@ qpt_thread_attr_init(qpt_thread_attr_t* attr, enum qpt_attr_options opts, * If no attributes are given (attr == NULL) the thread is created with system * default attributes -- *except* that it is created joinable. * + * NB: FATAL error to attempt this is !qptthreads_enabled. + * * Returns the qpt_thread_t "thread id". */ qpt_thread_t @@ -319,6 +412,8 @@ qpt_thread_create(void* (*start)(void*), void* arg, qpt_thread_attr_t* attr) int default_attr ; int err ; + passert(qpthreads_enabled) ; + default_attr = (attr == NULL) ; if (default_attr) attr = qpt_thread_attr_init(&thread_attr, qpt_attr_joinable, 0, 0, 0) ; @@ -341,7 +436,10 @@ qpt_thread_create(void* (*start)(void*), void* arg, qpt_thread_attr_t* attr) * Mutex initialise and destroy. */ -/* Initialise Mutex (allocating if required). +/* Initialise Mutex (allocating if required) + * + * Does nothing if !qpthreads_enabled -- but freezes the state (attempting to + * later enable qpthreads will be a FATAL error). * * Options: * @@ -353,14 +451,19 @@ qpt_thread_create(void* (*start)(void*), void* arg, qpt_thread_attr_t* attr) * * Of these _recursive is the most likely alternative to _quagga... BUT do * remember that such mutexes DO NOT play well with condition variables. + * + * Returns the mutex -- or original mx if !qpthreads_enabled. */ -qpt_mutex_t* -qpt_mutex_init(qpt_mutex_t* mx, enum qpt_mutex_options opts) +qpt_mutex +qpt_mutex_init_new(qpt_mutex mx, enum qpt_mutex_options opts) { pthread_mutexattr_t mutex_attr ; int type ; int err ; + if (!qpthreads_enabled_freeze) + return mx ; + if (mx == NULL) mx = XMALLOC(MTYPE_QPT_MUTEX, sizeof(qpt_mutex_t)) ; @@ -390,11 +493,9 @@ qpt_mutex_init(qpt_mutex_t* mx, enum qpt_mutex_options opts) zabort("Invalid qpt_mutex option") ; } ; -#ifndef PTHREAD_MUTEXATTR_SETTYPE_MISSING err = pthread_mutexattr_settype(&mutex_attr, type); if (err != 0) zabort_err("pthread_mutexattr_settype failed", err) ; -#endif /* Now we're ready to initialize the mutex itself */ err = pthread_mutex_init(mx, &mutex_attr) ; @@ -411,22 +512,30 @@ qpt_mutex_init(qpt_mutex_t* mx, enum qpt_mutex_options opts) } ; /* Destroy given mutex, and (if required) free it. + * -- or do nothing if !qpthreads_enabled. * - * Returns NULL. -*/ -qpt_mutex_t* -qpt_mutex_destroy(qpt_mutex_t* mx, int free_mutex) + * Returns NULL if freed the mutex, otherwise the address of same. + * + * NB: if !qpthreads_enabled qpt_mutex_init_new() will not have allocated + * anything, so there can be nothing to release -- so does nothing, but + * returns the original mutex address (if any). + */ +qpt_mutex +qpt_mutex_destroy(qpt_mutex mx, int free_mutex) { int err ; - err = pthread_mutex_destroy(mx) ; - if (err != 0) - zabort_err("pthread_mutex_destroy failed", err) ; + if (qpthreads_enabled) + { + err = pthread_mutex_destroy(mx) ; + if (err != 0) + zabort_err("pthread_mutex_destroy failed", err) ; - if (free_mutex) - XFREE(MTYPE_QPT_MUTEX, mx) ; + if (free_mutex) + XFREE(MTYPE_QPT_MUTEX, mx) ; /* sets mx == NULL */ + } ; - return NULL ; + return mx ; } ; /*============================================================================== @@ -435,18 +544,28 @@ qpt_mutex_destroy(qpt_mutex_t* mx, int free_mutex) /* Initialise Condition Variable (allocating if required). * + * Does nothing if !qpthreads_enabled -- but freezes the state (attempting to + * later enable qpthreads will be a FATAL error). + * * Options: * * qpt_cond_quagga -- use Quagga's default clock * qpt_cond_realtime -- force CLOCK_REALTIME * qpt_cond_monotonic -- force CLOCK_MONOTONIC (if available) + * + * NB: FATAL error to attempt this is !qptthreads_enabled. + * + * Returns the condition variable -- or original cv id !qpthreads_enabled. */ -qpt_cond_t* -qpt_cond_init(qpt_cond_t* cv, enum qpt_cond_options opts) +qpt_cond +qpt_cond_init_new(qpt_cond cv, enum qpt_cond_options opts) { pthread_condattr_t cond_attr ; int err ; + if (!qpthreads_enabled_freeze) + return cv ; + if (cv == NULL) cv = XMALLOC(MTYPE_QPT_COND, sizeof(qpt_cond_t)) ; @@ -481,26 +600,35 @@ qpt_cond_init(qpt_cond_t* cv, enum qpt_cond_options opts) return cv ; } ; -/* Destroy given mutex, and (if required) free it. +/* Destroy given condition variable, and (if required) free it + * -- or do nothing if !qpthreads_enabled. * - * Returns NULL. -*/ -qpt_cond_t* -qpt_cond_destroy(qpt_cond_t* cv, int free_cond) + * NB: if !qpthreads_enabled qpt_cond_init_new() will not have allocated + * anything, so there can be nothing to release -- so does nothing, but + * returns the original condition variable address (if any). + * + * Returns NULL if freed the condition variable, otherwise the address of same. + */ +qpt_cond +qpt_cond_destroy(qpt_cond cv, int free_cond) { int err ; - err = pthread_cond_destroy(cv) ; - if (err != 0) - zabort_err("pthread_cond_destroy failed", err) ; + if (qpthreads_enabled) + { + err = pthread_cond_destroy(cv) ; + if (err != 0) + zabort_err("pthread_cond_destroy failed", err) ; - if (free_cond) - XFREE(MTYPE_QPT_COND, cv) ; + if (free_cond) + XFREE(MTYPE_QPT_COND, cv) ; /* sets cv == NULL */ + } ; - return NULL ; + return cv ; } ; -/* Wait for given condition variable or time-out. +/* Wait for given condition variable or time-out + * -- or return immediate success if !qpthreads_enabled. * * Returns: wait succeeded (1 => success, 0 => timed-out). * @@ -510,41 +638,54 @@ qpt_cond_destroy(qpt_cond_t* cv, int free_cond) */ int -qpt_cond_timedwait(qpt_cond_t* cv, qpt_mutex_t* mx, qtime_mono_t timeout_time) +qpt_cond_timedwait(qpt_cond cv, qpt_mutex mx, qtime_mono_t timeout_time) { struct timespec ts ; + int err ; - if (QPT_COND_CLOCK_ID != CLOCK_MONOTONIC) + if (qpthreads_enabled) { - timeout_time = qt_clock_gettime(QPT_COND_CLOCK_ID) + if (QPT_COND_CLOCK_ID != CLOCK_MONOTONIC) + { + timeout_time = qt_clock_gettime(QPT_COND_CLOCK_ID) + (timeout_time - qt_get_monotonic()) ; - } ; + } ; - int err = pthread_cond_timedwait(cv, mx, qtime2timespec(&ts, timeout_time)) ; - if (err == 0) - return 1 ; /* got condition */ - if (err == ETIMEDOUT) - return 0 ; /* got time-out */ + err = pthread_cond_timedwait(cv, mx, qtime2timespec(&ts, timeout_time)) ; + if (err == 0) + return 1 ; /* got condition */ + if (err == ETIMEDOUT) + return 0 ; /* got time-out */ - zabort_err("pthread_cond_timedwait failed", err) ; - /* crunch */ + zabort_err("pthread_cond_timedwait failed", err) ; + } + else + return 0 ; } ; /*============================================================================== * Signal Handling. */ -/* Set thread signal mask +/* Set thread signal mask -- requires qpthreads_enabled. * * Thin wrapper around pthread_sigmask. * * zaborts if gets any error. + * + * NB: it is a FATAL error to do this if !qpthreads_enabled. + * + * This is mostly because wish to avoid all pthreads_xxx calls when not + * using pthreads. There is no reason not to use this in a single threaded + * program. */ void qpt_thread_sigmask(int how, const sigset_t* set, sigset_t* oset) { int err ; + passert(qpthreads_enabled) ; + if (oset != NULL) sigemptyset(oset) ; /* to make absolutely sure */ @@ -553,7 +694,7 @@ qpt_thread_sigmask(int how, const sigset_t* set, sigset_t* oset) zabort_err("pthread_sigmask failed", err) ; } ; -/* Send given thread the given signal +/* Send given thread the given signal -- requires qpthreads_enabled (!) * * Thin wrapper around pthread_kill. * @@ -564,6 +705,8 @@ qpt_thread_signal(qpt_thread_t thread, int signum) { int err ; + passert(qpthreads_enabled) ; + err = pthread_kill(thread, signum) ; if (err != 0) zabort_err("pthread_kill failed", err) ; diff --git a/lib/qpthreads.h b/lib/qpthreads.h index 5a442cc7..49a7c1e3 100644 --- a/lib/qpthreads.h +++ b/lib/qpthreads.h @@ -35,6 +35,10 @@ #define Inline static inline #endif +#ifndef private +#define private extern +#endif + /*============================================================================== * Quagga Pthread Interface -- qpt_xxxx * @@ -57,6 +61,22 @@ #endif /*============================================================================== + * Global Switch -- this allows the library to be run WITHOUT pthreads ! + * + * Nearly every qpthreads function is a NOP if !qpthreads_enabled. + * + * Early in the morning a decision may be made to enable qpthreads -- that must + * be done before any threads are created (or will zabort) and before any + * mutexes and condition variables are initialised. + * + * Use: qpthreads_enabled -- to test for the enabled-ness + * qpthreads_enabled_freeze -- to test and freeze unset if not yet enabled + */ + +#define qpthreads_enabled ((const int)qpthreads_enabled_flag) +#define qpthreads_enabled_freeze qpt_freeze_qpthreads_enabled() + +/*============================================================================== * Data types */ @@ -66,8 +86,13 @@ typedef pthread_cond_t qpt_cond_t ; typedef pthread_attr_t qpt_thread_attr_t ; +typedef qpt_mutex_t* qpt_mutex ; +typedef qpt_cond_t* qpt_cond ; + /*============================================================================== * Thread Creation -- see qpthreads.c for further discussion. + * + * NB: it is a FATAL error to attempt these if !qpthreads_enabled. */ enum qpt_attr_options @@ -92,19 +117,30 @@ enum qpt_attr_options #define qpt_attr_known ( qpt_attr_detached | qpt_attr_sched_setting ) -extern qpt_thread_attr_t* +extern qpt_thread_attr_t* /* FATAL error if !qpthreads_enabled */ qpt_thread_attr_init(qpt_thread_attr_t* attr, enum qpt_attr_options opts, int scope, int policy, int priority) ; -extern qpt_thread_t +extern qpt_thread_t /* FATAL error if !qpthreads_enabled */ qpt_thread_create(void* (*start)(void*), void* arg, qpt_thread_attr_t* attr) ; /*============================================================================== - * Thread self knowledge. + * qpthreads_enabled support -- NOT FOR PUBLIC CONSUMPTION ! + */ +private int qpthreads_enabled_flag ; /* DO NOT WRITE TO THIS PLEASE */ + +private void +qpt_set_qpthreads_enabled(int how) ; /* qpthreads_enabled := how */ + +private int +qpt_freeze_qpthreads_enabled(void) ; /* get and freeze qpthreads_enabled */ + +/*============================================================================== + * Thread self knowledge -- returns 'NULL' if !qpthreads_enabled */ Inline qpt_thread_t qpt_thread_self(void) { - return pthread_self() ; + return qpthreads_enabled ? pthread_self() : (qpt_thread_t)NULL; } ; /*============================================================================== @@ -127,6 +163,10 @@ Inline qpt_thread_t qpt_thread_self(void) * If _DEFAULT is faster than _NORMAL, then QPT_MUTEX_TYPE_DEFAULT may be * used to override this choice. * + * NB: if NOT qpthreads_enabled, all mutex actions are EMPTY. This allows + * code to be made thread-safe for when pthreads is running, but to work + * perfectly well without pthreads. + * * NB: do not (currently) support pthread_mutex_timedlock(). */ @@ -149,23 +189,25 @@ enum qpt_mutex_options # define QPT_MUTEX_TYPE PTHREAD_MUTEX_ERRORCHECK #endif -extern qpt_mutex_t* -qpt_mutex_init(qpt_mutex_t* mx, enum qpt_mutex_options opts) ; +extern qpt_mutex /* freezes qpthreads_enabled */ +qpt_mutex_init_new(qpt_mutex mx, enum qpt_mutex_options opts) ; -extern qpt_mutex_t* -qpt_mutex_destroy(qpt_mutex_t* mx, int free_mutex) ; +#define qpt_mutex_init qpt_mutex_init_new + +extern qpt_mutex /* do nothing if !qpthreads_enabled */ +qpt_mutex_destroy(qpt_mutex mx, int free_mutex) ; #define qpt_mutex_destroy_keep(mx) qpt_mutex_destroy(mx, 0) #define qpt_mutex_destroy_free(mx) qpt_mutex_destroy(mx, 1) Inline void -qpt_mutex_lock(qpt_mutex_t* mx) ; /* do nothing if mx == NULL */ +qpt_mutex_lock(qpt_mutex mx) ; /* do nothing if !qpthreads_enabled */ Inline int -qpt_mutex_trylock(qpt_mutex_t* mx) ; /* do nothing if mx == NULL */ +qpt_mutex_trylock(qpt_mutex mx) ; /* always succeeds if !qpthreads_enabled */ Inline void -qpt_mutex_unlock(qpt_mutex_t* mx) ; /* do nothing if mx == NULL */ +qpt_mutex_unlock(qpt_mutex mx) ; /* do nothing if !qpthreads_enabled */ /*============================================================================== * Condition Variable handling @@ -188,6 +230,10 @@ qpt_mutex_unlock(qpt_mutex_t* mx) ; /* do nothing if mx == NULL */ * * NB: static initialisation of condition variables is not supported, to avoid * confusion between the standard default and Quagga's default. + + * NB: if NOT qpthreads_enabled, all condition actions are EMPTY. This allows + * code to be made thread-safe for when pthreads is running, but to work + * perfectly well without pthreads. */ #ifndef QPT_COND_CLOCK_ID @@ -205,41 +251,41 @@ enum qpt_cond_options qpt_cond_quagga = 0x0000, /* Quagga's default */ } ; -extern qpt_cond_t* -qpt_cond_init(qpt_cond_t* cv, enum qpt_cond_options opts) ; +extern qpt_cond /* freezes qpthreads_enabled */ +qpt_cond_init_new(qpt_cond cv, enum qpt_cond_options opts) ; -extern qpt_cond_t* -qpt_cond_destroy(qpt_cond_t* cv, int free_cond) ; +extern qpt_cond /* do nothing if !qpthreads_enabled */ +qpt_cond_destroy(qpt_cond cv, int free_cond) ; #define qpt_cond_destroy_keep(cv) qpt_cond_destroy(cv, 0) #define qpt_cond_destroy_free(cv) qpt_cond_destroy(cv, 1) -Inline void -qpt_cond_wait(qpt_cond_t* cv, qpt_mutex_t* mx) ; +Inline void /* do nothing if !qpthreads_enabled */ +qpt_cond_wait(qpt_cond cv, qpt_mutex mx) ; -extern int -qpt_cond_timedwait(qpt_cond_t* cv, qpt_mutex_t* mx, qtime_mono_t timeout_time) ; +extern int /* returns !qpthreads_enabled */ +qpt_cond_timedwait(qpt_cond cv, qpt_mutex mx, qtime_mono_t timeout_time) ; -Inline void -qpt_cond_signal(qpt_cond_t* cv) ; +Inline void /* do nothing if !qpthreads_enabled */ +qpt_cond_signal(qpt_cond cv) ; -Inline void -qpt_cond_broadcast(qpt_cond_t* cv) ; +Inline void /* do nothing if !qpthreads_enabled */ +qpt_cond_broadcast(qpt_cond cv) ; /*============================================================================== * Mutex inline functions */ -/* Lock given mutex. +/* Lock given mutex -- or do nothing if !qpthreads_enabled. * * Unless both NCHECK_QPTHREADS and NDEBUG are defined, checks that the * return value is valid -- zabort_errno if it isn't. */ Inline void -qpt_mutex_lock(qpt_mutex_t* mx) /* do nothing if mx == NULL */ +qpt_mutex_lock(qpt_mutex mx) { - if (mx != NULL) + if (qpthreads_enabled) { #if defined(NDEBUG) && defined(NDEBUG_QPTHREADS) pthread_mutex_lock(mx) ; @@ -251,7 +297,7 @@ qpt_mutex_lock(qpt_mutex_t* mx) /* do nothing if mx == NULL */ } ; } ; -/* Try to lock given mutex. +/* Try to lock given mutex -- every time a winner if !qpthreads_enabled. * * Returns: lock succeeded (1 => have locked, 0 => unable to lock). * @@ -259,9 +305,9 @@ qpt_mutex_lock(qpt_mutex_t* mx) /* do nothing if mx == NULL */ */ Inline int -qpt_mutex_trylock(qpt_mutex_t* mx) /* do nothing if mx == NULL */ +qpt_mutex_trylock(qpt_mutex mx) { - if (mx != NULL) + if (qpthreads_enabled) { int err = pthread_mutex_trylock(mx) ; if (err == 0) @@ -270,19 +316,20 @@ qpt_mutex_trylock(qpt_mutex_t* mx) /* do nothing if mx == NULL */ return 0 ; /* unable to lock */ zabort_err("pthread_mutex_trylock failed", err) ; - /* crunch */ - } ; + } + else + return 1 ; } ; -/* Unlock given mutex. +/* Unlock given mutex -- or do nothing if !qpthreads_enabled. * * Unless both NCHECK_QPTHREADS and NDEBUG are defined, checks that the * return value is valid -- zabort_errno if it isn't. */ Inline void -qpt_mutex_unlock(qpt_mutex_t* mx) /* do nothing if mx == NULL */ +qpt_mutex_unlock(qpt_mutex mx) { - if (mx != NULL) + if (qpthreads_enabled) { #if defined(NDEBUG) && defined(NDEBUG_QPTHREADS) pthread_mutex_unlock(mx) ; @@ -298,66 +345,73 @@ qpt_mutex_unlock(qpt_mutex_t* mx) /* do nothing if mx == NULL */ * Condition variable inline functions */ -/* Wait for given condition variable. +/* Wait for given condition variable -- do nothing if !qpthreads_enabled * * Unless both NCHECK_QPTHREADS and NDEBUG are defined, checks that the * return value is valid -- zabort_errno if it isn't. */ - Inline void -qpt_cond_wait(qpt_cond_t* cv, qpt_mutex_t* mx) +qpt_cond_wait(qpt_cond cv, qpt_mutex mx) { + if (qpthreads_enabled) + { #if defined(NDEBUG) && defined(NDEBUG_QPTHREADS) - pthread_cond_wait(cv, mx) ; + pthread_cond_wait(cv, mx) ; #else - int err = pthread_cond_wait(cv, mx) ; - if (err != 0) - zabort_err("pthread_cond_wait failed", err) ; + int err = pthread_cond_wait(cv, mx) ; + if (err != 0) + zabort_err("pthread_cond_wait failed", err) ; #endif + } ; } ; -/* Signal given condition. +/* Signal given condition -- do nothing if !qpthreads_enabled * * Unless both NCHECK_QPTHREADS and NDEBUG are defined, checks that the * return value is valid -- zabort_errno if it isn't. */ - Inline void -qpt_cond_signal(qpt_cond_t* cv) +qpt_cond_signal(qpt_cond cv) { + if (qpthreads_enabled) + { #if defined(NDEBUG) && defined(NDEBUG_QPTHREADS) - pthread_cond_signal(cv) ; + pthread_cond_signal(cv) ; #else - int err = pthread_cond_signal(cv) ; - if (err != 0) - zabort_err("pthread_cond_signal failed", err) ; + int err = pthread_cond_signal(cv) ; + if (err != 0) + zabort_err("pthread_cond_signal failed", err) ; #endif + } ; } ; -/* Broadcast given condition. +/* Broadcast given condition -- do nothing if !qpthreads_enabled * * Unless both NCHECK_QPTHREADS and NDEBUG are defined, checks that the * return value is valid -- zabort_errno if it isn't. */ Inline void -qpt_cond_broadcast(qpt_cond_t* cv) +qpt_cond_broadcast(qpt_cond cv) { + if (qpthreads_enabled) + { #if defined(NDEBUG) && defined(NDEBUG_QPTHREADS) - pthread_cond_broadcast(cv) ; + pthread_cond_broadcast(cv) ; #else - int err = pthread_cond_broadcast(cv) ; - if (err != 0) - zabort_err("pthread_cond_broadcast failed", err) ; + int err = pthread_cond_broadcast(cv) ; + if (err != 0) + zabort_err("pthread_cond_broadcast failed", err) ; #endif + } ; } ; /*============================================================================== * Signal Handling. */ -void +void /* FATAL error if !qpthreads_enabled */ qpt_thread_sigmask(int how, const sigset_t* set, sigset_t* oset) ; -void +void /* FATAL error if !qpthreads_enabled */ qpt_thread_signal(qpt_thread_t thread, int signum) ; #endif /* _ZEBRA_QPTHREADS_H */ |