]> git.ozlabs.org Git - ccan/commitdiff
ccan/io: duplex support.
authorRusty Russell <rusty@rustcorp.com.au>
Mon, 4 Aug 2014 08:12:21 +0000 (17:42 +0930)
committerRusty Russell <rusty@rustcorp.com.au>
Mon, 4 Aug 2014 08:12:21 +0000 (17:42 +0930)
This is actually pretty simple now.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
ccan/io/io.c
ccan/io/io.h
ccan/io/test/run-12-bidir.c
ccan/io/test/run-14-duplex-both-read.c
ccan/io/test/run-16-duplex-test.c

index 2832c513466f646cdca3c8d8ad7857f5175bb791..23e8070485663dc2ee04d5b81b4efd05fb6f2493 100644 (file)
@@ -56,9 +56,10 @@ static void next_plan(struct io_conn *conn, struct io_plan *plan)
 
        plan = next(conn, plan->next_arg);
 
 
        plan = next(conn, plan->next_arg);
 
-       /* It should have set a plan inside this conn. */
+       /* It should have set a plan inside this conn (or duplex) */
        assert(plan == &conn->plan[IO_IN]
        assert(plan == &conn->plan[IO_IN]
-              || plan == &conn->plan[IO_OUT]);
+              || plan == &conn->plan[IO_OUT]
+              || plan == &conn->plan[2]);
        assert(conn->plan[IO_IN].status != IO_UNSET
               || conn->plan[IO_OUT].status != IO_UNSET);
 
        assert(conn->plan[IO_IN].status != IO_UNSET
               || conn->plan[IO_OUT].status != IO_UNSET);
 
@@ -428,7 +429,7 @@ void io_break(const void *ret)
        io_loop_return = (void *)ret;
 }
 
        io_loop_return = (void *)ret;
 }
 
-struct io_plan *io_never(struct io_conn *conn)
+struct io_plan *io_never(struct io_conn *conn, void *unused)
 {
        return io_always(conn, io_never_called, NULL);
 }
 {
        return io_always(conn, io_never_called, NULL);
 }
@@ -437,3 +438,11 @@ int io_conn_fd(const struct io_conn *conn)
 {
        return conn->fd.fd;
 }
 {
        return conn->fd.fd;
 }
+
+struct io_plan *io_duplex(struct io_plan *in_plan, struct io_plan *out_plan)
+{
+       /* in_plan must be conn->plan[IO_IN], out_plan must be [IO_OUT] */
+       assert(out_plan == in_plan + 1);
+
+       return out_plan + 1;
+}
index 00b9b33db5e7884bfc969acee67247818d0cfa75..d4ba2d87688e81143863e232d4c1eb34283ab602 100644 (file)
@@ -403,6 +403,38 @@ struct io_plan *io_connect_(struct io_conn *conn, const struct addrinfo *addr,
                            struct io_plan *(*next)(struct io_conn *, void *),
                            void *arg);
 
                            struct io_plan *(*next)(struct io_conn *, void *),
                            void *arg);
 
+/**
+ * io_duplex - set plans for both input and output.
+ * @conn: the connection that plan is for.
+ * @in: the input plan
+ * @out: the output plan
+ *
+ * Most plans are either for input or output; io_duplex creates a plan
+ * which does both.  This is often used in the init function to create
+ * two independent streams, though it can be used once on any connection.
+ *
+ * Note that if either plan closes the connection, it will be closed.
+ *
+ * Note that if one plan is io_wait or io_always, that causes a problem:
+ * they look at the input and output plan slots to figure out which to
+ * use, but if the other plan hasn't been evaluated yet, that will fail.
+ * In this case, you'll need to ensure the other plan is evaluated first,
+ * eg. "struct io_plan *r = io_read(...); return io_duplex(r, io_always(...))"
+ *
+ * Example:
+ * struct buf {
+ *     char in[100];
+ *     char out[100];
+ * };
+ *
+ * static struct io_plan *read_and_write(struct io_conn *conn, struct buf *b)
+ * {
+ *     return io_duplex(io_read(conn, b->in, sizeof(b->in), io_close_cb, b),
+ *                      io_write(conn, b->out, sizeof(b->out), io_close_cb, b));
+ * }
+ */
+struct io_plan *io_duplex(struct io_plan *in_plan, struct io_plan *out_plan);
+
 /**
  * io_wait - leave a plan idle until something wakes us.
  * @conn: the connection that plan is for.
 /**
  * io_wait - leave a plan idle until something wakes us.
  * @conn: the connection that plan is for.
@@ -471,6 +503,7 @@ void io_break(const void *ret);
 /**
  * io_never - assert if callback is called.
  * @conn: the connection that plan is for.
 /**
  * io_never - assert if callback is called.
  * @conn: the connection that plan is for.
+ * @unused: an unused parameter to make this suitable for use as a callback.
  *
  * Sometimes you want to make it clear that a callback should never happen
  * (eg. for io_break).  This will assert() if called.
  *
  * Sometimes you want to make it clear that a callback should never happen
  * (eg. for io_break).  This will assert() if called.
@@ -480,10 +513,10 @@ void io_break(const void *ret);
  * {
  *     io_break(conn);
  *     // We won't ever return from io_break
  * {
  *     io_break(conn);
  *     // We won't ever return from io_break
- *     return io_never(conn);
+ *     return io_never(conn, NULL);
  * }
  */
  * }
  */
-struct io_plan *io_never(struct io_conn *conn);
+struct io_plan *io_never(struct io_conn *conn, void *unused);
 
 /* FIXME: io_recvfrom/io_sendto */
 
 
 /* FIXME: io_recvfrom/io_sendto */
 
index 533f465d12a8e161a646177cd0e1046d944f86fc..0329b81f6755e63f338de1f1890bb803d72b422e 100644 (file)
@@ -6,7 +6,6 @@
 #include <sys/wait.h>
 #include <stdio.h>
 
 #include <sys/wait.h>
 #include <stdio.h>
 
-#if 0
 #ifndef PORT
 #define PORT "65012"
 #endif
 #ifndef PORT
 #define PORT "65012"
 #endif
@@ -14,6 +13,7 @@
 struct data {
        struct io_listener *l;
        int state;
 struct data {
        struct io_listener *l;
        int state;
+       int done;
        char buf[4];
        char wbuf[32];
 };
        char buf[4];
        char wbuf[32];
 };
@@ -23,28 +23,27 @@ static void finish_ok(struct io_conn *conn, struct data *d)
        d->state++;
 }
 
        d->state++;
 }
 
-static struct io_plan *write_done(struct io_conn *conn, struct data *d)
+static struct io_plan *rw_done(struct io_conn *conn, struct data *d)
 {
        d->state++;
 {
        d->state++;
-       return io_close(conn);
+       d->done++;
+       if (d->done == 2)
+               return io_close(conn);
+       return io_wait(conn, NULL, io_never, NULL);
 }
 
 }
 
-static void init_conn(int fd, struct data *d)
+static struct io_plan *init_conn(struct io_conn *conn, struct data *d)
 {
 {
-       struct io_conn *conn;
-
        ok1(d->state == 0);
        d->state++;
 
        io_close_listener(d->l);
 
        memset(d->wbuf, 7, sizeof(d->wbuf));
        ok1(d->state == 0);
        d->state++;
 
        io_close_listener(d->l);
 
        memset(d->wbuf, 7, sizeof(d->wbuf));
-
-       conn = io_new_conn(fd, io_read(d->buf, sizeof(d->buf), io_close_cb, d));
-       io_set_finish(conn, finish_ok, d);
-       conn = io_duplex(conn, io_write(d->wbuf, sizeof(d->wbuf), write_done, d));
-       ok1(conn);
        io_set_finish(conn, finish_ok, d);
        io_set_finish(conn, finish_ok, d);
+
+       return io_duplex(io_read(conn, d->buf, sizeof(d->buf), rw_done, d),
+                        io_write(conn, d->wbuf, sizeof(d->wbuf), rw_done, d));
 }
 
 static int make_listen_fd(const char *port, struct addrinfo **info)
 }
 
 static int make_listen_fd(const char *port, struct addrinfo **info)
@@ -88,9 +87,10 @@ int main(void)
        /* This is how many tests you plan to run */
        plan_tests(10);
        d->state = 0;
        /* This is how many tests you plan to run */
        plan_tests(10);
        d->state = 0;
+       d->done = 0;
        fd = make_listen_fd(PORT, &addrinfo);
        ok1(fd >= 0);
        fd = make_listen_fd(PORT, &addrinfo);
        ok1(fd >= 0);
-       d->l = io_new_listener(fd, init_conn, d);
+       d->l = io_new_listener(NULL, fd, init_conn, d);
        ok1(d->l);
        fflush(stdout);
        if (!fork()) {
        ok1(d->l);
        fflush(stdout);
        if (!fork()) {
@@ -121,6 +121,7 @@ int main(void)
        freeaddrinfo(addrinfo);
        ok1(io_loop() == NULL);
        ok1(d->state == 4);
        freeaddrinfo(addrinfo);
        ok1(io_loop() == NULL);
        ok1(d->state == 4);
+       ok1(d->done == 2);
        ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
        free(d);
 
        ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
        free(d);
 
@@ -131,9 +132,3 @@ int main(void)
        /* This exits depending on whether all tests passed */
        return exit_status();
 }
        /* This exits depending on whether all tests passed */
        return exit_status();
 }
-#else
-int main(void)
-{
-       return 0;
-}
-#endif
index b4a14c3dfc3e0edbf1e59b2e5bf08534a1855660..b52553aaf5be7235015d51c38b51e07a54241d54 100644 (file)
@@ -8,7 +8,6 @@
 #include <sys/wait.h>
 #include <stdio.h>
 
 #include <sys/wait.h>
 #include <stdio.h>
 
-#if 0
 #ifndef PORT
 #define PORT "65014"
 #endif
 #ifndef PORT
 #define PORT "65014"
 #endif
@@ -16,7 +15,6 @@
 struct data {
        struct io_listener *l;
        int state;
 struct data {
        struct io_listener *l;
        int state;
-       struct io_conn *c1, *c2;
        char buf[4];
        char wbuf[32];
 };
        char buf[4];
        char wbuf[32];
 };
@@ -26,23 +24,24 @@ static void finish_ok(struct io_conn *conn, struct data *d)
        d->state++;
 }
 
        d->state++;
 }
 
-static struct io_plan end(struct io_conn *conn, struct data *d)
+static struct io_plan *end(struct io_conn *conn, struct data *d)
 {
        d->state++;
 {
        d->state++;
-       return io_close();
+       if (d->state == 4)
+               return io_close(conn);
+       else
+               return io_wait(conn, NULL, io_never, NULL);
 }
 
 }
 
-static struct io_plan make_duplex(struct io_conn *conn, struct data *d)
+static struct io_plan *make_duplex(struct io_conn *conn, struct data *d)
 {
 {
+       d->state++;
        /* Have duplex read the rest of the buffer. */
        /* Have duplex read the rest of the buffer. */
-       d->c2 = io_duplex(conn, io_read(d->buf+1, sizeof(d->buf)-1, end, d));
-       ok1(d->c2);
-       io_set_finish(d->c2, finish_ok, d);
-
-       return io_write(d->wbuf, sizeof(d->wbuf), end, d);
+       return io_duplex(io_read(conn, d->buf+1, sizeof(d->buf)-1, end, d),
+                        io_write(conn, d->wbuf, sizeof(d->wbuf), end, d));
 }
 
 }
 
-static void init_conn(int fd, struct data *d)
+static struct io_plan *init_conn(struct io_conn *conn, struct data *d)
 {
        ok1(d->state == 0);
        d->state++;
 {
        ok1(d->state == 0);
        d->state++;
@@ -50,9 +49,8 @@ static void init_conn(int fd, struct data *d)
        io_close_listener(d->l);
 
        memset(d->wbuf, 7, sizeof(d->wbuf));
        io_close_listener(d->l);
 
        memset(d->wbuf, 7, sizeof(d->wbuf));
-
-       d->c1 = io_new_conn(fd, io_read(d->buf, 1, make_duplex, d));
-       io_set_finish(d->c1, finish_ok, d);
+       io_set_finish(conn, finish_ok, d);
+       return io_read(conn, d->buf, 1, make_duplex, d);
 }
 
 static int make_listen_fd(const char *port, struct addrinfo **info)
 }
 
 static int make_listen_fd(const char *port, struct addrinfo **info)
@@ -94,11 +92,11 @@ int main(void)
        int fd, status;
 
        /* This is how many tests you plan to run */
        int fd, status;
 
        /* This is how many tests you plan to run */
-       plan_tests(10);
+       plan_tests(9);
        d->state = 0;
        fd = make_listen_fd(PORT, &addrinfo);
        ok1(fd >= 0);
        d->state = 0;
        fd = make_listen_fd(PORT, &addrinfo);
        ok1(fd >= 0);
-       d->l = io_new_listener(fd, init_conn, d);
+       d->l = io_new_listener(NULL, fd, init_conn, d);
        ok1(d->l);
        fflush(stdout);
        if (!fork()) {
        ok1(d->l);
        fflush(stdout);
        if (!fork()) {
@@ -139,9 +137,3 @@ int main(void)
        /* This exits depending on whether all tests passed */
        return exit_status();
 }
        /* This exits depending on whether all tests passed */
        return exit_status();
 }
-#else
-int main(void)
-{
-       return 0;
-}
-#endif
index e31c5c5bad95ae0117b53740cc63d17c1dbc4298..9d939c58aa422dc09be1a4057fc0d0e9cb4ad0d9 100644 (file)
@@ -8,14 +8,12 @@
 #include <sys/wait.h>
 #include <stdio.h>
 
 #include <sys/wait.h>
 #include <stdio.h>
 
-#if 0
 #ifndef PORT
 #define PORT "65016"
 #endif
 
 struct data {
        struct io_listener *l;
 #ifndef PORT
 #define PORT "65016"
 #endif
 
 struct data {
        struct io_listener *l;
-       struct io_conn *writer;
        int state;
        char buf[4];
        char wbuf[32];
        int state;
        char buf[4];
        char wbuf[32];
@@ -26,35 +24,27 @@ static void finish_ok(struct io_conn *conn, struct data *d)
        d->state++;
 }
 
        d->state++;
 }
 
-static struct io_plan write_done(struct io_conn *conn, struct data *d)
+static struct io_plan *io_done(struct io_conn *conn, struct data *d)
 {
        d->state++;
 {
        d->state++;
-       return io_wait(d, io_close_cb, NULL);
+       if (d->state == 3)
+               return io_close(conn);
+       return io_wait(conn, d, io_close_cb, NULL);
 }
 
 }
 
-static struct io_plan read_done(struct io_conn *conn, struct data *d)
+static struct io_plan *init_conn(struct io_conn *conn, struct data *d)
 {
 {
-       d->state++;
-       io_close_other(d->writer);
-       return io_close();
-}
-
-static void init_conn(int fd, struct data *d)
-{
-       struct io_conn *conn;
-
        ok1(d->state == 0);
        d->state++;
 
        memset(d->wbuf, 7, sizeof(d->wbuf));
 
        ok1(d->state == 0);
        d->state++;
 
        memset(d->wbuf, 7, sizeof(d->wbuf));
 
-       conn = io_new_conn(fd, io_read(d->buf, sizeof(d->buf), read_done, d));
        io_set_finish(conn, finish_ok, d);
        io_set_finish(conn, finish_ok, d);
-       d->writer = io_duplex(conn, io_write(d->wbuf, sizeof(d->wbuf), write_done, d));
-       ok1(d->writer);
-       io_set_finish(d->writer, finish_ok, d);
 
        io_close_listener(d->l);
 
        io_close_listener(d->l);
+
+       return io_duplex(io_read(conn, d->buf, sizeof(d->buf), io_done, d),
+                        io_write(conn, d->wbuf, sizeof(d->wbuf), io_done, d));
 }
 
 static int make_listen_fd(const char *port, struct addrinfo **info)
 }
 
 static int make_listen_fd(const char *port, struct addrinfo **info)
@@ -96,11 +86,11 @@ int main(void)
        int fd, status;
 
        /* This is how many tests you plan to run */
        int fd, status;
 
        /* This is how many tests you plan to run */
-       plan_tests(10);
+       plan_tests(9);
        d->state = 0;
        fd = make_listen_fd(PORT, &addrinfo);
        ok1(fd >= 0);
        d->state = 0;
        fd = make_listen_fd(PORT, &addrinfo);
        ok1(fd >= 0);
-       d->l = io_new_listener(fd, init_conn, d);
+       d->l = io_new_listener(NULL, fd, init_conn, d);
        ok1(d->l);
        fflush(stdout);
        if (!fork()) {
        ok1(d->l);
        fflush(stdout);
        if (!fork()) {
@@ -130,7 +120,7 @@ int main(void)
        }
        freeaddrinfo(addrinfo);
        ok1(io_loop() == NULL);
        }
        freeaddrinfo(addrinfo);
        ok1(io_loop() == NULL);
-       ok1(d->state == 5);
+       ok1(d->state == 4);
        ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
        free(d);
 
        ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
        free(d);
 
@@ -141,9 +131,3 @@ int main(void)
        /* This exits depending on whether all tests passed */
        return exit_status();
 }
        /* This exits depending on whether all tests passed */
        return exit_status();
 }
-#else
-int main(void)
-{
-       return 0;
-}
-#endif