From: Rusty Russell Date: Sat, 7 Dec 2013 07:10:47 +0000 (+1030) Subject: io: io_always, and zero-length operations support. X-Git-Url: http://git.ozlabs.org/?p=ccan;a=commitdiff_plain;h=12ab811533406e22b78ce8b227474f1056375ae4 io: io_always, and zero-length operations support. A zero-length read should complete immediately, even if the fd isn't readable. Wire this up, and expose it for callers to use. Signed-off-by: Rusty Russell --- diff --git a/ccan/io/backend.h b/ccan/io/backend.h index 77d51dda..e2090ff1 100644 --- a/ccan/io/backend.h +++ b/ccan/io/backend.h @@ -3,6 +3,10 @@ #define CCAN_IO_BACKEND_H #include #include +#include + +/* A setting for actions to always run (eg. zero-length reads). */ +#define POLLALWAYS (((POLLIN|POLLOUT) + 1) & ~((POLLIN|POLLOUT))) struct io_alloc { void *(*alloc)(size_t size); diff --git a/ccan/io/io.c b/ccan/io/io.c index faf8b87b..734cb393 100644 --- a/ccan/io/io.c +++ b/ccan/io/io.c @@ -8,7 +8,6 @@ #include #include #include -#include #include #include @@ -232,6 +231,26 @@ bool io_timeout_(struct io_conn *conn, struct timespec ts, return true; } +/* Always done: call the next thing. */ +static int do_always(int fd, struct io_plan *plan) +{ + return 1; +} + +struct io_plan io_always_(struct io_plan (*cb)(struct io_conn *, void *), + void *arg) +{ + struct io_plan plan; + + assert(cb); + plan.io = do_always; + plan.next = cb; + plan.next_arg = arg; + plan.pollflag = POLLALWAYS; + + return plan; +} + /* Returns true if we're finished. */ static int do_write(int fd, struct io_plan *plan) { @@ -252,6 +271,10 @@ struct io_plan io_write_(const void *data, size_t len, struct io_plan plan; assert(cb); + + if (len == 0) + return io_always_(cb, arg); + plan.u1.const_vp = data; plan.u2.s = len; plan.io = do_write; @@ -281,11 +304,16 @@ struct io_plan io_read_(void *data, size_t len, struct io_plan plan; assert(cb); + + if (len == 0) + return io_always_(cb, arg); + plan.u1.cp = data; plan.u2.s = len; plan.io = do_read; plan.next = cb; plan.next_arg = arg; + plan.pollflag = POLLIN; return plan; @@ -309,6 +337,10 @@ struct io_plan io_read_partial_(void *data, size_t *len, struct io_plan plan; assert(cb); + + if (*len == 0) + return io_always_(cb, arg); + plan.u1.cp = data; plan.u2.vp = len; plan.io = do_read_partial; @@ -337,6 +369,10 @@ struct io_plan io_write_partial_(const void *data, size_t *len, struct io_plan plan; assert(cb); + + if (*len == 0) + return io_always_(cb, arg); + plan.u1.const_vp = data; plan.u2.vp = len; plan.io = do_write_partial; diff --git a/ccan/io/io.h b/ccan/io/io.h index 558a8769..bcdb11fd 100644 --- a/ccan/io/io.h +++ b/ccan/io/io.h @@ -290,6 +290,29 @@ struct io_plan io_write_partial_(const void *data, size_t *len, struct io_plan (*cb)(struct io_conn *, void*), void *arg); +/** + * io_always - plan to immediately call next callback. + * @cb: function to call. + * @arg: @cb argument + * + * Sometimes it's neater to plan a callback rather than call it directly; + * for example, if you only need to read data for one path and not another. + * + * Example: + * static void start_conn_with_nothing(int fd) + * { + * // Silly example: close on next time around loop. + * io_new_conn(fd, io_always(io_close_cb, NULL)); + * } + */ +#define io_always(cb, arg) \ + io_debug(io_always_(typesafe_cb_preargs(struct io_plan, void *, \ + (cb), (arg), \ + struct io_conn *), \ + (arg))) +struct io_plan io_always_(struct io_plan (*cb)(struct io_conn *, void *), + void *arg); + /** * io_connect - plan to connect to a listening socket. * @fd: file descriptor. diff --git a/ccan/io/poll.c b/ccan/io/poll.c index 18691e17..d7b9eb56 100644 --- a/ccan/io/poll.c +++ b/ccan/io/poll.c @@ -10,6 +10,7 @@ #include static size_t num_fds = 0, max_fds = 0, num_closing = 0, num_waiting = 0; +static bool some_always = false; static struct pollfd *pollfds = NULL; static struct fd **fds = NULL; static struct timers timeouts; @@ -146,9 +147,9 @@ void backend_plan_changed(struct io_conn *conn) if (pfd->events) num_waiting--; - pfd->events = conn->plan.pollflag; + pfd->events = conn->plan.pollflag & (POLLIN|POLLOUT); if (conn->duplex) { - int mask = conn->duplex->plan.pollflag; + int mask = conn->duplex->plan.pollflag & (POLLIN|POLLOUT); /* You can't *both* read/write. */ assert(!mask || pfd->events != mask); pfd->events |= mask; @@ -161,15 +162,20 @@ void backend_plan_changed(struct io_conn *conn) if (!conn->plan.next) num_closing++; + + if (conn->plan.pollflag == POLLALWAYS) + some_always = true; } bool add_conn(struct io_conn *c) { - if (!add_fd(&c->fd, c->plan.pollflag)) + if (!add_fd(&c->fd, c->plan.pollflag & (POLLIN|POLLOUT))) return false; /* Immediate close is allowed. */ if (!c->plan.next) num_closing++; + if (c->plan.pollflag == POLLALWAYS) + some_always = true; return true; } @@ -267,6 +273,26 @@ void backend_del_timeout(struct io_conn *conn) conn->timeout->conn = NULL; } +static void handle_always(void) +{ + int i; + + some_always = false; + + for (i = 0; i < num_fds && !io_loop_return; i++) { + struct io_conn *c = (void *)fds[i]; + + if (fds[i]->listener) + continue; + + if (c->plan.pollflag == POLLALWAYS) + io_ready(c); + + if (c->duplex && c->duplex->plan.pollflag == POLLALWAYS) + io_ready(c->duplex); + } +} + /* This is the main loop. */ void *do_io_loop(struct io_conn **ready) { @@ -317,6 +343,11 @@ void *do_io_loop(struct io_conn **ready) if (doing_debug() && some_timeouts) continue; + if (some_always) { + handle_always(); + continue; + } + if (num_fds == 0) break; diff --git a/ccan/io/test/run-18-errno.c b/ccan/io/test/run-18-errno.c index 985a3229..222c0fb5 100644 --- a/ccan/io/test/run-18-errno.c +++ b/ccan/io/test/run-18-errno.c @@ -36,7 +36,7 @@ static void init_conn(int fd, int *state) (*state)++; close(fd); errno = 0; - io_set_finish(io_new_conn(fd, io_read(state, 0, + io_set_finish(io_new_conn(fd, io_read(state, 1, io_close_cb, NULL)), finish_EBADF, state); } diff --git a/ccan/io/test/run-19-always-DEBUG.c b/ccan/io/test/run-19-always-DEBUG.c new file mode 100644 index 00000000..4decacd8 --- /dev/null +++ b/ccan/io/test/run-19-always-DEBUG.c @@ -0,0 +1,8 @@ +#define DEBUG +#define PORT "64019" +#define main real_main +int real_main(void); +#include "run-19-always.c" +#undef main +static bool always_debug(struct io_conn *conn) { return true; } +int main(void) { io_debug_conn = always_debug; return real_main(); } diff --git a/ccan/io/test/run-19-always.c b/ccan/io/test/run-19-always.c new file mode 100644 index 00000000..e6413fc9 --- /dev/null +++ b/ccan/io/test/run-19-always.c @@ -0,0 +1,133 @@ +#include +/* Include the C files directly. */ +#include +#include +#include +#include +#include + +#ifndef PORT +#define PORT "65019" +#endif + +struct data { + int state; + size_t bytes; + char *buf; +}; + +static void finish_ok(struct io_conn *conn, struct data *d) +{ + ok1(d->state == 1); + d->state++; + io_break(d, io_idle()); +} + +static struct io_plan write_buf(struct io_conn *conn, struct data *d) +{ + return io_write(d->buf, d->bytes, io_close_cb, d); +} + +static void init_conn(int fd, struct data *d) +{ + ok1(d->state == 0); + d->state++; + /* Empty read should run immediately... */ + io_set_finish(io_new_conn(fd, io_read(NULL, 0, write_buf, d)), + finish_ok, d); +} + +static int make_listen_fd(const char *port, struct addrinfo **info) +{ + int fd, on = 1; + struct addrinfo *addrinfo, hints; + + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_PASSIVE; + hints.ai_protocol = 0; + + if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0) + return -1; + + fd = socket(addrinfo->ai_family, addrinfo->ai_socktype, + addrinfo->ai_protocol); + if (fd < 0) + return -1; + + setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); + if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) { + close(fd); + return -1; + } + if (listen(fd, 1) != 0) { + close(fd); + return -1; + } + *info = addrinfo; + return fd; +} + +static void read_from_socket(size_t bytes, const struct addrinfo *addrinfo) +{ + int fd, done, r; + char buf[100]; + + fd = socket(addrinfo->ai_family, addrinfo->ai_socktype, + addrinfo->ai_protocol); + if (fd < 0) + exit(1); + if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) + exit(2); + + for (done = 0; done < bytes; done += r) { + r = read(fd, buf, sizeof(buf)); + if (r < 0) + exit(3); + done += r; + } + close(fd); +} + +int main(void) +{ + struct data *d = malloc(sizeof(*d)); + struct addrinfo *addrinfo; + struct io_listener *l; + int fd, status; + + /* This is how many tests you plan to run */ + plan_tests(9); + d->state = 0; + d->bytes = 1024*1024; + d->buf = malloc(d->bytes); + memset(d->buf, 'a', d->bytes); + fd = make_listen_fd(PORT, &addrinfo); + ok1(fd >= 0); + l = io_new_listener(fd, init_conn, d); + ok1(l); + fflush(stdout); + if (!fork()) { + io_close_listener(l); + read_from_socket(d->bytes, addrinfo); + freeaddrinfo(addrinfo); + free(d->buf); + free(d); + exit(0); + } + ok1(io_loop() == d); + ok1(d->state == 2); + + ok1(wait(&status)); + ok1(WIFEXITED(status)); + ok1(WEXITSTATUS(status) == 0); + + freeaddrinfo(addrinfo); + free(d->buf); + free(d); + io_close_listener(l); + + /* This exits depending on whether all tests passed */ + return exit_status(); +}