diff options
-rw-r--r-- | bgpd/bgp_connection.c | 80 | ||||
-rw-r--r-- | bgpd/bgp_engine.c | 17 | ||||
-rw-r--r-- | bgpd/bgp_msg_write.c | 27 | ||||
-rw-r--r-- | bgpd/bgp_msg_write.h | 9 | ||||
-rw-r--r-- | bgpd/bgp_open_state.c | 11 | ||||
-rw-r--r-- | bgpd/bgp_packet.c | 159 | ||||
-rw-r--r-- | bgpd/bgp_session.c | 308 | ||||
-rw-r--r-- | bgpd/bgp_session.h | 42 | ||||
-rw-r--r-- | lib/mqueue.c | 53 | ||||
-rw-r--r-- | lib/mqueue.h | 14 |
10 files changed, 607 insertions, 113 deletions
diff --git a/bgpd/bgp_connection.c b/bgpd/bgp_connection.c index 10a6e21f..ba13c33a 100644 --- a/bgpd/bgp_connection.c +++ b/bgpd/bgp_connection.c @@ -85,11 +85,10 @@ static const char* bgp_connection_tags[] = [bgp_connection_secondary] = "(secondary)", } ; -static void -bgp_connection_init_host(bgp_connection connection, const char* tag) ; - -static void -bgp_write_buffer_init_new(bgp_wbuffer wb, size_t size) ; +static void bgp_connection_init_host(bgp_connection connection, + const char* tag) ; +static void bgp_write_buffer_init_new(bgp_wbuffer wb, size_t size) ; +static void bgp_write_buffer_free(bgp_wbuffer wb) ; /*------------------------------------------------------------------------------ * Initialise connection structure -- allocate if required. @@ -285,6 +284,28 @@ bgp_connection_exit(bgp_connection connection) static void bgp_connection_free(bgp_connection connection) { + assert( (connection->state == bgp_fsm_Stopping) && + (connection->session == NULL) ) ; + + /* Make sure is closed, so no active file, no active timers, pending queue + * is empty, not on the connection queue, etc. + */ + bgp_connection_close(connection) ; + + /* Free any components which still exist */ + bgp_notify_free(&connection->notification) ; + bgp_open_state_free(connection->open_recv) ; + if (connection->su_local != NULL) + sockunion_free(connection->su_local) ; + if (connection->su_remote != NULL) + sockunion_free(connection->su_remote) ; + if (connection->host != NULL) + XFREE(MTYPE_BGP_PEER_HOST, connection->host) ; + stream_free(connection->ibuf) ; + stream_free(connection->obuf) ; + bgp_write_buffer_free(&connection->wbuff) ; + + /* Free the body */ XFREE(MTYPE_BGP_CONNECTION, connection) ; } ; @@ -305,6 +326,16 @@ bgp_write_buffer_init_new(bgp_wbuffer wb, size_t size) wb->p_in = wb->p_out = wb->base ; } ; +/*------------------------------------------------------------------------------ + * Free any write buffer + */ +static void +bgp_write_buffer_free(bgp_wbuffer wb) +{ + if (wb->base != NULL) + XFREE(MTYPE_STREAM_DATA, wb->base) ; +} ; + /*============================================================================== * Connection queue management. * @@ -373,6 +404,7 @@ bgp_connection_queue_del(bgp_connection connection) * Process each item until its pending queue becomes empty, or its write * buffer becomes full, or it is stopped. * + * TODO: link bgp_connection_queue_process() into the bgp_engine loop. */ extern void bgp_connection_queue_process(void) @@ -381,25 +413,33 @@ bgp_connection_queue_process(void) while (bgp_connection_queue != NULL) { - /* select the first in the queue, and step to the next */ + /* select the first in the queue, and step to the next */ bgp_connection connection = bgp_connection_queue ; bgp_connection_queue = connection->next ; - /* Reap the connection if it is now stopped. */ + /* Reap the connection if it is now stopped. */ if (connection->state == bgp_fsm_Stopping) { - bgp_connection_free(connection) ; + bgp_connection_free(connection) ; /* removes from connection queue */ + continue ; } ; - /* ..... */ - - - /* ..... */ - - + /* Process next item on connection's pending queue */ + mqb = mqueue_local_head(&connection->pending_queue) ; + if (mqb != NULL) + /* The action will either remove the mqb from the pending queue, + * or remove the connection from the connection queue. + */ + { + bgp_session session = mqb_get_arg0(mqb) ; + assert( (session == connection->session) + && (connection + == session->connections[bgp_connection_primary]) ) ; + mqb_dispatch_action(mqb) ; + } + else + bgp_connection_queue_del(connection) ; } ; - - } ; /*============================================================================== @@ -738,7 +778,13 @@ bgp_connection_write_direct(bgp_connection connection, struct stream* s) * * Empty the write buffer if we can. * - * If empties that, empty the obuf if there is anything pending, and.... + * If empties that, disable write mode, then: + * + * -- if notification is pending, then generate a notification sent event + * + * -- otherwise: place connection on the connection queue, so can start to + * flush out anything on the connection's pending queue and/or send an + * XON message to the Peering Engine. * * If empty out everything, disable write mode. * diff --git a/bgpd/bgp_engine.c b/bgpd/bgp_engine.c index 3e297849..c47094f6 100644 --- a/bgpd/bgp_engine.c +++ b/bgpd/bgp_engine.c @@ -76,6 +76,10 @@ static struct qpn_nexus bgp_engine ; static void* bgp_engine_loop(void* arg) ; +/* TODO: BGP Engine side of bgp_engine_start() must call bgp_open_listeners() + * for which it needs the port and address from command line. + */ + extern void bgp_engine_start(void) { @@ -95,6 +99,19 @@ bgp_engine_start(void) } ; /*============================================================================== + * Stop the BGP Engine Thread. + * + */ + +/* TODO: BGP Engine side of bgp_engine_stop() must call bgp_close_listeners() + */ + +extern void +bgp_engine_stop(void) +{ +} ; + +/*============================================================================== * The BGP Engine Thread main loop * * Processes: diff --git a/bgpd/bgp_msg_write.c b/bgpd/bgp_msg_write.c index fbc7ba0e..f4345f49 100644 --- a/bgpd/bgp_msg_write.c +++ b/bgpd/bgp_msg_write.c @@ -755,6 +755,29 @@ bgp_msg_orf_prefix(struct stream* s, uint8_t common, } ; /*============================================================================== + * UPDATE -- send an UPDATE message + * + * PRO TEM -- this is passed a raw BGP message in a stream buffer + */ + +/*------------------------------------------------------------------------------ + * Make UPDATE message and dispatch. + * + * Returns: 2 => written to TCP -- it's gone + * 1 => written to wbuff -- waiting for socket + * 0 => nothing written -- wbuff was not empty ! + * -1 => failed -- error event generated + */ +extern int +bgp_msg_send_update(bgp_connection connection, struct stream* s) +{ + if (bgp_connection_write_full(connection)) + return 0 ; + + return bgp_connection_write(connection, s) ; +} ; + +/*============================================================================== * End-of-RIB -- send an End-of-RIB BGP message (see Graceful Restart) */ @@ -773,7 +796,7 @@ bgp_msg_send_end_of_rib(bgp_connection connection, iAFI_t afi, iSAFI_t safi) { struct stream *s = connection->obuf ; - if (!bgp_connection_write_empty(connection)) + if (bgp_connection_write_full(connection)) return 0 ; /* Make UPDATE message header */ @@ -803,7 +826,7 @@ bgp_msg_send_end_of_rib(bgp_connection connection, iAFI_t afi, iSAFI_t safi) zlog_debug ("send End-of-RIB for %s to %s", afi_safi_print (afi, safi), connection->host) ; - /* Finally -- write the obuf away */ + /* Finally -- write the buffer away */ return bgp_connection_write(connection, s) ; } ; diff --git a/bgpd/bgp_msg_write.h b/bgpd/bgp_msg_write.h index 9e24ce6f..5355ed70 100644 --- a/bgpd/bgp_msg_write.h +++ b/bgpd/bgp_msg_write.h @@ -47,12 +47,15 @@ extern int bgp_msg_send_route_refresh(bgp_connection connection, bgp_route_refresh rr) ; extern int -bgp_packet_set_marker(struct stream *s, uint8_t type) ; +bgp_msg_send_update(bgp_connection connection, struct stream* s) ; extern int -bgp_packet_set_size (struct stream *s) ; +bgp_msg_send_end_of_rib(bgp_connection connection, iAFI_t afi, iSAFI_t safi); extern int -bgp_msg_send_end_of_rib(bgp_connection connection, iAFI_t afi, iSAFI_t safi); +bgp_packet_set_marker(struct stream *s, uint8_t type) ; + +extern int +bgp_packet_set_size (struct stream *s) ; #endif /* _QUAGGA_BGP_MSG_WRITE_H */ diff --git a/bgpd/bgp_open_state.c b/bgpd/bgp_open_state.c index 9d86e03f..9c743d5b 100644 --- a/bgpd/bgp_open_state.c +++ b/bgpd/bgp_open_state.c @@ -60,11 +60,14 @@ bgp_open_state_free(bgp_open_state state) { bgp_cap_unknown unknown ; - while ((unknown = vector_ream_keep(&state->unknowns)) != NULL) - XFREE(MTYPE_TMP, unknown) ; - if (state != NULL) - XFREE(MTYPE_BGP_OPEN_STATE, state) ; + { + while ((unknown = vector_ream_keep(&state->unknowns)) != NULL) + XFREE(MTYPE_TMP, unknown) ; + + XFREE(MTYPE_BGP_OPEN_STATE, state) ; + } ; + return NULL ; } diff --git a/bgpd/bgp_packet.c b/bgpd/bgp_packet.c index 55e7975c..f432f7a7 100644 --- a/bgpd/bgp_packet.c +++ b/bgpd/bgp_packet.c @@ -142,14 +142,20 @@ bgp_connect_check (struct peer *peer) } #endif -/* Make BGP update packet. */ +/*------------------------------------------------------------------------------ + * Construct an update from head of peer->sync[afi][safi]->update. + * + * Generates complete BGP message in the peer->work stream structure. + * + * Returns: peer->work -- if have something to be written. + * NULL -- otherwise + */ static struct stream * bgp_update_packet (struct peer *peer, afi_t afi, safi_t safi) { struct stream *s; struct bgp_adj_out *adj; struct bgp_advertise *adv; - struct stream *packet; struct bgp_node *rn = NULL; struct bgp_info *binfo = NULL; bgp_size_t total_attr_len = 0; @@ -222,23 +228,25 @@ bgp_update_packet (struct peer *peer, afi_t afi, safi_t safi) break; } - if (! stream_empty (s)) - { - bgp_packet_set_size (s); - packet = stream_dup (s); - bgp_packet_add (peer, packet); - bgp_write(peer); - stream_reset (s); - return packet; - } - return NULL; + if (stream_empty (s)) + return NULL ; + + bgp_packet_set_size (s) ; + return s ; } +/*------------------------------------------------------------------------------ + * Construct an End-of-RIB update message for given AFI/SAFI. + * + * Generates complete BGP message in the peer->work stream structure. + * + * Returns: peer->work -- if have something to be written. + * NULL -- otherwise + */ static struct stream * bgp_update_packet_eor (struct peer *peer, afi_t afi, safi_t safi) { struct stream *s; - struct stream *packet; if (DISABLE_BGP_ANNOUNCE) return NULL; @@ -246,7 +254,8 @@ bgp_update_packet_eor (struct peer *peer, afi_t afi, safi_t safi) if (BGP_DEBUG (normal, NORMAL)) zlog_debug ("send End-of-RIB for %s to %s", afi_safi_print (afi, safi), peer->host); - s = stream_new (BGP_MAX_PACKET_SIZE); + s = peer->work; + stream_reset (s); /* Make BGP update packet. */ bgp_packet_set_marker (s, BGP_MSG_UPDATE); @@ -271,18 +280,21 @@ bgp_update_packet_eor (struct peer *peer, afi_t afi, safi_t safi) } bgp_packet_set_size (s); - packet = stream_dup (s); - bgp_packet_add (peer, packet); - stream_free (s); - return packet; + return s ; } -/* Make BGP withdraw packet. */ +/*------------------------------------------------------------------------------ + * Construct a withdraw update from from head of peer->sync[afi][safi]->withdraw + * + * Generates complete BGP message in the peer->work stream structure. + * + * Returns: peer->work -- if have something to be written. + * NULL -- otherwise + */ static struct stream * bgp_withdraw_packet (struct peer *peer, afi_t afi, safi_t safi) { struct stream *s; - struct stream *packet; struct bgp_adj_out *adj; struct bgp_advertise *adv; struct bgp_node *rn; @@ -342,31 +354,33 @@ bgp_withdraw_packet (struct peer *peer, afi_t afi, safi_t safi) break; } - if (! stream_empty (s)) + if (stream_empty (s)) + return NULL ; + + if (afi == AFI_IP && safi == SAFI_UNICAST) { - if (afi == AFI_IP && safi == SAFI_UNICAST) - { - unfeasible_len + unfeasible_len = stream_get_endp (s) - BGP_HEADER_SIZE - BGP_UNFEASIBLE_LEN; - stream_putw_at (s, BGP_HEADER_SIZE, unfeasible_len); - stream_putw (s, 0); - } - bgp_packet_set_size (s); - packet = stream_dup (s); - bgp_packet_add (peer, packet); - stream_reset (s); - return packet; - } + stream_putw_at (s, BGP_HEADER_SIZE, unfeasible_len); + stream_putw (s, 0); + } ; - return NULL; + bgp_packet_set_size (s); + return s ; } +/*------------------------------------------------------------------------------ + * Construct an update for the default route, place it in the obuf queue + * and kick write. + * + * Uses peer->work stream structure, but copies result to new stream, which is + * pushed onto the obuf queue. + */ void bgp_default_update_send (struct peer *peer, struct attr *attr, afi_t afi, safi_t safi, struct peer *from) { struct stream *s; - struct stream *packet; struct prefix p; unsigned long pos; bgp_size_t total_attr_len; @@ -392,7 +406,8 @@ bgp_default_update_send (struct peer *peer, struct attr *attr, p.prefixlen, attrstr); } - s = stream_new (BGP_MAX_PACKET_SIZE); + s = peer->work ; + stream_reset (s); /* Make BGP update packet. */ bgp_packet_set_marker (s, BGP_MSG_UPDATE); @@ -415,24 +430,27 @@ bgp_default_update_send (struct peer *peer, struct attr *attr, /* Set size. */ bgp_packet_set_size (s); - packet = stream_dup (s); - stream_free (s); - /* Dump packet if debug option is set. */ #ifdef DEBUG - /* bgp_packet_dump (packet); */ + /* bgp_packet_dump (s); */ #endif /* DEBUG */ /* Add packet to the peer. */ - bgp_packet_add (peer, packet); + bgp_packet_add (peer, stream_dup (s)); bgp_write(peer); } +/*------------------------------------------------------------------------------ + * Construct a withdraw update for the default route, place it in the obuf + * queue and kick write. + * + * Uses peer->work stream structure, but copies result to new stream, which is + * pushed onto the obuf queue. + */ void bgp_default_withdraw_send (struct peer *peer, afi_t afi, safi_t safi) { struct stream *s; - struct stream *packet; struct prefix p; unsigned long pos; unsigned long cp; @@ -458,7 +476,8 @@ bgp_default_withdraw_send (struct peer *peer, afi_t afi, safi_t safi) peer->host, inet_ntop(p.family, &(p.u.prefix), buf, BUFSIZ), p.prefixlen); - s = stream_new (BGP_MAX_PACKET_SIZE); + s = peer->work ; + stream_reset (s); /* Make BGP update packet. */ bgp_packet_set_marker (s, BGP_MSG_UPDATE); @@ -492,15 +511,19 @@ bgp_default_withdraw_send (struct peer *peer, afi_t afi, safi_t safi) bgp_packet_set_size (s); - packet = stream_dup (s); - stream_free (s); - /* Add packet to the peer. */ - bgp_packet_add (peer, packet); + bgp_packet_add (peer, stream_dup (s)); bgp_write(peer); } -/* Get next packet to be written. */ +/*------------------------------------------------------------------------------ + * Get next update message to be written. + * + * Generates complete BGP message in the peer->work stream structure. + * + * Returns: peer->work -- if have something to be written. + * NULL -- otherwise + */ static struct stream * bgp_write_packet (struct peer *peer) { @@ -509,10 +532,6 @@ bgp_write_packet (struct peer *peer) struct stream *s = NULL; struct bgp_advertise *adv; - s = stream_fifo_head (peer->obuf); - if (s) - return s; - for (afi = AFI_IP; afi < AFI_MAX; afi++) for (safi = SAFI_UNICAST; safi < SAFI_MAX; safi++) { @@ -593,18 +612,35 @@ bgp_write_proceed (struct peer *peer) } #endif -/* Write packets to the peer. */ +/*------------------------------------------------------------------------------ +/* Write packets to the peer -- subject to the XON flow control. + * + * Empties the obuf queue first. + * + * Then processes the peer->sync structure to generate further updates. + * + * TODO: work out how bgp_routeadv_timer fits into this. + */ int bgp_write (bgp_peer peer) { u_char type; struct stream *s; + int free_s ; while (bgp_session_is_XON(peer)) { - s = bgp_write_packet (peer); - if (! s) - break; + free_s = 0 ; + + s = stream_fifo_head(peer->obuf) ; /* returns own stream */ + if (s != NULL) + free_s = 1 ; + else + { + s = bgp_write_packet(peer); /* uses peer->work */ + if (s == NULL) + break; + } ; bgp_session_update_send(peer->session, s); @@ -644,7 +680,8 @@ bgp_write (bgp_peer peer) } /* OK we sent packet so delete it. */ - bgp_packet_delete (peer); + if (free_s) + bgp_packet_delete (peer); } return 0; @@ -821,6 +858,11 @@ bgp_notify_send (struct peer *peer, u_char code, u_char sub_code) } /* Send route refresh message to the peer. */ + +/* TODO: wire up to bgp_route_refresh structure and send a route_refresh + * message, rather than a raw "update". + */ + void bgp_route_refresh_send (struct peer *peer, afi_t afi, safi_t safi, u_char orf_type, u_char when_to_refresh, int remove) @@ -917,6 +959,9 @@ bgp_route_refresh_send (struct peer *peer, afi_t afi, safi_t safi, } /* Send capability message to the peer. */ + +/* TODO: require BGP Engine support for Dynamic Capability messages. */ + void bgp_capability_send (struct peer *peer, afi_t afi, safi_t safi, int capability_code, int action) diff --git a/bgpd/bgp_session.c b/bgpd/bgp_session.c index c0f1e8a5..0aae7168 100644 --- a/bgpd/bgp_session.c +++ b/bgpd/bgp_session.c @@ -19,29 +19,35 @@ * Boston, MA 02111-1307, USA. */ +#include "bgpd/bgp_common.h" #include "bgpd/bgp_session.h" #include "bgpd/bgp_peer.h" #include "bgpd/bgp_engine.h" #include "bgpd/bgp_peer_index.h" #include "bgpd/bgp_fsm.h" #include "bgpd/bgp_open_state.h" +#include "bgpd/bgp_route_refresh.h" +#include "bgpd/bgp_msg_write.h" + #include "bgpd/bgp_packet.h" #include "lib/memory.h" #include "lib/sockunion.h" +#include "lib/mqueue.h" +#include "lib/zassert.h" /* prototypes */ -static int -bgp_session_defer_if_stopping(bgp_session session); +static int bgp_session_defer_if_stopping(bgp_session session); static void bgp_session_do_enable(mqueue_block mqb, mqb_flag_t flag) ; static void bgp_session_do_update_recv(mqueue_block mqb, mqb_flag_t flag); static void bgp_session_do_update_send(mqueue_block mqb, mqb_flag_t flag); +static void bgp_session_do_end_of_rib_send(mqueue_block mqb, mqb_flag_t flag); +static void bgp_session_do_route_refresh_send(mqueue_block mqb, + mqb_flag_t flag); static void bgp_session_do_disable(mqueue_block mqb, mqb_flag_t flag) ; static void bgp_session_XON(bgp_session session); static void bgp_session_do_XON(mqueue_block mqb, mqb_flag_t flag); - - /*============================================================================== * BGP Session. * @@ -161,19 +167,23 @@ bgp_session_init_new(bgp_session session, bgp_peer peer) /* Free session structure * */ -bgp_session +extern bgp_session bgp_session_free(bgp_session session) { if (session == NULL) return NULL; + assert(!bgp_session_is_active(session)) ; + qpt_mutex_destroy(&session->mutex, 0) ; bgp_notify_free(&session->notification); bgp_open_state_free(session->open_send); bgp_open_state_free(session->open_recv); - XFREE(MTYPE_BGP_SESSION, session->host); - XFREE(MTYPE_BGP_SESSION, session->password); + if (session->host != NULL) + XFREE(MTYPE_BGP_SESSION, session->host); + if (session->password != NULL) + XFREE(MTYPE_BGP_SESSION, session->password); /* Zeroize to catch dangling references asap */ memset(session, 0, sizeof(struct bgp_session)) ; @@ -190,8 +200,7 @@ bgp_session_free(bgp_session session) * * */ - -void +extern void bgp_session_enable(bgp_peer peer) { bgp_session session ; @@ -361,6 +370,10 @@ bgp_session_do_disable(mqueue_block mqb, mqb_flag_t flag) bgp_session session = mqb_get_arg0(mqb) ; struct bgp_session_disable_args* args = mqb_get_args(mqb) ; + /* Immediately discard any other messages for this session. */ + mqueue_revoke(p_bgp_engine->queue, session) ; + + /* Get the FSM to send any notification and close connections */ bgp_fsm_disable_session(session, args->notification) ; } ; @@ -402,7 +415,7 @@ bgp_session_event(bgp_session session, bgp_session_event_t event, * dealt with. */ -void +extern void bgp_session_update_send(bgp_session session, struct stream* upd) { struct bgp_session_update_args* args ; @@ -411,7 +424,8 @@ bgp_session_update_send(bgp_session session, struct stream* upd) mqb = mqb_init_new(NULL, bgp_session_do_update_send, session) ; args = mqb_get_args(mqb) ; - args->buf = stream_dup(upd) ; + args->buf = stream_dup(upd) ; + args->pending = NULL ; BGP_SESSION_LOCK(session) ; /*<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<*/ session->flow_control++; /* count them in ... */ @@ -420,41 +434,283 @@ bgp_session_update_send(bgp_session session, struct stream* upd) bgp_to_bgp_engine(mqb) ; } ; +/*------------------------------------------------------------------------------ + * BGP Engine -- mqb action function -- write given BGP update message. + * + * Each connection has a pending queue associated with it, onto which messages + * are put if the connection's write buffer is unable to absorb any further + * messages. + * + * This function is called both when the mqb is received from the Peering + * Engine, and when the BGP Engine is trying to empty the connection's pending + * queue. + * + * When the mqb is received from the Peering Engine, then: + * + * -- if the connection's pending queue is empty, try to send the message. + * + * If cannot send the message (and not encountered any error), add it to + * the connection's pending queue. + * + * -- otherwise, add mqb to the pending queue. + * + * When the mqb is on the connection's pending queue it must be the head of + * that queue -- and still on the queue. Then: + * + * -- if the message is sent (or is now redundant), remove the mqb from + * the connection's pending queue. + * + * -- otherwise: leave the mqb on the connection's pending queue for later, + * but remove the connection from the connection queue, because unable to + * proceed any further. + * + * If the mqb has been dealt with (is not on the pending queue), it is freed, + * along with the stream buffer. + * + * NB: when not called "mqb_action", the mqb MUST NOT be on the connection's + * pending queue. + */ static void bgp_session_do_update_send(mqueue_block mqb, mqb_flag_t flag) { + struct bgp_session_update_args* args = mqb_get_args(mqb) ; + if (flag == mqb_action) { bgp_session session = mqb_get_arg0(mqb) ; - struct bgp_session_update_args* args = mqb_get_args(mqb) ; - int result; - - /* TODO: process an update packet */ - BGP_SESSION_LOCK(session) ; /*<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<*/ - result = session->flow_control--; /* ... count them out */ - BGP_SESSION_UNLOCK(session) ; /*>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>*/ - - if (result == 0) - bgp_session_XON(session); - } + bgp_connection connection = session->connections[bgp_connection_primary] ; + + mqueue_block head = mqueue_local_head(&connection->pending_queue) ; + + int is_pending = (args->pending != NULL) ; + if (is_pending) + assert( (args->pending == connection) && (mqb == head) ) ; + + /* If established, try and send. */ + if (connection->state == bgp_fsm_Established) + { + int ret = 0 ; + + if ((head == NULL) || is_pending) + ret = bgp_msg_send_update(connection, args->buf) ; + + if (ret == 0) + { + /* Did not fail, but could not write the message. */ + if (!is_pending) + { + mqueue_local_enqueue(&connection->pending_queue, mqb) ; + args->pending = connection ; + } + else + bgp_connection_queue_del(connection) ; + + return ; /* **** Quit now, with message intact. */ + + } + else if (ret > 0) + { + /* Successfully wrote the message. */ + int xon ; + BGP_SESSION_LOCK(session) ; /*<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<*/ + xon = --session->flow_control ; /* ... count them out */ + BGP_SESSION_UNLOCK(session) ; /*>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>*/ + + if (xon == 0) + bgp_session_XON(session); + } ; + } ; + + /* Have dealt with the message -- if was pending, it's done. */ + if (is_pending) + mqueue_local_dequeue(&connection->pending_queue) ; + } ; + stream_free(args->buf) ; mqb_free(mqb) ; -} +} ; /* Are we in XON state ? */ -int +extern int bgp_session_is_XON(bgp_peer peer) { int result = 0; bgp_session session = peer->session; BGP_SESSION_LOCK(session) ; /*<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<*/ - result = session->flow_control < (int)BGP_WRITE_PACKET_MAX; + result = session->flow_control < BGP_XON_THRESHOLD ; BGP_SESSION_UNLOCK(session) ; /*>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>*/ return result; -} +} ; + +/*============================================================================== + * Dispatch Route Refresh to peer -- Peering Engine -> BGP Engine + * + * The BGP Engine takes care of discarding the bgp_route_refresh once it's been + * dealt with. + */ +extern void +bgp_session_route_refresh_send(bgp_session session, bgp_route_refresh rr) +{ + struct bgp_session_route_refresh_args* args ; + mqueue_block mqb ; + + mqb = mqb_init_new(NULL, bgp_session_do_route_refresh_send, session) ; + + args = mqb_get_args(mqb) ; + args->rr = rr ; + args->pending = NULL ; + + bgp_to_bgp_engine(mqb) ; +} ; + +/*------------------------------------------------------------------------------ + * BGP Engine -- mqb action function -- write given BGP route refresh message. + * + * The logic here is the same as for bgp_session_do_update_send -- except that + * there is no flow control (!). + */ +static void +bgp_session_do_route_refresh_send(mqueue_block mqb, mqb_flag_t flag) +{ + struct bgp_session_route_refresh_args* args = mqb_get_args(mqb) ; + + if (flag == mqb_action) + { + bgp_session session = mqb_get_arg0(mqb) ; + + bgp_connection connection = session->connections[bgp_connection_primary] ; + + mqueue_block head = mqueue_local_head(&connection->pending_queue) ; + + int is_pending = (args->pending != NULL) ; + if (is_pending) + assert( (args->pending == connection) && (mqb == head) ) ; + + /* If established, try and send. */ + if (connection->state == bgp_fsm_Established) + { + int ret = 0 ; + + if ((head == NULL) || is_pending) + ret = bgp_msg_send_route_refresh(connection, args->rr) ; + + if (ret == 0) + { + /* Did not fail, but could not write everything. + * + * If this is not on the connection's pending queue, put it there. + * + * Otherwise leave it there, and take the connection off the + * connection queue -- nothing further can be done for this + * connection. + */ + if (!is_pending) + { + mqueue_local_enqueue(&connection->pending_queue, mqb) ; + args->pending = connection ; + } + else + bgp_connection_queue_del(connection) ; + + return ; /* Quit now, with message intact. */ + } ; + } ; + + /* Have dealt with the message -- if was pending, it's done. */ + if (is_pending) + mqueue_local_dequeue(&connection->pending_queue) ; + } ; + + bgp_route_refresh_free(args->rr) ; + mqb_free(mqb) ; +} ; + +/*============================================================================== + * Dispatch End-of-RIB to peer -- Peering Engine -> BGP Engine + */ +extern void +bgp_session_end_of_rib_send(bgp_session session, qAFI_t afi, qSAFI_t safi) +{ + struct bgp_session_end_of_rib_args* args ; + mqueue_block mqb ; + qafx_num_t qafx ; + + qafx = qafx_num_from_qAFI_qSAFI(afi, safi) ; + + mqb = mqb_init_new(NULL, bgp_session_do_end_of_rib_send, session) ; + + args = mqb_get_args(mqb) ; + args->afi = get_iAFI(qafx) ; + args->safi = get_iSAFI(qafx) ; + args->pending = NULL ; + + bgp_to_bgp_engine(mqb) ; +} ; + +/*------------------------------------------------------------------------------ + * BGP Engine -- mqb action function -- write given BGP end-of-RIB message. + * + * The logic here is the same as for bgp_session_do_update_send -- except that + * there is no flow control (!). + */ +static void +bgp_session_do_end_of_rib_send(mqueue_block mqb, mqb_flag_t flag) +{ + struct bgp_session_end_of_rib_args* args = mqb_get_args(mqb) ; + + if (flag == mqb_action) + { + bgp_session session = mqb_get_arg0(mqb) ; + + bgp_connection connection = session->connections[bgp_connection_primary] ; + + mqueue_block head = mqueue_local_head(&connection->pending_queue) ; + + int is_pending = (args->pending != NULL) ; + if (is_pending) + assert( (args->pending == connection) && (mqb == head) ) ; + + /* If established, try and send. */ + if (connection->state == bgp_fsm_Established) + { + int ret = 0 ; + + if ((head == NULL) || is_pending) + ret = bgp_msg_send_end_of_rib(connection, args->afi, args->safi) ; + + if (ret == 0) + { + /* Did not fail, but could not write everything. + * + * If this is not on the connection's pending queue, put it there. + * + * Otherwise leave it there, and take the connection off the + * connection queue -- nothing further can be done for this + * connection. + */ + if (!is_pending) + { + mqueue_local_enqueue(&connection->pending_queue, mqb) ; + args->pending = connection ; + } + else + bgp_connection_queue_del(connection) ; + + return ; /* Quit now, with message intact. */ + } ; + } ; + + /* Have dealt with the message -- if was pending, it's done. */ + if (is_pending) + mqueue_local_dequeue(&connection->pending_queue) ; + } ; + + mqb_free(mqb) ; +} ; + /*============================================================================== * Forward incoming update -- BGP Engine -> Peering Engine * diff --git a/bgpd/bgp_session.h b/bgpd/bgp_session.h index 1b6f4ad9..8b30e0fe 100644 --- a/bgpd/bgp_session.h +++ b/bgpd/bgp_session.h @@ -28,6 +28,7 @@ #include "bgpd/bgp_engine.h" #include "bgpd/bgp_connection.h" #include "bgpd/bgp_notification.h" +#include "bgpd/bgp_route_refresh.h" #include "bgpd/bgp_peer_index.h" #include "lib/qtimers.h" @@ -208,9 +209,31 @@ MQB_ARGS_SIZE_OK(bgp_session_enable_args) ; struct bgp_session_update_args /* to and from BGP Engine */ { struct stream* buf ; - bgp_size_t size; + bgp_size_t size ; + + bgp_connection pending ; /* used inside the BGP Engine */ + /* set NULL on message creation */ } ; -MQB_ARGS_SIZE_OK(bgp_session_enable_args) ; +MQB_ARGS_SIZE_OK(bgp_session_update_args) ; + +struct bgp_session_route_refresh_args /* to and from BGP Engine */ +{ + bgp_route_refresh rr ; + + bgp_connection pending ; /* used inside the BGP Engine */ + /* set NULL on message creation */ +} ; +MQB_ARGS_SIZE_OK(bgp_session_route_refresh_args) ; + +struct bgp_session_end_of_rib_args /* to and from BGP Engine */ +{ + iAFI_t afi ; + iSAFI_t safi ; + + bgp_connection pending ; /* used inside the BGP Engine */ + /* set NULL on message creation */ +} ; +MQB_ARGS_SIZE_OK(bgp_session_end_of_rib_args) ; struct bgp_session_event_args /* to Routeing Engine */ { @@ -220,7 +243,7 @@ struct bgp_session_event_args /* to Routeing Engine */ bgp_connection_ord_t ordinal ; /* primary/secondary connection */ int stopped ; /* session has stopped */ } ; -MQB_ARGS_SIZE_OK(bgp_session_enable_args) ; +MQB_ARGS_SIZE_OK(bgp_session_event_args) ; struct bgp_session_XON_args /* to Routeing Engine */ { @@ -228,6 +251,10 @@ struct bgp_session_XON_args /* to Routeing Engine */ } ; MQB_ARGS_SIZE_OK(bgp_session_XON_args) ; + + +enum { BGP_XON_THRESHOLD = 7 } ; + /*============================================================================== * Session mutex lock/unlock */ @@ -269,7 +296,14 @@ extern void bgp_session_update_send(bgp_session session, struct stream* upd) ; extern void -bgp_session_update_recv(bgp_session session, struct stream* buf, bgp_size_t size) ; +bgp_session_route_refresh_send(bgp_session session, bgp_route_refresh rr) ; + +extern void +bgp_session_end_of_rib_send(bgp_session session, qAFI_t afi, qSAFI_t) ; + +extern void +bgp_session_update_recv(bgp_session session, struct stream* buf, + bgp_size_t size) ; extern int bgp_session_is_XON(bgp_peer peer); diff --git a/lib/mqueue.c b/lib/mqueue.c index e1433245..f252586e 100644 --- a/lib/mqueue.c +++ b/lib/mqueue.c @@ -603,6 +603,59 @@ done: } ; /*------------------------------------------------------------------------------ + * Revoke message(s) + * + * Revokes all messages, or only messages whose arg0 matches the given value. + * (If the given value is NULL revokes everything.) + * + * Revokes by calling mqb_dispatch_destroy(). + * + * During a revoke() operation more items may be enqueued, but no other mqueue + * operations may be performed. Enqueued items may promptly be revoked, except + * for priority items if the revoke operation has already moved past the last + * priority item. + */ +extern void +mqueue_revoke(mqueue_queue mq, void* arg0) +{ + mqueue_block mqb ; + mqueue_block prev ; + + qpt_mutex_lock(&mq->mutex) ; + + prev = NULL ; + while (1) + { + if (prev == NULL) + mqb = mq->head ; + else + mqb = prev->next ; + + if (mqb == NULL) + break ; + + if ((arg0 == NULL) || (arg0 == mqb->arg0)) + { + if (prev == NULL) + mq->head = mqb->next ; + else + prev->next = mqb->next ; + + if (mq->tail == mqb) + mq->tail = prev ; + + qpt_mutex_unlock(&mq->mutex) ; + mqb_dispatch_destroy(mqb) ; + qpt_mutex_lock(&mq->mutex) ; + } + else + prev = mqb ; + } ; + + qpt_mutex_unlock(&mq->mutex) ; +} ; + +/*------------------------------------------------------------------------------ * No longer waiting for a signal -- does nothing if !qpthreads_enabled. * * Returns true <=> signal has been kicked diff --git a/lib/mqueue.h b/lib/mqueue.h index a4f5d91d..a6edb4d7 100644 --- a/lib/mqueue.h +++ b/lib/mqueue.h @@ -225,12 +225,18 @@ mqueue_enqueue(mqueue_queue mq, mqueue_block mqb, int priority) ; extern mqueue_block mqueue_dequeue(mqueue_queue mq, int wait, void* arg) ; +extern void +mqueue_revoke(mqueue_queue mq, void* arg0) ; + extern int mqueue_done_waiting(mqueue_queue mq, mqueue_thread_signal mtsig) ; extern void mqueue_local_enqueue(mqueue_local_queue lmq, mqueue_block mqb) ; +Inline mqueue_block +mqueue_local_head(mqueue_local_queue lmq) ; + extern mqueue_block mqueue_local_dequeue(mqueue_local_queue lmq) ; @@ -257,6 +263,8 @@ extern void mqb_push_argv_u(mqueue_block mqb, mqb_uint_t u) ; extern void mqb_push_argv_array(mqueue_block mqb, unsigned n, void** array) ; Inline void mqb_dispatch(mqueue_block mqb, mqb_flag_t flag) ; +Inline void mqb_dispatch_action(mqueue_block mqb) ; +Inline void mqb_dispatch_destroy(mqueue_block mqb) ; Inline void* mqb_get_arg0(mqueue_block mqb) ; Inline void* mqb_get_args(mqueue_block mqb) ; @@ -278,6 +286,12 @@ extern void** mqb_pop_argv_array(mqueue_block mqb) ; * The Inline functions. */ +Inline mqueue_block +mqueue_local_head(mqueue_local_queue lmq) +{ + return lmq->head ; +} ; + /* Set operations. */ Inline void |