struct fd {
int fd;
bool listener;
+ /* We could put these in io_plan, but they pack nicely here */
+ bool exclusive[2];
size_t backend_info;
};
bool backend_new_always(struct io_plan *plan);
void backend_new_plan(struct io_conn *conn);
void backend_plan_done(struct io_conn *conn);
+bool backend_set_exclusive(struct io_plan *plan, bool exclusive);
void backend_wake(const void *wait);
return conn;
}
+bool io_conn_exclusive(struct io_conn *conn, bool exclusive)
+{
+ return backend_set_exclusive(&conn->plan[IO_IN], exclusive);
+}
+
+bool io_conn_out_exclusive(struct io_conn *conn, bool exclusive)
+{
+ return backend_set_exclusive(&conn->plan[IO_OUT], exclusive);
+}
+
void io_set_finish_(struct io_conn *conn,
void (*finish)(struct io_conn *, void *),
void *arg)
*/
bool io_flush_sync(struct io_conn *conn);
+/**
+ * io_conn_exclusive - set/unset an io_conn to exclusively serviced
+ * @conn: the connection
+ * @exclusive: whether to be exclusive or not
+ *
+ * If any io_conn is set exclusive, then no non-exclusive io_conn (or
+ * io_listener) will be serviced by io_loop(). If it's a io_duplex io_conn(),
+ * then io_conn_exclusive() makes the read-side exclusive; io_conn_out_exclusive()
+ * makes the write-side exclusive.
+ *
+ * This allows you to temporarily service only one (or several) fds.
+ * For example, you might want to flush out one io_conn and not
+ * receive any new connections or read any otherninput.
+ *
+ * Returns true of there any exclusive io_conn remain, otherwise false.
+ * (This is useful for checking your own logic: dangling exclusive io_conn
+ * are dangerous!).
+ */
+bool io_conn_exclusive(struct io_conn *conn, bool exclusive);
+
+/**
+ * io_conn_out_exclusive - set/unset exclusive on the write-side of a duplex
+ * @conn: the connection, post io_duplex
+ * @exclusive: whether to be exclusive or not
+ *
+ * See io_conn_exclusive() above.
+ */
+bool io_conn_out_exclusive(struct io_conn *conn, bool exclusive);
+
/**
* io_fd_block - helper to set an fd blocking/nonblocking.
* @fd: the file descriptor
#include <ccan/time/time.h>
#include <ccan/timer/timer.h>
-static size_t num_fds = 0, max_fds = 0, num_waiting = 0, num_always = 0, max_always = 0;
+static size_t num_fds = 0, max_fds = 0, num_waiting = 0, num_always = 0, max_always = 0, num_exclusive = 0;
static struct pollfd *pollfds = NULL;
static struct fd **fds = NULL;
static struct io_plan **always = NULL;
pollfds[num_fds].revents = 0; /* In case we're iterating now */
fds[num_fds] = fd;
fd->backend_info = num_fds;
+ fd->exclusive[0] = fd->exclusive[1] = false;
num_fds++;
if (events)
num_waiting++;
}
num_fds--;
fd->backend_info = -1;
+
+ if (fd->exclusive[IO_IN])
+ num_exclusive--;
+ if (fd->exclusive[IO_OUT])
+ num_exclusive--;
}
static void destroy_listener(struct io_listener *l)
return true;
}
-void backend_new_plan(struct io_conn *conn)
+static void setup_pfd(struct io_conn *conn, struct pollfd *pfd)
{
- struct pollfd *pfd = &pollfds[conn->fd.backend_info];
-
- if (pfd->events)
- num_waiting--;
+ assert(pfd == &pollfds[conn->fd.backend_info]);
pfd->events = 0;
if (conn->plan[IO_IN].status == IO_POLLING_NOTSTARTED
pfd->events |= POLLOUT;
if (pfd->events) {
- num_waiting++;
pfd->fd = conn->fd.fd;
} else {
pfd->fd = -conn->fd.fd - 1;
}
}
+void backend_new_plan(struct io_conn *conn)
+{
+ struct pollfd *pfd = &pollfds[conn->fd.backend_info];
+
+ if (pfd->events)
+ num_waiting--;
+
+ setup_pfd(conn, pfd);
+
+ if (pfd->events)
+ num_waiting++;
+}
+
void backend_wake(const void *wait)
{
unsigned int i;
io_new_conn(l->ctx, fd, l->init, l->arg);
}
+/* Return pointer to exclusive flag for this plan. */
+static bool *exclusive(struct io_plan *plan)
+{
+ struct io_conn *conn;
+
+ conn = container_of(plan, struct io_conn, plan[plan->dir]);
+ return &conn->fd.exclusive[plan->dir];
+}
+
+/* For simplicity, we do one always at a time */
static bool handle_always(void)
{
- bool ret = false;
+ /* Backwards is simple easier to remove entries */
+ for (int i = num_always - 1; i >= 0; i--) {
+ struct io_plan *plan = always[i];
- while (num_always > 0) {
+ if (num_exclusive && !*exclusive(plan))
+ continue;
/* Remove first: it might re-add */
- struct io_plan *plan = always[num_always-1];
+ if (i != num_always-1)
+ always[i] = always[num_always-1];
num_always--;
io_do_always(plan);
- ret = true;
+ return true;
+ }
+
+ return false;
+}
+
+bool backend_set_exclusive(struct io_plan *plan, bool excl)
+{
+ bool *excl_ptr = exclusive(plan);
+
+ if (excl != *excl_ptr) {
+ *excl_ptr = excl;
+ if (!excl)
+ num_exclusive--;
+ else
+ num_exclusive++;
+ }
+
+ return num_exclusive != 0;
+}
+
+/* FIXME: We could do this once at set_exclusive time, and catch everywhere
+ * else that we manipulate events. */
+static void exclude_pollfds(void)
+{
+ if (num_exclusive == 0)
+ return;
+
+ for (size_t i = 0; i < num_fds; i++) {
+ struct pollfd *pfd = &pollfds[fds[i]->backend_info];
+
+ if (!fds[i]->exclusive[IO_IN])
+ pfd->events &= ~POLLIN;
+ if (!fds[i]->exclusive[IO_OUT])
+ pfd->events &= ~POLLOUT;
+
+ /* If we're not listening, we don't want error events
+ * either. */
+ if (!pfd->events)
+ pfd->fd = -fds[i]->fd - 1;
+ }
+}
+
+static void restore_pollfds(void)
+{
+ if (num_exclusive == 0)
+ return;
+
+ for (size_t i = 0; i < num_fds; i++) {
+ struct pollfd *pfd = &pollfds[fds[i]->backend_info];
+
+ if (fds[i]->listener) {
+ pfd->events = POLLIN;
+ pfd->fd = fds[i]->fd;
+ } else {
+ struct io_conn *conn = (void *)fds[i];
+ setup_pfd(conn, pfd);
+ }
}
- return ret;
}
/* This is the main loop. */
}
}
+ /* We do this temporarily, assuming exclusive is unusual */
+ exclude_pollfds();
r = pollfn(pollfds, num_fds, ms_timeout);
+ restore_pollfds();
+
if (r < 0) {
/* Signals shouldn't break us, unless they set
* io_loop_return. */
struct io_conn *c = (void *)fds[i];
int events = pollfds[i].revents;
+ /* Clear so we don't get confused if exclusive next time */
+ pollfds[i].revents = 0;
+
if (r == 0)
break;
#include <sys/wait.h>
#include <stdio.h>
-#define PORT "65020"
+#define PORT "65041"
/* Should be looking to read from one fd. */
static int mypoll(struct pollfd *fds, nfds_t nfds, int timeout)
--- /dev/null
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#define PORT "65046"
+
+struct data {
+ struct io_listener *l;
+ int num_clients;
+ char *pattern;
+ char buf[30];
+ size_t buflen;
+};
+
+static struct io_plan *read_more(struct io_conn *conn, struct data *d);
+
+static struct io_plan *read_done(struct io_conn *conn, struct data *d)
+{
+ tal_resize(&d->pattern, tal_count(d->pattern) + strlen(d->buf));
+ strcat(d->pattern, d->buf);
+ return read_more(conn, d);
+}
+
+static struct io_plan *read_more(struct io_conn *conn, struct data *d)
+{
+ memset(d->buf, 0, sizeof(d->buf));
+ return io_read_partial(conn, d->buf, sizeof(d->buf), &d->buflen,
+ read_done, d);
+}
+
+
+static struct io_plan *init_conn(struct io_conn *conn, struct data *d)
+{
+ d->num_clients++;
+ if (d->num_clients == 2) {
+ /* Free listener so when conns close we exit io_loop */
+ io_close_listener(d->l);
+ /* Set priority to second connection. */
+ ok1(io_conn_exclusive(conn, true) == true);
+ }
+ return read_more(conn, d);
+}
+
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+ int fd, on = 1;
+ struct addrinfo *addrinfo, hints;
+
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_PASSIVE;
+ hints.ai_protocol = 0;
+
+ if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+ return -1;
+
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ return -1;
+
+ setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+ if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+ close(fd);
+ return -1;
+ }
+ if (listen(fd, 1) != 0) {
+ close(fd);
+ return -1;
+ }
+ *info = addrinfo;
+ return fd;
+}
+
+int main(void)
+{
+ struct addrinfo *addrinfo = NULL;
+ int fd, status;
+ struct data d;
+
+ d.num_clients = 0;
+
+ /* This is how many tests you plan to run */
+ plan_tests(8);
+ fd = make_listen_fd(PORT, &addrinfo);
+ ok1(fd >= 0);
+ d.l = io_new_listener(NULL, fd, init_conn, &d);
+ ok1(d.l);
+ fflush(stdout);
+
+ if (!fork()) {
+ int fd1, fd2;
+
+ io_close_listener(d.l);
+ fd1 = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd1 < 0)
+ exit(1);
+ if (connect(fd1, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+ exit(2);
+ if (write(fd1, "1hellothere", strlen("1hellothere")) != strlen("1hellothere"))
+ exit(3);
+ fd2 = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd2 < 0)
+ exit(1);
+ if (connect(fd2, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+ exit(2);
+ signal(SIGPIPE, SIG_IGN);
+
+ sleep(1);
+ if (write(fd1, "1helloagain", strlen("1helloagain")) != strlen("1helloagain"))
+ exit(4);
+ sleep(1);
+ if (write(fd2, "2hellonew", strlen("2hellonew")) != strlen("2hellonew"))
+ exit(5);
+ close(fd1);
+ close(fd2);
+ freeaddrinfo(addrinfo);
+ exit(0);
+ }
+ freeaddrinfo(addrinfo);
+
+ d.pattern = tal_arrz(NULL, char, 1);
+ ok1(io_loop(NULL, NULL) == NULL);
+ if (!ok1(strcmp(d.pattern, "1hellothere2hellonew1helloagain") == 0))
+ printf("d.patterns = %s\n", d.pattern);
+ tal_free(d.pattern);
+
+ ok1(wait(&status));
+ ok1(WIFEXITED(status));
+ ok1(WEXITSTATUS(status) == 0);
+
+ /* This exits depending on whether all tests passed */
+ return exit_status();
+}
--- /dev/null
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#define PORT "65047"
+
+struct data {
+ struct io_listener *l;
+ char *pattern;
+ char buf[30];
+ size_t buflen;
+};
+
+static struct io_plan *read_more(struct io_conn *conn, struct data *d);
+static struct io_plan *write_more(struct io_conn *conn, struct data *d);
+
+static struct io_plan *read_done(struct io_conn *conn, struct data *d)
+{
+ tal_resize(&d->pattern, tal_count(d->pattern) + 1 + strlen(d->buf));
+ strcat(d->pattern, "<");
+ strcat(d->pattern, d->buf);
+ return read_more(conn, d);
+}
+
+static struct io_plan *read_more(struct io_conn *conn, struct data *d)
+{
+ memset(d->buf, 0, sizeof(d->buf));
+ return io_read_partial(conn, d->buf, sizeof(d->buf), &d->buflen,
+ read_done, d);
+}
+
+static struct io_plan *write_done(struct io_conn *conn, struct data *d)
+{
+ tal_resize(&d->pattern, tal_count(d->pattern) + 1);
+ strcat(d->pattern, ">");
+ return write_more(conn, d);
+}
+
+static struct io_plan *write_more(struct io_conn *conn, struct data *d)
+{
+ return io_write_partial(conn, d->buf, 1, &d->buflen,
+ write_done, d);
+}
+
+static struct io_plan *read_priority_init(struct io_conn *conn, struct data *d)
+{
+ /* This should suppress the write */
+ ok1(io_conn_exclusive(conn, true));
+ return read_more(conn, d);
+}
+
+static struct io_plan *init_conn(struct io_conn *conn, struct data *d)
+{
+ /* Free listener so when conns close we exit io_loop */
+ io_close_listener(d->l);
+
+ return io_duplex(conn, read_priority_init(conn, d), write_more(conn, d));
+}
+
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+ int fd, on = 1;
+ struct addrinfo *addrinfo, hints;
+
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_PASSIVE;
+ hints.ai_protocol = 0;
+
+ if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+ return -1;
+
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ return -1;
+
+ setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+ if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+ close(fd);
+ return -1;
+ }
+ if (listen(fd, 1) != 0) {
+ close(fd);
+ return -1;
+ }
+ *info = addrinfo;
+ return fd;
+}
+
+int main(void)
+{
+ struct addrinfo *addrinfo = NULL;
+ int fd, status;
+ struct data d;
+
+ /* This is how many tests you plan to run */
+ plan_tests(8);
+ fd = make_listen_fd(PORT, &addrinfo);
+ ok1(fd >= 0);
+ d.l = io_new_listener(NULL, fd, init_conn, &d);
+ ok1(d.l);
+ fflush(stdout);
+
+ if (!fork()) {
+ io_close_listener(d.l);
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ exit(1);
+ if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+ exit(2);
+ signal(SIGPIPE, SIG_IGN);
+
+ if (write(fd, "1hellothere", strlen("1hellothere")) != strlen("1hellothere"))
+ exit(3);
+ sleep(1);
+ if (write(fd, "1helloagain", strlen("1helloagain")) != strlen("1helloagain"))
+ exit(4);
+ close(fd);
+ freeaddrinfo(addrinfo);
+ exit(0);
+ }
+ freeaddrinfo(addrinfo);
+
+ d.pattern = tal_arrz(NULL, char, 1);
+ ok1(io_loop(NULL, NULL) == NULL);
+ /* No trace of writes */
+ ok1(strcmp(d.pattern, "<1hellothere<1helloagain") == 0);
+ tal_free(d.pattern);
+
+ ok1(wait(&status));
+ ok1(WIFEXITED(status));
+ ok1(WEXITSTATUS(status) == 0);
+
+ /* This exits depending on whether all tests passed */
+ return exit_status();
+}
--- /dev/null
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#define PORT "65048"
+
+struct data {
+ struct io_listener *l;
+ char *pattern;
+ char buf[30];
+ size_t buflen;
+};
+
+static struct io_plan *read_more(struct io_conn *conn, struct data *d);
+static struct io_plan *write_more(struct io_conn *conn, struct data *d);
+
+static struct io_plan *read_done(struct io_conn *conn, struct data *d)
+{
+ tal_resize(&d->pattern, tal_count(d->pattern) + 1 + strlen(d->buf));
+ strcat(d->pattern, "<");
+ strcat(d->pattern, d->buf);
+ return read_more(conn, d);
+}
+
+static struct io_plan *read_more(struct io_conn *conn, struct data *d)
+{
+ memset(d->buf, 0, sizeof(d->buf));
+ return io_read_partial(conn, d->buf, sizeof(d->buf), &d->buflen,
+ read_done, d);
+}
+
+static struct io_plan *write_done(struct io_conn *conn, struct data *d)
+{
+ tal_resize(&d->pattern, tal_count(d->pattern) + 1);
+ strcat(d->pattern, ">");
+ return write_more(conn, d);
+}
+
+static struct io_plan *write_more(struct io_conn *conn, struct data *d)
+{
+ return io_write_partial(conn, d->buf, 1, &d->buflen,
+ write_done, d);
+}
+
+static struct io_plan *write_priority_init(struct io_conn *conn, struct data *d)
+{
+ /* This should suppress the read */
+ ok1(io_conn_out_exclusive(conn, true));
+ return write_more(conn, d);
+}
+
+static struct io_plan *init_conn(struct io_conn *conn, struct data *d)
+{
+ /* Free listener so when conns close we exit io_loop */
+ io_close_listener(d->l);
+
+ return io_duplex(conn, read_more(conn, d), write_priority_init(conn, d));
+}
+
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+ int fd, on = 1;
+ struct addrinfo *addrinfo, hints;
+
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_PASSIVE;
+ hints.ai_protocol = 0;
+
+ if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+ return -1;
+
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ return -1;
+
+ setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+ if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+ close(fd);
+ return -1;
+ }
+ if (listen(fd, 1) != 0) {
+ close(fd);
+ return -1;
+ }
+ *info = addrinfo;
+ return fd;
+}
+
+int main(void)
+{
+ struct addrinfo *addrinfo = NULL;
+ int fd, status;
+ struct data d;
+
+ /* This is how many tests you plan to run */
+ plan_tests(8);
+ fd = make_listen_fd(PORT, &addrinfo);
+ ok1(fd >= 0);
+ d.l = io_new_listener(NULL, fd, init_conn, &d);
+ ok1(d.l);
+ fflush(stdout);
+
+ if (!fork()) {
+ io_close_listener(d.l);
+ fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ addrinfo->ai_protocol);
+ if (fd < 0)
+ exit(1);
+ if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+ exit(2);
+ signal(SIGPIPE, SIG_IGN);
+
+ if (write(fd, "1hellothere", strlen("1hellothere")) != strlen("1hellothere"))
+ exit(3);
+ sleep(1);
+ if (write(fd, "1helloagain", strlen("1helloagain")) != strlen("1helloagain"))
+ exit(4);
+ close(fd);
+ freeaddrinfo(addrinfo);
+ exit(0);
+ }
+ freeaddrinfo(addrinfo);
+
+ d.pattern = tal_arrz(NULL, char, 1);
+ ok1(io_loop(NULL, NULL) == NULL);
+ /* No trace of reads */
+ ok1(strspn(d.pattern, ">") == strlen(d.pattern));
+ tal_free(d.pattern);
+
+ ok1(wait(&status));
+ ok1(WIFEXITED(status));
+ ok1(WEXITSTATUS(status) == 0);
+
+ /* This exits depending on whether all tests passed */
+ return exit_status();
+}