diff options
author | Timo Teras <timo.teras@iki.fi> | 2009-11-26 09:35:49 +0200 |
---|---|---|
committer | Timo Teras <timo.teras@iki.fi> | 2009-11-26 09:37:24 +0200 |
commit | aa530f352b0410150bfe94c821ae32c1378b9d02 (patch) | |
tree | fb27f277db0c7feaaf12ce43169d3b0b44e95c0f /src | |
parent | 4db830052d941d9c6de281bc9a2f6ac212c59ad8 (diff) | |
download | libtf-aa530f352b0410150bfe94c821ae32c1378b9d02.tar.bz2 libtf-aa530f352b0410150bfe94c821ae32c1378b9d02.tar.xz |
libtf: stackable timeouts
instead of having per-function argument, use a push/pop mechanism:
- multiple timers inside fiber use only one heap entry
- easy to chain multiple possibly blocking operations inside one
timeout block
Diffstat (limited to 'src')
-rw-r--r-- | src/fiber.c | 63 | ||||
-rw-r--r-- | src/io-unix.c | 45 |
2 files changed, 71 insertions, 37 deletions
diff --git a/src/fiber.c b/src/fiber.c index b678842..3f8bb15 100644 --- a/src/fiber.c +++ b/src/fiber.c @@ -16,15 +16,20 @@ #include <libtf/fiber.h> #include <libtf/io.h> +#define TF_TIMEOUT_CHANGE_NEEDED 1 +#define TF_TIMEOUT_CHANGE_NEW_VALUE 2 + struct tf_fiber { unsigned int ref_count; int wakeup_type; + unsigned int timeout_change; + tf_mtime_t timeout; struct tf_list_node queue_node; struct tf_heap_node heap_node; char data[TF_EMPTY_ARRAY]; }; -#include TF_UCTX_H +#include "uctx-setjmp.h" /* FIXME: should be in thread local storage */ struct tf_scheduler *__tf_scheduler; @@ -108,6 +113,8 @@ static void process_heap(struct tf_scheduler *sched) tf_mtime_diff(now, tf_heap_get_value(&sched->heap)) >= 0) { node = tf_heap_get_node(&sched->heap); f = container_of(node, struct tf_fiber, heap_node); + if (f->wakeup_type == TF_WAKEUP_NONE) + f->wakeup_type = TF_WAKEUP_TIMEOUT; run_fiber(sched, f); } } @@ -170,18 +177,58 @@ int tf_main_args(tf_fiber_proc main_fiber, int argc, char **argv) return 0; } -int tf_schedule(tf_mtime_diff_t milliseconds) +void tf_timeout_push(struct tf_timeout *timeout, tf_mtime_diff_t milliseconds) +{ + struct tf_fiber *f = tf_get_fiber(); + tf_mtime_t abs = tf_mtime() + milliseconds; + int active; + + if (f->timeout_change) + active = (f->timeout_change & TF_TIMEOUT_CHANGE_NEW_VALUE); + else + active = tf_heap_node_active(&f->heap_node); + + if (!active || tf_mtime_diff(abs, f->timeout) < 0) { + /* Save previous timeout */ + timeout->saved_timeout = f->timeout; + timeout->timeout_change = TF_TIMEOUT_CHANGE_NEEDED; + if (active) + timeout->timeout_change |= TF_TIMEOUT_CHANGE_NEW_VALUE; + + /* Make new timeout pending */ + f->timeout = abs; + f->timeout_change = TF_TIMEOUT_CHANGE_NEEDED + | TF_TIMEOUT_CHANGE_NEW_VALUE; + } else { + timeout->timeout_change = 0; + } +} + +int __tf_timeout_pop(struct tf_timeout *timeout, int err) +{ + struct tf_fiber *f = tf_get_fiber(); + + f->timeout = timeout->saved_timeout; + f->timeout_change = timeout->timeout_change; + if (err == TF_WAKEUP_TIMEOUT) + err = TF_WAKEUP_THIS_TIMEOUT; + return err; +} + +int tf_schedule(void) { struct tf_scheduler *sched = tf_get_scheduler(); struct tf_fiber *schedf = container_of((void*) sched, struct tf_fiber, data); struct tf_fiber *f = sched->active_fiber; + if (unlikely(f->timeout_change)) { + if (f->timeout_change & TF_TIMEOUT_CHANGE_NEW_VALUE) + tf_heap_change(&f->heap_node, &sched->heap, f->timeout); + else + tf_heap_delete(&f->heap_node, &sched->heap); + f->timeout_change = 0; + } f->wakeup_type = TF_WAKEUP_NONE; - if (milliseconds == TF_NO_TIMEOUT) - tf_heap_delete(&f->heap_node, &sched->heap); - else if (milliseconds != TF_NO_TIMEOUT_CHANGE) - tf_heap_change(&f->heap_node, &sched->heap, - tf_mtime() + milliseconds); return tf_uctx_transfer(f, schedf, TF_WAKEUP_IMMEDIATE); } @@ -217,6 +264,6 @@ int tf_yield(void) struct tf_fiber *f = sched->active_fiber; tf_list_add_tail(&f->queue_node, &sched->run_q); - return tf_schedule(TF_NO_TIMEOUT); + return tf_schedule(); } diff --git a/src/io-unix.c b/src/io-unix.c index 8ed8e42..ea65a76 100644 --- a/src/io-unix.c +++ b/src/io-unix.c @@ -90,8 +90,7 @@ int tf_close(struct tf_fd *fd) return 0; } -int tf_read_fully(struct tf_fd *fd, void *buf, size_t count, - tf_mtime_diff_t timeout) +int tf_read_fully(struct tf_fd *fd, void *buf, size_t count) { ssize_t n; int r; @@ -120,16 +119,14 @@ int tf_read_fully(struct tf_fd *fd, void *buf, size_t count, continue; } - r = tf_schedule(timeout); - timeout = TF_NO_TIMEOUT_CHANGE; + r = tf_schedule(); } while (r == TF_WAKEUP_FD); tf_fd_unmonitor(fd); return -r; } -int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count, - tf_mtime_diff_t timeout) +int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count) { ssize_t n; int r; @@ -155,15 +152,14 @@ int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count, continue; } - r = tf_schedule(timeout); - timeout = TF_NO_TIMEOUT_CHANGE; + r = tf_schedule(); } while (r == TF_WAKEUP_FD); tf_fd_unmonitor(fd); return -r; } -ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count, tf_mtime_diff_t timeout) +ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count) { ssize_t n; @@ -178,16 +174,14 @@ ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count, tf_mtime_diff_t timeo n = -errno; break; } - n = tf_schedule(timeout); - timeout = TF_NO_TIMEOUT_CHANGE; + n = tf_schedule(); } while (n == TF_WAKEUP_FD); tf_fd_unmonitor(fd); return n; } -ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count, - tf_mtime_diff_t timeout) +ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count) { ssize_t n; @@ -202,8 +196,7 @@ ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count, n = -errno; break; } - n = tf_schedule(timeout); - timeout = TF_NO_TIMEOUT_CHANGE; + n = tf_schedule(); } while (n == TF_WAKEUP_FD); tf_fd_unmonitor(fd); @@ -250,7 +243,7 @@ int tf_listen(struct tf_fd *fd, int backlog) } int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd, - struct tf_sockaddr *from, tf_mtime_diff_t timeout) + struct tf_sockaddr *from) { int r, tfdf; struct sockaddr *addr = NULL; @@ -277,8 +270,7 @@ int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd, tf_fd_unmonitor(listen_fd); return -errno; } - r = tf_schedule(timeout); - timeout = TF_NO_TIMEOUT_CHANGE; + r = tf_schedule(); } while (r == TF_WAKEUP_FD); tf_fd_unmonitor(listen_fd); if (r < 0) @@ -287,8 +279,7 @@ int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd, return tf_open_fd(child_fd, r, tfdf); } -int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to, - tf_mtime_diff_t timeout) +int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to) { socklen_t l = sizeof(int); int r, err; @@ -302,7 +293,7 @@ int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to, /* Wait for socket to become readable */ tf_fd_monitor(fd, EPOLLOUT); - r = tf_schedule(timeout); + r = tf_schedule(); tf_fd_unmonitor(fd); if (r != TF_WAKEUP_FD) return r; @@ -316,8 +307,7 @@ int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to, ssize_t tf_recvmsg(struct tf_fd *fd, struct tf_sockaddr *from, struct tf_sockaddr *to, - void *buf, size_t len, - tf_mtime_diff_t timeout) + void *buf, size_t len) { struct iovec iov; struct msghdr msg; @@ -347,8 +337,7 @@ ssize_t tf_recvmsg(struct tf_fd *fd, r = -errno; break; } - r = tf_schedule(timeout); - timeout = TF_NO_TIMEOUT_CHANGE; + r = tf_schedule(); } while (r == TF_WAKEUP_FD); tf_fd_unmonitor(fd); @@ -372,8 +361,7 @@ ssize_t tf_recvmsg(struct tf_fd *fd, ssize_t tf_sendmsg(struct tf_fd *fd, struct tf_sockaddr *from, const struct tf_sockaddr *to, - const void *buf, size_t len, - tf_mtime_diff_t timeout) + const void *buf, size_t len) { struct msghdr msg; struct iovec iov; @@ -412,8 +400,7 @@ ssize_t tf_sendmsg(struct tf_fd *fd, r = -errno; break; } - r = tf_schedule(timeout); - timeout = TF_NO_TIMEOUT_CHANGE; + r = tf_schedule(); } while (r == TF_WAKEUP_FD); tf_fd_unmonitor(fd); |