From a2dffefa5ef8d0cf71d99755c4640a8004679b1d Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Mon, 14 Oct 2013 21:28:20 +1030 Subject: [PATCH] ccan/io: io_duplex. Cleaner model for I/O, with cost of complexity if you really want bidir. Signed-off-by: Rusty Russell --- ccan/io/SCENARIOS | 88 ++++++++++++++++++++++++++ ccan/io/backend.h | 14 +++-- ccan/io/io.c | 47 +++++++++----- ccan/io/io.h | 27 ++++++++ ccan/io/poll.c | 109 ++++++++++++++++++++++---------- ccan/io/test/run-12-bidir.c | 122 ++++++++++++++++++++++++++++++++++++ 6 files changed, 352 insertions(+), 55 deletions(-) create mode 100644 ccan/io/SCENARIOS create mode 100644 ccan/io/test/run-12-bidir.c diff --git a/ccan/io/SCENARIOS b/ccan/io/SCENARIOS new file mode 100644 index 00000000..01bf47ac --- /dev/null +++ b/ccan/io/SCENARIOS @@ -0,0 +1,88 @@ +Simple: + step1(conn): read(conn), then step2 + step2(conn): write(conn), then close + +Pass-through: + step1(conn): read(conn), then step2 + step2(conn): write(otherconn), then step1 + +Pass-through-and-connect: + step1(conn): read(conn), then step2 + step2(conn): connect(otherconn), then step3 + step3(conn): write(otherconn), then step1 + +Chatroom: + step1(conn): read(conn), then step2 + step2(conn): for c in allcons: write(c). goto step1 + +Simple: + +void event(struct io_event *done) +{ + char *buf = done->priv; + struct io_event *e; + + e = queue_read(done, done->conn, buf, 100); + e = queue_write(e, done->conn, buf, 100); + queue_close(e, done->conn); +} + +Pass-through: +struct passthru { + char buf[100]; + struct conn *rconn, *wconn; +}; + +void event(struct io_event *done) +{ + struct passthru *p = done->priv; + struct io_event *e; + + e = queue_read(done, p->rconn, p->buf, 100); + e = queue_write(e, p->wconn, buf, 100); + queue_event(e, event); +} + +Chatroom: +struct list_head clients; + +struct buffer { + char buf[100]; + unsigned int ref; +}; + +struct client { + struct list_node list; + struct connection *conn; + struct buffer *rbuf, *wbuf; +}; + +void broadcast(struct io_event *done) +{ + struct client *i, *c = done->conn->priv; + struct io_event *e; + + list_for_each(&clients, i, list) { + e = queue_write(done, i->conn, c->buf->buf, 100); + e->priv = c->buf; + c->buf->ref++; + queue_event(e, drop_ref); + } + + + +void event(struct io_event *done) +{ + struct client *c = done->conn->priv; + struct io_event *e; + + assert(c->conn == done->conn); + c->buf = malloc(sizeof(*c->buf)); + c->buf->ref = 0; + e = queue_read(done, c->conn, c->buf->buf, 100); + e = queue_event(e, broadcast); +} + + + step1(conn): read(conn), then step2 + step2(conn): for c in allcons: write(c). goto step1 diff --git a/ccan/io/backend.h b/ccan/io/backend.h index 06427eb6..2b41fa72 100644 --- a/ccan/io/backend.h +++ b/ccan/io/backend.h @@ -22,11 +22,15 @@ struct io_listener { }; enum io_state { - NEXT, /* eg starting, woken from idle, return from io_break. */ + /* These wait for something to input */ READ, - WRITE, READPART, + + /* These wait for room to output */ + WRITE, WRITEPART, + + NEXT, /* eg starting, woken from idle, return from io_break. */ IDLE, FINISHED, PROCESSING /* We expect them to change this now. */ @@ -61,6 +65,8 @@ struct io_state_writepart { struct io_conn { struct fd fd; + struct io_conn *duplex; + enum io_state state; union { struct io_state_read read; @@ -74,9 +80,9 @@ extern void *io_loop_return; bool add_listener(struct io_listener *l); bool add_conn(struct io_conn *c); +bool add_duplex(struct io_conn *c); void del_listener(struct io_listener *l); void backend_set_state(struct io_conn *conn, struct io_op *op); -struct io_op *do_writeable(struct io_conn *conn); -struct io_op *do_readable(struct io_conn *conn); +struct io_op *do_ready(struct io_conn *conn); #endif /* CCAN_IO_BACKEND_H */ diff --git a/ccan/io/io.c b/ccan/io/io.c index 325db787..6efc68ee 100644 --- a/ccan/io/io.c +++ b/ccan/io/io.c @@ -57,6 +57,7 @@ struct io_conn *io_new_conn_(int fd, conn->fd.finish = finish; conn->fd.finish_arg = conn->fd.next_arg = arg; conn->state = NEXT; + conn->duplex = NULL; if (!add_conn(conn)) { free(conn); return NULL; @@ -64,6 +65,34 @@ struct io_conn *io_new_conn_(int fd, return conn; } +struct io_conn *io_duplex_(struct io_conn *old, + struct io_op *(*start)(struct io_conn *, void *), + void (*finish)(struct io_conn *, void *), + void *arg) +{ + struct io_conn *conn; + + assert(!old->duplex); + + conn = malloc(sizeof(*conn)); + if (!conn) + return NULL; + + conn->fd.listener = false; + conn->fd.fd = old->fd.fd; + conn->fd.next = start; + conn->fd.finish = finish; + conn->fd.finish_arg = conn->fd.next_arg = arg; + conn->state = NEXT; + conn->duplex = old; + if (!add_duplex(conn)) { + free(conn); + return NULL; + } + old->duplex = conn; + return conn; +} + /* Convenient token which only we can produce. */ static inline struct io_next *to_ionext(struct io_conn *conn) { @@ -149,7 +178,7 @@ static struct io_op *do_next(struct io_conn *conn) return conn->fd.next(conn, conn->fd.next_arg); } -struct io_op *do_writeable(struct io_conn *conn) +struct io_op *do_ready(struct io_conn *conn) { ssize_t ret; bool finished; @@ -171,22 +200,6 @@ struct io_op *do_writeable(struct io_conn *conn) *conn->u.writepart.lenp = ret; finished = true; break; - default: - /* Shouldn't happen. */ - abort(); - } - - if (finished) - return do_next(conn); - return to_ioop(conn->state); -} - -struct io_op *do_readable(struct io_conn *conn) -{ - ssize_t ret; - bool finished; - - switch (conn->state) { case READ: ret = read(conn->fd.fd, conn->u.read.buf, conn->u.read.len); if (ret <= 0) diff --git a/ccan/io/io.h b/ccan/io/io.h index 49b6a25e..5ca9731d 100644 --- a/ccan/io/io.h +++ b/ccan/io/io.h @@ -151,6 +151,33 @@ struct io_op *io_write_partial(const void *data, size_t *len, */ struct io_op *io_idle(struct io_conn *conn); +/** + * io_duplex - split an fd into two connections. + * @conn: a connection. + * @start: the first function to call. + * @finish: the function to call when it's closed or fails. + * @arg: the argument to both @start and @finish. + * + * Sometimes you want to be able to simultaneously read and write on a + * single fd, but io forces a linear call sequence. The solition is + * to have two connections for the same fd, and use one for read + * operations and one for write. + * + * You must io_close() both of them to close the fd. + */ +#define io_duplex(conn, start, finish, arg) \ + io_duplex_((conn), \ + typesafe_cb_preargs(struct io_op *, void *, \ + (start), (arg), struct io_conn *), \ + typesafe_cb_preargs(void, void *, (finish), (arg), \ + struct io_conn *), \ + (arg)) + +struct io_conn *io_duplex_(struct io_conn *conn, + struct io_op *(*start)(struct io_conn *, void *), + void (*finish)(struct io_conn *, void *), + void *arg); + /** * io_wake - wake up and idle connection. * @conn: an idle connection. diff --git a/ccan/io/poll.c b/ccan/io/poll.c index 070f6d84..fdff271c 100644 --- a/ccan/io/poll.c +++ b/ccan/io/poll.c @@ -76,11 +76,23 @@ bool add_conn(struct io_conn *c) return true; } +bool add_duplex(struct io_conn *c) +{ + c->fd.backend_info = c->duplex->fd.backend_info; + num_next++; + return true; +} + static void del_conn(struct io_conn *conn) { if (conn->fd.finish) conn->fd.finish(conn, conn->fd.finish_arg); - del_fd(&conn->fd); + if (conn->duplex) { + /* In case fds[] pointed to the other one. */ + fds[conn->fd.backend_info] = &conn->duplex->fd; + conn->duplex->duplex = NULL; + } else + del_fd(&conn->fd); if (conn->state == FINISHED) num_finished--; else if (conn->state == NEXT) @@ -92,32 +104,38 @@ void del_listener(struct io_listener *l) del_fd(&l->fd); } -void backend_set_state(struct io_conn *conn, struct io_op *op) +static int pollmask(enum io_state state) { - enum io_state state = from_ioop(op); - struct pollfd *pfd = &pollfds[conn->fd.backend_info]; - switch (state) { case READ: case READPART: - pfd->events = POLLIN; - break; + return POLLIN; case WRITE: case WRITEPART: - pfd->events = POLLOUT; - break; - case IDLE: - pfd->events = 0; - break; - case NEXT: - num_next++; - break; - case FINISHED: - num_finished++; - break; + return POLLOUT; default: - abort(); + return 0; } +} + +void backend_set_state(struct io_conn *conn, struct io_op *op) +{ + enum io_state state = from_ioop(op); + struct pollfd *pfd = &pollfds[conn->fd.backend_info]; + + pfd->events = pollmask(state); + if (conn->duplex) { + int mask = pollmask(conn->duplex->state); + /* You can't *both* read/write. */ + assert(!mask || pfd->events != mask); + pfd->events |= mask; + } + + if (state == NEXT) + num_next++; + else if (state == FINISHED) + num_finished++; + conn->state = state; } @@ -142,7 +160,7 @@ static void finish_and_next(bool finished_only) unsigned int i; for (i = 0; !io_loop_return && i < num_fds; i++) { - struct io_conn *c; + struct io_conn *c, *duplex; if (!num_finished) { if (finished_only || num_next == 0) @@ -151,17 +169,26 @@ static void finish_and_next(bool finished_only) if (fds[i]->listener) continue; c = (void *)fds[i]; - if (c->state == FINISHED) { - del_conn(c); - free(c); - i--; - } else if (!finished_only && c->state == NEXT) { - backend_set_state(c, c->fd.next(c, c->fd.next_arg)); - num_next--; + for (duplex = c->duplex; c; c = duplex, duplex = NULL) { + if (c->state == FINISHED) { + del_conn(c); + free(c); + i--; + } else if (!finished_only && c->state == NEXT) { + backend_set_state(c, + c->fd.next(c, + c->fd.next_arg)); + num_next--; + } } } } +static void ready(struct io_conn *c) +{ + backend_set_state(c, do_ready(c)); +} + /* This is the main loop. */ void *io_loop(void) { @@ -185,16 +212,30 @@ void *io_loop(void) for (i = 0; i < num_fds && !io_loop_return; i++) { struct io_conn *c = (void *)fds[i]; - if (pollfds[i].revents & POLLOUT) - backend_set_state(c, do_writeable(c)); - else if (pollfds[i].revents & POLLIN) { - if (fds[i]->listener) + int events = pollfds[i].revents; + + if (fds[i]->listener) { + if (events & POLLIN) accept_conn((void *)c); - else - backend_set_state(c, do_readable(c)); - } else if (pollfds[i].revents & POLLHUP) { + } else if (events & (POLLIN|POLLOUT)) { + if (c->duplex) { + int mask = pollmask(c->duplex->state); + if (events & mask) { + ready(c->duplex); + events &= ~mask; + if (!(events&(POLLIN|POLLOUT))) + continue; + } + } + ready(c); + } else if (events & POLLHUP) { backend_set_state(c, io_close(c, NULL)); + if (c->duplex) + backend_set_state(c->duplex, + io_close(c->duplex, + NULL)); } + } } diff --git a/ccan/io/test/run-12-bidir.c b/ccan/io/test/run-12-bidir.c new file mode 100644 index 00000000..f9cf4e59 --- /dev/null +++ b/ccan/io/test/run-12-bidir.c @@ -0,0 +1,122 @@ +#include +/* Include the C files directly. */ +#include +#include +#include +#include +#include + +struct data { + struct io_listener *l; + int state; + char buf[4]; + char wbuf[32]; +}; + +static void finish_ok(struct io_conn *conn, struct data *d) +{ + d->state++; +} + +static struct io_op *write_out(struct io_conn *conn, struct data *d) +{ + d->state++; + return io_write(d->wbuf, sizeof(d->wbuf), io_next(conn, io_close, d)); +} + +static struct io_op *start_ok(struct io_conn *conn, struct data *d) +{ + ok1(d->state == 0); + d->state++; + + io_close_listener(d->l); + + memset(d->wbuf, 7, sizeof(d->wbuf)); + ok1(io_duplex(conn, write_out, finish_ok, d)); + return io_read(d->buf, sizeof(d->buf), io_next(conn, io_close, d)); +} + +static int make_listen_fd(const char *port, struct addrinfo **info) +{ + int fd, on = 1; + struct addrinfo *addrinfo, hints; + + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_PASSIVE; + hints.ai_protocol = 0; + + if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0) + return -1; + + fd = socket(addrinfo->ai_family, addrinfo->ai_socktype, + addrinfo->ai_protocol); + if (fd < 0) + return -1; + + setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); + if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) { + close(fd); + return -1; + } + if (listen(fd, 1) != 0) { + close(fd); + return -1; + } + *info = addrinfo; + return fd; +} + +int main(void) +{ + struct data *d = malloc(sizeof(*d)); + struct addrinfo *addrinfo; + int fd, status; + + /* This is how many tests you plan to run */ + plan_tests(10); + d->state = 0; + fd = make_listen_fd("65012", &addrinfo); + ok1(fd >= 0); + d->l = io_new_listener(fd, start_ok, finish_ok, d); + ok1(d->l); + fflush(stdout); + if (!fork()) { + int i; + char buf[32]; + + io_close_listener(d->l); + free(d); + fd = socket(addrinfo->ai_family, addrinfo->ai_socktype, + addrinfo->ai_protocol); + if (fd < 0) + exit(1); + if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) + exit(2); + signal(SIGPIPE, SIG_IGN); + for (i = 0; i < 32; i++) { + if (read(fd, buf+i, 1) != 1) + break; + } + for (i = 0; i < strlen("hellothere"); i++) { + if (write(fd, "hellothere" + i, 1) != 1) + break; + } + close(fd); + freeaddrinfo(addrinfo); + exit(0); + } + freeaddrinfo(addrinfo); + ok1(io_loop() == NULL); + ok1(d->state == 4); + ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0); + free(d); + + ok1(wait(&status)); + ok1(WIFEXITED(status)); + ok1(WEXITSTATUS(status) == 0); + + /* This exits depending on whether all tests passed */ + return exit_status(); +} -- 2.39.2