From: Rusty Russell Date: Mon, 14 Oct 2013 10:58:36 +0000 (+1030) Subject: ccan/io: initialize connection with an explicit I/O plan. X-Git-Url: http://git.ozlabs.org/?p=ccan;a=commitdiff_plain;h=57d9d1be33905691ec756b14b066181ca6850ced ccan/io: initialize connection with an explicit I/O plan. Rather than going via a callback, which tends to just set up I/O, do any setup before the call to io_new_conn(), then pass it the io_plan directly. The patch shows how much this simplifies our test code. Signed-off-by: Rusty Russell --- diff --git a/ccan/io/_info b/ccan/io/_info index 150b93e5..0dfb43c6 100644 --- a/ccan/io/_info +++ b/ccan/io/_info @@ -34,49 +34,29 @@ * // This reads from stdin. * static struct io_plan wake_writer(struct io_conn *, struct stdin_buffer *); * // This writes the stdin buffer to the child. - * static struct io_plan write_to_child(struct io_conn *c, - * struct stdin_buffer *b); - * static struct io_plan read_stdin(struct io_conn *c, struct stdin_buffer *b) - * { - * assert(c == b->reader); - * b->len = sizeof(b->inbuf); - * return io_read_partial(b->inbuf, &b->len, wake_writer, b); - * } + * static struct io_plan wake_reader(struct io_conn *, struct stdin_buffer *); * * static struct io_plan wake_writer(struct io_conn *c, struct stdin_buffer *b) * { * assert(c == b->reader); - * io_wake(b->writer, write_to_child, b); + * io_wake(b->writer, io_write(b->inbuf, b->len, wake_reader, b)); * return io_idle(); * } * * static void reader_exit(struct io_conn *c, struct stdin_buffer *b) * { * assert(c == b->reader); - * io_wake(b->writer, write_to_child, b); + * io_wake(b->writer, io_close(b->writer, NULL)); * b->reader = NULL; * } * * static struct io_plan wake_reader(struct io_conn *c, struct stdin_buffer *b) * { * assert(c == b->writer); - * io_wake(b->reader, read_stdin, b); - * return io_idle(); - * } - * - * static struct io_plan write_to_child(struct io_conn *conn, - * struct stdin_buffer *b) - * { - * assert(conn == b->writer); - * if (!b->reader) - * return io_close(conn, NULL); - * return io_write(b->inbuf, b->len, wake_reader, b); - * } - * - * static struct io_plan start_writer(struct io_conn *conn, - * struct stdin_buffer *b) - * { - * assert(conn == b->writer); + * if (!b->reader) + * return io_close(c, NULL); + * b->len = sizeof(b->inbuf); + * io_wake(b->reader, io_read_partial(b->inbuf, &b->len, wake_writer, b)); * return io_idle(); * } * @@ -92,15 +72,8 @@ * { * b->off += b->rlen; * - * if (b->off == b->max) { - * if (b->max == 0) - * b->max = 128; - * else if (b->max >= 1024*1024) - * b->max += 1024*1024; - * else - * b->max *= 2; - * b->buf = realloc(b->buf, b->max); - * } + * if (b->off == b->max) + * b->buf = realloc(b->buf, b->max *= 2); * * b->rlen = b->max - b->off; * return io_read_partial(b->buf + b->off, &b->rlen, read_from_child, b); @@ -110,11 +83,12 @@ * int main(int argc, char *argv[]) * { * int tochild[2], fromchild[2]; - * struct buffer out = { 0, 0, 0, NULL }; + * struct buffer out; * struct stdin_buffer sbuf; * int status; * size_t off; * ssize_t ret; + * struct io_conn *from_child; * * if (argc == 1) * errx(1, "Usage: runner ..."); @@ -137,11 +111,23 @@ * close(fromchild[1]); * signal(SIGPIPE, SIG_IGN); * - * sbuf.reader = io_new_conn(STDIN_FILENO, read_stdin, reader_exit, &sbuf); - * sbuf.writer = io_new_conn(tochild[1], start_writer, fail_child_write, + * sbuf.len = sizeof(sbuf.inbuf); + * sbuf.reader = io_new_conn(STDIN_FILENO, + * io_read_partial(sbuf.inbuf, &sbuf.len, + * wake_writer, &sbuf), + * reader_exit, &sbuf); + * sbuf.writer = io_new_conn(tochild[1], io_idle(), fail_child_write, * &sbuf); - * if (!sbuf.reader || !sbuf.writer - * || !io_new_conn(fromchild[0], read_from_child, NULL, &out)) + * + * out.max = 128; + * out.off = 0; + * out.rlen = 128; + * out.buf = malloc(out.max); + * from_child = io_new_conn(fromchild[0], + * io_read_partial(out.buf, &out.rlen, + * read_from_child, &out), + * NULL, NULL); + * if (!sbuf.reader || !sbuf.writer || !from_child) * err(1, "Allocating connections"); * * io_loop(); diff --git a/ccan/io/benchmarks/run-different-speed.c b/ccan/io/benchmarks/run-different-speed.c index 537a67bb..7a95fd84 100644 --- a/ccan/io/benchmarks/run-different-speed.c +++ b/ccan/io/benchmarks/run-different-speed.c @@ -28,7 +28,7 @@ struct client { static struct io_plan write_reply(struct io_conn *conn, struct client *client); static struct io_plan read_request(struct io_conn *conn, struct client *client) { - return io_read(conn, client->request_buffer, REQUEST_SIZE, + return io_read(client->request_buffer, REQUEST_SIZE, write_reply, client); } @@ -41,7 +41,7 @@ static struct io_plan write_complete(struct io_conn *conn, struct client *client static struct io_plan write_reply(struct io_conn *conn, struct client *client) { - return io_write(conn, client->reply_buffer, REPLY_SIZE, + return io_write(client->reply_buffer, REPLY_SIZE, write_complete, client); } @@ -108,12 +108,7 @@ static void sigalarm(int sig) static struct io_plan do_timeout(struct io_conn *conn, char *buf) { - return io_break(conn, buf, NULL, NULL); -} - -static struct io_plan do_timeout_read(struct io_conn *conn, char *buf) -{ - return io_read(conn, buf, 1, do_timeout, buf); + return io_break(buf, io_idle()); } int main(int argc, char *argv[]) @@ -155,11 +150,14 @@ int main(int argc, char *argv[]) if (ret < 0) err(1, "Accepting fd"); /* For efficiency, we share client structure */ - io_new_conn(ret, read_request, NULL, &client); + io_new_conn(ret, + io_read(client.request_buffer, REQUEST_SIZE, + write_reply, &client), + NULL, NULL); } } - io_new_conn(timeout[0], do_timeout_read, NULL, &buf); + io_new_conn(timeout[0], io_read(&buf, 1, do_timeout, &buf), NULL, NULL); close(wake[0]); for (i = 0; i < NUM_CHILDREN; i++) diff --git a/ccan/io/benchmarks/run-length-prefix.c b/ccan/io/benchmarks/run-length-prefix.c index 2ed9c729..5eb33acf 100644 --- a/ccan/io/benchmarks/run-length-prefix.c +++ b/ccan/io/benchmarks/run-length-prefix.c @@ -29,26 +29,25 @@ static struct io_plan write_reply(struct io_conn *conn, struct client *client); static struct io_plan read_body(struct io_conn *conn, struct client *client) { assert(client->len <= REQUEST_MAX); - return io_read(conn, client->request_buffer, client->len, + return io_read(client->request_buffer, client->len, write_reply, client); } -static struct io_plan read_header(struct io_conn *conn, struct client *client) +static struct io_plan io_read_header(struct client *client) { - return io_read(conn, &client->len, sizeof(client->len), - read_body, client); + return io_read(&client->len, sizeof(client->len), read_body, client); } /* once we're done, loop again. */ static struct io_plan write_complete(struct io_conn *conn, struct client *client) { completed++; - return read_header(conn, client); + return io_read_header(client); } static struct io_plan write_reply(struct io_conn *conn, struct client *client) { - return io_write(conn, &client->len, sizeof(client->len), + return io_write(&client->len, sizeof(client->len), write_complete, client); } @@ -114,12 +113,7 @@ static void sigalarm(int sig) static struct io_plan do_timeout(struct io_conn *conn, char *buf) { - return io_break(conn, buf, NULL, NULL); -} - -static struct io_plan do_timeout_read(struct io_conn *conn, char *buf) -{ - return io_read(conn, buf, 1, do_timeout, buf); + return io_break(buf, io_idle()); } int main(int argc, char *argv[]) @@ -163,11 +157,11 @@ int main(int argc, char *argv[]) err(1, "Accepting fd"); /* For efficiency, we share buffer */ client->request_buffer = buffer; - io_new_conn(ret, read_header, NULL, client); + io_new_conn(ret, io_read_header(client), NULL, NULL); } } - io_new_conn(timeout[0], do_timeout_read, NULL, &buf); + io_new_conn(timeout[0], io_read(&buf, 1, do_timeout, &buf), NULL, NULL); close(wake[0]); for (i = 0; i < NUM_CHILDREN; i++) diff --git a/ccan/io/benchmarks/run-loop.c b/ccan/io/benchmarks/run-loop.c index a2898e13..b0e6b02c 100644 --- a/ccan/io/benchmarks/run-loop.c +++ b/ccan/io/benchmarks/run-loop.c @@ -16,23 +16,8 @@ struct buffer { char buf[32]; }; -static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf); static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf); -static struct io_plan do_read(struct io_conn *conn, struct buffer *buf) -{ - assert(conn == buf->reader); - - return io_read(conn, &buf->buf, sizeof(buf->buf), poke_writer, buf); -} - -static struct io_plan do_write(struct io_conn *conn, struct buffer *buf) -{ - assert(conn == buf->writer); - - return io_write(conn, &buf->buf, sizeof(buf->buf), poke_reader, buf); -} - static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf) { assert(conn == buf->reader); @@ -41,31 +26,25 @@ static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf) return io_close(conn, NULL); /* You write. */ - io_wake(buf->writer, do_write, buf); + io_wake(buf->writer, + io_write(&buf->buf, sizeof(buf->buf), poke_reader, buf)); /* I'll wait until you wake me. */ - return io_idle(conn); + return io_idle(); } static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf) { assert(conn == buf->writer); /* You read. */ - io_wake(buf->reader, do_read, buf); + io_wake(buf->reader, + io_read(&buf->buf, sizeof(buf->buf), poke_writer, buf)); if (++buf->iters == NUM_ITERS) return io_close(conn, NULL); /* I'll wait until you tell me to write. */ - return io_idle(conn); -} - -static struct io_plan reader(struct io_conn *conn, struct buffer *buf) -{ - assert(conn == buf->reader); - - /* Wait for writer to tell us to read. */ - return io_idle(conn); + return io_idle(); } int main(void) @@ -87,10 +66,14 @@ int main(void) memset(buf[i].buf, i, sizeof(buf[i].buf)); sprintf(buf[i].buf, "%i-%i", i, i); - buf[i].reader = io_new_conn(last_read, reader, NULL, &buf[i]); + buf[i].reader = io_new_conn(last_read, io_idle(), NULL, NULL); if (!buf[i].reader) err(1, "Creating reader %i", i); - buf[i].writer = io_new_conn(fds[1], do_write, NULL, &buf[i]); + buf[i].writer = io_new_conn(fds[1], + io_write(&buf[i].buf, + sizeof(buf[i].buf), + poke_reader, &buf[i]), + NULL, NULL); if (!buf[i].writer) err(1, "Creating writer %i", i); last_read = fds[0]; @@ -100,10 +83,13 @@ int main(void) i = 0; buf[i].iters = 0; sprintf(buf[i].buf, "%i-%i", i, i); - buf[i].reader = io_new_conn(last_read, reader, NULL, &buf[i]); + buf[i].reader = io_new_conn(last_read, io_idle(), NULL, NULL); if (!buf[i].reader) err(1, "Creating reader %i", i); - buf[i].writer = io_new_conn(last_write, do_write, NULL, &buf[i]); + buf[i].writer = io_new_conn(last_write, io_write(&buf[i].buf, + sizeof(buf[i].buf), + poke_reader, &buf[i]), + NULL, NULL); if (!buf[i].writer) err(1, "Creating writer %i", i); diff --git a/ccan/io/io.c b/ccan/io/io.c index 5ea90ea7..bd45630a 100644 --- a/ccan/io/io.c +++ b/ccan/io/io.c @@ -40,7 +40,7 @@ void io_close_listener(struct io_listener *l) } struct io_conn *io_new_conn_(int fd, - struct io_plan (*start)(struct io_conn *, void *), + struct io_plan plan, void (*finish)(struct io_conn *, void *), void *arg) { @@ -51,11 +51,9 @@ struct io_conn *io_new_conn_(int fd, conn->fd.listener = false; conn->fd.fd = fd; - conn->plan.next = start; + conn->plan = plan; conn->finish = finish; - conn->finish_arg = conn->plan.next_arg = arg; - conn->plan.pollflag = 0; - conn->plan.state = IO_NEXT; + conn->finish_arg = arg; conn->duplex = NULL; conn->timeout = NULL; if (!add_conn(conn)) { @@ -66,9 +64,9 @@ struct io_conn *io_new_conn_(int fd, } struct io_conn *io_duplex_(struct io_conn *old, - struct io_plan (*start)(struct io_conn *, void *), - void (*finish)(struct io_conn *, void *), - void *arg) + struct io_plan plan, + void (*finish)(struct io_conn *, void *), + void *arg) { struct io_conn *conn; @@ -80,12 +78,10 @@ struct io_conn *io_duplex_(struct io_conn *old, conn->fd.listener = false; conn->fd.fd = old->fd.fd; - conn->plan.next = start; - conn->finish = finish; - conn->finish_arg = conn->plan.next_arg = arg; - conn->plan.pollflag = 0; - conn->plan.state = IO_NEXT; + conn->plan = plan; conn->duplex = old; + conn->finish = finish; + conn->finish_arg = arg; conn->timeout = NULL; if (!add_duplex(conn)) { free(conn); @@ -239,18 +235,14 @@ struct io_plan io_idle(void) return plan; } -void io_wake_(struct io_conn *conn, - struct io_plan (*fn)(struct io_conn *, void *), void *arg) +void io_wake(struct io_conn *conn, struct io_plan plan) { /* It might have finished, but we haven't called its finish() yet. */ if (conn->plan.state == IO_FINISHED) return; assert(conn->plan.state == IO_IDLE); - conn->plan.next = fn; - conn->plan.next_arg = arg; - conn->plan.pollflag = 0; - conn->plan.state = IO_NEXT; + conn->plan = plan; backend_wakeup(conn); } @@ -289,18 +281,9 @@ struct io_plan io_close(struct io_conn *conn, void *arg) } /* Exit the loop, returning this (non-NULL) arg. */ -struct io_plan io_break_(void *ret, - struct io_plan (*fn)(struct io_conn *, void *), - void *arg) +struct io_plan io_break(void *ret, struct io_plan plan) { - struct io_plan plan; - io_loop_return = ret; - plan.state = IO_NEXT; - plan.pollflag = 0; - plan.next = fn; - plan.next_arg = arg; - return plan; } diff --git a/ccan/io/io.h b/ccan/io/io.h index 9ba46b67..24176f55 100644 --- a/ccan/io/io.h +++ b/ccan/io/io.h @@ -64,29 +64,23 @@ struct io_plan { /** * io_new_conn - create a new connection. * @fd: the file descriptor. - * @start: the first function to call. + * @plan: the first I/O function. * @finish: the function to call when it's closed or fails. - * @arg: the argument to both @start and @finish. + * @arg: the argument to @finish. * - * This creates a connection which owns @fd. @start will be called on the - * next return to io_loop(), and @finish will be called when an I/O operation + * This creates a connection which owns @fd. @plan will be called on the + * next io_loop(), and @finish will be called when an I/O operation * fails, or you call io_close() on the connection. * - * The @start function must call one of the io queueing functions - * (eg. io_read, io_write) and return the next function to call once - * that is done using io_next(). The alternative is to call io_close(). - * * Returns NULL on error (and sets errno). */ -#define io_new_conn(fd, start, finish, arg) \ - io_new_conn_((fd), \ - typesafe_cb_preargs(struct io_plan, void *, \ - (start), (arg), struct io_conn *), \ +#define io_new_conn(fd, plan, finish, arg) \ + io_new_conn_((fd), (plan), \ typesafe_cb_preargs(void, void *, (finish), (arg), \ struct io_conn *), \ (arg)) struct io_conn *io_new_conn_(int fd, - struct io_plan (*start)(struct io_conn *, void *), + struct io_plan plan, void (*finish)(struct io_conn *, void *), void *arg); @@ -243,9 +237,9 @@ bool io_timeout_(struct io_conn *conn, struct timespec ts, /** * io_duplex - split an fd into two connections. * @conn: a connection. - * @start: the first function to call. + * @plan: the first I/O function to call. * @finish: the function to call when it's closed or fails. - * @arg: the argument to both @start and @finish. + * @arg: the argument to @finish. * * Sometimes you want to be able to simultaneously read and write on a * single fd, but io forces a linear call sequence. The solition is @@ -254,56 +248,38 @@ bool io_timeout_(struct io_conn *conn, struct timespec ts, * * You must io_close() both of them to close the fd. */ -#define io_duplex(conn, start, finish, arg) \ - io_duplex_((conn), \ - typesafe_cb_preargs(struct io_plan, void *, \ - (start), (arg), struct io_conn *), \ +#define io_duplex(conn, plan, finish, arg) \ + io_duplex_((conn), (plan), \ typesafe_cb_preargs(void, void *, (finish), (arg), \ struct io_conn *), \ (arg)) struct io_conn *io_duplex_(struct io_conn *conn, - struct io_plan (*start)(struct io_conn *, void *), + struct io_plan plan, void (*finish)(struct io_conn *, void *), void *arg); /** - * io_wake - wake up and idle connection. + * io_wake - wake up an idle connection. * @conn: an idle connection. - * @fn: the next function to call once queued IO is complete. - * @arg: the argument to @next. + * @plan: the next I/O function for @conn. * - * This makes @conn run its @next function the next time around the - * io_loop(). + * This makes @conn do I/O the next time around the io_loop(). */ -#define io_wake(conn, fn, arg) \ - io_wake_((conn), \ - typesafe_cb_preargs(struct io_plan, void *, \ - (fn), (arg), struct io_conn *), \ - (arg)) -void io_wake_(struct io_conn *conn, - struct io_plan (*fn)(struct io_conn *, void *), void *arg); +void io_wake(struct io_conn *conn, struct io_plan plan); /** * io_break - return from io_loop() * @ret: non-NULL value to return from io_loop(). - * @cb: function to call once on return - * @arg: @cb argument + * @plan: I/O to perform on return (if any) * * This breaks out of the io_loop. As soon as the current @next * function returns, any io_closed()'d connections will have their * finish callbacks called, then io_loop() with return with @ret. * - * If io_loop() is called again, then @cb will be called. + * If io_loop() is called again, then @plan will be carried out. */ -#define io_break(ret, fn, arg) \ - io_break_((ret), \ - typesafe_cb_preargs(struct io_plan, void *, \ - (fn), (arg), struct io_conn *), \ - (arg)) -struct io_plan io_break_(void *ret, - struct io_plan (*fn)(struct io_conn *, void *), - void *arg); +struct io_plan io_break(void *ret, struct io_plan plan); /* FIXME: io_recvfrom/io_sendto */ diff --git a/ccan/io/poll.c b/ccan/io/poll.c index c30bea17..f9221d1f 100644 --- a/ccan/io/poll.c +++ b/ccan/io/poll.c @@ -37,6 +37,9 @@ static bool add_fd(struct fd *fd, short events) fds[num_fds] = fd; fd->backend_info = num_fds; num_fds++; + if (events) + num_waiting++; + return true; } @@ -46,6 +49,8 @@ static void del_fd(struct fd *fd) assert(n != -1); assert(n < num_fds); + if (pollfds[n].events) + num_waiting--; if (n != num_fds - 1) { /* Move last one over us. */ pollfds[n] = pollfds[num_fds-1]; @@ -69,22 +74,49 @@ bool add_listener(struct io_listener *l) { if (!add_fd(&l->fd, POLLIN)) return false; - num_waiting++; return true; } +static void adjust_counts(enum io_state state) +{ + if (state == IO_NEXT) + num_next++; + else if (state == IO_FINISHED) + num_finished++; +} + +static void update_pollevents(struct io_conn *conn) +{ + struct pollfd *pfd = &pollfds[conn->fd.backend_info]; + + if (pfd->events) + num_waiting--; + + pfd->events = conn->plan.pollflag; + if (conn->duplex) { + int mask = conn->duplex->plan.pollflag; + /* You can't *both* read/write. */ + assert(!mask || pfd->events != mask); + pfd->events |= mask; + } + if (pfd->events) + num_waiting++; + + adjust_counts(conn->plan.state); +} + bool add_conn(struct io_conn *c) { - if (!add_fd(&c->fd, 0)) + if (!add_fd(&c->fd, c->plan.pollflag)) return false; - num_next++; + adjust_counts(c->plan.state); return true; } bool add_duplex(struct io_conn *c) { c->fd.backend_info = c->duplex->fd.backend_info; - num_next++; + update_pollevents(c); return true; } @@ -114,32 +146,13 @@ void del_listener(struct io_listener *l) static void backend_set_state(struct io_conn *conn, struct io_plan plan) { - struct pollfd *pfd = &pollfds[conn->fd.backend_info]; - - if (pfd->events) - num_waiting--; - - pfd->events = plan.pollflag; - if (conn->duplex) { - int mask = conn->duplex->plan.pollflag; - /* You can't *both* read/write. */ - assert(!mask || pfd->events != mask); - pfd->events |= mask; - } - if (pfd->events) - num_waiting++; - - if (plan.state == IO_NEXT) - num_next++; - else if (plan.state == IO_FINISHED) - num_finished++; - conn->plan = plan; + update_pollevents(conn); } void backend_wakeup(struct io_conn *conn) { - num_next++; + update_pollevents(conn); } static void accept_conn(struct io_listener *l) diff --git a/ccan/io/test/run-01-start-finish.c b/ccan/io/test/run-01-start-finish.c index 53c07f24..b5114abe 100644 --- a/ccan/io/test/run-01-start-finish.c +++ b/ccan/io/test/run-01-start-finish.c @@ -6,23 +6,18 @@ #include #include -static struct io_plan start_ok(struct io_conn *conn, int *state) -{ - ok1(*state == 0); - (*state)++; - return io_close(conn, NULL); -} - static void finish_ok(struct io_conn *conn, int *state) { ok1(*state == 1); (*state)++; - io_break(state + 1, NULL, NULL); + io_break(state + 1, io_idle()); } static void init_conn(int fd, int *state) { - if (!io_new_conn(fd, start_ok, finish_ok, state)) + ok1(*state == 0); + (*state)++; + if (!io_new_conn(fd, io_close(NULL, NULL), finish_ok, state)) abort(); } diff --git a/ccan/io/test/run-02-read.c b/ccan/io/test/run-02-read.c index abc66463..9abcd964 100644 --- a/ccan/io/test/run-02-read.c +++ b/ccan/io/test/run-02-read.c @@ -11,23 +11,20 @@ struct data { char buf[4]; }; -static struct io_plan start_ok(struct io_conn *conn, struct data *d) -{ - ok1(d->state == 0); - d->state++; - return io_read(d->buf, sizeof(d->buf), io_close, d); -} - static void finish_ok(struct io_conn *conn, struct data *d) { ok1(d->state == 1); d->state++; - io_break(d, NULL, NULL); + io_break(d, io_idle()); } static void init_conn(int fd, struct data *d) { - if (!io_new_conn(fd, start_ok, finish_ok, d)) + ok1(d->state == 0); + d->state++; + + if (!io_new_conn(fd, io_read(d->buf, sizeof(d->buf), io_close, d), + finish_ok, d)) abort(); } diff --git a/ccan/io/test/run-03-readpartial.c b/ccan/io/test/run-03-readpartial.c index a7d0ae31..a24be7e0 100644 --- a/ccan/io/test/run-03-readpartial.c +++ b/ccan/io/test/run-03-readpartial.c @@ -12,24 +12,21 @@ struct data { char buf[4]; }; -static struct io_plan start_ok(struct io_conn *conn, struct data *d) -{ - ok1(d->state == 0); - d->state++; - d->bytes = sizeof(d->buf); - return io_read_partial(d->buf, &d->bytes, io_close, d); -} - static void finish_ok(struct io_conn *conn, struct data *d) { ok1(d->state == 1); d->state++; - io_break(d, NULL, NULL); + io_break(d, io_idle()); } static void init_conn(int fd, struct data *d) { - if (!io_new_conn(fd, start_ok, finish_ok, d)) + ok1(d->state == 0); + d->state++; + d->bytes = sizeof(d->buf); + + if (!io_new_conn(fd, io_read_partial(d->buf, &d->bytes, io_close, d), + finish_ok, d)) abort(); } diff --git a/ccan/io/test/run-04-writepartial.c b/ccan/io/test/run-04-writepartial.c index 11ac22a7..12a21ddc 100644 --- a/ccan/io/test/run-04-writepartial.c +++ b/ccan/io/test/run-04-writepartial.c @@ -12,23 +12,19 @@ struct data { char *buf; }; -static struct io_plan start_ok(struct io_conn *conn, struct data *d) -{ - ok1(d->state == 0); - d->state++; - return io_write_partial(d->buf, &d->bytes, io_close, d); -} - static void finish_ok(struct io_conn *conn, struct data *d) { ok1(d->state == 1); d->state++; - io_break(d, NULL, NULL); + io_break(d, io_idle()); } static void init_conn(int fd, struct data *d) { - if (!io_new_conn(fd, start_ok, finish_ok, d)) + ok1(d->state == 0); + d->state++; + if (!io_new_conn(fd, io_write_partial(d->buf, &d->bytes, io_close, d), + finish_ok, d)) abort(); } diff --git a/ccan/io/test/run-05-write.c b/ccan/io/test/run-05-write.c index 3f0a8f98..e878c330 100644 --- a/ccan/io/test/run-05-write.c +++ b/ccan/io/test/run-05-write.c @@ -12,23 +12,19 @@ struct data { char *buf; }; -static struct io_plan start_ok(struct io_conn *conn, struct data *d) -{ - ok1(d->state == 0); - d->state++; - return io_write(d->buf, d->bytes, io_close, d); -} - static void finish_ok(struct io_conn *conn, struct data *d) { ok1(d->state == 1); d->state++; - io_break(d, NULL, NULL); + io_break(d, io_idle()); } static void init_conn(int fd, struct data *d) { - if (!io_new_conn(fd, start_ok, finish_ok, d)) + ok1(d->state == 0); + d->state++; + if (!io_new_conn(fd, io_write(d->buf, d->bytes, io_close, d), + finish_ok, d)) abort(); } diff --git a/ccan/io/test/run-06-idle.c b/ccan/io/test/run-06-idle.c index 314293a5..fb681a08 100644 --- a/ccan/io/test/run-06-idle.c +++ b/ccan/io/test/run-06-idle.c @@ -16,55 +16,39 @@ struct data { char buf[4]; }; -static struct io_plan plan_read(struct io_conn *conn, struct data *d) +static struct io_plan read_done(struct io_conn *conn, struct data *d) { ok1(d->state == 2 || d->state == 3); d->state++; - return io_read(d->buf, sizeof(d->buf), io_close, d); + return io_close(conn, NULL); } -static struct io_plan start_waker(struct io_conn *conn, struct data *d) +static void finish_waker(struct io_conn *conn, struct data *d) { + io_wake(idler, io_read(d->buf, sizeof(d->buf), read_done, d)); ok1(d->state == 1); d->state++; - - io_wake(idler, plan_read, d); - return io_close(conn, NULL); } -static void finish_waker(struct io_conn *conn, struct data *d) +static void finish_idle(struct io_conn *conn, struct data *d) { - ok1(d->state == 2 || d->state == 3); + ok1(d->state == 3); d->state++; + io_break(d, io_idle()); } -static struct io_plan start_idle(struct io_conn *conn, struct data *d) +static void init_conn(int fd, struct data *d) { - int fd; + int fd2; ok1(d->state == 0); d->state++; - idler = conn; + idler = io_new_conn(fd, io_idle(), finish_idle, d); - /* This will wake us up. */ - fd = open("/dev/null", O_RDONLY); - ok1(fd >= 0); - ok1(io_new_conn(fd, start_waker, finish_waker, d)); - - return io_idle(); -} - -static void finish_idle(struct io_conn *conn, struct data *d) -{ - ok1(d->state == 4); - d->state++; - io_break(d, NULL, NULL); -} - -static void init_conn(int fd, struct data *d) -{ - if (!io_new_conn(fd, start_idle, finish_idle, d)) - abort(); + /* This will wake us up, as read will fail. */ + fd2 = open("/dev/null", O_RDONLY); + ok1(fd2 >= 0); + ok1(io_new_conn(fd2, io_read(idler, 1, NULL, NULL), finish_waker, d)); } static int make_listen_fd(const char *port, struct addrinfo **info) @@ -107,7 +91,7 @@ int main(void) int fd, status; /* This is how many tests you plan to run */ - plan_tests(15); + plan_tests(14); d->state = 0; fd = make_listen_fd("65006", &addrinfo); ok1(fd >= 0); @@ -137,7 +121,7 @@ int main(void) freeaddrinfo(addrinfo); ok1(io_loop() == d); - ok1(d->state == 5); + ok1(d->state == 4); ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0); free(d); io_close_listener(l); diff --git a/ccan/io/test/run-07-break.c b/ccan/io/test/run-07-break.c index e8db3889..5a6e9701 100644 --- a/ccan/io/test/run-07-break.c +++ b/ccan/io/test/run-07-break.c @@ -11,18 +11,11 @@ struct data { char buf[4]; }; -static struct io_plan plan_read(struct io_conn *conn, struct data *d) +static struct io_plan read_done(struct io_conn *conn, struct data *d) { ok1(d->state == 1); d->state++; - return io_read(d->buf, sizeof(d->buf), io_close, d); -} - -static struct io_plan start_break(struct io_conn *conn, struct data *d) -{ - ok1(d->state == 0); - d->state++; - return io_break(d, plan_read, d); + return io_close(conn, NULL); } static void finish_ok(struct io_conn *conn, struct data *d) @@ -33,7 +26,13 @@ static void finish_ok(struct io_conn *conn, struct data *d) static void init_conn(int fd, struct data *d) { - if (!io_new_conn(fd, start_break, finish_ok, d)) + ok1(d->state == 0); + d->state++; + + if (!io_new_conn(fd, + io_break(d, + io_read(d->buf, sizeof(d->buf), read_done, d)), + finish_ok, d)) abort(); } diff --git a/ccan/io/test/run-10-many.c b/ccan/io/test/run-10-many.c index 95a716e2..1c39635b 100644 --- a/ccan/io/test/run-10-many.c +++ b/ccan/io/test/run-10-many.c @@ -15,23 +15,8 @@ struct buffer { char buf[32]; }; -static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf); static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf); -static struct io_plan plan_read(struct io_conn *conn, struct buffer *buf) -{ - assert(conn == buf->reader); - - return io_read(&buf->buf, sizeof(buf->buf), poke_writer, buf); -} - -static struct io_plan plan_write(struct io_conn *conn, struct buffer *buf) -{ - assert(conn == buf->writer); - - return io_write(&buf->buf, sizeof(buf->buf), poke_reader, buf); -} - static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf) { assert(conn == buf->reader); @@ -40,7 +25,8 @@ static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf) return io_close(conn, NULL); /* You write. */ - io_wake(buf->writer, plan_write, buf); + io_wake(buf->writer, + io_write(&buf->buf, sizeof(buf->buf), poke_reader, buf)); /* I'll wait until you wake me. */ return io_idle(); @@ -50,7 +36,8 @@ static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf) { assert(conn == buf->writer); /* You read. */ - io_wake(buf->reader, plan_read, buf); + io_wake(buf->reader, + io_read(&buf->buf, sizeof(buf->buf), poke_writer, buf)); if (++buf->iters == NUM_ITERS) return io_close(conn, NULL); @@ -59,14 +46,6 @@ static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf) return io_idle(); } -static struct io_plan reader(struct io_conn *conn, struct buffer *buf) -{ - assert(conn == buf->reader); - - /* Wait for writer to tell us to read. */ - return io_idle(); -} - static struct buffer buf[NUM]; int main(void) @@ -86,10 +65,15 @@ int main(void) memset(buf[i].buf, i, sizeof(buf[i].buf)); sprintf(buf[i].buf, "%i-%i", i, i); - buf[i].reader = io_new_conn(last_read, reader, NULL, &buf[i]); + /* Wait for writer to tell us to read. */ + buf[i].reader = io_new_conn(last_read, io_idle(), NULL, &buf[i]); if (!buf[i].reader) break; - buf[i].writer = io_new_conn(fds[1], plan_write, NULL, &buf[i]); + buf[i].writer = io_new_conn(fds[1], + io_write(&buf[i].buf, + sizeof(buf[i].buf), + poke_reader, &buf[i]), + NULL, &buf[i]); if (!buf[i].writer) break; last_read = fds[0]; @@ -100,9 +84,12 @@ int main(void) /* Last one completes the cirle. */ i = 0; sprintf(buf[i].buf, "%i-%i", i, i); - buf[i].reader = io_new_conn(last_read, reader, NULL, &buf[i]); + buf[i].reader = io_new_conn(last_read, io_idle(), NULL, NULL); ok1(buf[i].reader); - buf[i].writer = io_new_conn(last_write, plan_write, NULL, &buf[i]); + buf[i].writer = io_new_conn(last_write, + io_write(&buf[i].buf, sizeof(buf[i].buf), + poke_reader, &buf[i]), + NULL, NULL); ok1(buf[i].writer); /* They should eventually exit */ diff --git a/ccan/io/test/run-12-bidir.c b/ccan/io/test/run-12-bidir.c index 3534bc31..6dc94c8a 100644 --- a/ccan/io/test/run-12-bidir.c +++ b/ccan/io/test/run-12-bidir.c @@ -18,28 +18,27 @@ static void finish_ok(struct io_conn *conn, struct data *d) d->state++; } -static struct io_plan write_out(struct io_conn *conn, struct data *d) +static struct io_plan write_done(struct io_conn *conn, struct data *d) { d->state++; - return io_write(d->wbuf, sizeof(d->wbuf), io_close, d); + return io_close(conn, NULL); } -static struct io_plan start_ok(struct io_conn *conn, struct data *d) +static void init_conn(int fd, struct data *d) { + struct io_conn *conn; + ok1(d->state == 0); d->state++; io_close_listener(d->l); memset(d->wbuf, 7, sizeof(d->wbuf)); - ok1(io_duplex(conn, write_out, finish_ok, d)); - return io_read(d->buf, sizeof(d->buf), io_close, d); -} -static void init_conn(int fd, struct data *d) -{ - if (!io_new_conn(fd, start_ok, finish_ok, d)) - abort(); + conn = io_new_conn(fd, io_read(d->buf, sizeof(d->buf), io_close, d), + finish_ok, d); + ok1(io_duplex(conn, io_write(d->wbuf, sizeof(d->wbuf), write_done, d), + finish_ok, d)); } static int make_listen_fd(const char *port, struct addrinfo **info) diff --git a/ccan/io/test/run-13-all-idle.c b/ccan/io/test/run-13-all-idle.c index f83fb31e..7ad6bfe4 100644 --- a/ccan/io/test/run-13-all-idle.c +++ b/ccan/io/test/run-13-all-idle.c @@ -7,11 +7,6 @@ #include #include -static struct io_plan start(struct io_conn *conn, void *unused) -{ - return io_idle(); -} - int main(void) { int status; @@ -22,7 +17,7 @@ int main(void) int fds[2]; ok1(pipe(fds) == 0); - io_new_conn(fds[0], start, NULL, NULL); + io_new_conn(fds[0], io_idle(), NULL, NULL); io_loop(); exit(1); } diff --git a/ccan/io/test/run-15-timeout.c b/ccan/io/test/run-15-timeout.c index 0ff3fc8a..f8ddc6a5 100644 --- a/ccan/io/test/run-15-timeout.c +++ b/ccan/io/test/run-15-timeout.c @@ -30,25 +30,23 @@ static struct io_plan timeout(struct io_conn *conn, struct data *d) return io_close(conn, d); } -static struct io_plan start_ok(struct io_conn *conn, struct data *d) -{ - ok1(d->state == 0); - d->state++; - io_timeout(conn, time_from_usec(d->timeout_usec), timeout, d); - return io_read(d->buf, sizeof(d->buf), no_timeout, d); -} - static void finish_ok(struct io_conn *conn, struct data *d) { ok1(d->state == 2); d->state++; - io_break(d, NULL, NULL); + io_break(d, io_idle()); } static void init_conn(int fd, struct data *d) { - if (!io_new_conn(fd, start_ok, finish_ok, d)) - abort(); + struct io_conn *conn; + + ok1(d->state == 0); + d->state++; + + conn = io_new_conn(fd, io_read(d->buf, sizeof(d->buf), no_timeout, d), + finish_ok, d); + io_timeout(conn, time_from_usec(d->timeout_usec), timeout, d); } static int make_listen_fd(const char *port, struct addrinfo **info)