]> git.ozlabs.org Git - ccan/commitdiff
ccan/io: new module.
authorRusty Russell <rusty@rustcorp.com.au>
Mon, 14 Oct 2013 10:04:07 +0000 (20:34 +1030)
committerRusty Russell <rusty@rustcorp.com.au>
Mon, 14 Oct 2013 10:04:07 +0000 (20:34 +1030)
Designed for async I/O.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
15 files changed:
Makefile-ccan
ccan/io/LICENSE [new symlink]
ccan/io/_info [new file with mode: 0644]
ccan/io/backend.h [new file with mode: 0644]
ccan/io/io.c [new file with mode: 0644]
ccan/io/io.h [new file with mode: 0644]
ccan/io/poll.c [new file with mode: 0644]
ccan/io/test/run-01-start-finish.c [new file with mode: 0644]
ccan/io/test/run-02-read.c [new file with mode: 0644]
ccan/io/test/run-03-readpartial.c [new file with mode: 0644]
ccan/io/test/run-04-writepartial.c [new file with mode: 0644]
ccan/io/test/run-05-write.c [new file with mode: 0644]
ccan/io/test/run-06-idle.c [new file with mode: 0644]
ccan/io/test/run-07-break.c [new file with mode: 0644]
ccan/io/test/run-10-many.c [new file with mode: 0644]

index 86d6ac0c44e3ca149d1e0fefb2448379bf5df4d3..7781657766f1268690caa0dc73bb2a7a490102a1 100644 (file)
@@ -55,6 +55,7 @@ MODS_WITH_SRC := antithread \
        htable \
        idtree \
        ilog \
+       io \
        isaac \
        iscsi \
        jmap \
diff --git a/ccan/io/LICENSE b/ccan/io/LICENSE
new file mode 120000 (symlink)
index 0000000..2354d12
--- /dev/null
@@ -0,0 +1 @@
+../../licenses/BSD-MIT
\ No newline at end of file
diff --git a/ccan/io/_info b/ccan/io/_info
new file mode 100644 (file)
index 0000000..f494c12
--- /dev/null
@@ -0,0 +1,174 @@
+#include <stdio.h>
+#include <string.h>
+#include "config.h"
+
+/**
+ * io - simple library for stateful io handling.
+ *
+ * io provides a simple mechanism to write I/O servers with multiple
+ * connections.  Handling of connections is multiplexed, and function
+ * indicate what they want written or read, and what follow-on
+ * function to call on success (or failure).
+ *
+ * Example:
+ * // Given tr A-Z a-z outputs tr a-z a-z
+ * #include <ccan/io/io.h>
+ * #include <ccan/err/err.h>
+ * #include <assert.h>
+ * #include <stdlib.h>
+ * #include <signal.h>
+ * #include <sys/types.h>
+ * #include <sys/wait.h>
+ *
+ * struct buffer {
+ *     size_t max, off, rlen;
+ *     char *buf;
+ * };
+ *
+ * struct stdin_buffer {
+ *     struct io_conn *reader, *writer;
+ *     size_t len;
+ *     char inbuf[4096];
+ * };
+ *
+ * // This reads from stdin.
+ * static struct io_op *wake_writer(struct io_conn *, struct stdin_buffer *);
+ * // This writes the stdin buffer to the child.
+ * static struct io_op *write_to_child(struct io_conn *c,
+ *                                    struct stdin_buffer *b);
+ * static struct io_op *read_stdin(struct io_conn *c, struct stdin_buffer *b)
+ * {
+ *     assert(c == b->reader);
+ *     b->len = sizeof(b->inbuf);
+ *     return io_read_partial(b->inbuf, &b->len,
+ *                            io_next(c, wake_writer, b));
+ * }
+ *
+ * static struct io_op *wake_writer(struct io_conn *c, struct stdin_buffer *b)
+ * {
+ *     assert(c == b->reader);
+ *     io_wake(b->writer, write_to_child, b);
+ *     return io_idle(c);
+ * }
+ *
+ * static void reader_exit(struct io_conn *c, struct stdin_buffer *b)
+ * {
+ *     assert(c == b->reader);
+ *     io_wake(b->writer, write_to_child, b);
+ *     b->reader = NULL;
+ * }
+ *
+ * static struct io_op *wake_reader(struct io_conn *c, struct stdin_buffer *b)
+ * {
+ *     assert(c == b->writer);
+ *     io_wake(b->reader, read_stdin, b);
+ *     return io_idle(c);
+ * }
+ *
+ * static struct io_op *write_to_child(struct io_conn *conn,
+ *                                   struct stdin_buffer *b)
+ * {
+ *     assert(conn == b->writer);
+ *     if (!b->reader)
+ *             return io_close(conn, NULL);
+ *     return io_write(b->inbuf, b->len, io_next(conn, wake_reader, b));
+ * }
+ *
+ * static struct io_op *start_writer(struct io_conn *conn,
+ *                                  struct stdin_buffer *b)
+ * {
+ *     assert(conn == b->writer);
+ *     return io_idle(conn);
+ * }
+ *
+ * static void fail_child_write(struct io_conn *conn, struct stdin_buffer *b)
+ * {
+ *     if (b->reader)
+ *             err(1, "Failed writing to child.");
+ * }
+ *
+ * // This reads from the child and saves it into buffer.
+ * static struct io_op *read_from_child(struct io_conn *conn,
+ *                                     struct buffer *b)
+ * {
+ *     b->off += b->rlen;
+ *
+ *     if (b->off == b->max) {
+ *             if (b->max == 0)
+ *                     b->max = 128;
+ *             else if (b->max >= 1024*1024)
+ *                     b->max += 1024*1024;
+ *             else
+ *                     b->max *= 2;
+ *             b->buf = realloc(b->buf, b->max);
+ *     }
+ *
+ *     b->rlen = b->max - b->off;
+ *     return io_read_partial(b->buf + b->off, &b->rlen,
+ *                            io_next(conn, read_from_child, b));
+ * }
+ *
+ * // Feed a program our stdin, gather its stdout, print that at end.
+ * int main(int argc, char *argv[])
+ * {
+ *     int tochild[2], fromchild[2];
+ *     struct buffer out = { 0, 0, 0, NULL };
+ *     struct stdin_buffer sbuf;
+ *     int status;
+ *     size_t off;
+ *     ssize_t ret;
+ *
+ *     if (argc == 1)
+ *             errx(1, "Usage: runner <cmdline>...");
+ *
+ *     if (pipe(tochild) != 0 || pipe(fromchild) != 0)
+ *             err(1, "Creating pipes");
+ *
+ *     if (!fork()) {
+ *             // Child runs command.
+ *             close(tochild[1]);
+ *             close(fromchild[0]);
+ *
+ *             dup2(tochild[0], STDIN_FILENO);
+ *             dup2(fromchild[1], STDOUT_FILENO);
+ *             execvp(argv[1], argv + 1);
+ *             exit(127);
+ *     }
+ *
+ *     close(tochild[0]);
+ *     close(fromchild[1]);
+ *     signal(SIGPIPE, SIG_IGN);
+ *
+ *     sbuf.reader = io_new_conn(STDIN_FILENO, read_stdin, reader_exit, &sbuf);
+ *     sbuf.writer = io_new_conn(tochild[1], start_writer, fail_child_write,
+ *                               &sbuf);
+ *     if (!sbuf.reader || !sbuf.writer
+ *         || !io_new_conn(fromchild[0], read_from_child, NULL, &out))
+ *             err(1, "Allocating connections");
+ *
+ *     io_loop();
+ *     wait(&status);
+ *
+ *     for (off = 0; off < out.off; off += ret) {
+ *             ret = write(STDOUT_FILENO, out.buf+off, out.off-off);
+ *             if (ret < 0)
+ *                     err(1, "Writing stdout");
+ *     }
+ *     free(out.buf);
+ *
+ *     return WIFEXITED(status) ? WEXITSTATUS(status) : 2;
+ * }
+ *
+ * License: BSD-MIT
+ */
+int main(int argc, char *argv[])
+{
+       if (argc != 2)
+               return 1;
+
+       if (strcmp(argv[1], "depends") == 0) {
+               return 0;
+       }
+
+       return 1;
+}
diff --git a/ccan/io/backend.h b/ccan/io/backend.h
new file mode 100644 (file)
index 0000000..06427eb
--- /dev/null
@@ -0,0 +1,82 @@
+/* Licensed under BSD-MIT - see LICENSE file for details */
+#ifndef CCAN_IO_BACKEND_H
+#define CCAN_IO_BACKEND_H
+#include <stdbool.h>
+
+struct fd {
+       int fd;
+       bool listener;
+       size_t backend_info;
+
+       struct io_op *(*next)(struct io_conn *, void *arg);
+       void *next_arg;
+
+       void (*finish)(struct io_conn *, void *arg);
+       void *finish_arg;
+};
+
+
+/* Listeners create connections. */
+struct io_listener {
+       struct fd fd;
+};
+
+enum io_state {
+       NEXT, /* eg starting, woken from idle, return from io_break. */
+       READ,
+       WRITE,
+       READPART,
+       WRITEPART,
+       IDLE,
+       FINISHED,
+       PROCESSING /* We expect them to change this now. */
+};
+
+static inline enum io_state from_ioop(struct io_op *op)
+{
+       return (enum io_state)(long)op;
+}
+
+struct io_state_read {
+       char *buf;
+       size_t len;
+};
+
+struct io_state_write {
+       const char *buf;
+       size_t len;
+};
+
+struct io_state_readpart {
+       char *buf;
+       size_t *lenp;
+};
+
+struct io_state_writepart {
+       const char *buf;
+       size_t *lenp;
+};
+
+/* One connection per client. */
+struct io_conn {
+       struct fd fd;
+
+       enum io_state state;
+       union {
+               struct io_state_read read;
+               struct io_state_write write;
+               struct io_state_readpart readpart;
+               struct io_state_writepart writepart;
+       } u;
+};
+
+extern void *io_loop_return;
+
+bool add_listener(struct io_listener *l);
+bool add_conn(struct io_conn *c);
+void del_listener(struct io_listener *l);
+void backend_set_state(struct io_conn *conn, struct io_op *op);
+
+struct io_op *do_writeable(struct io_conn *conn);
+struct io_op *do_readable(struct io_conn *conn);
+#endif /* CCAN_IO_BACKEND_H */
diff --git a/ccan/io/io.c b/ccan/io/io.c
new file mode 100644 (file)
index 0000000..325db78
--- /dev/null
@@ -0,0 +1,229 @@
+/* Licensed under BSD-MIT - see LICENSE file for details */
+#include "io.h"
+#include "backend.h"
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <string.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <assert.h>
+
+void *io_loop_return;
+
+struct io_listener *io_new_listener_(int fd,
+                                    struct io_op *(*start)(struct io_conn *,
+                                                           void *arg),
+                                    void (*finish)(struct io_conn *, void *),
+                                    void *arg)
+{
+       struct io_listener *l = malloc(sizeof(*l));
+
+       if (!l)
+               return NULL;
+
+       l->fd.listener = true;
+       l->fd.fd = fd;
+       l->fd.next = start;
+       l->fd.finish = finish;
+       l->fd.finish_arg = l->fd.next_arg = arg;
+       if (!add_listener(l)) {
+               free(l);
+               return NULL;
+       }
+       return l;
+}
+
+void io_close_listener(struct io_listener *l)
+{
+       close(l->fd.fd);
+       del_listener(l);
+       free(l);
+}
+
+struct io_conn *io_new_conn_(int fd,
+                            struct io_op *(*start)(struct io_conn *, void *),
+                            void (*finish)(struct io_conn *, void *),
+                            void *arg)
+{
+       struct io_conn *conn = malloc(sizeof(*conn));
+
+       if (!conn)
+               return NULL;
+
+       conn->fd.listener = false;
+       conn->fd.fd = fd;
+       conn->fd.next = start;
+       conn->fd.finish = finish;
+       conn->fd.finish_arg = conn->fd.next_arg = arg;
+       conn->state = NEXT;
+       if (!add_conn(conn)) {
+               free(conn);
+               return NULL;
+       }
+       return conn;
+}
+
+/* Convenient token which only we can produce. */
+static inline struct io_next *to_ionext(struct io_conn *conn)
+{
+       return (struct io_next *)conn;
+}
+
+static inline struct io_op *to_ioop(enum io_state state)
+{
+       return (struct io_op *)(long)state;
+}
+
+static inline struct io_conn *from_ionext(struct io_next *next)
+{
+       return (struct io_conn *)next;
+}
+
+struct io_next *io_next_(struct io_conn *conn,
+                        struct io_op *(*next)(struct io_conn *, void *),
+                        void *arg)
+{
+       conn->fd.next = next;
+       conn->fd.next_arg = arg;
+
+       return to_ionext(conn);
+}
+
+/* Queue some data to be written. */
+struct io_op *io_write(const void *data, size_t len, struct io_next *next)
+{
+       struct io_conn *conn = from_ionext(next);
+       conn->u.write.buf = data;
+       conn->u.write.len = len;
+       return to_ioop(WRITE);
+}
+
+/* Queue a request to read into a buffer. */
+struct io_op *io_read(void *data, size_t len, struct io_next *next)
+{
+       struct io_conn *conn = from_ionext(next);
+       conn->u.read.buf = data;
+       conn->u.read.len = len;
+       return to_ioop(READ);
+}
+
+/* Queue a partial request to read into a buffer. */
+struct io_op *io_read_partial(void *data, size_t *len, struct io_next *next)
+{
+       struct io_conn *conn = from_ionext(next);
+       conn->u.readpart.buf = data;
+       conn->u.readpart.lenp = len;
+       return to_ioop(READPART);
+}
+
+/* Queue a partial write request. */
+struct io_op *io_write_partial(const void *data, size_t *len, struct io_next *next)
+{
+       struct io_conn *conn = from_ionext(next);
+       conn->u.writepart.buf = data;
+       conn->u.writepart.lenp = len;
+       return to_ioop(WRITEPART);
+}
+
+struct io_op *io_idle(struct io_conn *conn)
+{
+       return to_ioop(IDLE);
+}
+
+void io_wake_(struct io_conn *conn,
+             struct io_op *(*next)(struct io_conn *, void *), void *arg)
+
+{
+       /* It might have finished, but we haven't called its finish() yet. */
+       if (conn->state == FINISHED)
+               return;
+       assert(conn->state == IDLE);
+       conn->fd.next = next;
+       conn->fd.next_arg = arg;
+       backend_set_state(conn, to_ioop(NEXT));
+}
+
+static struct io_op *do_next(struct io_conn *conn)
+{
+       return conn->fd.next(conn, conn->fd.next_arg);
+}
+
+struct io_op *do_writeable(struct io_conn *conn)
+{
+       ssize_t ret;
+       bool finished;
+
+       switch (conn->state) {
+       case WRITE:
+               ret = write(conn->fd.fd, conn->u.write.buf, conn->u.write.len);
+               if (ret < 0)
+                       return io_close(conn, NULL);
+               conn->u.write.buf += ret;
+               conn->u.write.len -= ret;
+               finished = (conn->u.write.len == 0);
+               break;
+       case WRITEPART:
+               ret = write(conn->fd.fd, conn->u.writepart.buf,
+                           *conn->u.writepart.lenp);
+               if (ret < 0)
+                       return io_close(conn, NULL);
+               *conn->u.writepart.lenp = ret;
+               finished = true;
+               break;
+       default:
+               /* Shouldn't happen. */
+               abort();
+       }
+
+       if (finished)
+               return do_next(conn);
+       return to_ioop(conn->state);
+}
+
+struct io_op *do_readable(struct io_conn *conn)
+{
+       ssize_t ret;
+       bool finished;
+
+       switch (conn->state) {
+       case READ:
+               ret = read(conn->fd.fd, conn->u.read.buf, conn->u.read.len);
+               if (ret <= 0)
+                       return io_close(conn, NULL);
+               conn->u.read.buf += ret;
+               conn->u.read.len -= ret;
+               finished = (conn->u.read.len == 0);
+               break;
+       case READPART:
+               ret = read(conn->fd.fd, conn->u.readpart.buf,
+                           *conn->u.readpart.lenp);
+               if (ret <= 0)
+                       return io_close(conn, NULL);
+               *conn->u.readpart.lenp = ret;
+               finished = true;
+               break;
+       default:
+               /* Shouldn't happen. */
+               abort();
+       }
+
+       if (finished)
+               return do_next(conn);
+       return to_ioop(conn->state);
+}
+
+/* Useful next functions. */
+/* Close the connection, we're done. */
+struct io_op *io_close(struct io_conn *conn, void *arg)
+{
+       return to_ioop(FINISHED);
+}
+
+/* Exit the loop, returning this (non-NULL) arg. */
+struct io_op *io_break(void *arg, struct io_next *next)
+{
+       io_loop_return = arg;
+
+       return to_ioop(NEXT);
+}
diff --git a/ccan/io/io.h b/ccan/io/io.h
new file mode 100644 (file)
index 0000000..49b6a25
--- /dev/null
@@ -0,0 +1,229 @@
+/* Licensed under BSD-MIT - see LICENSE file for details */
+#ifndef CCAN_IO_H
+#define CCAN_IO_H
+#include <ccan/typesafe_cb/typesafe_cb.h>
+#include <stdbool.h>
+#include <unistd.h>
+
+/**
+ * struct io_op - pointer to return from io functions.
+ *
+ * This undefined structure is just to help the compiler check that you
+ * really do return the result of an io-queueing method.
+ */
+struct io_op;
+
+/**
+ * struct io_next - pointer to what we're going to do next.
+ *
+ * Bundles up callbacks, generated by io_next().
+ */
+struct io_next;
+
+/**
+ * io_new_conn - create a new connection.
+ * @fd: the file descriptor.
+ * @start: the first function to call.
+ * @finish: the function to call when it's closed or fails.
+ * @arg: the argument to both @start and @finish.
+ *
+ * This creates a connection which owns @fd.  @start will be called on the
+ * next return to io_loop(), and @finish will be called when an I/O operation
+ * fails, or you call io_close() on the connection.
+ *
+ * The @start function must call one of the io queueing functions
+ * (eg. io_read, io_write) and return the next function to call once
+ * that is done using io_next().  The alternative is to call io_close().
+ *
+ * Returns NULL on error (and sets errno).
+ */
+#define io_new_conn(fd, start, finish, arg)                            \
+       io_new_conn_((fd),                                              \
+                    typesafe_cb_preargs(struct io_op *, void *,        \
+                                        (start), (arg), struct io_conn *), \
+                    typesafe_cb_preargs(void, void *, (finish), (arg), \
+                                        struct io_conn *),             \
+                    (arg))
+struct io_conn *io_new_conn_(int fd,
+                            struct io_op *(*start)(struct io_conn *, void *),
+                            void (*finish)(struct io_conn *, void *),
+                            void *arg);
+
+/**
+ * io_new_listener - create a new accepting listener.
+ * @fd: the file descriptor.
+ * @start: the first function to call on new connections.
+ * @finish: the function to call when the connection is closed or fails.
+ * @arg: the argument to both @start and @finish.
+ *
+ * When @fd becomes readable, we accept() and turn that fd into a new
+ * connection.
+ *
+ * Returns NULL on error (and sets errno).
+ */
+#define io_new_listener(fd, start, finish, arg)                                \
+       io_new_listener_((fd),                                          \
+                        typesafe_cb_preargs(struct io_op *, void *,    \
+                                            (start), (arg),            \
+                                            struct io_conn *),         \
+                        typesafe_cb_preargs(void, void *, (finish),    \
+                                            (arg), struct io_conn *),  \
+                        (arg))
+struct io_listener *io_new_listener_(int fd,
+                                    struct io_op *(*start)(struct io_conn *,
+                                                           void *arg),
+                                    void (*finish)(struct io_conn *,
+                                                   void *arg),
+                                    void *arg);
+
+/**
+ * io_close_listener - delete a listener.
+ * @listener: the listener returned from io_new_listener.
+ *
+ * This closes the fd and frees @listener.
+ */
+void io_close_listener(struct io_listener *listener);
+
+/**
+ * io_write - queue data to be written.
+ * @data: the data buffer.
+ * @len: the length to write.
+ * @next: what to call next.
+ *
+ * This will queue the data buffer for writing.  Once it's all written, the
+ * function registered with io_next() will be called: on an error, the finish
+ * function is called instead.
+ *
+ * Note that the I/O may actually be done immediately.
+ */
+struct io_op *io_write(const void *data, size_t len, struct io_next *next);
+
+/**
+ * io_read - queue buffer to be read.
+ * @data: the data buffer.
+ * @len: the length to read.
+ * @next: what to call next.
+ *
+ * This will queue the data buffer for reading.  Once it's all read, the
+ * function registered with io_next() will be called: on an error, the finish
+ * function is called instead.
+ *
+ * Note that the I/O may actually be done immediately.
+ */
+struct io_op *io_read(void *data, size_t len, struct io_next *next);
+
+/**
+ * io_read_partial - queue buffer to be read (partial OK).
+ * @data: the data buffer.
+ * @len: the maximum length to read, set to the length actually read.
+ * @next: what to call next.
+ *
+ * This will queue the data buffer for reading.  Once any data is
+ * read, @len is updated and the function registered with io_next()
+ * will be called: on an error, the finish function is called instead.
+ *
+ * Note that the I/O may actually be done immediately.
+ */
+struct io_op *io_read_partial(void *data, size_t *len, struct io_next *next);
+
+/**
+ * io_write_partial - queue data to be written (partial OK).
+ * @data: the data buffer.
+ * @len: the maximum length to write, set to the length actually written.
+ * @next: what to call next.
+ *
+ * This will queue the data buffer for writing.  Once any data is
+ * written, @len is updated and the function registered with io_next()
+ * will be called: on an error, the finish function is called instead.
+ *
+ * Note that the I/O may actually be done immediately.
+ */
+struct io_op *io_write_partial(const void *data, size_t *len,
+                              struct io_next *next);
+
+/**
+ * io_idle - explicitly note that this connection will do nothing.
+ * @conn: the current connection.
+ *
+ * This indicates the connection is idle: some other function will
+ * later call io_read/io_write etc. (or io_close) on it, in which case
+ * it will do that.
+ */
+struct io_op *io_idle(struct io_conn *conn);
+
+/**
+ * io_wake - wake up and idle connection.
+ * @conn: an idle connection.
+ * @next: the next function to call once queued IO is complete.
+ * @arg: the argument to @next.
+ *
+ * This makes @conn run its @next function the next time around the
+ * io_loop().
+ */
+#define io_wake(conn, next, arg)                                       \
+       io_wake_((conn),                                                \
+                typesafe_cb_preargs(struct io_op *, void *,            \
+                                    (next), (arg), struct io_conn *),  \
+                (arg))
+void io_wake_(struct io_conn *conn,
+             struct io_op *(*next)(struct io_conn *, void *), void *arg);
+
+/**
+ * io_break - return from io_loop()
+ * @arg: non-NULL value to return from io_loop().
+ * @next: what to call next (can be NULL if we expect no return).
+ *
+ * This breaks out of the io_loop.  As soon as the current @next
+ * function returns, any io_closed()'d connections will have their
+ * finish callbacks called, then io_loop() with return with @arg.
+ *
+ * If io_loop() is called again, then @next will be called.
+ */
+struct io_op *io_break(void *arg, struct io_next *next);
+
+/**
+ * io_next - indicate what callback to call next.
+ * @conn: this connection.
+ * @next: the next function to call once queued IO is complete.
+ * @arg: the argument to @next.
+ *
+ * Every @next (or @start) function should "return io_next(...);" once
+ * they have indicated what io to perform (eg. io_write, io_idle).
+ * The exception is io_close(), which can be used instead of io_next().
+ *
+ * Note that as an optimization, the next function may be called
+ * immediately, which is why this should be the last statement in your
+ * function.
+ */
+#define io_next(conn, next, arg)                                       \
+       io_next_((conn),                                                \
+                typesafe_cb_preargs(struct io_op *, void *,            \
+                                    (next), (arg), struct io_conn *),  \
+                (arg))
+struct io_next *io_next_(struct io_conn *conn,
+                        struct io_op *(*next)(struct io_conn *, void *arg),
+                        void *arg);
+
+/* FIXME: io_recvfrom/io_sendto */
+
+/**
+ * io_close - terminate a connection.
+ * @conn: any connection.
+ *
+ * The schedules a connection to be closed.  It can be done on any
+ * connection, whether it has I/O queued or not (though that I/O may
+ * be performed first).
+ *
+ * It's common to 'return io_close(...)' from a @next function, but
+ * io_close can also be used as an argument to io_next().
+ */
+struct io_op *io_close(struct io_conn *, void *unused);
+
+/**
+ * io_loop - process fds until all closed on io_break.
+ *
+ * This is the core loop; it exits with the io_break() arg, or NULL if
+ * all connections and listeners are closed.
+ */
+void *io_loop(void);
+#endif /* CCAN_IO_H */
diff --git a/ccan/io/poll.c b/ccan/io/poll.c
new file mode 100644 (file)
index 0000000..070f6d8
--- /dev/null
@@ -0,0 +1,207 @@
+/* Licensed under BSD-MIT - see LICENSE file for details */
+#include "io.h"
+#include "backend.h"
+#include <assert.h>
+#include <poll.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+
+static size_t num_fds = 0, max_fds = 0, num_next = 0, num_finished = 0;
+static struct pollfd *pollfds = NULL;
+static struct fd **fds = NULL;
+
+static bool add_fd(struct fd *fd, short events)
+{
+       if (num_fds + 1 > max_fds) {
+               struct pollfd *newpollfds;
+               struct fd **newfds;
+               size_t num = max_fds ? max_fds * 2 : 8;
+
+               newpollfds = realloc(pollfds, sizeof(*newpollfds) * num);
+               if (!newpollfds)
+                       return false;
+               pollfds = newpollfds;
+               newfds = realloc(fds, sizeof(*newfds) * num);
+               if (!newfds)
+                       return false;
+               fds = newfds;
+               max_fds = num;
+       }
+
+       pollfds[num_fds].fd = fd->fd;
+       pollfds[num_fds].events = events;
+       pollfds[num_fds].revents = 0; /* In case we're iterating now */
+       fds[num_fds] = fd;
+       fd->backend_info = num_fds;
+       num_fds++;
+       return true;
+}
+
+static void del_fd(struct fd *fd)
+{
+       size_t n = fd->backend_info;
+
+       assert(n != -1);
+       assert(n < num_fds);
+       if (n != num_fds - 1) {
+               /* Move last one over us. */
+               pollfds[n] = pollfds[num_fds-1];
+               fds[n] = fds[num_fds-1];
+               assert(fds[n]->backend_info == num_fds-1);
+               fds[n]->backend_info = n;
+       } else if (num_fds == 1) {
+               /* Free everything when no more fds. */
+               free(pollfds);
+               free(fds);
+               pollfds = NULL;
+               fds = NULL;
+               max_fds = 0;
+       }
+       num_fds--;
+       fd->backend_info = -1;
+       close(fd->fd);
+}
+
+bool add_listener(struct io_listener *l)
+{
+       return add_fd(&l->fd, POLLIN);
+}
+
+bool add_conn(struct io_conn *c)
+{
+       if (!add_fd(&c->fd, 0))
+               return false;
+       num_next++;
+       return true;
+}
+
+static void del_conn(struct io_conn *conn)
+{
+       if (conn->fd.finish)
+               conn->fd.finish(conn, conn->fd.finish_arg);
+       del_fd(&conn->fd);
+       if (conn->state == FINISHED)
+               num_finished--;
+       else if (conn->state == NEXT)
+               num_next--;
+}
+
+void del_listener(struct io_listener *l)
+{
+       del_fd(&l->fd);
+}
+
+void backend_set_state(struct io_conn *conn, struct io_op *op)
+{
+       enum io_state state = from_ioop(op);
+       struct pollfd *pfd = &pollfds[conn->fd.backend_info];
+
+       switch (state) {
+       case READ:
+       case READPART:
+               pfd->events = POLLIN;
+               break;
+       case WRITE:
+       case WRITEPART:
+               pfd->events = POLLOUT;
+               break;
+       case IDLE:
+               pfd->events = 0;
+               break;
+       case NEXT:
+               num_next++;
+               break;
+       case FINISHED:
+               num_finished++;
+               break;
+       default:
+               abort();
+       }
+       conn->state = state;
+}
+
+static void accept_conn(struct io_listener *l)
+{
+       struct io_conn *c;
+       int fd = accept(l->fd.fd, NULL, NULL);
+
+       /* FIXME: What to do here? */
+       if (fd < 0)
+               return;
+       c = io_new_conn(fd, l->fd.next, l->fd.finish, l->fd.next_arg);
+       if (!c) {
+               close(fd);
+               return;
+       }
+}
+
+/* It's OK to miss some, as long as we make progress. */
+static void finish_and_next(bool finished_only)
+{
+       unsigned int i;
+
+       for (i = 0; !io_loop_return && i < num_fds; i++) {
+               struct io_conn *c;
+
+               if (!num_finished) {
+                       if (finished_only || num_next == 0)
+                               break;
+               }
+               if (fds[i]->listener)
+                       continue;
+               c = (void *)fds[i];
+               if (c->state == FINISHED) {
+                       del_conn(c);
+                       free(c);
+                       i--;
+               } else if (!finished_only && c->state == NEXT) {
+                       backend_set_state(c, c->fd.next(c, c->fd.next_arg));
+                       num_next--;
+               }
+       }
+}
+
+/* This is the main loop. */
+void *io_loop(void)
+{
+       void *ret;
+
+       while (!io_loop_return) {
+               int i, r;
+
+               if (num_finished || num_next) {
+                       finish_and_next(false);
+                       /* Could have started/finished more. */
+                       continue;
+               }
+
+               if (num_fds == 0)
+                       break;
+
+               r = poll(pollfds, num_fds, -1);
+               if (r < 0)
+                       break;
+
+               for (i = 0; i < num_fds && !io_loop_return; i++) {
+                       struct io_conn *c = (void *)fds[i];
+                       if (pollfds[i].revents & POLLOUT)
+                               backend_set_state(c, do_writeable(c));
+                       else if (pollfds[i].revents & POLLIN) {
+                               if (fds[i]->listener)
+                                       accept_conn((void *)c);
+                               else
+                                       backend_set_state(c, do_readable(c));
+                       } else if (pollfds[i].revents & POLLHUP) {
+                               backend_set_state(c, io_close(c, NULL));
+                       }
+               }
+       }
+
+       while (num_finished)
+               finish_and_next(true);
+
+       ret = io_loop_return;
+       io_loop_return = NULL;
+       return ret;
+}
diff --git a/ccan/io/test/run-01-start-finish.c b/ccan/io/test/run-01-start-finish.c
new file mode 100644 (file)
index 0000000..03a1e32
--- /dev/null
@@ -0,0 +1,91 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+static struct io_op *start_ok(struct io_conn *conn, int *state)
+{
+       ok1(*state == 0);
+       (*state)++;
+       return io_close(conn, NULL);
+}
+
+static void finish_ok(struct io_conn *conn, int *state)
+{
+       ok1(*state == 1);
+       (*state)++;
+       io_break(state + 1, NULL);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+       int fd, on = 1;
+       struct addrinfo *addrinfo, hints;
+
+       memset(&hints, 0, sizeof(hints));
+       hints.ai_family = AF_UNSPEC;
+       hints.ai_socktype = SOCK_STREAM;
+       hints.ai_flags = AI_PASSIVE;
+       hints.ai_protocol = 0;
+
+       if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+               return -1;
+
+       fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                   addrinfo->ai_protocol);
+       if (fd < 0)
+               return -1;
+
+       setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+       if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+               close(fd);
+               return -1;
+       }
+       if (listen(fd, 1) != 0) {
+               close(fd);
+               return -1;
+       }
+       *info = addrinfo;
+       return fd;
+}
+
+int main(void)
+{
+       int state = 0;
+       struct addrinfo *addrinfo;
+       struct io_listener *l;
+       int fd;
+
+       /* This is how many tests you plan to run */
+       plan_tests(9);
+       fd = make_listen_fd("65001", &addrinfo);
+       ok1(fd >= 0);
+       l = io_new_listener(fd, start_ok, finish_ok, &state);
+       ok1(l);
+       fflush(stdout);
+       if (!fork()) {
+               io_close_listener(l);
+               fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                           addrinfo->ai_protocol);
+               if (fd < 0)
+                       exit(1);
+               if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+                       exit(2);
+               close(fd);
+               freeaddrinfo(addrinfo);
+               exit(0);
+       }
+       freeaddrinfo(addrinfo);
+       ok1(io_loop() == &state + 1);
+       ok1(state == 2);
+       io_close_listener(l);
+       ok1(wait(&state));
+       ok1(WIFEXITED(state));
+       ok1(WEXITSTATUS(state) == 0);
+
+       /* This exits depending on whether all tests passed */
+       return exit_status();
+}
diff --git a/ccan/io/test/run-02-read.c b/ccan/io/test/run-02-read.c
new file mode 100644 (file)
index 0000000..e59ccb3
--- /dev/null
@@ -0,0 +1,108 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+struct data {
+       int state;
+       char buf[4];
+};
+
+static struct io_op *start_ok(struct io_conn *conn, struct data *d)
+{
+       ok1(d->state == 0);
+       d->state++;
+       return io_read(d->buf, sizeof(d->buf), io_next(conn, io_close, d));
+}
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+       ok1(d->state == 1);
+       d->state++;
+       io_break(d, NULL);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+       int fd, on = 1;
+       struct addrinfo *addrinfo, hints;
+
+       memset(&hints, 0, sizeof(hints));
+       hints.ai_family = AF_UNSPEC;
+       hints.ai_socktype = SOCK_STREAM;
+       hints.ai_flags = AI_PASSIVE;
+       hints.ai_protocol = 0;
+
+       if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+               return -1;
+
+       fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                   addrinfo->ai_protocol);
+       if (fd < 0)
+               return -1;
+
+       setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+       if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+               close(fd);
+               return -1;
+       }
+       if (listen(fd, 1) != 0) {
+               close(fd);
+               return -1;
+       }
+       *info = addrinfo;
+       return fd;
+}
+
+int main(void)
+{
+       struct data *d = malloc(sizeof(*d));
+       struct addrinfo *addrinfo;
+       struct io_listener *l;
+       int fd, status;
+
+       /* This is how many tests you plan to run */
+       plan_tests(10);
+       d->state = 0;
+       fd = make_listen_fd("65002", &addrinfo);
+       ok1(fd >= 0);
+       l = io_new_listener(fd, start_ok, finish_ok, d);
+       ok1(l);
+       fflush(stdout);
+       if (!fork()) {
+               int i;
+
+               io_close_listener(l);
+               fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                           addrinfo->ai_protocol);
+               if (fd < 0)
+                       exit(1);
+               if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+                       exit(2);
+               signal(SIGPIPE, SIG_IGN);
+               for (i = 0; i < strlen("hellothere"); i++) {
+                       if (write(fd, "hellothere" + i, 1) != 1)
+                               break;
+               }
+               close(fd);
+               freeaddrinfo(addrinfo);
+               free(d);
+               exit(0);
+       }
+       freeaddrinfo(addrinfo);
+       ok1(io_loop() == d);
+       ok1(d->state == 2);
+       ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
+       free(d);
+       io_close_listener(l);
+
+       ok1(wait(&status));
+       ok1(WIFEXITED(status));
+       ok1(WEXITSTATUS(status) == 0);
+
+       /* This exits depending on whether all tests passed */
+       return exit_status();
+}
diff --git a/ccan/io/test/run-03-readpartial.c b/ccan/io/test/run-03-readpartial.c
new file mode 100644 (file)
index 0000000..cc1857c
--- /dev/null
@@ -0,0 +1,137 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+struct data {
+       int state;
+       size_t bytes;
+       char buf[4];
+};
+
+static struct io_op *start_ok(struct io_conn *conn, struct data *d)
+{
+       ok1(d->state == 0);
+       d->state++;
+       d->bytes = sizeof(d->buf);
+       return io_read_partial(d->buf, &d->bytes, io_next(conn, io_close, d));
+}
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+       ok1(d->state == 1);
+       d->state++;
+       io_break(d, NULL);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+       int fd, on = 1;
+       struct addrinfo *addrinfo, hints;
+
+       memset(&hints, 0, sizeof(hints));
+       hints.ai_family = AF_UNSPEC;
+       hints.ai_socktype = SOCK_STREAM;
+       hints.ai_flags = AI_PASSIVE;
+       hints.ai_protocol = 0;
+
+       if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+               return -1;
+
+       fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                   addrinfo->ai_protocol);
+       if (fd < 0)
+               return -1;
+
+       setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+       if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+               close(fd);
+               return -1;
+       }
+       if (listen(fd, 1) != 0) {
+               close(fd);
+               return -1;
+       }
+       *info = addrinfo;
+       return fd;
+}
+
+static void write_to_socket(const char *str, const struct addrinfo *addrinfo)
+{
+       int fd, i;
+
+       fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                   addrinfo->ai_protocol);
+       if (fd < 0)
+               exit(1);
+       if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+               exit(2);
+       signal(SIGPIPE, SIG_IGN);
+       for (i = 0; i < strlen(str); i++) {
+               if (write(fd, str + i, 1) != 1)
+                       break;
+       }
+       close(fd);
+}
+
+int main(void)
+{
+       struct data *d = malloc(sizeof(*d));
+       struct addrinfo *addrinfo;
+       struct io_listener *l;
+       int fd, status;
+
+       /* This is how many tests you plan to run */
+       plan_tests(22);
+       d->state = 0;
+       fd = make_listen_fd("65003", &addrinfo);
+       ok1(fd >= 0);
+       l = io_new_listener(fd, start_ok, finish_ok, d);
+       ok1(l);
+       fflush(stdout);
+       if (!fork()) {
+               io_close_listener(l);
+               write_to_socket("hellothere", addrinfo);
+               freeaddrinfo(addrinfo);
+               free(d);
+               exit(0);
+       }
+       ok1(io_loop() == d);
+       ok1(d->state == 2);
+       ok1(d->bytes > 0);
+       ok1(d->bytes <= sizeof(d->buf));
+       ok1(memcmp(d->buf, "hellothere", d->bytes) == 0);
+
+       ok1(wait(&status));
+       ok1(WIFEXITED(status));
+       ok1(WEXITSTATUS(status) == 0);
+
+       fflush(stdout);
+       if (!fork()) {
+               io_close_listener(l);
+               write_to_socket("hi", addrinfo);
+               freeaddrinfo(addrinfo);
+               free(d);
+               exit(0);
+       }
+       d->state = 0;
+       ok1(io_loop() == d);
+       ok1(d->state == 2);
+       ok1(d->bytes > 0);
+       ok1(d->bytes <= strlen("hi"));
+       ok1(memcmp(d->buf, "hi", d->bytes) == 0);
+
+       freeaddrinfo(addrinfo);
+       free(d);
+       io_close_listener(l);
+
+       ok1(wait(&status));
+       ok1(WIFEXITED(status));
+       ok1(WEXITSTATUS(status) == 0);
+
+       /* This exits depending on whether all tests passed */
+       return exit_status();
+}
diff --git a/ccan/io/test/run-04-writepartial.c b/ccan/io/test/run-04-writepartial.c
new file mode 100644 (file)
index 0000000..daa81e7
--- /dev/null
@@ -0,0 +1,121 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+struct data {
+       int state;
+       size_t bytes;
+       char *buf;
+};
+
+static struct io_op *start_ok(struct io_conn *conn, struct data *d)
+{
+       ok1(d->state == 0);
+       d->state++;
+       return io_write_partial(d->buf, &d->bytes, io_next(conn, io_close, d));
+}
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+       ok1(d->state == 1);
+       d->state++;
+       io_break(d, NULL);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+       int fd, on = 1;
+       struct addrinfo *addrinfo, hints;
+
+       memset(&hints, 0, sizeof(hints));
+       hints.ai_family = AF_UNSPEC;
+       hints.ai_socktype = SOCK_STREAM;
+       hints.ai_flags = AI_PASSIVE;
+       hints.ai_protocol = 0;
+
+       if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+               return -1;
+
+       fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                   addrinfo->ai_protocol);
+       if (fd < 0)
+               return -1;
+
+       setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+       if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+               close(fd);
+               return -1;
+       }
+       if (listen(fd, 1) != 0) {
+               close(fd);
+               return -1;
+       }
+       *info = addrinfo;
+       return fd;
+}
+
+static void read_from_socket(const char *str, const struct addrinfo *addrinfo)
+{
+       int fd;
+       char buf[100];
+
+       fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                   addrinfo->ai_protocol);
+       if (fd < 0)
+               exit(1);
+       if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+               exit(2);
+       if (read(fd, buf, strlen(str)) != strlen(str))
+               exit(3);
+       if (memcmp(buf, str, strlen(str)) != 0)
+               exit(4);
+       close(fd);
+}
+
+int main(void)
+{
+       struct data *d = malloc(sizeof(*d));
+       struct addrinfo *addrinfo;
+       struct io_listener *l;
+       int fd, status;
+
+       /* This is how many tests you plan to run */
+       plan_tests(11);
+       d->state = 0;
+       d->bytes = 1024*1024;
+       d->buf = malloc(d->bytes);
+       memset(d->buf, 'a', d->bytes);
+       fd = make_listen_fd("65004", &addrinfo);
+       ok1(fd >= 0);
+       l = io_new_listener(fd, start_ok, finish_ok, d);
+       ok1(l);
+       fflush(stdout);
+       if (!fork()) {
+               io_close_listener(l);
+               read_from_socket("aaaaaa", addrinfo);
+               freeaddrinfo(addrinfo);
+               free(d->buf);
+               free(d);
+               exit(0);
+       }
+       ok1(io_loop() == d);
+       ok1(d->state == 2);
+       ok1(d->bytes > 0);
+       ok1(d->bytes <= 1024*1024);
+
+       ok1(wait(&status));
+       ok1(WIFEXITED(status));
+       ok1(WEXITSTATUS(status) == 0);
+
+       freeaddrinfo(addrinfo);
+       free(d->buf);
+       free(d);
+       io_close_listener(l);
+
+       /* This exits depending on whether all tests passed */
+       return exit_status();
+}
diff --git a/ccan/io/test/run-05-write.c b/ccan/io/test/run-05-write.c
new file mode 100644 (file)
index 0000000..83c8c4c
--- /dev/null
@@ -0,0 +1,122 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+struct data {
+       int state;
+       size_t bytes;
+       char *buf;
+};
+
+static struct io_op *start_ok(struct io_conn *conn, struct data *d)
+{
+       ok1(d->state == 0);
+       d->state++;
+       return io_write(d->buf, d->bytes, io_next(conn, io_close, d));
+}
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+       ok1(d->state == 1);
+       d->state++;
+       io_break(d, NULL);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+       int fd, on = 1;
+       struct addrinfo *addrinfo, hints;
+
+       memset(&hints, 0, sizeof(hints));
+       hints.ai_family = AF_UNSPEC;
+       hints.ai_socktype = SOCK_STREAM;
+       hints.ai_flags = AI_PASSIVE;
+       hints.ai_protocol = 0;
+
+       if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+               return -1;
+
+       fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                   addrinfo->ai_protocol);
+       if (fd < 0)
+               return -1;
+
+       setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+       if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+               close(fd);
+               return -1;
+       }
+       if (listen(fd, 1) != 0) {
+               close(fd);
+               return -1;
+       }
+       *info = addrinfo;
+       return fd;
+}
+
+static void read_from_socket(size_t bytes, const struct addrinfo *addrinfo)
+{
+       int fd, done, r;
+       char buf[100];
+
+       fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                   addrinfo->ai_protocol);
+       if (fd < 0)
+               exit(1);
+       if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+               exit(2);
+
+       for (done = 0; done < bytes; done += r) {
+               r = read(fd, buf, sizeof(buf));
+               if (r < 0)
+                       exit(3);
+               done += r;
+       }
+       close(fd);
+}
+
+int main(void)
+{
+       struct data *d = malloc(sizeof(*d));
+       struct addrinfo *addrinfo;
+       struct io_listener *l;
+       int fd, status;
+
+       /* This is how many tests you plan to run */
+       plan_tests(9);
+       d->state = 0;
+       d->bytes = 1024*1024;
+       d->buf = malloc(d->bytes);
+       memset(d->buf, 'a', d->bytes);
+       fd = make_listen_fd("65005", &addrinfo);
+       ok1(fd >= 0);
+       l = io_new_listener(fd, start_ok, finish_ok, d);
+       ok1(l);
+       fflush(stdout);
+       if (!fork()) {
+               io_close_listener(l);
+               read_from_socket(d->bytes, addrinfo);
+               freeaddrinfo(addrinfo);
+               free(d->buf);
+               free(d);
+               exit(0);
+       }
+       ok1(io_loop() == d);
+       ok1(d->state == 2);
+
+       ok1(wait(&status));
+       ok1(WIFEXITED(status));
+       ok1(WEXITSTATUS(status) == 0);
+
+       freeaddrinfo(addrinfo);
+       free(d->buf);
+       free(d);
+       io_close_listener(l);
+
+       /* This exits depending on whether all tests passed */
+       return exit_status();
+}
diff --git a/ccan/io/test/run-06-idle.c b/ccan/io/test/run-06-idle.c
new file mode 100644 (file)
index 0000000..e6144de
--- /dev/null
@@ -0,0 +1,145 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+static struct io_conn *idler;
+
+struct data {
+       int state;
+       char buf[4];
+};
+
+static struct io_op *do_read(struct io_conn *conn, struct data *d)
+{
+       ok1(d->state == 2 || d->state == 3);
+       d->state++;
+       return io_read(d->buf, sizeof(d->buf), io_next(conn, io_close, d));
+}
+
+static struct io_op *start_waker(struct io_conn *conn, struct data *d)
+{
+       ok1(d->state == 1);
+       d->state++;
+
+       io_wake(idler, do_read, d);
+       return io_close(conn, NULL);
+}
+
+static void finish_waker(struct io_conn *conn, struct data *d)
+{
+       ok1(d->state == 2 || d->state == 3);
+       d->state++;
+}
+
+static struct io_op *start_idle(struct io_conn *conn, struct data *d)
+{
+       int fd;
+
+       ok1(d->state == 0);
+       d->state++;
+       idler = conn;
+
+       /* This will wake us up. */
+       fd = open("/dev/null", O_RDONLY);
+       ok1(fd >= 0);
+       ok1(io_new_conn(fd, start_waker, finish_waker, d));
+
+       return io_idle(conn);
+}
+
+static void finish_idle(struct io_conn *conn, struct data *d)
+{
+       ok1(d->state == 4);
+       d->state++;
+       io_break(d, NULL);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+       int fd, on = 1;
+       struct addrinfo *addrinfo, hints;
+
+       memset(&hints, 0, sizeof(hints));
+       hints.ai_family = AF_UNSPEC;
+       hints.ai_socktype = SOCK_STREAM;
+       hints.ai_flags = AI_PASSIVE;
+       hints.ai_protocol = 0;
+
+       if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+               return -1;
+
+       fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                   addrinfo->ai_protocol);
+       if (fd < 0)
+               return -1;
+
+       setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+       if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+               close(fd);
+               return -1;
+       }
+       if (listen(fd, 1) != 0) {
+               close(fd);
+               return -1;
+       }
+       *info = addrinfo;
+       return fd;
+}
+
+int main(void)
+{
+       struct data *d = malloc(sizeof(*d));
+       struct addrinfo *addrinfo;
+       struct io_listener *l;
+       int fd, status;
+
+       /* This is how many tests you plan to run */
+       plan_tests(15);
+       d->state = 0;
+       fd = make_listen_fd("65006", &addrinfo);
+       ok1(fd >= 0);
+       l = io_new_listener(fd, start_idle, finish_idle, d);
+       ok1(l);
+       fflush(stdout);
+       if (!fork()) {
+               int i;
+
+               io_close_listener(l);
+               fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                           addrinfo->ai_protocol);
+               if (fd < 0)
+                       exit(1);
+               if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+                       exit(2);
+               signal(SIGPIPE, SIG_IGN);
+               for (i = 0; i < strlen("hellothere"); i++) {
+                       if (write(fd, "hellothere" + i, 1) != 1)
+                               break;
+               }
+               close(fd);
+               freeaddrinfo(addrinfo);
+               free(d);
+               exit(0);
+       }
+       freeaddrinfo(addrinfo);
+
+       ok1(io_loop() == d);
+       ok1(d->state == 5);
+       ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
+       free(d);
+       io_close_listener(l);
+
+       ok1(wait(&status));
+       ok1(WIFEXITED(status));
+       ok1(WEXITSTATUS(status) == 0);
+
+       /* This exits depending on whether all tests passed */
+       return exit_status();
+}
diff --git a/ccan/io/test/run-07-break.c b/ccan/io/test/run-07-break.c
new file mode 100644 (file)
index 0000000..cf3d251
--- /dev/null
@@ -0,0 +1,117 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+struct data {
+       int state;
+       char buf[4];
+};
+
+static struct io_op *do_read(struct io_conn *conn, struct data *d)
+{
+       ok1(d->state == 1);
+       d->state++;
+       return io_read(d->buf, sizeof(d->buf), io_next(conn, io_close, d));
+}
+
+static struct io_op *start_break(struct io_conn *conn, struct data *d)
+{
+       ok1(d->state == 0);
+       d->state++;
+       return io_break(d, io_next(conn, do_read, d));
+}
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+       ok1(d->state == 2);
+       d->state++;
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+       int fd, on = 1;
+       struct addrinfo *addrinfo, hints;
+
+       memset(&hints, 0, sizeof(hints));
+       hints.ai_family = AF_UNSPEC;
+       hints.ai_socktype = SOCK_STREAM;
+       hints.ai_flags = AI_PASSIVE;
+       hints.ai_protocol = 0;
+
+       if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+               return -1;
+
+       fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                   addrinfo->ai_protocol);
+       if (fd < 0)
+               return -1;
+
+       setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+       if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+               close(fd);
+               return -1;
+       }
+       if (listen(fd, 1) != 0) {
+               close(fd);
+               return -1;
+       }
+       *info = addrinfo;
+       return fd;
+}
+
+int main(void)
+{
+       struct data *d = malloc(sizeof(*d));
+       struct addrinfo *addrinfo;
+       struct io_listener *l;
+       int fd, status;
+
+       /* This is how many tests you plan to run */
+       plan_tests(13);
+       d->state = 0;
+       fd = make_listen_fd("65007", &addrinfo);
+       ok1(fd >= 0);
+       l = io_new_listener(fd, start_break, finish_ok, d);
+       ok1(l);
+       fflush(stdout);
+       if (!fork()) {
+               int i;
+
+               io_close_listener(l);
+               fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                           addrinfo->ai_protocol);
+               if (fd < 0)
+                       exit(1);
+               if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+                       exit(2);
+               signal(SIGPIPE, SIG_IGN);
+               for (i = 0; i < strlen("hellothere"); i++) {
+                       if (write(fd, "hellothere" + i, 1) != 1)
+                               break;
+               }
+               close(fd);
+               freeaddrinfo(addrinfo);
+               free(d);
+               exit(0);
+       }
+       freeaddrinfo(addrinfo);
+       ok1(io_loop() == d);
+       ok1(d->state == 1);
+       io_close_listener(l);
+
+       ok1(io_loop() == NULL);
+       ok1(d->state == 3);
+       ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
+       free(d);
+
+       ok1(wait(&status));
+       ok1(WIFEXITED(status));
+       ok1(WEXITSTATUS(status) == 0);
+
+       /* This exits depending on whether all tests passed */
+       return exit_status();
+}
diff --git a/ccan/io/test/run-10-many.c b/ccan/io/test/run-10-many.c
new file mode 100644 (file)
index 0000000..6a972ba
--- /dev/null
@@ -0,0 +1,122 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#define NUM 100
+#define NUM_ITERS 1000
+
+struct buffer {
+       int iters;
+       struct io_conn *reader, *writer;
+       char buf[32];
+};
+
+static struct io_op *poke_writer(struct io_conn *conn, struct buffer *buf);
+static struct io_op *poke_reader(struct io_conn *conn, struct buffer *buf);
+
+static struct io_op *do_read(struct io_conn *conn, struct buffer *buf)
+{
+       assert(conn == buf->reader);
+
+       return io_read(&buf->buf, sizeof(buf->buf),
+                      io_next(conn, poke_writer, buf));
+}
+
+static struct io_op *do_write(struct io_conn *conn, struct buffer *buf)
+{
+       assert(conn == buf->writer);
+
+       return io_write(&buf->buf, sizeof(buf->buf),
+                       io_next(conn, poke_reader, buf));
+}
+
+static struct io_op *poke_writer(struct io_conn *conn, struct buffer *buf)
+{
+       assert(conn == buf->reader);
+
+       if (buf->iters == NUM_ITERS)
+               return io_close(conn, NULL);
+
+       /* You write. */
+       io_wake(buf->writer, do_write, buf);
+
+       /* I'll wait until you wake me. */
+       return io_idle(conn);
+}
+
+static struct io_op *poke_reader(struct io_conn *conn, struct buffer *buf)
+{
+       assert(conn == buf->writer);
+       /* You read. */
+       io_wake(buf->reader, do_read, buf);
+
+       if (++buf->iters == NUM_ITERS)
+               return io_close(conn, NULL);
+
+       /* I'll wait until you tell me to write. */
+       return io_idle(conn);
+}
+
+static struct io_op *reader(struct io_conn *conn, struct buffer *buf)
+{
+       assert(conn == buf->reader);
+
+       /* Wait for writer to tell us to read. */
+       return io_idle(conn);
+}
+
+static struct buffer buf[NUM];
+
+int main(void)
+{
+       unsigned int i;
+       int fds[2], last_read, last_write;
+
+       plan_tests(5 + NUM);
+
+       ok1(pipe(fds) == 0);
+       last_read = fds[0];
+       last_write = fds[1];
+
+       for (i = 1; i < NUM; i++) {
+               if (pipe(fds) < 0)
+                       break;
+               memset(buf[i].buf, i, sizeof(buf[i].buf));
+               sprintf(buf[i].buf, "%i-%i", i, i);
+
+               buf[i].reader = io_new_conn(last_read, reader, NULL, &buf[i]);
+               if (!buf[i].reader)
+                       break;
+               buf[i].writer = io_new_conn(fds[1], do_write, NULL, &buf[i]);
+               if (!buf[i].writer)
+                       break;
+               last_read = fds[0];
+       }
+       if (!ok1(i == NUM))
+               exit(exit_status());
+
+       /* Last one completes the cirle. */
+       i = 0;
+       sprintf(buf[i].buf, "%i-%i", i, i);
+       buf[i].reader = io_new_conn(last_read, reader, NULL, &buf[i]);
+       ok1(buf[i].reader);
+       buf[i].writer = io_new_conn(last_write, do_write, NULL, &buf[i]);
+       ok1(buf[i].writer);
+
+       /* They should eventually exit */
+       ok1(io_loop() == NULL);
+
+       for (i = 0; i < NUM; i++) {
+               char b[sizeof(buf[0].buf)];
+               memset(b, i, sizeof(b));
+               sprintf(b, "%i-%i", i, i);
+               ok1(memcmp(b, buf[(i + NUM_ITERS) % NUM].buf, sizeof(b)) == 0);
+       }
+
+       /* This exits depending on whether all tests passed */
+       return exit_status();
+}