summaryrefslogtreecommitdiffstats
path: root/src/io-unix.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/io-unix.c')
-rw-r--r--src/io-unix.c310
1 files changed, 276 insertions, 34 deletions
diff --git a/src/io-unix.c b/src/io-unix.c
index d333122..cc1bd1b 100644
--- a/src/io-unix.c
+++ b/src/io-unix.c
@@ -10,46 +10,82 @@
* See http://www.gnu.org/ for details.
*/
-int tf_open(struct tf_fd *fd, const char *pathname, int flags)
+#include <string.h>
+
+static inline int tf_sockaddr_len(const struct tf_sockaddr *addr)
{
- int kfd, r;
+ if (addr == NULL)
+ return 0;
+ switch (addr->u.addr.sa_family) {
+ case AF_INET:
+ return sizeof(struct sockaddr_in);
+ case AF_INET6:
+ return sizeof(struct sockaddr_in6);
+ default:
+ TF_BUG_ON(1);
+ }
+}
- kfd = open(pathname, flags | O_CLOEXEC | O_NONBLOCK);
- if (unlikely(kfd < 0))
- return -errno;
+int tf_open_fd(struct tf_fd *fd, int kfd, int flags)
+{
+ int r;
+
+ fd->fd = kfd;
+ fd->flags = flags;
+ fd->waiting_fiber = NULL;
- r = tf_fd_init(fd, kfd, TF_FD_AUTOCLOSE | TF_FD_STREAM_ORIENTED);
+ r = tf_fd_created(fd);
if (r < 0) {
- close(kfd);
+ if (flags & TF_FD_AUTOCLOSE)
+ close(kfd);
+ fd->fd = -1;
return r;
}
+
+ if (flags & TF_FD_SET_CLOEXEC)
+ fcntl(kfd, F_SETFD, FD_CLOEXEC);
+
+ if (!(flags & TF_FD_ALREADY_NONBLOCKING)) {
+ r = fcntl(kfd, F_GETFL, 0);
+ if (r & O_NONBLOCK)
+ fd->flags |= TF_FD_ALREADY_NONBLOCKING;
+ else
+ fcntl(kfd, F_SETFL, O_NONBLOCK | r);
+ }
+
return 0;
}
-int tf_open_fd(struct tf_fd *fd, int kfd)
+int tf_open(struct tf_fd *fd, const char *pathname, int flags)
{
- int mode, flags = 0;
+ int kfd, tfdf;
- mode = fcntl(kfd, F_GETFL, 0);
- if (!(mode & O_NONBLOCK)) {
- fcntl(fd->fd, F_SETFL, mode | O_NONBLOCK);
- flags |= TF_FD_RESTORE_BLOCKING;
- }
+ tfdf = TF_FD_AUTOCLOSE | TF_FD_STREAM_ORIENTED
+ | TF_FD_ALREADY_NONBLOCKING;
+#if defined(O_CLOEXEC)
+ flags |= O_CLOEXEC;
+#else
+ tfdf |= TF_FD_SET_CLOEXEC;
+#endif
+
+ kfd = open(pathname, flags | O_NONBLOCK);
+ if (unlikely(kfd < 0))
+ return -errno;
- return tf_fd_init(fd, kfd, TF_FD_STREAM_ORIENTED | flags);
+ return tf_open_fd(fd, kfd, tfdf);
}
int tf_close(struct tf_fd *fd)
{
int r;
- if (fd->flags & TF_FD_RESTORE_BLOCKING) {
- fcntl(fd->fd, F_SETFL, fcntl(fd->fd, F_GETFL, 0) & ~O_NONBLOCK);
- }
+ tf_fd_destroyed(fd);
if (fd->flags & TF_FD_AUTOCLOSE) {
r = close(fd->fd);
if (unlikely(r == -1))
return -errno;
+ } else if (!(fd->flags & TF_FD_ALREADY_NONBLOCKING)) {
+ fcntl(fd->fd, F_SETFL, fcntl(fd->fd, F_GETFL, 0) & ~O_NONBLOCK);
}
return 0;
}
@@ -57,10 +93,9 @@ int tf_close(struct tf_fd *fd)
int tf_read(struct tf_fd *fd, void *buf, size_t count, tf_mtime_diff_t timeout)
{
ssize_t n;
- int r, mode;
+ int r;
- mode = tf_schedule_timeout(timeout);
- tf_fd_wait(fd, EPOLLIN);
+ tf_fd_monitor(fd, EPOLLIN);
do {
n = read(fd->fd, buf, count);
if (n == count) {
@@ -84,11 +119,10 @@ int tf_read(struct tf_fd *fd, void *buf, size_t count, tf_mtime_diff_t timeout)
continue;
}
- r = tf_schedule(mode);
- if (r != TF_WAKEUP_FD)
- break;
- } while (1);
- tf_fd_release(fd);
+ r = tf_schedule(timeout);
+ timeout = TF_NO_TIMEOUT_CHANGE;
+ } while (r == TF_WAKEUP_FD);
+ tf_fd_unmonitor(fd);
return -r;
}
@@ -97,10 +131,9 @@ int tf_write(struct tf_fd *fd, const void *buf, size_t count,
tf_mtime_diff_t timeout)
{
ssize_t n;
- int r, mode;
+ int r;
- mode = tf_schedule_timeout(timeout);
- tf_fd_wait(fd, EPOLLOUT);
+ tf_fd_monitor(fd, EPOLLOUT);
do {
n = write(fd->fd, buf, count);
if (n == count) {
@@ -121,11 +154,220 @@ int tf_write(struct tf_fd *fd, const void *buf, size_t count,
continue;
}
- r = tf_schedule(mode);
- if (r != TF_WAKEUP_FD)
- break;
- } while (1);
- tf_fd_release(fd);
+ r = tf_schedule(timeout);
+ timeout = TF_NO_TIMEOUT_CHANGE;
+ } while (r == TF_WAKEUP_FD);
+ tf_fd_unmonitor(fd);
return -r;
}
+
+int tf_socket(struct tf_fd *fd, int domain, int type, int protocol)
+{
+ int kfd, tfdf, on = 1;
+
+ tfdf = TF_FD_AUTOCLOSE;
+#if defined(SOCK_NONBLOCK) && defined(SOCK_CLOEXEC)
+ type |= SOCK_NONBLOCK | SOCK_CLOEXEC;
+ tfdf |= TF_FD_ALREADY_NONBLOCKING;
+#else
+ tfdf |= TF_FD_SET_CLOEXEC;
+#endif
+ kfd = socket(domain, type, protocol);
+ if (unlikely(kfd < 0))
+ return -errno;
+
+ if (protocol == SOCK_STREAM) {
+ tfdf |= TF_FD_STREAM_ORIENTED;
+ } else {
+ /* We want the local IP info too */
+ setsockopt(kfd, IPPROTO_IP, IP_PKTINFO, &on, sizeof(on));
+ }
+
+ return tf_open_fd(fd, kfd, tfdf);
+}
+
+int tf_bind(struct tf_fd *fd, const struct tf_sockaddr *addr)
+{
+ if (unlikely(bind(fd->fd, &addr->u.addr, tf_sockaddr_len(addr)) < 0))
+ return -errno;
+ return 0;
+}
+
+int tf_listen(struct tf_fd *fd, int backlog)
+{
+ if (unlikely(listen(fd->fd, backlog) < 0))
+ return -errno;
+ return 0;
+}
+
+int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd,
+ struct tf_sockaddr *from, tf_mtime_diff_t timeout)
+{
+ int r, tfdf;
+ struct sockaddr *addr = NULL;
+ socklen_t al, *pal = NULL;
+
+ if (from != NULL) {
+ addr = &from->u.addr;
+ al = tf_sockaddr_len(from);
+ pal = &al;
+ }
+
+ tfdf = TF_FD_AUTOCLOSE | (listen_fd->flags & TF_FD_STREAM_ORIENTED);
+ tfdf |= TF_FD_SET_CLOEXEC;
+
+ tf_fd_monitor(listen_fd, EPOLLIN);
+ do {
+ /* FIXME: use accept4 if available */
+ r = accept(listen_fd->fd, addr, pal);
+ if (r >= 0)
+ break;
+ if (errno == EINTR)
+ continue;
+ if (errno != EAGAIN) {
+ tf_fd_unmonitor(listen_fd);
+ return -errno;
+ }
+ r = tf_schedule(timeout);
+ timeout = TF_NO_TIMEOUT_CHANGE;
+ } while (r == TF_WAKEUP_FD);
+ tf_fd_unmonitor(listen_fd);
+ if (r < 0)
+ return r;
+
+ return tf_open_fd(child_fd, r, tfdf);
+}
+
+int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to,
+ tf_mtime_diff_t timeout)
+{
+ socklen_t l = sizeof(int);
+ int r, err;
+
+ /* Initiate operation */
+ r = connect(fd->fd, &to->u.addr, tf_sockaddr_len(to));
+ if (r >= 0)
+ return 0;
+ if (errno != EINPROGRESS)
+ return -errno;
+
+ /* Wait for socket to become readable */
+ tf_fd_monitor(fd, EPOLLOUT);
+ r = tf_schedule(timeout);
+ tf_fd_unmonitor(fd);
+ if (r != TF_WAKEUP_FD)
+ return r;
+
+ /* Check for error */
+ if (getsockopt(fd->fd, SOL_SOCKET, SO_ERROR, &err, &l) < 0)
+ return -errno;
+ return -err;
+}
+
+ssize_t tf_recvmsg(struct tf_fd *fd,
+ struct tf_sockaddr *from,
+ struct tf_sockaddr *to,
+ void *buf, size_t len,
+ tf_mtime_diff_t timeout)
+{
+ struct iovec iov;
+ struct msghdr msg;
+ struct cmsghdr *cmsg;
+ char control[128];
+ int r;
+
+ iov = (struct iovec) {
+ .iov_base = buf,
+ .iov_len = len,
+ };
+ msg = (struct msghdr) {
+ .msg_name = &from->u.addr,
+ .msg_namelen = sizeof(struct tf_sockaddr),
+ .msg_control = control,
+ .msg_controllen = sizeof(control),
+ .msg_iov = &iov,
+ .msg_iovlen = 1,
+ };
+
+ tf_fd_monitor(fd, EPOLLIN);
+ do {
+ r = recvmsg(fd->fd, &msg, 0);
+ if (r >= 0)
+ break;
+ if (errno != EAGAIN) {
+ r = -errno;
+ break;
+ }
+ r = tf_schedule(timeout);
+ timeout = TF_NO_TIMEOUT_CHANGE;
+ } while (r == TF_WAKEUP_FD);
+ tf_fd_unmonitor(fd);
+
+ if (r < 0 || to == NULL)
+ return r;
+
+ to->u.addr.sa_family = AF_UNSPEC;
+ for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
+ if (cmsg->cmsg_level == IPPROTO_IP &&
+ cmsg->cmsg_type == IP_PKTINFO) {
+ struct in_pktinfo *ipi = (struct in_pktinfo *) CMSG_DATA(cmsg);
+ to->u.in.sin_family = AF_INET;
+ to->u.in.sin_addr = ipi->ipi_spec_dst;
+ to->u.in.sin_port = 0;
+ }
+ }
+
+ return r;
+}
+
+ssize_t tf_sendmsg(struct tf_fd *fd,
+ struct tf_sockaddr *from,
+ const struct tf_sockaddr *to,
+ const void *buf, size_t len,
+ tf_mtime_diff_t timeout)
+{
+ struct msghdr msg;
+ struct iovec iov;
+ struct {
+ struct cmsghdr cm;
+ struct in_pktinfo ipi;
+ } cmsg;
+ int r;
+
+ iov = (struct iovec) {
+ .iov_base = (void*) buf,
+ .iov_len = len,
+ };
+ msg = (struct msghdr) {
+ .msg_name = (struct sockaddr*) &to->u.addr,
+ .msg_namelen = tf_sockaddr_len(to),
+ .msg_iov = &iov,
+ .msg_iovlen = 1,
+ };
+ if (from != NULL && from->u.addr.sa_family == AF_INET) {
+ memset(&cmsg.ipi, 0, sizeof(cmsg.ipi));
+ cmsg.cm.cmsg_len = sizeof(cmsg);
+ cmsg.cm.cmsg_level = IPPROTO_IP;
+ cmsg.cm.cmsg_type = IP_PKTINFO;
+ cmsg.ipi.ipi_spec_dst = from->u.in.sin_addr;
+ msg.msg_control = (void *) &cmsg;
+ msg.msg_controllen = sizeof(cmsg);
+ }
+
+ tf_fd_monitor(fd, EPOLLOUT);
+ do {
+ r = sendmsg(fd->fd, &msg, 0);
+ if (r >= 0)
+ break;
+ if (errno != EAGAIN) {
+ r = -errno;
+ break;
+ }
+ r = tf_schedule(timeout);
+ timeout = TF_NO_TIMEOUT_CHANGE;
+ } while (r == TF_WAKEUP_FD);
+ tf_fd_unmonitor(fd);
+
+ return r;
+}