antithread: correctly list ccan/typesafe_cb as dependency.
[ccan] / ccan / antithread / antithread.c
1 /* Licensed under GPLv3+ - see LICENSE file for details */
2 #include <stdlib.h>
3 #include <unistd.h>
4 #include <fcntl.h>
5 #include <stdbool.h>
6 #include <string.h>
7 #include <sys/mman.h>
8 #include <sys/types.h>
9 #include <sys/wait.h>
10 #include <signal.h>
11 #include <errno.h>
12 #include <assert.h>
13 #include "antithread.h"
14 #include <ccan/err/err.h>
15 #include <ccan/noerr/noerr.h>
16 #include <ccan/talloc/talloc.h>
17 #include <ccan/read_write_all/read_write_all.h>
18 #include <ccan/antithread/alloc/alloc.h>
19 #include <ccan/list/list.h>
20
21 /* FIXME: Valgrind support should be possible for some cases.  Tricky
22  * case is where another process allocates for you, but at worst we
23  * could reset what is valid and what isn't on every entry into the
24  * library or something. */
25
26 static LIST_HEAD(pools);
27
28 /* Talloc destroys parents before children (damn Tridge's failing destructors!)
29  * so we need the first child (ie. last-destroyed) to actually clean up. */
30 struct at_pool_contents {
31         struct list_node list;
32         void *pool;
33         unsigned long poolsize;
34         int fd;
35         int parent_rfd, parent_wfd;
36         struct at_pool *atp;
37 };
38
39 struct at_pool {
40         struct at_pool_contents *p;
41         const void *ctx;
42 };
43
44 struct athread {
45         pid_t pid;
46         int rfd, wfd;
47 };
48
49 /* FIXME: Better locking through futexes. */
50 static void lock(int fd, unsigned long off)
51 {
52         struct flock fl;
53
54         fl.l_type = F_WRLCK;
55         fl.l_whence = SEEK_SET;
56         fl.l_start = off;
57         fl.l_len = 1;
58
59         while (fcntl(fd, F_SETLKW, &fl) < 0) {
60                 if (errno != EINTR)
61                         err(1, "Failure locking antithread file");
62         }
63 }
64
65 static void unlock(int fd, unsigned long off)
66 {
67         struct flock fl;
68         int serrno = errno;
69
70         fl.l_type = F_UNLCK;
71         fl.l_whence = SEEK_SET;
72         fl.l_start = off;
73         fl.l_len = 1;
74
75         fcntl(fd, F_SETLK, &fl);
76         errno = serrno;
77 }
78
79 /* This pointer is in a pool.  Find which one. */
80 static struct at_pool_contents *find_pool(const void *ptr)
81 {
82         struct at_pool_contents *p;
83
84         list_for_each(&pools, p, list) {
85                 /* Special case for initial allocation: ptr *is* pool */
86                 if (ptr == p->atp)
87                         return p;
88
89                 if ((char *)ptr >= (char *)p->pool
90                     && (char *)ptr < (char *)p->pool + p->poolsize)
91                         return p;
92         }
93         abort();
94 }
95
96 static int destroy_pool(struct at_pool_contents *p)
97 {
98         list_del(&p->list);
99         munmap(p->pool, p->poolsize);
100         close(p->fd);
101         close(p->parent_rfd);
102         close(p->parent_wfd);
103         return 0;
104 }
105
106 static void *at_realloc(const void *parent, void *ptr, size_t size)
107 {
108         struct at_pool_contents *p = find_pool(parent);
109         /* FIXME: realloc in ccan/alloc? */
110         void *new;
111
112         if (size == 0) {
113                 alloc_free(p->pool, p->poolsize, ptr);
114                 new = NULL;
115         } else if (ptr == NULL) {
116                 /* FIXME: Alignment */
117                 new = alloc_get(p->pool, p->poolsize, size, 16);
118         } else {
119                 if (size <= alloc_size(p->pool, p->poolsize, ptr))
120                         new = ptr;
121                 else {
122                         new = alloc_get(p->pool, p->poolsize, size, 16);
123                         if (new) {
124                                 memcpy(new, ptr,
125                                        alloc_size(p->pool, p->poolsize, ptr));
126                                 alloc_free(p->pool, p->poolsize, ptr);
127                         }
128                 }
129         }
130
131         return new;
132 }
133
134 static struct at_pool_contents *locked;
135 static void talloc_lock(const void *ptr)
136 {
137         struct at_pool_contents *p = find_pool(ptr);
138
139         lock(p->fd, 0);
140         assert(!locked);
141         locked = p;
142 }
143
144 static void talloc_unlock(void)
145 {
146         struct at_pool_contents *p = locked;
147
148         locked = NULL;
149         unlock(p->fd, 0);
150 }
151
152 /* We add 16MB to size.  This compensates for address randomization. */
153 #define PADDING (16 * 1024 * 1024)
154
155 /* Create a new sharable pool. */
156 struct at_pool *at_pool(unsigned long size)
157 {
158         int fd;
159         struct at_pool *atp;
160         struct at_pool_contents *p;
161         FILE *f;
162
163         /* FIXME: How much should we actually add for overhead?. */
164         size += 32 * getpagesize();
165
166         /* Round up to whole pages. */
167         size = (size + getpagesize()-1) & ~(getpagesize()-1);
168
169         f = tmpfile();
170         if (!f)
171                 return NULL;
172
173         fd = dup(fileno(f));
174         fclose_noerr(f);
175
176         if (fd < 0)
177                 return NULL;
178
179         if (ftruncate(fd, size + PADDING) != 0)
180                 goto fail_close;
181
182         atp = talloc(NULL, struct at_pool);
183         if (!atp)
184                 goto fail_close;
185
186         atp->p = p = talloc(NULL, struct at_pool_contents);
187         if (!p)
188                 goto fail_free;
189
190         /* First map gets a nice big area. */
191         p->pool = mmap(NULL, size+PADDING, PROT_READ|PROT_WRITE, MAP_SHARED, fd,
192                        0);
193         if (p->pool == MAP_FAILED)
194                 goto fail_free;
195
196         /* Then we remap into the middle of it. */
197         munmap(p->pool, size+PADDING);
198         p->pool = mmap((char *)p->pool + PADDING/2, size, PROT_READ|PROT_WRITE,
199                        MAP_SHARED, fd, 0);
200         if (p->pool == MAP_FAILED)
201                 goto fail_free;
202
203         p->fd = fd;
204         p->poolsize = size;
205         p->parent_rfd = p->parent_wfd = -1;
206         p->atp = atp;
207         alloc_init(p->pool, p->poolsize);
208         list_add(&pools, &p->list);
209         talloc_set_destructor(p, destroy_pool);
210
211         atp->ctx = talloc_add_external(atp,
212                                        at_realloc, talloc_lock, talloc_unlock);
213         if (!atp->ctx)
214                 goto fail_free;
215         return atp;
216
217 fail_free:
218         talloc_free(atp);
219 fail_close:
220         close_noerr(fd);
221         return NULL;
222 }
223
224 /* Talloc off this to allocate from within the pool. */
225 const void *at_pool_ctx(struct at_pool *atp)
226 {
227         return atp->ctx;
228 }
229
230 static int cant_destroy_self(struct athread *at)
231 {
232         /* Perhaps this means we want to detach, but it doesn't really
233          * make sense. */
234         abort();
235         return 0;
236 }
237
238 static int destroy_at(struct athread *at)
239 {
240         /* If it is already a zombie, this is harmless. */
241         kill(at->pid, SIGTERM);
242
243         close(at->rfd);
244         close(at->wfd);
245
246         /* FIXME: Should we do SIGKILL if process doesn't exit soon? */
247         if (waitpid(at->pid, NULL, 0) != at->pid)
248                 err(1, "Waiting for athread %p (pid %u)", at, at->pid);
249
250         return 0;
251 }
252
253 /* Sets up thread and forks it.  NULL on error. */
254 static struct athread *fork_thread(struct at_pool *atp)
255 {
256         int p2c[2], c2p[2];
257         struct athread *at;
258         struct at_pool_contents *pool = atp->p;
259
260         /* You can't already be a child of this pool. */
261         if (pool->parent_rfd != -1)
262                 errx(1, "Can't create antithread on this pool: we're one");
263
264         /* We don't want this allocated *in* the pool. */
265         at = talloc_steal(atp, talloc(NULL, struct athread));
266
267         if (pipe(p2c) != 0)
268                 goto free;
269
270         if (pipe(c2p) != 0)
271                 goto close_p2c;
272
273         at->pid = fork();
274         if (at->pid == -1)
275                 goto close_c2p;
276
277         if (at->pid == 0) {
278                 /* Child */
279                 close(c2p[0]);
280                 close(p2c[1]);
281                 pool->parent_rfd = p2c[0];
282                 pool->parent_wfd = c2p[1];
283                 talloc_set_destructor(at, cant_destroy_self);
284         } else {
285                 /* Parent */
286                 close(c2p[1]);
287                 close(p2c[0]);
288                 at->rfd = c2p[0];
289                 at->wfd = p2c[1];
290                 talloc_set_destructor(at, destroy_at);
291         }
292
293         return at;
294 close_c2p:
295         close_noerr(c2p[0]);
296         close_noerr(c2p[1]);
297 close_p2c:
298         close_noerr(p2c[0]);
299         close_noerr(p2c[1]);
300 free:
301         talloc_free(at);
302         return NULL;
303 }
304
305 /* Creating an antithread via fork() */
306 struct athread *_at_run(struct at_pool *atp,
307                         void *(*fn)(struct at_pool *, void *),
308                         void *obj)
309 {
310         struct athread *at;
311
312         at = fork_thread(atp);
313         if (!at)
314                 return NULL;
315
316         if (at->pid == 0) {
317                 /* Child */
318                 at_tell_parent(atp, fn(atp, obj));
319                 exit(0);
320         }
321         /* Parent */
322         return at;
323 }
324
325 static unsigned int num_args(char *const argv[])
326 {
327         unsigned int i;
328
329         for (i = 0; argv[i]; i++);
330         return i;
331 }
332
333 /* Fork and execvp, with added arguments for child to grab. */
334 struct athread *at_spawn(struct at_pool *atp, void *arg, char *cmdline[])
335 {
336         struct athread *at;
337         int err;
338
339         at = fork_thread(atp);
340         if (!at)
341                 return NULL;
342
343         if (at->pid == 0) {
344                 /* child */
345                 char *argv[num_args(cmdline) + 2];
346                 argv[0] = cmdline[0];
347                 argv[1] = talloc_asprintf(NULL, "AT:%p/%lu/%i/%i/%i/%p",
348                                           atp->p->pool, atp->p->poolsize,
349                                           atp->p->fd, atp->p->parent_rfd,
350                                           atp->p->parent_wfd, arg);
351                 /* Copy including NULL terminator. */
352                 memcpy(&argv[2], &cmdline[1], num_args(cmdline)*sizeof(char *));
353                 execvp(argv[0], argv);
354
355                 err = errno;
356                 write_all(atp->p->parent_wfd, &err, sizeof(err));
357                 exit(1);
358         }
359
360         /* Child should always write an error code (or 0). */
361         if (read(at->rfd, &err, sizeof(err)) != sizeof(err)) {
362                 errno = ECHILD;
363                 talloc_free(at);
364                 return NULL;
365         }
366         if (err != 0) {
367                 errno = err;
368                 talloc_free(at);
369                 return NULL;
370         }
371         return at;
372 }
373
374 /* The fd to poll on */
375 int at_fd(struct athread *at)
376 {
377         return at->rfd;
378 }
379
380 /* What's the antithread saying?  Blocks if fd not ready. */
381 void *at_read(struct athread *at)
382 {
383         void *ret;
384
385         switch (read(at->rfd, &ret, sizeof(ret))) {
386         case -1:
387                 err(1, "Reading from athread %p (pid %u)", at, at->pid);
388         case 0:
389                 /* Thread died. */
390                 return NULL;
391         case sizeof(ret):
392                 return ret;
393         default:
394                 /* Should never happen. */
395                 err(1, "Short read from athread %p (pid %u)", at, at->pid);
396         }
397 }
398
399 /* Say something to a child. */
400 void at_tell(struct athread *at, const void *status)
401 {
402         if (write(at->wfd, &status, sizeof(status)) != sizeof(status))
403                 err(1, "Failure writing to athread %p (pid %u)", at, at->pid);
404 }
405
406 /* For child to grab arguments from command line (removes them) */
407 struct at_pool *at_get_pool(int *argc, char *argv[], void **arg)
408 {
409         struct at_pool *atp = talloc(NULL, struct at_pool);
410         struct at_pool_contents *p;
411         void *map;
412         int err;
413
414         if (!argv[1]) {
415                 errno = EINVAL;
416                 goto fail;
417         }
418
419         /* If they don't care, use dummy value. */
420         if (arg == NULL)
421                 arg = &map;
422
423         p = atp->p = talloc(atp, struct at_pool_contents);
424
425         if (sscanf(argv[1], "AT:%p/%lu/%i/%i/%i/%p",
426                    &p->pool, &p->poolsize, &p->fd,
427                    &p->parent_rfd, &p->parent_wfd, arg) != 6) {
428                 errno = EINVAL;
429                 goto fail;
430         }
431
432         /* FIXME: To try to adjust for address space randomization, we
433          * could re-exec a few times. */
434         map = mmap(p->pool, p->poolsize, PROT_READ|PROT_WRITE, MAP_SHARED,
435                    p->fd, 0);
436         if (map != p->pool) {
437                 fprintf(stderr, "Mapping %lu bytes @%p gave %p\n",
438                         p->poolsize, p->pool, map);
439                 errno = ENOMEM;
440                 goto fail;
441         }
442
443         list_add(&pools, &p->list);
444         talloc_set_destructor(p, destroy_pool);
445         p->atp = atp;
446
447         atp->ctx = talloc_add_external(atp,
448                                        at_realloc, talloc_lock, talloc_unlock);
449         if (!atp->ctx)
450                 goto fail;
451
452         /* Tell parent we're good. */
453         err = 0;
454         if (write(p->parent_wfd, &err, sizeof(err)) != sizeof(err)) {
455                 errno = EBADF;
456                 goto fail;
457         }
458
459         /* Delete AT arg. */
460         memmove(&argv[1], &argv[2], --(*argc));
461
462         return atp;
463
464 fail:
465         talloc_free(atp);
466         return NULL;
467 }
468
469 /* Say something to our parent (async). */
470 void at_tell_parent(struct at_pool *atp, const void *status)
471 {
472         if (atp->p->parent_wfd == -1)
473                 errx(1, "This process is not an antithread of this pool");
474
475         if (write(atp->p->parent_wfd, &status, sizeof(status))!=sizeof(status))
476                 err(1, "Failure writing to parent");
477 }
478
479 /* What's the parent saying?  Blocks if fd not ready. */
480 void *at_read_parent(struct at_pool *atp)
481 {
482         void *ret;
483
484         if (atp->p->parent_rfd == -1)
485                 errx(1, "This process is not an antithread of this pool");
486
487         switch (read(atp->p->parent_rfd, &ret, sizeof(ret))) {
488         case -1:
489                 err(1, "Reading from parent");
490         case 0:
491                 /* Parent died. */
492                 return NULL;
493         case sizeof(ret):
494                 return ret;
495         default:
496                 /* Should never happen. */
497                 err(1, "Short read from parent");
498         }
499 }
500
501 /* The fd to poll on */
502 int at_parent_fd(struct at_pool *atp)
503 {
504         if (atp->p->parent_rfd == -1)
505                 errx(1, "This process is not an antithread of this pool");
506
507         return atp->p->parent_rfd;
508 }
509
510 /* FIXME: Futexme. */
511 void at_lock(void *obj)
512 {
513         struct at_pool *atp = talloc_find_parent_bytype(obj, struct at_pool);
514 #if 0
515         unsigned int *l;
516
517         /* This isn't required yet, but ensures it's a talloc ptr */
518         l = talloc_lock_ptr(obj);
519 #endif
520
521         lock(atp->p->fd, (char *)obj - (char *)atp->p->pool);
522
523 #if 0
524         if (*l)
525                 errx(1, "Object %p was already locked (something died?)", obj);
526         *l = 1;
527 #endif
528 }
529
530 void at_unlock(void *obj)
531 {
532         struct at_pool *atp = talloc_find_parent_bytype(obj, struct at_pool);
533 #if 0
534         unsigned int *l;
535
536         l = talloc_lock_ptr(obj);
537         if (!*l)
538                 errx(1, "Object %p was already unlocked", obj);
539         *l = 0;
540 #endif
541         unlock(atp->p->fd, (char *)obj - (char *)atp->p->pool);
542 }
543
544 void at_lock_all(struct at_pool *atp)
545 {
546         lock(atp->p->fd, 0);
547 }
548         
549 void at_unlock_all(struct at_pool *atp)
550 {
551         unlock(atp->p->fd, 0);
552 }