return true;
}
+bool add_duplex(struct io_conn *c)
+{
+ c->fd.backend_info = c->duplex->fd.backend_info;
+ num_next++;
+ return true;
+}
+
static void del_conn(struct io_conn *conn)
{
if (conn->fd.finish)
conn->fd.finish(conn, conn->fd.finish_arg);
- del_fd(&conn->fd);
+ if (conn->duplex) {
+ /* In case fds[] pointed to the other one. */
+ fds[conn->fd.backend_info] = &conn->duplex->fd;
+ conn->duplex->duplex = NULL;
+ } else
+ del_fd(&conn->fd);
if (conn->state == FINISHED)
num_finished--;
else if (conn->state == NEXT)
del_fd(&l->fd);
}
-void backend_set_state(struct io_conn *conn, struct io_op *op)
+static int pollmask(enum io_state state)
{
- enum io_state state = from_ioop(op);
- struct pollfd *pfd = &pollfds[conn->fd.backend_info];
-
switch (state) {
case READ:
case READPART:
- pfd->events = POLLIN;
- break;
+ return POLLIN;
case WRITE:
case WRITEPART:
- pfd->events = POLLOUT;
- break;
- case IDLE:
- pfd->events = 0;
- break;
- case NEXT:
- num_next++;
- break;
- case FINISHED:
- num_finished++;
- break;
+ return POLLOUT;
default:
- abort();
+ return 0;
}
+}
+
+void backend_set_state(struct io_conn *conn, struct io_op *op)
+{
+ enum io_state state = from_ioop(op);
+ struct pollfd *pfd = &pollfds[conn->fd.backend_info];
+
+ pfd->events = pollmask(state);
+ if (conn->duplex) {
+ int mask = pollmask(conn->duplex->state);
+ /* You can't *both* read/write. */
+ assert(!mask || pfd->events != mask);
+ pfd->events |= mask;
+ }
+
+ if (state == NEXT)
+ num_next++;
+ else if (state == FINISHED)
+ num_finished++;
+
conn->state = state;
}
unsigned int i;
for (i = 0; !io_loop_return && i < num_fds; i++) {
- struct io_conn *c;
+ struct io_conn *c, *duplex;
if (!num_finished) {
if (finished_only || num_next == 0)
if (fds[i]->listener)
continue;
c = (void *)fds[i];
- if (c->state == FINISHED) {
- del_conn(c);
- free(c);
- i--;
- } else if (!finished_only && c->state == NEXT) {
- backend_set_state(c, c->fd.next(c, c->fd.next_arg));
- num_next--;
+ for (duplex = c->duplex; c; c = duplex, duplex = NULL) {
+ if (c->state == FINISHED) {
+ del_conn(c);
+ free(c);
+ i--;
+ } else if (!finished_only && c->state == NEXT) {
+ backend_set_state(c,
+ c->fd.next(c,
+ c->fd.next_arg));
+ num_next--;
+ }
}
}
}
+static void ready(struct io_conn *c)
+{
+ backend_set_state(c, do_ready(c));
+}
+
/* This is the main loop. */
void *io_loop(void)
{
for (i = 0; i < num_fds && !io_loop_return; i++) {
struct io_conn *c = (void *)fds[i];
- if (pollfds[i].revents & POLLOUT)
- backend_set_state(c, do_writeable(c));
- else if (pollfds[i].revents & POLLIN) {
- if (fds[i]->listener)
+ int events = pollfds[i].revents;
+
+ if (fds[i]->listener) {
+ if (events & POLLIN)
accept_conn((void *)c);
- else
- backend_set_state(c, do_readable(c));
- } else if (pollfds[i].revents & POLLHUP) {
+ } else if (events & (POLLIN|POLLOUT)) {
+ if (c->duplex) {
+ int mask = pollmask(c->duplex->state);
+ if (events & mask) {
+ ready(c->duplex);
+ events &= ~mask;
+ if (!(events&(POLLIN|POLLOUT)))
+ continue;
+ }
+ }
+ ready(c);
+ } else if (events & POLLHUP) {
backend_set_state(c, io_close(c, NULL));
+ if (c->duplex)
+ backend_set_state(c->duplex,
+ io_close(c->duplex,
+ NULL));
}
+
}
}