A zero-length read should complete immediately, even if the fd isn't readable.
Wire this up, and expose it for callers to use.
Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
#define CCAN_IO_BACKEND_H
#include <stdbool.h>
#include <ccan/timer/timer.h>
+#include <poll.h>
+
+/* A setting for actions to always run (eg. zero-length reads). */
+#define POLLALWAYS (((POLLIN|POLLOUT) + 1) & ~((POLLIN|POLLOUT)))
struct io_alloc {
void *(*alloc)(size_t size);
#include <errno.h>
#include <stdlib.h>
#include <assert.h>
-#include <poll.h>
#include <unistd.h>
#include <fcntl.h>
return true;
}
+/* Always done: call the next thing. */
+static int do_always(int fd, struct io_plan *plan)
+{
+ return 1;
+}
+
+struct io_plan io_always_(struct io_plan (*cb)(struct io_conn *, void *),
+ void *arg)
+{
+ struct io_plan plan;
+
+ assert(cb);
+ plan.io = do_always;
+ plan.next = cb;
+ plan.next_arg = arg;
+ plan.pollflag = POLLALWAYS;
+
+ return plan;
+}
+
/* Returns true if we're finished. */
static int do_write(int fd, struct io_plan *plan)
{
struct io_plan plan;
assert(cb);
+
+ if (len == 0)
+ return io_always_(cb, arg);
+
plan.u1.const_vp = data;
plan.u2.s = len;
plan.io = do_write;
struct io_plan plan;
assert(cb);
+
+ if (len == 0)
+ return io_always_(cb, arg);
+
plan.u1.cp = data;
plan.u2.s = len;
plan.io = do_read;
plan.next = cb;
plan.next_arg = arg;
+
plan.pollflag = POLLIN;
return plan;
struct io_plan plan;
assert(cb);
+
+ if (*len == 0)
+ return io_always_(cb, arg);
+
plan.u1.cp = data;
plan.u2.vp = len;
plan.io = do_read_partial;
struct io_plan plan;
assert(cb);
+
+ if (*len == 0)
+ return io_always_(cb, arg);
+
plan.u1.const_vp = data;
plan.u2.vp = len;
plan.io = do_write_partial;
struct io_plan (*cb)(struct io_conn *, void*),
void *arg);
+/**
+ * io_always - plan to immediately call next callback.
+ * @cb: function to call.
+ * @arg: @cb argument
+ *
+ * Sometimes it's neater to plan a callback rather than call it directly;
+ * for example, if you only need to read data for one path and not another.
+ *
+ * Example:
+ * static void start_conn_with_nothing(int fd)
+ * {
+ * // Silly example: close on next time around loop.
+ * io_new_conn(fd, io_always(io_close_cb, NULL));
+ * }
+ */
+#define io_always(cb, arg) \
+ io_debug(io_always_(typesafe_cb_preargs(struct io_plan, void *, \
+ (cb), (arg), \
+ struct io_conn *), \
+ (arg)))
+struct io_plan io_always_(struct io_plan (*cb)(struct io_conn *, void *),
+ void *arg);
+
/**
* io_connect - plan to connect to a listening socket.
* @fd: file descriptor.
#include <errno.h>
static size_t num_fds = 0, max_fds = 0, num_closing = 0, num_waiting = 0;
+static bool some_always = false;
static struct pollfd *pollfds = NULL;
static struct fd **fds = NULL;
static struct timers timeouts;
if (pfd->events)
num_waiting--;
- pfd->events = conn->plan.pollflag;
+ pfd->events = conn->plan.pollflag & (POLLIN|POLLOUT);
if (conn->duplex) {
- int mask = conn->duplex->plan.pollflag;
+ int mask = conn->duplex->plan.pollflag & (POLLIN|POLLOUT);
/* You can't *both* read/write. */
assert(!mask || pfd->events != mask);
pfd->events |= mask;
if (!conn->plan.next)
num_closing++;
+
+ if (conn->plan.pollflag == POLLALWAYS)
+ some_always = true;
}
bool add_conn(struct io_conn *c)
{
- if (!add_fd(&c->fd, c->plan.pollflag))
+ if (!add_fd(&c->fd, c->plan.pollflag & (POLLIN|POLLOUT)))
return false;
/* Immediate close is allowed. */
if (!c->plan.next)
num_closing++;
+ if (c->plan.pollflag == POLLALWAYS)
+ some_always = true;
return true;
}
conn->timeout->conn = NULL;
}
+static void handle_always(void)
+{
+ int i;
+
+ some_always = false;
+
+ for (i = 0; i < num_fds && !io_loop_return; i++) {
+ struct io_conn *c = (void *)fds[i];
+
+ if (fds[i]->listener)
+ continue;
+
+ if (c->plan.pollflag == POLLALWAYS)
+ io_ready(c);
+
+ if (c->duplex && c->duplex->plan.pollflag == POLLALWAYS)
+ io_ready(c->duplex);
+ }
+}
+
/* This is the main loop. */
void *do_io_loop(struct io_conn **ready)
{
if (doing_debug() && some_timeouts)
continue;
+ if (some_always) {
+ handle_always();
+ continue;
+ }
+
if (num_fds == 0)
break;
(*state)++;
close(fd);
errno = 0;
- io_set_finish(io_new_conn(fd, io_read(state, 0,
+ io_set_finish(io_new_conn(fd, io_read(state, 1,
io_close_cb, NULL)),
finish_EBADF, state);
}
--- /dev/null
+#define DEBUG
+#define PORT "64019"
+#define main real_main
+int real_main(void);
+#include "run-19-always.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }
--- /dev/null
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#ifndef PORT
+#define PORT "65019"
+#endif
+
+struct data {
+ int state;
+ size_t bytes;
+ char *buf;
+};
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+ ok1(d->state == 1);
+ d->state++;
+ io_break(d, io_idle());
+}
+
+static struct io_plan write_buf(struct io_conn *conn, struct data *d)
+{
+ return io_write(d->buf, d->bytes, io_close_cb, d);
+}
+
+static void init_conn(int fd, struct data *d)
+{
+ ok1(d->state == 0);
+ d->state++;
+ /* Empty read should run immediately... */
+ io_set_finish(io_new_conn(fd, io_read(NULL, 0, write_buf, d)),
+ finish_ok, d);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+ int fd, on = 1;
+ struct addrinfo *addrinfo, hints;
+
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_PASSIVE;
+ hints.ai_protocol = 0;
+
+ if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+ return -1;
+
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ return -1;
+
+ setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+ if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+ close(fd);
+ return -1;
+ }
+ if (listen(fd, 1) != 0) {
+ close(fd);
+ return -1;
+ }
+ *info = addrinfo;
+ return fd;
+}
+
+static void read_from_socket(size_t bytes, const struct addrinfo *addrinfo)
+{
+ int fd, done, r;
+ char buf[100];
+
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ exit(1);
+ if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+ exit(2);
+
+ for (done = 0; done < bytes; done += r) {
+ r = read(fd, buf, sizeof(buf));
+ if (r < 0)
+ exit(3);
+ done += r;
+ }
+ close(fd);
+}
+
+int main(void)
+{
+ struct data *d = malloc(sizeof(*d));
+ struct addrinfo *addrinfo;
+ struct io_listener *l;
+ int fd, status;
+
+ /* This is how many tests you plan to run */
+ plan_tests(9);
+ d->state = 0;
+ d->bytes = 1024*1024;
+ d->buf = malloc(d->bytes);
+ memset(d->buf, 'a', d->bytes);
+ fd = make_listen_fd(PORT, &addrinfo);
+ ok1(fd >= 0);
+ l = io_new_listener(fd, init_conn, d);
+ ok1(l);
+ fflush(stdout);
+ if (!fork()) {
+ io_close_listener(l);
+ read_from_socket(d->bytes, addrinfo);
+ freeaddrinfo(addrinfo);
+ free(d->buf);
+ free(d);
+ exit(0);
+ }
+ ok1(io_loop() == d);
+ ok1(d->state == 2);
+
+ ok1(wait(&status));
+ ok1(WIFEXITED(status));
+ ok1(WEXITSTATUS(status) == 0);
+
+ freeaddrinfo(addrinfo);
+ free(d->buf);
+ free(d);
+ io_close_listener(l);
+
+ /* This exits depending on whether all tests passed */
+ return exit_status();
+}