From: Rusty Russell Date: Sat, 22 Jan 2011 03:40:51 +0000 (+1030) Subject: lca2011: rewrite to a serious server X-Git-Url: http://git.ozlabs.org/?p=ccan-lca-2011.git;a=commitdiff_plain;h=5807ed8dff7520582ee88f16c98b3965389bf8ff lca2011: rewrite to a serious server This uses tevent and callback, and a simple state machine. --- diff --git a/ccan/oserver/_info b/ccan/oserver/_info index 6879f42..c5f5b4b 100644 --- a/ccan/oserver/_info +++ b/ccan/oserver/_info @@ -18,7 +18,7 @@ * * int main(int argc, char *argv[]) * { - * int fd, sockfd; + * struct tevent_context *ev = tevent_context_init(NULL); * unsigned int port = OSERVER_PORT; * * opt_register_noarg("--help|--usage|-h", opt_usage_and_exit, @@ -30,17 +30,11 @@ * if (argc != 1) * opt_log_stderr_exit("Unknown extra arguments"); * - * sockfd = oserver_setup(port); - * if (sockfd < 0) - * err(1, "Failed to set up server socket"); + * if (!oserver_setup(ev, port)) + * err(1, "Failed to set up server"); * - * fd = accept(sockfd, NULL, NULL); - * if (fd < 0) - * err(1, "Accepting connection on TCP socket"); - * - * if (!oserver_serve(fd)) - * err(1, "Serving client"); - * exit(0); + * while (tevent_loop_wait(ev) == 0); + * err(1, "Serving client"); * } */ int main(int argc, char *argv[]) @@ -56,6 +50,8 @@ int main(int argc, char *argv[]) printf("ccan/noerr\n"); printf("ccan/failtest\n"); printf("ccan/opt\n"); + printf("ccan/array_size\n"); + printf("ccan/tevent\n"); return 0; } diff --git a/ccan/oserver/oserver.c b/ccan/oserver/oserver.c index 20edfcb..4503aa4 100644 --- a/ccan/oserver/oserver.c +++ b/ccan/oserver/oserver.c @@ -1,6 +1,8 @@ #include #include #include +#include +#include #include #include #include @@ -13,58 +15,196 @@ #include #include #include +#include -bool oserver_serve(int fd) +enum state { + RECEIVING_USER_QUESTION, + SENDING_ANSWER, + FINISHED +}; + +static uint16_t state_flag_map[] = { + [RECEIVING_USER_QUESTION] = TEVENT_FD_READ, + [SENDING_ANSWER] = TEVENT_FD_WRITE, + [FINISHED] = 0 +}; + +struct client { + /* What are we doing today, brain? */ + enum state state; + /* Our event info, and the file descriptor. */ + struct tevent_fd *fde; + int fd; + /* The question we read from client. */ + char *question; + /* How many bytes of the reply we sent so far. */ + size_t bytes_sent; +}; + +/* 5 clients should be enough for anybody! */ +static struct client *clients[5]; +static int sfd; +static struct tevent_fd *sfde; + +static ssize_t write_string(int fd, const char *str) { - char buf[1024]; - unsigned int i; + return write(fd, str, strlen(str)); +} - for (i = 0; i < sizeof(buf)-1; i++) { - if (read(fd, &buf[i], 1) != 1) { - errno = EIO; - return false; - } - if (buf[i] == '\n' || buf[i] == '\r') - break; - buf[i] = toupper(buf[i]); +static ssize_t read_string(int fd, char **buf) +{ + ssize_t ret, len, maxlen; + + len = strlen(*buf); + maxlen = talloc_array_length(*buf); + + if (maxlen < len + 100) { + maxlen += 100; + *buf = talloc_realloc(NULL, *buf, char, maxlen); } - buf[i] = '\0'; - if (!write_all(fd, "Louder, like this: '", - strlen("Louder, like this: '")) - || !write_all(fd, buf, i) - || !write_all(fd, "'\r\n", strlen("'\r\n"))) + ret = read(fd, *buf + len, maxlen - len - 1); + if (ret >= 0) + (*buf)[len + ret] = '\0'; + return ret; +} + +static bool input_finished(const char *str) +{ + return strchr(str, '\n'); +} + +/* Update state, and set our READ/WRITE flags appropriately. */ +static void set_state(struct client *c, enum state state) +{ + c->state = state; + tevent_fd_set_flags(c->fde, state_flag_map[state]); +} + +/* Returns false on error, increments state on finishing string. */ +static bool send_string(struct client *c, const char *str) +{ + ssize_t len = write_string(c->fd, str + c->bytes_sent); + if (len < 0) return false; + c->bytes_sent += len; + if (c->bytes_sent == strlen(str)) { + c->bytes_sent = 0; + set_state(c, c->state+1); + } return true; } -int oserver_setup(unsigned short port) +static void service_client(struct tevent_context *ev, + struct tevent_fd *fde, uint16_t flags, void *_c) +{ + struct client *c = _c; + ssize_t len; + + switch (c->state) { + case RECEIVING_USER_QUESTION: + len = read_string(c->fd, &c->question); + if (len <= 0) + goto fail; + if (input_finished(c->question)) { + unsigned int i; + + for (i = 0; c->question[i]; i++) + c->question[i] = toupper(c->question[i]); + set_state(c, SENDING_ANSWER); + } + break; + case SENDING_ANSWER: + if (!send_string(c, c->question)) + goto fail; + break; + default: + goto fail; + } + + if (c->state != FINISHED) + return; + +fail: + talloc_free(c); +} + +static int cleanup_client(struct client *client) +{ + unsigned int i; + + for (i = 0; i < ARRAY_SIZE(clients); i++) { + if (clients[i] == client) { + clients[i] = NULL; + tevent_fd_set_flags(sfde, TEVENT_FD_READ); + return 0; + } + } + abort(); +} + +static void add_client(struct tevent_context *ev, + struct tevent_fd *fde, uint16_t flags, void *unused) +{ + struct client *client; + unsigned int i; + + client = talloc(sfde, struct client); + client->fd = accept(sfd, NULL, 0); + if (client->fd < 0) + err(1, "Accepting client connection"); + + client->state = RECEIVING_USER_QUESTION; + client->bytes_sent = 0; + client->question = talloc_strdup(client, ""); + client->fde = tevent_add_fd(ev, client, client->fd, + state_flag_map[client->state], + service_client, client); + tevent_fd_set_auto_close(client->fde); + + /* Find empty slot in array for this client. */ + for (i = 0; clients[i]; i++); + clients[i] = client; + talloc_set_destructor(client, cleanup_client); + + /* Full? Stop listening... */ + if (i == ARRAY_SIZE(clients)-1) + tevent_fd_set_flags(sfde, 0); +} + +void *oserver_setup(struct tevent_context *ev, unsigned short port) { - int sockfd; int one = 1; union { struct sockaddr addr; struct sockaddr_in in; } u; - sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); - if (sockfd < 0) - return -1; + sfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + if (sfd < 0) + return false; - if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) + if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) warn("Setting socket reuse"); u.in.sin_family = AF_INET; u.in.sin_port = htons(port); u.in.sin_addr.s_addr = INADDR_ANY; - if (bind(sockfd, &u.addr, sizeof(u.in)) == -1) { - close_noerr(sockfd); - return -1; + if (bind(sfd, &u.addr, sizeof(u.in)) == -1) { + close_noerr(sfd); + return NULL; } - if (listen(sockfd, 0) != 0) { - close_noerr(sockfd); - return -1; + if (listen(sfd, 0) != 0) { + close_noerr(sfd); + return NULL; } - return sockfd; + + sfde = tevent_add_fd(ev, ev, sfd, TEVENT_FD_READ, add_client, NULL); + tevent_fd_set_auto_close(sfde); + + /* Don't kill us if client dies. */ + signal(SIGPIPE, SIG_IGN); + + return sfde; } diff --git a/ccan/oserver/oserver.h b/ccan/oserver/oserver.h index f71f3e2..e1f3cc7 100644 --- a/ccan/oserver/oserver.h +++ b/ccan/oserver/oserver.h @@ -1,43 +1,29 @@ #ifndef CCAN_OSERVER_H #define CCAN_OSERVER_H #include +#include /** - * oserver_setup - get a listening filedescriptor for an oserver + * oserver_setup - set up an oserver + * @ev: tevent context to use. * @port: port to use (usually OSERVER_PORT) + * * Opens a socket and binds it to @port, then sets it up to listen - * for connections and returns it. + * for connections. talloc_free() the pointer returned to shut it down. * * Example: - * int serverfd; + * void *oserver; + * struct tevent_context *ev; * - * serverfd = oserver_setup(OSERVER_PORT); - * if (serverfd < 0) + * ev = tevent_context_init(NULL); + * oserver = oserver_setup(ev, OSERVER_PORT); + * if (!oserver) * err(1, "Failed to set up server"); - */ -int oserver_setup(unsigned short port); - -/** - * oserver_serve - serve an oserver client via a file descriptor - * @fd: the file descriptor (usually a connected socket) - * - * This returns false (with errno set) on failure. * - * Example: - * #include - * #include - * ... - * int clientfd; - * ... - * clientfd = accept(serverfd, NULL, NULL); - * if (clientfd < 0) - * err(1, "Accepting connection from client"); - * if (fork() == 0) - * exit(oserver_serve(clientfd) ? 0 : 1); - * else - * close(clientfd); + * while (tevent_loop_wait(ev) == 0); + * err(1, "Event loop failed"); */ -bool oserver_serve(int fd); +void *oserver_setup(struct tevent_context *ev, unsigned short port); #define OSERVER_PORT 2727 #endif /* CCAN_OSERVER_H */ diff --git a/ccan/oserver/test/run.c b/ccan/oserver/test/run.c index b8e9dc3..f99e40b 100644 --- a/ccan/oserver/test/run.c +++ b/ccan/oserver/test/run.c @@ -1,54 +1,130 @@ -#include #include #include #include -#include #include #include +#include #include -#include #include #include -#include -static void exit_test(void) +static void exit_quietly(struct tevent_context *ev, + struct tevent_fd *fde, uint16_t flags, void *unused) { - failtest_exit(exit_status()); + talloc_free(ev); + exit(0); +} + +static void run_server(int readyfd, int exitfd) +{ + struct tevent_context *ev = tevent_context_init(NULL); + + if (oserver_setup(ev, OSERVER_PORT) == NULL) + exit(1); + + /* Tell parent we are ready to go. */ + write(readyfd, "", 1); + + tevent_add_fd(ev, ev, exitfd, TEVENT_FD_READ, exit_quietly, NULL); + while (tevent_loop_wait(ev) == 0); +} + +static bool write_sall(int fd, const char *str) +{ + while (str[0]) { + ssize_t len = write(fd, str, strlen(str)); + if (len < 0) + return false; + str += len; + } + return true; +} + +static bool input_is(int fd, const char *str) +{ + while (str[0]) { + char buffer[1000]; + ssize_t len = read(fd, buffer, strlen(str)); + if (len < 0) + return false; + if (strncmp(str, buffer, len) != 0) + return false; + str += len; + } + return true; +} + +static bool no_input(int fd) +{ + fd_set set; + struct timeval t = { 0, 0 }; + + FD_ZERO(&set); + FD_SET(fd, &set); + return (select(fd+1, &set, NULL, NULL, &t) == 0); } int main(int argc, char *argv[]) { - int fd; - char buf[200]; - const char *input; + int readyfd[2], exitfd[2]; + union { + struct sockaddr addr; + struct sockaddr_in in; + } u; + int sfd1, sfd2; + char c; /* This is how many tests you plan to run */ - plan_tests(3 * 6); - failtest_init(argc, argv); - tap_fail_callback = exit_test; - - foreach_ptr(input, - "This is a test\n", - "This is a test\r", - "This is a test\r\n", - "This is a test\nWith extra", - "This is a test\rWith extra", - "This is a test\r\nWith extra") { - fd = open("run-fd", O_RDWR|O_CREAT|O_TRUNC, 0600); - - write(fd, input, strlen(input)); - lseek(fd, 0, SEEK_SET); - - ok1(oserver_serve(fd)); - - lseek(fd, 0, SEEK_SET); - buf[read(fd, buf, sizeof(buf)-1)] = '\0'; - - ok1(strncmp(buf, input, strlen("This is a test")) == 0); - ok1(streq(buf + strlen("This is a test") + 1, - "Louder, like this: 'THIS IS A TEST'\r\n")); + plan_tests(13); + + pipe(readyfd); + pipe(exitfd); + if (fork() == 0) { + close(exitfd[1]); + close(readyfd[0]); + run_server(readyfd[1], exitfd[0]); + err(1, "Event loop failed"); } + close(exitfd[0]); + close(readyfd[1]); + + sfd1 = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + sfd2 = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + + u.in.sin_family = AF_INET; + u.in.sin_port = htons(OSERVER_PORT); + u.in.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + + /* Wait until child is ready... */ + if (read(readyfd[0], &c, 1) != 1) + errx(1, "Child problems"); + + /* Go! */ + ok1(connect(sfd1, &u.addr, sizeof(u.in)) == 0); + ok1(connect(sfd2, &u.addr, sizeof(u.in)) == 0); + + ok1(write_sall(sfd1, "question")); + ok1(write_sall(sfd2, "question")); + /* It shouldn't say anything until we've finished! */ + ok1(no_input(sfd1)); + ok1(no_input(sfd2)); + + ok1(write_sall(sfd1, " 1\n")); + ok1(write_sall(sfd2, " 2\n")); + + ok1(input_is(sfd1, "QUESTION 1\n")); + ok1(input_is(sfd2, "QUESTION 2\n")); + + /* Sockets should be dead now. */ + ok1(read(sfd1, &c, 1) == 0); + ok1(read(sfd2, &c, 1) == 0); + + /* Shut down server. */ + write(exitfd[1], "", 1); + + /* This will close once it's shut down, and return. */ + ok1(read(readyfd[0], &c, 1) == 0); /* This exits depending on whether all tests passed */ - failtest_exit(exit_status()); + return exit_status(); }