/* fiber.c - fiber management * * Copyright (C) 2009 Timo Teräs * All rights reserved. * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 or later as * published by the Free Software Foundation. * * See http://www.gnu.org/ for details. */ #include #include #include #include #include #include "uctx.h" #define TF_TIMEOUT_CHANGE_NEEDED 1 #define TF_TIMEOUT_CHANGE_NEW_VALUE 2 struct tf_fiber { unsigned int ref_count; struct tf_scheduler * scheduler; int wakeup_type; unsigned int timeout_change; tf_mtime_t timeout; struct tf_list_node queue_node; struct tf_heap_node heap_node; struct tf_uctx context; char data[TF_EMPTY_ARRAY]; }; static inline struct tf_fiber *tf_fiber_get_current(void) { void *data = tf_scheduler_get_current()->active_fiber; return container_of(data, struct tf_fiber, data); } static void tf_fiber_main(void *user_data, void *arg) { tf_fiber_proc proc = (tf_fiber_proc) arg; struct tf_fiber *f = user_data; proc(f->data); tf_fiber_exit(); } void *__tf_fiber_create(tf_fiber_proc fiber_main, int private_size) { struct tf_fiber *fiber; fiber = tf_uctx_create_embedded( TF_STACK_SIZE, sizeof(struct tf_fiber) + private_size, offsetof(struct tf_fiber, context), tf_fiber_main, fiber_main); if (fiber == NULL) return NULL; *fiber = (struct tf_fiber) { .ref_count = 1, .queue_node = TF_LIST_INITIALIZER(fiber->queue_node), .context = fiber->context, }; return fiber->data; } void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size) { struct tf_fiber *fiber; struct tf_scheduler *sched; sched = tf_scheduler_get_current(); if (tf_heap_prealloc(&sched->heap, sched->num_fibers + 1) < 0) return NULL; fiber = container_of(__tf_fiber_create(fiber_main, private_size), struct tf_fiber, data); sched->num_fibers++; fiber->scheduler = sched; fiber->wakeup_type = TF_WAKEUP_NONE; tf_list_add_tail(&fiber->queue_node, &sched->scheduled_q); return tf_fiber_get(fiber->data); } void 
*tf_fiber_get(void *data)
{
	/* Take an additional reference on a fiber's private area. */
	struct tf_fiber *fiber = container_of(data, struct tf_fiber, data);
	tf_atomic_inc(fiber->ref_count);
	return data;
}

/* Tear down a fiber once its last reference is gone. */
static void __tf_fiber_destroy(struct tf_fiber *fiber)
{
	struct tf_scheduler *sched = fiber->scheduler;
	int main_fiber, num_fibers;

	/* Decrease the number of fibers first, as we might be
	 * killing the scheduler itself. */
	num_fibers = --sched->num_fibers;

	/* The main-stack fiber has no embedded stack allocation
	 * (context.alloc == NULL); its control block comes from a plain
	 * malloc() in __tf_fiber_bind_scheduler() and is freed here. */
	main_fiber = (fiber->context.alloc == NULL);
	tf_heap_delete(&fiber->heap_node, &sched->heap);
	tf_uctx_destroy(&fiber->context);
	if (main_fiber)
		free(fiber);

	/* Only the main fiber left: let it know everyone else is gone. */
	if (num_fibers == 1) {
		/* FIXME: Use a proper fiber event. */
		__tf_fiber_wakeup(sched->main_fiber, TF_WAKEUP_IMMEDIATE);
	}
}

/* Drop a reference; destroys the fiber when it was the last one. */
void tf_fiber_put(void *data)
{
	struct tf_fiber *fiber = container_of(data, struct tf_fiber, data);
	if (tf_atomic_dec(fiber->ref_count) == 0)
		__tf_fiber_destroy(fiber);
}

/* Queue a sleeping fiber for execution with the given wakeup reason.
 * A fiber that is already runnable keeps its original reason. */
void __tf_fiber_wakeup(void *data, int wakeup_type)
{
	struct tf_fiber *fiber = container_of(data, struct tf_fiber, data);
	struct tf_scheduler *sched = fiber->scheduler;

	if (fiber->wakeup_type == TF_WAKEUP_NONE) {
		fiber->wakeup_type = wakeup_type;
		tf_list_add_tail(&fiber->queue_node, &sched->running_q);
	}
}

/* Timeout-heap callback: wake the fiber owning the expired heap node. */
void __tf_fiber_wakeup_heapnode(struct tf_heap_node *node)
{
	__tf_fiber_wakeup(container_of(node, struct tf_fiber, heap_node)->data,
			  TF_WAKEUP_TIMEOUT);
}

/* Suspend the current fiber and switch to the next runnable one.
 * Returns the TF_WAKEUP_* reason once this fiber is resumed. */
int __tf_fiber_schedule(void)
{
	struct tf_scheduler *sched = tf_scheduler_get_current();
	struct tf_fiber *f = tf_fiber_get_current(), *nf;
	int wakeup;

	/* Apply any lazily recorded timeout change before sleeping. */
	if (unlikely(f->timeout_change)) {
		if (f->timeout_change & TF_TIMEOUT_CHANGE_NEW_VALUE) {
			/* Deadline already in the past: do not sleep. */
			if (tf_mtime_diff(f->timeout, tf_scheduler_get_mtime()) <= 0) {
				f->timeout_change = TF_TIMEOUT_CHANGE_NEEDED;
				return TF_WAKEUP_TIMEOUT;
			}
			tf_heap_change(&f->heap_node, &sched->heap, f->timeout);
		} else
			tf_heap_delete(&f->heap_node, &sched->heap);
		f->timeout_change = 0;
	}

	/* Figure out the next fiber to run; refill the scheduled queue
	 * from the running queue when it drains. */
	if (unlikely(tf_list_empty(&sched->scheduled_q))) {
		tf_list_splice_tail(&sched->running_q, &sched->scheduled_q);
TF_BUG_ON(tf_list_empty(&sched->scheduled_q)); } nf = tf_list_entry(tf_list_pop(&sched->scheduled_q), struct tf_fiber, queue_node); sched->active_fiber = nf->data; tf_uctx_transfer(&f->context, &nf->context); wakeup = f->wakeup_type; f->wakeup_type = TF_WAKEUP_NONE; return wakeup; } int __tf_fiber_bind_scheduler(struct tf_scheduler *sched) { struct tf_fiber *f; f = malloc(sizeof(struct tf_fiber)); if (f == NULL) return -ENOMEM; /* Mark currently active main fiber as active */ *f = (struct tf_fiber) { .ref_count = 1, .scheduler = sched, .queue_node = TF_LIST_INITIALIZER(f->queue_node), }; tf_uctx_create_self(&f->context); sched->main_fiber = f->data; sched->active_fiber = f->data; sched->num_fibers++; /* Schedule scheduler fiber */ f = container_of((void *) sched, struct tf_fiber, data); f->scheduler = sched; f->wakeup_type = TF_WAKEUP_IMMEDIATE; tf_list_add_tail(&f->queue_node, &sched->running_q); return 0; } int __tf_fiber_release_scheduler(struct tf_scheduler *sched) { struct tf_fiber *f; /* Detach scheduler */ f = container_of((void *) sched, struct tf_fiber, data); tf_list_del(&f->queue_node); /* Detach main stack from this scheduler */ f = container_of((void *) sched->main_fiber, struct tf_fiber, data); tf_fiber_put(sched->main_fiber); sched->main_fiber = NULL; sched->num_fibers--; return 0; } void tf_fiber_exit(void) { struct tf_scheduler *sched = tf_scheduler_get_current(); struct tf_fiber *f = tf_fiber_get_current(); struct tf_fiber *schedf = container_of((void *) sched, struct tf_fiber, data); tf_heap_delete(&f->heap_node, &sched->heap); schedf->wakeup_type = TF_WAKEUP_KILL; tf_uctx_transfer(&f->context, &schedf->context); TF_BUG_ON(1); } void tf_fiber_kill(void *fiber) { } int tf_fiber_yield(void) { struct tf_scheduler *sched = tf_scheduler_get_current(); struct tf_fiber *f = tf_fiber_get_current(); TF_BUG_ON(tf_list_hashed(&f->queue_node)); f->wakeup_type = TF_WAKEUP_IMMEDIATE; tf_list_add_tail(&f->queue_node, &sched->running_q); return 
__tf_fiber_schedule(); } void tf_timeout_push(struct tf_timeout *timeout, tf_mtime_diff_t milliseconds) { struct tf_fiber *f = tf_fiber_get_current(); tf_mtime_t abs = tf_scheduler_get_mtime() + milliseconds; int active; if (f->timeout_change) active = (f->timeout_change & TF_TIMEOUT_CHANGE_NEW_VALUE); else active = tf_heap_node_active(&f->heap_node); if (!active || tf_mtime_diff(abs, f->timeout) < 0) { /* Save previous timeout */ timeout->saved_timeout = f->timeout; timeout->timeout_change = TF_TIMEOUT_CHANGE_NEEDED; if (active) timeout->timeout_change |= TF_TIMEOUT_CHANGE_NEW_VALUE; /* Make new timeout pending */ f->timeout = abs; f->timeout_change = TF_TIMEOUT_CHANGE_NEEDED | TF_TIMEOUT_CHANGE_NEW_VALUE; } else { timeout->timeout_change = 0; } } int __tf_timeout_pop(struct tf_timeout *timeout, int err) { struct tf_fiber *f = tf_fiber_get_current(); f->timeout = timeout->saved_timeout; f->timeout_change = timeout->timeout_change; if (err == TF_WAKEUP_TIMEOUT) err = TF_WAKEUP_THIS_TIMEOUT; return err; }