]> git.ozlabs.org Git - ccan/blob - ccan/io/io.c
io: add io_is_idle().
[ccan] / ccan / io / io.c
1 /* Licensed under LGPLv2.1+ - see LICENSE file for details */
2 #include "io.h"
3 #include "backend.h"
4 #include <sys/types.h>
5 #include <sys/socket.h>
6 #include <netdb.h>
7 #include <string.h>
8 #include <errno.h>
9 #include <stdlib.h>
10 #include <assert.h>
11 #include <poll.h>
12 #include <unistd.h>
13 #include <fcntl.h>
14
15 void *io_loop_return;
16
17 struct io_alloc io_alloc = {
18         malloc, realloc, free
19 };
20
21 #ifdef DEBUG
22 /* Set to skip the next plan. */
23 bool io_plan_nodebug;
24 /* The current connection to apply plan to. */
25 struct io_conn *current;
26 /* User-defined function to select which connection(s) to debug. */
27 bool (*io_debug_conn)(struct io_conn *conn);
28 /* Set when we wake up an connection we are debugging. */
29 bool io_debug_wakeup;
30
31 struct io_plan io_debug(struct io_plan plan)
32 {
33         struct io_conn *ready = NULL;
34
35         if (io_plan_nodebug) {
36                 io_plan_nodebug = false;
37                 return plan;
38         }
39
40         if (!current || !doing_debug_on(current)) {
41                 if (!io_debug_wakeup)
42                         return plan;
43         }
44
45         io_debug_wakeup = false;
46         current->plan = plan;
47         backend_plan_changed(current);
48
49         /* If it closed, close duplex. */
50         if (!current->plan.next && current->duplex) {
51                 current->duplex->plan = io_close_();
52                 backend_plan_changed(current->duplex);
53         }
54
55         /* Call back into the loop immediately. */
56         io_loop_return = do_io_loop(&ready);
57
58         if (ready) {
59                 set_current(ready);
60                 if (!ready->plan.next) {
61                         /* Call finish function immediately. */
62                         if (ready->finish) {
63                                 errno = ready->plan.u1.s;
64                                 ready->finish(ready, ready->finish_arg);
65                                 ready->finish = NULL;
66                         }
67                         backend_del_conn(ready);
68                 } else {
69                         /* Calls back in itself, via io_debug_io(). */
70                         if (ready->plan.io(ready->fd.fd, &ready->plan) != 2)
71                                 abort();
72                 }
73                 set_current(NULL);
74         }
75
76         /* Return a do-nothing plan, so backend_plan_changed in
77          * io_ready doesn't do anything (it's already been called). */
78         return io_idle_();
79 }
80
81 int io_debug_io(int ret)
82 {
83         /* Cache it for debugging; current changes. */
84         struct io_conn *conn = current;
85         int saved_errno = errno;
86
87         if (!doing_debug_on(conn))
88                 return ret;
89
90         /* These will all go linearly through the io_debug() path above. */
91         switch (ret) {
92         case -1:
93                 /* This will call io_debug above. */
94                 errno = saved_errno;
95                 io_close();
96                 break;
97         case 0: /* Keep going with plan. */
98                 io_debug(conn->plan);
99                 break;
100         case 1: /* Done: get next plan. */
101                 if (timeout_active(conn))
102                         backend_del_timeout(conn);
103                 /* In case they call io_duplex, clear our poll flags so
104                  * both sides don't seem to be both doing read or write
105                  * (See assert(!mask || pfd->events != mask) in poll.c) */
106                 conn->plan.pollflag = 0;
107                 conn->plan.next(conn, conn->plan.next_arg);
108                 break;
109         default:
110                 abort();
111         }
112
113         /* Normally-invalid value, used for sanity check. */
114         return 2;
115 }
116
117 static void debug_io_wake(struct io_conn *conn)
118 {
119         /* We want linear if we wake a debugged connection, too. */
120         if (io_debug_conn && io_debug_conn(conn))
121                 io_debug_wakeup = true;
122 }
123
124 /* Counterpart to io_plan_no_debug(), called in macros in io.h */
125 static void io_plan_debug_again(void)
126 {
127         io_plan_nodebug = false;
128 }
129 #else
130 static void debug_io_wake(struct io_conn *conn)
131 {
132 }
133 static void io_plan_debug_again(void)
134 {
135 }
136 #endif
137
138 struct io_listener *io_new_listener_(int fd,
139                                      void (*init)(int fd, void *arg),
140                                      void *arg)
141 {
142         struct io_listener *l = io_alloc.alloc(sizeof(*l));
143
144         if (!l)
145                 return NULL;
146
147         l->fd.listener = true;
148         l->fd.fd = fd;
149         l->init = init;
150         l->arg = arg;
151         if (!add_listener(l)) {
152                 io_alloc.free(l);
153                 return NULL;
154         }
155         return l;
156 }
157
158 void io_close_listener(struct io_listener *l)
159 {
160         close(l->fd.fd);
161         del_listener(l);
162         io_alloc.free(l);
163 }
164
165 struct io_conn *io_new_conn_(int fd, struct io_plan plan)
166 {
167         struct io_conn *conn = io_alloc.alloc(sizeof(*conn));
168
169         io_plan_debug_again();
170
171         if (!conn)
172                 return NULL;
173
174         conn->fd.listener = false;
175         conn->fd.fd = fd;
176         conn->plan = plan;
177         conn->finish = NULL;
178         conn->finish_arg = NULL;
179         conn->duplex = NULL;
180         conn->timeout = NULL;
181         if (!add_conn(conn)) {
182                 io_alloc.free(conn);
183                 return NULL;
184         }
185         return conn;
186 }
187
188 void io_set_finish_(struct io_conn *conn,
189                     void (*finish)(struct io_conn *, void *),
190                     void *arg)
191 {
192         conn->finish = finish;
193         conn->finish_arg = arg;
194 }
195
196 struct io_conn *io_duplex_(struct io_conn *old, struct io_plan plan)
197 {
198         struct io_conn *conn;
199
200         io_plan_debug_again();
201
202         assert(!old->duplex);
203
204         conn = io_alloc.alloc(sizeof(*conn));
205         if (!conn)
206                 return NULL;
207
208         conn->fd.listener = false;
209         conn->fd.fd = old->fd.fd;
210         conn->plan = plan;
211         conn->duplex = old;
212         conn->finish = NULL;
213         conn->finish_arg = NULL;
214         conn->timeout = NULL;
215         if (!add_duplex(conn)) {
216                 io_alloc.free(conn);
217                 return NULL;
218         }
219         old->duplex = conn;
220         return conn;
221 }
222
223 bool io_timeout_(struct io_conn *conn, struct timespec ts,
224                  struct io_plan (*cb)(struct io_conn *, void *), void *arg)
225 {
226         assert(cb);
227
228         if (!conn->timeout) {
229                 conn->timeout = io_alloc.alloc(sizeof(*conn->timeout));
230                 if (!conn->timeout)
231                         return false;
232         } else
233                 assert(!timeout_active(conn));
234
235         conn->timeout->next = cb;
236         conn->timeout->next_arg = arg;
237         backend_add_timeout(conn, ts);
238         return true;
239 }
240
241 /* Returns true if we're finished. */
242 static int do_write(int fd, struct io_plan *plan)
243 {
244         ssize_t ret = write(fd, plan->u1.cp, plan->u2.s);
245         if (ret < 0)
246                 return io_debug_io(-1);
247
248         plan->u1.cp += ret;
249         plan->u2.s -= ret;
250         return io_debug_io(plan->u2.s == 0);
251 }
252
253 /* Queue some data to be written. */
254 struct io_plan io_write_(const void *data, size_t len,
255                          struct io_plan (*cb)(struct io_conn *, void *),
256                          void *arg)
257 {
258         struct io_plan plan;
259
260         assert(cb);
261         plan.u1.const_vp = data;
262         plan.u2.s = len;
263         plan.io = do_write;
264         plan.next = cb;
265         plan.next_arg = arg;
266         plan.pollflag = POLLOUT;
267
268         return plan;
269 }
270
271 static int do_read(int fd, struct io_plan *plan)
272 {
273         ssize_t ret = read(fd, plan->u1.cp, plan->u2.s);
274         if (ret <= 0)
275                 return io_debug_io(-1);
276
277         plan->u1.cp += ret;
278         plan->u2.s -= ret;
279         return io_debug_io(plan->u2.s == 0);
280 }
281
282 /* Queue a request to read into a buffer. */
283 struct io_plan io_read_(void *data, size_t len,
284                         struct io_plan (*cb)(struct io_conn *, void *),
285                         void *arg)
286 {
287         struct io_plan plan;
288
289         assert(cb);
290         plan.u1.cp = data;
291         plan.u2.s = len;
292         plan.io = do_read;
293         plan.next = cb;
294         plan.next_arg = arg;
295         plan.pollflag = POLLIN;
296
297         return plan;
298 }
299
300 static int do_read_partial(int fd, struct io_plan *plan)
301 {
302         ssize_t ret = read(fd, plan->u1.cp, *(size_t *)plan->u2.vp);
303         if (ret <= 0)
304                 return io_debug_io(-1);
305
306         *(size_t *)plan->u2.vp = ret;
307         return io_debug_io(1);
308 }
309
310 /* Queue a partial request to read into a buffer. */
311 struct io_plan io_read_partial_(void *data, size_t *len,
312                                 struct io_plan (*cb)(struct io_conn *, void *),
313                                 void *arg)
314 {
315         struct io_plan plan;
316
317         assert(cb);
318         plan.u1.cp = data;
319         plan.u2.vp = len;
320         plan.io = do_read_partial;
321         plan.next = cb;
322         plan.next_arg = arg;
323         plan.pollflag = POLLIN;
324
325         return plan;
326 }
327
328 static int do_write_partial(int fd, struct io_plan *plan)
329 {
330         ssize_t ret = write(fd, plan->u1.cp, *(size_t *)plan->u2.vp);
331         if (ret < 0)
332                 return io_debug_io(-1);
333
334         *(size_t *)plan->u2.vp = ret;
335         return io_debug_io(1);
336 }
337
338 /* Queue a partial write request. */
339 struct io_plan io_write_partial_(const void *data, size_t *len,
340                                  struct io_plan (*cb)(struct io_conn*, void *),
341                                  void *arg)
342 {
343         struct io_plan plan;
344
345         assert(cb);
346         plan.u1.const_vp = data;
347         plan.u2.vp = len;
348         plan.io = do_write_partial;
349         plan.next = cb;
350         plan.next_arg = arg;
351         plan.pollflag = POLLOUT;
352
353         return plan;
354 }
355
356 static int already_connected(int fd, struct io_plan *plan)
357 {
358         return io_debug_io(1);
359 }
360
361 static int do_connect(int fd, struct io_plan *plan)
362 {
363         int err, ret;
364         socklen_t len = sizeof(err);
365
366         /* Has async connect finished? */
367         ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len);
368         if (ret < 0)
369                 return -1;
370
371         if (err == 0) {
372                 /* Restore blocking if it was initially. */
373                 fcntl(fd, F_SETFD, plan->u1.s);
374                 return 1;
375         }
376         return 0;
377 }
378
379 struct io_plan io_connect_(int fd, const struct addrinfo *addr,
380                            struct io_plan (*cb)(struct io_conn*, void *),
381                            void *arg)
382 {
383         struct io_plan plan;
384
385         assert(cb);
386
387         plan.next = cb;
388         plan.next_arg = arg;
389
390         /* Save old flags, set nonblock if not already. */
391         plan.u1.s = fcntl(fd, F_GETFD);
392         fcntl(fd, F_SETFD, plan.u1.s | O_NONBLOCK);
393
394         /* Immediate connect can happen. */
395         if (connect(fd, addr->ai_addr, addr->ai_addrlen) == 0) {
396                 /* Dummy will be called immediately. */
397                 plan.pollflag = POLLOUT;
398                 plan.io = already_connected;
399         } else {
400                 if (errno != EINPROGRESS)
401                         return io_close_();
402
403                 plan.pollflag = POLLIN;
404                 plan.io = do_connect;
405         }
406         return plan;
407 }
408
409 struct io_plan io_idle_(void)
410 {
411         struct io_plan plan;
412
413         plan.pollflag = 0;
414         plan.io = NULL;
415         /* Never called (overridden by io_wake), but NULL means closing */
416         plan.next = (void *)io_idle_;
417
418         return plan;
419 }
420
421 bool io_is_idle(const struct io_conn *conn)
422 {
423         return conn->plan.io == NULL;
424 }
425
426 void io_wake_(struct io_conn *conn, struct io_plan plan)
427
428 {
429         io_plan_debug_again();
430
431         /* It might be closing, but we haven't called its finish() yet. */
432         if (!conn->plan.next)
433                 return;
434         /* It was idle, right? */
435         assert(!conn->plan.io);
436         conn->plan = plan;
437         backend_plan_changed(conn);
438
439         debug_io_wake(conn);
440 }
441
442 void io_ready(struct io_conn *conn)
443 {
444         set_current(conn);
445         switch (conn->plan.io(conn->fd.fd, &conn->plan)) {
446         case -1: /* Failure means a new plan: close up. */
447                 conn->plan = io_close();
448                 backend_plan_changed(conn);
449                 break;
450         case 0: /* Keep going with plan. */
451                 break;
452         case 1: /* Done: get next plan. */
453                 if (timeout_active(conn))
454                         backend_del_timeout(conn);
455                 /* In case they call io_duplex, clear our poll flags so
456                  * both sides don't seem to be both doing read or write
457                  * (See assert(!mask || pfd->events != mask) in poll.c) */
458                 conn->plan.pollflag = 0;
459                 conn->plan = conn->plan.next(conn, conn->plan.next_arg);
460                 backend_plan_changed(conn);
461         }
462         set_current(NULL);
463
464         /* If it closed, close duplex if not already */
465         if (!conn->plan.next && conn->duplex && conn->duplex->plan.next) {
466                 set_current(conn->duplex);
467                 conn->duplex->plan = io_close();
468                 backend_plan_changed(conn->duplex);
469                 set_current(NULL);
470         }
471 }
472
473 /* Close the connection, we're done. */
474 struct io_plan io_close_(void)
475 {
476         struct io_plan plan;
477
478         plan.pollflag = 0;
479         /* This means we're closing. */
480         plan.next = NULL;
481         plan.u1.s = errno;
482
483         return plan;
484 }
485
486 struct io_plan io_close_cb(struct io_conn *conn, void *arg)
487 {
488         return io_close();
489 }
490
491 /* Exit the loop, returning this (non-NULL) arg. */
492 struct io_plan io_break_(void *ret, struct io_plan plan)
493 {
494         io_plan_debug_again();
495
496         assert(ret);
497         io_loop_return = ret;
498
499         return plan;
500 }
501
502 int io_conn_fd(const struct io_conn *conn)
503 {
504         return conn->fd.fd;
505 }
506
507 void io_set_alloc(void *(*allocfn)(size_t size),
508                   void *(*reallocfn)(void *ptr, size_t size),
509                   void (*freefn)(void *ptr))
510 {
511         io_alloc.alloc = allocfn;
512         io_alloc.realloc = reallocfn;
513         io_alloc.free = freefn;
514 }