]> git.ozlabs.org Git - ccan/blob - ccan/tdb/tools/replay_trace.c
0a4201c3522a9a25283267f766d41973df67b150
[ccan] / ccan / tdb / tools / replay_trace.c
1 #include <ccan/tdb/tdb.h>
2 #include <ccan/grab_file/grab_file.h>
3 #include <ccan/hash/hash.h>
4 #include <ccan/talloc/talloc.h>
5 #include <ccan/str_talloc/str_talloc.h>
6 #include <ccan/str/str.h>
7 #include <ccan/list/list.h>
8 #include <err.h>
9 #include <ctype.h>
10 #include <string.h>
11 #include <unistd.h>
12 #include <sys/types.h>
13 #include <sys/wait.h>
14 #include <sys/time.h>
15
16 #define STRINGIFY2(x) #x
17 #define STRINGIFY(x) STRINGIFY2(x)
18
19 /* Avoid mod by zero */
20 static unsigned int total_keys = 1;
21
22 /* #define DEBUG_DEPS 1 */
23
24 struct pipe {
25         int fd[2];
26 };
27 static struct pipe *pipes;
28
29 static void __attribute__((noreturn)) fail(const char *filename,
30                                            unsigned int line,
31                                            const char *fmt, ...)
32 {
33         va_list ap;
34
35         va_start(ap, fmt);
36         fprintf(stderr, "%s:%u: FAIL: ", filename, line);
37         vfprintf(stderr, fmt, ap);
38         fprintf(stderr, "\n");
39         va_end(ap);
40         exit(1);
41 }
42         
43 /* Try or die. */
44 #define try(expr, expect)                                               \
45         do {                                                            \
46                 int ret = (expr);                                       \
47                 if (ret != (expect))                                    \
48                         fail(filename, i+1, STRINGIFY(expr) "= %i", ret); \
49         } while (0)
50
51 /* Try or imitate results. */
52 #define unreliable(expr, expect, force, undo)                           \
53         do {                                                            \
54                 int ret = expr;                                         \
55                 if (ret != expect) {                                    \
56                         fprintf(stderr, "%s:%u: %s gave %i not %i",     \
57                               filename, i+1, STRINGIFY(expr), ret, expect); \
58                         if (expect == 0)                                \
59                                 force;                                  \
60                         else                                            \
61                                 undo;                                   \
62                 }                                                       \
63         } while (0)
64
65 static bool key_eq(TDB_DATA a, TDB_DATA b)
66 {
67         if (a.dsize != b.dsize)
68                 return false;
69         return memcmp(a.dptr, b.dptr, a.dsize) == 0;
70 }
71
72 /* This is based on the hash algorithm from gdbm */
73 static unsigned int hash_key(TDB_DATA *key)
74 {
75         uint32_t value; /* Used to compute the hash value.  */
76         uint32_t   i;   /* Used to cycle through random values. */
77
78         /* Set the initial value from the key size. */
79         for (value = 0x238F13AF ^ key->dsize, i=0; i < key->dsize; i++)
80                 value = (value + (key->dptr[i] << (i*5 % 24)));
81
82         return (1103515243 * value + 12345);  
83 }
84
85 enum op_type {
86         OP_TDB_LOCKALL,
87         OP_TDB_LOCKALL_MARK,
88         OP_TDB_LOCKALL_UNMARK,
89         OP_TDB_LOCKALL_NONBLOCK,
90         OP_TDB_UNLOCKALL,
91         OP_TDB_LOCKALL_READ,
92         OP_TDB_LOCKALL_READ_NONBLOCK,
93         OP_TDB_UNLOCKALL_READ,
94         OP_TDB_CHAINLOCK,
95         OP_TDB_CHAINLOCK_NONBLOCK,
96         OP_TDB_CHAINLOCK_MARK,
97         OP_TDB_CHAINLOCK_UNMARK,
98         OP_TDB_CHAINUNLOCK,
99         OP_TDB_CHAINLOCK_READ,
100         OP_TDB_CHAINUNLOCK_READ,
101         OP_TDB_PARSE_RECORD,
102         OP_TDB_EXISTS,
103         OP_TDB_STORE,
104         OP_TDB_APPEND,
105         OP_TDB_GET_SEQNUM,
106         OP_TDB_WIPE_ALL,
107         OP_TDB_TRANSACTION_START,
108         OP_TDB_TRANSACTION_CANCEL,
109         OP_TDB_TRANSACTION_COMMIT,
110         OP_TDB_TRAVERSE_READ_START,
111         OP_TDB_TRAVERSE_START,
112         OP_TDB_TRAVERSE_END,
113         OP_TDB_TRAVERSE,
114         OP_TDB_FIRSTKEY,
115         OP_TDB_NEXTKEY,
116         OP_TDB_FETCH,
117         OP_TDB_DELETE,
118 };
119
120 struct op {
121         unsigned int serial;
122         enum op_type op;
123         TDB_DATA key;
124         TDB_DATA data;
125         int ret;
126         /* Who is waiting for us? */
127         struct list_head post;
128         /* How many are we waiting for? */
129         unsigned int pre;
130
131         union {
132                 int flag; /* open and store */
133                 struct traverse *trav; /* traverse start */
134                 TDB_DATA post_append; /* append */
135         };
136 };
137
138 static unsigned char hex_char(const char *filename, unsigned int line, char c)
139 {
140         c = toupper(c);
141         if (c >= 'A' && c <= 'F')
142                 return c - 'A' + 10;
143         if (c >= '0' && c <= '9')
144                 return c - '0';
145         fail(filename, line, "invalid hex character '%c'", c);
146 }
147
148 /* TDB data is <size>:<%02x>* */
149 static TDB_DATA make_tdb_data(const void *ctx,
150                               const char *filename, unsigned int line,
151                               const char *word)
152 {
153         TDB_DATA data;
154         unsigned int i;
155         const char *p;
156
157         if (streq(word, "NULL"))
158                 return tdb_null;
159
160         data.dsize = atoi(word);
161         data.dptr = talloc_array(ctx, unsigned char, data.dsize);
162         p = strchr(word, ':');
163         if (!p)
164                 fail(filename, line, "invalid tdb data '%s'", word);
165         p++;
166         for (i = 0; i < data.dsize; i++)
167                 data.dptr[i] = hex_char(filename, line, p[i*2])*16
168                         + hex_char(filename, line, p[i*2+1]);
169
170         return data;
171 }
172
173 static void add_op(const char *filename, struct op **op, unsigned int i,
174                    unsigned int serial, enum op_type type)
175 {
176         struct op *new;
177         *op = talloc_realloc(NULL, *op, struct op, i+1);
178         new = (*op) + i;
179         new->op = type;
180         new->serial = serial;
181         new->pre = 0;
182         new->ret = 0;
183 }
184
185 static void op_add_nothing(const char *filename,
186                            struct op op[], unsigned int op_num, char *words[])
187 {
188         if (words[2])
189                 fail(filename, op_num+1, "Expected no arguments");
190         op[op_num].key = tdb_null;
191 }
192
193 static void op_add_key(const char *filename,
194                        struct op op[], unsigned int op_num, char *words[])
195 {
196         if (words[2] == NULL || words[3])
197                 fail(filename, op_num+1, "Expected just a key");
198
199         op[op_num].key = make_tdb_data(op, filename, op_num+1, words[2]);
200         if (op[op_num].op != OP_TDB_TRAVERSE)
201                 total_keys++;
202 }
203
204 static void op_add_key_ret(const char *filename,
205                            struct op op[], unsigned int op_num, char *words[])
206 {
207         if (!words[2] || !words[3] || !words[4] || words[5]
208             || !streq(words[3], "="))
209                 fail(filename, op_num+1, "Expected <key> = <ret>");
210         op[op_num].ret = atoi(words[4]);
211         op[op_num].key = make_tdb_data(op, filename, op_num+1, words[2]);
212         /* May only be a unique key if it fails */
213         if (op[op_num].ret != 0)
214                 total_keys++;
215 }
216
217 static void op_add_key_data(const char *filename,
218                             struct op op[], unsigned int op_num, char *words[])
219 {
220         if (!words[2] || !words[3] || !words[4] || words[5]
221             || !streq(words[3], "="))
222                 fail(filename, op_num+1, "Expected <key> = <data>");
223         op[op_num].key = make_tdb_data(op, filename, op_num+1, words[2]);
224         op[op_num].data = make_tdb_data(op, filename, op_num+1, words[4]);
225         /* May only be a unique key if it fails */
226         if (!op[op_num].data.dptr)
227                 total_keys++;
228 }
229
230 /* <serial> tdb_store <rec> <rec> <flag> = <ret> */
231 static void op_add_store(const char *filename,
232                          struct op op[], unsigned int op_num, char *words[])
233 {
234         if (!words[2] || !words[3] || !words[4] || !words[5] || !words[6]
235             || words[7] || !streq(words[5], "="))
236                 fail(filename, op_num+1, "Expect <key> <data> <flag> = <ret>");
237
238         op[op_num].flag = strtoul(words[4], NULL, 0);
239         op[op_num].ret = atoi(words[6]);
240         op[op_num].key = make_tdb_data(op, filename, op_num+1, words[2]);
241         op[op_num].data = make_tdb_data(op, filename, op_num+1, words[3]);
242         total_keys++;
243 }
244
245 /* <serial> tdb_append <rec> <rec> = <rec> */
246 static void op_add_append(const char *filename,
247                           struct op op[], unsigned int op_num, char *words[])
248 {
249         if (!words[2] || !words[3] || !words[4] || !words[5] || words[6]
250             || !streq(words[4], "="))
251                 fail(filename, op_num+1, "Expect <key> <data> = <rec>");
252
253         op[op_num].key = make_tdb_data(op, filename, op_num+1, words[2]);
254         op[op_num].data = make_tdb_data(op, filename, op_num+1, words[3]);
255         op[op_num].post_append
256                 = make_tdb_data(op, filename, op_num+1, words[5]);
257         total_keys++;
258 }
259
260 /* <serial> tdb_get_seqnum = <ret> */
261 static void op_add_seqnum(const char *filename,
262                           struct op op[], unsigned int op_num, char *words[])
263 {
264         if (!words[2] || !words[3] || words[4] || !streq(words[2], "="))
265                 fail(filename, op_num+1, "Expect = <ret>");
266
267         op[op_num].key = tdb_null;
268         op[op_num].ret = atoi(words[3]);
269 }
270
271 static void op_add_traverse(const char *filename,
272                             struct op op[], unsigned int op_num, char *words[])
273 {
274         if (words[2])
275                 fail(filename, op_num+1, "Expect no arguments");
276
277         op[op_num].key = tdb_null;
278         op[op_num].trav = NULL;
279 }
280
281 struct traverse_hash {
282         TDB_DATA key;
283         unsigned int index;
284 };
285
286 /* A traverse is a hash of keys, each one associated with ops. */
287 struct traverse {
288         /* How many traversal callouts should I do? */
289         unsigned int num;
290
291         /* Where is traversal end op? */
292         unsigned int end;
293
294         /* For trivial traversals. */
295         struct traverse_hash *hash;
296 };
297
298 /* A trivial traversal is one which doesn't terminate early and only
299  * plays with its own record.  We can reliably replay these even if
300  * traverse order changes. */
301 static bool is_trivial_traverse(struct op op[], unsigned int end)
302 {
303 #if 0
304         unsigned int i;
305         TDB_DATA cur = tdb_null;
306
307         if (op[end].ret != 0)
308                 return false;
309
310         for (i = 0; i < end; i++) {
311                 if (!op[i].key.dptr)
312                         continue;
313                 if (op[i].op == OP_TDB_TRAVERSE)
314                         cur = op[i].key;
315                 if (!key_eq(cur, op[i].key))
316                         return false;
317         }
318         return true;
319 #endif
320         /* With multiple things happening at once, no traverse is trivial. */
321         return false;
322 }
323
324 static void op_analyze_traverse(const char *filename,
325                                 struct op op[], unsigned int op_num,
326                                 char *words[])
327 {
328         int i;
329         struct traverse *trav = talloc(op, struct traverse);
330
331         op[op_num].key = tdb_null;
332
333         /* = %u means traverse function terminated. */
334         if (words[2]) {
335                 if (!streq(words[2], "=") || !words[3] || words[4])
336                         fail(filename, op_num+1, "expect = <num>");
337                 op[op_num].ret = atoi(words[3]);
338         } else
339                 op[op_num].ret = 0;
340
341         trav->num = 0;
342         trav->end = op_num;
343         for (i = op_num-1; i >= 0; i--) {
344                 if (op[i].op == OP_TDB_TRAVERSE)
345                         trav->num++;
346                 if (op[i].op != OP_TDB_TRAVERSE_READ_START
347                     && op[i].op != OP_TDB_TRAVERSE_START)
348                         continue;
349                 if (op[i].trav)
350                         continue;
351                 break;
352         }
353
354         if (i < 0)
355                 fail(filename, op_num+1, "no traversal start found");
356
357         op[i].trav = trav;
358
359         if (is_trivial_traverse(op+i, op_num-i)) {
360                 /* Fill in a plentiful hash table. */
361                 op[i].trav->hash = talloc_zero_array(op[i].trav,
362                                                      struct traverse_hash,
363                                                      trav->num * 2);
364                 for (; i < op_num; i++) {
365                         unsigned int h;
366                         if (op[i].op != OP_TDB_TRAVERSE)
367                                 continue;
368                         h = hash_key(&op[i].key) % (trav->num * 2);
369                         while (trav->hash[h].index)
370                                 h = (h + 1) % (trav->num * 2);
371                         trav->hash[h].index = i+1;
372                         trav->hash[h].key = op[i].key;
373                 }
374         } else
375                 trav->hash = NULL;
376 }
377
378 /* Keep -Wmissing-declarations happy: */
379 const struct op_table *
380 find_keyword (register const char *str, register unsigned int len);
381
382 #include "keywords.c"
383
384 static int get_len(TDB_DATA key, TDB_DATA data, void *private_data)
385 {
386         return data.dsize;
387 }
388
389 static unsigned run_ops(struct tdb_context *tdb,
390                         int pre_fd,
391                         const char *filename,
392                         struct op op[],
393                         unsigned int start, unsigned int stop);
394
395 struct traverse_info {
396         struct op *op;
397         const char *filename;
398         int pre_fd;
399         unsigned int start;
400         unsigned int i;
401 };
402
403 /* Trivial case: do whatever they did for this key. */
404 static int trivial_traverse(struct tdb_context *tdb,
405                             TDB_DATA key, TDB_DATA data,
406                             void *_tinfo)
407 {
408         struct traverse_info *tinfo = _tinfo;
409         struct traverse *trav = tinfo->op[tinfo->start].trav;
410         unsigned int h = hash_key(&key) % (trav->num * 2);
411
412         while (trav->hash[h].index) {
413                 if (key_eq(trav->hash[h].key, key)) {
414                         run_ops(tdb, tinfo->pre_fd, tinfo->filename, tinfo->op,
415                                 trav->hash[h].index, trav->end);
416                         tinfo->i++;
417                         return 0;
418                 }
419                 h = (h + 1) % (trav->num * 2);
420         }
421         fail(tinfo->filename, tinfo->start + 1, "unexpected traverse key");
422 }
423
424 /* More complex.  Just do whatever's they did at the n'th entry. */
425 static int nontrivial_traverse(struct tdb_context *tdb,
426                                TDB_DATA key, TDB_DATA data,
427                                void *_tinfo)
428 {
429         struct traverse_info *tinfo = _tinfo;
430         struct traverse *trav = tinfo->op[tinfo->start].trav;
431
432         if (tinfo->i == trav->end) {
433                 /* This can happen if traverse expects to be empty. */
434                 if (tinfo->start + 1 == trav->end)
435                         return 1;
436                 fail(tinfo->filename, tinfo->start + 1,
437                      "traverse did not terminate");
438         }
439
440         if (tinfo->op[tinfo->i].op != OP_TDB_TRAVERSE)
441                 fail(tinfo->filename, tinfo->start + 1,
442                      "%s:%u:traverse terminated early");
443
444         /* Run any normal ops. */
445         tinfo->i = run_ops(tdb, tinfo->pre_fd, tinfo->filename, tinfo->op,
446                            tinfo->i+1, trav->end);
447
448         if (tinfo->i == trav->end)
449                 return 1;
450
451         return 0;
452 }
453
454 static unsigned op_traverse(struct tdb_context *tdb,
455                             int pre_fd,
456                             const char *filename,
457                             int (*traversefn)(struct tdb_context *,
458                                               tdb_traverse_func, void *),
459                             struct op op[],
460                             unsigned int start)
461 {
462         struct traverse *trav = op[start].trav;
463         struct traverse_info tinfo = { op, filename, pre_fd, start, start+1 };
464
465         /* Trivial case. */
466         if (trav->hash) {
467                 int ret = traversefn(tdb, trivial_traverse, &tinfo);
468                 if (ret != trav->num)
469                         fail(filename, start+1, "short traversal %i", ret);
470                 return trav->end;
471         }
472
473         traversefn(tdb, nontrivial_traverse, &tinfo);
474
475         /* Traversing in wrong order can have strange effects: eg. if
476          * original traverse went A (delete A), B, we might do B
477          * (delete A).  So if we have ops left over, we do it now. */
478         while (tinfo.i != trav->end) {
479                 if (op[tinfo.i].op == OP_TDB_TRAVERSE)
480                         tinfo.i++;
481                 else
482                         tinfo.i = run_ops(tdb, pre_fd, filename, op,
483                                           tinfo.i, trav->end);
484         }
485         return trav->end;
486 }
487
488 struct depend {
489         /* We can have more than one */
490         struct list_node list;
491         unsigned int file;
492         unsigned int op;
493 };
494
495 static void do_pre(const char *filename, int pre_fd,
496                    struct op op[], unsigned int i)
497 {
498         while (op[i].pre != 0) {
499                 unsigned int opnum;
500
501 #if DEBUG_DEPS
502                 printf("%s:%u:waiting for pre\n", filename, i+1);
503 #endif
504                 if (read(pre_fd, &opnum, sizeof(opnum)) != sizeof(opnum))
505                         errx(1, "Reading from pipe");
506
507 #if DEBUG_DEPS
508                 printf("%s:%u:got pre %u\n",
509                        filename, i+1, opnum);
510 #endif
511                 /* This could be any op, not just this one. */
512                 if (op[opnum].pre == 0)
513                         errx(1, "Got unexpected notification for op line %u",
514                              opnum + 1);
515                 op[opnum].pre--;
516         }
517 }
518
519 static void do_post(const char *filename, const struct op op[], unsigned int i)
520 {
521         struct depend *dep;
522
523         list_for_each(&op[i].post, dep, list) {
524 #if DEBUG_DEPS
525                 printf("%s:%u:sending %u to file %u\n", filename, i+1,
526                        dep->op, dep->file);
527 #endif
528                 if (write(pipes[dep->file].fd[1], &dep->op, sizeof(dep->op))
529                     != sizeof(dep->op))
530                         err(1, "Failed to tell file %u", dep->file);
531         }
532 }
533
534 static __attribute__((noinline))
535 unsigned run_ops(struct tdb_context *tdb,
536                  int pre_fd,
537                  const char *filename,
538                  struct op op[], unsigned int start, unsigned int stop)
539 {
540         unsigned int i;
541
542         for (i = start; i < stop; i++) {
543                 do_pre(filename, pre_fd, op, i);
544
545                 switch (op[i].op) {
546                 case OP_TDB_LOCKALL:
547                         try(tdb_lockall(tdb), op[i].ret);
548                         break;
549                 case OP_TDB_LOCKALL_MARK:
550                         try(tdb_lockall_mark(tdb), op[i].ret);
551                         break;
552                 case OP_TDB_LOCKALL_UNMARK:
553                         try(tdb_lockall_unmark(tdb), op[i].ret);
554                         break;
555                 case OP_TDB_LOCKALL_NONBLOCK:
556                         unreliable(tdb_lockall_nonblock(tdb), op[i].ret,
557                                    tdb_lockall(tdb), tdb_unlockall(tdb));
558                         break;
559                 case OP_TDB_UNLOCKALL:
560                         try(tdb_unlockall(tdb), op[i].ret);
561                         break;
562                 case OP_TDB_LOCKALL_READ:
563                         try(tdb_lockall_read(tdb), op[i].ret);
564                         break;
565                 case OP_TDB_LOCKALL_READ_NONBLOCK:
566                         unreliable(tdb_lockall_read_nonblock(tdb), op[i].ret,
567                                    tdb_lockall_read(tdb),
568                                    tdb_unlockall_read(tdb));
569                         break;
570                 case OP_TDB_UNLOCKALL_READ:
571                         try(tdb_unlockall_read(tdb), op[i].ret);
572                         break;
573                 case OP_TDB_CHAINLOCK:
574                         try(tdb_chainlock(tdb, op[i].key), op[i].ret);
575                         break;
576                 case OP_TDB_CHAINLOCK_NONBLOCK:
577                         unreliable(tdb_chainlock_nonblock(tdb, op[i].key),
578                                    op[i].ret,
579                                    tdb_chainlock(tdb, op[i].key),
580                                    tdb_chainunlock(tdb, op[i].key));
581                         break;
582                 case OP_TDB_CHAINLOCK_MARK:
583                         try(tdb_chainlock_mark(tdb, op[i].key), op[i].ret);
584                         break;
585                 case OP_TDB_CHAINLOCK_UNMARK:
586                         try(tdb_chainlock_unmark(tdb, op[i].key), op[i].ret);
587                         break;
588                 case OP_TDB_CHAINUNLOCK:
589                         try(tdb_chainunlock(tdb, op[i].key), op[i].ret);
590                         break;
591                 case OP_TDB_CHAINLOCK_READ:
592                         try(tdb_chainlock_read(tdb, op[i].key), op[i].ret);
593                         break;
594                 case OP_TDB_CHAINUNLOCK_READ:
595                         try(tdb_chainunlock_read(tdb, op[i].key), op[i].ret);
596                         break;
597                 case OP_TDB_PARSE_RECORD:
598                         try(tdb_parse_record(tdb, op[i].key, get_len, NULL),
599                             op[i].ret);
600                         break;
601                 case OP_TDB_EXISTS:
602                         try(tdb_exists(tdb, op[i].key), op[i].ret);
603                         break;
604                 case OP_TDB_STORE:
605                         try(tdb_store(tdb, op[i].key, op[i].data, op[i].flag),
606                             op[i].ret < 0 ? op[i].ret : 0);
607                         break;
608                 case OP_TDB_APPEND:
609                         try(tdb_append(tdb, op[i].key, op[i].data),
610                             op[i].ret < 0 ? op[i].ret : 0);
611                         break;
612                 case OP_TDB_GET_SEQNUM:
613                         try(tdb_get_seqnum(tdb), op[i].ret);
614                         break;
615                 case OP_TDB_WIPE_ALL:
616                         try(tdb_wipe_all(tdb), op[i].ret);
617                         break;
618                 case OP_TDB_TRANSACTION_START:
619                         try(tdb_transaction_start(tdb), op[i].ret);
620                         break;
621                 case OP_TDB_TRANSACTION_CANCEL:
622                         try(tdb_transaction_cancel(tdb), op[i].ret);
623                         break;
624                 case OP_TDB_TRANSACTION_COMMIT:
625                         try(tdb_transaction_commit(tdb), op[i].ret);
626                         break;
627                 case OP_TDB_TRAVERSE_READ_START:
628                         i = op_traverse(tdb, pre_fd, filename,
629                                         tdb_traverse_read, op, i);
630                         break;
631                 case OP_TDB_TRAVERSE_START:
632                         i = op_traverse(tdb, pre_fd, filename,
633                                         tdb_traverse, op, i);
634                         break;
635                 case OP_TDB_TRAVERSE:
636                         /* Terminate: we're in a traverse, and we've
637                          * done our ops. */
638                         return i;
639                 case OP_TDB_TRAVERSE_END:
640                         fail(filename, i+1, "unepxected end traverse");
641                 /* FIXME: These must be treated like traverse. */
642                 case OP_TDB_FIRSTKEY:
643                         if (!key_eq(tdb_firstkey(tdb), op[i].data))
644                                 fail(filename, i+1, "bad firstkey");
645                         break;
646                 case OP_TDB_NEXTKEY:
647                         if (!key_eq(tdb_nextkey(tdb, op[i].key), op[i].data))
648                                 fail(filename, i+1, "bad nextkey");
649                         break;
650                 case OP_TDB_FETCH: {
651                         TDB_DATA f = tdb_fetch(tdb, op[i].key);
652                         if (!key_eq(f, op[i].data))
653                                 fail(filename, i+1, "bad fetch %u", f.dsize);
654                         break;
655                 }
656                 case OP_TDB_DELETE:
657                         try(tdb_delete(tdb, op[i].key), op[i].ret);
658                         break;
659                 }
660                 do_post(filename, op, i);
661         }
662         return i;
663 }
664
665 static struct op *load_tracefile(const char *filename, unsigned int *num,
666                                  unsigned int *hashsize,
667                                  unsigned int *tdb_flags,
668                                  unsigned int *open_flags)
669 {
670         unsigned int i;
671         struct op *op = talloc_array(NULL, struct op, 1);
672         char **words;
673         char **lines;
674         char *file;
675
676         file = grab_file(NULL, filename, NULL);
677         if (!file)
678                 err(1, "Reading %s", filename);
679
680         lines = strsplit(file, file, "\n", NULL);
681         if (!lines[0])
682                 errx(1, "%s is empty", filename);
683
684         words = strsplit(lines, lines[0], " ", NULL);
685         if (!streq(words[1], "tdb_open"))
686                 fail(filename, 1, "does not start with tdb_open");
687
688         *hashsize = atoi(words[2]);
689         *tdb_flags = strtoul(words[3], NULL, 0);
690         *open_flags = strtoul(words[4], NULL, 0);
691
692         for (i = 1; lines[i]; i++) {
693                 const struct op_table *opt;
694
695                 words = strsplit(lines, lines[i], " ", NULL);
696                 if (!words[0] || !words[1])
697                         fail(filename, i+1, "Expected serial number and op");
698                
699                 opt = find_keyword(words[1], strlen(words[1]));
700                 if (!opt) {
701                         if (streq(words[1], "tdb_close")) {
702                                 if (lines[i+1])
703                                         fail(filename, i+2,
704                                              "lines after tdb_close");
705                                 *num = i;
706                                 talloc_free(lines);
707                                 return op;
708                         }
709                         fail(filename, i+1, "Unknown operation '%s'", words[1]);
710                 }
711
712                 add_op(filename, &op, i, atoi(words[0]), opt->type);
713                 opt->enhance_op(filename, op, i, words);
714         }
715
716         fprintf(stderr, "%s:%u:last operation is not tdb_close: incomplete?",
717               filename, i);
718         talloc_free(lines);
719         *num = i - 1;
720         return op;
721 }
722
723 /* We remember all the keys we've ever seen, and who has them. */
724 struct key_user {
725         unsigned int file;
726         unsigned int op_num;
727 };
728
729 struct keyinfo {
730         TDB_DATA key;
731         unsigned int num_users;
732         struct key_user *user;
733 };
734
735 static bool changes_db(const struct op *op)
736 {
737         if (op->ret != 0)
738                 return false;
739
740         return op->op == OP_TDB_STORE
741                 || op->op == OP_TDB_APPEND
742                 || op->op == OP_TDB_WIPE_ALL
743                 || op->op == OP_TDB_TRANSACTION_COMMIT
744                 || op->op == OP_TDB_DELETE;
745 }
746
747 static struct keyinfo *hash_ops(struct op *op[], unsigned int num_ops[],
748                                 unsigned int num)
749 {
750         unsigned int i, j, h;
751         struct keyinfo *hash;
752
753         /* Gcc nexted function extension.  How cool is this? */
754         int compare_user_serial(const void *_a, const void *_b)
755         {
756                 const struct key_user *a = _a, *b = _b;
757                 int ret = op[a->file][a->op_num].serial
758                         - op[b->file][b->op_num].serial;
759
760                 /* Fetches don't inc serial, so we put changes first. */
761                 if (ret == 0) {
762                         if (changes_db(&op[a->file][a->op_num])
763                             && !changes_db(&op[b->file][b->op_num]))
764                                 return -1;
765                         if (changes_db(&op[b->file][b->op_num])
766                             && !changes_db(&op[a->file][a->op_num]))
767                                 return 1;
768                 }
769                 return ret;
770         }
771
772         hash = talloc_zero_array(op[0], struct keyinfo, total_keys*2);
773         for (i = 0; i < num; i++) {
774                 for (j = 1; j < num_ops[i]; j++) {
775                         /* We can't do this on allocation, due to realloc. */
776                         list_head_init(&op[i][j].post);
777
778                         if (!op[i][j].key.dptr)
779                                 continue;
780
781                         /* We don't wait for traverse keys */
782                         /* FIXME: We should, for trivial traversals. */
783                         if (op[i][j].op == OP_TDB_TRAVERSE)
784                                 continue;
785
786                         h = hash_key(&op[i][j].key) % (total_keys * 2);
787                         while (!key_eq(hash[h].key, op[i][j].key)) {
788                                 if (!hash[h].key.dptr) {
789                                         hash[h].key = op[i][j].key;
790                                         break;
791                                 }
792                                 h = (h + 1) % (total_keys * 2);
793                         }
794                         /* Might as well save some memory if we can. */
795                         if (op[i][j].key.dptr != hash[h].key.dptr) {
796                                 talloc_free(op[i][j].key.dptr);
797                                 op[i][j].key.dptr = hash[h].key.dptr;
798                         }
799                         hash[h].user = talloc_realloc(hash, hash[h].user,
800                                                      struct key_user,
801                                                      hash[h].num_users+1);
802                         hash[h].user[hash[h].num_users].op_num = j;
803                         hash[h].user[hash[h].num_users].file = i;
804                         hash[h].num_users++;
805                 }
806         }
807
808         /* Now sort into seqnum order. */
809         for (h = 0; h < total_keys * 2; h++)
810                 qsort(hash[h].user, hash[h].num_users, sizeof(hash[h].user[0]),
811                       compare_user_serial);
812
813         return hash;
814 }
815
816 static void add_dependency(void *ctx,
817                            struct op *op[],
818                            unsigned int needs_file,
819                            unsigned int needs_opnum,
820                            unsigned int satisfies_file,
821                            unsigned int satisfies_opnum)
822 {
823         struct depend *post;
824
825         post = talloc(ctx, struct depend);
826         post->file = needs_file;
827         post->op = needs_opnum;
828         list_add(&op[satisfies_file][satisfies_opnum].post, &post->list);
829
830         op[needs_file][needs_opnum].pre++;
831 }
832
833 static void derive_dependencies(char *filename[],
834                                 struct op *op[], unsigned int num_ops[],
835                                 unsigned int num)
836 {
837         struct keyinfo *hash;
838         unsigned int i;
839
840         /* Create hash table for faster key lookup. */
841         hash = hash_ops(op, num_ops, num);
842
843         /* We make the naive assumption that two ops on the same key
844          * have to be ordered; it's overkill. */
845         for (i = 0; i < total_keys * 2; i++) {
846                 unsigned int j;
847
848                 for (j = 1; j < hash[i].num_users; j++) {
849                         /* We don't depend on ourselves. */
850                         if (hash[i].user[j].file == hash[i].user[j-1].file)
851                                 continue;
852 #if DEBUG_DEPS
853                         printf("%s:%u: depends on %s:%u\n",
854                                filename[hash[i].user[j].file],
855                                hash[i].user[j].op_num+1,
856                                filename[hash[i].user[j-1].file],
857                                hash[i].user[j-1].op_num+1);
858 #endif
859                         add_dependency(hash, op,
860                                        hash[i].user[j].file,
861                                        hash[i].user[j].op_num,
862                                        hash[i].user[j-1].file,
863                                        hash[i].user[j-1].op_num);
864                 }
865         }
866 }
867
868 int main(int argc, char *argv[])
869 {
870         struct timeval start, end;
871         unsigned int i, num_ops[argc], hashsize[argc], tdb_flags[argc], open_flags[argc];
872         struct op *op[argc];
873         int fds[2];
874         char c;
875
876         if (argc < 3)
877                 errx(1, "Usage: %s <tdbfile> <tracefile>...", argv[0]);
878
879         pipes = talloc_array(NULL, struct pipe, argc - 2);
880         for (i = 0; i < argc - 2; i++) {
881                 op[i] = load_tracefile(argv[2+i], &num_ops[i], &hashsize[i],
882                                        &tdb_flags[i], &open_flags[i]);
883                 if (pipe(pipes[i].fd) != 0)
884                         err(1, "creating pipe");
885         }
886
887         derive_dependencies(argv+2, op, num_ops, i);
888
889         /* Don't fork for single arg case: simple debugging. */
890         if (argc == 3) {
891                 struct tdb_context *tdb;
892                 tdb = tdb_open_ex(argv[1], hashsize[0], tdb_flags[0],
893                                   open_flags[0], 0600,
894                                   NULL, hash_key);
895                 run_ops(tdb, pipes[0].fd[0], argv[2],
896                         op[0], 1, num_ops[0]);
897                 exit(0);
898         }
899
900         if (pipe(fds) != 0)
901                 err(1, "creating pipe");
902
903         for (i = 0; i < argc - 2; i++) {
904                 struct tdb_context *tdb;
905
906                 switch (fork()) {
907                 case -1:
908                         err(1, "fork failed");
909                 case 0:
910                         close(fds[1]);
911                         tdb = tdb_open_ex(argv[1], hashsize[i], tdb_flags[i],
912                                           open_flags[i], 0600,
913                                           NULL, hash_key);
914                         if (!tdb)
915                                 err(1, "Opening tdb %s", argv[1]);
916
917                         /* This catches parent exiting. */
918                         if (read(fds[0], &c, 1) != 1)
919                                 exit(1);
920                         run_ops(tdb, pipes[i].fd[0], argv[2+i],
921                                 op[i], 1, num_ops[i]);
922                         exit(0);
923                 default:
924                         break;
925                 }
926         }
927
928         /* Let everything settle. */
929         sleep(1);
930
931         gettimeofday(&start, NULL);
932         /* Tell them all to go!  Any write of sufficient length will do. */
933         if (write(fds[1], hashsize, i) != i)
934                 err(1, "Writing to wakeup pipe");
935
936         for (i = 0; i < argc - 2; i++) {
937                 int status;
938                 wait(&status);
939                 if (!WIFEXITED(status))
940                         errx(1, "Child died with signal %i", WTERMSIG(status));
941                 if (WEXITSTATUS(status) != 0)
942                         errx(1, "Child died with error code");
943         }
944         gettimeofday(&end, NULL);
945
946         end.tv_sec -= start.tv_sec;
947         printf("Time replaying: %lu usec\n",
948                end.tv_sec * 1000000UL + (end.tv_usec - start.tv_usec));
949         
950         exit(0);
951 }