From: Rusty Russell Date: Thu, 14 Nov 2013 02:29:41 +0000 (+1030) Subject: Merge branch 'master' of ozlabs.org:ccan X-Git-Url: http://git.ozlabs.org/?p=ccan;a=commitdiff_plain;h=02ae136a997a5e7cc3bb9210167a066b9f169a7b;hp=5d628b63760d38f7d5094141d019f4ab83546690 Merge branch 'master' of ozlabs.org:ccan --- diff --git a/ccan/io/io.c b/ccan/io/io.c index b7452044..54ba7da5 100644 --- a/ccan/io/io.c +++ b/ccan/io/io.c @@ -100,6 +100,10 @@ int io_debug_io(int ret) case 1: /* Done: get next plan. */ if (timeout_active(conn)) backend_del_timeout(conn); + /* In case they call io_duplex, clear our poll flags so + * both sides don't seem to be both doing read or write + * (See assert(!mask || pfd->events != mask) in poll.c) */ + conn->plan.pollflag = 0; conn->plan.next(conn, conn->plan.next_arg); break; default: @@ -414,6 +418,11 @@ struct io_plan io_idle_(void) return plan; } +bool io_is_idle(const struct io_conn *conn) +{ + return conn->plan.io == NULL; +} + void io_wake_(struct io_conn *conn, struct io_plan plan) { @@ -443,13 +452,17 @@ void io_ready(struct io_conn *conn) case 1: /* Done: get next plan. */ if (timeout_active(conn)) backend_del_timeout(conn); + /* In case they call io_duplex, clear our poll flags so + * both sides don't seem to be both doing read or write + * (See assert(!mask || pfd->events != mask) in poll.c) */ + conn->plan.pollflag = 0; conn->plan = conn->plan.next(conn, conn->plan.next_arg); backend_plan_changed(conn); } set_current(NULL); - /* If it closed, close duplex. */ - if (!conn->plan.next && conn->duplex) { + /* If it closed, close duplex if not already */ + if (!conn->plan.next && conn->duplex && conn->duplex->plan.next) { set_current(conn->duplex); conn->duplex->plan = io_close(); backend_plan_changed(conn->duplex); diff --git a/ccan/io/io.h b/ccan/io/io.h index 0318aef3..df0764bd 100644 --- a/ccan/io/io.h +++ b/ccan/io/io.h @@ -431,6 +431,21 @@ struct io_conn *io_duplex_(struct io_conn *conn, struct io_plan plan); #define io_wake(conn, plan) (io_plan_no_debug(), io_wake_((conn), (plan))) void io_wake_(struct io_conn *conn, struct io_plan plan); +/** + * io_is_idle - is a connection idle? + * + * This can be useful for complex protocols, eg. where you want a connection + * to send something, so you queue it and wake it if it's idle. + * + * Example: + * struct io_conn *sleeper; + * sleeper = io_new_conn(open("/dev/null", O_RDONLY), io_idle()); + * + * assert(io_is_idle(sleeper)); + * io_wake(sleeper, io_write("junk", 4, io_close_cb, NULL)); + */ +bool io_is_idle(const struct io_conn *conn); + /** * io_break - return from io_loop() * @ret: non-NULL value to return from io_loop(). diff --git a/ccan/io/poll.c b/ccan/io/poll.c index 0078fc65..1b11badd 100644 --- a/ccan/io/poll.c +++ b/ccan/io/poll.c @@ -346,7 +346,11 @@ void *do_io_loop(struct io_conn **ready) * anything can change. */ if (doing_debug()) break; - if (!(events&(POLLIN|POLLOUT))) + + /* If no events, or it closed + * the duplex, continue. */ + if (!(events&(POLLIN|POLLOUT)) + || !c->plan.next) continue; } } diff --git a/ccan/io/test/run-06-idle.c b/ccan/io/test/run-06-idle.c index 51cca961..455b8608 100644 --- a/ccan/io/test/run-06-idle.c +++ b/ccan/io/test/run-06-idle.c @@ -29,6 +29,7 @@ static struct io_plan read_done(struct io_conn *conn, struct data *d) static void finish_waker(struct io_conn *conn, struct data *d) { + ok1(io_is_idle(idler)); io_wake(idler, io_read(d->buf, sizeof(d->buf), read_done, d)); ok1(d->state == 1); d->state++; @@ -102,7 +103,7 @@ int main(void) int fd, status; /* This is how many tests you plan to run */ - plan_tests(13); + plan_tests(14); d->state = 0; fd = make_listen_fd(PORT, &addrinfo); ok1(fd >= 0); diff --git a/ccan/io/test/run-14-duplex-both-read-DEBUG.c b/ccan/io/test/run-14-duplex-both-read-DEBUG.c new file mode 100644 index 00000000..5c4aae7c --- /dev/null +++ b/ccan/io/test/run-14-duplex-both-read-DEBUG.c @@ -0,0 +1,8 @@ +#define DEBUG +#define PORT "64014" +#define main real_main +int real_main(void); +#include "run-14-duplex-both-read.c" +#undef main +static bool always_debug(struct io_conn *conn) { return true; } +int main(void) { io_debug_conn = always_debug; return real_main(); } diff --git a/ccan/io/test/run-14-duplex-both-read.c b/ccan/io/test/run-14-duplex-both-read.c new file mode 100644 index 00000000..70bdec0a --- /dev/null +++ b/ccan/io/test/run-14-duplex-both-read.c @@ -0,0 +1,149 @@ +/* Check a bug where we have just completed a read, then set up a duplex + * which tries to do a read. */ +#include +/* Include the C files directly. */ +#include +#include +#include +#include +#include + +#ifndef PORT +#define PORT "65014" +#endif + +struct data { + struct io_listener *l; + int state; + struct io_conn *c1, *c2; + char buf[4]; + char wbuf[32]; +}; + +static void finish_ok(struct io_conn *conn, struct data *d) +{ + d->state++; +} + +static struct io_plan end(struct io_conn *conn, struct data *d) +{ + d->state++; + + /* last one out closes. */ + if (conn == d->c1 && io_is_idle(d->c2)) + return io_close(); + + /* last one out closes. */ + if (conn == d->c2 && io_is_idle(d->c1)) + return io_close(); + + return io_idle(); +} + +static struct io_plan make_duplex(struct io_conn *conn, struct data *d) +{ + /* Have duplex read the rest of the buffer. */ + d->c2 = io_duplex(conn, io_read(d->buf+1, sizeof(d->buf)-1, end, d)); + ok1(d->c2); + io_set_finish(d->c2, finish_ok, d); + + return io_write(d->wbuf, sizeof(d->wbuf), end, d); +} + +static void init_conn(int fd, struct data *d) +{ + ok1(d->state == 0); + d->state++; + + io_close_listener(d->l); + + memset(d->wbuf, 7, sizeof(d->wbuf)); + + d->c1 = io_new_conn(fd, io_read(d->buf, 1, make_duplex, d)); + io_set_finish(d->c1, finish_ok, d); +} + +static int make_listen_fd(const char *port, struct addrinfo **info) +{ + int fd, on = 1; + struct addrinfo *addrinfo, hints; + + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_PASSIVE; + hints.ai_protocol = 0; + + if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0) + return -1; + + fd = socket(addrinfo->ai_family, addrinfo->ai_socktype, + addrinfo->ai_protocol); + if (fd < 0) + return -1; + + setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); + if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) { + close(fd); + return -1; + } + if (listen(fd, 1) != 0) { + close(fd); + return -1; + } + *info = addrinfo; + return fd; +} + +int main(void) +{ + struct data *d = malloc(sizeof(*d)); + struct addrinfo *addrinfo; + int fd, status; + + /* This is how many tests you plan to run */ + plan_tests(10); + d->state = 0; + fd = make_listen_fd(PORT, &addrinfo); + ok1(fd >= 0); + d->l = io_new_listener(fd, init_conn, d); + ok1(d->l); + fflush(stdout); + if (!fork()) { + int i; + char buf[32]; + + io_close_listener(d->l); + free(d); + fd = socket(addrinfo->ai_family, addrinfo->ai_socktype, + addrinfo->ai_protocol); + if (fd < 0) + exit(1); + if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) + exit(2); + signal(SIGPIPE, SIG_IGN); + for (i = 0; i < strlen("hellothere"); i++) { + if (write(fd, "hellothere" + i, 1) != 1) + break; + } + for (i = 0; i < 32; i++) { + if (read(fd, buf+i, 1) != 1) + break; + } + close(fd); + freeaddrinfo(addrinfo); + exit(0); + } + freeaddrinfo(addrinfo); + ok1(io_loop() == NULL); + ok1(d->state == 5); + ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0); + free(d); + + ok1(wait(&status)); + ok1(WIFEXITED(status)); + ok1(WEXITSTATUS(status) == 0); + + /* This exits depending on whether all tests passed */ + return exit_status(); +}