io: io_always, and zero-length operations support.
authorRusty Russell <rusty@rustcorp.com.au>
Sat, 7 Dec 2013 07:10:47 +0000 (17:40 +1030)
committerRusty Russell <rusty@rustcorp.com.au>
Sat, 7 Dec 2013 07:11:44 +0000 (17:41 +1030)
A zero-length read should complete immediately, even if the fd isn't readable.
Wire this up, and expose it for callers to use.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
ccan/io/backend.h
ccan/io/io.c
ccan/io/io.h
ccan/io/poll.c
ccan/io/test/run-18-errno.c
ccan/io/test/run-19-always-DEBUG.c [new file with mode: 0644]
ccan/io/test/run-19-always.c [new file with mode: 0644]

index 77d51dda9bf1825a6e02a6be85b3a0f3184c5804..e2090ff14068ebe0a34317941bb8efc6f7cb9a0a 100644 (file)
@@ -3,6 +3,10 @@
 #define CCAN_IO_BACKEND_H
 #include <stdbool.h>
 #include <ccan/timer/timer.h>
+#include <poll.h>
+
+/* A setting for actions to always run (eg. zero-length reads). */
+#define POLLALWAYS (((POLLIN|POLLOUT) + 1) & ~((POLLIN|POLLOUT)))
 
 struct io_alloc {
        void *(*alloc)(size_t size);
index faf8b87bfd649b7244e8ee01087651d1039dbbf5..734cb3939636acbf0922362915c5c804bcc2cb97 100644 (file)
@@ -8,7 +8,6 @@
 #include <errno.h>
 #include <stdlib.h>
 #include <assert.h>
-#include <poll.h>
 #include <unistd.h>
 #include <fcntl.h>
 
@@ -232,6 +231,26 @@ bool io_timeout_(struct io_conn *conn, struct timespec ts,
        return true;
 }
 
+/* Always done: call the next thing. */
+static int do_always(int fd, struct io_plan *plan)
+{
+       return 1;
+}
+
+struct io_plan io_always_(struct io_plan (*cb)(struct io_conn *, void *),
+                         void *arg)
+{
+       struct io_plan plan;
+
+       assert(cb);
+       plan.io = do_always;
+       plan.next = cb;
+       plan.next_arg = arg;
+       plan.pollflag = POLLALWAYS;
+
+       return plan;
+}
+
 /* Returns true if we're finished. */
 static int do_write(int fd, struct io_plan *plan)
 {
@@ -252,6 +271,10 @@ struct io_plan io_write_(const void *data, size_t len,
        struct io_plan plan;
 
        assert(cb);
+
+       if (len == 0)
+               return io_always_(cb, arg);
+
        plan.u1.const_vp = data;
        plan.u2.s = len;
        plan.io = do_write;
@@ -281,11 +304,16 @@ struct io_plan io_read_(void *data, size_t len,
        struct io_plan plan;
 
        assert(cb);
+
+       if (len == 0)
+               return io_always_(cb, arg);
+
        plan.u1.cp = data;
        plan.u2.s = len;
        plan.io = do_read;
        plan.next = cb;
        plan.next_arg = arg;
+
        plan.pollflag = POLLIN;
 
        return plan;
@@ -309,6 +337,10 @@ struct io_plan io_read_partial_(void *data, size_t *len,
        struct io_plan plan;
 
        assert(cb);
+
+       if (*len == 0)
+               return io_always_(cb, arg);
+
        plan.u1.cp = data;
        plan.u2.vp = len;
        plan.io = do_read_partial;
@@ -337,6 +369,10 @@ struct io_plan io_write_partial_(const void *data, size_t *len,
        struct io_plan plan;
 
        assert(cb);
+
+       if (*len == 0)
+               return io_always_(cb, arg);
+
        plan.u1.const_vp = data;
        plan.u2.vp = len;
        plan.io = do_write_partial;
index 558a8769e1428355c254af670790d01f5ccda09e..bcdb11fdd0e0c1f1e9a08aac55a13e0afe888d92 100644 (file)
@@ -290,6 +290,29 @@ struct io_plan io_write_partial_(const void *data, size_t *len,
                                 struct io_plan (*cb)(struct io_conn *, void*),
                                 void *arg);
 
+/**
+ * io_always - plan to immediately call next callback.
+ * @cb: function to call.
+ * @arg: @cb argument
+ *
+ * Sometimes it's neater to plan a callback rather than call it directly;
+ * for example, if you only need to read data for one path and not another.
+ *
+ * Example:
+ * static void start_conn_with_nothing(int fd)
+ * {
+ *     // Silly example: close on next time around loop.
+ *     io_new_conn(fd, io_always(io_close_cb, NULL));
+ * }
+ */
+#define io_always(cb, arg)                                             \
+       io_debug(io_always_(typesafe_cb_preargs(struct io_plan, void *, \
+                                               (cb), (arg),            \
+                                               struct io_conn *),      \
+                           (arg)))
+struct io_plan io_always_(struct io_plan (*cb)(struct io_conn *, void *),
+                         void *arg);
+
 /**
  * io_connect - plan to connect to a listening socket.
  * @fd: file descriptor.
index 18691e17b4fc1c8a9247062be0fdaa5e84f9d170..d7b9eb56b950b5a1b4772525c86bb944b3a60c6f 100644 (file)
@@ -10,6 +10,7 @@
 #include <errno.h>
 
 static size_t num_fds = 0, max_fds = 0, num_closing = 0, num_waiting = 0;
+static bool some_always = false;
 static struct pollfd *pollfds = NULL;
 static struct fd **fds = NULL;
 static struct timers timeouts;
@@ -146,9 +147,9 @@ void backend_plan_changed(struct io_conn *conn)
        if (pfd->events)
                num_waiting--;
 
-       pfd->events = conn->plan.pollflag;
+       pfd->events = conn->plan.pollflag & (POLLIN|POLLOUT);
        if (conn->duplex) {
-               int mask = conn->duplex->plan.pollflag;
+               int mask = conn->duplex->plan.pollflag & (POLLIN|POLLOUT);
                /* You can't *both* read/write. */
                assert(!mask || pfd->events != mask);
                pfd->events |= mask;
@@ -161,15 +162,20 @@ void backend_plan_changed(struct io_conn *conn)
 
        if (!conn->plan.next)
                num_closing++;
+
+       if (conn->plan.pollflag == POLLALWAYS)
+               some_always = true;
 }
 
 bool add_conn(struct io_conn *c)
 {
-       if (!add_fd(&c->fd, c->plan.pollflag))
+       if (!add_fd(&c->fd, c->plan.pollflag & (POLLIN|POLLOUT)))
                return false;
        /* Immediate close is allowed. */
        if (!c->plan.next)
                num_closing++;
+       if (c->plan.pollflag == POLLALWAYS)
+               some_always = true;
        return true;
 }
 
@@ -267,6 +273,26 @@ void backend_del_timeout(struct io_conn *conn)
        conn->timeout->conn = NULL;
 }
 
+static void handle_always(void)
+{
+       int i;
+
+       some_always = false;
+
+       for (i = 0; i < num_fds && !io_loop_return; i++) {
+               struct io_conn *c = (void *)fds[i];
+
+               if (fds[i]->listener)
+                       continue;
+
+               if (c->plan.pollflag == POLLALWAYS)
+                       io_ready(c);
+
+               if (c->duplex && c->duplex->plan.pollflag == POLLALWAYS)
+                       io_ready(c->duplex);
+       }
+}
+
 /* This is the main loop. */
 void *do_io_loop(struct io_conn **ready)
 {
@@ -317,6 +343,11 @@ void *do_io_loop(struct io_conn **ready)
                if (doing_debug() && some_timeouts)
                        continue;
 
+               if (some_always) {
+                       handle_always();
+                       continue;
+               }
+
                if (num_fds == 0)
                        break;
 
index 985a32297187a303cb5f92a0b9fe99635ffbde78..222c0fb58e3ee7ab05d9ae76bab97f8dfbabfae2 100644 (file)
@@ -36,7 +36,7 @@ static void init_conn(int fd, int *state)
                (*state)++;
                close(fd);
                errno = 0;
-               io_set_finish(io_new_conn(fd, io_read(state, 0,
+               io_set_finish(io_new_conn(fd, io_read(state, 1,
                                                      io_close_cb, NULL)),
                              finish_EBADF, state);
        }
diff --git a/ccan/io/test/run-19-always-DEBUG.c b/ccan/io/test/run-19-always-DEBUG.c
new file mode 100644 (file)
index 0000000..4decacd
--- /dev/null
@@ -0,0 +1,8 @@
+#define DEBUG
+#define PORT "64019"
+#define main real_main
+int real_main(void);
+#include "run-19-always.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }
diff --git a/ccan/io/test/run-19-always.c b/ccan/io/test/run-19-always.c
new file mode 100644 (file)
index 0000000..e6413fc
--- /dev/null
@@ -0,0 +1,133 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#ifndef PORT
+#define PORT "65019"
+#endif
+
+struct data {
+       int state;
+       size_t bytes;
+       char *buf;
+};
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+       ok1(d->state == 1);
+       d->state++;
+       io_break(d, io_idle());
+}
+
+static struct io_plan write_buf(struct io_conn *conn, struct data *d)
+{
+       return io_write(d->buf, d->bytes, io_close_cb, d);
+}
+
+static void init_conn(int fd, struct data *d)
+{
+       ok1(d->state == 0);
+       d->state++;
+       /* Empty read should run immediately... */
+       io_set_finish(io_new_conn(fd, io_read(NULL, 0, write_buf, d)),
+                     finish_ok, d);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+       int fd, on = 1;
+       struct addrinfo *addrinfo, hints;
+
+       memset(&hints, 0, sizeof(hints));
+       hints.ai_family = AF_UNSPEC;
+       hints.ai_socktype = SOCK_STREAM;
+       hints.ai_flags = AI_PASSIVE;
+       hints.ai_protocol = 0;
+
+       if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+               return -1;
+
+       fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                   addrinfo->ai_protocol);
+       if (fd < 0)
+               return -1;
+
+       setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+       if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+               close(fd);
+               return -1;
+       }
+       if (listen(fd, 1) != 0) {
+               close(fd);
+               return -1;
+       }
+       *info = addrinfo;
+       return fd;
+}
+
+static void read_from_socket(size_t bytes, const struct addrinfo *addrinfo)
+{
+       int fd, done, r;
+       char buf[100];
+
+       fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+                   addrinfo->ai_protocol);
+       if (fd < 0)
+               exit(1);
+       if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+               exit(2);
+
+       for (done = 0; done < bytes; done += r) {
+               r = read(fd, buf, sizeof(buf));
+               if (r < 0)
+                       exit(3);
+               done += r;
+       }
+       close(fd);
+}
+
+int main(void)
+{
+       struct data *d = malloc(sizeof(*d));
+       struct addrinfo *addrinfo;
+       struct io_listener *l;
+       int fd, status;
+
+       /* This is how many tests you plan to run */
+       plan_tests(9);
+       d->state = 0;
+       d->bytes = 1024*1024;
+       d->buf = malloc(d->bytes);
+       memset(d->buf, 'a', d->bytes);
+       fd = make_listen_fd(PORT, &addrinfo);
+       ok1(fd >= 0);
+       l = io_new_listener(fd, init_conn, d);
+       ok1(l);
+       fflush(stdout);
+       if (!fork()) {
+               io_close_listener(l);
+               read_from_socket(d->bytes, addrinfo);
+               freeaddrinfo(addrinfo);
+               free(d->buf);
+               free(d);
+               exit(0);
+       }
+       ok1(io_loop() == d);
+       ok1(d->state == 2);
+
+       ok1(wait(&status));
+       ok1(WIFEXITED(status));
+       ok1(WEXITSTATUS(status) == 0);
+
+       freeaddrinfo(addrinfo);
+       free(d->buf);
+       free(d);
+       io_close_listener(l);
+
+       /* This exits depending on whether all tests passed */
+       return exit_status();
+}