From 40df0ee8bfa49a18eb5fa6b2e02715fa82da8a96 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20Ter=C3=A4s?= Date: Thu, 2 Jun 2016 22:10:06 +0300 Subject: [PATCH] Implement internal abstraction for iostreams fopencookie/funclose is a non-standard API and should not be used in portable software. Additionally, the way FILE's fd is used in non-blocking mode is undefined behaviour and cannot be relied on. This introduces internal abstraction for io streams, that allows implementing the desired virtualization of read/write operations with necessary timeout handling. ASTERISK-24515 #close ASTERISK-24517 #close Change-Id: Id916aef418b665ced6a7489aef74908b6e376e85 --- apps/app_externalivr.c | 119 ++++---- channels/chan_sip.c | 61 ++-- configure.ac | 4 - include/asterisk/iostream.h | 118 ++++++++ include/asterisk/tcptls.h | 92 +----- main/http.c | 105 +++---- main/iostream.c | 553 ++++++++++++++++++++++++++++++++++ main/manager.c | 142 ++++----- main/tcptls.c | 711 ++++++-------------------------------------- main/utils.c | 62 ---- res/res_http_post.c | 10 +- res/res_http_websocket.c | 116 ++++---- res/res_phoneprov.c | 2 +- 13 files changed, 997 insertions(+), 1098 deletions(-) create mode 100644 include/asterisk/iostream.h create mode 100644 main/iostream.c diff --git a/apps/app_externalivr.c b/apps/app_externalivr.c index 2bb1d8b..129f29b 100644 --- a/apps/app_externalivr.c +++ b/apps/app_externalivr.c @@ -152,10 +152,12 @@ struct gen_state { }; static int eivr_comm(struct ast_channel *chan, struct ivr_localuser *u, - int *eivr_events_fd, int *eivr_commands_fd, int *eivr_errors_fd, + struct ast_iostream *eivr_events, + struct ast_iostream *eivr_commands, + struct ast_iostream *eivr_errors, const struct ast_str *args, const struct ast_flags flags); -static void send_eivr_event(FILE *handle, const char event, const char *data, +static void send_eivr_event(struct ast_iostream *stream, const char event, const char *data, const struct ast_channel *chan) { struct ast_str *tmp = ast_str_create(12); @@ -164,9 +166,11 @@ static void send_eivr_event(FILE *handle, const char event, const char *data, if (data) { ast_str_append(&tmp, 0, ",%s", data); } + ast_str_append(&tmp, 0, "\n"); + ast_iostream_write(stream, ast_str_buffer(tmp), strlen(ast_str_buffer(tmp))); + ast_str_truncate(tmp, -1); - fprintf(handle, "%s\n", ast_str_buffer(tmp)); - ast_debug(1, "sent '%s'\n", ast_str_buffer(tmp)); + ast_debug(1, "sent '%s'", ast_str_buffer(tmp)); ast_free(tmp); } @@ -395,6 +399,8 @@ static int app_exec(struct ast_channel *chan, const char *data) int child_stdin[2] = { -1, -1 }; int child_stdout[2] = { -1, -1 }; int child_stderr[2] = { -1, -1 }; + struct ast_iostream *stream_stdin = NULL, *stream_stdout = NULL, + *stream_stderr = NULL; int res = -1; int pid; @@ -526,7 +532,7 @@ static int app_exec(struct ast_channel *chan, const char *data) goto exit; } - res = eivr_comm(chan, u, &ser->fd, &ser->fd, NULL, comma_delim_args, flags); + res = eivr_comm(chan, u, ser->stream, ser->stream, NULL, comma_delim_args, flags); } else { if (pipe(child_stdin)) { @@ -568,7 +574,12 @@ static int app_exec(struct ast_channel *chan, const char *data) child_stdout[1] = -1; close(child_stderr[1]); child_stderr[1] = -1; - res = eivr_comm(chan, u, &child_stdin[1], &child_stdout[0], &child_stderr[0], comma_delim_args, flags); + + stream_stdin = ast_iostream_from_fd(&child_stdin[1]); + stream_stdout = ast_iostream_from_fd(&child_stdout[0]); + stream_stderr = ast_iostream_from_fd(&child_stderr[0]); + + res = eivr_comm(chan, u, stream_stdin, stream_stdout, stream_stderr, comma_delim_args, flags); } } @@ -576,6 +587,15 @@ static int app_exec(struct ast_channel *chan, const char *data) if (u->gen_active) { ast_deactivate_generator(chan); } + if (stream_stdin) { + ast_iostream_close(stream_stdin); + } + if (stream_stdout) { + ast_iostream_close(stream_stdout); + } + if (stream_stderr) { + ast_iostream_close(stream_stderr); + } if (child_stdin[0] > -1) { close(child_stdin[0]); } @@ -604,46 +624,25 @@ static int app_exec(struct ast_channel *chan, const char *data) } static int eivr_comm(struct ast_channel *chan, struct ivr_localuser *u, - int *eivr_events_fd, int *eivr_commands_fd, int *eivr_errors_fd, - const struct ast_str *args, const struct ast_flags flags) + struct ast_iostream *eivr_events, + struct ast_iostream *eivr_commands, + struct ast_iostream *eivr_errors, + const struct ast_str *args, const struct ast_flags flags) { + char input[1024]; struct playlist_entry *entry; struct ast_frame *f; int ms; int exception; int ready_fd; - int waitfds[2] = { *eivr_commands_fd, (eivr_errors_fd) ? *eivr_errors_fd : -1 }; + int waitfds[2]; + int r; struct ast_channel *rchan; int res = -1; - int test_available_fd = -1; int hangup_info_sent = 0; - - FILE *eivr_commands = NULL; - FILE *eivr_errors = NULL; - FILE *eivr_events = NULL; - if (!(eivr_events = fdopen(*eivr_events_fd, "w"))) { - ast_chan_log(LOG_ERROR, chan, "Could not open stream to send events\n"); - goto exit; - } - if (!(eivr_commands = fdopen(*eivr_commands_fd, "r"))) { - ast_chan_log(LOG_ERROR, chan, "Could not open stream to receive commands\n"); - goto exit; - } - if (eivr_errors_fd) { /* if opening a socket connection, error stream will not be used */ - if (!(eivr_errors = fdopen(*eivr_errors_fd, "r"))) { - ast_chan_log(LOG_ERROR, chan, "Could not open stream to receive errors\n"); - goto exit; - } - } - - test_available_fd = open("/dev/null", O_RDONLY); - - setvbuf(eivr_events, NULL, _IONBF, 0); - setvbuf(eivr_commands, NULL, _IONBF, 0); - if (eivr_errors) { - setvbuf(eivr_errors, NULL, _IONBF, 0); - } + waitfds[0] = ast_iostream_get_fd(eivr_commands); + waitfds[1] = eivr_errors ? ast_iostream_get_fd(eivr_errors) : -1; while (1) { if (ast_test_flag(ast_channel_flags(chan), AST_FLAG_ZOMBIE)) { @@ -667,7 +666,7 @@ static int eivr_comm(struct ast_channel *chan, struct ivr_localuser *u, errno = 0; exception = 0; - rchan = ast_waitfor_nandfds(&chan, 1, waitfds, (eivr_errors_fd) ? 2 : 1, &exception, &ready_fd, &ms); + rchan = ast_waitfor_nandfds(&chan, 1, waitfds, (eivr_errors) ? 2 : 1, &exception, &ready_fd, &ms); if (ast_channel_state(chan) == AST_STATE_UP && !AST_LIST_EMPTY(&u->finishlist)) { AST_LIST_LOCK(&u->finishlist); @@ -715,15 +714,18 @@ static int eivr_comm(struct ast_channel *chan, struct ivr_localuser *u, break; } ast_frfree(f); - } else if (ready_fd == *eivr_commands_fd) { - char input[1024]; - - if (exception || (dup2(*eivr_commands_fd, test_available_fd) == -1) || feof(eivr_commands)) { + } else if (ready_fd == waitfds[0]) { + if (exception) { ast_chan_log(LOG_ERROR, chan, "Child process went away\n"); break; } - if (!fgets(input, sizeof(input), eivr_commands)) { + r = ast_iostream_gets(eivr_commands, input, sizeof(input)); + if (r <= 0) { + if (r == 0) { + ast_chan_log(LOG_ERROR, chan, "Child process went away\n"); + break; + } continue; } @@ -869,16 +871,19 @@ static int eivr_comm(struct ast_channel *chan, struct ivr_localuser *u, else ast_chan_log(LOG_WARNING, chan, "Unknown option requested: %s\n", &input[2]); } - } else if (eivr_errors_fd && (ready_fd == *eivr_errors_fd)) { - char input[1024]; - - if (exception || feof(eivr_errors)) { + } else if (ready_fd == waitfds[1]) { + if (exception) { ast_chan_log(LOG_ERROR, chan, "Child process went away\n"); break; } - if (fgets(input, sizeof(input), eivr_errors)) { + + r = ast_iostream_gets(eivr_errors, input, sizeof(input)); + if (r > 0) { ast_chan_log(LOG_NOTICE, chan, "stderr: %s\n", ast_strip(input)); - } + } else if (r == 0) { + ast_chan_log(LOG_ERROR, chan, "Child process went away\n"); + break; + } } else if ((ready_fd < 0) && ms) { if (errno == 0 || errno == EINTR) continue; @@ -888,23 +893,7 @@ static int eivr_comm(struct ast_channel *chan, struct ivr_localuser *u, } } - exit: - if (test_available_fd > -1) { - close(test_available_fd); - } - if (eivr_events) { - fclose(eivr_events); - *eivr_events_fd = -1; - } - if (eivr_commands) { - fclose(eivr_commands); - *eivr_commands_fd = -1; - } - if (eivr_errors) { - fclose(eivr_errors); - *eivr_errors_fd = -1; - } - return res; + return res; } static int unload_module(void) diff --git a/channels/chan_sip.c b/channels/chan_sip.c index 297981a..ac44f2d 100644 --- a/channels/chan_sip.c +++ b/channels/chan_sip.c @@ -2543,7 +2543,7 @@ static struct sip_threadinfo *sip_threadinfo_create(struct ast_tcptls_session_in } ao2_t_ref(tcptls_session, +1, "tcptls_session ref for sip_threadinfo object"); th->tcptls_session = tcptls_session; - th->type = transport ? transport : (tcptls_session->ssl ? AST_TRANSPORT_TLS: AST_TRANSPORT_TCP); + th->type = transport ? transport : (ast_iostream_get_ssl(tcptls_session->stream) ? AST_TRANSPORT_TLS: AST_TRANSPORT_TCP); ao2_t_link(threadt, th, "Adding new tcptls helper thread"); ao2_t_ref(th, -1, "Decrementing threadinfo ref from alloc, only table ref remains"); return th; @@ -2566,8 +2566,7 @@ static int sip_tcptls_write(struct ast_tcptls_session_instance *tcptls_session, ao2_lock(tcptls_session); - if ((tcptls_session->fd == -1) || - !(th = ao2_t_find(threadt, &tmp, OBJ_POINTER, "ao2_find, getting sip_threadinfo in tcp helper thread")) || + if (!(th = ao2_t_find(threadt, &tmp, OBJ_POINTER, "ao2_find, getting sip_threadinfo in tcp helper thread")) || !(packet = ao2_alloc(sizeof(*packet), tcptls_packet_destructor)) || !(packet->data = ast_str_create(len))) { goto tcptls_write_setup_error; @@ -2880,7 +2879,7 @@ static int sip_tcptls_read(struct sip_request *req, struct ast_tcptls_session_in } else { timeout = -1; } - res = ast_wait_for_input(tcptls_session->fd, timeout); + res = ast_wait_for_input(ast_iostream_get_fd(tcptls_session->stream), timeout); if (res < 0) { ast_debug(2, "SIP TCP/TLS server :: ast_wait_for_input returned %d\n", res); return -1; @@ -2889,7 +2888,7 @@ static int sip_tcptls_read(struct sip_request *req, struct ast_tcptls_session_in return -1; } - res = ast_tcptls_server_read(tcptls_session, readbuf, sizeof(readbuf) - 1); + res = ast_iostream_read(tcptls_session->stream, readbuf, sizeof(readbuf) - 1); if (res < 0) { if (errno == EAGAIN || errno == EINTR) { continue; @@ -2950,18 +2949,8 @@ static void *_sip_tcp_helper_thread(struct ast_tcptls_session_instance *tcptls_s goto cleanup; } - if ((flags = fcntl(tcptls_session->fd, F_GETFL)) == -1) { - ast_log(LOG_ERROR, "error setting socket to non blocking mode, fcntl() failed: %s\n", strerror(errno)); - goto cleanup; - } - - flags |= O_NONBLOCK; - if (fcntl(tcptls_session->fd, F_SETFL, flags) == -1) { - ast_log(LOG_ERROR, "error setting socket to non blocking mode, fcntl() failed: %s\n", strerror(errno)); - goto cleanup; - } - - if (!(me = sip_threadinfo_create(tcptls_session, tcptls_session->ssl ? AST_TRANSPORT_TLS : AST_TRANSPORT_TCP))) { + ast_iostream_nonblock(tcptls_session->stream); + if (!(me = sip_threadinfo_create(tcptls_session, ast_iostream_get_ssl(tcptls_session->stream) ? AST_TRANSPORT_TLS : AST_TRANSPORT_TCP))) { goto cleanup; } ao2_t_ref(me, +1, "Adding threadinfo ref for tcp_helper_thread"); @@ -2978,16 +2967,16 @@ static void *_sip_tcp_helper_thread(struct ast_tcptls_session_instance *tcptls_s } flags = 1; - if (setsockopt(tcptls_session->fd, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags))) { + if (setsockopt(ast_iostream_get_fd(tcptls_session->stream), SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags))) { ast_log(LOG_ERROR, "error enabling TCP keep-alives on sip socket: %s\n", strerror(errno)); goto cleanup; } me->threadid = pthread_self(); - ast_debug(2, "Starting thread for %s server\n", tcptls_session->ssl ? "TLS" : "TCP"); + ast_debug(2, "Starting thread for %s server\n", ast_iostream_get_ssl(tcptls_session->stream) ? "TLS" : "TCP"); /* set up pollfd to watch for reads on both the socket and the alert_pipe */ - fds[0].fd = tcptls_session->fd; + fds[0].fd = ast_iostream_get_fd(tcptls_session->stream); fds[1].fd = me->alert_pipe[0]; fds[0].events = fds[1].events = POLLIN | POLLPRI; @@ -3007,9 +2996,9 @@ static void *_sip_tcp_helper_thread(struct ast_tcptls_session_instance *tcptls_s * We cannot let the stream exclusively wait for data to arrive. * We have to wake up the task to send outgoing messages. */ - ast_tcptls_stream_set_exclusive_input(tcptls_session->stream_cookie, 0); + ast_iostream_set_exclusive_input(tcptls_session->stream, 0); - ast_tcptls_stream_set_timeout_sequence(tcptls_session->stream_cookie, ast_tvnow(), + ast_iostream_set_timeout_sequence(tcptls_session->stream, ast_tvnow(), tcptls_session->client ? -1 : (authtimeout * 1000)); for (;;) { @@ -3017,7 +3006,7 @@ static void *_sip_tcp_helper_thread(struct ast_tcptls_session_instance *tcptls_s if (!tcptls_session->client && req.authenticated && !authenticated) { authenticated = 1; - ast_tcptls_stream_set_timeout_disable(tcptls_session->stream_cookie); + ast_iostream_set_timeout_disable(tcptls_session->stream); ast_atomic_fetchadd_int(&unauth_sessions, -1); } @@ -3028,7 +3017,7 @@ static void *_sip_tcp_helper_thread(struct ast_tcptls_session_instance *tcptls_s } if (timeout == 0) { - ast_debug(2, "SIP %s server timed out\n", tcptls_session->ssl ? "TLS": "TCP"); + ast_debug(2, "SIP %s server timed out\n", ast_iostream_get_ssl(tcptls_session->stream) ? "TLS": "TCP"); goto cleanup; } } else { @@ -3038,11 +3027,11 @@ static void *_sip_tcp_helper_thread(struct ast_tcptls_session_instance *tcptls_s if (ast_str_strlen(tcptls_session->overflow_buf) == 0) { res = ast_poll(fds, 2, timeout); /* polls for both socket and alert_pipe */ if (res < 0) { - ast_debug(2, "SIP %s server :: ast_wait_for_input returned %d\n", tcptls_session->ssl ? "TLS": "TCP", res); + ast_debug(2, "SIP %s server :: ast_wait_for_input returned %d\n", ast_iostream_get_ssl(tcptls_session->stream) ? "TLS": "TCP", res); goto cleanup; } else if (res == 0) { /* timeout */ - ast_debug(2, "SIP %s server timed out\n", tcptls_session->ssl ? "TLS": "TCP"); + ast_debug(2, "SIP %s server timed out\n", ast_iostream_get_ssl(tcptls_session->stream) ? "TLS": "TCP"); goto cleanup; } } @@ -3067,14 +3056,14 @@ static void *_sip_tcp_helper_thread(struct ast_tcptls_session_instance *tcptls_s memset(buf, 0, sizeof(buf)); - if (tcptls_session->ssl) { + if (ast_iostream_get_ssl(tcptls_session->stream)) { set_socket_transport(&req.socket, AST_TRANSPORT_TLS); req.socket.port = htons(ourport_tls); } else { set_socket_transport(&req.socket, AST_TRANSPORT_TCP); req.socket.port = htons(ourport_tcp); } - req.socket.fd = tcptls_session->fd; + req.socket.fd = ast_iostream_get_fd(tcptls_session->stream); res = sip_tcptls_read(&req, tcptls_session, authenticated, start); if (res < 0) { @@ -3108,7 +3097,7 @@ static void *_sip_tcp_helper_thread(struct ast_tcptls_session_instance *tcptls_s ao2_unlock(me); if (packet) { - if (ast_tcptls_server_write(tcptls_session, ast_str_buffer(packet->data), packet->len) == -1) { + if (ast_iostream_write(tcptls_session->stream, ast_str_buffer(packet->data), packet->len) == -1) { ast_log(LOG_WARNING, "Failure to write to tcp/tls socket\n"); } ao2_t_ref(packet, -1, "tcptls packet sent, this is no longer needed"); @@ -3120,7 +3109,7 @@ static void *_sip_tcp_helper_thread(struct ast_tcptls_session_instance *tcptls_s } } - ast_debug(2, "Shutting down thread for %s server\n", tcptls_session->ssl ? "TLS" : "TCP"); + ast_debug(2, "Shutting down thread for %s server\n", ast_iostream_get_ssl(tcptls_session->stream) ? "TLS" : "TCP"); cleanup: if (tcptls_session && !tcptls_session->client && !authenticated) { @@ -29089,9 +29078,8 @@ static int sip_prepare_socket(struct sip_pvt *p) return s->fd; } if ((s->type & (AST_TRANSPORT_TCP | AST_TRANSPORT_TLS)) && - (s->tcptls_session) && - (s->tcptls_session->fd != -1)) { - return s->tcptls_session->fd; + s->tcptls_session) { + return ast_iostream_get_fd(s->tcptls_session->stream); } if ((s->type & (AST_TRANSPORT_WS | AST_TRANSPORT_WSS))) { return s->ws_session ? ast_websocket_fd(s->ws_session) : -1; @@ -29121,7 +29109,7 @@ static int sip_prepare_socket(struct sip_pvt *p) /* 1. check for existing threads */ ast_sockaddr_copy(&sa_tmp, sip_real_dst(p)); if ((tcptls_session = sip_tcp_locate(&sa_tmp))) { - s->fd = tcptls_session->fd; + s->fd = ast_iostream_get_fd(tcptls_session->stream); if (s->tcptls_session) { ao2_ref(s->tcptls_session, -1); s->tcptls_session = NULL; @@ -29168,7 +29156,7 @@ static int sip_prepare_socket(struct sip_pvt *p) goto create_tcptls_session_fail; } - s->fd = s->tcptls_session->fd; + s->fd = ast_iostream_get_fd(s->tcptls_session->stream); /* client connections need to have the sip_threadinfo object created before * the thread is detached. This ensures the alert_pipe is up before it will @@ -29970,8 +29958,7 @@ static int sip_send_keepalive(const void *data) if ((peer->socket.fd != -1) && (peer->socket.type == AST_TRANSPORT_UDP)) { res = ast_sendto(peer->socket.fd, keepalive, sizeof(keepalive), 0, &peer->addr); } else if ((peer->socket.type & (AST_TRANSPORT_TCP | AST_TRANSPORT_TLS)) && - (peer->socket.tcptls_session) && - (peer->socket.tcptls_session->fd != -1)) { + peer->socket.tcptls_session) { res = sip_tcptls_write(peer->socket.tcptls_session, keepalive, sizeof(keepalive)); } else if (peer->socket.type == AST_TRANSPORT_UDP) { res = ast_sendto(sipsock, keepalive, sizeof(keepalive), 0, &peer->addr); diff --git a/configure.ac b/configure.ac index 66c8971..20bbf76 100644 --- a/configure.ac +++ b/configure.ac @@ -815,10 +815,6 @@ AC_ARG_ENABLE([asteriskssl], esac], [AST_ASTERISKSSL=yes]) AC_SUBST(AST_ASTERISKSSL) -# https support (in main/http.c) uses funopen on BSD systems, -# fopencookie on linux -AC_CHECK_FUNCS([funopen fopencookie]) - AC_CHECK_FUNCS([inet_aton]) # check if we have IP_PKTINFO constant defined diff --git a/include/asterisk/iostream.h b/include/asterisk/iostream.h new file mode 100644 index 0000000..c641ffb --- /dev/null +++ b/include/asterisk/iostream.h @@ -0,0 +1,118 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 1999 - 2015, Digium, Inc. + * + * Timo Teräs + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +#ifndef _ASTERISK_IOSTREAM_H +#define _ASTERISK_IOSTREAM_H + +/*! + * \file iostream.h + * + * \brief Generic abstraction for input/output streams. + */ + +#if defined(HAVE_OPENSSL) +#define DO_SSL /* comment in/out if you want to support ssl */ +#endif + +#ifdef DO_SSL +#include +#include +#include +#else +/* declare dummy types so we can define a pointer to them */ +typedef struct {} SSL; +typedef struct {} SSL_CTX; +#endif /* DO_SSL */ + +struct ast_iostream; + +/*! + * \brief Disable the iostream timeout timer. + * + * \param stream iostream control data. + * + * \return Nothing + */ +void ast_iostream_set_timeout_disable(struct ast_iostream *stream); + +/*! + * \brief Set the iostream inactivity timeout timer. + * + * \param stream iostream control data. + * \param timeout Number of milliseconds to wait for data transfer with the peer. + * + * \details This is basically how much time we are willing to spend + * in an I/O call before we declare the peer unresponsive. + * + * \note Setting timeout to -1 disables the timeout. + * \note Setting this timeout replaces the I/O sequence timeout timer. + * + * \return Nothing + */ +void ast_iostream_set_timeout_inactivity(struct ast_iostream *stream, int timeout); + +void ast_iostream_set_timeout_idle_inactivity(struct ast_iostream *stream, int timeout, int timeout_reset); + +/*! + * \brief Set the iostream I/O sequence timeout timer. + * + * \param stream iostream control data. + * \param start Time the I/O sequence timer starts. + * \param timeout Number of milliseconds from the start time before timeout. + * + * \details This is how much time are we willing to allow the peer + * to complete an operation that can take several I/O calls. The + * main use is as an authentication timer with us. + * + * \note Setting timeout to -1 disables the timeout. + * \note Setting this timeout replaces the inactivity timeout timer. + * + * \return Nothing + */ +void ast_iostream_set_timeout_sequence(struct ast_iostream *stream, struct timeval start, int timeout); + +/*! + * \brief Set the iostream if it can exclusively depend upon the set timeouts. + * + * \param stream iostream control data. + * \param exclusive_input TRUE if stream can exclusively wait for fd input. + * Otherwise, the stream will not wait for fd input. It will wait while + * trying to send data. + * + * \note The stream timeouts still need to be set. + * + * \return Nothing + */ +void ast_iostream_set_exclusive_input(struct ast_iostream *stream, int exclusive_input); + +int ast_iostream_get_fd(struct ast_iostream *stream); +void ast_iostream_nonblock(struct ast_iostream *stream); + +SSL* ast_iostream_get_ssl(struct ast_iostream *stream); + +ssize_t ast_iostream_read(struct ast_iostream *stream, void *buf, size_t count); +ssize_t ast_iostream_gets(struct ast_iostream *stream, char *buf, size_t count); +ssize_t ast_iostream_discard(struct ast_iostream *stream, size_t count); +ssize_t ast_iostream_write(struct ast_iostream *stream, const void *buf, size_t count); +ssize_t ast_iostream_printf(struct ast_iostream *stream, const void *fmt, ...); + +struct ast_iostream* ast_iostream_from_fd(int *fd); +int ast_iostream_start_tls(struct ast_iostream **stream, SSL_CTX *ctx, int client); +int ast_iostream_close(struct ast_iostream *stream); + +#endif /* _ASTERISK_IOSTREAM_H */ diff --git a/include/asterisk/tcptls.h b/include/asterisk/tcptls.h index 3c5f450..883cb92 100644 --- a/include/asterisk/tcptls.h +++ b/include/asterisk/tcptls.h @@ -57,20 +57,7 @@ #include "asterisk/netsock2.h" #include "asterisk/utils.h" - -#if defined(HAVE_OPENSSL) && (defined(HAVE_FUNOPEN) || defined(HAVE_FOPENCOOKIE)) -#define DO_SSL /* comment in/out if you want to support ssl */ -#endif - -#ifdef DO_SSL -#include -#include -#include -#else -/* declare dummy types so we can define a pointer to them */ -typedef struct {} SSL; -typedef struct {} SSL_CTX; -#endif /* DO_SSL */ +#include "asterisk/iostream.h" /*! SSL support */ #define AST_CERTFILE "asterisk.pem" @@ -153,72 +140,10 @@ struct ast_tcptls_session_args { const char *name; }; -struct ast_tcptls_stream; - -/*! - * \brief Disable the TCP/TLS stream timeout timer. - * - * \param stream TCP/TLS stream control data. - * - * \return Nothing - */ -void ast_tcptls_stream_set_timeout_disable(struct ast_tcptls_stream *stream); - -/*! - * \brief Set the TCP/TLS stream inactivity timeout timer. - * - * \param stream TCP/TLS stream control data. - * \param timeout Number of milliseconds to wait for data transfer with the peer. - * - * \details This is basically how much time we are willing to spend - * in an I/O call before we declare the peer unresponsive. - * - * \note Setting timeout to -1 disables the timeout. - * \note Setting this timeout replaces the I/O sequence timeout timer. - * - * \return Nothing - */ -void ast_tcptls_stream_set_timeout_inactivity(struct ast_tcptls_stream *stream, int timeout); - -/*! - * \brief Set the TCP/TLS stream I/O sequence timeout timer. - * - * \param stream TCP/TLS stream control data. - * \param start Time the I/O sequence timer starts. - * \param timeout Number of milliseconds from the start time before timeout. - * - * \details This is how much time are we willing to allow the peer - * to complete an operation that can take several I/O calls. The - * main use is as an authentication timer with us. - * - * \note Setting timeout to -1 disables the timeout. - * \note Setting this timeout replaces the inactivity timeout timer. - * - * \return Nothing - */ -void ast_tcptls_stream_set_timeout_sequence(struct ast_tcptls_stream *stream, struct timeval start, int timeout); - -/*! - * \brief Set the TCP/TLS stream I/O if it can exclusively depend upon the set timeouts. - * - * \param stream TCP/TLS stream control data. - * \param exclusive_input TRUE if stream can exclusively wait for fd input. - * Otherwise, the stream will not wait for fd input. It will wait while - * trying to send data. - * - * \note The stream timeouts still need to be set. - * - * \return Nothing - */ -void ast_tcptls_stream_set_exclusive_input(struct ast_tcptls_stream *stream, int exclusive_input); - /*! \brief * describes a server instance */ struct ast_tcptls_session_instance { - FILE *f; /*!< fopen/funopen result */ - int fd; /*!< the socket returned by accept() */ - SSL *ssl; /*!< ssl state */ int client; struct ast_sockaddr remote_address; struct ast_tcptls_session_args *parent; @@ -228,20 +153,12 @@ struct ast_tcptls_session_instance { * extra data. */ struct ast_str *overflow_buf; - /*! ao2 FILE stream cookie object associated with f. */ - struct ast_tcptls_stream *stream_cookie; + /*! ao2 stream object associated with this session. */ + struct ast_iostream *stream; /*! ao2 object private data of parent->worker_fn */ void *private_data; }; -#if defined(HAVE_FUNOPEN) -#define HOOK_T int -#define LEN_T int -#else -#define HOOK_T ssize_t -#define LEN_T size_t -#endif - /*! * \brief attempts to connect and start tcptls session, on error the tcptls_session's * ref count is decremented, fd and file are closed, and NULL is returned. @@ -297,7 +214,4 @@ void ast_ssl_teardown(struct ast_tls_config *cfg); */ int ast_tls_read_conf(struct ast_tls_config *tls_cfg, struct ast_tcptls_session_args *tls_desc, const char *varname, const char *value); -HOOK_T ast_tcptls_server_read(struct ast_tcptls_session_instance *ser, void *buf, size_t count); -HOOK_T ast_tcptls_server_write(struct ast_tcptls_session_instance *ser, const void *buf, size_t count); - #endif /* _ASTERISK_TCPTLS_H */ diff --git a/main/http.c b/main/http.c index da564da..8cb1c2f 100644 --- a/main/http.c +++ b/main/http.c @@ -451,11 +451,13 @@ void ast_http_send(struct ast_tcptls_session_instance *ser, struct timeval now = ast_tvnow(); struct ast_tm tm; char timebuf[80]; + char buf[256]; + int len; int content_length = 0; int close_connection; struct ast_str *server_header_field = ast_str_create(MAX_SERVER_NAME_LENGTH); - if (!ser || !ser->f || !server_header_field) { + if (!ser || !server_header_field) { /* The connection is not open. */ ast_free(http_header); ast_free(out); @@ -505,7 +507,7 @@ void ast_http_send(struct ast_tcptls_session_instance *ser, } /* send http header */ - fprintf(ser->f, + ast_iostream_printf(ser->stream, "HTTP/1.1 %d %s\r\n" "%s" "Date: %s\r\n" @@ -526,18 +528,16 @@ void ast_http_send(struct ast_tcptls_session_instance *ser, /* send content */ if (method != AST_HTTP_HEAD || status_code >= 400) { if (out && ast_str_strlen(out)) { - if (fwrite(ast_str_buffer(out), ast_str_strlen(out), 1, ser->f) != 1) { + len = ast_str_strlen(out); + if (ast_iostream_write(ser->stream, ast_str_buffer(out), len) != len) { ast_log(LOG_ERROR, "fwrite() failed: %s\n", strerror(errno)); close_connection = 1; } } if (fd) { - char buf[256]; - int len; - while ((len = read(fd, buf, sizeof(buf))) > 0) { - if (fwrite(buf, len, 1, ser->f) != 1) { + if (ast_iostream_write(ser->stream, buf, len) != len) { ast_log(LOG_WARNING, "fwrite() failed: %s\n", strerror(errno)); close_connection = 1; break; @@ -569,7 +569,7 @@ void ast_http_create_response(struct ast_tcptls_session_instance *ser, int statu ast_free(http_header_data); ast_free(server_address); ast_free(out); - if (ser && ser->f) { + if (ser) { ast_debug(1, "HTTP closing session. OOM.\n"); ast_tcptls_close_session_file(ser); } @@ -923,9 +923,9 @@ static int http_body_read_contents(struct ast_tcptls_session_instance *ser, char { int res; - /* Stay in fread until get all the expected data or timeout. */ - res = fread(buf, length, 1, ser->f); - if (res < 1) { + /* Stream is in exclusive mode so we get it all if possible. */ + res = ast_iostream_read(ser->stream, buf, length); + if (res < length) { ast_log(LOG_WARNING, "Short HTTP request %s (Wanted %d)\n", what_getting, length); return -1; @@ -947,23 +947,12 @@ static int http_body_read_contents(struct ast_tcptls_session_instance *ser, char */ static int http_body_discard_contents(struct ast_tcptls_session_instance *ser, int length, const char *what_getting) { - int res; - char buf[MAX_HTTP_LINE_LENGTH];/* Discard buffer */ - - /* Stay in fread until get all the expected data or timeout. */ - while (sizeof(buf) < length) { - res = fread(buf, sizeof(buf), 1, ser->f); - if (res < 1) { - ast_log(LOG_WARNING, "Short HTTP request %s (Wanted %zu of remaining %d)\n", - what_getting, sizeof(buf), length); - return -1; - } - length -= sizeof(buf); - } - res = fread(buf, length, 1, ser->f); - if (res < 1) { - ast_log(LOG_WARNING, "Short HTTP request %s (Wanted %d of remaining %d)\n", - what_getting, length, length); + ssize_t res; + + res = ast_iostream_discard(ser->stream, length); + if (res < length) { + ast_log(LOG_WARNING, "Short HTTP request %s (Wanted %d but got %zd)\n", + what_getting, length, res); return -1; } return 0; @@ -1039,7 +1028,7 @@ static int http_body_get_chunk_length(struct ast_tcptls_session_instance *ser) char header_line[MAX_HTTP_LINE_LENGTH]; /* get the line of hexadecimal giving chunk-size w/ optional chunk-extension */ - if (!fgets(header_line, sizeof(header_line), ser->f)) { + if (ast_iostream_gets(ser->stream, header_line, sizeof(header_line)) <= 0) { ast_log(LOG_WARNING, "Short HTTP read of chunked header\n"); return -1; } @@ -1067,8 +1056,8 @@ static int http_body_check_chunk_sync(struct ast_tcptls_session_instance *ser) char chunk_sync[2]; /* Stay in fread until get the expected CRLF or timeout. */ - res = fread(chunk_sync, sizeof(chunk_sync), 1, ser->f); - if (res < 1) { + res = ast_iostream_read(ser->stream, chunk_sync, sizeof(chunk_sync)); + if (res < sizeof(chunk_sync)) { ast_log(LOG_WARNING, "Short HTTP chunk sync read (Wanted %zu)\n", sizeof(chunk_sync)); return -1; @@ -1097,7 +1086,7 @@ static int http_body_discard_chunk_trailer_headers(struct ast_tcptls_session_ins char header_line[MAX_HTTP_LINE_LENGTH]; for (;;) { - if (!fgets(header_line, sizeof(header_line), ser->f)) { + if (ast_iostream_gets(ser->stream, header_line, sizeof(header_line)) <= 0) { ast_log(LOG_WARNING, "Short HTTP read of chunked trailer header\n"); return -1; } @@ -1760,7 +1749,7 @@ static int http_request_headers_get(struct ast_tcptls_session_instance *ser, str char *name; char *value; - if (!fgets(header_line, sizeof(header_line), ser->f)) { + if (ast_iostream_gets(ser->stream, header_line, sizeof(header_line)) <= 0) { ast_http_error(ser, 400, "Bad Request", "Timeout"); return -1; } @@ -1834,7 +1823,7 @@ static int httpd_process_request(struct ast_tcptls_session_instance *ser) int res; char request_line[MAX_HTTP_LINE_LENGTH]; - if (!fgets(request_line, sizeof(request_line), ser->f)) { + if (ast_iostream_gets(ser->stream, request_line, sizeof(request_line)) <= 0) { return -1; } @@ -1915,11 +1904,10 @@ static int httpd_process_request(struct ast_tcptls_session_instance *ser) static void *httpd_helper_thread(void *data) { struct ast_tcptls_session_instance *ser = data; - struct protoent *p; - int flags; int timeout; + int arg = 1; - if (!ser || !ser->f) { + if (!ser) { ao2_cleanup(ser); return NULL; } @@ -1936,23 +1924,11 @@ static void *httpd_helper_thread(void *data) * This is necessary to prevent delays (caused by buffering) as we * write to the socket in bits and pieces. */ - p = getprotobyname("tcp"); - if (p) { - int arg = 1; - - if (setsockopt(ser->fd, p->p_proto, TCP_NODELAY, (char *) &arg, sizeof(arg) ) < 0) { - ast_log(LOG_WARNING, "Failed to set TCP_NODELAY on HTTP connection: %s\n", strerror(errno)); - ast_log(LOG_WARNING, "Some HTTP requests may be slow to respond.\n"); - } - } else { - ast_log(LOG_WARNING, "Failed to set TCP_NODELAY on HTTP connection, getprotobyname(\"tcp\") failed\n"); + if (setsockopt(ast_iostream_get_fd(ser->stream), IPPROTO_TCP, TCP_NODELAY, (char *) &arg, sizeof(arg) ) < 0) { + ast_log(LOG_WARNING, "Failed to set TCP_NODELAY on HTTP connection: %s\n", strerror(errno)); ast_log(LOG_WARNING, "Some HTTP requests may be slow to respond.\n"); } - - /* make sure socket is non-blocking */ - flags = fcntl(ser->fd, F_GETFL); - flags |= O_NONBLOCK; - fcntl(ser->fd, F_SETFL, flags); + ast_iostream_nonblock(ser->stream); /* Setup HTTP worker private data to keep track of request body reading. */ ao2_cleanup(ser->private_data); @@ -1975,23 +1951,17 @@ static void *httpd_helper_thread(void *data) } /* We can let the stream wait for data to arrive. */ - ast_tcptls_stream_set_exclusive_input(ser->stream_cookie, 1); + ast_iostream_set_exclusive_input(ser->stream, 1); for (;;) { - int ch; - /* Wait for next potential HTTP request message. */ - ast_tcptls_stream_set_timeout_inactivity(ser->stream_cookie, timeout); - ch = fgetc(ser->f); - if (ch == EOF || ungetc(ch, ser->f) == EOF) { - /* Between request idle timeout */ - ast_debug(1, "HTTP idle timeout or peer closed connection.\n"); + ast_iostream_set_timeout_idle_inactivity(ser->stream, timeout, session_inactivity); + if (httpd_process_request(ser)) { + /* Break the connection or the connection closed */ break; } - - ast_tcptls_stream_set_timeout_inactivity(ser->stream_cookie, session_inactivity); - if (httpd_process_request(ser) || !ser->f || feof(ser->f)) { - /* Break the connection or the connection closed */ + if (!ser->stream) { + /* Web-socket or similar that took the connection */ break; } @@ -2005,10 +1975,9 @@ static void *httpd_helper_thread(void *data) done: ast_atomic_fetchadd_int(&session_count, -1); - if (ser->f) { - ast_debug(1, "HTTP closing session. Top level\n"); - ast_tcptls_close_session_file(ser); - } + ast_debug(1, "HTTP closing session. Top level\n"); + ast_tcptls_close_session_file(ser); + ao2_ref(ser, -1); return NULL; } diff --git a/main/iostream.c b/main/iostream.c new file mode 100644 index 0000000..46abc18 --- /dev/null +++ b/main/iostream.c @@ -0,0 +1,553 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 1999 - 2015, Digium, Inc. + * + * Timo Teräs + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +#include +#include + +#include "asterisk.h" +#include "asterisk/utils.h" +#include "asterisk/astobj2.h" +#include "asterisk/iostream.h" + +struct ast_iostream { + SSL *ssl; + struct timeval start; + int fd; + int timeout; + int timeout_reset; + int exclusive_input; + int rbuflen; + char *rbufhead; + char rbuf[2048]; +}; + +int ast_iostream_get_fd(struct ast_iostream *stream) +{ + return stream->fd; +} + +void ast_iostream_nonblock(struct ast_iostream *stream) +{ + fcntl(stream->fd, F_SETFL, fcntl(stream->fd, F_GETFL) | O_NONBLOCK); +} + +SSL *ast_iostream_get_ssl(struct ast_iostream *stream) +{ + return stream->ssl; +} + +void ast_iostream_set_timeout_disable(struct ast_iostream *stream) +{ + ast_assert(stream != NULL); + + stream->timeout = -1; + stream->timeout_reset = -1; +} + +void ast_iostream_set_timeout_inactivity(struct ast_iostream *stream, int timeout) +{ + ast_assert(stream != NULL); + + stream->start.tv_sec = 0; + stream->timeout = timeout; + stream->timeout_reset = timeout; +} + +void ast_iostream_set_timeout_idle_inactivity(struct ast_iostream *stream, int timeout, int timeout_reset) +{ + ast_assert(stream != NULL); + + stream->start.tv_sec = 0; + stream->timeout = timeout; + stream->timeout_reset = timeout_reset; +} + +void ast_iostream_set_timeout_sequence(struct ast_iostream *stream, struct timeval start, int timeout) +{ + ast_assert(stream != NULL); + + stream->start = start; + stream->timeout = timeout; + stream->timeout_reset = timeout; +} + +void ast_iostream_set_exclusive_input(struct ast_iostream *stream, int exclusive_input) +{ + ast_assert(stream != NULL); + + stream->exclusive_input = exclusive_input; +} + +static ssize_t iostream_read(struct ast_iostream *stream, void *buf, size_t size) +{ + struct timeval start; + int ms; + int res; + + if (stream->start.tv_sec) { + start = stream->start; + } else { + start = ast_tvnow(); + } + +#if defined(DO_SSL) + if (stream->ssl) { + for (;;) { + res = SSL_read(stream->ssl, buf, size); + if (0 < res) { + /* We read some payload data. */ + stream->timeout = stream->timeout_reset; + return res; + } + switch (SSL_get_error(stream->ssl, res)) { + case SSL_ERROR_ZERO_RETURN: + /* Report EOF for a shutdown */ + ast_debug(1, "TLS clean shutdown alert reading data\n"); + return 0; + case SSL_ERROR_WANT_READ: + if (!stream->exclusive_input) { + /* We cannot wait for data now. */ + errno = EAGAIN; + return -1; + } + while ((ms = ast_remaining_ms(start, stream->timeout))) { + res = ast_wait_for_input(stream->fd, ms); + if (0 < res) { + /* Socket is ready to be read. */ + break; + } + if (res < 0) { + if (errno == EINTR || errno == EAGAIN) { + /* Try again. */ + continue; + } + ast_debug(1, "TLS socket error waiting for read data: %s\n", + strerror(errno)); + return -1; + } + } + break; + case SSL_ERROR_WANT_WRITE: + while ((ms = ast_remaining_ms(start, stream->timeout))) { + res = ast_wait_for_output(stream->fd, ms); + if (0 < res) { + /* Socket is ready to be written. */ + break; + } + if (res < 0) { + if (errno == EINTR || errno == EAGAIN) { + /* Try again. */ + continue; + } + ast_debug(1, "TLS socket error waiting for write space: %s\n", + strerror(errno)); + return -1; + } + } + break; + default: + /* Report EOF for an undecoded SSL or transport error. */ + ast_debug(1, "TLS transport or SSL error reading data\n"); + return 0; + } + if (!ms) { + /* Report EOF for a timeout */ + ast_debug(1, "TLS timeout reading data\n"); + return 0; + } + } + } +#endif /* defined(DO_SSL) */ + + for (;;) { + res = read(stream->fd, buf, size); + if (0 <= res) { + /* Got data or we cannot wait for it. */ + stream->timeout = stream->timeout_reset; + return res; + } + if (!stream->exclusive_input) { + return res; + } + if (errno != EINTR && errno != EAGAIN) { + /* Not a retryable error. */ + ast_debug(1, "TCP socket error reading data: %s\n", + strerror(errno)); + return -1; + } + ms = ast_remaining_ms(start, stream->timeout); + if (!ms) { + /* Report EOF for a timeout */ + ast_debug(1, "TCP timeout reading data\n"); + return 0; + } + ast_wait_for_input(stream->fd, ms); + } +} + +ssize_t ast_iostream_read(struct ast_iostream *stream, void *buf, size_t size) +{ + if (!size) { + /* You asked for no data you got no data. */ + return 0; + } + + if (!stream || stream->fd == -1) { + errno = EBADF; + return -1; + } + + /* Get any remains from the read buffer */ + if (stream->rbuflen) { + size_t r = size; + if (r > stream->rbuflen) { + r = stream->rbuflen; + } + memcpy(buf, stream->rbufhead, r); + stream->rbuflen -= r; + stream->rbufhead += r; + return r; + } + + return iostream_read(stream, buf, size); +} + +ssize_t ast_iostream_gets(struct ast_iostream *stream, char *buf, size_t count) +{ + ssize_t r; + char *newline; + + do { + /* Search for newline */ + newline = memchr(stream->rbufhead, '\n', stream->rbuflen); + if (newline) { + r = newline - stream->rbufhead + 1; + if (r > count-1) { + r = count-1; + } + break; + } + + /* Enough data? */ + if (stream->rbuflen >= count - 1) { + r = count - 1; + break; + } + + /* Try to fill in line buffer */ + if (stream->rbuflen && stream->rbuf != stream->rbufhead) { + memmove(&stream->rbuf, stream->rbufhead, stream->rbuflen); + } + stream->rbufhead = stream->rbuf; + + r = iostream_read(stream, stream->rbufhead + stream->rbuflen, sizeof(stream->rbuf) - stream->rbuflen); + if (r <= 0) { + return r; + } + stream->rbuflen += r; + } while (1); + + /* Return r bytes with termination byte */ + memcpy(buf, stream->rbufhead, r); + buf[r] = 0; + stream->rbuflen -= r; + stream->rbufhead += r; + + return r; +} + +ssize_t ast_iostream_discard(struct ast_iostream *stream, size_t size) +{ + char buf[1024]; + size_t remaining = size; + ssize_t ret; + + while (remaining) { + ret = ast_iostream_read(stream, buf, remaining > sizeof(buf) ? sizeof(buf) : remaining); + if (ret < 0) { + return ret; + } + remaining -= ret; + } + + return size; +} + +ssize_t ast_iostream_write(struct ast_iostream *stream, const void *buf, size_t size) +{ + struct timeval start; + int ms; + int res; + int written; + int remaining; + + if (!size) { + /* You asked to write no data you wrote no data. */ + return 0; + } + + if (!stream || stream->fd == -1) { + errno = EBADF; + return -1; + } + + if (stream->start.tv_sec) { + start = stream->start; + } else { + start = ast_tvnow(); + } + +#if defined(DO_SSL) + if (stream->ssl) { + written = 0; + remaining = size; + for (;;) { + res = SSL_write(stream->ssl, buf + written, remaining); + if (res == remaining) { + /* Everything was written. */ + return size; + } + if (0 < res) { + /* Successfully wrote part of the buffer. Try to write the rest. */ + written += res; + remaining -= res; + continue; + } + switch (SSL_get_error(stream->ssl, res)) { + case SSL_ERROR_ZERO_RETURN: + ast_debug(1, "TLS clean shutdown alert writing data\n"); + if (written) { + /* Report partial write. */ + return written; + } + errno = EBADF; + return -1; + case SSL_ERROR_WANT_READ: + ms = ast_remaining_ms(start, stream->timeout); + if (!ms) { + /* Report partial write. */ + ast_debug(1, "TLS timeout writing data (want read)\n"); + return written; + } + ast_wait_for_input(stream->fd, ms); + break; + case SSL_ERROR_WANT_WRITE: + ms = ast_remaining_ms(start, stream->timeout); + if (!ms) { + /* Report partial write. */ + ast_debug(1, "TLS timeout writing data (want write)\n"); + return written; + } + ast_wait_for_output(stream->fd, ms); + break; + default: + /* Undecoded SSL or transport error. */ + ast_debug(1, "TLS transport or SSL error writing data\n"); + if (written) { + /* Report partial write. */ + return written; + } + errno = EBADF; + return -1; + } + } + } +#endif /* defined(DO_SSL) */ + + written = 0; + remaining = size; + for (;;) { + res = write(stream->fd, buf + written, remaining); + if (res == remaining) { + /* Yay everything was written. */ + return size; + } + if (0 < res) { + /* Successfully wrote part of the buffer. Try to write the rest. */ + written += res; + remaining -= res; + continue; + } + if (errno != EINTR && errno != EAGAIN) { + /* Not a retryable error. */ + ast_debug(1, "TCP socket error writing: %s\n", strerror(errno)); + if (written) { + return written; + } + return -1; + } + ms = ast_remaining_ms(start, stream->timeout); + if (!ms) { + /* Report partial write. */ + ast_debug(1, "TCP timeout writing data\n"); + return written; + } + ast_wait_for_output(stream->fd, ms); + } +} + +ssize_t ast_iostream_printf(struct ast_iostream *stream, const void *fmt, ...) +{ + char sbuf[256], *buf = sbuf; + int len, len2, ret = -1; + va_list va; + + va_start(va, fmt); + len = vsnprintf(buf, sizeof(sbuf), fmt, va); + va_end(va); + + if (len > sizeof(sbuf)) { + buf = ast_malloc(len); + if (!buf) { + return -1; + } + va_start(va, fmt); + len2 = vsnprintf(buf, len, fmt, va); + va_end(va); + if (len2 > len) { + goto error; + } + } + + if (ast_iostream_write(stream, buf, len) == len) + ret = len; + +error: + if (buf != sbuf) { + ast_free(buf); + } + + return ret; +} + +int ast_iostream_close(struct ast_iostream *stream) +{ + if (!stream) { + errno = EBADF; + return -1; + } + + if (stream->fd != -1) { +#if defined(DO_SSL) + if (stream->ssl) { + int res; + + /* + * According to the TLS standard, it is acceptable for an + * application to only send its shutdown alert and then + * close the underlying connection without waiting for + * the peer's response (this way resources can be saved, + * as the process can already terminate or serve another + * connection). + */ + res = SSL_shutdown(stream->ssl); + if (res < 0) { + ast_log(LOG_ERROR, "SSL_shutdown() failed: %d\n", + SSL_get_error(stream->ssl, res)); + } + + if (!stream->ssl->server) { + /* For client threads, ensure that the error stack is cleared */ + ERR_remove_state(0); + } + + SSL_free(stream->ssl); + stream->ssl = NULL; + } +#endif /* defined(DO_SSL) */ + + /* + * Issuing shutdown() is necessary here to avoid a race + * condition where the last data written may not appear + * in the TCP stream. See ASTERISK-23548 + */ + shutdown(stream->fd, SHUT_RDWR); + if (close(stream->fd)) { + ast_log(LOG_ERROR, "close() failed: %s\n", strerror(errno)); + } + stream->fd = -1; + } + ao2_t_ref(stream, -1, "Closed ast_iostream"); + + return 0; +} + +static void iostream_dtor(void *cookie) +{ +#ifdef AST_DEVMODE + /* Since the ast_assert below is the only one using stream, + * and ast_assert is only available with AST_DEVMODE, we + * put this in a conditional to avoid compiler warnings. */ + struct ast_iostream *stream = cookie; +#endif + + ast_assert(stream->fd == -1); +} + +struct ast_iostream *ast_iostream_from_fd(int *fd) +{ + struct ast_iostream *stream; + + stream = ao2_alloc_options(sizeof(*stream), iostream_dtor, + AO2_ALLOC_OPT_LOCK_NOLOCK); + if (stream) { + stream->timeout = -1; + stream->timeout_reset = -1; + stream->fd = *fd; + *fd = -1; + } + + return stream; +} + +int ast_iostream_start_tls(struct ast_iostream **pstream, SSL_CTX *ssl_ctx, int client) +{ +#ifdef DO_SSL + struct ast_iostream *stream = *pstream; + int (*ssl_setup)(SSL *) = client ? SSL_connect : SSL_accept; + char err[256]; + + stream->ssl = SSL_new(ssl_ctx); + if (!stream->ssl) { + ast_log(LOG_ERROR, "Unable to create new SSL connection\n"); + errno = ENOMEM; + return -1; + } + + /* + * This function takes struct ast_iostream **, so it can chain + * SSL over any ast_iostream. For now we assume it's a file descriptor. + * But later this should instead use BIO wrapper to tie SSL to another + * ast_iostream. + */ + SSL_set_fd(stream->ssl, stream->fd); + + if (ssl_setup(stream->ssl) <= 0) { + ast_log(LOG_ERROR, "Problem setting up ssl connection: %s\n", + ERR_error_string(ERR_get_error(), err)); + errno = EIO; + return -1; + } + + return 0; +#else + ast_log(LOG_ERROR, "SSL not enabled in this build\n"); + errno = ENOTSUP; + return -1; +#endif +} diff --git a/main/manager.c b/main/manager.c index 32322b8..8c3220e 100644 --- a/main/manager.c +++ b/main/manager.c @@ -1551,8 +1551,7 @@ static void acl_change_stasis_unsubscribe(void) struct mansession_session { /*! \todo XXX need to document which fields it is protecting */ struct ast_sockaddr addr; /*!< address we are connecting from */ - FILE *f; /*!< fdopen() on the underlying fd */ - int fd; /*!< descriptor used for output. Either the socket (AMI) or a temporary file (HTTP) */ + struct ast_iostream *stream; /*!< AMI stream */ int inuse; /*!< number of HTTP sessions using this entry */ int needdestroy; /*!< Whether an HTTP session should be destroyed */ pthread_t waiting_thread; /*!< Sleeping thread using this descriptor */ @@ -1594,9 +1593,8 @@ enum mansession_message_parsing { */ struct mansession { struct mansession_session *session; + struct ast_iostream *stream; struct ast_tcptls_session_instance *tcptls_session; - FILE *f; - int fd; enum mansession_message_parsing parsing; int write_error:1; struct manager_custom_hook *hook; @@ -2168,10 +2166,6 @@ static void session_destructor(void *obj) ast_datastore_free(datastore); } - if (session->f != NULL) { - fflush(session->f); - fclose(session->f); - } if (eqe) { ast_atomic_fetchadd_int(&eqe->usecount, -1); } @@ -2206,7 +2200,6 @@ static struct mansession_session *build_mansession(const struct ast_sockaddr *ad return NULL; } - newsession->fd = -1; newsession->waiting_thread = AST_PTHREADT_NULL; newsession->writetimeout = 100; newsession->send_events = -1; @@ -2619,7 +2612,7 @@ static char *handle_showmanconn(struct ast_cli_entry *e, int cmd, struct ast_cli ast_sockaddr_stringify_addr(&session->addr), (int) (session->sessionstart), (int) (now - session->sessionstart), - session->fd, + session->stream ? ast_iostream_get_fd(session->stream) : -1, session->inuse, session->readperm, session->writeperm); @@ -2891,7 +2884,6 @@ int ast_hook_send_action(struct manager_custom_hook *hook, const char *msg) * This is necessary to meet the previous design of manager.c */ s.hook = hook; - s.f = (void*)1; /* set this to something so our request will make it through all functions that test it*/ ao2_lock(act_found); if (act_found->registered && act_found->func) { @@ -2922,9 +2914,8 @@ int ast_hook_send_action(struct manager_custom_hook *hook, const char *msg) */ static int send_string(struct mansession *s, char *string) { - int res; - FILE *f = s->f ? s->f : s->session->f; - int fd = s->f ? s->fd : s->session->fd; + struct ast_iostream *stream = s->stream ? s->stream : s->session->stream; + int len, res; /* It's a result from one of the hook's action invocation */ if (s->hook) { @@ -2936,7 +2927,12 @@ static int send_string(struct mansession *s, char *string) return 0; } - if ((res = ast_careful_fwrite(f, fd, string, strlen(string), s->session->writetimeout))) { + len = strlen(string); + ast_iostream_set_timeout_inactivity(stream, s->session->writetimeout); + res = ast_iostream_write(stream, string, len); + ast_iostream_set_timeout_disable(stream); + + if (res < len) { s->write_error = 1; } @@ -2977,10 +2973,10 @@ void astman_append(struct mansession *s, const char *fmt, ...) return; } - if (s->f != NULL || s->session->f != NULL) { + if (s->tcptls_session != NULL && s->tcptls_session->stream != NULL) { send_string(s, ast_str_buffer(buf)); } else { - ast_verbose("fd == -1 in astman_append, should not happen\n"); + ast_verbose("No connection stream in astman_append, should not happen\n"); } } @@ -4121,7 +4117,7 @@ static int action_waitevent(struct mansession *s, const struct message *m) break; } if (s->session->managerid == 0) { /* AMI session */ - if (ast_wait_for_input(s->session->fd, 1000)) { + if (ast_wait_for_input(ast_iostream_get_fd(s->session->stream), 1000)) { break; } } else { /* HTTP session */ @@ -5926,7 +5922,7 @@ static int process_events(struct mansession *s) int ret = 0; ao2_lock(s->session); - if (s->session->f != NULL) { + if (s->session->stream != NULL) { struct eventqent *eqe = s->session->last_ev; while ((eqe = advance_event(eqe))) { @@ -6468,7 +6464,7 @@ static int get_input(struct mansession *s, char *output) s->session->waiting_thread = pthread_self(); ao2_unlock(s->session); - res = ast_wait_for_input(s->session->fd, timeout); + res = ast_wait_for_input(ast_iostream_get_fd(s->session->stream), timeout); ao2_lock(s->session); s->session->waiting_thread = AST_PTHREADT_NULL; @@ -6486,7 +6482,7 @@ static int get_input(struct mansession *s, char *output) } ao2_lock(s->session); - res = fread(src + s->session->inlen, 1, maxlen - s->session->inlen, s->session->f); + res = ast_iostream_read(s->session->stream, src + s->session->inlen, maxlen - s->session->inlen); if (res < 1) { res = -1; /* error return */ } else { @@ -6619,13 +6615,12 @@ static void *session_do(void *data) struct mansession s = { .tcptls_session = data, }; - int flags; int res; + int arg = 1; struct ast_sockaddr ser_remote_address_tmp; - struct protoent *p; if (ast_atomic_fetchadd_int(&unauth_sessions, +1) >= authlimit) { - fclose(ser->f); + ast_iostream_close(ser->stream); ast_atomic_fetchadd_int(&unauth_sessions, -1); goto done; } @@ -6634,7 +6629,7 @@ static void *session_do(void *data) session = build_mansession(&ser_remote_address_tmp); if (session == NULL) { - fclose(ser->f); + ast_iostream_close(ser->stream); ast_atomic_fetchadd_int(&unauth_sessions, -1); goto done; } @@ -6642,20 +6637,10 @@ static void *session_do(void *data) /* here we set TCP_NODELAY on the socket to disable Nagle's algorithm. * This is necessary to prevent delays (caused by buffering) as we * write to the socket in bits and pieces. */ - p = getprotobyname("tcp"); - if (p) { - int arg = 1; - if( setsockopt(ser->fd, p->p_proto, TCP_NODELAY, (char *)&arg, sizeof(arg) ) < 0 ) { - ast_log(LOG_WARNING, "Failed to set manager tcp connection to TCP_NODELAY mode: %s\nSome manager actions may be slow to respond.\n", strerror(errno)); - } - } else { - ast_log(LOG_WARNING, "Failed to set manager tcp connection to TCP_NODELAY, getprotobyname(\"tcp\") failed\nSome manager actions may be slow to respond.\n"); + if (setsockopt(ast_iostream_get_fd(ser->stream), IPPROTO_TCP, TCP_NODELAY, (char *)&arg, sizeof(arg) ) < 0) { + ast_log(LOG_WARNING, "Failed to set manager tcp connection to TCP_NODELAY mode: %s\nSome manager actions may be slow to respond.\n", strerror(errno)); } - - /* make sure socket is non-blocking */ - flags = fcntl(ser->fd, F_GETFL); - flags |= O_NONBLOCK; - fcntl(ser->fd, F_SETFL, flags); + ast_iostream_nonblock(ser->stream); ao2_lock(session); /* Hook to the tail of the event queue */ @@ -6664,8 +6649,7 @@ static void *session_do(void *data) ast_mutex_init(&s.lock); /* these fields duplicate those in the 'ser' structure */ - session->fd = s.fd = ser->fd; - session->f = s.f = ser->f; + session->stream = s.stream = ser->stream; ast_sockaddr_copy(&session->addr, &ser_remote_address_tmp); s.session = session; @@ -6684,9 +6668,9 @@ static void *session_do(void *data) * We cannot let the stream exclusively wait for data to arrive. * We have to wake up the task to send async events. */ - ast_tcptls_stream_set_exclusive_input(ser->stream_cookie, 0); + ast_iostream_set_exclusive_input(ser->stream, 0); - ast_tcptls_stream_set_timeout_sequence(ser->stream_cookie, + ast_iostream_set_timeout_sequence(ser->stream, ast_tvnow(), authtimeout * 1000); astman_append(&s, "Asterisk Call Manager/%s\r\n", AMI_VERSION); /* welcome prompt */ @@ -6695,7 +6679,7 @@ static void *session_do(void *data) break; } if (session->authenticated) { - ast_tcptls_stream_set_timeout_disable(ser->stream_cookie); + ast_iostream_set_timeout_disable(ser->stream); } } /* session is over, explain why and terminate */ @@ -7554,23 +7538,9 @@ static void xml_translate(struct ast_str **out, char *in, struct ast_variable *g static void close_mansession_file(struct mansession *s) { - if (s->f) { - if (fclose(s->f)) { - ast_log(LOG_ERROR, "fclose() failed: %s\n", strerror(errno)); - } - s->f = NULL; - s->fd = -1; - } else if (s->fd != -1) { - /* - * Issuing shutdown() is necessary here to avoid a race - * condition where the last data written may not appear - * in the TCP stream. See ASTERISK-23548 - */ - shutdown(s->fd, SHUT_RDWR); - if (close(s->fd)) { - ast_log(LOG_ERROR, "close() failed: %s\n", strerror(errno)); - } - s->fd = -1; + if (s->stream) { + ast_iostream_close(s->stream); + s->stream = NULL; } else { ast_log(LOG_ERROR, "Attempted to close file/file descriptor on mansession without a valid file or file descriptor.\n"); } @@ -7579,17 +7549,20 @@ static void close_mansession_file(struct mansession *s) static void process_output(struct mansession *s, struct ast_str **out, struct ast_variable *params, enum output_format format) { char *buf; - size_t l; + off_t l; + int fd; - if (!s->f) + if (!s->stream) return; /* Ensure buffer is NULL-terminated */ - fprintf(s->f, "%c", 0); - fflush(s->f); + ast_iostream_write(s->stream, "", 1); + + fd = ast_iostream_get_fd(s->stream); - if ((l = ftell(s->f)) > 0) { - if (MAP_FAILED == (buf = mmap(NULL, l, PROT_READ | PROT_WRITE, MAP_PRIVATE, s->fd, 0))) { + l = lseek(fd, SEEK_CUR, 0); + if (l > 0) { + if (MAP_FAILED == (buf = mmap(NULL, l, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0))) { ast_log(LOG_WARNING, "mmap failed. Manager output was not processed\n"); } else { if (format == FORMAT_XML || format == FORMAT_HTML) { @@ -7616,6 +7589,7 @@ static int generic_http_callback(struct ast_tcptls_session_instance *ser, struct mansession s = { .session = NULL, .tcptls_session = ser }; struct mansession_session *session = NULL; uint32_t ident; + int fd; int blastaway = 0; struct ast_variable *v; struct ast_variable *params = get_params; @@ -7671,17 +7645,17 @@ static int generic_http_callback(struct ast_tcptls_session_instance *ser, } s.session = session; - s.fd = mkstemp(template); /* create a temporary file for command output */ + fd = mkstemp(template); /* create a temporary file for command output */ unlink(template); - if (s.fd <= -1) { + if (fd <= -1) { ast_http_error(ser, 500, "Server Error", "Internal Server Error (mkstemp failed)"); goto generic_callback_out; } - s.f = fdopen(s.fd, "w+"); - if (!s.f) { + s.stream = ast_iostream_from_fd(&fd); + if (!s.stream) { ast_log(LOG_WARNING, "HTTP Manager, fdopen failed: %s!\n", strerror(errno)); ast_http_error(ser, 500, "Server Error", "Internal Server Error (fdopen failed)"); - close(s.fd); + close(fd); goto generic_callback_out; } @@ -7821,9 +7795,9 @@ generic_callback_out: if (blastaway) { session_destroy(session); } else { - if (session->f) { - fclose(session->f); - session->f = NULL; + if (session->stream) { + ast_iostream_close(session->stream); + session->stream = NULL; } unref_mansession(session); } @@ -7848,6 +7822,7 @@ static int auth_http_callback(struct ast_tcptls_session_instance *ser, struct message m = { 0 }; unsigned int idx; size_t hdrlen; + int fd; time_t time_now = time(NULL); unsigned long nonce = 0, nc; @@ -8026,17 +8001,17 @@ static int auth_http_callback(struct ast_tcptls_session_instance *ser, ast_mutex_init(&s.lock); s.session = session; - s.fd = mkstemp(template); /* create a temporary file for command output */ + fd = mkstemp(template); /* create a temporary file for command output */ unlink(template); - if (s.fd <= -1) { + if (fd <= -1) { ast_http_error(ser, 500, "Server Error", "Internal Server Error (mkstemp failed)"); goto auth_callback_out; } - s.f = fdopen(s.fd, "w+"); - if (!s.f) { + s.stream = ast_iostream_from_fd(&fd); + if (!s.stream) { ast_log(LOG_WARNING, "HTTP Manager, fdopen failed: %s!\n", strerror(errno)); ast_http_error(ser, 500, "Server Error", "Internal Server Error (fdopen failed)"); - close(s.fd); + close(fd); goto auth_callback_out; } @@ -8087,7 +8062,7 @@ static int auth_http_callback(struct ast_tcptls_session_instance *ser, m.headers[idx] = NULL; } - result_size = ftell(s.f); /* Calculate approx. size of result */ + result_size = lseek(ast_iostream_get_fd(s.stream), SEEK_CUR, 0); /* Calculate approx. size of result */ http_header = ast_str_create(80); out = ast_str_create(result_size * 2 + 512); @@ -8139,11 +8114,10 @@ auth_callback_out: ast_free(out); ao2_lock(session); - if (session->f) { - fclose(session->f); + if (session->stream) { + ast_iostream_close(session->stream); + session->stream = NULL; } - session->f = NULL; - session->fd = -1; ao2_unlock(session); if (session->needdestroy) { diff --git a/main/tcptls.c b/main/tcptls.c index 046501b..44b609f 100644 --- a/main/tcptls.c +++ b/main/tcptls.c @@ -49,506 +49,13 @@ ASTERISK_REGISTER_FILE() #include "asterisk/astobj2.h" #include "asterisk/pbx.h" -/*! ao2 object used for the FILE stream fopencookie()/funopen() cookie. */ -struct ast_tcptls_stream { - /*! SSL state if not NULL */ - SSL *ssl; - /*! - * \brief Start time from when an I/O sequence must complete - * by struct ast_tcptls_stream.timeout. - * - * \note If struct ast_tcptls_stream.start.tv_sec is zero then - * start time is the current I/O request. - */ - struct timeval start; - /*! - * \brief The socket returned by accept(). - * - * \note Set to -1 if the stream is closed. - */ - int fd; - /*! - * \brief Timeout in ms relative to struct ast_tcptls_stream.start - * to wait for an event on struct ast_tcptls_stream.fd. - * - * \note Set to -1 to disable timeout. - * \note The socket needs to be set to non-blocking for the timeout - * feature to work correctly. - */ - int timeout; - /*! TRUE if stream can exclusively wait for fd input. */ - int exclusive_input; -}; - -void ast_tcptls_stream_set_timeout_disable(struct ast_tcptls_stream *stream) -{ - ast_assert(stream != NULL); - - stream->timeout = -1; -} - -void ast_tcptls_stream_set_timeout_inactivity(struct ast_tcptls_stream *stream, int timeout) -{ - ast_assert(stream != NULL); - - stream->start.tv_sec = 0; - stream->timeout = timeout; -} - -void ast_tcptls_stream_set_timeout_sequence(struct ast_tcptls_stream *stream, struct timeval start, int timeout) -{ - ast_assert(stream != NULL); - - stream->start = start; - stream->timeout = timeout; -} - -void ast_tcptls_stream_set_exclusive_input(struct ast_tcptls_stream *stream, int exclusive_input) -{ - ast_assert(stream != NULL); - - stream->exclusive_input = exclusive_input; -} - -/*! - * \internal - * \brief fopencookie()/funopen() stream read function. - * - * \param cookie Stream control data. - * \param buf Where to put read data. - * \param size Size of the buffer. - * - * \retval number of bytes put into buf. - * \retval 0 on end of file. - * \retval -1 on error. - */ -static HOOK_T tcptls_stream_read(void *cookie, char *buf, LEN_T size) -{ - struct ast_tcptls_stream *stream = cookie; - struct timeval start; - int ms; - int res; - - if (!size) { - /* You asked for no data you got no data. */ - return 0; - } - - if (!stream || stream->fd == -1) { - errno = EBADF; - return -1; - } - - if (stream->start.tv_sec) { - start = stream->start; - } else { - start = ast_tvnow(); - } - -#if defined(DO_SSL) - if (stream->ssl) { - for (;;) { - res = SSL_read(stream->ssl, buf, size); - if (0 < res) { - /* We read some payload data. */ - return res; - } - switch (SSL_get_error(stream->ssl, res)) { - case SSL_ERROR_ZERO_RETURN: - /* Report EOF for a shutdown */ - ast_debug(1, "TLS clean shutdown alert reading data\n"); - return 0; - case SSL_ERROR_WANT_READ: - if (!stream->exclusive_input) { - /* We cannot wait for data now. */ - errno = EAGAIN; - return -1; - } - while ((ms = ast_remaining_ms(start, stream->timeout))) { - res = ast_wait_for_input(stream->fd, ms); - if (0 < res) { - /* Socket is ready to be read. */ - break; - } - if (res < 0) { - if (errno == EINTR || errno == EAGAIN) { - /* Try again. */ - continue; - } - ast_debug(1, "TLS socket error waiting for read data: %s\n", - strerror(errno)); - return -1; - } - } - break; - case SSL_ERROR_WANT_WRITE: - while ((ms = ast_remaining_ms(start, stream->timeout))) { - res = ast_wait_for_output(stream->fd, ms); - if (0 < res) { - /* Socket is ready to be written. */ - break; - } - if (res < 0) { - if (errno == EINTR || errno == EAGAIN) { - /* Try again. */ - continue; - } - ast_debug(1, "TLS socket error waiting for write space: %s\n", - strerror(errno)); - return -1; - } - } - break; - default: - /* Report EOF for an undecoded SSL or transport error. */ - ast_debug(1, "TLS transport or SSL error reading data\n"); - return 0; - } - if (!ms) { - /* Report EOF for a timeout */ - ast_debug(1, "TLS timeout reading data\n"); - return 0; - } - } - } -#endif /* defined(DO_SSL) */ - - for (;;) { - res = read(stream->fd, buf, size); - if (0 <= res || !stream->exclusive_input) { - /* Got data or we cannot wait for it. */ - return res; - } - if (errno != EINTR && errno != EAGAIN) { - /* Not a retryable error. */ - ast_debug(1, "TCP socket error reading data: %s\n", - strerror(errno)); - return -1; - } - ms = ast_remaining_ms(start, stream->timeout); - if (!ms) { - /* Report EOF for a timeout */ - ast_debug(1, "TCP timeout reading data\n"); - return 0; - } - ast_wait_for_input(stream->fd, ms); - } -} - -/*! - * \internal - * \brief fopencookie()/funopen() stream write function. - * - * \param cookie Stream control data. - * \param buf Where to get data to write. - * \param size Size of the buffer. - * - * \retval number of bytes written from buf. - * \retval -1 on error. - */ -static HOOK_T tcptls_stream_write(void *cookie, const char *buf, LEN_T size) -{ - struct ast_tcptls_stream *stream = cookie; - struct timeval start; - int ms; - int res; - int written; - int remaining; - - if (!size) { - /* You asked to write no data you wrote no data. */ - return 0; - } - - if (!stream || stream->fd == -1) { - errno = EBADF; - return -1; - } - - if (stream->start.tv_sec) { - start = stream->start; - } else { - start = ast_tvnow(); - } - -#if defined(DO_SSL) - if (stream->ssl) { - written = 0; - remaining = size; - for (;;) { - res = SSL_write(stream->ssl, buf + written, remaining); - if (res == remaining) { - /* Everything was written. */ - return size; - } - if (0 < res) { - /* Successfully wrote part of the buffer. Try to write the rest. */ - written += res; - remaining -= res; - continue; - } - switch (SSL_get_error(stream->ssl, res)) { - case SSL_ERROR_ZERO_RETURN: - ast_debug(1, "TLS clean shutdown alert writing data\n"); - if (written) { - /* Report partial write. */ - return written; - } - errno = EBADF; - return -1; - case SSL_ERROR_WANT_READ: - ms = ast_remaining_ms(start, stream->timeout); - if (!ms) { - /* Report partial write. */ - ast_debug(1, "TLS timeout writing data (want read)\n"); - return written; - } - ast_wait_for_input(stream->fd, ms); - break; - case SSL_ERROR_WANT_WRITE: - ms = ast_remaining_ms(start, stream->timeout); - if (!ms) { - /* Report partial write. */ - ast_debug(1, "TLS timeout writing data (want write)\n"); - return written; - } - ast_wait_for_output(stream->fd, ms); - break; - default: - /* Undecoded SSL or transport error. */ - ast_debug(1, "TLS transport or SSL error writing data\n"); - if (written) { - /* Report partial write. */ - return written; - } - errno = EBADF; - return -1; - } - } - } -#endif /* defined(DO_SSL) */ - - written = 0; - remaining = size; - for (;;) { - res = write(stream->fd, buf + written, remaining); - if (res == remaining) { - /* Yay everything was written. */ - return size; - } - if (0 < res) { - /* Successfully wrote part of the buffer. Try to write the rest. */ - written += res; - remaining -= res; - continue; - } - if (errno != EINTR && errno != EAGAIN) { - /* Not a retryable error. */ - ast_debug(1, "TCP socket error writing: %s\n", strerror(errno)); - if (written) { - return written; - } - return -1; - } - ms = ast_remaining_ms(start, stream->timeout); - if (!ms) { - /* Report partial write. */ - ast_debug(1, "TCP timeout writing data\n"); - return written; - } - ast_wait_for_output(stream->fd, ms); - } -} - -/*! - * \internal - * \brief fopencookie()/funopen() stream close function. - * - * \param cookie Stream control data. - * - * \retval 0 on success. - * \retval -1 on error. - */ -static int tcptls_stream_close(void *cookie) -{ - struct ast_tcptls_stream *stream = cookie; - - if (!stream) { - errno = EBADF; - return -1; - } - - if (stream->fd != -1) { -#if defined(DO_SSL) - if (stream->ssl) { - int res; - - /* - * According to the TLS standard, it is acceptable for an - * application to only send its shutdown alert and then - * close the underlying connection without waiting for - * the peer's response (this way resources can be saved, - * as the process can already terminate or serve another - * connection). - */ - res = SSL_shutdown(stream->ssl); - if (res < 0) { - ast_log(LOG_ERROR, "SSL_shutdown() failed: %d\n", - SSL_get_error(stream->ssl, res)); - } - - if (!stream->ssl->server) { - /* For client threads, ensure that the error stack is cleared */ -#if OPENSSL_VERSION_NUMBER >= 0x10000000L - ERR_remove_thread_state(NULL); -#else - ERR_remove_state(0); -#endif /* OPENSSL_VERSION_NUMBER >= 0x10000000L */ - } - - SSL_free(stream->ssl); - stream->ssl = NULL; - } -#endif /* defined(DO_SSL) */ - - /* - * Issuing shutdown() is necessary here to avoid a race - * condition where the last data written may not appear - * in the TCP stream. See ASTERISK-23548 - */ - shutdown(stream->fd, SHUT_RDWR); - if (close(stream->fd)) { - ast_log(LOG_ERROR, "close() failed: %s\n", strerror(errno)); - } - stream->fd = -1; - } - ao2_t_ref(stream, -1, "Closed tcptls stream cookie"); - - return 0; -} - -/*! - * \internal - * \brief fopencookie()/funopen() stream destructor function. - * - * \param cookie Stream control data. - * - * \return Nothing - */ -static void tcptls_stream_dtor(void *cookie) -{ -#ifdef AST_DEVMODE - /* Since the ast_assert below is the only one using stream, - * and ast_assert is only available with AST_DEVMODE, we - * put this in a conditional to avoid compiler warnings. */ - struct ast_tcptls_stream *stream = cookie; -#endif - - ast_assert(stream->fd == -1); -} - -/*! - * \internal - * \brief fopencookie()/funopen() stream allocation function. - * - * \retval stream_cookie on success. - * \retval NULL on error. - */ -static struct ast_tcptls_stream *tcptls_stream_alloc(void) -{ - struct ast_tcptls_stream *stream; - - stream = ao2_alloc_options(sizeof(*stream), tcptls_stream_dtor, - AO2_ALLOC_OPT_LOCK_NOLOCK); - if (stream) { - stream->fd = -1; - stream->timeout = -1; - } - return stream; -} - -/*! - * \internal - * \brief Open a custom FILE stream for tcptls. - * - * \param stream Stream cookie control data. - * \param ssl SSL state if not NULL. - * \param fd Socket file descriptor. - * \param timeout ms to wait for an event on fd. -1 if timeout disabled. - * - * \retval fp on success. - * \retval NULL on error. - */ -static FILE *tcptls_stream_fopen(struct ast_tcptls_stream *stream, SSL *ssl, int fd, int timeout) -{ - FILE *fp; - -#if defined(HAVE_FOPENCOOKIE) /* the glibc/linux interface */ - static const cookie_io_functions_t cookie_funcs = { - tcptls_stream_read, - tcptls_stream_write, - NULL, - tcptls_stream_close - }; -#endif /* defined(HAVE_FOPENCOOKIE) */ - - if (fd == -1) { - /* Socket not open. */ - return NULL; - } - - stream->ssl = ssl; - stream->fd = fd; - stream->timeout = timeout; - ao2_t_ref(stream, +1, "Opening tcptls stream cookie"); - -#if defined(HAVE_FUNOPEN) /* the BSD interface */ - fp = funopen(stream, tcptls_stream_read, tcptls_stream_write, NULL, - tcptls_stream_close); -#elif defined(HAVE_FOPENCOOKIE) /* the glibc/linux interface */ - fp = fopencookie(stream, "w+", cookie_funcs); -#else - /* could add other methods here */ - ast_debug(2, "No stream FILE methods attempted!\n"); - fp = NULL; -#endif - - if (!fp) { - stream->fd = -1; - ao2_t_ref(stream, -1, "Failed to open tcptls stream cookie"); - } - return fp; -} - -HOOK_T ast_tcptls_server_read(struct ast_tcptls_session_instance *tcptls_session, void *buf, size_t count) -{ - if (!tcptls_session->stream_cookie || tcptls_session->stream_cookie->fd == -1) { - ast_log(LOG_ERROR, "TCP/TLS read called on invalid stream.\n"); - errno = EIO; - return -1; - } - - return tcptls_stream_read(tcptls_session->stream_cookie, buf, count); -} - -HOOK_T ast_tcptls_server_write(struct ast_tcptls_session_instance *tcptls_session, const void *buf, size_t count) -{ - if (!tcptls_session->stream_cookie || tcptls_session->stream_cookie->fd == -1) { - ast_log(LOG_ERROR, "TCP/TLS write called on invalid stream.\n"); - errno = EIO; - return -1; - } - - return tcptls_stream_write(tcptls_session->stream_cookie, buf, count); -} - static void session_instance_destructor(void *obj) { struct ast_tcptls_session_instance *i = obj; - if (i->stream_cookie) { - ao2_t_ref(i->stream_cookie, -1, "Destroying tcptls session instance"); - i->stream_cookie = NULL; + if (i->stream) { + ast_iostream_close(i->stream); + i->stream = NULL; } ast_free(i->overflow_buf); ao2_cleanup(i->private_data); @@ -593,9 +100,7 @@ static void *handle_tcptls_connection(void *data) { struct ast_tcptls_session_instance *tcptls_session = data; #ifdef DO_SSL - int (*ssl_setup)(SSL *) = (tcptls_session->client) ? SSL_connect : SSL_accept; - int ret; - char err[256]; + SSL *ssl; #endif /* TCP/TLS connections are associated with external protocols, and @@ -610,123 +115,94 @@ static void *handle_tcptls_connection(void *data) return NULL; } - tcptls_session->stream_cookie = tcptls_stream_alloc(); - if (!tcptls_session->stream_cookie) { - ast_tcptls_close_session_file(tcptls_session); - ao2_ref(tcptls_session, -1); - return NULL; - } + if (tcptls_session->parent->tls_cfg) { +#ifdef DO_SSL + if (ast_iostream_start_tls(&tcptls_session->stream, tcptls_session->parent->tls_cfg->ssl_ctx, tcptls_session->client) < 0) { + ast_tcptls_close_session_file(tcptls_session); + ao2_ref(tcptls_session, -1); + return NULL; + } - /* - * open a FILE * as appropriate. - */ - if (!tcptls_session->parent->tls_cfg) { - tcptls_session->f = tcptls_stream_fopen(tcptls_session->stream_cookie, NULL, - tcptls_session->fd, -1); - if (tcptls_session->f) { - if (setvbuf(tcptls_session->f, NULL, _IONBF, 0)) { + ssl = ast_iostream_get_ssl(tcptls_session->stream); + if ((tcptls_session->client && !ast_test_flag(&tcptls_session->parent->tls_cfg->flags, AST_SSL_DONT_VERIFY_SERVER)) + || (!tcptls_session->client && ast_test_flag(&tcptls_session->parent->tls_cfg->flags, AST_SSL_VERIFY_CLIENT))) { + X509 *peer; + long res; + peer = SSL_get_peer_certificate(ssl); + if (!peer) { + ast_log(LOG_ERROR, "No peer SSL certificate to verify\n"); ast_tcptls_close_session_file(tcptls_session); + ao2_ref(tcptls_session, -1); + return NULL; } - } - } -#ifdef DO_SSL - else if ( (tcptls_session->ssl = SSL_new(tcptls_session->parent->tls_cfg->ssl_ctx)) ) { - SSL_set_fd(tcptls_session->ssl, tcptls_session->fd); - if ((ret = ssl_setup(tcptls_session->ssl)) <= 0) { - ast_log(LOG_ERROR, "Problem setting up ssl connection: %s\n", ERR_error_string(ERR_get_error(), err)); - } else if ((tcptls_session->f = tcptls_stream_fopen(tcptls_session->stream_cookie, - tcptls_session->ssl, tcptls_session->fd, -1))) { - if ((tcptls_session->client && !ast_test_flag(&tcptls_session->parent->tls_cfg->flags, AST_SSL_DONT_VERIFY_SERVER)) - || (!tcptls_session->client && ast_test_flag(&tcptls_session->parent->tls_cfg->flags, AST_SSL_VERIFY_CLIENT))) { - X509 *peer; - long res; - peer = SSL_get_peer_certificate(tcptls_session->ssl); - if (!peer) { - ast_log(LOG_ERROR, "No peer SSL certificate to verify\n"); - ast_tcptls_close_session_file(tcptls_session); - ao2_ref(tcptls_session, -1); - return NULL; - } - res = SSL_get_verify_result(tcptls_session->ssl); - if (res != X509_V_OK) { - ast_log(LOG_ERROR, "Certificate did not verify: %s\n", X509_verify_cert_error_string(res)); - X509_free(peer); - ast_tcptls_close_session_file(tcptls_session); - ao2_ref(tcptls_session, -1); - return NULL; - } - if (!ast_test_flag(&tcptls_session->parent->tls_cfg->flags, AST_SSL_IGNORE_COMMON_NAME)) { - ASN1_STRING *str; - X509_NAME *name = X509_get_subject_name(peer); - STACK_OF(GENERAL_NAME) *alt_names; - int pos = -1; - int found = 0; - - for (;;) { - /* Walk the certificate to check all available "Common Name" */ - /* XXX Probably should do a gethostbyname on the hostname and compare that as well */ - pos = X509_NAME_get_index_by_NID(name, NID_commonName, pos); - if (pos < 0) { - break; - } - - str = X509_NAME_ENTRY_get_data(X509_NAME_get_entry(name, pos)); - if (!check_tcptls_cert_name(str, tcptls_session->parent->hostname, "common name")) { - found = 1; - break; - } + res = SSL_get_verify_result(ssl); + if (res != X509_V_OK) { + ast_log(LOG_ERROR, "Certificate did not verify: %s\n", X509_verify_cert_error_string(res)); + X509_free(peer); + ast_tcptls_close_session_file(tcptls_session); + ao2_ref(tcptls_session, -1); + return NULL; + } + if (!ast_test_flag(&tcptls_session->parent->tls_cfg->flags, AST_SSL_IGNORE_COMMON_NAME)) { + ASN1_STRING *str; + X509_NAME *name = X509_get_subject_name(peer); + STACK_OF(GENERAL_NAME) *alt_names; + int pos = -1; + int found = 0; + + for (;;) { + /* Walk the certificate to check all available "Common Name" */ + /* XXX Probably should do a gethostbyname on the hostname and compare that as well */ + pos = X509_NAME_get_index_by_NID(name, NID_commonName, pos); + if (pos < 0) { + break; } + str = X509_NAME_ENTRY_get_data(X509_NAME_get_entry(name, pos)); + if (!check_tcptls_cert_name(str, tcptls_session->parent->hostname, "common name")) { + found = 1; + break; + } + } - if (!found) { - alt_names = X509_get_ext_d2i(peer, NID_subject_alt_name, NULL, NULL); - if (alt_names != NULL) { - int alt_names_count = sk_GENERAL_NAME_num(alt_names); - - for (pos = 0; pos < alt_names_count; pos++) { - const GENERAL_NAME *alt_name = sk_GENERAL_NAME_value(alt_names, pos); + if (!found) { + alt_names = X509_get_ext_d2i(peer, NID_subject_alt_name, NULL, NULL); + if (alt_names != NULL) { + int alt_names_count = sk_GENERAL_NAME_num(alt_names); - if (alt_name->type != GEN_DNS) { - continue; - } + for (pos = 0; pos < alt_names_count; pos++) { + const GENERAL_NAME *alt_name = sk_GENERAL_NAME_value(alt_names, pos); - if (!check_tcptls_cert_name(alt_name->d.dNSName, tcptls_session->parent->hostname, "alt name")) { - found = 1; - break; - } + if (alt_name->type != GEN_DNS) { + continue; } - sk_GENERAL_NAME_pop_free(alt_names, GENERAL_NAME_free); + if (!check_tcptls_cert_name(alt_name->d.dNSName, tcptls_session->parent->hostname, "alt name")) { + found = 1; + break; + } } - } - if (!found) { - ast_log(LOG_ERROR, "Certificate common name did not match (%s)\n", tcptls_session->parent->hostname); - X509_free(peer); - ast_tcptls_close_session_file(tcptls_session); - ao2_ref(tcptls_session, -1); - return NULL; + sk_GENERAL_NAME_pop_free(alt_names, GENERAL_NAME_free); } } - X509_free(peer); + + if (!found) { + ast_log(LOG_ERROR, "Certificate common name did not match (%s)\n", tcptls_session->parent->hostname); + X509_free(peer); + ast_tcptls_close_session_file(tcptls_session); + ao2_ref(tcptls_session, -1); + return NULL; + } } + X509_free(peer); } - if (!tcptls_session->f) { /* no success opening descriptor stacking */ - SSL_free(tcptls_session->ssl); - } - } -#endif /* DO_SSL */ - - if (!tcptls_session->f) { +#else + ast_log(LOG_ERROR, "Attempted a TLS connection without OpenSSL support. This will not work!\n"); ast_tcptls_close_session_file(tcptls_session); - ast_log(LOG_WARNING, "FILE * open failed!\n"); -#ifndef DO_SSL - if (tcptls_session->parent->tls_cfg) { - ast_log(LOG_ERROR, "Attempted a TLS connection without OpenSSL support. This will not work!\n"); - } -#endif ao2_ref(tcptls_session, -1); return NULL; +#endif /* DO_SSL */ } if (tcptls_session->parent->worker_fn) { @@ -774,7 +250,13 @@ void *ast_tcptls_server_root(void *data) tcptls_session->overflow_buf = ast_str_create(128); flags = fcntl(fd, F_GETFL); fcntl(fd, F_SETFL, flags & ~O_NONBLOCK); - tcptls_session->fd = fd; + + tcptls_session->stream = ast_iostream_from_fd(&fd); + if (!tcptls_session->stream) { + ast_log(LOG_WARNING, "No memory for new session iostream\n"); + continue; + } + tcptls_session->parent = desc; ast_sockaddr_copy(&tcptls_session->remote_address, &addr); @@ -1038,7 +520,7 @@ client_start_error: struct ast_tcptls_session_instance *ast_tcptls_client_create(struct ast_tcptls_session_args *desc) { - int x = 1; + int fd, x = 1; struct ast_tcptls_session_instance *tcptls_session = NULL; /* Do nothing if nothing has changed */ @@ -1054,8 +536,8 @@ struct ast_tcptls_session_instance *ast_tcptls_client_create(struct ast_tcptls_s close(desc->accept_fd); } - desc->accept_fd = socket(ast_sockaddr_is_ipv6(&desc->remote_address) ? - AF_INET6 : AF_INET, SOCK_STREAM, IPPROTO_TCP); + fd = desc->accept_fd = socket(ast_sockaddr_is_ipv6(&desc->remote_address) ? + AF_INET6 : AF_INET, SOCK_STREAM, IPPROTO_TCP); if (desc->accept_fd < 0) { ast_log(LOG_ERROR, "Unable to allocate socket for %s: %s\n", desc->name, strerror(errno)); @@ -1081,7 +563,11 @@ struct ast_tcptls_session_instance *ast_tcptls_client_create(struct ast_tcptls_s tcptls_session->overflow_buf = ast_str_create(128); tcptls_session->client = 1; - tcptls_session->fd = desc->accept_fd; + tcptls_session->stream = ast_iostream_from_fd(&fd); + if (!tcptls_session->stream) { + goto error; + } + tcptls_session->parent = desc; tcptls_session->parent->worker_fn = NULL; ast_sockaddr_copy(&tcptls_session->remote_address, @@ -1172,24 +658,9 @@ error: void ast_tcptls_close_session_file(struct ast_tcptls_session_instance *tcptls_session) { - if (tcptls_session->f) { - fflush(tcptls_session->f); - if (fclose(tcptls_session->f)) { - ast_log(LOG_ERROR, "fclose() failed: %s\n", strerror(errno)); - } - tcptls_session->f = NULL; - tcptls_session->fd = -1; - } else if (tcptls_session->fd != -1) { - /* - * Issuing shutdown() is necessary here to avoid a race - * condition where the last data written may not appear - * in the TCP stream. See ASTERISK-23548 - */ - shutdown(tcptls_session->fd, SHUT_RDWR); - if (close(tcptls_session->fd)) { - ast_log(LOG_ERROR, "close() failed: %s\n", strerror(errno)); - } - tcptls_session->fd = -1; + if (tcptls_session->stream) { + ast_iostream_close(tcptls_session->stream); + tcptls_session->stream = NULL; } else { ast_log(LOG_ERROR, "ast_tcptls_close_session_file invoked on session instance without file or file descriptor\n"); } diff --git a/main/utils.c b/main/utils.c index 3dfc02c..f406dbe 100644 --- a/main/utils.c +++ b/main/utils.c @@ -1429,68 +1429,6 @@ int ast_carefulwrite(int fd, char *s, int len, int timeoutms) return res; } -int ast_careful_fwrite(FILE *f, int fd, const char *src, size_t len, int timeoutms) -{ - struct timeval start = ast_tvnow(); - int n = 0; - int elapsed = 0; - - while (len) { - if (wait_for_output(fd, timeoutms - elapsed)) { - /* poll returned a fatal error, so bail out immediately. */ - return -1; - } - - /* Clear any errors from a previous write */ - clearerr(f); - - n = fwrite(src, 1, len, f); - - if (ferror(f) && errno != EINTR && errno != EAGAIN) { - /* fatal error from fwrite() */ - if (!feof(f)) { - /* Don't spam the logs if it was just that the connection is closed. */ - ast_log(LOG_ERROR, "fwrite() returned error: %s\n", strerror(errno)); - } - n = -1; - break; - } - - /* Update for data already written to the socket */ - len -= n; - src += n; - - elapsed = ast_tvdiff_ms(ast_tvnow(), start); - if (elapsed >= timeoutms) { - /* We've taken too long to write - * This is only an error condition if we haven't finished writing. */ - n = len ? -1 : 0; - break; - } - } - - errno = 0; - while (fflush(f)) { - if (errno == EAGAIN || errno == EINTR) { - /* fflush() does not appear to reset errno if it flushes - * and reaches EOF at the same time. It returns EOF with - * the last seen value of errno, causing a possible loop. - * Also usleep() to reduce CPU eating if it does loop */ - errno = 0; - usleep(1); - continue; - } - if (errno && !feof(f)) { - /* Don't spam the logs if it was just that the connection is closed. */ - ast_log(LOG_ERROR, "fflush() returned error: %s\n", strerror(errno)); - } - n = -1; - break; - } - - return n < 0 ? -1 : 0; -} - char *ast_strip_quoted(char *s, const char *beg_quotes, const char *end_quotes) { char *e; diff --git a/res/res_http_post.c b/res/res_http_post.c index 37fc4fa..907ee56 100644 --- a/res/res_http_post.c +++ b/res/res_http_post.c @@ -213,7 +213,7 @@ static int find_sequence(char * inbuf, int inlen, char * matchbuf, int matchlen) * This function has two modes. The first to find a boundary marker. The * second is to find the filename immediately after the boundary. */ -static int readmimefile(FILE *fin, FILE *fout, char *boundary, int contentlen) +static int readmimefile(struct ast_iostream *in, FILE *fout, char *boundary, int contentlen) { int find_filename = 0; char buf[4096]; @@ -224,7 +224,7 @@ static int readmimefile(FILE *fin, FILE *fout, char *boundary, int contentlen) int boundary_len; char * path_end, * path_start, * filespec; - if (NULL == fin || NULL == fout || NULL == boundary || 0 >= contentlen) { + if (NULL == in || NULL == fout || NULL == boundary || 0 >= contentlen) { return -1; } @@ -238,8 +238,8 @@ static int readmimefile(FILE *fin, FILE *fout, char *boundary, int contentlen) } if (0 < num_to_read) { - if (fread(&(buf[char_in_buf]), 1, num_to_read, fin) < num_to_read) { - ast_log(LOG_WARNING, "fread() failed: %s\n", strerror(errno)); + if (ast_iostream_read(in, &(buf[char_in_buf]), num_to_read) < num_to_read) { + ast_log(LOG_WARNING, "read failed: %s\n", strerror(errno)); num_to_read = 0; } contentlen -= num_to_read; @@ -380,7 +380,7 @@ static int http_post_callback(struct ast_tcptls_session_instance *ser, const str */ ast_http_body_read_status(ser, 0); - if (0 > readmimefile(ser->f, f, boundary_marker, content_len)) { + if (0 > readmimefile(ser->stream, f, boundary_marker, content_len)) { ast_debug(1, "Cannot find boundary marker in POST request.\n"); fclose(f); ast_http_error(ser, 400, "Bad Request", "Cannot find boundary marker in POST request."); diff --git a/res/res_http_websocket.c b/res/res_http_websocket.c index 8476e26..ce6430f 100644 --- a/res/res_http_websocket.c +++ b/res/res_http_websocket.c @@ -88,8 +88,7 @@ ASTERISK_REGISTER_FILE() /*! \brief Structure definition for session */ struct ast_websocket { - FILE *f; /*!< Pointer to the file instance used for writing and reading */ - int fd; /*!< File descriptor for the session, only used for polling */ + struct ast_iostream *stream; /*!< iostream of the connection */ struct ast_sockaddr address; /*!< Address of the remote client */ enum ast_websocket_opcode opcode; /*!< Cached opcode for multi-frame messages */ size_t payload_len; /*!< Length of the payload */ @@ -180,10 +179,11 @@ static void session_destroy_fn(void *obj) { struct ast_websocket *session = obj; - if (session->f) { + if (session->stream) { ast_websocket_close(session, 0); - if (session->f) { - fclose(session->f); + if (session->stream) { + ast_iostream_close(session->stream); + session->stream = NULL; ast_verb(2, "WebSocket connection %s '%s' closed\n", session->client ? "to" : "from", ast_sockaddr_stringify(&session->address)); } @@ -309,20 +309,22 @@ int AST_OPTIONAL_API_NAME(ast_websocket_close)(struct ast_websocket *session, ui session->close_sent = 1; ao2_lock(session); - res = ast_careful_fwrite(session->f, session->fd, frame, 4, session->timeout); + ast_iostream_set_timeout_inactivity(session->stream, session->timeout); + res = ast_iostream_write(session->stream, frame, sizeof(frame)); + ast_iostream_set_timeout_disable(session->stream); /* If an error occurred when trying to close this connection explicitly terminate it now. * Doing so will cause the thread polling on it to wake up and terminate. */ - if (res) { - fclose(session->f); - session->f = NULL; + if (res != sizeof(frame)) { + ast_iostream_close(session->stream); + session->stream = NULL; ast_verb(2, "WebSocket connection %s '%s' forcefully closed due to fatal write error\n", session->client ? "to" : "from", ast_sockaddr_stringify(&session->address)); } ao2_unlock(session); - return res; + return res == sizeof(frame); } static const char *opcode_map[] = { @@ -390,7 +392,8 @@ int AST_OPTIONAL_API_NAME(ast_websocket_write)(struct ast_websocket *session, en return -1; } - if (ast_careful_fwrite(session->f, session->fd, frame, frame_size, session->timeout)) { + ast_iostream_set_timeout_sequence(session->stream, ast_tvnow(), session->timeout); + if (ast_iostream_write(session->stream, frame, frame_size) != frame_size) { ao2_unlock(session); /* 1011 - server terminating connection due to not being able to fulfill the request */ ast_debug(1, "Closing WS with 1011 because we can't fulfill a write request\n"); @@ -398,7 +401,7 @@ int AST_OPTIONAL_API_NAME(ast_websocket_write)(struct ast_websocket *session, en return -1; } - fflush(session->f); + ast_iostream_set_timeout_disable(session->stream); ao2_unlock(session); return 0; @@ -426,7 +429,7 @@ void AST_OPTIONAL_API_NAME(ast_websocket_unref)(struct ast_websocket *session) int AST_OPTIONAL_API_NAME(ast_websocket_fd)(struct ast_websocket *session) { - return session->closing ? -1 : session->fd; + return session->closing ? -1 : ast_iostream_get_fd(session->stream); } struct ast_sockaddr * AST_OPTIONAL_API_NAME(ast_websocket_remote_address)(struct ast_websocket *session) @@ -441,18 +444,8 @@ int AST_OPTIONAL_API_NAME(ast_websocket_is_secure)(struct ast_websocket *session int AST_OPTIONAL_API_NAME(ast_websocket_set_nonblock)(struct ast_websocket *session) { - int flags; - - if ((flags = fcntl(session->fd, F_GETFL)) == -1) { - return -1; - } - - flags |= O_NONBLOCK; - - if ((flags = fcntl(session->fd, F_SETFL, flags)) == -1) { - return -1; - } - + ast_iostream_nonblock(session->stream); + ast_iostream_set_exclusive_input(session->stream, 0); return 0; } @@ -505,17 +498,16 @@ static inline int ws_safe_read(struct ast_websocket *session, char *buf, int len int sanity = 10; ao2_lock(session); - if (!session->f) { + if (!session->stream) { ao2_unlock(session); errno = ECONNABORTED; return -1; } for (;;) { - clearerr(session->f); - rlen = fread(rbuf, 1, xlen, session->f); - if (!rlen) { - if (feof(session->f)) { + rlen = ast_iostream_read(session->stream, rbuf, xlen); + if (rlen != xlen) { + if (rlen == 0) { ast_log(LOG_WARNING, "Web socket closed abruptly\n"); *opcode = AST_WEBSOCKET_OPCODE_CLOSE; session->closing = 1; @@ -523,7 +515,7 @@ static inline int ws_safe_read(struct ast_websocket *session, char *buf, int len return -1; } - if (ferror(session->f) && errno != EAGAIN) { + if (rlen < 0 && errno != EAGAIN) { ast_log(LOG_ERROR, "Error reading from web socket: %s\n", strerror(errno)); *opcode = AST_WEBSOCKET_OPCODE_CLOSE; session->closing = 1; @@ -544,7 +536,7 @@ static inline int ws_safe_read(struct ast_websocket *session, char *buf, int len if (!xlen) { break; } - if (ast_wait_for_input(session->fd, 1000) < 0) { + if (ast_wait_for_input(ast_iostream_get_fd(session->stream), 1000) < 0) { ast_log(LOG_ERROR, "ast_wait_for_input returned err: %s\n", strerror(errno)); *opcode = AST_WEBSOCKET_OPCODE_CLOSE; session->closing = 1; @@ -839,7 +831,7 @@ int AST_OPTIONAL_API_NAME(ast_websocket_uri_cb)(struct ast_tcptls_session_instan ao2_ref(protocol_handler, -1); return 0; } - session->timeout = AST_DEFAULT_WEBSOCKET_WRITE_TIMEOUT; + session->timeout = AST_DEFAULT_WEBSOCKET_WRITE_TIMEOUT; /* Generate the session id */ if (!ast_uuid_generate_str(session->session_id, sizeof(session->session_id))) { @@ -869,7 +861,8 @@ int AST_OPTIONAL_API_NAME(ast_websocket_uri_cb)(struct ast_tcptls_session_instan * Connection_. */ if (protocol) { - fprintf(ser->f, "HTTP/1.1 101 Switching Protocols\r\n" + ast_iostream_printf(ser->stream, + "HTTP/1.1 101 Switching Protocols\r\n" "Upgrade: %s\r\n" "Connection: Upgrade\r\n" "Sec-WebSocket-Accept: %s\r\n" @@ -878,15 +871,14 @@ int AST_OPTIONAL_API_NAME(ast_websocket_uri_cb)(struct ast_tcptls_session_instan websocket_combine_key(key, base64, sizeof(base64)), protocol); } else { - fprintf(ser->f, "HTTP/1.1 101 Switching Protocols\r\n" + ast_iostream_printf(ser->stream, + "HTTP/1.1 101 Switching Protocols\r\n" "Upgrade: %s\r\n" "Connection: Upgrade\r\n" "Sec-WebSocket-Accept: %s\r\n\r\n", upgrade, websocket_combine_key(key, base64, sizeof(base64))); } - - fflush(ser->f); } else { /* Specification defined in http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-75 or completely unknown */ @@ -898,7 +890,7 @@ int AST_OPTIONAL_API_NAME(ast_websocket_uri_cb)(struct ast_tcptls_session_instan } /* Enable keepalive on all sessions so the underlying user does not have to */ - if (setsockopt(ser->fd, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags))) { + if (setsockopt(ast_iostream_get_fd(ser->stream), SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags))) { ast_log(LOG_WARNING, "WebSocket connection from '%s' could not be accepted - failed to enable keepalive\n", ast_sockaddr_stringify(&ser->remote_address)); websocket_bad_request(ser); @@ -910,25 +902,23 @@ int AST_OPTIONAL_API_NAME(ast_websocket_uri_cb)(struct ast_tcptls_session_instan ast_verb(2, "WebSocket connection from '%s' for protocol '%s' accepted using version '%d'\n", ast_sockaddr_stringify(&ser->remote_address), protocol ? : "", version); /* Populate the session with all the needed details */ - session->f = ser->f; - session->fd = ser->fd; + session->stream = ser->stream; ast_sockaddr_copy(&session->address, &ser->remote_address); session->opcode = -1; session->reconstruct = DEFAULT_RECONSTRUCTION_CEILING; - session->secure = ser->ssl ? 1 : 0; + session->secure = ast_iostream_get_ssl(ser->stream) ? 1 : 0; /* Give up ownership of the socket and pass it to the protocol handler */ - ast_tcptls_stream_set_exclusive_input(ser->stream_cookie, 0); + ast_iostream_set_exclusive_input(session->stream, 0); protocol_handler->session_established(session, get_vars, headers); ao2_ref(protocol_handler, -1); /* - * By dropping the FILE* and fd from the session the connection + * By dropping the stream from the session the connection * won't get closed when the HTTP server cleans up because we * passed the connection to the protocol handler. */ - ser->f = NULL; - ser->fd = -1; + ser->stream = NULL; return 0; } @@ -1262,7 +1252,7 @@ static enum ast_websocket_result websocket_client_handshake_get_response( int has_accept = 0; int has_protocol = 0; - if (!fgets(buf, sizeof(buf), client->ser->f)) { + if (ast_iostream_gets(client->ser->stream, buf, sizeof(buf)) <= 0) { ast_log(LOG_ERROR, "Unable to retrieve HTTP status line."); return WS_BAD_STATUS; } @@ -1275,7 +1265,7 @@ static enum ast_websocket_result websocket_client_handshake_get_response( /* Ignoring line folding - assuming header field values are contained within a single line */ - while (fgets(buf, sizeof(buf), client->ser->f)) { + while (ast_iostream_gets(client->ser->stream, buf, sizeof(buf)) > 0) { char *name, *value; int parsed = ast_http_header_parse(buf, &name, &value); @@ -1328,19 +1318,19 @@ static enum ast_websocket_result websocket_client_handshake( client->protocols); } - if (fprintf(client->ser->f, - "GET /%s HTTP/1.1\r\n" - "Sec-WebSocket-Version: %d\r\n" - "Upgrade: websocket\r\n" - "Connection: Upgrade\r\n" - "Host: %s\r\n" - "Sec-WebSocket-Key: %s\r\n" - "%s\r\n", - client->resource_name ? ast_str_buffer(client->resource_name) : "", - client->version, - client->host, - client->key, - protocols) < 0) { + if (ast_iostream_printf(client->ser->stream, + "GET /%s HTTP/1.1\r\n" + "Sec-WebSocket-Version: %d\r\n" + "Upgrade: websocket\r\n" + "Connection: Upgrade\r\n" + "Host: %s\r\n" + "Sec-WebSocket-Key: %s\r\n" + "%s\r\n", + client->resource_name ? ast_str_buffer(client->resource_name) : "", + client->version, + client->host, + client->key, + protocols) < 0) { ast_log(LOG_ERROR, "Failed to send handshake.\n"); return WS_WRITE_ERROR; } @@ -1364,9 +1354,9 @@ static enum ast_websocket_result websocket_client_connect(struct ast_websocket * return res; } - ws->f = ws->client->ser->f; - ws->fd = ws->client->ser->fd; - ws->secure = ws->client->ser->ssl ? 1 : 0; + ws->stream = ws->client->ser->stream; + ws->secure = ast_iostream_get_ssl(ws->stream) ? 1 : 0; + ws->client->ser->stream = NULL; ast_sockaddr_copy(&ws->address, &ws->client->ser->remote_address); return WS_OK; } diff --git a/res/res_phoneprov.c b/res/res_phoneprov.c index 2e4f873..1b77b9f 100644 --- a/res/res_phoneprov.c +++ b/res/res_phoneprov.c @@ -950,7 +950,7 @@ static int phoneprov_callback(struct ast_tcptls_session_instance *ser, const str socklen_t namelen = sizeof(name.sa); int res; - if ((res = getsockname(ser->fd, &name.sa, &namelen))) { + if ((res = getsockname(ast_iostream_get_fd(ser->stream), &name.sa, &namelen))) { ast_log(LOG_WARNING, "Could not get server IP, breakage likely.\n"); } else { struct extension *exten_iter; -- 2.10.2