From: Rusty Russell Date: Mon, 14 Oct 2013 10:04:07 +0000 (+1030) Subject: ccan/io: new module. X-Git-Url: http://git.ozlabs.org/?p=ccan;a=commitdiff_plain;h=0a2fd289c7bf57d9fc35ad6af36df4bcc694f361;ds=sidebyside ccan/io: new module. Designed for async I/O. Signed-off-by: Rusty Russell --- diff --git a/Makefile-ccan b/Makefile-ccan index 86d6ac0c..77816577 100644 --- a/Makefile-ccan +++ b/Makefile-ccan @@ -55,6 +55,7 @@ MODS_WITH_SRC := antithread \ htable \ idtree \ ilog \ + io \ isaac \ iscsi \ jmap \ diff --git a/ccan/io/LICENSE b/ccan/io/LICENSE new file mode 120000 index 00000000..2354d129 --- /dev/null +++ b/ccan/io/LICENSE @@ -0,0 +1 @@ +../../licenses/BSD-MIT \ No newline at end of file diff --git a/ccan/io/_info b/ccan/io/_info new file mode 100644 index 00000000..f494c124 --- /dev/null +++ b/ccan/io/_info @@ -0,0 +1,174 @@ +#include +#include +#include "config.h" + +/** + * io - simple library for stateful io handling. + * + * io provides a simple mechanism to write I/O servers with multiple + * connections. Handling of connections is multiplexed, and function + * indicate what they want written or read, and what follow-on + * function to call on success (or failure). + * + * Example: + * // Given tr A-Z a-z outputs tr a-z a-z + * #include + * #include + * #include + * #include + * #include + * #include + * #include + * + * struct buffer { + * size_t max, off, rlen; + * char *buf; + * }; + * + * struct stdin_buffer { + * struct io_conn *reader, *writer; + * size_t len; + * char inbuf[4096]; + * }; + * + * // This reads from stdin. + * static struct io_op *wake_writer(struct io_conn *, struct stdin_buffer *); + * // This writes the stdin buffer to the child. + * static struct io_op *write_to_child(struct io_conn *c, + * struct stdin_buffer *b); + * static struct io_op *read_stdin(struct io_conn *c, struct stdin_buffer *b) + * { + * assert(c == b->reader); + * b->len = sizeof(b->inbuf); + * return io_read_partial(b->inbuf, &b->len, + * io_next(c, wake_writer, b)); + * } + * + * static struct io_op *wake_writer(struct io_conn *c, struct stdin_buffer *b) + * { + * assert(c == b->reader); + * io_wake(b->writer, write_to_child, b); + * return io_idle(c); + * } + * + * static void reader_exit(struct io_conn *c, struct stdin_buffer *b) + * { + * assert(c == b->reader); + * io_wake(b->writer, write_to_child, b); + * b->reader = NULL; + * } + * + * static struct io_op *wake_reader(struct io_conn *c, struct stdin_buffer *b) + * { + * assert(c == b->writer); + * io_wake(b->reader, read_stdin, b); + * return io_idle(c); + * } + * + * static struct io_op *write_to_child(struct io_conn *conn, + * struct stdin_buffer *b) + * { + * assert(conn == b->writer); + * if (!b->reader) + * return io_close(conn, NULL); + * return io_write(b->inbuf, b->len, io_next(conn, wake_reader, b)); + * } + * + * static struct io_op *start_writer(struct io_conn *conn, + * struct stdin_buffer *b) + * { + * assert(conn == b->writer); + * return io_idle(conn); + * } + * + * static void fail_child_write(struct io_conn *conn, struct stdin_buffer *b) + * { + * if (b->reader) + * err(1, "Failed writing to child."); + * } + * + * // This reads from the child and saves it into buffer. + * static struct io_op *read_from_child(struct io_conn *conn, + * struct buffer *b) + * { + * b->off += b->rlen; + * + * if (b->off == b->max) { + * if (b->max == 0) + * b->max = 128; + * else if (b->max >= 1024*1024) + * b->max += 1024*1024; + * else + * b->max *= 2; + * b->buf = realloc(b->buf, b->max); + * } + * + * b->rlen = b->max - b->off; + * return io_read_partial(b->buf + b->off, &b->rlen, + * io_next(conn, read_from_child, b)); + * } + * + * // Feed a program our stdin, gather its stdout, print that at end. + * int main(int argc, char *argv[]) + * { + * int tochild[2], fromchild[2]; + * struct buffer out = { 0, 0, 0, NULL }; + * struct stdin_buffer sbuf; + * int status; + * size_t off; + * ssize_t ret; + * + * if (argc == 1) + * errx(1, "Usage: runner ..."); + * + * if (pipe(tochild) != 0 || pipe(fromchild) != 0) + * err(1, "Creating pipes"); + * + * if (!fork()) { + * // Child runs command. + * close(tochild[1]); + * close(fromchild[0]); + * + * dup2(tochild[0], STDIN_FILENO); + * dup2(fromchild[1], STDOUT_FILENO); + * execvp(argv[1], argv + 1); + * exit(127); + * } + * + * close(tochild[0]); + * close(fromchild[1]); + * signal(SIGPIPE, SIG_IGN); + * + * sbuf.reader = io_new_conn(STDIN_FILENO, read_stdin, reader_exit, &sbuf); + * sbuf.writer = io_new_conn(tochild[1], start_writer, fail_child_write, + * &sbuf); + * if (!sbuf.reader || !sbuf.writer + * || !io_new_conn(fromchild[0], read_from_child, NULL, &out)) + * err(1, "Allocating connections"); + * + * io_loop(); + * wait(&status); + * + * for (off = 0; off < out.off; off += ret) { + * ret = write(STDOUT_FILENO, out.buf+off, out.off-off); + * if (ret < 0) + * err(1, "Writing stdout"); + * } + * free(out.buf); + * + * return WIFEXITED(status) ? WEXITSTATUS(status) : 2; + * } + * + * License: BSD-MIT + */ +int main(int argc, char *argv[]) +{ + if (argc != 2) + return 1; + + if (strcmp(argv[1], "depends") == 0) { + return 0; + } + + return 1; +} diff --git a/ccan/io/backend.h b/ccan/io/backend.h new file mode 100644 index 00000000..06427eb6 --- /dev/null +++ b/ccan/io/backend.h @@ -0,0 +1,82 @@ +/* Licensed under BSD-MIT - see LICENSE file for details */ +#ifndef CCAN_IO_BACKEND_H +#define CCAN_IO_BACKEND_H +#include + +struct fd { + int fd; + bool listener; + size_t backend_info; + + struct io_op *(*next)(struct io_conn *, void *arg); + void *next_arg; + + void (*finish)(struct io_conn *, void *arg); + void *finish_arg; +}; + + +/* Listeners create connections. */ +struct io_listener { + struct fd fd; +}; + +enum io_state { + NEXT, /* eg starting, woken from idle, return from io_break. */ + READ, + WRITE, + READPART, + WRITEPART, + IDLE, + FINISHED, + PROCESSING /* We expect them to change this now. */ +}; + +static inline enum io_state from_ioop(struct io_op *op) +{ + return (enum io_state)(long)op; +} + +struct io_state_read { + char *buf; + size_t len; +}; + +struct io_state_write { + const char *buf; + size_t len; +}; + +struct io_state_readpart { + char *buf; + size_t *lenp; +}; + +struct io_state_writepart { + const char *buf; + size_t *lenp; +}; + +/* One connection per client. */ +struct io_conn { + struct fd fd; + + enum io_state state; + union { + struct io_state_read read; + struct io_state_write write; + struct io_state_readpart readpart; + struct io_state_writepart writepart; + } u; +}; + +extern void *io_loop_return; + +bool add_listener(struct io_listener *l); +bool add_conn(struct io_conn *c); +void del_listener(struct io_listener *l); +void backend_set_state(struct io_conn *conn, struct io_op *op); + +struct io_op *do_writeable(struct io_conn *conn); +struct io_op *do_readable(struct io_conn *conn); +#endif /* CCAN_IO_BACKEND_H */ diff --git a/ccan/io/io.c b/ccan/io/io.c new file mode 100644 index 00000000..325db787 --- /dev/null +++ b/ccan/io/io.c @@ -0,0 +1,229 @@ +/* Licensed under BSD-MIT - see LICENSE file for details */ +#include "io.h" +#include "backend.h" +#include +#include +#include +#include +#include +#include +#include + +void *io_loop_return; + +struct io_listener *io_new_listener_(int fd, + struct io_op *(*start)(struct io_conn *, + void *arg), + void (*finish)(struct io_conn *, void *), + void *arg) +{ + struct io_listener *l = malloc(sizeof(*l)); + + if (!l) + return NULL; + + l->fd.listener = true; + l->fd.fd = fd; + l->fd.next = start; + l->fd.finish = finish; + l->fd.finish_arg = l->fd.next_arg = arg; + if (!add_listener(l)) { + free(l); + return NULL; + } + return l; +} + +void io_close_listener(struct io_listener *l) +{ + close(l->fd.fd); + del_listener(l); + free(l); +} + +struct io_conn *io_new_conn_(int fd, + struct io_op *(*start)(struct io_conn *, void *), + void (*finish)(struct io_conn *, void *), + void *arg) +{ + struct io_conn *conn = malloc(sizeof(*conn)); + + if (!conn) + return NULL; + + conn->fd.listener = false; + conn->fd.fd = fd; + conn->fd.next = start; + conn->fd.finish = finish; + conn->fd.finish_arg = conn->fd.next_arg = arg; + conn->state = NEXT; + if (!add_conn(conn)) { + free(conn); + return NULL; + } + return conn; +} + +/* Convenient token which only we can produce. */ +static inline struct io_next *to_ionext(struct io_conn *conn) +{ + return (struct io_next *)conn; +} + +static inline struct io_op *to_ioop(enum io_state state) +{ + return (struct io_op *)(long)state; +} + +static inline struct io_conn *from_ionext(struct io_next *next) +{ + return (struct io_conn *)next; +} + +struct io_next *io_next_(struct io_conn *conn, + struct io_op *(*next)(struct io_conn *, void *), + void *arg) +{ + conn->fd.next = next; + conn->fd.next_arg = arg; + + return to_ionext(conn); +} + +/* Queue some data to be written. */ +struct io_op *io_write(const void *data, size_t len, struct io_next *next) +{ + struct io_conn *conn = from_ionext(next); + conn->u.write.buf = data; + conn->u.write.len = len; + return to_ioop(WRITE); +} + +/* Queue a request to read into a buffer. */ +struct io_op *io_read(void *data, size_t len, struct io_next *next) +{ + struct io_conn *conn = from_ionext(next); + conn->u.read.buf = data; + conn->u.read.len = len; + return to_ioop(READ); +} + +/* Queue a partial request to read into a buffer. */ +struct io_op *io_read_partial(void *data, size_t *len, struct io_next *next) +{ + struct io_conn *conn = from_ionext(next); + conn->u.readpart.buf = data; + conn->u.readpart.lenp = len; + return to_ioop(READPART); +} + +/* Queue a partial write request. */ +struct io_op *io_write_partial(const void *data, size_t *len, struct io_next *next) +{ + struct io_conn *conn = from_ionext(next); + conn->u.writepart.buf = data; + conn->u.writepart.lenp = len; + return to_ioop(WRITEPART); +} + +struct io_op *io_idle(struct io_conn *conn) +{ + return to_ioop(IDLE); +} + +void io_wake_(struct io_conn *conn, + struct io_op *(*next)(struct io_conn *, void *), void *arg) + +{ + /* It might have finished, but we haven't called its finish() yet. */ + if (conn->state == FINISHED) + return; + assert(conn->state == IDLE); + conn->fd.next = next; + conn->fd.next_arg = arg; + backend_set_state(conn, to_ioop(NEXT)); +} + +static struct io_op *do_next(struct io_conn *conn) +{ + return conn->fd.next(conn, conn->fd.next_arg); +} + +struct io_op *do_writeable(struct io_conn *conn) +{ + ssize_t ret; + bool finished; + + switch (conn->state) { + case WRITE: + ret = write(conn->fd.fd, conn->u.write.buf, conn->u.write.len); + if (ret < 0) + return io_close(conn, NULL); + conn->u.write.buf += ret; + conn->u.write.len -= ret; + finished = (conn->u.write.len == 0); + break; + case WRITEPART: + ret = write(conn->fd.fd, conn->u.writepart.buf, + *conn->u.writepart.lenp); + if (ret < 0) + return io_close(conn, NULL); + *conn->u.writepart.lenp = ret; + finished = true; + break; + default: + /* Shouldn't happen. */ + abort(); + } + + if (finished) + return do_next(conn); + return to_ioop(conn->state); +} + +struct io_op *do_readable(struct io_conn *conn) +{ + ssize_t ret; + bool finished; + + switch (conn->state) { + case READ: + ret = read(conn->fd.fd, conn->u.read.buf, conn->u.read.len); + if (ret <= 0) + return io_close(conn, NULL); + conn->u.read.buf += ret; + conn->u.read.len -= ret; + finished = (conn->u.read.len == 0); + break; + case READPART: + ret = read(conn->fd.fd, conn->u.readpart.buf, + *conn->u.readpart.lenp); + if (ret <= 0) + return io_close(conn, NULL); + *conn->u.readpart.lenp = ret; + finished = true; + break; + default: + /* Shouldn't happen. */ + abort(); + } + + if (finished) + return do_next(conn); + return to_ioop(conn->state); +} + +/* Useful next functions. */ +/* Close the connection, we're done. */ +struct io_op *io_close(struct io_conn *conn, void *arg) +{ + return to_ioop(FINISHED); +} + +/* Exit the loop, returning this (non-NULL) arg. */ +struct io_op *io_break(void *arg, struct io_next *next) +{ + io_loop_return = arg; + + return to_ioop(NEXT); +} diff --git a/ccan/io/io.h b/ccan/io/io.h new file mode 100644 index 00000000..49b6a25e --- /dev/null +++ b/ccan/io/io.h @@ -0,0 +1,229 @@ +/* Licensed under BSD-MIT - see LICENSE file for details */ +#ifndef CCAN_IO_H +#define CCAN_IO_H +#include +#include +#include + +/** + * struct io_op - pointer to return from io functions. + * + * This undefined structure is just to help the compiler check that you + * really do return the result of an io-queueing method. + */ +struct io_op; + +/** + * struct io_next - pointer to what we're going to do next. + * + * Bundles up callbacks, generated by io_next(). + */ +struct io_next; + +/** + * io_new_conn - create a new connection. + * @fd: the file descriptor. + * @start: the first function to call. + * @finish: the function to call when it's closed or fails. + * @arg: the argument to both @start and @finish. + * + * This creates a connection which owns @fd. @start will be called on the + * next return to io_loop(), and @finish will be called when an I/O operation + * fails, or you call io_close() on the connection. + * + * The @start function must call one of the io queueing functions + * (eg. io_read, io_write) and return the next function to call once + * that is done using io_next(). The alternative is to call io_close(). + * + * Returns NULL on error (and sets errno). + */ +#define io_new_conn(fd, start, finish, arg) \ + io_new_conn_((fd), \ + typesafe_cb_preargs(struct io_op *, void *, \ + (start), (arg), struct io_conn *), \ + typesafe_cb_preargs(void, void *, (finish), (arg), \ + struct io_conn *), \ + (arg)) +struct io_conn *io_new_conn_(int fd, + struct io_op *(*start)(struct io_conn *, void *), + void (*finish)(struct io_conn *, void *), + void *arg); + +/** + * io_new_listener - create a new accepting listener. + * @fd: the file descriptor. + * @start: the first function to call on new connections. + * @finish: the function to call when the connection is closed or fails. + * @arg: the argument to both @start and @finish. + * + * When @fd becomes readable, we accept() and turn that fd into a new + * connection. + * + * Returns NULL on error (and sets errno). + */ +#define io_new_listener(fd, start, finish, arg) \ + io_new_listener_((fd), \ + typesafe_cb_preargs(struct io_op *, void *, \ + (start), (arg), \ + struct io_conn *), \ + typesafe_cb_preargs(void, void *, (finish), \ + (arg), struct io_conn *), \ + (arg)) +struct io_listener *io_new_listener_(int fd, + struct io_op *(*start)(struct io_conn *, + void *arg), + void (*finish)(struct io_conn *, + void *arg), + void *arg); + +/** + * io_close_listener - delete a listener. + * @listener: the listener returned from io_new_listener. + * + * This closes the fd and frees @listener. + */ +void io_close_listener(struct io_listener *listener); + +/** + * io_write - queue data to be written. + * @data: the data buffer. + * @len: the length to write. + * @next: what to call next. + * + * This will queue the data buffer for writing. Once it's all written, the + * function registered with io_next() will be called: on an error, the finish + * function is called instead. + * + * Note that the I/O may actually be done immediately. + */ +struct io_op *io_write(const void *data, size_t len, struct io_next *next); + +/** + * io_read - queue buffer to be read. + * @data: the data buffer. + * @len: the length to read. + * @next: what to call next. + * + * This will queue the data buffer for reading. Once it's all read, the + * function registered with io_next() will be called: on an error, the finish + * function is called instead. + * + * Note that the I/O may actually be done immediately. + */ +struct io_op *io_read(void *data, size_t len, struct io_next *next); + +/** + * io_read_partial - queue buffer to be read (partial OK). + * @data: the data buffer. + * @len: the maximum length to read, set to the length actually read. + * @next: what to call next. + * + * This will queue the data buffer for reading. Once any data is + * read, @len is updated and the function registered with io_next() + * will be called: on an error, the finish function is called instead. + * + * Note that the I/O may actually be done immediately. + */ +struct io_op *io_read_partial(void *data, size_t *len, struct io_next *next); + +/** + * io_write_partial - queue data to be written (partial OK). + * @data: the data buffer. + * @len: the maximum length to write, set to the length actually written. + * @next: what to call next. + * + * This will queue the data buffer for writing. Once any data is + * written, @len is updated and the function registered with io_next() + * will be called: on an error, the finish function is called instead. + * + * Note that the I/O may actually be done immediately. + */ +struct io_op *io_write_partial(const void *data, size_t *len, + struct io_next *next); + +/** + * io_idle - explicitly note that this connection will do nothing. + * @conn: the current connection. + * + * This indicates the connection is idle: some other function will + * later call io_read/io_write etc. (or io_close) on it, in which case + * it will do that. + */ +struct io_op *io_idle(struct io_conn *conn); + +/** + * io_wake - wake up and idle connection. + * @conn: an idle connection. + * @next: the next function to call once queued IO is complete. + * @arg: the argument to @next. + * + * This makes @conn run its @next function the next time around the + * io_loop(). + */ +#define io_wake(conn, next, arg) \ + io_wake_((conn), \ + typesafe_cb_preargs(struct io_op *, void *, \ + (next), (arg), struct io_conn *), \ + (arg)) +void io_wake_(struct io_conn *conn, + struct io_op *(*next)(struct io_conn *, void *), void *arg); + +/** + * io_break - return from io_loop() + * @arg: non-NULL value to return from io_loop(). + * @next: what to call next (can be NULL if we expect no return). + * + * This breaks out of the io_loop. As soon as the current @next + * function returns, any io_closed()'d connections will have their + * finish callbacks called, then io_loop() with return with @arg. + * + * If io_loop() is called again, then @next will be called. + */ +struct io_op *io_break(void *arg, struct io_next *next); + +/** + * io_next - indicate what callback to call next. + * @conn: this connection. + * @next: the next function to call once queued IO is complete. + * @arg: the argument to @next. + * + * Every @next (or @start) function should "return io_next(...);" once + * they have indicated what io to perform (eg. io_write, io_idle). + * The exception is io_close(), which can be used instead of io_next(). + * + * Note that as an optimization, the next function may be called + * immediately, which is why this should be the last statement in your + * function. + */ +#define io_next(conn, next, arg) \ + io_next_((conn), \ + typesafe_cb_preargs(struct io_op *, void *, \ + (next), (arg), struct io_conn *), \ + (arg)) +struct io_next *io_next_(struct io_conn *conn, + struct io_op *(*next)(struct io_conn *, void *arg), + void *arg); + +/* FIXME: io_recvfrom/io_sendto */ + +/** + * io_close - terminate a connection. + * @conn: any connection. + * + * The schedules a connection to be closed. It can be done on any + * connection, whether it has I/O queued or not (though that I/O may + * be performed first). + * + * It's common to 'return io_close(...)' from a @next function, but + * io_close can also be used as an argument to io_next(). + */ +struct io_op *io_close(struct io_conn *, void *unused); + +/** + * io_loop - process fds until all closed on io_break. + * + * This is the core loop; it exits with the io_break() arg, or NULL if + * all connections and listeners are closed. + */ +void *io_loop(void); +#endif /* CCAN_IO_H */ diff --git a/ccan/io/poll.c b/ccan/io/poll.c new file mode 100644 index 00000000..070f6d84 --- /dev/null +++ b/ccan/io/poll.c @@ -0,0 +1,207 @@ +/* Licensed under BSD-MIT - see LICENSE file for details */ +#include "io.h" +#include "backend.h" +#include +#include +#include +#include +#include + +static size_t num_fds = 0, max_fds = 0, num_next = 0, num_finished = 0; +static struct pollfd *pollfds = NULL; +static struct fd **fds = NULL; + +static bool add_fd(struct fd *fd, short events) +{ + if (num_fds + 1 > max_fds) { + struct pollfd *newpollfds; + struct fd **newfds; + size_t num = max_fds ? max_fds * 2 : 8; + + newpollfds = realloc(pollfds, sizeof(*newpollfds) * num); + if (!newpollfds) + return false; + pollfds = newpollfds; + newfds = realloc(fds, sizeof(*newfds) * num); + if (!newfds) + return false; + fds = newfds; + max_fds = num; + } + + pollfds[num_fds].fd = fd->fd; + pollfds[num_fds].events = events; + pollfds[num_fds].revents = 0; /* In case we're iterating now */ + fds[num_fds] = fd; + fd->backend_info = num_fds; + num_fds++; + return true; +} + +static void del_fd(struct fd *fd) +{ + size_t n = fd->backend_info; + + assert(n != -1); + assert(n < num_fds); + if (n != num_fds - 1) { + /* Move last one over us. */ + pollfds[n] = pollfds[num_fds-1]; + fds[n] = fds[num_fds-1]; + assert(fds[n]->backend_info == num_fds-1); + fds[n]->backend_info = n; + } else if (num_fds == 1) { + /* Free everything when no more fds. */ + free(pollfds); + free(fds); + pollfds = NULL; + fds = NULL; + max_fds = 0; + } + num_fds--; + fd->backend_info = -1; + close(fd->fd); +} + +bool add_listener(struct io_listener *l) +{ + return add_fd(&l->fd, POLLIN); +} + +bool add_conn(struct io_conn *c) +{ + if (!add_fd(&c->fd, 0)) + return false; + num_next++; + return true; +} + +static void del_conn(struct io_conn *conn) +{ + if (conn->fd.finish) + conn->fd.finish(conn, conn->fd.finish_arg); + del_fd(&conn->fd); + if (conn->state == FINISHED) + num_finished--; + else if (conn->state == NEXT) + num_next--; +} + +void del_listener(struct io_listener *l) +{ + del_fd(&l->fd); +} + +void backend_set_state(struct io_conn *conn, struct io_op *op) +{ + enum io_state state = from_ioop(op); + struct pollfd *pfd = &pollfds[conn->fd.backend_info]; + + switch (state) { + case READ: + case READPART: + pfd->events = POLLIN; + break; + case WRITE: + case WRITEPART: + pfd->events = POLLOUT; + break; + case IDLE: + pfd->events = 0; + break; + case NEXT: + num_next++; + break; + case FINISHED: + num_finished++; + break; + default: + abort(); + } + conn->state = state; +} + +static void accept_conn(struct io_listener *l) +{ + struct io_conn *c; + int fd = accept(l->fd.fd, NULL, NULL); + + /* FIXME: What to do here? */ + if (fd < 0) + return; + c = io_new_conn(fd, l->fd.next, l->fd.finish, l->fd.next_arg); + if (!c) { + close(fd); + return; + } +} + +/* It's OK to miss some, as long as we make progress. */ +static void finish_and_next(bool finished_only) +{ + unsigned int i; + + for (i = 0; !io_loop_return && i < num_fds; i++) { + struct io_conn *c; + + if (!num_finished) { + if (finished_only || num_next == 0) + break; + } + if (fds[i]->listener) + continue; + c = (void *)fds[i]; + if (c->state == FINISHED) { + del_conn(c); + free(c); + i--; + } else if (!finished_only && c->state == NEXT) { + backend_set_state(c, c->fd.next(c, c->fd.next_arg)); + num_next--; + } + } +} + +/* This is the main loop. */ +void *io_loop(void) +{ + void *ret; + + while (!io_loop_return) { + int i, r; + + if (num_finished || num_next) { + finish_and_next(false); + /* Could have started/finished more. */ + continue; + } + + if (num_fds == 0) + break; + + r = poll(pollfds, num_fds, -1); + if (r < 0) + break; + + for (i = 0; i < num_fds && !io_loop_return; i++) { + struct io_conn *c = (void *)fds[i]; + if (pollfds[i].revents & POLLOUT) + backend_set_state(c, do_writeable(c)); + else if (pollfds[i].revents & POLLIN) { + if (fds[i]->listener) + accept_conn((void *)c); + else + backend_set_state(c, do_readable(c)); + } else if (pollfds[i].revents & POLLHUP) { + backend_set_state(c, io_close(c, NULL)); + } + } + } + + while (num_finished) + finish_and_next(true); + + ret = io_loop_return; + io_loop_return = NULL; + return ret; +} diff --git a/ccan/io/test/run-01-start-finish.c b/ccan/io/test/run-01-start-finish.c new file mode 100644 index 00000000..03a1e328 --- /dev/null +++ b/ccan/io/test/run-01-start-finish.c @@ -0,0 +1,91 @@ +#include +/* Include the C files directly. */ +#include +#include +#include +#include +#include + +static struct io_op *start_ok(struct io_conn *conn, int *state) +{ + ok1(*state == 0); + (*state)++; + return io_close(conn, NULL); +} + +static void finish_ok(struct io_conn *conn, int *state) +{ + ok1(*state == 1); + (*state)++; + io_break(state + 1, NULL); +} + +static int make_listen_fd(const char *port, struct addrinfo **info) +{ + int fd, on = 1; + struct addrinfo *addrinfo, hints; + + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_PASSIVE; + hints.ai_protocol = 0; + + if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0) + return -1; + + fd = socket(addrinfo->ai_family, addrinfo->ai_socktype, + addrinfo->ai_protocol); + if (fd < 0) + return -1; + + setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); + if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) { + close(fd); + return -1; + } + if (listen(fd, 1) != 0) { + close(fd); + return -1; + } + *info = addrinfo; + return fd; +} + +int main(void) +{ + int state = 0; + struct addrinfo *addrinfo; + struct io_listener *l; + int fd; + + /* This is how many tests you plan to run */ + plan_tests(9); + fd = make_listen_fd("65001", &addrinfo); + ok1(fd >= 0); + l = io_new_listener(fd, start_ok, finish_ok, &state); + ok1(l); + fflush(stdout); + if (!fork()) { + io_close_listener(l); + fd = socket(addrinfo->ai_family, addrinfo->ai_socktype, + addrinfo->ai_protocol); + if (fd < 0) + exit(1); + if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) + exit(2); + close(fd); + freeaddrinfo(addrinfo); + exit(0); + } + freeaddrinfo(addrinfo); + ok1(io_loop() == &state + 1); + ok1(state == 2); + io_close_listener(l); + ok1(wait(&state)); + ok1(WIFEXITED(state)); + ok1(WEXITSTATUS(state) == 0); + + /* This exits depending on whether all tests passed */ + return exit_status(); +} diff --git a/ccan/io/test/run-02-read.c b/ccan/io/test/run-02-read.c new file mode 100644 index 00000000..e59ccb38 --- /dev/null +++ b/ccan/io/test/run-02-read.c @@ -0,0 +1,108 @@ +#include +/* Include the C files directly. */ +#include +#include +#include +#include +#include + +struct data { + int state; + char buf[4]; +}; + +static struct io_op *start_ok(struct io_conn *conn, struct data *d) +{ + ok1(d->state == 0); + d->state++; + return io_read(d->buf, sizeof(d->buf), io_next(conn, io_close, d)); +} + +static void finish_ok(struct io_conn *conn, struct data *d) +{ + ok1(d->state == 1); + d->state++; + io_break(d, NULL); +} + +static int make_listen_fd(const char *port, struct addrinfo **info) +{ + int fd, on = 1; + struct addrinfo *addrinfo, hints; + + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_PASSIVE; + hints.ai_protocol = 0; + + if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0) + return -1; + + fd = socket(addrinfo->ai_family, addrinfo->ai_socktype, + addrinfo->ai_protocol); + if (fd < 0) + return -1; + + setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); + if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) { + close(fd); + return -1; + } + if (listen(fd, 1) != 0) { + close(fd); + return -1; + } + *info = addrinfo; + return fd; +} + +int main(void) +{ + struct data *d = malloc(sizeof(*d)); + struct addrinfo *addrinfo; + struct io_listener *l; + int fd, status; + + /* This is how many tests you plan to run */ + plan_tests(10); + d->state = 0; + fd = make_listen_fd("65002", &addrinfo); + ok1(fd >= 0); + l = io_new_listener(fd, start_ok, finish_ok, d); + ok1(l); + fflush(stdout); + if (!fork()) { + int i; + + io_close_listener(l); + fd = socket(addrinfo->ai_family, addrinfo->ai_socktype, + addrinfo->ai_protocol); + if (fd < 0) + exit(1); + if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) + exit(2); + signal(SIGPIPE, SIG_IGN); + for (i = 0; i < strlen("hellothere"); i++) { + if (write(fd, "hellothere" + i, 1) != 1) + break; + } + close(fd); + freeaddrinfo(addrinfo); + free(d); + exit(0); + } + freeaddrinfo(addrinfo); + ok1(io_loop() == d); + ok1(d->state == 2); + ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0); + free(d); + io_close_listener(l); + + ok1(wait(&status)); + ok1(WIFEXITED(status)); + ok1(WEXITSTATUS(status) == 0); + + /* This exits depending on whether all tests passed */ + return exit_status(); +} diff --git a/ccan/io/test/run-03-readpartial.c b/ccan/io/test/run-03-readpartial.c new file mode 100644 index 00000000..cc1857c5 --- /dev/null +++ b/ccan/io/test/run-03-readpartial.c @@ -0,0 +1,137 @@ +#include +/* Include the C files directly. */ +#include +#include +#include +#include +#include + +struct data { + int state; + size_t bytes; + char buf[4]; +}; + +static struct io_op *start_ok(struct io_conn *conn, struct data *d) +{ + ok1(d->state == 0); + d->state++; + d->bytes = sizeof(d->buf); + return io_read_partial(d->buf, &d->bytes, io_next(conn, io_close, d)); +} + +static void finish_ok(struct io_conn *conn, struct data *d) +{ + ok1(d->state == 1); + d->state++; + io_break(d, NULL); +} + +static int make_listen_fd(const char *port, struct addrinfo **info) +{ + int fd, on = 1; + struct addrinfo *addrinfo, hints; + + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_PASSIVE; + hints.ai_protocol = 0; + + if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0) + return -1; + + fd = socket(addrinfo->ai_family, addrinfo->ai_socktype, + addrinfo->ai_protocol); + if (fd < 0) + return -1; + + setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); + if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) { + close(fd); + return -1; + } + if (listen(fd, 1) != 0) { + close(fd); + return -1; + } + *info = addrinfo; + return fd; +} + +static void write_to_socket(const char *str, const struct addrinfo *addrinfo) +{ + int fd, i; + + fd = socket(addrinfo->ai_family, addrinfo->ai_socktype, + addrinfo->ai_protocol); + if (fd < 0) + exit(1); + if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) + exit(2); + signal(SIGPIPE, SIG_IGN); + for (i = 0; i < strlen(str); i++) { + if (write(fd, str + i, 1) != 1) + break; + } + close(fd); +} + +int main(void) +{ + struct data *d = malloc(sizeof(*d)); + struct addrinfo *addrinfo; + struct io_listener *l; + int fd, status; + + /* This is how many tests you plan to run */ + plan_tests(22); + d->state = 0; + fd = make_listen_fd("65003", &addrinfo); + ok1(fd >= 0); + l = io_new_listener(fd, start_ok, finish_ok, d); + ok1(l); + fflush(stdout); + if (!fork()) { + io_close_listener(l); + write_to_socket("hellothere", addrinfo); + freeaddrinfo(addrinfo); + free(d); + exit(0); + } + ok1(io_loop() == d); + ok1(d->state == 2); + ok1(d->bytes > 0); + ok1(d->bytes <= sizeof(d->buf)); + ok1(memcmp(d->buf, "hellothere", d->bytes) == 0); + + ok1(wait(&status)); + ok1(WIFEXITED(status)); + ok1(WEXITSTATUS(status) == 0); + + fflush(stdout); + if (!fork()) { + io_close_listener(l); + write_to_socket("hi", addrinfo); + freeaddrinfo(addrinfo); + free(d); + exit(0); + } + d->state = 0; + ok1(io_loop() == d); + ok1(d->state == 2); + ok1(d->bytes > 0); + ok1(d->bytes <= strlen("hi")); + ok1(memcmp(d->buf, "hi", d->bytes) == 0); + + freeaddrinfo(addrinfo); + free(d); + io_close_listener(l); + + ok1(wait(&status)); + ok1(WIFEXITED(status)); + ok1(WEXITSTATUS(status) == 0); + + /* This exits depending on whether all tests passed */ + return exit_status(); +} diff --git a/ccan/io/test/run-04-writepartial.c b/ccan/io/test/run-04-writepartial.c new file mode 100644 index 00000000..daa81e7c --- /dev/null +++ b/ccan/io/test/run-04-writepartial.c @@ -0,0 +1,121 @@ +#include +/* Include the C files directly. */ +#include +#include +#include +#include +#include + +struct data { + int state; + size_t bytes; + char *buf; +}; + +static struct io_op *start_ok(struct io_conn *conn, struct data *d) +{ + ok1(d->state == 0); + d->state++; + return io_write_partial(d->buf, &d->bytes, io_next(conn, io_close, d)); +} + +static void finish_ok(struct io_conn *conn, struct data *d) +{ + ok1(d->state == 1); + d->state++; + io_break(d, NULL); +} + +static int make_listen_fd(const char *port, struct addrinfo **info) +{ + int fd, on = 1; + struct addrinfo *addrinfo, hints; + + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_PASSIVE; + hints.ai_protocol = 0; + + if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0) + return -1; + + fd = socket(addrinfo->ai_family, addrinfo->ai_socktype, + addrinfo->ai_protocol); + if (fd < 0) + return -1; + + setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); + if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) { + close(fd); + return -1; + } + if (listen(fd, 1) != 0) { + close(fd); + return -1; + } + *info = addrinfo; + return fd; +} + +static void read_from_socket(const char *str, const struct addrinfo *addrinfo) +{ + int fd; + char buf[100]; + + fd = socket(addrinfo->ai_family, addrinfo->ai_socktype, + addrinfo->ai_protocol); + if (fd < 0) + exit(1); + if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) + exit(2); + if (read(fd, buf, strlen(str)) != strlen(str)) + exit(3); + if (memcmp(buf, str, strlen(str)) != 0) + exit(4); + close(fd); +} + +int main(void) +{ + struct data *d = malloc(sizeof(*d)); + struct addrinfo *addrinfo; + struct io_listener *l; + int fd, status; + + /* This is how many tests you plan to run */ + plan_tests(11); + d->state = 0; + d->bytes = 1024*1024; + d->buf = malloc(d->bytes); + memset(d->buf, 'a', d->bytes); + fd = make_listen_fd("65004", &addrinfo); + ok1(fd >= 0); + l = io_new_listener(fd, start_ok, finish_ok, d); + ok1(l); + fflush(stdout); + if (!fork()) { + io_close_listener(l); + read_from_socket("aaaaaa", addrinfo); + freeaddrinfo(addrinfo); + free(d->buf); + free(d); + exit(0); + } + ok1(io_loop() == d); + ok1(d->state == 2); + ok1(d->bytes > 0); + ok1(d->bytes <= 1024*1024); + + ok1(wait(&status)); + ok1(WIFEXITED(status)); + ok1(WEXITSTATUS(status) == 0); + + freeaddrinfo(addrinfo); + free(d->buf); + free(d); + io_close_listener(l); + + /* This exits depending on whether all tests passed */ + return exit_status(); +} diff --git a/ccan/io/test/run-05-write.c b/ccan/io/test/run-05-write.c new file mode 100644 index 00000000..83c8c4ca --- /dev/null +++ b/ccan/io/test/run-05-write.c @@ -0,0 +1,122 @@ +#include +/* Include the C files directly. */ +#include +#include +#include +#include +#include + +struct data { + int state; + size_t bytes; + char *buf; +}; + +static struct io_op *start_ok(struct io_conn *conn, struct data *d) +{ + ok1(d->state == 0); + d->state++; + return io_write(d->buf, d->bytes, io_next(conn, io_close, d)); +} + +static void finish_ok(struct io_conn *conn, struct data *d) +{ + ok1(d->state == 1); + d->state++; + io_break(d, NULL); +} + +static int make_listen_fd(const char *port, struct addrinfo **info) +{ + int fd, on = 1; + struct addrinfo *addrinfo, hints; + + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_PASSIVE; + hints.ai_protocol = 0; + + if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0) + return -1; + + fd = socket(addrinfo->ai_family, addrinfo->ai_socktype, + addrinfo->ai_protocol); + if (fd < 0) + return -1; + + setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); + if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) { + close(fd); + return -1; + } + if (listen(fd, 1) != 0) { + close(fd); + return -1; + } + *info = addrinfo; + return fd; +} + +static void read_from_socket(size_t bytes, const struct addrinfo *addrinfo) +{ + int fd, done, r; + char buf[100]; + + fd = socket(addrinfo->ai_family, addrinfo->ai_socktype, + addrinfo->ai_protocol); + if (fd < 0) + exit(1); + if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) + exit(2); + + for (done = 0; done < bytes; done += r) { + r = read(fd, buf, sizeof(buf)); + if (r < 0) + exit(3); + done += r; + } + close(fd); +} + +int main(void) +{ + struct data *d = malloc(sizeof(*d)); + struct addrinfo *addrinfo; + struct io_listener *l; + int fd, status; + + /* This is how many tests you plan to run */ + plan_tests(9); + d->state = 0; + d->bytes = 1024*1024; + d->buf = malloc(d->bytes); + memset(d->buf, 'a', d->bytes); + fd = make_listen_fd("65005", &addrinfo); + ok1(fd >= 0); + l = io_new_listener(fd, start_ok, finish_ok, d); + ok1(l); + fflush(stdout); + if (!fork()) { + io_close_listener(l); + read_from_socket(d->bytes, addrinfo); + freeaddrinfo(addrinfo); + free(d->buf); + free(d); + exit(0); + } + ok1(io_loop() == d); + ok1(d->state == 2); + + ok1(wait(&status)); + ok1(WIFEXITED(status)); + ok1(WEXITSTATUS(status) == 0); + + freeaddrinfo(addrinfo); + free(d->buf); + free(d); + io_close_listener(l); + + /* This exits depending on whether all tests passed */ + return exit_status(); +} diff --git a/ccan/io/test/run-06-idle.c b/ccan/io/test/run-06-idle.c new file mode 100644 index 00000000..e6144dee --- /dev/null +++ b/ccan/io/test/run-06-idle.c @@ -0,0 +1,145 @@ +#include +/* Include the C files directly. */ +#include +#include +#include +#include +#include +#include +#include +#include + +static struct io_conn *idler; + +struct data { + int state; + char buf[4]; +}; + +static struct io_op *do_read(struct io_conn *conn, struct data *d) +{ + ok1(d->state == 2 || d->state == 3); + d->state++; + return io_read(d->buf, sizeof(d->buf), io_next(conn, io_close, d)); +} + +static struct io_op *start_waker(struct io_conn *conn, struct data *d) +{ + ok1(d->state == 1); + d->state++; + + io_wake(idler, do_read, d); + return io_close(conn, NULL); +} + +static void finish_waker(struct io_conn *conn, struct data *d) +{ + ok1(d->state == 2 || d->state == 3); + d->state++; +} + +static struct io_op *start_idle(struct io_conn *conn, struct data *d) +{ + int fd; + + ok1(d->state == 0); + d->state++; + idler = conn; + + /* This will wake us up. */ + fd = open("/dev/null", O_RDONLY); + ok1(fd >= 0); + ok1(io_new_conn(fd, start_waker, finish_waker, d)); + + return io_idle(conn); +} + +static void finish_idle(struct io_conn *conn, struct data *d) +{ + ok1(d->state == 4); + d->state++; + io_break(d, NULL); +} + +static int make_listen_fd(const char *port, struct addrinfo **info) +{ + int fd, on = 1; + struct addrinfo *addrinfo, hints; + + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_PASSIVE; + hints.ai_protocol = 0; + + if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0) + return -1; + + fd = socket(addrinfo->ai_family, addrinfo->ai_socktype, + addrinfo->ai_protocol); + if (fd < 0) + return -1; + + setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); + if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) { + close(fd); + return -1; + } + if (listen(fd, 1) != 0) { + close(fd); + return -1; + } + *info = addrinfo; + return fd; +} + +int main(void) +{ + struct data *d = malloc(sizeof(*d)); + struct addrinfo *addrinfo; + struct io_listener *l; + int fd, status; + + /* This is how many tests you plan to run */ + plan_tests(15); + d->state = 0; + fd = make_listen_fd("65006", &addrinfo); + ok1(fd >= 0); + l = io_new_listener(fd, start_idle, finish_idle, d); + ok1(l); + fflush(stdout); + if (!fork()) { + int i; + + io_close_listener(l); + fd = socket(addrinfo->ai_family, addrinfo->ai_socktype, + addrinfo->ai_protocol); + if (fd < 0) + exit(1); + if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) + exit(2); + signal(SIGPIPE, SIG_IGN); + for (i = 0; i < strlen("hellothere"); i++) { + if (write(fd, "hellothere" + i, 1) != 1) + break; + } + close(fd); + freeaddrinfo(addrinfo); + free(d); + exit(0); + } + freeaddrinfo(addrinfo); + + ok1(io_loop() == d); + ok1(d->state == 5); + ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0); + free(d); + io_close_listener(l); + + ok1(wait(&status)); + ok1(WIFEXITED(status)); + ok1(WEXITSTATUS(status) == 0); + + /* This exits depending on whether all tests passed */ + return exit_status(); +} diff --git a/ccan/io/test/run-07-break.c b/ccan/io/test/run-07-break.c new file mode 100644 index 00000000..cf3d251a --- /dev/null +++ b/ccan/io/test/run-07-break.c @@ -0,0 +1,117 @@ +#include +/* Include the C files directly. */ +#include +#include +#include +#include +#include + +struct data { + int state; + char buf[4]; +}; + +static struct io_op *do_read(struct io_conn *conn, struct data *d) +{ + ok1(d->state == 1); + d->state++; + return io_read(d->buf, sizeof(d->buf), io_next(conn, io_close, d)); +} + +static struct io_op *start_break(struct io_conn *conn, struct data *d) +{ + ok1(d->state == 0); + d->state++; + return io_break(d, io_next(conn, do_read, d)); +} + +static void finish_ok(struct io_conn *conn, struct data *d) +{ + ok1(d->state == 2); + d->state++; +} + +static int make_listen_fd(const char *port, struct addrinfo **info) +{ + int fd, on = 1; + struct addrinfo *addrinfo, hints; + + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_PASSIVE; + hints.ai_protocol = 0; + + if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0) + return -1; + + fd = socket(addrinfo->ai_family, addrinfo->ai_socktype, + addrinfo->ai_protocol); + if (fd < 0) + return -1; + + setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); + if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) { + close(fd); + return -1; + } + if (listen(fd, 1) != 0) { + close(fd); + return -1; + } + *info = addrinfo; + return fd; +} + +int main(void) +{ + struct data *d = malloc(sizeof(*d)); + struct addrinfo *addrinfo; + struct io_listener *l; + int fd, status; + + /* This is how many tests you plan to run */ + plan_tests(13); + d->state = 0; + fd = make_listen_fd("65007", &addrinfo); + ok1(fd >= 0); + l = io_new_listener(fd, start_break, finish_ok, d); + ok1(l); + fflush(stdout); + if (!fork()) { + int i; + + io_close_listener(l); + fd = socket(addrinfo->ai_family, addrinfo->ai_socktype, + addrinfo->ai_protocol); + if (fd < 0) + exit(1); + if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) + exit(2); + signal(SIGPIPE, SIG_IGN); + for (i = 0; i < strlen("hellothere"); i++) { + if (write(fd, "hellothere" + i, 1) != 1) + break; + } + close(fd); + freeaddrinfo(addrinfo); + free(d); + exit(0); + } + freeaddrinfo(addrinfo); + ok1(io_loop() == d); + ok1(d->state == 1); + io_close_listener(l); + + ok1(io_loop() == NULL); + ok1(d->state == 3); + ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0); + free(d); + + ok1(wait(&status)); + ok1(WIFEXITED(status)); + ok1(WEXITSTATUS(status) == 0); + + /* This exits depending on whether all tests passed */ + return exit_status(); +} diff --git a/ccan/io/test/run-10-many.c b/ccan/io/test/run-10-many.c new file mode 100644 index 00000000..6a972ba8 --- /dev/null +++ b/ccan/io/test/run-10-many.c @@ -0,0 +1,122 @@ +#include +/* Include the C files directly. */ +#include +#include +#include +#include +#include + +#define NUM 100 +#define NUM_ITERS 1000 + +struct buffer { + int iters; + struct io_conn *reader, *writer; + char buf[32]; +}; + +static struct io_op *poke_writer(struct io_conn *conn, struct buffer *buf); +static struct io_op *poke_reader(struct io_conn *conn, struct buffer *buf); + +static struct io_op *do_read(struct io_conn *conn, struct buffer *buf) +{ + assert(conn == buf->reader); + + return io_read(&buf->buf, sizeof(buf->buf), + io_next(conn, poke_writer, buf)); +} + +static struct io_op *do_write(struct io_conn *conn, struct buffer *buf) +{ + assert(conn == buf->writer); + + return io_write(&buf->buf, sizeof(buf->buf), + io_next(conn, poke_reader, buf)); +} + +static struct io_op *poke_writer(struct io_conn *conn, struct buffer *buf) +{ + assert(conn == buf->reader); + + if (buf->iters == NUM_ITERS) + return io_close(conn, NULL); + + /* You write. */ + io_wake(buf->writer, do_write, buf); + + /* I'll wait until you wake me. */ + return io_idle(conn); +} + +static struct io_op *poke_reader(struct io_conn *conn, struct buffer *buf) +{ + assert(conn == buf->writer); + /* You read. */ + io_wake(buf->reader, do_read, buf); + + if (++buf->iters == NUM_ITERS) + return io_close(conn, NULL); + + /* I'll wait until you tell me to write. */ + return io_idle(conn); +} + +static struct io_op *reader(struct io_conn *conn, struct buffer *buf) +{ + assert(conn == buf->reader); + + /* Wait for writer to tell us to read. */ + return io_idle(conn); +} + +static struct buffer buf[NUM]; + +int main(void) +{ + unsigned int i; + int fds[2], last_read, last_write; + + plan_tests(5 + NUM); + + ok1(pipe(fds) == 0); + last_read = fds[0]; + last_write = fds[1]; + + for (i = 1; i < NUM; i++) { + if (pipe(fds) < 0) + break; + memset(buf[i].buf, i, sizeof(buf[i].buf)); + sprintf(buf[i].buf, "%i-%i", i, i); + + buf[i].reader = io_new_conn(last_read, reader, NULL, &buf[i]); + if (!buf[i].reader) + break; + buf[i].writer = io_new_conn(fds[1], do_write, NULL, &buf[i]); + if (!buf[i].writer) + break; + last_read = fds[0]; + } + if (!ok1(i == NUM)) + exit(exit_status()); + + /* Last one completes the cirle. */ + i = 0; + sprintf(buf[i].buf, "%i-%i", i, i); + buf[i].reader = io_new_conn(last_read, reader, NULL, &buf[i]); + ok1(buf[i].reader); + buf[i].writer = io_new_conn(last_write, do_write, NULL, &buf[i]); + ok1(buf[i].writer); + + /* They should eventually exit */ + ok1(io_loop() == NULL); + + for (i = 0; i < NUM; i++) { + char b[sizeof(buf[0].buf)]; + memset(b, i, sizeof(b)); + sprintf(b, "%i-%i", i, i); + ok1(memcmp(b, buf[(i + NUM_ITERS) % NUM].buf, sizeof(b)) == 0); + } + + /* This exits depending on whether all tests passed */ + return exit_status(); +}