]> git.ozlabs.org Git - ccan/blob - ccan/io/io.c
ccan/io: eliminate dir argument from io_wait and io_always.
[ccan] / ccan / io / io.c
1 /* Licensed under LGPLv2.1+ - see LICENSE file for details */
2 #include "io.h"
3 #include "backend.h"
4 #include <sys/types.h>
5 #include <sys/socket.h>
6 #include <netdb.h>
7 #include <string.h>
8 #include <errno.h>
9 #include <stdlib.h>
10 #include <assert.h>
11 #include <unistd.h>
12 #include <fcntl.h>
13
14 void *io_loop_return;
15
16 struct io_listener *io_new_listener_(const tal_t *ctx, int fd,
17                                      struct io_plan *(*init)(struct io_conn *,
18                                                              void *),
19                                      void *arg)
20 {
21         struct io_listener *l = tal(ctx, struct io_listener);
22         if (!l)
23                 return NULL;
24
25         l->fd.listener = true;
26         l->fd.fd = fd;
27         l->init = init;
28         l->arg = arg;
29         l->ctx = ctx;
30         if (!add_listener(l))
31                 return tal_free(l);
32         return l;
33 }
34
35 void io_close_listener(struct io_listener *l)
36 {
37         close(l->fd.fd);
38         del_listener(l);
39         tal_free(l);
40 }
41
42 static struct io_plan *io_never_called(struct io_conn *conn, void *arg)
43 {
44         abort();
45 }
46
47 static void next_plan(struct io_conn *conn, struct io_plan *plan)
48 {
49         struct io_plan *(*next)(struct io_conn *, void *arg);
50
51         next = plan->next;
52
53         plan->status = IO_UNSET;
54         plan->io = NULL;
55         plan->next = io_never_called;
56
57         plan = next(conn, plan->next_arg);
58
59         /* It should have set a plan inside this conn. */
60         assert(plan == &conn->plan[IO_IN]
61                || plan == &conn->plan[IO_OUT]);
62         assert(conn->plan[IO_IN].status != IO_UNSET
63                || conn->plan[IO_OUT].status != IO_UNSET);
64
65         backend_new_plan(conn);
66 }
67
68 struct io_conn *io_new_conn_(const tal_t *ctx, int fd,
69                              struct io_plan *(*init)(struct io_conn *, void *),
70                              void *arg)
71 {
72         struct io_conn *conn = tal(ctx, struct io_conn);
73
74         if (!conn)
75                 return NULL;
76
77         conn->fd.listener = false;
78         conn->fd.fd = fd;
79         conn->finish = NULL;
80         conn->finish_arg = NULL;
81         conn->list = NULL;
82
83         if (!add_conn(conn))
84                 return tal_free(conn);
85
86         /* We start with out doing nothing, and in doing our init. */
87         conn->plan[IO_OUT].status = IO_UNSET;
88
89         conn->plan[IO_IN].next = init;
90         conn->plan[IO_IN].next_arg = arg;
91         next_plan(conn, &conn->plan[IO_IN]);
92
93         return conn;
94 }
95
96 void io_set_finish_(struct io_conn *conn,
97                     void (*finish)(struct io_conn *, void *),
98                     void *arg)
99 {
100         conn->finish = finish;
101         conn->finish_arg = arg;
102 }
103
104 struct io_plan *io_get_plan(struct io_conn *conn, enum io_direction dir)
105 {
106         assert(conn->plan[dir].status == IO_UNSET);
107
108         conn->plan[dir].status = IO_POLLING;
109         return &conn->plan[dir];
110 }
111
112 static struct io_plan *set_always(struct io_conn *conn,
113                                   struct io_plan *plan,
114                                   struct io_plan *(*next)(struct io_conn *,
115                                                           void *),
116                                   void *arg)
117 {
118         plan->next = next;
119         plan->next_arg = arg;
120         plan->status = IO_ALWAYS;
121
122         backend_new_always(conn);
123         return plan;
124 }
125
126 struct io_plan *io_always_(struct io_conn *conn,
127                            struct io_plan *(*next)(struct io_conn *, void *),
128                            void *arg)
129 {
130         struct io_plan *plan;
131
132         /* If we're duplex, we want this on the current plan.  Otherwise,
133          * doesn't matter. */
134         if (conn->plan[IO_IN].status == IO_UNSET)
135                 plan = io_get_plan(conn, IO_IN);
136         else
137                 plan = io_get_plan(conn, IO_OUT);
138
139         assert(next);
140         set_always(conn, plan, next, arg);
141
142         return plan;
143 }
144
145 static int do_write(int fd, struct io_plan *plan)
146 {
147         ssize_t ret = write(fd, plan->u1.cp, plan->u2.s);
148         if (ret < 0)
149                 return -1;
150
151         plan->u1.cp += ret;
152         plan->u2.s -= ret;
153         return plan->u2.s == 0;
154 }
155
156 /* Queue some data to be written. */
157 struct io_plan *io_write_(struct io_conn *conn, const void *data, size_t len,
158                           struct io_plan *(*next)(struct io_conn *, void *),
159                           void *arg)
160 {
161         struct io_plan *plan = io_get_plan(conn, IO_OUT);
162
163         assert(next);
164
165         if (len == 0)
166                 return set_always(conn, plan, next, arg);
167
168         plan->u1.const_vp = data;
169         plan->u2.s = len;
170         plan->io = do_write;
171         plan->next = next;
172         plan->next_arg = arg;
173
174         return plan;
175 }
176
177 static int do_read(int fd, struct io_plan *plan)
178 {
179         ssize_t ret = read(fd, plan->u1.cp, plan->u2.s);
180         if (ret <= 0)
181                 return -1;
182
183         plan->u1.cp += ret;
184         plan->u2.s -= ret;
185         return plan->u2.s == 0;
186 }
187
188 /* Queue a request to read into a buffer. */
189 struct io_plan *io_read_(struct io_conn *conn,
190                          void *data, size_t len,
191                          struct io_plan *(*next)(struct io_conn *, void *),
192                          void *arg)
193 {
194         struct io_plan *plan = io_get_plan(conn, IO_IN);
195
196         assert(next);
197
198         if (len == 0)
199                 return set_always(conn, plan, next, arg);
200
201         plan->u1.cp = data;
202         plan->u2.s = len;
203         plan->io = do_read;
204         plan->next = next;
205         plan->next_arg = arg;
206
207         return plan;
208 }
209
210 static int do_read_partial(int fd, struct io_plan *plan)
211 {
212         ssize_t ret = read(fd, plan->u1.cp, *(size_t *)plan->u2.vp);
213         if (ret <= 0)
214                 return -1;
215
216         *(size_t *)plan->u2.vp = ret;
217         return 1;
218 }
219
220 /* Queue a partial request to read into a buffer. */
221 struct io_plan *io_read_partial_(struct io_conn *conn,
222                                  void *data, size_t maxlen, size_t *len,
223                                  struct io_plan *(*next)(struct io_conn *,
224                                                          void *),
225                                  void *arg)
226 {
227         struct io_plan *plan = io_get_plan(conn, IO_IN);
228
229         assert(next);
230
231         if (maxlen == 0)
232                 return set_always(conn, plan, next, arg);
233
234         plan->u1.cp = data;
235         /* We store the max len in here temporarily. */
236         *len = maxlen;
237         plan->u2.vp = len;
238         plan->io = do_read_partial;
239         plan->next = next;
240         plan->next_arg = arg;
241
242         return plan;
243 }
244
245 static int do_write_partial(int fd, struct io_plan *plan)
246 {
247         ssize_t ret = write(fd, plan->u1.cp, *(size_t *)plan->u2.vp);
248         if (ret < 0)
249                 return -1;
250
251         *(size_t *)plan->u2.vp = ret;
252         return 1;
253 }
254
255 /* Queue a partial write request. */
256 struct io_plan *io_write_partial_(struct io_conn *conn,
257                                   const void *data, size_t maxlen, size_t *len,
258                                   struct io_plan *(*next)(struct io_conn *,
259                                                           void*),
260                                   void *arg)
261 {
262         struct io_plan *plan = io_get_plan(conn, IO_OUT);
263
264         assert(next);
265
266         if (maxlen == 0)
267                 return set_always(conn, plan, next, arg);
268
269         plan->u1.const_vp = data;
270         /* We store the max len in here temporarily. */
271         *len = maxlen;
272         plan->u2.vp = len;
273         plan->io = do_write_partial;
274         plan->next = next;
275         plan->next_arg = arg;
276
277         return plan;
278 }
279
280 static int do_connect(int fd, struct io_plan *plan)
281 {
282         int err, ret;
283         socklen_t len = sizeof(err);
284
285         /* Has async connect finished? */
286         ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len);
287         if (ret < 0)
288                 return -1;
289
290         if (err == 0) {
291                 /* Restore blocking if it was initially. */
292                 fcntl(fd, F_SETFL, plan->u1.s);
293                 return 1;
294         } else if (err == EINPROGRESS)
295                 return 0;
296
297         errno = err;
298         return -1;
299 }
300
301 struct io_plan *io_connect_(struct io_conn *conn, const struct addrinfo *addr,
302                             struct io_plan *(*next)(struct io_conn *, void *),
303                             void *arg)
304 {
305         struct io_plan *plan = io_get_plan(conn, IO_IN);
306         int fd = io_conn_fd(conn);
307
308         assert(next);
309
310         /* Save old flags, set nonblock if not already. */
311         plan->u1.s = fcntl(fd, F_GETFL);
312         fcntl(fd, F_SETFL, plan->u1.s | O_NONBLOCK);
313
314         /* Immediate connect can happen. */
315         if (connect(fd, addr->ai_addr, addr->ai_addrlen) == 0)
316                 return set_always(conn, plan, next, arg);
317
318         if (errno != EINPROGRESS)
319                 return io_close(conn);
320
321         plan->next = next;
322         plan->next_arg = arg;
323         plan->io = do_connect;
324
325         return plan;
326 }
327
328 struct io_plan *io_wait_(struct io_conn *conn,
329                          const void *wait,
330                          struct io_plan *(*next)(struct io_conn *, void *),
331                          void *arg)
332 {
333         struct io_plan *plan;
334
335         /* If we're duplex, we want this on the current plan.  Otherwise,
336          * doesn't matter. */
337         if (conn->plan[IO_IN].status == IO_UNSET)
338                 plan = io_get_plan(conn, IO_IN);
339         else
340                 plan = io_get_plan(conn, IO_OUT);
341
342         assert(next);
343
344         plan->next = next;
345         plan->next_arg = arg;
346         plan->u1.const_vp = wait;
347         plan->status = IO_WAITING;
348
349         return plan;
350 }
351
352 void io_wake(const void *wait)
353 {
354         backend_wake(wait);
355 }
356
357 static void do_plan(struct io_conn *conn, struct io_plan *plan)
358 {
359         /* Someone else might have called io_close() on us. */
360         if (plan->status == IO_CLOSING)
361                 return;
362
363         /* We shouldn't have polled for this event if this wasn't true! */
364         assert(plan->status == IO_POLLING);
365
366         switch (plan->io(conn->fd.fd, plan)) {
367         case -1:
368                 io_close(conn);
369                 break;
370         case 0:
371                 break;
372         case 1:
373                 next_plan(conn, plan);
374                 break;
375         default:
376                 /* IO should only return -1, 0 or 1 */
377                 abort();
378         }
379 }
380
381 void io_ready(struct io_conn *conn, int pollflags)
382 {
383         if (pollflags & POLLIN)
384                 do_plan(conn, &conn->plan[IO_IN]);
385
386         if (pollflags & POLLOUT)
387                 do_plan(conn, &conn->plan[IO_OUT]);
388 }
389
390 void io_do_always(struct io_conn *conn)
391 {
392         if (conn->plan[IO_IN].status == IO_ALWAYS)
393                 next_plan(conn, &conn->plan[IO_IN]);
394
395         if (conn->plan[IO_OUT].status == IO_ALWAYS)
396                 next_plan(conn, &conn->plan[IO_OUT]);
397 }
398
399 void io_do_wakeup(struct io_conn *conn, struct io_plan *plan)
400 {
401         assert(plan->status == IO_WAITING);
402         next_plan(conn, plan);
403 }
404
405 /* Close the connection, we're done. */
406 struct io_plan *io_close(struct io_conn *conn)
407 {
408         /* Already closing?  Don't close twice. */
409         if (conn->plan[IO_IN].status == IO_CLOSING)
410                 return &conn->plan[IO_IN];
411
412         conn->plan[IO_IN].status = conn->plan[IO_OUT].status = IO_CLOSING;
413         conn->plan[IO_IN].u1.s = errno;
414         backend_new_closing(conn);
415
416         return &conn->plan[IO_IN];
417 }
418
419 struct io_plan *io_close_cb(struct io_conn *conn, void *arg)
420 {
421         return io_close(conn);
422 }
423
424 /* Exit the loop, returning this (non-NULL) arg. */
425 void io_break(const void *ret)
426 {
427         assert(ret);
428         io_loop_return = (void *)ret;
429 }
430
431 struct io_plan *io_never(struct io_conn *conn)
432 {
433         return io_always(conn, io_never_called, NULL);
434 }
435
436 int io_conn_fd(const struct io_conn *conn)
437 {
438         return conn->fd.fd;
439 }