summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorTimo Teras <timo.teras@iki.fi>2009-11-24 11:36:24 +0200
committerTimo Teras <timo.teras@iki.fi>2009-11-24 13:26:52 +0200
commit3ea1a77fb5419ffd2f9c997977b383b5faf75423 (patch)
tree6f848510e07e11ca0ce34939bcb7944dfa49a4f5 /src
parente4e54c2ec744e884f6f55c135bea78e815d28d6c (diff)
downloadlibtf-3ea1a77fb5419ffd2f9c997977b383b5faf75423.tar.bz2
libtf-3ea1a77fb5419ffd2f9c997977b383b5faf75423.tar.xz
libtf: implement timeouts
internally put sleepers to d-ary heap based priority queue. the heap value is compared with overflow.
Diffstat (limited to 'src')
-rw-r--r--src/TFbuild4
-rw-r--r--src/fiber.c109
-rw-r--r--src/heap.c167
3 files changed, 255 insertions, 25 deletions
diff --git a/src/TFbuild b/src/TFbuild
index 211b734..accae6d 100644
--- a/src/TFbuild
+++ b/src/TFbuild
@@ -1,5 +1,5 @@
libs-y += libtf
-libtf-objs-y += fiber.o dheap.o
+libtf-objs-y += fiber.o heap.o
-CFLAGS_dheap.c += -funroll-all-loops
+CFLAGS_heap.c += -funroll-all-loops
diff --git a/src/fiber.c b/src/fiber.c
index 0db2984..72da440 100644
--- a/src/fiber.c
+++ b/src/fiber.c
@@ -9,27 +9,36 @@
*
* See http://www.gnu.org/ for details.
*/
+
+#include <time.h>
#include <errno.h>
+#include <unistd.h>
#include <libtf/tf.h>
-#include TF_UCTX_H
+#include <libtf/heap.h>
-struct tf_scheduler {
- struct tf_list_head run_q;
- struct tf_list_head sleep_q;
-
- struct tf_fiber * active_fiber;
- int num_fibers;
+struct tf_fiber {
+ unsigned int ref_count;
+ struct tf_list_node queue_node;
+ struct tf_heap_node heap_node;
+ char data[TF_EMPTY_ARRAY];
};
+#include TF_UCTX_H
+
/* FIXME: should be in thread local storage */
-static struct tf_scheduler *__scheduler;
+struct tf_scheduler *__tf_scheduler;
void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size)
{
- struct tf_scheduler *sched = __scheduler;
+ struct tf_scheduler *sched = tf_get_scheduler();
struct tf_fiber *fiber;
+ if (tf_heap_prealloc(&sched->heap, sched->num_fibers + 1) < 0)
+ return NULL;
+
fiber = tf_uctx_create(fiber_main, private_size);
+ if (fiber == NULL)
+ return NULL;
/* The initial references for caller and scheduler */
*fiber = (struct tf_fiber) {
@@ -43,8 +52,9 @@ void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size)
return fiber->data;
}
-void __tf_fiber_destroy(struct tf_fiber *fiber)
+static void __tf_fiber_destroy(struct tf_fiber *fiber)
{
+ tf_heap_delete(&fiber->heap_node, &tf_get_scheduler()->heap);
tf_uctx_destroy(fiber);
}
@@ -62,17 +72,17 @@ void tf_fiber_put(void *data)
__tf_fiber_destroy(fiber);
}
-static void run_fiber(void)
+static void update_time(struct tf_scheduler *sched)
{
- struct tf_scheduler *sched = __scheduler;
- struct tf_fiber *schedf = container_of((void*) __scheduler, struct tf_fiber, data);
- struct tf_fiber *f;
+ struct timespec ts;
- if (tf_list_empty(&sched->run_q))
- return;
+ clock_gettime(CLOCK_MONOTONIC, &ts);
+ sched->scheduler_time = ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
+}
- f = tf_list_first(&sched->run_q, struct tf_fiber, queue_node);
- tf_list_del(&f->queue_node);
+static void run_fiber(struct tf_scheduler *sched, struct tf_fiber *f)
+{
+ struct tf_fiber *schedf = container_of((void*) tf_get_scheduler(), struct tf_fiber, data);
sched->active_fiber = f;
switch (tf_uctx_transfer(schedf, f, 1)) {
@@ -91,6 +101,31 @@ static void run_fiber(void)
}
}
+static void process_heap(struct tf_scheduler *sched)
+{
+ struct tf_heap_node *node;
+ struct tf_fiber *f;
+ tf_mtime_t now = tf_mtime();
+
+ while (!tf_heap_empty(&sched->heap) &&
+ tf_mtime_diff(now, tf_heap_get_value(&sched->heap)) > 0) {
+ node = tf_heap_get_node(&sched->heap);
+ f = container_of(node, struct tf_fiber, heap_node);
+ run_fiber(sched, f);
+ }
+}
+
+static void process_runq(struct tf_scheduler *sched)
+{
+ struct tf_fiber *f;
+
+ while (!tf_list_empty(&sched->run_q)) {
+ f = tf_list_first(&sched->run_q, struct tf_fiber, queue_node);
+ tf_list_del(&f->queue_node);
+ run_fiber(sched, f);
+ }
+}
+
int tf_main(tf_fiber_proc main_fiber)
{
struct tf_uctx *ctx = alloca(sizeof(struct tf_uctx) + sizeof(struct tf_scheduler));
@@ -102,20 +137,38 @@ int tf_main(tf_fiber_proc main_fiber)
.run_q = TF_LIST_HEAD_INITIALIZER(sched->run_q),
.sleep_q = TF_LIST_HEAD_INITIALIZER(sched->sleep_q),
};
- __scheduler = sched;
+ __tf_scheduler = sched;
+ update_time(sched);
tf_fiber_put(tf_fiber_create(main_fiber, 0));
do {
- run_fiber();
+ tf_mtime_diff_t timeout;
+
+ update_time(sched);
+ if (!tf_list_empty(&sched->run_q)) {
+ timeout = 0;
+ } else if (!tf_heap_empty(&sched->heap)) {
+ timeout = tf_mtime_diff(tf_heap_get_value(&sched->heap),
+ sched->scheduler_time);
+ if (timeout < 0)
+ timeout = 0;
+ } else
+ timeout = -1;
+
+ if (timeout > 0)
+ usleep(timeout * 1000);
+
+ process_heap(sched);
+ process_runq(sched);
} while (likely(sched->num_fibers));
- __scheduler = NULL;
+ __tf_scheduler = NULL;
return 0;
}
int tf_schedule(int err)
{
- struct tf_scheduler *sched = __scheduler;
- struct tf_fiber *schedf = container_of((void*) __scheduler, struct tf_fiber, data);
+ struct tf_scheduler *sched = tf_get_scheduler();
+ struct tf_fiber *schedf = container_of((void*) sched, struct tf_fiber, data);
struct tf_fiber *f = sched->active_fiber;
int r;
@@ -126,6 +179,16 @@ int tf_schedule(int err)
return r;
}
+int tf_msleep(int milliseconds)
+{
+ struct tf_scheduler *sched = tf_get_scheduler();
+ struct tf_fiber *f = sched->active_fiber;
+
+ tf_heap_change(&f->heap_node, &sched->heap, tf_mtime() + milliseconds);
+
+ return tf_schedule(EIO);
+}
+
void tf_kill(void *fiber)
{
}
diff --git a/src/heap.c b/src/heap.c
new file mode 100644
index 0000000..0d1a661
--- /dev/null
+++ b/src/heap.c
@@ -0,0 +1,167 @@
+/* heap.c - a linked heap implementation
+ *
+ * Copyright (C) 2009 Timo Teräs <timo.teras@iki.fi>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 or later as
+ * published by the Free Software Foundation.
+ *
+ * See http://www.gnu.org/ for details.
+ */
+
+#include <errno.h>
+#include <string.h>
+#include <libtf/heap.h>
+
+#define compare_values(a, b) tf_mtime_diff(a, b)
+
+static inline int tf_heap_parent(int index)
+{
+ return (index - TF_HEAP_ITEM0 - 1) / TF_HEAP_D + TF_HEAP_ITEM0;
+}
+
+static inline int tf_heap_first_child(int index)
+{
+ return TF_HEAP_D * (index - TF_HEAP_ITEM0) + TF_HEAP_ITEM0 + 1;
+}
+
+#if 0
+static void tf_heap_verify(struct tf_heap_head *head)
+{
+ int i, count = 0;
+
+ for (i = TF_HEAP_ITEM0; i < head->num_items + TF_HEAP_ITEM0; i++) {
+ if (head->item[i].ptr->index != i) {
+ printf("Heap item %d is corrupt ptr->index=%d\n", i, head->item[i].ptr->index);
+ count++;
+ }
+ }
+ TF_BUG_ON(count);
+}
+#endif
+
+static inline
+void tf_heap_downheap(struct tf_heap_child *heap, int last_index, int index)
+{
+ struct tf_heap_child he = heap[index];
+ struct tf_heap_child *minpos, *pos;
+ int c, i, mi;
+
+ while (1) {
+ c = tf_heap_first_child(index);
+ pos = &heap[c];
+
+ /* find minimum child */
+ minpos = pos;
+ mi = 0;
+ if (likely(c + TF_HEAP_D - 1 < last_index)) {
+ for (i = 1, pos++; i < TF_HEAP_D; i++, pos++)
+ if (compare_values(pos->val, minpos->val) < 0)
+ minpos = pos, mi = i;
+ } else if (c < last_index) {
+ for (i = 1, pos++; c + i < last_index; i++, pos++)
+ if (compare_values(pos->val, minpos->val) < 0)
+ minpos = pos, mi = i;
+ } else
+ break;
+
+ if (compare_values(he.val, minpos->val) <= 0)
+ break;
+
+ heap[index] = *minpos;
+ minpos->ptr->index = index;
+ index = c + mi;
+ }
+
+ heap[index] = he;
+ he.ptr->index = index;
+}
+
+static inline
+void tf_heap_upheap(struct tf_heap_child *heap, int index)
+{
+ struct tf_heap_child he = heap[index];
+ int p = tf_heap_parent(index);
+
+ while (likely(index > TF_HEAP_ITEM0) &&
+ compare_values(heap[p].val, he.val) > 0) {
+ heap[index] = heap[p];
+ heap[index].ptr->index = index;
+ index = p;
+ p = tf_heap_parent(p);
+ }
+
+ heap[index] = he;
+ he.ptr->index = index;
+}
+
+static inline
+void tf_heap_heapify(struct tf_heap_head *head, int index)
+{
+ struct tf_heap_child *heap = head->item;
+
+ if (likely(index > TF_HEAP_ITEM0) &&
+ compare_values(heap[index].val, heap[tf_heap_parent(index)].val) <= 0)
+ tf_heap_upheap(heap, index);
+ else
+ tf_heap_downheap(heap, TF_HEAP_ITEM0 + head->num_items, index);
+}
+
+int __tf_heap_grow(struct tf_heap_head *head)
+{
+ void *item;
+
+ if (head->allocated)
+ head->allocated *= 2;
+ else
+ head->allocated = 128;
+
+ item = realloc(head->item, head->allocated * sizeof(head->item[0]));
+ if (item == NULL)
+ return -ENOMEM;
+
+ head->item = item;
+ return 0;
+}
+
+void tf_heap_insert(struct tf_heap_node *node, struct tf_heap_head *head,
+ tf_heap_priority val)
+{
+ int i;
+
+ tf_heap_prealloc(head, head->num_items + 1);
+
+ i = node->index = TF_HEAP_ITEM0 + head->num_items;
+ head->num_items++;
+ head->item[i].ptr = node;
+ head->item[i].val = val;
+ tf_heap_upheap(head->item, i);
+}
+
+void tf_heap_delete(struct tf_heap_node *node, struct tf_heap_head *head)
+{
+ int index = node->index;
+
+ if (index == 0)
+ return;
+
+ head->num_items--;
+ if (likely(index < head->num_items + TF_HEAP_ITEM0)) {
+ head->item[index] = head->item[head->num_items+TF_HEAP_ITEM0];
+ tf_heap_heapify(head, index);
+ }
+ head->item[head->num_items+TF_HEAP_ITEM0].ptr = NULL;
+ node->index = 0;
+}
+
+void tf_heap_change(struct tf_heap_node *node, struct tf_heap_head *head,
+ tf_heap_priority val)
+{
+ if (likely(node->index != 0)) {
+ head->item[node->index].val = val;
+ tf_heap_heapify(head, node->index);
+ } else {
+ tf_heap_insert(node, head, val);
+ }
+}