diff options
author | Timo Teräs <timo.teras@iki.fi> | 2010-07-02 20:23:07 +0300 |
---|---|---|
committer | Timo Teräs <timo.teras@iki.fi> | 2010-07-02 20:25:47 +0300 |
commit | 23b95bf1a15322c2f471b80c06cb65d9b2d2a282 (patch) | |
tree | 9bf12231db9591852e3b42ca24715d2cbaf6267b /src/io-unix.c | |
parent | 0183e33d9a4759764716e771b85e19f7a997b8bd (diff) | |
download | libtf-master.tar.bz2 libtf-master.tar.xz |
the idea is to make libtf completely multi-threaded. meaning each
fiber can be running concurrently in separate thread. quite a bit
of framework is added for this and some atomic helpers are already
introduced. however, io polling is busy polling now (will be soon
in own thread) and timeouts are still more or less broken. oh, and
the multithreading core is not there yet. basically we are currently
mostly broken ;)
Diffstat (limited to 'src/io-unix.c')
-rw-r--r-- | src/io-unix.c | 87 |
1 files changed, 29 insertions, 58 deletions
diff --git a/src/io-unix.c b/src/io-unix.c index d80592a..7cd4fd0 100644 --- a/src/io-unix.c +++ b/src/io-unix.c @@ -17,10 +17,8 @@ #include <string.h> #include <libtf/io.h> -#include <libtf/scheduler.h> - -#define TF_POLL_HOOKS \ -struct tf_poll_hooks *poller = tf_scheduler_get_current()->poller; +#include <libtf/fiber.h> +#include <libtf/vmach.h> static inline int tf_sockaddr_len(const struct tf_sockaddr *addr) { @@ -38,14 +36,14 @@ static inline int tf_sockaddr_len(const struct tf_sockaddr *addr) int tf_open_fd(struct tf_fd *fd, int kfd, int flags) { - TF_POLL_HOOKS; + struct tf_vmach *vm = tf_vmach_get_current(); int r; fd->fd = kfd; fd->flags = flags; - fd->waiting_fiber = NULL; + fd->fiber = tf_vmach_get_current_fiber(); - r = poller->fd_created(fd); + r = vm->poll_ops->fd_created(vm->poll_fiber, fd); if (r < 0) { if (flags & TF_FD_AUTOCLOSE) close(kfd); @@ -88,10 +86,10 @@ int tf_open(struct tf_fd *fd, const char *pathname, int flags) int tf_close(struct tf_fd *fd) { - TF_POLL_HOOKS; + struct tf_vmach *vm = tf_vmach_get_current(); int r; - poller->fd_destroyed(fd); + vm->poll_ops->fd_destroyed(vm->poll_fiber, fd); if (fd->flags & TF_FD_AUTOCLOSE) { r = close(fd->fd); if (unlikely(r == -1)) @@ -104,11 +102,9 @@ int tf_close(struct tf_fd *fd) int tf_read_fully(struct tf_fd *fd, void *buf, size_t count) { - TF_POLL_HOOKS; ssize_t n; - int r; + int r = 0; - poller->fd_monitor(fd, TF_POLL_READ); do { n = read(fd->fd, buf, count); if (n == count) { @@ -132,20 +128,17 @@ int tf_read_fully(struct tf_fd *fd, void *buf, size_t count) continue; } - r = __tf_fiber_schedule(); - } while (r == TF_WAKEUP_FD); - poller->fd_unmonitor(fd); + r = tf_fiber_schedule(); + } while (r != 0); return -r; } int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count) { - TF_POLL_HOOKS; ssize_t n; - int r; + int r = 0; - poller->fd_monitor(fd, TF_POLL_WRITE); do { n = write(fd->fd, buf, count); if (n == count) { @@ -166,19 +159,16 @@ int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count) continue; } - r = __tf_fiber_schedule(); - } while (r == TF_WAKEUP_FD); - poller->fd_unmonitor(fd); + r = tf_fiber_schedule(); + } while (r != 0); return -r; } ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count) { - TF_POLL_HOOKS; ssize_t n; - poller->fd_monitor(fd, TF_POLL_READ); do { n = read(fd->fd, buf, count); if (n >= 0) @@ -189,19 +179,16 @@ ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count) n = -errno; break; } - n = __tf_fiber_schedule(); - } while (n == TF_WAKEUP_FD); - poller->fd_unmonitor(fd); + n = tf_fiber_schedule(); + } while (n != 0); return n; } ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count) { - TF_POLL_HOOKS; ssize_t n; - poller->fd_monitor(fd, TF_POLL_WRITE); do { n = write(fd->fd, buf, count); if (n >= 0) @@ -212,9 +199,8 @@ ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count) n = -errno; break; } - n = __tf_fiber_schedule(); - } while (n == TF_WAKEUP_FD); - poller->fd_unmonitor(fd); + n = tf_fiber_schedule(); + } while (n != 0); return n; } @@ -261,7 +247,6 @@ int tf_listen(struct tf_fd *fd, int backlog) int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd, struct tf_sockaddr *from) { - TF_POLL_HOOKS; int r, tfdf; struct sockaddr *addr = NULL; socklen_t al, *pal = NULL; @@ -272,24 +257,19 @@ int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd, pal = &al; } - tfdf = TF_FD_AUTOCLOSE | (listen_fd->flags & TF_FD_STREAM_ORIENTED); - tfdf |= TF_FD_SET_CLOEXEC; + tfdf = TF_FD_AUTOCLOSE | TF_FD_ALREADY_NONBLOCKING | + (listen_fd->flags & TF_FD_STREAM_ORIENTED); - poller->fd_monitor(listen_fd, TF_POLL_READ); do { - /* FIXME: use accept4 if available */ - r = accept(listen_fd->fd, addr, pal); + r = accept4(listen_fd->fd, addr, pal, SOCK_NONBLOCK|SOCK_CLOEXEC); if (r >= 0) break; if (errno == EINTR) continue; - if (errno != EAGAIN) { - poller->fd_unmonitor(listen_fd); + if (errno != EAGAIN) return -errno; - } - r = __tf_fiber_schedule(); - } while (r == TF_WAKEUP_FD); - poller->fd_unmonitor(listen_fd); + r = tf_fiber_schedule(); + } while (r != 0); if (r < 0) return r; @@ -298,7 +278,6 @@ int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd, int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to) { - TF_POLL_HOOKS; socklen_t l = sizeof(int); int r, err; @@ -310,10 +289,8 @@ int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to) return -errno; /* Wait for socket to become readable */ - poller->fd_monitor(fd, TF_POLL_WRITE); - r = __tf_fiber_schedule(); - poller->fd_unmonitor(fd); - if (r != TF_WAKEUP_FD) + r = tf_fiber_schedule(); + if (r != 0) return r; /* Check for error */ @@ -327,7 +304,6 @@ ssize_t tf_recvmsg(struct tf_fd *fd, struct tf_sockaddr *to, void *buf, size_t len) { - TF_POLL_HOOKS; struct iovec iov; struct msghdr msg; struct cmsghdr *cmsg; @@ -347,7 +323,6 @@ ssize_t tf_recvmsg(struct tf_fd *fd, .msg_iovlen = 1, }; - poller->fd_monitor(fd, TF_POLL_READ); do { r = recvmsg(fd->fd, &msg, 0); if (r >= 0) @@ -356,9 +331,8 @@ ssize_t tf_recvmsg(struct tf_fd *fd, r = -errno; break; } - r = __tf_fiber_schedule(); - } while (r == TF_WAKEUP_FD); - poller->fd_unmonitor(fd); + r = tf_fiber_schedule(); + } while (r != 0); if (r < 0 || to == NULL) return r; @@ -382,7 +356,6 @@ ssize_t tf_sendmsg(struct tf_fd *fd, const struct tf_sockaddr *to, const void *buf, size_t len) { - TF_POLL_HOOKS; struct msghdr msg; struct iovec iov; struct { @@ -411,7 +384,6 @@ ssize_t tf_sendmsg(struct tf_fd *fd, msg.msg_controllen = sizeof(cmsg); } - poller->fd_monitor(fd, TF_POLL_WRITE); do { r = sendmsg(fd->fd, &msg, 0); if (r >= 0) @@ -420,9 +392,8 @@ ssize_t tf_sendmsg(struct tf_fd *fd, r = -errno; break; } - r = __tf_fiber_schedule(); - } while (r == TF_WAKEUP_FD); - poller->fd_unmonitor(fd); + r = tf_fiber_schedule(); + } while (r != 0); return r; } |