summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--include/libtf/defines.h2
-rw-r--r--include/libtf/fiber.h15
-rw-r--r--include/libtf/io.h28
-rw-r--r--src/fiber.c38
-rw-r--r--src/io-epoll.c43
-rw-r--r--src/io-unix.c310
-rw-r--r--test/read.c4
7 files changed, 342 insertions, 98 deletions
diff --git a/include/libtf/defines.h b/include/libtf/defines.h
index 144ad63..ae72980 100644
--- a/include/libtf/defines.h
+++ b/include/libtf/defines.h
@@ -72,8 +72,6 @@
#endif
/* Monotonic time */
-#define TF_INFINITE -1
-
typedef uint32_t tf_mtime_t;
typedef int32_t tf_mtime_diff_t;
diff --git a/include/libtf/fiber.h b/include/libtf/fiber.h
index 91c0b3b..36c2812 100644
--- a/include/libtf/fiber.h
+++ b/include/libtf/fiber.h
@@ -28,6 +28,10 @@
#define TF_WAKEUP_TIMEOUT ETIMEDOUT
#define TF_WAKEUP_FD EIO
+/* Special timeouts for tf_schedule() */
+#define TF_NO_TIMEOUT -1
+#define TF_NO_TIMEOUT_CHANGE -2
+
/* Scheduler */
struct tf_fiber;
@@ -77,11 +81,14 @@ void tf_fiber_put(void *data);
void tf_exit(void) attribute_noreturn;
void tf_kill(void *fiber);
-int tf_schedule(int wakeup_type);
-int tf_schedule_timeout(int milliseconds);
+int tf_schedule(int milliseconds);
void tf_wakeup(struct tf_fiber *fiber, int wakeup_type);
-
int tf_yield(void);
-int tf_msleep(int milliseconds);
+
+static inline
+int tf_msleep(int milliseconds)
+{
+ return tf_schedule(milliseconds);
+}
#endif
diff --git a/include/libtf/io.h b/include/libtf/io.h
index 87a6c90..8f5b54d 100644
--- a/include/libtf/io.h
+++ b/include/libtf/io.h
@@ -20,6 +20,12 @@
#include <libtf/defines.h>
+/* Flags for tf_open_fd() */
+#define TF_FD_AUTOCLOSE 1
+#define TF_FD_STREAM_ORIENTED 2
+#define TF_FD_SET_CLOEXEC 4
+#define TF_FD_ALREADY_NONBLOCKING 8
+
struct tf_fiber;
struct tf_sockaddr {
@@ -27,7 +33,7 @@ struct tf_sockaddr {
struct sockaddr addr;
struct sockaddr_in in;
struct sockaddr_in6 in6;
- };
+ } u;
};
struct tf_fd {
@@ -43,28 +49,28 @@ void tf_poll_init(void);
int tf_poll(tf_mtime_diff_t timeout);
void tf_poll_close(void);
+int tf_open_fd(struct tf_fd *fd, int kfd, int flags);
int tf_open(struct tf_fd *fd, const char *pathname, int flags);
-int tf_open_fd(struct tf_fd *fd, int kfd);
int tf_close(struct tf_fd *fd);
-int tf_read(struct tf_fd *fd, void *buf, size_t count, int timeout);
-int tf_write(struct tf_fd *fd, const void *buf, size_t count, int timeout);
+int tf_read(struct tf_fd *fd, void *buf, size_t count, tf_mtime_diff_t timeout);
+int tf_write(struct tf_fd *fd, const void *buf, size_t count, tf_mtime_diff_t timeout);
int tf_socket(struct tf_fd *fd, int domain, int type, int protocol);
int tf_bind(struct tf_fd *fd, const struct tf_sockaddr *addr);
int tf_listen(struct tf_fd *fd, int backlog);
-int tf_accept(struct tf_fd *fd);
-int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *addr, int timeout);
-ssize_t tf_recv(struct tf_fd *fd, void *buf, size_t count, int timeout);
-ssize_t tf_send(struct tf_fd *fd, const void *buf, size_t count, int timeout);
+int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd,
+ struct tf_sockaddr *from, tf_mtime_diff_t timeout);
+int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *addr, tf_mtime_diff_t timeout);
+
ssize_t tf_recvmsg(struct tf_fd *fd,
struct tf_sockaddr *from, struct tf_sockaddr *to,
- void *buf, size_t count, int timeout);
+ void *buf, size_t count, tf_mtime_diff_t timeout);
ssize_t tf_sendmsg(struct tf_fd *fd,
struct tf_sockaddr *from, const struct tf_sockaddr *to,
- const void *buf, size_t count, int timeout);
+ const void *buf, size_t count, tf_mtime_diff_t timeout);
int tf_query_dns(const char *name, int num_res, struct tf_sockaddr *res,
- int timeout);
+ tf_mtime_diff_t timeout);
#endif
diff --git a/src/fiber.c b/src/fiber.c
index 15c533a..aa83fa8 100644
--- a/src/fiber.c
+++ b/src/fiber.c
@@ -92,7 +92,6 @@ static void run_fiber(struct tf_scheduler *sched, struct tf_fiber *f)
sched->num_fibers--;
break;
case TF_WAKEUP_IMMEDIATE:
- case TF_WAKEUP_TIMEOUT:
break;
default:
TF_BUG_ON("bad scheduler call from fiber");
@@ -152,7 +151,7 @@ int tf_main(tf_fiber_proc main_fiber)
} else
timeout = -1;
- if (tf_poll(timeout) == TF_WAKEUP_TIMEOUT) {
+ if (tf_poll(timeout) == TF_WAKEUP_TIMEOUT && timeout >= 0) {
sched->scheduler_time += timeout;
process_heap(sched);
}
@@ -164,30 +163,20 @@ int tf_main(tf_fiber_proc main_fiber)
return 0;
}
-int tf_schedule(int wakeup)
+int tf_schedule(tf_mtime_diff_t milliseconds)
{
struct tf_scheduler *sched = tf_get_scheduler();
struct tf_fiber *schedf = container_of((void*) sched, struct tf_fiber, data);
struct tf_fiber *f = sched->active_fiber;
- if (wakeup != TF_WAKEUP_TIMEOUT)
- tf_heap_delete(&f->heap_node, &sched->heap);
f->wakeup_type = TF_WAKEUP_NONE;
-
- return tf_uctx_transfer(f, schedf, wakeup);
-}
-
-int tf_schedule_timeout(int milliseconds)
-{
- struct tf_scheduler *sched = tf_get_scheduler();
- struct tf_fiber *f = sched->active_fiber;
-
- if (milliseconds <= 0) {
+ if (milliseconds == TF_NO_TIMEOUT)
tf_heap_delete(&f->heap_node, &sched->heap);
- return TF_WAKEUP_IMMEDIATE;
- }
- tf_heap_change(&f->heap_node, &sched->heap, tf_mtime() + milliseconds);
- return TF_WAKEUP_TIMEOUT;
+ else if (milliseconds != TF_NO_TIMEOUT_CHANGE)
+ tf_heap_change(&f->heap_node, &sched->heap,
+ tf_mtime() + milliseconds);
+
+ return tf_uctx_transfer(f, schedf, TF_WAKEUP_IMMEDIATE);
}
void tf_wakeup(struct tf_fiber *fiber, int wakeup_type)
@@ -204,9 +193,10 @@ void tf_exit(void)
{
struct tf_scheduler *sched = tf_get_scheduler();
struct tf_fiber *f = sched->active_fiber;
+ struct tf_fiber *schedf = container_of((void*) sched, struct tf_fiber, data);
tf_heap_delete(&f->heap_node, &sched->heap);
- tf_schedule(TF_WAKEUP_KILL);
+ tf_uctx_transfer(f, schedf, TF_WAKEUP_KILL);
TF_BUG_ON(1);
}
@@ -220,12 +210,6 @@ int tf_yield(void)
struct tf_fiber *f = sched->active_fiber;
tf_list_add_tail(&f->queue_node, &sched->run_q);
- return tf_schedule(TF_WAKEUP_IMMEDIATE);
-}
-
-int tf_msleep(int milliseconds)
-{
- tf_schedule_timeout(milliseconds);
- return tf_schedule(TF_WAKEUP_TIMEOUT);
+ return tf_schedule(TF_NO_TIMEOUT);
}
diff --git a/src/io-epoll.c b/src/io-epoll.c
index 56d0743..5e28de8 100644
--- a/src/io-epoll.c
+++ b/src/io-epoll.c
@@ -14,15 +14,12 @@
#include <fcntl.h>
#include <unistd.h>
#include <sys/epoll.h>
+#include <sys/socket.h>
#include <libtf/io.h>
#include <libtf/fiber.h>
-#define TF_FD_AUTOCLOSE 1
-#define TF_FD_RESTORE_BLOCKING 2
-#define TF_FD_STREAM_ORIENTED 4
-
-static int tf_fd_init(struct tf_fd *fd, int kfd, int flags)
+static int tf_fd_created(struct tf_fd *fd)
{
struct tf_poll_data *pd = &tf_get_scheduler()->poll_data;
struct epoll_event ev;
@@ -30,20 +27,28 @@ static int tf_fd_init(struct tf_fd *fd, int kfd, int flags)
ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
ev.data.ptr = fd;
- r = epoll_ctl(pd->epoll_fd, EPOLL_CTL_ADD, kfd, &ev);
- if (r < 0) {
+ r = epoll_ctl(pd->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
+ if (unlikely(r < 0)) {
TF_BUG_ON(errno == EEXIST);
- return -errno;
+ r = -errno;
+ return r;
}
- fd->fd = kfd;
- fd->flags = flags;
- fd->waiting_fiber = NULL;
+ return 0;
+}
+static int tf_fd_destroyed(struct tf_fd *fd)
+{
+ struct tf_poll_data *pd = &tf_get_scheduler()->poll_data;
+
+ if (fd->flags & TF_FD_AUTOCLOSE)
+ return 0;
+
+ epoll_ctl(pd->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
return 0;
}
-static void tf_fd_wait(struct tf_fd *fd, int events)
+static void tf_fd_monitor(struct tf_fd *fd, int events)
{
struct tf_poll_data *pd = &tf_get_scheduler()->poll_data;
@@ -53,7 +58,7 @@ static void tf_fd_wait(struct tf_fd *fd, int events)
pd->num_waiters++;
}
-static void tf_fd_release(struct tf_fd *fd)
+static void tf_fd_unmonitor(struct tf_fd *fd)
{
struct tf_poll_data *pd = &tf_get_scheduler()->poll_data;
@@ -76,21 +81,23 @@ int tf_poll(tf_mtime_diff_t timeout)
struct tf_poll_data *pd = &tf_get_scheduler()->poll_data;
struct epoll_event events[64];
struct tf_fd *fd;
- int ret = (timeout == 0) ? TF_WAKEUP_TIMEOUT : TF_WAKEUP_FD;
- int r, i;
+ int r, i, ret;
if (timeout == 0 && pd->num_waiters == 0)
- return ret;
+ return TF_WAKEUP_TIMEOUT;
+ ret = TF_WAKEUP_TIMEOUT;
do {
r = epoll_wait(pd->epoll_fd, events, array_size(events), timeout);
+ if (r == 0)
+ break;
+
for (i = 0; i < r; i++) {
fd = (struct tf_fd *) events[i].data.ptr;
if (likely(fd->events & events[i].events))
tf_wakeup(fd->waiting_fiber, TF_WAKEUP_FD);
}
- if (timeout != 0)
- ret = TF_WAKEUP_FD;
+ ret = TF_WAKEUP_FD;
timeout = 0;
} while (unlikely(r == array_size(events)));
diff --git a/src/io-unix.c b/src/io-unix.c
index d333122..cc1bd1b 100644
--- a/src/io-unix.c
+++ b/src/io-unix.c
@@ -10,46 +10,82 @@
* See http://www.gnu.org/ for details.
*/
-int tf_open(struct tf_fd *fd, const char *pathname, int flags)
+#include <string.h>
+
+static inline int tf_sockaddr_len(const struct tf_sockaddr *addr)
{
- int kfd, r;
+ if (addr == NULL)
+ return 0;
+ switch (addr->u.addr.sa_family) {
+ case AF_INET:
+ return sizeof(struct sockaddr_in);
+ case AF_INET6:
+ return sizeof(struct sockaddr_in6);
+ default:
+ TF_BUG_ON(1);
+ }
+}
- kfd = open(pathname, flags | O_CLOEXEC | O_NONBLOCK);
- if (unlikely(kfd < 0))
- return -errno;
+int tf_open_fd(struct tf_fd *fd, int kfd, int flags)
+{
+ int r;
+
+ fd->fd = kfd;
+ fd->flags = flags;
+ fd->waiting_fiber = NULL;
- r = tf_fd_init(fd, kfd, TF_FD_AUTOCLOSE | TF_FD_STREAM_ORIENTED);
+ r = tf_fd_created(fd);
if (r < 0) {
- close(kfd);
+ if (flags & TF_FD_AUTOCLOSE)
+ close(kfd);
+ fd->fd = -1;
return r;
}
+
+ if (flags & TF_FD_SET_CLOEXEC)
+ fcntl(kfd, F_SETFD, FD_CLOEXEC);
+
+ if (!(flags & TF_FD_ALREADY_NONBLOCKING)) {
+ r = fcntl(kfd, F_GETFL, 0);
+ if (r & O_NONBLOCK)
+ fd->flags |= TF_FD_ALREADY_NONBLOCKING;
+ else
+ fcntl(kfd, F_SETFL, O_NONBLOCK | r);
+ }
+
return 0;
}
-int tf_open_fd(struct tf_fd *fd, int kfd)
+int tf_open(struct tf_fd *fd, const char *pathname, int flags)
{
- int mode, flags = 0;
+ int kfd, tfdf;
- mode = fcntl(kfd, F_GETFL, 0);
- if (!(mode & O_NONBLOCK)) {
- fcntl(fd->fd, F_SETFL, mode | O_NONBLOCK);
- flags |= TF_FD_RESTORE_BLOCKING;
- }
+ tfdf = TF_FD_AUTOCLOSE | TF_FD_STREAM_ORIENTED
+ | TF_FD_ALREADY_NONBLOCKING;
+#if defined(O_CLOEXEC)
+ flags |= O_CLOEXEC;
+#else
+ tfdf |= TF_FD_SET_CLOEXEC;
+#endif
+
+ kfd = open(pathname, flags | O_NONBLOCK);
+ if (unlikely(kfd < 0))
+ return -errno;
- return tf_fd_init(fd, kfd, TF_FD_STREAM_ORIENTED | flags);
+ return tf_open_fd(fd, kfd, tfdf);
}
int tf_close(struct tf_fd *fd)
{
int r;
- if (fd->flags & TF_FD_RESTORE_BLOCKING) {
- fcntl(fd->fd, F_SETFL, fcntl(fd->fd, F_GETFL, 0) & ~O_NONBLOCK);
- }
+ tf_fd_destroyed(fd);
if (fd->flags & TF_FD_AUTOCLOSE) {
r = close(fd->fd);
if (unlikely(r == -1))
return -errno;
+ } else if (!(fd->flags & TF_FD_ALREADY_NONBLOCKING)) {
+ fcntl(fd->fd, F_SETFL, fcntl(fd->fd, F_GETFL, 0) & ~O_NONBLOCK);
}
return 0;
}
@@ -57,10 +93,9 @@ int tf_close(struct tf_fd *fd)
int tf_read(struct tf_fd *fd, void *buf, size_t count, tf_mtime_diff_t timeout)
{
ssize_t n;
- int r, mode;
+ int r;
- mode = tf_schedule_timeout(timeout);
- tf_fd_wait(fd, EPOLLIN);
+ tf_fd_monitor(fd, EPOLLIN);
do {
n = read(fd->fd, buf, count);
if (n == count) {
@@ -84,11 +119,10 @@ int tf_read(struct tf_fd *fd, void *buf, size_t count, tf_mtime_diff_t timeout)
continue;
}
- r = tf_schedule(mode);
- if (r != TF_WAKEUP_FD)
- break;
- } while (1);
- tf_fd_release(fd);
+ r = tf_schedule(timeout);
+ timeout = TF_NO_TIMEOUT_CHANGE;
+ } while (r == TF_WAKEUP_FD);
+ tf_fd_unmonitor(fd);
return -r;
}
@@ -97,10 +131,9 @@ int tf_write(struct tf_fd *fd, const void *buf, size_t count,
tf_mtime_diff_t timeout)
{
ssize_t n;
- int r, mode;
+ int r;
- mode = tf_schedule_timeout(timeout);
- tf_fd_wait(fd, EPOLLOUT);
+ tf_fd_monitor(fd, EPOLLOUT);
do {
n = write(fd->fd, buf, count);
if (n == count) {
@@ -121,11 +154,220 @@ int tf_write(struct tf_fd *fd, const void *buf, size_t count,
continue;
}
- r = tf_schedule(mode);
- if (r != TF_WAKEUP_FD)
- break;
- } while (1);
- tf_fd_release(fd);
+ r = tf_schedule(timeout);
+ timeout = TF_NO_TIMEOUT_CHANGE;
+ } while (r == TF_WAKEUP_FD);
+ tf_fd_unmonitor(fd);
return -r;
}
+
+int tf_socket(struct tf_fd *fd, int domain, int type, int protocol)
+{
+ int kfd, tfdf, on = 1;
+
+ tfdf = TF_FD_AUTOCLOSE;
+#if defined(SOCK_NONBLOCK) && defined(SOCK_CLOEXEC)
+ type |= SOCK_NONBLOCK | SOCK_CLOEXEC;
+ tfdf |= TF_FD_ALREADY_NONBLOCKING;
+#else
+ tfdf |= TF_FD_SET_CLOEXEC;
+#endif
+ kfd = socket(domain, type, protocol);
+ if (unlikely(kfd < 0))
+ return -errno;
+
+ if (protocol == SOCK_STREAM) {
+ tfdf |= TF_FD_STREAM_ORIENTED;
+ } else {
+ /* We want the local IP info too */
+ setsockopt(kfd, IPPROTO_IP, IP_PKTINFO, &on, sizeof(on));
+ }
+
+ return tf_open_fd(fd, kfd, tfdf);
+}
+
+int tf_bind(struct tf_fd *fd, const struct tf_sockaddr *addr)
+{
+ if (unlikely(bind(fd->fd, &addr->u.addr, tf_sockaddr_len(addr)) < 0))
+ return -errno;
+ return 0;
+}
+
+int tf_listen(struct tf_fd *fd, int backlog)
+{
+ if (unlikely(listen(fd->fd, backlog) < 0))
+ return -errno;
+ return 0;
+}
+
+int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd,
+ struct tf_sockaddr *from, tf_mtime_diff_t timeout)
+{
+ int r, tfdf;
+ struct sockaddr *addr = NULL;
+ socklen_t al, *pal = NULL;
+
+ if (from != NULL) {
+ addr = &from->u.addr;
+ al = tf_sockaddr_len(from);
+ pal = &al;
+ }
+
+ tfdf = TF_FD_AUTOCLOSE | (listen_fd->flags & TF_FD_STREAM_ORIENTED);
+ tfdf |= TF_FD_SET_CLOEXEC;
+
+ tf_fd_monitor(listen_fd, EPOLLIN);
+ do {
+ /* FIXME: use accept4 if available */
+ r = accept(listen_fd->fd, addr, pal);
+ if (r >= 0)
+ break;
+ if (errno == EINTR)
+ continue;
+ if (errno != EAGAIN) {
+ tf_fd_unmonitor(listen_fd);
+ return -errno;
+ }
+ r = tf_schedule(timeout);
+ timeout = TF_NO_TIMEOUT_CHANGE;
+ } while (r == TF_WAKEUP_FD);
+ tf_fd_unmonitor(listen_fd);
+ if (r < 0)
+ return r;
+
+ return tf_open_fd(child_fd, r, tfdf);
+}
+
+int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to,
+ tf_mtime_diff_t timeout)
+{
+ socklen_t l = sizeof(int);
+ int r, err;
+
+ /* Initiate operation */
+ r = connect(fd->fd, &to->u.addr, tf_sockaddr_len(to));
+ if (r >= 0)
+ return 0;
+ if (errno != EINPROGRESS)
+ return -errno;
+
+ /* Wait for socket to become readable */
+ tf_fd_monitor(fd, EPOLLOUT);
+ r = tf_schedule(timeout);
+ tf_fd_unmonitor(fd);
+ if (r != TF_WAKEUP_FD)
+ return r;
+
+ /* Check for error */
+ if (getsockopt(fd->fd, SOL_SOCKET, SO_ERROR, &err, &l) < 0)
+ return -errno;
+ return -err;
+}
+
+ssize_t tf_recvmsg(struct tf_fd *fd,
+ struct tf_sockaddr *from,
+ struct tf_sockaddr *to,
+ void *buf, size_t len,
+ tf_mtime_diff_t timeout)
+{
+ struct iovec iov;
+ struct msghdr msg;
+ struct cmsghdr *cmsg;
+ char control[128];
+ int r;
+
+ iov = (struct iovec) {
+ .iov_base = buf,
+ .iov_len = len,
+ };
+ msg = (struct msghdr) {
+ .msg_name = &from->u.addr,
+ .msg_namelen = sizeof(struct tf_sockaddr),
+ .msg_control = control,
+ .msg_controllen = sizeof(control),
+ .msg_iov = &iov,
+ .msg_iovlen = 1,
+ };
+
+ tf_fd_monitor(fd, EPOLLIN);
+ do {
+ r = recvmsg(fd->fd, &msg, 0);
+ if (r >= 0)
+ break;
+ if (errno != EAGAIN) {
+ r = -errno;
+ break;
+ }
+ r = tf_schedule(timeout);
+ timeout = TF_NO_TIMEOUT_CHANGE;
+ } while (r == TF_WAKEUP_FD);
+ tf_fd_unmonitor(fd);
+
+ if (r < 0 || to == NULL)
+ return r;
+
+ to->u.addr.sa_family = AF_UNSPEC;
+ for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
+ if (cmsg->cmsg_level == IPPROTO_IP &&
+ cmsg->cmsg_type == IP_PKTINFO) {
+ struct in_pktinfo *ipi = (struct in_pktinfo *) CMSG_DATA(cmsg);
+ to->u.in.sin_family = AF_INET;
+ to->u.in.sin_addr = ipi->ipi_spec_dst;
+ to->u.in.sin_port = 0;
+ }
+ }
+
+ return r;
+}
+
+ssize_t tf_sendmsg(struct tf_fd *fd,
+ struct tf_sockaddr *from,
+ const struct tf_sockaddr *to,
+ const void *buf, size_t len,
+ tf_mtime_diff_t timeout)
+{
+ struct msghdr msg;
+ struct iovec iov;
+ struct {
+ struct cmsghdr cm;
+ struct in_pktinfo ipi;
+ } cmsg;
+ int r;
+
+ iov = (struct iovec) {
+ .iov_base = (void*) buf,
+ .iov_len = len,
+ };
+ msg = (struct msghdr) {
+ .msg_name = (struct sockaddr*) &to->u.addr,
+ .msg_namelen = tf_sockaddr_len(to),
+ .msg_iov = &iov,
+ .msg_iovlen = 1,
+ };
+ if (from != NULL && from->u.addr.sa_family == AF_INET) {
+ memset(&cmsg.ipi, 0, sizeof(cmsg.ipi));
+ cmsg.cm.cmsg_len = sizeof(cmsg);
+ cmsg.cm.cmsg_level = IPPROTO_IP;
+ cmsg.cm.cmsg_type = IP_PKTINFO;
+ cmsg.ipi.ipi_spec_dst = from->u.in.sin_addr;
+ msg.msg_control = (void *) &cmsg;
+ msg.msg_controllen = sizeof(cmsg);
+ }
+
+ tf_fd_monitor(fd, EPOLLOUT);
+ do {
+ r = sendmsg(fd->fd, &msg, 0);
+ if (r >= 0)
+ break;
+ if (errno != EAGAIN) {
+ r = -errno;
+ break;
+ }
+ r = tf_schedule(timeout);
+ timeout = TF_NO_TIMEOUT_CHANGE;
+ } while (r == TF_WAKEUP_FD);
+ tf_fd_unmonitor(fd);
+
+ return r;
+}
diff --git a/test/read.c b/test/read.c
index 1921609..97f25fd 100644
--- a/test/read.c
+++ b/test/read.c
@@ -22,9 +22,9 @@ static void io_fiber(void *ptr)
char data[8];
struct tf_fd fin;
- tf_open_fd(&fin, STDIN_FILENO);
+ tf_open_fd(&fin, STDIN_FILENO, TF_FD_STREAM_ORIENTED);
while (1) {
- if (tf_read(&fin, data, sizeof(data), TF_INFINITE) < 0)
+ if (tf_read(&fin, data, sizeof(data), TF_NO_TIMEOUT) < 0)
break;
printf("Read: %8.8s\n", data);
}