summaryrefslogtreecommitdiffstats
path: root/lib/stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'lib/stream.c')
-rw-r--r--lib/stream.c373
1 files changed, 258 insertions, 115 deletions
diff --git a/lib/stream.c b/lib/stream.c
index 983330ff..36bbba1e 100644
--- a/lib/stream.c
+++ b/lib/stream.c
@@ -17,7 +17,7 @@
* You should have received a copy of the GNU General Public License
* along with GNU Zebra; see the file COPYING. If not, write to the Free
* Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
- * 02111-1307, USA.
+ * 02111-1307, USA.
*/
#include <stddef.h>
@@ -29,7 +29,7 @@
#include "prefix.h"
#include "log.h"
-/* Tests whether a position is valid */
+/* Tests whether a position is valid */
#define GETP_VALID(S,G) \
((G) <= (S)->endp)
#define PUT_AT_VALID(S,G) GETP_VALID(S,G)
@@ -92,24 +92,24 @@ stream_new (size_t size)
struct stream *s;
assert (size > 0);
-
+
if (size == 0)
{
zlog_warn ("stream_new(): called with 0 size!");
return NULL;
}
-
+
s = XCALLOC (MTYPE_STREAM, sizeof (struct stream));
if (s == NULL)
return s;
-
+
if ( (s->data = XMALLOC (MTYPE_STREAM_DATA, size)) == NULL)
{
XFREE (MTYPE_STREAM, s);
return NULL;
}
-
+
s->size = size;
return s;
}
@@ -120,7 +120,7 @@ stream_free (struct stream *s)
{
if (!s)
return;
-
+
XFREE (MTYPE_STREAM_DATA, s->data);
XFREE (MTYPE_STREAM, s);
}
@@ -129,15 +129,15 @@ struct stream *
stream_copy (struct stream *new, struct stream *src)
{
STREAM_VERIFY_SANE (src);
-
+
assert (new != NULL);
assert (STREAM_SIZE(new) >= src->endp);
new->endp = src->endp;
new->getp = src->getp;
-
+
memcpy (new->data, src->data, src->endp);
-
+
return new;
}
@@ -154,30 +154,52 @@ stream_dup (struct stream *s)
return (stream_copy (new, s));
}
+struct stream *
+stream_dup_pending (struct stream *s)
+{
+ struct stream *new;
+ int new_endp ;
+
+ STREAM_VERIFY_SANE (s);
+
+ new_endp = s->endp - s->getp ;
+ if ( (new = stream_new(new_endp)) == NULL)
+ return NULL;
+
+ assert (STREAM_SIZE(new) >= new_endp);
+
+ new->endp = new_endp ;
+ new->getp = 0 ;
+
+ memcpy (new->data, s->data + s->getp, new_endp) ;
+
+ return new ;
+}
+
size_t
stream_resize (struct stream *s, size_t newsize)
{
u_char *newdata;
STREAM_VERIFY_SANE (s);
-
+
newdata = XREALLOC (MTYPE_STREAM_DATA, s->data, newsize);
-
+
if (newdata == NULL)
return s->size;
-
+
s->data = newdata;
s->size = newsize;
-
+
if (s->endp > s->size)
s->endp = s->size;
if (s->getp > s->endp)
s->getp = s->endp;
-
+
STREAM_VERIFY_SANE (s);
-
+
return s->size;
}
-
+
size_t
stream_get_getp (struct stream *s)
{
@@ -204,7 +226,7 @@ void
stream_set_getp (struct stream *s, size_t pos)
{
STREAM_VERIFY_SANE(s);
-
+
if (!GETP_VALID (s, pos))
{
STREAM_BOUND_WARN (s, "set getp");
@@ -219,13 +241,13 @@ void
stream_forward_getp (struct stream *s, size_t size)
{
STREAM_VERIFY_SANE(s);
-
+
if (!GETP_VALID (s, s->getp + size))
{
STREAM_BOUND_WARN (s, "seek getp");
return;
}
-
+
s->getp += size;
}
@@ -233,28 +255,28 @@ void
stream_forward_endp (struct stream *s, size_t size)
{
STREAM_VERIFY_SANE(s);
-
+
if (!ENDP_VALID (s, s->endp + size))
{
STREAM_BOUND_WARN (s, "seek endp");
return;
}
-
+
s->endp += size;
}
-
+
/* Copy from stream to destination. */
void
stream_get (void *dst, struct stream *s, size_t size)
{
STREAM_VERIFY_SANE(s);
-
+
if (STREAM_READABLE(s) < size)
{
STREAM_BOUND_WARN (s, "get");
return;
}
-
+
memcpy (dst, s->data + s->getp, size);
s->getp += size;
}
@@ -264,7 +286,7 @@ u_char
stream_getc (struct stream *s)
{
u_char c;
-
+
STREAM_VERIFY_SANE (s);
if (STREAM_READABLE(s) < sizeof (u_char))
@@ -273,7 +295,7 @@ stream_getc (struct stream *s)
return 0;
}
c = s->data[s->getp++];
-
+
return c;
}
@@ -284,15 +306,15 @@ stream_getc_from (struct stream *s, size_t from)
u_char c;
STREAM_VERIFY_SANE(s);
-
+
if (!GETP_VALID (s, from + sizeof (u_char)))
{
STREAM_BOUND_WARN (s, "get char");
return 0;
}
-
+
c = s->data[from];
-
+
return c;
}
@@ -309,10 +331,10 @@ stream_getw (struct stream *s)
STREAM_BOUND_WARN (s, "get ");
return 0;
}
-
+
w = s->data[s->getp++] << 8;
w |= s->data[s->getp++];
-
+
return w;
}
@@ -323,16 +345,16 @@ stream_getw_from (struct stream *s, size_t from)
u_int16_t w;
STREAM_VERIFY_SANE(s);
-
+
if (!GETP_VALID (s, from + sizeof (u_int16_t)))
{
STREAM_BOUND_WARN (s, "get ");
return 0;
}
-
+
w = s->data[from++] << 8;
w |= s->data[from];
-
+
return w;
}
@@ -343,18 +365,18 @@ stream_getl_from (struct stream *s, size_t from)
u_int32_t l;
STREAM_VERIFY_SANE(s);
-
+
if (!GETP_VALID (s, from + sizeof (u_int32_t)))
{
STREAM_BOUND_WARN (s, "get long");
return 0;
}
-
+
l = s->data[from++] << 24;
l |= s->data[from++] << 16;
l |= s->data[from++] << 8;
l |= s->data[from];
-
+
return l;
}
@@ -364,18 +386,18 @@ stream_getl (struct stream *s)
u_int32_t l;
STREAM_VERIFY_SANE(s);
-
+
if (STREAM_READABLE (s) < sizeof (u_int32_t))
{
STREAM_BOUND_WARN (s, "get long");
return 0;
}
-
+
l = s->data[s->getp++] << 24;
l |= s->data[s->getp++] << 16;
l |= s->data[s->getp++] << 8;
l |= s->data[s->getp++];
-
+
return l;
}
@@ -386,22 +408,22 @@ stream_getq_from (struct stream *s, size_t from)
uint64_t q;
STREAM_VERIFY_SANE(s);
-
+
if (!GETP_VALID (s, from + sizeof (uint64_t)))
{
STREAM_BOUND_WARN (s, "get quad");
return 0;
}
-
+
q = ((uint64_t) s->data[from++]) << 56;
q |= ((uint64_t) s->data[from++]) << 48;
q |= ((uint64_t) s->data[from++]) << 40;
- q |= ((uint64_t) s->data[from++]) << 32;
+ q |= ((uint64_t) s->data[from++]) << 32;
q |= ((uint64_t) s->data[from++]) << 24;
q |= ((uint64_t) s->data[from++]) << 16;
q |= ((uint64_t) s->data[from++]) << 8;
q |= ((uint64_t) s->data[from++]);
-
+
return q;
}
@@ -411,22 +433,22 @@ stream_getq (struct stream *s)
uint64_t q;
STREAM_VERIFY_SANE(s);
-
+
if (STREAM_READABLE (s) < sizeof (uint64_t))
{
STREAM_BOUND_WARN (s, "get quad");
return 0;
}
-
+
q = ((uint64_t) s->data[s->getp++]) << 56;
q |= ((uint64_t) s->data[s->getp++]) << 48;
q |= ((uint64_t) s->data[s->getp++]) << 40;
- q |= ((uint64_t) s->data[s->getp++]) << 32;
+ q |= ((uint64_t) s->data[s->getp++]) << 32;
q |= ((uint64_t) s->data[s->getp++]) << 24;
q |= ((uint64_t) s->data[s->getp++]) << 16;
q |= ((uint64_t) s->data[s->getp++]) << 8;
q |= ((uint64_t) s->data[s->getp++]);
-
+
return q;
}
@@ -437,19 +459,19 @@ stream_get_ipv4 (struct stream *s)
u_int32_t l;
STREAM_VERIFY_SANE(s);
-
+
if (STREAM_READABLE (s) < sizeof(u_int32_t))
{
STREAM_BOUND_WARN (s, "get ipv4");
return 0;
}
-
+
memcpy (&l, s->data + s->getp, sizeof(u_int32_t));
s->getp += sizeof(u_int32_t);
return l;
}
-
+
/* Copy to source to stream.
*
* XXX: This uses CHECK_SIZE and hence has funny semantics -> Size will wrap
@@ -463,15 +485,15 @@ stream_put (struct stream *s, const void *src, size_t size)
/* XXX: CHECK_SIZE has strange semantics. It should be deprecated */
CHECK_SIZE(s, size);
-
+
STREAM_VERIFY_SANE(s);
-
+
if (STREAM_WRITEABLE (s) < size)
{
STREAM_BOUND_WARN (s, "put");
return;
}
-
+
if (src)
memcpy (s->data + s->endp, src, size);
else
@@ -485,13 +507,13 @@ int
stream_putc (struct stream *s, u_char c)
{
STREAM_VERIFY_SANE(s);
-
+
if (STREAM_WRITEABLE (s) < sizeof(u_char))
{
STREAM_BOUND_WARN (s, "put");
return 0;
}
-
+
s->data[s->endp++] = c;
return sizeof (u_char);
}
@@ -507,7 +529,7 @@ stream_putw (struct stream *s, u_int16_t w)
STREAM_BOUND_WARN (s, "put");
return 0;
}
-
+
s->data[s->endp++] = (u_char)(w >> 8);
s->data[s->endp++] = (u_char) w;
@@ -525,7 +547,7 @@ stream_putl (struct stream *s, u_int32_t l)
STREAM_BOUND_WARN (s, "put");
return 0;
}
-
+
s->data[s->endp++] = (u_char)(l >> 24);
s->data[s->endp++] = (u_char)(l >> 16);
s->data[s->endp++] = (u_char)(l >> 8);
@@ -545,7 +567,7 @@ stream_putq (struct stream *s, uint64_t q)
STREAM_BOUND_WARN (s, "put quad");
return 0;
}
-
+
s->data[s->endp++] = (u_char)(q >> 56);
s->data[s->endp++] = (u_char)(q >> 48);
s->data[s->endp++] = (u_char)(q >> 40);
@@ -562,15 +584,15 @@ int
stream_putc_at (struct stream *s, size_t putp, u_char c)
{
STREAM_VERIFY_SANE(s);
-
+
if (!PUT_AT_VALID (s, putp + sizeof (u_char)))
{
STREAM_BOUND_WARN (s, "put");
return 0;
}
-
+
s->data[putp] = c;
-
+
return 1;
}
@@ -578,16 +600,16 @@ int
stream_putw_at (struct stream *s, size_t putp, u_int16_t w)
{
STREAM_VERIFY_SANE(s);
-
+
if (!PUT_AT_VALID (s, putp + sizeof (u_int16_t)))
{
STREAM_BOUND_WARN (s, "put");
return 0;
}
-
+
s->data[putp] = (u_char)(w >> 8);
s->data[putp + 1] = (u_char) w;
-
+
return 2;
}
@@ -595,7 +617,7 @@ int
stream_putl_at (struct stream *s, size_t putp, u_int32_t l)
{
STREAM_VERIFY_SANE(s);
-
+
if (!PUT_AT_VALID (s, putp + sizeof (u_int32_t)))
{
STREAM_BOUND_WARN (s, "put");
@@ -605,7 +627,7 @@ stream_putl_at (struct stream *s, size_t putp, u_int32_t l)
s->data[putp + 1] = (u_char)(l >> 16);
s->data[putp + 2] = (u_char)(l >> 8);
s->data[putp + 3] = (u_char)l;
-
+
return 4;
}
@@ -613,7 +635,7 @@ int
stream_putq_at (struct stream *s, size_t putp, uint64_t q)
{
STREAM_VERIFY_SANE(s);
-
+
if (!PUT_AT_VALID (s, putp + sizeof (uint64_t)))
{
STREAM_BOUND_WARN (s, "put");
@@ -627,7 +649,7 @@ stream_putq_at (struct stream *s, size_t putp, uint64_t q)
s->data[putp + 5] = (u_char)(q >> 16);
s->data[putp + 6] = (u_char)(q >> 8);
s->data[putp + 7] = (u_char)q;
-
+
return 8;
}
@@ -636,7 +658,7 @@ int
stream_put_ipv4 (struct stream *s, u_int32_t l)
{
STREAM_VERIFY_SANE(s);
-
+
if (STREAM_WRITEABLE (s) < sizeof (u_int32_t))
{
STREAM_BOUND_WARN (s, "put");
@@ -653,7 +675,7 @@ int
stream_put_in_addr (struct stream *s, struct in_addr *addr)
{
STREAM_VERIFY_SANE(s);
-
+
if (STREAM_WRITEABLE (s) < sizeof (u_int32_t))
{
STREAM_BOUND_WARN (s, "put");
@@ -671,24 +693,24 @@ int
stream_put_prefix (struct stream *s, struct prefix *p)
{
size_t psize;
-
+
STREAM_VERIFY_SANE(s);
-
+
psize = PSIZE (p->prefixlen);
-
+
if (STREAM_WRITEABLE (s) < psize)
{
STREAM_BOUND_WARN (s, "put");
return 0;
}
-
+
stream_putc (s, p->prefixlen);
memcpy (s->data + s->endp, &p->u.prefix, psize);
s->endp += psize;
-
+
return psize;
}
-
+
/* Read size from fd. */
int
stream_read (struct stream *s, int fd, size_t size)
@@ -696,18 +718,18 @@ stream_read (struct stream *s, int fd, size_t size)
int nbytes;
STREAM_VERIFY_SANE(s);
-
+
if (STREAM_WRITEABLE (s) < size)
{
STREAM_BOUND_WARN (s, "put");
return 0;
}
-
+
nbytes = readn (fd, s->data + s->endp, size);
if (nbytes > 0)
s->endp += nbytes;
-
+
return nbytes;
}
@@ -717,15 +739,15 @@ stream_read_unblock (struct stream *s, int fd, size_t size)
{
int nbytes;
int val;
-
+
STREAM_VERIFY_SANE(s);
-
+
if (STREAM_WRITEABLE (s) < size)
{
STREAM_BOUND_WARN (s, "put");
return 0;
}
-
+
val = fcntl (fd, F_GETFL, 0);
fcntl (fd, F_SETFL, val|O_NONBLOCK);
nbytes = read (fd, s->data + s->endp, size);
@@ -733,17 +755,67 @@ stream_read_unblock (struct stream *s, int fd, size_t size)
if (nbytes > 0)
s->endp += nbytes;
-
+
return nbytes;
}
+/*------------------------------------------------------------------------------
+ * Read up to size bytes into stream -- assuming non-blocking socket.
+ *
+ * Loops internally if gets EINTR -- so if does not read everything asked for,
+ * that must be because the read would otherwise block.
+ *
+ * Returns: 0..size -- number of bytes read
+ * -1 => failed -- see errno
+ * -2 => EOF met
+ *
+ * NB: if asks for zero bytes, will return 0 or error (if any).
+ */
+int
+stream_read_nonblock(struct stream *s, int fd, size_t size)
+{
+ int ret ;
+ int want = size ;
+
+ STREAM_VERIFY_SANE(s);
+
+ if (STREAM_WRITEABLE (s) < size)
+ {
+ STREAM_BOUND_WARN (s, "put");
+ return 0;
+ }
+
+ do
+ {
+ ret = read(fd, s->data + s->endp, want);
+
+ if (ret > 0)
+ {
+ s->endp += ret ;
+ want -= ret ;
+ }
+ else if (ret == 0)
+ return (want == 0) ? 0 : -2 ;
+ else
+ {
+ int err = errno ;
+ if ((err == EAGAIN) || (err == EWOULDBLOCK))
+ break ;
+ if (err != EINTR)
+ return -1 ;
+ } ;
+ } while (want > 0) ;
+
+ return size - want ;
+}
+
ssize_t
stream_read_try(struct stream *s, int fd, size_t size)
{
ssize_t nbytes;
STREAM_VERIFY_SANE(s);
-
+
if (STREAM_WRITEABLE(s) < size)
{
STREAM_BOUND_WARN (s, "put");
@@ -767,14 +839,14 @@ stream_read_try(struct stream *s, int fd, size_t size)
/* Read up to size bytes into the stream from the fd, using recvmsgfrom
* whose arguments match the remaining arguments to this function
*/
-ssize_t
+ssize_t
stream_recvfrom (struct stream *s, int fd, size_t size, int flags,
- struct sockaddr *from, socklen_t *fromlen)
+ struct sockaddr *from, socklen_t *fromlen)
{
ssize_t nbytes;
STREAM_VERIFY_SANE(s);
-
+
if (STREAM_WRITEABLE(s) < size)
{
STREAM_BOUND_WARN (s, "put");
@@ -783,7 +855,7 @@ stream_recvfrom (struct stream *s, int fd, size_t size, int flags,
return -1;
}
- if ((nbytes = recvfrom (fd, s->data + s->endp, size,
+ if ((nbytes = recvfrom (fd, s->data + s->endp, size,
flags, from, fromlen)) >= 0)
{
s->endp += nbytes;
@@ -802,15 +874,15 @@ stream_recvfrom (struct stream *s, int fd, size_t size, int flags,
* Stream need not be empty.
*/
ssize_t
-stream_recvmsg (struct stream *s, int fd, struct msghdr *msgh, int flags,
+stream_recvmsg (struct stream *s, int fd, struct msghdr *msgh, int flags,
size_t size)
{
int nbytes;
struct iovec *iov;
-
+
STREAM_VERIFY_SANE(s);
- assert (msgh->msg_iovlen > 0);
-
+ assert (msgh->msg_iovlen > 0);
+
if (STREAM_WRITEABLE (s) < size)
{
STREAM_BOUND_WARN (s, "put");
@@ -818,19 +890,19 @@ stream_recvmsg (struct stream *s, int fd, struct msghdr *msgh, int flags,
to hold the desired data! */
return -1;
}
-
+
iov = &(msgh->msg_iov[0]);
iov->iov_base = (s->data + s->endp);
iov->iov_len = size;
-
+
nbytes = recvmsg (fd, msgh, flags);
-
+
if (nbytes > 0)
s->endp += nbytes;
-
+
return nbytes;
}
-
+
/* Write data to buffer. */
size_t
stream_write (struct stream *s, const void *ptr, size_t size)
@@ -839,20 +911,20 @@ stream_write (struct stream *s, const void *ptr, size_t size)
CHECK_SIZE(s, size);
STREAM_VERIFY_SANE(s);
-
+
if (STREAM_WRITEABLE (s) < size)
{
STREAM_BOUND_WARN (s, "put");
return 0;
}
-
+
memcpy (s->data + s->endp, ptr, size);
s->endp += size;
return size;
}
-/* Return current read pointer.
+/* Return current read pointer.
* DEPRECATED!
* Use stream_get_pnt_to if you must, but decoding streams properly
* is preferred
@@ -882,26 +954,97 @@ stream_reset (struct stream *s)
s->getp = s->endp = 0;
}
+/* Number of bytes pending to be written */
+int
+stream_pending(struct stream* s)
+{
+ STREAM_VERIFY_SANE(s);
+
+ return s->endp - s->getp ;
+}
+
/* Write stream contens to the file discriptor. */
int
-stream_flush (struct stream *s, int fd)
+stream_flush (struct stream* s, int fd)
{
int nbytes;
-
+
STREAM_VERIFY_SANE(s);
-
+
nbytes = write (fd, s->data + s->getp, s->endp - s->getp);
-
+
return nbytes;
}
-
+
+/*------------------------------------------------------------------------------
+ * Try to write stream contents to the file descriptor -- assuming non-blocking.
+ *
+ * Loops if gets EINTR.
+ *
+ * If writes everything, resets the stream.
+ *
+ * If does not write everything, then would block.
+ *
+ * Returns: >= 0 number of bytes left to write
+ * -1 => some error (not including EINTR, EAGAIN or EWOULDBLOCK)
+ */
+int
+stream_flush_try(struct stream* s, int fd)
+{
+ int have ;
+ int ret ;
+
+ STREAM_VERIFY_SANE(s);
+
+ while ((have = (s->endp - s->getp)) != 0)
+ {
+ ret = write(fd, s->data + s->getp, have) ;
+ if (ret > 0)
+ s->getp += ret ;
+ else if (ret < 0)
+ {
+ ret = errno ;
+ if ((ret == EAGAIN) || (ret == EWOULDBLOCK))
+ return have ;
+ if (ret != EINTR)
+ return -1 ;
+ } ;
+ } ;
+
+ s->getp = s->endp = 0;
+
+ return 0 ;
+}
+
+/*------------------------------------------------------------------------------
+ * Transfer contents of stream to given buffer and reset stream.
+ *
+ * Transfers *entire* stream buffer.
+ *
+ * Returns pointer to next byte in given buffer
+ */
+void*
+stream_transfer(void* p, struct stream* s, void* limit)
+{
+ size_t have = s->endp ;
+
+ STREAM_VERIFY_SANE(s);
+ assert((p + have) <= limit) ;
+
+ memcpy(p, s->data, have) ;
+
+ s->getp = s->endp = 0;
+
+ return p + have ;
+} ;
+
/* Stream first in first out queue. */
struct stream_fifo *
stream_fifo_new (void)
{
struct stream_fifo *new;
-
+
new = XCALLOC (MTYPE_STREAM_FIFO, sizeof (struct stream_fifo));
return new;
}
@@ -914,7 +1057,7 @@ stream_fifo_push (struct stream_fifo *fifo, struct stream *s)
fifo->tail->next = s;
else
fifo->head = s;
-
+
fifo->tail = s;
fifo->count++;
@@ -925,20 +1068,20 @@ struct stream *
stream_fifo_pop (struct stream_fifo *fifo)
{
struct stream *s;
-
- s = fifo->head;
+
+ s = fifo->head;
if (s)
- {
+ {
fifo->head = s->next;
if (fifo->head == NULL)
fifo->tail = NULL;
- }
- fifo->count--;
+ fifo->count--;
+ }
- return s;
+ return s;
}
/* Return first fifo entry. */