]> git.ozlabs.org Git - ccan/blob - ccan/antithread/antithread.c
To compile ccan under DragonFly BSD some additional includes are required.
[ccan] / ccan / antithread / antithread.c
1 #include <stdlib.h>
2 #include <unistd.h>
3 #include <fcntl.h>
4 #include <stdbool.h>
5 #include <string.h>
6 #include <sys/mman.h>
7 #include <sys/types.h>
8 #include <sys/wait.h>
9 #include <signal.h>
10 #include <errno.h>
11 #include <assert.h>
12 #include <err.h>
13 #include "antithread.h"
14 #include <ccan/noerr/noerr.h>
15 #include <ccan/talloc/talloc.h>
16 #include <ccan/read_write_all/read_write_all.h>
17 #include <ccan/alloc/alloc.h>
18 #include <ccan/list/list.h>
19
20 /* FIXME: Valgrind support should be possible for some cases.  Tricky
21  * case is where another process allocates for you, but at worst we
22  * could reset what is valid and what isn't on every entry into the
23  * library or something. */
24
25 static LIST_HEAD(pools);
26
27 /* Talloc destroys parents before children (damn Tridge's failing destructors!)
28  * so we need the first child (ie. last-destroyed) to actually clean up. */
29 struct at_pool_contents {
30         struct list_node list;
31         void *pool;
32         unsigned long poolsize;
33         int fd;
34         int parent_rfd, parent_wfd;
35         struct at_pool *atp;
36 };
37
38 struct at_pool {
39         struct at_pool_contents *p;
40         const void *ctx;
41 };
42
43 struct athread {
44         pid_t pid;
45         int rfd, wfd;
46 };
47
48 /* FIXME: Better locking through futexes. */
49 static void lock(int fd, unsigned long off)
50 {
51         struct flock fl;
52
53         fl.l_type = F_WRLCK;
54         fl.l_whence = SEEK_SET;
55         fl.l_start = off;
56         fl.l_len = 1;
57
58         while (fcntl(fd, F_SETLKW, &fl) < 0) {
59                 if (errno != EINTR)
60                         err(1, "Failure locking antithread file");
61         }
62 }
63
64 static void unlock(int fd, unsigned long off)
65 {
66         struct flock fl;
67         int serrno = errno;
68
69         fl.l_type = F_UNLCK;
70         fl.l_whence = SEEK_SET;
71         fl.l_start = off;
72         fl.l_len = 1;
73
74         fcntl(fd, F_SETLK, &fl);
75         errno = serrno;
76 }
77
78 /* This pointer is in a pool.  Find which one. */
79 static struct at_pool_contents *find_pool(const void *ptr)
80 {
81         struct at_pool_contents *p;
82
83         list_for_each(&pools, p, list) {
84                 /* Special case for initial allocation: ptr *is* pool */
85                 if (ptr == p->atp)
86                         return p;
87
88                 if ((char *)ptr >= (char *)p->pool
89                     && (char *)ptr < (char *)p->pool + p->poolsize)
90                         return p;
91         }
92         abort();
93 }
94
95 static int destroy_pool(struct at_pool_contents *p)
96 {
97         list_del(&p->list);
98         munmap(p->pool, p->poolsize);
99         close(p->fd);
100         close(p->parent_rfd);
101         close(p->parent_wfd);
102         return 0;
103 }
104
105 static void *at_realloc(const void *parent, void *ptr, size_t size)
106 {
107         struct at_pool_contents *p = find_pool(parent);
108         /* FIXME: realloc in ccan/alloc? */
109         void *new;
110
111         if (size == 0) {
112                 alloc_free(p->pool, p->poolsize, ptr);
113                 new = NULL;
114         } else if (ptr == NULL) {
115                 /* FIXME: Alignment */
116                 new = alloc_get(p->pool, p->poolsize, size, 16);
117         } else {
118                 if (size <= alloc_size(p->pool, p->poolsize, ptr))
119                         new = ptr;
120                 else {
121                         new = alloc_get(p->pool, p->poolsize, size, 16);
122                         if (new) {
123                                 memcpy(new, ptr,
124                                        alloc_size(p->pool, p->poolsize, ptr));
125                                 alloc_free(p->pool, p->poolsize, ptr);
126                         }
127                 }
128         }
129
130         return new;
131 }
132
133 static struct at_pool_contents *locked;
134 static void talloc_lock(const void *ptr)
135 {
136         struct at_pool_contents *p = find_pool(ptr);
137
138         lock(p->fd, 0);
139         assert(!locked);
140         locked = p;
141 }
142
143 static void talloc_unlock(void)
144 {
145         struct at_pool_contents *p = locked;
146
147         locked = NULL;
148         unlock(p->fd, 0);
149 }
150
151 /* We add 16MB to size.  This compensates for address randomization. */
152 #define PADDING (16 * 1024 * 1024)
153
154 /* Create a new sharable pool. */
155 struct at_pool *at_pool(unsigned long size)
156 {
157         int fd;
158         struct at_pool *atp;
159         struct at_pool_contents *p;
160         FILE *f;
161
162         /* FIXME: How much should we actually add for overhead?. */
163         size += 32 * getpagesize();
164
165         /* Round up to whole pages. */
166         size = (size + getpagesize()-1) & ~(getpagesize()-1);
167
168         f = tmpfile();
169         if (!f)
170                 return NULL;
171
172         fd = dup(fileno(f));
173         fclose_noerr(f);
174
175         if (fd < 0)
176                 return NULL;
177
178         if (ftruncate(fd, size + PADDING) != 0)
179                 goto fail_close;
180
181         atp = talloc(NULL, struct at_pool);
182         if (!atp)
183                 goto fail_close;
184
185         atp->p = p = talloc(NULL, struct at_pool_contents);
186         if (!p)
187                 goto fail_free;
188
189         /* First map gets a nice big area. */
190         p->pool = mmap(NULL, size+PADDING, PROT_READ|PROT_WRITE, MAP_SHARED, fd,
191                        0);
192         if (p->pool == MAP_FAILED)
193                 goto fail_free;
194
195         /* Then we remap into the middle of it. */
196         munmap(p->pool, size+PADDING);
197         p->pool = mmap((char *)p->pool + PADDING/2, size, PROT_READ|PROT_WRITE,
198                        MAP_SHARED, fd, 0);
199         if (p->pool == MAP_FAILED)
200                 goto fail_free;
201
202         p->fd = fd;
203         p->poolsize = size;
204         p->parent_rfd = p->parent_wfd = -1;
205         p->atp = atp;
206         alloc_init(p->pool, p->poolsize);
207         list_add(&pools, &p->list);
208         talloc_set_destructor(p, destroy_pool);
209
210         atp->ctx = talloc_add_external(atp,
211                                        at_realloc, talloc_lock, talloc_unlock);
212         if (!atp->ctx)
213                 goto fail_free;
214         return atp;
215
216 fail_free:
217         talloc_free(atp);
218 fail_close:
219         close_noerr(fd);
220         return NULL;
221 }
222
223 /* Talloc off this to allocate from within the pool. */
224 const void *at_pool_ctx(struct at_pool *atp)
225 {
226         return atp->ctx;
227 }
228
229 static int cant_destroy_self(struct athread *at)
230 {
231         /* Perhaps this means we want to detach, but it doesn't really
232          * make sense. */
233         abort();
234         return 0;
235 }
236
237 static int destroy_at(struct athread *at)
238 {
239         /* If it is already a zombie, this is harmless. */
240         kill(at->pid, SIGTERM);
241
242         close(at->rfd);
243         close(at->wfd);
244
245         /* FIXME: Should we do SIGKILL if process doesn't exit soon? */
246         if (waitpid(at->pid, NULL, 0) != at->pid)
247                 err(1, "Waiting for athread %p (pid %u)", at, at->pid);
248
249         return 0;
250 }
251
252 /* Sets up thread and forks it.  NULL on error. */
253 static struct athread *fork_thread(struct at_pool *atp)
254 {
255         int p2c[2], c2p[2];
256         struct athread *at;
257         struct at_pool_contents *pool = atp->p;
258
259         /* You can't already be a child of this pool. */
260         if (pool->parent_rfd != -1)
261                 errx(1, "Can't create antithread on this pool: we're one");
262
263         /* We don't want this allocated *in* the pool. */
264         at = talloc_steal(atp, talloc(NULL, struct athread));
265
266         if (pipe(p2c) != 0)
267                 goto free;
268
269         if (pipe(c2p) != 0)
270                 goto close_p2c;
271
272         at->pid = fork();
273         if (at->pid == -1)
274                 goto close_c2p;
275
276         if (at->pid == 0) {
277                 /* Child */
278                 close(c2p[0]);
279                 close(p2c[1]);
280                 pool->parent_rfd = p2c[0];
281                 pool->parent_wfd = c2p[1];
282                 talloc_set_destructor(at, cant_destroy_self);
283         } else {
284                 /* Parent */
285                 close(c2p[1]);
286                 close(p2c[0]);
287                 at->rfd = c2p[0];
288                 at->wfd = p2c[1];
289                 talloc_set_destructor(at, destroy_at);
290         }
291
292         return at;
293 close_c2p:
294         close_noerr(c2p[0]);
295         close_noerr(c2p[1]);
296 close_p2c:
297         close_noerr(p2c[0]);
298         close_noerr(p2c[1]);
299 free:
300         talloc_free(at);
301         return NULL;
302 }
303
304 /* Creating an antithread via fork() */
305 struct athread *_at_run(struct at_pool *atp,
306                         void *(*fn)(struct at_pool *, void *),
307                         void *obj)
308 {
309         struct athread *at;
310
311         at = fork_thread(atp);
312         if (!at)
313                 return NULL;
314
315         if (at->pid == 0) {
316                 /* Child */
317                 at_tell_parent(atp, fn(atp, obj));
318                 exit(0);
319         }
320         /* Parent */
321         return at;
322 }
323
324 static unsigned int num_args(char *const argv[])
325 {
326         unsigned int i;
327
328         for (i = 0; argv[i]; i++);
329         return i;
330 }
331
332 /* Fork and execvp, with added arguments for child to grab. */
333 struct athread *at_spawn(struct at_pool *atp, void *arg, char *cmdline[])
334 {
335         struct athread *at;
336         int err;
337
338         at = fork_thread(atp);
339         if (!at)
340                 return NULL;
341
342         if (at->pid == 0) {
343                 /* child */
344                 char *argv[num_args(cmdline) + 2];
345                 argv[0] = cmdline[0];
346                 argv[1] = talloc_asprintf(NULL, "AT:%p/%lu/%i/%i/%i/%p",
347                                           atp->p->pool, atp->p->poolsize,
348                                           atp->p->fd, atp->p->parent_rfd,
349                                           atp->p->parent_wfd, arg);
350                 /* Copy including NULL terminator. */
351                 memcpy(&argv[2], &cmdline[1], num_args(cmdline)*sizeof(char *));
352                 execvp(argv[0], argv);
353
354                 err = errno;
355                 write_all(atp->p->parent_wfd, &err, sizeof(err));
356                 exit(1);
357         }
358
359         /* Child should always write an error code (or 0). */
360         if (read(at->rfd, &err, sizeof(err)) != sizeof(err)) {
361                 errno = ECHILD;
362                 talloc_free(at);
363                 return NULL;
364         }
365         if (err != 0) {
366                 errno = err;
367                 talloc_free(at);
368                 return NULL;
369         }
370         return at;
371 }
372
373 /* The fd to poll on */
374 int at_fd(struct athread *at)
375 {
376         return at->rfd;
377 }
378
379 /* What's the antithread saying?  Blocks if fd not ready. */
380 void *at_read(struct athread *at)
381 {
382         void *ret;
383
384         switch (read(at->rfd, &ret, sizeof(ret))) {
385         case -1:
386                 err(1, "Reading from athread %p (pid %u)", at, at->pid);
387         case 0:
388                 /* Thread died. */
389                 return NULL;
390         case sizeof(ret):
391                 return ret;
392         default:
393                 /* Should never happen. */
394                 err(1, "Short read from athread %p (pid %u)", at, at->pid);
395         }
396 }
397
398 /* Say something to a child. */
399 void at_tell(struct athread *at, const void *status)
400 {
401         if (write(at->wfd, &status, sizeof(status)) != sizeof(status))
402                 err(1, "Failure writing to athread %p (pid %u)", at, at->pid);
403 }
404
405 /* For child to grab arguments from command line (removes them) */
406 struct at_pool *at_get_pool(int *argc, char *argv[], void **arg)
407 {
408         struct at_pool *atp = talloc(NULL, struct at_pool);
409         struct at_pool_contents *p;
410         void *map;
411         int err;
412
413         if (!argv[1]) {
414                 errno = EINVAL;
415                 goto fail;
416         }
417
418         /* If they don't care, use dummy value. */
419         if (arg == NULL)
420                 arg = &map;
421
422         p = atp->p = talloc(atp, struct at_pool_contents);
423
424         if (sscanf(argv[1], "AT:%p/%lu/%i/%i/%i/%p",
425                    &p->pool, &p->poolsize, &p->fd,
426                    &p->parent_rfd, &p->parent_wfd, arg) != 6) {
427                 errno = EINVAL;
428                 goto fail;
429         }
430
431         /* FIXME: To try to adjust for address space randomization, we
432          * could re-exec a few times. */
433         map = mmap(p->pool, p->poolsize, PROT_READ|PROT_WRITE, MAP_SHARED,
434                    p->fd, 0);
435         if (map != p->pool) {
436                 fprintf(stderr, "Mapping %lu bytes @%p gave %p\n",
437                         p->poolsize, p->pool, map);
438                 errno = ENOMEM;
439                 goto fail;
440         }
441
442         list_add(&pools, &p->list);
443         talloc_set_destructor(p, destroy_pool);
444         p->atp = atp;
445
446         atp->ctx = talloc_add_external(atp,
447                                        at_realloc, talloc_lock, talloc_unlock);
448         if (!atp->ctx)
449                 goto fail;
450
451         /* Tell parent we're good. */
452         err = 0;
453         if (write(p->parent_wfd, &err, sizeof(err)) != sizeof(err)) {
454                 errno = EBADF;
455                 goto fail;
456         }
457
458         /* Delete AT arg. */
459         memmove(&argv[1], &argv[2], --(*argc));
460
461         return atp;
462
463 fail:
464         talloc_free(atp);
465         return NULL;
466 }
467
468 /* Say something to our parent (async). */
469 void at_tell_parent(struct at_pool *atp, const void *status)
470 {
471         if (atp->p->parent_wfd == -1)
472                 errx(1, "This process is not an antithread of this pool");
473
474         if (write(atp->p->parent_wfd, &status, sizeof(status))!=sizeof(status))
475                 err(1, "Failure writing to parent");
476 }
477
478 /* What's the parent saying?  Blocks if fd not ready. */
479 void *at_read_parent(struct at_pool *atp)
480 {
481         void *ret;
482
483         if (atp->p->parent_rfd == -1)
484                 errx(1, "This process is not an antithread of this pool");
485
486         switch (read(atp->p->parent_rfd, &ret, sizeof(ret))) {
487         case -1:
488                 err(1, "Reading from parent");
489         case 0:
490                 /* Parent died. */
491                 return NULL;
492         case sizeof(ret):
493                 return ret;
494         default:
495                 /* Should never happen. */
496                 err(1, "Short read from parent");
497         }
498 }
499
500 /* The fd to poll on */
501 int at_parent_fd(struct at_pool *atp)
502 {
503         if (atp->p->parent_rfd == -1)
504                 errx(1, "This process is not an antithread of this pool");
505
506         return atp->p->parent_rfd;
507 }
508
509 /* FIXME: Futexme. */
510 void at_lock(void *obj)
511 {
512         struct at_pool *atp = talloc_find_parent_bytype(obj, struct at_pool);
513 #if 0
514         unsigned int *l;
515
516         /* This isn't required yet, but ensures it's a talloc ptr */
517         l = talloc_lock_ptr(obj);
518 #endif
519
520         lock(atp->p->fd, (char *)obj - (char *)atp->p->pool);
521
522 #if 0
523         if (*l)
524                 errx(1, "Object %p was already locked (something died?)", obj);
525         *l = 1;
526 #endif
527 }
528
529 void at_unlock(void *obj)
530 {
531         struct at_pool *atp = talloc_find_parent_bytype(obj, struct at_pool);
532 #if 0
533         unsigned int *l;
534
535         l = talloc_lock_ptr(obj);
536         if (!*l)
537                 errx(1, "Object %p was already unlocked", obj);
538         *l = 0;
539 #endif
540         unlock(atp->p->fd, (char *)obj - (char *)atp->p->pool);
541 }
542
543 void at_lock_all(struct at_pool *atp)
544 {
545         lock(atp->p->fd, 0);
546 }
547         
548 void at_unlock_all(struct at_pool *atp)
549 {
550         unlock(atp->p->fd, 0);
551 }