summaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/qpnexus.c27
-rw-r--r--lib/qpnexus.h20
-rw-r--r--lib/thread.c53
-rw-r--r--lib/thread.h3
4 files changed, 59 insertions, 44 deletions
diff --git a/lib/qpnexus.c b/lib/qpnexus.c
index 96615253..8a78a70b 100644
--- a/lib/qpnexus.c
+++ b/lib/qpnexus.c
@@ -123,18 +123,20 @@ qpn_exec(qpn_nexus qpn)
*
* 1) Main thread only -- signals.
*
- * 2) Pending work -- event hooks.
+ * 2) High priority pending work -- event hooks.
*
- * 3) messages coming from other pthreads -- mqueue_queue.
+ * 3) Messages coming from other pthreads -- mqueue_queue.
*
- * 4) I/O -- qpselect
+ * 4) All priority pending work -- event hooks.
+ *
+ * 5) I/O -- qpselect
*
* This deals with all active sockets for read/write/connect/accept.
*
* Each time a socket is readable, one message is read and dispatched.
* The pselect timeout is set to be when the next timer is due.
*
- * 5) Timers -- qtimers
+ * 6) Timers -- qtimers
*
*/
static void*
@@ -162,12 +164,13 @@ qpn_start(void* arg)
/* max time to wait in pselect */
max_wait = QTIME(MAX_PSELECT_TIMOUT);
- /* event hooks, if any */
+ /* event hooks, if any. High priority */
for (i = 0; i < NUM_EVENT_HOOK; ++i)
{
if (qpn->event_hook[i] != NULL)
{
- qtime_mono_t event_wait = qpn->event_hook[i]();
+ /* first, second and third priority */
+ qtime_mono_t event_wait = qpn->event_hook[i](qpn_pri_third);
if (event_wait > 0 && event_wait < max_wait)
max_wait = event_wait;
}
@@ -184,6 +187,18 @@ qpn_start(void* arg)
mqb_dispatch(mqb, mqb_action);
}
+ /* Event hooks, if any. All priorities */
+ for (i = 0; i < NUM_EVENT_HOOK; ++i)
+ {
+ if (qpn->event_hook[i] != NULL)
+ {
+ /* first, second third and fourth priority */
+ qtime_mono_t event_wait = qpn->event_hook[i](qpn_pri_fourth);
+ if (event_wait > 0 && event_wait < max_wait)
+ max_wait = event_wait;
+ }
+ }
+
/* block for some input, output, signal or timeout */
actions = qps_pselect(qpn->selection,
qtimer_pile_top_time(qpn->pile, now + max_wait));
diff --git a/lib/qpnexus.h b/lib/qpnexus.h
index 32219df1..a6cad148 100644
--- a/lib/qpnexus.h
+++ b/lib/qpnexus.h
@@ -56,6 +56,19 @@
/* number of event hooks */
#define NUM_EVENT_HOOK 2
+/* Work priorities */
+enum qpn_priority
+{
+ qpn_pri_highest = 1,
+
+ qpn_pri_first = 1,
+ qpn_pri_second = 2,
+ qpn_pri_third = 3,
+ qpn_pri_fourth = 4,
+
+ qpn_pri_lowest = 4,
+};
+
/*==============================================================================
* Data Structures.
*/
@@ -95,11 +108,12 @@ struct qpn_nexus
* thread loop is no longer executed */
void (*in_thread_final)(void);
- /* thread loop events, can override. Called before message queue,
- * I/O and timers.
+ /* thread loop events, can override. Called before and after message queue,
+ * and before I/O and timers.
+ * Hook should perform all work <= given priority.
* Returns the time to try again, 0 means default to maximum.
*/
- qtime_mono_t (*event_hook[NUM_EVENT_HOOK])(void);
+ qtime_mono_t (*event_hook[NUM_EVENT_HOOK])(enum qpn_priority);
};
diff --git a/lib/thread.c b/lib/thread.c
index e581dd6f..f2b873ac 100644
--- a/lib/thread.c
+++ b/lib/thread.c
@@ -47,13 +47,6 @@ static qpt_mutex_t thread_mutex;
#define UNLOCK qpt_mutex_unlock(&thread_mutex);
static struct hash *cpu_record = NULL;
-/* TODO: remove this */
-#define USE_MQUEUE
-#ifdef USE_MQUEUE
-#include "qpnexus.h"
-static sigset_t newmask;
-#endif
-
/* Struct timeval's tv_usec one second value. */
#define TIMER_SECOND_MICRO 1000000L
@@ -973,33 +966,13 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
(!timer_wait || (timeval_cmp (*timer_wait, *timer_wait_bg) > 0)))
timer_wait = timer_wait_bg;
- /* TODO: remove this */
-#ifdef USE_MQUEUE
- {
- struct timespec spec ;
- spec.tv_sec = timer_wait->tv_sec ;
- spec.tv_nsec = timer_wait->tv_usec * 1000 ;
- num = pselect (FD_SETSIZE, &readfd, &writefd, &exceptfd, &spec,
- &newmask);
- } ;
-#else
num = select (FD_SETSIZE, &readfd, &writefd, &exceptfd, timer_wait);
-#endif
/* Signals should get quick treatment */
if (num < 0)
{
if (errno == EINTR)
-#ifdef USE_MQUEUE
- {
- if (qpthreads_enabled)
- return NULL;
- else
- continue; /* signal received - process it */
- }
-#else
continue; /* signal received - process it */
-#endif
zlog_warn ("select() error: %s", safe_strerror (errno));
return NULL;
}
@@ -1037,11 +1010,11 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
}
-/* Fetch next ready thread. Events and timeouts only. No I/O.
- * If nothing to do returns NULL and sets event_wait to recommended time
- * to be called again. */
+/* Fetch next ready thread <= given priority. Events and timeouts only.
+ * No I/O. If nothing to do returns NULL and sets event_wait to
+ * recommended time to be called again. */
struct thread *
-thread_fetch_event (struct thread_master *m, struct thread *fetch,
+thread_fetch_event (enum qpn_priority priority, struct thread_master *m, struct thread *fetch,
qtime_mono_t *event_wait)
{
struct thread *thread;
@@ -1050,22 +1023,34 @@ thread_fetch_event (struct thread_master *m, struct thread *fetch,
struct timeval *timer_wait;
struct timeval *timer_wait_bg;
+ *event_wait = 0;
+
/* Normal event are the next highest priority. */
if ((thread = thread_trim_head (&m->event)) != NULL)
return thread_run (m, thread, fetch);
+ if (priority <= qpn_pri_first)
+ return NULL;
+
/* If there are any ready threads from previous scheduler runs,
* process top of them.
*/
if ((thread = thread_trim_head (&m->ready)) != NULL)
return thread_run (m, thread, fetch);
- /* Check foreground timers. Historically, they have had higher
- priority than I/O threads, so let's push them onto the ready
- list in front of the I/O threads. */
+ if (priority <= qpn_pri_second)
+ return NULL;
+
+ /* Check foreground timers. */
quagga_get_relative (NULL);
thread_timer_process (&m->timer, &relative_time);
+ if ((thread = thread_trim_head (&m->ready)) != NULL)
+ return thread_run (m, thread, fetch);
+
+ if (priority <= qpn_pri_third)
+ return NULL;
+
/* Background timer/events, lowest priority */
thread_timer_process (&m->background, &relative_time);
diff --git a/lib/thread.h b/lib/thread.h
index b0699650..1e68007a 100644
--- a/lib/thread.h
+++ b/lib/thread.h
@@ -24,6 +24,7 @@
#include <sys/resource.h>
#include "qtime.h"
+#include "qpnexus.h"
struct rusage_t
{
@@ -195,7 +196,7 @@ extern struct thread *funcname_thread_execute (struct thread_master *,
extern void thread_cancel (struct thread *);
extern unsigned int thread_cancel_event (struct thread_master *, void *);
extern struct thread *thread_fetch (struct thread_master *, struct thread *);
-struct thread * thread_fetch_event (struct thread_master *m, struct thread *fetch,
+struct thread * thread_fetch_event (enum qpn_priority,struct thread_master *m, struct thread *fetch,
qtime_mono_t *event_wait);
extern void thread_call (struct thread *);
extern unsigned long thread_timer_remain_second (struct thread *);