diff options
Diffstat (limited to 'lib/stream.c')
-rw-r--r-- | lib/stream.c | 373 |
1 files changed, 258 insertions, 115 deletions
diff --git a/lib/stream.c b/lib/stream.c index 983330ff..36bbba1e 100644 --- a/lib/stream.c +++ b/lib/stream.c @@ -17,7 +17,7 @@ * You should have received a copy of the GNU General Public License * along with GNU Zebra; see the file COPYING. If not, write to the Free * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA - * 02111-1307, USA. + * 02111-1307, USA. */ #include <stddef.h> @@ -29,7 +29,7 @@ #include "prefix.h" #include "log.h" -/* Tests whether a position is valid */ +/* Tests whether a position is valid */ #define GETP_VALID(S,G) \ ((G) <= (S)->endp) #define PUT_AT_VALID(S,G) GETP_VALID(S,G) @@ -92,24 +92,24 @@ stream_new (size_t size) struct stream *s; assert (size > 0); - + if (size == 0) { zlog_warn ("stream_new(): called with 0 size!"); return NULL; } - + s = XCALLOC (MTYPE_STREAM, sizeof (struct stream)); if (s == NULL) return s; - + if ( (s->data = XMALLOC (MTYPE_STREAM_DATA, size)) == NULL) { XFREE (MTYPE_STREAM, s); return NULL; } - + s->size = size; return s; } @@ -120,7 +120,7 @@ stream_free (struct stream *s) { if (!s) return; - + XFREE (MTYPE_STREAM_DATA, s->data); XFREE (MTYPE_STREAM, s); } @@ -129,15 +129,15 @@ struct stream * stream_copy (struct stream *new, struct stream *src) { STREAM_VERIFY_SANE (src); - + assert (new != NULL); assert (STREAM_SIZE(new) >= src->endp); new->endp = src->endp; new->getp = src->getp; - + memcpy (new->data, src->data, src->endp); - + return new; } @@ -154,30 +154,52 @@ stream_dup (struct stream *s) return (stream_copy (new, s)); } +struct stream * +stream_dup_pending (struct stream *s) +{ + struct stream *new; + int new_endp ; + + STREAM_VERIFY_SANE (s); + + new_endp = s->endp - s->getp ; + if ( (new = stream_new(new_endp)) == NULL) + return NULL; + + assert (STREAM_SIZE(new) >= new_endp); + + new->endp = new_endp ; + new->getp = 0 ; + + memcpy (new->data, s->data + s->getp, new_endp) ; + + return new ; +} + size_t stream_resize (struct stream *s, size_t newsize) { u_char *newdata; STREAM_VERIFY_SANE (s); - + newdata = XREALLOC (MTYPE_STREAM_DATA, s->data, newsize); - + if (newdata == NULL) return s->size; - + s->data = newdata; s->size = newsize; - + if (s->endp > s->size) s->endp = s->size; if (s->getp > s->endp) s->getp = s->endp; - + STREAM_VERIFY_SANE (s); - + return s->size; } - + size_t stream_get_getp (struct stream *s) { @@ -204,7 +226,7 @@ void stream_set_getp (struct stream *s, size_t pos) { STREAM_VERIFY_SANE(s); - + if (!GETP_VALID (s, pos)) { STREAM_BOUND_WARN (s, "set getp"); @@ -219,13 +241,13 @@ void stream_forward_getp (struct stream *s, size_t size) { STREAM_VERIFY_SANE(s); - + if (!GETP_VALID (s, s->getp + size)) { STREAM_BOUND_WARN (s, "seek getp"); return; } - + s->getp += size; } @@ -233,28 +255,28 @@ void stream_forward_endp (struct stream *s, size_t size) { STREAM_VERIFY_SANE(s); - + if (!ENDP_VALID (s, s->endp + size)) { STREAM_BOUND_WARN (s, "seek endp"); return; } - + s->endp += size; } - + /* Copy from stream to destination. */ void stream_get (void *dst, struct stream *s, size_t size) { STREAM_VERIFY_SANE(s); - + if (STREAM_READABLE(s) < size) { STREAM_BOUND_WARN (s, "get"); return; } - + memcpy (dst, s->data + s->getp, size); s->getp += size; } @@ -264,7 +286,7 @@ u_char stream_getc (struct stream *s) { u_char c; - + STREAM_VERIFY_SANE (s); if (STREAM_READABLE(s) < sizeof (u_char)) @@ -273,7 +295,7 @@ stream_getc (struct stream *s) return 0; } c = s->data[s->getp++]; - + return c; } @@ -284,15 +306,15 @@ stream_getc_from (struct stream *s, size_t from) u_char c; STREAM_VERIFY_SANE(s); - + if (!GETP_VALID (s, from + sizeof (u_char))) { STREAM_BOUND_WARN (s, "get char"); return 0; } - + c = s->data[from]; - + return c; } @@ -309,10 +331,10 @@ stream_getw (struct stream *s) STREAM_BOUND_WARN (s, "get "); return 0; } - + w = s->data[s->getp++] << 8; w |= s->data[s->getp++]; - + return w; } @@ -323,16 +345,16 @@ stream_getw_from (struct stream *s, size_t from) u_int16_t w; STREAM_VERIFY_SANE(s); - + if (!GETP_VALID (s, from + sizeof (u_int16_t))) { STREAM_BOUND_WARN (s, "get "); return 0; } - + w = s->data[from++] << 8; w |= s->data[from]; - + return w; } @@ -343,18 +365,18 @@ stream_getl_from (struct stream *s, size_t from) u_int32_t l; STREAM_VERIFY_SANE(s); - + if (!GETP_VALID (s, from + sizeof (u_int32_t))) { STREAM_BOUND_WARN (s, "get long"); return 0; } - + l = s->data[from++] << 24; l |= s->data[from++] << 16; l |= s->data[from++] << 8; l |= s->data[from]; - + return l; } @@ -364,18 +386,18 @@ stream_getl (struct stream *s) u_int32_t l; STREAM_VERIFY_SANE(s); - + if (STREAM_READABLE (s) < sizeof (u_int32_t)) { STREAM_BOUND_WARN (s, "get long"); return 0; } - + l = s->data[s->getp++] << 24; l |= s->data[s->getp++] << 16; l |= s->data[s->getp++] << 8; l |= s->data[s->getp++]; - + return l; } @@ -386,22 +408,22 @@ stream_getq_from (struct stream *s, size_t from) uint64_t q; STREAM_VERIFY_SANE(s); - + if (!GETP_VALID (s, from + sizeof (uint64_t))) { STREAM_BOUND_WARN (s, "get quad"); return 0; } - + q = ((uint64_t) s->data[from++]) << 56; q |= ((uint64_t) s->data[from++]) << 48; q |= ((uint64_t) s->data[from++]) << 40; - q |= ((uint64_t) s->data[from++]) << 32; + q |= ((uint64_t) s->data[from++]) << 32; q |= ((uint64_t) s->data[from++]) << 24; q |= ((uint64_t) s->data[from++]) << 16; q |= ((uint64_t) s->data[from++]) << 8; q |= ((uint64_t) s->data[from++]); - + return q; } @@ -411,22 +433,22 @@ stream_getq (struct stream *s) uint64_t q; STREAM_VERIFY_SANE(s); - + if (STREAM_READABLE (s) < sizeof (uint64_t)) { STREAM_BOUND_WARN (s, "get quad"); return 0; } - + q = ((uint64_t) s->data[s->getp++]) << 56; q |= ((uint64_t) s->data[s->getp++]) << 48; q |= ((uint64_t) s->data[s->getp++]) << 40; - q |= ((uint64_t) s->data[s->getp++]) << 32; + q |= ((uint64_t) s->data[s->getp++]) << 32; q |= ((uint64_t) s->data[s->getp++]) << 24; q |= ((uint64_t) s->data[s->getp++]) << 16; q |= ((uint64_t) s->data[s->getp++]) << 8; q |= ((uint64_t) s->data[s->getp++]); - + return q; } @@ -437,19 +459,19 @@ stream_get_ipv4 (struct stream *s) u_int32_t l; STREAM_VERIFY_SANE(s); - + if (STREAM_READABLE (s) < sizeof(u_int32_t)) { STREAM_BOUND_WARN (s, "get ipv4"); return 0; } - + memcpy (&l, s->data + s->getp, sizeof(u_int32_t)); s->getp += sizeof(u_int32_t); return l; } - + /* Copy to source to stream. * * XXX: This uses CHECK_SIZE and hence has funny semantics -> Size will wrap @@ -463,15 +485,15 @@ stream_put (struct stream *s, const void *src, size_t size) /* XXX: CHECK_SIZE has strange semantics. It should be deprecated */ CHECK_SIZE(s, size); - + STREAM_VERIFY_SANE(s); - + if (STREAM_WRITEABLE (s) < size) { STREAM_BOUND_WARN (s, "put"); return; } - + if (src) memcpy (s->data + s->endp, src, size); else @@ -485,13 +507,13 @@ int stream_putc (struct stream *s, u_char c) { STREAM_VERIFY_SANE(s); - + if (STREAM_WRITEABLE (s) < sizeof(u_char)) { STREAM_BOUND_WARN (s, "put"); return 0; } - + s->data[s->endp++] = c; return sizeof (u_char); } @@ -507,7 +529,7 @@ stream_putw (struct stream *s, u_int16_t w) STREAM_BOUND_WARN (s, "put"); return 0; } - + s->data[s->endp++] = (u_char)(w >> 8); s->data[s->endp++] = (u_char) w; @@ -525,7 +547,7 @@ stream_putl (struct stream *s, u_int32_t l) STREAM_BOUND_WARN (s, "put"); return 0; } - + s->data[s->endp++] = (u_char)(l >> 24); s->data[s->endp++] = (u_char)(l >> 16); s->data[s->endp++] = (u_char)(l >> 8); @@ -545,7 +567,7 @@ stream_putq (struct stream *s, uint64_t q) STREAM_BOUND_WARN (s, "put quad"); return 0; } - + s->data[s->endp++] = (u_char)(q >> 56); s->data[s->endp++] = (u_char)(q >> 48); s->data[s->endp++] = (u_char)(q >> 40); @@ -562,15 +584,15 @@ int stream_putc_at (struct stream *s, size_t putp, u_char c) { STREAM_VERIFY_SANE(s); - + if (!PUT_AT_VALID (s, putp + sizeof (u_char))) { STREAM_BOUND_WARN (s, "put"); return 0; } - + s->data[putp] = c; - + return 1; } @@ -578,16 +600,16 @@ int stream_putw_at (struct stream *s, size_t putp, u_int16_t w) { STREAM_VERIFY_SANE(s); - + if (!PUT_AT_VALID (s, putp + sizeof (u_int16_t))) { STREAM_BOUND_WARN (s, "put"); return 0; } - + s->data[putp] = (u_char)(w >> 8); s->data[putp + 1] = (u_char) w; - + return 2; } @@ -595,7 +617,7 @@ int stream_putl_at (struct stream *s, size_t putp, u_int32_t l) { STREAM_VERIFY_SANE(s); - + if (!PUT_AT_VALID (s, putp + sizeof (u_int32_t))) { STREAM_BOUND_WARN (s, "put"); @@ -605,7 +627,7 @@ stream_putl_at (struct stream *s, size_t putp, u_int32_t l) s->data[putp + 1] = (u_char)(l >> 16); s->data[putp + 2] = (u_char)(l >> 8); s->data[putp + 3] = (u_char)l; - + return 4; } @@ -613,7 +635,7 @@ int stream_putq_at (struct stream *s, size_t putp, uint64_t q) { STREAM_VERIFY_SANE(s); - + if (!PUT_AT_VALID (s, putp + sizeof (uint64_t))) { STREAM_BOUND_WARN (s, "put"); @@ -627,7 +649,7 @@ stream_putq_at (struct stream *s, size_t putp, uint64_t q) s->data[putp + 5] = (u_char)(q >> 16); s->data[putp + 6] = (u_char)(q >> 8); s->data[putp + 7] = (u_char)q; - + return 8; } @@ -636,7 +658,7 @@ int stream_put_ipv4 (struct stream *s, u_int32_t l) { STREAM_VERIFY_SANE(s); - + if (STREAM_WRITEABLE (s) < sizeof (u_int32_t)) { STREAM_BOUND_WARN (s, "put"); @@ -653,7 +675,7 @@ int stream_put_in_addr (struct stream *s, struct in_addr *addr) { STREAM_VERIFY_SANE(s); - + if (STREAM_WRITEABLE (s) < sizeof (u_int32_t)) { STREAM_BOUND_WARN (s, "put"); @@ -671,24 +693,24 @@ int stream_put_prefix (struct stream *s, struct prefix *p) { size_t psize; - + STREAM_VERIFY_SANE(s); - + psize = PSIZE (p->prefixlen); - + if (STREAM_WRITEABLE (s) < psize) { STREAM_BOUND_WARN (s, "put"); return 0; } - + stream_putc (s, p->prefixlen); memcpy (s->data + s->endp, &p->u.prefix, psize); s->endp += psize; - + return psize; } - + /* Read size from fd. */ int stream_read (struct stream *s, int fd, size_t size) @@ -696,18 +718,18 @@ stream_read (struct stream *s, int fd, size_t size) int nbytes; STREAM_VERIFY_SANE(s); - + if (STREAM_WRITEABLE (s) < size) { STREAM_BOUND_WARN (s, "put"); return 0; } - + nbytes = readn (fd, s->data + s->endp, size); if (nbytes > 0) s->endp += nbytes; - + return nbytes; } @@ -717,15 +739,15 @@ stream_read_unblock (struct stream *s, int fd, size_t size) { int nbytes; int val; - + STREAM_VERIFY_SANE(s); - + if (STREAM_WRITEABLE (s) < size) { STREAM_BOUND_WARN (s, "put"); return 0; } - + val = fcntl (fd, F_GETFL, 0); fcntl (fd, F_SETFL, val|O_NONBLOCK); nbytes = read (fd, s->data + s->endp, size); @@ -733,17 +755,67 @@ stream_read_unblock (struct stream *s, int fd, size_t size) if (nbytes > 0) s->endp += nbytes; - + return nbytes; } +/*------------------------------------------------------------------------------ + * Read up to size bytes into stream -- assuming non-blocking socket. + * + * Loops internally if gets EINTR -- so if does not read everything asked for, + * that must be because the read would otherwise block. + * + * Returns: 0..size -- number of bytes read + * -1 => failed -- see errno + * -2 => EOF met + * + * NB: if asks for zero bytes, will return 0 or error (if any). + */ +int +stream_read_nonblock(struct stream *s, int fd, size_t size) +{ + int ret ; + int want = size ; + + STREAM_VERIFY_SANE(s); + + if (STREAM_WRITEABLE (s) < size) + { + STREAM_BOUND_WARN (s, "put"); + return 0; + } + + do + { + ret = read(fd, s->data + s->endp, want); + + if (ret > 0) + { + s->endp += ret ; + want -= ret ; + } + else if (ret == 0) + return (want == 0) ? 0 : -2 ; + else + { + int err = errno ; + if ((err == EAGAIN) || (err == EWOULDBLOCK)) + break ; + if (err != EINTR) + return -1 ; + } ; + } while (want > 0) ; + + return size - want ; +} + ssize_t stream_read_try(struct stream *s, int fd, size_t size) { ssize_t nbytes; STREAM_VERIFY_SANE(s); - + if (STREAM_WRITEABLE(s) < size) { STREAM_BOUND_WARN (s, "put"); @@ -767,14 +839,14 @@ stream_read_try(struct stream *s, int fd, size_t size) /* Read up to size bytes into the stream from the fd, using recvmsgfrom * whose arguments match the remaining arguments to this function */ -ssize_t +ssize_t stream_recvfrom (struct stream *s, int fd, size_t size, int flags, - struct sockaddr *from, socklen_t *fromlen) + struct sockaddr *from, socklen_t *fromlen) { ssize_t nbytes; STREAM_VERIFY_SANE(s); - + if (STREAM_WRITEABLE(s) < size) { STREAM_BOUND_WARN (s, "put"); @@ -783,7 +855,7 @@ stream_recvfrom (struct stream *s, int fd, size_t size, int flags, return -1; } - if ((nbytes = recvfrom (fd, s->data + s->endp, size, + if ((nbytes = recvfrom (fd, s->data + s->endp, size, flags, from, fromlen)) >= 0) { s->endp += nbytes; @@ -802,15 +874,15 @@ stream_recvfrom (struct stream *s, int fd, size_t size, int flags, * Stream need not be empty. */ ssize_t -stream_recvmsg (struct stream *s, int fd, struct msghdr *msgh, int flags, +stream_recvmsg (struct stream *s, int fd, struct msghdr *msgh, int flags, size_t size) { int nbytes; struct iovec *iov; - + STREAM_VERIFY_SANE(s); - assert (msgh->msg_iovlen > 0); - + assert (msgh->msg_iovlen > 0); + if (STREAM_WRITEABLE (s) < size) { STREAM_BOUND_WARN (s, "put"); @@ -818,19 +890,19 @@ stream_recvmsg (struct stream *s, int fd, struct msghdr *msgh, int flags, to hold the desired data! */ return -1; } - + iov = &(msgh->msg_iov[0]); iov->iov_base = (s->data + s->endp); iov->iov_len = size; - + nbytes = recvmsg (fd, msgh, flags); - + if (nbytes > 0) s->endp += nbytes; - + return nbytes; } - + /* Write data to buffer. */ size_t stream_write (struct stream *s, const void *ptr, size_t size) @@ -839,20 +911,20 @@ stream_write (struct stream *s, const void *ptr, size_t size) CHECK_SIZE(s, size); STREAM_VERIFY_SANE(s); - + if (STREAM_WRITEABLE (s) < size) { STREAM_BOUND_WARN (s, "put"); return 0; } - + memcpy (s->data + s->endp, ptr, size); s->endp += size; return size; } -/* Return current read pointer. +/* Return current read pointer. * DEPRECATED! * Use stream_get_pnt_to if you must, but decoding streams properly * is preferred @@ -882,26 +954,97 @@ stream_reset (struct stream *s) s->getp = s->endp = 0; } +/* Number of bytes pending to be written */ +int +stream_pending(struct stream* s) +{ + STREAM_VERIFY_SANE(s); + + return s->endp - s->getp ; +} + /* Write stream contens to the file discriptor. */ int -stream_flush (struct stream *s, int fd) +stream_flush (struct stream* s, int fd) { int nbytes; - + STREAM_VERIFY_SANE(s); - + nbytes = write (fd, s->data + s->getp, s->endp - s->getp); - + return nbytes; } - + +/*------------------------------------------------------------------------------ + * Try to write stream contents to the file descriptor -- assuming non-blocking. + * + * Loops if gets EINTR. + * + * If writes everything, resets the stream. + * + * If does not write everything, then would block. + * + * Returns: >= 0 number of bytes left to write + * -1 => some error (not including EINTR, EAGAIN or EWOULDBLOCK) + */ +int +stream_flush_try(struct stream* s, int fd) +{ + int have ; + int ret ; + + STREAM_VERIFY_SANE(s); + + while ((have = (s->endp - s->getp)) != 0) + { + ret = write(fd, s->data + s->getp, have) ; + if (ret > 0) + s->getp += ret ; + else if (ret < 0) + { + ret = errno ; + if ((ret == EAGAIN) || (ret == EWOULDBLOCK)) + return have ; + if (ret != EINTR) + return -1 ; + } ; + } ; + + s->getp = s->endp = 0; + + return 0 ; +} + +/*------------------------------------------------------------------------------ + * Transfer contents of stream to given buffer and reset stream. + * + * Transfers *entire* stream buffer. + * + * Returns pointer to next byte in given buffer + */ +void* +stream_transfer(void* p, struct stream* s, void* limit) +{ + size_t have = s->endp ; + + STREAM_VERIFY_SANE(s); + assert((p + have) <= limit) ; + + memcpy(p, s->data, have) ; + + s->getp = s->endp = 0; + + return p + have ; +} ; + /* Stream first in first out queue. */ struct stream_fifo * stream_fifo_new (void) { struct stream_fifo *new; - + new = XCALLOC (MTYPE_STREAM_FIFO, sizeof (struct stream_fifo)); return new; } @@ -914,7 +1057,7 @@ stream_fifo_push (struct stream_fifo *fifo, struct stream *s) fifo->tail->next = s; else fifo->head = s; - + fifo->tail = s; fifo->count++; @@ -925,20 +1068,20 @@ struct stream * stream_fifo_pop (struct stream_fifo *fifo) { struct stream *s; - - s = fifo->head; + + s = fifo->head; if (s) - { + { fifo->head = s->next; if (fifo->head == NULL) fifo->tail = NULL; - } - fifo->count--; + fifo->count--; + } - return s; + return s; } /* Return first fifo entry. */ |