enum io_plan_status {
/* As before calling next function. */
IO_UNSET,
- /* Normal. */
- IO_POLLING,
+ /* Normal, but haven't started yet. */
+ IO_POLLING_NOTSTARTED,
+ IO_POLLING_STARTED,
/* Waiting for io_wake */
IO_WAITING,
/* Always do this. */
{
assert(conn->plan[dir].status == IO_UNSET);
- conn->plan[dir].status = IO_POLLING;
+ conn->plan[dir].status = IO_POLLING_NOTSTARTED;
return &conn->plan[dir].arg;
}
bool idle_on_epipe)
{
/* We shouldn't have polled for this event if this wasn't true! */
- assert(plan->status == IO_POLLING);
+ assert(plan->status == IO_POLLING_NOTSTARTED
+ || plan->status == IO_POLLING_STARTED);
switch (plan->io(conn->fd.fd, &plan->arg)) {
case -1:
io_close(conn);
return false;
case 0:
+ plan->status = IO_POLLING_STARTED;
return true;
case 1:
return next_plan(conn, plan);
/* If we're writing to a closed pipe, we need to wait for
* read to fail if we're duplex: we want to drain it! */
do_plan(conn, &conn->plan[IO_OUT],
- conn->plan[IO_IN].status == IO_POLLING);
+ conn->plan[IO_IN].status == IO_POLLING_NOTSTARTED
+ || conn->plan[IO_IN].status == IO_POLLING_STARTED);
}
void io_do_always(struct io_conn *conn)
return plan;
}
+bool io_plan_in_started(const struct io_conn *conn)
+{
+ return conn->plan[IO_IN].status == IO_POLLING_STARTED;
+}
+
+bool io_plan_out_started(const struct io_conn *conn)
+{
+ return conn->plan[IO_OUT].status == IO_POLLING_STARTED;
+}
+
bool io_flush_sync(struct io_conn *conn)
{
struct io_plan *plan = &conn->plan[IO_OUT];
bool ok;
/* Not writing? Nothing to do. */
- if (plan->status != IO_POLLING)
+ if (plan->status != IO_POLLING_STARTED
+ && plan->status != IO_POLLING_NOTSTARTED)
return true;
/* Synchronous please. */
break;
/* Incomplete, try again. */
case 0:
+ plan->status = IO_POLLING_STARTED;
goto again;
case 1:
ok = true;
*/
int io_conn_fd(const struct io_conn *conn);
+/**
+ * io_plan_in_started - is this conn doing input I/O now?
+ * @conn: the conn.
+ *
+ * This returns true if input I/O has been performed on the conn but
+ * @next hasn't been called yet. For example, io_read() may have done
+ * a partial read.
+ *
+ * This can be useful if we want to terminate a connection only after
+ * reading a whole packet: if this returns true, we would wait until
+ * @next is called.
+ */
+bool io_plan_in_started(const struct io_conn *conn);
+
+/**
+ * io_plan_out_started - is this conn doing output I/O now?
+ * @conn: the conn.
+ *
+ * This returns true if output I/O has been performed on the conn but
+ * @next hasn't been called yet. For example, io_write() may have done
+ * a partial write.
+ *
+ * This can be useful if we want to terminate a connection only after
+ * writing a whole packet: if this returns true, we would wait until
+ * @next is called.
+ */
+bool io_plan_out_started(const struct io_conn *conn);
+
/**
* io_flush_sync - (synchronously) complete any outstanding output.
* @conn: the connection.
num_waiting--;
pfd->events = 0;
- if (conn->plan[IO_IN].status == IO_POLLING)
+ if (conn->plan[IO_IN].status == IO_POLLING_NOTSTARTED
+ || conn->plan[IO_IN].status == IO_POLLING_STARTED)
pfd->events |= POLLIN;
- if (conn->plan[IO_OUT].status == IO_POLLING)
+ if (conn->plan[IO_OUT].status == IO_POLLING_NOTSTARTED
+ || conn->plan[IO_OUT].status == IO_POLLING_STARTED)
pfd->events |= POLLOUT;
if (pfd->events) {
--- /dev/null
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+
+static struct io_conn *in_conn;
+
+static struct io_plan *in_conn_done(struct io_conn *conn, void *unused)
+{
+ ok1(!io_plan_in_started(conn));
+ return io_close(conn);
+}
+
+static struct io_plan *init_in_conn(struct io_conn *conn, char *buf)
+{
+ ok1(!io_plan_in_started(conn));
+ return io_read(conn, buf, 2, in_conn_done, NULL);
+}
+
+static int do_nothing(int fd, struct io_plan_arg *arg)
+{
+ return 1;
+}
+
+static struct io_plan *dummy_write(struct io_conn *conn,
+ struct io_plan *(*next)
+ (struct io_conn *, void *),
+ void *next_arg)
+{
+ io_plan_arg(conn, IO_OUT);
+ return io_set_plan(conn, IO_OUT, do_nothing, next, next_arg);
+}
+
+static struct io_plan *out_post_write(struct io_conn *conn, void *unused)
+{
+ /* It might not have started yet: try again. */
+ if (!io_plan_in_started(in_conn))
+ return dummy_write(conn, out_post_write, NULL);
+ ok1(io_plan_in_started(in_conn));
+
+ /* Final write, then close */
+ return io_write(conn, "2", 1, io_close_cb, NULL);
+}
+
+static struct io_plan *init_out_conn(struct io_conn *conn, void *unused)
+{
+ ok1(!io_plan_in_started(in_conn));
+ return io_write(conn, "1", 1, out_post_write, NULL);
+}
+
+int main(void)
+{
+ int fds[2];
+ const tal_t *ctx = tal(NULL, char);
+ char *buf = tal_arr(ctx, char, 3);
+
+ /* This is how many tests you plan to run */
+ plan_tests(5);
+
+ if (pipe(fds) != 0)
+ abort();
+
+ buf[2] = '\0';
+
+ in_conn = io_new_conn(ctx, fds[0], init_in_conn, buf);
+ io_new_conn(ctx, fds[1], init_out_conn, NULL);
+
+ io_loop(NULL, NULL);
+ ok1(strcmp(buf, "12") == 0);
+ tal_free(ctx);
+
+ /* This exits depending on whether all tests passed */
+ return exit_status();
+}
--- /dev/null
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+
+static struct io_conn *out_conn;
+
+/* Write one byte at a time. */
+static int do_slow_write(int fd, struct io_plan_arg *arg)
+{
+ ssize_t ret = write(fd, arg->u1.cp, 1);
+ if (ret < 0)
+ return -1;
+
+ arg->u1.cp += ret;
+ arg->u2.s -= ret;
+ return arg->u2.s == 0;
+}
+
+static struct io_plan *io_slow_write(struct io_conn *conn,
+ const void *data, size_t len,
+ struct io_plan *(*next)(struct io_conn *,
+ void *),
+ void *next_arg)
+{
+ struct io_plan_arg *arg = io_plan_arg(conn, IO_OUT);
+
+ arg->u1.const_vp = data;
+ arg->u2.s = len;
+
+ return io_set_plan(conn, IO_OUT, do_slow_write, next, next_arg);
+}
+
+static struct io_plan *out_conn_done(struct io_conn *conn, void *unused)
+{
+ ok1(!io_plan_out_started(conn));
+ return io_close(conn);
+}
+
+static struct io_plan *init_out_conn(struct io_conn *conn, void *unused)
+{
+ ok1(!io_plan_out_started(conn));
+ return io_slow_write(conn, "12", 2, out_conn_done, NULL);
+}
+
+static int do_nothing(int fd, struct io_plan_arg *arg)
+{
+ return 1;
+}
+
+static struct io_plan *dummy_read(struct io_conn *conn,
+ struct io_plan *(*next)
+ (struct io_conn *, void *),
+ void *next_arg)
+{
+ io_plan_arg(conn, IO_IN);
+ return io_set_plan(conn, IO_IN, do_nothing, next, next_arg);
+}
+
+static struct io_plan *in_post_read(struct io_conn *conn, void *buf)
+{
+ /* It might not have started yet: try again. */
+ if (!io_plan_out_started(out_conn))
+ return dummy_read(conn, in_post_read, NULL);
+ ok1(io_plan_out_started(out_conn));
+
+ /* Final read, then close */
+ return io_read(conn, (char *)buf+1, 1, io_close_cb, NULL);
+}
+
+static struct io_plan *init_in_conn(struct io_conn *conn, char *buf)
+{
+ return io_read(conn, buf, 1, in_post_read, buf);
+}
+
+int main(void)
+{
+ int fds[2];
+ const tal_t *ctx = tal(NULL, char);
+ char *buf = tal_arr(ctx, char, 3);
+
+ /* This is how many tests you plan to run */
+ plan_tests(4);
+
+ if (pipe(fds) != 0)
+ abort();
+
+ buf[2] = '\0';
+
+ io_new_conn(ctx, fds[0], init_in_conn, buf);
+ out_conn = io_new_conn(ctx, fds[1], init_out_conn, NULL);
+
+ io_loop(NULL, NULL);
+ ok1(strcmp(buf, "12") == 0);
+ tal_free(ctx);
+
+ /* This exits depending on whether all tests passed */
+ return exit_status();
+}