From dc34e87746f69994aad893b39ee4cd3dda6e2f7b Mon Sep 17 00:00:00 2001 From: Timo Teras Date: Wed, 10 Mar 2010 19:04:26 +0200 Subject: io: virtualize polling api so we can in future have more polling frameworks than epoll. --- include/libtf/io.h | 15 ++++-- include/libtf/scheduler.h | 3 ++ src/TFbuild | 1 + src/io-epoll.c | 120 +++++++++++++++++++++++++--------------------- src/io-unix.c | 58 ++++++++++++++-------- src/scheduler.c | 8 ++-- 6 files changed, 126 insertions(+), 79 deletions(-) diff --git a/include/libtf/io.h b/include/libtf/io.h index 1f37d81..0d34421 100644 --- a/include/libtf/io.h +++ b/include/libtf/io.h @@ -43,9 +43,18 @@ struct tf_fd { void *waiting_fiber; }; -void tf_poll_init(void); -int tf_poll(tf_mtime_diff_t timeout); -void tf_poll_close(void); +#define TF_POLL_READ 1 +#define TF_POLL_WRITE 2 + +struct tf_poll_hooks { + void (*init)(void); + int (*poll)(tf_mtime_diff_t timeout); + void (*close)(void); + int (*fd_created)(struct tf_fd *fd); + int (*fd_destroyed)(struct tf_fd *fd); + void (*fd_monitor)(struct tf_fd *fd, int events); + void (*fd_unmonitor)(struct tf_fd *fd); +}; int tf_open_fd(struct tf_fd *fd, int kfd, int flags); int tf_open(struct tf_fd *fd, const char *pathname, int flags); diff --git a/include/libtf/scheduler.h b/include/libtf/scheduler.h index cc8db70..db5a823 100644 --- a/include/libtf/scheduler.h +++ b/include/libtf/scheduler.h @@ -18,6 +18,8 @@ #include #include +struct tf_poll_hooks; + struct tf_scheduler { struct tf_list_head scheduled_q; struct tf_list_head running_q; @@ -26,6 +28,7 @@ struct tf_scheduler { void * main_fiber; int num_fibers; tf_mtime_t scheduler_time; + struct tf_poll_hooks * poller; unsigned long poll_data[2]; }; diff --git a/src/TFbuild b/src/TFbuild index 08cb696..ef1c5ae 100644 --- a/src/TFbuild +++ b/src/TFbuild @@ -2,5 +2,6 @@ libs-y += libtf libtf-objs-y += fiber.o scheduler.o heap.o libtf-objs-$(OS_LINUX) += io-epoll.o +libtf-objs-$(OS_UNIX) += io-unix.o CFLAGS_heap.c += -funroll-all-loops diff --git a/src/io-epoll.c b/src/io-epoll.c index 8ac230f..32aa090 100644 --- a/src/io-epoll.c +++ b/src/io-epoll.c @@ -14,7 +14,6 @@ #include #include #include -#include #include #include @@ -24,71 +23,29 @@ struct tf_poll_data { int num_waiters; }; -struct tf_poll_data *tf_epoll_get_data(void) +static struct tf_poll_data *tf_epoll_get_data(void) { struct tf_scheduler *sched = tf_scheduler_get_current(); TF_BUILD_BUG_ON(sizeof(struct tf_poll_data) > sizeof(sched->poll_data)); return (struct tf_poll_data *) &sched->poll_data; } -static int tf_fd_created(struct tf_fd *fd) +static void tf_epoll_init(void) { struct tf_poll_data *pd = tf_epoll_get_data(); - struct epoll_event ev; - int r; - ev.events = EPOLLIN | EPOLLOUT | EPOLLET; - ev.data.ptr = fd; - r = epoll_ctl(pd->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev); - if (unlikely(r < 0)) { - TF_BUG_ON(errno == EEXIST); - r = -errno; - return r; - } - - return 0; -} - -static int tf_fd_destroyed(struct tf_fd *fd) -{ - struct tf_poll_data *pd = tf_epoll_get_data(); - - if (fd->flags & TF_FD_AUTOCLOSE) - return 0; - - epoll_ctl(pd->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL); - return 0; -} - -static void tf_fd_monitor(struct tf_fd *fd, int events) -{ - struct tf_poll_data *pd = tf_epoll_get_data(); - - TF_BUG_ON(fd->waiting_fiber != NULL); - fd->events = events | EPOLLERR | EPOLLHUP; - fd->waiting_fiber = tf_scheduler_get_current()->active_fiber; - pd->num_waiters++; -} - -static void tf_fd_unmonitor(struct tf_fd *fd) -{ - struct tf_poll_data *pd = tf_epoll_get_data(); - - fd->waiting_fiber = NULL; - fd->events = 0; - pd->num_waiters--; + pd->epoll_fd = epoll_create1(EPOLL_CLOEXEC); + pd->num_waiters = 0; + TF_BUG_ON(pd->epoll_fd < 0); } -void tf_poll_init(void) +static void tf_epoll_close(void) { struct tf_poll_data *pd = tf_epoll_get_data(); - pd->epoll_fd = epoll_create1(EPOLL_CLOEXEC); - pd->num_waiters = 0; - TF_BUG_ON(pd->epoll_fd < 0); + close(pd->epoll_fd); } - -int tf_poll(tf_mtime_diff_t timeout) +static int tf_epoll_poll(tf_mtime_diff_t timeout) { struct tf_poll_data *pd = tf_epoll_get_data(); struct epoll_event events[64]; @@ -116,11 +73,66 @@ int tf_poll(tf_mtime_diff_t timeout) return ret; } -void tf_poll_close(void) +static int tf_epoll_fd_created(struct tf_fd *fd) { struct tf_poll_data *pd = tf_epoll_get_data(); + struct epoll_event ev; + int r; - close(pd->epoll_fd); + ev = (struct epoll_event) { + .events = EPOLLIN | EPOLLOUT | EPOLLET, + .data.ptr = fd, + }; + r = epoll_ctl(pd->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev); + if (unlikely(r < 0)) { + TF_BUG_ON(errno == EEXIST); + r = -errno; + return r; + } + + return 0; } -#include "io-unix.c" +static int tf_epoll_fd_destroyed(struct tf_fd *fd) +{ + struct tf_poll_data *pd = tf_epoll_get_data(); + + if (fd->flags & TF_FD_AUTOCLOSE) + return 0; + + epoll_ctl(pd->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL); + return 0; +} + +static void tf_epoll_fd_monitor(struct tf_fd *fd, int events) +{ + struct tf_poll_data *pd = tf_epoll_get_data(); + + TF_BUG_ON(fd->waiting_fiber != NULL); + fd->events = EPOLLERR | EPOLLHUP; + if (events & TF_POLL_READ) + fd->events |= EPOLLIN; + if (events & TF_POLL_WRITE) + fd->events |= EPOLLOUT; + fd->waiting_fiber = tf_scheduler_get_current()->active_fiber; + pd->num_waiters++; +} + +static void tf_epoll_fd_unmonitor(struct tf_fd *fd) +{ + struct tf_poll_data *pd = tf_epoll_get_data(); + + fd->waiting_fiber = NULL; + fd->events = 0; + pd->num_waiters--; +} + +struct tf_poll_hooks tf_epoll_hooks = { + .init = tf_epoll_init, + .close = tf_epoll_close, + .poll = tf_epoll_poll, + .fd_created = tf_epoll_fd_created, + .fd_destroyed = tf_epoll_fd_destroyed, + .fd_monitor = tf_epoll_fd_monitor, + .fd_unmonitor = tf_epoll_fd_unmonitor, +}; diff --git a/src/io-unix.c b/src/io-unix.c index 39cdf64..d80592a 100644 --- a/src/io-unix.c +++ b/src/io-unix.c @@ -10,8 +10,18 @@ * See http://www.gnu.org/ for details. */ +#include +#include +#include +#include #include +#include +#include + +#define TF_POLL_HOOKS \ +struct tf_poll_hooks *poller = tf_scheduler_get_current()->poller; + static inline int tf_sockaddr_len(const struct tf_sockaddr *addr) { if (addr == NULL) @@ -28,13 +38,14 @@ static inline int tf_sockaddr_len(const struct tf_sockaddr *addr) int tf_open_fd(struct tf_fd *fd, int kfd, int flags) { + TF_POLL_HOOKS; int r; fd->fd = kfd; fd->flags = flags; fd->waiting_fiber = NULL; - r = tf_fd_created(fd); + r = poller->fd_created(fd); if (r < 0) { if (flags & TF_FD_AUTOCLOSE) close(kfd); @@ -77,9 +88,10 @@ int tf_open(struct tf_fd *fd, const char *pathname, int flags) int tf_close(struct tf_fd *fd) { + TF_POLL_HOOKS; int r; - tf_fd_destroyed(fd); + poller->fd_destroyed(fd); if (fd->flags & TF_FD_AUTOCLOSE) { r = close(fd->fd); if (unlikely(r == -1)) @@ -92,10 +104,11 @@ int tf_close(struct tf_fd *fd) int tf_read_fully(struct tf_fd *fd, void *buf, size_t count) { + TF_POLL_HOOKS; ssize_t n; int r; - tf_fd_monitor(fd, EPOLLIN); + poller->fd_monitor(fd, TF_POLL_READ); do { n = read(fd->fd, buf, count); if (n == count) { @@ -121,17 +134,18 @@ int tf_read_fully(struct tf_fd *fd, void *buf, size_t count) r = __tf_fiber_schedule(); } while (r == TF_WAKEUP_FD); - tf_fd_unmonitor(fd); + poller->fd_unmonitor(fd); return -r; } int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count) { + TF_POLL_HOOKS; ssize_t n; int r; - tf_fd_monitor(fd, EPOLLOUT); + poller->fd_monitor(fd, TF_POLL_WRITE); do { n = write(fd->fd, buf, count); if (n == count) { @@ -154,16 +168,17 @@ int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count) r = __tf_fiber_schedule(); } while (r == TF_WAKEUP_FD); - tf_fd_unmonitor(fd); + poller->fd_unmonitor(fd); return -r; } ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count) { + TF_POLL_HOOKS; ssize_t n; - tf_fd_monitor(fd, EPOLLIN); + poller->fd_monitor(fd, TF_POLL_READ); do { n = read(fd->fd, buf, count); if (n >= 0) @@ -176,16 +191,17 @@ ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count) } n = __tf_fiber_schedule(); } while (n == TF_WAKEUP_FD); - tf_fd_unmonitor(fd); + poller->fd_unmonitor(fd); return n; } ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count) { + TF_POLL_HOOKS; ssize_t n; - tf_fd_monitor(fd, EPOLLOUT); + poller->fd_monitor(fd, TF_POLL_WRITE); do { n = write(fd->fd, buf, count); if (n >= 0) @@ -198,7 +214,7 @@ ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count) } n = __tf_fiber_schedule(); } while (n == TF_WAKEUP_FD); - tf_fd_unmonitor(fd); + poller->fd_unmonitor(fd); return n; } @@ -245,6 +261,7 @@ int tf_listen(struct tf_fd *fd, int backlog) int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd, struct tf_sockaddr *from) { + TF_POLL_HOOKS; int r, tfdf; struct sockaddr *addr = NULL; socklen_t al, *pal = NULL; @@ -258,7 +275,7 @@ int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd, tfdf = TF_FD_AUTOCLOSE | (listen_fd->flags & TF_FD_STREAM_ORIENTED); tfdf |= TF_FD_SET_CLOEXEC; - tf_fd_monitor(listen_fd, EPOLLIN); + poller->fd_monitor(listen_fd, TF_POLL_READ); do { /* FIXME: use accept4 if available */ r = accept(listen_fd->fd, addr, pal); @@ -267,12 +284,12 @@ int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd, if (errno == EINTR) continue; if (errno != EAGAIN) { - tf_fd_unmonitor(listen_fd); + poller->fd_unmonitor(listen_fd); return -errno; } r = __tf_fiber_schedule(); } while (r == TF_WAKEUP_FD); - tf_fd_unmonitor(listen_fd); + poller->fd_unmonitor(listen_fd); if (r < 0) return r; @@ -281,6 +298,7 @@ int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd, int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to) { + TF_POLL_HOOKS; socklen_t l = sizeof(int); int r, err; @@ -292,9 +310,9 @@ int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to) return -errno; /* Wait for socket to become readable */ - tf_fd_monitor(fd, EPOLLOUT); + poller->fd_monitor(fd, TF_POLL_WRITE); r = __tf_fiber_schedule(); - tf_fd_unmonitor(fd); + poller->fd_unmonitor(fd); if (r != TF_WAKEUP_FD) return r; @@ -309,6 +327,7 @@ ssize_t tf_recvmsg(struct tf_fd *fd, struct tf_sockaddr *to, void *buf, size_t len) { + TF_POLL_HOOKS; struct iovec iov; struct msghdr msg; struct cmsghdr *cmsg; @@ -328,7 +347,7 @@ ssize_t tf_recvmsg(struct tf_fd *fd, .msg_iovlen = 1, }; - tf_fd_monitor(fd, EPOLLIN); + poller->fd_monitor(fd, TF_POLL_READ); do { r = recvmsg(fd->fd, &msg, 0); if (r >= 0) @@ -339,7 +358,7 @@ ssize_t tf_recvmsg(struct tf_fd *fd, } r = __tf_fiber_schedule(); } while (r == TF_WAKEUP_FD); - tf_fd_unmonitor(fd); + poller->fd_unmonitor(fd); if (r < 0 || to == NULL) return r; @@ -363,6 +382,7 @@ ssize_t tf_sendmsg(struct tf_fd *fd, const struct tf_sockaddr *to, const void *buf, size_t len) { + TF_POLL_HOOKS; struct msghdr msg; struct iovec iov; struct { @@ -391,7 +411,7 @@ ssize_t tf_sendmsg(struct tf_fd *fd, msg.msg_controllen = sizeof(cmsg); } - tf_fd_monitor(fd, EPOLLOUT); + poller->fd_monitor(fd, TF_POLL_WRITE); do { r = sendmsg(fd->fd, &msg, 0); if (r >= 0) @@ -402,7 +422,7 @@ ssize_t tf_sendmsg(struct tf_fd *fd, } r = __tf_fiber_schedule(); } while (r == TF_WAKEUP_FD); - tf_fd_unmonitor(fd); + poller->fd_unmonitor(fd); return r; } diff --git a/src/scheduler.c b/src/scheduler.c index d287eca..a103d0a 100644 --- a/src/scheduler.c +++ b/src/scheduler.c @@ -15,6 +15,7 @@ #include /* FIXME: should be in thread local storage */ +extern struct tf_poll_hooks tf_epoll_hooks; struct tf_scheduler *__tf_scheduler; static void update_time(struct tf_scheduler *sched) @@ -59,7 +60,7 @@ void tf_scheduler_fiber(void *data) timeout = -1; } - if (tf_poll(timeout) == TF_WAKEUP_TIMEOUT && + if (sched->poller->poll(timeout) == TF_WAKEUP_TIMEOUT && timeout >= 0) { sched->scheduler_time += timeout; process_heap(sched); @@ -103,7 +104,8 @@ int tf_scheduler_enable(struct tf_scheduler *sched) __tf_fiber_bind_scheduler(s); __tf_scheduler = s; - tf_poll_init(); + s->poller = &tf_epoll_hooks; + s->poller->init(); update_time(s); if (sched != NULL) @@ -124,7 +126,7 @@ void tf_scheduler_disable(void) while (sched->num_fibers > 1) __tf_fiber_schedule(); - tf_poll_close(); + sched->poller->close(); __tf_scheduler = NULL; __tf_fiber_release_scheduler(sched); tf_heap_destroy(&sched->heap); -- cgit v1.2.3