]> git.ozlabs.org Git - ccan-lca-2011.git/commitdiff
lca2011: rewrite to a serious server
authorRusty Russell <rusty@rustcorp.com.au>
Sat, 22 Jan 2011 03:40:51 +0000 (14:10 +1030)
committerRusty Russell <rusty@rustcorp.com.au>
Sat, 22 Jan 2011 03:40:51 +0000 (14:10 +1030)
This uses tevent and callback, and a simple state machine.

ccan/oserver/_info
ccan/oserver/oserver.c
ccan/oserver/oserver.h
ccan/oserver/test/run.c

index 6879f42903aa2bcab42523c330ccb3a1ae9bae03..c5f5b4be48225f8482ec325bc7fc16032a3cf432 100644 (file)
@@ -18,7 +18,7 @@
  *
  *     int main(int argc, char *argv[])
  *     {
- *             int fd, sockfd;
+ *             struct tevent_context *ev = tevent_context_init(NULL);
  *             unsigned int port = OSERVER_PORT;
  *
  *             opt_register_noarg("--help|--usage|-h", opt_usage_and_exit,
  *             if (argc != 1)
  *                     opt_log_stderr_exit("Unknown extra arguments");
  *
- *             sockfd = oserver_setup(port);
- *             if (sockfd < 0)
- *                     err(1, "Failed to set up server socket");
+ *             if (!oserver_setup(ev, port))
+ *                     err(1, "Failed to set up server");
  *
- *             fd = accept(sockfd, NULL, NULL);
- *             if (fd < 0)
- *                     err(1, "Accepting connection on TCP socket");
- *
- *             if (!oserver_serve(fd))
- *                     err(1, "Serving client");
- *             exit(0);
+ *             while (tevent_loop_wait(ev) == 0);
+ *             err(1, "Serving client");
  *     }
  */
 int main(int argc, char *argv[])
@@ -56,6 +50,8 @@ int main(int argc, char *argv[])
                printf("ccan/noerr\n");
                printf("ccan/failtest\n");
                printf("ccan/opt\n");
+               printf("ccan/array_size\n");
+               printf("ccan/tevent\n");
                return 0;
        }
 
index 20edfcbc19a79c846d3e2f31d834eebccd64fced..4503aa49610f1a13d55649d37dbfa89c0d9f24ac 100644 (file)
@@ -1,6 +1,8 @@
 #include <ccan/oserver/oserver.h>
 #include <ccan/read_write_all/read_write_all.h>
 #include <ccan/opt/opt.h>
+#include <ccan/tevent/tevent.h>
+#include <ccan/array_size/array_size.h>
 #include <ccan/noerr/noerr.h>
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <stdlib.h>
 #include <string.h>
 #include <errno.h>
+#include <signal.h>
 
-bool oserver_serve(int fd)
+enum state {
+       RECEIVING_USER_QUESTION,
+       SENDING_ANSWER,
+       FINISHED
+};
+
+static uint16_t state_flag_map[] = {
+       [RECEIVING_USER_QUESTION]       = TEVENT_FD_READ,
+       [SENDING_ANSWER]                = TEVENT_FD_WRITE,
+       [FINISHED]                      = 0
+};
+
+struct client {
+       /* What are we doing today, brain? */
+       enum state state;
+       /* Our event info, and the file descriptor. */
+       struct tevent_fd *fde;
+       int fd;
+       /* The question we read from client. */
+       char *question;
+       /* How many bytes of the reply we sent so far. */
+       size_t bytes_sent;
+};
+
+/* 5 clients should be enough for anybody! */
+static struct client *clients[5];
+static int sfd;
+static struct tevent_fd *sfde;
+
+static ssize_t write_string(int fd, const char *str)
 {
-       char buf[1024];
-       unsigned int i;
+       return write(fd, str, strlen(str));
+}
 
-       for (i = 0; i < sizeof(buf)-1; i++) {
-               if (read(fd, &buf[i], 1) != 1) {
-                       errno = EIO;
-                       return false;
-               }
-               if (buf[i] == '\n' || buf[i] == '\r')
-                       break;
-               buf[i] = toupper(buf[i]);
+static ssize_t read_string(int fd, char **buf)
+{
+       ssize_t ret, len, maxlen;
+
+       len = strlen(*buf);
+       maxlen = talloc_array_length(*buf);
+
+       if (maxlen < len + 100) {
+               maxlen += 100;
+               *buf = talloc_realloc(NULL, *buf, char, maxlen);
        }
-       buf[i] = '\0';
 
-       if (!write_all(fd, "Louder, like this: '",
-                      strlen("Louder, like this: '"))
-           || !write_all(fd, buf, i)
-           || !write_all(fd, "'\r\n", strlen("'\r\n")))
+       ret = read(fd, *buf + len, maxlen - len - 1);
+       if (ret >= 0)
+               (*buf)[len + ret] = '\0';
+       return ret;
+}
+
+static bool input_finished(const char *str)
+{
+       return strchr(str, '\n');
+}
+
+/* Update state, and set our READ/WRITE flags appropriately. */
+static void set_state(struct client *c, enum state state)
+{
+       c->state = state;
+       tevent_fd_set_flags(c->fde, state_flag_map[state]);
+}
+
+/* Returns false on error, increments state on finishing string. */
+static bool send_string(struct client *c, const char *str)
+{
+       ssize_t len = write_string(c->fd, str + c->bytes_sent);
+       if (len < 0)
                return false;
+       c->bytes_sent += len;
+       if (c->bytes_sent == strlen(str)) {
+               c->bytes_sent = 0;
+               set_state(c, c->state+1);
+       }
        return true;
 }
 
-int oserver_setup(unsigned short port)
+static void service_client(struct tevent_context *ev,
+                          struct tevent_fd *fde, uint16_t flags, void *_c)
+{
+       struct client *c = _c;
+       ssize_t len;
+
+       switch (c->state) {
+       case RECEIVING_USER_QUESTION:
+               len = read_string(c->fd, &c->question);
+               if (len <= 0)
+                       goto fail;
+               if (input_finished(c->question)) {
+                       unsigned int i;
+
+                       for (i = 0; c->question[i]; i++)
+                               c->question[i] = toupper(c->question[i]);
+                       set_state(c, SENDING_ANSWER);
+               }
+               break;
+       case SENDING_ANSWER:
+               if (!send_string(c, c->question))
+                       goto fail;
+               break;
+       default:
+               goto fail;
+       }
+
+       if (c->state != FINISHED)
+               return;
+
+fail:
+       talloc_free(c);
+}
+
+static int cleanup_client(struct client *client)
+{
+       unsigned int i;
+
+       for (i = 0; i < ARRAY_SIZE(clients); i++) {
+               if (clients[i] == client) {
+                       clients[i] = NULL;
+                       tevent_fd_set_flags(sfde, TEVENT_FD_READ);
+                       return 0;
+               }
+       }
+       abort();
+}
+
+static void add_client(struct tevent_context *ev,
+                      struct tevent_fd *fde, uint16_t flags, void *unused)
+{
+       struct client *client;
+       unsigned int i;
+
+       client = talloc(sfde, struct client);
+       client->fd = accept(sfd, NULL, 0);
+       if (client->fd < 0)
+               err(1, "Accepting client connection");
+
+       client->state = RECEIVING_USER_QUESTION;
+       client->bytes_sent = 0;
+       client->question = talloc_strdup(client, "");
+       client->fde = tevent_add_fd(ev, client, client->fd,
+                                   state_flag_map[client->state],
+                                   service_client, client);
+       tevent_fd_set_auto_close(client->fde);
+
+       /* Find empty slot in array for this client. */
+       for (i = 0; clients[i]; i++);
+       clients[i] = client;
+       talloc_set_destructor(client, cleanup_client);
+
+       /* Full?  Stop listening... */
+       if (i == ARRAY_SIZE(clients)-1)
+               tevent_fd_set_flags(sfde, 0);
+}
+
+void *oserver_setup(struct tevent_context *ev, unsigned short port)
 {
-       int sockfd;
        int one = 1;
        union {
                struct sockaddr addr;
                struct sockaddr_in in;
        } u;
 
-       sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
-       if (sockfd < 0)
-               return -1;
+       sfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+       if (sfd < 0)
+               return false;
 
-       if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)))
+       if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)))
                warn("Setting socket reuse");
 
        u.in.sin_family = AF_INET;
        u.in.sin_port = htons(port);
        u.in.sin_addr.s_addr = INADDR_ANY;
-       if (bind(sockfd, &u.addr, sizeof(u.in)) == -1) {
-               close_noerr(sockfd);
-               return -1;
+       if (bind(sfd, &u.addr, sizeof(u.in)) == -1) {
+               close_noerr(sfd);
+               return NULL;
        }
 
-       if (listen(sockfd, 0) != 0) {
-               close_noerr(sockfd);
-               return -1;
+       if (listen(sfd, 0) != 0) {
+               close_noerr(sfd);
+               return NULL;
        }
-       return sockfd;
+
+       sfde = tevent_add_fd(ev, ev, sfd, TEVENT_FD_READ, add_client, NULL);
+       tevent_fd_set_auto_close(sfde);
+
+       /* Don't kill us if client dies. */
+       signal(SIGPIPE, SIG_IGN);
+
+       return sfde;
 }
index f71f3e2881182b7d4d9848cc8ca2f0d7a05dbd19..e1f3cc70869ae89df3f2dfd4689dd9e3840ad80c 100644 (file)
@@ -1,43 +1,29 @@
 #ifndef CCAN_OSERVER_H
 #define CCAN_OSERVER_H
 #include <stdbool.h>
+#include <ccan/tevent/tevent.h>
 
 /**
- * oserver_setup - get a listening filedescriptor for an oserver
+ * oserver_setup - set up an oserver
+ * @ev: tevent context to use.
  * @port: port to use (usually OSERVER_PORT)
+ *
  * Opens a socket and binds it to @port, then sets it up to listen
- * for connections and returns it.
+ * for connections.  talloc_free() the pointer returned to shut it down.
  *
  * Example:
- *     int serverfd;
+ *     void *oserver;
+ *     struct tevent_context *ev;
  *
- *     serverfd = oserver_setup(OSERVER_PORT);
- *     if (serverfd < 0)
+ *     ev = tevent_context_init(NULL);
+ *     oserver = oserver_setup(ev, OSERVER_PORT);
+ *     if (!oserver)
  *             err(1, "Failed to set up server");
- */
-int oserver_setup(unsigned short port);
-
-/**
- * oserver_serve - serve an oserver client via a file descriptor
- * @fd: the file descriptor (usually a connected socket)
- *
- * This returns false (with errno set) on failure.
  *
- * Example:
- *      #include <sys/types.h>
- *      #include <sys/socket.h>
- *     ...
- *             int clientfd;
- *     ...
- *             clientfd = accept(serverfd, NULL, NULL);
- *             if (clientfd < 0)
- *                     err(1, "Accepting connection from client");
- *             if (fork() == 0)
- *                     exit(oserver_serve(clientfd) ? 0 : 1);
- *             else
- *                     close(clientfd);
+ *     while (tevent_loop_wait(ev) == 0);
+ *     err(1, "Event loop failed");
  */
-bool oserver_serve(int fd);
+void *oserver_setup(struct tevent_context *ev, unsigned short port);
 
 #define OSERVER_PORT 2727
 #endif /* CCAN_OSERVER_H */
index b8e9dc3042c67312aade84acc9a4a95abaabf1a3..f99e40bde103e57d511fd4830d916b24115e548a 100644 (file)
-#include <ccan/failtest/failtest_override.h>
 #include <ccan/oserver/oserver.c>
 #include <ccan/oserver/oserver.h>
 #include <ccan/str/str.h>
-#include <ccan/foreach/foreach.h>
 #include <ccan/tap/tap.h>
 #include <sys/types.h>
+#include <sys/select.h>
 #include <sys/stat.h>
-#include <ccan/failtest/failtest.h>
 #include <fcntl.h>
 #include <string.h>
-#include <ccan/failtest/failtest_undo.h>
 
-static void exit_test(void)
+static void exit_quietly(struct tevent_context *ev,
+                        struct tevent_fd *fde, uint16_t flags, void *unused)
 {
-       failtest_exit(exit_status());
+       talloc_free(ev);
+       exit(0);
+}
+
+static void run_server(int readyfd, int exitfd)
+{
+       struct tevent_context *ev = tevent_context_init(NULL);
+
+       if (oserver_setup(ev, OSERVER_PORT) == NULL)
+               exit(1);
+
+       /* Tell parent we are ready to go. */
+       write(readyfd, "", 1);
+
+       tevent_add_fd(ev, ev, exitfd, TEVENT_FD_READ, exit_quietly, NULL);
+       while (tevent_loop_wait(ev) == 0);
+}      
+
+static bool write_sall(int fd, const char *str)
+{
+       while (str[0]) {
+               ssize_t len = write(fd, str, strlen(str));
+               if (len < 0)
+                       return false;
+               str += len;
+       }
+       return true;
+}
+
+static bool input_is(int fd, const char *str)
+{
+       while (str[0]) {
+               char buffer[1000];
+               ssize_t len = read(fd, buffer, strlen(str));
+               if (len < 0)
+                       return false;
+               if (strncmp(str, buffer, len) != 0)
+                       return false;
+               str += len;
+       }
+       return true;
+}
+
+static bool no_input(int fd)
+{
+       fd_set set;
+       struct timeval t = { 0, 0 };
+
+       FD_ZERO(&set);
+       FD_SET(fd, &set);
+       return (select(fd+1, &set, NULL, NULL, &t) == 0);
 }
 
 int main(int argc, char *argv[])
 {
-       int fd;
-       char buf[200];
-       const char *input;
+       int readyfd[2], exitfd[2];
+       union {
+               struct sockaddr addr;
+               struct sockaddr_in in;
+       } u;
+       int sfd1, sfd2;
+       char c;
 
        /* This is how many tests you plan to run */
-       plan_tests(3 * 6);
-       failtest_init(argc, argv);
-       tap_fail_callback = exit_test;
-
-       foreach_ptr(input,
-                   "This is a test\n",
-                   "This is a test\r",
-                   "This is a test\r\n",
-                   "This is a test\nWith extra",
-                   "This is a test\rWith extra",
-                   "This is a test\r\nWith extra") {
-               fd = open("run-fd", O_RDWR|O_CREAT|O_TRUNC, 0600);
-
-               write(fd, input, strlen(input));
-               lseek(fd, 0, SEEK_SET);
-
-               ok1(oserver_serve(fd));
-
-               lseek(fd, 0, SEEK_SET);
-               buf[read(fd, buf, sizeof(buf)-1)] = '\0';
-
-               ok1(strncmp(buf, input, strlen("This is a test")) == 0);
-               ok1(streq(buf + strlen("This is a test") + 1,
-                         "Louder, like this: 'THIS IS A TEST'\r\n"));
+       plan_tests(13);
+
+       pipe(readyfd);
+       pipe(exitfd);
+       if (fork() == 0) {
+               close(exitfd[1]);
+               close(readyfd[0]);
+               run_server(readyfd[1], exitfd[0]);
+               err(1, "Event loop failed");
        }
+       close(exitfd[0]);
+       close(readyfd[1]);
+
+       sfd1 = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+       sfd2 = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+
+       u.in.sin_family = AF_INET;
+       u.in.sin_port = htons(OSERVER_PORT);
+       u.in.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+
+       /* Wait until child is ready... */
+       if (read(readyfd[0], &c, 1) != 1)
+               errx(1, "Child problems");
+
+       /* Go! */
+       ok1(connect(sfd1, &u.addr, sizeof(u.in)) == 0);
+       ok1(connect(sfd2, &u.addr, sizeof(u.in)) == 0);
+
+       ok1(write_sall(sfd1, "question"));
+       ok1(write_sall(sfd2, "question"));
+       /* It shouldn't say anything until we've finished! */
+       ok1(no_input(sfd1));
+       ok1(no_input(sfd2));
+
+       ok1(write_sall(sfd1, " 1\n"));
+       ok1(write_sall(sfd2, " 2\n"));
+
+       ok1(input_is(sfd1, "QUESTION 1\n"));
+       ok1(input_is(sfd2, "QUESTION 2\n"));
+
+       /* Sockets should be dead now. */
+       ok1(read(sfd1, &c, 1) == 0);
+       ok1(read(sfd2, &c, 1) == 0);
+
+       /* Shut down server. */
+       write(exitfd[1], "", 1);
+
+       /* This will close once it's shut down, and return. */
+       ok1(read(readyfd[0], &c, 1) == 0);
 
        /* This exits depending on whether all tests passed */
-       failtest_exit(exit_status());
+       return exit_status();
 }