diff options
author | Chris Hall <GMCH@hestia.halldom.com> | 2010-02-16 09:52:14 +0000 |
---|---|---|
committer | Chris Hall <GMCH@hestia.halldom.com> | 2010-02-16 09:52:14 +0000 |
commit | 9856e17cf2495d1f7db16e866f16bc4a8447524d (patch) | |
tree | 260d0c56610ad8f8db533737a59cbda33665752f /bgpd/bgp_session.c | |
parent | 3b9932d5f7cdeac29a81bceeb190479b675a0435 (diff) | |
download | quagga-9856e17cf2495d1f7db16e866f16bc4a8447524d.tar.bz2 quagga-9856e17cf2495d1f7db16e866f16bc4a8447524d.tar.xz |
Revised thread/timer handling, work queue and scheduling.
Updated quagga thread handling to use qtimers when using the new
qpnexus -- so all timers are qtimers in the new scheme.
Updated work queue handling so that each work queue item is a single
malloced structure, not three. (Only bgpd and zebra use the work
queue system.)
When using qpnexus the background thread queue is no longer a timer
queue, but simply a list of pending background threads. When a
background thread is waiting on a timer, it is in the qtimer pile,
same like any other thread.
When using qpnexus, the only remaining quagga thread queues are the
event and ready queues.
Revised the qpnexus loop so that only when there is nothing else to
do will it consider the background threads.
Revised write I/O in the BGP Engine so that all writing is via the
connection's write buffer. Revised the write I/O in the Routeing
Engine, so that it passes groups of updates in a single mqueue
message. This all reduces the number of TCP packets sent (because
BGP messages are collected together in the connection's write buffer)
and reduces the number of mqueue messages involved.
(No need for TCP_CORK.)
Code and comments review for the new code.
modified: bgpd/bgp_advertise.c
modified: bgpd/bgp_common.h
modified: bgpd/bgp_connection.c
modified: bgpd/bgp_connection.h
modified: bgpd/bgp_engine.h
modified: bgpd/bgp_fsm.c
modified: bgpd/bgp_main.c
modified: bgpd/bgp_msg_read.c
modified: bgpd/bgp_msg_write.c
modified: bgpd/bgp_network.c
modified: bgpd/bgp_packet.c
modified: bgpd/bgp_packet.h
modified: bgpd/bgp_peer.c
modified: bgpd/bgp_peer_index.h
modified: bgpd/bgp_route.c
modified: bgpd/bgp_route_refresh.h
modified: bgpd/bgp_session.c
modified: bgpd/bgp_session.h
modified: bgpd/bgpd.c
new file: bgpd/bgpd.cx
modified: lib/mqueue.h
modified: lib/qpnexus.c
modified: lib/qpnexus.h
modified: lib/qpselect.c
modified: lib/qtimers.c
modified: lib/qtimers.h
modified: lib/sigevent.c
modified: lib/stream.c
modified: lib/stream.h
modified: lib/thread.c
modified: lib/thread.h
modified: lib/workqueue.c
modified: lib/workqueue.h
modified: tests/heavy-wq.c
modified: zebra/zebra_rib.c
Diffstat (limited to 'bgpd/bgp_session.c')
-rw-r--r-- | bgpd/bgp_session.c | 164 |
1 files changed, 93 insertions, 71 deletions
diff --git a/bgpd/bgp_session.c b/bgpd/bgp_session.c index a2b49da5..9d17e36c 100644 --- a/bgpd/bgp_session.c +++ b/bgpd/bgp_session.c @@ -80,10 +80,10 @@ static void bgp_session_do_route_refresh_recv(mqueue_block mqb, mqb_flag_t flag) * change any shared item in the session, except under the mutex. And * even then it may make no sense ! * - * NB: a session reaches eDisabled when the Peering Engine has sent a disable + * NB: a session reaches eDisabled when the Routing Engine has sent a disable * request to the BGP Engine, AND an eDisabled event has come back. * - * While the Peering Engine is waiting for the eDisabled event, the session + * While the Routing Engine is waiting for the eDisabled event, the session * is in sLimping state. * * The BGP Engine's primary interest is in its (private) bgp_connection @@ -212,7 +212,7 @@ bgp_session_free(bgp_session session) } /*============================================================================== - * Peering Engine: enable session for given peer -- allocate if required. + * Routing Engine: enable session for given peer -- allocate if required. * * Sets up the session given the current state of the peer. If the state * changes, then need to disable the session and re-enable it again with new @@ -226,12 +226,12 @@ bgp_session_enable(bgp_peer peer) /* Set up session if required. Check session if already exists. * - * Only the Peering Engine creates sessions, so it is safe to pick up the + * Only the Routing Engine creates sessions, so it is safe to pick up the * peer->session pointer and test it. * * If session exists, it MUST be inactive. * - * Peering Engine does not require the mutex while the session is inactive. + * Routing Engine does not require the mutex while the session is inactive. */ session = peer->session ; @@ -348,7 +348,7 @@ bgp_session_do_enable(mqueue_block mqb, mqb_flag_t flag) } ; /*============================================================================== - * Peering Engine: disable session for given peer -- if enabled (!). + * Routing Engine: disable session for given peer -- if enabled (!). * * Passes any bgp_notify to the BGP Engine, which will dispose of it in due * course. @@ -398,7 +398,7 @@ bgp_session_disable(bgp_peer peer, bgp_notify notification) * * the disable is being issued in response to a stopped event from * the BGP Engine. * - * * the session is stopped, but the message to the Peering Engine is + * * the session is stopped, but the message to the Routing Engine is * still in its message queue. * * * the session is stopped while the disable message is in the @@ -410,11 +410,11 @@ bgp_session_disable(bgp_peer peer, bgp_notify notification) * * NB: The BGP Engine will discard any outstanding work for the session. * - * The Peering Engine should discard all further messages for this + * The Routing Engine should discard all further messages for this * session up to the eDisabled, and must then discard any other * messages for the session. * - * NB: the Peering Engine MUST not issue any further messages until it sees + * NB: the Routing Engine MUST not issue any further messages until it sees * the returned eDisabled event. */ mqb = mqb_init_new(NULL, bgp_session_do_disable, session) ; @@ -433,7 +433,6 @@ bgp_session_disable(bgp_peer peer, bgp_notify notification) c = 0 ; s = 0 ; } ; - fprintf(stderr, " session disable %d/%d", c, s) ; } ; ++bgp_engine_queue_stats.event ; @@ -469,7 +468,7 @@ bgp_session_do_disable(mqueue_block mqb, mqb_flag_t flag) /*============================================================================== * BGP Engine: send session event signal to Routeing Engine * - * NB: is passing responsibility for the notification to the Peering Engine. + * NB: is passing responsibility for the notification to the Routing Engine. */ extern void bgp_session_event(bgp_session session, bgp_session_event_t event, @@ -494,21 +493,20 @@ bgp_session_event(bgp_session session, bgp_session_event_t event, args->ordinal = ordinal ; args->stopped = stopped, - ++peering_engine_queue_stats.event ; + ++routing_engine_queue_stats.event ; - bgp_to_peering_engine(mqb) ; -} + bgp_to_routing_engine(mqb) ; +} ; /*============================================================================== - * Peering Engine: dispatch update to peer -> BGP Engine + * Routing Engine: dispatch update(s) to peer -> BGP Engine * - * PRO TEM -- this is being passed the pre-packaged BGP message. + * PRO TEM -- this is being passed the pre-packaged BGP message(s). * - * The BGP Engine takes care of discarding the stream block once it's been - * dealt with. + * The BGP Engine takes care of discarding the stream block(s) once dealt with. */ extern void -bgp_session_update_send(bgp_session session, struct stream* upd) +bgp_session_update_send(bgp_session session, struct stream_fifo* fifo) { struct bgp_session_update_args* args ; mqueue_block mqb ; @@ -516,37 +514,38 @@ bgp_session_update_send(bgp_session session, struct stream* upd) mqb = mqb_init_new(NULL, bgp_session_do_update_send, session) ; args = mqb_get_args(mqb) ; - args->buf = stream_dup(upd) ; + args->buf = stream_fifo_head(fifo) ; args->is_pending = NULL ; - args->xon_kick = (session->flow_control == BGP_XON_KICK); - session->flow_control--; + args->xon_kick = (session->flow_control == BGP_XON_KICK); ++bgp_engine_queue_stats.update ; bgp_to_bgp_engine(mqb) ; + + stream_fifo_reset(fifo) ; } ; /*------------------------------------------------------------------------------ - * BGP Engine: write given BGP update message -- mqb action function. + * BGP Engine: write given BGP update message(s) -- mqb action function. * * Each connection has a pending queue associated with it, onto which messages * are put if the connection's write buffer is unable to absorb any further * messages. * - * This function is called both when the mqb is received from the Peering + * This function is called both when the mqb is received from the Routing * Engine, and when the BGP Engine is trying to empty the connection's pending * queue. * - * When the mqb is received from the Peering Engine, then: + * When the mqb is received from the Routing Engine, then: * - * -- if the connection's pending queue is empty, try to send the message. + * -- if the connection's pending queue is empty, try to send the message(s). * * When the mqb is from connection's pending queue, then: * - * -- try to send the message. + * -- try to send the message(s). * - * In any case, if cannot send the message (and not encountered any error), add - * it (back) to the connection's pending queue. + * In any case, if cannot send all the message(s), add it (back) to the + * connection's pending queue. * * If the mqb has been dealt with, it is freed, along with the stream buffer. * Also, update the flow control counter, and issue XON if required. @@ -557,43 +556,54 @@ bgp_session_do_update_send(mqueue_block mqb, mqb_flag_t flag) struct bgp_session_update_args* args = mqb_get_args(mqb) ; bgp_session session = mqb_get_arg0(mqb) ; - if ((flag == mqb_action) && session->active) + while (args->buf != NULL) { - bgp_connection connection = session->connections[bgp_connection_primary] ; - assert(connection != NULL) ; + struct stream* buf ; - /* If established, try and send. */ - if (connection->state == bgp_fsm_sEstablished) + if ((flag == mqb_action) && session->active) { - int ret = bgp_connection_no_pending(connection, &args->is_pending) ; + bgp_connection connection ; - if (ret != 0) - ret = bgp_msg_send_update(connection, args->buf) ; + connection = session->connections[bgp_connection_primary] ; + assert(connection != NULL) ; - if (ret == 0) - { - /* Either there is already a pending queue, or the message - * could not be sent (and has not failed) -- so add to the - * pending queue. - */ - bgp_connection_add_pending(connection, mqb, &args->is_pending) ; - return ; /* Quit now, with message intact. */ - } - else if (ret > 0) + /* If established, try and send. */ + if (connection->state == bgp_fsm_sEstablished) { - /* Successfully wrote the message. XON if requested */ - if (args->xon_kick) - bgp_session_XON(session); + int ret ; + ret = bgp_connection_no_pending(connection, &args->is_pending) ; + + if (ret != 0) + ret = bgp_msg_send_update(connection, args->buf) ; + + if (ret == 0) + { + /* Either there is already a pending queue, or the message + * could not be sent (and has not failed) -- so add to the + * pending queue. + */ + bgp_connection_add_pending(connection, mqb, + &args->is_pending) ; + return ; /* Quit now, with message intact. */ + } } ; } ; + + buf = args->buf ; + args->buf = buf->next ; + + stream_free(buf) ; } ; - stream_free(args->buf) ; + /* If gets to here, then has dealt with all message(s). */ + if ((flag == mqb_action) && (args->xon_kick)) + bgp_session_XON(session) ; + mqb_free(mqb) ; } ; /*------------------------------------------------------------------------------ - * Peering Engine: are we in XON state ? + * Routing Engine: are we in XON state ? */ extern int bgp_session_is_XON(bgp_peer peer) @@ -606,8 +616,20 @@ bgp_session_is_XON(bgp_peer peer) return result; } ; +/*------------------------------------------------------------------------------ + * Count down flow control -- signal if reached XON point. + */ +extern int +bgp_session_dec_flow_count(bgp_peer peer) +{ + bgp_session session = peer->session; + + assert(session->flow_control > 0) ; + return (--session->flow_control == BGP_XON_KICK) ; +} ; + /*============================================================================== - * Peering Engine: dispatch Route Refresh to peer -> BGP Engine + * Routing Engine: dispatch Route Refresh to peer -> BGP Engine * * The BGP Engine takes care of discarding the bgp_route_refresh once it's been * dealt with. @@ -671,7 +693,7 @@ bgp_session_do_route_refresh_send(mqueue_block mqb, mqb_flag_t flag) } ; /*============================================================================== - * Peering Engine: dispatch End-of-RIB to peer -> BGP Engine + * Routing Engine: dispatch End-of-RIB to peer -> BGP Engine */ extern void bgp_session_end_of_rib_send(bgp_session session, qAFI_t afi, qSAFI_t safi) @@ -736,11 +758,11 @@ bgp_session_do_end_of_rib_send(mqueue_block mqb, mqb_flag_t flag) } ; /*============================================================================== - * BGP Engine: forward incoming update -> Peering Engine + * BGP Engine: forward incoming update -> Routing Engine * * PRO TEM -- this is being passed the raw BGP message. * - * The Peering Engine takes care of discarding the stream block once it's been + * The Routing Engine takes care of discarding the stream block once it's been * dealt with. */ extern void @@ -756,13 +778,13 @@ bgp_session_update_recv(bgp_session session, struct stream* buf, bgp_size_t size args->size = size; args->xon_kick = 0; - ++peering_engine_queue_stats.update ; + ++routing_engine_queue_stats.update ; - bgp_to_peering_engine(mqb) ; + bgp_to_routing_engine(mqb) ; } /*------------------------------------------------------------------------------ - * Peering Engine: process incoming update message -- mqb action function. + * Routing Engine: process incoming update message -- mqb action function. */ static void bgp_session_do_update_recv(mqueue_block mqb, mqb_flag_t flag) @@ -787,7 +809,7 @@ bgp_session_do_update_recv(mqueue_block mqb, mqb_flag_t flag) /*============================================================================== * BGP Engine: received Route Refresh to peer * - * The Peering Engine takes care of discarding the bgp_route_refresh once + * The Routing Engine takes care of discarding the bgp_route_refresh once * it's been dealt with. */ extern void @@ -802,11 +824,11 @@ bgp_session_route_refresh_recv(bgp_session session, bgp_route_refresh rr) args->rr = rr ; args->is_pending = NULL ; - bgp_to_peering_engine(mqb) ; + bgp_to_routing_engine(mqb) ; } ; /*------------------------------------------------------------------------------ - * Peering Engine: receive given BGP route refresh message -- mqb action + * Routing Engine: receive given BGP route refresh message -- mqb action * function. */ static void @@ -823,7 +845,7 @@ bgp_session_do_route_refresh_recv(mqueue_block mqb, mqb_flag_t flag) } /*============================================================================== - * BGP Engine: send XON message to Peering Engine + * BGP Engine: send XON message to Routing Engine * * Can be sent more packets now */ @@ -836,13 +858,13 @@ bgp_session_XON(bgp_session session) confirm(sizeof(struct bgp_session_XON_args) == 0) ; - ++peering_engine_queue_stats.xon ; + ++routing_engine_queue_stats.xon ; - bgp_to_peering_engine(mqb) ; + bgp_to_routing_engine(mqb) ; } /*------------------------------------------------------------------------------ - * Peering Engine: process incoming XON message -- mqb action function. + * Routing Engine: process incoming XON message -- mqb action function. */ static void bgp_session_do_XON(mqueue_block mqb, mqb_flag_t flag) @@ -854,14 +876,14 @@ bgp_session_do_XON(mqueue_block mqb, mqb_flag_t flag) int xoff = (session->flow_control <= 0); session->flow_control = BGP_XON_REFRESH; if (xoff) - bgp_write (session->peer) ; + bgp_write (session->peer, NULL) ; } mqb_free(mqb) ; } /*============================================================================== - * Peering Engine: send set ttl message to BGP Engine + * Routing Engine: send set ttl message to BGP Engine * */ void @@ -918,7 +940,7 @@ bgp_session_do_set_ttl(mqueue_block mqb, mqb_flag_t flag) * pointer is NULL -- this is largely paranoia, but it would be a grave * mistake for the listening socket(s) to find a session which is not active ! * - * NB: accessing Peering Engine "private" variable -- no lock required. + * NB: accessing Routing Engine "private" variable -- no lock required. * * accessing index_entry when not active -- no lock required. */ @@ -943,13 +965,13 @@ bgp_session_is_active(bgp_session session) } ; /*------------------------------------------------------------------------------ - * Peering Engine: if session is limping we defer re-enabling the session + * Routing Engine: if session is limping we defer re-enabling the session * until it is disabled. * * returns 1 if limping and defer * returns 0 if not limping * - * NB: accessing Peering Engine "private" variable -- no lock required. + * NB: accessing Routing Engine "private" variable -- no lock required. */ static int bgp_session_defer_if_limping(bgp_session session) |