]> git.ozlabs.org Git - ccan/commitdiff
Merge branch 'io'
authorRusty Russell <rusty@rustcorp.com.au>
Mon, 14 Oct 2013 11:03:51 +0000 (21:33 +1030)
committerRusty Russell <rusty@rustcorp.com.au>
Mon, 14 Oct 2013 11:03:51 +0000 (21:33 +1030)
45 files changed:
Makefile-ccan
ccan/io/LICENSE [new symlink]
ccan/io/SCENARIOS [new file with mode: 0644]
ccan/io/_info [new file with mode: 0644]
ccan/io/backend.h [new file with mode: 0644]
ccan/io/benchmarks/Makefile [new file with mode: 0644]
ccan/io/benchmarks/run-different-speed.c [new file with mode: 0644]
ccan/io/benchmarks/run-length-prefix.c [new file with mode: 0644]
ccan/io/benchmarks/run-loop.c [new file with mode: 0644]
ccan/io/io.c [new file with mode: 0644]
ccan/io/io.h [new file with mode: 0644]
ccan/io/io_plan.h [new file with mode: 0644]
ccan/io/poll.c [new file with mode: 0644]
ccan/io/test/run-01-start-finish-DEBUG.c [new file with mode: 0644]
ccan/io/test/run-01-start-finish.c [new file with mode: 0644]
ccan/io/test/run-02-read-DEBUG.c [new file with mode: 0644]
ccan/io/test/run-02-read.c [new file with mode: 0644]
ccan/io/test/run-03-readpartial-DEBUG.c [new file with mode: 0644]
ccan/io/test/run-03-readpartial.c [new file with mode: 0644]
ccan/io/test/run-04-writepartial-DEBUG.c [new file with mode: 0644]
ccan/io/test/run-04-writepartial.c [new file with mode: 0644]
ccan/io/test/run-05-write-DEBUG.c [new file with mode: 0644]
ccan/io/test/run-05-write.c [new file with mode: 0644]
ccan/io/test/run-06-idle-DEBUG.c [new file with mode: 0644]
ccan/io/test/run-06-idle.c [new file with mode: 0644]
ccan/io/test/run-07-break-DEBUG.c [new file with mode: 0644]
ccan/io/test/run-07-break.c [new file with mode: 0644]
ccan/io/test/run-08-hangup-on-idle-DEBUG.c [new file with mode: 0644]
ccan/io/test/run-08-hangup-on-idle.c [new file with mode: 0644]
ccan/io/test/run-08-read-after-hangup-DEBUG.c [new file with mode: 0644]
ccan/io/test/run-08-read-after-hangup.c [new file with mode: 0644]
ccan/io/test/run-09-connect-DEBUG.c [new file with mode: 0644]
ccan/io/test/run-09-connect.c [new file with mode: 0644]
ccan/io/test/run-10-many-DEBUG.c [new file with mode: 0644]
ccan/io/test/run-10-many.c [new file with mode: 0644]
ccan/io/test/run-12-bidir-DEBUG.c [new file with mode: 0644]
ccan/io/test/run-12-bidir.c [new file with mode: 0644]
ccan/io/test/run-13-all-idle-DEBUG.c [new file with mode: 0644]
ccan/io/test/run-13-all-idle.c [new file with mode: 0644]
ccan/io/test/run-15-timeout-DEBUG.c [new file with mode: 0644]
ccan/io/test/run-15-timeout.c [new file with mode: 0644]
ccan/io/test/run-17-homemade-io-DEBUG.c [new file with mode: 0644]
ccan/io/test/run-17-homemade-io.c [new file with mode: 0644]
ccan/io/test/run-18-errno-DEBUG.c [new file with mode: 0644]
ccan/io/test/run-18-errno.c [new file with mode: 0644]

index 86d6ac0c44e3ca149d1e0fefb2448379bf5df4d3..7781657766f1268690caa0dc73bb2a7a490102a1 100644 (file)
@@ -55,6 +55,7 @@ MODS_WITH_SRC := antithread \
        htable \
        idtree \
        ilog \
+       io \
        isaac \
        iscsi \
        jmap \
diff --git a/ccan/io/LICENSE b/ccan/io/LICENSE
new file mode 120000 (symlink)
index 0000000..dc314ec
--- /dev/null
@@ -0,0 +1 @@
+../../licenses/LGPL-2.1
\ No newline at end of file
diff --git a/ccan/io/SCENARIOS b/ccan/io/SCENARIOS
new file mode 100644 (file)
index 0000000..01bf47a
--- /dev/null
@@ -0,0 +1,88 @@
+Simple:
+       step1(conn): read(conn), then step2
+       step2(conn): write(conn), then close
+
+Pass-through:
+       step1(conn): read(conn), then step2
+       step2(conn): write(otherconn), then step1
+
+Pass-through-and-connect:
+       step1(conn): read(conn), then step2
+       step2(conn): connect(otherconn), then step3
+       step3(conn): write(otherconn), then step1
+
+Chatroom:
+       step1(conn): read(conn), then step2
+       step2(conn): for c in allcons: write(c).  goto step1
+
+Simple:
+
+void event(struct io_event *done)
+{
+       char *buf = done->priv;
+       struct io_event *e;
+
+       e = queue_read(done, done->conn, buf, 100);
+       e = queue_write(e, done->conn, buf, 100);
+       queue_close(e, done->conn);
+}
+
+Pass-through:
+struct passthru {
+       char buf[100];
+       struct conn *rconn, *wconn;
+};
+
+void event(struct io_event *done)
+{
+       struct passthru *p = done->priv;
+       struct io_event *e;
+
+       e = queue_read(done, p->rconn, p->buf, 100);
+       e = queue_write(e, p->wconn, buf, 100);
+       queue_event(e, event);
+}
+
+Chatroom:
+struct list_head clients;
+
+struct buffer {
+       char buf[100];
+       unsigned int ref;
+};
+
+struct client {
+       struct list_node list;
+       struct connection *conn;
+       struct buffer *rbuf, *wbuf;
+};
+
+void broadcast(struct io_event *done)
+{
+       struct client *i, *c = done->conn->priv;
+       struct io_event *e;
+
+       list_for_each(&clients, i, list) {
+               e = queue_write(done, i->conn, c->buf->buf, 100);
+               e->priv = c->buf;
+               c->buf->ref++;
+               queue_event(e, drop_ref);
+       }
+
+
+
+void event(struct io_event *done)
+{
+       struct client *c = done->conn->priv;
+       struct io_event *e;
+
+       assert(c->conn == done->conn);
+       c->buf = malloc(sizeof(*c->buf));
+       c->buf->ref = 0;
+       e = queue_read(done, c->conn, c->buf->buf, 100);
+       e = queue_event(e, broadcast);
+}
+
+
+       step1(conn): read(conn), then step2
+       step2(conn): for c in allcons: write(c).  goto step1
diff --git a/ccan/io/_info b/ccan/io/_info
new file mode 100644 (file)
index 0000000..235e6ba
--- /dev/null
@@ -0,0 +1,167 @@
+#include <stdio.h>
+#include <string.h>
+#include "config.h"
+
+/**
+ * io - simple library for asynchronous io handling.
+ *
+ * io provides a mechanism to write I/O servers with multiple
+ * connections.  Each callback indicates what I/O they plan next
+ * (eg. read, write).  It is also possible to write custom I/O
+ * plans.
+ *
+ * When compiled with DEBUG, control flow is changed so that rather
+ * than returning to the main io_loop(), plans are executed sequentially
+ * providing a backtrace showing what has occurred on that connection.
+ * Which connection(s) do this depends on the user-specified io_debug
+ * function.
+ *
+ * Example:
+ * // Given tr A-Z a-z outputs tr a-z a-z
+ * #include <ccan/io/io.h>
+ * #include <ccan/err/err.h>
+ * #include <assert.h>
+ * #include <stdlib.h>
+ * #include <signal.h>
+ * #include <sys/types.h>
+ * #include <sys/wait.h>
+ *
+ * struct buffer {
+ *     size_t max, off, rlen;
+ *     char *buf;
+ * };
+ *
+ * struct stdin_buffer {
+ *     struct io_conn *reader, *writer;
+ *     size_t len;
+ *     char inbuf[4096];
+ * };
+ *
+ * // This reads from stdin.
+ * static struct io_plan wake_writer(struct io_conn *, struct stdin_buffer *);
+ * // This writes the stdin buffer to the child.
+ * static struct io_plan wake_reader(struct io_conn *, struct stdin_buffer *);
+ *
+ * static struct io_plan wake_writer(struct io_conn *c, struct stdin_buffer *b)
+ * {
+ *     assert(c == b->reader);
+ *     io_wake(b->writer, io_write(b->inbuf, b->len, wake_reader, b));
+ *     return io_idle();
+ * }
+ *
+ * static void reader_exit(struct io_conn *c, struct stdin_buffer *b)
+ * {
+ *     assert(c == b->reader);
+ *     io_wake(b->writer, io_close());
+ *     b->reader = NULL;
+ * }
+ *
+ * static struct io_plan wake_reader(struct io_conn *c, struct stdin_buffer *b)
+ * {
+ *     assert(c == b->writer);
+ *     if (!b->reader)
+ *             return io_close();
+ *     b->len = sizeof(b->inbuf);
+ *     io_wake(b->reader, io_read_partial(b->inbuf, &b->len, wake_writer, b));
+ *     return io_idle();
+ * }
+ *
+ * static void fail_child_write(struct io_conn *conn, struct stdin_buffer *b)
+ * {
+ *     if (b->reader)
+ *             err(1, "Failed writing to child.");
+ * }
+ *
+ * // This reads from the child and saves it into buffer.
+ * static struct io_plan read_from_child(struct io_conn *conn,
+ *                                      struct buffer *b)
+ * {
+ *     b->off += b->rlen;
+ *
+ *     if (b->off == b->max)
+ *             b->buf = realloc(b->buf, b->max *= 2);
+ *
+ *     b->rlen = b->max - b->off;
+ *     return io_read_partial(b->buf + b->off, &b->rlen, read_from_child, b);
+ * }
+ *
+ * // Feed a program our stdin, gather its stdout, print that at end.
+ * int main(int argc, char *argv[])
+ * {
+ *     int tochild[2], fromchild[2];
+ *     struct buffer out;
+ *     struct stdin_buffer sbuf;
+ *     int status;
+ *     size_t off;
+ *     ssize_t ret;
+ *     struct io_conn *from_child;
+ *
+ *     if (argc == 1)
+ *             errx(1, "Usage: runner <cmdline>...");
+ *
+ *     if (pipe(tochild) != 0 || pipe(fromchild) != 0)
+ *             err(1, "Creating pipes");
+ *
+ *     if (!fork()) {
+ *             // Child runs command.
+ *             close(tochild[1]);
+ *             close(fromchild[0]);
+ *
+ *             dup2(tochild[0], STDIN_FILENO);
+ *             dup2(fromchild[1], STDOUT_FILENO);
+ *             execvp(argv[1], argv + 1);
+ *             exit(127);
+ *     }
+ *
+ *     close(tochild[0]);
+ *     close(fromchild[1]);
+ *     signal(SIGPIPE, SIG_IGN);
+ *
+ *     sbuf.len = sizeof(sbuf.inbuf);
+ *     sbuf.reader = io_new_conn(STDIN_FILENO,
+ *                               io_read_partial(sbuf.inbuf, &sbuf.len,
+ *                                               wake_writer, &sbuf));
+ *     sbuf.writer = io_new_conn(tochild[1], io_idle());
+ *
+ *     out.max = 128;
+ *     out.off = 0;
+ *     out.rlen = 128;
+ *     out.buf = malloc(out.max);
+ *     from_child = io_new_conn(fromchild[0],
+ *                              io_read_partial(out.buf, &out.rlen,
+ *                                              read_from_child, &out));
+ *     if (!sbuf.reader || !sbuf.writer || !from_child)
+ *             err(1, "Allocating connections");
+ *
+ *     io_set_finish(sbuf.reader, reader_exit, &sbuf);
+ *     io_set_finish(sbuf.writer, fail_child_write, &sbuf);
+ *
+ *     io_loop();
+ *     wait(&status);
+ *
+ *     for (off = 0; off < out.off; off += ret) {
+ *             ret = write(STDOUT_FILENO, out.buf+off, out.off-off);
+ *             if (ret < 0)
+ *                     err(1, "Writing stdout");
+ *     }
+ *     free(out.buf);
+ *
+ *     return WIFEXITED(status) ? WEXITSTATUS(status) : 2;
+ * }
+ *
+ * License: LGPL (v2.1 or any later version)
+ * Author: Rusty Russell <rusty@rustcorp.com.au>
+ */
+int main(int argc, char *argv[])
+{
+       if (argc != 2)
+               return 1;
+
+       if (strcmp(argv[1], "depends") == 0) {
+               printf("ccan/time\n");
+               printf("ccan/timer\n");
+               return 0;
+       }
+
+       return 1;
+}
diff --git a/ccan/io/backend.h b/ccan/io/backend.h
new file mode 100644 (file)
index 0000000..30a338f
--- /dev/null
@@ -0,0 +1,89 @@
+/* Licensed under LGPLv2.1+ - see LICENSE file for details */
+#ifndef CCAN_IO_BACKEND_H
+#define CCAN_IO_BACKEND_H
+#include <stdbool.h>
+#include <ccan/timer/timer.h>
+
+struct fd {
+       int fd;
+       bool listener;
+       size_t backend_info;
+};
+
+/* Listeners create connections. */
+struct io_listener {
+       struct fd fd;
+
+       /* These are for connections we create. */
+       void (*init)(int fd, void *arg);
+       void *arg;
+};
+
+struct io_timeout {
+       struct timer timer;
+       struct io_conn *conn;
+
+       struct io_plan (*next)(struct io_conn *, void *arg);
+       void *next_arg;
+};
+
+/* One connection per client. */
+struct io_conn {
+       struct fd fd;
+
+       void (*finish)(struct io_conn *, void *arg);
+       void *finish_arg;
+
+       struct io_conn *duplex;
+       struct io_timeout *timeout;
+
+       struct io_plan plan;
+};
+
+static inline bool timeout_active(const struct io_conn *conn)
+{
+       return conn->timeout && conn->timeout->conn;
+}
+
+extern void *io_loop_return;
+
+#ifdef DEBUG
+extern struct io_conn *current;
+static inline void set_current(struct io_conn *conn)
+{
+       current = conn;
+}
+static inline bool doing_debug_on(struct io_conn *conn)
+{
+       return io_debug_conn && io_debug_conn(conn);
+}
+static inline bool doing_debug(void)
+{
+       return io_debug_conn;
+}
+#else
+static inline void set_current(struct io_conn *conn)
+{
+}
+static inline bool doing_debug_on(struct io_conn *conn)
+{
+       return false;
+}
+static inline bool doing_debug(void)
+{
+       return false;
+}
+#endif
+
+bool add_listener(struct io_listener *l);
+bool add_conn(struct io_conn *c);
+bool add_duplex(struct io_conn *c);
+void del_listener(struct io_listener *l);
+void backend_plan_changed(struct io_conn *conn);
+void backend_add_timeout(struct io_conn *conn, struct timespec ts);
+void backend_del_timeout(struct io_conn *conn);
+void backend_del_conn(struct io_conn *conn);
+
+void io_ready(struct io_conn *conn);
+void *do_io_loop(struct io_conn **ready);
+#endif /* CCAN_IO_BACKEND_H */
diff --git a/ccan/io/benchmarks/Makefile b/ccan/io/benchmarks/Makefile
new file mode 100644 (file)
index 0000000..0068400
--- /dev/null
@@ -0,0 +1,29 @@
+ALL:=run-loop run-different-speed run-length-prefix
+CCANDIR:=../../..
+CFLAGS:=-Wall -I$(CCANDIR) -O3 -flto
+LDFLAGS:=-O3 -flto
+LDLIBS:=-lrt
+
+OBJS:=time.o poll.o io.o err.o timer.o list.o
+
+default: $(ALL)
+
+run-loop: run-loop.o $(OBJS)
+run-different-speed: run-different-speed.o $(OBJS)
+run-length-prefix: run-length-prefix.o $(OBJS)
+
+time.o: $(CCANDIR)/ccan/time/time.c
+       $(CC) $(CFLAGS) -c -o $@ $<
+timer.o: $(CCANDIR)/ccan/timer/timer.c
+       $(CC) $(CFLAGS) -c -o $@ $<
+list.o: $(CCANDIR)/ccan/list/list.c
+       $(CC) $(CFLAGS) -c -o $@ $<
+poll.o: $(CCANDIR)/ccan/io/poll.c
+       $(CC) $(CFLAGS) -c -o $@ $<
+io.o: $(CCANDIR)/ccan/io/io.c
+       $(CC) $(CFLAGS) -c -o $@ $<
+err.o: $(CCANDIR)/ccan/err/err.c
+       $(CC) $(CFLAGS) -c -o $@ $<
+
+clean:
+       $(RM) -f *.o $(ALL)
diff --git a/ccan/io/benchmarks/run-different-speed.c b/ccan/io/benchmarks/run-different-speed.c
new file mode 100644 (file)
index 0000000..5ee15ea
--- /dev/null
@@ -0,0 +1,176 @@
+/* Simulate a server with connections of different speeds.  We count
+ * how many connections complete in 10 seconds. */
+#include <ccan/io/io.h>
+#include <ccan/time/time.h>
+#include <ccan/err/err.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <stdio.h>
+#include <signal.h>
+
+#define REQUEST_SIZE 1024
+#define REPLY_SIZE 10240
+#define NUM_CONNS 500 /* per child */
+#define NUM_CHILDREN 2
+
+static unsigned int completed;
+
+struct client {
+       char request_buffer[REQUEST_SIZE];
+       char reply_buffer[REPLY_SIZE];
+};
+
+static struct io_plan write_reply(struct io_conn *conn, struct client *client);
+static struct io_plan read_request(struct io_conn *conn, struct client *client)
+{
+       return io_read(client->request_buffer, REQUEST_SIZE,
+                      write_reply, client);
+}
+
+/* once we're done, loop again. */
+static struct io_plan write_complete(struct io_conn *conn, struct client *client)
+{
+       completed++;
+       return read_request(conn, client);
+}
+
+static struct io_plan write_reply(struct io_conn *conn, struct client *client)
+{
+       return io_write(client->reply_buffer, REPLY_SIZE,
+                       write_complete, client);
+}
+
+/* This runs in the child. */
+static void create_clients(struct sockaddr_un *addr, int waitfd)
+{
+       struct client data;
+       int i, sock[NUM_CONNS], speed[NUM_CONNS], done[NUM_CONNS], count = 0;
+
+       for (i = 0; i < NUM_CONNS; i++) {
+               /* Set speed. */
+               speed[i] = (1 << (random() % 10));
+               sock[i] = socket(AF_UNIX, SOCK_STREAM, 0);
+               if (sock[i] < 0)
+                       err(1, "creating socket");
+               if (connect(sock[i], (void *)addr, sizeof(*addr)) != 0)
+                       err(1, "connecting socket");
+               /* Make nonblocking. */
+               fcntl(sock[i], F_SETFD, fcntl(sock[i], F_GETFD)|O_NONBLOCK);
+               done[i] = 0;
+       }
+
+       read(waitfd, &i, 1);
+
+       for (;;) {
+               for (i = 0; i < NUM_CONNS; i++) {
+                       int ret, bytes = speed[i];
+                       if (done[i] < REQUEST_SIZE) {
+                               if (REQUEST_SIZE - done[i] < bytes)
+                                       bytes = REQUEST_SIZE - done[i];
+                               ret = write(sock[i], data.request_buffer,
+                                           bytes);
+                               if (ret > 0)
+                                       done[i] += ret;
+                               else if (ret < 0 && errno != EAGAIN)
+                                       goto fail;
+                       } else {
+                               if (REQUEST_SIZE + REPLY_SIZE - done[i] < bytes)
+                                       bytes = REQUEST_SIZE + REPLY_SIZE
+                                               - done[i];
+                               ret = read(sock[i], data.reply_buffer,
+                                           bytes);
+                               if (ret > 0) {
+                                       done[i] += ret;
+                                       if (done[i] == REQUEST_SIZE + REPLY_SIZE) {
+                                               count++;
+                                               done[i] = 0;
+                                       }
+                               } else if (ret < 0 && errno != EAGAIN)
+                                       goto fail;
+                       }
+               }
+       }
+fail:
+       printf("Child did %u\n", count);
+       exit(0);
+}
+
+static int timeout[2];
+static void sigalarm(int sig)
+{
+       write(timeout[1], "1", 1);
+}
+
+static struct io_plan do_timeout(struct io_conn *conn, char *buf)
+{
+       return io_break(buf, io_idle());
+}
+
+int main(int argc, char *argv[])
+{
+       struct client client;
+       unsigned int i, j;
+       struct sockaddr_un addr;
+       struct timespec start, end;
+       int fd, wake[2];
+       char buf;
+
+       addr.sun_family = AF_UNIX;
+       sprintf(addr.sun_path, "/tmp/run-different-speed.sock.%u", getpid());
+
+       if (pipe(wake) != 0 || pipe(timeout) != 0)
+               err(1, "Creating pipes");
+
+       fd = socket(AF_UNIX, SOCK_STREAM, 0);
+       if (fd < 0)
+               err(1, "Creating socket");
+
+       if (bind(fd, (void *)&addr, sizeof(addr)) != 0)
+               err(1, "Binding to %s", addr.sun_path);
+
+       if (listen(fd, NUM_CONNS) != 0)
+               err(1, "Listening on %s", addr.sun_path);
+
+       for (i = 0; i < NUM_CHILDREN; i++) {
+               switch (fork()) {
+               case -1:
+                       err(1, "forking");
+               case 0:
+                       close(wake[1]);
+                       create_clients(&addr, wake[0]);
+                       break;
+               }
+               for (j = 0; j < NUM_CONNS; j++) {
+                       int ret = accept(fd, NULL, 0);
+                       if (ret < 0)
+                               err(1, "Accepting fd");
+                       /* For efficiency, we share client structure */
+                       io_new_conn(ret,
+                                   io_read(client.request_buffer, REQUEST_SIZE,
+                                           write_reply, &client));
+               }
+       }
+
+       io_new_conn(timeout[0], io_read(&buf, 1, do_timeout, &buf));
+
+       close(wake[0]);
+       for (i = 0; i < NUM_CHILDREN; i++)
+               write(wake[1], "1", 1);
+
+       signal(SIGALRM, sigalarm);
+       alarm(10);
+       start = time_now();
+       io_loop();
+       end = time_now();
+       close(fd);
+
+       printf("%u connections complete (%u ns per conn)\n",
+              completed,
+              (int)time_to_nsec(time_divide(time_sub(end, start), completed)));
+       return 0;
+}
diff --git a/ccan/io/benchmarks/run-length-prefix.c b/ccan/io/benchmarks/run-length-prefix.c
new file mode 100644 (file)
index 0000000..d88e3af
--- /dev/null
@@ -0,0 +1,181 @@
+/* Simulate a server with connections of different speeds.  We count
+ * how many connections complete in 10 seconds. */
+#include <ccan/io/io.h>
+#include <ccan/time/time.h>
+#include <ccan/err/err.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <stdio.h>
+#include <signal.h>
+#include <assert.h>
+
+#define REQUEST_MAX 131072
+#define NUM_CONNS 500 /* per child */
+#define NUM_CHILDREN 2
+
+static unsigned int completed;
+
+struct client {
+       unsigned int len;
+       char *request_buffer;
+};
+
+static struct io_plan write_reply(struct io_conn *conn, struct client *client);
+static struct io_plan read_body(struct io_conn *conn, struct client *client)
+{
+       assert(client->len <= REQUEST_MAX);
+       return io_read(client->request_buffer, client->len,
+                      write_reply, client);
+}
+
+static struct io_plan io_read_header(struct client *client)
+{
+       return io_read(&client->len, sizeof(client->len), read_body, client);
+}
+
+/* once we're done, loop again. */
+static struct io_plan write_complete(struct io_conn *conn, struct client *client)
+{
+       completed++;
+       return io_read_header(client);
+}
+
+static struct io_plan write_reply(struct io_conn *conn, struct client *client)
+{
+       return io_write(&client->len, sizeof(client->len),
+                       write_complete, client);
+}
+
+/* This runs in the child. */
+static void create_clients(struct sockaddr_un *addr, int waitfd)
+{
+       struct client data;
+       int i, sock[NUM_CONNS], len[NUM_CONNS], done[NUM_CONNS],
+               result[NUM_CONNS], count = 0;
+
+       for (i = 0; i < NUM_CONNS; i++) {
+               len[i] = (random() % REQUEST_MAX) + 1;
+               sock[i] = socket(AF_UNIX, SOCK_STREAM, 0);
+               if (sock[i] < 0)
+                       err(1, "creating socket");
+               if (connect(sock[i], (void *)addr, sizeof(*addr)) != 0)
+                       err(1, "connecting socket");
+               /* Make nonblocking. */
+               fcntl(sock[i], F_SETFD, fcntl(sock[i], F_GETFD)|O_NONBLOCK);
+               done[i] = 0;
+       }
+
+       read(waitfd, &i, 1);
+
+       for (;;) {
+               for (i = 0; i < NUM_CONNS; i++) {
+                       int ret, totlen = len[i] + sizeof(len[i]);
+                       if (done[i] < sizeof(len[i]) + len[i]) {
+                               data.len = len[i];
+                               ret = write(sock[i], (void *)&data + done[i],
+                                           totlen - done[i]);
+                               if (ret > 0)
+                                       done[i] += ret;
+                               else if (ret < 0 && errno != EAGAIN)
+                                       goto fail;
+                       } else {
+                               int off = done[i] - totlen;
+                               ret = read(sock[i], (void *)&result[i] + off,
+                                          sizeof(result[i]) - off);
+                               if (ret > 0) {
+                                       done[i] += ret;
+                                       if (done[i] == totlen
+                                           + sizeof(result[i])) {
+                                               assert(result[i] == len[i]);
+                                               count++;
+                                               done[i] = 0;
+                                       }
+                               } else if (ret < 0 && errno != EAGAIN)
+                                       goto fail;
+                       }
+               }
+       }
+fail:
+       printf("Child did %u\n", count);
+       exit(0);
+}
+
+static int timeout[2];
+static void sigalarm(int sig)
+{
+       write(timeout[1], "1", 1);
+}
+
+static struct io_plan do_timeout(struct io_conn *conn, char *buf)
+{
+       return io_break(buf, io_idle());
+}
+
+int main(int argc, char *argv[])
+{
+       unsigned int i, j;
+       struct sockaddr_un addr;
+       struct timespec start, end;
+       char buffer[REQUEST_MAX];
+       int fd, wake[2];
+       char buf;
+
+       addr.sun_family = AF_UNIX;
+       sprintf(addr.sun_path, "/tmp/run-different-speed.sock.%u", getpid());
+
+       if (pipe(wake) != 0 || pipe(timeout) != 0)
+               err(1, "Creating pipes");
+
+       fd = socket(AF_UNIX, SOCK_STREAM, 0);
+       if (fd < 0)
+               err(1, "Creating socket");
+
+       if (bind(fd, (void *)&addr, sizeof(addr)) != 0)
+               err(1, "Binding to %s", addr.sun_path);
+
+       if (listen(fd, NUM_CONNS) != 0)
+               err(1, "Listening on %s", addr.sun_path);
+
+       for (i = 0; i < NUM_CHILDREN; i++) {
+               switch (fork()) {
+               case -1:
+                       err(1, "forking");
+               case 0:
+                       close(wake[1]);
+                       create_clients(&addr, wake[0]);
+                       break;
+               }
+               for (j = 0; j < NUM_CONNS; j++) {
+                       struct client *client = malloc(sizeof(*client));
+                       int ret = accept(fd, NULL, 0);
+                       if (ret < 0)
+                               err(1, "Accepting fd");
+                       /* For efficiency, we share buffer */
+                       client->request_buffer = buffer;
+                       io_new_conn(ret, io_read_header(client));
+               }
+       }
+
+       io_new_conn(timeout[0], io_read(&buf, 1, do_timeout, &buf));
+
+       close(wake[0]);
+       for (i = 0; i < NUM_CHILDREN; i++)
+               write(wake[1], "1", 1);
+
+       signal(SIGALRM, sigalarm);
+       alarm(10);
+       start = time_now();
+       io_loop();
+       end = time_now();
+       close(fd);
+
+       printf("%u connections complete (%u ns per conn)\n",
+              completed,
+              (int)time_to_nsec(time_divide(time_sub(end, start), completed)));
+       return 0;
+}
diff --git a/ccan/io/benchmarks/run-loop.c b/ccan/io/benchmarks/run-loop.c
new file mode 100644 (file)
index 0000000..ef01cf6
--- /dev/null
@@ -0,0 +1,112 @@
+#include <ccan/io/io.h>
+#include <ccan/time/time.h>
+#include <sys/wait.h>
+#include <stdio.h>
+#include <string.h>
+#include <assert.h>
+#include <err.h>
+#include <signal.h>
+
+#define NUM 500
+#define NUM_ITERS 10000
+
+struct buffer {
+       int iters;
+       struct io_conn *reader, *writer;
+       char buf[32];
+};
+
+static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf);
+
+static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf)
+{
+       assert(conn == buf->reader);
+
+       if (buf->iters == NUM_ITERS)
+               return io_close();
+
+       /* You write. */
+       io_wake(buf->writer,
+               io_write(&buf->buf, sizeof(buf->buf), poke_reader, buf));
+
+       /* I'll wait until you wake me. */
+       return io_idle();
+}
+
+static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf)
+{
+       assert(conn == buf->writer);
+       /* You read. */
+       io_wake(buf->reader,
+               io_read(&buf->buf, sizeof(buf->buf), poke_writer, buf));
+
+       if (++buf->iters == NUM_ITERS)
+               return io_close();
+
+       /* I'll wait until you tell me to write. */
+       return io_idle();
+}
+
+int main(void)
+{
+       unsigned int i;
+       int fds[2], last_read, last_write;
+       struct timespec start, end;
+       struct buffer buf[NUM];
+
+       if (pipe(fds) != 0)
+               err(1, "pipe");
+       last_read = fds[0];
+       last_write = fds[1];
+
+       for (i = 1; i < NUM; i++) {
+               buf[i].iters = 0;
+               if (pipe(fds) < 0)
+                       err(1, "pipe");
+               memset(buf[i].buf, i, sizeof(buf[i].buf));
+               sprintf(buf[i].buf, "%i-%i", i, i);
+
+               buf[i].reader = io_new_conn(last_read, io_idle());
+               if (!buf[i].reader)
+                       err(1, "Creating reader %i", i);
+               buf[i].writer = io_new_conn(fds[1],
+                                           io_write(&buf[i].buf,
+                                                    sizeof(buf[i].buf),
+                                                    poke_reader, &buf[i]));
+               if (!buf[i].writer)
+                       err(1, "Creating writer %i", i);
+               last_read = fds[0];
+       }
+
+       /* Last one completes the cirle. */
+       i = 0;
+       buf[i].iters = 0;
+       sprintf(buf[i].buf, "%i-%i", i, i);
+       buf[i].reader = io_new_conn(last_read, io_idle());
+       if (!buf[i].reader)
+               err(1, "Creating reader %i", i);
+       buf[i].writer = io_new_conn(last_write, io_write(&buf[i].buf,
+                                                        sizeof(buf[i].buf),
+                                                        poke_reader, &buf[i]));
+       if (!buf[i].writer)
+               err(1, "Creating writer %i", i);
+
+       /* They should eventually exit */
+       start = time_now();
+       if (io_loop() != NULL)
+               errx(1, "io_loop?");
+       end = time_now();
+
+       for (i = 0; i < NUM; i++) {
+               char b[sizeof(buf[0].buf)];
+               memset(b, i, sizeof(b));
+               sprintf(b, "%i-%i", i, i);
+               if (memcmp(b, buf[(i + NUM_ITERS) % NUM].buf, sizeof(b)) != 0)
+                       errx(1, "Buffer for %i was '%s' not '%s'",
+                            i, buf[(i + NUM_ITERS) % NUM].buf, b);
+       }
+
+       printf("run-many: %u %u iterations: %llu usec\n",
+              NUM, NUM_ITERS, (long long)time_to_usec(time_sub(end, start)));
+       return 0;
+}
diff --git a/ccan/io/io.c b/ccan/io/io.c
new file mode 100644 (file)
index 0000000..76f1f44
--- /dev/null
@@ -0,0 +1,469 @@
+/* Licensed under LGPLv2.1+ - see LICENSE file for details */
+#include "io.h"
+#include "backend.h"
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <string.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <assert.h>
+#include <poll.h>
+#include <unistd.h>
+#include <fcntl.h>
+
+void *io_loop_return;
+
+#ifdef DEBUG
+/* Set to skip the next plan. */
+bool io_plan_nodebug;
+/* The current connection to apply plan to. */
+struct io_conn *current;
+/* User-defined function to select which connection(s) to debug. */
+bool (*io_debug_conn)(struct io_conn *conn);
+/* Set when we wake up an connection we are debugging. */
+bool io_debug_wakeup;
+
+struct io_plan io_debug(struct io_plan plan)
+{
+       struct io_conn *ready = NULL;
+
+       if (io_plan_nodebug) {
+               io_plan_nodebug = false;
+               return plan;
+       }
+
+       if (!current || !doing_debug_on(current)) {
+               if (!io_debug_wakeup)
+                       return plan;
+       }
+
+       io_debug_wakeup = false;
+       current->plan = plan;
+       backend_plan_changed(current);
+
+       /* Call back into the loop immediately. */
+       io_loop_return = do_io_loop(&ready);
+
+       if (ready) {
+               set_current(ready);
+               if (!ready->plan.next) {
+                       /* Call finish function immediately. */
+                       if (ready->finish) {
+                               errno = ready->plan.u.close.saved_errno;
+                               ready->finish(ready, ready->finish_arg);
+                               ready->finish = NULL;
+                       }
+                       backend_del_conn(ready);
+               } else {
+                       /* Calls back in itself, via io_debug_io(). */
+                       if (ready->plan.io(ready->fd.fd, &ready->plan) != 2)
+                               abort();
+               }
+               set_current(NULL);
+       }
+
+       /* Return a do-nothing plan, so backend_plan_changed in
+        * io_ready doesn't do anything (it's already been called). */
+       return io_idle_();
+}
+
+int io_debug_io(int ret)
+{
+       /* Cache it for debugging; current changes. */
+       struct io_conn *conn = current;
+       int saved_errno = errno;
+
+       if (!doing_debug_on(conn))
+               return ret;
+
+       /* These will all go linearly through the io_debug() path above. */
+       switch (ret) {
+       case -1:
+               /* This will call io_debug above. */
+               errno = saved_errno;
+               io_close();
+               break;
+       case 0: /* Keep going with plan. */
+               io_debug(conn->plan);
+               break;
+       case 1: /* Done: get next plan. */
+               if (timeout_active(conn))
+                       backend_del_timeout(conn);
+               conn->plan.next(conn, conn->plan.next_arg);
+               break;
+       default:
+               abort();
+       }
+
+       /* Normally-invalid value, used for sanity check. */
+       return 2;
+}
+
+static void debug_io_wake(struct io_conn *conn)
+{
+       /* We want linear if we wake a debugged connection, too. */
+       if (io_debug_conn && io_debug_conn(conn))
+               io_debug_wakeup = true;
+}
+
+/* Counterpart to io_plan_no_debug(), called in macros in io.h */
+static void io_plan_debug_again(void)
+{
+       io_plan_nodebug = false;
+}
+#else
+static void debug_io_wake(struct io_conn *conn)
+{
+}
+static void io_plan_debug_again(void)
+{
+}
+#endif
+
+struct io_listener *io_new_listener_(int fd,
+                                    void (*init)(int fd, void *arg),
+                                    void *arg)
+{
+       struct io_listener *l = malloc(sizeof(*l));
+
+       if (!l)
+               return NULL;
+
+       l->fd.listener = true;
+       l->fd.fd = fd;
+       l->init = init;
+       l->arg = arg;
+       if (!add_listener(l)) {
+               free(l);
+               return NULL;
+       }
+       return l;
+}
+
+void io_close_listener(struct io_listener *l)
+{
+       close(l->fd.fd);
+       del_listener(l);
+       free(l);
+}
+
+struct io_conn *io_new_conn_(int fd, struct io_plan plan)
+{
+       struct io_conn *conn = malloc(sizeof(*conn));
+
+       io_plan_debug_again();
+
+       if (!conn)
+               return NULL;
+
+       conn->fd.listener = false;
+       conn->fd.fd = fd;
+       conn->plan = plan;
+       conn->finish = NULL;
+       conn->finish_arg = NULL;
+       conn->duplex = NULL;
+       conn->timeout = NULL;
+       if (!add_conn(conn)) {
+               free(conn);
+               return NULL;
+       }
+       return conn;
+}
+
+void io_set_finish_(struct io_conn *conn,
+                   void (*finish)(struct io_conn *, void *),
+                   void *arg)
+{
+       conn->finish = finish;
+       conn->finish_arg = arg;
+}
+
+struct io_conn *io_duplex_(struct io_conn *old, struct io_plan plan)
+{
+       struct io_conn *conn;
+
+       io_plan_debug_again();
+
+       assert(!old->duplex);
+
+       conn = malloc(sizeof(*conn));
+       if (!conn)
+               return NULL;
+
+       conn->fd.listener = false;
+       conn->fd.fd = old->fd.fd;
+       conn->plan = plan;
+       conn->duplex = old;
+       conn->finish = NULL;
+       conn->finish_arg = NULL;
+       conn->timeout = NULL;
+       if (!add_duplex(conn)) {
+               free(conn);
+               return NULL;
+       }
+       old->duplex = conn;
+       return conn;
+}
+
+bool io_timeout_(struct io_conn *conn, struct timespec ts,
+                struct io_plan (*cb)(struct io_conn *, void *), void *arg)
+{
+       assert(cb);
+
+       if (!conn->timeout) {
+               conn->timeout = malloc(sizeof(*conn->timeout));
+               if (!conn->timeout)
+                       return false;
+       } else
+               assert(!timeout_active(conn));
+
+       conn->timeout->next = cb;
+       conn->timeout->next_arg = arg;
+       backend_add_timeout(conn, ts);
+       return true;
+}
+
+/* Returns true if we're finished. */
+static int do_write(int fd, struct io_plan *plan)
+{
+       ssize_t ret = write(fd, plan->u.write.buf, plan->u.write.len);
+       if (ret < 0)
+               return io_debug_io(-1);
+
+       plan->u.write.buf += ret;
+       plan->u.write.len -= ret;
+       return io_debug_io(plan->u.write.len == 0);
+}
+
+/* Queue some data to be written. */
+struct io_plan io_write_(const void *data, size_t len,
+                        struct io_plan (*cb)(struct io_conn *, void *),
+                        void *arg)
+{
+       struct io_plan plan;
+
+       assert(cb);
+       plan.u.write.buf = data;
+       plan.u.write.len = len;
+       plan.io = do_write;
+       plan.next = cb;
+       plan.next_arg = arg;
+       plan.pollflag = POLLOUT;
+
+       return plan;
+}
+
+static int do_read(int fd, struct io_plan *plan)
+{
+       ssize_t ret = read(fd, plan->u.read.buf, plan->u.read.len);
+       if (ret <= 0)
+               return io_debug_io(-1);
+
+       plan->u.read.buf += ret;
+       plan->u.read.len -= ret;
+       return io_debug_io(plan->u.read.len == 0);
+}
+
+/* Queue a request to read into a buffer. */
+struct io_plan io_read_(void *data, size_t len,
+                       struct io_plan (*cb)(struct io_conn *, void *),
+                       void *arg)
+{
+       struct io_plan plan;
+
+       assert(cb);
+       plan.u.read.buf = data;
+       plan.u.read.len = len;
+       plan.io = do_read;
+       plan.next = cb;
+       plan.next_arg = arg;
+       plan.pollflag = POLLIN;
+
+       return plan;
+}
+
+static int do_read_partial(int fd, struct io_plan *plan)
+{
+       ssize_t ret = read(fd, plan->u.readpart.buf, *plan->u.readpart.lenp);
+       if (ret <= 0)
+               return io_debug_io(-1);
+
+       *plan->u.readpart.lenp = ret;
+       return io_debug_io(1);
+}
+
+/* Queue a partial request to read into a buffer. */
+struct io_plan io_read_partial_(void *data, size_t *len,
+                               struct io_plan (*cb)(struct io_conn *, void *),
+                               void *arg)
+{
+       struct io_plan plan;
+
+       assert(cb);
+       plan.u.readpart.buf = data;
+       plan.u.readpart.lenp = len;
+       plan.io = do_read_partial;
+       plan.next = cb;
+       plan.next_arg = arg;
+       plan.pollflag = POLLIN;
+
+       return plan;
+}
+
+static int do_write_partial(int fd, struct io_plan *plan)
+{
+       ssize_t ret = write(fd, plan->u.writepart.buf, *plan->u.writepart.lenp);
+       if (ret < 0)
+               return io_debug_io(-1);
+
+       *plan->u.writepart.lenp = ret;
+       return io_debug_io(1);
+}
+
+/* Queue a partial write request. */
+struct io_plan io_write_partial_(const void *data, size_t *len,
+                                struct io_plan (*cb)(struct io_conn*, void *),
+                                void *arg)
+{
+       struct io_plan plan;
+
+       assert(cb);
+       plan.u.writepart.buf = data;
+       plan.u.writepart.lenp = len;
+       plan.io = do_write_partial;
+       plan.next = cb;
+       plan.next_arg = arg;
+       plan.pollflag = POLLOUT;
+
+       return plan;
+}
+
+static int already_connected(int fd, struct io_plan *plan)
+{
+       return io_debug_io(1);
+}
+
+static int do_connect(int fd, struct io_plan *plan)
+{
+       int err, ret;
+       socklen_t len = sizeof(err);
+
+       /* Has async connect finished? */
+       ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len);
+       if (ret < 0)
+               return -1;
+
+       if (err == 0) {
+               /* Restore blocking if it was initially. */
+               fcntl(fd, F_SETFD, plan->u.len_len.len1);
+               return 1;
+       }
+       return 0;
+}
+
+struct io_plan io_connect_(int fd, const struct addrinfo *addr,
+                          struct io_plan (*cb)(struct io_conn*, void *),
+                          void *arg)
+{
+       struct io_plan plan;
+
+       assert(cb);
+
+       plan.next = cb;
+       plan.next_arg = arg;
+
+       /* Save old flags, set nonblock if not already. */
+       plan.u.len_len.len1 = fcntl(fd, F_GETFD);
+       fcntl(fd, F_SETFD, plan.u.len_len.len1 | O_NONBLOCK);
+
+       /* Immediate connect can happen. */
+       if (connect(fd, addr->ai_addr, addr->ai_addrlen) == 0) {
+               /* Dummy will be called immediately. */
+               plan.pollflag = POLLOUT;
+               plan.io = already_connected;
+       } else {
+               if (errno != EINPROGRESS)
+                       return io_close_();
+
+               plan.pollflag = POLLIN;
+               plan.io = do_connect;
+       }
+       return plan;
+}
+
+struct io_plan io_idle_(void)
+{
+       struct io_plan plan;
+
+       plan.pollflag = 0;
+       plan.io = NULL;
+       /* Never called (overridden by io_wake), but NULL means closing */
+       plan.next = (void *)io_idle_;
+
+       return plan;
+}
+
+void io_wake_(struct io_conn *conn, struct io_plan plan)
+
+{
+       io_plan_debug_again();
+
+       /* It might be closing, but we haven't called its finish() yet. */
+       if (!conn->plan.next)
+               return;
+       /* It was idle, right? */
+       assert(!conn->plan.io);
+       conn->plan = plan;
+       backend_plan_changed(conn);
+
+       debug_io_wake(conn);
+}
+
+void io_ready(struct io_conn *conn)
+{
+       set_current(conn);
+       switch (conn->plan.io(conn->fd.fd, &conn->plan)) {
+       case -1: /* Failure means a new plan: close up. */
+               conn->plan = io_close();
+               backend_plan_changed(conn);
+               break;
+       case 0: /* Keep going with plan. */
+               break;
+       case 1: /* Done: get next plan. */
+               if (timeout_active(conn))
+                       backend_del_timeout(conn);
+               conn->plan = conn->plan.next(conn, conn->plan.next_arg);
+               backend_plan_changed(conn);
+       }
+       set_current(NULL);
+}
+
+/* Close the connection, we're done. */
+struct io_plan io_close_(void)
+{
+       struct io_plan plan;
+
+       plan.pollflag = 0;
+       /* This means we're closing. */
+       plan.next = NULL;
+       plan.u.close.saved_errno = errno;
+
+       return plan;
+}
+
+struct io_plan io_close_cb(struct io_conn *conn, void *arg)
+{
+       return io_close();
+}
+
+/* Exit the loop, returning this (non-NULL) arg. */
+struct io_plan io_break_(void *ret, struct io_plan plan)
+{
+       io_plan_debug_again();
+
+       assert(ret);
+       io_loop_return = ret;
+
+       return plan;
+}
diff --git a/ccan/io/io.h b/ccan/io/io.h
new file mode 100644 (file)
index 0000000..b5ffdd2
--- /dev/null
@@ -0,0 +1,493 @@
+/* Licensed under LGPLv2.1+ - see LICENSE file for details */
+#ifndef CCAN_IO_H
+#define CCAN_IO_H
+#include <ccan/typesafe_cb/typesafe_cb.h>
+#include <ccan/time/time.h>
+#include <stdbool.h>
+#include <unistd.h>
+#include "io_plan.h"
+
+/**
+ * io_new_conn - create a new connection.
+ * @fd: the file descriptor.
+ * @plan: the first I/O to perform.
+ *
+ * This creates a connection which owns @fd.  @plan will be called on the
+ * next io_loop().
+ *
+ * Returns NULL on error (and sets errno).
+ *
+ * Example:
+ *     int fd[2];
+ *     struct io_conn *conn;
+ *
+ *     pipe(fd);
+ *     // Plan is to close the fd immediately.
+ *     conn = io_new_conn(fd[0], io_close());
+ *     if (!conn)
+ *             exit(1);
+ */
+#define io_new_conn(fd, plan)                          \
+       (io_plan_no_debug(), io_new_conn_((fd), (plan)))
+struct io_conn *io_new_conn_(int fd, struct io_plan plan);
+
+/**
+ * io_set_finish - set finish function on a connection.
+ * @conn: the connection.
+ * @finish: the function to call when it's closed or fails.
+ * @arg: the argument to @finish.
+ *
+ * @finish will be called when an I/O operation fails, or you call
+ * io_close() on the connection.  errno will be set to the value
+ * after the failed I/O, or at the call to io_close().
+ *
+ * Example:
+ * static void finish(struct io_conn *conn, void *unused)
+ * {
+ *     // errno is not 0 after success, so this is a bit useless.
+ *     printf("Conn %p closed with errno %i\n", conn, errno);
+ * }
+ * ...
+ *     io_set_finish(conn, finish, NULL);
+ */
+#define io_set_finish(conn, finish, arg)                               \
+       io_set_finish_((conn),                                          \
+                      typesafe_cb_preargs(void, void *,                \
+                                          (finish), (arg),             \
+                                          struct io_conn *),           \
+                      (arg))
+void io_set_finish_(struct io_conn *conn,
+                   void (*finish)(struct io_conn *, void *),
+                   void *arg);
+
+/**
+ * io_new_listener - create a new accepting listener.
+ * @fd: the file descriptor.
+ * @init: the function to call for a new connection
+ * @arg: the argument to @init.
+ *
+ * When @fd becomes readable, we accept() and pass that fd to init().
+ *
+ * Returns NULL on error (and sets errno).
+ *
+ * Example:
+ * #include <sys/types.h>
+ * #include <sys/socket.h>
+ * #include <netdb.h>
+ *
+ * static void start_conn(int fd, char *msg)
+ * {
+ *     printf("%s fd %i\n", msg, fd);
+ *     close(fd);
+ * }
+ *
+ * // Set up a listening socket, return it.
+ * static struct io_listener *do_listen(const char *port)
+ * {
+ *     struct addrinfo *addrinfo, hints;
+ *     int fd, on = 1;
+ *
+ *     memset(&hints, 0, sizeof(hints));
+ *     hints.ai_family = AF_UNSPEC;
+ *     hints.ai_socktype = SOCK_STREAM;
+ *     hints.ai_flags = AI_PASSIVE;
+ *     hints.ai_protocol = 0;
+ *
+ *     if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+ *             return NULL;
+ *
+ *     fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ *                 addrinfo->ai_protocol);
+ *     if (fd < 0)
+ *             return NULL;
+ *
+ *     freeaddrinfo(addrinfo);
+ *     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+ *     if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+ *             close(fd);
+ *             return NULL;
+ *     }
+ *     if (listen(fd, 1) != 0) {
+ *             close(fd);
+ *             return NULL;
+ *     }
+ *     return io_new_listener(fd, start_conn, (char *)"Got one!");
+ * }
+ */
+#define io_new_listener(fd, init, arg)                                 \
+       io_new_listener_((fd),                                          \
+                        typesafe_cb_preargs(void, void *,              \
+                                            (init), (arg),             \
+                                            int fd),                   \
+                        (arg))
+struct io_listener *io_new_listener_(int fd,
+                                    void (*init)(int fd, void *arg),
+                                    void *arg);
+
+/**
+ * io_close_listener - delete a listener.
+ * @listener: the listener returned from io_new_listener.
+ *
+ * This closes the fd and frees @listener.
+ *
+ * Example:
+ * ...
+ *     struct io_listener *l = do_listen("8111");
+ *     if (l) {
+ *             io_loop();
+ *             io_close_listener(l);
+ *     }
+ */
+void io_close_listener(struct io_listener *listener);
+
+/**
+ * io_write - plan to write data.
+ * @data: the data buffer.
+ * @len: the length to write.
+ * @cb: function to call once it's done.
+ * @arg: @cb argument
+ *
+ * This creates a plan write out a data buffer.  Once it's all
+ * written, the @cb function will be called: on an error, the finish
+ * function is called instead.
+ *
+ * Note that the I/O may actually be done immediately.
+ *
+ * Example:
+ * static void start_conn_with_write(int fd, const char *msg)
+ * {
+ *     // Write message, then close.
+ *     io_new_conn(fd, io_write(msg, strlen(msg), io_close_cb, NULL));
+ * }
+ */
+#define io_write(data, len, cb, arg)                                   \
+       io_debug(io_write_((data), (len),                               \
+                          typesafe_cb_preargs(struct io_plan, void *,  \
+                                              (cb), (arg), struct io_conn *), \
+                          (arg)))
+struct io_plan io_write_(const void *data, size_t len,
+                        struct io_plan (*cb)(struct io_conn *, void *),
+                        void *arg);
+
+/**
+ * io_read - plan to read data.
+ * @data: the data buffer.
+ * @len: the length to read.
+ * @cb: function to call once it's done.
+ * @arg: @cb argument
+ *
+ * This creates a plan to read data into a buffer.  Once it's all
+ * read, the @cb function will be called: on an error, the finish
+ * function is called instead.
+ *
+ * Note that the I/O may actually be done immediately.
+ *
+ * Example:
+ * static void start_conn_with_read(int fd, char msg[12])
+ * {
+ *     // Read message, then close.
+ *     io_new_conn(fd, io_read(msg, 12, io_close_cb, NULL));
+ * }
+ */
+#define io_read(data, len, cb, arg)                                    \
+       io_debug(io_read_((data), (len),                                \
+                         typesafe_cb_preargs(struct io_plan, void *,   \
+                                             (cb), (arg), struct io_conn *), \
+                         (arg)))
+struct io_plan io_read_(void *data, size_t len,
+                       struct io_plan (*cb)(struct io_conn *, void *),
+                       void *arg);
+
+
+/**
+ * io_read_partial - plan to read some data.
+ * @data: the data buffer.
+ * @len: the maximum length to read, set to the length actually read.
+ * @cb: function to call once it's done.
+ * @arg: @cb argument
+ *
+ * This creates a plan to read data into a buffer.  Once any data is
+ * read, @len is updated and the @cb function will be called: on an
+ * error, the finish function is called instead.
+ *
+ * Note that the I/O may actually be done immediately.
+ *
+ * Example:
+ * struct buf {
+ *     size_t len;
+ *     char buf[12];
+ * };
+ *
+ * static struct io_plan dump_and_close(struct io_conn *conn, struct buf *b)
+ * {
+ *     printf("Partial read: '%*s'\n", (int)b->len, b->buf);
+ *     free(b);
+ *     return io_close();
+ * }
+ *
+ * static void start_conn_with_part_read(int fd, void *unused)
+ * {
+ *     struct buf *b = malloc(sizeof(*b));
+ *
+ *     // Read message, then dump and close.
+ *     b->len = sizeof(b->buf);
+ *     io_new_conn(fd, io_read_partial(b->buf, &b->len, dump_and_close, b));
+ * }
+ */
+#define io_read_partial(data, len, cb, arg)                            \
+       io_debug(io_read_partial_((data), (len),                        \
+                                 typesafe_cb_preargs(struct io_plan, void *, \
+                                                     (cb), (arg),      \
+                                                     struct io_conn *), \
+                                 (arg)))
+struct io_plan io_read_partial_(void *data, size_t *len,
+                               struct io_plan (*cb)(struct io_conn *, void *),
+                               void *arg);
+
+/**
+ * io_write_partial - plan to write some data.
+ * @data: the data buffer.
+ * @len: the maximum length to write, set to the length actually written.
+ * @cb: function to call once it's done.
+ * @arg: @cb argument
+ *
+ * This creates a plan to write data from a buffer.   Once any data is
+ * written, @len is updated and the @cb function will be called: on an
+ * error, the finish function is called instead.
+ *
+ * Note that the I/O may actually be done immediately.
+ *
+ * Example:
+ * struct buf {
+ *     size_t len;
+ *     char buf[12];
+ * };
+ *
+ * static struct io_plan show_remainder(struct io_conn *conn, struct buf *b)
+ * {
+ *     printf("Only wrote: '%*s'\n", (int)b->len, b->buf);
+ *     free(b);
+ *     return io_close();
+ * }
+ *
+ * static void start_conn_with_part_read(int fd, void *unused)
+ * {
+ *     struct buf *b = malloc(sizeof(*b));
+ *
+ *     // Write message, then dump and close.
+ *     b->len = sizeof(b->buf);
+ *     strcpy(b->buf, "Hello world");
+ *     io_new_conn(fd, io_write_partial(b->buf, &b->len, show_remainder, b));
+ * }
+ */
+#define io_write_partial(data, len, cb, arg)                           \
+       io_debug(io_write_partial_((data), (len),                       \
+                                  typesafe_cb_preargs(struct io_plan, void *, \
+                                                      (cb), (arg),     \
+                                                      struct io_conn *), \
+                                  (arg)))
+struct io_plan io_write_partial_(const void *data, size_t *len,
+                                struct io_plan (*cb)(struct io_conn *, void*),
+                                void *arg);
+
+/**
+ * io_connect - plan to connect to a listening socket.
+ * @fd: file descriptor.
+ * @addr: where to connect.
+ * @cb: function to call once it's done.
+ * @arg: @cb argument
+ *
+ * This initiates a connection, and creates a plan for
+ * (asynchronously).  completing it.  Once complete, @len is updated
+ * and the @cb function will be called: on an error, the finish
+ * function is called instead.
+ *
+ * Note that the connect may actually be done immediately.
+ *
+ * Example:
+ * #include <sys/types.h>
+ * #include <sys/socket.h>
+ * #include <netdb.h>
+ *
+ * // Write, then close socket.
+ * static struct io_plan start_write(struct io_conn *conn, void *unused)
+ * {
+ *     return io_write("hello", 5, io_close_cb, NULL);
+ * }
+ *
+ * ...
+ *
+ *     int fd;
+ *     struct addrinfo *addrinfo;
+ *
+ *     fd = socket(AF_INET, SOCK_STREAM, 0);
+ *     getaddrinfo("localhost", "8111", NULL, &addrinfo);
+ *     io_new_conn(fd, io_connect(fd, addrinfo, start_write, NULL));
+ */
+struct addrinfo;
+#define io_connect(fd, addr, cb, arg)                                  \
+       io_debug(io_connect_((fd), (addr),                              \
+                            typesafe_cb_preargs(struct io_plan, void *, \
+                                                (cb), (arg),           \
+                                                struct io_conn *),     \
+                            (arg)))
+struct io_plan io_connect_(int fd, const struct addrinfo *addr,
+                          struct io_plan (*cb)(struct io_conn *, void*),
+                          void *arg);
+
+/**
+ * io_idle - plan to do nothing.
+ *
+ * This indicates the connection is idle: io_wake() will be called later do
+ * give the connection a new plan.
+ *
+ * Example:
+ *     struct io_conn *sleeper;
+ *     sleeper = io_new_conn(open("/dev/null", O_RDONLY), io_idle());
+ *     if (!sleeper)
+ *             exit(1);
+ */
+#define io_idle() io_debug(io_idle_())
+struct io_plan io_idle_(void);
+
+/**
+ * io_timeout - set timeout function if the callback doesn't complete.
+ * @conn: the current connection.
+ * @ts: how long until the timeout should be called.
+ * @cb: callback to call.
+ * @arg: argument to @cb.
+ *
+ * If the usual next callback is not called for this connection before @ts,
+ * this function will be called.  If next callback is called, the timeout
+ * is automatically removed.
+ *
+ * Returns false on allocation failure.  A connection can only have one
+ * timeout.
+ *
+ * Example:
+ *     static struct io_plan close_on_timeout(struct io_conn *conn, char *msg)
+ *     {
+ *             printf("%s\n", msg);
+ *             return io_close();
+ *     }
+ *
+ *     ...
+ *     io_timeout(sleeper, time_from_msec(100),
+ *                close_on_timeout, (char *)"Bye!");
+ */
+#define io_timeout(conn, ts, fn, arg)                                  \
+       io_timeout_((conn), (ts),                                       \
+                   typesafe_cb_preargs(struct io_plan, void *,         \
+                                       (fn), (arg),                    \
+                                       struct io_conn *),              \
+                   (arg))
+bool io_timeout_(struct io_conn *conn, struct timespec ts,
+                struct io_plan (*fn)(struct io_conn *, void *), void *arg);
+
+/**
+ * io_duplex - split an fd into two connections.
+ * @conn: a connection.
+ * @plan: the first I/O function to call.
+ *
+ * Sometimes you want to be able to simultaneously read and write on a
+ * single fd, but io forces a linear call sequence.  The solution is
+ * to have two connections for the same fd, and use one for read
+ * operations and one for write.
+ *
+ * You must io_close() both of them to close the fd.
+ *
+ * Example:
+ *     static void setup_read_write(int fd,
+ *                                  char greet_in[5], const char greet_out[5])
+ *     {
+ *             struct io_conn *writer, *reader;
+ *
+ *             // Read their greeting and send ours at the same time.
+ *             writer = io_new_conn(fd,
+ *                                  io_write(greet_out, 5, io_close_cb, NULL));
+ *             reader = io_duplex(writer,
+ *                                  io_read(greet_in, 5, io_close_cb, NULL));
+ *             if (!reader || !writer)
+ *                     exit(1);
+ *     }
+ */
+#define io_duplex(conn, plan)                          \
+       (io_plan_no_debug(), io_duplex_((conn), (plan)))
+struct io_conn *io_duplex_(struct io_conn *conn, struct io_plan plan);
+
+/**
+ * io_wake - wake up an idle connection.
+ * @conn: an idle connection.
+ * @plan: the next I/O plan for @conn.
+ *
+ * This makes @conn ready to do I/O the next time around the io_loop().
+ *
+ * Example:
+ *     struct io_conn *sleeper;
+ *     sleeper = io_new_conn(open("/dev/null", O_RDONLY), io_idle());
+ *
+ *     io_wake(sleeper, io_write("junk", 4, io_close_cb, NULL));
+ */
+#define io_wake(conn, plan) (io_plan_no_debug(), io_wake_((conn), (plan)))
+void io_wake_(struct io_conn *conn, struct io_plan plan);
+
+/**
+ * io_break - return from io_loop()
+ * @ret: non-NULL value to return from io_loop().
+ * @plan: I/O to perform on return (if any)
+ *
+ * This breaks out of the io_loop.  As soon as the current @next
+ * function returns, any io_closed()'d connections will have their
+ * finish callbacks called, then io_loop() with return with @ret.
+ *
+ * If io_loop() is called again, then @plan will be carried out.
+ *
+ * Example:
+ *     static struct io_plan fail_on_timeout(struct io_conn *conn, char *msg)
+ *     {
+ *             return io_break(msg, io_close());
+ *     }
+ */
+#define io_break(ret, plan) (io_plan_no_debug(), io_break_((ret), (plan)))
+struct io_plan io_break_(void *ret, struct io_plan plan);
+
+/* FIXME: io_recvfrom/io_sendto */
+
+/**
+ * io_close - plan to close a connection.
+ *
+ * On return to io_loop, the connection will be closed.
+ *
+ * Example:
+ * static struct io_plan close_on_timeout(struct io_conn *conn, const char *msg)
+ * {
+ *     printf("closing: %s\n", msg);
+ *     return io_close();
+ * }
+ */
+#define io_close() io_debug(io_close_())
+struct io_plan io_close_(void);
+
+/**
+ * io_close_cb - helper callback to close a connection.
+ * @conn: the connection.
+ *
+ * This schedules a connection to be closed; designed to be used as
+ * a callback function.
+ *
+ * Example:
+ *     #define close_on_timeout io_close_cb
+ */
+struct io_plan io_close_cb(struct io_conn *, void *unused);
+
+/**
+ * io_loop - process fds until all closed on io_break.
+ *
+ * This is the core loop; it exits with the io_break() arg, or NULL if
+ * all connections and listeners are closed.
+ *
+ * Example:
+ *     io_loop();
+ */
+void *io_loop(void);
+#endif /* CCAN_IO_H */
diff --git a/ccan/io/io_plan.h b/ccan/io/io_plan.h
new file mode 100644 (file)
index 0000000..57c7b97
--- /dev/null
@@ -0,0 +1,150 @@
+/* Licensed under LGPLv2.1+ - see LICENSE file for details */
+#ifndef CCAN_IO_PLAN_H
+#define CCAN_IO_PLAN_H
+struct io_conn;
+
+/**
+ * struct io_plan - a plan of what I/O to do.
+ * @pollflag: POLLIN or POLLOUT.
+ * @io: function to call when fd is available for @pollflag.
+ * @next: function to call after @io returns true.
+ * @next_arg: argument to @next.
+ * @u: scratch area for I/O.
+ *
+ * When the fd is POLLIN or POLLOUT (according to @pollflag), @io is
+ * called.  If it returns -1, io_close() becomed the new plan (and errno
+ * is saved).  If it returns 1, @next is called, otherwise @io is
+ * called again when @pollflag is available.
+ *
+ * You can use this to write your own io_plan functions.
+ */
+struct io_plan {
+       int pollflag;
+       /* Only NULL if idle. */
+       int (*io)(int fd, struct io_plan *plan);
+       /* Only NULL if closing. */
+       struct io_plan (*next)(struct io_conn *, void *arg);
+       void *next_arg;
+
+       union {
+               struct {
+                       char *buf;
+                       size_t len;
+               } read;
+               struct {
+                       const char *buf;
+                       size_t len;
+               } write;
+               struct {
+                       char *buf;
+                       size_t *lenp;
+               } readpart;
+               struct {
+                       const char *buf;
+                       size_t *lenp;
+               } writepart;
+               struct {
+                       int saved_errno;
+               } close;
+               struct {
+                       void *p;
+                       size_t len;
+               } ptr_len;
+               struct {
+                       void *p1;
+                       void *p2;
+               } ptr_ptr;
+               struct {
+                       size_t len1;
+                       size_t len2;
+               } len_len;
+       } u;
+};
+
+#ifdef DEBUG
+/**
+ * io_debug_conn - routine to select connection(s) to debug.
+ *
+ * If this is set, the routine should return true if the connection is a
+ * debugging candidate.  If so, the callchain for I/O operations on this
+ * connection will be linear, for easier use of a debugger.
+ *
+ * You will also see calls to any callbacks which wake the connection
+ * which is being debugged.
+ *
+ * Example:
+ *     static bool debug_all(struct io_conn *conn)
+ *     {
+ *             return true();
+ *     }
+ *     ...
+ *     io_debug_conn = debug_all;
+ */
+extern bool (*io_debug_conn)(struct io_conn *conn);
+
+/**
+ * io_debug - if we're debugging the current connection, call immediately.
+ *
+ * This determines if we are debugging the current connection: if so,
+ * it immediately applies the plan and calls back into io_loop() to
+ * create a linear call chain.
+ *
+ * Example:
+ *     #define io_idle() io_debug(io_idle_())
+ *     struct io_plan io_idle_(void);
+ */
+struct io_plan io_debug(struct io_plan plan);
+
+/**
+ * io_debug_io - return from function which actually does I/O.
+ *
+ * This determines if we are debugging the current connection: if so,
+ * it immediately sets the next function and calls into io_loop() to
+ * create a linear call chain.
+ *
+ * Example:
+ *
+ * static int do_write(int fd, struct io_plan *plan)
+ * {
+ *     ssize_t ret = write(fd, plan->u.write.buf, plan->u.write.len);
+ *     if (ret < 0)
+ *             return io_debug_io(-1);
+ *
+ *     plan->u.write.buf += ret;
+ *     plan->u.write.len -= ret;
+ *     return io_debug_io(plan->u.write.len == 0);
+ * }
+ */
+int io_debug_io(int ret);
+
+/**
+ * io_plan_no_debug - mark the next plan not to be called immediately.
+ *
+ * Most routines which take a plan are about to apply it to the current
+ * connection.  We (ab)use this pattern for debugging: as soon as such a
+ * plan is created it is called, to create a linear call chain.
+ *
+ * Some routines, like io_break(), io_duplex() and io_wake() take an
+ * io_plan, but they must not be applied immediately to the current
+ * connection, so we call this first.
+ *
+ * Example:
+ * #define io_break(ret, plan) (io_plan_no_debug(), io_break_((ret), (plan)))
+ * struct io_plan io_break_(void *ret, struct io_plan plan);
+ */
+#define io_plan_no_debug() ((io_plan_nodebug = true))
+
+extern bool io_plan_nodebug;
+#else
+static inline struct io_plan io_debug(struct io_plan plan)
+{
+       return plan;
+}
+static inline int io_debug_io(int ret)
+{
+       return ret;
+}
+#define io_plan_no_debug() (void)0
+#endif
+
+#endif /* CCAN_IO_PLAN_H */
diff --git a/ccan/io/poll.c b/ccan/io/poll.c
new file mode 100644 (file)
index 0000000..31a5660
--- /dev/null
@@ -0,0 +1,388 @@
+/* Licensed under LGPLv2.1+ - see LICENSE file for details */
+#include "io.h"
+#include "backend.h"
+#include <assert.h>
+#include <poll.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <limits.h>
+#include <errno.h>
+
+static size_t num_fds = 0, max_fds = 0, num_closing = 0, num_waiting = 0;
+static struct pollfd *pollfds = NULL;
+static struct fd **fds = NULL;
+static struct timers timeouts;
+#ifdef DEBUG
+static unsigned int io_loop_level;
+static struct io_conn *free_later;
+static void io_loop_enter(void)
+{
+       io_loop_level++;
+}
+static void io_loop_exit(void)
+{
+       io_loop_level--;
+       if (io_loop_level == 0) {
+               /* Delayed free. */
+               while (free_later) {
+                       struct io_conn *c = free_later;
+                       free_later = c->finish_arg;
+                       free(c);
+               }
+       }
+}
+static void free_conn(struct io_conn *conn)
+{
+       /* Only free on final exit: chain via finish. */
+       if (io_loop_level > 1) {
+               struct io_conn *c;
+               for (c = free_later; c; c = c->finish_arg)
+                       assert(c != conn);
+               conn->finish_arg = free_later;
+               free_later = conn;
+       } else
+               free(conn);
+}
+#else
+static void io_loop_enter(void)
+{
+}
+static void io_loop_exit(void)
+{
+}
+static void free_conn(struct io_conn *conn)
+{
+       free(conn);
+}
+#endif
+
+static bool add_fd(struct fd *fd, short events)
+{
+       if (num_fds + 1 > max_fds) {
+               struct pollfd *newpollfds;
+               struct fd **newfds;
+               size_t num = max_fds ? max_fds * 2 : 8;
+
+               newpollfds = realloc(pollfds, sizeof(*newpollfds) * num);
+               if (!newpollfds)
+                       return false;
+               pollfds = newpollfds;
+               newfds = realloc(fds, sizeof(*newfds) * num);
+               if (!newfds)
+                       return false;
+               fds = newfds;
+               max_fds = num;
+       }
+
+       pollfds[num_fds].events = events;
+       /* In case it's idle. */
+       if (!events)
+               pollfds[num_fds].fd = -fd->fd;
+       else
+               pollfds[num_fds].fd = fd->fd;
+       pollfds[num_fds].revents = 0; /* In case we're iterating now */
+       fds[num_fds] = fd;
+       fd->backend_info = num_fds;
+       num_fds++;
+       if (events)
+               num_waiting++;
+
+       return true;
+}
+
+static void del_fd(struct fd *fd)
+{
+       size_t n = fd->backend_info;
+
+       assert(n != -1);
+       assert(n < num_fds);
+       if (pollfds[n].events)
+               num_waiting--;
+       if (n != num_fds - 1) {
+               /* Move last one over us. */
+               pollfds[n] = pollfds[num_fds-1];
+               fds[n] = fds[num_fds-1];
+               assert(fds[n]->backend_info == num_fds-1);
+               fds[n]->backend_info = n;
+       } else if (num_fds == 1) {
+               /* Free everything when no more fds. */
+               free(pollfds);
+               free(fds);
+               pollfds = NULL;
+               fds = NULL;
+               max_fds = 0;
+       }
+       num_fds--;
+       fd->backend_info = -1;
+       close(fd->fd);
+}
+
+bool add_listener(struct io_listener *l)
+{
+       if (!add_fd(&l->fd, POLLIN))
+               return false;
+       return true;
+}
+
+void backend_plan_changed(struct io_conn *conn)
+{
+       struct pollfd *pfd;
+
+       /* This can happen with debugging and delayed free... */
+       if (conn->fd.backend_info == -1)
+               return;
+
+       pfd = &pollfds[conn->fd.backend_info];
+
+       if (pfd->events)
+               num_waiting--;
+
+       pfd->events = conn->plan.pollflag;
+       if (conn->duplex) {
+               int mask = conn->duplex->plan.pollflag;
+               /* You can't *both* read/write. */
+               assert(!mask || pfd->events != mask);
+               pfd->events |= mask;
+       }
+       if (pfd->events) {
+               num_waiting++;
+               pfd->fd = conn->fd.fd;
+       } else
+               pfd->fd = -conn->fd.fd;
+
+       if (!conn->plan.next)
+               num_closing++;
+}
+
+bool add_conn(struct io_conn *c)
+{
+       if (!add_fd(&c->fd, c->plan.pollflag))
+               return false;
+       /* Immediate close is allowed. */
+       if (!c->plan.next)
+               num_closing++;
+       return true;
+}
+
+bool add_duplex(struct io_conn *c)
+{
+       c->fd.backend_info = c->duplex->fd.backend_info;
+       backend_plan_changed(c);
+       return true;
+}
+
+void backend_del_conn(struct io_conn *conn)
+{
+       if (conn->finish) {
+               errno = conn->plan.u.close.saved_errno;
+               conn->finish(conn, conn->finish_arg);
+       }
+       if (timeout_active(conn))
+               backend_del_timeout(conn);
+       free(conn->timeout);
+       if (conn->duplex) {
+               /* In case fds[] pointed to the other one. */
+               fds[conn->fd.backend_info] = &conn->duplex->fd;
+               conn->duplex->duplex = NULL;
+               conn->fd.backend_info = -1;
+       } else
+               del_fd(&conn->fd);
+       num_closing--;
+       free_conn(conn);
+}
+
+void del_listener(struct io_listener *l)
+{
+       del_fd(&l->fd);
+}
+
+static void set_plan(struct io_conn *conn, struct io_plan plan)
+{
+       conn->plan = plan;
+       backend_plan_changed(conn);
+}
+
+static void accept_conn(struct io_listener *l)
+{
+       int fd = accept(l->fd.fd, NULL, NULL);
+
+       /* FIXME: What to do here? */
+       if (fd < 0)
+               return;
+       l->init(fd, l->arg);
+}
+
+/* It's OK to miss some, as long as we make progress. */
+static bool finish_conns(struct io_conn **ready)
+{
+       unsigned int i;
+
+       for (i = 0; !io_loop_return && i < num_fds; i++) {
+               struct io_conn *c, *duplex;
+
+               if (!num_closing)
+                       break;
+
+               if (fds[i]->listener)
+                       continue;
+               c = (void *)fds[i];
+               for (duplex = c->duplex; c; c = duplex, duplex = NULL) {
+                       if (!c->plan.next) {
+                               if (doing_debug_on(c) && ready) {
+                                       *ready = c;
+                                       return true;
+                               }
+                               backend_del_conn(c);
+                               i--;
+                       }
+               }
+       }
+       return false;
+}
+
+void backend_add_timeout(struct io_conn *conn, struct timespec duration)
+{
+       if (!timeouts.base)
+               timers_init(&timeouts, time_now());
+       timer_add(&timeouts, &conn->timeout->timer,
+                 time_add(time_now(), duration));
+       conn->timeout->conn = conn;
+}
+
+void backend_del_timeout(struct io_conn *conn)
+{
+       assert(conn->timeout->conn == conn);
+       timer_del(&timeouts, &conn->timeout->timer);
+       conn->timeout->conn = NULL;
+}
+
+/* This is the main loop. */
+void *do_io_loop(struct io_conn **ready)
+{
+       void *ret;
+
+       io_loop_enter();
+
+       while (!io_loop_return) {
+               int i, r, timeout = INT_MAX;
+               struct timespec now;
+               bool some_timeouts = false;
+
+               if (timeouts.base) {
+                       struct timespec first;
+                       struct list_head expired;
+                       struct io_timeout *t;
+
+                       now = time_now();
+
+                       /* Call functions for expired timers. */
+                       timers_expire(&timeouts, now, &expired);
+                       while ((t = list_pop(&expired, struct io_timeout, timer.list))) {
+                               struct io_conn *conn = t->conn;
+                               /* Clear, in case timer re-adds */
+                               t->conn = NULL;
+                               set_current(conn);
+                               set_plan(conn, t->next(conn, t->next_arg));
+                               some_timeouts = true;
+                       }
+
+                       /* Now figure out how long to wait for the next one. */
+                       if (timer_earliest(&timeouts, &first)) {
+                               uint64_t f = time_to_msec(time_sub(first, now));
+                               if (f < INT_MAX)
+                                       timeout = f;
+                       }
+               }
+
+               if (num_closing) {
+                       /* If this finishes a debugging con, return now. */
+                       if (finish_conns(ready))
+                               return NULL;
+                       /* Could have started/finished more. */
+                       continue;
+               }
+
+               /* debug can recurse on io_loop; anything can change. */
+               if (doing_debug() && some_timeouts)
+                       continue;
+
+               if (num_fds == 0)
+                       break;
+
+               /* You can't tell them all to go to sleep! */
+               assert(num_waiting);
+
+               r = poll(pollfds, num_fds, timeout);
+               if (r < 0)
+                       break;
+
+               for (i = 0; i < num_fds && !io_loop_return; i++) {
+                       struct io_conn *c = (void *)fds[i];
+                       int events = pollfds[i].revents;
+
+                       if (r == 0)
+                               break;
+
+                       if (fds[i]->listener) {
+                               if (events & POLLIN) {
+                                       accept_conn((void *)c);
+                                       r--;
+                               }
+                       } else if (events & (POLLIN|POLLOUT)) {
+                               r--;
+                               if (c->duplex) {
+                                       int mask = c->duplex->plan.pollflag;
+                                       if (events & mask) {
+                                               if (doing_debug_on(c->duplex)
+                                                       && ready) {
+                                                       *ready = c->duplex;
+                                                       return NULL;
+                                               }
+                                               io_ready(c->duplex);
+                                               events &= ~mask;
+                                               /* debug can recurse;
+                                                * anything can change. */
+                                               if (doing_debug())
+                                                       break;
+                                               if (!(events&(POLLIN|POLLOUT)))
+                                                       continue;
+                                       }
+                               }
+                               if (doing_debug_on(c) && ready) {
+                                       *ready = c;
+                                       return NULL;
+                               }
+                               io_ready(c);
+                               /* debug can recurse; anything can change. */
+                               if (doing_debug())
+                                       break;
+                       } else if (events & (POLLHUP|POLLNVAL|POLLERR)) {
+                               r--;
+                               set_current(c);
+                               errno = EBADF;
+                               set_plan(c, io_close());
+                               if (c->duplex) {
+                                       set_current(c->duplex);
+                                       set_plan(c->duplex, io_close());
+                               }
+                       }
+               }
+       }
+
+       while (num_closing && !io_loop_return) {
+               if (finish_conns(ready))
+                       return NULL;
+       }
+
+       ret = io_loop_return;
+       io_loop_return = NULL;
+
+       io_loop_exit();
+       return ret;
+}
+
+void *io_loop(void)
+{
+       return do_io_loop(NULL);
+}
diff --git a/ccan/io/test/run-01-start-finish-DEBUG.c b/ccan/io/test/run-01-start-finish-DEBUG.c
new file mode 100644 (file)
index 0000000..9e33f2b
--- /dev/null
@@ -0,0 +1,8 @@
+#define DEBUG
+#define PORT "64001"
+#define main real_main
+int real_main(void);
+#include "run-01-start-finish.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }
diff --git a/ccan/io/test/run-01-start-finish.c b/ccan/io/test/run-01-start-finish.c
new file mode 100644 (file)
index 0000000..a63baf7
--- /dev/null
@@ -0,0 +1,95 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#ifndef PORT
+#define PORT "65001"
+#endif
+
+static void finish_ok(struct io_conn *conn, int *state)
+{
+       ok1(*state == 1);
+       (*state)++;
+       io_break(state + 1, io_idle());
+}
+
+static void init_conn(int fd, int *state)
+{
+       ok1(*state == 0);
+       (*state)++;
+       io_set_finish(io_new_conn(fd, io_close()), finish_ok, state);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+       int fd, on = 1;
+       struct addrinfo *addrinfo, hints;
+
+       memset(&hints, 0, sizeof(hints));
+       hints.ai_family = AF_UNSPEC;
+       hints.ai_socktype = SOCK_STREAM;
+       hints.ai_flags = AI_PASSIVE;
+       hints.ai_protocol = 0;
+
+       if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+               return -1;
+
+       fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                   addrinfo->ai_protocol);
+       if (fd < 0)
+               return -1;
+
+       setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+       if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+               close(fd);
+               return -1;
+       }
+       if (listen(fd, 1) != 0) {
+               close(fd);
+               return -1;
+       }
+       *info = addrinfo;
+       return fd;
+}
+
+int main(void)
+{
+       int state = 0;
+       struct addrinfo *addrinfo;
+       struct io_listener *l;
+       int fd;
+
+       /* This is how many tests you plan to run */
+       plan_tests(9);
+       fd = make_listen_fd(PORT, &addrinfo);
+       ok1(fd >= 0);
+       l = io_new_listener(fd, init_conn, &state);
+       ok1(l);
+       fflush(stdout);
+       if (!fork()) {
+               io_close_listener(l);
+               fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                           addrinfo->ai_protocol);
+               if (fd < 0)
+                       exit(1);
+               if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+                       exit(2);
+               close(fd);
+               freeaddrinfo(addrinfo);
+               exit(0);
+       }
+       freeaddrinfo(addrinfo);
+       ok1(io_loop() == &state + 1);
+       ok1(state == 2);
+       io_close_listener(l);
+       ok1(wait(&state));
+       ok1(WIFEXITED(state));
+       ok1(WEXITSTATUS(state) == 0);
+
+       /* This exits depending on whether all tests passed */
+       return exit_status();
+}
diff --git a/ccan/io/test/run-02-read-DEBUG.c b/ccan/io/test/run-02-read-DEBUG.c
new file mode 100644 (file)
index 0000000..5ca2781
--- /dev/null
@@ -0,0 +1,8 @@
+#define DEBUG
+#define PORT "64002"
+#define main real_main
+int real_main(void);
+#include "run-02-read.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }
diff --git a/ccan/io/test/run-02-read.c b/ccan/io/test/run-02-read.c
new file mode 100644 (file)
index 0000000..c7a7eaf
--- /dev/null
@@ -0,0 +1,115 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#ifndef PORT
+#define PORT "65002"
+#endif
+
+struct data {
+       int state;
+       char buf[4];
+};
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+       ok1(d->state == 1);
+       d->state++;
+       io_break(d, io_idle());
+}
+
+static void init_conn(int fd, struct data *d)
+{
+       ok1(d->state == 0);
+       d->state++;
+
+       io_set_finish(io_new_conn(fd,
+                                 io_read(d->buf, sizeof(d->buf), io_close_cb, d)),
+                     finish_ok, d);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+       int fd, on = 1;
+       struct addrinfo *addrinfo, hints;
+
+       memset(&hints, 0, sizeof(hints));
+       hints.ai_family = AF_UNSPEC;
+       hints.ai_socktype = SOCK_STREAM;
+       hints.ai_flags = AI_PASSIVE;
+       hints.ai_protocol = 0;
+
+       if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+               return -1;
+
+       fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                   addrinfo->ai_protocol);
+       if (fd < 0)
+               return -1;
+
+       setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+       if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+               close(fd);
+               return -1;
+       }
+       if (listen(fd, 1) != 0) {
+               close(fd);
+               return -1;
+       }
+       *info = addrinfo;
+       return fd;
+}
+
+int main(void)
+{
+       struct data *d = malloc(sizeof(*d));
+       struct addrinfo *addrinfo;
+       struct io_listener *l;
+       int fd, status;
+
+       /* This is how many tests you plan to run */
+       plan_tests(10);
+       d->state = 0;
+       fd = make_listen_fd(PORT, &addrinfo);
+       ok1(fd >= 0);
+       l = io_new_listener(fd, init_conn, d);
+       ok1(l);
+       fflush(stdout);
+       if (!fork()) {
+               int i;
+
+               io_close_listener(l);
+               fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                           addrinfo->ai_protocol);
+               if (fd < 0)
+                       exit(1);
+               if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+                       exit(2);
+               signal(SIGPIPE, SIG_IGN);
+               for (i = 0; i < strlen("hellothere"); i++) {
+                       if (write(fd, "hellothere" + i, 1) != 1)
+                               break;
+               }
+               close(fd);
+               freeaddrinfo(addrinfo);
+               free(d);
+               exit(0);
+       }
+       freeaddrinfo(addrinfo);
+       ok1(io_loop() == d);
+       ok1(d->state == 2);
+       ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
+       free(d);
+       io_close_listener(l);
+
+       ok1(wait(&status));
+       ok1(WIFEXITED(status));
+       ok1(WEXITSTATUS(status) == 0);
+
+       /* This exits depending on whether all tests passed */
+       return exit_status();
+}
diff --git a/ccan/io/test/run-03-readpartial-DEBUG.c b/ccan/io/test/run-03-readpartial-DEBUG.c
new file mode 100644 (file)
index 0000000..c473b65
--- /dev/null
@@ -0,0 +1,8 @@
+#define DEBUG
+#define PORT "64003"
+#define main real_main
+int real_main(void);
+#include "run-03-readpartial.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }
diff --git a/ccan/io/test/run-03-readpartial.c b/ccan/io/test/run-03-readpartial.c
new file mode 100644 (file)
index 0000000..7ecccc7
--- /dev/null
@@ -0,0 +1,144 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#ifndef PORT
+#define PORT "65003"
+#endif
+
+struct data {
+       int state;
+       size_t bytes;
+       char buf[4];
+};
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+       ok1(d->state == 1);
+       d->state++;
+       io_break(d, io_idle());
+}
+
+static void init_conn(int fd, struct data *d)
+{
+       ok1(d->state == 0);
+       d->state++;
+       d->bytes = sizeof(d->buf);
+
+       io_set_finish(io_new_conn(fd,
+                                 io_read_partial(d->buf, &d->bytes, io_close_cb, d)),
+                     finish_ok, d);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+       int fd, on = 1;
+       struct addrinfo *addrinfo, hints;
+
+       memset(&hints, 0, sizeof(hints));
+       hints.ai_family = AF_UNSPEC;
+       hints.ai_socktype = SOCK_STREAM;
+       hints.ai_flags = AI_PASSIVE;
+       hints.ai_protocol = 0;
+
+       if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+               return -1;
+
+       fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                   addrinfo->ai_protocol);
+       if (fd < 0)
+               return -1;
+
+       setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+       if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+               close(fd);
+               return -1;
+       }
+       if (listen(fd, 1) != 0) {
+               close(fd);
+               return -1;
+       }
+       *info = addrinfo;
+       return fd;
+}
+
+static void write_to_socket(const char *str, const struct addrinfo *addrinfo)
+{
+       int fd, i;
+
+       fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                   addrinfo->ai_protocol);
+       if (fd < 0)
+               exit(1);
+       if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+               exit(2);
+       signal(SIGPIPE, SIG_IGN);
+       for (i = 0; i < strlen(str); i++) {
+               if (write(fd, str + i, 1) != 1)
+                       break;
+       }
+       close(fd);
+}
+
+int main(void)
+{
+       struct data *d = malloc(sizeof(*d));
+       struct addrinfo *addrinfo;
+       struct io_listener *l;
+       int fd, status;
+
+       /* This is how many tests you plan to run */
+       plan_tests(22);
+       d->state = 0;
+       fd = make_listen_fd(PORT, &addrinfo);
+       ok1(fd >= 0);
+       l = io_new_listener(fd, init_conn, d);
+       ok1(l);
+       fflush(stdout);
+       if (!fork()) {
+               io_close_listener(l);
+               write_to_socket("hellothere", addrinfo);
+               freeaddrinfo(addrinfo);
+               free(d);
+               exit(0);
+       }
+       ok1(io_loop() == d);
+       ok1(d->state == 2);
+       ok1(d->bytes > 0);
+       ok1(d->bytes <= sizeof(d->buf));
+       ok1(memcmp(d->buf, "hellothere", d->bytes) == 0);
+
+       ok1(wait(&status));
+       ok1(WIFEXITED(status));
+       ok1(WEXITSTATUS(status) == 0);
+
+       fflush(stdout);
+       if (!fork()) {
+               io_close_listener(l);
+               write_to_socket("hi", addrinfo);
+               freeaddrinfo(addrinfo);
+               free(d);
+               exit(0);
+       }
+       d->state = 0;
+       ok1(io_loop() == d);
+       ok1(d->state == 2);
+       ok1(d->bytes > 0);
+       ok1(d->bytes <= strlen("hi"));
+       ok1(memcmp(d->buf, "hi", d->bytes) == 0);
+
+       freeaddrinfo(addrinfo);
+       free(d);
+       io_close_listener(l);
+
+       ok1(wait(&status));
+       ok1(WIFEXITED(status));
+       ok1(WEXITSTATUS(status) == 0);
+
+       /* This exits depending on whether all tests passed */
+       return exit_status();
+}
diff --git a/ccan/io/test/run-04-writepartial-DEBUG.c b/ccan/io/test/run-04-writepartial-DEBUG.c
new file mode 100644 (file)
index 0000000..fa65bcf
--- /dev/null
@@ -0,0 +1,8 @@
+#define DEBUG
+#define PORT "64004"
+#define main real_main
+int real_main(void);
+#include "run-04-writepartial.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }
diff --git a/ccan/io/test/run-04-writepartial.c b/ccan/io/test/run-04-writepartial.c
new file mode 100644 (file)
index 0000000..8cfb2ab
--- /dev/null
@@ -0,0 +1,127 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#ifndef PORT
+#define PORT "65004"
+#endif
+
+struct data {
+       int state;
+       size_t bytes;
+       char *buf;
+};
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+       ok1(d->state == 1);
+       d->state++;
+       io_break(d, io_idle());
+}
+
+static void init_conn(int fd, struct data *d)
+{
+       ok1(d->state == 0);
+       d->state++;
+       io_set_finish(io_new_conn(fd,
+                                 io_write_partial(d->buf, &d->bytes, io_close_cb, d)),
+                                 finish_ok, d);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+       int fd, on = 1;
+       struct addrinfo *addrinfo, hints;
+
+       memset(&hints, 0, sizeof(hints));
+       hints.ai_family = AF_UNSPEC;
+       hints.ai_socktype = SOCK_STREAM;
+       hints.ai_flags = AI_PASSIVE;
+       hints.ai_protocol = 0;
+
+       if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+               return -1;
+
+       fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                   addrinfo->ai_protocol);
+       if (fd < 0)
+               return -1;
+
+       setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+       if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+               close(fd);
+               return -1;
+       }
+       if (listen(fd, 1) != 0) {
+               close(fd);
+               return -1;
+       }
+       *info = addrinfo;
+       return fd;
+}
+
+static void read_from_socket(const char *str, const struct addrinfo *addrinfo)
+{
+       int fd;
+       char buf[100];
+
+       fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                   addrinfo->ai_protocol);
+       if (fd < 0)
+               exit(1);
+       if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+               exit(2);
+       if (read(fd, buf, strlen(str)) != strlen(str))
+               exit(3);
+       if (memcmp(buf, str, strlen(str)) != 0)
+               exit(4);
+       close(fd);
+}
+
+int main(void)
+{
+       struct data *d = malloc(sizeof(*d));
+       struct addrinfo *addrinfo;
+       struct io_listener *l;
+       int fd, status;
+
+       /* This is how many tests you plan to run */
+       plan_tests(11);
+       d->state = 0;
+       d->bytes = 1024*1024;
+       d->buf = malloc(d->bytes);
+       memset(d->buf, 'a', d->bytes);
+       fd = make_listen_fd(PORT, &addrinfo);
+       ok1(fd >= 0);
+       l = io_new_listener(fd, init_conn, d);
+       ok1(l);
+       fflush(stdout);
+       if (!fork()) {
+               io_close_listener(l);
+               read_from_socket("aaaaaa", addrinfo);
+               freeaddrinfo(addrinfo);
+               free(d->buf);
+               free(d);
+               exit(0);
+       }
+       ok1(io_loop() == d);
+       ok1(d->state == 2);
+       ok1(d->bytes > 0);
+       ok1(d->bytes <= 1024*1024);
+
+       ok1(wait(&status));
+       ok1(WIFEXITED(status));
+       ok1(WEXITSTATUS(status) == 0);
+
+       freeaddrinfo(addrinfo);
+       free(d->buf);
+       free(d);
+       io_close_listener(l);
+
+       /* This exits depending on whether all tests passed */
+       return exit_status();
+}
diff --git a/ccan/io/test/run-05-write-DEBUG.c b/ccan/io/test/run-05-write-DEBUG.c
new file mode 100644 (file)
index 0000000..831e671
--- /dev/null
@@ -0,0 +1,8 @@
+#define DEBUG
+#define PORT "64005"
+#define main real_main
+int real_main(void);
+#include "run-05-write.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }
diff --git a/ccan/io/test/run-05-write.c b/ccan/io/test/run-05-write.c
new file mode 100644 (file)
index 0000000..5319def
--- /dev/null
@@ -0,0 +1,128 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#ifndef PORT
+#define PORT "65005"
+#endif
+
+struct data {
+       int state;
+       size_t bytes;
+       char *buf;
+};
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+       ok1(d->state == 1);
+       d->state++;
+       io_break(d, io_idle());
+}
+
+static void init_conn(int fd, struct data *d)
+{
+       ok1(d->state == 0);
+       d->state++;
+       io_set_finish(io_new_conn(fd, io_write(d->buf, d->bytes,
+                                              io_close_cb, d)),
+                     finish_ok, d);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+       int fd, on = 1;
+       struct addrinfo *addrinfo, hints;
+
+       memset(&hints, 0, sizeof(hints));
+       hints.ai_family = AF_UNSPEC;
+       hints.ai_socktype = SOCK_STREAM;
+       hints.ai_flags = AI_PASSIVE;
+       hints.ai_protocol = 0;
+
+       if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+               return -1;
+
+       fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                   addrinfo->ai_protocol);
+       if (fd < 0)
+               return -1;
+
+       setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+       if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+               close(fd);
+               return -1;
+       }
+       if (listen(fd, 1) != 0) {
+               close(fd);
+               return -1;
+       }
+       *info = addrinfo;
+       return fd;
+}
+
+static void read_from_socket(size_t bytes, const struct addrinfo *addrinfo)
+{
+       int fd, done, r;
+       char buf[100];
+
+       fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                   addrinfo->ai_protocol);
+       if (fd < 0)
+               exit(1);
+       if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+               exit(2);
+
+       for (done = 0; done < bytes; done += r) {
+               r = read(fd, buf, sizeof(buf));
+               if (r < 0)
+                       exit(3);
+               done += r;
+       }
+       close(fd);
+}
+
+int main(void)
+{
+       struct data *d = malloc(sizeof(*d));
+       struct addrinfo *addrinfo;
+       struct io_listener *l;
+       int fd, status;
+
+       /* This is how many tests you plan to run */
+       plan_tests(9);
+       d->state = 0;
+       d->bytes = 1024*1024;
+       d->buf = malloc(d->bytes);
+       memset(d->buf, 'a', d->bytes);
+       fd = make_listen_fd(PORT, &addrinfo);
+       ok1(fd >= 0);
+       l = io_new_listener(fd, init_conn, d);
+       ok1(l);
+       fflush(stdout);
+       if (!fork()) {
+               io_close_listener(l);
+               read_from_socket(d->bytes, addrinfo);
+               freeaddrinfo(addrinfo);
+               free(d->buf);
+               free(d);
+               exit(0);
+       }
+       ok1(io_loop() == d);
+       ok1(d->state == 2);
+
+       ok1(wait(&status));
+       ok1(WIFEXITED(status));
+       ok1(WEXITSTATUS(status) == 0);
+
+       freeaddrinfo(addrinfo);
+       free(d->buf);
+       free(d);
+       io_close_listener(l);
+
+       /* This exits depending on whether all tests passed */
+       return exit_status();
+}
diff --git a/ccan/io/test/run-06-idle-DEBUG.c b/ccan/io/test/run-06-idle-DEBUG.c
new file mode 100644 (file)
index 0000000..298ce23
--- /dev/null
@@ -0,0 +1,8 @@
+#define DEBUG
+#define PORT "64006"
+#define main real_main
+int real_main(void);
+#include "run-06-idle.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }
diff --git a/ccan/io/test/run-06-idle.c b/ccan/io/test/run-06-idle.c
new file mode 100644 (file)
index 0000000..51cca96
--- /dev/null
@@ -0,0 +1,146 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#ifndef PORT
+#define PORT "65006"
+#endif
+
+static struct io_conn *idler;
+
+struct data {
+       int state;
+       char buf[4];
+};
+
+static struct io_plan read_done(struct io_conn *conn, struct data *d)
+{
+       ok1(d->state == 2 || d->state == 3);
+       d->state++;
+       return io_close();
+}
+
+static void finish_waker(struct io_conn *conn, struct data *d)
+{
+       io_wake(idler, io_read(d->buf, sizeof(d->buf), read_done, d));
+       ok1(d->state == 1);
+       d->state++;
+}
+
+static void finish_idle(struct io_conn *conn, struct data *d)
+{
+       ok1(d->state == 3);
+       d->state++;
+       io_break(d, io_idle());
+}
+
+static struct io_plan never(struct io_conn *conn, void *arg)
+{
+       abort();
+}
+
+static void init_conn(int fd, struct data *d)
+{
+       int fd2;
+
+       ok1(d->state == 0);
+       d->state++;
+       idler = io_new_conn(fd, io_idle());
+       io_set_finish(idler, finish_idle, d);
+
+       /* This will wake us up, as read will fail. */
+       fd2 = open("/dev/null", O_RDONLY);
+       ok1(fd2 >= 0);
+       io_set_finish(io_new_conn(fd2, io_read(idler, 1, never, NULL)),
+                     finish_waker, d);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+       int fd, on = 1;
+       struct addrinfo *addrinfo, hints;
+
+       memset(&hints, 0, sizeof(hints));
+       hints.ai_family = AF_UNSPEC;
+       hints.ai_socktype = SOCK_STREAM;
+       hints.ai_flags = AI_PASSIVE;
+       hints.ai_protocol = 0;
+
+       if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+               return -1;
+
+       fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                   addrinfo->ai_protocol);
+       if (fd < 0)
+               return -1;
+
+       setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+       if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+               close(fd);
+               return -1;
+       }
+       if (listen(fd, 1) != 0) {
+               close(fd);
+               return -1;
+       }
+       *info = addrinfo;
+       return fd;
+}
+
+int main(void)
+{
+       struct data *d = malloc(sizeof(*d));
+       struct addrinfo *addrinfo;
+       struct io_listener *l;
+       int fd, status;
+
+       /* This is how many tests you plan to run */
+       plan_tests(13);
+       d->state = 0;
+       fd = make_listen_fd(PORT, &addrinfo);
+       ok1(fd >= 0);
+       l = io_new_listener(fd, init_conn, d);
+       ok1(l);
+       fflush(stdout);
+       if (!fork()) {
+               int i;
+
+               io_close_listener(l);
+               fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                           addrinfo->ai_protocol);
+               if (fd < 0)
+                       exit(1);
+               if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+                       exit(2);
+               signal(SIGPIPE, SIG_IGN);
+               for (i = 0; i < strlen("hellothere"); i++) {
+                       if (write(fd, "hellothere" + i, 1) != 1)
+                               break;
+               }
+               close(fd);
+               freeaddrinfo(addrinfo);
+               free(d);
+               exit(0);
+       }
+       freeaddrinfo(addrinfo);
+
+       ok1(io_loop() == d);
+       ok1(d->state == 4);
+       ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
+       free(d);
+       io_close_listener(l);
+
+       ok1(wait(&status));
+       ok1(WIFEXITED(status));
+       ok1(WEXITSTATUS(status) == 0);
+
+       /* This exits depending on whether all tests passed */
+       return exit_status();
+}
diff --git a/ccan/io/test/run-07-break-DEBUG.c b/ccan/io/test/run-07-break-DEBUG.c
new file mode 100644 (file)
index 0000000..602d7c2
--- /dev/null
@@ -0,0 +1,8 @@
+#define DEBUG
+#define PORT "64007"
+#define main real_main
+int real_main(void);
+#include "run-07-break.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }
diff --git a/ccan/io/test/run-07-break.c b/ccan/io/test/run-07-break.c
new file mode 100644 (file)
index 0000000..19cc6a8
--- /dev/null
@@ -0,0 +1,125 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#ifndef PORT
+#define PORT "65007"
+#endif
+
+struct data {
+       int state;
+       char buf[4];
+};
+
+static struct io_plan read_done(struct io_conn *conn, struct data *d)
+{
+       ok1(d->state == 1);
+       d->state++;
+       return io_close();
+}
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+       ok1(d->state == 2);
+       d->state++;
+}
+
+static void init_conn(int fd, struct data *d)
+{
+       ok1(d->state == 0);
+       d->state++;
+
+       io_set_finish(io_new_conn(fd,
+                                 io_break(d,
+                                          io_read(d->buf, sizeof(d->buf), read_done, d))),
+                     finish_ok, d);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+       int fd, on = 1;
+       struct addrinfo *addrinfo, hints;
+
+       memset(&hints, 0, sizeof(hints));
+       hints.ai_family = AF_UNSPEC;
+       hints.ai_socktype = SOCK_STREAM;
+       hints.ai_flags = AI_PASSIVE;
+       hints.ai_protocol = 0;
+
+       if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+               return -1;
+
+       fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                   addrinfo->ai_protocol);
+       if (fd < 0)
+               return -1;
+
+       setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+       if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+               close(fd);
+               return -1;
+       }
+       if (listen(fd, 1) != 0) {
+               close(fd);
+               return -1;
+       }
+       *info = addrinfo;
+       return fd;
+}
+
+int main(void)
+{
+       struct data *d = malloc(sizeof(*d));
+       struct addrinfo *addrinfo;
+       struct io_listener *l;
+       int fd, status;
+
+       /* This is how many tests you plan to run */
+       plan_tests(13);
+       d->state = 0;
+       fd = make_listen_fd(PORT, &addrinfo);
+       ok1(fd >= 0);
+       l = io_new_listener(fd, init_conn, d);
+       ok1(l);
+       fflush(stdout);
+       if (!fork()) {
+               int i;
+
+               io_close_listener(l);
+               fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                           addrinfo->ai_protocol);
+               if (fd < 0)
+                       exit(1);
+               if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+                       exit(2);
+               signal(SIGPIPE, SIG_IGN);
+               for (i = 0; i < strlen("hellothere"); i++) {
+                       if (write(fd, "hellothere" + i, 1) != 1)
+                               break;
+               }
+               close(fd);
+               freeaddrinfo(addrinfo);
+               free(d);
+               exit(0);
+       }
+       freeaddrinfo(addrinfo);
+       ok1(io_loop() == d);
+       ok1(d->state == 1);
+       io_close_listener(l);
+
+       ok1(io_loop() == NULL);
+       ok1(d->state == 3);
+       ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
+       free(d);
+
+       ok1(wait(&status));
+       ok1(WIFEXITED(status));
+       ok1(WEXITSTATUS(status) == 0);
+
+       /* This exits depending on whether all tests passed */
+       return exit_status();
+}
diff --git a/ccan/io/test/run-08-hangup-on-idle-DEBUG.c b/ccan/io/test/run-08-hangup-on-idle-DEBUG.c
new file mode 100644 (file)
index 0000000..f916b8e
--- /dev/null
@@ -0,0 +1,7 @@
+#define DEBUG
+#define main real_main
+int real_main(void);
+#include "run-08-hangup-on-idle.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }
diff --git a/ccan/io/test/run-08-hangup-on-idle.c b/ccan/io/test/run-08-hangup-on-idle.c
new file mode 100644 (file)
index 0000000..b384043
--- /dev/null
@@ -0,0 +1,45 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+static int fds2[2];
+
+static struct io_plan timeout_wakeup(struct io_conn *conn, char *buf)
+{
+       /* This kills the dummy connection. */
+       close(fds2[1]);
+       return io_read(buf, 16, io_close_cb, NULL);
+}
+
+int main(void)
+{
+       int fds[2];
+       struct io_conn *conn;
+       char buf[16];
+
+       plan_tests(4);
+
+       ok1(pipe(fds) == 0);
+
+       /* Write then close. */
+       io_new_conn(fds[1], io_write("hello there world", 16,
+                                    io_close_cb, NULL));
+       conn = io_new_conn(fds[0], io_idle());
+
+       /* To avoid assert(num_waiting) */
+       ok1(pipe(fds2) == 0);
+       io_new_conn(fds2[0], io_read(buf, 16, io_close_cb, NULL));
+
+       /* After half a second, it will read. */
+       io_timeout(conn, time_from_msec(500), timeout_wakeup, buf);
+
+       ok1(io_loop() == NULL);
+       ok1(memcmp(buf, "hello there world", 16) == 0);
+
+       /* This exits depending on whether all tests passed */
+       return exit_status();
+}
diff --git a/ccan/io/test/run-08-read-after-hangup-DEBUG.c b/ccan/io/test/run-08-read-after-hangup-DEBUG.c
new file mode 100644 (file)
index 0000000..8c60289
--- /dev/null
@@ -0,0 +1,7 @@
+#define DEBUG
+#define main real_main
+int real_main(void);
+#include "run-08-read-after-hangup.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }
diff --git a/ccan/io/test/run-08-read-after-hangup.c b/ccan/io/test/run-08-read-after-hangup.c
new file mode 100644 (file)
index 0000000..b73139e
--- /dev/null
@@ -0,0 +1,34 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+#include <signal.h>
+
+static char inbuf[8];
+
+static struct io_plan wake_it(struct io_conn *conn, struct io_conn *reader)
+{
+       io_wake(reader, io_read(inbuf, 8, io_close_cb, NULL));
+       return io_close();
+}
+
+int main(void)
+{
+       int fds[2];
+       struct io_conn *conn;
+
+       plan_tests(3);
+
+       ok1(pipe(fds) == 0);
+       conn = io_new_conn(fds[0], io_idle());
+       io_new_conn(fds[1], io_write("EASYTEST", 8, wake_it, conn));
+
+       ok1(io_loop() == NULL);
+       ok1(memcmp(inbuf, "EASYTEST", sizeof(inbuf)) == 0);
+
+       /* This exits depending on whether all tests passed */
+       return exit_status();
+}
diff --git a/ccan/io/test/run-09-connect-DEBUG.c b/ccan/io/test/run-09-connect-DEBUG.c
new file mode 100644 (file)
index 0000000..5520dd7
--- /dev/null
@@ -0,0 +1,8 @@
+#define DEBUG
+#define PORT "64009"
+#define main real_main
+int real_main(void);
+#include "run-09-connect.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }
diff --git a/ccan/io/test/run-09-connect.c b/ccan/io/test/run-09-connect.c
new file mode 100644 (file)
index 0000000..fd7e160
--- /dev/null
@@ -0,0 +1,103 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#ifndef PORT
+#define PORT "65009"
+#endif
+
+static struct io_listener *l;
+
+struct data {
+       int state;
+       char buf[10];
+};
+
+static struct io_plan closer(struct io_conn *conn, struct data *d)
+{
+       d->state++;
+       return io_close();
+}
+
+static struct io_plan connected(struct io_conn *conn, struct data *d2)
+{
+       ok1(d2->state == 0);
+       d2->state++;
+       return io_read(d2->buf, sizeof(d2->buf), closer, d2);
+}
+
+static void init_conn(int fd, struct data *d)
+{
+       ok1(d->state == 0);
+       d->state++;
+       io_new_conn(fd, io_write(d->buf, sizeof(d->buf), closer, d));
+       io_close_listener(l);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+       int fd, on = 1;
+       struct addrinfo *addrinfo, hints;
+
+       memset(&hints, 0, sizeof(hints));
+       hints.ai_family = AF_UNSPEC;
+       hints.ai_socktype = SOCK_STREAM;
+       hints.ai_flags = AI_PASSIVE;
+       hints.ai_protocol = 0;
+
+       if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+               return -1;
+
+       fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                   addrinfo->ai_protocol);
+       if (fd < 0)
+               return -1;
+
+       setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+       if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+               close(fd);
+               return -1;
+       }
+       if (listen(fd, 1) != 0) {
+               close(fd);
+               return -1;
+       }
+       *info = addrinfo;
+       return fd;
+}
+
+int main(void)
+{
+       struct data *d = malloc(sizeof(*d)), *d2 = malloc(sizeof(*d2));
+       struct addrinfo *addrinfo;
+       int fd;
+
+       /* This is how many tests you plan to run */
+       plan_tests(8);
+       d->state = 0;
+       memset(d->buf, 'a', sizeof(d->buf));
+       fd = make_listen_fd(PORT, &addrinfo);
+       ok1(fd >= 0);
+       l = io_new_listener(fd, init_conn, d);
+       ok1(l);
+
+       fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                   addrinfo->ai_protocol);
+       d2->state = 0;
+       ok1(io_new_conn(fd, io_connect(fd, addrinfo, connected, d2)));
+
+       ok1(io_loop() == NULL);
+       ok1(d->state == 2);
+       ok1(d2->state == 2);
+
+       freeaddrinfo(addrinfo);
+       free(d);
+       free(d2);
+
+       /* This exits depending on whether all tests passed */
+       return exit_status();
+}
diff --git a/ccan/io/test/run-10-many-DEBUG.c b/ccan/io/test/run-10-many-DEBUG.c
new file mode 100644 (file)
index 0000000..675c795
--- /dev/null
@@ -0,0 +1,12 @@
+#define DEBUG
+#define PORT "64010"
+#define main real_main
+int real_main(void);
+#include "run-10-many.c"
+#undef main
+/* We stack overflow if we debug all of them! */
+static bool debug_one(struct io_conn *conn)
+{
+       return conn == buf[1].reader;
+}
+int main(void) { io_debug_conn = debug_one; return real_main(); }
diff --git a/ccan/io/test/run-10-many.c b/ccan/io/test/run-10-many.c
new file mode 100644 (file)
index 0000000..da59a3c
--- /dev/null
@@ -0,0 +1,105 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#define NUM 100
+#define NUM_ITERS 1000
+
+struct buffer {
+       int iters;
+       struct io_conn *reader, *writer;
+       char buf[32];
+};
+
+static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf);
+
+static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf)
+{
+       assert(conn == buf->reader);
+
+       if (buf->iters == NUM_ITERS)
+               return io_close();
+
+       /* You write. */
+       io_wake(buf->writer,
+               io_write(&buf->buf, sizeof(buf->buf), poke_reader, buf));
+
+       /* I'll wait until you wake me. */
+       return io_idle();
+}
+
+static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf)
+{
+       assert(conn == buf->writer);
+       /* You read. */
+       io_wake(buf->reader,
+               io_read(&buf->buf, sizeof(buf->buf), poke_writer, buf));
+
+       if (++buf->iters == NUM_ITERS)
+               return io_close();
+
+       /* I'll wait until you tell me to write. */
+       return io_idle();
+}
+
+static struct buffer buf[NUM];
+
+int main(void)
+{
+       unsigned int i;
+       int fds[2], last_read, last_write;
+
+       plan_tests(5 + NUM);
+
+       ok1(pipe(fds) == 0);
+       last_read = fds[0];
+       last_write = fds[1];
+
+       for (i = 1; i < NUM; i++) {
+               if (pipe(fds) < 0)
+                       break;
+               memset(buf[i].buf, i, sizeof(buf[i].buf));
+               sprintf(buf[i].buf, "%i-%i", i, i);
+
+               /* Wait for writer to tell us to read. */
+               buf[i].reader = io_new_conn(last_read, io_idle());
+               if (!buf[i].reader)
+                       break;
+               buf[i].writer = io_new_conn(fds[1],
+                                           io_write(&buf[i].buf,
+                                                    sizeof(buf[i].buf),
+                                                    poke_reader, &buf[i]));
+               if (!buf[i].writer)
+                       break;
+               last_read = fds[0];
+       }
+       if (!ok1(i == NUM))
+               exit(exit_status());
+
+       /* Last one completes the cirle. */
+       i = 0;
+       sprintf(buf[i].buf, "%i-%i", i, i);
+       buf[i].reader = io_new_conn(last_read, io_idle());
+       ok1(buf[i].reader);
+       buf[i].writer = io_new_conn(last_write,
+                                   io_write(&buf[i].buf, sizeof(buf[i].buf),
+                                            poke_reader, &buf[i]));
+       ok1(buf[i].writer);
+
+       /* They should eventually exit */
+       ok1(io_loop() == NULL);
+
+       for (i = 0; i < NUM; i++) {
+               char b[sizeof(buf[0].buf)];
+               memset(b, i, sizeof(b));
+               sprintf(b, "%i-%i", i, i);
+               ok1(memcmp(b, buf[(i + NUM_ITERS) % NUM].buf, sizeof(b)) == 0);
+       }
+
+       /* This exits depending on whether all tests passed */
+       return exit_status();
+}
diff --git a/ccan/io/test/run-12-bidir-DEBUG.c b/ccan/io/test/run-12-bidir-DEBUG.c
new file mode 100644 (file)
index 0000000..55c4cf7
--- /dev/null
@@ -0,0 +1,8 @@
+#define DEBUG
+#define PORT "64012"
+#define main real_main
+int real_main(void);
+#include "run-12-bidir.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }
diff --git a/ccan/io/test/run-12-bidir.c b/ccan/io/test/run-12-bidir.c
new file mode 100644 (file)
index 0000000..1ab0a21
--- /dev/null
@@ -0,0 +1,132 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#ifndef PORT
+#define PORT "65012"
+#endif
+
+struct data {
+       struct io_listener *l;
+       int state;
+       char buf[4];
+       char wbuf[32];
+};
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+       d->state++;
+}
+
+static struct io_plan write_done(struct io_conn *conn, struct data *d)
+{
+       d->state++;
+       return io_close();
+}
+
+static void init_conn(int fd, struct data *d)
+{
+       struct io_conn *conn;
+
+       ok1(d->state == 0);
+       d->state++;
+
+       io_close_listener(d->l);
+
+       memset(d->wbuf, 7, sizeof(d->wbuf));
+
+       conn = io_new_conn(fd, io_read(d->buf, sizeof(d->buf), io_close_cb, d));
+       io_set_finish(conn, finish_ok, d);
+       conn = io_duplex(conn, io_write(d->wbuf, sizeof(d->wbuf), write_done, d));
+       ok1(conn);
+       io_set_finish(conn, finish_ok, d);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+       int fd, on = 1;
+       struct addrinfo *addrinfo, hints;
+
+       memset(&hints, 0, sizeof(hints));
+       hints.ai_family = AF_UNSPEC;
+       hints.ai_socktype = SOCK_STREAM;
+       hints.ai_flags = AI_PASSIVE;
+       hints.ai_protocol = 0;
+
+       if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+               return -1;
+
+       fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                   addrinfo->ai_protocol);
+       if (fd < 0)
+               return -1;
+
+       setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+       if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+               close(fd);
+               return -1;
+       }
+       if (listen(fd, 1) != 0) {
+               close(fd);
+               return -1;
+       }
+       *info = addrinfo;
+       return fd;
+}
+
+int main(void)
+{
+       struct data *d = malloc(sizeof(*d));
+       struct addrinfo *addrinfo;
+       int fd, status;
+
+       /* This is how many tests you plan to run */
+       plan_tests(10);
+       d->state = 0;
+       fd = make_listen_fd(PORT, &addrinfo);
+       ok1(fd >= 0);
+       d->l = io_new_listener(fd, init_conn, d);
+       ok1(d->l);
+       fflush(stdout);
+       if (!fork()) {
+               int i;
+               char buf[32];
+
+               io_close_listener(d->l);
+               free(d);
+               fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                           addrinfo->ai_protocol);
+               if (fd < 0)
+                       exit(1);
+               if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+                       exit(2);
+               signal(SIGPIPE, SIG_IGN);
+               for (i = 0; i < 32; i++) {
+                       if (read(fd, buf+i, 1) != 1)
+                               break;
+               }
+               for (i = 0; i < strlen("hellothere"); i++) {
+                       if (write(fd, "hellothere" + i, 1) != 1)
+                               break;
+               }
+               close(fd);
+               freeaddrinfo(addrinfo);
+               exit(0);
+       }
+       freeaddrinfo(addrinfo);
+       ok1(io_loop() == NULL);
+       ok1(d->state == 4);
+       ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
+       free(d);
+
+       ok1(wait(&status));
+       ok1(WIFEXITED(status));
+       ok1(WEXITSTATUS(status) == 0);
+
+       /* This exits depending on whether all tests passed */
+       return exit_status();
+}
diff --git a/ccan/io/test/run-13-all-idle-DEBUG.c b/ccan/io/test/run-13-all-idle-DEBUG.c
new file mode 100644 (file)
index 0000000..2969a13
--- /dev/null
@@ -0,0 +1,8 @@
+#define DEBUG
+#define PORT "64013"
+#define main real_main
+int real_main(void);
+#include "run-13-all-idle.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }
diff --git a/ccan/io/test/run-13-all-idle.c b/ccan/io/test/run-13-all-idle.c
new file mode 100644 (file)
index 0000000..0e7e156
--- /dev/null
@@ -0,0 +1,31 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+#include <signal.h>
+
+int main(void)
+{
+       int status;
+
+       plan_tests(3);
+
+       if (fork() == 0) {
+               int fds[2];
+
+               ok1(pipe(fds) == 0);
+               io_new_conn(fds[0], io_idle());
+               io_loop();
+               exit(1);
+       }
+
+       ok1(wait(&status) != -1);
+       ok1(WIFSIGNALED(status));
+       ok1(WTERMSIG(status) == SIGABRT);
+
+       /* This exits depending on whether all tests passed */
+       return exit_status();
+}
diff --git a/ccan/io/test/run-15-timeout-DEBUG.c b/ccan/io/test/run-15-timeout-DEBUG.c
new file mode 100644 (file)
index 0000000..d511486
--- /dev/null
@@ -0,0 +1,8 @@
+#define DEBUG
+#define PORT "64015"
+#define main real_main
+int real_main(void);
+#include "run-15-timeout.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }
diff --git a/ccan/io/test/run-15-timeout.c b/ccan/io/test/run-15-timeout.c
new file mode 100644 (file)
index 0000000..6f92ec3
--- /dev/null
@@ -0,0 +1,174 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+#include <unistd.h>
+
+#ifndef PORT
+#define PORT "65015"
+#endif
+
+struct data {
+       int state;
+       int timeout_usec;
+       bool timed_out;
+       char buf[4];
+};
+
+
+static struct io_plan no_timeout(struct io_conn *conn, struct data *d)
+{
+       ok1(d->state == 1);
+       d->state++;
+       return io_close();
+}
+
+static struct io_plan timeout(struct io_conn *conn, struct data *d)
+{
+       ok1(d->state == 1);
+       d->state++;
+       d->timed_out = true;
+       return io_close();
+}
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+       ok1(d->state == 2);
+       d->state++;
+       io_break(d, io_idle());
+}
+
+static void init_conn(int fd, struct data *d)
+{
+       struct io_conn *conn;
+
+       ok1(d->state == 0);
+       d->state++;
+
+       conn = io_new_conn(fd, io_read(d->buf, sizeof(d->buf), no_timeout, d));
+       io_set_finish(conn, finish_ok, d);
+       io_timeout(conn, time_from_usec(d->timeout_usec), timeout, d);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+       int fd, on = 1;
+       struct addrinfo *addrinfo, hints;
+
+       memset(&hints, 0, sizeof(hints));
+       hints.ai_family = AF_UNSPEC;
+       hints.ai_socktype = SOCK_STREAM;
+       hints.ai_flags = AI_PASSIVE;
+       hints.ai_protocol = 0;
+
+       if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+               return -1;
+
+       fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                   addrinfo->ai_protocol);
+       if (fd < 0)
+               return -1;
+
+       setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+       if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+               close(fd);
+               return -1;
+       }
+       if (listen(fd, 1) != 0) {
+               close(fd);
+               return -1;
+       }
+       *info = addrinfo;
+       return fd;
+}
+
+int main(void)
+{
+       struct data *d = malloc(sizeof(*d));
+       struct addrinfo *addrinfo;
+       struct io_listener *l;
+       int fd, status;
+
+       /* This is how many tests you plan to run */
+       plan_tests(20);
+       d->state = 0;
+       d->timed_out = false;
+       d->timeout_usec = 100000;
+       fd = make_listen_fd(PORT, &addrinfo);
+       ok1(fd >= 0);
+       l = io_new_listener(fd, init_conn, d);
+       ok1(l);
+       fflush(stdout);
+
+       if (!fork()) {
+               int i;
+
+               io_close_listener(l);
+               fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                           addrinfo->ai_protocol);
+               if (fd < 0)
+                       exit(1);
+               if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+                       exit(2);
+               signal(SIGPIPE, SIG_IGN);
+               usleep(500000);
+               for (i = 0; i < strlen("hellothere"); i++) {
+                       if (write(fd, "hellothere" + i, 1) != 1)
+                               break;
+               }
+               close(fd);
+               freeaddrinfo(addrinfo);
+               free(d);
+               exit(i);
+       }
+       ok1(io_loop() == d);
+       ok1(d->state == 3);
+       ok1(d->timed_out == true);
+       ok1(wait(&status));
+       ok1(WIFEXITED(status));
+       ok1(WEXITSTATUS(status) < sizeof(d->buf));
+
+       /* This one shouldn't time out. */
+       d->state = 0;
+       d->timed_out = false;
+       d->timeout_usec = 500000;
+       fflush(stdout);
+
+       if (!fork()) {
+               int i;
+
+               io_close_listener(l);
+               fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                           addrinfo->ai_protocol);
+               if (fd < 0)
+                       exit(1);
+               if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+                       exit(2);
+               signal(SIGPIPE, SIG_IGN);
+               usleep(100000);
+               for (i = 0; i < strlen("hellothere"); i++) {
+                       if (write(fd, "hellothere" + i, 1) != 1)
+                               break;
+               }
+               close(fd);
+               freeaddrinfo(addrinfo);
+               free(d);
+               exit(i);
+       }
+       ok1(io_loop() == d);
+       ok1(d->state == 3);
+       ok1(d->timed_out == false);
+       ok1(wait(&status));
+       ok1(WIFEXITED(status));
+       ok1(WEXITSTATUS(status) >= sizeof(d->buf));
+
+       io_close_listener(l);
+       freeaddrinfo(addrinfo);
+       free(d);
+
+       /* This exits depending on whether all tests passed */
+       return exit_status();
+}
diff --git a/ccan/io/test/run-17-homemade-io-DEBUG.c b/ccan/io/test/run-17-homemade-io-DEBUG.c
new file mode 100644 (file)
index 0000000..5c44ce0
--- /dev/null
@@ -0,0 +1,8 @@
+#define DEBUG
+#define PORT "64017"
+#define main real_main
+int real_main(void);
+#include "run-17-homemade-io.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }
diff --git a/ccan/io/test/run-17-homemade-io.c b/ccan/io/test/run-17-homemade-io.c
new file mode 100644 (file)
index 0000000..b827713
--- /dev/null
@@ -0,0 +1,183 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#ifndef PORT
+#define PORT "65017"
+#endif
+
+struct packet {
+       int state;
+       size_t len;
+       void *contents;
+};
+
+static void finish_ok(struct io_conn *conn, struct packet *pkt)
+{
+       ok1(pkt->state == 3);
+       pkt->state++;
+       io_break(pkt, io_idle());
+}
+
+static int do_read_packet(int fd, struct io_plan *plan)
+{
+       struct packet *pkt = plan->u.ptr_len.p;
+       char *dest;
+       ssize_t ret;
+       size_t off, totlen;
+
+       /* Reading len? */
+       if (plan->u.ptr_len.len < sizeof(size_t)) {
+               ok1(pkt->state == 1);
+               pkt->state++;
+               dest = (char *)&pkt->len;
+               off = plan->u.ptr_len.len;
+               totlen = sizeof(pkt->len);
+       } else {
+               ok1(pkt->state == 2);
+               pkt->state++;
+               if (pkt->len == 0)
+                       return io_debug_io(1);
+               if (!pkt->contents && !(pkt->contents = malloc(pkt->len)))
+                       goto fail;
+               else {
+                       dest = pkt->contents;
+                       off = plan->u.ptr_len.len - sizeof(pkt->len);
+                       totlen = pkt->len;
+               }
+       }
+
+       ret = read(fd, dest + off, totlen - off);
+       if (ret <= 0)
+               goto fail;
+
+       plan->u.ptr_len.len += ret;
+
+       /* Finished? */
+       return io_debug_io(plan->u.ptr_len.len >= sizeof(pkt->len)
+                          && plan->u.ptr_len.len == pkt->len + sizeof(pkt->len));
+
+fail:
+       free(pkt->contents);
+       return io_debug_io(-1);
+}
+
+static struct io_plan io_read_packet(struct packet *pkt,
+                                    struct io_plan (*cb)(struct io_conn *, void *),
+                                    void *arg)
+{
+       struct io_plan plan;
+
+       assert(cb);
+       pkt->contents = NULL;
+       plan.u.ptr_len.p = pkt;
+       plan.u.ptr_len.len = 0;
+       plan.io = do_read_packet;
+       plan.next = cb;
+       plan.next_arg = arg;
+       plan.pollflag = POLLIN;
+
+       return plan;
+}
+
+static void init_conn(int fd, struct packet *pkt)
+{
+       ok1(pkt->state == 0);
+       pkt->state++;
+
+       io_set_finish(io_new_conn(fd, io_read_packet(pkt, io_close_cb, pkt)),
+                     finish_ok, pkt);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+       int fd, on = 1;
+       struct addrinfo *addrinfo, hints;
+
+       memset(&hints, 0, sizeof(hints));
+       hints.ai_family = AF_UNSPEC;
+       hints.ai_socktype = SOCK_STREAM;
+       hints.ai_flags = AI_PASSIVE;
+       hints.ai_protocol = 0;
+
+       if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+               return -1;
+
+       fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                   addrinfo->ai_protocol);
+       if (fd < 0)
+               return -1;
+
+       setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+       if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+               close(fd);
+               return -1;
+       }
+       if (listen(fd, 1) != 0) {
+               close(fd);
+               return -1;
+       }
+       *info = addrinfo;
+       return fd;
+}
+
+int main(void)
+{
+       struct packet *pkt = malloc(sizeof(*pkt));
+       struct addrinfo *addrinfo;
+       struct io_listener *l;
+       int fd, status;
+
+       /* This is how many tests you plan to run */
+       plan_tests(13);
+       pkt->state = 0;
+       fd = make_listen_fd(PORT, &addrinfo);
+       ok1(fd >= 0);
+       l = io_new_listener(fd, init_conn, pkt);
+       ok1(l);
+       fflush(stdout);
+       if (!fork()) {
+               struct {
+                       size_t len;
+                       char data[8];
+               } data;
+
+               io_close_listener(l);
+               fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                           addrinfo->ai_protocol);
+               if (fd < 0)
+                       exit(1);
+               if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+                       exit(2);
+               signal(SIGPIPE, SIG_IGN);
+
+               data.len = sizeof(data.data);
+               memcpy(data.data, "hithere!", sizeof(data.data));
+               if (write(fd, &data, sizeof(data)) != sizeof(data))
+                       exit(3);
+
+               close(fd);
+               freeaddrinfo(addrinfo);
+               free(pkt);
+               exit(0);
+       }
+       freeaddrinfo(addrinfo);
+       ok1(io_loop() == pkt);
+       ok1(pkt->state == 4);
+       ok1(pkt->len == 8);
+       ok1(memcmp(pkt->contents, "hithere!", 8) == 0);
+       free(pkt->contents);
+       free(pkt);
+       io_close_listener(l);
+
+       ok1(wait(&status));
+       ok1(WIFEXITED(status));
+       ok1(WEXITSTATUS(status) == 0);
+
+       /* This exits depending on whether all tests passed */
+       return exit_status();
+}
diff --git a/ccan/io/test/run-18-errno-DEBUG.c b/ccan/io/test/run-18-errno-DEBUG.c
new file mode 100644 (file)
index 0000000..863d1ae
--- /dev/null
@@ -0,0 +1,8 @@
+#define DEBUG
+#define PORT "64018"
+#define main real_main
+int real_main(void);
+#include "run-18-errno.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }
diff --git a/ccan/io/test/run-18-errno.c b/ccan/io/test/run-18-errno.c
new file mode 100644 (file)
index 0000000..985a322
--- /dev/null
@@ -0,0 +1,120 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#ifndef PORT
+#define PORT "65018"
+#endif
+
+static void finish_100(struct io_conn *conn, int *state)
+{
+       ok1(errno == 100);
+       ok1(*state == 1);
+       (*state)++;
+}
+
+static void finish_EBADF(struct io_conn *conn, int *state)
+{
+       ok1(errno == EBADF);
+       ok1(*state == 3);
+       (*state)++;
+       io_break(state + 1, io_close());
+}
+
+static void init_conn(int fd, int *state)
+{
+       if (*state == 0) {
+               (*state)++;
+               errno = 100;
+               io_set_finish(io_new_conn(fd, io_close()), finish_100, state);
+       } else {
+               ok1(*state == 2);
+               (*state)++;
+               close(fd);
+               errno = 0;
+               io_set_finish(io_new_conn(fd, io_read(state, 0,
+                                                     io_close_cb, NULL)),
+                             finish_EBADF, state);
+       }
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+       int fd, on = 1;
+       struct addrinfo *addrinfo, hints;
+
+       memset(&hints, 0, sizeof(hints));
+       hints.ai_family = AF_UNSPEC;
+       hints.ai_socktype = SOCK_STREAM;
+       hints.ai_flags = AI_PASSIVE;
+       hints.ai_protocol = 0;
+
+       if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+               return -1;
+
+       fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                   addrinfo->ai_protocol);
+       if (fd < 0)
+               return -1;
+
+       setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+       if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+               close(fd);
+               return -1;
+       }
+       if (listen(fd, 1) != 0) {
+               close(fd);
+               return -1;
+       }
+       *info = addrinfo;
+       return fd;
+}
+
+int main(void)
+{
+       int state = 0;
+       struct addrinfo *addrinfo;
+       struct io_listener *l;
+       int fd;
+
+       /* This is how many tests you plan to run */
+       plan_tests(12);
+       fd = make_listen_fd(PORT, &addrinfo);
+       ok1(fd >= 0);
+       l = io_new_listener(fd, init_conn, &state);
+       ok1(l);
+       fflush(stdout);
+       if (!fork()) {
+               io_close_listener(l);
+               fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                           addrinfo->ai_protocol);
+               if (fd < 0)
+                       exit(1);
+               if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+                       exit(2);
+               close(fd);
+               fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                           addrinfo->ai_protocol);
+               if (fd < 0)
+                       exit(3);
+               if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+                       exit(4);
+               close(fd);
+               freeaddrinfo(addrinfo);
+               exit(0);
+       }
+       freeaddrinfo(addrinfo);
+       ok1(io_loop() == &state + 1);
+       ok1(state == 4);
+       io_close_listener(l);
+       ok1(wait(&state));
+       ok1(WIFEXITED(state));
+       ok1(WEXITSTATUS(state) == 0);
+
+       /* This exits depending on whether all tests passed */
+       return exit_status();
+}