io: change io_idle() to io_wait()
authorRusty Russell <rusty@rustcorp.com.au>
Thu, 10 Apr 2014 11:48:36 +0000 (21:18 +0930)
committerRusty Russell <rusty@rustcorp.com.au>
Thu, 24 Apr 2014 06:44:26 +0000 (16:14 +0930)
Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
ccan/io/_info
ccan/io/backend.h
ccan/io/io.c
ccan/io/io.h
ccan/io/poll.c
ccan/io/test/run-06-idle.c
ccan/io/test/run-08-hangup-on-idle.c
ccan/io/test/run-08-read-after-hangup.c
ccan/io/test/run-10-many.c
ccan/io/test/run-13-all-idle.c
ccan/io/test/run-16-duplex-test.c

index 235e6ba3d281df8ee8f7576030ceabcc9779e533..8f77e361d40cc31d3df293e0993942ad30416005 100644 (file)
  * #include <signal.h>
  * #include <sys/types.h>
  * #include <sys/wait.h>
+ * #include <string.h>
  *
  * struct buffer {
- *     size_t max, off, rlen;
- *     char *buf;
+ *     bool finished;
+ *     size_t start, end, rlen, wlen;
+ *     char buf[4096];
  * };
  *
- * struct stdin_buffer {
- *     struct io_conn *reader, *writer;
- *     size_t len;
- *     char inbuf[4096];
- * };
- *
- * // This reads from stdin.
- * static struct io_plan wake_writer(struct io_conn *, struct stdin_buffer *);
- * // This writes the stdin buffer to the child.
- * static struct io_plan wake_reader(struct io_conn *, struct stdin_buffer *);
- *
- * static struct io_plan wake_writer(struct io_conn *c, struct stdin_buffer *b)
+ * static void finish(struct io_conn *c, struct buffer *b)
  * {
- *     assert(c == b->reader);
- *     io_wake(b->writer, io_write(b->inbuf, b->len, wake_reader, b));
- *     return io_idle();
+ *     // Mark us finished.
+ *     b->finished = true;
+ *     // Wake writer just in case it's asleep.
+ *     io_wake(b);
  * }
  *
- * static void reader_exit(struct io_conn *c, struct stdin_buffer *b)
+ * static struct io_plan read_in(struct io_conn *c, struct buffer *b)
  * {
- *     assert(c == b->reader);
- *     io_wake(b->writer, io_close());
- *     b->reader = NULL;
- * }
+ *     // Add what we just read.
+ *     b->end += b->rlen;
+ *     assert(b->end <= sizeof(b->buf));
  *
- * static struct io_plan wake_reader(struct io_conn *c, struct stdin_buffer *b)
- * {
- *     assert(c == b->writer);
- *     if (!b->reader)
- *             return io_close();
- *     b->len = sizeof(b->inbuf);
- *     io_wake(b->reader, io_read_partial(b->inbuf, &b->len, wake_writer, b));
- *     return io_idle();
- * }
+ *     // If we just read something, wake writer.
+ *     if (b->rlen != 0)
+ *             io_wake(b);
  *
- * static void fail_child_write(struct io_conn *conn, struct stdin_buffer *b)
- * {
- *     if (b->reader)
- *             err(1, "Failed writing to child.");
+ *     // If buffer is empty, return to start.
+ *     if (b->start == b->end)
+ *             b->start = b->end = 0;
+ *
+ *     // Read in some of the rest.
+ *     b->rlen = sizeof(b->buf) - b->end;
+ *
+ *     // No room?  Wait for writer
+ *     if (b->rlen == 0)
+ *             return io_wait(b, read_in, b);
+ *
+ *     return io_read_partial(b->buf + b->end, &b->rlen, read_in, b);
  * }
  *
- * // This reads from the child and saves it into buffer.
- * static struct io_plan read_from_child(struct io_conn *conn,
- *                                      struct buffer *b)
+ * static struct io_plan write_out(struct io_conn *c, struct buffer *b)
  * {
- *     b->off += b->rlen;
- *
- *     if (b->off == b->max)
- *             b->buf = realloc(b->buf, b->max *= 2);
+ *     // Remove what we just wrote.
+ *     b->start += b->wlen;
+ *     assert(b->start <= sizeof(b->buf));
+ *
+ *     // If we wrote somthing, wake writer.
+ *     if (b->wlen != 0)
+ *             io_wake(b);
+ *
+ *     b->wlen = b->end - b->start;
+ *     // Nothing to write?  Wait for reader.
+ *     if (b->wlen == 0) {
+ *             if (b->finished)
+ *                     return io_close();
+ *             return io_wait(b, write_out, b);
+ *     }
  *
- *     b->rlen = b->max - b->off;
- *     return io_read_partial(b->buf + b->off, &b->rlen, read_from_child, b);
+ *     return io_write_partial(b->buf + b->start, &b->wlen, write_out, b);
  * }
  *
  * // Feed a program our stdin, gather its stdout, print that at end.
  * int main(int argc, char *argv[])
  * {
  *     int tochild[2], fromchild[2];
- *     struct buffer out;
- *     struct stdin_buffer sbuf;
+ *     struct buffer to, from;
  *     int status;
- *     size_t off;
- *     ssize_t ret;
- *     struct io_conn *from_child;
+ *     struct io_conn *reader;
  *
  *     if (argc == 1)
  *             errx(1, "Usage: runner <cmdline>...");
  *     close(fromchild[1]);
  *     signal(SIGPIPE, SIG_IGN);
  *
- *     sbuf.len = sizeof(sbuf.inbuf);
- *     sbuf.reader = io_new_conn(STDIN_FILENO,
- *                               io_read_partial(sbuf.inbuf, &sbuf.len,
- *                                               wake_writer, &sbuf));
- *     sbuf.writer = io_new_conn(tochild[1], io_idle());
- *
- *     out.max = 128;
- *     out.off = 0;
- *     out.rlen = 128;
- *     out.buf = malloc(out.max);
- *     from_child = io_new_conn(fromchild[0],
- *                              io_read_partial(out.buf, &out.rlen,
- *                                              read_from_child, &out));
- *     if (!sbuf.reader || !sbuf.writer || !from_child)
- *             err(1, "Allocating connections");
- *
- *     io_set_finish(sbuf.reader, reader_exit, &sbuf);
- *     io_set_finish(sbuf.writer, fail_child_write, &sbuf);
+ *     // Read from stdin, write to child.
+ *     memset(&to, 0, sizeof(to));
+ *     reader = io_new_conn(STDIN_FILENO, read_in(NULL, &to));
+ *     io_set_finish(reader, finish, &to);
+ *     io_new_conn(tochild[1], write_out(NULL, &to));
+ *
+ *     // Read from child, write to stdout.
+ *     reader = io_new_conn(fromchild[0], read_in(NULL, &from));
+ *     io_set_finish(reader, finish, &from);
+ *     io_new_conn(STDOUT_FILENO, write_out(NULL, &from));
  *
  *     io_loop();
  *     wait(&status);
  *
- *     for (off = 0; off < out.off; off += ret) {
- *             ret = write(STDOUT_FILENO, out.buf+off, out.off-off);
- *             if (ret < 0)
- *                     err(1, "Writing stdout");
- *     }
- *     free(out.buf);
- *
  *     return WIFEXITED(status) ? WEXITSTATUS(status) : 2;
  * }
  *
index e2090ff14068ebe0a34317941bb8efc6f7cb9a0a..b6674025b0233cf7e2f3a801dd9b41b026a3e97a 100644 (file)
@@ -91,6 +91,7 @@ bool add_conn(struct io_conn *c);
 bool add_duplex(struct io_conn *c);
 void del_listener(struct io_listener *l);
 void backend_plan_changed(struct io_conn *conn);
+void backend_wait_changed(const void *wait);
 void backend_add_timeout(struct io_conn *conn, struct timespec ts);
 void backend_del_timeout(struct io_conn *conn);
 void backend_del_conn(struct io_conn *conn);
index 90039e1e6960155a87862bbc1e86f9cbc987f35f..a1610a405230da40ec725dbc1163564b685986b6 100644 (file)
@@ -24,8 +24,6 @@ bool io_plan_nodebug;
 struct io_conn *current;
 /* User-defined function to select which connection(s) to debug. */
 bool (*io_debug_conn)(struct io_conn *conn);
-/* Set when we wake up an connection we are debugging. */
-bool io_debug_wakeup;
 
 struct io_plan io_debug(struct io_plan plan)
 {
@@ -36,12 +34,9 @@ struct io_plan io_debug(struct io_plan plan)
                return plan;
        }
 
-       if (!current || !doing_debug_on(current)) {
-               if (!io_debug_wakeup)
-                       return plan;
-       }
+       if (!current || !doing_debug_on(current))
+               return plan;
 
-       io_debug_wakeup = false;
        current->plan = plan;
        backend_plan_changed(current);
 
@@ -68,7 +63,7 @@ struct io_plan io_debug(struct io_plan plan)
 
        /* Return a do-nothing plan, so backend_plan_changed in
         * io_ready doesn't do anything (it's already been called). */
-       return io_idle_();
+       return io_wait_(NULL, (void *)1, NULL);
 }
 
 int io_debug_io(int ret)
@@ -107,22 +102,12 @@ int io_debug_io(int ret)
        return 2;
 }
 
-static void debug_io_wake(struct io_conn *conn)
-{
-       /* We want linear if we wake a debugged connection, too. */
-       if (io_debug_conn && io_debug_conn(conn))
-               io_debug_wakeup = true;
-}
-
 /* Counterpart to io_plan_no_debug(), called in macros in io.h */
 static void io_plan_debug_again(void)
 {
        io_plan_nodebug = false;
 }
 #else
-static void debug_io_wake(struct io_conn *conn)
-{
-}
 static void io_plan_debug_again(void)
 {
 }
@@ -436,37 +421,26 @@ struct io_plan io_connect_(int fd, const struct addrinfo *addr,
        return plan;
 }
 
-struct io_plan io_idle_(void)
+struct io_plan io_wait_(const void *wait,
+                       struct io_plan (*cb)(struct io_conn *, void*),
+                       void *arg)
 {
        struct io_plan plan;
 
+       assert(cb);
        plan.pollflag = 0;
        plan.io = NULL;
-       /* Never called (overridden by io_wake), but NULL means closing */
-       plan.next = (void *)io_idle_;
+       plan.next = cb;
+       plan.next_arg = arg;
 
-       return plan;
-}
+       plan.u1.const_vp = wait;
 
-bool io_is_idle(const struct io_conn *conn)
-{
-       return conn->plan.io == NULL;
+       return plan;
 }
 
-void io_wake_(struct io_conn *conn, struct io_plan plan)
-
+void io_wake(const void *wait)
 {
-       io_plan_debug_again();
-
-       /* It might be closing, but we haven't called its finish() yet. */
-       if (!conn->plan.next)
-               return;
-       /* It was idle, right? */
-       assert(!conn->plan.io);
-       conn->plan = plan;
-       backend_plan_changed(conn);
-
-       debug_io_wake(conn);
+       backend_wait_changed(wait);
 }
 
 void io_ready(struct io_conn *conn)
index 478190f4d05b16def3f7eb481c2c8ecff334ee83..505c4c70a3fd5bc3f183a3927a2595d9776f9d2b 100644 (file)
@@ -314,6 +314,7 @@ struct io_plan io_write_partial_(const void *data, size_t *len,
 struct io_plan io_always_(struct io_plan (*cb)(struct io_conn *, void *),
                          void *arg);
 
+
 /**
  * io_connect - plan to connect to a listening socket.
  * @fd: file descriptor.
@@ -360,19 +361,32 @@ struct io_plan io_connect_(int fd, const struct addrinfo *addr,
                           void *arg);
 
 /**
- * io_idle - plan to do nothing.
+ * io_wait - plan to wait for something.
+ * @wait: the address to wait on.
+ * @cb: function to call after waiting.
+ * @arg: @cb argument
  *
- * This indicates the connection is idle: io_wake() will be called later do
- * give the connection a new plan.
+ * This indicates the connection is idle: io_wake() will be called later to
+ * restart the connection.
  *
  * Example:
  *     struct io_conn *sleeper;
- *     sleeper = io_new_conn(open("/dev/null", O_RDONLY), io_idle());
+ *     unsigned int counter = 0;
+ *     sleeper = io_new_conn(open("/dev/null", O_RDONLY),
+ *                           io_wait(&counter, io_close_cb, NULL));
  *     if (!sleeper)
  *             exit(1);
  */
-#define io_idle() io_debug(io_idle_())
-struct io_plan io_idle_(void);
+#define io_wait(wait, cb, arg)                                         \
+       io_debug(io_wait_(wait,                                         \
+                         typesafe_cb_preargs(struct io_plan, void *,   \
+                                             (cb), (arg),              \
+                                             struct io_conn *),        \
+                         (arg)))
+
+struct io_plan io_wait_(const void *wait,
+                       struct io_plan (*cb)(struct io_conn *, void *),
+                       void *arg);
 
 /**
  * io_timeout - set timeout function if the callback doesn't complete.
@@ -440,35 +454,18 @@ bool io_timeout_(struct io_conn *conn, struct timespec ts,
 struct io_conn *io_duplex_(struct io_conn *conn, struct io_plan plan);
 
 /**
- * io_wake - wake up an idle connection.
- * @conn: an idle connection.
- * @plan: the next I/O plan for @conn.
- *
- * This makes @conn ready to do I/O the next time around the io_loop().
+ * io_wake - wake up any connections waiting on @wait
+ * @wait: the address to trigger.
  *
  * Example:
- *     struct io_conn *sleeper;
- *     sleeper = io_new_conn(open("/dev/null", O_RDONLY), io_idle());
- *
- *     io_wake(sleeper, io_write("junk", 4, io_close_cb, NULL));
- */
-#define io_wake(conn, plan) (io_plan_no_debug(), io_wake_((conn), (plan)))
-void io_wake_(struct io_conn *conn, struct io_plan plan);
-
-/**
- * io_is_idle - is a connection idle?
- *
- * This can be useful for complex protocols, eg. where you want a connection
- * to send something, so you queue it and wake it if it's idle.
+ *     unsigned int wait;
  *
- * Example:
- *     struct io_conn *sleeper;
- *     sleeper = io_new_conn(open("/dev/null", O_RDONLY), io_idle());
+ *     io_new_conn(open("/dev/null", O_RDONLY),
+ *                io_wait(&wait, io_close_cb, NULL));
  *
- *     assert(io_is_idle(sleeper));
- *     io_wake(sleeper, io_write("junk", 4, io_close_cb, NULL));
+ *     io_wake(&wait);
  */
-bool io_is_idle(const struct io_conn *conn);
+void io_wake(const void *wait);
 
 /**
  * io_break - return from io_loop()
index e3f595c9884c0d2fcea42d598ca3321d067069a8..8ba376a549aa47ee4be29af3345c034dfdfbf481 100644 (file)
@@ -167,6 +167,34 @@ void backend_plan_changed(struct io_conn *conn)
                some_always = true;
 }
 
+void backend_wait_changed(const void *wait)
+{
+       unsigned int i;
+
+       for (i = 0; i < num_fds; i++) {
+               struct io_conn *c, *duplex;
+
+               /* Ignore listeners */
+               if (fds[i]->listener)
+                       continue;
+               c = (void *)fds[i];
+               for (duplex = c->duplex; c; c = duplex, duplex = NULL) {
+                       /* Ignore closing. */
+                       if (!c->plan.next)
+                               continue;
+                       /* Not idle? */
+                       if (c->plan.io)
+                               continue;
+                       /* Waiting on something else? */
+                       if (c->plan.u1.const_vp != wait)
+                               continue;
+                       /* Make it do the next thing. */
+                       c->plan = io_always_(c->plan.next, c->plan.next_arg);
+                       backend_plan_changed(c);
+               }
+       }
+}
+
 bool add_conn(struct io_conn *c)
 {
        if (!add_fd(&c->fd, c->plan.pollflag & (POLLIN|POLLOUT)))
index 7ae16e9e96a3f8203c05d5d2778729c3573af290..82f6e0af6851af408a9741c1e9a42fb04da95792 100644 (file)
@@ -29,8 +29,7 @@ static struct io_plan read_done(struct io_conn *conn, struct data *d)
 
 static void finish_waker(struct io_conn *conn, struct data *d)
 {
-       ok1(io_is_idle(idler));
-       io_wake(idler, io_read(d->buf, sizeof(d->buf), read_done, d));
+       io_wake(d);
        ok1(d->state == 1);
        d->state++;
 }
@@ -47,13 +46,18 @@ static struct io_plan never(struct io_conn *conn, void *arg)
        abort();
 }
 
+static struct io_plan read_buf(struct io_conn *conn, struct data *d)
+{
+       return io_read(d->buf, sizeof(d->buf), read_done, d);
+}
+
 static void init_conn(int fd, struct data *d)
 {
        int fd2;
 
        ok1(d->state == 0);
        d->state++;
-       idler = io_new_conn(fd, io_idle());
+       idler = io_new_conn(fd, io_wait(d, read_buf, d));
        io_set_finish(idler, finish_idle, d);
 
        /* This will wake us up, as read will fail. */
@@ -103,7 +107,7 @@ int main(void)
        int fd, status;
 
        /* This is how many tests you plan to run */
-       plan_tests(14);
+       plan_tests(13);
        d->state = 0;
        fd = make_listen_fd(PORT, &addrinfo);
        ok1(fd >= 0);
index b3840433410b7bfda4068293728c8050b2483856..c8257703d3333ef2a5199db85b7232022157102a 100644 (file)
@@ -15,6 +15,11 @@ static struct io_plan timeout_wakeup(struct io_conn *conn, char *buf)
        return io_read(buf, 16, io_close_cb, NULL);
 }
 
+static struct io_plan never(struct io_conn *conn, void *unused)
+{
+       abort();
+}
+
 int main(void)
 {
        int fds[2];
@@ -28,7 +33,7 @@ int main(void)
        /* Write then close. */
        io_new_conn(fds[1], io_write("hello there world", 16,
                                     io_close_cb, NULL));
-       conn = io_new_conn(fds[0], io_idle());
+       conn = io_new_conn(fds[0], io_wait(buf, never, NULL));
 
        /* To avoid assert(num_waiting) */
        ok1(pipe(fds2) == 0);
index b73139e4f4c7a256c1e00301dce63c67622c1869..f6b3db1cb9fd45338d57b5012c00ca9a38a9197e 100644 (file)
@@ -11,10 +11,15 @@ static char inbuf[8];
 
 static struct io_plan wake_it(struct io_conn *conn, struct io_conn *reader)
 {
-       io_wake(reader, io_read(inbuf, 8, io_close_cb, NULL));
+       io_wake(inbuf);
        return io_close();
 }
 
+static struct io_plan read_buf(struct io_conn *conn, void *unused)
+{
+       return io_read(inbuf, 8, io_close_cb, NULL);
+}
+
 int main(void)
 {
        int fds[2];
@@ -23,7 +28,7 @@ int main(void)
        plan_tests(3);
 
        ok1(pipe(fds) == 0);
-       conn = io_new_conn(fds[0], io_idle());
+       conn = io_new_conn(fds[0], io_wait(inbuf, read_buf, NULL));
        io_new_conn(fds[1], io_write("EASYTEST", 8, wake_it, conn));
 
        ok1(io_loop() == NULL);
index da59a3cc30ef0f6ebb734a24d755515f636b4971..53e971d0e284ef0775e520114230fa25e89746f4 100644 (file)
@@ -16,6 +16,12 @@ struct buffer {
 };
 
 static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf);
+static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf);
+
+static struct io_plan read_buf(struct io_conn *conn, struct buffer *buf)
+{
+       return io_read(&buf->buf, sizeof(buf->buf), poke_writer, buf);
+}
 
 static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf)
 {
@@ -25,25 +31,28 @@ static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf)
                return io_close();
 
        /* You write. */
-       io_wake(buf->writer,
-               io_write(&buf->buf, sizeof(buf->buf), poke_reader, buf));
+       io_wake(&buf->writer);
 
        /* I'll wait until you wake me. */
-       return io_idle();
+       return io_wait(&buf->reader, read_buf, buf);
+}
+
+static struct io_plan write_buf(struct io_conn *conn, struct buffer *buf)
+{
+       return io_write(&buf->buf, sizeof(buf->buf), poke_reader, buf);
 }
 
 static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf)
 {
        assert(conn == buf->writer);
        /* You read. */
-       io_wake(buf->reader,
-               io_read(&buf->buf, sizeof(buf->buf), poke_writer, buf));
+       io_wake(&buf->reader);
 
        if (++buf->iters == NUM_ITERS)
                return io_close();
 
        /* I'll wait until you tell me to write. */
-       return io_idle();
+       return io_wait(&buf->writer, write_buf, buf);
 }
 
 static struct buffer buf[NUM];
@@ -66,13 +75,12 @@ int main(void)
                sprintf(buf[i].buf, "%i-%i", i, i);
 
                /* Wait for writer to tell us to read. */
-               buf[i].reader = io_new_conn(last_read, io_idle());
+               buf[i].reader = io_new_conn(last_read,
+                                           io_wait(&buf[i].reader, read_buf,
+                                                   &buf[i]));
                if (!buf[i].reader)
                        break;
-               buf[i].writer = io_new_conn(fds[1],
-                                           io_write(&buf[i].buf,
-                                                    sizeof(buf[i].buf),
-                                                    poke_reader, &buf[i]));
+               buf[i].writer = io_new_conn(fds[1], write_buf(NULL, &buf[i]));
                if (!buf[i].writer)
                        break;
                last_read = fds[0];
@@ -83,11 +91,10 @@ int main(void)
        /* Last one completes the cirle. */
        i = 0;
        sprintf(buf[i].buf, "%i-%i", i, i);
-       buf[i].reader = io_new_conn(last_read, io_idle());
+       buf[i].reader = io_new_conn(last_read,
+                                   io_wait(&buf[i].reader, read_buf, &buf[i]));
        ok1(buf[i].reader);
-       buf[i].writer = io_new_conn(last_write,
-                                   io_write(&buf[i].buf, sizeof(buf[i].buf),
-                                            poke_reader, &buf[i]));
+       buf[i].writer = io_new_conn(last_write, write_buf(NULL, &buf[i]));
        ok1(buf[i].writer);
 
        /* They should eventually exit */
index 0e7e1565d579fa56b76397dcf05107c12611f63e..31510007a90114c786c9b95a1ba1959b35b6a0e0 100644 (file)
@@ -17,7 +17,7 @@ int main(void)
                int fds[2];
 
                ok1(pipe(fds) == 0);
-               io_new_conn(fds[0], io_idle());
+               io_new_conn(fds[0], io_wait(&status, io_close_cb, NULL));
                io_loop();
                exit(1);
        }
index 8a1a04f0c8190fc25ebd9ec61180d7d5d54b91a8..addca841e627a592d550a66ed56689e28479008a 100644 (file)
@@ -28,7 +28,7 @@ static void finish_ok(struct io_conn *conn, struct data *d)
 static struct io_plan write_done(struct io_conn *conn, struct data *d)
 {
        d->state++;
-       return io_idle();
+       return io_wait(d, io_close_cb, NULL);
 }
 
 static struct io_plan read_done(struct io_conn *conn, struct data *d)