Merge branch 'master' of ozlabs.org:ccan
[ccan] / ccan / io / io.c
1 /* Licensed under LGPLv2.1+ - see LICENSE file for details */
2 #include "io.h"
3 #include "backend.h"
4 #include <sys/types.h>
5 #include <sys/socket.h>
6 #include <netdb.h>
7 #include <string.h>
8 #include <errno.h>
9 #include <stdlib.h>
10 #include <assert.h>
11 #include <poll.h>
12 #include <unistd.h>
13 #include <fcntl.h>
14
15 void *io_loop_return;
16
17 struct io_alloc io_alloc = {
18         malloc, realloc, free
19 };
20
21 #ifdef DEBUG
22 /* Set to skip the next plan. */
23 bool io_plan_nodebug;
24 /* The current connection to apply plan to. */
25 struct io_conn *current;
26 /* User-defined function to select which connection(s) to debug. */
27 bool (*io_debug_conn)(struct io_conn *conn);
28 /* Set when we wake up an connection we are debugging. */
29 bool io_debug_wakeup;
30
31 struct io_plan io_debug(struct io_plan plan)
32 {
33         struct io_conn *ready = NULL;
34
35         if (io_plan_nodebug) {
36                 io_plan_nodebug = false;
37                 return plan;
38         }
39
40         if (!current || !doing_debug_on(current)) {
41                 if (!io_debug_wakeup)
42                         return plan;
43         }
44
45         io_debug_wakeup = false;
46         current->plan = plan;
47         backend_plan_changed(current);
48
49         /* Call back into the loop immediately. */
50         io_loop_return = do_io_loop(&ready);
51
52         if (ready) {
53                 set_current(ready);
54                 if (!ready->plan.next) {
55                         /* Call finish function immediately. */
56                         if (ready->finish) {
57                                 errno = ready->plan.u1.s;
58                                 ready->finish(ready, ready->finish_arg);
59                                 ready->finish = NULL;
60                         }
61                         backend_del_conn(ready);
62                 } else {
63                         /* Calls back in itself, via io_debug_io(). */
64                         if (ready->plan.io(ready->fd.fd, &ready->plan) != 2)
65                                 abort();
66                 }
67                 set_current(NULL);
68         }
69
70         /* Return a do-nothing plan, so backend_plan_changed in
71          * io_ready doesn't do anything (it's already been called). */
72         return io_idle_();
73 }
74
75 int io_debug_io(int ret)
76 {
77         /* Cache it for debugging; current changes. */
78         struct io_conn *conn = current;
79         int saved_errno = errno;
80
81         if (!doing_debug_on(conn))
82                 return ret;
83
84         /* These will all go linearly through the io_debug() path above. */
85         switch (ret) {
86         case -1:
87                 /* This will call io_debug above. */
88                 errno = saved_errno;
89                 io_close();
90                 break;
91         case 0: /* Keep going with plan. */
92                 io_debug(conn->plan);
93                 break;
94         case 1: /* Done: get next plan. */
95                 if (timeout_active(conn))
96                         backend_del_timeout(conn);
97                 conn->plan.next(conn, conn->plan.next_arg);
98                 break;
99         default:
100                 abort();
101         }
102
103         /* Normally-invalid value, used for sanity check. */
104         return 2;
105 }
106
107 static void debug_io_wake(struct io_conn *conn)
108 {
109         /* We want linear if we wake a debugged connection, too. */
110         if (io_debug_conn && io_debug_conn(conn))
111                 io_debug_wakeup = true;
112 }
113
114 /* Counterpart to io_plan_no_debug(), called in macros in io.h */
115 static void io_plan_debug_again(void)
116 {
117         io_plan_nodebug = false;
118 }
119 #else
120 static void debug_io_wake(struct io_conn *conn)
121 {
122 }
123 static void io_plan_debug_again(void)
124 {
125 }
126 #endif
127
128 struct io_listener *io_new_listener_(int fd,
129                                      void (*init)(int fd, void *arg),
130                                      void *arg)
131 {
132         struct io_listener *l = io_alloc.alloc(sizeof(*l));
133
134         if (!l)
135                 return NULL;
136
137         l->fd.listener = true;
138         l->fd.fd = fd;
139         l->init = init;
140         l->arg = arg;
141         if (!add_listener(l)) {
142                 io_alloc.free(l);
143                 return NULL;
144         }
145         return l;
146 }
147
148 void io_close_listener(struct io_listener *l)
149 {
150         close(l->fd.fd);
151         del_listener(l);
152         io_alloc.free(l);
153 }
154
155 struct io_conn *io_new_conn_(int fd, struct io_plan plan)
156 {
157         struct io_conn *conn = io_alloc.alloc(sizeof(*conn));
158
159         io_plan_debug_again();
160
161         if (!conn)
162                 return NULL;
163
164         conn->fd.listener = false;
165         conn->fd.fd = fd;
166         conn->plan = plan;
167         conn->finish = NULL;
168         conn->finish_arg = NULL;
169         conn->duplex = NULL;
170         conn->timeout = NULL;
171         if (!add_conn(conn)) {
172                 io_alloc.free(conn);
173                 return NULL;
174         }
175         return conn;
176 }
177
178 void io_set_finish_(struct io_conn *conn,
179                     void (*finish)(struct io_conn *, void *),
180                     void *arg)
181 {
182         conn->finish = finish;
183         conn->finish_arg = arg;
184 }
185
186 struct io_conn *io_duplex_(struct io_conn *old, struct io_plan plan)
187 {
188         struct io_conn *conn;
189
190         io_plan_debug_again();
191
192         assert(!old->duplex);
193
194         conn = io_alloc.alloc(sizeof(*conn));
195         if (!conn)
196                 return NULL;
197
198         conn->fd.listener = false;
199         conn->fd.fd = old->fd.fd;
200         conn->plan = plan;
201         conn->duplex = old;
202         conn->finish = NULL;
203         conn->finish_arg = NULL;
204         conn->timeout = NULL;
205         if (!add_duplex(conn)) {
206                 io_alloc.free(conn);
207                 return NULL;
208         }
209         old->duplex = conn;
210         return conn;
211 }
212
213 bool io_timeout_(struct io_conn *conn, struct timespec ts,
214                  struct io_plan (*cb)(struct io_conn *, void *), void *arg)
215 {
216         assert(cb);
217
218         if (!conn->timeout) {
219                 conn->timeout = io_alloc.alloc(sizeof(*conn->timeout));
220                 if (!conn->timeout)
221                         return false;
222         } else
223                 assert(!timeout_active(conn));
224
225         conn->timeout->next = cb;
226         conn->timeout->next_arg = arg;
227         backend_add_timeout(conn, ts);
228         return true;
229 }
230
231 /* Returns true if we're finished. */
232 static int do_write(int fd, struct io_plan *plan)
233 {
234         ssize_t ret = write(fd, plan->u1.cp, plan->u2.s);
235         if (ret < 0)
236                 return io_debug_io(-1);
237
238         plan->u1.cp += ret;
239         plan->u2.s -= ret;
240         return io_debug_io(plan->u2.s == 0);
241 }
242
243 /* Queue some data to be written. */
244 struct io_plan io_write_(const void *data, size_t len,
245                          struct io_plan (*cb)(struct io_conn *, void *),
246                          void *arg)
247 {
248         struct io_plan plan;
249
250         assert(cb);
251         plan.u1.const_vp = data;
252         plan.u2.s = len;
253         plan.io = do_write;
254         plan.next = cb;
255         plan.next_arg = arg;
256         plan.pollflag = POLLOUT;
257
258         return plan;
259 }
260
261 static int do_read(int fd, struct io_plan *plan)
262 {
263         ssize_t ret = read(fd, plan->u1.cp, plan->u2.s);
264         if (ret <= 0)
265                 return io_debug_io(-1);
266
267         plan->u1.cp += ret;
268         plan->u2.s -= ret;
269         return io_debug_io(plan->u2.s == 0);
270 }
271
272 /* Queue a request to read into a buffer. */
273 struct io_plan io_read_(void *data, size_t len,
274                         struct io_plan (*cb)(struct io_conn *, void *),
275                         void *arg)
276 {
277         struct io_plan plan;
278
279         assert(cb);
280         plan.u1.cp = data;
281         plan.u2.s = len;
282         plan.io = do_read;
283         plan.next = cb;
284         plan.next_arg = arg;
285         plan.pollflag = POLLIN;
286
287         return plan;
288 }
289
290 static int do_read_partial(int fd, struct io_plan *plan)
291 {
292         ssize_t ret = read(fd, plan->u1.cp, *(size_t *)plan->u2.vp);
293         if (ret <= 0)
294                 return io_debug_io(-1);
295
296         *(size_t *)plan->u2.vp = ret;
297         return io_debug_io(1);
298 }
299
300 /* Queue a partial request to read into a buffer. */
301 struct io_plan io_read_partial_(void *data, size_t *len,
302                                 struct io_plan (*cb)(struct io_conn *, void *),
303                                 void *arg)
304 {
305         struct io_plan plan;
306
307         assert(cb);
308         plan.u1.cp = data;
309         plan.u2.vp = len;
310         plan.io = do_read_partial;
311         plan.next = cb;
312         plan.next_arg = arg;
313         plan.pollflag = POLLIN;
314
315         return plan;
316 }
317
318 static int do_write_partial(int fd, struct io_plan *plan)
319 {
320         ssize_t ret = write(fd, plan->u1.cp, *(size_t *)plan->u2.vp);
321         if (ret < 0)
322                 return io_debug_io(-1);
323
324         *(size_t *)plan->u2.vp = ret;
325         return io_debug_io(1);
326 }
327
328 /* Queue a partial write request. */
329 struct io_plan io_write_partial_(const void *data, size_t *len,
330                                  struct io_plan (*cb)(struct io_conn*, void *),
331                                  void *arg)
332 {
333         struct io_plan plan;
334
335         assert(cb);
336         plan.u1.const_vp = data;
337         plan.u2.vp = len;
338         plan.io = do_write_partial;
339         plan.next = cb;
340         plan.next_arg = arg;
341         plan.pollflag = POLLOUT;
342
343         return plan;
344 }
345
346 static int already_connected(int fd, struct io_plan *plan)
347 {
348         return io_debug_io(1);
349 }
350
351 static int do_connect(int fd, struct io_plan *plan)
352 {
353         int err, ret;
354         socklen_t len = sizeof(err);
355
356         /* Has async connect finished? */
357         ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len);
358         if (ret < 0)
359                 return -1;
360
361         if (err == 0) {
362                 /* Restore blocking if it was initially. */
363                 fcntl(fd, F_SETFD, plan->u1.s);
364                 return 1;
365         }
366         return 0;
367 }
368
369 struct io_plan io_connect_(int fd, const struct addrinfo *addr,
370                            struct io_plan (*cb)(struct io_conn*, void *),
371                            void *arg)
372 {
373         struct io_plan plan;
374
375         assert(cb);
376
377         plan.next = cb;
378         plan.next_arg = arg;
379
380         /* Save old flags, set nonblock if not already. */
381         plan.u1.s = fcntl(fd, F_GETFD);
382         fcntl(fd, F_SETFD, plan.u1.s | O_NONBLOCK);
383
384         /* Immediate connect can happen. */
385         if (connect(fd, addr->ai_addr, addr->ai_addrlen) == 0) {
386                 /* Dummy will be called immediately. */
387                 plan.pollflag = POLLOUT;
388                 plan.io = already_connected;
389         } else {
390                 if (errno != EINPROGRESS)
391                         return io_close_();
392
393                 plan.pollflag = POLLIN;
394                 plan.io = do_connect;
395         }
396         return plan;
397 }
398
399 struct io_plan io_idle_(void)
400 {
401         struct io_plan plan;
402
403         plan.pollflag = 0;
404         plan.io = NULL;
405         /* Never called (overridden by io_wake), but NULL means closing */
406         plan.next = (void *)io_idle_;
407
408         return plan;
409 }
410
411 void io_wake_(struct io_conn *conn, struct io_plan plan)
412
413 {
414         io_plan_debug_again();
415
416         /* It might be closing, but we haven't called its finish() yet. */
417         if (!conn->plan.next)
418                 return;
419         /* It was idle, right? */
420         assert(!conn->plan.io);
421         conn->plan = plan;
422         backend_plan_changed(conn);
423
424         debug_io_wake(conn);
425 }
426
427 void io_ready(struct io_conn *conn)
428 {
429         set_current(conn);
430         switch (conn->plan.io(conn->fd.fd, &conn->plan)) {
431         case -1: /* Failure means a new plan: close up. */
432                 conn->plan = io_close();
433                 backend_plan_changed(conn);
434                 break;
435         case 0: /* Keep going with plan. */
436                 break;
437         case 1: /* Done: get next plan. */
438                 if (timeout_active(conn))
439                         backend_del_timeout(conn);
440                 conn->plan = conn->plan.next(conn, conn->plan.next_arg);
441                 backend_plan_changed(conn);
442         }
443         set_current(NULL);
444 }
445
446 /* Close the connection, we're done. */
447 struct io_plan io_close_(void)
448 {
449         struct io_plan plan;
450
451         plan.pollflag = 0;
452         /* This means we're closing. */
453         plan.next = NULL;
454         plan.u1.s = errno;
455
456         return plan;
457 }
458
459 struct io_plan io_close_cb(struct io_conn *conn, void *arg)
460 {
461         return io_close();
462 }
463
464 /* Exit the loop, returning this (non-NULL) arg. */
465 struct io_plan io_break_(void *ret, struct io_plan plan)
466 {
467         io_plan_debug_again();
468
469         assert(ret);
470         io_loop_return = ret;
471
472         return plan;
473 }
474
475 int io_conn_fd(const struct io_conn *conn)
476 {
477         return conn->fd.fd;
478 }
479
480 void io_set_alloc(void *(*allocfn)(size_t size),
481                   void *(*reallocfn)(void *ptr, size_t size),
482                   void (*freefn)(void *ptr))
483 {
484         io_alloc.alloc = allocfn;
485         io_alloc.realloc = reallocfn;
486         io_alloc.free = freefn;
487 }