diff options
author | Timo Teras <timo.teras@iki.fi> | 2010-03-10 13:58:39 +0200 |
---|---|---|
committer | Timo Teras <timo.teras@iki.fi> | 2010-03-10 13:58:39 +0200 |
commit | 5ef38570315dc68d7ddf8d9475d9a8830528e8a4 (patch) | |
tree | f88fc542b5231614ac6c22a75baea90d82449d6c | |
parent | 43e69b26126b8708b70680c6b4806eb3844386ab (diff) | |
download | libtf-5ef38570315dc68d7ddf8d9475d9a8830528e8a4.tar.bz2 libtf-5ef38570315dc68d7ddf8d9475d9a8830528e8a4.tar.xz |
libtf: separate scheduler fibre, change the core api
-rw-r--r-- | include/libtf/defines.h | 1 | ||||
-rw-r--r-- | include/libtf/fiber.h | 82 | ||||
-rw-r--r-- | include/libtf/heap.h | 10 | ||||
-rw-r--r-- | include/libtf/io.h | 4 | ||||
-rw-r--r-- | include/libtf/list.h | 33 | ||||
-rw-r--r-- | include/libtf/scheduler.h | 64 | ||||
-rw-r--r-- | include/libtf/tf.h | 1 | ||||
-rw-r--r-- | src/TFbuild | 3 | ||||
-rw-r--r-- | src/fiber.c | 327 | ||||
-rw-r--r-- | src/io-epoll.c | 32 | ||||
-rw-r--r-- | src/io-unix.c | 16 | ||||
-rw-r--r-- | src/scheduler.c | 132 | ||||
-rw-r--r-- | src/uctx.h | 82 | ||||
-rw-r--r-- | test/httpget.c | 16 | ||||
-rw-r--r-- | test/read.c | 11 | ||||
-rw-r--r-- | test/simple1.c | 13 | ||||
-rw-r--r-- | test/sleep.c | 17 |
17 files changed, 533 insertions, 311 deletions
diff --git a/include/libtf/defines.h b/include/libtf/defines.h index ae72980..8e39c7e 100644 --- a/include/libtf/defines.h +++ b/include/libtf/defines.h @@ -62,6 +62,7 @@ __FILE__, __LINE__, __func__, #cond); \ abort(); \ } +#define TF_BUILD_BUG_ON(cond) ((void) sizeof(struct { char TF_BUILD_BUG_ON[-!!(cond)]; })) #define TF_ALIGN(size,align) ((((size_t)(size)) + (align)-1) & ~((align)-1)) diff --git a/include/libtf/fiber.h b/include/libtf/fiber.h index ce3745b..a140607 100644 --- a/include/libtf/fiber.h +++ b/include/libtf/fiber.h @@ -1,6 +1,6 @@ -/* fiber.h - libtf fiber scheduler header +/* fiber.h - libtf fiber manager header * - * Copyright (C) 2009 Timo Teräs <timo.teras@iki.fi> + * Copyright (C) 2009-2010 Timo Teräs <timo.teras@iki.fi> * All rights reserved. * * This program is free software; you can redistribute it and/or modify it @@ -15,8 +15,6 @@ #include <errno.h> #include <libtf/defines.h> -#include <libtf/atomic.h> -#include <libtf/list.h> #include <libtf/heap.h> /* Fiber wakeup reasons */ @@ -27,64 +25,31 @@ #define TF_WAKEUP_THIS_TIMEOUT -ETIMEDOUT #define TF_WAKEUP_FD -EIO -struct tf_poll_data { - int epoll_fd; - int num_waiters; -}; - -/* Scheduler */ -struct tf_fiber; +/* Fiber management */ +struct tf_scheduler; +typedef void (*tf_fiber_proc)(void *fiber); -struct tf_scheduler { - struct tf_list_head run_q; - struct tf_heap_head heap; - struct tf_fiber * active_fiber; - int num_fibers; - tf_mtime_t scheduler_time; - struct tf_poll_data poll_data; -}; +void *__tf_fiber_create(tf_fiber_proc fiber_main, int private_size); +void *tf_fiber_create(struct tf_scheduler *sched, + tf_fiber_proc fiber_main, int private_size); +void *tf_fiber_get(void *data); +void tf_fiber_put(void *data); +void __tf_fiber_wakeup(void *data, int wakeup_type); +void __tf_fiber_wakeup_heapnode(struct tf_heap_node *node); +int __tf_fiber_schedule(void); +int __tf_fiber_bind_scheduler(struct tf_scheduler *sched); +int __tf_fiber_release_scheduler(struct tf_scheduler *sched); -struct tf_main_ctx { - int argc; - char ** argv; -}; +void tf_fiber_exit(void) attribute_noreturn; +void tf_fiber_kill(void *fiber); +int tf_fiber_yield(void); +/* Scheduling and fiber management */ struct tf_timeout { tf_mtime_t saved_timeout; unsigned int timeout_change; }; -static inline -struct tf_scheduler *tf_get_scheduler(void) -{ - extern struct tf_scheduler *__tf_scheduler; - return __tf_scheduler; -} - -static inline -struct tf_fiber *tf_get_fiber(void) -{ - return tf_get_scheduler()->active_fiber; -} - -static inline -tf_mtime_t tf_mtime(void) -{ - return tf_get_scheduler()->scheduler_time; -} - -/* Fiber creation */ -typedef void (*tf_fiber_proc)(void *fiber); -int tf_main_args(tf_fiber_proc fiber_main, int argc, char **argv); -static inline int tf_main(tf_fiber_proc fiber_main) -{ - return tf_main_args(fiber_main, 0, NULL); -} - -void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size); -void *tf_fiber_get(void *data); -void tf_fiber_put(void *data); - #define tf_timed(func, timeout) \ ({ \ struct tf_timeout __timeout; \ @@ -92,7 +57,6 @@ void tf_fiber_put(void *data); tf_timeout_pop(&__timeout, (func)); \ }) -//* Scheduling and fiber management */ void tf_timeout_push(struct tf_timeout *timeout, tf_mtime_diff_t milliseconds); int __tf_timeout_pop(struct tf_timeout *timeout, int err); @@ -103,17 +67,11 @@ static inline int tf_timeout_pop(struct tf_timeout *timeout, int err) return err; } -int tf_schedule(void); -void tf_wakeup(struct tf_fiber *fiber, int wakeup_type); -void tf_exit(void) attribute_noreturn; -void tf_kill(void *fiber); -int tf_yield(void); - static inline int tf_msleep(tf_mtime_diff_t milliseconds) { int r; - r = tf_timed(tf_schedule(), milliseconds); + r = tf_timed(__tf_fiber_schedule(), milliseconds); if (r == TF_WAKEUP_THIS_TIMEOUT) r = 0; return r; diff --git a/include/libtf/heap.h b/include/libtf/heap.h index a68e01d..3a16159 100644 --- a/include/libtf/heap.h +++ b/include/libtf/heap.h @@ -74,4 +74,14 @@ int tf_heap_prealloc(struct tf_heap_head *head, uint32_t size) return 0; } +static inline +void tf_heap_destroy(struct tf_heap_head *head) +{ + if (head->item) + free(head->item); + head->item = NULL; + head->num_items = 0; + head->allocated = 0; +} + #endif diff --git a/include/libtf/io.h b/include/libtf/io.h index 1f0b793..1f37d81 100644 --- a/include/libtf/io.h +++ b/include/libtf/io.h @@ -26,8 +26,6 @@ #define TF_FD_SET_CLOEXEC 4 #define TF_FD_ALREADY_NONBLOCKING 8 -struct tf_fiber; - struct tf_sockaddr { union { struct sockaddr addr; @@ -42,7 +40,7 @@ struct tf_fd { /* Single waiter -- would be relatively trivial to modify to allow * multiple waiters, if someone actually needs it */ unsigned int events; - struct tf_fiber *waiting_fiber; + void *waiting_fiber; }; void tf_poll_init(void); diff --git a/include/libtf/list.h b/include/libtf/list.h index 22b76a8..f75a0be 100644 --- a/include/libtf/list.h +++ b/include/libtf/list.h @@ -142,7 +142,7 @@ static inline void tf_list_add_tail(struct tf_list_node *new, struct tf_list_hea tf_list_add_before(new, &head->node); } -static inline void __tf_list_del(struct tf_list_node * prev, struct tf_list_node *next) +static inline void __tf_list_del(struct tf_list_node *prev, struct tf_list_node *next) { next->prev = prev; prev->next = next; @@ -155,6 +155,14 @@ static inline void tf_list_del(struct tf_list_node *entry) entry->prev = NULL; } +static inline struct tf_list_node *tf_list_pop(struct tf_list_head *head) +{ + struct tf_list_node *n; + n = head->node.next; + tf_list_del(n); + return n; +} + static inline int tf_list_hashed(const struct tf_list_node *n) { return n->next != n && n->next != NULL; @@ -165,6 +173,29 @@ static inline int tf_list_empty(const struct tf_list_head *h) return !tf_list_hashed(&h->node); } +static inline void __tf_list_splice(const struct tf_list_head *list, + struct tf_list_node *prev, + struct tf_list_node *next) +{ + struct tf_list_node *first = list->node.next; + struct tf_list_node *last = list->node.prev; + + first->prev = prev; + prev->next = first; + + last->next = next; + next->prev = last; +} + +static inline void tf_list_splice_tail(struct tf_list_head *src, + struct tf_list_head *dst) +{ + if (!tf_list_empty(src)) { + __tf_list_splice(src, dst->node.prev, &dst->node); + tf_list_init_head(src); + } +} + #define tf_list_next(ptr, type, member) \ (tf_list_hashed(ptr) ? container_of((ptr)->next,type,member) : NULL) diff --git a/include/libtf/scheduler.h b/include/libtf/scheduler.h new file mode 100644 index 0000000..cc8db70 --- /dev/null +++ b/include/libtf/scheduler.h @@ -0,0 +1,64 @@ +/* scheduler.h - libtf fiber scheduler header + * + * Copyright (C) 2009-2010 Timo Teräs <timo.teras@iki.fi> + * All rights reserved. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 or later as + * published by the Free Software Foundation. + * + * See http://www.gnu.org/ for details. + */ + +#ifndef TF_SCHEDULER_H +#define TF_SCHEDULER_H + +#include <libtf/atomic.h> +#include <libtf/list.h> +#include <libtf/heap.h> +#include <libtf/fiber.h> + +struct tf_scheduler { + struct tf_list_head scheduled_q; + struct tf_list_head running_q; + struct tf_heap_head heap; + void * active_fiber; + void * main_fiber; + int num_fibers; + tf_mtime_t scheduler_time; + unsigned long poll_data[2]; +}; + +static inline +struct tf_scheduler *tf_scheduler_get_current(void) +{ + extern struct tf_scheduler *__tf_scheduler; + TF_BUG_ON(__tf_scheduler == NULL); + return __tf_scheduler; +} + +static inline +tf_mtime_t tf_scheduler_get_mtime(void) +{ + return tf_scheduler_get_current()->scheduler_time; +} + +struct tf_scheduler *tf_scheduler_create(void); +int tf_scheduler_enable(struct tf_scheduler *); +void tf_scheduler_disable(void); + +static inline struct tf_scheduler * +tf_scheduler_get(struct tf_scheduler *s) +{ + tf_fiber_get(s); + return s; +} + +static inline void +tf_scheduler_put(struct tf_scheduler *s) +{ + tf_fiber_put(s); +} + +#endif + diff --git a/include/libtf/tf.h b/include/libtf/tf.h index 7a089ff..e613f18 100644 --- a/include/libtf/tf.h +++ b/include/libtf/tf.h @@ -14,6 +14,7 @@ #define TF_H #include <libtf/fiber.h> +#include <libtf/scheduler.h> #include <libtf/io.h> #endif diff --git a/src/TFbuild b/src/TFbuild index 9b40443..08cb696 100644 --- a/src/TFbuild +++ b/src/TFbuild @@ -1,5 +1,6 @@ libs-y += libtf -libtf-objs-y += fiber.o heap.o io-epoll.o +libtf-objs-y += fiber.o scheduler.o heap.o +libtf-objs-$(OS_LINUX) += io-epoll.o CFLAGS_heap.c += -funroll-all-loops diff --git a/src/fiber.c b/src/fiber.c index a7cb6bd..3f58b6d 100644 --- a/src/fiber.c +++ b/src/fiber.c @@ -1,4 +1,4 @@ -/* fiber.c - fiber management and scheduling +/* fiber.c - fiber management * * Copyright (C) 2009 Timo Teräs <timo.teras@iki.fi> * All rights reserved. @@ -14,54 +14,74 @@ #include <errno.h> #include <unistd.h> #include <libtf/fiber.h> -#include <libtf/io.h> +#include <libtf/scheduler.h> +#include "uctx.h" #define TF_TIMEOUT_CHANGE_NEEDED 1 #define TF_TIMEOUT_CHANGE_NEW_VALUE 2 struct tf_fiber { unsigned int ref_count; + struct tf_scheduler * scheduler; int wakeup_type; unsigned int timeout_change; tf_mtime_t timeout; struct tf_list_node queue_node; struct tf_heap_node heap_node; + struct tf_uctx context; char data[TF_EMPTY_ARRAY]; }; -#include "uctx.h" - -/* FIXME: should be in thread local storage */ -struct tf_scheduler *__tf_scheduler; +static inline +struct tf_fiber *tf_fiber_get_current(void) +{ + void *data = tf_scheduler_get_current()->active_fiber; + return container_of(data, struct tf_fiber, data); +} -void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size) +void *__tf_fiber_create(tf_fiber_proc fiber_main, int private_size) { - struct tf_scheduler *sched = tf_get_scheduler(); struct tf_fiber *fiber; - if (tf_heap_prealloc(&sched->heap, sched->num_fibers + 1) < 0) - return NULL; - - fiber = tf_uctx_create(fiber_main, private_size); + fiber = tf_uctx_create_embedded( + TF_STACK_SIZE, + sizeof(struct tf_fiber) + private_size, + offsetof(struct tf_fiber, context), + fiber_main, offsetof(struct tf_fiber, data), + tf_fiber_exit); if (fiber == NULL) return NULL; - /* The initial references for caller and scheduler */ *fiber = (struct tf_fiber) { - .ref_count = 2, + .ref_count = 1, .queue_node = TF_LIST_INITIALIZER(fiber->queue_node), + .context = fiber->context, }; - tf_list_add_tail(&fiber->queue_node, &sched->run_q); - sched->num_fibers++; - return fiber->data; } -static void __tf_fiber_destroy(struct tf_fiber *fiber) +void *tf_fiber_create( + struct tf_scheduler *sched, + tf_fiber_proc fiber_main, int private_size) { - tf_heap_delete(&fiber->heap_node, &tf_get_scheduler()->heap); - tf_uctx_destroy(fiber); + struct tf_fiber *fiber; + + if (sched == NULL) + sched = tf_scheduler_get_current(); + + if (tf_heap_prealloc(&sched->heap, sched->num_fibers + 1) < 0) + return NULL; + + fiber = container_of(__tf_fiber_create(fiber_main, private_size), + struct tf_fiber, data); + sched->num_fibers++; + + fiber->scheduler = sched; + fiber->wakeup_type = TF_WAKEUP_NONE; + tf_list_add_tail(&fiber->queue_node, &sched->scheduled_q); + + return tf_fiber_get(fiber->data); } void *tf_fiber_get(void *data) @@ -71,6 +91,23 @@ void *tf_fiber_get(void *data) return data; } +static void __tf_fiber_destroy(struct tf_fiber *fiber) +{ + struct tf_scheduler *sched = fiber->scheduler; + int main_fiber; + + main_fiber = (fiber->context.alloc == NULL); + tf_heap_delete(&fiber->heap_node, &sched->heap); + tf_uctx_destroy(&fiber->context); + if (main_fiber) + free(fiber); + sched->num_fibers--; + if (sched->num_fibers == 1) { + /* FIXME: Use proper fiber event*/ + __tf_fiber_wakeup(sched->main_fiber, TF_WAKEUP_IMMEDIATE); + } +} + void tf_fiber_put(void *data) { struct tf_fiber *fiber = container_of(data, struct tf_fiber, data); @@ -78,110 +115,144 @@ void tf_fiber_put(void *data) __tf_fiber_destroy(fiber); } -static void update_time(struct tf_scheduler *sched) +void __tf_fiber_wakeup(void *data, int wakeup_type) { - struct timespec ts; + struct tf_fiber *fiber = container_of(data, struct tf_fiber, data); + struct tf_scheduler *sched = fiber->scheduler; - clock_gettime(CLOCK_MONOTONIC, &ts); - sched->scheduler_time = ts.tv_sec * 1000 + ts.tv_nsec / 1000000; + if (fiber->wakeup_type == TF_WAKEUP_NONE) { + fiber->wakeup_type = wakeup_type; + tf_list_add_tail(&fiber->queue_node, &sched->running_q); + } } -static void run_fiber(struct tf_scheduler *sched, struct tf_fiber *f) +void __tf_fiber_wakeup_heapnode(struct tf_heap_node *node) { - struct tf_fiber *schedf = container_of((void*) tf_get_scheduler(), struct tf_fiber, data); - - sched->active_fiber = f; - tf_uctx_transfer(schedf, f); - switch (f->wakeup_type) { - case TF_WAKEUP_KILL: - tf_fiber_put(f->data); - sched->num_fibers--; - break; - case TF_WAKEUP_NONE: - break; - default: - TF_BUG_ON("bad scheduler call from fiber"); - } + __tf_fiber_wakeup(container_of(node, struct tf_fiber, heap_node)->data, + TF_WAKEUP_TIMEOUT); } -static void process_heap(struct tf_scheduler *sched) +static void __tf_fiber_schedule_next(void) { - struct tf_heap_node *node; - struct tf_fiber *f; - tf_mtime_t now = tf_mtime(); - - while (!tf_heap_empty(&sched->heap) && - tf_mtime_diff(now, tf_heap_get_value(&sched->heap)) >= 0) { - node = tf_heap_get_node(&sched->heap); - f = container_of(node, struct tf_fiber, heap_node); - if (f->wakeup_type == TF_WAKEUP_NONE) - f->wakeup_type = TF_WAKEUP_TIMEOUT; - run_fiber(sched, f); + struct tf_scheduler *sched = tf_scheduler_get_current(); + struct tf_fiber *f = tf_fiber_get_current(); + struct tf_fiber *nf; + + /* Figure out the next fibre to run */ + if (unlikely(tf_list_empty(&sched->scheduled_q))) { + tf_list_splice_tail(&sched->running_q, + &sched->scheduled_q); + TF_BUG_ON(tf_list_empty(&sched->scheduled_q)); } + nf = tf_list_entry(tf_list_pop(&sched->scheduled_q), + struct tf_fiber, queue_node); + sched->active_fiber = nf->data; + tf_uctx_transfer(&f->context, &nf->context); } -static void process_runq(struct tf_scheduler *sched) +int __tf_fiber_schedule(void) { - struct tf_fiber *f; + struct tf_scheduler *sched = tf_scheduler_get_current(); + struct tf_fiber *f = tf_fiber_get_current(); + int wakeup; - while (!tf_list_empty(&sched->run_q)) { - f = tf_list_first(&sched->run_q, struct tf_fiber, queue_node); - tf_list_del(&f->queue_node); - run_fiber(sched, f); + if (unlikely(f->timeout_change)) { + if (f->timeout_change & TF_TIMEOUT_CHANGE_NEW_VALUE) { + if (tf_mtime_diff(f->timeout, tf_scheduler_get_mtime()) <= 0) { + f->timeout_change = TF_TIMEOUT_CHANGE_NEEDED; + return TF_WAKEUP_TIMEOUT; + } + tf_heap_change(&f->heap_node, &sched->heap, f->timeout); + } else + tf_heap_delete(&f->heap_node, &sched->heap); + f->timeout_change = 0; } + + __tf_fiber_schedule_next(); + + wakeup = f->wakeup_type; + f->wakeup_type = TF_WAKEUP_NONE; + + return wakeup; } -int tf_main_args(tf_fiber_proc main_fiber, int argc, char **argv) +int __tf_fiber_bind_scheduler(struct tf_scheduler *sched) { - struct tf_uctx *ctx = alloca(sizeof(struct tf_uctx) + sizeof(struct tf_scheduler)); - struct tf_scheduler *sched = (struct tf_scheduler*) ctx->fiber.data; - struct tf_main_ctx *mainctx; - int stack_guard = STACK_GUARD; - - ctx->stack_guard = &stack_guard; - *sched = (struct tf_scheduler){ - .run_q = TF_LIST_HEAD_INITIALIZER(sched->run_q), + struct tf_fiber *f; + + f = malloc(sizeof(struct tf_fiber)); + if (f == NULL) + return -ENOMEM; + + /* Mark currently active main fiber as active */ + *f = (struct tf_fiber) { + .ref_count = 1, + .scheduler = sched, + .queue_node = TF_LIST_INITIALIZER(f->queue_node), }; + tf_uctx_create_self(&f->context); + sched->main_fiber = f->data; + sched->active_fiber = f->data; + sched->num_fibers++; - __tf_scheduler = sched; - tf_poll_init(); - update_time(sched); - - mainctx = tf_fiber_create(main_fiber, sizeof(struct tf_main_ctx)); - mainctx->argc = argc; - mainctx->argv = argv; - tf_fiber_put(mainctx); - - do { - tf_mtime_diff_t timeout; - - update_time(sched); - if (!tf_list_empty(&sched->run_q)) { - timeout = 0; - } else if (!tf_heap_empty(&sched->heap)) { - timeout = tf_mtime_diff(tf_heap_get_value(&sched->heap), - tf_mtime()); - if (timeout < 0) - timeout = 0; - } else - timeout = -1; + /* Schedule scheduler fiber */ + f = container_of((void *) sched, struct tf_fiber, data); + f->scheduler = sched; + f->wakeup_type = TF_WAKEUP_IMMEDIATE; + tf_list_add_tail(&f->queue_node, &sched->running_q); - if (tf_poll(timeout) == TF_WAKEUP_TIMEOUT && timeout >= 0) { - sched->scheduler_time += timeout; - process_heap(sched); - } - process_runq(sched); - } while (likely(sched->num_fibers)); - tf_poll_close(); - __tf_scheduler = NULL; + return 0; +} + +int __tf_fiber_release_scheduler(struct tf_scheduler *sched) +{ + struct tf_fiber *f; + + /* Detach scheduler */ + f = container_of((void *) sched, struct tf_fiber, data); + tf_list_del(&f->queue_node); + + /* Detach main stack from this scheduler */ + f = container_of((void *) sched->main_fiber, struct tf_fiber, data); + tf_fiber_put(sched->main_fiber); + sched->main_fiber = NULL; + sched->num_fibers--; return 0; } +void tf_fiber_exit(void) +{ + struct tf_scheduler *sched = tf_scheduler_get_current(); + struct tf_fiber *f = tf_fiber_get_current(); + struct tf_fiber *schedf = container_of((void *) sched, struct tf_fiber, data); + + tf_heap_delete(&f->heap_node, &sched->heap); + schedf->wakeup_type = TF_WAKEUP_KILL; + tf_uctx_transfer(&f->context, &schedf->context); + TF_BUG_ON(1); +} + +void tf_fiber_kill(void *fiber) +{ +} + +int tf_fiber_yield(void) +{ + struct tf_scheduler *sched = tf_scheduler_get_current(); + struct tf_fiber *f = tf_fiber_get_current(); + + TF_BUG_ON(tf_list_hashed(&f->queue_node)); + f->wakeup_type = TF_WAKEUP_IMMEDIATE; + tf_list_add_tail(&f->queue_node, &sched->running_q); + + return __tf_fiber_schedule(); +} + void tf_timeout_push(struct tf_timeout *timeout, tf_mtime_diff_t milliseconds) { - struct tf_fiber *f = tf_get_fiber(); - tf_mtime_t abs = tf_mtime() + milliseconds; + struct tf_fiber *f = tf_fiber_get_current(); + tf_mtime_t abs = tf_scheduler_get_mtime() + milliseconds; int active; if (f->timeout_change) @@ -207,7 +278,7 @@ void tf_timeout_push(struct tf_timeout *timeout, tf_mtime_diff_t milliseconds) int __tf_timeout_pop(struct tf_timeout *timeout, int err) { - struct tf_fiber *f = tf_get_fiber(); + struct tf_fiber *f = tf_fiber_get_current(); f->timeout = timeout->saved_timeout; f->timeout_change = timeout->timeout_change; @@ -215,61 +286,3 @@ int __tf_timeout_pop(struct tf_timeout *timeout, int err) err = TF_WAKEUP_THIS_TIMEOUT; return err; } - -int tf_schedule(void) -{ - struct tf_scheduler *sched = tf_get_scheduler(); - struct tf_fiber *schedf = container_of((void*) sched, struct tf_fiber, data); - struct tf_fiber *f = sched->active_fiber; - - if (unlikely(f->timeout_change)) { - if (f->timeout_change & TF_TIMEOUT_CHANGE_NEW_VALUE) { - if (tf_mtime_diff(f->timeout, tf_mtime()) <= 0) { - f->timeout_change = TF_TIMEOUT_CHANGE_NEEDED; - return TF_WAKEUP_TIMEOUT; - } - tf_heap_change(&f->heap_node, &sched->heap, f->timeout); - } else - tf_heap_delete(&f->heap_node, &sched->heap); - f->timeout_change = 0; - } - f->wakeup_type = TF_WAKEUP_NONE; - tf_uctx_transfer(f, schedf); - return f->wakeup_type; -} - -void tf_wakeup(struct tf_fiber *fiber, int wakeup_type) -{ - struct tf_scheduler *sched = tf_get_scheduler(); - - if (fiber->wakeup_type == TF_WAKEUP_NONE) { - fiber->wakeup_type = wakeup_type; - tf_list_add_tail(&fiber->queue_node, &sched->run_q); - } -} - -void tf_exit(void) -{ - struct tf_scheduler *sched = tf_get_scheduler(); - struct tf_fiber *f = sched->active_fiber; - struct tf_fiber *schedf = container_of((void*) sched, struct tf_fiber, data); - - tf_heap_delete(&f->heap_node, &sched->heap); - f->wakeup_type = TF_WAKEUP_KILL; - tf_uctx_transfer(f, schedf); - TF_BUG_ON(1); -} - -void tf_kill(void *fiber) -{ -} - -int tf_yield(void) -{ - struct tf_scheduler *sched = tf_get_scheduler(); - struct tf_fiber *f = sched->active_fiber; - - tf_list_add_tail(&f->queue_node, &sched->run_q); - return tf_schedule(); -} - diff --git a/src/io-epoll.c b/src/io-epoll.c index 5e28de8..8ac230f 100644 --- a/src/io-epoll.c +++ b/src/io-epoll.c @@ -17,11 +17,23 @@ #include <sys/socket.h> #include <libtf/io.h> -#include <libtf/fiber.h> +#include <libtf/scheduler.h> + +struct tf_poll_data { + int epoll_fd; + int num_waiters; +}; + +struct tf_poll_data *tf_epoll_get_data(void) +{ + struct tf_scheduler *sched = tf_scheduler_get_current(); + TF_BUILD_BUG_ON(sizeof(struct tf_poll_data) > sizeof(sched->poll_data)); + return (struct tf_poll_data *) &sched->poll_data; +} static int tf_fd_created(struct tf_fd *fd) { - struct tf_poll_data *pd = &tf_get_scheduler()->poll_data; + struct tf_poll_data *pd = tf_epoll_get_data(); struct epoll_event ev; int r; @@ -39,7 +51,7 @@ static int tf_fd_created(struct tf_fd *fd) static int tf_fd_destroyed(struct tf_fd *fd) { - struct tf_poll_data *pd = &tf_get_scheduler()->poll_data; + struct tf_poll_data *pd = tf_epoll_get_data(); if (fd->flags & TF_FD_AUTOCLOSE) return 0; @@ -50,17 +62,17 @@ static int tf_fd_destroyed(struct tf_fd *fd) static void tf_fd_monitor(struct tf_fd *fd, int events) { - struct tf_poll_data *pd = &tf_get_scheduler()->poll_data; + struct tf_poll_data *pd = tf_epoll_get_data(); TF_BUG_ON(fd->waiting_fiber != NULL); fd->events = events | EPOLLERR | EPOLLHUP; - fd->waiting_fiber = tf_get_fiber(); + fd->waiting_fiber = tf_scheduler_get_current()->active_fiber; pd->num_waiters++; } static void tf_fd_unmonitor(struct tf_fd *fd) { - struct tf_poll_data *pd = &tf_get_scheduler()->poll_data; + struct tf_poll_data *pd = tf_epoll_get_data(); fd->waiting_fiber = NULL; fd->events = 0; @@ -69,7 +81,7 @@ static void tf_fd_unmonitor(struct tf_fd *fd) void tf_poll_init(void) { - struct tf_poll_data *pd = &tf_get_scheduler()->poll_data; + struct tf_poll_data *pd = tf_epoll_get_data(); pd->epoll_fd = epoll_create1(EPOLL_CLOEXEC); pd->num_waiters = 0; @@ -78,7 +90,7 @@ void tf_poll_init(void) int tf_poll(tf_mtime_diff_t timeout) { - struct tf_poll_data *pd = &tf_get_scheduler()->poll_data; + struct tf_poll_data *pd = tf_epoll_get_data(); struct epoll_event events[64]; struct tf_fd *fd; int r, i, ret; @@ -95,7 +107,7 @@ int tf_poll(tf_mtime_diff_t timeout) for (i = 0; i < r; i++) { fd = (struct tf_fd *) events[i].data.ptr; if (likely(fd->events & events[i].events)) - tf_wakeup(fd->waiting_fiber, TF_WAKEUP_FD); + __tf_fiber_wakeup(fd->waiting_fiber, TF_WAKEUP_FD); } ret = TF_WAKEUP_FD; timeout = 0; @@ -106,7 +118,7 @@ int tf_poll(tf_mtime_diff_t timeout) void tf_poll_close(void) { - struct tf_poll_data *pd = &tf_get_scheduler()->poll_data; + struct tf_poll_data *pd = tf_epoll_get_data(); close(pd->epoll_fd); } diff --git a/src/io-unix.c b/src/io-unix.c index ea65a76..39cdf64 100644 --- a/src/io-unix.c +++ b/src/io-unix.c @@ -119,7 +119,7 @@ int tf_read_fully(struct tf_fd *fd, void *buf, size_t count) continue; } - r = tf_schedule(); + r = __tf_fiber_schedule(); } while (r == TF_WAKEUP_FD); tf_fd_unmonitor(fd); @@ -152,7 +152,7 @@ int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count) continue; } - r = tf_schedule(); + r = __tf_fiber_schedule(); } while (r == TF_WAKEUP_FD); tf_fd_unmonitor(fd); @@ -174,7 +174,7 @@ ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count) n = -errno; break; } - n = tf_schedule(); + n = __tf_fiber_schedule(); } while (n == TF_WAKEUP_FD); tf_fd_unmonitor(fd); @@ -196,7 +196,7 @@ ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count) n = -errno; break; } - n = tf_schedule(); + n = __tf_fiber_schedule(); } while (n == TF_WAKEUP_FD); tf_fd_unmonitor(fd); @@ -270,7 +270,7 @@ int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd, tf_fd_unmonitor(listen_fd); return -errno; } - r = tf_schedule(); + r = __tf_fiber_schedule(); } while (r == TF_WAKEUP_FD); tf_fd_unmonitor(listen_fd); if (r < 0) @@ -293,7 +293,7 @@ int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to) /* Wait for socket to become readable */ tf_fd_monitor(fd, EPOLLOUT); - r = tf_schedule(); + r = __tf_fiber_schedule(); tf_fd_unmonitor(fd); if (r != TF_WAKEUP_FD) return r; @@ -337,7 +337,7 @@ ssize_t tf_recvmsg(struct tf_fd *fd, r = -errno; break; } - r = tf_schedule(); + r = __tf_fiber_schedule(); } while (r == TF_WAKEUP_FD); tf_fd_unmonitor(fd); @@ -400,7 +400,7 @@ ssize_t tf_sendmsg(struct tf_fd *fd, r = -errno; break; } - r = tf_schedule(); + r = __tf_fiber_schedule(); } while (r == TF_WAKEUP_FD); tf_fd_unmonitor(fd); diff --git a/src/scheduler.c b/src/scheduler.c new file mode 100644 index 0000000..d287eca --- /dev/null +++ b/src/scheduler.c @@ -0,0 +1,132 @@ +/* scheduler.c - fiber scheduling + * + * Copyright (C) 2009-2010 Timo Teräs <timo.teras@iki.fi> + * All rights reserved. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 or later as + * published by the Free Software Foundation. + * + * See http://www.gnu.org/ for details. + */ + +#include <time.h> +#include <libtf/scheduler.h> +#include <libtf/io.h> + +/* FIXME: should be in thread local storage */ +struct tf_scheduler *__tf_scheduler; + +static void update_time(struct tf_scheduler *sched) +{ + struct timespec ts; + + clock_gettime(CLOCK_MONOTONIC, &ts); + sched->scheduler_time = ts.tv_sec * 1000 + ts.tv_nsec / 1000000; +} + +static void process_heap(struct tf_scheduler *sched) +{ + struct tf_heap_node *node; + tf_mtime_t now = sched->scheduler_time; + + while (!tf_heap_empty(&sched->heap) && + tf_mtime_diff(now, tf_heap_get_value(&sched->heap)) >= 0) { + node = tf_heap_get_node(&sched->heap); + tf_heap_delete(node, &sched->heap); + __tf_fiber_wakeup_heapnode(node); + } +} + +void tf_scheduler_fiber(void *data) +{ + struct tf_scheduler *sched = (struct tf_scheduler *) data; + + do { + tf_mtime_diff_t timeout; + + update_time(sched); + if (!tf_list_empty(&sched->scheduled_q) || + !tf_list_empty(&sched->running_q)) { + timeout = 0; + } else if (!tf_heap_empty(&sched->heap)) { + timeout = tf_mtime_diff( + tf_heap_get_value(&sched->heap), + tf_scheduler_get_mtime()); + if (timeout < 0) + timeout = 0; + } else { + timeout = -1; + } + + if (tf_poll(timeout) == TF_WAKEUP_TIMEOUT && + timeout >= 0) { + sched->scheduler_time += timeout; + process_heap(sched); + } + + if (tf_fiber_yield() == TF_WAKEUP_KILL) { + do { + tf_fiber_put(sched->active_fiber); + sched->active_fiber = sched; + } while (__tf_fiber_schedule() == TF_WAKEUP_KILL); + } + } while (1); +} + +struct tf_scheduler *tf_scheduler_create(void) +{ + struct tf_scheduler *sched; + + sched = __tf_fiber_create(tf_scheduler_fiber, + sizeof(struct tf_scheduler)); + + *sched = (struct tf_scheduler) { + .scheduled_q = TF_LIST_HEAD_INITIALIZER(sched->scheduled_q), + .running_q = TF_LIST_HEAD_INITIALIZER(sched->running_q), + }; + + return sched; +} + +int tf_scheduler_enable(struct tf_scheduler *sched) +{ + struct tf_scheduler *s = sched; + + if (s == NULL) { + s = tf_scheduler_create(); + if (s == NULL) + return -ENOMEM; + } + if (s->main_fiber != NULL) + return -EBUSY; + + __tf_fiber_bind_scheduler(s); + __tf_scheduler = s; + tf_poll_init(); + update_time(s); + + if (sched != NULL) + tf_scheduler_get(sched); + + return 0; +} + +void tf_scheduler_disable(void) +{ + struct tf_scheduler *sched = __tf_scheduler; + + if (sched == NULL || + sched->main_fiber != sched->active_fiber) + return; + + /* sleep until no others */ + while (sched->num_fibers > 1) + __tf_fiber_schedule(); + + tf_poll_close(); + __tf_scheduler = NULL; + __tf_fiber_release_scheduler(sched); + tf_heap_destroy(&sched->heap); + tf_fiber_put(sched); +} @@ -14,8 +14,12 @@ #include <stdio.h> #include <stdlib.h> + #ifdef VALGRIND #include <valgrind/valgrind.h> +#else +#define VALGRIND_STACK_REGISTER(stack_base, size) 0 +#define VALGRIND_STACK_DEREGISTER(stack_id) #endif #define STACK_GUARD 0xbad57ac4 @@ -24,10 +28,7 @@ struct tf_uctx { int *stack_guard; void *alloc; void *current_sp; -#ifdef VALGRIND unsigned int stack_id; -#endif - struct tf_fiber fiber; }; #if defined(__i386__) @@ -86,10 +87,25 @@ static inline void stack_push_ptr(void **stackptr, void *ptr) } -static inline -struct tf_fiber *tf_uctx_create(tf_fiber_proc fiber_main, int private_size) +static inline void tf_uctx_create_self(struct tf_uctx *uctx) +{ + static int dummy_guard = STACK_GUARD; + + *uctx = (struct tf_uctx) { + .stack_guard = &dummy_guard, + }; +} + +static inline void * +tf_uctx_create_embedded( + size_t stack_size, + size_t private_size, + off_t uctx_offset, + void (*stack_frame_main)(void*), off_t main_argument_offset, + void (*stack_frame_return)(void)) { size_t size = TF_STACK_SIZE; + void *user_data; struct tf_uctx *uctx; void *stack, *stack_base; @@ -98,46 +114,42 @@ struct tf_fiber *tf_uctx_create(tf_fiber_proc fiber_main, int private_size) if (stack_base == NULL) return NULL; + /* Create initial stack frame (cdecl convention) */ stack = stack_pointer(stack_base, size); - private_size += sizeof(struct tf_uctx); - - /* Construct inital frame for call the main function and if it - * happens to return, it'll jump back to tf_exit() which kills - * the fiber (cdecl calling convetion assumed) */ - uctx = stack_push(&stack, TF_ALIGN(private_size, 64)); - stack_push_ptr(&stack, uctx->fiber.data); - stack_push_ptr(&stack, &tf_exit); - stack_push_ptr(&stack, fiber_main); - uctx->current_sp = stack; - -#ifdef VALGRIND - uctx->stack_id = VALGRIND_STACK_REGISTER(stack_base, size); -#endif - uctx->alloc = stack_base; - uctx->stack_guard = stack_guard(stack_base, size); + user_data = stack_push(&stack, TF_ALIGN(private_size, 64)); + stack_push_ptr(&stack, NULL); + stack_push_ptr(&stack, NULL); + stack_push_ptr(&stack, NULL); + stack_push_ptr(&stack, NULL); + stack_push_ptr(&stack, user_data + main_argument_offset); + stack_push_ptr(&stack, stack_frame_return); + stack_push_ptr(&stack, stack_frame_main); + + uctx = user_data + uctx_offset; + *uctx = (struct tf_uctx) { + .stack_guard = stack_guard(stack_base, size), + .alloc = stack_base, + .current_sp = stack, + .stack_id = VALGRIND_STACK_REGISTER(stack_base, size), + }; *uctx->stack_guard = STACK_GUARD; - return &uctx->fiber; + return user_data; } static inline -void tf_uctx_destroy(struct tf_fiber *fiber) +void tf_uctx_destroy(struct tf_uctx *uctx) { - struct tf_uctx *uctx = container_of(fiber, struct tf_uctx, fiber); -#ifdef VALGRIND - VALGRIND_STACK_DEREGISTER(uctx->stack_id); -#endif - free(uctx->alloc); + if (uctx->alloc != NULL) { + VALGRIND_STACK_DEREGISTER(uctx->stack_id); + free(uctx->alloc); + } } static inline -void tf_uctx_transfer(struct tf_fiber *from, struct tf_fiber *to) +void tf_uctx_transfer(struct tf_uctx *from, struct tf_uctx *to) { - - struct tf_uctx *ufrom = container_of(from, struct tf_uctx, fiber); - struct tf_uctx *uto = container_of(to, struct tf_uctx, fiber); - /* Switch stack pointers */ - TF_BUG_ON(*ufrom->stack_guard != STACK_GUARD); - switch_fiber(ufrom, uto); + TF_BUG_ON(*from->stack_guard != STACK_GUARD); + switch_fiber(from, to); } diff --git a/test/httpget.c b/test/httpget.c index fed6c06..9aec886 100644 --- a/test/httpget.c +++ b/test/httpget.c @@ -44,20 +44,16 @@ err: ctx->hostname, bytes, -r, strerror(-r)); } -static void init_fiber(void *ptr) +int main(int argc, char **argv) { - struct tf_main_ctx *ctx = (struct tf_main_ctx *) ptr; struct ctx *c; int i; - for (i = 1; i < ctx->argc; i++) { - c = tf_fiber_create(ping_fiber, sizeof(struct ctx)); - c->hostname = ctx->argv[i]; + tf_scheduler_enable(NULL); + for (i = 1; i < argc; i++) { + c = tf_fiber_create(NULL, ping_fiber, sizeof(struct ctx)); + c->hostname = argv[i]; tf_fiber_put(c); } -} - -int main(int argc, char **argv) -{ - return tf_main_args(init_fiber, argc, argv); + tf_scheduler_disable(); } diff --git a/test/read.c b/test/read.c index 6d8306b..3d318a3 100644 --- a/test/read.c +++ b/test/read.c @@ -32,13 +32,10 @@ static void io_fiber(void *ptr) tf_close(&fin); } -static void init_fiber(void *ptr) -{ - tf_fiber_put(tf_fiber_create(time_fiber, 0)); - tf_fiber_put(tf_fiber_create(io_fiber, 0)); -} - int main(int argc, char **argv) { - return tf_main(init_fiber); + tf_scheduler_enable(NULL); + tf_fiber_put(tf_fiber_create(NULL, time_fiber, 0)); + tf_fiber_put(tf_fiber_create(NULL, io_fiber, 0)); + tf_scheduler_disable(); } diff --git a/test/simple1.c b/test/simple1.c index 65a787c..6a05b7a 100644 --- a/test/simple1.c +++ b/test/simple1.c @@ -10,23 +10,20 @@ static void work_fiber(void *ptr) struct ctx *c = (struct ctx*) ptr; printf("Hello%d.1\n", c->id); - tf_yield(); + tf_fiber_yield(); printf("Hello%d.2\n", c->id); } -static void init_fiber(void *ptr) +int main(int argc, char **argv) { struct ctx *c; int i; + tf_scheduler_enable(NULL); for (i = 0; i < 6; i++) { - c = tf_fiber_create(work_fiber, sizeof(struct ctx)); + c = tf_fiber_create(NULL, work_fiber, sizeof(struct ctx)); c->id = i; tf_fiber_put(c); } -} - -int main(int argc, char **argv) -{ - return tf_main(init_fiber); + tf_scheduler_disable(); } diff --git a/test/sleep.c b/test/sleep.c index 7e39b5c..82399e1 100644 --- a/test/sleep.c +++ b/test/sleep.c @@ -8,24 +8,23 @@ struct ctx { static void work_fiber(void *ptr) { //struct ctx *c = (struct ctx*) ptr; - tf_msleep(rand() % 5000); + printf("one\n"); tf_msleep(rand() % 5000); + printf("two\n"); tf_msleep(rand() % 5000); + printf("three\n"); } -static void init_fiber(void *ptr) +int main(int argc, char **argv) { struct ctx *c; int i; - for (i = 0; i < 1000; i++) { - c = tf_fiber_create(work_fiber, sizeof(struct ctx)); + tf_scheduler_enable(NULL); + for (i = 0; i < 100; i++) { + c = tf_fiber_create(NULL, work_fiber, sizeof(struct ctx)); tf_fiber_put(c); } -} - -int main(int argc, char **argv) -{ - return tf_main(init_fiber); + tf_scheduler_disable(); } |