author    Timo Teräs <timo.teras@iki.fi>  2010-07-02 20:23:07 +0300
committer Timo Teräs <timo.teras@iki.fi>  2010-07-02 20:25:47 +0300
commit    23b95bf1a15322c2f471b80c06cb65d9b2d2a282 (patch)
tree      9bf12231db9591852e3b42ca24715d2cbaf6267b /src
parent    0183e33d9a4759764716e771b85e19f7a997b8bd (diff)
download  libtf-23b95bf1a15322c2f471b80c06cb65d9b2d2a282.tar.bz2
          libtf-23b95bf1a15322c2f471b80c06cb65d9b2d2a282.tar.xz
libtf: major redesign started (HEAD, master)
The idea is to make libtf completely multi-threaded, meaning each fiber can run concurrently in a separate thread. Quite a bit of framework is added for this, and some atomic helpers are already introduced. However, io polling is busy polling for now (it will soon move to its own thread) and timeouts are still more or less broken. Oh, and the multithreading core is not there yet. Basically we are currently mostly broken ;)
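
The heart of the redesign is visible in fiber.c below: a wakeup becomes a lock-free flag update instead of a scheduler-owned queue operation. A rough standalone sketch of that pattern, using GCC's __sync builtin in place of libtf's tf_atomic_cmpxchg (whose definition is not part of this diff); all names here are illustrative:

#include <stdio.h>

#define FLAG_RUNNING        (1u << 0)
#define FLAG_WAKEUP_PENDING (1u << 1)

/* Atomically set the wakeup bits; return nonzero only when this call
 * is the one that must queue the fiber (it was not already running). */
static int wakeup(unsigned int *flags)
{
	unsigned int oldval, newval;

	do {
		oldval = *flags;
		if (oldval & FLAG_WAKEUP_PENDING)
			return 0;	/* a wakeup is already queued */
		newval = oldval | FLAG_WAKEUP_PENDING | FLAG_RUNNING;
	} while (!__sync_bool_compare_and_swap(flags, oldval, newval));

	return !(oldval & FLAG_RUNNING);
}

int main(void)
{
	unsigned int flags = 0;

	printf("first wakeup queues the fiber: %d\n", wakeup(&flags));
	printf("second wakeup is a no-op:      %d\n", wakeup(&flags));
	return 0;
}

Only the caller that first sets the pending bit queues the fiber; concurrent wakers collapse into no-ops, which is what lets wakeups happen from any thread.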
Diffstat (limited to 'src')
-rw-r--r--  src/TFbuild     |   2
-rw-r--r--  src/fiber.c     | 259
-rw-r--r--  src/ifc.c       |  57
-rw-r--r--  src/io-epoll.c  | 122
-rw-r--r--  src/io-unix.c   |  87
-rw-r--r--  src/scheduler.c | 134
-rw-r--r--  src/timeout.c   | 120
-rw-r--r--  src/uctx.h      |  41
-rw-r--r--  src/vmach.c     | 120
9 files changed, 459 insertions(+), 483 deletions(-)
diff --git a/src/TFbuild b/src/TFbuild
index 9277f9a..3ffa9ef 100644
--- a/src/TFbuild
+++ b/src/TFbuild
@@ -1,6 +1,6 @@
libs-y += libtf
-libtf-objs-y += fiber.o scheduler.o heap.o
+libtf-objs-y += fiber.o heap.o timeout.o vmach.o ifc.o
libtf-objs-$(OS_LINUX) += io-epoll.o mem-mmap.o
libtf-objs-$(OS_UNIX) += io-unix.o
diff --git a/src/fiber.c b/src/fiber.c
index e507815..bef7b81 100644
--- a/src/fiber.c
+++ b/src/fiber.c
@@ -13,31 +13,13 @@
#include <time.h>
#include <errno.h>
#include <unistd.h>
+#include <libtf/atomic.h>
#include <libtf/fiber.h>
-#include <libtf/scheduler.h>
+#include <libtf/vmach.h>
#include "uctx.h"
-#define TF_TIMEOUT_CHANGE_NEEDED 1
-#define TF_TIMEOUT_CHANGE_NEW_VALUE 2
-
-struct tf_fiber {
- unsigned int ref_count;
- struct tf_scheduler * scheduler;
- int wakeup_type;
- unsigned int timeout_change;
- tf_mtime_t timeout;
- struct tf_list_node queue_node;
- struct tf_heap_node heap_node;
- struct tf_uctx context;
- char data[TF_EMPTY_ARRAY];
-};
-
-static inline
-struct tf_fiber *tf_fiber_get_current(void)
-{
- void *data = tf_scheduler_get_current()->active_fiber;
- return container_of(data, struct tf_fiber, data);
-}
+#define TF_FIBERF_RUNNING TF_BIT(0)
+#define TF_FIBERF_WAKEUP_PENDING TF_BIT(1)
static void tf_fiber_main(void *user_data, void *arg)
{
@@ -48,45 +30,46 @@ static void tf_fiber_main(void *user_data, void *arg)
tf_fiber_exit();
}
-void *__tf_fiber_create(tf_fiber_proc fiber_main, int private_size)
+void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size)
{
- struct tf_fiber *fiber;
+ struct tf_fiber *self = tf_vmach_get_current_fiber();
+ struct tf_fiber *f;
- fiber = tf_uctx_create_embedded(
+ f = tf_uctx_create_embedded(
TF_STACK_SIZE,
sizeof(struct tf_fiber) + private_size,
offsetof(struct tf_fiber, context),
tf_fiber_main, fiber_main);
- if (fiber == NULL)
+ if (f == NULL)
return NULL;
- *fiber = (struct tf_fiber) {
+ *f = (struct tf_fiber) {
.ref_count = 1,
- .queue_node = TF_LIST_INITIALIZER(fiber->queue_node),
- .context = fiber->context,
+ .queue_node = TF_LIST_INITIALIZER(f->queue_node),
+ .context = f->context,
+ .timeout.manager = self ? self->timeout.manager : NULL,
+ .wakeup_q = TF_LIST_HEAD_INITIALIZER(f->wakeup_q),
};
- return fiber->data;
+ return f->data;
}
-void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size)
+int tf_fiber_run(void *fiber)
{
- struct tf_fiber *fiber;
- struct tf_scheduler *sched;
-
- sched = tf_scheduler_get_current();
- if (tf_heap_prealloc(&sched->heap, sched->num_fibers + 1) < 0)
- return NULL;
+ struct tf_timeout_manager *tm;
+ struct tf_fiber *f = container_of(fiber, struct tf_fiber, data);
- fiber = container_of(__tf_fiber_create(fiber_main, private_size),
- struct tf_fiber, data);
- sched->num_fibers++;
+ tm = f->timeout.manager;
+ if (tm != NULL) {
+ if (tf_heap_prealloc(&tm->heap, tm->num_fibers + 1) < 0)
+ return -ENOMEM;
+ tm->num_fibers++;
+ }
- fiber->scheduler = sched;
- fiber->wakeup_type = TF_WAKEUP_NONE;
- tf_list_add_tail(&fiber->queue_node, &sched->scheduled_q);
+ tf_fiber_wakeup(f);
+ tf_vmach_get_current()->num_user_fibers++;
- return tf_fiber_get(fiber->data);
+ return 0;
}
void *tf_fiber_get(void *data)
@@ -98,23 +81,8 @@ void *tf_fiber_get(void *data)
static void __tf_fiber_destroy(struct tf_fiber *fiber)
{
- struct tf_scheduler *sched = fiber->scheduler;
- int main_fiber, num_fibers;
-
- /* decrease first the number of fibers as we might be
- * killing the scheduler it self */
- num_fibers = --sched->num_fibers;
-
- main_fiber = (fiber->context.alloc == NULL);
- tf_heap_delete(&fiber->heap_node, &sched->heap);
- tf_uctx_destroy(&fiber->context);
- if (main_fiber)
- free(fiber);
-
- if (num_fibers == 1) {
- /* FIXME: Use proper fiber event*/
- __tf_fiber_wakeup(sched->main_fiber, TF_WAKEUP_IMMEDIATE);
- }
+ tf_heap_delete(&fiber->timeout.heap_node, &fiber->timeout.manager->heap);
+ tf_uctx_destroy(fiber->context);
}
void tf_fiber_put(void *data)
@@ -124,165 +92,60 @@ void tf_fiber_put(void *data)
__tf_fiber_destroy(fiber);
}
-void __tf_fiber_wakeup(void *data, int wakeup_type)
+void tf_fiber_wakeup(struct tf_fiber *f)
{
- struct tf_fiber *fiber = container_of(data, struct tf_fiber, data);
- struct tf_scheduler *sched = fiber->scheduler;
+ struct tf_fiber *self = tf_vmach_get_current_fiber();
+ unsigned int newval, oldval;
- if (fiber->wakeup_type == TF_WAKEUP_NONE) {
- fiber->wakeup_type = wakeup_type;
- tf_list_add_tail(&fiber->queue_node, &sched->running_q);
- }
-}
+ do {
+ oldval = f->flags;
+ if (oldval & TF_FIBERF_WAKEUP_PENDING)
+ return;
+ newval = oldval | TF_FIBERF_WAKEUP_PENDING | TF_FIBERF_RUNNING;
+ } while (!tf_atomic_cmpxchg(&f->flags, oldval, newval));
-void __tf_fiber_wakeup_heapnode(struct tf_heap_node *node)
-{
- __tf_fiber_wakeup(container_of(node, struct tf_fiber, heap_node)->data,
- TF_WAKEUP_TIMEOUT);
+ if (!(oldval & TF_FIBERF_RUNNING))
+ tf_list_add_tail(&f->queue_node, &self->wakeup_q);
}
-int __tf_fiber_schedule(void)
+int tf_fiber_schedule(void)
{
- struct tf_scheduler *sched = tf_scheduler_get_current();
- struct tf_fiber *f = tf_fiber_get_current(), *nf;
- int wakeup;
+ struct tf_fiber *f = tf_vmach_get_current_fiber();
- if (unlikely(f->timeout_change)) {
- if (f->timeout_change & TF_TIMEOUT_CHANGE_NEW_VALUE) {
- if (tf_mtime_diff(f->timeout, tf_scheduler_get_mtime()) <= 0) {
- f->timeout_change = TF_TIMEOUT_CHANGE_NEEDED;
- return TF_WAKEUP_TIMEOUT;
- }
- tf_heap_change(&f->heap_node, &sched->heap, f->timeout);
- } else
- tf_heap_delete(&f->heap_node, &sched->heap);
- f->timeout_change = 0;
- }
+ if (f->timeout.value != 0 &&
+ tf_mtime_diff(f->timeout.value, tf_mtime_now()) <= 0)
+ return -ETIME;
- /* Figure out the next fibre to run */
- if (unlikely(tf_list_empty(&sched->scheduled_q))) {
- tf_list_splice_tail(&sched->running_q,
- &sched->scheduled_q);
- TF_BUG_ON(tf_list_empty(&sched->scheduled_q));
+ if (f->flags & TF_FIBERF_WAKEUP_PENDING) {
+ f->flags = TF_FIBERF_RUNNING;
+ return 0;
}
- nf = tf_list_entry(tf_list_pop(&sched->scheduled_q),
- struct tf_fiber, queue_node);
- sched->active_fiber = nf->data;
- tf_uctx_transfer(&f->context, &nf->context);
-
- wakeup = f->wakeup_type;
- f->wakeup_type = TF_WAKEUP_NONE;
-
- return wakeup;
-}
-
-int __tf_fiber_bind_scheduler(struct tf_scheduler *sched)
-{
- struct tf_fiber *f;
- f = malloc(sizeof(struct tf_fiber));
- if (f == NULL)
- return -ENOMEM;
+ if (f->timeout.value != f->timeout.latched)
+ tf_timeout_adjust(&f->timeout);
- /* Mark currently active main fiber as active */
- *f = (struct tf_fiber) {
- .ref_count = 1,
- .scheduler = sched,
- .queue_node = TF_LIST_INITIALIZER(f->queue_node),
- };
- tf_uctx_create_self(&f->context);
- sched->main_fiber = f->data;
- sched->active_fiber = f->data;
- sched->num_fibers++;
+ f->flags = 0;
+ tf_uctx_transfer(f->context, f->return_context);
+ f->flags = TF_FIBERF_RUNNING;
- /* Schedule scheduler fiber */
- f = container_of((void *) sched, struct tf_fiber, data);
- f->scheduler = sched;
- f->wakeup_type = TF_WAKEUP_IMMEDIATE;
- tf_list_add_tail(&f->queue_node, &sched->running_q);
-
- return 0;
-}
-
-int __tf_fiber_release_scheduler(struct tf_scheduler *sched)
-{
- struct tf_fiber *f;
-
- /* Detach scheduler */
- f = container_of((void *) sched, struct tf_fiber, data);
- tf_list_del(&f->queue_node);
-
- /* Detach main stack from this scheduler */
- f = container_of((void *) sched->main_fiber, struct tf_fiber, data);
- tf_fiber_put(sched->main_fiber);
- sched->main_fiber = NULL;
- sched->num_fibers--;
+ if (f->timeout.value != 0 &&
+ tf_mtime_diff(f->timeout.value, tf_mtime_now()) <= 0)
+ return -ETIME;
return 0;
}
void tf_fiber_exit(void)
{
- struct tf_scheduler *sched = tf_scheduler_get_current();
- struct tf_fiber *f = tf_fiber_get_current();
- struct tf_fiber *schedf = container_of((void *) sched, struct tf_fiber, data);
+ struct tf_fiber *f = tf_vmach_get_current_fiber();
- tf_heap_delete(&f->heap_node, &sched->heap);
- schedf->wakeup_type = TF_WAKEUP_KILL;
- tf_uctx_transfer(&f->context, &schedf->context);
+ if (f->timeout.manager != NULL)
+ tf_timeout_delete(&f->timeout);
+ tf_vmach_get_current()->num_user_fibers--;
+ tf_uctx_transfer(f->context, f->return_context);
TF_BUG_ON(1);
}
void tf_fiber_kill(void *fiber)
{
}
-
-int tf_fiber_yield(void)
-{
- struct tf_scheduler *sched = tf_scheduler_get_current();
- struct tf_fiber *f = tf_fiber_get_current();
-
- TF_BUG_ON(tf_list_hashed(&f->queue_node));
- f->wakeup_type = TF_WAKEUP_IMMEDIATE;
- tf_list_add_tail(&f->queue_node, &sched->running_q);
-
- return __tf_fiber_schedule();
-}
-
-void tf_timeout_push(struct tf_timeout *timeout, tf_mtime_diff_t milliseconds)
-{
- struct tf_fiber *f = tf_fiber_get_current();
- tf_mtime_t abs = tf_scheduler_get_mtime() + milliseconds;
- int active;
-
- if (f->timeout_change)
- active = (f->timeout_change & TF_TIMEOUT_CHANGE_NEW_VALUE);
- else
- active = tf_heap_node_active(&f->heap_node);
-
- if (!active || tf_mtime_diff(abs, f->timeout) < 0) {
- /* Save previous timeout */
- timeout->saved_timeout = f->timeout;
- timeout->timeout_change = TF_TIMEOUT_CHANGE_NEEDED;
- if (active)
- timeout->timeout_change |= TF_TIMEOUT_CHANGE_NEW_VALUE;
-
- /* Make new timeout pending */
- f->timeout = abs;
- f->timeout_change = TF_TIMEOUT_CHANGE_NEEDED
- | TF_TIMEOUT_CHANGE_NEW_VALUE;
- } else {
- timeout->timeout_change = 0;
- }
-}
-
-int __tf_timeout_pop(struct tf_timeout *timeout, int err)
-{
- struct tf_fiber *f = tf_fiber_get_current();
-
- f->timeout = timeout->saved_timeout;
- f->timeout_change = timeout->timeout_change;
- if (err == TF_WAKEUP_TIMEOUT)
- err = TF_WAKEUP_THIS_TIMEOUT;
- return err;
-}
diff --git a/src/ifc.c b/src/ifc.c
new file mode 100644
index 0000000..2dfafe8
--- /dev/null
+++ b/src/ifc.c
@@ -0,0 +1,57 @@
+/* ifc.c - inter fiber communications
+ *
+ * Copyright (C) 2010 Timo Teräs <timo.teras@iki.fi>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 or later as
+ * published by the Free Software Foundation.
+ *
+ * See http://www.gnu.org/ for details.
+ */
+
+#include <libtf/atomic.h>
+#include <libtf/fiber.h>
+
+void tf_ifc_queue(void *fiber, struct tf_ifc *ifc, tf_ifc_handler_t handler)
+{
+ struct tf_fiber *f = container_of(fiber, struct tf_fiber, data);
+ struct tf_ifc *old;
+
+ TF_BUG_ON(ifc->next != NULL);
+ ifc->handler = handler;
+ do {
+ old = f->pending_ifc;
+ ifc->next = old;
+ } while (!tf_atomic_cmpxchg(&f->pending_ifc, old, ifc));
+
+ tf_fiber_wakeup(f);
+}
+
+void tf_ifc_complete(void *fiber, struct tf_ifc *ifc, tf_ifc_handler_t handler)
+{
+ ifc->sender = tf_vmach_get_current_fiber();
+ tf_ifc_queue(fiber, ifc, handler);
+ while (ifc->sender != NULL)
+ tf_fiber_schedule();
+}
+
+void tf_ifc_process_unordered(void)
+{
+ struct tf_fiber *f = tf_vmach_get_current_fiber(), *s;
+ struct tf_ifc *pending, *ifc;
+
+ while (f->pending_ifc != NULL) {
+ pending = tf_atomic_xchg(&f->pending_ifc, NULL);
+ while (pending) {
+ ifc = pending;
+ pending = ifc->next;
+ ifc->handler(f->data, ifc);
+ s = ifc->sender;
+ ifc->next = NULL;
+ ifc->sender = NULL;
+ if (s != NULL)
+ tf_fiber_wakeup(s);
+ }
+ }
+}
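tf_ifc_queue above is a classic lock-free stack push, and tf_ifc_process_unordered drains it with a single atomic exchange; the LIFO order this yields is why the helper is called "unordered". A self-contained sketch of that push/drain pair, again with GCC builtins standing in for libtf's atomic helpers and purely illustrative names:

#include <stddef.h>
#include <stdio.h>

struct msg {
	struct msg *next;
	int payload;
};

/* Lock-free push: retry until our CAS installs the new list head,
 * mirroring the loop in tf_ifc_queue(). */
static void push(struct msg **head, struct msg *m)
{
	struct msg *old;

	do {
		old = *head;
		m->next = old;
	} while (!__sync_bool_compare_and_swap(head, old, m));
}

/* Drain: atomically steal the whole list, then walk it privately,
 * as tf_ifc_process_unordered() does with tf_atomic_xchg(). */
static void drain(struct msg **head)
{
	struct msg *m = __atomic_exchange_n(head, NULL, __ATOMIC_ACQ_REL);

	while (m != NULL) {
		struct msg *next = m->next;
		printf("payload %d\n", m->payload);
		m = next;
	}
}

int main(void)
{
	struct msg *head = NULL;
	struct msg a = { .payload = 1 }, b = { .payload = 2 };

	push(&head, &a);
	push(&head, &b);
	drain(&head);	/* prints 2 then 1: LIFO, hence "unordered" */
	return 0;
}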
diff --git a/src/io-epoll.c b/src/io-epoll.c
index 32aa090..1fc9ca1 100644
--- a/src/io-epoll.c
+++ b/src/io-epoll.c
@@ -16,74 +16,70 @@
#include <sys/epoll.h>
#include <libtf/io.h>
-#include <libtf/scheduler.h>
+#include <libtf/fiber.h>
-struct tf_poll_data {
+struct tf_epoll_data {
int epoll_fd;
- int num_waiters;
};
-static struct tf_poll_data *tf_epoll_get_data(void)
+static void tf_epoll_main(void *ctx)
{
- struct tf_scheduler *sched = tf_scheduler_get_current();
- TF_BUILD_BUG_ON(sizeof(struct tf_poll_data) > sizeof(sched->poll_data));
- return (struct tf_poll_data *) &sched->poll_data;
-}
-
-static void tf_epoll_init(void)
-{
- struct tf_poll_data *pd = tf_epoll_get_data();
+ struct tf_epoll_data *pd = ctx;
+ struct epoll_event events[64];
+ struct tf_fd *fd;
+ int r, i;
- pd->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
- pd->num_waiters = 0;
- TF_BUG_ON(pd->epoll_fd < 0);
-}
+ do {
+ r = epoll_wait(pd->epoll_fd, events, array_size(events), 0);
+ if (r == 0) {
+ /* FIXME: yielding is bad */
+ struct tf_fiber *self = tf_vmach_get_current_fiber();
+ tf_list_add_tail(&self->queue_node, &self->wakeup_q);
+ if (tf_fiber_schedule() == 0)
+ continue;
+ }
-static void tf_epoll_close(void)
-{
- struct tf_poll_data *pd = tf_epoll_get_data();
+ for (i = 0; i < r; i++) {
+ fd = (struct tf_fd *) events[i].data.ptr;
+ tf_fiber_wakeup(fd->fiber);
+ }
+ } while (1);
close(pd->epoll_fd);
}
-static int tf_epoll_poll(tf_mtime_diff_t timeout)
+
+static void *tf_epoll_create(void)
{
- struct tf_poll_data *pd = tf_epoll_get_data();
- struct epoll_event events[64];
- struct tf_fd *fd;
- int r, i, ret;
+ struct tf_epoll_data *d;
- if (timeout == 0 && pd->num_waiters == 0)
- return TF_WAKEUP_TIMEOUT;
+ d = tf_fiber_create(tf_epoll_main, sizeof(struct tf_epoll_data));
+ if (d == NULL)
+ return NULL;
- ret = TF_WAKEUP_TIMEOUT;
- do {
- r = epoll_wait(pd->epoll_fd, events, array_size(events), timeout);
- if (r == 0)
- break;
+ d->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
+ TF_BUG_ON(d->epoll_fd < 0);
- for (i = 0; i < r; i++) {
- fd = (struct tf_fd *) events[i].data.ptr;
- if (likely(fd->events & events[i].events))
- __tf_fiber_wakeup(fd->waiting_fiber, TF_WAKEUP_FD);
- }
- ret = TF_WAKEUP_FD;
- timeout = 0;
- } while (unlikely(r == array_size(events)));
+ tf_fiber_run(tf_fiber_get(d));
- return ret;
+ return d;
}
-static int tf_epoll_fd_created(struct tf_fd *fd)
+static int tf_epoll_fd_created(void *fiber, struct tf_fd *fd)
{
- struct tf_poll_data *pd = tf_epoll_get_data();
+ struct tf_epoll_data *d = fiber;
struct epoll_event ev;
int r;
ev = (struct epoll_event) {
- .events = EPOLLIN | EPOLLOUT | EPOLLET,
+ .events = EPOLLET,
.data.ptr = fd,
};
- r = epoll_ctl(pd->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
+ if (fd->flags & TF_FD_READ)
+ ev.events |= EPOLLIN;
+ if (fd->flags & TF_FD_WRITE)
+ ev.events |= EPOLLOUT;
+
+ r = epoll_ctl(d->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
if (unlikely(r < 0)) {
TF_BUG_ON(errno == EEXIST);
r = -errno;
@@ -93,46 +89,18 @@ static int tf_epoll_fd_created(struct tf_fd *fd)
return 0;
}
-static int tf_epoll_fd_destroyed(struct tf_fd *fd)
+static int tf_epoll_fd_destroyed(void *fiber, struct tf_fd *fd)
{
- struct tf_poll_data *pd = tf_epoll_get_data();
+ struct tf_epoll_data *d = fiber;
- if (fd->flags & TF_FD_AUTOCLOSE)
- return 0;
+ if (!(fd->flags & TF_FD_AUTOCLOSE))
+ epoll_ctl(d->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
- epoll_ctl(pd->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
return 0;
}
-static void tf_epoll_fd_monitor(struct tf_fd *fd, int events)
-{
- struct tf_poll_data *pd = tf_epoll_get_data();
-
- TF_BUG_ON(fd->waiting_fiber != NULL);
- fd->events = EPOLLERR | EPOLLHUP;
- if (events & TF_POLL_READ)
- fd->events |= EPOLLIN;
- if (events & TF_POLL_WRITE)
- fd->events |= EPOLLOUT;
- fd->waiting_fiber = tf_scheduler_get_current()->active_fiber;
- pd->num_waiters++;
-}
-
-static void tf_epoll_fd_unmonitor(struct tf_fd *fd)
-{
- struct tf_poll_data *pd = tf_epoll_get_data();
-
- fd->waiting_fiber = NULL;
- fd->events = 0;
- pd->num_waiters--;
-}
-
struct tf_poll_hooks tf_epoll_hooks = {
- .init = tf_epoll_init,
- .close = tf_epoll_close,
- .poll = tf_epoll_poll,
+ .create = tf_epoll_create,
.fd_created = tf_epoll_fd_created,
.fd_destroyed = tf_epoll_fd_destroyed,
- .fd_monitor = tf_epoll_fd_monitor,
- .fd_unmonitor = tf_epoll_fd_unmonitor,
};
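Note the shift in the epoll backend: descriptors are now registered once, edge-triggered (EPOLLET), when the fd is created, replacing the old fd_monitor/fd_unmonitor calls around every blocking operation. A minimal standalone illustration of that register-once pattern, using the plain epoll API rather than libtf's, with error handling trimmed:

#include <sys/epoll.h>
#include <unistd.h>
#include <stdio.h>

int main(void)
{
	struct epoll_event ev = { .events = EPOLLIN | EPOLLET };
	struct epoll_event out[8];
	int ep, fds[2], n, i;

	ep = epoll_create1(EPOLL_CLOEXEC);
	if (ep < 0 || pipe(fds) < 0)
		return 1;

	/* Register once at creation time, edge-triggered; no
	 * per-wait EPOLL_CTL_ADD/DEL churn afterwards. */
	ev.data.fd = fds[0];
	epoll_ctl(ep, EPOLL_CTL_ADD, fds[0], &ev);

	(void) write(fds[1], "x", 1);
	n = epoll_wait(ep, out, 8, 1000);
	for (i = 0; i < n; i++)
		printf("fd %d readable\n", out[i].data.fd);

	close(fds[0]); close(fds[1]); close(ep);
	return 0;
}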
diff --git a/src/io-unix.c b/src/io-unix.c
index d80592a..7cd4fd0 100644
--- a/src/io-unix.c
+++ b/src/io-unix.c
@@ -17,10 +17,8 @@
#include <string.h>
#include <libtf/io.h>
-#include <libtf/scheduler.h>
-
-#define TF_POLL_HOOKS \
-struct tf_poll_hooks *poller = tf_scheduler_get_current()->poller;
+#include <libtf/fiber.h>
+#include <libtf/vmach.h>
static inline int tf_sockaddr_len(const struct tf_sockaddr *addr)
{
@@ -38,14 +36,14 @@ static inline int tf_sockaddr_len(const struct tf_sockaddr *addr)
int tf_open_fd(struct tf_fd *fd, int kfd, int flags)
{
- TF_POLL_HOOKS;
+ struct tf_vmach *vm = tf_vmach_get_current();
int r;
fd->fd = kfd;
fd->flags = flags;
- fd->waiting_fiber = NULL;
+ fd->fiber = tf_vmach_get_current_fiber();
- r = poller->fd_created(fd);
+ r = vm->poll_ops->fd_created(vm->poll_fiber, fd);
if (r < 0) {
if (flags & TF_FD_AUTOCLOSE)
close(kfd);
@@ -88,10 +86,10 @@ int tf_open(struct tf_fd *fd, const char *pathname, int flags)
int tf_close(struct tf_fd *fd)
{
- TF_POLL_HOOKS;
+ struct tf_vmach *vm = tf_vmach_get_current();
int r;
- poller->fd_destroyed(fd);
+ vm->poll_ops->fd_destroyed(vm->poll_fiber, fd);
if (fd->flags & TF_FD_AUTOCLOSE) {
r = close(fd->fd);
if (unlikely(r == -1))
@@ -104,11 +102,9 @@ int tf_close(struct tf_fd *fd)
int tf_read_fully(struct tf_fd *fd, void *buf, size_t count)
{
- TF_POLL_HOOKS;
ssize_t n;
- int r;
+ int r = 0;
- poller->fd_monitor(fd, TF_POLL_READ);
do {
n = read(fd->fd, buf, count);
if (n == count) {
@@ -132,20 +128,17 @@ int tf_read_fully(struct tf_fd *fd, void *buf, size_t count)
continue;
}
- r = __tf_fiber_schedule();
- } while (r == TF_WAKEUP_FD);
- poller->fd_unmonitor(fd);
+ r = tf_fiber_schedule();
+ } while (r != 0);
return -r;
}
int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count)
{
- TF_POLL_HOOKS;
ssize_t n;
- int r;
+ int r = 0;
- poller->fd_monitor(fd, TF_POLL_WRITE);
do {
n = write(fd->fd, buf, count);
if (n == count) {
@@ -166,19 +159,16 @@ int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count)
continue;
}
- r = __tf_fiber_schedule();
- } while (r == TF_WAKEUP_FD);
- poller->fd_unmonitor(fd);
+ r = tf_fiber_schedule();
+ } while (r != 0);
return -r;
}
ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count)
{
- TF_POLL_HOOKS;
ssize_t n;
- poller->fd_monitor(fd, TF_POLL_READ);
do {
n = read(fd->fd, buf, count);
if (n >= 0)
@@ -189,19 +179,16 @@ ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count)
n = -errno;
break;
}
- n = __tf_fiber_schedule();
- } while (n == TF_WAKEUP_FD);
- poller->fd_unmonitor(fd);
+ n = tf_fiber_schedule();
+ } while (n != 0);
return n;
}
ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count)
{
- TF_POLL_HOOKS;
ssize_t n;
- poller->fd_monitor(fd, TF_POLL_WRITE);
do {
n = write(fd->fd, buf, count);
if (n >= 0)
@@ -212,9 +199,8 @@ ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count)
n = -errno;
break;
}
- n = __tf_fiber_schedule();
- } while (n == TF_WAKEUP_FD);
- poller->fd_unmonitor(fd);
+ n = tf_fiber_schedule();
+ } while (n != 0);
return n;
}
@@ -261,7 +247,6 @@ int tf_listen(struct tf_fd *fd, int backlog)
int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd,
struct tf_sockaddr *from)
{
- TF_POLL_HOOKS;
int r, tfdf;
struct sockaddr *addr = NULL;
socklen_t al, *pal = NULL;
@@ -272,24 +257,19 @@ int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd,
pal = &al;
}
- tfdf = TF_FD_AUTOCLOSE | (listen_fd->flags & TF_FD_STREAM_ORIENTED);
- tfdf |= TF_FD_SET_CLOEXEC;
+ tfdf = TF_FD_AUTOCLOSE | TF_FD_ALREADY_NONBLOCKING |
+ (listen_fd->flags & TF_FD_STREAM_ORIENTED);
- poller->fd_monitor(listen_fd, TF_POLL_READ);
do {
- /* FIXME: use accept4 if available */
- r = accept(listen_fd->fd, addr, pal);
+ r = accept4(listen_fd->fd, addr, pal, SOCK_NONBLOCK|SOCK_CLOEXEC);
if (r >= 0)
break;
if (errno == EINTR)
continue;
- if (errno != EAGAIN) {
- poller->fd_unmonitor(listen_fd);
+ if (errno != EAGAIN)
return -errno;
- }
- r = __tf_fiber_schedule();
- } while (r == TF_WAKEUP_FD);
- poller->fd_unmonitor(listen_fd);
+ r = tf_fiber_schedule();
+ } while (r != 0);
if (r < 0)
return r;
@@ -298,7 +278,6 @@ int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd,
int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to)
{
- TF_POLL_HOOKS;
socklen_t l = sizeof(int);
int r, err;
@@ -310,10 +289,8 @@ int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to)
return -errno;
/* Wait for socket to become readable */
- poller->fd_monitor(fd, TF_POLL_WRITE);
- r = __tf_fiber_schedule();
- poller->fd_unmonitor(fd);
- if (r != TF_WAKEUP_FD)
+ r = tf_fiber_schedule();
+ if (r != 0)
return r;
/* Check for error */
@@ -327,7 +304,6 @@ ssize_t tf_recvmsg(struct tf_fd *fd,
struct tf_sockaddr *to,
void *buf, size_t len)
{
- TF_POLL_HOOKS;
struct iovec iov;
struct msghdr msg;
struct cmsghdr *cmsg;
@@ -347,7 +323,6 @@ ssize_t tf_recvmsg(struct tf_fd *fd,
.msg_iovlen = 1,
};
- poller->fd_monitor(fd, TF_POLL_READ);
do {
r = recvmsg(fd->fd, &msg, 0);
if (r >= 0)
@@ -356,9 +331,8 @@ ssize_t tf_recvmsg(struct tf_fd *fd,
r = -errno;
break;
}
- r = __tf_fiber_schedule();
- } while (r == TF_WAKEUP_FD);
- poller->fd_unmonitor(fd);
+ r = tf_fiber_schedule();
+ } while (r != 0);
if (r < 0 || to == NULL)
return r;
@@ -382,7 +356,6 @@ ssize_t tf_sendmsg(struct tf_fd *fd,
const struct tf_sockaddr *to,
const void *buf, size_t len)
{
- TF_POLL_HOOKS;
struct msghdr msg;
struct iovec iov;
struct {
@@ -411,7 +384,6 @@ ssize_t tf_sendmsg(struct tf_fd *fd,
msg.msg_controllen = sizeof(cmsg);
}
- poller->fd_monitor(fd, TF_POLL_WRITE);
do {
r = sendmsg(fd->fd, &msg, 0);
if (r >= 0)
@@ -420,9 +392,8 @@ ssize_t tf_sendmsg(struct tf_fd *fd,
r = -errno;
break;
}
- r = __tf_fiber_schedule();
- } while (r == TF_WAKEUP_FD);
- poller->fd_unmonitor(fd);
+ r = tf_fiber_schedule();
+ } while (r != 0);
return r;
}
diff --git a/src/scheduler.c b/src/scheduler.c
deleted file mode 100644
index a103d0a..0000000
--- a/src/scheduler.c
+++ /dev/null
@@ -1,134 +0,0 @@
-/* scheduler.c - fiber scheduling
- *
- * Copyright (C) 2009-2010 Timo Teräs <timo.teras@iki.fi>
- * All rights reserved.
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 or later as
- * published by the Free Software Foundation.
- *
- * See http://www.gnu.org/ for details.
- */
-
-#include <time.h>
-#include <libtf/scheduler.h>
-#include <libtf/io.h>
-
-/* FIXME: should be in thread local storage */
-extern struct tf_poll_hooks tf_epoll_hooks;
-struct tf_scheduler *__tf_scheduler;
-
-static void update_time(struct tf_scheduler *sched)
-{
- struct timespec ts;
-
- clock_gettime(CLOCK_MONOTONIC, &ts);
- sched->scheduler_time = ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
-}
-
-static void process_heap(struct tf_scheduler *sched)
-{
- struct tf_heap_node *node;
- tf_mtime_t now = sched->scheduler_time;
-
- while (!tf_heap_empty(&sched->heap) &&
- tf_mtime_diff(now, tf_heap_get_value(&sched->heap)) >= 0) {
- node = tf_heap_get_node(&sched->heap);
- tf_heap_delete(node, &sched->heap);
- __tf_fiber_wakeup_heapnode(node);
- }
-}
-
-void tf_scheduler_fiber(void *data)
-{
- struct tf_scheduler *sched = (struct tf_scheduler *) data;
-
- do {
- tf_mtime_diff_t timeout;
-
- update_time(sched);
- if (!tf_list_empty(&sched->scheduled_q) ||
- !tf_list_empty(&sched->running_q)) {
- timeout = 0;
- } else if (!tf_heap_empty(&sched->heap)) {
- timeout = tf_mtime_diff(
- tf_heap_get_value(&sched->heap),
- tf_scheduler_get_mtime());
- if (timeout < 0)
- timeout = 0;
- } else {
- timeout = -1;
- }
-
- if (sched->poller->poll(timeout) == TF_WAKEUP_TIMEOUT &&
- timeout >= 0) {
- sched->scheduler_time += timeout;
- process_heap(sched);
- }
-
- if (tf_fiber_yield() == TF_WAKEUP_KILL) {
- do {
- tf_fiber_put(sched->active_fiber);
- sched->active_fiber = sched;
- } while (__tf_fiber_schedule() == TF_WAKEUP_KILL);
- }
- } while (1);
-}
-
-struct tf_scheduler *tf_scheduler_create(void)
-{
- struct tf_scheduler *sched;
-
- sched = __tf_fiber_create(tf_scheduler_fiber,
- sizeof(struct tf_scheduler));
-
- *sched = (struct tf_scheduler) {
- .scheduled_q = TF_LIST_HEAD_INITIALIZER(sched->scheduled_q),
- .running_q = TF_LIST_HEAD_INITIALIZER(sched->running_q),
- };
-
- return sched;
-}
-
-int tf_scheduler_enable(struct tf_scheduler *sched)
-{
- struct tf_scheduler *s = sched;
-
- if (s == NULL) {
- s = tf_scheduler_create();
- if (s == NULL)
- return -ENOMEM;
- }
- if (s->main_fiber != NULL)
- return -EBUSY;
-
- __tf_fiber_bind_scheduler(s);
- __tf_scheduler = s;
- s->poller = &tf_epoll_hooks;
- s->poller->init();
- update_time(s);
-
- if (sched != NULL)
- tf_scheduler_get(sched);
-
- return 0;
-}
-
-void tf_scheduler_disable(void)
-{
- struct tf_scheduler *sched = __tf_scheduler;
-
- if (sched == NULL ||
- sched->main_fiber != sched->active_fiber)
- return;
-
- /* sleep until no others */
- while (sched->num_fibers > 1)
- __tf_fiber_schedule();
-
- sched->poller->close();
- __tf_scheduler = NULL;
- __tf_fiber_release_scheduler(sched);
- tf_heap_destroy(&sched->heap);
- tf_fiber_put(sched);
-}
diff --git a/src/timeout.c b/src/timeout.c
new file mode 100644
index 0000000..fc7acd2
--- /dev/null
+++ b/src/timeout.c
@@ -0,0 +1,120 @@
+/* timeout.c - timerfd based fiber wakeups
+ *
+ * Copyright (C) 2010 Timo Teräs <timo.teras@iki.fi>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 or later as
+ * published by the Free Software Foundation.
+ *
+ * See http://www.gnu.org/ for details.
+ */
+
+#include <libtf/fiber.h>
+#include <libtf/io.h>
+#include <sys/timerfd.h>
+
+struct tf_timeout_fiber {
+ struct tf_timeout_manager manager;
+ struct tf_fd fd;
+ tf_mtime_t programmed;
+};
+
+static tf_mtime_t tf_mtime_cached;
+
+tf_mtime_t tf_mtime_now(void)
+{
+ return tf_mtime_cached;
+}
+
+tf_mtime_t tf_mtime_update(void)
+{
+ struct timespec ts;
+
+ clock_gettime(CLOCK_MONOTONIC, &ts);
+ tf_mtime_cached = ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
+
+ return tf_mtime_cached;
+}
+
+static void tf_timeout_process_heap(struct tf_timeout_fiber *t)
+{
+ struct tf_heap_head *heap = &t->manager.heap;
+ struct tf_heap_node *node;
+ tf_mtime_t now = tf_mtime_update();
+ tf_mtime_diff_t timeout, value;
+
+ while (!tf_heap_empty(heap)) {
+ value = tf_heap_get_value(heap);
+ timeout = tf_mtime_diff(value, now);
+ if (timeout > 0) {
+ if (t->programmed != value) {
+ struct itimerspec its = {
+ .it_value.tv_sec = timeout / 1000,
+ .it_value.tv_nsec = (timeout % 1000) * 1000000,
+ };
+ t->programmed = value;
+ timerfd_settime(t->fd.fd, 0, &its, NULL);
+ }
+ break;
+ }
+
+ node = tf_heap_get_node(heap);
+ tf_heap_delete(node, heap);
+ tf_fiber_wakeup(container_of(node, struct tf_fiber, timeout.heap_node));
+ }
+}
+
+static void tf_timeout_worker(void *ctx)
+{
+ do {
+ tf_ifc_process_unordered();
+ tf_timeout_process_heap(ctx);
+ } while (tf_fiber_schedule() == 0);
+}
+
+struct tf_timeout_fiber *tf_timeout_fiber_create(void)
+{
+ struct tf_timeout_fiber *f;
+
+ f = tf_fiber_create(tf_timeout_worker, sizeof(struct tf_timeout_fiber));
+ if (f == NULL)
+ return f;
+
+ if (tf_open_fd(&f->fd,
+ timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK|TFD_CLOEXEC),
+ TF_FD_READ | TF_FD_ALREADY_NONBLOCKING | TF_FD_AUTOCLOSE) != 0) {
+ tf_fiber_put(f);
+ return NULL;
+ }
+ f->fd.fiber = container_of((void *) f, struct tf_fiber, data);
+ tf_fiber_run(tf_fiber_get(f));
+
+ return f;
+}
+
+static void tf_timeout_adjust_ifc(void *fiber, struct tf_ifc *ifc)
+{
+ struct tf_timeout_fiber *t = fiber;
+ struct tf_timeout_client *c = container_of(ifc, struct tf_timeout_client, ifc);
+ tf_mtime_t val;
+
+ val = c->value;
+ if (val == 0)
+ tf_heap_delete(&c->heap_node, &t->manager.heap);
+ else
+ tf_heap_change(&c->heap_node, &t->manager.heap, val);
+ c->latched = val;
+}
+
+void tf_timeout_adjust(struct tf_timeout_client *tc)
+{
+ tf_ifc_queue(tc->manager, &tc->ifc, tf_timeout_adjust_ifc);
+}
+
+void tf_timeout_delete(struct tf_timeout_client *tc)
+{
+ tc->value = 0;
+ tc->latched = 0;
+ tf_ifc_complete(tc->manager, &tc->ifc, tf_timeout_adjust_ifc);
+}
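timeout.c keeps a cached millisecond monotonic clock and re-arms a timerfd only when the nearest heap deadline actually changes. The two conversions it relies on, shown standalone (a sketch with illustrative names, not the libtf API):

#include <sys/timerfd.h>
#include <stdint.h>
#include <stdio.h>
#include <time.h>
#include <unistd.h>

/* Monotonic milliseconds, the same arithmetic as tf_mtime_update(). */
static uint64_t mtime_now(void)
{
	struct timespec ts;

	clock_gettime(CLOCK_MONOTONIC, &ts);
	return (uint64_t) ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
}

int main(void)
{
	int tfd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC);
	uint64_t deadline = mtime_now() + 50;	/* 50 ms from now */
	int64_t delta = (int64_t) (deadline - mtime_now());
	/* Millisecond delta back into an itimerspec, as the heap
	 * processing loop does before timerfd_settime(). */
	struct itimerspec its = {
		.it_value.tv_sec  = delta / 1000,
		.it_value.tv_nsec = (delta % 1000) * 1000000,
	};
	uint64_t expirations;

	timerfd_settime(tfd, 0, &its, NULL);
	(void) read(tfd, &expirations, sizeof(expirations));	/* ~50 ms */
	printf("timer expired %llu time(s)\n",
	       (unsigned long long) expirations);
	close(tfd);
	return 0;
}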
diff --git a/src/uctx.h b/src/uctx.h
index 3ec30c4..353b732 100644
--- a/src/uctx.h
+++ b/src/uctx.h
@@ -18,19 +18,18 @@
#ifdef VALGRIND
#include <valgrind/valgrind.h>
-#else
-#define VALGRIND_STACK_REGISTER(stack_base, size) 0
-#define VALGRIND_STACK_DEREGISTER(stack_id)
#endif
#define STACK_GUARD 0xbad57ac4
struct tf_uctx {
- int *stack_guard;
- size_t size;
- void *alloc;
- void *current_sp;
- unsigned int stack_id;
+ int * stack_guard;
+ size_t size;
+ void * alloc;
+ void * current_sp;
+#ifdef VALGRIND
+ unsigned int stack_id;
+#endif
};
#if defined(__i386__)
@@ -89,13 +88,14 @@ static inline void stack_push_ptr(void **stackptr, void *ptr)
}
-static inline void tf_uctx_create_self(struct tf_uctx *uctx)
+static inline tf_uctx_t tf_uctx_create_self(struct tf_uctx *uctx)
{
static int dummy_guard = STACK_GUARD;
*uctx = (struct tf_uctx) {
.stack_guard = &dummy_guard,
};
+ return uctx;
}
static inline void *
@@ -118,20 +118,24 @@ tf_uctx_create_embedded(
/* Create initial stack frame (cdecl convention) */
stack = stack_pointer(stack_base, size);
- user_data = stack_push(&stack, TF_ALIGN(private_size, 64));
+ user_data = stack_push(&stack, TF_ALIGN(private_size, 16));
+ uctx = stack_push(&stack, TF_ALIGN(sizeof(struct tf_uctx), 16));
stack_push_ptr(&stack, main_argument);
stack_push_ptr(&stack, user_data);
stack_push_ptr(&stack, NULL);
stack_push_ptr(&stack, stack_frame_main); /* eip */
stack_push_ptr(&stack, NULL); /* ebp */
- uctx = user_data + uctx_offset;
+ *((tf_uctx_t *) (user_data + uctx_offset)) = uctx;
+
*uctx = (struct tf_uctx) {
.stack_guard = stack_guard(stack_base, size),
.alloc = stack_base,
.size = size,
.current_sp = stack,
+#ifdef VALGRIND
.stack_id = VALGRIND_STACK_REGISTER(stack_base, stack_base+size),
+#endif
};
*uctx->stack_guard = STACK_GUARD;
@@ -139,18 +143,25 @@ tf_uctx_create_embedded(
}
static inline
-void tf_uctx_destroy(struct tf_uctx *uctx)
+void tf_uctx_destroy(tf_uctx_t ctx)
{
+ struct tf_uctx *uctx = ctx;
+
if (uctx->alloc != NULL) {
+#ifdef VALGRIND
VALGRIND_STACK_DEREGISTER(uctx->stack_id);
+#endif
tf_bmem_free(uctx->alloc, uctx->size);
}
}
static inline
-void tf_uctx_transfer(struct tf_uctx *from, struct tf_uctx *to)
+void tf_uctx_transfer(tf_uctx_t from, tf_uctx_t to)
{
+ struct tf_uctx *ufrom = from;
+ struct tf_uctx *uto = to;
+
/* Switch stack pointers */
- TF_BUG_ON(*from->stack_guard != STACK_GUARD);
- switch_fiber(from, to);
+ TF_BUG_ON(*ufrom->stack_guard != STACK_GUARD);
+ switch_fiber(ufrom, uto);
}
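uctx.h's STACK_GUARD is a canary word planted in each fiber stack and asserted on every context transfer to catch overflows early. A toy illustration of the idea; the guard placement here is an assumption (the low end of the allocation, which a downward-growing x86 stack would overrun first), since the header's stack_guard() helper computes the real position:

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>

#define STACK_GUARD 0xbad57ac4

int main(void)
{
	size_t size = 16 * 1024;
	unsigned char *alloc = malloc(size);
	/* Assumed placement: guard at the low end of the block, so a
	 * downward-growing stack tramples it before escaping. */
	unsigned int *guard = (unsigned int *) alloc;

	*guard = STACK_GUARD;
	/* ... fiber code runs on the rest of the block ... */
	assert(*guard == STACK_GUARD);	/* the check in tf_uctx_transfer */
	printf("stack guard intact\n");
	free(alloc);
	return 0;
}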
diff --git a/src/vmach.c b/src/vmach.c
new file mode 100644
index 0000000..a9ca446
--- /dev/null
+++ b/src/vmach.c
@@ -0,0 +1,120 @@
+#include <libtf/defines.h>
+#include <libtf/list.h>
+#include <libtf/vmach.h>
+#include <libtf/io.h>
+#include "uctx.h"
+
+__thread struct tf_fiber *tf_current_fiber;
+__thread struct tf_vcpu *tf_current_vcpu;
+__thread struct tf_vmach *tf_current_vmach;
+
+extern struct tf_poll_hooks tf_epoll_hooks;
+
+struct tf_vmachine {
+ struct tf_vmach vmach;
+ struct tf_uctx startup_uctx;
+ void *machine_init_fiber;
+};
+
+static void tf_vcpu_main(void *fiber_data)
+{
+ struct tf_vcpu *vcpu = fiber_data;
+ struct tf_vmach *vmach = vcpu->machine;
+ struct tf_fiber *self = container_of(fiber_data, struct tf_fiber, data);
+ struct tf_fiber *f;
+
+ tf_current_vmach = vmach;
+ tf_current_vcpu = vcpu;
+
+ while (vmach->num_user_fibers != 0) {
+ if (tf_list_empty(&vmach->run_q)) {
+ /* sleep */
+ continue;
+ }
+
+ f = tf_list_entry(tf_list_pop(&vmach->run_q),
+ struct tf_fiber, queue_node);
+
+ f->return_context = self->context;
+ tf_current_fiber = f;
+ tf_uctx_transfer(self->context, f->context);
+ tf_list_splice_tail(&f->wakeup_q, &vmach->run_q);
+ }
+}
+
+static void tf_vmach_main(void *fiber_data)
+{
+ struct tf_fiber *self = container_of(fiber_data, struct tf_fiber, data);
+ struct tf_vcpu *vcpu = fiber_data;
+ struct tf_vmach *vmach = vcpu->machine;
+
+ tf_current_vmach = vmach;
+ tf_current_vcpu = vcpu;
+ tf_current_fiber = self;
+
+ /* Initialize IO subsystem */
+ vmach->poll_ops = &tf_epoll_hooks;
+ vmach->poll_fiber = vmach->poll_ops->create();
+ vmach->timeout_fiber = tf_timeout_fiber_create();
+ vmach->startup_fiber.timeout.manager = vmach->timeout_fiber;
+
+ /* Run the initial fiber */
+ tf_fiber_wakeup(&vmach->startup_fiber);
+
+ /* Use main thread as a regular vcpu */
+ vmach->num_user_fibers = 1;
+ tf_list_splice_tail(&self->wakeup_q, &vmach->run_q);
+ tf_vcpu_main(vcpu);
+
+ /* Kill all stuff */
+
+ /* Return to main fiber */
+ vmach->startup_fiber.return_context = NULL;
+ tf_current_fiber = NULL;
+ tf_current_vcpu = NULL;
+ tf_current_vmach = NULL;
+
+ tf_uctx_transfer(self->context, vmach->startup_fiber.context);
+}
+
+void tf_vmach_start(void)
+{
+ struct tf_vmachine *vmach;
+ struct tf_vcpu *vcpu;
+
+ TF_BUG_ON(tf_current_vcpu != NULL);
+
+ /* Create a self-fiber so we can surrender control to vcpu */
+ vmach = calloc(1, sizeof(struct tf_vmachine));
+ vmach->vmach.startup_fiber = (struct tf_fiber) {
+ .ref_count = 1,
+ .queue_node = TF_LIST_INITIALIZER(vmach->vmach.startup_fiber.queue_node),
+ .wakeup_q = TF_LIST_HEAD_INITIALIZER(vmach->vmach.startup_fiber.wakeup_q),
+ .context = tf_uctx_create_self(&vmach->startup_uctx),
+ };
+ tf_list_init_head(&vmach->vmach.run_q);
+ vcpu = tf_fiber_create(tf_vmach_main, sizeof(struct tf_vcpu));
+ vmach->machine_init_fiber = vcpu;
+ vcpu->machine = &vmach->vmach;
+
+ /* Create manager fiber to initialize vcpu */
+ tf_uctx_transfer(vmach->vmach.startup_fiber.context,
+ container_of((void *) vcpu, struct tf_fiber, data)->context);
+}
+
+void tf_vmach_stop(void)
+{
+ struct tf_fiber *self = tf_vmach_get_current_fiber();
+ struct tf_vmach *vmach = tf_vmach_get_current();
+
+ TF_BUG_ON(self != &vmach->startup_fiber);
+
+ /* Wait for the vmachine to stop */
+ tf_vmach_get_current()->num_user_fibers--;
+ while (self->return_context != NULL)
+ tf_fiber_schedule();
+
+ /* And clean up */
+ tf_uctx_destroy(vmach->startup_fiber.context);
+ free(vmach);
+}