summaryrefslogtreecommitdiffstats
path: root/lib/workqueue.c
diff options
context:
space:
mode:
authorChris Hall <GMCH@hestia.halldom.com>2010-02-16 09:52:14 +0000
committerChris Hall <GMCH@hestia.halldom.com>2010-02-16 09:52:14 +0000
commit9856e17cf2495d1f7db16e866f16bc4a8447524d (patch)
tree260d0c56610ad8f8db533737a59cbda33665752f /lib/workqueue.c
parent3b9932d5f7cdeac29a81bceeb190479b675a0435 (diff)
downloadquagga-9856e17cf2495d1f7db16e866f16bc4a8447524d.tar.bz2
quagga-9856e17cf2495d1f7db16e866f16bc4a8447524d.tar.xz
Revised thread/timer handling, work queue and scheduling.
Updated quagga thread handling to use qtimers when using the new qpnexus -- so all timers are qtimers in the new scheme. Updated work queue handling so that each work queue item is a single malloced structure, not three. (Only bgpd and zebra use the work queue system.) When using qpnexus the background thread queue is no longer a timer queue, but simply a list of pending background threads. When a background thread is waiting on a timer, it is in the qtimer pile, just like any other thread. When using qpnexus, the only remaining quagga thread queues are the event and ready queues. Revised the qpnexus loop so that only when there is nothing else to do will it consider the background threads. Revised write I/O in the BGP Engine so that all writing is via the connection's write buffer. Revised the write I/O in the Routeing Engine, so that it passes groups of updates in a single mqueue message. This all reduces the number of TCP packets sent (because BGP messages are collected together in the connection's write buffer) and reduces the number of mqueue messages involved. (No need for TCP_CORK.) Code and comments review for the new code. 
modified: bgpd/bgp_advertise.c modified: bgpd/bgp_common.h modified: bgpd/bgp_connection.c modified: bgpd/bgp_connection.h modified: bgpd/bgp_engine.h modified: bgpd/bgp_fsm.c modified: bgpd/bgp_main.c modified: bgpd/bgp_msg_read.c modified: bgpd/bgp_msg_write.c modified: bgpd/bgp_network.c modified: bgpd/bgp_packet.c modified: bgpd/bgp_packet.h modified: bgpd/bgp_peer.c modified: bgpd/bgp_peer_index.h modified: bgpd/bgp_route.c modified: bgpd/bgp_route_refresh.h modified: bgpd/bgp_session.c modified: bgpd/bgp_session.h modified: bgpd/bgpd.c new file: bgpd/bgpd.cx modified: lib/mqueue.h modified: lib/qpnexus.c modified: lib/qpnexus.h modified: lib/qpselect.c modified: lib/qtimers.c modified: lib/qtimers.h modified: lib/sigevent.c modified: lib/stream.c modified: lib/stream.h modified: lib/thread.c modified: lib/thread.h modified: lib/workqueue.c modified: lib/workqueue.h modified: tests/heavy-wq.c modified: zebra/zebra_rib.c
Diffstat (limited to 'lib/workqueue.c')
-rw-r--r--lib/workqueue.c285
1 files changed, 182 insertions, 103 deletions
diff --git a/lib/workqueue.c b/lib/workqueue.c
index 7c811edd..6f2cd531 100644
--- a/lib/workqueue.c
+++ b/lib/workqueue.c
@@ -1,4 +1,4 @@
-/*
+/*
* Quagga Work Queue Support.
*
* Copyright (C) 2005 Sun Microsystems, Inc.
@@ -18,38 +18,30 @@
* You should have received a copy of the GNU General Public License
* along with Quagga; see the file COPYING. If not, write to the Free
* Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
- * 02111-1307, USA.
+ * 02111-1307, USA.
*/
#include <lib/zebra.h>
#include "thread.h"
#include "memory.h"
#include "workqueue.h"
-#include "linklist.h"
#include "command.h"
#include "log.h"
+#include "linklist.h"
/* master list of work_queues */
static struct list work_queues;
#define WORK_QUEUE_MIN_GRANULARITY 1
-static struct work_queue_item *
-work_queue_item_new (struct work_queue *wq)
-{
- struct work_queue_item *item;
- assert (wq);
-
- item = XCALLOC (MTYPE_WORK_QUEUE_ITEM,
- sizeof (struct work_queue_item));
-
- return item;
-}
-
static void
-work_queue_item_free (struct work_queue_item *item)
+work_queue_item_free (struct work_queue *wq, struct work_queue_item *item)
{
- XFREE (MTYPE_WORK_QUEUE_ITEM, item);
+ /* call private data deletion callback if needed */
+ if (wq->spec.del_item_data != NULL)
+ wq->spec.del_item_data (wq, item) ;
+
+ XFREE (MTYPE_WORK_QUEUE_ITEM, item) ;
return;
}
@@ -58,46 +50,40 @@ struct work_queue *
work_queue_new (struct thread_master *m, const char *queue_name)
{
struct work_queue *new;
-
+
new = XCALLOC (MTYPE_WORK_QUEUE, sizeof (struct work_queue));
if (new == NULL)
return new;
-
- new->name = XSTRDUP (MTYPE_WORK_QUEUE_NAME, queue_name);
+
+ new->name = XSTRDUP (MTYPE_WORK_QUEUE_NAME, queue_name);
new->master = m;
SET_FLAG (new->flags, WQ_UNPLUGGED);
-
- if ( (new->items = list_new ()) == NULL)
- {
- XFREE (MTYPE_WORK_QUEUE_NAME, new->name);
- XFREE (MTYPE_WORK_QUEUE, new);
-
- return NULL;
- }
-
- new->items->del = (void (*)(void *)) work_queue_item_free;
-
+
listnode_add (&work_queues, new);
-
+
new->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
/* Default values, can be overriden by caller */
new->spec.hold = WORK_QUEUE_DEFAULT_HOLD;
-
+
return new;
}
void
work_queue_free (struct work_queue *wq)
{
+ work_queue_item item ;
+
if (wq->thread != NULL)
thread_cancel(wq->thread);
-
- /* list_delete frees items via callback */
- list_delete (wq->items);
- listnode_delete (&work_queues, wq);
-
+
+ while ((item = wq->head) != NULL)
+ {
+ wq->head = item->next ;
+ work_queue_item_free(wq, item) ;
+ } ;
+
XFREE (MTYPE_WORK_QUEUE_NAME, wq->name);
XFREE (MTYPE_WORK_QUEUE, wq);
return;
@@ -109,59 +95,151 @@ work_queue_schedule (struct work_queue *wq, unsigned int delay)
/* if appropriate, schedule work queue thread */
if ( CHECK_FLAG (wq->flags, WQ_UNPLUGGED)
&& (wq->thread == NULL)
- && (listcount (wq->items) > 0) )
+ && (wq->head != NULL) )
{
- wq->thread = thread_add_background (wq->master, work_queue_run,
+ wq->thread = thread_add_background (wq->master, work_queue_run,
wq, delay);
return 1;
}
else
return 0;
}
-
-void
-work_queue_add (struct work_queue *wq, void *data)
+
+/*------------------------------------------------------------------------------
+ * Create new work queue item and place on the end of the given work queue.
+ *
+ * Schedules the work queue if there were no items (unless already scheduled
+ * or plugged).
+ *
+ * Returns the address of the args area in the new item.
+ */
+extern void*
+work_queue_item_add (struct work_queue *wq)
{
- struct work_queue_item *item;
-
+ work_queue_item item ;
+
assert (wq);
- if (!(item = work_queue_item_new (wq)))
+ item = XCALLOC (MTYPE_WORK_QUEUE_ITEM, sizeof (struct work_queue_item));
+
+ if (item == NULL)
{
zlog_err ("%s: unable to get new queue item", __func__);
- return;
+ return NULL ;
+ }
+
+ item->next = NULL ;
+ if (wq->head == NULL)
+ {
+ assert(wq->list_count == 0) ;
+ wq->head = item ;
+ item->prev = NULL ;
}
-
- item->data = data;
- listnode_add (wq->items, item);
-
+ else
+ {
+ assert((wq->tail != NULL) && (wq->list_count > 0)) ;
+ wq->tail->next = item ;
+ item->prev = wq->tail ;
+ } ;
+ wq->tail = item ;
+
+ ++wq->list_count ;
work_queue_schedule (wq, wq->spec.hold);
-
- return;
+
+ return work_queue_item_args(item) ;
}
static void
-work_queue_item_remove (struct work_queue *wq, struct listnode *ln)
+work_queue_item_remove (struct work_queue *wq, work_queue_item item)
{
- struct work_queue_item *item = listgetdata (ln);
+ assert ((wq != NULL) && (item != NULL)) ;
+
+ if (wq->head == item)
+ {
+ /* Removing the first item */
+ assert(item->prev == NULL) ;
+
+ wq->head = item->next ;
- assert (item && item->data);
+ if (wq->tail == item)
+ {
+ /* Removing the only item */
+ assert((item->next == NULL) && (wq->list_count == 1)) ;
+ wq->tail = NULL ;
+ }
+ else
+ {
+ /* First, but not the only item */
+ assert((item->next != NULL) && (wq->list_count > 1)) ;
+ wq->head->prev = NULL ;
+ } ;
+ }
+ else if (wq->tail == item)
+ {
+ /* Removing last, but not only item */
+ assert(item->next == NULL) ;
+ assert((item->prev != NULL) && (wq->list_count > 1)) ;
+
+ wq->tail = item->prev ;
+ wq->tail->next = NULL ;
+ }
+ else
+ {
+ /* Removing from somewhere in middle */
+ assert(item->next != NULL) ;
+ assert((item->prev != NULL) && (wq->list_count > 2)) ;
+
+ item->prev->next = item->next ;
+ item->next->prev = item->prev ;
+ } ;
- /* call private data deletion callback if needed */
- if (wq->spec.del_item_data)
- wq->spec.del_item_data (wq, item->data);
+ --wq->list_count ;
+ work_queue_item_free (wq, item);
- list_delete_node (wq->items, ln);
- work_queue_item_free (item);
-
return;
}
-static void
-work_queue_item_requeue (struct work_queue *wq, struct listnode *ln)
+static work_queue_item
+work_queue_item_requeue (struct work_queue *wq, work_queue_item item)
{
- LISTNODE_DETACH (wq->items, ln);
- LISTNODE_ATTACH (wq->items, ln); /* attach to end of list */
+ work_queue_item next = item->next ;
+ work_queue_item last = wq->tail ;
+
+ assert(last != NULL) ;
+
+ if (last == item)
+ {
+ /* Requeuing last item -- easy ! */
+ assert(next == NULL) ;
+ return item ;
+ } ;
+
+ assert(next != NULL) ;
+
+ if (wq->head == item)
+ {
+ /* Requeuing first, but not only item */
+ assert(item->prev == NULL) ;
+
+ wq->head = next ;
+ next->prev = NULL ;
+ }
+ else
+ {
+ /* Requeuing something in middle */
+ assert(item->prev != NULL) ;
+
+ item->prev->next = item->next ;
+ item->next->prev = item->prev ;
+ } ;
+
+ item->next = NULL ;
+ item->prev = last ;
+
+ last->next = item ;
+ wq->tail = item ;
+
+ return next ;
}
DEFUN(show_work_queues,
@@ -172,8 +250,8 @@ DEFUN(show_work_queues,
{
struct listnode *node;
struct work_queue *wq;
-
- vty_out (vty,
+
+ vty_out (vty,
"%c %8s %5s %8s %21s%s",
' ', "List","(ms) ","Q. Runs","Cycle Counts ",
VTY_NEWLINE);
@@ -183,24 +261,24 @@ DEFUN(show_work_queues,
"Items",
"Hold",
"Total",
- "Best","Gran.","Avg.",
- "Name",
+ "Best","Gran.","Avg.",
+ "Name",
VTY_NEWLINE);
-
+
for (ALL_LIST_ELEMENTS_RO ((&work_queues), node, wq))
{
vty_out (vty,"%c %8d %5d %8ld %7d %6d %6u %s%s",
(CHECK_FLAG (wq->flags, WQ_UNPLUGGED) ? ' ' : 'P'),
- listcount (wq->items),
+ wq->list_count,
wq->spec.hold,
wq->runs,
wq->cycles.best, wq->cycles.granularity,
- (wq->runs) ?
+ (wq->runs) ?
(unsigned int) (wq->cycles.total / wq->runs) : 0,
wq->name,
VTY_NEWLINE);
}
-
+
return CMD_SUCCESS;
}
@@ -212,9 +290,9 @@ work_queue_plug (struct work_queue *wq)
{
if (wq->thread)
thread_cancel (wq->thread);
-
+
wq->thread = NULL;
-
+
UNSET_FLAG (wq->flags, WQ_UNPLUGGED);
}
@@ -232,22 +310,21 @@ work_queue_unplug (struct work_queue *wq)
/* timer thread to process a work queue
* will reschedule itself if required,
- * otherwise work_queue_item_add
+ * otherwise work_queue_item_add
*/
int
work_queue_run (struct thread *thread)
{
struct work_queue *wq;
- struct work_queue_item *item;
+ work_queue_item next, item ;
wq_item_status ret;
unsigned int cycles = 0;
- struct listnode *node, *nnode;
char yielded = 0;
wq = THREAD_ARG (thread);
wq->thread = NULL;
- assert (wq && wq->items);
+ assert (wq != NULL) ;
/* calculate cycle granularity:
* list iteration == 1 cycle
@@ -258,38 +335,40 @@ work_queue_run (struct thread *thread)
*
* Best: starts low, can only increase
*
- * Granularity: starts at WORK_QUEUE_MIN_GRANULARITY, can be decreased
- * if we run to end of time slot, can increase otherwise
+ * Granularity: starts at WORK_QUEUE_MIN_GRANULARITY, can be decreased
+ * if we run to end of time slot, can increase otherwise
* by a small factor.
*
* We could use just the average and save some work, however we want to be
* able to adjust quickly to CPU pressure. Average wont shift much if
* daemon has been running a long time.
*/
- if (wq->cycles.granularity == 0)
- wq->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
+ if (wq->cycles.granularity == 0)
+ wq->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
- for (ALL_LIST_ELEMENTS (wq->items, node, nnode, item))
+ next = wq->head ;
+ while (next != NULL)
{
- assert (item && item->data);
-
+ item = next ;
+ next = item->next ; /* default next item */
+
/* dont run items which are past their allowed retries */
if (item->ran > wq->spec.max_retries)
{
/* run error handler, if any */
- if (wq->spec.errorfunc)
- wq->spec.errorfunc (wq, item->data);
- work_queue_item_remove (wq, node);
+ if (wq->spec.errorfunc != NULL)
+ wq->spec.errorfunc (wq, item);
+ work_queue_item_remove (wq, item);
continue;
}
/* run and take care of items that want to be retried immediately */
do
{
- ret = wq->spec.workfunc (wq, item->data);
+ ret = wq->spec.workfunc (wq, item);
item->ran++;
}
- while ((ret == WQ_RETRY_NOW)
+ while ((ret == WQ_RETRY_NOW)
&& (item->ran < wq->spec.max_retries));
switch (ret)
@@ -308,21 +387,21 @@ work_queue_run (struct thread *thread)
case WQ_REQUEUE:
{
item->ran--;
- work_queue_item_requeue (wq, node);
+ next = work_queue_item_requeue (wq, item);
break;
}
case WQ_RETRY_NOW:
/* a RETRY_NOW that gets here has exceeded max_tries, same as ERROR */
case WQ_ERROR:
{
- if (wq->spec.errorfunc)
+ if (wq->spec.errorfunc != NULL)
wq->spec.errorfunc (wq, item);
}
/* fall through here is deliberate */
case WQ_SUCCESS:
default:
{
- work_queue_item_remove (wq, node);
+ work_queue_item_remove (wq, item);
break;
}
}
@@ -331,7 +410,7 @@ work_queue_run (struct thread *thread)
cycles++;
/* test if we should yield */
- if ( !(cycles % wq->cycles.granularity)
+ if ( !(cycles % wq->cycles.granularity)
&& thread_should_yield (thread))
{
yielded = 1;
@@ -346,15 +425,15 @@ stats:
/* we yielded, check whether granularity should be reduced */
if (yielded && (cycles < wq->cycles.granularity))
{
- wq->cycles.granularity = ((cycles > 0) ? cycles
+ wq->cycles.granularity = ((cycles > 0) ? cycles
: WORK_QUEUE_MIN_GRANULARITY);
}
-
+
if (cycles >= (wq->cycles.granularity))
{
if (cycles > wq->cycles.best)
wq->cycles.best = cycles;
-
+
/* along with yielded check, provides hysteris for granularity */
if (cycles > (wq->cycles.granularity * WQ_HYSTERIS_FACTOR * 2))
wq->cycles.granularity *= WQ_HYSTERIS_FACTOR; /* quick ramp-up */
@@ -362,7 +441,7 @@ stats:
wq->cycles.granularity += WQ_HYSTERIS_FACTOR;
}
#undef WQ_HYSTERIS_FACTOR
-
+
wq->runs++;
wq->cycles.total += cycles;
@@ -370,12 +449,12 @@ stats:
printf ("%s: cycles %d, new: best %d, worst %d\n",
__func__, cycles, wq->cycles.best, wq->cycles.granularity);
#endif
-
+
/* Is the queue done yet? If it is, call the completion callback. */
- if (listcount (wq->items) > 0)
+ if (wq->head != NULL)
work_queue_schedule (wq, 0);
else if (wq->spec.completion_func)
wq->spec.completion_func (wq);
-
+
return 0;
}