/* io-unix.c - non-blocking io primitives for unix * * Copyright (C) 2009 Timo Teräs * All rights reserved. * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 or later as * published by the Free Software Foundation. * * See http://www.gnu.org/ for details. */ #include static inline int tf_sockaddr_len(const struct tf_sockaddr *addr) { if (addr == NULL) return 0; switch (addr->u.addr.sa_family) { case AF_INET: return sizeof(struct sockaddr_in); case AF_INET6: return sizeof(struct sockaddr_in6); default: TF_BUG_ON(1); } } int tf_open_fd(struct tf_fd *fd, int kfd, int flags) { int r; fd->fd = kfd; fd->flags = flags; fd->waiting_fiber = NULL; r = tf_fd_created(fd); if (r < 0) { if (flags & TF_FD_AUTOCLOSE) close(kfd); fd->fd = -1; return r; } if (flags & TF_FD_SET_CLOEXEC) fcntl(kfd, F_SETFD, FD_CLOEXEC); if (!(flags & TF_FD_ALREADY_NONBLOCKING)) { r = fcntl(kfd, F_GETFL, 0); if (r & O_NONBLOCK) fd->flags |= TF_FD_ALREADY_NONBLOCKING; else fcntl(kfd, F_SETFL, O_NONBLOCK | r); } return 0; } int tf_open(struct tf_fd *fd, const char *pathname, int flags) { int kfd, tfdf; tfdf = TF_FD_AUTOCLOSE | TF_FD_STREAM_ORIENTED | TF_FD_ALREADY_NONBLOCKING; #if defined(O_CLOEXEC) flags |= O_CLOEXEC; #else tfdf |= TF_FD_SET_CLOEXEC; #endif kfd = open(pathname, flags | O_NONBLOCK); if (unlikely(kfd < 0)) return -errno; return tf_open_fd(fd, kfd, tfdf); } int tf_close(struct tf_fd *fd) { int r; tf_fd_destroyed(fd); if (fd->flags & TF_FD_AUTOCLOSE) { r = close(fd->fd); if (unlikely(r == -1)) return -errno; } else if (!(fd->flags & TF_FD_ALREADY_NONBLOCKING)) { fcntl(fd->fd, F_SETFL, fcntl(fd->fd, F_GETFL, 0) & ~O_NONBLOCK); } return 0; } int tf_read_fully(struct tf_fd *fd, void *buf, size_t count) { ssize_t n; int r; tf_fd_monitor(fd, EPOLLIN); do { n = read(fd->fd, buf, count); if (n == count) { r = 0; break; } if (n < 0) { if (errno == EINTR) continue; if (errno != EAGAIN) { r = errno; break; } } else if (n == 0) { r = EIO; break; } else { buf += n; count -= n; if (!(fd->flags & TF_FD_STREAM_ORIENTED)) continue; } r = __tf_fiber_schedule(); } while (r == TF_WAKEUP_FD); tf_fd_unmonitor(fd); return -r; } int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count) { ssize_t n; int r; tf_fd_monitor(fd, EPOLLOUT); do { n = write(fd->fd, buf, count); if (n == count) { r = 0; break; } if (n < 0) { if (errno == EINTR) continue; if (errno != EAGAIN) { r = errno; break; } } else { buf += n; count -= n; if (!(fd->flags & TF_FD_STREAM_ORIENTED)) continue; } r = __tf_fiber_schedule(); } while (r == TF_WAKEUP_FD); tf_fd_unmonitor(fd); return -r; } ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count) { ssize_t n; tf_fd_monitor(fd, EPOLLIN); do { n = read(fd->fd, buf, count); if (n >= 0) break; if (errno == EINTR) continue; if (errno != EAGAIN) { n = -errno; break; } n = __tf_fiber_schedule(); } while (n == TF_WAKEUP_FD); tf_fd_unmonitor(fd); return n; } ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count) { ssize_t n; tf_fd_monitor(fd, EPOLLOUT); do { n = write(fd->fd, buf, count); if (n >= 0) break; if (errno == EINTR) continue; if (errno != EAGAIN) { n = -errno; break; } n = __tf_fiber_schedule(); } while (n == TF_WAKEUP_FD); tf_fd_unmonitor(fd); return n; } int tf_socket(struct tf_fd *fd, int domain, int type, int protocol) { int kfd, tfdf, on = 1; tfdf = TF_FD_AUTOCLOSE; #if defined(SOCK_NONBLOCK) && defined(SOCK_CLOEXEC) type |= SOCK_NONBLOCK | SOCK_CLOEXEC; tfdf |= TF_FD_ALREADY_NONBLOCKING; #else tfdf |= TF_FD_SET_CLOEXEC; #endif kfd = socket(domain, type, protocol); if (unlikely(kfd < 0)) return -errno; if (protocol == SOCK_STREAM) { tfdf |= TF_FD_STREAM_ORIENTED; } else { /* We want the local IP info too */ setsockopt(kfd, IPPROTO_IP, IP_PKTINFO, &on, sizeof(on)); } return tf_open_fd(fd, kfd, tfdf); } int tf_bind(struct tf_fd *fd, const struct tf_sockaddr *addr) { if (unlikely(bind(fd->fd, &addr->u.addr, tf_sockaddr_len(addr)) < 0)) return -errno; return 0; } int tf_listen(struct tf_fd *fd, int backlog) { if (unlikely(listen(fd->fd, backlog) < 0)) return -errno; return 0; } int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd, struct tf_sockaddr *from) { int r, tfdf; struct sockaddr *addr = NULL; socklen_t al, *pal = NULL; if (from != NULL) { addr = &from->u.addr; al = tf_sockaddr_len(from); pal = &al; } tfdf = TF_FD_AUTOCLOSE | (listen_fd->flags & TF_FD_STREAM_ORIENTED); tfdf |= TF_FD_SET_CLOEXEC; tf_fd_monitor(listen_fd, EPOLLIN); do { /* FIXME: use accept4 if available */ r = accept(listen_fd->fd, addr, pal); if (r >= 0) break; if (errno == EINTR) continue; if (errno != EAGAIN) { tf_fd_unmonitor(listen_fd); return -errno; } r = __tf_fiber_schedule(); } while (r == TF_WAKEUP_FD); tf_fd_unmonitor(listen_fd); if (r < 0) return r; return tf_open_fd(child_fd, r, tfdf); } int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to) { socklen_t l = sizeof(int); int r, err; /* Initiate operation */ r = connect(fd->fd, &to->u.addr, tf_sockaddr_len(to)); if (r >= 0) return 0; if (errno != EINPROGRESS) return -errno; /* Wait for socket to become readable */ tf_fd_monitor(fd, EPOLLOUT); r = __tf_fiber_schedule(); tf_fd_unmonitor(fd); if (r != TF_WAKEUP_FD) return r; /* Check for error */ if (getsockopt(fd->fd, SOL_SOCKET, SO_ERROR, &err, &l) < 0) return -errno; return -err; } ssize_t tf_recvmsg(struct tf_fd *fd, struct tf_sockaddr *from, struct tf_sockaddr *to, void *buf, size_t len) { struct iovec iov; struct msghdr msg; struct cmsghdr *cmsg; char control[128]; int r; iov = (struct iovec) { .iov_base = buf, .iov_len = len, }; msg = (struct msghdr) { .msg_name = &from->u.addr, .msg_namelen = sizeof(struct tf_sockaddr), .msg_control = control, .msg_controllen = sizeof(control), .msg_iov = &iov, .msg_iovlen = 1, }; tf_fd_monitor(fd, EPOLLIN); do { r = recvmsg(fd->fd, &msg, 0); if (r >= 0) break; if (errno != EAGAIN) { r = -errno; break; } r = __tf_fiber_schedule(); } while (r == TF_WAKEUP_FD); tf_fd_unmonitor(fd); if (r < 0 || to == NULL) return r; to->u.addr.sa_family = AF_UNSPEC; for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) { if (cmsg->cmsg_level == IPPROTO_IP && cmsg->cmsg_type == IP_PKTINFO) { struct in_pktinfo *ipi = (struct in_pktinfo *) CMSG_DATA(cmsg); to->u.in.sin_family = AF_INET; to->u.in.sin_addr = ipi->ipi_spec_dst; to->u.in.sin_port = 0; } } return r; } ssize_t tf_sendmsg(struct tf_fd *fd, struct tf_sockaddr *from, const struct tf_sockaddr *to, const void *buf, size_t len) { struct msghdr msg; struct iovec iov; struct { struct cmsghdr cm; struct in_pktinfo ipi; } cmsg; int r; iov = (struct iovec) { .iov_base = (void*) buf, .iov_len = len, }; msg = (struct msghdr) { .msg_name = (struct sockaddr*) &to->u.addr, .msg_namelen = tf_sockaddr_len(to), .msg_iov = &iov, .msg_iovlen = 1, }; if (from != NULL && from->u.addr.sa_family == AF_INET) { memset(&cmsg.ipi, 0, sizeof(cmsg.ipi)); cmsg.cm.cmsg_len = sizeof(cmsg); cmsg.cm.cmsg_level = IPPROTO_IP; cmsg.cm.cmsg_type = IP_PKTINFO; cmsg.ipi.ipi_spec_dst = from->u.in.sin_addr; msg.msg_control = (void *) &cmsg; msg.msg_controllen = sizeof(cmsg); } tf_fd_monitor(fd, EPOLLOUT); do { r = sendmsg(fd->fd, &msg, 0); if (r >= 0) break; if (errno != EAGAIN) { r = -errno; break; } r = __tf_fiber_schedule(); } while (r == TF_WAKEUP_FD); tf_fd_unmonitor(fd); return r; }