io: handle duplex corner cases.
authorRusty Russell <rusty@rustcorp.com.au>
Thu, 14 Nov 2013 02:23:57 +0000 (12:53 +1030)
committerRusty Russell <rusty@rustcorp.com.au>
Thu, 14 Nov 2013 02:23:57 +0000 (12:53 +1030)
Especially where we have just done a read and spin off a duplex to do a read
as well.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
ccan/io/io.c
ccan/io/poll.c
ccan/io/test/run-14-duplex-both-read-DEBUG.c [new file with mode: 0644]
ccan/io/test/run-14-duplex-both-read.c [new file with mode: 0644]

index b74520444353761451529af0d0467522293f2686..83104f86b43ddfceed6d5bf7938a60d28ac159f2 100644 (file)
@@ -100,6 +100,10 @@ int io_debug_io(int ret)
        case 1: /* Done: get next plan. */
                if (timeout_active(conn))
                        backend_del_timeout(conn);
        case 1: /* Done: get next plan. */
                if (timeout_active(conn))
                        backend_del_timeout(conn);
+               /* In case they call io_duplex, clear our poll flags so
+                * both sides don't seem to be both doing read or write
+                * (See assert(!mask || pfd->events != mask) in poll.c) */
+               conn->plan.pollflag = 0;
                conn->plan.next(conn, conn->plan.next_arg);
                break;
        default:
                conn->plan.next(conn, conn->plan.next_arg);
                break;
        default:
@@ -443,13 +447,17 @@ void io_ready(struct io_conn *conn)
        case 1: /* Done: get next plan. */
                if (timeout_active(conn))
                        backend_del_timeout(conn);
        case 1: /* Done: get next plan. */
                if (timeout_active(conn))
                        backend_del_timeout(conn);
+               /* In case they call io_duplex, clear our poll flags so
+                * both sides don't seem to be both doing read or write
+                * (See assert(!mask || pfd->events != mask) in poll.c) */
+               conn->plan.pollflag = 0;
                conn->plan = conn->plan.next(conn, conn->plan.next_arg);
                backend_plan_changed(conn);
        }
        set_current(NULL);
 
                conn->plan = conn->plan.next(conn, conn->plan.next_arg);
                backend_plan_changed(conn);
        }
        set_current(NULL);
 
-       /* If it closed, close duplex. */
-       if (!conn->plan.next && conn->duplex) {
+       /* If it closed, close duplex if not already */
+       if (!conn->plan.next && conn->duplex && conn->duplex->plan.next) {
                set_current(conn->duplex);
                conn->duplex->plan = io_close();
                backend_plan_changed(conn->duplex);
                set_current(conn->duplex);
                conn->duplex->plan = io_close();
                backend_plan_changed(conn->duplex);
index 0078fc658411c1d50e01a5c3e9eed278f5214a83..1b11badd642c0d87d1270d642ac63db40f9cb24e 100644 (file)
@@ -346,7 +346,11 @@ void *do_io_loop(struct io_conn **ready)
                                                 * anything can change. */
                                                if (doing_debug())
                                                        break;
                                                 * anything can change. */
                                                if (doing_debug())
                                                        break;
-                                               if (!(events&(POLLIN|POLLOUT)))
+
+                                               /* If no events, or it closed
+                                                * the duplex, continue. */
+                                               if (!(events&(POLLIN|POLLOUT))
+                                                   || !c->plan.next)
                                                        continue;
                                        }
                                }
                                                        continue;
                                        }
                                }
diff --git a/ccan/io/test/run-14-duplex-both-read-DEBUG.c b/ccan/io/test/run-14-duplex-both-read-DEBUG.c
new file mode 100644 (file)
index 0000000..5c4aae7
--- /dev/null
@@ -0,0 +1,8 @@
+#define DEBUG
+#define PORT "64014"
+#define main real_main
+int real_main(void);
+#include "run-14-duplex-both-read.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }
diff --git a/ccan/io/test/run-14-duplex-both-read.c b/ccan/io/test/run-14-duplex-both-read.c
new file mode 100644 (file)
index 0000000..366f1d4
--- /dev/null
@@ -0,0 +1,151 @@
+/* Check a bug where we have just completed a read, then set up a duplex
+ * which tries to do a read. */
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#ifndef PORT
+#define PORT "65014"
+#endif
+
+#define is_idle(conn) ((conn)->plan.io == NULL)
+
+struct data {
+       struct io_listener *l;
+       int state;
+       struct io_conn *c1, *c2;
+       char buf[4];
+       char wbuf[32];
+};
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+       d->state++;
+}
+
+static struct io_plan end(struct io_conn *conn, struct data *d)
+{
+       d->state++;
+
+       /* last one out closes. */
+       if (conn == d->c1 && is_idle(d->c2))
+               return io_close();
+
+       /* last one out closes. */
+       if (conn == d->c2 && is_idle(d->c1))
+               return io_close();
+
+       return io_idle();
+}
+
+static struct io_plan make_duplex(struct io_conn *conn, struct data *d)
+{
+       /* Have duplex read the rest of the buffer. */
+       d->c2 = io_duplex(conn, io_read(d->buf+1, sizeof(d->buf)-1, end, d));
+       ok1(d->c2);
+       io_set_finish(d->c2, finish_ok, d);
+
+       return io_write(d->wbuf, sizeof(d->wbuf), end, d);
+}
+
+static void init_conn(int fd, struct data *d)
+{
+       ok1(d->state == 0);
+       d->state++;
+
+       io_close_listener(d->l);
+
+       memset(d->wbuf, 7, sizeof(d->wbuf));
+
+       d->c1 = io_new_conn(fd, io_read(d->buf, 1, make_duplex, d));
+       io_set_finish(d->c1, finish_ok, d);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+       int fd, on = 1;
+       struct addrinfo *addrinfo, hints;
+
+       memset(&hints, 0, sizeof(hints));
+       hints.ai_family = AF_UNSPEC;
+       hints.ai_socktype = SOCK_STREAM;
+       hints.ai_flags = AI_PASSIVE;
+       hints.ai_protocol = 0;
+
+       if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+               return -1;
+
+       fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                   addrinfo->ai_protocol);
+       if (fd < 0)
+               return -1;
+
+       setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+       if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+               close(fd);
+               return -1;
+       }
+       if (listen(fd, 1) != 0) {
+               close(fd);
+               return -1;
+       }
+       *info = addrinfo;
+       return fd;
+}
+
+int main(void)
+{
+       struct data *d = malloc(sizeof(*d));
+       struct addrinfo *addrinfo;
+       int fd, status;
+
+       /* This is how many tests you plan to run */
+       plan_tests(10);
+       d->state = 0;
+       fd = make_listen_fd(PORT, &addrinfo);
+       ok1(fd >= 0);
+       d->l = io_new_listener(fd, init_conn, d);
+       ok1(d->l);
+       fflush(stdout);
+       if (!fork()) {
+               int i;
+               char buf[32];
+
+               io_close_listener(d->l);
+               free(d);
+               fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                           addrinfo->ai_protocol);
+               if (fd < 0)
+                       exit(1);
+               if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+                       exit(2);
+               signal(SIGPIPE, SIG_IGN);
+               for (i = 0; i < strlen("hellothere"); i++) {
+                       if (write(fd, "hellothere" + i, 1) != 1)
+                               break;
+               }
+               for (i = 0; i < 32; i++) {
+                       if (read(fd, buf+i, 1) != 1)
+                               break;
+               }
+               close(fd);
+               freeaddrinfo(addrinfo);
+               exit(0);
+       }
+       freeaddrinfo(addrinfo);
+       ok1(io_loop() == NULL);
+       ok1(d->state == 5);
+       ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
+       free(d);
+
+       ok1(wait(&status));
+       ok1(WIFEXITED(status));
+       ok1(WEXITSTATUS(status) == 0);
+
+       /* This exits depending on whether all tests passed */
+       return exit_status();
+}