]> git.ozlabs.org Git - ccan/blob - ccan/tdb/tools/replay_trace.c
Fix last minute warning "fix" in replay_trace.c, remove seqnum inc tracing.
[ccan] / ccan / tdb / tools / replay_trace.c
1 #include <ccan/tdb/tdb.h>
2 #include <ccan/grab_file/grab_file.h>
3 #include <ccan/hash/hash.h>
4 #include <ccan/talloc/talloc.h>
5 #include <ccan/str_talloc/str_talloc.h>
6 #include <ccan/str/str.h>
7 #include <ccan/list/list.h>
8 #include <err.h>
9 #include <ctype.h>
10 #include <string.h>
11 #include <unistd.h>
12 #include <sys/types.h>
13 #include <sys/wait.h>
14 #include <sys/time.h>
15
16 #define STRINGIFY2(x) #x
17 #define STRINGIFY(x) STRINGIFY2(x)
18
19 /* Avoid mod by zero */
20 static unsigned int total_keys = 1;
21
22 /* #define DEBUG_DEPS 1 */
23
24 struct pipe {
25         int fd[2];
26 };
27 static struct pipe *pipes;
28
29 static void __attribute__((noreturn)) fail(const char *filename,
30                                            unsigned int line,
31                                            const char *fmt, ...)
32 {
33         va_list ap;
34
35         va_start(ap, fmt);
36         fprintf(stderr, "%s:%u: FAIL: ", filename, line);
37         vfprintf(stderr, fmt, ap);
38         fprintf(stderr, "\n");
39         va_end(ap);
40         exit(1);
41 }
42         
43 /* Try or die. */
44 #define try(expr, expect)                                               \
45         do {                                                            \
46                 int ret = (expr);                                       \
47                 if (ret != (expect))                                    \
48                         fail(filename, i+1, STRINGIFY(expr) "= %i", ret); \
49         } while (0)
50
51 /* Try or imitate results. */
52 #define unreliable(expr, expect, force, undo)                           \
53         do {                                                            \
54                 int ret = expr;                                         \
55                 if (ret != expect) {                                    \
56                         fprintf(stderr, "%s:%u: %s gave %i not %i",     \
57                               filename, i+1, STRINGIFY(expr), ret, expect); \
58                         if (expect == 0)                                \
59                                 force;                                  \
60                         else                                            \
61                                 undo;                                   \
62                 }                                                       \
63         } while (0)
64
65 static bool key_eq(TDB_DATA a, TDB_DATA b)
66 {
67         if (a.dsize != b.dsize)
68                 return false;
69         return memcmp(a.dptr, b.dptr, a.dsize) == 0;
70 }
71
72 /* This is based on the hash algorithm from gdbm */
73 static unsigned int hash_key(TDB_DATA *key)
74 {
75         uint32_t value; /* Used to compute the hash value.  */
76         uint32_t   i;   /* Used to cycle through random values. */
77
78         /* Set the initial value from the key size. */
79         for (value = 0x238F13AF ^ key->dsize, i=0; i < key->dsize; i++)
80                 value = (value + (key->dptr[i] << (i*5 % 24)));
81
82         return (1103515243 * value + 12345);  
83 }
84
85 enum op_type {
86         OP_TDB_LOCKALL,
87         OP_TDB_LOCKALL_MARK,
88         OP_TDB_LOCKALL_UNMARK,
89         OP_TDB_LOCKALL_NONBLOCK,
90         OP_TDB_UNLOCKALL,
91         OP_TDB_LOCKALL_READ,
92         OP_TDB_LOCKALL_READ_NONBLOCK,
93         OP_TDB_UNLOCKALL_READ,
94         OP_TDB_CHAINLOCK,
95         OP_TDB_CHAINLOCK_NONBLOCK,
96         OP_TDB_CHAINLOCK_MARK,
97         OP_TDB_CHAINLOCK_UNMARK,
98         OP_TDB_CHAINUNLOCK,
99         OP_TDB_CHAINLOCK_READ,
100         OP_TDB_CHAINUNLOCK_READ,
101         OP_TDB_PARSE_RECORD,
102         OP_TDB_EXISTS,
103         OP_TDB_STORE,
104         OP_TDB_APPEND,
105         OP_TDB_GET_SEQNUM,
106         OP_TDB_WIPE_ALL,
107         OP_TDB_TRANSACTION_START,
108         OP_TDB_TRANSACTION_CANCEL,
109         OP_TDB_TRANSACTION_COMMIT,
110         OP_TDB_TRAVERSE_READ_START,
111         OP_TDB_TRAVERSE_START,
112         OP_TDB_TRAVERSE_END,
113         OP_TDB_TRAVERSE,
114         OP_TDB_FIRSTKEY,
115         OP_TDB_NEXTKEY,
116         OP_TDB_FETCH,
117         OP_TDB_DELETE,
118 };
119
120 struct op {
121         unsigned int serial;
122         enum op_type op;
123         TDB_DATA key;
124         TDB_DATA data;
125         int ret;
126         /* Who is waiting for us? */
127         struct list_head post;
128         /* How many are we waiting for? */
129         unsigned int pre;
130
131         union {
132                 int flag; /* open and store */
133                 struct traverse *trav; /* traverse start */
134                 TDB_DATA post_append; /* append */
135         };
136 };
137
138 static unsigned char hex_char(const char *filename, unsigned int line, char c)
139 {
140         c = toupper(c);
141         if (c >= 'A' && c <= 'F')
142                 return c - 'A' + 10;
143         if (c >= '0' && c <= '9')
144                 return c - '0';
145         fail(filename, line, "invalid hex character '%c'", c);
146 }
147
148 /* TDB data is <size>:<%02x>* */
149 static TDB_DATA make_tdb_data(const void *ctx,
150                               const char *filename, unsigned int line,
151                               const char *word)
152 {
153         TDB_DATA data;
154         unsigned int i;
155         const char *p;
156
157         if (streq(word, "NULL"))
158                 return tdb_null;
159
160         data.dsize = atoi(word);
161         data.dptr = talloc_array(ctx, unsigned char, data.dsize);
162         p = strchr(word, ':');
163         if (!p)
164                 fail(filename, line, "invalid tdb data '%s'", word);
165         p++;
166         for (i = 0; i < data.dsize; i++)
167                 data.dptr[i] = hex_char(filename, line, p[i*2])*16
168                         + hex_char(filename, line, p[i*2+1]);
169
170         return data;
171 }
172
173 static void add_op(const char *filename, struct op **op, unsigned int i,
174                    unsigned int serial, enum op_type type)
175 {
176         struct op *new;
177         *op = talloc_realloc(NULL, *op, struct op, i+1);
178         new = (*op) + i;
179         new->op = type;
180         new->serial = serial;
181         new->pre = 0;
182         new->ret = 0;
183 }
184
185 static void op_add_nothing(const char *filename,
186                            struct op op[], unsigned int op_num, char *words[])
187 {
188         if (words[2])
189                 fail(filename, op_num+1, "Expected no arguments");
190         op[op_num].key = tdb_null;
191 }
192
193 static void op_add_key(const char *filename,
194                        struct op op[], unsigned int op_num, char *words[])
195 {
196         if (words[2] == NULL || words[3])
197                 fail(filename, op_num+1, "Expected just a key");
198
199         op[op_num].key = make_tdb_data(op, filename, op_num+1, words[2]);
200         if (op[op_num].op != OP_TDB_TRAVERSE)
201                 total_keys++;
202 }
203
204 static void op_add_key_ret(const char *filename,
205                            struct op op[], unsigned int op_num, char *words[])
206 {
207         if (!words[2] || !words[3] || !words[4] || words[5]
208             || !streq(words[3], "="))
209                 fail(filename, op_num+1, "Expected <key> = <ret>");
210         op[op_num].ret = atoi(words[4]);
211         op[op_num].key = make_tdb_data(op, filename, op_num+1, words[2]);
212         /* May only be a unique key if it fails */
213         if (op[op_num].ret != 0)
214                 total_keys++;
215 }
216
217 static void op_add_key_data(const char *filename,
218                             struct op op[], unsigned int op_num, char *words[])
219 {
220         if (!words[2] || !words[3] || !words[4] || words[5]
221             || !streq(words[3], "="))
222                 fail(filename, op_num+1, "Expected <key> = <data>");
223         op[op_num].key = make_tdb_data(op, filename, op_num+1, words[2]);
224         op[op_num].data = make_tdb_data(op, filename, op_num+1, words[4]);
225         /* May only be a unique key if it fails */
226         if (!op[op_num].data.dptr)
227                 total_keys++;
228 }
229
230 /* <serial> tdb_store <rec> <rec> <flag> = <ret> */
231 static void op_add_store(const char *filename,
232                          struct op op[], unsigned int op_num, char *words[])
233 {
234         if (!words[2] || !words[3] || !words[4] || !words[5] || !words[6]
235             || words[7] || !streq(words[5], "="))
236                 fail(filename, op_num+1, "Expect <key> <data> <flag> = <ret>");
237
238         op[op_num].flag = strtoul(words[4], NULL, 0);
239         op[op_num].ret = atoi(words[6]);
240         op[op_num].key = make_tdb_data(op, filename, op_num+1, words[2]);
241         op[op_num].data = make_tdb_data(op, filename, op_num+1, words[3]);
242         total_keys++;
243 }
244
245 /* <serial> tdb_append <rec> <rec> = <rec> */
246 static void op_add_append(const char *filename,
247                           struct op op[], unsigned int op_num, char *words[])
248 {
249         if (!words[2] || !words[3] || !words[4] || !words[5] || words[6]
250             || !streq(words[4], "="))
251                 fail(filename, op_num+1, "Expect <key> <data> = <rec>");
252
253         op[op_num].key = make_tdb_data(op, filename, op_num+1, words[2]);
254         op[op_num].data = make_tdb_data(op, filename, op_num+1, words[3]);
255         op[op_num].post_append
256                 = make_tdb_data(op, filename, op_num+1, words[5]);
257         total_keys++;
258 }
259
260 /* <serial> tdb_get_seqnum = <ret> */
261 static void op_add_seqnum(const char *filename,
262                           struct op op[], unsigned int op_num, char *words[])
263 {
264         if (!words[2] || !words[3] || words[4] || !streq(words[2], "="))
265                 fail(filename, op_num+1, "Expect = <ret>");
266
267         op[op_num].key = tdb_null;
268         op[op_num].ret = atoi(words[3]);
269 }
270
271 static void op_add_traverse(const char *filename,
272                             struct op op[], unsigned int op_num, char *words[])
273 {
274         if (words[2])
275                 fail(filename, op_num+1, "Expect no arguments");
276
277         op[op_num].key = tdb_null;
278         op[op_num].trav = NULL;
279 }
280
281 struct traverse_hash {
282         TDB_DATA key;
283         unsigned int index;
284 };
285
286 /* A traverse is a hash of keys, each one associated with ops. */
287 struct traverse {
288         /* How many traversal callouts should I do? */
289         unsigned int num;
290
291         /* Where is traversal end op? */
292         unsigned int end;
293
294         /* For trivial traversals. */
295         struct traverse_hash *hash;
296 };
297
298 /* A trivial traversal is one which doesn't terminate early and only
299  * plays with its own record.  We can reliably replay these even if
300  * traverse order changes. */
301 static bool is_trivial_traverse(struct op op[], unsigned int end)
302 {
303 #if 0
304         unsigned int i;
305         TDB_DATA cur = tdb_null;
306
307         if (op[end].ret != 0)
308                 return false;
309
310         for (i = 0; i < end; i++) {
311                 if (!op[i].key.dptr)
312                         continue;
313                 if (op[i].op == OP_TDB_TRAVERSE)
314                         cur = op[i].key;
315                 if (!key_eq(cur, op[i].key))
316                         return false;
317         }
318         return true;
319 #endif
320         /* With multiple things happening at once, no traverse is trivial. */
321         return false;
322 }
323
324 static void op_analyze_traverse(const char *filename,
325                                 struct op op[], unsigned int op_num,
326                                 char *words[])
327 {
328         int i;
329         struct traverse *trav = talloc(op, struct traverse);
330
331         /* = %u means traverse function terminated. */
332         if (words[2]) {
333                 if (!streq(words[2], "=") || !words[3] || words[4])
334                         fail(filename, op_num+1, "expect = <num>");
335                 op[op_num].ret = atoi(words[3]);
336         } else
337                 op[op_num].ret = 0;
338
339         trav->num = 0;
340         trav->end = op_num;
341         for (i = op_num-1; i >= 0; i--) {
342                 if (op[i].op == OP_TDB_TRAVERSE)
343                         trav->num++;
344                 if (op[i].op != OP_TDB_TRAVERSE_READ_START
345                     && op[i].op != OP_TDB_TRAVERSE_START)
346                         continue;
347                 if (op[i].trav)
348                         continue;
349                 break;
350         }
351
352         if (i < 0)
353                 fail(filename, op_num+1, "no traversal start found");
354
355         op[i].trav = trav;
356
357         if (is_trivial_traverse(op+i, op_num-i)) {
358                 /* Fill in a plentiful hash table. */
359                 op[i].trav->hash = talloc_zero_array(op[i].trav,
360                                                      struct traverse_hash,
361                                                      trav->num * 2);
362                 for (; i < op_num; i++) {
363                         unsigned int h;
364                         if (op[i].op != OP_TDB_TRAVERSE)
365                                 continue;
366                         h = hash_key(&op[i].key) % (trav->num * 2);
367                         while (trav->hash[h].index)
368                                 h = (h + 1) % (trav->num * 2);
369                         trav->hash[h].index = i+1;
370                         trav->hash[h].key = op[i].key;
371                 }
372         } else
373                 trav->hash = NULL;
374 }
375
376 /* Keep -Wmissing-declarations happy: */
377 const struct op_table *
378 find_keyword (register const char *str, register unsigned int len);
379
380 #include "keywords.c"
381
382 static int get_len(TDB_DATA key, TDB_DATA data, void *private_data)
383 {
384         return data.dsize;
385 }
386
387 static unsigned run_ops(struct tdb_context *tdb,
388                         int pre_fd,
389                         const char *filename,
390                         struct op op[],
391                         unsigned int start, unsigned int stop);
392
393 struct traverse_info {
394         struct op *op;
395         const char *filename;
396         int pre_fd;
397         unsigned int start;
398         unsigned int i;
399 };
400
401 /* Trivial case: do whatever they did for this key. */
402 static int trivial_traverse(struct tdb_context *tdb,
403                             TDB_DATA key, TDB_DATA data,
404                             void *_tinfo)
405 {
406         struct traverse_info *tinfo = _tinfo;
407         struct traverse *trav = tinfo->op[tinfo->start].trav;
408         unsigned int h = hash_key(&key) % (trav->num * 2);
409
410         while (trav->hash[h].index) {
411                 if (key_eq(trav->hash[h].key, key)) {
412                         run_ops(tdb, tinfo->pre_fd, tinfo->filename, tinfo->op,
413                                 trav->hash[h].index, trav->end);
414                         tinfo->i++;
415                         return 0;
416                 }
417                 h = (h + 1) % (trav->num * 2);
418         }
419         fail(tinfo->filename, tinfo->start + 1, "unexpected traverse key");
420 }
421
422 /* More complex.  Just do whatever's they did at the n'th entry. */
423 static int nontrivial_traverse(struct tdb_context *tdb,
424                                TDB_DATA key, TDB_DATA data,
425                                void *_tinfo)
426 {
427         struct traverse_info *tinfo = _tinfo;
428         struct traverse *trav = tinfo->op[tinfo->start].trav;
429
430         if (tinfo->i == trav->end) {
431                 /* This can happen if traverse expects to be empty. */
432                 if (tinfo->start + 1 == trav->end)
433                         return 1;
434                 fail(tinfo->filename, tinfo->start + 1,
435                      "traverse did not terminate");
436         }
437
438         if (tinfo->op[tinfo->i].op != OP_TDB_TRAVERSE)
439                 fail(tinfo->filename, tinfo->start + 1,
440                      "%s:%u:traverse terminated early");
441
442         /* Run any normal ops. */
443         tinfo->i = run_ops(tdb, tinfo->pre_fd, tinfo->filename, tinfo->op,
444                            tinfo->i+1, trav->end);
445
446         if (tinfo->i == trav->end)
447                 return 1;
448
449         return 0;
450 }
451
452 static unsigned op_traverse(struct tdb_context *tdb,
453                             int pre_fd,
454                             const char *filename,
455                             int (*traversefn)(struct tdb_context *,
456                                               tdb_traverse_func, void *),
457                             struct op op[],
458                             unsigned int start)
459 {
460         struct traverse *trav = op[start].trav;
461         struct traverse_info tinfo = { op, filename, pre_fd, start, start+1 };
462
463         /* Trivial case. */
464         if (trav->hash) {
465                 int ret = traversefn(tdb, trivial_traverse, &tinfo);
466                 if (ret != trav->num)
467                         fail(filename, start+1, "short traversal %i", ret);
468                 return trav->end;
469         }
470
471         traversefn(tdb, nontrivial_traverse, &tinfo);
472
473         /* Traversing in wrong order can have strange effects: eg. if
474          * original traverse went A (delete A), B, we might do B
475          * (delete A).  So if we have ops left over, we do it now. */
476         while (tinfo.i != trav->end) {
477                 if (op[tinfo.i].op == OP_TDB_TRAVERSE)
478                         tinfo.i++;
479                 else
480                         tinfo.i = run_ops(tdb, pre_fd, filename, op,
481                                           tinfo.i, trav->end);
482         }
483         return trav->end;
484 }
485
486 struct depend {
487         /* We can have more than one */
488         struct list_node list;
489         unsigned int file;
490         unsigned int op;
491 };
492
493 static void do_pre(const char *filename, int pre_fd,
494                    struct op op[], unsigned int i)
495 {
496         while (op[i].pre != 0) {
497                 unsigned int opnum;
498
499 #if DEBUG_DEPS
500                 printf("%s:%u:waiting for pre\n", filename, i+1);
501 #endif
502                 if (read(pre_fd, &opnum, sizeof(opnum)) != sizeof(opnum))
503                         errx(1, "Reading from pipe");
504
505 #if DEBUG_DEPS
506                 printf("%s:%u:got pre %u\n",
507                        filename, i+1, opnum);
508 #endif
509                 /* This could be any op, not just this one. */
510                 if (op[opnum].pre == 0)
511                         errx(1, "Got unexpected notification for op line %u",
512                              opnum + 1);
513                 op[opnum].pre--;
514         }
515 }
516
517 static void do_post(const char *filename, const struct op op[], unsigned int i)
518 {
519         struct depend *dep;
520
521         list_for_each(&op[i].post, dep, list) {
522 #if DEBUG_DEPS
523                 printf("%s:%u:sending %u to file %u\n", filename, i+1,
524                        dep->op, dep->file);
525 #endif
526                 if (write(pipes[dep->file].fd[1], &dep->op, sizeof(dep->op))
527                     != sizeof(dep->op))
528                         err(1, "Failed to tell file %u", dep->file);
529         }
530 }
531
532 static __attribute__((noinline))
533 unsigned run_ops(struct tdb_context *tdb,
534                  int pre_fd,
535                  const char *filename,
536                  struct op op[], unsigned int start, unsigned int stop)
537 {
538         unsigned int i;
539
540         for (i = start; i < stop; i++) {
541                 do_pre(filename, pre_fd, op, i);
542
543                 switch (op[i].op) {
544                 case OP_TDB_LOCKALL:
545                         try(tdb_lockall(tdb), op[i].ret);
546                         break;
547                 case OP_TDB_LOCKALL_MARK:
548                         try(tdb_lockall_mark(tdb), op[i].ret);
549                         break;
550                 case OP_TDB_LOCKALL_UNMARK:
551                         try(tdb_lockall_unmark(tdb), op[i].ret);
552                         break;
553                 case OP_TDB_LOCKALL_NONBLOCK:
554                         unreliable(tdb_lockall_nonblock(tdb), op[i].ret,
555                                    tdb_lockall(tdb), tdb_unlockall(tdb));
556                         break;
557                 case OP_TDB_UNLOCKALL:
558                         try(tdb_unlockall(tdb), op[i].ret);
559                         break;
560                 case OP_TDB_LOCKALL_READ:
561                         try(tdb_lockall_read(tdb), op[i].ret);
562                         break;
563                 case OP_TDB_LOCKALL_READ_NONBLOCK:
564                         unreliable(tdb_lockall_read_nonblock(tdb), op[i].ret,
565                                    tdb_lockall_read(tdb),
566                                    tdb_unlockall_read(tdb));
567                         break;
568                 case OP_TDB_UNLOCKALL_READ:
569                         try(tdb_unlockall_read(tdb), op[i].ret);
570                         break;
571                 case OP_TDB_CHAINLOCK:
572                         try(tdb_chainlock(tdb, op[i].key), op[i].ret);
573                         break;
574                 case OP_TDB_CHAINLOCK_NONBLOCK:
575                         unreliable(tdb_chainlock_nonblock(tdb, op[i].key),
576                                    op[i].ret,
577                                    tdb_chainlock(tdb, op[i].key),
578                                    tdb_chainunlock(tdb, op[i].key));
579                         break;
580                 case OP_TDB_CHAINLOCK_MARK:
581                         try(tdb_chainlock_mark(tdb, op[i].key), op[i].ret);
582                         break;
583                 case OP_TDB_CHAINLOCK_UNMARK:
584                         try(tdb_chainlock_unmark(tdb, op[i].key), op[i].ret);
585                         break;
586                 case OP_TDB_CHAINUNLOCK:
587                         try(tdb_chainunlock(tdb, op[i].key), op[i].ret);
588                         break;
589                 case OP_TDB_CHAINLOCK_READ:
590                         try(tdb_chainlock_read(tdb, op[i].key), op[i].ret);
591                         break;
592                 case OP_TDB_CHAINUNLOCK_READ:
593                         try(tdb_chainunlock_read(tdb, op[i].key), op[i].ret);
594                         break;
595                 case OP_TDB_PARSE_RECORD:
596                         try(tdb_parse_record(tdb, op[i].key, get_len, NULL),
597                             op[i].ret);
598                         break;
599                 case OP_TDB_EXISTS:
600                         try(tdb_exists(tdb, op[i].key), op[i].ret);
601                         break;
602                 case OP_TDB_STORE:
603                         try(tdb_store(tdb, op[i].key, op[i].data, op[i].flag),
604                             op[i].ret < 0 ? op[i].ret : 0);
605                         break;
606                 case OP_TDB_APPEND:
607                         try(tdb_append(tdb, op[i].key, op[i].data),
608                             op[i].ret < 0 ? op[i].ret : 0);
609                         break;
610                 case OP_TDB_GET_SEQNUM:
611                         try(tdb_get_seqnum(tdb), op[i].ret);
612                         break;
613                 case OP_TDB_WIPE_ALL:
614                         try(tdb_wipe_all(tdb), op[i].ret);
615                         break;
616                 case OP_TDB_TRANSACTION_START:
617                         try(tdb_transaction_start(tdb), op[i].ret);
618                         break;
619                 case OP_TDB_TRANSACTION_CANCEL:
620                         try(tdb_transaction_cancel(tdb), op[i].ret);
621                         break;
622                 case OP_TDB_TRANSACTION_COMMIT:
623                         try(tdb_transaction_commit(tdb), op[i].ret);
624                         break;
625                 case OP_TDB_TRAVERSE_READ_START:
626                         i = op_traverse(tdb, pre_fd, filename,
627                                         tdb_traverse_read, op, i);
628                         break;
629                 case OP_TDB_TRAVERSE_START:
630                         i = op_traverse(tdb, pre_fd, filename,
631                                         tdb_traverse, op, i);
632                         break;
633                 case OP_TDB_TRAVERSE:
634                         /* Terminate: we're in a traverse, and we've
635                          * done our ops. */
636                         return i;
637                 case OP_TDB_TRAVERSE_END:
638                         fail(filename, i+1, "unepxected end traverse");
639                 /* FIXME: These must be treated like traverse. */
640                 case OP_TDB_FIRSTKEY:
641                         if (!key_eq(tdb_firstkey(tdb), op[i].data))
642                                 fail(filename, i+1, "bad firstkey");
643                         break;
644                 case OP_TDB_NEXTKEY:
645                         if (!key_eq(tdb_nextkey(tdb, op[i].key), op[i].data))
646                                 fail(filename, i+1, "bad nextkey");
647                         break;
648                 case OP_TDB_FETCH: {
649                         TDB_DATA f = tdb_fetch(tdb, op[i].key);
650                         if (!key_eq(f, op[i].data))
651                                 fail(filename, i+1, "bad fetch %u", f.dsize);
652                         break;
653                 }
654                 case OP_TDB_DELETE:
655                         try(tdb_delete(tdb, op[i].key), op[i].ret);
656                         break;
657                 }
658                 do_post(filename, op, i);
659         }
660         return i;
661 }
662
663 static struct op *load_tracefile(const char *filename, unsigned int *num,
664                                  unsigned int *hashsize,
665                                  unsigned int *tdb_flags,
666                                  unsigned int *open_flags)
667 {
668         unsigned int i;
669         struct op *op = talloc_array(NULL, struct op, 1);
670         char **words;
671         char **lines;
672         char *file;
673
674         file = grab_file(NULL, filename, NULL);
675         if (!file)
676                 err(1, "Reading %s", filename);
677
678         lines = strsplit(file, file, "\n", NULL);
679         if (!lines[0])
680                 errx(1, "%s is empty", filename);
681
682         words = strsplit(lines, lines[0], " ", NULL);
683         if (!streq(words[1], "tdb_open"))
684                 fail(filename, 1, "does not start with tdb_open");
685
686         *hashsize = atoi(words[2]);
687         *tdb_flags = strtoul(words[3], NULL, 0);
688         *open_flags = strtoul(words[4], NULL, 0);
689
690         for (i = 1; lines[i]; i++) {
691                 const struct op_table *opt;
692
693                 words = strsplit(lines, lines[i], " ", NULL);
694                 if (!words[0] || !words[1])
695                         fail(filename, i+1, "Expected serial number and op");
696                
697                 opt = find_keyword(words[1], strlen(words[1]));
698                 if (!opt) {
699                         if (streq(words[1], "tdb_close")) {
700                                 if (lines[i+1])
701                                         fail(filename, i+2,
702                                              "lines after tdb_close");
703                                 *num = i;
704                                 talloc_free(lines);
705                                 return op;
706                         }
707                         fail(filename, i+1, "Unknown operation '%s'", words[1]);
708                 }
709
710                 add_op(filename, &op, i, atoi(words[0]), opt->type);
711                 opt->enhance_op(filename, op, i, words);
712         }
713
714         fprintf(stderr, "%s:%u:last operation is not tdb_close: incomplete?",
715               filename, i);
716         talloc_free(lines);
717         *num = i - 1;
718         return op;
719 }
720
721 /* We remember all the keys we've ever seen, and who has them. */
722 struct key_user {
723         unsigned int file;
724         unsigned int op_num;
725 };
726
727 struct keyinfo {
728         TDB_DATA key;
729         unsigned int num_users;
730         struct key_user *user;
731 };
732
733 static bool changes_db(const struct op *op)
734 {
735         if (op->ret != 0)
736                 return false;
737
738         return op->op == OP_TDB_STORE
739                 || op->op == OP_TDB_APPEND
740                 || op->op == OP_TDB_WIPE_ALL
741                 || op->op == OP_TDB_TRANSACTION_COMMIT
742                 || op->op == OP_TDB_DELETE;
743 }
744
745 static struct keyinfo *hash_ops(struct op *op[], unsigned int num_ops[],
746                                 unsigned int num)
747 {
748         unsigned int i, j, h;
749         struct keyinfo *hash;
750
751         /* Gcc nexted function extension.  How cool is this? */
752         int compare_user_serial(const void *_a, const void *_b)
753         {
754                 const struct key_user *a = _a, *b = _b;
755                 int ret = op[a->file][a->op_num].serial
756                         - op[b->file][b->op_num].serial;
757
758                 /* Fetches don't inc serial, so we put changes first. */
759                 if (ret == 0) {
760                         if (changes_db(&op[a->file][a->op_num])
761                             && !changes_db(&op[b->file][b->op_num]))
762                                 return -1;
763                         if (changes_db(&op[b->file][b->op_num])
764                             && !changes_db(&op[a->file][a->op_num]))
765                                 return 1;
766                 }
767                 return ret;
768         }
769
770         hash = talloc_zero_array(op[0], struct keyinfo, total_keys*2);
771         for (i = 0; i < num; i++) {
772                 for (j = 1; j < num_ops[i]; j++) {
773                         /* We can't do this on allocation, due to realloc. */
774                         list_head_init(&op[i][j].post);
775
776                         if (!op[i][j].key.dptr)
777                                 continue;
778
779                         /* We don't wait for traverse keys */
780                         /* FIXME: We should, for trivial traversals. */
781                         if (op[i][j].op == OP_TDB_TRAVERSE)
782                                 continue;
783
784                         h = hash_key(&op[i][j].key) % (total_keys * 2);
785                         while (!key_eq(hash[h].key, op[i][j].key)) {
786                                 if (!hash[h].key.dptr) {
787                                         hash[h].key = op[i][j].key;
788                                         break;
789                                 }
790                                 h = (h + 1) % (total_keys * 2);
791                         }
792                         /* Might as well save some memory if we can. */
793                         if (op[i][j].key.dptr != hash[h].key.dptr) {
794                                 talloc_free(op[i][j].key.dptr);
795                                 op[i][j].key.dptr = hash[h].key.dptr;
796                         }
797                         hash[h].user = talloc_realloc(hash, hash[h].user,
798                                                      struct key_user,
799                                                      hash[h].num_users+1);
800                         hash[h].user[hash[h].num_users].op_num = j;
801                         hash[h].user[hash[h].num_users].file = i;
802                         hash[h].num_users++;
803                 }
804         }
805
806         /* Now sort into seqnum order. */
807         for (h = 0; h < total_keys * 2; h++)
808                 qsort(hash[h].user, hash[h].num_users, sizeof(hash[h].user[0]),
809                       compare_user_serial);
810
811         return hash;
812 }
813
814 static void add_dependency(void *ctx,
815                            struct op *op[],
816                            unsigned int needs_file,
817                            unsigned int needs_opnum,
818                            unsigned int satisfies_file,
819                            unsigned int satisfies_opnum)
820 {
821         struct depend *post;
822
823         post = talloc(ctx, struct depend);
824         post->file = needs_file;
825         post->op = needs_opnum;
826         list_add(&op[satisfies_file][satisfies_opnum].post, &post->list);
827
828         op[needs_file][needs_opnum].pre++;
829 }
830
831 static void derive_dependencies(char *filename[],
832                                 struct op *op[], unsigned int num_ops[],
833                                 unsigned int num)
834 {
835         struct keyinfo *hash;
836         unsigned int i;
837
838         /* Create hash table for faster key lookup. */
839         hash = hash_ops(op, num_ops, num);
840
841         /* We make the naive assumption that two ops on the same key
842          * have to be ordered; it's overkill. */
843         for (i = 0; i < total_keys * 2; i++) {
844                 unsigned int j;
845
846                 for (j = 1; j < hash[i].num_users; j++) {
847                         /* We don't depend on ourselves. */
848                         if (hash[i].user[j].file == hash[i].user[j-1].file)
849                                 continue;
850 #if DEBUG_DEPS
851                         printf("%s:%u: depends on %s:%u\n",
852                                filename[hash[i].user[j].file],
853                                hash[i].user[j].op_num+1,
854                                filename[hash[i].user[j-1].file],
855                                hash[i].user[j-1].op_num+1);
856 #endif
857                         add_dependency(hash, op,
858                                        hash[i].user[j].file,
859                                        hash[i].user[j].op_num,
860                                        hash[i].user[j-1].file,
861                                        hash[i].user[j-1].op_num);
862                 }
863         }
864 }
865
866 int main(int argc, char *argv[])
867 {
868         struct timeval start, end;
869         unsigned int i, num_ops[argc], hashsize[argc], tdb_flags[argc], open_flags[argc];
870         struct op *op[argc];
871         int fds[2];
872         char c;
873
874         if (argc < 3)
875                 errx(1, "Usage: %s <tdbfile> <tracefile>...", argv[0]);
876
877         pipes = talloc_array(NULL, struct pipe, argc - 2);
878         for (i = 0; i < argc - 2; i++) {
879                 op[i] = load_tracefile(argv[2+i], &num_ops[i], &hashsize[i],
880                                        &tdb_flags[i], &open_flags[i]);
881                 if (pipe(pipes[i].fd) != 0)
882                         err(1, "creating pipe");
883         }
884
885         derive_dependencies(argv+2, op, num_ops, i);
886
887         /* Don't fork for single arg case: simple debugging. */
888         if (argc == 3) {
889                 struct tdb_context *tdb;
890                 tdb = tdb_open_ex(argv[1], hashsize[0], tdb_flags[0],
891                                   open_flags[0], 0600,
892                                   NULL, hash_key);
893                 run_ops(tdb, pipes[0].fd[0], argv[2],
894                         op[0], 1, num_ops[0]);
895                 exit(0);
896         }
897
898         if (pipe(fds) != 0)
899                 err(1, "creating pipe");
900
901         for (i = 0; i < argc - 2; i++) {
902                 struct tdb_context *tdb;
903
904                 switch (fork()) {
905                 case -1:
906                         err(1, "fork failed");
907                 case 0:
908                         close(fds[1]);
909                         tdb = tdb_open_ex(argv[1], hashsize[i], tdb_flags[i],
910                                           open_flags[i], 0600,
911                                           NULL, hash_key);
912                         if (!tdb)
913                                 err(1, "Opening tdb %s", argv[1]);
914
915                         /* This catches parent exiting. */
916                         if (read(fds[0], &c, 1) != 1)
917                                 exit(1);
918                         run_ops(tdb, pipes[i].fd[0], argv[2+i],
919                                 op[i], 1, num_ops[i]);
920                         exit(0);
921                 default:
922                         break;
923                 }
924         }
925
926         /* Let everything settle. */
927         sleep(1);
928
929         gettimeofday(&start, NULL);
930         /* Tell them all to go!  Any write of sufficient length will do. */
931         if (write(fds[1], hashsize, i) != i)
932                 err(1, "Writing to wakeup pipe");
933
934         for (i = 0; i < argc - 2; i++) {
935                 int status;
936                 wait(&status);
937                 if (!WIFEXITED(status))
938                         errx(1, "Child died with signal");
939                 if (WEXITSTATUS(status) != 0)
940                         errx(1, "Child died with error code");
941         }
942         gettimeofday(&end, NULL);
943
944         end.tv_sec -= start.tv_sec;
945         printf("Time replaying: %lu usec\n",
946                end.tv_sec * 1000000UL + (end.tv_usec - start.tv_usec));
947         
948         exit(0);
949 }