]> git.ozlabs.org Git - ccan/blob - ccan/io/poll.c
ccan/io: test custom io functions.
[ccan] / ccan / io / poll.c
1 /* Licensed under LGPLv2.1+ - see LICENSE file for details */
#include "io.h"
#include "backend.h"
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <limits.h>
10
11 static size_t num_fds = 0, max_fds = 0, num_closing = 0, num_waiting = 0;
12 static struct pollfd *pollfds = NULL;
13 static struct fd **fds = NULL;
14 static struct timers timeouts;
#ifdef DEBUG
/* Current nesting depth of io_loop() calls. */
static unsigned int io_loop_level;
/* Connections awaiting a deferred free(), chained through finish_arg. */
static struct io_conn *free_later;

/* Record that one more io_loop() invocation is running. */
static void io_loop_enter(void)
{
        ++io_loop_level;
}

/* Leave one io_loop() level; on the outermost one, release deferred conns. */
static void io_loop_exit(void)
{
        struct io_conn *next;

        if (--io_loop_level != 0)
                return;

        /* Delayed free. */
        for (; free_later; free_later = next) {
                next = free_later->finish_arg;
                free(free_later);
        }
}

/* Free @conn now, or queue it if we are inside a nested io_loop(). */
static void free_conn(struct io_conn *conn)
{
        struct io_conn *queued;

        if (io_loop_level <= 1) {
                free(conn);
                return;
        }

        /* Only free on final exit: chain via finish_arg (no duplicates). */
        for (queued = free_later; queued; queued = queued->finish_arg)
                assert(queued != conn);
        conn->finish_arg = free_later;
        free_later = conn;
}
#else
/* Without DEBUG there is no nesting to track. */
static void io_loop_enter(void)
{
}

/* Without DEBUG there is nothing deferred to release. */
static void io_loop_exit(void)
{
}

/* Without DEBUG a connection is freed immediately. */
static void free_conn(struct io_conn *conn)
{
        free(conn);
}
#endif
58
59 static bool add_fd(struct fd *fd, short events)
60 {
61         if (num_fds + 1 > max_fds) {
62                 struct pollfd *newpollfds;
63                 struct fd **newfds;
64                 size_t num = max_fds ? max_fds * 2 : 8;
65
66                 newpollfds = realloc(pollfds, sizeof(*newpollfds) * num);
67                 if (!newpollfds)
68                         return false;
69                 pollfds = newpollfds;
70                 newfds = realloc(fds, sizeof(*newfds) * num);
71                 if (!newfds)
72                         return false;
73                 fds = newfds;
74                 max_fds = num;
75         }
76
77         pollfds[num_fds].events = events;
78         /* In case it's idle. */
79         if (!events)
80                 pollfds[num_fds].fd = -fd->fd;
81         else
82                 pollfds[num_fds].fd = fd->fd;
83         pollfds[num_fds].revents = 0; /* In case we're iterating now */
84         fds[num_fds] = fd;
85         fd->backend_info = num_fds;
86         num_fds++;
87         if (events)
88                 num_waiting++;
89
90         return true;
91 }
92
93 static void del_fd(struct fd *fd)
94 {
95         size_t n = fd->backend_info;
96
97         assert(n != -1);
98         assert(n < num_fds);
99         if (pollfds[n].events)
100                 num_waiting--;
101         if (n != num_fds - 1) {
102                 /* Move last one over us. */
103                 pollfds[n] = pollfds[num_fds-1];
104                 fds[n] = fds[num_fds-1];
105                 assert(fds[n]->backend_info == num_fds-1);
106                 fds[n]->backend_info = n;
107         } else if (num_fds == 1) {
108                 /* Free everything when no more fds. */
109                 free(pollfds);
110                 free(fds);
111                 pollfds = NULL;
112                 fds = NULL;
113                 max_fds = 0;
114         }
115         num_fds--;
116         fd->backend_info = -1;
117         close(fd->fd);
118 }
119
120 bool add_listener(struct io_listener *l)
121 {
122         if (!add_fd(&l->fd, POLLIN))
123                 return false;
124         return true;
125 }
126
127 void backend_plan_changed(struct io_conn *conn)
128 {
129         struct pollfd *pfd;
130
131         /* This can happen with debugging and delayed free... */
132         if (conn->fd.backend_info == -1)
133                 return;
134
135         pfd = &pollfds[conn->fd.backend_info];
136
137         if (pfd->events)
138                 num_waiting--;
139
140         pfd->events = conn->plan.pollflag;
141         if (conn->duplex) {
142                 int mask = conn->duplex->plan.pollflag;
143                 /* You can't *both* read/write. */
144                 assert(!mask || pfd->events != mask);
145                 pfd->events |= mask;
146         }
147         if (pfd->events) {
148                 num_waiting++;
149                 pfd->fd = conn->fd.fd;
150         } else
151                 pfd->fd = -conn->fd.fd;
152
153         if (!conn->plan.next)
154                 num_closing++;
155 }
156
157 bool add_conn(struct io_conn *c)
158 {
159         if (!add_fd(&c->fd, c->plan.pollflag))
160                 return false;
161         /* Immediate close is allowed. */
162         if (!c->plan.next)
163                 num_closing++;
164         return true;
165 }
166
167 bool add_duplex(struct io_conn *c)
168 {
169         c->fd.backend_info = c->duplex->fd.backend_info;
170         backend_plan_changed(c);
171         return true;
172 }
173
174 static void del_conn(struct io_conn *conn)
175 {
176         if (conn->finish)
177                 conn->finish(conn, conn->finish_arg);
178         if (timeout_active(conn))
179                 backend_del_timeout(conn);
180         free(conn->timeout);
181         if (conn->duplex) {
182                 /* In case fds[] pointed to the other one. */
183                 fds[conn->fd.backend_info] = &conn->duplex->fd;
184                 conn->duplex->duplex = NULL;
185                 conn->fd.backend_info = -1;
186         } else
187                 del_fd(&conn->fd);
188         num_closing--;
189 }
190
191 void del_listener(struct io_listener *l)
192 {
193         del_fd(&l->fd);
194 }
195
196 static void set_plan(struct io_conn *conn, struct io_plan plan)
197 {
198         conn->plan = plan;
199         backend_plan_changed(conn);
200 }
201
202 static void accept_conn(struct io_listener *l)
203 {
204         int fd = accept(l->fd.fd, NULL, NULL);
205
206         /* FIXME: What to do here? */
207         if (fd < 0)
208                 return;
209         l->init(fd, l->arg);
210 }
211
212 /* It's OK to miss some, as long as we make progress. */
213 static void finish_conns(void)
214 {
215         unsigned int i;
216
217         for (i = 0; !io_loop_return && i < num_fds; i++) {
218                 struct io_conn *c, *duplex;
219
220                 if (!num_closing)
221                         break;
222
223                 if (fds[i]->listener)
224                         continue;
225                 c = (void *)fds[i];
226                 for (duplex = c->duplex; c; c = duplex, duplex = NULL) {
227                         if (!c->plan.next) {
228                                 del_conn(c);
229                                 free_conn(c);
230                                 i--;
231                         }
232                 }
233         }
234 }
235
236 void backend_add_timeout(struct io_conn *conn, struct timespec duration)
237 {
238         if (!timeouts.base)
239                 timers_init(&timeouts, time_now());
240         timer_add(&timeouts, &conn->timeout->timer,
241                   time_add(time_now(), duration));
242         conn->timeout->conn = conn;
243 }
244
245 void backend_del_timeout(struct io_conn *conn)
246 {
247         assert(conn->timeout->conn == conn);
248         timer_del(&timeouts, &conn->timeout->timer);
249         conn->timeout->conn = NULL;
250 }
251
252 /* This is the main loop. */
253 void *io_loop(void)
254 {
255         void *ret;
256
257         io_loop_enter();
258
259         while (!io_loop_return) {
260                 int i, r, timeout = INT_MAX;
261                 struct timespec now;
262                 bool some_timeouts = false;
263
264                 if (timeouts.base) {
265                         struct timespec first;
266                         struct list_head expired;
267                         struct io_timeout *t;
268
269                         now = time_now();
270
271                         /* Call functions for expired timers. */
272                         timers_expire(&timeouts, now, &expired);
273                         while ((t = list_pop(&expired, struct io_timeout, timer.list))) {
274                                 struct io_conn *conn = t->conn;
275                                 /* Clear, in case timer re-adds */
276                                 t->conn = NULL;
277                                 set_current(conn);
278                                 set_plan(conn, t->next(conn, t->next_arg));
279                                 some_timeouts = true;
280                         }
281
282                         /* Now figure out how long to wait for the next one. */
283                         if (timer_earliest(&timeouts, &first)) {
284                                 uint64_t f = time_to_msec(time_sub(first, now));
285                                 if (f < INT_MAX)
286                                         timeout = f;
287                         }
288                 }
289
290                 if (num_closing) {
291                         finish_conns();
292                         /* Could have started/finished more. */
293                         continue;
294                 }
295
296                 /* debug can recurse on io_loop; anything can change. */
297                 if (doing_debug() && some_timeouts)
298                         continue;
299
300                 if (num_fds == 0)
301                         break;
302
303                 /* You can't tell them all to go to sleep! */
304                 assert(num_waiting);
305
306                 r = poll(pollfds, num_fds, timeout);
307                 if (r < 0)
308                         break;
309
310                 for (i = 0; i < num_fds && !io_loop_return; i++) {
311                         struct io_conn *c = (void *)fds[i];
312                         int events = pollfds[i].revents;
313
314                         if (r == 0)
315                                 break;
316
317                         if (fds[i]->listener) {
318                                 if (events & POLLIN) {
319                                         accept_conn((void *)c);
320                                         r--;
321                                 }
322                         } else if (events & (POLLIN|POLLOUT)) {
323                                 r--;
324                                 if (c->duplex) {
325                                         int mask = c->duplex->plan.pollflag;
326                                         if (events & mask) {
327                                                 io_ready(c->duplex);
328                                                 events &= ~mask;
329                                                 /* debug can recurse;
330                                                  * anything can change. */
331                                                 if (doing_debug())
332                                                         break;
333                                                 if (!(events&(POLLIN|POLLOUT)))
334                                                         continue;
335                                         }
336                                 }
337                                 io_ready(c);
338                                 /* debug can recurse; anything can change. */
339                                 if (doing_debug())
340                                         break;
341                         } else if (events & POLLHUP) {
342                                 r--;
343                                 set_current(c);
344                                 set_plan(c, io_close(c, NULL));
345                                 if (c->duplex) {
346                                         set_current(c->duplex);
347                                         set_plan(c->duplex,
348                                                  io_close(c->duplex, NULL));
349                                 }
350                         }
351                 }
352         }
353
354         while (num_closing)
355                 finish_conns();
356
357         ret = io_loop_return;
358         io_loop_return = NULL;
359
360         io_loop_exit();
361         return ret;
362 }