diff options
Diffstat (limited to 'src/fiber.c')
-rw-r--r-- | src/fiber.c | 327 |
1 files changed, 170 insertions, 157 deletions
diff --git a/src/fiber.c b/src/fiber.c index a7cb6bd..3f58b6d 100644 --- a/src/fiber.c +++ b/src/fiber.c @@ -1,4 +1,4 @@ -/* fiber.c - fiber management and scheduling +/* fiber.c - fiber management * * Copyright (C) 2009 Timo Teräs <timo.teras@iki.fi> * All rights reserved. @@ -14,54 +14,74 @@ #include <errno.h> #include <unistd.h> #include <libtf/fiber.h> -#include <libtf/io.h> +#include <libtf/scheduler.h> +#include "uctx.h" #define TF_TIMEOUT_CHANGE_NEEDED 1 #define TF_TIMEOUT_CHANGE_NEW_VALUE 2 struct tf_fiber { unsigned int ref_count; + struct tf_scheduler * scheduler; int wakeup_type; unsigned int timeout_change; tf_mtime_t timeout; struct tf_list_node queue_node; struct tf_heap_node heap_node; + struct tf_uctx context; char data[TF_EMPTY_ARRAY]; }; -#include "uctx.h" - -/* FIXME: should be in thread local storage */ -struct tf_scheduler *__tf_scheduler; +static inline +struct tf_fiber *tf_fiber_get_current(void) +{ + void *data = tf_scheduler_get_current()->active_fiber; + return container_of(data, struct tf_fiber, data); +} -void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size) +void *__tf_fiber_create(tf_fiber_proc fiber_main, int private_size) { - struct tf_scheduler *sched = tf_get_scheduler(); struct tf_fiber *fiber; - if (tf_heap_prealloc(&sched->heap, sched->num_fibers + 1) < 0) - return NULL; - - fiber = tf_uctx_create(fiber_main, private_size); + fiber = tf_uctx_create_embedded( + TF_STACK_SIZE, + sizeof(struct tf_fiber) + private_size, + offsetof(struct tf_fiber, context), + fiber_main, offsetof(struct tf_fiber, data), + tf_fiber_exit); if (fiber == NULL) return NULL; - /* The initial references for caller and scheduler */ *fiber = (struct tf_fiber) { - .ref_count = 2, + .ref_count = 1, .queue_node = TF_LIST_INITIALIZER(fiber->queue_node), + .context = fiber->context, }; - tf_list_add_tail(&fiber->queue_node, &sched->run_q); - sched->num_fibers++; - return fiber->data; } -static void __tf_fiber_destroy(struct tf_fiber *fiber) +void *tf_fiber_create( + struct tf_scheduler *sched, + tf_fiber_proc fiber_main, int private_size) { - tf_heap_delete(&fiber->heap_node, &tf_get_scheduler()->heap); - tf_uctx_destroy(fiber); + struct tf_fiber *fiber; + + if (sched == NULL) + sched = tf_scheduler_get_current(); + + if (tf_heap_prealloc(&sched->heap, sched->num_fibers + 1) < 0) + return NULL; + + fiber = container_of(__tf_fiber_create(fiber_main, private_size), + struct tf_fiber, data); + sched->num_fibers++; + + fiber->scheduler = sched; + fiber->wakeup_type = TF_WAKEUP_NONE; + tf_list_add_tail(&fiber->queue_node, &sched->scheduled_q); + + return tf_fiber_get(fiber->data); } void *tf_fiber_get(void *data) @@ -71,6 +91,23 @@ void *tf_fiber_get(void *data) return data; } +static void __tf_fiber_destroy(struct tf_fiber *fiber) +{ + struct tf_scheduler *sched = fiber->scheduler; + int main_fiber; + + main_fiber = (fiber->context.alloc == NULL); + tf_heap_delete(&fiber->heap_node, &sched->heap); + tf_uctx_destroy(&fiber->context); + if (main_fiber) + free(fiber); + sched->num_fibers--; + if (sched->num_fibers == 1) { + /* FIXME: Use proper fiber event*/ + __tf_fiber_wakeup(sched->main_fiber, TF_WAKEUP_IMMEDIATE); + } +} + void tf_fiber_put(void *data) { struct tf_fiber *fiber = container_of(data, struct tf_fiber, data); @@ -78,110 +115,144 @@ void tf_fiber_put(void *data) __tf_fiber_destroy(fiber); } -static void update_time(struct tf_scheduler *sched) +void __tf_fiber_wakeup(void *data, int wakeup_type) { - struct timespec ts; + struct tf_fiber *fiber = container_of(data, struct tf_fiber, data); + struct tf_scheduler *sched = fiber->scheduler; - clock_gettime(CLOCK_MONOTONIC, &ts); - sched->scheduler_time = ts.tv_sec * 1000 + ts.tv_nsec / 1000000; + if (fiber->wakeup_type == TF_WAKEUP_NONE) { + fiber->wakeup_type = wakeup_type; + tf_list_add_tail(&fiber->queue_node, &sched->running_q); + } } -static void run_fiber(struct tf_scheduler *sched, struct tf_fiber *f) +void __tf_fiber_wakeup_heapnode(struct tf_heap_node *node) { - struct tf_fiber *schedf = container_of((void*) tf_get_scheduler(), struct tf_fiber, data); - - sched->active_fiber = f; - tf_uctx_transfer(schedf, f); - switch (f->wakeup_type) { - case TF_WAKEUP_KILL: - tf_fiber_put(f->data); - sched->num_fibers--; - break; - case TF_WAKEUP_NONE: - break; - default: - TF_BUG_ON("bad scheduler call from fiber"); - } + __tf_fiber_wakeup(container_of(node, struct tf_fiber, heap_node)->data, + TF_WAKEUP_TIMEOUT); } -static void process_heap(struct tf_scheduler *sched) +static void __tf_fiber_schedule_next(void) { - struct tf_heap_node *node; - struct tf_fiber *f; - tf_mtime_t now = tf_mtime(); - - while (!tf_heap_empty(&sched->heap) && - tf_mtime_diff(now, tf_heap_get_value(&sched->heap)) >= 0) { - node = tf_heap_get_node(&sched->heap); - f = container_of(node, struct tf_fiber, heap_node); - if (f->wakeup_type == TF_WAKEUP_NONE) - f->wakeup_type = TF_WAKEUP_TIMEOUT; - run_fiber(sched, f); + struct tf_scheduler *sched = tf_scheduler_get_current(); + struct tf_fiber *f = tf_fiber_get_current(); + struct tf_fiber *nf; + + /* Figure out the next fibre to run */ + if (unlikely(tf_list_empty(&sched->scheduled_q))) { + tf_list_splice_tail(&sched->running_q, + &sched->scheduled_q); + TF_BUG_ON(tf_list_empty(&sched->scheduled_q)); } + nf = tf_list_entry(tf_list_pop(&sched->scheduled_q), + struct tf_fiber, queue_node); + sched->active_fiber = nf->data; + tf_uctx_transfer(&f->context, &nf->context); } -static void process_runq(struct tf_scheduler *sched) +int __tf_fiber_schedule(void) { - struct tf_fiber *f; + struct tf_scheduler *sched = tf_scheduler_get_current(); + struct tf_fiber *f = tf_fiber_get_current(); + int wakeup; - while (!tf_list_empty(&sched->run_q)) { - f = tf_list_first(&sched->run_q, struct tf_fiber, queue_node); - tf_list_del(&f->queue_node); - run_fiber(sched, f); + if (unlikely(f->timeout_change)) { + if (f->timeout_change & TF_TIMEOUT_CHANGE_NEW_VALUE) { + if (tf_mtime_diff(f->timeout, tf_scheduler_get_mtime()) <= 0) { + f->timeout_change = TF_TIMEOUT_CHANGE_NEEDED; + return TF_WAKEUP_TIMEOUT; + } + tf_heap_change(&f->heap_node, &sched->heap, f->timeout); + } else + tf_heap_delete(&f->heap_node, &sched->heap); + f->timeout_change = 0; } + + __tf_fiber_schedule_next(); + + wakeup = f->wakeup_type; + f->wakeup_type = TF_WAKEUP_NONE; + + return wakeup; } -int tf_main_args(tf_fiber_proc main_fiber, int argc, char **argv) +int __tf_fiber_bind_scheduler(struct tf_scheduler *sched) { - struct tf_uctx *ctx = alloca(sizeof(struct tf_uctx) + sizeof(struct tf_scheduler)); - struct tf_scheduler *sched = (struct tf_scheduler*) ctx->fiber.data; - struct tf_main_ctx *mainctx; - int stack_guard = STACK_GUARD; - - ctx->stack_guard = &stack_guard; - *sched = (struct tf_scheduler){ - .run_q = TF_LIST_HEAD_INITIALIZER(sched->run_q), + struct tf_fiber *f; + + f = malloc(sizeof(struct tf_fiber)); + if (f == NULL) + return -ENOMEM; + + /* Mark currently active main fiber as active */ + *f = (struct tf_fiber) { + .ref_count = 1, + .scheduler = sched, + .queue_node = TF_LIST_INITIALIZER(f->queue_node), }; + tf_uctx_create_self(&f->context); + sched->main_fiber = f->data; + sched->active_fiber = f->data; + sched->num_fibers++; - __tf_scheduler = sched; - tf_poll_init(); - update_time(sched); - - mainctx = tf_fiber_create(main_fiber, sizeof(struct tf_main_ctx)); - mainctx->argc = argc; - mainctx->argv = argv; - tf_fiber_put(mainctx); - - do { - tf_mtime_diff_t timeout; - - update_time(sched); - if (!tf_list_empty(&sched->run_q)) { - timeout = 0; - } else if (!tf_heap_empty(&sched->heap)) { - timeout = tf_mtime_diff(tf_heap_get_value(&sched->heap), - tf_mtime()); - if (timeout < 0) - timeout = 0; - } else - timeout = -1; + /* Schedule scheduler fiber */ + f = container_of((void *) sched, struct tf_fiber, data); + f->scheduler = sched; + f->wakeup_type = TF_WAKEUP_IMMEDIATE; + tf_list_add_tail(&f->queue_node, &sched->running_q); - if (tf_poll(timeout) == TF_WAKEUP_TIMEOUT && timeout >= 0) { - sched->scheduler_time += timeout; - process_heap(sched); - } - process_runq(sched); - } while (likely(sched->num_fibers)); - tf_poll_close(); - __tf_scheduler = NULL; + return 0; +} + +int __tf_fiber_release_scheduler(struct tf_scheduler *sched) +{ + struct tf_fiber *f; + + /* Detach scheduler */ + f = container_of((void *) sched, struct tf_fiber, data); + tf_list_del(&f->queue_node); + + /* Detach main stack from this scheduler */ + f = container_of((void *) sched->main_fiber, struct tf_fiber, data); + tf_fiber_put(sched->main_fiber); + sched->main_fiber = NULL; + sched->num_fibers--; return 0; } +void tf_fiber_exit(void) +{ + struct tf_scheduler *sched = tf_scheduler_get_current(); + struct tf_fiber *f = tf_fiber_get_current(); + struct tf_fiber *schedf = container_of((void *) sched, struct tf_fiber, data); + + tf_heap_delete(&f->heap_node, &sched->heap); + schedf->wakeup_type = TF_WAKEUP_KILL; + tf_uctx_transfer(&f->context, &schedf->context); + TF_BUG_ON(1); +} + +void tf_fiber_kill(void *fiber) +{ +} + +int tf_fiber_yield(void) +{ + struct tf_scheduler *sched = tf_scheduler_get_current(); + struct tf_fiber *f = tf_fiber_get_current(); + + TF_BUG_ON(tf_list_hashed(&f->queue_node)); + f->wakeup_type = TF_WAKEUP_IMMEDIATE; + tf_list_add_tail(&f->queue_node, &sched->running_q); + + return __tf_fiber_schedule(); +} + void tf_timeout_push(struct tf_timeout *timeout, tf_mtime_diff_t milliseconds) { - struct tf_fiber *f = tf_get_fiber(); - tf_mtime_t abs = tf_mtime() + milliseconds; + struct tf_fiber *f = tf_fiber_get_current(); + tf_mtime_t abs = tf_scheduler_get_mtime() + milliseconds; int active; if (f->timeout_change) @@ -207,7 +278,7 @@ void tf_timeout_push(struct tf_timeout *timeout, tf_mtime_diff_t milliseconds) int __tf_timeout_pop(struct tf_timeout *timeout, int err) { - struct tf_fiber *f = tf_get_fiber(); + struct tf_fiber *f = tf_fiber_get_current(); f->timeout = timeout->saved_timeout; f->timeout_change = timeout->timeout_change; @@ -215,61 +286,3 @@ int __tf_timeout_pop(struct tf_timeout *timeout, int err) err = TF_WAKEUP_THIS_TIMEOUT; return err; } - -int tf_schedule(void) -{ - struct tf_scheduler *sched = tf_get_scheduler(); - struct tf_fiber *schedf = container_of((void*) sched, struct tf_fiber, data); - struct tf_fiber *f = sched->active_fiber; - - if (unlikely(f->timeout_change)) { - if (f->timeout_change & TF_TIMEOUT_CHANGE_NEW_VALUE) { - if (tf_mtime_diff(f->timeout, tf_mtime()) <= 0) { - f->timeout_change = TF_TIMEOUT_CHANGE_NEEDED; - return TF_WAKEUP_TIMEOUT; - } - tf_heap_change(&f->heap_node, &sched->heap, f->timeout); - } else - tf_heap_delete(&f->heap_node, &sched->heap); - f->timeout_change = 0; - } - f->wakeup_type = TF_WAKEUP_NONE; - tf_uctx_transfer(f, schedf); - return f->wakeup_type; -} - -void tf_wakeup(struct tf_fiber *fiber, int wakeup_type) -{ - struct tf_scheduler *sched = tf_get_scheduler(); - - if (fiber->wakeup_type == TF_WAKEUP_NONE) { - fiber->wakeup_type = wakeup_type; - tf_list_add_tail(&fiber->queue_node, &sched->run_q); - } -} - -void tf_exit(void) -{ - struct tf_scheduler *sched = tf_get_scheduler(); - struct tf_fiber *f = sched->active_fiber; - struct tf_fiber *schedf = container_of((void*) sched, struct tf_fiber, data); - - tf_heap_delete(&f->heap_node, &sched->heap); - f->wakeup_type = TF_WAKEUP_KILL; - tf_uctx_transfer(f, schedf); - TF_BUG_ON(1); -} - -void tf_kill(void *fiber) -{ -} - -int tf_yield(void) -{ - struct tf_scheduler *sched = tf_get_scheduler(); - struct tf_fiber *f = sched->active_fiber; - - tf_list_add_tail(&f->queue_node, &sched->run_q); - return tf_schedule(); -} - |