-rw-r--r--   include/libtf/defines.h    |   1
-rw-r--r--   include/libtf/fiber.h      |  82
-rw-r--r--   include/libtf/heap.h       |  10
-rw-r--r--   include/libtf/io.h         |   4
-rw-r--r--   include/libtf/list.h       |  33
-rw-r--r--   include/libtf/scheduler.h  |  64
-rw-r--r--   include/libtf/tf.h         |   1
-rw-r--r--   src/TFbuild                |   3
-rw-r--r--   src/fiber.c                | 327
-rw-r--r--   src/io-epoll.c             |  32
-rw-r--r--   src/io-unix.c              |  16
-rw-r--r--   src/scheduler.c            | 132
-rw-r--r--   src/uctx.h                 |  82
-rw-r--r--   test/httpget.c             |  16
-rw-r--r--   test/read.c                |  11
-rw-r--r--   test/simple1.c             |  13
-rw-r--r--   test/sleep.c               |  17
17 files changed, 533 insertions(+), 311 deletions(-)
diff --git a/include/libtf/defines.h b/include/libtf/defines.h
index ae72980..8e39c7e 100644
--- a/include/libtf/defines.h
+++ b/include/libtf/defines.h
@@ -62,6 +62,7 @@
__FILE__, __LINE__, __func__, #cond); \
abort(); \
}
+#define TF_BUILD_BUG_ON(cond) ((void) sizeof(struct { char TF_BUILD_BUG_ON[-!!(cond)]; }))
#define TF_ALIGN(size,align) ((((size_t)(size)) + (align)-1) & ~((align)-1))
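
The new TF_BUILD_BUG_ON() turns a true condition into a negative array size, so a violated invariant fails at compile time instead of at runtime like TF_BUG_ON(). A minimal sketch of how it can be used (struct pkt_hdr and layout_check() are made up for illustration; the macro itself is assumed to come from <libtf/defines.h>):

    #include <stdint.h>
    #include <libtf/defines.h>

    struct pkt_hdr { uint32_t len; uint16_t type; uint16_t flags; };

    static inline void layout_check(void)
    {
            /* Compiles: the struct really is 8 bytes on common ABIs. */
            TF_BUILD_BUG_ON(sizeof(struct pkt_hdr) != 8);
            /* Uncommenting the next line would stop the build with a
             * negative-array-size error instead of aborting at runtime: */
            /* TF_BUILD_BUG_ON(sizeof(struct pkt_hdr) != 16); */
    }

The epoll backend below uses exactly this to check that its private struct tf_poll_data still fits into the scheduler's opaque poll_data words.
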
diff --git a/include/libtf/fiber.h b/include/libtf/fiber.h
index ce3745b..a140607 100644
--- a/include/libtf/fiber.h
+++ b/include/libtf/fiber.h
@@ -1,6 +1,6 @@
-/* fiber.h - libtf fiber scheduler header
+/* fiber.h - libtf fiber manager header
*
- * Copyright (C) 2009 Timo Teräs <timo.teras@iki.fi>
+ * Copyright (C) 2009-2010 Timo Teräs <timo.teras@iki.fi>
* All rights reserved.
*
* This program is free software; you can redistribute it and/or modify it
@@ -15,8 +15,6 @@
#include <errno.h>
#include <libtf/defines.h>
-#include <libtf/atomic.h>
-#include <libtf/list.h>
#include <libtf/heap.h>
/* Fiber wakeup reasons */
@@ -27,64 +25,31 @@
#define TF_WAKEUP_THIS_TIMEOUT -ETIMEDOUT
#define TF_WAKEUP_FD -EIO
-struct tf_poll_data {
- int epoll_fd;
- int num_waiters;
-};
-
-/* Scheduler */
-struct tf_fiber;
+/* Fiber management */
+struct tf_scheduler;
+typedef void (*tf_fiber_proc)(void *fiber);
-struct tf_scheduler {
- struct tf_list_head run_q;
- struct tf_heap_head heap;
- struct tf_fiber * active_fiber;
- int num_fibers;
- tf_mtime_t scheduler_time;
- struct tf_poll_data poll_data;
-};
+void *__tf_fiber_create(tf_fiber_proc fiber_main, int private_size);
+void *tf_fiber_create(struct tf_scheduler *sched,
+ tf_fiber_proc fiber_main, int private_size);
+void *tf_fiber_get(void *data);
+void tf_fiber_put(void *data);
+void __tf_fiber_wakeup(void *data, int wakeup_type);
+void __tf_fiber_wakeup_heapnode(struct tf_heap_node *node);
+int __tf_fiber_schedule(void);
+int __tf_fiber_bind_scheduler(struct tf_scheduler *sched);
+int __tf_fiber_release_scheduler(struct tf_scheduler *sched);
-struct tf_main_ctx {
- int argc;
- char ** argv;
-};
+void tf_fiber_exit(void) attribute_noreturn;
+void tf_fiber_kill(void *fiber);
+int tf_fiber_yield(void);
+/* Scheduling and fiber management */
struct tf_timeout {
tf_mtime_t saved_timeout;
unsigned int timeout_change;
};
-static inline
-struct tf_scheduler *tf_get_scheduler(void)
-{
- extern struct tf_scheduler *__tf_scheduler;
- return __tf_scheduler;
-}
-
-static inline
-struct tf_fiber *tf_get_fiber(void)
-{
- return tf_get_scheduler()->active_fiber;
-}
-
-static inline
-tf_mtime_t tf_mtime(void)
-{
- return tf_get_scheduler()->scheduler_time;
-}
-
-/* Fiber creation */
-typedef void (*tf_fiber_proc)(void *fiber);
-int tf_main_args(tf_fiber_proc fiber_main, int argc, char **argv);
-static inline int tf_main(tf_fiber_proc fiber_main)
-{
- return tf_main_args(fiber_main, 0, NULL);
-}
-
-void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size);
-void *tf_fiber_get(void *data);
-void tf_fiber_put(void *data);
-
#define tf_timed(func, timeout) \
({ \
struct tf_timeout __timeout; \
@@ -92,7 +57,6 @@ void tf_fiber_put(void *data);
tf_timeout_pop(&__timeout, (func)); \
})
-//* Scheduling and fiber management */
void tf_timeout_push(struct tf_timeout *timeout, tf_mtime_diff_t milliseconds);
int __tf_timeout_pop(struct tf_timeout *timeout, int err);
@@ -103,17 +67,11 @@ static inline int tf_timeout_pop(struct tf_timeout *timeout, int err)
return err;
}
-int tf_schedule(void);
-void tf_wakeup(struct tf_fiber *fiber, int wakeup_type);
-void tf_exit(void) attribute_noreturn;
-void tf_kill(void *fiber);
-int tf_yield(void);
-
static inline
int tf_msleep(tf_mtime_diff_t milliseconds)
{
int r;
- r = tf_timed(tf_schedule(), milliseconds);
+ r = tf_timed(__tf_fiber_schedule(), milliseconds);
if (r == TF_WAKEUP_THIS_TIMEOUT)
r = 0;
return r;
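
tf_timed() is the per-call timeout wrapper that survives the split: it pushes a timeout, evaluates the wrapped call, then pops the timeout again, with tf_msleep() being the degenerate case of timing a bare __tf_fiber_schedule(). A usage sketch (read_with_timeout() is hypothetical; tf_read_fully() is the libtf I/O call from io-unix.c):

    #include <stddef.h>
    #include <libtf/tf.h>

    /* Read exactly len bytes, or give up after five seconds. */
    static int read_with_timeout(struct tf_fd *fd, void *buf, size_t len)
    {
            int r = tf_timed(tf_read_fully(fd, buf, len), 5000);

            if (r == TF_WAKEUP_THIS_TIMEOUT)
                    return -ETIMEDOUT;      /* our own timeout expired */
            return r;                       /* 0 on success, negative error otherwise */
    }
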
diff --git a/include/libtf/heap.h b/include/libtf/heap.h
index a68e01d..3a16159 100644
--- a/include/libtf/heap.h
+++ b/include/libtf/heap.h
@@ -74,4 +74,14 @@ int tf_heap_prealloc(struct tf_heap_head *head, uint32_t size)
return 0;
}
+static inline
+void tf_heap_destroy(struct tf_heap_head *head)
+{
+ if (head->item)
+ free(head->item);
+ head->item = NULL;
+ head->num_items = 0;
+ head->allocated = 0;
+}
+
#endif
diff --git a/include/libtf/io.h b/include/libtf/io.h
index 1f0b793..1f37d81 100644
--- a/include/libtf/io.h
+++ b/include/libtf/io.h
@@ -26,8 +26,6 @@
#define TF_FD_SET_CLOEXEC 4
#define TF_FD_ALREADY_NONBLOCKING 8
-struct tf_fiber;
-
struct tf_sockaddr {
union {
struct sockaddr addr;
@@ -42,7 +40,7 @@ struct tf_fd {
/* Single waiter -- would be relatively trivial to modify to allow
* multiple waiters, if someone actually needs it */
unsigned int events;
- struct tf_fiber *waiting_fiber;
+ void *waiting_fiber;
};
void tf_poll_init(void);
diff --git a/include/libtf/list.h b/include/libtf/list.h
index 22b76a8..f75a0be 100644
--- a/include/libtf/list.h
+++ b/include/libtf/list.h
@@ -142,7 +142,7 @@ static inline void tf_list_add_tail(struct tf_list_node *new, struct tf_list_hea
tf_list_add_before(new, &head->node);
}
-static inline void __tf_list_del(struct tf_list_node * prev, struct tf_list_node *next)
+static inline void __tf_list_del(struct tf_list_node *prev, struct tf_list_node *next)
{
next->prev = prev;
prev->next = next;
@@ -155,6 +155,14 @@ static inline void tf_list_del(struct tf_list_node *entry)
entry->prev = NULL;
}
+static inline struct tf_list_node *tf_list_pop(struct tf_list_head *head)
+{
+ struct tf_list_node *n;
+ n = head->node.next;
+ tf_list_del(n);
+ return n;
+}
+
static inline int tf_list_hashed(const struct tf_list_node *n)
{
return n->next != n && n->next != NULL;
@@ -165,6 +173,29 @@ static inline int tf_list_empty(const struct tf_list_head *h)
return !tf_list_hashed(&h->node);
}
+static inline void __tf_list_splice(const struct tf_list_head *list,
+ struct tf_list_node *prev,
+ struct tf_list_node *next)
+{
+ struct tf_list_node *first = list->node.next;
+ struct tf_list_node *last = list->node.prev;
+
+ first->prev = prev;
+ prev->next = first;
+
+ last->next = next;
+ next->prev = last;
+}
+
+static inline void tf_list_splice_tail(struct tf_list_head *src,
+ struct tf_list_head *dst)
+{
+ if (!tf_list_empty(src)) {
+ __tf_list_splice(src, dst->node.prev, &dst->node);
+ tf_list_init_head(src);
+ }
+}
+
#define tf_list_next(ptr, type, member) \
(tf_list_hashed(ptr) ? container_of((ptr)->next,type,member) : NULL)
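
tf_list_pop() and tf_list_splice_tail() are the two primitives the reworked two-queue scheduler is built on: splice moves a whole queue behind another in O(1), pop detaches the first node. A small sketch with hypothetical queue names:

    struct tf_list_head incoming = TF_LIST_HEAD_INITIALIZER(incoming);
    struct tf_list_head backlog  = TF_LIST_HEAD_INITIALIZER(backlog);

    /* ... producers tf_list_add_tail() nodes onto incoming ... */

    /* Move everything queued on incoming behind backlog; incoming is
     * reinitialised and stays usable afterwards. */
    tf_list_splice_tail(&incoming, &backlog);

    /* Detach the oldest node; the caller must know the queue is non-empty. */
    struct tf_list_node *n = tf_list_pop(&backlog);
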
diff --git a/include/libtf/scheduler.h b/include/libtf/scheduler.h
new file mode 100644
index 0000000..cc8db70
--- /dev/null
+++ b/include/libtf/scheduler.h
@@ -0,0 +1,64 @@
+/* scheduler.h - libtf fiber scheduler header
+ *
+ * Copyright (C) 2009-2010 Timo Teräs <timo.teras@iki.fi>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 or later as
+ * published by the Free Software Foundation.
+ *
+ * See http://www.gnu.org/ for details.
+ */
+
+#ifndef TF_SCHEDULER_H
+#define TF_SCHEDULER_H
+
+#include <libtf/atomic.h>
+#include <libtf/list.h>
+#include <libtf/heap.h>
+#include <libtf/fiber.h>
+
+struct tf_scheduler {
+ struct tf_list_head scheduled_q;
+ struct tf_list_head running_q;
+ struct tf_heap_head heap;
+ void * active_fiber;
+ void * main_fiber;
+ int num_fibers;
+ tf_mtime_t scheduler_time;
+ unsigned long poll_data[2];
+};
+
+static inline
+struct tf_scheduler *tf_scheduler_get_current(void)
+{
+ extern struct tf_scheduler *__tf_scheduler;
+ TF_BUG_ON(__tf_scheduler == NULL);
+ return __tf_scheduler;
+}
+
+static inline
+tf_mtime_t tf_scheduler_get_mtime(void)
+{
+ return tf_scheduler_get_current()->scheduler_time;
+}
+
+struct tf_scheduler *tf_scheduler_create(void);
+int tf_scheduler_enable(struct tf_scheduler *);
+void tf_scheduler_disable(void);
+
+static inline struct tf_scheduler *
+tf_scheduler_get(struct tf_scheduler *s)
+{
+ tf_fiber_get(s);
+ return s;
+}
+
+static inline void
+tf_scheduler_put(struct tf_scheduler *s)
+{
+ tf_fiber_put(s);
+}
+
+#endif
+
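
This header replaces the old tf_main()/tf_main_args() entry points: instead of handing control to a scheduler that runs an init fiber, the caller binds a scheduler onto its own stack with tf_scheduler_enable(), spawns fibers, and tears it down with tf_scheduler_disable(), which blocks until every fiber has finished. A condensed sketch of the new top-level pattern, as the updated tests use it (hello_fiber() is made up):

    #include <stdio.h>
    #include <libtf/tf.h>

    static void hello_fiber(void *ptr)
    {
            printf("hello from a fiber\n");
    }

    int main(int argc, char **argv)
    {
            void *f;

            tf_scheduler_enable(NULL);              /* create and bind a scheduler */
            f = tf_fiber_create(NULL, hello_fiber, 0);
            tf_fiber_put(f);                        /* drop the caller's reference */
            tf_scheduler_disable();                 /* run fibers until none remain */
            return 0;
    }
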
diff --git a/include/libtf/tf.h b/include/libtf/tf.h
index 7a089ff..e613f18 100644
--- a/include/libtf/tf.h
+++ b/include/libtf/tf.h
@@ -14,6 +14,7 @@
#define TF_H
#include <libtf/fiber.h>
+#include <libtf/scheduler.h>
#include <libtf/io.h>
#endif
diff --git a/src/TFbuild b/src/TFbuild
index 9b40443..08cb696 100644
--- a/src/TFbuild
+++ b/src/TFbuild
@@ -1,5 +1,6 @@
libs-y += libtf
-libtf-objs-y += fiber.o heap.o io-epoll.o
+libtf-objs-y += fiber.o scheduler.o heap.o
+libtf-objs-$(OS_LINUX) += io-epoll.o
CFLAGS_heap.c += -funroll-all-loops
diff --git a/src/fiber.c b/src/fiber.c
index a7cb6bd..3f58b6d 100644
--- a/src/fiber.c
+++ b/src/fiber.c
@@ -1,4 +1,4 @@
-/* fiber.c - fiber management and scheduling
+/* fiber.c - fiber management
*
* Copyright (C) 2009 Timo Teräs <timo.teras@iki.fi>
* All rights reserved.
@@ -14,54 +14,74 @@
#include <errno.h>
#include <unistd.h>
#include <libtf/fiber.h>
-#include <libtf/io.h>
+#include <libtf/scheduler.h>
+#include "uctx.h"
#define TF_TIMEOUT_CHANGE_NEEDED 1
#define TF_TIMEOUT_CHANGE_NEW_VALUE 2
struct tf_fiber {
unsigned int ref_count;
+ struct tf_scheduler * scheduler;
int wakeup_type;
unsigned int timeout_change;
tf_mtime_t timeout;
struct tf_list_node queue_node;
struct tf_heap_node heap_node;
+ struct tf_uctx context;
char data[TF_EMPTY_ARRAY];
};
-#include "uctx.h"
-
-/* FIXME: should be in thread local storage */
-struct tf_scheduler *__tf_scheduler;
+static inline
+struct tf_fiber *tf_fiber_get_current(void)
+{
+ void *data = tf_scheduler_get_current()->active_fiber;
+ return container_of(data, struct tf_fiber, data);
+}
-void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size)
+void *__tf_fiber_create(tf_fiber_proc fiber_main, int private_size)
{
- struct tf_scheduler *sched = tf_get_scheduler();
struct tf_fiber *fiber;
- if (tf_heap_prealloc(&sched->heap, sched->num_fibers + 1) < 0)
- return NULL;
-
- fiber = tf_uctx_create(fiber_main, private_size);
+ fiber = tf_uctx_create_embedded(
+ TF_STACK_SIZE,
+ sizeof(struct tf_fiber) + private_size,
+ offsetof(struct tf_fiber, context),
+ fiber_main, offsetof(struct tf_fiber, data),
+ tf_fiber_exit);
if (fiber == NULL)
return NULL;
- /* The initial references for caller and scheduler */
*fiber = (struct tf_fiber) {
- .ref_count = 2,
+ .ref_count = 1,
.queue_node = TF_LIST_INITIALIZER(fiber->queue_node),
+ .context = fiber->context,
};
- tf_list_add_tail(&fiber->queue_node, &sched->run_q);
- sched->num_fibers++;
-
return fiber->data;
}
-static void __tf_fiber_destroy(struct tf_fiber *fiber)
+void *tf_fiber_create(
+ struct tf_scheduler *sched,
+ tf_fiber_proc fiber_main, int private_size)
{
- tf_heap_delete(&fiber->heap_node, &tf_get_scheduler()->heap);
- tf_uctx_destroy(fiber);
+ struct tf_fiber *fiber;
+
+ if (sched == NULL)
+ sched = tf_scheduler_get_current();
+
+ if (tf_heap_prealloc(&sched->heap, sched->num_fibers + 1) < 0)
+ return NULL;
+
+ fiber = container_of(__tf_fiber_create(fiber_main, private_size),
+ struct tf_fiber, data);
+ sched->num_fibers++;
+
+ fiber->scheduler = sched;
+ fiber->wakeup_type = TF_WAKEUP_NONE;
+ tf_list_add_tail(&fiber->queue_node, &sched->scheduled_q);
+
+ return tf_fiber_get(fiber->data);
}
void *tf_fiber_get(void *data)
@@ -71,6 +91,23 @@ void *tf_fiber_get(void *data)
return data;
}
+static void __tf_fiber_destroy(struct tf_fiber *fiber)
+{
+ struct tf_scheduler *sched = fiber->scheduler;
+ int main_fiber;
+
+ main_fiber = (fiber->context.alloc == NULL);
+ tf_heap_delete(&fiber->heap_node, &sched->heap);
+ tf_uctx_destroy(&fiber->context);
+ if (main_fiber)
+ free(fiber);
+ sched->num_fibers--;
+ if (sched->num_fibers == 1) {
+ /* FIXME: Use proper fiber event */
+ __tf_fiber_wakeup(sched->main_fiber, TF_WAKEUP_IMMEDIATE);
+ }
+}
+
void tf_fiber_put(void *data)
{
struct tf_fiber *fiber = container_of(data, struct tf_fiber, data);
@@ -78,110 +115,144 @@ void tf_fiber_put(void *data)
__tf_fiber_destroy(fiber);
}
-static void update_time(struct tf_scheduler *sched)
+void __tf_fiber_wakeup(void *data, int wakeup_type)
{
- struct timespec ts;
+ struct tf_fiber *fiber = container_of(data, struct tf_fiber, data);
+ struct tf_scheduler *sched = fiber->scheduler;
- clock_gettime(CLOCK_MONOTONIC, &ts);
- sched->scheduler_time = ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
+ if (fiber->wakeup_type == TF_WAKEUP_NONE) {
+ fiber->wakeup_type = wakeup_type;
+ tf_list_add_tail(&fiber->queue_node, &sched->running_q);
+ }
}
-static void run_fiber(struct tf_scheduler *sched, struct tf_fiber *f)
+void __tf_fiber_wakeup_heapnode(struct tf_heap_node *node)
{
- struct tf_fiber *schedf = container_of((void*) tf_get_scheduler(), struct tf_fiber, data);
-
- sched->active_fiber = f;
- tf_uctx_transfer(schedf, f);
- switch (f->wakeup_type) {
- case TF_WAKEUP_KILL:
- tf_fiber_put(f->data);
- sched->num_fibers--;
- break;
- case TF_WAKEUP_NONE:
- break;
- default:
- TF_BUG_ON("bad scheduler call from fiber");
- }
+ __tf_fiber_wakeup(container_of(node, struct tf_fiber, heap_node)->data,
+ TF_WAKEUP_TIMEOUT);
}
-static void process_heap(struct tf_scheduler *sched)
+static void __tf_fiber_schedule_next(void)
{
- struct tf_heap_node *node;
- struct tf_fiber *f;
- tf_mtime_t now = tf_mtime();
-
- while (!tf_heap_empty(&sched->heap) &&
- tf_mtime_diff(now, tf_heap_get_value(&sched->heap)) >= 0) {
- node = tf_heap_get_node(&sched->heap);
- f = container_of(node, struct tf_fiber, heap_node);
- if (f->wakeup_type == TF_WAKEUP_NONE)
- f->wakeup_type = TF_WAKEUP_TIMEOUT;
- run_fiber(sched, f);
+ struct tf_scheduler *sched = tf_scheduler_get_current();
+ struct tf_fiber *f = tf_fiber_get_current();
+ struct tf_fiber *nf;
+
+ /* Figure out the next fiber to run */
+ if (unlikely(tf_list_empty(&sched->scheduled_q))) {
+ tf_list_splice_tail(&sched->running_q,
+ &sched->scheduled_q);
+ TF_BUG_ON(tf_list_empty(&sched->scheduled_q));
}
+ nf = tf_list_entry(tf_list_pop(&sched->scheduled_q),
+ struct tf_fiber, queue_node);
+ sched->active_fiber = nf->data;
+ tf_uctx_transfer(&f->context, &nf->context);
}
-static void process_runq(struct tf_scheduler *sched)
+int __tf_fiber_schedule(void)
{
- struct tf_fiber *f;
+ struct tf_scheduler *sched = tf_scheduler_get_current();
+ struct tf_fiber *f = tf_fiber_get_current();
+ int wakeup;
- while (!tf_list_empty(&sched->run_q)) {
- f = tf_list_first(&sched->run_q, struct tf_fiber, queue_node);
- tf_list_del(&f->queue_node);
- run_fiber(sched, f);
+ if (unlikely(f->timeout_change)) {
+ if (f->timeout_change & TF_TIMEOUT_CHANGE_NEW_VALUE) {
+ if (tf_mtime_diff(f->timeout, tf_scheduler_get_mtime()) <= 0) {
+ f->timeout_change = TF_TIMEOUT_CHANGE_NEEDED;
+ return TF_WAKEUP_TIMEOUT;
+ }
+ tf_heap_change(&f->heap_node, &sched->heap, f->timeout);
+ } else
+ tf_heap_delete(&f->heap_node, &sched->heap);
+ f->timeout_change = 0;
}
+
+ __tf_fiber_schedule_next();
+
+ wakeup = f->wakeup_type;
+ f->wakeup_type = TF_WAKEUP_NONE;
+
+ return wakeup;
}
-int tf_main_args(tf_fiber_proc main_fiber, int argc, char **argv)
+int __tf_fiber_bind_scheduler(struct tf_scheduler *sched)
{
- struct tf_uctx *ctx = alloca(sizeof(struct tf_uctx) + sizeof(struct tf_scheduler));
- struct tf_scheduler *sched = (struct tf_scheduler*) ctx->fiber.data;
- struct tf_main_ctx *mainctx;
- int stack_guard = STACK_GUARD;
-
- ctx->stack_guard = &stack_guard;
- *sched = (struct tf_scheduler){
- .run_q = TF_LIST_HEAD_INITIALIZER(sched->run_q),
+ struct tf_fiber *f;
+
+ f = malloc(sizeof(struct tf_fiber));
+ if (f == NULL)
+ return -ENOMEM;
+
+ /* Adopt the currently running stack as this scheduler's main fiber */
+ *f = (struct tf_fiber) {
+ .ref_count = 1,
+ .scheduler = sched,
+ .queue_node = TF_LIST_INITIALIZER(f->queue_node),
};
+ tf_uctx_create_self(&f->context);
+ sched->main_fiber = f->data;
+ sched->active_fiber = f->data;
+ sched->num_fibers++;
- __tf_scheduler = sched;
- tf_poll_init();
- update_time(sched);
-
- mainctx = tf_fiber_create(main_fiber, sizeof(struct tf_main_ctx));
- mainctx->argc = argc;
- mainctx->argv = argv;
- tf_fiber_put(mainctx);
-
- do {
- tf_mtime_diff_t timeout;
-
- update_time(sched);
- if (!tf_list_empty(&sched->run_q)) {
- timeout = 0;
- } else if (!tf_heap_empty(&sched->heap)) {
- timeout = tf_mtime_diff(tf_heap_get_value(&sched->heap),
- tf_mtime());
- if (timeout < 0)
- timeout = 0;
- } else
- timeout = -1;
+ /* Schedule scheduler fiber */
+ f = container_of((void *) sched, struct tf_fiber, data);
+ f->scheduler = sched;
+ f->wakeup_type = TF_WAKEUP_IMMEDIATE;
+ tf_list_add_tail(&f->queue_node, &sched->running_q);
- if (tf_poll(timeout) == TF_WAKEUP_TIMEOUT && timeout >= 0) {
- sched->scheduler_time += timeout;
- process_heap(sched);
- }
- process_runq(sched);
- } while (likely(sched->num_fibers));
- tf_poll_close();
- __tf_scheduler = NULL;
+ return 0;
+}
+
+int __tf_fiber_release_scheduler(struct tf_scheduler *sched)
+{
+ struct tf_fiber *f;
+
+ /* Detach scheduler */
+ f = container_of((void *) sched, struct tf_fiber, data);
+ tf_list_del(&f->queue_node);
+
+ /* Detach main stack from this scheduler */
+ f = container_of((void *) sched->main_fiber, struct tf_fiber, data);
+ tf_fiber_put(sched->main_fiber);
+ sched->main_fiber = NULL;
+ sched->num_fibers--;
return 0;
}
+void tf_fiber_exit(void)
+{
+ struct tf_scheduler *sched = tf_scheduler_get_current();
+ struct tf_fiber *f = tf_fiber_get_current();
+ struct tf_fiber *schedf = container_of((void *) sched, struct tf_fiber, data);
+
+ tf_heap_delete(&f->heap_node, &sched->heap);
+ schedf->wakeup_type = TF_WAKEUP_KILL;
+ tf_uctx_transfer(&f->context, &schedf->context);
+ TF_BUG_ON(1);
+}
+
+void tf_fiber_kill(void *fiber)
+{
+}
+
+int tf_fiber_yield(void)
+{
+ struct tf_scheduler *sched = tf_scheduler_get_current();
+ struct tf_fiber *f = tf_fiber_get_current();
+
+ TF_BUG_ON(tf_list_hashed(&f->queue_node));
+ f->wakeup_type = TF_WAKEUP_IMMEDIATE;
+ tf_list_add_tail(&f->queue_node, &sched->running_q);
+
+ return __tf_fiber_schedule();
+}
+
void tf_timeout_push(struct tf_timeout *timeout, tf_mtime_diff_t milliseconds)
{
- struct tf_fiber *f = tf_get_fiber();
- tf_mtime_t abs = tf_mtime() + milliseconds;
+ struct tf_fiber *f = tf_fiber_get_current();
+ tf_mtime_t abs = tf_scheduler_get_mtime() + milliseconds;
int active;
if (f->timeout_change)
@@ -207,7 +278,7 @@ void tf_timeout_push(struct tf_timeout *timeout, tf_mtime_diff_t milliseconds)
int __tf_timeout_pop(struct tf_timeout *timeout, int err)
{
- struct tf_fiber *f = tf_get_fiber();
+ struct tf_fiber *f = tf_fiber_get_current();
f->timeout = timeout->saved_timeout;
f->timeout_change = timeout->timeout_change;
@@ -215,61 +286,3 @@ int __tf_timeout_pop(struct tf_timeout *timeout, int err)
err = TF_WAKEUP_THIS_TIMEOUT;
return err;
}
-
-int tf_schedule(void)
-{
- struct tf_scheduler *sched = tf_get_scheduler();
- struct tf_fiber *schedf = container_of((void*) sched, struct tf_fiber, data);
- struct tf_fiber *f = sched->active_fiber;
-
- if (unlikely(f->timeout_change)) {
- if (f->timeout_change & TF_TIMEOUT_CHANGE_NEW_VALUE) {
- if (tf_mtime_diff(f->timeout, tf_mtime()) <= 0) {
- f->timeout_change = TF_TIMEOUT_CHANGE_NEEDED;
- return TF_WAKEUP_TIMEOUT;
- }
- tf_heap_change(&f->heap_node, &sched->heap, f->timeout);
- } else
- tf_heap_delete(&f->heap_node, &sched->heap);
- f->timeout_change = 0;
- }
- f->wakeup_type = TF_WAKEUP_NONE;
- tf_uctx_transfer(f, schedf);
- return f->wakeup_type;
-}
-
-void tf_wakeup(struct tf_fiber *fiber, int wakeup_type)
-{
- struct tf_scheduler *sched = tf_get_scheduler();
-
- if (fiber->wakeup_type == TF_WAKEUP_NONE) {
- fiber->wakeup_type = wakeup_type;
- tf_list_add_tail(&fiber->queue_node, &sched->run_q);
- }
-}
-
-void tf_exit(void)
-{
- struct tf_scheduler *sched = tf_get_scheduler();
- struct tf_fiber *f = sched->active_fiber;
- struct tf_fiber *schedf = container_of((void*) sched, struct tf_fiber, data);
-
- tf_heap_delete(&f->heap_node, &sched->heap);
- f->wakeup_type = TF_WAKEUP_KILL;
- tf_uctx_transfer(f, schedf);
- TF_BUG_ON(1);
-}
-
-void tf_kill(void *fiber)
-{
-}
-
-int tf_yield(void)
-{
- struct tf_scheduler *sched = tf_get_scheduler();
- struct tf_fiber *f = sched->active_fiber;
-
- tf_list_add_tail(&f->queue_node, &sched->run_q);
- return tf_schedule();
-}
-
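
The reference counting changes shape along with the split: __tf_fiber_create() now hands back a single reference, and the public tf_fiber_create() takes one more on behalf of the caller before returning, so spawn-and-forget callers drop theirs immediately while the run queue keeps the fiber alive. A sketch of that idiom (spawn_worker(), worker_fiber() and struct ctx are placeholders modelled on the tests):

    #include <libtf/tf.h>

    struct ctx { int id; };

    static void worker_fiber(void *ptr)
    {
            /* ... fiber body ... */
    }

    static void spawn_worker(struct tf_scheduler *sched, int id)
    {
            struct ctx *c = tf_fiber_create(sched, worker_fiber, sizeof(struct ctx));

            if (c == NULL)
                    return;
            c->id = id;
            tf_fiber_put(c);        /* the scheduler still holds its reference */
    }
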
diff --git a/src/io-epoll.c b/src/io-epoll.c
index 5e28de8..8ac230f 100644
--- a/src/io-epoll.c
+++ b/src/io-epoll.c
@@ -17,11 +17,23 @@
#include <sys/socket.h>
#include <libtf/io.h>
-#include <libtf/fiber.h>
+#include <libtf/scheduler.h>
+
+struct tf_poll_data {
+ int epoll_fd;
+ int num_waiters;
+};
+
+struct tf_poll_data *tf_epoll_get_data(void)
+{
+ struct tf_scheduler *sched = tf_scheduler_get_current();
+ TF_BUILD_BUG_ON(sizeof(struct tf_poll_data) > sizeof(sched->poll_data));
+ return (struct tf_poll_data *) &sched->poll_data;
+}
static int tf_fd_created(struct tf_fd *fd)
{
- struct tf_poll_data *pd = &tf_get_scheduler()->poll_data;
+ struct tf_poll_data *pd = tf_epoll_get_data();
struct epoll_event ev;
int r;
@@ -39,7 +51,7 @@ static int tf_fd_created(struct tf_fd *fd)
static int tf_fd_destroyed(struct tf_fd *fd)
{
- struct tf_poll_data *pd = &tf_get_scheduler()->poll_data;
+ struct tf_poll_data *pd = tf_epoll_get_data();
if (fd->flags & TF_FD_AUTOCLOSE)
return 0;
@@ -50,17 +62,17 @@ static int tf_fd_destroyed(struct tf_fd *fd)
static void tf_fd_monitor(struct tf_fd *fd, int events)
{
- struct tf_poll_data *pd = &tf_get_scheduler()->poll_data;
+ struct tf_poll_data *pd = tf_epoll_get_data();
TF_BUG_ON(fd->waiting_fiber != NULL);
fd->events = events | EPOLLERR | EPOLLHUP;
- fd->waiting_fiber = tf_get_fiber();
+ fd->waiting_fiber = tf_scheduler_get_current()->active_fiber;
pd->num_waiters++;
}
static void tf_fd_unmonitor(struct tf_fd *fd)
{
- struct tf_poll_data *pd = &tf_get_scheduler()->poll_data;
+ struct tf_poll_data *pd = tf_epoll_get_data();
fd->waiting_fiber = NULL;
fd->events = 0;
@@ -69,7 +81,7 @@ static void tf_fd_unmonitor(struct tf_fd *fd)
void tf_poll_init(void)
{
- struct tf_poll_data *pd = &tf_get_scheduler()->poll_data;
+ struct tf_poll_data *pd = tf_epoll_get_data();
pd->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
pd->num_waiters = 0;
@@ -78,7 +90,7 @@ void tf_poll_init(void)
int tf_poll(tf_mtime_diff_t timeout)
{
- struct tf_poll_data *pd = &tf_get_scheduler()->poll_data;
+ struct tf_poll_data *pd = tf_epoll_get_data();
struct epoll_event events[64];
struct tf_fd *fd;
int r, i, ret;
@@ -95,7 +107,7 @@ int tf_poll(tf_mtime_diff_t timeout)
for (i = 0; i < r; i++) {
fd = (struct tf_fd *) events[i].data.ptr;
if (likely(fd->events & events[i].events))
- tf_wakeup(fd->waiting_fiber, TF_WAKEUP_FD);
+ __tf_fiber_wakeup(fd->waiting_fiber, TF_WAKEUP_FD);
}
ret = TF_WAKEUP_FD;
timeout = 0;
@@ -106,7 +118,7 @@ int tf_poll(tf_mtime_diff_t timeout)
void tf_poll_close(void)
{
- struct tf_poll_data *pd = &tf_get_scheduler()->poll_data;
+ struct tf_poll_data *pd = tf_epoll_get_data();
close(pd->epoll_fd);
}
diff --git a/src/io-unix.c b/src/io-unix.c
index ea65a76..39cdf64 100644
--- a/src/io-unix.c
+++ b/src/io-unix.c
@@ -119,7 +119,7 @@ int tf_read_fully(struct tf_fd *fd, void *buf, size_t count)
continue;
}
- r = tf_schedule();
+ r = __tf_fiber_schedule();
} while (r == TF_WAKEUP_FD);
tf_fd_unmonitor(fd);
@@ -152,7 +152,7 @@ int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count)
continue;
}
- r = tf_schedule();
+ r = __tf_fiber_schedule();
} while (r == TF_WAKEUP_FD);
tf_fd_unmonitor(fd);
@@ -174,7 +174,7 @@ ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count)
n = -errno;
break;
}
- n = tf_schedule();
+ n = __tf_fiber_schedule();
} while (n == TF_WAKEUP_FD);
tf_fd_unmonitor(fd);
@@ -196,7 +196,7 @@ ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count)
n = -errno;
break;
}
- n = tf_schedule();
+ n = __tf_fiber_schedule();
} while (n == TF_WAKEUP_FD);
tf_fd_unmonitor(fd);
@@ -270,7 +270,7 @@ int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd,
tf_fd_unmonitor(listen_fd);
return -errno;
}
- r = tf_schedule();
+ r = __tf_fiber_schedule();
} while (r == TF_WAKEUP_FD);
tf_fd_unmonitor(listen_fd);
if (r < 0)
@@ -293,7 +293,7 @@ int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to)
/* Wait for socket to become readable */
tf_fd_monitor(fd, EPOLLOUT);
- r = tf_schedule();
+ r = __tf_fiber_schedule();
tf_fd_unmonitor(fd);
if (r != TF_WAKEUP_FD)
return r;
@@ -337,7 +337,7 @@ ssize_t tf_recvmsg(struct tf_fd *fd,
r = -errno;
break;
}
- r = tf_schedule();
+ r = __tf_fiber_schedule();
} while (r == TF_WAKEUP_FD);
tf_fd_unmonitor(fd);
@@ -400,7 +400,7 @@ ssize_t tf_sendmsg(struct tf_fd *fd,
r = -errno;
break;
}
- r = tf_schedule();
+ r = __tf_fiber_schedule();
} while (r == TF_WAKEUP_FD);
tf_fd_unmonitor(fd);
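
Every wrapper in io-unix.c now follows the same shape against the renamed primitive: register interest on the descriptor, park the fiber with __tf_fiber_schedule() until the poller wakes it with TF_WAKEUP_FD (or a timeout or kill wakes it with something else), then unregister. Condensed into one hypothetical helper for readability (tf_fd_monitor()/tf_fd_unmonitor() are the internal epoll-backend hooks):

    /* Block the calling fiber until fd becomes readable, or until an
     * enclosing tf_timed() timeout fires. */
    static int wait_readable(struct tf_fd *fd)
    {
            int r;

            tf_fd_monitor(fd, EPOLLIN);
            r = __tf_fiber_schedule();
            tf_fd_unmonitor(fd);

            return r;       /* TF_WAKEUP_FD means the descriptor is ready */
    }
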
diff --git a/src/scheduler.c b/src/scheduler.c
new file mode 100644
index 0000000..d287eca
--- /dev/null
+++ b/src/scheduler.c
@@ -0,0 +1,132 @@
+/* scheduler.c - fiber scheduling
+ *
+ * Copyright (C) 2009-2010 Timo Teräs <timo.teras@iki.fi>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 or later as
+ * published by the Free Software Foundation.
+ *
+ * See http://www.gnu.org/ for details.
+ */
+
+#include <time.h>
+#include <libtf/scheduler.h>
+#include <libtf/io.h>
+
+/* FIXME: should be in thread local storage */
+struct tf_scheduler *__tf_scheduler;
+
+static void update_time(struct tf_scheduler *sched)
+{
+ struct timespec ts;
+
+ clock_gettime(CLOCK_MONOTONIC, &ts);
+ sched->scheduler_time = ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
+}
+
+static void process_heap(struct tf_scheduler *sched)
+{
+ struct tf_heap_node *node;
+ tf_mtime_t now = sched->scheduler_time;
+
+ while (!tf_heap_empty(&sched->heap) &&
+ tf_mtime_diff(now, tf_heap_get_value(&sched->heap)) >= 0) {
+ node = tf_heap_get_node(&sched->heap);
+ tf_heap_delete(node, &sched->heap);
+ __tf_fiber_wakeup_heapnode(node);
+ }
+}
+
+void tf_scheduler_fiber(void *data)
+{
+ struct tf_scheduler *sched = (struct tf_scheduler *) data;
+
+ do {
+ tf_mtime_diff_t timeout;
+
+ update_time(sched);
+ if (!tf_list_empty(&sched->scheduled_q) ||
+ !tf_list_empty(&sched->running_q)) {
+ timeout = 0;
+ } else if (!tf_heap_empty(&sched->heap)) {
+ timeout = tf_mtime_diff(
+ tf_heap_get_value(&sched->heap),
+ tf_scheduler_get_mtime());
+ if (timeout < 0)
+ timeout = 0;
+ } else {
+ timeout = -1;
+ }
+
+ if (tf_poll(timeout) == TF_WAKEUP_TIMEOUT &&
+ timeout >= 0) {
+ sched->scheduler_time += timeout;
+ process_heap(sched);
+ }
+
+ if (tf_fiber_yield() == TF_WAKEUP_KILL) {
+ do {
+ tf_fiber_put(sched->active_fiber);
+ sched->active_fiber = sched;
+ } while (__tf_fiber_schedule() == TF_WAKEUP_KILL);
+ }
+ } while (1);
+}
+
+struct tf_scheduler *tf_scheduler_create(void)
+{
+ struct tf_scheduler *sched;
+
+ sched = __tf_fiber_create(tf_scheduler_fiber,
+ sizeof(struct tf_scheduler));
+
+ *sched = (struct tf_scheduler) {
+ .scheduled_q = TF_LIST_HEAD_INITIALIZER(sched->scheduled_q),
+ .running_q = TF_LIST_HEAD_INITIALIZER(sched->running_q),
+ };
+
+ return sched;
+}
+
+int tf_scheduler_enable(struct tf_scheduler *sched)
+{
+ struct tf_scheduler *s = sched;
+
+ if (s == NULL) {
+ s = tf_scheduler_create();
+ if (s == NULL)
+ return -ENOMEM;
+ }
+ if (s->main_fiber != NULL)
+ return -EBUSY;
+
+ __tf_fiber_bind_scheduler(s);
+ __tf_scheduler = s;
+ tf_poll_init();
+ update_time(s);
+
+ if (sched != NULL)
+ tf_scheduler_get(sched);
+
+ return 0;
+}
+
+void tf_scheduler_disable(void)
+{
+ struct tf_scheduler *sched = __tf_scheduler;
+
+ if (sched == NULL ||
+ sched->main_fiber != sched->active_fiber)
+ return;
+
+ /* Sleep until no fibers other than the main one remain */
+ while (sched->num_fibers > 1)
+ __tf_fiber_schedule();
+
+ tf_poll_close();
+ __tf_scheduler = NULL;
+ __tf_fiber_release_scheduler(sched);
+ tf_heap_destroy(&sched->heap);
+ tf_fiber_put(sched);
+}
diff --git a/src/uctx.h b/src/uctx.h
index 541fcf9..e27a53c 100644
--- a/src/uctx.h
+++ b/src/uctx.h
@@ -14,8 +14,12 @@
#include <stdio.h>
#include <stdlib.h>
+
#ifdef VALGRIND
#include <valgrind/valgrind.h>
+#else
+#define VALGRIND_STACK_REGISTER(stack_base, size) 0
+#define VALGRIND_STACK_DEREGISTER(stack_id)
#endif
#define STACK_GUARD 0xbad57ac4
@@ -24,10 +28,7 @@ struct tf_uctx {
int *stack_guard;
void *alloc;
void *current_sp;
-#ifdef VALGRIND
unsigned int stack_id;
-#endif
- struct tf_fiber fiber;
};
#if defined(__i386__)
@@ -86,10 +87,25 @@ static inline void stack_push_ptr(void **stackptr, void *ptr)
}
-static inline
-struct tf_fiber *tf_uctx_create(tf_fiber_proc fiber_main, int private_size)
+static inline void tf_uctx_create_self(struct tf_uctx *uctx)
+{
+ static int dummy_guard = STACK_GUARD;
+
+ *uctx = (struct tf_uctx) {
+ .stack_guard = &dummy_guard,
+ };
+}
+
+static inline void *
+tf_uctx_create_embedded(
+ size_t stack_size,
+ size_t private_size,
+ off_t uctx_offset,
+ void (*stack_frame_main)(void*), off_t main_argument_offset,
+ void (*stack_frame_return)(void))
{
size_t size = TF_STACK_SIZE;
+ void *user_data;
struct tf_uctx *uctx;
void *stack, *stack_base;
@@ -98,46 +114,42 @@ struct tf_fiber *tf_uctx_create(tf_fiber_proc fiber_main, int private_size)
if (stack_base == NULL)
return NULL;
+ /* Create initial stack frame (cdecl convention) */
stack = stack_pointer(stack_base, size);
- private_size += sizeof(struct tf_uctx);
-
- /* Construct inital frame for call the main function and if it
- * happens to return, it'll jump back to tf_exit() which kills
- * the fiber (cdecl calling convetion assumed) */
- uctx = stack_push(&stack, TF_ALIGN(private_size, 64));
- stack_push_ptr(&stack, uctx->fiber.data);
- stack_push_ptr(&stack, &tf_exit);
- stack_push_ptr(&stack, fiber_main);
- uctx->current_sp = stack;
-
-#ifdef VALGRIND
- uctx->stack_id = VALGRIND_STACK_REGISTER(stack_base, size);
-#endif
- uctx->alloc = stack_base;
- uctx->stack_guard = stack_guard(stack_base, size);
+ user_data = stack_push(&stack, TF_ALIGN(private_size, 64));
+ stack_push_ptr(&stack, NULL);
+ stack_push_ptr(&stack, NULL);
+ stack_push_ptr(&stack, NULL);
+ stack_push_ptr(&stack, NULL);
+ stack_push_ptr(&stack, user_data + main_argument_offset);
+ stack_push_ptr(&stack, stack_frame_return);
+ stack_push_ptr(&stack, stack_frame_main);
+
+ uctx = user_data + uctx_offset;
+ *uctx = (struct tf_uctx) {
+ .stack_guard = stack_guard(stack_base, size),
+ .alloc = stack_base,
+ .current_sp = stack,
+ .stack_id = VALGRIND_STACK_REGISTER(stack_base, size),
+ };
*uctx->stack_guard = STACK_GUARD;
- return &uctx->fiber;
+ return user_data;
}
static inline
-void tf_uctx_destroy(struct tf_fiber *fiber)
+void tf_uctx_destroy(struct tf_uctx *uctx)
{
- struct tf_uctx *uctx = container_of(fiber, struct tf_uctx, fiber);
-#ifdef VALGRIND
- VALGRIND_STACK_DEREGISTER(uctx->stack_id);
-#endif
- free(uctx->alloc);
+ if (uctx->alloc != NULL) {
+ VALGRIND_STACK_DEREGISTER(uctx->stack_id);
+ free(uctx->alloc);
+ }
}
static inline
-void tf_uctx_transfer(struct tf_fiber *from, struct tf_fiber *to)
+void tf_uctx_transfer(struct tf_uctx *from, struct tf_uctx *to)
{
-
- struct tf_uctx *ufrom = container_of(from, struct tf_uctx, fiber);
- struct tf_uctx *uto = container_of(to, struct tf_uctx, fiber);
-
/* Switch stack pointers */
- TF_BUG_ON(*ufrom->stack_guard != STACK_GUARD);
- switch_fiber(ufrom, uto);
+ TF_BUG_ON(*from->stack_guard != STACK_GUARD);
+ switch_fiber(from, to);
}
diff --git a/test/httpget.c b/test/httpget.c
index fed6c06..9aec886 100644
--- a/test/httpget.c
+++ b/test/httpget.c
@@ -44,20 +44,16 @@ err:
ctx->hostname, bytes, -r, strerror(-r));
}
-static void init_fiber(void *ptr)
+int main(int argc, char **argv)
{
- struct tf_main_ctx *ctx = (struct tf_main_ctx *) ptr;
struct ctx *c;
int i;
- for (i = 1; i < ctx->argc; i++) {
- c = tf_fiber_create(ping_fiber, sizeof(struct ctx));
- c->hostname = ctx->argv[i];
+ tf_scheduler_enable(NULL);
+ for (i = 1; i < argc; i++) {
+ c = tf_fiber_create(NULL, ping_fiber, sizeof(struct ctx));
+ c->hostname = argv[i];
tf_fiber_put(c);
}
-}
-
-int main(int argc, char **argv)
-{
- return tf_main_args(init_fiber, argc, argv);
+ tf_scheduler_disable();
}
diff --git a/test/read.c b/test/read.c
index 6d8306b..3d318a3 100644
--- a/test/read.c
+++ b/test/read.c
@@ -32,13 +32,10 @@ static void io_fiber(void *ptr)
tf_close(&fin);
}
-static void init_fiber(void *ptr)
-{
- tf_fiber_put(tf_fiber_create(time_fiber, 0));
- tf_fiber_put(tf_fiber_create(io_fiber, 0));
-}
-
int main(int argc, char **argv)
{
- return tf_main(init_fiber);
+ tf_scheduler_enable(NULL);
+ tf_fiber_put(tf_fiber_create(NULL, time_fiber, 0));
+ tf_fiber_put(tf_fiber_create(NULL, io_fiber, 0));
+ tf_scheduler_disable();
}
diff --git a/test/simple1.c b/test/simple1.c
index 65a787c..6a05b7a 100644
--- a/test/simple1.c
+++ b/test/simple1.c
@@ -10,23 +10,20 @@ static void work_fiber(void *ptr)
struct ctx *c = (struct ctx*) ptr;
printf("Hello%d.1\n", c->id);
- tf_yield();
+ tf_fiber_yield();
printf("Hello%d.2\n", c->id);
}
-static void init_fiber(void *ptr)
+int main(int argc, char **argv)
{
struct ctx *c;
int i;
+ tf_scheduler_enable(NULL);
for (i = 0; i < 6; i++) {
- c = tf_fiber_create(work_fiber, sizeof(struct ctx));
+ c = tf_fiber_create(NULL, work_fiber, sizeof(struct ctx));
c->id = i;
tf_fiber_put(c);
}
-}
-
-int main(int argc, char **argv)
-{
- return tf_main(init_fiber);
+ tf_scheduler_disable();
}
diff --git a/test/sleep.c b/test/sleep.c
index 7e39b5c..82399e1 100644
--- a/test/sleep.c
+++ b/test/sleep.c
@@ -8,24 +8,23 @@ struct ctx {
static void work_fiber(void *ptr)
{
//struct ctx *c = (struct ctx*) ptr;
-
tf_msleep(rand() % 5000);
+ printf("one\n");
tf_msleep(rand() % 5000);
+ printf("two\n");
tf_msleep(rand() % 5000);
+ printf("three\n");
}
-static void init_fiber(void *ptr)
+int main(int argc, char **argv)
{
struct ctx *c;
int i;
- for (i = 0; i < 1000; i++) {
- c = tf_fiber_create(work_fiber, sizeof(struct ctx));
+ tf_scheduler_enable(NULL);
+ for (i = 0; i < 100; i++) {
+ c = tf_fiber_create(NULL, work_fiber, sizeof(struct ctx));
tf_fiber_put(c);
}
-}
-
-int main(int argc, char **argv)
-{
- return tf_main(init_fiber);
+ tf_scheduler_disable();
}