diff options
author | Timo Teras <timo.teras@iki.fi> | 2009-11-25 15:11:20 +0200 |
---|---|---|
committer | Timo Teras <timo.teras@iki.fi> | 2009-11-25 15:11:20 +0200 |
commit | 2b19cc385163a43b1d559074a795a8aaab751185 (patch) | |
tree | 322473e3446153c1bbaac8d6d990734b09d15977 /src | |
parent | fc1044daf51f32b9d85f8497e4e0bd5a3c1e7fe9 (diff) | |
download | libtf-2b19cc385163a43b1d559074a795a8aaab751185.tar.bz2 libtf-2b19cc385163a43b1d559074a795a8aaab751185.tar.xz |
libtf: implement basic networking i/o
Pretty much untested. Also some slight changes to how the scheduler is
invoked.
Diffstat (limited to 'src')
-rw-r--r-- | src/fiber.c | 38 | ||||
-rw-r--r-- | src/io-epoll.c | 43 | ||||
-rw-r--r-- | src/io-unix.c | 310 |
3 files changed, 312 insertions, 79 deletions
diff --git a/src/fiber.c b/src/fiber.c index 15c533a..aa83fa8 100644 --- a/src/fiber.c +++ b/src/fiber.c @@ -92,7 +92,6 @@ static void run_fiber(struct tf_scheduler *sched, struct tf_fiber *f) sched->num_fibers--; break; case TF_WAKEUP_IMMEDIATE: - case TF_WAKEUP_TIMEOUT: break; default: TF_BUG_ON("bad scheduler call from fiber"); @@ -152,7 +151,7 @@ int tf_main(tf_fiber_proc main_fiber) } else timeout = -1; - if (tf_poll(timeout) == TF_WAKEUP_TIMEOUT) { + if (tf_poll(timeout) == TF_WAKEUP_TIMEOUT && timeout >= 0) { sched->scheduler_time += timeout; process_heap(sched); } @@ -164,30 +163,20 @@ int tf_main(tf_fiber_proc main_fiber) return 0; } -int tf_schedule(int wakeup) +int tf_schedule(tf_mtime_diff_t milliseconds) { struct tf_scheduler *sched = tf_get_scheduler(); struct tf_fiber *schedf = container_of((void*) sched, struct tf_fiber, data); struct tf_fiber *f = sched->active_fiber; - if (wakeup != TF_WAKEUP_TIMEOUT) - tf_heap_delete(&f->heap_node, &sched->heap); f->wakeup_type = TF_WAKEUP_NONE; - - return tf_uctx_transfer(f, schedf, wakeup); -} - -int tf_schedule_timeout(int milliseconds) -{ - struct tf_scheduler *sched = tf_get_scheduler(); - struct tf_fiber *f = sched->active_fiber; - - if (milliseconds <= 0) { + if (milliseconds == TF_NO_TIMEOUT) tf_heap_delete(&f->heap_node, &sched->heap); - return TF_WAKEUP_IMMEDIATE; - } - tf_heap_change(&f->heap_node, &sched->heap, tf_mtime() + milliseconds); - return TF_WAKEUP_TIMEOUT; + else if (milliseconds != TF_NO_TIMEOUT_CHANGE) + tf_heap_change(&f->heap_node, &sched->heap, + tf_mtime() + milliseconds); + + return tf_uctx_transfer(f, schedf, TF_WAKEUP_IMMEDIATE); } void tf_wakeup(struct tf_fiber *fiber, int wakeup_type) @@ -204,9 +193,10 @@ void tf_exit(void) { struct tf_scheduler *sched = tf_get_scheduler(); struct tf_fiber *f = sched->active_fiber; + struct tf_fiber *schedf = container_of((void*) sched, struct tf_fiber, data); tf_heap_delete(&f->heap_node, &sched->heap); - tf_schedule(TF_WAKEUP_KILL); + 
tf_uctx_transfer(f, schedf, TF_WAKEUP_KILL); TF_BUG_ON(1); } @@ -220,12 +210,6 @@ int tf_yield(void) struct tf_fiber *f = sched->active_fiber; tf_list_add_tail(&f->queue_node, &sched->run_q); - return tf_schedule(TF_WAKEUP_IMMEDIATE); -} - -int tf_msleep(int milliseconds) -{ - tf_schedule_timeout(milliseconds); - return tf_schedule(TF_WAKEUP_TIMEOUT); + return tf_schedule(TF_NO_TIMEOUT); } diff --git a/src/io-epoll.c b/src/io-epoll.c index 56d0743..5e28de8 100644 --- a/src/io-epoll.c +++ b/src/io-epoll.c @@ -14,15 +14,12 @@ #include <fcntl.h> #include <unistd.h> #include <sys/epoll.h> +#include <sys/socket.h> #include <libtf/io.h> #include <libtf/fiber.h> -#define TF_FD_AUTOCLOSE 1 -#define TF_FD_RESTORE_BLOCKING 2 -#define TF_FD_STREAM_ORIENTED 4 - -static int tf_fd_init(struct tf_fd *fd, int kfd, int flags) +static int tf_fd_created(struct tf_fd *fd) { struct tf_poll_data *pd = &tf_get_scheduler()->poll_data; struct epoll_event ev; @@ -30,20 +27,28 @@ static int tf_fd_init(struct tf_fd *fd, int kfd, int flags) ev.events = EPOLLIN | EPOLLOUT | EPOLLET; ev.data.ptr = fd; - r = epoll_ctl(pd->epoll_fd, EPOLL_CTL_ADD, kfd, &ev); - if (r < 0) { + r = epoll_ctl(pd->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev); + if (unlikely(r < 0)) { TF_BUG_ON(errno == EEXIST); - return -errno; + r = -errno; + return r; } - fd->fd = kfd; - fd->flags = flags; - fd->waiting_fiber = NULL; + return 0; +} +static int tf_fd_destroyed(struct tf_fd *fd) +{ + struct tf_poll_data *pd = &tf_get_scheduler()->poll_data; + + if (fd->flags & TF_FD_AUTOCLOSE) + return 0; + + epoll_ctl(pd->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL); return 0; } -static void tf_fd_wait(struct tf_fd *fd, int events) +static void tf_fd_monitor(struct tf_fd *fd, int events) { struct tf_poll_data *pd = &tf_get_scheduler()->poll_data; @@ -53,7 +58,7 @@ static void tf_fd_wait(struct tf_fd *fd, int events) pd->num_waiters++; } -static void tf_fd_release(struct tf_fd *fd) +static void tf_fd_unmonitor(struct tf_fd *fd) { struct 
tf_poll_data *pd = &tf_get_scheduler()->poll_data; @@ -76,21 +81,23 @@ int tf_poll(tf_mtime_diff_t timeout) struct tf_poll_data *pd = &tf_get_scheduler()->poll_data; struct epoll_event events[64]; struct tf_fd *fd; - int ret = (timeout == 0) ? TF_WAKEUP_TIMEOUT : TF_WAKEUP_FD; - int r, i; + int r, i, ret; if (timeout == 0 && pd->num_waiters == 0) - return ret; + return TF_WAKEUP_TIMEOUT; + ret = TF_WAKEUP_TIMEOUT; do { r = epoll_wait(pd->epoll_fd, events, array_size(events), timeout); + if (r == 0) + break; + for (i = 0; i < r; i++) { fd = (struct tf_fd *) events[i].data.ptr; if (likely(fd->events & events[i].events)) tf_wakeup(fd->waiting_fiber, TF_WAKEUP_FD); } - if (timeout != 0) - ret = TF_WAKEUP_FD; + ret = TF_WAKEUP_FD; timeout = 0; } while (unlikely(r == array_size(events))); diff --git a/src/io-unix.c b/src/io-unix.c index d333122..cc1bd1b 100644 --- a/src/io-unix.c +++ b/src/io-unix.c @@ -10,46 +10,82 @@ * See http://www.gnu.org/ for details. */ -int tf_open(struct tf_fd *fd, const char *pathname, int flags) +#include <string.h> + +static inline int tf_sockaddr_len(const struct tf_sockaddr *addr) { - int kfd, r; + if (addr == NULL) + return 0; + switch (addr->u.addr.sa_family) { + case AF_INET: + return sizeof(struct sockaddr_in); + case AF_INET6: + return sizeof(struct sockaddr_in6); + default: + TF_BUG_ON(1); + } +} - kfd = open(pathname, flags | O_CLOEXEC | O_NONBLOCK); - if (unlikely(kfd < 0)) - return -errno; +int tf_open_fd(struct tf_fd *fd, int kfd, int flags) +{ + int r; + + fd->fd = kfd; + fd->flags = flags; + fd->waiting_fiber = NULL; - r = tf_fd_init(fd, kfd, TF_FD_AUTOCLOSE | TF_FD_STREAM_ORIENTED); + r = tf_fd_created(fd); if (r < 0) { - close(kfd); + if (flags & TF_FD_AUTOCLOSE) + close(kfd); + fd->fd = -1; return r; } + + if (flags & TF_FD_SET_CLOEXEC) + fcntl(kfd, F_SETFD, FD_CLOEXEC); + + if (!(flags & TF_FD_ALREADY_NONBLOCKING)) { + r = fcntl(kfd, F_GETFL, 0); + if (r & O_NONBLOCK) + fd->flags |= TF_FD_ALREADY_NONBLOCKING; + else + 
fcntl(kfd, F_SETFL, O_NONBLOCK | r); + } + return 0; } -int tf_open_fd(struct tf_fd *fd, int kfd) +int tf_open(struct tf_fd *fd, const char *pathname, int flags) { - int mode, flags = 0; + int kfd, tfdf; - mode = fcntl(kfd, F_GETFL, 0); - if (!(mode & O_NONBLOCK)) { - fcntl(fd->fd, F_SETFL, mode | O_NONBLOCK); - flags |= TF_FD_RESTORE_BLOCKING; - } + tfdf = TF_FD_AUTOCLOSE | TF_FD_STREAM_ORIENTED + | TF_FD_ALREADY_NONBLOCKING; +#if defined(O_CLOEXEC) + flags |= O_CLOEXEC; +#else + tfdf |= TF_FD_SET_CLOEXEC; +#endif + + kfd = open(pathname, flags | O_NONBLOCK); + if (unlikely(kfd < 0)) + return -errno; - return tf_fd_init(fd, kfd, TF_FD_STREAM_ORIENTED | flags); + return tf_open_fd(fd, kfd, tfdf); } int tf_close(struct tf_fd *fd) { int r; - if (fd->flags & TF_FD_RESTORE_BLOCKING) { - fcntl(fd->fd, F_SETFL, fcntl(fd->fd, F_GETFL, 0) & ~O_NONBLOCK); - } + tf_fd_destroyed(fd); if (fd->flags & TF_FD_AUTOCLOSE) { r = close(fd->fd); if (unlikely(r == -1)) return -errno; + } else if (!(fd->flags & TF_FD_ALREADY_NONBLOCKING)) { + fcntl(fd->fd, F_SETFL, fcntl(fd->fd, F_GETFL, 0) & ~O_NONBLOCK); } return 0; } @@ -57,10 +93,9 @@ int tf_close(struct tf_fd *fd) int tf_read(struct tf_fd *fd, void *buf, size_t count, tf_mtime_diff_t timeout) { ssize_t n; - int r, mode; + int r; - mode = tf_schedule_timeout(timeout); - tf_fd_wait(fd, EPOLLIN); + tf_fd_monitor(fd, EPOLLIN); do { n = read(fd->fd, buf, count); if (n == count) { @@ -84,11 +119,10 @@ int tf_read(struct tf_fd *fd, void *buf, size_t count, tf_mtime_diff_t timeout) continue; } - r = tf_schedule(mode); - if (r != TF_WAKEUP_FD) - break; - } while (1); - tf_fd_release(fd); + r = tf_schedule(timeout); + timeout = TF_NO_TIMEOUT_CHANGE; + } while (r == TF_WAKEUP_FD); + tf_fd_unmonitor(fd); return -r; } @@ -97,10 +131,9 @@ int tf_write(struct tf_fd *fd, const void *buf, size_t count, tf_mtime_diff_t timeout) { ssize_t n; - int r, mode; + int r; - mode = tf_schedule_timeout(timeout); - tf_fd_wait(fd, EPOLLOUT); + tf_fd_monitor(fd, 
EPOLLOUT); do { n = write(fd->fd, buf, count); if (n == count) { @@ -121,11 +154,220 @@ int tf_write(struct tf_fd *fd, const void *buf, size_t count, continue; } - r = tf_schedule(mode); - if (r != TF_WAKEUP_FD) - break; - } while (1); - tf_fd_release(fd); + r = tf_schedule(timeout); + timeout = TF_NO_TIMEOUT_CHANGE; + } while (r == TF_WAKEUP_FD); + tf_fd_unmonitor(fd); return -r; } + +int tf_socket(struct tf_fd *fd, int domain, int type, int protocol) +{ + int kfd, tfdf, on = 1; + + tfdf = TF_FD_AUTOCLOSE; +#if defined(SOCK_NONBLOCK) && defined(SOCK_CLOEXEC) + type |= SOCK_NONBLOCK | SOCK_CLOEXEC; + tfdf |= TF_FD_ALREADY_NONBLOCKING; +#else + tfdf |= TF_FD_SET_CLOEXEC; +#endif + kfd = socket(domain, type, protocol); + if (unlikely(kfd < 0)) + return -errno; + + if (protocol == SOCK_STREAM) { + tfdf |= TF_FD_STREAM_ORIENTED; + } else { + /* We want the local IP info too */ + setsockopt(kfd, IPPROTO_IP, IP_PKTINFO, &on, sizeof(on)); + } + + return tf_open_fd(fd, kfd, tfdf); +} + +int tf_bind(struct tf_fd *fd, const struct tf_sockaddr *addr) +{ + if (unlikely(bind(fd->fd, &addr->u.addr, tf_sockaddr_len(addr)) < 0)) + return -errno; + return 0; +} + +int tf_listen(struct tf_fd *fd, int backlog) +{ + if (unlikely(listen(fd->fd, backlog) < 0)) + return -errno; + return 0; +} + +int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd, + struct tf_sockaddr *from, tf_mtime_diff_t timeout) +{ + int r, tfdf; + struct sockaddr *addr = NULL; + socklen_t al, *pal = NULL; + + if (from != NULL) { + addr = &from->u.addr; + al = tf_sockaddr_len(from); + pal = &al; + } + + tfdf = TF_FD_AUTOCLOSE | (listen_fd->flags & TF_FD_STREAM_ORIENTED); + tfdf |= TF_FD_SET_CLOEXEC; + + tf_fd_monitor(listen_fd, EPOLLIN); + do { + /* FIXME: use accept4 if available */ + r = accept(listen_fd->fd, addr, pal); + if (r >= 0) + break; + if (errno == EINTR) + continue; + if (errno != EAGAIN) { + tf_fd_unmonitor(listen_fd); + return -errno; + } + r = tf_schedule(timeout); + timeout = 
TF_NO_TIMEOUT_CHANGE; + } while (r == TF_WAKEUP_FD); + tf_fd_unmonitor(listen_fd); + if (r < 0) + return r; + + return tf_open_fd(child_fd, r, tfdf); +} + +int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to, + tf_mtime_diff_t timeout) +{ + socklen_t l = sizeof(int); + int r, err; + + /* Initiate operation */ + r = connect(fd->fd, &to->u.addr, tf_sockaddr_len(to)); + if (r >= 0) + return 0; + if (errno != EINPROGRESS) + return -errno; + + /* Wait for socket to become readable */ + tf_fd_monitor(fd, EPOLLOUT); + r = tf_schedule(timeout); + tf_fd_unmonitor(fd); + if (r != TF_WAKEUP_FD) + return r; + + /* Check for error */ + if (getsockopt(fd->fd, SOL_SOCKET, SO_ERROR, &err, &l) < 0) + return -errno; + return -err; +} + +ssize_t tf_recvmsg(struct tf_fd *fd, + struct tf_sockaddr *from, + struct tf_sockaddr *to, + void *buf, size_t len, + tf_mtime_diff_t timeout) +{ + struct iovec iov; + struct msghdr msg; + struct cmsghdr *cmsg; + char control[128]; + int r; + + iov = (struct iovec) { + .iov_base = buf, + .iov_len = len, + }; + msg = (struct msghdr) { + .msg_name = &from->u.addr, + .msg_namelen = sizeof(struct tf_sockaddr), + .msg_control = control, + .msg_controllen = sizeof(control), + .msg_iov = &iov, + .msg_iovlen = 1, + }; + + tf_fd_monitor(fd, EPOLLIN); + do { + r = recvmsg(fd->fd, &msg, 0); + if (r >= 0) + break; + if (errno != EAGAIN) { + r = -errno; + break; + } + r = tf_schedule(timeout); + timeout = TF_NO_TIMEOUT_CHANGE; + } while (r == TF_WAKEUP_FD); + tf_fd_unmonitor(fd); + + if (r < 0 || to == NULL) + return r; + + to->u.addr.sa_family = AF_UNSPEC; + for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) { + if (cmsg->cmsg_level == IPPROTO_IP && + cmsg->cmsg_type == IP_PKTINFO) { + struct in_pktinfo *ipi = (struct in_pktinfo *) CMSG_DATA(cmsg); + to->u.in.sin_family = AF_INET; + to->u.in.sin_addr = ipi->ipi_spec_dst; + to->u.in.sin_port = 0; + } + } + + return r; +} + +ssize_t tf_sendmsg(struct tf_fd *fd, + struct tf_sockaddr 
*from, + const struct tf_sockaddr *to, + const void *buf, size_t len, + tf_mtime_diff_t timeout) +{ + struct msghdr msg; + struct iovec iov; + struct { + struct cmsghdr cm; + struct in_pktinfo ipi; + } cmsg; + int r; + + iov = (struct iovec) { + .iov_base = (void*) buf, + .iov_len = len, + }; + msg = (struct msghdr) { + .msg_name = (struct sockaddr*) &to->u.addr, + .msg_namelen = tf_sockaddr_len(to), + .msg_iov = &iov, + .msg_iovlen = 1, + }; + if (from != NULL && from->u.addr.sa_family == AF_INET) { + memset(&cmsg.ipi, 0, sizeof(cmsg.ipi)); + cmsg.cm.cmsg_len = sizeof(cmsg); + cmsg.cm.cmsg_level = IPPROTO_IP; + cmsg.cm.cmsg_type = IP_PKTINFO; + cmsg.ipi.ipi_spec_dst = from->u.in.sin_addr; + msg.msg_control = (void *) &cmsg; + msg.msg_controllen = sizeof(cmsg); + } + + tf_fd_monitor(fd, EPOLLOUT); + do { + r = sendmsg(fd->fd, &msg, 0); + if (r >= 0) + break; + if (errno != EAGAIN) { + r = -errno; + break; + } + r = tf_schedule(timeout); + timeout = TF_NO_TIMEOUT_CHANGE; + } while (r == TF_WAKEUP_FD); + tf_fd_unmonitor(fd); + + return r; +} |