* // This reads from stdin.
* static struct io_plan wake_writer(struct io_conn *, struct stdin_buffer *);
* // This writes the stdin buffer to the child.
- * static struct io_plan write_to_child(struct io_conn *c,
- * struct stdin_buffer *b);
- * static struct io_plan read_stdin(struct io_conn *c, struct stdin_buffer *b)
- * {
- * assert(c == b->reader);
- * b->len = sizeof(b->inbuf);
- * return io_read_partial(b->inbuf, &b->len, wake_writer, b);
- * }
+ * static struct io_plan wake_reader(struct io_conn *, struct stdin_buffer *);
*
* static struct io_plan wake_writer(struct io_conn *c, struct stdin_buffer *b)
* {
* assert(c == b->reader);
- * io_wake(b->writer, write_to_child, b);
+ * io_wake(b->writer, io_write(b->inbuf, b->len, wake_reader, b));
* return io_idle();
* }
*
* static void reader_exit(struct io_conn *c, struct stdin_buffer *b)
* {
* assert(c == b->reader);
- * io_wake(b->writer, write_to_child, b);
+ * io_wake(b->writer, io_close(b->writer, NULL));
* b->reader = NULL;
* }
*
* static struct io_plan wake_reader(struct io_conn *c, struct stdin_buffer *b)
* {
* assert(c == b->writer);
- * io_wake(b->reader, read_stdin, b);
- * return io_idle();
- * }
- *
- * static struct io_plan write_to_child(struct io_conn *conn,
- * struct stdin_buffer *b)
- * {
- * assert(conn == b->writer);
- * if (!b->reader)
- * return io_close(conn, NULL);
- * return io_write(b->inbuf, b->len, wake_reader, b);
- * }
- *
- * static struct io_plan start_writer(struct io_conn *conn,
- * struct stdin_buffer *b)
- * {
- * assert(conn == b->writer);
+ * if (!b->reader)
+ * return io_close(c, NULL);
+ * b->len = sizeof(b->inbuf);
+ * io_wake(b->reader, io_read_partial(b->inbuf, &b->len, wake_writer, b));
* return io_idle();
* }
*
* {
* b->off += b->rlen;
*
- * if (b->off == b->max) {
- * if (b->max == 0)
- * b->max = 128;
- * else if (b->max >= 1024*1024)
- * b->max += 1024*1024;
- * else
- * b->max *= 2;
- * b->buf = realloc(b->buf, b->max);
- * }
+ * if (b->off == b->max)
+ * b->buf = realloc(b->buf, b->max *= 2);
*
* b->rlen = b->max - b->off;
* return io_read_partial(b->buf + b->off, &b->rlen, read_from_child, b);
* int main(int argc, char *argv[])
* {
* int tochild[2], fromchild[2];
- * struct buffer out = { 0, 0, 0, NULL };
+ * struct buffer out;
* struct stdin_buffer sbuf;
* int status;
* size_t off;
* ssize_t ret;
+ * struct io_conn *from_child;
*
* if (argc == 1)
* errx(1, "Usage: runner <cmdline>...");
* close(fromchild[1]);
* signal(SIGPIPE, SIG_IGN);
*
- * sbuf.reader = io_new_conn(STDIN_FILENO, read_stdin, reader_exit, &sbuf);
- * sbuf.writer = io_new_conn(tochild[1], start_writer, fail_child_write,
+ * sbuf.len = sizeof(sbuf.inbuf);
+ * sbuf.reader = io_new_conn(STDIN_FILENO,
+ * io_read_partial(sbuf.inbuf, &sbuf.len,
+ * wake_writer, &sbuf),
+ * reader_exit, &sbuf);
+ * sbuf.writer = io_new_conn(tochild[1], io_idle(), fail_child_write,
* &sbuf);
- * if (!sbuf.reader || !sbuf.writer
- * || !io_new_conn(fromchild[0], read_from_child, NULL, &out))
+ *
+ * out.max = 128;
+ * out.off = 0;
+ * out.rlen = 128;
+ * out.buf = malloc(out.max);
+ * from_child = io_new_conn(fromchild[0],
+ * io_read_partial(out.buf, &out.rlen,
+ * read_from_child, &out),
+ * NULL, NULL);
+ * if (!sbuf.reader || !sbuf.writer || !from_child)
* err(1, "Allocating connections");
*
* io_loop();
static struct io_plan write_reply(struct io_conn *conn, struct client *client);
static struct io_plan read_request(struct io_conn *conn, struct client *client)
{
- return io_read(conn, client->request_buffer, REQUEST_SIZE,
+ return io_read(client->request_buffer, REQUEST_SIZE,
write_reply, client);
}
static struct io_plan write_reply(struct io_conn *conn, struct client *client)
{
- return io_write(conn, client->reply_buffer, REPLY_SIZE,
+ return io_write(client->reply_buffer, REPLY_SIZE,
write_complete, client);
}
static struct io_plan do_timeout(struct io_conn *conn, char *buf)
{
- return io_break(conn, buf, NULL, NULL);
-}
-
-static struct io_plan do_timeout_read(struct io_conn *conn, char *buf)
-{
- return io_read(conn, buf, 1, do_timeout, buf);
+ return io_break(buf, io_idle());
}
int main(int argc, char *argv[])
if (ret < 0)
err(1, "Accepting fd");
/* For efficiency, we share client structure */
- io_new_conn(ret, read_request, NULL, &client);
+ io_new_conn(ret,
+ io_read(client.request_buffer, REQUEST_SIZE,
+ write_reply, &client),
+ NULL, NULL);
}
}
- io_new_conn(timeout[0], do_timeout_read, NULL, &buf);
+ io_new_conn(timeout[0], io_read(&buf, 1, do_timeout, &buf), NULL, NULL);
close(wake[0]);
for (i = 0; i < NUM_CHILDREN; i++)
static struct io_plan read_body(struct io_conn *conn, struct client *client)
{
assert(client->len <= REQUEST_MAX);
- return io_read(conn, client->request_buffer, client->len,
+ return io_read(client->request_buffer, client->len,
write_reply, client);
}
-static struct io_plan read_header(struct io_conn *conn, struct client *client)
+static struct io_plan io_read_header(struct client *client)
{
- return io_read(conn, &client->len, sizeof(client->len),
- read_body, client);
+ return io_read(&client->len, sizeof(client->len), read_body, client);
}
/* once we're done, loop again. */
static struct io_plan write_complete(struct io_conn *conn, struct client *client)
{
completed++;
- return read_header(conn, client);
+ return io_read_header(client);
}
static struct io_plan write_reply(struct io_conn *conn, struct client *client)
{
- return io_write(conn, &client->len, sizeof(client->len),
+ return io_write(&client->len, sizeof(client->len),
write_complete, client);
}
static struct io_plan do_timeout(struct io_conn *conn, char *buf)
{
- return io_break(conn, buf, NULL, NULL);
-}
-
-static struct io_plan do_timeout_read(struct io_conn *conn, char *buf)
-{
- return io_read(conn, buf, 1, do_timeout, buf);
+ return io_break(buf, io_idle());
}
int main(int argc, char *argv[])
err(1, "Accepting fd");
/* For efficiency, we share buffer */
client->request_buffer = buffer;
- io_new_conn(ret, read_header, NULL, client);
+ io_new_conn(ret, io_read_header(client), NULL, NULL);
}
}
- io_new_conn(timeout[0], do_timeout_read, NULL, &buf);
+ io_new_conn(timeout[0], io_read(&buf, 1, do_timeout, &buf), NULL, NULL);
close(wake[0]);
for (i = 0; i < NUM_CHILDREN; i++)
char buf[32];
};
-static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf);
static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf);
-static struct io_plan do_read(struct io_conn *conn, struct buffer *buf)
-{
- assert(conn == buf->reader);
-
- return io_read(conn, &buf->buf, sizeof(buf->buf), poke_writer, buf);
-}
-
-static struct io_plan do_write(struct io_conn *conn, struct buffer *buf)
-{
- assert(conn == buf->writer);
-
- return io_write(conn, &buf->buf, sizeof(buf->buf), poke_reader, buf);
-}
-
static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf)
{
assert(conn == buf->reader);
return io_close(conn, NULL);
/* You write. */
- io_wake(buf->writer, do_write, buf);
+ io_wake(buf->writer,
+ io_write(&buf->buf, sizeof(buf->buf), poke_reader, buf));
/* I'll wait until you wake me. */
- return io_idle(conn);
+ return io_idle();
}
static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf)
{
assert(conn == buf->writer);
/* You read. */
- io_wake(buf->reader, do_read, buf);
+ io_wake(buf->reader,
+ io_read(&buf->buf, sizeof(buf->buf), poke_writer, buf));
if (++buf->iters == NUM_ITERS)
return io_close(conn, NULL);
/* I'll wait until you tell me to write. */
- return io_idle(conn);
-}
-
-static struct io_plan reader(struct io_conn *conn, struct buffer *buf)
-{
- assert(conn == buf->reader);
-
- /* Wait for writer to tell us to read. */
- return io_idle(conn);
+ return io_idle();
}
int main(void)
memset(buf[i].buf, i, sizeof(buf[i].buf));
sprintf(buf[i].buf, "%i-%i", i, i);
- buf[i].reader = io_new_conn(last_read, reader, NULL, &buf[i]);
+ buf[i].reader = io_new_conn(last_read, io_idle(), NULL, NULL);
if (!buf[i].reader)
err(1, "Creating reader %i", i);
- buf[i].writer = io_new_conn(fds[1], do_write, NULL, &buf[i]);
+ buf[i].writer = io_new_conn(fds[1],
+ io_write(&buf[i].buf,
+ sizeof(buf[i].buf),
+ poke_reader, &buf[i]),
+ NULL, NULL);
if (!buf[i].writer)
err(1, "Creating writer %i", i);
last_read = fds[0];
i = 0;
buf[i].iters = 0;
sprintf(buf[i].buf, "%i-%i", i, i);
- buf[i].reader = io_new_conn(last_read, reader, NULL, &buf[i]);
+ buf[i].reader = io_new_conn(last_read, io_idle(), NULL, NULL);
if (!buf[i].reader)
err(1, "Creating reader %i", i);
- buf[i].writer = io_new_conn(last_write, do_write, NULL, &buf[i]);
+ buf[i].writer = io_new_conn(last_write, io_write(&buf[i].buf,
+ sizeof(buf[i].buf),
+ poke_reader, &buf[i]),
+ NULL, NULL);
if (!buf[i].writer)
err(1, "Creating writer %i", i);
}
struct io_conn *io_new_conn_(int fd,
- struct io_plan (*start)(struct io_conn *, void *),
+ struct io_plan plan,
void (*finish)(struct io_conn *, void *),
void *arg)
{
conn->fd.listener = false;
conn->fd.fd = fd;
- conn->plan.next = start;
+ conn->plan = plan;
conn->finish = finish;
- conn->finish_arg = conn->plan.next_arg = arg;
- conn->plan.pollflag = 0;
- conn->plan.state = IO_NEXT;
+ conn->finish_arg = arg;
conn->duplex = NULL;
conn->timeout = NULL;
if (!add_conn(conn)) {
}
struct io_conn *io_duplex_(struct io_conn *old,
- struct io_plan (*start)(struct io_conn *, void *),
- void (*finish)(struct io_conn *, void *),
- void *arg)
+ struct io_plan plan,
+ void (*finish)(struct io_conn *, void *),
+ void *arg)
{
struct io_conn *conn;
conn->fd.listener = false;
conn->fd.fd = old->fd.fd;
- conn->plan.next = start;
- conn->finish = finish;
- conn->finish_arg = conn->plan.next_arg = arg;
- conn->plan.pollflag = 0;
- conn->plan.state = IO_NEXT;
+ conn->plan = plan;
conn->duplex = old;
+ conn->finish = finish;
+ conn->finish_arg = arg;
conn->timeout = NULL;
if (!add_duplex(conn)) {
free(conn);
return plan;
}
-void io_wake_(struct io_conn *conn,
- struct io_plan (*fn)(struct io_conn *, void *), void *arg)
+void io_wake(struct io_conn *conn, struct io_plan plan)
{
/* It might have finished, but we haven't called its finish() yet. */
if (conn->plan.state == IO_FINISHED)
return;
assert(conn->plan.state == IO_IDLE);
- conn->plan.next = fn;
- conn->plan.next_arg = arg;
- conn->plan.pollflag = 0;
- conn->plan.state = IO_NEXT;
+ conn->plan = plan;
backend_wakeup(conn);
}
}
/* Exit the loop, returning this (non-NULL) arg. */
-struct io_plan io_break_(void *ret,
- struct io_plan (*fn)(struct io_conn *, void *),
- void *arg)
+struct io_plan io_break(void *ret, struct io_plan plan)
{
- struct io_plan plan;
-
io_loop_return = ret;
- plan.state = IO_NEXT;
- plan.pollflag = 0;
- plan.next = fn;
- plan.next_arg = arg;
-
return plan;
}
/**
* io_new_conn - create a new connection.
* @fd: the file descriptor.
- * @start: the first function to call.
+ * @plan: the first I/O function.
* @finish: the function to call when it's closed or fails.
- * @arg: the argument to both @start and @finish.
+ * @arg: the argument to @finish.
*
- * This creates a connection which owns @fd. @start will be called on the
- * next return to io_loop(), and @finish will be called when an I/O operation
+ * This creates a connection which owns @fd. @plan will be called on the
+ * next io_loop(), and @finish will be called when an I/O operation
* fails, or you call io_close() on the connection.
*
- * The @start function must call one of the io queueing functions
- * (eg. io_read, io_write) and return the next function to call once
- * that is done using io_next(). The alternative is to call io_close().
- *
* Returns NULL on error (and sets errno).
*/
-#define io_new_conn(fd, start, finish, arg) \
- io_new_conn_((fd), \
- typesafe_cb_preargs(struct io_plan, void *, \
- (start), (arg), struct io_conn *), \
+#define io_new_conn(fd, plan, finish, arg) \
+ io_new_conn_((fd), (plan), \
typesafe_cb_preargs(void, void *, (finish), (arg), \
struct io_conn *), \
(arg))
struct io_conn *io_new_conn_(int fd,
- struct io_plan (*start)(struct io_conn *, void *),
+ struct io_plan plan,
void (*finish)(struct io_conn *, void *),
void *arg);
/**
* io_duplex - split an fd into two connections.
* @conn: a connection.
- * @start: the first function to call.
+ * @plan: the first I/O function to call.
* @finish: the function to call when it's closed or fails.
- * @arg: the argument to both @start and @finish.
+ * @arg: the argument to @finish.
*
* Sometimes you want to be able to simultaneously read and write on a
* single fd, but io forces a linear call sequence. The solition is
*
* You must io_close() both of them to close the fd.
*/
-#define io_duplex(conn, start, finish, arg) \
- io_duplex_((conn), \
- typesafe_cb_preargs(struct io_plan, void *, \
- (start), (arg), struct io_conn *), \
+#define io_duplex(conn, plan, finish, arg) \
+ io_duplex_((conn), (plan), \
typesafe_cb_preargs(void, void *, (finish), (arg), \
struct io_conn *), \
(arg))
struct io_conn *io_duplex_(struct io_conn *conn,
- struct io_plan (*start)(struct io_conn *, void *),
+ struct io_plan plan,
void (*finish)(struct io_conn *, void *),
void *arg);
/**
- * io_wake - wake up and idle connection.
+ * io_wake - wake up an idle connection.
* @conn: an idle connection.
- * @fn: the next function to call once queued IO is complete.
- * @arg: the argument to @next.
+ * @plan: the next I/O function for @conn.
*
- * This makes @conn run its @next function the next time around the
- * io_loop().
+ * This makes @conn do I/O the next time around the io_loop().
*/
-#define io_wake(conn, fn, arg) \
- io_wake_((conn), \
- typesafe_cb_preargs(struct io_plan, void *, \
- (fn), (arg), struct io_conn *), \
- (arg))
-void io_wake_(struct io_conn *conn,
- struct io_plan (*fn)(struct io_conn *, void *), void *arg);
+void io_wake(struct io_conn *conn, struct io_plan plan);
/**
* io_break - return from io_loop()
* @ret: non-NULL value to return from io_loop().
- * @cb: function to call once on return
- * @arg: @cb argument
+ * @plan: I/O to perform on return (if any)
*
* This breaks out of the io_loop. As soon as the current @next
* function returns, any io_closed()'d connections will have their
* finish callbacks called, then io_loop() with return with @ret.
*
- * If io_loop() is called again, then @cb will be called.
+ * If io_loop() is called again, then @plan will be carried out.
*/
-#define io_break(ret, fn, arg) \
- io_break_((ret), \
- typesafe_cb_preargs(struct io_plan, void *, \
- (fn), (arg), struct io_conn *), \
- (arg))
-struct io_plan io_break_(void *ret,
- struct io_plan (*fn)(struct io_conn *, void *),
- void *arg);
+struct io_plan io_break(void *ret, struct io_plan plan);
/* FIXME: io_recvfrom/io_sendto */
fds[num_fds] = fd;
fd->backend_info = num_fds;
num_fds++;
+ if (events)
+ num_waiting++;
+
return true;
}
assert(n != -1);
assert(n < num_fds);
+ if (pollfds[n].events)
+ num_waiting--;
if (n != num_fds - 1) {
/* Move last one over us. */
pollfds[n] = pollfds[num_fds-1];
{
if (!add_fd(&l->fd, POLLIN))
return false;
- num_waiting++;
return true;
}
+static void adjust_counts(enum io_state state)
+{
+ if (state == IO_NEXT)
+ num_next++;
+ else if (state == IO_FINISHED)
+ num_finished++;
+}
+
+static void update_pollevents(struct io_conn *conn)
+{
+ struct pollfd *pfd = &pollfds[conn->fd.backend_info];
+
+ if (pfd->events)
+ num_waiting--;
+
+ pfd->events = conn->plan.pollflag;
+ if (conn->duplex) {
+ int mask = conn->duplex->plan.pollflag;
+ /* You can't *both* read/write. */
+ assert(!mask || pfd->events != mask);
+ pfd->events |= mask;
+ }
+ if (pfd->events)
+ num_waiting++;
+
+ adjust_counts(conn->plan.state);
+}
+
bool add_conn(struct io_conn *c)
{
- if (!add_fd(&c->fd, 0))
+ if (!add_fd(&c->fd, c->plan.pollflag))
return false;
- num_next++;
+ adjust_counts(c->plan.state);
return true;
}
bool add_duplex(struct io_conn *c)
{
c->fd.backend_info = c->duplex->fd.backend_info;
- num_next++;
+ update_pollevents(c);
return true;
}
static void backend_set_state(struct io_conn *conn, struct io_plan plan)
{
- struct pollfd *pfd = &pollfds[conn->fd.backend_info];
-
- if (pfd->events)
- num_waiting--;
-
- pfd->events = plan.pollflag;
- if (conn->duplex) {
- int mask = conn->duplex->plan.pollflag;
- /* You can't *both* read/write. */
- assert(!mask || pfd->events != mask);
- pfd->events |= mask;
- }
- if (pfd->events)
- num_waiting++;
-
- if (plan.state == IO_NEXT)
- num_next++;
- else if (plan.state == IO_FINISHED)
- num_finished++;
-
conn->plan = plan;
+ update_pollevents(conn);
}
void backend_wakeup(struct io_conn *conn)
{
- num_next++;
+ update_pollevents(conn);
}
static void accept_conn(struct io_listener *l)
#include <sys/wait.h>
#include <stdio.h>
-static struct io_plan start_ok(struct io_conn *conn, int *state)
-{
- ok1(*state == 0);
- (*state)++;
- return io_close(conn, NULL);
-}
-
static void finish_ok(struct io_conn *conn, int *state)
{
ok1(*state == 1);
(*state)++;
- io_break(state + 1, NULL, NULL);
+ io_break(state + 1, io_idle());
}
static void init_conn(int fd, int *state)
{
- if (!io_new_conn(fd, start_ok, finish_ok, state))
+ ok1(*state == 0);
+ (*state)++;
+ if (!io_new_conn(fd, io_close(NULL, NULL), finish_ok, state))
abort();
}
char buf[4];
};
-static struct io_plan start_ok(struct io_conn *conn, struct data *d)
-{
- ok1(d->state == 0);
- d->state++;
- return io_read(d->buf, sizeof(d->buf), io_close, d);
-}
-
static void finish_ok(struct io_conn *conn, struct data *d)
{
ok1(d->state == 1);
d->state++;
- io_break(d, NULL, NULL);
+ io_break(d, io_idle());
}
static void init_conn(int fd, struct data *d)
{
- if (!io_new_conn(fd, start_ok, finish_ok, d))
+ ok1(d->state == 0);
+ d->state++;
+
+ if (!io_new_conn(fd, io_read(d->buf, sizeof(d->buf), io_close, d),
+ finish_ok, d))
abort();
}
char buf[4];
};
-static struct io_plan start_ok(struct io_conn *conn, struct data *d)
-{
- ok1(d->state == 0);
- d->state++;
- d->bytes = sizeof(d->buf);
- return io_read_partial(d->buf, &d->bytes, io_close, d);
-}
-
static void finish_ok(struct io_conn *conn, struct data *d)
{
ok1(d->state == 1);
d->state++;
- io_break(d, NULL, NULL);
+ io_break(d, io_idle());
}
static void init_conn(int fd, struct data *d)
{
- if (!io_new_conn(fd, start_ok, finish_ok, d))
+ ok1(d->state == 0);
+ d->state++;
+ d->bytes = sizeof(d->buf);
+
+ if (!io_new_conn(fd, io_read_partial(d->buf, &d->bytes, io_close, d),
+ finish_ok, d))
abort();
}
char *buf;
};
-static struct io_plan start_ok(struct io_conn *conn, struct data *d)
-{
- ok1(d->state == 0);
- d->state++;
- return io_write_partial(d->buf, &d->bytes, io_close, d);
-}
-
static void finish_ok(struct io_conn *conn, struct data *d)
{
ok1(d->state == 1);
d->state++;
- io_break(d, NULL, NULL);
+ io_break(d, io_idle());
}
static void init_conn(int fd, struct data *d)
{
- if (!io_new_conn(fd, start_ok, finish_ok, d))
+ ok1(d->state == 0);
+ d->state++;
+ if (!io_new_conn(fd, io_write_partial(d->buf, &d->bytes, io_close, d),
+ finish_ok, d))
abort();
}
char *buf;
};
-static struct io_plan start_ok(struct io_conn *conn, struct data *d)
-{
- ok1(d->state == 0);
- d->state++;
- return io_write(d->buf, d->bytes, io_close, d);
-}
-
static void finish_ok(struct io_conn *conn, struct data *d)
{
ok1(d->state == 1);
d->state++;
- io_break(d, NULL, NULL);
+ io_break(d, io_idle());
}
static void init_conn(int fd, struct data *d)
{
- if (!io_new_conn(fd, start_ok, finish_ok, d))
+ ok1(d->state == 0);
+ d->state++;
+ if (!io_new_conn(fd, io_write(d->buf, d->bytes, io_close, d),
+ finish_ok, d))
abort();
}
char buf[4];
};
-static struct io_plan plan_read(struct io_conn *conn, struct data *d)
+static struct io_plan read_done(struct io_conn *conn, struct data *d)
{
ok1(d->state == 2 || d->state == 3);
d->state++;
- return io_read(d->buf, sizeof(d->buf), io_close, d);
+ return io_close(conn, NULL);
}
-static struct io_plan start_waker(struct io_conn *conn, struct data *d)
+static void finish_waker(struct io_conn *conn, struct data *d)
{
+ io_wake(idler, io_read(d->buf, sizeof(d->buf), read_done, d));
ok1(d->state == 1);
d->state++;
-
- io_wake(idler, plan_read, d);
- return io_close(conn, NULL);
}
-static void finish_waker(struct io_conn *conn, struct data *d)
+static void finish_idle(struct io_conn *conn, struct data *d)
{
- ok1(d->state == 2 || d->state == 3);
+ ok1(d->state == 3);
d->state++;
+ io_break(d, io_idle());
}
-static struct io_plan start_idle(struct io_conn *conn, struct data *d)
+static void init_conn(int fd, struct data *d)
{
- int fd;
+ int fd2;
ok1(d->state == 0);
d->state++;
- idler = conn;
+ idler = io_new_conn(fd, io_idle(), finish_idle, d);
- /* This will wake us up. */
- fd = open("/dev/null", O_RDONLY);
- ok1(fd >= 0);
- ok1(io_new_conn(fd, start_waker, finish_waker, d));
-
- return io_idle();
-}
-
-static void finish_idle(struct io_conn *conn, struct data *d)
-{
- ok1(d->state == 4);
- d->state++;
- io_break(d, NULL, NULL);
-}
-
-static void init_conn(int fd, struct data *d)
-{
- if (!io_new_conn(fd, start_idle, finish_idle, d))
- abort();
+ /* This will wake us up, as read will fail. */
+ fd2 = open("/dev/null", O_RDONLY);
+ ok1(fd2 >= 0);
+ ok1(io_new_conn(fd2, io_read(idler, 1, NULL, NULL), finish_waker, d));
}
static int make_listen_fd(const char *port, struct addrinfo **info)
int fd, status;
/* This is how many tests you plan to run */
- plan_tests(15);
+ plan_tests(14);
d->state = 0;
fd = make_listen_fd("65006", &addrinfo);
ok1(fd >= 0);
freeaddrinfo(addrinfo);
ok1(io_loop() == d);
- ok1(d->state == 5);
+ ok1(d->state == 4);
ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
free(d);
io_close_listener(l);
char buf[4];
};
-static struct io_plan plan_read(struct io_conn *conn, struct data *d)
+static struct io_plan read_done(struct io_conn *conn, struct data *d)
{
ok1(d->state == 1);
d->state++;
- return io_read(d->buf, sizeof(d->buf), io_close, d);
-}
-
-static struct io_plan start_break(struct io_conn *conn, struct data *d)
-{
- ok1(d->state == 0);
- d->state++;
- return io_break(d, plan_read, d);
+ return io_close(conn, NULL);
}
static void finish_ok(struct io_conn *conn, struct data *d)
static void init_conn(int fd, struct data *d)
{
- if (!io_new_conn(fd, start_break, finish_ok, d))
+ ok1(d->state == 0);
+ d->state++;
+
+ if (!io_new_conn(fd,
+ io_break(d,
+ io_read(d->buf, sizeof(d->buf), read_done, d)),
+ finish_ok, d))
abort();
}
char buf[32];
};
-static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf);
static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf);
-static struct io_plan plan_read(struct io_conn *conn, struct buffer *buf)
-{
- assert(conn == buf->reader);
-
- return io_read(&buf->buf, sizeof(buf->buf), poke_writer, buf);
-}
-
-static struct io_plan plan_write(struct io_conn *conn, struct buffer *buf)
-{
- assert(conn == buf->writer);
-
- return io_write(&buf->buf, sizeof(buf->buf), poke_reader, buf);
-}
-
static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf)
{
assert(conn == buf->reader);
return io_close(conn, NULL);
/* You write. */
- io_wake(buf->writer, plan_write, buf);
+ io_wake(buf->writer,
+ io_write(&buf->buf, sizeof(buf->buf), poke_reader, buf));
/* I'll wait until you wake me. */
return io_idle();
{
assert(conn == buf->writer);
/* You read. */
- io_wake(buf->reader, plan_read, buf);
+ io_wake(buf->reader,
+ io_read(&buf->buf, sizeof(buf->buf), poke_writer, buf));
if (++buf->iters == NUM_ITERS)
return io_close(conn, NULL);
return io_idle();
}
-static struct io_plan reader(struct io_conn *conn, struct buffer *buf)
-{
- assert(conn == buf->reader);
-
- /* Wait for writer to tell us to read. */
- return io_idle();
-}
-
static struct buffer buf[NUM];
int main(void)
memset(buf[i].buf, i, sizeof(buf[i].buf));
sprintf(buf[i].buf, "%i-%i", i, i);
- buf[i].reader = io_new_conn(last_read, reader, NULL, &buf[i]);
+ /* Wait for writer to tell us to read. */
+ buf[i].reader = io_new_conn(last_read, io_idle(), NULL, &buf[i]);
if (!buf[i].reader)
break;
- buf[i].writer = io_new_conn(fds[1], plan_write, NULL, &buf[i]);
+ buf[i].writer = io_new_conn(fds[1],
+ io_write(&buf[i].buf,
+ sizeof(buf[i].buf),
+ poke_reader, &buf[i]),
+ NULL, &buf[i]);
if (!buf[i].writer)
break;
last_read = fds[0];
/* Last one completes the cirle. */
i = 0;
sprintf(buf[i].buf, "%i-%i", i, i);
- buf[i].reader = io_new_conn(last_read, reader, NULL, &buf[i]);
+ buf[i].reader = io_new_conn(last_read, io_idle(), NULL, NULL);
ok1(buf[i].reader);
- buf[i].writer = io_new_conn(last_write, plan_write, NULL, &buf[i]);
+ buf[i].writer = io_new_conn(last_write,
+ io_write(&buf[i].buf, sizeof(buf[i].buf),
+ poke_reader, &buf[i]),
+ NULL, NULL);
ok1(buf[i].writer);
/* They should eventually exit */
d->state++;
}
-static struct io_plan write_out(struct io_conn *conn, struct data *d)
+static struct io_plan write_done(struct io_conn *conn, struct data *d)
{
d->state++;
- return io_write(d->wbuf, sizeof(d->wbuf), io_close, d);
+ return io_close(conn, NULL);
}
-static struct io_plan start_ok(struct io_conn *conn, struct data *d)
+static void init_conn(int fd, struct data *d)
{
+ struct io_conn *conn;
+
ok1(d->state == 0);
d->state++;
io_close_listener(d->l);
memset(d->wbuf, 7, sizeof(d->wbuf));
- ok1(io_duplex(conn, write_out, finish_ok, d));
- return io_read(d->buf, sizeof(d->buf), io_close, d);
-}
-static void init_conn(int fd, struct data *d)
-{
- if (!io_new_conn(fd, start_ok, finish_ok, d))
- abort();
+ conn = io_new_conn(fd, io_read(d->buf, sizeof(d->buf), io_close, d),
+ finish_ok, d);
+ ok1(io_duplex(conn, io_write(d->wbuf, sizeof(d->wbuf), write_done, d),
+ finish_ok, d));
}
static int make_listen_fd(const char *port, struct addrinfo **info)
#include <stdio.h>
#include <signal.h>
-static struct io_plan start(struct io_conn *conn, void *unused)
-{
- return io_idle();
-}
-
int main(void)
{
int status;
int fds[2];
ok1(pipe(fds) == 0);
- io_new_conn(fds[0], start, NULL, NULL);
+ io_new_conn(fds[0], io_idle(), NULL, NULL);
io_loop();
exit(1);
}
return io_close(conn, d);
}
-static struct io_plan start_ok(struct io_conn *conn, struct data *d)
-{
- ok1(d->state == 0);
- d->state++;
- io_timeout(conn, time_from_usec(d->timeout_usec), timeout, d);
- return io_read(d->buf, sizeof(d->buf), no_timeout, d);
-}
-
static void finish_ok(struct io_conn *conn, struct data *d)
{
ok1(d->state == 2);
d->state++;
- io_break(d, NULL, NULL);
+ io_break(d, io_idle());
}
static void init_conn(int fd, struct data *d)
{
- if (!io_new_conn(fd, start_ok, finish_ok, d))
- abort();
+ struct io_conn *conn;
+
+ ok1(d->state == 0);
+ d->state++;
+
+ conn = io_new_conn(fd, io_read(d->buf, sizeof(d->buf), no_timeout, d),
+ finish_ok, d);
+ io_timeout(conn, time_from_usec(d->timeout_usec), timeout, d);
}
static int make_listen_fd(const char *port, struct addrinfo **info)