ccan/io: initialize connection with an explicit I/O plan.
authorRusty Russell <rusty@rustcorp.com.au>
Mon, 14 Oct 2013 10:58:36 +0000 (21:28 +1030)
committerRusty Russell <rusty@rustcorp.com.au>
Mon, 14 Oct 2013 10:58:36 +0000 (21:28 +1030)
Rather than going via a callback, which tends to just set up I/O, do
any setup before the call to io_new_conn(), then pass it the io_plan
directly.

The patch shows how much this simplifies our test code.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
18 files changed:
ccan/io/_info
ccan/io/benchmarks/run-different-speed.c
ccan/io/benchmarks/run-length-prefix.c
ccan/io/benchmarks/run-loop.c
ccan/io/io.c
ccan/io/io.h
ccan/io/poll.c
ccan/io/test/run-01-start-finish.c
ccan/io/test/run-02-read.c
ccan/io/test/run-03-readpartial.c
ccan/io/test/run-04-writepartial.c
ccan/io/test/run-05-write.c
ccan/io/test/run-06-idle.c
ccan/io/test/run-07-break.c
ccan/io/test/run-10-many.c
ccan/io/test/run-12-bidir.c
ccan/io/test/run-13-all-idle.c
ccan/io/test/run-15-timeout.c

index 150b93e5be9c5d0a7ebd0c7cdf30724246628dde..0dfb43c60ef52d837d0a00dd7eb95c37a77af7c2 100644 (file)
  * // This reads from stdin.
  * static struct io_plan wake_writer(struct io_conn *, struct stdin_buffer *);
  * // This writes the stdin buffer to the child.
- * static struct io_plan write_to_child(struct io_conn *c,
- *                                     struct stdin_buffer *b);
- * static struct io_plan read_stdin(struct io_conn *c, struct stdin_buffer *b)
- * {
- *     assert(c == b->reader);
- *     b->len = sizeof(b->inbuf);
- *     return io_read_partial(b->inbuf, &b->len, wake_writer, b);
- * }
+ * static struct io_plan wake_reader(struct io_conn *, struct stdin_buffer *);
  *
  * static struct io_plan wake_writer(struct io_conn *c, struct stdin_buffer *b)
  * {
  *     assert(c == b->reader);
- *     io_wake(b->writer, write_to_child, b);
+ *     io_wake(b->writer, io_write(b->inbuf, b->len, wake_reader, b));
  *     return io_idle();
  * }
  *
  * static void reader_exit(struct io_conn *c, struct stdin_buffer *b)
  * {
  *     assert(c == b->reader);
- *     io_wake(b->writer, write_to_child, b);
+ *     io_wake(b->writer, io_close(b->writer, NULL));
  *     b->reader = NULL;
  * }
  *
  * static struct io_plan wake_reader(struct io_conn *c, struct stdin_buffer *b)
  * {
  *     assert(c == b->writer);
- *     io_wake(b->reader, read_stdin, b);
- *     return io_idle();
- * }
- *
- * static struct io_plan write_to_child(struct io_conn *conn,
- *                                     struct stdin_buffer *b)
- * {
- *     assert(conn == b->writer);
- *     if (!b->reader)
- *             return io_close(conn, NULL);
- *     return io_write(b->inbuf, b->len, wake_reader, b);
- * }
- *
- * static struct io_plan start_writer(struct io_conn *conn,
- *                                   struct stdin_buffer *b)
- * {
- *     assert(conn == b->writer);
+ *     if (!b->reader)
+ *             return io_close(c, NULL);
+ *     b->len = sizeof(b->inbuf);
+ *     io_wake(b->reader, io_read_partial(b->inbuf, &b->len, wake_writer, b));
  *     return io_idle();
  * }
  *
  * {
  *     b->off += b->rlen;
  *
- *     if (b->off == b->max) {
- *             if (b->max == 0)
- *                     b->max = 128;
- *             else if (b->max >= 1024*1024)
- *                     b->max += 1024*1024;
- *             else
- *                     b->max *= 2;
- *             b->buf = realloc(b->buf, b->max);
- *     }
+ *     if (b->off == b->max)
+ *             b->buf = realloc(b->buf, b->max *= 2);
  *
  *     b->rlen = b->max - b->off;
  *     return io_read_partial(b->buf + b->off, &b->rlen, read_from_child, b);
  * int main(int argc, char *argv[])
  * {
  *     int tochild[2], fromchild[2];
- *     struct buffer out = { 0, 0, 0, NULL };
+ *     struct buffer out;
  *     struct stdin_buffer sbuf;
  *     int status;
  *     size_t off;
  *     ssize_t ret;
+ *     struct io_conn *from_child;
  *
  *     if (argc == 1)
  *             errx(1, "Usage: runner <cmdline>...");
  *     close(fromchild[1]);
  *     signal(SIGPIPE, SIG_IGN);
  *
- *     sbuf.reader = io_new_conn(STDIN_FILENO, read_stdin, reader_exit, &sbuf);
- *     sbuf.writer = io_new_conn(tochild[1], start_writer, fail_child_write,
+ *     sbuf.len = sizeof(sbuf.inbuf);
+ *     sbuf.reader = io_new_conn(STDIN_FILENO,
+ *                               io_read_partial(sbuf.inbuf, &sbuf.len,
+ *                                               wake_writer, &sbuf),
+ *                               reader_exit, &sbuf);
+ *     sbuf.writer = io_new_conn(tochild[1], io_idle(), fail_child_write,
  *                               &sbuf);
- *     if (!sbuf.reader || !sbuf.writer
- *         || !io_new_conn(fromchild[0], read_from_child, NULL, &out))
+ *
+ *     out.max = 128;
+ *     out.off = 0;
+ *     out.rlen = 128;
+ *     out.buf = malloc(out.max);
+ *     from_child = io_new_conn(fromchild[0],
+ *                              io_read_partial(out.buf, &out.rlen,
+ *                                              read_from_child, &out),
+ *                              NULL, NULL);
+ *     if (!sbuf.reader || !sbuf.writer || !from_child)
  *             err(1, "Allocating connections");
  *
  *     io_loop();
index 537a67bbc01b5de934f63db04612e27d276f9415..7a95fd8426bbbf8020f93db8177c98654da41c07 100644 (file)
@@ -28,7 +28,7 @@ struct client {
 static struct io_plan write_reply(struct io_conn *conn, struct client *client);
 static struct io_plan read_request(struct io_conn *conn, struct client *client)
 {
-       return io_read(conn, client->request_buffer, REQUEST_SIZE,
+       return io_read(client->request_buffer, REQUEST_SIZE,
                       write_reply, client);
 }
 
@@ -41,7 +41,7 @@ static struct io_plan write_complete(struct io_conn *conn, struct client *client
 
 static struct io_plan write_reply(struct io_conn *conn, struct client *client)
 {
-       return io_write(conn, client->reply_buffer, REPLY_SIZE,
+       return io_write(client->reply_buffer, REPLY_SIZE,
                        write_complete, client);
 }
 
@@ -108,12 +108,7 @@ static void sigalarm(int sig)
 
 static struct io_plan do_timeout(struct io_conn *conn, char *buf)
 {
-       return io_break(conn, buf, NULL, NULL);
-}
-
-static struct io_plan do_timeout_read(struct io_conn *conn, char *buf)
-{
-       return io_read(conn, buf, 1, do_timeout, buf);
+       return io_break(buf, io_idle());
 }
 
 int main(int argc, char *argv[])
@@ -155,11 +150,14 @@ int main(int argc, char *argv[])
                        if (ret < 0)
                                err(1, "Accepting fd");
                        /* For efficiency, we share client structure */
-                       io_new_conn(ret, read_request, NULL, &client);
+                       io_new_conn(ret,
+                                   io_read(client.request_buffer, REQUEST_SIZE,
+                                           write_reply, &client),
+                                   NULL, NULL);
                }
        }
 
-       io_new_conn(timeout[0], do_timeout_read, NULL, &buf);
+       io_new_conn(timeout[0], io_read(&buf, 1, do_timeout, &buf), NULL, NULL);
 
        close(wake[0]);
        for (i = 0; i < NUM_CHILDREN; i++)
index 2ed9c72933f18d9a15a01f2c711752d0a103e61e..5eb33acf87401ae1eeefa18d6b78d5194a01f931 100644 (file)
@@ -29,26 +29,25 @@ static struct io_plan write_reply(struct io_conn *conn, struct client *client);
 static struct io_plan read_body(struct io_conn *conn, struct client *client)
 {
        assert(client->len <= REQUEST_MAX);
-       return io_read(conn, client->request_buffer, client->len,
+       return io_read(client->request_buffer, client->len,
                       write_reply, client);
 }
 
-static struct io_plan read_header(struct io_conn *conn, struct client *client)
+static struct io_plan io_read_header(struct client *client)
 {
-       return io_read(conn, &client->len, sizeof(client->len),
-                      read_body, client);
+       return io_read(&client->len, sizeof(client->len), read_body, client);
 }
 
 /* once we're done, loop again. */
 static struct io_plan write_complete(struct io_conn *conn, struct client *client)
 {
        completed++;
-       return read_header(conn, client);
+       return io_read_header(client);
 }
 
 static struct io_plan write_reply(struct io_conn *conn, struct client *client)
 {
-       return io_write(conn, &client->len, sizeof(client->len),
+       return io_write(&client->len, sizeof(client->len),
                        write_complete, client);
 }
 
@@ -114,12 +113,7 @@ static void sigalarm(int sig)
 
 static struct io_plan do_timeout(struct io_conn *conn, char *buf)
 {
-       return io_break(conn, buf, NULL, NULL);
-}
-
-static struct io_plan do_timeout_read(struct io_conn *conn, char *buf)
-{
-       return io_read(conn, buf, 1, do_timeout, buf);
+       return io_break(buf, io_idle());
 }
 
 int main(int argc, char *argv[])
@@ -163,11 +157,11 @@ int main(int argc, char *argv[])
                                err(1, "Accepting fd");
                        /* For efficiency, we share buffer */
                        client->request_buffer = buffer;
-                       io_new_conn(ret, read_header, NULL, client);
+                       io_new_conn(ret, io_read_header(client), NULL, NULL);
                }
        }
 
-       io_new_conn(timeout[0], do_timeout_read, NULL, &buf);
+       io_new_conn(timeout[0], io_read(&buf, 1, do_timeout, &buf), NULL, NULL);
 
        close(wake[0]);
        for (i = 0; i < NUM_CHILDREN; i++)
index a2898e13e255f6b76e647df1d10aa603a1dfac6e..b0e6b02c804fec579ed2d6019b3d266f993aeb7e 100644 (file)
@@ -16,23 +16,8 @@ struct buffer {
        char buf[32];
 };
 
-static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf);
 static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf);
 
-static struct io_plan do_read(struct io_conn *conn, struct buffer *buf)
-{
-       assert(conn == buf->reader);
-
-       return io_read(conn, &buf->buf, sizeof(buf->buf), poke_writer, buf);
-}
-
-static struct io_plan do_write(struct io_conn *conn, struct buffer *buf)
-{
-       assert(conn == buf->writer);
-
-       return io_write(conn, &buf->buf, sizeof(buf->buf), poke_reader, buf);
-}
-
 static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf)
 {
        assert(conn == buf->reader);
@@ -41,31 +26,25 @@ static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf)
                return io_close(conn, NULL);
 
        /* You write. */
-       io_wake(buf->writer, do_write, buf);
+       io_wake(buf->writer,
+               io_write(&buf->buf, sizeof(buf->buf), poke_reader, buf));
 
        /* I'll wait until you wake me. */
-       return io_idle(conn);
+       return io_idle();
 }
 
 static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf)
 {
        assert(conn == buf->writer);
        /* You read. */
-       io_wake(buf->reader, do_read, buf);
+       io_wake(buf->reader,
+               io_read(&buf->buf, sizeof(buf->buf), poke_writer, buf));
 
        if (++buf->iters == NUM_ITERS)
                return io_close(conn, NULL);
 
        /* I'll wait until you tell me to write. */
-       return io_idle(conn);
-}
-
-static struct io_plan reader(struct io_conn *conn, struct buffer *buf)
-{
-       assert(conn == buf->reader);
-
-       /* Wait for writer to tell us to read. */
-       return io_idle(conn);
+       return io_idle();
 }
 
 int main(void)
@@ -87,10 +66,14 @@ int main(void)
                memset(buf[i].buf, i, sizeof(buf[i].buf));
                sprintf(buf[i].buf, "%i-%i", i, i);
 
-               buf[i].reader = io_new_conn(last_read, reader, NULL, &buf[i]);
+               buf[i].reader = io_new_conn(last_read, io_idle(), NULL, NULL);
                if (!buf[i].reader)
                        err(1, "Creating reader %i", i);
-               buf[i].writer = io_new_conn(fds[1], do_write, NULL, &buf[i]);
+               buf[i].writer = io_new_conn(fds[1],
+                                           io_write(&buf[i].buf,
+                                                    sizeof(buf[i].buf),
+                                                    poke_reader, &buf[i]),
+                                           NULL, NULL);
                if (!buf[i].writer)
                        err(1, "Creating writer %i", i);
                last_read = fds[0];
@@ -100,10 +83,13 @@ int main(void)
        i = 0;
        buf[i].iters = 0;
        sprintf(buf[i].buf, "%i-%i", i, i);
-       buf[i].reader = io_new_conn(last_read, reader, NULL, &buf[i]);
+       buf[i].reader = io_new_conn(last_read, io_idle(), NULL, NULL);
        if (!buf[i].reader)
                err(1, "Creating reader %i", i);
-       buf[i].writer = io_new_conn(last_write, do_write, NULL, &buf[i]);
+       buf[i].writer = io_new_conn(last_write, io_write(&buf[i].buf,
+                                                        sizeof(buf[i].buf),
+                                                        poke_reader, &buf[i]),
+                                   NULL, NULL);
        if (!buf[i].writer)
                err(1, "Creating writer %i", i);
 
index 5ea90ea7b03437cd7a7a5e69210a6fc85a8a1363..bd45630a8c9168406a7088f1f12bcfed7f524f54 100644 (file)
@@ -40,7 +40,7 @@ void io_close_listener(struct io_listener *l)
 }
 
 struct io_conn *io_new_conn_(int fd,
-                            struct io_plan (*start)(struct io_conn *, void *),
+                            struct io_plan plan,
                             void (*finish)(struct io_conn *, void *),
                             void *arg)
 {
@@ -51,11 +51,9 @@ struct io_conn *io_new_conn_(int fd,
 
        conn->fd.listener = false;
        conn->fd.fd = fd;
-       conn->plan.next = start;
+       conn->plan = plan;
        conn->finish = finish;
-       conn->finish_arg = conn->plan.next_arg = arg;
-       conn->plan.pollflag = 0;
-       conn->plan.state = IO_NEXT;
+       conn->finish_arg = arg;
        conn->duplex = NULL;
        conn->timeout = NULL;
        if (!add_conn(conn)) {
@@ -66,9 +64,9 @@ struct io_conn *io_new_conn_(int fd,
 }
 
 struct io_conn *io_duplex_(struct io_conn *old,
-                            struct io_plan (*start)(struct io_conn *, void *),
-                            void (*finish)(struct io_conn *, void *),
-                            void *arg)
+                          struct io_plan plan,
+                          void (*finish)(struct io_conn *, void *),
+                          void *arg)
 {
        struct io_conn *conn;
 
@@ -80,12 +78,10 @@ struct io_conn *io_duplex_(struct io_conn *old,
 
        conn->fd.listener = false;
        conn->fd.fd = old->fd.fd;
-       conn->plan.next = start;
-       conn->finish = finish;
-       conn->finish_arg = conn->plan.next_arg = arg;
-       conn->plan.pollflag = 0;
-       conn->plan.state = IO_NEXT;
+       conn->plan = plan;
        conn->duplex = old;
+       conn->finish = finish;
+       conn->finish_arg = arg;
        conn->timeout = NULL;
        if (!add_duplex(conn)) {
                free(conn);
@@ -239,18 +235,14 @@ struct io_plan io_idle(void)
        return plan;
 }
 
-void io_wake_(struct io_conn *conn,
-             struct io_plan (*fn)(struct io_conn *, void *), void *arg)
+void io_wake(struct io_conn *conn, struct io_plan plan)
 
 {
        /* It might have finished, but we haven't called its finish() yet. */
        if (conn->plan.state == IO_FINISHED)
                return;
        assert(conn->plan.state == IO_IDLE);
-       conn->plan.next = fn;
-       conn->plan.next_arg = arg;
-       conn->plan.pollflag = 0;
-       conn->plan.state = IO_NEXT;
+       conn->plan = plan;
        backend_wakeup(conn);
 }
 
@@ -289,18 +281,9 @@ struct io_plan io_close(struct io_conn *conn, void *arg)
 }
 
 /* Exit the loop, returning this (non-NULL) arg. */
-struct io_plan io_break_(void *ret,
-                        struct io_plan (*fn)(struct io_conn *, void *),
-                        void *arg)
+struct io_plan io_break(void *ret, struct io_plan plan)
 {
-       struct io_plan plan;
-
        io_loop_return = ret;
 
-       plan.state = IO_NEXT;
-       plan.pollflag = 0;
-       plan.next = fn;
-       plan.next_arg = arg;
-
        return plan;
 }
index 9ba46b67b497a93af6c429954a4e3013ba2e4742..24176f557ec6d4e6a60f5480029b1b5a22d1b5d0 100644 (file)
@@ -64,29 +64,23 @@ struct io_plan {
 /**
  * io_new_conn - create a new connection.
  * @fd: the file descriptor.
- * @start: the first function to call.
+ * @plan: the first I/O function.
  * @finish: the function to call when it's closed or fails.
- * @arg: the argument to both @start and @finish.
+ * @arg: the argument to @finish.
  *
- * This creates a connection which owns @fd.  @start will be called on the
- * next return to io_loop(), and @finish will be called when an I/O operation
+ * This creates a connection which owns @fd.  @plan will be called on the
+ * next io_loop(), and @finish will be called when an I/O operation
  * fails, or you call io_close() on the connection.
  *
- * The @start function must call one of the io queueing functions
- * (eg. io_read, io_write) and return the next function to call once
- * that is done using io_next().  The alternative is to call io_close().
- *
  * Returns NULL on error (and sets errno).
  */
-#define io_new_conn(fd, start, finish, arg)                            \
-       io_new_conn_((fd),                                              \
-                    typesafe_cb_preargs(struct io_plan, void *,        \
-                                        (start), (arg), struct io_conn *), \
+#define io_new_conn(fd, plan, finish, arg)                             \
+       io_new_conn_((fd), (plan),                                      \
                     typesafe_cb_preargs(void, void *, (finish), (arg), \
                                         struct io_conn *),             \
                     (arg))
 struct io_conn *io_new_conn_(int fd,
-                            struct io_plan (*start)(struct io_conn *, void *),
+                            struct io_plan plan,
                             void (*finish)(struct io_conn *, void *),
                             void *arg);
 
@@ -243,9 +237,9 @@ bool io_timeout_(struct io_conn *conn, struct timespec ts,
 /**
  * io_duplex - split an fd into two connections.
  * @conn: a connection.
- * @start: the first function to call.
+ * @plan: the first I/O function to call.
  * @finish: the function to call when it's closed or fails.
- * @arg: the argument to both @start and @finish.
+ * @arg: the argument to @finish.
  *
  * Sometimes you want to be able to simultaneously read and write on a
  * single fd, but io forces a linear call sequence.  The solition is
@@ -254,56 +248,38 @@ bool io_timeout_(struct io_conn *conn, struct timespec ts,
  *
  * You must io_close() both of them to close the fd.
  */
-#define io_duplex(conn, start, finish, arg)                            \
-       io_duplex_((conn),                                              \
-                  typesafe_cb_preargs(struct io_plan, void *,          \
-                                      (start), (arg), struct io_conn *), \
+#define io_duplex(conn, plan, finish, arg)                             \
+       io_duplex_((conn), (plan),                                      \
                   typesafe_cb_preargs(void, void *, (finish), (arg),   \
                                       struct io_conn *),               \
                   (arg))
 
 struct io_conn *io_duplex_(struct io_conn *conn,
-                          struct io_plan (*start)(struct io_conn *, void *),
+                          struct io_plan plan,
                           void (*finish)(struct io_conn *, void *),
                           void *arg);
 
 /**
- * io_wake - wake up and idle connection.
+ * io_wake - wake up an idle connection.
  * @conn: an idle connection.
- * @fn: the next function to call once queued IO is complete.
- * @arg: the argument to @next.
+ * @plan: the next I/O function for @conn.
  *
- * This makes @conn run its @next function the next time around the
- * io_loop().
+ * This makes @conn do I/O the next time around the io_loop().
  */
-#define io_wake(conn, fn, arg)                                         \
-       io_wake_((conn),                                                \
-                typesafe_cb_preargs(struct io_plan, void *,            \
-                                    (fn), (arg), struct io_conn *),    \
-                (arg))
-void io_wake_(struct io_conn *conn,
-             struct io_plan (*fn)(struct io_conn *, void *), void *arg);
+void io_wake(struct io_conn *conn, struct io_plan plan);
 
 /**
  * io_break - return from io_loop()
  * @ret: non-NULL value to return from io_loop().
- * @cb: function to call once on return
- * @arg: @cb argument
+ * @plan: I/O to perform on return (if any)
  *
  * This breaks out of the io_loop.  As soon as the current @next
  * function returns, any io_closed()'d connections will have their
  * finish callbacks called, then io_loop() with return with @ret.
  *
- * If io_loop() is called again, then @cb will be called.
+ * If io_loop() is called again, then @plan will be carried out.
  */
-#define io_break(ret, fn, arg)                                         \
-       io_break_((ret),                                                \
-                 typesafe_cb_preargs(struct io_plan, void *,           \
-                                     (fn), (arg), struct io_conn *),   \
-                 (arg))
-struct io_plan io_break_(void *ret,
-                        struct io_plan (*fn)(struct io_conn *, void *),
-                        void *arg);
+struct io_plan io_break(void *ret, struct io_plan plan);
 
 /* FIXME: io_recvfrom/io_sendto */
 
index c30bea1743444b7a024e1c7b20811207c2959f2e..f9221d1ff6d91fc398563def5d24d33ec626bdcf 100644 (file)
@@ -37,6 +37,9 @@ static bool add_fd(struct fd *fd, short events)
        fds[num_fds] = fd;
        fd->backend_info = num_fds;
        num_fds++;
+       if (events)
+               num_waiting++;
+
        return true;
 }
 
@@ -46,6 +49,8 @@ static void del_fd(struct fd *fd)
 
        assert(n != -1);
        assert(n < num_fds);
+       if (pollfds[n].events)
+               num_waiting--;
        if (n != num_fds - 1) {
                /* Move last one over us. */
                pollfds[n] = pollfds[num_fds-1];
@@ -69,22 +74,49 @@ bool add_listener(struct io_listener *l)
 {
        if (!add_fd(&l->fd, POLLIN))
                return false;
-       num_waiting++;
        return true;
 }
 
+static void adjust_counts(enum io_state state)
+{
+       if (state == IO_NEXT)
+               num_next++;
+       else if (state == IO_FINISHED)
+               num_finished++;
+}
+
+static void update_pollevents(struct io_conn *conn)
+{
+       struct pollfd *pfd = &pollfds[conn->fd.backend_info];
+
+       if (pfd->events)
+               num_waiting--;
+
+       pfd->events = conn->plan.pollflag;
+       if (conn->duplex) {
+               int mask = conn->duplex->plan.pollflag;
+               /* You can't *both* read/write. */
+               assert(!mask || pfd->events != mask);
+               pfd->events |= mask;
+       }
+       if (pfd->events)
+               num_waiting++;
+
+       adjust_counts(conn->plan.state);
+}
+
 bool add_conn(struct io_conn *c)
 {
-       if (!add_fd(&c->fd, 0))
+       if (!add_fd(&c->fd, c->plan.pollflag))
                return false;
-       num_next++;
+       adjust_counts(c->plan.state);
        return true;
 }
 
 bool add_duplex(struct io_conn *c)
 {
        c->fd.backend_info = c->duplex->fd.backend_info;
-       num_next++;
+       update_pollevents(c);
        return true;
 }
 
@@ -114,32 +146,13 @@ void del_listener(struct io_listener *l)
 
 static void backend_set_state(struct io_conn *conn, struct io_plan plan)
 {
-       struct pollfd *pfd = &pollfds[conn->fd.backend_info];
-
-       if (pfd->events)
-               num_waiting--;
-
-       pfd->events = plan.pollflag;
-       if (conn->duplex) {
-               int mask = conn->duplex->plan.pollflag;
-               /* You can't *both* read/write. */
-               assert(!mask || pfd->events != mask);
-               pfd->events |= mask;
-       }
-       if (pfd->events)
-               num_waiting++;
-
-       if (plan.state == IO_NEXT)
-               num_next++;
-       else if (plan.state == IO_FINISHED)
-               num_finished++;
-
        conn->plan = plan;
+       update_pollevents(conn);
 }
 
 void backend_wakeup(struct io_conn *conn)
 {
-       num_next++;
+       update_pollevents(conn);
 }
 
 static void accept_conn(struct io_listener *l)
index 53c07f24b4a07b0267953966b2a37af6242ea158..b5114abeafcceabe3b26d88581336e63b89ee582 100644 (file)
@@ -6,23 +6,18 @@
 #include <sys/wait.h>
 #include <stdio.h>
 
-static struct io_plan start_ok(struct io_conn *conn, int *state)
-{
-       ok1(*state == 0);
-       (*state)++;
-       return io_close(conn, NULL);
-}
-
 static void finish_ok(struct io_conn *conn, int *state)
 {
        ok1(*state == 1);
        (*state)++;
-       io_break(state + 1, NULL, NULL);
+       io_break(state + 1, io_idle());
 }
 
 static void init_conn(int fd, int *state)
 {
-       if (!io_new_conn(fd, start_ok, finish_ok, state))
+       ok1(*state == 0);
+       (*state)++;
+       if (!io_new_conn(fd, io_close(NULL, NULL), finish_ok, state))
                abort();
 }
 
index abc66463c74eaee691e536d4c2052002f13c3bd3..9abcd9647b6fb7295e998ec9437f6ef4543ff29c 100644 (file)
@@ -11,23 +11,20 @@ struct data {
        char buf[4];
 };
 
-static struct io_plan start_ok(struct io_conn *conn, struct data *d)
-{
-       ok1(d->state == 0);
-       d->state++;
-       return io_read(d->buf, sizeof(d->buf), io_close, d);
-}
-
 static void finish_ok(struct io_conn *conn, struct data *d)
 {
        ok1(d->state == 1);
        d->state++;
-       io_break(d, NULL, NULL);
+       io_break(d, io_idle());
 }
 
 static void init_conn(int fd, struct data *d)
 {
-       if (!io_new_conn(fd, start_ok, finish_ok, d))
+       ok1(d->state == 0);
+       d->state++;
+
+       if (!io_new_conn(fd, io_read(d->buf, sizeof(d->buf), io_close, d),
+                        finish_ok, d))
                abort();
 }
 
index a7d0ae3161aa674cc7b22a23ec16c1148cb1cca8..a24be7e025847b3e36bf4cf865cc11113e3a47f8 100644 (file)
@@ -12,24 +12,21 @@ struct data {
        char buf[4];
 };
 
-static struct io_plan start_ok(struct io_conn *conn, struct data *d)
-{
-       ok1(d->state == 0);
-       d->state++;
-       d->bytes = sizeof(d->buf);
-       return io_read_partial(d->buf, &d->bytes, io_close, d);
-}
-
 static void finish_ok(struct io_conn *conn, struct data *d)
 {
        ok1(d->state == 1);
        d->state++;
-       io_break(d, NULL, NULL);
+       io_break(d, io_idle());
 }
 
 static void init_conn(int fd, struct data *d)
 {
-       if (!io_new_conn(fd, start_ok, finish_ok, d))
+       ok1(d->state == 0);
+       d->state++;
+       d->bytes = sizeof(d->buf);
+
+       if (!io_new_conn(fd, io_read_partial(d->buf, &d->bytes, io_close, d),
+                        finish_ok, d))
                abort();
 }
 
index 11ac22a7c2fb79c125171fca873f2375f55eb257..12a21ddcd712e08fc793150380f8a8e0fefc17a4 100644 (file)
@@ -12,23 +12,19 @@ struct data {
        char *buf;
 };
 
-static struct io_plan start_ok(struct io_conn *conn, struct data *d)
-{
-       ok1(d->state == 0);
-       d->state++;
-       return io_write_partial(d->buf, &d->bytes, io_close, d);
-}
-
 static void finish_ok(struct io_conn *conn, struct data *d)
 {
        ok1(d->state == 1);
        d->state++;
-       io_break(d, NULL, NULL);
+       io_break(d, io_idle());
 }
 
 static void init_conn(int fd, struct data *d)
 {
-       if (!io_new_conn(fd, start_ok, finish_ok, d))
+       ok1(d->state == 0);
+       d->state++;
+       if (!io_new_conn(fd, io_write_partial(d->buf, &d->bytes, io_close, d),
+                        finish_ok, d))
                abort();
 }
 
index 3f0a8f98cb78b27dd75d8bcb1df8a5ef54d81f0c..e878c33054776e00406488e00a04e17aeecf8e3a 100644 (file)
@@ -12,23 +12,19 @@ struct data {
        char *buf;
 };
 
-static struct io_plan start_ok(struct io_conn *conn, struct data *d)
-{
-       ok1(d->state == 0);
-       d->state++;
-       return io_write(d->buf, d->bytes, io_close, d);
-}
-
 static void finish_ok(struct io_conn *conn, struct data *d)
 {
        ok1(d->state == 1);
        d->state++;
-       io_break(d, NULL, NULL);
+       io_break(d, io_idle());
 }
 
 static void init_conn(int fd, struct data *d)
 {
-       if (!io_new_conn(fd, start_ok, finish_ok, d))
+       ok1(d->state == 0);
+       d->state++;
+       if (!io_new_conn(fd, io_write(d->buf, d->bytes, io_close, d),
+                        finish_ok, d))
                abort();
 }
 
index 314293a5f0f2b5d6a145c8b38359056bf4a6bc0a..fb681a08fcf60d920141c2044335b6fc01570c26 100644 (file)
@@ -16,55 +16,39 @@ struct data {
        char buf[4];
 };
 
-static struct io_plan plan_read(struct io_conn *conn, struct data *d)
+static struct io_plan read_done(struct io_conn *conn, struct data *d)
 {
        ok1(d->state == 2 || d->state == 3);
        d->state++;
-       return io_read(d->buf, sizeof(d->buf), io_close, d);
+       return io_close(conn, NULL);
 }
 
-static struct io_plan start_waker(struct io_conn *conn, struct data *d)
+static void finish_waker(struct io_conn *conn, struct data *d)
 {
+       io_wake(idler, io_read(d->buf, sizeof(d->buf), read_done, d));
        ok1(d->state == 1);
        d->state++;
-
-       io_wake(idler, plan_read, d);
-       return io_close(conn, NULL);
 }
 
-static void finish_waker(struct io_conn *conn, struct data *d)
+static void finish_idle(struct io_conn *conn, struct data *d)
 {
-       ok1(d->state == 2 || d->state == 3);
+       ok1(d->state == 3);
        d->state++;
+       io_break(d, io_idle());
 }
 
-static struct io_plan start_idle(struct io_conn *conn, struct data *d)
+static void init_conn(int fd, struct data *d)
 {
-       int fd;
+       int fd2;
 
        ok1(d->state == 0);
        d->state++;
-       idler = conn;
+       idler = io_new_conn(fd, io_idle(), finish_idle, d);
 
-       /* This will wake us up. */
-       fd = open("/dev/null", O_RDONLY);
-       ok1(fd >= 0);
-       ok1(io_new_conn(fd, start_waker, finish_waker, d));
-
-       return io_idle();
-}
-
-static void finish_idle(struct io_conn *conn, struct data *d)
-{
-       ok1(d->state == 4);
-       d->state++;
-       io_break(d, NULL, NULL);
-}
-
-static void init_conn(int fd, struct data *d)
-{
-       if (!io_new_conn(fd, start_idle, finish_idle, d))
-               abort();
+       /* This will wake us up, as read will fail. */
+       fd2 = open("/dev/null", O_RDONLY);
+       ok1(fd2 >= 0);
+       ok1(io_new_conn(fd2, io_read(idler, 1, NULL, NULL), finish_waker, d));
 }
 
 static int make_listen_fd(const char *port, struct addrinfo **info)
@@ -107,7 +91,7 @@ int main(void)
        int fd, status;
 
        /* This is how many tests you plan to run */
-       plan_tests(15);
+       plan_tests(14);
        d->state = 0;
        fd = make_listen_fd("65006", &addrinfo);
        ok1(fd >= 0);
@@ -137,7 +121,7 @@ int main(void)
        freeaddrinfo(addrinfo);
 
        ok1(io_loop() == d);
-       ok1(d->state == 5);
+       ok1(d->state == 4);
        ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
        free(d);
        io_close_listener(l);
index e8db388904638da90f913f9823bae60220337caa..5a6e9701b52ca3d2c9567c096ebe02bd3c83458c 100644 (file)
@@ -11,18 +11,11 @@ struct data {
        char buf[4];
 };
 
-static struct io_plan plan_read(struct io_conn *conn, struct data *d)
+static struct io_plan read_done(struct io_conn *conn, struct data *d)
 {
        ok1(d->state == 1);
        d->state++;
-       return io_read(d->buf, sizeof(d->buf), io_close, d);
-}
-
-static struct io_plan start_break(struct io_conn *conn, struct data *d)
-{
-       ok1(d->state == 0);
-       d->state++;
-       return io_break(d, plan_read, d);
+       return io_close(conn, NULL);
 }
 
 static void finish_ok(struct io_conn *conn, struct data *d)
@@ -33,7 +26,13 @@ static void finish_ok(struct io_conn *conn, struct data *d)
 
 static void init_conn(int fd, struct data *d)
 {
-       if (!io_new_conn(fd, start_break, finish_ok, d))
+       ok1(d->state == 0);
+       d->state++;
+
+       if (!io_new_conn(fd,
+                        io_break(d,
+                                 io_read(d->buf, sizeof(d->buf), read_done, d)),
+                        finish_ok, d))
                abort();
 }
 
index 95a716e2263d26bc65e91d82f818d6a29c276509..1c39635b62ff674e6b961402d49af94881696f2b 100644 (file)
@@ -15,23 +15,8 @@ struct buffer {
        char buf[32];
 };
 
-static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf);
 static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf);
 
-static struct io_plan plan_read(struct io_conn *conn, struct buffer *buf)
-{
-       assert(conn == buf->reader);
-
-       return io_read(&buf->buf, sizeof(buf->buf), poke_writer, buf);
-}
-
-static struct io_plan plan_write(struct io_conn *conn, struct buffer *buf)
-{
-       assert(conn == buf->writer);
-
-       return io_write(&buf->buf, sizeof(buf->buf), poke_reader, buf);
-}
-
 static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf)
 {
        assert(conn == buf->reader);
@@ -40,7 +25,8 @@ static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf)
                return io_close(conn, NULL);
 
        /* You write. */
-       io_wake(buf->writer, plan_write, buf);
+       io_wake(buf->writer,
+               io_write(&buf->buf, sizeof(buf->buf), poke_reader, buf));
 
        /* I'll wait until you wake me. */
        return io_idle();
@@ -50,7 +36,8 @@ static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf)
 {
        assert(conn == buf->writer);
        /* You read. */
-       io_wake(buf->reader, plan_read, buf);
+       io_wake(buf->reader,
+               io_read(&buf->buf, sizeof(buf->buf), poke_writer, buf));
 
        if (++buf->iters == NUM_ITERS)
                return io_close(conn, NULL);
@@ -59,14 +46,6 @@ static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf)
        return io_idle();
 }
 
-static struct io_plan reader(struct io_conn *conn, struct buffer *buf)
-{
-       assert(conn == buf->reader);
-
-       /* Wait for writer to tell us to read. */
-       return io_idle();
-}
-
 static struct buffer buf[NUM];
 
 int main(void)
@@ -86,10 +65,15 @@ int main(void)
                memset(buf[i].buf, i, sizeof(buf[i].buf));
                sprintf(buf[i].buf, "%i-%i", i, i);
 
-               buf[i].reader = io_new_conn(last_read, reader, NULL, &buf[i]);
+               /* Wait for writer to tell us to read. */
+               buf[i].reader = io_new_conn(last_read, io_idle(), NULL, &buf[i]);
                if (!buf[i].reader)
                        break;
-               buf[i].writer = io_new_conn(fds[1], plan_write, NULL, &buf[i]);
+               buf[i].writer = io_new_conn(fds[1],
+                                           io_write(&buf[i].buf,
+                                                    sizeof(buf[i].buf),
+                                                    poke_reader, &buf[i]),
+                                           NULL, &buf[i]);
                if (!buf[i].writer)
                        break;
                last_read = fds[0];
@@ -100,9 +84,12 @@ int main(void)
        /* Last one completes the cirle. */
        i = 0;
        sprintf(buf[i].buf, "%i-%i", i, i);
-       buf[i].reader = io_new_conn(last_read, reader, NULL, &buf[i]);
+       buf[i].reader = io_new_conn(last_read, io_idle(), NULL, NULL);
        ok1(buf[i].reader);
-       buf[i].writer = io_new_conn(last_write, plan_write, NULL, &buf[i]);
+       buf[i].writer = io_new_conn(last_write,
+                                   io_write(&buf[i].buf, sizeof(buf[i].buf),
+                                            poke_reader, &buf[i]),
+                                   NULL, NULL);
        ok1(buf[i].writer);
 
        /* They should eventually exit */
index 3534bc3138f92df5aa6bb57e51849aac26ac6f3b..6dc94c8a4a57a1baee29d281b7e04355a7cd7752 100644 (file)
@@ -18,28 +18,27 @@ static void finish_ok(struct io_conn *conn, struct data *d)
        d->state++;
 }
 
-static struct io_plan write_out(struct io_conn *conn, struct data *d)
+static struct io_plan write_done(struct io_conn *conn, struct data *d)
 {
        d->state++;
-       return io_write(d->wbuf, sizeof(d->wbuf), io_close, d);
+       return io_close(conn, NULL);
 }
 
-static struct io_plan start_ok(struct io_conn *conn, struct data *d)
+static void init_conn(int fd, struct data *d)
 {
+       struct io_conn *conn;
+
        ok1(d->state == 0);
        d->state++;
 
        io_close_listener(d->l);
 
        memset(d->wbuf, 7, sizeof(d->wbuf));
-       ok1(io_duplex(conn, write_out, finish_ok, d));
-       return io_read(d->buf, sizeof(d->buf), io_close, d);
-}
 
-static void init_conn(int fd, struct data *d)
-{
-       if (!io_new_conn(fd, start_ok, finish_ok, d))
-               abort();
+       conn = io_new_conn(fd, io_read(d->buf, sizeof(d->buf), io_close, d),
+                          finish_ok, d);
+       ok1(io_duplex(conn, io_write(d->wbuf, sizeof(d->wbuf), write_done, d),
+                     finish_ok, d));
 }
 
 static int make_listen_fd(const char *port, struct addrinfo **info)
index f83fb31e46a245eeabc854ea7afe9d9deb44d0eb..7ad6bfe4d0ae14d230f2cefbc970641f8423dfc8 100644 (file)
@@ -7,11 +7,6 @@
 #include <stdio.h>
 #include <signal.h>
 
-static struct io_plan start(struct io_conn *conn, void *unused)
-{
-       return io_idle();
-}
-
 int main(void)
 {
        int status;
@@ -22,7 +17,7 @@ int main(void)
                int fds[2];
 
                ok1(pipe(fds) == 0);
-               io_new_conn(fds[0], start, NULL, NULL);
+               io_new_conn(fds[0], io_idle(), NULL, NULL);
                io_loop();
                exit(1);
        }
index 0ff3fc8a54a85c207c3552a1fc73528b60069cc6..f8ddc6a57ed6e21ae492a497476a1cd65c2a6bfa 100644 (file)
@@ -30,25 +30,23 @@ static struct io_plan timeout(struct io_conn *conn, struct data *d)
        return io_close(conn, d);
 }
 
-static struct io_plan start_ok(struct io_conn *conn, struct data *d)
-{
-       ok1(d->state == 0);
-       d->state++;
-       io_timeout(conn, time_from_usec(d->timeout_usec), timeout, d);
-       return io_read(d->buf, sizeof(d->buf), no_timeout, d);
-}
-
 static void finish_ok(struct io_conn *conn, struct data *d)
 {
        ok1(d->state == 2);
        d->state++;
-       io_break(d, NULL, NULL);
+       io_break(d, io_idle());
 }
 
 static void init_conn(int fd, struct data *d)
 {
-       if (!io_new_conn(fd, start_ok, finish_ok, d))
-               abort();
+       struct io_conn *conn;
+
+       ok1(d->state == 0);
+       d->state++;
+
+       conn = io_new_conn(fd, io_read(d->buf, sizeof(d->buf), no_timeout, d),
+                          finish_ok, d);
+       io_timeout(conn, time_from_usec(d->timeout_usec), timeout, d);
 }
 
 static int make_listen_fd(const char *port, struct addrinfo **info)