diff options
| -rw-r--r-- | include/libtf/defines.h | 5 | ||||
| -rw-r--r-- | include/libtf/fiber.h | 45 | ||||
| -rw-r--r-- | include/libtf/io.h | 70 | ||||
| -rw-r--r-- | include/libtf/tf.h | 3 | ||||
| -rw-r--r-- | src/TFbuild | 2 | ||||
| -rw-r--r-- | src/fiber.c | 85 | ||||
| -rw-r--r-- | src/heap.c | 3 | ||||
| -rw-r--r-- | src/io-epoll.c | 107 | ||||
| -rw-r--r-- | src/io-unix.c | 131 | ||||
| -rw-r--r-- | test/TFbuild | 2 | ||||
| -rw-r--r-- | test/read.c | 44 | 
11 files changed, 453 insertions, 44 deletions
diff --git a/include/libtf/defines.h b/include/libtf/defines.h index b1d2aa9..144ad63 100644 --- a/include/libtf/defines.h +++ b/include/libtf/defines.h @@ -53,6 +53,9 @@  #define attribute_never_inline		__attribute__((noinline))  #define attribute_weak_function		__attribute__((weak)) +#define attribute_noreturn		__attribute__((noreturn)) +#define attribute_warn_unused_result	__attribute__((warn_unused_result)) +#define attribute_deprecated		__attribute__((deprecated))  #define TF_BUG_ON(cond) if (unlikely(cond)) { \  	fprintf(stderr, "BUG: failure at %s:%d/%s(): %s!\n",	\ @@ -69,6 +72,8 @@  #endif  /* Monotonic time */ +#define TF_INFINITE		-1 +  typedef uint32_t tf_mtime_t;  typedef int32_t tf_mtime_diff_t; diff --git a/include/libtf/fiber.h b/include/libtf/fiber.h index 09d5ef1..91c0b3b 100644 --- a/include/libtf/fiber.h +++ b/include/libtf/fiber.h @@ -1,4 +1,4 @@ -/* tf.h - libtf main include +/* fiber.h - libtf fiber scheduler header   *   * Copyright (C) 2009 Timo Teräs <timo.teras@iki.fi>   * All rights reserved. @@ -19,16 +19,31 @@  #include <libtf/list.h>  #include <libtf/heap.h> +#define TF_UCTX_H "uctx-setjmp.h" + +/* Fiber wakeup reasons */ +#define TF_WAKEUP_NONE		0 +#define TF_WAKEUP_IMMEDIATE	EAGAIN +#define TF_WAKEUP_KILL		EINTR +#define TF_WAKEUP_TIMEOUT	ETIMEDOUT +#define TF_WAKEUP_FD		EIO +  /* Scheduler */ +struct tf_fiber; + +struct tf_poll_data { +	int epoll_fd; +	int num_waiters; +}; +  struct tf_scheduler {  	struct tf_list_head	run_q; -	struct tf_list_head	sleep_q;  	struct tf_heap_head	heap; -  	struct tf_fiber *	active_fiber;  	int			num_fibers; -  	tf_mtime_t		scheduler_time; +	struct tf_poll_data	poll_data; +  };  static inline @@ -39,6 +54,12 @@ struct tf_scheduler *tf_get_scheduler(void)  }  static inline +struct tf_fiber *tf_get_fiber(void) +{ +	return tf_get_scheduler()->active_fiber; +} + +static inline  tf_mtime_t tf_mtime(void)  {  	return tf_get_scheduler()->scheduler_time; @@ -53,18 +74,14 @@ void *tf_fiber_get(void *data);  void tf_fiber_put(void *data);  /* Scheduling and fiber management */ -int tf_schedule(int err); -int tf_msleep(int milliseconds); +void tf_exit(void) attribute_noreturn;  void tf_kill(void *fiber); -static inline int tf_yield(void) -{ -	return tf_schedule(EAGAIN); -} +int  tf_schedule(int wakeup_type); +int  tf_schedule_timeout(int milliseconds); +void tf_wakeup(struct tf_fiber *fiber, int wakeup_type); -static inline int tf_exit(void) -{ -	return tf_schedule(EFAULT); -} +int  tf_yield(void); +int  tf_msleep(int milliseconds);  #endif diff --git a/include/libtf/io.h b/include/libtf/io.h new file mode 100644 index 0000000..87a6c90 --- /dev/null +++ b/include/libtf/io.h @@ -0,0 +1,70 @@ +/* tf.h - libtf io header + * + * Copyright (C) 2009 Timo Teräs <timo.teras@iki.fi> + * All rights reserved. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 or later as + * published by the Free Software Foundation. + * + * See http://www.gnu.org/ for details. + */ + +#ifndef TF_IO_H +#define TF_IO_H + +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <netinet/in.h> + +#include <libtf/defines.h> + +struct tf_fiber; + +struct tf_sockaddr { +	union { +		struct sockaddr		addr; +		struct sockaddr_in	in; +		struct sockaddr_in6	in6; +	}; +}; + +struct tf_fd { +	int fd; +	unsigned int flags; +	/* Single waiter -- would be relatively trivial to modify to allow +	 * multiple waiters, if someone actually needs it */ +	unsigned int events; +	struct tf_fiber *waiting_fiber; +}; + +void tf_poll_init(void); +int  tf_poll(tf_mtime_diff_t timeout); +void tf_poll_close(void); + +int tf_open(struct tf_fd *fd, const char *pathname, int flags); +int tf_open_fd(struct tf_fd *fd, int kfd); +int tf_close(struct tf_fd *fd); +int tf_read(struct tf_fd *fd, void *buf, size_t count, int timeout); +int tf_write(struct tf_fd *fd, const void *buf, size_t count, int timeout); + +int tf_socket(struct tf_fd *fd, int domain, int type, int protocol); +int tf_bind(struct tf_fd *fd, const struct tf_sockaddr *addr); +int tf_listen(struct tf_fd *fd, int backlog); +int tf_accept(struct tf_fd *fd); +int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *addr, int timeout); + +ssize_t tf_recv(struct tf_fd *fd, void *buf, size_t count, int timeout); +ssize_t tf_send(struct tf_fd *fd, const void *buf, size_t count, int timeout); +ssize_t tf_recvmsg(struct tf_fd *fd, +		   struct tf_sockaddr *from, struct tf_sockaddr *to, +		   void *buf, size_t count, int timeout); +ssize_t tf_sendmsg(struct tf_fd *fd, +		   struct tf_sockaddr *from, const struct tf_sockaddr *to, +		   const void *buf, size_t count, int timeout); + +int tf_query_dns(const char *name, int num_res, struct tf_sockaddr *res, +		 int timeout); + +#endif diff --git a/include/libtf/tf.h b/include/libtf/tf.h index 7ff7b25..7a089ff 100644 --- a/include/libtf/tf.h +++ b/include/libtf/tf.h @@ -13,8 +13,7 @@  #ifndef TF_H  #define TF_H -#define TF_UCTX_H "uctx-setjmp.h" -  #include <libtf/fiber.h> +#include <libtf/io.h>  #endif diff --git a/src/TFbuild b/src/TFbuild index accae6d..9b40443 100644 --- a/src/TFbuild +++ b/src/TFbuild @@ -1,5 +1,5 @@  libs-y			+= libtf -libtf-objs-y		+= fiber.o heap.o +libtf-objs-y		+= fiber.o heap.o io-epoll.o  CFLAGS_heap.c		+= -funroll-all-loops diff --git a/src/fiber.c b/src/fiber.c index 72da440..15c533a 100644 --- a/src/fiber.c +++ b/src/fiber.c @@ -13,11 +13,12 @@  #include <time.h>  #include <errno.h>  #include <unistd.h> -#include <libtf/tf.h> -#include <libtf/heap.h> +#include <libtf/fiber.h> +#include <libtf/io.h>  struct tf_fiber {  	unsigned int		ref_count; +	int			wakeup_type;  	struct tf_list_node	queue_node;  	struct tf_heap_node	heap_node;  	char			data[TF_EMPTY_ARRAY]; @@ -85,16 +86,13 @@ static void run_fiber(struct tf_scheduler *sched, struct tf_fiber *f)  	struct tf_fiber *schedf = container_of((void*) tf_get_scheduler(), struct tf_fiber, data);  	sched->active_fiber = f; -	switch (tf_uctx_transfer(schedf, f, 1)) { -	case EFAULT: /* Fiber is dead */ +	switch (tf_uctx_transfer(schedf, f, f->wakeup_type)) { +	case TF_WAKEUP_KILL:  		tf_fiber_put(f->data);  		sched->num_fibers--;  		break; -	case EAGAIN: /* Yielded, reshedule */ -		tf_list_add_tail(&f->queue_node, &sched->run_q); -		break; -	case EIO: /* Blocked, in sleep */ -		tf_list_add_tail(&f->queue_node, &sched->sleep_q); +	case TF_WAKEUP_IMMEDIATE: +	case TF_WAKEUP_TIMEOUT:  		break;  	default:  		TF_BUG_ON("bad scheduler call from fiber"); @@ -108,7 +106,7 @@ static void process_heap(struct tf_scheduler *sched)  	tf_mtime_t now = tf_mtime();  	while (!tf_heap_empty(&sched->heap) && -	       tf_mtime_diff(now, tf_heap_get_value(&sched->heap)) > 0) { +	       tf_mtime_diff(now, tf_heap_get_value(&sched->heap)) >= 0) {  		node = tf_heap_get_node(&sched->heap);  		f = container_of(node, struct tf_fiber, heap_node);  		run_fiber(sched, f); @@ -135,9 +133,9 @@ int tf_main(tf_fiber_proc main_fiber)  	ctx->stack_guard = &stack_guard;  	*sched = (struct tf_scheduler){  		.run_q = TF_LIST_HEAD_INITIALIZER(sched->run_q), -		.sleep_q = TF_LIST_HEAD_INITIALIZER(sched->sleep_q),  	};  	__tf_scheduler = sched; +	tf_poll_init();  	update_time(sched);  	tf_fiber_put(tf_fiber_create(main_fiber, 0));  	do { @@ -148,47 +146,86 @@ int tf_main(tf_fiber_proc main_fiber)  			timeout = 0;  		} else if (!tf_heap_empty(&sched->heap)) {  			timeout = tf_mtime_diff(tf_heap_get_value(&sched->heap), -						sched->scheduler_time); +						tf_mtime());  			if (timeout < 0)  				timeout = 0;  		} else  			timeout = -1; -		if (timeout > 0) -			usleep(timeout * 1000); - -		process_heap(sched); +		if (tf_poll(timeout) == TF_WAKEUP_TIMEOUT) { +			sched->scheduler_time += timeout; +			process_heap(sched); +		}  		process_runq(sched);  	} while (likely(sched->num_fibers)); +	tf_poll_close();  	__tf_scheduler = NULL;  	return 0;  } -int tf_schedule(int err) +int tf_schedule(int wakeup)  {  	struct tf_scheduler *sched = tf_get_scheduler();  	struct tf_fiber *schedf = container_of((void*) sched, struct tf_fiber, data);  	struct tf_fiber *f = sched->active_fiber; -	int r; -	r = tf_uctx_transfer(f, schedf, err); -	if (r == 1) -		return 0; +	if (wakeup != TF_WAKEUP_TIMEOUT) +		tf_heap_delete(&f->heap_node, &sched->heap); +	f->wakeup_type = TF_WAKEUP_NONE; -	return r; +	return tf_uctx_transfer(f, schedf, wakeup);  } -int tf_msleep(int milliseconds) +int tf_schedule_timeout(int milliseconds)  {  	struct tf_scheduler *sched = tf_get_scheduler();  	struct tf_fiber *f = sched->active_fiber; +	if (milliseconds <= 0) { +		tf_heap_delete(&f->heap_node, &sched->heap); +		return TF_WAKEUP_IMMEDIATE; +	}  	tf_heap_change(&f->heap_node, &sched->heap, tf_mtime() + milliseconds); +	return TF_WAKEUP_TIMEOUT; +} + +void tf_wakeup(struct tf_fiber *fiber, int wakeup_type) +{ +	struct tf_scheduler *sched = tf_get_scheduler(); + +	if (fiber->wakeup_type == TF_WAKEUP_NONE) { +		fiber->wakeup_type = wakeup_type; +		tf_list_add_tail(&fiber->queue_node, &sched->run_q); +	} +} + +void tf_exit(void) +{ +	struct tf_scheduler *sched = tf_get_scheduler(); +	struct tf_fiber *f = sched->active_fiber; -	return tf_schedule(EIO); +	tf_heap_delete(&f->heap_node, &sched->heap); +	tf_schedule(TF_WAKEUP_KILL); +	TF_BUG_ON(1);  }  void tf_kill(void *fiber)  {  } + +int tf_yield(void) +{ +	struct tf_scheduler *sched = tf_get_scheduler(); +	struct tf_fiber *f = sched->active_fiber; + +	tf_list_add_tail(&f->queue_node, &sched->run_q); +	return tf_schedule(TF_WAKEUP_IMMEDIATE); +} + +int tf_msleep(int milliseconds) +{ +	tf_schedule_timeout(milliseconds); +	return tf_schedule(TF_WAKEUP_TIMEOUT); +} + @@ -1,4 +1,4 @@ -/* heap.c - a linked heap implementation +/* heap.c - an array based d-ary heap implementation   *   * Copyright (C) 2009 Timo Teräs <timo.teras@iki.fi>   * All rights reserved. @@ -151,7 +151,6 @@ void tf_heap_delete(struct tf_heap_node *node, struct tf_heap_head *head)  		head->item[index] = head->item[head->num_items+TF_HEAP_ITEM0];  		tf_heap_heapify(head, index);  	} -	head->item[head->num_items+TF_HEAP_ITEM0].ptr = NULL;  	node->index = 0;  } diff --git a/src/io-epoll.c b/src/io-epoll.c new file mode 100644 index 0000000..56d0743 --- /dev/null +++ b/src/io-epoll.c @@ -0,0 +1,107 @@ +/* io-epoll.c - epoll(7) based file descriptor monitoring + * + * Copyright (C) 2009 Timo Teräs <timo.teras@iki.fi> + * All rights reserved. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 or later as + * published by the Free Software Foundation. + * + * See http://www.gnu.org/ for details. + */ + +#include <errno.h> +#include <fcntl.h> +#include <unistd.h> +#include <sys/epoll.h> + +#include <libtf/io.h> +#include <libtf/fiber.h> + +#define TF_FD_AUTOCLOSE			1 +#define TF_FD_RESTORE_BLOCKING		2 +#define TF_FD_STREAM_ORIENTED		4 + +static int tf_fd_init(struct tf_fd *fd, int kfd, int flags) +{ +	struct tf_poll_data *pd = &tf_get_scheduler()->poll_data; +	struct epoll_event ev; +	int r; + +	ev.events = EPOLLIN | EPOLLOUT | EPOLLET; +	ev.data.ptr = fd; +	r = epoll_ctl(pd->epoll_fd, EPOLL_CTL_ADD, kfd, &ev); +	if (r < 0) { +		TF_BUG_ON(errno == EEXIST); +		return -errno; +	} + +	fd->fd = kfd; +	fd->flags = flags; +	fd->waiting_fiber = NULL; + +	return 0; +} + +static void tf_fd_wait(struct tf_fd *fd, int events) +{ +	struct tf_poll_data *pd = &tf_get_scheduler()->poll_data; + +	TF_BUG_ON(fd->waiting_fiber != NULL); +	fd->events = events | EPOLLERR | EPOLLHUP; +	fd->waiting_fiber = tf_get_fiber(); +	pd->num_waiters++; +} + +static void tf_fd_release(struct tf_fd *fd) +{ +	struct tf_poll_data *pd = &tf_get_scheduler()->poll_data; + +	fd->waiting_fiber = NULL; +	fd->events = 0; +	pd->num_waiters--; +} + +void tf_poll_init(void) +{ +	struct tf_poll_data *pd = &tf_get_scheduler()->poll_data; + +	pd->epoll_fd = epoll_create1(EPOLL_CLOEXEC); +	pd->num_waiters = 0; +	TF_BUG_ON(pd->epoll_fd < 0); +} + +int tf_poll(tf_mtime_diff_t timeout) +{ +	struct tf_poll_data *pd = &tf_get_scheduler()->poll_data; +	struct epoll_event events[64]; +	struct tf_fd *fd; +	int ret = (timeout == 0) ? TF_WAKEUP_TIMEOUT : TF_WAKEUP_FD; +	int r, i; + +	if (timeout == 0 && pd->num_waiters == 0) +		return ret; + +	do { +		r = epoll_wait(pd->epoll_fd, events, array_size(events), timeout); +		for (i = 0; i < r; i++) { +			fd = (struct tf_fd *) events[i].data.ptr; +			if (likely(fd->events & events[i].events)) +				tf_wakeup(fd->waiting_fiber, TF_WAKEUP_FD); +		} +		if (timeout != 0) +			ret = TF_WAKEUP_FD; +		timeout = 0; +	} while (unlikely(r == array_size(events))); + +	return ret; +} + +void tf_poll_close(void) +{ +	struct tf_poll_data *pd = &tf_get_scheduler()->poll_data; + +	close(pd->epoll_fd); +} + +#include "io-unix.c" diff --git a/src/io-unix.c b/src/io-unix.c new file mode 100644 index 0000000..d333122 --- /dev/null +++ b/src/io-unix.c @@ -0,0 +1,131 @@ +/* io-unix.c - non-blocking io primitives for unix + * + * Copyright (C) 2009 Timo Teräs <timo.teras@iki.fi> + * All rights reserved. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 or later as + * published by the Free Software Foundation. + * + * See http://www.gnu.org/ for details. + */ + +int tf_open(struct tf_fd *fd, const char *pathname, int flags) +{ +	int kfd, r; + +	kfd = open(pathname, flags | O_CLOEXEC | O_NONBLOCK); +	if (unlikely(kfd < 0)) +		return -errno; + +	r = tf_fd_init(fd, kfd, TF_FD_AUTOCLOSE | TF_FD_STREAM_ORIENTED); +	if (r < 0) { +		close(kfd); +		return r; +	} +	return 0; +} + +int tf_open_fd(struct tf_fd *fd, int kfd) +{ +	int mode, flags = 0; + +	mode = fcntl(kfd, F_GETFL, 0); +	if (!(mode & O_NONBLOCK)) { +		fcntl(fd->fd, F_SETFL, mode | O_NONBLOCK); +		flags |= TF_FD_RESTORE_BLOCKING; +	} + +	return tf_fd_init(fd, kfd, TF_FD_STREAM_ORIENTED | flags); +} + +int tf_close(struct tf_fd *fd) +{ +	int r; + +	if (fd->flags & TF_FD_RESTORE_BLOCKING) { +		fcntl(fd->fd, F_SETFL, fcntl(fd->fd, F_GETFL, 0) & ~O_NONBLOCK); +	} +	if (fd->flags & TF_FD_AUTOCLOSE) { +		r = close(fd->fd); +		if (unlikely(r == -1)) +			return -errno; +	} +	return 0; +} + +int tf_read(struct tf_fd *fd, void *buf, size_t count, tf_mtime_diff_t timeout) +{ +	ssize_t n; +	int r, mode; + +	mode = tf_schedule_timeout(timeout); +	tf_fd_wait(fd, EPOLLIN); +	do { +		n = read(fd->fd, buf, count); +		if (n == count) { +			r = 0; +			break; +		} +		if (n < 0) { +			if (errno == EINTR) +				continue; +			if (errno != EAGAIN) { +				r = errno; +				break; +			} +		} else if (n == 0) { +			r = EIO; +			break; +		} else { +			buf += n; +			count -= n; +			if (!(fd->flags & TF_FD_STREAM_ORIENTED)) +				continue; +		} + +		r = tf_schedule(mode); +		if (r != TF_WAKEUP_FD) +			break; +	} while (1); +	tf_fd_release(fd); + +	return -r; +} + +int tf_write(struct tf_fd *fd, const void *buf, size_t count, +	     tf_mtime_diff_t timeout) +{ +	ssize_t n; +	int r, mode; + +	mode = tf_schedule_timeout(timeout); +	tf_fd_wait(fd, EPOLLOUT); +	do { +		n = write(fd->fd, buf, count); +		if (n == count) { +			r = 0; +			break; +		} +		if (n < 0) { +			if (errno == EINTR) +				continue; +			if (errno != EAGAIN) { +				r = errno; +				break; +			} +		} else { +			buf += n; +			count -= n; +			if (!(fd->flags & TF_FD_STREAM_ORIENTED)) +				continue; +		} + +		r = tf_schedule(mode); +		if (r != TF_WAKEUP_FD) +			break; +	} while (1); +	tf_fd_release(fd); + +	return -r; +} diff --git a/test/TFbuild b/test/TFbuild index d2648ed..430e132 100644 --- a/test/TFbuild +++ b/test/TFbuild @@ -1,3 +1,3 @@ -progs-$(TEST)		+= simple1 sleep +progs-$(TEST)		+= simple1 sleep read  LIBS			+= $(objtree)src/libtf.a diff --git a/test/read.c b/test/read.c new file mode 100644 index 0000000..1921609 --- /dev/null +++ b/test/read.c @@ -0,0 +1,44 @@ +/* Read from stdin and have an active fiber in the background. + * Stdin needs to be redirected to FIFO or similar; mixing + * console and non-blocking I/O is not a good idea. + */ + +#include <libtf/tf.h> +#include <stdio.h> +#include <unistd.h> + +static void time_fiber(void *ptr) +{ +	while (1) { +		printf("Tick\n"); +		tf_msleep(1000); +		printf("Tack\n"); +		tf_msleep(1000); +	} +} + +static void io_fiber(void *ptr) +{ +	char data[8]; +	struct tf_fd fin; + +	tf_open_fd(&fin, STDIN_FILENO); +	while (1) { +		if (tf_read(&fin, data, sizeof(data), TF_INFINITE) < 0) +			break; +		printf("Read: %8.8s\n", data); +	} +	printf("Exiting io fiber\n"); +	tf_close(&fin); +} + +static void init_fiber(void *ptr) +{ +	tf_fiber_put(tf_fiber_create(time_fiber, 0)); +	tf_fiber_put(tf_fiber_create(io_fiber, 0)); +} + +int main(int argc, char **argv) +{ +	return tf_main(init_fiber); +}  | 
