diff options
-rw-r--r-- | bgpd/bgp_connection.c | 1 | ||||
-rw-r--r-- | bgpd/bgp_main.c | 13 | ||||
-rw-r--r-- | lib/qpnexus.c | 27 | ||||
-rw-r--r-- | lib/qpnexus.h | 20 | ||||
-rw-r--r-- | lib/thread.c | 53 | ||||
-rw-r--r-- | lib/thread.h | 3 |
6 files changed, 66 insertions, 51 deletions
diff --git a/bgpd/bgp_connection.c b/bgpd/bgp_connection.c index 81e83a71..b74b8cef 100644 --- a/bgpd/bgp_connection.c +++ b/bgpd/bgp_connection.c @@ -441,7 +441,6 @@ bgp_connection_queue_del(bgp_connection connection) * Process each item until its pending queue becomes empty, or its write * buffer becomes full, or it is stopped. * - * TODO: link bgp_connection_queue_process() into the bgp_engine loop. */ extern void bgp_connection_queue_process(void) diff --git a/bgpd/bgp_main.c b/bgpd/bgp_main.c index 05868896..3ad9e0c3 100644 --- a/bgpd/bgp_main.c +++ b/bgpd/bgp_main.c @@ -84,8 +84,8 @@ void sigusr2 (void); static void bgp_exit (int); static void init_second_stage(int pthreads); static void bgp_in_thread_init(void); -static qtime_mono_t routing_event_hook(void); -static qtime_mono_t bgp_event_hook(void); +static qtime_mono_t routing_event_hook(enum qpn_priority priority); +static qtime_mono_t bgp_event_hook(enum qpn_priority priority); static void sighup_action(mqueue_block mqb, mqb_flag_t flag); static void sighup_enqueue(void); static void sigterm_action(mqueue_block mqb, mqb_flag_t flag); @@ -618,12 +618,12 @@ bgp_in_thread_init(void) /* legacy threads */ static qtime_mono_t -routing_event_hook(void) +routing_event_hook(enum qpn_priority priority) { struct thread thread; qtime_mono_t event_wait; - while (thread_fetch_event (master, &thread, &event_wait)) + while (thread_fetch_event (priority, master, &thread, &event_wait)) thread_call (&thread); return event_wait; @@ -631,9 +631,10 @@ routing_event_hook(void) /* BGP local queued events */ static qtime_mono_t -bgp_event_hook(void) +bgp_event_hook(enum qpn_priority priority) { - bgp_connection_queue_process(); + if (priority >= qpn_pri_fourth) + bgp_connection_queue_process(); return 0; } diff --git a/lib/qpnexus.c b/lib/qpnexus.c index 96615253..8a78a70b 100644 --- a/lib/qpnexus.c +++ b/lib/qpnexus.c @@ -123,18 +123,20 @@ qpn_exec(qpn_nexus qpn) * * 1) Main thread only -- signals. * - * 2) Pending work -- event hooks. + * 2) High priority pending work -- event hooks. * - * 3) messages coming from other pthreads -- mqueue_queue. + * 3) Messages coming from other pthreads -- mqueue_queue. * - * 4) I/O -- qpselect + * 4) All priority pending work -- event hooks. + * + * 5) I/O -- qpselect * * This deals with all active sockets for read/write/connect/accept. * * Each time a socket is readable, one message is read and dispatched. * The pselect timeout is set to be when the next timer is due. * - * 5) Timers -- qtimers + * 6) Timers -- qtimers * */ static void* @@ -162,12 +164,13 @@ qpn_start(void* arg) /* max time to wait in pselect */ max_wait = QTIME(MAX_PSELECT_TIMOUT); - /* event hooks, if any */ + /* event hooks, if any. High priority */ for (i = 0; i < NUM_EVENT_HOOK; ++i) { if (qpn->event_hook[i] != NULL) { - qtime_mono_t event_wait = qpn->event_hook[i](); + /* first, second and third priority */ + qtime_mono_t event_wait = qpn->event_hook[i](qpn_pri_third); if (event_wait > 0 && event_wait < max_wait) max_wait = event_wait; } @@ -184,6 +187,18 @@ qpn_start(void* arg) mqb_dispatch(mqb, mqb_action); } + /* Event hooks, if any. All priorities */ + for (i = 0; i < NUM_EVENT_HOOK; ++i) + { + if (qpn->event_hook[i] != NULL) + { + /* first, second third and fourth priority */ + qtime_mono_t event_wait = qpn->event_hook[i](qpn_pri_fourth); + if (event_wait > 0 && event_wait < max_wait) + max_wait = event_wait; + } + } + /* block for some input, output, signal or timeout */ actions = qps_pselect(qpn->selection, qtimer_pile_top_time(qpn->pile, now + max_wait)); diff --git a/lib/qpnexus.h b/lib/qpnexus.h index 32219df1..a6cad148 100644 --- a/lib/qpnexus.h +++ b/lib/qpnexus.h @@ -56,6 +56,19 @@ /* number of event hooks */ #define NUM_EVENT_HOOK 2 +/* Work priorities */ +enum qpn_priority +{ + qpn_pri_highest = 1, + + qpn_pri_first = 1, + qpn_pri_second = 2, + qpn_pri_third = 3, + qpn_pri_fourth = 4, + + qpn_pri_lowest = 4, +}; + /*============================================================================== * Data Structures. */ @@ -95,11 +108,12 @@ struct qpn_nexus * thread loop is no longer executed */ void (*in_thread_final)(void); - /* thread loop events, can override. Called before message queue, - * I/O and timers. + /* thread loop events, can override. Called before and after message queue, + * and before I/O and timers. + * Hook should perform all work <= given priority. * Returns the time to try again, 0 means default to maximum. */ - qtime_mono_t (*event_hook[NUM_EVENT_HOOK])(void); + qtime_mono_t (*event_hook[NUM_EVENT_HOOK])(enum qpn_priority); }; diff --git a/lib/thread.c b/lib/thread.c index e581dd6f..f2b873ac 100644 --- a/lib/thread.c +++ b/lib/thread.c @@ -47,13 +47,6 @@ static qpt_mutex_t thread_mutex; #define UNLOCK qpt_mutex_unlock(&thread_mutex); static struct hash *cpu_record = NULL; -/* TODO: remove this */ -#define USE_MQUEUE -#ifdef USE_MQUEUE -#include "qpnexus.h" -static sigset_t newmask; -#endif - /* Struct timeval's tv_usec one second value. */ #define TIMER_SECOND_MICRO 1000000L @@ -973,33 +966,13 @@ thread_fetch (struct thread_master *m, struct thread *fetch) (!timer_wait || (timeval_cmp (*timer_wait, *timer_wait_bg) > 0))) timer_wait = timer_wait_bg; - /* TODO: remove this */ -#ifdef USE_MQUEUE - { - struct timespec spec ; - spec.tv_sec = timer_wait->tv_sec ; - spec.tv_nsec = timer_wait->tv_usec * 1000 ; - num = pselect (FD_SETSIZE, &readfd, &writefd, &exceptfd, &spec, - &newmask); - } ; -#else num = select (FD_SETSIZE, &readfd, &writefd, &exceptfd, timer_wait); -#endif /* Signals should get quick treatment */ if (num < 0) { if (errno == EINTR) -#ifdef USE_MQUEUE - { - if (qpthreads_enabled) - return NULL; - else - continue; /* signal received - process it */ - } -#else continue; /* signal received - process it */ -#endif zlog_warn ("select() error: %s", safe_strerror (errno)); return NULL; } @@ -1037,11 +1010,11 @@ thread_fetch (struct thread_master *m, struct thread *fetch) } -/* Fetch next ready thread. Events and timeouts only. No I/O. - * If nothing to do returns NULL and sets event_wait to recommended time - * to be called again. */ +/* Fetch next ready thread <= given priority. Events and timeouts only. + * No I/O. If nothing to do returns NULL and sets event_wait to + * recommended time to be called again. */ struct thread * -thread_fetch_event (struct thread_master *m, struct thread *fetch, +thread_fetch_event (enum qpn_priority priority, struct thread_master *m, struct thread *fetch, qtime_mono_t *event_wait) { struct thread *thread; @@ -1050,22 +1023,34 @@ thread_fetch_event (struct thread_master *m, struct thread *fetch, struct timeval *timer_wait; struct timeval *timer_wait_bg; + *event_wait = 0; + /* Normal event are the next highest priority. */ if ((thread = thread_trim_head (&m->event)) != NULL) return thread_run (m, thread, fetch); + if (priority <= qpn_pri_first) + return NULL; + /* If there are any ready threads from previous scheduler runs, * process top of them. */ if ((thread = thread_trim_head (&m->ready)) != NULL) return thread_run (m, thread, fetch); - /* Check foreground timers. Historically, they have had higher - priority than I/O threads, so let's push them onto the ready - list in front of the I/O threads. */ + if (priority <= qpn_pri_second) + return NULL; + + /* Check foreground timers. */ quagga_get_relative (NULL); thread_timer_process (&m->timer, &relative_time); + if ((thread = thread_trim_head (&m->ready)) != NULL) + return thread_run (m, thread, fetch); + + if (priority <= qpn_pri_third) + return NULL; + /* Background timer/events, lowest priority */ thread_timer_process (&m->background, &relative_time); diff --git a/lib/thread.h b/lib/thread.h index b0699650..1e68007a 100644 --- a/lib/thread.h +++ b/lib/thread.h @@ -24,6 +24,7 @@ #include <sys/resource.h> #include "qtime.h" +#include "qpnexus.h" struct rusage_t { @@ -195,7 +196,7 @@ extern struct thread *funcname_thread_execute (struct thread_master *, extern void thread_cancel (struct thread *); extern unsigned int thread_cancel_event (struct thread_master *, void *); extern struct thread *thread_fetch (struct thread_master *, struct thread *); -struct thread * thread_fetch_event (struct thread_master *m, struct thread *fetch, +struct thread * thread_fetch_event (enum qpn_priority,struct thread_master *m, struct thread *fetch, qtime_mono_t *event_wait); extern void thread_call (struct thread *); extern unsigned long thread_timer_remain_second (struct thread *); |