From 7082f7d0e81911acb26787949c251dfb298cbdd8 Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Wed, 25 Oct 2017 16:09:47 +1030 Subject: [PATCH] io: query whether io_plan in/out have started. For lightning, we want to hand the socket off to another daemon, but we need to be on a packet boundary. This lets us check if we've part-read or part-written. Signed-off-by: Rusty Russell --- ccan/io/backend.h | 5 +- ccan/io/io.c | 23 ++++- ccan/io/io.h | 28 ++++++ ccan/io/poll.c | 6 +- ccan/io/test/run-43-io_plan_in_started.c | 75 ++++++++++++++++ ccan/io/test/run-44-io_plan_out_started.c | 100 ++++++++++++++++++++++ 6 files changed, 229 insertions(+), 8 deletions(-) create mode 100644 ccan/io/test/run-43-io_plan_in_started.c create mode 100644 ccan/io/test/run-44-io_plan_out_started.c diff --git a/ccan/io/backend.h b/ccan/io/backend.h index c8ceb4e8..2ee5a830 100644 --- a/ccan/io/backend.h +++ b/ccan/io/backend.h @@ -25,8 +25,9 @@ struct io_listener { enum io_plan_status { /* As before calling next function. */ IO_UNSET, - /* Normal. */ - IO_POLLING, + /* Normal, but haven't started yet. */ + IO_POLLING_NOTSTARTED, + IO_POLLING_STARTED, /* Waiting for io_wake */ IO_WAITING, /* Always do this. */ diff --git a/ccan/io/io.c b/ccan/io/io.c index 99f0f7c9..c0dd9b83 100644 --- a/ccan/io/io.c +++ b/ccan/io/io.c @@ -131,7 +131,7 @@ struct io_plan_arg *io_plan_arg(struct io_conn *conn, enum io_direction dir) { assert(conn->plan[dir].status == IO_UNSET); - conn->plan[dir].status = IO_POLLING; + conn->plan[dir].status = IO_POLLING_NOTSTARTED; return &conn->plan[dir].arg; } @@ -368,7 +368,8 @@ static bool do_plan(struct io_conn *conn, struct io_plan *plan, bool idle_on_epipe) { /* We shouldn't have polled for this event if this wasn't true! */ - assert(plan->status == IO_POLLING); + assert(plan->status == IO_POLLING_NOTSTARTED + || plan->status == IO_POLLING_STARTED); switch (plan->io(conn->fd.fd, &plan->arg)) { case -1: @@ -380,6 +381,7 @@ static bool do_plan(struct io_conn *conn, struct io_plan *plan, io_close(conn); return false; case 0: + plan->status = IO_POLLING_STARTED; return true; case 1: return next_plan(conn, plan); @@ -399,7 +401,8 @@ void io_ready(struct io_conn *conn, int pollflags) /* If we're writing to a closed pipe, we need to wait for * read to fail if we're duplex: we want to drain it! */ do_plan(conn, &conn->plan[IO_OUT], - conn->plan[IO_IN].status == IO_POLLING); + conn->plan[IO_IN].status == IO_POLLING_NOTSTARTED + || conn->plan[IO_IN].status == IO_POLLING_STARTED); } void io_do_always(struct io_conn *conn) @@ -509,13 +512,24 @@ struct io_plan *io_set_plan(struct io_conn *conn, enum io_direction dir, return plan; } +bool io_plan_in_started(const struct io_conn *conn) +{ + return conn->plan[IO_IN].status == IO_POLLING_STARTED; +} + +bool io_plan_out_started(const struct io_conn *conn) +{ + return conn->plan[IO_OUT].status == IO_POLLING_STARTED; +} + bool io_flush_sync(struct io_conn *conn) { struct io_plan *plan = &conn->plan[IO_OUT]; bool ok; /* Not writing? Nothing to do. */ - if (plan->status != IO_POLLING) + if (plan->status != IO_POLLING_STARTED + && plan->status != IO_POLLING_NOTSTARTED) return true; /* Synchronous please. */ @@ -528,6 +542,7 @@ again: break; /* Incomplete, try again. */ case 0: + plan->status = IO_POLLING_STARTED; goto again; case 1: ok = true; diff --git a/ccan/io/io.h b/ccan/io/io.h index 1e4e80e7..c9ab228c 100644 --- a/ccan/io/io.h +++ b/ccan/io/io.h @@ -674,6 +674,34 @@ void *io_loop(struct timers *timers, struct timer **expired); */ int io_conn_fd(const struct io_conn *conn); +/** + * io_plan_in_started - is this conn doing input I/O now? + * @conn: the conn. + * + * This returns true if input I/O has been performed on the conn but + * @next hasn't been called yet. For example, io_read() may have done + * a partial read. + * + * This can be useful if we want to terminate a connection only after + * reading a whole packet: if this returns true, we would wait until + * @next is called. + */ +bool io_plan_in_started(const struct io_conn *conn); + +/** + * io_plan_out_started - is this conn doing output I/O now? + * @conn: the conn. + * + * This returns true if output I/O has been performed on the conn but + * @next hasn't been called yet. For example, io_write() may have done + * a partial write. + * + * This can be useful if we want to terminate a connection only after + * writing a whole packet: if this returns true, we would wait until + * @next is called. + */ +bool io_plan_out_started(const struct io_conn *conn); + /** * io_flush_sync - (synchronously) complete any outstanding output. * @conn: the connection. diff --git a/ccan/io/poll.c b/ccan/io/poll.c index a4e83ed7..3354abe0 100644 --- a/ccan/io/poll.c +++ b/ccan/io/poll.c @@ -130,9 +130,11 @@ void backend_new_plan(struct io_conn *conn) num_waiting--; pfd->events = 0; - if (conn->plan[IO_IN].status == IO_POLLING) + if (conn->plan[IO_IN].status == IO_POLLING_NOTSTARTED + || conn->plan[IO_IN].status == IO_POLLING_STARTED) pfd->events |= POLLIN; - if (conn->plan[IO_OUT].status == IO_POLLING) + if (conn->plan[IO_OUT].status == IO_POLLING_NOTSTARTED + || conn->plan[IO_OUT].status == IO_POLLING_STARTED) pfd->events |= POLLOUT; if (pfd->events) { diff --git a/ccan/io/test/run-43-io_plan_in_started.c b/ccan/io/test/run-43-io_plan_in_started.c new file mode 100644 index 00000000..f63f8779 --- /dev/null +++ b/ccan/io/test/run-43-io_plan_in_started.c @@ -0,0 +1,75 @@ +#include +/* Include the C files directly. */ +#include +#include +#include + +static struct io_conn *in_conn; + +static struct io_plan *in_conn_done(struct io_conn *conn, void *unused) +{ + ok1(!io_plan_in_started(conn)); + return io_close(conn); +} + +static struct io_plan *init_in_conn(struct io_conn *conn, char *buf) +{ + ok1(!io_plan_in_started(conn)); + return io_read(conn, buf, 2, in_conn_done, NULL); +} + +static int do_nothing(int fd, struct io_plan_arg *arg) +{ + return 1; +} + +static struct io_plan *dummy_write(struct io_conn *conn, + struct io_plan *(*next) + (struct io_conn *, void *), + void *next_arg) +{ + io_plan_arg(conn, IO_OUT); + return io_set_plan(conn, IO_OUT, do_nothing, next, next_arg); +} + +static struct io_plan *out_post_write(struct io_conn *conn, void *unused) +{ + /* It might not have started yet: try again. */ + if (!io_plan_in_started(in_conn)) + return dummy_write(conn, out_post_write, NULL); + ok1(io_plan_in_started(in_conn)); + + /* Final write, then close */ + return io_write(conn, "2", 1, io_close_cb, NULL); +} + +static struct io_plan *init_out_conn(struct io_conn *conn, void *unused) +{ + ok1(!io_plan_in_started(in_conn)); + return io_write(conn, "1", 1, out_post_write, NULL); +} + +int main(void) +{ + int fds[2]; + const tal_t *ctx = tal(NULL, char); + char *buf = tal_arr(ctx, char, 3); + + /* This is how many tests you plan to run */ + plan_tests(5); + + if (pipe(fds) != 0) + abort(); + + buf[2] = '\0'; + + in_conn = io_new_conn(ctx, fds[0], init_in_conn, buf); + io_new_conn(ctx, fds[1], init_out_conn, NULL); + + io_loop(NULL, NULL); + ok1(strcmp(buf, "12") == 0); + tal_free(ctx); + + /* This exits depending on whether all tests passed */ + return exit_status(); +} diff --git a/ccan/io/test/run-44-io_plan_out_started.c b/ccan/io/test/run-44-io_plan_out_started.c new file mode 100644 index 00000000..71a63dd5 --- /dev/null +++ b/ccan/io/test/run-44-io_plan_out_started.c @@ -0,0 +1,100 @@ +#include +/* Include the C files directly. */ +#include +#include +#include + +static struct io_conn *out_conn; + +/* Write one byte at a time. */ +static int do_slow_write(int fd, struct io_plan_arg *arg) +{ + ssize_t ret = write(fd, arg->u1.cp, 1); + if (ret < 0) + return -1; + + arg->u1.cp += ret; + arg->u2.s -= ret; + return arg->u2.s == 0; +} + +static struct io_plan *io_slow_write(struct io_conn *conn, + const void *data, size_t len, + struct io_plan *(*next)(struct io_conn *, + void *), + void *next_arg) +{ + struct io_plan_arg *arg = io_plan_arg(conn, IO_OUT); + + arg->u1.const_vp = data; + arg->u2.s = len; + + return io_set_plan(conn, IO_OUT, do_slow_write, next, next_arg); +} + +static struct io_plan *out_conn_done(struct io_conn *conn, void *unused) +{ + ok1(!io_plan_out_started(conn)); + return io_close(conn); +} + +static struct io_plan *init_out_conn(struct io_conn *conn, void *unused) +{ + ok1(!io_plan_out_started(conn)); + return io_slow_write(conn, "12", 2, out_conn_done, NULL); +} + +static int do_nothing(int fd, struct io_plan_arg *arg) +{ + return 1; +} + +static struct io_plan *dummy_read(struct io_conn *conn, + struct io_plan *(*next) + (struct io_conn *, void *), + void *next_arg) +{ + io_plan_arg(conn, IO_IN); + return io_set_plan(conn, IO_IN, do_nothing, next, next_arg); +} + +static struct io_plan *in_post_read(struct io_conn *conn, void *buf) +{ + /* It might not have started yet: try again. */ + if (!io_plan_out_started(out_conn)) + return dummy_read(conn, in_post_read, NULL); + ok1(io_plan_out_started(out_conn)); + + /* Final read, then close */ + return io_read(conn, (char *)buf+1, 1, io_close_cb, NULL); +} + +static struct io_plan *init_in_conn(struct io_conn *conn, char *buf) +{ + return io_read(conn, buf, 1, in_post_read, buf); +} + +int main(void) +{ + int fds[2]; + const tal_t *ctx = tal(NULL, char); + char *buf = tal_arr(ctx, char, 3); + + /* This is how many tests you plan to run */ + plan_tests(4); + + if (pipe(fds) != 0) + abort(); + + buf[2] = '\0'; + + io_new_conn(ctx, fds[0], init_in_conn, buf); + out_conn = io_new_conn(ctx, fds[1], init_out_conn, NULL); + + io_loop(NULL, NULL); + ok1(strcmp(buf, "12") == 0); + tal_free(ctx); + + /* This exits depending on whether all tests passed */ + return exit_status(); +} -- 2.39.2