From 4db830052d941d9c6de281bc9a2f6ac212c59ad8 Mon Sep 17 00:00:00 2001 From: Timo Teras Date: Wed, 25 Nov 2009 16:53:02 +0200 Subject: libtf: minor changes and new test case for network i/o fixup the internals a bit. --- include/libtf/fiber.h | 19 +++++++++++----- include/libtf/io.h | 6 ++++-- src/fiber.c | 11 ++++++++-- src/io-unix.c | 54 +++++++++++++++++++++++++++++++++++++++++++--- test/TFbuild | 2 +- test/httpget.c | 60 +++++++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 139 insertions(+), 13 deletions(-) create mode 100644 test/httpget.c diff --git a/include/libtf/fiber.h b/include/libtf/fiber.h index 36c2812..c3d87c6 100644 --- a/include/libtf/fiber.h +++ b/include/libtf/fiber.h @@ -23,10 +23,10 @@ /* Fiber wakeup reasons */ #define TF_WAKEUP_NONE 0 -#define TF_WAKEUP_IMMEDIATE EAGAIN -#define TF_WAKEUP_KILL EINTR -#define TF_WAKEUP_TIMEOUT ETIMEDOUT -#define TF_WAKEUP_FD EIO +#define TF_WAKEUP_IMMEDIATE -EAGAIN +#define TF_WAKEUP_KILL -EINTR +#define TF_WAKEUP_TIMEOUT -ETIMEDOUT +#define TF_WAKEUP_FD -EIO /* Special timeouts for tf_schedule() */ #define TF_NO_TIMEOUT -1 @@ -50,6 +50,11 @@ struct tf_scheduler { }; +struct tf_main_ctx { + int argc; + char ** argv; +}; + static inline struct tf_scheduler *tf_get_scheduler(void) { @@ -71,7 +76,11 @@ tf_mtime_t tf_mtime(void) /* Fiber creation */ typedef void (*tf_fiber_proc)(void *fiber); -int tf_main(tf_fiber_proc fiber_main); +int tf_main_args(tf_fiber_proc fiber_main, int argc, char **argv); +static inline int tf_main(tf_fiber_proc fiber_main) +{ + return tf_main_args(fiber_main, 0, NULL); +} void *tf_fiber_create(tf_fiber_proc fiber_main, int private_size); void *tf_fiber_get(void *data); diff --git a/include/libtf/io.h b/include/libtf/io.h index 8f5b54d..38dd541 100644 --- a/include/libtf/io.h +++ b/include/libtf/io.h @@ -52,8 +52,10 @@ void tf_poll_close(void); int tf_open_fd(struct tf_fd *fd, int kfd, int flags); int tf_open(struct tf_fd *fd, const char *pathname, int flags); int tf_close(struct tf_fd *fd); -int tf_read(struct tf_fd *fd, void *buf, size_t count, tf_mtime_diff_t timeout); -int tf_write(struct tf_fd *fd, const void *buf, size_t count, tf_mtime_diff_t timeout); +ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count, tf_mtime_diff_t timeout); +ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count, tf_mtime_diff_t timeout); +int tf_read_fully(struct tf_fd *fd, void *buf, size_t count, tf_mtime_diff_t timeout); +int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count, tf_mtime_diff_t timeout); int tf_socket(struct tf_fd *fd, int domain, int type, int protocol); int tf_bind(struct tf_fd *fd, const struct tf_sockaddr *addr); diff --git a/src/fiber.c b/src/fiber.c index aa83fa8..b678842 100644 --- a/src/fiber.c +++ b/src/fiber.c @@ -123,20 +123,27 @@ static void process_runq(struct tf_scheduler *sched) } } -int tf_main(tf_fiber_proc main_fiber) +int tf_main_args(tf_fiber_proc main_fiber, int argc, char **argv) { struct tf_uctx *ctx = alloca(sizeof(struct tf_uctx) + sizeof(struct tf_scheduler)); struct tf_scheduler *sched = (struct tf_scheduler*) ctx->fiber.data; + struct tf_main_ctx *mainctx; int stack_guard = STACK_GUARD; ctx->stack_guard = &stack_guard; *sched = (struct tf_scheduler){ .run_q = TF_LIST_HEAD_INITIALIZER(sched->run_q), }; + __tf_scheduler = sched; tf_poll_init(); update_time(sched); - tf_fiber_put(tf_fiber_create(main_fiber, 0)); + + mainctx = tf_fiber_create(main_fiber, sizeof(struct tf_main_ctx)); + mainctx->argc = argc; + mainctx->argv = argv; + tf_fiber_put(mainctx); + do { tf_mtime_diff_t timeout; diff --git a/src/io-unix.c b/src/io-unix.c index cc1bd1b..8ed8e42 100644 --- a/src/io-unix.c +++ b/src/io-unix.c @@ -90,7 +90,8 @@ int tf_close(struct tf_fd *fd) return 0; } -int tf_read(struct tf_fd *fd, void *buf, size_t count, tf_mtime_diff_t timeout) +int tf_read_fully(struct tf_fd *fd, void *buf, size_t count, + tf_mtime_diff_t timeout) { ssize_t n; int r; @@ -127,8 +128,8 @@ int tf_read(struct tf_fd *fd, void *buf, size_t count, tf_mtime_diff_t timeout) return -r; } -int tf_write(struct tf_fd *fd, const void *buf, size_t count, - tf_mtime_diff_t timeout) +int tf_write_fully(struct tf_fd *fd, const void *buf, size_t count, + tf_mtime_diff_t timeout) { ssize_t n; int r; @@ -162,6 +163,53 @@ int tf_write(struct tf_fd *fd, const void *buf, size_t count, return -r; } +ssize_t tf_read(struct tf_fd *fd, void *buf, size_t count, tf_mtime_diff_t timeout) +{ + ssize_t n; + + tf_fd_monitor(fd, EPOLLIN); + do { + n = read(fd->fd, buf, count); + if (n >= 0) + break; + if (errno == EINTR) + continue; + if (errno != EAGAIN) { + n = -errno; + break; + } + n = tf_schedule(timeout); + timeout = TF_NO_TIMEOUT_CHANGE; + } while (n == TF_WAKEUP_FD); + tf_fd_unmonitor(fd); + + return n; +} + +ssize_t tf_write(struct tf_fd *fd, const void *buf, size_t count, + tf_mtime_diff_t timeout) +{ + ssize_t n; + + tf_fd_monitor(fd, EPOLLOUT); + do { + n = write(fd->fd, buf, count); + if (n >= 0) + break; + if (errno == EINTR) + continue; + if (errno != EAGAIN) { + n = -errno; + break; + } + n = tf_schedule(timeout); + timeout = TF_NO_TIMEOUT_CHANGE; + } while (n == TF_WAKEUP_FD); + tf_fd_unmonitor(fd); + + return n; +} + int tf_socket(struct tf_fd *fd, int domain, int type, int protocol) { int kfd, tfdf, on = 1; diff --git a/test/TFbuild b/test/TFbuild index 430e132..19384d2 100644 --- a/test/TFbuild +++ b/test/TFbuild @@ -1,3 +1,3 @@ -progs-$(TEST) += simple1 sleep read +progs-$(TEST) += simple1 sleep read httpget LIBS += $(objtree)src/libtf.a diff --git a/test/httpget.c b/test/httpget.c new file mode 100644 index 0000000..c1e37a3 --- /dev/null +++ b/test/httpget.c @@ -0,0 +1,60 @@ +#include + +#include +#include +#include +#include + +struct ctx { + char *hostname; +}; + +static void ping_fiber(void *ptr) +{ + struct ctx *ctx = (struct ctx*) ptr; + struct tf_sockaddr host; + struct tf_fd fd; + char buf[128]; + int bytes = 0, r = 0; + const char *req = "GET / HTTP/1.0\r\n\r\n"; + + printf("Lookup %s\n", ctx->hostname); + host.u.in.sin_family = AF_INET; + host.u.in.sin_addr.s_addr = inet_addr(ctx->hostname); + host.u.in.sin_port = htons(80); + + if (tf_socket(&fd, AF_INET, SOCK_STREAM, 0) < 0) + goto err; + + if ((r = tf_connect(&fd, &host, 10000)) < 0) + goto err_close; + + if ((r = tf_write_fully(&fd, req, strlen(req), 10000)) < 0) + goto err_close; + + while ((r = tf_read(&fd, buf, sizeof(buf), 10000)) > 0) + bytes += r; +err_close: + tf_close(&fd); +err: + printf("Host: %s, received %d bytes (r=%d, %s)\n", + ctx->hostname, bytes, -r, strerror(-r)); +} + +static void init_fiber(void *ptr) +{ + struct tf_main_ctx *ctx = (struct tf_main_ctx *) ptr; + struct ctx *c; + int i; + + for (i = 1; i < ctx->argc; i++) { + c = tf_fiber_create(ping_fiber, sizeof(struct ctx)); + c->hostname = ctx->argv[i]; + tf_fiber_put(c); + } +} + +int main(int argc, char **argv) +{ + return tf_main_args(init_fiber, argc, argv); +} -- cgit v1.2.3