]> git.ozlabs.org Git - ccan/blob - ccan/io/io.c
io failtest timer tools: fallout from time changes.
[ccan] / ccan / io / io.c
1 /* Licensed under LGPLv2.1+ - see LICENSE file for details */
2 #include "io.h"
3 #include "backend.h"
4 #include <sys/types.h>
5 #include <sys/socket.h>
6 #include <netdb.h>
7 #include <string.h>
8 #include <errno.h>
9 #include <stdlib.h>
10 #include <assert.h>
11 #include <unistd.h>
12 #include <fcntl.h>
13
14 void *io_loop_return;
15
16 struct io_alloc io_alloc = {
17         malloc, realloc, free
18 };
19
20 #ifdef DEBUG
21 /* Set to skip the next plan. */
22 bool io_plan_nodebug;
23 /* The current connection to apply plan to. */
24 struct io_conn *current;
25 /* User-defined function to select which connection(s) to debug. */
26 bool (*io_debug_conn)(struct io_conn *conn);
27
28 struct io_plan io_debug(struct io_plan plan)
29 {
30         struct io_conn *ready = NULL;
31
32         if (io_plan_nodebug) {
33                 io_plan_nodebug = false;
34                 return plan;
35         }
36
37         if (!current || !doing_debug_on(current))
38                 return plan;
39
40         current->plan = plan;
41         backend_plan_changed(current);
42
43         /* Call back into the loop immediately. */
44         io_loop_return = do_io_loop(&ready);
45
46         if (ready) {
47                 set_current(ready);
48                 if (!ready->plan.next) {
49                         /* Call finish function immediately. */
50                         if (ready->finish) {
51                                 errno = ready->plan.u1.s;
52                                 ready->finish(ready, ready->finish_arg);
53                                 ready->finish = NULL;
54                         }
55                         backend_del_conn(ready);
56                 } else {
57                         /* Calls back in itself, via io_debug_io(). */
58                         if (ready->plan.io(ready->fd.fd, &ready->plan) != 2)
59                                 abort();
60                 }
61                 set_current(NULL);
62         }
63
64         /* Return a do-nothing plan, so backend_plan_changed in
65          * io_ready doesn't do anything (it's already been called). */
66         return io_wait_(NULL, (void *)1, NULL);
67 }
68
69 int io_debug_io(int ret)
70 {
71         /* Cache it for debugging; current changes. */
72         struct io_conn *conn = current;
73         int saved_errno = errno;
74
75         if (!doing_debug_on(conn))
76                 return ret;
77
78         /* These will all go linearly through the io_debug() path above. */
79         switch (ret) {
80         case -1:
81                 /* This will call io_debug above. */
82                 errno = saved_errno;
83                 io_close();
84                 break;
85         case 0: /* Keep going with plan. */
86                 io_debug(conn->plan);
87                 break;
88         case 1: /* Done: get next plan. */
89                 if (timeout_active(conn))
90                         backend_del_timeout(conn);
91                 /* In case they call io_duplex, clear our poll flags so
92                  * both sides don't seem to be both doing read or write
93                  * (See assert(!mask || pfd->events != mask) in poll.c) */
94                 conn->plan.pollflag = 0;
95                 conn->plan.next(conn, conn->plan.next_arg);
96                 break;
97         default:
98                 abort();
99         }
100
101         /* Normally-invalid value, used for sanity check. */
102         return 2;
103 }
104
105 /* Counterpart to io_plan_no_debug(), called in macros in io.h */
106 static void io_plan_debug_again(void)
107 {
108         io_plan_nodebug = false;
109 }
110 #else
111 static void io_plan_debug_again(void)
112 {
113 }
114 #endif
115
116 struct io_listener *io_new_listener_(int fd,
117                                      void (*init)(int fd, void *arg),
118                                      void *arg)
119 {
120         struct io_listener *l = io_alloc.alloc(sizeof(*l));
121
122         if (!l)
123                 return NULL;
124
125         l->fd.listener = true;
126         l->fd.fd = fd;
127         l->init = init;
128         l->arg = arg;
129         if (!add_listener(l)) {
130                 io_alloc.free(l);
131                 return NULL;
132         }
133         return l;
134 }
135
136 void io_close_listener(struct io_listener *l)
137 {
138         close(l->fd.fd);
139         del_listener(l);
140         io_alloc.free(l);
141 }
142
143 struct io_conn *io_new_conn_(int fd, struct io_plan plan)
144 {
145         struct io_conn *conn = io_alloc.alloc(sizeof(*conn));
146
147         io_plan_debug_again();
148
149         if (!conn)
150                 return NULL;
151
152         conn->fd.listener = false;
153         conn->fd.fd = fd;
154         conn->plan = plan;
155         conn->finish = NULL;
156         conn->finish_arg = NULL;
157         conn->duplex = NULL;
158         conn->timeout = NULL;
159         if (!add_conn(conn)) {
160                 io_alloc.free(conn);
161                 return NULL;
162         }
163         return conn;
164 }
165
166 void io_set_finish_(struct io_conn *conn,
167                     void (*finish)(struct io_conn *, void *),
168                     void *arg)
169 {
170         conn->finish = finish;
171         conn->finish_arg = arg;
172 }
173
174 struct io_conn *io_duplex_(struct io_conn *old, struct io_plan plan)
175 {
176         struct io_conn *conn;
177
178         io_plan_debug_again();
179
180         assert(!old->duplex);
181
182         conn = io_alloc.alloc(sizeof(*conn));
183         if (!conn)
184                 return NULL;
185
186         conn->fd.listener = false;
187         conn->fd.fd = old->fd.fd;
188         conn->plan = plan;
189         conn->duplex = old;
190         conn->finish = NULL;
191         conn->finish_arg = NULL;
192         conn->timeout = NULL;
193         if (!add_duplex(conn)) {
194                 io_alloc.free(conn);
195                 return NULL;
196         }
197         old->duplex = conn;
198         return conn;
199 }
200
201 bool io_timeout_(struct io_conn *conn, struct timerel t,
202                  struct io_plan (*cb)(struct io_conn *, void *), void *arg)
203 {
204         assert(cb);
205
206         if (!conn->timeout) {
207                 conn->timeout = io_alloc.alloc(sizeof(*conn->timeout));
208                 if (!conn->timeout)
209                         return false;
210         } else
211                 assert(!timeout_active(conn));
212
213         conn->timeout->next = cb;
214         conn->timeout->next_arg = arg;
215         backend_add_timeout(conn, t);
216         return true;
217 }
218
219 /* Always done: call the next thing. */
220 static int do_always(int fd, struct io_plan *plan)
221 {
222         return 1;
223 }
224
225 struct io_plan io_always_(struct io_plan (*cb)(struct io_conn *, void *),
226                           void *arg)
227 {
228         struct io_plan plan;
229
230         assert(cb);
231         plan.io = do_always;
232         plan.next = cb;
233         plan.next_arg = arg;
234         plan.pollflag = POLLALWAYS;
235
236         return plan;
237 }
238
239 /* Returns true if we're finished. */
240 static int do_write(int fd, struct io_plan *plan)
241 {
242         ssize_t ret = write(fd, plan->u1.cp, plan->u2.s);
243         if (ret < 0)
244                 return io_debug_io(-1);
245
246         plan->u1.cp += ret;
247         plan->u2.s -= ret;
248         return io_debug_io(plan->u2.s == 0);
249 }
250
251 /* Queue some data to be written. */
252 struct io_plan io_write_(const void *data, size_t len,
253                          struct io_plan (*cb)(struct io_conn *, void *),
254                          void *arg)
255 {
256         struct io_plan plan;
257
258         assert(cb);
259
260         if (len == 0)
261                 return io_always_(cb, arg);
262
263         plan.u1.const_vp = data;
264         plan.u2.s = len;
265         plan.io = do_write;
266         plan.next = cb;
267         plan.next_arg = arg;
268         plan.pollflag = POLLOUT;
269
270         return plan;
271 }
272
273 static int do_read(int fd, struct io_plan *plan)
274 {
275         ssize_t ret = read(fd, plan->u1.cp, plan->u2.s);
276         if (ret <= 0)
277                 return io_debug_io(-1);
278
279         plan->u1.cp += ret;
280         plan->u2.s -= ret;
281         return io_debug_io(plan->u2.s == 0);
282 }
283
284 /* Queue a request to read into a buffer. */
285 struct io_plan io_read_(void *data, size_t len,
286                         struct io_plan (*cb)(struct io_conn *, void *),
287                         void *arg)
288 {
289         struct io_plan plan;
290
291         assert(cb);
292
293         if (len == 0)
294                 return io_always_(cb, arg);
295
296         plan.u1.cp = data;
297         plan.u2.s = len;
298         plan.io = do_read;
299         plan.next = cb;
300         plan.next_arg = arg;
301
302         plan.pollflag = POLLIN;
303
304         return plan;
305 }
306
307 static int do_read_partial(int fd, struct io_plan *plan)
308 {
309         ssize_t ret = read(fd, plan->u1.cp, *(size_t *)plan->u2.vp);
310         if (ret <= 0)
311                 return io_debug_io(-1);
312
313         *(size_t *)plan->u2.vp = ret;
314         return io_debug_io(1);
315 }
316
317 /* Queue a partial request to read into a buffer. */
318 struct io_plan io_read_partial_(void *data, size_t *len,
319                                 struct io_plan (*cb)(struct io_conn *, void *),
320                                 void *arg)
321 {
322         struct io_plan plan;
323
324         assert(cb);
325
326         if (*len == 0)
327                 return io_always_(cb, arg);
328
329         plan.u1.cp = data;
330         plan.u2.vp = len;
331         plan.io = do_read_partial;
332         plan.next = cb;
333         plan.next_arg = arg;
334         plan.pollflag = POLLIN;
335
336         return plan;
337 }
338
339 static int do_write_partial(int fd, struct io_plan *plan)
340 {
341         ssize_t ret = write(fd, plan->u1.cp, *(size_t *)plan->u2.vp);
342         if (ret < 0)
343                 return io_debug_io(-1);
344
345         *(size_t *)plan->u2.vp = ret;
346         return io_debug_io(1);
347 }
348
349 /* Queue a partial write request. */
350 struct io_plan io_write_partial_(const void *data, size_t *len,
351                                  struct io_plan (*cb)(struct io_conn*, void *),
352                                  void *arg)
353 {
354         struct io_plan plan;
355
356         assert(cb);
357
358         if (*len == 0)
359                 return io_always_(cb, arg);
360
361         plan.u1.const_vp = data;
362         plan.u2.vp = len;
363         plan.io = do_write_partial;
364         plan.next = cb;
365         plan.next_arg = arg;
366         plan.pollflag = POLLOUT;
367
368         return plan;
369 }
370
371 static int already_connected(int fd, struct io_plan *plan)
372 {
373         return io_debug_io(1);
374 }
375
376 static int do_connect(int fd, struct io_plan *plan)
377 {
378         int err, ret;
379         socklen_t len = sizeof(err);
380
381         /* Has async connect finished? */
382         ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len);
383         if (ret < 0)
384                 return -1;
385
386         if (err == 0) {
387                 /* Restore blocking if it was initially. */
388                 fcntl(fd, F_SETFD, plan->u1.s);
389                 return 1;
390         }
391         return 0;
392 }
393
394 struct io_plan io_connect_(int fd, const struct addrinfo *addr,
395                            struct io_plan (*cb)(struct io_conn*, void *),
396                            void *arg)
397 {
398         struct io_plan plan;
399
400         assert(cb);
401
402         plan.next = cb;
403         plan.next_arg = arg;
404
405         /* Save old flags, set nonblock if not already. */
406         plan.u1.s = fcntl(fd, F_GETFD);
407         fcntl(fd, F_SETFD, plan.u1.s | O_NONBLOCK);
408
409         /* Immediate connect can happen. */
410         if (connect(fd, addr->ai_addr, addr->ai_addrlen) == 0) {
411                 /* Dummy will be called immediately. */
412                 plan.pollflag = POLLOUT;
413                 plan.io = already_connected;
414         } else {
415                 if (errno != EINPROGRESS)
416                         return io_close_();
417
418                 plan.pollflag = POLLIN;
419                 plan.io = do_connect;
420         }
421         return plan;
422 }
423
424 struct io_plan io_wait_(const void *wait,
425                         struct io_plan (*cb)(struct io_conn *, void*),
426                         void *arg)
427 {
428         struct io_plan plan;
429
430         assert(cb);
431         plan.pollflag = 0;
432         plan.io = NULL;
433         plan.next = cb;
434         plan.next_arg = arg;
435
436         plan.u1.const_vp = wait;
437
438         return plan;
439 }
440
441 void io_wake(const void *wait)
442 {
443         backend_wait_changed(wait);
444 }
445
446 void io_ready(struct io_conn *conn)
447 {
448         /* Beware io_close_other! */
449         if (!conn->plan.next)
450                 return;
451
452         set_current(conn);
453         switch (conn->plan.io(conn->fd.fd, &conn->plan)) {
454         case -1: /* Failure means a new plan: close up. */
455                 conn->plan = io_close();
456                 backend_plan_changed(conn);
457                 break;
458         case 0: /* Keep going with plan. */
459                 break;
460         case 1: /* Done: get next plan. */
461                 if (timeout_active(conn))
462                         backend_del_timeout(conn);
463                 /* In case they call io_duplex, clear our poll flags so
464                  * both sides don't seem to be both doing read or write
465                  * (See assert(!mask || pfd->events != mask) in poll.c) */
466                 conn->plan.pollflag = 0;
467                 conn->plan = conn->plan.next(conn, conn->plan.next_arg);
468                 backend_plan_changed(conn);
469         }
470         set_current(NULL);
471 }
472
473 /* Close the connection, we're done. */
474 struct io_plan io_close_(void)
475 {
476         struct io_plan plan;
477
478         plan.pollflag = 0;
479         /* This means we're closing. */
480         plan.next = NULL;
481         plan.u1.s = errno;
482
483         return plan;
484 }
485
486 struct io_plan io_close_cb(struct io_conn *conn, void *arg)
487 {
488         return io_close();
489 }
490
491 void io_close_other(struct io_conn *conn)
492 {
493         /* Don't close if already closing! */
494         if (conn->plan.next) {
495                 conn->plan = io_close_();
496                 backend_plan_changed(conn);
497         }
498 }
499
500 /* Exit the loop, returning this (non-NULL) arg. */
501 struct io_plan io_break_(void *ret, struct io_plan plan)
502 {
503         io_plan_debug_again();
504
505         assert(ret);
506         io_loop_return = ret;
507
508         return plan;
509 }
510
511 static struct io_plan io_never_called(struct io_conn *conn, void *arg)
512 {
513         abort();
514 }
515
516 struct io_plan io_never(void)
517 {
518         return io_always_(io_never_called, NULL);
519 }
520
521 int io_conn_fd(const struct io_conn *conn)
522 {
523         return conn->fd.fd;
524 }
525
526 void io_set_alloc(void *(*allocfn)(size_t size),
527                   void *(*reallocfn)(void *ptr, size_t size),
528                   void (*freefn)(void *ptr))
529 {
530         io_alloc.alloc = allocfn;
531         io_alloc.realloc = reallocfn;
532         io_alloc.free = freefn;
533 }