X-Git-Url: http://git.ozlabs.org/?p=ccan;a=blobdiff_plain;f=ccan%2Fio%2Fio.c;h=7eea9c35442cbbcb4386b430929195d1454b49a0;hp=68b95e82467194f13e87302e907cda5020fe37db;hb=e2e70f5566cb8978b74de517c24a0f236d3d01ee;hpb=3f642347378afc9e1db1768d88c9f5b2baffe9ba diff --git a/ccan/io/io.c b/ccan/io/io.c index 68b95e82..7eea9c35 100644 --- a/ccan/io/io.c +++ b/ccan/io/io.c @@ -72,16 +72,19 @@ static bool next_plan(struct io_conn *conn, struct io_plan *plan) return true; } -static void set_blocking(int fd, bool block) +bool io_fd_block(int fd, bool block) { int flags = fcntl(fd, F_GETFL); + if (flags == -1) + return false; + if (block) flags &= ~O_NONBLOCK; else flags |= O_NONBLOCK; - fcntl(fd, F_SETFL, flags); + return fcntl(fd, F_SETFL, flags) != -1; } struct io_conn *io_new_conn_(const tal_t *ctx, int fd, @@ -103,7 +106,7 @@ struct io_conn *io_new_conn_(const tal_t *ctx, int fd, return tal_free(conn); /* Keep our I/O async. */ - set_blocking(fd, false); + io_fd_block(fd, false); /* We start with out doing nothing, and in doing our init. */ conn->plan[IO_OUT].status = IO_UNSET; @@ -360,14 +363,20 @@ void io_wake(const void *wait) backend_wake(wait); } -/* Returns false if this has been freed. */ -static bool do_plan(struct io_conn *conn, struct io_plan *plan) +/* Returns false if this should not be touched (eg. freed). */ +static bool do_plan(struct io_conn *conn, struct io_plan *plan, + bool idle_on_epipe) { /* We shouldn't have polled for this event if this wasn't true! */ assert(plan->status == IO_POLLING); switch (plan->io(conn->fd.fd, &plan->arg)) { case -1: + if (errno == EPIPE && idle_on_epipe) { + plan->status = IO_UNSET; + backend_new_plan(conn); + return false; + } io_close(conn); return false; case 0: @@ -383,21 +392,38 @@ static bool do_plan(struct io_conn *conn, struct io_plan *plan) void io_ready(struct io_conn *conn, int pollflags) { if (pollflags & POLLIN) - if (!do_plan(conn, &conn->plan[IO_IN])) + if (!do_plan(conn, &conn->plan[IO_IN], false)) return; if (pollflags & POLLOUT) - do_plan(conn, &conn->plan[IO_OUT]); + /* If we're writing to a closed pipe, we need to wait for + * read to fail if we're duplex: we want to drain it! */ + do_plan(conn, &conn->plan[IO_OUT], + (conn->plan[IO_IN].status == IO_POLLING + || conn->plan[IO_IN].status == IO_ALWAYS)); } void io_do_always(struct io_conn *conn) { + /* There's a corner case where the in next_plan wakes up the + * out, placing it in IO_ALWAYS and we end up processing it immediately, + * only to leave it in the always list. + * + * Yet we can't just process one, in case they are both supposed + * to be done, so grab state beforehand. + */ + bool always_out = (conn->plan[IO_OUT].status == IO_ALWAYS); + if (conn->plan[IO_IN].status == IO_ALWAYS) if (!next_plan(conn, &conn->plan[IO_IN])) return; - if (conn->plan[IO_OUT].status == IO_ALWAYS) + if (always_out) { + /* You can't *unalways* a conn (except by freeing, in which + * case next_plan() returned false */ + assert(conn->plan[IO_OUT].status == IO_ALWAYS); next_plan(conn, &conn->plan[IO_OUT]); + } } void io_do_wakeup(struct io_conn *conn, enum io_direction dir) @@ -423,7 +449,7 @@ struct io_plan *io_close_cb(struct io_conn *conn, void *next_arg) struct io_plan *io_close_taken_fd(struct io_conn *conn) { - set_blocking(conn->fd.fd, true); + io_fd_block(conn->fd.fd, true); cleanup_conn_without_close(conn); return io_close(conn); @@ -483,3 +509,37 @@ struct io_plan *io_set_plan(struct io_conn *conn, enum io_direction dir, return plan; } + +bool io_flush_sync(struct io_conn *conn) +{ + struct io_plan *plan = &conn->plan[IO_OUT]; + bool ok; + + /* Not writing? Nothing to do. */ + if (plan->status != IO_POLLING) + return true; + + /* Synchronous please. */ + io_fd_block(io_conn_fd(conn), true); + +again: + switch (plan->io(conn->fd.fd, &plan->arg)) { + case -1: + ok = false; + break; + /* Incomplete, try again. */ + case 0: + goto again; + case 1: + ok = true; + /* In case they come back. */ + set_always(conn, IO_OUT, plan->next, plan->next_arg); + break; + default: + /* IO should only return -1, 0 or 1 */ + abort(); + } + + io_fd_block(io_conn_fd(conn), false); + return ok; +}