summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--TFbuild2
-rw-r--r--include/libtf/defines.h16
-rw-r--r--include/libtf/fiber.h34
-rw-r--r--include/libtf/heap.h71
-rw-r--r--src/TFbuild4
-rw-r--r--src/fiber.c109
-rw-r--r--src/heap.c167
-rw-r--r--test/sleep.c31
8 files changed, 400 insertions, 34 deletions
diff --git a/TFbuild b/TFbuild
index 7bfde59..1a9e846 100644
--- a/TFbuild
+++ b/TFbuild
@@ -1,6 +1,8 @@
subdirs-y += src
subdirs-$(TEST) += test
+
CFLAGS += -I$(srctree)include
+LDFLAGS += -lrt
install:
$(INSTALLDIR) $(DESTDIR)$(DOCDIR)
diff --git a/include/libtf/defines.h b/include/libtf/defines.h
index 7e35ff7..b1d2aa9 100644
--- a/include/libtf/defines.h
+++ b/include/libtf/defines.h
@@ -54,7 +54,7 @@
#define attribute_never_inline __attribute__((noinline))
#define attribute_weak_function __attribute__((weak))
-#define TF_BUG_ON(cond) if (cond) { \
+#define TF_BUG_ON(cond) if (unlikely(cond)) { \
fprintf(stderr, "BUG: failure at %s:%d/%s(): %s!\n", \
__FILE__, __LINE__, __func__, #cond); \
abort(); \
@@ -64,4 +64,18 @@
#define TF_EMPTY_ARRAY 0
+#ifndef TF_STACK_SIZE
+#define TF_STACK_SIZE 4096
+#endif
+
+/* Monotonic time */
+typedef uint32_t tf_mtime_t;
+typedef int32_t tf_mtime_diff_t;
+
+static inline
+tf_mtime_diff_t tf_mtime_diff(tf_mtime_t a, tf_mtime_t b)
+{
+ return (tf_mtime_diff_t)(a - b);
+}
+
#endif
diff --git a/include/libtf/fiber.h b/include/libtf/fiber.h
index 48d4924..09d5ef1 100644
--- a/include/libtf/fiber.h
+++ b/include/libtf/fiber.h
@@ -17,26 +17,44 @@
#include <libtf/defines.h>
#include <libtf/atomic.h>
#include <libtf/list.h>
+#include <libtf/heap.h>
-#ifndef TF_STACK_SIZE
-#define TF_STACK_SIZE 4096
-#endif
+/* Scheduler */
+struct tf_scheduler {
+ struct tf_list_head run_q;
+ struct tf_list_head sleep_q;
+ struct tf_heap_head heap;
+
+ struct tf_fiber * active_fiber;
+ int num_fibers;
-struct tf_fiber {
- unsigned int ref_count;
- struct tf_list_node queue_node;
- char data[TF_EMPTY_ARRAY];
+ tf_mtime_t scheduler_time;
};
-typedef void (*tf_fiber_proc)(void *fiber);
+static inline
+struct tf_scheduler *tf_get_scheduler(void)
+{
+ extern struct tf_scheduler *__tf_scheduler;
+ return __tf_scheduler;
+}
+static inline
+tf_mtime_t tf_mtime(void)
+{
+ return tf_get_scheduler()->scheduler_time;
+}
+
+/* Fiber creation */
+typedef void (*tf_fiber_proc)(void *fiber);
int tf_main(tf_fiber_proc fiber_main);
void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size);
void *tf_fiber_get(void *data);
void tf_fiber_put(void *data);
+/* Scheduling and fiber management */
int tf_schedule(int err);
+int tf_msleep(int milliseconds);
void tf_kill(void *fiber);
static inline int tf_yield(void)
diff --git a/include/libtf/heap.h b/include/libtf/heap.h
new file mode 100644
index 0000000..24f4767
--- /dev/null
+++ b/include/libtf/heap.h
@@ -0,0 +1,71 @@
+/* heap.h - libtf heap
+ *
+ * Copyright (C) 2009 Timo Teräs <timo.teras@iki.fi>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 or later as
+ * published by the Free Software Foundation.
+ *
+ * See http://www.gnu.org/ for details.
+ */
+
+#ifndef TF_HEAP_H
+#define TF_HEAP_H
+
+#include <libtf/defines.h>
+
+#define TF_HEAP_D 4
+#define TF_HEAP_ITEM0 (TF_HEAP_D - 1)
+
+typedef tf_mtime_t tf_heap_priority;
+
+struct tf_heap_child {
+ struct tf_heap_node * ptr;
+ tf_heap_priority val;
+};
+
+struct tf_heap_node {
+ uint32_t index;
+};
+
+struct tf_heap_head {
+ uint32_t num_items;
+ uint32_t allocated;
+ struct tf_heap_child * item;
+};
+
+int __tf_heap_grow(struct tf_heap_head *head);
+void tf_heap_insert(struct tf_heap_node *node, struct tf_heap_head *head,
+ tf_heap_priority val);
+void tf_heap_delete(struct tf_heap_node *node, struct tf_heap_head *head);
+void tf_heap_change(struct tf_heap_node *node, struct tf_heap_head *head,
+ tf_heap_priority val);
+
+static inline
+tf_heap_priority tf_heap_get_value(struct tf_heap_head *head)
+{
+ return head->item[TF_HEAP_ITEM0].val;
+}
+
+static inline
+struct tf_heap_node *tf_heap_get_node(struct tf_heap_head *head)
+{
+ return head->item[TF_HEAP_ITEM0].ptr;
+}
+
+static inline
+int tf_heap_empty(struct tf_heap_head *head)
+{
+ return head->num_items == 0;
+}
+
+static inline
+int tf_heap_prealloc(struct tf_heap_head *head, uint32_t size)
+{
+ if (unlikely(head->num_items + TF_HEAP_ITEM0 >= head->allocated))
+ return __tf_heap_grow(head);
+ return 0;
+}
+
+#endif
diff --git a/src/TFbuild b/src/TFbuild
index 211b734..accae6d 100644
--- a/src/TFbuild
+++ b/src/TFbuild
@@ -1,5 +1,5 @@
libs-y += libtf
-libtf-objs-y += fiber.o dheap.o
+libtf-objs-y += fiber.o heap.o
-CFLAGS_dheap.c += -funroll-all-loops
+CFLAGS_heap.c += -funroll-all-loops
diff --git a/src/fiber.c b/src/fiber.c
index 0db2984..72da440 100644
--- a/src/fiber.c
+++ b/src/fiber.c
@@ -9,27 +9,36 @@
*
* See http://www.gnu.org/ for details.
*/
+
+#include <time.h>
#include <errno.h>
+#include <unistd.h>
#include <libtf/tf.h>
-#include TF_UCTX_H
+#include <libtf/heap.h>
-struct tf_scheduler {
- struct tf_list_head run_q;
- struct tf_list_head sleep_q;
-
- struct tf_fiber * active_fiber;
- int num_fibers;
+struct tf_fiber {
+ unsigned int ref_count;
+ struct tf_list_node queue_node;
+ struct tf_heap_node heap_node;
+ char data[TF_EMPTY_ARRAY];
};
+#include TF_UCTX_H
+
/* FIXME: should be in thread local storage */
-static struct tf_scheduler *__scheduler;
+struct tf_scheduler *__tf_scheduler;
void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size)
{
- struct tf_scheduler *sched = __scheduler;
+ struct tf_scheduler *sched = tf_get_scheduler();
struct tf_fiber *fiber;
+ if (tf_heap_prealloc(&sched->heap, sched->num_fibers + 1) < 0)
+ return NULL;
+
fiber = tf_uctx_create(fiber_main, private_size);
+ if (fiber == NULL)
+ return NULL;
/* The initial references for caller and scheduler */
*fiber = (struct tf_fiber) {
@@ -43,8 +52,9 @@ void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size)
return fiber->data;
}
-void __tf_fiber_destroy(struct tf_fiber *fiber)
+static void __tf_fiber_destroy(struct tf_fiber *fiber)
{
+ tf_heap_delete(&fiber->heap_node, &tf_get_scheduler()->heap);
tf_uctx_destroy(fiber);
}
@@ -62,17 +72,17 @@ void tf_fiber_put(void *data)
__tf_fiber_destroy(fiber);
}
-static void run_fiber(void)
+static void update_time(struct tf_scheduler *sched)
{
- struct tf_scheduler *sched = __scheduler;
- struct tf_fiber *schedf = container_of((void*) __scheduler, struct tf_fiber, data);
- struct tf_fiber *f;
+ struct timespec ts;
- if (tf_list_empty(&sched->run_q))
- return;
+ clock_gettime(CLOCK_MONOTONIC, &ts);
+ sched->scheduler_time = ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
+}
- f = tf_list_first(&sched->run_q, struct tf_fiber, queue_node);
- tf_list_del(&f->queue_node);
+static void run_fiber(struct tf_scheduler *sched, struct tf_fiber *f)
+{
+ struct tf_fiber *schedf = container_of((void*) tf_get_scheduler(), struct tf_fiber, data);
sched->active_fiber = f;
switch (tf_uctx_transfer(schedf, f, 1)) {
@@ -91,6 +101,31 @@ static void run_fiber(void)
}
}
+static void process_heap(struct tf_scheduler *sched)
+{
+ struct tf_heap_node *node;
+ struct tf_fiber *f;
+ tf_mtime_t now = tf_mtime();
+
+ while (!tf_heap_empty(&sched->heap) &&
+ tf_mtime_diff(now, tf_heap_get_value(&sched->heap)) > 0) {
+ node = tf_heap_get_node(&sched->heap);
+ f = container_of(node, struct tf_fiber, heap_node);
+ run_fiber(sched, f);
+ }
+}
+
+static void process_runq(struct tf_scheduler *sched)
+{
+ struct tf_fiber *f;
+
+ while (!tf_list_empty(&sched->run_q)) {
+ f = tf_list_first(&sched->run_q, struct tf_fiber, queue_node);
+ tf_list_del(&f->queue_node);
+ run_fiber(sched, f);
+ }
+}
+
int tf_main(tf_fiber_proc main_fiber)
{
struct tf_uctx *ctx = alloca(sizeof(struct tf_uctx) + sizeof(struct tf_scheduler));
@@ -102,20 +137,38 @@ int tf_main(tf_fiber_proc main_fiber)
.run_q = TF_LIST_HEAD_INITIALIZER(sched->run_q),
.sleep_q = TF_LIST_HEAD_INITIALIZER(sched->sleep_q),
};
- __scheduler = sched;
+ __tf_scheduler = sched;
+ update_time(sched);
tf_fiber_put(tf_fiber_create(main_fiber, 0));
do {
- run_fiber();
+ tf_mtime_diff_t timeout;
+
+ update_time(sched);
+ if (!tf_list_empty(&sched->run_q)) {
+ timeout = 0;
+ } else if (!tf_heap_empty(&sched->heap)) {
+ timeout = tf_mtime_diff(tf_heap_get_value(&sched->heap),
+ sched->scheduler_time);
+ if (timeout < 0)
+ timeout = 0;
+ } else
+ timeout = -1;
+
+ if (timeout > 0)
+ usleep(timeout * 1000);
+
+ process_heap(sched);
+ process_runq(sched);
} while (likely(sched->num_fibers));
- __scheduler = NULL;
+ __tf_scheduler = NULL;
return 0;
}
int tf_schedule(int err)
{
- struct tf_scheduler *sched = __scheduler;
- struct tf_fiber *schedf = container_of((void*) __scheduler, struct tf_fiber, data);
+ struct tf_scheduler *sched = tf_get_scheduler();
+ struct tf_fiber *schedf = container_of((void*) sched, struct tf_fiber, data);
struct tf_fiber *f = sched->active_fiber;
int r;
@@ -126,6 +179,16 @@ int tf_schedule(int err)
return r;
}
+int tf_msleep(int milliseconds)
+{
+ struct tf_scheduler *sched = tf_get_scheduler();
+ struct tf_fiber *f = sched->active_fiber;
+
+ tf_heap_change(&f->heap_node, &sched->heap, tf_mtime() + milliseconds);
+
+ return tf_schedule(EIO);
+}
+
void tf_kill(void *fiber)
{
}
diff --git a/src/heap.c b/src/heap.c
new file mode 100644
index 0000000..0d1a661
--- /dev/null
+++ b/src/heap.c
@@ -0,0 +1,167 @@
+/* heap.c - a linked heap implementation
+ *
+ * Copyright (C) 2009 Timo Teräs <timo.teras@iki.fi>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 or later as
+ * published by the Free Software Foundation.
+ *
+ * See http://www.gnu.org/ for details.
+ */
+
+#include <errno.h>
+#include <string.h>
+#include <libtf/heap.h>
+
+#define compare_values(a, b) tf_mtime_diff(a, b)
+
+static inline int tf_heap_parent(int index)
+{
+ return (index - TF_HEAP_ITEM0 - 1) / TF_HEAP_D + TF_HEAP_ITEM0;
+}
+
+static inline int tf_heap_first_child(int index)
+{
+ return TF_HEAP_D * (index - TF_HEAP_ITEM0) + TF_HEAP_ITEM0 + 1;
+}
+
+#if 0
+static void tf_heap_verify(struct tf_heap_head *head)
+{
+ int i, count = 0;
+
+ for (i = TF_HEAP_ITEM0; i < head->num_items + TF_HEAP_ITEM0; i++) {
+ if (head->item[i].ptr->index != i) {
+ printf("Heap item %d is corrupt ptr->index=%d\n", i, head->item[i].ptr->index);
+ count++;
+ }
+ }
+ TF_BUG_ON(count);
+}
+#endif
+
+static inline
+void tf_heap_downheap(struct tf_heap_child *heap, int last_index, int index)
+{
+ struct tf_heap_child he = heap[index];
+ struct tf_heap_child *minpos, *pos;
+ int c, i, mi;
+
+ while (1) {
+ c = tf_heap_first_child(index);
+ pos = &heap[c];
+
+ /* find minimum child */
+ minpos = pos;
+ mi = 0;
+ if (likely(c + TF_HEAP_D - 1 < last_index)) {
+ for (i = 1, pos++; i < TF_HEAP_D; i++, pos++)
+ if (compare_values(pos->val, minpos->val) < 0)
+ minpos = pos, mi = i;
+ } else if (c < last_index) {
+ for (i = 1, pos++; c + i < last_index; i++, pos++)
+ if (compare_values(pos->val, minpos->val) < 0)
+ minpos = pos, mi = i;
+ } else
+ break;
+
+ if (compare_values(he.val, minpos->val) <= 0)
+ break;
+
+ heap[index] = *minpos;
+ minpos->ptr->index = index;
+ index = c + mi;
+ }
+
+ heap[index] = he;
+ he.ptr->index = index;
+}
+
+static inline
+void tf_heap_upheap(struct tf_heap_child *heap, int index)
+{
+ struct tf_heap_child he = heap[index];
+ int p = tf_heap_parent(index);
+
+ while (likely(index > TF_HEAP_ITEM0) &&
+ compare_values(heap[p].val, he.val) > 0) {
+ heap[index] = heap[p];
+ heap[index].ptr->index = index;
+ index = p;
+ p = tf_heap_parent(p);
+ }
+
+ heap[index] = he;
+ he.ptr->index = index;
+}
+
+static inline
+void tf_heap_heapify(struct tf_heap_head *head, int index)
+{
+ struct tf_heap_child *heap = head->item;
+
+ if (likely(index > TF_HEAP_ITEM0) &&
+ compare_values(heap[index].val, heap[tf_heap_parent(index)].val) <= 0)
+ tf_heap_upheap(heap, index);
+ else
+ tf_heap_downheap(heap, TF_HEAP_ITEM0 + head->num_items, index);
+}
+
+int __tf_heap_grow(struct tf_heap_head *head)
+{
+ void *item;
+
+ if (head->allocated)
+ head->allocated *= 2;
+ else
+ head->allocated = 128;
+
+ item = realloc(head->item, head->allocated * sizeof(head->item[0]));
+ if (item == NULL)
+ return -ENOMEM;
+
+ head->item = item;
+ return 0;
+}
+
+void tf_heap_insert(struct tf_heap_node *node, struct tf_heap_head *head,
+ tf_heap_priority val)
+{
+ int i;
+
+ tf_heap_prealloc(head, head->num_items + 1);
+
+ i = node->index = TF_HEAP_ITEM0 + head->num_items;
+ head->num_items++;
+ head->item[i].ptr = node;
+ head->item[i].val = val;
+ tf_heap_upheap(head->item, i);
+}
+
+void tf_heap_delete(struct tf_heap_node *node, struct tf_heap_head *head)
+{
+ int index = node->index;
+
+ if (index == 0)
+ return;
+
+ head->num_items--;
+ if (likely(index < head->num_items + TF_HEAP_ITEM0)) {
+ head->item[index] = head->item[head->num_items+TF_HEAP_ITEM0];
+ tf_heap_heapify(head, index);
+ }
+ head->item[head->num_items+TF_HEAP_ITEM0].ptr = NULL;
+ node->index = 0;
+}
+
+void tf_heap_change(struct tf_heap_node *node, struct tf_heap_head *head,
+ tf_heap_priority val)
+{
+ if (likely(node->index != 0)) {
+ head->item[node->index].val = val;
+ tf_heap_heapify(head, node->index);
+ } else {
+ tf_heap_insert(node, head, val);
+ }
+}
diff --git a/test/sleep.c b/test/sleep.c
new file mode 100644
index 0000000..7e39b5c
--- /dev/null
+++ b/test/sleep.c
@@ -0,0 +1,31 @@
+#include <libtf/tf.h>
+#include <stdio.h>
+
+struct ctx {
+ int timeout;
+};
+
+static void work_fiber(void *ptr)
+{
+ //struct ctx *c = (struct ctx*) ptr;
+
+ tf_msleep(rand() % 5000);
+ tf_msleep(rand() % 5000);
+ tf_msleep(rand() % 5000);
+}
+
+static void init_fiber(void *ptr)
+{
+ struct ctx *c;
+ int i;
+
+ for (i = 0; i < 1000; i++) {
+ c = tf_fiber_create(work_fiber, sizeof(struct ctx));
+ tf_fiber_put(c);
+ }
+}
+
+int main(int argc, char **argv)
+{
+ return tf_main(init_fiber);
+}