]> git.ozlabs.org Git - ccan/blob - ccan/tdb2/transaction.c
failtest: add --trace to replace --tracepath
[ccan] / ccan / tdb2 / transaction.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7    Copyright (C) Rusty Russell                2010
8
9      ** NOTE! The following LGPL license applies to the tdb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 #include "private.h"
28 #define SAFE_FREE(x) do { if ((x) != NULL) {free((void *)x); (x)=NULL;} } while(0)
29
30 /*
31   transaction design:
32
33   - only allow a single transaction at a time per database. This makes
34     using the transaction API simpler, as otherwise the caller would
35     have to cope with temporary failures in transactions that conflict
36     with other current transactions
37
38   - keep the transaction recovery information in the same file as the
39     database, using a special 'transaction recovery' record pointed at
40     by the header. This removes the need for extra journal files as
41     used by some other databases
42
43   - dynamically allocated the transaction recover record, re-using it
44     for subsequent transactions. If a larger record is needed then
45     tdb_free() the old record to place it on the normal tdb freelist
46     before allocating the new record
47
48   - during transactions, keep a linked list of writes all that have
49     been performed by intercepting all tdb_write() calls. The hooked
50     transaction versions of tdb_read() and tdb_write() check this
51     linked list and try to use the elements of the list in preference
52     to the real database.
53
54   - don't allow any locks to be held when a transaction starts,
55     otherwise we can end up with deadlock (plus lack of lock nesting
56     in POSIX locks would mean the lock is lost)
57
58   - if the caller gains a lock during the transaction but doesn't
59     release it then fail the commit
60
61   - allow for nested calls to tdb_transaction_start(), re-using the
62     existing transaction record. If the inner transaction is canceled
63     then a subsequent commit will fail
64
65   - keep a mirrored copy of the tdb hash chain heads to allow for the
66     fast hash heads scan on traverse, updating the mirrored copy in
67     the transaction version of tdb_write
68
69   - allow callers to mix transaction and non-transaction use of tdb,
70     although once a transaction is started then an exclusive lock is
71     gained until the transaction is committed or canceled
72
73   - the commit stategy involves first saving away all modified data
74     into a linearised buffer in the transaction recovery area, then
75     marking the transaction recovery area with a magic value to
76     indicate a valid recovery record. In total 4 fsync/msync calls are
77     needed per commit to prevent race conditions. It might be possible
78     to reduce this to 3 or even 2 with some more work.
79
80   - check for a valid recovery record on open of the tdb, while the
81     open lock is held. Automatically recover from the transaction
82     recovery area if needed, then continue with the open as
83     usual. This allows for smooth crash recovery with no administrator
84     intervention.
85
86   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
87     still available, but no transaction recovery area is used and no
88     fsync/msync calls are made.
89 */
90
91 /*
92   hold the context of any current transaction
93 */
94 struct tdb_transaction {
95         /* the original io methods - used to do IOs to the real db */
96         const struct tdb_methods *io_methods;
97
98         /* the list of transaction blocks. When a block is first
99            written to, it gets created in this list */
100         uint8_t **blocks;
101         size_t num_blocks;
102         size_t last_block_size; /* number of valid bytes in the last block */
103
104         /* non-zero when an internal transaction error has
105            occurred. All write operations will then fail until the
106            transaction is ended */
107         int transaction_error;
108
109         /* when inside a transaction we need to keep track of any
110            nested tdb_transaction_start() calls, as these are allowed,
111            but don't create a new transaction */
112         unsigned int nesting;
113
114         /* set when a prepare has already occurred */
115         bool prepared;
116         tdb_off_t magic_offset;
117
118         /* old file size before transaction */
119         tdb_len_t old_map_size;
120 };
121
122 /* This doesn't really need to be pagesize, but we use it for similar reasons. */
123 #define PAGESIZE 65536
124
125 /*
126   read while in a transaction. We need to check first if the data is in our list
127   of transaction elements, then if not do a real read
128 */
129 static enum TDB_ERROR transaction_read(struct tdb_context *tdb, tdb_off_t off,
130                                        void *buf, tdb_len_t len)
131 {
132         size_t blk;
133         enum TDB_ERROR ecode;
134
135         /* break it down into block sized ops */
136         while (len + (off % PAGESIZE) > PAGESIZE) {
137                 tdb_len_t len2 = PAGESIZE - (off % PAGESIZE);
138                 ecode = transaction_read(tdb, off, buf, len2);
139                 if (ecode != TDB_SUCCESS) {
140                         return ecode;
141                 }
142                 len -= len2;
143                 off += len2;
144                 buf = (void *)(len2 + (char *)buf);
145         }
146
147         if (len == 0) {
148                 return TDB_SUCCESS;
149         }
150
151         blk = off / PAGESIZE;
152
153         /* see if we have it in the block list */
154         if (tdb->tdb2.transaction->num_blocks <= blk ||
155             tdb->tdb2.transaction->blocks[blk] == NULL) {
156                 /* nope, do a real read */
157                 ecode = tdb->tdb2.transaction->io_methods->tread(tdb, off, buf, len);
158                 if (ecode != TDB_SUCCESS) {
159                         goto fail;
160                 }
161                 return 0;
162         }
163
164         /* it is in the block list. Now check for the last block */
165         if (blk == tdb->tdb2.transaction->num_blocks-1) {
166                 if (len > tdb->tdb2.transaction->last_block_size) {
167                         ecode = TDB_ERR_IO;
168                         goto fail;
169                 }
170         }
171
172         /* now copy it out of this block */
173         memcpy(buf, tdb->tdb2.transaction->blocks[blk] + (off % PAGESIZE), len);
174         return TDB_SUCCESS;
175
176 fail:
177         tdb->tdb2.transaction->transaction_error = 1;
178         return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
179                           "transaction_read: failed at off=%zu len=%zu",
180                           (size_t)off, (size_t)len);
181 }
182
183
184 /*
185   write while in a transaction
186 */
187 static enum TDB_ERROR transaction_write(struct tdb_context *tdb, tdb_off_t off,
188                                         const void *buf, tdb_len_t len)
189 {
190         size_t blk;
191         enum TDB_ERROR ecode;
192
193         /* Only a commit is allowed on a prepared transaction */
194         if (tdb->tdb2.transaction->prepared) {
195                 ecode = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_ERROR,
196                                    "transaction_write: transaction already"
197                                    " prepared, write not allowed");
198                 goto fail;
199         }
200
201         /* break it up into block sized chunks */
202         while (len + (off % PAGESIZE) > PAGESIZE) {
203                 tdb_len_t len2 = PAGESIZE - (off % PAGESIZE);
204                 ecode = transaction_write(tdb, off, buf, len2);
205                 if (ecode != TDB_SUCCESS) {
206                         return ecode;
207                 }
208                 len -= len2;
209                 off += len2;
210                 if (buf != NULL) {
211                         buf = (const void *)(len2 + (const char *)buf);
212                 }
213         }
214
215         if (len == 0) {
216                 return TDB_SUCCESS;
217         }
218
219         blk = off / PAGESIZE;
220         off = off % PAGESIZE;
221
222         if (tdb->tdb2.transaction->num_blocks <= blk) {
223                 uint8_t **new_blocks;
224                 /* expand the blocks array */
225                 if (tdb->tdb2.transaction->blocks == NULL) {
226                         new_blocks = (uint8_t **)malloc(
227                                 (blk+1)*sizeof(uint8_t *));
228                 } else {
229                         new_blocks = (uint8_t **)realloc(
230                                 tdb->tdb2.transaction->blocks,
231                                 (blk+1)*sizeof(uint8_t *));
232                 }
233                 if (new_blocks == NULL) {
234                         ecode = tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
235                                            "transaction_write:"
236                                            " failed to allocate");
237                         goto fail;
238                 }
239                 memset(&new_blocks[tdb->tdb2.transaction->num_blocks], 0,
240                        (1+(blk - tdb->tdb2.transaction->num_blocks))*sizeof(uint8_t *));
241                 tdb->tdb2.transaction->blocks = new_blocks;
242                 tdb->tdb2.transaction->num_blocks = blk+1;
243                 tdb->tdb2.transaction->last_block_size = 0;
244         }
245
246         /* allocate and fill a block? */
247         if (tdb->tdb2.transaction->blocks[blk] == NULL) {
248                 tdb->tdb2.transaction->blocks[blk] = (uint8_t *)calloc(PAGESIZE, 1);
249                 if (tdb->tdb2.transaction->blocks[blk] == NULL) {
250                         ecode = tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
251                                            "transaction_write:"
252                                            " failed to allocate");
253                         goto fail;
254                 }
255                 if (tdb->tdb2.transaction->old_map_size > blk * PAGESIZE) {
256                         tdb_len_t len2 = PAGESIZE;
257                         if (len2 + (blk * PAGESIZE) > tdb->tdb2.transaction->old_map_size) {
258                                 len2 = tdb->tdb2.transaction->old_map_size - (blk * PAGESIZE);
259                         }
260                         ecode = tdb->tdb2.transaction->io_methods->tread(tdb,
261                                         blk * PAGESIZE,
262                                         tdb->tdb2.transaction->blocks[blk],
263                                         len2);
264                         if (ecode != TDB_SUCCESS) {
265                                 ecode = tdb_logerr(tdb, ecode,
266                                                    TDB_LOG_ERROR,
267                                                    "transaction_write:"
268                                                    " failed to"
269                                                    " read old block: %s",
270                                                    strerror(errno));
271                                 SAFE_FREE(tdb->tdb2.transaction->blocks[blk]);
272                                 goto fail;
273                         }
274                         if (blk == tdb->tdb2.transaction->num_blocks-1) {
275                                 tdb->tdb2.transaction->last_block_size = len2;
276                         }
277                 }
278         }
279
280         /* overwrite part of an existing block */
281         if (buf == NULL) {
282                 memset(tdb->tdb2.transaction->blocks[blk] + off, 0, len);
283         } else {
284                 memcpy(tdb->tdb2.transaction->blocks[blk] + off, buf, len);
285         }
286         if (blk == tdb->tdb2.transaction->num_blocks-1) {
287                 if (len + off > tdb->tdb2.transaction->last_block_size) {
288                         tdb->tdb2.transaction->last_block_size = len + off;
289                 }
290         }
291
292         return TDB_SUCCESS;
293
294 fail:
295         tdb->tdb2.transaction->transaction_error = 1;
296         return ecode;
297 }
298
299
300 /*
301   write while in a transaction - this variant never expands the transaction blocks, it only
302   updates existing blocks. This means it cannot change the recovery size
303 */
304 static void transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
305                                        const void *buf, tdb_len_t len)
306 {
307         size_t blk;
308
309         /* break it up into block sized chunks */
310         while (len + (off % PAGESIZE) > PAGESIZE) {
311                 tdb_len_t len2 = PAGESIZE - (off % PAGESIZE);
312                 transaction_write_existing(tdb, off, buf, len2);
313                 len -= len2;
314                 off += len2;
315                 if (buf != NULL) {
316                         buf = (const void *)(len2 + (const char *)buf);
317                 }
318         }
319
320         if (len == 0) {
321                 return;
322         }
323
324         blk = off / PAGESIZE;
325         off = off % PAGESIZE;
326
327         if (tdb->tdb2.transaction->num_blocks <= blk ||
328             tdb->tdb2.transaction->blocks[blk] == NULL) {
329                 return;
330         }
331
332         if (blk == tdb->tdb2.transaction->num_blocks-1 &&
333             off + len > tdb->tdb2.transaction->last_block_size) {
334                 if (off >= tdb->tdb2.transaction->last_block_size) {
335                         return;
336                 }
337                 len = tdb->tdb2.transaction->last_block_size - off;
338         }
339
340         /* overwrite part of an existing block */
341         memcpy(tdb->tdb2.transaction->blocks[blk] + off, buf, len);
342 }
343
344
345 /*
346   out of bounds check during a transaction
347 */
348 static enum TDB_ERROR transaction_oob(struct tdb_context *tdb, tdb_off_t len,
349                                       bool probe)
350 {
351         if (len <= tdb->file->map_size || probe) {
352                 return TDB_SUCCESS;
353         }
354
355         tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
356                    "tdb_oob len %lld beyond transaction size %lld",
357                    (long long)len,
358                    (long long)tdb->file->map_size);
359         return TDB_ERR_IO;
360 }
361
362 /*
363   transaction version of tdb_expand().
364 */
365 static enum TDB_ERROR transaction_expand_file(struct tdb_context *tdb,
366                                               tdb_off_t addition)
367 {
368         enum TDB_ERROR ecode;
369
370         /* add a write to the transaction elements, so subsequent
371            reads see the zero data */
372         ecode = transaction_write(tdb, tdb->file->map_size, NULL, addition);
373         if (ecode == TDB_SUCCESS) {
374                 tdb->file->map_size += addition;
375         }
376         return ecode;
377 }
378
379 static void *transaction_direct(struct tdb_context *tdb, tdb_off_t off,
380                                 size_t len, bool write_mode)
381 {
382         size_t blk = off / PAGESIZE, end_blk;
383
384         /* This is wrong for zero-length blocks, but will fail gracefully */
385         end_blk = (off + len - 1) / PAGESIZE;
386
387         /* Can only do direct if in single block and we've already copied. */
388         if (write_mode) {
389                 tdb->stats.transaction_write_direct++;
390                 if (blk != end_blk
391                     || blk >= tdb->tdb2.transaction->num_blocks
392                     || tdb->tdb2.transaction->blocks[blk] == NULL) {
393                         tdb->stats.transaction_write_direct_fail++;
394                         return NULL;
395                 }
396                 return tdb->tdb2.transaction->blocks[blk] + off % PAGESIZE;
397         }
398
399         tdb->stats.transaction_read_direct++;
400         /* Single which we have copied? */
401         if (blk == end_blk
402             && blk < tdb->tdb2.transaction->num_blocks
403             && tdb->tdb2.transaction->blocks[blk])
404                 return tdb->tdb2.transaction->blocks[blk] + off % PAGESIZE;
405
406         /* Otherwise must be all not copied. */
407         while (blk <= end_blk) {
408                 if (blk >= tdb->tdb2.transaction->num_blocks)
409                         break;
410                 if (tdb->tdb2.transaction->blocks[blk]) {
411                         tdb->stats.transaction_read_direct_fail++;
412                         return NULL;
413                 }
414                 blk++;
415         }
416         return tdb->tdb2.transaction->io_methods->direct(tdb, off, len, false);
417 }
418
419 static const struct tdb_methods transaction_methods = {
420         transaction_read,
421         transaction_write,
422         transaction_oob,
423         transaction_expand_file,
424         transaction_direct,
425 };
426
427 /*
428   sync to disk
429 */
430 static enum TDB_ERROR transaction_sync(struct tdb_context *tdb,
431                                        tdb_off_t offset, tdb_len_t length)
432 {
433         if (tdb->flags & TDB_NOSYNC) {
434                 return TDB_SUCCESS;
435         }
436
437         if (fsync(tdb->file->fd) != 0) {
438                 return tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
439                                   "tdb_transaction: fsync failed: %s",
440                                   strerror(errno));
441         }
442 #ifdef MS_SYNC
443         if (tdb->file->map_ptr) {
444                 tdb_off_t moffset = offset & ~(getpagesize()-1);
445                 if (msync(moffset + (char *)tdb->file->map_ptr,
446                           length + (offset - moffset), MS_SYNC) != 0) {
447                         return tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
448                                           "tdb_transaction: msync failed: %s",
449                                           strerror(errno));
450                 }
451         }
452 #endif
453         return TDB_SUCCESS;
454 }
455
456
457 static void _tdb_transaction_cancel(struct tdb_context *tdb)
458 {
459         int i;
460         enum TDB_ERROR ecode;
461
462         if (tdb->tdb2.transaction == NULL) {
463                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
464                            "tdb_transaction_cancel: no transaction");
465                 return;
466         }
467
468         if (tdb->tdb2.transaction->nesting != 0) {
469                 tdb->tdb2.transaction->transaction_error = 1;
470                 tdb->tdb2.transaction->nesting--;
471                 return;
472         }
473
474         tdb->file->map_size = tdb->tdb2.transaction->old_map_size;
475
476         /* free all the transaction blocks */
477         for (i=0;i<tdb->tdb2.transaction->num_blocks;i++) {
478                 if (tdb->tdb2.transaction->blocks[i] != NULL) {
479                         free(tdb->tdb2.transaction->blocks[i]);
480                 }
481         }
482         SAFE_FREE(tdb->tdb2.transaction->blocks);
483
484         if (tdb->tdb2.transaction->magic_offset) {
485                 const struct tdb_methods *methods = tdb->tdb2.transaction->io_methods;
486                 uint64_t invalid = TDB_RECOVERY_INVALID_MAGIC;
487
488                 /* remove the recovery marker */
489                 ecode = methods->twrite(tdb, tdb->tdb2.transaction->magic_offset,
490                                         &invalid, sizeof(invalid));
491                 if (ecode == TDB_SUCCESS)
492                         ecode = transaction_sync(tdb,
493                                                  tdb->tdb2.transaction->magic_offset,
494                                                  sizeof(invalid));
495                 if (ecode != TDB_SUCCESS) {
496                         tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
497                                    "tdb_transaction_cancel: failed to remove"
498                                    " recovery magic");
499                 }
500         }
501
502         if (tdb->file->allrecord_lock.count)
503                 tdb_allrecord_unlock(tdb, tdb->file->allrecord_lock.ltype);
504
505         /* restore the normal io methods */
506         tdb->tdb2.io = tdb->tdb2.transaction->io_methods;
507
508         tdb_transaction_unlock(tdb, F_WRLCK);
509
510         if (tdb_has_open_lock(tdb))
511                 tdb_unlock_open(tdb, F_WRLCK);
512
513         SAFE_FREE(tdb->tdb2.transaction);
514 }
515
516 /*
517   start a tdb transaction. No token is returned, as only a single
518   transaction is allowed to be pending per tdb_context
519 */
520 enum TDB_ERROR tdb_transaction_start(struct tdb_context *tdb)
521 {
522         enum TDB_ERROR ecode;
523
524         if (tdb->flags & TDB_VERSION1) {
525                 if (tdb1_transaction_start(tdb) == -1)
526                         return tdb->last_error;
527                 return TDB_SUCCESS;
528         }
529
530         tdb->stats.transactions++;
531         /* some sanity checks */
532         if (tdb->flags & TDB_INTERNAL) {
533                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL,
534                                                     TDB_LOG_USE_ERROR,
535                                                     "tdb_transaction_start:"
536                                                     " cannot start a"
537                                                     " transaction on an"
538                                                     " internal tdb");
539         }
540
541         if (tdb->flags & TDB_RDONLY) {
542                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_RDONLY,
543                                                     TDB_LOG_USE_ERROR,
544                                                     "tdb_transaction_start:"
545                                                     " cannot start a"
546                                                     " transaction on a "
547                                                     " read-only tdb");
548         }
549
550         /* cope with nested tdb_transaction_start() calls */
551         if (tdb->tdb2.transaction != NULL) {
552                 if (!(tdb->flags & TDB_ALLOW_NESTING)) {
553                         return tdb->last_error
554                                 = tdb_logerr(tdb, TDB_ERR_IO,
555                                              TDB_LOG_USE_ERROR,
556                                              "tdb_transaction_start:"
557                                              " already inside transaction");
558                 }
559                 tdb->tdb2.transaction->nesting++;
560                 tdb->stats.transaction_nest++;
561                 return 0;
562         }
563
564         if (tdb_has_hash_locks(tdb)) {
565                 /* the caller must not have any locks when starting a
566                    transaction as otherwise we'll be screwed by lack
567                    of nested locks in POSIX */
568                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_LOCK,
569                                                     TDB_LOG_USE_ERROR,
570                                                     "tdb_transaction_start:"
571                                                     " cannot start a"
572                                                     " transaction with locks"
573                                                     " held");
574         }
575
576         tdb->tdb2.transaction = (struct tdb_transaction *)
577                 calloc(sizeof(struct tdb_transaction), 1);
578         if (tdb->tdb2.transaction == NULL) {
579                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_OOM,
580                                                     TDB_LOG_ERROR,
581                                                     "tdb_transaction_start:"
582                                                     " cannot allocate");
583         }
584
585         /* get the transaction write lock. This is a blocking lock. As
586            discussed with Volker, there are a number of ways we could
587            make this async, which we will probably do in the future */
588         ecode = tdb_transaction_lock(tdb, F_WRLCK);
589         if (ecode != TDB_SUCCESS) {
590                 SAFE_FREE(tdb->tdb2.transaction->blocks);
591                 SAFE_FREE(tdb->tdb2.transaction);
592                 return tdb->last_error = ecode;
593         }
594
595         /* get a read lock over entire file. This is upgraded to a write
596            lock during the commit */
597         ecode = tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, true);
598         if (ecode != TDB_SUCCESS) {
599                 goto fail_allrecord_lock;
600         }
601
602         /* make sure we know about any file expansions already done by
603            anyone else */
604         tdb->tdb2.io->oob(tdb, tdb->file->map_size + 1, true);
605         tdb->tdb2.transaction->old_map_size = tdb->file->map_size;
606
607         /* finally hook the io methods, replacing them with
608            transaction specific methods */
609         tdb->tdb2.transaction->io_methods = tdb->tdb2.io;
610         tdb->tdb2.io = &transaction_methods;
611         return tdb->last_error = TDB_SUCCESS;
612
613 fail_allrecord_lock:
614         tdb_transaction_unlock(tdb, F_WRLCK);
615         SAFE_FREE(tdb->tdb2.transaction->blocks);
616         SAFE_FREE(tdb->tdb2.transaction);
617         return tdb->last_error = ecode;
618 }
619
620
621 /*
622   cancel the current transaction
623 */
624 void tdb_transaction_cancel(struct tdb_context *tdb)
625 {
626         if (tdb->flags & TDB_VERSION1) {
627                 tdb1_transaction_cancel(tdb);
628                 return;
629         }
630         tdb->stats.transaction_cancel++;
631         _tdb_transaction_cancel(tdb);
632 }
633
634 /*
635   work out how much space the linearised recovery data will consume (worst case)
636 */
637 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
638 {
639         tdb_len_t recovery_size = 0;
640         int i;
641
642         recovery_size = 0;
643         for (i=0;i<tdb->tdb2.transaction->num_blocks;i++) {
644                 if (i * PAGESIZE >= tdb->tdb2.transaction->old_map_size) {
645                         break;
646                 }
647                 if (tdb->tdb2.transaction->blocks[i] == NULL) {
648                         continue;
649                 }
650                 recovery_size += 2*sizeof(tdb_off_t);
651                 if (i == tdb->tdb2.transaction->num_blocks-1) {
652                         recovery_size += tdb->tdb2.transaction->last_block_size;
653                 } else {
654                         recovery_size += PAGESIZE;
655                 }
656         }
657
658         return recovery_size;
659 }
660
661 static enum TDB_ERROR tdb_recovery_area(struct tdb_context *tdb,
662                                         const struct tdb_methods *methods,
663                                         tdb_off_t *recovery_offset,
664                                         struct tdb_recovery_record *rec)
665 {
666         enum TDB_ERROR ecode;
667
668         *recovery_offset = tdb_read_off(tdb,
669                                         offsetof(struct tdb_header, recovery));
670         if (TDB_OFF_IS_ERR(*recovery_offset)) {
671                 return TDB_OFF_TO_ERR(*recovery_offset);
672         }
673
674         if (*recovery_offset == 0) {
675                 rec->max_len = 0;
676                 return TDB_SUCCESS;
677         }
678
679         ecode = methods->tread(tdb, *recovery_offset, rec, sizeof(*rec));
680         if (ecode != TDB_SUCCESS)
681                 return ecode;
682
683         tdb_convert(tdb, rec, sizeof(*rec));
684         /* ignore invalid recovery regions: can happen in crash */
685         if (rec->magic != TDB_RECOVERY_MAGIC &&
686             rec->magic != TDB_RECOVERY_INVALID_MAGIC) {
687                 *recovery_offset = 0;
688                 rec->max_len = 0;
689         }
690         return TDB_SUCCESS;
691 }
692
693 static unsigned int same(const unsigned char *new,
694                          const unsigned char *old,
695                          unsigned int length)
696 {
697         unsigned int i;
698
699         for (i = 0; i < length; i++) {
700                 if (new[i] != old[i])
701                         break;
702         }
703         return i;
704 }
705
706 static unsigned int different(const unsigned char *new,
707                               const unsigned char *old,
708                               unsigned int length,
709                               unsigned int min_same,
710                               unsigned int *samelen)
711 {
712         unsigned int i;
713
714         *samelen = 0;
715         for (i = 0; i < length; i++) {
716                 if (new[i] == old[i]) {
717                         (*samelen)++;
718                 } else {
719                         if (*samelen >= min_same) {
720                                 return i - *samelen;
721                         }
722                         *samelen = 0;
723                 }
724         }
725
726         if (*samelen < min_same)
727                 *samelen = 0;
728         return length - *samelen;
729 }
730
731 /* Allocates recovery blob, without tdb_recovery_record at head set up. */
732 static struct tdb_recovery_record *alloc_recovery(struct tdb_context *tdb,
733                                                   tdb_len_t *len)
734 {
735         struct tdb_recovery_record *rec;
736         size_t i;
737         enum TDB_ERROR ecode;
738         unsigned char *p;
739         const struct tdb_methods *old_methods = tdb->tdb2.io;
740
741         rec = malloc(sizeof(*rec) + tdb_recovery_size(tdb));
742         if (!rec) {
743                 tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
744                            "transaction_setup_recovery:"
745                            " cannot allocate");
746                 return TDB_ERR_PTR(TDB_ERR_OOM);
747         }
748
749         /* We temporarily revert to the old I/O methods, so we can use
750          * tdb_access_read */
751         tdb->tdb2.io = tdb->tdb2.transaction->io_methods;
752
753         /* build the recovery data into a single blob to allow us to do a single
754            large write, which should be more efficient */
755         p = (unsigned char *)(rec + 1);
756         for (i=0;i<tdb->tdb2.transaction->num_blocks;i++) {
757                 tdb_off_t offset;
758                 tdb_len_t length;
759                 unsigned int off;
760                 const unsigned char *buffer;
761
762                 if (tdb->tdb2.transaction->blocks[i] == NULL) {
763                         continue;
764                 }
765
766                 offset = i * PAGESIZE;
767                 length = PAGESIZE;
768                 if (i == tdb->tdb2.transaction->num_blocks-1) {
769                         length = tdb->tdb2.transaction->last_block_size;
770                 }
771
772                 if (offset >= tdb->tdb2.transaction->old_map_size) {
773                         continue;
774                 }
775
776                 if (offset + length > tdb->file->map_size) {
777                         ecode = tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
778                                            "tdb_transaction_setup_recovery:"
779                                            " transaction data over new region"
780                                            " boundary");
781                         goto fail;
782                 }
783                 if (offset + length > tdb->tdb2.transaction->old_map_size) {
784                         /* Short read at EOF. */
785                         length = tdb->tdb2.transaction->old_map_size - offset;
786                 }
787                 buffer = tdb_access_read(tdb, offset, length, false);
788                 if (TDB_PTR_IS_ERR(buffer)) {
789                         ecode = TDB_PTR_ERR(buffer);
790                         goto fail;
791                 }
792
793                 /* Skip over anything the same at the start. */
794                 off = same(tdb->tdb2.transaction->blocks[i], buffer, length);
795                 offset += off;
796
797                 while (off < length) {
798                         tdb_len_t len;
799                         unsigned int samelen;
800
801                         len = different(tdb->tdb2.transaction->blocks[i] + off,
802                                         buffer + off, length - off,
803                                         sizeof(offset) + sizeof(len) + 1,
804                                         &samelen);
805
806                         memcpy(p, &offset, sizeof(offset));
807                         memcpy(p + sizeof(offset), &len, sizeof(len));
808                         tdb_convert(tdb, p, sizeof(offset) + sizeof(len));
809                         p += sizeof(offset) + sizeof(len);
810                         memcpy(p, buffer + off, len);
811                         p += len;
812                         off += len + samelen;
813                         offset += len + samelen;
814                 }
815                 tdb_access_release(tdb, buffer);
816         }
817
818         *len = p - (unsigned char *)(rec + 1);
819         tdb->tdb2.io = old_methods;
820         return rec;
821
822 fail:
823         free(rec);
824         tdb->tdb2.io = old_methods;
825         return TDB_ERR_PTR(ecode);
826 }
827
828 static tdb_off_t create_recovery_area(struct tdb_context *tdb,
829                                       tdb_len_t rec_length,
830                                       struct tdb_recovery_record *rec)
831 {
832         tdb_off_t off, recovery_off;
833         tdb_len_t addition;
834         enum TDB_ERROR ecode;
835         const struct tdb_methods *methods = tdb->tdb2.transaction->io_methods;
836
837         /* round up to a multiple of page size. Overallocate, since each
838          * such allocation forces us to expand the file. */
839         rec->max_len
840                 = (((sizeof(*rec) + rec_length + rec_length / 2)
841                     + PAGESIZE-1) & ~(PAGESIZE-1))
842                 - sizeof(*rec);
843         off = tdb->file->map_size;
844
845         /* Restore ->map_size before calling underlying expand_file.
846            Also so that we don't try to expand the file again in the
847            transaction commit, which would destroy the recovery
848            area */
849         addition = (tdb->file->map_size - tdb->tdb2.transaction->old_map_size) +
850                 sizeof(*rec) + rec->max_len;
851         tdb->file->map_size = tdb->tdb2.transaction->old_map_size;
852         tdb->stats.transaction_expand_file++;
853         ecode = methods->expand_file(tdb, addition);
854         if (ecode != TDB_SUCCESS) {
855                 tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
856                            "tdb_recovery_allocate:"
857                            " failed to create recovery area");
858                 return TDB_ERR_TO_OFF(ecode);
859         }
860
861         /* we have to reset the old map size so that we don't try to
862            expand the file again in the transaction commit, which
863            would destroy the recovery area */
864         tdb->tdb2.transaction->old_map_size = tdb->file->map_size;
865
866         /* write the recovery header offset and sync - we can sync without a race here
867            as the magic ptr in the recovery record has not been set */
868         recovery_off = off;
869         tdb_convert(tdb, &recovery_off, sizeof(recovery_off));
870         ecode = methods->twrite(tdb, offsetof(struct tdb_header, recovery),
871                                 &recovery_off, sizeof(tdb_off_t));
872         if (ecode != TDB_SUCCESS) {
873                 tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
874                            "tdb_recovery_allocate:"
875                            " failed to write recovery head");
876                 return TDB_ERR_TO_OFF(ecode);
877         }
878         transaction_write_existing(tdb, offsetof(struct tdb_header, recovery),
879                                    &recovery_off,
880                                    sizeof(tdb_off_t));
881         return off;
882 }
883
884 /*
885   setup the recovery data that will be used on a crash during commit
886 */
887 static enum TDB_ERROR transaction_setup_recovery(struct tdb_context *tdb)
888 {
889         tdb_len_t recovery_size = 0;
890         tdb_off_t recovery_off = 0;
891         tdb_off_t old_map_size = tdb->tdb2.transaction->old_map_size;
892         struct tdb_recovery_record *recovery;
893         const struct tdb_methods *methods = tdb->tdb2.transaction->io_methods;
894         uint64_t magic;
895         enum TDB_ERROR ecode;
896
897         recovery = alloc_recovery(tdb, &recovery_size);
898         if (TDB_PTR_IS_ERR(recovery))
899                 return TDB_PTR_ERR(recovery);
900
901         ecode = tdb_recovery_area(tdb, methods, &recovery_off, recovery);
902         if (ecode) {
903                 free(recovery);
904                 return ecode;
905         }
906
907         if (recovery->max_len < recovery_size) {
908                 /* Not large enough. Free up old recovery area. */
909                 if (recovery_off) {
910                         tdb->stats.frees++;
911                         ecode = add_free_record(tdb, recovery_off,
912                                                 sizeof(*recovery)
913                                                 + recovery->max_len,
914                                                 TDB_LOCK_WAIT, true);
915                         free(recovery);
916                         if (ecode != TDB_SUCCESS) {
917                                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
918                                                   "tdb_recovery_allocate:"
919                                                   " failed to free previous"
920                                                   " recovery area");
921                         }
922
923                         /* Refresh recovery after add_free_record above. */
924                         recovery = alloc_recovery(tdb, &recovery_size);
925                         if (TDB_PTR_IS_ERR(recovery))
926                                 return TDB_PTR_ERR(recovery);
927                 }
928
929                 recovery_off = create_recovery_area(tdb, recovery_size,
930                                                     recovery);
931                 if (TDB_OFF_IS_ERR(recovery_off)) {
932                         free(recovery);
933                         return TDB_OFF_TO_ERR(recovery_off);
934                 }
935         }
936
937         /* Now we know size, convert rec header. */
938         recovery->magic = TDB_RECOVERY_INVALID_MAGIC;
939         recovery->len = recovery_size;
940         recovery->eof = old_map_size;
941         tdb_convert(tdb, recovery, sizeof(*recovery));
942
943         /* write the recovery data to the recovery area */
944         ecode = methods->twrite(tdb, recovery_off, recovery, recovery_size);
945         if (ecode != TDB_SUCCESS) {
946                 free(recovery);
947                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
948                                   "tdb_transaction_setup_recovery:"
949                                   " failed to write recovery data");
950         }
951         transaction_write_existing(tdb, recovery_off, recovery, recovery_size);
952
953         free(recovery);
954
955         /* as we don't have ordered writes, we have to sync the recovery
956            data before we update the magic to indicate that the recovery
957            data is present */
958         ecode = transaction_sync(tdb, recovery_off, recovery_size);
959         if (ecode != TDB_SUCCESS)
960                 return ecode;
961
962         magic = TDB_RECOVERY_MAGIC;
963         tdb_convert(tdb, &magic, sizeof(magic));
964
965         tdb->tdb2.transaction->magic_offset
966                 = recovery_off + offsetof(struct tdb_recovery_record, magic);
967
968         ecode = methods->twrite(tdb, tdb->tdb2.transaction->magic_offset,
969                                 &magic, sizeof(magic));
970         if (ecode != TDB_SUCCESS) {
971                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
972                                   "tdb_transaction_setup_recovery:"
973                                   " failed to write recovery magic");
974         }
975         transaction_write_existing(tdb, tdb->tdb2.transaction->magic_offset,
976                                    &magic, sizeof(magic));
977
978         /* ensure the recovery magic marker is on disk */
979         return transaction_sync(tdb, tdb->tdb2.transaction->magic_offset,
980                                 sizeof(magic));
981 }
982
983 static enum TDB_ERROR _tdb_transaction_prepare_commit(struct tdb_context *tdb)
984 {
985         const struct tdb_methods *methods;
986         enum TDB_ERROR ecode;
987
988         if (tdb->tdb2.transaction == NULL) {
989                 return tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
990                                   "tdb_transaction_prepare_commit:"
991                                   " no transaction");
992         }
993
994         if (tdb->tdb2.transaction->prepared) {
995                 _tdb_transaction_cancel(tdb);
996                 return tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
997                                   "tdb_transaction_prepare_commit:"
998                                   " transaction already prepared");
999         }
1000
1001         if (tdb->tdb2.transaction->transaction_error) {
1002                 _tdb_transaction_cancel(tdb);
1003                 return tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_ERROR,
1004                                   "tdb_transaction_prepare_commit:"
1005                                   " transaction error pending");
1006         }
1007
1008
1009         if (tdb->tdb2.transaction->nesting != 0) {
1010                 return TDB_SUCCESS;
1011         }
1012
1013         /* check for a null transaction */
1014         if (tdb->tdb2.transaction->blocks == NULL) {
1015                 return TDB_SUCCESS;
1016         }
1017
1018         methods = tdb->tdb2.transaction->io_methods;
1019
1020         /* upgrade the main transaction lock region to a write lock */
1021         ecode = tdb_allrecord_upgrade(tdb, TDB_HASH_LOCK_START);
1022         if (ecode != TDB_SUCCESS) {
1023                 return ecode;
1024         }
1025
1026         /* get the open lock - this prevents new users attaching to the database
1027            during the commit */
1028         ecode = tdb_lock_open(tdb, F_WRLCK, TDB_LOCK_WAIT|TDB_LOCK_NOCHECK);
1029         if (ecode != TDB_SUCCESS) {
1030                 return ecode;
1031         }
1032
1033         /* Since we have whole db locked, we don't need the expansion lock. */
1034         if (!(tdb->flags & TDB_NOSYNC)) {
1035                 /* Sets up tdb->tdb2.transaction->recovery and
1036                  * tdb->tdb2.transaction->magic_offset. */
1037                 ecode = transaction_setup_recovery(tdb);
1038                 if (ecode != TDB_SUCCESS) {
1039                         return ecode;
1040                 }
1041         }
1042
1043         tdb->tdb2.transaction->prepared = true;
1044
1045         /* expand the file to the new size if needed */
1046         if (tdb->file->map_size != tdb->tdb2.transaction->old_map_size) {
1047                 tdb_len_t add;
1048
1049                 add = tdb->file->map_size - tdb->tdb2.transaction->old_map_size;
1050                 /* Restore original map size for tdb_expand_file */
1051                 tdb->file->map_size = tdb->tdb2.transaction->old_map_size;
1052                 ecode = methods->expand_file(tdb, add);
1053                 if (ecode != TDB_SUCCESS) {
1054                         return ecode;
1055                 }
1056         }
1057
1058         /* Keep the open lock until the actual commit */
1059         return TDB_SUCCESS;
1060 }
1061
1062 /*
1063    prepare to commit the current transaction
1064 */
1065 enum TDB_ERROR tdb_transaction_prepare_commit(struct tdb_context *tdb)
1066 {
1067         if (tdb->flags & TDB_VERSION1) {
1068                 if (tdb1_transaction_prepare_commit(tdb) == -1)
1069                         return tdb->last_error;
1070                 return TDB_SUCCESS;
1071         }
1072         return tdb->last_error = _tdb_transaction_prepare_commit(tdb);
1073 }
1074
1075 /*
1076   commit the current transaction
1077 */
1078 enum TDB_ERROR tdb_transaction_commit(struct tdb_context *tdb)
1079 {
1080         const struct tdb_methods *methods;
1081         int i;
1082         enum TDB_ERROR ecode;
1083
1084         if (tdb->flags & TDB_VERSION1) {
1085                 if (tdb1_transaction_commit(tdb) == -1)
1086                         return tdb->last_error;
1087                 return TDB_SUCCESS;
1088         }
1089
1090         if (tdb->tdb2.transaction == NULL) {
1091                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL,
1092                                                     TDB_LOG_USE_ERROR,
1093                                                     "tdb_transaction_commit:"
1094                                                     " no transaction");
1095         }
1096
1097         tdb_trace(tdb, "tdb_transaction_commit");
1098
1099         if (tdb->tdb2.transaction->nesting != 0) {
1100                 tdb->tdb2.transaction->nesting--;
1101                 return tdb->last_error = TDB_SUCCESS;
1102         }
1103
1104         /* check for a null transaction */
1105         if (tdb->tdb2.transaction->blocks == NULL) {
1106                 _tdb_transaction_cancel(tdb);
1107                 return tdb->last_error = TDB_SUCCESS;
1108         }
1109
1110         if (!tdb->tdb2.transaction->prepared) {
1111                 ecode = _tdb_transaction_prepare_commit(tdb);
1112                 if (ecode != TDB_SUCCESS) {
1113                         _tdb_transaction_cancel(tdb);
1114                         return tdb->last_error = ecode;
1115                 }
1116         }
1117
1118         methods = tdb->tdb2.transaction->io_methods;
1119
1120         /* perform all the writes */
1121         for (i=0;i<tdb->tdb2.transaction->num_blocks;i++) {
1122                 tdb_off_t offset;
1123                 tdb_len_t length;
1124
1125                 if (tdb->tdb2.transaction->blocks[i] == NULL) {
1126                         continue;
1127                 }
1128
1129                 offset = i * PAGESIZE;
1130                 length = PAGESIZE;
1131                 if (i == tdb->tdb2.transaction->num_blocks-1) {
1132                         length = tdb->tdb2.transaction->last_block_size;
1133                 }
1134
1135                 ecode = methods->twrite(tdb, offset,
1136                                         tdb->tdb2.transaction->blocks[i], length);
1137                 if (ecode != TDB_SUCCESS) {
1138                         /* we've overwritten part of the data and
1139                            possibly expanded the file, so we need to
1140                            run the crash recovery code */
1141                         tdb->tdb2.io = methods;
1142                         tdb_transaction_recover(tdb);
1143
1144                         _tdb_transaction_cancel(tdb);
1145
1146                         return tdb->last_error = ecode;
1147                 }
1148                 SAFE_FREE(tdb->tdb2.transaction->blocks[i]);
1149         }
1150
1151         SAFE_FREE(tdb->tdb2.transaction->blocks);
1152         tdb->tdb2.transaction->num_blocks = 0;
1153
1154         /* ensure the new data is on disk */
1155         ecode = transaction_sync(tdb, 0, tdb->file->map_size);
1156         if (ecode != TDB_SUCCESS) {
1157                 return tdb->last_error = ecode;
1158         }
1159
1160         /*
1161           TODO: maybe write to some dummy hdr field, or write to magic
1162           offset without mmap, before the last sync, instead of the
1163           utime() call
1164         */
1165
1166         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1167            don't change the mtime of the file, this means the file may
1168            not be backed up (as tdb rounding to block sizes means that
1169            file size changes are quite rare too). The following forces
1170            mtime changes when a transaction completes */
1171 #if HAVE_UTIME
1172         utime(tdb->name, NULL);
1173 #endif
1174
1175         /* use a transaction cancel to free memory and remove the
1176            transaction locks: it "restores" map_size, too. */
1177         tdb->tdb2.transaction->old_map_size = tdb->file->map_size;
1178         _tdb_transaction_cancel(tdb);
1179
1180         return tdb->last_error = TDB_SUCCESS;
1181 }
1182
1183
1184 /*
1185   recover from an aborted transaction. Must be called with exclusive
1186   database write access already established (including the open
1187   lock to prevent new processes attaching)
1188 */
1189 enum TDB_ERROR tdb_transaction_recover(struct tdb_context *tdb)
1190 {
1191         tdb_off_t recovery_head, recovery_eof;
1192         unsigned char *data, *p;
1193         struct tdb_recovery_record rec;
1194         enum TDB_ERROR ecode;
1195
1196         /* find the recovery area */
1197         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
1198         if (TDB_OFF_IS_ERR(recovery_head)) {
1199                 ecode = TDB_OFF_TO_ERR(recovery_head);
1200                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1201                                   "tdb_transaction_recover:"
1202                                   " failed to read recovery head");
1203         }
1204
1205         if (recovery_head == 0) {
1206                 /* we have never allocated a recovery record */
1207                 return TDB_SUCCESS;
1208         }
1209
1210         /* read the recovery record */
1211         ecode = tdb_read_convert(tdb, recovery_head, &rec, sizeof(rec));
1212         if (ecode != TDB_SUCCESS) {
1213                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1214                                   "tdb_transaction_recover:"
1215                                   " failed to read recovery record");
1216         }
1217
1218         if (rec.magic != TDB_RECOVERY_MAGIC) {
1219                 /* there is no valid recovery data */
1220                 return TDB_SUCCESS;
1221         }
1222
1223         if (tdb->flags & TDB_RDONLY) {
1224                 return tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
1225                                   "tdb_transaction_recover:"
1226                                   " attempt to recover read only database");
1227         }
1228
1229         recovery_eof = rec.eof;
1230
1231         data = (unsigned char *)malloc(rec.len);
1232         if (data == NULL) {
1233                 return tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
1234                                   "tdb_transaction_recover:"
1235                                   " failed to allocate recovery data");
1236         }
1237
1238         /* read the full recovery data */
1239         ecode = tdb->tdb2.io->tread(tdb, recovery_head + sizeof(rec), data,
1240                                     rec.len);
1241         if (ecode != TDB_SUCCESS) {
1242                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1243                                   "tdb_transaction_recover:"
1244                                   " failed to read recovery data");
1245         }
1246
1247         /* recover the file data */
1248         p = data;
1249         while (p+sizeof(tdb_off_t)+sizeof(tdb_len_t) < data + rec.len) {
1250                 tdb_off_t ofs;
1251                 tdb_len_t len;
1252                 tdb_convert(tdb, p, sizeof(ofs) + sizeof(len));
1253                 memcpy(&ofs, p, sizeof(ofs));
1254                 memcpy(&len, p + sizeof(ofs), sizeof(len));
1255                 p += sizeof(ofs) + sizeof(len);
1256
1257                 ecode = tdb->tdb2.io->twrite(tdb, ofs, p, len);
1258                 if (ecode != TDB_SUCCESS) {
1259                         free(data);
1260                         return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1261                                           "tdb_transaction_recover:"
1262                                           " failed to recover %zu bytes"
1263                                           " at offset %zu",
1264                                           (size_t)len, (size_t)ofs);
1265                 }
1266                 p += len;
1267         }
1268
1269         free(data);
1270
1271         ecode = transaction_sync(tdb, 0, tdb->file->map_size);
1272         if (ecode != TDB_SUCCESS) {
1273                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1274                                   "tdb_transaction_recover:"
1275                                   " failed to sync recovery");
1276         }
1277
1278         /* if the recovery area is after the recovered eof then remove it */
1279         if (recovery_eof <= recovery_head) {
1280                 ecode = tdb_write_off(tdb, offsetof(struct tdb_header,
1281                                                     recovery),
1282                                       0);
1283                 if (ecode != TDB_SUCCESS) {
1284                         return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1285                                           "tdb_transaction_recover:"
1286                                           " failed to remove recovery head");
1287                 }
1288         }
1289
1290         /* remove the recovery magic */
1291         ecode = tdb_write_off(tdb,
1292                               recovery_head
1293                               + offsetof(struct tdb_recovery_record, magic),
1294                               TDB_RECOVERY_INVALID_MAGIC);
1295         if (ecode != TDB_SUCCESS) {
1296                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1297                                   "tdb_transaction_recover:"
1298                                   " failed to remove recovery magic");
1299         }
1300
1301         ecode = transaction_sync(tdb, 0, recovery_eof);
1302         if (ecode != TDB_SUCCESS) {
1303                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1304                                   "tdb_transaction_recover:"
1305                                   " failed to sync2 recovery");
1306         }
1307
1308         tdb_logerr(tdb, TDB_SUCCESS, TDB_LOG_WARNING,
1309                    "tdb_transaction_recover: recovered %zu byte database",
1310                    (size_t)recovery_eof);
1311
1312         /* all done */
1313         return TDB_SUCCESS;
1314 }
1315
1316 tdb_bool_err tdb_needs_recovery(struct tdb_context *tdb)
1317 {
1318         tdb_off_t recovery_head;
1319         struct tdb_recovery_record rec;
1320         enum TDB_ERROR ecode;
1321
1322         /* find the recovery area */
1323         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
1324         if (TDB_OFF_IS_ERR(recovery_head)) {
1325                 return recovery_head;
1326         }
1327
1328         if (recovery_head == 0) {
1329                 /* we have never allocated a recovery record */
1330                 return false;
1331         }
1332
1333         /* read the recovery record */
1334         ecode = tdb_read_convert(tdb, recovery_head, &rec, sizeof(rec));
1335         if (ecode != TDB_SUCCESS) {
1336                 return TDB_ERR_TO_OFF(ecode);
1337         }
1338
1339         return (rec.magic == TDB_RECOVERY_MAGIC);
1340 }