diff options
| author | Timo Teras <timo.teras@iki.fi> | 2009-11-26 09:35:49 +0200 | 
|---|---|---|
| committer | Timo Teras <timo.teras@iki.fi> | 2009-11-26 09:37:24 +0200 | 
| commit | aa530f352b0410150bfe94c821ae32c1378b9d02 (patch) | |
| tree | fb27f277db0c7feaaf12ce43169d3b0b44e95c0f | |
| parent | 4db830052d941d9c6de281bc9a2f6ac212c59ad8 (diff) | |
| download | libtf-aa530f352b0410150bfe94c821ae32c1378b9d02.tar.bz2 libtf-aa530f352b0410150bfe94c821ae32c1378b9d02.tar.xz  | |
libtf: stackable timeouts
instead of having per-function argument, use a push/pop mechanism:
- multiple timers inside fiber use only one heap entry
- easy to chain multiple possibly blocking operations inside one
  timeout block
| -rw-r--r-- | include/libtf/fiber.h | 55 | ||||
| -rw-r--r-- | include/libtf/heap.h | 6 | ||||
| -rw-r--r-- | include/libtf/io.h | 19 | ||||
| -rw-r--r-- | src/fiber.c | 63 | ||||
| -rw-r--r-- | src/io-unix.c | 45 | ||||
| -rw-r--r-- | test/httpget.c | 13 | ||||
| -rw-r--r-- | test/read.c | 2 | 
7 files changed, 132 insertions, 71 deletions
diff --git a/include/libtf/fiber.h b/include/libtf/fiber.h index c3d87c6..ce3745b 100644 --- a/include/libtf/fiber.h +++ b/include/libtf/fiber.h @@ -19,27 +19,22 @@  #include <libtf/list.h>  #include <libtf/heap.h> -#define TF_UCTX_H "uctx-setjmp.h" -  /* Fiber wakeup reasons */  #define TF_WAKEUP_NONE		0  #define TF_WAKEUP_IMMEDIATE	-EAGAIN  #define TF_WAKEUP_KILL		-EINTR -#define TF_WAKEUP_TIMEOUT	-ETIMEDOUT +#define TF_WAKEUP_TIMEOUT	-ETIME +#define TF_WAKEUP_THIS_TIMEOUT	-ETIMEDOUT  #define TF_WAKEUP_FD		-EIO -/* Special timeouts for tf_schedule() */ -#define TF_NO_TIMEOUT			-1 -#define TF_NO_TIMEOUT_CHANGE		-2 +struct tf_poll_data { +	int			epoll_fd; +	int			num_waiters; +};  /* Scheduler */  struct tf_fiber; -struct tf_poll_data { -	int epoll_fd; -	int num_waiters; -}; -  struct tf_scheduler {  	struct tf_list_head	run_q;  	struct tf_heap_head	heap; @@ -47,7 +42,6 @@ struct tf_scheduler {  	int			num_fibers;  	tf_mtime_t		scheduler_time;  	struct tf_poll_data	poll_data; -  };  struct tf_main_ctx { @@ -55,6 +49,11 @@ struct tf_main_ctx {  	char **			argv;  }; +struct tf_timeout { +	tf_mtime_t	saved_timeout; +	unsigned int	timeout_change; +}; +  static inline  struct tf_scheduler *tf_get_scheduler(void)  { @@ -86,18 +85,38 @@ void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size);  void *tf_fiber_get(void *data);  void tf_fiber_put(void *data); -/* Scheduling and fiber management */ -void tf_exit(void) attribute_noreturn; -void tf_kill(void *fiber); +#define tf_timed(func, timeout)						\ +	({								\ +		struct tf_timeout __timeout;				\ +		tf_timeout_push(&__timeout, timeout);			\ +		tf_timeout_pop(&__timeout, (func));			\ +	}) + +//* Scheduling and fiber management */ +void tf_timeout_push(struct tf_timeout *timeout, tf_mtime_diff_t milliseconds); +int __tf_timeout_pop(struct tf_timeout *timeout, int err); + +static inline int tf_timeout_pop(struct tf_timeout *timeout, int err) +{ +	if (unlikely(timeout->timeout_change)) +		return __tf_timeout_pop(timeout, err); +	return err; +} -int  tf_schedule(int milliseconds); +int  tf_schedule(void);  void tf_wakeup(struct tf_fiber *fiber, int wakeup_type); +void tf_exit(void) attribute_noreturn; +void tf_kill(void *fiber);  int  tf_yield(void);  static inline -int tf_msleep(int milliseconds) +int tf_msleep(tf_mtime_diff_t milliseconds)  { -	return tf_schedule(milliseconds); +	int r; +	r = tf_timed(tf_schedule(), milliseconds); +	if (r == TF_WAKEUP_THIS_TIMEOUT) +		r = 0; +	return r;  }  #endif diff --git a/include/libtf/heap.h b/include/libtf/heap.h index 24f4767..a68e01d 100644 --- a/include/libtf/heap.h +++ b/include/libtf/heap.h @@ -61,6 +61,12 @@ int tf_heap_empty(struct tf_heap_head *head)  }  static inline +int tf_heap_node_active(struct tf_heap_node *node) +{ +	return node->index != 0; +} + +static inline  int tf_heap_prealloc(struct tf_heap_head *head, uint32_t size)  {  	if (unlikely(head->num_items + TF_HEAP_ITEM0 >= head->allocated)) diff --git a/include/libtf/io.h b/include/libtf/io.h index 38dd541..1f0b793 100644 --- a/include/libtf/io.h +++ b/include/libtf/io.h @@ -52,27 +52,26 @@ void tf_poll_close(void);  int tf_open_fd(struct tf_fd *fd, int kfd, int flags);  int tf_open(struct tf_fd *fd, const char *pathname, int flags);  int tf_close(struct tf_fd *fd); -ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count, tf_mtime_diff_t timeout); -ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count, tf_mtime_diff_t timeout); -int tf_read_fully(struct tf_fd *fd, void *buf, size_t count, tf_mtime_diff_t timeout); -int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count, tf_mtime_diff_t timeout); +ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count); +ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count); +int tf_read_fully(struct tf_fd *fd, void *buf, size_t count); +int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count);  int tf_socket(struct tf_fd *fd, int domain, int type, int protocol);  int tf_bind(struct tf_fd *fd, const struct tf_sockaddr *addr);  int tf_listen(struct tf_fd *fd, int backlog);  int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd, -	      struct tf_sockaddr *from, tf_mtime_diff_t timeout); -int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *addr, tf_mtime_diff_t timeout); +	      struct tf_sockaddr *from); +int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *addr);  ssize_t tf_recvmsg(struct tf_fd *fd,  		   struct tf_sockaddr *from, struct tf_sockaddr *to, -		   void *buf, size_t count, tf_mtime_diff_t timeout); +		   void *buf, size_t count);  ssize_t tf_sendmsg(struct tf_fd *fd,  		   struct tf_sockaddr *from, const struct tf_sockaddr *to, -		   const void *buf, size_t count, tf_mtime_diff_t timeout); +		   const void *buf, size_t count); -int tf_query_dns(const char *name, int num_res, struct tf_sockaddr *res, -		 tf_mtime_diff_t timeout); +int tf_query_dns(const char *name, int num_res, struct tf_sockaddr *res);  #endif diff --git a/src/fiber.c b/src/fiber.c index b678842..3f8bb15 100644 --- a/src/fiber.c +++ b/src/fiber.c @@ -16,15 +16,20 @@  #include <libtf/fiber.h>  #include <libtf/io.h> +#define TF_TIMEOUT_CHANGE_NEEDED			1 +#define TF_TIMEOUT_CHANGE_NEW_VALUE			2 +  struct tf_fiber {  	unsigned int		ref_count;  	int			wakeup_type; +	unsigned int		timeout_change; +	tf_mtime_t		timeout;  	struct tf_list_node	queue_node;  	struct tf_heap_node	heap_node;  	char			data[TF_EMPTY_ARRAY];  }; -#include TF_UCTX_H +#include "uctx-setjmp.h"  /* FIXME: should be in thread local storage */  struct tf_scheduler *__tf_scheduler; @@ -108,6 +113,8 @@ static void process_heap(struct tf_scheduler *sched)  	       tf_mtime_diff(now, tf_heap_get_value(&sched->heap)) >= 0) {  		node = tf_heap_get_node(&sched->heap);  		f = container_of(node, struct tf_fiber, heap_node); +		if (f->wakeup_type == TF_WAKEUP_NONE) +			f->wakeup_type = TF_WAKEUP_TIMEOUT;  		run_fiber(sched, f);  	}  } @@ -170,18 +177,58 @@ int tf_main_args(tf_fiber_proc main_fiber, int argc, char **argv)  	return 0;  } -int tf_schedule(tf_mtime_diff_t milliseconds) +void tf_timeout_push(struct tf_timeout *timeout, tf_mtime_diff_t milliseconds) +{ +	struct tf_fiber *f = tf_get_fiber(); +	tf_mtime_t abs = tf_mtime() + milliseconds; +	int active; + +	if (f->timeout_change) +		active = (f->timeout_change & TF_TIMEOUT_CHANGE_NEW_VALUE); +	else +		active = tf_heap_node_active(&f->heap_node); + +	if (!active || tf_mtime_diff(abs, f->timeout) < 0) { +		/* Save previous timeout */ +		timeout->saved_timeout = f->timeout; +		timeout->timeout_change = TF_TIMEOUT_CHANGE_NEEDED; +		if (active) +			timeout->timeout_change |= TF_TIMEOUT_CHANGE_NEW_VALUE; + +		/* Make new timeout pending */ +		f->timeout = abs; +		f->timeout_change = TF_TIMEOUT_CHANGE_NEEDED +				  | TF_TIMEOUT_CHANGE_NEW_VALUE; +	} else { +		timeout->timeout_change = 0; +	} +} + +int __tf_timeout_pop(struct tf_timeout *timeout, int err) +{ +	struct tf_fiber *f = tf_get_fiber(); + +	f->timeout = timeout->saved_timeout; +	f->timeout_change = timeout->timeout_change; +	if (err == TF_WAKEUP_TIMEOUT) +		err = TF_WAKEUP_THIS_TIMEOUT; +	return err; +} + +int tf_schedule(void)  {  	struct tf_scheduler *sched = tf_get_scheduler();  	struct tf_fiber *schedf = container_of((void*) sched, struct tf_fiber, data);  	struct tf_fiber *f = sched->active_fiber; +	if (unlikely(f->timeout_change)) { +		if (f->timeout_change & TF_TIMEOUT_CHANGE_NEW_VALUE) +			tf_heap_change(&f->heap_node, &sched->heap, f->timeout); +		else +			tf_heap_delete(&f->heap_node, &sched->heap); +		f->timeout_change = 0; +	}  	f->wakeup_type = TF_WAKEUP_NONE; -	if (milliseconds == TF_NO_TIMEOUT) -		tf_heap_delete(&f->heap_node, &sched->heap); -	else if (milliseconds != TF_NO_TIMEOUT_CHANGE) -		tf_heap_change(&f->heap_node, &sched->heap, -			       tf_mtime() + milliseconds);  	return tf_uctx_transfer(f, schedf, TF_WAKEUP_IMMEDIATE);  } @@ -217,6 +264,6 @@ int tf_yield(void)  	struct tf_fiber *f = sched->active_fiber;  	tf_list_add_tail(&f->queue_node, &sched->run_q); -	return tf_schedule(TF_NO_TIMEOUT); +	return tf_schedule();  } diff --git a/src/io-unix.c b/src/io-unix.c index 8ed8e42..ea65a76 100644 --- a/src/io-unix.c +++ b/src/io-unix.c @@ -90,8 +90,7 @@ int tf_close(struct tf_fd *fd)  	return 0;  } -int tf_read_fully(struct tf_fd *fd, void *buf, size_t count, -		  tf_mtime_diff_t timeout) +int tf_read_fully(struct tf_fd *fd, void *buf, size_t count)  {  	ssize_t n;  	int r; @@ -120,16 +119,14 @@ int tf_read_fully(struct tf_fd *fd, void *buf, size_t count,  				continue;  		} -		r = tf_schedule(timeout); -		timeout = TF_NO_TIMEOUT_CHANGE; +		r = tf_schedule();  	} while (r == TF_WAKEUP_FD);  	tf_fd_unmonitor(fd);  	return -r;  } -int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count, -		   tf_mtime_diff_t timeout) +int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count)  {  	ssize_t n;  	int r; @@ -155,15 +152,14 @@ int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count,  				continue;  		} -		r = tf_schedule(timeout); -		timeout = TF_NO_TIMEOUT_CHANGE; +		r = tf_schedule();  	} while (r == TF_WAKEUP_FD);  	tf_fd_unmonitor(fd);  	return -r;  } -ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count, tf_mtime_diff_t timeout) +ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count)  {  	ssize_t n; @@ -178,16 +174,14 @@ ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count, tf_mtime_diff_t timeo  			n = -errno;  			break;  		} -		n = tf_schedule(timeout); -		timeout = TF_NO_TIMEOUT_CHANGE; +		n = tf_schedule();  	} while (n == TF_WAKEUP_FD);  	tf_fd_unmonitor(fd);  	return n;  } -ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count, -		 tf_mtime_diff_t timeout) +ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count)  {  	ssize_t n; @@ -202,8 +196,7 @@ ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count,  			n = -errno;  			break;  		} -		n = tf_schedule(timeout); -		timeout = TF_NO_TIMEOUT_CHANGE; +		n = tf_schedule();  	} while (n == TF_WAKEUP_FD);  	tf_fd_unmonitor(fd); @@ -250,7 +243,7 @@ int tf_listen(struct tf_fd *fd, int backlog)  }  int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd, -	      struct tf_sockaddr *from, tf_mtime_diff_t timeout) +	      struct tf_sockaddr *from)  {  	int r, tfdf;  	struct sockaddr *addr = NULL; @@ -277,8 +270,7 @@ int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd,  			tf_fd_unmonitor(listen_fd);  			return -errno;  		} -		r = tf_schedule(timeout); -		timeout = TF_NO_TIMEOUT_CHANGE; +		r = tf_schedule();  	} while (r == TF_WAKEUP_FD);  	tf_fd_unmonitor(listen_fd);  	if (r < 0) @@ -287,8 +279,7 @@ int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd,  	return tf_open_fd(child_fd, r, tfdf);  } -int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to, -	       tf_mtime_diff_t timeout) +int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to)  {  	socklen_t l = sizeof(int);  	int r, err; @@ -302,7 +293,7 @@ int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to,  	/* Wait for socket to become readable */  	tf_fd_monitor(fd, EPOLLOUT); -	r = tf_schedule(timeout); +	r = tf_schedule();  	tf_fd_unmonitor(fd);  	if (r != TF_WAKEUP_FD)  		return r; @@ -316,8 +307,7 @@ int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to,  ssize_t tf_recvmsg(struct tf_fd *fd,  		   struct tf_sockaddr *from,  		   struct tf_sockaddr *to, -		   void *buf, size_t len, -		   tf_mtime_diff_t timeout) +		   void *buf, size_t len)  {  	struct iovec iov;  	struct msghdr msg; @@ -347,8 +337,7 @@ ssize_t tf_recvmsg(struct tf_fd *fd,  			r = -errno;  			break;  		} -		r = tf_schedule(timeout); -		timeout = TF_NO_TIMEOUT_CHANGE; +		r = tf_schedule();  	} while (r == TF_WAKEUP_FD);  	tf_fd_unmonitor(fd); @@ -372,8 +361,7 @@ ssize_t tf_recvmsg(struct tf_fd *fd,  ssize_t tf_sendmsg(struct tf_fd *fd,  		   struct tf_sockaddr *from,  		   const struct tf_sockaddr *to, -		   const void *buf, size_t len, -		   tf_mtime_diff_t timeout) +		   const void *buf, size_t len)  {  	struct msghdr msg;  	struct iovec iov; @@ -412,8 +400,7 @@ ssize_t tf_sendmsg(struct tf_fd *fd,  			r = -errno;  			break;  		} -		r = tf_schedule(timeout); -		timeout = TF_NO_TIMEOUT_CHANGE; +		r = tf_schedule();  	} while (r == TF_WAKEUP_FD);  	tf_fd_unmonitor(fd); diff --git a/test/httpget.c b/test/httpget.c index c1e37a3..fed6c06 100644 --- a/test/httpget.c +++ b/test/httpget.c @@ -15,7 +15,7 @@ static void ping_fiber(void *ptr)  	struct tf_sockaddr host;  	struct tf_fd fd;  	char buf[128]; -	int bytes = 0, r = 0; +	int bytes = 0, r;  	const char *req = "GET / HTTP/1.0\r\n\r\n";  	printf("Lookup %s\n", ctx->hostname); @@ -23,16 +23,19 @@ static void ping_fiber(void *ptr)  	host.u.in.sin_addr.s_addr = inet_addr(ctx->hostname);  	host.u.in.sin_port = htons(80); -	if (tf_socket(&fd, AF_INET, SOCK_STREAM, 0) < 0) +	r = tf_socket(&fd, AF_INET, SOCK_STREAM, 0); +	if (r < 0)  		goto err; -	if ((r = tf_connect(&fd, &host, 10000)) < 0) +	r = tf_timed(tf_connect(&fd, &host), 10000); +	if (r < 0)  		goto err_close; -	if ((r = tf_write_fully(&fd, req, strlen(req), 10000)) < 0) +	r = tf_write_fully(&fd, req, strlen(req)); +	if (r < 0)  		goto err_close; -	while ((r = tf_read(&fd, buf, sizeof(buf), 10000)) > 0) +	while ((r = tf_read(&fd, buf, sizeof(buf))) > 0)  		bytes += r;  err_close:  	tf_close(&fd); diff --git a/test/read.c b/test/read.c index 97f25fd..6d8306b 100644 --- a/test/read.c +++ b/test/read.c @@ -24,7 +24,7 @@ static void io_fiber(void *ptr)  	tf_open_fd(&fin, STDIN_FILENO, TF_FD_STREAM_ORIENTED);  	while (1) { -		if (tf_read(&fin, data, sizeof(data), TF_NO_TIMEOUT) < 0) +		if (tf_read_fully(&fin, data, sizeof(data)) < 0)  			break;  		printf("Read: %8.8s\n", data);  	}  | 
