diff options
Diffstat (limited to 'lib/workqueue.c')
-rw-r--r-- | lib/workqueue.c | 285 |
1 files changed, 182 insertions, 103 deletions
diff --git a/lib/workqueue.c b/lib/workqueue.c index 7c811edd..6f2cd531 100644 --- a/lib/workqueue.c +++ b/lib/workqueue.c @@ -1,4 +1,4 @@ -/* +/* * Quagga Work Queue Support. * * Copyright (C) 2005 Sun Microsystems, Inc. @@ -18,38 +18,30 @@ * You should have received a copy of the GNU General Public License * along with Quagga; see the file COPYING. If not, write to the Free * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA - * 02111-1307, USA. + * 02111-1307, USA. */ #include <lib/zebra.h> #include "thread.h" #include "memory.h" #include "workqueue.h" -#include "linklist.h" #include "command.h" #include "log.h" +#include "linklist.h" /* master list of work_queues */ static struct list work_queues; #define WORK_QUEUE_MIN_GRANULARITY 1 -static struct work_queue_item * -work_queue_item_new (struct work_queue *wq) -{ - struct work_queue_item *item; - assert (wq); - - item = XCALLOC (MTYPE_WORK_QUEUE_ITEM, - sizeof (struct work_queue_item)); - - return item; -} - static void -work_queue_item_free (struct work_queue_item *item) +work_queue_item_free (struct work_queue *wq, struct work_queue_item *item) { - XFREE (MTYPE_WORK_QUEUE_ITEM, item); + /* call private data deletion callback if needed */ + if (wq->spec.del_item_data != NULL) + wq->spec.del_item_data (wq, item) ; + + XFREE (MTYPE_WORK_QUEUE_ITEM, item) ; return; } @@ -58,46 +50,40 @@ struct work_queue * work_queue_new (struct thread_master *m, const char *queue_name) { struct work_queue *new; - + new = XCALLOC (MTYPE_WORK_QUEUE, sizeof (struct work_queue)); if (new == NULL) return new; - - new->name = XSTRDUP (MTYPE_WORK_QUEUE_NAME, queue_name); + + new->name = XSTRDUP (MTYPE_WORK_QUEUE_NAME, queue_name); new->master = m; SET_FLAG (new->flags, WQ_UNPLUGGED); - - if ( (new->items = list_new ()) == NULL) - { - XFREE (MTYPE_WORK_QUEUE_NAME, new->name); - XFREE (MTYPE_WORK_QUEUE, new); - - return NULL; - } - - new->items->del = (void (*)(void *)) work_queue_item_free; - + listnode_add (&work_queues, new); - + new->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY; /* Default values, can be overriden by caller */ new->spec.hold = WORK_QUEUE_DEFAULT_HOLD; - + return new; } void work_queue_free (struct work_queue *wq) { + work_queue_item item ; + if (wq->thread != NULL) thread_cancel(wq->thread); - - /* list_delete frees items via callback */ - list_delete (wq->items); - listnode_delete (&work_queues, wq); - + + while ((item = wq->head) != NULL) + { + wq->head = item->next ; + work_queue_item_free(wq, item) ; + } ; + XFREE (MTYPE_WORK_QUEUE_NAME, wq->name); XFREE (MTYPE_WORK_QUEUE, wq); return; @@ -109,59 +95,151 @@ work_queue_schedule (struct work_queue *wq, unsigned int delay) /* if appropriate, schedule work queue thread */ if ( CHECK_FLAG (wq->flags, WQ_UNPLUGGED) && (wq->thread == NULL) - && (listcount (wq->items) > 0) ) + && (wq->head != NULL) ) { - wq->thread = thread_add_background (wq->master, work_queue_run, + wq->thread = thread_add_background (wq->master, work_queue_run, wq, delay); return 1; } else return 0; } - -void -work_queue_add (struct work_queue *wq, void *data) + +/*------------------------------------------------------------------------------ + * Create new work queue item and place on the end of the given work queue. + * + * Schedules the work queue if there were no items (unless already scheduled + * or plugged). + * + * Returns the address of the args area in the new item. + */ +extern void* +work_queue_item_add (struct work_queue *wq) { - struct work_queue_item *item; - + work_queue_item item ; + assert (wq); - if (!(item = work_queue_item_new (wq))) + item = XCALLOC (MTYPE_WORK_QUEUE_ITEM, sizeof (struct work_queue_item)); + + if (item == NULL) { zlog_err ("%s: unable to get new queue item", __func__); - return; + return NULL ; + } + + item->next = NULL ; + if (wq->head == NULL) + { + assert(wq->list_count == 0) ; + wq->head = item ; + item->prev = NULL ; } - - item->data = data; - listnode_add (wq->items, item); - + else + { + assert((wq->tail != NULL) && (wq->list_count > 0)) ; + wq->tail->next = item ; + item->prev = wq->tail ; + } ; + wq->tail = item ; + + ++wq->list_count ; work_queue_schedule (wq, wq->spec.hold); - - return; + + return work_queue_item_args(item) ; } static void -work_queue_item_remove (struct work_queue *wq, struct listnode *ln) +work_queue_item_remove (struct work_queue *wq, work_queue_item item) { - struct work_queue_item *item = listgetdata (ln); + assert ((wq != NULL) && (item != NULL)) ; + + if (wq->head == item) + { + /* Removing the first item */ + assert(item->prev == NULL) ; + + wq->head = item->next ; - assert (item && item->data); + if (wq->tail == item) + { + /* Removing the only item */ + assert((item->next == NULL) && (wq->list_count == 1)) ; + wq->tail = NULL ; + } + else + { + /* First, but not the only item */ + assert((item->next != NULL) && (wq->list_count > 1)) ; + wq->head->prev = NULL ; + } ; + } + else if (wq->tail == item) + { + /* Removing last, but not only item */ + assert(item->next == NULL) ; + assert((item->prev != NULL) && (wq->list_count > 1)) ; + + wq->tail = item->prev ; + wq->tail->next = NULL ; + } + else + { + /* Removing from somewhere in middle */ + assert(item->next != NULL) ; + assert((item->prev != NULL) && (wq->list_count > 2)) ; + + item->prev->next = item->next ; + item->next->prev = item->prev ; + } ; - /* call private data deletion callback if needed */ - if (wq->spec.del_item_data) - wq->spec.del_item_data (wq, item->data); + --wq->list_count ; + work_queue_item_free (wq, item); - list_delete_node (wq->items, ln); - work_queue_item_free (item); - return; } -static void -work_queue_item_requeue (struct work_queue *wq, struct listnode *ln) +static work_queue_item +work_queue_item_requeue (struct work_queue *wq, work_queue_item item) { - LISTNODE_DETACH (wq->items, ln); - LISTNODE_ATTACH (wq->items, ln); /* attach to end of list */ + work_queue_item next = item->next ; + work_queue_item last = wq->tail ; + + assert(last != NULL) ; + + if (last == item) + { + /* Requeuing last item -- easy ! */ + assert(next == NULL) ; + return item ; + } ; + + assert(next != NULL) ; + + if (wq->head == item) + { + /* Requeuing first, but not only item */ + assert(item->prev == NULL) ; + + wq->head = next ; + next->prev = NULL ; + } + else + { + /* Requeuing something in middle */ + assert(item->prev != NULL) ; + + item->prev->next = item->next ; + item->next->prev = item->prev ; + } ; + + item->next = NULL ; + item->prev = last ; + + last->next = item ; + wq->tail = item ; + + return next ; } DEFUN(show_work_queues, @@ -172,8 +250,8 @@ DEFUN(show_work_queues, { struct listnode *node; struct work_queue *wq; - - vty_out (vty, + + vty_out (vty, "%c %8s %5s %8s %21s%s", ' ', "List","(ms) ","Q. Runs","Cycle Counts ", VTY_NEWLINE); @@ -183,24 +261,24 @@ DEFUN(show_work_queues, "Items", "Hold", "Total", - "Best","Gran.","Avg.", - "Name", + "Best","Gran.","Avg.", + "Name", VTY_NEWLINE); - + for (ALL_LIST_ELEMENTS_RO ((&work_queues), node, wq)) { vty_out (vty,"%c %8d %5d %8ld %7d %6d %6u %s%s", (CHECK_FLAG (wq->flags, WQ_UNPLUGGED) ? ' ' : 'P'), - listcount (wq->items), + wq->list_count, wq->spec.hold, wq->runs, wq->cycles.best, wq->cycles.granularity, - (wq->runs) ? + (wq->runs) ? (unsigned int) (wq->cycles.total / wq->runs) : 0, wq->name, VTY_NEWLINE); } - + return CMD_SUCCESS; } @@ -212,9 +290,9 @@ work_queue_plug (struct work_queue *wq) { if (wq->thread) thread_cancel (wq->thread); - + wq->thread = NULL; - + UNSET_FLAG (wq->flags, WQ_UNPLUGGED); } @@ -232,22 +310,21 @@ work_queue_unplug (struct work_queue *wq) /* timer thread to process a work queue * will reschedule itself if required, - * otherwise work_queue_item_add + * otherwise work_queue_item_add */ int work_queue_run (struct thread *thread) { struct work_queue *wq; - struct work_queue_item *item; + work_queue_item next, item ; wq_item_status ret; unsigned int cycles = 0; - struct listnode *node, *nnode; char yielded = 0; wq = THREAD_ARG (thread); wq->thread = NULL; - assert (wq && wq->items); + assert (wq != NULL) ; /* calculate cycle granularity: * list iteration == 1 cycle @@ -258,38 +335,40 @@ work_queue_run (struct thread *thread) * * Best: starts low, can only increase * - * Granularity: starts at WORK_QUEUE_MIN_GRANULARITY, can be decreased - * if we run to end of time slot, can increase otherwise + * Granularity: starts at WORK_QUEUE_MIN_GRANULARITY, can be decreased + * if we run to end of time slot, can increase otherwise * by a small factor. * * We could use just the average and save some work, however we want to be * able to adjust quickly to CPU pressure. Average wont shift much if * daemon has been running a long time. */ - if (wq->cycles.granularity == 0) - wq->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY; + if (wq->cycles.granularity == 0) + wq->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY; - for (ALL_LIST_ELEMENTS (wq->items, node, nnode, item)) + next = wq->head ; + while (next != NULL) { - assert (item && item->data); - + item = next ; + next = item->next ; /* default next item */ + /* dont run items which are past their allowed retries */ if (item->ran > wq->spec.max_retries) { /* run error handler, if any */ - if (wq->spec.errorfunc) - wq->spec.errorfunc (wq, item->data); - work_queue_item_remove (wq, node); + if (wq->spec.errorfunc != NULL) + wq->spec.errorfunc (wq, item); + work_queue_item_remove (wq, item); continue; } /* run and take care of items that want to be retried immediately */ do { - ret = wq->spec.workfunc (wq, item->data); + ret = wq->spec.workfunc (wq, item); item->ran++; } - while ((ret == WQ_RETRY_NOW) + while ((ret == WQ_RETRY_NOW) && (item->ran < wq->spec.max_retries)); switch (ret) @@ -308,21 +387,21 @@ work_queue_run (struct thread *thread) case WQ_REQUEUE: { item->ran--; - work_queue_item_requeue (wq, node); + next = work_queue_item_requeue (wq, item); break; } case WQ_RETRY_NOW: /* a RETRY_NOW that gets here has exceeded max_tries, same as ERROR */ case WQ_ERROR: { - if (wq->spec.errorfunc) + if (wq->spec.errorfunc != NULL) wq->spec.errorfunc (wq, item); } /* fall through here is deliberate */ case WQ_SUCCESS: default: { - work_queue_item_remove (wq, node); + work_queue_item_remove (wq, item); break; } } @@ -331,7 +410,7 @@ work_queue_run (struct thread *thread) cycles++; /* test if we should yield */ - if ( !(cycles % wq->cycles.granularity) + if ( !(cycles % wq->cycles.granularity) && thread_should_yield (thread)) { yielded = 1; @@ -346,15 +425,15 @@ stats: /* we yielded, check whether granularity should be reduced */ if (yielded && (cycles < wq->cycles.granularity)) { - wq->cycles.granularity = ((cycles > 0) ? cycles + wq->cycles.granularity = ((cycles > 0) ? cycles : WORK_QUEUE_MIN_GRANULARITY); } - + if (cycles >= (wq->cycles.granularity)) { if (cycles > wq->cycles.best) wq->cycles.best = cycles; - + /* along with yielded check, provides hysteris for granularity */ if (cycles > (wq->cycles.granularity * WQ_HYSTERIS_FACTOR * 2)) wq->cycles.granularity *= WQ_HYSTERIS_FACTOR; /* quick ramp-up */ @@ -362,7 +441,7 @@ stats: wq->cycles.granularity += WQ_HYSTERIS_FACTOR; } #undef WQ_HYSTERIS_FACTOR - + wq->runs++; wq->cycles.total += cycles; @@ -370,12 +449,12 @@ stats: printf ("%s: cycles %d, new: best %d, worst %d\n", __func__, cycles, wq->cycles.best, wq->cycles.granularity); #endif - + /* Is the queue done yet? If it is, call the completion callback. */ - if (listcount (wq->items) > 0) + if (wq->head != NULL) work_queue_schedule (wq, 0); else if (wq->spec.completion_func) wq->spec.completion_func (wq); - + return 0; } |