summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTimo Teras <timo.teras@iki.fi>2009-11-25 10:52:15 +0200
committerTimo Teras <timo.teras@iki.fi>2009-11-25 10:52:15 +0200
commitfc1044daf51f32b9d85f8497e4e0bd5a3c1e7fe9 (patch)
tree52e11c88f17c47c0d086761e50b266f5c5ccd061
parentcec85dedb7fd66cf2c23cafadd7c53eb7afed78f (diff)
downloadlibtf-fc1044daf51f32b9d85f8497e4e0bd5a3c1e7fe9.tar.bz2
libtf-fc1044daf51f32b9d85f8497e4e0bd5a3c1e7fe9.tar.xz
libtf: implement basic file i/o with epoll
some scetching of i/o api, and implement basic read and write functionality. integrate polling to scheduler and an epoll based polling mechanism.
-rw-r--r--include/libtf/defines.h5
-rw-r--r--include/libtf/fiber.h45
-rw-r--r--include/libtf/io.h70
-rw-r--r--include/libtf/tf.h3
-rw-r--r--src/TFbuild2
-rw-r--r--src/fiber.c85
-rw-r--r--src/heap.c3
-rw-r--r--src/io-epoll.c107
-rw-r--r--src/io-unix.c131
-rw-r--r--test/TFbuild2
-rw-r--r--test/read.c44
11 files changed, 453 insertions, 44 deletions
diff --git a/include/libtf/defines.h b/include/libtf/defines.h
index b1d2aa9..144ad63 100644
--- a/include/libtf/defines.h
+++ b/include/libtf/defines.h
@@ -53,6 +53,9 @@
#define attribute_never_inline __attribute__((noinline))
#define attribute_weak_function __attribute__((weak))
+#define attribute_noreturn __attribute__((noreturn))
+#define attribute_warn_unused_result __attribute__((warn_unused_result))
+#define attribute_deprecated __attribute__((deprecated))
#define TF_BUG_ON(cond) if (unlikely(cond)) { \
fprintf(stderr, "BUG: failure at %s:%d/%s(): %s!\n", \
@@ -69,6 +72,8 @@
#endif
/* Monotonic time */
+#define TF_INFINITE -1
+
typedef uint32_t tf_mtime_t;
typedef int32_t tf_mtime_diff_t;
diff --git a/include/libtf/fiber.h b/include/libtf/fiber.h
index 09d5ef1..91c0b3b 100644
--- a/include/libtf/fiber.h
+++ b/include/libtf/fiber.h
@@ -1,4 +1,4 @@
-/* tf.h - libtf main include
+/* fiber.h - libtf fiber scheduler header
*
* Copyright (C) 2009 Timo Teräs <timo.teras@iki.fi>
* All rights reserved.
@@ -19,16 +19,31 @@
#include <libtf/list.h>
#include <libtf/heap.h>
+#define TF_UCTX_H "uctx-setjmp.h"
+
+/* Fiber wakeup reasons */
+#define TF_WAKEUP_NONE 0
+#define TF_WAKEUP_IMMEDIATE EAGAIN
+#define TF_WAKEUP_KILL EINTR
+#define TF_WAKEUP_TIMEOUT ETIMEDOUT
+#define TF_WAKEUP_FD EIO
+
/* Scheduler */
+struct tf_fiber;
+
+struct tf_poll_data {
+ int epoll_fd;
+ int num_waiters;
+};
+
struct tf_scheduler {
struct tf_list_head run_q;
- struct tf_list_head sleep_q;
struct tf_heap_head heap;
-
struct tf_fiber * active_fiber;
int num_fibers;
-
tf_mtime_t scheduler_time;
+ struct tf_poll_data poll_data;
+
};
static inline
@@ -39,6 +54,12 @@ struct tf_scheduler *tf_get_scheduler(void)
}
static inline
+struct tf_fiber *tf_get_fiber(void)
+{
+ return tf_get_scheduler()->active_fiber;
+}
+
+static inline
tf_mtime_t tf_mtime(void)
{
return tf_get_scheduler()->scheduler_time;
@@ -53,18 +74,14 @@ void *tf_fiber_get(void *data);
void tf_fiber_put(void *data);
/* Scheduling and fiber management */
-int tf_schedule(int err);
-int tf_msleep(int milliseconds);
+void tf_exit(void) attribute_noreturn;
void tf_kill(void *fiber);
-static inline int tf_yield(void)
-{
- return tf_schedule(EAGAIN);
-}
+int tf_schedule(int wakeup_type);
+int tf_schedule_timeout(int milliseconds);
+void tf_wakeup(struct tf_fiber *fiber, int wakeup_type);
-static inline int tf_exit(void)
-{
- return tf_schedule(EFAULT);
-}
+int tf_yield(void);
+int tf_msleep(int milliseconds);
#endif
diff --git a/include/libtf/io.h b/include/libtf/io.h
new file mode 100644
index 0000000..87a6c90
--- /dev/null
+++ b/include/libtf/io.h
@@ -0,0 +1,70 @@
+/* tf.h - libtf io header
+ *
+ * Copyright (C) 2009 Timo Teräs <timo.teras@iki.fi>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 or later as
+ * published by the Free Software Foundation.
+ *
+ * See http://www.gnu.org/ for details.
+ */
+
+#ifndef TF_IO_H
+#define TF_IO_H
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/in.h>
+
+#include <libtf/defines.h>
+
+struct tf_fiber;
+
+struct tf_sockaddr {
+ union {
+ struct sockaddr addr;
+ struct sockaddr_in in;
+ struct sockaddr_in6 in6;
+ };
+};
+
+struct tf_fd {
+ int fd;
+ unsigned int flags;
+ /* Single waiter -- would be relatively trivial to modify to allow
+ * multiple waiters, if someone actually needs it */
+ unsigned int events;
+ struct tf_fiber *waiting_fiber;
+};
+
+void tf_poll_init(void);
+int tf_poll(tf_mtime_diff_t timeout);
+void tf_poll_close(void);
+
+int tf_open(struct tf_fd *fd, const char *pathname, int flags);
+int tf_open_fd(struct tf_fd *fd, int kfd);
+int tf_close(struct tf_fd *fd);
+int tf_read(struct tf_fd *fd, void *buf, size_t count, int timeout);
+int tf_write(struct tf_fd *fd, const void *buf, size_t count, int timeout);
+
+int tf_socket(struct tf_fd *fd, int domain, int type, int protocol);
+int tf_bind(struct tf_fd *fd, const struct tf_sockaddr *addr);
+int tf_listen(struct tf_fd *fd, int backlog);
+int tf_accept(struct tf_fd *fd);
+int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *addr, int timeout);
+
+ssize_t tf_recv(struct tf_fd *fd, void *buf, size_t count, int timeout);
+ssize_t tf_send(struct tf_fd *fd, const void *buf, size_t count, int timeout);
+ssize_t tf_recvmsg(struct tf_fd *fd,
+ struct tf_sockaddr *from, struct tf_sockaddr *to,
+ void *buf, size_t count, int timeout);
+ssize_t tf_sendmsg(struct tf_fd *fd,
+ struct tf_sockaddr *from, const struct tf_sockaddr *to,
+ const void *buf, size_t count, int timeout);
+
+int tf_query_dns(const char *name, int num_res, struct tf_sockaddr *res,
+ int timeout);
+
+#endif
diff --git a/include/libtf/tf.h b/include/libtf/tf.h
index 7ff7b25..7a089ff 100644
--- a/include/libtf/tf.h
+++ b/include/libtf/tf.h
@@ -13,8 +13,7 @@
#ifndef TF_H
#define TF_H
-#define TF_UCTX_H "uctx-setjmp.h"
-
#include <libtf/fiber.h>
+#include <libtf/io.h>
#endif
diff --git a/src/TFbuild b/src/TFbuild
index accae6d..9b40443 100644
--- a/src/TFbuild
+++ b/src/TFbuild
@@ -1,5 +1,5 @@
libs-y += libtf
-libtf-objs-y += fiber.o heap.o
+libtf-objs-y += fiber.o heap.o io-epoll.o
CFLAGS_heap.c += -funroll-all-loops
diff --git a/src/fiber.c b/src/fiber.c
index 72da440..15c533a 100644
--- a/src/fiber.c
+++ b/src/fiber.c
@@ -13,11 +13,12 @@
#include <time.h>
#include <errno.h>
#include <unistd.h>
-#include <libtf/tf.h>
-#include <libtf/heap.h>
+#include <libtf/fiber.h>
+#include <libtf/io.h>
struct tf_fiber {
unsigned int ref_count;
+ int wakeup_type;
struct tf_list_node queue_node;
struct tf_heap_node heap_node;
char data[TF_EMPTY_ARRAY];
@@ -85,16 +86,13 @@ static void run_fiber(struct tf_scheduler *sched, struct tf_fiber *f)
struct tf_fiber *schedf = container_of((void*) tf_get_scheduler(), struct tf_fiber, data);
sched->active_fiber = f;
- switch (tf_uctx_transfer(schedf, f, 1)) {
- case EFAULT: /* Fiber is dead */
+ switch (tf_uctx_transfer(schedf, f, f->wakeup_type)) {
+ case TF_WAKEUP_KILL:
tf_fiber_put(f->data);
sched->num_fibers--;
break;
- case EAGAIN: /* Yielded, reshedule */
- tf_list_add_tail(&f->queue_node, &sched->run_q);
- break;
- case EIO: /* Blocked, in sleep */
- tf_list_add_tail(&f->queue_node, &sched->sleep_q);
+ case TF_WAKEUP_IMMEDIATE:
+ case TF_WAKEUP_TIMEOUT:
break;
default:
TF_BUG_ON("bad scheduler call from fiber");
@@ -108,7 +106,7 @@ static void process_heap(struct tf_scheduler *sched)
tf_mtime_t now = tf_mtime();
while (!tf_heap_empty(&sched->heap) &&
- tf_mtime_diff(now, tf_heap_get_value(&sched->heap)) > 0) {
+ tf_mtime_diff(now, tf_heap_get_value(&sched->heap)) >= 0) {
node = tf_heap_get_node(&sched->heap);
f = container_of(node, struct tf_fiber, heap_node);
run_fiber(sched, f);
@@ -135,9 +133,9 @@ int tf_main(tf_fiber_proc main_fiber)
ctx->stack_guard = &stack_guard;
*sched = (struct tf_scheduler){
.run_q = TF_LIST_HEAD_INITIALIZER(sched->run_q),
- .sleep_q = TF_LIST_HEAD_INITIALIZER(sched->sleep_q),
};
__tf_scheduler = sched;
+ tf_poll_init();
update_time(sched);
tf_fiber_put(tf_fiber_create(main_fiber, 0));
do {
@@ -148,47 +146,86 @@ int tf_main(tf_fiber_proc main_fiber)
timeout = 0;
} else if (!tf_heap_empty(&sched->heap)) {
timeout = tf_mtime_diff(tf_heap_get_value(&sched->heap),
- sched->scheduler_time);
+ tf_mtime());
if (timeout < 0)
timeout = 0;
} else
timeout = -1;
- if (timeout > 0)
- usleep(timeout * 1000);
-
- process_heap(sched);
+ if (tf_poll(timeout) == TF_WAKEUP_TIMEOUT) {
+ sched->scheduler_time += timeout;
+ process_heap(sched);
+ }
process_runq(sched);
} while (likely(sched->num_fibers));
+ tf_poll_close();
__tf_scheduler = NULL;
return 0;
}
-int tf_schedule(int err)
+int tf_schedule(int wakeup)
{
struct tf_scheduler *sched = tf_get_scheduler();
struct tf_fiber *schedf = container_of((void*) sched, struct tf_fiber, data);
struct tf_fiber *f = sched->active_fiber;
- int r;
- r = tf_uctx_transfer(f, schedf, err);
- if (r == 1)
- return 0;
+ if (wakeup != TF_WAKEUP_TIMEOUT)
+ tf_heap_delete(&f->heap_node, &sched->heap);
+ f->wakeup_type = TF_WAKEUP_NONE;
- return r;
+ return tf_uctx_transfer(f, schedf, wakeup);
}
-int tf_msleep(int milliseconds)
+int tf_schedule_timeout(int milliseconds)
{
struct tf_scheduler *sched = tf_get_scheduler();
struct tf_fiber *f = sched->active_fiber;
+ if (milliseconds <= 0) {
+ tf_heap_delete(&f->heap_node, &sched->heap);
+ return TF_WAKEUP_IMMEDIATE;
+ }
tf_heap_change(&f->heap_node, &sched->heap, tf_mtime() + milliseconds);
+ return TF_WAKEUP_TIMEOUT;
+}
+
+void tf_wakeup(struct tf_fiber *fiber, int wakeup_type)
+{
+ struct tf_scheduler *sched = tf_get_scheduler();
+
+ if (fiber->wakeup_type == TF_WAKEUP_NONE) {
+ fiber->wakeup_type = wakeup_type;
+ tf_list_add_tail(&fiber->queue_node, &sched->run_q);
+ }
+}
+
+void tf_exit(void)
+{
+ struct tf_scheduler *sched = tf_get_scheduler();
+ struct tf_fiber *f = sched->active_fiber;
- return tf_schedule(EIO);
+ tf_heap_delete(&f->heap_node, &sched->heap);
+ tf_schedule(TF_WAKEUP_KILL);
+ TF_BUG_ON(1);
}
void tf_kill(void *fiber)
{
}
+
+int tf_yield(void)
+{
+ struct tf_scheduler *sched = tf_get_scheduler();
+ struct tf_fiber *f = sched->active_fiber;
+
+ tf_list_add_tail(&f->queue_node, &sched->run_q);
+ return tf_schedule(TF_WAKEUP_IMMEDIATE);
+}
+
+int tf_msleep(int milliseconds)
+{
+ tf_schedule_timeout(milliseconds);
+ return tf_schedule(TF_WAKEUP_TIMEOUT);
+}
+
diff --git a/src/heap.c b/src/heap.c
index 0d1a661..e93abe3 100644
--- a/src/heap.c
+++ b/src/heap.c
@@ -1,4 +1,4 @@
-/* heap.c - a linked heap implementation
+/* heap.c - an array based d-ary heap implementation
*
* Copyright (C) 2009 Timo Teräs <timo.teras@iki.fi>
* All rights reserved.
@@ -151,7 +151,6 @@ void tf_heap_delete(struct tf_heap_node *node, struct tf_heap_head *head)
head->item[index] = head->item[head->num_items+TF_HEAP_ITEM0];
tf_heap_heapify(head, index);
}
- head->item[head->num_items+TF_HEAP_ITEM0].ptr = NULL;
node->index = 0;
}
diff --git a/src/io-epoll.c b/src/io-epoll.c
new file mode 100644
index 0000000..56d0743
--- /dev/null
+++ b/src/io-epoll.c
@@ -0,0 +1,107 @@
+/* io-epoll.c - epoll(7) based file descriptor monitoring
+ *
+ * Copyright (C) 2009 Timo Teräs <timo.teras@iki.fi>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 or later as
+ * published by the Free Software Foundation.
+ *
+ * See http://www.gnu.org/ for details.
+ */
+
+#include <errno.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <sys/epoll.h>
+
+#include <libtf/io.h>
+#include <libtf/fiber.h>
+
+#define TF_FD_AUTOCLOSE 1
+#define TF_FD_RESTORE_BLOCKING 2
+#define TF_FD_STREAM_ORIENTED 4
+
+static int tf_fd_init(struct tf_fd *fd, int kfd, int flags)
+{
+ struct tf_poll_data *pd = &tf_get_scheduler()->poll_data;
+ struct epoll_event ev;
+ int r;
+
+ ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
+ ev.data.ptr = fd;
+ r = epoll_ctl(pd->epoll_fd, EPOLL_CTL_ADD, kfd, &ev);
+ if (r < 0) {
+ TF_BUG_ON(errno == EEXIST);
+ return -errno;
+ }
+
+ fd->fd = kfd;
+ fd->flags = flags;
+ fd->waiting_fiber = NULL;
+
+ return 0;
+}
+
+static void tf_fd_wait(struct tf_fd *fd, int events)
+{
+ struct tf_poll_data *pd = &tf_get_scheduler()->poll_data;
+
+ TF_BUG_ON(fd->waiting_fiber != NULL);
+ fd->events = events | EPOLLERR | EPOLLHUP;
+ fd->waiting_fiber = tf_get_fiber();
+ pd->num_waiters++;
+}
+
+static void tf_fd_release(struct tf_fd *fd)
+{
+ struct tf_poll_data *pd = &tf_get_scheduler()->poll_data;
+
+ fd->waiting_fiber = NULL;
+ fd->events = 0;
+ pd->num_waiters--;
+}
+
+void tf_poll_init(void)
+{
+ struct tf_poll_data *pd = &tf_get_scheduler()->poll_data;
+
+ pd->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
+ pd->num_waiters = 0;
+ TF_BUG_ON(pd->epoll_fd < 0);
+}
+
+int tf_poll(tf_mtime_diff_t timeout)
+{
+ struct tf_poll_data *pd = &tf_get_scheduler()->poll_data;
+ struct epoll_event events[64];
+ struct tf_fd *fd;
+ int ret = (timeout == 0) ? TF_WAKEUP_TIMEOUT : TF_WAKEUP_FD;
+ int r, i;
+
+ if (timeout == 0 && pd->num_waiters == 0)
+ return ret;
+
+ do {
+ r = epoll_wait(pd->epoll_fd, events, array_size(events), timeout);
+ for (i = 0; i < r; i++) {
+ fd = (struct tf_fd *) events[i].data.ptr;
+ if (likely(fd->events & events[i].events))
+ tf_wakeup(fd->waiting_fiber, TF_WAKEUP_FD);
+ }
+ if (timeout != 0)
+ ret = TF_WAKEUP_FD;
+ timeout = 0;
+ } while (unlikely(r == array_size(events)));
+
+ return ret;
+}
+
+void tf_poll_close(void)
+{
+ struct tf_poll_data *pd = &tf_get_scheduler()->poll_data;
+
+ close(pd->epoll_fd);
+}
+
+#include "io-unix.c"
diff --git a/src/io-unix.c b/src/io-unix.c
new file mode 100644
index 0000000..d333122
--- /dev/null
+++ b/src/io-unix.c
@@ -0,0 +1,131 @@
+/* io-unix.c - non-blocking io primitives for unix
+ *
+ * Copyright (C) 2009 Timo Teräs <timo.teras@iki.fi>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 or later as
+ * published by the Free Software Foundation.
+ *
+ * See http://www.gnu.org/ for details.
+ */
+
+int tf_open(struct tf_fd *fd, const char *pathname, int flags)
+{
+ int kfd, r;
+
+ kfd = open(pathname, flags | O_CLOEXEC | O_NONBLOCK);
+ if (unlikely(kfd < 0))
+ return -errno;
+
+ r = tf_fd_init(fd, kfd, TF_FD_AUTOCLOSE | TF_FD_STREAM_ORIENTED);
+ if (r < 0) {
+ close(kfd);
+ return r;
+ }
+ return 0;
+}
+
+int tf_open_fd(struct tf_fd *fd, int kfd)
+{
+ int mode, flags = 0;
+
+ mode = fcntl(kfd, F_GETFL, 0);
+ if (!(mode & O_NONBLOCK)) {
+ fcntl(fd->fd, F_SETFL, mode | O_NONBLOCK);
+ flags |= TF_FD_RESTORE_BLOCKING;
+ }
+
+ return tf_fd_init(fd, kfd, TF_FD_STREAM_ORIENTED | flags);
+}
+
+int tf_close(struct tf_fd *fd)
+{
+ int r;
+
+ if (fd->flags & TF_FD_RESTORE_BLOCKING) {
+ fcntl(fd->fd, F_SETFL, fcntl(fd->fd, F_GETFL, 0) & ~O_NONBLOCK);
+ }
+ if (fd->flags & TF_FD_AUTOCLOSE) {
+ r = close(fd->fd);
+ if (unlikely(r == -1))
+ return -errno;
+ }
+ return 0;
+}
+
+int tf_read(struct tf_fd *fd, void *buf, size_t count, tf_mtime_diff_t timeout)
+{
+ ssize_t n;
+ int r, mode;
+
+ mode = tf_schedule_timeout(timeout);
+ tf_fd_wait(fd, EPOLLIN);
+ do {
+ n = read(fd->fd, buf, count);
+ if (n == count) {
+ r = 0;
+ break;
+ }
+ if (n < 0) {
+ if (errno == EINTR)
+ continue;
+ if (errno != EAGAIN) {
+ r = errno;
+ break;
+ }
+ } else if (n == 0) {
+ r = EIO;
+ break;
+ } else {
+ buf += n;
+ count -= n;
+ if (!(fd->flags & TF_FD_STREAM_ORIENTED))
+ continue;
+ }
+
+ r = tf_schedule(mode);
+ if (r != TF_WAKEUP_FD)
+ break;
+ } while (1);
+ tf_fd_release(fd);
+
+ return -r;
+}
+
+int tf_write(struct tf_fd *fd, const void *buf, size_t count,
+ tf_mtime_diff_t timeout)
+{
+ ssize_t n;
+ int r, mode;
+
+ mode = tf_schedule_timeout(timeout);
+ tf_fd_wait(fd, EPOLLOUT);
+ do {
+ n = write(fd->fd, buf, count);
+ if (n == count) {
+ r = 0;
+ break;
+ }
+ if (n < 0) {
+ if (errno == EINTR)
+ continue;
+ if (errno != EAGAIN) {
+ r = errno;
+ break;
+ }
+ } else {
+ buf += n;
+ count -= n;
+ if (!(fd->flags & TF_FD_STREAM_ORIENTED))
+ continue;
+ }
+
+ r = tf_schedule(mode);
+ if (r != TF_WAKEUP_FD)
+ break;
+ } while (1);
+ tf_fd_release(fd);
+
+ return -r;
+}
diff --git a/test/TFbuild b/test/TFbuild
index d2648ed..430e132 100644
--- a/test/TFbuild
+++ b/test/TFbuild
@@ -1,3 +1,3 @@
-progs-$(TEST) += simple1 sleep
+progs-$(TEST) += simple1 sleep read
LIBS += $(objtree)src/libtf.a
diff --git a/test/read.c b/test/read.c
new file mode 100644
index 0000000..1921609
--- /dev/null
+++ b/test/read.c
@@ -0,0 +1,44 @@
+/* Read from stdin and have an active fiber in the background.
+ * Stdin needs to be redirected to FIFO or similar; mixing
+ * console and non-blocking I/O is not a good idea.
+ */
+
+#include <libtf/tf.h>
+#include <stdio.h>
+#include <unistd.h>
+
+static void time_fiber(void *ptr)
+{
+ while (1) {
+ printf("Tick\n");
+ tf_msleep(1000);
+ printf("Tack\n");
+ tf_msleep(1000);
+ }
+}
+
+static void io_fiber(void *ptr)
+{
+ char data[8];
+ struct tf_fd fin;
+
+ tf_open_fd(&fin, STDIN_FILENO);
+ while (1) {
+ if (tf_read(&fin, data, sizeof(data), TF_INFINITE) < 0)
+ break;
+ printf("Read: %8.8s\n", data);
+ }
+ printf("Exiting io fiber\n");
+ tf_close(&fin);
+}
+
+static void init_fiber(void *ptr)
+{
+ tf_fiber_put(tf_fiber_create(time_fiber, 0));
+ tf_fiber_put(tf_fiber_create(io_fiber, 0));
+}
+
+int main(int argc, char **argv)
+{
+ return tf_main(init_fiber);
+}