]> git.ozlabs.org Git - ccan/blobdiff - ccan/io/poll.c
ccan/io: io_duplex.
[ccan] / ccan / io / poll.c
index 070f6d842ae7f45bd4b0b529b6c31cd6c71d7b20..fdff271c64f26e09f060b9e52e468dcf80d97696 100644 (file)
@@ -76,11 +76,23 @@ bool add_conn(struct io_conn *c)
        return true;
 }
 
+bool add_duplex(struct io_conn *c)
+{
+       c->fd.backend_info = c->duplex->fd.backend_info;
+       num_next++;
+       return true;
+}
+
 static void del_conn(struct io_conn *conn)
 {
        if (conn->fd.finish)
                conn->fd.finish(conn, conn->fd.finish_arg);
-       del_fd(&conn->fd);
+       if (conn->duplex) {
+               /* In case fds[] pointed to the other one. */
+               fds[conn->fd.backend_info] = &conn->duplex->fd;
+               conn->duplex->duplex = NULL;
+       } else
+               del_fd(&conn->fd);
        if (conn->state == FINISHED)
                num_finished--;
        else if (conn->state == NEXT)
@@ -92,32 +104,38 @@ void del_listener(struct io_listener *l)
        del_fd(&l->fd);
 }
 
-void backend_set_state(struct io_conn *conn, struct io_op *op)
+static int pollmask(enum io_state state)
 {
-       enum io_state state = from_ioop(op);
-       struct pollfd *pfd = &pollfds[conn->fd.backend_info];
-
        switch (state) {
        case READ:
        case READPART:
-               pfd->events = POLLIN;
-               break;
+               return POLLIN;
        case WRITE:
        case WRITEPART:
-               pfd->events = POLLOUT;
-               break;
-       case IDLE:
-               pfd->events = 0;
-               break;
-       case NEXT:
-               num_next++;
-               break;
-       case FINISHED:
-               num_finished++;
-               break;
+               return POLLOUT;
        default:
-               abort();
+               return 0;
        }
+}
+
+void backend_set_state(struct io_conn *conn, struct io_op *op)
+{
+       enum io_state state = from_ioop(op);
+       struct pollfd *pfd = &pollfds[conn->fd.backend_info];
+
+       pfd->events = pollmask(state);
+       if (conn->duplex) {
+               int mask = pollmask(conn->duplex->state);
+               /* You can't *both* read/write. */
+               assert(!mask || pfd->events != mask);
+               pfd->events |= mask;
+       }
+
+       if (state == NEXT)
+               num_next++;
+       else if (state == FINISHED)
+               num_finished++;
+
        conn->state = state;
 }
 
@@ -142,7 +160,7 @@ static void finish_and_next(bool finished_only)
        unsigned int i;
 
        for (i = 0; !io_loop_return && i < num_fds; i++) {
-               struct io_conn *c;
+               struct io_conn *c, *duplex;
 
                if (!num_finished) {
                        if (finished_only || num_next == 0)
@@ -151,17 +169,26 @@ static void finish_and_next(bool finished_only)
                if (fds[i]->listener)
                        continue;
                c = (void *)fds[i];
-               if (c->state == FINISHED) {
-                       del_conn(c);
-                       free(c);
-                       i--;
-               } else if (!finished_only && c->state == NEXT) {
-                       backend_set_state(c, c->fd.next(c, c->fd.next_arg));
-                       num_next--;
+               for (duplex = c->duplex; c; c = duplex, duplex = NULL) {
+                       if (c->state == FINISHED) {
+                               del_conn(c);
+                               free(c);
+                               i--;
+                       } else if (!finished_only && c->state == NEXT) {
+                               backend_set_state(c,
+                                                 c->fd.next(c,
+                                                            c->fd.next_arg));
+                               num_next--;
+                       }
                }
        }
 }
 
+static void ready(struct io_conn *c)
+{
+       backend_set_state(c, do_ready(c));
+}
+
 /* This is the main loop. */
 void *io_loop(void)
 {
@@ -185,16 +212,30 @@ void *io_loop(void)
 
                for (i = 0; i < num_fds && !io_loop_return; i++) {
                        struct io_conn *c = (void *)fds[i];
-                       if (pollfds[i].revents & POLLOUT)
-                               backend_set_state(c, do_writeable(c));
-                       else if (pollfds[i].revents & POLLIN) {
-                               if (fds[i]->listener)
+                       int events = pollfds[i].revents;
+
+                       if (fds[i]->listener) {
+                               if (events & POLLIN)
                                        accept_conn((void *)c);
-                               else
-                                       backend_set_state(c, do_readable(c));
-                       } else if (pollfds[i].revents & POLLHUP) {
+                       } else if (events & (POLLIN|POLLOUT)) {
+                               if (c->duplex) {
+                                       int mask = pollmask(c->duplex->state);
+                                       if (events & mask) {
+                                               ready(c->duplex);
+                                               events &= ~mask;
+                                               if (!(events&(POLLIN|POLLOUT)))
+                                                       continue;
+                                       }
+                               }
+                               ready(c);
+                       } else if (events & POLLHUP) {
                                backend_set_state(c, io_close(c, NULL));
+                               if (c->duplex)
+                                       backend_set_state(c->duplex,
+                                                         io_close(c->duplex,
+                                                                  NULL));
                        }
+
                }
        }