Diffstat (limited to 'src')
-rw-r--r--  src/TFbuild     |   1
-rw-r--r--  src/io-epoll.c  | 120
-rw-r--r--  src/io-unix.c   |  58
-rw-r--r--  src/scheduler.c |   8
4 files changed, 111 insertions(+), 76 deletions(-)
diff --git a/src/TFbuild b/src/TFbuild
index 08cb696..ef1c5ae 100644
--- a/src/TFbuild
+++ b/src/TFbuild
@@ -2,5 +2,6 @@ libs-y += libtf
libtf-objs-y += fiber.o scheduler.o heap.o
libtf-objs-$(OS_LINUX) += io-epoll.o
+libtf-objs-$(OS_UNIX) += io-unix.o
CFLAGS_heap.c += -funroll-all-loops
diff --git a/src/io-epoll.c b/src/io-epoll.c
index 8ac230f..32aa090 100644
--- a/src/io-epoll.c
+++ b/src/io-epoll.c
@@ -14,7 +14,6 @@
#include <fcntl.h>
#include <unistd.h>
#include <sys/epoll.h>
-#include <sys/socket.h>
#include <libtf/io.h>
#include <libtf/scheduler.h>
@@ -24,71 +23,29 @@ struct tf_poll_data {
int num_waiters;
};
-struct tf_poll_data *tf_epoll_get_data(void)
+static struct tf_poll_data *tf_epoll_get_data(void)
{
struct tf_scheduler *sched = tf_scheduler_get_current();
TF_BUILD_BUG_ON(sizeof(struct tf_poll_data) > sizeof(sched->poll_data));
return (struct tf_poll_data *) &sched->poll_data;
}
-static int tf_fd_created(struct tf_fd *fd)
+static void tf_epoll_init(void)
{
struct tf_poll_data *pd = tf_epoll_get_data();
- struct epoll_event ev;
- int r;
- ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
- ev.data.ptr = fd;
- r = epoll_ctl(pd->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
- if (unlikely(r < 0)) {
- TF_BUG_ON(errno == EEXIST);
- r = -errno;
- return r;
- }
-
- return 0;
-}
-
-static int tf_fd_destroyed(struct tf_fd *fd)
-{
- struct tf_poll_data *pd = tf_epoll_get_data();
-
- if (fd->flags & TF_FD_AUTOCLOSE)
- return 0;
-
- epoll_ctl(pd->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
- return 0;
-}
-
-static void tf_fd_monitor(struct tf_fd *fd, int events)
-{
- struct tf_poll_data *pd = tf_epoll_get_data();
-
- TF_BUG_ON(fd->waiting_fiber != NULL);
- fd->events = events | EPOLLERR | EPOLLHUP;
- fd->waiting_fiber = tf_scheduler_get_current()->active_fiber;
- pd->num_waiters++;
-}
-
-static void tf_fd_unmonitor(struct tf_fd *fd)
-{
- struct tf_poll_data *pd = tf_epoll_get_data();
-
- fd->waiting_fiber = NULL;
- fd->events = 0;
- pd->num_waiters--;
+ pd->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
+ pd->num_waiters = 0;
+ TF_BUG_ON(pd->epoll_fd < 0);
}
-void tf_poll_init(void)
+static void tf_epoll_close(void)
{
struct tf_poll_data *pd = tf_epoll_get_data();
- pd->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
- pd->num_waiters = 0;
- TF_BUG_ON(pd->epoll_fd < 0);
+ close(pd->epoll_fd);
}
-
-int tf_poll(tf_mtime_diff_t timeout)
+static int tf_epoll_poll(tf_mtime_diff_t timeout)
{
struct tf_poll_data *pd = tf_epoll_get_data();
struct epoll_event events[64];
@@ -116,11 +73,66 @@ int tf_poll(tf_mtime_diff_t timeout)
return ret;
}
-void tf_poll_close(void)
+static int tf_epoll_fd_created(struct tf_fd *fd)
{
struct tf_poll_data *pd = tf_epoll_get_data();
+ struct epoll_event ev;
+ int r;
- close(pd->epoll_fd);
+ ev = (struct epoll_event) {
+ .events = EPOLLIN | EPOLLOUT | EPOLLET,
+ .data.ptr = fd,
+ };
+ r = epoll_ctl(pd->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
+ if (unlikely(r < 0)) {
+ TF_BUG_ON(errno == EEXIST);
+ r = -errno;
+ return r;
+ }
+
+ return 0;
}
-#include "io-unix.c"
+static int tf_epoll_fd_destroyed(struct tf_fd *fd)
+{
+ struct tf_poll_data *pd = tf_epoll_get_data();
+
+ if (fd->flags & TF_FD_AUTOCLOSE)
+ return 0;
+
+ epoll_ctl(pd->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
+ return 0;
+}
+
+static void tf_epoll_fd_monitor(struct tf_fd *fd, int events)
+{
+ struct tf_poll_data *pd = tf_epoll_get_data();
+
+ TF_BUG_ON(fd->waiting_fiber != NULL);
+ fd->events = EPOLLERR | EPOLLHUP;
+ if (events & TF_POLL_READ)
+ fd->events |= EPOLLIN;
+ if (events & TF_POLL_WRITE)
+ fd->events |= EPOLLOUT;
+ fd->waiting_fiber = tf_scheduler_get_current()->active_fiber;
+ pd->num_waiters++;
+}
+
+static void tf_epoll_fd_unmonitor(struct tf_fd *fd)
+{
+ struct tf_poll_data *pd = tf_epoll_get_data();
+
+ fd->waiting_fiber = NULL;
+ fd->events = 0;
+ pd->num_waiters--;
+}
+
+struct tf_poll_hooks tf_epoll_hooks = {
+ .init = tf_epoll_init,
+ .close = tf_epoll_close,
+ .poll = tf_epoll_poll,
+ .fd_created = tf_epoll_fd_created,
+ .fd_destroyed = tf_epoll_fd_destroyed,
+ .fd_monitor = tf_epoll_fd_monitor,
+ .fd_unmonitor = tf_epoll_fd_unmonitor,
+};
diff --git a/src/io-unix.c b/src/io-unix.c
index 39cdf64..d80592a 100644
--- a/src/io-unix.c
+++ b/src/io-unix.c
@@ -10,8 +10,18 @@
* See http://www.gnu.org/ for details.
*/
+#include <errno.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <sys/socket.h>
#include <string.h>
+#include <libtf/io.h>
+#include <libtf/scheduler.h>
+
+#define TF_POLL_HOOKS \
+struct tf_poll_hooks *poller = tf_scheduler_get_current()->poller;
+
static inline int tf_sockaddr_len(const struct tf_sockaddr *addr)
{
if (addr == NULL)
@@ -28,13 +38,14 @@ static inline int tf_sockaddr_len(const struct tf_sockaddr *addr)
int tf_open_fd(struct tf_fd *fd, int kfd, int flags)
{
+ TF_POLL_HOOKS;
int r;
fd->fd = kfd;
fd->flags = flags;
fd->waiting_fiber = NULL;
- r = tf_fd_created(fd);
+ r = poller->fd_created(fd);
if (r < 0) {
if (flags & TF_FD_AUTOCLOSE)
close(kfd);
@@ -77,9 +88,10 @@ int tf_open(struct tf_fd *fd, const char *pathname, int flags)
int tf_close(struct tf_fd *fd)
{
+ TF_POLL_HOOKS;
int r;
- tf_fd_destroyed(fd);
+ poller->fd_destroyed(fd);
if (fd->flags & TF_FD_AUTOCLOSE) {
r = close(fd->fd);
if (unlikely(r == -1))
@@ -92,10 +104,11 @@ int tf_close(struct tf_fd *fd)
int tf_read_fully(struct tf_fd *fd, void *buf, size_t count)
{
+ TF_POLL_HOOKS;
ssize_t n;
int r;
- tf_fd_monitor(fd, EPOLLIN);
+ poller->fd_monitor(fd, TF_POLL_READ);
do {
n = read(fd->fd, buf, count);
if (n == count) {
@@ -121,17 +134,18 @@ int tf_read_fully(struct tf_fd *fd, void *buf, size_t count)
r = __tf_fiber_schedule();
} while (r == TF_WAKEUP_FD);
- tf_fd_unmonitor(fd);
+ poller->fd_unmonitor(fd);
return -r;
}
int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count)
{
+ TF_POLL_HOOKS;
ssize_t n;
int r;
- tf_fd_monitor(fd, EPOLLOUT);
+ poller->fd_monitor(fd, TF_POLL_WRITE);
do {
n = write(fd->fd, buf, count);
if (n == count) {
@@ -154,16 +168,17 @@ int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count)
r = __tf_fiber_schedule();
} while (r == TF_WAKEUP_FD);
- tf_fd_unmonitor(fd);
+ poller->fd_unmonitor(fd);
return -r;
}
ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count)
{
+ TF_POLL_HOOKS;
ssize_t n;
- tf_fd_monitor(fd, EPOLLIN);
+ poller->fd_monitor(fd, TF_POLL_READ);
do {
n = read(fd->fd, buf, count);
if (n >= 0)
@@ -176,16 +191,17 @@ ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count)
}
n = __tf_fiber_schedule();
} while (n == TF_WAKEUP_FD);
- tf_fd_unmonitor(fd);
+ poller->fd_unmonitor(fd);
return n;
}
ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count)
{
+ TF_POLL_HOOKS;
ssize_t n;
- tf_fd_monitor(fd, EPOLLOUT);
+ poller->fd_monitor(fd, TF_POLL_WRITE);
do {
n = write(fd->fd, buf, count);
if (n >= 0)
@@ -198,7 +214,7 @@ ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count)
}
n = __tf_fiber_schedule();
} while (n == TF_WAKEUP_FD);
- tf_fd_unmonitor(fd);
+ poller->fd_unmonitor(fd);
return n;
}
@@ -245,6 +261,7 @@ int tf_listen(struct tf_fd *fd, int backlog)
int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd,
struct tf_sockaddr *from)
{
+ TF_POLL_HOOKS;
int r, tfdf;
struct sockaddr *addr = NULL;
socklen_t al, *pal = NULL;
@@ -258,7 +275,7 @@ int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd,
tfdf = TF_FD_AUTOCLOSE | (listen_fd->flags & TF_FD_STREAM_ORIENTED);
tfdf |= TF_FD_SET_CLOEXEC;
- tf_fd_monitor(listen_fd, EPOLLIN);
+ poller->fd_monitor(listen_fd, TF_POLL_READ);
do {
/* FIXME: use accept4 if available */
r = accept(listen_fd->fd, addr, pal);
@@ -267,12 +284,12 @@ int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd,
if (errno == EINTR)
continue;
if (errno != EAGAIN) {
- tf_fd_unmonitor(listen_fd);
+ poller->fd_unmonitor(listen_fd);
return -errno;
}
r = __tf_fiber_schedule();
} while (r == TF_WAKEUP_FD);
- tf_fd_unmonitor(listen_fd);
+ poller->fd_unmonitor(listen_fd);
if (r < 0)
return r;
@@ -281,6 +298,7 @@ int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd,
int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to)
{
+ TF_POLL_HOOKS;
socklen_t l = sizeof(int);
int r, err;
@@ -292,9 +310,9 @@ int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to)
return -errno;
/* Wait for socket to become readable */
- tf_fd_monitor(fd, EPOLLOUT);
+ poller->fd_monitor(fd, TF_POLL_WRITE);
r = __tf_fiber_schedule();
- tf_fd_unmonitor(fd);
+ poller->fd_unmonitor(fd);
if (r != TF_WAKEUP_FD)
return r;
@@ -309,6 +327,7 @@ ssize_t tf_recvmsg(struct tf_fd *fd,
struct tf_sockaddr *to,
void *buf, size_t len)
{
+ TF_POLL_HOOKS;
struct iovec iov;
struct msghdr msg;
struct cmsghdr *cmsg;
@@ -328,7 +347,7 @@ ssize_t tf_recvmsg(struct tf_fd *fd,
.msg_iovlen = 1,
};
- tf_fd_monitor(fd, EPOLLIN);
+ poller->fd_monitor(fd, TF_POLL_READ);
do {
r = recvmsg(fd->fd, &msg, 0);
if (r >= 0)
@@ -339,7 +358,7 @@ ssize_t tf_recvmsg(struct tf_fd *fd,
}
r = __tf_fiber_schedule();
} while (r == TF_WAKEUP_FD);
- tf_fd_unmonitor(fd);
+ poller->fd_unmonitor(fd);
if (r < 0 || to == NULL)
return r;
@@ -363,6 +382,7 @@ ssize_t tf_sendmsg(struct tf_fd *fd,
const struct tf_sockaddr *to,
const void *buf, size_t len)
{
+ TF_POLL_HOOKS;
struct msghdr msg;
struct iovec iov;
struct {
@@ -391,7 +411,7 @@ ssize_t tf_sendmsg(struct tf_fd *fd,
msg.msg_controllen = sizeof(cmsg);
}
- tf_fd_monitor(fd, EPOLLOUT);
+ poller->fd_monitor(fd, TF_POLL_WRITE);
do {
r = sendmsg(fd->fd, &msg, 0);
if (r >= 0)
@@ -402,7 +422,7 @@ ssize_t tf_sendmsg(struct tf_fd *fd,
}
r = __tf_fiber_schedule();
} while (r == TF_WAKEUP_FD);
- tf_fd_unmonitor(fd);
+ poller->fd_unmonitor(fd);
return r;
}
diff --git a/src/scheduler.c b/src/scheduler.c
index d287eca..a103d0a 100644
--- a/src/scheduler.c
+++ b/src/scheduler.c
@@ -15,6 +15,7 @@
#include <libtf/io.h>
/* FIXME: should be in thread local storage */
+extern struct tf_poll_hooks tf_epoll_hooks;
struct tf_scheduler *__tf_scheduler;
static void update_time(struct tf_scheduler *sched)
@@ -59,7 +60,7 @@ void tf_scheduler_fiber(void *data)
timeout = -1;
}
- if (tf_poll(timeout) == TF_WAKEUP_TIMEOUT &&
+ if (sched->poller->poll(timeout) == TF_WAKEUP_TIMEOUT &&
timeout >= 0) {
sched->scheduler_time += timeout;
process_heap(sched);
@@ -103,7 +104,8 @@ int tf_scheduler_enable(struct tf_scheduler *sched)
__tf_fiber_bind_scheduler(s);
__tf_scheduler = s;
- tf_poll_init();
+ s->poller = &tf_epoll_hooks;
+ s->poller->init();
update_time(s);
if (sched != NULL)
@@ -124,7 +126,7 @@ void tf_scheduler_disable(void)
while (sched->num_fibers > 1)
__tf_fiber_schedule();
- tf_poll_close();
+ sched->poller->close();
__tf_scheduler = NULL;
__tf_fiber_release_scheduler(sched);
tf_heap_destroy(&sched->heap);
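
For reference, a minimal sketch of the tf_poll_hooks interface this commit introduces, reconstructed from the hunks above: io-epoll.c fills the struct (tf_epoll_hooks), scheduler.c stores a pointer to it in sched->poller, and io-unix.c calls through that pointer via the TF_POLL_HOOKS macro. The member signatures are inferred from those call sites only; the header it actually lives in, the TF_POLL_READ/TF_POLL_WRITE flag values, and the exact scheduler field layout are assumptions, not taken from the diff.

/* Assumed location: a shared libtf header (e.g. libtf/io.h), not shown in this diff. */
#define TF_POLL_READ   0x01   /* assumed values; only the names appear in the diff */
#define TF_POLL_WRITE  0x02

struct tf_fd;

struct tf_poll_hooks {
	void (*init)(void);                              /* tf_epoll_init */
	void (*close)(void);                             /* tf_epoll_close */
	int  (*poll)(tf_mtime_diff_t timeout);           /* tf_epoll_poll */
	int  (*fd_created)(struct tf_fd *fd);            /* returns 0 or -errno */
	int  (*fd_destroyed)(struct tf_fd *fd);
	void (*fd_monitor)(struct tf_fd *fd, int events);   /* events: TF_POLL_READ/WRITE */
	void (*fd_unmonitor)(struct tf_fd *fd);
};

/* struct tf_scheduler is assumed to gain a matching member, per sched->poller usage:
 *	struct tf_poll_hooks *poller;
 */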