diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/command_queue.c | 53 | ||||
-rw-r--r-- | lib/qpnexus.c | 150 | ||||
-rw-r--r-- | lib/qpnexus.h | 1 |
3 files changed, 120 insertions, 84 deletions
diff --git a/lib/command_queue.c b/lib/command_queue.c index 69c8eca6..3bb3473f 100644 --- a/lib/command_queue.c +++ b/lib/command_queue.c @@ -29,34 +29,18 @@ /* Prototypes */ static void cq_action(mqueue_block mqb, mqb_flag_t flag); -/* We have too many parameters for a message queue block so have to marshal */ -struct marshal -{ - struct cmd_element *matched_element; - struct vty *vty; - int argc; - char **argv; -}; - void cq_enqueue(struct cmd_element *matched_element, struct vty *vty, int argc, const char *argv[], qpn_nexus bgp_nexus) { - struct marshal *wyatt = XCALLOC(MTYPE_MARSHAL, sizeof(struct marshal)) ; int i; - mqueue_block mqb = NULL; + mqueue_block mqb = mqb_init_new(NULL, cq_action, matched_element) ; - wyatt->matched_element = matched_element; - wyatt->vty = vty; - wyatt->argc = argc; - wyatt->argv = argc ? XCALLOC(MTYPE_MARSHAL, sizeof (char*) * argc) : NULL; + /* all parameters are pointers so use the queue's argv */ + mqb_push_argv_p(mqb, vty); for (i = 0; i < argc; ++i) - { - wyatt->argv[i] = XSTRDUP(MTYPE_MARSHAL, argv[i]); - } + mqb_push_argv_p(mqb, XSTRDUP(MTYPE_MARSHAL, argv[i])); - mqb = mqb_init_new(mqb, cq_action, 0) ; - mqb->arg0 = wyatt; mqueue_enqueue(bgp_nexus->queue, mqb, 0) ; } @@ -66,25 +50,32 @@ cq_action(mqueue_block mqb, mqb_flag_t flag) { int result; int i; - struct marshal *wyatt = mqb->arg0; + struct cmd_element *matched_element; + struct vty *vty; + void **argv; + int argc; + + matched_element = mqb_get_arg0(mqb); + argc = mqb_get_argv_count(mqb); + argv = mqb_get_argv(mqb) ; + + vty = argv[0]; + argv++; + argc--; if (flag == mqb_action) { /* Execute matched command. */ - result = (*wyatt->matched_element->func) - (wyatt->matched_element, wyatt->vty, wyatt->argc, (const char **)wyatt->argv); + result = (matched_element->func) + (matched_element, vty, argc, (const char **)argv); /* report */ - vty_queued_result(wyatt->vty, result); + vty_queued_result(vty, result); } /* clean up */ - for (i = 0; i< wyatt->argc; ++i) - { - XFREE(MTYPE_MARSHAL, wyatt->argv[i]); - } - if (wyatt->argv) - XFREE(MTYPE_MARSHAL, wyatt->argv); - XFREE(MTYPE_MARSHAL, wyatt); + for (i = 0; i < argc; ++i) + XFREE(MTYPE_MARSHAL, argv[i]); + mqb_free(mqb); } diff --git a/lib/qpnexus.c b/lib/qpnexus.c index d8a8bbc6..5ecb97cf 100644 --- a/lib/qpnexus.c +++ b/lib/qpnexus.c @@ -27,14 +27,15 @@ #include "sigevent.h" /* prototypes */ -static void* qpn_start_main(void* arg); +static void* qpn_start(void* arg); static void* qpn_start_bgp(void* arg); +static void qpn_in_thread_init(qpn_nexus qpn); /* Master of the threads. */ extern struct thread_master *master; /*============================================================================== - * Quagga Nexus Interface -- qpt_xxxx + * Quagga Nexus Interface -- qpn_xxxx * */ @@ -44,9 +45,9 @@ extern struct thread_master *master; * If main_thread is set then no new thread will be created * when qpn_exec() is called, instead the finite state machine will be * run in the calling thread. The main thread will only block the - * message queue's signal. Non main threads will block all signals. + * message queue's signal. Non-main threads will block most signals. * - * Returns the qtn_nexus. + * Returns the qpn_nexus. */ qpn_nexus qpn_init_new(qpn_nexus qpn) @@ -59,21 +60,21 @@ qpn_init_new(qpn_nexus qpn) return qpn; } -/* Initialize main qpthread, no queue */ +/* Initialize main qpthread */ qpn_nexus qpn_init_main(qpn_nexus qpn) { qpn = qpn_init_new(qpn); - qpn->selection = qps_selection_init_new(qpn->selection); qpn->pile = qtimer_pile_init_new(qpn->pile); + qpn->queue = mqueue_init_new(qpn->queue, mqt_signal_unicast); qpn->main_thread = 1; - qpn->start = qpn_start_main; + qpn->start = qpn_start; return qpn; } -/* Initialize bgp qpthread */ +/* Initialize bgp engine's qpthread */ qpn_nexus qpn_init_bgp(qpn_nexus qpn) { @@ -84,6 +85,24 @@ qpn_init_bgp(qpn_nexus qpn) return qpn; } +/* Initialize Routing engine's qpthread + * + * Although not expected to do I/O we still use qps_selection (pselect) as + * the mechanism to wait for either a timeout or a signal from the message + * queue. +*/ +qpn_nexus +qpn_init_routing(qpn_nexus qpn) +{ + qpn = qpn_init_new(qpn); + qpn->selection = qps_selection_init_new(qpn->selection); + qpn->pile = qtimer_pile_init_new(qpn->pile); + qpn->queue = mqueue_init_new(qpn->queue, mqt_signal_unicast); + qpn->start = qpn_start; + + return qpn; +} + /* free timers, selection, message queue and nexus * return NULL */ @@ -100,21 +119,19 @@ qpn_free(qpn_nexus qpn) if (qpn->pile != NULL) { while ((qtr = qtimer_pile_ream(qpn->pile, 1))) - { qtimer_free(qtr); - } } /* files and selection */ if (qpn->selection != NULL) { while ((qf = qps_selection_ream(qpn->selection, 1))) - { qps_file_free(qf); - } } - /* TODO: free qtn->queue */ + /* TODO: free qpn->queue */ + /* TODO: free qpn->mts */ + XFREE(MTYPE_QPN_NEXUS, qpn) ; @@ -138,30 +155,26 @@ qpn_exec(qpn_nexus qpn) } } -/* Main qpthread, prep signals, then run finite state machine - * using qps_selection and qtimer +/* Thread routine, complete init, then run finite state machine + * using mqueue, qps_selection and qtimer */ static void* -qpn_start_main(void* arg) +qpn_start(void* arg) { qpn_nexus qpn = arg; + mqueue_block mqb; int actions; qtime_mono_t now; - sigset_t newmask; - qpn->thread_id = qpt_thread_self(); - - /* Main thread, block the message queue's signal */ - sigemptyset (&newmask); - sigaddset (&newmask, SIGMQUEUE); - qpt_thread_sigmask(SIG_BLOCK, &newmask, NULL); - qps_set_signal(qpn->selection, SIGMQUEUE, newmask); + /* now in our thread, complete initialisation */ + qpn_in_thread_init(qpn); while (!qpn->terminate) { /* Signals are highest priority. * only execute on the main thread */ - quagga_sigevent_process (); + if (qpn->main_thread) + quagga_sigevent_process (); /* process timers */ now = qt_get_monotonic(); @@ -169,21 +182,31 @@ qpn_start_main(void* arg) { } - /* block for some input, output or timeout */ - actions = qps_pselect( qpn->selection, + /* drain the message queue, will be waiting when it's empty */ + for (;;) + { + mqb = mqueue_dequeue(qpn->queue, 1, qpn->mts) ; + if (mqb == NULL) + break; + + mqb_dispatch(mqb, mqb_action); + } + + /* block for some input, output, signal or timeout */ + actions = qps_pselect(qpn->selection, qtimer_pile_top_time(qpn->pile, now + QTIME(MAX_PSELECT_TIMOUT)) ); /* process I/O actions */ while (actions) - { actions = qps_dispatch_next(qpn->selection) ; - } + + mqueue_done_waiting(qpn->queue, qpn->mts); } return NULL; } -/* Bgp prep signals, then run finite state machine +/* Bgp engine's qpthread, complete init, then run finite state machine * using legacy threads */ static void* @@ -192,33 +215,12 @@ qpn_start_bgp(void* arg) qpn_nexus qpn = arg; struct thread thread; mqueue_block mqb; - sigset_t newmask; - - qpn->thread_id = qpt_thread_self(); - /* - * Not main thread. Block most signals, but be careful not to - * defer SIGTRAP because doing so breaks gdb, at least on - * NetBSD 2.0. Avoid asking to block SIGKILL, just because - * we shouldn't be able to do so. Avoid blocking SIGFPE, - * SIGILL, SIGSEGV, SIGBUS as this is undefined by POSIX. - * Don't block SIGPIPE so that is gets ignored on this thread. - */ - sigfillset (&newmask); - sigdelset (&newmask, SIGTRAP); - sigdelset (&newmask, SIGKILL); - sigdelset (&newmask, SIGPIPE); - sigdelset (&newmask, SIGFPE); - sigdelset (&newmask, SIGILL); - sigdelset (&newmask, SIGSEGV); - sigdelset (&newmask, SIGBUS); - - qpt_thread_sigmask(SIG_BLOCK, &newmask, NULL); - qpn->mts = mqueue_thread_signal_init(qpn->mts, qpn->thread_id, SIGMQUEUE); + /* now in our thread, complete initialisation */ + qpn_in_thread_init(qpn); while (!qpn->terminate) { - /* drain the message queue, will be waiting when it's empty */ for (;;) { @@ -239,6 +241,48 @@ qpn_start_bgp(void* arg) return NULL; } +/* Now running in our thread, complete initialisation */ +static void +qpn_in_thread_init(qpn_nexus qpn) +{ + sigset_t newmask; + + qpn->thread_id = qpt_thread_self(); + + if (qpn->main_thread) + { + /* Main thread, block the message queue's signal */ + sigemptyset (&newmask); + sigaddset (&newmask, SIGMQUEUE); + } + else + { + /* + * Not main thread. Block most signals, but be careful not to + * defer SIGTRAP because doing so breaks gdb, at least on + * NetBSD 2.0. Avoid asking to block SIGKILL, just because + * we shouldn't be able to do so. Avoid blocking SIGFPE, + * SIGILL, SIGSEGV, SIGBUS as this is undefined by POSIX. + * Don't block SIGPIPE so that is gets ignored on this thread. + */ + sigfillset (&newmask); + sigdelset (&newmask, SIGTRAP); + sigdelset (&newmask, SIGKILL); + sigdelset (&newmask, SIGPIPE); + sigdelset (&newmask, SIGFPE); + sigdelset (&newmask, SIGILL); + sigdelset (&newmask, SIGSEGV); + sigdelset (&newmask, SIGBUS); + } + qpt_thread_sigmask(SIG_BLOCK, &newmask, NULL); + + /* Now we have thread_id and mask, prep for using message queue. */ + if (qpn->queue != NULL) + qpn->mts = mqueue_thread_signal_init(qpn->mts, qpn->thread_id, SIGMQUEUE); + if (qpn->selection != NULL) + qps_set_signal(qpn->selection, SIGMQUEUE, newmask); +} + /* Ask the thread to terminate itself quickly and cleanly */ void qpn_terminate(qpn_nexus qpn) diff --git a/lib/qpnexus.h b/lib/qpnexus.h index 7436c55d..69fe8044 100644 --- a/lib/qpnexus.h +++ b/lib/qpnexus.h @@ -92,6 +92,7 @@ struct qpn_nexus extern qpn_nexus qpn_init_new(qpn_nexus qtn); extern qpn_nexus qpn_init_main(qpn_nexus qtn); extern qpn_nexus qpn_init_bgp(qpn_nexus qtn); +extern qpn_nexus qpn_init_routing(qpn_nexus qtn); extern void qpn_exec(qpn_nexus qtn); extern void qpn_terminate(qpn_nexus qpn); extern qpn_nexus qpn_free(qpn_nexus qpn); |