htable \
idtree \
ilog \
+ io \
isaac \
iscsi \
jmap \
--- /dev/null
+../../licenses/LGPL-2.1
\ No newline at end of file
--- /dev/null
+Simple:
+ step1(conn): read(conn), then step2
+ step2(conn): write(conn), then close
+
+Pass-through:
+ step1(conn): read(conn), then step2
+ step2(conn): write(otherconn), then step1
+
+Pass-through-and-connect:
+ step1(conn): read(conn), then step2
+ step2(conn): connect(otherconn), then step3
+ step3(conn): write(otherconn), then step1
+
+Chatroom:
+ step1(conn): read(conn), then step2
+ step2(conn): for c in allcons: write(c). goto step1
+
+Simple:
+
+void event(struct io_event *done)
+{
+ char *buf = done->priv;
+ struct io_event *e;
+
+ e = queue_read(done, done->conn, buf, 100);
+ e = queue_write(e, done->conn, buf, 100);
+ queue_close(e, done->conn);
+}
+
+Pass-through:
+struct passthru {
+ char buf[100];
+ struct conn *rconn, *wconn;
+};
+
+void event(struct io_event *done)
+{
+ struct passthru *p = done->priv;
+ struct io_event *e;
+
+ e = queue_read(done, p->rconn, p->buf, 100);
+ e = queue_write(e, p->wconn, buf, 100);
+ queue_event(e, event);
+}
+
+Chatroom:
+struct list_head clients;
+
+struct buffer {
+ char buf[100];
+ unsigned int ref;
+};
+
+struct client {
+ struct list_node list;
+ struct connection *conn;
+ struct buffer *rbuf, *wbuf;
+};
+
+void broadcast(struct io_event *done)
+{
+ struct client *i, *c = done->conn->priv;
+ struct io_event *e;
+
+ list_for_each(&clients, i, list) {
+ e = queue_write(done, i->conn, c->buf->buf, 100);
+ e->priv = c->buf;
+ c->buf->ref++;
+ queue_event(e, drop_ref);
+ }
+
+
+
+void event(struct io_event *done)
+{
+ struct client *c = done->conn->priv;
+ struct io_event *e;
+
+ assert(c->conn == done->conn);
+ c->buf = malloc(sizeof(*c->buf));
+ c->buf->ref = 0;
+ e = queue_read(done, c->conn, c->buf->buf, 100);
+ e = queue_event(e, broadcast);
+}
+
+
+ step1(conn): read(conn), then step2
+ step2(conn): for c in allcons: write(c). goto step1
--- /dev/null
+#include <stdio.h>
+#include <string.h>
+#include "config.h"
+
+/**
+ * io - simple library for asynchronous io handling.
+ *
+ * io provides a mechanism to write I/O servers with multiple
+ * connections. Each callback indicates what I/O they plan next
+ * (eg. read, write). It is also possible to write custom I/O
+ * plans.
+ *
+ * When compiled with DEBUG, control flow is changed so that rather
+ * than returning to the main io_loop(), plans are executed sequentially
+ * providing a backtrace showing what has occurred on that connection.
+ * Which connection(s) do this depends on the user-specified io_debug
+ * function.
+ *
+ * Example:
+ * // Given tr A-Z a-z outputs tr a-z a-z
+ * #include <ccan/io/io.h>
+ * #include <ccan/err/err.h>
+ * #include <assert.h>
+ * #include <stdlib.h>
+ * #include <signal.h>
+ * #include <sys/types.h>
+ * #include <sys/wait.h>
+ *
+ * struct buffer {
+ * size_t max, off, rlen;
+ * char *buf;
+ * };
+ *
+ * struct stdin_buffer {
+ * struct io_conn *reader, *writer;
+ * size_t len;
+ * char inbuf[4096];
+ * };
+ *
+ * // This reads from stdin.
+ * static struct io_plan wake_writer(struct io_conn *, struct stdin_buffer *);
+ * // This writes the stdin buffer to the child.
+ * static struct io_plan wake_reader(struct io_conn *, struct stdin_buffer *);
+ *
+ * static struct io_plan wake_writer(struct io_conn *c, struct stdin_buffer *b)
+ * {
+ * assert(c == b->reader);
+ * io_wake(b->writer, io_write(b->inbuf, b->len, wake_reader, b));
+ * return io_idle();
+ * }
+ *
+ * static void reader_exit(struct io_conn *c, struct stdin_buffer *b)
+ * {
+ * assert(c == b->reader);
+ * io_wake(b->writer, io_close());
+ * b->reader = NULL;
+ * }
+ *
+ * static struct io_plan wake_reader(struct io_conn *c, struct stdin_buffer *b)
+ * {
+ * assert(c == b->writer);
+ * if (!b->reader)
+ * return io_close();
+ * b->len = sizeof(b->inbuf);
+ * io_wake(b->reader, io_read_partial(b->inbuf, &b->len, wake_writer, b));
+ * return io_idle();
+ * }
+ *
+ * static void fail_child_write(struct io_conn *conn, struct stdin_buffer *b)
+ * {
+ * if (b->reader)
+ * err(1, "Failed writing to child.");
+ * }
+ *
+ * // This reads from the child and saves it into buffer.
+ * static struct io_plan read_from_child(struct io_conn *conn,
+ * struct buffer *b)
+ * {
+ * b->off += b->rlen;
+ *
+ * if (b->off == b->max)
+ * b->buf = realloc(b->buf, b->max *= 2);
+ *
+ * b->rlen = b->max - b->off;
+ * return io_read_partial(b->buf + b->off, &b->rlen, read_from_child, b);
+ * }
+ *
+ * // Feed a program our stdin, gather its stdout, print that at end.
+ * int main(int argc, char *argv[])
+ * {
+ * int tochild[2], fromchild[2];
+ * struct buffer out;
+ * struct stdin_buffer sbuf;
+ * int status;
+ * size_t off;
+ * ssize_t ret;
+ * struct io_conn *from_child;
+ *
+ * if (argc == 1)
+ * errx(1, "Usage: runner <cmdline>...");
+ *
+ * if (pipe(tochild) != 0 || pipe(fromchild) != 0)
+ * err(1, "Creating pipes");
+ *
+ * if (!fork()) {
+ * // Child runs command.
+ * close(tochild[1]);
+ * close(fromchild[0]);
+ *
+ * dup2(tochild[0], STDIN_FILENO);
+ * dup2(fromchild[1], STDOUT_FILENO);
+ * execvp(argv[1], argv + 1);
+ * exit(127);
+ * }
+ *
+ * close(tochild[0]);
+ * close(fromchild[1]);
+ * signal(SIGPIPE, SIG_IGN);
+ *
+ * sbuf.len = sizeof(sbuf.inbuf);
+ * sbuf.reader = io_new_conn(STDIN_FILENO,
+ * io_read_partial(sbuf.inbuf, &sbuf.len,
+ * wake_writer, &sbuf));
+ * sbuf.writer = io_new_conn(tochild[1], io_idle());
+ *
+ * out.max = 128;
+ * out.off = 0;
+ * out.rlen = 128;
+ * out.buf = malloc(out.max);
+ * from_child = io_new_conn(fromchild[0],
+ * io_read_partial(out.buf, &out.rlen,
+ * read_from_child, &out));
+ * if (!sbuf.reader || !sbuf.writer || !from_child)
+ * err(1, "Allocating connections");
+ *
+ * io_set_finish(sbuf.reader, reader_exit, &sbuf);
+ * io_set_finish(sbuf.writer, fail_child_write, &sbuf);
+ *
+ * io_loop();
+ * wait(&status);
+ *
+ * for (off = 0; off < out.off; off += ret) {
+ * ret = write(STDOUT_FILENO, out.buf+off, out.off-off);
+ * if (ret < 0)
+ * err(1, "Writing stdout");
+ * }
+ * free(out.buf);
+ *
+ * return WIFEXITED(status) ? WEXITSTATUS(status) : 2;
+ * }
+ *
+ * License: LGPL (v2.1 or any later version)
+ * Author: Rusty Russell <rusty@rustcorp.com.au>
+ */
+int main(int argc, char *argv[])
+{
+ if (argc != 2)
+ return 1;
+
+ if (strcmp(argv[1], "depends") == 0) {
+ printf("ccan/time\n");
+ printf("ccan/timer\n");
+ return 0;
+ }
+
+ return 1;
+}
--- /dev/null
+/* Licensed under LGPLv2.1+ - see LICENSE file for details */
+#ifndef CCAN_IO_BACKEND_H
+#define CCAN_IO_BACKEND_H
+#include <stdbool.h>
+#include <ccan/timer/timer.h>
+
+struct fd {
+ int fd;
+ bool listener;
+ size_t backend_info;
+};
+
+/* Listeners create connections. */
+struct io_listener {
+ struct fd fd;
+
+ /* These are for connections we create. */
+ void (*init)(int fd, void *arg);
+ void *arg;
+};
+
+struct io_timeout {
+ struct timer timer;
+ struct io_conn *conn;
+
+ struct io_plan (*next)(struct io_conn *, void *arg);
+ void *next_arg;
+};
+
+/* One connection per client. */
+struct io_conn {
+ struct fd fd;
+
+ void (*finish)(struct io_conn *, void *arg);
+ void *finish_arg;
+
+ struct io_conn *duplex;
+ struct io_timeout *timeout;
+
+ struct io_plan plan;
+};
+
+static inline bool timeout_active(const struct io_conn *conn)
+{
+ return conn->timeout && conn->timeout->conn;
+}
+
+extern void *io_loop_return;
+
+#ifdef DEBUG
+extern struct io_conn *current;
+static inline void set_current(struct io_conn *conn)
+{
+ current = conn;
+}
+static inline bool doing_debug_on(struct io_conn *conn)
+{
+ return io_debug_conn && io_debug_conn(conn);
+}
+static inline bool doing_debug(void)
+{
+ return io_debug_conn;
+}
+#else
+static inline void set_current(struct io_conn *conn)
+{
+}
+static inline bool doing_debug_on(struct io_conn *conn)
+{
+ return false;
+}
+static inline bool doing_debug(void)
+{
+ return false;
+}
+#endif
+
+bool add_listener(struct io_listener *l);
+bool add_conn(struct io_conn *c);
+bool add_duplex(struct io_conn *c);
+void del_listener(struct io_listener *l);
+void backend_plan_changed(struct io_conn *conn);
+void backend_add_timeout(struct io_conn *conn, struct timespec ts);
+void backend_del_timeout(struct io_conn *conn);
+void backend_del_conn(struct io_conn *conn);
+
+void io_ready(struct io_conn *conn);
+void *do_io_loop(struct io_conn **ready);
+#endif /* CCAN_IO_BACKEND_H */
--- /dev/null
+ALL:=run-loop run-different-speed run-length-prefix
+CCANDIR:=../../..
+CFLAGS:=-Wall -I$(CCANDIR) -O3 -flto
+LDFLAGS:=-O3 -flto
+LDLIBS:=-lrt
+
+OBJS:=time.o poll.o io.o err.o timer.o list.o
+
+default: $(ALL)
+
+run-loop: run-loop.o $(OBJS)
+run-different-speed: run-different-speed.o $(OBJS)
+run-length-prefix: run-length-prefix.o $(OBJS)
+
+time.o: $(CCANDIR)/ccan/time/time.c
+ $(CC) $(CFLAGS) -c -o $@ $<
+timer.o: $(CCANDIR)/ccan/timer/timer.c
+ $(CC) $(CFLAGS) -c -o $@ $<
+list.o: $(CCANDIR)/ccan/list/list.c
+ $(CC) $(CFLAGS) -c -o $@ $<
+poll.o: $(CCANDIR)/ccan/io/poll.c
+ $(CC) $(CFLAGS) -c -o $@ $<
+io.o: $(CCANDIR)/ccan/io/io.c
+ $(CC) $(CFLAGS) -c -o $@ $<
+err.o: $(CCANDIR)/ccan/err/err.c
+ $(CC) $(CFLAGS) -c -o $@ $<
+
+clean:
+ $(RM) -f *.o $(ALL)
--- /dev/null
+/* Simulate a server with connections of different speeds. We count
+ * how many connections complete in 10 seconds. */
+#include <ccan/io/io.h>
+#include <ccan/time/time.h>
+#include <ccan/err/err.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <stdio.h>
+#include <signal.h>
+
+#define REQUEST_SIZE 1024
+#define REPLY_SIZE 10240
+#define NUM_CONNS 500 /* per child */
+#define NUM_CHILDREN 2
+
+static unsigned int completed;
+
+struct client {
+ char request_buffer[REQUEST_SIZE];
+ char reply_buffer[REPLY_SIZE];
+};
+
+static struct io_plan write_reply(struct io_conn *conn, struct client *client);
+static struct io_plan read_request(struct io_conn *conn, struct client *client)
+{
+ return io_read(client->request_buffer, REQUEST_SIZE,
+ write_reply, client);
+}
+
+/* once we're done, loop again. */
+static struct io_plan write_complete(struct io_conn *conn, struct client *client)
+{
+ completed++;
+ return read_request(conn, client);
+}
+
+static struct io_plan write_reply(struct io_conn *conn, struct client *client)
+{
+ return io_write(client->reply_buffer, REPLY_SIZE,
+ write_complete, client);
+}
+
+/* This runs in the child. */
+static void create_clients(struct sockaddr_un *addr, int waitfd)
+{
+ struct client data;
+ int i, sock[NUM_CONNS], speed[NUM_CONNS], done[NUM_CONNS], count = 0;
+
+ for (i = 0; i < NUM_CONNS; i++) {
+ /* Set speed. */
+ speed[i] = (1 << (random() % 10));
+ sock[i] = socket(AF_UNIX, SOCK_STREAM, 0);
+ if (sock[i] < 0)
+ err(1, "creating socket");
+ if (connect(sock[i], (void *)addr, sizeof(*addr)) != 0)
+ err(1, "connecting socket");
+ /* Make nonblocking. */
+ fcntl(sock[i], F_SETFD, fcntl(sock[i], F_GETFD)|O_NONBLOCK);
+ done[i] = 0;
+ }
+
+ read(waitfd, &i, 1);
+
+ for (;;) {
+ for (i = 0; i < NUM_CONNS; i++) {
+ int ret, bytes = speed[i];
+ if (done[i] < REQUEST_SIZE) {
+ if (REQUEST_SIZE - done[i] < bytes)
+ bytes = REQUEST_SIZE - done[i];
+ ret = write(sock[i], data.request_buffer,
+ bytes);
+ if (ret > 0)
+ done[i] += ret;
+ else if (ret < 0 && errno != EAGAIN)
+ goto fail;
+ } else {
+ if (REQUEST_SIZE + REPLY_SIZE - done[i] < bytes)
+ bytes = REQUEST_SIZE + REPLY_SIZE
+ - done[i];
+ ret = read(sock[i], data.reply_buffer,
+ bytes);
+ if (ret > 0) {
+ done[i] += ret;
+ if (done[i] == REQUEST_SIZE + REPLY_SIZE) {
+ count++;
+ done[i] = 0;
+ }
+ } else if (ret < 0 && errno != EAGAIN)
+ goto fail;
+ }
+ }
+ }
+fail:
+ printf("Child did %u\n", count);
+ exit(0);
+}
+
+static int timeout[2];
+static void sigalarm(int sig)
+{
+ write(timeout[1], "1", 1);
+}
+
+static struct io_plan do_timeout(struct io_conn *conn, char *buf)
+{
+ return io_break(buf, io_idle());
+}
+
+int main(int argc, char *argv[])
+{
+ struct client client;
+ unsigned int i, j;
+ struct sockaddr_un addr;
+ struct timespec start, end;
+ int fd, wake[2];
+ char buf;
+
+ addr.sun_family = AF_UNIX;
+ sprintf(addr.sun_path, "/tmp/run-different-speed.sock.%u", getpid());
+
+ if (pipe(wake) != 0 || pipe(timeout) != 0)
+ err(1, "Creating pipes");
+
+ fd = socket(AF_UNIX, SOCK_STREAM, 0);
+ if (fd < 0)
+ err(1, "Creating socket");
+
+ if (bind(fd, (void *)&addr, sizeof(addr)) != 0)
+ err(1, "Binding to %s", addr.sun_path);
+
+ if (listen(fd, NUM_CONNS) != 0)
+ err(1, "Listening on %s", addr.sun_path);
+
+ for (i = 0; i < NUM_CHILDREN; i++) {
+ switch (fork()) {
+ case -1:
+ err(1, "forking");
+ case 0:
+ close(wake[1]);
+ create_clients(&addr, wake[0]);
+ break;
+ }
+ for (j = 0; j < NUM_CONNS; j++) {
+ int ret = accept(fd, NULL, 0);
+ if (ret < 0)
+ err(1, "Accepting fd");
+ /* For efficiency, we share client structure */
+ io_new_conn(ret,
+ io_read(client.request_buffer, REQUEST_SIZE,
+ write_reply, &client));
+ }
+ }
+
+ io_new_conn(timeout[0], io_read(&buf, 1, do_timeout, &buf));
+
+ close(wake[0]);
+ for (i = 0; i < NUM_CHILDREN; i++)
+ write(wake[1], "1", 1);
+
+ signal(SIGALRM, sigalarm);
+ alarm(10);
+ start = time_now();
+ io_loop();
+ end = time_now();
+ close(fd);
+
+ printf("%u connections complete (%u ns per conn)\n",
+ completed,
+ (int)time_to_nsec(time_divide(time_sub(end, start), completed)));
+ return 0;
+}
--- /dev/null
+/* Simulate a server with connections of different speeds. We count
+ * how many connections complete in 10 seconds. */
+#include <ccan/io/io.h>
+#include <ccan/time/time.h>
+#include <ccan/err/err.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <stdio.h>
+#include <signal.h>
+#include <assert.h>
+
+#define REQUEST_MAX 131072
+#define NUM_CONNS 500 /* per child */
+#define NUM_CHILDREN 2
+
+static unsigned int completed;
+
+struct client {
+ unsigned int len;
+ char *request_buffer;
+};
+
+static struct io_plan write_reply(struct io_conn *conn, struct client *client);
+static struct io_plan read_body(struct io_conn *conn, struct client *client)
+{
+ assert(client->len <= REQUEST_MAX);
+ return io_read(client->request_buffer, client->len,
+ write_reply, client);
+}
+
+static struct io_plan io_read_header(struct client *client)
+{
+ return io_read(&client->len, sizeof(client->len), read_body, client);
+}
+
+/* once we're done, loop again. */
+static struct io_plan write_complete(struct io_conn *conn, struct client *client)
+{
+ completed++;
+ return io_read_header(client);
+}
+
+static struct io_plan write_reply(struct io_conn *conn, struct client *client)
+{
+ return io_write(&client->len, sizeof(client->len),
+ write_complete, client);
+}
+
+/* This runs in the child. */
+static void create_clients(struct sockaddr_un *addr, int waitfd)
+{
+ struct client data;
+ int i, sock[NUM_CONNS], len[NUM_CONNS], done[NUM_CONNS],
+ result[NUM_CONNS], count = 0;
+
+ for (i = 0; i < NUM_CONNS; i++) {
+ len[i] = (random() % REQUEST_MAX) + 1;
+ sock[i] = socket(AF_UNIX, SOCK_STREAM, 0);
+ if (sock[i] < 0)
+ err(1, "creating socket");
+ if (connect(sock[i], (void *)addr, sizeof(*addr)) != 0)
+ err(1, "connecting socket");
+ /* Make nonblocking. */
+ fcntl(sock[i], F_SETFD, fcntl(sock[i], F_GETFD)|O_NONBLOCK);
+ done[i] = 0;
+ }
+
+ read(waitfd, &i, 1);
+
+ for (;;) {
+ for (i = 0; i < NUM_CONNS; i++) {
+ int ret, totlen = len[i] + sizeof(len[i]);
+ if (done[i] < sizeof(len[i]) + len[i]) {
+ data.len = len[i];
+ ret = write(sock[i], (void *)&data + done[i],
+ totlen - done[i]);
+ if (ret > 0)
+ done[i] += ret;
+ else if (ret < 0 && errno != EAGAIN)
+ goto fail;
+ } else {
+ int off = done[i] - totlen;
+ ret = read(sock[i], (void *)&result[i] + off,
+ sizeof(result[i]) - off);
+ if (ret > 0) {
+ done[i] += ret;
+ if (done[i] == totlen
+ + sizeof(result[i])) {
+ assert(result[i] == len[i]);
+ count++;
+ done[i] = 0;
+ }
+ } else if (ret < 0 && errno != EAGAIN)
+ goto fail;
+ }
+ }
+ }
+fail:
+ printf("Child did %u\n", count);
+ exit(0);
+}
+
+static int timeout[2];
+static void sigalarm(int sig)
+{
+ write(timeout[1], "1", 1);
+}
+
+static struct io_plan do_timeout(struct io_conn *conn, char *buf)
+{
+ return io_break(buf, io_idle());
+}
+
+int main(int argc, char *argv[])
+{
+ unsigned int i, j;
+ struct sockaddr_un addr;
+ struct timespec start, end;
+ char buffer[REQUEST_MAX];
+ int fd, wake[2];
+ char buf;
+
+ addr.sun_family = AF_UNIX;
+ sprintf(addr.sun_path, "/tmp/run-different-speed.sock.%u", getpid());
+
+ if (pipe(wake) != 0 || pipe(timeout) != 0)
+ err(1, "Creating pipes");
+
+ fd = socket(AF_UNIX, SOCK_STREAM, 0);
+ if (fd < 0)
+ err(1, "Creating socket");
+
+ if (bind(fd, (void *)&addr, sizeof(addr)) != 0)
+ err(1, "Binding to %s", addr.sun_path);
+
+ if (listen(fd, NUM_CONNS) != 0)
+ err(1, "Listening on %s", addr.sun_path);
+
+ for (i = 0; i < NUM_CHILDREN; i++) {
+ switch (fork()) {
+ case -1:
+ err(1, "forking");
+ case 0:
+ close(wake[1]);
+ create_clients(&addr, wake[0]);
+ break;
+ }
+ for (j = 0; j < NUM_CONNS; j++) {
+ struct client *client = malloc(sizeof(*client));
+ int ret = accept(fd, NULL, 0);
+ if (ret < 0)
+ err(1, "Accepting fd");
+ /* For efficiency, we share buffer */
+ client->request_buffer = buffer;
+ io_new_conn(ret, io_read_header(client));
+ }
+ }
+
+ io_new_conn(timeout[0], io_read(&buf, 1, do_timeout, &buf));
+
+ close(wake[0]);
+ for (i = 0; i < NUM_CHILDREN; i++)
+ write(wake[1], "1", 1);
+
+ signal(SIGALRM, sigalarm);
+ alarm(10);
+ start = time_now();
+ io_loop();
+ end = time_now();
+ close(fd);
+
+ printf("%u connections complete (%u ns per conn)\n",
+ completed,
+ (int)time_to_nsec(time_divide(time_sub(end, start), completed)));
+ return 0;
+}
--- /dev/null
+#include <ccan/io/io.h>
+#include <ccan/time/time.h>
+#include <sys/wait.h>
+#include <stdio.h>
+#include <string.h>
+#include <assert.h>
+#include <err.h>
+#include <signal.h>
+
+#define NUM 500
+#define NUM_ITERS 10000
+
+struct buffer {
+ int iters;
+ struct io_conn *reader, *writer;
+ char buf[32];
+};
+
+static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf);
+
+static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf)
+{
+ assert(conn == buf->reader);
+
+ if (buf->iters == NUM_ITERS)
+ return io_close();
+
+ /* You write. */
+ io_wake(buf->writer,
+ io_write(&buf->buf, sizeof(buf->buf), poke_reader, buf));
+
+ /* I'll wait until you wake me. */
+ return io_idle();
+}
+
+static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf)
+{
+ assert(conn == buf->writer);
+ /* You read. */
+ io_wake(buf->reader,
+ io_read(&buf->buf, sizeof(buf->buf), poke_writer, buf));
+
+ if (++buf->iters == NUM_ITERS)
+ return io_close();
+
+ /* I'll wait until you tell me to write. */
+ return io_idle();
+}
+
+int main(void)
+{
+ unsigned int i;
+ int fds[2], last_read, last_write;
+ struct timespec start, end;
+ struct buffer buf[NUM];
+
+ if (pipe(fds) != 0)
+ err(1, "pipe");
+ last_read = fds[0];
+ last_write = fds[1];
+
+ for (i = 1; i < NUM; i++) {
+ buf[i].iters = 0;
+ if (pipe(fds) < 0)
+ err(1, "pipe");
+ memset(buf[i].buf, i, sizeof(buf[i].buf));
+ sprintf(buf[i].buf, "%i-%i", i, i);
+
+ buf[i].reader = io_new_conn(last_read, io_idle());
+ if (!buf[i].reader)
+ err(1, "Creating reader %i", i);
+ buf[i].writer = io_new_conn(fds[1],
+ io_write(&buf[i].buf,
+ sizeof(buf[i].buf),
+ poke_reader, &buf[i]));
+ if (!buf[i].writer)
+ err(1, "Creating writer %i", i);
+ last_read = fds[0];
+ }
+
+ /* Last one completes the cirle. */
+ i = 0;
+ buf[i].iters = 0;
+ sprintf(buf[i].buf, "%i-%i", i, i);
+ buf[i].reader = io_new_conn(last_read, io_idle());
+ if (!buf[i].reader)
+ err(1, "Creating reader %i", i);
+ buf[i].writer = io_new_conn(last_write, io_write(&buf[i].buf,
+ sizeof(buf[i].buf),
+ poke_reader, &buf[i]));
+ if (!buf[i].writer)
+ err(1, "Creating writer %i", i);
+
+ /* They should eventually exit */
+ start = time_now();
+ if (io_loop() != NULL)
+ errx(1, "io_loop?");
+ end = time_now();
+
+ for (i = 0; i < NUM; i++) {
+ char b[sizeof(buf[0].buf)];
+ memset(b, i, sizeof(b));
+ sprintf(b, "%i-%i", i, i);
+ if (memcmp(b, buf[(i + NUM_ITERS) % NUM].buf, sizeof(b)) != 0)
+ errx(1, "Buffer for %i was '%s' not '%s'",
+ i, buf[(i + NUM_ITERS) % NUM].buf, b);
+ }
+
+ printf("run-many: %u %u iterations: %llu usec\n",
+ NUM, NUM_ITERS, (long long)time_to_usec(time_sub(end, start)));
+ return 0;
+}
--- /dev/null
+/* Licensed under LGPLv2.1+ - see LICENSE file for details */
+#include "io.h"
+#include "backend.h"
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <string.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <assert.h>
+#include <poll.h>
+#include <unistd.h>
+#include <fcntl.h>
+
+void *io_loop_return;
+
+#ifdef DEBUG
+/* Set to skip the next plan. */
+bool io_plan_nodebug;
+/* The current connection to apply plan to. */
+struct io_conn *current;
+/* User-defined function to select which connection(s) to debug. */
+bool (*io_debug_conn)(struct io_conn *conn);
+/* Set when we wake up an connection we are debugging. */
+bool io_debug_wakeup;
+
+struct io_plan io_debug(struct io_plan plan)
+{
+ struct io_conn *ready = NULL;
+
+ if (io_plan_nodebug) {
+ io_plan_nodebug = false;
+ return plan;
+ }
+
+ if (!current || !doing_debug_on(current)) {
+ if (!io_debug_wakeup)
+ return plan;
+ }
+
+ io_debug_wakeup = false;
+ current->plan = plan;
+ backend_plan_changed(current);
+
+ /* Call back into the loop immediately. */
+ io_loop_return = do_io_loop(&ready);
+
+ if (ready) {
+ set_current(ready);
+ if (!ready->plan.next) {
+ /* Call finish function immediately. */
+ if (ready->finish) {
+ errno = ready->plan.u.close.saved_errno;
+ ready->finish(ready, ready->finish_arg);
+ ready->finish = NULL;
+ }
+ backend_del_conn(ready);
+ } else {
+ /* Calls back in itself, via io_debug_io(). */
+ if (ready->plan.io(ready->fd.fd, &ready->plan) != 2)
+ abort();
+ }
+ set_current(NULL);
+ }
+
+ /* Return a do-nothing plan, so backend_plan_changed in
+ * io_ready doesn't do anything (it's already been called). */
+ return io_idle_();
+}
+
+int io_debug_io(int ret)
+{
+ /* Cache it for debugging; current changes. */
+ struct io_conn *conn = current;
+ int saved_errno = errno;
+
+ if (!doing_debug_on(conn))
+ return ret;
+
+ /* These will all go linearly through the io_debug() path above. */
+ switch (ret) {
+ case -1:
+ /* This will call io_debug above. */
+ errno = saved_errno;
+ io_close();
+ break;
+ case 0: /* Keep going with plan. */
+ io_debug(conn->plan);
+ break;
+ case 1: /* Done: get next plan. */
+ if (timeout_active(conn))
+ backend_del_timeout(conn);
+ conn->plan.next(conn, conn->plan.next_arg);
+ break;
+ default:
+ abort();
+ }
+
+ /* Normally-invalid value, used for sanity check. */
+ return 2;
+}
+
+static void debug_io_wake(struct io_conn *conn)
+{
+ /* We want linear if we wake a debugged connection, too. */
+ if (io_debug_conn && io_debug_conn(conn))
+ io_debug_wakeup = true;
+}
+
+/* Counterpart to io_plan_no_debug(), called in macros in io.h */
+static void io_plan_debug_again(void)
+{
+ io_plan_nodebug = false;
+}
+#else
+static void debug_io_wake(struct io_conn *conn)
+{
+}
+static void io_plan_debug_again(void)
+{
+}
+#endif
+
+struct io_listener *io_new_listener_(int fd,
+ void (*init)(int fd, void *arg),
+ void *arg)
+{
+ struct io_listener *l = malloc(sizeof(*l));
+
+ if (!l)
+ return NULL;
+
+ l->fd.listener = true;
+ l->fd.fd = fd;
+ l->init = init;
+ l->arg = arg;
+ if (!add_listener(l)) {
+ free(l);
+ return NULL;
+ }
+ return l;
+}
+
+void io_close_listener(struct io_listener *l)
+{
+ close(l->fd.fd);
+ del_listener(l);
+ free(l);
+}
+
+struct io_conn *io_new_conn_(int fd, struct io_plan plan)
+{
+ struct io_conn *conn = malloc(sizeof(*conn));
+
+ io_plan_debug_again();
+
+ if (!conn)
+ return NULL;
+
+ conn->fd.listener = false;
+ conn->fd.fd = fd;
+ conn->plan = plan;
+ conn->finish = NULL;
+ conn->finish_arg = NULL;
+ conn->duplex = NULL;
+ conn->timeout = NULL;
+ if (!add_conn(conn)) {
+ free(conn);
+ return NULL;
+ }
+ return conn;
+}
+
+void io_set_finish_(struct io_conn *conn,
+ void (*finish)(struct io_conn *, void *),
+ void *arg)
+{
+ conn->finish = finish;
+ conn->finish_arg = arg;
+}
+
+struct io_conn *io_duplex_(struct io_conn *old, struct io_plan plan)
+{
+ struct io_conn *conn;
+
+ io_plan_debug_again();
+
+ assert(!old->duplex);
+
+ conn = malloc(sizeof(*conn));
+ if (!conn)
+ return NULL;
+
+ conn->fd.listener = false;
+ conn->fd.fd = old->fd.fd;
+ conn->plan = plan;
+ conn->duplex = old;
+ conn->finish = NULL;
+ conn->finish_arg = NULL;
+ conn->timeout = NULL;
+ if (!add_duplex(conn)) {
+ free(conn);
+ return NULL;
+ }
+ old->duplex = conn;
+ return conn;
+}
+
+bool io_timeout_(struct io_conn *conn, struct timespec ts,
+ struct io_plan (*cb)(struct io_conn *, void *), void *arg)
+{
+ assert(cb);
+
+ if (!conn->timeout) {
+ conn->timeout = malloc(sizeof(*conn->timeout));
+ if (!conn->timeout)
+ return false;
+ } else
+ assert(!timeout_active(conn));
+
+ conn->timeout->next = cb;
+ conn->timeout->next_arg = arg;
+ backend_add_timeout(conn, ts);
+ return true;
+}
+
+/* Returns true if we're finished. */
+static int do_write(int fd, struct io_plan *plan)
+{
+ ssize_t ret = write(fd, plan->u.write.buf, plan->u.write.len);
+ if (ret < 0)
+ return io_debug_io(-1);
+
+ plan->u.write.buf += ret;
+ plan->u.write.len -= ret;
+ return io_debug_io(plan->u.write.len == 0);
+}
+
+/* Queue some data to be written. */
+struct io_plan io_write_(const void *data, size_t len,
+ struct io_plan (*cb)(struct io_conn *, void *),
+ void *arg)
+{
+ struct io_plan plan;
+
+ assert(cb);
+ plan.u.write.buf = data;
+ plan.u.write.len = len;
+ plan.io = do_write;
+ plan.next = cb;
+ plan.next_arg = arg;
+ plan.pollflag = POLLOUT;
+
+ return plan;
+}
+
+static int do_read(int fd, struct io_plan *plan)
+{
+ ssize_t ret = read(fd, plan->u.read.buf, plan->u.read.len);
+ if (ret <= 0)
+ return io_debug_io(-1);
+
+ plan->u.read.buf += ret;
+ plan->u.read.len -= ret;
+ return io_debug_io(plan->u.read.len == 0);
+}
+
+/* Queue a request to read into a buffer. */
+struct io_plan io_read_(void *data, size_t len,
+ struct io_plan (*cb)(struct io_conn *, void *),
+ void *arg)
+{
+ struct io_plan plan;
+
+ assert(cb);
+ plan.u.read.buf = data;
+ plan.u.read.len = len;
+ plan.io = do_read;
+ plan.next = cb;
+ plan.next_arg = arg;
+ plan.pollflag = POLLIN;
+
+ return plan;
+}
+
+static int do_read_partial(int fd, struct io_plan *plan)
+{
+ ssize_t ret = read(fd, plan->u.readpart.buf, *plan->u.readpart.lenp);
+ if (ret <= 0)
+ return io_debug_io(-1);
+
+ *plan->u.readpart.lenp = ret;
+ return io_debug_io(1);
+}
+
+/* Queue a partial request to read into a buffer. */
+struct io_plan io_read_partial_(void *data, size_t *len,
+ struct io_plan (*cb)(struct io_conn *, void *),
+ void *arg)
+{
+ struct io_plan plan;
+
+ assert(cb);
+ plan.u.readpart.buf = data;
+ plan.u.readpart.lenp = len;
+ plan.io = do_read_partial;
+ plan.next = cb;
+ plan.next_arg = arg;
+ plan.pollflag = POLLIN;
+
+ return plan;
+}
+
+static int do_write_partial(int fd, struct io_plan *plan)
+{
+ ssize_t ret = write(fd, plan->u.writepart.buf, *plan->u.writepart.lenp);
+ if (ret < 0)
+ return io_debug_io(-1);
+
+ *plan->u.writepart.lenp = ret;
+ return io_debug_io(1);
+}
+
+/* Queue a partial write request. */
+struct io_plan io_write_partial_(const void *data, size_t *len,
+ struct io_plan (*cb)(struct io_conn*, void *),
+ void *arg)
+{
+ struct io_plan plan;
+
+ assert(cb);
+ plan.u.writepart.buf = data;
+ plan.u.writepart.lenp = len;
+ plan.io = do_write_partial;
+ plan.next = cb;
+ plan.next_arg = arg;
+ plan.pollflag = POLLOUT;
+
+ return plan;
+}
+
+static int already_connected(int fd, struct io_plan *plan)
+{
+ return io_debug_io(1);
+}
+
+static int do_connect(int fd, struct io_plan *plan)
+{
+ int err, ret;
+ socklen_t len = sizeof(err);
+
+ /* Has async connect finished? */
+ ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len);
+ if (ret < 0)
+ return -1;
+
+ if (err == 0) {
+ /* Restore blocking if it was initially. */
+ fcntl(fd, F_SETFD, plan->u.len_len.len1);
+ return 1;
+ }
+ return 0;
+}
+
+struct io_plan io_connect_(int fd, const struct addrinfo *addr,
+ struct io_plan (*cb)(struct io_conn*, void *),
+ void *arg)
+{
+ struct io_plan plan;
+
+ assert(cb);
+
+ plan.next = cb;
+ plan.next_arg = arg;
+
+ /* Save old flags, set nonblock if not already. */
+ plan.u.len_len.len1 = fcntl(fd, F_GETFD);
+ fcntl(fd, F_SETFD, plan.u.len_len.len1 | O_NONBLOCK);
+
+ /* Immediate connect can happen. */
+ if (connect(fd, addr->ai_addr, addr->ai_addrlen) == 0) {
+ /* Dummy will be called immediately. */
+ plan.pollflag = POLLOUT;
+ plan.io = already_connected;
+ } else {
+ if (errno != EINPROGRESS)
+ return io_close_();
+
+ plan.pollflag = POLLIN;
+ plan.io = do_connect;
+ }
+ return plan;
+}
+
+struct io_plan io_idle_(void)
+{
+ struct io_plan plan;
+
+ plan.pollflag = 0;
+ plan.io = NULL;
+ /* Never called (overridden by io_wake), but NULL means closing */
+ plan.next = (void *)io_idle_;
+
+ return plan;
+}
+
+void io_wake_(struct io_conn *conn, struct io_plan plan)
+
+{
+ io_plan_debug_again();
+
+ /* It might be closing, but we haven't called its finish() yet. */
+ if (!conn->plan.next)
+ return;
+ /* It was idle, right? */
+ assert(!conn->plan.io);
+ conn->plan = plan;
+ backend_plan_changed(conn);
+
+ debug_io_wake(conn);
+}
+
+void io_ready(struct io_conn *conn)
+{
+ set_current(conn);
+ switch (conn->plan.io(conn->fd.fd, &conn->plan)) {
+ case -1: /* Failure means a new plan: close up. */
+ conn->plan = io_close();
+ backend_plan_changed(conn);
+ break;
+ case 0: /* Keep going with plan. */
+ break;
+ case 1: /* Done: get next plan. */
+ if (timeout_active(conn))
+ backend_del_timeout(conn);
+ conn->plan = conn->plan.next(conn, conn->plan.next_arg);
+ backend_plan_changed(conn);
+ }
+ set_current(NULL);
+}
+
+/* Close the connection, we're done. */
+struct io_plan io_close_(void)
+{
+ struct io_plan plan;
+
+ plan.pollflag = 0;
+ /* This means we're closing. */
+ plan.next = NULL;
+ plan.u.close.saved_errno = errno;
+
+ return plan;
+}
+
+struct io_plan io_close_cb(struct io_conn *conn, void *arg)
+{
+ return io_close();
+}
+
+/* Exit the loop, returning this (non-NULL) arg. */
+struct io_plan io_break_(void *ret, struct io_plan plan)
+{
+ io_plan_debug_again();
+
+ assert(ret);
+ io_loop_return = ret;
+
+ return plan;
+}
--- /dev/null
+/* Licensed under LGPLv2.1+ - see LICENSE file for details */
+#ifndef CCAN_IO_H
+#define CCAN_IO_H
+#include <ccan/typesafe_cb/typesafe_cb.h>
+#include <ccan/time/time.h>
+#include <stdbool.h>
+#include <unistd.h>
+#include "io_plan.h"
+
+/**
+ * io_new_conn - create a new connection.
+ * @fd: the file descriptor.
+ * @plan: the first I/O to perform.
+ *
+ * This creates a connection which owns @fd. @plan will be called on the
+ * next io_loop().
+ *
+ * Returns NULL on error (and sets errno).
+ *
+ * Example:
+ * int fd[2];
+ * struct io_conn *conn;
+ *
+ * pipe(fd);
+ * // Plan is to close the fd immediately.
+ * conn = io_new_conn(fd[0], io_close());
+ * if (!conn)
+ * exit(1);
+ */
+#define io_new_conn(fd, plan) \
+ (io_plan_no_debug(), io_new_conn_((fd), (plan)))
+struct io_conn *io_new_conn_(int fd, struct io_plan plan);
+
+/**
+ * io_set_finish - set finish function on a connection.
+ * @conn: the connection.
+ * @finish: the function to call when it's closed or fails.
+ * @arg: the argument to @finish.
+ *
+ * @finish will be called when an I/O operation fails, or you call
+ * io_close() on the connection. errno will be set to the value
+ * after the failed I/O, or at the call to io_close().
+ *
+ * Example:
+ * static void finish(struct io_conn *conn, void *unused)
+ * {
+ * // errno is not 0 after success, so this is a bit useless.
+ * printf("Conn %p closed with errno %i\n", conn, errno);
+ * }
+ * ...
+ * io_set_finish(conn, finish, NULL);
+ */
+#define io_set_finish(conn, finish, arg) \
+ io_set_finish_((conn), \
+ typesafe_cb_preargs(void, void *, \
+ (finish), (arg), \
+ struct io_conn *), \
+ (arg))
+void io_set_finish_(struct io_conn *conn,
+ void (*finish)(struct io_conn *, void *),
+ void *arg);
+
+/**
+ * io_new_listener - create a new accepting listener.
+ * @fd: the file descriptor.
+ * @init: the function to call for a new connection
+ * @arg: the argument to @init.
+ *
+ * When @fd becomes readable, we accept() and pass that fd to init().
+ *
+ * Returns NULL on error (and sets errno).
+ *
+ * Example:
+ * #include <sys/types.h>
+ * #include <sys/socket.h>
+ * #include <netdb.h>
+ *
+ * static void start_conn(int fd, char *msg)
+ * {
+ * printf("%s fd %i\n", msg, fd);
+ * close(fd);
+ * }
+ *
+ * // Set up a listening socket, return it.
+ * static struct io_listener *do_listen(const char *port)
+ * {
+ * struct addrinfo *addrinfo, hints;
+ * int fd, on = 1;
+ *
+ * memset(&hints, 0, sizeof(hints));
+ * hints.ai_family = AF_UNSPEC;
+ * hints.ai_socktype = SOCK_STREAM;
+ * hints.ai_flags = AI_PASSIVE;
+ * hints.ai_protocol = 0;
+ *
+ * if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+ * return NULL;
+ *
+ * fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ * addrinfo->ai_protocol);
+ * if (fd < 0)
+ * return NULL;
+ *
+ * freeaddrinfo(addrinfo);
+ * setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+ * if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+ * close(fd);
+ * return NULL;
+ * }
+ * if (listen(fd, 1) != 0) {
+ * close(fd);
+ * return NULL;
+ * }
+ * return io_new_listener(fd, start_conn, (char *)"Got one!");
+ * }
+ */
+#define io_new_listener(fd, init, arg) \
+ io_new_listener_((fd), \
+ typesafe_cb_preargs(void, void *, \
+ (init), (arg), \
+ int fd), \
+ (arg))
+struct io_listener *io_new_listener_(int fd,
+ void (*init)(int fd, void *arg),
+ void *arg);
+
+/**
+ * io_close_listener - delete a listener.
+ * @listener: the listener returned from io_new_listener.
+ *
+ * This closes the fd and frees @listener.
+ *
+ * Example:
+ * ...
+ * struct io_listener *l = do_listen("8111");
+ * if (l) {
+ * io_loop();
+ * io_close_listener(l);
+ * }
+ */
+void io_close_listener(struct io_listener *listener);
+
+/**
+ * io_write - plan to write data.
+ * @data: the data buffer.
+ * @len: the length to write.
+ * @cb: function to call once it's done.
+ * @arg: @cb argument
+ *
+ * This creates a plan write out a data buffer. Once it's all
+ * written, the @cb function will be called: on an error, the finish
+ * function is called instead.
+ *
+ * Note that the I/O may actually be done immediately.
+ *
+ * Example:
+ * static void start_conn_with_write(int fd, const char *msg)
+ * {
+ * // Write message, then close.
+ * io_new_conn(fd, io_write(msg, strlen(msg), io_close_cb, NULL));
+ * }
+ */
+#define io_write(data, len, cb, arg) \
+ io_debug(io_write_((data), (len), \
+ typesafe_cb_preargs(struct io_plan, void *, \
+ (cb), (arg), struct io_conn *), \
+ (arg)))
+struct io_plan io_write_(const void *data, size_t len,
+ struct io_plan (*cb)(struct io_conn *, void *),
+ void *arg);
+
+/**
+ * io_read - plan to read data.
+ * @data: the data buffer.
+ * @len: the length to read.
+ * @cb: function to call once it's done.
+ * @arg: @cb argument
+ *
+ * This creates a plan to read data into a buffer. Once it's all
+ * read, the @cb function will be called: on an error, the finish
+ * function is called instead.
+ *
+ * Note that the I/O may actually be done immediately.
+ *
+ * Example:
+ * static void start_conn_with_read(int fd, char msg[12])
+ * {
+ * // Read message, then close.
+ * io_new_conn(fd, io_read(msg, 12, io_close_cb, NULL));
+ * }
+ */
+#define io_read(data, len, cb, arg) \
+ io_debug(io_read_((data), (len), \
+ typesafe_cb_preargs(struct io_plan, void *, \
+ (cb), (arg), struct io_conn *), \
+ (arg)))
+struct io_plan io_read_(void *data, size_t len,
+ struct io_plan (*cb)(struct io_conn *, void *),
+ void *arg);
+
+
+/**
+ * io_read_partial - plan to read some data.
+ * @data: the data buffer.
+ * @len: the maximum length to read, set to the length actually read.
+ * @cb: function to call once it's done.
+ * @arg: @cb argument
+ *
+ * This creates a plan to read data into a buffer. Once any data is
+ * read, @len is updated and the @cb function will be called: on an
+ * error, the finish function is called instead.
+ *
+ * Note that the I/O may actually be done immediately.
+ *
+ * Example:
+ * struct buf {
+ * size_t len;
+ * char buf[12];
+ * };
+ *
+ * static struct io_plan dump_and_close(struct io_conn *conn, struct buf *b)
+ * {
+ * printf("Partial read: '%*s'\n", (int)b->len, b->buf);
+ * free(b);
+ * return io_close();
+ * }
+ *
+ * static void start_conn_with_part_read(int fd, void *unused)
+ * {
+ * struct buf *b = malloc(sizeof(*b));
+ *
+ * // Read message, then dump and close.
+ * b->len = sizeof(b->buf);
+ * io_new_conn(fd, io_read_partial(b->buf, &b->len, dump_and_close, b));
+ * }
+ */
+#define io_read_partial(data, len, cb, arg) \
+ io_debug(io_read_partial_((data), (len), \
+ typesafe_cb_preargs(struct io_plan, void *, \
+ (cb), (arg), \
+ struct io_conn *), \
+ (arg)))
+struct io_plan io_read_partial_(void *data, size_t *len,
+ struct io_plan (*cb)(struct io_conn *, void *),
+ void *arg);
+
+/**
+ * io_write_partial - plan to write some data.
+ * @data: the data buffer.
+ * @len: the maximum length to write, set to the length actually written.
+ * @cb: function to call once it's done.
+ * @arg: @cb argument
+ *
+ * This creates a plan to write data from a buffer. Once any data is
+ * written, @len is updated and the @cb function will be called: on an
+ * error, the finish function is called instead.
+ *
+ * Note that the I/O may actually be done immediately.
+ *
+ * Example:
+ * struct buf {
+ * size_t len;
+ * char buf[12];
+ * };
+ *
+ * static struct io_plan show_remainder(struct io_conn *conn, struct buf *b)
+ * {
+ * printf("Only wrote: '%*s'\n", (int)b->len, b->buf);
+ * free(b);
+ * return io_close();
+ * }
+ *
+ * static void start_conn_with_part_read(int fd, void *unused)
+ * {
+ * struct buf *b = malloc(sizeof(*b));
+ *
+ * // Write message, then dump and close.
+ * b->len = sizeof(b->buf);
+ * strcpy(b->buf, "Hello world");
+ * io_new_conn(fd, io_write_partial(b->buf, &b->len, show_remainder, b));
+ * }
+ */
+#define io_write_partial(data, len, cb, arg) \
+ io_debug(io_write_partial_((data), (len), \
+ typesafe_cb_preargs(struct io_plan, void *, \
+ (cb), (arg), \
+ struct io_conn *), \
+ (arg)))
+struct io_plan io_write_partial_(const void *data, size_t *len,
+ struct io_plan (*cb)(struct io_conn *, void*),
+ void *arg);
+
+/**
+ * io_connect - plan to connect to a listening socket.
+ * @fd: file descriptor.
+ * @addr: where to connect.
+ * @cb: function to call once it's done.
+ * @arg: @cb argument
+ *
+ * This initiates a connection, and creates a plan for
+ * (asynchronously). completing it. Once complete, @len is updated
+ * and the @cb function will be called: on an error, the finish
+ * function is called instead.
+ *
+ * Note that the connect may actually be done immediately.
+ *
+ * Example:
+ * #include <sys/types.h>
+ * #include <sys/socket.h>
+ * #include <netdb.h>
+ *
+ * // Write, then close socket.
+ * static struct io_plan start_write(struct io_conn *conn, void *unused)
+ * {
+ * return io_write("hello", 5, io_close_cb, NULL);
+ * }
+ *
+ * ...
+ *
+ * int fd;
+ * struct addrinfo *addrinfo;
+ *
+ * fd = socket(AF_INET, SOCK_STREAM, 0);
+ * getaddrinfo("localhost", "8111", NULL, &addrinfo);
+ * io_new_conn(fd, io_connect(fd, addrinfo, start_write, NULL));
+ */
+struct addrinfo;
+#define io_connect(fd, addr, cb, arg) \
+ io_debug(io_connect_((fd), (addr), \
+ typesafe_cb_preargs(struct io_plan, void *, \
+ (cb), (arg), \
+ struct io_conn *), \
+ (arg)))
+struct io_plan io_connect_(int fd, const struct addrinfo *addr,
+ struct io_plan (*cb)(struct io_conn *, void*),
+ void *arg);
+
+/**
+ * io_idle - plan to do nothing.
+ *
+ * This indicates the connection is idle: io_wake() will be called later do
+ * give the connection a new plan.
+ *
+ * Example:
+ * struct io_conn *sleeper;
+ * sleeper = io_new_conn(open("/dev/null", O_RDONLY), io_idle());
+ * if (!sleeper)
+ * exit(1);
+ */
+#define io_idle() io_debug(io_idle_())
+struct io_plan io_idle_(void);
+
+/**
+ * io_timeout - set timeout function if the callback doesn't complete.
+ * @conn: the current connection.
+ * @ts: how long until the timeout should be called.
+ * @cb: callback to call.
+ * @arg: argument to @cb.
+ *
+ * If the usual next callback is not called for this connection before @ts,
+ * this function will be called. If next callback is called, the timeout
+ * is automatically removed.
+ *
+ * Returns false on allocation failure. A connection can only have one
+ * timeout.
+ *
+ * Example:
+ * static struct io_plan close_on_timeout(struct io_conn *conn, char *msg)
+ * {
+ * printf("%s\n", msg);
+ * return io_close();
+ * }
+ *
+ * ...
+ * io_timeout(sleeper, time_from_msec(100),
+ * close_on_timeout, (char *)"Bye!");
+ */
+#define io_timeout(conn, ts, fn, arg) \
+ io_timeout_((conn), (ts), \
+ typesafe_cb_preargs(struct io_plan, void *, \
+ (fn), (arg), \
+ struct io_conn *), \
+ (arg))
+bool io_timeout_(struct io_conn *conn, struct timespec ts,
+ struct io_plan (*fn)(struct io_conn *, void *), void *arg);
+
+/**
+ * io_duplex - split an fd into two connections.
+ * @conn: a connection.
+ * @plan: the first I/O function to call.
+ *
+ * Sometimes you want to be able to simultaneously read and write on a
+ * single fd, but io forces a linear call sequence. The solution is
+ * to have two connections for the same fd, and use one for read
+ * operations and one for write.
+ *
+ * You must io_close() both of them to close the fd.
+ *
+ * Example:
+ * static void setup_read_write(int fd,
+ * char greet_in[5], const char greet_out[5])
+ * {
+ * struct io_conn *writer, *reader;
+ *
+ * // Read their greeting and send ours at the same time.
+ * writer = io_new_conn(fd,
+ * io_write(greet_out, 5, io_close_cb, NULL));
+ * reader = io_duplex(writer,
+ * io_read(greet_in, 5, io_close_cb, NULL));
+ * if (!reader || !writer)
+ * exit(1);
+ * }
+ */
+#define io_duplex(conn, plan) \
+ (io_plan_no_debug(), io_duplex_((conn), (plan)))
+struct io_conn *io_duplex_(struct io_conn *conn, struct io_plan plan);
+
+/**
+ * io_wake - wake up an idle connection.
+ * @conn: an idle connection.
+ * @plan: the next I/O plan for @conn.
+ *
+ * This makes @conn ready to do I/O the next time around the io_loop().
+ *
+ * Example:
+ * struct io_conn *sleeper;
+ * sleeper = io_new_conn(open("/dev/null", O_RDONLY), io_idle());
+ *
+ * io_wake(sleeper, io_write("junk", 4, io_close_cb, NULL));
+ */
+#define io_wake(conn, plan) (io_plan_no_debug(), io_wake_((conn), (plan)))
+void io_wake_(struct io_conn *conn, struct io_plan plan);
+
+/**
+ * io_break - return from io_loop()
+ * @ret: non-NULL value to return from io_loop().
+ * @plan: I/O to perform on return (if any)
+ *
+ * This breaks out of the io_loop. As soon as the current @next
+ * function returns, any io_closed()'d connections will have their
+ * finish callbacks called, then io_loop() with return with @ret.
+ *
+ * If io_loop() is called again, then @plan will be carried out.
+ *
+ * Example:
+ * static struct io_plan fail_on_timeout(struct io_conn *conn, char *msg)
+ * {
+ * return io_break(msg, io_close());
+ * }
+ */
+#define io_break(ret, plan) (io_plan_no_debug(), io_break_((ret), (plan)))
+struct io_plan io_break_(void *ret, struct io_plan plan);
+
+/* FIXME: io_recvfrom/io_sendto */
+
+/**
+ * io_close - plan to close a connection.
+ *
+ * On return to io_loop, the connection will be closed.
+ *
+ * Example:
+ * static struct io_plan close_on_timeout(struct io_conn *conn, const char *msg)
+ * {
+ * printf("closing: %s\n", msg);
+ * return io_close();
+ * }
+ */
+#define io_close() io_debug(io_close_())
+struct io_plan io_close_(void);
+
+/**
+ * io_close_cb - helper callback to close a connection.
+ * @conn: the connection.
+ *
+ * This schedules a connection to be closed; designed to be used as
+ * a callback function.
+ *
+ * Example:
+ * #define close_on_timeout io_close_cb
+ */
+struct io_plan io_close_cb(struct io_conn *, void *unused);
+
+/**
+ * io_loop - process fds until all closed on io_break.
+ *
+ * This is the core loop; it exits with the io_break() arg, or NULL if
+ * all connections and listeners are closed.
+ *
+ * Example:
+ * io_loop();
+ */
+void *io_loop(void);
+#endif /* CCAN_IO_H */
--- /dev/null
+/* Licensed under LGPLv2.1+ - see LICENSE file for details */
+#ifndef CCAN_IO_PLAN_H
+#define CCAN_IO_PLAN_H
+struct io_conn;
+
+/**
+ * struct io_plan - a plan of what I/O to do.
+ * @pollflag: POLLIN or POLLOUT.
+ * @io: function to call when fd is available for @pollflag.
+ * @next: function to call after @io returns true.
+ * @next_arg: argument to @next.
+ * @u: scratch area for I/O.
+ *
+ * When the fd is POLLIN or POLLOUT (according to @pollflag), @io is
+ * called. If it returns -1, io_close() becomed the new plan (and errno
+ * is saved). If it returns 1, @next is called, otherwise @io is
+ * called again when @pollflag is available.
+ *
+ * You can use this to write your own io_plan functions.
+ */
+struct io_plan {
+ int pollflag;
+ /* Only NULL if idle. */
+ int (*io)(int fd, struct io_plan *plan);
+ /* Only NULL if closing. */
+ struct io_plan (*next)(struct io_conn *, void *arg);
+ void *next_arg;
+
+ union {
+ struct {
+ char *buf;
+ size_t len;
+ } read;
+ struct {
+ const char *buf;
+ size_t len;
+ } write;
+ struct {
+ char *buf;
+ size_t *lenp;
+ } readpart;
+ struct {
+ const char *buf;
+ size_t *lenp;
+ } writepart;
+ struct {
+ int saved_errno;
+ } close;
+ struct {
+ void *p;
+ size_t len;
+ } ptr_len;
+ struct {
+ void *p1;
+ void *p2;
+ } ptr_ptr;
+ struct {
+ size_t len1;
+ size_t len2;
+ } len_len;
+ } u;
+};
+
+#ifdef DEBUG
+/**
+ * io_debug_conn - routine to select connection(s) to debug.
+ *
+ * If this is set, the routine should return true if the connection is a
+ * debugging candidate. If so, the callchain for I/O operations on this
+ * connection will be linear, for easier use of a debugger.
+ *
+ * You will also see calls to any callbacks which wake the connection
+ * which is being debugged.
+ *
+ * Example:
+ * static bool debug_all(struct io_conn *conn)
+ * {
+ * return true();
+ * }
+ * ...
+ * io_debug_conn = debug_all;
+ */
+extern bool (*io_debug_conn)(struct io_conn *conn);
+
+/**
+ * io_debug - if we're debugging the current connection, call immediately.
+ *
+ * This determines if we are debugging the current connection: if so,
+ * it immediately applies the plan and calls back into io_loop() to
+ * create a linear call chain.
+ *
+ * Example:
+ * #define io_idle() io_debug(io_idle_())
+ * struct io_plan io_idle_(void);
+ */
+struct io_plan io_debug(struct io_plan plan);
+
+/**
+ * io_debug_io - return from function which actually does I/O.
+ *
+ * This determines if we are debugging the current connection: if so,
+ * it immediately sets the next function and calls into io_loop() to
+ * create a linear call chain.
+ *
+ * Example:
+ *
+ * static int do_write(int fd, struct io_plan *plan)
+ * {
+ * ssize_t ret = write(fd, plan->u.write.buf, plan->u.write.len);
+ * if (ret < 0)
+ * return io_debug_io(-1);
+ *
+ * plan->u.write.buf += ret;
+ * plan->u.write.len -= ret;
+ * return io_debug_io(plan->u.write.len == 0);
+ * }
+ */
+int io_debug_io(int ret);
+
+/**
+ * io_plan_no_debug - mark the next plan not to be called immediately.
+ *
+ * Most routines which take a plan are about to apply it to the current
+ * connection. We (ab)use this pattern for debugging: as soon as such a
+ * plan is created it is called, to create a linear call chain.
+ *
+ * Some routines, like io_break(), io_duplex() and io_wake() take an
+ * io_plan, but they must not be applied immediately to the current
+ * connection, so we call this first.
+ *
+ * Example:
+ * #define io_break(ret, plan) (io_plan_no_debug(), io_break_((ret), (plan)))
+ * struct io_plan io_break_(void *ret, struct io_plan plan);
+ */
+#define io_plan_no_debug() ((io_plan_nodebug = true))
+
+extern bool io_plan_nodebug;
+#else
+static inline struct io_plan io_debug(struct io_plan plan)
+{
+ return plan;
+}
+static inline int io_debug_io(int ret)
+{
+ return ret;
+}
+#define io_plan_no_debug() (void)0
+#endif
+
+#endif /* CCAN_IO_PLAN_H */
--- /dev/null
+/* Licensed under LGPLv2.1+ - see LICENSE file for details */
+#include "io.h"
+#include "backend.h"
+#include <assert.h>
+#include <poll.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <limits.h>
+#include <errno.h>
+
+static size_t num_fds = 0, max_fds = 0, num_closing = 0, num_waiting = 0;
+static struct pollfd *pollfds = NULL;
+static struct fd **fds = NULL;
+static struct timers timeouts;
+#ifdef DEBUG
+static unsigned int io_loop_level;
+static struct io_conn *free_later;
+static void io_loop_enter(void)
+{
+ io_loop_level++;
+}
+static void io_loop_exit(void)
+{
+ io_loop_level--;
+ if (io_loop_level == 0) {
+ /* Delayed free. */
+ while (free_later) {
+ struct io_conn *c = free_later;
+ free_later = c->finish_arg;
+ free(c);
+ }
+ }
+}
+static void free_conn(struct io_conn *conn)
+{
+ /* Only free on final exit: chain via finish. */
+ if (io_loop_level > 1) {
+ struct io_conn *c;
+ for (c = free_later; c; c = c->finish_arg)
+ assert(c != conn);
+ conn->finish_arg = free_later;
+ free_later = conn;
+ } else
+ free(conn);
+}
+#else
+static void io_loop_enter(void)
+{
+}
+static void io_loop_exit(void)
+{
+}
+static void free_conn(struct io_conn *conn)
+{
+ free(conn);
+}
+#endif
+
+static bool add_fd(struct fd *fd, short events)
+{
+ if (num_fds + 1 > max_fds) {
+ struct pollfd *newpollfds;
+ struct fd **newfds;
+ size_t num = max_fds ? max_fds * 2 : 8;
+
+ newpollfds = realloc(pollfds, sizeof(*newpollfds) * num);
+ if (!newpollfds)
+ return false;
+ pollfds = newpollfds;
+ newfds = realloc(fds, sizeof(*newfds) * num);
+ if (!newfds)
+ return false;
+ fds = newfds;
+ max_fds = num;
+ }
+
+ pollfds[num_fds].events = events;
+ /* In case it's idle. */
+ if (!events)
+ pollfds[num_fds].fd = -fd->fd;
+ else
+ pollfds[num_fds].fd = fd->fd;
+ pollfds[num_fds].revents = 0; /* In case we're iterating now */
+ fds[num_fds] = fd;
+ fd->backend_info = num_fds;
+ num_fds++;
+ if (events)
+ num_waiting++;
+
+ return true;
+}
+
+static void del_fd(struct fd *fd)
+{
+ size_t n = fd->backend_info;
+
+ assert(n != -1);
+ assert(n < num_fds);
+ if (pollfds[n].events)
+ num_waiting--;
+ if (n != num_fds - 1) {
+ /* Move last one over us. */
+ pollfds[n] = pollfds[num_fds-1];
+ fds[n] = fds[num_fds-1];
+ assert(fds[n]->backend_info == num_fds-1);
+ fds[n]->backend_info = n;
+ } else if (num_fds == 1) {
+ /* Free everything when no more fds. */
+ free(pollfds);
+ free(fds);
+ pollfds = NULL;
+ fds = NULL;
+ max_fds = 0;
+ }
+ num_fds--;
+ fd->backend_info = -1;
+ close(fd->fd);
+}
+
+bool add_listener(struct io_listener *l)
+{
+ if (!add_fd(&l->fd, POLLIN))
+ return false;
+ return true;
+}
+
+void backend_plan_changed(struct io_conn *conn)
+{
+ struct pollfd *pfd;
+
+ /* This can happen with debugging and delayed free... */
+ if (conn->fd.backend_info == -1)
+ return;
+
+ pfd = &pollfds[conn->fd.backend_info];
+
+ if (pfd->events)
+ num_waiting--;
+
+ pfd->events = conn->plan.pollflag;
+ if (conn->duplex) {
+ int mask = conn->duplex->plan.pollflag;
+ /* You can't *both* read/write. */
+ assert(!mask || pfd->events != mask);
+ pfd->events |= mask;
+ }
+ if (pfd->events) {
+ num_waiting++;
+ pfd->fd = conn->fd.fd;
+ } else
+ pfd->fd = -conn->fd.fd;
+
+ if (!conn->plan.next)
+ num_closing++;
+}
+
+bool add_conn(struct io_conn *c)
+{
+ if (!add_fd(&c->fd, c->plan.pollflag))
+ return false;
+ /* Immediate close is allowed. */
+ if (!c->plan.next)
+ num_closing++;
+ return true;
+}
+
+bool add_duplex(struct io_conn *c)
+{
+ c->fd.backend_info = c->duplex->fd.backend_info;
+ backend_plan_changed(c);
+ return true;
+}
+
+void backend_del_conn(struct io_conn *conn)
+{
+ if (conn->finish) {
+ errno = conn->plan.u.close.saved_errno;
+ conn->finish(conn, conn->finish_arg);
+ }
+ if (timeout_active(conn))
+ backend_del_timeout(conn);
+ free(conn->timeout);
+ if (conn->duplex) {
+ /* In case fds[] pointed to the other one. */
+ fds[conn->fd.backend_info] = &conn->duplex->fd;
+ conn->duplex->duplex = NULL;
+ conn->fd.backend_info = -1;
+ } else
+ del_fd(&conn->fd);
+ num_closing--;
+ free_conn(conn);
+}
+
+void del_listener(struct io_listener *l)
+{
+ del_fd(&l->fd);
+}
+
+static void set_plan(struct io_conn *conn, struct io_plan plan)
+{
+ conn->plan = plan;
+ backend_plan_changed(conn);
+}
+
+static void accept_conn(struct io_listener *l)
+{
+ int fd = accept(l->fd.fd, NULL, NULL);
+
+ /* FIXME: What to do here? */
+ if (fd < 0)
+ return;
+ l->init(fd, l->arg);
+}
+
+/* It's OK to miss some, as long as we make progress. */
+static bool finish_conns(struct io_conn **ready)
+{
+ unsigned int i;
+
+ for (i = 0; !io_loop_return && i < num_fds; i++) {
+ struct io_conn *c, *duplex;
+
+ if (!num_closing)
+ break;
+
+ if (fds[i]->listener)
+ continue;
+ c = (void *)fds[i];
+ for (duplex = c->duplex; c; c = duplex, duplex = NULL) {
+ if (!c->plan.next) {
+ if (doing_debug_on(c) && ready) {
+ *ready = c;
+ return true;
+ }
+ backend_del_conn(c);
+ i--;
+ }
+ }
+ }
+ return false;
+}
+
+void backend_add_timeout(struct io_conn *conn, struct timespec duration)
+{
+ if (!timeouts.base)
+ timers_init(&timeouts, time_now());
+ timer_add(&timeouts, &conn->timeout->timer,
+ time_add(time_now(), duration));
+ conn->timeout->conn = conn;
+}
+
+void backend_del_timeout(struct io_conn *conn)
+{
+ assert(conn->timeout->conn == conn);
+ timer_del(&timeouts, &conn->timeout->timer);
+ conn->timeout->conn = NULL;
+}
+
+/* This is the main loop. */
+void *do_io_loop(struct io_conn **ready)
+{
+ void *ret;
+
+ io_loop_enter();
+
+ while (!io_loop_return) {
+ int i, r, timeout = INT_MAX;
+ struct timespec now;
+ bool some_timeouts = false;
+
+ if (timeouts.base) {
+ struct timespec first;
+ struct list_head expired;
+ struct io_timeout *t;
+
+ now = time_now();
+
+ /* Call functions for expired timers. */
+ timers_expire(&timeouts, now, &expired);
+ while ((t = list_pop(&expired, struct io_timeout, timer.list))) {
+ struct io_conn *conn = t->conn;
+ /* Clear, in case timer re-adds */
+ t->conn = NULL;
+ set_current(conn);
+ set_plan(conn, t->next(conn, t->next_arg));
+ some_timeouts = true;
+ }
+
+ /* Now figure out how long to wait for the next one. */
+ if (timer_earliest(&timeouts, &first)) {
+ uint64_t f = time_to_msec(time_sub(first, now));
+ if (f < INT_MAX)
+ timeout = f;
+ }
+ }
+
+ if (num_closing) {
+ /* If this finishes a debugging con, return now. */
+ if (finish_conns(ready))
+ return NULL;
+ /* Could have started/finished more. */
+ continue;
+ }
+
+ /* debug can recurse on io_loop; anything can change. */
+ if (doing_debug() && some_timeouts)
+ continue;
+
+ if (num_fds == 0)
+ break;
+
+ /* You can't tell them all to go to sleep! */
+ assert(num_waiting);
+
+ r = poll(pollfds, num_fds, timeout);
+ if (r < 0)
+ break;
+
+ for (i = 0; i < num_fds && !io_loop_return; i++) {
+ struct io_conn *c = (void *)fds[i];
+ int events = pollfds[i].revents;
+
+ if (r == 0)
+ break;
+
+ if (fds[i]->listener) {
+ if (events & POLLIN) {
+ accept_conn((void *)c);
+ r--;
+ }
+ } else if (events & (POLLIN|POLLOUT)) {
+ r--;
+ if (c->duplex) {
+ int mask = c->duplex->plan.pollflag;
+ if (events & mask) {
+ if (doing_debug_on(c->duplex)
+ && ready) {
+ *ready = c->duplex;
+ return NULL;
+ }
+ io_ready(c->duplex);
+ events &= ~mask;
+ /* debug can recurse;
+ * anything can change. */
+ if (doing_debug())
+ break;
+ if (!(events&(POLLIN|POLLOUT)))
+ continue;
+ }
+ }
+ if (doing_debug_on(c) && ready) {
+ *ready = c;
+ return NULL;
+ }
+ io_ready(c);
+ /* debug can recurse; anything can change. */
+ if (doing_debug())
+ break;
+ } else if (events & (POLLHUP|POLLNVAL|POLLERR)) {
+ r--;
+ set_current(c);
+ errno = EBADF;
+ set_plan(c, io_close());
+ if (c->duplex) {
+ set_current(c->duplex);
+ set_plan(c->duplex, io_close());
+ }
+ }
+ }
+ }
+
+ while (num_closing && !io_loop_return) {
+ if (finish_conns(ready))
+ return NULL;
+ }
+
+ ret = io_loop_return;
+ io_loop_return = NULL;
+
+ io_loop_exit();
+ return ret;
+}
+
+void *io_loop(void)
+{
+ return do_io_loop(NULL);
+}
--- /dev/null
+#define DEBUG
+#define PORT "64001"
+#define main real_main
+int real_main(void);
+#include "run-01-start-finish.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }
--- /dev/null
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#ifndef PORT
+#define PORT "65001"
+#endif
+
+static void finish_ok(struct io_conn *conn, int *state)
+{
+ ok1(*state == 1);
+ (*state)++;
+ io_break(state + 1, io_idle());
+}
+
+static void init_conn(int fd, int *state)
+{
+ ok1(*state == 0);
+ (*state)++;
+ io_set_finish(io_new_conn(fd, io_close()), finish_ok, state);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+ int fd, on = 1;
+ struct addrinfo *addrinfo, hints;
+
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_PASSIVE;
+ hints.ai_protocol = 0;
+
+ if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+ return -1;
+
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ return -1;
+
+ setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+ if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+ close(fd);
+ return -1;
+ }
+ if (listen(fd, 1) != 0) {
+ close(fd);
+ return -1;
+ }
+ *info = addrinfo;
+ return fd;
+}
+
+int main(void)
+{
+ int state = 0;
+ struct addrinfo *addrinfo;
+ struct io_listener *l;
+ int fd;
+
+ /* This is how many tests you plan to run */
+ plan_tests(9);
+ fd = make_listen_fd(PORT, &addrinfo);
+ ok1(fd >= 0);
+ l = io_new_listener(fd, init_conn, &state);
+ ok1(l);
+ fflush(stdout);
+ if (!fork()) {
+ io_close_listener(l);
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ exit(1);
+ if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+ exit(2);
+ close(fd);
+ freeaddrinfo(addrinfo);
+ exit(0);
+ }
+ freeaddrinfo(addrinfo);
+ ok1(io_loop() == &state + 1);
+ ok1(state == 2);
+ io_close_listener(l);
+ ok1(wait(&state));
+ ok1(WIFEXITED(state));
+ ok1(WEXITSTATUS(state) == 0);
+
+ /* This exits depending on whether all tests passed */
+ return exit_status();
+}
--- /dev/null
+#define DEBUG
+#define PORT "64002"
+#define main real_main
+int real_main(void);
+#include "run-02-read.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }
--- /dev/null
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#ifndef PORT
+#define PORT "65002"
+#endif
+
+struct data {
+ int state;
+ char buf[4];
+};
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+ ok1(d->state == 1);
+ d->state++;
+ io_break(d, io_idle());
+}
+
+static void init_conn(int fd, struct data *d)
+{
+ ok1(d->state == 0);
+ d->state++;
+
+ io_set_finish(io_new_conn(fd,
+ io_read(d->buf, sizeof(d->buf), io_close_cb, d)),
+ finish_ok, d);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+ int fd, on = 1;
+ struct addrinfo *addrinfo, hints;
+
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_PASSIVE;
+ hints.ai_protocol = 0;
+
+ if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+ return -1;
+
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ return -1;
+
+ setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+ if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+ close(fd);
+ return -1;
+ }
+ if (listen(fd, 1) != 0) {
+ close(fd);
+ return -1;
+ }
+ *info = addrinfo;
+ return fd;
+}
+
+int main(void)
+{
+ struct data *d = malloc(sizeof(*d));
+ struct addrinfo *addrinfo;
+ struct io_listener *l;
+ int fd, status;
+
+ /* This is how many tests you plan to run */
+ plan_tests(10);
+ d->state = 0;
+ fd = make_listen_fd(PORT, &addrinfo);
+ ok1(fd >= 0);
+ l = io_new_listener(fd, init_conn, d);
+ ok1(l);
+ fflush(stdout);
+ if (!fork()) {
+ int i;
+
+ io_close_listener(l);
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ exit(1);
+ if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+ exit(2);
+ signal(SIGPIPE, SIG_IGN);
+ for (i = 0; i < strlen("hellothere"); i++) {
+ if (write(fd, "hellothere" + i, 1) != 1)
+ break;
+ }
+ close(fd);
+ freeaddrinfo(addrinfo);
+ free(d);
+ exit(0);
+ }
+ freeaddrinfo(addrinfo);
+ ok1(io_loop() == d);
+ ok1(d->state == 2);
+ ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
+ free(d);
+ io_close_listener(l);
+
+ ok1(wait(&status));
+ ok1(WIFEXITED(status));
+ ok1(WEXITSTATUS(status) == 0);
+
+ /* This exits depending on whether all tests passed */
+ return exit_status();
+}
--- /dev/null
+#define DEBUG
+#define PORT "64003"
+#define main real_main
+int real_main(void);
+#include "run-03-readpartial.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }
--- /dev/null
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#ifndef PORT
+#define PORT "65003"
+#endif
+
+struct data {
+ int state;
+ size_t bytes;
+ char buf[4];
+};
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+ ok1(d->state == 1);
+ d->state++;
+ io_break(d, io_idle());
+}
+
+static void init_conn(int fd, struct data *d)
+{
+ ok1(d->state == 0);
+ d->state++;
+ d->bytes = sizeof(d->buf);
+
+ io_set_finish(io_new_conn(fd,
+ io_read_partial(d->buf, &d->bytes, io_close_cb, d)),
+ finish_ok, d);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+ int fd, on = 1;
+ struct addrinfo *addrinfo, hints;
+
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_PASSIVE;
+ hints.ai_protocol = 0;
+
+ if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+ return -1;
+
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ return -1;
+
+ setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+ if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+ close(fd);
+ return -1;
+ }
+ if (listen(fd, 1) != 0) {
+ close(fd);
+ return -1;
+ }
+ *info = addrinfo;
+ return fd;
+}
+
+static void write_to_socket(const char *str, const struct addrinfo *addrinfo)
+{
+ int fd, i;
+
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ exit(1);
+ if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+ exit(2);
+ signal(SIGPIPE, SIG_IGN);
+ for (i = 0; i < strlen(str); i++) {
+ if (write(fd, str + i, 1) != 1)
+ break;
+ }
+ close(fd);
+}
+
+int main(void)
+{
+ struct data *d = malloc(sizeof(*d));
+ struct addrinfo *addrinfo;
+ struct io_listener *l;
+ int fd, status;
+
+ /* This is how many tests you plan to run */
+ plan_tests(22);
+ d->state = 0;
+ fd = make_listen_fd(PORT, &addrinfo);
+ ok1(fd >= 0);
+ l = io_new_listener(fd, init_conn, d);
+ ok1(l);
+ fflush(stdout);
+ if (!fork()) {
+ io_close_listener(l);
+ write_to_socket("hellothere", addrinfo);
+ freeaddrinfo(addrinfo);
+ free(d);
+ exit(0);
+ }
+ ok1(io_loop() == d);
+ ok1(d->state == 2);
+ ok1(d->bytes > 0);
+ ok1(d->bytes <= sizeof(d->buf));
+ ok1(memcmp(d->buf, "hellothere", d->bytes) == 0);
+
+ ok1(wait(&status));
+ ok1(WIFEXITED(status));
+ ok1(WEXITSTATUS(status) == 0);
+
+ fflush(stdout);
+ if (!fork()) {
+ io_close_listener(l);
+ write_to_socket("hi", addrinfo);
+ freeaddrinfo(addrinfo);
+ free(d);
+ exit(0);
+ }
+ d->state = 0;
+ ok1(io_loop() == d);
+ ok1(d->state == 2);
+ ok1(d->bytes > 0);
+ ok1(d->bytes <= strlen("hi"));
+ ok1(memcmp(d->buf, "hi", d->bytes) == 0);
+
+ freeaddrinfo(addrinfo);
+ free(d);
+ io_close_listener(l);
+
+ ok1(wait(&status));
+ ok1(WIFEXITED(status));
+ ok1(WEXITSTATUS(status) == 0);
+
+ /* This exits depending on whether all tests passed */
+ return exit_status();
+}
--- /dev/null
+#define DEBUG
+#define PORT "64004"
+#define main real_main
+int real_main(void);
+#include "run-04-writepartial.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }
--- /dev/null
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#ifndef PORT
+#define PORT "65004"
+#endif
+
+struct data {
+ int state;
+ size_t bytes;
+ char *buf;
+};
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+ ok1(d->state == 1);
+ d->state++;
+ io_break(d, io_idle());
+}
+
+static void init_conn(int fd, struct data *d)
+{
+ ok1(d->state == 0);
+ d->state++;
+ io_set_finish(io_new_conn(fd,
+ io_write_partial(d->buf, &d->bytes, io_close_cb, d)),
+ finish_ok, d);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+ int fd, on = 1;
+ struct addrinfo *addrinfo, hints;
+
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_PASSIVE;
+ hints.ai_protocol = 0;
+
+ if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+ return -1;
+
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ return -1;
+
+ setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+ if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+ close(fd);
+ return -1;
+ }
+ if (listen(fd, 1) != 0) {
+ close(fd);
+ return -1;
+ }
+ *info = addrinfo;
+ return fd;
+}
+
+static void read_from_socket(const char *str, const struct addrinfo *addrinfo)
+{
+ int fd;
+ char buf[100];
+
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ exit(1);
+ if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+ exit(2);
+ if (read(fd, buf, strlen(str)) != strlen(str))
+ exit(3);
+ if (memcmp(buf, str, strlen(str)) != 0)
+ exit(4);
+ close(fd);
+}
+
+int main(void)
+{
+ struct data *d = malloc(sizeof(*d));
+ struct addrinfo *addrinfo;
+ struct io_listener *l;
+ int fd, status;
+
+ /* This is how many tests you plan to run */
+ plan_tests(11);
+ d->state = 0;
+ d->bytes = 1024*1024;
+ d->buf = malloc(d->bytes);
+ memset(d->buf, 'a', d->bytes);
+ fd = make_listen_fd(PORT, &addrinfo);
+ ok1(fd >= 0);
+ l = io_new_listener(fd, init_conn, d);
+ ok1(l);
+ fflush(stdout);
+ if (!fork()) {
+ io_close_listener(l);
+ read_from_socket("aaaaaa", addrinfo);
+ freeaddrinfo(addrinfo);
+ free(d->buf);
+ free(d);
+ exit(0);
+ }
+ ok1(io_loop() == d);
+ ok1(d->state == 2);
+ ok1(d->bytes > 0);
+ ok1(d->bytes <= 1024*1024);
+
+ ok1(wait(&status));
+ ok1(WIFEXITED(status));
+ ok1(WEXITSTATUS(status) == 0);
+
+ freeaddrinfo(addrinfo);
+ free(d->buf);
+ free(d);
+ io_close_listener(l);
+
+ /* This exits depending on whether all tests passed */
+ return exit_status();
+}
--- /dev/null
+#define DEBUG
+#define PORT "64005"
+#define main real_main
+int real_main(void);
+#include "run-05-write.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }
--- /dev/null
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#ifndef PORT
+#define PORT "65005"
+#endif
+
+struct data {
+ int state;
+ size_t bytes;
+ char *buf;
+};
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+ ok1(d->state == 1);
+ d->state++;
+ io_break(d, io_idle());
+}
+
+static void init_conn(int fd, struct data *d)
+{
+ ok1(d->state == 0);
+ d->state++;
+ io_set_finish(io_new_conn(fd, io_write(d->buf, d->bytes,
+ io_close_cb, d)),
+ finish_ok, d);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+ int fd, on = 1;
+ struct addrinfo *addrinfo, hints;
+
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_PASSIVE;
+ hints.ai_protocol = 0;
+
+ if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+ return -1;
+
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ return -1;
+
+ setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+ if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+ close(fd);
+ return -1;
+ }
+ if (listen(fd, 1) != 0) {
+ close(fd);
+ return -1;
+ }
+ *info = addrinfo;
+ return fd;
+}
+
+static void read_from_socket(size_t bytes, const struct addrinfo *addrinfo)
+{
+ int fd, done, r;
+ char buf[100];
+
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ exit(1);
+ if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+ exit(2);
+
+ for (done = 0; done < bytes; done += r) {
+ r = read(fd, buf, sizeof(buf));
+ if (r < 0)
+ exit(3);
+ done += r;
+ }
+ close(fd);
+}
+
+int main(void)
+{
+ struct data *d = malloc(sizeof(*d));
+ struct addrinfo *addrinfo;
+ struct io_listener *l;
+ int fd, status;
+
+ /* This is how many tests you plan to run */
+ plan_tests(9);
+ d->state = 0;
+ d->bytes = 1024*1024;
+ d->buf = malloc(d->bytes);
+ memset(d->buf, 'a', d->bytes);
+ fd = make_listen_fd(PORT, &addrinfo);
+ ok1(fd >= 0);
+ l = io_new_listener(fd, init_conn, d);
+ ok1(l);
+ fflush(stdout);
+ if (!fork()) {
+ io_close_listener(l);
+ read_from_socket(d->bytes, addrinfo);
+ freeaddrinfo(addrinfo);
+ free(d->buf);
+ free(d);
+ exit(0);
+ }
+ ok1(io_loop() == d);
+ ok1(d->state == 2);
+
+ ok1(wait(&status));
+ ok1(WIFEXITED(status));
+ ok1(WEXITSTATUS(status) == 0);
+
+ freeaddrinfo(addrinfo);
+ free(d->buf);
+ free(d);
+ io_close_listener(l);
+
+ /* This exits depending on whether all tests passed */
+ return exit_status();
+}
--- /dev/null
+#define DEBUG
+#define PORT "64006"
+#define main real_main
+int real_main(void);
+#include "run-06-idle.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }
--- /dev/null
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#ifndef PORT
+#define PORT "65006"
+#endif
+
+static struct io_conn *idler;
+
+struct data {
+ int state;
+ char buf[4];
+};
+
+static struct io_plan read_done(struct io_conn *conn, struct data *d)
+{
+ ok1(d->state == 2 || d->state == 3);
+ d->state++;
+ return io_close();
+}
+
+static void finish_waker(struct io_conn *conn, struct data *d)
+{
+ io_wake(idler, io_read(d->buf, sizeof(d->buf), read_done, d));
+ ok1(d->state == 1);
+ d->state++;
+}
+
+static void finish_idle(struct io_conn *conn, struct data *d)
+{
+ ok1(d->state == 3);
+ d->state++;
+ io_break(d, io_idle());
+}
+
+static struct io_plan never(struct io_conn *conn, void *arg)
+{
+ abort();
+}
+
+static void init_conn(int fd, struct data *d)
+{
+ int fd2;
+
+ ok1(d->state == 0);
+ d->state++;
+ idler = io_new_conn(fd, io_idle());
+ io_set_finish(idler, finish_idle, d);
+
+ /* This will wake us up, as read will fail. */
+ fd2 = open("/dev/null", O_RDONLY);
+ ok1(fd2 >= 0);
+ io_set_finish(io_new_conn(fd2, io_read(idler, 1, never, NULL)),
+ finish_waker, d);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+ int fd, on = 1;
+ struct addrinfo *addrinfo, hints;
+
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_PASSIVE;
+ hints.ai_protocol = 0;
+
+ if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+ return -1;
+
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ return -1;
+
+ setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+ if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+ close(fd);
+ return -1;
+ }
+ if (listen(fd, 1) != 0) {
+ close(fd);
+ return -1;
+ }
+ *info = addrinfo;
+ return fd;
+}
+
+int main(void)
+{
+ struct data *d = malloc(sizeof(*d));
+ struct addrinfo *addrinfo;
+ struct io_listener *l;
+ int fd, status;
+
+ /* This is how many tests you plan to run */
+ plan_tests(13);
+ d->state = 0;
+ fd = make_listen_fd(PORT, &addrinfo);
+ ok1(fd >= 0);
+ l = io_new_listener(fd, init_conn, d);
+ ok1(l);
+ fflush(stdout);
+ if (!fork()) {
+ int i;
+
+ io_close_listener(l);
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ exit(1);
+ if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+ exit(2);
+ signal(SIGPIPE, SIG_IGN);
+ for (i = 0; i < strlen("hellothere"); i++) {
+ if (write(fd, "hellothere" + i, 1) != 1)
+ break;
+ }
+ close(fd);
+ freeaddrinfo(addrinfo);
+ free(d);
+ exit(0);
+ }
+ freeaddrinfo(addrinfo);
+
+ ok1(io_loop() == d);
+ ok1(d->state == 4);
+ ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
+ free(d);
+ io_close_listener(l);
+
+ ok1(wait(&status));
+ ok1(WIFEXITED(status));
+ ok1(WEXITSTATUS(status) == 0);
+
+ /* This exits depending on whether all tests passed */
+ return exit_status();
+}
--- /dev/null
+#define DEBUG
+#define PORT "64007"
+#define main real_main
+int real_main(void);
+#include "run-07-break.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }
--- /dev/null
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#ifndef PORT
+#define PORT "65007"
+#endif
+
+struct data {
+ int state;
+ char buf[4];
+};
+
+static struct io_plan read_done(struct io_conn *conn, struct data *d)
+{
+ ok1(d->state == 1);
+ d->state++;
+ return io_close();
+}
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+ ok1(d->state == 2);
+ d->state++;
+}
+
+static void init_conn(int fd, struct data *d)
+{
+ ok1(d->state == 0);
+ d->state++;
+
+ io_set_finish(io_new_conn(fd,
+ io_break(d,
+ io_read(d->buf, sizeof(d->buf), read_done, d))),
+ finish_ok, d);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+ int fd, on = 1;
+ struct addrinfo *addrinfo, hints;
+
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_PASSIVE;
+ hints.ai_protocol = 0;
+
+ if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+ return -1;
+
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ return -1;
+
+ setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+ if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+ close(fd);
+ return -1;
+ }
+ if (listen(fd, 1) != 0) {
+ close(fd);
+ return -1;
+ }
+ *info = addrinfo;
+ return fd;
+}
+
+int main(void)
+{
+ struct data *d = malloc(sizeof(*d));
+ struct addrinfo *addrinfo;
+ struct io_listener *l;
+ int fd, status;
+
+ /* This is how many tests you plan to run */
+ plan_tests(13);
+ d->state = 0;
+ fd = make_listen_fd(PORT, &addrinfo);
+ ok1(fd >= 0);
+ l = io_new_listener(fd, init_conn, d);
+ ok1(l);
+ fflush(stdout);
+ if (!fork()) {
+ int i;
+
+ io_close_listener(l);
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ exit(1);
+ if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+ exit(2);
+ signal(SIGPIPE, SIG_IGN);
+ for (i = 0; i < strlen("hellothere"); i++) {
+ if (write(fd, "hellothere" + i, 1) != 1)
+ break;
+ }
+ close(fd);
+ freeaddrinfo(addrinfo);
+ free(d);
+ exit(0);
+ }
+ freeaddrinfo(addrinfo);
+ ok1(io_loop() == d);
+ ok1(d->state == 1);
+ io_close_listener(l);
+
+ ok1(io_loop() == NULL);
+ ok1(d->state == 3);
+ ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
+ free(d);
+
+ ok1(wait(&status));
+ ok1(WIFEXITED(status));
+ ok1(WEXITSTATUS(status) == 0);
+
+ /* This exits depending on whether all tests passed */
+ return exit_status();
+}
--- /dev/null
+#define DEBUG
+#define main real_main
+int real_main(void);
+#include "run-08-hangup-on-idle.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }
--- /dev/null
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+static int fds2[2];
+
+static struct io_plan timeout_wakeup(struct io_conn *conn, char *buf)
+{
+ /* This kills the dummy connection. */
+ close(fds2[1]);
+ return io_read(buf, 16, io_close_cb, NULL);
+}
+
+int main(void)
+{
+ int fds[2];
+ struct io_conn *conn;
+ char buf[16];
+
+ plan_tests(4);
+
+ ok1(pipe(fds) == 0);
+
+ /* Write then close. */
+ io_new_conn(fds[1], io_write("hello there world", 16,
+ io_close_cb, NULL));
+ conn = io_new_conn(fds[0], io_idle());
+
+ /* To avoid assert(num_waiting) */
+ ok1(pipe(fds2) == 0);
+ io_new_conn(fds2[0], io_read(buf, 16, io_close_cb, NULL));
+
+ /* After half a second, it will read. */
+ io_timeout(conn, time_from_msec(500), timeout_wakeup, buf);
+
+ ok1(io_loop() == NULL);
+ ok1(memcmp(buf, "hello there world", 16) == 0);
+
+ /* This exits depending on whether all tests passed */
+ return exit_status();
+}
--- /dev/null
+#define DEBUG
+#define main real_main
+int real_main(void);
+#include "run-08-read-after-hangup.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }
--- /dev/null
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+#include <signal.h>
+
+static char inbuf[8];
+
+static struct io_plan wake_it(struct io_conn *conn, struct io_conn *reader)
+{
+ io_wake(reader, io_read(inbuf, 8, io_close_cb, NULL));
+ return io_close();
+}
+
+int main(void)
+{
+ int fds[2];
+ struct io_conn *conn;
+
+ plan_tests(3);
+
+ ok1(pipe(fds) == 0);
+ conn = io_new_conn(fds[0], io_idle());
+ io_new_conn(fds[1], io_write("EASYTEST", 8, wake_it, conn));
+
+ ok1(io_loop() == NULL);
+ ok1(memcmp(inbuf, "EASYTEST", sizeof(inbuf)) == 0);
+
+ /* This exits depending on whether all tests passed */
+ return exit_status();
+}
--- /dev/null
+#define DEBUG
+#define PORT "64009"
+#define main real_main
+int real_main(void);
+#include "run-09-connect.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }
--- /dev/null
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#ifndef PORT
+#define PORT "65009"
+#endif
+
+static struct io_listener *l;
+
+struct data {
+ int state;
+ char buf[10];
+};
+
+static struct io_plan closer(struct io_conn *conn, struct data *d)
+{
+ d->state++;
+ return io_close();
+}
+
+static struct io_plan connected(struct io_conn *conn, struct data *d2)
+{
+ ok1(d2->state == 0);
+ d2->state++;
+ return io_read(d2->buf, sizeof(d2->buf), closer, d2);
+}
+
+static void init_conn(int fd, struct data *d)
+{
+ ok1(d->state == 0);
+ d->state++;
+ io_new_conn(fd, io_write(d->buf, sizeof(d->buf), closer, d));
+ io_close_listener(l);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+ int fd, on = 1;
+ struct addrinfo *addrinfo, hints;
+
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_PASSIVE;
+ hints.ai_protocol = 0;
+
+ if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+ return -1;
+
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ return -1;
+
+ setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+ if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+ close(fd);
+ return -1;
+ }
+ if (listen(fd, 1) != 0) {
+ close(fd);
+ return -1;
+ }
+ *info = addrinfo;
+ return fd;
+}
+
+int main(void)
+{
+ struct data *d = malloc(sizeof(*d)), *d2 = malloc(sizeof(*d2));
+ struct addrinfo *addrinfo;
+ int fd;
+
+ /* This is how many tests you plan to run */
+ plan_tests(8);
+ d->state = 0;
+ memset(d->buf, 'a', sizeof(d->buf));
+ fd = make_listen_fd(PORT, &addrinfo);
+ ok1(fd >= 0);
+ l = io_new_listener(fd, init_conn, d);
+ ok1(l);
+
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ d2->state = 0;
+ ok1(io_new_conn(fd, io_connect(fd, addrinfo, connected, d2)));
+
+ ok1(io_loop() == NULL);
+ ok1(d->state == 2);
+ ok1(d2->state == 2);
+
+ freeaddrinfo(addrinfo);
+ free(d);
+ free(d2);
+
+ /* This exits depending on whether all tests passed */
+ return exit_status();
+}
--- /dev/null
+#define DEBUG
+#define PORT "64010"
+#define main real_main
+int real_main(void);
+#include "run-10-many.c"
+#undef main
+/* We stack overflow if we debug all of them! */
+static bool debug_one(struct io_conn *conn)
+{
+ return conn == buf[1].reader;
+}
+int main(void) { io_debug_conn = debug_one; return real_main(); }
--- /dev/null
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#define NUM 100
+#define NUM_ITERS 1000
+
+struct buffer {
+ int iters;
+ struct io_conn *reader, *writer;
+ char buf[32];
+};
+
+static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf);
+
+static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf)
+{
+ assert(conn == buf->reader);
+
+ if (buf->iters == NUM_ITERS)
+ return io_close();
+
+ /* You write. */
+ io_wake(buf->writer,
+ io_write(&buf->buf, sizeof(buf->buf), poke_reader, buf));
+
+ /* I'll wait until you wake me. */
+ return io_idle();
+}
+
+static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf)
+{
+ assert(conn == buf->writer);
+ /* You read. */
+ io_wake(buf->reader,
+ io_read(&buf->buf, sizeof(buf->buf), poke_writer, buf));
+
+ if (++buf->iters == NUM_ITERS)
+ return io_close();
+
+ /* I'll wait until you tell me to write. */
+ return io_idle();
+}
+
+static struct buffer buf[NUM];
+
+int main(void)
+{
+ unsigned int i;
+ int fds[2], last_read, last_write;
+
+ plan_tests(5 + NUM);
+
+ ok1(pipe(fds) == 0);
+ last_read = fds[0];
+ last_write = fds[1];
+
+ for (i = 1; i < NUM; i++) {
+ if (pipe(fds) < 0)
+ break;
+ memset(buf[i].buf, i, sizeof(buf[i].buf));
+ sprintf(buf[i].buf, "%i-%i", i, i);
+
+ /* Wait for writer to tell us to read. */
+ buf[i].reader = io_new_conn(last_read, io_idle());
+ if (!buf[i].reader)
+ break;
+ buf[i].writer = io_new_conn(fds[1],
+ io_write(&buf[i].buf,
+ sizeof(buf[i].buf),
+ poke_reader, &buf[i]));
+ if (!buf[i].writer)
+ break;
+ last_read = fds[0];
+ }
+ if (!ok1(i == NUM))
+ exit(exit_status());
+
+ /* Last one completes the cirle. */
+ i = 0;
+ sprintf(buf[i].buf, "%i-%i", i, i);
+ buf[i].reader = io_new_conn(last_read, io_idle());
+ ok1(buf[i].reader);
+ buf[i].writer = io_new_conn(last_write,
+ io_write(&buf[i].buf, sizeof(buf[i].buf),
+ poke_reader, &buf[i]));
+ ok1(buf[i].writer);
+
+ /* They should eventually exit */
+ ok1(io_loop() == NULL);
+
+ for (i = 0; i < NUM; i++) {
+ char b[sizeof(buf[0].buf)];
+ memset(b, i, sizeof(b));
+ sprintf(b, "%i-%i", i, i);
+ ok1(memcmp(b, buf[(i + NUM_ITERS) % NUM].buf, sizeof(b)) == 0);
+ }
+
+ /* This exits depending on whether all tests passed */
+ return exit_status();
+}
--- /dev/null
+#define DEBUG
+#define PORT "64012"
+#define main real_main
+int real_main(void);
+#include "run-12-bidir.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }
--- /dev/null
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#ifndef PORT
+#define PORT "65012"
+#endif
+
+struct data {
+ struct io_listener *l;
+ int state;
+ char buf[4];
+ char wbuf[32];
+};
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+ d->state++;
+}
+
+static struct io_plan write_done(struct io_conn *conn, struct data *d)
+{
+ d->state++;
+ return io_close();
+}
+
+static void init_conn(int fd, struct data *d)
+{
+ struct io_conn *conn;
+
+ ok1(d->state == 0);
+ d->state++;
+
+ io_close_listener(d->l);
+
+ memset(d->wbuf, 7, sizeof(d->wbuf));
+
+ conn = io_new_conn(fd, io_read(d->buf, sizeof(d->buf), io_close_cb, d));
+ io_set_finish(conn, finish_ok, d);
+ conn = io_duplex(conn, io_write(d->wbuf, sizeof(d->wbuf), write_done, d));
+ ok1(conn);
+ io_set_finish(conn, finish_ok, d);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+ int fd, on = 1;
+ struct addrinfo *addrinfo, hints;
+
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_PASSIVE;
+ hints.ai_protocol = 0;
+
+ if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+ return -1;
+
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ return -1;
+
+ setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+ if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+ close(fd);
+ return -1;
+ }
+ if (listen(fd, 1) != 0) {
+ close(fd);
+ return -1;
+ }
+ *info = addrinfo;
+ return fd;
+}
+
+int main(void)
+{
+ struct data *d = malloc(sizeof(*d));
+ struct addrinfo *addrinfo;
+ int fd, status;
+
+ /* This is how many tests you plan to run */
+ plan_tests(10);
+ d->state = 0;
+ fd = make_listen_fd(PORT, &addrinfo);
+ ok1(fd >= 0);
+ d->l = io_new_listener(fd, init_conn, d);
+ ok1(d->l);
+ fflush(stdout);
+ if (!fork()) {
+ int i;
+ char buf[32];
+
+ io_close_listener(d->l);
+ free(d);
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ exit(1);
+ if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+ exit(2);
+ signal(SIGPIPE, SIG_IGN);
+ for (i = 0; i < 32; i++) {
+ if (read(fd, buf+i, 1) != 1)
+ break;
+ }
+ for (i = 0; i < strlen("hellothere"); i++) {
+ if (write(fd, "hellothere" + i, 1) != 1)
+ break;
+ }
+ close(fd);
+ freeaddrinfo(addrinfo);
+ exit(0);
+ }
+ freeaddrinfo(addrinfo);
+ ok1(io_loop() == NULL);
+ ok1(d->state == 4);
+ ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
+ free(d);
+
+ ok1(wait(&status));
+ ok1(WIFEXITED(status));
+ ok1(WEXITSTATUS(status) == 0);
+
+ /* This exits depending on whether all tests passed */
+ return exit_status();
+}
--- /dev/null
+#define DEBUG
+#define PORT "64013"
+#define main real_main
+int real_main(void);
+#include "run-13-all-idle.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }
--- /dev/null
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+#include <signal.h>
+
+int main(void)
+{
+ int status;
+
+ plan_tests(3);
+
+ if (fork() == 0) {
+ int fds[2];
+
+ ok1(pipe(fds) == 0);
+ io_new_conn(fds[0], io_idle());
+ io_loop();
+ exit(1);
+ }
+
+ ok1(wait(&status) != -1);
+ ok1(WIFSIGNALED(status));
+ ok1(WTERMSIG(status) == SIGABRT);
+
+ /* This exits depending on whether all tests passed */
+ return exit_status();
+}
--- /dev/null
+#define DEBUG
+#define PORT "64015"
+#define main real_main
+int real_main(void);
+#include "run-15-timeout.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }
--- /dev/null
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+#include <unistd.h>
+
+#ifndef PORT
+#define PORT "65015"
+#endif
+
+struct data {
+ int state;
+ int timeout_usec;
+ bool timed_out;
+ char buf[4];
+};
+
+
+static struct io_plan no_timeout(struct io_conn *conn, struct data *d)
+{
+ ok1(d->state == 1);
+ d->state++;
+ return io_close();
+}
+
+static struct io_plan timeout(struct io_conn *conn, struct data *d)
+{
+ ok1(d->state == 1);
+ d->state++;
+ d->timed_out = true;
+ return io_close();
+}
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+ ok1(d->state == 2);
+ d->state++;
+ io_break(d, io_idle());
+}
+
+static void init_conn(int fd, struct data *d)
+{
+ struct io_conn *conn;
+
+ ok1(d->state == 0);
+ d->state++;
+
+ conn = io_new_conn(fd, io_read(d->buf, sizeof(d->buf), no_timeout, d));
+ io_set_finish(conn, finish_ok, d);
+ io_timeout(conn, time_from_usec(d->timeout_usec), timeout, d);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+ int fd, on = 1;
+ struct addrinfo *addrinfo, hints;
+
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_PASSIVE;
+ hints.ai_protocol = 0;
+
+ if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+ return -1;
+
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ return -1;
+
+ setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+ if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+ close(fd);
+ return -1;
+ }
+ if (listen(fd, 1) != 0) {
+ close(fd);
+ return -1;
+ }
+ *info = addrinfo;
+ return fd;
+}
+
+int main(void)
+{
+ struct data *d = malloc(sizeof(*d));
+ struct addrinfo *addrinfo;
+ struct io_listener *l;
+ int fd, status;
+
+ /* This is how many tests you plan to run */
+ plan_tests(20);
+ d->state = 0;
+ d->timed_out = false;
+ d->timeout_usec = 100000;
+ fd = make_listen_fd(PORT, &addrinfo);
+ ok1(fd >= 0);
+ l = io_new_listener(fd, init_conn, d);
+ ok1(l);
+ fflush(stdout);
+
+ if (!fork()) {
+ int i;
+
+ io_close_listener(l);
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ exit(1);
+ if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+ exit(2);
+ signal(SIGPIPE, SIG_IGN);
+ usleep(500000);
+ for (i = 0; i < strlen("hellothere"); i++) {
+ if (write(fd, "hellothere" + i, 1) != 1)
+ break;
+ }
+ close(fd);
+ freeaddrinfo(addrinfo);
+ free(d);
+ exit(i);
+ }
+ ok1(io_loop() == d);
+ ok1(d->state == 3);
+ ok1(d->timed_out == true);
+ ok1(wait(&status));
+ ok1(WIFEXITED(status));
+ ok1(WEXITSTATUS(status) < sizeof(d->buf));
+
+ /* This one shouldn't time out. */
+ d->state = 0;
+ d->timed_out = false;
+ d->timeout_usec = 500000;
+ fflush(stdout);
+
+ if (!fork()) {
+ int i;
+
+ io_close_listener(l);
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ exit(1);
+ if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+ exit(2);
+ signal(SIGPIPE, SIG_IGN);
+ usleep(100000);
+ for (i = 0; i < strlen("hellothere"); i++) {
+ if (write(fd, "hellothere" + i, 1) != 1)
+ break;
+ }
+ close(fd);
+ freeaddrinfo(addrinfo);
+ free(d);
+ exit(i);
+ }
+ ok1(io_loop() == d);
+ ok1(d->state == 3);
+ ok1(d->timed_out == false);
+ ok1(wait(&status));
+ ok1(WIFEXITED(status));
+ ok1(WEXITSTATUS(status) >= sizeof(d->buf));
+
+ io_close_listener(l);
+ freeaddrinfo(addrinfo);
+ free(d);
+
+ /* This exits depending on whether all tests passed */
+ return exit_status();
+}
--- /dev/null
+#define DEBUG
+#define PORT "64017"
+#define main real_main
+int real_main(void);
+#include "run-17-homemade-io.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }
--- /dev/null
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#ifndef PORT
+#define PORT "65017"
+#endif
+
+struct packet {
+ int state;
+ size_t len;
+ void *contents;
+};
+
+static void finish_ok(struct io_conn *conn, struct packet *pkt)
+{
+ ok1(pkt->state == 3);
+ pkt->state++;
+ io_break(pkt, io_idle());
+}
+
+static int do_read_packet(int fd, struct io_plan *plan)
+{
+ struct packet *pkt = plan->u.ptr_len.p;
+ char *dest;
+ ssize_t ret;
+ size_t off, totlen;
+
+ /* Reading len? */
+ if (plan->u.ptr_len.len < sizeof(size_t)) {
+ ok1(pkt->state == 1);
+ pkt->state++;
+ dest = (char *)&pkt->len;
+ off = plan->u.ptr_len.len;
+ totlen = sizeof(pkt->len);
+ } else {
+ ok1(pkt->state == 2);
+ pkt->state++;
+ if (pkt->len == 0)
+ return io_debug_io(1);
+ if (!pkt->contents && !(pkt->contents = malloc(pkt->len)))
+ goto fail;
+ else {
+ dest = pkt->contents;
+ off = plan->u.ptr_len.len - sizeof(pkt->len);
+ totlen = pkt->len;
+ }
+ }
+
+ ret = read(fd, dest + off, totlen - off);
+ if (ret <= 0)
+ goto fail;
+
+ plan->u.ptr_len.len += ret;
+
+ /* Finished? */
+ return io_debug_io(plan->u.ptr_len.len >= sizeof(pkt->len)
+ && plan->u.ptr_len.len == pkt->len + sizeof(pkt->len));
+
+fail:
+ free(pkt->contents);
+ return io_debug_io(-1);
+}
+
+static struct io_plan io_read_packet(struct packet *pkt,
+ struct io_plan (*cb)(struct io_conn *, void *),
+ void *arg)
+{
+ struct io_plan plan;
+
+ assert(cb);
+ pkt->contents = NULL;
+ plan.u.ptr_len.p = pkt;
+ plan.u.ptr_len.len = 0;
+ plan.io = do_read_packet;
+ plan.next = cb;
+ plan.next_arg = arg;
+ plan.pollflag = POLLIN;
+
+ return plan;
+}
+
+static void init_conn(int fd, struct packet *pkt)
+{
+ ok1(pkt->state == 0);
+ pkt->state++;
+
+ io_set_finish(io_new_conn(fd, io_read_packet(pkt, io_close_cb, pkt)),
+ finish_ok, pkt);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+ int fd, on = 1;
+ struct addrinfo *addrinfo, hints;
+
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_PASSIVE;
+ hints.ai_protocol = 0;
+
+ if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+ return -1;
+
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ return -1;
+
+ setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+ if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+ close(fd);
+ return -1;
+ }
+ if (listen(fd, 1) != 0) {
+ close(fd);
+ return -1;
+ }
+ *info = addrinfo;
+ return fd;
+}
+
+int main(void)
+{
+ struct packet *pkt = malloc(sizeof(*pkt));
+ struct addrinfo *addrinfo;
+ struct io_listener *l;
+ int fd, status;
+
+ /* This is how many tests you plan to run */
+ plan_tests(13);
+ pkt->state = 0;
+ fd = make_listen_fd(PORT, &addrinfo);
+ ok1(fd >= 0);
+ l = io_new_listener(fd, init_conn, pkt);
+ ok1(l);
+ fflush(stdout);
+ if (!fork()) {
+ struct {
+ size_t len;
+ char data[8];
+ } data;
+
+ io_close_listener(l);
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ exit(1);
+ if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+ exit(2);
+ signal(SIGPIPE, SIG_IGN);
+
+ data.len = sizeof(data.data);
+ memcpy(data.data, "hithere!", sizeof(data.data));
+ if (write(fd, &data, sizeof(data)) != sizeof(data))
+ exit(3);
+
+ close(fd);
+ freeaddrinfo(addrinfo);
+ free(pkt);
+ exit(0);
+ }
+ freeaddrinfo(addrinfo);
+ ok1(io_loop() == pkt);
+ ok1(pkt->state == 4);
+ ok1(pkt->len == 8);
+ ok1(memcmp(pkt->contents, "hithere!", 8) == 0);
+ free(pkt->contents);
+ free(pkt);
+ io_close_listener(l);
+
+ ok1(wait(&status));
+ ok1(WIFEXITED(status));
+ ok1(WEXITSTATUS(status) == 0);
+
+ /* This exits depending on whether all tests passed */
+ return exit_status();
+}
--- /dev/null
+#define DEBUG
+#define PORT "64018"
+#define main real_main
+int real_main(void);
+#include "run-18-errno.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }
--- /dev/null
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#ifndef PORT
+#define PORT "65018"
+#endif
+
+static void finish_100(struct io_conn *conn, int *state)
+{
+ ok1(errno == 100);
+ ok1(*state == 1);
+ (*state)++;
+}
+
+static void finish_EBADF(struct io_conn *conn, int *state)
+{
+ ok1(errno == EBADF);
+ ok1(*state == 3);
+ (*state)++;
+ io_break(state + 1, io_close());
+}
+
+static void init_conn(int fd, int *state)
+{
+ if (*state == 0) {
+ (*state)++;
+ errno = 100;
+ io_set_finish(io_new_conn(fd, io_close()), finish_100, state);
+ } else {
+ ok1(*state == 2);
+ (*state)++;
+ close(fd);
+ errno = 0;
+ io_set_finish(io_new_conn(fd, io_read(state, 0,
+ io_close_cb, NULL)),
+ finish_EBADF, state);
+ }
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+ int fd, on = 1;
+ struct addrinfo *addrinfo, hints;
+
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_PASSIVE;
+ hints.ai_protocol = 0;
+
+ if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+ return -1;
+
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ return -1;
+
+ setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+ if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+ close(fd);
+ return -1;
+ }
+ if (listen(fd, 1) != 0) {
+ close(fd);
+ return -1;
+ }
+ *info = addrinfo;
+ return fd;
+}
+
+int main(void)
+{
+ int state = 0;
+ struct addrinfo *addrinfo;
+ struct io_listener *l;
+ int fd;
+
+ /* This is how many tests you plan to run */
+ plan_tests(12);
+ fd = make_listen_fd(PORT, &addrinfo);
+ ok1(fd >= 0);
+ l = io_new_listener(fd, init_conn, &state);
+ ok1(l);
+ fflush(stdout);
+ if (!fork()) {
+ io_close_listener(l);
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ exit(1);
+ if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+ exit(2);
+ close(fd);
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ exit(3);
+ if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+ exit(4);
+ close(fd);
+ freeaddrinfo(addrinfo);
+ exit(0);
+ }
+ freeaddrinfo(addrinfo);
+ ok1(io_loop() == &state + 1);
+ ok1(state == 4);
+ io_close_listener(l);
+ ok1(wait(&state));
+ ok1(WIFEXITED(state));
+ ok1(WEXITSTATUS(state) == 0);
+
+ /* This exits depending on whether all tests passed */
+ return exit_status();
+}