summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTimo Teräs <timo.teras@iki.fi>2010-07-02 20:23:07 +0300
committerTimo Teräs <timo.teras@iki.fi>2010-07-02 20:25:47 +0300
commit23b95bf1a15322c2f471b80c06cb65d9b2d2a282 (patch)
tree9bf12231db9591852e3b42ca24715d2cbaf6267b
parent0183e33d9a4759764716e771b85e19f7a997b8bd (diff)
downloadlibtf-master.tar.bz2
libtf-master.tar.xz
libtf: major redesign startedHEADmaster
the idea is to make libtf completely multi-threaded. meaning each fiber can be running concurrently in separate thread. quite a bit of framework is added for this and some atomic helpers are already introduced. however, io polling is busy polling now (will be soon in own thread) and timeouts are still more or less broken. oh, and the multithreading core is not there yet. basically we are currently mostly broken ;)
-rw-r--r--DESIGN77
-rw-r--r--include/libtf/atomic.h11
-rw-r--r--include/libtf/defines.h8
-rw-r--r--include/libtf/fiber.h141
-rw-r--r--include/libtf/io.h29
-rw-r--r--include/libtf/scheduler.h67
-rw-r--r--include/libtf/tf.h2
-rw-r--r--include/libtf/vmach.h49
-rw-r--r--src/TFbuild2
-rw-r--r--src/fiber.c259
-rw-r--r--src/ifc.c57
-rw-r--r--src/io-epoll.c122
-rw-r--r--src/io-unix.c87
-rw-r--r--src/scheduler.c134
-rw-r--r--src/timeout.c120
-rw-r--r--src/uctx.h41
-rw-r--r--src/vmach.c120
-rw-r--r--test/httpget.c9
-rw-r--r--test/read.c8
-rw-r--r--test/simple1.c8
-rw-r--r--test/sleep.c6
21 files changed, 719 insertions, 638 deletions
diff --git a/DESIGN b/DESIGN
index 2ffc535..8624446 100644
--- a/DESIGN
+++ b/DESIGN
@@ -1,19 +1,72 @@
SCHEDULER
-- 4-heap (or 2-heap) with linked nodes
-- epoll
-- edge-triggered monitoring for file i/o (no syscalls to modify fdset)
+- worker thread pool executes read to be run fibers
+- thread pool executes fibers from FIFO queues (one per each priority)
+- possible to make main thread separate scheduler so we can bind some
+ fibers to run in main thread only (not needed?)
+
+THE BIG SCHEDULING QUEUE (shared queue with all worker threads)
+ --- works also between machines, in which case we just always queue
+ work items to remote vmach (need to send wakeups too)
+ - steal first item from the local cache if any
+ - mutex lock
+ - if stuff to push
+ - while idle_vcpu not empty
+ - pop idle_vcpu from list
+ - push item to idle_vcpu
+ - push rest of items to global lists accordingly
+ - mutex unlock
+ - call futex_wake for all idle_cpu's we stuffed
+ - return stolen item
+ - if stuff on list
+ - pop item from list
+ - mutex unlock
+ - return popped_item
+ - insert self to idle_vcpu list
+ - mutex unlock
+ - futex_wait on local_pointer
+ - return local_pointer
+ - mutex unlock
+
+WAKEUP
+ - as minimum keep wakeup bitfield for most common types, and use
+ prepare sleep, and sleep primitives with acceptable wakeup reasons
+ - might add so that wakupper calls wakeupee to execute their wakeup
+ condition check once ask rescheduling based or that or not; this way
+ empty spin does not require pushing to scheduler queue and thread
+ pool wakeup
+
+IO SLEEPING/DISPATCHING
+- epoll in edge-triggered monitoring for file i/o
+- epoll_wait done in separate thread (or possibly as regular fiber)
+ which sole purpose is to wakeup threads for thread pool
- signalfd for signal handling
-- eventfd for thread pool wakeup
+- timerfd for timeout handling
+
+INTER FIBRE CALLS (IFC)
+- sends a callback to be execute under some other fibre
+- executed at tf_ifc_process() points, or at tf_exit() time
+- might need tfc_ifc_wait() for synchronizing with ifc completion
+- sending uses atomic LIFO (single atomic cmpxchg)
+- receive does LIFO flush (single atomic xchg) and reverses order(?)
+- need way to check if IFC is queued or not (so it's safe to reuse it)
+
+TIMEOUTS
+- one (or more) fibers serving as alarm generators
+- 4-heap with mremappable heap array
+- receives IFC calls from other threads
+- receives io wakeups from timerfd
+- sends async signal to fiber after timeout
FIBERS
-- timer node on fiber control data (for delayed heapify; and reuse on timeouts)
- fd, signal, pid wait struct on stack of wait function
- fiber_kill can interrupt wait
-SCHEDULER <-> THREAD POOL
-- scheduler queues to thread pool array fifo queue,
- - semaphore protects threads so no underqueueing happens
- - eventfd notifies scheduler when queue has free space again
- - head/tail updated atomically and item position recovered
-- thread pool atomically pushes to resume atomic stack and sets eventfd
- - scheduler thread atomically pops all and updates fiber states
+FIBRE WAKE UP QUEUES
+- mostly not needed (alarms, io uses async signal wakeups)
+- for mutex, semaphores, possibly send io
+- use thread mutexes for list access
+- wait on single queue only(?)
+
+FIBRE MUTEX
+- simple wakeup queue, sleep on queue, uninterruptible?
+
diff --git a/include/libtf/atomic.h b/include/libtf/atomic.h
index ec9f1e0..8ebe5f8 100644
--- a/include/libtf/atomic.h
+++ b/include/libtf/atomic.h
@@ -13,7 +13,14 @@
#ifndef TF_ATOMIC_H
#define TF_ATOMIC_H
-#define tf_atomic_inc(var) __sync_add_and_fetch(&(var), 1)
-#define tf_atomic_dec(var) __sync_add_and_fetch(&(var), -1)
+#define tf_atomic_inc(var) \
+ __sync_add_and_fetch(&(var), 1)
+#define tf_atomic_dec(var) \
+ __sync_add_and_fetch(&(var), -1)
+
+#define tf_atomic_cmpxchg(ptr, old, new) \
+ __sync_bool_compare_and_swap(ptr, old, new)
+#define tf_atomic_xchg(ptr, new) \
+ ((typeof(*(ptr)))__sync_lock_test_and_set(ptr, new))
#endif
diff --git a/include/libtf/defines.h b/include/libtf/defines.h
index 8e39c7e..44ead8a 100644
--- a/include/libtf/defines.h
+++ b/include/libtf/defines.h
@@ -57,6 +57,7 @@
#define attribute_warn_unused_result __attribute__((warn_unused_result))
#define attribute_deprecated __attribute__((deprecated))
+/* FIXME: glibc fprintf requires 8kB on stack */
#define TF_BUG_ON(cond) if (unlikely(cond)) { \
fprintf(stderr, "BUG: failure at %s:%d/%s(): %s!\n", \
__FILE__, __LINE__, __func__, #cond); \
@@ -68,14 +69,19 @@
#define TF_EMPTY_ARRAY 0
+#define TF_BIT(n) (1 << (n))
+
#ifndef TF_STACK_SIZE
-#define TF_STACK_SIZE 4096
+/* FIXME: glibc fprintf requires 8kB on stack */
+#define TF_STACK_SIZE (4*4096)
#endif
/* Monotonic time */
typedef uint32_t tf_mtime_t;
typedef int32_t tf_mtime_diff_t;
+tf_mtime_t tf_mtime_now(void);
+
static inline
tf_mtime_diff_t tf_mtime_diff(tf_mtime_t a, tf_mtime_t b)
{
diff --git a/include/libtf/fiber.h b/include/libtf/fiber.h
index d5c6153..f97e963 100644
--- a/include/libtf/fiber.h
+++ b/include/libtf/fiber.h
@@ -16,64 +16,127 @@
#include <errno.h>
#include <libtf/defines.h>
#include <libtf/heap.h>
+#include <libtf/list.h>
-/* Fiber wakeup reasons */
-#define TF_WAKEUP_NONE 0
-#define TF_WAKEUP_IMMEDIATE -EAGAIN
-#define TF_WAKEUP_KILL -EINTR
-#define TF_WAKEUP_TIMEOUT -ETIME
-#define TF_WAKEUP_THIS_TIMEOUT -ETIMEDOUT
-#define TF_WAKEUP_FD -EIO
+/* Inter-fibre calls */
+struct tf_fiber;
+struct tf_ifc;
+typedef void (*tf_ifc_handler_t)(void *fiber, struct tf_ifc *msg);
-/* Fiber management */
-struct tf_scheduler;
-typedef void (*tf_fiber_proc)(void *fiber);
+struct tf_ifc {
+ union {
+ struct tf_ifc *next;
+ struct tf_list_node list;
+ };
+ tf_ifc_handler_t handler;
+ struct tf_fiber * sender;
+};
-void *__tf_fiber_create(tf_fiber_proc fiber_main, int private_size);
-void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size);
-void *tf_fiber_get(void *data);
-void tf_fiber_put(void *data);
-void __tf_fiber_wakeup(void *data, int wakeup_type);
-void __tf_fiber_wakeup_heapnode(struct tf_heap_node *node);
-int __tf_fiber_schedule(void);
-int __tf_fiber_bind_scheduler(struct tf_scheduler *sched);
-int __tf_fiber_release_scheduler(struct tf_scheduler *sched);
+void tf_ifc_queue(void *fiber, struct tf_ifc *msg, tf_ifc_handler_t handler);
+void tf_ifc_complete(void *fiber, struct tf_ifc *msg, tf_ifc_handler_t handler);
+void tf_ifc_process(void);
+void tf_ifc_process_unordered(void);
-void tf_fiber_exit(void) attribute_noreturn;
-void tf_fiber_kill(void *fiber);
-int tf_fiber_yield(void);
+/* Timeouts */
+struct tf_timeout_manager {
+ struct tf_heap_head heap;
+ unsigned int num_fibers;
+};
+
+struct tf_timeout_client {
+ struct tf_timeout_manager * manager;
+ tf_mtime_t value;
+ tf_mtime_t latched;
+ struct tf_heap_node heap_node;
+ struct tf_ifc ifc;
+};
-/* Scheduling and fiber management */
struct tf_timeout {
- tf_mtime_t saved_timeout;
- unsigned int timeout_change;
+ tf_mtime_t saved_timeout;
+ tf_mtime_t my_timeout;
};
-#define tf_timed(func, timeout) \
+static inline int tf_timeout_expired(struct tf_timeout *to)
+{
+ return tf_mtime_diff(to->my_timeout, tf_mtime_now()) <= 0;
+}
+
+struct tf_timeout_fiber *tf_timeout_fiber_create(void);
+void tf_timeout_adjust(struct tf_timeout_client *tc);
+void tf_timeout_delete(struct tf_timeout_client *tc);
+
+#define tf_timed(__to, func, timeout) \
({ \
- struct tf_timeout __timeout; \
- tf_timeout_push(&__timeout, timeout); \
- tf_timeout_pop(&__timeout, (func)); \
+ tf_timeout_push(__to, timeout); \
+ tf_timeout_pop(__to, (func)); \
})
-void tf_timeout_push(struct tf_timeout *timeout, tf_mtime_diff_t milliseconds);
-int __tf_timeout_pop(struct tf_timeout *timeout, int err);
+/* Fibres and their management */
+typedef void *tf_uctx_t;
+
+struct tf_fiber {
+ unsigned int ref_count;
+ unsigned int flags;
+ tf_uctx_t context;
+ tf_uctx_t return_context;
+ struct tf_list_node queue_node;
+ struct tf_list_head wakeup_q;
+ struct tf_timeout_client timeout;
+ struct tf_ifc * pending_ifc;
+ char data[TF_EMPTY_ARRAY];
+};
+
+typedef void (*tf_fiber_proc)(void *fiber);
+
+void *__tf_fiber_create(tf_fiber_proc fiber_main, int private_size);
+void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size);
+void *tf_fiber_get(void *fiber);
+void tf_fiber_put(void *fiber);
+int tf_fiber_run(void *fiber);
+void tf_fiber_kill(void *fiber);
+
+void tf_fiber_wakeup(struct tf_fiber *fiber);
+int tf_fiber_schedule(void);
-static inline int tf_timeout_pop(struct tf_timeout *timeout, int err)
+void tf_fiber_exit(void) attribute_noreturn;
+
+static inline struct tf_fiber *tf_vmach_get_current_fiber(void)
{
- if (unlikely(timeout->timeout_change))
- return __tf_timeout_pop(timeout, err);
+ extern __thread struct tf_fiber *tf_current_fiber;
+ return tf_current_fiber;
+}
+
+static inline void tf_timeout_push(struct tf_timeout *to, tf_mtime_diff_t ms)
+{
+ struct tf_fiber *f = tf_vmach_get_current_fiber();
+
+ to->saved_timeout = f->timeout.value;
+ to->my_timeout = tf_mtime_now() + ms;
+ if (f->timeout.value == 0 ||
+ tf_mtime_diff(to->my_timeout, f->timeout.value) < 0)
+ f->timeout.value = to->my_timeout;
+}
+
+static inline int tf_timeout_pop(struct tf_timeout *to, int err)
+{
+ struct tf_fiber *f = tf_vmach_get_current_fiber();
+
+ f->timeout.value = to->saved_timeout;
return err;
}
static inline
-int tf_msleep(tf_mtime_diff_t milliseconds)
+int tf_msleep(tf_mtime_diff_t ms)
{
+ struct tf_timeout to;
int r;
- r = tf_timed(__tf_fiber_schedule(), milliseconds);
- if (r == TF_WAKEUP_THIS_TIMEOUT)
- r = 0;
- return r;
+
+ tf_timeout_push(&to, ms);
+ do {
+ r = tf_fiber_schedule();
+ } while (r != -ETIME);
+
+ return tf_timeout_pop(&to, r);
}
#endif
diff --git a/include/libtf/io.h b/include/libtf/io.h
index 0d34421..d1098e3 100644
--- a/include/libtf/io.h
+++ b/include/libtf/io.h
@@ -21,10 +21,12 @@
#include <libtf/defines.h>
/* Flags for tf_open_fd() */
-#define TF_FD_AUTOCLOSE 1
-#define TF_FD_STREAM_ORIENTED 2
-#define TF_FD_SET_CLOEXEC 4
-#define TF_FD_ALREADY_NONBLOCKING 8
+#define TF_FD_READ TF_BIT(0)
+#define TF_FD_WRITE TF_BIT(1)
+#define TF_FD_AUTOCLOSE TF_BIT(2)
+#define TF_FD_STREAM_ORIENTED TF_BIT(3)
+#define TF_FD_SET_CLOEXEC TF_BIT(4)
+#define TF_FD_ALREADY_NONBLOCKING TF_BIT(5)
struct tf_sockaddr {
union {
@@ -37,23 +39,14 @@ struct tf_sockaddr {
struct tf_fd {
int fd;
unsigned int flags;
- /* Single waiter -- would be relatively trivial to modify to allow
- * multiple waiters, if someone actually needs it */
- unsigned int events;
- void *waiting_fiber;
+ struct tf_fiber *fiber;
};
-#define TF_POLL_READ 1
-#define TF_POLL_WRITE 2
-
struct tf_poll_hooks {
- void (*init)(void);
- int (*poll)(tf_mtime_diff_t timeout);
- void (*close)(void);
- int (*fd_created)(struct tf_fd *fd);
- int (*fd_destroyed)(struct tf_fd *fd);
- void (*fd_monitor)(struct tf_fd *fd, int events);
- void (*fd_unmonitor)(struct tf_fd *fd);
+ void * (*create)(void);
+ int (*fd_created)(void *fiber, struct tf_fd *fd);
+ int (*fd_destroyed)(void *fiber, struct tf_fd *fd);
+ void (*fd_rearm)(void *fiber, struct tf_fd *fd);
};
int tf_open_fd(struct tf_fd *fd, int kfd, int flags);
diff --git a/include/libtf/scheduler.h b/include/libtf/scheduler.h
deleted file mode 100644
index db5a823..0000000
--- a/include/libtf/scheduler.h
+++ /dev/null
@@ -1,67 +0,0 @@
-/* scheduler.h - libtf fiber scheduler header
- *
- * Copyright (C) 2009-2010 Timo Teräs <timo.teras@iki.fi>
- * All rights reserved.
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 or later as
- * published by the Free Software Foundation.
- *
- * See http://www.gnu.org/ for details.
- */
-
-#ifndef TF_SCHEDULER_H
-#define TF_SCHEDULER_H
-
-#include <libtf/atomic.h>
-#include <libtf/list.h>
-#include <libtf/heap.h>
-#include <libtf/fiber.h>
-
-struct tf_poll_hooks;
-
-struct tf_scheduler {
- struct tf_list_head scheduled_q;
- struct tf_list_head running_q;
- struct tf_heap_head heap;
- void * active_fiber;
- void * main_fiber;
- int num_fibers;
- tf_mtime_t scheduler_time;
- struct tf_poll_hooks * poller;
- unsigned long poll_data[2];
-};
-
-static inline
-struct tf_scheduler *tf_scheduler_get_current(void)
-{
- extern struct tf_scheduler *__tf_scheduler;
- TF_BUG_ON(__tf_scheduler == NULL);
- return __tf_scheduler;
-}
-
-static inline
-tf_mtime_t tf_scheduler_get_mtime(void)
-{
- return tf_scheduler_get_current()->scheduler_time;
-}
-
-struct tf_scheduler *tf_scheduler_create(void);
-int tf_scheduler_enable(struct tf_scheduler *);
-void tf_scheduler_disable(void);
-
-static inline struct tf_scheduler *
-tf_scheduler_get(struct tf_scheduler *s)
-{
- tf_fiber_get(s);
- return s;
-}
-
-static inline void
-tf_scheduler_put(struct tf_scheduler *s)
-{
- tf_fiber_put(s);
-}
-
-#endif
-
diff --git a/include/libtf/tf.h b/include/libtf/tf.h
index e613f18..d052ee7 100644
--- a/include/libtf/tf.h
+++ b/include/libtf/tf.h
@@ -14,7 +14,7 @@
#define TF_H
#include <libtf/fiber.h>
-#include <libtf/scheduler.h>
+#include <libtf/vmach.h>
#include <libtf/io.h>
#endif
diff --git a/include/libtf/vmach.h b/include/libtf/vmach.h
new file mode 100644
index 0000000..d302366
--- /dev/null
+++ b/include/libtf/vmach.h
@@ -0,0 +1,49 @@
+/* vmach.h - "virtual" machine and cpu contexts
+ *
+ * Copyright (C) 2009 Timo Teräs <timo.teras@iki.fi>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 or later as
+ * published by the Free Software Foundation.
+ *
+ * See http://www.gnu.org/ for details.
+ */
+
+#ifndef TF_VMACH_H
+#define TF_VMACH_H
+
+#include <libtf/fiber.h>
+
+struct tf_vmach {
+ struct tf_poll_hooks * poll_ops;
+ void * poll_fiber;
+ void * timeout_fiber;
+ struct tf_fiber startup_fiber;
+ int num_user_fibers;
+ struct tf_list_head run_q;
+};
+
+struct tf_vcpu {
+ struct tf_vmach * machine;
+};
+
+static inline struct tf_vmach *tf_vmach_get_current(void)
+{
+ extern __thread struct tf_vmach *tf_current_vmach;
+ return tf_current_vmach;
+}
+
+static inline struct tf_vcpu *tf_vmach_get_current_cpu(void)
+{
+ extern __thread struct tf_vcpu *tf_current_vcpu;
+ return tf_current_vcpu;
+}
+
+void tf_vmach_start(void);
+void tf_vmach_stop(void);
+
+void tf_vmach_run(struct tf_vmach *vm, struct tf_fiber *f);
+void tf_vmach_run_dedicated(struct tf_vmach *vm, struct tf_fiber *f);
+
+#endif
diff --git a/src/TFbuild b/src/TFbuild
index 9277f9a..3ffa9ef 100644
--- a/src/TFbuild
+++ b/src/TFbuild
@@ -1,6 +1,6 @@
libs-y += libtf
-libtf-objs-y += fiber.o scheduler.o heap.o
+libtf-objs-y += fiber.o heap.o timeout.o vmach.o ifc.o
libtf-objs-$(OS_LINUX) += io-epoll.o mem-mmap.o
libtf-objs-$(OS_UNIX) += io-unix.o
diff --git a/src/fiber.c b/src/fiber.c
index e507815..bef7b81 100644
--- a/src/fiber.c
+++ b/src/fiber.c
@@ -13,31 +13,13 @@
#include <time.h>
#include <errno.h>
#include <unistd.h>
+#include <libtf/atomic.h>
#include <libtf/fiber.h>
-#include <libtf/scheduler.h>
+#include <libtf/vmach.h>
#include "uctx.h"
-#define TF_TIMEOUT_CHANGE_NEEDED 1
-#define TF_TIMEOUT_CHANGE_NEW_VALUE 2
-
-struct tf_fiber {
- unsigned int ref_count;
- struct tf_scheduler * scheduler;
- int wakeup_type;
- unsigned int timeout_change;
- tf_mtime_t timeout;
- struct tf_list_node queue_node;
- struct tf_heap_node heap_node;
- struct tf_uctx context;
- char data[TF_EMPTY_ARRAY];
-};
-
-static inline
-struct tf_fiber *tf_fiber_get_current(void)
-{
- void *data = tf_scheduler_get_current()->active_fiber;
- return container_of(data, struct tf_fiber, data);
-}
+#define TF_FIBERF_RUNNING TF_BIT(0)
+#define TF_FIBERF_WAKEUP_PENDING TF_BIT(1)
static void tf_fiber_main(void *user_data, void *arg)
{
@@ -48,45 +30,46 @@ static void tf_fiber_main(void *user_data, void *arg)
tf_fiber_exit();
}
-void *__tf_fiber_create(tf_fiber_proc fiber_main, int private_size)
+void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size)
{
- struct tf_fiber *fiber;
+ struct tf_fiber *self = tf_vmach_get_current_fiber();
+ struct tf_fiber *f;
- fiber = tf_uctx_create_embedded(
+ f = tf_uctx_create_embedded(
TF_STACK_SIZE,
sizeof(struct tf_fiber) + private_size,
offsetof(struct tf_fiber, context),
tf_fiber_main, fiber_main);
- if (fiber == NULL)
+ if (f == NULL)
return NULL;
- *fiber = (struct tf_fiber) {
+ *f = (struct tf_fiber) {
.ref_count = 1,
- .queue_node = TF_LIST_INITIALIZER(fiber->queue_node),
- .context = fiber->context,
+ .queue_node = TF_LIST_INITIALIZER(f->queue_node),
+ .context = f->context,
+ .timeout.manager = self ? self->timeout.manager : NULL,
+ .wakeup_q = TF_LIST_HEAD_INITIALIZER(f->wakeup_q),
};
- return fiber->data;
+ return f->data;
}
-void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size)
+int tf_fiber_run(void *fiber)
{
- struct tf_fiber *fiber;
- struct tf_scheduler *sched;
-
- sched = tf_scheduler_get_current();
- if (tf_heap_prealloc(&sched->heap, sched->num_fibers + 1) < 0)
- return NULL;
+ struct tf_timeout_manager *tm;
+ struct tf_fiber *f = container_of(fiber, struct tf_fiber, data);
- fiber = container_of(__tf_fiber_create(fiber_main, private_size),
- struct tf_fiber, data);
- sched->num_fibers++;
+ tm = f->timeout.manager;
+ if (tm != NULL) {
+ if (tf_heap_prealloc(&tm->heap, tm->num_fibers + 1) < 0)
+ return -ENOMEM;
+ tm->num_fibers++;
+ }
- fiber->scheduler = sched;
- fiber->wakeup_type = TF_WAKEUP_NONE;
- tf_list_add_tail(&fiber->queue_node, &sched->scheduled_q);
+ tf_fiber_wakeup(f);
+ tf_vmach_get_current()->num_user_fibers++;
- return tf_fiber_get(fiber->data);
+ return 0;
}
void *tf_fiber_get(void *data)
@@ -98,23 +81,8 @@ void *tf_fiber_get(void *data)
static void __tf_fiber_destroy(struct tf_fiber *fiber)
{
- struct tf_scheduler *sched = fiber->scheduler;
- int main_fiber, num_fibers;
-
- /* decrease first the number of fibers as we might be
- * killing the scheduler it self */
- num_fibers = --sched->num_fibers;
-
- main_fiber = (fiber->context.alloc == NULL);
- tf_heap_delete(&fiber->heap_node, &sched->heap);
- tf_uctx_destroy(&fiber->context);
- if (main_fiber)
- free(fiber);
-
- if (num_fibers == 1) {
- /* FIXME: Use proper fiber event*/
- __tf_fiber_wakeup(sched->main_fiber, TF_WAKEUP_IMMEDIATE);
- }
+ tf_heap_delete(&fiber->timeout.heap_node, &fiber->timeout.manager->heap);
+ tf_uctx_destroy(fiber->context);
}
void tf_fiber_put(void *data)
@@ -124,165 +92,60 @@ void tf_fiber_put(void *data)
__tf_fiber_destroy(fiber);
}
-void __tf_fiber_wakeup(void *data, int wakeup_type)
+void tf_fiber_wakeup(struct tf_fiber *f)
{
- struct tf_fiber *fiber = container_of(data, struct tf_fiber, data);
- struct tf_scheduler *sched = fiber->scheduler;
+ struct tf_fiber *self = tf_vmach_get_current_fiber();
+ unsigned int newval, oldval;
- if (fiber->wakeup_type == TF_WAKEUP_NONE) {
- fiber->wakeup_type = wakeup_type;
- tf_list_add_tail(&fiber->queue_node, &sched->running_q);
- }
-}
+ do {
+ oldval = f->flags;
+ if (oldval & TF_FIBERF_WAKEUP_PENDING)
+ return;
+ newval = oldval | TF_FIBERF_WAKEUP_PENDING | TF_FIBERF_RUNNING;
+ } while (!tf_atomic_cmpxchg(&f->flags, oldval, newval));
-void __tf_fiber_wakeup_heapnode(struct tf_heap_node *node)
-{
- __tf_fiber_wakeup(container_of(node, struct tf_fiber, heap_node)->data,
- TF_WAKEUP_TIMEOUT);
+ if (!(oldval & TF_FIBERF_RUNNING))
+ tf_list_add_tail(&f->queue_node, &self->wakeup_q);
}
-int __tf_fiber_schedule(void)
+int tf_fiber_schedule(void)
{
- struct tf_scheduler *sched = tf_scheduler_get_current();
- struct tf_fiber *f = tf_fiber_get_current(), *nf;
- int wakeup;
+ struct tf_fiber *f = tf_vmach_get_current_fiber();
- if (unlikely(f->timeout_change)) {
- if (f->timeout_change & TF_TIMEOUT_CHANGE_NEW_VALUE) {
- if (tf_mtime_diff(f->timeout, tf_scheduler_get_mtime()) <= 0) {
- f->timeout_change = TF_TIMEOUT_CHANGE_NEEDED;
- return TF_WAKEUP_TIMEOUT;
- }
- tf_heap_change(&f->heap_node, &sched->heap, f->timeout);
- } else
- tf_heap_delete(&f->heap_node, &sched->heap);
- f->timeout_change = 0;
- }
+ if (f->timeout.value != 0 &&
+ tf_mtime_diff(f->timeout.value, tf_mtime_now()) <= 0)
+ return -ETIME;
- /* Figure out the next fibre to run */
- if (unlikely(tf_list_empty(&sched->scheduled_q))) {
- tf_list_splice_tail(&sched->running_q,
- &sched->scheduled_q);
- TF_BUG_ON(tf_list_empty(&sched->scheduled_q));
+ if (f->flags & TF_FIBERF_WAKEUP_PENDING) {
+ f->flags = TF_FIBERF_RUNNING;
+ return 0;
}
- nf = tf_list_entry(tf_list_pop(&sched->scheduled_q),
- struct tf_fiber, queue_node);
- sched->active_fiber = nf->data;
- tf_uctx_transfer(&f->context, &nf->context);
-
- wakeup = f->wakeup_type;
- f->wakeup_type = TF_WAKEUP_NONE;
-
- return wakeup;
-}
-
-int __tf_fiber_bind_scheduler(struct tf_scheduler *sched)
-{
- struct tf_fiber *f;
- f = malloc(sizeof(struct tf_fiber));
- if (f == NULL)
- return -ENOMEM;
+ if (f->timeout.value != f->timeout.latched)
+ tf_timeout_adjust(&f->timeout);
- /* Mark currently active main fiber as active */
- *f = (struct tf_fiber) {
- .ref_count = 1,
- .scheduler = sched,
- .queue_node = TF_LIST_INITIALIZER(f->queue_node),
- };
- tf_uctx_create_self(&f->context);
- sched->main_fiber = f->data;
- sched->active_fiber = f->data;
- sched->num_fibers++;
+ f->flags = 0;
+ tf_uctx_transfer(f->context, f->return_context);
+ f->flags = TF_FIBERF_RUNNING;
- /* Schedule scheduler fiber */
- f = container_of((void *) sched, struct tf_fiber, data);
- f->scheduler = sched;
- f->wakeup_type = TF_WAKEUP_IMMEDIATE;
- tf_list_add_tail(&f->queue_node, &sched->running_q);
-
- return 0;
-}
-
-int __tf_fiber_release_scheduler(struct tf_scheduler *sched)
-{
- struct tf_fiber *f;
-
- /* Detach scheduler */
- f = container_of((void *) sched, struct tf_fiber, data);
- tf_list_del(&f->queue_node);
-
- /* Detach main stack from this scheduler */
- f = container_of((void *) sched->main_fiber, struct tf_fiber, data);
- tf_fiber_put(sched->main_fiber);
- sched->main_fiber = NULL;
- sched->num_fibers--;
+ if (f->timeout.value != 0 &&
+ tf_mtime_diff(f->timeout.value, tf_mtime_now()) <= 0)
+ return -ETIME;
return 0;
}
void tf_fiber_exit(void)
{
- struct tf_scheduler *sched = tf_scheduler_get_current();
- struct tf_fiber *f = tf_fiber_get_current();
- struct tf_fiber *schedf = container_of((void *) sched, struct tf_fiber, data);
+ struct tf_fiber *f = tf_vmach_get_current_fiber();
- tf_heap_delete(&f->heap_node, &sched->heap);
- schedf->wakeup_type = TF_WAKEUP_KILL;
- tf_uctx_transfer(&f->context, &schedf->context);
+ if (f->timeout.manager != NULL)
+ tf_timeout_delete(&f->timeout);
+ tf_vmach_get_current()->num_user_fibers--;
+ tf_uctx_transfer(f->context, f->return_context);
TF_BUG_ON(1);
}
void tf_fiber_kill(void *fiber)
{
}
-
-int tf_fiber_yield(void)
-{
- struct tf_scheduler *sched = tf_scheduler_get_current();
- struct tf_fiber *f = tf_fiber_get_current();
-
- TF_BUG_ON(tf_list_hashed(&f->queue_node));
- f->wakeup_type = TF_WAKEUP_IMMEDIATE;
- tf_list_add_tail(&f->queue_node, &sched->running_q);
-
- return __tf_fiber_schedule();
-}
-
-void tf_timeout_push(struct tf_timeout *timeout, tf_mtime_diff_t milliseconds)
-{
- struct tf_fiber *f = tf_fiber_get_current();
- tf_mtime_t abs = tf_scheduler_get_mtime() + milliseconds;
- int active;
-
- if (f->timeout_change)
- active = (f->timeout_change & TF_TIMEOUT_CHANGE_NEW_VALUE);
- else
- active = tf_heap_node_active(&f->heap_node);
-
- if (!active || tf_mtime_diff(abs, f->timeout) < 0) {
- /* Save previous timeout */
- timeout->saved_timeout = f->timeout;
- timeout->timeout_change = TF_TIMEOUT_CHANGE_NEEDED;
- if (active)
- timeout->timeout_change |= TF_TIMEOUT_CHANGE_NEW_VALUE;
-
- /* Make new timeout pending */
- f->timeout = abs;
- f->timeout_change = TF_TIMEOUT_CHANGE_NEEDED
- | TF_TIMEOUT_CHANGE_NEW_VALUE;
- } else {
- timeout->timeout_change = 0;
- }
-}
-
-int __tf_timeout_pop(struct tf_timeout *timeout, int err)
-{
- struct tf_fiber *f = tf_fiber_get_current();
-
- f->timeout = timeout->saved_timeout;
- f->timeout_change = timeout->timeout_change;
- if (err == TF_WAKEUP_TIMEOUT)
- err = TF_WAKEUP_THIS_TIMEOUT;
- return err;
-}
diff --git a/src/ifc.c b/src/ifc.c
new file mode 100644
index 0000000..2dfafe8
--- /dev/null
+++ b/src/ifc.c
@@ -0,0 +1,57 @@
+/* ifc.c - inter fiber communications
+ *
+ * Copyright (C) 2010 Timo Teräs <timo.teras@iki.fi>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 or later as
+ * published by the Free Software Foundation.
+ *
+ * See http://www.gnu.org/ for details.
+ */
+
+#include <libtf/atomic.h>
+#include <libtf/fiber.h>
+
+void tf_ifc_queue(void *fiber, struct tf_ifc *ifc, tf_ifc_handler_t handler)
+{
+ struct tf_fiber *f = container_of(fiber, struct tf_fiber, data);
+ struct tf_ifc *old;
+
+ TF_BUG_ON(ifc->next != NULL);
+ ifc->handler = handler;
+ do {
+ old = f->pending_ifc;
+ ifc->next = old;
+ } while (!tf_atomic_cmpxchg(&f->pending_ifc, old, ifc));
+
+ tf_fiber_wakeup(f);
+}
+
+void tf_ifc_complete(void *fiber, struct tf_ifc *ifc, tf_ifc_handler_t handler)
+{
+ ifc->sender = tf_vmach_get_current_fiber();
+ tf_ifc_queue(fiber, ifc, handler);
+ while (ifc->sender != NULL)
+ tf_fiber_schedule();
+}
+
+void tf_ifc_process_unordered(void)
+{
+ struct tf_fiber *f = tf_vmach_get_current_fiber(), *s;
+ struct tf_ifc *pending, *ifc;
+
+ while (f->pending_ifc != NULL) {
+ pending = tf_atomic_xchg(&f->pending_ifc, NULL);
+ while (pending) {
+ ifc = pending;
+ pending = ifc->next;
+ ifc->handler(f->data, ifc);
+ s = ifc->sender;
+ ifc->next = NULL;
+ ifc->sender = NULL;
+ if (s != NULL)
+ tf_fiber_wakeup(s);
+ }
+ }
+}
diff --git a/src/io-epoll.c b/src/io-epoll.c
index 32aa090..1fc9ca1 100644
--- a/src/io-epoll.c
+++ b/src/io-epoll.c
@@ -16,74 +16,70 @@
#include <sys/epoll.h>
#include <libtf/io.h>
-#include <libtf/scheduler.h>
+#include <libtf/fiber.h>
-struct tf_poll_data {
+struct tf_epoll_data {
int epoll_fd;
- int num_waiters;
};
-static struct tf_poll_data *tf_epoll_get_data(void)
+static void tf_epoll_main(void *ctx)
{
- struct tf_scheduler *sched = tf_scheduler_get_current();
- TF_BUILD_BUG_ON(sizeof(struct tf_poll_data) > sizeof(sched->poll_data));
- return (struct tf_poll_data *) &sched->poll_data;
-}
-
-static void tf_epoll_init(void)
-{
- struct tf_poll_data *pd = tf_epoll_get_data();
+ struct tf_epoll_data *pd = ctx;
+ struct epoll_event events[64];
+ struct tf_fd *fd;
+ int r, i;
- pd->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
- pd->num_waiters = 0;
- TF_BUG_ON(pd->epoll_fd < 0);
-}
+ do {
+ r = epoll_wait(pd->epoll_fd, events, array_size(events), 0);
+ if (r == 0) {
+ /* FIXME: yielding is bad */
+ struct tf_fiber *self = tf_vmach_get_current_fiber();
+ tf_list_add_tail(&self->queue_node, &self->wakeup_q);
+ if (tf_fiber_schedule() == 0)
+ continue;
+ }
-static void tf_epoll_close(void)
-{
- struct tf_poll_data *pd = tf_epoll_get_data();
+ for (i = 0; i < r; i++) {
+ fd = (struct tf_fd *) events[i].data.ptr;
+ tf_fiber_wakeup(fd->fiber);
+ }
+ } while (1);
close(pd->epoll_fd);
}
-static int tf_epoll_poll(tf_mtime_diff_t timeout)
+
+static void *tf_epoll_create(void)
{
- struct tf_poll_data *pd = tf_epoll_get_data();
- struct epoll_event events[64];
- struct tf_fd *fd;
- int r, i, ret;
+ struct tf_epoll_data *d;
- if (timeout == 0 && pd->num_waiters == 0)
- return TF_WAKEUP_TIMEOUT;
+ d = tf_fiber_create(tf_epoll_main, sizeof(struct tf_epoll_data));
+ if (d == NULL)
+ return NULL;
- ret = TF_WAKEUP_TIMEOUT;
- do {
- r = epoll_wait(pd->epoll_fd, events, array_size(events), timeout);
- if (r == 0)
- break;
+ d->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
+ TF_BUG_ON(d->epoll_fd < 0);
- for (i = 0; i < r; i++) {
- fd = (struct tf_fd *) events[i].data.ptr;
- if (likely(fd->events & events[i].events))
- __tf_fiber_wakeup(fd->waiting_fiber, TF_WAKEUP_FD);
- }
- ret = TF_WAKEUP_FD;
- timeout = 0;
- } while (unlikely(r == array_size(events)));
+ tf_fiber_run(tf_fiber_get(d));
- return ret;
+ return d;
}
-static int tf_epoll_fd_created(struct tf_fd *fd)
+static int tf_epoll_fd_created(void *fiber, struct tf_fd *fd)
{
- struct tf_poll_data *pd = tf_epoll_get_data();
+ struct tf_epoll_data *d = fiber;
struct epoll_event ev;
int r;
ev = (struct epoll_event) {
- .events = EPOLLIN | EPOLLOUT | EPOLLET,
+ .events = EPOLLET,
.data.ptr = fd,
};
- r = epoll_ctl(pd->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
+ if (fd->flags & TF_FD_READ)
+ ev.events |= EPOLLIN;
+ if (fd->flags & TF_FD_WRITE)
+ ev.events |= EPOLLOUT;
+
+ r = epoll_ctl(d->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
if (unlikely(r < 0)) {
TF_BUG_ON(errno == EEXIST);
r = -errno;
@@ -93,46 +89,18 @@ static int tf_epoll_fd_created(struct tf_fd *fd)
return 0;
}
-static int tf_epoll_fd_destroyed(struct tf_fd *fd)
+static int tf_epoll_fd_destroyed(void *fiber, struct tf_fd *fd)
{
- struct tf_poll_data *pd = tf_epoll_get_data();
+ struct tf_epoll_data *d = fiber;
- if (fd->flags & TF_FD_AUTOCLOSE)
- return 0;
+ if (!(fd->flags & TF_FD_AUTOCLOSE))
+ epoll_ctl(d->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
- epoll_ctl(pd->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
return 0;
}
-static void tf_epoll_fd_monitor(struct tf_fd *fd, int events)
-{
- struct tf_poll_data *pd = tf_epoll_get_data();
-
- TF_BUG_ON(fd->waiting_fiber != NULL);
- fd->events = EPOLLERR | EPOLLHUP;
- if (events & TF_POLL_READ)
- fd->events |= EPOLLIN;
- if (events & TF_POLL_WRITE)
- fd->events |= EPOLLOUT;
- fd->waiting_fiber = tf_scheduler_get_current()->active_fiber;
- pd->num_waiters++;
-}
-
-static void tf_epoll_fd_unmonitor(struct tf_fd *fd)
-{
- struct tf_poll_data *pd = tf_epoll_get_data();
-
- fd->waiting_fiber = NULL;
- fd->events = 0;
- pd->num_waiters--;
-}
-
struct tf_poll_hooks tf_epoll_hooks = {
- .init = tf_epoll_init,
- .close = tf_epoll_close,
- .poll = tf_epoll_poll,
+ .create = tf_epoll_create,
.fd_created = tf_epoll_fd_created,
.fd_destroyed = tf_epoll_fd_destroyed,
- .fd_monitor = tf_epoll_fd_monitor,
- .fd_unmonitor = tf_epoll_fd_unmonitor,
};
diff --git a/src/io-unix.c b/src/io-unix.c
index d80592a..7cd4fd0 100644
--- a/src/io-unix.c
+++ b/src/io-unix.c
@@ -17,10 +17,8 @@
#include <string.h>
#include <libtf/io.h>
-#include <libtf/scheduler.h>
-
-#define TF_POLL_HOOKS \
-struct tf_poll_hooks *poller = tf_scheduler_get_current()->poller;
+#include <libtf/fiber.h>
+#include <libtf/vmach.h>
static inline int tf_sockaddr_len(const struct tf_sockaddr *addr)
{
@@ -38,14 +36,14 @@ static inline int tf_sockaddr_len(const struct tf_sockaddr *addr)
int tf_open_fd(struct tf_fd *fd, int kfd, int flags)
{
- TF_POLL_HOOKS;
+ struct tf_vmach *vm = tf_vmach_get_current();
int r;
fd->fd = kfd;
fd->flags = flags;
- fd->waiting_fiber = NULL;
+ fd->fiber = tf_vmach_get_current_fiber();
- r = poller->fd_created(fd);
+ r = vm->poll_ops->fd_created(vm->poll_fiber, fd);
if (r < 0) {
if (flags & TF_FD_AUTOCLOSE)
close(kfd);
@@ -88,10 +86,10 @@ int tf_open(struct tf_fd *fd, const char *pathname, int flags)
int tf_close(struct tf_fd *fd)
{
- TF_POLL_HOOKS;
+ struct tf_vmach *vm = tf_vmach_get_current();
int r;
- poller->fd_destroyed(fd);
+ vm->poll_ops->fd_destroyed(vm->poll_fiber, fd);
if (fd->flags & TF_FD_AUTOCLOSE) {
r = close(fd->fd);
if (unlikely(r == -1))
@@ -104,11 +102,9 @@ int tf_close(struct tf_fd *fd)
int tf_read_fully(struct tf_fd *fd, void *buf, size_t count)
{
- TF_POLL_HOOKS;
ssize_t n;
- int r;
+ int r = 0;
- poller->fd_monitor(fd, TF_POLL_READ);
do {
n = read(fd->fd, buf, count);
if (n == count) {
@@ -132,20 +128,17 @@ int tf_read_fully(struct tf_fd *fd, void *buf, size_t count)
continue;
}
- r = __tf_fiber_schedule();
- } while (r == TF_WAKEUP_FD);
- poller->fd_unmonitor(fd);
+ r = tf_fiber_schedule();
+ } while (r != 0);
return -r;
}
int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count)
{
- TF_POLL_HOOKS;
ssize_t n;
- int r;
+ int r = 0;
- poller->fd_monitor(fd, TF_POLL_WRITE);
do {
n = write(fd->fd, buf, count);
if (n == count) {
@@ -166,19 +159,16 @@ int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count)
continue;
}
- r = __tf_fiber_schedule();
- } while (r == TF_WAKEUP_FD);
- poller->fd_unmonitor(fd);
+ r = tf_fiber_schedule();
+ } while (r != 0);
return -r;
}
ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count)
{
- TF_POLL_HOOKS;
ssize_t n;
- poller->fd_monitor(fd, TF_POLL_READ);
do {
n = read(fd->fd, buf, count);
if (n >= 0)
@@ -189,19 +179,16 @@ ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count)
n = -errno;
break;
}
- n = __tf_fiber_schedule();
- } while (n == TF_WAKEUP_FD);
- poller->fd_unmonitor(fd);
+ n = tf_fiber_schedule();
+ } while (n != 0);
return n;
}
ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count)
{
- TF_POLL_HOOKS;
ssize_t n;
- poller->fd_monitor(fd, TF_POLL_WRITE);
do {
n = write(fd->fd, buf, count);
if (n >= 0)
@@ -212,9 +199,8 @@ ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count)
n = -errno;
break;
}
- n = __tf_fiber_schedule();
- } while (n == TF_WAKEUP_FD);
- poller->fd_unmonitor(fd);
+ n = tf_fiber_schedule();
+ } while (n != 0);
return n;
}
@@ -261,7 +247,6 @@ int tf_listen(struct tf_fd *fd, int backlog)
int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd,
struct tf_sockaddr *from)
{
- TF_POLL_HOOKS;
int r, tfdf;
struct sockaddr *addr = NULL;
socklen_t al, *pal = NULL;
@@ -272,24 +257,19 @@ int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd,
pal = &al;
}
- tfdf = TF_FD_AUTOCLOSE | (listen_fd->flags & TF_FD_STREAM_ORIENTED);
- tfdf |= TF_FD_SET_CLOEXEC;
+ tfdf = TF_FD_AUTOCLOSE | TF_FD_ALREADY_NONBLOCKING |
+ (listen_fd->flags & TF_FD_STREAM_ORIENTED);
- poller->fd_monitor(listen_fd, TF_POLL_READ);
do {
- /* FIXME: use accept4 if available */
- r = accept(listen_fd->fd, addr, pal);
+ r = accept4(listen_fd->fd, addr, pal, SOCK_NONBLOCK|SOCK_CLOEXEC);
if (r >= 0)
break;
if (errno == EINTR)
continue;
- if (errno != EAGAIN) {
- poller->fd_unmonitor(listen_fd);
+ if (errno != EAGAIN)
return -errno;
- }
- r = __tf_fiber_schedule();
- } while (r == TF_WAKEUP_FD);
- poller->fd_unmonitor(listen_fd);
+ r = tf_fiber_schedule();
+ } while (r != 0);
if (r < 0)
return r;
@@ -298,7 +278,6 @@ int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd,
int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to)
{
- TF_POLL_HOOKS;
socklen_t l = sizeof(int);
int r, err;
@@ -310,10 +289,8 @@ int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to)
return -errno;
/* Wait for socket to become readable */
- poller->fd_monitor(fd, TF_POLL_WRITE);
- r = __tf_fiber_schedule();
- poller->fd_unmonitor(fd);
- if (r != TF_WAKEUP_FD)
+ r = tf_fiber_schedule();
+ if (r != 0)
return r;
/* Check for error */
@@ -327,7 +304,6 @@ ssize_t tf_recvmsg(struct tf_fd *fd,
struct tf_sockaddr *to,
void *buf, size_t len)
{
- TF_POLL_HOOKS;
struct iovec iov;
struct msghdr msg;
struct cmsghdr *cmsg;
@@ -347,7 +323,6 @@ ssize_t tf_recvmsg(struct tf_fd *fd,
.msg_iovlen = 1,
};
- poller->fd_monitor(fd, TF_POLL_READ);
do {
r = recvmsg(fd->fd, &msg, 0);
if (r >= 0)
@@ -356,9 +331,8 @@ ssize_t tf_recvmsg(struct tf_fd *fd,
r = -errno;
break;
}
- r = __tf_fiber_schedule();
- } while (r == TF_WAKEUP_FD);
- poller->fd_unmonitor(fd);
+ r = tf_fiber_schedule();
+ } while (r != 0);
if (r < 0 || to == NULL)
return r;
@@ -382,7 +356,6 @@ ssize_t tf_sendmsg(struct tf_fd *fd,
const struct tf_sockaddr *to,
const void *buf, size_t len)
{
- TF_POLL_HOOKS;
struct msghdr msg;
struct iovec iov;
struct {
@@ -411,7 +384,6 @@ ssize_t tf_sendmsg(struct tf_fd *fd,
msg.msg_controllen = sizeof(cmsg);
}
- poller->fd_monitor(fd, TF_POLL_WRITE);
do {
r = sendmsg(fd->fd, &msg, 0);
if (r >= 0)
@@ -420,9 +392,8 @@ ssize_t tf_sendmsg(struct tf_fd *fd,
r = -errno;
break;
}
- r = __tf_fiber_schedule();
- } while (r == TF_WAKEUP_FD);
- poller->fd_unmonitor(fd);
+ r = tf_fiber_schedule();
+ } while (r != 0);
return r;
}
diff --git a/src/scheduler.c b/src/scheduler.c
deleted file mode 100644
index a103d0a..0000000
--- a/src/scheduler.c
+++ /dev/null
@@ -1,134 +0,0 @@
-/* scheduler.c - fiber scheduling
- *
- * Copyright (C) 2009-2010 Timo Teräs <timo.teras@iki.fi>
- * All rights reserved.
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 or later as
- * published by the Free Software Foundation.
- *
- * See http://www.gnu.org/ for details.
- */
-
-#include <time.h>
-#include <libtf/scheduler.h>
-#include <libtf/io.h>
-
-/* FIXME: should be in thread local storage */
-extern struct tf_poll_hooks tf_epoll_hooks;
-struct tf_scheduler *__tf_scheduler;
-
-static void update_time(struct tf_scheduler *sched)
-{
- struct timespec ts;
-
- clock_gettime(CLOCK_MONOTONIC, &ts);
- sched->scheduler_time = ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
-}
-
-static void process_heap(struct tf_scheduler *sched)
-{
- struct tf_heap_node *node;
- tf_mtime_t now = sched->scheduler_time;
-
- while (!tf_heap_empty(&sched->heap) &&
- tf_mtime_diff(now, tf_heap_get_value(&sched->heap)) >= 0) {
- node = tf_heap_get_node(&sched->heap);
- tf_heap_delete(node, &sched->heap);
- __tf_fiber_wakeup_heapnode(node);
- }
-}
-
-void tf_scheduler_fiber(void *data)
-{
- struct tf_scheduler *sched = (struct tf_scheduler *) data;
-
- do {
- tf_mtime_diff_t timeout;
-
- update_time(sched);
- if (!tf_list_empty(&sched->scheduled_q) ||
- !tf_list_empty(&sched->running_q)) {
- timeout = 0;
- } else if (!tf_heap_empty(&sched->heap)) {
- timeout = tf_mtime_diff(
- tf_heap_get_value(&sched->heap),
- tf_scheduler_get_mtime());
- if (timeout < 0)
- timeout = 0;
- } else {
- timeout = -1;
- }
-
- if (sched->poller->poll(timeout) == TF_WAKEUP_TIMEOUT &&
- timeout >= 0) {
- sched->scheduler_time += timeout;
- process_heap(sched);
- }
-
- if (tf_fiber_yield() == TF_WAKEUP_KILL) {
- do {
- tf_fiber_put(sched->active_fiber);
- sched->active_fiber = sched;
- } while (__tf_fiber_schedule() == TF_WAKEUP_KILL);
- }
- } while (1);
-}
-
-struct tf_scheduler *tf_scheduler_create(void)
-{
- struct tf_scheduler *sched;
-
- sched = __tf_fiber_create(tf_scheduler_fiber,
- sizeof(struct tf_scheduler));
-
- *sched = (struct tf_scheduler) {
- .scheduled_q = TF_LIST_HEAD_INITIALIZER(sched->scheduled_q),
- .running_q = TF_LIST_HEAD_INITIALIZER(sched->running_q),
- };
-
- return sched;
-}
-
-int tf_scheduler_enable(struct tf_scheduler *sched)
-{
- struct tf_scheduler *s = sched;
-
- if (s == NULL) {
- s = tf_scheduler_create();
- if (s == NULL)
- return -ENOMEM;
- }
- if (s->main_fiber != NULL)
- return -EBUSY;
-
- __tf_fiber_bind_scheduler(s);
- __tf_scheduler = s;
- s->poller = &tf_epoll_hooks;
- s->poller->init();
- update_time(s);
-
- if (sched != NULL)
- tf_scheduler_get(sched);
-
- return 0;
-}
-
-void tf_scheduler_disable(void)
-{
- struct tf_scheduler *sched = __tf_scheduler;
-
- if (sched == NULL ||
- sched->main_fiber != sched->active_fiber)
- return;
-
- /* sleep until no others */
- while (sched->num_fibers > 1)
- __tf_fiber_schedule();
-
- sched->poller->close();
- __tf_scheduler = NULL;
- __tf_fiber_release_scheduler(sched);
- tf_heap_destroy(&sched->heap);
- tf_fiber_put(sched);
-}
diff --git a/src/timeout.c b/src/timeout.c
new file mode 100644
index 0000000..fc7acd2
--- /dev/null
+++ b/src/timeout.c
@@ -0,0 +1,120 @@
+/* timeout.c - timerfd based fiber wakeups
+ *
+ * Copyright (C) 2010 Timo Teräs <timo.teras@iki.fi>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 or later as
+ * published by the Free Software Foundation.
+ *
+ * See http://www.gnu.org/ for details.
+ */
+
+#include <libtf/fiber.h>
+#include <libtf/io.h>
+#include <sys/timerfd.h>
+
+struct tf_timeout_fiber {
+ struct tf_timeout_manager manager;
+ struct tf_fd fd;
+ tf_mtime_t programmed;
+};
+
+static tf_mtime_t tf_mtime_cached;
+
+tf_mtime_t tf_mtime_now(void)
+{
+ return tf_mtime_cached;
+}
+
+tf_mtime_t tf_mtime_update(void)
+{
+ struct timespec ts;
+
+ clock_gettime(CLOCK_MONOTONIC, &ts);
+ tf_mtime_cached = ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
+
+ return tf_mtime_cached;
+}
+
+static void tf_timeout_process_heap(struct tf_timeout_fiber *t)
+{
+ struct tf_heap_head *heap = &t->manager.heap;
+ struct tf_heap_node *node;
+ tf_mtime_t now = tf_mtime_update();
+ tf_mtime_diff_t timeout, value;
+
+ while (!tf_heap_empty(heap)) {
+ value = tf_heap_get_value(heap);
+ timeout = tf_mtime_diff(value, now);
+ if (timeout > 0) {
+ if (t->programmed != value) {
+ struct itimerspec its = {
+ .it_value.tv_sec = timeout / 1000,
+ .it_value.tv_nsec = (timeout % 1000) * 1000000,
+ };
+ t->programmed = value;
+ timerfd_settime(t->fd.fd, 0, &its, NULL);
+ }
+ break;
+ }
+
+ node = tf_heap_get_node(heap);
+ tf_heap_delete(node, heap);
+ tf_fiber_wakeup(container_of(node, struct tf_fiber, timeout.heap_node));
+ }
+}
+
+static void tf_timeout_worker(void *ctx)
+{
+ do {
+ tf_ifc_process_unordered();
+ tf_timeout_process_heap(ctx);
+ } while (tf_fiber_schedule() == 0);
+}
+
+struct tf_timeout_fiber *tf_timeout_fiber_create(void)
+{
+ struct tf_timeout_fiber *f;
+
+ f = tf_fiber_create(tf_timeout_worker, sizeof(struct tf_timeout_fiber));
+ if (f == NULL)
+ return f;
+
+ if (tf_open_fd(&f->fd,
+ timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK|TFD_CLOEXEC),
+ TF_FD_READ | TF_FD_ALREADY_NONBLOCKING | TF_FD_AUTOCLOSE) != 0) {
+ tf_fiber_put(f);
+ return NULL;
+ }
+ f->fd.fiber = container_of((void *) f, struct tf_fiber, data);
+ tf_fiber_run(tf_fiber_get(f));
+
+ return f;
+}
+
+static void tf_timeout_adjust_ifc(void *fiber, struct tf_ifc *ifc)
+{
+ struct tf_timeout_fiber *t = fiber;
+ struct tf_timeout_client *c = container_of(ifc, struct tf_timeout_client, ifc);
+ tf_mtime_t val;
+
+ val = c->value;
+ if (val == 0)
+ tf_heap_delete(&c->heap_node, &t->manager.heap);
+ else
+ tf_heap_change(&c->heap_node, &t->manager.heap, val);
+ c->latched = val;
+}
+
+void tf_timeout_adjust(struct tf_timeout_client *tc)
+{
+ tf_ifc_queue(tc->manager, &tc->ifc, tf_timeout_adjust_ifc);
+}
+
+void tf_timeout_delete(struct tf_timeout_client *tc)
+{
+ tc->value = 0;
+ tc->latched = 0;
+ tf_ifc_complete(tc->manager, &tc->ifc, tf_timeout_adjust_ifc);
+}
diff --git a/src/uctx.h b/src/uctx.h
index 3ec30c4..353b732 100644
--- a/src/uctx.h
+++ b/src/uctx.h
@@ -18,19 +18,18 @@
#ifdef VALGRIND
#include <valgrind/valgrind.h>
-#else
-#define VALGRIND_STACK_REGISTER(stack_base, size) 0
-#define VALGRIND_STACK_DEREGISTER(stack_id)
#endif
#define STACK_GUARD 0xbad57ac4
struct tf_uctx {
- int *stack_guard;
- size_t size;
- void *alloc;
- void *current_sp;
- unsigned int stack_id;
+ int * stack_guard;
+ size_t size;
+ void * alloc;
+ void * current_sp;
+#ifdef VALGRIND
+ unsigned int stack_id;
+#endif
};
#if defined(__i386__)
@@ -89,13 +88,14 @@ static inline void stack_push_ptr(void **stackptr, void *ptr)
}
-static inline void tf_uctx_create_self(struct tf_uctx *uctx)
+static inline tf_uctx_t tf_uctx_create_self(struct tf_uctx *uctx)
{
static int dummy_guard = STACK_GUARD;
*uctx = (struct tf_uctx) {
.stack_guard = &dummy_guard,
};
+ return uctx;
}
static inline void *
@@ -118,20 +118,24 @@ tf_uctx_create_embedded(
/* Create initial stack frame (cdecl convention) */
stack = stack_pointer(stack_base, size);
- user_data = stack_push(&stack, TF_ALIGN(private_size, 64));
+ user_data = stack_push(&stack, TF_ALIGN(private_size, 16));
+ uctx = stack_push(&stack, TF_ALIGN(sizeof(struct tf_uctx), 16));
stack_push_ptr(&stack, main_argument);
stack_push_ptr(&stack, user_data);
stack_push_ptr(&stack, NULL);
stack_push_ptr(&stack, stack_frame_main); /* eip */
stack_push_ptr(&stack, NULL); /* ebp */
- uctx = user_data + uctx_offset;
+ *((tf_uctx_t *) (user_data + uctx_offset)) = uctx;
+
*uctx = (struct tf_uctx) {
.stack_guard = stack_guard(stack_base, size),
.alloc = stack_base,
.size = size,
.current_sp = stack,
+#ifdef VALGRIND
.stack_id = VALGRIND_STACK_REGISTER(stack_base, stack_base+size),
+#endif
};
*uctx->stack_guard = STACK_GUARD;
@@ -139,18 +143,25 @@ tf_uctx_create_embedded(
}
static inline
-void tf_uctx_destroy(struct tf_uctx *uctx)
+void tf_uctx_destroy(tf_uctx_t ctx)
{
+ struct tf_uctx *uctx = ctx;
+
if (uctx->alloc != NULL) {
+#ifdef VALGRIND
VALGRIND_STACK_DEREGISTER(uctx->stack_id);
+#endif
tf_bmem_free(uctx->alloc, uctx->size);
}
}
static inline
-void tf_uctx_transfer(struct tf_uctx *from, struct tf_uctx *to)
+void tf_uctx_transfer(tf_uctx_t from, tf_uctx_t to)
{
+ struct tf_uctx *ufrom = from;
+ struct tf_uctx *uto = to;
+
/* Switch stack pointers */
- TF_BUG_ON(*from->stack_guard != STACK_GUARD);
- switch_fiber(from, to);
+ TF_BUG_ON(*ufrom->stack_guard != STACK_GUARD);
+ switch_fiber(ufrom, uto);
}
diff --git a/src/vmach.c b/src/vmach.c
new file mode 100644
index 0000000..a9ca446
--- /dev/null
+++ b/src/vmach.c
@@ -0,0 +1,120 @@
+#include <libtf/defines.h>
+#include <libtf/list.h>
+#include <libtf/vmach.h>
+#include <libtf/io.h>
+#include "uctx.h"
+
+__thread struct tf_fiber *tf_current_fiber;
+__thread struct tf_vcpu *tf_current_vcpu;
+__thread struct tf_vmach *tf_current_vmach;
+
+extern struct tf_poll_hooks tf_epoll_hooks;
+
+struct tf_vmachine {
+ struct tf_vmach vmach;
+ struct tf_uctx startup_uctx;
+ void *machine_init_fiber;
+};
+
+static void tf_vcpu_main(void *fiber_data)
+{
+ struct tf_vcpu *vcpu = fiber_data;
+ struct tf_vmach *vmach = vcpu->machine;
+ struct tf_fiber *self = container_of(fiber_data, struct tf_fiber, data);
+ struct tf_fiber *f;
+
+ tf_current_vmach = vmach;
+ tf_current_vcpu = vcpu;
+
+ while (vmach->num_user_fibers != 0) {
+ if (tf_list_empty(&vmach->run_q)) {
+ /* sleep */
+ continue;
+ }
+
+ f = tf_list_entry(tf_list_pop(&vmach->run_q),
+ struct tf_fiber, queue_node);
+
+ f->return_context = self->context;
+ tf_current_fiber = f;
+ tf_uctx_transfer(self->context, f->context);
+ tf_list_splice_tail(&f->wakeup_q, &vmach->run_q);
+ }
+}
+
+static void tf_vmach_main(void *fiber_data)
+{
+ struct tf_fiber *self = container_of(fiber_data, struct tf_fiber, data);
+ struct tf_vcpu *vcpu = fiber_data;
+ struct tf_vmach *vmach = vcpu->machine;
+
+ tf_current_vmach = vmach;
+ tf_current_vcpu = vcpu;
+ tf_current_fiber = self;
+
+ /* Initialize IO subsystem */
+ vmach->poll_ops = &tf_epoll_hooks;
+ vmach->poll_fiber = vmach->poll_ops->create();
+ vmach->timeout_fiber = tf_timeout_fiber_create();
+ vmach->startup_fiber.timeout.manager = vmach->timeout_fiber;
+
+ /* Run the initial fiber */
+ tf_fiber_wakeup(&vmach->startup_fiber);
+
+ /* Use main thread as a regular vcpu */
+ vmach->num_user_fibers = 1;
+ tf_list_splice_tail(&self->wakeup_q, &vmach->run_q);
+ tf_vcpu_main(vcpu);
+
+ /* Kill all stuff */
+
+ /* Return to main fiber */
+ vmach->startup_fiber.return_context = NULL;
+ tf_current_fiber = NULL;
+ tf_current_vcpu = NULL;
+ tf_current_vmach = NULL;
+
+ tf_uctx_transfer(self->context, vmach->startup_fiber.context);
+}
+
+void tf_vmach_start(void)
+{
+ struct tf_vmachine *vmach;
+ struct tf_vcpu *vcpu;
+
+ TF_BUG_ON(tf_current_vcpu != NULL);
+
+ /* Create a self-fiber so we can surrender control to vcpu */
+ vmach = calloc(1, sizeof(struct tf_vmachine));
+ vmach->vmach.startup_fiber = (struct tf_fiber) {
+ .ref_count = 1,
+ .queue_node = TF_LIST_INITIALIZER(vmach->vmach.startup_fiber.queue_node),
+ .wakeup_q = TF_LIST_HEAD_INITIALIZER(vmach->vmach.startup_fiber.wakeup_q),
+ .context = tf_uctx_create_self(&vmach->startup_uctx),
+ };
+ tf_list_init_head(&vmach->vmach.run_q);
+ vcpu = tf_fiber_create(tf_vmach_main, sizeof(struct tf_vcpu));
+ vmach->machine_init_fiber = vcpu;
+ vcpu->machine = &vmach->vmach;
+
+ /* Create manager fiber to initialize vcpu */
+ tf_uctx_transfer(vmach->vmach.startup_fiber.context,
+ container_of((void *) vcpu, struct tf_fiber, data)->context);
+}
+
+void tf_vmach_stop(void)
+{
+ struct tf_fiber *self = tf_vmach_get_current_fiber();
+ struct tf_vmach *vmach = tf_vmach_get_current();
+
+ TF_BUG_ON(self != &vmach->startup_fiber);
+
+ /* Wait for the vmachine to stop */
+ tf_vmach_get_current()->num_user_fibers--;
+ while (self->return_context != NULL)
+ tf_fiber_schedule();
+
+ /* And clean up */
+ tf_uctx_destroy(vmach->startup_fiber.context);
+ free(vmach);
+}
diff --git a/test/httpget.c b/test/httpget.c
index 29f7f14..876b63c 100644
--- a/test/httpget.c
+++ b/test/httpget.c
@@ -14,6 +14,7 @@ static void ping_fiber(void *ptr)
struct ctx *ctx = (struct ctx*) ptr;
struct tf_sockaddr host;
struct tf_fd fd;
+ struct tf_timeout to;
char buf[128];
int bytes = 0, r;
const char *req = "GET / HTTP/1.0\r\n\r\n";
@@ -27,7 +28,7 @@ static void ping_fiber(void *ptr)
if (r < 0)
goto err;
- r = tf_timed(tf_connect(&fd, &host), 10000);
+ r = tf_timed(&to, tf_connect(&fd, &host), 10000);
if (r < 0)
goto err_close;
@@ -49,11 +50,11 @@ int main(int argc, char **argv)
struct ctx *c;
int i;
- tf_scheduler_enable(NULL);
+ tf_vmach_start();
for (i = 1; i < argc; i++) {
c = tf_fiber_create(ping_fiber, sizeof(struct ctx));
c->hostname = argv[i];
- tf_fiber_put(c);
+ tf_fiber_run(c);
}
- tf_scheduler_disable();
+ tf_vmach_stop();
}
diff --git a/test/read.c b/test/read.c
index 0dc72cc..9963a62 100644
--- a/test/read.c
+++ b/test/read.c
@@ -34,8 +34,8 @@ static void io_fiber(void *ptr)
int main(int argc, char **argv)
{
- tf_scheduler_enable(NULL);
- tf_fiber_put(tf_fiber_create(time_fiber, 0));
- tf_fiber_put(tf_fiber_create(io_fiber, 0));
- tf_scheduler_disable();
+ tf_vmach_start();
+ tf_fiber_run(tf_fiber_create(time_fiber, 0));
+ tf_fiber_run(tf_fiber_create(io_fiber, 0));
+ tf_vmach_stop();
}
diff --git a/test/simple1.c b/test/simple1.c
index 6f40f8d..b6de289 100644
--- a/test/simple1.c
+++ b/test/simple1.c
@@ -10,7 +10,7 @@ static void work_fiber(void *ptr)
struct ctx *c = (struct ctx*) ptr;
printf("Hello%d.1\n", c->id);
- tf_fiber_yield();
+ tf_msleep(1);
printf("Hello%d.2\n", c->id);
}
@@ -19,11 +19,11 @@ int main(int argc, char **argv)
struct ctx *c;
int i;
- tf_scheduler_enable(NULL);
+ tf_vmach_start();
for (i = 0; i < 6; i++) {
c = tf_fiber_create(work_fiber, sizeof(struct ctx));
c->id = i;
- tf_fiber_put(c);
+ tf_fiber_run(c);
}
- tf_scheduler_disable();
+ tf_vmach_stop();
}
diff --git a/test/sleep.c b/test/sleep.c
index 8e225a7..dd71d91 100644
--- a/test/sleep.c
+++ b/test/sleep.c
@@ -21,10 +21,10 @@ int main(int argc, char **argv)
struct ctx *c;
int i;
- tf_scheduler_enable(NULL);
+ tf_vmach_start();
for (i = 0; i < 1000; i++) {
c = tf_fiber_create(work_fiber, sizeof(struct ctx));
- tf_fiber_put(c);
+ tf_fiber_run(c);
}
- tf_scheduler_disable();
+ tf_vmach_stop();
}