From: Rusty Russell Date: Mon, 4 Aug 2014 08:13:21 +0000 (+0930) Subject: ccan/io: implement timeouts. X-Git-Url: http://git.ozlabs.org/?p=ccan;a=commitdiff_plain;h=94dd4c2bddd0dc080ad5b85465fa3f45f486967a ccan/io: implement timeouts. We do this by the simplest method: return from io_loop() and let the caller sort them out. Signed-off-by: Rusty Russell --- diff --git a/ccan/io/_info b/ccan/io/_info index 7fc6ac61..2e0018ad 100644 --- a/ccan/io/_info +++ b/ccan/io/_info @@ -118,7 +118,7 @@ * io_set_finish(reader, finish, &from); * io_new_conn(NULL, STDOUT_FILENO, write_out, &from); * - * io_loop(); + * io_loop(NULL, NULL); * wait(&status); * * return WIFEXITED(status) ? WEXITSTATUS(status) : 2; @@ -133,7 +133,10 @@ int main(int argc, char *argv[]) return 1; if (strcmp(argv[1], "depends") == 0) { + printf("ccan/list\n"); printf("ccan/tal\n"); + printf("ccan/time\n"); + printf("ccan/timer\n"); printf("ccan/typesafe_cb\n"); return 0; } diff --git a/ccan/io/io.h b/ccan/io/io.h index d4ba2d87..165ff6f6 100644 --- a/ccan/io/io.h +++ b/ccan/io/io.h @@ -6,6 +6,9 @@ #include #include +struct timers; +struct list_head; + /** * struct io_plan - a plan for input or output. * @@ -169,7 +172,7 @@ struct io_listener *io_new_listener_(const tal_t *ctx, int fd, * ... * struct io_listener *l = do_listen("8111"); * if (l) { - * io_loop(); + * io_loop(NULL, NULL); * io_close_listener(l); * } */ @@ -553,14 +556,17 @@ struct io_plan *io_close_cb(struct io_conn *, void *unused); /** * io_loop - process fds until all closed on io_break. + * @timers - timers which are waiting to go off (or NULL for none) + * @expired - a list filled with expired timers (can be NULL if @timers is) * * This is the core loop; it exits with the io_break() arg, or NULL if - * all connections and listeners are closed. + * all connections and listeners are closed, or with @expired set to a + * list of expired timers (if @timers isn't NULL). * * Example: - * io_loop(); + * io_loop(NULL, NULL); */ -void *io_loop(void); +void *io_loop(struct timers *timers, struct list_head *expired); /** * io_conn_fd - get the fd from a connection. diff --git a/ccan/io/poll.c b/ccan/io/poll.c index c1a62452..e4058766 100644 --- a/ccan/io/poll.c +++ b/ccan/io/poll.c @@ -8,6 +8,9 @@ #include #include #include +#include +#include +#include static size_t num_fds = 0, max_fds = 0, num_waiting = 0; static struct pollfd *pollfds = NULL; @@ -223,12 +226,19 @@ static bool handle_always(void) } /* This is the main loop. */ -void *io_loop(void) +void *io_loop(struct timers *timers, struct list_head *expired) { void *ret; + /* if timers is NULL, expired must be. If not, not. */ + assert(!timers == !expired); + + /* Make sure this is empty if we exit for some other reason. */ + if (expired) + list_head_init(expired); + while (!io_loop_return) { - int i, r; + int i, r, ms_timeout = -1; if (close_conns()) { /* Could have started/finished more. */ @@ -247,7 +257,28 @@ void *io_loop(void) /* You can't tell them all to go to sleep! */ assert(num_waiting); - r = poll(pollfds, num_fds, -1); + if (timers) { + struct timeabs now, first; + + now = time_now(); + + /* Call functions for expired timers. */ + timers_expire(timers, now, expired); + if (!list_empty(expired)) + break; + + /* Now figure out how long to wait for the next one. */ + if (timer_earliest(timers, &first)) { + uint64_t next; + next = time_to_msec(time_between(first, now)); + if (next < INT_MAX) + ms_timeout = next; + else + ms_timeout = INT_MAX; + } + } + + r = poll(pollfds, num_fds, ms_timeout); if (r < 0) break; diff --git a/ccan/io/test/run-01-start-finish.c b/ccan/io/test/run-01-start-finish.c index b1ce78d3..eb2c90a6 100644 --- a/ccan/io/test/run-01-start-finish.c +++ b/ccan/io/test/run-01-start-finish.c @@ -88,7 +88,7 @@ int main(void) exit(0); } freeaddrinfo(addrinfo); - ok1(io_loop() == &state + 1); + ok1(io_loop(NULL, NULL) == &state + 1); ok1(state == 2); io_close_listener(l); ok1(wait(&state)); diff --git a/ccan/io/test/run-02-read.c b/ccan/io/test/run-02-read.c index 4a043359..b43bb8bf 100644 --- a/ccan/io/test/run-02-read.c +++ b/ccan/io/test/run-02-read.c @@ -99,7 +99,7 @@ int main(void) exit(0); } freeaddrinfo(addrinfo); - ok1(io_loop() == d); + ok1(io_loop(NULL, NULL) == d); ok1(d->state == 2); ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0); free(d); diff --git a/ccan/io/test/run-03-readpartial.c b/ccan/io/test/run-03-readpartial.c index 7b360638..2d15c2f5 100644 --- a/ccan/io/test/run-03-readpartial.c +++ b/ccan/io/test/run-03-readpartial.c @@ -106,7 +106,7 @@ int main(void) free(d); exit(0); } - ok1(io_loop() == d); + ok1(io_loop(NULL, NULL) == d); ok1(d->state == 2); ok1(d->bytes > 0); ok1(d->bytes <= sizeof(d->buf)); @@ -125,7 +125,7 @@ int main(void) exit(0); } d->state = 0; - ok1(io_loop() == d); + ok1(io_loop(NULL, NULL) == d); ok1(d->state == 2); ok1(d->bytes > 0); ok1(d->bytes <= strlen("hi")); diff --git a/ccan/io/test/run-04-writepartial.c b/ccan/io/test/run-04-writepartial.c index 4ca21a32..011bbc89 100644 --- a/ccan/io/test/run-04-writepartial.c +++ b/ccan/io/test/run-04-writepartial.c @@ -109,7 +109,7 @@ int main(void) free(d); exit(0); } - ok1(io_loop() == d); + ok1(io_loop(NULL, NULL) == d); ok1(d->state == 2); ok1(d->bytes > 0); ok1(d->bytes <= 1024*1024); diff --git a/ccan/io/test/run-05-write.c b/ccan/io/test/run-05-write.c index 2e744cff..b17ef83a 100644 --- a/ccan/io/test/run-05-write.c +++ b/ccan/io/test/run-05-write.c @@ -110,7 +110,7 @@ int main(void) free(d); exit(0); } - ok1(io_loop() == d); + ok1(io_loop(NULL, NULL) == d); ok1(d->state == 2); ok1(wait(&status)); diff --git a/ccan/io/test/run-06-idle.c b/ccan/io/test/run-06-idle.c index d01cb31f..8f3d5410 100644 --- a/ccan/io/test/run-06-idle.c +++ b/ccan/io/test/run-06-idle.c @@ -143,7 +143,7 @@ int main(void) } freeaddrinfo(addrinfo); - ok1(io_loop() == d); + ok1(io_loop(NULL, NULL) == d); ok1(d->state == 4); ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0); free(d); diff --git a/ccan/io/test/run-07-break.c b/ccan/io/test/run-07-break.c index 1f69e9f5..dd192c4d 100644 --- a/ccan/io/test/run-07-break.c +++ b/ccan/io/test/run-07-break.c @@ -107,11 +107,11 @@ int main(void) exit(0); } freeaddrinfo(addrinfo); - ok1(io_loop() == d); + ok1(io_loop(NULL, NULL) == d); ok1(d->state == 1); io_close_listener(l); - ok1(io_loop() == NULL); + ok1(io_loop(NULL, NULL) == NULL); ok1(d->state == 3); ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0); free(d); diff --git a/ccan/io/test/run-08-hangup-on-idle.c b/ccan/io/test/run-08-hangup-on-idle.c index 52309b4b..eb3dab3d 100644 --- a/ccan/io/test/run-08-hangup-on-idle.c +++ b/ccan/io/test/run-08-hangup-on-idle.c @@ -52,7 +52,7 @@ int main(void) exit(0); } - ok1(io_loop() == NULL); + ok1(io_loop(NULL, NULL) == NULL); ok1(memcmp(buf, "hello there world", 16) == 0); /* This exits depending on whether all tests passed */ diff --git a/ccan/io/test/run-08-read-after-hangup.c b/ccan/io/test/run-08-read-after-hangup.c index 7b6731ec..14af5ae6 100644 --- a/ccan/io/test/run-08-read-after-hangup.c +++ b/ccan/io/test/run-08-read-after-hangup.c @@ -41,7 +41,7 @@ int main(void) conn = io_new_conn(NULL, fds[0], init_waiter, NULL); io_new_conn(conn, fds[1], init_writer, conn); - ok1(io_loop() == NULL); + ok1(io_loop(NULL, NULL) == NULL); ok1(memcmp(inbuf, "EASYTEST", sizeof(inbuf)) == 0); /* This exits depending on whether all tests passed */ diff --git a/ccan/io/test/run-09-connect.c b/ccan/io/test/run-09-connect.c index 8f57f00d..09b33387 100644 --- a/ccan/io/test/run-09-connect.c +++ b/ccan/io/test/run-09-connect.c @@ -99,7 +99,7 @@ int main(void) addrinfo->ai_protocol); ok1(io_new_conn(NULL, fd, setup_connect, addrinfo)); - ok1(io_loop() == NULL); + ok1(io_loop(NULL, NULL) == NULL); ok1(d->state == 2); ok1(d2->state == 2); diff --git a/ccan/io/test/run-10-many.c b/ccan/io/test/run-10-many.c index fc48ecb5..3339a335 100644 --- a/ccan/io/test/run-10-many.c +++ b/ccan/io/test/run-10-many.c @@ -101,7 +101,7 @@ int main(void) ok1(buf[i].writer); /* They should eventually exit */ - ok1(io_loop() == NULL); + ok1(io_loop(NULL, NULL) == NULL); for (i = 0; i < NUM; i++) { char b[sizeof(buf[0].buf)]; diff --git a/ccan/io/test/run-12-bidir.c b/ccan/io/test/run-12-bidir.c index 0329b81f..bddd7fbe 100644 --- a/ccan/io/test/run-12-bidir.c +++ b/ccan/io/test/run-12-bidir.c @@ -119,7 +119,7 @@ int main(void) exit(0); } freeaddrinfo(addrinfo); - ok1(io_loop() == NULL); + ok1(io_loop(NULL, NULL) == NULL); ok1(d->state == 4); ok1(d->done == 2); ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0); diff --git a/ccan/io/test/run-13-all-idle.c b/ccan/io/test/run-13-all-idle.c index 3701472b..7896b14f 100644 --- a/ccan/io/test/run-13-all-idle.c +++ b/ccan/io/test/run-13-all-idle.c @@ -23,7 +23,7 @@ int main(void) ok1(pipe(fds) == 0); io_new_conn(NULL, fds[0], setup_waiter, &status); - io_loop(); + io_loop(NULL, NULL); exit(1); } diff --git a/ccan/io/test/run-14-duplex-both-read.c b/ccan/io/test/run-14-duplex-both-read.c index b52553aa..8cf22aa1 100644 --- a/ccan/io/test/run-14-duplex-both-read.c +++ b/ccan/io/test/run-14-duplex-both-read.c @@ -125,7 +125,7 @@ int main(void) exit(0); } freeaddrinfo(addrinfo); - ok1(io_loop() == NULL); + ok1(io_loop(NULL, NULL) == NULL); ok1(d->state == 5); ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0); free(d); diff --git a/ccan/io/test/run-15-timeout.c b/ccan/io/test/run-15-timeout.c index 9224b0a9..e0a3b05e 100644 --- a/ccan/io/test/run-15-timeout.c +++ b/ccan/io/test/run-15-timeout.c @@ -3,55 +3,49 @@ #include #include #include +#include #include #include #include -#if 0 #ifndef PORT #define PORT "65015" #endif struct data { + struct timers timers; int state; + struct io_conn *conn; + struct timer timer; int timeout_usec; - bool timed_out; char buf[4]; }; - -static struct io_plan no_timeout(struct io_conn *conn, struct data *d) +static void finish_ok(struct io_conn *conn, struct data *d) { - ok1(d->state == 1); d->state++; - return io_close(); + io_break(d); } -static struct io_plan timeout(struct io_conn *conn, struct data *d) +static struct io_plan *no_timeout(struct io_conn *conn, struct data *d) { ok1(d->state == 1); d->state++; - d->timed_out = true; - return io_close(); -} - -static void finish_ok(struct io_conn *conn, struct data *d) -{ - ok1(d->state == 2); - d->state++; - io_break(d); + return io_close(conn); } -static void init_conn(int fd, struct data *d) +static struct io_plan *init_conn(struct io_conn *conn, struct data *d) { - struct io_conn *conn; - ok1(d->state == 0); d->state++; - conn = io_new_conn(fd, io_read(d->buf, sizeof(d->buf), no_timeout, d)); + d->conn = conn; io_set_finish(conn, finish_ok, d); - io_timeout(conn, time_from_usec(d->timeout_usec), timeout, d); + + timer_add(&d->timers, &d->timer, + timeabs_add(time_now(), time_from_usec(d->timeout_usec))); + + return io_read(conn, d->buf, sizeof(d->buf), no_timeout, d); } static int make_listen_fd(const char *port, struct addrinfo **info) @@ -91,16 +85,17 @@ int main(void) struct data *d = malloc(sizeof(*d)); struct addrinfo *addrinfo; struct io_listener *l; + struct list_head expired; int fd, status; /* This is how many tests you plan to run */ - plan_tests(20); + plan_tests(21); d->state = 0; - d->timed_out = false; d->timeout_usec = 100000; + timers_init(&d->timers, time_now()); fd = make_listen_fd(PORT, &addrinfo); ok1(fd >= 0); - l = io_new_listener(fd, init_conn, d); + l = io_new_listener(NULL, fd, init_conn, d); ok1(l); fflush(stdout); @@ -122,19 +117,31 @@ int main(void) } close(fd); freeaddrinfo(addrinfo); + timers_cleanup(&d->timers); free(d); exit(i); } - ok1(io_loop() == d); - ok1(d->state == 3); - ok1(d->timed_out == true); + ok1(io_loop(&d->timers, &expired) == NULL); + + /* One element, d->timer. */ + ok1(list_pop(&expired, struct timer, list) == &d->timer); + ok1(list_empty(&expired)); + ok1(d->state == 1); + + io_close(d->conn); + + /* Finished will be called, d will be returned */ + ok1(io_loop(&d->timers, &expired) == d); + ok1(list_empty(&expired)); + ok1(d->state == 2); + + /* It should have died. */ ok1(wait(&status)); ok1(WIFEXITED(status)); ok1(WEXITSTATUS(status) < sizeof(d->buf)); /* This one shouldn't time out. */ d->state = 0; - d->timed_out = false; d->timeout_usec = 500000; fflush(stdout); @@ -156,26 +163,22 @@ int main(void) } close(fd); freeaddrinfo(addrinfo); + timers_cleanup(&d->timers); free(d); exit(i); } - ok1(io_loop() == d); + ok1(io_loop(&d->timers, &expired) == d); ok1(d->state == 3); - ok1(d->timed_out == false); + ok1(list_empty(&expired)); ok1(wait(&status)); ok1(WIFEXITED(status)); ok1(WEXITSTATUS(status) >= sizeof(d->buf)); io_close_listener(l); freeaddrinfo(addrinfo); + timers_cleanup(&d->timers); free(d); /* This exits depending on whether all tests passed */ return exit_status(); } -#else -int main(void) -{ - return 0; -} -#endif diff --git a/ccan/io/test/run-16-duplex-test.c b/ccan/io/test/run-16-duplex-test.c index 9d939c58..ea588661 100644 --- a/ccan/io/test/run-16-duplex-test.c +++ b/ccan/io/test/run-16-duplex-test.c @@ -119,7 +119,7 @@ int main(void) exit(0); } freeaddrinfo(addrinfo); - ok1(io_loop() == NULL); + ok1(io_loop(NULL, NULL) == NULL); ok1(d->state == 4); ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0); free(d); diff --git a/ccan/io/test/run-17-homemade-io.c b/ccan/io/test/run-17-homemade-io.c index a868b213..1087e250 100644 --- a/ccan/io/test/run-17-homemade-io.c +++ b/ccan/io/test/run-17-homemade-io.c @@ -166,7 +166,7 @@ int main(void) exit(0); } freeaddrinfo(addrinfo); - ok1(io_loop() == pkt); + ok1(io_loop(NULL, NULL) == pkt); ok1(pkt->state == 4); ok1(pkt->len == 8); ok1(memcmp(pkt->contents, "hithere!", 8) == 0); diff --git a/ccan/io/test/run-18-errno.c b/ccan/io/test/run-18-errno.c index f67c0eb0..c19ab375 100644 --- a/ccan/io/test/run-18-errno.c +++ b/ccan/io/test/run-18-errno.c @@ -109,7 +109,7 @@ int main(void) exit(0); } freeaddrinfo(addrinfo); - ok1(io_loop() == &state + 1); + ok1(io_loop(NULL, NULL) == &state + 1); ok1(state == 4); io_close_listener(l); ok1(wait(&state)); diff --git a/ccan/io/test/run-19-always.c b/ccan/io/test/run-19-always.c index 24771969..63eb34e6 100644 --- a/ccan/io/test/run-19-always.c +++ b/ccan/io/test/run-19-always.c @@ -117,7 +117,7 @@ int main(void) free(d); exit(0); } - ok1(io_loop() == d); + ok1(io_loop(NULL, NULL) == d); ok1(d->state == 2); ok1(wait(&status));