]> git.ozlabs.org Git - ccan/blob - ccan/tdb/tools/replay_trace.c
Better tdb tracing, start of decent replay_trace.
[ccan] / ccan / tdb / tools / replay_trace.c
1 #include <ccan/tdb/tdb.h>
2 #include <ccan/grab_file/grab_file.h>
3 #include <ccan/hash/hash.h>
4 #include <ccan/talloc/talloc.h>
5 #include <ccan/str_talloc/str_talloc.h>
6 #include <ccan/str/str.h>
7 #include <ccan/list/list.h>
8 #include <err.h>
9 #include <ctype.h>
10 #include <string.h>
11 #include <unistd.h>
12 #include <sys/types.h>
13 #include <sys/wait.h>
14 #include <sys/time.h>
15
/* Two-level stringification: the outer macro lets its argument be
 * macro-expanded before the inner one applies '#' to it. */
#define STRINGIFY2(tok) #tok
#define STRINGIFY(tok) STRINGIFY2(tok)
18
/* Upper bound on distinct keys, bumped by the op_add_* parsers.  Starts
 * at 1 rather than 0 so it is always safe to use as a modulus. */
static unsigned int total_keys = 1;
21
22 /* #define DEBUG_DEPS 1 */
23
/* A pair of file descriptors; do_post() writes notifications to fd[1]. */
struct pipe {
	int fd[2];
};

/* Array of pipes, indexed by trace-file number. */
static struct pipe *pipes;
28
/*
 * Report a fatal problem with a trace file and exit with status 1.
 *
 * @filename: trace file being processed (printed as a prefix).
 * @line:     1-based line number in that file.
 * @fmt, ...: printf-style description of the failure.
 *
 * The format attribute makes the compiler check every call site's
 * arguments against its format string.
 */
static void __attribute__((noreturn, format(printf, 3, 4)))
fail(const char *filename, unsigned int line, const char *fmt, ...)
{
	va_list ap;

	va_start(ap, fmt);
	fprintf(stderr, "%s:%u: FAIL: ", filename, line);
	vfprintf(stderr, fmt, ap);
	fprintf(stderr, "\n");
	va_end(ap);
	exit(1);
}
42         
/* Try or die: evaluate expr once and call fail() unless it yields expect.
 * Relies on `filename` and `i` (zero-based op index) being in scope at
 * the call site.  The expression text is passed as a "%s" argument
 * rather than spliced into the format string, so a '%' inside expr can
 * never be misread as a conversion specifier. */
#define try(expr, expect)						\
	do {								\
		int try_ret = (expr);					\
		if (try_ret != (expect))				\
			fail(filename, i+1, "%s = %i",			\
			     STRINGIFY(expr), try_ret);			\
	} while (0)
50
/* Try or imitate results: for calls whose outcome can legitimately differ
 * on replay.  If expr does not return expect, warn on stderr and then
 * either run `force` (the trace recorded 0, so make it happen anyway) or
 * `undo` (the trace recorded non-zero, so reverse what just succeeded).
 * Relies on `filename` and `i` being in scope at the call site.
 * expr and expect are each evaluated exactly once. */
#define unreliable(expr, expect, force, undo)				\
	do {								\
		int unrel_ret = (expr);					\
		int unrel_want = (expect);				\
		if (unrel_ret != unrel_want) {				\
			fprintf(stderr, "%s:%u: %s gave %i not %i\n",	\
				filename, i+1, STRINGIFY(expr),		\
				unrel_ret, unrel_want);			\
			if (unrel_want == 0)				\
				force;					\
			else						\
				undo;					\
		}							\
	} while (0)
64
65 static bool key_eq(TDB_DATA a, TDB_DATA b)
66 {
67         if (a.dsize != b.dsize)
68                 return false;
69         return memcmp(a.dptr, b.dptr, a.dsize) == 0;
70 }
71
72 /* This is based on the hash algorithm from gdbm */
73 static unsigned int hash_key(TDB_DATA *key)
74 {
75         uint32_t value; /* Used to compute the hash value.  */
76         uint32_t   i;   /* Used to cycle through random values. */
77
78         /* Set the initial value from the key size. */
79         for (value = 0x238F13AF ^ key->dsize, i=0; i < key->dsize; i++)
80                 value = (value + (key->dptr[i] << (i*5 % 24)));
81
82         return (1103515243 * value + 12345);  
83 }
84
/* Every operation kind that can appear in a parsed trace. */
enum op_type {
	/* Whole-database locking. */
	OP_TDB_LOCKALL = 0,
	OP_TDB_LOCKALL_MARK,
	OP_TDB_LOCKALL_UNMARK,
	OP_TDB_LOCKALL_NONBLOCK,
	OP_TDB_UNLOCKALL,
	OP_TDB_LOCKALL_READ,
	OP_TDB_LOCKALL_READ_NONBLOCK,
	OP_TDB_UNLOCKALL_READ,

	/* Per-key chain locking. */
	OP_TDB_CHAINLOCK,
	OP_TDB_CHAINLOCK_NONBLOCK,
	OP_TDB_CHAINLOCK_MARK,
	OP_TDB_CHAINLOCK_UNMARK,
	OP_TDB_CHAINUNLOCK,
	OP_TDB_CHAINLOCK_READ,
	OP_TDB_CHAINUNLOCK_READ,

	/* Record and database operations. */
	OP_TDB_INCREMENT_SEQNUM_NONBLOCK,
	OP_TDB_PARSE_RECORD,
	OP_TDB_EXISTS,
	OP_TDB_STORE,
	OP_TDB_APPEND,
	OP_TDB_GET_SEQNUM,
	OP_TDB_WIPE_ALL,

	/* Transactions. */
	OP_TDB_TRANSACTION_START,
	OP_TDB_TRANSACTION_CANCEL,
	OP_TDB_TRANSACTION_COMMIT,

	/* Traversal: start/end markers plus one entry per callback. */
	OP_TDB_TRAVERSE_READ_START,
	OP_TDB_TRAVERSE_START,
	OP_TDB_TRAVERSE_END,
	OP_TDB_TRAVERSE,

	/* Key iteration and record access. */
	OP_TDB_FIRSTKEY,
	OP_TDB_NEXTKEY,
	OP_TDB_FETCH,
	OP_TDB_DELETE,
};
120
121 struct op {
122         unsigned int serial;
123         enum op_type op;
124         TDB_DATA key;
125         TDB_DATA data;
126         int ret;
127         /* Who is waiting for us? */
128         struct list_head post;
129         /* How many are we waiting for? */
130         unsigned int pre;
131
132         union {
133                 int flag; /* open and store */
134                 struct traverse *trav; /* traverse start */
135                 TDB_DATA post_append; /* append */
136         };
137 };
138
/*
 * Convert one hexadecimal digit (either case) to its value 0-15.
 *
 * Calls fail() with filename/line if c is not a hex digit; the message
 * shows the character exactly as it appeared in the input.  Ranges are
 * tested directly so that a negative char is never passed to a <ctype.h>
 * function.
 */
static unsigned char hex_char(const char *filename, unsigned int line, char c)
{
	if (c >= '0' && c <= '9')
		return c - '0';
	if (c >= 'a' && c <= 'f')
		return c - 'a' + 10;
	if (c >= 'A' && c <= 'F')
		return c - 'A' + 10;
	fail(filename, line, "invalid hex character '%c'", c);
}
148
149 /* TDB data is <size>:<%02x>* */
150 static TDB_DATA make_tdb_data(const void *ctx,
151                               const char *filename, unsigned int line,
152                               const char *word)
153 {
154         TDB_DATA data;
155         unsigned int i;
156         const char *p;
157
158         if (streq(word, "NULL"))
159                 return tdb_null;
160
161         data.dsize = atoi(word);
162         data.dptr = talloc_array(ctx, unsigned char, data.dsize);
163         p = strchr(word, ':');
164         if (!p)
165                 fail(filename, line, "invalid tdb data '%s'", word);
166         p++;
167         for (i = 0; i < data.dsize; i++)
168                 data.dptr[i] = hex_char(filename, line, p[i*2])*16
169                         + hex_char(filename, line, p[i*2+1]);
170
171         return data;
172 }
173
174 static void add_op(const char *filename, struct op **op, unsigned int i,
175                    unsigned int serial, enum op_type type)
176 {
177         struct op *new;
178         *op = talloc_realloc(NULL, *op, struct op, i+1);
179         new = (*op) + i;
180         new->op = type;
181         new->serial = serial;
182         new->pre = 0;
183         new->ret = 0;
184 }
185
186 static void op_add_nothing(const char *filename,
187                            struct op op[], unsigned int op_num, char *words[])
188 {
189         if (words[2])
190                 fail(filename, op_num+1, "Expected no arguments");
191         op[op_num].key = tdb_null;
192 }
193
194 static void op_add_key(const char *filename,
195                        struct op op[], unsigned int op_num, char *words[])
196 {
197         if (words[2] == NULL || words[3])
198                 fail(filename, op_num+1, "Expected just a key");
199
200         op[op_num].key = make_tdb_data(op, filename, op_num+1, words[2]);
201         if (op[op_num].op != OP_TDB_TRAVERSE)
202                 total_keys++;
203 }
204
205 static void op_add_key_ret(const char *filename,
206                            struct op op[], unsigned int op_num, char *words[])
207 {
208         if (!words[2] || !words[3] || !words[4] || words[5]
209             || !streq(words[3], "="))
210                 fail(filename, op_num+1, "Expected <key> = <ret>");
211         op[op_num].ret = atoi(words[4]);
212         op[op_num].key = make_tdb_data(op, filename, op_num+1, words[2]);
213         /* May only be a unique key if it fails */
214         if (op[op_num].ret != 0)
215                 total_keys++;
216 }
217
218 static void op_add_key_data(const char *filename,
219                             struct op op[], unsigned int op_num, char *words[])
220 {
221         if (!words[2] || !words[3] || !words[4] || words[5]
222             || !streq(words[3], "="))
223                 fail(filename, op_num+1, "Expected <key> = <data>");
224         op[op_num].key = make_tdb_data(op, filename, op_num+1, words[2]);
225         op[op_num].data = make_tdb_data(op, filename, op_num+1, words[4]);
226         /* May only be a unique key if it fails */
227         if (!op[op_num].data.dptr)
228                 total_keys++;
229 }
230
231 /* <serial> tdb_store <rec> <rec> <flag> = <ret> */
232 static void op_add_store(const char *filename,
233                          struct op op[], unsigned int op_num, char *words[])
234 {
235         if (!words[2] || !words[3] || !words[4] || !words[5] || !words[6]
236             || words[7] || !streq(words[5], "="))
237                 fail(filename, op_num+1, "Expect <key> <data> <flag> = <ret>");
238
239         op[op_num].flag = strtoul(words[4], NULL, 0);
240         op[op_num].ret = atoi(words[6]);
241         op[op_num].key = make_tdb_data(op, filename, op_num+1, words[2]);
242         op[op_num].data = make_tdb_data(op, filename, op_num+1, words[3]);
243         total_keys++;
244 }
245
246 /* <serial> tdb_append <rec> <rec> = <rec> */
247 static void op_add_append(const char *filename,
248                           struct op op[], unsigned int op_num, char *words[])
249 {
250         if (!words[2] || !words[3] || !words[4] || !words[5] || words[6]
251             || !streq(words[4], "="))
252                 fail(filename, op_num+1, "Expect <key> <data> = <rec>");
253
254         op[op_num].key = make_tdb_data(op, filename, op_num+1, words[2]);
255         op[op_num].data = make_tdb_data(op, filename, op_num+1, words[3]);
256         op[op_num].post_append
257                 = make_tdb_data(op, filename, op_num+1, words[5]);
258         total_keys++;
259 }
260
261 /* <serial> tdb_get_seqnum = <ret> */
262 static void op_add_seqnum(const char *filename,
263                           struct op op[], unsigned int op_num, char *words[])
264 {
265         if (!words[2] || !words[3] || words[4] || !streq(words[2], "="))
266                 fail(filename, op_num+1, "Expect = <ret>");
267
268         op[op_num].key = tdb_null;
269         op[op_num].ret = atoi(words[3]);
270 }
271
272 static void op_add_traverse(const char *filename,
273                             struct op op[], unsigned int op_num, char *words[])
274 {
275         if (words[2])
276                 fail(filename, op_num+1, "Expect no arguments");
277
278         op[op_num].key = tdb_null;
279         op[op_num].trav = NULL;
280 }
281
282 struct traverse_hash {
283         TDB_DATA key;
284         unsigned int index;
285 };
286
/* A traverse is a hash of keys, each one associated with ops. */
struct traverse {
	/* Number of traversal callbacks to perform. */
	unsigned int num;

	/* Index of the traversal's end op. */
	unsigned int end;

	/* Key table, only for trivial traversals; NULL otherwise. */
	struct traverse_hash *hash;
};
298
299 /* A trivial traversal is one which doesn't terminate early and only
300  * plays with its own record.  We can reliably replay these even if
301  * traverse order changes. */
302 static bool is_trivial_traverse(struct op op[], unsigned int end)
303 {
304 #if 0
305         unsigned int i;
306         TDB_DATA cur = tdb_null;
307
308         if (op[end].ret != 0)
309                 return false;
310
311         for (i = 0; i < end; i++) {
312                 if (!op[i].key.dptr)
313                         continue;
314                 if (op[i].op == OP_TDB_TRAVERSE)
315                         cur = op[i].key;
316                 if (!key_eq(cur, op[i].key))
317                         return false;
318         }
319         return true;
320 #endif
321         /* With multiple things happening at once, no traverse is trivial. */
322         return false;
323 }
324
325 static void op_analyze_traverse(const char *filename,
326                                 struct op op[], unsigned int op_num,
327                                 char *words[])
328 {
329         int i;
330         struct traverse *trav = talloc(op, struct traverse);
331
332         /* = %u means traverse function terminated. */
333         if (words[2]) {
334                 if (!streq(words[2], "=") || !words[3] || words[4])
335                         fail(filename, op_num+1, "expect = <num>");
336                 op[op_num].ret = atoi(words[3]);
337         } else
338                 op[op_num].ret = 0;
339
340         trav->num = 0;
341         trav->end = op_num;
342         for (i = op_num-1; i >= 0; i--) {
343                 if (op[i].op == OP_TDB_TRAVERSE)
344                         trav->num++;
345                 if (op[i].op != OP_TDB_TRAVERSE_READ_START
346                     && op[i].op != OP_TDB_TRAVERSE_START)
347                         continue;
348                 if (op[i].trav)
349                         continue;
350                 break;
351         }
352
353         if (i < 0)
354                 fail(filename, op_num+1, "no traversal start found");
355
356         op[i].trav = trav;
357
358         if (is_trivial_traverse(op+i, op_num-i)) {
359                 /* Fill in a plentiful hash table. */
360                 op[i].trav->hash = talloc_zero_array(op[i].trav,
361                                                      struct traverse_hash,
362                                                      trav->num * 2);
363                 for (; i < op_num; i++) {
364                         unsigned int h;
365                         if (op[i].op != OP_TDB_TRAVERSE)
366                                 continue;
367                         h = hash_key(&op[i].key) % (trav->num * 2);
368                         while (trav->hash[h].index)
369                                 h = (h + 1) % (trav->num * 2);
370                         trav->hash[h].index = i+1;
371                         trav->hash[h].key = op[i].key;
372                 }
373         } else
374                 trav->hash = NULL;
375 }
376
377 /* Keep -Wmissing-declarations happy: */
378 const struct op_table *
379 find_keyword (register const char *str, register unsigned int len);
380
381 #include "keywords.c"
382
383 static int get_len(TDB_DATA key, TDB_DATA data, void *private_data)
384 {
385         return data.dsize;
386 }
387
388 static unsigned run_ops(struct tdb_context *tdb,
389                         int pre_fd,
390                         const char *filename,
391                         struct op op[],
392                         unsigned int start, unsigned int stop);
393
/* State handed to the traverse callbacks through their void pointer. */
struct traverse_info {
	/* The whole op array of the trace being replayed. */
	struct op *op;
	/* Trace file name, for error reports. */
	const char *filename;
	/* Descriptor passed on to run_ops(). */
	int pre_fd;
	/* Index of the traverse start op. */
	unsigned int start;
	/* Current position within the traverse's ops. */
	unsigned int i;
};
401
402 /* Trivial case: do whatever they did for this key. */
403 static int trivial_traverse(struct tdb_context *tdb,
404                             TDB_DATA key, TDB_DATA data,
405                             void *_tinfo)
406 {
407         struct traverse_info *tinfo = _tinfo;
408         struct traverse *trav = tinfo->op[tinfo->start].trav;
409         unsigned int h = hash_key(&key) % (trav->num * 2);
410
411         while (trav->hash[h].index) {
412                 if (key_eq(trav->hash[h].key, key)) {
413                         run_ops(tdb, tinfo->pre_fd, tinfo->filename, tinfo->op,
414                                 trav->hash[h].index, trav->end);
415                         tinfo->i++;
416                         return 0;
417                 }
418                 h = (h + 1) % (trav->num * 2);
419         }
420         fail(tinfo->filename, tinfo->start + 1, "unexpected traverse key");
421 }
422
423 /* More complex.  Just do whatever's they did at the n'th entry. */
424 static int nontrivial_traverse(struct tdb_context *tdb,
425                                TDB_DATA key, TDB_DATA data,
426                                void *_tinfo)
427 {
428         struct traverse_info *tinfo = _tinfo;
429         struct traverse *trav = tinfo->op[tinfo->start].trav;
430
431         if (tinfo->i == trav->end) {
432                 /* This can happen if traverse expects to be empty. */
433                 if (tinfo->start + 1 == trav->end)
434                         return 1;
435                 fail(tinfo->filename, tinfo->start + 1,
436                      "traverse did not terminate");
437         }
438
439         if (tinfo->op[tinfo->i].op != OP_TDB_TRAVERSE)
440                 fail(tinfo->filename, tinfo->start + 1,
441                      "%s:%u:traverse terminated early");
442
443         /* Run any normal ops. */
444         tinfo->i = run_ops(tdb, tinfo->pre_fd, tinfo->filename, tinfo->op,
445                            tinfo->i+1, trav->end);
446
447         if (tinfo->i == trav->end)
448                 return 1;
449
450         return 0;
451 }
452
453 static unsigned op_traverse(struct tdb_context *tdb,
454                             int pre_fd,
455                             const char *filename,
456                             int (*traversefn)(struct tdb_context *,
457                                               tdb_traverse_func, void *),
458                             struct op op[],
459                             unsigned int start)
460 {
461         struct traverse *trav = op[start].trav;
462         struct traverse_info tinfo = { op, filename, pre_fd, start, start+1 };
463
464         /* Trivial case. */
465         if (trav->hash) {
466                 int ret = traversefn(tdb, trivial_traverse, &tinfo);
467                 if (ret != trav->num)
468                         fail(filename, start+1, "short traversal %i", ret);
469                 return trav->end;
470         }
471
472         traversefn(tdb, nontrivial_traverse, &tinfo);
473
474         /* Traversing in wrong order can have strange effects: eg. if
475          * original traverse went A (delete A), B, we might do B
476          * (delete A).  So if we have ops left over, we do it now. */
477         while (tinfo.i != trav->end) {
478                 if (op[tinfo.i].op == OP_TDB_TRAVERSE)
479                         tinfo.i++;
480                 else
481                         tinfo.i = run_ops(tdb, pre_fd, filename, op,
482                                           tinfo.i, trav->end);
483         }
484         return trav->end;
485 }
486
487 struct depend {
488         /* We can have more than one */
489         struct list_node list;
490         unsigned int file;
491         unsigned int op;
492 };
493
494 static void do_pre(const char *filename, int pre_fd,
495                    struct op op[], unsigned int i)
496 {
497         while (op[i].pre != 0) {
498                 unsigned int opnum;
499
500 #if DEBUG_DEPS
501                 printf("%s:%u:waiting for pre\n", filename, i+1);
502 #endif
503                 if (read(pre_fd, &opnum, sizeof(opnum)) != sizeof(opnum))
504                         errx(1, "Reading from pipe");
505
506 #if DEBUG_DEPS
507                 printf("%s:%u:got pre %u\n",
508                        filename, i+1, opnum);
509 #endif
510                 /* This could be any op, not just this one. */
511                 if (op[opnum].pre == 0)
512                         errx(1, "Got unexpected notification for op line %u",
513                              opnum + 1);
514                 op[opnum].pre--;
515         }
516 }
517
518 static void do_post(const char *filename, const struct op op[], unsigned int i)
519 {
520         struct depend *dep;
521
522         list_for_each(&op[i].post, dep, list) {
523 #if DEBUG_DEPS
524                 printf("%s:%u:sending %u to file %u\n", filename, i+1,
525                        dep->op, dep->file);
526 #endif
527                 if (write(pipes[dep->file].fd[1], &dep->op, sizeof(dep->op))
528                     != sizeof(dep->op))
529                         err(1, "Failed to tell file %u", dep->file);
530         }
531 }
532
533 static __attribute__((noinline))
534 unsigned run_ops(struct tdb_context *tdb,
535                  int pre_fd,
536                  const char *filename,
537                  struct op op[], unsigned int start, unsigned int stop)
538 {
539         unsigned int i;
540
541         for (i = start; i < stop; i++) {
542                 do_pre(filename, pre_fd, op, i);
543
544                 switch (op[i].op) {
545                 case OP_TDB_LOCKALL:
546                         try(tdb_lockall(tdb), op[i].ret);
547                         break;
548                 case OP_TDB_LOCKALL_MARK:
549                         try(tdb_lockall_mark(tdb), op[i].ret);
550                         break;
551                 case OP_TDB_LOCKALL_UNMARK:
552                         try(tdb_lockall_unmark(tdb), op[i].ret);
553                         break;
554                 case OP_TDB_LOCKALL_NONBLOCK:
555                         unreliable(tdb_lockall_nonblock(tdb), op[i].ret,
556                                    tdb_lockall(tdb), tdb_unlockall(tdb));
557                         break;
558                 case OP_TDB_UNLOCKALL:
559                         try(tdb_unlockall(tdb), op[i].ret);
560                         break;
561                 case OP_TDB_LOCKALL_READ:
562                         try(tdb_lockall_read(tdb), op[i].ret);
563                         break;
564                 case OP_TDB_LOCKALL_READ_NONBLOCK:
565                         unreliable(tdb_lockall_read_nonblock(tdb), op[i].ret,
566                                    tdb_lockall_read(tdb),
567                                    tdb_unlockall_read(tdb));
568                         break;
569                 case OP_TDB_UNLOCKALL_READ:
570                         try(tdb_unlockall_read(tdb), op[i].ret);
571                         break;
572                 case OP_TDB_CHAINLOCK:
573                         try(tdb_chainlock(tdb, op[i].key), op[i].ret);
574                         break;
575                 case OP_TDB_CHAINLOCK_NONBLOCK:
576                         unreliable(tdb_chainlock_nonblock(tdb, op[i].key),
577                                    op[i].ret,
578                                    tdb_chainlock(tdb, op[i].key),
579                                    tdb_chainunlock(tdb, op[i].key));
580                         break;
581                 case OP_TDB_CHAINLOCK_MARK:
582                         try(tdb_chainlock_mark(tdb, op[i].key), op[i].ret);
583                         break;
584                 case OP_TDB_CHAINLOCK_UNMARK:
585                         try(tdb_chainlock_unmark(tdb, op[i].key), op[i].ret);
586                         break;
587                 case OP_TDB_CHAINUNLOCK:
588                         try(tdb_chainunlock(tdb, op[i].key), op[i].ret);
589                         break;
590                 case OP_TDB_CHAINLOCK_READ:
591                         try(tdb_chainlock_read(tdb, op[i].key), op[i].ret);
592                         break;
593                 case OP_TDB_CHAINUNLOCK_READ:
594                         try(tdb_chainunlock_read(tdb, op[i].key), op[i].ret);
595                         break;
596                 case OP_TDB_INCREMENT_SEQNUM_NONBLOCK:
597                         tdb_increment_seqnum_nonblock(tdb);
598                         break;
599                 case OP_TDB_PARSE_RECORD:
600                         try(tdb_parse_record(tdb, op[i].key, get_len, NULL),
601                             op[i].ret);
602                         break;
603                 case OP_TDB_EXISTS:
604                         try(tdb_exists(tdb, op[i].key), op[i].ret);
605                         break;
606                 case OP_TDB_STORE:
607                         try(tdb_store(tdb, op[i].key, op[i].data, op[i].flag),
608                             op[i].ret < 0 ? op[i].ret : 0);
609                         break;
610                 case OP_TDB_APPEND:
611                         try(tdb_append(tdb, op[i].key, op[i].data),
612                             op[i].ret < 0 ? op[i].ret : 0);
613                         break;
614                 case OP_TDB_GET_SEQNUM:
615                         try(tdb_get_seqnum(tdb), op[i].ret);
616                         break;
617                 case OP_TDB_WIPE_ALL:
618                         try(tdb_wipe_all(tdb), op[i].ret);
619                         break;
620                 case OP_TDB_TRANSACTION_START:
621                         try(tdb_transaction_start(tdb), op[i].ret);
622                         break;
623                 case OP_TDB_TRANSACTION_CANCEL:
624                         try(tdb_transaction_cancel(tdb), op[i].ret);
625                         break;
626                 case OP_TDB_TRANSACTION_COMMIT:
627                         try(tdb_transaction_commit(tdb), op[i].ret);
628                         break;
629                 case OP_TDB_TRAVERSE_READ_START:
630                         i = op_traverse(tdb, pre_fd, filename,
631                                         tdb_traverse_read, op, i);
632                         break;
633                 case OP_TDB_TRAVERSE_START:
634                         i = op_traverse(tdb, pre_fd, filename,
635                                         tdb_traverse, op, i);
636                         break;
637                 case OP_TDB_TRAVERSE:
638                         /* Terminate: we're in a traverse, and we've
639                          * done our ops. */
640                         return i;
641                 case OP_TDB_TRAVERSE_END:
642                         fail(filename, i+1, "unepxected end traverse");
643                 /* FIXME: These must be treated like traverse. */
644                 case OP_TDB_FIRSTKEY:
645                         if (!key_eq(tdb_firstkey(tdb), op[i].data))
646                                 fail(filename, i+1, "bad firstkey");
647                         break;
648                 case OP_TDB_NEXTKEY:
649                         if (!key_eq(tdb_nextkey(tdb, op[i].key), op[i].data))
650                                 fail(filename, i+1, "bad nextkey");
651                         break;
652                 case OP_TDB_FETCH: {
653                         TDB_DATA f = tdb_fetch(tdb, op[i].key);
654                         if (!key_eq(f, op[i].data))
655                                 fail(filename, i+1, "bad fetch %u", f.dsize);
656                         break;
657                 }
658                 case OP_TDB_DELETE:
659                         try(tdb_delete(tdb, op[i].key), op[i].ret);
660                         break;
661                 }
662                 do_post(filename, op, i);
663         }
664         return i;
665 }
666
667 static struct op *load_tracefile(const char *filename, unsigned int *num,
668                                  unsigned int *hashsize,
669                                  unsigned int *tdb_flags,
670                                  unsigned int *open_flags)
671 {
672         unsigned int i;
673         struct op *op = talloc_array(NULL, struct op, 1);
674         char **words;
675         char **lines;
676         char *file;
677
678         file = grab_file(NULL, filename, NULL);
679         if (!file)
680                 err(1, "Reading %s", filename);
681
682         lines = strsplit(file, file, "\n", NULL);
683         if (!lines[0])
684                 errx(1, "%s is empty", filename);
685
686         words = strsplit(lines, lines[0], " ", NULL);
687         if (!streq(words[1], "tdb_open"))
688                 fail(filename, 1, "does not start with tdb_open");
689
690         *hashsize = atoi(words[2]);
691         *tdb_flags = strtoul(words[3], NULL, 0);
692         *open_flags = strtoul(words[4], NULL, 0);
693
694         for (i = 1; lines[i]; i++) {
695                 const struct op_table *opt;
696
697                 words = strsplit(lines, lines[i], " ", NULL);
698                 if (!words[0] || !words[1])
699                         fail(filename, i+1, "Expected serial number and op");
700                
701                 opt = find_keyword(words[1], strlen(words[1]));
702                 if (!opt) {
703                         if (streq(words[1], "tdb_close")) {
704                                 if (lines[i+1])
705                                         fail(filename, i+2,
706                                              "lines after tdb_close");
707                                 *num = i;
708                                 talloc_free(lines);
709                                 return op;
710                         }
711                         fail(filename, i+1, "Unknown operation '%s'", words[1]);
712                 }
713
714                 add_op(filename, &op, i, atoi(words[0]), opt->type);
715                 opt->enhance_op(filename, op, i, words);
716         }
717
718         fprintf(stderr, "%s:%u:last operation is not tdb_close: incomplete?",
719               filename, i);
720         talloc_free(lines);
721         *num = i - 1;
722         return op;
723 }
724
/* We remember all the keys we've ever seen, and who has them.
 * A key_user names one op that touches a key. */
struct key_user {
	/* Index of the trace file the op belongs to. */
	unsigned int file;
	/* Index of the op within that file. */
	unsigned int op_num;
};
730
731 struct keyinfo {
732         TDB_DATA key;
733         unsigned int num_users;
734         struct key_user *user;
735 };
736
737 static bool changes_db(const struct op *op)
738 {
739         if (op->ret != 0)
740                 return false;
741
742         return op->op == OP_TDB_STORE
743                 || op->op == OP_TDB_APPEND
744                 || op->op == OP_TDB_WIPE_ALL
745                 || op->op == OP_TDB_TRANSACTION_COMMIT
746                 || op->op == OP_TDB_DELETE;
747 }
748
749 static struct keyinfo *hash_ops(struct op *op[], unsigned int num_ops[],
750                                 unsigned int num)
751 {
752         unsigned int i, j, h;
753         struct keyinfo *hash;
754
755         /* Gcc nexted function extension.  How cool is this? */
756         int compare_user_serial(const void *_a, const void *_b)
757         {
758                 const struct key_user *a = _a, *b = _b;
759                 int ret = op[a->file][a->op_num].serial
760                         - op[b->file][b->op_num].serial;
761
762                 /* Fetches don't inc serial, so we put changes first. */
763                 if (ret == 0) {
764                         if (changes_db(&op[a->file][a->op_num])
765                             && !changes_db(&op[b->file][b->op_num]))
766                                 return -1;
767                         if (changes_db(&op[b->file][b->op_num])
768                             && !changes_db(&op[a->file][a->op_num]))
769                                 return 1;
770                 }
771                 return ret;
772         }
773
774         hash = talloc_zero_array(op[0], struct keyinfo, total_keys*2);
775         for (i = 0; i < num; i++) {
776                 for (j = 1; j < num_ops[i]; j++) {
777                         /* We can't do this on allocation, due to realloc. */
778                         list_head_init(&op[i][j].post);
779
780                         if (!op[i][j].key.dptr)
781                                 continue;
782
783                         /* We don't wait for traverse keys */
784                         /* FIXME: We should, for trivial traversals. */
785                         if (op[i][j].op == OP_TDB_TRAVERSE)
786                                 continue;
787
788                         h = hash_key(&op[i][j].key) % (total_keys * 2);
789                         while (!key_eq(hash[h].key, op[i][j].key)) {
790                                 if (!hash[h].key.dptr) {
791                                         hash[h].key = op[i][j].key;
792                                         break;
793                                 }
794                                 h = (h + 1) % (total_keys * 2);
795                         }
796                         /* Might as well save some memory if we can. */
797                         if (op[i][j].key.dptr != hash[h].key.dptr) {
798                                 talloc_free(op[i][j].key.dptr);
799                                 op[i][j].key.dptr = hash[h].key.dptr;
800                         }
801                         hash[h].user = talloc_realloc(hash, hash[h].user,
802                                                      struct key_user,
803                                                      hash[h].num_users+1);
804                         hash[h].user[hash[h].num_users].op_num = j;
805                         hash[h].user[hash[h].num_users].file = i;
806                         hash[h].num_users++;
807                 }
808         }
809
810         /* Now sort into seqnum order. */
811         for (h = 0; h < total_keys * 2; h++)
812                 qsort(hash[h].user, hash[h].num_users, sizeof(hash[h].user[0]),
813                       compare_user_serial);
814
815         return hash;
816 }
817
818 static void add_dependency(void *ctx,
819                            struct op *op[],
820                            unsigned int needs_file,
821                            unsigned int needs_opnum,
822                            unsigned int satisfies_file,
823                            unsigned int satisfies_opnum)
824 {
825         struct depend *post;
826
827         post = talloc(ctx, struct depend);
828         post->file = needs_file;
829         post->op = needs_opnum;
830         list_add(&op[satisfies_file][satisfies_opnum].post, &post->list);
831
832         op[needs_file][needs_opnum].pre++;
833 }
834
835 static void derive_dependencies(char *filename[],
836                                 struct op *op[], unsigned int num_ops[],
837                                 unsigned int num)
838 {
839         struct keyinfo *hash;
840         unsigned int i;
841
842         /* Create hash table for faster key lookup. */
843         hash = hash_ops(op, num_ops, num);
844
845         /* We make the naive assumption that two ops on the same key
846          * have to be ordered; it's overkill. */
847         for (i = 0; i < total_keys * 2; i++) {
848                 unsigned int j;
849
850                 for (j = 1; j < hash[i].num_users; j++) {
851                         /* We don't depend on ourselves. */
852                         if (hash[i].user[j].file == hash[i].user[j-1].file)
853                                 continue;
854 #if DEBUG_DEPS
855                         printf("%s:%u: depends on %s:%u\n",
856                                filename[hash[i].user[j].file],
857                                hash[i].user[j].op_num+1,
858                                filename[hash[i].user[j-1].file],
859                                hash[i].user[j-1].op_num+1);
860 #endif
861                         add_dependency(hash, op,
862                                        hash[i].user[j].file,
863                                        hash[i].user[j].op_num,
864                                        hash[i].user[j-1].file,
865                                        hash[i].user[j-1].op_num);
866                 }
867         }
868 }
869
870 int main(int argc, char *argv[])
871 {
872         struct timeval start, end;
873         unsigned int i, num_ops[argc], hashsize[argc], tdb_flags[argc], open_flags[argc];
874         struct op *op[argc];
875         int fds[2];
876         char c;
877
878         if (argc < 3)
879                 errx(1, "Usage: %s <tdbfile> <tracefile>...", argv[0]);
880
881         pipes = talloc_array(NULL, struct pipe, argc - 2);
882         for (i = 0; i < argc - 2; i++) {
883                 op[i] = load_tracefile(argv[2+i], &num_ops[i], &hashsize[i],
884                                        &tdb_flags[i], &open_flags[i]);
885                 if (pipe(pipes[i].fd) != 0)
886                         err(1, "creating pipe");
887         }
888
889         derive_dependencies(argv+2, op, num_ops, i);
890
891         /* Don't fork for single arg case: simple debugging. */
892         if (argc == 3) {
893                 struct tdb_context *tdb;
894                 tdb = tdb_open_ex(argv[1], hashsize[0], tdb_flags[0],
895                                   open_flags[0], 0600,
896                                   NULL, hash_key);
897                 run_ops(tdb, pipes[0].fd[0], argv[2],
898                         op[0], 1, num_ops[0]);
899                 exit(0);
900         }
901
902         if (pipe(fds) != 0)
903                 err(1, "creating pipe");
904
905         for (i = 0; i < argc - 2; i++) {
906                 struct tdb_context *tdb;
907
908                 switch (fork()) {
909                 case -1:
910                         err(1, "fork failed");
911                 case 0:
912                         close(fds[1]);
913                         tdb = tdb_open_ex(argv[1], hashsize[i], tdb_flags[i],
914                                           open_flags[i], 0600,
915                                           NULL, hash_key);
916                         if (!tdb)
917                                 err(1, "Opening tdb %s", argv[1]);
918
919                         /* This catches parent exiting. */
920                         if (read(fds[0], &c, 1) != 1)
921                                 exit(1);
922                         run_ops(tdb, pipes[i].fd[0], argv[2+i],
923                                 op[i], 1, num_ops[i]);
924                         exit(0);
925                 default:
926                         break;
927                 }
928         }
929
930         /* Let everything settle. */
931         sleep(1);
932
933         gettimeofday(&start, NULL);
934         /* Tell them all to go!  Any write of sufficient length will do. */
935         if (write(fds[1], hashsize, i) != 1)
936                 err(1, "Writing to wakeup pipe");
937
938         for (i = 0; i < argc - 2; i++) {
939                 int status;
940                 wait(&status);
941                 if (!WIFEXITED(status))
942                         errx(1, "Child died with signal");
943                 if (WEXITSTATUS(status) != 0)
944                         errx(1, "Child died with error code");
945         }
946         gettimeofday(&end, NULL);
947
948         end.tv_sec -= start.tv_sec;
949         printf("Time replaying: %lu usec\n",
950                end.tv_sec * 1000000UL + (end.tv_usec - start.tv_usec));
951         
952         exit(0);
953 }