summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTimo Teras <timo.teras@iki.fi>2009-11-26 09:35:49 +0200
committerTimo Teras <timo.teras@iki.fi>2009-11-26 09:37:24 +0200
commitaa530f352b0410150bfe94c821ae32c1378b9d02 (patch)
treefb27f277db0c7feaaf12ce43169d3b0b44e95c0f
parent4db830052d941d9c6de281bc9a2f6ac212c59ad8 (diff)
downloadlibtf-aa530f352b0410150bfe94c821ae32c1378b9d02.tar.bz2
libtf-aa530f352b0410150bfe94c821ae32c1378b9d02.tar.xz
libtf: stackable timeouts
instead of having per-function argument, use a push/pop mechanism: - multiple timers inside fiber use only one heap entry - easy to chain multiple possibly blocking operations inside one timeout block
-rw-r--r--include/libtf/fiber.h55
-rw-r--r--include/libtf/heap.h6
-rw-r--r--include/libtf/io.h19
-rw-r--r--src/fiber.c63
-rw-r--r--src/io-unix.c45
-rw-r--r--test/httpget.c13
-rw-r--r--test/read.c2
7 files changed, 132 insertions, 71 deletions
diff --git a/include/libtf/fiber.h b/include/libtf/fiber.h
index c3d87c6..ce3745b 100644
--- a/include/libtf/fiber.h
+++ b/include/libtf/fiber.h
@@ -19,27 +19,22 @@
#include <libtf/list.h>
#include <libtf/heap.h>
-#define TF_UCTX_H "uctx-setjmp.h"
-
/* Fiber wakeup reasons */
#define TF_WAKEUP_NONE 0
#define TF_WAKEUP_IMMEDIATE -EAGAIN
#define TF_WAKEUP_KILL -EINTR
-#define TF_WAKEUP_TIMEOUT -ETIMEDOUT
+#define TF_WAKEUP_TIMEOUT -ETIME
+#define TF_WAKEUP_THIS_TIMEOUT -ETIMEDOUT
#define TF_WAKEUP_FD -EIO
-/* Special timeouts for tf_schedule() */
-#define TF_NO_TIMEOUT -1
-#define TF_NO_TIMEOUT_CHANGE -2
+struct tf_poll_data {
+ int epoll_fd;
+ int num_waiters;
+};
/* Scheduler */
struct tf_fiber;
-struct tf_poll_data {
- int epoll_fd;
- int num_waiters;
-};
-
struct tf_scheduler {
struct tf_list_head run_q;
struct tf_heap_head heap;
@@ -47,7 +42,6 @@ struct tf_scheduler {
int num_fibers;
tf_mtime_t scheduler_time;
struct tf_poll_data poll_data;
-
};
struct tf_main_ctx {
@@ -55,6 +49,11 @@ struct tf_main_ctx {
char ** argv;
};
+struct tf_timeout {
+ tf_mtime_t saved_timeout;
+ unsigned int timeout_change;
+};
+
static inline
struct tf_scheduler *tf_get_scheduler(void)
{
@@ -86,18 +85,38 @@ void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size);
void *tf_fiber_get(void *data);
void tf_fiber_put(void *data);
-/* Scheduling and fiber management */
-void tf_exit(void) attribute_noreturn;
-void tf_kill(void *fiber);
+#define tf_timed(func, timeout) \
+ ({ \
+ struct tf_timeout __timeout; \
+ tf_timeout_push(&__timeout, timeout); \
+ tf_timeout_pop(&__timeout, (func)); \
+ })
+
+//* Scheduling and fiber management */
+void tf_timeout_push(struct tf_timeout *timeout, tf_mtime_diff_t milliseconds);
+int __tf_timeout_pop(struct tf_timeout *timeout, int err);
+
+static inline int tf_timeout_pop(struct tf_timeout *timeout, int err)
+{
+ if (unlikely(timeout->timeout_change))
+ return __tf_timeout_pop(timeout, err);
+ return err;
+}
-int tf_schedule(int milliseconds);
+int tf_schedule(void);
void tf_wakeup(struct tf_fiber *fiber, int wakeup_type);
+void tf_exit(void) attribute_noreturn;
+void tf_kill(void *fiber);
int tf_yield(void);
static inline
-int tf_msleep(int milliseconds)
+int tf_msleep(tf_mtime_diff_t milliseconds)
{
- return tf_schedule(milliseconds);
+ int r;
+ r = tf_timed(tf_schedule(), milliseconds);
+ if (r == TF_WAKEUP_THIS_TIMEOUT)
+ r = 0;
+ return r;
}
#endif
diff --git a/include/libtf/heap.h b/include/libtf/heap.h
index 24f4767..a68e01d 100644
--- a/include/libtf/heap.h
+++ b/include/libtf/heap.h
@@ -61,6 +61,12 @@ int tf_heap_empty(struct tf_heap_head *head)
}
static inline
+int tf_heap_node_active(struct tf_heap_node *node)
+{
+ return node->index != 0;
+}
+
+static inline
int tf_heap_prealloc(struct tf_heap_head *head, uint32_t size)
{
if (unlikely(head->num_items + TF_HEAP_ITEM0 >= head->allocated))
diff --git a/include/libtf/io.h b/include/libtf/io.h
index 38dd541..1f0b793 100644
--- a/include/libtf/io.h
+++ b/include/libtf/io.h
@@ -52,27 +52,26 @@ void tf_poll_close(void);
int tf_open_fd(struct tf_fd *fd, int kfd, int flags);
int tf_open(struct tf_fd *fd, const char *pathname, int flags);
int tf_close(struct tf_fd *fd);
-ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count, tf_mtime_diff_t timeout);
-ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count, tf_mtime_diff_t timeout);
-int tf_read_fully(struct tf_fd *fd, void *buf, size_t count, tf_mtime_diff_t timeout);
-int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count, tf_mtime_diff_t timeout);
+ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count);
+ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count);
+int tf_read_fully(struct tf_fd *fd, void *buf, size_t count);
+int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count);
int tf_socket(struct tf_fd *fd, int domain, int type, int protocol);
int tf_bind(struct tf_fd *fd, const struct tf_sockaddr *addr);
int tf_listen(struct tf_fd *fd, int backlog);
int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd,
- struct tf_sockaddr *from, tf_mtime_diff_t timeout);
-int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *addr, tf_mtime_diff_t timeout);
+ struct tf_sockaddr *from);
+int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *addr);
ssize_t tf_recvmsg(struct tf_fd *fd,
struct tf_sockaddr *from, struct tf_sockaddr *to,
- void *buf, size_t count, tf_mtime_diff_t timeout);
+ void *buf, size_t count);
ssize_t tf_sendmsg(struct tf_fd *fd,
struct tf_sockaddr *from, const struct tf_sockaddr *to,
- const void *buf, size_t count, tf_mtime_diff_t timeout);
+ const void *buf, size_t count);
-int tf_query_dns(const char *name, int num_res, struct tf_sockaddr *res,
- tf_mtime_diff_t timeout);
+int tf_query_dns(const char *name, int num_res, struct tf_sockaddr *res);
#endif
diff --git a/src/fiber.c b/src/fiber.c
index b678842..3f8bb15 100644
--- a/src/fiber.c
+++ b/src/fiber.c
@@ -16,15 +16,20 @@
#include <libtf/fiber.h>
#include <libtf/io.h>
+#define TF_TIMEOUT_CHANGE_NEEDED 1
+#define TF_TIMEOUT_CHANGE_NEW_VALUE 2
+
struct tf_fiber {
unsigned int ref_count;
int wakeup_type;
+ unsigned int timeout_change;
+ tf_mtime_t timeout;
struct tf_list_node queue_node;
struct tf_heap_node heap_node;
char data[TF_EMPTY_ARRAY];
};
-#include TF_UCTX_H
+#include "uctx-setjmp.h"
/* FIXME: should be in thread local storage */
struct tf_scheduler *__tf_scheduler;
@@ -108,6 +113,8 @@ static void process_heap(struct tf_scheduler *sched)
tf_mtime_diff(now, tf_heap_get_value(&sched->heap)) >= 0) {
node = tf_heap_get_node(&sched->heap);
f = container_of(node, struct tf_fiber, heap_node);
+ if (f->wakeup_type == TF_WAKEUP_NONE)
+ f->wakeup_type = TF_WAKEUP_TIMEOUT;
run_fiber(sched, f);
}
}
@@ -170,18 +177,58 @@ int tf_main_args(tf_fiber_proc main_fiber, int argc, char **argv)
return 0;
}
-int tf_schedule(tf_mtime_diff_t milliseconds)
+void tf_timeout_push(struct tf_timeout *timeout, tf_mtime_diff_t milliseconds)
+{
+ struct tf_fiber *f = tf_get_fiber();
+ tf_mtime_t abs = tf_mtime() + milliseconds;
+ int active;
+
+ if (f->timeout_change)
+ active = (f->timeout_change & TF_TIMEOUT_CHANGE_NEW_VALUE);
+ else
+ active = tf_heap_node_active(&f->heap_node);
+
+ if (!active || tf_mtime_diff(abs, f->timeout) < 0) {
+ /* Save previous timeout */
+ timeout->saved_timeout = f->timeout;
+ timeout->timeout_change = TF_TIMEOUT_CHANGE_NEEDED;
+ if (active)
+ timeout->timeout_change |= TF_TIMEOUT_CHANGE_NEW_VALUE;
+
+ /* Make new timeout pending */
+ f->timeout = abs;
+ f->timeout_change = TF_TIMEOUT_CHANGE_NEEDED
+ | TF_TIMEOUT_CHANGE_NEW_VALUE;
+ } else {
+ timeout->timeout_change = 0;
+ }
+}
+
+int __tf_timeout_pop(struct tf_timeout *timeout, int err)
+{
+ struct tf_fiber *f = tf_get_fiber();
+
+ f->timeout = timeout->saved_timeout;
+ f->timeout_change = timeout->timeout_change;
+ if (err == TF_WAKEUP_TIMEOUT)
+ err = TF_WAKEUP_THIS_TIMEOUT;
+ return err;
+}
+
+int tf_schedule(void)
{
struct tf_scheduler *sched = tf_get_scheduler();
struct tf_fiber *schedf = container_of((void*) sched, struct tf_fiber, data);
struct tf_fiber *f = sched->active_fiber;
+ if (unlikely(f->timeout_change)) {
+ if (f->timeout_change & TF_TIMEOUT_CHANGE_NEW_VALUE)
+ tf_heap_change(&f->heap_node, &sched->heap, f->timeout);
+ else
+ tf_heap_delete(&f->heap_node, &sched->heap);
+ f->timeout_change = 0;
+ }
f->wakeup_type = TF_WAKEUP_NONE;
- if (milliseconds == TF_NO_TIMEOUT)
- tf_heap_delete(&f->heap_node, &sched->heap);
- else if (milliseconds != TF_NO_TIMEOUT_CHANGE)
- tf_heap_change(&f->heap_node, &sched->heap,
- tf_mtime() + milliseconds);
return tf_uctx_transfer(f, schedf, TF_WAKEUP_IMMEDIATE);
}
@@ -217,6 +264,6 @@ int tf_yield(void)
struct tf_fiber *f = sched->active_fiber;
tf_list_add_tail(&f->queue_node, &sched->run_q);
- return tf_schedule(TF_NO_TIMEOUT);
+ return tf_schedule();
}
diff --git a/src/io-unix.c b/src/io-unix.c
index 8ed8e42..ea65a76 100644
--- a/src/io-unix.c
+++ b/src/io-unix.c
@@ -90,8 +90,7 @@ int tf_close(struct tf_fd *fd)
return 0;
}
-int tf_read_fully(struct tf_fd *fd, void *buf, size_t count,
- tf_mtime_diff_t timeout)
+int tf_read_fully(struct tf_fd *fd, void *buf, size_t count)
{
ssize_t n;
int r;
@@ -120,16 +119,14 @@ int tf_read_fully(struct tf_fd *fd, void *buf, size_t count,
continue;
}
- r = tf_schedule(timeout);
- timeout = TF_NO_TIMEOUT_CHANGE;
+ r = tf_schedule();
} while (r == TF_WAKEUP_FD);
tf_fd_unmonitor(fd);
return -r;
}
-int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count,
- tf_mtime_diff_t timeout)
+int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count)
{
ssize_t n;
int r;
@@ -155,15 +152,14 @@ int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count,
continue;
}
- r = tf_schedule(timeout);
- timeout = TF_NO_TIMEOUT_CHANGE;
+ r = tf_schedule();
} while (r == TF_WAKEUP_FD);
tf_fd_unmonitor(fd);
return -r;
}
-ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count, tf_mtime_diff_t timeout)
+ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count)
{
ssize_t n;
@@ -178,16 +174,14 @@ ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count, tf_mtime_diff_t timeo
n = -errno;
break;
}
- n = tf_schedule(timeout);
- timeout = TF_NO_TIMEOUT_CHANGE;
+ n = tf_schedule();
} while (n == TF_WAKEUP_FD);
tf_fd_unmonitor(fd);
return n;
}
-ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count,
- tf_mtime_diff_t timeout)
+ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count)
{
ssize_t n;
@@ -202,8 +196,7 @@ ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count,
n = -errno;
break;
}
- n = tf_schedule(timeout);
- timeout = TF_NO_TIMEOUT_CHANGE;
+ n = tf_schedule();
} while (n == TF_WAKEUP_FD);
tf_fd_unmonitor(fd);
@@ -250,7 +243,7 @@ int tf_listen(struct tf_fd *fd, int backlog)
}
int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd,
- struct tf_sockaddr *from, tf_mtime_diff_t timeout)
+ struct tf_sockaddr *from)
{
int r, tfdf;
struct sockaddr *addr = NULL;
@@ -277,8 +270,7 @@ int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd,
tf_fd_unmonitor(listen_fd);
return -errno;
}
- r = tf_schedule(timeout);
- timeout = TF_NO_TIMEOUT_CHANGE;
+ r = tf_schedule();
} while (r == TF_WAKEUP_FD);
tf_fd_unmonitor(listen_fd);
if (r < 0)
@@ -287,8 +279,7 @@ int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd,
return tf_open_fd(child_fd, r, tfdf);
}
-int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to,
- tf_mtime_diff_t timeout)
+int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to)
{
socklen_t l = sizeof(int);
int r, err;
@@ -302,7 +293,7 @@ int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to,
/* Wait for socket to become readable */
tf_fd_monitor(fd, EPOLLOUT);
- r = tf_schedule(timeout);
+ r = tf_schedule();
tf_fd_unmonitor(fd);
if (r != TF_WAKEUP_FD)
return r;
@@ -316,8 +307,7 @@ int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to,
ssize_t tf_recvmsg(struct tf_fd *fd,
struct tf_sockaddr *from,
struct tf_sockaddr *to,
- void *buf, size_t len,
- tf_mtime_diff_t timeout)
+ void *buf, size_t len)
{
struct iovec iov;
struct msghdr msg;
@@ -347,8 +337,7 @@ ssize_t tf_recvmsg(struct tf_fd *fd,
r = -errno;
break;
}
- r = tf_schedule(timeout);
- timeout = TF_NO_TIMEOUT_CHANGE;
+ r = tf_schedule();
} while (r == TF_WAKEUP_FD);
tf_fd_unmonitor(fd);
@@ -372,8 +361,7 @@ ssize_t tf_recvmsg(struct tf_fd *fd,
ssize_t tf_sendmsg(struct tf_fd *fd,
struct tf_sockaddr *from,
const struct tf_sockaddr *to,
- const void *buf, size_t len,
- tf_mtime_diff_t timeout)
+ const void *buf, size_t len)
{
struct msghdr msg;
struct iovec iov;
@@ -412,8 +400,7 @@ ssize_t tf_sendmsg(struct tf_fd *fd,
r = -errno;
break;
}
- r = tf_schedule(timeout);
- timeout = TF_NO_TIMEOUT_CHANGE;
+ r = tf_schedule();
} while (r == TF_WAKEUP_FD);
tf_fd_unmonitor(fd);
diff --git a/test/httpget.c b/test/httpget.c
index c1e37a3..fed6c06 100644
--- a/test/httpget.c
+++ b/test/httpget.c
@@ -15,7 +15,7 @@ static void ping_fiber(void *ptr)
struct tf_sockaddr host;
struct tf_fd fd;
char buf[128];
- int bytes = 0, r = 0;
+ int bytes = 0, r;
const char *req = "GET / HTTP/1.0\r\n\r\n";
printf("Lookup %s\n", ctx->hostname);
@@ -23,16 +23,19 @@ static void ping_fiber(void *ptr)
host.u.in.sin_addr.s_addr = inet_addr(ctx->hostname);
host.u.in.sin_port = htons(80);
- if (tf_socket(&fd, AF_INET, SOCK_STREAM, 0) < 0)
+ r = tf_socket(&fd, AF_INET, SOCK_STREAM, 0);
+ if (r < 0)
goto err;
- if ((r = tf_connect(&fd, &host, 10000)) < 0)
+ r = tf_timed(tf_connect(&fd, &host), 10000);
+ if (r < 0)
goto err_close;
- if ((r = tf_write_fully(&fd, req, strlen(req), 10000)) < 0)
+ r = tf_write_fully(&fd, req, strlen(req));
+ if (r < 0)
goto err_close;
- while ((r = tf_read(&fd, buf, sizeof(buf), 10000)) > 0)
+ while ((r = tf_read(&fd, buf, sizeof(buf))) > 0)
bytes += r;
err_close:
tf_close(&fd);
diff --git a/test/read.c b/test/read.c
index 97f25fd..6d8306b 100644
--- a/test/read.c
+++ b/test/read.c
@@ -24,7 +24,7 @@ static void io_fiber(void *ptr)
tf_open_fd(&fin, STDIN_FILENO, TF_FD_STREAM_ORIENTED);
while (1) {
- if (tf_read(&fin, data, sizeof(data), TF_NO_TIMEOUT) < 0)
+ if (tf_read_fully(&fin, data, sizeof(data)) < 0)
break;
printf("Read: %8.8s\n", data);
}