]> git.ozlabs.org Git - ccan/blob - ccan/io/io.c
ccan/io: go linear for debugging.
[ccan] / ccan / io / io.c
1 /* Licensed under LGPLv2.1+ - see LICENSE file for details */
2 #include "io.h"
3 #include "backend.h"
4 #include <sys/types.h>
5 #include <sys/socket.h>
6 #include <netdb.h>
7 #include <string.h>
8 #include <errno.h>
9 #include <stdlib.h>
10 #include <assert.h>
11 #include <poll.h>
12
13 void *io_loop_return;
14
15 #ifdef DEBUG
16 bool io_plan_for_other;
17 struct io_conn *current;
18 bool (*io_debug)(struct io_conn *conn);
19 bool io_debug_wakeup;
20
21 static void debug_io_plan(struct io_plan *plan)
22 {
23         if (io_plan_for_other) {
24                 io_plan_for_other = false;
25                 return;
26         }
27
28         if (!io_debug || !current)
29                 return;
30
31         if (!io_debug(current) && !io_debug_wakeup)
32                 return;
33
34         io_debug_wakeup = false;
35         current->plan = *plan;
36         backend_plan_changed(current);
37
38         /* Call back into the loop immediately. */
39         io_loop_return = io_loop();
40 }
41
42 static void debug_io_wake(struct io_conn *conn)
43 {
44         /* We want linear if we wake a debugged connection, too. */
45         if (io_debug && io_debug(conn))
46                 io_debug_wakeup = true;
47 }
48 #else
49 static void debug_io_plan(struct io_plan *plan)
50 {
51 }
52 static void debug_io_wake(struct io_conn *conn)
53 {
54 }
55 #endif
56
57 struct io_listener *io_new_listener_(int fd,
58                                      void (*init)(int fd, void *arg),
59                                      void *arg)
60 {
61         struct io_listener *l = malloc(sizeof(*l));
62
63         if (!l)
64                 return NULL;
65
66         l->fd.listener = true;
67         l->fd.fd = fd;
68         l->init = init;
69         l->arg = arg;
70         if (!add_listener(l)) {
71                 free(l);
72                 return NULL;
73         }
74         return l;
75 }
76
77 void io_close_listener(struct io_listener *l)
78 {
79         close(l->fd.fd);
80         del_listener(l);
81         free(l);
82 }
83
84 struct io_conn *io_new_conn_(int fd,
85                              struct io_plan plan,
86                              void (*finish)(struct io_conn *, void *),
87                              void *arg)
88 {
89         struct io_conn *conn = malloc(sizeof(*conn));
90
91         if (!conn)
92                 return NULL;
93
94         conn->fd.listener = false;
95         conn->fd.fd = fd;
96         conn->plan = plan;
97         conn->finish = finish;
98         conn->finish_arg = arg;
99         conn->duplex = NULL;
100         conn->timeout = NULL;
101         if (!add_conn(conn)) {
102                 free(conn);
103                 return NULL;
104         }
105         return conn;
106 }
107
108 struct io_conn *io_duplex_(struct io_conn *old,
109                            struct io_plan plan,
110                            void (*finish)(struct io_conn *, void *),
111                            void *arg)
112 {
113         struct io_conn *conn;
114
115         assert(!old->duplex);
116
117         conn = malloc(sizeof(*conn));
118         if (!conn)
119                 return NULL;
120
121         conn->fd.listener = false;
122         conn->fd.fd = old->fd.fd;
123         conn->plan = plan;
124         conn->duplex = old;
125         conn->finish = finish;
126         conn->finish_arg = arg;
127         conn->timeout = NULL;
128         if (!add_duplex(conn)) {
129                 free(conn);
130                 return NULL;
131         }
132         old->duplex = conn;
133         return conn;
134 }
135
136 bool io_timeout_(struct io_conn *conn, struct timespec ts,
137                  struct io_plan (*cb)(struct io_conn *, void *), void *arg)
138 {
139         assert(cb);
140
141         if (!conn->timeout) {
142                 conn->timeout = malloc(sizeof(*conn->timeout));
143                 if (!conn->timeout)
144                         return false;
145         } else
146                 assert(!timeout_active(conn));
147
148         conn->timeout->next = cb;
149         conn->timeout->next_arg = arg;
150         backend_add_timeout(conn, ts);
151         return true;
152 }
153
154 /* Returns true if we're finished. */
155 static bool do_write(int fd, struct io_plan *plan)
156 {
157         ssize_t ret = write(fd, plan->u.write.buf, plan->u.write.len);
158         if (ret < 0) {
159                 /* Override next function to close us. */
160                 plan->next = io_close;
161                 return true;
162         }
163
164         plan->u.write.buf += ret;
165         plan->u.write.len -= ret;
166         return (plan->u.write.len == 0);
167 }
168
169 /* Queue some data to be written. */
170 struct io_plan io_write_(const void *data, size_t len,
171                          struct io_plan (*cb)(struct io_conn *, void *),
172                          void *arg)
173 {
174         struct io_plan plan;
175
176         assert(cb);
177         plan.u.write.buf = data;
178         plan.u.write.len = len;
179         plan.io = do_write;
180         plan.next = cb;
181         plan.next_arg = arg;
182         plan.pollflag = POLLOUT;
183
184         debug_io_plan(&plan);
185         return plan;
186 }
187
188 static bool do_read(int fd, struct io_plan *plan)
189 {
190         ssize_t ret = read(fd, plan->u.read.buf, plan->u.read.len);
191         if (ret <= 0) {
192                 /* Override next function to close us. */
193                 plan->next = io_close;
194                 return true;
195         }
196
197         plan->u.read.buf += ret;
198         plan->u.read.len -= ret;
199         return (plan->u.read.len == 0);
200 }
201
202 /* Queue a request to read into a buffer. */
203 struct io_plan io_read_(void *data, size_t len,
204                         struct io_plan (*cb)(struct io_conn *, void *),
205                         void *arg)
206 {
207         struct io_plan plan;
208
209         assert(cb);
210         plan.u.read.buf = data;
211         plan.u.read.len = len;
212         plan.io = do_read;
213         plan.next = cb;
214         plan.next_arg = arg;
215         plan.pollflag = POLLIN;
216
217         debug_io_plan(&plan);
218         return plan;
219 }
220
221 static bool do_read_partial(int fd, struct io_plan *plan)
222 {
223         ssize_t ret = read(fd, plan->u.readpart.buf, *plan->u.readpart.lenp);
224         if (ret <= 0) {
225                 /* Override next function to close us. */
226                 plan->next = io_close;
227                 return true;
228         }
229
230         *plan->u.readpart.lenp = ret;
231         return true;
232 }
233
234 /* Queue a partial request to read into a buffer. */
235 struct io_plan io_read_partial_(void *data, size_t *len,
236                                 struct io_plan (*cb)(struct io_conn *, void *),
237                                 void *arg)
238 {
239         struct io_plan plan;
240
241         assert(cb);
242         plan.u.readpart.buf = data;
243         plan.u.readpart.lenp = len;
244         plan.io = do_read_partial;
245         plan.next = cb;
246         plan.next_arg = arg;
247         plan.pollflag = POLLIN;
248
249         debug_io_plan(&plan);
250         return plan;
251 }
252
253 static bool do_write_partial(int fd, struct io_plan *plan)
254 {
255         ssize_t ret = write(fd, plan->u.writepart.buf, *plan->u.writepart.lenp);
256         if (ret < 0) {
257                 /* Override next function to close us. */
258                 plan->next = io_close;
259                 return true;
260         }
261
262         *plan->u.writepart.lenp = ret;
263         return true;
264 }
265
266 /* Queue a partial write request. */
267 struct io_plan io_write_partial_(const void *data, size_t *len,
268                                  struct io_plan (*cb)(struct io_conn*, void *),
269                                  void *arg)
270 {
271         struct io_plan plan;
272
273         assert(cb);
274         plan.u.writepart.buf = data;
275         plan.u.writepart.lenp = len;
276         plan.io = do_write_partial;
277         plan.next = cb;
278         plan.next_arg = arg;
279         plan.pollflag = POLLOUT;
280
281         debug_io_plan(&plan);
282         return plan;
283 }
284
285 struct io_plan io_idle(void)
286 {
287         struct io_plan plan;
288
289         plan.pollflag = 0;
290         plan.io = NULL;
291         /* Never called (overridded by io_wake), but NULL means closing */
292         plan.next = io_close;
293
294         debug_io_plan(&plan);
295         return plan;
296 }
297
298 void io_wake_(struct io_conn *conn, struct io_plan plan)
299
300 {
301         /* It might be closing, but we haven't called its finish() yet. */
302         if (!conn->plan.next)
303                 return;
304         /* It was idle, right? */
305         assert(!conn->plan.io);
306         conn->plan = plan;
307         backend_plan_changed(conn);
308
309         debug_io_wake(conn);
310 }
311
312 void io_ready(struct io_conn *conn)
313 {
314         if (conn->plan.io(conn->fd.fd, &conn->plan)) {
315                 set_current(conn);
316                 if (timeout_active(conn))
317                         backend_del_timeout(conn);
318                 conn->plan = conn->plan.next(conn, conn->plan.next_arg);
319                 backend_plan_changed(conn);
320                 set_current(NULL);
321         }
322 }
323
324 /* Useful next functions. */
325 /* Close the connection, we're done. */
326 struct io_plan io_close(struct io_conn *conn, void *arg)
327 {
328         struct io_plan plan;
329
330         plan.pollflag = 0;
331         /* This means we're closing. */
332         plan.next = NULL;
333
334         debug_io_plan(&plan);
335         return plan;
336 }
337
338 /* Exit the loop, returning this (non-NULL) arg. */
339 struct io_plan io_break_(void *ret, struct io_plan plan)
340 {
341         assert(ret);
342         io_loop_return = ret;
343
344         return plan;
345 }