diff options
| -rw-r--r-- | DESIGN | 77 | ||||
| -rw-r--r-- | include/libtf/atomic.h | 11 | ||||
| -rw-r--r-- | include/libtf/defines.h | 8 | ||||
| -rw-r--r-- | include/libtf/fiber.h | 141 | ||||
| -rw-r--r-- | include/libtf/io.h | 29 | ||||
| -rw-r--r-- | include/libtf/scheduler.h | 67 | ||||
| -rw-r--r-- | include/libtf/tf.h | 2 | ||||
| -rw-r--r-- | include/libtf/vmach.h | 49 | ||||
| -rw-r--r-- | src/TFbuild | 2 | ||||
| -rw-r--r-- | src/fiber.c | 259 | ||||
| -rw-r--r-- | src/ifc.c | 57 | ||||
| -rw-r--r-- | src/io-epoll.c | 122 | ||||
| -rw-r--r-- | src/io-unix.c | 87 | ||||
| -rw-r--r-- | src/scheduler.c | 134 | ||||
| -rw-r--r-- | src/timeout.c | 120 | ||||
| -rw-r--r-- | src/uctx.h | 41 | ||||
| -rw-r--r-- | src/vmach.c | 120 | ||||
| -rw-r--r-- | test/httpget.c | 9 | ||||
| -rw-r--r-- | test/read.c | 8 | ||||
| -rw-r--r-- | test/simple1.c | 8 | ||||
| -rw-r--r-- | test/sleep.c | 6 | 
21 files changed, 719 insertions, 638 deletions
@@ -1,19 +1,72 @@  SCHEDULER -- 4-heap (or 2-heap) with linked nodes -- epoll -- edge-triggered monitoring for file i/o (no syscalls to modify fdset) +- worker thread pool executes ready-to-run fibers +- thread pool executes fibers from FIFO queues (one per each priority) +- possible to make main thread separate scheduler so we can bind some +  fibers to run in main thread only (not needed?) + +THE BIG SCHEDULING QUEUE (shared queue with all worker threads) + --- works also between machines, in which case we just always queue +     work items to remote vmach (need to send wakeups too) +  - steal first item from the local cache if any +  - mutex lock +  - if stuff to push +        - while idle_vcpu not empty +                - pop idle_vcpu from list +                - push item to idle_vcpu +        - push rest of items to global lists accordingly +        - mutex unlock +        - call futex_wake for all idle_cpu's we stuffed +        - return stolen item +   - if stuff on list +        - pop item from list +        - mutex unlock +        - return popped_item +   - insert self to idle_vcpu list +   - mutex unlock +   - futex_wait on local_pointer +   - return local_pointer +   - mutex unlock + +WAKEUP +  - as minimum keep wakeup bitfield for most common types, and use +    prepare sleep, and sleep primitives with acceptable wakeup reasons +  - might add so that wakeupper calls wakeupee to execute their wakeup +    condition check once and ask rescheduling based on that or not; this way +    empty spin does not require pushing to scheduler queue and thread +    pool wakeup + +IO SLEEPING/DISPATCHING +- epoll in edge-triggered monitoring for file i/o +- epoll_wait done in separate thread (or possibly as regular fiber) +  whose sole purpose is to wakeup threads for thread pool  - signalfd for signal handling -- eventfd for thread pool wakeup +- timerfd for timeout handling + +INTER FIBRE CALLS (IFC) +- sends a callback to be executed under some other fibre +- executed 
at tf_ifc_process() points, or at tf_exit() time +- might need tfc_ifc_wait() for synchronizing with ifc completion +- sending uses atomic LIFO (single atomic cmpxchg) +- receive does LIFO flush (single atomic xchg) and reverses order(?) +- need way to check if IFC is queued or not (so it's safe to reuse it) + +TIMEOUTS +- one (or more) fibers serving as alarm generators +- 4-heap with mremappable heap array +- receives IFC calls from other threads +- receives io wakeups from timerfd +- sends async signal to fiber after timeout  FIBERS -- timer node on fiber control data (for delayed heapify; and reuse on timeouts)  - fd, signal, pid wait struct on stack of wait function  - fiber_kill can interrupt wait -SCHEDULER <-> THREAD POOL -- scheduler queues to thread pool array fifo queue, -    - semaphore protects threads so no underqueueing happens -    - eventfd notifies scheduler when queue has free space again -    - head/tail updated atomically and item position recovered -- thread pool atomically pushes to resume atomic stack and sets eventfd -    - scheduler thread atomically pops all and updates fiber states +FIBRE WAKE UP QUEUES +- mostly not needed (alarms, io uses async signal wakeups) +- for mutex, semaphores, possibly send io +- use thread mutexes for list access +- wait on single queue only(?) + +FIBRE MUTEX +- simple wakeup queue, sleep on queue, uninterruptible? 
+ diff --git a/include/libtf/atomic.h b/include/libtf/atomic.h index ec9f1e0..8ebe5f8 100644 --- a/include/libtf/atomic.h +++ b/include/libtf/atomic.h @@ -13,7 +13,14 @@  #ifndef TF_ATOMIC_H  #define TF_ATOMIC_H -#define tf_atomic_inc(var) __sync_add_and_fetch(&(var), 1) -#define tf_atomic_dec(var) __sync_add_and_fetch(&(var), -1) +#define tf_atomic_inc(var) \ +	__sync_add_and_fetch(&(var), 1) +#define tf_atomic_dec(var) \ +	__sync_add_and_fetch(&(var), -1) + +#define tf_atomic_cmpxchg(ptr, old, new) \ +	__sync_bool_compare_and_swap(ptr, old, new) +#define tf_atomic_xchg(ptr, new) \ +	((typeof(*(ptr)))__sync_lock_test_and_set(ptr, new))  #endif diff --git a/include/libtf/defines.h b/include/libtf/defines.h index 8e39c7e..44ead8a 100644 --- a/include/libtf/defines.h +++ b/include/libtf/defines.h @@ -57,6 +57,7 @@  #define attribute_warn_unused_result	__attribute__((warn_unused_result))  #define attribute_deprecated		__attribute__((deprecated)) +/* FIXME: glibc fprintf requires 8kB on stack */  #define TF_BUG_ON(cond) if (unlikely(cond)) { \  	fprintf(stderr, "BUG: failure at %s:%d/%s(): %s!\n",	\  		__FILE__, __LINE__, __func__, #cond);		\ @@ -68,14 +69,19 @@  #define TF_EMPTY_ARRAY 0 +#define TF_BIT(n)		(1 << (n)) +  #ifndef TF_STACK_SIZE -#define TF_STACK_SIZE		4096 +/* FIXME: glibc fprintf requires 8kB on stack */ +#define TF_STACK_SIZE		(4*4096)  #endif  /* Monotonic time */  typedef uint32_t tf_mtime_t;  typedef int32_t tf_mtime_diff_t; +tf_mtime_t tf_mtime_now(void); +  static inline  tf_mtime_diff_t tf_mtime_diff(tf_mtime_t a, tf_mtime_t b)  { diff --git a/include/libtf/fiber.h b/include/libtf/fiber.h index d5c6153..f97e963 100644 --- a/include/libtf/fiber.h +++ b/include/libtf/fiber.h @@ -16,64 +16,127 @@  #include <errno.h>  #include <libtf/defines.h>  #include <libtf/heap.h> +#include <libtf/list.h> -/* Fiber wakeup reasons */ -#define TF_WAKEUP_NONE		0 -#define TF_WAKEUP_IMMEDIATE	-EAGAIN -#define TF_WAKEUP_KILL		-EINTR -#define TF_WAKEUP_TIMEOUT	-ETIME 
-#define TF_WAKEUP_THIS_TIMEOUT	-ETIMEDOUT -#define TF_WAKEUP_FD		-EIO +/* Inter-fibre calls */ +struct tf_fiber; +struct tf_ifc; +typedef void (*tf_ifc_handler_t)(void *fiber, struct tf_ifc *msg); -/* Fiber management */ -struct tf_scheduler; -typedef void (*tf_fiber_proc)(void *fiber); +struct tf_ifc { +	union { +		struct tf_ifc		*next; +		struct tf_list_node	list; +	}; +	tf_ifc_handler_t		handler; +	struct tf_fiber *		sender; +}; -void *__tf_fiber_create(tf_fiber_proc fiber_main, int private_size); -void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size); -void *tf_fiber_get(void *data); -void tf_fiber_put(void *data); -void __tf_fiber_wakeup(void *data, int wakeup_type); -void __tf_fiber_wakeup_heapnode(struct tf_heap_node *node); -int  __tf_fiber_schedule(void); -int  __tf_fiber_bind_scheduler(struct tf_scheduler *sched); -int  __tf_fiber_release_scheduler(struct tf_scheduler *sched); +void tf_ifc_queue(void *fiber, struct tf_ifc *msg, tf_ifc_handler_t handler); +void tf_ifc_complete(void *fiber, struct tf_ifc *msg, tf_ifc_handler_t handler); +void tf_ifc_process(void); +void tf_ifc_process_unordered(void); -void tf_fiber_exit(void) attribute_noreturn; -void tf_fiber_kill(void *fiber); -int  tf_fiber_yield(void); +/* Timeouts */ +struct tf_timeout_manager { +	struct tf_heap_head		heap; +	unsigned int			num_fibers; +}; + +struct tf_timeout_client { +	struct tf_timeout_manager *	manager; +	tf_mtime_t			value; +	tf_mtime_t			latched; +	struct tf_heap_node		heap_node; +	struct tf_ifc			ifc; +}; -/* Scheduling and fiber management */  struct tf_timeout { -	tf_mtime_t	saved_timeout; -	unsigned int	timeout_change; +	tf_mtime_t			saved_timeout; +	tf_mtime_t			my_timeout;  }; -#define tf_timed(func, timeout)						\ +static inline int tf_timeout_expired(struct tf_timeout *to) +{ +	return tf_mtime_diff(to->my_timeout, tf_mtime_now()) <= 0; +} + +struct tf_timeout_fiber *tf_timeout_fiber_create(void); +void tf_timeout_adjust(struct tf_timeout_client *tc); +void 
tf_timeout_delete(struct tf_timeout_client *tc); + +#define tf_timed(__to, func, timeout)					\  	({								\ -		struct tf_timeout __timeout;				\ -		tf_timeout_push(&__timeout, timeout);			\ -		tf_timeout_pop(&__timeout, (func));			\ +		tf_timeout_push(__to, timeout);				\ +		tf_timeout_pop(__to, (func));				\  	}) -void tf_timeout_push(struct tf_timeout *timeout, tf_mtime_diff_t milliseconds); -int __tf_timeout_pop(struct tf_timeout *timeout, int err); +/* Fibres and their management */ +typedef void *tf_uctx_t; + +struct tf_fiber { +	unsigned int		ref_count; +	unsigned int		flags; +	tf_uctx_t		context; +	tf_uctx_t		return_context; +	struct tf_list_node	queue_node; +	struct tf_list_head	wakeup_q; +	struct tf_timeout_client timeout; +	struct tf_ifc *		pending_ifc; +	char			data[TF_EMPTY_ARRAY]; +}; + +typedef void (*tf_fiber_proc)(void *fiber); + +void *__tf_fiber_create(tf_fiber_proc fiber_main, int private_size); +void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size); +void *tf_fiber_get(void *fiber); +void tf_fiber_put(void *fiber); +int tf_fiber_run(void *fiber); +void tf_fiber_kill(void *fiber); + +void tf_fiber_wakeup(struct tf_fiber *fiber); +int  tf_fiber_schedule(void); -static inline int tf_timeout_pop(struct tf_timeout *timeout, int err) +void tf_fiber_exit(void) attribute_noreturn; + +static inline struct tf_fiber *tf_vmach_get_current_fiber(void)  { -	if (unlikely(timeout->timeout_change)) -		return __tf_timeout_pop(timeout, err); +	extern __thread struct tf_fiber *tf_current_fiber; +	return tf_current_fiber; +} + +static inline void tf_timeout_push(struct tf_timeout *to, tf_mtime_diff_t ms) +{ +	struct tf_fiber *f = tf_vmach_get_current_fiber(); + +	to->saved_timeout = f->timeout.value; +	to->my_timeout = tf_mtime_now() + ms; +	if (f->timeout.value == 0 || +	    tf_mtime_diff(to->my_timeout, f->timeout.value) < 0) +		f->timeout.value = to->my_timeout; +} + +static inline int tf_timeout_pop(struct tf_timeout *to, int err) +{ +	struct 
tf_fiber *f = tf_vmach_get_current_fiber(); + +	f->timeout.value = to->saved_timeout;  	return err;  }  static inline -int tf_msleep(tf_mtime_diff_t milliseconds) +int tf_msleep(tf_mtime_diff_t ms)  { +	struct tf_timeout to;  	int r; -	r = tf_timed(__tf_fiber_schedule(), milliseconds); -	if (r == TF_WAKEUP_THIS_TIMEOUT) -		r = 0; -	return r; + +	tf_timeout_push(&to, ms); +	do { +		r = tf_fiber_schedule(); +	} while (r != -ETIME); + +	return tf_timeout_pop(&to, r);  }  #endif diff --git a/include/libtf/io.h b/include/libtf/io.h index 0d34421..d1098e3 100644 --- a/include/libtf/io.h +++ b/include/libtf/io.h @@ -21,10 +21,12 @@  #include <libtf/defines.h>  /* Flags for tf_open_fd() */ -#define TF_FD_AUTOCLOSE			1 -#define TF_FD_STREAM_ORIENTED		2 -#define TF_FD_SET_CLOEXEC		4 -#define TF_FD_ALREADY_NONBLOCKING	8 +#define TF_FD_READ			TF_BIT(0) +#define TF_FD_WRITE			TF_BIT(1) +#define TF_FD_AUTOCLOSE			TF_BIT(2) +#define TF_FD_STREAM_ORIENTED		TF_BIT(3) +#define TF_FD_SET_CLOEXEC		TF_BIT(4) +#define TF_FD_ALREADY_NONBLOCKING	TF_BIT(5)  struct tf_sockaddr {  	union { @@ -37,23 +39,14 @@ struct tf_sockaddr {  struct tf_fd {  	int fd;  	unsigned int flags; -	/* Single waiter -- would be relatively trivial to modify to allow -	 * multiple waiters, if someone actually needs it */ -	unsigned int events; -	void *waiting_fiber; +	struct tf_fiber *fiber;  }; -#define TF_POLL_READ	1 -#define TF_POLL_WRITE	2 -  struct tf_poll_hooks { -	void (*init)(void); -	int  (*poll)(tf_mtime_diff_t timeout); -	void (*close)(void); -	int  (*fd_created)(struct tf_fd *fd); -	int  (*fd_destroyed)(struct tf_fd *fd); -	void (*fd_monitor)(struct tf_fd *fd, int events); -	void (*fd_unmonitor)(struct tf_fd *fd); +	void *	(*create)(void); +	int	(*fd_created)(void *fiber, struct tf_fd *fd); +	int	(*fd_destroyed)(void *fiber, struct tf_fd *fd); +	void	(*fd_rearm)(void *fiber, struct tf_fd *fd);  };  int tf_open_fd(struct tf_fd *fd, int kfd, int flags); diff --git a/include/libtf/scheduler.h 
b/include/libtf/scheduler.h deleted file mode 100644 index db5a823..0000000 --- a/include/libtf/scheduler.h +++ /dev/null @@ -1,67 +0,0 @@ -/* scheduler.h - libtf fiber scheduler header - * - * Copyright (C) 2009-2010 Timo Teräs <timo.teras@iki.fi> - * All rights reserved. - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 or later as - * published by the Free Software Foundation. - * - * See http://www.gnu.org/ for details. - */ - -#ifndef TF_SCHEDULER_H -#define TF_SCHEDULER_H - -#include <libtf/atomic.h> -#include <libtf/list.h> -#include <libtf/heap.h> -#include <libtf/fiber.h> - -struct tf_poll_hooks; - -struct tf_scheduler { -	struct tf_list_head	scheduled_q; -	struct tf_list_head	running_q; -	struct tf_heap_head	heap; -	void *			active_fiber; -	void *			main_fiber; -	int			num_fibers; -	tf_mtime_t		scheduler_time; -	struct tf_poll_hooks *	poller; -	unsigned long		poll_data[2]; -}; - -static inline -struct tf_scheduler *tf_scheduler_get_current(void) -{ -	extern struct tf_scheduler *__tf_scheduler; -	TF_BUG_ON(__tf_scheduler == NULL); -	return __tf_scheduler; -} - -static inline -tf_mtime_t tf_scheduler_get_mtime(void) -{ -	return tf_scheduler_get_current()->scheduler_time; -} - -struct tf_scheduler *tf_scheduler_create(void); -int  tf_scheduler_enable(struct tf_scheduler *); -void tf_scheduler_disable(void); - -static inline struct tf_scheduler * -tf_scheduler_get(struct tf_scheduler *s) -{ -	tf_fiber_get(s); -	return s; -} - -static inline void -tf_scheduler_put(struct tf_scheduler *s) -{ -	tf_fiber_put(s); -} - -#endif - diff --git a/include/libtf/tf.h b/include/libtf/tf.h index e613f18..d052ee7 100644 --- a/include/libtf/tf.h +++ b/include/libtf/tf.h @@ -14,7 +14,7 @@  #define TF_H  #include <libtf/fiber.h> -#include <libtf/scheduler.h> +#include <libtf/vmach.h>  #include <libtf/io.h>  #endif diff --git a/include/libtf/vmach.h b/include/libtf/vmach.h new file mode 
100644 index 0000000..d302366 --- /dev/null +++ b/include/libtf/vmach.h @@ -0,0 +1,49 @@ +/* vmach.h - "virtual" machine and cpu contexts + * + * Copyright (C) 2009 Timo Teräs <timo.teras@iki.fi> + * All rights reserved. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 or later as + * published by the Free Software Foundation. + * + * See http://www.gnu.org/ for details. + */ + +#ifndef TF_VMACH_H +#define TF_VMACH_H + +#include <libtf/fiber.h> + +struct tf_vmach { +	struct tf_poll_hooks *		poll_ops; +	void *				poll_fiber; +	void *				timeout_fiber; +	struct tf_fiber			startup_fiber; +	int				num_user_fibers; +	struct tf_list_head		run_q; +}; + +struct tf_vcpu { +	struct tf_vmach *		machine; +}; + +static inline struct tf_vmach *tf_vmach_get_current(void) +{ +	extern __thread struct tf_vmach *tf_current_vmach; +	return tf_current_vmach; +} + +static inline struct tf_vcpu *tf_vmach_get_current_cpu(void) +{ +	extern __thread struct tf_vcpu *tf_current_vcpu; +	return tf_current_vcpu; +} + +void tf_vmach_start(void); +void tf_vmach_stop(void); + +void tf_vmach_run(struct tf_vmach *vm, struct tf_fiber *f); +void tf_vmach_run_dedicated(struct tf_vmach *vm, struct tf_fiber *f); + +#endif diff --git a/src/TFbuild b/src/TFbuild index 9277f9a..3ffa9ef 100644 --- a/src/TFbuild +++ b/src/TFbuild @@ -1,6 +1,6 @@  libs-y			+= libtf -libtf-objs-y		+= fiber.o scheduler.o heap.o +libtf-objs-y		+= fiber.o heap.o timeout.o vmach.o ifc.o  libtf-objs-$(OS_LINUX)	+= io-epoll.o mem-mmap.o  libtf-objs-$(OS_UNIX)	+= io-unix.o diff --git a/src/fiber.c b/src/fiber.c index e507815..bef7b81 100644 --- a/src/fiber.c +++ b/src/fiber.c @@ -13,31 +13,13 @@  #include <time.h>  #include <errno.h>  #include <unistd.h> +#include <libtf/atomic.h>  #include <libtf/fiber.h> -#include <libtf/scheduler.h> +#include <libtf/vmach.h>  #include "uctx.h" -#define TF_TIMEOUT_CHANGE_NEEDED			1 -#define 
TF_TIMEOUT_CHANGE_NEW_VALUE			2 - -struct tf_fiber { -	unsigned int		ref_count; -	struct tf_scheduler *	scheduler; -	int			wakeup_type; -	unsigned int		timeout_change; -	tf_mtime_t		timeout; -	struct tf_list_node	queue_node; -	struct tf_heap_node	heap_node; -	struct tf_uctx		context; -	char			data[TF_EMPTY_ARRAY]; -}; - -static inline -struct tf_fiber *tf_fiber_get_current(void) -{ -	void *data = tf_scheduler_get_current()->active_fiber; -	return container_of(data, struct tf_fiber, data); -} +#define TF_FIBERF_RUNNING		TF_BIT(0) +#define TF_FIBERF_WAKEUP_PENDING	TF_BIT(1)  static void tf_fiber_main(void *user_data, void *arg)  { @@ -48,45 +30,46 @@ static void tf_fiber_main(void *user_data, void *arg)  	tf_fiber_exit();  } -void *__tf_fiber_create(tf_fiber_proc fiber_main, int private_size) +void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size)  { -	struct tf_fiber *fiber; +	struct tf_fiber *self = tf_vmach_get_current_fiber(); +	struct tf_fiber *f; -	fiber = tf_uctx_create_embedded( +	f = tf_uctx_create_embedded(  		TF_STACK_SIZE,  		sizeof(struct tf_fiber) + private_size,  		offsetof(struct tf_fiber, context),  		tf_fiber_main, fiber_main); -	if (fiber == NULL) +	if (f == NULL)  		return NULL; -	*fiber = (struct tf_fiber) { +	*f = (struct tf_fiber) {  		.ref_count = 1, -		.queue_node = TF_LIST_INITIALIZER(fiber->queue_node), -		.context = fiber->context, +		.queue_node = TF_LIST_INITIALIZER(f->queue_node), +		.context = f->context, +		.timeout.manager = self ? 
self->timeout.manager : NULL, +		.wakeup_q = TF_LIST_HEAD_INITIALIZER(f->wakeup_q),  	}; -	return fiber->data; +	return f->data;  } -void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size) +int tf_fiber_run(void *fiber)  { -	struct tf_fiber *fiber; -	struct tf_scheduler *sched; - -	sched = tf_scheduler_get_current(); -	if (tf_heap_prealloc(&sched->heap, sched->num_fibers + 1) < 0) -		return NULL; +	struct tf_timeout_manager *tm; +	struct tf_fiber *f = container_of(fiber, struct tf_fiber, data); -	fiber = container_of(__tf_fiber_create(fiber_main, private_size), -			     struct tf_fiber, data); -	sched->num_fibers++; +	tm = f->timeout.manager; +	if (tm != NULL) { +		if (tf_heap_prealloc(&tm->heap, tm->num_fibers + 1) < 0) +			return -ENOMEM; +		tm->num_fibers++; +	} -	fiber->scheduler = sched; -	fiber->wakeup_type = TF_WAKEUP_NONE; -	tf_list_add_tail(&fiber->queue_node, &sched->scheduled_q); +	tf_fiber_wakeup(f); +	tf_vmach_get_current()->num_user_fibers++; -	return tf_fiber_get(fiber->data); +	return 0;  }  void *tf_fiber_get(void *data) @@ -98,23 +81,8 @@ void *tf_fiber_get(void *data)  static void __tf_fiber_destroy(struct tf_fiber *fiber)  { -	struct tf_scheduler *sched = fiber->scheduler; -	int main_fiber, num_fibers; - -	/* decrease first the number of fibers as we might be -	 * killing the scheduler it self */ -	num_fibers = --sched->num_fibers; - -	main_fiber = (fiber->context.alloc == NULL); -	tf_heap_delete(&fiber->heap_node, &sched->heap); -	tf_uctx_destroy(&fiber->context); -	if (main_fiber) -		free(fiber); - -	if (num_fibers == 1) { -		/* FIXME: Use proper fiber event*/ -		__tf_fiber_wakeup(sched->main_fiber, TF_WAKEUP_IMMEDIATE); -	} +	tf_heap_delete(&fiber->timeout.heap_node, &fiber->timeout.manager->heap); +	tf_uctx_destroy(fiber->context);  }  void tf_fiber_put(void *data) @@ -124,165 +92,60 @@ void tf_fiber_put(void *data)  		__tf_fiber_destroy(fiber);  } -void __tf_fiber_wakeup(void *data, int wakeup_type) +void tf_fiber_wakeup(struct 
tf_fiber *f)  { -	struct tf_fiber *fiber = container_of(data, struct tf_fiber, data); -	struct tf_scheduler *sched = fiber->scheduler; +	struct tf_fiber *self = tf_vmach_get_current_fiber(); +	unsigned int newval, oldval; -	if (fiber->wakeup_type == TF_WAKEUP_NONE) { -		fiber->wakeup_type = wakeup_type; -		tf_list_add_tail(&fiber->queue_node, &sched->running_q); -	} -} +	do { +		oldval = f->flags; +		if (oldval & TF_FIBERF_WAKEUP_PENDING) +			return; +		newval = oldval | TF_FIBERF_WAKEUP_PENDING | TF_FIBERF_RUNNING; +	} while (!tf_atomic_cmpxchg(&f->flags, oldval, newval)); -void __tf_fiber_wakeup_heapnode(struct tf_heap_node *node) -{ -	__tf_fiber_wakeup(container_of(node, struct tf_fiber, heap_node)->data, -			  TF_WAKEUP_TIMEOUT); +	if (!(oldval & TF_FIBERF_RUNNING)) +		tf_list_add_tail(&f->queue_node, &self->wakeup_q);  } -int __tf_fiber_schedule(void) +int tf_fiber_schedule(void)  { -	struct tf_scheduler *sched = tf_scheduler_get_current(); -	struct tf_fiber *f = tf_fiber_get_current(), *nf; -	int wakeup; +	struct tf_fiber *f = tf_vmach_get_current_fiber(); -	if (unlikely(f->timeout_change)) { -		if (f->timeout_change & TF_TIMEOUT_CHANGE_NEW_VALUE) { -			if (tf_mtime_diff(f->timeout, tf_scheduler_get_mtime()) <= 0) { -				f->timeout_change = TF_TIMEOUT_CHANGE_NEEDED; -				return TF_WAKEUP_TIMEOUT; -			} -			tf_heap_change(&f->heap_node, &sched->heap, f->timeout); -		} else -			tf_heap_delete(&f->heap_node, &sched->heap); -		f->timeout_change = 0; -	} +	if (f->timeout.value != 0 && +	    tf_mtime_diff(f->timeout.value, tf_mtime_now()) <= 0) +		return -ETIME; -	/* Figure out the next fibre to run */ -	if (unlikely(tf_list_empty(&sched->scheduled_q))) { -		tf_list_splice_tail(&sched->running_q, -				    &sched->scheduled_q); -		TF_BUG_ON(tf_list_empty(&sched->scheduled_q)); +	if (f->flags & TF_FIBERF_WAKEUP_PENDING) { +		f->flags = TF_FIBERF_RUNNING; +		return 0;  	} -	nf = tf_list_entry(tf_list_pop(&sched->scheduled_q), -			   struct tf_fiber, queue_node); -	
sched->active_fiber = nf->data; -	tf_uctx_transfer(&f->context, &nf->context); - -	wakeup = f->wakeup_type; -	f->wakeup_type = TF_WAKEUP_NONE; - -	return wakeup; -} - -int __tf_fiber_bind_scheduler(struct tf_scheduler *sched) -{ -	struct tf_fiber *f; -	f = malloc(sizeof(struct tf_fiber)); -	if (f == NULL) -		return -ENOMEM; +	if (f->timeout.value != f->timeout.latched) +		tf_timeout_adjust(&f->timeout); -	/* Mark currently active main fiber as active */ -	*f = (struct tf_fiber) { -		.ref_count = 1, -		.scheduler = sched, -		.queue_node = TF_LIST_INITIALIZER(f->queue_node), -	}; -	tf_uctx_create_self(&f->context); -	sched->main_fiber = f->data; -	sched->active_fiber = f->data; -	sched->num_fibers++; +	f->flags = 0; +	tf_uctx_transfer(f->context, f->return_context); +	f->flags = TF_FIBERF_RUNNING; -	/* Schedule scheduler fiber */ -	f = container_of((void *) sched, struct tf_fiber, data); -	f->scheduler = sched; -	f->wakeup_type = TF_WAKEUP_IMMEDIATE; -	tf_list_add_tail(&f->queue_node, &sched->running_q); - -	return 0; -} - -int __tf_fiber_release_scheduler(struct tf_scheduler *sched) -{ -	struct tf_fiber *f; - -	/* Detach scheduler */ -	f = container_of((void *) sched, struct tf_fiber, data); -	tf_list_del(&f->queue_node); - -	/* Detach main stack from this scheduler */ -	f = container_of((void *) sched->main_fiber, struct tf_fiber, data); -	tf_fiber_put(sched->main_fiber); -	sched->main_fiber = NULL; -	sched->num_fibers--; +	if (f->timeout.value != 0 && +	    tf_mtime_diff(f->timeout.value, tf_mtime_now()) <= 0) +		return -ETIME;  	return 0;  }  void tf_fiber_exit(void)  { -	struct tf_scheduler *sched = tf_scheduler_get_current(); -	struct tf_fiber *f = tf_fiber_get_current(); -	struct tf_fiber *schedf = container_of((void *) sched, struct tf_fiber, data); +	struct tf_fiber *f = tf_vmach_get_current_fiber(); -	tf_heap_delete(&f->heap_node, &sched->heap); -	schedf->wakeup_type = TF_WAKEUP_KILL; -	tf_uctx_transfer(&f->context, &schedf->context); +	if 
(f->timeout.manager != NULL) +		tf_timeout_delete(&f->timeout); +	tf_vmach_get_current()->num_user_fibers--; +	tf_uctx_transfer(f->context, f->return_context);  	TF_BUG_ON(1);  }  void tf_fiber_kill(void *fiber)  {  } - -int tf_fiber_yield(void) -{ -	struct tf_scheduler *sched = tf_scheduler_get_current(); -	struct tf_fiber *f = tf_fiber_get_current(); - -	TF_BUG_ON(tf_list_hashed(&f->queue_node)); -	f->wakeup_type = TF_WAKEUP_IMMEDIATE; -	tf_list_add_tail(&f->queue_node, &sched->running_q); - -	return __tf_fiber_schedule(); -} - -void tf_timeout_push(struct tf_timeout *timeout, tf_mtime_diff_t milliseconds) -{ -	struct tf_fiber *f = tf_fiber_get_current(); -	tf_mtime_t abs = tf_scheduler_get_mtime() + milliseconds; -	int active; - -	if (f->timeout_change) -		active = (f->timeout_change & TF_TIMEOUT_CHANGE_NEW_VALUE); -	else -		active = tf_heap_node_active(&f->heap_node); - -	if (!active || tf_mtime_diff(abs, f->timeout) < 0) { -		/* Save previous timeout */ -		timeout->saved_timeout = f->timeout; -		timeout->timeout_change = TF_TIMEOUT_CHANGE_NEEDED; -		if (active) -			timeout->timeout_change |= TF_TIMEOUT_CHANGE_NEW_VALUE; - -		/* Make new timeout pending */ -		f->timeout = abs; -		f->timeout_change = TF_TIMEOUT_CHANGE_NEEDED -				  | TF_TIMEOUT_CHANGE_NEW_VALUE; -	} else { -		timeout->timeout_change = 0; -	} -} - -int __tf_timeout_pop(struct tf_timeout *timeout, int err) -{ -	struct tf_fiber *f = tf_fiber_get_current(); - -	f->timeout = timeout->saved_timeout; -	f->timeout_change = timeout->timeout_change; -	if (err == TF_WAKEUP_TIMEOUT) -		err = TF_WAKEUP_THIS_TIMEOUT; -	return err; -} diff --git a/src/ifc.c b/src/ifc.c new file mode 100644 index 0000000..2dfafe8 --- /dev/null +++ b/src/ifc.c @@ -0,0 +1,57 @@ +/* ifc.c - inter fiber communications + * + * Copyright (C) 2010 Timo Teräs <timo.teras@iki.fi> + * All rights reserved. 
+ * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 or later as + * published by the Free Software Foundation. + * + * See http://www.gnu.org/ for details. + */ + +#include <libtf/atomic.h> +#include <libtf/fiber.h> + +void tf_ifc_queue(void *fiber, struct tf_ifc *ifc, tf_ifc_handler_t handler) +{ +	struct tf_fiber *f = container_of(fiber, struct tf_fiber, data); +	struct tf_ifc *old; + +	TF_BUG_ON(ifc->next != NULL); +	ifc->handler = handler; +	do { +		old = f->pending_ifc; +		ifc->next = old; +	} while (!tf_atomic_cmpxchg(&f->pending_ifc, old, ifc)); + +	tf_fiber_wakeup(f); +} + +void tf_ifc_complete(void *fiber, struct tf_ifc *ifc, tf_ifc_handler_t handler) +{ +	ifc->sender = tf_vmach_get_current_fiber(); +	tf_ifc_queue(fiber, ifc, handler); +	while (ifc->sender != NULL) +		tf_fiber_schedule(); +} + +void tf_ifc_process_unordered(void) +{ +	struct tf_fiber *f = tf_vmach_get_current_fiber(), *s; +	struct tf_ifc *pending, *ifc; + +	while (f->pending_ifc != NULL) { +		pending = tf_atomic_xchg(&f->pending_ifc, NULL); +		while (pending) { +			ifc = pending; +			pending = ifc->next; +			ifc->handler(f->data, ifc); +			s = ifc->sender; +			ifc->next = NULL; +			ifc->sender = NULL; +			if (s != NULL) +				tf_fiber_wakeup(s); +		} +	} +} diff --git a/src/io-epoll.c b/src/io-epoll.c index 32aa090..1fc9ca1 100644 --- a/src/io-epoll.c +++ b/src/io-epoll.c @@ -16,74 +16,70 @@  #include <sys/epoll.h>  #include <libtf/io.h> -#include <libtf/scheduler.h> +#include <libtf/fiber.h> -struct tf_poll_data { +struct tf_epoll_data {  	int			epoll_fd; -	int			num_waiters;  }; -static struct tf_poll_data *tf_epoll_get_data(void) +static void tf_epoll_main(void *ctx)  { -	struct tf_scheduler *sched = tf_scheduler_get_current(); -	TF_BUILD_BUG_ON(sizeof(struct tf_poll_data) > sizeof(sched->poll_data)); -	return (struct tf_poll_data *) &sched->poll_data; -} - -static void tf_epoll_init(void) -{ -	
struct tf_poll_data *pd = tf_epoll_get_data(); +	struct tf_epoll_data *pd = ctx; +	struct epoll_event events[64]; +	struct tf_fd *fd; +	int r, i; -	pd->epoll_fd = epoll_create1(EPOLL_CLOEXEC); -	pd->num_waiters = 0; -	TF_BUG_ON(pd->epoll_fd < 0); -} +	do { +		r = epoll_wait(pd->epoll_fd, events, array_size(events), 0); +		if (r == 0) { +			/* FIXME: yielding is bad */ +			struct tf_fiber *self = tf_vmach_get_current_fiber(); +			tf_list_add_tail(&self->queue_node, &self->wakeup_q); +			if (tf_fiber_schedule() == 0) +				continue; +		} -static void tf_epoll_close(void) -{ -	struct tf_poll_data *pd = tf_epoll_get_data(); +		for (i = 0; i < r; i++) { +			fd = (struct tf_fd *) events[i].data.ptr; +			tf_fiber_wakeup(fd->fiber); +		} +	} while (1);  	close(pd->epoll_fd);  } -static int tf_epoll_poll(tf_mtime_diff_t timeout) + +static void *tf_epoll_create(void)  { -	struct tf_poll_data *pd = tf_epoll_get_data(); -	struct epoll_event events[64]; -	struct tf_fd *fd; -	int r, i, ret; +	struct tf_epoll_data *d; -	if (timeout == 0 && pd->num_waiters == 0) -		return TF_WAKEUP_TIMEOUT; +	d = tf_fiber_create(tf_epoll_main, sizeof(struct tf_epoll_data)); +	if (d == NULL) +		return NULL; -	ret = TF_WAKEUP_TIMEOUT; -	do { -		r = epoll_wait(pd->epoll_fd, events, array_size(events), timeout); -		if (r == 0) -			break; +	d->epoll_fd = epoll_create1(EPOLL_CLOEXEC); +	TF_BUG_ON(d->epoll_fd < 0); -		for (i = 0; i < r; i++) { -			fd = (struct tf_fd *) events[i].data.ptr; -			if (likely(fd->events & events[i].events)) -				__tf_fiber_wakeup(fd->waiting_fiber, TF_WAKEUP_FD); -		} -		ret = TF_WAKEUP_FD; -		timeout = 0; -	} while (unlikely(r == array_size(events))); +	tf_fiber_run(tf_fiber_get(d)); -	return ret; +	return d;  } -static int tf_epoll_fd_created(struct tf_fd *fd) +static int tf_epoll_fd_created(void *fiber, struct tf_fd *fd)  { -	struct tf_poll_data *pd = tf_epoll_get_data(); +	struct tf_epoll_data *d = fiber;  	struct epoll_event ev;  	int r;  	ev = (struct epoll_event) { -		
.events = EPOLLIN | EPOLLOUT | EPOLLET, +		.events = EPOLLET,  		.data.ptr = fd,  	}; -	r = epoll_ctl(pd->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev); +	if (fd->flags & TF_FD_READ) +		ev.events |= EPOLLIN; +	if (fd->flags & TF_FD_WRITE) +		ev.events |= EPOLLOUT; + +	r = epoll_ctl(d->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);  	if (unlikely(r < 0)) {  		TF_BUG_ON(errno == EEXIST);  		r = -errno; @@ -93,46 +89,18 @@ static int tf_epoll_fd_created(struct tf_fd *fd)  	return 0;  } -static int tf_epoll_fd_destroyed(struct tf_fd *fd) +static int tf_epoll_fd_destroyed(void *fiber, struct tf_fd *fd)  { -	struct tf_poll_data *pd = tf_epoll_get_data(); +	struct tf_epoll_data *d = fiber; -	if (fd->flags & TF_FD_AUTOCLOSE) -		return 0; +	if (!(fd->flags & TF_FD_AUTOCLOSE)) +		epoll_ctl(d->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL); -	epoll_ctl(pd->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);  	return 0;  } -static void tf_epoll_fd_monitor(struct tf_fd *fd, int events) -{ -	struct tf_poll_data *pd = tf_epoll_get_data(); - -	TF_BUG_ON(fd->waiting_fiber != NULL); -	fd->events = EPOLLERR | EPOLLHUP; -	if (events & TF_POLL_READ) -		fd->events |= EPOLLIN; -	if (events & TF_POLL_WRITE) -		fd->events |= EPOLLOUT; -	fd->waiting_fiber = tf_scheduler_get_current()->active_fiber; -	pd->num_waiters++; -} - -static void tf_epoll_fd_unmonitor(struct tf_fd *fd) -{ -	struct tf_poll_data *pd = tf_epoll_get_data(); - -	fd->waiting_fiber = NULL; -	fd->events = 0; -	pd->num_waiters--; -} -  struct tf_poll_hooks tf_epoll_hooks = { -	.init = tf_epoll_init, -	.close = tf_epoll_close, -	.poll = tf_epoll_poll, +	.create = tf_epoll_create,  	.fd_created = tf_epoll_fd_created,  	.fd_destroyed = tf_epoll_fd_destroyed, -	.fd_monitor = tf_epoll_fd_monitor, -	.fd_unmonitor = tf_epoll_fd_unmonitor,  }; diff --git a/src/io-unix.c b/src/io-unix.c index d80592a..7cd4fd0 100644 --- a/src/io-unix.c +++ b/src/io-unix.c @@ -17,10 +17,8 @@  #include <string.h>  #include <libtf/io.h> -#include <libtf/scheduler.h> - -#define 
TF_POLL_HOOKS \ -struct tf_poll_hooks *poller = tf_scheduler_get_current()->poller; +#include <libtf/fiber.h> +#include <libtf/vmach.h>  static inline int tf_sockaddr_len(const struct tf_sockaddr *addr)  { @@ -38,14 +36,14 @@ static inline int tf_sockaddr_len(const struct tf_sockaddr *addr)  int tf_open_fd(struct tf_fd *fd, int kfd, int flags)  { -	TF_POLL_HOOKS; +	struct tf_vmach *vm = tf_vmach_get_current();  	int r;  	fd->fd = kfd;  	fd->flags = flags; -	fd->waiting_fiber = NULL; +	fd->fiber = tf_vmach_get_current_fiber(); -	r = poller->fd_created(fd); +	r = vm->poll_ops->fd_created(vm->poll_fiber, fd);  	if (r < 0) {  		if (flags & TF_FD_AUTOCLOSE)  			close(kfd); @@ -88,10 +86,10 @@ int tf_open(struct tf_fd *fd, const char *pathname, int flags)  int tf_close(struct tf_fd *fd)  { -	TF_POLL_HOOKS; +	struct tf_vmach *vm = tf_vmach_get_current();  	int r; -	poller->fd_destroyed(fd); +	vm->poll_ops->fd_destroyed(vm->poll_fiber, fd);  	if (fd->flags & TF_FD_AUTOCLOSE) {  		r = close(fd->fd);  		if (unlikely(r == -1)) @@ -104,11 +102,9 @@ int tf_close(struct tf_fd *fd)  int tf_read_fully(struct tf_fd *fd, void *buf, size_t count)  { -	TF_POLL_HOOKS;  	ssize_t n; -	int r; +	int r = 0; -	poller->fd_monitor(fd, TF_POLL_READ);  	do {  		n = read(fd->fd, buf, count);  		if (n == count) { @@ -132,20 +128,17 @@ int tf_read_fully(struct tf_fd *fd, void *buf, size_t count)  				continue;  		} -		r = __tf_fiber_schedule(); -	} while (r == TF_WAKEUP_FD); -	poller->fd_unmonitor(fd); +		r = tf_fiber_schedule(); +	} while (r != 0);  	return -r;  }  int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count)  { -	TF_POLL_HOOKS;  	ssize_t n; -	int r; +	int r = 0; -	poller->fd_monitor(fd, TF_POLL_WRITE);  	do {  		n = write(fd->fd, buf, count);  		if (n == count) { @@ -166,19 +159,16 @@ int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count)  				continue;  		} -		r = __tf_fiber_schedule(); -	} while (r == TF_WAKEUP_FD); -	poller->fd_unmonitor(fd); +		r = 
tf_fiber_schedule(); +	} while (r != 0);  	return -r;  }  ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count)  { -	TF_POLL_HOOKS;  	ssize_t n; -	poller->fd_monitor(fd, TF_POLL_READ);  	do {  		n = read(fd->fd, buf, count);  		if (n >= 0) @@ -189,19 +179,16 @@ ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count)  			n = -errno;  			break;  		} -		n = __tf_fiber_schedule(); -	} while (n == TF_WAKEUP_FD); -	poller->fd_unmonitor(fd); +		n = tf_fiber_schedule(); +	} while (n != 0);  	return n;  }  ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count)  { -	TF_POLL_HOOKS;  	ssize_t n; -	poller->fd_monitor(fd, TF_POLL_WRITE);  	do {  		n = write(fd->fd, buf, count);  		if (n >= 0) @@ -212,9 +199,8 @@ ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count)  			n = -errno;  			break;  		} -		n = __tf_fiber_schedule(); -	} while (n == TF_WAKEUP_FD); -	poller->fd_unmonitor(fd); +		n = tf_fiber_schedule(); +	} while (n != 0);  	return n;  } @@ -261,7 +247,6 @@ int tf_listen(struct tf_fd *fd, int backlog)  int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd,  	      struct tf_sockaddr *from)  { -	TF_POLL_HOOKS;  	int r, tfdf;  	struct sockaddr *addr = NULL;  	socklen_t al, *pal = NULL; @@ -272,24 +257,19 @@ int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd,  		pal  = &al;  	} -	tfdf  = TF_FD_AUTOCLOSE | (listen_fd->flags & TF_FD_STREAM_ORIENTED); -	tfdf |= TF_FD_SET_CLOEXEC; +	tfdf = TF_FD_AUTOCLOSE | TF_FD_ALREADY_NONBLOCKING | +	       (listen_fd->flags & TF_FD_STREAM_ORIENTED); -	poller->fd_monitor(listen_fd, TF_POLL_READ);  	do { -		/* FIXME: use accept4 if available */ -		r = accept(listen_fd->fd, addr, pal); +		r = accept4(listen_fd->fd, addr, pal, SOCK_NONBLOCK|SOCK_CLOEXEC);  		if (r >= 0)  			break;  		if (errno == EINTR)  			continue; -		if (errno != EAGAIN) { -			poller->fd_unmonitor(listen_fd); +		if (errno != EAGAIN)  			return -errno; -		} -		r = __tf_fiber_schedule(); -	} while (r == TF_WAKEUP_FD); -	
poller->fd_unmonitor(listen_fd); +		r = tf_fiber_schedule(); +	} while (r != 0);  	if (r < 0)  		return r; @@ -298,7 +278,6 @@ int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd,  int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to)  { -	TF_POLL_HOOKS;  	socklen_t l = sizeof(int);  	int r, err; @@ -310,10 +289,8 @@ int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to)  		return -errno;  	/* Wait for socket to become readable */ -	poller->fd_monitor(fd, TF_POLL_WRITE); -	r = __tf_fiber_schedule(); -	poller->fd_unmonitor(fd); -	if (r != TF_WAKEUP_FD) +	r = tf_fiber_schedule(); +	if (r != 0)  		return r;  	/* Check for error */ @@ -327,7 +304,6 @@ ssize_t tf_recvmsg(struct tf_fd *fd,  		   struct tf_sockaddr *to,  		   void *buf, size_t len)  { -	TF_POLL_HOOKS;  	struct iovec iov;  	struct msghdr msg;  	struct cmsghdr *cmsg; @@ -347,7 +323,6 @@ ssize_t tf_recvmsg(struct tf_fd *fd,  		.msg_iovlen	= 1,  	}; -	poller->fd_monitor(fd, TF_POLL_READ);  	do {  		r = recvmsg(fd->fd, &msg, 0);  		if (r >= 0) @@ -356,9 +331,8 @@ ssize_t tf_recvmsg(struct tf_fd *fd,  			r = -errno;  			break;  		} -		r = __tf_fiber_schedule(); -	} while (r == TF_WAKEUP_FD); -	poller->fd_unmonitor(fd); +		r = tf_fiber_schedule(); +	} while (r != 0);  	if (r < 0 || to == NULL)  		return r; @@ -382,7 +356,6 @@ ssize_t tf_sendmsg(struct tf_fd *fd,  		   const struct tf_sockaddr *to,  		   const void *buf, size_t len)  { -	TF_POLL_HOOKS;  	struct msghdr msg;  	struct iovec iov;  	struct { @@ -411,7 +384,6 @@ ssize_t tf_sendmsg(struct tf_fd *fd,  		msg.msg_controllen	= sizeof(cmsg);  	} -	poller->fd_monitor(fd, TF_POLL_WRITE);  	do {  		r = sendmsg(fd->fd, &msg, 0);  		if (r >= 0) @@ -420,9 +392,8 @@ ssize_t tf_sendmsg(struct tf_fd *fd,  			r = -errno;  			break;  		} -		r = __tf_fiber_schedule(); -	} while (r == TF_WAKEUP_FD); -	poller->fd_unmonitor(fd); +		r = tf_fiber_schedule(); +	} while (r != 0);  	return r;  } diff --git a/src/scheduler.c b/src/scheduler.c deleted 
file mode 100644 index a103d0a..0000000 --- a/src/scheduler.c +++ /dev/null @@ -1,134 +0,0 @@ -/* scheduler.c - fiber scheduling - * - * Copyright (C) 2009-2010 Timo Teräs <timo.teras@iki.fi> - * All rights reserved. - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 or later as - * published by the Free Software Foundation. - * - * See http://www.gnu.org/ for details. - */ - -#include <time.h> -#include <libtf/scheduler.h> -#include <libtf/io.h> - -/* FIXME: should be in thread local storage */ -extern struct tf_poll_hooks tf_epoll_hooks; -struct tf_scheduler *__tf_scheduler; - -static void update_time(struct tf_scheduler *sched) -{ -	struct timespec ts; - -	clock_gettime(CLOCK_MONOTONIC, &ts); -	sched->scheduler_time = ts.tv_sec * 1000 + ts.tv_nsec / 1000000; -} - -static void process_heap(struct tf_scheduler *sched) -{ -	struct tf_heap_node *node; -	tf_mtime_t now = sched->scheduler_time; - -	while (!tf_heap_empty(&sched->heap) && -	       tf_mtime_diff(now, tf_heap_get_value(&sched->heap)) >= 0) { -		node = tf_heap_get_node(&sched->heap); -		tf_heap_delete(node, &sched->heap); -		__tf_fiber_wakeup_heapnode(node); -	} -} - -void tf_scheduler_fiber(void *data) -{ -	struct tf_scheduler *sched = (struct tf_scheduler *) data; - -	do { -		tf_mtime_diff_t timeout; - -		update_time(sched); -		if (!tf_list_empty(&sched->scheduled_q) || -		    !tf_list_empty(&sched->running_q)) { -			timeout = 0; -		} else if (!tf_heap_empty(&sched->heap)) { -			timeout = tf_mtime_diff( -				tf_heap_get_value(&sched->heap), -				tf_scheduler_get_mtime()); -			if (timeout < 0) -				timeout = 0; -		} else { -			timeout = -1; -		} - -		if (sched->poller->poll(timeout) == TF_WAKEUP_TIMEOUT && -		    timeout >= 0) { -			sched->scheduler_time += timeout; -			process_heap(sched); -		} - -		if (tf_fiber_yield() == TF_WAKEUP_KILL) { -			do { -				tf_fiber_put(sched->active_fiber); -				sched->active_fiber 
= sched; -			} while (__tf_fiber_schedule() == TF_WAKEUP_KILL); -		} -	} while (1); -} - -struct tf_scheduler *tf_scheduler_create(void) -{ -	struct tf_scheduler *sched; - -	sched = __tf_fiber_create(tf_scheduler_fiber, -				  sizeof(struct tf_scheduler)); - -	*sched = (struct tf_scheduler) { -		.scheduled_q = TF_LIST_HEAD_INITIALIZER(sched->scheduled_q), -		.running_q = TF_LIST_HEAD_INITIALIZER(sched->running_q), -	}; - -	return sched; -} - -int  tf_scheduler_enable(struct tf_scheduler *sched) -{ -	struct tf_scheduler *s = sched; - -	if (s == NULL) { -		s = tf_scheduler_create(); -		if (s == NULL) -			return -ENOMEM; -	} -	if (s->main_fiber != NULL) -		return -EBUSY; - -	__tf_fiber_bind_scheduler(s); -	__tf_scheduler = s; -	s->poller = &tf_epoll_hooks; -	s->poller->init(); -	update_time(s); - -	if (sched != NULL) -		tf_scheduler_get(sched); - -	return 0; -} - -void tf_scheduler_disable(void) -{ -	struct tf_scheduler *sched = __tf_scheduler; - -	if (sched == NULL || -	    sched->main_fiber != sched->active_fiber) -		return; - -	/* sleep until no others */ -	while (sched->num_fibers > 1) -		__tf_fiber_schedule(); - -	sched->poller->close(); -	__tf_scheduler = NULL; -	__tf_fiber_release_scheduler(sched); -	tf_heap_destroy(&sched->heap); -	tf_fiber_put(sched); -} diff --git a/src/timeout.c b/src/timeout.c new file mode 100644 index 0000000..fc7acd2 --- /dev/null +++ b/src/timeout.c @@ -0,0 +1,120 @@ +/* timeout.c - timerfd based fiber wakeups + * + * Copyright (C) 2010 Timo Teräs <timo.teras@iki.fi> + * All rights reserved. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 or later as + * published by the Free Software Foundation. + * + * See http://www.gnu.org/ for details. 
+ */ + +#include <libtf/fiber.h> +#include <libtf/io.h> +#include <sys/timerfd.h> + +struct tf_timeout_fiber { +	struct tf_timeout_manager	manager; +	struct tf_fd			fd; +	tf_mtime_t			programmed; +}; + +static tf_mtime_t tf_mtime_cached; + +tf_mtime_t tf_mtime_now(void) +{ +	return tf_mtime_cached; +} + +tf_mtime_t tf_mtime_update(void) +{ +	struct timespec ts; + +	clock_gettime(CLOCK_MONOTONIC, &ts); +	tf_mtime_cached = ts.tv_sec * 1000 + ts.tv_nsec / 1000000; + +	return tf_mtime_cached; +} + +static void tf_timeout_process_heap(struct tf_timeout_fiber *t) +{ +	struct tf_heap_head *heap = &t->manager.heap; +	struct tf_heap_node *node; +	tf_mtime_t now = tf_mtime_update(); +	tf_mtime_diff_t timeout, value; + +	while (!tf_heap_empty(heap)) { +		value = tf_heap_get_value(heap); +		timeout = tf_mtime_diff(value, now); +		if (timeout > 0) { +			if (t->programmed != value) { +				struct itimerspec its = { +					.it_value.tv_sec  = timeout / 1000, +					.it_value.tv_nsec = (timeout % 1000) * 1000000, +				}; +				t->programmed = value; +				timerfd_settime(t->fd.fd, 0, &its, NULL); +			} +			break;	 +		} + +		node = tf_heap_get_node(heap); +		tf_heap_delete(node, heap); +		tf_fiber_wakeup(container_of(node, struct tf_fiber, timeout.heap_node)); +	} +} + +static void tf_timeout_worker(void *ctx) +{ +	do { +		tf_ifc_process_unordered(); +		tf_timeout_process_heap(ctx); +	} while (tf_fiber_schedule() == 0); +} + +struct tf_timeout_fiber *tf_timeout_fiber_create(void) +{ +	struct tf_timeout_fiber *f; + +	f = tf_fiber_create(tf_timeout_worker, sizeof(struct tf_timeout_fiber)); +	if (f == NULL) +		return f; + +	if (tf_open_fd(&f->fd, +		       timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK|TFD_CLOEXEC), +		       TF_FD_READ | TF_FD_ALREADY_NONBLOCKING | TF_FD_AUTOCLOSE) != 0) { +		tf_fiber_put(f); +		return NULL; +	} +	f->fd.fiber = container_of((void *) f, struct tf_fiber, data); +	tf_fiber_run(tf_fiber_get(f)); + +	return f; +} + +static void tf_timeout_adjust_ifc(void *fiber, 
struct tf_ifc *ifc) +{ +	struct tf_timeout_fiber *t = fiber; +	struct tf_timeout_client *c = container_of(ifc, struct tf_timeout_client, ifc); +	tf_mtime_t val; + +	val = c->value; +	if (val == 0) +		tf_heap_delete(&c->heap_node, &t->manager.heap); +	else +		tf_heap_change(&c->heap_node, &t->manager.heap, val); +	c->latched = val; +} + +void tf_timeout_adjust(struct tf_timeout_client *tc) +{ +	tf_ifc_queue(tc->manager, &tc->ifc, tf_timeout_adjust_ifc); +} + +void tf_timeout_delete(struct tf_timeout_client *tc) +{ +	tc->value = 0; +	tc->latched = 0; +	tf_ifc_complete(tc->manager, &tc->ifc, tf_timeout_adjust_ifc); +} @@ -18,19 +18,18 @@  #ifdef VALGRIND  #include <valgrind/valgrind.h> -#else -#define VALGRIND_STACK_REGISTER(stack_base, size) 0 -#define VALGRIND_STACK_DEREGISTER(stack_id)  #endif  #define STACK_GUARD		0xbad57ac4  struct tf_uctx { -	int *stack_guard; -	size_t size; -	void *alloc; -	void *current_sp; -	unsigned int stack_id; +	int *			stack_guard; +	size_t			size; +	void *			alloc; +	void *			current_sp; +#ifdef VALGRIND +	unsigned int		stack_id; +#endif  };  #if defined(__i386__) @@ -89,13 +88,14 @@ static inline void stack_push_ptr(void **stackptr, void *ptr)  } -static inline void tf_uctx_create_self(struct tf_uctx *uctx) +static inline tf_uctx_t tf_uctx_create_self(struct tf_uctx *uctx)  {  	static int dummy_guard = STACK_GUARD;  	*uctx = (struct tf_uctx) {  		.stack_guard = &dummy_guard,  	}; +	return uctx;  }  static inline void * @@ -118,20 +118,24 @@ tf_uctx_create_embedded(  	/* Create initial stack frame (cdecl convention) */  	stack = stack_pointer(stack_base, size); -	user_data = stack_push(&stack, TF_ALIGN(private_size, 64)); +	user_data = stack_push(&stack, TF_ALIGN(private_size, 16)); +	uctx = stack_push(&stack, TF_ALIGN(sizeof(struct tf_uctx), 16));  	stack_push_ptr(&stack, main_argument);  	stack_push_ptr(&stack, user_data);  	stack_push_ptr(&stack, NULL);  	stack_push_ptr(&stack, stack_frame_main);	/* eip */  	stack_push_ptr(&stack, 
NULL);			/* ebp */ -	uctx = user_data + uctx_offset; +	*((tf_uctx_t *) (user_data + uctx_offset)) = uctx; +  	*uctx = (struct tf_uctx) {  		.stack_guard = stack_guard(stack_base, size),  		.alloc = stack_base,  		.size = size,  		.current_sp = stack, +#ifdef VALGRIND  		.stack_id = VALGRIND_STACK_REGISTER(stack_base, stack_base+size), +#endif  	};  	*uctx->stack_guard = STACK_GUARD; @@ -139,18 +143,25 @@ tf_uctx_create_embedded(  }  static inline -void tf_uctx_destroy(struct tf_uctx *uctx) +void tf_uctx_destroy(tf_uctx_t ctx)  { +	struct tf_uctx *uctx = ctx; +  	if (uctx->alloc != NULL) { +#ifdef VALGRIND  		VALGRIND_STACK_DEREGISTER(uctx->stack_id); +#endif  		tf_bmem_free(uctx->alloc, uctx->size);  	}  }  static inline -void tf_uctx_transfer(struct tf_uctx *from, struct tf_uctx *to) +void tf_uctx_transfer(tf_uctx_t from, tf_uctx_t to)  { +	struct tf_uctx *ufrom = from; +	struct tf_uctx *uto = to; +  	/* Switch stack pointers */ -	TF_BUG_ON(*from->stack_guard != STACK_GUARD); -	switch_fiber(from, to); +	TF_BUG_ON(*ufrom->stack_guard != STACK_GUARD); +	switch_fiber(ufrom, uto);  } diff --git a/src/vmach.c b/src/vmach.c new file mode 100644 index 0000000..a9ca446 --- /dev/null +++ b/src/vmach.c @@ -0,0 +1,120 @@ +#include <libtf/defines.h> +#include <libtf/list.h> +#include <libtf/vmach.h> +#include <libtf/io.h> +#include "uctx.h" + +__thread struct tf_fiber *tf_current_fiber; +__thread struct tf_vcpu *tf_current_vcpu; +__thread struct tf_vmach *tf_current_vmach; + +extern struct tf_poll_hooks tf_epoll_hooks; + +struct tf_vmachine { +	struct tf_vmach vmach; +	struct tf_uctx startup_uctx; +	void *machine_init_fiber; +}; + +static void tf_vcpu_main(void *fiber_data) +{ +	struct tf_vcpu *vcpu = fiber_data; +	struct tf_vmach *vmach = vcpu->machine; +	struct tf_fiber *self = container_of(fiber_data, struct tf_fiber, data); +	struct tf_fiber *f; + +	tf_current_vmach = vmach; +	tf_current_vcpu  = vcpu; + +	while (vmach->num_user_fibers != 0) { +		if 
(tf_list_empty(&vmach->run_q)) { +			/* sleep */ +			continue; +		} + +		f = tf_list_entry(tf_list_pop(&vmach->run_q), +				  struct tf_fiber, queue_node); + +		f->return_context = self->context; +		tf_current_fiber = f; +		tf_uctx_transfer(self->context, f->context); +		tf_list_splice_tail(&f->wakeup_q, &vmach->run_q); +	} +} + +static void tf_vmach_main(void *fiber_data) +{ +	struct tf_fiber *self = container_of(fiber_data, struct tf_fiber, data); +	struct tf_vcpu *vcpu = fiber_data; +	struct tf_vmach *vmach = vcpu->machine; + +	tf_current_vmach = vmach; +	tf_current_vcpu  = vcpu; +	tf_current_fiber = self; + +	/* Initialize IO subsystem */ +	vmach->poll_ops = &tf_epoll_hooks; +	vmach->poll_fiber = vmach->poll_ops->create(); +	vmach->timeout_fiber = tf_timeout_fiber_create(); +	vmach->startup_fiber.timeout.manager = vmach->timeout_fiber; + +	/* Run the initial fiber */ +	tf_fiber_wakeup(&vmach->startup_fiber); + +	/* Use main thread as a regular vcpu */ +	vmach->num_user_fibers = 1; +	tf_list_splice_tail(&self->wakeup_q, &vmach->run_q); +	tf_vcpu_main(vcpu); + +	/* Kill all stuff */ + +	/* Return to main fiber */ +	vmach->startup_fiber.return_context = NULL; +	tf_current_fiber = NULL; +	tf_current_vcpu = NULL; +	tf_current_vmach = NULL; + +	tf_uctx_transfer(self->context, vmach->startup_fiber.context); +} + +void tf_vmach_start(void) +{ +	struct tf_vmachine *vmach; +	struct tf_vcpu *vcpu; + +	TF_BUG_ON(tf_current_vcpu != NULL); + +	/* Create a self-fiber so we can surrender control to vcpu */ +	vmach = calloc(1, sizeof(struct tf_vmachine)); +	vmach->vmach.startup_fiber = (struct tf_fiber) { +		.ref_count = 1, +		.queue_node = TF_LIST_INITIALIZER(vmach->vmach.startup_fiber.queue_node), +		.wakeup_q = TF_LIST_HEAD_INITIALIZER(vmach->vmach.startup_fiber.wakeup_q), +		.context = tf_uctx_create_self(&vmach->startup_uctx), +	}; +	tf_list_init_head(&vmach->vmach.run_q); +	vcpu = tf_fiber_create(tf_vmach_main, sizeof(struct tf_vcpu)); +	vmach->machine_init_fiber = vcpu; 
+	vcpu->machine = &vmach->vmach; + +	/* Create manager fiber to initialize vcpu */ +	tf_uctx_transfer(vmach->vmach.startup_fiber.context, +			 container_of((void *) vcpu, struct tf_fiber, data)->context); +} + +void tf_vmach_stop(void) +{ +	struct tf_fiber *self = tf_vmach_get_current_fiber(); +	struct tf_vmach *vmach = tf_vmach_get_current(); + +	TF_BUG_ON(self != &vmach->startup_fiber); + +	/* Wait for the vmachine to stop */ +	tf_vmach_get_current()->num_user_fibers--; +	while (self->return_context != NULL) +		tf_fiber_schedule(); + +	/* And clean up */ +	tf_uctx_destroy(vmach->startup_fiber.context); +	free(vmach); +} diff --git a/test/httpget.c b/test/httpget.c index 29f7f14..876b63c 100644 --- a/test/httpget.c +++ b/test/httpget.c @@ -14,6 +14,7 @@ static void ping_fiber(void *ptr)  	struct ctx *ctx = (struct ctx*) ptr;  	struct tf_sockaddr host;  	struct tf_fd fd; +	struct tf_timeout to;  	char buf[128];  	int bytes = 0, r;  	const char *req = "GET / HTTP/1.0\r\n\r\n"; @@ -27,7 +28,7 @@ static void ping_fiber(void *ptr)  	if (r < 0)  		goto err; -	r = tf_timed(tf_connect(&fd, &host), 10000); +	r = tf_timed(&to, tf_connect(&fd, &host), 10000);  	if (r < 0)  		goto err_close; @@ -49,11 +50,11 @@ int main(int argc, char **argv)  	struct ctx *c;  	int i; -	tf_scheduler_enable(NULL); +	tf_vmach_start();  	for (i = 1; i < argc; i++) {  		c = tf_fiber_create(ping_fiber, sizeof(struct ctx));  		c->hostname = argv[i]; -		tf_fiber_put(c); +		tf_fiber_run(c);  	} -	tf_scheduler_disable(); +	tf_vmach_stop();  } diff --git a/test/read.c b/test/read.c index 0dc72cc..9963a62 100644 --- a/test/read.c +++ b/test/read.c @@ -34,8 +34,8 @@ static void io_fiber(void *ptr)  int main(int argc, char **argv)  { -	tf_scheduler_enable(NULL); -	tf_fiber_put(tf_fiber_create(time_fiber, 0)); -	tf_fiber_put(tf_fiber_create(io_fiber, 0)); -	tf_scheduler_disable(); +	tf_vmach_start(); +	tf_fiber_run(tf_fiber_create(time_fiber, 0)); +	tf_fiber_run(tf_fiber_create(io_fiber, 0)); +	
tf_vmach_stop();  } diff --git a/test/simple1.c b/test/simple1.c index 6f40f8d..b6de289 100644 --- a/test/simple1.c +++ b/test/simple1.c @@ -10,7 +10,7 @@ static void work_fiber(void *ptr)  	struct ctx *c = (struct ctx*) ptr;  	printf("Hello%d.1\n", c->id); -	tf_fiber_yield(); +	tf_msleep(1);  	printf("Hello%d.2\n", c->id);  } @@ -19,11 +19,11 @@ int main(int argc, char **argv)  	struct ctx *c;  	int i; -	tf_scheduler_enable(NULL); +	tf_vmach_start();  	for (i = 0; i < 6; i++) {  		c = tf_fiber_create(work_fiber, sizeof(struct ctx));  		c->id = i; -		tf_fiber_put(c); +		tf_fiber_run(c);  	} -	tf_scheduler_disable(); +	tf_vmach_stop();  } diff --git a/test/sleep.c b/test/sleep.c index 8e225a7..dd71d91 100644 --- a/test/sleep.c +++ b/test/sleep.c @@ -21,10 +21,10 @@ int main(int argc, char **argv)  	struct ctx *c;  	int i; -	tf_scheduler_enable(NULL); +	tf_vmach_start();  	for (i = 0; i < 1000; i++) {  		c = tf_fiber_create(work_fiber, sizeof(struct ctx)); -		tf_fiber_put(c); +		tf_fiber_run(c);  	} -	tf_scheduler_disable(); +	tf_vmach_stop();  }  | 
