Cleaner model for I/O, with cost of complexity if you really want bidir.
Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
--- /dev/null
+Simple:
+ step1(conn): read(conn), then step2
+ step2(conn): write(conn), then close
+
+Pass-through:
+ step1(conn): read(conn), then step2
+ step2(conn): write(otherconn), then step1
+
+Pass-through-and-connect:
+ step1(conn): read(conn), then step2
+ step2(conn): connect(otherconn), then step3
+ step3(conn): write(otherconn), then step1
+
+Chatroom:
+ step1(conn): read(conn), then step2
+ step2(conn): for c in allcons: write(c). goto step1
+
+Simple:
+
+void event(struct io_event *done)
+{
+ char *buf = done->priv;
+ struct io_event *e;
+
+ e = queue_read(done, done->conn, buf, 100);
+ e = queue_write(e, done->conn, buf, 100);
+ queue_close(e, done->conn);
+}
+
+Pass-through:
+struct passthru {
+ char buf[100];
+ struct conn *rconn, *wconn;
+};
+
+void event(struct io_event *done)
+{
+ struct passthru *p = done->priv;
+ struct io_event *e;
+
+ e = queue_read(done, p->rconn, p->buf, 100);
+ e = queue_write(e, p->wconn, buf, 100);
+ queue_event(e, event);
+}
+
+Chatroom:
+struct list_head clients;
+
+struct buffer {
+ char buf[100];
+ unsigned int ref;
+};
+
+struct client {
+ struct list_node list;
+ struct connection *conn;
+ struct buffer *rbuf, *wbuf;
+};
+
+void broadcast(struct io_event *done)
+{
+ struct client *i, *c = done->conn->priv;
+ struct io_event *e;
+
+ list_for_each(&clients, i, list) {
+ e = queue_write(done, i->conn, c->buf->buf, 100);
+ e->priv = c->buf;
+ c->buf->ref++;
+ queue_event(e, drop_ref);
+ }
+
+
+
+void event(struct io_event *done)
+{
+ struct client *c = done->conn->priv;
+ struct io_event *e;
+
+ assert(c->conn == done->conn);
+ c->buf = malloc(sizeof(*c->buf));
+ c->buf->ref = 0;
+ e = queue_read(done, c->conn, c->buf->buf, 100);
+ e = queue_event(e, broadcast);
+}
+
+
+ step1(conn): read(conn), then step2
+ step2(conn): for c in allcons: write(c). goto step1
};
enum io_state {
- NEXT, /* eg starting, woken from idle, return from io_break. */
+ /* These wait for something to input */
READ,
- WRITE,
READPART,
+
+ /* These wait for room to output */
+ WRITE,
WRITEPART,
+
+ NEXT, /* eg starting, woken from idle, return from io_break. */
IDLE,
FINISHED,
PROCESSING /* We expect them to change this now. */
struct io_conn {
struct fd fd;
+ struct io_conn *duplex;
+
enum io_state state;
union {
struct io_state_read read;
bool add_listener(struct io_listener *l);
bool add_conn(struct io_conn *c);
+bool add_duplex(struct io_conn *c);
void del_listener(struct io_listener *l);
void backend_set_state(struct io_conn *conn, struct io_op *op);
-struct io_op *do_writeable(struct io_conn *conn);
-struct io_op *do_readable(struct io_conn *conn);
+struct io_op *do_ready(struct io_conn *conn);
#endif /* CCAN_IO_BACKEND_H */
conn->fd.finish = finish;
conn->fd.finish_arg = conn->fd.next_arg = arg;
conn->state = NEXT;
+ conn->duplex = NULL;
if (!add_conn(conn)) {
free(conn);
return NULL;
return conn;
}
+struct io_conn *io_duplex_(struct io_conn *old,
+ struct io_op *(*start)(struct io_conn *, void *),
+ void (*finish)(struct io_conn *, void *),
+ void *arg)
+{
+ struct io_conn *conn;
+
+ assert(!old->duplex);
+
+ conn = malloc(sizeof(*conn));
+ if (!conn)
+ return NULL;
+
+ conn->fd.listener = false;
+ conn->fd.fd = old->fd.fd;
+ conn->fd.next = start;
+ conn->fd.finish = finish;
+ conn->fd.finish_arg = conn->fd.next_arg = arg;
+ conn->state = NEXT;
+ conn->duplex = old;
+ if (!add_duplex(conn)) {
+ free(conn);
+ return NULL;
+ }
+ old->duplex = conn;
+ return conn;
+}
+
/* Convenient token which only we can produce. */
static inline struct io_next *to_ionext(struct io_conn *conn)
{
return conn->fd.next(conn, conn->fd.next_arg);
}
-struct io_op *do_writeable(struct io_conn *conn)
+struct io_op *do_ready(struct io_conn *conn)
{
ssize_t ret;
bool finished;
*conn->u.writepart.lenp = ret;
finished = true;
break;
- default:
- /* Shouldn't happen. */
- abort();
- }
-
- if (finished)
- return do_next(conn);
- return to_ioop(conn->state);
-}
-
-struct io_op *do_readable(struct io_conn *conn)
-{
- ssize_t ret;
- bool finished;
-
- switch (conn->state) {
case READ:
ret = read(conn->fd.fd, conn->u.read.buf, conn->u.read.len);
if (ret <= 0)
*/
struct io_op *io_idle(struct io_conn *conn);
+/**
+ * io_duplex - split an fd into two connections.
+ * @conn: a connection.
+ * @start: the first function to call.
+ * @finish: the function to call when it's closed or fails.
+ * @arg: the argument to both @start and @finish.
+ *
+ * Sometimes you want to be able to simultaneously read and write on a
+ * single fd, but io forces a linear call sequence. The solition is
+ * to have two connections for the same fd, and use one for read
+ * operations and one for write.
+ *
+ * You must io_close() both of them to close the fd.
+ */
+#define io_duplex(conn, start, finish, arg) \
+ io_duplex_((conn), \
+ typesafe_cb_preargs(struct io_op *, void *, \
+ (start), (arg), struct io_conn *), \
+ typesafe_cb_preargs(void, void *, (finish), (arg), \
+ struct io_conn *), \
+ (arg))
+
+struct io_conn *io_duplex_(struct io_conn *conn,
+ struct io_op *(*start)(struct io_conn *, void *),
+ void (*finish)(struct io_conn *, void *),
+ void *arg);
+
/**
* io_wake - wake up and idle connection.
* @conn: an idle connection.
return true;
}
+bool add_duplex(struct io_conn *c)
+{
+ c->fd.backend_info = c->duplex->fd.backend_info;
+ num_next++;
+ return true;
+}
+
static void del_conn(struct io_conn *conn)
{
if (conn->fd.finish)
conn->fd.finish(conn, conn->fd.finish_arg);
- del_fd(&conn->fd);
+ if (conn->duplex) {
+ /* In case fds[] pointed to the other one. */
+ fds[conn->fd.backend_info] = &conn->duplex->fd;
+ conn->duplex->duplex = NULL;
+ } else
+ del_fd(&conn->fd);
if (conn->state == FINISHED)
num_finished--;
else if (conn->state == NEXT)
del_fd(&l->fd);
}
-void backend_set_state(struct io_conn *conn, struct io_op *op)
+static int pollmask(enum io_state state)
{
- enum io_state state = from_ioop(op);
- struct pollfd *pfd = &pollfds[conn->fd.backend_info];
-
switch (state) {
case READ:
case READPART:
- pfd->events = POLLIN;
- break;
+ return POLLIN;
case WRITE:
case WRITEPART:
- pfd->events = POLLOUT;
- break;
- case IDLE:
- pfd->events = 0;
- break;
- case NEXT:
- num_next++;
- break;
- case FINISHED:
- num_finished++;
- break;
+ return POLLOUT;
default:
- abort();
+ return 0;
}
+}
+
+void backend_set_state(struct io_conn *conn, struct io_op *op)
+{
+ enum io_state state = from_ioop(op);
+ struct pollfd *pfd = &pollfds[conn->fd.backend_info];
+
+ pfd->events = pollmask(state);
+ if (conn->duplex) {
+ int mask = pollmask(conn->duplex->state);
+ /* You can't *both* read/write. */
+ assert(!mask || pfd->events != mask);
+ pfd->events |= mask;
+ }
+
+ if (state == NEXT)
+ num_next++;
+ else if (state == FINISHED)
+ num_finished++;
+
conn->state = state;
}
unsigned int i;
for (i = 0; !io_loop_return && i < num_fds; i++) {
- struct io_conn *c;
+ struct io_conn *c, *duplex;
if (!num_finished) {
if (finished_only || num_next == 0)
if (fds[i]->listener)
continue;
c = (void *)fds[i];
- if (c->state == FINISHED) {
- del_conn(c);
- free(c);
- i--;
- } else if (!finished_only && c->state == NEXT) {
- backend_set_state(c, c->fd.next(c, c->fd.next_arg));
- num_next--;
+ for (duplex = c->duplex; c; c = duplex, duplex = NULL) {
+ if (c->state == FINISHED) {
+ del_conn(c);
+ free(c);
+ i--;
+ } else if (!finished_only && c->state == NEXT) {
+ backend_set_state(c,
+ c->fd.next(c,
+ c->fd.next_arg));
+ num_next--;
+ }
}
}
}
+static void ready(struct io_conn *c)
+{
+ backend_set_state(c, do_ready(c));
+}
+
/* This is the main loop. */
void *io_loop(void)
{
for (i = 0; i < num_fds && !io_loop_return; i++) {
struct io_conn *c = (void *)fds[i];
- if (pollfds[i].revents & POLLOUT)
- backend_set_state(c, do_writeable(c));
- else if (pollfds[i].revents & POLLIN) {
- if (fds[i]->listener)
+ int events = pollfds[i].revents;
+
+ if (fds[i]->listener) {
+ if (events & POLLIN)
accept_conn((void *)c);
- else
- backend_set_state(c, do_readable(c));
- } else if (pollfds[i].revents & POLLHUP) {
+ } else if (events & (POLLIN|POLLOUT)) {
+ if (c->duplex) {
+ int mask = pollmask(c->duplex->state);
+ if (events & mask) {
+ ready(c->duplex);
+ events &= ~mask;
+ if (!(events&(POLLIN|POLLOUT)))
+ continue;
+ }
+ }
+ ready(c);
+ } else if (events & POLLHUP) {
backend_set_state(c, io_close(c, NULL));
+ if (c->duplex)
+ backend_set_state(c->duplex,
+ io_close(c->duplex,
+ NULL));
}
+
}
}
--- /dev/null
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+struct data {
+ struct io_listener *l;
+ int state;
+ char buf[4];
+ char wbuf[32];
+};
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+ d->state++;
+}
+
+static struct io_op *write_out(struct io_conn *conn, struct data *d)
+{
+ d->state++;
+ return io_write(d->wbuf, sizeof(d->wbuf), io_next(conn, io_close, d));
+}
+
+static struct io_op *start_ok(struct io_conn *conn, struct data *d)
+{
+ ok1(d->state == 0);
+ d->state++;
+
+ io_close_listener(d->l);
+
+ memset(d->wbuf, 7, sizeof(d->wbuf));
+ ok1(io_duplex(conn, write_out, finish_ok, d));
+ return io_read(d->buf, sizeof(d->buf), io_next(conn, io_close, d));
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+ int fd, on = 1;
+ struct addrinfo *addrinfo, hints;
+
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_PASSIVE;
+ hints.ai_protocol = 0;
+
+ if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+ return -1;
+
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ return -1;
+
+ setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+ if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+ close(fd);
+ return -1;
+ }
+ if (listen(fd, 1) != 0) {
+ close(fd);
+ return -1;
+ }
+ *info = addrinfo;
+ return fd;
+}
+
+int main(void)
+{
+ struct data *d = malloc(sizeof(*d));
+ struct addrinfo *addrinfo;
+ int fd, status;
+
+ /* This is how many tests you plan to run */
+ plan_tests(10);
+ d->state = 0;
+ fd = make_listen_fd("65012", &addrinfo);
+ ok1(fd >= 0);
+ d->l = io_new_listener(fd, start_ok, finish_ok, d);
+ ok1(d->l);
+ fflush(stdout);
+ if (!fork()) {
+ int i;
+ char buf[32];
+
+ io_close_listener(d->l);
+ free(d);
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ exit(1);
+ if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+ exit(2);
+ signal(SIGPIPE, SIG_IGN);
+ for (i = 0; i < 32; i++) {
+ if (read(fd, buf+i, 1) != 1)
+ break;
+ }
+ for (i = 0; i < strlen("hellothere"); i++) {
+ if (write(fd, "hellothere" + i, 1) != 1)
+ break;
+ }
+ close(fd);
+ freeaddrinfo(addrinfo);
+ exit(0);
+ }
+ freeaddrinfo(addrinfo);
+ ok1(io_loop() == NULL);
+ ok1(d->state == 4);
+ ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
+ free(d);
+
+ ok1(wait(&status));
+ ok1(WIFEXITED(status));
+ ok1(WEXITSTATUS(status) == 0);
+
+ /* This exits depending on whether all tests passed */
+ return exit_status();
+}