/*
 * Source: git.ozlabs.org Git - ccan/blob - ccan/tdb/tools/replay_trace.c
 * Commit: Handle pre-existing records, wipe_all and repack (ldb users.ldb trace)
 * Path:   [ccan] / ccan / tdb / tools / replay_trace.c
 */
1 #include <ccan/tdb/tdb.h>
2 #include <ccan/grab_file/grab_file.h>
3 #include <ccan/hash/hash.h>
4 #include <ccan/talloc/talloc.h>
5 #include <ccan/str_talloc/str_talloc.h>
6 #include <ccan/str/str.h>
7 #include <ccan/list/list.h>
8 #include <err.h>
9 #include <ctype.h>
10 #include <string.h>
11 #include <unistd.h>
12 #include <sys/types.h>
13 #include <sys/wait.h>
14 #include <sys/time.h>
15 #include <errno.h>
16 #include <signal.h>
17 #include <assert.h>
18 #include <fcntl.h>
19
/* Two-level expansion: the argument is macro-expanded first, then quoted. */
#define STRINGIFY2(token) #token
#define STRINGIFY(token) STRINGIFY2(token)
22
/* Key counter bumped by the op_add_* parsers.  It starts at 1 so that
 * it can never be used as a zero modulus. */
static unsigned int total_keys = 1;

/* Location (file, op number) of every wipe_all op seen so far, and how
 * many entries the array holds. */
static struct op_desc *wipe_alls;
static unsigned int num_wipe_alls;
29
/* Define this to enable the dependency-tracking debug output and checks: */
/* #define DEBUG_DEPS 1 */

/* In the current implementation a traversal blocks transactions. */
#define TRAVERSALS_TAKE_TRANSACTION_LOCK 1
34
/* A pair of pipe file descriptors. */
struct pipe {
        int fd[2];
};

/* One entry per trace file; fd[1] is written to when notifying that file. */
static struct pipe *pipes;

/* Where a backing-off replayer reports its op_desc; -1 until it is set. */
static int backoff_fd = -1;
40
/* Report a fatal error as "<filename>:<line>: FAIL: <message>" on stderr
 * and exit with status 1.  @fmt and the trailing arguments are
 * printf-style. */
static void __attribute__((noreturn)) fail(const char *filename,
                                           unsigned int line,
                                           const char *fmt, ...)
{
        va_list args;

        fprintf(stderr, "%s:%u: FAIL: ", filename, line);

        va_start(args, fmt);
        vfprintf(stderr, fmt, args);
        va_end(args);

        fputc('\n', stderr);
        exit(1);
}
54         
/* Evaluate @expr once; unless it equals @expect, die via fail().
 * Relies on `filename', `file' and `i' being in scope at the call site. */
#define try(expr, expect)                                               \
        do {                                                            \
                int ret = (expr);                                       \
                if (ret == (expect))                                    \
                        break;                                          \
                fail(filename[file], i+1,                               \
                     STRINGIFY(expr) "= %i", ret);                      \
        } while (0)
63
/* Try or imitate results.
 *
 * Evaluate @expr; if the result differs from @expect, report the mismatch
 * on stderr and then run @force when @expect is 0, or @undo otherwise.
 * Relies on `filename', `file' and `i' being in scope at the call site.
 *
 * The diagnostic ends with a newline so successive reports do not run
 * together, and the macro arguments are parenthesized so that compound
 * expressions are handled correctly. */
#define unreliable(expr, expect, force, undo)                           \
        do {                                                            \
                int ret = (expr);                                       \
                if (ret != (expect)) {                                  \
                        fprintf(stderr, "%s:%u: %s gave %i not %i\n",   \
                                filename[file], i+1, STRINGIFY(expr),   \
                                ret, (expect));                         \
                        if ((expect) == 0)                              \
                                force;                                  \
                        else                                            \
                                undo;                                   \
                }                                                       \
        } while (0)
78
79 static bool key_eq(TDB_DATA a, TDB_DATA b)
80 {
81         if (a.dsize != b.dsize)
82                 return false;
83         return memcmp(a.dptr, b.dptr, a.dsize) == 0;
84 }
85
86 /* This is based on the hash algorithm from gdbm */
87 static unsigned int hash_key(TDB_DATA *key)
88 {
89         uint32_t value; /* Used to compute the hash value.  */
90         uint32_t   i;   /* Used to cycle through random values. */
91
92         /* Set the initial value from the key size. */
93         for (value = 0x238F13AF ^ key->dsize, i=0; i < key->dsize; i++)
94                 value = (value + (key->dptr[i] << (i*5 % 24)));
95
96         return (1103515243 * value + 12345);  
97 }
98
/* Every kind of operation that can appear in a trace. */
enum op_type {
        /* Whole-database locks. */
        OP_TDB_LOCKALL,
        OP_TDB_LOCKALL_MARK,
        OP_TDB_LOCKALL_UNMARK,
        OP_TDB_LOCKALL_NONBLOCK,
        OP_TDB_UNLOCKALL,
        OP_TDB_LOCKALL_READ,
        OP_TDB_LOCKALL_READ_NONBLOCK,
        OP_TDB_UNLOCKALL_READ,

        /* Chain locks. */
        OP_TDB_CHAINLOCK,
        OP_TDB_CHAINLOCK_NONBLOCK,
        OP_TDB_CHAINLOCK_MARK,
        OP_TDB_CHAINLOCK_UNMARK,
        OP_TDB_CHAINUNLOCK,
        OP_TDB_CHAINLOCK_READ,
        OP_TDB_CHAINUNLOCK_READ,

        /* Record and database operations. */
        OP_TDB_PARSE_RECORD,
        OP_TDB_EXISTS,
        OP_TDB_STORE,
        OP_TDB_APPEND,
        OP_TDB_GET_SEQNUM,
        OP_TDB_WIPE_ALL,

        /* Transactions. */
        OP_TDB_TRANSACTION_START,
        OP_TDB_TRANSACTION_CANCEL,
        OP_TDB_TRANSACTION_PREPARE_COMMIT,
        OP_TDB_TRANSACTION_COMMIT,

        /* Traversals. */
        OP_TDB_TRAVERSE_READ_START,
        OP_TDB_TRAVERSE_START,
        OP_TDB_TRAVERSE_END,
        OP_TDB_TRAVERSE,
        OP_TDB_TRAVERSE_END_EARLY,

        /* Key iteration, fetch, delete and repack. */
        OP_TDB_FIRSTKEY,
        OP_TDB_NEXTKEY,
        OP_TDB_FETCH,
        OP_TDB_DELETE,
        OP_TDB_REPACK,
};
136
137 struct op {
138         unsigned int seqnum;
139         enum op_type type;
140         TDB_DATA key;
141         TDB_DATA data;
142         int ret;
143
144         /* Who is waiting for us? */
145         struct list_head post;
146         /* What are we waiting for? */
147         struct list_head pre;
148
149         /* If I'm part of a group (traverse/transaction) where is
150          * start?  (Otherwise, 0) */
151         unsigned int group_start;
152
153         union {
154                 int flag; /* open and store */
155                 struct {  /* append */
156                         TDB_DATA pre;
157                         TDB_DATA post;
158                 } append;
159                 /* transaction/traverse start/chainlock */
160                 unsigned int group_len;
161         };
162 };
163
/* Identifies a single op: which trace file, and its index in that file. */
struct op_desc {
        unsigned int file;      /* Index of the trace file. */
        unsigned int op_num;    /* Index of the op within that file. */
};
168
/* Convert one hexadecimal digit (either case) to its value 0-15.
 * Anything else is fatal: fail() reports it against @filename:@line and
 * does not return. */
static unsigned char hex_char(const char *filename, unsigned int line, char c)
{
        /* toupper() requires a value representable as unsigned char (or
         * EOF); a plain char may be negative, so convert first. */
        int upper = toupper((unsigned char)c);

        if (upper >= 'A' && upper <= 'F')
                return upper - 'A' + 10;
        if (upper >= '0' && upper <= '9')
                return upper - '0';
        fail(filename, line, "invalid hex character '%c'", upper);
}
178
179 /* TDB data is <size>:<%02x>* */
180 static TDB_DATA make_tdb_data(const void *ctx,
181                               const char *filename, unsigned int line,
182                               const char *word)
183 {
184         TDB_DATA data;
185         unsigned int i;
186         const char *p;
187
188         if (streq(word, "NULL"))
189                 return tdb_null;
190
191         data.dsize = atoi(word);
192         data.dptr = talloc_array(ctx, unsigned char, data.dsize);
193         p = strchr(word, ':');
194         if (!p)
195                 fail(filename, line, "invalid tdb data '%s'", word);
196         p++;
197         for (i = 0; i < data.dsize; i++)
198                 data.dptr[i] = hex_char(filename, line, p[i*2])*16
199                         + hex_char(filename, line, p[i*2+1]);
200
201         return data;
202 }
203
204 static void add_op(const char *filename, struct op **op, unsigned int i,
205                    unsigned int seqnum, enum op_type type)
206 {
207         struct op *new;
208         *op = talloc_realloc(NULL, *op, struct op, i+1);
209         new = (*op) + i;
210         new->type = type;
211         new->seqnum = seqnum;
212         new->ret = 0;
213         new->group_start = 0;
214 }
215
216 static void op_add_nothing(char *filename[], struct op op[],
217                            unsigned file, unsigned op_num, char *words[])
218 {
219         if (words[2])
220                 fail(filename[file], op_num+1, "Expected no arguments");
221         op[op_num].key = tdb_null;
222 }
223
224 static void op_add_key(char *filename[], struct op op[],
225                        unsigned file, unsigned op_num, char *words[])
226 {
227         if (words[2] == NULL || words[3])
228                 fail(filename[file], op_num+1, "Expected just a key");
229
230         op[op_num].key = make_tdb_data(op, filename[file], op_num+1, words[2]);
231         total_keys++;
232 }
233
234 static void op_add_key_ret(char *filename[], struct op op[],
235                            unsigned file, unsigned op_num, char *words[])
236 {
237         if (!words[2] || !words[3] || !words[4] || words[5]
238             || !streq(words[3], "="))
239                 fail(filename[file], op_num+1, "Expected <key> = <ret>");
240         op[op_num].ret = atoi(words[4]);
241         op[op_num].key = make_tdb_data(op, filename[file], op_num+1, words[2]);
242         /* May only be a unique key if it fails */
243         if (op[op_num].ret != 0)
244                 total_keys++;
245 }
246
247 static void op_add_key_data(char *filename[], struct op op[],
248                             unsigned file, unsigned op_num, char *words[])
249 {
250         if (!words[2] || !words[3] || !words[4] || words[5]
251             || !streq(words[3], "="))
252                 fail(filename[file], op_num+1, "Expected <key> = <data>");
253         op[op_num].key = make_tdb_data(op, filename[file], op_num+1, words[2]);
254         op[op_num].data = make_tdb_data(op, filename[file], op_num+1, words[4]);
255         /* Likely only be a unique key if it fails */
256         if (!op[op_num].data.dptr)
257                 total_keys++;
258         else if (random() % 2)
259                 total_keys++;
260 }
261
262 /* We don't record the keys or data for a traverse, as we don't use them. */
263 static void op_add_traverse(char *filename[], struct op op[],
264                             unsigned file, unsigned op_num, char *words[])
265 {
266         if (!words[2] || !words[3] || !words[4] || words[5]
267             || !streq(words[3], "="))
268                 fail(filename[file], op_num+1, "Expected <key> = <data>");
269         op[op_num].key = tdb_null;
270 }
271
272 /* Full traverse info is useful for debugging, but changing it to
273  * "traversefn" without the data makes the traces *much* smaller! */
274 static void op_add_traversefn(char *filename[], struct op op[],
275                             unsigned file, unsigned op_num, char *words[])
276 {
277         if (words[2])
278                 fail(filename[file], op_num+1, "Expected no values");
279         op[op_num].key = tdb_null;
280 }
281
282 /* <seqnum> tdb_store <rec> <rec> <flag> = <ret> */
283 static void op_add_store(char *filename[], struct op op[],
284                          unsigned file, unsigned op_num, char *words[])
285 {
286         if (!words[2] || !words[3] || !words[4] || !words[5] || !words[6]
287             || words[7] || !streq(words[5], "="))
288                 fail(filename[file], op_num+1, "Expect <key> <data> <flag> = <ret>");
289
290         op[op_num].flag = strtoul(words[4], NULL, 0);
291         op[op_num].ret = atoi(words[6]);
292         op[op_num].key = make_tdb_data(op, filename[file], op_num+1, words[2]);
293         op[op_num].data = make_tdb_data(op, filename[file], op_num+1, words[3]);
294         total_keys++;
295 }
296
297 /* <seqnum> tdb_append <rec> <rec> = <rec> */
298 static void op_add_append(char *filename[], struct op op[],
299                           unsigned file, unsigned op_num, char *words[])
300 {
301         if (!words[2] || !words[3] || !words[4] || !words[5] || words[6]
302             || !streq(words[4], "="))
303                 fail(filename[file], op_num+1, "Expect <key> <data> = <rec>");
304
305         op[op_num].key = make_tdb_data(op, filename[file], op_num+1, words[2]);
306         op[op_num].data = make_tdb_data(op, filename[file], op_num+1, words[3]);
307
308         op[op_num].append.post
309                 = make_tdb_data(op, filename[file], op_num+1, words[5]);
310
311         /* By subtraction, figure out what previous data was. */
312         op[op_num].append.pre.dptr = op[op_num].append.post.dptr;
313         op[op_num].append.pre.dsize
314                 = op[op_num].append.post.dsize - op[op_num].data.dsize;
315         total_keys++;
316 }
317
318 /* <seqnum> tdb_get_seqnum = <ret> */
319 static void op_add_seqnum(char *filename[], struct op op[],
320                           unsigned file, unsigned op_num, char *words[])
321 {
322         if (!words[2] || !words[3] || words[4] || !streq(words[2], "="))
323                 fail(filename[file], op_num+1, "Expect = <ret>");
324
325         op[op_num].key = tdb_null;
326         op[op_num].ret = atoi(words[3]);
327 }
328
329 static void op_add_traverse_start(char *filename[], struct op op[],
330                                   unsigned file, unsigned op_num, char *words[])
331 {
332         if (words[2])
333                 fail(filename[file], op_num+1, "Expect no arguments");
334
335         op[op_num].key = tdb_null;
336         op[op_num].group_len = 0;
337 }
338
339 static void op_add_transaction(char *filename[], struct op op[],
340                                unsigned file, unsigned op_num, char *words[])
341 {
342         if (words[2])
343                 fail(filename[file], op_num+1, "Expect no arguments");
344
345         op[op_num].key = tdb_null;
346         op[op_num].group_len = 0;
347 }
348
349 static void op_add_chainlock(char *filename[], struct op op[],
350                              unsigned file, unsigned op_num, char *words[])
351 {
352         if (words[2] == NULL || words[3])
353                 fail(filename[file], op_num+1, "Expected just a key");
354
355         /* A chainlock key isn't a key in the normal sense; it doesn't
356          * have to be in the db at all.  Also, we don't want to hash this op. */
357         op[op_num].data = make_tdb_data(op, filename[file], op_num+1, words[2]);
358         op[op_num].key = tdb_null;
359         op[op_num].group_len = 0;
360 }
361
362 static void op_add_chainlock_ret(char *filename[], struct op op[],
363                                  unsigned file, unsigned op_num, char *words[])
364 {
365         if (!words[2] || !words[3] || !words[4] || words[5]
366             || !streq(words[3], "="))
367                 fail(filename[file], op_num+1, "Expected <key> = <ret>");
368         op[op_num].ret = atoi(words[4]);
369         op[op_num].data = make_tdb_data(op, filename[file], op_num+1, words[2]);
370         op[op_num].key = tdb_null;
371         op[op_num].group_len = 0;
372         total_keys++;
373 }
374
375 static void op_add_wipe_all(char *filename[], struct op op[],
376                             unsigned file, unsigned op_num, char *words[])
377 {
378         if (words[2])
379                 fail(filename[file], op_num+1, "Expected no arguments");
380         op[op_num].key = tdb_null;
381         wipe_alls = talloc_realloc(NULL, wipe_alls, struct op_desc,
382                                    num_wipe_alls+1);
383         wipe_alls[num_wipe_alls].file = file;
384         wipe_alls[num_wipe_alls].op_num = op_num;
385         num_wipe_alls++;
386 }
387
388 static int op_find_start(struct op op[], unsigned int op_num, enum op_type type)
389 {
390         unsigned int i;
391
392         for (i = op_num-1; i > 0; i--) {
393                 if (op[i].type == type && !op[i].group_len)
394                         return i;
395         }
396         return 0;
397 }
398
399 static void op_analyze_transaction(char *filename[], struct op op[],
400                                    unsigned file, unsigned op_num,
401                                    char *words[])
402 {
403         unsigned int start, i;
404
405         op[op_num].key = tdb_null;
406
407         if (words[2])
408                 fail(filename[file], op_num+1, "Expect no arguments");
409
410         start = op_find_start(op, op_num, OP_TDB_TRANSACTION_START);
411         if (!start)
412                 fail(filename[file], op_num+1, "no transaction start found");
413
414         op[start].group_len = op_num - start;
415
416         /* This rolls in nested transactions.  I think that's right. */
417         for (i = start; i <= op_num; i++)
418                 op[i].group_start = start;
419 }
420
421 /* We treat chainlocks a lot like transactions, even though that's overkill */
422 static void op_analyze_chainlock(char *filename[], struct op op[],
423                                  unsigned file, unsigned op_num, char *words[])
424 {
425         unsigned int i, start;
426
427         if (words[2] == NULL || words[3])
428                 fail(filename[file], op_num+1, "Expected just a key");
429
430         op[op_num].data = make_tdb_data(op, filename[file], op_num+1, words[2]);
431         op[op_num].key = tdb_null;
432         total_keys++;
433
434         start = op_find_start(op, op_num, OP_TDB_CHAINLOCK);
435         if (!start)
436                 start = op_find_start(op, op_num, OP_TDB_CHAINLOCK_READ);
437         if (!start)
438                 fail(filename[file], op_num+1, "no initial chainlock found");
439
440         /* FIXME: We'd have to do something clever to make this work
441          * vs. deadlock. */
442         if (!key_eq(op[start].data, op[op_num].data))
443                 fail(filename[file], op_num+1, "nested chainlock calls?");
444
445         op[start].group_len = op_num - start;
446         for (i = start; i <= op_num; i++)
447                 op[i].group_start = start;
448 }
449
450 static void op_analyze_traverse(char *filename[], struct op op[],
451                                 unsigned file, unsigned op_num, char *words[])
452 {
453         int i, start;
454
455         op[op_num].key = tdb_null;
456
457         /* = %u means traverse function terminated. */
458         if (words[2]) {
459                 if (!streq(words[2], "=") || !words[3] || words[4])
460                         fail(filename[file], op_num+1, "expect = <num>");
461                 op[op_num].ret = atoi(words[3]);
462         } else
463                 op[op_num].ret = 0;
464
465         start = op_find_start(op, op_num, OP_TDB_TRAVERSE_START);
466         if (!start)
467                 start = op_find_start(op, op_num, OP_TDB_TRAVERSE_READ_START);
468         if (!start)
469                 fail(filename[file], op_num+1, "no traversal start found");
470
471         op[start].group_len = op_num - start;
472
473         /* Don't roll in nested traverse/chainlock */
474         for (i = start; i <= op_num; i++)
475                 if (!op[i].group_start)
476                         op[i].group_start = start;
477 }
478
/* Declared here only to keep -Wmissing-declarations happy. */
const struct op_table *find_keyword(register const char *str,
                                    register unsigned int len);
482
483 #include "keywords.c"
484
485 struct depend {
486         /* We can have more than one */
487         struct list_node pre_list;
488         struct list_node post_list;
489         struct op_desc needs;
490         struct op_desc prereq;
491 };
492
493 static void check_deps(const char *filename, struct op op[], unsigned int num)
494 {
495 #ifdef DEBUG_DEPS
496         unsigned int i;
497
498         for (i = 1; i < num; i++)
499                 if (!list_empty(&op[i].pre))
500                         fail(filename, i+1, "Still has dependencies");
501 #endif
502 }
503
504 static void dump_pre(char *filename[], struct op *op[],
505                      unsigned int file, unsigned int i)
506 {
507         struct depend *dep;
508
509         printf("%s:%u (%u) still waiting for:\n", filename[file], i+1,
510                 op[file][i].seqnum);
511         list_for_each(&op[file][i].pre, dep, pre_list)
512                 printf("    %s:%u (%u)\n",
513                        filename[dep->prereq.file], dep->prereq.op_num+1,
514                        op[dep->prereq.file][dep->prereq.op_num].seqnum);
515         check_deps(filename[file], op[file], i);
516 }
517
518 /* We simply read/write pointers, since we all are children. */
519 static bool do_pre(struct tdb_context *tdb,
520                    char *filename[], struct op *op[],
521                    unsigned int file, int pre_fd, unsigned int i,
522                    bool backoff)
523 {
524         while (!list_empty(&op[file][i].pre)) {
525                 struct depend *dep;
526
527 #if DEBUG_DEPS
528                 printf("%s:%u:waiting for pre\n", filename[file], i+1);
529                 fflush(stdout);
530 #endif
531                 if (backoff)
532                         alarm(2);
533                 else
534                         alarm(10);
535                 while (read(pre_fd, &dep, sizeof(dep)) != sizeof(dep)) {
536                         if (errno == EINTR) {
537                                 if (backoff) {
538                                         struct op_desc desc = { file,i };
539                                         warnx("%s:%u:avoiding deadlock",
540                                               filename[file], i+1);
541                                         if (write(backoff_fd, &desc,
542                                                   sizeof(desc)) != sizeof(desc))
543                                                 err(1, "writing backoff_fd");
544                                         return false;
545                                 }
546                                 dump_pre(filename, op, file, i);
547                                 exit(1);
548                         } else
549                                 errx(1, "Reading from pipe");
550                 }
551                 alarm(0);
552
553 #if DEBUG_DEPS
554                 printf("%s:%u:got pre %u from %s:%u\n", filename[file], i+1,
555                        dep->needs.op_num+1, filename[dep->prereq.file],
556                        dep->prereq.op_num+1);
557                 fflush(stdout);
558 #endif
559                 /* This could be any op, not just this one. */
560                 talloc_free(dep);
561         }
562         return true;
563 }
564
565 static void do_post(char *filename[], struct op *op[],
566                     unsigned int file, unsigned int i)
567 {
568         struct depend *dep;
569
570         list_for_each(&op[file][i].post, dep, post_list) {
571 #if DEBUG_DEPS
572                 printf("%s:%u:sending to file %s:%u\n", filename[file], i+1,
573                        filename[dep->needs.file], dep->needs.op_num+1);
574 #endif
575                 if (write(pipes[dep->needs.file].fd[1], &dep, sizeof(dep))
576                     != sizeof(dep))
577                         err(1, "%s:%u failed to tell file %s",
578                             filename[file], i+1, filename[dep->needs.file]);
579         }
580 }
581
582 static int get_len(TDB_DATA key, TDB_DATA data, void *private_data)
583 {
584         return data.dsize;
585 }
586
/* Forward declaration: the traverse helpers below call run_ops(), and
 * run_ops() in turn calls them. */
static unsigned run_ops(struct tdb_context *tdb, int pre_fd,
                        char *filename[], struct op *op[],
                        unsigned int file,
                        unsigned int start, unsigned int stop,
                        bool backoff);
594
/* State handed to the traverse callback. */
struct traverse_info {
        struct op **op;         /* Ops of every file. */
        char **filename;        /* Names of every file. */
        unsigned file;          /* Which file we are replaying. */
        int pre_fd;             /* Where we read our notifications from. */
        unsigned int start;     /* Index of the traverse-start op. */
        unsigned int i;         /* Index of the op we are up to. */
};
603
604 /* More complex.  Just do whatever's they did at the n'th entry. */
605 static int nontrivial_traverse(struct tdb_context *tdb,
606                                TDB_DATA key, TDB_DATA data,
607                                void *_tinfo)
608 {
609         struct traverse_info *tinfo = _tinfo;
610         unsigned int trav_len = tinfo->op[tinfo->file][tinfo->start].group_len;
611         bool avoid_deadlock = false;
612
613         if (tinfo->i == tinfo->start + trav_len) {
614                 /* This can happen if traverse expects to be empty. */
615                 if (trav_len == 1)
616                         return 1;
617                 fail(tinfo->filename[tinfo->file], tinfo->start + 1,
618                      "traverse did not terminate");
619         }
620
621         if (tinfo->op[tinfo->file][tinfo->i].type != OP_TDB_TRAVERSE)
622                 fail(tinfo->filename[tinfo->file], tinfo->start + 1,
623                      "%s:%u:traverse terminated early");
624
625 #if TRAVERSALS_TAKE_TRANSACTION_LOCK
626         avoid_deadlock = true;
627 #endif
628
629         /* Run any normal ops. */
630         tinfo->i = run_ops(tdb, tinfo->pre_fd, tinfo->filename, tinfo->op,
631                            tinfo->file, tinfo->i+1, tinfo->start + trav_len,
632                            avoid_deadlock);
633
634         /* We backed off, or we hit OP_TDB_TRAVERSE_END/EARLY. */
635         if (tinfo->op[tinfo->file][tinfo->i].type != OP_TDB_TRAVERSE)
636                 return 1;
637
638         return 0;
639 }
640
641 static unsigned op_traverse(struct tdb_context *tdb,
642                             int pre_fd,
643                             char *filename[],
644                             unsigned int file,
645                             int (*traversefn)(struct tdb_context *,
646                                               tdb_traverse_func, void *),
647                             struct op *op[],
648                             unsigned int start)
649 {
650         struct traverse_info tinfo = { op, filename, file, pre_fd,
651                                        start, start+1 };
652
653         traversefn(tdb, nontrivial_traverse, &tinfo);
654
655         /* Traversing in wrong order can have strange effects: eg. if
656          * original traverse went A (delete A), B, we might do B
657          * (delete A).  So if we have ops left over, we do it now. */
658         while (tinfo.i != start + op[file][start].group_len) {
659                 if (op[file][tinfo.i].type == OP_TDB_TRAVERSE
660                     || op[file][tinfo.i].type == OP_TDB_TRAVERSE_END_EARLY)
661                         tinfo.i++;
662                 else
663                         tinfo.i = run_ops(tdb, pre_fd, filename, op, file,
664                                           tinfo.i,
665                                           start + op[file][start].group_len,
666                                           false);
667         }
668
669         return tinfo.i;
670 }
671
/* Signal handler which deliberately does nothing. */
static void break_out(int sig)
{
        (void)sig;
}
675
676 static __attribute__((noinline))
677 unsigned run_ops(struct tdb_context *tdb,
678                  int pre_fd,
679                  char *filename[],
680                  struct op *op[],
681                  unsigned int file,
682                  unsigned int start, unsigned int stop,
683                  bool backoff)
684 {
685         unsigned int i;
686         struct sigaction sa;
687
688         sa.sa_handler = break_out;
689         sa.sa_flags = 0;
690
691         sigaction(SIGALRM, &sa, NULL);
692         for (i = start; i < stop; i++) {
693                 if (!do_pre(tdb, filename, op, file, pre_fd, i, backoff))
694                         return i;
695
696                 switch (op[file][i].type) {
697                 case OP_TDB_LOCKALL:
698                         try(tdb_lockall(tdb), op[file][i].ret);
699                         break;
700                 case OP_TDB_LOCKALL_MARK:
701                         try(tdb_lockall_mark(tdb), op[file][i].ret);
702                         break;
703                 case OP_TDB_LOCKALL_UNMARK:
704                         try(tdb_lockall_unmark(tdb), op[file][i].ret);
705                         break;
706                 case OP_TDB_LOCKALL_NONBLOCK:
707                         unreliable(tdb_lockall_nonblock(tdb), op[file][i].ret,
708                                    tdb_lockall(tdb), tdb_unlockall(tdb));
709                         break;
710                 case OP_TDB_UNLOCKALL:
711                         try(tdb_unlockall(tdb), op[file][i].ret);
712                         break;
713                 case OP_TDB_LOCKALL_READ:
714                         try(tdb_lockall_read(tdb), op[file][i].ret);
715                         break;
716                 case OP_TDB_LOCKALL_READ_NONBLOCK:
717                         unreliable(tdb_lockall_read_nonblock(tdb),
718                                    op[file][i].ret,
719                                    tdb_lockall_read(tdb),
720                                    tdb_unlockall_read(tdb));
721                         break;
722                 case OP_TDB_UNLOCKALL_READ:
723                         try(tdb_unlockall_read(tdb), op[file][i].ret);
724                         break;
725                 case OP_TDB_CHAINLOCK:
726                         try(tdb_chainlock(tdb, op[file][i].key),
727                             op[file][i].ret);
728                         break;
729                 case OP_TDB_CHAINLOCK_NONBLOCK:
730                         unreliable(tdb_chainlock_nonblock(tdb, op[file][i].key),
731                                    op[file][i].ret,
732                                    tdb_chainlock(tdb, op[file][i].key),
733                                    tdb_chainunlock(tdb, op[file][i].key));
734                         break;
735                 case OP_TDB_CHAINLOCK_MARK:
736                         try(tdb_chainlock_mark(tdb, op[file][i].key),
737                             op[file][i].ret);
738                         break;
739                 case OP_TDB_CHAINLOCK_UNMARK:
740                         try(tdb_chainlock_unmark(tdb, op[file][i].key),
741                             op[file][i].ret);
742                         break;
743                 case OP_TDB_CHAINUNLOCK:
744                         try(tdb_chainunlock(tdb, op[file][i].key),
745                             op[file][i].ret);
746                         break;
747                 case OP_TDB_CHAINLOCK_READ:
748                         try(tdb_chainlock_read(tdb, op[file][i].key),
749                             op[file][i].ret);
750                         break;
751                 case OP_TDB_CHAINUNLOCK_READ:
752                         try(tdb_chainunlock_read(tdb, op[file][i].key),
753                             op[file][i].ret);
754                         break;
755                 case OP_TDB_PARSE_RECORD:
756                         try(tdb_parse_record(tdb, op[file][i].key, get_len,
757                                              NULL),
758                             op[file][i].ret);
759                         break;
760                 case OP_TDB_EXISTS:
761                         try(tdb_exists(tdb, op[file][i].key), op[file][i].ret);
762                         break;
763                 case OP_TDB_STORE:
764                         try(tdb_store(tdb, op[file][i].key, op[file][i].data,
765                                       op[file][i].flag),
766                             op[file][i].ret);
767                         break;
768                 case OP_TDB_APPEND:
769                         try(tdb_append(tdb, op[file][i].key, op[file][i].data),
770                             op[file][i].ret);
771                         break;
772                 case OP_TDB_GET_SEQNUM:
773                         try(tdb_get_seqnum(tdb), op[file][i].ret);
774                         break;
775                 case OP_TDB_WIPE_ALL:
776                         try(tdb_wipe_all(tdb), op[file][i].ret);
777                         break;
778                 case OP_TDB_TRANSACTION_START:
779                         try(tdb_transaction_start(tdb), op[file][i].ret);
780                         break;
781                 case OP_TDB_TRANSACTION_CANCEL:
782                         try(tdb_transaction_cancel(tdb), op[file][i].ret);
783                         break;
784                 case OP_TDB_TRANSACTION_PREPARE_COMMIT:
785                         try(tdb_transaction_prepare_commit(tdb),
786                             op[file][i].ret);
787                         break;
788                 case OP_TDB_TRANSACTION_COMMIT:
789                         try(tdb_transaction_commit(tdb), op[file][i].ret);
790                         break;
791                 case OP_TDB_TRAVERSE_READ_START:
792                         i = op_traverse(tdb, pre_fd, filename, file,
793                                         tdb_traverse_read, op, i);
794                         break;
795                 case OP_TDB_TRAVERSE_START:
796                         i = op_traverse(tdb, pre_fd, filename, file,
797                                         tdb_traverse, op, i);
798                         break;
799                 case OP_TDB_TRAVERSE:
800                 case OP_TDB_TRAVERSE_END_EARLY:
801                         /* Terminate: we're in a traverse, and we've
802                          * done our ops. */
803                         return i;
804                 case OP_TDB_TRAVERSE_END:
805                         fail(filename[file], i+1, "unexpected end traverse");
806                 /* FIXME: These must be treated like traverse. */
807                 case OP_TDB_FIRSTKEY:
808                         if (!key_eq(tdb_firstkey(tdb), op[file][i].data))
809                                 fail(filename[file], i+1, "bad firstkey");
810                         break;
811                 case OP_TDB_NEXTKEY:
812                         if (!key_eq(tdb_nextkey(tdb, op[file][i].key),
813                                     op[file][i].data))
814                                 fail(filename[file], i+1, "bad nextkey");
815                         break;
816                 case OP_TDB_FETCH: {
817                         TDB_DATA f = tdb_fetch(tdb, op[file][i].key);
818                         if (!key_eq(f, op[file][i].data))
819                                 fail(filename[file], i+1, "bad fetch %u",
820                                      f.dsize);
821                         break;
822                 }
823                 case OP_TDB_DELETE:
824                         try(tdb_delete(tdb, op[file][i].key), op[file][i].ret);
825                         break;
826                 case OP_TDB_REPACK:
827                         /* We do nothing here: the transaction and traverse are
828                          * traced.  It's in the trace to mark it, since it
829                          * may become unnecessary in future. */
830                         break;
831                 }
832                 do_post(filename, op, file, i);
833         }
834         return i;
835 }
836
837 /* tdbtorture, in particular, can do a tdb_close with a transaction in
838  * progress. */
839 static struct op *maybe_cancel_transaction(char *filename[], unsigned int file,
840                                            struct op *op, unsigned int *num)
841 {
842         unsigned int start = op_find_start(op, *num, OP_TDB_TRANSACTION_START);
843
844         if (start) {
845                 char *words[] = { "<unknown>", "tdb_close", NULL };
846                 add_op(filename[file], &op, *num, op[start].seqnum,
847                        OP_TDB_TRANSACTION_CANCEL);
848                 op_analyze_transaction(filename, op, file, *num, words);
849                 (*num)++;
850         }
851         return op;
852 }
853
854 static struct op *load_tracefile(char *filename[],
855                                  unsigned int file,
856                                  unsigned int *num,
857                                  unsigned int *hashsize,
858                                  unsigned int *tdb_flags,
859                                  unsigned int *open_flags)
860 {
861         unsigned int i;
862         struct op *op = talloc_array(NULL, struct op, 1);
863         char **words;
864         char **lines;
865         char *contents;
866
867         contents = grab_file(NULL, filename[file], NULL);
868         if (!contents)
869                 err(1, "Reading %s", filename[file]);
870
871         lines = strsplit(contents, contents, "\n", NULL);
872         if (!lines[0])
873                 errx(1, "%s is empty", filename[file]);
874
875         words = strsplit(lines, lines[0], " ", NULL);
876         if (!streq(words[1], "tdb_open"))
877                 fail(filename[file], 1, "does not start with tdb_open");
878
879         *hashsize = atoi(words[2]);
880         *tdb_flags = strtoul(words[3], NULL, 0);
881         *open_flags = strtoul(words[4], NULL, 0);
882
883         for (i = 1; lines[i]; i++) {
884                 const struct op_table *opt;
885
886                 words = strsplit(lines, lines[i], " ", NULL);
887                 if (!words[0] || !words[1])
888                         fail(filename[file], i+1,
889                              "Expected seqnum number and op");
890                
891                 opt = find_keyword(words[1], strlen(words[1]));
892                 if (!opt) {
893                         if (streq(words[1], "tdb_close")) {
894                                 if (lines[i+1])
895                                         fail(filename[file], i+2,
896                                              "lines after tdb_close");
897                                 *num = i;
898                                 talloc_free(lines);
899                                 return maybe_cancel_transaction(filename, file,
900                                                                 op, num);
901                         }
902                         fail(filename[file], i+1,
903                              "Unknown operation '%s'", words[1]);
904                 }
905
906                 add_op(filename[file], &op, i, atoi(words[0]), opt->type);
907                 opt->enhance_op(filename, op, file, i, words);
908         }
909
910         fprintf(stderr, "%s:%u:last operation is not tdb_close: incomplete?",
911                 filename[file], i);
912         talloc_free(lines);
913         *num = i - 1;
914         return maybe_cancel_transaction(filename, file, op, num);
915 }
916
917 /* We remember all the keys we've ever seen, and who has them. */
918 struct keyinfo {
919         TDB_DATA key;
920         unsigned int num_users;
921         struct op_desc *user;
922 };
923
924 static bool starts_transaction(const struct op *op)
925 {
926         return op->type == OP_TDB_TRANSACTION_START;
927 }
928
929 static bool in_transaction(const struct op op[], unsigned int i)
930 {
931         return op[i].group_start && starts_transaction(&op[op[i].group_start]);
932 }
933
934 static bool successful_transaction(const struct op *op)
935 {
936         return starts_transaction(op)
937                 && op[op->group_len].type == OP_TDB_TRANSACTION_COMMIT;
938 }
939
940 static bool starts_traverse(const struct op *op)
941 {
942         return op->type == OP_TDB_TRAVERSE_START
943                 || op->type == OP_TDB_TRAVERSE_READ_START;
944 }
945
946 static bool in_traverse(const struct op op[], unsigned int i)
947 {
948         return op[i].group_start && starts_traverse(&op[op[i].group_start]);
949 }
950
951 static bool starts_chainlock(const struct op *op)
952 {
953         return op->type == OP_TDB_CHAINLOCK_READ
954                 || op->type == OP_TDB_CHAINLOCK;
955 }
956
957 static bool in_chainlock(const struct op op[], unsigned int i)
958 {
959         return op[i].group_start && starts_chainlock(&op[op[i].group_start]);
960 }
961
962 static const TDB_DATA must_not_exist;
963 static const TDB_DATA must_exist;
964 static const TDB_DATA not_exists_or_empty;
965
966 /* NULL means doesn't care if it exists or not, &must_exist means
967  * it must exist but we don't care what, &must_not_exist means it must
968  * not exist, otherwise the data it needs. */
969 static const TDB_DATA *needs(const TDB_DATA *key, const struct op *op)
970 {
971         /* Look through for an op in this transaction which needs this key. */
972         if (starts_transaction(op) || starts_chainlock(op)) {
973                 unsigned int i;
974                 const TDB_DATA *need = NULL;
975
976                 for (i = 1; i < op->group_len; i++) {
977                         if (key_eq(op[i].key, *key)
978                             || op[i].type == OP_TDB_WIPE_ALL) {
979                                 need = needs(key, &op[i]);
980                                 /* tdb_exists() is special: there might be
981                                  * something in the transaction with more
982                                  * specific requirements.  Other ops don't have
983                                  * specific requirements (eg. store or delete),
984                                  * but they change the value so we can't get
985                                  * more information from future ops. */
986                                 if (op[i].type != OP_TDB_EXISTS)
987                                         break;
988                         }
989                 }
990
991                 return need;
992         }
993
994         switch (op->type) {
995         /* FIXME: Pull forward deps, since we can deadlock */
996         case OP_TDB_CHAINLOCK:
997         case OP_TDB_CHAINLOCK_NONBLOCK:
998         case OP_TDB_CHAINLOCK_MARK:
999         case OP_TDB_CHAINLOCK_UNMARK:
1000         case OP_TDB_CHAINUNLOCK:
1001         case OP_TDB_CHAINLOCK_READ:
1002         case OP_TDB_CHAINUNLOCK_READ:
1003                 return NULL;
1004
1005         case OP_TDB_APPEND:
1006                 if (op->append.pre.dsize == 0)
1007                         return &not_exists_or_empty;
1008                 return &op->append.pre;
1009
1010         case OP_TDB_STORE:
1011                 if (op->flag == TDB_INSERT) {
1012                         if (op->ret < 0)
1013                                 return &must_exist;
1014                         else
1015                                 return &must_not_exist;
1016                 } else if (op->flag == TDB_MODIFY) {
1017                         if (op->ret < 0)
1018                                 return &must_not_exist;
1019                         else
1020                                 return &must_exist;
1021                 }
1022                 /* No flags?  Don't care */
1023                 return NULL;
1024
1025         case OP_TDB_EXISTS:
1026                 if (op->ret == 1)
1027                         return &must_exist;
1028                 else
1029                         return &must_not_exist;
1030
1031         case OP_TDB_PARSE_RECORD:
1032                 if (op->ret < 0)
1033                         return &must_not_exist;
1034                 return &must_exist;
1035
1036         /* FIXME: handle these. */
1037         case OP_TDB_WIPE_ALL:
1038         case OP_TDB_FIRSTKEY:
1039         case OP_TDB_NEXTKEY:
1040         case OP_TDB_GET_SEQNUM:
1041         case OP_TDB_TRAVERSE:
1042         case OP_TDB_TRANSACTION_COMMIT:
1043         case OP_TDB_TRANSACTION_CANCEL:
1044         case OP_TDB_TRANSACTION_START:
1045                 return NULL;
1046
1047         case OP_TDB_FETCH:
1048                 if (!op->data.dptr)
1049                         return &must_not_exist;
1050                 return &op->data;
1051
1052         case OP_TDB_DELETE:
1053                 if (op->ret < 0)
1054                         return &must_not_exist;
1055                 return &must_exist;
1056
1057         default:
1058                 errx(1, "Unexpected op type %i", op->type);
1059         }
1060         
1061 }
1062
1063 /* What's the data after this op?  pre if nothing changed. */
1064 static const TDB_DATA *gives(const TDB_DATA *key, const TDB_DATA *pre,
1065                              const struct op *op)
1066 {
1067         if (starts_transaction(op) || starts_chainlock(op)) {
1068                 unsigned int i;
1069
1070                 /* Cancelled transactions don't change anything. */
1071                 if (op[op->group_len].type == OP_TDB_TRANSACTION_CANCEL)
1072                         return pre;
1073                 assert(op[op->group_len].type == OP_TDB_TRANSACTION_COMMIT
1074                        || op[op->group_len].type == OP_TDB_CHAINUNLOCK_READ
1075                        || op[op->group_len].type == OP_TDB_CHAINUNLOCK);
1076
1077                 for (i = 1; i < op->group_len; i++) {
1078                         /* This skips nested transactions, too */
1079                         if (key_eq(op[i].key, *key)
1080                             || op[i].type == OP_TDB_WIPE_ALL)
1081                                 pre = gives(key, pre, &op[i]);
1082                 }
1083                 return pre;
1084         }
1085
1086         /* Failed ops don't change state of db. */
1087         if (op->ret < 0)
1088                 return pre;
1089
1090         if (op->type == OP_TDB_DELETE || op->type == OP_TDB_WIPE_ALL)
1091                 return &tdb_null;
1092
1093         if (op->type == OP_TDB_APPEND)
1094                 return &op->append.post;
1095
1096         if (op->type == OP_TDB_STORE)
1097                 return &op->data;
1098
1099         return pre;
1100 }
1101
1102 static void add_hash_user(struct keyinfo *hash,
1103                           unsigned int h,
1104                           struct op *op[],
1105                           unsigned int file,
1106                           unsigned int op_num)
1107 {
1108         hash[h].user = talloc_realloc(hash, hash[h].user,
1109                                       struct op_desc, hash[h].num_users+1);
1110
1111         /* If it's in a transaction, it's the transaction which
1112          * matters from an analysis POV. */
1113         if (in_transaction(op[file], op_num)
1114             || in_chainlock(op[file], op_num)) {
1115                 unsigned i;
1116
1117                 op_num = op[file][op_num].group_start;
1118
1119                 /* Don't include twice. */
1120                 for (i = 0; i < hash[h].num_users; i++) {
1121                         if (hash[h].user[i].file == file
1122                             && hash[h].user[i].op_num == op_num)
1123                                 return;
1124                 }
1125         }
1126         hash[h].user[hash[h].num_users].op_num = op_num;
1127         hash[h].user[hash[h].num_users].file = file;
1128         hash[h].num_users++;
1129 }
1130
1131 static struct keyinfo *hash_ops(struct op *op[], unsigned int num_ops[],
1132                                 unsigned int num)
1133 {
1134         unsigned int i, j, h;
1135         struct keyinfo *hash;
1136
1137         hash = talloc_zero_array(op[0], struct keyinfo, total_keys*2);
1138         for (i = 0; i < num; i++) {
1139                 for (j = 1; j < num_ops[i]; j++) {
1140                         /* We can't do this on allocation, due to realloc. */
1141                         list_head_init(&op[i][j].post);
1142                         list_head_init(&op[i][j].pre);
1143
1144                         if (!op[i][j].key.dptr)
1145                                 continue;
1146
1147                         h = hash_key(&op[i][j].key) % (total_keys * 2);
1148                         while (!key_eq(hash[h].key, op[i][j].key)) {
1149                                 if (!hash[h].key.dptr) {
1150                                         hash[h].key = op[i][j].key;
1151                                         break;
1152                                 }
1153                                 h = (h + 1) % (total_keys * 2);
1154                         }
1155                         /* Might as well save some memory if we can. */
1156                         if (op[i][j].key.dptr != hash[h].key.dptr) {
1157                                 talloc_free(op[i][j].key.dptr);
1158                                 op[i][j].key.dptr = hash[h].key.dptr;
1159                         }
1160
1161                         add_hash_user(hash, h, op, i, j);
1162                 }
1163         }
1164
1165         /* Any wipe all entries need adding to all hash entries. */
1166         for (h = 0; h < total_keys*2; h++) {
1167                 if (!hash[h].num_users)
1168                         continue;
1169
1170                 for (i = 0; i < num_wipe_alls; i++)
1171                         add_hash_user(hash, h, op,
1172                                       wipe_alls[i].file, wipe_alls[i].op_num);
1173         }
1174
1175         return hash;
1176 }
1177
1178 static bool satisfies(const TDB_DATA *key, const TDB_DATA *data,
1179                       const struct op *op)
1180 {
1181         const TDB_DATA *need = needs(key, op);
1182
1183         /* Don't need anything?  Cool. */
1184         if (!need)
1185                 return true;
1186
1187         /* This should be tdb_null or a real value. */
1188         assert(data != &must_exist);
1189         assert(data != &must_not_exist);
1190         assert(data != &not_exists_or_empty);
1191
1192         /* Must not exist?  data must not exist. */
1193         if (need == &must_not_exist)
1194                 return data == &tdb_null;
1195
1196         /* Must exist? */
1197         if (need == &must_exist)
1198                 return data != &tdb_null;
1199
1200         /* Either noexist or empty. */
1201         if (need == &not_exists_or_empty)
1202                 return data->dsize == 0;
1203
1204         /* Needs something specific. */
1205         return key_eq(*data, *need);
1206 }
1207
1208 static void move_to_front(struct op_desc res[], unsigned off, unsigned elem)
1209 {
1210         if (elem != off) {
1211                 struct op_desc tmp = res[elem];
1212                 memmove(res + off + 1, res + off, (elem - off)*sizeof(res[0]));
1213                 res[off] = tmp;
1214         }
1215 }
1216
1217 static void restore_to_pos(struct op_desc res[], unsigned off, unsigned elem)
1218 {
1219         if (elem != off) {
1220                 struct op_desc tmp = res[off];
1221                 memmove(res + off, res + off + 1, (elem - off)*sizeof(res[0]));
1222                 res[elem] = tmp;
1223         }
1224 }
1225
1226 static bool sort_deps(char *filename[], struct op *op[],
1227                       struct op_desc res[],
1228                       unsigned off, unsigned num,
1229                       const TDB_DATA *key, const TDB_DATA *data,
1230                       unsigned num_files, unsigned fuzz)
1231 {
1232         unsigned int i, files_done;
1233         struct op *this_op;
1234         bool done[num_files];
1235
1236         /* None left?  We're sorted. */
1237         if (off == num)
1238                 return true;
1239
1240         /* Does this make sequence number go backwards?  Allow a little fuzz. */
1241         if (off > 0) {
1242                 int seqnum1 = op[res[off-1].file][res[off-1].op_num].seqnum;
1243                 int seqnum2 = op[res[off].file][res[off].op_num].seqnum;
1244
1245                 if (seqnum1 - seqnum2 > (int)fuzz) {
1246 #if DEBUG_DEPS
1247                         printf("Seqnum jump too far (%u -> %u)\n",
1248                                seqnum1, seqnum2);
1249 #endif
1250                         return false;
1251                 }
1252         }
1253
1254         memset(done, 0, sizeof(done));
1255
1256         /* Since ops within a trace file are ordered, we just need to figure
1257          * out which file to try next.  Since we don't take into account
1258          * inter-key relationships (which exist by virtue of trace file order),
1259          * we minimize the chance of harm by trying to keep in seqnum order. */
1260         for (files_done = 0, i = off; i < num && files_done < num_files; i++) {
1261                 if (done[res[i].file])
1262                         continue;
1263
1264                 this_op = &op[res[i].file][res[i].op_num];
1265
1266                 /* Is what we have good enough for this op? */
1267                 if (satisfies(key, data, this_op)) {
1268                         move_to_front(res, off, i);
1269                         if (sort_deps(filename, op, res, off+1, num,
1270                                       key, gives(key, data, this_op),
1271                                       num_files, fuzz))
1272                                 return true;
1273                         restore_to_pos(res, off, i);
1274                 }
1275                 done[res[i].file] = true;
1276                 files_done++;
1277         }
1278
1279         /* No combination worked. */
1280         return false;
1281 }
1282
1283 static void check_dep_sorting(struct op_desc user[], unsigned num_users,
1284                               unsigned num_files)
1285 {
1286 #if DEBUG_DEPS
1287         unsigned int i;
1288         unsigned minima[num_files];
1289
1290         memset(minima, 0, sizeof(minima));
1291         for (i = 0; i < num_users; i++) {
1292                 assert(minima[user[i].file] < user[i].op_num);
1293                 minima[user[i].file] = user[i].op_num;
1294         }
1295 #endif
1296 }
1297
1298 /* All these ops happen on the same key.  Which comes first?
1299  *
1300  * This can happen both because read ops or failed write ops don't
1301  * change sequence number, and also due to race since we access the
1302  * number unlocked (the race can cause less detectable ordering problems,
1303  * in which case we'll deadlock and report: fix manually in that case).
1304  */
1305 static bool figure_deps(char *filename[], struct op *op[],
1306                         const TDB_DATA *key, const TDB_DATA *data,
1307                         struct op_desc user[],
1308                         unsigned num_users, unsigned num_files)
1309 {
1310         unsigned int fuzz;
1311
1312         /* We prefer to keep strict seqnum order if possible: it's the
1313          * most likely.  We get more lax if that fails. */
1314         for (fuzz = 0; fuzz < 100; fuzz = (fuzz + 1)*2) {
1315                 if (sort_deps(filename, op, user, 0, num_users, key, data,
1316                               num_files, fuzz))
1317                         break;
1318         }
1319
1320         if (fuzz >= 100)
1321                 return false;
1322
1323         check_dep_sorting(user, num_users, num_files);
1324         return true;
1325 }
1326
1327 /* We're having trouble sorting out dependencies for this key.  Assume that it's
1328  * a pre-existing record in the db, so determine a likely value. */
1329 static const TDB_DATA *preexisting_data(char *filename[], struct op *op[],
1330                                         const TDB_DATA *key,
1331                                         struct op_desc *user,
1332                                         unsigned int num_users)
1333 {
1334         unsigned int i;
1335         const TDB_DATA *data;
1336
1337         for (i = 0; i < num_users; i++) {
1338                 data = needs(key, &op[user->file][user->op_num]);
1339                 if (data && data != &must_not_exist) {
1340                         printf("%s:%u: needs pre-existing record\n",
1341                                filename[user->file], user->op_num+1);
1342                         return data;
1343                 }
1344         }
1345         return &tdb_null;
1346 }
1347
1348 static void sort_ops(struct tdb_context *tdb,
1349                      struct keyinfo hash[], char *filename[], struct op *op[],
1350                      unsigned int num)
1351 {
1352         unsigned int h;
1353
1354         /* Gcc nexted function extension.  How cool is this? */
1355         int compare_seqnum(const void *_a, const void *_b)
1356         {
1357                 const struct op_desc *a = _a, *b = _b;
1358
1359                 /* First, maintain order within any trace file. */
1360                 if (a->file == b->file)
1361                         return a->op_num - b->op_num;
1362
1363                 /* Otherwise, arrange by seqnum order. */
1364                 if (op[a->file][a->op_num].seqnum !=
1365                     op[b->file][b->op_num].seqnum)
1366                         return op[a->file][a->op_num].seqnum
1367                                 - op[b->file][b->op_num].seqnum;
1368
1369                 /* Cancelled transactions are assumed to happen first. */
1370                 if (starts_transaction(&op[a->file][a->op_num])
1371                     && !successful_transaction(&op[a->file][a->op_num]))
1372                         return -1;
1373                 if (starts_transaction(&op[b->file][b->op_num])
1374                     && !successful_transaction(&op[b->file][b->op_num]))
1375                         return 1;
1376
1377                 /* No idea. */
1378                 return 0;
1379         }
1380
1381         /* Now sort into seqnum order. */
1382         for (h = 0; h < total_keys * 2; h++) {
1383                 struct op_desc *user = hash[h].user;
1384
1385                 qsort(user, hash[h].num_users, sizeof(user[0]), compare_seqnum);
1386                 if (!figure_deps(filename, op, &hash[h].key, &tdb_null, user,
1387                                  hash[h].num_users, num)) {
1388                         const TDB_DATA *data;
1389
1390                         data = preexisting_data(filename, op, &hash[h].key,
1391                                                 user, hash[h].num_users);
1392                         /* Give the first op what it wants: does that help? */
1393                         if (!figure_deps(filename, op, &hash[h].key, data, user,
1394                                          hash[h].num_users, num))
1395                                 fail(filename[user[0].file], user[0].op_num+1,
1396                                      "Could not resolve inter-dependencies");
1397                         if (tdb_store(tdb, hash[h].key, *data, TDB_INSERT) != 0)
1398                                 errx(1, "Could not store initial value");
1399                 }
1400         }
1401 }
1402
1403 static int destroy_depend(struct depend *dep)
1404 {
1405         list_del(&dep->pre_list);
1406         list_del(&dep->post_list);
1407         return 0;
1408 }
1409
1410 static void add_dependency(void *ctx,
1411                            struct op *op[],
1412                            char *filename[],
1413                            const struct op_desc *needs,
1414                            const struct op_desc *prereq)
1415 {
1416         struct depend *dep;
1417
1418         /* We don't depend on ourselves. */
1419         if (needs->file == prereq->file) {
1420                 assert(prereq->op_num < needs->op_num);
1421                 return;
1422         }
1423
1424 #if DEBUG_DEPS
1425         printf("%s:%u: depends on %s:%u\n",
1426                filename[needs->file], needs->op_num+1,
1427                filename[prereq->file], prereq->op_num+1);
1428 #endif
1429
1430         dep = talloc(ctx, struct depend);
1431         dep->needs = *needs;
1432         dep->prereq = *prereq;
1433
1434 #if TRAVERSALS_TAKE_TRANSACTION_LOCK
1435         /* If something in a traverse depends on something in another
1436          * traverse/transaction, it creates a dependency between the
1437          * two groups. */
1438         if ((in_traverse(op[prereq->file], prereq->op_num)
1439              && (starts_transaction(&op[needs->file][needs->op_num])
1440                  || starts_traverse(&op[needs->file][needs->op_num])))
1441             || (in_traverse(op[needs->file], needs->op_num)
1442                 && (starts_transaction(&op[prereq->file][prereq->op_num])
1443                     || starts_traverse(&op[prereq->file][prereq->op_num])))) {
1444                 unsigned int start;
1445
1446                 /* We are satisfied by end of group. */
1447                 start = op[prereq->file][prereq->op_num].group_start;
1448                 dep->prereq.op_num = start + op[prereq->file][start].group_len;
1449                 /* And we need that done by start of our group. */
1450                 dep->needs.op_num = op[needs->file][needs->op_num].group_start;
1451         }
1452
1453         /* There is also this case:
1454          *  <traverse> <read foo> ...
1455          *  <transaction> ... </transaction> <create foo>
1456          * Where if we start the traverse then wait, we could block
1457          * the transaction and deadlock.
1458          *
1459          * We try to address this by ensuring that where seqnum indicates it's
1460          * possible, we wait for <create foo> before *starting* traverse.
1461          */
1462         else if (in_traverse(op[needs->file], needs->op_num)) {
1463                 struct op *need = &op[needs->file][needs->op_num];
1464                 if (op[needs->file][need->group_start].seqnum >
1465                     op[prereq->file][prereq->op_num].seqnum) {
1466                         dep->needs.op_num = need->group_start;
1467                 }
1468         }
1469 #endif
1470
1471         /* If you depend on a transaction or chainlock, you actually
1472          * depend on it ending. */
1473         if (starts_transaction(&op[prereq->file][dep->prereq.op_num])
1474             || starts_chainlock(&op[prereq->file][dep->prereq.op_num])) {
1475                 dep->prereq.op_num
1476                         += op[dep->prereq.file][dep->prereq.op_num].group_len;
1477 #if DEBUG_DEPS
1478                 printf("-> Actually end of transaction %s:%u\n",
1479                        filename[dep->prereq->file], dep->prereq->op_num+1);
1480 #endif
1481         } else
1482                 /* We should never create a dependency from middle of
1483                  * a transaction. */
1484                 assert(!in_transaction(op[prereq->file], dep->prereq.op_num)
1485                        || op[prereq->file][dep->prereq.op_num].type
1486                        == OP_TDB_TRANSACTION_COMMIT
1487                        || op[prereq->file][dep->prereq.op_num].type
1488                        == OP_TDB_TRANSACTION_CANCEL);
1489
1490         list_add(&op[dep->prereq.file][dep->prereq.op_num].post,
1491                  &dep->post_list);
1492         list_add(&op[dep->needs.file][dep->needs.op_num].pre,
1493                  &dep->pre_list);
1494         talloc_set_destructor(dep, destroy_depend);
1495 }
1496
1497 static bool changes_db(const TDB_DATA *key, const struct op *op)
1498 {
1499         return gives(key, NULL, op) != NULL;
1500 }
1501
1502 static void depend_on_previous(struct op *op[],
1503                                char *filename[],
1504                                unsigned int num,
1505                                struct op_desc user[],
1506                                unsigned int i,
1507                                int prev)
1508 {
1509         bool deps[num];
1510         int j;
1511
1512         if (i == 0)
1513                 return;
1514
1515         if (prev == i - 1) {
1516                 /* Just depend on previous. */
1517                 add_dependency(NULL, op, filename, &user[i], &user[prev]);
1518                 return;
1519         }
1520
1521         /* We have to wait for the readers.  Find last one in *each* file. */
1522         memset(deps, 0, sizeof(deps));
1523         deps[user[i].file] = true;
1524         for (j = i - 1; j > prev; j--) {
1525                 if (!deps[user[j].file]) {
1526                         add_dependency(NULL, op, filename, &user[i], &user[j]);
1527                         deps[user[j].file] = true;
1528                 }
1529         }
1530 }
1531
1532 /* This is simple, but not complete.  We don't take into account
1533  * indirect dependencies. */
1534 static void optimize_dependencies(struct op *op[], unsigned int num_ops[],
1535                                   unsigned int num)
1536 {
1537         unsigned int i, j;
1538
1539         /* There can only be one real dependency on each file */
1540         for (i = 0; i < num; i++) {
1541                 for (j = 1; j < num_ops[i]; j++) {
1542                         struct depend *dep, *next;
1543                         struct depend *prev[num];
1544
1545                         memset(prev, 0, sizeof(prev));
1546
1547                         list_for_each_safe(&op[i][j].pre, dep, next, pre_list) {
1548                                 if (!prev[dep->prereq.file]) {
1549                                         prev[dep->prereq.file] = dep;
1550                                         continue;
1551                                 }
1552                                 if (prev[dep->prereq.file]->prereq.op_num
1553                                     < dep->prereq.op_num) {
1554                                         talloc_free(prev[dep->prereq.file]);
1555                                         prev[dep->prereq.file] = dep;
1556                                 } else
1557                                         talloc_free(dep);
1558                         }
1559                 }
1560         }
1561
1562         for (i = 0; i < num; i++) {
1563                 int deps[num];
1564
1565                 for (j = 0; j < num; j++)
1566                         deps[j] = -1;
1567
1568                 for (j = 1; j < num_ops[i]; j++) {
1569                         struct depend *dep, *next;
1570
1571                         list_for_each_safe(&op[i][j].pre, dep, next, pre_list) {
1572                                 if (deps[dep->prereq.file]
1573                                     >= (int)dep->prereq.op_num)
1574                                         talloc_free(dep);
1575                                 else
1576                                         deps[dep->prereq.file]
1577                                                 = dep->prereq.op_num;
1578                         }
1579                 }
1580         }
1581 }
1582
1583 #if TRAVERSALS_TAKE_TRANSACTION_LOCK
1584 /* Force an order among the traversals, so they don't deadlock (as much) */
1585 static void make_traverse_depends(char *filename[],
1586                                   struct op *op[], unsigned int num_ops[],
1587                                   unsigned int num)
1588 {
1589         unsigned int i, num_traversals = 0;
1590         int j;
1591         struct op_desc *desc;
1592
1593         /* Sort by which one runs first. */
1594         int compare_traverse_desc(const void *_a, const void *_b)
1595         {
1596                 const struct op_desc *da = _a, *db = _b;
1597                 const struct op *a = &op[da->file][da->op_num],
1598                         *b = &op[db->file][db->op_num];
1599
1600                 if (a->seqnum != b->seqnum)
1601                         return a->seqnum - b->seqnum;
1602
1603                 /* If they have same seqnum, it means one didn't make any
1604                  * changes.  Thus sort by end in that case. */
1605                 return a[a->group_len].seqnum - b[b->group_len].seqnum;
1606         }
1607
1608         desc = talloc_array(NULL, struct op_desc, 1);
1609
1610         /* Count them. */
1611         for (i = 0; i < num; i++) {
1612                 for (j = 1; j < num_ops[i]; j++) {
1613                         /* Traverse start (ignore those in
1614                          * transactions; they're already covered by
1615                          * transaction dependencies). */
1616                         if (starts_traverse(&op[i][j])
1617                             && !in_transaction(op[i], j)) {
1618                                 desc = talloc_realloc(NULL, desc,
1619                                                       struct op_desc,
1620                                                       num_traversals+1);
1621                                 desc[num_traversals].file = i;
1622                                 desc[num_traversals].op_num = j;
1623                                 num_traversals++;
1624                         }
1625                 }
1626         }
1627         qsort(desc, num_traversals, sizeof(desc[0]), compare_traverse_desc);
1628
1629         for (i = 1; i < num_traversals; i++) {
1630                 const struct op *prev = &op[desc[i-1].file][desc[i-1].op_num];
1631                 const struct op *curr = &op[desc[i].file][desc[i].op_num];
1632
1633                 /* Read traverses don't depend on each other (read lock). */
1634                 if (prev->type == OP_TDB_TRAVERSE_READ_START
1635                     && curr->type == OP_TDB_TRAVERSE_READ_START)
1636                         continue;
1637
1638                 /* Only make dependency if it's clear. */
1639                 if (compare_traverse_desc(&desc[i], &desc[i-1])) {
1640                         /* i depends on end of traverse i-1. */
1641                         struct op_desc end = desc[i-1];
1642                         end.op_num += prev->group_len;
1643                         add_dependency(NULL, op, filename, &desc[i], &end);
1644                 }
1645         }
1646         talloc_free(desc);
1647 }
1648
/* Put @fd into non-blocking mode, keeping its other status flags.
 * Exits via err() if the flags cannot be read or written. */
static void set_nonblock(int fd)
{
	int flags = fcntl(fd, F_GETFL);

	/* Don't feed a failed F_GETFL (-1, i.e. all bits set) to F_SETFL. */
	if (flags == -1)
		err(1, "Getting pipe flags");
	if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) != 0)
		err(1, "Setting pipe nonblocking");
}
1654
1655 static bool handle_backoff(struct op *op[], int fd)
1656 {
1657         struct op_desc desc;
1658         bool handled = false;
1659
1660         /* Sloppy coding: we assume PIPEBUF never fills. */
1661         while (read(fd, &desc, sizeof(desc)) != -1) {
1662                 unsigned int i;
1663                 handled = true;
1664                 for (i = desc.op_num; i > 0; i--) {
1665                         if (op[desc.file][i].type == OP_TDB_TRAVERSE) {
1666                                 /* We insert a fake end here. */
1667                                 op[desc.file][i].type
1668                                         = OP_TDB_TRAVERSE_END_EARLY;
1669                                 break;
1670                         } else if (starts_traverse(&op[desc.file][i])) {
1671                                 unsigned int start = i;
1672                                 struct op tmp = op[desc.file][i];
1673                                 /* Move the ops outside traverse. */
1674                                 memmove(&op[desc.file][i],
1675                                         &op[desc.file][i+1],
1676                                         (desc.op_num-i-1) * sizeof(op[0][0]));
1677                                 op[desc.file][desc.op_num] = tmp;
1678                                 while (op[desc.file][i].group_start == start) {
1679                                         op[desc.file][i++].group_start
1680                                                 = desc.op_num;
1681                                 }
1682                                 break;
1683                         }
1684                 }
1685         }
1686         return handled;
1687 }
1688
1689 #else /* !TRAVERSALS_TAKE_TRANSACTION_LOCK */
/* Stub used when traversals don't take the transaction lock: there is
 * no backoff pipe to drain, so nothing is ever handled. */
static bool handle_backoff(struct op *op[], int fd)
{
	(void)op;
	(void)fd;

	return false;
}
1694 #endif
1695
1696 static void derive_dependencies(struct tdb_context *tdb,
1697                                 char *filename[],
1698                                 struct op *op[], unsigned int num_ops[],
1699                                 unsigned int num)
1700 {
1701         struct keyinfo *hash;
1702         unsigned int h, i;
1703
1704         /* Create hash table for faster key lookup. */
1705         hash = hash_ops(op, num_ops, num);
1706
1707         /* Sort them by sequence number. */
1708         sort_ops(tdb, hash, filename, op, num);
1709
1710         /* Create dependencies back to the last change, rather than
1711          * creating false dependencies by naively making each one
1712          * depend on the previous.  This has two purposes: it makes
1713          * later optimization simpler, and it also avoids deadlock with
1714          * same sequence number ops inside traversals (if one
1715          * traversal doesn't write anything, two ops can have the same
1716          * sequence number yet we can create a traversal dependency
1717          * the other way). */
1718         for (h = 0; h < total_keys * 2; h++) {
1719                 int prev = -1;
1720
1721                 if (hash[h].num_users < 2)
1722                         continue;
1723
1724                 for (i = 0; i < hash[h].num_users; i++) {
1725                         if (changes_db(&hash[h].key, &op[hash[h].user[i].file]
1726                                        [hash[h].user[i].op_num])) {
1727                                 depend_on_previous(op, filename, num,
1728                                                    hash[h].user, i, prev);
1729                                 prev = i;
1730                         } else if (prev >= 0)
1731                                 add_dependency(hash, op, filename,
1732                                                &hash[h].user[i],
1733                                                &hash[h].user[prev]);
1734                 }
1735         }
1736
1737 #if TRAVERSALS_TAKE_TRANSACTION_LOCK
1738         make_traverse_depends(filename, op, num_ops, num);
1739 #endif
1740
1741         optimize_dependencies(op, num_ops, num);
1742 }
1743
1744 static struct timeval run_test(char *argv[],
1745                                unsigned int num_ops[],
1746                                unsigned int hashsize[],
1747                                unsigned int tdb_flags[],
1748                                unsigned int open_flags[],
1749                                struct op *op[],
1750                                int fds[2])
1751 {
1752         unsigned int i;
1753         struct timeval start, end, diff;
1754         bool ok = true;
1755
1756         for (i = 0; argv[i+2]; i++) {
1757                 struct tdb_context *tdb;
1758                 char c;
1759
1760                 switch (fork()) {
1761                 case -1:
1762                         err(1, "fork failed");
1763                 case 0:
1764                         close(fds[1]);
1765                         tdb = tdb_open_ex(argv[1], hashsize[i],
1766                                           tdb_flags[i],
1767                                           open_flags[i], 0600, NULL, hash_key);
1768                         if (!tdb)
1769                                 err(1, "Opening tdb %s", argv[1]);
1770
1771                         /* This catches parent exiting. */
1772                         if (read(fds[0], &c, 1) != 1)
1773                                 exit(1);
1774                         run_ops(tdb, pipes[i].fd[0], argv+2, op, i, 1,
1775                                 num_ops[i], false);
1776                         check_deps(argv[2+i], op[i], num_ops[i]);
1777                         exit(0);
1778                 default:
1779                         break;
1780                 }
1781         }
1782
1783         /* Let everything settle. */
1784         sleep(1);
1785
1786         printf("Starting run...");
1787         fflush(stdout);
1788         gettimeofday(&start, NULL);
1789         /* Tell them all to go!  Any write of sufficient length will do. */
1790         if (write(fds[1], hashsize, i) != i)
1791                 err(1, "Writing to wakeup pipe");
1792
1793         for (i = 0; argv[i + 2]; i++) {
1794                 int status;
1795                 wait(&status);
1796                 if (!WIFEXITED(status)) {
1797                         warnx("Child died with signal %i", WTERMSIG(status));
1798                         ok = false;
1799                 } else if (WEXITSTATUS(status) != 0)
1800                         /* Assume child spat out error. */
1801                         ok = false;
1802         }
1803         if (!ok)
1804                 exit(1);
1805
1806         gettimeofday(&end, NULL);
1807         printf("done\n");
1808
1809         if (end.tv_usec < start.tv_usec) {
1810                 end.tv_usec += 1000000;
1811                 end.tv_sec--;
1812         }
1813         diff.tv_sec = end.tv_sec - start.tv_sec;
1814         diff.tv_usec = end.tv_usec - start.tv_usec;
1815         return diff;
1816 }
1817
1818 int main(int argc, char *argv[])
1819 {
1820         struct timeval diff;
1821         unsigned int i, num_ops[argc], hashsize[argc], tdb_flags[argc], open_flags[argc];
1822         struct op *op[argc];
1823         int fds[2];
1824         struct tdb_context *tdb;
1825
1826         if (argc < 3)
1827                 errx(1, "Usage: %s <tdbfile> <tracefile>...", argv[0]);
1828
1829         pipes = talloc_array(NULL, struct pipe, argc - 1);
1830         for (i = 0; i < argc - 2; i++) {
1831                 printf("Loading tracefile %s...", argv[2+i]);
1832                 fflush(stdout);
1833                 op[i] = load_tracefile(argv+2, i, &num_ops[i], &hashsize[i],
1834                                        &tdb_flags[i], &open_flags[i]);
1835                 if (pipe(pipes[i].fd) != 0)
1836                         err(1, "creating pipe");
1837                 /* Don't truncate, or clear if first: we do that. */
1838                 open_flags[i] &= ~(O_TRUNC);
1839                 tdb_flags[i] &= ~(TDB_CLEAR_IF_FIRST);
1840                 /* Open NOSYNC, to save time. */
1841                 tdb_flags[i] |= TDB_NOSYNC;
1842                 printf("done\n");
1843         }
1844
1845         /* Dependency may figure we need to create seed records. */
1846         tdb = tdb_open_ex(argv[1], hashsize[0], TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
1847                           O_CREAT|O_TRUNC|O_RDWR, 0600, NULL, hash_key);
1848
1849         printf("Calculating inter-dependencies...");
1850         fflush(stdout);
1851         derive_dependencies(tdb, argv+2, op, num_ops, i);
1852         printf("done\n");
1853         tdb_close(tdb);
1854
1855         /* Don't fork for single arg case: simple debugging. */
1856         if (argc == 3) {
1857                 tdb = tdb_open_ex(argv[1], hashsize[0], tdb_flags[0],
1858                                   open_flags[0], 0600, NULL, hash_key);
1859                 printf("Single threaded run...");
1860                 fflush(stdout);
1861
1862                 run_ops(tdb, pipes[0].fd[0], argv+2, op, 0, 1, num_ops[0],
1863                         false);
1864                 check_deps(argv[2], op[0], num_ops[0]);
1865
1866                 printf("done\n");
1867                 exit(0);
1868         }
1869
1870         if (pipe(fds) != 0)
1871                 err(1, "creating pipe");
1872
1873 #if TRAVERSALS_TAKE_TRANSACTION_LOCK
1874         if (pipe(pipes[argc-2].fd) != 0)
1875                 err(1, "creating pipe");
1876         backoff_fd = pipes[argc-2].fd[1];
1877         set_nonblock(pipes[argc-2].fd[1]);
1878         set_nonblock(pipes[argc-2].fd[0]);
1879 #endif
1880
1881         do {
1882                 diff = run_test(argv, num_ops, hashsize, tdb_flags, open_flags,
1883                                 op, fds);
1884         } while (handle_backoff(op, pipes[argc-2].fd[0]));
1885
1886         printf("Time replaying: %lu usec\n",
1887                diff.tv_sec * 1000000UL + diff.tv_usec);
1888         
1889         exit(0);
1890 }