* #include <signal.h>
* #include <sys/types.h>
* #include <sys/wait.h>
+ * #include <string.h>
*
* struct buffer {
- * size_t max, off, rlen;
- * char *buf;
+ * bool finished;
+ * size_t start, end, rlen, wlen;
+ * char buf[4096];
* };
*
- * struct stdin_buffer {
- * struct io_conn *reader, *writer;
- * size_t len;
- * char inbuf[4096];
- * };
- *
- * // This reads from stdin.
- * static struct io_plan wake_writer(struct io_conn *, struct stdin_buffer *);
- * // This writes the stdin buffer to the child.
- * static struct io_plan wake_reader(struct io_conn *, struct stdin_buffer *);
- *
- * static struct io_plan wake_writer(struct io_conn *c, struct stdin_buffer *b)
+ * static void finish(struct io_conn *c, struct buffer *b)
* {
- * assert(c == b->reader);
- * io_wake(b->writer, io_write(b->inbuf, b->len, wake_reader, b));
- * return io_idle();
+ * // Mark us finished.
+ * b->finished = true;
+ * // Wake writer just in case it's asleep.
+ * io_wake(b);
* }
*
- * static void reader_exit(struct io_conn *c, struct stdin_buffer *b)
+ * static struct io_plan read_in(struct io_conn *c, struct buffer *b)
* {
- * assert(c == b->reader);
- * io_wake(b->writer, io_close());
- * b->reader = NULL;
- * }
+ * // Add what we just read.
+ * b->end += b->rlen;
+ * assert(b->end <= sizeof(b->buf));
*
- * static struct io_plan wake_reader(struct io_conn *c, struct stdin_buffer *b)
- * {
- * assert(c == b->writer);
- * if (!b->reader)
- * return io_close();
- * b->len = sizeof(b->inbuf);
- * io_wake(b->reader, io_read_partial(b->inbuf, &b->len, wake_writer, b));
- * return io_idle();
- * }
+ * // If we just read something, wake writer.
+ * if (b->rlen != 0)
+ * io_wake(b);
*
- * static void fail_child_write(struct io_conn *conn, struct stdin_buffer *b)
- * {
- * if (b->reader)
- * err(1, "Failed writing to child.");
+ * // If buffer is empty, return to start.
+ * if (b->start == b->end)
+ * b->start = b->end = 0;
+ *
+ * // Read in some of the rest.
+ * b->rlen = sizeof(b->buf) - b->end;
+ *
+ * // No room? Wait for writer
+ * if (b->rlen == 0)
+ * return io_wait(b, read_in, b);
+ *
+ * return io_read_partial(b->buf + b->end, &b->rlen, read_in, b);
* }
*
- * // This reads from the child and saves it into buffer.
- * static struct io_plan read_from_child(struct io_conn *conn,
- * struct buffer *b)
+ * static struct io_plan write_out(struct io_conn *c, struct buffer *b)
* {
- * b->off += b->rlen;
- *
- * if (b->off == b->max)
- * b->buf = realloc(b->buf, b->max *= 2);
+ * // Remove what we just wrote.
+ * b->start += b->wlen;
+ * assert(b->start <= sizeof(b->buf));
+ *
+ * // If we wrote somthing, wake writer.
+ * if (b->wlen != 0)
+ * io_wake(b);
+ *
+ * b->wlen = b->end - b->start;
+ * // Nothing to write? Wait for reader.
+ * if (b->wlen == 0) {
+ * if (b->finished)
+ * return io_close();
+ * return io_wait(b, write_out, b);
+ * }
*
- * b->rlen = b->max - b->off;
- * return io_read_partial(b->buf + b->off, &b->rlen, read_from_child, b);
+ * return io_write_partial(b->buf + b->start, &b->wlen, write_out, b);
* }
*
* // Feed a program our stdin, gather its stdout, print that at end.
* int main(int argc, char *argv[])
* {
* int tochild[2], fromchild[2];
- * struct buffer out;
- * struct stdin_buffer sbuf;
+ * struct buffer to, from;
* int status;
- * size_t off;
- * ssize_t ret;
- * struct io_conn *from_child;
+ * struct io_conn *reader;
*
* if (argc == 1)
* errx(1, "Usage: runner <cmdline>...");
* close(fromchild[1]);
* signal(SIGPIPE, SIG_IGN);
*
- * sbuf.len = sizeof(sbuf.inbuf);
- * sbuf.reader = io_new_conn(STDIN_FILENO,
- * io_read_partial(sbuf.inbuf, &sbuf.len,
- * wake_writer, &sbuf));
- * sbuf.writer = io_new_conn(tochild[1], io_idle());
- *
- * out.max = 128;
- * out.off = 0;
- * out.rlen = 128;
- * out.buf = malloc(out.max);
- * from_child = io_new_conn(fromchild[0],
- * io_read_partial(out.buf, &out.rlen,
- * read_from_child, &out));
- * if (!sbuf.reader || !sbuf.writer || !from_child)
- * err(1, "Allocating connections");
- *
- * io_set_finish(sbuf.reader, reader_exit, &sbuf);
- * io_set_finish(sbuf.writer, fail_child_write, &sbuf);
+ * // Read from stdin, write to child.
+ * memset(&to, 0, sizeof(to));
+ * reader = io_new_conn(STDIN_FILENO, read_in(NULL, &to));
+ * io_set_finish(reader, finish, &to);
+ * io_new_conn(tochild[1], write_out(NULL, &to));
+ *
+ * // Read from child, write to stdout.
+ * reader = io_new_conn(fromchild[0], read_in(NULL, &from));
+ * io_set_finish(reader, finish, &from);
+ * io_new_conn(STDOUT_FILENO, write_out(NULL, &from));
*
* io_loop();
* wait(&status);
*
- * for (off = 0; off < out.off; off += ret) {
- * ret = write(STDOUT_FILENO, out.buf+off, out.off-off);
- * if (ret < 0)
- * err(1, "Writing stdout");
- * }
- * free(out.buf);
- *
* return WIFEXITED(status) ? WEXITSTATUS(status) : 2;
* }
*
bool add_duplex(struct io_conn *c);
void del_listener(struct io_listener *l);
void backend_plan_changed(struct io_conn *conn);
+void backend_wait_changed(const void *wait);
void backend_add_timeout(struct io_conn *conn, struct timespec ts);
void backend_del_timeout(struct io_conn *conn);
void backend_del_conn(struct io_conn *conn);
struct io_conn *current;
/* User-defined function to select which connection(s) to debug. */
bool (*io_debug_conn)(struct io_conn *conn);
-/* Set when we wake up an connection we are debugging. */
-bool io_debug_wakeup;
struct io_plan io_debug(struct io_plan plan)
{
return plan;
}
- if (!current || !doing_debug_on(current)) {
- if (!io_debug_wakeup)
- return plan;
- }
+ if (!current || !doing_debug_on(current))
+ return plan;
- io_debug_wakeup = false;
current->plan = plan;
backend_plan_changed(current);
/* Return a do-nothing plan, so backend_plan_changed in
* io_ready doesn't do anything (it's already been called). */
- return io_idle_();
+ return io_wait_(NULL, (void *)1, NULL);
}
int io_debug_io(int ret)
return 2;
}
-static void debug_io_wake(struct io_conn *conn)
-{
- /* We want linear if we wake a debugged connection, too. */
- if (io_debug_conn && io_debug_conn(conn))
- io_debug_wakeup = true;
-}
-
/* Counterpart to io_plan_no_debug(), called in macros in io.h */
static void io_plan_debug_again(void)
{
io_plan_nodebug = false;
}
#else
-static void debug_io_wake(struct io_conn *conn)
-{
-}
static void io_plan_debug_again(void)
{
}
return plan;
}
-struct io_plan io_idle_(void)
+struct io_plan io_wait_(const void *wait,
+ struct io_plan (*cb)(struct io_conn *, void*),
+ void *arg)
{
struct io_plan plan;
+ assert(cb);
plan.pollflag = 0;
plan.io = NULL;
- /* Never called (overridden by io_wake), but NULL means closing */
- plan.next = (void *)io_idle_;
+ plan.next = cb;
+ plan.next_arg = arg;
- return plan;
-}
+ plan.u1.const_vp = wait;
-bool io_is_idle(const struct io_conn *conn)
-{
- return conn->plan.io == NULL;
+ return plan;
}
-void io_wake_(struct io_conn *conn, struct io_plan plan)
-
+void io_wake(const void *wait)
{
- io_plan_debug_again();
-
- /* It might be closing, but we haven't called its finish() yet. */
- if (!conn->plan.next)
- return;
- /* It was idle, right? */
- assert(!conn->plan.io);
- conn->plan = plan;
- backend_plan_changed(conn);
-
- debug_io_wake(conn);
+ backend_wait_changed(wait);
}
void io_ready(struct io_conn *conn)
struct io_plan io_always_(struct io_plan (*cb)(struct io_conn *, void *),
void *arg);
+
/**
* io_connect - plan to connect to a listening socket.
* @fd: file descriptor.
void *arg);
/**
- * io_idle - plan to do nothing.
+ * io_wait - plan to wait for something.
+ * @wait: the address to wait on.
+ * @cb: function to call after waiting.
+ * @arg: @cb argument
*
- * This indicates the connection is idle: io_wake() will be called later do
- * give the connection a new plan.
+ * This indicates the connection is idle: io_wake() will be called later to
+ * restart the connection.
*
* Example:
* struct io_conn *sleeper;
- * sleeper = io_new_conn(open("/dev/null", O_RDONLY), io_idle());
+ * unsigned int counter = 0;
+ * sleeper = io_new_conn(open("/dev/null", O_RDONLY),
+ * io_wait(&counter, io_close_cb, NULL));
* if (!sleeper)
* exit(1);
*/
-#define io_idle() io_debug(io_idle_())
-struct io_plan io_idle_(void);
+#define io_wait(wait, cb, arg) \
+ io_debug(io_wait_(wait, \
+ typesafe_cb_preargs(struct io_plan, void *, \
+ (cb), (arg), \
+ struct io_conn *), \
+ (arg)))
+
+struct io_plan io_wait_(const void *wait,
+ struct io_plan (*cb)(struct io_conn *, void *),
+ void *arg);
/**
* io_timeout - set timeout function if the callback doesn't complete.
struct io_conn *io_duplex_(struct io_conn *conn, struct io_plan plan);
/**
- * io_wake - wake up an idle connection.
- * @conn: an idle connection.
- * @plan: the next I/O plan for @conn.
- *
- * This makes @conn ready to do I/O the next time around the io_loop().
+ * io_wake - wake up any connections waiting on @wait
+ * @wait: the address to trigger.
*
* Example:
- * struct io_conn *sleeper;
- * sleeper = io_new_conn(open("/dev/null", O_RDONLY), io_idle());
- *
- * io_wake(sleeper, io_write("junk", 4, io_close_cb, NULL));
- */
-#define io_wake(conn, plan) (io_plan_no_debug(), io_wake_((conn), (plan)))
-void io_wake_(struct io_conn *conn, struct io_plan plan);
-
-/**
- * io_is_idle - is a connection idle?
- *
- * This can be useful for complex protocols, eg. where you want a connection
- * to send something, so you queue it and wake it if it's idle.
+ * unsigned int wait;
*
- * Example:
- * struct io_conn *sleeper;
- * sleeper = io_new_conn(open("/dev/null", O_RDONLY), io_idle());
+ * io_new_conn(open("/dev/null", O_RDONLY),
+ * io_wait(&wait, io_close_cb, NULL));
*
- * assert(io_is_idle(sleeper));
- * io_wake(sleeper, io_write("junk", 4, io_close_cb, NULL));
+ * io_wake(&wait);
*/
-bool io_is_idle(const struct io_conn *conn);
+void io_wake(const void *wait);
/**
* io_break - return from io_loop()
some_always = true;
}
+void backend_wait_changed(const void *wait)
+{
+ unsigned int i;
+
+ for (i = 0; i < num_fds; i++) {
+ struct io_conn *c, *duplex;
+
+ /* Ignore listeners */
+ if (fds[i]->listener)
+ continue;
+ c = (void *)fds[i];
+ for (duplex = c->duplex; c; c = duplex, duplex = NULL) {
+ /* Ignore closing. */
+ if (!c->plan.next)
+ continue;
+ /* Not idle? */
+ if (c->plan.io)
+ continue;
+ /* Waiting on something else? */
+ if (c->plan.u1.const_vp != wait)
+ continue;
+ /* Make it do the next thing. */
+ c->plan = io_always_(c->plan.next, c->plan.next_arg);
+ backend_plan_changed(c);
+ }
+ }
+}
+
bool add_conn(struct io_conn *c)
{
if (!add_fd(&c->fd, c->plan.pollflag & (POLLIN|POLLOUT)))
static void finish_waker(struct io_conn *conn, struct data *d)
{
- ok1(io_is_idle(idler));
- io_wake(idler, io_read(d->buf, sizeof(d->buf), read_done, d));
+ io_wake(d);
ok1(d->state == 1);
d->state++;
}
abort();
}
+static struct io_plan read_buf(struct io_conn *conn, struct data *d)
+{
+ return io_read(d->buf, sizeof(d->buf), read_done, d);
+}
+
static void init_conn(int fd, struct data *d)
{
int fd2;
ok1(d->state == 0);
d->state++;
- idler = io_new_conn(fd, io_idle());
+ idler = io_new_conn(fd, io_wait(d, read_buf, d));
io_set_finish(idler, finish_idle, d);
/* This will wake us up, as read will fail. */
int fd, status;
/* This is how many tests you plan to run */
- plan_tests(14);
+ plan_tests(13);
d->state = 0;
fd = make_listen_fd(PORT, &addrinfo);
ok1(fd >= 0);
return io_read(buf, 16, io_close_cb, NULL);
}
+static struct io_plan never(struct io_conn *conn, void *unused)
+{
+ abort();
+}
+
int main(void)
{
int fds[2];
/* Write then close. */
io_new_conn(fds[1], io_write("hello there world", 16,
io_close_cb, NULL));
- conn = io_new_conn(fds[0], io_idle());
+ conn = io_new_conn(fds[0], io_wait(buf, never, NULL));
/* To avoid assert(num_waiting) */
ok1(pipe(fds2) == 0);
static struct io_plan wake_it(struct io_conn *conn, struct io_conn *reader)
{
- io_wake(reader, io_read(inbuf, 8, io_close_cb, NULL));
+ io_wake(inbuf);
return io_close();
}
+static struct io_plan read_buf(struct io_conn *conn, void *unused)
+{
+ return io_read(inbuf, 8, io_close_cb, NULL);
+}
+
int main(void)
{
int fds[2];
plan_tests(3);
ok1(pipe(fds) == 0);
- conn = io_new_conn(fds[0], io_idle());
+ conn = io_new_conn(fds[0], io_wait(inbuf, read_buf, NULL));
io_new_conn(fds[1], io_write("EASYTEST", 8, wake_it, conn));
ok1(io_loop() == NULL);
};
static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf);
+static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf);
+
+static struct io_plan read_buf(struct io_conn *conn, struct buffer *buf)
+{
+ return io_read(&buf->buf, sizeof(buf->buf), poke_writer, buf);
+}
static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf)
{
return io_close();
/* You write. */
- io_wake(buf->writer,
- io_write(&buf->buf, sizeof(buf->buf), poke_reader, buf));
+ io_wake(&buf->writer);
/* I'll wait until you wake me. */
- return io_idle();
+ return io_wait(&buf->reader, read_buf, buf);
+}
+
+static struct io_plan write_buf(struct io_conn *conn, struct buffer *buf)
+{
+ return io_write(&buf->buf, sizeof(buf->buf), poke_reader, buf);
}
static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf)
{
assert(conn == buf->writer);
/* You read. */
- io_wake(buf->reader,
- io_read(&buf->buf, sizeof(buf->buf), poke_writer, buf));
+ io_wake(&buf->reader);
if (++buf->iters == NUM_ITERS)
return io_close();
/* I'll wait until you tell me to write. */
- return io_idle();
+ return io_wait(&buf->writer, write_buf, buf);
}
static struct buffer buf[NUM];
sprintf(buf[i].buf, "%i-%i", i, i);
/* Wait for writer to tell us to read. */
- buf[i].reader = io_new_conn(last_read, io_idle());
+ buf[i].reader = io_new_conn(last_read,
+ io_wait(&buf[i].reader, read_buf,
+ &buf[i]));
if (!buf[i].reader)
break;
- buf[i].writer = io_new_conn(fds[1],
- io_write(&buf[i].buf,
- sizeof(buf[i].buf),
- poke_reader, &buf[i]));
+ buf[i].writer = io_new_conn(fds[1], write_buf(NULL, &buf[i]));
if (!buf[i].writer)
break;
last_read = fds[0];
/* Last one completes the cirle. */
i = 0;
sprintf(buf[i].buf, "%i-%i", i, i);
- buf[i].reader = io_new_conn(last_read, io_idle());
+ buf[i].reader = io_new_conn(last_read,
+ io_wait(&buf[i].reader, read_buf, &buf[i]));
ok1(buf[i].reader);
- buf[i].writer = io_new_conn(last_write,
- io_write(&buf[i].buf, sizeof(buf[i].buf),
- poke_reader, &buf[i]));
+ buf[i].writer = io_new_conn(last_write, write_buf(NULL, &buf[i]));
ok1(buf[i].writer);
/* They should eventually exit */
int fds[2];
ok1(pipe(fds) == 0);
- io_new_conn(fds[0], io_idle());
+ io_new_conn(fds[0], io_wait(&status, io_close_cb, NULL));
io_loop();
exit(1);
}
static struct io_plan write_done(struct io_conn *conn, struct data *d)
{
d->state++;
- return io_idle();
+ return io_wait(d, io_close_cb, NULL);
}
static struct io_plan read_done(struct io_conn *conn, struct data *d)