diff options
Diffstat (limited to 'lib/zclient.c')
-rw-r--r-- | lib/zclient.c | 264 |
1 files changed, 236 insertions, 28 deletions
diff --git a/lib/zclient.c b/lib/zclient.c index d3d53227..8cfa5d52 100644 --- a/lib/zclient.c +++ b/lib/zclient.c @@ -32,18 +32,25 @@ #include "zclient.h" #include "memory.h" #include "table.h" - -/* Zebra client events. */ -enum event {ZCLIENT_SCHEDULE, ZCLIENT_READ, ZCLIENT_CONNECT}; -/* Prototype for event manager. */ -static void zclient_event (enum event, struct zclient *); +/* Zebra client events. */ +enum event {ZLOOKUP_SCHEDULE, ZCLIENT_SCHEDULE, ZCLIENT_READ, ZCLIENT_CONNECT}; extern struct thread_master *master; /* This file local debug flag. */ int zclient_debug = 0; - + +/* Nexus to use, if any */ +static qpn_nexus zclient_nexus = NULL; + +/* prototypes */ +static int zclient_read (struct zclient *zclient); +static void zclient_event (enum event, struct zclient *); +static void zclient_event_r (enum event event, struct zclient *zclient); +static void zclient_event_t (enum event event, struct zclient *zclient); +static void zclient_connect_r (qtimer qtr, void* timer_info, qtime_t when); + /* Allocate zclient structure. */ struct zclient * zclient_new () @@ -55,6 +62,13 @@ zclient_new () zclient->obuf = stream_new (ZEBRA_MAX_PACKET_SIZ); zclient->wb = buffer_new(0); + if (zclient_nexus) + { + zclient->qf = qps_file_init_new(zclient->qf, NULL); + zclient->qtr = qtimer_init_new(zclient->qtr, zclient_nexus->pile, + zclient_connect_r, zclient); + } + return zclient; } @@ -73,9 +87,29 @@ zclient_free (struct zclient *zclient) if (zclient->wb) buffer_free(zclient->wb); + /* qfile and qtimer */ + if (zclient->qf) + { + qps_remove_file(zclient->qf); + qps_file_free(zclient->qf); + zclient->qf = NULL; + } + if (zclient->qtr) + { + qtimer_free(zclient->qtr); + zclient->qtr = NULL; + } + XFREE (MTYPE_ZCLIENT, zclient); } +/* Initialize to use a nexus (qpselect etc). */ +void +zclient_init_r (qpn_nexus n) +{ + zclient_nexus = n; +} + /* Initialize zebra client. Argument redist_default is unwanted redistribute route type. */ void @@ -108,6 +142,13 @@ zclient_init (struct zclient *zclient, int redist_default) zclient_event (ZCLIENT_SCHEDULE, zclient); } +/* Schedule lookup connection */ +void +zlookup_schedule(struct zclient *zclient) +{ + zclient_event (ZLOOKUP_SCHEDULE, zclient); +} + /* Stop zebra client services. */ void zclient_stop (struct zclient *zclient) @@ -120,6 +161,12 @@ zclient_stop (struct zclient *zclient) THREAD_OFF(zclient->t_connect); THREAD_OFF(zclient->t_write); + if (zclient->qf) + qps_remove_file(zclient->qf); + + if (zclient->qtr) + qtimer_unset(zclient->qtr); + /* Reset streams. */ stream_reset(zclient->ibuf); stream_reset(zclient->obuf); @@ -217,8 +264,37 @@ zclient_failed(struct zclient *zclient) return -1; } +/* Write as much data as possible. + * nexus version */ +static void +zclient_flush_data_r(qps_file qf, void* file_info) +{ + struct zclient *zclient = file_info; + + qps_disable_modes(qf, qps_write_mbit); + + if (zclient->sock < 0) + return; + + switch (buffer_flush_available(zclient->wb, zclient->sock)) + { + case BUFFER_ERROR: + zlog_warn("%s: buffer_flush_available failed on zclient fd %d, closing", + __func__, zclient->sock); + zclient_failed(zclient); + break; + case BUFFER_PENDING: + qps_enable_mode(qf, qps_write_mnum, zclient_flush_data_r) ; + break; + case BUFFER_EMPTY: + break; + } +} + +/* Write as much data as possible. + * thread version */ static int -zclient_flush_data(struct thread *thread) +zclient_flush_data_t(struct thread *thread) { struct zclient *zclient = THREAD_ARG(thread); @@ -233,7 +309,7 @@ zclient_flush_data(struct thread *thread) return zclient_failed(zclient); break; case BUFFER_PENDING: - zclient->t_write = thread_add_write(master, zclient_flush_data, + zclient->t_write = thread_add_write(master, zclient_flush_data_t, zclient, zclient->sock); break; case BUFFER_EMPTY: @@ -256,11 +332,17 @@ zclient_send_message(struct zclient *zclient) return zclient_failed(zclient); break; case BUFFER_EMPTY: - THREAD_OFF(zclient->t_write); + if (zclient_nexus) + qps_disable_modes(zclient->qf, qps_write_mbit); + else + THREAD_OFF(zclient->t_write); break; case BUFFER_PENDING: - THREAD_WRITE_ON(master, zclient->t_write, - zclient_flush_data, zclient, zclient->sock); + if (zclient_nexus) + qps_enable_mode(zclient->qf, qps_write_mnum, zclient_flush_data_r) ; + else + THREAD_WRITE_ON(master, zclient->t_write, + zclient_flush_data_t, zclient, zclient->sock); break; } return 0; @@ -313,6 +395,10 @@ zclient_start (struct zclient *zclient) if (zclient->t_connect) return 0; + /* Check timer */ + if (zclient->qtr && qtr_is_active(zclient->qtr)) + return 0; + /* Make socket. */ #ifdef HAVE_TCP_ZEBRA zclient->sock = zclient_socket (); @@ -336,6 +422,9 @@ zclient_start (struct zclient *zclient) if (zclient_debug) zlog_debug ("zclient connect success with socket [%d]", zclient->sock); + if (zclient_nexus) + qps_add_file(zclient_nexus->selection, zclient->qf, zclient->sock, zclient); + /* Create read thread. */ zclient_event (ZCLIENT_READ, zclient); @@ -358,9 +447,24 @@ zclient_start (struct zclient *zclient) } /* This function is a wrapper function for calling zclient_start from + qtimer. */ +static void +zclient_connect_r (qtimer qtr, void* timer_info, qtime_t when) +{ + struct zclient *zclient = timer_info; + + qtimer_unset(qtr); + + if (zclient_debug) + zlog_debug ("zclient_connect is called"); + + zclient_start (zclient); +} + +/* This function is a wrapper function for calling zclient_start from timer or event thread. */ static int -zclient_connect (struct thread *t) +zclient_connect_t (struct thread *t) { struct zclient *zclient; @@ -372,7 +476,51 @@ zclient_connect (struct thread *t) return zclient_start (zclient); } - + +/* Connect to zebra for nexthop lookup. + * thread version */ +static int +zlookup_connect_t (struct thread *t) +{ + struct zclient *zlookup; + + zlookup = THREAD_ARG (t); + zlookup->t_connect = NULL; + + if (zlookup->sock != -1) + return 0; + +#ifdef HAVE_TCP_ZEBRA + zlookup->sock = zclient_socket (); +#else + zlookup->sock = zclient_socket_un (ZEBRA_SERV_PATH); +#endif /* HAVE_TCP_ZEBRA */ + if (zlookup->sock < 0) + return -1; + + return 0; +} + +/* Connect to zebra for nexthop lookup. + * nexus version */ +static void +zlookup_connect_r (qtimer qtr, void* timer_info, qtime_t when) +{ + struct zclient *zlookup = timer_info; + + qtimer_unset(qtr); + + if (zlookup->sock != -1) + return; + +#ifdef HAVE_TCP_ZEBRA + zlookup->sock = zclient_socket (); +#else + zlookup->sock = zclient_socket_un (ZEBRA_SERV_PATH); +#endif /* HAVE_TCP_ZEBRA */ +} + + /* * "xdr_encode"-like interface that allows daemon (client) to send * a message to zebra server for a route that needs to be @@ -791,20 +939,32 @@ zebra_interface_address_read (int type, struct stream *s) return ifc; } - +/* nexus: Zebra client message read function. */ +static void +zclient_read_r (qps_file qf, void* file_info) +{ + struct zclient *zclient = file_info; + qps_disable_modes(qf, qps_read_mbit); + zclient_read(zclient); +} + +/* thread: Zebra client message read function. */ +static int +zclient_read_t (struct thread *thread) +{ + struct zclient *zclient = THREAD_ARG (thread); + zclient->t_read = NULL; + return zclient_read(zclient); +} + /* Zebra client message read function. */ static int -zclient_read (struct thread *thread) +zclient_read (struct zclient *zclient) { int ret; size_t already; uint16_t length, command; uint8_t marker, version; - struct zclient *zclient; - - /* Get socket to zebra. */ - zclient = THREAD_ARG (thread); - zclient->t_read = NULL; /* Read zebra header (if we don't have it already). */ if ((already = stream_get_endp(zclient->ibuf)) < ZEBRA_HEADER_SIZE) @@ -991,15 +1151,30 @@ zclient_redistribute_default (int command, struct zclient *zclient) zebra_message_send (zclient, command); } +/* Arm event. */ static void zclient_event (enum event event, struct zclient *zclient) { + if (zclient_nexus) + zclient_event_r(event, zclient); + else + zclient_event_t(event, zclient); +} + +/* Arm event. + * nexus version */ +static void +zclient_event_r (enum event event, struct zclient *zclient) +{ switch (event) { + case ZLOOKUP_SCHEDULE: + if (!qtr_is_active(zclient->qtr)) + qtimer_set(zclient->qtr, qt_get_monotonic(), zlookup_connect_r) ; + break; case ZCLIENT_SCHEDULE: - if (! zclient->t_connect) - zclient->t_connect = - thread_add_event (master, zclient_connect, zclient, 0); + if (!qtr_is_active(zclient->qtr)) + qtimer_set(zclient->qtr, qt_get_monotonic(), zclient_connect_r) ; break; case ZCLIENT_CONNECT: if (zclient->fail >= 10) @@ -1007,14 +1182,47 @@ zclient_event (enum event event, struct zclient *zclient) if (zclient_debug) zlog_debug ("zclient connect schedule interval is %d", zclient->fail < 3 ? 10 : 60); + if (!qtr_is_active(zclient->qtr)) + qtimer_set(zclient->qtr, + qt_add_monotonic(QTIME(zclient->fail < 3 ? 10 : 60)), zclient_connect_r) ; + break; + case ZCLIENT_READ: + qps_enable_mode(zclient->qf, qps_read_mnum, zclient_read_r) ; + break; + } +} + +/* Arm event. + * thread version */ +static void +zclient_event_t (enum event event, struct zclient *zclient) +{ + switch (event) + { + case ZLOOKUP_SCHEDULE: + if (! zclient->t_connect) + zclient->t_connect = + thread_add_event (master, zlookup_connect_t, zclient, 0); + break; + case ZCLIENT_SCHEDULE: + if (! zclient->t_connect) + zclient->t_connect = + thread_add_event (master, zclient_connect_t, zclient, 0); + break; + case ZCLIENT_CONNECT: + if (zclient->fail >= 10) + return; + if (zclient_debug) + zlog_debug ("zclient connect schedule interval is %d", + zclient->fail < 3 ? 10 : 60); if (! zclient->t_connect) - zclient->t_connect = - thread_add_timer (master, zclient_connect, zclient, - zclient->fail < 3 ? 10 : 60); + zclient->t_connect = + thread_add_timer (master, zclient_connect_t, zclient, + zclient->fail < 3 ? 10 : 60); break; case ZCLIENT_READ: - zclient->t_read = - thread_add_read (master, zclient_read, zclient, zclient->sock); + zclient->t_read = + thread_add_read (master, zclient_read_t, zclient, zclient->sock); break; } } |