diff options
-rw-r--r-- | TFbuild | 2 | ||||
-rw-r--r-- | include/libtf/defines.h | 16 | ||||
-rw-r--r-- | include/libtf/fiber.h | 34 | ||||
-rw-r--r-- | include/libtf/heap.h | 71 | ||||
-rw-r--r-- | src/TFbuild | 4 | ||||
-rw-r--r-- | src/fiber.c | 109 | ||||
-rw-r--r-- | src/heap.c | 167 | ||||
-rw-r--r-- | test/sleep.c | 31 |
8 files changed, 400 insertions, 34 deletions
@@ -1,6 +1,8 @@ subdirs-y += src subdirs-$(TEST) += test + CFLAGS += -I$(srctree)include +LDFLAGS += -lrt install: $(INSTALLDIR) $(DESTDIR)$(DOCDIR) diff --git a/include/libtf/defines.h b/include/libtf/defines.h index 7e35ff7..b1d2aa9 100644 --- a/include/libtf/defines.h +++ b/include/libtf/defines.h @@ -54,7 +54,7 @@ #define attribute_never_inline __attribute__((noinline)) #define attribute_weak_function __attribute__((weak)) -#define TF_BUG_ON(cond) if (cond) { \ +#define TF_BUG_ON(cond) if (unlikely(cond)) { \ fprintf(stderr, "BUG: failure at %s:%d/%s(): %s!\n", \ __FILE__, __LINE__, __func__, #cond); \ abort(); \ @@ -64,4 +64,18 @@ #define TF_EMPTY_ARRAY 0 +#ifndef TF_STACK_SIZE +#define TF_STACK_SIZE 4096 +#endif + +/* Monotonic time */ +typedef uint32_t tf_mtime_t; +typedef int32_t tf_mtime_diff_t; + +static inline +tf_mtime_diff_t tf_mtime_diff(tf_mtime_t a, tf_mtime_t b) +{ + return (tf_mtime_diff_t)(a - b); +} + #endif diff --git a/include/libtf/fiber.h b/include/libtf/fiber.h index 48d4924..09d5ef1 100644 --- a/include/libtf/fiber.h +++ b/include/libtf/fiber.h @@ -17,26 +17,44 @@ #include <libtf/defines.h> #include <libtf/atomic.h> #include <libtf/list.h> +#include <libtf/heap.h> -#ifndef TF_STACK_SIZE -#define TF_STACK_SIZE 4096 -#endif +/* Scheduler */ +struct tf_scheduler { + struct tf_list_head run_q; + struct tf_list_head sleep_q; + struct tf_heap_head heap; + + struct tf_fiber * active_fiber; + int num_fibers; -struct tf_fiber { - unsigned int ref_count; - struct tf_list_node queue_node; - char data[TF_EMPTY_ARRAY]; + tf_mtime_t scheduler_time; }; -typedef void (*tf_fiber_proc)(void *fiber); +static inline +struct tf_scheduler *tf_get_scheduler(void) +{ + extern struct tf_scheduler *__tf_scheduler; + return __tf_scheduler; +} +static inline +tf_mtime_t tf_mtime(void) +{ + return tf_get_scheduler()->scheduler_time; +} + +/* Fiber creation */ +typedef void (*tf_fiber_proc)(void *fiber); int tf_main(tf_fiber_proc fiber_main); void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size); void *tf_fiber_get(void *data); void tf_fiber_put(void *data); +/* Scheduling and fiber management */ int tf_schedule(int err); +int tf_msleep(int milliseconds); void tf_kill(void *fiber); static inline int tf_yield(void) diff --git a/include/libtf/heap.h b/include/libtf/heap.h new file mode 100644 index 0000000..24f4767 --- /dev/null +++ b/include/libtf/heap.h @@ -0,0 +1,71 @@ +/* heap.h - libtf heap + * + * Copyright (C) 2009 Timo Teräs <timo.teras@iki.fi> + * All rights reserved. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 or later as + * published by the Free Software Foundation. + * + * See http://www.gnu.org/ for details. + */ + +#ifndef TF_HEAP_H +#define TF_HEAP_H + +#include <libtf/defines.h> + +#define TF_HEAP_D 4 +#define TF_HEAP_ITEM0 (TF_HEAP_D - 1) + +typedef tf_mtime_t tf_heap_priority; + +struct tf_heap_child { + struct tf_heap_node * ptr; + tf_heap_priority val; +}; + +struct tf_heap_node { + uint32_t index; +}; + +struct tf_heap_head { + uint32_t num_items; + uint32_t allocated; + struct tf_heap_child * item; +}; + +int __tf_heap_grow(struct tf_heap_head *head); +void tf_heap_insert(struct tf_heap_node *node, struct tf_heap_head *head, + tf_heap_priority val); +void tf_heap_delete(struct tf_heap_node *node, struct tf_heap_head *head); +void tf_heap_change(struct tf_heap_node *node, struct tf_heap_head *head, + tf_heap_priority val); + +static inline +tf_heap_priority tf_heap_get_value(struct tf_heap_head *head) +{ + return head->item[TF_HEAP_ITEM0].val; +} + +static inline +struct tf_heap_node *tf_heap_get_node(struct tf_heap_head *head) +{ + return head->item[TF_HEAP_ITEM0].ptr; +} + +static inline +int tf_heap_empty(struct tf_heap_head *head) +{ + return head->num_items == 0; +} + +static inline +int tf_heap_prealloc(struct tf_heap_head *head, uint32_t size) +{ + if (unlikely(head->num_items + TF_HEAP_ITEM0 >= head->allocated)) + return __tf_heap_grow(head); + return 0; +} + +#endif diff --git a/src/TFbuild b/src/TFbuild index 211b734..accae6d 100644 --- a/src/TFbuild +++ b/src/TFbuild @@ -1,5 +1,5 @@ libs-y += libtf -libtf-objs-y += fiber.o dheap.o +libtf-objs-y += fiber.o heap.o -CFLAGS_dheap.c += -funroll-all-loops +CFLAGS_heap.c += -funroll-all-loops diff --git a/src/fiber.c b/src/fiber.c index 0db2984..72da440 100644 --- a/src/fiber.c +++ b/src/fiber.c @@ -9,27 +9,36 @@ * * See http://www.gnu.org/ for details. */ + +#include <time.h> #include <errno.h> +#include <unistd.h> #include <libtf/tf.h> -#include TF_UCTX_H +#include <libtf/heap.h> -struct tf_scheduler { - struct tf_list_head run_q; - struct tf_list_head sleep_q; - - struct tf_fiber * active_fiber; - int num_fibers; +struct tf_fiber { + unsigned int ref_count; + struct tf_list_node queue_node; + struct tf_heap_node heap_node; + char data[TF_EMPTY_ARRAY]; }; +#include TF_UCTX_H + /* FIXME: should be in thread local storage */ -static struct tf_scheduler *__scheduler; +struct tf_scheduler *__tf_scheduler; void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size) { - struct tf_scheduler *sched = __scheduler; + struct tf_scheduler *sched = tf_get_scheduler(); struct tf_fiber *fiber; + if (tf_heap_prealloc(&sched->heap, sched->num_fibers + 1) < 0) + return NULL; + fiber = tf_uctx_create(fiber_main, private_size); + if (fiber == NULL) + return NULL; /* The initial references for caller and scheduler */ *fiber = (struct tf_fiber) { @@ -43,8 +52,9 @@ void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size) return fiber->data; } -void __tf_fiber_destroy(struct tf_fiber *fiber) +static void __tf_fiber_destroy(struct tf_fiber *fiber) { + tf_heap_delete(&fiber->heap_node, &tf_get_scheduler()->heap); tf_uctx_destroy(fiber); } @@ -62,17 +72,17 @@ void tf_fiber_put(void *data) __tf_fiber_destroy(fiber); } -static void run_fiber(void) +static void update_time(struct tf_scheduler *sched) { - struct tf_scheduler *sched = __scheduler; - struct tf_fiber *schedf = container_of((void*) __scheduler, struct tf_fiber, data); - struct tf_fiber *f; + struct timespec ts; - if (tf_list_empty(&sched->run_q)) - return; + clock_gettime(CLOCK_MONOTONIC, &ts); + sched->scheduler_time = ts.tv_sec * 1000 + ts.tv_nsec / 1000000; +} - f = tf_list_first(&sched->run_q, struct tf_fiber, queue_node); - tf_list_del(&f->queue_node); +static void run_fiber(struct tf_scheduler *sched, struct tf_fiber *f) +{ + struct tf_fiber *schedf = container_of((void*) tf_get_scheduler(), struct tf_fiber, data); sched->active_fiber = f; switch (tf_uctx_transfer(schedf, f, 1)) { @@ -91,6 +101,31 @@ static void run_fiber(void) } } +static void process_heap(struct tf_scheduler *sched) +{ + struct tf_heap_node *node; + struct tf_fiber *f; + tf_mtime_t now = tf_mtime(); + + while (!tf_heap_empty(&sched->heap) && + tf_mtime_diff(now, tf_heap_get_value(&sched->heap)) > 0) { + node = tf_heap_get_node(&sched->heap); + f = container_of(node, struct tf_fiber, heap_node); + run_fiber(sched, f); + } +} + +static void process_runq(struct tf_scheduler *sched) +{ + struct tf_fiber *f; + + while (!tf_list_empty(&sched->run_q)) { + f = tf_list_first(&sched->run_q, struct tf_fiber, queue_node); + tf_list_del(&f->queue_node); + run_fiber(sched, f); + } +} + int tf_main(tf_fiber_proc main_fiber) { struct tf_uctx *ctx = alloca(sizeof(struct tf_uctx) + sizeof(struct tf_scheduler)); @@ -102,20 +137,38 @@ int tf_main(tf_fiber_proc main_fiber) .run_q = TF_LIST_HEAD_INITIALIZER(sched->run_q), .sleep_q = TF_LIST_HEAD_INITIALIZER(sched->sleep_q), }; - __scheduler = sched; + __tf_scheduler = sched; + update_time(sched); tf_fiber_put(tf_fiber_create(main_fiber, 0)); do { - run_fiber(); + tf_mtime_diff_t timeout; + + update_time(sched); + if (!tf_list_empty(&sched->run_q)) { + timeout = 0; + } else if (!tf_heap_empty(&sched->heap)) { + timeout = tf_mtime_diff(tf_heap_get_value(&sched->heap), + sched->scheduler_time); + if (timeout < 0) + timeout = 0; + } else + timeout = -1; + + if (timeout > 0) + usleep(timeout * 1000); + + process_heap(sched); + process_runq(sched); } while (likely(sched->num_fibers)); - __scheduler = NULL; + __tf_scheduler = NULL; return 0; } int tf_schedule(int err) { - struct tf_scheduler *sched = __scheduler; - struct tf_fiber *schedf = container_of((void*) __scheduler, struct tf_fiber, data); + struct tf_scheduler *sched = tf_get_scheduler(); + struct tf_fiber *schedf = container_of((void*) sched, struct tf_fiber, data); struct tf_fiber *f = sched->active_fiber; int r; @@ -126,6 +179,16 @@ int tf_schedule(int err) return r; } +int tf_msleep(int milliseconds) +{ + struct tf_scheduler *sched = tf_get_scheduler(); + struct tf_fiber *f = sched->active_fiber; + + tf_heap_change(&f->heap_node, &sched->heap, tf_mtime() + milliseconds); + + return tf_schedule(EIO); +} + void tf_kill(void *fiber) { } diff --git a/src/heap.c b/src/heap.c new file mode 100644 index 0000000..0d1a661 --- /dev/null +++ b/src/heap.c @@ -0,0 +1,167 @@ +/* heap.c - a linked heap implementation + * + * Copyright (C) 2009 Timo Teräs <timo.teras@iki.fi> + * All rights reserved. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 or later as + * published by the Free Software Foundation. + * + * See http://www.gnu.org/ for details. + */ + +#include <errno.h> +#include <string.h> +#include <libtf/heap.h> + +#define compare_values(a, b) tf_mtime_diff(a, b) + +static inline int tf_heap_parent(int index) +{ + return (index - TF_HEAP_ITEM0 - 1) / TF_HEAP_D + TF_HEAP_ITEM0; +} + +static inline int tf_heap_first_child(int index) +{ + return TF_HEAP_D * (index - TF_HEAP_ITEM0) + TF_HEAP_ITEM0 + 1; +} + +#if 0 +static void tf_heap_verify(struct tf_heap_head *head) +{ + int i, count = 0; + + for (i = TF_HEAP_ITEM0; i < head->num_items + TF_HEAP_ITEM0; i++) { + if (head->item[i].ptr->index != i) { + printf("Heap item %d is corrupt ptr->index=%d\n", i, head->item[i].ptr->index); + count++; + } + } + TF_BUG_ON(count); +} +#endif + +static inline +void tf_heap_downheap(struct tf_heap_child *heap, int last_index, int index) +{ + struct tf_heap_child he = heap[index]; + struct tf_heap_child *minpos, *pos; + int c, i, mi; + + while (1) { + c = tf_heap_first_child(index); + pos = &heap[c]; + + /* find minimum child */ + minpos = pos; + mi = 0; + if (likely(c + TF_HEAP_D - 1 < last_index)) { + for (i = 1, pos++; i < TF_HEAP_D; i++, pos++) + if (compare_values(pos->val, minpos->val) < 0) + minpos = pos, mi = i; + } else if (c < last_index) { + for (i = 1, pos++; c + i < last_index; i++, pos++) + if (compare_values(pos->val, minpos->val) < 0) + minpos = pos, mi = i; + } else + break; + + if (compare_values(he.val, minpos->val) <= 0) + break; + + heap[index] = *minpos; + minpos->ptr->index = index; + index = c + mi; + } + + heap[index] = he; + he.ptr->index = index; +} + +static inline +void tf_heap_upheap(struct tf_heap_child *heap, int index) +{ + struct tf_heap_child he = heap[index]; + int p = tf_heap_parent(index); + + while (likely(index > TF_HEAP_ITEM0) && + compare_values(heap[p].val, he.val) > 0) { + heap[index] = heap[p]; + heap[index].ptr->index = index; + index = p; + p = tf_heap_parent(p); + } + + heap[index] = he; + he.ptr->index = index; +} + +static inline +void tf_heap_heapify(struct tf_heap_head *head, int index) +{ + struct tf_heap_child *heap = head->item; + + if (likely(index > TF_HEAP_ITEM0) && + compare_values(heap[index].val, heap[tf_heap_parent(index)].val) <= 0) + tf_heap_upheap(heap, index); + else + tf_heap_downheap(heap, TF_HEAP_ITEM0 + head->num_items, index); +} + +int __tf_heap_grow(struct tf_heap_head *head) +{ + void *item; + + if (head->allocated) + head->allocated *= 2; + else + head->allocated = 128; + + item = realloc(head->item, head->allocated * sizeof(head->item[0])); + if (item == NULL) + return -ENOMEM; + + head->item = item; + return 0; +} + +void tf_heap_insert(struct tf_heap_node *node, struct tf_heap_head *head, + tf_heap_priority val) +{ + int i; + + tf_heap_prealloc(head, head->num_items + 1); + + i = node->index = TF_HEAP_ITEM0 + head->num_items; + head->num_items++; + head->item[i].ptr = node; + head->item[i].val = val; + tf_heap_upheap(head->item, i); +} + +void tf_heap_delete(struct tf_heap_node *node, struct tf_heap_head *head) +{ + int index = node->index; + + if (index == 0) + return; + + head->num_items--; + if (likely(index < head->num_items + TF_HEAP_ITEM0)) { + head->item[index] = head->item[head->num_items+TF_HEAP_ITEM0]; + tf_heap_heapify(head, index); + } + head->item[head->num_items+TF_HEAP_ITEM0].ptr = NULL; + node->index = 0; +} + +void tf_heap_change(struct tf_heap_node *node, struct tf_heap_head *head, + tf_heap_priority val) +{ + if (likely(node->index != 0)) { + head->item[node->index].val = val; + tf_heap_heapify(head, node->index); + } else { + tf_heap_insert(node, head, val); + } +} diff --git a/test/sleep.c b/test/sleep.c new file mode 100644 index 0000000..7e39b5c --- /dev/null +++ b/test/sleep.c @@ -0,0 +1,31 @@ +#include <libtf/tf.h> +#include <stdio.h> + +struct ctx { + int timeout; +}; + +static void work_fiber(void *ptr) +{ + //struct ctx *c = (struct ctx*) ptr; + + tf_msleep(rand() % 5000); + tf_msleep(rand() % 5000); + tf_msleep(rand() % 5000); +} + +static void init_fiber(void *ptr) +{ + struct ctx *c; + int i; + + for (i = 0; i < 1000; i++) { + c = tf_fiber_create(work_fiber, sizeof(struct ctx)); + tf_fiber_put(c); + } +} + +int main(int argc, char **argv) +{ + return tf_main(init_fiber); +} |