]> git.ozlabs.org Git - ccan/blob - ccan/io/poll.c
ccan/io: fix io_connect.
[ccan] / ccan / io / poll.c
1 /* Licensed under LGPLv2.1+ - see LICENSE file for details */
2 #include "io.h"
3 #include "backend.h"
4 #include <assert.h>
5 #include <poll.h>
6 #include <stdlib.h>
7 #include <sys/types.h>
8 #include <sys/socket.h>
9 #include <limits.h>
10 #include <errno.h>
11
12 static size_t num_fds = 0, max_fds = 0, num_closing = 0, num_waiting = 0;
13 static bool some_always = false;
14 static struct pollfd *pollfds = NULL;
15 static struct fd **fds = NULL;
16 static struct timers timeouts;
17 #ifdef DEBUG
18 static unsigned int io_loop_level;
19 static struct io_conn *free_later;
20 static void io_loop_enter(void)
21 {
22         io_loop_level++;
23 }
24 static void io_loop_exit(void)
25 {
26         io_loop_level--;
27         if (io_loop_level == 0) {
28                 /* Delayed free. */
29                 while (free_later) {
30                         struct io_conn *c = free_later;
31                         free_later = c->finish_arg;
32                         io_alloc.free(c);
33                 }
34         }
35 }
36 static void free_conn(struct io_conn *conn)
37 {
38         /* Only free on final exit: chain via finish. */
39         if (io_loop_level > 1) {
40                 struct io_conn *c;
41                 for (c = free_later; c; c = c->finish_arg)
42                         assert(c != conn);
43                 conn->finish_arg = free_later;
44                 free_later = conn;
45         } else
46                 io_alloc.free(conn);
47 }
48 #else
49 static void io_loop_enter(void)
50 {
51 }
52 static void io_loop_exit(void)
53 {
54 }
55 static void free_conn(struct io_conn *conn)
56 {
57         io_alloc.free(conn);
58 }
59 #endif
60
61 static bool add_fd(struct fd *fd, short events)
62 {
63         if (num_fds + 1 > max_fds) {
64                 struct pollfd *newpollfds;
65                 struct fd **newfds;
66                 size_t num = max_fds ? max_fds * 2 : 8;
67
68                 newpollfds = io_alloc.realloc(pollfds, sizeof(*newpollfds)*num);
69                 if (!newpollfds)
70                         return false;
71                 pollfds = newpollfds;
72                 newfds = io_alloc.realloc(fds, sizeof(*newfds) * num);
73                 if (!newfds)
74                         return false;
75                 fds = newfds;
76                 max_fds = num;
77         }
78
79         pollfds[num_fds].events = events;
80         /* In case it's idle. */
81         if (!events)
82                 pollfds[num_fds].fd = -fd->fd;
83         else
84                 pollfds[num_fds].fd = fd->fd;
85         pollfds[num_fds].revents = 0; /* In case we're iterating now */
86         fds[num_fds] = fd;
87         fd->backend_info = num_fds;
88         num_fds++;
89         if (events)
90                 num_waiting++;
91
92         return true;
93 }
94
95 static void del_fd(struct fd *fd)
96 {
97         size_t n = fd->backend_info;
98
99         assert(n != -1);
100         assert(n < num_fds);
101         if (pollfds[n].events)
102                 num_waiting--;
103         if (n != num_fds - 1) {
104                 /* Move last one over us. */
105                 pollfds[n] = pollfds[num_fds-1];
106                 fds[n] = fds[num_fds-1];
107                 assert(fds[n]->backend_info == num_fds-1);
108                 fds[n]->backend_info = n;
109                 /* If that happens to be a duplex, move that too. */
110                 if (!fds[n]->listener) {
111                         struct io_conn *c = (void *)fds[n];
112                         if (c->duplex) {
113                                 assert(c->duplex->fd.backend_info == num_fds-1);
114                                 c->duplex->fd.backend_info = n;
115                         }
116                 }
117         } else if (num_fds == 1) {
118                 /* Free everything when no more fds. */
119                 io_alloc.free(pollfds);
120                 io_alloc.free(fds);
121                 pollfds = NULL;
122                 fds = NULL;
123                 max_fds = 0;
124         }
125         num_fds--;
126         fd->backend_info = -1;
127         close(fd->fd);
128 }
129
130 bool add_listener(struct io_listener *l)
131 {
132         if (!add_fd(&l->fd, POLLIN))
133                 return false;
134         return true;
135 }
136
137 void backend_plan_changed(struct io_conn *conn)
138 {
139         struct pollfd *pfd;
140
141         /* This can happen with debugging and delayed free... */
142         if (conn->fd.backend_info == -1)
143                 return;
144
145         pfd = &pollfds[conn->fd.backend_info];
146
147         if (pfd->events)
148                 num_waiting--;
149
150         pfd->events = conn->plan.pollflag & (POLLIN|POLLOUT);
151         if (conn->duplex) {
152                 int mask = conn->duplex->plan.pollflag & (POLLIN|POLLOUT);
153                 /* You can't *both* read/write. */
154                 assert(!mask || pfd->events != mask);
155                 pfd->events |= mask;
156         }
157         if (pfd->events) {
158                 num_waiting++;
159                 pfd->fd = conn->fd.fd;
160         } else
161                 pfd->fd = -conn->fd.fd;
162
163         if (!conn->plan.next)
164                 num_closing++;
165
166         if (conn->plan.pollflag == POLLALWAYS)
167                 some_always = true;
168 }
169
170 void backend_wait_changed(const void *wait)
171 {
172         unsigned int i;
173
174         for (i = 0; i < num_fds; i++) {
175                 struct io_conn *c, *duplex;
176
177                 /* Ignore listeners */
178                 if (fds[i]->listener)
179                         continue;
180                 c = (void *)fds[i];
181                 for (duplex = c->duplex; c; c = duplex, duplex = NULL) {
182                         /* Ignore closing. */
183                         if (!c->plan.next)
184                                 continue;
185                         /* Not idle? */
186                         if (c->plan.io)
187                                 continue;
188                         /* Waiting on something else? */
189                         if (c->plan.u1.const_vp != wait)
190                                 continue;
191                         /* Make it do the next thing. */
192                         c->plan = io_always_(c->plan.next, c->plan.next_arg);
193                         backend_plan_changed(c);
194                 }
195         }
196 }
197
198 bool add_conn(struct io_conn *c)
199 {
200         if (!add_fd(&c->fd, c->plan.pollflag & (POLLIN|POLLOUT)))
201                 return false;
202         /* Immediate close is allowed. */
203         if (!c->plan.next)
204                 num_closing++;
205         if (c->plan.pollflag == POLLALWAYS)
206                 some_always = true;
207         return true;
208 }
209
210 bool add_duplex(struct io_conn *c)
211 {
212         c->fd.backend_info = c->duplex->fd.backend_info;
213         backend_plan_changed(c);
214         return true;
215 }
216
217 void backend_del_conn(struct io_conn *conn)
218 {
219         if (timeout_active(conn))
220                 backend_del_timeout(conn);
221         io_alloc.free(conn->timeout);
222         if (conn->duplex) {
223                 /* In case fds[] pointed to the other one. */
224                 assert(conn->duplex->fd.backend_info == conn->fd.backend_info);
225                 fds[conn->fd.backend_info] = &conn->duplex->fd;
226                 conn->duplex->duplex = NULL;
227                 conn->fd.backend_info = -1;
228         } else
229                 del_fd(&conn->fd);
230         num_closing--;
231         if (conn->finish) {
232                 /* Saved by io_close */
233                 errno = conn->plan.u1.s;
234                 conn->finish(conn, conn->finish_arg);
235         }
236         free_conn(conn);
237 }
238
239 void del_listener(struct io_listener *l)
240 {
241         del_fd(&l->fd);
242 }
243
244 static void set_plan(struct io_conn *conn, struct io_plan plan)
245 {
246         conn->plan = plan;
247         backend_plan_changed(conn);
248 }
249
250 static void accept_conn(struct io_listener *l)
251 {
252         int fd = accept(l->fd.fd, NULL, NULL);
253
254         /* FIXME: What to do here? */
255         if (fd < 0)
256                 return;
257         l->init(fd, l->arg);
258 }
259
260 /* It's OK to miss some, as long as we make progress. */
261 static bool finish_conns(struct io_conn **ready)
262 {
263         unsigned int i;
264
265         for (i = 0; !io_loop_return && i < num_fds; i++) {
266                 struct io_conn *c, *duplex;
267
268                 if (!num_closing)
269                         break;
270
271                 if (fds[i]->listener)
272                         continue;
273                 c = (void *)fds[i];
274                 for (duplex = c->duplex; c; c = duplex, duplex = NULL) {
275                         if (!c->plan.next) {
276                                 if (doing_debug_on(c) && ready) {
277                                         *ready = c;
278                                         return true;
279                                 }
280                                 backend_del_conn(c);
281                                 i--;
282                         }
283                 }
284         }
285         return false;
286 }
287
288 void backend_add_timeout(struct io_conn *conn, struct timerel duration)
289 {
290         if (!timeouts.base)
291                 timers_init(&timeouts, time_now());
292         timer_add(&timeouts, &conn->timeout->timer,
293                   timeabs_add(time_now(), duration));
294         conn->timeout->conn = conn;
295 }
296
297 void backend_del_timeout(struct io_conn *conn)
298 {
299         assert(conn->timeout->conn == conn);
300         timer_del(&timeouts, &conn->timeout->timer);
301         conn->timeout->conn = NULL;
302 }
303
304 static void handle_always(void)
305 {
306         int i;
307
308         some_always = false;
309
310         for (i = 0; i < num_fds && !io_loop_return; i++) {
311                 struct io_conn *c = (void *)fds[i];
312
313                 if (fds[i]->listener)
314                         continue;
315
316                 if (c->plan.pollflag == POLLALWAYS)
317                         io_ready(c);
318
319                 if (c->duplex && c->duplex->plan.pollflag == POLLALWAYS)
320                         io_ready(c->duplex);
321         }
322 }
323
324 /* This is the main loop. */
325 void *do_io_loop(struct io_conn **ready)
326 {
327         void *ret;
328
329         io_loop_enter();
330
331         while (!io_loop_return) {
332                 int i, r, timeout = INT_MAX;
333                 struct timeabs now;
334                 bool some_timeouts = false;
335
336                 if (timeouts.base) {
337                         struct timeabs first;
338                         struct list_head expired;
339                         struct io_timeout *t;
340
341                         now = time_now();
342
343                         /* Call functions for expired timers. */
344                         timers_expire(&timeouts, now, &expired);
345                         while ((t = list_pop(&expired, struct io_timeout, timer.list))) {
346                                 struct io_conn *conn = t->conn;
347                                 /* Clear, in case timer re-adds */
348                                 t->conn = NULL;
349                                 set_current(conn);
350                                 set_plan(conn, t->next(conn, t->next_arg));
351                                 some_timeouts = true;
352                         }
353
354                         /* Now figure out how long to wait for the next one. */
355                         if (timer_earliest(&timeouts, &first)) {
356                                 uint64_t f = time_to_msec(time_between(first, now));
357                                 if (f < INT_MAX)
358                                         timeout = f;
359                         }
360                 }
361
362                 if (num_closing) {
363                         /* If this finishes a debugging con, return now. */
364                         if (finish_conns(ready))
365                                 return NULL;
366                         /* Could have started/finished more. */
367                         continue;
368                 }
369
370                 /* debug can recurse on io_loop; anything can change. */
371                 if (doing_debug() && some_timeouts)
372                         continue;
373
374                 if (some_always) {
375                         handle_always();
376                         continue;
377                 }
378
379                 if (num_fds == 0)
380                         break;
381
382                 /* You can't tell them all to go to sleep! */
383                 assert(num_waiting);
384
385                 r = poll(pollfds, num_fds, timeout);
386                 if (r < 0)
387                         break;
388
389                 for (i = 0; i < num_fds && !io_loop_return; i++) {
390                         struct io_conn *c = (void *)fds[i];
391                         int events = pollfds[i].revents;
392
393                         if (r == 0)
394                                 break;
395
396                         if (fds[i]->listener) {
397                                 if (events & POLLIN) {
398                                         accept_conn((void *)c);
399                                         r--;
400                                 }
401                         } else if (events & (POLLIN|POLLOUT)) {
402                                 r--;
403                                 if (c->duplex) {
404                                         int mask = c->duplex->plan.pollflag;
405                                         if (events & mask) {
406                                                 if (doing_debug_on(c->duplex)
407                                                         && ready) {
408                                                         *ready = c->duplex;
409                                                         return NULL;
410                                                 }
411                                                 io_ready(c->duplex);
412                                                 events &= ~mask;
413                                                 /* debug can recurse;
414                                                  * anything can change. */
415                                                 if (doing_debug())
416                                                         break;
417
418                                                 /* If no events, or it closed
419                                                  * the duplex, continue. */
420                                                 if (!(events&(POLLIN|POLLOUT))
421                                                     || !c->plan.next)
422                                                         continue;
423                                         }
424                                 }
425                                 if (doing_debug_on(c) && ready) {
426                                         *ready = c;
427                                         return NULL;
428                                 }
429                                 io_ready(c);
430                                 /* debug can recurse; anything can change. */
431                                 if (doing_debug())
432                                         break;
433                         } else if (events & (POLLHUP|POLLNVAL|POLLERR)) {
434                                 r--;
435                                 set_current(c);
436                                 errno = EBADF;
437                                 set_plan(c, io_close());
438                                 if (c->duplex) {
439                                         set_current(c->duplex);
440                                         set_plan(c->duplex, io_close());
441                                 }
442                         }
443                 }
444         }
445
446         while (num_closing && !io_loop_return) {
447                 if (finish_conns(ready))
448                         return NULL;
449         }
450
451         ret = io_loop_return;
452         io_loop_return = NULL;
453
454         io_loop_exit();
455         return ret;
456 }
457
458 void *io_loop(void)
459 {
460         return do_io_loop(NULL);
461 }