lib/waiter: Add timeout waiters
authorJeremy Kerr <jk@ozlabs.org>
Tue, 21 May 2013 03:52:00 +0000 (11:52 +0800)
committerJeremy Kerr <jk@ozlabs.org>
Mon, 24 Jun 2013 04:52:49 +0000 (12:52 +0800)
Signed-off-by: Jeremy Kerr <jk@ozlabs.org>
discover/discover-server.c
discover/udev.c
discover/user-event.c
lib/waiter/waiter.c
lib/waiter/waiter.h
ui/common/discover-client.c
ui/ncurses/nc-cui.c
ui/twin/pbt-scr.c

index 4ba91eeba5526c3ce85bd87e4c181fe542a434af..67ce266c4c433fced328f810360094cd8e60f10f 100644 (file)
@@ -241,8 +241,9 @@ static int discover_server_process_connection(void *arg)
                }
        }
 
-       client->waiter = waiter_register(server->waitset, client->fd, WAIT_IN,
-                               discover_server_process_message, client);
+       client->waiter = waiter_register_io(server->waitset, client->fd,
+                               WAIT_IN, discover_server_process_message,
+                               client);
 
        return 0;
 }
@@ -327,7 +328,7 @@ struct discover_server *discover_server_init(struct waitset *waitset)
                goto out_err;
        }
 
-       server->waiter = waiter_register(server->waitset, server->socket,
+       server->waiter = waiter_register_io(server->waitset, server->socket,
                        WAIT_IN, discover_server_process_connection, server);
 
        return server;
index 330b286848513d47e68f769c64c7dfaeedf8bcc4..309a749acfbad20787e703f256216be11f017d08 100644 (file)
@@ -296,7 +296,7 @@ struct pb_udev *udev_init(struct waitset *waitset,
        if (result)
                goto fail_monitor;
 
-       waiter_register(waitset, udev_monitor_get_fd(udev->monitor), WAIT_IN,
+       waiter_register_io(waitset, udev_monitor_get_fd(udev->monitor), WAIT_IN,
                udev_process, udev->monitor);
 
        pb_log("%s: waiting on udev\n", __func__);
index 23842e1343315a0017df1513e7838d8463ca25fd..013357b49bff6340f671af561d0cb25ec12a18fc 100644 (file)
@@ -158,7 +158,8 @@ struct user_event *user_event_init(struct waitset *waitset,
                        strerror(errno));
        }
 
-       waiter_register(waitset, uev->socket, WAIT_IN, user_event_process, uev);
+       waiter_register_io(waitset, uev->socket, WAIT_IN,
+                       user_event_process, uev);
 
        pb_log("%s: waiting on %s\n", __func__, PBOOT_USER_EVENT_SOCKET);
 
index 78ba045f8b18fe5088d8c11422175b4654f87752..513ab608af3a3115ec6a39d58a0645dee129cd56 100644 (file)
@@ -3,15 +3,21 @@
 #include <stdbool.h>
 #include <string.h>
 #include <assert.h>
+#include <sys/time.h>
 
 #include <talloc/talloc.h>
 
 #include "waiter.h"
 
 struct waiter {
+       enum {
+               WAITER_IO,
+               WAITER_TIME,
+       }               type;
        struct waitset  *set;
        int             fd;
        int             events;
+       struct timeval  timeout;
        waiter_cb       callback;
        void            *arg;
 };
@@ -21,12 +27,17 @@ struct waitset {
        int             n_waiters;
        bool            waiters_changed;
 
+       struct timeval  next_timeout;
+
        /* These are kept consistent over each call to waiter_poll, as
         * set->waiters may be updated (by waiters' callbacks calling
         * waiter_register or waiter_remove) during iteration. */
        struct pollfd   *pollfds;
-       struct waiter   **cur_waiters;
-       int             cur_n_waiters;
+       int             n_pollfds;
+       struct waiter   **io_waiters;
+       int             n_io_waiters;
+       struct waiter   **time_waiters;
+       int             n_time_waiters;
 };
 
 struct waitset *waitset_create(void *ctx)
@@ -40,8 +51,7 @@ void waitset_destroy(struct waitset *set)
        talloc_free(set);
 }
 
-struct waiter *waiter_register(struct waitset *set, int fd, int events,
-               waiter_cb callback, void *arg)
+static struct waiter *waiter_new(struct waitset *set)
 {
        struct waiter **waiters, *waiter;
 
@@ -62,7 +72,15 @@ struct waiter *waiter_register(struct waitset *set, int fd, int events,
        set->n_waiters++;
 
        set->waiters[set->n_waiters - 1] = waiter;
+       return waiter;
+}
+
+struct waiter *waiter_register_io(struct waitset *set, int fd, int events,
+               waiter_cb callback, void *arg)
+{
+       struct waiter *waiter = waiter_new(set);
 
+       waiter->type = WAITER_IO;
        waiter->set = set;
        waiter->fd = fd;
        waiter->events = events;
@@ -72,6 +90,27 @@ struct waiter *waiter_register(struct waitset *set, int fd, int events,
        return waiter;
 }
 
+struct waiter *waiter_register_timeout(struct waitset *set, int delay_ms,
+               waiter_cb callback, void *arg)
+{
+       struct waiter *waiter = waiter_new(set);
+       struct timeval now, delay;
+
+       delay.tv_sec = delay_ms / 1000;
+       delay.tv_usec = 1000 * (delay_ms % 1000);
+
+       gettimeofday(&now, NULL);
+
+       timeradd(&now, &delay, &waiter->timeout);
+
+       waiter->type = WAITER_TIME;
+       waiter->set = set;
+       waiter->callback = callback;
+       waiter->arg = arg;
+
+       return waiter;
+}
+
 void waiter_remove(struct waiter *waiter)
 {
        struct waitset *set = waiter->set;
@@ -94,47 +133,117 @@ void waiter_remove(struct waiter *waiter)
        talloc_free(waiter);
 }
 
+static void update_waiters(struct waitset *set)
+{
+       int n_io, n_time, i_io, i_time, i;
+
+       if (!set->waiters_changed)
+               return;
+
+       n_io = n_time = 0;
+
+       for (i = 0; i < set->n_waiters; i++) {
+               if (set->waiters[i]->type == WAITER_IO)
+                       n_io++;
+               else if (set->waiters[i]->type == WAITER_TIME)
+                       n_time++;
+       }
+
+       /* realloc if counts have changed */
+       if (set->n_io_waiters != n_io) {
+               set->io_waiters = talloc_realloc(set, set->io_waiters,
+                               struct waiter *, n_io);
+               set->pollfds = talloc_realloc(set, set->pollfds,
+                               struct pollfd, n_io);
+               set->n_io_waiters = n_io;
+       }
+       if (set->n_time_waiters != n_time) {
+               set->time_waiters = talloc_realloc(set, set->time_waiters,
+                               struct waiter *, n_time);
+               set->n_time_waiters = n_time;
+       }
+
+       /* IO waiters: copy to io_waiters, populate pollfds */
+       for (i = i_io = 0; i < set->n_waiters; i++) {
+               struct waiter *waiter = set->waiters[i];
+
+               if (waiter->type != WAITER_IO)
+                       continue;
+
+               set->pollfds[i_io].fd = waiter->fd;
+               set->pollfds[i_io].events = waiter->events;
+               set->io_waiters[i_io] = waiter;
+               i_io++;
+       }
+
+       /* time waiters: copy to time_waiters, calculate next expiry */
+       timerclear(&set->next_timeout);
+       for (i = i_time = 0; i < set->n_waiters; i++) {
+               struct waiter *waiter = set->waiters[i];
+
+               if (waiter->type != WAITER_TIME)
+                       continue;
+
+               if (!timerisset(&set->next_timeout) ||
+                               timercmp(&waiter->timeout,
+                                       &set->next_timeout, <))
+                       set->next_timeout = waiter->timeout;
+
+               set->time_waiters[i_time] = waiter;
+               i_time++;
+       }
+}
+
 int waiter_poll(struct waitset *set)
 {
+       struct timeval now, timeout;
+       int timeout_ms;
        int i, rc;
 
        /* If the waiters have been updated, we need to update our
         * consistent copy */
-       if (set->waiters_changed) {
-
-               /* We need to reallocate if the count has changes */
-               if (set->cur_n_waiters != set->n_waiters) {
-                       set->cur_waiters = talloc_realloc(set, set->cur_waiters,
-                                       struct waiter *, set->n_waiters);
-                       set->pollfds = talloc_realloc(set, set->pollfds,
-                                       struct pollfd, set->n_waiters);
-                       set->cur_n_waiters = set->n_waiters;
-               }
-
-               /* Populate cur_waiters and pollfds from ->waiters data */
-               for (i = 0; i < set->n_waiters; i++) {
-                       set->pollfds[i].fd = set->waiters[i]->fd;
-                       set->pollfds[i].events = set->waiters[i]->events;
-                       set->pollfds[i].revents = 0;
-                       set->cur_waiters[i] = set->waiters[i];
-               }
-
-               set->waiters_changed = false;
+       update_waiters(set);
+
+       if (timerisset(&set->next_timeout)) {
+               gettimeofday(&now, NULL);
+               timersub(&set->next_timeout, &now, &timeout);
+               timeout_ms = timeout.tv_sec * 1000 +
+                               timeout.tv_usec / 1000;
+               if (timeout_ms < 0)
+                       timeout_ms = 0;
+       } else {
+               timeout_ms = -1;
        }
 
-       rc = poll(set->pollfds, set->cur_n_waiters, -1);
 
-       if (rc <= 0)
+       rc = poll(set->pollfds, set->n_io_waiters, timeout_ms);
+
+       if (rc < 0)
                return rc;
 
-       for (i = 0; i < set->cur_n_waiters; i++) {
-               if (set->pollfds[i].revents) {
-                       rc = set->cur_waiters[i]->callback(
-                                       set->cur_waiters[i]->arg);
+       for (i = 0; i < set->n_io_waiters; i++) {
+               struct waiter *waiter = set->io_waiters[i];
+
+               if (!set->pollfds[i].revents)
+                       continue;
+               rc = waiter->callback(waiter->arg);
+
+               if (rc)
+                       waiter_remove(waiter);
+       }
+
+       if (set->n_time_waiters > 0)
+               gettimeofday(&now, NULL);
+
+       for (i = 0; i < set->n_time_waiters; i++) {
+               struct waiter *waiter = set->time_waiters[i];
+
+               if (timercmp(&waiter->timeout, &now, >))
+                       continue;
+
+               waiter->callback(waiter->arg);
 
-                       if (rc)
-                               waiter_remove(set->cur_waiters[i]);
-               }
+               waiter_remove(waiter);
        }
 
        return 0;
index ed7f6bbdb9994a4c673a4e3bf71bb552a9f45d69..58ea04c628ac6cd346493a4a8cfb0f24860eec7c 100644 (file)
@@ -16,7 +16,10 @@ typedef int (*waiter_cb)(void *);
 struct waitset *waitset_create(void *ctx);
 void waitset_destroy(struct waitset *waitset);
 
-struct waiter *waiter_register(struct waitset *waitset, int fd, int events,
+struct waiter *waiter_register_io(struct waitset *waitset, int fd, int events,
+               waiter_cb callback, void *arg);
+
+struct waiter *waiter_register_timeout(struct waitset *set, int delay_ms,
                waiter_cb callback, void *arg);
 
 void waiter_remove(struct waiter *waiter);
index a40bfdafe0ce669b1d832f5d95d90f50267eadc8..107d0314d2de24f563e6c87bbe0d66ec4f733330 100644 (file)
@@ -215,8 +215,8 @@ struct discover_client* discover_client_init(struct waitset *waitset,
                goto out_err;
        }
 
-       waiter_register(waitset, client->fd, WAIT_IN, discover_client_process,
-                       client);
+       waiter_register_io(waitset, client->fd, WAIT_IN,
+                       discover_client_process, client);
 
        return client;
 
index d091331b9656ce1a5635f08b73d64fd5b0e5fcbc..ef3fd23b8474711a65d1b78d77845d1cf22c7acb 100644 (file)
@@ -572,7 +572,7 @@ retry_start:
        atexit(nc_atexit);
        nc_start();
 
-       waiter_register(cui->waitset, STDIN_FILENO, WAIT_IN,
+       waiter_register_io(cui->waitset, STDIN_FILENO, WAIT_IN,
                        cui_process_key, cui);
 
        if (js_map) {
@@ -580,7 +580,7 @@ retry_start:
                cui->pjs = pjs_init(cui, js_map);
 
                if (cui->pjs)
-                       waiter_register(cui->waitset, pjs_get_fd(cui->pjs),
+                       waiter_register_io(cui->waitset, pjs_get_fd(cui->pjs),
                                        WAIT_IN, cui_process_js, cui);
        }
 
index 8532bc10bfb79cfb79338fae96dea0bbcc035670..e093c3c66b13fcc28defaa3d8b882a3d8d746511 100644 (file)
@@ -443,8 +443,8 @@ struct pbt_scr *pbt_scr_init(void *talloc_ctx,
 
        assert(waiter_fd != -1);
 
-       waiter_register(waitset, waiter_fd, WAIT_IN, (void *)pbt_twin_waiter_cb,
-               &scr->twin_ctx);
+       waiter_register_io(waitset, waiter_fd, WAIT_IN,
+                       (void *)pbt_twin_waiter_cb, &scr->twin_ctx);
 
        return scr;