From 9856e17cf2495d1f7db16e866f16bc4a8447524d Mon Sep 17 00:00:00 2001 From: Chris Hall Date: Tue, 16 Feb 2010 09:52:14 +0000 Subject: Revised thread/timer handling, work queue and scheduling. Updated quagga thread handling to use qtimers when using the new qpnexus -- so all timers are qtimers in the new scheme. Updated work queue handling so that each work queue item is a single malloced structure, not three. (Only bgpd and zebra use the work queue system.) When using qpnexus the background thread queue is no longer a timer queue, but simply a list of pending background threads. When a background thread is waiting on a timer, it is in the qtimer pile, same like any other thread. When using qpnexus, the only remaining quagga thread queues are the event and ready queues. Revised the qpnexus loop so that only when there is nothing else to do will it consider the background threads. Revised write I/O in the BGP Engine so that all writing is via the connection's write buffer. Revised the write I/O in the Routeing Engine, so that it passes groups of updates in a single mqueue message. This all reduces the number of TCP packets sent (because BGP messages are collected together in the connection's write buffer) and reduces the number of mqueue messages involved. (No need for TCP_CORK.) Code and comments review for the new code. modified: bgpd/bgp_advertise.c modified: bgpd/bgp_common.h modified: bgpd/bgp_connection.c modified: bgpd/bgp_connection.h modified: bgpd/bgp_engine.h modified: bgpd/bgp_fsm.c modified: bgpd/bgp_main.c modified: bgpd/bgp_msg_read.c modified: bgpd/bgp_msg_write.c modified: bgpd/bgp_network.c modified: bgpd/bgp_packet.c modified: bgpd/bgp_packet.h modified: bgpd/bgp_peer.c modified: bgpd/bgp_peer_index.h modified: bgpd/bgp_route.c modified: bgpd/bgp_route_refresh.h modified: bgpd/bgp_session.c modified: bgpd/bgp_session.h modified: bgpd/bgpd.c new file: bgpd/bgpd.cx modified: lib/mqueue.h modified: lib/qpnexus.c modified: lib/qpnexus.h modified: lib/qpselect.c modified: lib/qtimers.c modified: lib/qtimers.h modified: lib/sigevent.c modified: lib/stream.c modified: lib/stream.h modified: lib/thread.c modified: lib/thread.h modified: lib/workqueue.c modified: lib/workqueue.h modified: tests/heavy-wq.c modified: zebra/zebra_rib.c --- lib/workqueue.h | 95 ++++++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 74 insertions(+), 21 deletions(-) (limited to 'lib/workqueue.h') diff --git a/lib/workqueue.h b/lib/workqueue.h index f59499a0..5d2f2da2 100644 --- a/lib/workqueue.h +++ b/lib/workqueue.h @@ -1,4 +1,4 @@ -/* +/* * Quagga Work Queues. * * Copyright (C) 2005 Sun Microsystems, Inc. @@ -18,14 +18,18 @@ * You should have received a copy of the GNU General Public License * along with Quagga; see the file COPYING. If not, write to the Free * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA - * 02111-1307, USA. + * 02111-1307, USA. */ #ifndef _QUAGGA_WORK_QUEUE_H #define _QUAGGA_WORK_QUEUE_H +#ifndef Inline +#define Inline static inline +#endif + /* Hold time for the initial schedule of a queue run, in millisec */ -#define WORK_QUEUE_DEFAULT_HOLD 50 +#define WORK_QUEUE_DEFAULT_HOLD 50 /* action value, for use by item processor and item error handlers */ typedef enum @@ -40,12 +44,37 @@ typedef enum * the particular item.. */ } wq_item_status; +enum { wq_args_size_max = 24 } ; /* maximum size of union wq_args */ + +union wq_args +{ + void* data ; + char bytes[wq_args_size_max] ; /* empty space `*/ +} ; + +#define WQ_ARGS_SIZE_OK(s) CONFIRM(sizeof(struct s) <= wq_args_size_max) + /* A single work queue item, unsurprisingly */ +typedef struct work_queue_item* work_queue_item ; struct work_queue_item { - void *data; /* opaque data */ + union wq_args args ; /* cast as required */ + + struct work_queue_item* next ; /* the queue itself */ + struct work_queue_item* prev ; + unsigned short ran; /* # of times item has been run */ -}; +} ; + +/* work_queue_item structures are malloced. That guarantees maximum alignment. + * To guarantee maximum alignment for "struct args", it must be first item ! + * + * (The typedef is required to stop Eclipse (3.4.2 with CDT 5.0) whining + * about first argument of offsetof().) + */ +typedef struct work_queue_item work_queue_item_t ; +CONFIRM(offsetof(work_queue_item_t, args) == 0) ; + /* so guaranteed max alignment */ #define WQ_UNPLUGGED (1 << 0) /* available for draining */ @@ -57,52 +86,55 @@ struct work_queue struct thread_master *master; /* thread master */ struct thread *thread; /* thread, if one is active */ char *name; /* work queue name */ - + /* Specification for this work queue. * Public, must be set before use by caller. May be modified at will. */ struct { /* optional opaque user data, global to the queue. */ void *data; - + /* work function to process items with: * First argument is the workqueue queue. * Second argument is the item data */ - wq_item_status (*workfunc) (struct work_queue *, void *); + wq_item_status (*workfunc) (struct work_queue *, work_queue_item); /* error handling function, optional */ - void (*errorfunc) (struct work_queue *, struct work_queue_item *); - + void (*errorfunc) (struct work_queue *, work_queue_item); + /* callback to delete user specific item data */ - void (*del_item_data) (struct work_queue *, void *); - + void (*del_item_data) (struct work_queue *, work_queue_item); + /* completion callback, called when queue is emptied, optional */ void (*completion_func) (struct work_queue *); - + /* max number of retries to make for item that errors */ - unsigned int max_retries; + unsigned int max_retries; unsigned int hold; /* hold time for first run, in ms */ } spec; - + /* remaining fields should be opaque to users */ - struct list *items; /* queue item list */ - unsigned long runs; /* runs count */ - + work_queue_item head ; /* queue item list */ + work_queue_item tail ; + unsigned list_count ; + + unsigned long runs; /* runs count */ + struct { unsigned int best; unsigned int granularity; unsigned long total; } cycles; /* cycle counts */ - + /* private state */ u_int16_t flags; /* user set flag */ }; /* User API */ -/* create a new work queue, of given name. +/* create a new work queue, of given name. * user must fill in the spec of the returned work queue before adding * anything to it */ @@ -112,7 +144,10 @@ extern struct work_queue *work_queue_new (struct thread_master *, extern void work_queue_free (struct work_queue *); /* Add the supplied data as an item onto the workqueue */ -extern void work_queue_add (struct work_queue *, void *); +Inline void work_queue_add (struct work_queue *, void *); + +extern void* work_queue_item_add(struct work_queue* wq) ; +Inline void* work_queue_item_args(work_queue_item item) ; /* plug the queue, ie prevent it from being drained / processed */ extern void work_queue_plug (struct work_queue *wq); @@ -122,4 +157,22 @@ extern void work_queue_unplug (struct work_queue *wq); /* Helpers, exported for thread.c and command.c */ extern int work_queue_run (struct thread *); extern struct cmd_element show_work_queues_cmd; + +/*============================================================================== + * The Inline functions + */ + +Inline void work_queue_add (struct work_queue* wq, void* data) +{ + union wq_args* args = work_queue_item_add(wq) ; + args->data = data ; +} + +/* Return pointer to the args area in the given work queue item */ +Inline void* +work_queue_item_args(work_queue_item item) +{ + return &item->args ; +} ; + #endif /* _QUAGGA_WORK_QUEUE_H */ -- cgit v1.2.3