ccan/io: io_duplex.
authorRusty Russell <rusty@rustcorp.com.au>
Mon, 14 Oct 2013 10:58:20 +0000 (21:28 +1030)
committerRusty Russell <rusty@rustcorp.com.au>
Mon, 14 Oct 2013 10:58:20 +0000 (21:28 +1030)
Cleaner model for I/O, with cost of complexity if you really want bidir.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
ccan/io/SCENARIOS [new file with mode: 0644]
ccan/io/backend.h
ccan/io/io.c
ccan/io/io.h
ccan/io/poll.c
ccan/io/test/run-12-bidir.c [new file with mode: 0644]

diff --git a/ccan/io/SCENARIOS b/ccan/io/SCENARIOS
new file mode 100644 (file)
index 0000000..01bf47a
--- /dev/null
@@ -0,0 +1,88 @@
+Simple:
+       step1(conn): read(conn), then step2
+       step2(conn): write(conn), then close
+
+Pass-through:
+       step1(conn): read(conn), then step2
+       step2(conn): write(otherconn), then step1
+
+Pass-through-and-connect:
+       step1(conn): read(conn), then step2
+       step2(conn): connect(otherconn), then step3
+       step3(conn): write(otherconn), then step1
+
+Chatroom:
+       step1(conn): read(conn), then step2
+       step2(conn): for c in allcons: write(c).  goto step1
+
+Simple:
+
+void event(struct io_event *done)
+{
+       char *buf = done->priv;
+       struct io_event *e;
+
+       e = queue_read(done, done->conn, buf, 100);
+       e = queue_write(e, done->conn, buf, 100);
+       queue_close(e, done->conn);
+}
+
+Pass-through:
+struct passthru {
+       char buf[100];
+       struct conn *rconn, *wconn;
+};
+
+void event(struct io_event *done)
+{
+       struct passthru *p = done->priv;
+       struct io_event *e;
+
+       e = queue_read(done, p->rconn, p->buf, 100);
+       e = queue_write(e, p->wconn, buf, 100);
+       queue_event(e, event);
+}
+
+Chatroom:
+struct list_head clients;
+
+struct buffer {
+       char buf[100];
+       unsigned int ref;
+};
+
+struct client {
+       struct list_node list;
+       struct connection *conn;
+       struct buffer *rbuf, *wbuf;
+};
+
+void broadcast(struct io_event *done)
+{
+       struct client *i, *c = done->conn->priv;
+       struct io_event *e;
+
+       list_for_each(&clients, i, list) {
+               e = queue_write(done, i->conn, c->buf->buf, 100);
+               e->priv = c->buf;
+               c->buf->ref++;
+               queue_event(e, drop_ref);
+       }
+
+
+
+void event(struct io_event *done)
+{
+       struct client *c = done->conn->priv;
+       struct io_event *e;
+
+       assert(c->conn == done->conn);
+       c->buf = malloc(sizeof(*c->buf));
+       c->buf->ref = 0;
+       e = queue_read(done, c->conn, c->buf->buf, 100);
+       e = queue_event(e, broadcast);
+}
+
+
+       step1(conn): read(conn), then step2
+       step2(conn): for c in allcons: write(c).  goto step1
index 06427eb6fa37ea53aabfb47725211e8a448756b8..2b41fa72864ca1df0ac16b2afa023e2cfcd1856c 100644 (file)
@@ -22,11 +22,15 @@ struct io_listener {
 };
 
 enum io_state {
-       NEXT, /* eg starting, woken from idle, return from io_break. */
+       /* These wait for something to input */
        READ,
-       WRITE,
        READPART,
+
+       /* These wait for room to output */
+       WRITE,
        WRITEPART,
+
+       NEXT, /* eg starting, woken from idle, return from io_break. */
        IDLE,
        FINISHED,
        PROCESSING /* We expect them to change this now. */
@@ -61,6 +65,8 @@ struct io_state_writepart {
 struct io_conn {
        struct fd fd;
 
+       struct io_conn *duplex;
+
        enum io_state state;
        union {
                struct io_state_read read;
@@ -74,9 +80,9 @@ extern void *io_loop_return;
 
 bool add_listener(struct io_listener *l);
 bool add_conn(struct io_conn *c);
+bool add_duplex(struct io_conn *c);
 void del_listener(struct io_listener *l);
 void backend_set_state(struct io_conn *conn, struct io_op *op);
 
-struct io_op *do_writeable(struct io_conn *conn);
-struct io_op *do_readable(struct io_conn *conn);
+struct io_op *do_ready(struct io_conn *conn);
 #endif /* CCAN_IO_BACKEND_H */
index 325db7872153952895b60d5b6dc4c99f1cb9fe0a..6efc68ee732c15ab99d00104e2f6adb6ef0b5f71 100644 (file)
@@ -57,6 +57,7 @@ struct io_conn *io_new_conn_(int fd,
        conn->fd.finish = finish;
        conn->fd.finish_arg = conn->fd.next_arg = arg;
        conn->state = NEXT;
+       conn->duplex = NULL;
        if (!add_conn(conn)) {
                free(conn);
                return NULL;
@@ -64,6 +65,34 @@ struct io_conn *io_new_conn_(int fd,
        return conn;
 }
 
+struct io_conn *io_duplex_(struct io_conn *old,
+                            struct io_op *(*start)(struct io_conn *, void *),
+                            void (*finish)(struct io_conn *, void *),
+                            void *arg)
+{
+       struct io_conn *conn;
+
+       assert(!old->duplex);
+
+       conn = malloc(sizeof(*conn));
+       if (!conn)
+               return NULL;
+
+       conn->fd.listener = false;
+       conn->fd.fd = old->fd.fd;
+       conn->fd.next = start;
+       conn->fd.finish = finish;
+       conn->fd.finish_arg = conn->fd.next_arg = arg;
+       conn->state = NEXT;
+       conn->duplex = old;
+       if (!add_duplex(conn)) {
+               free(conn);
+               return NULL;
+       }
+       old->duplex = conn;
+       return conn;
+}
+
 /* Convenient token which only we can produce. */
 static inline struct io_next *to_ionext(struct io_conn *conn)
 {
@@ -149,7 +178,7 @@ static struct io_op *do_next(struct io_conn *conn)
        return conn->fd.next(conn, conn->fd.next_arg);
 }
 
-struct io_op *do_writeable(struct io_conn *conn)
+struct io_op *do_ready(struct io_conn *conn)
 {
        ssize_t ret;
        bool finished;
@@ -171,22 +200,6 @@ struct io_op *do_writeable(struct io_conn *conn)
                *conn->u.writepart.lenp = ret;
                finished = true;
                break;
-       default:
-               /* Shouldn't happen. */
-               abort();
-       }
-
-       if (finished)
-               return do_next(conn);
-       return to_ioop(conn->state);
-}
-
-struct io_op *do_readable(struct io_conn *conn)
-{
-       ssize_t ret;
-       bool finished;
-
-       switch (conn->state) {
        case READ:
                ret = read(conn->fd.fd, conn->u.read.buf, conn->u.read.len);
                if (ret <= 0)
index 49b6a25e83bba172e483e6c7749056744146ef52..5ca9731dc517226fb23a1b2620b23a79d749a809 100644 (file)
@@ -151,6 +151,33 @@ struct io_op *io_write_partial(const void *data, size_t *len,
  */
 struct io_op *io_idle(struct io_conn *conn);
 
+/**
+ * io_duplex - split an fd into two connections.
+ * @conn: a connection.
+ * @start: the first function to call.
+ * @finish: the function to call when it's closed or fails.
+ * @arg: the argument to both @start and @finish.
+ *
+ * Sometimes you want to be able to simultaneously read and write on a
+ * single fd, but io forces a linear call sequence.  The solition is
+ * to have two connections for the same fd, and use one for read
+ * operations and one for write.
+ *
+ * You must io_close() both of them to close the fd.
+ */
+#define io_duplex(conn, start, finish, arg)                            \
+       io_duplex_((conn),                                              \
+                  typesafe_cb_preargs(struct io_op *, void *,          \
+                                      (start), (arg), struct io_conn *), \
+                  typesafe_cb_preargs(void, void *, (finish), (arg),   \
+                                      struct io_conn *),               \
+                  (arg))
+
+struct io_conn *io_duplex_(struct io_conn *conn,
+                          struct io_op *(*start)(struct io_conn *, void *),
+                          void (*finish)(struct io_conn *, void *),
+                          void *arg);
+
 /**
  * io_wake - wake up and idle connection.
  * @conn: an idle connection.
index 070f6d842ae7f45bd4b0b529b6c31cd6c71d7b20..fdff271c64f26e09f060b9e52e468dcf80d97696 100644 (file)
@@ -76,11 +76,23 @@ bool add_conn(struct io_conn *c)
        return true;
 }
 
+bool add_duplex(struct io_conn *c)
+{
+       c->fd.backend_info = c->duplex->fd.backend_info;
+       num_next++;
+       return true;
+}
+
 static void del_conn(struct io_conn *conn)
 {
        if (conn->fd.finish)
                conn->fd.finish(conn, conn->fd.finish_arg);
-       del_fd(&conn->fd);
+       if (conn->duplex) {
+               /* In case fds[] pointed to the other one. */
+               fds[conn->fd.backend_info] = &conn->duplex->fd;
+               conn->duplex->duplex = NULL;
+       } else
+               del_fd(&conn->fd);
        if (conn->state == FINISHED)
                num_finished--;
        else if (conn->state == NEXT)
@@ -92,32 +104,38 @@ void del_listener(struct io_listener *l)
        del_fd(&l->fd);
 }
 
-void backend_set_state(struct io_conn *conn, struct io_op *op)
+static int pollmask(enum io_state state)
 {
-       enum io_state state = from_ioop(op);
-       struct pollfd *pfd = &pollfds[conn->fd.backend_info];
-
        switch (state) {
        case READ:
        case READPART:
-               pfd->events = POLLIN;
-               break;
+               return POLLIN;
        case WRITE:
        case WRITEPART:
-               pfd->events = POLLOUT;
-               break;
-       case IDLE:
-               pfd->events = 0;
-               break;
-       case NEXT:
-               num_next++;
-               break;
-       case FINISHED:
-               num_finished++;
-               break;
+               return POLLOUT;
        default:
-               abort();
+               return 0;
        }
+}
+
+void backend_set_state(struct io_conn *conn, struct io_op *op)
+{
+       enum io_state state = from_ioop(op);
+       struct pollfd *pfd = &pollfds[conn->fd.backend_info];
+
+       pfd->events = pollmask(state);
+       if (conn->duplex) {
+               int mask = pollmask(conn->duplex->state);
+               /* You can't *both* read/write. */
+               assert(!mask || pfd->events != mask);
+               pfd->events |= mask;
+       }
+
+       if (state == NEXT)
+               num_next++;
+       else if (state == FINISHED)
+               num_finished++;
+
        conn->state = state;
 }
 
@@ -142,7 +160,7 @@ static void finish_and_next(bool finished_only)
        unsigned int i;
 
        for (i = 0; !io_loop_return && i < num_fds; i++) {
-               struct io_conn *c;
+               struct io_conn *c, *duplex;
 
                if (!num_finished) {
                        if (finished_only || num_next == 0)
@@ -151,17 +169,26 @@ static void finish_and_next(bool finished_only)
                if (fds[i]->listener)
                        continue;
                c = (void *)fds[i];
-               if (c->state == FINISHED) {
-                       del_conn(c);
-                       free(c);
-                       i--;
-               } else if (!finished_only && c->state == NEXT) {
-                       backend_set_state(c, c->fd.next(c, c->fd.next_arg));
-                       num_next--;
+               for (duplex = c->duplex; c; c = duplex, duplex = NULL) {
+                       if (c->state == FINISHED) {
+                               del_conn(c);
+                               free(c);
+                               i--;
+                       } else if (!finished_only && c->state == NEXT) {
+                               backend_set_state(c,
+                                                 c->fd.next(c,
+                                                            c->fd.next_arg));
+                               num_next--;
+                       }
                }
        }
 }
 
+static void ready(struct io_conn *c)
+{
+       backend_set_state(c, do_ready(c));
+}
+
 /* This is the main loop. */
 void *io_loop(void)
 {
@@ -185,16 +212,30 @@ void *io_loop(void)
 
                for (i = 0; i < num_fds && !io_loop_return; i++) {
                        struct io_conn *c = (void *)fds[i];
-                       if (pollfds[i].revents & POLLOUT)
-                               backend_set_state(c, do_writeable(c));
-                       else if (pollfds[i].revents & POLLIN) {
-                               if (fds[i]->listener)
+                       int events = pollfds[i].revents;
+
+                       if (fds[i]->listener) {
+                               if (events & POLLIN)
                                        accept_conn((void *)c);
-                               else
-                                       backend_set_state(c, do_readable(c));
-                       } else if (pollfds[i].revents & POLLHUP) {
+                       } else if (events & (POLLIN|POLLOUT)) {
+                               if (c->duplex) {
+                                       int mask = pollmask(c->duplex->state);
+                                       if (events & mask) {
+                                               ready(c->duplex);
+                                               events &= ~mask;
+                                               if (!(events&(POLLIN|POLLOUT)))
+                                                       continue;
+                                       }
+                               }
+                               ready(c);
+                       } else if (events & POLLHUP) {
                                backend_set_state(c, io_close(c, NULL));
+                               if (c->duplex)
+                                       backend_set_state(c->duplex,
+                                                         io_close(c->duplex,
+                                                                  NULL));
                        }
+
                }
        }
 
diff --git a/ccan/io/test/run-12-bidir.c b/ccan/io/test/run-12-bidir.c
new file mode 100644 (file)
index 0000000..f9cf4e5
--- /dev/null
@@ -0,0 +1,122 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+struct data {
+       struct io_listener *l;
+       int state;
+       char buf[4];
+       char wbuf[32];
+};
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+       d->state++;
+}
+
+static struct io_op *write_out(struct io_conn *conn, struct data *d)
+{
+       d->state++;
+       return io_write(d->wbuf, sizeof(d->wbuf), io_next(conn, io_close, d));
+}
+
+static struct io_op *start_ok(struct io_conn *conn, struct data *d)
+{
+       ok1(d->state == 0);
+       d->state++;
+
+       io_close_listener(d->l);
+
+       memset(d->wbuf, 7, sizeof(d->wbuf));
+       ok1(io_duplex(conn, write_out, finish_ok, d));
+       return io_read(d->buf, sizeof(d->buf), io_next(conn, io_close, d));
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+       int fd, on = 1;
+       struct addrinfo *addrinfo, hints;
+
+       memset(&hints, 0, sizeof(hints));
+       hints.ai_family = AF_UNSPEC;
+       hints.ai_socktype = SOCK_STREAM;
+       hints.ai_flags = AI_PASSIVE;
+       hints.ai_protocol = 0;
+
+       if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+               return -1;
+
+       fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                   addrinfo->ai_protocol);
+       if (fd < 0)
+               return -1;
+
+       setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+       if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+               close(fd);
+               return -1;
+       }
+       if (listen(fd, 1) != 0) {
+               close(fd);
+               return -1;
+       }
+       *info = addrinfo;
+       return fd;
+}
+
+int main(void)
+{
+       struct data *d = malloc(sizeof(*d));
+       struct addrinfo *addrinfo;
+       int fd, status;
+
+       /* This is how many tests you plan to run */
+       plan_tests(10);
+       d->state = 0;
+       fd = make_listen_fd("65012", &addrinfo);
+       ok1(fd >= 0);
+       d->l = io_new_listener(fd, start_ok, finish_ok, d);
+       ok1(d->l);
+       fflush(stdout);
+       if (!fork()) {
+               int i;
+               char buf[32];
+
+               io_close_listener(d->l);
+               free(d);
+               fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                           addrinfo->ai_protocol);
+               if (fd < 0)
+                       exit(1);
+               if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+                       exit(2);
+               signal(SIGPIPE, SIG_IGN);
+               for (i = 0; i < 32; i++) {
+                       if (read(fd, buf+i, 1) != 1)
+                               break;
+               }
+               for (i = 0; i < strlen("hellothere"); i++) {
+                       if (write(fd, "hellothere" + i, 1) != 1)
+                               break;
+               }
+               close(fd);
+               freeaddrinfo(addrinfo);
+               exit(0);
+       }
+       freeaddrinfo(addrinfo);
+       ok1(io_loop() == NULL);
+       ok1(d->state == 4);
+       ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
+       free(d);
+
+       ok1(wait(&status));
+       ok1(WIFEXITED(status));
+       ok1(WEXITSTATUS(status) == 0);
+
+       /* This exits depending on whether all tests passed */
+       return exit_status();
+}