summaryrefslogtreecommitdiffstats
path: root/src/io-unix.c
diff options
context:
space:
mode:
authorTimo Teräs <timo.teras@iki.fi>2010-07-02 20:23:07 +0300
committerTimo Teräs <timo.teras@iki.fi>2010-07-02 20:25:47 +0300
commit23b95bf1a15322c2f471b80c06cb65d9b2d2a282 (patch)
tree9bf12231db9591852e3b42ca24715d2cbaf6267b /src/io-unix.c
parent0183e33d9a4759764716e771b85e19f7a997b8bd (diff)
downloadlibtf-master.tar.bz2
libtf-master.tar.xz
libtf: major redesign startedHEADmaster
the idea is to make libtf completely multi-threaded. meaning each fiber can be running concurrently in separate thread. quite a bit of framework is added for this and some atomic helpers are already introduced. however, io polling is busy polling now (will be soon in own thread) and timeouts are still more or less broken. oh, and the multithreading core is not there yet. basically we are currently mostly broken ;)
Diffstat (limited to 'src/io-unix.c')
-rw-r--r--src/io-unix.c87
1 files changed, 29 insertions, 58 deletions
diff --git a/src/io-unix.c b/src/io-unix.c
index d80592a..7cd4fd0 100644
--- a/src/io-unix.c
+++ b/src/io-unix.c
@@ -17,10 +17,8 @@
#include <string.h>
#include <libtf/io.h>
-#include <libtf/scheduler.h>
-
-#define TF_POLL_HOOKS \
-struct tf_poll_hooks *poller = tf_scheduler_get_current()->poller;
+#include <libtf/fiber.h>
+#include <libtf/vmach.h>
static inline int tf_sockaddr_len(const struct tf_sockaddr *addr)
{
@@ -38,14 +36,14 @@ static inline int tf_sockaddr_len(const struct tf_sockaddr *addr)
int tf_open_fd(struct tf_fd *fd, int kfd, int flags)
{
- TF_POLL_HOOKS;
+ struct tf_vmach *vm = tf_vmach_get_current();
int r;
fd->fd = kfd;
fd->flags = flags;
- fd->waiting_fiber = NULL;
+ fd->fiber = tf_vmach_get_current_fiber();
- r = poller->fd_created(fd);
+ r = vm->poll_ops->fd_created(vm->poll_fiber, fd);
if (r < 0) {
if (flags & TF_FD_AUTOCLOSE)
close(kfd);
@@ -88,10 +86,10 @@ int tf_open(struct tf_fd *fd, const char *pathname, int flags)
int tf_close(struct tf_fd *fd)
{
- TF_POLL_HOOKS;
+ struct tf_vmach *vm = tf_vmach_get_current();
int r;
- poller->fd_destroyed(fd);
+ vm->poll_ops->fd_destroyed(vm->poll_fiber, fd);
if (fd->flags & TF_FD_AUTOCLOSE) {
r = close(fd->fd);
if (unlikely(r == -1))
@@ -104,11 +102,9 @@ int tf_close(struct tf_fd *fd)
int tf_read_fully(struct tf_fd *fd, void *buf, size_t count)
{
- TF_POLL_HOOKS;
ssize_t n;
- int r;
+ int r = 0;
- poller->fd_monitor(fd, TF_POLL_READ);
do {
n = read(fd->fd, buf, count);
if (n == count) {
@@ -132,20 +128,17 @@ int tf_read_fully(struct tf_fd *fd, void *buf, size_t count)
continue;
}
- r = __tf_fiber_schedule();
- } while (r == TF_WAKEUP_FD);
- poller->fd_unmonitor(fd);
+ r = tf_fiber_schedule();
+ } while (r != 0);
return -r;
}
int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count)
{
- TF_POLL_HOOKS;
ssize_t n;
- int r;
+ int r = 0;
- poller->fd_monitor(fd, TF_POLL_WRITE);
do {
n = write(fd->fd, buf, count);
if (n == count) {
@@ -166,19 +159,16 @@ int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count)
continue;
}
- r = __tf_fiber_schedule();
- } while (r == TF_WAKEUP_FD);
- poller->fd_unmonitor(fd);
+ r = tf_fiber_schedule();
+ } while (r != 0);
return -r;
}
ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count)
{
- TF_POLL_HOOKS;
ssize_t n;
- poller->fd_monitor(fd, TF_POLL_READ);
do {
n = read(fd->fd, buf, count);
if (n >= 0)
@@ -189,19 +179,16 @@ ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count)
n = -errno;
break;
}
- n = __tf_fiber_schedule();
- } while (n == TF_WAKEUP_FD);
- poller->fd_unmonitor(fd);
+ n = tf_fiber_schedule();
+ } while (n != 0);
return n;
}
ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count)
{
- TF_POLL_HOOKS;
ssize_t n;
- poller->fd_monitor(fd, TF_POLL_WRITE);
do {
n = write(fd->fd, buf, count);
if (n >= 0)
@@ -212,9 +199,8 @@ ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count)
n = -errno;
break;
}
- n = __tf_fiber_schedule();
- } while (n == TF_WAKEUP_FD);
- poller->fd_unmonitor(fd);
+ n = tf_fiber_schedule();
+ } while (n != 0);
return n;
}
@@ -261,7 +247,6 @@ int tf_listen(struct tf_fd *fd, int backlog)
int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd,
struct tf_sockaddr *from)
{
- TF_POLL_HOOKS;
int r, tfdf;
struct sockaddr *addr = NULL;
socklen_t al, *pal = NULL;
@@ -272,24 +257,19 @@ int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd,
pal = &al;
}
- tfdf = TF_FD_AUTOCLOSE | (listen_fd->flags & TF_FD_STREAM_ORIENTED);
- tfdf |= TF_FD_SET_CLOEXEC;
+ tfdf = TF_FD_AUTOCLOSE | TF_FD_ALREADY_NONBLOCKING |
+ (listen_fd->flags & TF_FD_STREAM_ORIENTED);
- poller->fd_monitor(listen_fd, TF_POLL_READ);
do {
- /* FIXME: use accept4 if available */
- r = accept(listen_fd->fd, addr, pal);
+ r = accept4(listen_fd->fd, addr, pal, SOCK_NONBLOCK|SOCK_CLOEXEC);
if (r >= 0)
break;
if (errno == EINTR)
continue;
- if (errno != EAGAIN) {
- poller->fd_unmonitor(listen_fd);
+ if (errno != EAGAIN)
return -errno;
- }
- r = __tf_fiber_schedule();
- } while (r == TF_WAKEUP_FD);
- poller->fd_unmonitor(listen_fd);
+ r = tf_fiber_schedule();
+ } while (r != 0);
if (r < 0)
return r;
@@ -298,7 +278,6 @@ int tf_accept(struct tf_fd *listen_fd, struct tf_fd *child_fd,
int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to)
{
- TF_POLL_HOOKS;
socklen_t l = sizeof(int);
int r, err;
@@ -310,10 +289,8 @@ int tf_connect(struct tf_fd *fd, const struct tf_sockaddr *to)
return -errno;
/* Wait for socket to become readable */
- poller->fd_monitor(fd, TF_POLL_WRITE);
- r = __tf_fiber_schedule();
- poller->fd_unmonitor(fd);
- if (r != TF_WAKEUP_FD)
+ r = tf_fiber_schedule();
+ if (r != 0)
return r;
/* Check for error */
@@ -327,7 +304,6 @@ ssize_t tf_recvmsg(struct tf_fd *fd,
struct tf_sockaddr *to,
void *buf, size_t len)
{
- TF_POLL_HOOKS;
struct iovec iov;
struct msghdr msg;
struct cmsghdr *cmsg;
@@ -347,7 +323,6 @@ ssize_t tf_recvmsg(struct tf_fd *fd,
.msg_iovlen = 1,
};
- poller->fd_monitor(fd, TF_POLL_READ);
do {
r = recvmsg(fd->fd, &msg, 0);
if (r >= 0)
@@ -356,9 +331,8 @@ ssize_t tf_recvmsg(struct tf_fd *fd,
r = -errno;
break;
}
- r = __tf_fiber_schedule();
- } while (r == TF_WAKEUP_FD);
- poller->fd_unmonitor(fd);
+ r = tf_fiber_schedule();
+ } while (r != 0);
if (r < 0 || to == NULL)
return r;
@@ -382,7 +356,6 @@ ssize_t tf_sendmsg(struct tf_fd *fd,
const struct tf_sockaddr *to,
const void *buf, size_t len)
{
- TF_POLL_HOOKS;
struct msghdr msg;
struct iovec iov;
struct {
@@ -411,7 +384,6 @@ ssize_t tf_sendmsg(struct tf_fd *fd,
msg.msg_controllen = sizeof(cmsg);
}
- poller->fd_monitor(fd, TF_POLL_WRITE);
do {
r = sendmsg(fd->fd, &msg, 0);
if (r >= 0)
@@ -420,9 +392,8 @@ ssize_t tf_sendmsg(struct tf_fd *fd,
r = -errno;
break;
}
- r = __tf_fiber_schedule();
- } while (r == TF_WAKEUP_FD);
- poller->fd_unmonitor(fd);
+ r = tf_fiber_schedule();
+ } while (r != 0);
return r;
}