/* ccan/io/poll.c — poll(2)-based event-loop backend for the CCAN io module. */
1 /* Licensed under LGPLv2.1+ - see LICENSE file for details */
2 #include "io.h"
3 #include "backend.h"
4 #include <assert.h>
5 #include <poll.h>
6 #include <stdlib.h>
7 #include <sys/types.h>
8 #include <sys/socket.h>
9 #include <limits.h>
10 #include <errno.h>
11
/* Backend state.  pollfds[i] and fds[i] describe the same descriptor;
 * each fd records its slot index in fd->backend_info. */
static size_t num_fds = 0, max_fds = 0, num_closing = 0, num_waiting = 0;
/* Set when some connection's plan is POLLALWAYS (ready without polling). */
static bool some_always = false;
static struct pollfd *pollfds = NULL;
static struct fd **fds = NULL;
/* Pending timeouts; lazily initialized on first use (timeouts.base). */
static struct timers timeouts;
#ifdef DEBUG
/* Nesting depth of the io loop: debug mode can recurse into io_loop. */
static unsigned int io_loop_level;
/* Connections awaiting free, singly-linked through finish_arg. */
static struct io_conn *free_later;
static void io_loop_enter(void)
{
	io_loop_level++;
}
static void io_loop_exit(void)
{
	io_loop_level--;
	if (io_loop_level == 0) {
		/* Delayed free. */
		while (free_later) {
			struct io_conn *c = free_later;
			free_later = c->finish_arg;
			io_alloc.free(c);
		}
	}
}
static void free_conn(struct io_conn *conn)
{
	/* Only free on final exit: chain via finish. */
	if (io_loop_level > 1) {
		struct io_conn *c;
		/* Double-free guard: conn must not already be queued. */
		for (c = free_later; c; c = c->finish_arg)
			assert(c != conn);
		conn->finish_arg = free_later;
		free_later = conn;
	} else
		io_alloc.free(conn);
}
#else
/* Non-debug build: no recursion tracking, free immediately. */
static void io_loop_enter(void)
{
}
static void io_loop_exit(void)
{
}
static void free_conn(struct io_conn *conn)
{
	io_alloc.free(conn);
}
#endif
60
/* Append fd to the watched set with the given POLLIN/POLLOUT mask
 * (0 = idle), growing the parallel arrays as needed.
 * Returns false on allocation failure (existing state stays valid). */
static bool add_fd(struct fd *fd, short events)
{
	if (num_fds + 1 > max_fds) {
		struct pollfd *newpollfds;
		struct fd **newfds;
		/* Grow geometrically; start with 8 slots. */
		size_t num = max_fds ? max_fds * 2 : 8;

		newpollfds = io_alloc.realloc(pollfds, sizeof(*newpollfds)*num);
		if (!newpollfds)
			return false;
		pollfds = newpollfds;
		newfds = io_alloc.realloc(fds, sizeof(*newfds) * num);
		if (!newfds)
			return false;
		fds = newfds;
		max_fds = num;
	}

	pollfds[num_fds].events = events;
	/* In case it's idle. */
	/* NOTE(review): negating makes poll() skip the slot, but -0 == 0,
	 * so an idle fd 0 would still be watched — confirm fd 0 can't occur. */
	if (!events)
		pollfds[num_fds].fd = -fd->fd;
	else
		pollfds[num_fds].fd = fd->fd;
	pollfds[num_fds].revents = 0; /* In case we're iterating now */
	fds[num_fds] = fd;
	fd->backend_info = num_fds;
	num_fds++;
	if (events)
		num_waiting++;

	return true;
}
94
/* Remove fd from the watched set and close it, keeping the arrays compact
 * by moving the last entry into the vacated slot. */
static void del_fd(struct fd *fd)
{
	size_t n = fd->backend_info;

	assert(n != -1);
	assert(n < num_fds);
	if (pollfds[n].events)
		num_waiting--;
	if (n != num_fds - 1) {
		/* Move last one over us. */
		pollfds[n] = pollfds[num_fds-1];
		fds[n] = fds[num_fds-1];
		assert(fds[n]->backend_info == num_fds-1);
		fds[n]->backend_info = n;
		/* If that happens to be a duplex, move that too. */
		if (!fds[n]->listener) {
			struct io_conn *c = (void *)fds[n];
			if (c->duplex) {
				assert(c->duplex->fd.backend_info == num_fds-1);
				c->duplex->fd.backend_info = n;
			}
		}
	} else if (num_fds == 1) {
		/* Free everything when no more fds. */
		io_alloc.free(pollfds);
		io_alloc.free(fds);
		pollfds = NULL;
		fds = NULL;
		max_fds = 0;
	}
	num_fds--;
	fd->backend_info = -1;
	close(fd->fd);
}
129
130 bool add_listener(struct io_listener *l)
131 {
132         if (!add_fd(&l->fd, POLLIN))
133                 return false;
134         return true;
135 }
136
/* Resynchronize the pollfd entry for conn after its plan changed:
 * recompute the event mask (merging a duplex partner's), maintain the
 * num_waiting/num_closing counters, and note POLLALWAYS plans. */
void backend_plan_changed(struct io_conn *conn)
{
	struct pollfd *pfd;

	/* This can happen with debugging and delayed free... */
	if (conn->fd.backend_info == -1)
		return;

	pfd = &pollfds[conn->fd.backend_info];

	if (pfd->events)
		num_waiting--;

	pfd->events = conn->plan.pollflag & (POLLIN|POLLOUT);
	if (conn->duplex) {
		int mask = conn->duplex->plan.pollflag & (POLLIN|POLLOUT);
		/* You can't *both* read/write. */
		assert(!mask || pfd->events != mask);
		pfd->events |= mask;
	}
	if (pfd->events) {
		num_waiting++;
		pfd->fd = conn->fd.fd;
	} else
		/* Negated fd makes poll() ignore the slot while idle. */
		pfd->fd = -conn->fd.fd;

	/* A plan with no next step means this conn is closing. */
	if (!conn->plan.next)
		num_closing++;

	if (conn->plan.pollflag == POLLALWAYS)
		some_always = true;
}
169
170 bool add_conn(struct io_conn *c)
171 {
172         if (!add_fd(&c->fd, c->plan.pollflag & (POLLIN|POLLOUT)))
173                 return false;
174         /* Immediate close is allowed. */
175         if (!c->plan.next)
176                 num_closing++;
177         if (c->plan.pollflag == POLLALWAYS)
178                 some_always = true;
179         return true;
180 }
181
182 bool add_duplex(struct io_conn *c)
183 {
184         c->fd.backend_info = c->duplex->fd.backend_info;
185         backend_plan_changed(c);
186         return true;
187 }
188
/* Tear down a finished connection: cancel any timeout, detach from (or
 * delete) its fd slot, run the finish callback, then free it. */
void backend_del_conn(struct io_conn *conn)
{
	if (timeout_active(conn))
		backend_del_timeout(conn);
	io_alloc.free(conn->timeout);
	if (conn->duplex) {
		/* In case fds[] pointed to the other one. */
		assert(conn->duplex->fd.backend_info == conn->fd.backend_info);
		fds[conn->fd.backend_info] = &conn->duplex->fd;
		conn->duplex->duplex = NULL;
		conn->fd.backend_info = -1;
	} else
		del_fd(&conn->fd);
	num_closing--;
	if (conn->finish) {
		/* Saved by io_close */
		errno = conn->plan.u1.s;
		conn->finish(conn, conn->finish_arg);
	}
	free_conn(conn);
}
210
211 void del_listener(struct io_listener *l)
212 {
213         del_fd(&l->fd);
214 }
215
216 static void set_plan(struct io_conn *conn, struct io_plan plan)
217 {
218         conn->plan = plan;
219         backend_plan_changed(conn);
220 }
221
222 static void accept_conn(struct io_listener *l)
223 {
224         int fd = accept(l->fd.fd, NULL, NULL);
225
226         /* FIXME: What to do here? */
227         if (fd < 0)
228                 return;
229         l->init(fd, l->arg);
230 }
231
/* Reap connections whose plan has finished (plan.next == NULL).  If a
 * debug-traced conn is found and ready is non-NULL, store it in *ready
 * and return true so the caller can hand control to the debugger.
 * It's OK to miss some, as long as we make progress. */
static bool finish_conns(struct io_conn **ready)
{
	unsigned int i;

	for (i = 0; !io_loop_return && i < num_fds; i++) {
		struct io_conn *c, *duplex;

		if (!num_closing)
			break;

		if (fds[i]->listener)
			continue;
		c = (void *)fds[i];
		/* Visit c first, then its duplex partner (if any). */
		for (duplex = c->duplex; c; c = duplex, duplex = NULL) {
			if (!c->plan.next) {
				if (doing_debug_on(c) && ready) {
					*ready = c;
					return true;
				}
				backend_del_conn(c);
				/* Deletion moved another entry into slot i. */
				i--;
			}
		}
	}
	return false;
}
259
260 void backend_add_timeout(struct io_conn *conn, struct timespec duration)
261 {
262         if (!timeouts.base)
263                 timers_init(&timeouts, time_now());
264         timer_add(&timeouts, &conn->timeout->timer,
265                   time_add(time_now(), duration));
266         conn->timeout->conn = conn;
267 }
268
269 void backend_del_timeout(struct io_conn *conn)
270 {
271         assert(conn->timeout->conn == conn);
272         timer_del(&timeouts, &conn->timeout->timer);
273         conn->timeout->conn = NULL;
274 }
275
276 static void handle_always(void)
277 {
278         int i;
279
280         some_always = false;
281
282         for (i = 0; i < num_fds && !io_loop_return; i++) {
283                 struct io_conn *c = (void *)fds[i];
284
285                 if (fds[i]->listener)
286                         continue;
287
288                 if (c->plan.pollflag == POLLALWAYS)
289                         io_ready(c);
290
291                 if (c->duplex && c->duplex->plan.pollflag == POLLALWAYS)
292                         io_ready(c->duplex);
293         }
294 }
295
/* This is the main loop. */
/* Runs until io_break sets io_loop_return (its value is returned) or no
 * fds remain.  Each iteration: expire timers, reap closing conns, run
 * POLLALWAYS conns, then poll() and dispatch ready fds.  If `ready` is
 * non-NULL (debug mode), a debug-traced conn that becomes ready is stored
 * in *ready and NULL is returned instead of dispatching it. */
void *do_io_loop(struct io_conn **ready)
{
	void *ret;

	io_loop_enter();

	while (!io_loop_return) {
		/* INT_MAX ms poll timeout means effectively "forever". */
		int i, r, timeout = INT_MAX;
		struct timespec now;
		bool some_timeouts = false;

		if (timeouts.base) {
			struct timespec first;
			struct list_head expired;
			struct io_timeout *t;

			now = time_now();

			/* Call functions for expired timers. */
			timers_expire(&timeouts, now, &expired);
			while ((t = list_pop(&expired, struct io_timeout, timer.list))) {
				struct io_conn *conn = t->conn;
				/* Clear, in case timer re-adds */
				t->conn = NULL;
				set_current(conn);
				set_plan(conn, t->next(conn, t->next_arg));
				some_timeouts = true;
			}

			/* Now figure out how long to wait for the next one. */
			if (timer_earliest(&timeouts, &first)) {
				uint64_t f = time_to_msec(time_sub(first, now));
				if (f < INT_MAX)
					timeout = f;
			}
		}

		if (num_closing) {
			/* If this finishes a debugging con, return now. */
			if (finish_conns(ready))
				return NULL;
			/* Could have started/finished more. */
			continue;
		}

		/* debug can recurse on io_loop; anything can change. */
		if (doing_debug() && some_timeouts)
			continue;

		if (some_always) {
			handle_always();
			continue;
		}

		if (num_fds == 0)
			break;

		/* You can't tell them all to go to sleep! */
		assert(num_waiting);

		r = poll(pollfds, num_fds, timeout);
		if (r < 0)
			break;

		/* r counts fds with events; stop early once all are seen. */
		for (i = 0; i < num_fds && !io_loop_return; i++) {
			struct io_conn *c = (void *)fds[i];
			int events = pollfds[i].revents;

			if (r == 0)
				break;

			if (fds[i]->listener) {
				if (events & POLLIN) {
					accept_conn((void *)c);
					r--;
				}
			} else if (events & (POLLIN|POLLOUT)) {
				r--;
				/* The slot's events may belong to the duplex
				 * partner; dispatch to it first. */
				if (c->duplex) {
					int mask = c->duplex->plan.pollflag;
					if (events & mask) {
						if (doing_debug_on(c->duplex)
							&& ready) {
							*ready = c->duplex;
							return NULL;
						}
						io_ready(c->duplex);
						events &= ~mask;
						/* debug can recurse;
						 * anything can change. */
						if (doing_debug())
							break;

						/* If no events, or it closed
						 * the duplex, continue. */
						if (!(events&(POLLIN|POLLOUT))
						    || !c->plan.next)
							continue;
					}
				}
				if (doing_debug_on(c) && ready) {
					*ready = c;
					return NULL;
				}
				io_ready(c);
				/* debug can recurse; anything can change. */
				if (doing_debug())
					break;
			} else if (events & (POLLHUP|POLLNVAL|POLLERR)) {
				/* Hangup/error: force-close both halves. */
				r--;
				set_current(c);
				errno = EBADF;
				set_plan(c, io_close());
				if (c->duplex) {
					set_current(c->duplex);
					set_plan(c->duplex, io_close());
				}
			}
		}
	}

	/* Drain any remaining closing connections before returning. */
	while (num_closing && !io_loop_return) {
		if (finish_conns(ready))
			return NULL;
	}

	ret = io_loop_return;
	io_loop_return = NULL;

	io_loop_exit();
	return ret;
}
429
430 void *io_loop(void)
431 {
432         return do_io_loop(NULL);
433 }