]> git.ozlabs.org Git - ccan/blob - ccan/tdb/tools/replay_trace.c
d58125b355e0ebe1a3c5f959bbe5aed95c0563f7
[ccan] / ccan / tdb / tools / replay_trace.c
1 #include <ccan/tdb/tdb.h>
2 #include <ccan/grab_file/grab_file.h>
3 #include <ccan/hash/hash.h>
4 #include <ccan/talloc/talloc.h>
5 #include <ccan/str_talloc/str_talloc.h>
6 #include <ccan/str/str.h>
7 #include <ccan/list/list.h>
8 #include <err.h>
9 #include <ctype.h>
10 #include <string.h>
11 #include <unistd.h>
12 #include <sys/types.h>
13 #include <sys/wait.h>
14 #include <sys/time.h>
15 #include <errno.h>
16 #include <signal.h>
17 #include <assert.h>
18
19 #define STRINGIFY2(x) #x
20 #define STRINGIFY(x) STRINGIFY2(x)
21
22 /* Avoid mod by zero */
23 static unsigned int total_keys = 1;
24
25 /* #define DEBUG_DEPS 1 */
26
27 /* Traversals block transactions in the current implementation. */
28 #define TRAVERSALS_TAKE_TRANSACTION_LOCK 1
29
30 struct pipe {
31         int fd[2];
32 };
33 static struct pipe *pipes;
34
35 static void __attribute__((noreturn)) fail(const char *filename,
36                                            unsigned int line,
37                                            const char *fmt, ...)
38 {
39         va_list ap;
40
41         va_start(ap, fmt);
42         fprintf(stderr, "%s:%u: FAIL: ", filename, line);
43         vfprintf(stderr, fmt, ap);
44         fprintf(stderr, "\n");
45         va_end(ap);
46         exit(1);
47 }
48         
49 /* Try or die. */
50 #define try(expr, expect)                                               \
51         do {                                                            \
52                 int ret = (expr);                                       \
53                 if (ret != (expect))                                    \
54                         fail(filename[file], i+1,                       \
55                              STRINGIFY(expr) "= %i", ret);              \
56         } while (0)
57
58 /* Try or imitate results. */
59 #define unreliable(expr, expect, force, undo)                           \
60         do {                                                            \
61                 int ret = expr;                                         \
62                 if (ret != expect) {                                    \
63                         fprintf(stderr, "%s:%u: %s gave %i not %i",     \
64                                 filename[file], i+1, STRINGIFY(expr),   \
65                                 ret, expect);                           \
66                         if (expect == 0)                                \
67                                 force;                                  \
68                         else                                            \
69                                 undo;                                   \
70                 }                                                       \
71         } while (0)
72
73 static bool key_eq(TDB_DATA a, TDB_DATA b)
74 {
75         if (a.dsize != b.dsize)
76                 return false;
77         return memcmp(a.dptr, b.dptr, a.dsize) == 0;
78 }
79
80 /* This is based on the hash algorithm from gdbm */
81 static unsigned int hash_key(TDB_DATA *key)
82 {
83         uint32_t value; /* Used to compute the hash value.  */
84         uint32_t   i;   /* Used to cycle through random values. */
85
86         /* Set the initial value from the key size. */
87         for (value = 0x238F13AF ^ key->dsize, i=0; i < key->dsize; i++)
88                 value = (value + (key->dptr[i] << (i*5 % 24)));
89
90         return (1103515243 * value + 12345);  
91 }
92
93 enum op_type {
94         OP_TDB_LOCKALL,
95         OP_TDB_LOCKALL_MARK,
96         OP_TDB_LOCKALL_UNMARK,
97         OP_TDB_LOCKALL_NONBLOCK,
98         OP_TDB_UNLOCKALL,
99         OP_TDB_LOCKALL_READ,
100         OP_TDB_LOCKALL_READ_NONBLOCK,
101         OP_TDB_UNLOCKALL_READ,
102         OP_TDB_CHAINLOCK,
103         OP_TDB_CHAINLOCK_NONBLOCK,
104         OP_TDB_CHAINLOCK_MARK,
105         OP_TDB_CHAINLOCK_UNMARK,
106         OP_TDB_CHAINUNLOCK,
107         OP_TDB_CHAINLOCK_READ,
108         OP_TDB_CHAINUNLOCK_READ,
109         OP_TDB_PARSE_RECORD,
110         OP_TDB_EXISTS,
111         OP_TDB_STORE,
112         OP_TDB_APPEND,
113         OP_TDB_GET_SEQNUM,
114         OP_TDB_WIPE_ALL,
115         OP_TDB_TRANSACTION_START,
116         OP_TDB_TRANSACTION_CANCEL,
117         OP_TDB_TRANSACTION_COMMIT,
118         OP_TDB_TRAVERSE_READ_START,
119         OP_TDB_TRAVERSE_START,
120         OP_TDB_TRAVERSE_END,
121         OP_TDB_TRAVERSE,
122         OP_TDB_FIRSTKEY,
123         OP_TDB_NEXTKEY,
124         OP_TDB_FETCH,
125         OP_TDB_DELETE,
126 };
127
128 struct op {
129         unsigned int serial;
130         enum op_type op;
131         TDB_DATA key;
132         TDB_DATA data;
133         int ret;
134
135         /* Who is waiting for us? */
136         struct list_head post;
137         /* What are we waiting for? */
138         struct list_head pre;
139
140         /* If I'm part of a group (traverse/transaction) where is
141          * start?  (Otherwise, 0) */
142         unsigned int group_start;
143
144         union {
145                 int flag; /* open and store */
146                 struct {  /* append */
147                         TDB_DATA pre;
148                         TDB_DATA post;
149                 } append;
150                 unsigned int group_len; /* transaction/traverse start */
151         };
152 };
153
154 static unsigned char hex_char(const char *filename, unsigned int line, char c)
155 {
156         c = toupper(c);
157         if (c >= 'A' && c <= 'F')
158                 return c - 'A' + 10;
159         if (c >= '0' && c <= '9')
160                 return c - '0';
161         fail(filename, line, "invalid hex character '%c'", c);
162 }
163
164 /* TDB data is <size>:<%02x>* */
165 static TDB_DATA make_tdb_data(const void *ctx,
166                               const char *filename, unsigned int line,
167                               const char *word)
168 {
169         TDB_DATA data;
170         unsigned int i;
171         const char *p;
172
173         if (streq(word, "NULL"))
174                 return tdb_null;
175
176         data.dsize = atoi(word);
177         data.dptr = talloc_array(ctx, unsigned char, data.dsize);
178         p = strchr(word, ':');
179         if (!p)
180                 fail(filename, line, "invalid tdb data '%s'", word);
181         p++;
182         for (i = 0; i < data.dsize; i++)
183                 data.dptr[i] = hex_char(filename, line, p[i*2])*16
184                         + hex_char(filename, line, p[i*2+1]);
185
186         return data;
187 }
188
189 static void add_op(const char *filename, struct op **op, unsigned int i,
190                    unsigned int serial, enum op_type type)
191 {
192         struct op *new;
193         *op = talloc_realloc(NULL, *op, struct op, i+1);
194         new = (*op) + i;
195         new->op = type;
196         new->serial = serial;
197         new->ret = 0;
198         new->group_start = 0;
199 }
200
201 static void op_add_nothing(const char *filename,
202                            struct op op[], unsigned int op_num, char *words[])
203 {
204         if (words[2])
205                 fail(filename, op_num+1, "Expected no arguments");
206         op[op_num].key = tdb_null;
207 }
208
209 static void op_add_key(const char *filename,
210                        struct op op[], unsigned int op_num, char *words[])
211 {
212         if (words[2] == NULL || words[3])
213                 fail(filename, op_num+1, "Expected just a key");
214
215         op[op_num].key = make_tdb_data(op, filename, op_num+1, words[2]);
216         if (op[op_num].op != OP_TDB_TRAVERSE)
217                 total_keys++;
218 }
219
220 static void op_add_key_ret(const char *filename,
221                            struct op op[], unsigned int op_num, char *words[])
222 {
223         if (!words[2] || !words[3] || !words[4] || words[5]
224             || !streq(words[3], "="))
225                 fail(filename, op_num+1, "Expected <key> = <ret>");
226         op[op_num].ret = atoi(words[4]);
227         op[op_num].key = make_tdb_data(op, filename, op_num+1, words[2]);
228         /* May only be a unique key if it fails */
229         if (op[op_num].ret != 0)
230                 total_keys++;
231 }
232
233 static void op_add_key_data(const char *filename,
234                             struct op op[], unsigned int op_num, char *words[])
235 {
236         if (!words[2] || !words[3] || !words[4] || words[5]
237             || !streq(words[3], "="))
238                 fail(filename, op_num+1, "Expected <key> = <data>");
239         op[op_num].key = make_tdb_data(op, filename, op_num+1, words[2]);
240         op[op_num].data = make_tdb_data(op, filename, op_num+1, words[4]);
241         /* May only be a unique key if it fails */
242         if (!op[op_num].data.dptr)
243                 total_keys++;
244 }
245
246 /* <serial> tdb_store <rec> <rec> <flag> = <ret> */
247 static void op_add_store(const char *filename,
248                          struct op op[], unsigned int op_num, char *words[])
249 {
250         if (!words[2] || !words[3] || !words[4] || !words[5] || !words[6]
251             || words[7] || !streq(words[5], "="))
252                 fail(filename, op_num+1, "Expect <key> <data> <flag> = <ret>");
253
254         op[op_num].flag = strtoul(words[4], NULL, 0);
255         op[op_num].ret = atoi(words[6]);
256         op[op_num].key = make_tdb_data(op, filename, op_num+1, words[2]);
257         op[op_num].data = make_tdb_data(op, filename, op_num+1, words[3]);
258         total_keys++;
259 }
260
261 /* <serial> tdb_append <rec> <rec> = <rec> */
262 static void op_add_append(const char *filename,
263                           struct op op[], unsigned int op_num, char *words[])
264 {
265         if (!words[2] || !words[3] || !words[4] || !words[5] || words[6]
266             || !streq(words[4], "="))
267                 fail(filename, op_num+1, "Expect <key> <data> = <rec>");
268
269         op[op_num].key = make_tdb_data(op, filename, op_num+1, words[2]);
270         op[op_num].data = make_tdb_data(op, filename, op_num+1, words[3]);
271
272         op[op_num].append.post
273                 = make_tdb_data(op, filename, op_num+1, words[5]);
274
275         /* By subtraction, figure out what previous data was. */
276         op[op_num].append.pre.dptr = op[op_num].append.post.dptr;
277         op[op_num].append.pre.dsize
278                 = op[op_num].append.post.dsize - op[op_num].data.dsize;
279         total_keys++;
280 }
281
282 /* <serial> tdb_get_seqnum = <ret> */
283 static void op_add_seqnum(const char *filename,
284                           struct op op[], unsigned int op_num, char *words[])
285 {
286         if (!words[2] || !words[3] || words[4] || !streq(words[2], "="))
287                 fail(filename, op_num+1, "Expect = <ret>");
288
289         op[op_num].key = tdb_null;
290         op[op_num].ret = atoi(words[3]);
291 }
292
293 static void op_add_traverse(const char *filename,
294                             struct op op[], unsigned int op_num, char *words[])
295 {
296         if (words[2])
297                 fail(filename, op_num+1, "Expect no arguments");
298
299         op[op_num].key = tdb_null;
300         op[op_num].group_len = 0;
301 }
302
303 static void op_add_transaction(const char *filename, struct op op[],
304                                unsigned int op_num, char *words[])
305 {
306         if (words[2])
307                 fail(filename, op_num+1, "Expect no arguments");
308
309         op[op_num].key = tdb_null;
310         op[op_num].group_len = 0;
311 }
312
313 static void op_analyze_transaction(const char *filename,
314                                    struct op op[], unsigned int op_num,
315                                    char *words[])
316 {
317         int i, start;
318
319         op[op_num].key = tdb_null;
320
321         if (words[2])
322                 fail(filename, op_num+1, "Expect no arguments");
323
324         for (i = op_num-1; i >= 0; i--) {
325                 if (op[i].op == OP_TDB_TRANSACTION_START && !op[i].group_len)
326                         break;
327         }
328
329         if (i < 0)
330                 fail(filename, op_num+1, "no transaction start found");
331
332         start = i;
333         op[start].group_len = op_num - i;
334
335         /* This rolls in nested transactions.  I think that's right. */
336         for (i++; i <= op_num; i++)
337                 op[i].group_start = start;
338 }
339
340 struct traverse_hash {
341         TDB_DATA key;
342         unsigned int index;
343 };
344
345 static void op_analyze_traverse(const char *filename,
346                                 struct op op[], unsigned int op_num,
347                                 char *words[])
348 {
349         int i, start;
350
351         op[op_num].key = tdb_null;
352
353         /* = %u means traverse function terminated. */
354         if (words[2]) {
355                 if (!streq(words[2], "=") || !words[3] || words[4])
356                         fail(filename, op_num+1, "expect = <num>");
357                 op[op_num].ret = atoi(words[3]);
358         } else
359                 op[op_num].ret = 0;
360
361         for (i = op_num-1; i >= 0; i--) {
362                 if (op[i].op != OP_TDB_TRAVERSE_READ_START
363                     && op[i].op != OP_TDB_TRAVERSE_START)
364                         continue;
365                 if (op[i].group_len)
366                         continue;
367                 break;
368         }
369
370         if (i < 0)
371                 fail(filename, op_num+1, "no traversal start found");
372
373         start = i;
374         op[start].group_len = op_num - start;
375
376         for (i = start; i <= op_num; i++)
377                 op[i].group_start = start;
378 }
379
380 /* Keep -Wmissing-declarations happy: */
381 const struct op_table *
382 find_keyword (register const char *str, register unsigned int len);
383
384 #include "keywords.c"
385
386 struct depend {
387         /* We can have more than one */
388         struct list_node pre_list;
389         struct list_node post_list;
390         unsigned int needs_file;
391         unsigned int needs_opnum;
392         unsigned int satisfies_file;
393         unsigned int satisfies_opnum;
394 };
395
396 static void check_deps(const char *filename, struct op op[], unsigned int num)
397 {
398 #ifdef DEBUG_DEPS
399         unsigned int i;
400
401         for (i = 1; i < num; i++)
402                 if (!list_empty(&op[i].pre))
403                         fail(filename, i+1, "Still has dependencies");
404 #endif
405 }
406
407 static void dump_pre(char *filename[], unsigned int file,
408                      struct op op[], unsigned int i)
409 {
410         struct depend *dep;
411
412         printf("%s:%u still waiting for:\n", filename[file], i+1);
413         list_for_each(&op[i].pre, dep, pre_list)
414                 printf("    %s:%u\n",
415                        filename[dep->satisfies_file], dep->satisfies_opnum+1);
416         check_deps(filename[file], op, i);
417 }
418
419 /* We simply read/write pointers, since we all are children. */
420 static void do_pre(char *filename[], unsigned int file, int pre_fd,
421                    struct op op[], unsigned int i)
422 {
423         while (!list_empty(&op[i].pre)) {
424                 struct depend *dep;
425
426 #if DEBUG_DEPS
427                 printf("%s:%u:waiting for pre\n", filename[file], i+1);
428                 fflush(stdout);
429 #endif
430                 alarm(10);
431                 while (read(pre_fd, &dep, sizeof(dep)) != sizeof(dep)) {
432                         if (errno == EINTR) {
433                                 dump_pre(filename, file, op, i);
434                                 exit(1);
435                         } else
436                                 errx(1, "Reading from pipe");
437                 }
438                 alarm(0);
439
440 #if DEBUG_DEPS
441                 printf("%s:%u:got pre %u from %s:%u\n", filename[file], i+1,
442                        dep->needs_opnum+1, filename[dep->satisfies_file],
443                        dep->satisfies_opnum+1);
444                 fflush(stdout);
445 #endif
446                 /* This could be any op, not just this one. */
447                 talloc_free(dep);
448         }
449 }
450
451 static void do_post(char *filename[], unsigned int file,
452                     const struct op op[], unsigned int i)
453 {
454         struct depend *dep;
455
456         list_for_each(&op[i].post, dep, post_list) {
457 #if DEBUG_DEPS
458                 printf("%s:%u:sending to file %s:%u\n", filename[file], i+1,
459                        filename[dep->needs_file], dep->needs_opnum+1);
460 #endif
461                 if (write(pipes[dep->needs_file].fd[1], &dep, sizeof(dep))
462                     != sizeof(dep))
463                         err(1, "%s:%u failed to tell file %s",
464                             filename[file], i+1, filename[dep->needs_file]);
465         }
466 }
467
468 static int get_len(TDB_DATA key, TDB_DATA data, void *private_data)
469 {
470         return data.dsize;
471 }
472
473 static unsigned run_ops(struct tdb_context *tdb,
474                         int pre_fd,
475                         char *filename[],
476                         unsigned int file,
477                         struct op op[],
478                         unsigned int start, unsigned int stop);
479
480 struct traverse_info {
481         struct op *op;
482         char **filename;
483         unsigned file;
484         int pre_fd;
485         unsigned int start;
486         unsigned int i;
487 };
488
489 /* More complex.  Just do whatever's they did at the n'th entry. */
490 static int nontrivial_traverse(struct tdb_context *tdb,
491                                TDB_DATA key, TDB_DATA data,
492                                void *_tinfo)
493 {
494         struct traverse_info *tinfo = _tinfo;
495         unsigned int trav_len = tinfo->op[tinfo->start].group_len;
496
497         if (tinfo->i == tinfo->start + trav_len) {
498                 /* This can happen if traverse expects to be empty. */
499                 if (trav_len == 1)
500                         return 1;
501                 fail(tinfo->filename[tinfo->file], tinfo->start + 1,
502                      "traverse did not terminate");
503         }
504
505         if (tinfo->op[tinfo->i].op != OP_TDB_TRAVERSE)
506                 fail(tinfo->filename[tinfo->file], tinfo->start + 1,
507                      "%s:%u:traverse terminated early");
508
509         /* Run any normal ops. */
510         tinfo->i = run_ops(tdb, tinfo->pre_fd, tinfo->filename, tinfo->file,
511                            tinfo->op, tinfo->i+1, tinfo->start + trav_len);
512
513         if (tinfo->i == tinfo->start + trav_len)
514                 return 1;
515
516         return 0;
517 }
518
519 static unsigned op_traverse(struct tdb_context *tdb,
520                             int pre_fd,
521                             char *filename[],
522                             unsigned int file,
523                             int (*traversefn)(struct tdb_context *,
524                                               tdb_traverse_func, void *),
525                             struct op op[],
526                             unsigned int start)
527 {
528         struct traverse_info tinfo = { op, filename, file, pre_fd,
529                                        start, start+1 };
530
531         traversefn(tdb, nontrivial_traverse, &tinfo);
532
533         /* Traversing in wrong order can have strange effects: eg. if
534          * original traverse went A (delete A), B, we might do B
535          * (delete A).  So if we have ops left over, we do it now. */
536         while (tinfo.i != start + op[start].group_len) {
537                 if (op[tinfo.i].op == OP_TDB_TRAVERSE)
538                         tinfo.i++;
539                 else
540                         tinfo.i = run_ops(tdb, pre_fd, filename, file, op,
541                                           tinfo.i, start + op[start].group_len);
542         }
543
544         return tinfo.i;
545 }
546
547 static void break_out(int sig)
548 {
549 }
550
551 static __attribute__((noinline))
552 unsigned run_ops(struct tdb_context *tdb,
553                  int pre_fd,
554                  char *filename[],
555                  unsigned int file,
556                  struct op op[], unsigned int start, unsigned int stop)
557 {
558         unsigned int i;
559         struct sigaction sa;
560
561         sa.sa_handler = break_out;
562         sa.sa_flags = 0;
563
564         sigaction(SIGALRM, &sa, NULL);
565         for (i = start; i < stop; i++) {
566                 do_pre(filename, file, pre_fd, op, i);
567
568                 switch (op[i].op) {
569                 case OP_TDB_LOCKALL:
570                         try(tdb_lockall(tdb), op[i].ret);
571                         break;
572                 case OP_TDB_LOCKALL_MARK:
573                         try(tdb_lockall_mark(tdb), op[i].ret);
574                         break;
575                 case OP_TDB_LOCKALL_UNMARK:
576                         try(tdb_lockall_unmark(tdb), op[i].ret);
577                         break;
578                 case OP_TDB_LOCKALL_NONBLOCK:
579                         unreliable(tdb_lockall_nonblock(tdb), op[i].ret,
580                                    tdb_lockall(tdb), tdb_unlockall(tdb));
581                         break;
582                 case OP_TDB_UNLOCKALL:
583                         try(tdb_unlockall(tdb), op[i].ret);
584                         break;
585                 case OP_TDB_LOCKALL_READ:
586                         try(tdb_lockall_read(tdb), op[i].ret);
587                         break;
588                 case OP_TDB_LOCKALL_READ_NONBLOCK:
589                         unreliable(tdb_lockall_read_nonblock(tdb), op[i].ret,
590                                    tdb_lockall_read(tdb),
591                                    tdb_unlockall_read(tdb));
592                         break;
593                 case OP_TDB_UNLOCKALL_READ:
594                         try(tdb_unlockall_read(tdb), op[i].ret);
595                         break;
596                 case OP_TDB_CHAINLOCK:
597                         try(tdb_chainlock(tdb, op[i].key), op[i].ret);
598                         break;
599                 case OP_TDB_CHAINLOCK_NONBLOCK:
600                         unreliable(tdb_chainlock_nonblock(tdb, op[i].key),
601                                    op[i].ret,
602                                    tdb_chainlock(tdb, op[i].key),
603                                    tdb_chainunlock(tdb, op[i].key));
604                         break;
605                 case OP_TDB_CHAINLOCK_MARK:
606                         try(tdb_chainlock_mark(tdb, op[i].key), op[i].ret);
607                         break;
608                 case OP_TDB_CHAINLOCK_UNMARK:
609                         try(tdb_chainlock_unmark(tdb, op[i].key), op[i].ret);
610                         break;
611                 case OP_TDB_CHAINUNLOCK:
612                         try(tdb_chainunlock(tdb, op[i].key), op[i].ret);
613                         break;
614                 case OP_TDB_CHAINLOCK_READ:
615                         try(tdb_chainlock_read(tdb, op[i].key), op[i].ret);
616                         break;
617                 case OP_TDB_CHAINUNLOCK_READ:
618                         try(tdb_chainunlock_read(tdb, op[i].key), op[i].ret);
619                         break;
620                 case OP_TDB_PARSE_RECORD:
621                         try(tdb_parse_record(tdb, op[i].key, get_len, NULL),
622                             op[i].ret);
623                         break;
624                 case OP_TDB_EXISTS:
625                         try(tdb_exists(tdb, op[i].key), op[i].ret);
626                         break;
627                 case OP_TDB_STORE:
628                         try(tdb_store(tdb, op[i].key, op[i].data, op[i].flag),
629                             op[i].ret);
630                         break;
631                 case OP_TDB_APPEND:
632                         try(tdb_append(tdb, op[i].key, op[i].data), op[i].ret);
633                         break;
634                 case OP_TDB_GET_SEQNUM:
635                         try(tdb_get_seqnum(tdb), op[i].ret);
636                         break;
637                 case OP_TDB_WIPE_ALL:
638                         try(tdb_wipe_all(tdb), op[i].ret);
639                         break;
640                 case OP_TDB_TRANSACTION_START:
641                         try(tdb_transaction_start(tdb), op[i].ret);
642                         break;
643                 case OP_TDB_TRANSACTION_CANCEL:
644                         try(tdb_transaction_cancel(tdb), op[i].ret);
645                         break;
646                 case OP_TDB_TRANSACTION_COMMIT:
647                         try(tdb_transaction_commit(tdb), op[i].ret);
648                         break;
649                 case OP_TDB_TRAVERSE_READ_START:
650                         i = op_traverse(tdb, pre_fd, filename, file,
651                                         tdb_traverse_read, op, i);
652                         break;
653                 case OP_TDB_TRAVERSE_START:
654                         i = op_traverse(tdb, pre_fd, filename, file,
655                                         tdb_traverse, op, i);
656                         break;
657                 case OP_TDB_TRAVERSE:
658                         /* Terminate: we're in a traverse, and we've
659                          * done our ops. */
660                         return i;
661                 case OP_TDB_TRAVERSE_END:
662                         fail(filename[file], i+1, "unexpected end traverse");
663                 /* FIXME: These must be treated like traverse. */
664                 case OP_TDB_FIRSTKEY:
665                         if (!key_eq(tdb_firstkey(tdb), op[i].data))
666                                 fail(filename[file], i+1, "bad firstkey");
667                         break;
668                 case OP_TDB_NEXTKEY:
669                         if (!key_eq(tdb_nextkey(tdb, op[i].key), op[i].data))
670                                 fail(filename[file], i+1, "bad nextkey");
671                         break;
672                 case OP_TDB_FETCH: {
673                         TDB_DATA f = tdb_fetch(tdb, op[i].key);
674                         if (!key_eq(f, op[i].data))
675                                 fail(filename[file], i+1, "bad fetch %u",
676                                      f.dsize);
677                         break;
678                 }
679                 case OP_TDB_DELETE:
680                         try(tdb_delete(tdb, op[i].key), op[i].ret);
681                         break;
682                 }
683                 do_post(filename, file, op, i);
684         }
685         return i;
686 }
687
688 static struct op *load_tracefile(const char *filename, unsigned int *num,
689                                  unsigned int *hashsize,
690                                  unsigned int *tdb_flags,
691                                  unsigned int *open_flags)
692 {
693         unsigned int i;
694         struct op *op = talloc_array(NULL, struct op, 1);
695         char **words;
696         char **lines;
697         char *file;
698
699         file = grab_file(NULL, filename, NULL);
700         if (!file)
701                 err(1, "Reading %s", filename);
702
703         lines = strsplit(file, file, "\n", NULL);
704         if (!lines[0])
705                 errx(1, "%s is empty", filename);
706
707         words = strsplit(lines, lines[0], " ", NULL);
708         if (!streq(words[1], "tdb_open"))
709                 fail(filename, 1, "does not start with tdb_open");
710
711         *hashsize = atoi(words[2]);
712         *tdb_flags = strtoul(words[3], NULL, 0);
713         *open_flags = strtoul(words[4], NULL, 0);
714
715         for (i = 1; lines[i]; i++) {
716                 const struct op_table *opt;
717
718                 words = strsplit(lines, lines[i], " ", NULL);
719                 if (!words[0] || !words[1])
720                         fail(filename, i+1, "Expected serial number and op");
721                
722                 opt = find_keyword(words[1], strlen(words[1]));
723                 if (!opt) {
724                         if (streq(words[1], "tdb_close")) {
725                                 if (lines[i+1])
726                                         fail(filename, i+2,
727                                              "lines after tdb_close");
728                                 *num = i;
729                                 talloc_free(lines);
730                                 return op;
731                         }
732                         fail(filename, i+1, "Unknown operation '%s'", words[1]);
733                 }
734
735                 add_op(filename, &op, i, atoi(words[0]), opt->type);
736                 opt->enhance_op(filename, op, i, words);
737         }
738
739         fprintf(stderr, "%s:%u:last operation is not tdb_close: incomplete?",
740               filename, i);
741         talloc_free(lines);
742         *num = i - 1;
743         return op;
744 }
745
746 /* We remember all the keys we've ever seen, and who has them. */
747 struct key_user {
748         unsigned int file;
749         unsigned int op_num;
750 };
751
752 struct keyinfo {
753         TDB_DATA key;
754         unsigned int num_users;
755         struct key_user *user;
756 };
757
758 static const TDB_DATA must_not_exist;
759 static const TDB_DATA must_exist;
760 static const TDB_DATA not_exists_or_empty;
761
762 /* NULL means doesn't care if it exists or not, &must_exist means
763  * it must exist but we don't care what, &must_not_exist means it must
764  * not exist, otherwise the data it needs. */
765 static const TDB_DATA *needs(const struct op *op)
766 {
767         switch (op->op) {
768         /* FIXME: Pull forward deps, since we can deadlock */
769         case OP_TDB_CHAINLOCK:
770         case OP_TDB_CHAINLOCK_NONBLOCK:
771         case OP_TDB_CHAINLOCK_MARK:
772         case OP_TDB_CHAINLOCK_UNMARK:
773         case OP_TDB_CHAINUNLOCK:
774         case OP_TDB_CHAINLOCK_READ:
775         case OP_TDB_CHAINUNLOCK_READ:
776                 return NULL;
777
778         case OP_TDB_APPEND:
779                 if (op->append.pre.dsize == 0)
780                         return &not_exists_or_empty;
781                 return &op->append.pre;
782
783         case OP_TDB_STORE:
784                 if (op->flag == TDB_INSERT) {
785                         if (op->ret < 0)
786                                 return &must_exist;
787                         else
788                                 return &must_not_exist;
789                 } else if (op->flag == TDB_MODIFY) {
790                         if (op->ret < 0)
791                                 return &must_not_exist;
792                         else
793                                 return &must_exist;
794                 }
795                 /* No flags?  Don't care */
796                 return NULL;
797
798         case OP_TDB_EXISTS:
799                 if (op->ret == 1)
800                         return &must_exist;
801                 else
802                         return &must_not_exist;
803
804         case OP_TDB_PARSE_RECORD:
805                 if (op->ret < 0)
806                         return &must_not_exist;
807                 return &must_exist;
808
809         /* FIXME: handle these. */
810         case OP_TDB_WIPE_ALL:
811         case OP_TDB_FIRSTKEY:
812         case OP_TDB_NEXTKEY:
813         case OP_TDB_GET_SEQNUM:
814         case OP_TDB_TRAVERSE:
815         case OP_TDB_TRANSACTION_COMMIT:
816         case OP_TDB_TRANSACTION_CANCEL:
817         case OP_TDB_TRANSACTION_START:
818                 return NULL;
819
820         case OP_TDB_FETCH:
821                 if (!op->data.dptr)
822                         return &must_not_exist;
823                 return &op->data;
824
825         case OP_TDB_DELETE:
826                 if (op->ret < 0)
827                         return &must_not_exist;
828                 return &must_exist;
829
830         default:
831                 errx(1, "Unexpected op %i", op->op);
832         }
833         
834 }
835
836 /* What's the data after this op?  pre if nothing changed. */
837 static const TDB_DATA *gives(const struct op *op, const TDB_DATA *pre)
838 {
839         /* Failed ops don't change state of db. */
840         if (op->ret < 0)
841                 return pre;
842
843         if (op->op == OP_TDB_DELETE || op->op == OP_TDB_WIPE_ALL)
844                 return &tdb_null;
845
846         if (op->op == OP_TDB_APPEND)
847                 return &op->append.post;
848
849         if (op->op == OP_TDB_STORE)
850                 return &op->data;
851
852         return pre;
853 }
854
855 static struct keyinfo *hash_ops(struct op *op[], unsigned int num_ops[],
856                                 unsigned int num)
857 {
858         unsigned int i, j, h;
859         struct keyinfo *hash;
860
861         hash = talloc_zero_array(op[0], struct keyinfo, total_keys*2);
862         for (i = 0; i < num; i++) {
863                 for (j = 1; j < num_ops[i]; j++) {
864                         /* We can't do this on allocation, due to realloc. */
865                         list_head_init(&op[i][j].post);
866                         list_head_init(&op[i][j].pre);
867
868                         if (!op[i][j].key.dptr)
869                                 continue;
870
871                         /* We don't wait for traverse keys */
872                         /* FIXME: We should, for trivial traversals. */
873                         if (op[i][j].op == OP_TDB_TRAVERSE)
874                                 continue;
875
876                         h = hash_key(&op[i][j].key) % (total_keys * 2);
877                         while (!key_eq(hash[h].key, op[i][j].key)) {
878                                 if (!hash[h].key.dptr) {
879                                         hash[h].key = op[i][j].key;
880                                         break;
881                                 }
882                                 h = (h + 1) % (total_keys * 2);
883                         }
884                         /* Might as well save some memory if we can. */
885                         if (op[i][j].key.dptr != hash[h].key.dptr) {
886                                 talloc_free(op[i][j].key.dptr);
887                                 op[i][j].key.dptr = hash[h].key.dptr;
888                         }
889                         hash[h].user = talloc_realloc(hash, hash[h].user,
890                                                      struct key_user,
891                                                      hash[h].num_users+1);
892                         hash[h].user[hash[h].num_users].op_num = j;
893                         hash[h].user[hash[h].num_users].file = i;
894                         hash[h].num_users++;
895                 }
896         }
897
898         return hash;
899 }
900
901 static bool satisfies(const TDB_DATA *data, const TDB_DATA *need)
902 {
903         /* Don't need anything?  Cool. */
904         if (!need)
905                 return true;
906
907         /* This should be tdb_null or a real value. */
908         assert(data != &must_exist);
909         assert(data != &must_not_exist);
910         assert(data != &not_exists_or_empty);
911
912         /* must_not_exist == must_not_exist, must_exist == must_exist, or
913            not_exists_or_empty == not_exists_or_empty. */
914         if (data->dsize == need->dsize && data->dptr == need->dptr)
915                 return true;
916
917         /* Must not exist?  data must not exist. */
918         if (need == &must_not_exist)
919                 return data->dptr == NULL;
920
921         /* Must exist? */
922         if (need == &must_exist)
923                 return data->dptr != NULL;
924
925         /* Either noexist or empty. */
926         if (need == &not_exists_or_empty)
927                 return data->dsize == 0;
928
929         /* Needs something specific. */
930         return key_eq(*data, *need);
931 }
932
933 static void move_to_front(struct key_user res[], unsigned int elem)
934 {
935         if (elem != 0) {
936                 struct key_user tmp = res[elem];
937                 memmove(res + 1, res, elem*sizeof(res[0]));
938                 res[0] = tmp;
939         }
940 }
941
942 static void restore_to_pos(struct key_user res[], unsigned int elem)
943 {
944         if (elem != 0) {
945                 struct key_user tmp = res[0];
946                 memmove(res, res + 1, elem*sizeof(res[0]));
947                 res[elem] = tmp;
948         }
949 }
950
951 static bool sort_deps(char *filename[], struct op *op[],
952                       struct key_user res[], unsigned num,
953                       const TDB_DATA *data, unsigned num_files)
954 {
955         unsigned int i, files_done;
956         struct op *this_op;
957         bool done[num_files];
958
959         /* Nothing left?  We're sorted. */
960         if (num == 0)
961                 return true;
962
963         memset(done, 0, sizeof(done));
964
965         /* Since ops within a trace file are ordered, we just need to figure
966          * out which file to try next.  Since we don't take into account
967          * inter-key relationships (which exist by virtue of trace file order),
968          * we minimize the chance of harm by trying to keep in serial order. */
969         for (files_done = 0, i = 0; i < num && files_done < num_files; i++) {
970                 if (done[res[i].file])
971                         continue;
972
973                 this_op = &op[res[i].file][res[i].op_num];
974                 /* Is what we have good enough for this op? */
975                 if (satisfies(data, needs(this_op))) {
976                         move_to_front(res, i);
977                         if (sort_deps(filename, op, res+1, num-1,
978                                       gives(this_op, data), num_files))
979                                 return true;
980                         restore_to_pos(res, i);
981                 }
982                 done[res[i].file] = true;
983                 files_done++;
984         }
985
986         /* No combination worked. */
987         return false;
988 }
989
990 static void check_dep_sorting(struct key_user user[], unsigned num_users,
991                               unsigned num_files)
992 {
993 #if DEBUG_DEPS
994         unsigned int i;
995         unsigned minima[num_files];
996
997         memset(minima, 0, sizeof(minima));
998         for (i = 0; i < num_users; i++) {
999                 assert(minima[user[i].file] < user[i].op_num);
1000                 minima[user[i].file] = user[i].op_num;
1001         }
1002 #endif
1003 }
1004
1005 /* All these ops have the same serial number.  Which comes first?
1006  *
1007  * This can happen both because read ops or failed write ops don't
1008  * change serial number, and also due to race since we access the
1009  * number unlocked (the race can cause less detectable ordering problems,
1010  * in which case we'll deadlock and report: fix manually in that case).
1011  */
1012 static void figure_deps(char *filename[], struct op *op[],
1013                         struct key_user user[], unsigned num_users,
1014                         unsigned num_files)
1015 {
1016         /* We assume database starts empty. */
1017         const struct TDB_DATA *data = &tdb_null;
1018
1019         if (!sort_deps(filename, op, user, num_users, data, num_files))
1020                 fail(filename[user[0].file], user[0].op_num+1,
1021                      "Could not resolve inter-dependencies");
1022
1023         check_dep_sorting(user, num_users, num_files);
1024 }
1025
1026 static void sort_ops(struct keyinfo hash[], char *filename[], struct op *op[],
1027                      unsigned int num)
1028 {
1029         unsigned int h;
1030
1031         /* Gcc nexted function extension.  How cool is this? */
1032         int compare_serial(const void *_a, const void *_b)
1033         {
1034                 const struct key_user *a = _a, *b = _b;
1035
1036                 /* First, maintain order within any trace file. */
1037                 if (a->file == b->file)
1038                         return a->op_num - b->op_num;
1039
1040                 /* Otherwise, arrange by serial order. */
1041                 return op[a->file][a->op_num].serial
1042                         - op[b->file][b->op_num].serial;
1043         }
1044
1045         /* Now sort into serial order. */
1046         for (h = 0; h < total_keys * 2; h++) {
1047                 struct key_user *user = hash[h].user;
1048
1049                 qsort(user, hash[h].num_users, sizeof(user[0]), compare_serial);
1050                 figure_deps(filename, op, user, hash[h].num_users, num);
1051         }
1052 }
1053
1054 static int destroy_depend(struct depend *dep)
1055 {
1056         list_del(&dep->pre_list);
1057         list_del(&dep->post_list);
1058         return 0;
1059 }
1060
1061 static void add_dependency(void *ctx,
1062                            struct op *op[],
1063                            char *filename[],
1064                            unsigned int needs_file,
1065                            unsigned int needs_opnum,
1066                            unsigned int satisfies_file,
1067                            unsigned int satisfies_opnum)
1068 {
1069         struct depend *dep;
1070         unsigned int needs_start, sat_start;
1071
1072         /* We don't depend on ourselves. */
1073         if (needs_file == satisfies_file) {
1074                 assert(satisfies_opnum < needs_opnum);
1075                 return;
1076         }
1077
1078 #if DEBUG_DEPS
1079         printf("%s:%u: depends on %s:%u\n",
1080                filename[needs_file], needs_opnum+1,
1081                filename[satisfies_file], satisfies_opnum+1);
1082 #endif
1083
1084         needs_start = op[needs_file][needs_opnum].group_start;
1085         sat_start = op[satisfies_file][satisfies_opnum].group_start;
1086
1087         /* If needs is in a transaction, we need it before start. */
1088         if (needs_start) {
1089                 switch (op[needs_file][needs_start].op) {
1090                 case OP_TDB_TRANSACTION_START:
1091                         needs_opnum = needs_start;
1092 #ifdef DEBUG_DEPS
1093                         printf("  -> Back to %u\n", needs_start+1);
1094                         fflush(stdout);
1095 #endif
1096                         break;
1097                 default:
1098                         break;
1099                 }
1100         }
1101
1102         /* If satisfies is in a transaction, we wait until after commit. */
1103         /* FIXME: If transaction is cancelled, don't need dependency. */
1104         if (sat_start) {
1105                 if (op[satisfies_file][sat_start].op
1106                     == OP_TDB_TRANSACTION_START) {
1107                         satisfies_opnum = sat_start
1108                                 + op[satisfies_file][sat_start].group_len;
1109 #ifdef DEBUG_DEPS
1110                         printf("  -> Depends on %u\n", satisfies_opnum+1);
1111                         fflush(stdout);
1112 #endif
1113                 }
1114         }
1115
1116         assert(op[needs_file][needs_opnum].op != OP_TDB_TRAVERSE);
1117         assert(op[satisfies_file][satisfies_opnum].op != OP_TDB_TRAVERSE);
1118
1119         dep = talloc(ctx, struct depend);
1120         dep->needs_file = needs_file;
1121         dep->needs_opnum = needs_opnum;
1122         dep->satisfies_file = satisfies_file;
1123         dep->satisfies_opnum = satisfies_opnum;
1124         list_add(&op[satisfies_file][satisfies_opnum].post, &dep->post_list);
1125         list_add(&op[needs_file][needs_opnum].pre, &dep->pre_list);
1126         talloc_set_destructor(dep, destroy_depend);
1127 }
1128
1129 #if TRAVERSALS_TAKE_TRANSACTION_LOCK
1130 struct traverse_dep {
1131         unsigned int file;
1132         unsigned int op_num;
1133 };
1134
1135 /* Traversals can deadlock against each other.  Force order. */
1136 static void make_traverse_depends(char *filename[],
1137                                   struct op *op[], unsigned int num_ops[],
1138                                   unsigned int num)
1139 {
1140         unsigned int i, j, num_traversals = 0;
1141         struct traverse_dep *dep;
1142
1143         /* Sort by which one runs first. */
1144         int compare_traverse_dep(const void *_a, const void *_b)
1145         {
1146                 const struct traverse_dep *ta = _a, *tb = _b;
1147                 const struct op *a = &op[ta->file][ta->op_num],
1148                         *b = &op[tb->file][tb->op_num];
1149
1150                 if (a->serial != b->serial)
1151                         return a->serial - b->serial;
1152
1153                 /* If they have same serial, it means one didn't make any
1154                  * changes.  Thus sort by end in that case. */
1155                 return a[a->group_len].serial - b[b->group_len].serial;
1156         }
1157
1158         dep = talloc_array(NULL, struct traverse_dep, 1);
1159
1160         /* Count them. */
1161         for (i = 0; i < num; i++) {
1162                 for (j = 1; j < num_ops[i]; j++) {
1163                         /* Transaction on traverse start. */
1164                         if (op[i][j].group_start == j) {
1165                                 dep = talloc_realloc(NULL, dep,
1166                                                      struct traverse_dep,
1167                                                      num_traversals+1);
1168                                 dep[num_traversals].file = i;
1169                                 dep[num_traversals].op_num = j;
1170                                 num_traversals++;
1171                         }
1172                 }
1173         }
1174         qsort(dep, num_traversals, sizeof(dep[0]), compare_traverse_dep);
1175         for (i = 1; i < num_traversals; i++) {
1176                 /* i depends on end of traverse i-1. */
1177                 add_dependency(NULL, op, filename, dep[i].file, dep[i].op_num,
1178                                dep[i-1].file, dep[i-1].op_num
1179                                + op[dep[i-1].file][dep[i-1].op_num].group_len);
1180         }
1181         talloc_free(dep);
1182 }
1183 #endif /* TRAVERSALS_TAKE_TRANSACTION_LOCK */
1184
1185 static bool changes_db(const struct op *op)
1186 {
1187         return gives(op, NULL) != NULL;
1188 }
1189
1190 static void depend_on_previous(struct op *op[],
1191                                char *filename[],
1192                                unsigned int num,
1193                                struct key_user user[],
1194                                unsigned int i,
1195                                int prev)
1196 {
1197         bool deps[num];
1198         int j;
1199
1200         if (i == 0)
1201                 return;
1202
1203         if (prev == i - 1) {
1204                 /* Just depend on previous. */
1205                 add_dependency(NULL, op, filename,
1206                                user[i].file, user[i].op_num,
1207                                user[prev].file, user[prev].op_num);
1208                 return;
1209         }
1210
1211         /* We have to wait for the readers.  Find last one in *each* file. */
1212         memset(deps, 0, sizeof(deps));
1213         deps[user[i].file] = true;
1214         for (j = i - 1; j > prev; j--) {
1215                 if (!deps[user[j].file]) {
1216                         add_dependency(NULL, op, filename,
1217                                        user[i].file, user[i].op_num,
1218                                        user[j].file, user[j].op_num);
1219                         deps[user[j].file] = true;
1220                 }
1221         }
1222 }
1223
1224 /* This is simple, but not complete.  We don't take into account
1225  * indirect dependencies. */
1226 static void optimize_dependencies(struct op *op[], unsigned int num_ops[],
1227                                   unsigned int num)
1228 {
1229         unsigned int i, j;
1230
1231         /* There can only be one real dependency on each file */
1232         for (i = 0; i < num; i++) {
1233                 for (j = 1; j < num_ops[i]; j++) {
1234                         struct depend *dep, *next;
1235                         struct depend *prev[num];
1236
1237                         memset(prev, 0, sizeof(prev));
1238
1239                         list_for_each_safe(&op[i][j].pre, dep, next, pre_list) {
1240                                 if (!prev[dep->satisfies_file]) {
1241                                         prev[dep->satisfies_file] = dep;
1242                                         continue;
1243                                 }
1244                                 if (prev[dep->satisfies_file]->satisfies_opnum
1245                                     < dep->satisfies_opnum) {
1246                                         talloc_free(prev[dep->satisfies_file]);
1247                                         prev[dep->satisfies_file] = dep;
1248                                 } else
1249                                         talloc_free(dep);
1250                         }
1251                 }
1252         }
1253
1254         for (i = 0; i < num; i++) {
1255                 int deps[num];
1256
1257                 for (j = 0; j < num; j++)
1258                         deps[j] = -1;
1259
1260                 for (j = 1; j < num_ops[i]; j++) {
1261                         struct depend *dep, *next;
1262
1263                         list_for_each_safe(&op[i][j].pre, dep, next, pre_list) {
1264                                 if (deps[dep->satisfies_file]
1265                                     >= (int)dep->satisfies_opnum)
1266                                         talloc_free(dep);
1267                                 else
1268                                         deps[dep->satisfies_file]
1269                                                 = dep->satisfies_opnum;
1270                         }
1271                 }
1272         }
1273 }
1274
1275 static void derive_dependencies(char *filename[],
1276                                 struct op *op[], unsigned int num_ops[],
1277                                 unsigned int num)
1278 {
1279         struct keyinfo *hash;
1280         unsigned int h, i;
1281
1282         /* Create hash table for faster key lookup. */
1283         hash = hash_ops(op, num_ops, num);
1284
1285         /* Sort them by serial number. */
1286         sort_ops(hash, filename, op, num);
1287
1288         /* Create dependencies back to the last change, rather than
1289          * creating false dependencies by naively making each one
1290          * depend on the previous.  This has two purposes: it makes
1291          * later optimization simpler, and it also avoids deadlock with
1292          * same sequence number ops inside traversals (if one
1293          * traversal doesn't write anything, two ops can have the same
1294          * sequence number yet we can create a traversal dependency
1295          * the other way). */
1296         for (h = 0; h < total_keys * 2; h++) {
1297                 int prev = -1;
1298
1299                 if (hash[h].num_users < 2)
1300                         continue;
1301
1302                 for (i = 0; i < hash[h].num_users; i++) {
1303                         if (changes_db(&op[hash[h].user[i].file]
1304                                        [hash[h].user[i].op_num])) {
1305                                 depend_on_previous(op, filename, num,
1306                                                    hash[h].user, i, prev);
1307                                 prev = i;
1308                         } else if (prev >= 0)
1309                                 add_dependency(hash, op, filename,
1310                                                hash[h].user[i].file,
1311                                                hash[h].user[i].op_num,
1312                                                hash[h].user[prev].file,
1313                                                hash[h].user[prev].op_num);
1314                 }
1315         }
1316
1317 #if TRAVERSALS_TAKE_TRANSACTION_LOCK
1318         make_traverse_depends(filename, op, num_ops, num);
1319 #endif
1320
1321         optimize_dependencies(op, num_ops, num);
1322 }
1323
1324 int main(int argc, char *argv[])
1325 {
1326         struct timeval start, end;
1327         unsigned int i, num_ops[argc], hashsize[argc], tdb_flags[argc], open_flags[argc];
1328         struct op *op[argc];
1329         int fds[2];
1330         char c;
1331         bool ok = true;
1332
1333         if (argc < 3)
1334                 errx(1, "Usage: %s <tdbfile> <tracefile>...", argv[0]);
1335
1336         pipes = talloc_array(NULL, struct pipe, argc - 2);
1337         for (i = 0; i < argc - 2; i++) {
1338                 printf("Loading tracefile %s...", argv[2+i]);
1339                 fflush(stdout);
1340                 op[i] = load_tracefile(argv[2+i], &num_ops[i], &hashsize[i],
1341                                        &tdb_flags[i], &open_flags[i]);
1342                 if (pipe(pipes[i].fd) != 0)
1343                         err(1, "creating pipe");
1344                 printf("done\n");
1345         }
1346
1347         printf("Calculating inter-dependencies...");
1348         fflush(stdout);
1349         derive_dependencies(argv+2, op, num_ops, i);
1350         printf("done\n");
1351
1352         /* Don't fork for single arg case: simple debugging. */
1353         if (argc == 3) {
1354                 struct tdb_context *tdb;
1355                 tdb = tdb_open_ex(argv[1], hashsize[0], tdb_flags[0],
1356                                   open_flags[0], 0600,
1357                                   NULL, hash_key);
1358                 printf("Single threaded run...");
1359                 fflush(stdout);
1360
1361                 run_ops(tdb, pipes[0].fd[0], argv+2, 0, op[0], 1, num_ops[0]);
1362                 check_deps(argv[2], op[0], num_ops[0]);
1363
1364                 printf("done\n");
1365                 exit(0);
1366         }
1367
1368         if (pipe(fds) != 0)
1369                 err(1, "creating pipe");
1370
1371         for (i = 0; i < argc - 2; i++) {
1372                 struct tdb_context *tdb;
1373
1374                 switch (fork()) {
1375                 case -1:
1376                         err(1, "fork failed");
1377                 case 0:
1378                         close(fds[1]);
1379                         tdb = tdb_open_ex(argv[1], hashsize[i], tdb_flags[i],
1380                                           open_flags[i], 0600,
1381                                           NULL, hash_key);
1382                         if (!tdb)
1383                                 err(1, "Opening tdb %s", argv[1]);
1384
1385                         /* This catches parent exiting. */
1386                         if (read(fds[0], &c, 1) != 1)
1387                                 exit(1);
1388                         run_ops(tdb, pipes[i].fd[0], argv+2, i, op[i], 1,
1389                                 num_ops[i]);
1390                         check_deps(argv[2+i], op[i], num_ops[i]);
1391                         exit(0);
1392                 default:
1393                         break;
1394                 }
1395         }
1396
1397         /* Let everything settle. */
1398         sleep(1);
1399
1400         printf("Starting run...");
1401         fflush(stdout);
1402         gettimeofday(&start, NULL);
1403         /* Tell them all to go!  Any write of sufficient length will do. */
1404         if (write(fds[1], hashsize, i) != i)
1405                 err(1, "Writing to wakeup pipe");
1406
1407         for (i = 0; i < argc - 2; i++) {
1408                 int status;
1409                 wait(&status);
1410                 if (!WIFEXITED(status)) {
1411                         warnx("Child died with signal %i", WTERMSIG(status));
1412                         ok = false;
1413                 } else if (WEXITSTATUS(status) != 0)
1414                         /* Assume child spat out error. */
1415                         ok = false;
1416         }
1417         if (!ok)
1418                 exit(1);
1419
1420         gettimeofday(&end, NULL);
1421         printf("done\n");
1422
1423         end.tv_sec -= start.tv_sec;
1424         printf("Time replaying: %lu usec\n",
1425                end.tv_sec * 1000000UL + (end.tv_usec - start.tv_usec));
1426         
1427         exit(0);
1428 }