summaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/mqueue.h14
-rw-r--r--lib/qpnexus.c166
-rw-r--r--lib/qpnexus.h63
-rw-r--r--lib/qpselect.c20
-rw-r--r--lib/qtimers.c24
-rw-r--r--lib/qtimers.h4
-rw-r--r--lib/sigevent.c50
-rw-r--r--lib/stream.c47
-rw-r--r--lib/stream.h1
-rw-r--r--lib/thread.c562
-rw-r--r--lib/thread.h20
-rw-r--r--lib/workqueue.c285
-rw-r--r--lib/workqueue.h95
13 files changed, 841 insertions, 510 deletions
diff --git a/lib/mqueue.h b/lib/mqueue.h
index d8790246..355aec23 100644
--- a/lib/mqueue.h
+++ b/lib/mqueue.h
@@ -93,16 +93,16 @@ typedef void mqueue_action(mqueue_block mqb, mqb_flag_t flag) ;
enum { mqb_args_size_max = 64 } ; /* maximum size of struct args */
enum { mqb_argv_size_unit = 16 } ; /* allocate argv in these units */
-struct args
+struct mqb_args
{
- char data[mqb_args_size_max] ; /* empty space */
+ char bytes[mqb_args_size_max] ; /* empty space */
} ;
#define MQB_ARGS_SIZE_OK(s) CONFIRM(sizeof(struct s) <= mqb_args_size_max)
struct mqueue_block
{
- struct args args ; /* user structure */
+ struct mqb_args args ; /* user structure */
mqueue_block next ; /* single linked list */
@@ -116,8 +116,12 @@ struct mqueue_block
mqb_index_t argv_next ; /* iterator */
} ;
-/* mqueue_block structures are malloced. That guarantees maximum alignment. */
-/* To guarantee maximum alignment for "struct args", it must be first item ! */
+/* mqueue_block structures are malloced. That guarantees maximum alignment.
+ * To guarantee maximum alignment for "struct args", it must be first item !
+ *
+ * (The typedef is required to stop Eclipse (3.4.2 with CDT 5.0) whining
+ * about first argument of offsetof().)
+ */
typedef struct mqueue_block mqueue_block_t ;
CONFIRM(offsetof(mqueue_block_t, args) == 0) ;
diff --git a/lib/qpnexus.c b/lib/qpnexus.c
index 8a78a70b..cb0bd12c 100644
--- a/lib/qpnexus.c
+++ b/lib/qpnexus.c
@@ -36,7 +36,13 @@ static void qpn_in_thread_init(qpn_nexus qpn);
*/
-/* Initialise a nexus -- allocating it if required.
+/*==============================================================================
+ * Initialisation, add hook, free etc.
+ *
+ */
+
+/*------------------------------------------------------------------------------
+ * Initialise a nexus -- allocating it if required.
*
* If main_thread is set then no new thread will be created
* when qpn_exec() is called, instead the finite state machine will be
@@ -45,7 +51,7 @@ static void qpn_in_thread_init(qpn_nexus qpn);
*
* Returns the qpn_nexus.
*/
-qpn_nexus
+extern qpn_nexus
qpn_init_new(qpn_nexus qpn, int main_thread)
{
if (qpn == NULL)
@@ -53,16 +59,27 @@ qpn_init_new(qpn_nexus qpn, int main_thread)
else
memset(qpn, 0, sizeof(struct qpn_nexus)) ;
- qpn->selection = qps_selection_init_new(qpn->selection);
- qpn->pile = qtimer_pile_init_new(qpn->pile);
- qpn->queue = mqueue_init_new(qpn->queue, mqt_signal_unicast);
+ qpn->selection = qps_selection_init_new(qpn->selection);
+ qpn->pile = qtimer_pile_init_new(qpn->pile);
+ qpn->queue = mqueue_init_new(qpn->queue, mqt_signal_unicast);
qpn->main_thread = main_thread;
- qpn->start = qpn_start;
+ qpn->start = qpn_start;
return qpn;
}
-/* free timers, selection, message queue and nexus
+/*------------------------------------------------------------------------------
+ * Add a hook function to the given nexus.
+ */
+extern void
+qpn_add_hook_function(qpn_hook_list list, void* hook)
+{
+ passert(list->count < qpn_hooks_max) ;
+ list->hooks[list->count++] = hook ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * free timers, selection, message queue and nexus
* return NULL
*/
qpn_nexus
@@ -99,24 +116,25 @@ qpn_free(qpn_nexus qpn)
return NULL;
}
-/* If not main thread create new qpthread.
- * Execute the state machine */
-void
+/*==============================================================================
+ * Execution of a nexus
+ */
+
+/*------------------------------------------------------------------------------
+ * If not main qpthread create new qpthread.
+ *
+ * For all qpthreads: start the thread !
+ */
+extern void
qpn_exec(qpn_nexus qpn)
{
if (qpn->main_thread)
- {
- /* Run the state machine in calling thread */
- qpn->start(qpn);
- }
+ qpn->start(qpn);
else
- {
- /* create a qpthread and run the state machine in it */
- qpt_thread_create(qpn->start, qpn, NULL) ;
- }
-}
+ qpt_thread_create(qpn->start, qpn, NULL) ;
+} ;
-/*==============================================================================
+/*------------------------------------------------------------------------------
* Pthread routine
*
* Processes:
@@ -145,78 +163,86 @@ qpn_start(void* arg)
qpn_nexus qpn = arg;
mqueue_block mqb;
int actions;
- qtime_mono_t now;
- qtime_mono_t max_wait;
- int i;
+ qtime_mono_t now ;
+ qtime_t max_wait ;
+ unsigned i;
+ unsigned done ;
+ unsigned wait ;
- /* now in our thread, complete initialisation */
+ /* now in our thread, complete initialisation */
qpn_in_thread_init(qpn);
+ /* Until required to terminate, loop */
+ done = 1 ;
while (!qpn->terminate)
{
- now = qt_get_monotonic();
+ wait = (done == 0) ; /* may wait this time only if nothing
+ found to do on the last pass */
- /* Signals are highest priority.
- * only execute on the main thread */
+ /* Signals are highest priority -- only execute for main thread
+ *
+ * Restarts "done" for this pass.
+ */
if (qpn->main_thread)
- quagga_sigevent_process ();
+ done = quagga_sigevent_process() ;
+ else
+ done = 0 ;
- /* max time to wait in pselect */
- max_wait = QTIME(MAX_PSELECT_TIMOUT);
-
- /* event hooks, if any. High priority */
- for (i = 0; i < NUM_EVENT_HOOK; ++i)
- {
- if (qpn->event_hook[i] != NULL)
- {
- /* first, second and third priority */
- qtime_mono_t event_wait = qpn->event_hook[i](qpn_pri_third);
- if (event_wait > 0 && event_wait < max_wait)
- max_wait = event_wait;
- }
- }
+ /* Foreground hooks, if any. */
+ for (i = 0; i < qpn->foreground.count ; ++i)
+ done |= ((qpn_hook_function*)(qpn->foreground.hooks[i]))() ;
/* drain the message queue, will be in waiting for signal state
* when it's empty */
- for (;;)
+
+ if (done != 0)
+ wait = 0 ; /* turn off wait if found something */
+
+ while (1)
{
- mqb = mqueue_dequeue(qpn->queue, 1, qpn->mts) ;
+ mqb = mqueue_dequeue(qpn->queue, wait, qpn->mts) ;
if (mqb == NULL)
break;
mqb_dispatch(mqb, mqb_action);
- }
- /* Event hooks, if any. All priorities */
- for (i = 0; i < NUM_EVENT_HOOK; ++i)
- {
- if (qpn->event_hook[i] != NULL)
- {
- /* first, second third and fourth priority */
- qtime_mono_t event_wait = qpn->event_hook[i](qpn_pri_fourth);
- if (event_wait > 0 && event_wait < max_wait)
- max_wait = event_wait;
- }
- }
-
- /* block for some input, output, signal or timeout */
- actions = qps_pselect(qpn->selection,
- qtimer_pile_top_time(qpn->pile, now + max_wait));
-
- /* process I/O actions */
- while (actions)
- actions = qps_dispatch_next(qpn->selection) ;
+ done = 1 ; /* done something */
+ wait = 0 ; /* turn off wait */
+ } ;
- mqueue_done_waiting(qpn->queue, qpn->mts);
+ /* block for some input, output, signal or timeout
+ *
+ * wait will be true iff did nothing the last time round the loop, and
+ * not found anything to be done up to this point either.
+ */
+ if (wait)
+ max_wait = qtimer_pile_top_wait(qpn->pile, QTIME(MAX_PSELECT_WAIT)) ;
+ else
+ max_wait = 0 ;
+
+ actions = qps_pselect(qpn->selection, max_wait) ;
+ done |= actions ;
+
+ if (wait)
+ mqueue_done_waiting(qpn->queue, qpn->mts);
+
+ /* process I/O actions */
+ while (actions)
+ actions = qps_dispatch_next(qpn->selection) ;
- /* process timers */
+ /* process timers */
+ now = qt_get_monotonic() ;
while (qtimer_pile_dispatch_next(qpn->pile, now))
- {
- }
- }
+ done = 1 ;
+
+ /* If nothing done in this pass, see if anything in the background */
+ if (done == 0)
+ for (i = 0; i < qpn->background.count ; ++i)
+ done |= ((qpn_hook_function*)(qpn->background.hooks[i]))() ;
+ } ;
/* last bit of code to run in this thread */
- if (qpn->in_thread_final)
+ if (qpn->in_thread_final != NULL)
qpn->in_thread_final();
return NULL;
diff --git a/lib/qpnexus.h b/lib/qpnexus.h
index a6cad148..d5b7c5a6 100644
--- a/lib/qpnexus.h
+++ b/lib/qpnexus.h
@@ -48,31 +48,27 @@
*/
/* maximum time in seconds to sit in a pselect */
-#define MAX_PSELECT_TIMOUT 10
+#define MAX_PSELECT_WAIT 10
/* signal for message queues */
#define SIGMQUEUE SIGUSR2
/* number of event hooks */
-#define NUM_EVENT_HOOK 2
-
-/* Work priorities */
-enum qpn_priority
-{
- qpn_pri_highest = 1,
-
- qpn_pri_first = 1,
- qpn_pri_second = 2,
- qpn_pri_third = 3,
- qpn_pri_fourth = 4,
-
- qpn_pri_lowest = 4,
-};
+enum { qpn_hooks_max = 4 } ;
/*==============================================================================
* Data Structures.
*/
+typedef int qpn_hook_function(void) ;
+
+typedef struct qpn_hook_list* qpn_hook_list ;
+struct qpn_hook_list
+{
+ void* hooks[qpn_hooks_max] ;
+ unsigned count ;
+} ;
+
typedef struct qpn_nexus* qpn_nexus ;
struct qpn_nexus
@@ -99,30 +95,45 @@ struct qpn_nexus
/* qpthread routine, can override */
void* (*start)(void*);
- /* in-thread initialize, can override. Called within the thread
- * after all other initializion just before thread loop */
+ /* in-thread initialise, can override. Called within the thread
+ * after all other initialisation just before thread loop */
void (*in_thread_init)(void);
- /* in-thread finalize, can override. Called within thread
+ /* in-thread finalise, can override. Called within thread
* just before thread dies. Nexus components all exist but
* thread loop is no longer executed */
void (*in_thread_final)(void);
- /* thread loop events, can override. Called before and after message queue,
- * and before I/O and timers.
- * Hook should perform all work <= given priority.
- * Returns the time to try again, 0 means default to maximum.
+ /* in-thread queue(s) of events or other work.
+ *
+ * The hook function(s) are called in the qpnexus loop, at the top of the
+ * loop. So in addition to the mqueue, I/O, timers and any background stuff,
+ * the thread may have other queue(s) of things to be done.
+ *
+ * Hook function can process some queue(s) of things to be done. It does not
+ * have to empty its queues, but it MUST only return 0 if all queues are now
+ * empty.
*/
- qtime_mono_t (*event_hook[NUM_EVENT_HOOK])(enum qpn_priority);
-
+ struct qpn_hook_list foreground ;
+
+ /* in-thread background queue(s) of events or other work.
+ *
+ * The hook functions are called at the bottom of the qpnexus loop, but only
+ * when there is absolutely nothing else to do.
+ *
+ * The hook function should do some unit of background work (if there is any)
+ * and return. MUST return 0 iff there is no more work to do.
+ */
+ struct qpn_hook_list background ;
};
/*==============================================================================
* Functions
*/
-extern qpn_nexus qpn_init_new(qpn_nexus qtn, int main_thread);
-extern void qpn_exec(qpn_nexus qtn);
+extern qpn_nexus qpn_init_new(qpn_nexus qpn, int main_thread);
+extern void qpn_add_hook_function(qpn_hook_list list, void* hook) ;
+extern void qpn_exec(qpn_nexus qpn);
extern void qpn_terminate(qpn_nexus qpn);
extern qpn_nexus qpn_free(qpn_nexus qpn);
diff --git a/lib/qpselect.c b/lib/qpselect.c
index 7df59752..d3f8e5ad 100644
--- a/lib/qpselect.c
+++ b/lib/qpselect.c
@@ -270,13 +270,8 @@ qps_set_signal(qps_selection qps, int signum, sigset_t sigmask)
} ;
} ;
-/* Execute a pselect for the given selection -- subject to the given timeout
- * *time*.
- *
- * The time-out time is an "absolute" time, as measured by qt_get_monotonic().
- *
- * A timeout time <= the current qt_get_monotonic() is treated as a zero
- * timeout period, and will return immediately from the pselect.
+/* Execute a pselect for the given selection -- subject to the given maximum
+ * time to wait.
*
* There is no support for an infinite timeout.
*
@@ -289,7 +284,7 @@ qps_set_signal(qps_selection qps, int signum, sigset_t sigmask)
* The qps_dispatch_next() processes the returns from pselect().
*/
int
-qps_pselect(qps_selection qps, qtime_mono_t timeout)
+qps_pselect(qps_selection qps, qtime_t max_wait)
{
struct timespec ts ;
qps_mnum_t mnum ;
@@ -334,16 +329,15 @@ qps_pselect(qps_selection qps, qtime_mono_t timeout)
qps->tried_fd_last = qps->fd_last ;
qps->pend_fd = 0 ;
- /* Convert timeout time to interval for pselect() */
- timeout -= qt_get_monotonic() ;
- if (timeout < 0)
- timeout = 0 ;
+ /* Make sure not trying to do something stupid */
+ if (max_wait < 0)
+ max_wait = 0 ;
/* Finally ready for the main event */
n = pselect(qps->fd_last + 1, p_fds[qps_read_mnum],
p_fds[qps_write_mnum],
p_fds[qps_error_mnum],
- qtime2timespec(&ts, timeout),
+ qtime2timespec(&ts, max_wait),
(qps->signum != 0) ? &qps->sigmask : NULL) ;
/* If have something, set and return the pending count. */
diff --git a/lib/qtimers.c b/lib/qtimers.c
index dcce24b9..0aef52a4 100644
--- a/lib/qtimers.c
+++ b/lib/qtimers.c
@@ -108,7 +108,9 @@ qtimer_pile_init_new(qtimer_pile qtp)
* timers -- invalid heap -- need to properly initialise
*/
- /* Eclipse flags offsetof(struct qtimer, backlink) as a syntax error :-( */
+ /* (The typedef is required to stop Eclipse (3.4.2 with CDT 5.0) whining
+ * about first argument of offsetof().)
+ */
typedef struct qtimer qtimer_t ;
heap_init_new_backlinked(&qtp->timers, 0, (heap_cmp*)qtimer_cmp,
@@ -122,15 +124,18 @@ qtimer_pile_init_new(qtimer_pile qtp)
* empty, or the top entry times out after the maximum time, then the maximum
* is returned.
*/
-qtime_mono_t
-qtimer_pile_top_time(qtimer_pile qtp, qtime_mono_t max_time)
+qtime_t
+qtimer_pile_top_wait(qtimer_pile qtp, qtime_t max_wait)
{
+ qtime_t top_wait ;
qtimer qtr = heap_top_item(&qtp->timers) ;
- if ((qtr == NULL) || (qtr->time >= max_time))
- return max_time ;
- else
- return qtr->time ;
+ if (qtr == NULL)
+ return max_wait ;
+
+ top_wait = qtr->time - qt_get_monotonic() ;
+
+ return (top_wait < max_wait) ? top_wait : max_wait ;
} ;
/* Dispatch the next timer whose time is <= the given "upto" time.
@@ -157,7 +162,6 @@ qtimer_pile_dispatch_next(qtimer_pile qtp, qtime_mono_t upto)
qtr->state = qtr_state_unset_pending ;
qtr->action(qtr, qtr->timer_info, upto) ;
- assert(qtp == qtr->pile);
if (qtr->state == qtr_state_unset_pending)
qtimer_unset(qtr) ;
@@ -372,7 +376,9 @@ qtimer_pile_verify(qtimer_pile qtp)
vector_index e ;
qtimer qtr ;
- /* Eclipse flags offsetof(struct qtimer, backlink) as a syntax error :-( */
+ /* (The typedef is required to stop Eclipse (3.4.2 with CDT 5.0) whining
+ * about first argument of offsetof().)
+ */
typedef struct qtimer qtimer_t ;
assert(th->cmp == (heap_cmp*)qtimer_cmp) ;
diff --git a/lib/qtimers.h b/lib/qtimers.h
index 3d509acb..0bc3d7a1 100644
--- a/lib/qtimers.h
+++ b/lib/qtimers.h
@@ -85,8 +85,8 @@ qtimer_pile_init_new(qtimer_pile qtp) ;
int
qtimer_pile_dispatch_next(qtimer_pile qtp, qtime_mono_t upto) ;
-qtime_mono_t
-qtimer_pile_top_time(qtimer_pile qtp, qtime_mono_t max_time) ;
+qtime_t
+qtimer_pile_top_wait(qtimer_pile qtp, qtime_t max_wait) ;
qtimer
qtimer_pile_ream(qtimer_pile qtp, int free_structure) ;
diff --git a/lib/sigevent.c b/lib/sigevent.c
index 30e9a3d1..a3d4219c 100644
--- a/lib/sigevent.c
+++ b/lib/sigevent.c
@@ -16,7 +16,7 @@
* You should have received a copy of the GNU General Public License
* along with Quagga; see the file COPYING. If not, write to the Free
* Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
- * 02111-1307, USA.
+ * 02111-1307, USA.
*/
#include <zebra.h>
@@ -41,13 +41,13 @@ struct quagga_sigevent_master_t
{
struct thread *t;
- struct quagga_signal_t *signals;
+ struct quagga_signal_t *signals;
int sigc;
-
+
volatile sig_atomic_t caught;
} sigmaster;
-/* Generic signal handler
+/* Generic signal handler
* Schedules signal event thread
*/
static void
@@ -55,24 +55,30 @@ quagga_signal_handler (int signo)
{
int i;
struct quagga_signal_t *sig;
-
+
for (i = 0; i < sigmaster.sigc; i++)
{
sig = &(sigmaster.signals[i]);
-
+
if (sig->signal == signo)
sig->caught = 1;
}
-
+
sigmaster.caught = 1;
-}
+}
-/* check if signals have been caught and run appropriate handlers */
+/* check if signals have been caught and run appropriate handlers
+ *
+ * Returns: 0 => nothing to do
+ * -1 => failed
+ * > 0 => done this many signals
+ */
int
quagga_sigevent_process (void)
{
struct quagga_signal_t *sig;
int i;
+ int done ;
#ifdef SIGEVENT_BLOCK_SIGNALS
/* shouldnt need to block signals, but potentially may be needed */
sigset_t newmask, oldmask;
@@ -85,7 +91,7 @@ quagga_sigevent_process (void)
sigfillset (&newmask);
sigdelset (&newmask, SIGTRAP);
sigdelset (&newmask, SIGKILL);
-
+
if ( (sigprocmask (SIG_BLOCK, &newmask, &oldmask)) < 0)
{
zlog_err ("quagga_signal_timer: couldnt block signals!");
@@ -93,13 +99,14 @@ quagga_sigevent_process (void)
}
#endif /* SIGEVENT_BLOCK_SIGNALS */
+ done = 0 ;
if (sigmaster.caught > 0)
{
sigmaster.caught = 0;
/* must not read or set sigmaster.caught after here,
* race condition with per-sig caught flags if one does
*/
-
+
for (i = 0; i < sigmaster.sigc; i++)
{
sig = &(sigmaster.signals[i]);
@@ -108,6 +115,7 @@ quagga_sigevent_process (void)
{
sig->caught = 0;
sig->handler ();
+ ++done ;
}
}
}
@@ -117,7 +125,7 @@ quagga_sigevent_process (void)
return -1;
#endif /* SIGEVENT_BLOCK_SIGNALS */
- return 0;
+ return done ;
}
#ifdef SIGEVENT_SCHEDULE_THREAD
@@ -159,7 +167,7 @@ signal_set (int signo)
}
ret = sigaction (signo, &sig, &osig);
- if (ret < 0)
+ if (ret < 0)
return ret;
else
return 0;
@@ -245,13 +253,13 @@ trap_default_signals(void)
SIGUSR1,
SIGUSR2,
#ifdef SIGPOLL
- SIGPOLL,
+ SIGPOLL,
#endif
#ifdef SIGVTALRM
SIGVTALRM,
#endif
#ifdef SIGSTKFLT
- SIGSTKFLT,
+ SIGSTKFLT,
#endif
};
static const int ignore_signals[] = {
@@ -309,8 +317,8 @@ trap_default_signals(void)
}
}
-void
-signal_init (struct thread_master *m, int sigc,
+void
+signal_init (struct thread_master *m, int sigc,
struct quagga_signal_t signals[])
{
@@ -320,7 +328,7 @@ signal_init (struct thread_master *m, int sigc,
/* First establish some default handlers that can be overridden by
the application. */
trap_default_signals();
-
+
while (i < sigc)
{
sig = &signals[i];
@@ -332,9 +340,9 @@ signal_init (struct thread_master *m, int sigc,
sigmaster.sigc = sigc;
sigmaster.signals = signals;
-#ifdef SIGEVENT_SCHEDULE_THREAD
- sigmaster.t =
- thread_add_timer (m, quagga_signal_timer, &sigmaster,
+#ifdef SIGEVENT_SCHEDULE_THREAD
+ sigmaster.t =
+ thread_add_timer (m, quagga_signal_timer, &sigmaster,
QUAGGA_SIGNAL_TIMER_INTERVAL);
#endif /* SIGEVENT_SCHEDULE_THREAD */
}
diff --git a/lib/stream.c b/lib/stream.c
index 14c7c589..b4c16977 100644
--- a/lib/stream.c
+++ b/lib/stream.c
@@ -998,46 +998,6 @@ stream_flush (struct stream* s, int fd)
}
/*------------------------------------------------------------------------------
- * Try to write stream contents to the file descriptor -- assuming non-blocking.
- *
- * Loops if gets EINTR.
- *
- * If writes everything, resets the stream.
- *
- * If does not write everything, then would block.
- *
- * Returns: >= 0 number of bytes left to write
- * -1 => some error (not including EINTR, EAGAIN or EWOULDBLOCK)
- */
-int
-stream_flush_try(struct stream* s, int fd)
-{
- int have ;
- int ret ;
-
- STREAM_VERIFY_SANE(s);
-
- while ((have = (s->endp - s->getp)) != 0)
- {
- ret = write(fd, s->data + s->getp, have) ;
- if (ret > 0)
- s->getp += ret ;
- else if (ret < 0)
- {
- ret = errno ;
- if ((ret == EAGAIN) || (ret == EWOULDBLOCK))
- return have ;
- if (ret != EINTR)
- return -1 ;
- } ;
- } ;
-
- s->getp = s->endp = 0;
-
- return 0 ;
-}
-
-/*------------------------------------------------------------------------------
* Transfer contents of stream to given buffer and reset stream.
*
* Transfers *entire* stream buffer.
@@ -1113,6 +1073,13 @@ stream_fifo_head (struct stream_fifo *fifo)
}
void
+stream_fifo_reset (struct stream_fifo *fifo)
+{
+ fifo->head = fifo->tail = NULL;
+ fifo->count = 0;
+}
+
+void
stream_fifo_clean (struct stream_fifo *fifo)
{
struct stream *s;
diff --git a/lib/stream.h b/lib/stream.h
index 094cf0c6..e7303652 100644
--- a/lib/stream.h
+++ b/lib/stream.h
@@ -224,6 +224,7 @@ extern struct stream_fifo *stream_fifo_new (void);
extern void stream_fifo_push (struct stream_fifo *fifo, struct stream *s);
extern struct stream *stream_fifo_pop (struct stream_fifo *fifo);
extern struct stream *stream_fifo_head (struct stream_fifo *fifo);
+extern void stream_fifo_reset (struct stream_fifo *fifo);
extern void stream_fifo_clean (struct stream_fifo *fifo);
extern void stream_fifo_free (struct stream_fifo *fifo);
diff --git a/lib/thread.c b/lib/thread.c
index f2b873ac..3df9acf7 100644
--- a/lib/thread.c
+++ b/lib/thread.c
@@ -31,6 +31,7 @@
#include "command.h"
#include "sigevent.h"
#include "qpthreads.h"
+#include "qtimers.h"
/* Recent absolute time of day */
struct timeval recent_time;
@@ -47,7 +48,12 @@ static qpt_mutex_t thread_mutex;
#define UNLOCK qpt_mutex_unlock(&thread_mutex);
static struct hash *cpu_record = NULL;
-/* Struct timeval's tv_usec one second value. */
+/* Pointer to qtimer pile to be used, if any */
+static qtimer_pile use_qtimer_pile = NULL ;
+static qtimer spare_qtimers = NULL ;
+static unsigned used_standard_timer = 0 ;
+
+/* Struct timeval's tv_usec one second value. */
#define TIMER_SECOND_MICRO 1000000L
/* Adjust so that tv_usec is in the range [0,TIMER_SECOND_MICRO).
@@ -238,18 +244,51 @@ cpu_record_hash_cmp (const struct cpu_thread_history *a,
static void *
cpu_record_hash_alloc (struct cpu_thread_history *a)
{
- struct cpu_thread_history *new;
+ const char* b ;
+ const char* e ;
+ char* n ;
+ int l ;
+ struct cpu_thread_history *new ;
+
+ /* Establish start and length of name, removing leading/trailing
+ * spaces and any enclosing (...) -- recursively.
+ */
+ b = a->funcname ;
+ e = b + strlen(b) - 1 ;
+
+ while (1)
+ {
+ while (*b == ' ')
+ ++b ; /* strip leading spaces */
+ if (*b == '\0')
+ break ; /* quit if now empty */
+ while (*e == ' ')
+ --e ; /* strip trailing spaces */
+ if ((*b != '(') || (*e != ')'))
+ break ; /* quit if not now (...) */
+ ++b ;
+ --e ; /* discard ( and ) */
+ } ;
+
+ l = (e + 1) - b ; /* length excluding trailing \0 */
+
+ n = XMALLOC(MTYPE_THREAD_FUNCNAME, l + 1) ;
+ memcpy(n, b, l) ;
+ n[l] = '\0' ;
+
+ /* Allocate empty structure and set address and name */
new = XCALLOC (MTYPE_THREAD_STATS, sizeof (struct cpu_thread_history));
- new->func = a->func;
- new->funcname = XSTRDUP(MTYPE_THREAD_FUNCNAME, a->funcname);
- return new;
+ new->func = a->func;
+ new->funcname = n ;
+
+ return new ;
}
static void
cpu_record_hash_free (void *a)
{
struct cpu_thread_history *hist = a;
- char* funcname = miyagi(hist->funcname) ;
+ void* funcname = miyagi(hist->funcname) ;
XFREE (MTYPE_THREAD_FUNCNAME, funcname);
XFREE (MTYPE_THREAD_STATS, hist);
@@ -497,7 +536,6 @@ thread_add_unuse (struct thread_master *m, struct thread *thread)
assert (thread->prev == NULL);
assert (thread->type == THREAD_UNUSED);
thread_list_add (&m->unuse, thread);
- /* XXX: Should we deallocate funcname here? */
}
/* Free all unused thread. */
@@ -510,8 +548,13 @@ thread_list_free (struct thread_master *m, struct thread_list *list)
for (t = list->head; t; t = next)
{
next = t->next;
- if (t->funcname)
- XFREE (MTYPE_THREAD_FUNCNAME, t->funcname);
+
+ if ( (use_qtimer_pile != NULL)
+ && ( (t->type == THREAD_TIMER || t->type == THREAD_BACKGROUND) )
+ && (t->u.qtr != NULL)
+ )
+ qtimer_free(t->u.qtr) ;
+
XFREE (MTYPE_THREAD, t);
list->count--;
m->alloc--;
@@ -522,6 +565,8 @@ thread_list_free (struct thread_master *m, struct thread_list *list)
void
thread_master_free (struct thread_master *m)
{
+ qtimer qtr ;
+
thread_list_free (m, &m->read);
thread_list_free (m, &m->write);
thread_list_free (m, &m->timer);
@@ -540,6 +585,12 @@ thread_master_free (struct thread_master *m)
cpu_record = NULL;
}
UNLOCK
+
+ while ((qtr = spare_qtimers) != NULL)
+ {
+ spare_qtimers = (void*)(qtr->pile) ;
+ qtimer_free(qtr) ;
+ } ;
}
/* Thread list is empty or not. */
@@ -570,34 +621,26 @@ thread_timer_remain_second (struct thread *thread)
return 0;
}
-/* Trim blankspace and "()"s */
-static char *
-strip_funcname (const char *funcname)
-{
- char buff[100];
- char tmp, *ret, *e, *b = buff;
+/* Get new cpu history */
- strncpy(buff, funcname, sizeof(buff));
- buff[ sizeof(buff) -1] = '\0';
- e = buff +strlen(buff) -1;
+static struct cpu_thread_history*
+thread_get_hist(struct thread* thread, const char* funcname)
+{
+ struct cpu_thread_history tmp ;
+ struct cpu_thread_history* hist ;
- /* Wont work for funcname == "Word (explanation)" */
+ tmp.func = thread->func ;
+ tmp.funcname = funcname ;
- while (*b == ' ' || *b == '(')
- ++b;
- while (*e == ' ' || *e == ')')
- --e;
- e++;
+ LOCK
+ hist = hash_get (cpu_record, &tmp,
+ (void * (*) (void *))cpu_record_hash_alloc);
+ UNLOCK
- tmp = *e;
- *e = '\0';
- ret = XSTRDUP (MTYPE_THREAD_FUNCNAME, b);
- *e = tmp;
+ return hist ;
+} ;
- return ret;
-}
-
-/* Get new thread. */
+/* Get new thread. */
static struct thread *
thread_get (struct thread_master *m, u_char type,
int (*func) (struct thread *), void *arg, const char* funcname)
@@ -607,23 +650,22 @@ thread_get (struct thread_master *m, u_char type,
if (!thread_empty (&m->unuse))
{
thread = thread_trim_head (&m->unuse);
- if (thread->funcname)
- XFREE(MTYPE_THREAD_FUNCNAME, thread->funcname);
+ memset(thread, 0, sizeof (struct thread)) ;
}
else
{
thread = XCALLOC (MTYPE_THREAD, sizeof (struct thread));
m->alloc++;
}
- thread->type = type;
+ thread->type = type;
thread->add_type = type;
- thread->master = m;
- thread->func = func;
- thread->arg = arg;
+ thread->master = m;
+ thread->func = func;
+ thread->arg = arg;
- thread->funcname = strip_funcname(funcname);
+ thread->hist = thread_get_hist(thread, funcname) ;
- return thread;
+ return thread ;
}
/* Add new read thread. */
@@ -672,48 +714,190 @@ funcname_thread_add_write (struct thread_master *m,
return thread;
}
+/*==============================================================================
+ * Timer Threads -- THREAD_TIMER and THREAD_BACKGROUND
+ *
+ * Standard Timer Threads are sorted by the "struct timeval sands", and
+ * processed by thread_timer_process() -- which moves any expired timer
+ * threads onto the THREAD_READY queue. So, the scheduling of background stuff
+ * is done by not processing the THREAD_BACKGROUND queue until there is
+ * nothing else to do.
+ *
+ * When using a qtimer_pile:
+ *
+ * * THREAD_TIMER threads have an associated qtimer.
+ *
+ * When the timer expires, the qtimer is cut from the thread (and put onto
+ * the spare_qtimers list). The thread is then queued on the THREAD_READY
+ * queue (as before).
+ *
+ * * THREAD_BACKGROUND threads which have a non-zero delay are treated much
+ * as THREAD_TIMER, except that when the timer expires, the thread is
+ * queued on the THREAD_BACKGROUND queue.
+ *
+ * The THREAD_BACKGROUND queue is visited only when there is nothing else
+ * to do.
+ *
+ * Note that when using a qtimer_pile, and there is an active qtimer associated
+ * with the thread, the thread will be on the THREAD_TIMER queue -- so that it
+ * can be collected up and released if required.
+ *
+ * NB: when using a qtimer_pile, if there is a qtimer associated with a
+ * THREAD_TIMER or a THREAD_BACKGROUND thread, then thread->u.qtr points
+ * at the qtimer.
+ *
+ * AND, conversely, if there is no qtimer, then thread->u.ptr == NULL.
+ */
+
+/*------------------------------------------------------------------------------
+ * Set use_qtimer_pile !
+ */
+extern void
+thread_set_qtimer_pile(qtimer_pile pile)
+{
+ passert(!used_standard_timer) ;
+
+ use_qtimer_pile = pile ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * Unset qtimer associated with the given THREAD_TIMER or THREAD_BACKGROUND
+ * thread -- if any.
+ *
+ * Moves any qtimer onto the spare_qtimers list.
+ */
+static void
+thread_qtimer_unset(struct thread* thread)
+{
+ qtimer qtr ;
+ assert (thread->type == THREAD_TIMER || thread->type == THREAD_BACKGROUND);
+ assert (use_qtimer_pile != NULL) ;
+
+ qtr = thread->u.qtr ;
+ if (qtr != NULL)
+ {
+ qtimer_unset(qtr) ;
+
+ qtr->pile = (void*)spare_qtimers ;
+ spare_qtimers = qtr ;
+
+ thread->u.qtr = NULL ;
+ } ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * The qtimer action function -- when using qtimer pile (!)
+ *
+ * Remove thread from the THREAD_TIMER queue and unset the qtimer, place
+ * thread on the THREAD_READY or the THREAD_BACKGROUND queue as required.
+ */
+static void
+thread_qtimer_dispatch(qtimer qtr, void* timer_info, qtime_mono_t when)
+{
+ struct thread* thread = timer_info ;
+
+ thread_list_delete (&thread->master->timer, thread) ;
+ thread_qtimer_unset(thread) ;
+
+ switch (thread->type)
+ {
+ case THREAD_TIMER:
+ thread->type = THREAD_READY;
+ thread_list_add (&thread->master->ready, thread);
+ break ;
+
+ case THREAD_BACKGROUND:
+ thread_list_add (&thread->master->background, thread);
+ break ;
+
+ default:
+ zabort("invalid thread type in thread_qtimer_dispatch") ;
+ } ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * For standard timers, return time left on first timer on the given list.
+ */
+static struct timeval *
+thread_timer_wait (struct thread_list *tlist, struct timeval *timer_val)
+{
+ if (!thread_empty (tlist))
+ {
+ *timer_val = timeval_subtract (tlist->head->u.sands, relative_time);
+ return timer_val;
+ }
+ return NULL;
+}
+
+/*------------------------------------------------------------------------------
+ * Add timer of given type -- either standard or qtimer_pile as required.
+ *
+ * Timer interval is given as a struct timeval.
+ */
static struct thread *
-funcname_thread_add_timer_timeval (struct thread_master *m,
- int (*func) (struct thread *),
+funcname_thread_add_timer_timeval(struct thread_master *m,
+ int (*func) (struct thread *),
int type,
void *arg,
struct timeval *time_relative,
const char* funcname)
{
struct thread *thread;
- struct thread_list *list;
- struct timeval alarm_time;
- struct thread *tt;
assert (m != NULL);
+ assert (time_relative != NULL);
assert (type == THREAD_TIMER || type == THREAD_BACKGROUND);
- assert (time_relative);
- list = ((type == THREAD_TIMER) ? &m->timer : &m->background);
thread = thread_get (m, type, func, arg, funcname);
- /* Do we need jitter here? */
- quagga_get_relative (NULL);
- alarm_time.tv_sec = relative_time.tv_sec + time_relative->tv_sec;
- alarm_time.tv_usec = relative_time.tv_usec + time_relative->tv_usec;
- thread->u.sands = timeval_adjust(alarm_time);
-
- /* Sort by timeval. */
- for (tt = list->head; tt; tt = tt->next)
- if (timeval_cmp (thread->u.sands, tt->u.sands) <= 0)
- break;
+ if (use_qtimer_pile == NULL)
+ {
+ struct thread_list *list;
+ struct timeval alarm_time;
+ struct thread *tt;
- if (tt)
- thread_list_add_before (list, tt, thread);
+ /* Do we need jitter here? */
+ quagga_get_relative (NULL);
+ alarm_time.tv_sec = relative_time.tv_sec + time_relative->tv_sec;
+ alarm_time.tv_usec = relative_time.tv_usec + time_relative->tv_usec;
+ thread->u.sands = timeval_adjust(alarm_time);
+
+ /* Sort by timeval. */
+ list = ((type == THREAD_TIMER) ? &m->timer : &m->background);
+ for (tt = list->head; tt; tt = tt->next)
+ if (timeval_cmp (thread->u.sands, tt->u.sands) <= 0)
+ break;
+
+ if (tt)
+ thread_list_add_before (list, tt, thread);
+ else
+ thread_list_add (list, thread);
+
+ used_standard_timer = 1 ;
+ }
else
- thread_list_add (list, thread);
+ {
+ qtimer qtr = spare_qtimers ;
+ if (qtr != NULL)
+ spare_qtimers = (qtimer)(qtr->pile) ;
+
+ qtr = qtimer_init_new(qtr, use_qtimer_pile, NULL, thread) ;
+ thread->u.qtr = qtr ;
+
+ qtimer_set_interval(qtr, timeval2qtime(time_relative),
+ thread_qtimer_dispatch) ;
+ thread_list_add(&m->timer, thread) ;
+ } ;
return thread;
}
-
-/* Add timer event thread. */
+/*------------------------------------------------------------------------------
+ * Add a THREAD_TIMER timer -- either standard or qtimer_pile as required.
+ *
+ * Timer interval is given in seconds.
+ */
struct thread *
funcname_thread_add_timer (struct thread_master *m,
int (*func) (struct thread *),
@@ -721,16 +905,18 @@ funcname_thread_add_timer (struct thread_master *m,
{
struct timeval trel;
- assert (m != NULL);
-
- trel.tv_sec = timer;
+ trel.tv_sec = timer;
trel.tv_usec = 0;
return funcname_thread_add_timer_timeval (m, func, THREAD_TIMER, arg,
&trel, funcname);
}
-/* Add timer event thread with "millisecond" resolution */
+/*------------------------------------------------------------------------------
+ * Add a THREAD_TIMER timer -- either standard or qtimer_pile as required.
+ *
+ * Timer interval is given in milliseconds.
+ */
struct thread *
funcname_thread_add_timer_msec (struct thread_master *m,
int (*func) (struct thread *),
@@ -738,45 +924,56 @@ funcname_thread_add_timer_msec (struct thread_master *m,
{
struct timeval trel;
- assert (m != NULL);
-
- trel.tv_sec = timer / 1000;
- trel.tv_usec = 1000*(timer % 1000);
+ trel.tv_sec = timer / 1000 ;
+ trel.tv_usec = (timer % 1000) * 1000 ;
return funcname_thread_add_timer_timeval (m, func, THREAD_TIMER,
- arg, &trel, funcname);
+ arg, &trel, funcname);
}
-/* Add a background thread, with an optional millisec delay */
+/*------------------------------------------------------------------------------
+ * Add a THREAD_BACKGROUND thread -- either standard or qtimer_pile as required.
+ *
+ * Timer interval is given in milliseconds.
+ *
+ * For qtimer_pile, if the delay is zero, the thread is placed straight onto
+ * the THREAD_BACKGROUND queue.
+ */
struct thread *
funcname_thread_add_background (struct thread_master *m,
int (*func) (struct thread *),
void *arg, long delay,
const char *funcname)
{
- struct timeval trel;
+ if ((delay != 0) || (use_qtimer_pile == NULL))
+ {
+ struct timeval trel;
- assert (m != NULL);
+ trel.tv_sec = delay / 1000;
+ trel.tv_usec = (delay % 1000) * 1000 ;
- if (delay)
- {
- trel.tv_sec = delay / 1000;
- trel.tv_usec = 1000*(delay % 1000);
+ return funcname_thread_add_timer_timeval (m, func, THREAD_BACKGROUND,
+ arg, &trel, funcname);
}
else
{
- trel.tv_sec = 0;
- trel.tv_usec = 0;
- }
+ struct thread* thread ;
+
+ assert (m != NULL);
- return funcname_thread_add_timer_timeval (m, func, THREAD_BACKGROUND,
- arg, &trel, funcname);
+ thread = thread_get (m, THREAD_BACKGROUND, func, arg, funcname);
+ thread_list_add (&m->background, thread) ;
+
+ return thread ;
+ } ;
}
+/*----------------------------------------------------------------------------*/
/* Add simple event thread. */
struct thread *
funcname_thread_add_event (struct thread_master *m,
- int (*func) (struct thread *), void *arg, int val, const char* funcname)
+ int (*func) (struct thread *), void *arg, int val,
+ const char* funcname)
{
struct thread *thread;
@@ -789,7 +986,11 @@ funcname_thread_add_event (struct thread_master *m,
return thread;
}
-/* Cancel thread from scheduler. */
+/*------------------------------------------------------------------------------
+ * Cancel thread from scheduler.
+ *
+ * Note that when using qtimer_pile need to unset any associated qtimer.
+ */
void
thread_cancel (struct thread *thread)
{
@@ -808,6 +1009,8 @@ thread_cancel (struct thread *thread)
list = &thread->master->write;
break;
case THREAD_TIMER:
+ if ((use_qtimer_pile != NULL) && (thread->u.qtr != NULL))
+ thread_qtimer_unset(thread) ;
list = &thread->master->timer;
break;
case THREAD_EVENT:
@@ -817,13 +1020,21 @@ thread_cancel (struct thread *thread)
list = &thread->master->ready;
break;
case THREAD_BACKGROUND:
- list = &thread->master->background;
+ if ((use_qtimer_pile != NULL) && (thread->u.qtr != NULL))
+ {
+ thread_qtimer_unset(thread) ;
+ list = &thread->master->timer;
+ }
+ else
+ list = &thread->master->background;
break;
+
default:
- return;
- break;
+ return ;
}
+
thread_list_delete (list, thread);
+
thread->type = THREAD_UNUSED;
thread_add_unuse (thread->master, thread);
}
@@ -854,24 +1065,12 @@ thread_cancel_event (struct thread_master *m, void *arg)
return ret;
}
-static struct timeval *
-thread_timer_wait (struct thread_list *tlist, struct timeval *timer_val)
-{
- if (!thread_empty (tlist))
- {
- *timer_val = timeval_subtract (tlist->head->u.sands, relative_time);
- return timer_val;
- }
- return NULL;
-}
-
static struct thread *
thread_run (struct thread_master *m, struct thread *thread,
struct thread *fetch)
{
*fetch = *thread;
thread->type = THREAD_UNUSED;
- thread->funcname = NULL; /* thread_call will free fetch's copied pointer */
thread_add_unuse (m, thread);
return fetch;
}
@@ -921,7 +1120,11 @@ thread_timer_process (struct thread_list *list, struct timeval *timenow)
return ready;
}
-/* Fetch next ready thread. */
+/*------------------------------------------------------------------------------
+ * Fetch next ready thread -- for standard thread handing.
+ *
+ * (This is not used when using qtimer_pile, or qnexus stuff.)
+ */
struct thread *
thread_fetch (struct thread_master *m, struct thread *fetch)
{
@@ -939,8 +1142,7 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
int num = 0;
/* Signals are highest priority */
- if (!qpthreads_enabled)
- quagga_sigevent_process ();
+ quagga_sigevent_process ();
/* Normal event are the next highest priority. */
if ((thread = thread_trim_head (&m->event)) != NULL)
@@ -1009,69 +1211,66 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
}
}
-
-/* Fetch next ready thread <= given priority. Events and timeouts only.
- * No I/O. If nothing to do returns NULL and sets event_wait to
- * recommended time to be called again. */
-struct thread *
-thread_fetch_event (enum qpn_priority priority, struct thread_master *m, struct thread *fetch,
- qtime_mono_t *event_wait)
+/*------------------------------------------------------------------------------
+ * Empties the event and ready queues.
+ *
+ * This is used when qnexus is managing most things, including I/O. Must be
+ * using qtimer_pile !
+ *
+ * This runs "legacy" event and ready queues only.
+ *
+ * Returns: the number of threads dispatched.
+ *
+ * Legacy timers are handled by the qtimer_pile, and their related threads will
+ * be placed on the ready queue when they expire.
+ *
+ * The background queue is handled separately.
+ */
+extern int
+thread_dispatch(struct thread_master *m)
{
- struct thread *thread;
- struct timeval timer_val;
- struct timeval timer_val_bg;
- struct timeval *timer_wait;
- struct timeval *timer_wait_bg;
-
- *event_wait = 0;
-
- /* Normal event are the next highest priority. */
- if ((thread = thread_trim_head (&m->event)) != NULL)
- return thread_run (m, thread, fetch);
+ struct thread_list* list ;
+ struct thread fetch ;
+ int count = 0 ;
- if (priority <= qpn_pri_first)
- return NULL;
-
- /* If there are any ready threads from previous scheduler runs,
- * process top of them.
- */
- if ((thread = thread_trim_head (&m->ready)) != NULL)
- return thread_run (m, thread, fetch);
-
- if (priority <= qpn_pri_second)
- return NULL;
-
- /* Check foreground timers. */
- quagga_get_relative (NULL);
- thread_timer_process (&m->timer, &relative_time);
-
- if ((thread = thread_trim_head (&m->ready)) != NULL)
- return thread_run (m, thread, fetch);
+ while (1)
+ {
+ if (thread_empty(list = &m->event))
+ if (thread_empty(list = &m->ready))
+ return count ;
- if (priority <= qpn_pri_third)
- return NULL;
+ thread_call(thread_run(m, thread_list_delete(list, list->head), &fetch)) ;
- /* Background timer/events, lowest priority */
- thread_timer_process (&m->background, &relative_time);
+ ++count ;
+ } ;
+} ;
- if ((thread = thread_trim_head (&m->ready)) != NULL)
- return thread_run (m, thread, fetch);
+/*------------------------------------------------------------------------------
+ * Dispatch first item on the background queue, if any.
+ *
+ * This is used when qnexus is managing most things.
+ *
+ * Background threads spend their lives being cycled around the background
+ * queue -- possibly via the timer queue, if a delay is put in before the next
+ * invocation.
+ *
+ * Returns: 1 if dispatched a background thread
+ * 0 if there are no background threads
+ */
+extern int
+thread_dispatch_background(struct thread_master *m)
+{
+ struct thread* thread ;
+ struct thread fetch ;
- /* Calculate select wait timer if nothing else to do */
- timer_wait = thread_timer_wait (&m->timer, &timer_val);
- timer_wait_bg = thread_timer_wait (&m->background, &timer_val_bg);
+ if ((thread = thread_trim_head (&m->background)) == NULL)
+ return 0 ;
- if (timer_wait_bg &&
- (!timer_wait || (timeval_cmp (*timer_wait, *timer_wait_bg) > 0)))
- timer_wait = timer_wait_bg;
+ thread_call(thread_run(m, thread, &fetch)) ;
- /* When is the next timer due ? */
- *event_wait = (timer_wait != NULL)
- ? timeval2qtime(timer_wait)
- : 0;
+ return 1 ;
+} ;
- return NULL;
-}
unsigned long
thread_consumed_time (RUSAGE_T *now, RUSAGE_T *start, unsigned long *cputime)
@@ -1130,25 +1329,6 @@ thread_call (struct thread *thread)
unsigned long realtime, cputime;
RUSAGE_T ru;
- /* Cache a pointer to the relevant cpu history thread, if the thread
- * does not have it yet.
- *
- * Callers submitting 'dummy threads' hence must take care that
- * thread->cpu is NULL
- */
- if (!thread->hist)
- {
- struct cpu_thread_history tmp;
-
- tmp.func = thread->func;
- tmp.funcname = thread->funcname;
-
- LOCK
- thread->hist = hash_get (cpu_record, &tmp,
- (void * (*) (void *))cpu_record_hash_alloc);
- UNLOCK
- }
-
GETRUSAGE (&thread->ru);
(*thread->func) (thread);
@@ -1157,19 +1337,22 @@ thread_call (struct thread *thread)
realtime = thread_consumed_time (&ru, &thread->ru, &cputime);
- LOCK
- thread->hist->real.total += realtime;
- if (thread->hist->real.max < realtime)
- thread->hist->real.max = realtime;
+ if (thread->hist != NULL)
+ {
+ LOCK
+ thread->hist->real.total += realtime;
+ if (thread->hist->real.max < realtime)
+ thread->hist->real.max = realtime;
#ifdef HAVE_RUSAGE
- thread->hist->cpu.total += cputime;
- if (thread->hist->cpu.max < cputime)
- thread->hist->cpu.max = cputime;
+ thread->hist->cpu.total += cputime;
+ if (thread->hist->cpu.max < cputime)
+ thread->hist->cpu.max = cputime;
#endif
- ++(thread->hist->total_calls);
- thread->hist->types |= (1 << thread->add_type);
- UNLOCK
+ ++(thread->hist->total_calls);
+ thread->hist->types |= (1 << thread->add_type);
+ UNLOCK
+ } ;
#ifdef CONSUMED_TIME_CHECK
if (realtime > CONSUMED_TIME_CHECK)
@@ -1180,13 +1363,12 @@ thread_call (struct thread *thread)
* to fix.
*/
zlog_warn ("SLOW THREAD: task %s (%lx) ran for %lums (cpu time %lums)",
- thread->funcname,
+ (thread->hist != NULL) ? thread->hist->funcname : "??",
(unsigned long) thread->func,
realtime/1000, cputime/1000);
}
#endif /* CONSUMED_TIME_CHECK */
- XFREE (MTYPE_THREAD_FUNCNAME, thread->funcname);
}
/* Execute thread */
@@ -1207,11 +1389,9 @@ funcname_thread_execute (struct thread_master *m,
dummy.func = func;
dummy.arg = arg;
dummy.u.val = val;
- dummy.funcname = strip_funcname (funcname);
+ dummy.hist = thread_get_hist(&dummy, funcname) ;
thread_call (&dummy);
- XFREE (MTYPE_THREAD_FUNCNAME, dummy.funcname);
-
return NULL;
}
diff --git a/lib/thread.h b/lib/thread.h
index 1e68007a..fa021486 100644
--- a/lib/thread.h
+++ b/lib/thread.h
@@ -16,7 +16,7 @@
* You should have received a copy of the GNU General Public License
* along with GNU Zebra; see the file COPYING. If not, write to the Free
* Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
- * 02111-1307, USA.
+ * 02111-1307, USA.
*/
#ifndef _ZEBRA_THREAD_H
@@ -25,6 +25,7 @@
#include <sys/resource.h>
#include "qtime.h"
#include "qpnexus.h"
+#include "qtimers.h"
struct rusage_t
{
@@ -68,22 +69,22 @@ struct thread
{
thread_type type; /* thread type */
thread_type add_type; /* thread type */
- struct thread *next; /* next pointer of the thread */
+ struct thread *next; /* next pointer of the thread */
struct thread *prev; /* previous pointer of the thread */
struct thread_master *master; /* pointer to the struct thread_master. */
int (*func) (struct thread *); /* event function */
void *arg; /* event argument */
union {
- int val; /* second argument of the event. */
+ int val; /* second argument of the event. */
int fd; /* file descriptor in case of read/write. */
- struct timeval sands; /* rest of time sands value. */
+ struct timeval sands; /* rest of time sands value. */
+ qtimer qtr ; /* pointer to related qtimer */
} u;
RUSAGE_T ru; /* Indepth usage info. */
struct cpu_thread_history *hist; /* cache pointer to cpu_history */
- char* funcname;
};
-struct cpu_thread_history
+struct cpu_thread_history
{
int (*func)(struct thread *);
const char *funcname;
@@ -169,8 +170,9 @@ extern struct thread_master *thread_master_create (void);
extern void thread_master_free (struct thread_master *);
extern void thread_init_r (void);
extern void thread_finish (void);
+extern void thread_set_qtimer_pile(qtimer_pile pile) ;
-extern struct thread *funcname_thread_add_read (struct thread_master *,
+extern struct thread *funcname_thread_add_read (struct thread_master *,
int (*)(struct thread *),
void *, int, const char*);
extern struct thread *funcname_thread_add_write (struct thread_master *,
@@ -196,8 +198,8 @@ extern struct thread *funcname_thread_execute (struct thread_master *,
extern void thread_cancel (struct thread *);
extern unsigned int thread_cancel_event (struct thread_master *, void *);
extern struct thread *thread_fetch (struct thread_master *, struct thread *);
-struct thread * thread_fetch_event (enum qpn_priority,struct thread_master *m, struct thread *fetch,
- qtime_mono_t *event_wait);
+extern int thread_dispatch(struct thread_master *m) ;
+extern int thread_dispatch_background(struct thread_master *m) ;
extern void thread_call (struct thread *);
extern unsigned long thread_timer_remain_second (struct thread *);
extern int thread_should_yield (struct thread *);
diff --git a/lib/workqueue.c b/lib/workqueue.c
index 7c811edd..6f2cd531 100644
--- a/lib/workqueue.c
+++ b/lib/workqueue.c
@@ -1,4 +1,4 @@
-/*
+/*
* Quagga Work Queue Support.
*
* Copyright (C) 2005 Sun Microsystems, Inc.
@@ -18,38 +18,30 @@
* You should have received a copy of the GNU General Public License
* along with Quagga; see the file COPYING. If not, write to the Free
* Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
- * 02111-1307, USA.
+ * 02111-1307, USA.
*/
#include <lib/zebra.h>
#include "thread.h"
#include "memory.h"
#include "workqueue.h"
-#include "linklist.h"
#include "command.h"
#include "log.h"
+#include "linklist.h"
/* master list of work_queues */
static struct list work_queues;
#define WORK_QUEUE_MIN_GRANULARITY 1
-static struct work_queue_item *
-work_queue_item_new (struct work_queue *wq)
-{
- struct work_queue_item *item;
- assert (wq);
-
- item = XCALLOC (MTYPE_WORK_QUEUE_ITEM,
- sizeof (struct work_queue_item));
-
- return item;
-}
-
static void
-work_queue_item_free (struct work_queue_item *item)
+work_queue_item_free (struct work_queue *wq, struct work_queue_item *item)
{
- XFREE (MTYPE_WORK_QUEUE_ITEM, item);
+ /* call private data deletion callback if needed */
+ if (wq->spec.del_item_data != NULL)
+ wq->spec.del_item_data (wq, item) ;
+
+ XFREE (MTYPE_WORK_QUEUE_ITEM, item) ;
return;
}
@@ -58,46 +50,40 @@ struct work_queue *
work_queue_new (struct thread_master *m, const char *queue_name)
{
struct work_queue *new;
-
+
new = XCALLOC (MTYPE_WORK_QUEUE, sizeof (struct work_queue));
if (new == NULL)
return new;
-
- new->name = XSTRDUP (MTYPE_WORK_QUEUE_NAME, queue_name);
+
+ new->name = XSTRDUP (MTYPE_WORK_QUEUE_NAME, queue_name);
new->master = m;
SET_FLAG (new->flags, WQ_UNPLUGGED);
-
- if ( (new->items = list_new ()) == NULL)
- {
- XFREE (MTYPE_WORK_QUEUE_NAME, new->name);
- XFREE (MTYPE_WORK_QUEUE, new);
-
- return NULL;
- }
-
- new->items->del = (void (*)(void *)) work_queue_item_free;
-
+
listnode_add (&work_queues, new);
-
+
new->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
/* Default values, can be overriden by caller */
new->spec.hold = WORK_QUEUE_DEFAULT_HOLD;
-
+
return new;
}
void
work_queue_free (struct work_queue *wq)
{
+ work_queue_item item ;
+
if (wq->thread != NULL)
thread_cancel(wq->thread);
-
- /* list_delete frees items via callback */
- list_delete (wq->items);
- listnode_delete (&work_queues, wq);
-
+
+ while ((item = wq->head) != NULL)
+ {
+ wq->head = item->next ;
+ work_queue_item_free(wq, item) ;
+ } ;
+
XFREE (MTYPE_WORK_QUEUE_NAME, wq->name);
XFREE (MTYPE_WORK_QUEUE, wq);
return;
@@ -109,59 +95,151 @@ work_queue_schedule (struct work_queue *wq, unsigned int delay)
/* if appropriate, schedule work queue thread */
if ( CHECK_FLAG (wq->flags, WQ_UNPLUGGED)
&& (wq->thread == NULL)
- && (listcount (wq->items) > 0) )
+ && (wq->head != NULL) )
{
- wq->thread = thread_add_background (wq->master, work_queue_run,
+ wq->thread = thread_add_background (wq->master, work_queue_run,
wq, delay);
return 1;
}
else
return 0;
}
-
-void
-work_queue_add (struct work_queue *wq, void *data)
+
+/*------------------------------------------------------------------------------
+ * Create new work queue item and place on the end of the given work queue.
+ *
+ * Schedules the work queue if there were no items (unless already scheduled
+ * or plugged).
+ *
+ * Returns the address of the args area in the new item.
+ */
+extern void*
+work_queue_item_add (struct work_queue *wq)
{
- struct work_queue_item *item;
-
+ work_queue_item item ;
+
assert (wq);
- if (!(item = work_queue_item_new (wq)))
+ item = XCALLOC (MTYPE_WORK_QUEUE_ITEM, sizeof (struct work_queue_item));
+
+ if (item == NULL)
{
zlog_err ("%s: unable to get new queue item", __func__);
- return;
+ return NULL ;
+ }
+
+ item->next = NULL ;
+ if (wq->head == NULL)
+ {
+ assert(wq->list_count == 0) ;
+ wq->head = item ;
+ item->prev = NULL ;
}
-
- item->data = data;
- listnode_add (wq->items, item);
-
+ else
+ {
+ assert((wq->tail != NULL) && (wq->list_count > 0)) ;
+ wq->tail->next = item ;
+ item->prev = wq->tail ;
+ } ;
+ wq->tail = item ;
+
+ ++wq->list_count ;
work_queue_schedule (wq, wq->spec.hold);
-
- return;
+
+ return work_queue_item_args(item) ;
}
static void
-work_queue_item_remove (struct work_queue *wq, struct listnode *ln)
+work_queue_item_remove (struct work_queue *wq, work_queue_item item)
{
- struct work_queue_item *item = listgetdata (ln);
+ assert ((wq != NULL) && (item != NULL)) ;
+
+ if (wq->head == item)
+ {
+ /* Removing the first item */
+ assert(item->prev == NULL) ;
+
+ wq->head = item->next ;
- assert (item && item->data);
+ if (wq->tail == item)
+ {
+ /* Removing the only item */
+ assert((item->next == NULL) && (wq->list_count == 1)) ;
+ wq->tail = NULL ;
+ }
+ else
+ {
+ /* First, but not the only item */
+ assert((item->next != NULL) && (wq->list_count > 1)) ;
+ wq->head->prev = NULL ;
+ } ;
+ }
+ else if (wq->tail == item)
+ {
+ /* Removing last, but not only item */
+ assert(item->next == NULL) ;
+ assert((item->prev != NULL) && (wq->list_count > 1)) ;
+
+ wq->tail = item->prev ;
+ wq->tail->next = NULL ;
+ }
+ else
+ {
+ /* Removing from somewhere in middle */
+ assert(item->next != NULL) ;
+ assert((item->prev != NULL) && (wq->list_count > 2)) ;
+
+ item->prev->next = item->next ;
+ item->next->prev = item->prev ;
+ } ;
- /* call private data deletion callback if needed */
- if (wq->spec.del_item_data)
- wq->spec.del_item_data (wq, item->data);
+ --wq->list_count ;
+ work_queue_item_free (wq, item);
- list_delete_node (wq->items, ln);
- work_queue_item_free (item);
-
return;
}
-static void
-work_queue_item_requeue (struct work_queue *wq, struct listnode *ln)
+static work_queue_item
+work_queue_item_requeue (struct work_queue *wq, work_queue_item item)
{
- LISTNODE_DETACH (wq->items, ln);
- LISTNODE_ATTACH (wq->items, ln); /* attach to end of list */
+ work_queue_item next = item->next ;
+ work_queue_item last = wq->tail ;
+
+ assert(last != NULL) ;
+
+ if (last == item)
+ {
+ /* Requeuing last item -- easy ! */
+ assert(next == NULL) ;
+ return item ;
+ } ;
+
+ assert(next != NULL) ;
+
+ if (wq->head == item)
+ {
+ /* Requeuing first, but not only item */
+ assert(item->prev == NULL) ;
+
+ wq->head = next ;
+ next->prev = NULL ;
+ }
+ else
+ {
+ /* Requeuing something in middle */
+ assert(item->prev != NULL) ;
+
+ item->prev->next = item->next ;
+ item->next->prev = item->prev ;
+ } ;
+
+ item->next = NULL ;
+ item->prev = last ;
+
+ last->next = item ;
+ wq->tail = item ;
+
+ return next ;
}
DEFUN(show_work_queues,
@@ -172,8 +250,8 @@ DEFUN(show_work_queues,
{
struct listnode *node;
struct work_queue *wq;
-
- vty_out (vty,
+
+ vty_out (vty,
"%c %8s %5s %8s %21s%s",
' ', "List","(ms) ","Q. Runs","Cycle Counts ",
VTY_NEWLINE);
@@ -183,24 +261,24 @@ DEFUN(show_work_queues,
"Items",
"Hold",
"Total",
- "Best","Gran.","Avg.",
- "Name",
+ "Best","Gran.","Avg.",
+ "Name",
VTY_NEWLINE);
-
+
for (ALL_LIST_ELEMENTS_RO ((&work_queues), node, wq))
{
vty_out (vty,"%c %8d %5d %8ld %7d %6d %6u %s%s",
(CHECK_FLAG (wq->flags, WQ_UNPLUGGED) ? ' ' : 'P'),
- listcount (wq->items),
+ wq->list_count,
wq->spec.hold,
wq->runs,
wq->cycles.best, wq->cycles.granularity,
- (wq->runs) ?
+ (wq->runs) ?
(unsigned int) (wq->cycles.total / wq->runs) : 0,
wq->name,
VTY_NEWLINE);
}
-
+
return CMD_SUCCESS;
}
@@ -212,9 +290,9 @@ work_queue_plug (struct work_queue *wq)
{
if (wq->thread)
thread_cancel (wq->thread);
-
+
wq->thread = NULL;
-
+
UNSET_FLAG (wq->flags, WQ_UNPLUGGED);
}
@@ -232,22 +310,21 @@ work_queue_unplug (struct work_queue *wq)
/* timer thread to process a work queue
* will reschedule itself if required,
- * otherwise work_queue_item_add
+ * otherwise work_queue_item_add
*/
int
work_queue_run (struct thread *thread)
{
struct work_queue *wq;
- struct work_queue_item *item;
+ work_queue_item next, item ;
wq_item_status ret;
unsigned int cycles = 0;
- struct listnode *node, *nnode;
char yielded = 0;
wq = THREAD_ARG (thread);
wq->thread = NULL;
- assert (wq && wq->items);
+ assert (wq != NULL) ;
/* calculate cycle granularity:
* list iteration == 1 cycle
@@ -258,38 +335,40 @@ work_queue_run (struct thread *thread)
*
* Best: starts low, can only increase
*
- * Granularity: starts at WORK_QUEUE_MIN_GRANULARITY, can be decreased
- * if we run to end of time slot, can increase otherwise
+ * Granularity: starts at WORK_QUEUE_MIN_GRANULARITY, can be decreased
+ * if we run to end of time slot, can increase otherwise
* by a small factor.
*
* We could use just the average and save some work, however we want to be
* able to adjust quickly to CPU pressure. Average wont shift much if
* daemon has been running a long time.
*/
- if (wq->cycles.granularity == 0)
- wq->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
+ if (wq->cycles.granularity == 0)
+ wq->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
- for (ALL_LIST_ELEMENTS (wq->items, node, nnode, item))
+ next = wq->head ;
+ while (next != NULL)
{
- assert (item && item->data);
-
+ item = next ;
+ next = item->next ; /* default next item */
+
/* dont run items which are past their allowed retries */
if (item->ran > wq->spec.max_retries)
{
/* run error handler, if any */
- if (wq->spec.errorfunc)
- wq->spec.errorfunc (wq, item->data);
- work_queue_item_remove (wq, node);
+ if (wq->spec.errorfunc != NULL)
+ wq->spec.errorfunc (wq, item);
+ work_queue_item_remove (wq, item);
continue;
}
/* run and take care of items that want to be retried immediately */
do
{
- ret = wq->spec.workfunc (wq, item->data);
+ ret = wq->spec.workfunc (wq, item);
item->ran++;
}
- while ((ret == WQ_RETRY_NOW)
+ while ((ret == WQ_RETRY_NOW)
&& (item->ran < wq->spec.max_retries));
switch (ret)
@@ -308,21 +387,21 @@ work_queue_run (struct thread *thread)
case WQ_REQUEUE:
{
item->ran--;
- work_queue_item_requeue (wq, node);
+ next = work_queue_item_requeue (wq, item);
break;
}
case WQ_RETRY_NOW:
/* a RETRY_NOW that gets here has exceeded max_tries, same as ERROR */
case WQ_ERROR:
{
- if (wq->spec.errorfunc)
+ if (wq->spec.errorfunc != NULL)
wq->spec.errorfunc (wq, item);
}
/* fall through here is deliberate */
case WQ_SUCCESS:
default:
{
- work_queue_item_remove (wq, node);
+ work_queue_item_remove (wq, item);
break;
}
}
@@ -331,7 +410,7 @@ work_queue_run (struct thread *thread)
cycles++;
/* test if we should yield */
- if ( !(cycles % wq->cycles.granularity)
+ if ( !(cycles % wq->cycles.granularity)
&& thread_should_yield (thread))
{
yielded = 1;
@@ -346,15 +425,15 @@ stats:
/* we yielded, check whether granularity should be reduced */
if (yielded && (cycles < wq->cycles.granularity))
{
- wq->cycles.granularity = ((cycles > 0) ? cycles
+ wq->cycles.granularity = ((cycles > 0) ? cycles
: WORK_QUEUE_MIN_GRANULARITY);
}
-
+
if (cycles >= (wq->cycles.granularity))
{
if (cycles > wq->cycles.best)
wq->cycles.best = cycles;
-
+
/* along with yielded check, provides hysteris for granularity */
if (cycles > (wq->cycles.granularity * WQ_HYSTERIS_FACTOR * 2))
wq->cycles.granularity *= WQ_HYSTERIS_FACTOR; /* quick ramp-up */
@@ -362,7 +441,7 @@ stats:
wq->cycles.granularity += WQ_HYSTERIS_FACTOR;
}
#undef WQ_HYSTERIS_FACTOR
-
+
wq->runs++;
wq->cycles.total += cycles;
@@ -370,12 +449,12 @@ stats:
printf ("%s: cycles %d, new: best %d, worst %d\n",
__func__, cycles, wq->cycles.best, wq->cycles.granularity);
#endif
-
+
/* Is the queue done yet? If it is, call the completion callback. */
- if (listcount (wq->items) > 0)
+ if (wq->head != NULL)
work_queue_schedule (wq, 0);
else if (wq->spec.completion_func)
wq->spec.completion_func (wq);
-
+
return 0;
}
diff --git a/lib/workqueue.h b/lib/workqueue.h
index f59499a0..5d2f2da2 100644
--- a/lib/workqueue.h
+++ b/lib/workqueue.h
@@ -1,4 +1,4 @@
-/*
+/*
* Quagga Work Queues.
*
* Copyright (C) 2005 Sun Microsystems, Inc.
@@ -18,14 +18,18 @@
* You should have received a copy of the GNU General Public License
* along with Quagga; see the file COPYING. If not, write to the Free
* Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
- * 02111-1307, USA.
+ * 02111-1307, USA.
*/
#ifndef _QUAGGA_WORK_QUEUE_H
#define _QUAGGA_WORK_QUEUE_H
+#ifndef Inline
+#define Inline static inline
+#endif
+
/* Hold time for the initial schedule of a queue run, in millisec */
-#define WORK_QUEUE_DEFAULT_HOLD 50
+#define WORK_QUEUE_DEFAULT_HOLD 50
/* action value, for use by item processor and item error handlers */
typedef enum
@@ -40,12 +44,37 @@ typedef enum
* the particular item.. */
} wq_item_status;
+enum { wq_args_size_max = 24 } ; /* maximum size of union wq_args */
+
+union wq_args
+{
+ void* data ;
+ char bytes[wq_args_size_max] ; /* empty space `*/
+} ;
+
+#define WQ_ARGS_SIZE_OK(s) CONFIRM(sizeof(struct s) <= wq_args_size_max)
+
/* A single work queue item, unsurprisingly */
+typedef struct work_queue_item* work_queue_item ;
struct work_queue_item
{
- void *data; /* opaque data */
+ union wq_args args ; /* cast as required */
+
+ struct work_queue_item* next ; /* the queue itself */
+ struct work_queue_item* prev ;
+
unsigned short ran; /* # of times item has been run */
-};
+} ;
+
+/* work_queue_item structures are malloced. That guarantees maximum alignment.
+ * To guarantee maximum alignment for "struct args", it must be first item !
+ *
+ * (The typedef is required to stop Eclipse (3.4.2 with CDT 5.0) whining
+ * about first argument of offsetof().)
+ */
+typedef struct work_queue_item work_queue_item_t ;
+CONFIRM(offsetof(work_queue_item_t, args) == 0) ;
+ /* so guaranteed max alignment */
#define WQ_UNPLUGGED (1 << 0) /* available for draining */
@@ -57,52 +86,55 @@ struct work_queue
struct thread_master *master; /* thread master */
struct thread *thread; /* thread, if one is active */
char *name; /* work queue name */
-
+
/* Specification for this work queue.
* Public, must be set before use by caller. May be modified at will.
*/
struct {
/* optional opaque user data, global to the queue. */
void *data;
-
+
/* work function to process items with:
* First argument is the workqueue queue.
* Second argument is the item data
*/
- wq_item_status (*workfunc) (struct work_queue *, void *);
+ wq_item_status (*workfunc) (struct work_queue *, work_queue_item);
/* error handling function, optional */
- void (*errorfunc) (struct work_queue *, struct work_queue_item *);
-
+ void (*errorfunc) (struct work_queue *, work_queue_item);
+
/* callback to delete user specific item data */
- void (*del_item_data) (struct work_queue *, void *);
-
+ void (*del_item_data) (struct work_queue *, work_queue_item);
+
/* completion callback, called when queue is emptied, optional */
void (*completion_func) (struct work_queue *);
-
+
/* max number of retries to make for item that errors */
- unsigned int max_retries;
+ unsigned int max_retries;
unsigned int hold; /* hold time for first run, in ms */
} spec;
-
+
/* remaining fields should be opaque to users */
- struct list *items; /* queue item list */
- unsigned long runs; /* runs count */
-
+ work_queue_item head ; /* queue item list */
+ work_queue_item tail ;
+ unsigned list_count ;
+
+ unsigned long runs; /* runs count */
+
struct {
unsigned int best;
unsigned int granularity;
unsigned long total;
} cycles; /* cycle counts */
-
+
/* private state */
u_int16_t flags; /* user set flag */
};
/* User API */
-/* create a new work queue, of given name.
+/* create a new work queue, of given name.
* user must fill in the spec of the returned work queue before adding
* anything to it
*/
@@ -112,7 +144,10 @@ extern struct work_queue *work_queue_new (struct thread_master *,
extern void work_queue_free (struct work_queue *);
/* Add the supplied data as an item onto the workqueue */
-extern void work_queue_add (struct work_queue *, void *);
+Inline void work_queue_add (struct work_queue *, void *);
+
+extern void* work_queue_item_add(struct work_queue* wq) ;
+Inline void* work_queue_item_args(work_queue_item item) ;
/* plug the queue, ie prevent it from being drained / processed */
extern void work_queue_plug (struct work_queue *wq);
@@ -122,4 +157,22 @@ extern void work_queue_unplug (struct work_queue *wq);
/* Helpers, exported for thread.c and command.c */
extern int work_queue_run (struct thread *);
extern struct cmd_element show_work_queues_cmd;
+
+/*==============================================================================
+ * The Inline functions
+ */
+
+Inline void work_queue_add (struct work_queue* wq, void* data)
+{
+ union wq_args* args = work_queue_item_add(wq) ;
+ args->data = data ;
+}
+
+/* Return pointer to the args area in the given work queue item */
+Inline void*
+work_queue_item_args(work_queue_item item)
+{
+ return &item->args ;
+} ;
+
#endif /* _QUAGGA_WORK_QUEUE_H */