diff options
Diffstat (limited to 'src/io-unix.c')
-rw-r--r-- | src/io-unix.c | 310 |
1 files changed, 276 insertions, 34 deletions
diff --git a/src/io-unix.c b/src/io-unix.c index d333122..cc1bd1b 100644 --- a/src/io-unix.c +++ b/src/io-unix.c @@ -10,46 +10,82 @@ * See http://www.gnu.org/ for details. */ -int tf_open(struct tf_fd *fd, const char *pathname, int flags) +#include <string.h> + +static inline int tf_sockaddr_len(const struct tf_sockaddr *addr) { - int kfd, r; + if (addr == NULL) + return 0; + switch (addr->u.addr.sa_family) { + case AF_INET: + return sizeof(struct sockaddr_in); + case AF_INET6: + return sizeof(struct sockaddr_in6); + default: + TF_BUG_ON(1); + } +} - kfd = open(pathname, flags | O_CLOEXEC | O_NONBLOCK); - if (unlikely(kfd < 0)) - return -errno; +int tf_open_fd(struct tf_fd *fd, int kfd, int flags) +{ + int r; + + fd->fd = kfd; + fd->flags = flags; + fd->waiting_fiber = NULL; - r = tf_fd_init(fd, kfd, TF_FD_AUTOCLOSE | TF_FD_STREAM_ORIENTED); + r = tf_fd_created(fd); if (r < 0) { - close(kfd); + if (flags & TF_FD_AUTOCLOSE) + close(kfd); + fd->fd = -1; return r; } + + if (flags & TF_FD_SET_CLOEXEC) + fcntl(kfd, F_SETFD, FD_CLOEXEC); + + if (!(flags & TF_FD_ALREADY_NONBLOCKING)) { + r = fcntl(kfd, F_GETFL, 0); + if (r & O_NONBLOCK) + fd->flags |= TF_FD_ALREADY_NONBLOCKING; + else + fcntl(kfd, F_SETFL, O_NONBLOCK | r); + } + return 0; } -int tf_open_fd(struct tf_fd *fd, int kfd) +int tf_open(struct tf_fd *fd, const char *pathname, int flags) { - int mode, flags = 0; + int kfd, tfdf; - mode = fcntl(kfd, F_GETFL, 0); - if (!(mode & O_NONBLOCK)) { - fcntl(fd->fd, F_SETFL, mode | O_NONBLOCK); - flags |= TF_FD_RESTORE_BLOCKING; - } + tfdf = TF_FD_AUTOCLOSE | TF_FD_STREAM_ORIENTED + | TF_FD_ALREADY_NONBLOCKING; +#if defined(O_CLOEXEC) + flags |= O_CLOEXEC; +#else + tfdf |= TF_FD_SET_CLOEXEC; +#endif + + kfd = open(pathname, flags | O_NONBLOCK); + if (unlikely(kfd < 0)) + return -errno; - return tf_fd_init(fd, kfd, TF_FD_STREAM_ORIENTED | flags); + return tf_open_fd(fd, kfd, tfdf); } int tf_close(struct tf_fd *fd) { int r; - if (fd->flags & TF_FD_RESTORE_BLOCKING) { - fcntl(fd->fd, F_SETFL, fcntl(fd->fd, F_GETFL, 0) & ~O_NONBLOCK); - } + tf_fd_destroyed(fd); if (fd->flags & TF_FD_AUTOCLOSE) { r = close(fd->fd); if (unlikely(r == -1)) return -errno; + } else if (!(fd->flags & TF_FD_ALREADY_NONBLOCKING)) { + fcntl(fd->fd, F_SETFL, fcntl(fd->fd, F_GETFL, 0) & ~O_NONBLOCK); } return 0; } @@ -57,10 +93,9 @@ int tf_close(struct tf_fd *fd) int tf_read(struct tf_fd *fd, void *buf, size_t count, tf_mtime_diff_t timeout) { ssize_t n; - int r, mode; + int r; - mode = tf_schedule_timeout(timeout); - tf_fd_wait(fd, EPOLLIN); + tf_fd_monitor(fd, EPOLLIN); do { n = read(fd->fd, buf, count); if (n == count) { @@ -84,11 +119,10 @@ int tf_read(struct tf_fd *fd, void *buf, size_t count, tf_mtime_diff_t timeout) continue; } - r = tf_schedule(mode); - if (r != TF_WAKEUP_FD) - break; - } while (1); - tf_fd_release(fd); + r = tf_schedule(timeout); + timeout = TF_NO_TIMEOUT_CHANGE; + } while (r == TF_WAKEUP_FD); + tf_fd_unmonitor(fd); return -r; } @@ -97,10 +131,9 @@ int tf_write(struct tf_fd *fd, const void *buf, size_t count, tf_mtime_diff_t timeout) { ssize_t n; - int r, mode; + int r; - mode = tf_schedule_timeout(timeout); - tf_fd_wait(fd, EPOLLOUT); + tf_fd_monitor(fd, EPOLLOUT); do { n = write(fd->fd, buf, count); if (n == count) { @@ -121,11 +154,220 @@ int tf_write(struct tf_fd *fd, const void *buf, size_t count, continue; } - r = tf_schedule(mode); - if (r != TF_WAKEUP_FD) - break; - } while (1); - tf_fd_release(fd); + r = tf_schedule(timeout); + timeout = TF_NO_TIMEOUT_CHANGE; + } while (r == TF_WAKEUP_FD); + tf_fd_unmonitor(fd); return -r; } + +int tf_socket(struct tf_fd *fd, int domain, int type, int protocol) +{ + int kfd, tfdf, on = 1; + + tfdf = TF_FD_AUTOCLOSE; +#if defined(SOCK_NONBLOCK) && defined(SOCK_CLOEXEC) + type |= SOCK_NONBLOCK | SOCK_CLOEXEC; + tfdf |= TF_FD_ALREADY_NONBLOCKING; +#else + tfdf |= TF_FD_SET_CLOEXEC; +#endif + kfd = socket(domain, type, protocol); + if (unlikely(kfd < 0)) + return -errno; + + if (protocol == SOCK_STREAM) { + tfdf |= TF_FD_STREAM_ORIENTED; + } else { + /* We want the local IP info too */ + setsockopt(kfd, IPPROTO_IP, IP_PKTINFO, &on, sizeof(on)); + } + + return tf_open_fd(fd, kfd, tfdf); +} + +int tf_bind(struct tf_fd *fd, const struct tf_sockaddr *addr) +{ + if (unlikely(bind(fd->fd, &addr->u.addr, tf_sockaddr_len(addr)) < 0)) + return -errno; + return 0; +} + +int tf_listen(struct tf_fd *fd, int backlog) +{ + if (unlikely(listen(fd->fd, backlog) < 0)) + return -errno; + return 0; +} + +int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd, + struct tf_sockaddr *from, tf_mtime_diff_t timeout) +{ + int r, tfdf; + struct sockaddr *addr = NULL; + socklen_t al, *pal = NULL; + + if (from != NULL) { + addr = &from->u.addr; + al = tf_sockaddr_len(from); + pal = &al; + } + + tfdf = TF_FD_AUTOCLOSE | (listen_fd->flags & TF_FD_STREAM_ORIENTED); + tfdf |= TF_FD_SET_CLOEXEC; + + tf_fd_monitor(listen_fd, EPOLLIN); + do { + /* FIXME: use accept4 if available */ + r = accept(listen_fd->fd, addr, pal); + if (r >= 0) + break; + if (errno == EINTR) + continue; + if (errno != EAGAIN) { + tf_fd_unmonitor(listen_fd); + return -errno; + } + r = tf_schedule(timeout); + timeout = TF_NO_TIMEOUT_CHANGE; + } while (r == TF_WAKEUP_FD); + tf_fd_unmonitor(listen_fd); + if (r < 0) + return r; + + return tf_open_fd(child_fd, r, tfdf); +} + +int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to, + tf_mtime_diff_t timeout) +{ + socklen_t l = sizeof(int); + int r, err; + + /* Initiate operation */ + r = connect(fd->fd, &to->u.addr, tf_sockaddr_len(to)); + if (r >= 0) + return 0; + if (errno != EINPROGRESS) + return -errno; + + /* Wait for socket to become readable */ + tf_fd_monitor(fd, EPOLLOUT); + r = tf_schedule(timeout); + tf_fd_unmonitor(fd); + if (r != TF_WAKEUP_FD) + return r; + + /* Check for error */ + if (getsockopt(fd->fd, SOL_SOCKET, SO_ERROR, &err, &l) < 0) + return -errno; + return -err; +} + +ssize_t tf_recvmsg(struct tf_fd *fd, + struct tf_sockaddr *from, + struct tf_sockaddr *to, + void *buf, size_t len, + tf_mtime_diff_t timeout) +{ + struct iovec iov; + struct msghdr msg; + struct cmsghdr *cmsg; + char control[128]; + int r; + + iov = (struct iovec) { + .iov_base = buf, + .iov_len = len, + }; + msg = (struct msghdr) { + .msg_name = &from->u.addr, + .msg_namelen = sizeof(struct tf_sockaddr), + .msg_control = control, + .msg_controllen = sizeof(control), + .msg_iov = &iov, + .msg_iovlen = 1, + }; + + tf_fd_monitor(fd, EPOLLIN); + do { + r = recvmsg(fd->fd, &msg, 0); + if (r >= 0) + break; + if (errno != EAGAIN) { + r = -errno; + break; + } + r = tf_schedule(timeout); + timeout = TF_NO_TIMEOUT_CHANGE; + } while (r == TF_WAKEUP_FD); + tf_fd_unmonitor(fd); + + if (r < 0 || to == NULL) + return r; + + to->u.addr.sa_family = AF_UNSPEC; + for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) { + if (cmsg->cmsg_level == IPPROTO_IP && + cmsg->cmsg_type == IP_PKTINFO) { + struct in_pktinfo *ipi = (struct in_pktinfo *) CMSG_DATA(cmsg); + to->u.in.sin_family = AF_INET; + to->u.in.sin_addr = ipi->ipi_spec_dst; + to->u.in.sin_port = 0; + } + } + + return r; +} + +ssize_t tf_sendmsg(struct tf_fd *fd, + struct tf_sockaddr *from, + const struct tf_sockaddr *to, + const void *buf, size_t len, + tf_mtime_diff_t timeout) +{ + struct msghdr msg; + struct iovec iov; + struct { + struct cmsghdr cm; + struct in_pktinfo ipi; + } cmsg; + int r; + + iov = (struct iovec) { + .iov_base = (void*) buf, + .iov_len = len, + }; + msg = (struct msghdr) { + .msg_name = (struct sockaddr*) &to->u.addr, + .msg_namelen = tf_sockaddr_len(to), + .msg_iov = &iov, + .msg_iovlen = 1, + }; + if (from != NULL && from->u.addr.sa_family == AF_INET) { + memset(&cmsg.ipi, 0, sizeof(cmsg.ipi)); + cmsg.cm.cmsg_len = sizeof(cmsg); + cmsg.cm.cmsg_level = IPPROTO_IP; + cmsg.cm.cmsg_type = IP_PKTINFO; + cmsg.ipi.ipi_spec_dst = from->u.in.sin_addr; + msg.msg_control = (void *) &cmsg; + msg.msg_controllen = sizeof(cmsg); + } + + tf_fd_monitor(fd, EPOLLOUT); + do { + r = sendmsg(fd->fd, &msg, 0); + if (r >= 0) + break; + if (errno != EAGAIN) { + r = -errno; + break; + } + r = tf_schedule(timeout); + timeout = TF_NO_TIMEOUT_CHANGE; + } while (r == TF_WAKEUP_FD); + tf_fd_unmonitor(fd); + + return r; +} |