summaryrefslogtreecommitdiffstats
path: root/lib/zclient.c
diff options
context:
space:
mode:
Diffstat (limited to 'lib/zclient.c')
-rw-r--r--lib/zclient.c264
1 files changed, 236 insertions, 28 deletions
diff --git a/lib/zclient.c b/lib/zclient.c
index d3d53227..8cfa5d52 100644
--- a/lib/zclient.c
+++ b/lib/zclient.c
@@ -32,18 +32,25 @@
#include "zclient.h"
#include "memory.h"
#include "table.h"
-
-/* Zebra client events. */
-enum event {ZCLIENT_SCHEDULE, ZCLIENT_READ, ZCLIENT_CONNECT};
-/* Prototype for event manager. */
-static void zclient_event (enum event, struct zclient *);
+/* Zebra client events. */
+enum event {ZLOOKUP_SCHEDULE, ZCLIENT_SCHEDULE, ZCLIENT_READ, ZCLIENT_CONNECT};
extern struct thread_master *master;
/* This file local debug flag. */
int zclient_debug = 0;
-
+
+/* Nexus to use, if any */
+static qpn_nexus zclient_nexus = NULL;
+
+/* prototypes */
+static int zclient_read (struct zclient *zclient);
+static void zclient_event (enum event, struct zclient *);
+static void zclient_event_r (enum event event, struct zclient *zclient);
+static void zclient_event_t (enum event event, struct zclient *zclient);
+static void zclient_connect_r (qtimer qtr, void* timer_info, qtime_t when);
+
/* Allocate zclient structure. */
struct zclient *
zclient_new ()
@@ -55,6 +62,13 @@ zclient_new ()
zclient->obuf = stream_new (ZEBRA_MAX_PACKET_SIZ);
zclient->wb = buffer_new(0);
+ if (zclient_nexus)
+ {
+ zclient->qf = qps_file_init_new(zclient->qf, NULL);
+ zclient->qtr = qtimer_init_new(zclient->qtr, zclient_nexus->pile,
+ zclient_connect_r, zclient);
+ }
+
return zclient;
}
@@ -73,9 +87,29 @@ zclient_free (struct zclient *zclient)
if (zclient->wb)
buffer_free(zclient->wb);
+ /* qfile and qtimer */
+ if (zclient->qf)
+ {
+ qps_remove_file(zclient->qf);
+ qps_file_free(zclient->qf);
+ zclient->qf = NULL;
+ }
+ if (zclient->qtr)
+ {
+ qtimer_free(zclient->qtr);
+ zclient->qtr = NULL;
+ }
+
XFREE (MTYPE_ZCLIENT, zclient);
}
+/* Initialize to use a nexus (qpselect etc). */
+void
+zclient_init_r (qpn_nexus n)
+{
+ zclient_nexus = n;
+}
+
/* Initialize zebra client. Argument redist_default is unwanted
redistribute route type. */
void
@@ -108,6 +142,13 @@ zclient_init (struct zclient *zclient, int redist_default)
zclient_event (ZCLIENT_SCHEDULE, zclient);
}
+/* Schedule lookup connection */
+void
+zlookup_schedule(struct zclient *zclient)
+{
+ zclient_event (ZLOOKUP_SCHEDULE, zclient);
+}
+
/* Stop zebra client services. */
void
zclient_stop (struct zclient *zclient)
@@ -120,6 +161,12 @@ zclient_stop (struct zclient *zclient)
THREAD_OFF(zclient->t_connect);
THREAD_OFF(zclient->t_write);
+ if (zclient->qf)
+ qps_remove_file(zclient->qf);
+
+ if (zclient->qtr)
+ qtimer_unset(zclient->qtr);
+
/* Reset streams. */
stream_reset(zclient->ibuf);
stream_reset(zclient->obuf);
@@ -217,8 +264,37 @@ zclient_failed(struct zclient *zclient)
return -1;
}
+/* Write as much data as possible.
+ * nexus version */
+static void
+zclient_flush_data_r(qps_file qf, void* file_info)
+{
+ struct zclient *zclient = file_info;
+
+ qps_disable_modes(qf, qps_write_mbit);
+
+ if (zclient->sock < 0)
+ return;
+
+ switch (buffer_flush_available(zclient->wb, zclient->sock))
+ {
+ case BUFFER_ERROR:
+ zlog_warn("%s: buffer_flush_available failed on zclient fd %d, closing",
+ __func__, zclient->sock);
+ zclient_failed(zclient);
+ break;
+ case BUFFER_PENDING:
+ qps_enable_mode(qf, qps_write_mnum, zclient_flush_data_r) ;
+ break;
+ case BUFFER_EMPTY:
+ break;
+ }
+}
+
+/* Write as much data as possible.
+ * thread version */
static int
-zclient_flush_data(struct thread *thread)
+zclient_flush_data_t(struct thread *thread)
{
struct zclient *zclient = THREAD_ARG(thread);
@@ -233,7 +309,7 @@ zclient_flush_data(struct thread *thread)
return zclient_failed(zclient);
break;
case BUFFER_PENDING:
- zclient->t_write = thread_add_write(master, zclient_flush_data,
+ zclient->t_write = thread_add_write(master, zclient_flush_data_t,
zclient, zclient->sock);
break;
case BUFFER_EMPTY:
@@ -256,11 +332,17 @@ zclient_send_message(struct zclient *zclient)
return zclient_failed(zclient);
break;
case BUFFER_EMPTY:
- THREAD_OFF(zclient->t_write);
+ if (zclient_nexus)
+ qps_disable_modes(zclient->qf, qps_write_mbit);
+ else
+ THREAD_OFF(zclient->t_write);
break;
case BUFFER_PENDING:
- THREAD_WRITE_ON(master, zclient->t_write,
- zclient_flush_data, zclient, zclient->sock);
+ if (zclient_nexus)
+ qps_enable_mode(zclient->qf, qps_write_mnum, zclient_flush_data_r) ;
+ else
+ THREAD_WRITE_ON(master, zclient->t_write,
+ zclient_flush_data_t, zclient, zclient->sock);
break;
}
return 0;
@@ -313,6 +395,10 @@ zclient_start (struct zclient *zclient)
if (zclient->t_connect)
return 0;
+ /* Check timer */
+ if (zclient->qtr && qtr_is_active(zclient->qtr))
+ return 0;
+
/* Make socket. */
#ifdef HAVE_TCP_ZEBRA
zclient->sock = zclient_socket ();
@@ -336,6 +422,9 @@ zclient_start (struct zclient *zclient)
if (zclient_debug)
zlog_debug ("zclient connect success with socket [%d]", zclient->sock);
+ if (zclient_nexus)
+ qps_add_file(zclient_nexus->selection, zclient->qf, zclient->sock, zclient);
+
/* Create read thread. */
zclient_event (ZCLIENT_READ, zclient);
@@ -358,9 +447,24 @@ zclient_start (struct zclient *zclient)
}
/* This function is a wrapper function for calling zclient_start from
+ qtimer. */
+static void
+zclient_connect_r (qtimer qtr, void* timer_info, qtime_t when)
+{
+ struct zclient *zclient = timer_info;
+
+ qtimer_unset(qtr);
+
+ if (zclient_debug)
+ zlog_debug ("zclient_connect is called");
+
+ zclient_start (zclient);
+}
+
+/* This function is a wrapper function for calling zclient_start from
timer or event thread. */
static int
-zclient_connect (struct thread *t)
+zclient_connect_t (struct thread *t)
{
struct zclient *zclient;
@@ -372,7 +476,51 @@ zclient_connect (struct thread *t)
return zclient_start (zclient);
}
-
+
+/* Connect to zebra for nexthop lookup.
+ * thread version */
+static int
+zlookup_connect_t (struct thread *t)
+{
+ struct zclient *zlookup;
+
+ zlookup = THREAD_ARG (t);
+ zlookup->t_connect = NULL;
+
+ if (zlookup->sock != -1)
+ return 0;
+
+#ifdef HAVE_TCP_ZEBRA
+ zlookup->sock = zclient_socket ();
+#else
+ zlookup->sock = zclient_socket_un (ZEBRA_SERV_PATH);
+#endif /* HAVE_TCP_ZEBRA */
+ if (zlookup->sock < 0)
+ return -1;
+
+ return 0;
+}
+
+/* Connect to zebra for nexthop lookup.
+ * nexus version */
+static void
+zlookup_connect_r (qtimer qtr, void* timer_info, qtime_t when)
+{
+ struct zclient *zlookup = timer_info;
+
+ qtimer_unset(qtr);
+
+ if (zlookup->sock != -1)
+ return;
+
+#ifdef HAVE_TCP_ZEBRA
+ zlookup->sock = zclient_socket ();
+#else
+ zlookup->sock = zclient_socket_un (ZEBRA_SERV_PATH);
+#endif /* HAVE_TCP_ZEBRA */
+}
+
+
/*
* "xdr_encode"-like interface that allows daemon (client) to send
* a message to zebra server for a route that needs to be
@@ -791,20 +939,32 @@ zebra_interface_address_read (int type, struct stream *s)
return ifc;
}
-
+/* nexus: Zebra client message read function. */
+static void
+zclient_read_r (qps_file qf, void* file_info)
+{
+ struct zclient *zclient = file_info;
+ qps_disable_modes(qf, qps_read_mbit);
+ zclient_read(zclient);
+}
+
+/* thread: Zebra client message read function. */
+static int
+zclient_read_t (struct thread *thread)
+{
+ struct zclient *zclient = THREAD_ARG (thread);
+ zclient->t_read = NULL;
+ return zclient_read(zclient);
+}
+
/* Zebra client message read function. */
static int
-zclient_read (struct thread *thread)
+zclient_read (struct zclient *zclient)
{
int ret;
size_t already;
uint16_t length, command;
uint8_t marker, version;
- struct zclient *zclient;
-
- /* Get socket to zebra. */
- zclient = THREAD_ARG (thread);
- zclient->t_read = NULL;
/* Read zebra header (if we don't have it already). */
if ((already = stream_get_endp(zclient->ibuf)) < ZEBRA_HEADER_SIZE)
@@ -991,15 +1151,30 @@ zclient_redistribute_default (int command, struct zclient *zclient)
zebra_message_send (zclient, command);
}
+/* Arm event. */
static void
zclient_event (enum event event, struct zclient *zclient)
{
+ if (zclient_nexus)
+ zclient_event_r(event, zclient);
+ else
+ zclient_event_t(event, zclient);
+}
+
+/* Arm event.
+ * nexus version */
+static void
+zclient_event_r (enum event event, struct zclient *zclient)
+{
switch (event)
{
+ case ZLOOKUP_SCHEDULE:
+ if (!qtr_is_active(zclient->qtr))
+ qtimer_set(zclient->qtr, qt_get_monotonic(), zlookup_connect_r) ;
+ break;
case ZCLIENT_SCHEDULE:
- if (! zclient->t_connect)
- zclient->t_connect =
- thread_add_event (master, zclient_connect, zclient, 0);
+ if (!qtr_is_active(zclient->qtr))
+ qtimer_set(zclient->qtr, qt_get_monotonic(), zclient_connect_r) ;
break;
case ZCLIENT_CONNECT:
if (zclient->fail >= 10)
@@ -1007,14 +1182,47 @@ zclient_event (enum event event, struct zclient *zclient)
if (zclient_debug)
zlog_debug ("zclient connect schedule interval is %d",
zclient->fail < 3 ? 10 : 60);
+ if (!qtr_is_active(zclient->qtr))
+ qtimer_set(zclient->qtr,
+ qt_add_monotonic(QTIME(zclient->fail < 3 ? 10 : 60)), zclient_connect_r) ;
+ break;
+ case ZCLIENT_READ:
+ qps_enable_mode(zclient->qf, qps_read_mnum, zclient_read_r) ;
+ break;
+ }
+}
+
+/* Arm event.
+ * thread version */
+static void
+zclient_event_t (enum event event, struct zclient *zclient)
+{
+ switch (event)
+ {
+ case ZLOOKUP_SCHEDULE:
+ if (! zclient->t_connect)
+ zclient->t_connect =
+ thread_add_event (master, zlookup_connect_t, zclient, 0);
+ break;
+ case ZCLIENT_SCHEDULE:
+ if (! zclient->t_connect)
+ zclient->t_connect =
+ thread_add_event (master, zclient_connect_t, zclient, 0);
+ break;
+ case ZCLIENT_CONNECT:
+ if (zclient->fail >= 10)
+ return;
+ if (zclient_debug)
+ zlog_debug ("zclient connect schedule interval is %d",
+ zclient->fail < 3 ? 10 : 60);
if (! zclient->t_connect)
- zclient->t_connect =
- thread_add_timer (master, zclient_connect, zclient,
- zclient->fail < 3 ? 10 : 60);
+ zclient->t_connect =
+ thread_add_timer (master, zclient_connect_t, zclient,
+ zclient->fail < 3 ? 10 : 60);
break;
case ZCLIENT_READ:
- zclient->t_read =
- thread_add_read (master, zclient_read, zclient, zclient->sock);
+ zclient->t_read =
+ thread_add_read (master, zclient_read_t, zclient, zclient->sock);
break;
}
}