summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--DESIGN77
-rw-r--r--include/libtf/atomic.h11
-rw-r--r--include/libtf/defines.h8
-rw-r--r--include/libtf/fiber.h141
-rw-r--r--include/libtf/io.h29
-rw-r--r--include/libtf/scheduler.h67
-rw-r--r--include/libtf/tf.h2
-rw-r--r--include/libtf/vmach.h49
-rw-r--r--src/TFbuild2
-rw-r--r--src/fiber.c259
-rw-r--r--src/ifc.c57
-rw-r--r--src/io-epoll.c122
-rw-r--r--src/io-unix.c87
-rw-r--r--src/scheduler.c134
-rw-r--r--src/timeout.c120
-rw-r--r--src/uctx.h41
-rw-r--r--src/vmach.c120
-rw-r--r--test/httpget.c9
-rw-r--r--test/read.c8
-rw-r--r--test/simple1.c8
-rw-r--r--test/sleep.c6
21 files changed, 719 insertions, 638 deletions
diff --git a/DESIGN b/DESIGN
index 2ffc535..8624446 100644
--- a/DESIGN
+++ b/DESIGN
@@ -1,19 +1,72 @@
SCHEDULER
-- 4-heap (or 2-heap) with linked nodes
-- epoll
-- edge-triggered monitoring for file i/o (no syscalls to modify fdset)
+- worker thread pool executes read to be run fibers
+- thread pool executes fibers from FIFO queues (one per each priority)
+- possible to make main thread separate scheduler so we can bind some
+ fibers to run in main thread only (not needed?)
+
+THE BIG SCHEDULING QUEUE (shared queue with all worker threads)
+ --- works also between machines, in which case we just always queue
+ work items to remote vmach (need to send wakeups too)
+ - steal first item from the local cache if any
+ - mutex lock
+ - if stuff to push
+ - while idle_vcpu not empty
+ - pop idle_vcpu from list
+ - push item to idle_vcpu
+ - push rest of items to global lists accordingly
+ - mutex unlock
+ - call futex_wake for all idle_cpu's we stuffed
+ - return stolen item
+ - if stuff on list
+ - pop item from list
+ - mutex unlock
+ - return popped_item
+ - insert self to idle_vcpu list
+ - mutex unlock
+ - futex_wait on local_pointer
+ - return local_pointer
+ - mutex unlock
+
+WAKEUP
+ - as minimum keep wakeup bitfield for most common types, and use
+ prepare sleep, and sleep primitives with acceptable wakeup reasons
+ - might add so that wakupper calls wakeupee to execute their wakeup
+ condition check once ask rescheduling based or that or not; this way
+ empty spin does not require pushing to scheduler queue and thread
+ pool wakeup
+
+IO SLEEPING/DISPATCHING
+- epoll in edge-triggered monitoring for file i/o
+- epoll_wait done in separate thread (or possibly as regular fiber)
+ which sole purpose is to wakeup threads for thread pool
- signalfd for signal handling
-- eventfd for thread pool wakeup
+- timerfd for timeout handling
+
+INTER FIBRE CALLS (IFC)
+- sends a callback to be execute under some other fibre
+- executed at tf_ifc_process() points, or at tf_exit() time
+- might need tfc_ifc_wait() for synchronizing with ifc completion
+- sending uses atomic LIFO (single atomic cmpxchg)
+- receive does LIFO flush (single atomic xchg) and reverses order(?)
+- need way to check if IFC is queued or not (so it's safe to reuse it)
+
+TIMEOUTS
+- one (or more) fibers serving as alarm generators
+- 4-heap with mremappable heap array
+- receives IFC calls from other threads
+- receives io wakeups from timerfd
+- sends async signal to fiber after timeout
FIBERS
-- timer node on fiber control data (for delayed heapify; and reuse on timeouts)
- fd, signal, pid wait struct on stack of wait function
- fiber_kill can interrupt wait
-SCHEDULER <-> THREAD POOL
-- scheduler queues to thread pool array fifo queue,
- - semaphore protects threads so no underqueueing happens
- - eventfd notifies scheduler when queue has free space again
- - head/tail updated atomically and item position recovered
-- thread pool atomically pushes to resume atomic stack and sets eventfd
- - scheduler thread atomically pops all and updates fiber states
+FIBRE WAKE UP QUEUES
+- mostly not needed (alarms, io uses async signal wakeups)
+- for mutex, semaphores, possibly send io
+- use thread mutexes for list access
+- wait on single queue only(?)
+
+FIBRE MUTEX
+- simple wakeup queue, sleep on queue, uninterruptible?
+
diff --git a/include/libtf/atomic.h b/include/libtf/atomic.h
index ec9f1e0..8ebe5f8 100644
--- a/include/libtf/atomic.h
+++ b/include/libtf/atomic.h
@@ -13,7 +13,14 @@
#ifndef TF_ATOMIC_H
#define TF_ATOMIC_H
-#define tf_atomic_inc(var) __sync_add_and_fetch(&(var), 1)
-#define tf_atomic_dec(var) __sync_add_and_fetch(&(var), -1)
+#define tf_atomic_inc(var) \
+ __sync_add_and_fetch(&(var), 1)
+#define tf_atomic_dec(var) \
+ __sync_add_and_fetch(&(var), -1)
+
+#define tf_atomic_cmpxchg(ptr, old, new) \
+ __sync_bool_compare_and_swap(ptr, old, new)
+#define tf_atomic_xchg(ptr, new) \
+ ((typeof(*(ptr)))__sync_lock_test_and_set(ptr, new))
#endif
diff --git a/include/libtf/defines.h b/include/libtf/defines.h
index 8e39c7e..44ead8a 100644
--- a/include/libtf/defines.h
+++ b/include/libtf/defines.h
@@ -57,6 +57,7 @@
#define attribute_warn_unused_result __attribute__((warn_unused_result))
#define attribute_deprecated __attribute__((deprecated))
+/* FIXME: glibc fprintf requires 8kB on stack */
#define TF_BUG_ON(cond) if (unlikely(cond)) { \
fprintf(stderr, "BUG: failure at %s:%d/%s(): %s!\n", \
__FILE__, __LINE__, __func__, #cond); \
@@ -68,14 +69,19 @@
#define TF_EMPTY_ARRAY 0
+#define TF_BIT(n) (1 << (n))
+
#ifndef TF_STACK_SIZE
-#define TF_STACK_SIZE 4096
+/* FIXME: glibc fprintf requires 8kB on stack */
+#define TF_STACK_SIZE (4*4096)
#endif
/* Monotonic time */
typedef uint32_t tf_mtime_t;
typedef int32_t tf_mtime_diff_t;
+tf_mtime_t tf_mtime_now(void);
+
static inline
tf_mtime_diff_t tf_mtime_diff(tf_mtime_t a, tf_mtime_t b)
{
diff --git a/include/libtf/fiber.h b/include/libtf/fiber.h
index d5c6153..f97e963 100644
--- a/include/libtf/fiber.h
+++ b/include/libtf/fiber.h
@@ -16,64 +16,127 @@
#include <errno.h>
#include <libtf/defines.h>
#include <libtf/heap.h>
+#include <libtf/list.h>
-/* Fiber wakeup reasons */
-#define TF_WAKEUP_NONE 0
-#define TF_WAKEUP_IMMEDIATE -EAGAIN
-#define TF_WAKEUP_KILL -EINTR
-#define TF_WAKEUP_TIMEOUT -ETIME
-#define TF_WAKEUP_THIS_TIMEOUT -ETIMEDOUT
-#define TF_WAKEUP_FD -EIO
+/* Inter-fibre calls */
+struct tf_fiber;
+struct tf_ifc;
+typedef void (*tf_ifc_handler_t)(void *fiber, struct tf_ifc *msg);
-/* Fiber management */
-struct tf_scheduler;
-typedef void (*tf_fiber_proc)(void *fiber);
+struct tf_ifc {
+ union {
+ struct tf_ifc *next;
+ struct tf_list_node list;
+ };
+ tf_ifc_handler_t handler;
+ struct tf_fiber * sender;
+};
-void *__tf_fiber_create(tf_fiber_proc fiber_main, int private_size);
-void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size);
-void *tf_fiber_get(void *data);
-void tf_fiber_put(void *data);
-void __tf_fiber_wakeup(void *data, int wakeup_type);
-void __tf_fiber_wakeup_heapnode(struct tf_heap_node *node);
-int __tf_fiber_schedule(void);
-int __tf_fiber_bind_scheduler(struct tf_scheduler *sched);
-int __tf_fiber_release_scheduler(struct tf_scheduler *sched);
+void tf_ifc_queue(void *fiber, struct tf_ifc *msg, tf_ifc_handler_t handler);
+void tf_ifc_complete(void *fiber, struct tf_ifc *msg, tf_ifc_handler_t handler);
+void tf_ifc_process(void);
+void tf_ifc_process_unordered(void);
-void tf_fiber_exit(void) attribute_noreturn;
-void tf_fiber_kill(void *fiber);
-int tf_fiber_yield(void);
+/* Timeouts */
+struct tf_timeout_manager {
+ struct tf_heap_head heap;
+ unsigned int num_fibers;
+};
+
+struct tf_timeout_client {
+ struct tf_timeout_manager * manager;
+ tf_mtime_t value;
+ tf_mtime_t latched;
+ struct tf_heap_node heap_node;
+ struct tf_ifc ifc;
+};
-/* Scheduling and fiber management */
struct tf_timeout {
- tf_mtime_t saved_timeout;
- unsigned int timeout_change;
+ tf_mtime_t saved_timeout;
+ tf_mtime_t my_timeout;
};
-#define tf_timed(func, timeout) \
+static inline int tf_timeout_expired(struct tf_timeout *to)
+{
+ return tf_mtime_diff(to->my_timeout, tf_mtime_now()) <= 0;
+}
+
+struct tf_timeout_fiber *tf_timeout_fiber_create(void);
+void tf_timeout_adjust(struct tf_timeout_client *tc);
+void tf_timeout_delete(struct tf_timeout_client *tc);
+
+#define tf_timed(__to, func, timeout) \
({ \
- struct tf_timeout __timeout; \
- tf_timeout_push(&__timeout, timeout); \
- tf_timeout_pop(&__timeout, (func)); \
+ tf_timeout_push(__to, timeout); \
+ tf_timeout_pop(__to, (func)); \
})
-void tf_timeout_push(struct tf_timeout *timeout, tf_mtime_diff_t milliseconds);
-int __tf_timeout_pop(struct tf_timeout *timeout, int err);
+/* Fibres and their management */
+typedef void *tf_uctx_t;
+
+struct tf_fiber {
+ unsigned int ref_count;
+ unsigned int flags;
+ tf_uctx_t context;
+ tf_uctx_t return_context;
+ struct tf_list_node queue_node;
+ struct tf_list_head wakeup_q;
+ struct tf_timeout_client timeout;
+ struct tf_ifc * pending_ifc;
+ char data[TF_EMPTY_ARRAY];
+};
+
+typedef void (*tf_fiber_proc)(void *fiber);
+
+void *__tf_fiber_create(tf_fiber_proc fiber_main, int private_size);
+void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size);
+void *tf_fiber_get(void *fiber);
+void tf_fiber_put(void *fiber);
+int tf_fiber_run(void *fiber);
+void tf_fiber_kill(void *fiber);
+
+void tf_fiber_wakeup(struct tf_fiber *fiber);
+int tf_fiber_schedule(void);
-static inline int tf_timeout_pop(struct tf_timeout *timeout, int err)
+void tf_fiber_exit(void) attribute_noreturn;
+
+static inline struct tf_fiber *tf_vmach_get_current_fiber(void)
{
- if (unlikely(timeout->timeout_change))
- return __tf_timeout_pop(timeout, err);
+ extern __thread struct tf_fiber *tf_current_fiber;
+ return tf_current_fiber;
+}
+
+static inline void tf_timeout_push(struct tf_timeout *to, tf_mtime_diff_t ms)
+{
+ struct tf_fiber *f = tf_vmach_get_current_fiber();
+
+ to->saved_timeout = f->timeout.value;
+ to->my_timeout = tf_mtime_now() + ms;
+ if (f->timeout.value == 0 ||
+ tf_mtime_diff(to->my_timeout, f->timeout.value) < 0)
+ f->timeout.value = to->my_timeout;
+}
+
+static inline int tf_timeout_pop(struct tf_timeout *to, int err)
+{
+ struct tf_fiber *f = tf_vmach_get_current_fiber();
+
+ f->timeout.value = to->saved_timeout;
return err;
}
static inline
-int tf_msleep(tf_mtime_diff_t milliseconds)
+int tf_msleep(tf_mtime_diff_t ms)
{
+ struct tf_timeout to;
int r;
- r = tf_timed(__tf_fiber_schedule(), milliseconds);
- if (r == TF_WAKEUP_THIS_TIMEOUT)
- r = 0;
- return r;
+
+ tf_timeout_push(&to, ms);
+ do {
+ r = tf_fiber_schedule();
+ } while (r != -ETIME);
+
+ return tf_timeout_pop(&to, r);
}
#endif
diff --git a/include/libtf/io.h b/include/libtf/io.h
index 0d34421..d1098e3 100644
--- a/include/libtf/io.h
+++ b/include/libtf/io.h
@@ -21,10 +21,12 @@
#include <libtf/defines.h>
/* Flags for tf_open_fd() */
-#define TF_FD_AUTOCLOSE 1
-#define TF_FD_STREAM_ORIENTED 2
-#define TF_FD_SET_CLOEXEC 4
-#define TF_FD_ALREADY_NONBLOCKING 8
+#define TF_FD_READ TF_BIT(0)
+#define TF_FD_WRITE TF_BIT(1)
+#define TF_FD_AUTOCLOSE TF_BIT(2)
+#define TF_FD_STREAM_ORIENTED TF_BIT(3)
+#define TF_FD_SET_CLOEXEC TF_BIT(4)
+#define TF_FD_ALREADY_NONBLOCKING TF_BIT(5)
struct tf_sockaddr {
union {
@@ -37,23 +39,14 @@ struct tf_sockaddr {
struct tf_fd {
int fd;
unsigned int flags;
- /* Single waiter -- would be relatively trivial to modify to allow
- * multiple waiters, if someone actually needs it */
- unsigned int events;
- void *waiting_fiber;
+ struct tf_fiber *fiber;
};
-#define TF_POLL_READ 1
-#define TF_POLL_WRITE 2
-
struct tf_poll_hooks {
- void (*init)(void);
- int (*poll)(tf_mtime_diff_t timeout);
- void (*close)(void);
- int (*fd_created)(struct tf_fd *fd);
- int (*fd_destroyed)(struct tf_fd *fd);
- void (*fd_monitor)(struct tf_fd *fd, int events);
- void (*fd_unmonitor)(struct tf_fd *fd);
+ void * (*create)(void);
+ int (*fd_created)(void *fiber, struct tf_fd *fd);
+ int (*fd_destroyed)(void *fiber, struct tf_fd *fd);
+ void (*fd_rearm)(void *fiber, struct tf_fd *fd);
};
int tf_open_fd(struct tf_fd *fd, int kfd, int flags);
diff --git a/include/libtf/scheduler.h b/include/libtf/scheduler.h
deleted file mode 100644
index db5a823..0000000
--- a/include/libtf/scheduler.h
+++ /dev/null
@@ -1,67 +0,0 @@
-/* scheduler.h - libtf fiber scheduler header
- *
- * Copyright (C) 2009-2010 Timo Teräs <timo.teras@iki.fi>
- * All rights reserved.
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 or later as
- * published by the Free Software Foundation.
- *
- * See http://www.gnu.org/ for details.
- */
-
-#ifndef TF_SCHEDULER_H
-#define TF_SCHEDULER_H
-
-#include <libtf/atomic.h>
-#include <libtf/list.h>
-#include <libtf/heap.h>
-#include <libtf/fiber.h>
-
-struct tf_poll_hooks;
-
-struct tf_scheduler {
- struct tf_list_head scheduled_q;
- struct tf_list_head running_q;
- struct tf_heap_head heap;
- void * active_fiber;
- void * main_fiber;
- int num_fibers;
- tf_mtime_t scheduler_time;
- struct tf_poll_hooks * poller;
- unsigned long poll_data[2];
-};
-
-static inline
-struct tf_scheduler *tf_scheduler_get_current(void)
-{
- extern struct tf_scheduler *__tf_scheduler;
- TF_BUG_ON(__tf_scheduler == NULL);
- return __tf_scheduler;
-}
-
-static inline
-tf_mtime_t tf_scheduler_get_mtime(void)
-{
- return tf_scheduler_get_current()->scheduler_time;
-}
-
-struct tf_scheduler *tf_scheduler_create(void);
-int tf_scheduler_enable(struct tf_scheduler *);
-void tf_scheduler_disable(void);
-
-static inline struct tf_scheduler *
-tf_scheduler_get(struct tf_scheduler *s)
-{
- tf_fiber_get(s);
- return s;
-}
-
-static inline void
-tf_scheduler_put(struct tf_scheduler *s)
-{
- tf_fiber_put(s);
-}
-
-#endif
-
diff --git a/include/libtf/tf.h b/include/libtf/tf.h
index e613f18..d052ee7 100644
--- a/include/libtf/tf.h
+++ b/include/libtf/tf.h
@@ -14,7 +14,7 @@
#define TF_H
#include <libtf/fiber.h>
-#include <libtf/scheduler.h>
+#include <libtf/vmach.h>
#include <libtf/io.h>
#endif
diff --git a/include/libtf/vmach.h b/include/libtf/vmach.h
new file mode 100644
index 0000000..d302366
--- /dev/null
+++ b/include/libtf/vmach.h
@@ -0,0 +1,49 @@
+/* vmach.h - "virtual" machine and cpu contexts
+ *
+ * Copyright (C) 2009 Timo Teräs <timo.teras@iki.fi>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 or later as
+ * published by the Free Software Foundation.
+ *
+ * See http://www.gnu.org/ for details.
+ */
+
+#ifndef TF_VMACH_H
+#define TF_VMACH_H
+
+#include <libtf/fiber.h>
+
+struct tf_vmach {
+ struct tf_poll_hooks * poll_ops;
+ void * poll_fiber;
+ void * timeout_fiber;
+ struct tf_fiber startup_fiber;
+ int num_user_fibers;
+ struct tf_list_head run_q;
+};
+
+struct tf_vcpu {
+ struct tf_vmach * machine;
+};
+
+static inline struct tf_vmach *tf_vmach_get_current(void)
+{
+ extern __thread struct tf_vmach *tf_current_vmach;
+ return tf_current_vmach;
+}
+
+static inline struct tf_vcpu *tf_vmach_get_current_cpu(void)
+{
+ extern __thread struct tf_vcpu *tf_current_vcpu;
+ return tf_current_vcpu;
+}
+
+void tf_vmach_start(void);
+void tf_vmach_stop(void);
+
+void tf_vmach_run(struct tf_vmach *vm, struct tf_fiber *f);
+void tf_vmach_run_dedicated(struct tf_vmach *vm, struct tf_fiber *f);
+
+#endif
diff --git a/src/TFbuild b/src/TFbuild
index 9277f9a..3ffa9ef 100644
--- a/src/TFbuild
+++ b/src/TFbuild
@@ -1,6 +1,6 @@
libs-y += libtf
-libtf-objs-y += fiber.o scheduler.o heap.o
+libtf-objs-y += fiber.o heap.o timeout.o vmach.o ifc.o
libtf-objs-$(OS_LINUX) += io-epoll.o mem-mmap.o
libtf-objs-$(OS_UNIX) += io-unix.o
diff --git a/src/fiber.c b/src/fiber.c
index e507815..bef7b81 100644
--- a/src/fiber.c
+++ b/src/fiber.c
@@ -13,31 +13,13 @@
#include <time.h>
#include <errno.h>
#include <unistd.h>
+#include <libtf/atomic.h>
#include <libtf/fiber.h>
-#include <libtf/scheduler.h>
+#include <libtf/vmach.h>
#include "uctx.h"
-#define TF_TIMEOUT_CHANGE_NEEDED 1
-#define TF_TIMEOUT_CHANGE_NEW_VALUE 2
-
-struct tf_fiber {
- unsigned int ref_count;
- struct tf_scheduler * scheduler;
- int wakeup_type;
- unsigned int timeout_change;
- tf_mtime_t timeout;
- struct tf_list_node queue_node;
- struct tf_heap_node heap_node;
- struct tf_uctx context;
- char data[TF_EMPTY_ARRAY];
-};
-
-static inline
-struct tf_fiber *tf_fiber_get_current(void)
-{
- void *data = tf_scheduler_get_current()->active_fiber;
- return container_of(data, struct tf_fiber, data);
-}
+#define TF_FIBERF_RUNNING TF_BIT(0)
+#define TF_FIBERF_WAKEUP_PENDING TF_BIT(1)
static void tf_fiber_main(void *user_data, void *arg)
{
@@ -48,45 +30,46 @@ static void tf_fiber_main(void *user_data, void *arg)
tf_fiber_exit();
}
-void *__tf_fiber_create(tf_fiber_proc fiber_main, int private_size)
+void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size)
{
- struct tf_fiber *fiber;
+ struct tf_fiber *self = tf_vmach_get_current_fiber();
+ struct tf_fiber *f;
- fiber = tf_uctx_create_embedded(
+ f = tf_uctx_create_embedded(
TF_STACK_SIZE,
sizeof(struct tf_fiber) + private_size,
offsetof(struct tf_fiber, context),
tf_fiber_main, fiber_main);
- if (fiber == NULL)
+ if (f == NULL)
return NULL;
- *fiber = (struct tf_fiber) {
+ *f = (struct tf_fiber) {
.ref_count = 1,
- .queue_node = TF_LIST_INITIALIZER(fiber->queue_node),
- .context = fiber->context,
+ .queue_node = TF_LIST_INITIALIZER(f->queue_node),
+ .context = f->context,
+ .timeout.manager = self ? self->timeout.manager : NULL,
+ .wakeup_q = TF_LIST_HEAD_INITIALIZER(f->wakeup_q),
};
- return fiber->data;
+ return f->data;
}
-void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size)
+int tf_fiber_run(void *fiber)
{
- struct tf_fiber *fiber;
- struct tf_scheduler *sched;
-
- sched = tf_scheduler_get_current();
- if (tf_heap_prealloc(&sched->heap, sched->num_fibers + 1) < 0)
- return NULL;
+ struct tf_timeout_manager *tm;
+ struct tf_fiber *f = container_of(fiber, struct tf_fiber, data);
- fiber = container_of(__tf_fiber_create(fiber_main, private_size),
- struct tf_fiber, data);
- sched->num_fibers++;
+ tm = f->timeout.manager;
+ if (tm != NULL) {
+ if (tf_heap_prealloc(&tm->heap, tm->num_fibers + 1) < 0)
+ return -ENOMEM;
+ tm->num_fibers++;
+ }
- fiber->scheduler = sched;
- fiber->wakeup_type = TF_WAKEUP_NONE;
- tf_list_add_tail(&fiber->queue_node, &sched->scheduled_q);
+ tf_fiber_wakeup(f);
+ tf_vmach_get_current()->num_user_fibers++;
- return tf_fiber_get(fiber->data);
+ return 0;
}
void *tf_fiber_get(void *data)
@@ -98,23 +81,8 @@ void *tf_fiber_get(void *data)
static void __tf_fiber_destroy(struct tf_fiber *fiber)
{
- struct tf_scheduler *sched = fiber->scheduler;
- int main_fiber, num_fibers;
-
- /* decrease first the number of fibers as we might be
- * killing the scheduler it self */
- num_fibers = --sched->num_fibers;
-
- main_fiber = (fiber->context.alloc == NULL);
- tf_heap_delete(&fiber->heap_node, &sched->heap);
- tf_uctx_destroy(&fiber->context);
- if (main_fiber)
- free(fiber);
-
- if (num_fibers == 1) {
- /* FIXME: Use proper fiber event*/
- __tf_fiber_wakeup(sched->main_fiber, TF_WAKEUP_IMMEDIATE);
- }
+ tf_heap_delete(&fiber->timeout.heap_node, &fiber->timeout.manager->heap);
+ tf_uctx_destroy(fiber->context);
}
void tf_fiber_put(void *data)
@@ -124,165 +92,60 @@ void tf_fiber_put(void *data)
__tf_fiber_destroy(fiber);
}
-void __tf_fiber_wakeup(void *data, int wakeup_type)
+void tf_fiber_wakeup(struct tf_fiber *f)
{
- struct tf_fiber *fiber = container_of(data, struct tf_fiber, data);
- struct tf_scheduler *sched = fiber->scheduler;
+ struct tf_fiber *self = tf_vmach_get_current_fiber();
+ unsigned int newval, oldval;
- if (fiber->wakeup_type == TF_WAKEUP_NONE) {
- fiber->wakeup_type = wakeup_type;
- tf_list_add_tail(&fiber->queue_node, &sched->running_q);
- }
-}
+ do {
+ oldval = f->flags;
+ if (oldval & TF_FIBERF_WAKEUP_PENDING)
+ return;
+ newval = oldval | TF_FIBERF_WAKEUP_PENDING | TF_FIBERF_RUNNING;
+ } while (!tf_atomic_cmpxchg(&f->flags, oldval, newval));
-void __tf_fiber_wakeup_heapnode(struct tf_heap_node *node)
-{
- __tf_fiber_wakeup(container_of(node, struct tf_fiber, heap_node)->data,
- TF_WAKEUP_TIMEOUT);
+ if (!(oldval & TF_FIBERF_RUNNING))
+ tf_list_add_tail(&f->queue_node, &self->wakeup_q);
}
-int __tf_fiber_schedule(void)
+int tf_fiber_schedule(void)
{
- struct tf_scheduler *sched = tf_scheduler_get_current();
- struct tf_fiber *f = tf_fiber_get_current(), *nf;
- int wakeup;
+ struct tf_fiber *f = tf_vmach_get_current_fiber();
- if (unlikely(f->timeout_change)) {
- if (f->timeout_change & TF_TIMEOUT_CHANGE_NEW_VALUE) {
- if (tf_mtime_diff(f->timeout, tf_scheduler_get_mtime()) <= 0) {
- f->timeout_change = TF_TIMEOUT_CHANGE_NEEDED;
- return TF_WAKEUP_TIMEOUT;
- }
- tf_heap_change(&f->heap_node, &sched->heap, f->timeout);
- } else
- tf_heap_delete(&f->heap_node, &sched->heap);
- f->timeout_change = 0;
- }
+ if (f->timeout.value != 0 &&
+ tf_mtime_diff(f->timeout.value, tf_mtime_now()) <= 0)
+ return -ETIME;
- /* Figure out the next fibre to run */
- if (unlikely(tf_list_empty(&sched->scheduled_q))) {
- tf_list_splice_tail(&sched->running_q,
- &sched->scheduled_q);
- TF_BUG_ON(tf_list_empty(&sched->scheduled_q));
+ if (f->flags & TF_FIBERF_WAKEUP_PENDING) {
+ f->flags = TF_FIBERF_RUNNING;
+ return 0;
}
- nf = tf_list_entry(tf_list_pop(&sched->scheduled_q),
- struct tf_fiber, queue_node);
- sched->active_fiber = nf->data;
- tf_uctx_transfer(&f->context, &nf->context);
-
- wakeup = f->wakeup_type;
- f->wakeup_type = TF_WAKEUP_NONE;
-
- return wakeup;
-}
-
-int __tf_fiber_bind_scheduler(struct tf_scheduler *sched)
-{
- struct tf_fiber *f;
- f = malloc(sizeof(struct tf_fiber));
- if (f == NULL)
- return -ENOMEM;
+ if (f->timeout.value != f->timeout.latched)
+ tf_timeout_adjust(&f->timeout);
- /* Mark currently active main fiber as active */
- *f = (struct tf_fiber) {
- .ref_count = 1,
- .scheduler = sched,
- .queue_node = TF_LIST_INITIALIZER(f->queue_node),
- };
- tf_uctx_create_self(&f->context);
- sched->main_fiber = f->data;
- sched->active_fiber = f->data;
- sched->num_fibers++;
+ f->flags = 0;
+ tf_uctx_transfer(f->context, f->return_context);
+ f->flags = TF_FIBERF_RUNNING;
- /* Schedule scheduler fiber */
- f = container_of((void *) sched, struct tf_fiber, data);
- f->scheduler = sched;
- f->wakeup_type = TF_WAKEUP_IMMEDIATE;
- tf_list_add_tail(&f->queue_node, &sched->running_q);
-
- return 0;
-}
-
-int __tf_fiber_release_scheduler(struct tf_scheduler *sched)
-{
- struct tf_fiber *f;
-
- /* Detach scheduler */
- f = container_of((void *) sched, struct tf_fiber, data);
- tf_list_del(&f->queue_node);
-
- /* Detach main stack from this scheduler */
- f = container_of((void *) sched->main_fiber, struct tf_fiber, data);
- tf_fiber_put(sched->main_fiber);
- sched->main_fiber = NULL;
- sched->num_fibers--;
+ if (f->timeout.value != 0 &&
+ tf_mtime_diff(f->timeout.value, tf_mtime_now()) <= 0)
+ return -ETIME;
return 0;
}
void tf_fiber_exit(void)
{
- struct tf_scheduler *sched = tf_scheduler_get_current();
- struct tf_fiber *f = tf_fiber_get_current();
- struct tf_fiber *schedf = container_of((void *) sched, struct tf_fiber, data);
+ struct tf_fiber *f = tf_vmach_get_current_fiber();
- tf_heap_delete(&f->heap_node, &sched->heap);
- schedf->wakeup_type = TF_WAKEUP_KILL;
- tf_uctx_transfer(&f->context, &schedf->context);
+ if (f->timeout.manager != NULL)
+ tf_timeout_delete(&f->timeout);
+ tf_vmach_get_current()->num_user_fibers--;
+ tf_uctx_transfer(f->context, f->return_context);
TF_BUG_ON(1);
}
void tf_fiber_kill(void *fiber)
{
}
-
-int tf_fiber_yield(void)
-{
- struct tf_scheduler *sched = tf_scheduler_get_current();
- struct tf_fiber *f = tf_fiber_get_current();
-
- TF_BUG_ON(tf_list_hashed(&f->queue_node));
- f->wakeup_type = TF_WAKEUP_IMMEDIATE;
- tf_list_add_tail(&f->queue_node, &sched->running_q);
-
- return __tf_fiber_schedule();
-}
-
-void tf_timeout_push(struct tf_timeout *timeout, tf_mtime_diff_t milliseconds)
-{
- struct tf_fiber *f = tf_fiber_get_current();
- tf_mtime_t abs = tf_scheduler_get_mtime() + milliseconds;
- int active;
-
- if (f->timeout_change)
- active = (f->timeout_change & TF_TIMEOUT_CHANGE_NEW_VALUE);
- else
- active = tf_heap_node_active(&f->heap_node);
-
- if (!active || tf_mtime_diff(abs, f->timeout) < 0) {
- /* Save previous timeout */
- timeout->saved_timeout = f->timeout;
- timeout->timeout_change = TF_TIMEOUT_CHANGE_NEEDED;
- if (active)
- timeout->timeout_change |= TF_TIMEOUT_CHANGE_NEW_VALUE;
-
- /* Make new timeout pending */
- f->timeout = abs;
- f->timeout_change = TF_TIMEOUT_CHANGE_NEEDED
- | TF_TIMEOUT_CHANGE_NEW_VALUE;
- } else {
- timeout->timeout_change = 0;
- }
-}
-
-int __tf_timeout_pop(struct tf_timeout *timeout, int err)
-{
- struct tf_fiber *f = tf_fiber_get_current();
-
- f->timeout = timeout->saved_timeout;
- f->timeout_change = timeout->timeout_change;
- if (err == TF_WAKEUP_TIMEOUT)
- err = TF_WAKEUP_THIS_TIMEOUT;
- return err;
-}
diff --git a/src/ifc.c b/src/ifc.c
new file mode 100644
index 0000000..2dfafe8
--- /dev/null
+++ b/src/ifc.c
@@ -0,0 +1,57 @@
+/* ifc.c - inter fiber communications
+ *
+ * Copyright (C) 2010 Timo Teräs <timo.teras@iki.fi>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 or later as
+ * published by the Free Software Foundation.
+ *
+ * See http://www.gnu.org/ for details.
+ */
+
+#include <libtf/atomic.h>
+#include <libtf/fiber.h>
+
+void tf_ifc_queue(void *fiber, struct tf_ifc *ifc, tf_ifc_handler_t handler)
+{
+ struct tf_fiber *f = container_of(fiber, struct tf_fiber, data);
+ struct tf_ifc *old;
+
+ TF_BUG_ON(ifc->next != NULL);
+ ifc->handler = handler;
+ do {
+ old = f->pending_ifc;
+ ifc->next = old;
+ } while (!tf_atomic_cmpxchg(&f->pending_ifc, old, ifc));
+
+ tf_fiber_wakeup(f);
+}
+
+void tf_ifc_complete(void *fiber, struct tf_ifc *ifc, tf_ifc_handler_t handler)
+{
+ ifc->sender = tf_vmach_get_current_fiber();
+ tf_ifc_queue(fiber, ifc, handler);
+ while (ifc->sender != NULL)
+ tf_fiber_schedule();
+}
+
+void tf_ifc_process_unordered(void)
+{
+ struct tf_fiber *f = tf_vmach_get_current_fiber(), *s;
+ struct tf_ifc *pending, *ifc;
+
+ while (f->pending_ifc != NULL) {
+ pending = tf_atomic_xchg(&f->pending_ifc, NULL);
+ while (pending) {
+ ifc = pending;
+ pending = ifc->next;
+ ifc->handler(f->data, ifc);
+ s = ifc->sender;
+ ifc->next = NULL;
+ ifc->sender = NULL;
+ if (s != NULL)
+ tf_fiber_wakeup(s);
+ }
+ }
+}
diff --git a/src/io-epoll.c b/src/io-epoll.c
index 32aa090..1fc9ca1 100644
--- a/src/io-epoll.c
+++ b/src/io-epoll.c
@@ -16,74 +16,70 @@
#include <sys/epoll.h>
#include <libtf/io.h>
-#include <libtf/scheduler.h>
+#include <libtf/fiber.h>
-struct tf_poll_data {
+struct tf_epoll_data {
int epoll_fd;
- int num_waiters;
};
-static struct tf_poll_data *tf_epoll_get_data(void)
+static void tf_epoll_main(void *ctx)
{
- struct tf_scheduler *sched = tf_scheduler_get_current();
- TF_BUILD_BUG_ON(sizeof(struct tf_poll_data) > sizeof(sched->poll_data));
- return (struct tf_poll_data *) &sched->poll_data;
-}
-
-static void tf_epoll_init(void)
-{
- struct tf_poll_data *pd = tf_epoll_get_data();
+ struct tf_epoll_data *pd = ctx;
+ struct epoll_event events[64];
+ struct tf_fd *fd;
+ int r, i;
- pd->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
- pd->num_waiters = 0;
- TF_BUG_ON(pd->epoll_fd < 0);
-}
+ do {
+ r = epoll_wait(pd->epoll_fd, events, array_size(events), 0);
+ if (r == 0) {
+ /* FIXME: yielding is bad */
+ struct tf_fiber *self = tf_vmach_get_current_fiber();
+ tf_list_add_tail(&self->queue_node, &self->wakeup_q);
+ if (tf_fiber_schedule() == 0)
+ continue;
+ }
-static void tf_epoll_close(void)
-{
- struct tf_poll_data *pd = tf_epoll_get_data();
+ for (i = 0; i < r; i++) {
+ fd = (struct tf_fd *) events[i].data.ptr;
+ tf_fiber_wakeup(fd->fiber);
+ }
+ } while (1);
close(pd->epoll_fd);
}
-static int tf_epoll_poll(tf_mtime_diff_t timeout)
+
+static void *tf_epoll_create(void)
{
- struct tf_poll_data *pd = tf_epoll_get_data();
- struct epoll_event events[64];
- struct tf_fd *fd;
- int r, i, ret;
+ struct tf_epoll_data *d;
- if (timeout == 0 && pd->num_waiters == 0)
- return TF_WAKEUP_TIMEOUT;
+ d = tf_fiber_create(tf_epoll_main, sizeof(struct tf_epoll_data));
+ if (d == NULL)
+ return NULL;
- ret = TF_WAKEUP_TIMEOUT;
- do {
- r = epoll_wait(pd->epoll_fd, events, array_size(events), timeout);
- if (r == 0)
- break;
+ d->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
+ TF_BUG_ON(d->epoll_fd < 0);
- for (i = 0; i < r; i++) {
- fd = (struct tf_fd *) events[i].data.ptr;
- if (likely(fd->events & events[i].events))
- __tf_fiber_wakeup(fd->waiting_fiber, TF_WAKEUP_FD);
- }
- ret = TF_WAKEUP_FD;
- timeout = 0;
- } while (unlikely(r == array_size(events)));
+ tf_fiber_run(tf_fiber_get(d));
- return ret;
+ return d;
}
-static int tf_epoll_fd_created(struct tf_fd *fd)
+static int tf_epoll_fd_created(void *fiber, struct tf_fd *fd)
{
- struct tf_poll_data *pd = tf_epoll_get_data();
+ struct tf_epoll_data *d = fiber;
struct epoll_event ev;
int r;
ev = (struct epoll_event) {
- .events = EPOLLIN | EPOLLOUT | EPOLLET,
+ .events = EPOLLET,
.data.ptr = fd,
};
- r = epoll_ctl(pd->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
+ if (fd->flags & TF_FD_READ)
+ ev.events |= EPOLLIN;
+ if (fd->flags & TF_FD_WRITE)
+ ev.events |= EPOLLOUT;
+
+ r = epoll_ctl(d->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
if (unlikely(r < 0)) {
TF_BUG_ON(errno == EEXIST);
r = -errno;
@@ -93,46 +89,18 @@ static int tf_epoll_fd_created(struct tf_fd *fd)
return 0;
}
-static int tf_epoll_fd_destroyed(struct tf_fd *fd)
+static int tf_epoll_fd_destroyed(void *fiber, struct tf_fd *fd)
{
- struct tf_poll_data *pd = tf_epoll_get_data();
+ struct tf_epoll_data *d = fiber;
- if (fd->flags & TF_FD_AUTOCLOSE)
- return 0;
+ if (!(fd->flags & TF_FD_AUTOCLOSE))
+ epoll_ctl(d->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
- epoll_ctl(pd->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
return 0;
}
-static void tf_epoll_fd_monitor(struct tf_fd *fd, int events)
-{
- struct tf_poll_data *pd = tf_epoll_get_data();
-
- TF_BUG_ON(fd->waiting_fiber != NULL);
- fd->events = EPOLLERR | EPOLLHUP;
- if (events & TF_POLL_READ)
- fd->events |= EPOLLIN;
- if (events & TF_POLL_WRITE)
- fd->events |= EPOLLOUT;
- fd->waiting_fiber = tf_scheduler_get_current()->active_fiber;
- pd->num_waiters++;
-}
-
-static void tf_epoll_fd_unmonitor(struct tf_fd *fd)
-{
- struct tf_poll_data *pd = tf_epoll_get_data();
-
- fd->waiting_fiber = NULL;
- fd->events = 0;
- pd->num_waiters--;
-}
-
struct tf_poll_hooks tf_epoll_hooks = {
- .init = tf_epoll_init,
- .close = tf_epoll_close,
- .poll = tf_epoll_poll,
+ .create = tf_epoll_create,
.fd_created = tf_epoll_fd_created,
.fd_destroyed = tf_epoll_fd_destroyed,
- .fd_monitor = tf_epoll_fd_monitor,
- .fd_unmonitor = tf_epoll_fd_unmonitor,
};
diff --git a/src/io-unix.c b/src/io-unix.c
index d80592a..7cd4fd0 100644
--- a/src/io-unix.c
+++ b/src/io-unix.c
@@ -17,10 +17,8 @@
#include <string.h>
#include <libtf/io.h>
-#include <libtf/scheduler.h>
-
-#define TF_POLL_HOOKS \
-struct tf_poll_hooks *poller = tf_scheduler_get_current()->poller;
+#include <libtf/fiber.h>
+#include <libtf/vmach.h>
static inline int tf_sockaddr_len(const struct tf_sockaddr *addr)
{
@@ -38,14 +36,14 @@ static inline int tf_sockaddr_len(const struct tf_sockaddr *addr)
int tf_open_fd(struct tf_fd *fd, int kfd, int flags)
{
- TF_POLL_HOOKS;
+ struct tf_vmach *vm = tf_vmach_get_current();
int r;
fd->fd = kfd;
fd->flags = flags;
- fd->waiting_fiber = NULL;
+ fd->fiber = tf_vmach_get_current_fiber();
- r = poller->fd_created(fd);
+ r = vm->poll_ops->fd_created(vm->poll_fiber, fd);
if (r < 0) {
if (flags & TF_FD_AUTOCLOSE)
close(kfd);
@@ -88,10 +86,10 @@ int tf_open(struct tf_fd *fd, const char *pathname, int flags)
int tf_close(struct tf_fd *fd)
{
- TF_POLL_HOOKS;
+ struct tf_vmach *vm = tf_vmach_get_current();
int r;
- poller->fd_destroyed(fd);
+ vm->poll_ops->fd_destroyed(vm->poll_fiber, fd);
if (fd->flags & TF_FD_AUTOCLOSE) {
r = close(fd->fd);
if (unlikely(r == -1))
@@ -104,11 +102,9 @@ int tf_close(struct tf_fd *fd)
int tf_read_fully(struct tf_fd *fd, void *buf, size_t count)
{
- TF_POLL_HOOKS;
ssize_t n;
- int r;
+ int r = 0;
- poller->fd_monitor(fd, TF_POLL_READ);
do {
n = read(fd->fd, buf, count);
if (n == count) {
@@ -132,20 +128,17 @@ int tf_read_fully(struct tf_fd *fd, void *buf, size_t count)
continue;
}
- r = __tf_fiber_schedule();
- } while (r == TF_WAKEUP_FD);
- poller->fd_unmonitor(fd);
+ r = tf_fiber_schedule();
+ } while (r != 0);
return -r;
}
int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count)
{
- TF_POLL_HOOKS;
ssize_t n;
- int r;
+ int r = 0;
- poller->fd_monitor(fd, TF_POLL_WRITE);
do {
n = write(fd->fd, buf, count);
if (n == count) {
@@ -166,19 +159,16 @@ int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count)
continue;
}
- r = __tf_fiber_schedule();
- } while (r == TF_WAKEUP_FD);
- poller->fd_unmonitor(fd);
+ r = tf_fiber_schedule();
+ } while (r != 0);
return -r;
}
ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count)
{
- TF_POLL_HOOKS;
ssize_t n;
- poller->fd_monitor(fd, TF_POLL_READ);
do {
n = read(fd->fd, buf, count);
if (n >= 0)
@@ -189,19 +179,16 @@ ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count)
n = -errno;
break;
}
- n = __tf_fiber_schedule();
- } while (n == TF_WAKEUP_FD);
- poller->fd_unmonitor(fd);
+ n = tf_fiber_schedule();
+ } while (n != 0);
return n;
}
ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count)
{
- TF_POLL_HOOKS;
ssize_t n;
- poller->fd_monitor(fd, TF_POLL_WRITE);
do {
n = write(fd->fd, buf, count);
if (n >= 0)
@@ -212,9 +199,8 @@ ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count)
n = -errno;
break;
}
- n = __tf_fiber_schedule();
- } while (n == TF_WAKEUP_FD);
- poller->fd_unmonitor(fd);
+ n = tf_fiber_schedule();
+ } while (n != 0);
return n;
}
@@ -261,7 +247,6 @@ int tf_listen(struct tf_fd *fd, int backlog)
int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd,
struct tf_sockaddr *from)
{
- TF_POLL_HOOKS;
int r, tfdf;
struct sockaddr *addr = NULL;
socklen_t al, *pal = NULL;
@@ -272,24 +257,19 @@ int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd,
pal = &al;
}
- tfdf = TF_FD_AUTOCLOSE | (listen_fd->flags & TF_FD_STREAM_ORIENTED);
- tfdf |= TF_FD_SET_CLOEXEC;
+ tfdf = TF_FD_AUTOCLOSE | TF_FD_ALREADY_NONBLOCKING |
+ (listen_fd->flags & TF_FD_STREAM_ORIENTED);
- poller->fd_monitor(listen_fd, TF_POLL_READ);
do {
- /* FIXME: use accept4 if available */
- r = accept(listen_fd->fd, addr, pal);
+ r = accept4(listen_fd->fd, addr, pal, SOCK_NONBLOCK|SOCK_CLOEXEC);
if (r >= 0)
break;
if (errno == EINTR)
continue;
- if (errno != EAGAIN) {
- poller->fd_unmonitor(listen_fd);
+ if (errno != EAGAIN)
return -errno;
- }
- r = __tf_fiber_schedule();
- } while (r == TF_WAKEUP_FD);
- poller->fd_unmonitor(listen_fd);
+ r = tf_fiber_schedule();
+ } while (r != 0);
if (r < 0)
return r;
@@ -298,7 +278,6 @@ int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd,
int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to)
{
- TF_POLL_HOOKS;
socklen_t l = sizeof(int);
int r, err;
@@ -310,10 +289,8 @@ int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to)
return -errno;
/* Wait for socket to become readable */
- poller->fd_monitor(fd, TF_POLL_WRITE);
- r = __tf_fiber_schedule();
- poller->fd_unmonitor(fd);
- if (r != TF_WAKEUP_FD)
+ r = tf_fiber_schedule();
+ if (r != 0)
return r;
/* Check for error */
@@ -327,7 +304,6 @@ ssize_t tf_recvmsg(struct tf_fd *fd,
struct tf_sockaddr *to,
void *buf, size_t len)
{
- TF_POLL_HOOKS;
struct iovec iov;
struct msghdr msg;
struct cmsghdr *cmsg;
@@ -347,7 +323,6 @@ ssize_t tf_recvmsg(struct tf_fd *fd,
.msg_iovlen = 1,
};
- poller->fd_monitor(fd, TF_POLL_READ);
do {
r = recvmsg(fd->fd, &msg, 0);
if (r >= 0)
@@ -356,9 +331,8 @@ ssize_t tf_recvmsg(struct tf_fd *fd,
r = -errno;
break;
}
- r = __tf_fiber_schedule();
- } while (r == TF_WAKEUP_FD);
- poller->fd_unmonitor(fd);
+ r = tf_fiber_schedule();
+ } while (r != 0);
if (r < 0 || to == NULL)
return r;
@@ -382,7 +356,6 @@ ssize_t tf_sendmsg(struct tf_fd *fd,
const struct tf_sockaddr *to,
const void *buf, size_t len)
{
- TF_POLL_HOOKS;
struct msghdr msg;
struct iovec iov;
struct {
@@ -411,7 +384,6 @@ ssize_t tf_sendmsg(struct tf_fd *fd,
msg.msg_controllen = sizeof(cmsg);
}
- poller->fd_monitor(fd, TF_POLL_WRITE);
do {
r = sendmsg(fd->fd, &msg, 0);
if (r >= 0)
@@ -420,9 +392,8 @@ ssize_t tf_sendmsg(struct tf_fd *fd,
r = -errno;
break;
}
- r = __tf_fiber_schedule();
- } while (r == TF_WAKEUP_FD);
- poller->fd_unmonitor(fd);
+ r = tf_fiber_schedule();
+ } while (r != 0);
return r;
}
diff --git a/src/scheduler.c b/src/scheduler.c
deleted file mode 100644
index a103d0a..0000000
--- a/src/scheduler.c
+++ /dev/null
@@ -1,134 +0,0 @@
-/* scheduler.c - fiber scheduling
- *
- * Copyright (C) 2009-2010 Timo Teräs <timo.teras@iki.fi>
- * All rights reserved.
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 or later as
- * published by the Free Software Foundation.
- *
- * See http://www.gnu.org/ for details.
- */
-
-#include <time.h>
-#include <libtf/scheduler.h>
-#include <libtf/io.h>
-
-/* FIXME: should be in thread local storage */
-extern struct tf_poll_hooks tf_epoll_hooks;
-struct tf_scheduler *__tf_scheduler;
-
-static void update_time(struct tf_scheduler *sched)
-{
- struct timespec ts;
-
- clock_gettime(CLOCK_MONOTONIC, &ts);
- sched->scheduler_time = ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
-}
-
-static void process_heap(struct tf_scheduler *sched)
-{
- struct tf_heap_node *node;
- tf_mtime_t now = sched->scheduler_time;
-
- while (!tf_heap_empty(&sched->heap) &&
- tf_mtime_diff(now, tf_heap_get_value(&sched->heap)) >= 0) {
- node = tf_heap_get_node(&sched->heap);
- tf_heap_delete(node, &sched->heap);
- __tf_fiber_wakeup_heapnode(node);
- }
-}
-
-void tf_scheduler_fiber(void *data)
-{
- struct tf_scheduler *sched = (struct tf_scheduler *) data;
-
- do {
- tf_mtime_diff_t timeout;
-
- update_time(sched);
- if (!tf_list_empty(&sched->scheduled_q) ||
- !tf_list_empty(&sched->running_q)) {
- timeout = 0;
- } else if (!tf_heap_empty(&sched->heap)) {
- timeout = tf_mtime_diff(
- tf_heap_get_value(&sched->heap),
- tf_scheduler_get_mtime());
- if (timeout < 0)
- timeout = 0;
- } else {
- timeout = -1;
- }
-
- if (sched->poller->poll(timeout) == TF_WAKEUP_TIMEOUT &&
- timeout >= 0) {
- sched->scheduler_time += timeout;
- process_heap(sched);
- }
-
- if (tf_fiber_yield() == TF_WAKEUP_KILL) {
- do {
- tf_fiber_put(sched->active_fiber);
- sched->active_fiber = sched;
- } while (__tf_fiber_schedule() == TF_WAKEUP_KILL);
- }
- } while (1);
-}
-
-struct tf_scheduler *tf_scheduler_create(void)
-{
- struct tf_scheduler *sched;
-
- sched = __tf_fiber_create(tf_scheduler_fiber,
- sizeof(struct tf_scheduler));
-
- *sched = (struct tf_scheduler) {
- .scheduled_q = TF_LIST_HEAD_INITIALIZER(sched->scheduled_q),
- .running_q = TF_LIST_HEAD_INITIALIZER(sched->running_q),
- };
-
- return sched;
-}
-
-int tf_scheduler_enable(struct tf_scheduler *sched)
-{
- struct tf_scheduler *s = sched;
-
- if (s == NULL) {
- s = tf_scheduler_create();
- if (s == NULL)
- return -ENOMEM;
- }
- if (s->main_fiber != NULL)
- return -EBUSY;
-
- __tf_fiber_bind_scheduler(s);
- __tf_scheduler = s;
- s->poller = &tf_epoll_hooks;
- s->poller->init();
- update_time(s);
-
- if (sched != NULL)
- tf_scheduler_get(sched);
-
- return 0;
-}
-
-void tf_scheduler_disable(void)
-{
- struct tf_scheduler *sched = __tf_scheduler;
-
- if (sched == NULL ||
- sched->main_fiber != sched->active_fiber)
- return;
-
- /* sleep until no others */
- while (sched->num_fibers > 1)
- __tf_fiber_schedule();
-
- sched->poller->close();
- __tf_scheduler = NULL;
- __tf_fiber_release_scheduler(sched);
- tf_heap_destroy(&sched->heap);
- tf_fiber_put(sched);
-}
diff --git a/src/timeout.c b/src/timeout.c
new file mode 100644
index 0000000..fc7acd2
--- /dev/null
+++ b/src/timeout.c
@@ -0,0 +1,120 @@
+/* timeout.c - timerfd based fiber wakeups
+ *
+ * Copyright (C) 2010 Timo Teräs <timo.teras@iki.fi>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 or later as
+ * published by the Free Software Foundation.
+ *
+ * See http://www.gnu.org/ for details.
+ */
+
+#include <libtf/fiber.h>
+#include <libtf/io.h>
+#include <sys/timerfd.h>
+
+struct tf_timeout_fiber {
+ struct tf_timeout_manager manager;
+ struct tf_fd fd;
+ tf_mtime_t programmed;
+};
+
+static tf_mtime_t tf_mtime_cached;
+
+tf_mtime_t tf_mtime_now(void)
+{
+ return tf_mtime_cached;
+}
+
+tf_mtime_t tf_mtime_update(void)
+{
+ struct timespec ts;
+
+ clock_gettime(CLOCK_MONOTONIC, &ts);
+ tf_mtime_cached = ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
+
+ return tf_mtime_cached;
+}
+
+static void tf_timeout_process_heap(struct tf_timeout_fiber *t)
+{
+ struct tf_heap_head *heap = &t->manager.heap;
+ struct tf_heap_node *node;
+ tf_mtime_t now = tf_mtime_update();
+ tf_mtime_diff_t timeout, value;
+
+ while (!tf_heap_empty(heap)) {
+ value = tf_heap_get_value(heap);
+ timeout = tf_mtime_diff(value, now);
+ if (timeout > 0) {
+ if (t->programmed != value) {
+ struct itimerspec its = {
+ .it_value.tv_sec = timeout / 1000,
+ .it_value.tv_nsec = (timeout % 1000) * 1000000,
+ };
+ t->programmed = value;
+ timerfd_settime(t->fd.fd, 0, &its, NULL);
+ }
+ break;
+ }
+
+ node = tf_heap_get_node(heap);
+ tf_heap_delete(node, heap);
+ tf_fiber_wakeup(container_of(node, struct tf_fiber, timeout.heap_node));
+ }
+}
+
+static void tf_timeout_worker(void *ctx)
+{
+ do {
+ tf_ifc_process_unordered();
+ tf_timeout_process_heap(ctx);
+ } while (tf_fiber_schedule() == 0);
+}
+
+struct tf_timeout_fiber *tf_timeout_fiber_create(void)
+{
+ struct tf_timeout_fiber *f;
+
+ f = tf_fiber_create(tf_timeout_worker, sizeof(struct tf_timeout_fiber));
+ if (f == NULL)
+ return f;
+
+ if (tf_open_fd(&f->fd,
+ timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK|TFD_CLOEXEC),
+ TF_FD_READ | TF_FD_ALREADY_NONBLOCKING | TF_FD_AUTOCLOSE) != 0) {
+ tf_fiber_put(f);
+ return NULL;
+ }
+ f->fd.fiber = container_of((void *) f, struct tf_fiber, data);
+ tf_fiber_run(tf_fiber_get(f));
+
+ return f;
+}
+
+static void tf_timeout_adjust_ifc(void *fiber, struct tf_ifc *ifc)
+{
+ struct tf_timeout_fiber *t = fiber;
+ struct tf_timeout_client *c = container_of(ifc, struct tf_timeout_client, ifc);
+ tf_mtime_t val;
+
+ val = c->value;
+ if (val == 0)
+ tf_heap_delete(&c->heap_node, &t->manager.heap);
+ else
+ tf_heap_change(&c->heap_node, &t->manager.heap, val);
+ c->latched = val;
+}
+
+void tf_timeout_adjust(struct tf_timeout_client *tc)
+{
+ tf_ifc_queue(tc->manager, &tc->ifc, tf_timeout_adjust_ifc);
+}
+
+void tf_timeout_delete(struct tf_timeout_client *tc)
+{
+ tc->value = 0;
+ tc->latched = 0;
+ tf_ifc_complete(tc->manager, &tc->ifc, tf_timeout_adjust_ifc);
+}
diff --git a/src/uctx.h b/src/uctx.h
index 3ec30c4..353b732 100644
--- a/src/uctx.h
+++ b/src/uctx.h
@@ -18,19 +18,18 @@
#ifdef VALGRIND
#include <valgrind/valgrind.h>
-#else
-#define VALGRIND_STACK_REGISTER(stack_base, size) 0
-#define VALGRIND_STACK_DEREGISTER(stack_id)
#endif
#define STACK_GUARD 0xbad57ac4
struct tf_uctx {
- int *stack_guard;
- size_t size;
- void *alloc;
- void *current_sp;
- unsigned int stack_id;
+ int * stack_guard;
+ size_t size;
+ void * alloc;
+ void * current_sp;
+#ifdef VALGRIND
+ unsigned int stack_id;
+#endif
};
#if defined(__i386__)
@@ -89,13 +88,14 @@ static inline void stack_push_ptr(void **stackptr, void *ptr)
}
-static inline void tf_uctx_create_self(struct tf_uctx *uctx)
+static inline tf_uctx_t tf_uctx_create_self(struct tf_uctx *uctx)
{
static int dummy_guard = STACK_GUARD;
*uctx = (struct tf_uctx) {
.stack_guard = &dummy_guard,
};
+ return uctx;
}
static inline void *
@@ -118,20 +118,24 @@ tf_uctx_create_embedded(
/* Create initial stack frame (cdecl convention) */
stack = stack_pointer(stack_base, size);
- user_data = stack_push(&stack, TF_ALIGN(private_size, 64));
+ user_data = stack_push(&stack, TF_ALIGN(private_size, 16));
+ uctx = stack_push(&stack, TF_ALIGN(sizeof(struct tf_uctx), 16));
stack_push_ptr(&stack, main_argument);
stack_push_ptr(&stack, user_data);
stack_push_ptr(&stack, NULL);
stack_push_ptr(&stack, stack_frame_main); /* eip */
stack_push_ptr(&stack, NULL); /* ebp */
- uctx = user_data + uctx_offset;
+ *((tf_uctx_t *) (user_data + uctx_offset)) = uctx;
+
*uctx = (struct tf_uctx) {
.stack_guard = stack_guard(stack_base, size),
.alloc = stack_base,
.size = size,
.current_sp = stack,
+#ifdef VALGRIND
.stack_id = VALGRIND_STACK_REGISTER(stack_base, stack_base+size),
+#endif
};
*uctx->stack_guard = STACK_GUARD;
@@ -139,18 +143,25 @@ tf_uctx_create_embedded(
}
static inline
-void tf_uctx_destroy(struct tf_uctx *uctx)
+void tf_uctx_destroy(tf_uctx_t ctx)
{
+ struct tf_uctx *uctx = ctx;
+
if (uctx->alloc != NULL) {
+#ifdef VALGRIND
VALGRIND_STACK_DEREGISTER(uctx->stack_id);
+#endif
tf_bmem_free(uctx->alloc, uctx->size);
}
}
static inline
-void tf_uctx_transfer(struct tf_uctx *from, struct tf_uctx *to)
+void tf_uctx_transfer(tf_uctx_t from, tf_uctx_t to)
{
+ struct tf_uctx *ufrom = from;
+ struct tf_uctx *uto = to;
+
/* Switch stack pointers */
- TF_BUG_ON(*from->stack_guard != STACK_GUARD);
- switch_fiber(from, to);
+ TF_BUG_ON(*ufrom->stack_guard != STACK_GUARD);
+ switch_fiber(ufrom, uto);
}
diff --git a/src/vmach.c b/src/vmach.c
new file mode 100644
index 0000000..a9ca446
--- /dev/null
+++ b/src/vmach.c
@@ -0,0 +1,120 @@
+#include <libtf/defines.h>
+#include <libtf/list.h>
+#include <libtf/vmach.h>
+#include <libtf/io.h>
+#include "uctx.h"
+
+__thread struct tf_fiber *tf_current_fiber;
+__thread struct tf_vcpu *tf_current_vcpu;
+__thread struct tf_vmach *tf_current_vmach;
+
+extern struct tf_poll_hooks tf_epoll_hooks;
+
+struct tf_vmachine {
+ struct tf_vmach vmach;
+ struct tf_uctx startup_uctx;
+ void *machine_init_fiber;
+};
+
+static void tf_vcpu_main(void *fiber_data)
+{
+ struct tf_vcpu *vcpu = fiber_data;
+ struct tf_vmach *vmach = vcpu->machine;
+ struct tf_fiber *self = container_of(fiber_data, struct tf_fiber, data);
+ struct tf_fiber *f;
+
+ tf_current_vmach = vmach;
+ tf_current_vcpu = vcpu;
+
+ while (vmach->num_user_fibers != 0) {
+ if (tf_list_empty(&vmach->run_q)) {
+ /* sleep */
+ continue;
+ }
+
+ f = tf_list_entry(tf_list_pop(&vmach->run_q),
+ struct tf_fiber, queue_node);
+
+ f->return_context = self->context;
+ tf_current_fiber = f;
+ tf_uctx_transfer(self->context, f->context);
+ tf_list_splice_tail(&f->wakeup_q, &vmach->run_q);
+ }
+}
+
+static void tf_vmach_main(void *fiber_data)
+{
+ struct tf_fiber *self = container_of(fiber_data, struct tf_fiber, data);
+ struct tf_vcpu *vcpu = fiber_data;
+ struct tf_vmach *vmach = vcpu->machine;
+
+ tf_current_vmach = vmach;
+ tf_current_vcpu = vcpu;
+ tf_current_fiber = self;
+
+ /* Initialize IO subsystem */
+ vmach->poll_ops = &tf_epoll_hooks;
+ vmach->poll_fiber = vmach->poll_ops->create();
+ vmach->timeout_fiber = tf_timeout_fiber_create();
+ vmach->startup_fiber.timeout.manager = vmach->timeout_fiber;
+
+ /* Run the initial fiber */
+ tf_fiber_wakeup(&vmach->startup_fiber);
+
+ /* Use main thread as a regular vcpu */
+ vmach->num_user_fibers = 1;
+ tf_list_splice_tail(&self->wakeup_q, &vmach->run_q);
+ tf_vcpu_main(vcpu);
+
+ /* Kill all stuff */
+
+ /* Return to main fiber */
+ vmach->startup_fiber.return_context = NULL;
+ tf_current_fiber = NULL;
+ tf_current_vcpu = NULL;
+ tf_current_vmach = NULL;
+
+ tf_uctx_transfer(self->context, vmach->startup_fiber.context);
+}
+
+void tf_vmach_start(void)
+{
+ struct tf_vmachine *vmach;
+ struct tf_vcpu *vcpu;
+
+ TF_BUG_ON(tf_current_vcpu != NULL);
+
+ /* Create a self-fiber so we can surrender control to vcpu */
+ vmach = calloc(1, sizeof(struct tf_vmachine));
+ vmach->vmach.startup_fiber = (struct tf_fiber) {
+ .ref_count = 1,
+ .queue_node = TF_LIST_INITIALIZER(vmach->vmach.startup_fiber.queue_node),
+ .wakeup_q = TF_LIST_HEAD_INITIALIZER(vmach->vmach.startup_fiber.wakeup_q),
+ .context = tf_uctx_create_self(&vmach->startup_uctx),
+ };
+ tf_list_init_head(&vmach->vmach.run_q);
+ vcpu = tf_fiber_create(tf_vmach_main, sizeof(struct tf_vcpu));
+ vmach->machine_init_fiber = vcpu;
+ vcpu->machine = &vmach->vmach;
+
+ /* Create manager fiber to initialize vcpu */
+ tf_uctx_transfer(vmach->vmach.startup_fiber.context,
+ container_of((void *) vcpu, struct tf_fiber, data)->context);
+}
+
+void tf_vmach_stop(void)
+{
+ struct tf_fiber *self = tf_vmach_get_current_fiber();
+ struct tf_vmach *vmach = tf_vmach_get_current();
+
+ TF_BUG_ON(self != &vmach->startup_fiber);
+
+ /* Wait for the vmachine to stop */
+ tf_vmach_get_current()->num_user_fibers--;
+ while (self->return_context != NULL)
+ tf_fiber_schedule();
+
+ /* And clean up */
+ tf_uctx_destroy(vmach->startup_fiber.context);
+ free(vmach);
+}
diff --git a/test/httpget.c b/test/httpget.c
index 29f7f14..876b63c 100644
--- a/test/httpget.c
+++ b/test/httpget.c
@@ -14,6 +14,7 @@ static void ping_fiber(void *ptr)
struct ctx *ctx = (struct ctx*) ptr;
struct tf_sockaddr host;
struct tf_fd fd;
+ struct tf_timeout to;
char buf[128];
int bytes = 0, r;
const char *req = "GET / HTTP/1.0\r\n\r\n";
@@ -27,7 +28,7 @@ static void ping_fiber(void *ptr)
if (r < 0)
goto err;
- r = tf_timed(tf_connect(&fd, &host), 10000);
+ r = tf_timed(&to, tf_connect(&fd, &host), 10000);
if (r < 0)
goto err_close;
@@ -49,11 +50,11 @@ int main(int argc, char **argv)
struct ctx *c;
int i;
- tf_scheduler_enable(NULL);
+ tf_vmach_start();
for (i = 1; i < argc; i++) {
c = tf_fiber_create(ping_fiber, sizeof(struct ctx));
c->hostname = argv[i];
- tf_fiber_put(c);
+ tf_fiber_run(c);
}
- tf_scheduler_disable();
+ tf_vmach_stop();
}
diff --git a/test/read.c b/test/read.c
index 0dc72cc..9963a62 100644
--- a/test/read.c
+++ b/test/read.c
@@ -34,8 +34,8 @@ static void io_fiber(void *ptr)
int main(int argc, char **argv)
{
- tf_scheduler_enable(NULL);
- tf_fiber_put(tf_fiber_create(time_fiber, 0));
- tf_fiber_put(tf_fiber_create(io_fiber, 0));
- tf_scheduler_disable();
+ tf_vmach_start();
+ tf_fiber_run(tf_fiber_create(time_fiber, 0));
+ tf_fiber_run(tf_fiber_create(io_fiber, 0));
+ tf_vmach_stop();
}
diff --git a/test/simple1.c b/test/simple1.c
index 6f40f8d..b6de289 100644
--- a/test/simple1.c
+++ b/test/simple1.c
@@ -10,7 +10,7 @@ static void work_fiber(void *ptr)
struct ctx *c = (struct ctx*) ptr;
printf("Hello%d.1\n", c->id);
- tf_fiber_yield();
+ tf_msleep(1);
printf("Hello%d.2\n", c->id);
}
@@ -19,11 +19,11 @@ int main(int argc, char **argv)
struct ctx *c;
int i;
- tf_scheduler_enable(NULL);
+ tf_vmach_start();
for (i = 0; i < 6; i++) {
c = tf_fiber_create(work_fiber, sizeof(struct ctx));
c->id = i;
- tf_fiber_put(c);
+ tf_fiber_run(c);
}
- tf_scheduler_disable();
+ tf_vmach_stop();
}
diff --git a/test/sleep.c b/test/sleep.c
index 8e225a7..dd71d91 100644
--- a/test/sleep.c
+++ b/test/sleep.c
@@ -21,10 +21,10 @@ int main(int argc, char **argv)
struct ctx *c;
int i;
- tf_scheduler_enable(NULL);
+ tf_vmach_start();
for (i = 0; i < 1000; i++) {
c = tf_fiber_create(work_fiber, sizeof(struct ctx));
- tf_fiber_put(c);
+ tf_fiber_run(c);
}
- tf_scheduler_disable();
+ tf_vmach_stop();
}