summary refs log tree commit diff stats
path: root/bgpd/bgp_connection.c
diff options
context:
space:
mode:
author Chris Hall <GMCH@hestia.halldom.com> 2010-02-16 09:52:14 +0000
committer Chris Hall <GMCH@hestia.halldom.com> 2010-02-16 09:52:14 +0000
commit 9856e17cf2495d1f7db16e866f16bc4a8447524d (patch)
tree 260d0c56610ad8f8db533737a59cbda33665752f /bgpd/bgp_connection.c
parent 3b9932d5f7cdeac29a81bceeb190479b675a0435 (diff)
download quagga-9856e17cf2495d1f7db16e866f16bc4a8447524d.tar.bz2
quagga-9856e17cf2495d1f7db16e866f16bc4a8447524d.tar.xz
Revised thread/timer handling, work queue and scheduling.
Updated quagga thread handling to use qtimers when using the new qpnexus -- so all timers are qtimers in the new scheme. Updated work queue handling so that each work queue item is a single malloced structure, not three. (Only bgpd and zebra use the work queue system.) When using qpnexus the background thread queue is no longer a timer queue, but simply a list of pending background threads. When a background thread is waiting on a timer, it is in the qtimer pile, just like any other thread. When using qpnexus, the only remaining quagga thread queues are the event and ready queues. Revised the qpnexus loop so that only when there is nothing else to do will it consider the background threads. Revised write I/O in the BGP Engine so that all writing is via the connection's write buffer. Revised the write I/O in the Routeing Engine, so that it passes groups of updates in a single mqueue message. This all reduces the number of TCP packets sent (because BGP messages are collected together in the connection's write buffer) and reduces the number of mqueue messages involved. (No need for TCP_CORK.) Code and comments review for the new code.
modified: bgpd/bgp_advertise.c modified: bgpd/bgp_common.h modified: bgpd/bgp_connection.c modified: bgpd/bgp_connection.h modified: bgpd/bgp_engine.h modified: bgpd/bgp_fsm.c modified: bgpd/bgp_main.c modified: bgpd/bgp_msg_read.c modified: bgpd/bgp_msg_write.c modified: bgpd/bgp_network.c modified: bgpd/bgp_packet.c modified: bgpd/bgp_packet.h modified: bgpd/bgp_peer.c modified: bgpd/bgp_peer_index.h modified: bgpd/bgp_route.c modified: bgpd/bgp_route_refresh.h modified: bgpd/bgp_session.c modified: bgpd/bgp_session.h modified: bgpd/bgpd.c new file: bgpd/bgpd.cx modified: lib/mqueue.h modified: lib/qpnexus.c modified: lib/qpnexus.h modified: lib/qpselect.c modified: lib/qtimers.c modified: lib/qtimers.h modified: lib/sigevent.c modified: lib/stream.c modified: lib/stream.h modified: lib/thread.c modified: lib/thread.h modified: lib/workqueue.c modified: lib/workqueue.h modified: tests/heavy-wq.c modified: zebra/zebra_rib.c
Diffstat (limited to 'bgpd/bgp_connection.c')
-rw-r--r-- bgpd/bgp_connection.c 309
1 files changed, 162 insertions, 147 deletions
diff --git a/bgpd/bgp_connection.c b/bgpd/bgp_connection.c
index 8320ae49..1c427318 100644
--- a/bgpd/bgp_connection.c
+++ b/bgpd/bgp_connection.c
@@ -93,7 +93,6 @@ static const char* bgp_connection_tags[] =
static void bgp_connection_init_host(bgp_connection connection,
const char* tag) ;
-static void bgp_write_buffer_init_new(bgp_wbuffer wb, size_t size) ;
static void bgp_write_buffer_free(bgp_wbuffer wb) ;
/*------------------------------------------------------------------------------
@@ -122,7 +121,8 @@ bgp_connection_init_new(bgp_connection connection, bgp_session session,
* * comatose not comatose
* * next NULL -- not on the connection queue
* * prev NULL -- not on the connection queue
- * * post bgp_fsm_null_event
+ * * follow_on bgp_fsm_null_event
+ * * exception bgp_session_null_event
* * fsm_active not active
* * notification NULL -- none received or sent
* * err no error, so far
@@ -138,15 +138,11 @@ bgp_connection_init_new(bgp_connection connection, bgp_session session,
* * msg_type none -- set when reading message
* * msg_size none -- set when reading message
* * notification_pending nothing pending
- * * wbuff all pointers NULL -- empty buffer
- * *except* must set limit so is not "full".
+ * * wbuff all pointers NULL -- empty but not writable
*/
-
- confirm(bgp_fsm_sInitial == 0) ;
- confirm(bgp_fsm_null_event == 0) ;
-
- connection->wbuff.limit = connection->wbuff.base +
- bgp_write_buffer_full_threshold ;
+ confirm(bgp_fsm_sInitial == 0) ;
+ confirm(bgp_fsm_null_event == 0) ;
+ confirm(bgp_session_null_event == 0) ;
/* Link back to session, point at its mutex and point session here */
connection->session = session ;
@@ -327,8 +323,8 @@ bgp_connection_exit(bgp_connection connection)
static void
bgp_connection_free(bgp_connection connection)
{
- assert( (connection->state == bgp_fsm_sStopping)
- && (connection->session == NULL)
+ assert( (connection->state == bgp_fsm_sStopping)
+ && (connection->session == NULL)
&& ( (connection->lock_count == 0) ||
(connection->lock_count == CUT_LOOSE_LOCK_COUNT) ) ) ;
@@ -353,20 +349,22 @@ bgp_connection_free(bgp_connection connection)
} ;
/*------------------------------------------------------------------------------
- * Allocate new write buffer and initialise pointers
+ * If required, allocate new write buffer.
+ * Initialise pointers empty and writable.
*
- * NB: assumes structure has been zeroised by the initialisation of the
- * enclosing connection.
+ * NB: structure was zeroised when the enclosing connection was initialised.
+ * Buffer may have been allocated since then.
*/
static void
-bgp_write_buffer_init_new(bgp_wbuffer wb, size_t size)
+bgp_write_buffer_init(bgp_wbuffer wb, size_t size)
{
- assert(wb->base == NULL) ;
-
- wb->base = XMALLOC(MTYPE_STREAM_DATA, size) ;
- wb->limit = wb->base + size ;
+ if (wb->base == NULL)
+ {
+ wb->base = XMALLOC(MTYPE_STREAM_DATA, size) ;
+ wb->limit = wb->base + size ;
+ } ;
- wb->p_in = wb->p_out = wb->base ;
+ bgp_write_buffer_reset(wb) ;
} ;
/*------------------------------------------------------------------------------
@@ -376,7 +374,9 @@ static void
bgp_write_buffer_free(bgp_wbuffer wb)
{
if (wb->base != NULL)
- XFREE(MTYPE_STREAM_DATA, wb->base) ;
+ XFREE(MTYPE_STREAM_DATA, wb->base) ; /* sets wb->base = NULL */
+
+ wb->p_in = wb->p_out = wb->limit = wb->base;
} ;
/*==============================================================================
@@ -449,12 +449,18 @@ bgp_connection_queue_del(bgp_connection connection)
* pending queue (success) or remove connection from the pending queue.
*
* This is also where connections come to die.
+ *
+ * Returns: 0 => nothing to do
+ * 1 => dealt with one or more queued bits of work
*/
-extern void
+extern int
bgp_connection_queue_process(void)
{
mqueue_block mqb ;
+ if (bgp_connection_queue == NULL)
+ return 0 ;
+
while (bgp_connection_queue != NULL)
{
/* select the first in the queue, and step to the next */
@@ -486,6 +492,8 @@ bgp_connection_queue_process(void)
if (mqb == mqueue_local_head(&connection->pending_queue))
bgp_connection_queue_del(connection) ;
} ;
+
+ return 1 ;
} ;
/*------------------------------------------------------------------------------
@@ -527,17 +535,29 @@ bgp_connection_add_pending(bgp_connection connection, mqueue_block mqb,
*
* Expects connection to either be newly created or recently closed.
*
+ * For connect() connections this is done at connect() time, so before any
+ * connection comes up.
+ *
+ * For accept() connections this is done at accept() time, so when the
+ * connection comes up.
+ *
+ * The file is disabled in all modes.
+ *
+ * To complete the process must call bgp_connection_start(), which resets the write
+ * buffer (allocating if required), and ensures that all is ready to read/write.
+ *
* Resets:
*
* * closes any file that may be lingering (should never be)
- * * resets all buffering (should all be empty)
+ * * reset all stream buffers to empty (should already be)
+ * * set write buffer unwritable
*
* Sets:
*
* * if secondary connection, turn off accept()
- * * sets the qfile and fd ready for use
+ * * sets the qfile and fd ready for use -- disabled in all modes
* * clears err -- must be OK so far
- * * discards any open_state and notification
+ * * discards any open_state
* * copies hold_timer_interval and keep_alive_timer_interval from session
*
* Expects:
@@ -545,13 +565,15 @@ bgp_connection_add_pending(bgp_connection connection, mqueue_block mqb,
* * links to/from session to be set up (including ordinal)
* * timers to be initialised
* * log and host to be set up
- * * buffers to exist
+ * * stream buffers to exist
*
* Does not touch:
*
- * * state of the connection (including post event)
+ * * state of the connection (including exception and follow-on event)
* * timers -- FSM looks after those
*
+ * NB: nothing can be written until bgp_connection_start() has been called.
+ *
* NB: requires the session to be LOCKED.
*/
extern void
@@ -580,7 +602,65 @@ bgp_connection_open(bgp_connection connection, int fd)
} ;
/*------------------------------------------------------------------------------
+ * Start connection which has just come up -- connect() or accept()
+ *
+ * Copy the local and remote addresses and note the effective address family.
+ *
+ * Make sure now have a write buffer, and set it empty and writable.
+ */
+extern void
+bgp_connection_start(bgp_connection connection, union sockunion* su_local,
+ union sockunion* su_remote)
+{
+ sockunion_set_dup(&connection->su_local, su_local) ;
+ sockunion_set_dup(&connection->su_remote, su_remote) ;
+
+ connection->paf = sockunion_family(connection->su_local) ;
+
+ bgp_write_buffer_init(&connection->wbuff, bgp_wbuff_size) ;
+} ;
+
+/*------------------------------------------------------------------------------
+ * Stop connection
+ *
+ * * reset stream buffers
+ * * empty out any pending queue
+ * * remove from the BGP Engine connection queue, if there
+ * * clear session->active flag, so will not process any more messages
+ * that expect some message to be sent.
+ * * no notification pending (yet)
+ *
+ * If required:
+ *
+ * * set write buffer unwritable
+ * * disable file in write mode
+ *
+ * NB: requires the session to be LOCKED.
+ */
+static void
+bgp_connection_stop(bgp_connection connection, int stop_writer)
+{
+ /* Reset all stream buffering empty. */
+ stream_reset(connection->ibuf) ;
+ stream_reset(connection->obuf) ;
+
+ connection->read_pending = 0 ;
+ connection->read_header = 0 ;
+ connection->notification_pending = 0 ;
+
+ /* Empty out the pending queue and remove from connection queue */
+ mqueue_local_reset_keep(&connection->pending_queue) ;
+ bgp_connection_queue_del(connection) ;
+
+ /* If required: set write buffer *unwritable* (and empty). */
+ if (stop_writer)
+ bgp_write_buffer_unwritable(&connection->wbuff) ;
+} ;
+
+/*------------------------------------------------------------------------------
* Enable connection for accept()
+ *
+ * NB: requires the session to be LOCKED.
*/
extern void
bgp_connection_enable_accept(bgp_connection connection)
@@ -590,6 +670,8 @@ bgp_connection_enable_accept(bgp_connection connection)
/*------------------------------------------------------------------------------
* Disable connection for accept() -- assuming still have session !
+ *
+ * NB: requires the session to be LOCKED.
*/
extern void
bgp_connection_disable_accept(bgp_connection connection)
@@ -605,7 +687,8 @@ bgp_connection_disable_accept(bgp_connection connection)
* * if there is an fd, close it
* * if qfile is active, remove it
* * forget any addresses
- * * reset all buffering to empty
+ * * reset all stream buffers to empty
+ * * reset write buffer to unwritable
* * empties the pending queue -- destroying all messages
*
* * for secondary connection: disable accept
@@ -630,6 +713,8 @@ bgp_connection_disable_accept(bgp_connection connection)
* * bgp_connection_free() -- to finally discard
*
* * bgp_connection_full_close() -- can do this again
+ *
+ * NB: requires the session to be LOCKED.
*/
extern void
bgp_connection_full_close(bgp_connection connection, int unset_timers)
@@ -658,20 +743,8 @@ bgp_connection_full_close(bgp_connection connection, int unset_timers)
sockunion_unset(&connection->su_local) ;
sockunion_unset(&connection->su_remote) ;
- /* Reset all buffering empty. */
- stream_reset(connection->ibuf) ;
- stream_reset(connection->obuf) ;
-
- connection->read_pending = 0 ;
- connection->read_header = 0 ;
- connection->notification_pending = 0 ;
-
- connection->wbuff.p_in = connection->wbuff.base ;
- connection->wbuff.p_out = connection->wbuff.base ;
-
- /* Empty out the pending queue and remove from connection queue */
- mqueue_local_reset_keep(&connection->pending_queue) ;
- bgp_connection_queue_del(connection) ;
+ /* Bring connection to a stop. */
+ bgp_connection_stop(connection, 1) ;
} ;
/*------------------------------------------------------------------------------
@@ -691,34 +764,39 @@ bgp_connection_full_close(bgp_connection connection, int unset_timers)
* be written (at least as far as the write buffer).
*
* Everything else is left untouched.
+ *
+ * Returns: 1 => OK, ready to send NOTIFICATION now
+ * 0 => no file descriptor => no chance of sending NOTIFICATION
+ *
+ * NB: requires the session to be LOCKED.
*/
-extern void
+extern int
bgp_connection_part_close(bgp_connection connection)
{
+ bgp_session session = connection->session ;
bgp_wbuffer wb = &connection->wbuff ;
int fd ;
uint8_t* p ;
bgp_size_t mlen ;
- /* close the qfile and any associate file descriptor */
+ /* Check that have a usable file descriptor */
fd = qps_file_fd(&connection->qf) ;
- if (fd != fd_undef)
- {
- shutdown(fd, SHUT_RD) ;
- qps_disable_modes(&connection->qf, qps_read_mbit) ;
- } ;
- /* Reset all input buffering. */
- stream_reset(connection->ibuf) ;
+ if (fd == fd_undef)
+ return 0 ;
- connection->read_pending = 0 ;
- connection->read_header = 0 ;
+ /* Shutdown the read side of this connection */
+ shutdown(fd, SHUT_RD) ;
+ qps_disable_modes(&connection->qf, qps_read_mbit) ;
- /* Reset obuf and purge wbuff. */
- stream_reset(connection->obuf) ;
+ /* Stop all buffering activity, except for write buffer. */
+ bgp_connection_stop(connection, 0) ;
- connection->notification_pending = 0 ;
+ /* Turn off session->active (if still attached). */
+ if (session != NULL)
+ session->active = 0 ;
+ /* Purge wbuff of all but current partly written message (if any) */
if (wb->p_in != wb->p_out) /* will be equal if buffer is empty */
{
passert(wb->p_out < wb->p_in) ;
@@ -739,62 +817,49 @@ bgp_connection_part_close(bgp_connection connection)
wb->p_in = wb->base + mlen ;
}
else
- wb->p_in = wb->p_out = wb->base ;
+ bgp_write_buffer_reset(wb) ;
- /* Empty out the pending queue and remove from connection queue */
- mqueue_local_reset_keep(&connection->pending_queue) ;
- bgp_connection_queue_del(connection) ;
+ /* OK -- part closed, ready to send NOTIFICATION */
+ return 1 ;
} ;
/*==============================================================================
* Writing to BGP connection -- once TCP connection has come up.
*
- * All writing is done by preparing a BGP message in the "obuf" buffer,
- * and then calling bgp_connection_write().
+ * Nothing is written directly -- all writing is qpselect driven.
*
- * If possible, that is written away immediately. If not, then no further
- * messages may be prepared until the buffer has been cleared.
- *
- * Write the contents of the "work" buffer.
+ * All writing is done by preparing a BGP message in a stream buffer,
+ * and then calling bgp_connection_write(). The contents of the stream buffer
+ * are transferred to the connection's write buffer.
*
* Returns true <=> able to write the entire buffer without blocking.
*/
-static int bgp_connection_write_direct(bgp_connection connection,
- struct stream* s) ;
static void bgp_connection_write_action(qps_file qf, void* file_info) ;
/*------------------------------------------------------------------------------
- * Write the contents of the given stream, if possible
- *
- * Writes everything or nothing.
+ * Write the contents of the given stream
*
- * If the write buffer is empty, then will attempt to write directly to the
- * socket, buffering anything that cannot be sent immediately. Any errors
- * encountered in this process generate an FSM event.
+ * Writes everything or FATAL error.
*
- * In case it is relevant, identifies when the data has been written all the
- * way into the TCP buffer.
+ * Returns: 1 => written to wbuff -- stream reset, empty
*
- * Returns: 2 => written to TCP -- it's gone -- stream reset, empty
- * 1 => written to wbuff -- waiting for socket -- stream reset, empty
- * 0 => nothing written -- insufficient space in wbuff
- * -1 => failed -- error event generated
+ * NB: actual I/O occurs in the qpselect action function -- so this cannot
+ * fail !
*/
extern int
bgp_connection_write(bgp_connection connection, struct stream* s)
{
bgp_wbuffer wb = &connection->wbuff ;
- if (bgp_write_buffer_empty(wb))
- {
- /* write buffer is empty -- attempt to write directly */
- return bgp_connection_write_direct(connection, s) ;
- } ;
-
- /* Write nothing if cannot write everything */
+ /* FATAL error if cannot write everything. */
if (bgp_write_buffer_cannot(wb, stream_pending(s)))
- return 0 ;
+ zabort("Write buffer does not have enough room") ;
+
+ /* If buffer is empty, enable write mode */
+ if (bgp_write_buffer_empty(wb))
+ qps_enable_mode(&connection->qf, qps_write_mnum,
+ bgp_connection_write_action) ;
/* Transfer the obuf contents to the write buffer. */
wb->p_in = stream_transfer(wb->p_in, s, wb->limit) ;
@@ -803,71 +868,19 @@ bgp_connection_write(bgp_connection connection, struct stream* s)
} ;
/*------------------------------------------------------------------------------
- * The write buffer is empty -- so try to write stream directly.
- *
- * If cannot empty the stream directly to the TCP buffers, transfer it to to
- * the write buffer, and enable the qpselect action.
- * (This is where the write buffer is allocated, if it hasn't yet been.)
- *
- * Either way, the stream is cleared and can be reused (unless failed).
- *
- * Returns: 2 => written to TCP -- it's gone -- stream reset, empty
- * 1 => written to wbuff -- waiting for socket -- stream reset, empty
- * -1 => failed -- error event generated
- */
-enum { bgp_wbuff_size = BGP_MSG_MAX_L * 10 } ;
-
-static int
-bgp_connection_write_direct(bgp_connection connection, struct stream* s)
-{
- int ret ;
-
- ret = stream_flush_try(s, qps_file_fd(&connection->qf)) ;
-
- if (ret == 0)
- return 2 ; /* Done: wbuff and stream are empty */
-
- else if (ret > 0)
- {
- bgp_wbuffer wb = &connection->wbuff ;
-
- /* Partial write -- set up buffering, if required. */
- if (wb->base == NULL)
- bgp_write_buffer_init_new(wb, bgp_wbuff_size) ;
-
- /* Transfer *entire* message to staging buffer */
- wb->p_in = stream_transfer(wb->base, s, wb->limit) ;
-
- wb->p_out = wb->p_in - ret ; /* output from here */
-
- /* Must now be enabled to write */
- qps_enable_mode(&connection->qf, qps_write_mnum,
- bgp_connection_write_action) ;
-
- return 1 ; /* Done: wbuff is not empty -- stream is */
- } ;
-
- /* write failed -- signal error and return failed */
- bgp_fsm_io_error(connection, errno) ;
-
- return -1 ;
-} ;
-
-/*------------------------------------------------------------------------------
* Write Action for bgp connection.
*
* Empty the write buffer if we can.
*
* If empties that, disable write mode, then:
*
- * -- if notification is pending, then generate a notification sent event
+ * -- if notification is pending, generate a notification sent event
*
* -- otherwise: place connection on the connection queue, so can start to
* flush out anything on the connection's pending queue.
*
- * If empty out everything, disable write mode.
- *
- * If encounter an error, generate TCP_fatal_error event.
+ * If encounter an error, generate TCP_fatal_error event, forcing buffer
+ * empty but unwritable.
*/
static void
bgp_connection_write_action(qps_file qf, void* file_info)
@@ -894,14 +907,16 @@ bgp_connection_write_action(qps_file qf, void* file_info)
continue ;
if ((ret != EAGAIN) && (ret != EWOULDBLOCK))
- bgp_fsm_io_error(connection, errno) ;
-
+ {
+ bgp_write_buffer_unwritable(wb) ;
+ bgp_fsm_io_error(connection, errno) ;
+ } ;
return ;
} ;
} ;
/* Buffer is empty -- reset it and disable write mode */
- wb->p_out = wb->p_in = wb->base ;
+ bgp_write_buffer_reset(wb) ;
qps_disable_modes(&connection->qf, qps_write_mbit) ;