From 580457bd3e4af60d5367412589d2aa1bb0289eed Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Fri, 1 Mar 2019 12:13:35 +1030 Subject: [PATCH] ccan/io: add io_conn_exclusive and io_conn_out_exclusive. There are cases where we want to suppress all activity except for a single fd; we already have ugly io_flush_sync, but this is more useful and more general. Signed-off-by: Rusty Russell --- ccan/io/backend.h | 3 + ccan/io/io.c | 10 ++ ccan/io/io.h | 29 ++++ ccan/io/poll.c | 116 +++++++++++++-- ccan/io/test/run-41-io_poll_override.c | 2 +- ccan/io/test/run-46-exclusive.c | 142 ++++++++++++++++++ ccan/io/test/run-47-exclusive-duplex.c | 144 +++++++++++++++++++ ccan/io/test/run-48-exclusive-duplex-write.c | 144 +++++++++++++++++++ 8 files changed, 577 insertions(+), 13 deletions(-) create mode 100644 ccan/io/test/run-46-exclusive.c create mode 100644 ccan/io/test/run-47-exclusive-duplex.c create mode 100644 ccan/io/test/run-48-exclusive-duplex-write.c diff --git a/ccan/io/backend.h b/ccan/io/backend.h index d877e44e..714972d1 100644 --- a/ccan/io/backend.h +++ b/ccan/io/backend.h @@ -8,6 +8,8 @@ struct fd { int fd; bool listener; + /* We could put these in io_plan, but they pack nicely here */ + bool exclusive[2]; size_t backend_info; }; @@ -76,6 +78,7 @@ void cleanup_conn_without_close(struct io_conn *c); bool backend_new_always(struct io_plan *plan); void backend_new_plan(struct io_conn *conn); void backend_plan_done(struct io_conn *conn); +bool backend_set_exclusive(struct io_plan *plan, bool exclusive); void backend_wake(const void *wait); diff --git a/ccan/io/io.c b/ccan/io/io.c index 7ebd5a20..36dcb81e 100644 --- a/ccan/io/io.c +++ b/ccan/io/io.c @@ -119,6 +119,16 @@ struct io_conn *io_new_conn_(const tal_t *ctx, int fd, return conn; } +bool io_conn_exclusive(struct io_conn *conn, bool exclusive) +{ + return backend_set_exclusive(&conn->plan[IO_IN], exclusive); +} + +bool io_conn_out_exclusive(struct io_conn *conn, bool exclusive) +{ + return backend_set_exclusive(&conn->plan[IO_OUT], exclusive); +} + void io_set_finish_(struct io_conn *conn, void (*finish)(struct io_conn *, void *), void *arg) diff --git a/ccan/io/io.h b/ccan/io/io.h index 510ee0be..a93c6ed4 100644 --- a/ccan/io/io.h +++ b/ccan/io/io.h @@ -722,6 +722,35 @@ bool io_plan_out_started(const struct io_conn *conn); */ bool io_flush_sync(struct io_conn *conn); +/** + * io_conn_exclusive - set/unset an io_conn to exclusively serviced + * @conn: the connection + * @exclusive: whether to be exclusive or not + * + * If any io_conn is set exclusive, then no non-exclusive io_conn (or + * io_listener) will be serviced by io_loop(). If it's a io_duplex io_conn(), + * then io_conn_exclusive() makes the read-side exclusive; io_conn_out_exclusive() + * makes the write-side exclusive. + * + * This allows you to temporarily service only one (or several) fds. + * For example, you might want to flush out one io_conn and not + * receive any new connections or read any otherninput. + * + * Returns true of there any exclusive io_conn remain, otherwise false. + * (This is useful for checking your own logic: dangling exclusive io_conn + * are dangerous!). + */ +bool io_conn_exclusive(struct io_conn *conn, bool exclusive); + +/** + * io_conn_out_exclusive - set/unset exclusive on the write-side of a duplex + * @conn: the connection, post io_duplex + * @exclusive: whether to be exclusive or not + * + * See io_conn_exclusive() above. + */ +bool io_conn_out_exclusive(struct io_conn *conn, bool exclusive); + /** * io_fd_block - helper to set an fd blocking/nonblocking. * @fd: the file descriptor diff --git a/ccan/io/poll.c b/ccan/io/poll.c index b44f1256..95b61032 100644 --- a/ccan/io/poll.c +++ b/ccan/io/poll.c @@ -11,7 +11,7 @@ #include #include -static size_t num_fds = 0, max_fds = 0, num_waiting = 0, num_always = 0, max_always = 0; +static size_t num_fds = 0, max_fds = 0, num_waiting = 0, num_always = 0, max_always = 0, num_exclusive = 0; static struct pollfd *pollfds = NULL; static struct fd **fds = NULL; static struct io_plan **always = NULL; @@ -64,6 +64,7 @@ static bool add_fd(struct fd *fd, short events) pollfds[num_fds].revents = 0; /* In case we're iterating now */ fds[num_fds] = fd; fd->backend_info = num_fds; + fd->exclusive[0] = fd->exclusive[1] = false; num_fds++; if (events) num_waiting++; @@ -93,6 +94,11 @@ static void del_fd(struct fd *fd) } num_fds--; fd->backend_info = -1; + + if (fd->exclusive[IO_IN]) + num_exclusive--; + if (fd->exclusive[IO_OUT]) + num_exclusive--; } static void destroy_listener(struct io_listener *l) @@ -157,12 +163,9 @@ bool backend_new_always(struct io_plan *plan) return true; } -void backend_new_plan(struct io_conn *conn) +static void setup_pfd(struct io_conn *conn, struct pollfd *pfd) { - struct pollfd *pfd = &pollfds[conn->fd.backend_info]; - - if (pfd->events) - num_waiting--; + assert(pfd == &pollfds[conn->fd.backend_info]); pfd->events = 0; if (conn->plan[IO_IN].status == IO_POLLING_NOTSTARTED @@ -173,13 +176,25 @@ void backend_new_plan(struct io_conn *conn) pfd->events |= POLLOUT; if (pfd->events) { - num_waiting++; pfd->fd = conn->fd.fd; } else { pfd->fd = -conn->fd.fd - 1; } } +void backend_new_plan(struct io_conn *conn) +{ + struct pollfd *pfd = &pollfds[conn->fd.backend_info]; + + if (pfd->events) + num_waiting--; + + setup_pfd(conn, pfd); + + if (pfd->events) + num_waiting++; +} + void backend_wake(const void *wait) { unsigned int i; @@ -250,18 +265,88 @@ static void accept_conn(struct io_listener *l) io_new_conn(l->ctx, fd, l->init, l->arg); } +/* Return pointer to exclusive flag for this plan. */ +static bool *exclusive(struct io_plan *plan) +{ + struct io_conn *conn; + + conn = container_of(plan, struct io_conn, plan[plan->dir]); + return &conn->fd.exclusive[plan->dir]; +} + +/* For simplicity, we do one always at a time */ static bool handle_always(void) { - bool ret = false; + /* Backwards is simple easier to remove entries */ + for (int i = num_always - 1; i >= 0; i--) { + struct io_plan *plan = always[i]; - while (num_always > 0) { + if (num_exclusive && !*exclusive(plan)) + continue; /* Remove first: it might re-add */ - struct io_plan *plan = always[num_always-1]; + if (i != num_always-1) + always[i] = always[num_always-1]; num_always--; io_do_always(plan); - ret = true; + return true; + } + + return false; +} + +bool backend_set_exclusive(struct io_plan *plan, bool excl) +{ + bool *excl_ptr = exclusive(plan); + + if (excl != *excl_ptr) { + *excl_ptr = excl; + if (!excl) + num_exclusive--; + else + num_exclusive++; + } + + return num_exclusive != 0; +} + +/* FIXME: We could do this once at set_exclusive time, and catch everywhere + * else that we manipulate events. */ +static void exclude_pollfds(void) +{ + if (num_exclusive == 0) + return; + + for (size_t i = 0; i < num_fds; i++) { + struct pollfd *pfd = &pollfds[fds[i]->backend_info]; + + if (!fds[i]->exclusive[IO_IN]) + pfd->events &= ~POLLIN; + if (!fds[i]->exclusive[IO_OUT]) + pfd->events &= ~POLLOUT; + + /* If we're not listening, we don't want error events + * either. */ + if (!pfd->events) + pfd->fd = -fds[i]->fd - 1; + } +} + +static void restore_pollfds(void) +{ + if (num_exclusive == 0) + return; + + for (size_t i = 0; i < num_fds; i++) { + struct pollfd *pfd = &pollfds[fds[i]->backend_info]; + + if (fds[i]->listener) { + pfd->events = POLLIN; + pfd->fd = fds[i]->fd; + } else { + struct io_conn *conn = (void *)fds[i]; + setup_pfd(conn, pfd); + } } - return ret; } /* This is the main loop. */ @@ -312,7 +397,11 @@ void *io_loop(struct timers *timers, struct timer **expired) } } + /* We do this temporarily, assuming exclusive is unusual */ + exclude_pollfds(); r = pollfn(pollfds, num_fds, ms_timeout); + restore_pollfds(); + if (r < 0) { /* Signals shouldn't break us, unless they set * io_loop_return. */ @@ -325,6 +414,9 @@ void *io_loop(struct timers *timers, struct timer **expired) struct io_conn *c = (void *)fds[i]; int events = pollfds[i].revents; + /* Clear so we don't get confused if exclusive next time */ + pollfds[i].revents = 0; + if (r == 0) break; diff --git a/ccan/io/test/run-41-io_poll_override.c b/ccan/io/test/run-41-io_poll_override.c index 0a62e2d3..146faeb5 100644 --- a/ccan/io/test/run-41-io_poll_override.c +++ b/ccan/io/test/run-41-io_poll_override.c @@ -6,7 +6,7 @@ #include #include -#define PORT "65020" +#define PORT "65041" /* Should be looking to read from one fd. */ static int mypoll(struct pollfd *fds, nfds_t nfds, int timeout) diff --git a/ccan/io/test/run-46-exclusive.c b/ccan/io/test/run-46-exclusive.c new file mode 100644 index 00000000..6a9de017 --- /dev/null +++ b/ccan/io/test/run-46-exclusive.c @@ -0,0 +1,142 @@ +#include +/* Include the C files directly. */ +#include +#include +#include +#include +#include + +#define PORT "65046" + +struct data { + struct io_listener *l; + int num_clients; + char *pattern; + char buf[30]; + size_t buflen; +}; + +static struct io_plan *read_more(struct io_conn *conn, struct data *d); + +static struct io_plan *read_done(struct io_conn *conn, struct data *d) +{ + tal_resize(&d->pattern, tal_count(d->pattern) + strlen(d->buf)); + strcat(d->pattern, d->buf); + return read_more(conn, d); +} + +static struct io_plan *read_more(struct io_conn *conn, struct data *d) +{ + memset(d->buf, 0, sizeof(d->buf)); + return io_read_partial(conn, d->buf, sizeof(d->buf), &d->buflen, + read_done, d); +} + + +static struct io_plan *init_conn(struct io_conn *conn, struct data *d) +{ + d->num_clients++; + if (d->num_clients == 2) { + /* Free listener so when conns close we exit io_loop */ + io_close_listener(d->l); + /* Set priority to second connection. */ + ok1(io_conn_exclusive(conn, true) == true); + } + return read_more(conn, d); +} + + +static int make_listen_fd(const char *port, struct addrinfo **info) +{ + int fd, on = 1; + struct addrinfo *addrinfo, hints; + + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_PASSIVE; + hints.ai_protocol = 0; + + if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0) + return -1; + + fd = socket(addrinfo->ai_family, addrinfo->ai_socktype, + addrinfo->ai_protocol); + if (fd < 0) + return -1; + + setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); + if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) { + close(fd); + return -1; + } + if (listen(fd, 1) != 0) { + close(fd); + return -1; + } + *info = addrinfo; + return fd; +} + +int main(void) +{ + struct addrinfo *addrinfo = NULL; + int fd, status; + struct data d; + + d.num_clients = 0; + + /* This is how many tests you plan to run */ + plan_tests(8); + fd = make_listen_fd(PORT, &addrinfo); + ok1(fd >= 0); + d.l = io_new_listener(NULL, fd, init_conn, &d); + ok1(d.l); + fflush(stdout); + + if (!fork()) { + int fd1, fd2; + + io_close_listener(d.l); + fd1 = socket(addrinfo->ai_family, addrinfo->ai_socktype, + addrinfo->ai_protocol); + if (fd1 < 0) + exit(1); + if (connect(fd1, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) + exit(2); + if (write(fd1, "1hellothere", strlen("1hellothere")) != strlen("1hellothere")) + exit(3); + fd2 = socket(addrinfo->ai_family, addrinfo->ai_socktype, + addrinfo->ai_protocol); + if (fd2 < 0) + exit(1); + if (connect(fd2, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) + exit(2); + signal(SIGPIPE, SIG_IGN); + + sleep(1); + if (write(fd1, "1helloagain", strlen("1helloagain")) != strlen("1helloagain")) + exit(4); + sleep(1); + if (write(fd2, "2hellonew", strlen("2hellonew")) != strlen("2hellonew")) + exit(5); + close(fd1); + close(fd2); + freeaddrinfo(addrinfo); + exit(0); + } + freeaddrinfo(addrinfo); + + d.pattern = tal_arrz(NULL, char, 1); + ok1(io_loop(NULL, NULL) == NULL); + if (!ok1(strcmp(d.pattern, "1hellothere2hellonew1helloagain") == 0)) + printf("d.patterns = %s\n", d.pattern); + tal_free(d.pattern); + + ok1(wait(&status)); + ok1(WIFEXITED(status)); + ok1(WEXITSTATUS(status) == 0); + + /* This exits depending on whether all tests passed */ + return exit_status(); +} diff --git a/ccan/io/test/run-47-exclusive-duplex.c b/ccan/io/test/run-47-exclusive-duplex.c new file mode 100644 index 00000000..60e6de41 --- /dev/null +++ b/ccan/io/test/run-47-exclusive-duplex.c @@ -0,0 +1,144 @@ +#include +/* Include the C files directly. */ +#include +#include +#include +#include +#include + +#define PORT "65047" + +struct data { + struct io_listener *l; + char *pattern; + char buf[30]; + size_t buflen; +}; + +static struct io_plan *read_more(struct io_conn *conn, struct data *d); +static struct io_plan *write_more(struct io_conn *conn, struct data *d); + +static struct io_plan *read_done(struct io_conn *conn, struct data *d) +{ + tal_resize(&d->pattern, tal_count(d->pattern) + 1 + strlen(d->buf)); + strcat(d->pattern, "<"); + strcat(d->pattern, d->buf); + return read_more(conn, d); +} + +static struct io_plan *read_more(struct io_conn *conn, struct data *d) +{ + memset(d->buf, 0, sizeof(d->buf)); + return io_read_partial(conn, d->buf, sizeof(d->buf), &d->buflen, + read_done, d); +} + +static struct io_plan *write_done(struct io_conn *conn, struct data *d) +{ + tal_resize(&d->pattern, tal_count(d->pattern) + 1); + strcat(d->pattern, ">"); + return write_more(conn, d); +} + +static struct io_plan *write_more(struct io_conn *conn, struct data *d) +{ + return io_write_partial(conn, d->buf, 1, &d->buflen, + write_done, d); +} + +static struct io_plan *read_priority_init(struct io_conn *conn, struct data *d) +{ + /* This should suppress the write */ + ok1(io_conn_exclusive(conn, true)); + return read_more(conn, d); +} + +static struct io_plan *init_conn(struct io_conn *conn, struct data *d) +{ + /* Free listener so when conns close we exit io_loop */ + io_close_listener(d->l); + + return io_duplex(conn, read_priority_init(conn, d), write_more(conn, d)); +} + + +static int make_listen_fd(const char *port, struct addrinfo **info) +{ + int fd, on = 1; + struct addrinfo *addrinfo, hints; + + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_PASSIVE; + hints.ai_protocol = 0; + + if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0) + return -1; + + fd = socket(addrinfo->ai_family, addrinfo->ai_socktype, + addrinfo->ai_protocol); + if (fd < 0) + return -1; + + setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); + if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) { + close(fd); + return -1; + } + if (listen(fd, 1) != 0) { + close(fd); + return -1; + } + *info = addrinfo; + return fd; +} + +int main(void) +{ + struct addrinfo *addrinfo = NULL; + int fd, status; + struct data d; + + /* This is how many tests you plan to run */ + plan_tests(8); + fd = make_listen_fd(PORT, &addrinfo); + ok1(fd >= 0); + d.l = io_new_listener(NULL, fd, init_conn, &d); + ok1(d.l); + fflush(stdout); + + if (!fork()) { + io_close_listener(d.l); + fd = socket(addrinfo->ai_family, addrinfo->ai_socktype, + addrinfo->ai_protocol); + if (fd < 0) + exit(1); + if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) + exit(2); + signal(SIGPIPE, SIG_IGN); + + if (write(fd, "1hellothere", strlen("1hellothere")) != strlen("1hellothere")) + exit(3); + sleep(1); + if (write(fd, "1helloagain", strlen("1helloagain")) != strlen("1helloagain")) + exit(4); + close(fd); + freeaddrinfo(addrinfo); + exit(0); + } + freeaddrinfo(addrinfo); + + d.pattern = tal_arrz(NULL, char, 1); + ok1(io_loop(NULL, NULL) == NULL); + /* No trace of writes */ + ok1(strcmp(d.pattern, "<1hellothere<1helloagain") == 0); + tal_free(d.pattern); + + ok1(wait(&status)); + ok1(WIFEXITED(status)); + ok1(WEXITSTATUS(status) == 0); + + /* This exits depending on whether all tests passed */ + return exit_status(); +} diff --git a/ccan/io/test/run-48-exclusive-duplex-write.c b/ccan/io/test/run-48-exclusive-duplex-write.c new file mode 100644 index 00000000..897f83f7 --- /dev/null +++ b/ccan/io/test/run-48-exclusive-duplex-write.c @@ -0,0 +1,144 @@ +#include +/* Include the C files directly. */ +#include +#include +#include +#include +#include + +#define PORT "65048" + +struct data { + struct io_listener *l; + char *pattern; + char buf[30]; + size_t buflen; +}; + +static struct io_plan *read_more(struct io_conn *conn, struct data *d); +static struct io_plan *write_more(struct io_conn *conn, struct data *d); + +static struct io_plan *read_done(struct io_conn *conn, struct data *d) +{ + tal_resize(&d->pattern, tal_count(d->pattern) + 1 + strlen(d->buf)); + strcat(d->pattern, "<"); + strcat(d->pattern, d->buf); + return read_more(conn, d); +} + +static struct io_plan *read_more(struct io_conn *conn, struct data *d) +{ + memset(d->buf, 0, sizeof(d->buf)); + return io_read_partial(conn, d->buf, sizeof(d->buf), &d->buflen, + read_done, d); +} + +static struct io_plan *write_done(struct io_conn *conn, struct data *d) +{ + tal_resize(&d->pattern, tal_count(d->pattern) + 1); + strcat(d->pattern, ">"); + return write_more(conn, d); +} + +static struct io_plan *write_more(struct io_conn *conn, struct data *d) +{ + return io_write_partial(conn, d->buf, 1, &d->buflen, + write_done, d); +} + +static struct io_plan *write_priority_init(struct io_conn *conn, struct data *d) +{ + /* This should suppress the read */ + ok1(io_conn_out_exclusive(conn, true)); + return write_more(conn, d); +} + +static struct io_plan *init_conn(struct io_conn *conn, struct data *d) +{ + /* Free listener so when conns close we exit io_loop */ + io_close_listener(d->l); + + return io_duplex(conn, read_more(conn, d), write_priority_init(conn, d)); +} + + +static int make_listen_fd(const char *port, struct addrinfo **info) +{ + int fd, on = 1; + struct addrinfo *addrinfo, hints; + + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_PASSIVE; + hints.ai_protocol = 0; + + if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0) + return -1; + + fd = socket(addrinfo->ai_family, addrinfo->ai_socktype, + addrinfo->ai_protocol); + if (fd < 0) + return -1; + + setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); + if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) { + close(fd); + return -1; + } + if (listen(fd, 1) != 0) { + close(fd); + return -1; + } + *info = addrinfo; + return fd; +} + +int main(void) +{ + struct addrinfo *addrinfo = NULL; + int fd, status; + struct data d; + + /* This is how many tests you plan to run */ + plan_tests(8); + fd = make_listen_fd(PORT, &addrinfo); + ok1(fd >= 0); + d.l = io_new_listener(NULL, fd, init_conn, &d); + ok1(d.l); + fflush(stdout); + + if (!fork()) { + io_close_listener(d.l); + fd = socket(addrinfo->ai_family, addrinfo->ai_socktype, + addrinfo->ai_protocol); + if (fd < 0) + exit(1); + if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) + exit(2); + signal(SIGPIPE, SIG_IGN); + + if (write(fd, "1hellothere", strlen("1hellothere")) != strlen("1hellothere")) + exit(3); + sleep(1); + if (write(fd, "1helloagain", strlen("1helloagain")) != strlen("1helloagain")) + exit(4); + close(fd); + freeaddrinfo(addrinfo); + exit(0); + } + freeaddrinfo(addrinfo); + + d.pattern = tal_arrz(NULL, char, 1); + ok1(io_loop(NULL, NULL) == NULL); + /* No trace of reads */ + ok1(strspn(d.pattern, ">") == strlen(d.pattern)); + tal_free(d.pattern); + + ok1(wait(&status)); + ok1(WIFEXITED(status)); + ok1(WEXITSTATUS(status) == 0); + + /* This exits depending on whether all tests passed */ + return exit_status(); +} -- 2.39.2