author     Timo Teras <timo.teras@iki.fi>    2009-11-26 09:35:49 +0200
committer  Timo Teras <timo.teras@iki.fi>    2009-11-26 09:37:24 +0200
commit     aa530f352b0410150bfe94c821ae32c1378b9d02 (patch)
tree       fb27f277db0c7feaaf12ce43169d3b0b44e95c0f /src
parent     4db830052d941d9c6de281bc9a2f6ac212c59ad8 (diff)
download   libtf-aa530f352b0410150bfe94c821ae32c1378b9d02.tar.bz2
           libtf-aa530f352b0410150bfe94c821ae32c1378b9d02.tar.xz
libtf: stackable timeouts
Instead of passing a timeout as a per-function argument, use a push/pop mechanism:
- multiple timers inside a fiber use only one heap entry
- multiple possibly blocking operations are easily chained inside one timeout block
Diffstat (limited to 'src')
 -rw-r--r--  src/fiber.c   | 63
 -rw-r--r--  src/io-unix.c | 45
 2 files changed, 71 insertions(+), 37 deletions(-)
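
A minimal caller-side sketch of the new pattern (not part of this commit). It assumes the public header wraps the __tf_timeout_pop() helper shown below in a tf_timeout_pop() function or macro, and that struct tf_fd and the I/O wrappers come from <libtf/io.h>; any name here that does not appear in the diff is hypothetical.

#include <libtf/fiber.h>
#include <libtf/io.h>

/* Hypothetical helper: read a whole buffer under a 1-second deadline
 * using the stackable-timeout API introduced by this commit. */
static int read_with_deadline(struct tf_fd *fd, void *buf, size_t len)
{
	struct tf_timeout to;
	int r;

	tf_timeout_push(&to, 1000);       /* one heap entry for the whole block */
	r = tf_read_fully(fd, buf, len);  /* no per-call timeout argument any more */
	return tf_timeout_pop(&to, r);    /* assumed wrapper around __tf_timeout_pop();
	                                   * restores any outer (enclosing) timeout */
}
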
diff --git a/src/fiber.c b/src/fiber.c
index b678842..3f8bb15 100644
--- a/src/fiber.c
+++ b/src/fiber.c
@@ -16,15 +16,20 @@
#include <libtf/fiber.h>
#include <libtf/io.h>
+#define TF_TIMEOUT_CHANGE_NEEDED 1
+#define TF_TIMEOUT_CHANGE_NEW_VALUE 2
+
struct tf_fiber {
unsigned int ref_count;
int wakeup_type;
+ unsigned int timeout_change;
+ tf_mtime_t timeout;
struct tf_list_node queue_node;
struct tf_heap_node heap_node;
char data[TF_EMPTY_ARRAY];
};
-#include TF_UCTX_H
+#include "uctx-setjmp.h"
/* FIXME: should be in thread local storage */
struct tf_scheduler *__tf_scheduler;
@@ -108,6 +113,8 @@ static void process_heap(struct tf_scheduler *sched)
tf_mtime_diff(now, tf_heap_get_value(&sched->heap)) >= 0) {
node = tf_heap_get_node(&sched->heap);
f = container_of(node, struct tf_fiber, heap_node);
+ if (f->wakeup_type == TF_WAKEUP_NONE)
+ f->wakeup_type = TF_WAKEUP_TIMEOUT;
run_fiber(sched, f);
}
}
@@ -170,18 +177,58 @@ int tf_main_args(tf_fiber_proc main_fiber, int argc, char **argv)
return 0;
}
-int tf_schedule(tf_mtime_diff_t milliseconds)
+void tf_timeout_push(struct tf_timeout *timeout, tf_mtime_diff_t milliseconds)
+{
+ struct tf_fiber *f = tf_get_fiber();
+ tf_mtime_t abs = tf_mtime() + milliseconds;
+ int active;
+
+ if (f->timeout_change)
+ active = (f->timeout_change & TF_TIMEOUT_CHANGE_NEW_VALUE);
+ else
+ active = tf_heap_node_active(&f->heap_node);
+
+ if (!active || tf_mtime_diff(abs, f->timeout) < 0) {
+ /* Save previous timeout */
+ timeout->saved_timeout = f->timeout;
+ timeout->timeout_change = TF_TIMEOUT_CHANGE_NEEDED;
+ if (active)
+ timeout->timeout_change |= TF_TIMEOUT_CHANGE_NEW_VALUE;
+
+ /* Make new timeout pending */
+ f->timeout = abs;
+ f->timeout_change = TF_TIMEOUT_CHANGE_NEEDED
+ | TF_TIMEOUT_CHANGE_NEW_VALUE;
+ } else {
+ timeout->timeout_change = 0;
+ }
+}
+
+int __tf_timeout_pop(struct tf_timeout *timeout, int err)
+{
+ struct tf_fiber *f = tf_get_fiber();
+
+ f->timeout = timeout->saved_timeout;
+ f->timeout_change = timeout->timeout_change;
+ if (err == TF_WAKEUP_TIMEOUT)
+ err = TF_WAKEUP_THIS_TIMEOUT;
+ return err;
+}
+
+int tf_schedule(void)
{
struct tf_scheduler *sched = tf_get_scheduler();
struct tf_fiber *schedf = container_of((void*) sched, struct tf_fiber, data);
struct tf_fiber *f = sched->active_fiber;
+ if (unlikely(f->timeout_change)) {
+ if (f->timeout_change & TF_TIMEOUT_CHANGE_NEW_VALUE)
+ tf_heap_change(&f->heap_node, &sched->heap, f->timeout);
+ else
+ tf_heap_delete(&f->heap_node, &sched->heap);
+ f->timeout_change = 0;
+ }
f->wakeup_type = TF_WAKEUP_NONE;
- if (milliseconds == TF_NO_TIMEOUT)
- tf_heap_delete(&f->heap_node, &sched->heap);
- else if (milliseconds != TF_NO_TIMEOUT_CHANGE)
- tf_heap_change(&f->heap_node, &sched->heap,
- tf_mtime() + milliseconds);
return tf_uctx_transfer(f, schedf, TF_WAKEUP_IMMEDIATE);
}
@@ -217,6 +264,6 @@ int tf_yield(void)
struct tf_fiber *f = sched->active_fiber;
tf_list_add_tail(&f->queue_node, &sched->run_q);
- return tf_schedule(TF_NO_TIMEOUT);
+ return tf_schedule();
}
diff --git a/src/io-unix.c b/src/io-unix.c
index 8ed8e42..ea65a76 100644
--- a/src/io-unix.c
+++ b/src/io-unix.c
@@ -90,8 +90,7 @@ int tf_close(struct tf_fd *fd)
return 0;
}
-int tf_read_fully(struct tf_fd *fd, void *buf, size_t count,
- tf_mtime_diff_t timeout)
+int tf_read_fully(struct tf_fd *fd, void *buf, size_t count)
{
ssize_t n;
int r;
@@ -120,16 +119,14 @@ int tf_read_fully(struct tf_fd *fd, void *buf, size_t count,
continue;
}
- r = tf_schedule(timeout);
- timeout = TF_NO_TIMEOUT_CHANGE;
+ r = tf_schedule();
} while (r == TF_WAKEUP_FD);
tf_fd_unmonitor(fd);
return -r;
}
-int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count,
- tf_mtime_diff_t timeout)
+int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count)
{
ssize_t n;
int r;
@@ -155,15 +152,14 @@ int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count,
continue;
}
- r = tf_schedule(timeout);
- timeout = TF_NO_TIMEOUT_CHANGE;
+ r = tf_schedule();
} while (r == TF_WAKEUP_FD);
tf_fd_unmonitor(fd);
return -r;
}
-ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count, tf_mtime_diff_t timeout)
+ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count)
{
ssize_t n;
@@ -178,16 +174,14 @@ ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count, tf_mtime_diff_t timeo
n = -errno;
break;
}
- n = tf_schedule(timeout);
- timeout = TF_NO_TIMEOUT_CHANGE;
+ n = tf_schedule();
} while (n == TF_WAKEUP_FD);
tf_fd_unmonitor(fd);
return n;
}
-ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count,
- tf_mtime_diff_t timeout)
+ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count)
{
ssize_t n;
@@ -202,8 +196,7 @@ ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count,
n = -errno;
break;
}
- n = tf_schedule(timeout);
- timeout = TF_NO_TIMEOUT_CHANGE;
+ n = tf_schedule();
} while (n == TF_WAKEUP_FD);
tf_fd_unmonitor(fd);
@@ -250,7 +243,7 @@ int tf_listen(struct tf_fd *fd, int backlog)
}
int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd,
- struct tf_sockaddr *from, tf_mtime_diff_t timeout)
+ struct tf_sockaddr *from)
{
int r, tfdf;
struct sockaddr *addr = NULL;
@@ -277,8 +270,7 @@ int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd,
tf_fd_unmonitor(listen_fd);
return -errno;
}
- r = tf_schedule(timeout);
- timeout = TF_NO_TIMEOUT_CHANGE;
+ r = tf_schedule();
} while (r == TF_WAKEUP_FD);
tf_fd_unmonitor(listen_fd);
if (r < 0)
@@ -287,8 +279,7 @@ int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd,
return tf_open_fd(child_fd, r, tfdf);
}
-int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to,
- tf_mtime_diff_t timeout)
+int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to)
{
socklen_t l = sizeof(int);
int r, err;
@@ -302,7 +293,7 @@ int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to,
/* Wait for socket to become readable */
tf_fd_monitor(fd, EPOLLOUT);
- r = tf_schedule(timeout);
+ r = tf_schedule();
tf_fd_unmonitor(fd);
if (r != TF_WAKEUP_FD)
return r;
@@ -316,8 +307,7 @@ int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to,
ssize_t tf_recvmsg(struct tf_fd *fd,
struct tf_sockaddr *from,
struct tf_sockaddr *to,
- void *buf, size_t len,
- tf_mtime_diff_t timeout)
+ void *buf, size_t len)
{
struct iovec iov;
struct msghdr msg;
@@ -347,8 +337,7 @@ ssize_t tf_recvmsg(struct tf_fd *fd,
r = -errno;
break;
}
- r = tf_schedule(timeout);
- timeout = TF_NO_TIMEOUT_CHANGE;
+ r = tf_schedule();
} while (r == TF_WAKEUP_FD);
tf_fd_unmonitor(fd);
@@ -372,8 +361,7 @@ ssize_t tf_recvmsg(struct tf_fd *fd,
ssize_t tf_sendmsg(struct tf_fd *fd,
struct tf_sockaddr *from,
const struct tf_sockaddr *to,
- const void *buf, size_t len,
- tf_mtime_diff_t timeout)
+ const void *buf, size_t len)
{
struct msghdr msg;
struct iovec iov;
@@ -412,8 +400,7 @@ ssize_t tf_sendmsg(struct tf_fd *fd,
r = -errno;
break;
}
- r = tf_schedule(timeout);
- timeout = TF_NO_TIMEOUT_CHANGE;
+ r = tf_schedule();
} while (r == TF_WAKEUP_FD);
tf_fd_unmonitor(fd);
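
With the io-unix.c changes, none of the I/O wrappers carry a timeout argument, so a single pushed deadline can cover a whole request/response exchange. A hedged caller sketch (hypothetical helper; assumes the tf_timeout_pop() wrapper mentioned above and the 0-on-success convention used by tf_close()):

/* Hypothetical exchange: connect, send and receive all share one 5-second
 * deadline; fd is assumed to be an already created, not-yet-connected socket. */
static int do_exchange(struct tf_fd *fd, const struct tf_sockaddr *peer,
                       const void *req, size_t reqlen,
                       void *resp, size_t resplen)
{
	struct tf_timeout to;
	int r;

	tf_timeout_push(&to, 5000);
	r = tf_connect(fd, peer);
	if (r == 0)
		r = tf_write_fully(fd, req, reqlen);
	if (r == 0)
		r = tf_read_fully(fd, resp, resplen);
	return tf_timeout_pop(&to, r);  /* pop even on error so the outer
	                                 * timeout state is restored */
}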