diff options
author | Chris Hall <GMCH@hestia.halldom.com> | 2010-02-16 09:52:14 +0000 |
---|---|---|
committer | Chris Hall <GMCH@hestia.halldom.com> | 2010-02-16 09:52:14 +0000 |
commit | 9856e17cf2495d1f7db16e866f16bc4a8447524d (patch) | |
tree | 260d0c56610ad8f8db533737a59cbda33665752f /lib/qpnexus.c | |
parent | 3b9932d5f7cdeac29a81bceeb190479b675a0435 (diff) | |
download | quagga-9856e17cf2495d1f7db16e866f16bc4a8447524d.tar.bz2 quagga-9856e17cf2495d1f7db16e866f16bc4a8447524d.tar.xz |
Revised thread/timer handling, work queue and scheduling.
Updated quagga thread handling to use qtimers when using the new
qpnexus -- so all timers are qtimers in the new scheme.
Updated work queue handling so that each work queue item is a single
malloced structure, not three. (Only bgpd and zebra use the work
queue system.)
When using qpnexus the background thread queue is no longer a timer
queue, but simply a list of pending background threads. When a
background thread is waiting on a timer, it is in the qtimer pile,
same like any other thread.
When using qpnexus, the only remaining quagga thread queues are the
event and ready queues.
Revised the qpnexus loop so that only when there is nothing else to
do will it consider the background threads.
Revised write I/O in the BGP Engine so that all writing is via the
connection's write buffer. Revised the write I/O in the Routeing
Engine, so that it passes groups of updates in a single mqueue
message. This all reduces the number of TCP packets sent (because
BGP messages are collected together in the connection's write buffer)
and reduces the number of mqueue messages involved.
(No need for TCP_CORK.)
Code and comments review for the new code.
modified: bgpd/bgp_advertise.c
modified: bgpd/bgp_common.h
modified: bgpd/bgp_connection.c
modified: bgpd/bgp_connection.h
modified: bgpd/bgp_engine.h
modified: bgpd/bgp_fsm.c
modified: bgpd/bgp_main.c
modified: bgpd/bgp_msg_read.c
modified: bgpd/bgp_msg_write.c
modified: bgpd/bgp_network.c
modified: bgpd/bgp_packet.c
modified: bgpd/bgp_packet.h
modified: bgpd/bgp_peer.c
modified: bgpd/bgp_peer_index.h
modified: bgpd/bgp_route.c
modified: bgpd/bgp_route_refresh.h
modified: bgpd/bgp_session.c
modified: bgpd/bgp_session.h
modified: bgpd/bgpd.c
new file: bgpd/bgpd.cx
modified: lib/mqueue.h
modified: lib/qpnexus.c
modified: lib/qpnexus.h
modified: lib/qpselect.c
modified: lib/qtimers.c
modified: lib/qtimers.h
modified: lib/sigevent.c
modified: lib/stream.c
modified: lib/stream.h
modified: lib/thread.c
modified: lib/thread.h
modified: lib/workqueue.c
modified: lib/workqueue.h
modified: tests/heavy-wq.c
modified: zebra/zebra_rib.c
Diffstat (limited to 'lib/qpnexus.c')
-rw-r--r-- | lib/qpnexus.c | 166 |
1 files changed, 96 insertions, 70 deletions
diff --git a/lib/qpnexus.c b/lib/qpnexus.c index 8a78a70b..cb0bd12c 100644 --- a/lib/qpnexus.c +++ b/lib/qpnexus.c @@ -36,7 +36,13 @@ static void qpn_in_thread_init(qpn_nexus qpn); */ -/* Initialise a nexus -- allocating it if required. +/*============================================================================== + * Initialisation, add hook, free etc. + * + */ + +/*------------------------------------------------------------------------------ + * Initialise a nexus -- allocating it if required. * * If main_thread is set then no new thread will be created * when qpn_exec() is called, instead the finite state machine will be @@ -45,7 +51,7 @@ static void qpn_in_thread_init(qpn_nexus qpn); * * Returns the qpn_nexus. */ -qpn_nexus +extern qpn_nexus qpn_init_new(qpn_nexus qpn, int main_thread) { if (qpn == NULL) @@ -53,16 +59,27 @@ qpn_init_new(qpn_nexus qpn, int main_thread) else memset(qpn, 0, sizeof(struct qpn_nexus)) ; - qpn->selection = qps_selection_init_new(qpn->selection); - qpn->pile = qtimer_pile_init_new(qpn->pile); - qpn->queue = mqueue_init_new(qpn->queue, mqt_signal_unicast); + qpn->selection = qps_selection_init_new(qpn->selection); + qpn->pile = qtimer_pile_init_new(qpn->pile); + qpn->queue = mqueue_init_new(qpn->queue, mqt_signal_unicast); qpn->main_thread = main_thread; - qpn->start = qpn_start; + qpn->start = qpn_start; return qpn; } -/* free timers, selection, message queue and nexus +/*------------------------------------------------------------------------------ + * Add a hook function to the given nexus. + */ +extern void +qpn_add_hook_function(qpn_hook_list list, void* hook) +{ + passert(list->count < qpn_hooks_max) ; + list->hooks[list->count++] = hook ; +} ; + +/*------------------------------------------------------------------------------ + * free timers, selection, message queue and nexus * return NULL */ qpn_nexus @@ -99,24 +116,25 @@ qpn_free(qpn_nexus qpn) return NULL; } -/* If not main thread create new qpthread. - * Execute the state machine */ -void +/*============================================================================== + * Execution of a nexus + */ + +/*------------------------------------------------------------------------------ + * If not main qpthread create new qpthread. + * + * For all qpthreads: start the thread ! + */ +extern void qpn_exec(qpn_nexus qpn) { if (qpn->main_thread) - { - /* Run the state machine in calling thread */ - qpn->start(qpn); - } + qpn->start(qpn); else - { - /* create a qpthread and run the state machine in it */ - qpt_thread_create(qpn->start, qpn, NULL) ; - } -} + qpt_thread_create(qpn->start, qpn, NULL) ; +} ; -/*============================================================================== +/*------------------------------------------------------------------------------ * Pthread routine * * Processes: @@ -145,78 +163,86 @@ qpn_start(void* arg) qpn_nexus qpn = arg; mqueue_block mqb; int actions; - qtime_mono_t now; - qtime_mono_t max_wait; - int i; + qtime_mono_t now ; + qtime_t max_wait ; + unsigned i; + unsigned done ; + unsigned wait ; - /* now in our thread, complete initialisation */ + /* now in our thread, complete initialisation */ qpn_in_thread_init(qpn); + /* Until required to terminate, loop */ + done = 1 ; while (!qpn->terminate) { - now = qt_get_monotonic(); + wait = (done == 0) ; /* may wait this time only if nothing + found to do on the last pass */ - /* Signals are highest priority. - * only execute on the main thread */ + /* Signals are highest priority -- only execute for main thread + * + * Restarts "done" for this pass. + */ if (qpn->main_thread) - quagga_sigevent_process (); + done = quagga_sigevent_process() ; + else + done = 0 ; - /* max time to wait in pselect */ - max_wait = QTIME(MAX_PSELECT_TIMOUT); - - /* event hooks, if any. High priority */ - for (i = 0; i < NUM_EVENT_HOOK; ++i) - { - if (qpn->event_hook[i] != NULL) - { - /* first, second and third priority */ - qtime_mono_t event_wait = qpn->event_hook[i](qpn_pri_third); - if (event_wait > 0 && event_wait < max_wait) - max_wait = event_wait; - } - } + /* Foreground hooks, if any. */ + for (i = 0; i < qpn->foreground.count ; ++i) + done |= ((qpn_hook_function*)(qpn->foreground.hooks[i]))() ; /* drain the message queue, will be in waiting for signal state * when it's empty */ - for (;;) + + if (done != 0) + wait = 0 ; /* turn off wait if found something */ + + while (1) { - mqb = mqueue_dequeue(qpn->queue, 1, qpn->mts) ; + mqb = mqueue_dequeue(qpn->queue, wait, qpn->mts) ; if (mqb == NULL) break; mqb_dispatch(mqb, mqb_action); - } - /* Event hooks, if any. All priorities */ - for (i = 0; i < NUM_EVENT_HOOK; ++i) - { - if (qpn->event_hook[i] != NULL) - { - /* first, second third and fourth priority */ - qtime_mono_t event_wait = qpn->event_hook[i](qpn_pri_fourth); - if (event_wait > 0 && event_wait < max_wait) - max_wait = event_wait; - } - } - - /* block for some input, output, signal or timeout */ - actions = qps_pselect(qpn->selection, - qtimer_pile_top_time(qpn->pile, now + max_wait)); - - /* process I/O actions */ - while (actions) - actions = qps_dispatch_next(qpn->selection) ; + done = 1 ; /* done something */ + wait = 0 ; /* turn off wait */ + } ; - mqueue_done_waiting(qpn->queue, qpn->mts); + /* block for some input, output, signal or timeout + * + * wait will be true iff did nothing the last time round the loop, and + * not found anything to be done up to this point either. + */ + if (wait) + max_wait = qtimer_pile_top_wait(qpn->pile, QTIME(MAX_PSELECT_WAIT)) ; + else + max_wait = 0 ; + + actions = qps_pselect(qpn->selection, max_wait) ; + done |= actions ; + + if (wait) + mqueue_done_waiting(qpn->queue, qpn->mts); + + /* process I/O actions */ + while (actions) + actions = qps_dispatch_next(qpn->selection) ; - /* process timers */ + /* process timers */ + now = qt_get_monotonic() ; while (qtimer_pile_dispatch_next(qpn->pile, now)) - { - } - } + done = 1 ; + + /* If nothing done in this pass, see if anything in the background */ + if (done == 0) + for (i = 0; i < qpn->background.count ; ++i) + done |= ((qpn_hook_function*)(qpn->background.hooks[i]))() ; + } ; /* last bit of code to run in this thread */ - if (qpn->in_thread_final) + if (qpn->in_thread_final != NULL) qpn->in_thread_final(); return NULL; |