diff options
| -rw-r--r-- | include/libtf/io.h | 15 | ||||
| -rw-r--r-- | include/libtf/scheduler.h | 3 | ||||
| -rw-r--r-- | src/TFbuild | 1 | ||||
| -rw-r--r-- | src/io-epoll.c | 120 | ||||
| -rw-r--r-- | src/io-unix.c | 58 | ||||
| -rw-r--r-- | src/scheduler.c | 8 | 
6 files changed, 126 insertions, 79 deletions
diff --git a/include/libtf/io.h b/include/libtf/io.h index 1f37d81..0d34421 100644 --- a/include/libtf/io.h +++ b/include/libtf/io.h @@ -43,9 +43,18 @@ struct tf_fd {  	void *waiting_fiber;  }; -void tf_poll_init(void); -int  tf_poll(tf_mtime_diff_t timeout); -void tf_poll_close(void); +#define TF_POLL_READ	1 +#define TF_POLL_WRITE	2 + +struct tf_poll_hooks { +	void (*init)(void); +	int  (*poll)(tf_mtime_diff_t timeout); +	void (*close)(void); +	int  (*fd_created)(struct tf_fd *fd); +	int  (*fd_destroyed)(struct tf_fd *fd); +	void (*fd_monitor)(struct tf_fd *fd, int events); +	void (*fd_unmonitor)(struct tf_fd *fd); +};  int tf_open_fd(struct tf_fd *fd, int kfd, int flags);  int tf_open(struct tf_fd *fd, const char *pathname, int flags); diff --git a/include/libtf/scheduler.h b/include/libtf/scheduler.h index cc8db70..db5a823 100644 --- a/include/libtf/scheduler.h +++ b/include/libtf/scheduler.h @@ -18,6 +18,8 @@  #include <libtf/heap.h>  #include <libtf/fiber.h> +struct tf_poll_hooks; +  struct tf_scheduler {  	struct tf_list_head	scheduled_q;  	struct tf_list_head	running_q; @@ -26,6 +28,7 @@ struct tf_scheduler {  	void *			main_fiber;  	int			num_fibers;  	tf_mtime_t		scheduler_time; +	struct tf_poll_hooks *	poller;  	unsigned long		poll_data[2];  }; diff --git a/src/TFbuild b/src/TFbuild index 08cb696..ef1c5ae 100644 --- a/src/TFbuild +++ b/src/TFbuild @@ -2,5 +2,6 @@ libs-y			+= libtf  libtf-objs-y		+= fiber.o scheduler.o heap.o  libtf-objs-$(OS_LINUX)	+= io-epoll.o +libtf-objs-$(OS_UNIX)	+= io-unix.o  CFLAGS_heap.c		+= -funroll-all-loops diff --git a/src/io-epoll.c b/src/io-epoll.c index 8ac230f..32aa090 100644 --- a/src/io-epoll.c +++ b/src/io-epoll.c @@ -14,7 +14,6 @@  #include <fcntl.h>  #include <unistd.h>  #include <sys/epoll.h> -#include <sys/socket.h>  #include <libtf/io.h>  #include <libtf/scheduler.h> @@ -24,71 +23,29 @@ struct tf_poll_data {  	int			num_waiters;  }; -struct tf_poll_data *tf_epoll_get_data(void) +static struct tf_poll_data *tf_epoll_get_data(void)  {  	struct tf_scheduler *sched = tf_scheduler_get_current();  	TF_BUILD_BUG_ON(sizeof(struct tf_poll_data) > sizeof(sched->poll_data));  	return (struct tf_poll_data *) &sched->poll_data;  } -static int tf_fd_created(struct tf_fd *fd) +static void tf_epoll_init(void)  {  	struct tf_poll_data *pd = tf_epoll_get_data(); -	struct epoll_event ev; -	int r; -	ev.events = EPOLLIN | EPOLLOUT | EPOLLET; -	ev.data.ptr = fd; -	r = epoll_ctl(pd->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev); -	if (unlikely(r < 0)) { -		TF_BUG_ON(errno == EEXIST); -		r = -errno; -		return r; -	} - -	return 0; -} - -static int tf_fd_destroyed(struct tf_fd *fd) -{ -	struct tf_poll_data *pd = tf_epoll_get_data(); - -	if (fd->flags & TF_FD_AUTOCLOSE) -		return 0; - -	epoll_ctl(pd->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL); -	return 0; -} - -static void tf_fd_monitor(struct tf_fd *fd, int events) -{ -	struct tf_poll_data *pd = tf_epoll_get_data(); - -	TF_BUG_ON(fd->waiting_fiber != NULL); -	fd->events = events | EPOLLERR | EPOLLHUP; -	fd->waiting_fiber = tf_scheduler_get_current()->active_fiber; -	pd->num_waiters++; -} - -static void tf_fd_unmonitor(struct tf_fd *fd) -{ -	struct tf_poll_data *pd = tf_epoll_get_data(); - -	fd->waiting_fiber = NULL; -	fd->events = 0; -	pd->num_waiters--; +	pd->epoll_fd = epoll_create1(EPOLL_CLOEXEC); +	pd->num_waiters = 0; +	TF_BUG_ON(pd->epoll_fd < 0);  } -void tf_poll_init(void) +static void tf_epoll_close(void)  {  	struct tf_poll_data *pd = tf_epoll_get_data(); -	pd->epoll_fd = epoll_create1(EPOLL_CLOEXEC); -	pd->num_waiters = 0; -	TF_BUG_ON(pd->epoll_fd < 0); +	close(pd->epoll_fd);  } - -int tf_poll(tf_mtime_diff_t timeout) +static int tf_epoll_poll(tf_mtime_diff_t timeout)  {  	struct tf_poll_data *pd = tf_epoll_get_data();  	struct epoll_event events[64]; @@ -116,11 +73,66 @@ int tf_poll(tf_mtime_diff_t timeout)  	return ret;  } -void tf_poll_close(void) +static int tf_epoll_fd_created(struct tf_fd *fd)  {  	struct tf_poll_data *pd = tf_epoll_get_data(); +	struct epoll_event ev; +	int r; -	close(pd->epoll_fd); +	ev = (struct epoll_event) { +		.events = EPOLLIN | EPOLLOUT | EPOLLET, +		.data.ptr = fd, +	}; +	r = epoll_ctl(pd->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev); +	if (unlikely(r < 0)) { +		TF_BUG_ON(errno == EEXIST); +		r = -errno; +		return r; +	} + +	return 0;  } -#include "io-unix.c" +static int tf_epoll_fd_destroyed(struct tf_fd *fd) +{ +	struct tf_poll_data *pd = tf_epoll_get_data(); + +	if (fd->flags & TF_FD_AUTOCLOSE) +		return 0; + +	epoll_ctl(pd->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL); +	return 0; +} + +static void tf_epoll_fd_monitor(struct tf_fd *fd, int events) +{ +	struct tf_poll_data *pd = tf_epoll_get_data(); + +	TF_BUG_ON(fd->waiting_fiber != NULL); +	fd->events = EPOLLERR | EPOLLHUP; +	if (events & TF_POLL_READ) +		fd->events |= EPOLLIN; +	if (events & TF_POLL_WRITE) +		fd->events |= EPOLLOUT; +	fd->waiting_fiber = tf_scheduler_get_current()->active_fiber; +	pd->num_waiters++; +} + +static void tf_epoll_fd_unmonitor(struct tf_fd *fd) +{ +	struct tf_poll_data *pd = tf_epoll_get_data(); + +	fd->waiting_fiber = NULL; +	fd->events = 0; +	pd->num_waiters--; +} + +struct tf_poll_hooks tf_epoll_hooks = { +	.init = tf_epoll_init, +	.close = tf_epoll_close, +	.poll = tf_epoll_poll, +	.fd_created = tf_epoll_fd_created, +	.fd_destroyed = tf_epoll_fd_destroyed, +	.fd_monitor = tf_epoll_fd_monitor, +	.fd_unmonitor = tf_epoll_fd_unmonitor, +}; diff --git a/src/io-unix.c b/src/io-unix.c index 39cdf64..d80592a 100644 --- a/src/io-unix.c +++ b/src/io-unix.c @@ -10,8 +10,18 @@   * See http://www.gnu.org/ for details.   */ +#include <errno.h> +#include <fcntl.h> +#include <unistd.h> +#include <sys/socket.h>  #include <string.h> +#include <libtf/io.h> +#include <libtf/scheduler.h> + +#define TF_POLL_HOOKS \ +struct tf_poll_hooks *poller = tf_scheduler_get_current()->poller; +  static inline int tf_sockaddr_len(const struct tf_sockaddr *addr)  {  	if (addr == NULL) @@ -28,13 +38,14 @@ static inline int tf_sockaddr_len(const struct tf_sockaddr *addr)  int tf_open_fd(struct tf_fd *fd, int kfd, int flags)  { +	TF_POLL_HOOKS;  	int r;  	fd->fd = kfd;  	fd->flags = flags;  	fd->waiting_fiber = NULL; -	r = tf_fd_created(fd); +	r = poller->fd_created(fd);  	if (r < 0) {  		if (flags & TF_FD_AUTOCLOSE)  			close(kfd); @@ -77,9 +88,10 @@ int tf_open(struct tf_fd *fd, const char *pathname, int flags)  int tf_close(struct tf_fd *fd)  { +	TF_POLL_HOOKS;  	int r; -	tf_fd_destroyed(fd); +	poller->fd_destroyed(fd);  	if (fd->flags & TF_FD_AUTOCLOSE) {  		r = close(fd->fd);  		if (unlikely(r == -1)) @@ -92,10 +104,11 @@ int tf_close(struct tf_fd *fd)  int tf_read_fully(struct tf_fd *fd, void *buf, size_t count)  { +	TF_POLL_HOOKS;  	ssize_t n;  	int r; -	tf_fd_monitor(fd, EPOLLIN); +	poller->fd_monitor(fd, TF_POLL_READ);  	do {  		n = read(fd->fd, buf, count);  		if (n == count) { @@ -121,17 +134,18 @@ int tf_read_fully(struct tf_fd *fd, void *buf, size_t count)  		r = __tf_fiber_schedule();  	} while (r == TF_WAKEUP_FD); -	tf_fd_unmonitor(fd); +	poller->fd_unmonitor(fd);  	return -r;  }  int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count)  { +	TF_POLL_HOOKS;  	ssize_t n;  	int r; -	tf_fd_monitor(fd, EPOLLOUT); +	poller->fd_monitor(fd, TF_POLL_WRITE);  	do {  		n = write(fd->fd, buf, count);  		if (n == count) { @@ -154,16 +168,17 @@ int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count)  		r = __tf_fiber_schedule();  	} while (r == TF_WAKEUP_FD); -	tf_fd_unmonitor(fd); +	poller->fd_unmonitor(fd);  	return -r;  }  ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count)  { +	TF_POLL_HOOKS;  	ssize_t n; -	tf_fd_monitor(fd, EPOLLIN); +	poller->fd_monitor(fd, TF_POLL_READ);  	do {  		n = read(fd->fd, buf, count);  		if (n >= 0) @@ -176,16 +191,17 @@ ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count)  		}  		n = __tf_fiber_schedule();  	} while (n == TF_WAKEUP_FD); -	tf_fd_unmonitor(fd); +	poller->fd_unmonitor(fd);  	return n;  }  ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count)  { +	TF_POLL_HOOKS;  	ssize_t n; -	tf_fd_monitor(fd, EPOLLOUT); +	poller->fd_monitor(fd, TF_POLL_WRITE);  	do {  		n = write(fd->fd, buf, count);  		if (n >= 0) @@ -198,7 +214,7 @@ ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count)  		}  		n = __tf_fiber_schedule();  	} while (n == TF_WAKEUP_FD); -	tf_fd_unmonitor(fd); +	poller->fd_unmonitor(fd);  	return n;  } @@ -245,6 +261,7 @@ int tf_listen(struct tf_fd *fd, int backlog)  int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd,  	      struct tf_sockaddr *from)  { +	TF_POLL_HOOKS;  	int r, tfdf;  	struct sockaddr *addr = NULL;  	socklen_t al, *pal = NULL; @@ -258,7 +275,7 @@ int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd,  	tfdf  = TF_FD_AUTOCLOSE | (listen_fd->flags & TF_FD_STREAM_ORIENTED);  	tfdf |= TF_FD_SET_CLOEXEC; -	tf_fd_monitor(listen_fd, EPOLLIN); +	poller->fd_monitor(listen_fd, TF_POLL_READ);  	do {  		/* FIXME: use accept4 if available */  		r = accept(listen_fd->fd, addr, pal); @@ -267,12 +284,12 @@ int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd,  		if (errno == EINTR)  			continue;  		if (errno != EAGAIN) { -			tf_fd_unmonitor(listen_fd); +			poller->fd_unmonitor(listen_fd);  			return -errno;  		}  		r = __tf_fiber_schedule();  	} while (r == TF_WAKEUP_FD); -	tf_fd_unmonitor(listen_fd); +	poller->fd_unmonitor(listen_fd);  	if (r < 0)  		return r; @@ -281,6 +298,7 @@ int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd,  int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to)  { +	TF_POLL_HOOKS;  	socklen_t l = sizeof(int);  	int r, err; @@ -292,9 +310,9 @@ int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to)  		return -errno;  	/* Wait for socket to become readable */ -	tf_fd_monitor(fd, EPOLLOUT); +	poller->fd_monitor(fd, TF_POLL_WRITE);  	r = __tf_fiber_schedule(); -	tf_fd_unmonitor(fd); +	poller->fd_unmonitor(fd);  	if (r != TF_WAKEUP_FD)  		return r; @@ -309,6 +327,7 @@ ssize_t tf_recvmsg(struct tf_fd *fd,  		   struct tf_sockaddr *to,  		   void *buf, size_t len)  { +	TF_POLL_HOOKS;  	struct iovec iov;  	struct msghdr msg;  	struct cmsghdr *cmsg; @@ -328,7 +347,7 @@ ssize_t tf_recvmsg(struct tf_fd *fd,  		.msg_iovlen	= 1,  	}; -	tf_fd_monitor(fd, EPOLLIN); +	poller->fd_monitor(fd, TF_POLL_READ);  	do {  		r = recvmsg(fd->fd, &msg, 0);  		if (r >= 0) @@ -339,7 +358,7 @@ ssize_t tf_recvmsg(struct tf_fd *fd,  		}  		r = __tf_fiber_schedule();  	} while (r == TF_WAKEUP_FD); -	tf_fd_unmonitor(fd); +	poller->fd_unmonitor(fd);  	if (r < 0 || to == NULL)  		return r; @@ -363,6 +382,7 @@ ssize_t tf_sendmsg(struct tf_fd *fd,  		   const struct tf_sockaddr *to,  		   const void *buf, size_t len)  { +	TF_POLL_HOOKS;  	struct msghdr msg;  	struct iovec iov;  	struct { @@ -391,7 +411,7 @@ ssize_t tf_sendmsg(struct tf_fd *fd,  		msg.msg_controllen	= sizeof(cmsg);  	} -	tf_fd_monitor(fd, EPOLLOUT); +	poller->fd_monitor(fd, TF_POLL_WRITE);  	do {  		r = sendmsg(fd->fd, &msg, 0);  		if (r >= 0) @@ -402,7 +422,7 @@ ssize_t tf_sendmsg(struct tf_fd *fd,  		}  		r = __tf_fiber_schedule();  	} while (r == TF_WAKEUP_FD); -	tf_fd_unmonitor(fd); +	poller->fd_unmonitor(fd);  	return r;  } diff --git a/src/scheduler.c b/src/scheduler.c index d287eca..a103d0a 100644 --- a/src/scheduler.c +++ b/src/scheduler.c @@ -15,6 +15,7 @@  #include <libtf/io.h>  /* FIXME: should be in thread local storage */ +extern struct tf_poll_hooks tf_epoll_hooks;  struct tf_scheduler *__tf_scheduler;  static void update_time(struct tf_scheduler *sched) @@ -59,7 +60,7 @@ void tf_scheduler_fiber(void *data)  			timeout = -1;  		} -		if (tf_poll(timeout) == TF_WAKEUP_TIMEOUT && +		if (sched->poller->poll(timeout) == TF_WAKEUP_TIMEOUT &&  		    timeout >= 0) {  			sched->scheduler_time += timeout;  			process_heap(sched); @@ -103,7 +104,8 @@ int  tf_scheduler_enable(struct tf_scheduler *sched)  	__tf_fiber_bind_scheduler(s);  	__tf_scheduler = s; -	tf_poll_init(); +	s->poller = &tf_epoll_hooks; +	s->poller->init();  	update_time(s);  	if (sched != NULL) @@ -124,7 +126,7 @@ void tf_scheduler_disable(void)  	while (sched->num_fibers > 1)  		__tf_fiber_schedule(); -	tf_poll_close(); +	sched->poller->close();  	__tf_scheduler = NULL;  	__tf_fiber_release_scheduler(sched);  	tf_heap_destroy(&sched->heap);  | 
