From: Rusty Russell Date: Thu, 10 Apr 2014 11:48:36 +0000 (+0930) Subject: io: change io_idle() to io_wait() X-Git-Url: http://git.ozlabs.org/?p=ccan;a=commitdiff_plain;h=f7ab2c65d40839a327e50876dc2708ab7b11aa52;hp=cdf62dce7077a9f9a818edbb67d31d033cbb73c6 io: change io_idle() to io_wait() Signed-off-by: Rusty Russell --- diff --git a/ccan/io/_info b/ccan/io/_info index 235e6ba3..8f77e361 100644 --- a/ccan/io/_info +++ b/ccan/io/_info @@ -25,76 +25,74 @@ * #include * #include * #include + * #include * * struct buffer { - * size_t max, off, rlen; - * char *buf; + * bool finished; + * size_t start, end, rlen, wlen; + * char buf[4096]; * }; * - * struct stdin_buffer { - * struct io_conn *reader, *writer; - * size_t len; - * char inbuf[4096]; - * }; - * - * // This reads from stdin. - * static struct io_plan wake_writer(struct io_conn *, struct stdin_buffer *); - * // This writes the stdin buffer to the child. - * static struct io_plan wake_reader(struct io_conn *, struct stdin_buffer *); - * - * static struct io_plan wake_writer(struct io_conn *c, struct stdin_buffer *b) + * static void finish(struct io_conn *c, struct buffer *b) * { - * assert(c == b->reader); - * io_wake(b->writer, io_write(b->inbuf, b->len, wake_reader, b)); - * return io_idle(); + * // Mark us finished. + * b->finished = true; + * // Wake writer just in case it's asleep. + * io_wake(b); * } * - * static void reader_exit(struct io_conn *c, struct stdin_buffer *b) + * static struct io_plan read_in(struct io_conn *c, struct buffer *b) * { - * assert(c == b->reader); - * io_wake(b->writer, io_close()); - * b->reader = NULL; - * } + * // Add what we just read. + * b->end += b->rlen; + * assert(b->end <= sizeof(b->buf)); * - * static struct io_plan wake_reader(struct io_conn *c, struct stdin_buffer *b) - * { - * assert(c == b->writer); - * if (!b->reader) - * return io_close(); - * b->len = sizeof(b->inbuf); - * io_wake(b->reader, io_read_partial(b->inbuf, &b->len, wake_writer, b)); - * return io_idle(); - * } + * // If we just read something, wake writer. + * if (b->rlen != 0) + * io_wake(b); * - * static void fail_child_write(struct io_conn *conn, struct stdin_buffer *b) - * { - * if (b->reader) - * err(1, "Failed writing to child."); + * // If buffer is empty, return to start. + * if (b->start == b->end) + * b->start = b->end = 0; + * + * // Read in some of the rest. + * b->rlen = sizeof(b->buf) - b->end; + * + * // No room? Wait for writer + * if (b->rlen == 0) + * return io_wait(b, read_in, b); + * + * return io_read_partial(b->buf + b->end, &b->rlen, read_in, b); * } * - * // This reads from the child and saves it into buffer. - * static struct io_plan read_from_child(struct io_conn *conn, - * struct buffer *b) + * static struct io_plan write_out(struct io_conn *c, struct buffer *b) * { - * b->off += b->rlen; - * - * if (b->off == b->max) - * b->buf = realloc(b->buf, b->max *= 2); + * // Remove what we just wrote. + * b->start += b->wlen; + * assert(b->start <= sizeof(b->buf)); + * + * // If we wrote somthing, wake writer. + * if (b->wlen != 0) + * io_wake(b); + * + * b->wlen = b->end - b->start; + * // Nothing to write? Wait for reader. + * if (b->wlen == 0) { + * if (b->finished) + * return io_close(); + * return io_wait(b, write_out, b); + * } * - * b->rlen = b->max - b->off; - * return io_read_partial(b->buf + b->off, &b->rlen, read_from_child, b); + * return io_write_partial(b->buf + b->start, &b->wlen, write_out, b); * } * * // Feed a program our stdin, gather its stdout, print that at end. * int main(int argc, char *argv[]) * { * int tochild[2], fromchild[2]; - * struct buffer out; - * struct stdin_buffer sbuf; + * struct buffer to, from; * int status; - * size_t off; - * ssize_t ret; - * struct io_conn *from_child; + * struct io_conn *reader; * * if (argc == 1) * errx(1, "Usage: runner ..."); @@ -117,35 +115,20 @@ * close(fromchild[1]); * signal(SIGPIPE, SIG_IGN); * - * sbuf.len = sizeof(sbuf.inbuf); - * sbuf.reader = io_new_conn(STDIN_FILENO, - * io_read_partial(sbuf.inbuf, &sbuf.len, - * wake_writer, &sbuf)); - * sbuf.writer = io_new_conn(tochild[1], io_idle()); - * - * out.max = 128; - * out.off = 0; - * out.rlen = 128; - * out.buf = malloc(out.max); - * from_child = io_new_conn(fromchild[0], - * io_read_partial(out.buf, &out.rlen, - * read_from_child, &out)); - * if (!sbuf.reader || !sbuf.writer || !from_child) - * err(1, "Allocating connections"); - * - * io_set_finish(sbuf.reader, reader_exit, &sbuf); - * io_set_finish(sbuf.writer, fail_child_write, &sbuf); + * // Read from stdin, write to child. + * memset(&to, 0, sizeof(to)); + * reader = io_new_conn(STDIN_FILENO, read_in(NULL, &to)); + * io_set_finish(reader, finish, &to); + * io_new_conn(tochild[1], write_out(NULL, &to)); + * + * // Read from child, write to stdout. + * reader = io_new_conn(fromchild[0], read_in(NULL, &from)); + * io_set_finish(reader, finish, &from); + * io_new_conn(STDOUT_FILENO, write_out(NULL, &from)); * * io_loop(); * wait(&status); * - * for (off = 0; off < out.off; off += ret) { - * ret = write(STDOUT_FILENO, out.buf+off, out.off-off); - * if (ret < 0) - * err(1, "Writing stdout"); - * } - * free(out.buf); - * * return WIFEXITED(status) ? WEXITSTATUS(status) : 2; * } * diff --git a/ccan/io/backend.h b/ccan/io/backend.h index e2090ff1..b6674025 100644 --- a/ccan/io/backend.h +++ b/ccan/io/backend.h @@ -91,6 +91,7 @@ bool add_conn(struct io_conn *c); bool add_duplex(struct io_conn *c); void del_listener(struct io_listener *l); void backend_plan_changed(struct io_conn *conn); +void backend_wait_changed(const void *wait); void backend_add_timeout(struct io_conn *conn, struct timespec ts); void backend_del_timeout(struct io_conn *conn); void backend_del_conn(struct io_conn *conn); diff --git a/ccan/io/io.c b/ccan/io/io.c index 90039e1e..a1610a40 100644 --- a/ccan/io/io.c +++ b/ccan/io/io.c @@ -24,8 +24,6 @@ bool io_plan_nodebug; struct io_conn *current; /* User-defined function to select which connection(s) to debug. */ bool (*io_debug_conn)(struct io_conn *conn); -/* Set when we wake up an connection we are debugging. */ -bool io_debug_wakeup; struct io_plan io_debug(struct io_plan plan) { @@ -36,12 +34,9 @@ struct io_plan io_debug(struct io_plan plan) return plan; } - if (!current || !doing_debug_on(current)) { - if (!io_debug_wakeup) - return plan; - } + if (!current || !doing_debug_on(current)) + return plan; - io_debug_wakeup = false; current->plan = plan; backend_plan_changed(current); @@ -68,7 +63,7 @@ struct io_plan io_debug(struct io_plan plan) /* Return a do-nothing plan, so backend_plan_changed in * io_ready doesn't do anything (it's already been called). */ - return io_idle_(); + return io_wait_(NULL, (void *)1, NULL); } int io_debug_io(int ret) @@ -107,22 +102,12 @@ int io_debug_io(int ret) return 2; } -static void debug_io_wake(struct io_conn *conn) -{ - /* We want linear if we wake a debugged connection, too. */ - if (io_debug_conn && io_debug_conn(conn)) - io_debug_wakeup = true; -} - /* Counterpart to io_plan_no_debug(), called in macros in io.h */ static void io_plan_debug_again(void) { io_plan_nodebug = false; } #else -static void debug_io_wake(struct io_conn *conn) -{ -} static void io_plan_debug_again(void) { } @@ -436,37 +421,26 @@ struct io_plan io_connect_(int fd, const struct addrinfo *addr, return plan; } -struct io_plan io_idle_(void) +struct io_plan io_wait_(const void *wait, + struct io_plan (*cb)(struct io_conn *, void*), + void *arg) { struct io_plan plan; + assert(cb); plan.pollflag = 0; plan.io = NULL; - /* Never called (overridden by io_wake), but NULL means closing */ - plan.next = (void *)io_idle_; + plan.next = cb; + plan.next_arg = arg; - return plan; -} + plan.u1.const_vp = wait; -bool io_is_idle(const struct io_conn *conn) -{ - return conn->plan.io == NULL; + return plan; } -void io_wake_(struct io_conn *conn, struct io_plan plan) - +void io_wake(const void *wait) { - io_plan_debug_again(); - - /* It might be closing, but we haven't called its finish() yet. */ - if (!conn->plan.next) - return; - /* It was idle, right? */ - assert(!conn->plan.io); - conn->plan = plan; - backend_plan_changed(conn); - - debug_io_wake(conn); + backend_wait_changed(wait); } void io_ready(struct io_conn *conn) diff --git a/ccan/io/io.h b/ccan/io/io.h index 478190f4..505c4c70 100644 --- a/ccan/io/io.h +++ b/ccan/io/io.h @@ -314,6 +314,7 @@ struct io_plan io_write_partial_(const void *data, size_t *len, struct io_plan io_always_(struct io_plan (*cb)(struct io_conn *, void *), void *arg); + /** * io_connect - plan to connect to a listening socket. * @fd: file descriptor. @@ -360,19 +361,32 @@ struct io_plan io_connect_(int fd, const struct addrinfo *addr, void *arg); /** - * io_idle - plan to do nothing. + * io_wait - plan to wait for something. + * @wait: the address to wait on. + * @cb: function to call after waiting. + * @arg: @cb argument * - * This indicates the connection is idle: io_wake() will be called later do - * give the connection a new plan. + * This indicates the connection is idle: io_wake() will be called later to + * restart the connection. * * Example: * struct io_conn *sleeper; - * sleeper = io_new_conn(open("/dev/null", O_RDONLY), io_idle()); + * unsigned int counter = 0; + * sleeper = io_new_conn(open("/dev/null", O_RDONLY), + * io_wait(&counter, io_close_cb, NULL)); * if (!sleeper) * exit(1); */ -#define io_idle() io_debug(io_idle_()) -struct io_plan io_idle_(void); +#define io_wait(wait, cb, arg) \ + io_debug(io_wait_(wait, \ + typesafe_cb_preargs(struct io_plan, void *, \ + (cb), (arg), \ + struct io_conn *), \ + (arg))) + +struct io_plan io_wait_(const void *wait, + struct io_plan (*cb)(struct io_conn *, void *), + void *arg); /** * io_timeout - set timeout function if the callback doesn't complete. @@ -440,35 +454,18 @@ bool io_timeout_(struct io_conn *conn, struct timespec ts, struct io_conn *io_duplex_(struct io_conn *conn, struct io_plan plan); /** - * io_wake - wake up an idle connection. - * @conn: an idle connection. - * @plan: the next I/O plan for @conn. - * - * This makes @conn ready to do I/O the next time around the io_loop(). + * io_wake - wake up any connections waiting on @wait + * @wait: the address to trigger. * * Example: - * struct io_conn *sleeper; - * sleeper = io_new_conn(open("/dev/null", O_RDONLY), io_idle()); - * - * io_wake(sleeper, io_write("junk", 4, io_close_cb, NULL)); - */ -#define io_wake(conn, plan) (io_plan_no_debug(), io_wake_((conn), (plan))) -void io_wake_(struct io_conn *conn, struct io_plan plan); - -/** - * io_is_idle - is a connection idle? - * - * This can be useful for complex protocols, eg. where you want a connection - * to send something, so you queue it and wake it if it's idle. + * unsigned int wait; * - * Example: - * struct io_conn *sleeper; - * sleeper = io_new_conn(open("/dev/null", O_RDONLY), io_idle()); + * io_new_conn(open("/dev/null", O_RDONLY), + * io_wait(&wait, io_close_cb, NULL)); * - * assert(io_is_idle(sleeper)); - * io_wake(sleeper, io_write("junk", 4, io_close_cb, NULL)); + * io_wake(&wait); */ -bool io_is_idle(const struct io_conn *conn); +void io_wake(const void *wait); /** * io_break - return from io_loop() diff --git a/ccan/io/poll.c b/ccan/io/poll.c index e3f595c9..8ba376a5 100644 --- a/ccan/io/poll.c +++ b/ccan/io/poll.c @@ -167,6 +167,34 @@ void backend_plan_changed(struct io_conn *conn) some_always = true; } +void backend_wait_changed(const void *wait) +{ + unsigned int i; + + for (i = 0; i < num_fds; i++) { + struct io_conn *c, *duplex; + + /* Ignore listeners */ + if (fds[i]->listener) + continue; + c = (void *)fds[i]; + for (duplex = c->duplex; c; c = duplex, duplex = NULL) { + /* Ignore closing. */ + if (!c->plan.next) + continue; + /* Not idle? */ + if (c->plan.io) + continue; + /* Waiting on something else? */ + if (c->plan.u1.const_vp != wait) + continue; + /* Make it do the next thing. */ + c->plan = io_always_(c->plan.next, c->plan.next_arg); + backend_plan_changed(c); + } + } +} + bool add_conn(struct io_conn *c) { if (!add_fd(&c->fd, c->plan.pollflag & (POLLIN|POLLOUT))) diff --git a/ccan/io/test/run-06-idle.c b/ccan/io/test/run-06-idle.c index 7ae16e9e..82f6e0af 100644 --- a/ccan/io/test/run-06-idle.c +++ b/ccan/io/test/run-06-idle.c @@ -29,8 +29,7 @@ static struct io_plan read_done(struct io_conn *conn, struct data *d) static void finish_waker(struct io_conn *conn, struct data *d) { - ok1(io_is_idle(idler)); - io_wake(idler, io_read(d->buf, sizeof(d->buf), read_done, d)); + io_wake(d); ok1(d->state == 1); d->state++; } @@ -47,13 +46,18 @@ static struct io_plan never(struct io_conn *conn, void *arg) abort(); } +static struct io_plan read_buf(struct io_conn *conn, struct data *d) +{ + return io_read(d->buf, sizeof(d->buf), read_done, d); +} + static void init_conn(int fd, struct data *d) { int fd2; ok1(d->state == 0); d->state++; - idler = io_new_conn(fd, io_idle()); + idler = io_new_conn(fd, io_wait(d, read_buf, d)); io_set_finish(idler, finish_idle, d); /* This will wake us up, as read will fail. */ @@ -103,7 +107,7 @@ int main(void) int fd, status; /* This is how many tests you plan to run */ - plan_tests(14); + plan_tests(13); d->state = 0; fd = make_listen_fd(PORT, &addrinfo); ok1(fd >= 0); diff --git a/ccan/io/test/run-08-hangup-on-idle.c b/ccan/io/test/run-08-hangup-on-idle.c index b3840433..c8257703 100644 --- a/ccan/io/test/run-08-hangup-on-idle.c +++ b/ccan/io/test/run-08-hangup-on-idle.c @@ -15,6 +15,11 @@ static struct io_plan timeout_wakeup(struct io_conn *conn, char *buf) return io_read(buf, 16, io_close_cb, NULL); } +static struct io_plan never(struct io_conn *conn, void *unused) +{ + abort(); +} + int main(void) { int fds[2]; @@ -28,7 +33,7 @@ int main(void) /* Write then close. */ io_new_conn(fds[1], io_write("hello there world", 16, io_close_cb, NULL)); - conn = io_new_conn(fds[0], io_idle()); + conn = io_new_conn(fds[0], io_wait(buf, never, NULL)); /* To avoid assert(num_waiting) */ ok1(pipe(fds2) == 0); diff --git a/ccan/io/test/run-08-read-after-hangup.c b/ccan/io/test/run-08-read-after-hangup.c index b73139e4..f6b3db1c 100644 --- a/ccan/io/test/run-08-read-after-hangup.c +++ b/ccan/io/test/run-08-read-after-hangup.c @@ -11,10 +11,15 @@ static char inbuf[8]; static struct io_plan wake_it(struct io_conn *conn, struct io_conn *reader) { - io_wake(reader, io_read(inbuf, 8, io_close_cb, NULL)); + io_wake(inbuf); return io_close(); } +static struct io_plan read_buf(struct io_conn *conn, void *unused) +{ + return io_read(inbuf, 8, io_close_cb, NULL); +} + int main(void) { int fds[2]; @@ -23,7 +28,7 @@ int main(void) plan_tests(3); ok1(pipe(fds) == 0); - conn = io_new_conn(fds[0], io_idle()); + conn = io_new_conn(fds[0], io_wait(inbuf, read_buf, NULL)); io_new_conn(fds[1], io_write("EASYTEST", 8, wake_it, conn)); ok1(io_loop() == NULL); diff --git a/ccan/io/test/run-10-many.c b/ccan/io/test/run-10-many.c index da59a3cc..53e971d0 100644 --- a/ccan/io/test/run-10-many.c +++ b/ccan/io/test/run-10-many.c @@ -16,6 +16,12 @@ struct buffer { }; static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf); +static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf); + +static struct io_plan read_buf(struct io_conn *conn, struct buffer *buf) +{ + return io_read(&buf->buf, sizeof(buf->buf), poke_writer, buf); +} static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf) { @@ -25,25 +31,28 @@ static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf) return io_close(); /* You write. */ - io_wake(buf->writer, - io_write(&buf->buf, sizeof(buf->buf), poke_reader, buf)); + io_wake(&buf->writer); /* I'll wait until you wake me. */ - return io_idle(); + return io_wait(&buf->reader, read_buf, buf); +} + +static struct io_plan write_buf(struct io_conn *conn, struct buffer *buf) +{ + return io_write(&buf->buf, sizeof(buf->buf), poke_reader, buf); } static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf) { assert(conn == buf->writer); /* You read. */ - io_wake(buf->reader, - io_read(&buf->buf, sizeof(buf->buf), poke_writer, buf)); + io_wake(&buf->reader); if (++buf->iters == NUM_ITERS) return io_close(); /* I'll wait until you tell me to write. */ - return io_idle(); + return io_wait(&buf->writer, write_buf, buf); } static struct buffer buf[NUM]; @@ -66,13 +75,12 @@ int main(void) sprintf(buf[i].buf, "%i-%i", i, i); /* Wait for writer to tell us to read. */ - buf[i].reader = io_new_conn(last_read, io_idle()); + buf[i].reader = io_new_conn(last_read, + io_wait(&buf[i].reader, read_buf, + &buf[i])); if (!buf[i].reader) break; - buf[i].writer = io_new_conn(fds[1], - io_write(&buf[i].buf, - sizeof(buf[i].buf), - poke_reader, &buf[i])); + buf[i].writer = io_new_conn(fds[1], write_buf(NULL, &buf[i])); if (!buf[i].writer) break; last_read = fds[0]; @@ -83,11 +91,10 @@ int main(void) /* Last one completes the cirle. */ i = 0; sprintf(buf[i].buf, "%i-%i", i, i); - buf[i].reader = io_new_conn(last_read, io_idle()); + buf[i].reader = io_new_conn(last_read, + io_wait(&buf[i].reader, read_buf, &buf[i])); ok1(buf[i].reader); - buf[i].writer = io_new_conn(last_write, - io_write(&buf[i].buf, sizeof(buf[i].buf), - poke_reader, &buf[i])); + buf[i].writer = io_new_conn(last_write, write_buf(NULL, &buf[i])); ok1(buf[i].writer); /* They should eventually exit */ diff --git a/ccan/io/test/run-13-all-idle.c b/ccan/io/test/run-13-all-idle.c index 0e7e1565..31510007 100644 --- a/ccan/io/test/run-13-all-idle.c +++ b/ccan/io/test/run-13-all-idle.c @@ -17,7 +17,7 @@ int main(void) int fds[2]; ok1(pipe(fds) == 0); - io_new_conn(fds[0], io_idle()); + io_new_conn(fds[0], io_wait(&status, io_close_cb, NULL)); io_loop(); exit(1); } diff --git a/ccan/io/test/run-16-duplex-test.c b/ccan/io/test/run-16-duplex-test.c index 8a1a04f0..addca841 100644 --- a/ccan/io/test/run-16-duplex-test.c +++ b/ccan/io/test/run-16-duplex-test.c @@ -28,7 +28,7 @@ static void finish_ok(struct io_conn *conn, struct data *d) static struct io_plan write_done(struct io_conn *conn, struct data *d) { d->state++; - return io_idle(); + return io_wait(d, io_close_cb, NULL); } static struct io_plan read_done(struct io_conn *conn, struct data *d)