X-Git-Url: https://git.ozlabs.org/?a=blobdiff_plain;f=ccan%2Foserver%2Foserver.c;h=50b49bd5f94746c8259fb3c609c285c5d68684c9;hb=ce689e269acaabd77dcb82166340441a4a00f9a1;hp=5a40ff98cce90c8c96908da83ff497d1bed921d1;hpb=75020f977fecd26610845171aaba4f9accc75c6e;p=ccan-lca-2011.git diff --git a/ccan/oserver/oserver.c b/ccan/oserver/oserver.c index 5a40ff9..50b49bd 100644 --- a/ccan/oserver/oserver.c +++ b/ccan/oserver/oserver.c @@ -1,5 +1,11 @@ #include +#include +#include #include +#include +#include +#include +#include #include #include #include @@ -10,52 +16,408 @@ #include #include #include +#include +#include +#include +#include +#include +#include + +static uint16_t state_flag_map[] = { + [SENDING_GREETING] = TEVENT_FD_WRITE, + [RECEIVING_USER_QUESTION] = TEVENT_FD_READ, + [SENDING_OTHER_QUESTION_PREFIX] = TEVENT_FD_WRITE, + [SENDING_OTHER_QUESTION] = TEVENT_FD_WRITE, + [RECEIVING_OTHER_ANSWER] = TEVENT_FD_READ, + [SENDING_ANSWER_PREFIX] = TEVENT_FD_WRITE, + [SENDING_ANSWER] = TEVENT_FD_WRITE, + [FINISHED] = 0 +}; -void oserver_serve(int fd) +static ssize_t write_string(int fd, const char *str) +{ + return write(fd, str, strlen(str)); +} + +static ssize_t read_string(int fd, char **buf) +{ + ssize_t ret, len, maxlen; + + len = strlen(*buf); + maxlen = talloc_array_length(*buf); + + if (maxlen < len + 100) { + maxlen += 100; + *buf = talloc_realloc(NULL, *buf, char, maxlen); + } + + ret = read(fd, *buf + len, maxlen - len - 1); + if (ret >= 0) + (*buf)[len + ret] = '\0'; + return ret; +} + +static bool input_finished(const char *str) +{ + return strchr(str, '\n'); +} + +/* Update state, and set our READ/WRITE flags appropriately. */ +static void set_state(struct client *c, enum state state) +{ + c->state = state; + tevent_fd_set_flags(c->fde, state_flag_map[state]); +} + +/* Returns false on error, increments state on finishing string. */ +static bool send_string(struct client *c, const char *str) +{ + ssize_t len = write_string(c->fd, str + c->bytes_sent); + if (len < 0) + return false; + c->bytes_sent += len; + if (c->bytes_sent == strlen(str)) { + c->bytes_sent = 0; + set_state(c, c->state+1); + } + return true; +} + +static bool get_subclient(struct client *me) { - char buf[1024]; unsigned int i; - for (i = 0; i < sizeof(buf)-1; i++) { - if (read(fd, &buf[i], 1) != 1) - errx(1, "Client disconnected"); - if (buf[i] == '\n' || buf[i] == '\r') - break; - buf[i] = toupper(buf[i]); + for (i = 0; i < ARRAY_SIZE(me->oserver->clients); i++) { + struct client *c = me->oserver->clients[i]; + if (!c || c == me) + continue; + if (c->oracle == -1 && input_finished(c->question)) { + me->subclient = c->id; + c->oracle = me->id; + return true; + } } - buf[i] = '\0'; + return false; +} - if (!write_all(fd, "Louder, like this: '", - strlen("Louder, like this: '")) - || !write_all(fd, buf, i) - || !write_all(fd, "'\r\n", strlen("'\r\n"))) - err(1, "Write failed"); - exit(0); +static bool get_oracle(struct client *me) +{ + unsigned int i; + + for (i = 0; i < ARRAY_SIZE(me->oserver->clients); i++) { + struct client *c = me->oserver->clients[i]; + if (!c || c == me) + continue; + if (c->subclient == -1 && input_finished(c->question)) { + me->oracle = c->id; + c->subclient = me->id; + return true; + } + } + return false; +} + +static void wakeup(struct client *c) +{ + tevent_fd_set_flags(c->fde, state_flag_map[c->state]); +} + +static void service_client(struct tevent_context *ev, + struct tevent_fd *fde, uint16_t flags, void *_c) +{ + struct client *c = _c; + ssize_t len; + + switch (c->state) { + case SENDING_GREETING: + if (!send_string(c, "Welcome. Please ask your question.\n")) + goto fail; + break; + case RECEIVING_USER_QUESTION: + len = read_string(c->fd, &c->question); + if (len <= 0) + goto fail; + if (input_finished(c->question)) + set_state(c, SENDING_OTHER_QUESTION_PREFIX); + break; + case SENDING_OTHER_QUESTION_PREFIX: + if (c->subclient == -1) + goto need_subclient; + if (!send_string(c, "While the Oracle ponders," + " please answer the following question:\n")) + goto fail; + break; + case SENDING_OTHER_QUESTION: + if (c->subclient == -1) + goto need_subclient; + if (!send_string(c, + c->oserver->clients[c->subclient]->question)) + goto fail; + break; + case RECEIVING_OTHER_ANSWER: + if (c->subclient == -1) + goto need_subclient; + len = read_string(c->fd, + &c->oserver->clients[c->subclient]->answer); + if (len <= 0) + goto fail; + if (input_finished(c->oserver->clients[c->subclient]->answer)) { + set_state(c, SENDING_ANSWER_PREFIX); + wakeup(c->oserver->clients[c->subclient]); + } + break; + case SENDING_ANSWER_PREFIX: + if (!input_finished(c->answer)) + goto need_answer; + if (!send_string(c, "The Oracle spake thus:\n")) + goto fail; + break; + case SENDING_ANSWER: + if (!send_string(c, c->answer)) + goto fail; + break; + default: + goto fail; + } + + if (c->state != FINISHED) + return; +fail: + talloc_free(c); + return; + +need_subclient: + if (!get_subclient(c)) { + /* We can't get one: go to sleep until someone find_oracle() */ + tevent_fd_set_flags(c->fde, 0); + } else + /* In case they are waiting... */ + wakeup(c->oserver->clients[c->subclient]); + return; + +need_answer: + /* If we don't have an oracle and find one, that's OK. */ + if (c->oracle == -1 && get_oracle(c)) { + /* In case they are waiting... */ + wakeup(c->oserver->clients[c->oracle]); + return; + } + + /* Either our oracle is not finished, or we don't have one: sleep. */ + tevent_fd_set_flags(c->fde, 0); +} + +static int cleanup_client(struct client *client) +{ + + /* We were an oracle? */ + if (client->subclient >= 0) + client->oserver->clients[client->subclient]->oracle = -1; + + /* We had an oracle? */ + if (client->oracle >= 0) + client->oserver->clients[client->oracle]->subclient = -1; + + assert(client->oserver->clients[client->id] == client); + client->oserver->clients[client->id] = NULL; + return 0; } -int oserver_setup(void) +static void add_client(struct tevent_context *ev, + struct tevent_fd *fde, uint16_t flags, void *_oserver) { - int sockfd; + struct oserver *oserver = _oserver; + struct client *client; + + client = talloc(oserver, struct client); + client->fd = accept(oserver->fd, NULL, 0); + if (client->fd < 0) + err(1, "Accepting client connection"); + + client->state = SENDING_GREETING; + client->bytes_sent = 0; + client->question = talloc_strdup(client, ""); + client->oserver = oserver; + client->oracle = -1; + client->subclient = -1; + client->answer = talloc_strdup(client, ""); + client->fde = tevent_add_fd(ev, client, client->fd, + state_flag_map[client->state], + service_client, client); + tevent_fd_set_auto_close(client->fde); + + /* Find empty slot in array for this client. */ + for (client->id = 0; oserver->clients[client->id]; client->id++); + oserver->clients[client->id] = client; + talloc_set_destructor(client, cleanup_client); + + /* Full? Stop listening... */ + if (client->id == ARRAY_SIZE(oserver->clients)-1) + tevent_fd_set_flags(oserver->fde, 0); +} + +static void clear_clients(struct oserver *oserver) +{ + memset(oserver->clients, 0, + ARRAY_SIZE(oserver->clients) * sizeof(oserver->clients[0])); +} + +static int destroy_oserver(struct oserver *oserver) +{ + close(oserver->fd); + return 0; +} + +static void talloc_dump(struct tevent_context *ev, + struct tevent_signal *se, + int signum, + int count, + void *siginfo, + void *_oserver) +{ + struct oserver *oserver = _oserver; + FILE *f; + + /* Fork off a child for the report, so we aren't stopped. */ + if (fork() == 0) { + f = fopen("/var/run/oserver/talloc.dump", "w"); + if (f) { + talloc_report_full(oserver, f); + fclose(f); + } + _exit(0); + } +} + +static void dump(struct tevent_context *ev, + struct tevent_signal *se, + int signum, + int count, + void *siginfo, + void *_oserver) +{ + struct oserver *oserver = _oserver; + char *str; + int fd; + + str = cdump_bundle(ev, cdump_struct_oserver, oserver); + fd = open(oserver->dumpfile, O_CREAT|O_TRUNC|O_WRONLY, 0600); + write(fd, str, strlen(str)); + close(fd); + talloc_free(str); +} + +static bool load_file(struct oserver *oserver, const char *file) +{ + char *str; + + if (!file) + return false; + + str = grab_file(oserver, file, NULL); + if (!str) + return false; + + if (!cdump_unbundle(oserver, cdump_struct_oserver, oserver, str)) { + talloc_free(str); + return false; + } + talloc_free(str); + return true; +} + +static bool complete_server(struct tevent_context *ev, + struct oserver *oserver, const char *dumpfile) +{ + /* Re-set this even if restored from file, in case it changed. */ + oserver->dumpfile = dumpfile; + if (oserver->dumpfile) + tevent_add_signal(ev, oserver, SIGHUP, SA_RESTART, + dump, oserver); + + /* Don't kill us if client dies. */ + signal(SIGPIPE, SIG_IGN); + + /* Show talloc tree on SIGUSR1. */ + tevent_add_signal(ev, oserver, SIGUSR1, SA_RESTART, + talloc_dump, oserver); + + oserver->fde = tevent_add_fd(ev, oserver, oserver->fd, + TEVENT_FD_READ, add_client, oserver); + if (!oserver->fde) + return false; + return true; +} + +struct oserver *oserver_restore(struct tevent_context *ev, const char *dumpfile) +{ + unsigned int i; + struct oserver *oserver = talloc(ev, struct oserver); + if (!load_file(oserver, dumpfile)) { + talloc_free(oserver); + return NULL; + } + + /* Restore ignored fields in clients (fde and oserver). */ + for (i = 0; i < ARRAY_SIZE(oserver->clients); i++) { + struct client *client = oserver->clients[i]; + if (!client) + continue; + client->oserver = oserver; + client->fde = tevent_add_fd(ev, client, client->fd, + state_flag_map[client->state], + service_client, client); + tevent_fd_set_auto_close(client->fde); + } + + if (!complete_server(ev, oserver, dumpfile)) { + talloc_free(oserver); + return NULL; + } + return oserver; +} + +struct oserver *oserver_setup(struct tevent_context *ev, unsigned short port, + const char *dumpfile) +{ + struct oserver *oserver; int one = 1; union { struct sockaddr addr; struct sockaddr_in in; } u; - sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); - if (sockfd < 0) - err(1, "Creating TCP socket"); + oserver = talloc(ev, struct oserver); + clear_clients(oserver); + oserver->fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + if (oserver->fd < 0) { + talloc_free(oserver); + return NULL; + } + + talloc_set_destructor(oserver, destroy_oserver); - if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) + if (setsockopt(oserver->fd, SOL_SOCKET, SO_REUSEADDR, + &one, sizeof(one))) warn("Setting socket reuse"); u.in.sin_family = AF_INET; - u.in.sin_port = htons(OSERVER_PORT); + u.in.sin_port = htons(port); u.in.sin_addr.s_addr = INADDR_ANY; - if (bind(sockfd, &u.addr, sizeof(u.in)) == -1) - err(1, "Binding TCP socket"); + if (bind(oserver->fd, &u.addr, sizeof(u.in)) == -1) { + talloc_free(oserver); + return NULL; + } + + if (listen(oserver->fd, 0) != 0) { + talloc_free(oserver); + return NULL; + } + + if (!complete_server(ev, oserver, dumpfile)) { + talloc_free(oserver); + return NULL; + } - if (listen(sockfd, 0) != 0) - err(1, "Listening on TCP socket"); - return sockfd; + return oserver; }