]> git.ozlabs.org Git - ccan/blob - ccan/tdb/tools/replay_trace.c
Automate rerunning when we have to back off: gives accurate(ish) timings.
[ccan] / ccan / tdb / tools / replay_trace.c
#include <ccan/tdb/tdb.h>
#include <ccan/grab_file/grab_file.h>
#include <ccan/hash/hash.h>
#include <ccan/talloc/talloc.h>
#include <ccan/str_talloc/str_talloc.h>
#include <ccan/str/str.h>
#include <ccan/list/list.h>
#include <err.h>
#include <ctype.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/time.h>
#include <errno.h>
#include <signal.h>
#include <assert.h>
#include <fcntl.h>
19
/* Two-level stringification: going through STRINGIFY2 lets the argument
 * be macro-expanded before '#' turns it into a string literal. */
#define STRINGIFY2(x) #x
#define STRINGIFY(x) STRINGIFY2(x)

/* Running count bumped by the op_add_*() parsers for key-carrying ops.
 * Starts at 1 rather than 0 to avoid mod by zero. */
static unsigned int total_keys = 1;

/* Define DEBUG_DEPS to get the dependency tracing printf()s in
 * do_pre()/do_post() and the leftover-dependency check in check_deps(). */
/* #define DEBUG_DEPS 1 */

/* Traversals block transactions in the current implementation.
 * When set, nontrivial_traverse() runs its ops in back-off mode. */
#define TRAVERSALS_TAKE_TRANSACTION_LOCK 1
30
/* A pair of file descriptors; do_post() writes to fd[1].
 * NOTE(review): presumably filled in by pipe(2) - confirm in main(). */
struct pipe {
        int fd[2];
};
/* Indexed by trace-file number: do_post() uses pipes[file].fd[1] to
 * notify the process replaying that file. */
static struct pipe *pipes;
/* do_pre() writes a struct op_desc here when it gives up waiting in
 * back-off mode; -1 until it is set up elsewhere. */
static int backoff_fd = -1;
36
/* Print "<filename>:<line>: FAIL: <formatted message>" followed by a
 * newline on stderr, then terminate the process with exit status 1.
 * Never returns. */
static void __attribute__((noreturn)) fail(const char *filename,
                                           unsigned int line,
                                           const char *fmt, ...)
{
        va_list args;

        fprintf(stderr, "%s:%u: FAIL: ", filename, line);

        va_start(args, fmt);
        vfprintf(stderr, fmt, args);
        va_end(args);

        fputc('\n', stderr);
        exit(1);
}
50         
/* Try or die: evaluate expr once and call fail() unless the result
 * equals expect.  The expansion refers to `filename`, `file` and `i`,
 * so those must be in scope where the macro is used (as in run_ops());
 * the failure is reported against trace line i+1. */
#define try(expr, expect)                                               \
        do {                                                            \
                int ret = (expr);                                       \
                if (ret != (expect))                                    \
                        fail(filename[file], i+1,                       \
                             STRINGIFY(expr) "= %i", ret);              \
        } while (0)
59
/* Try or imitate results: evaluate expr once; if it did not return
 * expect, report the mismatch on stderr and then bring the state in line
 * with the trace instead of dying:
 *   - trace expected success (expect == 0): run `force`;
 *   - trace expected failure: run `undo`.
 * Like try(), the expansion uses `filename`, `file` and `i` from the
 * enclosing scope.  Arguments are parenthesized so compound expressions
 * are safe, and the diagnostic is newline-terminated so successive
 * reports do not run together. */
#define unreliable(expr, expect, force, undo)                           \
        do {                                                            \
                int ret = (expr);                                       \
                if (ret != (expect)) {                                  \
                        fprintf(stderr, "%s:%u: %s gave %i not %i\n",   \
                                filename[file], i+1, STRINGIFY(expr),   \
                                ret, (expect));                         \
                        if ((expect) == 0)                              \
                                force;                                  \
                        else                                            \
                                undo;                                   \
                }                                                       \
        } while (0)
74
75 static bool key_eq(TDB_DATA a, TDB_DATA b)
76 {
77         if (a.dsize != b.dsize)
78                 return false;
79         return memcmp(a.dptr, b.dptr, a.dsize) == 0;
80 }
81
82 /* This is based on the hash algorithm from gdbm */
83 static unsigned int hash_key(TDB_DATA *key)
84 {
85         uint32_t value; /* Used to compute the hash value.  */
86         uint32_t   i;   /* Used to cycle through random values. */
87
88         /* Set the initial value from the key size. */
89         for (value = 0x238F13AF ^ key->dsize, i=0; i < key->dsize; i++)
90                 value = (value + (key->dptr[i] << (i*5 % 24)));
91
92         return (1103515243 * value + 12345);  
93 }
94
/* One value per kind of traced operation.  run_ops() switches on this
 * to decide which tdb_*() call to replay. */
enum op_type {
        /* Whole-database locks */
        OP_TDB_LOCKALL,
        OP_TDB_LOCKALL_MARK,
        OP_TDB_LOCKALL_UNMARK,
        OP_TDB_LOCKALL_NONBLOCK,
        OP_TDB_UNLOCKALL,
        OP_TDB_LOCKALL_READ,
        OP_TDB_LOCKALL_READ_NONBLOCK,
        OP_TDB_UNLOCKALL_READ,
        /* Per-chain locks */
        OP_TDB_CHAINLOCK,
        OP_TDB_CHAINLOCK_NONBLOCK,
        OP_TDB_CHAINLOCK_MARK,
        OP_TDB_CHAINLOCK_UNMARK,
        OP_TDB_CHAINUNLOCK,
        OP_TDB_CHAINLOCK_READ,
        OP_TDB_CHAINUNLOCK_READ,
        /* Record and database operations */
        OP_TDB_PARSE_RECORD,
        OP_TDB_EXISTS,
        OP_TDB_STORE,
        OP_TDB_APPEND,
        OP_TDB_GET_SEQNUM,
        OP_TDB_WIPE_ALL,
        /* Transactions */
        OP_TDB_TRANSACTION_START,
        OP_TDB_TRANSACTION_CANCEL,
        OP_TDB_TRANSACTION_COMMIT,
        /* Traversals: a START op, then one OP_TDB_TRAVERSE per callback
         * invocation, closed by END (or END_EARLY) */
        OP_TDB_TRAVERSE_READ_START,
        OP_TDB_TRAVERSE_START,
        OP_TDB_TRAVERSE_END,
        OP_TDB_TRAVERSE,
        OP_TDB_TRAVERSE_END_EARLY,
        /* Key iteration and single-record access */
        OP_TDB_FIRSTKEY,
        OP_TDB_NEXTKEY,
        OP_TDB_FETCH,
        OP_TDB_DELETE,
};
130
/* One parsed line of a trace file. */
struct op {
        /* Sequence number passed to add_op() for this line. */
        unsigned int seqnum;
        /* Which operation to replay. */
        enum op_type type;
        /* Key argument; tdb_null for ops without one (the chainlock
         * parsers also set this to tdb_null and keep their key in data). */
        TDB_DATA key;
        /* Data argument or expected result, depending on the op. */
        TDB_DATA data;
        /* Return value recorded in the trace (0 unless a parser sets it). */
        int ret;

        /* Who is waiting for us? (struct depend, linked via post_list) */
        struct list_head post;
        /* What are we waiting for? (struct depend, linked via pre_list) */
        struct list_head pre;

        /* If I'm part of a group (traverse/transaction) where is
         * start?  (Otherwise, 0) */
        unsigned int group_start;

        /* Per-type extras; which member is valid depends on type. */
        union {
                int flag; /* open and store */
                struct {  /* append */
                        /* Record contents before the append; shares
                         * post's buffer with a shorter length. */
                        TDB_DATA pre;
                        /* Record contents after the append. */
                        TDB_DATA post;
                } append;
                /* transaction/traverse start/chainlock: number of ops
                 * from the start op to its matching end op; 0 while the
                 * group is still open. */
                unsigned int group_len;
        };
};
157
/* Identifies one operation across all traces. */
struct op_desc {
        /* Index of the trace file (into the filename[] / op[] arrays). */
        unsigned int file;
        /* Index of the operation within that file's op array. */
        unsigned int op_num;
};
162
/* Convert one hexadecimal digit (either case) to its value 0..15.
 * Any other character is a fatal parse error reported against
 * filename:line; the message shows the character as it appeared in the
 * input. */
static unsigned char hex_char(const char *filename, unsigned int line, char c)
{
        /* <ctype.h> functions require an unsigned char value (or EOF);
         * a plain char may be negative. */
        int up = toupper((unsigned char)c);

        if (up >= 'A' && up <= 'F')
                return up - 'A' + 10;
        if (up >= '0' && up <= '9')
                return up - '0';
        fail(filename, line, "invalid hex character '%c'", c);
}
172
173 /* TDB data is <size>:<%02x>* */
174 static TDB_DATA make_tdb_data(const void *ctx,
175                               const char *filename, unsigned int line,
176                               const char *word)
177 {
178         TDB_DATA data;
179         unsigned int i;
180         const char *p;
181
182         if (streq(word, "NULL"))
183                 return tdb_null;
184
185         data.dsize = atoi(word);
186         data.dptr = talloc_array(ctx, unsigned char, data.dsize);
187         p = strchr(word, ':');
188         if (!p)
189                 fail(filename, line, "invalid tdb data '%s'", word);
190         p++;
191         for (i = 0; i < data.dsize; i++)
192                 data.dptr[i] = hex_char(filename, line, p[i*2])*16
193                         + hex_char(filename, line, p[i*2+1]);
194
195         return data;
196 }
197
198 static void add_op(const char *filename, struct op **op, unsigned int i,
199                    unsigned int seqnum, enum op_type type)
200 {
201         struct op *new;
202         *op = talloc_realloc(NULL, *op, struct op, i+1);
203         new = (*op) + i;
204         new->type = type;
205         new->seqnum = seqnum;
206         new->ret = 0;
207         new->group_start = 0;
208 }
209
210 static void op_add_nothing(const char *filename,
211                            struct op op[], unsigned int op_num, char *words[])
212 {
213         if (words[2])
214                 fail(filename, op_num+1, "Expected no arguments");
215         op[op_num].key = tdb_null;
216 }
217
218 static void op_add_key(const char *filename,
219                        struct op op[], unsigned int op_num, char *words[])
220 {
221         if (words[2] == NULL || words[3])
222                 fail(filename, op_num+1, "Expected just a key");
223
224         op[op_num].key = make_tdb_data(op, filename, op_num+1, words[2]);
225         total_keys++;
226 }
227
228 static void op_add_key_ret(const char *filename,
229                            struct op op[], unsigned int op_num, char *words[])
230 {
231         if (!words[2] || !words[3] || !words[4] || words[5]
232             || !streq(words[3], "="))
233                 fail(filename, op_num+1, "Expected <key> = <ret>");
234         op[op_num].ret = atoi(words[4]);
235         op[op_num].key = make_tdb_data(op, filename, op_num+1, words[2]);
236         /* May only be a unique key if it fails */
237         if (op[op_num].ret != 0)
238                 total_keys++;
239 }
240
241 static void op_add_key_data(const char *filename,
242                             struct op op[], unsigned int op_num, char *words[])
243 {
244         if (!words[2] || !words[3] || !words[4] || words[5]
245             || !streq(words[3], "="))
246                 fail(filename, op_num+1, "Expected <key> = <data>");
247         op[op_num].key = make_tdb_data(op, filename, op_num+1, words[2]);
248         op[op_num].data = make_tdb_data(op, filename, op_num+1, words[4]);
249         /* May only be a unique key if it fails */
250         if (!op[op_num].data.dptr)
251                 total_keys++;
252 }
253
254 /* We don't record the keys or data for a traverse, as we don't use them. */
255 static void op_add_traverse(const char *filename,
256                             struct op op[], unsigned int op_num, char *words[])
257 {
258         if (!words[2] || !words[3] || !words[4] || words[5]
259             || !streq(words[3], "="))
260                 fail(filename, op_num+1, "Expected <key> = <data>");
261         op[op_num].key = tdb_null;
262 }
263
264 /* Full traverse info is useful for debugging, but changing it to
265  * "traversefn" without the data makes the traces *much* smaller! */
266 static void op_add_traversefn(const char *filename,
267                             struct op op[], unsigned int op_num, char *words[])
268 {
269         if (words[2])
270                 fail(filename, op_num+1, "Expected no values");
271         op[op_num].key = tdb_null;
272 }
273
274 /* <seqnum> tdb_store <rec> <rec> <flag> = <ret> */
275 static void op_add_store(const char *filename,
276                          struct op op[], unsigned int op_num, char *words[])
277 {
278         if (!words[2] || !words[3] || !words[4] || !words[5] || !words[6]
279             || words[7] || !streq(words[5], "="))
280                 fail(filename, op_num+1, "Expect <key> <data> <flag> = <ret>");
281
282         op[op_num].flag = strtoul(words[4], NULL, 0);
283         op[op_num].ret = atoi(words[6]);
284         op[op_num].key = make_tdb_data(op, filename, op_num+1, words[2]);
285         op[op_num].data = make_tdb_data(op, filename, op_num+1, words[3]);
286         total_keys++;
287 }
288
289 /* <seqnum> tdb_append <rec> <rec> = <rec> */
290 static void op_add_append(const char *filename,
291                           struct op op[], unsigned int op_num, char *words[])
292 {
293         if (!words[2] || !words[3] || !words[4] || !words[5] || words[6]
294             || !streq(words[4], "="))
295                 fail(filename, op_num+1, "Expect <key> <data> = <rec>");
296
297         op[op_num].key = make_tdb_data(op, filename, op_num+1, words[2]);
298         op[op_num].data = make_tdb_data(op, filename, op_num+1, words[3]);
299
300         op[op_num].append.post
301                 = make_tdb_data(op, filename, op_num+1, words[5]);
302
303         /* By subtraction, figure out what previous data was. */
304         op[op_num].append.pre.dptr = op[op_num].append.post.dptr;
305         op[op_num].append.pre.dsize
306                 = op[op_num].append.post.dsize - op[op_num].data.dsize;
307         total_keys++;
308 }
309
310 /* <seqnum> tdb_get_seqnum = <ret> */
311 static void op_add_seqnum(const char *filename,
312                           struct op op[], unsigned int op_num, char *words[])
313 {
314         if (!words[2] || !words[3] || words[4] || !streq(words[2], "="))
315                 fail(filename, op_num+1, "Expect = <ret>");
316
317         op[op_num].key = tdb_null;
318         op[op_num].ret = atoi(words[3]);
319 }
320
321 static void op_add_traverse_start(const char *filename,
322                                   struct op op[],
323                                   unsigned int op_num, char *words[])
324 {
325         if (words[2])
326                 fail(filename, op_num+1, "Expect no arguments");
327
328         op[op_num].key = tdb_null;
329         op[op_num].group_len = 0;
330 }
331
332 static void op_add_transaction(const char *filename, struct op op[],
333                                unsigned int op_num, char *words[])
334 {
335         if (words[2])
336                 fail(filename, op_num+1, "Expect no arguments");
337
338         op[op_num].key = tdb_null;
339         op[op_num].group_len = 0;
340 }
341
342 static void op_add_chainlock(const char *filename,
343                              struct op op[], unsigned int op_num, char *words[])
344 {
345         if (words[2] == NULL || words[3])
346                 fail(filename, op_num+1, "Expected just a key");
347
348         /* A chainlock key isn't a key in the normal sense; it doesn't
349          * have to be in the db at all.  Also, we don't want to hash this op. */
350         op[op_num].data = make_tdb_data(op, filename, op_num+1, words[2]);
351         op[op_num].key = tdb_null;
352         op[op_num].group_len = 0;
353 }
354
355 static void op_add_chainlock_ret(const char *filename,
356                                  struct op op[], unsigned int op_num,
357                                  char *words[])
358 {
359         if (!words[2] || !words[3] || !words[4] || words[5]
360             || !streq(words[3], "="))
361                 fail(filename, op_num+1, "Expected <key> = <ret>");
362         op[op_num].ret = atoi(words[4]);
363         op[op_num].data = make_tdb_data(op, filename, op_num+1, words[2]);
364         op[op_num].key = tdb_null;
365         op[op_num].group_len = 0;
366         total_keys++;
367 }
368
369 static int op_find_start(struct op op[], unsigned int op_num, enum op_type type)
370 {
371         unsigned int i;
372
373         for (i = op_num-1; i > 0; i--) {
374                 if (op[i].type == type && !op[i].group_len)
375                         return i;
376         }
377         return 0;
378 }
379
380 static void op_analyze_transaction(const char *filename,
381                                    struct op op[], unsigned int op_num,
382                                    char *words[])
383 {
384         unsigned int start, i;
385
386         op[op_num].key = tdb_null;
387
388         if (words[2])
389                 fail(filename, op_num+1, "Expect no arguments");
390
391         start = op_find_start(op, op_num, OP_TDB_TRANSACTION_START);
392         if (!start)
393                 fail(filename, op_num+1, "no transaction start found");
394
395         op[start].group_len = op_num - start;
396
397         /* This rolls in nested transactions.  I think that's right. */
398         for (i = start; i <= op_num; i++)
399                 op[i].group_start = start;
400 }
401
402 /* We treat chainlocks a lot like transactions, even though that's overkill */
403 static void op_analyze_chainlock(const char *filename,
404                                  struct op op[], unsigned int op_num,
405                                  char *words[])
406 {
407         unsigned int i, start;
408
409         if (words[2] == NULL || words[3])
410                 fail(filename, op_num+1, "Expected just a key");
411
412         op[op_num].data = make_tdb_data(op, filename, op_num+1, words[2]);
413         op[op_num].key = tdb_null;
414         total_keys++;
415
416         start = op_find_start(op, op_num, OP_TDB_CHAINLOCK);
417         if (!start)
418                 start = op_find_start(op, op_num, OP_TDB_CHAINLOCK_READ);
419         if (!start)
420                 fail(filename, op_num+1, "no initial chainlock found");
421
422         /* FIXME: We'd have to do something clever to make this work
423          * vs. deadlock. */
424         if (!key_eq(op[start].data, op[op_num].data))
425                 fail(filename, op_num+1, "nested chainlock calls?");
426
427         op[start].group_len = op_num - start;
428         for (i = start; i <= op_num; i++)
429                 op[i].group_start = start;
430 }
431
432 static void op_analyze_traverse(const char *filename,
433                                 struct op op[], unsigned int op_num,
434                                 char *words[])
435 {
436         int i, start;
437
438         op[op_num].key = tdb_null;
439
440         /* = %u means traverse function terminated. */
441         if (words[2]) {
442                 if (!streq(words[2], "=") || !words[3] || words[4])
443                         fail(filename, op_num+1, "expect = <num>");
444                 op[op_num].ret = atoi(words[3]);
445         } else
446                 op[op_num].ret = 0;
447
448         start = op_find_start(op, op_num, OP_TDB_TRAVERSE_START);
449         if (!start)
450                 start = op_find_start(op, op_num, OP_TDB_TRAVERSE_READ_START);
451         if (!start)
452                 fail(filename, op_num+1, "no traversal start found");
453
454         op[start].group_len = op_num - start;
455
456         /* Don't roll in nested traverse/chainlock */
457         for (i = start; i <= op_num; i++)
458                 if (!op[i].group_start)
459                         op[i].group_start = start;
460 }
461
462 /* Keep -Wmissing-declarations happy: */
463 const struct op_table *
464 find_keyword (register const char *str, register unsigned int len);
465
466 #include "keywords.c"
467
/* One edge of the dependency graph: operation `needs` must not run
 * until operation `prereq` has completed. */
struct depend {
        /* We can have more than one */
        /* Link in the waiting op's `pre` list (walked by dump_pre()). */
        struct list_node pre_list;
        /* Link in the prerequisite op's `post` list (walked by do_post()). */
        struct list_node post_list;
        /* The operation that has to wait. */
        struct op_desc needs;
        /* The operation being waited for. */
        struct op_desc prereq;
};
475
476 static void check_deps(const char *filename, struct op op[], unsigned int num)
477 {
478 #ifdef DEBUG_DEPS
479         unsigned int i;
480
481         for (i = 1; i < num; i++)
482                 if (!list_empty(&op[i].pre))
483                         fail(filename, i+1, "Still has dependencies");
484 #endif
485 }
486
487 static void dump_pre(char *filename[], struct op *op[],
488                      unsigned int file, unsigned int i)
489 {
490         struct depend *dep;
491
492         printf("%s:%u (%u) still waiting for:\n", filename[file], i+1,
493                 op[file][i].seqnum);
494         list_for_each(&op[file][i].pre, dep, pre_list)
495                 printf("    %s:%u (%u)\n",
496                        filename[dep->prereq.file], dep->prereq.op_num+1,
497                        op[dep->prereq.file][dep->prereq.op_num].seqnum);
498         check_deps(filename[file], op[file], i);
499 }
500
501 /* We simply read/write pointers, since we all are children. */
502 static bool do_pre(struct tdb_context *tdb,
503                    char *filename[], struct op *op[],
504                    unsigned int file, int pre_fd, unsigned int i,
505                    bool backoff)
506 {
507         while (!list_empty(&op[file][i].pre)) {
508                 struct depend *dep;
509
510 #if DEBUG_DEPS
511                 printf("%s:%u:waiting for pre\n", filename[file], i+1);
512                 fflush(stdout);
513 #endif
514                 if (backoff)
515                         alarm(2);
516                 else
517                         alarm(10);
518                 while (read(pre_fd, &dep, sizeof(dep)) != sizeof(dep)) {
519                         if (errno == EINTR) {
520                                 if (backoff) {
521                                         struct op_desc desc = { file,i };
522                                         warnx("%s:%u:avoiding deadlock",
523                                               filename[file], i+1);
524                                         if (write(backoff_fd, &desc,
525                                                   sizeof(desc)) != sizeof(desc))
526                                                 err(1, "writing backoff_fd");
527                                         return false;
528                                 }
529                                 dump_pre(filename, op, file, i);
530                                 exit(1);
531                         } else
532                                 errx(1, "Reading from pipe");
533                 }
534                 alarm(0);
535
536 #if DEBUG_DEPS
537                 printf("%s:%u:got pre %u from %s:%u\n", filename[file], i+1,
538                        dep->needs.op_num+1, filename[dep->prereq.file],
539                        dep->prereq.op_num+1);
540                 fflush(stdout);
541 #endif
542                 /* This could be any op, not just this one. */
543                 talloc_free(dep);
544         }
545         return true;
546 }
547
548 static void do_post(char *filename[], struct op *op[],
549                     unsigned int file, unsigned int i)
550 {
551         struct depend *dep;
552
553         list_for_each(&op[file][i].post, dep, post_list) {
554 #if DEBUG_DEPS
555                 printf("%s:%u:sending to file %s:%u\n", filename[file], i+1,
556                        filename[dep->needs.file], dep->needs.op_num+1);
557 #endif
558                 if (write(pipes[dep->needs.file].fd[1], &dep, sizeof(dep))
559                     != sizeof(dep))
560                         err(1, "%s:%u failed to tell file %s",
561                             filename[file], i+1, filename[dep->needs.file]);
562         }
563 }
564
565 static int get_len(TDB_DATA key, TDB_DATA data, void *private_data)
566 {
567         return data.dsize;
568 }
569
570 static unsigned run_ops(struct tdb_context *tdb,
571                         int pre_fd,
572                         char *filename[],
573                         struct op *op[],
574                         unsigned int file,
575                         unsigned int start, unsigned int stop,
576                         bool backoff);
577
/* State handed to nontrivial_traverse() through the traverse's private
 * pointer; set up by op_traverse(). */
struct traverse_info {
        /* Per-file op arrays, as passed to run_ops(). */
        struct op **op;
        /* Per-file trace names, used for diagnostics. */
        char **filename;
        /* Which file's ops are being replayed. */
        unsigned file;
        /* Descriptor passed on to run_ops() for dependency waits. */
        int pre_fd;
        /* Index of the traverse-start op. */
        unsigned int start;
        /* Index of the next op to handle; advanced as callbacks run. */
        unsigned int i;
};
586
587 /* More complex.  Just do whatever's they did at the n'th entry. */
588 static int nontrivial_traverse(struct tdb_context *tdb,
589                                TDB_DATA key, TDB_DATA data,
590                                void *_tinfo)
591 {
592         struct traverse_info *tinfo = _tinfo;
593         unsigned int trav_len = tinfo->op[tinfo->file][tinfo->start].group_len;
594         bool avoid_deadlock = false;
595
596         if (tinfo->i == tinfo->start + trav_len) {
597                 /* This can happen if traverse expects to be empty. */
598                 if (trav_len == 1)
599                         return 1;
600                 fail(tinfo->filename[tinfo->file], tinfo->start + 1,
601                      "traverse did not terminate");
602         }
603
604         if (tinfo->op[tinfo->file][tinfo->i].type != OP_TDB_TRAVERSE)
605                 fail(tinfo->filename[tinfo->file], tinfo->start + 1,
606                      "%s:%u:traverse terminated early");
607
608 #if TRAVERSALS_TAKE_TRANSACTION_LOCK
609         avoid_deadlock = true;
610 #endif
611
612         /* Run any normal ops. */
613         tinfo->i = run_ops(tdb, tinfo->pre_fd, tinfo->filename, tinfo->op,
614                            tinfo->file, tinfo->i+1, tinfo->start + trav_len,
615                            avoid_deadlock);
616
617         /* We backed off, or we hit OP_TDB_TRAVERSE_END/EARLY. */
618         if (tinfo->op[tinfo->file][tinfo->i].type != OP_TDB_TRAVERSE)
619                 return 1;
620
621         return 0;
622 }
623
624 static unsigned op_traverse(struct tdb_context *tdb,
625                             int pre_fd,
626                             char *filename[],
627                             unsigned int file,
628                             int (*traversefn)(struct tdb_context *,
629                                               tdb_traverse_func, void *),
630                             struct op *op[],
631                             unsigned int start)
632 {
633         struct traverse_info tinfo = { op, filename, file, pre_fd,
634                                        start, start+1 };
635
636         traversefn(tdb, nontrivial_traverse, &tinfo);
637
638         /* Traversing in wrong order can have strange effects: eg. if
639          * original traverse went A (delete A), B, we might do B
640          * (delete A).  So if we have ops left over, we do it now. */
641         while (tinfo.i != start + op[file][start].group_len) {
642                 if (op[file][tinfo.i].type == OP_TDB_TRAVERSE
643                     || op[file][tinfo.i].type == OP_TDB_TRAVERSE_END_EARLY)
644                         tinfo.i++;
645                 else
646                         tinfo.i = run_ops(tdb, pre_fd, filename, op, file,
647                                           tinfo.i,
648                                           start + op[file][start].group_len,
649                                           false);
650         }
651
652         return tinfo.i;
653 }
654
/* SIGALRM handler installed by run_ops().  Deliberately does nothing:
 * having a handler at all is what lets the signal interrupt a blocked
 * call instead of killing the process. */
static void break_out(int sig)
{
        (void)sig;
}
658
659 static __attribute__((noinline))
660 unsigned run_ops(struct tdb_context *tdb,
661                  int pre_fd,
662                  char *filename[],
663                  struct op *op[],
664                  unsigned int file,
665                  unsigned int start, unsigned int stop,
666                  bool backoff)
667 {
668         unsigned int i;
669         struct sigaction sa;
670
671         sa.sa_handler = break_out;
672         sa.sa_flags = 0;
673
674         sigaction(SIGALRM, &sa, NULL);
675         for (i = start; i < stop; i++) {
676                 if (!do_pre(tdb, filename, op, file, pre_fd, i, backoff))
677                         return i;
678
679                 switch (op[file][i].type) {
680                 case OP_TDB_LOCKALL:
681                         try(tdb_lockall(tdb), op[file][i].ret);
682                         break;
683                 case OP_TDB_LOCKALL_MARK:
684                         try(tdb_lockall_mark(tdb), op[file][i].ret);
685                         break;
686                 case OP_TDB_LOCKALL_UNMARK:
687                         try(tdb_lockall_unmark(tdb), op[file][i].ret);
688                         break;
689                 case OP_TDB_LOCKALL_NONBLOCK:
690                         unreliable(tdb_lockall_nonblock(tdb), op[file][i].ret,
691                                    tdb_lockall(tdb), tdb_unlockall(tdb));
692                         break;
693                 case OP_TDB_UNLOCKALL:
694                         try(tdb_unlockall(tdb), op[file][i].ret);
695                         break;
696                 case OP_TDB_LOCKALL_READ:
697                         try(tdb_lockall_read(tdb), op[file][i].ret);
698                         break;
699                 case OP_TDB_LOCKALL_READ_NONBLOCK:
700                         unreliable(tdb_lockall_read_nonblock(tdb),
701                                    op[file][i].ret,
702                                    tdb_lockall_read(tdb),
703                                    tdb_unlockall_read(tdb));
704                         break;
705                 case OP_TDB_UNLOCKALL_READ:
706                         try(tdb_unlockall_read(tdb), op[file][i].ret);
707                         break;
708                 case OP_TDB_CHAINLOCK:
709                         try(tdb_chainlock(tdb, op[file][i].key),
710                             op[file][i].ret);
711                         break;
712                 case OP_TDB_CHAINLOCK_NONBLOCK:
713                         unreliable(tdb_chainlock_nonblock(tdb, op[file][i].key),
714                                    op[file][i].ret,
715                                    tdb_chainlock(tdb, op[file][i].key),
716                                    tdb_chainunlock(tdb, op[file][i].key));
717                         break;
718                 case OP_TDB_CHAINLOCK_MARK:
719                         try(tdb_chainlock_mark(tdb, op[file][i].key),
720                             op[file][i].ret);
721                         break;
722                 case OP_TDB_CHAINLOCK_UNMARK:
723                         try(tdb_chainlock_unmark(tdb, op[file][i].key),
724                             op[file][i].ret);
725                         break;
726                 case OP_TDB_CHAINUNLOCK:
727                         try(tdb_chainunlock(tdb, op[file][i].key),
728                             op[file][i].ret);
729                         break;
730                 case OP_TDB_CHAINLOCK_READ:
731                         try(tdb_chainlock_read(tdb, op[file][i].key),
732                             op[file][i].ret);
733                         break;
734                 case OP_TDB_CHAINUNLOCK_READ:
735                         try(tdb_chainunlock_read(tdb, op[file][i].key),
736                             op[file][i].ret);
737                         break;
738                 case OP_TDB_PARSE_RECORD:
739                         try(tdb_parse_record(tdb, op[file][i].key, get_len,
740                                              NULL),
741                             op[file][i].ret);
742                         break;
743                 case OP_TDB_EXISTS:
744                         try(tdb_exists(tdb, op[file][i].key), op[file][i].ret);
745                         break;
746                 case OP_TDB_STORE:
747                         try(tdb_store(tdb, op[file][i].key, op[file][i].data,
748                                       op[file][i].flag),
749                             op[file][i].ret);
750                         break;
751                 case OP_TDB_APPEND:
752                         try(tdb_append(tdb, op[file][i].key, op[file][i].data),
753                             op[file][i].ret);
754                         break;
755                 case OP_TDB_GET_SEQNUM:
756                         try(tdb_get_seqnum(tdb), op[file][i].ret);
757                         break;
758                 case OP_TDB_WIPE_ALL:
759                         try(tdb_wipe_all(tdb), op[file][i].ret);
760                         break;
761                 case OP_TDB_TRANSACTION_START:
762                         try(tdb_transaction_start(tdb), op[file][i].ret);
763                         break;
764                 case OP_TDB_TRANSACTION_CANCEL:
765                         try(tdb_transaction_cancel(tdb), op[file][i].ret);
766                         break;
767                 case OP_TDB_TRANSACTION_COMMIT:
768                         try(tdb_transaction_commit(tdb), op[file][i].ret);
769                         break;
770                 case OP_TDB_TRAVERSE_READ_START:
771                         i = op_traverse(tdb, pre_fd, filename, file,
772                                         tdb_traverse_read, op, i);
773                         break;
774                 case OP_TDB_TRAVERSE_START:
775                         i = op_traverse(tdb, pre_fd, filename, file,
776                                         tdb_traverse, op, i);
777                         break;
778                 case OP_TDB_TRAVERSE:
779                 case OP_TDB_TRAVERSE_END_EARLY:
780                         /* Terminate: we're in a traverse, and we've
781                          * done our ops. */
782                         return i;
783                 case OP_TDB_TRAVERSE_END:
784                         fail(filename[file], i+1, "unexpected end traverse");
785                 /* FIXME: These must be treated like traverse. */
786                 case OP_TDB_FIRSTKEY:
787                         if (!key_eq(tdb_firstkey(tdb), op[file][i].data))
788                                 fail(filename[file], i+1, "bad firstkey");
789                         break;
790                 case OP_TDB_NEXTKEY:
791                         if (!key_eq(tdb_nextkey(tdb, op[file][i].key),
792                                     op[file][i].data))
793                                 fail(filename[file], i+1, "bad nextkey");
794                         break;
795                 case OP_TDB_FETCH: {
796                         TDB_DATA f = tdb_fetch(tdb, op[file][i].key);
797                         if (!key_eq(f, op[file][i].data))
798                                 fail(filename[file], i+1, "bad fetch %u",
799                                      f.dsize);
800                         break;
801                 }
802                 case OP_TDB_DELETE:
803                         try(tdb_delete(tdb, op[file][i].key), op[file][i].ret);
804                         break;
805                 }
806                 do_post(filename, op, file, i);
807         }
808         return i;
809 }
810
811 /* tdbtorture, in particular, can do a tdb_close with a transaction in
812  * progress. */
813 static struct op *maybe_cancel_transaction(const char *filename,
814                                            struct op *op, unsigned int *num)
815 {
816         unsigned int start = op_find_start(op, *num, OP_TDB_TRANSACTION_START);
817
818         if (start) {
819                 char *words[] = { "<unknown>", "tdb_close", NULL };
820                 add_op(filename, &op, *num, op[start].seqnum,
821                        OP_TDB_TRANSACTION_CANCEL);
822                 op_analyze_transaction(filename, op, *num, words);
823                 (*num)++;
824         }
825         return op;
826 }
827
828 static struct op *load_tracefile(const char *filename, unsigned int *num,
829                                  unsigned int *hashsize,
830                                  unsigned int *tdb_flags,
831                                  unsigned int *open_flags)
832 {
833         unsigned int i;
834         struct op *op = talloc_array(NULL, struct op, 1);
835         char **words;
836         char **lines;
837         char *file;
838
839         file = grab_file(NULL, filename, NULL);
840         if (!file)
841                 err(1, "Reading %s", filename);
842
843         lines = strsplit(file, file, "\n", NULL);
844         if (!lines[0])
845                 errx(1, "%s is empty", filename);
846
847         words = strsplit(lines, lines[0], " ", NULL);
848         if (!streq(words[1], "tdb_open"))
849                 fail(filename, 1, "does not start with tdb_open");
850
851         *hashsize = atoi(words[2]);
852         *tdb_flags = strtoul(words[3], NULL, 0);
853         *open_flags = strtoul(words[4], NULL, 0);
854
855         for (i = 1; lines[i]; i++) {
856                 const struct op_table *opt;
857
858                 words = strsplit(lines, lines[i], " ", NULL);
859                 if (!words[0] || !words[1])
860                         fail(filename, i+1, "Expected seqnum number and op");
861                
862                 opt = find_keyword(words[1], strlen(words[1]));
863                 if (!opt) {
864                         if (streq(words[1], "tdb_close")) {
865                                 if (lines[i+1])
866                                         fail(filename, i+2,
867                                              "lines after tdb_close");
868                                 *num = i;
869                                 talloc_free(lines);
870                                 return maybe_cancel_transaction(filename,
871                                                                 op, num);
872                         }
873                         fail(filename, i+1, "Unknown operation '%s'", words[1]);
874                 }
875
876                 add_op(filename, &op, i, atoi(words[0]), opt->type);
877                 opt->enhance_op(filename, op, i, words);
878         }
879
880         fprintf(stderr, "%s:%u:last operation is not tdb_close: incomplete?",
881               filename, i);
882         talloc_free(lines);
883         *num = i - 1;
884         return maybe_cancel_transaction(filename, op, num);
885 }
886
/* We remember all the keys we've ever seen, and who has them.
 * hash_ops() builds an array of these, indexed by hashed key. */
struct keyinfo {
	/* The key itself; hash_ops() treats a NULL dptr as an unused slot. */
	TDB_DATA key;
	/* Number of valid entries in user[]. */
	unsigned int num_users;
	/* The (file, op number) pairs which touch this key. */
	struct op_desc *user;
};
893
894 static const TDB_DATA must_not_exist;
895 static const TDB_DATA must_exist;
896 static const TDB_DATA not_exists_or_empty;
897
898 /* NULL means doesn't care if it exists or not, &must_exist means
899  * it must exist but we don't care what, &must_not_exist means it must
900  * not exist, otherwise the data it needs. */
901 static const TDB_DATA *needs(const struct op *op)
902 {
903         switch (op->type) {
904         /* FIXME: Pull forward deps, since we can deadlock */
905         case OP_TDB_CHAINLOCK:
906         case OP_TDB_CHAINLOCK_NONBLOCK:
907         case OP_TDB_CHAINLOCK_MARK:
908         case OP_TDB_CHAINLOCK_UNMARK:
909         case OP_TDB_CHAINUNLOCK:
910         case OP_TDB_CHAINLOCK_READ:
911         case OP_TDB_CHAINUNLOCK_READ:
912                 return NULL;
913
914         case OP_TDB_APPEND:
915                 if (op->append.pre.dsize == 0)
916                         return &not_exists_or_empty;
917                 return &op->append.pre;
918
919         case OP_TDB_STORE:
920                 if (op->flag == TDB_INSERT) {
921                         if (op->ret < 0)
922                                 return &must_exist;
923                         else
924                                 return &must_not_exist;
925                 } else if (op->flag == TDB_MODIFY) {
926                         if (op->ret < 0)
927                                 return &must_not_exist;
928                         else
929                                 return &must_exist;
930                 }
931                 /* No flags?  Don't care */
932                 return NULL;
933
934         case OP_TDB_EXISTS:
935                 if (op->ret == 1)
936                         return &must_exist;
937                 else
938                         return &must_not_exist;
939
940         case OP_TDB_PARSE_RECORD:
941                 if (op->ret < 0)
942                         return &must_not_exist;
943                 return &must_exist;
944
945         /* FIXME: handle these. */
946         case OP_TDB_WIPE_ALL:
947         case OP_TDB_FIRSTKEY:
948         case OP_TDB_NEXTKEY:
949         case OP_TDB_GET_SEQNUM:
950         case OP_TDB_TRAVERSE:
951         case OP_TDB_TRANSACTION_COMMIT:
952         case OP_TDB_TRANSACTION_CANCEL:
953         case OP_TDB_TRANSACTION_START:
954                 return NULL;
955
956         case OP_TDB_FETCH:
957                 if (!op->data.dptr)
958                         return &must_not_exist;
959                 return &op->data;
960
961         case OP_TDB_DELETE:
962                 if (op->ret < 0)
963                         return &must_not_exist;
964                 return &must_exist;
965
966         default:
967                 errx(1, "Unexpected op type %i", op->type);
968         }
969         
970 }
971
972 static bool starts_transaction(const struct op *op)
973 {
974         return op->type == OP_TDB_TRANSACTION_START;
975 }
976
977 static bool in_transaction(const struct op op[], unsigned int i)
978 {
979         return op[i].group_start && starts_transaction(&op[op[i].group_start]);
980 }
981
982 static bool successful_transaction(const struct op *op)
983 {
984         return starts_transaction(op)
985                 && op[op->group_len].type == OP_TDB_TRANSACTION_COMMIT;
986 }
987
988 static bool starts_traverse(const struct op *op)
989 {
990         return op->type == OP_TDB_TRAVERSE_START
991                 || op->type == OP_TDB_TRAVERSE_READ_START;
992 }
993
994 static bool in_traverse(const struct op op[], unsigned int i)
995 {
996         return op[i].group_start && starts_traverse(&op[op[i].group_start]);
997 }
998
999 static bool starts_chainlock(const struct op *op)
1000 {
1001         return op->type == OP_TDB_CHAINLOCK_READ
1002                 || op->type == OP_TDB_CHAINLOCK;
1003 }
1004
1005 static bool in_chainlock(const struct op op[], unsigned int i)
1006 {
1007         return op[i].group_start && starts_chainlock(&op[op[i].group_start]);
1008 }
1009
1010 /* What's the data after this op?  pre if nothing changed. */
1011 static const TDB_DATA *gives(const TDB_DATA *key, const TDB_DATA *pre,
1012                              const struct op *op)
1013 {
1014         if (starts_transaction(op) || starts_chainlock(op)) {
1015                 unsigned int i;
1016
1017                 /* Cancelled transactions don't change anything. */
1018                 if (op[op->group_len].type == OP_TDB_TRANSACTION_CANCEL)
1019                         return pre;
1020                 assert(op[op->group_len].type == OP_TDB_TRANSACTION_COMMIT
1021                        || op[op->group_len].type == OP_TDB_CHAINUNLOCK_READ
1022                        || op[op->group_len].type == OP_TDB_CHAINUNLOCK);
1023
1024                 for (i = 1; i < op->group_len; i++) {
1025                         /* This skips nested transactions, too */
1026                         if (key_eq(op[i].key, *key))
1027                                 pre = gives(key, pre, &op[i]);
1028                 }
1029                 return pre;
1030         }
1031
1032         /* Failed ops don't change state of db. */
1033         if (op->ret < 0)
1034                 return pre;
1035
1036         if (op->type == OP_TDB_DELETE || op->type == OP_TDB_WIPE_ALL)
1037                 return &tdb_null;
1038
1039         if (op->type == OP_TDB_APPEND)
1040                 return &op->append.post;
1041
1042         if (op->type == OP_TDB_STORE)
1043                 return &op->data;
1044
1045         return pre;
1046 }
1047
1048 static struct keyinfo *hash_ops(struct op *op[], unsigned int num_ops[],
1049                                 unsigned int num)
1050 {
1051         unsigned int i, j, h;
1052         struct keyinfo *hash;
1053
1054         hash = talloc_zero_array(op[0], struct keyinfo, total_keys*2);
1055         for (i = 0; i < num; i++) {
1056                 for (j = 1; j < num_ops[i]; j++) {
1057                         /* We can't do this on allocation, due to realloc. */
1058                         list_head_init(&op[i][j].post);
1059                         list_head_init(&op[i][j].pre);
1060
1061                         if (!op[i][j].key.dptr)
1062                                 continue;
1063
1064                         h = hash_key(&op[i][j].key) % (total_keys * 2);
1065                         while (!key_eq(hash[h].key, op[i][j].key)) {
1066                                 if (!hash[h].key.dptr) {
1067                                         hash[h].key = op[i][j].key;
1068                                         break;
1069                                 }
1070                                 h = (h + 1) % (total_keys * 2);
1071                         }
1072                         /* Might as well save some memory if we can. */
1073                         if (op[i][j].key.dptr != hash[h].key.dptr) {
1074                                 talloc_free(op[i][j].key.dptr);
1075                                 op[i][j].key.dptr = hash[h].key.dptr;
1076                         }
1077                         hash[h].user = talloc_realloc(hash, hash[h].user,
1078                                                      struct op_desc,
1079                                                      hash[h].num_users+1);
1080
1081                         /* If it's in a transaction, it's the transaction which
1082                          * matters from an analysis POV. */
1083                         if (in_transaction(op[i], j)
1084                             || in_chainlock(op[i], j)) {
1085                                 unsigned start = op[i][j].group_start;
1086
1087                                 /* Don't include twice. */
1088                                 if (hash[h].num_users
1089                                     && hash[h].user[hash[h].num_users-1].file
1090                                         == i
1091                                     && hash[h].user[hash[h].num_users-1].op_num
1092                                         == start)
1093                                         continue;
1094
1095                                 hash[h].user[hash[h].num_users].op_num = start;
1096                         } else
1097                                 hash[h].user[hash[h].num_users].op_num = j;
1098                         hash[h].user[hash[h].num_users].file = i;
1099                         hash[h].num_users++;
1100                 }
1101         }
1102
1103         return hash;
1104 }
1105
1106 static bool satisfies(const TDB_DATA *key, const TDB_DATA *data,
1107                       const struct op *op)
1108 {
1109         const TDB_DATA *need = NULL;
1110
1111         if (starts_transaction(op) || starts_chainlock(op)) {
1112                 unsigned int i;
1113
1114                 /* Look through for an op in this transaction which
1115                  * needs this key. */
1116                 for (i = 1; i < op->group_len; i++) {
1117                         if (key_eq(op[i].key, *key)) {
1118                                 need = needs(&op[i]);
1119                                 /* tdb_exists() is special: there might be
1120                                  * something in the transaction with more
1121                                  * specific requirements.  Other ops don't have
1122                                  * specific requirements (eg. store or delete),
1123                                  * but they change the value so we can't get
1124                                  * more information from future ops. */
1125                                 if (op[i].type != OP_TDB_EXISTS)
1126                                         break;
1127                         }
1128                 }
1129         } else
1130                 need = needs(op);
1131
1132         /* Don't need anything?  Cool. */
1133         if (!need)
1134                 return true;
1135
1136         /* This should be tdb_null or a real value. */
1137         assert(data != &must_exist);
1138         assert(data != &must_not_exist);
1139         assert(data != &not_exists_or_empty);
1140
1141         /* Must not exist?  data must not exist. */
1142         if (need == &must_not_exist)
1143                 return data == &tdb_null;
1144
1145         /* Must exist? */
1146         if (need == &must_exist)
1147                 return data != &tdb_null;
1148
1149         /* Either noexist or empty. */
1150         if (need == &not_exists_or_empty)
1151                 return data->dsize == 0;
1152
1153         /* Needs something specific. */
1154         return key_eq(*data, *need);
1155 }
1156
1157 static void move_to_front(struct op_desc res[], unsigned off, unsigned elem)
1158 {
1159         if (elem != off) {
1160                 struct op_desc tmp = res[elem];
1161                 memmove(res + off + 1, res + off, (elem - off)*sizeof(res[0]));
1162                 res[off] = tmp;
1163         }
1164 }
1165
1166 static void restore_to_pos(struct op_desc res[], unsigned off, unsigned elem)
1167 {
1168         if (elem != off) {
1169                 struct op_desc tmp = res[off];
1170                 memmove(res + off, res + off + 1, (elem - off)*sizeof(res[0]));
1171                 res[elem] = tmp;
1172         }
1173 }
1174
1175 static bool sort_deps(char *filename[], struct op *op[],
1176                       struct op_desc res[],
1177                       unsigned off, unsigned num,
1178                       const TDB_DATA *key, const TDB_DATA *data,
1179                       unsigned num_files, unsigned fuzz)
1180 {
1181         unsigned int i, files_done;
1182         struct op *this_op;
1183         bool done[num_files];
1184
1185         /* None left?  We're sorted. */
1186         if (off == num)
1187                 return true;
1188
1189         /* Does this make sequence number go backwards?  Allow a little fuzz. */
1190         if (off > 0) {
1191                 int seqnum1 = op[res[off-1].file][res[off-1].op_num].seqnum;
1192                 int seqnum2 = op[res[off].file][res[off].op_num].seqnum;
1193
1194                 if (seqnum1 - seqnum2 > (int)fuzz) {
1195 #if DEBUG_DEPS
1196                         printf("Seqnum jump too far (%u -> %u)\n",
1197                                seqnum1, seqnum2);
1198 #endif
1199                         return false;
1200                 }
1201         }
1202
1203         memset(done, 0, sizeof(done));
1204
1205         /* Since ops within a trace file are ordered, we just need to figure
1206          * out which file to try next.  Since we don't take into account
1207          * inter-key relationships (which exist by virtue of trace file order),
1208          * we minimize the chance of harm by trying to keep in seqnum order. */
1209         for (files_done = 0, i = off; i < num && files_done < num_files; i++) {
1210                 if (done[res[i].file])
1211                         continue;
1212
1213                 this_op = &op[res[i].file][res[i].op_num];
1214
1215                 /* Is what we have good enough for this op? */
1216                 if (satisfies(key, data, this_op)) {
1217                         move_to_front(res, off, i);
1218                         if (sort_deps(filename, op, res, off+1, num,
1219                                       key, gives(key, data, this_op),
1220                                       num_files, fuzz))
1221                                 return true;
1222                         restore_to_pos(res, off, i);
1223                 }
1224                 done[res[i].file] = true;
1225                 files_done++;
1226         }
1227
1228         /* No combination worked. */
1229         return false;
1230 }
1231
1232 static void check_dep_sorting(struct op_desc user[], unsigned num_users,
1233                               unsigned num_files)
1234 {
1235 #if DEBUG_DEPS
1236         unsigned int i;
1237         unsigned minima[num_files];
1238
1239         memset(minima, 0, sizeof(minima));
1240         for (i = 0; i < num_users; i++) {
1241                 assert(minima[user[i].file] < user[i].op_num);
1242                 minima[user[i].file] = user[i].op_num;
1243         }
1244 #endif
1245 }
1246
1247 /* All these ops happen on the same key.  Which comes first?
1248  *
1249  * This can happen both because read ops or failed write ops don't
1250  * change sequence number, and also due to race since we access the
1251  * number unlocked (the race can cause less detectable ordering problems,
1252  * in which case we'll deadlock and report: fix manually in that case).
1253  */
1254 static void figure_deps(char *filename[], struct op *op[],
1255                         const TDB_DATA *key, struct op_desc user[],
1256                         unsigned num_users, unsigned num_files)
1257 {
1258         /* We assume database starts empty. */
1259         const struct TDB_DATA *data = &tdb_null;
1260         unsigned int fuzz;
1261
1262         /* We prefer to keep strict seqnum order if possible: it's the
1263          * most likely.  We get more lax if that fails. */
1264         for (fuzz = 0; fuzz < 100; fuzz = (fuzz + 1)*2) {
1265                 if (sort_deps(filename, op, user, 0, num_users, key, data,
1266                               num_files, fuzz))
1267                         break;
1268         }
1269
1270         if (fuzz >= 100)
1271                 fail(filename[user[0].file], user[0].op_num+1,
1272                      "Could not resolve inter-dependencies");
1273
1274         check_dep_sorting(user, num_users, num_files);
1275 }
1276
1277 static void sort_ops(struct keyinfo hash[], char *filename[], struct op *op[],
1278                      unsigned int num)
1279 {
1280         unsigned int h;
1281
1282         /* Gcc nexted function extension.  How cool is this? */
1283         int compare_seqnum(const void *_a, const void *_b)
1284         {
1285                 const struct op_desc *a = _a, *b = _b;
1286
1287                 /* First, maintain order within any trace file. */
1288                 if (a->file == b->file)
1289                         return a->op_num - b->op_num;
1290
1291                 /* Otherwise, arrange by seqnum order. */
1292                 if (op[a->file][a->op_num].seqnum !=
1293                     op[b->file][b->op_num].seqnum)
1294                         return op[a->file][a->op_num].seqnum
1295                                 - op[b->file][b->op_num].seqnum;
1296
1297                 /* Cancelled transactions are assumed to happen first. */
1298                 if (starts_transaction(&op[a->file][a->op_num])
1299                     && !successful_transaction(&op[a->file][a->op_num]))
1300                         return -1;
1301                 if (starts_transaction(&op[b->file][b->op_num])
1302                     && !successful_transaction(&op[b->file][b->op_num]))
1303                         return 1;
1304
1305                 /* No idea. */
1306                 return 0;
1307         }
1308
1309         /* Now sort into seqnum order. */
1310         for (h = 0; h < total_keys * 2; h++) {
1311                 struct op_desc *user = hash[h].user;
1312
1313                 qsort(user, hash[h].num_users, sizeof(user[0]), compare_seqnum);
1314                 figure_deps(filename, op, &hash[h].key, user, hash[h].num_users,
1315                             num);
1316         }
1317 }
1318
1319 static int destroy_depend(struct depend *dep)
1320 {
1321         list_del(&dep->pre_list);
1322         list_del(&dep->post_list);
1323         return 0;
1324 }
1325
1326 static void add_dependency(void *ctx,
1327                            struct op *op[],
1328                            char *filename[],
1329                            const struct op_desc *needs,
1330                            const struct op_desc *prereq)
1331 {
1332         struct depend *dep;
1333
1334         /* We don't depend on ourselves. */
1335         if (needs->file == prereq->file) {
1336                 assert(prereq->op_num < needs->op_num);
1337                 return;
1338         }
1339
1340 #if DEBUG_DEPS
1341         printf("%s:%u: depends on %s:%u\n",
1342                filename[needs->file], needs->op_num+1,
1343                filename[prereq->file], prereq->op_num+1);
1344 #endif
1345
1346         dep = talloc(ctx, struct depend);
1347         dep->needs = *needs;
1348         dep->prereq = *prereq;
1349
1350 #if TRAVERSALS_TAKE_TRANSACTION_LOCK
1351         /* If something in a traverse depends on something in another
1352          * traverse/transaction, it creates a dependency between the
1353          * two groups. */
1354         if ((in_traverse(op[prereq->file], prereq->op_num)
1355              && (starts_transaction(&op[needs->file][needs->op_num])
1356                  || starts_traverse(&op[needs->file][needs->op_num])))
1357             || (in_traverse(op[needs->file], needs->op_num)
1358                 && (starts_transaction(&op[prereq->file][prereq->op_num])
1359                     || starts_traverse(&op[prereq->file][prereq->op_num])))) {
1360                 unsigned int start;
1361
1362                 /* We are satisfied by end of group. */
1363                 start = op[prereq->file][prereq->op_num].group_start;
1364                 dep->prereq.op_num = start + op[prereq->file][start].group_len;
1365                 /* And we need that done by start of our group. */
1366                 dep->needs.op_num = op[needs->file][needs->op_num].group_start;
1367         }
1368
1369         /* There is also this case:
1370          *  <traverse> <read foo> ...
1371          *  <transaction> ... </transaction> <create foo>
1372          * Where if we start the traverse then wait, we could block
1373          * the transaction and deadlock.
1374          *
1375          * We try to address this by ensuring that where seqnum indicates it's
1376          * possible, we wait for <create foo> before *starting* traverse.
1377          */
1378         else if (in_traverse(op[needs->file], needs->op_num)) {
1379                 struct op *need = &op[needs->file][needs->op_num];
1380                 if (op[needs->file][need->group_start].seqnum >
1381                     op[prereq->file][prereq->op_num].seqnum) {
1382                         dep->needs.op_num = need->group_start;
1383                 }
1384         }
1385 #endif
1386
1387         /* If you depend on a transaction or chainlock, you actually
1388          * depend on it ending. */
1389         if (starts_transaction(&op[prereq->file][dep->prereq.op_num])
1390             || starts_chainlock(&op[prereq->file][dep->prereq.op_num])) {
1391                 dep->prereq.op_num
1392                         += op[dep->prereq.file][dep->prereq.op_num].group_len;
1393 #if DEBUG_DEPS
1394                 printf("-> Actually end of transaction %s:%u\n",
1395                        filename[dep->prereq->file], dep->prereq->op_num+1);
1396 #endif
1397         } else
1398                 /* We should never create a dependency from middle of
1399                  * a transaction. */
1400                 assert(!in_transaction(op[prereq->file], dep->prereq.op_num)
1401                        || op[prereq->file][dep->prereq.op_num].type
1402                        == OP_TDB_TRANSACTION_COMMIT
1403                        || op[prereq->file][dep->prereq.op_num].type
1404                        == OP_TDB_TRANSACTION_CANCEL);
1405
1406         list_add(&op[dep->prereq.file][dep->prereq.op_num].post,
1407                  &dep->post_list);
1408         list_add(&op[dep->needs.file][dep->needs.op_num].pre,
1409                  &dep->pre_list);
1410         talloc_set_destructor(dep, destroy_depend);
1411 }
1412
1413 static bool changes_db(const TDB_DATA *key, const struct op *op)
1414 {
1415         return gives(key, NULL, op) != NULL;
1416 }
1417
1418 static void depend_on_previous(struct op *op[],
1419                                char *filename[],
1420                                unsigned int num,
1421                                struct op_desc user[],
1422                                unsigned int i,
1423                                int prev)
1424 {
1425         bool deps[num];
1426         int j;
1427
1428         if (i == 0)
1429                 return;
1430
1431         if (prev == i - 1) {
1432                 /* Just depend on previous. */
1433                 add_dependency(NULL, op, filename, &user[i], &user[prev]);
1434                 return;
1435         }
1436
1437         /* We have to wait for the readers.  Find last one in *each* file. */
1438         memset(deps, 0, sizeof(deps));
1439         deps[user[i].file] = true;
1440         for (j = i - 1; j > prev; j--) {
1441                 if (!deps[user[j].file]) {
1442                         add_dependency(NULL, op, filename, &user[i], &user[j]);
1443                         deps[user[j].file] = true;
1444                 }
1445         }
1446 }
1447
1448 /* This is simple, but not complete.  We don't take into account
1449  * indirect dependencies. */
1450 static void optimize_dependencies(struct op *op[], unsigned int num_ops[],
1451                                   unsigned int num)
1452 {
1453         unsigned int i, j;
1454
1455         /* There can only be one real dependency on each file */
1456         for (i = 0; i < num; i++) {
1457                 for (j = 1; j < num_ops[i]; j++) {
1458                         struct depend *dep, *next;
1459                         struct depend *prev[num];
1460
1461                         memset(prev, 0, sizeof(prev));
1462
1463                         list_for_each_safe(&op[i][j].pre, dep, next, pre_list) {
1464                                 if (!prev[dep->prereq.file]) {
1465                                         prev[dep->prereq.file] = dep;
1466                                         continue;
1467                                 }
1468                                 if (prev[dep->prereq.file]->prereq.op_num
1469                                     < dep->prereq.op_num) {
1470                                         talloc_free(prev[dep->prereq.file]);
1471                                         prev[dep->prereq.file] = dep;
1472                                 } else
1473                                         talloc_free(dep);
1474                         }
1475                 }
1476         }
1477
1478         for (i = 0; i < num; i++) {
1479                 int deps[num];
1480
1481                 for (j = 0; j < num; j++)
1482                         deps[j] = -1;
1483
1484                 for (j = 1; j < num_ops[i]; j++) {
1485                         struct depend *dep, *next;
1486
1487                         list_for_each_safe(&op[i][j].pre, dep, next, pre_list) {
1488                                 if (deps[dep->prereq.file]
1489                                     >= (int)dep->prereq.op_num)
1490                                         talloc_free(dep);
1491                                 else
1492                                         deps[dep->prereq.file]
1493                                                 = dep->prereq.op_num;
1494                         }
1495                 }
1496         }
1497 }
1498
1499 #if TRAVERSALS_TAKE_TRANSACTION_LOCK
1500 /* Force an order among the traversals, so they don't deadlock (as much) */
1501 static void make_traverse_depends(char *filename[],
1502                                   struct op *op[], unsigned int num_ops[],
1503                                   unsigned int num)
1504 {
1505         unsigned int i, num_traversals = 0;
1506         int j;
1507         struct op_desc *desc;
1508
1509         /* Sort by which one runs first. */
1510         int compare_traverse_desc(const void *_a, const void *_b)
1511         {
1512                 const struct op_desc *da = _a, *db = _b;
1513                 const struct op *a = &op[da->file][da->op_num],
1514                         *b = &op[db->file][db->op_num];
1515
1516                 if (a->seqnum != b->seqnum)
1517                         return a->seqnum - b->seqnum;
1518
1519                 /* If they have same seqnum, it means one didn't make any
1520                  * changes.  Thus sort by end in that case. */
1521                 return a[a->group_len].seqnum - b[b->group_len].seqnum;
1522         }
1523
1524         desc = talloc_array(NULL, struct op_desc, 1);
1525
1526         /* Count them. */
1527         for (i = 0; i < num; i++) {
1528                 for (j = 1; j < num_ops[i]; j++) {
1529                         /* Traverse start (ignore those in
1530                          * transactions; they're already covered by
1531                          * transaction dependencies). */
1532                         if (starts_traverse(&op[i][j])
1533                             && !in_transaction(op[i], j)) {
1534                                 desc = talloc_realloc(NULL, desc,
1535                                                       struct op_desc,
1536                                                       num_traversals+1);
1537                                 desc[num_traversals].file = i;
1538                                 desc[num_traversals].op_num = j;
1539                                 num_traversals++;
1540                         }
1541                 }
1542         }
1543         qsort(desc, num_traversals, sizeof(desc[0]), compare_traverse_desc);
1544
1545         for (i = 1; i < num_traversals; i++) {
1546                 const struct op *prev = &op[desc[i-1].file][desc[i-1].op_num];
1547                 const struct op *curr = &op[desc[i].file][desc[i].op_num];
1548
1549                 /* Read traverses don't depend on each other (read lock). */
1550                 if (prev->type == OP_TDB_TRAVERSE_READ_START
1551                     && curr->type == OP_TDB_TRAVERSE_READ_START)
1552                         continue;
1553
1554                 /* Only make dependency if it's clear. */
1555                 if (compare_traverse_desc(&desc[i], &desc[i-1])) {
1556                         /* i depends on end of traverse i-1. */
1557                         struct op_desc end = desc[i-1];
1558                         end.op_num += prev->group_len;
1559                         add_dependency(NULL, op, filename, &desc[i], &end);
1560                 }
1561         }
1562         talloc_free(desc);
1563 }
1564
/*
 * Switch @fd to non-blocking mode while keeping its other status flags.
 * Terminates the program via err() if the flags cannot be read or updated.
 */
static void set_nonblock(int fd)
{
	int flags = fcntl(fd, F_GETFL);

	/* Never OR O_NONBLOCK into a failed (-1) F_GETFL result.  POSIX only
	 * promises "a value other than -1" for a successful F_SETFL, so test
	 * for -1 rather than for non-zero. */
	if (flags == -1 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
		err(1, "Setting pipe nonblocking");
}
1570
1571 static bool handle_backoff(struct op *op[], int fd)
1572 {
1573         struct op_desc desc;
1574         bool handled = false;
1575
1576         /* Sloppy coding: we assume PIPEBUF never fills. */
1577         while (read(fd, &desc, sizeof(desc)) != -1) {
1578                 unsigned int i;
1579                 handled = true;
1580                 for (i = desc.op_num; i > 0; i--) {
1581                         if (op[desc.file][i].type == OP_TDB_TRAVERSE) {
1582                                 /* We insert a fake end here. */
1583                                 op[desc.file][i].type
1584                                         = OP_TDB_TRAVERSE_END_EARLY;
1585                                 break;
1586                         } else if (starts_traverse(&op[desc.file][i])) {
1587                                 unsigned int start = i;
1588                                 struct op tmp = op[desc.file][i];
1589                                 /* Move the ops outside traverse. */
1590                                 memmove(&op[desc.file][i],
1591                                         &op[desc.file][i+1],
1592                                         (desc.op_num-i-1) * sizeof(op[0][0]));
1593                                 op[desc.file][desc.op_num] = tmp;
1594                                 while (op[desc.file][i].group_start == start) {
1595                                         op[desc.file][i++].group_start
1596                                                 = desc.op_num;
1597                                 }
1598                                 break;
1599                         }
1600                 }
1601         }
1602         return handled;
1603 }
1604
1605 #else /* !TRAVERSALS_TAKE_TRANSACTION_LOCK */
/*
 * Stub used when TRAVERSALS_TAKE_TRANSACTION_LOCK is off: both arguments
 * are ignored and the caller is told that nothing was handled.
 */
static bool handle_backoff(struct op *op[], int fd)
{
	(void)op;
	(void)fd;

	return false;
}
1610 #endif
1611
1612 static void derive_dependencies(char *filename[],
1613                                 struct op *op[], unsigned int num_ops[],
1614                                 unsigned int num)
1615 {
1616         struct keyinfo *hash;
1617         unsigned int h, i;
1618
1619         /* Create hash table for faster key lookup. */
1620         hash = hash_ops(op, num_ops, num);
1621
1622         /* Sort them by sequence number. */
1623         sort_ops(hash, filename, op, num);
1624
1625         /* Create dependencies back to the last change, rather than
1626          * creating false dependencies by naively making each one
1627          * depend on the previous.  This has two purposes: it makes
1628          * later optimization simpler, and it also avoids deadlock with
1629          * same sequence number ops inside traversals (if one
1630          * traversal doesn't write anything, two ops can have the same
1631          * sequence number yet we can create a traversal dependency
1632          * the other way). */
1633         for (h = 0; h < total_keys * 2; h++) {
1634                 int prev = -1;
1635
1636                 if (hash[h].num_users < 2)
1637                         continue;
1638
1639                 for (i = 0; i < hash[h].num_users; i++) {
1640                         if (changes_db(&hash[h].key, &op[hash[h].user[i].file]
1641                                        [hash[h].user[i].op_num])) {
1642                                 depend_on_previous(op, filename, num,
1643                                                    hash[h].user, i, prev);
1644                                 prev = i;
1645                         } else if (prev >= 0)
1646                                 add_dependency(hash, op, filename,
1647                                                &hash[h].user[i],
1648                                                &hash[h].user[prev]);
1649                 }
1650         }
1651
1652 #if TRAVERSALS_TAKE_TRANSACTION_LOCK
1653         make_traverse_depends(filename, op, num_ops, num);
1654 #endif
1655
1656         optimize_dependencies(op, num_ops, num);
1657 }
1658
1659 static struct timeval run_test(char *argv[],
1660                                unsigned int num_ops[],
1661                                unsigned int hashsize[],
1662                                unsigned int tdb_flags[],
1663                                unsigned int open_flags[],
1664                                struct op *op[],
1665                                int fds[2])
1666 {
1667         unsigned int i;
1668         struct timeval start, end, diff;
1669         bool ok = true;
1670
1671         for (i = 0; argv[i+2]; i++) {
1672                 struct tdb_context *tdb;
1673                 char c;
1674
1675                 switch (fork()) {
1676                 case -1:
1677                         err(1, "fork failed");
1678                 case 0:
1679                         close(fds[1]);
1680                         tdb = tdb_open_ex(argv[1], hashsize[i],
1681                                           tdb_flags[i]|TDB_NOSYNC,
1682                                           open_flags[i], 0600, NULL, hash_key);
1683                         if (!tdb)
1684                                 err(1, "Opening tdb %s", argv[1]);
1685
1686                         /* This catches parent exiting. */
1687                         if (read(fds[0], &c, 1) != 1)
1688                                 exit(1);
1689                         run_ops(tdb, pipes[i].fd[0], argv+2, op, i, 1,
1690                                 num_ops[i], false);
1691                         check_deps(argv[2+i], op[i], num_ops[i]);
1692                         exit(0);
1693                 default:
1694                         break;
1695                 }
1696         }
1697
1698         /* Let everything settle. */
1699         sleep(1);
1700
1701         printf("Starting run...");
1702         fflush(stdout);
1703         gettimeofday(&start, NULL);
1704         /* Tell them all to go!  Any write of sufficient length will do. */
1705         if (write(fds[1], hashsize, i) != i)
1706                 err(1, "Writing to wakeup pipe");
1707
1708         for (i = 0; argv[i + 2]; i++) {
1709                 int status;
1710                 wait(&status);
1711                 if (!WIFEXITED(status)) {
1712                         warnx("Child died with signal %i", WTERMSIG(status));
1713                         ok = false;
1714                 } else if (WEXITSTATUS(status) != 0)
1715                         /* Assume child spat out error. */
1716                         ok = false;
1717         }
1718         if (!ok)
1719                 exit(1);
1720
1721         gettimeofday(&end, NULL);
1722         printf("done\n");
1723
1724         if (end.tv_usec < start.tv_usec) {
1725                 end.tv_usec += 1000000;
1726                 end.tv_sec--;
1727         }
1728         diff.tv_sec = end.tv_sec - start.tv_sec;
1729         diff.tv_usec = end.tv_usec - start.tv_usec;
1730         return diff;
1731 }
1732
1733 int main(int argc, char *argv[])
1734 {
1735         struct timeval diff;
1736         unsigned int i, num_ops[argc], hashsize[argc], tdb_flags[argc], open_flags[argc];
1737         struct op *op[argc];
1738         int fds[2];
1739
1740         if (argc < 3)
1741                 errx(1, "Usage: %s <tdbfile> <tracefile>...", argv[0]);
1742
1743         pipes = talloc_array(NULL, struct pipe, argc - 1);
1744         for (i = 0; i < argc - 2; i++) {
1745                 printf("Loading tracefile %s...", argv[2+i]);
1746                 fflush(stdout);
1747                 op[i] = load_tracefile(argv[2+i], &num_ops[i], &hashsize[i],
1748                                        &tdb_flags[i], &open_flags[i]);
1749                 if (pipe(pipes[i].fd) != 0)
1750                         err(1, "creating pipe");
1751                 printf("done\n");
1752         }
1753
1754         printf("Calculating inter-dependencies...");
1755         fflush(stdout);
1756         derive_dependencies(argv+2, op, num_ops, i);
1757         printf("done\n");
1758
1759         /* Don't fork for single arg case: simple debugging. */
1760         if (argc == 3) {
1761                 struct tdb_context *tdb;
1762                 tdb = tdb_open_ex(argv[1], hashsize[0], tdb_flags[0]|TDB_NOSYNC,
1763                                   open_flags[0], 0600, NULL, hash_key);
1764                 printf("Single threaded run...");
1765                 fflush(stdout);
1766
1767                 run_ops(tdb, pipes[0].fd[0], argv+2, op, 0, 1, num_ops[0],
1768                         false);
1769                 check_deps(argv[2], op[0], num_ops[0]);
1770
1771                 printf("done\n");
1772                 exit(0);
1773         }
1774
1775         if (pipe(fds) != 0)
1776                 err(1, "creating pipe");
1777
1778 #if TRAVERSALS_TAKE_TRANSACTION_LOCK
1779         if (pipe(pipes[argc-2].fd) != 0)
1780                 err(1, "creating pipe");
1781         backoff_fd = pipes[argc-2].fd[1];
1782         set_nonblock(pipes[argc-2].fd[1]);
1783         set_nonblock(pipes[argc-2].fd[0]);
1784 #endif
1785
1786         do {
1787                 diff = run_test(argv, num_ops, hashsize, tdb_flags, open_flags,
1788                                 op, fds);
1789         } while (handle_backoff(op, pipes[argc-2].fd[0]));
1790
1791         printf("Time replaying: %lu usec\n",
1792                diff.tv_sec * 1000000UL + diff.tv_usec);
1793         
1794         exit(0);
1795 }