From cdffdf5d61f8330cfc3467e73a84876eb3928e9b Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Sun, 3 Aug 2014 09:55:07 +0930 Subject: [PATCH] ccan/io: rewrite. I found it difficult to use myself, particularly io_duplex(). So this removes that, as well as timers and debug (for the moment). API changes: 1) An io_plan is passed by pointer, rather than copied on the stack. 3) All io_plans are generated using the struct io_conn. 3) tal is the allocator. 4) A new connection must be set up with a callback, so this is now the same as one generated from a listener. 5) io_read_partial and io_write_partial take an explicit length. 6) io_always() and io_wait() take an explit in/out arg. 7) io_break() does not return a plan. Signed-off-by: Rusty Russell --- ccan/io/_info | 43 +- ccan/io/backend.h | 80 +-- ccan/io/io.c | 532 +++++++----------- ccan/io/io.h | 511 ++++++++--------- ccan/io/io_plan.h | 149 ++--- ccan/io/poll.c | 376 ++++--------- ccan/io/test/run-01-start-finish-DEBUG.c | 8 - ccan/io/test/run-01-start-finish.c | 12 +- ccan/io/test/run-02-read-DEBUG.c | 8 - ccan/io/test/run-02-read.c | 11 +- ccan/io/test/run-03-readpartial-DEBUG.c | 8 - ccan/io/test/run-03-readpartial.c | 14 +- ccan/io/test/run-04-writepartial-DEBUG.c | 8 - ccan/io/test/run-04-writepartial.c | 13 +- ccan/io/test/run-05-write-DEBUG.c | 8 - ccan/io/test/run-05-write.c | 11 +- ccan/io/test/run-06-idle-DEBUG.c | 8 - ccan/io/test/run-06-idle.c | 31 +- ccan/io/test/run-07-break-DEBUG.c | 8 - ccan/io/test/run-07-break.c | 16 +- ccan/io/test/run-08-hangup-on-idle-DEBUG.c | 7 - ccan/io/test/run-08-hangup-on-idle.c | 42 +- ccan/io/test/run-08-read-after-hangup-DEBUG.c | 7 - ccan/io/test/run-08-read-after-hangup.c | 22 +- ccan/io/test/run-09-connect-DEBUG.c | 8 - ccan/io/test/run-09-connect.c | 29 +- ccan/io/test/run-10-many-DEBUG.c | 12 - ccan/io/test/run-10-many.c | 41 +- ccan/io/test/run-12-bidir-DEBUG.c | 8 - ccan/io/test/run-12-bidir.c | 11 +- ccan/io/test/run-13-all-idle-DEBUG.c | 8 - ccan/io/test/run-13-all-idle.c | 7 +- ccan/io/test/run-14-duplex-both-read-DEBUG.c | 8 - ccan/io/test/run-14-duplex-both-read.c | 7 + ccan/io/test/run-15-timeout-DEBUG.c | 8 - ccan/io/test/run-15-timeout.c | 9 +- ccan/io/test/run-16-duplex-test.c | 7 + ccan/io/test/run-17-homemade-io-DEBUG.c | 8 - ccan/io/test/run-17-homemade-io.c | 38 +- ccan/io/test/run-18-errno-DEBUG.c | 8 - ccan/io/test/run-18-errno.c | 17 +- ccan/io/test/run-19-always-DEBUG.c | 8 - ccan/io/test/run-19-always.c | 15 +- ccan/io/test/run-set_alloc.c | 240 -------- 44 files changed, 831 insertions(+), 1599 deletions(-) delete mode 100644 ccan/io/test/run-01-start-finish-DEBUG.c delete mode 100644 ccan/io/test/run-02-read-DEBUG.c delete mode 100644 ccan/io/test/run-03-readpartial-DEBUG.c delete mode 100644 ccan/io/test/run-04-writepartial-DEBUG.c delete mode 100644 ccan/io/test/run-05-write-DEBUG.c delete mode 100644 ccan/io/test/run-06-idle-DEBUG.c delete mode 100644 ccan/io/test/run-07-break-DEBUG.c delete mode 100644 ccan/io/test/run-08-hangup-on-idle-DEBUG.c delete mode 100644 ccan/io/test/run-08-read-after-hangup-DEBUG.c delete mode 100644 ccan/io/test/run-09-connect-DEBUG.c delete mode 100644 ccan/io/test/run-10-many-DEBUG.c delete mode 100644 ccan/io/test/run-12-bidir-DEBUG.c delete mode 100644 ccan/io/test/run-13-all-idle-DEBUG.c delete mode 100644 ccan/io/test/run-14-duplex-both-read-DEBUG.c delete mode 100644 ccan/io/test/run-15-timeout-DEBUG.c delete mode 100644 ccan/io/test/run-17-homemade-io-DEBUG.c delete mode 100644 ccan/io/test/run-18-errno-DEBUG.c delete mode 100644 ccan/io/test/run-19-always-DEBUG.c delete mode 100644 ccan/io/test/run-set_alloc.c diff --git a/ccan/io/_info b/ccan/io/_info index 0ba46a63..6dd633ab 100644 --- a/ccan/io/_info +++ b/ccan/io/_info @@ -10,12 +10,6 @@ * (eg. read, write). It is also possible to write custom I/O * plans. * - * When compiled with DEBUG, control flow is changed so that rather - * than returning to the main io_loop(), plans are executed sequentially - * providing a backtrace showing what has occurred on that connection. - * Which connection(s) do this depends on the user-specified io_debug - * function. - * * Example: * // Given tr A-Z a-z outputs tr a-z a-z * #include @@ -41,7 +35,7 @@ * io_wake(b); * } * - * static struct io_plan read_in(struct io_conn *c, struct buffer *b) + * static struct io_plan *read_in(struct io_conn *c, struct buffer *b) * { * // Add what we just read. * b->end += b->rlen; @@ -55,35 +49,33 @@ * if (b->start == b->end) * b->start = b->end = 0; * - * // Read in some of the rest. - * b->rlen = sizeof(b->buf) - b->end; - * * // No room? Wait for writer - * if (b->rlen == 0) - * return io_wait(b, read_in, b); + * if (b->end == sizeof(b->buf)) + * return io_wait(c, b, IO_IN, read_in, b); * - * return io_read_partial(b->buf + b->end, &b->rlen, read_in, b); + * return io_read_partial(c, b->buf + b->end, sizeof(b->buf) - b->end, + * &b->rlen, read_in, b); * } * - * static struct io_plan write_out(struct io_conn *c, struct buffer *b) + * static struct io_plan *write_out(struct io_conn *c, struct buffer *b) * { * // Remove what we just wrote. * b->start += b->wlen; * assert(b->start <= sizeof(b->buf)); * - * // If we wrote somthing, wake writer. + * // If we wrote something, wake writer. * if (b->wlen != 0) * io_wake(b); * - * b->wlen = b->end - b->start; * // Nothing to write? Wait for reader. - * if (b->wlen == 0) { + * if (b->end == b->start) { * if (b->finished) - * return io_close(); - * return io_wait(b, write_out, b); + * return io_close(c); + * return io_wait(c, b, IO_OUT, write_out, b); * } * - * return io_write_partial(b->buf + b->start, &b->wlen, write_out, b); + * return io_write_partial(c, b->buf + b->start, b->end - b->start, + * &b->wlen, write_out, b); * } * * // Feed a program our stdin, gather its stdout, print that at end. @@ -117,14 +109,14 @@ * * // Read from stdin, write to child. * memset(&to, 0, sizeof(to)); - * reader = io_new_conn(STDIN_FILENO, read_in(NULL, &to)); + * reader = io_new_conn(NULL, STDIN_FILENO, read_in, &to); * io_set_finish(reader, finish, &to); - * io_new_conn(tochild[1], write_out(NULL, &to)); + * io_new_conn(NULL, tochild[1], write_out, &to); * * // Read from child, write to stdout. - * reader = io_new_conn(fromchild[0], read_in(NULL, &from)); + * reader = io_new_conn(NULL, fromchild[0], read_in, &from); * io_set_finish(reader, finish, &from); - * io_new_conn(STDOUT_FILENO, write_out(NULL, &from)); + * io_new_conn(NULL, STDOUT_FILENO, write_out, &from); * * io_loop(); * wait(&status); @@ -141,9 +133,8 @@ int main(int argc, char *argv[]) return 1; if (strcmp(argv[1], "depends") == 0) { + printf("ccan/tal\n"); printf("ccan/typesafe_cb\n"); - printf("ccan/time\n"); - printf("ccan/timer\n"); return 0; } diff --git a/ccan/io/backend.h b/ccan/io/backend.h index b59b9fa3..44e64ab1 100644 --- a/ccan/io/backend.h +++ b/ccan/io/backend.h @@ -2,18 +2,8 @@ #ifndef CCAN_IO_BACKEND_H #define CCAN_IO_BACKEND_H #include -#include #include - -/* A setting for actions to always run (eg. zero-length reads). */ -#define POLLALWAYS (((POLLIN|POLLOUT) + 1) & ~((POLLIN|POLLOUT))) - -struct io_alloc { - void *(*alloc)(size_t size); - void *(*realloc)(void *ptr, size_t size); - void (*free)(void *ptr); -}; -extern struct io_alloc io_alloc; +#include "io_plan.h" struct fd { int fd; @@ -25,77 +15,43 @@ struct fd { struct io_listener { struct fd fd; + const tal_t *ctx; + /* These are for connections we create. */ - void (*init)(int fd, void *arg); + struct io_plan *(*init)(struct io_conn *conn, void *arg); void *arg; }; -struct io_timeout { - struct timer timer; - struct io_conn *conn; - - struct io_plan (*next)(struct io_conn *, void *arg); - void *next_arg; -}; - /* One connection per client. */ struct io_conn { struct fd fd; + /* always or closing list. */ + struct io_conn *list; + void (*finish)(struct io_conn *, void *arg); void *finish_arg; - struct io_conn *duplex; - struct io_timeout *timeout; - - struct io_plan plan; + struct io_plan plan[2]; }; -static inline bool timeout_active(const struct io_conn *conn) -{ - return conn->timeout && conn->timeout->conn; -} - extern void *io_loop_return; -#ifdef DEBUG -extern struct io_conn *current; -static inline void set_current(struct io_conn *conn) -{ - current = conn; -} -static inline bool doing_debug_on(struct io_conn *conn) -{ - return io_debug_conn && io_debug_conn(conn); -} -static inline bool doing_debug(void) -{ - return io_debug_conn; -} -#else -static inline void set_current(struct io_conn *conn) -{ -} -static inline bool doing_debug_on(struct io_conn *conn) -{ - return false; -} -static inline bool doing_debug(void) -{ - return false; -} -#endif - bool add_listener(struct io_listener *l); bool add_conn(struct io_conn *c); bool add_duplex(struct io_conn *c); void del_listener(struct io_listener *l); -void backend_plan_changed(struct io_conn *conn); -void backend_wait_changed(const void *wait); -void backend_add_timeout(struct io_conn *conn, struct timerel duration); -void backend_del_timeout(struct io_conn *conn); +void backend_new_closing(struct io_conn *conn); +void backend_new_always(struct io_conn *conn); +void backend_new_plan(struct io_conn *conn); + +void backend_plan_done(struct io_conn *conn); + +void backend_wake(const void *wait); void backend_del_conn(struct io_conn *conn); -void io_ready(struct io_conn *conn); +void io_ready(struct io_conn *conn, int pollflags); +void io_do_always(struct io_conn *conn); +void io_do_wakeup(struct io_conn *conn, struct io_plan *plan); void *do_io_loop(struct io_conn **ready); #endif /* CCAN_IO_BACKEND_H */ diff --git a/ccan/io/io.c b/ccan/io/io.c index 8e269529..52f4368c 100644 --- a/ccan/io/io.c +++ b/ccan/io/io.c @@ -13,112 +13,12 @@ void *io_loop_return; -struct io_alloc io_alloc = { - malloc, realloc, free -}; - -#ifdef DEBUG -/* Set to skip the next plan. */ -bool io_plan_nodebug; -/* The current connection to apply plan to. */ -struct io_conn *current; -/* User-defined function to select which connection(s) to debug. */ -bool (*io_debug_conn)(struct io_conn *conn); - -struct io_plan io_debug(struct io_plan plan) -{ - struct io_conn *ready = NULL; - - if (io_plan_nodebug) { - io_plan_nodebug = false; - return plan; - } - - if (!current || !doing_debug_on(current)) - return plan; - - current->plan = plan; - backend_plan_changed(current); - - /* Call back into the loop immediately. */ - io_loop_return = do_io_loop(&ready); - - if (ready) { - set_current(ready); - if (!ready->plan.next) { - /* Call finish function immediately. */ - if (ready->finish) { - errno = ready->plan.u1.s; - ready->finish(ready, ready->finish_arg); - ready->finish = NULL; - } - backend_del_conn(ready); - } else { - /* Calls back in itself, via io_debug_io(). */ - if (ready->plan.io(ready->fd.fd, &ready->plan) != 2) - abort(); - } - set_current(NULL); - } - - /* Return a do-nothing plan, so backend_plan_changed in - * io_ready doesn't do anything (it's already been called). */ - return io_wait_(NULL, (void *)1, NULL); -} - -int io_debug_io(int ret) -{ - /* Cache it for debugging; current changes. */ - struct io_conn *conn = current; - int saved_errno = errno; - - if (!doing_debug_on(conn)) - return ret; - - /* These will all go linearly through the io_debug() path above. */ - switch (ret) { - case -1: - /* This will call io_debug above. */ - errno = saved_errno; - io_close(); - break; - case 0: /* Keep going with plan. */ - io_debug(conn->plan); - break; - case 1: /* Done: get next plan. */ - if (timeout_active(conn)) - backend_del_timeout(conn); - /* In case they call io_duplex, clear our poll flags so - * both sides don't seem to be both doing read or write - * (See assert(!mask || pfd->events != mask) in poll.c) */ - conn->plan.pollflag = 0; - conn->plan.next(conn, conn->plan.next_arg); - break; - default: - abort(); - } - - /* Normally-invalid value, used for sanity check. */ - return 2; -} - -/* Counterpart to io_plan_no_debug(), called in macros in io.h */ -static void io_plan_debug_again(void) -{ - io_plan_nodebug = false; -} -#else -static void io_plan_debug_again(void) -{ -} -#endif - -struct io_listener *io_new_listener_(int fd, - void (*init)(int fd, void *arg), +struct io_listener *io_new_listener_(const tal_t *ctx, int fd, + struct io_plan *(*init)(struct io_conn *, + void *), void *arg) { - struct io_listener *l = io_alloc.alloc(sizeof(*l)); - + struct io_listener *l = tal(ctx, struct io_listener); if (!l) return NULL; @@ -126,10 +26,9 @@ struct io_listener *io_new_listener_(int fd, l->fd.fd = fd; l->init = init; l->arg = arg; - if (!add_listener(l)) { - io_alloc.free(l); - return NULL; - } + l->ctx = ctx; + if (!add_listener(l)) + return tal_free(l); return l; } @@ -137,29 +36,60 @@ void io_close_listener(struct io_listener *l) { close(l->fd.fd); del_listener(l); - io_alloc.free(l); + tal_free(l); } -struct io_conn *io_new_conn_(int fd, struct io_plan plan) +static struct io_plan *io_never_called(struct io_conn *conn, void *arg) { - struct io_conn *conn = io_alloc.alloc(sizeof(*conn)); + abort(); +} + +static void next_plan(struct io_conn *conn, struct io_plan *plan) +{ + struct io_plan *(*next)(struct io_conn *, void *arg); + + next = plan->next; - io_plan_debug_again(); + plan->status = IO_UNSET; + plan->io = NULL; + plan->next = io_never_called; + + plan = next(conn, plan->next_arg); + + /* It should have set a plan inside this conn. */ + assert(plan == &conn->plan[IO_IN] + || plan == &conn->plan[IO_OUT]); + assert(conn->plan[IO_IN].status != IO_UNSET + || conn->plan[IO_OUT].status != IO_UNSET); + + backend_new_plan(conn); +} + +struct io_conn *io_new_conn_(const tal_t *ctx, int fd, + struct io_plan *(*init)(struct io_conn *, void *), + void *arg) +{ + struct io_conn *conn = tal(ctx, struct io_conn); if (!conn) return NULL; conn->fd.listener = false; conn->fd.fd = fd; - conn->plan = plan; conn->finish = NULL; conn->finish_arg = NULL; - conn->duplex = NULL; - conn->timeout = NULL; - if (!add_conn(conn)) { - io_alloc.free(conn); - return NULL; - } + conn->list = NULL; + + if (!add_conn(conn)) + return tal_free(conn); + + /* We start with out doing nothing, and in doing our init. */ + conn->plan[IO_OUT].status = IO_UNSET; + + conn->plan[IO_IN].next = init; + conn->plan[IO_IN].next_arg = arg; + next_plan(conn, &conn->plan[IO_IN]); + return conn; } @@ -171,101 +101,69 @@ void io_set_finish_(struct io_conn *conn, conn->finish_arg = arg; } -struct io_conn *io_duplex_(struct io_conn *old, struct io_plan plan) +struct io_plan *io_get_plan(struct io_conn *conn, enum io_direction dir) { - struct io_conn *conn; - - io_plan_debug_again(); + assert(conn->plan[dir].status == IO_UNSET); - assert(!old->duplex); - - conn = io_alloc.alloc(sizeof(*conn)); - if (!conn) - return NULL; - - conn->fd.listener = false; - conn->fd.fd = old->fd.fd; - conn->plan = plan; - conn->duplex = old; - conn->finish = NULL; - conn->finish_arg = NULL; - conn->timeout = NULL; - if (!add_duplex(conn)) { - io_alloc.free(conn); - return NULL; - } - old->duplex = conn; - return conn; + conn->plan[dir].status = IO_POLLING; + return &conn->plan[dir]; } -bool io_timeout_(struct io_conn *conn, struct timerel t, - struct io_plan (*cb)(struct io_conn *, void *), void *arg) +static struct io_plan *set_always(struct io_conn *conn, + struct io_plan *plan, + struct io_plan *(*next)(struct io_conn *, + void *), + void *arg) { - assert(cb); - - if (!conn->timeout) { - conn->timeout = io_alloc.alloc(sizeof(*conn->timeout)); - if (!conn->timeout) - return false; - } else - assert(!timeout_active(conn)); - - conn->timeout->next = cb; - conn->timeout->next_arg = arg; - backend_add_timeout(conn, t); - return true; -} + plan->next = next; + plan->next_arg = arg; + plan->status = IO_ALWAYS; -/* Always done: call the next thing. */ -static int do_always(int fd, struct io_plan *plan) -{ - return 1; + backend_new_always(conn); + return plan; } -struct io_plan io_always_(struct io_plan (*cb)(struct io_conn *, void *), - void *arg) +struct io_plan *io_always_(struct io_conn *conn, + enum io_direction dir, + struct io_plan *(*next)(struct io_conn *, void *), + void *arg) { - struct io_plan plan; + struct io_plan *plan = io_get_plan(conn, dir); - assert(cb); - plan.io = do_always; - plan.next = cb; - plan.next_arg = arg; - plan.pollflag = POLLALWAYS; + assert(next); + set_always(conn, plan, next, arg); return plan; } -/* Returns true if we're finished. */ static int do_write(int fd, struct io_plan *plan) { ssize_t ret = write(fd, plan->u1.cp, plan->u2.s); if (ret < 0) - return io_debug_io(-1); + return -1; plan->u1.cp += ret; plan->u2.s -= ret; - return io_debug_io(plan->u2.s == 0); + return plan->u2.s == 0; } /* Queue some data to be written. */ -struct io_plan io_write_(const void *data, size_t len, - struct io_plan (*cb)(struct io_conn *, void *), - void *arg) +struct io_plan *io_write_(struct io_conn *conn, const void *data, size_t len, + struct io_plan *(*next)(struct io_conn *, void *), + void *arg) { - struct io_plan plan; + struct io_plan *plan = io_get_plan(conn, IO_OUT); - assert(cb); + assert(next); if (len == 0) - return io_always_(cb, arg); + return set_always(conn, plan, next, arg); - plan.u1.const_vp = data; - plan.u2.s = len; - plan.io = do_write; - plan.next = cb; - plan.next_arg = arg; - plan.pollflag = POLLOUT; + plan->u1.const_vp = data; + plan->u2.s = len; + plan->io = do_write; + plan->next = next; + plan->next_arg = arg; return plan; } @@ -274,32 +172,31 @@ static int do_read(int fd, struct io_plan *plan) { ssize_t ret = read(fd, plan->u1.cp, plan->u2.s); if (ret <= 0) - return io_debug_io(-1); + return -1; plan->u1.cp += ret; plan->u2.s -= ret; - return io_debug_io(plan->u2.s == 0); + return plan->u2.s == 0; } /* Queue a request to read into a buffer. */ -struct io_plan io_read_(void *data, size_t len, - struct io_plan (*cb)(struct io_conn *, void *), - void *arg) +struct io_plan *io_read_(struct io_conn *conn, + void *data, size_t len, + struct io_plan *(*next)(struct io_conn *, void *), + void *arg) { - struct io_plan plan; + struct io_plan *plan = io_get_plan(conn, IO_IN); - assert(cb); + assert(next); if (len == 0) - return io_always_(cb, arg); - - plan.u1.cp = data; - plan.u2.s = len; - plan.io = do_read; - plan.next = cb; - plan.next_arg = arg; + return set_always(conn, plan, next, arg); - plan.pollflag = POLLIN; + plan->u1.cp = data; + plan->u2.s = len; + plan->io = do_read; + plan->next = next; + plan->next_arg = arg; return plan; } @@ -308,30 +205,33 @@ static int do_read_partial(int fd, struct io_plan *plan) { ssize_t ret = read(fd, plan->u1.cp, *(size_t *)plan->u2.vp); if (ret <= 0) - return io_debug_io(-1); + return -1; *(size_t *)plan->u2.vp = ret; - return io_debug_io(1); + return 1; } /* Queue a partial request to read into a buffer. */ -struct io_plan io_read_partial_(void *data, size_t *len, - struct io_plan (*cb)(struct io_conn *, void *), - void *arg) +struct io_plan *io_read_partial_(struct io_conn *conn, + void *data, size_t maxlen, size_t *len, + struct io_plan *(*next)(struct io_conn *, + void *), + void *arg) { - struct io_plan plan; + struct io_plan *plan = io_get_plan(conn, IO_IN); - assert(cb); + assert(next); - if (*len == 0) - return io_always_(cb, arg); + if (maxlen == 0) + return set_always(conn, plan, next, arg); - plan.u1.cp = data; - plan.u2.vp = len; - plan.io = do_read_partial; - plan.next = cb; - plan.next_arg = arg; - plan.pollflag = POLLIN; + plan->u1.cp = data; + /* We store the max len in here temporarily. */ + *len = maxlen; + plan->u2.vp = len; + plan->io = do_read_partial; + plan->next = next; + plan->next_arg = arg; return plan; } @@ -340,39 +240,37 @@ static int do_write_partial(int fd, struct io_plan *plan) { ssize_t ret = write(fd, plan->u1.cp, *(size_t *)plan->u2.vp); if (ret < 0) - return io_debug_io(-1); + return -1; *(size_t *)plan->u2.vp = ret; - return io_debug_io(1); + return 1; } /* Queue a partial write request. */ -struct io_plan io_write_partial_(const void *data, size_t *len, - struct io_plan (*cb)(struct io_conn*, void *), - void *arg) +struct io_plan *io_write_partial_(struct io_conn *conn, + const void *data, size_t maxlen, size_t *len, + struct io_plan *(*next)(struct io_conn *, + void*), + void *arg) { - struct io_plan plan; + struct io_plan *plan = io_get_plan(conn, IO_OUT); - assert(cb); + assert(next); - if (*len == 0) - return io_always_(cb, arg); + if (maxlen == 0) + return set_always(conn, plan, next, arg); - plan.u1.const_vp = data; - plan.u2.vp = len; - plan.io = do_write_partial; - plan.next = cb; - plan.next_arg = arg; - plan.pollflag = POLLOUT; + plan->u1.const_vp = data; + /* We store the max len in here temporarily. */ + *len = maxlen; + plan->u2.vp = len; + plan->io = do_write_partial; + plan->next = next; + plan->next_arg = arg; return plan; } -static int already_connected(int fd, struct io_plan *plan) -{ - return io_debug_io(1); -} - static int do_connect(int fd, struct io_plan *plan) { int err, ret; @@ -394,143 +292,135 @@ static int do_connect(int fd, struct io_plan *plan) return -1; } -struct io_plan io_connect_(int fd, const struct addrinfo *addr, - struct io_plan (*cb)(struct io_conn*, void *), - void *arg) +struct io_plan *io_connect_(struct io_conn *conn, const struct addrinfo *addr, + struct io_plan *(*next)(struct io_conn *, void *), + void *arg) { - struct io_plan plan; - - assert(cb); + struct io_plan *plan = io_get_plan(conn, IO_IN); + int fd = io_conn_fd(conn); - plan.next = cb; - plan.next_arg = arg; + assert(next); /* Save old flags, set nonblock if not already. */ - plan.u1.s = fcntl(fd, F_GETFL); - fcntl(fd, F_SETFL, plan.u1.s | O_NONBLOCK); + plan->u1.s = fcntl(fd, F_GETFL); + fcntl(fd, F_SETFL, plan->u1.s | O_NONBLOCK); /* Immediate connect can happen. */ - if (connect(fd, addr->ai_addr, addr->ai_addrlen) == 0) { - /* Dummy will be called immediately. */ - plan.pollflag = POLLOUT; - plan.io = already_connected; - } else { - if (errno != EINPROGRESS) - return io_close_(); - - plan.pollflag = POLLIN; - plan.io = do_connect; - } + if (connect(fd, addr->ai_addr, addr->ai_addrlen) == 0) + return set_always(conn, plan, next, arg); + + if (errno != EINPROGRESS) + return io_close(conn); + + plan->next = next; + plan->next_arg = arg; + plan->io = do_connect; + return plan; } -struct io_plan io_wait_(const void *wait, - struct io_plan (*cb)(struct io_conn *, void*), - void *arg) +struct io_plan *io_wait_(struct io_conn *conn, + const void *wait, enum io_direction dir, + struct io_plan *(*next)(struct io_conn *, void *), + void *arg) { - struct io_plan plan; + struct io_plan *plan = io_get_plan(conn, dir); - assert(cb); - plan.pollflag = 0; - plan.io = NULL; - plan.next = cb; - plan.next_arg = arg; + assert(next); - plan.u1.const_vp = wait; + plan->next = next; + plan->next_arg = arg; + plan->u1.const_vp = wait; + plan->status = IO_WAITING; return plan; } void io_wake(const void *wait) { - backend_wait_changed(wait); + backend_wake(wait); } -void io_ready(struct io_conn *conn) +static void do_plan(struct io_conn *conn, struct io_plan *plan) { - /* Beware io_close_other! */ - if (!conn->plan.next) + /* Someone else might have called io_close() on us. */ + if (plan->status == IO_CLOSING) return; - set_current(conn); - switch (conn->plan.io(conn->fd.fd, &conn->plan)) { - case -1: /* Failure means a new plan: close up. */ - conn->plan = io_close(); - backend_plan_changed(conn); + /* We shouldn't have polled for this event if this wasn't true! */ + assert(plan->status == IO_POLLING); + + switch (plan->io(conn->fd.fd, plan)) { + case -1: + io_close(conn); break; - case 0: /* Keep going with plan. */ + case 0: break; - case 1: /* Done: get next plan. */ - if (timeout_active(conn)) - backend_del_timeout(conn); - /* In case they call io_duplex, clear our poll flags so - * both sides don't seem to be both doing read or write - * (See assert(!mask || pfd->events != mask) in poll.c) */ - conn->plan.pollflag = 0; - conn->plan = conn->plan.next(conn, conn->plan.next_arg); - backend_plan_changed(conn); + case 1: + next_plan(conn, plan); + break; + default: + /* IO should only return -1, 0 or 1 */ + abort(); } - set_current(NULL); } -/* Close the connection, we're done. */ -struct io_plan io_close_(void) +void io_ready(struct io_conn *conn, int pollflags) { - struct io_plan plan; - - plan.pollflag = 0; - /* This means we're closing. */ - plan.next = NULL; - plan.u1.s = errno; + if (pollflags & POLLIN) + do_plan(conn, &conn->plan[IO_IN]); - return plan; + if (pollflags & POLLOUT) + do_plan(conn, &conn->plan[IO_OUT]); } -struct io_plan io_close_cb(struct io_conn *conn, void *arg) +void io_do_always(struct io_conn *conn) { - return io_close(); + if (conn->plan[IO_IN].status == IO_ALWAYS) + next_plan(conn, &conn->plan[IO_IN]); + + if (conn->plan[IO_OUT].status == IO_ALWAYS) + next_plan(conn, &conn->plan[IO_OUT]); } -void io_close_other(struct io_conn *conn) +void io_do_wakeup(struct io_conn *conn, struct io_plan *plan) { - /* Don't close if already closing! */ - if (conn->plan.next) { - conn->plan = io_close_(); - backend_plan_changed(conn); - } + assert(plan->status == IO_WAITING); + next_plan(conn, plan); } -/* Exit the loop, returning this (non-NULL) arg. */ -struct io_plan io_break_(void *ret, struct io_plan plan) +/* Close the connection, we're done. */ +struct io_plan *io_close(struct io_conn *conn) { - io_plan_debug_again(); + /* Already closing? Don't close twice. */ + if (conn->plan[IO_IN].status == IO_CLOSING) + return &conn->plan[IO_IN]; - assert(ret); - io_loop_return = ret; + conn->plan[IO_IN].status = conn->plan[IO_OUT].status = IO_CLOSING; + conn->plan[IO_IN].u1.s = errno; + backend_new_closing(conn); - return plan; + return &conn->plan[IO_IN]; } -static struct io_plan io_never_called(struct io_conn *conn, void *arg) +struct io_plan *io_close_cb(struct io_conn *conn, void *arg) { - abort(); + return io_close(conn); } -struct io_plan io_never(void) +/* Exit the loop, returning this (non-NULL) arg. */ +void io_break(const void *ret) { - return io_always_(io_never_called, NULL); + assert(ret); + io_loop_return = (void *)ret; } -int io_conn_fd(const struct io_conn *conn) +struct io_plan *io_never(struct io_conn *conn) { - return conn->fd.fd; + return io_always(conn, IO_IN, io_never_called, NULL); } -void io_set_alloc(void *(*allocfn)(size_t size), - void *(*reallocfn)(void *ptr, size_t size), - void (*freefn)(void *ptr)) +int io_conn_fd(const struct io_conn *conn) { - io_alloc.alloc = allocfn; - io_alloc.realloc = reallocfn; - io_alloc.free = freefn; + return conn->fd.fd; } diff --git a/ccan/io/io.h b/ccan/io/io.h index 3bc1c1fa..e2792844 100644 --- a/ccan/io/io.h +++ b/ccan/io/io.h @@ -1,35 +1,69 @@ /* Licensed under LGPLv2.1+ - see LICENSE file for details */ #ifndef CCAN_IO_H #define CCAN_IO_H +#include #include -#include #include #include -#include "io_plan.h" + +enum io_direction { + IO_IN, + IO_OUT +}; + +/** + * struct io_plan - a plan for input or output. + * + * Each io_conn has zero to two of these active at any time. + */ +struct io_plan; + +/** + * struct io_conn - a connection associated with an fd. + */ +struct io_conn; /** * io_new_conn - create a new connection. + * @ctx: the context to tal from (or NULL) * @fd: the file descriptor. - * @plan: the first I/O to perform. + * @init: the function to call for a new connection + * @arg: the argument to @init. * - * This creates a connection which owns @fd. @plan will be called on the - * next io_loop(). + * This creates a connection which owns @fd, it then calls + * @init to initialize the connection, which sets up an io_plan. * * Returns NULL on error (and sets errno). * * Example: + * // Dumb init function to print string and tell conn to close. + * static struct io_plan *conn_init(struct io_conn *conn, const char *msg) + * { + * printf("Created conn %p: %s", conn, msg); + * return io_close(conn); + * } + * + * static void create_self_closing_pipe(void) + * { * int fd[2]; * struct io_conn *conn; * * pipe(fd); - * // Plan is to close the fd immediately. - * conn = io_new_conn(fd[0], io_close()); + * conn = io_new_conn(NULL, fd[0], conn_init, (const char *)"hi!"); * if (!conn) * exit(1); + * } */ -#define io_new_conn(fd, plan) \ - (io_plan_no_debug(), io_new_conn_((fd), (plan))) -struct io_conn *io_new_conn_(int fd, struct io_plan plan); +#define io_new_conn(ctx, fd, init, arg) \ + io_new_conn_((ctx), (fd), \ + typesafe_cb_preargs(struct io_plan *, void *, \ + (init), (arg), \ + struct io_conn *conn), \ + (void *)(arg)) + +struct io_conn *io_new_conn_(const tal_t *ctx, int fd, + struct io_plan *(*init)(struct io_conn *, void *), + void *arg); /** * io_set_finish - set finish function on a connection. @@ -40,34 +74,42 @@ struct io_conn *io_new_conn_(int fd, struct io_plan plan); * @finish will be called when an I/O operation fails, or you call * io_close() on the connection. errno will be set to the value * after the failed I/O, or at the call to io_close(). The fd - * will be closed (unless a duplex) before @finish is called. + * will be closed before @finish is called. * * Example: - * static void finish(struct io_conn *conn, void *unused) + * static void finish(struct io_conn *conn, const char *msg) * { * // errno is not 0 after success, so this is a bit useless. - * printf("Conn %p closed with errno %i\n", conn, errno); + * printf("Conn %p closed with errno %i (%s)\n", conn, errno, msg); + * } + * + * // Dumb init function to print string and tell conn to close. + * static struct io_plan *conn_init(struct io_conn *conn, const char *msg) + * { + * io_set_finish(conn, finish, msg); + * return io_close(conn); * } - * ... - * io_set_finish(conn, finish, NULL); */ #define io_set_finish(conn, finish, arg) \ io_set_finish_((conn), \ typesafe_cb_preargs(void, void *, \ (finish), (arg), \ struct io_conn *), \ - (arg)) + (void *)(arg)) void io_set_finish_(struct io_conn *conn, void (*finish)(struct io_conn *, void *), void *arg); + /** * io_new_listener - create a new accepting listener. + * @ctx: the context to tal from (or NULL) * @fd: the file descriptor. * @init: the function to call for a new connection * @arg: the argument to @init. * - * When @fd becomes readable, we accept() and pass that fd to init(). + * When @fd becomes readable, we accept(), create a new connection, + * (tal'ocated off @ctx) and pass that to init(). * * Returns NULL on error (and sets errno). * @@ -76,11 +118,7 @@ void io_set_finish_(struct io_conn *conn, * #include * #include * - * static void start_conn(int fd, char *msg) - * { - * printf("%s fd %i\n", msg, fd); - * close(fd); - * } + * ... * * // Set up a listening socket, return it. * static struct io_listener *do_listen(const char *port) @@ -112,17 +150,18 @@ void io_set_finish_(struct io_conn *conn, * close(fd); * return NULL; * } - * return io_new_listener(fd, start_conn, (char *)"Got one!"); + * return io_new_listener(NULL, fd, conn_init, (const char *)"listened!"); * } */ -#define io_new_listener(fd, init, arg) \ - io_new_listener_((fd), \ - typesafe_cb_preargs(void, void *, \ +#define io_new_listener(ctx, fd, init, arg) \ + io_new_listener_((ctx), (fd), \ + typesafe_cb_preargs(struct io_plan *, void *, \ (init), (arg), \ - int fd), \ - (arg)) -struct io_listener *io_new_listener_(int fd, - void (*init)(int fd, void *arg), + struct io_conn *conn), \ + (void *)(arg)) +struct io_listener *io_new_listener_(const tal_t *ctx, int fd, + struct io_plan *(*init)(struct io_conn *, + void *), void *arg); /** @@ -142,73 +181,79 @@ struct io_listener *io_new_listener_(int fd, void io_close_listener(struct io_listener *listener); /** - * io_write - plan to write data. + * io_write - output plan to write data. + * @conn: the connection that plan is for. * @data: the data buffer. * @len: the length to write. - * @cb: function to call once it's done. - * @arg: @cb argument + * @next: function to call output is done. + * @arg: @next argument * - * This creates a plan write out a data buffer. Once it's all - * written, the @cb function will be called: on an error, the finish + * This updates the output plan, to write out a data buffer. Once it's all + * written, the @next function will be called: on an error, the finish * function is called instead. * * Note that the I/O may actually be done immediately. * * Example: - * static void start_conn_with_write(int fd, const char *msg) + * static struct io_plan *write_to_conn(struct io_conn *conn, const char *msg) * { * // Write message, then close. - * io_new_conn(fd, io_write(msg, strlen(msg), io_close_cb, NULL)); + * return io_write(conn, msg, strlen(msg), io_close_cb, NULL); * } */ -#define io_write(data, len, cb, arg) \ - io_debug(io_write_((data), (len), \ - typesafe_cb_preargs(struct io_plan, void *, \ - (cb), (arg), struct io_conn *), \ - (arg))) -struct io_plan io_write_(const void *data, size_t len, - struct io_plan (*cb)(struct io_conn *, void *), - void *arg); +#define io_write(conn, data, len, next, arg) \ + io_write_((conn), (data), (len), \ + typesafe_cb_preargs(struct io_plan *, void *, \ + (next), (arg), struct io_conn *), \ + (arg)) +struct io_plan *io_write_(struct io_conn *conn, + const void *data, size_t len, + struct io_plan *(*next)(struct io_conn *, void *), + void *arg); /** - * io_read - plan to read data. + * io_read - input plan to read data. + * @conn: the connection that plan is for. * @data: the data buffer. * @len: the length to read. - * @cb: function to call once it's done. - * @arg: @cb argument + * @next: function to call once input is done. + * @arg: @next argument * * This creates a plan to read data into a buffer. Once it's all - * read, the @cb function will be called: on an error, the finish + * read, the @next function will be called: on an error, the finish * function is called instead. * * Note that the I/O may actually be done immediately. * * Example: - * static void start_conn_with_read(int fd, char msg[12]) + * static struct io_plan *read_from_conn(struct io_conn *conn, char *buf) * { * // Read message, then close. - * io_new_conn(fd, io_read(msg, 12, io_close_cb, NULL)); + * return io_read(conn, buf, 12, io_close_cb, NULL); * } */ -#define io_read(data, len, cb, arg) \ - io_debug(io_read_((data), (len), \ - typesafe_cb_preargs(struct io_plan, void *, \ - (cb), (arg), struct io_conn *), \ - (arg))) -struct io_plan io_read_(void *data, size_t len, - struct io_plan (*cb)(struct io_conn *, void *), - void *arg); +#define io_read(conn, data, len, next, arg) \ + io_read_((conn), (data), (len), \ + typesafe_cb_preargs(struct io_plan *, void *, \ + (next), (arg), struct io_conn *), \ + (arg)) +struct io_plan *io_read_(struct io_conn *conn, + void *data, size_t len, + struct io_plan *(*next)(struct io_conn *, void *), + void *arg); /** - * io_read_partial - plan to read some data. + * io_read_partial - input plan to read some data. + * @conn: the connection that plan is for. * @data: the data buffer. - * @len: the maximum length to read, set to the length actually read. - * @cb: function to call once it's done. - * @arg: @cb argument + * @maxlen: the maximum length to read + * @lenp: set to the length actually read. + * @next: function to call once input is done. + * @arg: @next argument * * This creates a plan to read data into a buffer. Once any data is - * read, @len is updated and the @cb function will be called: on an + * read, @len is updated and the @next function will be called: on an * error, the finish function is called instead. * * Note that the I/O may actually be done immediately. @@ -219,41 +264,42 @@ struct io_plan io_read_(void *data, size_t len, * char buf[12]; * }; * - * static struct io_plan dump_and_close(struct io_conn *conn, struct buf *b) + * static struct io_plan *dump(struct io_conn *conn, struct buf *b) * { * printf("Partial read: '%*s'\n", (int)b->len, b->buf); * free(b); - * return io_close(); + * return io_close(conn); * } * - * static void start_conn_with_part_read(int fd, void *unused) + * static struct io_plan *read_part(struct io_conn *conn, struct buf *b) * { - * struct buf *b = malloc(sizeof(*b)); - * * // Read message, then dump and close. - * b->len = sizeof(b->buf); - * io_new_conn(fd, io_read_partial(b->buf, &b->len, dump_and_close, b)); + * return io_read_partial(conn, b->buf, sizeof(b->buf), &b->len, dump, b); * } */ -#define io_read_partial(data, len, cb, arg) \ - io_debug(io_read_partial_((data), (len), \ - typesafe_cb_preargs(struct io_plan, void *, \ - (cb), (arg), \ - struct io_conn *), \ - (arg))) -struct io_plan io_read_partial_(void *data, size_t *len, - struct io_plan (*cb)(struct io_conn *, void *), - void *arg); +#define io_read_partial(conn, data, maxlen, lenp, next, arg) \ + io_read_partial_((conn), (data), (maxlen), (lenp), \ + typesafe_cb_preargs(struct io_plan *, void *, \ + (next), (arg), \ + struct io_conn *), \ + (arg)) +struct io_plan *io_read_partial_(struct io_conn *conn, + void *data, size_t maxlen, size_t *lenp, + struct io_plan *(*next)(struct io_conn *, + void *), + void *arg); /** - * io_write_partial - plan to write some data. + * io_write_partial - output plan to write some data. + * @conn: the connection that plan is for. * @data: the data buffer. - * @len: the maximum length to write, set to the length actually written. - * @cb: function to call once it's done. - * @arg: @cb argument + * @maxlen: the maximum length to write + * @lenp: set to the length actually written. + * @next: function to call once output is done. + * @arg: @next argument * * This creates a plan to write data from a buffer. Once any data is - * written, @len is updated and the @cb function will be called: on an + * written, @len is updated and the @next function will be called: on an * error, the finish function is called instead. * * Note that the I/O may actually be done immediately. @@ -264,70 +310,71 @@ struct io_plan io_read_partial_(void *data, size_t *len, * char buf[12]; * }; * - * static struct io_plan show_remainder(struct io_conn *conn, struct buf *b) + * static struct io_plan *show_partial(struct io_conn *conn, struct buf *b) * { * printf("Only wrote: '%*s'\n", (int)b->len, b->buf); * free(b); - * return io_close(); + * return io_close(conn); * } * - * static void start_conn_with_part_read(int fd, void *unused) + * static struct io_plan *write_part(struct io_conn *conn, struct buf *b) * { - * struct buf *b = malloc(sizeof(*b)); - * * // Write message, then dump and close. - * b->len = sizeof(b->buf); * strcpy(b->buf, "Hello world"); - * io_new_conn(fd, io_write_partial(b->buf, &b->len, show_remainder, b)); + * return io_write_partial(conn, b->buf, strlen(b->buf), + * &b->len, show_partial, b); * } */ -#define io_write_partial(data, len, cb, arg) \ - io_debug(io_write_partial_((data), (len), \ - typesafe_cb_preargs(struct io_plan, void *, \ - (cb), (arg), \ - struct io_conn *), \ - (arg))) -struct io_plan io_write_partial_(const void *data, size_t *len, - struct io_plan (*cb)(struct io_conn *, void*), - void *arg); +#define io_write_partial(conn, data, maxlen, lenp, next, arg) \ + io_write_partial_((conn), (data), (maxlen), (lenp), \ + typesafe_cb_preargs(struct io_plan *, void *, \ + (next), (arg), \ + struct io_conn *), \ + (arg)) +struct io_plan *io_write_partial_(struct io_conn *conn, + const void *data, size_t maxlen, size_t *lenp, + struct io_plan *(*next)(struct io_conn *, + void*), + void *arg); /** - * io_always - plan to immediately call next callback. - * @cb: function to call. - * @arg: @cb argument + * io_always - plan to immediately call next callback + * @conn: the connection that plan is for. + * @dir: IO_IN or IO_OUT + * @next: function to call. + * @arg: @next argument * * Sometimes it's neater to plan a callback rather than call it directly; * for example, if you only need to read data for one path and not another. * * Example: - * static void start_conn_with_nothing(int fd) + * static struct io_plan *init_conn_with_nothing(struct io_conn *conn, + * void *unused) * { * // Silly example: close on next time around loop. - * io_new_conn(fd, io_always(io_close_cb, NULL)); + * return io_always(conn, IO_IN, io_close_cb, NULL); * } */ -#define io_always(cb, arg) \ - io_debug(io_always_(typesafe_cb_preargs(struct io_plan, void *, \ - (cb), (arg), \ - struct io_conn *), \ - (arg))) -struct io_plan io_always_(struct io_plan (*cb)(struct io_conn *, void *), - void *arg); +#define io_always(conn, dir, next, arg) \ + io_always_((conn), dir, typesafe_cb_preargs(struct io_plan *, void *, \ + (next), (arg), \ + struct io_conn *), \ + (arg)) +struct io_plan *io_always_(struct io_conn *conn, enum io_direction dir, + struct io_plan *(*next)(struct io_conn *, void *), + void *arg); /** - * io_connect - plan to connect to a listening socket. - * @fd: file descriptor. + * io_connect - create an asynchronous connection to a listening socket. + * @conn: the connection that plan is for. * @addr: where to connect. - * @cb: function to call once it's done. - * @arg: @cb argument + * @init: function to call once it's connected + * @arg: @init argument * * This initiates a connection, and creates a plan for - * (asynchronously). completing it. Once complete, @len is updated - * and the @cb function will be called: on an error, the finish - * function is called instead. - * - * Note that the connect may actually be done immediately. + * (asynchronously) completing it. Once complete, the @init function + * will be called. * * Example: * #include @@ -335,9 +382,10 @@ struct io_plan io_always_(struct io_plan (*cb)(struct io_conn *, void *), * #include * * // Write, then close socket. - * static struct io_plan start_write(struct io_conn *conn, void *unused) + * static struct io_plan *init_connect(struct io_conn *conn, + * struct addrinfo *addrinfo) * { - * return io_write("hello", 5, io_close_cb, NULL); + * return io_connect(conn, addrinfo, io_close_cb, NULL); * } * * ... @@ -347,177 +395,123 @@ struct io_plan io_always_(struct io_plan (*cb)(struct io_conn *, void *), * * fd = socket(AF_INET, SOCK_STREAM, 0); * getaddrinfo("localhost", "8111", NULL, &addrinfo); - * io_new_conn(fd, io_connect(fd, addrinfo, start_write, NULL)); + * io_new_conn(NULL, fd, init_connect, addrinfo); */ struct addrinfo; -#define io_connect(fd, addr, cb, arg) \ - io_debug(io_connect_((fd), (addr), \ - typesafe_cb_preargs(struct io_plan, void *, \ - (cb), (arg), \ - struct io_conn *), \ - (arg))) -struct io_plan io_connect_(int fd, const struct addrinfo *addr, - struct io_plan (*cb)(struct io_conn *, void*), - void *arg); - -/** - * io_wait - plan to wait for something. - * @wait: the address to wait on. - * @cb: function to call after waiting. - * @arg: @cb argument - * - * This indicates the connection is idle: io_wake() will be called later to - * restart the connection. - * - * Example: - * struct io_conn *sleeper; - * unsigned int counter = 0; - * sleeper = io_new_conn(open("/dev/null", O_RDONLY), - * io_wait(&counter, io_close_cb, NULL)); - * if (!sleeper) - * exit(1); - */ -#define io_wait(wait, cb, arg) \ - io_debug(io_wait_(wait, \ - typesafe_cb_preargs(struct io_plan, void *, \ - (cb), (arg), \ - struct io_conn *), \ - (arg))) - -struct io_plan io_wait_(const void *wait, - struct io_plan (*cb)(struct io_conn *, void *), - void *arg); - -/** - * io_timeout - set timeout function if the callback doesn't complete. - * @conn: the current connection. - * @t: how long until the timeout should be called. - * @cb: callback to call. - * @arg: argument to @cb. - * - * If the usual next callback is not called for this connection before @ts, - * this function will be called. If next callback is called, the timeout - * is automatically removed. - * - * Returns false on allocation failure. A connection can only have one - * timeout. - * - * Example: - * static struct io_plan close_on_timeout(struct io_conn *conn, char *msg) - * { - * printf("%s\n", msg); - * return io_close(); - * } - * - * ... - * io_timeout(sleeper, time_from_msec(100), - * close_on_timeout, (char *)"Bye!"); - */ -#define io_timeout(conn, ts, fn, arg) \ - io_timeout_((conn), (ts), \ - typesafe_cb_preargs(struct io_plan, void *, \ - (fn), (arg), \ +#define io_connect(conn, addr, next, arg) \ + io_connect_((conn), (addr), \ + typesafe_cb_preargs(struct io_plan *, void *, \ + (next), (arg), \ struct io_conn *), \ (arg)) -bool io_timeout_(struct io_conn *conn, struct timerel t, - struct io_plan (*fn)(struct io_conn *, void *), void *arg); + +struct io_plan *io_connect_(struct io_conn *conn, const struct addrinfo *addr, + struct io_plan *(*next)(struct io_conn *, void *), + void *arg); /** - * io_duplex - split an fd into two connections. - * @conn: a connection. - * @plan: the first I/O function to call. - * - * Sometimes you want to be able to simultaneously read and write on a - * single fd, but io forces a linear call sequence. The solution is - * to have two connections for the same fd, and use one for read - * operations and one for write. + * io_wait - leave a plan idle until something wakes us. + * @conn: the connection that plan is for. + * @waitaddr: the address to wait on. + * @dir: IO_IN or IO_OUT + * @next: function to call after waiting. + * @arg: @next argument * - * You must io_close() both of them to close the fd. + * This leaves the input or output idle: io_wake(@waitaddr) will be + * called later to restart the connection. * * Example: - * static void setup_read_write(int fd, - * char greet_in[5], const char greet_out[5]) - * { - * struct io_conn *writer, *reader; - * - * // Read their greeting and send ours at the same time. - * writer = io_new_conn(fd, - * io_write(greet_out, 5, io_close_cb, NULL)); - * reader = io_duplex(writer, - * io_read(greet_in, 5, io_close_cb, NULL)); - * if (!reader || !writer) - * exit(1); - * } + * // Silly example to wait then close. + * static struct io_plan *wait(struct io_conn *conn, void *b) + * { + * return io_wait(conn, b, IO_IN, io_close_cb, NULL); + * } */ -#define io_duplex(conn, plan) \ - (io_plan_no_debug(), io_duplex_((conn), (plan))) -struct io_conn *io_duplex_(struct io_conn *conn, struct io_plan plan); +#define io_wait(conn, waitaddr, dir, next, arg) \ + io_wait_((conn), (waitaddr), (dir), \ + typesafe_cb_preargs(struct io_plan *, void *, \ + (next), (arg), \ + struct io_conn *), \ + (arg)) + +struct io_plan *io_wait_(struct io_conn *conn, + const void *wait, enum io_direction dir, + struct io_plan *(*next)(struct io_conn *, void *), + void *arg); + /** * io_wake - wake up any connections waiting on @wait - * @wait: the address to trigger. - * - * Example: - * unsigned int wait; + * @waitaddr: the address to trigger. * - * io_new_conn(open("/dev/null", O_RDONLY), - * io_wait(&wait, io_close_cb, NULL)); + * All io_conns who have returned io_wait() on @waitaddr will move on + * to their next callback. * - * io_wake(&wait); + * Example: + * static struct io_plan *wake_it(struct io_conn *conn, void *b) + * { + * io_wake(b); + * return io_close(conn); + * } */ void io_wake(const void *wait); /** * io_break - return from io_loop() * @ret: non-NULL value to return from io_loop(). - * @plan: I/O to perform on return (if any) * - * This breaks out of the io_loop. As soon as the current @next - * function returns, any io_closed()'d connections will have their - * finish callbacks called, then io_loop() with return with @ret. + * This breaks out of the io_loop. As soon as the current function + * returns, any io_close()'d connections will have their finish + * callbacks called, then io_loop() with return with @ret. * * If io_loop() is called again, then @plan will be carried out. * * Example: - * static struct io_plan fail_on_timeout(struct io_conn *conn, char *msg) + * static struct io_plan *fail_on_timeout(struct io_conn *conn, char *msg) * { - * return io_break(msg, io_close()); + * io_break(msg); + * return io_close(conn); * } */ -#define io_break(ret, plan) (io_plan_no_debug(), io_break_((ret), (plan))) -struct io_plan io_break_(void *ret, struct io_plan plan); +void io_break(const void *ret); /** * io_never - assert if callback is called. + * @conn: the connection that plan is for. * * Sometimes you want to make it clear that a callback should never happen * (eg. for io_break). This will assert() if called. * * Example: - * static struct io_plan break_out(struct io_conn *conn, void *unused) + * static struct io_plan *break_out(struct io_conn *conn, void *unused) * { + * io_break(conn); * // We won't ever return from io_break - * return io_break(conn, io_never()); + * return io_never(conn); * } */ -struct io_plan io_never(void); +struct io_plan *io_never(struct io_conn *conn); /* FIXME: io_recvfrom/io_sendto */ /** * io_close - plan to close a connection. + * @conn: the connection to close. + * + * On return to io_loop, the connection will be closed. It doesn't have + * to be the current connection and it doesn't need to be idle. No more + * IO or callbacks will occur. * - * On return to io_loop, the connection will be closed. + * You can close a connection twice without harmful effects. * * Example: - * static struct io_plan close_on_timeout(struct io_conn *conn, const char *msg) + * static struct io_plan *close_on_timeout(struct io_conn *conn, const char *msg) * { * printf("closing: %s\n", msg); - * return io_close(); + * return io_close(conn); * } */ -#define io_close() io_debug(io_close_()) -struct io_plan io_close_(void); +struct io_plan *io_close(struct io_conn *conn); /** * io_close_cb - helper callback to close a connection. @@ -529,25 +523,7 @@ struct io_plan io_close_(void); * Example: * #define close_on_timeout io_close_cb */ -struct io_plan io_close_cb(struct io_conn *, void *unused); - -/** - * io_close_other - close different connection next time around the I/O loop. - * @conn: the connection to close. - * - * This is used to force a different connection to close: no more I/O will - * happen on @conn, even if it's pending. - * - * It's a bug to use this on the current connection! - * - * Example: - * static void stop_connection(struct io_conn *conn) - * { - * printf("forcing stop on connection\n"); - * io_close_other(conn); - * } - */ -void io_close_other(struct io_conn *conn); +struct io_plan *io_close_cb(struct io_conn *, void *unused); /** * io_loop - process fds until all closed on io_break. @@ -567,17 +543,4 @@ void *io_loop(void); * Sometimes useful, eg for getsockname(). */ int io_conn_fd(const struct io_conn *conn); - -/** - * io_set_alloc - set alloc/realloc/free function for io to use. - * @allocfn: allocator function - * @reallocfn: reallocator function, ptr may be NULL, size never 0. - * @freefn: free function - * - * By default io uses malloc/realloc/free, and returns NULL if they fail. - * You can set your own variants here. - */ -void io_set_alloc(void *(*allocfn)(size_t size), - void *(*reallocfn)(void *ptr, size_t size), - void (*freefn)(void *ptr)); #endif /* CCAN_IO_H */ diff --git a/ccan/io/io_plan.h b/ccan/io/io_plan.h index 61d76245..0b3c27b1 100644 --- a/ccan/io/io_plan.h +++ b/ccan/io/io_plan.h @@ -4,129 +4,50 @@ struct io_conn; /** - * struct io_plan - a plan of what I/O to do. - * @pollflag: POLLIN or POLLOUT. - * @io: function to call when fd is available for @pollflag. - * @next: function to call after @io returns true. - * @next_arg: argument to @next. - * @u1: scratch area for I/O. - * @u2: scratch area for I/O. - * - * When the fd is POLLIN or POLLOUT (according to @pollflag), @io is - * called. If it returns -1, io_close() becomed the new plan (and errno - * is saved). If it returns 1, @next is called, otherwise @io is - * called again when @pollflag is available. - * - * You can use this to write your own io_plan functions. + * union io_plan_arg - scratch space for struct io_plan read/write fns. */ -struct io_plan { - int pollflag; - /* Only NULL if idle. */ - int (*io)(int fd, struct io_plan *plan); - /* Only NULL if closing. */ - struct io_plan (*next)(struct io_conn *, void *arg); - void *next_arg; +union io_plan_arg { + char *cp; + void *vp; + const void *const_vp; + size_t s; + char c[sizeof(size_t)]; +}; - union { - char *cp; - void *vp; - const void *const_vp; - size_t s; - char c[sizeof(size_t)]; - } u1; - union { - char *p; - void *vp; - const void *const_vp; - size_t s; - char c[sizeof(size_t)]; - } u2; +enum io_plan_status { + /* As before calling next function. */ + IO_UNSET, + /* Normal. */ + IO_POLLING, + /* Waiting for io_wake */ + IO_WAITING, + /* Always do this. */ + IO_ALWAYS, + /* Closing (both plans will be the same). */ + IO_CLOSING }; -#ifdef DEBUG /** - * io_debug_conn - routine to select connection(s) to debug. - * - * If this is set, the routine should return true if the connection is a - * debugging candidate. If so, the callchain for I/O operations on this - * connection will be linear, for easier use of a debugger. - * - * You will also see calls to any callbacks which wake the connection - * which is being debugged. - * - * Example: - * static bool debug_all(struct io_conn *conn) - * { - * return true(); - * } - * ... - * io_debug_conn = debug_all; + * struct io_plan - one half of I/O to do + * @status: the status of this plan. + * @io: function to call when fd becomes read/writable, returns 0 to be + * called again, 1 if it's finished, and -1 on error (fd will be closed) + * @next: the next function which is called if io returns 1. + * @next_arg: the argument to @next + * @u1, @u2: scratch space for @io. */ -extern bool (*io_debug_conn)(struct io_conn *conn); +struct io_plan { + enum io_plan_status status; -/** - * io_debug - if we're debugging the current connection, call immediately. - * - * This determines if we are debugging the current connection: if so, - * it immediately applies the plan and calls back into io_loop() to - * create a linear call chain. - * - * Example: - * #define io_idle() io_debug(io_idle_()) - * struct io_plan io_idle_(void); - */ -struct io_plan io_debug(struct io_plan plan); + int (*io)(int fd, struct io_plan *plan); -/** - * io_debug_io - return from function which actually does I/O. - * - * This determines if we are debugging the current connection: if so, - * it immediately sets the next function and calls into io_loop() to - * create a linear call chain. - * - * Example: - * - * static int do_write(int fd, struct io_plan *plan) - * { - * ssize_t ret = write(fd, plan->u.write.buf, plan->u.write.len); - * if (ret < 0) - * return io_debug_io(-1); - * - * plan->u.write.buf += ret; - * plan->u.write.len -= ret; - * return io_debug_io(plan->u.write.len == 0); - * } - */ -int io_debug_io(int ret); + struct io_plan *(*next)(struct io_conn *, void *arg); + void *next_arg; -/** - * io_plan_no_debug - mark the next plan not to be called immediately. - * - * Most routines which take a plan are about to apply it to the current - * connection. We (ab)use this pattern for debugging: as soon as such a - * plan is created it is called, to create a linear call chain. - * - * Some routines, like io_break(), io_duplex() and io_wake() take an - * io_plan, but they must not be applied immediately to the current - * connection, so we call this first. - * - * Example: - * #define io_break(ret, plan) (io_plan_no_debug(), io_break_((ret), (plan))) - * struct io_plan io_break_(void *ret, struct io_plan plan); - */ -#define io_plan_no_debug() ((io_plan_nodebug = true)) + union io_plan_arg u1, u2; +}; -extern bool io_plan_nodebug; -#else -static inline struct io_plan io_debug(struct io_plan plan) -{ - return plan; -} -static inline int io_debug_io(int ret) -{ - return ret; -} -#define io_plan_no_debug() (void)0 -#endif +/* Helper to get a conn's io_plan. */ +struct io_plan *io_get_plan(struct io_conn *conn, enum io_direction dir); #endif /* CCAN_IO_PLAN_H */ diff --git a/ccan/io/poll.c b/ccan/io/poll.c index 2fd39f6b..c1a62452 100644 --- a/ccan/io/poll.c +++ b/ccan/io/poll.c @@ -9,70 +9,31 @@ #include #include -static size_t num_fds = 0, max_fds = 0, num_closing = 0, num_waiting = 0; -static bool some_always = false; +static size_t num_fds = 0, max_fds = 0, num_waiting = 0; static struct pollfd *pollfds = NULL; static struct fd **fds = NULL; -static struct timers timeouts; -#ifdef DEBUG -static unsigned int io_loop_level; -static struct io_conn *free_later; -static void io_loop_enter(void) -{ - io_loop_level++; -} -static void io_loop_exit(void) -{ - io_loop_level--; - if (io_loop_level == 0) { - /* Delayed free. */ - while (free_later) { - struct io_conn *c = free_later; - free_later = c->finish_arg; - io_alloc.free(c); - } - } -} -static void free_conn(struct io_conn *conn) -{ - /* Only free on final exit: chain via finish. */ - if (io_loop_level > 1) { - struct io_conn *c; - for (c = free_later; c; c = c->finish_arg) - assert(c != conn); - conn->finish_arg = free_later; - free_later = conn; - } else - io_alloc.free(conn); -} -#else -static void io_loop_enter(void) -{ -} -static void io_loop_exit(void) -{ -} -static void free_conn(struct io_conn *conn) -{ - io_alloc.free(conn); -} -#endif +static struct io_conn *closing = NULL, *always = NULL; static bool add_fd(struct fd *fd, short events) { + if (!max_fds) { + assert(num_fds == 0); + pollfds = tal_arr(NULL, struct pollfd, 8); + if (!pollfds) + return false; + fds = tal_arr(pollfds, struct fd *, 8); + if (!fds) + return false; + max_fds = 8; + } + if (num_fds + 1 > max_fds) { - struct pollfd *newpollfds; - struct fd **newfds; - size_t num = max_fds ? max_fds * 2 : 8; + size_t num = max_fds * 2; - newpollfds = io_alloc.realloc(pollfds, sizeof(*newpollfds)*num); - if (!newpollfds) + if (!tal_resize(&pollfds, num)) return false; - pollfds = newpollfds; - newfds = io_alloc.realloc(fds, sizeof(*newfds) * num); - if (!newfds) + if (!tal_resize(&fds, num)) return false; - fds = newfds; max_fds = num; } @@ -106,19 +67,9 @@ static void del_fd(struct fd *fd) fds[n] = fds[num_fds-1]; assert(fds[n]->backend_info == num_fds-1); fds[n]->backend_info = n; - /* If that happens to be a duplex, move that too. */ - if (!fds[n]->listener) { - struct io_conn *c = (void *)fds[n]; - if (c->duplex) { - assert(c->duplex->fd.backend_info == num_fds-1); - c->duplex->fd.backend_info = n; - } - } } else if (num_fds == 1) { /* Free everything when no more fds. */ - io_alloc.free(pollfds); - io_alloc.free(fds); - pollfds = NULL; + pollfds = tal_free(pollfds); fds = NULL; max_fds = 0; } @@ -134,106 +85,88 @@ bool add_listener(struct io_listener *l) return true; } -void backend_plan_changed(struct io_conn *conn) +void backend_new_closing(struct io_conn *conn) { - struct pollfd *pfd; + /* Already on always list? Remove it. */ + if (conn->list) { + struct io_conn **p = &always; - /* This can happen with debugging and delayed free... */ - if (conn->fd.backend_info == -1) - return; + while (*p != conn) + p = &(*p)->list; - pfd = &pollfds[conn->fd.backend_info]; + *p = conn->list; + } + + conn->list = closing; + closing = conn; +} + +void backend_new_always(struct io_conn *conn) +{ + /* May already be in always list (other plan), or closing. */ + if (!conn->list) { + conn->list = always; + always = conn; + } +} + +void backend_new_plan(struct io_conn *conn) +{ + struct pollfd *pfd = &pollfds[conn->fd.backend_info]; if (pfd->events) num_waiting--; - pfd->events = conn->plan.pollflag & (POLLIN|POLLOUT); - if (conn->duplex) { - int mask = conn->duplex->plan.pollflag & (POLLIN|POLLOUT); - /* You can't *both* read/write. */ - assert(!mask || pfd->events != mask); - pfd->events |= mask; - } + pfd->events = 0; + if (conn->plan[IO_IN].status == IO_POLLING) + pfd->events |= POLLIN; + if (conn->plan[IO_OUT].status == IO_POLLING) + pfd->events |= POLLOUT; + if (pfd->events) { num_waiting++; pfd->fd = conn->fd.fd; - } else + } else { pfd->fd = -conn->fd.fd; - - if (!conn->plan.next) - num_closing++; - - if (conn->plan.pollflag == POLLALWAYS) - some_always = true; + } } -void backend_wait_changed(const void *wait) +void backend_wake(const void *wait) { unsigned int i; for (i = 0; i < num_fds; i++) { - struct io_conn *c, *duplex; + struct io_conn *c; /* Ignore listeners */ if (fds[i]->listener) continue; + c = (void *)fds[i]; - for (duplex = c->duplex; c; c = duplex, duplex = NULL) { - /* Ignore closing. */ - if (!c->plan.next) - continue; - /* Not idle? */ - if (c->plan.io) - continue; - /* Waiting on something else? */ - if (c->plan.u1.const_vp != wait) - continue; - /* Make it do the next thing. */ - c->plan = io_always_(c->plan.next, c->plan.next_arg); - backend_plan_changed(c); - } + if (c->plan[IO_IN].status == IO_WAITING + && c->plan[IO_IN].u1.const_vp == wait) + io_do_wakeup(c, &c->plan[IO_IN]); + + if (c->plan[IO_OUT].status == IO_WAITING + && c->plan[IO_OUT].u1.const_vp == wait) + io_do_wakeup(c, &c->plan[IO_OUT]); } } bool add_conn(struct io_conn *c) { - if (!add_fd(&c->fd, c->plan.pollflag & (POLLIN|POLLOUT))) - return false; - /* Immediate close is allowed. */ - if (!c->plan.next) - num_closing++; - if (c->plan.pollflag == POLLALWAYS) - some_always = true; - return true; + return add_fd(&c->fd, 0); } -bool add_duplex(struct io_conn *c) +static void del_conn(struct io_conn *conn) { - c->fd.backend_info = c->duplex->fd.backend_info; - backend_plan_changed(c); - return true; -} - -void backend_del_conn(struct io_conn *conn) -{ - if (timeout_active(conn)) - backend_del_timeout(conn); - io_alloc.free(conn->timeout); - if (conn->duplex) { - /* In case fds[] pointed to the other one. */ - assert(conn->duplex->fd.backend_info == conn->fd.backend_info); - fds[conn->fd.backend_info] = &conn->duplex->fd; - conn->duplex->duplex = NULL; - conn->fd.backend_info = -1; - } else - del_fd(&conn->fd); - num_closing--; + del_fd(&conn->fd); if (conn->finish) { /* Saved by io_close */ - errno = conn->plan.u1.s; + errno = conn->plan[IO_IN].u1.s; conn->finish(conn, conn->finish_arg); } - free_conn(conn); + tal_free(conn); } void del_listener(struct io_listener *l) @@ -241,12 +174,6 @@ void del_listener(struct io_listener *l) del_fd(&l->fd); } -static void set_plan(struct io_conn *conn, struct io_plan plan) -{ - conn->plan = plan; - backend_plan_changed(conn); -} - static void accept_conn(struct io_listener *l) { int fd = accept(l->fd.fd, NULL, NULL); @@ -254,135 +181,73 @@ static void accept_conn(struct io_listener *l) /* FIXME: What to do here? */ if (fd < 0) return; - l->init(fd, l->arg); + + io_new_conn(l->ctx, fd, l->init, l->arg); } /* It's OK to miss some, as long as we make progress. */ -static bool finish_conns(struct io_conn **ready) +static bool close_conns(void) { - unsigned int i; + bool ret = false; - for (i = 0; !io_loop_return && i < num_fds; i++) { - struct io_conn *c, *duplex; + while (closing) { + struct io_conn *conn = closing; - if (!num_closing) - break; + assert(conn->plan[IO_IN].status == IO_CLOSING); + assert(conn->plan[IO_OUT].status == IO_CLOSING); - if (fds[i]->listener) - continue; - c = (void *)fds[i]; - for (duplex = c->duplex; c; c = duplex, duplex = NULL) { - if (!c->plan.next) { - if (doing_debug_on(c) && ready) { - *ready = c; - return true; - } - backend_del_conn(c); - i--; - } - } + closing = closing->list; + del_conn(conn); + ret = true; } - return false; -} - -void backend_add_timeout(struct io_conn *conn, struct timerel duration) -{ - if (!timeouts.base) - timers_init(&timeouts, time_now()); - timer_add(&timeouts, &conn->timeout->timer, - timeabs_add(time_now(), duration)); - conn->timeout->conn = conn; -} - -void backend_del_timeout(struct io_conn *conn) -{ - assert(conn->timeout->conn == conn); - timer_del(&timeouts, &conn->timeout->timer); - conn->timeout->conn = NULL; + return ret; } -static void handle_always(void) +static bool handle_always(void) { - int i; + bool ret = false; - some_always = false; + while (always) { + struct io_conn *conn = always; - for (i = 0; i < num_fds && !io_loop_return; i++) { - struct io_conn *c = (void *)fds[i]; + assert(conn->plan[IO_IN].status == IO_ALWAYS + || conn->plan[IO_OUT].status == IO_ALWAYS); - if (fds[i]->listener) - continue; - - if (c->plan.pollflag == POLLALWAYS) - io_ready(c); - - if (c->duplex && c->duplex->plan.pollflag == POLLALWAYS) - io_ready(c->duplex); + /* Remove from list, and mark it so it knows that. */ + always = always->list; + conn->list = NULL; + io_do_always(conn); + ret = true; } + return ret; } /* This is the main loop. */ -void *do_io_loop(struct io_conn **ready) +void *io_loop(void) { void *ret; - io_loop_enter(); - while (!io_loop_return) { - int i, r, timeout = INT_MAX; - struct timeabs now; - bool some_timeouts = false; - - if (timeouts.base) { - struct timeabs first; - struct list_head expired; - struct io_timeout *t; - - now = time_now(); - - /* Call functions for expired timers. */ - timers_expire(&timeouts, now, &expired); - while ((t = list_pop(&expired, struct io_timeout, timer.list))) { - struct io_conn *conn = t->conn; - /* Clear, in case timer re-adds */ - t->conn = NULL; - set_current(conn); - set_plan(conn, t->next(conn, t->next_arg)); - some_timeouts = true; - } + int i, r; - /* Now figure out how long to wait for the next one. */ - if (timer_earliest(&timeouts, &first)) { - uint64_t f = time_to_msec(time_between(first, now)); - if (f < INT_MAX) - timeout = f; - } - } - - if (num_closing) { - /* If this finishes a debugging con, return now. */ - if (finish_conns(ready)) - return NULL; + if (close_conns()) { /* Could have started/finished more. */ continue; } - /* debug can recurse on io_loop; anything can change. */ - if (doing_debug() && some_timeouts) - continue; - - if (some_always) { - handle_always(); + if (handle_always()) { + /* Could have started/finished more. */ continue; } + /* Everything closed? */ if (num_fds == 0) break; /* You can't tell them all to go to sleep! */ assert(num_waiting); - r = poll(pollfds, num_fds, timeout); + r = poll(pollfds, num_fds, -1); if (r < 0) break; @@ -400,62 +265,19 @@ void *do_io_loop(struct io_conn **ready) } } else if (events & (POLLIN|POLLOUT)) { r--; - if (c->duplex) { - int mask = c->duplex->plan.pollflag; - if (events & mask) { - if (doing_debug_on(c->duplex) - && ready) { - *ready = c->duplex; - return NULL; - } - io_ready(c->duplex); - events &= ~mask; - /* debug can recurse; - * anything can change. */ - if (doing_debug()) - break; - - /* If no events, or it closed - * the duplex, continue. */ - if (!(events&(POLLIN|POLLOUT)) - || !c->plan.next) - continue; - } - } - if (doing_debug_on(c) && ready) { - *ready = c; - return NULL; - } - io_ready(c); - /* debug can recurse; anything can change. */ - if (doing_debug()) - break; + io_ready(c, events); } else if (events & (POLLHUP|POLLNVAL|POLLERR)) { r--; - set_current(c); errno = EBADF; - set_plan(c, io_close()); - if (c->duplex) { - set_current(c->duplex); - set_plan(c->duplex, io_close()); - } + io_close(c); } } } - while (num_closing && !io_loop_return) { - if (finish_conns(ready)) - return NULL; - } + close_conns(); ret = io_loop_return; io_loop_return = NULL; - io_loop_exit(); return ret; } - -void *io_loop(void) -{ - return do_io_loop(NULL); -} diff --git a/ccan/io/test/run-01-start-finish-DEBUG.c b/ccan/io/test/run-01-start-finish-DEBUG.c deleted file mode 100644 index 9e33f2bc..00000000 --- a/ccan/io/test/run-01-start-finish-DEBUG.c +++ /dev/null @@ -1,8 +0,0 @@ -#define DEBUG -#define PORT "64001" -#define main real_main -int real_main(void); -#include "run-01-start-finish.c" -#undef main -static bool always_debug(struct io_conn *conn) { return true; } -int main(void) { io_debug_conn = always_debug; return real_main(); } diff --git a/ccan/io/test/run-01-start-finish.c b/ccan/io/test/run-01-start-finish.c index 3ee65947..b1ce78d3 100644 --- a/ccan/io/test/run-01-start-finish.c +++ b/ccan/io/test/run-01-start-finish.c @@ -16,15 +16,17 @@ static void finish_ok(struct io_conn *conn, int *state) ok1(*state == 1); ok1(io_conn_fd(conn) == expected_fd); (*state)++; - io_break(state + 1, io_never()); + io_break(state + 1); } -static void init_conn(int fd, int *state) +static struct io_plan *init_conn(struct io_conn *conn, int *state) { ok1(*state == 0); (*state)++; - expected_fd = fd; - io_set_finish(io_new_conn(fd, io_close()), finish_ok, state); + expected_fd = io_conn_fd(conn); + io_set_finish(conn, finish_ok, state); + + return io_close(conn); } static int make_listen_fd(const char *port, struct addrinfo **info) @@ -70,7 +72,7 @@ int main(void) plan_tests(10); fd = make_listen_fd(PORT, &addrinfo); ok1(fd >= 0); - l = io_new_listener(fd, init_conn, &state); + l = io_new_listener(NULL, fd, init_conn, &state); ok1(l); fflush(stdout); if (!fork()) { diff --git a/ccan/io/test/run-02-read-DEBUG.c b/ccan/io/test/run-02-read-DEBUG.c deleted file mode 100644 index 5ca27818..00000000 --- a/ccan/io/test/run-02-read-DEBUG.c +++ /dev/null @@ -1,8 +0,0 @@ -#define DEBUG -#define PORT "64002" -#define main real_main -int real_main(void); -#include "run-02-read.c" -#undef main -static bool always_debug(struct io_conn *conn) { return true; } -int main(void) { io_debug_conn = always_debug; return real_main(); } diff --git a/ccan/io/test/run-02-read.c b/ccan/io/test/run-02-read.c index 439beb1f..4a043359 100644 --- a/ccan/io/test/run-02-read.c +++ b/ccan/io/test/run-02-read.c @@ -19,17 +19,16 @@ static void finish_ok(struct io_conn *conn, struct data *d) { ok1(d->state == 1); d->state++; - io_break(d, io_never()); + io_break(d); } -static void init_conn(int fd, struct data *d) +static struct io_plan *init_conn(struct io_conn *conn, struct data *d) { ok1(d->state == 0); d->state++; - io_set_finish(io_new_conn(fd, - io_read(d->buf, sizeof(d->buf), io_close_cb, d)), - finish_ok, d); + io_set_finish(conn, finish_ok, d); + return io_read(conn, d->buf, sizeof(d->buf), io_close_cb, d); } static int make_listen_fd(const char *port, struct addrinfo **info) @@ -76,7 +75,7 @@ int main(void) d->state = 0; fd = make_listen_fd(PORT, &addrinfo); ok1(fd >= 0); - l = io_new_listener(fd, init_conn, d); + l = io_new_listener(NULL, fd, init_conn, d); ok1(l); fflush(stdout); if (!fork()) { diff --git a/ccan/io/test/run-03-readpartial-DEBUG.c b/ccan/io/test/run-03-readpartial-DEBUG.c deleted file mode 100644 index c473b65c..00000000 --- a/ccan/io/test/run-03-readpartial-DEBUG.c +++ /dev/null @@ -1,8 +0,0 @@ -#define DEBUG -#define PORT "64003" -#define main real_main -int real_main(void); -#include "run-03-readpartial.c" -#undef main -static bool always_debug(struct io_conn *conn) { return true; } -int main(void) { io_debug_conn = always_debug; return real_main(); } diff --git a/ccan/io/test/run-03-readpartial.c b/ccan/io/test/run-03-readpartial.c index 860792b7..7b360638 100644 --- a/ccan/io/test/run-03-readpartial.c +++ b/ccan/io/test/run-03-readpartial.c @@ -20,18 +20,18 @@ static void finish_ok(struct io_conn *conn, struct data *d) { ok1(d->state == 1); d->state++; - io_break(d, io_never()); + io_break(d); } -static void init_conn(int fd, struct data *d) +static struct io_plan *init_conn(struct io_conn *conn, struct data *d) { ok1(d->state == 0); d->state++; - d->bytes = sizeof(d->buf); - io_set_finish(io_new_conn(fd, - io_read_partial(d->buf, &d->bytes, io_close_cb, d)), - finish_ok, d); + io_set_finish(conn, finish_ok, d); + + return io_read_partial(conn, d->buf, sizeof(d->buf), &d->bytes, + io_close_cb, d); } static int make_listen_fd(const char *port, struct addrinfo **info) @@ -96,7 +96,7 @@ int main(void) d->state = 0; fd = make_listen_fd(PORT, &addrinfo); ok1(fd >= 0); - l = io_new_listener(fd, init_conn, d); + l = io_new_listener(NULL, fd, init_conn, d); ok1(l); fflush(stdout); if (!fork()) { diff --git a/ccan/io/test/run-04-writepartial-DEBUG.c b/ccan/io/test/run-04-writepartial-DEBUG.c deleted file mode 100644 index fa65bcf5..00000000 --- a/ccan/io/test/run-04-writepartial-DEBUG.c +++ /dev/null @@ -1,8 +0,0 @@ -#define DEBUG -#define PORT "64004" -#define main real_main -int real_main(void); -#include "run-04-writepartial.c" -#undef main -static bool always_debug(struct io_conn *conn) { return true; } -int main(void) { io_debug_conn = always_debug; return real_main(); } diff --git a/ccan/io/test/run-04-writepartial.c b/ccan/io/test/run-04-writepartial.c index 5c1f5e01..4ca21a32 100644 --- a/ccan/io/test/run-04-writepartial.c +++ b/ccan/io/test/run-04-writepartial.c @@ -20,16 +20,17 @@ static void finish_ok(struct io_conn *conn, struct data *d) { ok1(d->state == 1); d->state++; - io_break(d, io_never()); + io_break(d); } -static void init_conn(int fd, struct data *d) +static struct io_plan *init_conn(struct io_conn *conn, struct data *d) { ok1(d->state == 0); d->state++; - io_set_finish(io_new_conn(fd, - io_write_partial(d->buf, &d->bytes, io_close_cb, d)), - finish_ok, d); + io_set_finish(conn, finish_ok, d); + + return io_write_partial(conn, d->buf, d->bytes, &d->bytes, + io_close_cb, d); } static int make_listen_fd(const char *port, struct addrinfo **info) @@ -97,7 +98,7 @@ int main(void) memset(d->buf, 'a', d->bytes); fd = make_listen_fd(PORT, &addrinfo); ok1(fd >= 0); - l = io_new_listener(fd, init_conn, d); + l = io_new_listener(NULL, fd, init_conn, d); ok1(l); fflush(stdout); if (!fork()) { diff --git a/ccan/io/test/run-05-write-DEBUG.c b/ccan/io/test/run-05-write-DEBUG.c deleted file mode 100644 index 831e6719..00000000 --- a/ccan/io/test/run-05-write-DEBUG.c +++ /dev/null @@ -1,8 +0,0 @@ -#define DEBUG -#define PORT "64005" -#define main real_main -int real_main(void); -#include "run-05-write.c" -#undef main -static bool always_debug(struct io_conn *conn) { return true; } -int main(void) { io_debug_conn = always_debug; return real_main(); } diff --git a/ccan/io/test/run-05-write.c b/ccan/io/test/run-05-write.c index 2c3e1839..2e744cff 100644 --- a/ccan/io/test/run-05-write.c +++ b/ccan/io/test/run-05-write.c @@ -20,16 +20,15 @@ static void finish_ok(struct io_conn *conn, struct data *d) { ok1(d->state == 1); d->state++; - io_break(d, io_never()); + io_break(d); } -static void init_conn(int fd, struct data *d) +static struct io_plan *init_conn(struct io_conn *conn, struct data *d) { ok1(d->state == 0); d->state++; - io_set_finish(io_new_conn(fd, io_write(d->buf, d->bytes, - io_close_cb, d)), - finish_ok, d); + io_set_finish(conn, finish_ok, d); + return io_write(conn, d->buf, d->bytes, io_close_cb, d); } static int make_listen_fd(const char *port, struct addrinfo **info) @@ -100,7 +99,7 @@ int main(void) memset(d->buf, 'a', d->bytes); fd = make_listen_fd(PORT, &addrinfo); ok1(fd >= 0); - l = io_new_listener(fd, init_conn, d); + l = io_new_listener(NULL, fd, init_conn, d); ok1(l); fflush(stdout); if (!fork()) { diff --git a/ccan/io/test/run-06-idle-DEBUG.c b/ccan/io/test/run-06-idle-DEBUG.c deleted file mode 100644 index 298ce234..00000000 --- a/ccan/io/test/run-06-idle-DEBUG.c +++ /dev/null @@ -1,8 +0,0 @@ -#define DEBUG -#define PORT "64006" -#define main real_main -int real_main(void); -#include "run-06-idle.c" -#undef main -static bool always_debug(struct io_conn *conn) { return true; } -int main(void) { io_debug_conn = always_debug; return real_main(); } diff --git a/ccan/io/test/run-06-idle.c b/ccan/io/test/run-06-idle.c index 82f6e0af..e71f4d8b 100644 --- a/ccan/io/test/run-06-idle.c +++ b/ccan/io/test/run-06-idle.c @@ -20,11 +20,11 @@ struct data { char buf[4]; }; -static struct io_plan read_done(struct io_conn *conn, struct data *d) +static struct io_plan *read_done(struct io_conn *conn, struct data *d) { ok1(d->state == 2 || d->state == 3); d->state++; - return io_close(); + return io_close(conn); } static void finish_waker(struct io_conn *conn, struct data *d) @@ -38,33 +38,40 @@ static void finish_idle(struct io_conn *conn, struct data *d) { ok1(d->state == 3); d->state++; - io_break(d, io_never()); + io_break(d); } -static struct io_plan never(struct io_conn *conn, void *arg) +static struct io_plan *never(struct io_conn *conn, void *arg) { abort(); } -static struct io_plan read_buf(struct io_conn *conn, struct data *d) +static struct io_plan *read_buf(struct io_conn *conn, struct data *d) { - return io_read(d->buf, sizeof(d->buf), read_done, d); + return io_read(conn, d->buf, sizeof(d->buf), read_done, d); } -static void init_conn(int fd, struct data *d) +static struct io_plan *init_waker(struct io_conn *conn, void *unused) +{ + /* This is /dev/null, so will never succeed. */ + return io_read(conn, unused, 1, never, NULL); +} + +static struct io_plan *init_idle(struct io_conn *conn, struct data *d) { int fd2; ok1(d->state == 0); d->state++; - idler = io_new_conn(fd, io_wait(d, read_buf, d)); - io_set_finish(idler, finish_idle, d); + idler = conn; + io_set_finish(conn, finish_idle, d); /* This will wake us up, as read will fail. */ fd2 = open("/dev/null", O_RDONLY); ok1(fd2 >= 0); - io_set_finish(io_new_conn(fd2, io_read(idler, 1, never, NULL)), - finish_waker, d); + io_set_finish(io_new_conn(NULL, fd2, init_waker, d), finish_waker, d); + + return io_wait(conn, d, IO_IN, read_buf, d); } static int make_listen_fd(const char *port, struct addrinfo **info) @@ -111,7 +118,7 @@ int main(void) d->state = 0; fd = make_listen_fd(PORT, &addrinfo); ok1(fd >= 0); - l = io_new_listener(fd, init_conn, d); + l = io_new_listener(NULL, fd, init_idle, d); ok1(l); fflush(stdout); if (!fork()) { diff --git a/ccan/io/test/run-07-break-DEBUG.c b/ccan/io/test/run-07-break-DEBUG.c deleted file mode 100644 index 602d7c2f..00000000 --- a/ccan/io/test/run-07-break-DEBUG.c +++ /dev/null @@ -1,8 +0,0 @@ -#define DEBUG -#define PORT "64007" -#define main real_main -int real_main(void); -#include "run-07-break.c" -#undef main -static bool always_debug(struct io_conn *conn) { return true; } -int main(void) { io_debug_conn = always_debug; return real_main(); } diff --git a/ccan/io/test/run-07-break.c b/ccan/io/test/run-07-break.c index 19cc6a8e..1f69e9f5 100644 --- a/ccan/io/test/run-07-break.c +++ b/ccan/io/test/run-07-break.c @@ -15,11 +15,11 @@ struct data { char buf[4]; }; -static struct io_plan read_done(struct io_conn *conn, struct data *d) +static struct io_plan *read_done(struct io_conn *conn, struct data *d) { ok1(d->state == 1); d->state++; - return io_close(); + return io_close(conn); } static void finish_ok(struct io_conn *conn, struct data *d) @@ -28,15 +28,15 @@ static void finish_ok(struct io_conn *conn, struct data *d) d->state++; } -static void init_conn(int fd, struct data *d) +static struct io_plan *init_conn(struct io_conn *conn, struct data *d) { ok1(d->state == 0); d->state++; - io_set_finish(io_new_conn(fd, - io_break(d, - io_read(d->buf, sizeof(d->buf), read_done, d))), - finish_ok, d); + io_set_finish(conn, finish_ok, d); + + io_break(d); + return io_read(conn, d->buf, sizeof(d->buf), read_done, d); } static int make_listen_fd(const char *port, struct addrinfo **info) @@ -83,7 +83,7 @@ int main(void) d->state = 0; fd = make_listen_fd(PORT, &addrinfo); ok1(fd >= 0); - l = io_new_listener(fd, init_conn, d); + l = io_new_listener(NULL, fd, init_conn, d); ok1(l); fflush(stdout); if (!fork()) { diff --git a/ccan/io/test/run-08-hangup-on-idle-DEBUG.c b/ccan/io/test/run-08-hangup-on-idle-DEBUG.c deleted file mode 100644 index f916b8e1..00000000 --- a/ccan/io/test/run-08-hangup-on-idle-DEBUG.c +++ /dev/null @@ -1,7 +0,0 @@ -#define DEBUG -#define main real_main -int real_main(void); -#include "run-08-hangup-on-idle.c" -#undef main -static bool always_debug(struct io_conn *conn) { return true; } -int main(void) { io_debug_conn = always_debug; return real_main(); } diff --git a/ccan/io/test/run-08-hangup-on-idle.c b/ccan/io/test/run-08-hangup-on-idle.c index c8257703..c1ff1e6e 100644 --- a/ccan/io/test/run-08-hangup-on-idle.c +++ b/ccan/io/test/run-08-hangup-on-idle.c @@ -8,39 +8,49 @@ static int fds2[2]; -static struct io_plan timeout_wakeup(struct io_conn *conn, char *buf) +static struct io_plan *read_in(struct io_conn *conn, char *buf) { - /* This kills the dummy connection. */ - close(fds2[1]); - return io_read(buf, 16, io_close_cb, NULL); + return io_read(conn, buf, 16, io_close_cb, NULL); } -static struct io_plan never(struct io_conn *conn, void *unused) +static struct io_plan *setup_waiter(struct io_conn *conn, char *buf) { - abort(); + return io_wait(conn, buf, IO_IN, read_in, buf); +} + +static struct io_plan *wake_and_close(struct io_conn *conn, char *buf) +{ + io_wake(buf); + return io_close(conn); +} + +static struct io_plan *setup_waker(struct io_conn *conn, char *buf) +{ + return io_read(conn, buf, 1, wake_and_close, buf); } int main(void) { int fds[2]; - struct io_conn *conn; char buf[16]; plan_tests(4); ok1(pipe(fds) == 0); - /* Write then close. */ - io_new_conn(fds[1], io_write("hello there world", 16, - io_close_cb, NULL)); - conn = io_new_conn(fds[0], io_wait(buf, never, NULL)); - - /* To avoid assert(num_waiting) */ + io_new_conn(NULL, fds[0], setup_waiter, buf); ok1(pipe(fds2) == 0); - io_new_conn(fds2[0], io_read(buf, 16, io_close_cb, NULL)); + io_new_conn(NULL, fds2[0], setup_waker, buf); + + if (fork() == 0) { + write(fds[1], "hello there world", 16); + close(fds[1]); - /* After half a second, it will read. */ - io_timeout(conn, time_from_msec(500), timeout_wakeup, buf); + /* Now wake it. */ + sleep(1); + write(fds2[1], "", 1); + exit(0); + } ok1(io_loop() == NULL); ok1(memcmp(buf, "hello there world", 16) == 0); diff --git a/ccan/io/test/run-08-read-after-hangup-DEBUG.c b/ccan/io/test/run-08-read-after-hangup-DEBUG.c deleted file mode 100644 index 8c602891..00000000 --- a/ccan/io/test/run-08-read-after-hangup-DEBUG.c +++ /dev/null @@ -1,7 +0,0 @@ -#define DEBUG -#define main real_main -int real_main(void); -#include "run-08-read-after-hangup.c" -#undef main -static bool always_debug(struct io_conn *conn) { return true; } -int main(void) { io_debug_conn = always_debug; return real_main(); } diff --git a/ccan/io/test/run-08-read-after-hangup.c b/ccan/io/test/run-08-read-after-hangup.c index f6b3db1c..f9ed267d 100644 --- a/ccan/io/test/run-08-read-after-hangup.c +++ b/ccan/io/test/run-08-read-after-hangup.c @@ -9,15 +9,25 @@ static char inbuf[8]; -static struct io_plan wake_it(struct io_conn *conn, struct io_conn *reader) +static struct io_plan *wake_it(struct io_conn *conn, struct io_conn *reader) { io_wake(inbuf); - return io_close(); + return io_close(conn); } -static struct io_plan read_buf(struct io_conn *conn, void *unused) +static struct io_plan *read_buf(struct io_conn *conn, void *unused) { - return io_read(inbuf, 8, io_close_cb, NULL); + return io_read(conn, inbuf, 8, io_close_cb, NULL); +} + +static struct io_plan *init_writer(struct io_conn *conn, struct io_conn *wakeme) +{ + return io_write(conn, "EASYTEST", 8, wake_it, wakeme); +} + +static struct io_plan *init_waiter(struct io_conn *conn, void *unused) +{ + return io_wait(conn, inbuf, IO_IN, read_buf, NULL); } int main(void) @@ -28,8 +38,8 @@ int main(void) plan_tests(3); ok1(pipe(fds) == 0); - conn = io_new_conn(fds[0], io_wait(inbuf, read_buf, NULL)); - io_new_conn(fds[1], io_write("EASYTEST", 8, wake_it, conn)); + conn = io_new_conn(NULL, fds[0], init_waiter, NULL); + io_new_conn(conn, fds[1], init_writer, conn); ok1(io_loop() == NULL); ok1(memcmp(inbuf, "EASYTEST", sizeof(inbuf)) == 0); diff --git a/ccan/io/test/run-09-connect-DEBUG.c b/ccan/io/test/run-09-connect-DEBUG.c deleted file mode 100644 index 5520dd78..00000000 --- a/ccan/io/test/run-09-connect-DEBUG.c +++ /dev/null @@ -1,8 +0,0 @@ -#define DEBUG -#define PORT "64009" -#define main real_main -int real_main(void); -#include "run-09-connect.c" -#undef main -static bool always_debug(struct io_conn *conn) { return true; } -int main(void) { io_debug_conn = always_debug; return real_main(); } diff --git a/ccan/io/test/run-09-connect.c b/ccan/io/test/run-09-connect.c index fd7e1606..8f57f00d 100644 --- a/ccan/io/test/run-09-connect.c +++ b/ccan/io/test/run-09-connect.c @@ -11,31 +11,33 @@ #endif static struct io_listener *l; +static struct data *d2; struct data { int state; char buf[10]; }; -static struct io_plan closer(struct io_conn *conn, struct data *d) +static struct io_plan *closer(struct io_conn *conn, struct data *d) { d->state++; - return io_close(); + return io_close(conn); } -static struct io_plan connected(struct io_conn *conn, struct data *d2) +static struct io_plan *connected(struct io_conn *conn, struct data *d2) { ok1(d2->state == 0); d2->state++; - return io_read(d2->buf, sizeof(d2->buf), closer, d2); + return io_read(conn, d2->buf, sizeof(d2->buf), closer, d2); } -static void init_conn(int fd, struct data *d) +static struct io_plan *init_conn(struct io_conn *conn, struct data *d) { ok1(d->state == 0); d->state++; - io_new_conn(fd, io_write(d->buf, sizeof(d->buf), closer, d)); io_close_listener(l); + + return io_write(conn, d->buf, sizeof(d->buf), closer, d); } static int make_listen_fd(const char *port, struct addrinfo **info) @@ -70,9 +72,17 @@ static int make_listen_fd(const char *port, struct addrinfo **info) return fd; } +static struct io_plan *setup_connect(struct io_conn *conn, + struct addrinfo *addrinfo) +{ + d2 = malloc(sizeof(*d2)); + d2->state = 0; + return io_connect(conn, addrinfo, connected, d2); +} + int main(void) { - struct data *d = malloc(sizeof(*d)), *d2 = malloc(sizeof(*d2)); + struct data *d = malloc(sizeof(*d)); struct addrinfo *addrinfo; int fd; @@ -82,13 +92,12 @@ int main(void) memset(d->buf, 'a', sizeof(d->buf)); fd = make_listen_fd(PORT, &addrinfo); ok1(fd >= 0); - l = io_new_listener(fd, init_conn, d); + l = io_new_listener(NULL, fd, init_conn, d); ok1(l); fd = socket(addrinfo->ai_family, addrinfo->ai_socktype, addrinfo->ai_protocol); - d2->state = 0; - ok1(io_new_conn(fd, io_connect(fd, addrinfo, connected, d2))); + ok1(io_new_conn(NULL, fd, setup_connect, addrinfo)); ok1(io_loop() == NULL); ok1(d->state == 2); diff --git a/ccan/io/test/run-10-many-DEBUG.c b/ccan/io/test/run-10-many-DEBUG.c deleted file mode 100644 index 675c7952..00000000 --- a/ccan/io/test/run-10-many-DEBUG.c +++ /dev/null @@ -1,12 +0,0 @@ -#define DEBUG -#define PORT "64010" -#define main real_main -int real_main(void); -#include "run-10-many.c" -#undef main -/* We stack overflow if we debug all of them! */ -static bool debug_one(struct io_conn *conn) -{ - return conn == buf[1].reader; -} -int main(void) { io_debug_conn = debug_one; return real_main(); } diff --git a/ccan/io/test/run-10-many.c b/ccan/io/test/run-10-many.c index 53e971d0..b70310dd 100644 --- a/ccan/io/test/run-10-many.c +++ b/ccan/io/test/run-10-many.c @@ -15,44 +15,49 @@ struct buffer { char buf[32]; }; -static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf); -static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf); +static struct io_plan *poke_reader(struct io_conn *conn, struct buffer *buf); +static struct io_plan *poke_writer(struct io_conn *conn, struct buffer *buf); -static struct io_plan read_buf(struct io_conn *conn, struct buffer *buf) +static struct io_plan *read_buf(struct io_conn *conn, struct buffer *buf) { - return io_read(&buf->buf, sizeof(buf->buf), poke_writer, buf); + return io_read(conn, &buf->buf, sizeof(buf->buf), poke_writer, buf); } -static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf) +static struct io_plan *poke_writer(struct io_conn *conn, struct buffer *buf) { assert(conn == buf->reader); if (buf->iters == NUM_ITERS) - return io_close(); + return io_close(conn); /* You write. */ io_wake(&buf->writer); /* I'll wait until you wake me. */ - return io_wait(&buf->reader, read_buf, buf); + return io_wait(conn, &buf->reader, IO_IN, read_buf, buf); } -static struct io_plan write_buf(struct io_conn *conn, struct buffer *buf) +static struct io_plan *write_buf(struct io_conn *conn, struct buffer *buf) { - return io_write(&buf->buf, sizeof(buf->buf), poke_reader, buf); + return io_write(conn, &buf->buf, sizeof(buf->buf), poke_reader, buf); } -static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf) +static struct io_plan *poke_reader(struct io_conn *conn, struct buffer *buf) { assert(conn == buf->writer); /* You read. */ io_wake(&buf->reader); if (++buf->iters == NUM_ITERS) - return io_close(); + return io_close(conn); /* I'll wait until you tell me to write. */ - return io_wait(&buf->writer, write_buf, buf); + return io_wait(conn, &buf->writer, IO_OUT, write_buf, buf); +} + +static struct io_plan *setup_reader(struct io_conn *conn, struct buffer *buf) +{ + return io_wait(conn, &buf->reader, IO_IN, read_buf, buf); } static struct buffer buf[NUM]; @@ -75,12 +80,11 @@ int main(void) sprintf(buf[i].buf, "%i-%i", i, i); /* Wait for writer to tell us to read. */ - buf[i].reader = io_new_conn(last_read, - io_wait(&buf[i].reader, read_buf, - &buf[i])); + buf[i].reader = io_new_conn(NULL, last_read, + setup_reader, &buf[i]); if (!buf[i].reader) break; - buf[i].writer = io_new_conn(fds[1], write_buf(NULL, &buf[i])); + buf[i].writer = io_new_conn(NULL, fds[1], write_buf, &buf[i]); if (!buf[i].writer) break; last_read = fds[0]; @@ -91,10 +95,9 @@ int main(void) /* Last one completes the cirle. */ i = 0; sprintf(buf[i].buf, "%i-%i", i, i); - buf[i].reader = io_new_conn(last_read, - io_wait(&buf[i].reader, read_buf, &buf[i])); + buf[i].reader = io_new_conn(NULL, last_read, setup_reader, &buf[i]); ok1(buf[i].reader); - buf[i].writer = io_new_conn(last_write, write_buf(NULL, &buf[i])); + buf[i].writer = io_new_conn(NULL, last_write, write_buf, &buf[i]); ok1(buf[i].writer); /* They should eventually exit */ diff --git a/ccan/io/test/run-12-bidir-DEBUG.c b/ccan/io/test/run-12-bidir-DEBUG.c deleted file mode 100644 index 55c4cf72..00000000 --- a/ccan/io/test/run-12-bidir-DEBUG.c +++ /dev/null @@ -1,8 +0,0 @@ -#define DEBUG -#define PORT "64012" -#define main real_main -int real_main(void); -#include "run-12-bidir.c" -#undef main -static bool always_debug(struct io_conn *conn) { return true; } -int main(void) { io_debug_conn = always_debug; return real_main(); } diff --git a/ccan/io/test/run-12-bidir.c b/ccan/io/test/run-12-bidir.c index 1ab0a218..533f465d 100644 --- a/ccan/io/test/run-12-bidir.c +++ b/ccan/io/test/run-12-bidir.c @@ -6,6 +6,7 @@ #include #include +#if 0 #ifndef PORT #define PORT "65012" #endif @@ -22,10 +23,10 @@ static void finish_ok(struct io_conn *conn, struct data *d) d->state++; } -static struct io_plan write_done(struct io_conn *conn, struct data *d) +static struct io_plan *write_done(struct io_conn *conn, struct data *d) { d->state++; - return io_close(); + return io_close(conn); } static void init_conn(int fd, struct data *d) @@ -130,3 +131,9 @@ int main(void) /* This exits depending on whether all tests passed */ return exit_status(); } +#else +int main(void) +{ + return 0; +} +#endif diff --git a/ccan/io/test/run-13-all-idle-DEBUG.c b/ccan/io/test/run-13-all-idle-DEBUG.c deleted file mode 100644 index 2969a13b..00000000 --- a/ccan/io/test/run-13-all-idle-DEBUG.c +++ /dev/null @@ -1,8 +0,0 @@ -#define DEBUG -#define PORT "64013" -#define main real_main -int real_main(void); -#include "run-13-all-idle.c" -#undef main -static bool always_debug(struct io_conn *conn) { return true; } -int main(void) { io_debug_conn = always_debug; return real_main(); } diff --git a/ccan/io/test/run-13-all-idle.c b/ccan/io/test/run-13-all-idle.c index 31510007..f8ee97f8 100644 --- a/ccan/io/test/run-13-all-idle.c +++ b/ccan/io/test/run-13-all-idle.c @@ -7,6 +7,11 @@ #include #include +static struct io_plan *setup_waiter(struct io_conn *conn, int *status) +{ + return io_wait(conn, status, IO_IN, io_close_cb, NULL); +} + int main(void) { int status; @@ -17,7 +22,7 @@ int main(void) int fds[2]; ok1(pipe(fds) == 0); - io_new_conn(fds[0], io_wait(&status, io_close_cb, NULL)); + io_new_conn(NULL, fds[0], setup_waiter, &status); io_loop(); exit(1); } diff --git a/ccan/io/test/run-14-duplex-both-read-DEBUG.c b/ccan/io/test/run-14-duplex-both-read-DEBUG.c deleted file mode 100644 index 5c4aae7c..00000000 --- a/ccan/io/test/run-14-duplex-both-read-DEBUG.c +++ /dev/null @@ -1,8 +0,0 @@ -#define DEBUG -#define PORT "64014" -#define main real_main -int real_main(void); -#include "run-14-duplex-both-read.c" -#undef main -static bool always_debug(struct io_conn *conn) { return true; } -int main(void) { io_debug_conn = always_debug; return real_main(); } diff --git a/ccan/io/test/run-14-duplex-both-read.c b/ccan/io/test/run-14-duplex-both-read.c index d1868e2e..b4a14c3d 100644 --- a/ccan/io/test/run-14-duplex-both-read.c +++ b/ccan/io/test/run-14-duplex-both-read.c @@ -8,6 +8,7 @@ #include #include +#if 0 #ifndef PORT #define PORT "65014" #endif @@ -138,3 +139,9 @@ int main(void) /* This exits depending on whether all tests passed */ return exit_status(); } +#else +int main(void) +{ + return 0; +} +#endif diff --git a/ccan/io/test/run-15-timeout-DEBUG.c b/ccan/io/test/run-15-timeout-DEBUG.c deleted file mode 100644 index d5114862..00000000 --- a/ccan/io/test/run-15-timeout-DEBUG.c +++ /dev/null @@ -1,8 +0,0 @@ -#define DEBUG -#define PORT "64015" -#define main real_main -int real_main(void); -#include "run-15-timeout.c" -#undef main -static bool always_debug(struct io_conn *conn) { return true; } -int main(void) { io_debug_conn = always_debug; return real_main(); } diff --git a/ccan/io/test/run-15-timeout.c b/ccan/io/test/run-15-timeout.c index 2aceb2b5..9224b0a9 100644 --- a/ccan/io/test/run-15-timeout.c +++ b/ccan/io/test/run-15-timeout.c @@ -7,6 +7,7 @@ #include #include +#if 0 #ifndef PORT #define PORT "65015" #endif @@ -38,7 +39,7 @@ static void finish_ok(struct io_conn *conn, struct data *d) { ok1(d->state == 2); d->state++; - io_break(d, io_never()); + io_break(d); } static void init_conn(int fd, struct data *d) @@ -172,3 +173,9 @@ int main(void) /* This exits depending on whether all tests passed */ return exit_status(); } +#else +int main(void) +{ + return 0; +} +#endif diff --git a/ccan/io/test/run-16-duplex-test.c b/ccan/io/test/run-16-duplex-test.c index addca841..e31c5c5b 100644 --- a/ccan/io/test/run-16-duplex-test.c +++ b/ccan/io/test/run-16-duplex-test.c @@ -8,6 +8,7 @@ #include #include +#if 0 #ifndef PORT #define PORT "65016" #endif @@ -140,3 +141,9 @@ int main(void) /* This exits depending on whether all tests passed */ return exit_status(); } +#else +int main(void) +{ + return 0; +} +#endif diff --git a/ccan/io/test/run-17-homemade-io-DEBUG.c b/ccan/io/test/run-17-homemade-io-DEBUG.c deleted file mode 100644 index 5c44ce08..00000000 --- a/ccan/io/test/run-17-homemade-io-DEBUG.c +++ /dev/null @@ -1,8 +0,0 @@ -#define DEBUG -#define PORT "64017" -#define main real_main -int real_main(void); -#include "run-17-homemade-io.c" -#undef main -static bool always_debug(struct io_conn *conn) { return true; } -int main(void) { io_debug_conn = always_debug; return real_main(); } diff --git a/ccan/io/test/run-17-homemade-io.c b/ccan/io/test/run-17-homemade-io.c index c5d82889..a868b213 100644 --- a/ccan/io/test/run-17-homemade-io.c +++ b/ccan/io/test/run-17-homemade-io.c @@ -20,7 +20,7 @@ static void finish_ok(struct io_conn *conn, struct packet *pkt) { ok1(pkt->state == 3); pkt->state++; - io_break(pkt, io_never()); + io_break(pkt); } static int do_read_packet(int fd, struct io_plan *plan) @@ -41,7 +41,7 @@ static int do_read_packet(int fd, struct io_plan *plan) ok1(pkt->state == 2); pkt->state++; if (pkt->len == 0) - return io_debug_io(1); + return 1; if (!pkt->contents && !(pkt->contents = malloc(pkt->len))) goto fail; else { @@ -58,39 +58,39 @@ static int do_read_packet(int fd, struct io_plan *plan) plan->u2.s += ret; /* Finished? */ - return io_debug_io(plan->u2.s >= sizeof(pkt->len) - && plan->u2.s == pkt->len + sizeof(pkt->len)); + return plan->u2.s >= sizeof(pkt->len) + && plan->u2.s == pkt->len + sizeof(pkt->len); fail: free(pkt->contents); - return io_debug_io(-1); + return -1; } -static struct io_plan io_read_packet(struct packet *pkt, - struct io_plan (*cb)(struct io_conn *, void *), - void *arg) +static struct io_plan *io_read_packet(struct io_conn *conn, + struct packet *pkt, + struct io_plan *(*cb)(struct io_conn *, void *), + void *arg) { - struct io_plan plan; + struct io_plan *plan = io_get_plan(conn, IO_IN); assert(cb); pkt->contents = NULL; - plan.u1.vp = pkt; - plan.u2.s = 0; - plan.io = do_read_packet; - plan.next = cb; - plan.next_arg = arg; - plan.pollflag = POLLIN; + plan->u1.vp = pkt; + plan->u2.s = 0; + plan->io = do_read_packet; + plan->next = cb; + plan->next_arg = arg; return plan; } -static void init_conn(int fd, struct packet *pkt) +static struct io_plan *init_conn(struct io_conn *conn, struct packet *pkt) { ok1(pkt->state == 0); pkt->state++; - io_set_finish(io_new_conn(fd, io_read_packet(pkt, io_close_cb, pkt)), - finish_ok, pkt); + io_set_finish(conn, finish_ok, pkt); + return io_read_packet(conn, pkt, io_close_cb, pkt); } static int make_listen_fd(const char *port, struct addrinfo **info) @@ -137,7 +137,7 @@ int main(void) pkt->state = 0; fd = make_listen_fd(PORT, &addrinfo); ok1(fd >= 0); - l = io_new_listener(fd, init_conn, pkt); + l = io_new_listener(NULL, fd, init_conn, pkt); ok1(l); fflush(stdout); if (!fork()) { diff --git a/ccan/io/test/run-18-errno-DEBUG.c b/ccan/io/test/run-18-errno-DEBUG.c deleted file mode 100644 index 863d1ae1..00000000 --- a/ccan/io/test/run-18-errno-DEBUG.c +++ /dev/null @@ -1,8 +0,0 @@ -#define DEBUG -#define PORT "64018" -#define main real_main -int real_main(void); -#include "run-18-errno.c" -#undef main -static bool always_debug(struct io_conn *conn) { return true; } -int main(void) { io_debug_conn = always_debug; return real_main(); } diff --git a/ccan/io/test/run-18-errno.c b/ccan/io/test/run-18-errno.c index 222c0fb5..f67c0eb0 100644 --- a/ccan/io/test/run-18-errno.c +++ b/ccan/io/test/run-18-errno.c @@ -22,23 +22,24 @@ static void finish_EBADF(struct io_conn *conn, int *state) ok1(errno == EBADF); ok1(*state == 3); (*state)++; - io_break(state + 1, io_close()); + io_break(state + 1); } -static void init_conn(int fd, int *state) +static struct io_plan *init_conn(struct io_conn *conn, int *state) { if (*state == 0) { (*state)++; errno = 100; - io_set_finish(io_new_conn(fd, io_close()), finish_100, state); + io_set_finish(conn, finish_100, state); + return io_close(conn); } else { ok1(*state == 2); (*state)++; - close(fd); + close(io_conn_fd(conn)); errno = 0; - io_set_finish(io_new_conn(fd, io_read(state, 1, - io_close_cb, NULL)), - finish_EBADF, state); + io_set_finish(conn, finish_EBADF, state); + + return io_read(conn, state, 1, io_close_cb, NULL); } } @@ -85,7 +86,7 @@ int main(void) plan_tests(12); fd = make_listen_fd(PORT, &addrinfo); ok1(fd >= 0); - l = io_new_listener(fd, init_conn, &state); + l = io_new_listener(NULL, fd, init_conn, &state); ok1(l); fflush(stdout); if (!fork()) { diff --git a/ccan/io/test/run-19-always-DEBUG.c b/ccan/io/test/run-19-always-DEBUG.c deleted file mode 100644 index 4decacd8..00000000 --- a/ccan/io/test/run-19-always-DEBUG.c +++ /dev/null @@ -1,8 +0,0 @@ -#define DEBUG -#define PORT "64019" -#define main real_main -int real_main(void); -#include "run-19-always.c" -#undef main -static bool always_debug(struct io_conn *conn) { return true; } -int main(void) { io_debug_conn = always_debug; return real_main(); } diff --git a/ccan/io/test/run-19-always.c b/ccan/io/test/run-19-always.c index 1805f79f..24771969 100644 --- a/ccan/io/test/run-19-always.c +++ b/ccan/io/test/run-19-always.c @@ -20,21 +20,22 @@ static void finish_ok(struct io_conn *conn, struct data *d) { ok1(d->state == 1); d->state++; - io_break(d, io_never()); + io_break(d); } -static struct io_plan write_buf(struct io_conn *conn, struct data *d) +static struct io_plan *write_buf(struct io_conn *conn, struct data *d) { - return io_write(d->buf, d->bytes, io_close_cb, d); + return io_write(conn, d->buf, d->bytes, io_close_cb, d); } -static void init_conn(int fd, struct data *d) +static struct io_plan *init_conn(struct io_conn *conn, struct data *d) { ok1(d->state == 0); d->state++; + io_set_finish(conn, finish_ok, d); + /* Empty read should run immediately... */ - io_set_finish(io_new_conn(fd, io_read(NULL, 0, write_buf, d)), - finish_ok, d); + return io_read(conn, NULL, 0, write_buf, d); } static int make_listen_fd(const char *port, struct addrinfo **info) @@ -105,7 +106,7 @@ int main(void) memset(d->buf, 'a', d->bytes); fd = make_listen_fd(PORT, &addrinfo); ok1(fd >= 0); - l = io_new_listener(fd, init_conn, d); + l = io_new_listener(NULL, fd, init_conn, d); ok1(l); fflush(stdout); if (!fork()) { diff --git a/ccan/io/test/run-set_alloc.c b/ccan/io/test/run-set_alloc.c deleted file mode 100644 index 8d201689..00000000 --- a/ccan/io/test/run-set_alloc.c +++ /dev/null @@ -1,240 +0,0 @@ -#include -#include -#include -#include -#include -#include - -/* Make sure we override these! */ -static void *no_malloc(size_t size) -{ - abort(); -} -static void *no_realloc(void *p, size_t size) -{ - abort(); -} -static void no_free(void *p) -{ - abort(); -} -#define malloc no_malloc -#define realloc no_realloc -#define free no_free - -#include -#include - -#undef malloc -#undef realloc -#undef free - -static unsigned int alloc_count, realloc_count, free_count; -static void *ptrs[100]; - -static void **find_ptr(void *p) -{ - unsigned int i; - - for (i = 0; i < 100; i++) - if (ptrs[i] == p) - return ptrs + i; - return NULL; -} - -static void *allocfn(size_t size) -{ - alloc_count++; - return *find_ptr(NULL) = malloc(size); -} - -static void *reallocfn(void *ptr, size_t size) -{ - realloc_count++; - if (!ptr) - alloc_count++; - - return *find_ptr(ptr) = realloc(ptr, size); -} - -static void freefn(void *ptr) -{ - free_count++; - free(ptr); - *find_ptr(ptr) = NULL; -} - -#ifndef PORT -#define PORT "65115" -#endif - -struct data { - int state; - int timeout_usec; - bool timed_out; - char buf[4]; -}; - - -static struct io_plan no_timeout(struct io_conn *conn, struct data *d) -{ - ok1(d->state == 1); - d->state++; - return io_close(); -} - -static struct io_plan timeout(struct io_conn *conn, struct data *d) -{ - ok1(d->state == 1); - d->state++; - d->timed_out = true; - return io_close(); -} - -static void finish_ok(struct io_conn *conn, struct data *d) -{ - ok1(d->state == 2); - d->state++; - io_break(d, io_never()); -} - -static void init_conn(int fd, struct data *d) -{ - struct io_conn *conn; - - ok1(d->state == 0); - d->state++; - - conn = io_new_conn(fd, io_read(d->buf, sizeof(d->buf), no_timeout, d)); - io_set_finish(conn, finish_ok, d); - io_timeout(conn, time_from_usec(d->timeout_usec), timeout, d); -} - -static int make_listen_fd(const char *port, struct addrinfo **info) -{ - int fd, on = 1; - struct addrinfo *addrinfo, hints; - - memset(&hints, 0, sizeof(hints)); - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - hints.ai_flags = AI_PASSIVE; - hints.ai_protocol = 0; - - if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0) - return -1; - - fd = socket(addrinfo->ai_family, addrinfo->ai_socktype, - addrinfo->ai_protocol); - if (fd < 0) - return -1; - - setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); - if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) { - close(fd); - return -1; - } - if (listen(fd, 1) != 0) { - close(fd); - return -1; - } - *info = addrinfo; - return fd; -} - -int main(void) -{ - struct data *d = allocfn(sizeof(*d)); - struct addrinfo *addrinfo; - struct io_listener *l; - int fd, status; - - io_set_alloc(allocfn, reallocfn, freefn); - - /* This is how many tests you plan to run */ - plan_tests(25); - d->state = 0; - d->timed_out = false; - d->timeout_usec = 100000; - fd = make_listen_fd(PORT, &addrinfo); - ok1(fd >= 0); - l = io_new_listener(fd, init_conn, d); - ok1(l); - fflush(stdout); - - if (!fork()) { - int i; - - io_close_listener(l); - fd = socket(addrinfo->ai_family, addrinfo->ai_socktype, - addrinfo->ai_protocol); - if (fd < 0) - exit(1); - if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) - exit(2); - signal(SIGPIPE, SIG_IGN); - usleep(500000); - for (i = 0; i < strlen("hellothere"); i++) { - if (write(fd, "hellothere" + i, 1) != 1) - break; - } - close(fd); - freeaddrinfo(addrinfo); - free(d); - exit(i); - } - ok1(io_loop() == d); - ok1(d->state == 3); - ok1(d->timed_out == true); - ok1(wait(&status)); - ok1(WIFEXITED(status)); - ok1(WEXITSTATUS(status) < sizeof(d->buf)); - - /* This one shouldn't time out. */ - d->state = 0; - d->timed_out = false; - d->timeout_usec = 500000; - fflush(stdout); - - if (!fork()) { - int i; - - io_close_listener(l); - fd = socket(addrinfo->ai_family, addrinfo->ai_socktype, - addrinfo->ai_protocol); - if (fd < 0) - exit(1); - if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) - exit(2); - signal(SIGPIPE, SIG_IGN); - usleep(100000); - for (i = 0; i < strlen("hellothere"); i++) { - if (write(fd, "hellothere" + i, 1) != 1) - break; - } - close(fd); - freeaddrinfo(addrinfo); - free(d); - exit(i); - } - ok1(io_loop() == d); - ok1(d->state == 3); - ok1(d->timed_out == false); - ok1(wait(&status)); - ok1(WIFEXITED(status)); - ok1(WEXITSTATUS(status) >= sizeof(d->buf)); - - io_close_listener(l); - freeaddrinfo(addrinfo); - - /* We should have tested each one at least once! */ - ok1(realloc_count); - ok1(alloc_count); - ok1(free_count); - - ok1(free_count < alloc_count); - freefn(d); - ok1(free_count == alloc_count); - - return exit_status(); -} -- 2.39.2