io: add io_fd_block() helper.
[ccan] / ccan / io / benchmarks / run-length-prefix.c
1 /* Simulate a server with connections of different speeds.  We count
2  * how many connections complete in 10 seconds. */
3 #include <ccan/io/io.h>
4 #include <ccan/time/time.h>
5 #include <ccan/err/err.h>
6 #include <sys/types.h>
7 #include <sys/socket.h>
8 #include <sys/un.h>
9 #include <stdlib.h>
10 #include <unistd.h>
11 #include <fcntl.h>
12 #include <errno.h>
13 #include <stdio.h>
14 #include <signal.h>
15 #include <assert.h>
16
17 #define REQUEST_MAX 131072
18 #define NUM_CONNS 500 /* per child */
19 #define NUM_CHILDREN 2
20
21 static unsigned int completed;
22
23 struct client {
24         unsigned int len;
25         char *request_buffer;
26 };
27
28 static struct io_plan write_reply(struct io_conn *conn, struct client *client);
29 static struct io_plan read_body(struct io_conn *conn, struct client *client)
30 {
31         assert(client->len <= REQUEST_MAX);
32         return io_read(client->request_buffer, client->len,
33                        write_reply, client);
34 }
35
36 static struct io_plan io_read_header(struct client *client)
37 {
38         return io_read(&client->len, sizeof(client->len), read_body, client);
39 }
40
41 /* once we're done, loop again. */
42 static struct io_plan write_complete(struct io_conn *conn, struct client *client)
43 {
44         completed++;
45         return io_read_header(client);
46 }
47
48 static struct io_plan write_reply(struct io_conn *conn, struct client *client)
49 {
50         return io_write(&client->len, sizeof(client->len),
51                         write_complete, client);
52 }
53
54 /* This runs in the child. */
55 static void create_clients(struct sockaddr_un *addr, int waitfd)
56 {
57         struct client data;
58         int i, sock[NUM_CONNS], len[NUM_CONNS], done[NUM_CONNS],
59                 result[NUM_CONNS], count = 0;
60
61         for (i = 0; i < NUM_CONNS; i++) {
62                 len[i] = (random() % REQUEST_MAX) + 1;
63                 sock[i] = socket(AF_UNIX, SOCK_STREAM, 0);
64                 if (sock[i] < 0)
65                         err(1, "creating socket");
66                 if (connect(sock[i], (void *)addr, sizeof(*addr)) != 0)
67                         err(1, "connecting socket");
68                 /* Make nonblocking. */
69                 io_fd_block(sock[i], false);
70                 done[i] = 0;
71         }
72
73         read(waitfd, &i, 1);
74
75         for (;;) {
76                 for (i = 0; i < NUM_CONNS; i++) {
77                         int ret, totlen = len[i] + sizeof(len[i]);
78                         if (done[i] < sizeof(len[i]) + len[i]) {
79                                 data.len = len[i];
80                                 ret = write(sock[i], (void *)&data + done[i],
81                                             totlen - done[i]);
82                                 if (ret > 0)
83                                         done[i] += ret;
84                                 else if (ret < 0 && errno != EAGAIN)
85                                         goto fail;
86                         } else {
87                                 int off = done[i] - totlen;
88                                 ret = read(sock[i], (void *)&result[i] + off,
89                                            sizeof(result[i]) - off);
90                                 if (ret > 0) {
91                                         done[i] += ret;
92                                         if (done[i] == totlen
93                                             + sizeof(result[i])) {
94                                                 assert(result[i] == len[i]);
95                                                 count++;
96                                                 done[i] = 0;
97                                         }
98                                 } else if (ret < 0 && errno != EAGAIN)
99                                         goto fail;
100                         }
101                 }
102         }
103 fail:
104         printf("Child did %u\n", count);
105         exit(0);
106 }
107
108 static int timeout[2];
109 static void sigalarm(int sig)
110 {
111         write(timeout[1], "1", 1);
112 }
113
114 static struct io_plan do_timeout(struct io_conn *conn, char *buf)
115 {
116         return io_break(buf, io_idle());
117 }
118
119 int main(int argc, char *argv[])
120 {
121         unsigned int i, j;
122         struct sockaddr_un addr;
123         struct timespec start, end;
124         char buffer[REQUEST_MAX];
125         int fd, wake[2];
126         char buf;
127
128         addr.sun_family = AF_UNIX;
129         sprintf(addr.sun_path, "/tmp/run-different-speed.sock.%u", getpid());
130
131         if (pipe(wake) != 0 || pipe(timeout) != 0)
132                 err(1, "Creating pipes");
133
134         fd = socket(AF_UNIX, SOCK_STREAM, 0);
135         if (fd < 0)
136                 err(1, "Creating socket");
137
138         if (bind(fd, (void *)&addr, sizeof(addr)) != 0)
139                 err(1, "Binding to %s", addr.sun_path);
140
141         if (listen(fd, NUM_CONNS) != 0)
142                 err(1, "Listening on %s", addr.sun_path);
143
144         for (i = 0; i < NUM_CHILDREN; i++) {
145                 switch (fork()) {
146                 case -1:
147                         err(1, "forking");
148                 case 0:
149                         close(wake[1]);
150                         create_clients(&addr, wake[0]);
151                         break;
152                 }
153                 for (j = 0; j < NUM_CONNS; j++) {
154                         struct client *client = malloc(sizeof(*client));
155                         int ret = accept(fd, NULL, 0);
156                         if (ret < 0)
157                                 err(1, "Accepting fd");
158                         /* For efficiency, we share buffer */
159                         client->request_buffer = buffer;
160                         io_new_conn(ret, io_read_header(client));
161                 }
162         }
163
164         io_new_conn(timeout[0], io_read(&buf, 1, do_timeout, &buf));
165
166         close(wake[0]);
167         for (i = 0; i < NUM_CHILDREN; i++)
168                 write(wake[1], "1", 1);
169
170         signal(SIGALRM, sigalarm);
171         alarm(10);
172         start = time_now();
173         io_loop();
174         end = time_now();
175         close(fd);
176
177         printf("%u connections complete (%u ns per conn)\n",
178                completed,
179                (int)time_to_nsec(time_divide(time_sub(end, start), completed)));
180         return 0;
181 }