/* fiber.c - fiber management and scheduling * * Copyright (C) 2009 Timo Teräs * All rights reserved. * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 or later as * published by the Free Software Foundation. * * See http://www.gnu.org/ for details. */ #include #include #include #include #include #define TF_TIMEOUT_CHANGE_NEEDED 1 #define TF_TIMEOUT_CHANGE_NEW_VALUE 2 struct tf_fiber { unsigned int ref_count; int wakeup_type; unsigned int timeout_change; tf_mtime_t timeout; struct tf_list_node queue_node; struct tf_heap_node heap_node; char data[TF_EMPTY_ARRAY]; }; #include "uctx.h" /* FIXME: should be in thread local storage */ struct tf_scheduler *__tf_scheduler; void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size) { struct tf_scheduler *sched = tf_get_scheduler(); struct tf_fiber *fiber; if (tf_heap_prealloc(&sched->heap, sched->num_fibers + 1) < 0) return NULL; fiber = tf_uctx_create(fiber_main, private_size); if (fiber == NULL) return NULL; /* The initial references for caller and scheduler */ *fiber = (struct tf_fiber) { .ref_count = 2, .queue_node = TF_LIST_INITIALIZER(fiber->queue_node), }; tf_list_add_tail(&fiber->queue_node, &sched->run_q); sched->num_fibers++; return fiber->data; } static void __tf_fiber_destroy(struct tf_fiber *fiber) { tf_heap_delete(&fiber->heap_node, &tf_get_scheduler()->heap); tf_uctx_destroy(fiber); } void *tf_fiber_get(void *data) { struct tf_fiber *fiber = container_of(data, struct tf_fiber, data); tf_atomic_inc(fiber->ref_count); return data; } void tf_fiber_put(void *data) { struct tf_fiber *fiber = container_of(data, struct tf_fiber, data); if (tf_atomic_dec(fiber->ref_count) == 0) __tf_fiber_destroy(fiber); } static void update_time(struct tf_scheduler *sched) { struct timespec ts; clock_gettime(CLOCK_MONOTONIC, &ts); sched->scheduler_time = ts.tv_sec * 1000 + ts.tv_nsec / 1000000; } static void run_fiber(struct tf_scheduler *sched, struct tf_fiber *f) { struct tf_fiber *schedf = container_of((void*) tf_get_scheduler(), struct tf_fiber, data); sched->active_fiber = f; tf_uctx_transfer(schedf, f); switch (f->wakeup_type) { case TF_WAKEUP_KILL: tf_fiber_put(f->data); sched->num_fibers--; break; case TF_WAKEUP_NONE: break; default: TF_BUG_ON("bad scheduler call from fiber"); } } static void process_heap(struct tf_scheduler *sched) { struct tf_heap_node *node; struct tf_fiber *f; tf_mtime_t now = tf_mtime(); while (!tf_heap_empty(&sched->heap) && tf_mtime_diff(now, tf_heap_get_value(&sched->heap)) >= 0) { node = tf_heap_get_node(&sched->heap); f = container_of(node, struct tf_fiber, heap_node); if (f->wakeup_type == TF_WAKEUP_NONE) f->wakeup_type = TF_WAKEUP_TIMEOUT; run_fiber(sched, f); } } static void process_runq(struct tf_scheduler *sched) { struct tf_fiber *f; while (!tf_list_empty(&sched->run_q)) { f = tf_list_first(&sched->run_q, struct tf_fiber, queue_node); tf_list_del(&f->queue_node); run_fiber(sched, f); } } int tf_main_args(tf_fiber_proc main_fiber, int argc, char **argv) { struct tf_uctx *ctx = alloca(sizeof(struct tf_uctx) + sizeof(struct tf_scheduler)); struct tf_scheduler *sched = (struct tf_scheduler*) ctx->fiber.data; struct tf_main_ctx *mainctx; int stack_guard = STACK_GUARD; ctx->stack_guard = &stack_guard; *sched = (struct tf_scheduler){ .run_q = TF_LIST_HEAD_INITIALIZER(sched->run_q), }; __tf_scheduler = sched; tf_poll_init(); update_time(sched); mainctx = tf_fiber_create(main_fiber, sizeof(struct tf_main_ctx)); mainctx->argc = argc; mainctx->argv = argv; tf_fiber_put(mainctx); do { tf_mtime_diff_t timeout; update_time(sched); if (!tf_list_empty(&sched->run_q)) { timeout = 0; } else if (!tf_heap_empty(&sched->heap)) { timeout = tf_mtime_diff(tf_heap_get_value(&sched->heap), tf_mtime()); if (timeout < 0) timeout = 0; } else timeout = -1; if (tf_poll(timeout) == TF_WAKEUP_TIMEOUT && timeout >= 0) { sched->scheduler_time += timeout; process_heap(sched); } process_runq(sched); } while (likely(sched->num_fibers)); tf_poll_close(); __tf_scheduler = NULL; return 0; } void tf_timeout_push(struct tf_timeout *timeout, tf_mtime_diff_t milliseconds) { struct tf_fiber *f = tf_get_fiber(); tf_mtime_t abs = tf_mtime() + milliseconds; int active; if (f->timeout_change) active = (f->timeout_change & TF_TIMEOUT_CHANGE_NEW_VALUE); else active = tf_heap_node_active(&f->heap_node); if (!active || tf_mtime_diff(abs, f->timeout) < 0) { /* Save previous timeout */ timeout->saved_timeout = f->timeout; timeout->timeout_change = TF_TIMEOUT_CHANGE_NEEDED; if (active) timeout->timeout_change |= TF_TIMEOUT_CHANGE_NEW_VALUE; /* Make new timeout pending */ f->timeout = abs; f->timeout_change = TF_TIMEOUT_CHANGE_NEEDED | TF_TIMEOUT_CHANGE_NEW_VALUE; } else { timeout->timeout_change = 0; } } int __tf_timeout_pop(struct tf_timeout *timeout, int err) { struct tf_fiber *f = tf_get_fiber(); f->timeout = timeout->saved_timeout; f->timeout_change = timeout->timeout_change; if (err == TF_WAKEUP_TIMEOUT) err = TF_WAKEUP_THIS_TIMEOUT; return err; } int tf_schedule(void) { struct tf_scheduler *sched = tf_get_scheduler(); struct tf_fiber *schedf = container_of((void*) sched, struct tf_fiber, data); struct tf_fiber *f = sched->active_fiber; if (unlikely(f->timeout_change)) { if (f->timeout_change & TF_TIMEOUT_CHANGE_NEW_VALUE) { if (tf_mtime_diff(f->timeout, tf_mtime()) <= 0) { f->timeout_change = TF_TIMEOUT_CHANGE_NEEDED; return TF_WAKEUP_TIMEOUT; } tf_heap_change(&f->heap_node, &sched->heap, f->timeout); } else tf_heap_delete(&f->heap_node, &sched->heap); f->timeout_change = 0; } f->wakeup_type = TF_WAKEUP_NONE; tf_uctx_transfer(f, schedf); return f->wakeup_type; } void tf_wakeup(struct tf_fiber *fiber, int wakeup_type) { struct tf_scheduler *sched = tf_get_scheduler(); if (fiber->wakeup_type == TF_WAKEUP_NONE) { fiber->wakeup_type = wakeup_type; tf_list_add_tail(&fiber->queue_node, &sched->run_q); } } void tf_exit(void) { struct tf_scheduler *sched = tf_get_scheduler(); struct tf_fiber *f = sched->active_fiber; struct tf_fiber *schedf = container_of((void*) sched, struct tf_fiber, data); tf_heap_delete(&f->heap_node, &sched->heap); f->wakeup_type = TF_WAKEUP_KILL; tf_uctx_transfer(f, schedf); TF_BUG_ON(1); } void tf_kill(void *fiber) { } int tf_yield(void) { struct tf_scheduler *sched = tf_get_scheduler(); struct tf_fiber *f = sched->active_fiber; tf_list_add_tail(&f->queue_node, &sched->run_q); return tf_schedule(); }