]> git.ozlabs.org Git - ccan/blob - ccan/io/io.c
io: io_always, and zero-length operations support.
[ccan] / ccan / io / io.c
1 /* Licensed under LGPLv2.1+ - see LICENSE file for details */
2 #include "io.h"
3 #include "backend.h"
4 #include <sys/types.h>
5 #include <sys/socket.h>
6 #include <netdb.h>
7 #include <string.h>
8 #include <errno.h>
9 #include <stdlib.h>
10 #include <assert.h>
11 #include <unistd.h>
12 #include <fcntl.h>
13
14 void *io_loop_return;
15
16 struct io_alloc io_alloc = {
17         malloc, realloc, free
18 };
19
20 #ifdef DEBUG
21 /* Set to skip the next plan. */
22 bool io_plan_nodebug;
23 /* The current connection to apply plan to. */
24 struct io_conn *current;
25 /* User-defined function to select which connection(s) to debug. */
26 bool (*io_debug_conn)(struct io_conn *conn);
27 /* Set when we wake up an connection we are debugging. */
28 bool io_debug_wakeup;
29
30 struct io_plan io_debug(struct io_plan plan)
31 {
32         struct io_conn *ready = NULL;
33
34         if (io_plan_nodebug) {
35                 io_plan_nodebug = false;
36                 return plan;
37         }
38
39         if (!current || !doing_debug_on(current)) {
40                 if (!io_debug_wakeup)
41                         return plan;
42         }
43
44         io_debug_wakeup = false;
45         current->plan = plan;
46         backend_plan_changed(current);
47
48         /* Call back into the loop immediately. */
49         io_loop_return = do_io_loop(&ready);
50
51         if (ready) {
52                 set_current(ready);
53                 if (!ready->plan.next) {
54                         /* Call finish function immediately. */
55                         if (ready->finish) {
56                                 errno = ready->plan.u1.s;
57                                 ready->finish(ready, ready->finish_arg);
58                                 ready->finish = NULL;
59                         }
60                         backend_del_conn(ready);
61                 } else {
62                         /* Calls back in itself, via io_debug_io(). */
63                         if (ready->plan.io(ready->fd.fd, &ready->plan) != 2)
64                                 abort();
65                 }
66                 set_current(NULL);
67         }
68
69         /* Return a do-nothing plan, so backend_plan_changed in
70          * io_ready doesn't do anything (it's already been called). */
71         return io_idle_();
72 }
73
74 int io_debug_io(int ret)
75 {
76         /* Cache it for debugging; current changes. */
77         struct io_conn *conn = current;
78         int saved_errno = errno;
79
80         if (!doing_debug_on(conn))
81                 return ret;
82
83         /* These will all go linearly through the io_debug() path above. */
84         switch (ret) {
85         case -1:
86                 /* This will call io_debug above. */
87                 errno = saved_errno;
88                 io_close();
89                 break;
90         case 0: /* Keep going with plan. */
91                 io_debug(conn->plan);
92                 break;
93         case 1: /* Done: get next plan. */
94                 if (timeout_active(conn))
95                         backend_del_timeout(conn);
96                 /* In case they call io_duplex, clear our poll flags so
97                  * both sides don't seem to be both doing read or write
98                  * (See assert(!mask || pfd->events != mask) in poll.c) */
99                 conn->plan.pollflag = 0;
100                 conn->plan.next(conn, conn->plan.next_arg);
101                 break;
102         default:
103                 abort();
104         }
105
106         /* Normally-invalid value, used for sanity check. */
107         return 2;
108 }
109
110 static void debug_io_wake(struct io_conn *conn)
111 {
112         /* We want linear if we wake a debugged connection, too. */
113         if (io_debug_conn && io_debug_conn(conn))
114                 io_debug_wakeup = true;
115 }
116
117 /* Counterpart to io_plan_no_debug(), called in macros in io.h */
118 static void io_plan_debug_again(void)
119 {
120         io_plan_nodebug = false;
121 }
122 #else
123 static void debug_io_wake(struct io_conn *conn)
124 {
125 }
126 static void io_plan_debug_again(void)
127 {
128 }
129 #endif
130
131 struct io_listener *io_new_listener_(int fd,
132                                      void (*init)(int fd, void *arg),
133                                      void *arg)
134 {
135         struct io_listener *l = io_alloc.alloc(sizeof(*l));
136
137         if (!l)
138                 return NULL;
139
140         l->fd.listener = true;
141         l->fd.fd = fd;
142         l->init = init;
143         l->arg = arg;
144         if (!add_listener(l)) {
145                 io_alloc.free(l);
146                 return NULL;
147         }
148         return l;
149 }
150
151 void io_close_listener(struct io_listener *l)
152 {
153         close(l->fd.fd);
154         del_listener(l);
155         io_alloc.free(l);
156 }
157
158 struct io_conn *io_new_conn_(int fd, struct io_plan plan)
159 {
160         struct io_conn *conn = io_alloc.alloc(sizeof(*conn));
161
162         io_plan_debug_again();
163
164         if (!conn)
165                 return NULL;
166
167         conn->fd.listener = false;
168         conn->fd.fd = fd;
169         conn->plan = plan;
170         conn->finish = NULL;
171         conn->finish_arg = NULL;
172         conn->duplex = NULL;
173         conn->timeout = NULL;
174         if (!add_conn(conn)) {
175                 io_alloc.free(conn);
176                 return NULL;
177         }
178         return conn;
179 }
180
181 void io_set_finish_(struct io_conn *conn,
182                     void (*finish)(struct io_conn *, void *),
183                     void *arg)
184 {
185         conn->finish = finish;
186         conn->finish_arg = arg;
187 }
188
189 struct io_conn *io_duplex_(struct io_conn *old, struct io_plan plan)
190 {
191         struct io_conn *conn;
192
193         io_plan_debug_again();
194
195         assert(!old->duplex);
196
197         conn = io_alloc.alloc(sizeof(*conn));
198         if (!conn)
199                 return NULL;
200
201         conn->fd.listener = false;
202         conn->fd.fd = old->fd.fd;
203         conn->plan = plan;
204         conn->duplex = old;
205         conn->finish = NULL;
206         conn->finish_arg = NULL;
207         conn->timeout = NULL;
208         if (!add_duplex(conn)) {
209                 io_alloc.free(conn);
210                 return NULL;
211         }
212         old->duplex = conn;
213         return conn;
214 }
215
216 bool io_timeout_(struct io_conn *conn, struct timespec ts,
217                  struct io_plan (*cb)(struct io_conn *, void *), void *arg)
218 {
219         assert(cb);
220
221         if (!conn->timeout) {
222                 conn->timeout = io_alloc.alloc(sizeof(*conn->timeout));
223                 if (!conn->timeout)
224                         return false;
225         } else
226                 assert(!timeout_active(conn));
227
228         conn->timeout->next = cb;
229         conn->timeout->next_arg = arg;
230         backend_add_timeout(conn, ts);
231         return true;
232 }
233
234 /* Always done: call the next thing. */
235 static int do_always(int fd, struct io_plan *plan)
236 {
237         return 1;
238 }
239
240 struct io_plan io_always_(struct io_plan (*cb)(struct io_conn *, void *),
241                           void *arg)
242 {
243         struct io_plan plan;
244
245         assert(cb);
246         plan.io = do_always;
247         plan.next = cb;
248         plan.next_arg = arg;
249         plan.pollflag = POLLALWAYS;
250
251         return plan;
252 }
253
254 /* Returns true if we're finished. */
255 static int do_write(int fd, struct io_plan *plan)
256 {
257         ssize_t ret = write(fd, plan->u1.cp, plan->u2.s);
258         if (ret < 0)
259                 return io_debug_io(-1);
260
261         plan->u1.cp += ret;
262         plan->u2.s -= ret;
263         return io_debug_io(plan->u2.s == 0);
264 }
265
266 /* Queue some data to be written. */
267 struct io_plan io_write_(const void *data, size_t len,
268                          struct io_plan (*cb)(struct io_conn *, void *),
269                          void *arg)
270 {
271         struct io_plan plan;
272
273         assert(cb);
274
275         if (len == 0)
276                 return io_always_(cb, arg);
277
278         plan.u1.const_vp = data;
279         plan.u2.s = len;
280         plan.io = do_write;
281         plan.next = cb;
282         plan.next_arg = arg;
283         plan.pollflag = POLLOUT;
284
285         return plan;
286 }
287
288 static int do_read(int fd, struct io_plan *plan)
289 {
290         ssize_t ret = read(fd, plan->u1.cp, plan->u2.s);
291         if (ret <= 0)
292                 return io_debug_io(-1);
293
294         plan->u1.cp += ret;
295         plan->u2.s -= ret;
296         return io_debug_io(plan->u2.s == 0);
297 }
298
299 /* Queue a request to read into a buffer. */
300 struct io_plan io_read_(void *data, size_t len,
301                         struct io_plan (*cb)(struct io_conn *, void *),
302                         void *arg)
303 {
304         struct io_plan plan;
305
306         assert(cb);
307
308         if (len == 0)
309                 return io_always_(cb, arg);
310
311         plan.u1.cp = data;
312         plan.u2.s = len;
313         plan.io = do_read;
314         plan.next = cb;
315         plan.next_arg = arg;
316
317         plan.pollflag = POLLIN;
318
319         return plan;
320 }
321
322 static int do_read_partial(int fd, struct io_plan *plan)
323 {
324         ssize_t ret = read(fd, plan->u1.cp, *(size_t *)plan->u2.vp);
325         if (ret <= 0)
326                 return io_debug_io(-1);
327
328         *(size_t *)plan->u2.vp = ret;
329         return io_debug_io(1);
330 }
331
332 /* Queue a partial request to read into a buffer. */
333 struct io_plan io_read_partial_(void *data, size_t *len,
334                                 struct io_plan (*cb)(struct io_conn *, void *),
335                                 void *arg)
336 {
337         struct io_plan plan;
338
339         assert(cb);
340
341         if (*len == 0)
342                 return io_always_(cb, arg);
343
344         plan.u1.cp = data;
345         plan.u2.vp = len;
346         plan.io = do_read_partial;
347         plan.next = cb;
348         plan.next_arg = arg;
349         plan.pollflag = POLLIN;
350
351         return plan;
352 }
353
354 static int do_write_partial(int fd, struct io_plan *plan)
355 {
356         ssize_t ret = write(fd, plan->u1.cp, *(size_t *)plan->u2.vp);
357         if (ret < 0)
358                 return io_debug_io(-1);
359
360         *(size_t *)plan->u2.vp = ret;
361         return io_debug_io(1);
362 }
363
364 /* Queue a partial write request. */
365 struct io_plan io_write_partial_(const void *data, size_t *len,
366                                  struct io_plan (*cb)(struct io_conn*, void *),
367                                  void *arg)
368 {
369         struct io_plan plan;
370
371         assert(cb);
372
373         if (*len == 0)
374                 return io_always_(cb, arg);
375
376         plan.u1.const_vp = data;
377         plan.u2.vp = len;
378         plan.io = do_write_partial;
379         plan.next = cb;
380         plan.next_arg = arg;
381         plan.pollflag = POLLOUT;
382
383         return plan;
384 }
385
386 static int already_connected(int fd, struct io_plan *plan)
387 {
388         return io_debug_io(1);
389 }
390
391 static int do_connect(int fd, struct io_plan *plan)
392 {
393         int err, ret;
394         socklen_t len = sizeof(err);
395
396         /* Has async connect finished? */
397         ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len);
398         if (ret < 0)
399                 return -1;
400
401         if (err == 0) {
402                 /* Restore blocking if it was initially. */
403                 fcntl(fd, F_SETFD, plan->u1.s);
404                 return 1;
405         }
406         return 0;
407 }
408
409 struct io_plan io_connect_(int fd, const struct addrinfo *addr,
410                            struct io_plan (*cb)(struct io_conn*, void *),
411                            void *arg)
412 {
413         struct io_plan plan;
414
415         assert(cb);
416
417         plan.next = cb;
418         plan.next_arg = arg;
419
420         /* Save old flags, set nonblock if not already. */
421         plan.u1.s = fcntl(fd, F_GETFD);
422         fcntl(fd, F_SETFD, plan.u1.s | O_NONBLOCK);
423
424         /* Immediate connect can happen. */
425         if (connect(fd, addr->ai_addr, addr->ai_addrlen) == 0) {
426                 /* Dummy will be called immediately. */
427                 plan.pollflag = POLLOUT;
428                 plan.io = already_connected;
429         } else {
430                 if (errno != EINPROGRESS)
431                         return io_close_();
432
433                 plan.pollflag = POLLIN;
434                 plan.io = do_connect;
435         }
436         return plan;
437 }
438
439 struct io_plan io_idle_(void)
440 {
441         struct io_plan plan;
442
443         plan.pollflag = 0;
444         plan.io = NULL;
445         /* Never called (overridden by io_wake), but NULL means closing */
446         plan.next = (void *)io_idle_;
447
448         return plan;
449 }
450
451 bool io_is_idle(const struct io_conn *conn)
452 {
453         return conn->plan.io == NULL;
454 }
455
456 void io_wake_(struct io_conn *conn, struct io_plan plan)
457
458 {
459         io_plan_debug_again();
460
461         /* It might be closing, but we haven't called its finish() yet. */
462         if (!conn->plan.next)
463                 return;
464         /* It was idle, right? */
465         assert(!conn->plan.io);
466         conn->plan = plan;
467         backend_plan_changed(conn);
468
469         debug_io_wake(conn);
470 }
471
472 void io_ready(struct io_conn *conn)
473 {
474         /* Beware io_close_other! */
475         if (!conn->plan.next)
476                 return;
477
478         set_current(conn);
479         switch (conn->plan.io(conn->fd.fd, &conn->plan)) {
480         case -1: /* Failure means a new plan: close up. */
481                 conn->plan = io_close();
482                 backend_plan_changed(conn);
483                 break;
484         case 0: /* Keep going with plan. */
485                 break;
486         case 1: /* Done: get next plan. */
487                 if (timeout_active(conn))
488                         backend_del_timeout(conn);
489                 /* In case they call io_duplex, clear our poll flags so
490                  * both sides don't seem to be both doing read or write
491                  * (See assert(!mask || pfd->events != mask) in poll.c) */
492                 conn->plan.pollflag = 0;
493                 conn->plan = conn->plan.next(conn, conn->plan.next_arg);
494                 backend_plan_changed(conn);
495         }
496         set_current(NULL);
497 }
498
499 /* Close the connection, we're done. */
500 struct io_plan io_close_(void)
501 {
502         struct io_plan plan;
503
504         plan.pollflag = 0;
505         /* This means we're closing. */
506         plan.next = NULL;
507         plan.u1.s = errno;
508
509         return plan;
510 }
511
512 struct io_plan io_close_cb(struct io_conn *conn, void *arg)
513 {
514         return io_close();
515 }
516
517 void io_close_other(struct io_conn *conn)
518 {
519         conn->plan = io_close_();
520         backend_plan_changed(conn);
521 }
522
523 /* Exit the loop, returning this (non-NULL) arg. */
524 struct io_plan io_break_(void *ret, struct io_plan plan)
525 {
526         io_plan_debug_again();
527
528         assert(ret);
529         io_loop_return = ret;
530
531         return plan;
532 }
533
534 int io_conn_fd(const struct io_conn *conn)
535 {
536         return conn->fd.fd;
537 }
538
539 void io_set_alloc(void *(*allocfn)(size_t size),
540                   void *(*reallocfn)(void *ptr, size_t size),
541                   void (*freefn)(void *ptr))
542 {
543         io_alloc.alloc = allocfn;
544         io_alloc.realloc = reallocfn;
545         io_alloc.free = freefn;
546 }