diff options
-rw-r--r-- | DESIGN | 77 | ||||
-rw-r--r-- | include/libtf/atomic.h | 11 | ||||
-rw-r--r-- | include/libtf/defines.h | 8 | ||||
-rw-r--r-- | include/libtf/fiber.h | 141 | ||||
-rw-r--r-- | include/libtf/io.h | 29 | ||||
-rw-r--r-- | include/libtf/scheduler.h | 67 | ||||
-rw-r--r-- | include/libtf/tf.h | 2 | ||||
-rw-r--r-- | include/libtf/vmach.h | 49 | ||||
-rw-r--r-- | src/TFbuild | 2 | ||||
-rw-r--r-- | src/fiber.c | 259 | ||||
-rw-r--r-- | src/ifc.c | 57 | ||||
-rw-r--r-- | src/io-epoll.c | 122 | ||||
-rw-r--r-- | src/io-unix.c | 87 | ||||
-rw-r--r-- | src/scheduler.c | 134 | ||||
-rw-r--r-- | src/timeout.c | 120 | ||||
-rw-r--r-- | src/uctx.h | 41 | ||||
-rw-r--r-- | src/vmach.c | 120 | ||||
-rw-r--r-- | test/httpget.c | 9 | ||||
-rw-r--r-- | test/read.c | 8 | ||||
-rw-r--r-- | test/simple1.c | 8 | ||||
-rw-r--r-- | test/sleep.c | 6 |
21 files changed, 719 insertions, 638 deletions
@@ -1,19 +1,72 @@ SCHEDULER -- 4-heap (or 2-heap) with linked nodes -- epoll -- edge-triggered monitoring for file i/o (no syscalls to modify fdset) +- worker thread pool executes ready to be run fibers +- thread pool executes fibers from FIFO queues (one per each priority) +- possible to make main thread separate scheduler so we can bind some + fibers to run in main thread only (not needed?) + +THE BIG SCHEDULING QUEUE (shared queue with all worker threads) + --- works also between machines, in which case we just always queue + work items to remote vmach (need to send wakeups too) + - steal first item from the local cache if any + - mutex lock + - if stuff to push + - while idle_vcpu not empty + - pop idle_vcpu from list + - push item to idle_vcpu + - push rest of items to global lists accordingly + - mutex unlock + - call futex_wake for all idle_cpu's we stuffed + - return stolen item + - if stuff on list + - pop item from list + - mutex unlock + - return popped_item + - insert self to idle_vcpu list + - mutex unlock + - futex_wait on local_pointer + - return local_pointer + - mutex unlock + +WAKEUP + - as minimum keep wakeup bitfield for most common types, and use + prepare sleep, and sleep primitives with acceptable wakeup reasons + - might add so that wakeupper calls wakeupee to execute their wakeup + condition check and ask for rescheduling based on that or not; this way + empty spin does not require pushing to scheduler queue and thread + pool wakeup + +IO SLEEPING/DISPATCHING +- epoll in edge-triggered monitoring for file i/o +- epoll_wait done in separate thread (or possibly as regular fiber) + whose sole purpose is to wakeup threads for thread pool - signalfd for signal handling -- eventfd for thread pool wakeup +- timerfd for timeout handling + +INTER FIBRE CALLS (IFC) +- sends a callback to be executed under some other fibre +- executed at tf_ifc_process() points, or at tf_exit() time +- might need tf_ifc_wait() for synchronizing with ifc completion +- 
sending uses atomic LIFO (single atomic cmpxchg) +- receive does LIFO flush (single atomic xchg) and reverses order(?) +- need way to check if IFC is queued or not (so it's safe to reuse it) + +TIMEOUTS +- one (or more) fibers serving as alarm generators +- 4-heap with mremappable heap array +- receives IFC calls from other threads +- receives io wakeups from timerfd +- sends async signal to fiber after timeout FIBERS -- timer node on fiber control data (for delayed heapify; and reuse on timeouts) - fd, signal, pid wait struct on stack of wait function - fiber_kill can interrupt wait -SCHEDULER <-> THREAD POOL -- scheduler queues to thread pool array fifo queue, - - semaphore protects threads so no underqueueing happens - - eventfd notifies scheduler when queue has free space again - - head/tail updated atomically and item position recovered -- thread pool atomically pushes to resume atomic stack and sets eventfd - - scheduler thread atomically pops all and updates fiber states +FIBRE WAKE UP QUEUES +- mostly not needed (alarms, io uses async signal wakeups) +- for mutex, semaphores, possibly send io +- use thread mutexes for list access +- wait on single queue only(?) + +FIBRE MUTEX +- simple wakeup queue, sleep on queue, uninterruptible? 
+ diff --git a/include/libtf/atomic.h b/include/libtf/atomic.h index ec9f1e0..8ebe5f8 100644 --- a/include/libtf/atomic.h +++ b/include/libtf/atomic.h @@ -13,7 +13,14 @@ #ifndef TF_ATOMIC_H #define TF_ATOMIC_H -#define tf_atomic_inc(var) __sync_add_and_fetch(&(var), 1) -#define tf_atomic_dec(var) __sync_add_and_fetch(&(var), -1) +#define tf_atomic_inc(var) \ + __sync_add_and_fetch(&(var), 1) +#define tf_atomic_dec(var) \ + __sync_add_and_fetch(&(var), -1) + +#define tf_atomic_cmpxchg(ptr, old, new) \ + __sync_bool_compare_and_swap(ptr, old, new) +#define tf_atomic_xchg(ptr, new) \ + ((typeof(*(ptr)))__sync_lock_test_and_set(ptr, new)) #endif diff --git a/include/libtf/defines.h b/include/libtf/defines.h index 8e39c7e..44ead8a 100644 --- a/include/libtf/defines.h +++ b/include/libtf/defines.h @@ -57,6 +57,7 @@ #define attribute_warn_unused_result __attribute__((warn_unused_result)) #define attribute_deprecated __attribute__((deprecated)) +/* FIXME: glibc fprintf requires 8kB on stack */ #define TF_BUG_ON(cond) if (unlikely(cond)) { \ fprintf(stderr, "BUG: failure at %s:%d/%s(): %s!\n", \ __FILE__, __LINE__, __func__, #cond); \ @@ -68,14 +69,19 @@ #define TF_EMPTY_ARRAY 0 +#define TF_BIT(n) (1 << (n)) + #ifndef TF_STACK_SIZE -#define TF_STACK_SIZE 4096 +/* FIXME: glibc fprintf requires 8kB on stack */ +#define TF_STACK_SIZE (4*4096) #endif /* Monotonic time */ typedef uint32_t tf_mtime_t; typedef int32_t tf_mtime_diff_t; +tf_mtime_t tf_mtime_now(void); + static inline tf_mtime_diff_t tf_mtime_diff(tf_mtime_t a, tf_mtime_t b) { diff --git a/include/libtf/fiber.h b/include/libtf/fiber.h index d5c6153..f97e963 100644 --- a/include/libtf/fiber.h +++ b/include/libtf/fiber.h @@ -16,64 +16,127 @@ #include <errno.h> #include <libtf/defines.h> #include <libtf/heap.h> +#include <libtf/list.h> -/* Fiber wakeup reasons */ -#define TF_WAKEUP_NONE 0 -#define TF_WAKEUP_IMMEDIATE -EAGAIN -#define TF_WAKEUP_KILL -EINTR -#define TF_WAKEUP_TIMEOUT -ETIME -#define TF_WAKEUP_THIS_TIMEOUT 
-ETIMEDOUT -#define TF_WAKEUP_FD -EIO +/* Inter-fibre calls */ +struct tf_fiber; +struct tf_ifc; +typedef void (*tf_ifc_handler_t)(void *fiber, struct tf_ifc *msg); -/* Fiber management */ -struct tf_scheduler; -typedef void (*tf_fiber_proc)(void *fiber); +struct tf_ifc { + union { + struct tf_ifc *next; + struct tf_list_node list; + }; + tf_ifc_handler_t handler; + struct tf_fiber * sender; +}; -void *__tf_fiber_create(tf_fiber_proc fiber_main, int private_size); -void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size); -void *tf_fiber_get(void *data); -void tf_fiber_put(void *data); -void __tf_fiber_wakeup(void *data, int wakeup_type); -void __tf_fiber_wakeup_heapnode(struct tf_heap_node *node); -int __tf_fiber_schedule(void); -int __tf_fiber_bind_scheduler(struct tf_scheduler *sched); -int __tf_fiber_release_scheduler(struct tf_scheduler *sched); +void tf_ifc_queue(void *fiber, struct tf_ifc *msg, tf_ifc_handler_t handler); +void tf_ifc_complete(void *fiber, struct tf_ifc *msg, tf_ifc_handler_t handler); +void tf_ifc_process(void); +void tf_ifc_process_unordered(void); -void tf_fiber_exit(void) attribute_noreturn; -void tf_fiber_kill(void *fiber); -int tf_fiber_yield(void); +/* Timeouts */ +struct tf_timeout_manager { + struct tf_heap_head heap; + unsigned int num_fibers; +}; + +struct tf_timeout_client { + struct tf_timeout_manager * manager; + tf_mtime_t value; + tf_mtime_t latched; + struct tf_heap_node heap_node; + struct tf_ifc ifc; +}; -/* Scheduling and fiber management */ struct tf_timeout { - tf_mtime_t saved_timeout; - unsigned int timeout_change; + tf_mtime_t saved_timeout; + tf_mtime_t my_timeout; }; -#define tf_timed(func, timeout) \ +static inline int tf_timeout_expired(struct tf_timeout *to) +{ + return tf_mtime_diff(to->my_timeout, tf_mtime_now()) <= 0; +} + +struct tf_timeout_fiber *tf_timeout_fiber_create(void); +void tf_timeout_adjust(struct tf_timeout_client *tc); +void tf_timeout_delete(struct tf_timeout_client *tc); + +#define 
tf_timed(__to, func, timeout) \ ({ \ - struct tf_timeout __timeout; \ - tf_timeout_push(&__timeout, timeout); \ - tf_timeout_pop(&__timeout, (func)); \ + tf_timeout_push(__to, timeout); \ + tf_timeout_pop(__to, (func)); \ }) -void tf_timeout_push(struct tf_timeout *timeout, tf_mtime_diff_t milliseconds); -int __tf_timeout_pop(struct tf_timeout *timeout, int err); +/* Fibres and their management */ +typedef void *tf_uctx_t; + +struct tf_fiber { + unsigned int ref_count; + unsigned int flags; + tf_uctx_t context; + tf_uctx_t return_context; + struct tf_list_node queue_node; + struct tf_list_head wakeup_q; + struct tf_timeout_client timeout; + struct tf_ifc * pending_ifc; + char data[TF_EMPTY_ARRAY]; +}; + +typedef void (*tf_fiber_proc)(void *fiber); + +void *__tf_fiber_create(tf_fiber_proc fiber_main, int private_size); +void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size); +void *tf_fiber_get(void *fiber); +void tf_fiber_put(void *fiber); +int tf_fiber_run(void *fiber); +void tf_fiber_kill(void *fiber); + +void tf_fiber_wakeup(struct tf_fiber *fiber); +int tf_fiber_schedule(void); -static inline int tf_timeout_pop(struct tf_timeout *timeout, int err) +void tf_fiber_exit(void) attribute_noreturn; + +static inline struct tf_fiber *tf_vmach_get_current_fiber(void) { - if (unlikely(timeout->timeout_change)) - return __tf_timeout_pop(timeout, err); + extern __thread struct tf_fiber *tf_current_fiber; + return tf_current_fiber; +} + +static inline void tf_timeout_push(struct tf_timeout *to, tf_mtime_diff_t ms) +{ + struct tf_fiber *f = tf_vmach_get_current_fiber(); + + to->saved_timeout = f->timeout.value; + to->my_timeout = tf_mtime_now() + ms; + if (f->timeout.value == 0 || + tf_mtime_diff(to->my_timeout, f->timeout.value) < 0) + f->timeout.value = to->my_timeout; +} + +static inline int tf_timeout_pop(struct tf_timeout *to, int err) +{ + struct tf_fiber *f = tf_vmach_get_current_fiber(); + + f->timeout.value = to->saved_timeout; return err; } static inline 
-int tf_msleep(tf_mtime_diff_t milliseconds) +int tf_msleep(tf_mtime_diff_t ms) { + struct tf_timeout to; int r; - r = tf_timed(__tf_fiber_schedule(), milliseconds); - if (r == TF_WAKEUP_THIS_TIMEOUT) - r = 0; - return r; + + tf_timeout_push(&to, ms); + do { + r = tf_fiber_schedule(); + } while (r != -ETIME); + + return tf_timeout_pop(&to, r); } #endif diff --git a/include/libtf/io.h b/include/libtf/io.h index 0d34421..d1098e3 100644 --- a/include/libtf/io.h +++ b/include/libtf/io.h @@ -21,10 +21,12 @@ #include <libtf/defines.h> /* Flags for tf_open_fd() */ -#define TF_FD_AUTOCLOSE 1 -#define TF_FD_STREAM_ORIENTED 2 -#define TF_FD_SET_CLOEXEC 4 -#define TF_FD_ALREADY_NONBLOCKING 8 +#define TF_FD_READ TF_BIT(0) +#define TF_FD_WRITE TF_BIT(1) +#define TF_FD_AUTOCLOSE TF_BIT(2) +#define TF_FD_STREAM_ORIENTED TF_BIT(3) +#define TF_FD_SET_CLOEXEC TF_BIT(4) +#define TF_FD_ALREADY_NONBLOCKING TF_BIT(5) struct tf_sockaddr { union { @@ -37,23 +39,14 @@ struct tf_sockaddr { struct tf_fd { int fd; unsigned int flags; - /* Single waiter -- would be relatively trivial to modify to allow - * multiple waiters, if someone actually needs it */ - unsigned int events; - void *waiting_fiber; + struct tf_fiber *fiber; }; -#define TF_POLL_READ 1 -#define TF_POLL_WRITE 2 - struct tf_poll_hooks { - void (*init)(void); - int (*poll)(tf_mtime_diff_t timeout); - void (*close)(void); - int (*fd_created)(struct tf_fd *fd); - int (*fd_destroyed)(struct tf_fd *fd); - void (*fd_monitor)(struct tf_fd *fd, int events); - void (*fd_unmonitor)(struct tf_fd *fd); + void * (*create)(void); + int (*fd_created)(void *fiber, struct tf_fd *fd); + int (*fd_destroyed)(void *fiber, struct tf_fd *fd); + void (*fd_rearm)(void *fiber, struct tf_fd *fd); }; int tf_open_fd(struct tf_fd *fd, int kfd, int flags); diff --git a/include/libtf/scheduler.h b/include/libtf/scheduler.h deleted file mode 100644 index db5a823..0000000 --- a/include/libtf/scheduler.h +++ /dev/null @@ -1,67 +0,0 @@ -/* scheduler.h - libtf 
fiber scheduler header - * - * Copyright (C) 2009-2010 Timo Teräs <timo.teras@iki.fi> - * All rights reserved. - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 or later as - * published by the Free Software Foundation. - * - * See http://www.gnu.org/ for details. - */ - -#ifndef TF_SCHEDULER_H -#define TF_SCHEDULER_H - -#include <libtf/atomic.h> -#include <libtf/list.h> -#include <libtf/heap.h> -#include <libtf/fiber.h> - -struct tf_poll_hooks; - -struct tf_scheduler { - struct tf_list_head scheduled_q; - struct tf_list_head running_q; - struct tf_heap_head heap; - void * active_fiber; - void * main_fiber; - int num_fibers; - tf_mtime_t scheduler_time; - struct tf_poll_hooks * poller; - unsigned long poll_data[2]; -}; - -static inline -struct tf_scheduler *tf_scheduler_get_current(void) -{ - extern struct tf_scheduler *__tf_scheduler; - TF_BUG_ON(__tf_scheduler == NULL); - return __tf_scheduler; -} - -static inline -tf_mtime_t tf_scheduler_get_mtime(void) -{ - return tf_scheduler_get_current()->scheduler_time; -} - -struct tf_scheduler *tf_scheduler_create(void); -int tf_scheduler_enable(struct tf_scheduler *); -void tf_scheduler_disable(void); - -static inline struct tf_scheduler * -tf_scheduler_get(struct tf_scheduler *s) -{ - tf_fiber_get(s); - return s; -} - -static inline void -tf_scheduler_put(struct tf_scheduler *s) -{ - tf_fiber_put(s); -} - -#endif - diff --git a/include/libtf/tf.h b/include/libtf/tf.h index e613f18..d052ee7 100644 --- a/include/libtf/tf.h +++ b/include/libtf/tf.h @@ -14,7 +14,7 @@ #define TF_H #include <libtf/fiber.h> -#include <libtf/scheduler.h> +#include <libtf/vmach.h> #include <libtf/io.h> #endif diff --git a/include/libtf/vmach.h b/include/libtf/vmach.h new file mode 100644 index 0000000..d302366 --- /dev/null +++ b/include/libtf/vmach.h @@ -0,0 +1,49 @@ +/* vmach.h - "virtual" machine and cpu contexts + * + * Copyright (C) 2009 Timo Teräs 
<timo.teras@iki.fi> + * All rights reserved. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 or later as + * published by the Free Software Foundation. + * + * See http://www.gnu.org/ for details. + */ + +#ifndef TF_VMACH_H +#define TF_VMACH_H + +#include <libtf/fiber.h> + +struct tf_vmach { + struct tf_poll_hooks * poll_ops; + void * poll_fiber; + void * timeout_fiber; + struct tf_fiber startup_fiber; + int num_user_fibers; + struct tf_list_head run_q; +}; + +struct tf_vcpu { + struct tf_vmach * machine; +}; + +static inline struct tf_vmach *tf_vmach_get_current(void) +{ + extern __thread struct tf_vmach *tf_current_vmach; + return tf_current_vmach; +} + +static inline struct tf_vcpu *tf_vmach_get_current_cpu(void) +{ + extern __thread struct tf_vcpu *tf_current_vcpu; + return tf_current_vcpu; +} + +void tf_vmach_start(void); +void tf_vmach_stop(void); + +void tf_vmach_run(struct tf_vmach *vm, struct tf_fiber *f); +void tf_vmach_run_dedicated(struct tf_vmach *vm, struct tf_fiber *f); + +#endif diff --git a/src/TFbuild b/src/TFbuild index 9277f9a..3ffa9ef 100644 --- a/src/TFbuild +++ b/src/TFbuild @@ -1,6 +1,6 @@ libs-y += libtf -libtf-objs-y += fiber.o scheduler.o heap.o +libtf-objs-y += fiber.o heap.o timeout.o vmach.o ifc.o libtf-objs-$(OS_LINUX) += io-epoll.o mem-mmap.o libtf-objs-$(OS_UNIX) += io-unix.o diff --git a/src/fiber.c b/src/fiber.c index e507815..bef7b81 100644 --- a/src/fiber.c +++ b/src/fiber.c @@ -13,31 +13,13 @@ #include <time.h> #include <errno.h> #include <unistd.h> +#include <libtf/atomic.h> #include <libtf/fiber.h> -#include <libtf/scheduler.h> +#include <libtf/vmach.h> #include "uctx.h" -#define TF_TIMEOUT_CHANGE_NEEDED 1 -#define TF_TIMEOUT_CHANGE_NEW_VALUE 2 - -struct tf_fiber { - unsigned int ref_count; - struct tf_scheduler * scheduler; - int wakeup_type; - unsigned int timeout_change; - tf_mtime_t timeout; - struct tf_list_node queue_node; - 
struct tf_heap_node heap_node; - struct tf_uctx context; - char data[TF_EMPTY_ARRAY]; -}; - -static inline -struct tf_fiber *tf_fiber_get_current(void) -{ - void *data = tf_scheduler_get_current()->active_fiber; - return container_of(data, struct tf_fiber, data); -} +#define TF_FIBERF_RUNNING TF_BIT(0) +#define TF_FIBERF_WAKEUP_PENDING TF_BIT(1) static void tf_fiber_main(void *user_data, void *arg) { @@ -48,45 +30,46 @@ static void tf_fiber_main(void *user_data, void *arg) tf_fiber_exit(); } -void *__tf_fiber_create(tf_fiber_proc fiber_main, int private_size) +void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size) { - struct tf_fiber *fiber; + struct tf_fiber *self = tf_vmach_get_current_fiber(); + struct tf_fiber *f; - fiber = tf_uctx_create_embedded( + f = tf_uctx_create_embedded( TF_STACK_SIZE, sizeof(struct tf_fiber) + private_size, offsetof(struct tf_fiber, context), tf_fiber_main, fiber_main); - if (fiber == NULL) + if (f == NULL) return NULL; - *fiber = (struct tf_fiber) { + *f = (struct tf_fiber) { .ref_count = 1, - .queue_node = TF_LIST_INITIALIZER(fiber->queue_node), - .context = fiber->context, + .queue_node = TF_LIST_INITIALIZER(f->queue_node), + .context = f->context, + .timeout.manager = self ? 
self->timeout.manager : NULL, + .wakeup_q = TF_LIST_HEAD_INITIALIZER(f->wakeup_q), }; - return fiber->data; + return f->data; } -void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size) +int tf_fiber_run(void *fiber) { - struct tf_fiber *fiber; - struct tf_scheduler *sched; - - sched = tf_scheduler_get_current(); - if (tf_heap_prealloc(&sched->heap, sched->num_fibers + 1) < 0) - return NULL; + struct tf_timeout_manager *tm; + struct tf_fiber *f = container_of(fiber, struct tf_fiber, data); - fiber = container_of(__tf_fiber_create(fiber_main, private_size), - struct tf_fiber, data); - sched->num_fibers++; + tm = f->timeout.manager; + if (tm != NULL) { + if (tf_heap_prealloc(&tm->heap, tm->num_fibers + 1) < 0) + return -ENOMEM; + tm->num_fibers++; + } - fiber->scheduler = sched; - fiber->wakeup_type = TF_WAKEUP_NONE; - tf_list_add_tail(&fiber->queue_node, &sched->scheduled_q); + tf_fiber_wakeup(f); + tf_vmach_get_current()->num_user_fibers++; - return tf_fiber_get(fiber->data); + return 0; } void *tf_fiber_get(void *data) @@ -98,23 +81,8 @@ void *tf_fiber_get(void *data) static void __tf_fiber_destroy(struct tf_fiber *fiber) { - struct tf_scheduler *sched = fiber->scheduler; - int main_fiber, num_fibers; - - /* decrease first the number of fibers as we might be - * killing the scheduler it self */ - num_fibers = --sched->num_fibers; - - main_fiber = (fiber->context.alloc == NULL); - tf_heap_delete(&fiber->heap_node, &sched->heap); - tf_uctx_destroy(&fiber->context); - if (main_fiber) - free(fiber); - - if (num_fibers == 1) { - /* FIXME: Use proper fiber event*/ - __tf_fiber_wakeup(sched->main_fiber, TF_WAKEUP_IMMEDIATE); - } + tf_heap_delete(&fiber->timeout.heap_node, &fiber->timeout.manager->heap); + tf_uctx_destroy(fiber->context); } void tf_fiber_put(void *data) @@ -124,165 +92,60 @@ void tf_fiber_put(void *data) __tf_fiber_destroy(fiber); } -void __tf_fiber_wakeup(void *data, int wakeup_type) +void tf_fiber_wakeup(struct tf_fiber *f) { - struct tf_fiber 
*fiber = container_of(data, struct tf_fiber, data); - struct tf_scheduler *sched = fiber->scheduler; + struct tf_fiber *self = tf_vmach_get_current_fiber(); + unsigned int newval, oldval; - if (fiber->wakeup_type == TF_WAKEUP_NONE) { - fiber->wakeup_type = wakeup_type; - tf_list_add_tail(&fiber->queue_node, &sched->running_q); - } -} + do { + oldval = f->flags; + if (oldval & TF_FIBERF_WAKEUP_PENDING) + return; + newval = oldval | TF_FIBERF_WAKEUP_PENDING | TF_FIBERF_RUNNING; + } while (!tf_atomic_cmpxchg(&f->flags, oldval, newval)); -void __tf_fiber_wakeup_heapnode(struct tf_heap_node *node) -{ - __tf_fiber_wakeup(container_of(node, struct tf_fiber, heap_node)->data, - TF_WAKEUP_TIMEOUT); + if (!(oldval & TF_FIBERF_RUNNING)) + tf_list_add_tail(&f->queue_node, &self->wakeup_q); } -int __tf_fiber_schedule(void) +int tf_fiber_schedule(void) { - struct tf_scheduler *sched = tf_scheduler_get_current(); - struct tf_fiber *f = tf_fiber_get_current(), *nf; - int wakeup; + struct tf_fiber *f = tf_vmach_get_current_fiber(); - if (unlikely(f->timeout_change)) { - if (f->timeout_change & TF_TIMEOUT_CHANGE_NEW_VALUE) { - if (tf_mtime_diff(f->timeout, tf_scheduler_get_mtime()) <= 0) { - f->timeout_change = TF_TIMEOUT_CHANGE_NEEDED; - return TF_WAKEUP_TIMEOUT; - } - tf_heap_change(&f->heap_node, &sched->heap, f->timeout); - } else - tf_heap_delete(&f->heap_node, &sched->heap); - f->timeout_change = 0; - } + if (f->timeout.value != 0 && + tf_mtime_diff(f->timeout.value, tf_mtime_now()) <= 0) + return -ETIME; - /* Figure out the next fibre to run */ - if (unlikely(tf_list_empty(&sched->scheduled_q))) { - tf_list_splice_tail(&sched->running_q, - &sched->scheduled_q); - TF_BUG_ON(tf_list_empty(&sched->scheduled_q)); + if (f->flags & TF_FIBERF_WAKEUP_PENDING) { + f->flags = TF_FIBERF_RUNNING; + return 0; } - nf = tf_list_entry(tf_list_pop(&sched->scheduled_q), - struct tf_fiber, queue_node); - sched->active_fiber = nf->data; - tf_uctx_transfer(&f->context, &nf->context); - - wakeup = 
f->wakeup_type; - f->wakeup_type = TF_WAKEUP_NONE; - - return wakeup; -} - -int __tf_fiber_bind_scheduler(struct tf_scheduler *sched) -{ - struct tf_fiber *f; - f = malloc(sizeof(struct tf_fiber)); - if (f == NULL) - return -ENOMEM; + if (f->timeout.value != f->timeout.latched) + tf_timeout_adjust(&f->timeout); - /* Mark currently active main fiber as active */ - *f = (struct tf_fiber) { - .ref_count = 1, - .scheduler = sched, - .queue_node = TF_LIST_INITIALIZER(f->queue_node), - }; - tf_uctx_create_self(&f->context); - sched->main_fiber = f->data; - sched->active_fiber = f->data; - sched->num_fibers++; + f->flags = 0; + tf_uctx_transfer(f->context, f->return_context); + f->flags = TF_FIBERF_RUNNING; - /* Schedule scheduler fiber */ - f = container_of((void *) sched, struct tf_fiber, data); - f->scheduler = sched; - f->wakeup_type = TF_WAKEUP_IMMEDIATE; - tf_list_add_tail(&f->queue_node, &sched->running_q); - - return 0; -} - -int __tf_fiber_release_scheduler(struct tf_scheduler *sched) -{ - struct tf_fiber *f; - - /* Detach scheduler */ - f = container_of((void *) sched, struct tf_fiber, data); - tf_list_del(&f->queue_node); - - /* Detach main stack from this scheduler */ - f = container_of((void *) sched->main_fiber, struct tf_fiber, data); - tf_fiber_put(sched->main_fiber); - sched->main_fiber = NULL; - sched->num_fibers--; + if (f->timeout.value != 0 && + tf_mtime_diff(f->timeout.value, tf_mtime_now()) <= 0) + return -ETIME; return 0; } void tf_fiber_exit(void) { - struct tf_scheduler *sched = tf_scheduler_get_current(); - struct tf_fiber *f = tf_fiber_get_current(); - struct tf_fiber *schedf = container_of((void *) sched, struct tf_fiber, data); + struct tf_fiber *f = tf_vmach_get_current_fiber(); - tf_heap_delete(&f->heap_node, &sched->heap); - schedf->wakeup_type = TF_WAKEUP_KILL; - tf_uctx_transfer(&f->context, &schedf->context); + if (f->timeout.manager != NULL) + tf_timeout_delete(&f->timeout); + tf_vmach_get_current()->num_user_fibers--; + 
tf_uctx_transfer(f->context, f->return_context); TF_BUG_ON(1); } void tf_fiber_kill(void *fiber) { } - -int tf_fiber_yield(void) -{ - struct tf_scheduler *sched = tf_scheduler_get_current(); - struct tf_fiber *f = tf_fiber_get_current(); - - TF_BUG_ON(tf_list_hashed(&f->queue_node)); - f->wakeup_type = TF_WAKEUP_IMMEDIATE; - tf_list_add_tail(&f->queue_node, &sched->running_q); - - return __tf_fiber_schedule(); -} - -void tf_timeout_push(struct tf_timeout *timeout, tf_mtime_diff_t milliseconds) -{ - struct tf_fiber *f = tf_fiber_get_current(); - tf_mtime_t abs = tf_scheduler_get_mtime() + milliseconds; - int active; - - if (f->timeout_change) - active = (f->timeout_change & TF_TIMEOUT_CHANGE_NEW_VALUE); - else - active = tf_heap_node_active(&f->heap_node); - - if (!active || tf_mtime_diff(abs, f->timeout) < 0) { - /* Save previous timeout */ - timeout->saved_timeout = f->timeout; - timeout->timeout_change = TF_TIMEOUT_CHANGE_NEEDED; - if (active) - timeout->timeout_change |= TF_TIMEOUT_CHANGE_NEW_VALUE; - - /* Make new timeout pending */ - f->timeout = abs; - f->timeout_change = TF_TIMEOUT_CHANGE_NEEDED - | TF_TIMEOUT_CHANGE_NEW_VALUE; - } else { - timeout->timeout_change = 0; - } -} - -int __tf_timeout_pop(struct tf_timeout *timeout, int err) -{ - struct tf_fiber *f = tf_fiber_get_current(); - - f->timeout = timeout->saved_timeout; - f->timeout_change = timeout->timeout_change; - if (err == TF_WAKEUP_TIMEOUT) - err = TF_WAKEUP_THIS_TIMEOUT; - return err; -} diff --git a/src/ifc.c b/src/ifc.c new file mode 100644 index 0000000..2dfafe8 --- /dev/null +++ b/src/ifc.c @@ -0,0 +1,57 @@ +/* ifc.c - inter fiber communications + * + * Copyright (C) 2010 Timo Teräs <timo.teras@iki.fi> + * All rights reserved. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 or later as + * published by the Free Software Foundation. + * + * See http://www.gnu.org/ for details. 
+ */ + +#include <libtf/atomic.h> +#include <libtf/fiber.h> + +void tf_ifc_queue(void *fiber, struct tf_ifc *ifc, tf_ifc_handler_t handler) +{ + struct tf_fiber *f = container_of(fiber, struct tf_fiber, data); + struct tf_ifc *old; + + TF_BUG_ON(ifc->next != NULL); + ifc->handler = handler; + do { + old = f->pending_ifc; + ifc->next = old; + } while (!tf_atomic_cmpxchg(&f->pending_ifc, old, ifc)); + + tf_fiber_wakeup(f); +} + +void tf_ifc_complete(void *fiber, struct tf_ifc *ifc, tf_ifc_handler_t handler) +{ + ifc->sender = tf_vmach_get_current_fiber(); + tf_ifc_queue(fiber, ifc, handler); + while (ifc->sender != NULL) + tf_fiber_schedule(); +} + +void tf_ifc_process_unordered(void) +{ + struct tf_fiber *f = tf_vmach_get_current_fiber(), *s; + struct tf_ifc *pending, *ifc; + + while (f->pending_ifc != NULL) { + pending = tf_atomic_xchg(&f->pending_ifc, NULL); + while (pending) { + ifc = pending; + pending = ifc->next; + ifc->handler(f->data, ifc); + s = ifc->sender; + ifc->next = NULL; + ifc->sender = NULL; + if (s != NULL) + tf_fiber_wakeup(s); + } + } +} diff --git a/src/io-epoll.c b/src/io-epoll.c index 32aa090..1fc9ca1 100644 --- a/src/io-epoll.c +++ b/src/io-epoll.c @@ -16,74 +16,70 @@ #include <sys/epoll.h> #include <libtf/io.h> -#include <libtf/scheduler.h> +#include <libtf/fiber.h> -struct tf_poll_data { +struct tf_epoll_data { int epoll_fd; - int num_waiters; }; -static struct tf_poll_data *tf_epoll_get_data(void) +static void tf_epoll_main(void *ctx) { - struct tf_scheduler *sched = tf_scheduler_get_current(); - TF_BUILD_BUG_ON(sizeof(struct tf_poll_data) > sizeof(sched->poll_data)); - return (struct tf_poll_data *) &sched->poll_data; -} - -static void tf_epoll_init(void) -{ - struct tf_poll_data *pd = tf_epoll_get_data(); + struct tf_epoll_data *pd = ctx; + struct epoll_event events[64]; + struct tf_fd *fd; + int r, i; - pd->epoll_fd = epoll_create1(EPOLL_CLOEXEC); - pd->num_waiters = 0; - TF_BUG_ON(pd->epoll_fd < 0); -} + do { + r = 
epoll_wait(pd->epoll_fd, events, array_size(events), 0); + if (r == 0) { + /* FIXME: yielding is bad */ + struct tf_fiber *self = tf_vmach_get_current_fiber(); + tf_list_add_tail(&self->queue_node, &self->wakeup_q); + if (tf_fiber_schedule() == 0) + continue; + } -static void tf_epoll_close(void) -{ - struct tf_poll_data *pd = tf_epoll_get_data(); + for (i = 0; i < r; i++) { + fd = (struct tf_fd *) events[i].data.ptr; + tf_fiber_wakeup(fd->fiber); + } + } while (1); close(pd->epoll_fd); } -static int tf_epoll_poll(tf_mtime_diff_t timeout) + +static void *tf_epoll_create(void) { - struct tf_poll_data *pd = tf_epoll_get_data(); - struct epoll_event events[64]; - struct tf_fd *fd; - int r, i, ret; + struct tf_epoll_data *d; - if (timeout == 0 && pd->num_waiters == 0) - return TF_WAKEUP_TIMEOUT; + d = tf_fiber_create(tf_epoll_main, sizeof(struct tf_epoll_data)); + if (d == NULL) + return NULL; - ret = TF_WAKEUP_TIMEOUT; - do { - r = epoll_wait(pd->epoll_fd, events, array_size(events), timeout); - if (r == 0) - break; + d->epoll_fd = epoll_create1(EPOLL_CLOEXEC); + TF_BUG_ON(d->epoll_fd < 0); - for (i = 0; i < r; i++) { - fd = (struct tf_fd *) events[i].data.ptr; - if (likely(fd->events & events[i].events)) - __tf_fiber_wakeup(fd->waiting_fiber, TF_WAKEUP_FD); - } - ret = TF_WAKEUP_FD; - timeout = 0; - } while (unlikely(r == array_size(events))); + tf_fiber_run(tf_fiber_get(d)); - return ret; + return d; } -static int tf_epoll_fd_created(struct tf_fd *fd) +static int tf_epoll_fd_created(void *fiber, struct tf_fd *fd) { - struct tf_poll_data *pd = tf_epoll_get_data(); + struct tf_epoll_data *d = fiber; struct epoll_event ev; int r; ev = (struct epoll_event) { - .events = EPOLLIN | EPOLLOUT | EPOLLET, + .events = EPOLLET, .data.ptr = fd, }; - r = epoll_ctl(pd->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev); + if (fd->flags & TF_FD_READ) + ev.events |= EPOLLIN; + if (fd->flags & TF_FD_WRITE) + ev.events |= EPOLLOUT; + + r = epoll_ctl(d->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev); if 
(unlikely(r < 0)) { TF_BUG_ON(errno == EEXIST); r = -errno; @@ -93,46 +89,18 @@ static int tf_epoll_fd_created(struct tf_fd *fd) return 0; } -static int tf_epoll_fd_destroyed(struct tf_fd *fd) +static int tf_epoll_fd_destroyed(void *fiber, struct tf_fd *fd) { - struct tf_poll_data *pd = tf_epoll_get_data(); + struct tf_epoll_data *d = fiber; - if (fd->flags & TF_FD_AUTOCLOSE) - return 0; + if (!(fd->flags & TF_FD_AUTOCLOSE)) + epoll_ctl(d->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL); - epoll_ctl(pd->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL); return 0; } -static void tf_epoll_fd_monitor(struct tf_fd *fd, int events) -{ - struct tf_poll_data *pd = tf_epoll_get_data(); - - TF_BUG_ON(fd->waiting_fiber != NULL); - fd->events = EPOLLERR | EPOLLHUP; - if (events & TF_POLL_READ) - fd->events |= EPOLLIN; - if (events & TF_POLL_WRITE) - fd->events |= EPOLLOUT; - fd->waiting_fiber = tf_scheduler_get_current()->active_fiber; - pd->num_waiters++; -} - -static void tf_epoll_fd_unmonitor(struct tf_fd *fd) -{ - struct tf_poll_data *pd = tf_epoll_get_data(); - - fd->waiting_fiber = NULL; - fd->events = 0; - pd->num_waiters--; -} - struct tf_poll_hooks tf_epoll_hooks = { - .init = tf_epoll_init, - .close = tf_epoll_close, - .poll = tf_epoll_poll, + .create = tf_epoll_create, .fd_created = tf_epoll_fd_created, .fd_destroyed = tf_epoll_fd_destroyed, - .fd_monitor = tf_epoll_fd_monitor, - .fd_unmonitor = tf_epoll_fd_unmonitor, }; diff --git a/src/io-unix.c b/src/io-unix.c index d80592a..7cd4fd0 100644 --- a/src/io-unix.c +++ b/src/io-unix.c @@ -17,10 +17,8 @@ #include <string.h> #include <libtf/io.h> -#include <libtf/scheduler.h> - -#define TF_POLL_HOOKS \ -struct tf_poll_hooks *poller = tf_scheduler_get_current()->poller; +#include <libtf/fiber.h> +#include <libtf/vmach.h> static inline int tf_sockaddr_len(const struct tf_sockaddr *addr) { @@ -38,14 +36,14 @@ static inline int tf_sockaddr_len(const struct tf_sockaddr *addr) int tf_open_fd(struct tf_fd *fd, int kfd, int flags) { - 
TF_POLL_HOOKS; + struct tf_vmach *vm = tf_vmach_get_current(); int r; fd->fd = kfd; fd->flags = flags; - fd->waiting_fiber = NULL; + fd->fiber = tf_vmach_get_current_fiber(); - r = poller->fd_created(fd); + r = vm->poll_ops->fd_created(vm->poll_fiber, fd); if (r < 0) { if (flags & TF_FD_AUTOCLOSE) close(kfd); @@ -88,10 +86,10 @@ int tf_open(struct tf_fd *fd, const char *pathname, int flags) int tf_close(struct tf_fd *fd) { - TF_POLL_HOOKS; + struct tf_vmach *vm = tf_vmach_get_current(); int r; - poller->fd_destroyed(fd); + vm->poll_ops->fd_destroyed(vm->poll_fiber, fd); if (fd->flags & TF_FD_AUTOCLOSE) { r = close(fd->fd); if (unlikely(r == -1)) @@ -104,11 +102,9 @@ int tf_close(struct tf_fd *fd) int tf_read_fully(struct tf_fd *fd, void *buf, size_t count) { - TF_POLL_HOOKS; ssize_t n; - int r; + int r = 0; - poller->fd_monitor(fd, TF_POLL_READ); do { n = read(fd->fd, buf, count); if (n == count) { @@ -132,20 +128,17 @@ int tf_read_fully(struct tf_fd *fd, void *buf, size_t count) continue; } - r = __tf_fiber_schedule(); - } while (r == TF_WAKEUP_FD); - poller->fd_unmonitor(fd); + r = tf_fiber_schedule(); + } while (r != 0); return -r; } int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count) { - TF_POLL_HOOKS; ssize_t n; - int r; + int r = 0; - poller->fd_monitor(fd, TF_POLL_WRITE); do { n = write(fd->fd, buf, count); if (n == count) { @@ -166,19 +159,16 @@ int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count) continue; } - r = __tf_fiber_schedule(); - } while (r == TF_WAKEUP_FD); - poller->fd_unmonitor(fd); + r = tf_fiber_schedule(); + } while (r != 0); return -r; } ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count) { - TF_POLL_HOOKS; ssize_t n; - poller->fd_monitor(fd, TF_POLL_READ); do { n = read(fd->fd, buf, count); if (n >= 0) @@ -189,19 +179,16 @@ ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count) n = -errno; break; } - n = __tf_fiber_schedule(); - } while (n == TF_WAKEUP_FD); - poller->fd_unmonitor(fd); + n = 
tf_fiber_schedule(); + } while (n != 0); return n; } ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count) { - TF_POLL_HOOKS; ssize_t n; - poller->fd_monitor(fd, TF_POLL_WRITE); do { n = write(fd->fd, buf, count); if (n >= 0) @@ -212,9 +199,8 @@ ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count) n = -errno; break; } - n = __tf_fiber_schedule(); - } while (n == TF_WAKEUP_FD); - poller->fd_unmonitor(fd); + n = tf_fiber_schedule(); + } while (n != 0); return n; } @@ -261,7 +247,6 @@ int tf_listen(struct tf_fd *fd, int backlog) int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd, struct tf_sockaddr *from) { - TF_POLL_HOOKS; int r, tfdf; struct sockaddr *addr = NULL; socklen_t al, *pal = NULL; @@ -272,24 +257,19 @@ int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd, pal = &al; } - tfdf = TF_FD_AUTOCLOSE | (listen_fd->flags & TF_FD_STREAM_ORIENTED); - tfdf |= TF_FD_SET_CLOEXEC; + tfdf = TF_FD_AUTOCLOSE | TF_FD_ALREADY_NONBLOCKING | + (listen_fd->flags & TF_FD_STREAM_ORIENTED); - poller->fd_monitor(listen_fd, TF_POLL_READ); do { - /* FIXME: use accept4 if available */ - r = accept(listen_fd->fd, addr, pal); + r = accept4(listen_fd->fd, addr, pal, SOCK_NONBLOCK|SOCK_CLOEXEC); if (r >= 0) break; if (errno == EINTR) continue; - if (errno != EAGAIN) { - poller->fd_unmonitor(listen_fd); + if (errno != EAGAIN) return -errno; - } - r = __tf_fiber_schedule(); - } while (r == TF_WAKEUP_FD); - poller->fd_unmonitor(listen_fd); + r = tf_fiber_schedule(); + } while (r != 0); if (r < 0) return r; @@ -298,7 +278,6 @@ int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd, int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to) { - TF_POLL_HOOKS; socklen_t l = sizeof(int); int r, err; @@ -310,10 +289,8 @@ int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to) return -errno; /* Wait for socket to become readable */ - poller->fd_monitor(fd, TF_POLL_WRITE); - r = __tf_fiber_schedule(); - poller->fd_unmonitor(fd); - if (r 
!= TF_WAKEUP_FD) + r = tf_fiber_schedule(); + if (r != 0) return r; /* Check for error */ @@ -327,7 +304,6 @@ ssize_t tf_recvmsg(struct tf_fd *fd, struct tf_sockaddr *to, void *buf, size_t len) { - TF_POLL_HOOKS; struct iovec iov; struct msghdr msg; struct cmsghdr *cmsg; @@ -347,7 +323,6 @@ ssize_t tf_recvmsg(struct tf_fd *fd, .msg_iovlen = 1, }; - poller->fd_monitor(fd, TF_POLL_READ); do { r = recvmsg(fd->fd, &msg, 0); if (r >= 0) @@ -356,9 +331,8 @@ ssize_t tf_recvmsg(struct tf_fd *fd, r = -errno; break; } - r = __tf_fiber_schedule(); - } while (r == TF_WAKEUP_FD); - poller->fd_unmonitor(fd); + r = tf_fiber_schedule(); + } while (r != 0); if (r < 0 || to == NULL) return r; @@ -382,7 +356,6 @@ ssize_t tf_sendmsg(struct tf_fd *fd, const struct tf_sockaddr *to, const void *buf, size_t len) { - TF_POLL_HOOKS; struct msghdr msg; struct iovec iov; struct { @@ -411,7 +384,6 @@ ssize_t tf_sendmsg(struct tf_fd *fd, msg.msg_controllen = sizeof(cmsg); } - poller->fd_monitor(fd, TF_POLL_WRITE); do { r = sendmsg(fd->fd, &msg, 0); if (r >= 0) @@ -420,9 +392,8 @@ ssize_t tf_sendmsg(struct tf_fd *fd, r = -errno; break; } - r = __tf_fiber_schedule(); - } while (r == TF_WAKEUP_FD); - poller->fd_unmonitor(fd); + r = tf_fiber_schedule(); + } while (r != 0); return r; } diff --git a/src/scheduler.c b/src/scheduler.c deleted file mode 100644 index a103d0a..0000000 --- a/src/scheduler.c +++ /dev/null @@ -1,134 +0,0 @@ -/* scheduler.c - fiber scheduling - * - * Copyright (C) 2009-2010 Timo Teräs <timo.teras@iki.fi> - * All rights reserved. - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 or later as - * published by the Free Software Foundation. - * - * See http://www.gnu.org/ for details. 
- */ - -#include <time.h> -#include <libtf/scheduler.h> -#include <libtf/io.h> - -/* FIXME: should be in thread local storage */ -extern struct tf_poll_hooks tf_epoll_hooks; -struct tf_scheduler *__tf_scheduler; - -static void update_time(struct tf_scheduler *sched) -{ - struct timespec ts; - - clock_gettime(CLOCK_MONOTONIC, &ts); - sched->scheduler_time = ts.tv_sec * 1000 + ts.tv_nsec / 1000000; -} - -static void process_heap(struct tf_scheduler *sched) -{ - struct tf_heap_node *node; - tf_mtime_t now = sched->scheduler_time; - - while (!tf_heap_empty(&sched->heap) && - tf_mtime_diff(now, tf_heap_get_value(&sched->heap)) >= 0) { - node = tf_heap_get_node(&sched->heap); - tf_heap_delete(node, &sched->heap); - __tf_fiber_wakeup_heapnode(node); - } -} - -void tf_scheduler_fiber(void *data) -{ - struct tf_scheduler *sched = (struct tf_scheduler *) data; - - do { - tf_mtime_diff_t timeout; - - update_time(sched); - if (!tf_list_empty(&sched->scheduled_q) || - !tf_list_empty(&sched->running_q)) { - timeout = 0; - } else if (!tf_heap_empty(&sched->heap)) { - timeout = tf_mtime_diff( - tf_heap_get_value(&sched->heap), - tf_scheduler_get_mtime()); - if (timeout < 0) - timeout = 0; - } else { - timeout = -1; - } - - if (sched->poller->poll(timeout) == TF_WAKEUP_TIMEOUT && - timeout >= 0) { - sched->scheduler_time += timeout; - process_heap(sched); - } - - if (tf_fiber_yield() == TF_WAKEUP_KILL) { - do { - tf_fiber_put(sched->active_fiber); - sched->active_fiber = sched; - } while (__tf_fiber_schedule() == TF_WAKEUP_KILL); - } - } while (1); -} - -struct tf_scheduler *tf_scheduler_create(void) -{ - struct tf_scheduler *sched; - - sched = __tf_fiber_create(tf_scheduler_fiber, - sizeof(struct tf_scheduler)); - - *sched = (struct tf_scheduler) { - .scheduled_q = TF_LIST_HEAD_INITIALIZER(sched->scheduled_q), - .running_q = TF_LIST_HEAD_INITIALIZER(sched->running_q), - }; - - return sched; -} - -int tf_scheduler_enable(struct tf_scheduler *sched) -{ - struct tf_scheduler *s = 
sched; - - if (s == NULL) { - s = tf_scheduler_create(); - if (s == NULL) - return -ENOMEM; - } - if (s->main_fiber != NULL) - return -EBUSY; - - __tf_fiber_bind_scheduler(s); - __tf_scheduler = s; - s->poller = &tf_epoll_hooks; - s->poller->init(); - update_time(s); - - if (sched != NULL) - tf_scheduler_get(sched); - - return 0; -} - -void tf_scheduler_disable(void) -{ - struct tf_scheduler *sched = __tf_scheduler; - - if (sched == NULL || - sched->main_fiber != sched->active_fiber) - return; - - /* sleep until no others */ - while (sched->num_fibers > 1) - __tf_fiber_schedule(); - - sched->poller->close(); - __tf_scheduler = NULL; - __tf_fiber_release_scheduler(sched); - tf_heap_destroy(&sched->heap); - tf_fiber_put(sched); -} diff --git a/src/timeout.c b/src/timeout.c new file mode 100644 index 0000000..fc7acd2 --- /dev/null +++ b/src/timeout.c @@ -0,0 +1,120 @@ +/* timeout.c - timerfd based fiber wakeups + * + * Copyright (C) 2010 Timo Teräs <timo.teras@iki.fi> + * All rights reserved. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 or later as + * published by the Free Software Foundation. + * + * See http://www.gnu.org/ for details. 
+ */ + +#include <libtf/fiber.h> +#include <libtf/io.h> +#include <sys/timerfd.h> + +struct tf_timeout_fiber { + struct tf_timeout_manager manager; + struct tf_fd fd; + tf_mtime_t programmed; +}; + +static tf_mtime_t tf_mtime_cached; + +tf_mtime_t tf_mtime_now(void) +{ + return tf_mtime_cached; +} + +tf_mtime_t tf_mtime_update(void) +{ + struct timespec ts; + + clock_gettime(CLOCK_MONOTONIC, &ts); + tf_mtime_cached = ts.tv_sec * 1000 + ts.tv_nsec / 1000000; + + return tf_mtime_cached; +} + +static void tf_timeout_process_heap(struct tf_timeout_fiber *t) +{ + struct tf_heap_head *heap = &t->manager.heap; + struct tf_heap_node *node; + tf_mtime_t now = tf_mtime_update(); + tf_mtime_diff_t timeout, value; + + while (!tf_heap_empty(heap)) { + value = tf_heap_get_value(heap); + timeout = tf_mtime_diff(value, now); + if (timeout > 0) { + if (t->programmed != value) { + struct itimerspec its = { + .it_value.tv_sec = timeout / 1000, + .it_value.tv_nsec = (timeout % 1000) * 1000000, + }; + t->programmed = value; + timerfd_settime(t->fd.fd, 0, &its, NULL); + } + break; + } + + node = tf_heap_get_node(heap); + tf_heap_delete(node, heap); + tf_fiber_wakeup(container_of(node, struct tf_fiber, timeout.heap_node)); + } +} + +static void tf_timeout_worker(void *ctx) +{ + do { + tf_ifc_process_unordered(); + tf_timeout_process_heap(ctx); + } while (tf_fiber_schedule() == 0); +} + +struct tf_timeout_fiber *tf_timeout_fiber_create(void) +{ + struct tf_timeout_fiber *f; + + f = tf_fiber_create(tf_timeout_worker, sizeof(struct tf_timeout_fiber)); + if (f == NULL) + return f; + + if (tf_open_fd(&f->fd, + timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK|TFD_CLOEXEC), + TF_FD_READ | TF_FD_ALREADY_NONBLOCKING | TF_FD_AUTOCLOSE) != 0) { + tf_fiber_put(f); + return NULL; + } + f->fd.fiber = container_of((void *) f, struct tf_fiber, data); + tf_fiber_run(tf_fiber_get(f)); + + return f; +} + +static void tf_timeout_adjust_ifc(void *fiber, struct tf_ifc *ifc) +{ + struct tf_timeout_fiber *t = 
fiber; + struct tf_timeout_client *c = container_of(ifc, struct tf_timeout_client, ifc); + tf_mtime_t val; + + val = c->value; + if (val == 0) + tf_heap_delete(&c->heap_node, &t->manager.heap); + else + tf_heap_change(&c->heap_node, &t->manager.heap, val); + c->latched = val; +} + +void tf_timeout_adjust(struct tf_timeout_client *tc) +{ + tf_ifc_queue(tc->manager, &tc->ifc, tf_timeout_adjust_ifc); +} + +void tf_timeout_delete(struct tf_timeout_client *tc) +{ + tc->value = 0; + tc->latched = 0; + tf_ifc_complete(tc->manager, &tc->ifc, tf_timeout_adjust_ifc); +} @@ -18,19 +18,18 @@ #ifdef VALGRIND #include <valgrind/valgrind.h> -#else -#define VALGRIND_STACK_REGISTER(stack_base, size) 0 -#define VALGRIND_STACK_DEREGISTER(stack_id) #endif #define STACK_GUARD 0xbad57ac4 struct tf_uctx { - int *stack_guard; - size_t size; - void *alloc; - void *current_sp; - unsigned int stack_id; + int * stack_guard; + size_t size; + void * alloc; + void * current_sp; +#ifdef VALGRIND + unsigned int stack_id; +#endif }; #if defined(__i386__) @@ -89,13 +88,14 @@ static inline void stack_push_ptr(void **stackptr, void *ptr) } -static inline void tf_uctx_create_self(struct tf_uctx *uctx) +static inline tf_uctx_t tf_uctx_create_self(struct tf_uctx *uctx) { static int dummy_guard = STACK_GUARD; *uctx = (struct tf_uctx) { .stack_guard = &dummy_guard, }; + return uctx; } static inline void * @@ -118,20 +118,24 @@ tf_uctx_create_embedded( /* Create initial stack frame (cdecl convention) */ stack = stack_pointer(stack_base, size); - user_data = stack_push(&stack, TF_ALIGN(private_size, 64)); + user_data = stack_push(&stack, TF_ALIGN(private_size, 16)); + uctx = stack_push(&stack, TF_ALIGN(sizeof(struct tf_uctx), 16)); stack_push_ptr(&stack, main_argument); stack_push_ptr(&stack, user_data); stack_push_ptr(&stack, NULL); stack_push_ptr(&stack, stack_frame_main); /* eip */ stack_push_ptr(&stack, NULL); /* ebp */ - uctx = user_data + uctx_offset; + *((tf_uctx_t *) (user_data + uctx_offset)) = 
uctx; + *uctx = (struct tf_uctx) { .stack_guard = stack_guard(stack_base, size), .alloc = stack_base, .size = size, .current_sp = stack, +#ifdef VALGRIND .stack_id = VALGRIND_STACK_REGISTER(stack_base, stack_base+size), +#endif }; *uctx->stack_guard = STACK_GUARD; @@ -139,18 +143,25 @@ tf_uctx_create_embedded( } static inline -void tf_uctx_destroy(struct tf_uctx *uctx) +void tf_uctx_destroy(tf_uctx_t ctx) { + struct tf_uctx *uctx = ctx; + if (uctx->alloc != NULL) { +#ifdef VALGRIND VALGRIND_STACK_DEREGISTER(uctx->stack_id); +#endif tf_bmem_free(uctx->alloc, uctx->size); } } static inline -void tf_uctx_transfer(struct tf_uctx *from, struct tf_uctx *to) +void tf_uctx_transfer(tf_uctx_t from, tf_uctx_t to) { + struct tf_uctx *ufrom = from; + struct tf_uctx *uto = to; + /* Switch stack pointers */ - TF_BUG_ON(*from->stack_guard != STACK_GUARD); - switch_fiber(from, to); + TF_BUG_ON(*ufrom->stack_guard != STACK_GUARD); + switch_fiber(ufrom, uto); } diff --git a/src/vmach.c b/src/vmach.c new file mode 100644 index 0000000..a9ca446 --- /dev/null +++ b/src/vmach.c @@ -0,0 +1,120 @@ +#include <libtf/defines.h> +#include <libtf/list.h> +#include <libtf/vmach.h> +#include <libtf/io.h> +#include "uctx.h" + +__thread struct tf_fiber *tf_current_fiber; +__thread struct tf_vcpu *tf_current_vcpu; +__thread struct tf_vmach *tf_current_vmach; + +extern struct tf_poll_hooks tf_epoll_hooks; + +struct tf_vmachine { + struct tf_vmach vmach; + struct tf_uctx startup_uctx; + void *machine_init_fiber; +}; + +static void tf_vcpu_main(void *fiber_data) +{ + struct tf_vcpu *vcpu = fiber_data; + struct tf_vmach *vmach = vcpu->machine; + struct tf_fiber *self = container_of(fiber_data, struct tf_fiber, data); + struct tf_fiber *f; + + tf_current_vmach = vmach; + tf_current_vcpu = vcpu; + + while (vmach->num_user_fibers != 0) { + if (tf_list_empty(&vmach->run_q)) { + /* sleep */ + continue; + } + + f = tf_list_entry(tf_list_pop(&vmach->run_q), + struct tf_fiber, queue_node); + + 
f->return_context = self->context; + tf_current_fiber = f; + tf_uctx_transfer(self->context, f->context); + tf_list_splice_tail(&f->wakeup_q, &vmach->run_q); + } +} + +static void tf_vmach_main(void *fiber_data) +{ + struct tf_fiber *self = container_of(fiber_data, struct tf_fiber, data); + struct tf_vcpu *vcpu = fiber_data; + struct tf_vmach *vmach = vcpu->machine; + + tf_current_vmach = vmach; + tf_current_vcpu = vcpu; + tf_current_fiber = self; + + /* Initialize IO subsystem */ + vmach->poll_ops = &tf_epoll_hooks; + vmach->poll_fiber = vmach->poll_ops->create(); + vmach->timeout_fiber = tf_timeout_fiber_create(); + vmach->startup_fiber.timeout.manager = vmach->timeout_fiber; + + /* Run the initial fiber */ + tf_fiber_wakeup(&vmach->startup_fiber); + + /* Use main thread as a regular vcpu */ + vmach->num_user_fibers = 1; + tf_list_splice_tail(&self->wakeup_q, &vmach->run_q); + tf_vcpu_main(vcpu); + + /* Kill all stuff */ + + /* Return to main fiber */ + vmach->startup_fiber.return_context = NULL; + tf_current_fiber = NULL; + tf_current_vcpu = NULL; + tf_current_vmach = NULL; + + tf_uctx_transfer(self->context, vmach->startup_fiber.context); +} + +void tf_vmach_start(void) +{ + struct tf_vmachine *vmach; + struct tf_vcpu *vcpu; + + TF_BUG_ON(tf_current_vcpu != NULL); + + /* Create a self-fiber so we can surrender control to vcpu */ + vmach = calloc(1, sizeof(struct tf_vmachine)); + vmach->vmach.startup_fiber = (struct tf_fiber) { + .ref_count = 1, + .queue_node = TF_LIST_INITIALIZER(vmach->vmach.startup_fiber.queue_node), + .wakeup_q = TF_LIST_HEAD_INITIALIZER(vmach->vmach.startup_fiber.wakeup_q), + .context = tf_uctx_create_self(&vmach->startup_uctx), + }; + tf_list_init_head(&vmach->vmach.run_q); + vcpu = tf_fiber_create(tf_vmach_main, sizeof(struct tf_vcpu)); + vmach->machine_init_fiber = vcpu; + vcpu->machine = &vmach->vmach; + + /* Create manager fiber to initialize vcpu */ + tf_uctx_transfer(vmach->vmach.startup_fiber.context, + container_of((void *) vcpu, 
struct tf_fiber, data)->context); +} + +void tf_vmach_stop(void) +{ + struct tf_fiber *self = tf_vmach_get_current_fiber(); + struct tf_vmach *vmach = tf_vmach_get_current(); + + TF_BUG_ON(self != &vmach->startup_fiber); + + /* Wait for the vmachine to stop */ + tf_vmach_get_current()->num_user_fibers--; + while (self->return_context != NULL) + tf_fiber_schedule(); + + /* And clean up */ + tf_uctx_destroy(vmach->startup_fiber.context); + free(vmach); +} diff --git a/test/httpget.c b/test/httpget.c index 29f7f14..876b63c 100644 --- a/test/httpget.c +++ b/test/httpget.c @@ -14,6 +14,7 @@ static void ping_fiber(void *ptr) struct ctx *ctx = (struct ctx*) ptr; struct tf_sockaddr host; struct tf_fd fd; + struct tf_timeout to; char buf[128]; int bytes = 0, r; const char *req = "GET / HTTP/1.0\r\n\r\n"; @@ -27,7 +28,7 @@ static void ping_fiber(void *ptr) if (r < 0) goto err; - r = tf_timed(tf_connect(&fd, &host), 10000); + r = tf_timed(&to, tf_connect(&fd, &host), 10000); if (r < 0) goto err_close; @@ -49,11 +50,11 @@ int main(int argc, char **argv) struct ctx *c; int i; - tf_scheduler_enable(NULL); + tf_vmach_start(); for (i = 1; i < argc; i++) { c = tf_fiber_create(ping_fiber, sizeof(struct ctx)); c->hostname = argv[i]; - tf_fiber_put(c); + tf_fiber_run(c); } - tf_scheduler_disable(); + tf_vmach_stop(); } diff --git a/test/read.c b/test/read.c index 0dc72cc..9963a62 100644 --- a/test/read.c +++ b/test/read.c @@ -34,8 +34,8 @@ static void io_fiber(void *ptr) int main(int argc, char **argv) { - tf_scheduler_enable(NULL); - tf_fiber_put(tf_fiber_create(time_fiber, 0)); - tf_fiber_put(tf_fiber_create(io_fiber, 0)); - tf_scheduler_disable(); + tf_vmach_start(); + tf_fiber_run(tf_fiber_create(time_fiber, 0)); + tf_fiber_run(tf_fiber_create(io_fiber, 0)); + tf_vmach_stop(); } diff --git a/test/simple1.c b/test/simple1.c index 6f40f8d..b6de289 100644 --- a/test/simple1.c +++ b/test/simple1.c @@ -10,7 +10,7 @@ static void work_fiber(void *ptr) struct ctx *c = (struct ctx*) ptr; 
printf("Hello%d.1\n", c->id); - tf_fiber_yield(); + tf_msleep(1); printf("Hello%d.2\n", c->id); } @@ -19,11 +19,11 @@ int main(int argc, char **argv) struct ctx *c; int i; - tf_scheduler_enable(NULL); + tf_vmach_start(); for (i = 0; i < 6; i++) { c = tf_fiber_create(work_fiber, sizeof(struct ctx)); c->id = i; - tf_fiber_put(c); + tf_fiber_run(c); } - tf_scheduler_disable(); + tf_vmach_stop(); } diff --git a/test/sleep.c b/test/sleep.c index 8e225a7..dd71d91 100644 --- a/test/sleep.c +++ b/test/sleep.c @@ -21,10 +21,10 @@ int main(int argc, char **argv) struct ctx *c; int i; - tf_scheduler_enable(NULL); + tf_vmach_start(); for (i = 0; i < 1000; i++) { c = tf_fiber_create(work_fiber, sizeof(struct ctx)); - tf_fiber_put(c); + tf_fiber_run(c); } - tf_scheduler_disable(); + tf_vmach_stop(); } |