author     Timo Teräs <timo.teras@iki.fi>   2010-07-02 20:23:07 +0300
committer  Timo Teräs <timo.teras@iki.fi>   2010-07-02 20:25:47 +0300
commit     23b95bf1a15322c2f471b80c06cb65d9b2d2a282 (patch)
tree       9bf12231db9591852e3b42ca24715d2cbaf6267b /src/fiber.c
parent     0183e33d9a4759764716e771b85e19f7a997b8bd (diff)
libtf: major redesign started (HEAD, master)
The idea is to make libtf completely multi-threaded, so that each fiber can run concurrently in a separate thread. Quite a bit of framework is added for this, and some atomic helpers are already introduced. However, I/O polling is currently busy-polling (it will soon move to its own thread), and timeouts are still more or less broken. Oh, and the multithreading core is not there yet. Basically, we are mostly broken for now ;)
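For context on the new wakeup path in the diff below: a fiber's state is now a flags word updated with a compare-and-swap loop, so concurrent wakeups coalesce without a lock. The following is a minimal, self-contained sketch of that handshake, not libtf's actual code; tf_atomic_cmpxchg() comes from the new libtf/atomic.h, and its use in tf_fiber_wakeup() suggests it returns nonzero on success, so the sketch stands it in with GCC's __sync_bool_compare_and_swap() builtin.

/*
 * Minimal sketch of the lock-free wakeup handshake introduced here.
 * cmpxchg() stands in for libtf's tf_atomic_cmpxchg(), assumed to
 * return nonzero on success. Build with gcc/clang.
 */
#include <stdio.h>

#define TF_BIT(n)                (1u << (n))
#define TF_FIBERF_RUNNING        TF_BIT(0)
#define TF_FIBERF_WAKEUP_PENDING TF_BIT(1)

struct fiber { unsigned int flags; };

static int cmpxchg(unsigned int *p, unsigned int oldv, unsigned int newv)
{
	return __sync_bool_compare_and_swap(p, oldv, newv); /* GCC builtin */
}

/* Returns 1 if this caller won the race and must queue the fiber. */
static int fiber_wakeup(struct fiber *f)
{
	unsigned int oldval, newval;

	do {
		oldval = f->flags;
		if (oldval & TF_FIBERF_WAKEUP_PENDING)
			return 0;	/* a wakeup is already pending */
		newval = oldval | TF_FIBERF_WAKEUP_PENDING | TF_FIBERF_RUNNING;
	} while (!cmpxchg(&f->flags, oldval, newval));

	/* Queue only if the fiber was not already running/queued. */
	return !(oldval & TF_FIBERF_RUNNING);
}

int main(void)
{
	struct fiber f = { 0 };
	printf("first wakeup queues fiber: %d\n", fiber_wakeup(&f)); /* 1 */
	printf("second wakeup coalesces:   %d\n", fiber_wakeup(&f)); /* 0 */
	return 0;
}

The key property, visible in tf_fiber_wakeup() in the hunks below, is that only the caller that flips RUNNING from clear to set enqueues the fiber; every later wakeup sees WAKEUP_PENDING and returns early.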
Diffstat (limited to 'src/fiber.c')
-rw-r--r--  src/fiber.c  259
1 file changed, 61 insertions(+), 198 deletions(-)
diff --git a/src/fiber.c b/src/fiber.c
index e507815..bef7b81 100644
--- a/src/fiber.c
+++ b/src/fiber.c
@@ -13,31 +13,13 @@
#include <time.h>
#include <errno.h>
#include <unistd.h>
+#include <libtf/atomic.h>
#include <libtf/fiber.h>
-#include <libtf/scheduler.h>
+#include <libtf/vmach.h>
#include "uctx.h"
-#define TF_TIMEOUT_CHANGE_NEEDED 1
-#define TF_TIMEOUT_CHANGE_NEW_VALUE 2
-
-struct tf_fiber {
- unsigned int ref_count;
- struct tf_scheduler * scheduler;
- int wakeup_type;
- unsigned int timeout_change;
- tf_mtime_t timeout;
- struct tf_list_node queue_node;
- struct tf_heap_node heap_node;
- struct tf_uctx context;
- char data[TF_EMPTY_ARRAY];
-};
-
-static inline
-struct tf_fiber *tf_fiber_get_current(void)
-{
- void *data = tf_scheduler_get_current()->active_fiber;
- return container_of(data, struct tf_fiber, data);
-}
+#define TF_FIBERF_RUNNING TF_BIT(0)
+#define TF_FIBERF_WAKEUP_PENDING TF_BIT(1)
static void tf_fiber_main(void *user_data, void *arg)
{
@@ -48,45 +30,46 @@ static void tf_fiber_main(void *user_data, void *arg)
tf_fiber_exit();
}
-void *__tf_fiber_create(tf_fiber_proc fiber_main, int private_size)
+void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size)
{
- struct tf_fiber *fiber;
+ struct tf_fiber *self = tf_vmach_get_current_fiber();
+ struct tf_fiber *f;
- fiber = tf_uctx_create_embedded(
+ f = tf_uctx_create_embedded(
TF_STACK_SIZE,
sizeof(struct tf_fiber) + private_size,
offsetof(struct tf_fiber, context),
tf_fiber_main, fiber_main);
- if (fiber == NULL)
+ if (f == NULL)
return NULL;
- *fiber = (struct tf_fiber) {
+ *f = (struct tf_fiber) {
.ref_count = 1,
- .queue_node = TF_LIST_INITIALIZER(fiber->queue_node),
- .context = fiber->context,
+ .queue_node = TF_LIST_INITIALIZER(f->queue_node),
+ .context = f->context,
+ .timeout.manager = self ? self->timeout.manager : NULL,
+ .wakeup_q = TF_LIST_HEAD_INITIALIZER(f->wakeup_q),
};
- return fiber->data;
+ return f->data;
}
-void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size)
+int tf_fiber_run(void *fiber)
{
- struct tf_fiber *fiber;
- struct tf_scheduler *sched;
-
- sched = tf_scheduler_get_current();
- if (tf_heap_prealloc(&sched->heap, sched->num_fibers + 1) < 0)
- return NULL;
+ struct tf_timeout_manager *tm;
+ struct tf_fiber *f = container_of(fiber, struct tf_fiber, data);
- fiber = container_of(__tf_fiber_create(fiber_main, private_size),
- struct tf_fiber, data);
- sched->num_fibers++;
+ tm = f->timeout.manager;
+ if (tm != NULL) {
+ if (tf_heap_prealloc(&tm->heap, tm->num_fibers + 1) < 0)
+ return -ENOMEM;
+ tm->num_fibers++;
+ }
- fiber->scheduler = sched;
- fiber->wakeup_type = TF_WAKEUP_NONE;
- tf_list_add_tail(&fiber->queue_node, &sched->scheduled_q);
+ tf_fiber_wakeup(f);
+ tf_vmach_get_current()->num_user_fibers++;
- return tf_fiber_get(fiber->data);
+ return 0;
}
void *tf_fiber_get(void *data)
@@ -98,23 +81,8 @@ void *tf_fiber_get(void *data)
static void __tf_fiber_destroy(struct tf_fiber *fiber)
{
- struct tf_scheduler *sched = fiber->scheduler;
- int main_fiber, num_fibers;
-
- /* decrease first the number of fibers as we might be
- * killing the scheduler it self */
- num_fibers = --sched->num_fibers;
-
- main_fiber = (fiber->context.alloc == NULL);
- tf_heap_delete(&fiber->heap_node, &sched->heap);
- tf_uctx_destroy(&fiber->context);
- if (main_fiber)
- free(fiber);
-
- if (num_fibers == 1) {
- /* FIXME: Use proper fiber event*/
- __tf_fiber_wakeup(sched->main_fiber, TF_WAKEUP_IMMEDIATE);
- }
+ tf_heap_delete(&fiber->timeout.heap_node, &fiber->timeout.manager->heap);
+ tf_uctx_destroy(fiber->context);
}
void tf_fiber_put(void *data)
@@ -124,165 +92,60 @@ void tf_fiber_put(void *data)
__tf_fiber_destroy(fiber);
}
-void __tf_fiber_wakeup(void *data, int wakeup_type)
+void tf_fiber_wakeup(struct tf_fiber *f)
{
- struct tf_fiber *fiber = container_of(data, struct tf_fiber, data);
- struct tf_scheduler *sched = fiber->scheduler;
+ struct tf_fiber *self = tf_vmach_get_current_fiber();
+ unsigned int newval, oldval;
- if (fiber->wakeup_type == TF_WAKEUP_NONE) {
- fiber->wakeup_type = wakeup_type;
- tf_list_add_tail(&fiber->queue_node, &sched->running_q);
- }
-}
+ do {
+ oldval = f->flags;
+ if (oldval & TF_FIBERF_WAKEUP_PENDING)
+ return;
+ newval = oldval | TF_FIBERF_WAKEUP_PENDING | TF_FIBERF_RUNNING;
+ } while (!tf_atomic_cmpxchg(&f->flags, oldval, newval));
-void __tf_fiber_wakeup_heapnode(struct tf_heap_node *node)
-{
- __tf_fiber_wakeup(container_of(node, struct tf_fiber, heap_node)->data,
- TF_WAKEUP_TIMEOUT);
+ if (!(oldval & TF_FIBERF_RUNNING))
+ tf_list_add_tail(&f->queue_node, &self->wakeup_q);
}
-int __tf_fiber_schedule(void)
+int tf_fiber_schedule(void)
{
- struct tf_scheduler *sched = tf_scheduler_get_current();
- struct tf_fiber *f = tf_fiber_get_current(), *nf;
- int wakeup;
+ struct tf_fiber *f = tf_vmach_get_current_fiber();
- if (unlikely(f->timeout_change)) {
- if (f->timeout_change & TF_TIMEOUT_CHANGE_NEW_VALUE) {
- if (tf_mtime_diff(f->timeout, tf_scheduler_get_mtime()) <= 0) {
- f->timeout_change = TF_TIMEOUT_CHANGE_NEEDED;
- return TF_WAKEUP_TIMEOUT;
- }
- tf_heap_change(&f->heap_node, &sched->heap, f->timeout);
- } else
- tf_heap_delete(&f->heap_node, &sched->heap);
- f->timeout_change = 0;
- }
+ if (f->timeout.value != 0 &&
+ tf_mtime_diff(f->timeout.value, tf_mtime_now()) <= 0)
+ return -ETIME;
- /* Figure out the next fibre to run */
- if (unlikely(tf_list_empty(&sched->scheduled_q))) {
- tf_list_splice_tail(&sched->running_q,
- &sched->scheduled_q);
- TF_BUG_ON(tf_list_empty(&sched->scheduled_q));
+ if (f->flags & TF_FIBERF_WAKEUP_PENDING) {
+ f->flags = TF_FIBERF_RUNNING;
+ return 0;
}
- nf = tf_list_entry(tf_list_pop(&sched->scheduled_q),
- struct tf_fiber, queue_node);
- sched->active_fiber = nf->data;
- tf_uctx_transfer(&f->context, &nf->context);
-
- wakeup = f->wakeup_type;
- f->wakeup_type = TF_WAKEUP_NONE;
-
- return wakeup;
-}
-
-int __tf_fiber_bind_scheduler(struct tf_scheduler *sched)
-{
- struct tf_fiber *f;
- f = malloc(sizeof(struct tf_fiber));
- if (f == NULL)
- return -ENOMEM;
+ if (f->timeout.value != f->timeout.latched)
+ tf_timeout_adjust(&f->timeout);
- /* Mark currently active main fiber as active */
- *f = (struct tf_fiber) {
- .ref_count = 1,
- .scheduler = sched,
- .queue_node = TF_LIST_INITIALIZER(f->queue_node),
- };
- tf_uctx_create_self(&f->context);
- sched->main_fiber = f->data;
- sched->active_fiber = f->data;
- sched->num_fibers++;
+ f->flags = 0;
+ tf_uctx_transfer(f->context, f->return_context);
+ f->flags = TF_FIBERF_RUNNING;
- /* Schedule scheduler fiber */
- f = container_of((void *) sched, struct tf_fiber, data);
- f->scheduler = sched;
- f->wakeup_type = TF_WAKEUP_IMMEDIATE;
- tf_list_add_tail(&f->queue_node, &sched->running_q);
-
- return 0;
-}
-
-int __tf_fiber_release_scheduler(struct tf_scheduler *sched)
-{
- struct tf_fiber *f;
-
- /* Detach scheduler */
- f = container_of((void *) sched, struct tf_fiber, data);
- tf_list_del(&f->queue_node);
-
- /* Detach main stack from this scheduler */
- f = container_of((void *) sched->main_fiber, struct tf_fiber, data);
- tf_fiber_put(sched->main_fiber);
- sched->main_fiber = NULL;
- sched->num_fibers--;
+ if (f->timeout.value != 0 &&
+ tf_mtime_diff(f->timeout.value, tf_mtime_now()) <= 0)
+ return -ETIME;
return 0;
}
void tf_fiber_exit(void)
{
- struct tf_scheduler *sched = tf_scheduler_get_current();
- struct tf_fiber *f = tf_fiber_get_current();
- struct tf_fiber *schedf = container_of((void *) sched, struct tf_fiber, data);
+ struct tf_fiber *f = tf_vmach_get_current_fiber();
- tf_heap_delete(&f->heap_node, &sched->heap);
- schedf->wakeup_type = TF_WAKEUP_KILL;
- tf_uctx_transfer(&f->context, &schedf->context);
+ if (f->timeout.manager != NULL)
+ tf_timeout_delete(&f->timeout);
+ tf_vmach_get_current()->num_user_fibers--;
+ tf_uctx_transfer(f->context, f->return_context);
TF_BUG_ON(1);
}
void tf_fiber_kill(void *fiber)
{
}
-
-int tf_fiber_yield(void)
-{
- struct tf_scheduler *sched = tf_scheduler_get_current();
- struct tf_fiber *f = tf_fiber_get_current();
-
- TF_BUG_ON(tf_list_hashed(&f->queue_node));
- f->wakeup_type = TF_WAKEUP_IMMEDIATE;
- tf_list_add_tail(&f->queue_node, &sched->running_q);
-
- return __tf_fiber_schedule();
-}
-
-void tf_timeout_push(struct tf_timeout *timeout, tf_mtime_diff_t milliseconds)
-{
- struct tf_fiber *f = tf_fiber_get_current();
- tf_mtime_t abs = tf_scheduler_get_mtime() + milliseconds;
- int active;
-
- if (f->timeout_change)
- active = (f->timeout_change & TF_TIMEOUT_CHANGE_NEW_VALUE);
- else
- active = tf_heap_node_active(&f->heap_node);
-
- if (!active || tf_mtime_diff(abs, f->timeout) < 0) {
- /* Save previous timeout */
- timeout->saved_timeout = f->timeout;
- timeout->timeout_change = TF_TIMEOUT_CHANGE_NEEDED;
- if (active)
- timeout->timeout_change |= TF_TIMEOUT_CHANGE_NEW_VALUE;
-
- /* Make new timeout pending */
- f->timeout = abs;
- f->timeout_change = TF_TIMEOUT_CHANGE_NEEDED
- | TF_TIMEOUT_CHANGE_NEW_VALUE;
- } else {
- timeout->timeout_change = 0;
- }
-}
-
-int __tf_timeout_pop(struct tf_timeout *timeout, int err)
-{
- struct tf_fiber *f = tf_fiber_get_current();
-
- f->timeout = timeout->saved_timeout;
- f->timeout_change = timeout->timeout_change;
- if (err == TF_WAKEUP_TIMEOUT)
- err = TF_WAKEUP_THIS_TIMEOUT;
- return err;
-}
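One assumption worth spelling out for the timeout checks in tf_fiber_schedule() above: tf_mtime_diff(a, b) <= 0 only stays correct across clock wraparound if the difference is computed in wrapping unsigned arithmetic and then reinterpreted as signed. libtf's actual typedefs live elsewhere in the tree; this is a hedged sketch of that common idiom, not the library's definition.

/*
 * Wrap-safe comparison of monotonic millisecond ticks, as assumed by
 * the "tf_mtime_diff(f->timeout.value, tf_mtime_now()) <= 0" checks
 * in tf_fiber_schedule(). Hypothetical types; correct for deltas
 * under 2^31 ms.
 */
#include <stdio.h>

typedef unsigned int tf_mtime_t;
typedef int tf_mtime_diff_t;

static tf_mtime_diff_t tf_mtime_diff(tf_mtime_t a, tf_mtime_t b)
{
	/* Unsigned subtraction wraps; the signed reinterpretation
	 * then orders a and b correctly even across wraparound. */
	return (tf_mtime_diff_t)(a - b);
}

int main(void)
{
	tf_mtime_t before_wrap = 0xfffffff0u, after_wrap = 0x10u;

	/* after_wrap is "later" even though the counter wrapped. */
	printf("diff = %d\n", tf_mtime_diff(after_wrap, before_wrap)); /* 32 */
	return 0;
}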