From: Rusty Russell Date: Mon, 4 Aug 2014 08:12:21 +0000 (+0930) Subject: ccan/io: duplex support. X-Git-Url: http://git.ozlabs.org/?p=ccan;a=commitdiff_plain;h=e92e2f65a9ae729a0c9805427da737ad936abddb ccan/io: duplex support. This is actually pretty simple now. Signed-off-by: Rusty Russell --- diff --git a/ccan/io/io.c b/ccan/io/io.c index 2832c513..23e80704 100644 --- a/ccan/io/io.c +++ b/ccan/io/io.c @@ -56,9 +56,10 @@ static void next_plan(struct io_conn *conn, struct io_plan *plan) plan = next(conn, plan->next_arg); - /* It should have set a plan inside this conn. */ + /* It should have set a plan inside this conn (or duplex) */ assert(plan == &conn->plan[IO_IN] - || plan == &conn->plan[IO_OUT]); + || plan == &conn->plan[IO_OUT] + || plan == &conn->plan[2]); assert(conn->plan[IO_IN].status != IO_UNSET || conn->plan[IO_OUT].status != IO_UNSET); @@ -428,7 +429,7 @@ void io_break(const void *ret) io_loop_return = (void *)ret; } -struct io_plan *io_never(struct io_conn *conn) +struct io_plan *io_never(struct io_conn *conn, void *unused) { return io_always(conn, io_never_called, NULL); } @@ -437,3 +438,11 @@ int io_conn_fd(const struct io_conn *conn) { return conn->fd.fd; } + +struct io_plan *io_duplex(struct io_plan *in_plan, struct io_plan *out_plan) +{ + /* in_plan must be conn->plan[IO_IN], out_plan must be [IO_OUT] */ + assert(out_plan == in_plan + 1); + + return out_plan + 1; +} diff --git a/ccan/io/io.h b/ccan/io/io.h index 00b9b33d..d4ba2d87 100644 --- a/ccan/io/io.h +++ b/ccan/io/io.h @@ -403,6 +403,38 @@ struct io_plan *io_connect_(struct io_conn *conn, const struct addrinfo *addr, struct io_plan *(*next)(struct io_conn *, void *), void *arg); +/** + * io_duplex - set plans for both input and output. + * @conn: the connection that plan is for. + * @in: the input plan + * @out: the output plan + * + * Most plans are either for input or output; io_duplex creates a plan + * which does both. This is often used in the init function to create + * two independent streams, though it can be used once on any connection. + * + * Note that if either plan closes the connection, it will be closed. + * + * Note that if one plan is io_wait or io_always, that causes a problem: + * they look at the input and output plan slots to figure out which to + * use, but if the other plan hasn't been evaluated yet, that will fail. + * In this case, you'll need to ensure the other plan is evaluated first, + * eg. "struct io_plan *r = io_read(...); return io_duplex(r, io_always(...))" + * + * Example: + * struct buf { + * char in[100]; + * char out[100]; + * }; + * + * static struct io_plan *read_and_write(struct io_conn *conn, struct buf *b) + * { + * return io_duplex(io_read(conn, b->in, sizeof(b->in), io_close_cb, b), + * io_write(conn, b->out, sizeof(b->out), io_close_cb, b)); + * } + */ +struct io_plan *io_duplex(struct io_plan *in_plan, struct io_plan *out_plan); + /** * io_wait - leave a plan idle until something wakes us. * @conn: the connection that plan is for. @@ -471,6 +503,7 @@ void io_break(const void *ret); /** * io_never - assert if callback is called. * @conn: the connection that plan is for. + * @unused: an unused parameter to make this suitable for use as a callback. * * Sometimes you want to make it clear that a callback should never happen * (eg. for io_break). This will assert() if called. @@ -480,10 +513,10 @@ void io_break(const void *ret); * { * io_break(conn); * // We won't ever return from io_break - * return io_never(conn); + * return io_never(conn, NULL); * } */ -struct io_plan *io_never(struct io_conn *conn); +struct io_plan *io_never(struct io_conn *conn, void *unused); /* FIXME: io_recvfrom/io_sendto */ diff --git a/ccan/io/test/run-12-bidir.c b/ccan/io/test/run-12-bidir.c index 533f465d..0329b81f 100644 --- a/ccan/io/test/run-12-bidir.c +++ b/ccan/io/test/run-12-bidir.c @@ -6,7 +6,6 @@ #include #include -#if 0 #ifndef PORT #define PORT "65012" #endif @@ -14,6 +13,7 @@ struct data { struct io_listener *l; int state; + int done; char buf[4]; char wbuf[32]; }; @@ -23,28 +23,27 @@ static void finish_ok(struct io_conn *conn, struct data *d) d->state++; } -static struct io_plan *write_done(struct io_conn *conn, struct data *d) +static struct io_plan *rw_done(struct io_conn *conn, struct data *d) { d->state++; - return io_close(conn); + d->done++; + if (d->done == 2) + return io_close(conn); + return io_wait(conn, NULL, io_never, NULL); } -static void init_conn(int fd, struct data *d) +static struct io_plan *init_conn(struct io_conn *conn, struct data *d) { - struct io_conn *conn; - ok1(d->state == 0); d->state++; io_close_listener(d->l); memset(d->wbuf, 7, sizeof(d->wbuf)); - - conn = io_new_conn(fd, io_read(d->buf, sizeof(d->buf), io_close_cb, d)); - io_set_finish(conn, finish_ok, d); - conn = io_duplex(conn, io_write(d->wbuf, sizeof(d->wbuf), write_done, d)); - ok1(conn); io_set_finish(conn, finish_ok, d); + + return io_duplex(io_read(conn, d->buf, sizeof(d->buf), rw_done, d), + io_write(conn, d->wbuf, sizeof(d->wbuf), rw_done, d)); } static int make_listen_fd(const char *port, struct addrinfo **info) @@ -88,9 +87,10 @@ int main(void) /* This is how many tests you plan to run */ plan_tests(10); d->state = 0; + d->done = 0; fd = make_listen_fd(PORT, &addrinfo); ok1(fd >= 0); - d->l = io_new_listener(fd, init_conn, d); + d->l = io_new_listener(NULL, fd, init_conn, d); ok1(d->l); fflush(stdout); if (!fork()) { @@ -121,6 +121,7 @@ int main(void) freeaddrinfo(addrinfo); ok1(io_loop() == NULL); ok1(d->state == 4); + ok1(d->done == 2); ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0); free(d); @@ -131,9 +132,3 @@ int main(void) /* This exits depending on whether all tests passed */ return exit_status(); } -#else -int main(void) -{ - return 0; -} -#endif diff --git a/ccan/io/test/run-14-duplex-both-read.c b/ccan/io/test/run-14-duplex-both-read.c index b4a14c3d..b52553aa 100644 --- a/ccan/io/test/run-14-duplex-both-read.c +++ b/ccan/io/test/run-14-duplex-both-read.c @@ -8,7 +8,6 @@ #include #include -#if 0 #ifndef PORT #define PORT "65014" #endif @@ -16,7 +15,6 @@ struct data { struct io_listener *l; int state; - struct io_conn *c1, *c2; char buf[4]; char wbuf[32]; }; @@ -26,23 +24,24 @@ static void finish_ok(struct io_conn *conn, struct data *d) d->state++; } -static struct io_plan end(struct io_conn *conn, struct data *d) +static struct io_plan *end(struct io_conn *conn, struct data *d) { d->state++; - return io_close(); + if (d->state == 4) + return io_close(conn); + else + return io_wait(conn, NULL, io_never, NULL); } -static struct io_plan make_duplex(struct io_conn *conn, struct data *d) +static struct io_plan *make_duplex(struct io_conn *conn, struct data *d) { + d->state++; /* Have duplex read the rest of the buffer. */ - d->c2 = io_duplex(conn, io_read(d->buf+1, sizeof(d->buf)-1, end, d)); - ok1(d->c2); - io_set_finish(d->c2, finish_ok, d); - - return io_write(d->wbuf, sizeof(d->wbuf), end, d); + return io_duplex(io_read(conn, d->buf+1, sizeof(d->buf)-1, end, d), + io_write(conn, d->wbuf, sizeof(d->wbuf), end, d)); } -static void init_conn(int fd, struct data *d) +static struct io_plan *init_conn(struct io_conn *conn, struct data *d) { ok1(d->state == 0); d->state++; @@ -50,9 +49,8 @@ static void init_conn(int fd, struct data *d) io_close_listener(d->l); memset(d->wbuf, 7, sizeof(d->wbuf)); - - d->c1 = io_new_conn(fd, io_read(d->buf, 1, make_duplex, d)); - io_set_finish(d->c1, finish_ok, d); + io_set_finish(conn, finish_ok, d); + return io_read(conn, d->buf, 1, make_duplex, d); } static int make_listen_fd(const char *port, struct addrinfo **info) @@ -94,11 +92,11 @@ int main(void) int fd, status; /* This is how many tests you plan to run */ - plan_tests(10); + plan_tests(9); d->state = 0; fd = make_listen_fd(PORT, &addrinfo); ok1(fd >= 0); - d->l = io_new_listener(fd, init_conn, d); + d->l = io_new_listener(NULL, fd, init_conn, d); ok1(d->l); fflush(stdout); if (!fork()) { @@ -139,9 +137,3 @@ int main(void) /* This exits depending on whether all tests passed */ return exit_status(); } -#else -int main(void) -{ - return 0; -} -#endif diff --git a/ccan/io/test/run-16-duplex-test.c b/ccan/io/test/run-16-duplex-test.c index e31c5c5b..9d939c58 100644 --- a/ccan/io/test/run-16-duplex-test.c +++ b/ccan/io/test/run-16-duplex-test.c @@ -8,14 +8,12 @@ #include #include -#if 0 #ifndef PORT #define PORT "65016" #endif struct data { struct io_listener *l; - struct io_conn *writer; int state; char buf[4]; char wbuf[32]; @@ -26,35 +24,27 @@ static void finish_ok(struct io_conn *conn, struct data *d) d->state++; } -static struct io_plan write_done(struct io_conn *conn, struct data *d) +static struct io_plan *io_done(struct io_conn *conn, struct data *d) { d->state++; - return io_wait(d, io_close_cb, NULL); + if (d->state == 3) + return io_close(conn); + return io_wait(conn, d, io_close_cb, NULL); } -static struct io_plan read_done(struct io_conn *conn, struct data *d) +static struct io_plan *init_conn(struct io_conn *conn, struct data *d) { - d->state++; - io_close_other(d->writer); - return io_close(); -} - -static void init_conn(int fd, struct data *d) -{ - struct io_conn *conn; - ok1(d->state == 0); d->state++; memset(d->wbuf, 7, sizeof(d->wbuf)); - conn = io_new_conn(fd, io_read(d->buf, sizeof(d->buf), read_done, d)); io_set_finish(conn, finish_ok, d); - d->writer = io_duplex(conn, io_write(d->wbuf, sizeof(d->wbuf), write_done, d)); - ok1(d->writer); - io_set_finish(d->writer, finish_ok, d); io_close_listener(d->l); + + return io_duplex(io_read(conn, d->buf, sizeof(d->buf), io_done, d), + io_write(conn, d->wbuf, sizeof(d->wbuf), io_done, d)); } static int make_listen_fd(const char *port, struct addrinfo **info) @@ -96,11 +86,11 @@ int main(void) int fd, status; /* This is how many tests you plan to run */ - plan_tests(10); + plan_tests(9); d->state = 0; fd = make_listen_fd(PORT, &addrinfo); ok1(fd >= 0); - d->l = io_new_listener(fd, init_conn, d); + d->l = io_new_listener(NULL, fd, init_conn, d); ok1(d->l); fflush(stdout); if (!fork()) { @@ -130,7 +120,7 @@ int main(void) } freeaddrinfo(addrinfo); ok1(io_loop() == NULL); - ok1(d->state == 5); + ok1(d->state == 4); ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0); free(d); @@ -141,9 +131,3 @@ int main(void) /* This exits depending on whether all tests passed */ return exit_status(); } -#else -int main(void) -{ - return 0; -} -#endif