From fc1044daf51f32b9d85f8497e4e0bd5a3c1e7fe9 Mon Sep 17 00:00:00 2001 From: Timo Teras Date: Wed, 25 Nov 2009 10:52:15 +0200 Subject: libtf: implement basic file i/o with epoll some scetching of i/o api, and implement basic read and write functionality. integrate polling to scheduler and an epoll based polling mechanism. --- src/TFbuild | 2 +- src/fiber.c | 85 ++++++++++++++++++++++++++----------- src/heap.c | 3 +- src/io-epoll.c | 107 ++++++++++++++++++++++++++++++++++++++++++++++ src/io-unix.c | 131 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 301 insertions(+), 27 deletions(-) create mode 100644 src/io-epoll.c create mode 100644 src/io-unix.c (limited to 'src') diff --git a/src/TFbuild b/src/TFbuild index accae6d..9b40443 100644 --- a/src/TFbuild +++ b/src/TFbuild @@ -1,5 +1,5 @@ libs-y += libtf -libtf-objs-y += fiber.o heap.o +libtf-objs-y += fiber.o heap.o io-epoll.o CFLAGS_heap.c += -funroll-all-loops diff --git a/src/fiber.c b/src/fiber.c index 72da440..15c533a 100644 --- a/src/fiber.c +++ b/src/fiber.c @@ -13,11 +13,12 @@ #include #include #include -#include -#include +#include +#include struct tf_fiber { unsigned int ref_count; + int wakeup_type; struct tf_list_node queue_node; struct tf_heap_node heap_node; char data[TF_EMPTY_ARRAY]; @@ -85,16 +86,13 @@ static void run_fiber(struct tf_scheduler *sched, struct tf_fiber *f) struct tf_fiber *schedf = container_of((void*) tf_get_scheduler(), struct tf_fiber, data); sched->active_fiber = f; - switch (tf_uctx_transfer(schedf, f, 1)) { - case EFAULT: /* Fiber is dead */ + switch (tf_uctx_transfer(schedf, f, f->wakeup_type)) { + case TF_WAKEUP_KILL: tf_fiber_put(f->data); sched->num_fibers--; break; - case EAGAIN: /* Yielded, reshedule */ - tf_list_add_tail(&f->queue_node, &sched->run_q); - break; - case EIO: /* Blocked, in sleep */ - tf_list_add_tail(&f->queue_node, &sched->sleep_q); + case TF_WAKEUP_IMMEDIATE: + case TF_WAKEUP_TIMEOUT: break; default: TF_BUG_ON("bad scheduler call from fiber"); @@ -108,7 +106,7 @@ static void process_heap(struct tf_scheduler *sched) tf_mtime_t now = tf_mtime(); while (!tf_heap_empty(&sched->heap) && - tf_mtime_diff(now, tf_heap_get_value(&sched->heap)) > 0) { + tf_mtime_diff(now, tf_heap_get_value(&sched->heap)) >= 0) { node = tf_heap_get_node(&sched->heap); f = container_of(node, struct tf_fiber, heap_node); run_fiber(sched, f); @@ -135,9 +133,9 @@ int tf_main(tf_fiber_proc main_fiber) ctx->stack_guard = &stack_guard; *sched = (struct tf_scheduler){ .run_q = TF_LIST_HEAD_INITIALIZER(sched->run_q), - .sleep_q = TF_LIST_HEAD_INITIALIZER(sched->sleep_q), }; __tf_scheduler = sched; + tf_poll_init(); update_time(sched); tf_fiber_put(tf_fiber_create(main_fiber, 0)); do { @@ -148,47 +146,86 @@ int tf_main(tf_fiber_proc main_fiber) timeout = 0; } else if (!tf_heap_empty(&sched->heap)) { timeout = tf_mtime_diff(tf_heap_get_value(&sched->heap), - sched->scheduler_time); + tf_mtime()); if (timeout < 0) timeout = 0; } else timeout = -1; - if (timeout > 0) - usleep(timeout * 1000); - - process_heap(sched); + if (tf_poll(timeout) == TF_WAKEUP_TIMEOUT) { + sched->scheduler_time += timeout; + process_heap(sched); + } process_runq(sched); } while (likely(sched->num_fibers)); + tf_poll_close(); __tf_scheduler = NULL; return 0; } -int tf_schedule(int err) +int tf_schedule(int wakeup) { struct tf_scheduler *sched = tf_get_scheduler(); struct tf_fiber *schedf = container_of((void*) sched, struct tf_fiber, data); struct tf_fiber *f = sched->active_fiber; - int r; - r = tf_uctx_transfer(f, schedf, err); - if (r == 1) - return 0; + if (wakeup != TF_WAKEUP_TIMEOUT) + tf_heap_delete(&f->heap_node, &sched->heap); + f->wakeup_type = TF_WAKEUP_NONE; - return r; + return tf_uctx_transfer(f, schedf, wakeup); } -int tf_msleep(int milliseconds) +int tf_schedule_timeout(int milliseconds) { struct tf_scheduler *sched = tf_get_scheduler(); struct tf_fiber *f = sched->active_fiber; + if (milliseconds <= 0) { + tf_heap_delete(&f->heap_node, &sched->heap); + return TF_WAKEUP_IMMEDIATE; + } tf_heap_change(&f->heap_node, &sched->heap, tf_mtime() + milliseconds); + return TF_WAKEUP_TIMEOUT; +} + +void tf_wakeup(struct tf_fiber *fiber, int wakeup_type) +{ + struct tf_scheduler *sched = tf_get_scheduler(); + + if (fiber->wakeup_type == TF_WAKEUP_NONE) { + fiber->wakeup_type = wakeup_type; + tf_list_add_tail(&fiber->queue_node, &sched->run_q); + } +} + +void tf_exit(void) +{ + struct tf_scheduler *sched = tf_get_scheduler(); + struct tf_fiber *f = sched->active_fiber; - return tf_schedule(EIO); + tf_heap_delete(&f->heap_node, &sched->heap); + tf_schedule(TF_WAKEUP_KILL); + TF_BUG_ON(1); } void tf_kill(void *fiber) { } + +int tf_yield(void) +{ + struct tf_scheduler *sched = tf_get_scheduler(); + struct tf_fiber *f = sched->active_fiber; + + tf_list_add_tail(&f->queue_node, &sched->run_q); + return tf_schedule(TF_WAKEUP_IMMEDIATE); +} + +int tf_msleep(int milliseconds) +{ + tf_schedule_timeout(milliseconds); + return tf_schedule(TF_WAKEUP_TIMEOUT); +} + diff --git a/src/heap.c b/src/heap.c index 0d1a661..e93abe3 100644 --- a/src/heap.c +++ b/src/heap.c @@ -1,4 +1,4 @@ -/* heap.c - a linked heap implementation +/* heap.c - an array based d-ary heap implementation * * Copyright (C) 2009 Timo Teräs * All rights reserved. @@ -151,7 +151,6 @@ void tf_heap_delete(struct tf_heap_node *node, struct tf_heap_head *head) head->item[index] = head->item[head->num_items+TF_HEAP_ITEM0]; tf_heap_heapify(head, index); } - head->item[head->num_items+TF_HEAP_ITEM0].ptr = NULL; node->index = 0; } diff --git a/src/io-epoll.c b/src/io-epoll.c new file mode 100644 index 0000000..56d0743 --- /dev/null +++ b/src/io-epoll.c @@ -0,0 +1,107 @@ +/* io-epoll.c - epoll(7) based file descriptor monitoring + * + * Copyright (C) 2009 Timo Teräs + * All rights reserved. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 or later as + * published by the Free Software Foundation. + * + * See http://www.gnu.org/ for details. + */ + +#include +#include +#include +#include + +#include +#include + +#define TF_FD_AUTOCLOSE 1 +#define TF_FD_RESTORE_BLOCKING 2 +#define TF_FD_STREAM_ORIENTED 4 + +static int tf_fd_init(struct tf_fd *fd, int kfd, int flags) +{ + struct tf_poll_data *pd = &tf_get_scheduler()->poll_data; + struct epoll_event ev; + int r; + + ev.events = EPOLLIN | EPOLLOUT | EPOLLET; + ev.data.ptr = fd; + r = epoll_ctl(pd->epoll_fd, EPOLL_CTL_ADD, kfd, &ev); + if (r < 0) { + TF_BUG_ON(errno == EEXIST); + return -errno; + } + + fd->fd = kfd; + fd->flags = flags; + fd->waiting_fiber = NULL; + + return 0; +} + +static void tf_fd_wait(struct tf_fd *fd, int events) +{ + struct tf_poll_data *pd = &tf_get_scheduler()->poll_data; + + TF_BUG_ON(fd->waiting_fiber != NULL); + fd->events = events | EPOLLERR | EPOLLHUP; + fd->waiting_fiber = tf_get_fiber(); + pd->num_waiters++; +} + +static void tf_fd_release(struct tf_fd *fd) +{ + struct tf_poll_data *pd = &tf_get_scheduler()->poll_data; + + fd->waiting_fiber = NULL; + fd->events = 0; + pd->num_waiters--; +} + +void tf_poll_init(void) +{ + struct tf_poll_data *pd = &tf_get_scheduler()->poll_data; + + pd->epoll_fd = epoll_create1(EPOLL_CLOEXEC); + pd->num_waiters = 0; + TF_BUG_ON(pd->epoll_fd < 0); +} + +int tf_poll(tf_mtime_diff_t timeout) +{ + struct tf_poll_data *pd = &tf_get_scheduler()->poll_data; + struct epoll_event events[64]; + struct tf_fd *fd; + int ret = (timeout == 0) ? TF_WAKEUP_TIMEOUT : TF_WAKEUP_FD; + int r, i; + + if (timeout == 0 && pd->num_waiters == 0) + return ret; + + do { + r = epoll_wait(pd->epoll_fd, events, array_size(events), timeout); + for (i = 0; i < r; i++) { + fd = (struct tf_fd *) events[i].data.ptr; + if (likely(fd->events & events[i].events)) + tf_wakeup(fd->waiting_fiber, TF_WAKEUP_FD); + } + if (timeout != 0) + ret = TF_WAKEUP_FD; + timeout = 0; + } while (unlikely(r == array_size(events))); + + return ret; +} + +void tf_poll_close(void) +{ + struct tf_poll_data *pd = &tf_get_scheduler()->poll_data; + + close(pd->epoll_fd); +} + +#include "io-unix.c" diff --git a/src/io-unix.c b/src/io-unix.c new file mode 100644 index 0000000..d333122 --- /dev/null +++ b/src/io-unix.c @@ -0,0 +1,131 @@ +/* io-unix.c - non-blocking io primitives for unix + * + * Copyright (C) 2009 Timo Teräs + * All rights reserved. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 or later as + * published by the Free Software Foundation. + * + * See http://www.gnu.org/ for details. + */ + +int tf_open(struct tf_fd *fd, const char *pathname, int flags) +{ + int kfd, r; + + kfd = open(pathname, flags | O_CLOEXEC | O_NONBLOCK); + if (unlikely(kfd < 0)) + return -errno; + + r = tf_fd_init(fd, kfd, TF_FD_AUTOCLOSE | TF_FD_STREAM_ORIENTED); + if (r < 0) { + close(kfd); + return r; + } + return 0; +} + +int tf_open_fd(struct tf_fd *fd, int kfd) +{ + int mode, flags = 0; + + mode = fcntl(kfd, F_GETFL, 0); + if (!(mode & O_NONBLOCK)) { + fcntl(fd->fd, F_SETFL, mode | O_NONBLOCK); + flags |= TF_FD_RESTORE_BLOCKING; + } + + return tf_fd_init(fd, kfd, TF_FD_STREAM_ORIENTED | flags); +} + +int tf_close(struct tf_fd *fd) +{ + int r; + + if (fd->flags & TF_FD_RESTORE_BLOCKING) { + fcntl(fd->fd, F_SETFL, fcntl(fd->fd, F_GETFL, 0) & ~O_NONBLOCK); + } + if (fd->flags & TF_FD_AUTOCLOSE) { + r = close(fd->fd); + if (unlikely(r == -1)) + return -errno; + } + return 0; +} + +int tf_read(struct tf_fd *fd, void *buf, size_t count, tf_mtime_diff_t timeout) +{ + ssize_t n; + int r, mode; + + mode = tf_schedule_timeout(timeout); + tf_fd_wait(fd, EPOLLIN); + do { + n = read(fd->fd, buf, count); + if (n == count) { + r = 0; + break; + } + if (n < 0) { + if (errno == EINTR) + continue; + if (errno != EAGAIN) { + r = errno; + break; + } + } else if (n == 0) { + r = EIO; + break; + } else { + buf += n; + count -= n; + if (!(fd->flags & TF_FD_STREAM_ORIENTED)) + continue; + } + + r = tf_schedule(mode); + if (r != TF_WAKEUP_FD) + break; + } while (1); + tf_fd_release(fd); + + return -r; +} + +int tf_write(struct tf_fd *fd, const void *buf, size_t count, + tf_mtime_diff_t timeout) +{ + ssize_t n; + int r, mode; + + mode = tf_schedule_timeout(timeout); + tf_fd_wait(fd, EPOLLOUT); + do { + n = write(fd->fd, buf, count); + if (n == count) { + r = 0; + break; + } + if (n < 0) { + if (errno == EINTR) + continue; + if (errno != EAGAIN) { + r = errno; + break; + } + } else { + buf += n; + count -= n; + if (!(fd->flags & TF_FD_STREAM_ORIENTED)) + continue; + } + + r = tf_schedule(mode); + if (r != TF_WAKEUP_FD) + break; + } while (1); + tf_fd_release(fd); + + return -r; +} -- cgit v1.2.3