]> git.ozlabs.org Git - ccan/blob - ccan/antithread/antithread.c
tdb2: begin tdb1 to tdb2 porting guide.
[ccan] / ccan / antithread / antithread.c
1 #include <stdlib.h>
2 #include <unistd.h>
3 #include <fcntl.h>
4 #include <stdbool.h>
5 #include <string.h>
6 #include <sys/mman.h>
7 #include <sys/types.h>
8 #include <sys/wait.h>
9 #include <errno.h>
10 #include <assert.h>
11 #include <err.h>
12 #include "antithread.h"
13 #include <ccan/noerr/noerr.h>
14 #include <ccan/talloc/talloc.h>
15 #include <ccan/read_write_all/read_write_all.h>
16 #include <ccan/alloc/alloc.h>
17 #include <ccan/list/list.h>
18
19 /* FIXME: Valgrind support should be possible for some cases.  Tricky
20  * case is where another process allocates for you, but at worst we
21  * could reset what is valid and what isn't on every entry into the
22  * library or something. */
23
24 static LIST_HEAD(pools);
25
26 /* Talloc destroys parents before children (damn Tridge's failing destructors!)
27  * so we need the first child (ie. last-destroyed) to actually clean up. */
28 struct at_pool_contents {
29         struct list_node list;
30         void *pool;
31         unsigned long poolsize;
32         int fd;
33         int parent_rfd, parent_wfd;
34         struct at_pool *atp;
35 };
36
37 struct at_pool {
38         struct at_pool_contents *p;
39         const void *ctx;
40 };
41
42 struct athread {
43         pid_t pid;
44         int rfd, wfd;
45 };
46
47 /* FIXME: Better locking through futexes. */
48 static void lock(int fd, unsigned long off)
49 {
50         struct flock fl;
51
52         fl.l_type = F_WRLCK;
53         fl.l_whence = SEEK_SET;
54         fl.l_start = off;
55         fl.l_len = 1;
56
57         while (fcntl(fd, F_SETLKW, &fl) < 0) {
58                 if (errno != EINTR)
59                         err(1, "Failure locking antithread file");
60         }
61 }
62
63 static void unlock(int fd, unsigned long off)
64 {
65         struct flock fl;
66         int serrno = errno;
67
68         fl.l_type = F_UNLCK;
69         fl.l_whence = SEEK_SET;
70         fl.l_start = off;
71         fl.l_len = 1;
72
73         fcntl(fd, F_SETLK, &fl);
74         errno = serrno;
75 }
76
77 /* This pointer is in a pool.  Find which one. */
78 static struct at_pool_contents *find_pool(const void *ptr)
79 {
80         struct at_pool_contents *p;
81
82         list_for_each(&pools, p, list) {
83                 /* Special case for initial allocation: ptr *is* pool */
84                 if (ptr == p->atp)
85                         return p;
86
87                 if ((char *)ptr >= (char *)p->pool
88                     && (char *)ptr < (char *)p->pool + p->poolsize)
89                         return p;
90         }
91         abort();
92 }
93
94 static int destroy_pool(struct at_pool_contents *p)
95 {
96         list_del(&p->list);
97         munmap(p->pool, p->poolsize);
98         close(p->fd);
99         close(p->parent_rfd);
100         close(p->parent_wfd);
101         return 0;
102 }
103
104 static void *at_realloc(const void *parent, void *ptr, size_t size)
105 {
106         struct at_pool_contents *p = find_pool(parent);
107         /* FIXME: realloc in ccan/alloc? */
108         void *new;
109
110         if (size == 0) {
111                 alloc_free(p->pool, p->poolsize, ptr);
112                 new = NULL;
113         } else if (ptr == NULL) {
114                 /* FIXME: Alignment */
115                 new = alloc_get(p->pool, p->poolsize, size, 16);
116         } else {
117                 if (size <= alloc_size(p->pool, p->poolsize, ptr))
118                         new = ptr;
119                 else {
120                         new = alloc_get(p->pool, p->poolsize, size, 16);
121                         if (new) {
122                                 memcpy(new, ptr,
123                                        alloc_size(p->pool, p->poolsize, ptr));
124                                 alloc_free(p->pool, p->poolsize, ptr);
125                         }
126                 }
127         }
128
129         return new;
130 }
131
132 static struct at_pool_contents *locked;
133 static void talloc_lock(const void *ptr)
134 {
135         struct at_pool_contents *p = find_pool(ptr);
136
137         lock(p->fd, 0);
138         assert(!locked);
139         locked = p;
140 }
141
142 static void talloc_unlock(void)
143 {
144         struct at_pool_contents *p = locked;
145
146         locked = NULL;
147         unlock(p->fd, 0);
148 }
149
150 /* We add 16MB to size.  This compensates for address randomization. */
151 #define PADDING (16 * 1024 * 1024)
152
153 /* Create a new sharable pool. */
154 struct at_pool *at_pool(unsigned long size)
155 {
156         int fd;
157         struct at_pool *atp;
158         struct at_pool_contents *p;
159         FILE *f;
160
161         /* FIXME: How much should we actually add for overhead?. */
162         size += 32 * getpagesize();
163
164         /* Round up to whole pages. */
165         size = (size + getpagesize()-1) & ~(getpagesize()-1);
166
167         f = tmpfile();
168         if (!f)
169                 return NULL;
170
171         fd = dup(fileno(f));
172         fclose_noerr(f);
173
174         if (fd < 0)
175                 return NULL;
176
177         if (ftruncate(fd, size + PADDING) != 0)
178                 goto fail_close;
179
180         atp = talloc(NULL, struct at_pool);
181         if (!atp)
182                 goto fail_close;
183
184         atp->p = p = talloc(NULL, struct at_pool_contents);
185         if (!p)
186                 goto fail_free;
187
188         /* First map gets a nice big area. */
189         p->pool = mmap(NULL, size+PADDING, PROT_READ|PROT_WRITE, MAP_SHARED, fd,
190                        0);
191         if (p->pool == MAP_FAILED)
192                 goto fail_free;
193
194         /* Then we remap into the middle of it. */
195         munmap(p->pool, size+PADDING);
196         p->pool = mmap(p->pool + PADDING/2, size, PROT_READ|PROT_WRITE,
197                        MAP_SHARED, fd, 0);
198         if (p->pool == MAP_FAILED)
199                 goto fail_free;
200
201         p->fd = fd;
202         p->poolsize = size;
203         p->parent_rfd = p->parent_wfd = -1;
204         p->atp = atp;
205         alloc_init(p->pool, p->poolsize);
206         list_add(&pools, &p->list);
207         talloc_set_destructor(p, destroy_pool);
208
209         atp->ctx = talloc_add_external(atp,
210                                        at_realloc, talloc_lock, talloc_unlock);
211         if (!atp->ctx)
212                 goto fail_free;
213         return atp;
214
215 fail_free:
216         talloc_free(atp);
217 fail_close:
218         close_noerr(fd);
219         return NULL;
220 }
221
222 /* Talloc off this to allocate from within the pool. */
223 const void *at_pool_ctx(struct at_pool *atp)
224 {
225         return atp->ctx;
226 }
227
228 static int cant_destroy_self(struct athread *at)
229 {
230         /* Perhaps this means we want to detach, but it doesn't really
231          * make sense. */
232         abort();
233         return 0;
234 }
235
236 static int destroy_at(struct athread *at)
237 {
238         /* If it is already a zombie, this is harmless. */
239         kill(at->pid, SIGTERM);
240
241         close(at->rfd);
242         close(at->wfd);
243
244         /* FIXME: Should we do SIGKILL if process doesn't exit soon? */
245         if (waitpid(at->pid, NULL, 0) != at->pid)
246                 err(1, "Waiting for athread %p (pid %u)", at, at->pid);
247
248         return 0;
249 }
250
251 /* Sets up thread and forks it.  NULL on error. */
252 static struct athread *fork_thread(struct at_pool *atp)
253 {
254         int p2c[2], c2p[2];
255         struct athread *at;
256         struct at_pool_contents *pool = atp->p;
257
258         /* You can't already be a child of this pool. */
259         if (pool->parent_rfd != -1)
260                 errx(1, "Can't create antithread on this pool: we're one");
261
262         /* We don't want this allocated *in* the pool. */
263         at = talloc_steal(atp, talloc(NULL, struct athread));
264
265         if (pipe(p2c) != 0)
266                 goto free;
267
268         if (pipe(c2p) != 0)
269                 goto close_p2c;
270
271         at->pid = fork();
272         if (at->pid == -1)
273                 goto close_c2p;
274
275         if (at->pid == 0) {
276                 /* Child */
277                 close(c2p[0]);
278                 close(p2c[1]);
279                 pool->parent_rfd = p2c[0];
280                 pool->parent_wfd = c2p[1];
281                 talloc_set_destructor(at, cant_destroy_self);
282         } else {
283                 /* Parent */
284                 close(c2p[1]);
285                 close(p2c[0]);
286                 at->rfd = c2p[0];
287                 at->wfd = p2c[1];
288                 talloc_set_destructor(at, destroy_at);
289         }
290
291         return at;
292 close_c2p:
293         close_noerr(c2p[0]);
294         close_noerr(c2p[1]);
295 close_p2c:
296         close_noerr(p2c[0]);
297         close_noerr(p2c[1]);
298 free:
299         talloc_free(at);
300         return NULL;
301 }
302
303 /* Creating an antithread via fork() */
304 struct athread *_at_run(struct at_pool *atp,
305                         void *(*fn)(struct at_pool *, void *),
306                         void *obj)
307 {
308         struct athread *at;
309
310         at = fork_thread(atp);
311         if (!at)
312                 return NULL;
313
314         if (at->pid == 0) {
315                 /* Child */
316                 at_tell_parent(atp, fn(atp, obj));
317                 exit(0);
318         }
319         /* Parent */
320         return at;
321 }
322
323 static unsigned int num_args(char *const argv[])
324 {
325         unsigned int i;
326
327         for (i = 0; argv[i]; i++);
328         return i;
329 }
330
331 /* Fork and execvp, with added arguments for child to grab. */
332 struct athread *at_spawn(struct at_pool *atp, void *arg, char *cmdline[])
333 {
334         struct athread *at;
335         int err;
336
337         at = fork_thread(atp);
338         if (!at)
339                 return NULL;
340
341         if (at->pid == 0) {
342                 /* child */
343                 char *argv[num_args(cmdline) + 2];
344                 argv[0] = cmdline[0];
345                 argv[1] = talloc_asprintf(NULL, "AT:%p/%lu/%i/%i/%i/%p",
346                                           atp->p->pool, atp->p->poolsize,
347                                           atp->p->fd, atp->p->parent_rfd,
348                                           atp->p->parent_wfd, arg);
349                 /* Copy including NULL terminator. */
350                 memcpy(&argv[2], &cmdline[1], num_args(cmdline)*sizeof(char *));
351                 execvp(argv[0], argv);
352
353                 err = errno;
354                 write_all(atp->p->parent_wfd, &err, sizeof(err));
355                 exit(1);
356         }
357
358         /* Child should always write an error code (or 0). */
359         if (read(at->rfd, &err, sizeof(err)) != sizeof(err)) {
360                 errno = ECHILD;
361                 talloc_free(at);
362                 return NULL;
363         }
364         if (err != 0) {
365                 errno = err;
366                 talloc_free(at);
367                 return NULL;
368         }
369         return at;
370 }
371
372 /* The fd to poll on */
373 int at_fd(struct athread *at)
374 {
375         return at->rfd;
376 }
377
378 /* What's the antithread saying?  Blocks if fd not ready. */
379 void *at_read(struct athread *at)
380 {
381         void *ret;
382
383         switch (read(at->rfd, &ret, sizeof(ret))) {
384         case -1:
385                 err(1, "Reading from athread %p (pid %u)", at, at->pid);
386         case 0:
387                 /* Thread died. */
388                 return NULL;
389         case sizeof(ret):
390                 return ret;
391         default:
392                 /* Should never happen. */
393                 err(1, "Short read from athread %p (pid %u)", at, at->pid);
394         }
395 }
396
397 /* Say something to a child. */
398 void at_tell(struct athread *at, const void *status)
399 {
400         if (write(at->wfd, &status, sizeof(status)) != sizeof(status))
401                 err(1, "Failure writing to athread %p (pid %u)", at, at->pid);
402 }
403
404 /* For child to grab arguments from command line (removes them) */
405 struct at_pool *at_get_pool(int *argc, char *argv[], void **arg)
406 {
407         struct at_pool *atp = talloc(NULL, struct at_pool);
408         struct at_pool_contents *p;
409         void *map;
410         int err;
411
412         if (!argv[1]) {
413                 errno = EINVAL;
414                 goto fail;
415         }
416
417         /* If they don't care, use dummy value. */
418         if (arg == NULL)
419                 arg = &map;
420
421         p = atp->p = talloc(atp, struct at_pool_contents);
422
423         if (sscanf(argv[1], "AT:%p/%lu/%i/%i/%i/%p", 
424                    &p->pool, &p->poolsize, &p->fd,
425                    &p->parent_rfd, &p->parent_wfd, arg) != 6) {
426                 errno = EINVAL;
427                 goto fail;
428         }
429
430         /* FIXME: To try to adjust for address space randomization, we
431          * could re-exec a few times. */
432         map = mmap(p->pool, p->poolsize, PROT_READ|PROT_WRITE, MAP_SHARED,
433                    p->fd, 0);
434         if (map != p->pool) {
435                 fprintf(stderr, "Mapping %lu bytes @%p gave %p\n",
436                         p->poolsize, p->pool, map);
437                 errno = ENOMEM;
438                 goto fail;
439         }
440
441         list_add(&pools, &p->list);
442         talloc_set_destructor(p, destroy_pool);
443         p->atp = atp;
444
445         atp->ctx = talloc_add_external(atp,
446                                        at_realloc, talloc_lock, talloc_unlock);
447         if (!atp->ctx)
448                 goto fail;
449
450         /* Tell parent we're good. */
451         err = 0;
452         if (write(p->parent_wfd, &err, sizeof(err)) != sizeof(err)) {
453                 errno = EBADF;
454                 goto fail;
455         }
456
457         /* Delete AT arg. */
458         memmove(&argv[1], &argv[2], --(*argc));
459
460         return atp;
461
462 fail:
463         talloc_free(atp);
464         return NULL;
465 }
466
467 /* Say something to our parent (async). */
468 void at_tell_parent(struct at_pool *atp, const void *status)
469 {
470         if (atp->p->parent_wfd == -1)
471                 errx(1, "This process is not an antithread of this pool");
472
473         if (write(atp->p->parent_wfd, &status, sizeof(status))!=sizeof(status))
474                 err(1, "Failure writing to parent");
475 }
476
477 /* What's the parent saying?  Blocks if fd not ready. */
478 void *at_read_parent(struct at_pool *atp)
479 {
480         void *ret;
481
482         if (atp->p->parent_rfd == -1)
483                 errx(1, "This process is not an antithread of this pool");
484
485         switch (read(atp->p->parent_rfd, &ret, sizeof(ret))) {
486         case -1:
487                 err(1, "Reading from parent");
488         case 0:
489                 /* Parent died. */
490                 return NULL;
491         case sizeof(ret):
492                 return ret;
493         default:
494                 /* Should never happen. */
495                 err(1, "Short read from parent");
496         }
497 }
498
499 /* The fd to poll on */
500 int at_parent_fd(struct at_pool *atp)
501 {
502         if (atp->p->parent_rfd == -1)
503                 errx(1, "This process is not an antithread of this pool");
504
505         return atp->p->parent_rfd;
506 }
507
508 /* FIXME: Futexme. */
509 void at_lock(void *obj)
510 {
511         struct at_pool *atp = talloc_find_parent_bytype(obj, struct at_pool);
512 #if 0
513         unsigned int *l;
514
515         /* This isn't required yet, but ensures it's a talloc ptr */
516         l = talloc_lock_ptr(obj);
517 #endif
518
519         lock(atp->p->fd, (char *)obj - (char *)atp->p->pool);
520
521 #if 0
522         if (*l)
523                 errx(1, "Object %p was already locked (something died?)", obj);
524         *l = 1;
525 #endif
526 }
527
528 void at_unlock(void *obj)
529 {
530         struct at_pool *atp = talloc_find_parent_bytype(obj, struct at_pool);
531 #if 0
532         unsigned int *l;
533
534         l = talloc_lock_ptr(obj);
535         if (!*l)
536                 errx(1, "Object %p was already unlocked", obj);
537         *l = 0;
538 #endif
539         unlock(atp->p->fd, (char *)obj - (char *)atp->p->pool);
540 }
541
542 void at_lock_all(struct at_pool *atp)
543 {
544         lock(atp->p->fd, 0);
545 }
546         
547 void at_unlock_all(struct at_pool *atp)
548 {
549         unlock(atp->p->fd, 0);
550 }