io: query whether io_plan in/out have started.
authorRusty Russell <rusty@rustcorp.com.au>
Wed, 25 Oct 2017 05:39:47 +0000 (16:09 +1030)
committerRusty Russell <rusty@rustcorp.com.au>
Wed, 25 Oct 2017 05:39:47 +0000 (16:09 +1030)
For lightning, we want to hand the socket off to another daemon, but we need
to be on a packet boundary.  This lets us check if we've part-read or
part-written.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
ccan/io/backend.h
ccan/io/io.c
ccan/io/io.h
ccan/io/poll.c
ccan/io/test/run-43-io_plan_in_started.c [new file with mode: 0644]
ccan/io/test/run-44-io_plan_out_started.c [new file with mode: 0644]

index c8ceb4e864397c5e54c54fe20c3ceddaae2c9e20..2ee5a830e1233bd614d916d73d7087a9f9ec7cea 100644 (file)
@@ -25,8 +25,9 @@ struct io_listener {
 enum io_plan_status {
        /* As before calling next function. */
        IO_UNSET,
-       /* Normal. */
-       IO_POLLING,
+       /* Normal, but haven't started yet. */
+       IO_POLLING_NOTSTARTED,
+       IO_POLLING_STARTED,
        /* Waiting for io_wake */
        IO_WAITING,
        /* Always do this. */
index 99f0f7c90fd8580f4dac211134b5ec87da544bf4..c0dd9b838e84af6ec8ef6b62dec8fe3695adf94c 100644 (file)
@@ -131,7 +131,7 @@ struct io_plan_arg *io_plan_arg(struct io_conn *conn, enum io_direction dir)
 {
        assert(conn->plan[dir].status == IO_UNSET);
 
-       conn->plan[dir].status = IO_POLLING;
+       conn->plan[dir].status = IO_POLLING_NOTSTARTED;
        return &conn->plan[dir].arg;
 }
 
@@ -368,7 +368,8 @@ static bool do_plan(struct io_conn *conn, struct io_plan *plan,
                    bool idle_on_epipe)
 {
        /* We shouldn't have polled for this event if this wasn't true! */
-       assert(plan->status == IO_POLLING);
+       assert(plan->status == IO_POLLING_NOTSTARTED
+              || plan->status == IO_POLLING_STARTED);
 
        switch (plan->io(conn->fd.fd, &plan->arg)) {
        case -1:
@@ -380,6 +381,7 @@ static bool do_plan(struct io_conn *conn, struct io_plan *plan,
                io_close(conn);
                return false;
        case 0:
+               plan->status = IO_POLLING_STARTED;
                return true;
        case 1:
                return next_plan(conn, plan);
@@ -399,7 +401,8 @@ void io_ready(struct io_conn *conn, int pollflags)
                /* If we're writing to a closed pipe, we need to wait for
                 * read to fail if we're duplex: we want to drain it! */
                do_plan(conn, &conn->plan[IO_OUT],
-                       conn->plan[IO_IN].status == IO_POLLING);
+                       conn->plan[IO_IN].status == IO_POLLING_NOTSTARTED
+                       || conn->plan[IO_IN].status == IO_POLLING_STARTED);
 }
 
 void io_do_always(struct io_conn *conn)
@@ -509,13 +512,24 @@ struct io_plan *io_set_plan(struct io_conn *conn, enum io_direction dir,
        return plan;
 }
 
+bool io_plan_in_started(const struct io_conn *conn)
+{
+       return conn->plan[IO_IN].status == IO_POLLING_STARTED;
+}
+
+bool io_plan_out_started(const struct io_conn *conn)
+{
+       return conn->plan[IO_OUT].status == IO_POLLING_STARTED;
+}
+
 bool io_flush_sync(struct io_conn *conn)
 {
        struct io_plan *plan = &conn->plan[IO_OUT];
        bool ok;
 
        /* Not writing?  Nothing to do. */
-       if (plan->status != IO_POLLING)
+       if (plan->status != IO_POLLING_STARTED
+           && plan->status != IO_POLLING_NOTSTARTED)
                return true;
 
        /* Synchronous please. */
@@ -528,6 +542,7 @@ again:
                break;
        /* Incomplete, try again. */
        case 0:
+               plan->status = IO_POLLING_STARTED;
                goto again;
        case 1:
                ok = true;
index 1e4e80e7db779410b72a80428c65b12e8137218f..c9ab228c7a046b3c3575d0c1aa40d1255b15f5d7 100644 (file)
@@ -674,6 +674,34 @@ void *io_loop(struct timers *timers, struct timer **expired);
  */
 int io_conn_fd(const struct io_conn *conn);
 
+/**
+ * io_plan_in_started - is this conn doing input I/O now?
+ * @conn: the conn.
+ *
+ * This returns true if input I/O has been performed on the conn but
+ * @next hasn't been called yet.  For example, io_read() may have done
+ * a partial read.
+ *
+ * This can be useful if we want to terminate a connection only after
+ * reading a whole packet: if this returns true, we would wait until
+ * @next is called.
+ */
+bool io_plan_in_started(const struct io_conn *conn);
+
+/**
+ * io_plan_out_started - is this conn doing output I/O now?
+ * @conn: the conn.
+ *
+ * This returns true if output I/O has been performed on the conn but
+ * @next hasn't been called yet.  For example, io_write() may have done
+ * a partial write.
+ *
+ * This can be useful if we want to terminate a connection only after
+ * writing a whole packet: if this returns true, we would wait until
+ * @next is called.
+ */
+bool io_plan_out_started(const struct io_conn *conn);
+
 /**
  * io_flush_sync - (synchronously) complete any outstanding output.
  * @conn: the connection.
index a4e83ed761e77251767b01b3e3f5c5dd3492bfcd..3354abe01a2bae38ae6728b9f9f07522053b90b2 100644 (file)
@@ -130,9 +130,11 @@ void backend_new_plan(struct io_conn *conn)
                num_waiting--;
 
        pfd->events = 0;
-       if (conn->plan[IO_IN].status == IO_POLLING)
+       if (conn->plan[IO_IN].status == IO_POLLING_NOTSTARTED
+           || conn->plan[IO_IN].status == IO_POLLING_STARTED)
                pfd->events |= POLLIN;
-       if (conn->plan[IO_OUT].status == IO_POLLING)
+       if (conn->plan[IO_OUT].status == IO_POLLING_NOTSTARTED
+           || conn->plan[IO_OUT].status == IO_POLLING_STARTED)
                pfd->events |= POLLOUT;
 
        if (pfd->events) {
diff --git a/ccan/io/test/run-43-io_plan_in_started.c b/ccan/io/test/run-43-io_plan_in_started.c
new file mode 100644 (file)
index 0000000..f63f877
--- /dev/null
@@ -0,0 +1,75 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+
+static struct io_conn *in_conn;
+
+static struct io_plan *in_conn_done(struct io_conn *conn, void *unused)
+{
+       ok1(!io_plan_in_started(conn));
+       return io_close(conn);
+}
+
+static struct io_plan *init_in_conn(struct io_conn *conn, char *buf)
+{
+       ok1(!io_plan_in_started(conn));
+       return io_read(conn, buf, 2, in_conn_done, NULL);
+}
+
+static int do_nothing(int fd, struct io_plan_arg *arg)
+{
+       return 1;
+}
+
+static struct io_plan *dummy_write(struct io_conn *conn,
+                                  struct io_plan *(*next)
+                                  (struct io_conn *, void *),
+                                  void *next_arg)
+{
+       io_plan_arg(conn, IO_OUT);
+       return io_set_plan(conn, IO_OUT, do_nothing, next, next_arg);
+}
+
+static struct io_plan *out_post_write(struct io_conn *conn, void *unused)
+{
+       /* It might not have started yet: try again. */
+       if (!io_plan_in_started(in_conn))
+               return dummy_write(conn, out_post_write, NULL);
+       ok1(io_plan_in_started(in_conn));
+
+       /* Final write, then close */
+       return io_write(conn, "2", 1, io_close_cb, NULL);
+}
+
+static struct io_plan *init_out_conn(struct io_conn *conn, void *unused)
+{
+       ok1(!io_plan_in_started(in_conn));
+       return io_write(conn, "1", 1, out_post_write, NULL);
+}
+
+int main(void)
+{
+       int fds[2];
+       const tal_t *ctx = tal(NULL, char);
+       char *buf = tal_arr(ctx, char, 3);
+
+       /* This is how many tests you plan to run */
+       plan_tests(5);
+
+       if (pipe(fds) != 0)
+               abort();
+
+       buf[2] = '\0';
+
+       in_conn = io_new_conn(ctx, fds[0], init_in_conn, buf);
+       io_new_conn(ctx, fds[1], init_out_conn, NULL);
+
+       io_loop(NULL, NULL);
+       ok1(strcmp(buf, "12") == 0);
+       tal_free(ctx);
+
+       /* This exits depending on whether all tests passed */
+       return exit_status();
+}
diff --git a/ccan/io/test/run-44-io_plan_out_started.c b/ccan/io/test/run-44-io_plan_out_started.c
new file mode 100644 (file)
index 0000000..71a63dd
--- /dev/null
@@ -0,0 +1,100 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+
+static struct io_conn *out_conn;
+
+/* Write one byte at a time. */
+static int do_slow_write(int fd, struct io_plan_arg *arg)
+{
+       ssize_t ret = write(fd, arg->u1.cp, 1);
+       if (ret < 0)
+               return -1;
+
+       arg->u1.cp += ret;
+       arg->u2.s -= ret;
+       return arg->u2.s == 0;
+}
+
+static struct io_plan *io_slow_write(struct io_conn *conn,
+                                    const void *data, size_t len,
+                                    struct io_plan *(*next)(struct io_conn *,
+                                                            void *),
+                                    void *next_arg)
+{
+       struct io_plan_arg *arg = io_plan_arg(conn, IO_OUT);
+
+       arg->u1.const_vp = data;
+       arg->u2.s = len;
+
+       return io_set_plan(conn, IO_OUT, do_slow_write, next, next_arg);
+}
+
+static struct io_plan *out_conn_done(struct io_conn *conn, void *unused)
+{
+       ok1(!io_plan_out_started(conn));
+       return io_close(conn);
+}
+
+static struct io_plan *init_out_conn(struct io_conn *conn, void *unused)
+{
+       ok1(!io_plan_out_started(conn));
+       return io_slow_write(conn, "12", 2, out_conn_done, NULL);
+}
+
+static int do_nothing(int fd, struct io_plan_arg *arg)
+{
+       return 1;
+}
+
+static struct io_plan *dummy_read(struct io_conn *conn,
+                                 struct io_plan *(*next)
+                                 (struct io_conn *, void *),
+                                 void *next_arg)
+{
+       io_plan_arg(conn, IO_IN);
+       return io_set_plan(conn, IO_IN, do_nothing, next, next_arg);
+}
+
+static struct io_plan *in_post_read(struct io_conn *conn, void *buf)
+{
+       /* It might not have started yet: try again. */
+       if (!io_plan_out_started(out_conn))
+               return dummy_read(conn, in_post_read, NULL);
+       ok1(io_plan_out_started(out_conn));
+
+       /* Final read, then close */
+       return io_read(conn, (char *)buf+1, 1, io_close_cb, NULL);
+}
+
+static struct io_plan *init_in_conn(struct io_conn *conn, char *buf)
+{
+       return io_read(conn, buf, 1, in_post_read, buf);
+}
+
+int main(void)
+{
+       int fds[2];
+       const tal_t *ctx = tal(NULL, char);
+       char *buf = tal_arr(ctx, char, 3);
+
+       /* This is how many tests you plan to run */
+       plan_tests(4);
+
+       if (pipe(fds) != 0)
+               abort();
+
+       buf[2] = '\0';
+
+       io_new_conn(ctx, fds[0], init_in_conn, buf);
+       out_conn = io_new_conn(ctx, fds[1], init_out_conn, NULL);
+
+       io_loop(NULL, NULL);
+       ok1(strcmp(buf, "12") == 0);
+       tal_free(ctx);
+
+       /* This exits depending on whether all tests passed */
+       return exit_status();
+}