/* Licensed under LGPLv2.1+ - see LICENSE file for details */
#include "io.h"
#include "backend.h"
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <string.h>
#include <errno.h>
#include <stdlib.h>
#include <assert.h>
#include <unistd.h>
#include <fcntl.h>
#include <ccan/container_of/container_of.h>
#include <poll.h>

struct io_plan io_conn_freed;
static bool io_extended_errors;

struct io_listener *io_new_listener_(const tal_t *ctx, int fd,
				     struct io_plan *(*init)(struct io_conn *,
							     void *),
				     void *arg)
{
	struct io_listener *l = tal(ctx, struct io_listener);
	if (!l)
		return NULL;

	l->fd.listener = true;
	l->fd.fd = fd;
	l->init = init;
	l->arg = arg;
	l->ctx = ctx;
	if (!add_listener(l))
		return tal_free(l);
	return l;
}

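/* Usage sketch (assuming the typesafe io_new_listener()/io_write()
 * wrappers from io.h): hand a bound, listening socket to io_new_listener()
 * and every accepted connection gets its own io_conn, starting at the
 * init callback.
 *
 *	static struct io_plan *incoming(struct io_conn *conn, void *unused)
 *	{
 *		return io_write(conn, "hello\n", 6, io_close_cb, NULL);
 *	}
 *	...
 *	struct io_listener *l = io_new_listener(NULL, listen_fd, incoming, NULL);
 *	io_loop(NULL, NULL);
 */
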
void io_close_listener(struct io_listener *l)
{
	tal_free(l);
}

static struct io_plan *io_never_called(struct io_conn *conn, void *arg)
{
	abort();
}

/* Returns false if conn was freed. */
static bool next_plan(struct io_conn *conn, struct io_plan *plan)
{
	struct io_plan *(*next)(struct io_conn *, void *arg);

	next = plan->next;

	plan->status = IO_UNSET;
	plan->io = NULL;
	plan->next = io_never_called;

	plan = next(conn, plan->next_arg);

	if (plan == &io_conn_freed)
		return false;

	assert(plan == &conn->plan[plan->dir]);
	assert(conn->plan[IO_IN].status != IO_UNSET
	       || conn->plan[IO_OUT].status != IO_UNSET);

	backend_new_plan(conn);
	return true;
}

bool io_fd_block(int fd, bool block)
{
	int flags = fcntl(fd, F_GETFL);

	if (flags == -1)
		return false;

	if (block)
		flags &= ~O_NONBLOCK;
	else
		flags |= O_NONBLOCK;

	return fcntl(fd, F_SETFL, flags) != -1;
}

struct io_conn *io_new_conn_(const tal_t *ctx, int fd,
			     struct io_plan *(*init)(struct io_conn *, void *),
			     void *arg)
{
	struct io_conn *conn = tal(ctx, struct io_conn);

	if (!conn)
		return NULL;

	conn->fd.listener = false;
	conn->fd.fd = fd;
	conn->finish = NULL;
	conn->finish_arg = NULL;

	if (!add_conn(conn))
		return tal_free(conn);

	/* Keep our I/O async. */
	io_fd_block(fd, false);

	/* So we can get back from plan -> conn later */
	conn->plan[IO_OUT].dir = IO_OUT;
	conn->plan[IO_IN].dir = IO_IN;

	/* We start with out doing nothing, and in doing our init. */
	conn->plan[IO_OUT].status = IO_UNSET;

	conn->plan[IO_IN].next = init;
	conn->plan[IO_IN].next_arg = arg;
	if (!next_plan(conn, &conn->plan[IO_IN]))
		return NULL;

	return conn;
}

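/* Usage sketch (assuming the typesafe io_new_conn()/io_read() wrappers
 * from io.h): the init callback supplies the first plan for the fd; here
 * it reads an 8-byte header into a caller-provided buffer and then closes.
 *
 *	static struct io_plan *init_cb(struct io_conn *conn, char *hdr)
 *	{
 *		return io_read(conn, hdr, 8, io_close_cb, NULL);
 *	}
 *	...
 *	conn = io_new_conn(NULL, fd, init_cb, hdr);
 */
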
bool io_conn_exclusive(struct io_conn *conn, bool exclusive)
{
	return backend_set_exclusive(&conn->plan[IO_IN], exclusive);
}

bool io_conn_out_exclusive(struct io_conn *conn, bool exclusive)
{
	return backend_set_exclusive(&conn->plan[IO_OUT], exclusive);
}

void io_set_finish_(struct io_conn *conn,
		    void (*finish)(struct io_conn *, void *),
		    void *arg)
{
	conn->finish = finish;
	conn->finish_arg = arg;
}

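/* Usage sketch (assuming the typesafe io_set_finish() wrapper from io.h):
 * the finish callback runs when the connection is closed or fails, so it
 * is a natural place to release per-connection state.  struct peer and
 * free_peer() below are placeholders.
 *
 *	static void disconnected(struct io_conn *conn, struct peer *peer)
 *	{
 *		free_peer(peer);
 *	}
 *	...
 *	io_set_finish(conn, disconnected, peer);
 */
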
struct io_plan_arg *io_plan_arg(struct io_conn *conn, enum io_direction dir)
{
	assert(conn->plan[dir].status == IO_UNSET);

	conn->plan[dir].status = IO_POLLING_NOTSTARTED;
	return &conn->plan[dir].arg;
}

static struct io_plan *set_always(struct io_conn *conn,
				  enum io_direction dir,
				  struct io_plan *(*next)(struct io_conn *,
							  void *),
				  void *arg)
{
	struct io_plan *plan = &conn->plan[dir];

	plan->status = IO_ALWAYS;
	/* Only happens on OOM, and only with non-default tal_backend. */
	if (!backend_new_always(plan))
		return io_close(conn);
	return io_set_plan(conn, dir, NULL, next, arg);
}

static struct io_plan *io_always_dir(struct io_conn *conn,
				     enum io_direction dir,
				     struct io_plan *(*next)(struct io_conn *,
							     void *),
				     void *arg)
{
	return set_always(conn, dir, next, arg);
}

struct io_plan *io_always_(struct io_conn *conn,
			   struct io_plan *(*next)(struct io_conn *, void *),
			   void *arg)
{
	return io_always_dir(conn, IO_IN, next, arg);
}

struct io_plan *io_out_always_(struct io_conn *conn,
			       struct io_plan *(*next)(struct io_conn *,
						       void *),
			       void *arg)
{
	return io_always_dir(conn, IO_OUT, next, arg);
}

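/* Usage sketch: io_always() does no I/O; it simply arranges for the next
 * callback to run on a following io_loop() iteration, e.g. to split a long
 * computation into steps.  struct state and do_some_work() are placeholders.
 *
 *	static struct io_plan *step(struct io_conn *conn, struct state *s)
 *	{
 *		do_some_work(s);
 *		return io_always(conn, step, s);
 *	}
 */
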
static int do_write(int fd, struct io_plan_arg *arg)
{
	ssize_t ret = write(fd, arg->u1.cp, arg->u2.s);
	if (ret < 0)
		return -1;

	arg->u1.cp += ret;
	arg->u2.s -= ret;
	return arg->u2.s == 0;
}

/* Queue some data to be written. */
struct io_plan *io_write_(struct io_conn *conn, const void *data, size_t len,
			  struct io_plan *(*next)(struct io_conn *, void *),
			  void *next_arg)
{
	struct io_plan_arg *arg = io_plan_arg(conn, IO_OUT);

	if (len == 0)
		return set_always(conn, IO_OUT, next, next_arg);

	arg->u1.const_vp = data;
	arg->u2.s = len;

	return io_set_plan(conn, IO_OUT, do_write, next, next_arg);
}

static int do_read(int fd, struct io_plan_arg *arg)
{
	ssize_t ret = read(fd, arg->u1.cp, arg->u2.s);
	if (ret <= 0) {
		/* Errno isn't set if we hit EOF, so set it to distinct value */
		if (ret == 0)
			errno = 0;
		return -1;
	}

	arg->u1.cp += ret;
	arg->u2.s -= ret;
	return arg->u2.s == 0;
}

/* Queue a request to read into a buffer. */
struct io_plan *io_read_(struct io_conn *conn,
			 void *data, size_t len,
			 struct io_plan *(*next)(struct io_conn *, void *),
			 void *next_arg)
{
	struct io_plan_arg *arg = io_plan_arg(conn, IO_IN);

	if (len == 0)
		return set_always(conn, IO_IN, next, next_arg);

	arg->u1.cp = data;
	arg->u2.s = len;

	return io_set_plan(conn, IO_IN, do_read, next, next_arg);
}

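/* Usage sketch (typesafe io_read()/io_write() wrappers from io.h): a
 * fixed-size echo that reads an 8-byte block and writes it straight back.
 *
 *	static struct io_plan *echo_out(struct io_conn *conn, char *buf);
 *
 *	static struct io_plan *echo_in(struct io_conn *conn, char *buf)
 *	{
 *		return io_read(conn, buf, 8, echo_out, buf);
 *	}
 *
 *	static struct io_plan *echo_out(struct io_conn *conn, char *buf)
 *	{
 *		return io_write(conn, buf, 8, echo_in, buf);
 *	}
 */
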
static int do_read_partial(int fd, struct io_plan_arg *arg)
{
	ssize_t ret = read(fd, arg->u1.cp, *(size_t *)arg->u2.vp);
	if (ret <= 0) {
		/* Errno isn't set if we hit EOF, so set it to distinct value */
		if (ret == 0)
			errno = 0;
		return -1;
	}

	*(size_t *)arg->u2.vp = ret;
	return 1;
}

/* Queue a partial request to read into a buffer. */
struct io_plan *io_read_partial_(struct io_conn *conn,
				 void *data, size_t maxlen, size_t *len,
				 struct io_plan *(*next)(struct io_conn *,
							 void *),
				 void *next_arg)
{
	struct io_plan_arg *arg = io_plan_arg(conn, IO_IN);

	if (maxlen == 0)
		return set_always(conn, IO_IN, next, next_arg);

	arg->u1.cp = data;
	/* We store the max len in here temporarily. */
	*len = maxlen;
	arg->u2.vp = len;

	return io_set_plan(conn, IO_IN, do_read_partial, next, next_arg);
}

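/* Usage sketch: io_read_partial() completes after a single read of up to
 * maxlen bytes; on completion *len holds the number of bytes actually read.
 *
 *	static char buf[1024];
 *	static size_t len;
 *
 *	static struct io_plan *got_some(struct io_conn *conn, void *unused)
 *	{
 *		return io_write(conn, buf, len, io_close_cb, NULL);
 *	}
 *
 *	static struct io_plan *start(struct io_conn *conn, void *unused)
 *	{
 *		return io_read_partial(conn, buf, sizeof(buf), &len, got_some, NULL);
 *	}
 */
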
static int do_write_partial(int fd, struct io_plan_arg *arg)
{
	ssize_t ret = write(fd, arg->u1.cp, *(size_t *)arg->u2.vp);
	if (ret < 0)
		return -1;

	*(size_t *)arg->u2.vp = ret;
	return 1;
}

/* Queue a partial write request. */
struct io_plan *io_write_partial_(struct io_conn *conn,
				  const void *data, size_t maxlen, size_t *len,
				  struct io_plan *(*next)(struct io_conn *,
							  void *),
				  void *next_arg)
{
	struct io_plan_arg *arg = io_plan_arg(conn, IO_OUT);

	if (maxlen == 0)
		return set_always(conn, IO_OUT, next, next_arg);

	arg->u1.const_vp = data;
	/* We store the max len in here temporarily. */
	*len = maxlen;
	arg->u2.vp = len;

	return io_set_plan(conn, IO_OUT, do_write_partial, next, next_arg);
}

static int do_connect(int fd, struct io_plan_arg *arg)
{
	int err, ret;
	socklen_t len = sizeof(err);

	/* Has async connect finished? */
	ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len);
	if (ret < 0)
		return -1;

	if (err == 0) {
		return 1;
	} else if (err == EINPROGRESS)
		return 0;

	errno = err;
	return -1;
}

struct io_plan *io_connect_(struct io_conn *conn, const struct addrinfo *addr,
			    struct io_plan *(*next)(struct io_conn *, void *),
			    void *next_arg)
{
	int fd = io_conn_fd(conn);

	/* We don't actually need the arg, but we need it polling. */
	io_plan_arg(conn, IO_OUT);

	/* Note that io_new_conn() will make fd O_NONBLOCK */

	/* Immediate connect can happen. */
	if (connect(fd, addr->ai_addr, addr->ai_addrlen) == 0)
		return set_always(conn, IO_OUT, next, next_arg);

	if (errno != EINPROGRESS)
		return io_close(conn);

	return io_set_plan(conn, IO_OUT, do_connect, next, next_arg);
}

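/* Usage sketch: create the socket yourself, wrap it with io_new_conn(),
 * then let io_connect() complete the non-blocking connect before moving
 * on.  connected() is a placeholder for the next callback.
 *
 *	static struct io_plan *init_conn(struct io_conn *conn,
 *					 struct addrinfo *addr)
 *	{
 *		return io_connect(conn, addr, connected, NULL);
 *	}
 *	...
 *	fd = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
 *	conn = io_new_conn(NULL, fd, init_conn, addr);
 */
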
static struct io_plan *io_wait_dir(struct io_conn *conn,
				   const void *wait,
				   enum io_direction dir,
				   struct io_plan *(*next)(struct io_conn *,
							   void *),
				   void *next_arg)
{
	struct io_plan_arg *arg = io_plan_arg(conn, dir);
	arg->u1.const_vp = wait;

	conn->plan[dir].status = IO_WAITING;

	return io_set_plan(conn, dir, NULL, next, next_arg);
}

struct io_plan *io_wait_(struct io_conn *conn,
			 const void *wait,
			 struct io_plan *(*next)(struct io_conn *, void *),
			 void *next_arg)
{
	return io_wait_dir(conn, wait, IO_IN, next, next_arg);
}

struct io_plan *io_out_wait_(struct io_conn *conn,
			     const void *wait,
			     struct io_plan *(*next)(struct io_conn *, void *),
			     void *next_arg)
{
	return io_wait_dir(conn, wait, IO_OUT, next, next_arg);
}

void io_wake(const void *wait)
{
	backend_wake(wait);
}

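/* Usage sketch: a connection parks itself on an arbitrary address with
 * io_wait(); any other code wakes every waiter on that address with
 * io_wake().  struct queue and write_from_queue() are placeholders.
 *
 *	static struct io_plan *wait_for_data(struct io_conn *conn, struct queue *q)
 *	{
 *		return io_wait(conn, q, write_from_queue, q);
 *	}
 *	...
 *	io_wake(q);
 */
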
enum plan_result {
	/* Destroyed, do not touch */
	FREED,
	/* Worked, call again. */
	KEEP_GOING,
	/* Failed with EAGAIN or did partial. */
	OK,
	/* No longer interested in read (or write) */
	UNSET
};

static enum plan_result do_plan(struct io_conn *conn, struct io_plan *plan,
				bool idle_on_epipe)
{
	/* We shouldn't have polled for this event if this wasn't true! */
	assert(plan->status == IO_POLLING_NOTSTARTED
	       || plan->status == IO_POLLING_STARTED);

	switch (plan->io(conn->fd.fd, &plan->arg)) {
	case -1:
		/* This is expected, as we call optimistically! */
		if (errno == EAGAIN)
			return OK;
		if (errno == EPIPE && idle_on_epipe) {
			plan->status = IO_UNSET;
			backend_new_plan(conn);
			return UNSET;
		}
		io_close(conn);
		return FREED;
	case 0:
		plan->status = IO_POLLING_STARTED;
		/* If it started but didn't finish, don't call again. */
		return OK;
	case 1:
		if (!next_plan(conn, plan))
			return FREED;
		if (plan->status == IO_POLLING_NOTSTARTED)
			return KEEP_GOING;
		return OK;
	default:
		/* IO should only return -1, 0 or 1 */
		abort();
	}
}

void io_ready(struct io_conn *conn, int pollflags)
{
	enum plan_result res;

	if (pollflags & POLLIN) {
		/* Loop as long as the plan completes and wants more input. */
		do {
			res = do_plan(conn, &conn->plan[IO_IN], false);
		} while (res == KEEP_GOING);
		if (res == FREED)
			return;
	}

	if (pollflags & POLLOUT) {
		/* If we're writing to a closed pipe, we need to wait for
		 * read to fail if we're duplex: we want to drain it! */
		do {
			res = do_plan(conn, &conn->plan[IO_OUT],
				      conn->plan[IO_IN].status == IO_POLLING_NOTSTARTED
				      || conn->plan[IO_IN].status == IO_POLLING_STARTED);
		} while (res == KEEP_GOING);
		if (res == FREED)
			return;
	}
}

void io_do_always(struct io_plan *plan)
{
	struct io_conn *conn;

	assert(plan->status == IO_ALWAYS);
	conn = container_of(plan, struct io_conn, plan[plan->dir]);

	next_plan(conn, plan);
}

void io_do_wakeup(struct io_conn *conn, enum io_direction dir)
{
	struct io_plan *plan = &conn->plan[dir];

	assert(plan->status == IO_WAITING);

	set_always(conn, dir, plan->next, plan->next_arg);
}

/* Close the connection, we're done. */
struct io_plan *io_close(struct io_conn *conn)
{
	tal_free(conn);
	return &io_conn_freed;
}

struct io_plan *io_close_cb(struct io_conn *conn, void *next_arg)
{
	return io_close(conn);
}

struct io_plan *io_close_taken_fd(struct io_conn *conn)
{
	io_fd_block(conn->fd.fd, true);

	cleanup_conn_without_close(conn);
	return io_close(conn);
}

/* Exit the loop, returning this (non-NULL) arg. */
void io_break(const void *ret)
{
	assert(ret);
	io_loop_return = (void *)ret;
}

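/* Usage sketch: io_break() makes the running io_loop() return the given
 * non-NULL pointer, e.g. once a handshake has completed.  struct result
 * is a placeholder.
 *
 *	static struct io_plan *done(struct io_conn *conn, struct result *r)
 *	{
 *		io_break(r);
 *		return io_close(conn);
 *	}
 *	...
 *	struct result *r = io_loop(NULL, NULL);
 */
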
struct io_plan *io_never(struct io_conn *conn, void *unused)
{
	return io_always(conn, io_never_called, NULL);
}

int io_conn_fd(const struct io_conn *conn)
{
	return conn->fd.fd;
}

struct io_plan *io_duplex(struct io_conn *conn,
			  struct io_plan *in_plan, struct io_plan *out_plan)
{
	assert(conn == container_of(in_plan, struct io_conn, plan[IO_IN]));
	/* in_plan must be conn->plan[IO_IN], out_plan must be [IO_OUT] */
	assert(out_plan == in_plan + 1);
	return out_plan + 1;
}

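/* Usage sketch: io_duplex() lets the same connection read and write
 * concurrently by supplying both plans at once.  struct buf, rd_done()
 * and wr_done() are placeholders.
 *
 *	static struct io_plan *start(struct io_conn *conn, struct buf *b)
 *	{
 *		return io_duplex(conn,
 *				 io_read(conn, b->in, sizeof(b->in), rd_done, b),
 *				 io_write(conn, b->out, sizeof(b->out), wr_done, b));
 *	}
 */
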
struct io_plan *io_halfclose(struct io_conn *conn)
{
	/* Both unset? OK. */
	if (conn->plan[IO_IN].status == IO_UNSET
	    && conn->plan[IO_OUT].status == IO_UNSET)
		return io_close(conn);

	/* We leave this unset then. */
	if (conn->plan[IO_IN].status == IO_UNSET)
		return &conn->plan[IO_IN];
	else
		return &conn->plan[IO_OUT];
}

struct io_plan *io_set_plan(struct io_conn *conn, enum io_direction dir,
			    int (*io)(int fd, struct io_plan_arg *arg),
			    struct io_plan *(*next)(struct io_conn *, void *),
			    void *next_arg)
{
	struct io_plan *plan = &conn->plan[dir];

	plan->io = io;
	plan->next = next;
	plan->next_arg = next_arg;
	assert(next != NULL);

	return plan;
}

bool io_plan_in_started(const struct io_conn *conn)
{
	return conn->plan[IO_IN].status == IO_POLLING_STARTED;
}

bool io_plan_out_started(const struct io_conn *conn)
{
	return conn->plan[IO_OUT].status == IO_POLLING_STARTED;
}

/* Despite being a TCP expert, I missed the full extent of this
 * problem.  The legendary ZmnSCPxj implemented it (with the URL
 * pointing to the explanation), and I imitate that here. */
struct io_plan *io_sock_shutdown(struct io_conn *conn)
{
	if (shutdown(io_conn_fd(conn), SHUT_WR) != 0)
		return io_close(conn);

	/* And leave it unset. */
	return &conn->plan[IO_IN];
}

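/* Usage sketch: when the write side should be flushed before tearing the
 * socket down, return io_sock_shutdown() instead of io_close() after the
 * final write.
 *
 *	static struct io_plan *write_done(struct io_conn *conn, void *unused)
 *	{
 *		return io_sock_shutdown(conn);
 *	}
 */
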
bool io_flush_sync(struct io_conn *conn)
{
	struct io_plan *plan = &conn->plan[IO_OUT];
	bool ok;

	/* Not writing? Nothing to do. */
	if (plan->status != IO_POLLING_STARTED
	    && plan->status != IO_POLLING_NOTSTARTED)
		return true;

	/* Synchronous please. */
	io_fd_block(io_conn_fd(conn), true);

again:
	switch (plan->io(conn->fd.fd, &plan->arg)) {
	case -1:
		ok = false;
		break;
	/* Incomplete, try again. */
	case 0:
		plan->status = IO_POLLING_STARTED;
		goto again;
	case 1:
		ok = true;
		/* In case they come back. */
		set_always(conn, IO_OUT, plan->next, plan->next_arg);
		break;
	default:
		/* IO should only return -1, 0 or 1 */
		abort();
	}

	io_fd_block(io_conn_fd(conn), false);
	return ok;
}

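/* Usage sketch: io_flush_sync() is useful just before exec() or _exit(),
 * when a pending io_write() must reach the fd even though io_loop() will
 * not run again.
 *
 *	if (!io_flush_sync(conn))
 *		fprintf(stderr, "flush to peer failed\n");
 */
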
void io_set_extended_errors(bool state)
{
	io_extended_errors = state;
}

bool io_get_extended_errors(void)
{
	return io_extended_errors;
}