diff options
| -rw-r--r-- | include/libtf/defines.h | 1 | ||||
| -rw-r--r-- | include/libtf/fiber.h | 82 | ||||
| -rw-r--r-- | include/libtf/heap.h | 10 | ||||
| -rw-r--r-- | include/libtf/io.h | 4 | ||||
| -rw-r--r-- | include/libtf/list.h | 33 | ||||
| -rw-r--r-- | include/libtf/scheduler.h | 64 | ||||
| -rw-r--r-- | include/libtf/tf.h | 1 | ||||
| -rw-r--r-- | src/TFbuild | 3 | ||||
| -rw-r--r-- | src/fiber.c | 327 | ||||
| -rw-r--r-- | src/io-epoll.c | 32 | ||||
| -rw-r--r-- | src/io-unix.c | 16 | ||||
| -rw-r--r-- | src/scheduler.c | 132 | ||||
| -rw-r--r-- | src/uctx.h | 82 | ||||
| -rw-r--r-- | test/httpget.c | 16 | ||||
| -rw-r--r-- | test/read.c | 11 | ||||
| -rw-r--r-- | test/simple1.c | 13 | ||||
| -rw-r--r-- | test/sleep.c | 17 | 
17 files changed, 533 insertions, 311 deletions
diff --git a/include/libtf/defines.h b/include/libtf/defines.h index ae72980..8e39c7e 100644 --- a/include/libtf/defines.h +++ b/include/libtf/defines.h @@ -62,6 +62,7 @@  		__FILE__, __LINE__, __func__, #cond);		\  	abort();						\  } +#define TF_BUILD_BUG_ON(cond) ((void) sizeof(struct { char TF_BUILD_BUG_ON[-!!(cond)]; }))  #define TF_ALIGN(size,align)	((((size_t)(size)) + (align)-1) & ~((align)-1)) diff --git a/include/libtf/fiber.h b/include/libtf/fiber.h index ce3745b..a140607 100644 --- a/include/libtf/fiber.h +++ b/include/libtf/fiber.h @@ -1,6 +1,6 @@ -/* fiber.h - libtf fiber scheduler header +/* fiber.h - libtf fiber manager header   * - * Copyright (C) 2009 Timo Teräs <timo.teras@iki.fi> + * Copyright (C) 2009-2010 Timo Teräs <timo.teras@iki.fi>   * All rights reserved.   *   * This program is free software; you can redistribute it and/or modify it @@ -15,8 +15,6 @@  #include <errno.h>  #include <libtf/defines.h> -#include <libtf/atomic.h> -#include <libtf/list.h>  #include <libtf/heap.h>  /* Fiber wakeup reasons */ @@ -27,64 +25,31 @@  #define TF_WAKEUP_THIS_TIMEOUT	-ETIMEDOUT  #define TF_WAKEUP_FD		-EIO -struct tf_poll_data { -	int			epoll_fd; -	int			num_waiters; -}; - -/* Scheduler */ -struct tf_fiber; +/* Fiber management */ +struct tf_scheduler; +typedef void (*tf_fiber_proc)(void *fiber); -struct tf_scheduler { -	struct tf_list_head	run_q; -	struct tf_heap_head	heap; -	struct tf_fiber *	active_fiber; -	int			num_fibers; -	tf_mtime_t		scheduler_time; -	struct tf_poll_data	poll_data; -}; +void *__tf_fiber_create(tf_fiber_proc fiber_main, int private_size); +void *tf_fiber_create(struct tf_scheduler *sched, +		      tf_fiber_proc fiber_main, int private_size); +void *tf_fiber_get(void *data); +void tf_fiber_put(void *data); +void __tf_fiber_wakeup(void *data, int wakeup_type); +void __tf_fiber_wakeup_heapnode(struct tf_heap_node *node); +int  __tf_fiber_schedule(void); +int  __tf_fiber_bind_scheduler(struct tf_scheduler *sched); +int  __tf_fiber_release_scheduler(struct tf_scheduler *sched); -struct tf_main_ctx { -	int			argc; -	char **			argv; -}; +void tf_fiber_exit(void) attribute_noreturn; +void tf_fiber_kill(void *fiber); +int  tf_fiber_yield(void); +/* Scheduling and fiber management */  struct tf_timeout {  	tf_mtime_t	saved_timeout;  	unsigned int	timeout_change;  }; -static inline -struct tf_scheduler *tf_get_scheduler(void) -{ -	extern struct tf_scheduler *__tf_scheduler; -	return __tf_scheduler; -} - -static inline -struct tf_fiber *tf_get_fiber(void) -{ -	return tf_get_scheduler()->active_fiber; -} - -static inline -tf_mtime_t tf_mtime(void) -{ -	return tf_get_scheduler()->scheduler_time; -} - -/* Fiber creation */ -typedef void (*tf_fiber_proc)(void *fiber); -int tf_main_args(tf_fiber_proc fiber_main, int argc, char **argv); -static inline int tf_main(tf_fiber_proc fiber_main) -{ -	return tf_main_args(fiber_main, 0, NULL); -} - -void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size); -void *tf_fiber_get(void *data); -void tf_fiber_put(void *data); -  #define tf_timed(func, timeout)						\  	({								\  		struct tf_timeout __timeout;				\ @@ -92,7 +57,6 @@ void tf_fiber_put(void *data);  		tf_timeout_pop(&__timeout, (func));			\  	}) -//* Scheduling and fiber management */  void tf_timeout_push(struct tf_timeout *timeout, tf_mtime_diff_t milliseconds);  int __tf_timeout_pop(struct tf_timeout *timeout, int err); @@ -103,17 +67,11 @@ static inline int tf_timeout_pop(struct tf_timeout *timeout, int err)  	return err;  } -int  tf_schedule(void); -void tf_wakeup(struct tf_fiber *fiber, int wakeup_type); -void tf_exit(void) attribute_noreturn; -void tf_kill(void *fiber); -int  tf_yield(void); -  static inline  int tf_msleep(tf_mtime_diff_t milliseconds)  {  	int r; -	r = tf_timed(tf_schedule(), milliseconds); +	r = tf_timed(__tf_fiber_schedule(), milliseconds);  	if (r == TF_WAKEUP_THIS_TIMEOUT)  		r = 0;  	return r; diff --git a/include/libtf/heap.h b/include/libtf/heap.h index a68e01d..3a16159 100644 --- a/include/libtf/heap.h +++ b/include/libtf/heap.h @@ -74,4 +74,14 @@ int tf_heap_prealloc(struct tf_heap_head *head, uint32_t size)  	return 0;  } +static inline +void tf_heap_destroy(struct tf_heap_head *head) +{ +	if (head->item) +		free(head->item); +	head->item = NULL; +	head->num_items = 0; +	head->allocated = 0; +} +  #endif diff --git a/include/libtf/io.h b/include/libtf/io.h index 1f0b793..1f37d81 100644 --- a/include/libtf/io.h +++ b/include/libtf/io.h @@ -26,8 +26,6 @@  #define TF_FD_SET_CLOEXEC		4  #define TF_FD_ALREADY_NONBLOCKING	8 -struct tf_fiber; -  struct tf_sockaddr {  	union {  		struct sockaddr		addr; @@ -42,7 +40,7 @@ struct tf_fd {  	/* Single waiter -- would be relatively trivial to modify to allow  	 * multiple waiters, if someone actually needs it */  	unsigned int events; -	struct tf_fiber *waiting_fiber; +	void *waiting_fiber;  };  void tf_poll_init(void); diff --git a/include/libtf/list.h b/include/libtf/list.h index 22b76a8..f75a0be 100644 --- a/include/libtf/list.h +++ b/include/libtf/list.h @@ -142,7 +142,7 @@ static inline void tf_list_add_tail(struct tf_list_node *new, struct tf_list_hea  	tf_list_add_before(new, &head->node);  } -static inline void __tf_list_del(struct tf_list_node * prev, struct tf_list_node *next) +static inline void __tf_list_del(struct tf_list_node *prev, struct tf_list_node *next)  {  	next->prev = prev;  	prev->next = next; @@ -155,6 +155,14 @@ static inline void tf_list_del(struct tf_list_node *entry)  	entry->prev = NULL;  } +static inline struct tf_list_node *tf_list_pop(struct tf_list_head *head) +{ +	struct tf_list_node *n; +	n = head->node.next; +	tf_list_del(n); +	return n; +} +  static inline int tf_list_hashed(const struct tf_list_node *n)  {  	return n->next != n && n->next != NULL; @@ -165,6 +173,29 @@ static inline int tf_list_empty(const struct tf_list_head *h)  	return !tf_list_hashed(&h->node);  } +static inline void __tf_list_splice(const struct tf_list_head *list, +				    struct tf_list_node *prev, +				    struct tf_list_node *next) +{ +	struct tf_list_node *first = list->node.next; +	struct tf_list_node *last = list->node.prev; + +	first->prev = prev; +	prev->next = first; + +	last->next = next; +	next->prev = last; +} + +static inline void tf_list_splice_tail(struct tf_list_head *src, +				       struct tf_list_head *dst) +{ +	if (!tf_list_empty(src)) { +		__tf_list_splice(src, dst->node.prev, &dst->node); +		tf_list_init_head(src); +	} +} +  #define tf_list_next(ptr, type, member) \  	(tf_list_hashed(ptr) ? container_of((ptr)->next,type,member) : NULL) diff --git a/include/libtf/scheduler.h b/include/libtf/scheduler.h new file mode 100644 index 0000000..cc8db70 --- /dev/null +++ b/include/libtf/scheduler.h @@ -0,0 +1,64 @@ +/* scheduler.h - libtf fiber scheduler header + * + * Copyright (C) 2009-2010 Timo Teräs <timo.teras@iki.fi> + * All rights reserved. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 or later as + * published by the Free Software Foundation. + * + * See http://www.gnu.org/ for details. + */ + +#ifndef TF_SCHEDULER_H +#define TF_SCHEDULER_H + +#include <libtf/atomic.h> +#include <libtf/list.h> +#include <libtf/heap.h> +#include <libtf/fiber.h> + +struct tf_scheduler { +	struct tf_list_head	scheduled_q; +	struct tf_list_head	running_q; +	struct tf_heap_head	heap; +	void *			active_fiber; +	void *			main_fiber; +	int			num_fibers; +	tf_mtime_t		scheduler_time; +	unsigned long		poll_data[2]; +}; + +static inline +struct tf_scheduler *tf_scheduler_get_current(void) +{ +	extern struct tf_scheduler *__tf_scheduler; +	TF_BUG_ON(__tf_scheduler == NULL); +	return __tf_scheduler; +} + +static inline +tf_mtime_t tf_scheduler_get_mtime(void) +{ +	return tf_scheduler_get_current()->scheduler_time; +} + +struct tf_scheduler *tf_scheduler_create(void); +int  tf_scheduler_enable(struct tf_scheduler *); +void tf_scheduler_disable(void); + +static inline struct tf_scheduler * +tf_scheduler_get(struct tf_scheduler *s) +{ +	tf_fiber_get(s); +	return s; +} + +static inline void +tf_scheduler_put(struct tf_scheduler *s) +{ +	tf_fiber_put(s); +} + +#endif + diff --git a/include/libtf/tf.h b/include/libtf/tf.h index 7a089ff..e613f18 100644 --- a/include/libtf/tf.h +++ b/include/libtf/tf.h @@ -14,6 +14,7 @@  #define TF_H  #include <libtf/fiber.h> +#include <libtf/scheduler.h>  #include <libtf/io.h>  #endif diff --git a/src/TFbuild b/src/TFbuild index 9b40443..08cb696 100644 --- a/src/TFbuild +++ b/src/TFbuild @@ -1,5 +1,6 @@  libs-y			+= libtf -libtf-objs-y		+= fiber.o heap.o io-epoll.o +libtf-objs-y		+= fiber.o scheduler.o heap.o +libtf-objs-$(OS_LINUX)	+= io-epoll.o  CFLAGS_heap.c		+= -funroll-all-loops diff --git a/src/fiber.c b/src/fiber.c index a7cb6bd..3f58b6d 100644 --- a/src/fiber.c +++ b/src/fiber.c @@ -1,4 +1,4 @@ -/* fiber.c - fiber management and scheduling +/* fiber.c - fiber management   *   * Copyright (C) 2009 Timo Teräs <timo.teras@iki.fi>   * All rights reserved. @@ -14,54 +14,74 @@  #include <errno.h>  #include <unistd.h>  #include <libtf/fiber.h> -#include <libtf/io.h> +#include <libtf/scheduler.h> +#include "uctx.h"  #define TF_TIMEOUT_CHANGE_NEEDED			1  #define TF_TIMEOUT_CHANGE_NEW_VALUE			2  struct tf_fiber {  	unsigned int		ref_count; +	struct tf_scheduler *	scheduler;  	int			wakeup_type;  	unsigned int		timeout_change;  	tf_mtime_t		timeout;  	struct tf_list_node	queue_node;  	struct tf_heap_node	heap_node; +	struct tf_uctx		context;  	char			data[TF_EMPTY_ARRAY];  }; -#include "uctx.h" - -/* FIXME: should be in thread local storage */ -struct tf_scheduler *__tf_scheduler; +static inline +struct tf_fiber *tf_fiber_get_current(void) +{ +	void *data = tf_scheduler_get_current()->active_fiber; +	return container_of(data, struct tf_fiber, data); +} -void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size) +void *__tf_fiber_create(tf_fiber_proc fiber_main, int private_size)  { -	struct tf_scheduler *sched = tf_get_scheduler();  	struct tf_fiber *fiber; -	if (tf_heap_prealloc(&sched->heap, sched->num_fibers + 1) < 0) -		return NULL; - -	fiber = tf_uctx_create(fiber_main, private_size); +	fiber = tf_uctx_create_embedded( +		TF_STACK_SIZE, +		sizeof(struct tf_fiber) + private_size, +		offsetof(struct tf_fiber, context), +		fiber_main, offsetof(struct tf_fiber, data), +		tf_fiber_exit);  	if (fiber == NULL)  		return NULL; -	/* The initial references for caller and scheduler */  	*fiber = (struct tf_fiber) { -		.ref_count = 2, +		.ref_count = 1,  		.queue_node = TF_LIST_INITIALIZER(fiber->queue_node), +		.context = fiber->context,  	}; -	tf_list_add_tail(&fiber->queue_node, &sched->run_q); -	sched->num_fibers++; -  	return fiber->data;  } -static void __tf_fiber_destroy(struct tf_fiber *fiber) +void *tf_fiber_create( +	struct tf_scheduler *sched, +	tf_fiber_proc fiber_main, int private_size)  { -	tf_heap_delete(&fiber->heap_node, &tf_get_scheduler()->heap); -	tf_uctx_destroy(fiber); +	struct tf_fiber *fiber; + +	if (sched == NULL) +		sched = tf_scheduler_get_current(); + +	if (tf_heap_prealloc(&sched->heap, sched->num_fibers + 1) < 0) +		return NULL; + +	fiber = container_of(__tf_fiber_create(fiber_main, private_size), +			     struct tf_fiber, data); +	sched->num_fibers++; + +	fiber->scheduler = sched; +	fiber->wakeup_type = TF_WAKEUP_NONE; +	tf_list_add_tail(&fiber->queue_node, &sched->scheduled_q); + +	return tf_fiber_get(fiber->data);  }  void *tf_fiber_get(void *data) @@ -71,6 +91,23 @@ void *tf_fiber_get(void *data)  	return data;  } +static void __tf_fiber_destroy(struct tf_fiber *fiber) +{ +	struct tf_scheduler *sched = fiber->scheduler; +	int main_fiber; + +	main_fiber = (fiber->context.alloc == NULL); +	tf_heap_delete(&fiber->heap_node, &sched->heap); +	tf_uctx_destroy(&fiber->context); +	if (main_fiber) +		free(fiber); +	sched->num_fibers--; +	if (sched->num_fibers == 1) { +		/* FIXME: Use proper fiber event*/ +		__tf_fiber_wakeup(sched->main_fiber, TF_WAKEUP_IMMEDIATE); +	} +} +  void tf_fiber_put(void *data)  {  	struct tf_fiber *fiber = container_of(data, struct tf_fiber, data); @@ -78,110 +115,144 @@ void tf_fiber_put(void *data)  		__tf_fiber_destroy(fiber);  } -static void update_time(struct tf_scheduler *sched) +void __tf_fiber_wakeup(void *data, int wakeup_type)  { -	struct timespec ts; +	struct tf_fiber *fiber = container_of(data, struct tf_fiber, data); +	struct tf_scheduler *sched = fiber->scheduler; -	clock_gettime(CLOCK_MONOTONIC, &ts); -	sched->scheduler_time = ts.tv_sec * 1000 + ts.tv_nsec / 1000000; +	if (fiber->wakeup_type == TF_WAKEUP_NONE) { +		fiber->wakeup_type = wakeup_type; +		tf_list_add_tail(&fiber->queue_node, &sched->running_q); +	}  } -static void run_fiber(struct tf_scheduler *sched, struct tf_fiber *f) +void __tf_fiber_wakeup_heapnode(struct tf_heap_node *node)  { -	struct tf_fiber *schedf = container_of((void*) tf_get_scheduler(), struct tf_fiber, data); - -	sched->active_fiber = f; -	tf_uctx_transfer(schedf, f); -	switch (f->wakeup_type) { -	case TF_WAKEUP_KILL: -		tf_fiber_put(f->data); -		sched->num_fibers--; -		break; -	case TF_WAKEUP_NONE: -		break; -	default: -		TF_BUG_ON("bad scheduler call from fiber"); -	} +	__tf_fiber_wakeup(container_of(node, struct tf_fiber, heap_node)->data, +			  TF_WAKEUP_TIMEOUT);  } -static void process_heap(struct tf_scheduler *sched) +static void __tf_fiber_schedule_next(void)  { -	struct tf_heap_node *node; -	struct tf_fiber *f; -	tf_mtime_t now = tf_mtime(); - -	while (!tf_heap_empty(&sched->heap) && -	       tf_mtime_diff(now, tf_heap_get_value(&sched->heap)) >= 0) { -		node = tf_heap_get_node(&sched->heap); -		f = container_of(node, struct tf_fiber, heap_node); -		if (f->wakeup_type == TF_WAKEUP_NONE) -			f->wakeup_type = TF_WAKEUP_TIMEOUT; -		run_fiber(sched, f); +	struct tf_scheduler *sched = tf_scheduler_get_current(); +	struct tf_fiber *f = tf_fiber_get_current(); +	struct tf_fiber *nf; + +	/* Figure out the next fibre to run */ +	if (unlikely(tf_list_empty(&sched->scheduled_q))) { +		tf_list_splice_tail(&sched->running_q, +				    &sched->scheduled_q); +		TF_BUG_ON(tf_list_empty(&sched->scheduled_q));  	} +	nf = tf_list_entry(tf_list_pop(&sched->scheduled_q), +			   struct tf_fiber, queue_node); +	sched->active_fiber = nf->data; +	tf_uctx_transfer(&f->context, &nf->context);  } -static void process_runq(struct tf_scheduler *sched) +int __tf_fiber_schedule(void)  { -	struct tf_fiber *f; +	struct tf_scheduler *sched = tf_scheduler_get_current(); +	struct tf_fiber *f = tf_fiber_get_current(); +	int wakeup; -	while (!tf_list_empty(&sched->run_q)) { -		f = tf_list_first(&sched->run_q, struct tf_fiber, queue_node); -		tf_list_del(&f->queue_node); -		run_fiber(sched, f); +	if (unlikely(f->timeout_change)) { +		if (f->timeout_change & TF_TIMEOUT_CHANGE_NEW_VALUE) { +			if (tf_mtime_diff(f->timeout, tf_scheduler_get_mtime()) <= 0) { +				f->timeout_change = TF_TIMEOUT_CHANGE_NEEDED; +				return TF_WAKEUP_TIMEOUT; +			} +			tf_heap_change(&f->heap_node, &sched->heap, f->timeout); +		} else +			tf_heap_delete(&f->heap_node, &sched->heap); +		f->timeout_change = 0;  	} + +	__tf_fiber_schedule_next(); + +	wakeup = f->wakeup_type; +	f->wakeup_type = TF_WAKEUP_NONE; + +	return wakeup;  } -int tf_main_args(tf_fiber_proc main_fiber, int argc, char **argv) +int __tf_fiber_bind_scheduler(struct tf_scheduler *sched)  { -	struct tf_uctx *ctx = alloca(sizeof(struct tf_uctx) + sizeof(struct tf_scheduler)); -	struct tf_scheduler *sched = (struct tf_scheduler*) ctx->fiber.data; -	struct tf_main_ctx *mainctx; -	int stack_guard = STACK_GUARD; - -	ctx->stack_guard = &stack_guard; -	*sched = (struct tf_scheduler){ -		.run_q = TF_LIST_HEAD_INITIALIZER(sched->run_q), +	struct tf_fiber *f; + +	f = malloc(sizeof(struct tf_fiber)); +	if (f == NULL) +		return -ENOMEM; + +	/* Mark currently active main fiber as active */ +	*f = (struct tf_fiber) { +		.ref_count = 1, +		.scheduler = sched, +		.queue_node = TF_LIST_INITIALIZER(f->queue_node),  	}; +	tf_uctx_create_self(&f->context); +	sched->main_fiber = f->data; +	sched->active_fiber = f->data; +	sched->num_fibers++; -	__tf_scheduler = sched; -	tf_poll_init(); -	update_time(sched); - -	mainctx = tf_fiber_create(main_fiber, sizeof(struct tf_main_ctx)); -	mainctx->argc = argc; -	mainctx->argv = argv; -	tf_fiber_put(mainctx); - -	do { -		tf_mtime_diff_t timeout; - -		update_time(sched); -		if (!tf_list_empty(&sched->run_q)) { -			timeout = 0; -		} else if (!tf_heap_empty(&sched->heap)) { -			timeout = tf_mtime_diff(tf_heap_get_value(&sched->heap), -						tf_mtime()); -			if (timeout < 0) -				timeout = 0; -		} else -			timeout = -1; +	/* Schedule scheduler fiber */ +	f = container_of((void *) sched, struct tf_fiber, data); +	f->scheduler = sched; +	f->wakeup_type = TF_WAKEUP_IMMEDIATE; +	tf_list_add_tail(&f->queue_node, &sched->running_q); -		if (tf_poll(timeout) == TF_WAKEUP_TIMEOUT && timeout >= 0) { -			sched->scheduler_time += timeout; -			process_heap(sched); -		} -		process_runq(sched); -	} while (likely(sched->num_fibers)); -	tf_poll_close(); -	__tf_scheduler = NULL; +	return 0; +} + +int __tf_fiber_release_scheduler(struct tf_scheduler *sched) +{ +	struct tf_fiber *f; + +	/* Detach scheduler */ +	f = container_of((void *) sched, struct tf_fiber, data); +	tf_list_del(&f->queue_node); + +	/* Detach main stack from this scheduler */ +	f = container_of((void *) sched->main_fiber, struct tf_fiber, data); +	tf_fiber_put(sched->main_fiber); +	sched->main_fiber = NULL; +	sched->num_fibers--;  	return 0;  } +void tf_fiber_exit(void) +{ +	struct tf_scheduler *sched = tf_scheduler_get_current(); +	struct tf_fiber *f = tf_fiber_get_current(); +	struct tf_fiber *schedf = container_of((void *) sched, struct tf_fiber, data); + +	tf_heap_delete(&f->heap_node, &sched->heap); +	schedf->wakeup_type = TF_WAKEUP_KILL; +	tf_uctx_transfer(&f->context, &schedf->context); +	TF_BUG_ON(1); +} + +void tf_fiber_kill(void *fiber) +{ +} + +int tf_fiber_yield(void) +{ +	struct tf_scheduler *sched = tf_scheduler_get_current(); +	struct tf_fiber *f = tf_fiber_get_current(); + +	TF_BUG_ON(tf_list_hashed(&f->queue_node)); +	f->wakeup_type = TF_WAKEUP_IMMEDIATE; +	tf_list_add_tail(&f->queue_node, &sched->running_q); + +	return __tf_fiber_schedule(); +} +  void tf_timeout_push(struct tf_timeout *timeout, tf_mtime_diff_t milliseconds)  { -	struct tf_fiber *f = tf_get_fiber(); -	tf_mtime_t abs = tf_mtime() + milliseconds; +	struct tf_fiber *f = tf_fiber_get_current(); +	tf_mtime_t abs = tf_scheduler_get_mtime() + milliseconds;  	int active;  	if (f->timeout_change) @@ -207,7 +278,7 @@ void tf_timeout_push(struct tf_timeout *timeout, tf_mtime_diff_t milliseconds)  int __tf_timeout_pop(struct tf_timeout *timeout, int err)  { -	struct tf_fiber *f = tf_get_fiber(); +	struct tf_fiber *f = tf_fiber_get_current();  	f->timeout = timeout->saved_timeout;  	f->timeout_change = timeout->timeout_change; @@ -215,61 +286,3 @@ int __tf_timeout_pop(struct tf_timeout *timeout, int err)  		err = TF_WAKEUP_THIS_TIMEOUT;  	return err;  } - -int tf_schedule(void) -{ -	struct tf_scheduler *sched = tf_get_scheduler(); -	struct tf_fiber *schedf = container_of((void*) sched, struct tf_fiber, data); -	struct tf_fiber *f = sched->active_fiber; - -	if (unlikely(f->timeout_change)) { -		if (f->timeout_change & TF_TIMEOUT_CHANGE_NEW_VALUE) { -			if (tf_mtime_diff(f->timeout, tf_mtime()) <= 0) { -				f->timeout_change = TF_TIMEOUT_CHANGE_NEEDED; -				return TF_WAKEUP_TIMEOUT; -			} -			tf_heap_change(&f->heap_node, &sched->heap, f->timeout); -		} else -			tf_heap_delete(&f->heap_node, &sched->heap); -		f->timeout_change = 0; -	} -	f->wakeup_type = TF_WAKEUP_NONE; -	tf_uctx_transfer(f, schedf); -	return f->wakeup_type; -} - -void tf_wakeup(struct tf_fiber *fiber, int wakeup_type) -{ -	struct tf_scheduler *sched = tf_get_scheduler(); - -	if (fiber->wakeup_type == TF_WAKEUP_NONE) { -		fiber->wakeup_type = wakeup_type; -		tf_list_add_tail(&fiber->queue_node, &sched->run_q); -	} -} - -void tf_exit(void) -{ -	struct tf_scheduler *sched = tf_get_scheduler(); -	struct tf_fiber *f = sched->active_fiber; -	struct tf_fiber *schedf = container_of((void*) sched, struct tf_fiber, data); - -	tf_heap_delete(&f->heap_node, &sched->heap); -	f->wakeup_type = TF_WAKEUP_KILL; -	tf_uctx_transfer(f, schedf); -	TF_BUG_ON(1); -} - -void tf_kill(void *fiber) -{ -} - -int tf_yield(void) -{ -	struct tf_scheduler *sched = tf_get_scheduler(); -	struct tf_fiber *f = sched->active_fiber; - -	tf_list_add_tail(&f->queue_node, &sched->run_q); -	return tf_schedule(); -} - diff --git a/src/io-epoll.c b/src/io-epoll.c index 5e28de8..8ac230f 100644 --- a/src/io-epoll.c +++ b/src/io-epoll.c @@ -17,11 +17,23 @@  #include <sys/socket.h>  #include <libtf/io.h> -#include <libtf/fiber.h> +#include <libtf/scheduler.h> + +struct tf_poll_data { +	int			epoll_fd; +	int			num_waiters; +}; + +struct tf_poll_data *tf_epoll_get_data(void) +{ +	struct tf_scheduler *sched = tf_scheduler_get_current(); +	TF_BUILD_BUG_ON(sizeof(struct tf_poll_data) > sizeof(sched->poll_data)); +	return (struct tf_poll_data *) &sched->poll_data; +}  static int tf_fd_created(struct tf_fd *fd)  { -	struct tf_poll_data *pd = &tf_get_scheduler()->poll_data; +	struct tf_poll_data *pd = tf_epoll_get_data();  	struct epoll_event ev;  	int r; @@ -39,7 +51,7 @@ static int tf_fd_created(struct tf_fd *fd)  static int tf_fd_destroyed(struct tf_fd *fd)  { -	struct tf_poll_data *pd = &tf_get_scheduler()->poll_data; +	struct tf_poll_data *pd = tf_epoll_get_data();  	if (fd->flags & TF_FD_AUTOCLOSE)  		return 0; @@ -50,17 +62,17 @@ static int tf_fd_destroyed(struct tf_fd *fd)  static void tf_fd_monitor(struct tf_fd *fd, int events)  { -	struct tf_poll_data *pd = &tf_get_scheduler()->poll_data; +	struct tf_poll_data *pd = tf_epoll_get_data();  	TF_BUG_ON(fd->waiting_fiber != NULL);  	fd->events = events | EPOLLERR | EPOLLHUP; -	fd->waiting_fiber = tf_get_fiber(); +	fd->waiting_fiber = tf_scheduler_get_current()->active_fiber;  	pd->num_waiters++;  }  static void tf_fd_unmonitor(struct tf_fd *fd)  { -	struct tf_poll_data *pd = &tf_get_scheduler()->poll_data; +	struct tf_poll_data *pd = tf_epoll_get_data();  	fd->waiting_fiber = NULL;  	fd->events = 0; @@ -69,7 +81,7 @@ static void tf_fd_unmonitor(struct tf_fd *fd)  void tf_poll_init(void)  { -	struct tf_poll_data *pd = &tf_get_scheduler()->poll_data; +	struct tf_poll_data *pd = tf_epoll_get_data();  	pd->epoll_fd = epoll_create1(EPOLL_CLOEXEC);  	pd->num_waiters = 0; @@ -78,7 +90,7 @@ void tf_poll_init(void)  int tf_poll(tf_mtime_diff_t timeout)  { -	struct tf_poll_data *pd = &tf_get_scheduler()->poll_data; +	struct tf_poll_data *pd = tf_epoll_get_data();  	struct epoll_event events[64];  	struct tf_fd *fd;  	int r, i, ret; @@ -95,7 +107,7 @@ int tf_poll(tf_mtime_diff_t timeout)  		for (i = 0; i < r; i++) {  			fd = (struct tf_fd *) events[i].data.ptr;  			if (likely(fd->events & events[i].events)) -				tf_wakeup(fd->waiting_fiber, TF_WAKEUP_FD); +				__tf_fiber_wakeup(fd->waiting_fiber, TF_WAKEUP_FD);  		}  		ret = TF_WAKEUP_FD;  		timeout = 0; @@ -106,7 +118,7 @@ int tf_poll(tf_mtime_diff_t timeout)  void tf_poll_close(void)  { -	struct tf_poll_data *pd = &tf_get_scheduler()->poll_data; +	struct tf_poll_data *pd = tf_epoll_get_data();  	close(pd->epoll_fd);  } diff --git a/src/io-unix.c b/src/io-unix.c index ea65a76..39cdf64 100644 --- a/src/io-unix.c +++ b/src/io-unix.c @@ -119,7 +119,7 @@ int tf_read_fully(struct tf_fd *fd, void *buf, size_t count)  				continue;  		} -		r = tf_schedule(); +		r = __tf_fiber_schedule();  	} while (r == TF_WAKEUP_FD);  	tf_fd_unmonitor(fd); @@ -152,7 +152,7 @@ int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count)  				continue;  		} -		r = tf_schedule(); +		r = __tf_fiber_schedule();  	} while (r == TF_WAKEUP_FD);  	tf_fd_unmonitor(fd); @@ -174,7 +174,7 @@ ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count)  			n = -errno;  			break;  		} -		n = tf_schedule(); +		n = __tf_fiber_schedule();  	} while (n == TF_WAKEUP_FD);  	tf_fd_unmonitor(fd); @@ -196,7 +196,7 @@ ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count)  			n = -errno;  			break;  		} -		n = tf_schedule(); +		n = __tf_fiber_schedule();  	} while (n == TF_WAKEUP_FD);  	tf_fd_unmonitor(fd); @@ -270,7 +270,7 @@ int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd,  			tf_fd_unmonitor(listen_fd);  			return -errno;  		} -		r = tf_schedule(); +		r = __tf_fiber_schedule();  	} while (r == TF_WAKEUP_FD);  	tf_fd_unmonitor(listen_fd);  	if (r < 0) @@ -293,7 +293,7 @@ int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to)  	/* Wait for socket to become readable */  	tf_fd_monitor(fd, EPOLLOUT); -	r = tf_schedule(); +	r = __tf_fiber_schedule();  	tf_fd_unmonitor(fd);  	if (r != TF_WAKEUP_FD)  		return r; @@ -337,7 +337,7 @@ ssize_t tf_recvmsg(struct tf_fd *fd,  			r = -errno;  			break;  		} -		r = tf_schedule(); +		r = __tf_fiber_schedule();  	} while (r == TF_WAKEUP_FD);  	tf_fd_unmonitor(fd); @@ -400,7 +400,7 @@ ssize_t tf_sendmsg(struct tf_fd *fd,  			r = -errno;  			break;  		} -		r = tf_schedule(); +		r = __tf_fiber_schedule();  	} while (r == TF_WAKEUP_FD);  	tf_fd_unmonitor(fd); diff --git a/src/scheduler.c b/src/scheduler.c new file mode 100644 index 0000000..d287eca --- /dev/null +++ b/src/scheduler.c @@ -0,0 +1,132 @@ +/* scheduler.c - fiber scheduling + * + * Copyright (C) 2009-2010 Timo Teräs <timo.teras@iki.fi> + * All rights reserved. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 or later as + * published by the Free Software Foundation. + * + * See http://www.gnu.org/ for details. + */ + +#include <time.h> +#include <libtf/scheduler.h> +#include <libtf/io.h> + +/* FIXME: should be in thread local storage */ +struct tf_scheduler *__tf_scheduler; + +static void update_time(struct tf_scheduler *sched) +{ +	struct timespec ts; + +	clock_gettime(CLOCK_MONOTONIC, &ts); +	sched->scheduler_time = ts.tv_sec * 1000 + ts.tv_nsec / 1000000; +} + +static void process_heap(struct tf_scheduler *sched) +{ +	struct tf_heap_node *node; +	tf_mtime_t now = sched->scheduler_time; + +	while (!tf_heap_empty(&sched->heap) && +	       tf_mtime_diff(now, tf_heap_get_value(&sched->heap)) >= 0) { +		node = tf_heap_get_node(&sched->heap); +		tf_heap_delete(node, &sched->heap); +		__tf_fiber_wakeup_heapnode(node); +	} +} + +void tf_scheduler_fiber(void *data) +{ +	struct tf_scheduler *sched = (struct tf_scheduler *) data; + +	do { +		tf_mtime_diff_t timeout; + +		update_time(sched); +		if (!tf_list_empty(&sched->scheduled_q) || +		    !tf_list_empty(&sched->running_q)) { +			timeout = 0; +		} else if (!tf_heap_empty(&sched->heap)) { +			timeout = tf_mtime_diff( +				tf_heap_get_value(&sched->heap), +				tf_scheduler_get_mtime()); +			if (timeout < 0) +				timeout = 0; +		} else { +			timeout = -1; +		} + +		if (tf_poll(timeout) == TF_WAKEUP_TIMEOUT && +		    timeout >= 0) { +			sched->scheduler_time += timeout; +			process_heap(sched); +		} + +		if (tf_fiber_yield() == TF_WAKEUP_KILL) { +			do { +				tf_fiber_put(sched->active_fiber); +				sched->active_fiber = sched; +			} while (__tf_fiber_schedule() == TF_WAKEUP_KILL); +		} +	} while (1); +} + +struct tf_scheduler *tf_scheduler_create(void) +{ +	struct tf_scheduler *sched; + +	sched = __tf_fiber_create(tf_scheduler_fiber, +				  sizeof(struct tf_scheduler)); + +	*sched = (struct tf_scheduler) { +		.scheduled_q = TF_LIST_HEAD_INITIALIZER(sched->scheduled_q), +		.running_q = TF_LIST_HEAD_INITIALIZER(sched->running_q), +	}; + +	return sched; +} + +int  tf_scheduler_enable(struct tf_scheduler *sched) +{ +	struct tf_scheduler *s = sched; + +	if (s == NULL) { +		s = tf_scheduler_create(); +		if (s == NULL) +			return -ENOMEM; +	} +	if (s->main_fiber != NULL) +		return -EBUSY; + +	__tf_fiber_bind_scheduler(s); +	__tf_scheduler = s; +	tf_poll_init(); +	update_time(s); + +	if (sched != NULL) +		tf_scheduler_get(sched); + +	return 0; +} + +void tf_scheduler_disable(void) +{ +	struct tf_scheduler *sched = __tf_scheduler; + +	if (sched == NULL || +	    sched->main_fiber != sched->active_fiber) +		return; + +	/* sleep until no others */ +	while (sched->num_fibers > 1) +		__tf_fiber_schedule(); + +	tf_poll_close(); +	__tf_scheduler = NULL; +	__tf_fiber_release_scheduler(sched); +	tf_heap_destroy(&sched->heap); +	tf_fiber_put(sched); +} @@ -14,8 +14,12 @@  #include <stdio.h>  #include <stdlib.h> +  #ifdef VALGRIND  #include <valgrind/valgrind.h> +#else +#define VALGRIND_STACK_REGISTER(stack_base, size) 0 +#define VALGRIND_STACK_DEREGISTER(stack_id)  #endif  #define STACK_GUARD		0xbad57ac4 @@ -24,10 +28,7 @@ struct tf_uctx {  	int *stack_guard;  	void *alloc;  	void *current_sp; -#ifdef VALGRIND  	unsigned int stack_id; -#endif -	struct tf_fiber fiber;  };  #if defined(__i386__) @@ -86,10 +87,25 @@ static inline void stack_push_ptr(void **stackptr, void *ptr)  } -static inline -struct tf_fiber *tf_uctx_create(tf_fiber_proc fiber_main, int private_size) +static inline void tf_uctx_create_self(struct tf_uctx *uctx) +{ +	static int dummy_guard = STACK_GUARD; + +	*uctx = (struct tf_uctx) { +		.stack_guard = &dummy_guard, +	}; +} + +static inline void * +tf_uctx_create_embedded( +	size_t stack_size, +	size_t private_size, +	off_t uctx_offset, +	void (*stack_frame_main)(void*), off_t main_argument_offset, +	void (*stack_frame_return)(void))  {  	size_t size = TF_STACK_SIZE; +	void *user_data;  	struct tf_uctx *uctx;  	void *stack, *stack_base; @@ -98,46 +114,42 @@ struct tf_fiber *tf_uctx_create(tf_fiber_proc fiber_main, int private_size)  	if (stack_base == NULL)  		return NULL; +	/* Create initial stack frame (cdecl convention) */  	stack = stack_pointer(stack_base, size); -	private_size += sizeof(struct tf_uctx); - -	/* Construct inital frame for call the main function and if it -	 * happens to return, it'll jump back to tf_exit() which kills -	 * the fiber (cdecl calling convetion assumed) */ -	uctx = stack_push(&stack, TF_ALIGN(private_size, 64)); -	stack_push_ptr(&stack, uctx->fiber.data); -	stack_push_ptr(&stack, &tf_exit); -	stack_push_ptr(&stack, fiber_main); -	uctx->current_sp = stack; - -#ifdef VALGRIND -	uctx->stack_id = VALGRIND_STACK_REGISTER(stack_base, size); -#endif -	uctx->alloc = stack_base; -	uctx->stack_guard = stack_guard(stack_base, size); +	user_data = stack_push(&stack, TF_ALIGN(private_size, 64)); +	stack_push_ptr(&stack, NULL); +	stack_push_ptr(&stack, NULL); +	stack_push_ptr(&stack, NULL); +	stack_push_ptr(&stack, NULL); +	stack_push_ptr(&stack, user_data + main_argument_offset); +	stack_push_ptr(&stack, stack_frame_return); +	stack_push_ptr(&stack, stack_frame_main); + +	uctx = user_data + uctx_offset; +	*uctx = (struct tf_uctx) { +		.stack_guard = stack_guard(stack_base, size), +		.alloc = stack_base, +		.current_sp = stack, +		.stack_id = VALGRIND_STACK_REGISTER(stack_base, size), +	};  	*uctx->stack_guard = STACK_GUARD; -	return &uctx->fiber; +	return user_data;  }  static inline -void tf_uctx_destroy(struct tf_fiber *fiber) +void tf_uctx_destroy(struct tf_uctx *uctx)  { -	struct tf_uctx *uctx = container_of(fiber, struct tf_uctx, fiber); -#ifdef VALGRIND -	VALGRIND_STACK_DEREGISTER(uctx->stack_id); -#endif -	free(uctx->alloc); +	if (uctx->alloc != NULL) { +		VALGRIND_STACK_DEREGISTER(uctx->stack_id); +		free(uctx->alloc); +	}  }  static inline -void tf_uctx_transfer(struct tf_fiber *from, struct tf_fiber *to) +void tf_uctx_transfer(struct tf_uctx *from, struct tf_uctx *to)  { - -	struct tf_uctx *ufrom = container_of(from, struct tf_uctx, fiber); -	struct tf_uctx *uto   = container_of(to,   struct tf_uctx, fiber); -  	/* Switch stack pointers */ -	TF_BUG_ON(*ufrom->stack_guard != STACK_GUARD); -	switch_fiber(ufrom, uto); +	TF_BUG_ON(*from->stack_guard != STACK_GUARD); +	switch_fiber(from, to);  } diff --git a/test/httpget.c b/test/httpget.c index fed6c06..9aec886 100644 --- a/test/httpget.c +++ b/test/httpget.c @@ -44,20 +44,16 @@ err:  		ctx->hostname, bytes, -r, strerror(-r));  } -static void init_fiber(void *ptr) +int main(int argc, char **argv)  { -	struct tf_main_ctx *ctx = (struct tf_main_ctx *) ptr;  	struct ctx *c;  	int i; -	for (i = 1; i < ctx->argc; i++) { -		c = tf_fiber_create(ping_fiber, sizeof(struct ctx)); -		c->hostname = ctx->argv[i]; +	tf_scheduler_enable(NULL); +	for (i = 1; i < argc; i++) { +		c = tf_fiber_create(NULL, ping_fiber, sizeof(struct ctx)); +		c->hostname = argv[i];  		tf_fiber_put(c);  	} -} - -int main(int argc, char **argv) -{ -	return tf_main_args(init_fiber, argc, argv); +	tf_scheduler_disable();  } diff --git a/test/read.c b/test/read.c index 6d8306b..3d318a3 100644 --- a/test/read.c +++ b/test/read.c @@ -32,13 +32,10 @@ static void io_fiber(void *ptr)  	tf_close(&fin);  } -static void init_fiber(void *ptr) -{ -	tf_fiber_put(tf_fiber_create(time_fiber, 0)); -	tf_fiber_put(tf_fiber_create(io_fiber, 0)); -} -  int main(int argc, char **argv)  { -	return tf_main(init_fiber); +	tf_scheduler_enable(NULL); +	tf_fiber_put(tf_fiber_create(NULL, time_fiber, 0)); +	tf_fiber_put(tf_fiber_create(NULL, io_fiber, 0)); +	tf_scheduler_disable();  } diff --git a/test/simple1.c b/test/simple1.c index 65a787c..6a05b7a 100644 --- a/test/simple1.c +++ b/test/simple1.c @@ -10,23 +10,20 @@ static void work_fiber(void *ptr)  	struct ctx *c = (struct ctx*) ptr;  	printf("Hello%d.1\n", c->id); -	tf_yield(); +	tf_fiber_yield();  	printf("Hello%d.2\n", c->id);  } -static void init_fiber(void *ptr) +int main(int argc, char **argv)  {  	struct ctx *c;  	int i; +	tf_scheduler_enable(NULL);  	for (i = 0; i < 6; i++) { -		c = tf_fiber_create(work_fiber, sizeof(struct ctx)); +		c = tf_fiber_create(NULL, work_fiber, sizeof(struct ctx));  		c->id = i;  		tf_fiber_put(c);  	} -} - -int main(int argc, char **argv) -{ -	return tf_main(init_fiber); +	tf_scheduler_disable();  } diff --git a/test/sleep.c b/test/sleep.c index 7e39b5c..82399e1 100644 --- a/test/sleep.c +++ b/test/sleep.c @@ -8,24 +8,23 @@ struct ctx {  static void work_fiber(void *ptr)  {  	//struct ctx *c = (struct ctx*) ptr; -  	tf_msleep(rand() % 5000); +	printf("one\n");  	tf_msleep(rand() % 5000); +	printf("two\n");  	tf_msleep(rand() % 5000); +	printf("three\n");  } -static void init_fiber(void *ptr) +int main(int argc, char **argv)  {  	struct ctx *c;  	int i; -	for (i = 0; i < 1000; i++) { -		c = tf_fiber_create(work_fiber, sizeof(struct ctx)); +	tf_scheduler_enable(NULL); +	for (i = 0; i < 100; i++) { +		c = tf_fiber_create(NULL, work_fiber, sizeof(struct ctx));  		tf_fiber_put(c);  	} -} - -int main(int argc, char **argv) -{ -	return tf_main(init_fiber); +	tf_scheduler_disable();  }  | 
