]> git.ozlabs.org Git - ccan/commitdiff
Merge branch 'master' of ozlabs.org:ccan
authorRusty Russell <rusty@rustcorp.com.au>
Thu, 14 Nov 2013 02:29:41 +0000 (12:59 +1030)
committerRusty Russell <rusty@rustcorp.com.au>
Thu, 14 Nov 2013 02:29:41 +0000 (12:59 +1030)
ccan/io/io.c
ccan/io/io.h
ccan/io/poll.c
ccan/io/test/run-06-idle.c
ccan/io/test/run-14-duplex-both-read-DEBUG.c [new file with mode: 0644]
ccan/io/test/run-14-duplex-both-read.c [new file with mode: 0644]

index b74520444353761451529af0d0467522293f2686..54ba7da529ec433661163f2d01fb4ab335562fa8 100644 (file)
@@ -100,6 +100,10 @@ int io_debug_io(int ret)
        case 1: /* Done: get next plan. */
                if (timeout_active(conn))
                        backend_del_timeout(conn);
+               /* In case they call io_duplex, clear our poll flags so
+                * both sides don't seem to be both doing read or write
+                * (See assert(!mask || pfd->events != mask) in poll.c) */
+               conn->plan.pollflag = 0;
                conn->plan.next(conn, conn->plan.next_arg);
                break;
        default:
@@ -414,6 +418,11 @@ struct io_plan io_idle_(void)
        return plan;
 }
 
+bool io_is_idle(const struct io_conn *conn)
+{
+       return conn->plan.io == NULL;
+}
+
 void io_wake_(struct io_conn *conn, struct io_plan plan)
 
 {
@@ -443,13 +452,17 @@ void io_ready(struct io_conn *conn)
        case 1: /* Done: get next plan. */
                if (timeout_active(conn))
                        backend_del_timeout(conn);
+               /* In case they call io_duplex, clear our poll flags so
+                * both sides don't seem to be both doing read or write
+                * (See assert(!mask || pfd->events != mask) in poll.c) */
+               conn->plan.pollflag = 0;
                conn->plan = conn->plan.next(conn, conn->plan.next_arg);
                backend_plan_changed(conn);
        }
        set_current(NULL);
 
-       /* If it closed, close duplex. */
-       if (!conn->plan.next && conn->duplex) {
+       /* If it closed, close duplex if not already */
+       if (!conn->plan.next && conn->duplex && conn->duplex->plan.next) {
                set_current(conn->duplex);
                conn->duplex->plan = io_close();
                backend_plan_changed(conn->duplex);
index 0318aef300873fa71104e0a2b630ad28e93e7065..df0764bd17e1ff7f50633ffe91a45de4c02c6a22 100644 (file)
@@ -431,6 +431,21 @@ struct io_conn *io_duplex_(struct io_conn *conn, struct io_plan plan);
 #define io_wake(conn, plan) (io_plan_no_debug(), io_wake_((conn), (plan)))
 void io_wake_(struct io_conn *conn, struct io_plan plan);
 
+/**
+ * io_is_idle - is a connection idle?
+ *
+ * This can be useful for complex protocols, eg. where you want a connection
+ * to send something, so you queue it and wake it if it's idle.
+ *
+ * Example:
+ *     struct io_conn *sleeper;
+ *     sleeper = io_new_conn(open("/dev/null", O_RDONLY), io_idle());
+ *
+ *     assert(io_is_idle(sleeper));
+ *     io_wake(sleeper, io_write("junk", 4, io_close_cb, NULL));
+ */
+bool io_is_idle(const struct io_conn *conn);
+
 /**
  * io_break - return from io_loop()
  * @ret: non-NULL value to return from io_loop().
index 0078fc658411c1d50e01a5c3e9eed278f5214a83..1b11badd642c0d87d1270d642ac63db40f9cb24e 100644 (file)
@@ -346,7 +346,11 @@ void *do_io_loop(struct io_conn **ready)
                                                 * anything can change. */
                                                if (doing_debug())
                                                        break;
-                                               if (!(events&(POLLIN|POLLOUT)))
+
+                                               /* If no events, or it closed
+                                                * the duplex, continue. */
+                                               if (!(events&(POLLIN|POLLOUT))
+                                                   || !c->plan.next)
                                                        continue;
                                        }
                                }
index 51cca961a077416c47287a5b78d697cd5513d554..455b8608f1cae1723763154a5bb918d1b5a46a11 100644 (file)
@@ -29,6 +29,7 @@ static struct io_plan read_done(struct io_conn *conn, struct data *d)
 
 static void finish_waker(struct io_conn *conn, struct data *d)
 {
+       ok1(io_is_idle(idler));
        io_wake(idler, io_read(d->buf, sizeof(d->buf), read_done, d));
        ok1(d->state == 1);
        d->state++;
@@ -102,7 +103,7 @@ int main(void)
        int fd, status;
 
        /* This is how many tests you plan to run */
-       plan_tests(13);
+       plan_tests(14);
        d->state = 0;
        fd = make_listen_fd(PORT, &addrinfo);
        ok1(fd >= 0);
diff --git a/ccan/io/test/run-14-duplex-both-read-DEBUG.c b/ccan/io/test/run-14-duplex-both-read-DEBUG.c
new file mode 100644 (file)
index 0000000..5c4aae7
--- /dev/null
@@ -0,0 +1,8 @@
+#define DEBUG
+#define PORT "64014"
+#define main real_main
+int real_main(void);
+#include "run-14-duplex-both-read.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }
diff --git a/ccan/io/test/run-14-duplex-both-read.c b/ccan/io/test/run-14-duplex-both-read.c
new file mode 100644 (file)
index 0000000..70bdec0
--- /dev/null
@@ -0,0 +1,149 @@
+/* Check a bug where we have just completed a read, then set up a duplex
+ * which tries to do a read. */
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#ifndef PORT
+#define PORT "65014"
+#endif
+
+struct data {
+       struct io_listener *l;
+       int state;
+       struct io_conn *c1, *c2;
+       char buf[4];
+       char wbuf[32];
+};
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+       d->state++;
+}
+
+static struct io_plan end(struct io_conn *conn, struct data *d)
+{
+       d->state++;
+
+       /* last one out closes. */
+       if (conn == d->c1 && io_is_idle(d->c2))
+               return io_close();
+
+       /* last one out closes. */
+       if (conn == d->c2 && io_is_idle(d->c1))
+               return io_close();
+
+       return io_idle();
+}
+
+static struct io_plan make_duplex(struct io_conn *conn, struct data *d)
+{
+       /* Have duplex read the rest of the buffer. */
+       d->c2 = io_duplex(conn, io_read(d->buf+1, sizeof(d->buf)-1, end, d));
+       ok1(d->c2);
+       io_set_finish(d->c2, finish_ok, d);
+
+       return io_write(d->wbuf, sizeof(d->wbuf), end, d);
+}
+
+static void init_conn(int fd, struct data *d)
+{
+       ok1(d->state == 0);
+       d->state++;
+
+       io_close_listener(d->l);
+
+       memset(d->wbuf, 7, sizeof(d->wbuf));
+
+       d->c1 = io_new_conn(fd, io_read(d->buf, 1, make_duplex, d));
+       io_set_finish(d->c1, finish_ok, d);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+       int fd, on = 1;
+       struct addrinfo *addrinfo, hints;
+
+       memset(&hints, 0, sizeof(hints));
+       hints.ai_family = AF_UNSPEC;
+       hints.ai_socktype = SOCK_STREAM;
+       hints.ai_flags = AI_PASSIVE;
+       hints.ai_protocol = 0;
+
+       if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+               return -1;
+
+       fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                   addrinfo->ai_protocol);
+       if (fd < 0)
+               return -1;
+
+       setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+       if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+               close(fd);
+               return -1;
+       }
+       if (listen(fd, 1) != 0) {
+               close(fd);
+               return -1;
+       }
+       *info = addrinfo;
+       return fd;
+}
+
+int main(void)
+{
+       struct data *d = malloc(sizeof(*d));
+       struct addrinfo *addrinfo;
+       int fd, status;
+
+       /* This is how many tests you plan to run */
+       plan_tests(10);
+       d->state = 0;
+       fd = make_listen_fd(PORT, &addrinfo);
+       ok1(fd >= 0);
+       d->l = io_new_listener(fd, init_conn, d);
+       ok1(d->l);
+       fflush(stdout);
+       if (!fork()) {
+               int i;
+               char buf[32];
+
+               io_close_listener(d->l);
+               free(d);
+               fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                           addrinfo->ai_protocol);
+               if (fd < 0)
+                       exit(1);
+               if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+                       exit(2);
+               signal(SIGPIPE, SIG_IGN);
+               for (i = 0; i < strlen("hellothere"); i++) {
+                       if (write(fd, "hellothere" + i, 1) != 1)
+                               break;
+               }
+               for (i = 0; i < 32; i++) {
+                       if (read(fd, buf+i, 1) != 1)
+                               break;
+               }
+               close(fd);
+               freeaddrinfo(addrinfo);
+               exit(0);
+       }
+       freeaddrinfo(addrinfo);
+       ok1(io_loop() == NULL);
+       ok1(d->state == 5);
+       ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
+       free(d);
+
+       ok1(wait(&status));
+       ok1(WIFEXITED(status));
+       ok1(WEXITSTATUS(status) == 0);
+
+       /* This exits depending on whether all tests passed */
+       return exit_status();
+}