]> git.ozlabs.org Git - ccan/blob - ccan/tdb2/transaction.c
eab00e0937778b51ce79d7b48de26daf94c93c78
[ccan] / ccan / tdb2 / transaction.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7    Copyright (C) Rusty Russell                2010
8
9      ** NOTE! The following LGPL license applies to the tdb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 #include "private.h"
/* Free the allocation held in lvalue x and reset x to NULL.  free(NULL) is a
 * no-op, so no guard is needed; x is parenthesized everywhere so compound
 * lvalues such as arr[i] or *pp expand safely. */
#define SAFE_FREE(x) do { free((void *)(x)); (x) = NULL; } while (0)
29
30 /*
31   transaction design:
32
33   - only allow a single transaction at a time per database. This makes
34     using the transaction API simpler, as otherwise the caller would
35     have to cope with temporary failures in transactions that conflict
36     with other current transactions
37
38   - keep the transaction recovery information in the same file as the
39     database, using a special 'transaction recovery' record pointed at
40     by the header. This removes the need for extra journal files as
41     used by some other databases
42
43   - dynamically allocate the transaction recovery record, re-using it
44     for subsequent transactions. If a larger record is needed then
45     tdb_free() the old record to place it on the normal tdb freelist
46     before allocating the new record
47
48   - during transactions, keep a linked list of all writes that have
49     been performed by intercepting all tdb_write() calls. The hooked
50     transaction versions of tdb_read() and tdb_write() check this
51     linked list and try to use the elements of the list in preference
52     to the real database.
53
54   - don't allow any locks to be held when a transaction starts,
55     otherwise we can end up with deadlock (plus lack of lock nesting
56     in POSIX locks would mean the lock is lost)
57
58   - if the caller gains a lock during the transaction but doesn't
59     release it then fail the commit
60
61   - allow for nested calls to tdb_transaction_start(), re-using the
62     existing transaction record. If the inner transaction is canceled
63     then a subsequent commit will fail
64
65   - keep a mirrored copy of the tdb hash chain heads to allow for the
66     fast hash heads scan on traverse, updating the mirrored copy in
67     the transaction version of tdb_write
68
69   - allow callers to mix transaction and non-transaction use of tdb,
70     although once a transaction is started then an exclusive lock is
71     gained until the transaction is committed or canceled
72
73   - the commit strategy involves first saving away all modified data
74     into a linearised buffer in the transaction recovery area, then
75     marking the transaction recovery area with a magic value to
76     indicate a valid recovery record. In total 4 fsync/msync calls are
77     needed per commit to prevent race conditions. It might be possible
78     to reduce this to 3 or even 2 with some more work.
79
80   - check for a valid recovery record on open of the tdb, while the
81     open lock is held. Automatically recover from the transaction
82     recovery area if needed, then continue with the open as
83     usual. This allows for smooth crash recovery with no administrator
84     intervention.
85
86   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
87     still available, but no transaction recovery area is used and no
88     fsync/msync calls are made.
89 */
90
91 /*
92   hold the context of any current transaction
93 */
94 struct tdb_transaction {
95         /* the original io methods - used to do IOs to the real db */
96         const struct tdb_methods *io_methods;
97
98         /* the list of transaction blocks. When a block is first
99            written to, it gets created in this list */
100         uint8_t **blocks;
101         size_t num_blocks;
102         size_t last_block_size; /* number of valid bytes in the last block */
103
104         /* non-zero when an internal transaction error has
105            occurred. All write operations will then fail until the
106            transaction is ended */
107         int transaction_error;
108
109         /* when inside a transaction we need to keep track of any
110            nested tdb_transaction_start() calls, as these are allowed,
111            but don't create a new transaction */
112         unsigned int nesting;
113
114         /* set when a prepare has already occurred */
115         bool prepared;
116         tdb_off_t magic_offset;
117
118         /* old file size before transaction */
119         tdb_len_t old_map_size;
120 };
121
/* Granularity (64 KiB) of the transaction's shadow blocks.  It does not have
 * to equal the system page size, but is chosen for similar reasons. */
#define PAGESIZE (64 * 1024)
124
125 /*
126   read while in a transaction. We need to check first if the data is in our list
127   of transaction elements, then if not do a real read
128 */
129 static enum TDB_ERROR transaction_read(struct tdb_context *tdb, tdb_off_t off,
130                                        void *buf, tdb_len_t len)
131 {
132         size_t blk;
133         enum TDB_ERROR ecode;
134
135         /* break it down into block sized ops */
136         while (len + (off % PAGESIZE) > PAGESIZE) {
137                 tdb_len_t len2 = PAGESIZE - (off % PAGESIZE);
138                 ecode = transaction_read(tdb, off, buf, len2);
139                 if (ecode != TDB_SUCCESS) {
140                         return ecode;
141                 }
142                 len -= len2;
143                 off += len2;
144                 buf = (void *)(len2 + (char *)buf);
145         }
146
147         if (len == 0) {
148                 return TDB_SUCCESS;
149         }
150
151         blk = off / PAGESIZE;
152
153         /* see if we have it in the block list */
154         if (tdb->tdb2.transaction->num_blocks <= blk ||
155             tdb->tdb2.transaction->blocks[blk] == NULL) {
156                 /* nope, do a real read */
157                 ecode = tdb->tdb2.transaction->io_methods->tread(tdb, off, buf, len);
158                 if (ecode != TDB_SUCCESS) {
159                         goto fail;
160                 }
161                 return 0;
162         }
163
164         /* it is in the block list. Now check for the last block */
165         if (blk == tdb->tdb2.transaction->num_blocks-1) {
166                 if (len > tdb->tdb2.transaction->last_block_size) {
167                         ecode = TDB_ERR_IO;
168                         goto fail;
169                 }
170         }
171
172         /* now copy it out of this block */
173         memcpy(buf, tdb->tdb2.transaction->blocks[blk] + (off % PAGESIZE), len);
174         return TDB_SUCCESS;
175
176 fail:
177         tdb->tdb2.transaction->transaction_error = 1;
178         return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
179                           "transaction_read: failed at off=%zu len=%zu",
180                           (size_t)off, (size_t)len);
181 }
182
183
184 /*
185   write while in a transaction
186 */
187 static enum TDB_ERROR transaction_write(struct tdb_context *tdb, tdb_off_t off,
188                                         const void *buf, tdb_len_t len)
189 {
190         size_t blk;
191         enum TDB_ERROR ecode;
192
193         /* Only a commit is allowed on a prepared transaction */
194         if (tdb->tdb2.transaction->prepared) {
195                 ecode = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_ERROR,
196                                    "transaction_write: transaction already"
197                                    " prepared, write not allowed");
198                 goto fail;
199         }
200
201         /* break it up into block sized chunks */
202         while (len + (off % PAGESIZE) > PAGESIZE) {
203                 tdb_len_t len2 = PAGESIZE - (off % PAGESIZE);
204                 ecode = transaction_write(tdb, off, buf, len2);
205                 if (ecode != TDB_SUCCESS) {
206                         return -1;
207                 }
208                 len -= len2;
209                 off += len2;
210                 if (buf != NULL) {
211                         buf = (const void *)(len2 + (const char *)buf);
212                 }
213         }
214
215         if (len == 0) {
216                 return TDB_SUCCESS;
217         }
218
219         blk = off / PAGESIZE;
220         off = off % PAGESIZE;
221
222         if (tdb->tdb2.transaction->num_blocks <= blk) {
223                 uint8_t **new_blocks;
224                 /* expand the blocks array */
225                 if (tdb->tdb2.transaction->blocks == NULL) {
226                         new_blocks = (uint8_t **)malloc(
227                                 (blk+1)*sizeof(uint8_t *));
228                 } else {
229                         new_blocks = (uint8_t **)realloc(
230                                 tdb->tdb2.transaction->blocks,
231                                 (blk+1)*sizeof(uint8_t *));
232                 }
233                 if (new_blocks == NULL) {
234                         ecode = tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
235                                            "transaction_write:"
236                                            " failed to allocate");
237                         goto fail;
238                 }
239                 memset(&new_blocks[tdb->tdb2.transaction->num_blocks], 0,
240                        (1+(blk - tdb->tdb2.transaction->num_blocks))*sizeof(uint8_t *));
241                 tdb->tdb2.transaction->blocks = new_blocks;
242                 tdb->tdb2.transaction->num_blocks = blk+1;
243                 tdb->tdb2.transaction->last_block_size = 0;
244         }
245
246         /* allocate and fill a block? */
247         if (tdb->tdb2.transaction->blocks[blk] == NULL) {
248                 tdb->tdb2.transaction->blocks[blk] = (uint8_t *)calloc(PAGESIZE, 1);
249                 if (tdb->tdb2.transaction->blocks[blk] == NULL) {
250                         ecode = tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
251                                            "transaction_write:"
252                                            " failed to allocate");
253                         goto fail;
254                 }
255                 if (tdb->tdb2.transaction->old_map_size > blk * PAGESIZE) {
256                         tdb_len_t len2 = PAGESIZE;
257                         if (len2 + (blk * PAGESIZE) > tdb->tdb2.transaction->old_map_size) {
258                                 len2 = tdb->tdb2.transaction->old_map_size - (blk * PAGESIZE);
259                         }
260                         ecode = tdb->tdb2.transaction->io_methods->tread(tdb,
261                                         blk * PAGESIZE,
262                                         tdb->tdb2.transaction->blocks[blk],
263                                         len2);
264                         if (ecode != TDB_SUCCESS) {
265                                 ecode = tdb_logerr(tdb, ecode,
266                                                    TDB_LOG_ERROR,
267                                                    "transaction_write:"
268                                                    " failed to"
269                                                    " read old block: %s",
270                                                    strerror(errno));
271                                 SAFE_FREE(tdb->tdb2.transaction->blocks[blk]);
272                                 goto fail;
273                         }
274                         if (blk == tdb->tdb2.transaction->num_blocks-1) {
275                                 tdb->tdb2.transaction->last_block_size = len2;
276                         }
277                 }
278         }
279
280         /* overwrite part of an existing block */
281         if (buf == NULL) {
282                 memset(tdb->tdb2.transaction->blocks[blk] + off, 0, len);
283         } else {
284                 memcpy(tdb->tdb2.transaction->blocks[blk] + off, buf, len);
285         }
286         if (blk == tdb->tdb2.transaction->num_blocks-1) {
287                 if (len + off > tdb->tdb2.transaction->last_block_size) {
288                         tdb->tdb2.transaction->last_block_size = len + off;
289                 }
290         }
291
292         return TDB_SUCCESS;
293
294 fail:
295         tdb->tdb2.transaction->transaction_error = 1;
296         return ecode;
297 }
298
299
300 /*
301   write while in a transaction - this variant never expands the transaction blocks, it only
302   updates existing blocks. This means it cannot change the recovery size
303 */
304 static void transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
305                                        const void *buf, tdb_len_t len)
306 {
307         size_t blk;
308
309         /* break it up into block sized chunks */
310         while (len + (off % PAGESIZE) > PAGESIZE) {
311                 tdb_len_t len2 = PAGESIZE - (off % PAGESIZE);
312                 transaction_write_existing(tdb, off, buf, len2);
313                 len -= len2;
314                 off += len2;
315                 if (buf != NULL) {
316                         buf = (const void *)(len2 + (const char *)buf);
317                 }
318         }
319
320         if (len == 0) {
321                 return;
322         }
323
324         blk = off / PAGESIZE;
325         off = off % PAGESIZE;
326
327         if (tdb->tdb2.transaction->num_blocks <= blk ||
328             tdb->tdb2.transaction->blocks[blk] == NULL) {
329                 return;
330         }
331
332         if (blk == tdb->tdb2.transaction->num_blocks-1 &&
333             off + len > tdb->tdb2.transaction->last_block_size) {
334                 if (off >= tdb->tdb2.transaction->last_block_size) {
335                         return;
336                 }
337                 len = tdb->tdb2.transaction->last_block_size - off;
338         }
339
340         /* overwrite part of an existing block */
341         memcpy(tdb->tdb2.transaction->blocks[blk] + off, buf, len);
342 }
343
344
345 /*
346   out of bounds check during a transaction
347 */
348 static enum TDB_ERROR transaction_oob(struct tdb_context *tdb, tdb_off_t len,
349                                       bool probe)
350 {
351         if (len <= tdb->file->map_size || probe) {
352                 return TDB_SUCCESS;
353         }
354
355         tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
356                    "tdb_oob len %lld beyond transaction size %lld",
357                    (long long)len,
358                    (long long)tdb->file->map_size);
359         return TDB_ERR_IO;
360 }
361
362 /*
363   transaction version of tdb_expand().
364 */
365 static enum TDB_ERROR transaction_expand_file(struct tdb_context *tdb,
366                                               tdb_off_t addition)
367 {
368         enum TDB_ERROR ecode;
369
370         /* add a write to the transaction elements, so subsequent
371            reads see the zero data */
372         ecode = transaction_write(tdb, tdb->file->map_size, NULL, addition);
373         if (ecode == TDB_SUCCESS) {
374                 tdb->file->map_size += addition;
375         }
376         return ecode;
377 }
378
379 static void *transaction_direct(struct tdb_context *tdb, tdb_off_t off,
380                                 size_t len, bool write_mode)
381 {
382         size_t blk = off / PAGESIZE, end_blk;
383
384         /* This is wrong for zero-length blocks, but will fail gracefully */
385         end_blk = (off + len - 1) / PAGESIZE;
386
387         /* Can only do direct if in single block and we've already copied. */
388         if (write_mode) {
389                 tdb->stats.transaction_write_direct++;
390                 if (blk != end_blk
391                     || blk >= tdb->tdb2.transaction->num_blocks
392                     || tdb->tdb2.transaction->blocks[blk] == NULL) {
393                         tdb->stats.transaction_write_direct_fail++;
394                         return NULL;
395                 }
396                 return tdb->tdb2.transaction->blocks[blk] + off % PAGESIZE;
397         }
398
399         tdb->stats.transaction_read_direct++;
400         /* Single which we have copied? */
401         if (blk == end_blk
402             && blk < tdb->tdb2.transaction->num_blocks
403             && tdb->tdb2.transaction->blocks[blk])
404                 return tdb->tdb2.transaction->blocks[blk] + off % PAGESIZE;
405
406         /* Otherwise must be all not copied. */
407         while (blk <= end_blk) {
408                 if (blk >= tdb->tdb2.transaction->num_blocks)
409                         break;
410                 if (tdb->tdb2.transaction->blocks[blk]) {
411                         tdb->stats.transaction_read_direct_fail++;
412                         return NULL;
413                 }
414                 blk++;
415         }
416         return tdb->tdb2.transaction->io_methods->direct(tdb, off, len, false);
417 }
418
419 static const struct tdb_methods transaction_methods = {
420         transaction_read,
421         transaction_write,
422         transaction_oob,
423         transaction_expand_file,
424         transaction_direct,
425 };
426
427 /*
428   sync to disk
429 */
430 static enum TDB_ERROR transaction_sync(struct tdb_context *tdb,
431                                        tdb_off_t offset, tdb_len_t length)
432 {
433         if (tdb->flags & TDB_NOSYNC) {
434                 return TDB_SUCCESS;
435         }
436
437         if (fsync(tdb->file->fd) != 0) {
438                 return tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
439                                   "tdb_transaction: fsync failed: %s",
440                                   strerror(errno));
441         }
442 #ifdef MS_SYNC
443         if (tdb->file->map_ptr) {
444                 tdb_off_t moffset = offset & ~(getpagesize()-1);
445                 if (msync(moffset + (char *)tdb->file->map_ptr,
446                           length + (offset - moffset), MS_SYNC) != 0) {
447                         return tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
448                                           "tdb_transaction: msync failed: %s",
449                                           strerror(errno));
450                 }
451         }
452 #endif
453         return TDB_SUCCESS;
454 }
455
456
457 static void _tdb_transaction_cancel(struct tdb_context *tdb)
458 {
459         int i;
460         enum TDB_ERROR ecode;
461
462         if (tdb->tdb2.transaction == NULL) {
463                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
464                            "tdb_transaction_cancel: no transaction");
465                 return;
466         }
467
468         if (tdb->tdb2.transaction->nesting != 0) {
469                 tdb->tdb2.transaction->transaction_error = 1;
470                 tdb->tdb2.transaction->nesting--;
471                 return;
472         }
473
474         tdb->file->map_size = tdb->tdb2.transaction->old_map_size;
475
476         /* free all the transaction blocks */
477         for (i=0;i<tdb->tdb2.transaction->num_blocks;i++) {
478                 if (tdb->tdb2.transaction->blocks[i] != NULL) {
479                         free(tdb->tdb2.transaction->blocks[i]);
480                 }
481         }
482         SAFE_FREE(tdb->tdb2.transaction->blocks);
483
484         if (tdb->tdb2.transaction->magic_offset) {
485                 const struct tdb_methods *methods = tdb->tdb2.transaction->io_methods;
486                 uint64_t invalid = TDB_RECOVERY_INVALID_MAGIC;
487
488                 /* remove the recovery marker */
489                 ecode = methods->twrite(tdb, tdb->tdb2.transaction->magic_offset,
490                                         &invalid, sizeof(invalid));
491                 if (ecode == TDB_SUCCESS)
492                         ecode = transaction_sync(tdb,
493                                                  tdb->tdb2.transaction->magic_offset,
494                                                  sizeof(invalid));
495                 if (ecode != TDB_SUCCESS) {
496                         tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
497                                    "tdb_transaction_cancel: failed to remove"
498                                    " recovery magic");
499                 }
500         }
501
502         if (tdb->file->allrecord_lock.count)
503                 tdb_allrecord_unlock(tdb, tdb->file->allrecord_lock.ltype);
504
505         /* restore the normal io methods */
506         tdb->tdb2.io = tdb->tdb2.transaction->io_methods;
507
508         tdb_transaction_unlock(tdb, F_WRLCK);
509
510         if (tdb_has_open_lock(tdb))
511                 tdb_unlock_open(tdb, F_WRLCK);
512
513         SAFE_FREE(tdb->tdb2.transaction);
514 }
515
516 /*
517   start a tdb transaction. No token is returned, as only a single
518   transaction is allowed to be pending per tdb_context
519 */
520 enum TDB_ERROR tdb_transaction_start(struct tdb_context *tdb)
521 {
522         enum TDB_ERROR ecode;
523
524         if (tdb->flags & TDB_VERSION1) {
525                 if (tdb1_transaction_start(tdb) == -1)
526                         return tdb->last_error;
527                 return TDB_SUCCESS;
528         }
529
530         tdb->stats.transactions++;
531         /* some sanity checks */
532         if (tdb->flags & TDB_INTERNAL) {
533                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL,
534                                                     TDB_LOG_USE_ERROR,
535                                                     "tdb_transaction_start:"
536                                                     " cannot start a"
537                                                     " transaction on an"
538                                                     " internal tdb");
539         }
540
541         if (tdb->flags & TDB_RDONLY) {
542                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_RDONLY,
543                                                     TDB_LOG_USE_ERROR,
544                                                     "tdb_transaction_start:"
545                                                     " cannot start a"
546                                                     " transaction on a "
547                                                     " read-only tdb");
548         }
549
550         /* cope with nested tdb_transaction_start() calls */
551         if (tdb->tdb2.transaction != NULL) {
552                 if (!(tdb->flags & TDB_ALLOW_NESTING)) {
553                         return tdb->last_error
554                                 = tdb_logerr(tdb, TDB_ERR_IO,
555                                              TDB_LOG_USE_ERROR,
556                                              "tdb_transaction_start:"
557                                              " already inside transaction");
558                 }
559                 tdb->tdb2.transaction->nesting++;
560                 tdb->stats.transaction_nest++;
561                 return 0;
562         }
563
564         if (tdb_has_hash_locks(tdb)) {
565                 /* the caller must not have any locks when starting a
566                    transaction as otherwise we'll be screwed by lack
567                    of nested locks in POSIX */
568                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_LOCK,
569                                                     TDB_LOG_USE_ERROR,
570                                                     "tdb_transaction_start:"
571                                                     " cannot start a"
572                                                     " transaction with locks"
573                                                     " held");
574         }
575
576         tdb->tdb2.transaction = (struct tdb_transaction *)
577                 calloc(sizeof(struct tdb_transaction), 1);
578         if (tdb->tdb2.transaction == NULL) {
579                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_OOM,
580                                                     TDB_LOG_ERROR,
581                                                     "tdb_transaction_start:"
582                                                     " cannot allocate");
583         }
584
585         /* get the transaction write lock. This is a blocking lock. As
586            discussed with Volker, there are a number of ways we could
587            make this async, which we will probably do in the future */
588         ecode = tdb_transaction_lock(tdb, F_WRLCK);
589         if (ecode != TDB_SUCCESS) {
590                 SAFE_FREE(tdb->tdb2.transaction->blocks);
591                 SAFE_FREE(tdb->tdb2.transaction);
592                 return tdb->last_error = ecode;
593         }
594
595         /* get a read lock over entire file. This is upgraded to a write
596            lock during the commit */
597         ecode = tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, true);
598         if (ecode != TDB_SUCCESS) {
599                 goto fail_allrecord_lock;
600         }
601
602         /* make sure we know about any file expansions already done by
603            anyone else */
604         tdb->tdb2.io->oob(tdb, tdb->file->map_size + 1, true);
605         tdb->tdb2.transaction->old_map_size = tdb->file->map_size;
606
607         /* finally hook the io methods, replacing them with
608            transaction specific methods */
609         tdb->tdb2.transaction->io_methods = tdb->tdb2.io;
610         tdb->tdb2.io = &transaction_methods;
611         return tdb->last_error = TDB_SUCCESS;
612
613 fail_allrecord_lock:
614         tdb_transaction_unlock(tdb, F_WRLCK);
615         SAFE_FREE(tdb->tdb2.transaction->blocks);
616         SAFE_FREE(tdb->tdb2.transaction);
617         return tdb->last_error = ecode;
618 }
619
620
621 /*
622   cancel the current transaction
623 */
624 void tdb_transaction_cancel(struct tdb_context *tdb)
625 {
626         if (tdb->flags & TDB_VERSION1) {
627                 tdb1_transaction_cancel(tdb);
628                 return;
629         }
630         tdb->stats.transaction_cancel++;
631         _tdb_transaction_cancel(tdb);
632 }
633
634 /*
635   work out how much space the linearised recovery data will consume (worst case)
636 */
637 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
638 {
639         tdb_len_t recovery_size = 0;
640         int i;
641
642         recovery_size = 0;
643         for (i=0;i<tdb->tdb2.transaction->num_blocks;i++) {
644                 if (i * PAGESIZE >= tdb->tdb2.transaction->old_map_size) {
645                         break;
646                 }
647                 if (tdb->tdb2.transaction->blocks[i] == NULL) {
648                         continue;
649                 }
650                 recovery_size += 2*sizeof(tdb_off_t);
651                 if (i == tdb->tdb2.transaction->num_blocks-1) {
652                         recovery_size += tdb->tdb2.transaction->last_block_size;
653                 } else {
654                         recovery_size += PAGESIZE;
655                 }
656         }
657
658         return recovery_size;
659 }
660
661 static enum TDB_ERROR tdb_recovery_area(struct tdb_context *tdb,
662                                         const struct tdb_methods *methods,
663                                         tdb_off_t *recovery_offset,
664                                         struct tdb_recovery_record *rec)
665 {
666         enum TDB_ERROR ecode;
667
668         *recovery_offset = tdb_read_off(tdb,
669                                         offsetof(struct tdb_header, recovery));
670         if (TDB_OFF_IS_ERR(*recovery_offset)) {
671                 return *recovery_offset;
672         }
673
674         if (*recovery_offset == 0) {
675                 rec->max_len = 0;
676                 return TDB_SUCCESS;
677         }
678
679         ecode = methods->tread(tdb, *recovery_offset, rec, sizeof(*rec));
680         if (ecode != TDB_SUCCESS)
681                 return ecode;
682
683         tdb_convert(tdb, rec, sizeof(*rec));
684         /* ignore invalid recovery regions: can happen in crash */
685         if (rec->magic != TDB_RECOVERY_MAGIC &&
686             rec->magic != TDB_RECOVERY_INVALID_MAGIC) {
687                 *recovery_offset = 0;
688                 rec->max_len = 0;
689         }
690         return TDB_SUCCESS;
691 }
692
/* Return the length of the common prefix of new[] and old[], looking at no
 * more than `length` bytes. */
static unsigned int same(const unsigned char *new,
			 const unsigned char *old,
			 unsigned int length)
{
	unsigned int matched = 0;

	while (matched < length && new[matched] == old[matched]) {
		matched++;
	}
	return matched;
}
705
/*
 * Measure the leading region of new[] that should be treated as differing
 * from old[].
 *
 * Scans up to `length` bytes.  The region ends at the first run of at least
 * `min_same` equal bytes that is followed by a differing byte, or at a
 * trailing run of at least `min_same` equal bytes.  Shorter equal runs are
 * absorbed into the differing region.
 *
 * Returns the length of the differing region and stores the length of the
 * equal run that terminated it (0 if there was none) in *samelen.
 */
static unsigned int different(const unsigned char *new,
			      const unsigned char *old,
			      unsigned int length,
			      unsigned int min_same,
			      unsigned int *samelen)
{
	unsigned int pos = 0;
	unsigned int run = 0;	/* length of the current run of equal bytes */

	while (pos < length) {
		if (new[pos] == old[pos]) {
			run++;
		} else if (run >= min_same) {
			/* a long enough equal run just ended: stop before it */
			*samelen = run;
			return pos - run;
		} else {
			run = 0;
		}
		pos++;
	}

	/* a short trailing run counts as part of the differing region */
	if (run < min_same) {
		run = 0;
	}
	*samelen = run;
	return length - run;
}
730
731 /* Allocates recovery blob, without tdb_recovery_record at head set up. */
732 static struct tdb_recovery_record *alloc_recovery(struct tdb_context *tdb,
733                                                   tdb_len_t *len)
734 {
735         struct tdb_recovery_record *rec;
736         size_t i;
737         enum TDB_ERROR ecode;
738         unsigned char *p;
739         const struct tdb_methods *old_methods = tdb->tdb2.io;
740
741         rec = malloc(sizeof(*rec) + tdb_recovery_size(tdb));
742         if (!rec) {
743                 tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
744                            "transaction_setup_recovery:"
745                            " cannot allocate");
746                 return TDB_ERR_PTR(TDB_ERR_OOM);
747         }
748
749         /* We temporarily revert to the old I/O methods, so we can use
750          * tdb_access_read */
751         tdb->tdb2.io = tdb->tdb2.transaction->io_methods;
752
753         /* build the recovery data into a single blob to allow us to do a single
754            large write, which should be more efficient */
755         p = (unsigned char *)(rec + 1);
756         for (i=0;i<tdb->tdb2.transaction->num_blocks;i++) {
757                 tdb_off_t offset;
758                 tdb_len_t length;
759                 unsigned int off;
760                 const unsigned char *buffer;
761
762                 if (tdb->tdb2.transaction->blocks[i] == NULL) {
763                         continue;
764                 }
765
766                 offset = i * PAGESIZE;
767                 length = PAGESIZE;
768                 if (i == tdb->tdb2.transaction->num_blocks-1) {
769                         length = tdb->tdb2.transaction->last_block_size;
770                 }
771
772                 if (offset >= tdb->tdb2.transaction->old_map_size) {
773                         continue;
774                 }
775
776                 if (offset + length > tdb->file->map_size) {
777                         ecode = tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
778                                            "tdb_transaction_setup_recovery:"
779                                            " transaction data over new region"
780                                            " boundary");
781                         goto fail;
782                 }
783                 if (offset + length > tdb->tdb2.transaction->old_map_size) {
784                         /* Short read at EOF. */
785                         length = tdb->tdb2.transaction->old_map_size - offset;
786                 }
787                 buffer = tdb_access_read(tdb, offset, length, false);
788                 if (TDB_PTR_IS_ERR(buffer)) {
789                         ecode = TDB_PTR_ERR(buffer);
790                         goto fail;
791                 }
792
793                 /* Skip over anything the same at the start. */
794                 off = same(tdb->tdb2.transaction->blocks[i], buffer, length);
795                 offset += off;
796
797                 while (off < length) {
798                         tdb_len_t len;
799                         unsigned int samelen;
800
801                         len = different(tdb->tdb2.transaction->blocks[i] + off,
802                                         buffer + off, length - off,
803                                         sizeof(offset) + sizeof(len) + 1,
804                                         &samelen);
805
806                         memcpy(p, &offset, sizeof(offset));
807                         memcpy(p + sizeof(offset), &len, sizeof(len));
808                         tdb_convert(tdb, p, sizeof(offset) + sizeof(len));
809                         p += sizeof(offset) + sizeof(len);
810                         memcpy(p, buffer + off, len);
811                         p += len;
812                         off += len + samelen;
813                         offset += len + samelen;
814                 }
815                 tdb_access_release(tdb, buffer);
816         }
817
818         *len = p - (unsigned char *)(rec + 1);
819         tdb->tdb2.io = old_methods;
820         return rec;
821
822 fail:
823         free(rec);
824         tdb->tdb2.io = old_methods;
825         return TDB_ERR_PTR(ecode);
826 }
827
828 static tdb_off_t create_recovery_area(struct tdb_context *tdb,
829                                       tdb_len_t rec_length,
830                                       struct tdb_recovery_record *rec)
831 {
832         tdb_off_t off, recovery_off;
833         tdb_len_t addition;
834         enum TDB_ERROR ecode;
835         const struct tdb_methods *methods = tdb->tdb2.transaction->io_methods;
836
837         /* round up to a multiple of page size. Overallocate, since each
838          * such allocation forces us to expand the file. */
839         rec->max_len
840                 = (((sizeof(*rec) + rec_length + rec_length / 2)
841                     + PAGESIZE-1) & ~(PAGESIZE-1))
842                 - sizeof(*rec);
843         off = tdb->file->map_size;
844
845         /* Restore ->map_size before calling underlying expand_file.
846            Also so that we don't try to expand the file again in the
847            transaction commit, which would destroy the recovery
848            area */
849         addition = (tdb->file->map_size - tdb->tdb2.transaction->old_map_size) +
850                 sizeof(*rec) + rec->max_len;
851         tdb->file->map_size = tdb->tdb2.transaction->old_map_size;
852         tdb->stats.transaction_expand_file++;
853         ecode = methods->expand_file(tdb, addition);
854         if (ecode != TDB_SUCCESS) {
855                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
856                                   "tdb_recovery_allocate:"
857                                   " failed to create recovery area");
858         }
859
860         /* we have to reset the old map size so that we don't try to
861            expand the file again in the transaction commit, which
862            would destroy the recovery area */
863         tdb->tdb2.transaction->old_map_size = tdb->file->map_size;
864
865         /* write the recovery header offset and sync - we can sync without a race here
866            as the magic ptr in the recovery record has not been set */
867         recovery_off = off;
868         tdb_convert(tdb, &recovery_off, sizeof(recovery_off));
869         ecode = methods->twrite(tdb, offsetof(struct tdb_header, recovery),
870                                 &recovery_off, sizeof(tdb_off_t));
871         if (ecode != TDB_SUCCESS) {
872                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
873                                   "tdb_recovery_allocate:"
874                                   " failed to write recovery head");
875         }
876         transaction_write_existing(tdb, offsetof(struct tdb_header, recovery),
877                                    &recovery_off,
878                                    sizeof(tdb_off_t));
879         return off;
880 }
881
882 /*
883   setup the recovery data that will be used on a crash during commit
884 */
885 static enum TDB_ERROR transaction_setup_recovery(struct tdb_context *tdb)
886 {
887         tdb_len_t recovery_size = 0;
888         tdb_off_t recovery_off = 0;
889         tdb_off_t old_map_size = tdb->tdb2.transaction->old_map_size;
890         struct tdb_recovery_record *recovery;
891         const struct tdb_methods *methods = tdb->tdb2.transaction->io_methods;
892         uint64_t magic;
893         enum TDB_ERROR ecode;
894
895         recovery = alloc_recovery(tdb, &recovery_size);
896         if (TDB_PTR_IS_ERR(recovery))
897                 return TDB_PTR_ERR(recovery);
898
899         ecode = tdb_recovery_area(tdb, methods, &recovery_off, recovery);
900         if (ecode) {
901                 free(recovery);
902                 return ecode;
903         }
904
905         if (recovery->max_len < recovery_size) {
906                 /* Not large enough. Free up old recovery area. */
907                 if (recovery_off) {
908                         tdb->stats.frees++;
909                         ecode = add_free_record(tdb, recovery_off,
910                                                 sizeof(*recovery)
911                                                 + recovery->max_len,
912                                                 TDB_LOCK_WAIT, true);
913                         free(recovery);
914                         if (ecode != TDB_SUCCESS) {
915                                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
916                                                   "tdb_recovery_allocate:"
917                                                   " failed to free previous"
918                                                   " recovery area");
919                         }
920
921                         /* Refresh recovery after add_free_record above. */
922                         recovery = alloc_recovery(tdb, &recovery_size);
923                         if (TDB_PTR_IS_ERR(recovery))
924                                 return TDB_PTR_ERR(recovery);
925                 }
926
927                 recovery_off = create_recovery_area(tdb, recovery_size,
928                                                     recovery);
929                 if (TDB_OFF_IS_ERR(recovery_off)) {
930                         free(recovery);
931                         return recovery_off;
932                 }
933         }
934
935         /* Now we know size, convert rec header. */
936         recovery->magic = TDB_RECOVERY_INVALID_MAGIC;
937         recovery->len = recovery_size;
938         recovery->eof = old_map_size;
939         tdb_convert(tdb, recovery, sizeof(*recovery));
940
941         /* write the recovery data to the recovery area */
942         ecode = methods->twrite(tdb, recovery_off, recovery, recovery_size);
943         if (ecode != TDB_SUCCESS) {
944                 free(recovery);
945                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
946                                   "tdb_transaction_setup_recovery:"
947                                   " failed to write recovery data");
948         }
949         transaction_write_existing(tdb, recovery_off, recovery, recovery_size);
950
951         free(recovery);
952
953         /* as we don't have ordered writes, we have to sync the recovery
954            data before we update the magic to indicate that the recovery
955            data is present */
956         ecode = transaction_sync(tdb, recovery_off, recovery_size);
957         if (ecode != TDB_SUCCESS)
958                 return ecode;
959
960         magic = TDB_RECOVERY_MAGIC;
961         tdb_convert(tdb, &magic, sizeof(magic));
962
963         tdb->tdb2.transaction->magic_offset
964                 = recovery_off + offsetof(struct tdb_recovery_record, magic);
965
966         ecode = methods->twrite(tdb, tdb->tdb2.transaction->magic_offset,
967                                 &magic, sizeof(magic));
968         if (ecode != TDB_SUCCESS) {
969                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
970                                   "tdb_transaction_setup_recovery:"
971                                   " failed to write recovery magic");
972         }
973         transaction_write_existing(tdb, tdb->tdb2.transaction->magic_offset,
974                                    &magic, sizeof(magic));
975
976         /* ensure the recovery magic marker is on disk */
977         return transaction_sync(tdb, tdb->tdb2.transaction->magic_offset,
978                                 sizeof(magic));
979 }
980
981 static enum TDB_ERROR _tdb_transaction_prepare_commit(struct tdb_context *tdb)
982 {
983         const struct tdb_methods *methods;
984         enum TDB_ERROR ecode;
985
986         if (tdb->tdb2.transaction == NULL) {
987                 return tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
988                                   "tdb_transaction_prepare_commit:"
989                                   " no transaction");
990         }
991
992         if (tdb->tdb2.transaction->prepared) {
993                 _tdb_transaction_cancel(tdb);
994                 return tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
995                                   "tdb_transaction_prepare_commit:"
996                                   " transaction already prepared");
997         }
998
999         if (tdb->tdb2.transaction->transaction_error) {
1000                 _tdb_transaction_cancel(tdb);
1001                 return tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_ERROR,
1002                                   "tdb_transaction_prepare_commit:"
1003                                   " transaction error pending");
1004         }
1005
1006
1007         if (tdb->tdb2.transaction->nesting != 0) {
1008                 return TDB_SUCCESS;
1009         }
1010
1011         /* check for a null transaction */
1012         if (tdb->tdb2.transaction->blocks == NULL) {
1013                 return TDB_SUCCESS;
1014         }
1015
1016         methods = tdb->tdb2.transaction->io_methods;
1017
1018         /* upgrade the main transaction lock region to a write lock */
1019         ecode = tdb_allrecord_upgrade(tdb, TDB_HASH_LOCK_START);
1020         if (ecode != TDB_SUCCESS) {
1021                 return ecode;
1022         }
1023
1024         /* get the open lock - this prevents new users attaching to the database
1025            during the commit */
1026         ecode = tdb_lock_open(tdb, F_WRLCK, TDB_LOCK_WAIT|TDB_LOCK_NOCHECK);
1027         if (ecode != TDB_SUCCESS) {
1028                 return ecode;
1029         }
1030
1031         /* Since we have whole db locked, we don't need the expansion lock. */
1032         if (!(tdb->flags & TDB_NOSYNC)) {
1033                 /* Sets up tdb->tdb2.transaction->recovery and
1034                  * tdb->tdb2.transaction->magic_offset. */
1035                 ecode = transaction_setup_recovery(tdb);
1036                 if (ecode != TDB_SUCCESS) {
1037                         return ecode;
1038                 }
1039         }
1040
1041         tdb->tdb2.transaction->prepared = true;
1042
1043         /* expand the file to the new size if needed */
1044         if (tdb->file->map_size != tdb->tdb2.transaction->old_map_size) {
1045                 tdb_len_t add;
1046
1047                 add = tdb->file->map_size - tdb->tdb2.transaction->old_map_size;
1048                 /* Restore original map size for tdb_expand_file */
1049                 tdb->file->map_size = tdb->tdb2.transaction->old_map_size;
1050                 ecode = methods->expand_file(tdb, add);
1051                 if (ecode != TDB_SUCCESS) {
1052                         return ecode;
1053                 }
1054         }
1055
1056         /* Keep the open lock until the actual commit */
1057         return TDB_SUCCESS;
1058 }
1059
1060 /*
1061    prepare to commit the current transaction
1062 */
1063 enum TDB_ERROR tdb_transaction_prepare_commit(struct tdb_context *tdb)
1064 {
1065         if (tdb->flags & TDB_VERSION1) {
1066                 if (tdb1_transaction_prepare_commit(tdb) == -1)
1067                         return tdb->last_error;
1068                 return TDB_SUCCESS;
1069         }
1070         return _tdb_transaction_prepare_commit(tdb);
1071 }
1072
1073 /*
1074   commit the current transaction
1075 */
1076 enum TDB_ERROR tdb_transaction_commit(struct tdb_context *tdb)
1077 {
1078         const struct tdb_methods *methods;
1079         int i;
1080         enum TDB_ERROR ecode;
1081
1082         if (tdb->flags & TDB_VERSION1) {
1083                 if (tdb1_transaction_commit(tdb) == -1)
1084                         return tdb->last_error;
1085                 return TDB_SUCCESS;
1086         }
1087
1088         if (tdb->tdb2.transaction == NULL) {
1089                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL,
1090                                                     TDB_LOG_USE_ERROR,
1091                                                     "tdb_transaction_commit:"
1092                                                     " no transaction");
1093         }
1094
1095         tdb_trace(tdb, "tdb_transaction_commit");
1096
1097         if (tdb->tdb2.transaction->nesting != 0) {
1098                 tdb->tdb2.transaction->nesting--;
1099                 return tdb->last_error = TDB_SUCCESS;
1100         }
1101
1102         /* check for a null transaction */
1103         if (tdb->tdb2.transaction->blocks == NULL) {
1104                 _tdb_transaction_cancel(tdb);
1105                 return tdb->last_error = TDB_SUCCESS;
1106         }
1107
1108         if (!tdb->tdb2.transaction->prepared) {
1109                 ecode = _tdb_transaction_prepare_commit(tdb);
1110                 if (ecode != TDB_SUCCESS) {
1111                         _tdb_transaction_cancel(tdb);
1112                         return tdb->last_error = ecode;
1113                 }
1114         }
1115
1116         methods = tdb->tdb2.transaction->io_methods;
1117
1118         /* perform all the writes */
1119         for (i=0;i<tdb->tdb2.transaction->num_blocks;i++) {
1120                 tdb_off_t offset;
1121                 tdb_len_t length;
1122
1123                 if (tdb->tdb2.transaction->blocks[i] == NULL) {
1124                         continue;
1125                 }
1126
1127                 offset = i * PAGESIZE;
1128                 length = PAGESIZE;
1129                 if (i == tdb->tdb2.transaction->num_blocks-1) {
1130                         length = tdb->tdb2.transaction->last_block_size;
1131                 }
1132
1133                 ecode = methods->twrite(tdb, offset,
1134                                         tdb->tdb2.transaction->blocks[i], length);
1135                 if (ecode != TDB_SUCCESS) {
1136                         /* we've overwritten part of the data and
1137                            possibly expanded the file, so we need to
1138                            run the crash recovery code */
1139                         tdb->tdb2.io = methods;
1140                         tdb_transaction_recover(tdb);
1141
1142                         _tdb_transaction_cancel(tdb);
1143
1144                         return tdb->last_error = ecode;
1145                 }
1146                 SAFE_FREE(tdb->tdb2.transaction->blocks[i]);
1147         }
1148
1149         SAFE_FREE(tdb->tdb2.transaction->blocks);
1150         tdb->tdb2.transaction->num_blocks = 0;
1151
1152         /* ensure the new data is on disk */
1153         ecode = transaction_sync(tdb, 0, tdb->file->map_size);
1154         if (ecode != TDB_SUCCESS) {
1155                 return tdb->last_error = ecode;
1156         }
1157
1158         /*
1159           TODO: maybe write to some dummy hdr field, or write to magic
1160           offset without mmap, before the last sync, instead of the
1161           utime() call
1162         */
1163
1164         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1165            don't change the mtime of the file, this means the file may
1166            not be backed up (as tdb rounding to block sizes means that
1167            file size changes are quite rare too). The following forces
1168            mtime changes when a transaction completes */
1169 #if HAVE_UTIME
1170         utime(tdb->name, NULL);
1171 #endif
1172
1173         /* use a transaction cancel to free memory and remove the
1174            transaction locks: it "restores" map_size, too. */
1175         tdb->tdb2.transaction->old_map_size = tdb->file->map_size;
1176         _tdb_transaction_cancel(tdb);
1177
1178         return tdb->last_error = TDB_SUCCESS;
1179 }
1180
1181
1182 /*
1183   recover from an aborted transaction. Must be called with exclusive
1184   database write access already established (including the open
1185   lock to prevent new processes attaching)
1186 */
1187 enum TDB_ERROR tdb_transaction_recover(struct tdb_context *tdb)
1188 {
1189         tdb_off_t recovery_head, recovery_eof;
1190         unsigned char *data, *p;
1191         struct tdb_recovery_record rec;
1192         enum TDB_ERROR ecode;
1193
1194         /* find the recovery area */
1195         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
1196         if (TDB_OFF_IS_ERR(recovery_head)) {
1197                 return tdb_logerr(tdb, recovery_head, TDB_LOG_ERROR,
1198                                   "tdb_transaction_recover:"
1199                                   " failed to read recovery head");
1200         }
1201
1202         if (recovery_head == 0) {
1203                 /* we have never allocated a recovery record */
1204                 return TDB_SUCCESS;
1205         }
1206
1207         /* read the recovery record */
1208         ecode = tdb_read_convert(tdb, recovery_head, &rec, sizeof(rec));
1209         if (ecode != TDB_SUCCESS) {
1210                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1211                                   "tdb_transaction_recover:"
1212                                   " failed to read recovery record");
1213         }
1214
1215         if (rec.magic != TDB_RECOVERY_MAGIC) {
1216                 /* there is no valid recovery data */
1217                 return TDB_SUCCESS;
1218         }
1219
1220         if (tdb->flags & TDB_RDONLY) {
1221                 return tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
1222                                   "tdb_transaction_recover:"
1223                                   " attempt to recover read only database");
1224         }
1225
1226         recovery_eof = rec.eof;
1227
1228         data = (unsigned char *)malloc(rec.len);
1229         if (data == NULL) {
1230                 return tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
1231                                   "tdb_transaction_recover:"
1232                                   " failed to allocate recovery data");
1233         }
1234
1235         /* read the full recovery data */
1236         ecode = tdb->tdb2.io->tread(tdb, recovery_head + sizeof(rec), data,
1237                                     rec.len);
1238         if (ecode != TDB_SUCCESS) {
1239                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1240                                   "tdb_transaction_recover:"
1241                                   " failed to read recovery data");
1242         }
1243
1244         /* recover the file data */
1245         p = data;
1246         while (p+sizeof(tdb_off_t)+sizeof(tdb_len_t) < data + rec.len) {
1247                 tdb_off_t ofs;
1248                 tdb_len_t len;
1249                 tdb_convert(tdb, p, sizeof(ofs) + sizeof(len));
1250                 memcpy(&ofs, p, sizeof(ofs));
1251                 memcpy(&len, p + sizeof(ofs), sizeof(len));
1252                 p += sizeof(ofs) + sizeof(len);
1253
1254                 ecode = tdb->tdb2.io->twrite(tdb, ofs, p, len);
1255                 if (ecode != TDB_SUCCESS) {
1256                         free(data);
1257                         return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1258                                           "tdb_transaction_recover:"
1259                                           " failed to recover %zu bytes"
1260                                           " at offset %zu",
1261                                           (size_t)len, (size_t)ofs);
1262                 }
1263                 p += len;
1264         }
1265
1266         free(data);
1267
1268         ecode = transaction_sync(tdb, 0, tdb->file->map_size);
1269         if (ecode != TDB_SUCCESS) {
1270                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1271                                   "tdb_transaction_recover:"
1272                                   " failed to sync recovery");
1273         }
1274
1275         /* if the recovery area is after the recovered eof then remove it */
1276         if (recovery_eof <= recovery_head) {
1277                 ecode = tdb_write_off(tdb, offsetof(struct tdb_header,
1278                                                     recovery),
1279                                       0);
1280                 if (ecode != TDB_SUCCESS) {
1281                         return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1282                                           "tdb_transaction_recover:"
1283                                           " failed to remove recovery head");
1284                 }
1285         }
1286
1287         /* remove the recovery magic */
1288         ecode = tdb_write_off(tdb,
1289                               recovery_head
1290                               + offsetof(struct tdb_recovery_record, magic),
1291                               TDB_RECOVERY_INVALID_MAGIC);
1292         if (ecode != TDB_SUCCESS) {
1293                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1294                                   "tdb_transaction_recover:"
1295                                   " failed to remove recovery magic");
1296         }
1297
1298         ecode = transaction_sync(tdb, 0, recovery_eof);
1299         if (ecode != TDB_SUCCESS) {
1300                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1301                                   "tdb_transaction_recover:"
1302                                   " failed to sync2 recovery");
1303         }
1304
1305         tdb_logerr(tdb, TDB_SUCCESS, TDB_LOG_WARNING,
1306                    "tdb_transaction_recover: recovered %zu byte database",
1307                    (size_t)recovery_eof);
1308
1309         /* all done */
1310         return TDB_SUCCESS;
1311 }
1312
1313 tdb_bool_err tdb_needs_recovery(struct tdb_context *tdb)
1314 {
1315         tdb_off_t recovery_head;
1316         struct tdb_recovery_record rec;
1317         enum TDB_ERROR ecode;
1318
1319         /* find the recovery area */
1320         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
1321         if (TDB_OFF_IS_ERR(recovery_head)) {
1322                 return recovery_head;
1323         }
1324
1325         if (recovery_head == 0) {
1326                 /* we have never allocated a recovery record */
1327                 return false;
1328         }
1329
1330         /* read the recovery record */
1331         ecode = tdb_read_convert(tdb, recovery_head, &rec, sizeof(rec));
1332         if (ecode != TDB_SUCCESS) {
1333                 return ecode;
1334         }
1335
1336         return (rec.magic == TDB_RECOVERY_MAGIC);
1337 }