tdb2: tdb_repack
[ccan] / ccan / tdb2 / tdb1_transaction.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 3 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, see <http://www.gnu.org/licenses/>.
24 */
25
26 #include "tdb1_private.h"
27
28 /*
29   transaction design:
30
31   - only allow a single transaction at a time per database. This makes
32     using the transaction API simpler, as otherwise the caller would
33     have to cope with temporary failures in transactions that conflict
34     with other current transactions
35
36   - keep the transaction recovery information in the same file as the
37     database, using a special 'transaction recovery' record pointed at
38     by the header. This removes the need for extra journal files as
39     used by some other databases
40
  - dynamically allocate the transaction recovery record, re-using it
    for subsequent transactions. If a larger record is needed then
    tdb1_free() the old record to place it on the normal tdb freelist
    before allocating the new record
45
  - during transactions, keep a linked list of all writes that have
    been performed by intercepting all tdb1_write() calls. The hooked
    transaction versions of tdb1_read() and tdb1_write() check this
    linked list and try to use the elements of the list in preference
    to the real database.
51
52   - don't allow any locks to be held when a transaction starts,
53     otherwise we can end up with deadlock (plus lack of lock nesting
54     in posix locks would mean the lock is lost)
55
56   - if the caller gains a lock during the transaction but doesn't
57     release it then fail the commit
58
59   - allow for nested calls to tdb1_transaction_start(), re-using the
60     existing transaction record. If the inner transaction is cancelled
61     then a subsequent commit will fail
62
63   - keep a mirrored copy of the tdb hash chain heads to allow for the
64     fast hash heads scan on traverse, updating the mirrored copy in
65     the transaction version of tdb1_write
66
67   - allow callers to mix transaction and non-transaction use of tdb,
68     although once a transaction is started then an exclusive lock is
69     gained until the transaction is committed or cancelled
70
  - the commit strategy involves first saving away all modified data
    into a linearised buffer in the transaction recovery area, then
    marking the transaction recovery area with a magic value to
    indicate a valid recovery record. In total 4 fsync/msync calls are
    needed per commit to prevent race conditions. It might be possible
    to reduce this to 3 or even 2 with some more work.
77
78   - check for a valid recovery record on open of the tdb, while the
79     open lock is held. Automatically recover from the transaction
80     recovery area if needed, then continue with the open as
81     usual. This allows for smooth crash recovery with no administrator
82     intervention.
83
84   - if TDB_NOSYNC is passed to flags in tdb1_open then transactions are
85     still available, but no transaction recovery area is used and no
86     fsync/msync calls are made.
87
88   - if TDB_ALLOW_NESTING is passed to flags in tdb open, or added using
89     tdb1_add_flags() transaction nesting is enabled.
90     The default is that transaction nesting is NOT allowed.
91
92     Beware. when transactions are nested a transaction successfully
93     completed with tdb1_transaction_commit() can be silently unrolled later.
94 */
95
96
97 /*
98   hold the context of any current transaction
99 */
100 struct tdb1_transaction {
101         /* we keep a mirrored copy of the tdb hash heads here so
102            tdb1_next_hash_chain() can operate efficiently */
103         uint32_t *hash_heads;
104
105         /* the original io methods - used to do IOs to the real db */
106         const struct tdb1_methods *io_methods;
107
108         /* the list of transaction blocks. When a block is first
109            written to, it gets created in this list */
110         uint8_t **blocks;
111         uint32_t num_blocks;
112         uint32_t block_size;      /* bytes in each block */
113         uint32_t last_block_size; /* number of valid bytes in the last block */
114
115         /* non-zero when an internal transaction error has
116            occurred. All write operations will then fail until the
117            transaction is ended */
118         int transaction_error;
119
120         /* when inside a transaction we need to keep track of any
121            nested tdb1_transaction_start() calls, as these are allowed,
122            but don't create a new transaction */
123         int nesting;
124
125         /* set when a prepare has already occurred */
126         bool prepared;
127         tdb1_off_t magic_offset;
128
129         /* old file size before transaction */
130         tdb1_len_t old_map_size;
131
132         /* did we expand in this transaction */
133         bool expanded;
134 };
135
136
137 /*
138   read while in a transaction. We need to check first if the data is in our list
139   of transaction elements, then if not do a real read
140 */
141 static int transaction1_read(struct tdb_context *tdb, tdb1_off_t off, void *buf,
142                              tdb1_len_t len, int cv)
143 {
144         uint32_t blk;
145
146         /* break it down into block sized ops */
147         while (len + (off % tdb->tdb1.transaction->block_size) > tdb->tdb1.transaction->block_size) {
148                 tdb1_len_t len2 = tdb->tdb1.transaction->block_size - (off % tdb->tdb1.transaction->block_size);
149                 if (transaction1_read(tdb, off, buf, len2, cv) != 0) {
150                         return -1;
151                 }
152                 len -= len2;
153                 off += len2;
154                 buf = (void *)(len2 + (char *)buf);
155         }
156
157         if (len == 0) {
158                 return 0;
159         }
160
161         blk = off / tdb->tdb1.transaction->block_size;
162
163         /* see if we have it in the block list */
164         if (tdb->tdb1.transaction->num_blocks <= blk ||
165             tdb->tdb1.transaction->blocks[blk] == NULL) {
166                 /* nope, do a real read */
167                 if (tdb->tdb1.transaction->io_methods->tdb1_read(tdb, off, buf, len, cv) != 0) {
168                         goto fail;
169                 }
170                 return 0;
171         }
172
173         /* it is in the block list. Now check for the last block */
174         if (blk == tdb->tdb1.transaction->num_blocks-1) {
175                 if (len > tdb->tdb1.transaction->last_block_size) {
176                         goto fail;
177                 }
178         }
179
180         /* now copy it out of this block */
181         memcpy(buf, tdb->tdb1.transaction->blocks[blk] + (off % tdb->tdb1.transaction->block_size), len);
182         if (cv) {
183                 tdb1_convert(buf, len);
184         }
185         return 0;
186
187 fail:
188         tdb->last_error = tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
189                                 "transaction_read: failed at off=%d len=%d",
190                                 off, len);
191         tdb->tdb1.transaction->transaction_error = 1;
192         return -1;
193 }
194
195
196 /*
197   write while in a transaction
198 */
199 static int transaction1_write(struct tdb_context *tdb, tdb1_off_t off,
200                              const void *buf, tdb1_len_t len)
201 {
202         uint32_t blk;
203
204         /* Only a commit is allowed on a prepared transaction */
205         if (tdb->tdb1.transaction->prepared) {
206                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
207                                         "transaction_write: transaction already"
208                                         " prepared, write not allowed");
209                 tdb->tdb1.transaction->transaction_error = 1;
210                 return -1;
211         }
212
213         /* if the write is to a hash head, then update the transaction
214            hash heads */
215         if (len == sizeof(tdb1_off_t) && off >= TDB1_FREELIST_TOP &&
216             off < TDB1_FREELIST_TOP+TDB1_HASHTABLE_SIZE(tdb)) {
217                 uint32_t chain = (off-TDB1_FREELIST_TOP) / sizeof(tdb1_off_t);
218                 memcpy(&tdb->tdb1.transaction->hash_heads[chain], buf, len);
219         }
220
221         /* break it up into block sized chunks */
222         while (len + (off % tdb->tdb1.transaction->block_size) > tdb->tdb1.transaction->block_size) {
223                 tdb1_len_t len2 = tdb->tdb1.transaction->block_size - (off % tdb->tdb1.transaction->block_size);
224                 if (transaction1_write(tdb, off, buf, len2) != 0) {
225                         return -1;
226                 }
227                 len -= len2;
228                 off += len2;
229                 if (buf != NULL) {
230                         buf = (const void *)(len2 + (const char *)buf);
231                 }
232         }
233
234         if (len == 0) {
235                 return 0;
236         }
237
238         blk = off / tdb->tdb1.transaction->block_size;
239         off = off % tdb->tdb1.transaction->block_size;
240
241         if (tdb->tdb1.transaction->num_blocks <= blk) {
242                 uint8_t **new_blocks;
243                 /* expand the blocks array */
244                 if (tdb->tdb1.transaction->blocks == NULL) {
245                         new_blocks = (uint8_t **)malloc(
246                                 (blk+1)*sizeof(uint8_t *));
247                 } else {
248                         new_blocks = (uint8_t **)realloc(
249                                 tdb->tdb1.transaction->blocks,
250                                 (blk+1)*sizeof(uint8_t *));
251                 }
252                 if (new_blocks == NULL) {
253                         tdb->last_error = TDB_ERR_OOM;
254                         goto fail;
255                 }
256                 memset(&new_blocks[tdb->tdb1.transaction->num_blocks], 0,
257                        (1+(blk - tdb->tdb1.transaction->num_blocks))*sizeof(uint8_t *));
258                 tdb->tdb1.transaction->blocks = new_blocks;
259                 tdb->tdb1.transaction->num_blocks = blk+1;
260                 tdb->tdb1.transaction->last_block_size = 0;
261         }
262
263         /* allocate and fill a block? */
264         if (tdb->tdb1.transaction->blocks[blk] == NULL) {
265                 tdb->tdb1.transaction->blocks[blk] = (uint8_t *)calloc(tdb->tdb1.transaction->block_size, 1);
266                 if (tdb->tdb1.transaction->blocks[blk] == NULL) {
267                         tdb->last_error = TDB_ERR_OOM;
268                         tdb->tdb1.transaction->transaction_error = 1;
269                         return -1;
270                 }
271                 if (tdb->tdb1.transaction->old_map_size > blk * tdb->tdb1.transaction->block_size) {
272                         tdb1_len_t len2 = tdb->tdb1.transaction->block_size;
273                         if (len2 + (blk * tdb->tdb1.transaction->block_size) > tdb->tdb1.transaction->old_map_size) {
274                                 len2 = tdb->tdb1.transaction->old_map_size - (blk * tdb->tdb1.transaction->block_size);
275                         }
276                         if (tdb->tdb1.transaction->io_methods->tdb1_read(tdb, blk * tdb->tdb1.transaction->block_size,
277                                                                    tdb->tdb1.transaction->blocks[blk],
278                                                                    len2, 0) != 0) {
279                                 SAFE_FREE(tdb->tdb1.transaction->blocks[blk]);
280                                 tdb->last_error = TDB_ERR_IO;
281                                 goto fail;
282                         }
283                         if (blk == tdb->tdb1.transaction->num_blocks-1) {
284                                 tdb->tdb1.transaction->last_block_size = len2;
285                         }
286                 }
287         }
288
289         /* overwrite part of an existing block */
290         if (buf == NULL) {
291                 memset(tdb->tdb1.transaction->blocks[blk] + off, 0, len);
292         } else {
293                 memcpy(tdb->tdb1.transaction->blocks[blk] + off, buf, len);
294         }
295         if (blk == tdb->tdb1.transaction->num_blocks-1) {
296                 if (len + off > tdb->tdb1.transaction->last_block_size) {
297                         tdb->tdb1.transaction->last_block_size = len + off;
298                 }
299         }
300
301         return 0;
302
303 fail:
304         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
305                    "transaction_write: failed at off=%d len=%d",
306                    (blk*tdb->tdb1.transaction->block_size) + off, len);
307         tdb->tdb1.transaction->transaction_error = 1;
308         return -1;
309 }
310
311
312 /*
313   write while in a transaction - this varient never expands the transaction blocks, it only
314   updates existing blocks. This means it cannot change the recovery size
315 */
316 static int transaction1_write_existing(struct tdb_context *tdb, tdb1_off_t off,
317                                       const void *buf, tdb1_len_t len)
318 {
319         uint32_t blk;
320
321         /* break it up into block sized chunks */
322         while (len + (off % tdb->tdb1.transaction->block_size) > tdb->tdb1.transaction->block_size) {
323                 tdb1_len_t len2 = tdb->tdb1.transaction->block_size - (off % tdb->tdb1.transaction->block_size);
324                 if (transaction1_write_existing(tdb, off, buf, len2) != 0) {
325                         return -1;
326                 }
327                 len -= len2;
328                 off += len2;
329                 if (buf != NULL) {
330                         buf = (const void *)(len2 + (const char *)buf);
331                 }
332         }
333
334         if (len == 0) {
335                 return 0;
336         }
337
338         blk = off / tdb->tdb1.transaction->block_size;
339         off = off % tdb->tdb1.transaction->block_size;
340
341         if (tdb->tdb1.transaction->num_blocks <= blk ||
342             tdb->tdb1.transaction->blocks[blk] == NULL) {
343                 return 0;
344         }
345
346         if (blk == tdb->tdb1.transaction->num_blocks-1 &&
347             off + len > tdb->tdb1.transaction->last_block_size) {
348                 if (off >= tdb->tdb1.transaction->last_block_size) {
349                         return 0;
350                 }
351                 len = tdb->tdb1.transaction->last_block_size - off;
352         }
353
354         /* overwrite part of an existing block */
355         memcpy(tdb->tdb1.transaction->blocks[blk] + off, buf, len);
356
357         return 0;
358 }
359
360
361 /*
362   accelerated hash chain head search, using the cached hash heads
363 */
364 static void transaction1_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
365 {
366         uint32_t h = *chain;
367         for (;h < tdb->tdb1.header.hash_size;h++) {
368                 /* the +1 takes account of the freelist */
369                 if (0 != tdb->tdb1.transaction->hash_heads[h+1]) {
370                         break;
371                 }
372         }
373         (*chain) = h;
374 }
375
376 /*
377   out of bounds check during a transaction
378 */
379 static int transaction1_oob(struct tdb_context *tdb, tdb1_off_t len, int probe)
380 {
381         if (len <= tdb->file->map_size) {
382                 return 0;
383         }
384         tdb->last_error = TDB_ERR_IO;
385         return -1;
386 }
387
388 /*
389   transaction version of tdb1_expand().
390 */
391 static int transaction1_expand_file(struct tdb_context *tdb, tdb1_off_t size,
392                                     tdb1_off_t addition)
393 {
394         /* add a write to the transaction elements, so subsequent
395            reads see the zero data */
396         if (transaction1_write(tdb, size, NULL, addition) != 0) {
397                 return -1;
398         }
399
400         tdb->tdb1.transaction->expanded = true;
401
402         return 0;
403 }
404
405 static const struct tdb1_methods transaction1_methods = {
406         transaction1_read,
407         transaction1_write,
408         transaction1_next_hash_chain,
409         transaction1_oob,
410         transaction1_expand_file,
411 };
412
413
414 /*
415   start a tdb transaction. No token is returned, as only a single
416   transaction is allowed to be pending per tdb_context
417 */
418 static int _tdb1_transaction_start(struct tdb_context *tdb)
419 {
420         /* some sanity checks */
421         if ((tdb->flags & TDB_RDONLY) || (tdb->flags & TDB_INTERNAL) || tdb->tdb1.traverse_read) {
422                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
423                                         "tdb1_transaction_start: cannot start a"
424                                         " transaction on a read-only or"
425                                         " internal db");
426                 return -1;
427         }
428
429         /* cope with nested tdb1_transaction_start() calls */
430         if (tdb->tdb1.transaction != NULL) {
431                 if (!(tdb->flags & TDB_ALLOW_NESTING)) {
432                         tdb->last_error = TDB_ERR_EINVAL;
433                         return -1;
434                 }
435                 tdb->tdb1.transaction->nesting++;
436                 return 0;
437         }
438
439         if (tdb1_have_extra_locks(tdb)) {
440                 /* the caller must not have any locks when starting a
441                    transaction as otherwise we'll be screwed by lack
442                    of nested locks in posix */
443                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
444                                         "tdb1_transaction_start: cannot start a"
445                                         " transaction with locks held");
446                 return -1;
447         }
448
449         if (tdb->tdb1.travlocks.next != NULL) {
450                 /* you cannot use transactions inside a traverse (although you can use
451                    traverse inside a transaction) as otherwise you can end up with
452                    deadlock */
453                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
454                                         "tdb1_transaction_start: cannot start a"
455                                         " transaction within a traverse");
456                 return -1;
457         }
458
459         tdb->tdb1.transaction = (struct tdb1_transaction *)
460                 calloc(sizeof(struct tdb1_transaction), 1);
461         if (tdb->tdb1.transaction == NULL) {
462                 tdb->last_error = TDB_ERR_OOM;
463                 return -1;
464         }
465
466         /* a page at a time seems like a reasonable compromise between compactness and efficiency */
467         tdb->tdb1.transaction->block_size = tdb->tdb1.page_size;
468
469         /* get the transaction write lock. This is a blocking lock. As
470            discussed with Volker, there are a number of ways we could
471            make this async, which we will probably do in the future */
472         if (tdb1_transaction_lock(tdb, F_WRLCK, TDB_LOCK_WAIT) == -1) {
473                 SAFE_FREE(tdb->tdb1.transaction->blocks);
474                 SAFE_FREE(tdb->tdb1.transaction);
475                 return -1;
476         }
477
478         /* get a read lock from the freelist to the end of file. This
479            is upgraded to a write lock during the commit */
480         if (tdb1_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, true) == -1) {
481                 if (errno != EAGAIN && errno != EINTR) {
482                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
483                                    "tdb1_transaction_start:"
484                                    " failed to get hash locks");
485                 }
486                 goto fail_allrecord_lock;
487         }
488
489         /* setup a copy of the hash table heads so the hash scan in
490            traverse can be fast */
491         tdb->tdb1.transaction->hash_heads = (uint32_t *)
492                 calloc(tdb->tdb1.header.hash_size+1, sizeof(uint32_t));
493         if (tdb->tdb1.transaction->hash_heads == NULL) {
494                 tdb->last_error = TDB_ERR_OOM;
495                 goto fail;
496         }
497         if (tdb->tdb1.io->tdb1_read(tdb, TDB1_FREELIST_TOP, tdb->tdb1.transaction->hash_heads,
498                                    TDB1_HASHTABLE_SIZE(tdb), 0) != 0) {
499                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
500                            "tdb1_transaction_start: failed to read hash heads");
501                 goto fail;
502         }
503
504         /* make sure we know about any file expansions already done by
505            anyone else */
506         tdb->tdb1.io->tdb1_oob(tdb, tdb->file->map_size + 1, 1);
507         tdb->tdb1.transaction->old_map_size = tdb->file->map_size;
508
509         /* finally hook the io methods, replacing them with
510            transaction specific methods */
511         tdb->tdb1.transaction->io_methods = tdb->tdb1.io;
512         tdb->tdb1.io = &transaction1_methods;
513
514         return 0;
515
516 fail:
517         tdb1_allrecord_unlock(tdb, F_RDLCK);
518 fail_allrecord_lock:
519         tdb1_transaction_unlock(tdb, F_WRLCK);
520         SAFE_FREE(tdb->tdb1.transaction->blocks);
521         SAFE_FREE(tdb->tdb1.transaction->hash_heads);
522         SAFE_FREE(tdb->tdb1.transaction);
523         return -1;
524 }
525
/*
  Public entry point for starting a transaction; forwards to
  _tdb1_transaction_start() and returns its result unchanged.
*/
int tdb1_transaction_start(struct tdb_context *tdb)
{
	int rc = _tdb1_transaction_start(tdb);

	return rc;
}
530
531 /*
532   sync to disk
533 */
534 static int transaction1_sync(struct tdb_context *tdb, tdb1_off_t offset, tdb1_len_t length)
535 {
536         if (tdb->flags & TDB_NOSYNC) {
537                 return 0;
538         }
539
540 #if HAVE_FDATASYNC
541         if (fdatasync(tdb->file->fd) != 0) {
542 #else
543         if (fsync(tdb->file->fd) != 0) {
544 #endif
545                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
546                                         "tdb1_transaction: fsync failed");
547                 return -1;
548         }
549 #if HAVE_MMAP
550         if (tdb->file->map_ptr) {
551                 tdb1_off_t moffset = offset & ~(tdb->tdb1.page_size-1);
552                 if (msync(moffset + (char *)tdb->file->map_ptr,
553                           length + (offset - moffset), MS_SYNC) != 0) {
554                         tdb->last_error = tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
555                                                 "tdb1_transaction:"
556                                                 " msync failed - %s",
557                                                 strerror(errno));
558                         return -1;
559                 }
560         }
561 #endif
562         return 0;
563 }
564
565
566 static int _tdb1_transaction_cancel(struct tdb_context *tdb)
567 {
568         int i, ret = 0;
569
570         if (tdb->tdb1.transaction == NULL) {
571                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
572                                         "tdb1_transaction_cancel:"
573                                         " no transaction");
574                 return -1;
575         }
576
577         if (tdb->tdb1.transaction->nesting != 0) {
578                 tdb->tdb1.transaction->transaction_error = 1;
579                 tdb->tdb1.transaction->nesting--;
580                 return 0;
581         }
582
583         tdb->file->map_size = tdb->tdb1.transaction->old_map_size;
584
585         /* free all the transaction blocks */
586         for (i=0;i<tdb->tdb1.transaction->num_blocks;i++) {
587                 if (tdb->tdb1.transaction->blocks[i] != NULL) {
588                         free(tdb->tdb1.transaction->blocks[i]);
589                 }
590         }
591         SAFE_FREE(tdb->tdb1.transaction->blocks);
592
593         if (tdb->tdb1.transaction->magic_offset) {
594                 const struct tdb1_methods *methods = tdb->tdb1.transaction->io_methods;
595                 const uint32_t invalid = TDB1_RECOVERY_INVALID_MAGIC;
596
597                 /* remove the recovery marker */
598                 if (methods->tdb1_write(tdb, tdb->tdb1.transaction->magic_offset, &invalid, 4) == -1 ||
599                 transaction1_sync(tdb, tdb->tdb1.transaction->magic_offset, 4) == -1) {
600                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
601                                    "tdb1_transaction_cancel: failed to"
602                                    " remove recovery magic");
603                         ret = -1;
604                 }
605         }
606
607         /* This also removes the OPEN_LOCK, if we have it. */
608         tdb1_release_transaction_locks(tdb);
609
610         /* restore the normal io methods */
611         tdb->tdb1.io = tdb->tdb1.transaction->io_methods;
612
613         SAFE_FREE(tdb->tdb1.transaction->hash_heads);
614         SAFE_FREE(tdb->tdb1.transaction);
615
616         return ret;
617 }
618
/*
  Cancel the current transaction. Public entry point; forwards to
  _tdb1_transaction_cancel() and returns its result unchanged.
*/
int tdb1_transaction_cancel(struct tdb_context *tdb)
{
	int rc = _tdb1_transaction_cancel(tdb);

	return rc;
}
626
627 /*
628   work out how much space the linearised recovery data will consume
629 */
630 static tdb1_len_t tdb1_recovery_size(struct tdb_context *tdb)
631 {
632         tdb1_len_t recovery_size = 0;
633         int i;
634
635         recovery_size = sizeof(uint32_t);
636         for (i=0;i<tdb->tdb1.transaction->num_blocks;i++) {
637                 if (i * tdb->tdb1.transaction->block_size >= tdb->tdb1.transaction->old_map_size) {
638                         break;
639                 }
640                 if (tdb->tdb1.transaction->blocks[i] == NULL) {
641                         continue;
642                 }
643                 recovery_size += 2*sizeof(tdb1_off_t);
644                 if (i == tdb->tdb1.transaction->num_blocks-1) {
645                         recovery_size += tdb->tdb1.transaction->last_block_size;
646                 } else {
647                         recovery_size += tdb->tdb1.transaction->block_size;
648                 }
649         }
650
651         return recovery_size;
652 }
653
654 int tdb1_recovery_area(struct tdb_context *tdb,
655                       const struct tdb1_methods *methods,
656                       tdb1_off_t *recovery_offset,
657                       struct tdb1_record *rec)
658 {
659         if (tdb1_ofs_read(tdb, TDB1_RECOVERY_HEAD, recovery_offset) == -1) {
660                 return -1;
661         }
662
663         if (*recovery_offset == 0) {
664                 rec->rec_len = 0;
665                 return 0;
666         }
667
668         if (methods->tdb1_read(tdb, *recovery_offset, rec, sizeof(*rec),
669                               TDB1_DOCONV()) == -1) {
670                 return -1;
671         }
672
673         /* ignore invalid recovery regions: can happen in crash */
674         if (rec->magic != TDB1_RECOVERY_MAGIC &&
675             rec->magic != TDB1_RECOVERY_INVALID_MAGIC) {
676                 *recovery_offset = 0;
677                 rec->rec_len = 0;
678         }
679         return 0;
680 }
681
682 /*
683   allocate the recovery area, or use an existing recovery area if it is
684   large enough
685 */
686 static int tdb1_recovery_allocate(struct tdb_context *tdb,
687                                  tdb1_len_t *recovery_size,
688                                  tdb1_off_t *recovery_offset,
689                                  tdb1_len_t *recovery_max_size)
690 {
691         struct tdb1_record rec;
692         const struct tdb1_methods *methods = tdb->tdb1.transaction->io_methods;
693         tdb1_off_t recovery_head;
694
695         if (tdb1_recovery_area(tdb, methods, &recovery_head, &rec) == -1) {
696                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
697                            "tdb1_recovery_allocate:"
698                            " failed to read recovery head");
699                 return -1;
700         }
701
702         *recovery_size = tdb1_recovery_size(tdb);
703
704         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
705                 /* it fits in the existing area */
706                 *recovery_max_size = rec.rec_len;
707                 *recovery_offset = recovery_head;
708                 return 0;
709         }
710
711         /* we need to free up the old recovery area, then allocate a
712            new one at the end of the file. Note that we cannot use
713            tdb1_allocate() to allocate the new one as that might return
714            us an area that is being currently used (as of the start of
715            the transaction) */
716         if (recovery_head != 0) {
717                 if (tdb1_free(tdb, recovery_head, &rec) == -1) {
718                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
719                                    "tdb1_recovery_allocate: failed to free"
720                                    " previous recovery area");
721                         return -1;
722                 }
723         }
724
725         /* the tdb1_free() call might have increased the recovery size */
726         *recovery_size = tdb1_recovery_size(tdb);
727
728         /* round up to a multiple of page size */
729         *recovery_max_size = TDB1_ALIGN(sizeof(rec) + *recovery_size,
730                                         tdb->tdb1.page_size) - sizeof(rec);
731         *recovery_offset = tdb->file->map_size;
732         recovery_head = *recovery_offset;
733
734         if (methods->tdb1_expand_file(tdb, tdb->tdb1.transaction->old_map_size,
735                                      (tdb->file->map_size - tdb->tdb1.transaction->old_map_size) +
736                                      sizeof(rec) + *recovery_max_size) == -1) {
737                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
738                            "tdb1_recovery_allocate:"
739                            " failed to create recovery area");
740                 return -1;
741         }
742
743         /* remap the file (if using mmap) */
744         methods->tdb1_oob(tdb, tdb->file->map_size + 1, 1);
745
746         /* we have to reset the old map size so that we don't try to expand the file
747            again in the transaction commit, which would destroy the recovery area */
748         tdb->tdb1.transaction->old_map_size = tdb->file->map_size;
749
750         /* write the recovery header offset and sync - we can sync without a race here
751            as the magic ptr in the recovery record has not been set */
752         TDB1_CONV(recovery_head);
753         if (methods->tdb1_write(tdb, TDB1_RECOVERY_HEAD,
754                                &recovery_head, sizeof(tdb1_off_t)) == -1) {
755                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
756                            "tdb1_recovery_allocate:"
757                            " failed to write recovery head");
758                 return -1;
759         }
760         if (transaction1_write_existing(tdb, TDB1_RECOVERY_HEAD, &recovery_head, sizeof(tdb1_off_t)) == -1) {
761                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
762                            "tdb1_recovery_allocate:"
763                            " failed to write recovery head");
764                 return -1;
765         }
766
767         return 0;
768 }
769
770
771 /*
772   setup the recovery data that will be used on a crash during commit
773 */
774 static int transaction1_setup_recovery(struct tdb_context *tdb,
775                                        tdb1_off_t *magic_offset)
776 {
777         tdb1_len_t recovery_size;
778         unsigned char *data, *p;
779         const struct tdb1_methods *methods = tdb->tdb1.transaction->io_methods;
780         struct tdb1_record *rec;
781         tdb1_off_t recovery_offset, recovery_max_size;
782         tdb1_off_t old_map_size = tdb->tdb1.transaction->old_map_size;
783         uint32_t magic, tailer;
784         int i;
785
786         /*
787           check that the recovery area has enough space
788         */
789         if (tdb1_recovery_allocate(tdb, &recovery_size,
790                                   &recovery_offset, &recovery_max_size) == -1) {
791                 return -1;
792         }
793
794         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
795         if (data == NULL) {
796                 tdb->last_error = TDB_ERR_OOM;
797                 return -1;
798         }
799
800         rec = (struct tdb1_record *)data;
801         memset(rec, 0, sizeof(*rec));
802
803         rec->magic    = TDB1_RECOVERY_INVALID_MAGIC;
804         rec->data_len = recovery_size;
805         rec->rec_len  = recovery_max_size;
806         rec->key_len  = old_map_size;
807         TDB1_CONV(*rec);
808
809         /* build the recovery data into a single blob to allow us to do a single
810            large write, which should be more efficient */
811         p = data + sizeof(*rec);
812         for (i=0;i<tdb->tdb1.transaction->num_blocks;i++) {
813                 tdb1_off_t offset;
814                 tdb1_len_t length;
815
816                 if (tdb->tdb1.transaction->blocks[i] == NULL) {
817                         continue;
818                 }
819
820                 offset = i * tdb->tdb1.transaction->block_size;
821                 length = tdb->tdb1.transaction->block_size;
822                 if (i == tdb->tdb1.transaction->num_blocks-1) {
823                         length = tdb->tdb1.transaction->last_block_size;
824                 }
825
826                 if (offset >= old_map_size) {
827                         continue;
828                 }
829                 if (offset + length > tdb->tdb1.transaction->old_map_size) {
830                         tdb->last_error = tdb_logerr(tdb, TDB_ERR_CORRUPT,
831                                                 TDB_LOG_ERROR,
832                                                 "tdb1_transaction_setup_recovery: transaction data over new region boundary");
833                         free(data);
834                         return -1;
835                 }
836                 memcpy(p, &offset, 4);
837                 memcpy(p+4, &length, 4);
838                 if (TDB1_DOCONV()) {
839                         tdb1_convert(p, 8);
840                 }
841                 /* the recovery area contains the old data, not the
842                    new data, so we have to call the original tdb1_read
843                    method to get it */
844                 if (methods->tdb1_read(tdb, offset, p + 8, length, 0) != 0) {
845                         free(data);
846                         tdb->last_error = TDB_ERR_IO;
847                         return -1;
848                 }
849                 p += 8 + length;
850         }
851
852         /* and the tailer */
853         tailer = sizeof(*rec) + recovery_max_size;
854         memcpy(p, &tailer, 4);
855         if (TDB1_DOCONV()) {
856                 tdb1_convert(p, 4);
857         }
858
859         /* write the recovery data to the recovery area */
860         if (methods->tdb1_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
861                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
862                            "tdb1_transaction_setup_recovery:"
863                            " failed to write recovery data");
864                 free(data);
865                 return -1;
866         }
867         if (transaction1_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
868                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
869                            "tdb1_transaction_setup_recovery: failed to write"
870                            " secondary recovery data");
871                 free(data);
872                 return -1;
873         }
874
875         /* as we don't have ordered writes, we have to sync the recovery
876            data before we update the magic to indicate that the recovery
877            data is present */
878         if (transaction1_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
879                 free(data);
880                 return -1;
881         }
882
883         free(data);
884
885         magic = TDB1_RECOVERY_MAGIC;
886         TDB1_CONV(magic);
887
888         *magic_offset = recovery_offset + offsetof(struct tdb1_record, magic);
889
890         if (methods->tdb1_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
891                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
892                            "tdb1_transaction_setup_recovery:"
893                            " failed to write recovery magic");
894                 return -1;
895         }
896         if (transaction1_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
897                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
898                            "tdb1_transaction_setup_recovery:"
899                            " failed to write secondary recovery magic");
900                 return -1;
901         }
902
903         /* ensure the recovery magic marker is on disk */
904         if (transaction1_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
905                 return -1;
906         }
907
908         return 0;
909 }
910
911 static int _tdb1_transaction_prepare_commit(struct tdb_context *tdb)
912 {
913         const struct tdb1_methods *methods;
914
915         if (tdb->tdb1.transaction == NULL) {
916                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
917                                         "tdb1_transaction_prepare_commit:"
918                                         " no transaction");
919                 return -1;
920         }
921
922         if (tdb->tdb1.transaction->prepared) {
923                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
924                                         "tdb1_transaction_prepare_commit:"
925                                         " transaction already prepared");
926                 _tdb1_transaction_cancel(tdb);
927                 return -1;
928         }
929
930         if (tdb->tdb1.transaction->transaction_error) {
931                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
932                                         "tdb1_transaction_prepare_commit:"
933                                         " transaction error pending");
934                 _tdb1_transaction_cancel(tdb);
935                 return -1;
936         }
937
938
939         if (tdb->tdb1.transaction->nesting != 0) {
940                 return 0;
941         }
942
943         /* check for a null transaction */
944         if (tdb->tdb1.transaction->blocks == NULL) {
945                 return 0;
946         }
947
948         methods = tdb->tdb1.transaction->io_methods;
949
950         /* if there are any locks pending then the caller has not
951            nested their locks properly, so fail the transaction */
952         if (tdb1_have_extra_locks(tdb)) {
953                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
954                                         "tdb1_transaction_prepare_commit:"
955                                         " locks pending on commit");
956                 _tdb1_transaction_cancel(tdb);
957                 return -1;
958         }
959
960         /* upgrade the main transaction lock region to a write lock */
961         if (tdb1_allrecord_upgrade(tdb) == -1) {
962                 if (errno != EAGAIN && errno != EINTR) {
963                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
964                                    "tdb1_transaction_prepare_commit:"
965                                    " failed to upgrade hash locks");
966                 }
967                 return -1;
968         }
969
970         /* get the open lock - this prevents new users attaching to the database
971            during the commit */
972         if (tdb1_nest_lock(tdb, TDB1_OPEN_LOCK, F_WRLCK, TDB_LOCK_WAIT) == -1) {
973                 if (errno != EAGAIN && errno != EINTR) {
974                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
975                                    "tdb1_transaction_prepare_commit:"
976                                    " failed to get open lock");
977                 }
978                 return -1;
979         }
980
981         if (!(tdb->flags & TDB_NOSYNC)) {
982                 /* write the recovery data to the end of the file */
983                 if (transaction1_setup_recovery(tdb, &tdb->tdb1.transaction->magic_offset) == -1) {
984                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
985                                    "tdb1_transaction_prepare_commit:"
986                                    " failed to setup recovery data");
987                         return -1;
988                 }
989         }
990
991         tdb->tdb1.transaction->prepared = true;
992
993         /* expand the file to the new size if needed */
994         if (tdb->file->map_size != tdb->tdb1.transaction->old_map_size) {
995                 if (methods->tdb1_expand_file(tdb, tdb->tdb1.transaction->old_map_size,
996                                              tdb->file->map_size -
997                                              tdb->tdb1.transaction->old_map_size) == -1) {
998                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
999                                    "tdb1_transaction_prepare_commit:"
1000                                    " expansion failed");
1001                         return -1;
1002                 }
1003                 tdb->file->map_size = tdb->tdb1.transaction->old_map_size;
1004                 methods->tdb1_oob(tdb, tdb->file->map_size + 1, 1);
1005         }
1006
1007         /* Keep the open lock until the actual commit */
1008
1009         return 0;
1010 }
1011
/*
   Public entry point for preparing the current transaction for commit.
   Simply forwards to _tdb1_transaction_prepare_commit() and hands back
   its result unchanged.
*/
int tdb1_transaction_prepare_commit(struct tdb_context *tdb)
{
	int status = _tdb1_transaction_prepare_commit(tdb);

	return status;
}
1019
1020 /* A repack is worthwhile if the largest is less than half total free. */
1021 static bool repack_worthwhile(struct tdb_context *tdb)
1022 {
1023         tdb1_off_t ptr;
1024         struct tdb1_record rec;
1025         tdb1_len_t total = 0, largest = 0;
1026
1027         if (tdb1_ofs_read(tdb, TDB1_FREELIST_TOP, &ptr) == -1) {
1028                 return false;
1029         }
1030
1031         while (ptr != 0 && tdb1_rec_free_read(tdb, ptr, &rec) == 0) {
1032                 total += rec.rec_len;
1033                 if (rec.rec_len > largest) {
1034                         largest = rec.rec_len;
1035                 }
1036                 ptr = rec.next;
1037         }
1038
1039         return total > largest * 2;
1040 }
1041
1042 /*
1043   commit the current transaction
1044 */
1045 int tdb1_transaction_commit(struct tdb_context *tdb)
1046 {
1047         const struct tdb1_methods *methods;
1048         int i;
1049         bool need_repack = false;
1050
1051         if (tdb->tdb1.transaction == NULL) {
1052                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
1053                                         "tdb1_transaction_commit:"
1054                                         " no transaction");
1055                 return -1;
1056         }
1057
1058         if (tdb->tdb1.transaction->transaction_error) {
1059                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
1060                                         "tdb1_transaction_commit:"
1061                                         " transaction error pending");
1062                 _tdb1_transaction_cancel(tdb);
1063                 return -1;
1064         }
1065
1066
1067         if (tdb->tdb1.transaction->nesting != 0) {
1068                 tdb->tdb1.transaction->nesting--;
1069                 return 0;
1070         }
1071
1072         /* check for a null transaction */
1073         if (tdb->tdb1.transaction->blocks == NULL) {
1074                 _tdb1_transaction_cancel(tdb);
1075                 return 0;
1076         }
1077
1078         if (!tdb->tdb1.transaction->prepared) {
1079                 int ret = _tdb1_transaction_prepare_commit(tdb);
1080                 if (ret) {
1081                         _tdb1_transaction_cancel(tdb);
1082                         return ret;
1083                 }
1084         }
1085
1086         methods = tdb->tdb1.transaction->io_methods;
1087
1088         /* perform all the writes */
1089         for (i=0;i<tdb->tdb1.transaction->num_blocks;i++) {
1090                 tdb1_off_t offset;
1091                 tdb1_len_t length;
1092
1093                 if (tdb->tdb1.transaction->blocks[i] == NULL) {
1094                         continue;
1095                 }
1096
1097                 offset = i * tdb->tdb1.transaction->block_size;
1098                 length = tdb->tdb1.transaction->block_size;
1099                 if (i == tdb->tdb1.transaction->num_blocks-1) {
1100                         length = tdb->tdb1.transaction->last_block_size;
1101                 }
1102
1103                 if (methods->tdb1_write(tdb, offset, tdb->tdb1.transaction->blocks[i], length) == -1) {
1104                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1105                                    "tdb1_transaction_commit:"
1106                                    " write failed during commit");
1107
1108                         /* we've overwritten part of the data and
1109                            possibly expanded the file, so we need to
1110                            run the crash recovery code */
1111                         tdb->tdb1.io = methods;
1112                         tdb1_transaction_recover(tdb);
1113
1114                         _tdb1_transaction_cancel(tdb);
1115
1116                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1117                                    "tdb1_transaction_commit: write failed");
1118                         return -1;
1119                 }
1120                 SAFE_FREE(tdb->tdb1.transaction->blocks[i]);
1121         }
1122
1123         /* Do this before we drop lock or blocks. */
1124         if (tdb->tdb1.transaction->expanded) {
1125                 need_repack = repack_worthwhile(tdb);
1126         }
1127
1128         SAFE_FREE(tdb->tdb1.transaction->blocks);
1129         tdb->tdb1.transaction->num_blocks = 0;
1130
1131         /* ensure the new data is on disk */
1132         if (transaction1_sync(tdb, 0, tdb->file->map_size) == -1) {
1133                 return -1;
1134         }
1135
1136         /*
1137           TODO: maybe write to some dummy hdr field, or write to magic
1138           offset without mmap, before the last sync, instead of the
1139           utime() call
1140         */
1141
1142         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1143            don't change the mtime of the file, this means the file may
1144            not be backed up (as tdb rounding to block sizes means that
1145            file size changes are quite rare too). The following forces
1146            mtime changes when a transaction completes */
1147 #if HAVE_UTIME
1148         utime(tdb->name, NULL);
1149 #endif
1150
1151         /* use a transaction cancel to free memory and remove the
1152            transaction locks */
1153         _tdb1_transaction_cancel(tdb);
1154
1155         if (need_repack) {
1156                 return tdb_repack(tdb);
1157         }
1158
1159         return 0;
1160 }
1161
1162
1163 /*
1164   recover from an aborted transaction. Must be called with exclusive
1165   database write access already established (including the open
1166   lock to prevent new processes attaching)
1167 */
1168 int tdb1_transaction_recover(struct tdb_context *tdb)
1169 {
1170         tdb1_off_t recovery_head, recovery_eof;
1171         unsigned char *data, *p;
1172         uint32_t zero = 0;
1173         struct tdb1_record rec;
1174
1175         /* find the recovery area */
1176         if (tdb1_ofs_read(tdb, TDB1_RECOVERY_HEAD, &recovery_head) == -1) {
1177                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1178                            "tdb1_transaction_recover:"
1179                            " failed to read recovery head");
1180                 return -1;
1181         }
1182
1183         if (recovery_head == 0) {
1184                 /* we have never allocated a recovery record */
1185                 return 0;
1186         }
1187
1188         /* read the recovery record */
1189         if (tdb->tdb1.io->tdb1_read(tdb, recovery_head, &rec,
1190                                    sizeof(rec), TDB1_DOCONV()) == -1) {
1191                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1192                            "tdb1_transaction_recover:"
1193                            " failed to read recovery record");
1194                 return -1;
1195         }
1196
1197         if (rec.magic != TDB1_RECOVERY_MAGIC) {
1198                 /* there is no valid recovery data */
1199                 return 0;
1200         }
1201
1202         if (tdb->flags & TDB_RDONLY) {
1203                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
1204                                         "tdb1_transaction_recover:"
1205                                         " attempt to recover read only"
1206                                         " database");
1207                 return -1;
1208         }
1209
1210         recovery_eof = rec.key_len;
1211
1212         data = (unsigned char *)malloc(rec.data_len);
1213         if (data == NULL) {
1214                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
1215                                         "tdb1_transaction_recover:"
1216                                         " failed to allocate recovery data");
1217                 return -1;
1218         }
1219
1220         /* read the full recovery data */
1221         if (tdb->tdb1.io->tdb1_read(tdb, recovery_head + sizeof(rec), data,
1222                                    rec.data_len, 0) == -1) {
1223                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1224                            "tdb1_transaction_recover:"
1225                            " failed to read recovery data");
1226                 return -1;
1227         }
1228
1229         /* recover the file data */
1230         p = data;
1231         while (p+8 < data + rec.data_len) {
1232                 uint32_t ofs, len;
1233                 if (TDB1_DOCONV()) {
1234                         tdb1_convert(p, 8);
1235                 }
1236                 memcpy(&ofs, p, 4);
1237                 memcpy(&len, p+4, 4);
1238
1239                 if (tdb->tdb1.io->tdb1_write(tdb, ofs, p+8, len) == -1) {
1240                         free(data);
1241                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1242                                    "tdb1_transaction_recover: failed to recover"
1243                                    " %d bytes at offset %d", len, ofs);
1244                         return -1;
1245                 }
1246                 p += 8 + len;
1247         }
1248
1249         free(data);
1250
1251         if (transaction1_sync(tdb, 0, tdb->file->map_size) == -1) {
1252                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1253                            "tdb1_transaction_recover: failed to sync recovery");
1254                 return -1;
1255         }
1256
1257         /* if the recovery area is after the recovered eof then remove it */
1258         if (recovery_eof <= recovery_head) {
1259                 if (tdb1_ofs_write(tdb, TDB1_RECOVERY_HEAD, &zero) == -1) {
1260                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1261                                    "tdb1_transaction_recover: failed to remove"
1262                                    " recovery head");
1263                         return -1;
1264                 }
1265         }
1266
1267         /* remove the recovery magic */
1268         if (tdb1_ofs_write(tdb, recovery_head + offsetof(struct tdb1_record, magic),
1269                           &zero) == -1) {
1270                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1271                            "tdb1_transaction_recover: failed to remove"
1272                            " recovery magic");
1273                 return -1;
1274         }
1275
1276         if (transaction1_sync(tdb, 0, recovery_eof) == -1) {
1277                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1278                            "tdb1_transaction_recover:"
1279                            " failed to sync2 recovery");
1280                 return -1;
1281         }
1282
1283         tdb_logerr(tdb, TDB_SUCCESS, TDB_LOG_WARNING,
1284                    "tdb1_transaction_recover: recovered %d byte database",
1285                    recovery_eof);
1286
1287         /* all done */
1288         return 0;
1289 }
1290
1291 /* Any I/O failures we say "needs recovery". */
1292 bool tdb1_needs_recovery(struct tdb_context *tdb)
1293 {
1294         tdb1_off_t recovery_head;
1295         struct tdb1_record rec;
1296
1297         /* find the recovery area */
1298         if (tdb1_ofs_read(tdb, TDB1_RECOVERY_HEAD, &recovery_head) == -1) {
1299                 return true;
1300         }
1301
1302         if (recovery_head == 0) {
1303                 /* we have never allocated a recovery record */
1304                 return false;
1305         }
1306
1307         /* read the recovery record */
1308         if (tdb->tdb1.io->tdb1_read(tdb, recovery_head, &rec,
1309                                    sizeof(rec), TDB1_DOCONV()) == -1) {
1310                 return true;
1311         }
1312
1313         return (rec.magic == TDB1_RECOVERY_MAGIC);
1314 }