From: Rusty Russell Date: Mon, 14 Oct 2013 10:58:35 +0000 (+1030) Subject: ccan/io: timer support. X-Git-Url: https://git.ozlabs.org/?p=ccan;a=commitdiff_plain;h=1fe2db9cbe84812e1465db36f538a222e9984c93 ccan/io: timer support. Upgrade license, since timer is LGPL. Signed-off-by: Rusty Russell --- diff --git a/ccan/io/LICENSE b/ccan/io/LICENSE index 2354d129..dc314eca 120000 --- a/ccan/io/LICENSE +++ b/ccan/io/LICENSE @@ -1 +1 @@ -../../licenses/BSD-MIT \ No newline at end of file +../../licenses/LGPL-2.1 \ No newline at end of file diff --git a/ccan/io/_info b/ccan/io/_info index f494c124..d596b4de 100644 --- a/ccan/io/_info +++ b/ccan/io/_info @@ -159,7 +159,8 @@ * return WIFEXITED(status) ? WEXITSTATUS(status) : 2; * } * - * License: BSD-MIT + * License: LGPL (v2.1 or any later version) + * Author: Rusty Russell */ int main(int argc, char *argv[]) { @@ -167,6 +168,8 @@ int main(int argc, char *argv[]) return 1; if (strcmp(argv[1], "depends") == 0) { + printf("ccan/time\n"); + printf("ccan/timer\n"); return 0; } diff --git a/ccan/io/backend.h b/ccan/io/backend.h index 2b41fa72..a36dee9b 100644 --- a/ccan/io/backend.h +++ b/ccan/io/backend.h @@ -1,7 +1,8 @@ -/* Licensed under BSD-MIT - see LICENSE file for details */ +/* Licensed under LGPLv2.1+ - see LICENSE file for details */ #ifndef CCAN_IO_BACKEND_H #define CCAN_IO_BACKEND_H #include +#include struct fd { int fd; @@ -61,11 +62,20 @@ struct io_state_writepart { size_t *lenp; }; +struct io_timeout { + struct timer timer; + struct io_conn *conn; + + struct io_op *(*next)(struct io_conn *, void *arg); + void *next_arg; +}; + /* One connection per client. */ struct io_conn { struct fd fd; struct io_conn *duplex; + struct io_timeout *timeout; enum io_state state; union { @@ -76,6 +86,11 @@ struct io_conn { } u; }; +static inline bool timeout_active(const struct io_conn *conn) +{ + return conn->timeout && conn->timeout->conn; +} + extern void *io_loop_return; bool add_listener(struct io_listener *l); @@ -83,6 +98,8 @@ bool add_conn(struct io_conn *c); bool add_duplex(struct io_conn *c); void del_listener(struct io_listener *l); void backend_set_state(struct io_conn *conn, struct io_op *op); +void backend_add_timeout(struct io_conn *conn, struct timespec ts); +void backend_del_timeout(struct io_conn *conn); struct io_op *do_ready(struct io_conn *conn); #endif /* CCAN_IO_BACKEND_H */ diff --git a/ccan/io/benchmarks/Makefile b/ccan/io/benchmarks/Makefile index 21f3441d..0068400d 100644 --- a/ccan/io/benchmarks/Makefile +++ b/ccan/io/benchmarks/Makefile @@ -4,7 +4,7 @@ CFLAGS:=-Wall -I$(CCANDIR) -O3 -flto LDFLAGS:=-O3 -flto LDLIBS:=-lrt -OBJS:=time.o poll.o io.o err.o +OBJS:=time.o poll.o io.o err.o timer.o list.o default: $(ALL) @@ -14,6 +14,10 @@ run-length-prefix: run-length-prefix.o $(OBJS) time.o: $(CCANDIR)/ccan/time/time.c $(CC) $(CFLAGS) -c -o $@ $< +timer.o: $(CCANDIR)/ccan/timer/timer.c + $(CC) $(CFLAGS) -c -o $@ $< +list.o: $(CCANDIR)/ccan/list/list.c + $(CC) $(CFLAGS) -c -o $@ $< poll.o: $(CCANDIR)/ccan/io/poll.c $(CC) $(CFLAGS) -c -o $@ $< io.o: $(CCANDIR)/ccan/io/io.c diff --git a/ccan/io/io.c b/ccan/io/io.c index 6efc68ee..84c2ccad 100644 --- a/ccan/io/io.c +++ b/ccan/io/io.c @@ -1,4 +1,4 @@ -/* Licensed under BSD-MIT - see LICENSE file for details */ +/* Licensed under LGPLv2.1+ - see LICENSE file for details */ #include "io.h" #include "backend.h" #include @@ -58,6 +58,7 @@ struct io_conn *io_new_conn_(int fd, conn->fd.finish_arg = conn->fd.next_arg = arg; conn->state = NEXT; conn->duplex = NULL; + conn->timeout = NULL; if (!add_conn(conn)) { free(conn); return NULL; @@ -85,6 +86,7 @@ struct io_conn *io_duplex_(struct io_conn *old, conn->fd.finish_arg = conn->fd.next_arg = arg; conn->state = NEXT; conn->duplex = old; + conn->timeout = NULL; if (!add_duplex(conn)) { free(conn); return NULL; @@ -119,6 +121,22 @@ struct io_next *io_next_(struct io_conn *conn, return to_ionext(conn); } +bool io_timeout_(struct io_conn *conn, struct timespec ts, + struct io_op *(*next)(struct io_conn *, void *), void *arg) +{ + if (!conn->timeout) { + conn->timeout = malloc(sizeof(*conn->timeout)); + if (!conn->timeout) + return false; + } else + assert(!timeout_active(conn)); + + conn->timeout->next = next; + conn->timeout->next_arg = arg; + backend_add_timeout(conn, ts); + return true; +} + /* Queue some data to be written. */ struct io_op *io_write(const void *data, size_t len, struct io_next *next) { @@ -175,6 +193,8 @@ void io_wake_(struct io_conn *conn, static struct io_op *do_next(struct io_conn *conn) { + if (timeout_active(conn)) + backend_del_timeout(conn); return conn->fd.next(conn, conn->fd.next_arg); } diff --git a/ccan/io/io.h b/ccan/io/io.h index 5ca9731d..a481c5f3 100644 --- a/ccan/io/io.h +++ b/ccan/io/io.h @@ -1,7 +1,8 @@ -/* Licensed under BSD-MIT - see LICENSE file for details */ +/* Licensed under LGPLv2.1+ - see LICENSE file for details */ #ifndef CCAN_IO_H #define CCAN_IO_H #include +#include #include #include @@ -151,6 +152,30 @@ struct io_op *io_write_partial(const void *data, size_t *len, */ struct io_op *io_idle(struct io_conn *conn); +/** + * io_timeout - set timeout function if the callback doesn't fire. + * @conn: the current connection. + * @ts: how long until the timeout should be called. + * @next: function to call. + * @arg: argument to @next. + * + * If the usual next callback is not called for this connection before @ts, + * this function will be called. If next callback is called, the timeout + * is automatically removed. + * + * Returns false on allocation failure. A connection can only have one + * timeout. + */ +#define io_timeout(conn, ts, next, arg) \ + io_timeout_((conn), (ts), \ + typesafe_cb_preargs(struct io_op *, void *, \ + (next), (arg), \ + struct io_conn *), \ + (arg)) + +bool io_timeout_(struct io_conn *conn, struct timespec ts, + struct io_op *(*next)(struct io_conn *, void *), void *arg); + /** * io_duplex - split an fd into two connections. * @conn: a connection. diff --git a/ccan/io/poll.c b/ccan/io/poll.c index 7ceb8a7e..94536728 100644 --- a/ccan/io/poll.c +++ b/ccan/io/poll.c @@ -1,4 +1,4 @@ -/* Licensed under BSD-MIT - see LICENSE file for details */ +/* Licensed under LGPLv2.1+ - see LICENSE file for details */ #include "io.h" #include "backend.h" #include @@ -6,10 +6,12 @@ #include #include #include +#include static size_t num_fds = 0, max_fds = 0, num_next = 0, num_finished = 0, num_waiting = 0; static struct pollfd *pollfds = NULL; static struct fd **fds = NULL; +static struct timers timeouts; static bool add_fd(struct fd *fd, short events) { @@ -90,6 +92,9 @@ static void del_conn(struct io_conn *conn) { if (conn->fd.finish) conn->fd.finish(conn, conn->fd.finish_arg); + if (timeout_active(conn)) + backend_del_timeout(conn); + free(conn->timeout); if (conn->duplex) { /* In case fds[] pointed to the other one. */ fds[conn->fd.backend_info] = &conn->duplex->fd; @@ -197,13 +202,54 @@ static void ready(struct io_conn *c) backend_set_state(c, do_ready(c)); } +void backend_add_timeout(struct io_conn *conn, struct timespec duration) +{ + if (!timeouts.base) + timers_init(&timeouts, time_now()); + timer_add(&timeouts, &conn->timeout->timer, + time_add(time_now(), duration)); + conn->timeout->conn = conn; +} + +void backend_del_timeout(struct io_conn *conn) +{ + assert(conn->timeout->conn == conn); + timer_del(&timeouts, &conn->timeout->timer); + conn->timeout->conn = NULL; +} + /* This is the main loop. */ void *io_loop(void) { void *ret; while (!io_loop_return) { - int i, r; + int i, r, timeout = INT_MAX; + struct timespec now; + + if (timeouts.base) { + struct timespec first; + struct list_head expired; + struct io_timeout *t; + + now = time_now(); + + /* Call functions for expired timers. */ + timers_expire(&timeouts, now, &expired); + while ((t = list_pop(&expired, struct io_timeout, timer.list))) { + struct io_conn *conn = t->conn; + /* Clear, in case timer re-adds */ + t->conn = NULL; + backend_set_state(conn, t->next(conn, t->next_arg)); + } + + /* Now figure out how long to wait for the next one. */ + if (timer_earliest(&timeouts, &first)) { + uint64_t f = time_to_msec(time_sub(first, now)); + if (f < INT_MAX) + timeout = f; + } + } if (num_finished || num_next) { finish_and_next(false); @@ -217,7 +263,7 @@ void *io_loop(void) /* You can't tell them all to go to sleep! */ assert(num_waiting); - r = poll(pollfds, num_fds, -1); + r = poll(pollfds, num_fds, timeout); if (r < 0) break; @@ -225,10 +271,16 @@ void *io_loop(void) struct io_conn *c = (void *)fds[i]; int events = pollfds[i].revents; + if (r == 0) + break; + if (fds[i]->listener) { - if (events & POLLIN) + if (events & POLLIN) { accept_conn((void *)c); + r--; + } } else if (events & (POLLIN|POLLOUT)) { + r--; if (c->duplex) { int mask = pollmask(c->duplex->state); if (events & mask) { @@ -240,13 +292,13 @@ void *io_loop(void) } ready(c); } else if (events & POLLHUP) { + r--; backend_set_state(c, io_close(c, NULL)); if (c->duplex) backend_set_state(c->duplex, io_close(c->duplex, NULL)); } - } } diff --git a/ccan/io/test/run-15-timeout.c b/ccan/io/test/run-15-timeout.c new file mode 100644 index 00000000..b5e0dc3f --- /dev/null +++ b/ccan/io/test/run-15-timeout.c @@ -0,0 +1,166 @@ +#include +/* Include the C files directly. */ +#include +#include +#include +#include +#include +#include + +struct data { + int state; + int timeout_usec; + bool timed_out; + char buf[4]; +}; + + +static struct io_op *no_timeout(struct io_conn *conn, struct data *d) +{ + ok1(d->state == 1); + d->state++; + return io_close(conn, d); +} + +static struct io_op *timeout(struct io_conn *conn, struct data *d) +{ + ok1(d->state == 1); + d->state++; + d->timed_out = true; + return io_close(conn, d); +} + +static struct io_op *start_ok(struct io_conn *conn, struct data *d) +{ + ok1(d->state == 0); + d->state++; + io_timeout(conn, time_from_usec(d->timeout_usec), timeout, d); + return io_read(d->buf, sizeof(d->buf), io_next(conn, no_timeout, d)); +} + +static void finish_ok(struct io_conn *conn, struct data *d) +{ + ok1(d->state == 2); + d->state++; + io_break(d, NULL); +} + +static int make_listen_fd(const char *port, struct addrinfo **info) +{ + int fd, on = 1; + struct addrinfo *addrinfo, hints; + + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_PASSIVE; + hints.ai_protocol = 0; + + if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0) + return -1; + + fd = socket(addrinfo->ai_family, addrinfo->ai_socktype, + addrinfo->ai_protocol); + if (fd < 0) + return -1; + + setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); + if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) { + close(fd); + return -1; + } + if (listen(fd, 1) != 0) { + close(fd); + return -1; + } + *info = addrinfo; + return fd; +} + +int main(void) +{ + struct data *d = malloc(sizeof(*d)); + struct addrinfo *addrinfo; + struct io_listener *l; + int fd, status; + + /* This is how many tests you plan to run */ + plan_tests(20); + d->state = 0; + d->timed_out = false; + d->timeout_usec = 100000; + fd = make_listen_fd("65002", &addrinfo); + ok1(fd >= 0); + l = io_new_listener(fd, start_ok, finish_ok, d); + ok1(l); + fflush(stdout); + + if (!fork()) { + int i; + + io_close_listener(l); + fd = socket(addrinfo->ai_family, addrinfo->ai_socktype, + addrinfo->ai_protocol); + if (fd < 0) + exit(1); + if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) + exit(2); + signal(SIGPIPE, SIG_IGN); + usleep(500000); + for (i = 0; i < strlen("hellothere"); i++) { + if (write(fd, "hellothere" + i, 1) != 1) + break; + } + close(fd); + freeaddrinfo(addrinfo); + free(d); + exit(i); + } + ok1(io_loop() == d); + ok1(d->state == 3); + ok1(d->timed_out == true); + ok1(wait(&status)); + ok1(WIFEXITED(status)); + ok1(WEXITSTATUS(status) < sizeof(d->buf)); + + /* This one shouldn't time out. */ + d->state = 0; + d->timed_out = false; + d->timeout_usec = 500000; + fflush(stdout); + + if (!fork()) { + int i; + + io_close_listener(l); + fd = socket(addrinfo->ai_family, addrinfo->ai_socktype, + addrinfo->ai_protocol); + if (fd < 0) + exit(1); + if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) + exit(2); + signal(SIGPIPE, SIG_IGN); + usleep(100000); + for (i = 0; i < strlen("hellothere"); i++) { + if (write(fd, "hellothere" + i, 1) != 1) + break; + } + close(fd); + freeaddrinfo(addrinfo); + free(d); + exit(i); + } + ok1(io_loop() == d); + ok1(d->state == 3); + ok1(d->timed_out == false); + ok1(wait(&status)); + ok1(WIFEXITED(status)); + ok1(WEXITSTATUS(status) >= sizeof(d->buf)); + + io_close_listener(l); + freeaddrinfo(addrinfo); + free(d); + + /* This exits depending on whether all tests passed */ + return exit_status(); +}