diff options
Diffstat (limited to 'bgpd/bgp_connection.c')
-rw-r--r-- | bgpd/bgp_connection.c | 309 |
1 files changed, 162 insertions, 147 deletions
diff --git a/bgpd/bgp_connection.c b/bgpd/bgp_connection.c index 8320ae49..1c427318 100644 --- a/bgpd/bgp_connection.c +++ b/bgpd/bgp_connection.c @@ -93,7 +93,6 @@ static const char* bgp_connection_tags[] = static void bgp_connection_init_host(bgp_connection connection, const char* tag) ; -static void bgp_write_buffer_init_new(bgp_wbuffer wb, size_t size) ; static void bgp_write_buffer_free(bgp_wbuffer wb) ; /*------------------------------------------------------------------------------ @@ -122,7 +121,8 @@ bgp_connection_init_new(bgp_connection connection, bgp_session session, * * comatose not comatose * * next NULL -- not on the connection queue * * prev NULL -- not on the connection queue - * * post bgp_fsm_null_event + * * follow_on bgp_fsm_null_event + * * exception bgp_session_null_event * * fsm_active not active * * notification NULL -- none received or sent * * err no error, so far @@ -138,15 +138,11 @@ bgp_connection_init_new(bgp_connection connection, bgp_session session, * * msg_type none -- set when reading message * * msg_size none -- set when reading message * * notification_pending nothing pending - * * wbuff all pointers NULL -- empty buffer - * *except* must set limit so is not "full". + * * wbuff all pointers NULL -- empty but not writable */ - - confirm(bgp_fsm_sInitial == 0) ; - confirm(bgp_fsm_null_event == 0) ; - - connection->wbuff.limit = connection->wbuff.base + - bgp_write_buffer_full_threshold ; + confirm(bgp_fsm_sInitial == 0) ; + confirm(bgp_fsm_null_event == 0) ; + confirm(bgp_session_null_event == 0) ; /* Link back to session, point at its mutex and point session here */ connection->session = session ; @@ -327,8 +323,8 @@ bgp_connection_exit(bgp_connection connection) static void bgp_connection_free(bgp_connection connection) { - assert( (connection->state == bgp_fsm_sStopping) - && (connection->session == NULL) + assert( (connection->state == bgp_fsm_sStopping) + && (connection->session == NULL) && ( (connection->lock_count == 0) || (connection->lock_count == CUT_LOOSE_LOCK_COUNT) ) ) ; @@ -353,20 +349,22 @@ bgp_connection_free(bgp_connection connection) } ; /*------------------------------------------------------------------------------ - * Allocate new write buffer and initialise pointers + * If required, allocate new write buffer. + * Initialise pointers empty and writable. * - * NB: assumes structure has been zeroised by the initialisation of the - * enclosing connection. + * NB: structure was zeroised the enclosing connection was initialised. + * Buffer may have been allocated since then. */ static void -bgp_write_buffer_init_new(bgp_wbuffer wb, size_t size) +bgp_write_buffer_init(bgp_wbuffer wb, size_t size) { - assert(wb->base == NULL) ; - - wb->base = XMALLOC(MTYPE_STREAM_DATA, size) ; - wb->limit = wb->base + size ; + if (wb->base == NULL) + { + wb->base = XMALLOC(MTYPE_STREAM_DATA, size) ; + wb->limit = wb->base + size ; + } ; - wb->p_in = wb->p_out = wb->base ; + bgp_write_buffer_reset(wb) ; } ; /*------------------------------------------------------------------------------ @@ -376,7 +374,9 @@ static void bgp_write_buffer_free(bgp_wbuffer wb) { if (wb->base != NULL) - XFREE(MTYPE_STREAM_DATA, wb->base) ; + XFREE(MTYPE_STREAM_DATA, wb->base) ; /* sets wb->base = NULL */ + + wb->p_in = wb->p_out = wb->limit = wb->base; } ; /*============================================================================== @@ -449,12 +449,18 @@ bgp_connection_queue_del(bgp_connection connection) * pending queue (success) or remove connection from the pending queue. * * This is also where connections come to die. + * + * Returns: 0 => nothing to do + * 1 => dealt with one or more queued bits of work */ -extern void +extern int bgp_connection_queue_process(void) { mqueue_block mqb ; + if (bgp_connection_queue == NULL) + return 0 ; + while (bgp_connection_queue != NULL) { /* select the first in the queue, and step to the next */ @@ -486,6 +492,8 @@ bgp_connection_queue_process(void) if (mqb == mqueue_local_head(&connection->pending_queue)) bgp_connection_queue_del(connection) ; } ; + + return 1 ; } ; /*------------------------------------------------------------------------------ @@ -527,17 +535,29 @@ bgp_connection_add_pending(bgp_connection connection, mqueue_block mqb, * * Expects connection to either be newly created or recently closed. * + * For connect() connections this is done at connect() time, so before any + * connection comes up. + * + * For accept() connections this is done at accept() time, so when the + * connection comes up. + * + * The file is disabled in all modes. + * + * To complete the process must bgp_connection_start(), which resets the write + * buffer (allocating if required), and ensures that all is ready to read/write. + * * Resets: * * * closes any file that may be lingering (should never be) - * * resets all buffering (should all be empty) + * * reset all stream buffers to empty (should already be) + * * set write buffer unwritable * * Sets: * * * if secondary connection, turn off accept() - * * sets the qfile and fd ready for use + * * sets the qfile and fd ready for use -- disabled in all modes * * clears err -- must be OK so far - * * discards any open_state and notification + * * discards any open_state * * copies hold_timer_interval and keep_alive_timer_interval from session * * Expects: @@ -545,13 +565,15 @@ bgp_connection_add_pending(bgp_connection connection, mqueue_block mqb, * * links to/from session to be set up (including ordinal) * * timers to be initialised * * log and host to be set up - * * buffers to exist + * * stream buffers to exist * * Does not touch: * - * * state of the connection (including post event) + * * state of the connection (including exception and follow-on event) * * timers -- FSM looks after those * + * NB: nothing can be written until bgp_connection_start() has been called. + * * NB: requires the session to be LOCKED. */ extern void @@ -580,7 +602,65 @@ bgp_connection_open(bgp_connection connection, int fd) } ; /*------------------------------------------------------------------------------ + * Start connection which has just come up -- connect() or accept() + * + * Copy the local and remote addresses and note the effective address family. + * + * Make sure now have a write buffer, and set it empty and writable. + */ +extern void +bgp_connection_start(bgp_connection connection, union sockunion* su_local, + union sockunion* su_remote) +{ + sockunion_set_dup(&connection->su_local, su_local) ; + sockunion_set_dup(&connection->su_remote, su_remote) ; + + connection->paf = sockunion_family(connection->su_local) ; + + bgp_write_buffer_init(&connection->wbuff, bgp_wbuff_size) ; +} ; + +/*------------------------------------------------------------------------------ + * Stop connection + * + * * reset stream buffers + * * empty out any pending queue + * * remove from the BGP Engine connection queue, if there + * * clear session->active flag, so will not process any more messages + * that expect some message to be sent. + * * no notification pending (yet) + * + * If required: + * + * * set write buffer unwritable + * * disable file in write mode + * + * NB: requires the session to be LOCKED. + */ +static void +bgp_connection_stop(bgp_connection connection, int stop_writer) +{ + /* Reset all stream buffering empty. */ + stream_reset(connection->ibuf) ; + stream_reset(connection->obuf) ; + + connection->read_pending = 0 ; + connection->read_header = 0 ; + connection->notification_pending = 0 ; + + /* Empty out the pending queue and remove from connection queue */ + mqueue_local_reset_keep(&connection->pending_queue) ; + bgp_connection_queue_del(connection) ; + + /* If required: set write buffer *unwritable* (and empty). */ + if (stop_writer) + bgp_write_buffer_unwritable(&connection->wbuff) ; +} ; + +/*------------------------------------------------------------------------------ * Enable connection for accept() + * + * NB: requires the session to be LOCKED. */ extern void bgp_connection_enable_accept(bgp_connection connection) @@ -590,6 +670,8 @@ bgp_connection_enable_accept(bgp_connection connection) /*------------------------------------------------------------------------------ * Disable connection for accept() -- assuming still have session ! + * + * NB: requires the session to be LOCKED. */ extern void bgp_connection_disable_accept(bgp_connection connection) @@ -605,7 +687,8 @@ bgp_connection_disable_accept(bgp_connection connection) * * if there is an fd, close it * * if qfile is active, remove it * * forget any addresses - * * reset all buffering to empty + * * reset all stream buffers to empty + * * reset write buffer to unwritable * * empties the pending queue -- destroying all messages * * * for secondary connection: disable accept @@ -630,6 +713,8 @@ bgp_connection_disable_accept(bgp_connection connection) * * bgp_connection_free() -- to finally discard * * * bgp_connection_full_close() -- can do this again + * + * NB: requires the session to be LOCKED. */ extern void bgp_connection_full_close(bgp_connection connection, int unset_timers) @@ -658,20 +743,8 @@ bgp_connection_full_close(bgp_connection connection, int unset_timers) sockunion_unset(&connection->su_local) ; sockunion_unset(&connection->su_remote) ; - /* Reset all buffering empty. */ - stream_reset(connection->ibuf) ; - stream_reset(connection->obuf) ; - - connection->read_pending = 0 ; - connection->read_header = 0 ; - connection->notification_pending = 0 ; - - connection->wbuff.p_in = connection->wbuff.base ; - connection->wbuff.p_out = connection->wbuff.base ; - - /* Empty out the pending queue and remove from connection queue */ - mqueue_local_reset_keep(&connection->pending_queue) ; - bgp_connection_queue_del(connection) ; + /* Bring connection to a stop. */ + bgp_connection_stop(connection, 1) ; } ; /*------------------------------------------------------------------------------ @@ -691,34 +764,39 @@ bgp_connection_full_close(bgp_connection connection, int unset_timers) * be written (at least as far as the write buffer). * * Everything else is left untouched. + * + * Returns: 1 => OK, ready to send NOTIFICATION now + * 0 => no file descriptor => no chance of sending NOTIFICATION + * + * NB: requires the session to be LOCKED. */ -extern void +extern int bgp_connection_part_close(bgp_connection connection) { + bgp_session session = connection->session ; bgp_wbuffer wb = &connection->wbuff ; int fd ; uint8_t* p ; bgp_size_t mlen ; - /* close the qfile and any associate file descriptor */ + /* Check that have a usable file descriptor */ fd = qps_file_fd(&connection->qf) ; - if (fd != fd_undef) - { - shutdown(fd, SHUT_RD) ; - qps_disable_modes(&connection->qf, qps_read_mbit) ; - } ; - /* Reset all input buffering. */ - stream_reset(connection->ibuf) ; + if (fd == fd_undef) + return 0 ; - connection->read_pending = 0 ; - connection->read_header = 0 ; + /* Shutdown the read side of this connection */ + shutdown(fd, SHUT_RD) ; + qps_disable_modes(&connection->qf, qps_read_mbit) ; - /* Reset obuf and purge wbuff. */ - stream_reset(connection->obuf) ; + /* Stop all buffering activity, except for write buffer. */ + bgp_connection_stop(connection, 0) ; - connection->notification_pending = 0 ; + /* Turn off session->active (if still attached). */ + if (session != NULL) + session->active = 0 ; + /* Purge wbuff of all but current partly written message (if any) */ if (wb->p_in != wb->p_out) /* will be equal if buffer is empty */ { passert(wb->p_out < wb->p_in) ; @@ -739,62 +817,49 @@ bgp_connection_part_close(bgp_connection connection) wb->p_in = wb->base + mlen ; } else - wb->p_in = wb->p_out = wb->base ; + bgp_write_buffer_reset(wb) ; - /* Empty out the pending queue and remove from connection queue */ - mqueue_local_reset_keep(&connection->pending_queue) ; - bgp_connection_queue_del(connection) ; + /* OK -- part closed, ready to send NOTIFICATION */ + return 1 ; } ; /*============================================================================== * Writing to BGP connection -- once TCP connection has come up. * - * All writing is done by preparing a BGP message in the "obuf" buffer, - * and then calling bgp_connection_write(). + * Nothing is written directly -- all writing is qpselect driven. * - * If possible, that is written away immediately. If not, then no further - * messages may be prepared until the buffer has been cleared. - * - * Write the contents of the "work" buffer. + * All writing is done by preparing a BGP message in a stream buffer, + * and then calling bgp_connection_write(). The contents of the stream buffer + * are transferred to the connection's write buffer. * * Returns true <=> able to write the entire buffer without blocking. */ -static int bgp_connection_write_direct(bgp_connection connection, - struct stream* s) ; static void bgp_connection_write_action(qps_file qf, void* file_info) ; /*------------------------------------------------------------------------------ - * Write the contents of the given stream, if possible - * - * Writes everything or nothing. + * Write the contents of the given stream * - * If the write buffer is empty, then will attempt to write directly to the - * socket, buffering anything that cannot be sent immediately. Any errors - * encountered in this process generate an FSM event. + * Writes everything or FATAL error. * - * In case it is relevant, identifies when the data has been written all the - * way into the TCP buffer. + * Returns: 1 => written to wbuff -- stream reset, empty * - * Returns: 2 => written to TCP -- it's gone -- stream reset, empty - * 1 => written to wbuff -- waiting for socket -- stream reset, empty - * 0 => nothing written -- insufficient space in wbuff - * -1 => failed -- error event generated + * NB: actual I/O occurs in the qpselect action function -- so this cannot + * fail ! */ extern int bgp_connection_write(bgp_connection connection, struct stream* s) { bgp_wbuffer wb = &connection->wbuff ; - if (bgp_write_buffer_empty(wb)) - { - /* write buffer is empty -- attempt to write directly */ - return bgp_connection_write_direct(connection, s) ; - } ; - - /* Write nothing if cannot write everything */ + /* FATAL error if cannot write everything. */ if (bgp_write_buffer_cannot(wb, stream_pending(s))) - return 0 ; + zabort("Write buffer does not have enough room") ; + + /* If buffer is empty, enable write mode */ + if (bgp_write_buffer_empty(wb)) + qps_enable_mode(&connection->qf, qps_write_mnum, + bgp_connection_write_action) ; /* Transfer the obuf contents to the write buffer. */ wb->p_in = stream_transfer(wb->p_in, s, wb->limit) ; @@ -803,71 +868,19 @@ bgp_connection_write(bgp_connection connection, struct stream* s) } ; /*------------------------------------------------------------------------------ - * The write buffer is empty -- so try to write stream directly. - * - * If cannot empty the stream directly to the TCP buffers, transfer it to to - * the write buffer, and enable the qpselect action. - * (This is where the write buffer is allocated, if it hasn't yet been.) - * - * Either way, the stream is cleared and can be reused (unless failed). - * - * Returns: 2 => written to TCP -- it's gone -- stream reset, empty - * 1 => written to wbuff -- waiting for socket -- stream reset, empty - * -1 => failed -- error event generated - */ -enum { bgp_wbuff_size = BGP_MSG_MAX_L * 10 } ; - -static int -bgp_connection_write_direct(bgp_connection connection, struct stream* s) -{ - int ret ; - - ret = stream_flush_try(s, qps_file_fd(&connection->qf)) ; - - if (ret == 0) - return 2 ; /* Done: wbuff and stream are empty */ - - else if (ret > 0) - { - bgp_wbuffer wb = &connection->wbuff ; - - /* Partial write -- set up buffering, if required. */ - if (wb->base == NULL) - bgp_write_buffer_init_new(wb, bgp_wbuff_size) ; - - /* Transfer *entire* message to staging buffer */ - wb->p_in = stream_transfer(wb->base, s, wb->limit) ; - - wb->p_out = wb->p_in - ret ; /* output from here */ - - /* Must now be enabled to write */ - qps_enable_mode(&connection->qf, qps_write_mnum, - bgp_connection_write_action) ; - - return 1 ; /* Done: wbuff is not empty -- stream is */ - } ; - - /* write failed -- signal error and return failed */ - bgp_fsm_io_error(connection, errno) ; - - return -1 ; -} ; - -/*------------------------------------------------------------------------------ * Write Action for bgp connection. * * Empty the write buffer if we can. * * If empties that, disable write mode, then: * - * -- if notification is pending, then generate a notification sent event + * -- if notification is pending, generate a notification sent event * * -- otherwise: place connection on the connection queue, so can start to * flush out anything on the connection's pending queue. * - * If empty out everything, disable write mode. - * - * If encounter an error, generate TCP_fatal_error event. + * If encounter an error, generate TCP_fatal_error event, forcing buffer + * empty but unwritable. */ static void bgp_connection_write_action(qps_file qf, void* file_info) @@ -894,14 +907,16 @@ bgp_connection_write_action(qps_file qf, void* file_info) continue ; if ((ret != EAGAIN) && (ret != EWOULDBLOCK)) - bgp_fsm_io_error(connection, errno) ; - + { + bgp_write_buffer_unwritable(wb) ; + bgp_fsm_io_error(connection, errno) ; + } ; return ; } ; } ; /* Buffer is empty -- reset it and disable write mode */ - wb->p_out = wb->p_in = wb->base ; + bgp_write_buffer_reset(wb) ; qps_disable_modes(&connection->qf, qps_write_mbit) ; |