summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--bgpd/bgp_connection.c80
-rw-r--r--bgpd/bgp_engine.c17
-rw-r--r--bgpd/bgp_msg_write.c27
-rw-r--r--bgpd/bgp_msg_write.h9
-rw-r--r--bgpd/bgp_open_state.c11
-rw-r--r--bgpd/bgp_packet.c159
-rw-r--r--bgpd/bgp_session.c308
-rw-r--r--bgpd/bgp_session.h42
-rw-r--r--lib/mqueue.c53
-rw-r--r--lib/mqueue.h14
10 files changed, 607 insertions, 113 deletions
diff --git a/bgpd/bgp_connection.c b/bgpd/bgp_connection.c
index 10a6e21f..ba13c33a 100644
--- a/bgpd/bgp_connection.c
+++ b/bgpd/bgp_connection.c
@@ -85,11 +85,10 @@ static const char* bgp_connection_tags[] =
[bgp_connection_secondary] = "(secondary)",
} ;
-static void
-bgp_connection_init_host(bgp_connection connection, const char* tag) ;
-
-static void
-bgp_write_buffer_init_new(bgp_wbuffer wb, size_t size) ;
+static void bgp_connection_init_host(bgp_connection connection,
+ const char* tag) ;
+static void bgp_write_buffer_init_new(bgp_wbuffer wb, size_t size) ;
+static void bgp_write_buffer_free(bgp_wbuffer wb) ;
/*------------------------------------------------------------------------------
* Initialise connection structure -- allocate if required.
@@ -285,6 +284,28 @@ bgp_connection_exit(bgp_connection connection)
static void
bgp_connection_free(bgp_connection connection)
{
+ assert( (connection->state == bgp_fsm_Stopping) &&
+ (connection->session == NULL) ) ;
+
+ /* Make sure is closed, so no active file, no active timers, pending queue
+ * is empty, not on the connection queue, etc.
+ */
+ bgp_connection_close(connection) ;
+
+ /* Free any components which still exist */
+ bgp_notify_free(&connection->notification) ;
+ bgp_open_state_free(connection->open_recv) ;
+ if (connection->su_local != NULL)
+ sockunion_free(connection->su_local) ;
+ if (connection->su_remote != NULL)
+ sockunion_free(connection->su_remote) ;
+ if (connection->host != NULL)
+ XFREE(MTYPE_BGP_PEER_HOST, connection->host) ;
+ stream_free(connection->ibuf) ;
+ stream_free(connection->obuf) ;
+ bgp_write_buffer_free(&connection->wbuff) ;
+
+ /* Free the body */
XFREE(MTYPE_BGP_CONNECTION, connection) ;
} ;
@@ -305,6 +326,16 @@ bgp_write_buffer_init_new(bgp_wbuffer wb, size_t size)
wb->p_in = wb->p_out = wb->base ;
} ;
+/*------------------------------------------------------------------------------
+ * Free any write buffer
+ */
+static void
+bgp_write_buffer_free(bgp_wbuffer wb)
+{
+ if (wb->base != NULL)
+ XFREE(MTYPE_STREAM_DATA, wb->base) ;
+} ;
+
/*==============================================================================
* Connection queue management.
*
@@ -373,6 +404,7 @@ bgp_connection_queue_del(bgp_connection connection)
* Process each item until its pending queue becomes empty, or its write
* buffer becomes full, or it is stopped.
*
+ * TODO: link bgp_connection_queue_process() into the bgp_engine loop.
*/
extern void
bgp_connection_queue_process(void)
@@ -381,25 +413,33 @@ bgp_connection_queue_process(void)
while (bgp_connection_queue != NULL)
{
- /* select the first in the queue, and step to the next */
+ /* select the first in the queue, and step to the next */
bgp_connection connection = bgp_connection_queue ;
bgp_connection_queue = connection->next ;
- /* Reap the connection if it is now stopped. */
+ /* Reap the connection if it is now stopped. */
if (connection->state == bgp_fsm_Stopping)
{
- bgp_connection_free(connection) ;
+ bgp_connection_free(connection) ; /* removes from connection queue */
+ continue ;
} ;
- /* ..... */
-
-
- /* ..... */
-
-
+ /* Process next item on connection's pending queue */
+ mqb = mqueue_local_head(&connection->pending_queue) ;
+ if (mqb != NULL)
+ /* The action will either remove the mqb from the pending queue,
+ * or remove the connection from the connection queue.
+ */
+ {
+ bgp_session session = mqb_get_arg0(mqb) ;
+ assert( (session == connection->session)
+ && (connection
+ == session->connections[bgp_connection_primary]) ) ;
+ mqb_dispatch_action(mqb) ;
+ }
+ else
+ bgp_connection_queue_del(connection) ;
} ;
-
-
} ;
/*==============================================================================
@@ -738,7 +778,13 @@ bgp_connection_write_direct(bgp_connection connection, struct stream* s)
*
* Empty the write buffer if we can.
*
- * If empties that, empty the obuf if there is anything pending, and....
+ * If empties that, disable write mode, then:
+ *
+ * -- if notification is pending, then generate a notification sent event
+ *
+ * -- otherwise: place connection on the connection queue, so can start to
+ * flush out anything on the connection's pending queue and/or send an
+ * XON message to the Peering Engine.
*
* If empty out everything, disable write mode.
*
diff --git a/bgpd/bgp_engine.c b/bgpd/bgp_engine.c
index 3e297849..c47094f6 100644
--- a/bgpd/bgp_engine.c
+++ b/bgpd/bgp_engine.c
@@ -76,6 +76,10 @@ static struct qpn_nexus bgp_engine ;
static void* bgp_engine_loop(void* arg) ;
+/* TODO: BGP Engine side of bgp_engine_start() must call bgp_open_listeners()
+ * for which it needs the port and address from command line.
+ */
+
extern void
bgp_engine_start(void)
{
@@ -95,6 +99,19 @@ bgp_engine_start(void)
} ;
/*==============================================================================
+ * Stop the BGP Engine Thread.
+ *
+ */
+
+/* TODO: BGP Engine side of bgp_engine_stop() must call bgp_close_listeners()
+ */
+
+extern void
+bgp_engine_stop(void)
+{
+} ;
+
+/*==============================================================================
* The BGP Engine Thread main loop
*
* Processes:
diff --git a/bgpd/bgp_msg_write.c b/bgpd/bgp_msg_write.c
index fbc7ba0e..f4345f49 100644
--- a/bgpd/bgp_msg_write.c
+++ b/bgpd/bgp_msg_write.c
@@ -755,6 +755,29 @@ bgp_msg_orf_prefix(struct stream* s, uint8_t common,
} ;
/*==============================================================================
+ * UPDATE -- send an UPDATE message
+ *
+ * PRO TEM -- this is passed a raw BGP message in a stream buffer
+ */
+
+/*------------------------------------------------------------------------------
+ * Make UPDATE message and dispatch.
+ *
+ * Returns: 2 => written to TCP -- it's gone
+ * 1 => written to wbuff -- waiting for socket
+ * 0 => nothing written -- wbuff was not empty !
+ * -1 => failed -- error event generated
+ */
+extern int
+bgp_msg_send_update(bgp_connection connection, struct stream* s)
+{
+ if (bgp_connection_write_full(connection))
+ return 0 ;
+
+ return bgp_connection_write(connection, s) ;
+} ;
+
+/*==============================================================================
* End-of-RIB -- send an End-of-RIB BGP message (see Graceful Restart)
*/
@@ -773,7 +796,7 @@ bgp_msg_send_end_of_rib(bgp_connection connection, iAFI_t afi, iSAFI_t safi)
{
struct stream *s = connection->obuf ;
- if (!bgp_connection_write_empty(connection))
+ if (bgp_connection_write_full(connection))
return 0 ;
/* Make UPDATE message header */
@@ -803,7 +826,7 @@ bgp_msg_send_end_of_rib(bgp_connection connection, iAFI_t afi, iSAFI_t safi)
zlog_debug ("send End-of-RIB for %s to %s", afi_safi_print (afi, safi),
connection->host) ;
- /* Finally -- write the obuf away */
+ /* Finally -- write the buffer away */
return bgp_connection_write(connection, s) ;
} ;
diff --git a/bgpd/bgp_msg_write.h b/bgpd/bgp_msg_write.h
index 9e24ce6f..5355ed70 100644
--- a/bgpd/bgp_msg_write.h
+++ b/bgpd/bgp_msg_write.h
@@ -47,12 +47,15 @@ extern int
bgp_msg_send_route_refresh(bgp_connection connection, bgp_route_refresh rr) ;
extern int
-bgp_packet_set_marker(struct stream *s, uint8_t type) ;
+bgp_msg_send_update(bgp_connection connection, struct stream* s) ;
extern int
-bgp_packet_set_size (struct stream *s) ;
+bgp_msg_send_end_of_rib(bgp_connection connection, iAFI_t afi, iSAFI_t safi);
extern int
-bgp_msg_send_end_of_rib(bgp_connection connection, iAFI_t afi, iSAFI_t safi);
+bgp_packet_set_marker(struct stream *s, uint8_t type) ;
+
+extern int
+bgp_packet_set_size (struct stream *s) ;
#endif /* _QUAGGA_BGP_MSG_WRITE_H */
diff --git a/bgpd/bgp_open_state.c b/bgpd/bgp_open_state.c
index 9d86e03f..9c743d5b 100644
--- a/bgpd/bgp_open_state.c
+++ b/bgpd/bgp_open_state.c
@@ -60,11 +60,14 @@ bgp_open_state_free(bgp_open_state state)
{
bgp_cap_unknown unknown ;
- while ((unknown = vector_ream_keep(&state->unknowns)) != NULL)
- XFREE(MTYPE_TMP, unknown) ;
-
if (state != NULL)
- XFREE(MTYPE_BGP_OPEN_STATE, state) ;
+ {
+ while ((unknown = vector_ream_keep(&state->unknowns)) != NULL)
+ XFREE(MTYPE_TMP, unknown) ;
+
+ XFREE(MTYPE_BGP_OPEN_STATE, state) ;
+ } ;
+
return NULL ;
}
diff --git a/bgpd/bgp_packet.c b/bgpd/bgp_packet.c
index 55e7975c..f432f7a7 100644
--- a/bgpd/bgp_packet.c
+++ b/bgpd/bgp_packet.c
@@ -142,14 +142,20 @@ bgp_connect_check (struct peer *peer)
}
#endif
-/* Make BGP update packet. */
+/*------------------------------------------------------------------------------
+ * Construct an update from head of peer->sync[afi][safi]->update.
+ *
+ * Generates complete BGP message in the peer->work stream structure.
+ *
+ * Returns: peer->work -- if have something to be written.
+ * NULL -- otherwise
+ */
static struct stream *
bgp_update_packet (struct peer *peer, afi_t afi, safi_t safi)
{
struct stream *s;
struct bgp_adj_out *adj;
struct bgp_advertise *adv;
- struct stream *packet;
struct bgp_node *rn = NULL;
struct bgp_info *binfo = NULL;
bgp_size_t total_attr_len = 0;
@@ -222,23 +228,25 @@ bgp_update_packet (struct peer *peer, afi_t afi, safi_t safi)
break;
}
- if (! stream_empty (s))
- {
- bgp_packet_set_size (s);
- packet = stream_dup (s);
- bgp_packet_add (peer, packet);
- bgp_write(peer);
- stream_reset (s);
- return packet;
- }
- return NULL;
+ if (stream_empty (s))
+ return NULL ;
+
+ bgp_packet_set_size (s) ;
+ return s ;
}
+/*------------------------------------------------------------------------------
+ * Construct an End-of-RIB update message for given AFI/SAFI.
+ *
+ * Generates complete BGP message in the peer->work stream structure.
+ *
+ * Returns: peer->work -- if have something to be written.
+ * NULL -- otherwise
+ */
static struct stream *
bgp_update_packet_eor (struct peer *peer, afi_t afi, safi_t safi)
{
struct stream *s;
- struct stream *packet;
if (DISABLE_BGP_ANNOUNCE)
return NULL;
@@ -246,7 +254,8 @@ bgp_update_packet_eor (struct peer *peer, afi_t afi, safi_t safi)
if (BGP_DEBUG (normal, NORMAL))
zlog_debug ("send End-of-RIB for %s to %s", afi_safi_print (afi, safi), peer->host);
- s = stream_new (BGP_MAX_PACKET_SIZE);
+ s = peer->work;
+ stream_reset (s);
/* Make BGP update packet. */
bgp_packet_set_marker (s, BGP_MSG_UPDATE);
@@ -271,18 +280,21 @@ bgp_update_packet_eor (struct peer *peer, afi_t afi, safi_t safi)
}
bgp_packet_set_size (s);
- packet = stream_dup (s);
- bgp_packet_add (peer, packet);
- stream_free (s);
- return packet;
+ return s ;
}
-/* Make BGP withdraw packet. */
+/*------------------------------------------------------------------------------
+ * Construct a withdraw update from from head of peer->sync[afi][safi]->withdraw
+ *
+ * Generates complete BGP message in the peer->work stream structure.
+ *
+ * Returns: peer->work -- if have something to be written.
+ * NULL -- otherwise
+ */
static struct stream *
bgp_withdraw_packet (struct peer *peer, afi_t afi, safi_t safi)
{
struct stream *s;
- struct stream *packet;
struct bgp_adj_out *adj;
struct bgp_advertise *adv;
struct bgp_node *rn;
@@ -342,31 +354,33 @@ bgp_withdraw_packet (struct peer *peer, afi_t afi, safi_t safi)
break;
}
- if (! stream_empty (s))
+ if (stream_empty (s))
+ return NULL ;
+
+ if (afi == AFI_IP && safi == SAFI_UNICAST)
{
- if (afi == AFI_IP && safi == SAFI_UNICAST)
- {
- unfeasible_len
+ unfeasible_len
= stream_get_endp (s) - BGP_HEADER_SIZE - BGP_UNFEASIBLE_LEN;
- stream_putw_at (s, BGP_HEADER_SIZE, unfeasible_len);
- stream_putw (s, 0);
- }
- bgp_packet_set_size (s);
- packet = stream_dup (s);
- bgp_packet_add (peer, packet);
- stream_reset (s);
- return packet;
- }
+ stream_putw_at (s, BGP_HEADER_SIZE, unfeasible_len);
+ stream_putw (s, 0);
+ } ;
- return NULL;
+ bgp_packet_set_size (s);
+ return s ;
}
+/*------------------------------------------------------------------------------
+ * Construct an update for the default route, place it in the obuf queue
+ * and kick write.
+ *
+ * Uses peer->work stream structure, but copies result to new stream, which is
+ * pushed onto the obuf queue.
+ */
void
bgp_default_update_send (struct peer *peer, struct attr *attr,
afi_t afi, safi_t safi, struct peer *from)
{
struct stream *s;
- struct stream *packet;
struct prefix p;
unsigned long pos;
bgp_size_t total_attr_len;
@@ -392,7 +406,8 @@ bgp_default_update_send (struct peer *peer, struct attr *attr,
p.prefixlen, attrstr);
}
- s = stream_new (BGP_MAX_PACKET_SIZE);
+ s = peer->work ;
+ stream_reset (s);
/* Make BGP update packet. */
bgp_packet_set_marker (s, BGP_MSG_UPDATE);
@@ -415,24 +430,27 @@ bgp_default_update_send (struct peer *peer, struct attr *attr,
/* Set size. */
bgp_packet_set_size (s);
- packet = stream_dup (s);
- stream_free (s);
-
/* Dump packet if debug option is set. */
#ifdef DEBUG
- /* bgp_packet_dump (packet); */
+ /* bgp_packet_dump (s); */
#endif /* DEBUG */
/* Add packet to the peer. */
- bgp_packet_add (peer, packet);
+ bgp_packet_add (peer, stream_dup (s));
bgp_write(peer);
}
+/*------------------------------------------------------------------------------
+ * Construct a withdraw update for the default route, place it in the obuf
+ * queue and kick write.
+ *
+ * Uses peer->work stream structure, but copies result to new stream, which is
+ * pushed onto the obuf queue.
+ */
void
bgp_default_withdraw_send (struct peer *peer, afi_t afi, safi_t safi)
{
struct stream *s;
- struct stream *packet;
struct prefix p;
unsigned long pos;
unsigned long cp;
@@ -458,7 +476,8 @@ bgp_default_withdraw_send (struct peer *peer, afi_t afi, safi_t safi)
peer->host, inet_ntop(p.family, &(p.u.prefix), buf, BUFSIZ),
p.prefixlen);
- s = stream_new (BGP_MAX_PACKET_SIZE);
+ s = peer->work ;
+ stream_reset (s);
/* Make BGP update packet. */
bgp_packet_set_marker (s, BGP_MSG_UPDATE);
@@ -492,15 +511,19 @@ bgp_default_withdraw_send (struct peer *peer, afi_t afi, safi_t safi)
bgp_packet_set_size (s);
- packet = stream_dup (s);
- stream_free (s);
-
/* Add packet to the peer. */
- bgp_packet_add (peer, packet);
+ bgp_packet_add (peer, stream_dup (s));
bgp_write(peer);
}
-/* Get next packet to be written. */
+/*------------------------------------------------------------------------------
+ * Get next update message to be written.
+ *
+ * Generates complete BGP message in the peer->work stream structure.
+ *
+ * Returns: peer->work -- if have something to be written.
+ * NULL -- otherwise
+ */
static struct stream *
bgp_write_packet (struct peer *peer)
{
@@ -509,10 +532,6 @@ bgp_write_packet (struct peer *peer)
struct stream *s = NULL;
struct bgp_advertise *adv;
- s = stream_fifo_head (peer->obuf);
- if (s)
- return s;
-
for (afi = AFI_IP; afi < AFI_MAX; afi++)
for (safi = SAFI_UNICAST; safi < SAFI_MAX; safi++)
{
@@ -593,18 +612,35 @@ bgp_write_proceed (struct peer *peer)
}
#endif
-/* Write packets to the peer. */
+/*------------------------------------------------------------------------------
+/* Write packets to the peer -- subject to the XON flow control.
+ *
+ * Empties the obuf queue first.
+ *
+ * Then processes the peer->sync structure to generate further updates.
+ *
+ * TODO: work out how bgp_routeadv_timer fits into this.
+ */
int
bgp_write (bgp_peer peer)
{
u_char type;
struct stream *s;
+ int free_s ;
while (bgp_session_is_XON(peer))
{
- s = bgp_write_packet (peer);
- if (! s)
- break;
+ free_s = 0 ;
+
+ s = stream_fifo_head(peer->obuf) ; /* returns own stream */
+ if (s != NULL)
+ free_s = 1 ;
+ else
+ {
+ s = bgp_write_packet(peer); /* uses peer->work */
+ if (s == NULL)
+ break;
+ } ;
bgp_session_update_send(peer->session, s);
@@ -644,7 +680,8 @@ bgp_write (bgp_peer peer)
}
/* OK we sent packet so delete it. */
- bgp_packet_delete (peer);
+ if (free_s)
+ bgp_packet_delete (peer);
}
return 0;
@@ -821,6 +858,11 @@ bgp_notify_send (struct peer *peer, u_char code, u_char sub_code)
}
/* Send route refresh message to the peer. */
+
+/* TODO: wire up to bgp_route_refresh structure and send a route_refresh
+ * message, rather than a raw "update".
+ */
+
void
bgp_route_refresh_send (struct peer *peer, afi_t afi, safi_t safi,
u_char orf_type, u_char when_to_refresh, int remove)
@@ -917,6 +959,9 @@ bgp_route_refresh_send (struct peer *peer, afi_t afi, safi_t safi,
}
/* Send capability message to the peer. */
+
+/* TODO: require BGP Engine support for Dynamic Capability messages. */
+
void
bgp_capability_send (struct peer *peer, afi_t afi, safi_t safi,
int capability_code, int action)
diff --git a/bgpd/bgp_session.c b/bgpd/bgp_session.c
index c0f1e8a5..0aae7168 100644
--- a/bgpd/bgp_session.c
+++ b/bgpd/bgp_session.c
@@ -19,29 +19,35 @@
* Boston, MA 02111-1307, USA.
*/
+#include "bgpd/bgp_common.h"
#include "bgpd/bgp_session.h"
#include "bgpd/bgp_peer.h"
#include "bgpd/bgp_engine.h"
#include "bgpd/bgp_peer_index.h"
#include "bgpd/bgp_fsm.h"
#include "bgpd/bgp_open_state.h"
+#include "bgpd/bgp_route_refresh.h"
+#include "bgpd/bgp_msg_write.h"
+
#include "bgpd/bgp_packet.h"
#include "lib/memory.h"
#include "lib/sockunion.h"
+#include "lib/mqueue.h"
+#include "lib/zassert.h"
/* prototypes */
-static int
-bgp_session_defer_if_stopping(bgp_session session);
+static int bgp_session_defer_if_stopping(bgp_session session);
static void bgp_session_do_enable(mqueue_block mqb, mqb_flag_t flag) ;
static void bgp_session_do_update_recv(mqueue_block mqb, mqb_flag_t flag);
static void bgp_session_do_update_send(mqueue_block mqb, mqb_flag_t flag);
+static void bgp_session_do_end_of_rib_send(mqueue_block mqb, mqb_flag_t flag);
+static void bgp_session_do_route_refresh_send(mqueue_block mqb,
+ mqb_flag_t flag);
static void bgp_session_do_disable(mqueue_block mqb, mqb_flag_t flag) ;
static void bgp_session_XON(bgp_session session);
static void bgp_session_do_XON(mqueue_block mqb, mqb_flag_t flag);
-
-
/*==============================================================================
* BGP Session.
*
@@ -161,19 +167,23 @@ bgp_session_init_new(bgp_session session, bgp_peer peer)
/* Free session structure
*
*/
-bgp_session
+extern bgp_session
bgp_session_free(bgp_session session)
{
if (session == NULL)
return NULL;
+ assert(!bgp_session_is_active(session)) ;
+
qpt_mutex_destroy(&session->mutex, 0) ;
bgp_notify_free(&session->notification);
bgp_open_state_free(session->open_send);
bgp_open_state_free(session->open_recv);
- XFREE(MTYPE_BGP_SESSION, session->host);
- XFREE(MTYPE_BGP_SESSION, session->password);
+ if (session->host != NULL)
+ XFREE(MTYPE_BGP_SESSION, session->host);
+ if (session->password != NULL)
+ XFREE(MTYPE_BGP_SESSION, session->password);
/* Zeroize to catch dangling references asap */
memset(session, 0, sizeof(struct bgp_session)) ;
@@ -190,8 +200,7 @@ bgp_session_free(bgp_session session)
*
*
*/
-
-void
+extern void
bgp_session_enable(bgp_peer peer)
{
bgp_session session ;
@@ -361,6 +370,10 @@ bgp_session_do_disable(mqueue_block mqb, mqb_flag_t flag)
bgp_session session = mqb_get_arg0(mqb) ;
struct bgp_session_disable_args* args = mqb_get_args(mqb) ;
+ /* Immediately discard any other messages for this session. */
+ mqueue_revoke(p_bgp_engine->queue, session) ;
+
+ /* Get the FSM to send any notification and close connections */
bgp_fsm_disable_session(session, args->notification) ;
} ;
@@ -402,7 +415,7 @@ bgp_session_event(bgp_session session, bgp_session_event_t event,
* dealt with.
*/
-void
+extern void
bgp_session_update_send(bgp_session session, struct stream* upd)
{
struct bgp_session_update_args* args ;
@@ -411,7 +424,8 @@ bgp_session_update_send(bgp_session session, struct stream* upd)
mqb = mqb_init_new(NULL, bgp_session_do_update_send, session) ;
args = mqb_get_args(mqb) ;
- args->buf = stream_dup(upd) ;
+ args->buf = stream_dup(upd) ;
+ args->pending = NULL ;
BGP_SESSION_LOCK(session) ; /*<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<*/
session->flow_control++; /* count them in ... */
@@ -420,41 +434,283 @@ bgp_session_update_send(bgp_session session, struct stream* upd)
bgp_to_bgp_engine(mqb) ;
} ;
+/*------------------------------------------------------------------------------
+ * BGP Engine -- mqb action function -- write given BGP update message.
+ *
+ * Each connection has a pending queue associated with it, onto which messages
+ * are put if the connection's write buffer is unable to absorb any further
+ * messages.
+ *
+ * This function is called both when the mqb is received from the Peering
+ * Engine, and when the BGP Engine is trying to empty the connection's pending
+ * queue.
+ *
+ * When the mqb is received from the Peering Engine, then:
+ *
+ * -- if the connection's pending queue is empty, try to send the message.
+ *
+ * If cannot send the message (and not encountered any error), add it to
+ * the connection's pending queue.
+ *
+ * -- otherwise, add mqb to the pending queue.
+ *
+ * When the mqb is on the connection's pending queue it must be the head of
+ * that queue -- and still on the queue. Then:
+ *
+ * -- if the message is sent (or is now redundant), remove the mqb from
+ * the connection's pending queue.
+ *
+ * -- otherwise: leave the mqb on the connection's pending queue for later,
+ * but remove the connection from the connection queue, because unable to
+ * proceed any further.
+ *
+ * If the mqb has been dealt with (is not on the pending queue), it is freed,
+ * along with the stream buffer.
+ *
+ * NB: when not called "mqb_action", the mqb MUST NOT be on the connection's
+ * pending queue.
+ */
static void
bgp_session_do_update_send(mqueue_block mqb, mqb_flag_t flag)
{
+ struct bgp_session_update_args* args = mqb_get_args(mqb) ;
+
if (flag == mqb_action)
{
bgp_session session = mqb_get_arg0(mqb) ;
- struct bgp_session_update_args* args = mqb_get_args(mqb) ;
- int result;
-
- /* TODO: process an update packet */
- BGP_SESSION_LOCK(session) ; /*<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<*/
- result = session->flow_control--; /* ... count them out */
- BGP_SESSION_UNLOCK(session) ; /*>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>*/
-
- if (result == 0)
- bgp_session_XON(session);
- }
+ bgp_connection connection = session->connections[bgp_connection_primary] ;
+
+ mqueue_block head = mqueue_local_head(&connection->pending_queue) ;
+
+ int is_pending = (args->pending != NULL) ;
+ if (is_pending)
+ assert( (args->pending == connection) && (mqb == head) ) ;
+
+ /* If established, try and send. */
+ if (connection->state == bgp_fsm_Established)
+ {
+ int ret = 0 ;
+
+ if ((head == NULL) || is_pending)
+ ret = bgp_msg_send_update(connection, args->buf) ;
+
+ if (ret == 0)
+ {
+ /* Did not fail, but could not write the message. */
+ if (!is_pending)
+ {
+ mqueue_local_enqueue(&connection->pending_queue, mqb) ;
+ args->pending = connection ;
+ }
+ else
+ bgp_connection_queue_del(connection) ;
+
+ return ; /* **** Quit now, with message intact. */
+
+ }
+ else if (ret > 0)
+ {
+ /* Successfully wrote the message. */
+ int xon ;
+ BGP_SESSION_LOCK(session) ; /*<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<*/
+ xon = --session->flow_control ; /* ... count them out */
+ BGP_SESSION_UNLOCK(session) ; /*>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>*/
+
+ if (xon == 0)
+ bgp_session_XON(session);
+ } ;
+ } ;
+
+ /* Have dealt with the message -- if was pending, it's done. */
+ if (is_pending)
+ mqueue_local_dequeue(&connection->pending_queue) ;
+ } ;
+ stream_free(args->buf) ;
mqb_free(mqb) ;
-}
+} ;
/* Are we in XON state ? */
-int
+extern int
bgp_session_is_XON(bgp_peer peer)
{
int result = 0;
bgp_session session = peer->session;
BGP_SESSION_LOCK(session) ; /*<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<*/
- result = session->flow_control < (int)BGP_WRITE_PACKET_MAX;
+ result = session->flow_control < BGP_XON_THRESHOLD ;
BGP_SESSION_UNLOCK(session) ; /*>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>*/
return result;
-}
+} ;
+
+/*==============================================================================
+ * Dispatch Route Refresh to peer -- Peering Engine -> BGP Engine
+ *
+ * The BGP Engine takes care of discarding the bgp_route_refresh once it's been
+ * dealt with.
+ */
+extern void
+bgp_session_route_refresh_send(bgp_session session, bgp_route_refresh rr)
+{
+ struct bgp_session_route_refresh_args* args ;
+ mqueue_block mqb ;
+
+ mqb = mqb_init_new(NULL, bgp_session_do_route_refresh_send, session) ;
+
+ args = mqb_get_args(mqb) ;
+ args->rr = rr ;
+ args->pending = NULL ;
+
+ bgp_to_bgp_engine(mqb) ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * BGP Engine -- mqb action function -- write given BGP route refresh message.
+ *
+ * The logic here is the same as for bgp_session_do_update_send -- except that
+ * there is no flow control (!).
+ */
+static void
+bgp_session_do_route_refresh_send(mqueue_block mqb, mqb_flag_t flag)
+{
+ struct bgp_session_route_refresh_args* args = mqb_get_args(mqb) ;
+
+ if (flag == mqb_action)
+ {
+ bgp_session session = mqb_get_arg0(mqb) ;
+
+ bgp_connection connection = session->connections[bgp_connection_primary] ;
+
+ mqueue_block head = mqueue_local_head(&connection->pending_queue) ;
+
+ int is_pending = (args->pending != NULL) ;
+ if (is_pending)
+ assert( (args->pending == connection) && (mqb == head) ) ;
+
+ /* If established, try and send. */
+ if (connection->state == bgp_fsm_Established)
+ {
+ int ret = 0 ;
+
+ if ((head == NULL) || is_pending)
+ ret = bgp_msg_send_route_refresh(connection, args->rr) ;
+
+ if (ret == 0)
+ {
+ /* Did not fail, but could not write everything.
+ *
+ * If this is not on the connection's pending queue, put it there.
+ *
+ * Otherwise leave it there, and take the connection off the
+ * connection queue -- nothing further can be done for this
+ * connection.
+ */
+ if (!is_pending)
+ {
+ mqueue_local_enqueue(&connection->pending_queue, mqb) ;
+ args->pending = connection ;
+ }
+ else
+ bgp_connection_queue_del(connection) ;
+
+ return ; /* Quit now, with message intact. */
+ } ;
+ } ;
+
+ /* Have dealt with the message -- if was pending, it's done. */
+ if (is_pending)
+ mqueue_local_dequeue(&connection->pending_queue) ;
+ } ;
+
+ bgp_route_refresh_free(args->rr) ;
+ mqb_free(mqb) ;
+} ;
+
+/*==============================================================================
+ * Dispatch End-of-RIB to peer -- Peering Engine -> BGP Engine
+ */
+extern void
+bgp_session_end_of_rib_send(bgp_session session, qAFI_t afi, qSAFI_t safi)
+{
+ struct bgp_session_end_of_rib_args* args ;
+ mqueue_block mqb ;
+ qafx_num_t qafx ;
+
+ qafx = qafx_num_from_qAFI_qSAFI(afi, safi) ;
+
+ mqb = mqb_init_new(NULL, bgp_session_do_end_of_rib_send, session) ;
+
+ args = mqb_get_args(mqb) ;
+ args->afi = get_iAFI(qafx) ;
+ args->safi = get_iSAFI(qafx) ;
+ args->pending = NULL ;
+
+ bgp_to_bgp_engine(mqb) ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * BGP Engine -- mqb action function -- write given BGP end-of-RIB message.
+ *
+ * The logic here is the same as for bgp_session_do_update_send -- except that
+ * there is no flow control (!).
+ */
+static void
+bgp_session_do_end_of_rib_send(mqueue_block mqb, mqb_flag_t flag)
+{
+ struct bgp_session_end_of_rib_args* args = mqb_get_args(mqb) ;
+
+ if (flag == mqb_action)
+ {
+ bgp_session session = mqb_get_arg0(mqb) ;
+
+ bgp_connection connection = session->connections[bgp_connection_primary] ;
+
+ mqueue_block head = mqueue_local_head(&connection->pending_queue) ;
+
+ int is_pending = (args->pending != NULL) ;
+ if (is_pending)
+ assert( (args->pending == connection) && (mqb == head) ) ;
+
+ /* If established, try and send. */
+ if (connection->state == bgp_fsm_Established)
+ {
+ int ret = 0 ;
+
+ if ((head == NULL) || is_pending)
+ ret = bgp_msg_send_end_of_rib(connection, args->afi, args->safi) ;
+
+ if (ret == 0)
+ {
+ /* Did not fail, but could not write everything.
+ *
+ * If this is not on the connection's pending queue, put it there.
+ *
+ * Otherwise leave it there, and take the connection off the
+ * connection queue -- nothing further can be done for this
+ * connection.
+ */
+ if (!is_pending)
+ {
+ mqueue_local_enqueue(&connection->pending_queue, mqb) ;
+ args->pending = connection ;
+ }
+ else
+ bgp_connection_queue_del(connection) ;
+
+ return ; /* Quit now, with message intact. */
+ } ;
+ } ;
+
+ /* Have dealt with the message -- if was pending, it's done. */
+ if (is_pending)
+ mqueue_local_dequeue(&connection->pending_queue) ;
+ } ;
+
+ mqb_free(mqb) ;
+} ;
+
/*==============================================================================
* Forward incoming update -- BGP Engine -> Peering Engine
*
diff --git a/bgpd/bgp_session.h b/bgpd/bgp_session.h
index 1b6f4ad9..8b30e0fe 100644
--- a/bgpd/bgp_session.h
+++ b/bgpd/bgp_session.h
@@ -28,6 +28,7 @@
#include "bgpd/bgp_engine.h"
#include "bgpd/bgp_connection.h"
#include "bgpd/bgp_notification.h"
+#include "bgpd/bgp_route_refresh.h"
#include "bgpd/bgp_peer_index.h"
#include "lib/qtimers.h"
@@ -208,9 +209,31 @@ MQB_ARGS_SIZE_OK(bgp_session_enable_args) ;
struct bgp_session_update_args /* to and from BGP Engine */
{
struct stream* buf ;
- bgp_size_t size;
+ bgp_size_t size ;
+
+ bgp_connection pending ; /* used inside the BGP Engine */
+ /* set NULL on message creation */
} ;
-MQB_ARGS_SIZE_OK(bgp_session_enable_args) ;
+MQB_ARGS_SIZE_OK(bgp_session_update_args) ;
+
+struct bgp_session_route_refresh_args /* to and from BGP Engine */
+{
+ bgp_route_refresh rr ;
+
+ bgp_connection pending ; /* used inside the BGP Engine */
+ /* set NULL on message creation */
+} ;
+MQB_ARGS_SIZE_OK(bgp_session_route_refresh_args) ;
+
+struct bgp_session_end_of_rib_args /* to and from BGP Engine */
+{
+ iAFI_t afi ;
+ iSAFI_t safi ;
+
+ bgp_connection pending ; /* used inside the BGP Engine */
+ /* set NULL on message creation */
+} ;
+MQB_ARGS_SIZE_OK(bgp_session_end_of_rib_args) ;
struct bgp_session_event_args /* to Routeing Engine */
{
@@ -220,7 +243,7 @@ struct bgp_session_event_args /* to Routeing Engine */
bgp_connection_ord_t ordinal ; /* primary/secondary connection */
int stopped ; /* session has stopped */
} ;
-MQB_ARGS_SIZE_OK(bgp_session_enable_args) ;
+MQB_ARGS_SIZE_OK(bgp_session_event_args) ;
struct bgp_session_XON_args /* to Routeing Engine */
{
@@ -228,6 +251,10 @@ struct bgp_session_XON_args /* to Routeing Engine */
} ;
MQB_ARGS_SIZE_OK(bgp_session_XON_args) ;
+
+
+enum { BGP_XON_THRESHOLD = 7 } ;
+
/*==============================================================================
* Session mutex lock/unlock
*/
@@ -269,7 +296,14 @@ extern void
bgp_session_update_send(bgp_session session, struct stream* upd) ;
extern void
-bgp_session_update_recv(bgp_session session, struct stream* buf, bgp_size_t size) ;
+bgp_session_route_refresh_send(bgp_session session, bgp_route_refresh rr) ;
+
+extern void
+bgp_session_end_of_rib_send(bgp_session session, qAFI_t afi, qSAFI_t) ;
+
+extern void
+bgp_session_update_recv(bgp_session session, struct stream* buf,
+ bgp_size_t size) ;
extern int
bgp_session_is_XON(bgp_peer peer);
diff --git a/lib/mqueue.c b/lib/mqueue.c
index e1433245..f252586e 100644
--- a/lib/mqueue.c
+++ b/lib/mqueue.c
@@ -603,6 +603,59 @@ done:
} ;
/*------------------------------------------------------------------------------
+ * Revoke message(s)
+ *
+ * Revokes all messages, or only messages whose arg0 matches the given value.
+ * (If the given value is NULL revokes everything.)
+ *
+ * Revokes by calling mqb_dispatch_destroy().
+ *
+ * During a revoke() operation more items may be enqueued, but no other mqueue
+ * operations may be performed. Enqueued items may promptly be revoked, except
+ * for priority items if the revoke operation has already moved past the last
+ * priority item.
+ */
+extern void
+mqueue_revoke(mqueue_queue mq, void* arg0)
+{
+ mqueue_block mqb ;
+ mqueue_block prev ;
+
+ qpt_mutex_lock(&mq->mutex) ;
+
+ prev = NULL ;
+ while (1)
+ {
+ if (prev == NULL)
+ mqb = mq->head ;
+ else
+ mqb = prev->next ;
+
+ if (mqb == NULL)
+ break ;
+
+ if ((arg0 == NULL) || (arg0 == mqb->arg0))
+ {
+ if (prev == NULL)
+ mq->head = mqb->next ;
+ else
+ prev->next = mqb->next ;
+
+ if (mq->tail == mqb)
+ mq->tail = prev ;
+
+ qpt_mutex_unlock(&mq->mutex) ;
+ mqb_dispatch_destroy(mqb) ;
+ qpt_mutex_lock(&mq->mutex) ;
+ }
+ else
+ prev = mqb ;
+ } ;
+
+ qpt_mutex_unlock(&mq->mutex) ;
+} ;
+
+/*------------------------------------------------------------------------------
* No longer waiting for a signal -- does nothing if !qpthreads_enabled.
*
* Returns true <=> signal has been kicked
diff --git a/lib/mqueue.h b/lib/mqueue.h
index a4f5d91d..a6edb4d7 100644
--- a/lib/mqueue.h
+++ b/lib/mqueue.h
@@ -225,12 +225,18 @@ mqueue_enqueue(mqueue_queue mq, mqueue_block mqb, int priority) ;
extern mqueue_block
mqueue_dequeue(mqueue_queue mq, int wait, void* arg) ;
+extern void
+mqueue_revoke(mqueue_queue mq, void* arg0) ;
+
extern int
mqueue_done_waiting(mqueue_queue mq, mqueue_thread_signal mtsig) ;
extern void
mqueue_local_enqueue(mqueue_local_queue lmq, mqueue_block mqb) ;
+Inline mqueue_block
+mqueue_local_head(mqueue_local_queue lmq) ;
+
extern mqueue_block
mqueue_local_dequeue(mqueue_local_queue lmq) ;
@@ -257,6 +263,8 @@ extern void mqb_push_argv_u(mqueue_block mqb, mqb_uint_t u) ;
extern void mqb_push_argv_array(mqueue_block mqb, unsigned n, void** array) ;
Inline void mqb_dispatch(mqueue_block mqb, mqb_flag_t flag) ;
+Inline void mqb_dispatch_action(mqueue_block mqb) ;
+Inline void mqb_dispatch_destroy(mqueue_block mqb) ;
Inline void* mqb_get_arg0(mqueue_block mqb) ;
Inline void* mqb_get_args(mqueue_block mqb) ;
@@ -278,6 +286,12 @@ extern void** mqb_pop_argv_array(mqueue_block mqb) ;
* The Inline functions.
*/
+Inline mqueue_block
+mqueue_local_head(mqueue_local_queue lmq)
+{
+ return lmq->head ;
+} ;
+
/* Set operations. */
Inline void