Designed for async I/O.
Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
htable \
idtree \
ilog \
+ io \
isaac \
iscsi \
jmap \
--- /dev/null
+../../licenses/BSD-MIT
\ No newline at end of file
--- /dev/null
+#include <stdio.h>
+#include <string.h>
+#include "config.h"
+
+/**
+ * io - simple library for stateful io handling.
+ *
+ * io provides a simple mechanism to write I/O servers with multiple
+ * connections. Handling of connections is multiplexed, and function
+ * indicate what they want written or read, and what follow-on
+ * function to call on success (or failure).
+ *
+ * Example:
+ * // Given tr A-Z a-z outputs tr a-z a-z
+ * #include <ccan/io/io.h>
+ * #include <ccan/err/err.h>
+ * #include <assert.h>
+ * #include <stdlib.h>
+ * #include <signal.h>
+ * #include <sys/types.h>
+ * #include <sys/wait.h>
+ *
+ * struct buffer {
+ * size_t max, off, rlen;
+ * char *buf;
+ * };
+ *
+ * struct stdin_buffer {
+ * struct io_conn *reader, *writer;
+ * size_t len;
+ * char inbuf[4096];
+ * };
+ *
+ * // This reads from stdin.
+ * static struct io_op *wake_writer(struct io_conn *, struct stdin_buffer *);
+ * // This writes the stdin buffer to the child.
+ * static struct io_op *write_to_child(struct io_conn *c,
+ * struct stdin_buffer *b);
+ * static struct io_op *read_stdin(struct io_conn *c, struct stdin_buffer *b)
+ * {
+ * assert(c == b->reader);
+ * b->len = sizeof(b->inbuf);
+ * return io_read_partial(b->inbuf, &b->len,
+ * io_next(c, wake_writer, b));
+ * }
+ *
+ * static struct io_op *wake_writer(struct io_conn *c, struct stdin_buffer *b)
+ * {
+ * assert(c == b->reader);
+ * io_wake(b->writer, write_to_child, b);
+ * return io_idle(c);
+ * }
+ *
+ * static void reader_exit(struct io_conn *c, struct stdin_buffer *b)
+ * {
+ * assert(c == b->reader);
+ * io_wake(b->writer, write_to_child, b);
+ * b->reader = NULL;
+ * }
+ *
+ * static struct io_op *wake_reader(struct io_conn *c, struct stdin_buffer *b)
+ * {
+ * assert(c == b->writer);
+ * io_wake(b->reader, read_stdin, b);
+ * return io_idle(c);
+ * }
+ *
+ * static struct io_op *write_to_child(struct io_conn *conn,
+ * struct stdin_buffer *b)
+ * {
+ * assert(conn == b->writer);
+ * if (!b->reader)
+ * return io_close(conn, NULL);
+ * return io_write(b->inbuf, b->len, io_next(conn, wake_reader, b));
+ * }
+ *
+ * static struct io_op *start_writer(struct io_conn *conn,
+ * struct stdin_buffer *b)
+ * {
+ * assert(conn == b->writer);
+ * return io_idle(conn);
+ * }
+ *
+ * static void fail_child_write(struct io_conn *conn, struct stdin_buffer *b)
+ * {
+ * if (b->reader)
+ * err(1, "Failed writing to child.");
+ * }
+ *
+ * // This reads from the child and saves it into buffer.
+ * static struct io_op *read_from_child(struct io_conn *conn,
+ * struct buffer *b)
+ * {
+ * b->off += b->rlen;
+ *
+ * if (b->off == b->max) {
+ * if (b->max == 0)
+ * b->max = 128;
+ * else if (b->max >= 1024*1024)
+ * b->max += 1024*1024;
+ * else
+ * b->max *= 2;
+ * b->buf = realloc(b->buf, b->max);
+ * }
+ *
+ * b->rlen = b->max - b->off;
+ * return io_read_partial(b->buf + b->off, &b->rlen,
+ * io_next(conn, read_from_child, b));
+ * }
+ *
+ * // Feed a program our stdin, gather its stdout, print that at end.
+ * int main(int argc, char *argv[])
+ * {
+ * int tochild[2], fromchild[2];
+ * struct buffer out = { 0, 0, 0, NULL };
+ * struct stdin_buffer sbuf;
+ * int status;
+ * size_t off;
+ * ssize_t ret;
+ *
+ * if (argc == 1)
+ * errx(1, "Usage: runner <cmdline>...");
+ *
+ * if (pipe(tochild) != 0 || pipe(fromchild) != 0)
+ * err(1, "Creating pipes");
+ *
+ * if (!fork()) {
+ * // Child runs command.
+ * close(tochild[1]);
+ * close(fromchild[0]);
+ *
+ * dup2(tochild[0], STDIN_FILENO);
+ * dup2(fromchild[1], STDOUT_FILENO);
+ * execvp(argv[1], argv + 1);
+ * exit(127);
+ * }
+ *
+ * close(tochild[0]);
+ * close(fromchild[1]);
+ * signal(SIGPIPE, SIG_IGN);
+ *
+ * sbuf.reader = io_new_conn(STDIN_FILENO, read_stdin, reader_exit, &sbuf);
+ * sbuf.writer = io_new_conn(tochild[1], start_writer, fail_child_write,
+ * &sbuf);
+ * if (!sbuf.reader || !sbuf.writer
+ * || !io_new_conn(fromchild[0], read_from_child, NULL, &out))
+ * err(1, "Allocating connections");
+ *
+ * io_loop();
+ * wait(&status);
+ *
+ * for (off = 0; off < out.off; off += ret) {
+ * ret = write(STDOUT_FILENO, out.buf+off, out.off-off);
+ * if (ret < 0)
+ * err(1, "Writing stdout");
+ * }
+ * free(out.buf);
+ *
+ * return WIFEXITED(status) ? WEXITSTATUS(status) : 2;
+ * }
+ *
+ * License: BSD-MIT
+ */
+int main(int argc, char *argv[])
+{
+ if (argc != 2)
+ return 1;
+
+ if (strcmp(argv[1], "depends") == 0) {
+ return 0;
+ }
+
+ return 1;
+}
--- /dev/null
+/* Licensed under BSD-MIT - see LICENSE file for details */
+#ifndef CCAN_IO_BACKEND_H
+#define CCAN_IO_BACKEND_H
+#include <stdbool.h>
+
+struct fd {
+ int fd;
+ bool listener;
+ size_t backend_info;
+
+ struct io_op *(*next)(struct io_conn *, void *arg);
+ void *next_arg;
+
+ void (*finish)(struct io_conn *, void *arg);
+ void *finish_arg;
+};
+
+
+/* Listeners create connections. */
+struct io_listener {
+ struct fd fd;
+};
+
+enum io_state {
+ NEXT, /* eg starting, woken from idle, return from io_break. */
+ READ,
+ WRITE,
+ READPART,
+ WRITEPART,
+ IDLE,
+ FINISHED,
+ PROCESSING /* We expect them to change this now. */
+};
+
+static inline enum io_state from_ioop(struct io_op *op)
+{
+ return (enum io_state)(long)op;
+}
+
+struct io_state_read {
+ char *buf;
+ size_t len;
+};
+
+struct io_state_write {
+ const char *buf;
+ size_t len;
+};
+
+struct io_state_readpart {
+ char *buf;
+ size_t *lenp;
+};
+
+struct io_state_writepart {
+ const char *buf;
+ size_t *lenp;
+};
+
+/* One connection per client. */
+struct io_conn {
+ struct fd fd;
+
+ enum io_state state;
+ union {
+ struct io_state_read read;
+ struct io_state_write write;
+ struct io_state_readpart readpart;
+ struct io_state_writepart writepart;
+ } u;
+};
+
+extern void *io_loop_return;
+
+bool add_listener(struct io_listener *l);
+bool add_conn(struct io_conn *c);
+void del_listener(struct io_listener *l);
+void backend_set_state(struct io_conn *conn, struct io_op *op);
+
+struct io_op *do_writeable(struct io_conn *conn);
+struct io_op *do_readable(struct io_conn *conn);
+#endif /* CCAN_IO_BACKEND_H */
--- /dev/null
+/* Licensed under BSD-MIT - see LICENSE file for details */
+#include "io.h"
+#include "backend.h"
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <string.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <assert.h>
+
+void *io_loop_return;
+
+struct io_listener *io_new_listener_(int fd,
+ struct io_op *(*start)(struct io_conn *,
+ void *arg),
+ void (*finish)(struct io_conn *, void *),
+ void *arg)
+{
+ struct io_listener *l = malloc(sizeof(*l));
+
+ if (!l)
+ return NULL;
+
+ l->fd.listener = true;
+ l->fd.fd = fd;
+ l->fd.next = start;
+ l->fd.finish = finish;
+ l->fd.finish_arg = l->fd.next_arg = arg;
+ if (!add_listener(l)) {
+ free(l);
+ return NULL;
+ }
+ return l;
+}
+
+void io_close_listener(struct io_listener *l)
+{
+ close(l->fd.fd);
+ del_listener(l);
+ free(l);
+}
+
+struct io_conn *io_new_conn_(int fd,
+ struct io_op *(*start)(struct io_conn *, void *),
+ void (*finish)(struct io_conn *, void *),
+ void *arg)
+{
+ struct io_conn *conn = malloc(sizeof(*conn));
+
+ if (!conn)
+ return NULL;
+
+ conn->fd.listener = false;
+ conn->fd.fd = fd;
+ conn->fd.next = start;
+ conn->fd.finish = finish;
+ conn->fd.finish_arg = conn->fd.next_arg = arg;
+ conn->state = NEXT;
+ if (!add_conn(conn)) {
+ free(conn);
+ return NULL;
+ }
+ return conn;
+}
+
+/* Convenient token which only we can produce. */
+static inline struct io_next *to_ionext(struct io_conn *conn)
+{
+ return (struct io_next *)conn;
+}
+
+static inline struct io_op *to_ioop(enum io_state state)
+{
+ return (struct io_op *)(long)state;
+}
+
+static inline struct io_conn *from_ionext(struct io_next *next)
+{
+ return (struct io_conn *)next;
+}
+
+struct io_next *io_next_(struct io_conn *conn,
+ struct io_op *(*next)(struct io_conn *, void *),
+ void *arg)
+{
+ conn->fd.next = next;
+ conn->fd.next_arg = arg;
+
+ return to_ionext(conn);
+}
+
+/* Queue some data to be written. */
+struct io_op *io_write(const void *data, size_t len, struct io_next *next)
+{
+ struct io_conn *conn = from_ionext(next);
+ conn->u.write.buf = data;
+ conn->u.write.len = len;
+ return to_ioop(WRITE);
+}
+
+/* Queue a request to read into a buffer. */
+struct io_op *io_read(void *data, size_t len, struct io_next *next)
+{
+ struct io_conn *conn = from_ionext(next);
+ conn->u.read.buf = data;
+ conn->u.read.len = len;
+ return to_ioop(READ);
+}
+
+/* Queue a partial request to read into a buffer. */
+struct io_op *io_read_partial(void *data, size_t *len, struct io_next *next)
+{
+ struct io_conn *conn = from_ionext(next);
+ conn->u.readpart.buf = data;
+ conn->u.readpart.lenp = len;
+ return to_ioop(READPART);
+}
+
+/* Queue a partial write request. */
+struct io_op *io_write_partial(const void *data, size_t *len, struct io_next *next)
+{
+ struct io_conn *conn = from_ionext(next);
+ conn->u.writepart.buf = data;
+ conn->u.writepart.lenp = len;
+ return to_ioop(WRITEPART);
+}
+
+struct io_op *io_idle(struct io_conn *conn)
+{
+ return to_ioop(IDLE);
+}
+
+void io_wake_(struct io_conn *conn,
+ struct io_op *(*next)(struct io_conn *, void *), void *arg)
+
+{
+ /* It might have finished, but we haven't called its finish() yet. */
+ if (conn->state == FINISHED)
+ return;
+ assert(conn->state == IDLE);
+ conn->fd.next = next;
+ conn->fd.next_arg = arg;
+ backend_set_state(conn, to_ioop(NEXT));
+}
+
+static struct io_op *do_next(struct io_conn *conn)
+{
+ return conn->fd.next(conn, conn->fd.next_arg);
+}
+
+struct io_op *do_writeable(struct io_conn *conn)
+{
+ ssize_t ret;
+ bool finished;
+
+ switch (conn->state) {
+ case WRITE:
+ ret = write(conn->fd.fd, conn->u.write.buf, conn->u.write.len);
+ if (ret < 0)
+ return io_close(conn, NULL);
+ conn->u.write.buf += ret;
+ conn->u.write.len -= ret;
+ finished = (conn->u.write.len == 0);
+ break;
+ case WRITEPART:
+ ret = write(conn->fd.fd, conn->u.writepart.buf,
+ *conn->u.writepart.lenp);
+ if (ret < 0)
+ return io_close(conn, NULL);
+ *conn->u.writepart.lenp = ret;
+ finished = true;
+ break;
+ default:
+ /* Shouldn't happen. */
+ abort();
+ }
+
+ if (finished)
+ return do_next(conn);
+ return to_ioop(conn->state);
+}
+
+struct io_op *do_readable(struct io_conn *conn)
+{
+ ssize_t ret;
+ bool finished;
+
+ switch (conn->state) {
+ case READ:
+ ret = read(conn->fd.fd, conn->u.read.buf, conn->u.read.len);
+ if (ret <= 0)
+ return io_close(conn, NULL);
+ conn->u.read.buf += ret;
+ conn->u.read.len -= ret;
+ finished = (conn->u.read.len == 0);
+ break;
+ case READPART:
+ ret = read(conn->fd.fd, conn->u.readpart.buf,
+ *conn->u.readpart.lenp);
+ if (ret <= 0)
+ return io_close(conn, NULL);
+ *conn->u.readpart.lenp = ret;
+ finished = true;
+ break;
+ default:
+ /* Shouldn't happen. */
+ abort();
+ }
+
+ if (finished)
+ return do_next(conn);
+ return to_ioop(conn->state);
+}
+
+/* Useful next functions. */
+/* Close the connection, we're done. */
+struct io_op *io_close(struct io_conn *conn, void *arg)
+{
+ return to_ioop(FINISHED);
+}
+
+/* Exit the loop, returning this (non-NULL) arg. */
+struct io_op *io_break(void *arg, struct io_next *next)
+{
+ io_loop_return = arg;
+
+ return to_ioop(NEXT);
+}
--- /dev/null
+/* Licensed under BSD-MIT - see LICENSE file for details */
+#ifndef CCAN_IO_H
+#define CCAN_IO_H
+#include <ccan/typesafe_cb/typesafe_cb.h>
+#include <stdbool.h>
+#include <unistd.h>
+
+/**
+ * struct io_op - pointer to return from io functions.
+ *
+ * This undefined structure is just to help the compiler check that you
+ * really do return the result of an io-queueing method.
+ */
+struct io_op;
+
+/**
+ * struct io_next - pointer to what we're going to do next.
+ *
+ * Bundles up callbacks, generated by io_next().
+ */
+struct io_next;
+
+/**
+ * io_new_conn - create a new connection.
+ * @fd: the file descriptor.
+ * @start: the first function to call.
+ * @finish: the function to call when it's closed or fails.
+ * @arg: the argument to both @start and @finish.
+ *
+ * This creates a connection which owns @fd. @start will be called on the
+ * next return to io_loop(), and @finish will be called when an I/O operation
+ * fails, or you call io_close() on the connection.
+ *
+ * The @start function must call one of the io queueing functions
+ * (eg. io_read, io_write) and return the next function to call once
+ * that is done using io_next(). The alternative is to call io_close().
+ *
+ * Returns NULL on error (and sets errno).
+ */
+#define io_new_conn(fd, start, finish, arg) \
+ io_new_conn_((fd), \
+ typesafe_cb_preargs(struct io_op *, void *, \
+ (start), (arg), struct io_conn *), \
+ typesafe_cb_preargs(void, void *, (finish), (arg), \
+ struct io_conn *), \
+ (arg))
+struct io_conn *io_new_conn_(int fd,
+ struct io_op *(*start)(struct io_conn *, void *),
+ void (*finish)(struct io_conn *, void *),
+ void *arg);
+
+/**
+ * io_new_listener - create a new accepting listener.
+ * @fd: the file descriptor.
+ * @start: the first function to call on new connections.
+ * @finish: the function to call when the connection is closed or fails.
+ * @arg: the argument to both @start and @finish.
+ *
+ * When @fd becomes readable, we accept() and turn that fd into a new
+ * connection.
+ *
+ * Returns NULL on error (and sets errno).
+ */
+#define io_new_listener(fd, start, finish, arg) \
+ io_new_listener_((fd), \
+ typesafe_cb_preargs(struct io_op *, void *, \
+ (start), (arg), \
+ struct io_conn *), \
+ typesafe_cb_preargs(void, void *, (finish), \
+ (arg), struct io_conn *), \
+ (arg))
+struct io_listener *io_new_listener_(int fd,
+ struct io_op *(*start)(struct io_conn *,
+ void *arg),
+ void (*finish)(struct io_conn *,
+ void *arg),
+ void *arg);
+
+/**
+ * io_close_listener - delete a listener.
+ * @listener: the listener returned from io_new_listener.
+ *
+ * This closes the fd and frees @listener.
+ */
+void io_close_listener(struct io_listener *listener);
+
+/**
+ * io_write - queue data to be written.
+ * @data: the data buffer.
+ * @len: the length to write.
+ * @next: what to call next.
+ *
+ * This will queue the data buffer for writing. Once it's all written, the
+ * function registered with io_next() will be called: on an error, the finish
+ * function is called instead.
+ *
+ * Note that the I/O may actually be done immediately.
+ */
+struct io_op *io_write(const void *data, size_t len, struct io_next *next);
+
+/**
+ * io_read - queue buffer to be read.
+ * @data: the data buffer.
+ * @len: the length to read.
+ * @next: what to call next.
+ *
+ * This will queue the data buffer for reading. Once it's all read, the
+ * function registered with io_next() will be called: on an error, the finish
+ * function is called instead.
+ *
+ * Note that the I/O may actually be done immediately.
+ */
+struct io_op *io_read(void *data, size_t len, struct io_next *next);
+
+/**
+ * io_read_partial - queue buffer to be read (partial OK).
+ * @data: the data buffer.
+ * @len: the maximum length to read, set to the length actually read.
+ * @next: what to call next.
+ *
+ * This will queue the data buffer for reading. Once any data is
+ * read, @len is updated and the function registered with io_next()
+ * will be called: on an error, the finish function is called instead.
+ *
+ * Note that the I/O may actually be done immediately.
+ */
+struct io_op *io_read_partial(void *data, size_t *len, struct io_next *next);
+
+/**
+ * io_write_partial - queue data to be written (partial OK).
+ * @data: the data buffer.
+ * @len: the maximum length to write, set to the length actually written.
+ * @next: what to call next.
+ *
+ * This will queue the data buffer for writing. Once any data is
+ * written, @len is updated and the function registered with io_next()
+ * will be called: on an error, the finish function is called instead.
+ *
+ * Note that the I/O may actually be done immediately.
+ */
+struct io_op *io_write_partial(const void *data, size_t *len,
+ struct io_next *next);
+
+/**
+ * io_idle - explicitly note that this connection will do nothing.
+ * @conn: the current connection.
+ *
+ * This indicates the connection is idle: some other function will
+ * later call io_read/io_write etc. (or io_close) on it, in which case
+ * it will do that.
+ */
+struct io_op *io_idle(struct io_conn *conn);
+
+/**
+ * io_wake - wake up and idle connection.
+ * @conn: an idle connection.
+ * @next: the next function to call once queued IO is complete.
+ * @arg: the argument to @next.
+ *
+ * This makes @conn run its @next function the next time around the
+ * io_loop().
+ */
+#define io_wake(conn, next, arg) \
+ io_wake_((conn), \
+ typesafe_cb_preargs(struct io_op *, void *, \
+ (next), (arg), struct io_conn *), \
+ (arg))
+void io_wake_(struct io_conn *conn,
+ struct io_op *(*next)(struct io_conn *, void *), void *arg);
+
+/**
+ * io_break - return from io_loop()
+ * @arg: non-NULL value to return from io_loop().
+ * @next: what to call next (can be NULL if we expect no return).
+ *
+ * This breaks out of the io_loop. As soon as the current @next
+ * function returns, any io_closed()'d connections will have their
+ * finish callbacks called, then io_loop() with return with @arg.
+ *
+ * If io_loop() is called again, then @next will be called.
+ */
+struct io_op *io_break(void *arg, struct io_next *next);
+
+/**
+ * io_next - indicate what callback to call next.
+ * @conn: this connection.
+ * @next: the next function to call once queued IO is complete.
+ * @arg: the argument to @next.
+ *
+ * Every @next (or @start) function should "return io_next(...);" once
+ * they have indicated what io to perform (eg. io_write, io_idle).
+ * The exception is io_close(), which can be used instead of io_next().
+ *
+ * Note that as an optimization, the next function may be called
+ * immediately, which is why this should be the last statement in your
+ * function.
+ */
+#define io_next(conn, next, arg) \
+ io_next_((conn), \
+ typesafe_cb_preargs(struct io_op *, void *, \
+ (next), (arg), struct io_conn *), \
+ (arg))
+struct io_next *io_next_(struct io_conn *conn,
+ struct io_op *(*next)(struct io_conn *, void *arg),
+ void *arg);
+
+/* FIXME: io_recvfrom/io_sendto */
+
+/**
+ * io_close - terminate a connection.
+ * @conn: any connection.
+ *
+ * The schedules a connection to be closed. It can be done on any
+ * connection, whether it has I/O queued or not (though that I/O may
+ * be performed first).
+ *
+ * It's common to 'return io_close(...)' from a @next function, but
+ * io_close can also be used as an argument to io_next().
+ */
+struct io_op *io_close(struct io_conn *, void *unused);
+
+/**
+ * io_loop - process fds until all closed on io_break.
+ *
+ * This is the core loop; it exits with the io_break() arg, or NULL if
+ * all connections and listeners are closed.
+ */
+void *io_loop(void);
+#endif /* CCAN_IO_H */
--- /dev/null
+/* Licensed under BSD-MIT - see LICENSE file for details */
+#include "io.h"
+#include "backend.h"
+#include <assert.h>
+#include <poll.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+
+static size_t num_fds = 0, max_fds = 0, num_next = 0, num_finished = 0;
+static struct pollfd *pollfds = NULL;
+static struct fd **fds = NULL;
+
+static bool add_fd(struct fd *fd, short events)
+{
+ if (num_fds + 1 > max_fds) {
+ struct pollfd *newpollfds;
+ struct fd **newfds;
+ size_t num = max_fds ? max_fds * 2 : 8;
+
+ newpollfds = realloc(pollfds, sizeof(*newpollfds) * num);
+ if (!newpollfds)
+ return false;
+ pollfds = newpollfds;
+ newfds = realloc(fds, sizeof(*newfds) * num);
+ if (!newfds)
+ return false;
+ fds = newfds;
+ max_fds = num;
+ }
+
+ pollfds[num_fds].fd = fd->fd;
+ pollfds[num_fds].events = events;
+ pollfds[num_fds].revents = 0; /* In case we're iterating now */
+ fds[num_fds] = fd;
+ fd->backend_info = num_fds;
+ num_fds++;
+ return true;
+}
+
+static void del_fd(struct fd *fd)
+{
+ size_t n = fd->backend_info;
+
+ assert(n != -1);
+ assert(n < num_fds);
+ if (n != num_fds - 1) {
+ /* Move last one over us. */
+ pollfds[n] = pollfds[num_fds-1];
+ fds[n] = fds[num_fds-1];
+ assert(fds[n]->backend_info == num_fds-1);
+ fds[n]->backend_info = n;
+ } else if (num_fds == 1) {
+ /* Free everything when no more fds. */
+ free(pollfds);
+ free(fds);
+ pollfds = NULL;
+ fds = NULL;
+ max_fds = 0;
+ }
+ num_fds--;
+ fd->backend_info = -1;
+ close(fd->fd);
+}
+
+bool add_listener(struct io_listener *l)
+{
+ return add_fd(&l->fd, POLLIN);
+}
+
+bool add_conn(struct io_conn *c)
+{
+ if (!add_fd(&c->fd, 0))
+ return false;
+ num_next++;
+ return true;
+}
+
+static void del_conn(struct io_conn *conn)
+{
+ if (conn->fd.finish)
+ conn->fd.finish(conn, conn->fd.finish_arg);
+ del_fd(&conn->fd);
+ if (conn->state == FINISHED)
+ num_finished--;
+ else if (conn->state == NEXT)
+ num_next--;
+}
+
+void del_listener(struct io_listener *l)
+{
+ del_fd(&l->fd);
+}
+
+void backend_set_state(struct io_conn *conn, struct io_op *op)
+{
+ enum io_state state = from_ioop(op);
+ struct pollfd *pfd = &pollfds[conn->fd.backend_info];
+
+ switch (state) {
+ case READ:
+ case READPART:
+ pfd->events = POLLIN;
+ break;
+ case WRITE:
+ case WRITEPART:
+ pfd->events = POLLOUT;
+ break;
+ case IDLE:
+ pfd->events = 0;
+ break;
+ case NEXT:
+ num_next++;
+ break;
+ case FINISHED:
+ num_finished++;
+ break;
+ default:
+ abort();
+ }
+ conn->state = state;
+}
+
+static void accept_conn(struct io_listener *l)
+{
+ struct io_conn *c;
+ int fd = accept(l->fd.fd, NULL, NULL);
+
+ /* FIXME: What to do here? */
+ if (fd < 0)
+ return;
+ c = io_new_conn(fd, l->fd.next, l->fd.finish, l->fd.next_arg);
+ if (!c) {
+ close(fd);
+ return;
+ }
+}
+
+/* It's OK to miss some, as long as we make progress. */
+static void finish_and_next(bool finished_only)
+{
+ unsigned int i;
+
+ for (i = 0; !io_loop_return && i < num_fds; i++) {
+ struct io_conn *c;
+
+ if (!num_finished) {
+ if (finished_only || num_next == 0)
+ break;
+ }
+ if (fds[i]->listener)
+ continue;
+ c = (void *)fds[i];
+ if (c->state == FINISHED) {
+ del_conn(c);
+ free(c);
+ i--;
+ } else if (!finished_only && c->state == NEXT) {
+ backend_set_state(c, c->fd.next(c, c->fd.next_arg));
+ num_next--;
+ }
+ }
+}
+
+/* This is the main loop. */
+void *io_loop(void)
+{
+ void *ret;
+
+ while (!io_loop_return) {
+ int i, r;
+
+ if (num_finished || num_next) {
+ finish_and_next(false);
+ /* Could have started/finished more. */
+ continue;
+ }
+
+ if (num_fds == 0)
+ break;
+
+ r = poll(pollfds, num_fds, -1);
+ if (r < 0)
+ break;
+
+ for (i = 0; i < num_fds && !io_loop_return; i++) {
+ struct io_conn *c = (void *)fds[i];
+ if (pollfds[i].revents & POLLOUT)
+ backend_set_state(c, do_writeable(c));
+ else if (pollfds[i].revents & POLLIN) {
+ if (fds[i]->listener)
+ accept_conn((void *)c);
+ else
+ backend_set_state(c, do_readable(c));
+ } else if (pollfds[i].revents & POLLHUP) {
+ backend_set_state(c, io_close(c, NULL));
+ }
+ }
+ }
+
+ while (num_finished)
+ finish_and_next(true);
+
+ ret = io_loop_return;
+ io_loop_return = NULL;
+ return ret;
+}
--- /dev/null
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+static struct io_op *start_ok(struct io_conn *conn, int *state)
+{
+ ok1(*state == 0);
+ (*state)++;
+ return io_close(conn, NULL);
+}
+
+static void finish_ok(struct io_conn *conn, int *state)
+{
+ ok1(*state == 1);
+ (*state)++;
+ io_break(state + 1, NULL);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+ int fd, on = 1;
+ struct addrinfo *addrinfo, hints;
+
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_PASSIVE;
+ hints.ai_protocol = 0;
+
+ if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+ return -1;
+
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ return -1;
+
+ setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+ if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+ close(fd);
+ return -1;
+ }
+ if (listen(fd, 1) != 0) {
+ close(fd);
+ return -1;
+ }
+ *info = addrinfo;
+ return fd;
+}
+
+int main(void)
+{
+ int state = 0;
+ struct addrinfo *addrinfo;
+ struct io_listener *l;
+ int fd;
+
+ /* This is how many tests you plan to run */
+ plan_tests(9);
+ fd = make_listen_fd("65001", &addrinfo);
+ ok1(fd >= 0);
+ l = io_new_listener(fd, start_ok, finish_ok, &state);
+ ok1(l);
+ fflush(stdout);
+ if (!fork()) {
+ io_close_listener(l);
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ exit(1);
+ if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+ exit(2);
+ close(fd);
+ freeaddrinfo(addrinfo);
+ exit(0);
+ }
+ freeaddrinfo(addrinfo);
+ ok1(io_loop() == &state + 1);
+ ok1(state == 2);
+ io_close_listener(l);
+ ok1(wait(&state));
+ ok1(WIFEXITED(state));
+ ok1(WEXITSTATUS(state) == 0);
+
+ /* This exits depending on whether all tests passed */
+ return exit_status();
+}
--- /dev/null
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+struct data {
+ int state;
+ char buf[4];
+};
+
+static struct io_op *start_ok(struct io_conn *conn, struct data *d)
+{
+ ok1(d->state == 0);
+ d->state++;
+ return io_read(d->buf, sizeof(d->buf), io_next(conn, io_close, d));
+}
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+ ok1(d->state == 1);
+ d->state++;
+ io_break(d, NULL);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+ int fd, on = 1;
+ struct addrinfo *addrinfo, hints;
+
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_PASSIVE;
+ hints.ai_protocol = 0;
+
+ if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+ return -1;
+
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ return -1;
+
+ setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+ if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+ close(fd);
+ return -1;
+ }
+ if (listen(fd, 1) != 0) {
+ close(fd);
+ return -1;
+ }
+ *info = addrinfo;
+ return fd;
+}
+
+int main(void)
+{
+ struct data *d = malloc(sizeof(*d));
+ struct addrinfo *addrinfo;
+ struct io_listener *l;
+ int fd, status;
+
+ /* This is how many tests you plan to run */
+ plan_tests(10);
+ d->state = 0;
+ fd = make_listen_fd("65002", &addrinfo);
+ ok1(fd >= 0);
+ l = io_new_listener(fd, start_ok, finish_ok, d);
+ ok1(l);
+ fflush(stdout);
+ if (!fork()) {
+ int i;
+
+ io_close_listener(l);
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ exit(1);
+ if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+ exit(2);
+ signal(SIGPIPE, SIG_IGN);
+ for (i = 0; i < strlen("hellothere"); i++) {
+ if (write(fd, "hellothere" + i, 1) != 1)
+ break;
+ }
+ close(fd);
+ freeaddrinfo(addrinfo);
+ free(d);
+ exit(0);
+ }
+ freeaddrinfo(addrinfo);
+ ok1(io_loop() == d);
+ ok1(d->state == 2);
+ ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
+ free(d);
+ io_close_listener(l);
+
+ ok1(wait(&status));
+ ok1(WIFEXITED(status));
+ ok1(WEXITSTATUS(status) == 0);
+
+ /* This exits depending on whether all tests passed */
+ return exit_status();
+}
--- /dev/null
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+struct data {
+ int state;
+ size_t bytes;
+ char buf[4];
+};
+
+static struct io_op *start_ok(struct io_conn *conn, struct data *d)
+{
+ ok1(d->state == 0);
+ d->state++;
+ d->bytes = sizeof(d->buf);
+ return io_read_partial(d->buf, &d->bytes, io_next(conn, io_close, d));
+}
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+ ok1(d->state == 1);
+ d->state++;
+ io_break(d, NULL);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+ int fd, on = 1;
+ struct addrinfo *addrinfo, hints;
+
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_PASSIVE;
+ hints.ai_protocol = 0;
+
+ if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+ return -1;
+
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ return -1;
+
+ setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+ if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+ close(fd);
+ return -1;
+ }
+ if (listen(fd, 1) != 0) {
+ close(fd);
+ return -1;
+ }
+ *info = addrinfo;
+ return fd;
+}
+
+static void write_to_socket(const char *str, const struct addrinfo *addrinfo)
+{
+ int fd, i;
+
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ exit(1);
+ if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+ exit(2);
+ signal(SIGPIPE, SIG_IGN);
+ for (i = 0; i < strlen(str); i++) {
+ if (write(fd, str + i, 1) != 1)
+ break;
+ }
+ close(fd);
+}
+
+int main(void)
+{
+ struct data *d = malloc(sizeof(*d));
+ struct addrinfo *addrinfo;
+ struct io_listener *l;
+ int fd, status;
+
+ /* This is how many tests you plan to run */
+ plan_tests(22);
+ d->state = 0;
+ fd = make_listen_fd("65003", &addrinfo);
+ ok1(fd >= 0);
+ l = io_new_listener(fd, start_ok, finish_ok, d);
+ ok1(l);
+ fflush(stdout);
+ if (!fork()) {
+ io_close_listener(l);
+ write_to_socket("hellothere", addrinfo);
+ freeaddrinfo(addrinfo);
+ free(d);
+ exit(0);
+ }
+ ok1(io_loop() == d);
+ ok1(d->state == 2);
+ ok1(d->bytes > 0);
+ ok1(d->bytes <= sizeof(d->buf));
+ ok1(memcmp(d->buf, "hellothere", d->bytes) == 0);
+
+ ok1(wait(&status));
+ ok1(WIFEXITED(status));
+ ok1(WEXITSTATUS(status) == 0);
+
+ fflush(stdout);
+ if (!fork()) {
+ io_close_listener(l);
+ write_to_socket("hi", addrinfo);
+ freeaddrinfo(addrinfo);
+ free(d);
+ exit(0);
+ }
+ d->state = 0;
+ ok1(io_loop() == d);
+ ok1(d->state == 2);
+ ok1(d->bytes > 0);
+ ok1(d->bytes <= strlen("hi"));
+ ok1(memcmp(d->buf, "hi", d->bytes) == 0);
+
+ freeaddrinfo(addrinfo);
+ free(d);
+ io_close_listener(l);
+
+ ok1(wait(&status));
+ ok1(WIFEXITED(status));
+ ok1(WEXITSTATUS(status) == 0);
+
+ /* This exits depending on whether all tests passed */
+ return exit_status();
+}
--- /dev/null
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+struct data {
+ int state;
+ size_t bytes;
+ char *buf;
+};
+
+static struct io_op *start_ok(struct io_conn *conn, struct data *d)
+{
+ ok1(d->state == 0);
+ d->state++;
+ return io_write_partial(d->buf, &d->bytes, io_next(conn, io_close, d));
+}
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+ ok1(d->state == 1);
+ d->state++;
+ io_break(d, NULL);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+ int fd, on = 1;
+ struct addrinfo *addrinfo, hints;
+
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_PASSIVE;
+ hints.ai_protocol = 0;
+
+ if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+ return -1;
+
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ return -1;
+
+ setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+ if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+ close(fd);
+ return -1;
+ }
+ if (listen(fd, 1) != 0) {
+ close(fd);
+ return -1;
+ }
+ *info = addrinfo;
+ return fd;
+}
+
+static void read_from_socket(const char *str, const struct addrinfo *addrinfo)
+{
+ int fd;
+ char buf[100];
+
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ exit(1);
+ if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+ exit(2);
+ if (read(fd, buf, strlen(str)) != strlen(str))
+ exit(3);
+ if (memcmp(buf, str, strlen(str)) != 0)
+ exit(4);
+ close(fd);
+}
+
+int main(void)
+{
+ struct data *d = malloc(sizeof(*d));
+ struct addrinfo *addrinfo;
+ struct io_listener *l;
+ int fd, status;
+
+ /* This is how many tests you plan to run */
+ plan_tests(11);
+ d->state = 0;
+ d->bytes = 1024*1024;
+ d->buf = malloc(d->bytes);
+ memset(d->buf, 'a', d->bytes);
+ fd = make_listen_fd("65004", &addrinfo);
+ ok1(fd >= 0);
+ l = io_new_listener(fd, start_ok, finish_ok, d);
+ ok1(l);
+ fflush(stdout);
+ if (!fork()) {
+ io_close_listener(l);
+ read_from_socket("aaaaaa", addrinfo);
+ freeaddrinfo(addrinfo);
+ free(d->buf);
+ free(d);
+ exit(0);
+ }
+ ok1(io_loop() == d);
+ ok1(d->state == 2);
+ ok1(d->bytes > 0);
+ ok1(d->bytes <= 1024*1024);
+
+ ok1(wait(&status));
+ ok1(WIFEXITED(status));
+ ok1(WEXITSTATUS(status) == 0);
+
+ freeaddrinfo(addrinfo);
+ free(d->buf);
+ free(d);
+ io_close_listener(l);
+
+ /* This exits depending on whether all tests passed */
+ return exit_status();
+}
--- /dev/null
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+struct data {
+ int state;
+ size_t bytes;
+ char *buf;
+};
+
+static struct io_op *start_ok(struct io_conn *conn, struct data *d)
+{
+ ok1(d->state == 0);
+ d->state++;
+ return io_write(d->buf, d->bytes, io_next(conn, io_close, d));
+}
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+ ok1(d->state == 1);
+ d->state++;
+ io_break(d, NULL);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+ int fd, on = 1;
+ struct addrinfo *addrinfo, hints;
+
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_PASSIVE;
+ hints.ai_protocol = 0;
+
+ if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+ return -1;
+
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ return -1;
+
+ setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+ if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+ close(fd);
+ return -1;
+ }
+ if (listen(fd, 1) != 0) {
+ close(fd);
+ return -1;
+ }
+ *info = addrinfo;
+ return fd;
+}
+
+static void read_from_socket(size_t bytes, const struct addrinfo *addrinfo)
+{
+ int fd, done, r;
+ char buf[100];
+
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ exit(1);
+ if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+ exit(2);
+
+ for (done = 0; done < bytes; done += r) {
+ r = read(fd, buf, sizeof(buf));
+ if (r < 0)
+ exit(3);
+ done += r;
+ }
+ close(fd);
+}
+
+int main(void)
+{
+ struct data *d = malloc(sizeof(*d));
+ struct addrinfo *addrinfo;
+ struct io_listener *l;
+ int fd, status;
+
+ /* This is how many tests you plan to run */
+ plan_tests(9);
+ d->state = 0;
+ d->bytes = 1024*1024;
+ d->buf = malloc(d->bytes);
+ memset(d->buf, 'a', d->bytes);
+ fd = make_listen_fd("65005", &addrinfo);
+ ok1(fd >= 0);
+ l = io_new_listener(fd, start_ok, finish_ok, d);
+ ok1(l);
+ fflush(stdout);
+ if (!fork()) {
+ io_close_listener(l);
+ read_from_socket(d->bytes, addrinfo);
+ freeaddrinfo(addrinfo);
+ free(d->buf);
+ free(d);
+ exit(0);
+ }
+ ok1(io_loop() == d);
+ ok1(d->state == 2);
+
+ ok1(wait(&status));
+ ok1(WIFEXITED(status));
+ ok1(WEXITSTATUS(status) == 0);
+
+ freeaddrinfo(addrinfo);
+ free(d->buf);
+ free(d);
+ io_close_listener(l);
+
+ /* This exits depending on whether all tests passed */
+ return exit_status();
+}
--- /dev/null
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+static struct io_conn *idler;
+
+struct data {
+ int state;
+ char buf[4];
+};
+
+static struct io_op *do_read(struct io_conn *conn, struct data *d)
+{
+ ok1(d->state == 2 || d->state == 3);
+ d->state++;
+ return io_read(d->buf, sizeof(d->buf), io_next(conn, io_close, d));
+}
+
+static struct io_op *start_waker(struct io_conn *conn, struct data *d)
+{
+ ok1(d->state == 1);
+ d->state++;
+
+ io_wake(idler, do_read, d);
+ return io_close(conn, NULL);
+}
+
+static void finish_waker(struct io_conn *conn, struct data *d)
+{
+ ok1(d->state == 2 || d->state == 3);
+ d->state++;
+}
+
+static struct io_op *start_idle(struct io_conn *conn, struct data *d)
+{
+ int fd;
+
+ ok1(d->state == 0);
+ d->state++;
+ idler = conn;
+
+ /* This will wake us up. */
+ fd = open("/dev/null", O_RDONLY);
+ ok1(fd >= 0);
+ ok1(io_new_conn(fd, start_waker, finish_waker, d));
+
+ return io_idle(conn);
+}
+
+static void finish_idle(struct io_conn *conn, struct data *d)
+{
+ ok1(d->state == 4);
+ d->state++;
+ io_break(d, NULL);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+ int fd, on = 1;
+ struct addrinfo *addrinfo, hints;
+
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_PASSIVE;
+ hints.ai_protocol = 0;
+
+ if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+ return -1;
+
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ return -1;
+
+ setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+ if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+ close(fd);
+ return -1;
+ }
+ if (listen(fd, 1) != 0) {
+ close(fd);
+ return -1;
+ }
+ *info = addrinfo;
+ return fd;
+}
+
+int main(void)
+{
+ struct data *d = malloc(sizeof(*d));
+ struct addrinfo *addrinfo;
+ struct io_listener *l;
+ int fd, status;
+
+ /* This is how many tests you plan to run */
+ plan_tests(15);
+ d->state = 0;
+ fd = make_listen_fd("65006", &addrinfo);
+ ok1(fd >= 0);
+ l = io_new_listener(fd, start_idle, finish_idle, d);
+ ok1(l);
+ fflush(stdout);
+ if (!fork()) {
+ int i;
+
+ io_close_listener(l);
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ exit(1);
+ if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+ exit(2);
+ signal(SIGPIPE, SIG_IGN);
+ for (i = 0; i < strlen("hellothere"); i++) {
+ if (write(fd, "hellothere" + i, 1) != 1)
+ break;
+ }
+ close(fd);
+ freeaddrinfo(addrinfo);
+ free(d);
+ exit(0);
+ }
+ freeaddrinfo(addrinfo);
+
+ ok1(io_loop() == d);
+ ok1(d->state == 5);
+ ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
+ free(d);
+ io_close_listener(l);
+
+ ok1(wait(&status));
+ ok1(WIFEXITED(status));
+ ok1(WEXITSTATUS(status) == 0);
+
+ /* This exits depending on whether all tests passed */
+ return exit_status();
+}
--- /dev/null
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+struct data {
+ int state;
+ char buf[4];
+};
+
+static struct io_op *do_read(struct io_conn *conn, struct data *d)
+{
+ ok1(d->state == 1);
+ d->state++;
+ return io_read(d->buf, sizeof(d->buf), io_next(conn, io_close, d));
+}
+
+static struct io_op *start_break(struct io_conn *conn, struct data *d)
+{
+ ok1(d->state == 0);
+ d->state++;
+ return io_break(d, io_next(conn, do_read, d));
+}
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+ ok1(d->state == 2);
+ d->state++;
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+ int fd, on = 1;
+ struct addrinfo *addrinfo, hints;
+
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_PASSIVE;
+ hints.ai_protocol = 0;
+
+ if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+ return -1;
+
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ return -1;
+
+ setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+ if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+ close(fd);
+ return -1;
+ }
+ if (listen(fd, 1) != 0) {
+ close(fd);
+ return -1;
+ }
+ *info = addrinfo;
+ return fd;
+}
+
+int main(void)
+{
+ struct data *d = malloc(sizeof(*d));
+ struct addrinfo *addrinfo;
+ struct io_listener *l;
+ int fd, status;
+
+ /* This is how many tests you plan to run */
+ plan_tests(13);
+ d->state = 0;
+ fd = make_listen_fd("65007", &addrinfo);
+ ok1(fd >= 0);
+ l = io_new_listener(fd, start_break, finish_ok, d);
+ ok1(l);
+ fflush(stdout);
+ if (!fork()) {
+ int i;
+
+ io_close_listener(l);
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ exit(1);
+ if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+ exit(2);
+ signal(SIGPIPE, SIG_IGN);
+ for (i = 0; i < strlen("hellothere"); i++) {
+ if (write(fd, "hellothere" + i, 1) != 1)
+ break;
+ }
+ close(fd);
+ freeaddrinfo(addrinfo);
+ free(d);
+ exit(0);
+ }
+ freeaddrinfo(addrinfo);
+ ok1(io_loop() == d);
+ ok1(d->state == 1);
+ io_close_listener(l);
+
+ ok1(io_loop() == NULL);
+ ok1(d->state == 3);
+ ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
+ free(d);
+
+ ok1(wait(&status));
+ ok1(WIFEXITED(status));
+ ok1(WEXITSTATUS(status) == 0);
+
+ /* This exits depending on whether all tests passed */
+ return exit_status();
+}
--- /dev/null
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#define NUM 100
+#define NUM_ITERS 1000
+
+struct buffer {
+ int iters;
+ struct io_conn *reader, *writer;
+ char buf[32];
+};
+
+static struct io_op *poke_writer(struct io_conn *conn, struct buffer *buf);
+static struct io_op *poke_reader(struct io_conn *conn, struct buffer *buf);
+
+static struct io_op *do_read(struct io_conn *conn, struct buffer *buf)
+{
+ assert(conn == buf->reader);
+
+ return io_read(&buf->buf, sizeof(buf->buf),
+ io_next(conn, poke_writer, buf));
+}
+
+static struct io_op *do_write(struct io_conn *conn, struct buffer *buf)
+{
+ assert(conn == buf->writer);
+
+ return io_write(&buf->buf, sizeof(buf->buf),
+ io_next(conn, poke_reader, buf));
+}
+
+static struct io_op *poke_writer(struct io_conn *conn, struct buffer *buf)
+{
+ assert(conn == buf->reader);
+
+ if (buf->iters == NUM_ITERS)
+ return io_close(conn, NULL);
+
+ /* You write. */
+ io_wake(buf->writer, do_write, buf);
+
+ /* I'll wait until you wake me. */
+ return io_idle(conn);
+}
+
+static struct io_op *poke_reader(struct io_conn *conn, struct buffer *buf)
+{
+ assert(conn == buf->writer);
+ /* You read. */
+ io_wake(buf->reader, do_read, buf);
+
+ if (++buf->iters == NUM_ITERS)
+ return io_close(conn, NULL);
+
+ /* I'll wait until you tell me to write. */
+ return io_idle(conn);
+}
+
+static struct io_op *reader(struct io_conn *conn, struct buffer *buf)
+{
+ assert(conn == buf->reader);
+
+ /* Wait for writer to tell us to read. */
+ return io_idle(conn);
+}
+
+static struct buffer buf[NUM];
+
+int main(void)
+{
+ unsigned int i;
+ int fds[2], last_read, last_write;
+
+ plan_tests(5 + NUM);
+
+ ok1(pipe(fds) == 0);
+ last_read = fds[0];
+ last_write = fds[1];
+
+ for (i = 1; i < NUM; i++) {
+ if (pipe(fds) < 0)
+ break;
+ memset(buf[i].buf, i, sizeof(buf[i].buf));
+ sprintf(buf[i].buf, "%i-%i", i, i);
+
+ buf[i].reader = io_new_conn(last_read, reader, NULL, &buf[i]);
+ if (!buf[i].reader)
+ break;
+ buf[i].writer = io_new_conn(fds[1], do_write, NULL, &buf[i]);
+ if (!buf[i].writer)
+ break;
+ last_read = fds[0];
+ }
+ if (!ok1(i == NUM))
+ exit(exit_status());
+
+ /* Last one completes the cirle. */
+ i = 0;
+ sprintf(buf[i].buf, "%i-%i", i, i);
+ buf[i].reader = io_new_conn(last_read, reader, NULL, &buf[i]);
+ ok1(buf[i].reader);
+ buf[i].writer = io_new_conn(last_write, do_write, NULL, &buf[i]);
+ ok1(buf[i].writer);
+
+ /* They should eventually exit */
+ ok1(io_loop() == NULL);
+
+ for (i = 0; i < NUM; i++) {
+ char b[sizeof(buf[0].buf)];
+ memset(b, i, sizeof(b));
+ sprintf(b, "%i-%i", i, i);
+ ok1(memcmp(b, buf[(i + NUM_ITERS) % NUM].buf, sizeof(b)) == 0);
+ }
+
+ /* This exits depending on whether all tests passed */
+ return exit_status();
+}