]> git.ozlabs.org Git - ccan/blob - ccan/tdb2/transaction.c
htable: fix bug where first entry has hash of 0 or 1.
[ccan] / ccan / tdb2 / transaction.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7    Copyright (C) Rusty Russell                2010
8
9      ** NOTE! The following LGPL license applies to the tdb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 #include "private.h"
/* Free x if it is non-NULL and reset it to NULL.  x must be a modifiable
   pointer lvalue; note that it is evaluated more than once. */
#define SAFE_FREE(x) do { if ((x) != NULL) {free((void *)(x)); (x)=NULL;} } while(0)
29
30 /*
31   transaction design:
32
33   - only allow a single transaction at a time per database. This makes
34     using the transaction API simpler, as otherwise the caller would
35     have to cope with temporary failures in transactions that conflict
36     with other current transactions
37
38   - keep the transaction recovery information in the same file as the
39     database, using a special 'transaction recovery' record pointed at
40     by the header. This removes the need for extra journal files as
41     used by some other databases
42
43   - dynamically allocate the transaction recovery record, re-using it
44     for subsequent transactions. If a larger record is needed then
45     tdb_free() the old record to place it on the normal tdb freelist
46     before allocating the new record
47
48   - during transactions, keep a linked list of all writes that have
49     been performed by intercepting all tdb_write() calls. The hooked
50     transaction versions of tdb_read() and tdb_write() check this
51     linked list and try to use the elements of the list in preference
52     to the real database.
53
54   - don't allow any locks to be held when a transaction starts,
55     otherwise we can end up with deadlock (plus lack of lock nesting
56     in POSIX locks would mean the lock is lost)
57
58   - if the caller gains a lock during the transaction but doesn't
59     release it then fail the commit
60
61   - allow for nested calls to tdb_transaction_start(), re-using the
62     existing transaction record. If the inner transaction is canceled
63     then a subsequent commit will fail
64
65   - keep a mirrored copy of the tdb hash chain heads to allow for the
66     fast hash heads scan on traverse, updating the mirrored copy in
67     the transaction version of tdb_write
68
69   - allow callers to mix transaction and non-transaction use of tdb,
70     although once a transaction is started then an exclusive lock is
71     gained until the transaction is committed or canceled
72
73   - the commit strategy involves first saving away all modified data
74     into a linearised buffer in the transaction recovery area, then
75     marking the transaction recovery area with a magic value to
76     indicate a valid recovery record. In total 4 fsync/msync calls are
77     needed per commit to prevent race conditions. It might be possible
78     to reduce this to 3 or even 2 with some more work.
79
80   - check for a valid recovery record on open of the tdb, while the
81     open lock is held. Automatically recover from the transaction
82     recovery area if needed, then continue with the open as
83     usual. This allows for smooth crash recovery with no administrator
84     intervention.
85
86   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
87     still available, but no transaction recovery area is used and no
88     fsync/msync calls are made.
89 */
90
/*
  hold the context of any current transaction. One of these hangs off
  tdb->tdb2.transaction from tdb_transaction_start() until the
  transaction is cancelled.
*/
struct tdb_transaction {
        /* the original io methods - used to do IOs to the real db.
           Saved when the transaction hooks tdb->tdb2.io and restored
           when the transaction is cancelled */
        const struct tdb_methods *io_methods;

        /* the list of transaction blocks, indexed by
           (file offset / PAGESIZE). When a block is first
           written to, it gets created in this list; entries for
           blocks that were never written stay NULL */
        uint8_t **blocks;
        size_t num_blocks;
        size_t last_block_size; /* number of valid bytes in the last block */

        /* non-zero when an internal transaction error has
           occurred. All write operations will then fail until the
           transaction is ended */
        int transaction_error;

        /* when inside a transaction we need to keep track of any
           nested tdb_transaction_start() calls, as these are allowed,
           but don't create a new transaction */
        unsigned int nesting;

        /* set when a prepare has already occurred; transaction_write()
           refuses further writes once this is set */
        bool prepared;
        /* when non-zero, cancelling the transaction overwrites the
           magic at this file offset with TDB_RECOVERY_INVALID_MAGIC */
        tdb_off_t magic_offset;

        /* old file size before transaction; tdb->file->map_size is
           reset to this on cancel */
        tdb_len_t old_map_size;
};
121
/* Size in bytes of one transaction block: the transaction read/write
   hooks split every access into chunks that stay within one
   PAGESIZE-aligned block.
   This doesn't really need to be pagesize, but we use it for similar reasons. */
#define PAGESIZE 65536
124
125 /*
126   read while in a transaction. We need to check first if the data is in our list
127   of transaction elements, then if not do a real read
128 */
129 static enum TDB_ERROR transaction_read(struct tdb_context *tdb, tdb_off_t off,
130                                        void *buf, tdb_len_t len)
131 {
132         size_t blk;
133         enum TDB_ERROR ecode;
134
135         /* break it down into block sized ops */
136         while (len + (off % PAGESIZE) > PAGESIZE) {
137                 tdb_len_t len2 = PAGESIZE - (off % PAGESIZE);
138                 ecode = transaction_read(tdb, off, buf, len2);
139                 if (ecode != TDB_SUCCESS) {
140                         return ecode;
141                 }
142                 len -= len2;
143                 off += len2;
144                 buf = (void *)(len2 + (char *)buf);
145         }
146
147         if (len == 0) {
148                 return TDB_SUCCESS;
149         }
150
151         blk = off / PAGESIZE;
152
153         /* see if we have it in the block list */
154         if (tdb->tdb2.transaction->num_blocks <= blk ||
155             tdb->tdb2.transaction->blocks[blk] == NULL) {
156                 /* nope, do a real read */
157                 ecode = tdb->tdb2.transaction->io_methods->tread(tdb, off, buf, len);
158                 if (ecode != TDB_SUCCESS) {
159                         goto fail;
160                 }
161                 return 0;
162         }
163
164         /* it is in the block list. Now check for the last block */
165         if (blk == tdb->tdb2.transaction->num_blocks-1) {
166                 if (len > tdb->tdb2.transaction->last_block_size) {
167                         ecode = TDB_ERR_IO;
168                         goto fail;
169                 }
170         }
171
172         /* now copy it out of this block */
173         memcpy(buf, tdb->tdb2.transaction->blocks[blk] + (off % PAGESIZE), len);
174         return TDB_SUCCESS;
175
176 fail:
177         tdb->tdb2.transaction->transaction_error = 1;
178         return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
179                           "transaction_read: failed at off=%zu len=%zu",
180                           (size_t)off, (size_t)len);
181 }
182
183
184 /*
185   write while in a transaction
186 */
187 static enum TDB_ERROR transaction_write(struct tdb_context *tdb, tdb_off_t off,
188                                         const void *buf, tdb_len_t len)
189 {
190         size_t blk;
191         enum TDB_ERROR ecode;
192
193         /* Only a commit is allowed on a prepared transaction */
194         if (tdb->tdb2.transaction->prepared) {
195                 ecode = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_ERROR,
196                                    "transaction_write: transaction already"
197                                    " prepared, write not allowed");
198                 goto fail;
199         }
200
201         /* break it up into block sized chunks */
202         while (len + (off % PAGESIZE) > PAGESIZE) {
203                 tdb_len_t len2 = PAGESIZE - (off % PAGESIZE);
204                 ecode = transaction_write(tdb, off, buf, len2);
205                 if (ecode != TDB_SUCCESS) {
206                         return ecode;
207                 }
208                 len -= len2;
209                 off += len2;
210                 if (buf != NULL) {
211                         buf = (const void *)(len2 + (const char *)buf);
212                 }
213         }
214
215         if (len == 0) {
216                 return TDB_SUCCESS;
217         }
218
219         blk = off / PAGESIZE;
220         off = off % PAGESIZE;
221
222         if (tdb->tdb2.transaction->num_blocks <= blk) {
223                 uint8_t **new_blocks;
224                 /* expand the blocks array */
225                 if (tdb->tdb2.transaction->blocks == NULL) {
226                         new_blocks = (uint8_t **)malloc(
227                                 (blk+1)*sizeof(uint8_t *));
228                 } else {
229                         new_blocks = (uint8_t **)realloc(
230                                 tdb->tdb2.transaction->blocks,
231                                 (blk+1)*sizeof(uint8_t *));
232                 }
233                 if (new_blocks == NULL) {
234                         ecode = tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
235                                            "transaction_write:"
236                                            " failed to allocate");
237                         goto fail;
238                 }
239                 memset(&new_blocks[tdb->tdb2.transaction->num_blocks], 0,
240                        (1+(blk - tdb->tdb2.transaction->num_blocks))*sizeof(uint8_t *));
241                 tdb->tdb2.transaction->blocks = new_blocks;
242                 tdb->tdb2.transaction->num_blocks = blk+1;
243                 tdb->tdb2.transaction->last_block_size = 0;
244         }
245
246         /* allocate and fill a block? */
247         if (tdb->tdb2.transaction->blocks[blk] == NULL) {
248                 tdb->tdb2.transaction->blocks[blk] = (uint8_t *)calloc(PAGESIZE, 1);
249                 if (tdb->tdb2.transaction->blocks[blk] == NULL) {
250                         ecode = tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
251                                            "transaction_write:"
252                                            " failed to allocate");
253                         goto fail;
254                 }
255                 if (tdb->tdb2.transaction->old_map_size > blk * PAGESIZE) {
256                         tdb_len_t len2 = PAGESIZE;
257                         if (len2 + (blk * PAGESIZE) > tdb->tdb2.transaction->old_map_size) {
258                                 len2 = tdb->tdb2.transaction->old_map_size - (blk * PAGESIZE);
259                         }
260                         ecode = tdb->tdb2.transaction->io_methods->tread(tdb,
261                                         blk * PAGESIZE,
262                                         tdb->tdb2.transaction->blocks[blk],
263                                         len2);
264                         if (ecode != TDB_SUCCESS) {
265                                 ecode = tdb_logerr(tdb, ecode,
266                                                    TDB_LOG_ERROR,
267                                                    "transaction_write:"
268                                                    " failed to"
269                                                    " read old block: %s",
270                                                    strerror(errno));
271                                 SAFE_FREE(tdb->tdb2.transaction->blocks[blk]);
272                                 goto fail;
273                         }
274                         if (blk == tdb->tdb2.transaction->num_blocks-1) {
275                                 tdb->tdb2.transaction->last_block_size = len2;
276                         }
277                 }
278         }
279
280         /* overwrite part of an existing block */
281         if (buf == NULL) {
282                 memset(tdb->tdb2.transaction->blocks[blk] + off, 0, len);
283         } else {
284                 memcpy(tdb->tdb2.transaction->blocks[blk] + off, buf, len);
285         }
286         if (blk == tdb->tdb2.transaction->num_blocks-1) {
287                 if (len + off > tdb->tdb2.transaction->last_block_size) {
288                         tdb->tdb2.transaction->last_block_size = len + off;
289                 }
290         }
291
292         return TDB_SUCCESS;
293
294 fail:
295         tdb->tdb2.transaction->transaction_error = 1;
296         return ecode;
297 }
298
299
300 /*
301   write while in a transaction - this variant never expands the transaction blocks, it only
302   updates existing blocks. This means it cannot change the recovery size
303 */
304 static void transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
305                                        const void *buf, tdb_len_t len)
306 {
307         size_t blk;
308
309         /* break it up into block sized chunks */
310         while (len + (off % PAGESIZE) > PAGESIZE) {
311                 tdb_len_t len2 = PAGESIZE - (off % PAGESIZE);
312                 transaction_write_existing(tdb, off, buf, len2);
313                 len -= len2;
314                 off += len2;
315                 if (buf != NULL) {
316                         buf = (const void *)(len2 + (const char *)buf);
317                 }
318         }
319
320         if (len == 0) {
321                 return;
322         }
323
324         blk = off / PAGESIZE;
325         off = off % PAGESIZE;
326
327         if (tdb->tdb2.transaction->num_blocks <= blk ||
328             tdb->tdb2.transaction->blocks[blk] == NULL) {
329                 return;
330         }
331
332         if (blk == tdb->tdb2.transaction->num_blocks-1 &&
333             off + len > tdb->tdb2.transaction->last_block_size) {
334                 if (off >= tdb->tdb2.transaction->last_block_size) {
335                         return;
336                 }
337                 len = tdb->tdb2.transaction->last_block_size - off;
338         }
339
340         /* overwrite part of an existing block */
341         memcpy(tdb->tdb2.transaction->blocks[blk] + off, buf, len);
342 }
343
344
345 /*
346   out of bounds check during a transaction
347 */
348 static enum TDB_ERROR transaction_oob(struct tdb_context *tdb,
349                                       tdb_off_t off, tdb_len_t len, bool probe)
350 {
351         if ((off + len >= off && off + len <= tdb->file->map_size) || probe) {
352                 return TDB_SUCCESS;
353         }
354
355         tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
356                    "tdb_oob len %lld beyond transaction size %lld",
357                    (long long)(off + len),
358                    (long long)tdb->file->map_size);
359         return TDB_ERR_IO;
360 }
361
362 /*
363   transaction version of tdb_expand().
364 */
365 static enum TDB_ERROR transaction_expand_file(struct tdb_context *tdb,
366                                               tdb_off_t addition)
367 {
368         enum TDB_ERROR ecode;
369
370         /* add a write to the transaction elements, so subsequent
371            reads see the zero data */
372         ecode = transaction_write(tdb, tdb->file->map_size, NULL, addition);
373         if (ecode == TDB_SUCCESS) {
374                 tdb->file->map_size += addition;
375         }
376         return ecode;
377 }
378
379 static void *transaction_direct(struct tdb_context *tdb, tdb_off_t off,
380                                 size_t len, bool write_mode)
381 {
382         size_t blk = off / PAGESIZE, end_blk;
383
384         /* This is wrong for zero-length blocks, but will fail gracefully */
385         end_blk = (off + len - 1) / PAGESIZE;
386
387         /* Can only do direct if in single block and we've already copied. */
388         if (write_mode) {
389                 tdb->stats.transaction_write_direct++;
390                 if (blk != end_blk
391                     || blk >= tdb->tdb2.transaction->num_blocks
392                     || tdb->tdb2.transaction->blocks[blk] == NULL) {
393                         tdb->stats.transaction_write_direct_fail++;
394                         return NULL;
395                 }
396                 return tdb->tdb2.transaction->blocks[blk] + off % PAGESIZE;
397         }
398
399         tdb->stats.transaction_read_direct++;
400         /* Single which we have copied? */
401         if (blk == end_blk
402             && blk < tdb->tdb2.transaction->num_blocks
403             && tdb->tdb2.transaction->blocks[blk])
404                 return tdb->tdb2.transaction->blocks[blk] + off % PAGESIZE;
405
406         /* Otherwise must be all not copied. */
407         while (blk <= end_blk) {
408                 if (blk >= tdb->tdb2.transaction->num_blocks)
409                         break;
410                 if (tdb->tdb2.transaction->blocks[blk]) {
411                         tdb->stats.transaction_read_direct_fail++;
412                         return NULL;
413                 }
414                 blk++;
415         }
416         return tdb->tdb2.transaction->io_methods->direct(tdb, off, len, false);
417 }
418
/* I/O method table that tdb_transaction_start() installs as tdb->tdb2.io
   for the duration of a transaction.  The initializer is positional, so
   the order must match the member order of struct tdb_methods (declared
   outside this file). */
static const struct tdb_methods transaction_methods = {
        transaction_read,
        transaction_write,
        transaction_oob,
        transaction_expand_file,
        transaction_direct,
};
426
427 /*
428   sync to disk
429 */
430 static enum TDB_ERROR transaction_sync(struct tdb_context *tdb,
431                                        tdb_off_t offset, tdb_len_t length)
432 {
433         if (tdb->flags & TDB_NOSYNC) {
434                 return TDB_SUCCESS;
435         }
436
437         if (fsync(tdb->file->fd) != 0) {
438                 return tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
439                                   "tdb_transaction: fsync failed: %s",
440                                   strerror(errno));
441         }
442 #ifdef MS_SYNC
443         if (tdb->file->map_ptr) {
444                 tdb_off_t moffset = offset & ~(getpagesize()-1);
445                 if (msync(moffset + (char *)tdb->file->map_ptr,
446                           length + (offset - moffset), MS_SYNC) != 0) {
447                         return tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
448                                           "tdb_transaction: msync failed: %s",
449                                           strerror(errno));
450                 }
451         }
452 #endif
453         return TDB_SUCCESS;
454 }
455
456
457 static void _tdb_transaction_cancel(struct tdb_context *tdb)
458 {
459         int i;
460         enum TDB_ERROR ecode;
461
462         if (tdb->tdb2.transaction == NULL) {
463                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
464                            "tdb_transaction_cancel: no transaction");
465                 return;
466         }
467
468         if (tdb->tdb2.transaction->nesting != 0) {
469                 tdb->tdb2.transaction->transaction_error = 1;
470                 tdb->tdb2.transaction->nesting--;
471                 return;
472         }
473
474         tdb->file->map_size = tdb->tdb2.transaction->old_map_size;
475
476         /* free all the transaction blocks */
477         for (i=0;i<tdb->tdb2.transaction->num_blocks;i++) {
478                 if (tdb->tdb2.transaction->blocks[i] != NULL) {
479                         free(tdb->tdb2.transaction->blocks[i]);
480                 }
481         }
482         SAFE_FREE(tdb->tdb2.transaction->blocks);
483
484         if (tdb->tdb2.transaction->magic_offset) {
485                 const struct tdb_methods *methods = tdb->tdb2.transaction->io_methods;
486                 uint64_t invalid = TDB_RECOVERY_INVALID_MAGIC;
487
488                 /* remove the recovery marker */
489                 ecode = methods->twrite(tdb, tdb->tdb2.transaction->magic_offset,
490                                         &invalid, sizeof(invalid));
491                 if (ecode == TDB_SUCCESS)
492                         ecode = transaction_sync(tdb,
493                                                  tdb->tdb2.transaction->magic_offset,
494                                                  sizeof(invalid));
495                 if (ecode != TDB_SUCCESS) {
496                         tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
497                                    "tdb_transaction_cancel: failed to remove"
498                                    " recovery magic");
499                 }
500         }
501
502         if (tdb->file->allrecord_lock.count)
503                 tdb_allrecord_unlock(tdb, tdb->file->allrecord_lock.ltype);
504
505         /* restore the normal io methods */
506         tdb->tdb2.io = tdb->tdb2.transaction->io_methods;
507
508         tdb_transaction_unlock(tdb, F_WRLCK);
509
510         if (tdb_has_open_lock(tdb))
511                 tdb_unlock_open(tdb, F_WRLCK);
512
513         SAFE_FREE(tdb->tdb2.transaction);
514 }
515
516 /*
517   start a tdb transaction. No token is returned, as only a single
518   transaction is allowed to be pending per tdb_context
519 */
520 enum TDB_ERROR tdb_transaction_start(struct tdb_context *tdb)
521 {
522         enum TDB_ERROR ecode;
523
524         if (tdb->flags & TDB_VERSION1) {
525                 if (tdb1_transaction_start(tdb) == -1)
526                         return tdb->last_error;
527                 return TDB_SUCCESS;
528         }
529
530         tdb->stats.transactions++;
531         /* some sanity checks */
532         if (tdb->flags & TDB_INTERNAL) {
533                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL,
534                                                     TDB_LOG_USE_ERROR,
535                                                     "tdb_transaction_start:"
536                                                     " cannot start a"
537                                                     " transaction on an"
538                                                     " internal tdb");
539         }
540
541         if (tdb->flags & TDB_RDONLY) {
542                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_RDONLY,
543                                                     TDB_LOG_USE_ERROR,
544                                                     "tdb_transaction_start:"
545                                                     " cannot start a"
546                                                     " transaction on a "
547                                                     " read-only tdb");
548         }
549
550         /* cope with nested tdb_transaction_start() calls */
551         if (tdb->tdb2.transaction != NULL) {
552                 if (!(tdb->flags & TDB_ALLOW_NESTING)) {
553                         return tdb->last_error
554                                 = tdb_logerr(tdb, TDB_ERR_IO,
555                                              TDB_LOG_USE_ERROR,
556                                              "tdb_transaction_start:"
557                                              " already inside transaction");
558                 }
559                 tdb->tdb2.transaction->nesting++;
560                 tdb->stats.transaction_nest++;
561                 return 0;
562         }
563
564         if (tdb_has_hash_locks(tdb)) {
565                 /* the caller must not have any locks when starting a
566                    transaction as otherwise we'll be screwed by lack
567                    of nested locks in POSIX */
568                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_LOCK,
569                                                     TDB_LOG_USE_ERROR,
570                                                     "tdb_transaction_start:"
571                                                     " cannot start a"
572                                                     " transaction with locks"
573                                                     " held");
574         }
575
576         tdb->tdb2.transaction = (struct tdb_transaction *)
577                 calloc(sizeof(struct tdb_transaction), 1);
578         if (tdb->tdb2.transaction == NULL) {
579                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_OOM,
580                                                     TDB_LOG_ERROR,
581                                                     "tdb_transaction_start:"
582                                                     " cannot allocate");
583         }
584
585         /* get the transaction write lock. This is a blocking lock. As
586            discussed with Volker, there are a number of ways we could
587            make this async, which we will probably do in the future */
588         ecode = tdb_transaction_lock(tdb, F_WRLCK);
589         if (ecode != TDB_SUCCESS) {
590                 SAFE_FREE(tdb->tdb2.transaction->blocks);
591                 SAFE_FREE(tdb->tdb2.transaction);
592                 return tdb->last_error = ecode;
593         }
594
595         /* get a read lock over entire file. This is upgraded to a write
596            lock during the commit */
597         ecode = tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, true);
598         if (ecode != TDB_SUCCESS) {
599                 goto fail_allrecord_lock;
600         }
601
602         /* make sure we know about any file expansions already done by
603            anyone else */
604         tdb->tdb2.io->oob(tdb, tdb->file->map_size, 1, true);
605         tdb->tdb2.transaction->old_map_size = tdb->file->map_size;
606
607         /* finally hook the io methods, replacing them with
608            transaction specific methods */
609         tdb->tdb2.transaction->io_methods = tdb->tdb2.io;
610         tdb->tdb2.io = &transaction_methods;
611         return tdb->last_error = TDB_SUCCESS;
612
613 fail_allrecord_lock:
614         tdb_transaction_unlock(tdb, F_WRLCK);
615         SAFE_FREE(tdb->tdb2.transaction->blocks);
616         SAFE_FREE(tdb->tdb2.transaction);
617         return tdb->last_error = ecode;
618 }
619
620
621 /*
622   cancel the current transaction
623 */
624 void tdb_transaction_cancel(struct tdb_context *tdb)
625 {
626         if (tdb->flags & TDB_VERSION1) {
627                 tdb1_transaction_cancel(tdb);
628                 return;
629         }
630         tdb->stats.transaction_cancel++;
631         _tdb_transaction_cancel(tdb);
632 }
633
634 /*
635   work out how much space the linearised recovery data will consume (worst case)
636 */
637 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
638 {
639         tdb_len_t recovery_size = 0;
640         int i;
641
642         recovery_size = 0;
643         for (i=0;i<tdb->tdb2.transaction->num_blocks;i++) {
644                 if (i * PAGESIZE >= tdb->tdb2.transaction->old_map_size) {
645                         break;
646                 }
647                 if (tdb->tdb2.transaction->blocks[i] == NULL) {
648                         continue;
649                 }
650                 recovery_size += 2*sizeof(tdb_off_t);
651                 if (i == tdb->tdb2.transaction->num_blocks-1) {
652                         recovery_size += tdb->tdb2.transaction->last_block_size;
653                 } else {
654                         recovery_size += PAGESIZE;
655                 }
656         }
657
658         return recovery_size;
659 }
660
661 static enum TDB_ERROR tdb_recovery_area(struct tdb_context *tdb,
662                                         const struct tdb_methods *methods,
663                                         tdb_off_t *recovery_offset,
664                                         struct tdb_recovery_record *rec)
665 {
666         enum TDB_ERROR ecode;
667
668         *recovery_offset = tdb_read_off(tdb,
669                                         offsetof(struct tdb_header, recovery));
670         if (TDB_OFF_IS_ERR(*recovery_offset)) {
671                 return TDB_OFF_TO_ERR(*recovery_offset);
672         }
673
674         if (*recovery_offset == 0) {
675                 rec->max_len = 0;
676                 return TDB_SUCCESS;
677         }
678
679         ecode = methods->tread(tdb, *recovery_offset, rec, sizeof(*rec));
680         if (ecode != TDB_SUCCESS)
681                 return ecode;
682
683         tdb_convert(tdb, rec, sizeof(*rec));
684         /* ignore invalid recovery regions: can happen in crash */
685         if (rec->magic != TDB_RECOVERY_MAGIC &&
686             rec->magic != TDB_RECOVERY_INVALID_MAGIC) {
687                 *recovery_offset = 0;
688                 rec->max_len = 0;
689         }
690         return TDB_SUCCESS;
691 }
692
/* Length of the common prefix of new and old, looking at no more than
   length bytes. */
static unsigned int same(const unsigned char *new,
                         const unsigned char *old,
                         unsigned int length)
{
        unsigned int matched = 0;

        while (matched < length && new[matched] == old[matched]) {
                matched++;
        }
        return matched;
}
705
/* Scan new against old for up to length bytes and return the size of the
   leading region worth treating as "different": everything before the
   first run of at least min_same matching bytes. *samelen receives the
   length of that matching run, or 0 if no run of min_same bytes was found
   (a shorter run at the very end is counted as different). */
static unsigned int different(const unsigned char *new,
                              const unsigned char *old,
                              unsigned int length,
                              unsigned int min_same,
                              unsigned int *samelen)
{
        unsigned int run = 0;   /* length of the current matching run */
        unsigned int pos;

        for (pos = 0; pos < length; pos++) {
                if (new[pos] == old[pos]) {
                        run++;
                        continue;
                }
                /* a mismatch ends the run: stop if it was long enough */
                if (run >= min_same) {
                        *samelen = run;
                        return pos - run;
                }
                run = 0;
        }

        /* a trailing run only counts if it reaches min_same */
        if (run < min_same) {
                run = 0;
        }
        *samelen = run;
        return length - run;
}
730
731 /* Allocates recovery blob, without tdb_recovery_record at head set up. */
732 static struct tdb_recovery_record *alloc_recovery(struct tdb_context *tdb,
733                                                   tdb_len_t *len)
734 {
735         struct tdb_recovery_record *rec;
736         size_t i;
737         enum TDB_ERROR ecode;
738         unsigned char *p;
739         const struct tdb_methods *old_methods = tdb->tdb2.io;
740
741         rec = malloc(sizeof(*rec) + tdb_recovery_size(tdb));
742         if (!rec) {
743                 tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
744                            "transaction_setup_recovery:"
745                            " cannot allocate");
746                 return TDB_ERR_PTR(TDB_ERR_OOM);
747         }
748
749         /* We temporarily revert to the old I/O methods, so we can use
750          * tdb_access_read */
751         tdb->tdb2.io = tdb->tdb2.transaction->io_methods;
752
753         /* build the recovery data into a single blob to allow us to do a single
754            large write, which should be more efficient */
755         p = (unsigned char *)(rec + 1);
756         for (i=0;i<tdb->tdb2.transaction->num_blocks;i++) {
757                 tdb_off_t offset;
758                 tdb_len_t length;
759                 unsigned int off;
760                 const unsigned char *buffer;
761
762                 if (tdb->tdb2.transaction->blocks[i] == NULL) {
763                         continue;
764                 }
765
766                 offset = i * PAGESIZE;
767                 length = PAGESIZE;
768                 if (i == tdb->tdb2.transaction->num_blocks-1) {
769                         length = tdb->tdb2.transaction->last_block_size;
770                 }
771
772                 if (offset >= tdb->tdb2.transaction->old_map_size) {
773                         continue;
774                 }
775
776                 if (offset + length > tdb->file->map_size) {
777                         ecode = tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
778                                            "tdb_transaction_setup_recovery:"
779                                            " transaction data over new region"
780                                            " boundary");
781                         goto fail;
782                 }
783                 if (offset + length > tdb->tdb2.transaction->old_map_size) {
784                         /* Short read at EOF. */
785                         length = tdb->tdb2.transaction->old_map_size - offset;
786                 }
787                 buffer = tdb_access_read(tdb, offset, length, false);
788                 if (TDB_PTR_IS_ERR(buffer)) {
789                         ecode = TDB_PTR_ERR(buffer);
790                         goto fail;
791                 }
792
793                 /* Skip over anything the same at the start. */
794                 off = same(tdb->tdb2.transaction->blocks[i], buffer, length);
795                 offset += off;
796
797                 while (off < length) {
798                         tdb_len_t len;
799                         unsigned int samelen;
800
801                         len = different(tdb->tdb2.transaction->blocks[i] + off,
802                                         buffer + off, length - off,
803                                         sizeof(offset) + sizeof(len) + 1,
804                                         &samelen);
805
806                         memcpy(p, &offset, sizeof(offset));
807                         memcpy(p + sizeof(offset), &len, sizeof(len));
808                         tdb_convert(tdb, p, sizeof(offset) + sizeof(len));
809                         p += sizeof(offset) + sizeof(len);
810                         memcpy(p, buffer + off, len);
811                         p += len;
812                         off += len + samelen;
813                         offset += len + samelen;
814                 }
815                 tdb_access_release(tdb, buffer);
816         }
817
818         *len = p - (unsigned char *)(rec + 1);
819         tdb->tdb2.io = old_methods;
820         return rec;
821
822 fail:
823         free(rec);
824         tdb->tdb2.io = old_methods;
825         return TDB_ERR_PTR(ecode);
826 }
827
828 static tdb_off_t create_recovery_area(struct tdb_context *tdb,
829                                       tdb_len_t rec_length,
830                                       struct tdb_recovery_record *rec)
831 {
832         tdb_off_t off, recovery_off;
833         tdb_len_t addition;
834         enum TDB_ERROR ecode;
835         const struct tdb_methods *methods = tdb->tdb2.transaction->io_methods;
836
837         /* round up to a multiple of page size. Overallocate, since each
838          * such allocation forces us to expand the file. */
839         rec->max_len = tdb_expand_adjust(tdb->file->map_size, rec_length);
840
841         /* Round up to a page. */
842         rec->max_len = ((sizeof(*rec) + rec->max_len + PAGESIZE-1)
843                         & ~(PAGESIZE-1))
844                 - sizeof(*rec);
845
846         off = tdb->file->map_size;
847
848         /* Restore ->map_size before calling underlying expand_file.
849            Also so that we don't try to expand the file again in the
850            transaction commit, which would destroy the recovery
851            area */
852         addition = (tdb->file->map_size - tdb->tdb2.transaction->old_map_size) +
853                 sizeof(*rec) + rec->max_len;
854         tdb->file->map_size = tdb->tdb2.transaction->old_map_size;
855         tdb->stats.transaction_expand_file++;
856         ecode = methods->expand_file(tdb, addition);
857         if (ecode != TDB_SUCCESS) {
858                 tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
859                            "tdb_recovery_allocate:"
860                            " failed to create recovery area");
861                 return TDB_ERR_TO_OFF(ecode);
862         }
863
864         /* we have to reset the old map size so that we don't try to
865            expand the file again in the transaction commit, which
866            would destroy the recovery area */
867         tdb->tdb2.transaction->old_map_size = tdb->file->map_size;
868
869         /* write the recovery header offset and sync - we can sync without a race here
870            as the magic ptr in the recovery record has not been set */
871         recovery_off = off;
872         tdb_convert(tdb, &recovery_off, sizeof(recovery_off));
873         ecode = methods->twrite(tdb, offsetof(struct tdb_header, recovery),
874                                 &recovery_off, sizeof(tdb_off_t));
875         if (ecode != TDB_SUCCESS) {
876                 tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
877                            "tdb_recovery_allocate:"
878                            " failed to write recovery head");
879                 return TDB_ERR_TO_OFF(ecode);
880         }
881         transaction_write_existing(tdb, offsetof(struct tdb_header, recovery),
882                                    &recovery_off,
883                                    sizeof(tdb_off_t));
884         return off;
885 }
886
887 /*
888   setup the recovery data that will be used on a crash during commit
889 */
890 static enum TDB_ERROR transaction_setup_recovery(struct tdb_context *tdb)
891 {
892         tdb_len_t recovery_size = 0;
893         tdb_off_t recovery_off = 0;
894         tdb_off_t old_map_size = tdb->tdb2.transaction->old_map_size;
895         struct tdb_recovery_record *recovery;
896         const struct tdb_methods *methods = tdb->tdb2.transaction->io_methods;
897         uint64_t magic;
898         enum TDB_ERROR ecode;
899
900         recovery = alloc_recovery(tdb, &recovery_size);
901         if (TDB_PTR_IS_ERR(recovery))
902                 return TDB_PTR_ERR(recovery);
903
904         ecode = tdb_recovery_area(tdb, methods, &recovery_off, recovery);
905         if (ecode) {
906                 free(recovery);
907                 return ecode;
908         }
909
910         if (recovery->max_len < recovery_size) {
911                 /* Not large enough. Free up old recovery area. */
912                 if (recovery_off) {
913                         tdb->stats.frees++;
914                         ecode = add_free_record(tdb, recovery_off,
915                                                 sizeof(*recovery)
916                                                 + recovery->max_len,
917                                                 TDB_LOCK_WAIT, true);
918                         free(recovery);
919                         if (ecode != TDB_SUCCESS) {
920                                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
921                                                   "tdb_recovery_allocate:"
922                                                   " failed to free previous"
923                                                   " recovery area");
924                         }
925
926                         /* Refresh recovery after add_free_record above. */
927                         recovery = alloc_recovery(tdb, &recovery_size);
928                         if (TDB_PTR_IS_ERR(recovery))
929                                 return TDB_PTR_ERR(recovery);
930                 }
931
932                 recovery_off = create_recovery_area(tdb, recovery_size,
933                                                     recovery);
934                 if (TDB_OFF_IS_ERR(recovery_off)) {
935                         free(recovery);
936                         return TDB_OFF_TO_ERR(recovery_off);
937                 }
938         }
939
940         /* Now we know size, convert rec header. */
941         recovery->magic = TDB_RECOVERY_INVALID_MAGIC;
942         recovery->len = recovery_size;
943         recovery->eof = old_map_size;
944         tdb_convert(tdb, recovery, sizeof(*recovery));
945
946         /* write the recovery data to the recovery area */
947         ecode = methods->twrite(tdb, recovery_off, recovery, recovery_size);
948         if (ecode != TDB_SUCCESS) {
949                 free(recovery);
950                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
951                                   "tdb_transaction_setup_recovery:"
952                                   " failed to write recovery data");
953         }
954         transaction_write_existing(tdb, recovery_off, recovery, recovery_size);
955
956         free(recovery);
957
958         /* as we don't have ordered writes, we have to sync the recovery
959            data before we update the magic to indicate that the recovery
960            data is present */
961         ecode = transaction_sync(tdb, recovery_off, recovery_size);
962         if (ecode != TDB_SUCCESS)
963                 return ecode;
964
965         magic = TDB_RECOVERY_MAGIC;
966         tdb_convert(tdb, &magic, sizeof(magic));
967
968         tdb->tdb2.transaction->magic_offset
969                 = recovery_off + offsetof(struct tdb_recovery_record, magic);
970
971         ecode = methods->twrite(tdb, tdb->tdb2.transaction->magic_offset,
972                                 &magic, sizeof(magic));
973         if (ecode != TDB_SUCCESS) {
974                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
975                                   "tdb_transaction_setup_recovery:"
976                                   " failed to write recovery magic");
977         }
978         transaction_write_existing(tdb, tdb->tdb2.transaction->magic_offset,
979                                    &magic, sizeof(magic));
980
981         /* ensure the recovery magic marker is on disk */
982         return transaction_sync(tdb, tdb->tdb2.transaction->magic_offset,
983                                 sizeof(magic));
984 }
985
986 static enum TDB_ERROR _tdb_transaction_prepare_commit(struct tdb_context *tdb)
987 {
988         const struct tdb_methods *methods;
989         enum TDB_ERROR ecode;
990
991         if (tdb->tdb2.transaction == NULL) {
992                 return tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
993                                   "tdb_transaction_prepare_commit:"
994                                   " no transaction");
995         }
996
997         if (tdb->tdb2.transaction->prepared) {
998                 _tdb_transaction_cancel(tdb);
999                 return tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
1000                                   "tdb_transaction_prepare_commit:"
1001                                   " transaction already prepared");
1002         }
1003
1004         if (tdb->tdb2.transaction->transaction_error) {
1005                 _tdb_transaction_cancel(tdb);
1006                 return tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_ERROR,
1007                                   "tdb_transaction_prepare_commit:"
1008                                   " transaction error pending");
1009         }
1010
1011
1012         if (tdb->tdb2.transaction->nesting != 0) {
1013                 return TDB_SUCCESS;
1014         }
1015
1016         /* check for a null transaction */
1017         if (tdb->tdb2.transaction->blocks == NULL) {
1018                 return TDB_SUCCESS;
1019         }
1020
1021         methods = tdb->tdb2.transaction->io_methods;
1022
1023         /* upgrade the main transaction lock region to a write lock */
1024         ecode = tdb_allrecord_upgrade(tdb, TDB_HASH_LOCK_START);
1025         if (ecode != TDB_SUCCESS) {
1026                 return ecode;
1027         }
1028
1029         /* get the open lock - this prevents new users attaching to the database
1030            during the commit */
1031         ecode = tdb_lock_open(tdb, F_WRLCK, TDB_LOCK_WAIT|TDB_LOCK_NOCHECK);
1032         if (ecode != TDB_SUCCESS) {
1033                 return ecode;
1034         }
1035
1036         /* Since we have whole db locked, we don't need the expansion lock. */
1037         if (!(tdb->flags & TDB_NOSYNC)) {
1038                 /* Sets up tdb->tdb2.transaction->recovery and
1039                  * tdb->tdb2.transaction->magic_offset. */
1040                 ecode = transaction_setup_recovery(tdb);
1041                 if (ecode != TDB_SUCCESS) {
1042                         return ecode;
1043                 }
1044         }
1045
1046         tdb->tdb2.transaction->prepared = true;
1047
1048         /* expand the file to the new size if needed */
1049         if (tdb->file->map_size != tdb->tdb2.transaction->old_map_size) {
1050                 tdb_len_t add;
1051
1052                 add = tdb->file->map_size - tdb->tdb2.transaction->old_map_size;
1053                 /* Restore original map size for tdb_expand_file */
1054                 tdb->file->map_size = tdb->tdb2.transaction->old_map_size;
1055                 ecode = methods->expand_file(tdb, add);
1056                 if (ecode != TDB_SUCCESS) {
1057                         return ecode;
1058                 }
1059         }
1060
1061         /* Keep the open lock until the actual commit */
1062         return TDB_SUCCESS;
1063 }
1064
1065 /*
1066    prepare to commit the current transaction
1067 */
1068 enum TDB_ERROR tdb_transaction_prepare_commit(struct tdb_context *tdb)
1069 {
1070         if (tdb->flags & TDB_VERSION1) {
1071                 if (tdb1_transaction_prepare_commit(tdb) == -1)
1072                         return tdb->last_error;
1073                 return TDB_SUCCESS;
1074         }
1075         return tdb->last_error = _tdb_transaction_prepare_commit(tdb);
1076 }
1077
1078 /*
1079   commit the current transaction
1080 */
1081 enum TDB_ERROR tdb_transaction_commit(struct tdb_context *tdb)
1082 {
1083         const struct tdb_methods *methods;
1084         int i;
1085         enum TDB_ERROR ecode;
1086
1087         if (tdb->flags & TDB_VERSION1) {
1088                 if (tdb1_transaction_commit(tdb) == -1)
1089                         return tdb->last_error;
1090                 return TDB_SUCCESS;
1091         }
1092
1093         if (tdb->tdb2.transaction == NULL) {
1094                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL,
1095                                                     TDB_LOG_USE_ERROR,
1096                                                     "tdb_transaction_commit:"
1097                                                     " no transaction");
1098         }
1099
1100         tdb_trace(tdb, "tdb_transaction_commit");
1101
1102         if (tdb->tdb2.transaction->nesting != 0) {
1103                 tdb->tdb2.transaction->nesting--;
1104                 return tdb->last_error = TDB_SUCCESS;
1105         }
1106
1107         /* check for a null transaction */
1108         if (tdb->tdb2.transaction->blocks == NULL) {
1109                 _tdb_transaction_cancel(tdb);
1110                 return tdb->last_error = TDB_SUCCESS;
1111         }
1112
1113         if (!tdb->tdb2.transaction->prepared) {
1114                 ecode = _tdb_transaction_prepare_commit(tdb);
1115                 if (ecode != TDB_SUCCESS) {
1116                         _tdb_transaction_cancel(tdb);
1117                         return tdb->last_error = ecode;
1118                 }
1119         }
1120
1121         methods = tdb->tdb2.transaction->io_methods;
1122
1123         /* perform all the writes */
1124         for (i=0;i<tdb->tdb2.transaction->num_blocks;i++) {
1125                 tdb_off_t offset;
1126                 tdb_len_t length;
1127
1128                 if (tdb->tdb2.transaction->blocks[i] == NULL) {
1129                         continue;
1130                 }
1131
1132                 offset = i * PAGESIZE;
1133                 length = PAGESIZE;
1134                 if (i == tdb->tdb2.transaction->num_blocks-1) {
1135                         length = tdb->tdb2.transaction->last_block_size;
1136                 }
1137
1138                 ecode = methods->twrite(tdb, offset,
1139                                         tdb->tdb2.transaction->blocks[i], length);
1140                 if (ecode != TDB_SUCCESS) {
1141                         /* we've overwritten part of the data and
1142                            possibly expanded the file, so we need to
1143                            run the crash recovery code */
1144                         tdb->tdb2.io = methods;
1145                         tdb_transaction_recover(tdb);
1146
1147                         _tdb_transaction_cancel(tdb);
1148
1149                         return tdb->last_error = ecode;
1150                 }
1151                 SAFE_FREE(tdb->tdb2.transaction->blocks[i]);
1152         }
1153
1154         SAFE_FREE(tdb->tdb2.transaction->blocks);
1155         tdb->tdb2.transaction->num_blocks = 0;
1156
1157         /* ensure the new data is on disk */
1158         ecode = transaction_sync(tdb, 0, tdb->file->map_size);
1159         if (ecode != TDB_SUCCESS) {
1160                 return tdb->last_error = ecode;
1161         }
1162
1163         /*
1164           TODO: maybe write to some dummy hdr field, or write to magic
1165           offset without mmap, before the last sync, instead of the
1166           utime() call
1167         */
1168
1169         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1170            don't change the mtime of the file, this means the file may
1171            not be backed up (as tdb rounding to block sizes means that
1172            file size changes are quite rare too). The following forces
1173            mtime changes when a transaction completes */
1174 #if HAVE_UTIME
1175         utime(tdb->name, NULL);
1176 #endif
1177
1178         /* use a transaction cancel to free memory and remove the
1179            transaction locks: it "restores" map_size, too. */
1180         tdb->tdb2.transaction->old_map_size = tdb->file->map_size;
1181         _tdb_transaction_cancel(tdb);
1182
1183         return tdb->last_error = TDB_SUCCESS;
1184 }
1185
1186
1187 /*
1188   recover from an aborted transaction. Must be called with exclusive
1189   database write access already established (including the open
1190   lock to prevent new processes attaching)
1191 */
1192 enum TDB_ERROR tdb_transaction_recover(struct tdb_context *tdb)
1193 {
1194         tdb_off_t recovery_head, recovery_eof;
1195         unsigned char *data, *p;
1196         struct tdb_recovery_record rec;
1197         enum TDB_ERROR ecode;
1198
1199         /* find the recovery area */
1200         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
1201         if (TDB_OFF_IS_ERR(recovery_head)) {
1202                 ecode = TDB_OFF_TO_ERR(recovery_head);
1203                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1204                                   "tdb_transaction_recover:"
1205                                   " failed to read recovery head");
1206         }
1207
1208         if (recovery_head == 0) {
1209                 /* we have never allocated a recovery record */
1210                 return TDB_SUCCESS;
1211         }
1212
1213         /* read the recovery record */
1214         ecode = tdb_read_convert(tdb, recovery_head, &rec, sizeof(rec));
1215         if (ecode != TDB_SUCCESS) {
1216                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1217                                   "tdb_transaction_recover:"
1218                                   " failed to read recovery record");
1219         }
1220
1221         if (rec.magic != TDB_RECOVERY_MAGIC) {
1222                 /* there is no valid recovery data */
1223                 return TDB_SUCCESS;
1224         }
1225
1226         if (tdb->flags & TDB_RDONLY) {
1227                 return tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
1228                                   "tdb_transaction_recover:"
1229                                   " attempt to recover read only database");
1230         }
1231
1232         recovery_eof = rec.eof;
1233
1234         data = (unsigned char *)malloc(rec.len);
1235         if (data == NULL) {
1236                 return tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
1237                                   "tdb_transaction_recover:"
1238                                   " failed to allocate recovery data");
1239         }
1240
1241         /* read the full recovery data */
1242         ecode = tdb->tdb2.io->tread(tdb, recovery_head + sizeof(rec), data,
1243                                     rec.len);
1244         if (ecode != TDB_SUCCESS) {
1245                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1246                                   "tdb_transaction_recover:"
1247                                   " failed to read recovery data");
1248         }
1249
1250         /* recover the file data */
1251         p = data;
1252         while (p+sizeof(tdb_off_t)+sizeof(tdb_len_t) < data + rec.len) {
1253                 tdb_off_t ofs;
1254                 tdb_len_t len;
1255                 tdb_convert(tdb, p, sizeof(ofs) + sizeof(len));
1256                 memcpy(&ofs, p, sizeof(ofs));
1257                 memcpy(&len, p + sizeof(ofs), sizeof(len));
1258                 p += sizeof(ofs) + sizeof(len);
1259
1260                 ecode = tdb->tdb2.io->twrite(tdb, ofs, p, len);
1261                 if (ecode != TDB_SUCCESS) {
1262                         free(data);
1263                         return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1264                                           "tdb_transaction_recover:"
1265                                           " failed to recover %zu bytes"
1266                                           " at offset %zu",
1267                                           (size_t)len, (size_t)ofs);
1268                 }
1269                 p += len;
1270         }
1271
1272         free(data);
1273
1274         ecode = transaction_sync(tdb, 0, tdb->file->map_size);
1275         if (ecode != TDB_SUCCESS) {
1276                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1277                                   "tdb_transaction_recover:"
1278                                   " failed to sync recovery");
1279         }
1280
1281         /* if the recovery area is after the recovered eof then remove it */
1282         if (recovery_eof <= recovery_head) {
1283                 ecode = tdb_write_off(tdb, offsetof(struct tdb_header,
1284                                                     recovery),
1285                                       0);
1286                 if (ecode != TDB_SUCCESS) {
1287                         return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1288                                           "tdb_transaction_recover:"
1289                                           " failed to remove recovery head");
1290                 }
1291         }
1292
1293         /* remove the recovery magic */
1294         ecode = tdb_write_off(tdb,
1295                               recovery_head
1296                               + offsetof(struct tdb_recovery_record, magic),
1297                               TDB_RECOVERY_INVALID_MAGIC);
1298         if (ecode != TDB_SUCCESS) {
1299                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1300                                   "tdb_transaction_recover:"
1301                                   " failed to remove recovery magic");
1302         }
1303
1304         ecode = transaction_sync(tdb, 0, recovery_eof);
1305         if (ecode != TDB_SUCCESS) {
1306                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1307                                   "tdb_transaction_recover:"
1308                                   " failed to sync2 recovery");
1309         }
1310
1311         tdb_logerr(tdb, TDB_SUCCESS, TDB_LOG_WARNING,
1312                    "tdb_transaction_recover: recovered %zu byte database",
1313                    (size_t)recovery_eof);
1314
1315         /* all done */
1316         return TDB_SUCCESS;
1317 }
1318
1319 tdb_bool_err tdb_needs_recovery(struct tdb_context *tdb)
1320 {
1321         tdb_off_t recovery_head;
1322         struct tdb_recovery_record rec;
1323         enum TDB_ERROR ecode;
1324
1325         /* find the recovery area */
1326         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
1327         if (TDB_OFF_IS_ERR(recovery_head)) {
1328                 return recovery_head;
1329         }
1330
1331         if (recovery_head == 0) {
1332                 /* we have never allocated a recovery record */
1333                 return false;
1334         }
1335
1336         /* read the recovery record */
1337         ecode = tdb_read_convert(tdb, recovery_head, &rec, sizeof(rec));
1338         if (ecode != TDB_SUCCESS) {
1339                 return TDB_ERR_TO_OFF(ecode);
1340         }
1341
1342         return (rec.magic == TDB_RECOVERY_MAGIC);
1343 }