From aa530f352b0410150bfe94c821ae32c1378b9d02 Mon Sep 17 00:00:00 2001 From: Timo Teras Date: Thu, 26 Nov 2009 09:35:49 +0200 Subject: libtf: stackable timeouts instead of having per-function argument, use a push/pop mechanism: - multiple timers inside fiber use only one heap entry - easy to chain multiple possibly blocking operations inside one timeout block --- include/libtf/fiber.h | 55 +++++++++++++++++++++++++++++--------------- include/libtf/heap.h | 6 +++++ include/libtf/io.h | 19 ++++++++-------- src/fiber.c | 63 ++++++++++++++++++++++++++++++++++++++++++++------- src/io-unix.c | 45 +++++++++++++----------------------- test/httpget.c | 13 +++++++---- test/read.c | 2 +- 7 files changed, 132 insertions(+), 71 deletions(-) diff --git a/include/libtf/fiber.h b/include/libtf/fiber.h index c3d87c6..ce3745b 100644 --- a/include/libtf/fiber.h +++ b/include/libtf/fiber.h @@ -19,27 +19,22 @@ #include #include -#define TF_UCTX_H "uctx-setjmp.h" - /* Fiber wakeup reasons */ #define TF_WAKEUP_NONE 0 #define TF_WAKEUP_IMMEDIATE -EAGAIN #define TF_WAKEUP_KILL -EINTR -#define TF_WAKEUP_TIMEOUT -ETIMEDOUT +#define TF_WAKEUP_TIMEOUT -ETIME +#define TF_WAKEUP_THIS_TIMEOUT -ETIMEDOUT #define TF_WAKEUP_FD -EIO -/* Special timeouts for tf_schedule() */ -#define TF_NO_TIMEOUT -1 -#define TF_NO_TIMEOUT_CHANGE -2 +struct tf_poll_data { + int epoll_fd; + int num_waiters; +}; /* Scheduler */ struct tf_fiber; -struct tf_poll_data { - int epoll_fd; - int num_waiters; -}; - struct tf_scheduler { struct tf_list_head run_q; struct tf_heap_head heap; @@ -47,7 +42,6 @@ struct tf_scheduler { int num_fibers; tf_mtime_t scheduler_time; struct tf_poll_data poll_data; - }; struct tf_main_ctx { @@ -55,6 +49,11 @@ struct tf_main_ctx { char ** argv; }; +struct tf_timeout { + tf_mtime_t saved_timeout; + unsigned int timeout_change; +}; + static inline struct tf_scheduler *tf_get_scheduler(void) { @@ -86,18 +85,38 @@ void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size); void *tf_fiber_get(void *data); void tf_fiber_put(void *data); -/* Scheduling and fiber management */ -void tf_exit(void) attribute_noreturn; -void tf_kill(void *fiber); +#define tf_timed(func, timeout) \ + ({ \ + struct tf_timeout __timeout; \ + tf_timeout_push(&__timeout, timeout); \ + tf_timeout_pop(&__timeout, (func)); \ + }) + +//* Scheduling and fiber management */ +void tf_timeout_push(struct tf_timeout *timeout, tf_mtime_diff_t milliseconds); +int __tf_timeout_pop(struct tf_timeout *timeout, int err); + +static inline int tf_timeout_pop(struct tf_timeout *timeout, int err) +{ + if (unlikely(timeout->timeout_change)) + return __tf_timeout_pop(timeout, err); + return err; +} -int tf_schedule(int milliseconds); +int tf_schedule(void); void tf_wakeup(struct tf_fiber *fiber, int wakeup_type); +void tf_exit(void) attribute_noreturn; +void tf_kill(void *fiber); int tf_yield(void); static inline -int tf_msleep(int milliseconds) +int tf_msleep(tf_mtime_diff_t milliseconds) { - return tf_schedule(milliseconds); + int r; + r = tf_timed(tf_schedule(), milliseconds); + if (r == TF_WAKEUP_THIS_TIMEOUT) + r = 0; + return r; } #endif diff --git a/include/libtf/heap.h b/include/libtf/heap.h index 24f4767..a68e01d 100644 --- a/include/libtf/heap.h +++ b/include/libtf/heap.h @@ -60,6 +60,12 @@ int tf_heap_empty(struct tf_heap_head *head) return head->num_items == 0; } +static inline +int tf_heap_node_active(struct tf_heap_node *node) +{ + return node->index != 0; +} + static inline int tf_heap_prealloc(struct tf_heap_head *head, uint32_t size) { diff --git a/include/libtf/io.h b/include/libtf/io.h index 38dd541..1f0b793 100644 --- a/include/libtf/io.h +++ b/include/libtf/io.h @@ -52,27 +52,26 @@ void tf_poll_close(void); int tf_open_fd(struct tf_fd *fd, int kfd, int flags); int tf_open(struct tf_fd *fd, const char *pathname, int flags); int tf_close(struct tf_fd *fd); -ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count, tf_mtime_diff_t timeout); -ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count, tf_mtime_diff_t timeout); -int tf_read_fully(struct tf_fd *fd, void *buf, size_t count, tf_mtime_diff_t timeout); -int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count, tf_mtime_diff_t timeout); +ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count); +ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count); +int tf_read_fully(struct tf_fd *fd, void *buf, size_t count); +int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count); int tf_socket(struct tf_fd *fd, int domain, int type, int protocol); int tf_bind(struct tf_fd *fd, const struct tf_sockaddr *addr); int tf_listen(struct tf_fd *fd, int backlog); int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd, - struct tf_sockaddr *from, tf_mtime_diff_t timeout); -int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *addr, tf_mtime_diff_t timeout); + struct tf_sockaddr *from); +int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *addr); ssize_t tf_recvmsg(struct tf_fd *fd, struct tf_sockaddr *from, struct tf_sockaddr *to, - void *buf, size_t count, tf_mtime_diff_t timeout); + void *buf, size_t count); ssize_t tf_sendmsg(struct tf_fd *fd, struct tf_sockaddr *from, const struct tf_sockaddr *to, - const void *buf, size_t count, tf_mtime_diff_t timeout); + const void *buf, size_t count); -int tf_query_dns(const char *name, int num_res, struct tf_sockaddr *res, - tf_mtime_diff_t timeout); +int tf_query_dns(const char *name, int num_res, struct tf_sockaddr *res); #endif diff --git a/src/fiber.c b/src/fiber.c index b678842..3f8bb15 100644 --- a/src/fiber.c +++ b/src/fiber.c @@ -16,15 +16,20 @@ #include #include +#define TF_TIMEOUT_CHANGE_NEEDED 1 +#define TF_TIMEOUT_CHANGE_NEW_VALUE 2 + struct tf_fiber { unsigned int ref_count; int wakeup_type; + unsigned int timeout_change; + tf_mtime_t timeout; struct tf_list_node queue_node; struct tf_heap_node heap_node; char data[TF_EMPTY_ARRAY]; }; -#include TF_UCTX_H +#include "uctx-setjmp.h" /* FIXME: should be in thread local storage */ struct tf_scheduler *__tf_scheduler; @@ -108,6 +113,8 @@ static void process_heap(struct tf_scheduler *sched) tf_mtime_diff(now, tf_heap_get_value(&sched->heap)) >= 0) { node = tf_heap_get_node(&sched->heap); f = container_of(node, struct tf_fiber, heap_node); + if (f->wakeup_type == TF_WAKEUP_NONE) + f->wakeup_type = TF_WAKEUP_TIMEOUT; run_fiber(sched, f); } } @@ -170,18 +177,58 @@ int tf_main_args(tf_fiber_proc main_fiber, int argc, char **argv) return 0; } -int tf_schedule(tf_mtime_diff_t milliseconds) +void tf_timeout_push(struct tf_timeout *timeout, tf_mtime_diff_t milliseconds) +{ + struct tf_fiber *f = tf_get_fiber(); + tf_mtime_t abs = tf_mtime() + milliseconds; + int active; + + if (f->timeout_change) + active = (f->timeout_change & TF_TIMEOUT_CHANGE_NEW_VALUE); + else + active = tf_heap_node_active(&f->heap_node); + + if (!active || tf_mtime_diff(abs, f->timeout) < 0) { + /* Save previous timeout */ + timeout->saved_timeout = f->timeout; + timeout->timeout_change = TF_TIMEOUT_CHANGE_NEEDED; + if (active) + timeout->timeout_change |= TF_TIMEOUT_CHANGE_NEW_VALUE; + + /* Make new timeout pending */ + f->timeout = abs; + f->timeout_change = TF_TIMEOUT_CHANGE_NEEDED + | TF_TIMEOUT_CHANGE_NEW_VALUE; + } else { + timeout->timeout_change = 0; + } +} + +int __tf_timeout_pop(struct tf_timeout *timeout, int err) +{ + struct tf_fiber *f = tf_get_fiber(); + + f->timeout = timeout->saved_timeout; + f->timeout_change = timeout->timeout_change; + if (err == TF_WAKEUP_TIMEOUT) + err = TF_WAKEUP_THIS_TIMEOUT; + return err; +} + +int tf_schedule(void) { struct tf_scheduler *sched = tf_get_scheduler(); struct tf_fiber *schedf = container_of((void*) sched, struct tf_fiber, data); struct tf_fiber *f = sched->active_fiber; + if (unlikely(f->timeout_change)) { + if (f->timeout_change & TF_TIMEOUT_CHANGE_NEW_VALUE) + tf_heap_change(&f->heap_node, &sched->heap, f->timeout); + else + tf_heap_delete(&f->heap_node, &sched->heap); + f->timeout_change = 0; + } f->wakeup_type = TF_WAKEUP_NONE; - if (milliseconds == TF_NO_TIMEOUT) - tf_heap_delete(&f->heap_node, &sched->heap); - else if (milliseconds != TF_NO_TIMEOUT_CHANGE) - tf_heap_change(&f->heap_node, &sched->heap, - tf_mtime() + milliseconds); return tf_uctx_transfer(f, schedf, TF_WAKEUP_IMMEDIATE); } @@ -217,6 +264,6 @@ int tf_yield(void) struct tf_fiber *f = sched->active_fiber; tf_list_add_tail(&f->queue_node, &sched->run_q); - return tf_schedule(TF_NO_TIMEOUT); + return tf_schedule(); } diff --git a/src/io-unix.c b/src/io-unix.c index 8ed8e42..ea65a76 100644 --- a/src/io-unix.c +++ b/src/io-unix.c @@ -90,8 +90,7 @@ int tf_close(struct tf_fd *fd) return 0; } -int tf_read_fully(struct tf_fd *fd, void *buf, size_t count, - tf_mtime_diff_t timeout) +int tf_read_fully(struct tf_fd *fd, void *buf, size_t count) { ssize_t n; int r; @@ -120,16 +119,14 @@ int tf_read_fully(struct tf_fd *fd, void *buf, size_t count, continue; } - r = tf_schedule(timeout); - timeout = TF_NO_TIMEOUT_CHANGE; + r = tf_schedule(); } while (r == TF_WAKEUP_FD); tf_fd_unmonitor(fd); return -r; } -int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count, - tf_mtime_diff_t timeout) +int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count) { ssize_t n; int r; @@ -155,15 +152,14 @@ int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count, continue; } - r = tf_schedule(timeout); - timeout = TF_NO_TIMEOUT_CHANGE; + r = tf_schedule(); } while (r == TF_WAKEUP_FD); tf_fd_unmonitor(fd); return -r; } -ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count, tf_mtime_diff_t timeout) +ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count) { ssize_t n; @@ -178,16 +174,14 @@ ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count, tf_mtime_diff_t timeo n = -errno; break; } - n = tf_schedule(timeout); - timeout = TF_NO_TIMEOUT_CHANGE; + n = tf_schedule(); } while (n == TF_WAKEUP_FD); tf_fd_unmonitor(fd); return n; } -ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count, - tf_mtime_diff_t timeout) +ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count) { ssize_t n; @@ -202,8 +196,7 @@ ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count, n = -errno; break; } - n = tf_schedule(timeout); - timeout = TF_NO_TIMEOUT_CHANGE; + n = tf_schedule(); } while (n == TF_WAKEUP_FD); tf_fd_unmonitor(fd); @@ -250,7 +243,7 @@ int tf_listen(struct tf_fd *fd, int backlog) } int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd, - struct tf_sockaddr *from, tf_mtime_diff_t timeout) + struct tf_sockaddr *from) { int r, tfdf; struct sockaddr *addr = NULL; @@ -277,8 +270,7 @@ int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd, tf_fd_unmonitor(listen_fd); return -errno; } - r = tf_schedule(timeout); - timeout = TF_NO_TIMEOUT_CHANGE; + r = tf_schedule(); } while (r == TF_WAKEUP_FD); tf_fd_unmonitor(listen_fd); if (r < 0) @@ -287,8 +279,7 @@ int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd, return tf_open_fd(child_fd, r, tfdf); } -int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to, - tf_mtime_diff_t timeout) +int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to) { socklen_t l = sizeof(int); int r, err; @@ -302,7 +293,7 @@ int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to, /* Wait for socket to become readable */ tf_fd_monitor(fd, EPOLLOUT); - r = tf_schedule(timeout); + r = tf_schedule(); tf_fd_unmonitor(fd); if (r != TF_WAKEUP_FD) return r; @@ -316,8 +307,7 @@ int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to, ssize_t tf_recvmsg(struct tf_fd *fd, struct tf_sockaddr *from, struct tf_sockaddr *to, - void *buf, size_t len, - tf_mtime_diff_t timeout) + void *buf, size_t len) { struct iovec iov; struct msghdr msg; @@ -347,8 +337,7 @@ ssize_t tf_recvmsg(struct tf_fd *fd, r = -errno; break; } - r = tf_schedule(timeout); - timeout = TF_NO_TIMEOUT_CHANGE; + r = tf_schedule(); } while (r == TF_WAKEUP_FD); tf_fd_unmonitor(fd); @@ -372,8 +361,7 @@ ssize_t tf_recvmsg(struct tf_fd *fd, ssize_t tf_sendmsg(struct tf_fd *fd, struct tf_sockaddr *from, const struct tf_sockaddr *to, - const void *buf, size_t len, - tf_mtime_diff_t timeout) + const void *buf, size_t len) { struct msghdr msg; struct iovec iov; @@ -412,8 +400,7 @@ ssize_t tf_sendmsg(struct tf_fd *fd, r = -errno; break; } - r = tf_schedule(timeout); - timeout = TF_NO_TIMEOUT_CHANGE; + r = tf_schedule(); } while (r == TF_WAKEUP_FD); tf_fd_unmonitor(fd); diff --git a/test/httpget.c b/test/httpget.c index c1e37a3..fed6c06 100644 --- a/test/httpget.c +++ b/test/httpget.c @@ -15,7 +15,7 @@ static void ping_fiber(void *ptr) struct tf_sockaddr host; struct tf_fd fd; char buf[128]; - int bytes = 0, r = 0; + int bytes = 0, r; const char *req = "GET / HTTP/1.0\r\n\r\n"; printf("Lookup %s\n", ctx->hostname); @@ -23,16 +23,19 @@ static void ping_fiber(void *ptr) host.u.in.sin_addr.s_addr = inet_addr(ctx->hostname); host.u.in.sin_port = htons(80); - if (tf_socket(&fd, AF_INET, SOCK_STREAM, 0) < 0) + r = tf_socket(&fd, AF_INET, SOCK_STREAM, 0); + if (r < 0) goto err; - if ((r = tf_connect(&fd, &host, 10000)) < 0) + r = tf_timed(tf_connect(&fd, &host), 10000); + if (r < 0) goto err_close; - if ((r = tf_write_fully(&fd, req, strlen(req), 10000)) < 0) + r = tf_write_fully(&fd, req, strlen(req)); + if (r < 0) goto err_close; - while ((r = tf_read(&fd, buf, sizeof(buf), 10000)) > 0) + while ((r = tf_read(&fd, buf, sizeof(buf))) > 0) bytes += r; err_close: tf_close(&fd); diff --git a/test/read.c b/test/read.c index 97f25fd..6d8306b 100644 --- a/test/read.c +++ b/test/read.c @@ -24,7 +24,7 @@ static void io_fiber(void *ptr) tf_open_fd(&fin, STDIN_FILENO, TF_FD_STREAM_ORIENTED); while (1) { - if (tf_read(&fin, data, sizeof(data), TF_NO_TIMEOUT) < 0) + if (tf_read_fully(&fin, data, sizeof(data)) < 0) break; printf("Read: %8.8s\n", data); } -- cgit v1.2.3