]> git.ozlabs.org Git - ccan/blob - ccan/tdb2/tdb1_transaction.c
126f7684689ba50b7fbd352d75aa8c0679b185e8
[ccan] / ccan / tdb2 / tdb1_transaction.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 3 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, see <http://www.gnu.org/licenses/>.
24 */
25
26 #include "tdb1_private.h"
27
28 /*
29   transaction design:
30
31   - only allow a single transaction at a time per database. This makes
32     using the transaction API simpler, as otherwise the caller would
33     have to cope with temporary failures in transactions that conflict
34     with other current transactions
35
36   - keep the transaction recovery information in the same file as the
37     database, using a special 'transaction recovery' record pointed at
38     by the header. This removes the need for extra journal files as
39     used by some other databases
40
41   - dynamically allocate the transaction recovery record, re-using it
42     for subsequent transactions. If a larger record is needed then
43     tdb1_free() the old record to place it on the normal tdb freelist
44     before allocating the new record
45
46   - during transactions, keep a linked list of all writes that have
47     been performed by intercepting all tdb1_write() calls. The hooked
48     transaction versions of tdb1_read() and tdb1_write() check this
49     linked list and try to use the elements of the list in preference
50     to the real database.
51
52   - don't allow any locks to be held when a transaction starts,
53     otherwise we can end up with deadlock (plus lack of lock nesting
54     in posix locks would mean the lock is lost)
55
56   - if the caller gains a lock during the transaction but doesn't
57     release it then fail the commit
58
59   - allow for nested calls to tdb1_transaction_start(), re-using the
60     existing transaction record. If the inner transaction is cancelled
61     then a subsequent commit will fail
62
63   - keep a mirrored copy of the tdb hash chain heads to allow for the
64     fast hash heads scan on traverse, updating the mirrored copy in
65     the transaction version of tdb1_write
66
67   - allow callers to mix transaction and non-transaction use of tdb,
68     although once a transaction is started then an exclusive lock is
69     gained until the transaction is committed or cancelled
70
71   - the commit strategy involves first saving away all modified data
72     into a linearised buffer in the transaction recovery area, then
73     marking the transaction recovery area with a magic value to
74     indicate a valid recovery record. In total 4 fsync/msync calls are
75     needed per commit to prevent race conditions. It might be possible
76     to reduce this to 3 or even 2 with some more work.
77
78   - check for a valid recovery record on open of the tdb, while the
79     open lock is held. Automatically recover from the transaction
80     recovery area if needed, then continue with the open as
81     usual. This allows for smooth crash recovery with no administrator
82     intervention.
83
84   - if TDB_NOSYNC is passed to flags in tdb1_open then transactions are
85     still available, but no transaction recovery area is used and no
86     fsync/msync calls are made.
87
88   - if TDB_ALLOW_NESTING is passed to flags in tdb open, or added using
89     tdb1_add_flags() transaction nesting is enabled.
90     The default is that transaction nesting is NOT allowed.
91
92     Beware. when transactions are nested a transaction successfully
93     completed with tdb1_transaction_commit() can be silently unrolled later.
94 */
95
96
/*
  hold the context of any current transaction. One of these hangs off
  tdb->tdb1.transaction while a transaction is open; it is allocated
  zeroed in _tdb1_transaction_start() and freed on cancel.
*/
struct tdb1_transaction {
        /* we keep a mirrored copy of the tdb hash heads here so
           tdb1_next_hash_chain() can operate efficiently. It is
           allocated with hash_size+1 entries; entry 0 corresponds to
           the freelist, so hash chain h is found at index h+1 */
        uint32_t *hash_heads;

        /* the original io methods - used to do IOs to the real db.
           Saved when the transaction hooks tdb->tdb1.io and restored
           when the transaction is cancelled */
        const struct tdb1_methods *io_methods;

        /* the list of transaction blocks. When a block is first
           written to, it gets created in this list. Entries for
           blocks that were never written stay NULL */
        uint8_t **blocks;
        uint32_t num_blocks;      /* number of entries in blocks[] */
        uint32_t block_size;      /* bytes in each block */
        uint32_t last_block_size; /* number of valid bytes in the last block */

        /* non-zero when an internal transaction error has
           occurred. All write operations will then fail until the
           transaction is ended */
        int transaction_error;

        /* when inside a transaction we need to keep track of any
           nested tdb1_transaction_start() calls, as these are allowed,
           but don't create a new transaction. 0 means we are at the
           outermost level */
        int nesting;

        /* set when a prepare has already occurred; writes are refused
           from then on */
        bool prepared;
        /* when non-zero, the file offset of the recovery magic value;
           cancel overwrites it with the invalid magic */
        tdb1_off_t magic_offset;

        /* old file size before transaction */
        tdb1_len_t old_map_size;

        /* did we expand in this transaction */
        bool expanded;
};
135
136
137 /*
138   read while in a transaction. We need to check first if the data is in our list
139   of transaction elements, then if not do a real read
140 */
141 static int transaction1_read(struct tdb_context *tdb, tdb1_off_t off, void *buf,
142                              tdb1_len_t len, int cv)
143 {
144         uint32_t blk;
145
146         /* break it down into block sized ops */
147         while (len + (off % tdb->tdb1.transaction->block_size) > tdb->tdb1.transaction->block_size) {
148                 tdb1_len_t len2 = tdb->tdb1.transaction->block_size - (off % tdb->tdb1.transaction->block_size);
149                 if (transaction1_read(tdb, off, buf, len2, cv) != 0) {
150                         return -1;
151                 }
152                 len -= len2;
153                 off += len2;
154                 buf = (void *)(len2 + (char *)buf);
155         }
156
157         if (len == 0) {
158                 return 0;
159         }
160
161         blk = off / tdb->tdb1.transaction->block_size;
162
163         /* see if we have it in the block list */
164         if (tdb->tdb1.transaction->num_blocks <= blk ||
165             tdb->tdb1.transaction->blocks[blk] == NULL) {
166                 /* nope, do a real read */
167                 if (tdb->tdb1.transaction->io_methods->tdb1_read(tdb, off, buf, len, cv) != 0) {
168                         goto fail;
169                 }
170                 return 0;
171         }
172
173         /* it is in the block list. Now check for the last block */
174         if (blk == tdb->tdb1.transaction->num_blocks-1) {
175                 if (len > tdb->tdb1.transaction->last_block_size) {
176                         goto fail;
177                 }
178         }
179
180         /* now copy it out of this block */
181         memcpy(buf, tdb->tdb1.transaction->blocks[blk] + (off % tdb->tdb1.transaction->block_size), len);
182         if (cv) {
183                 tdb1_convert(buf, len);
184         }
185         return 0;
186
187 fail:
188         tdb->last_error = tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
189                                 "transaction_read: failed at off=%d len=%d",
190                                 off, len);
191         tdb->tdb1.transaction->transaction_error = 1;
192         return -1;
193 }
194
195
196 /*
197   write while in a transaction
198 */
199 static int transaction1_write(struct tdb_context *tdb, tdb1_off_t off,
200                              const void *buf, tdb1_len_t len)
201 {
202         uint32_t blk;
203
204         /* Only a commit is allowed on a prepared transaction */
205         if (tdb->tdb1.transaction->prepared) {
206                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
207                                         "transaction_write: transaction already"
208                                         " prepared, write not allowed");
209                 tdb->tdb1.transaction->transaction_error = 1;
210                 return -1;
211         }
212
213         /* if the write is to a hash head, then update the transaction
214            hash heads */
215         if (len == sizeof(tdb1_off_t) && off >= TDB1_FREELIST_TOP &&
216             off < TDB1_FREELIST_TOP+TDB1_HASHTABLE_SIZE(tdb)) {
217                 uint32_t chain = (off-TDB1_FREELIST_TOP) / sizeof(tdb1_off_t);
218                 memcpy(&tdb->tdb1.transaction->hash_heads[chain], buf, len);
219         }
220
221         /* break it up into block sized chunks */
222         while (len + (off % tdb->tdb1.transaction->block_size) > tdb->tdb1.transaction->block_size) {
223                 tdb1_len_t len2 = tdb->tdb1.transaction->block_size - (off % tdb->tdb1.transaction->block_size);
224                 if (transaction1_write(tdb, off, buf, len2) != 0) {
225                         return -1;
226                 }
227                 len -= len2;
228                 off += len2;
229                 if (buf != NULL) {
230                         buf = (const void *)(len2 + (const char *)buf);
231                 }
232         }
233
234         if (len == 0) {
235                 return 0;
236         }
237
238         blk = off / tdb->tdb1.transaction->block_size;
239         off = off % tdb->tdb1.transaction->block_size;
240
241         if (tdb->tdb1.transaction->num_blocks <= blk) {
242                 uint8_t **new_blocks;
243                 /* expand the blocks array */
244                 if (tdb->tdb1.transaction->blocks == NULL) {
245                         new_blocks = (uint8_t **)malloc(
246                                 (blk+1)*sizeof(uint8_t *));
247                 } else {
248                         new_blocks = (uint8_t **)realloc(
249                                 tdb->tdb1.transaction->blocks,
250                                 (blk+1)*sizeof(uint8_t *));
251                 }
252                 if (new_blocks == NULL) {
253                         tdb->last_error = TDB_ERR_OOM;
254                         goto fail;
255                 }
256                 memset(&new_blocks[tdb->tdb1.transaction->num_blocks], 0,
257                        (1+(blk - tdb->tdb1.transaction->num_blocks))*sizeof(uint8_t *));
258                 tdb->tdb1.transaction->blocks = new_blocks;
259                 tdb->tdb1.transaction->num_blocks = blk+1;
260                 tdb->tdb1.transaction->last_block_size = 0;
261         }
262
263         /* allocate and fill a block? */
264         if (tdb->tdb1.transaction->blocks[blk] == NULL) {
265                 tdb->tdb1.transaction->blocks[blk] = (uint8_t *)calloc(tdb->tdb1.transaction->block_size, 1);
266                 if (tdb->tdb1.transaction->blocks[blk] == NULL) {
267                         tdb->last_error = TDB_ERR_OOM;
268                         tdb->tdb1.transaction->transaction_error = 1;
269                         return -1;
270                 }
271                 if (tdb->tdb1.transaction->old_map_size > blk * tdb->tdb1.transaction->block_size) {
272                         tdb1_len_t len2 = tdb->tdb1.transaction->block_size;
273                         if (len2 + (blk * tdb->tdb1.transaction->block_size) > tdb->tdb1.transaction->old_map_size) {
274                                 len2 = tdb->tdb1.transaction->old_map_size - (blk * tdb->tdb1.transaction->block_size);
275                         }
276                         if (tdb->tdb1.transaction->io_methods->tdb1_read(tdb, blk * tdb->tdb1.transaction->block_size,
277                                                                    tdb->tdb1.transaction->blocks[blk],
278                                                                    len2, 0) != 0) {
279                                 SAFE_FREE(tdb->tdb1.transaction->blocks[blk]);
280                                 tdb->last_error = TDB_ERR_IO;
281                                 goto fail;
282                         }
283                         if (blk == tdb->tdb1.transaction->num_blocks-1) {
284                                 tdb->tdb1.transaction->last_block_size = len2;
285                         }
286                 }
287         }
288
289         /* overwrite part of an existing block */
290         if (buf == NULL) {
291                 memset(tdb->tdb1.transaction->blocks[blk] + off, 0, len);
292         } else {
293                 memcpy(tdb->tdb1.transaction->blocks[blk] + off, buf, len);
294         }
295         if (blk == tdb->tdb1.transaction->num_blocks-1) {
296                 if (len + off > tdb->tdb1.transaction->last_block_size) {
297                         tdb->tdb1.transaction->last_block_size = len + off;
298                 }
299         }
300
301         return 0;
302
303 fail:
304         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
305                    "transaction_write: failed at off=%d len=%d",
306                    (blk*tdb->tdb1.transaction->block_size) + off, len);
307         tdb->tdb1.transaction->transaction_error = 1;
308         return -1;
309 }
310
311
312 /*
313   write while in a transaction - this varient never expands the transaction blocks, it only
314   updates existing blocks. This means it cannot change the recovery size
315 */
316 static int transaction1_write_existing(struct tdb_context *tdb, tdb1_off_t off,
317                                       const void *buf, tdb1_len_t len)
318 {
319         uint32_t blk;
320
321         /* break it up into block sized chunks */
322         while (len + (off % tdb->tdb1.transaction->block_size) > tdb->tdb1.transaction->block_size) {
323                 tdb1_len_t len2 = tdb->tdb1.transaction->block_size - (off % tdb->tdb1.transaction->block_size);
324                 if (transaction1_write_existing(tdb, off, buf, len2) != 0) {
325                         return -1;
326                 }
327                 len -= len2;
328                 off += len2;
329                 if (buf != NULL) {
330                         buf = (const void *)(len2 + (const char *)buf);
331                 }
332         }
333
334         if (len == 0) {
335                 return 0;
336         }
337
338         blk = off / tdb->tdb1.transaction->block_size;
339         off = off % tdb->tdb1.transaction->block_size;
340
341         if (tdb->tdb1.transaction->num_blocks <= blk ||
342             tdb->tdb1.transaction->blocks[blk] == NULL) {
343                 return 0;
344         }
345
346         if (blk == tdb->tdb1.transaction->num_blocks-1 &&
347             off + len > tdb->tdb1.transaction->last_block_size) {
348                 if (off >= tdb->tdb1.transaction->last_block_size) {
349                         return 0;
350                 }
351                 len = tdb->tdb1.transaction->last_block_size - off;
352         }
353
354         /* overwrite part of an existing block */
355         memcpy(tdb->tdb1.transaction->blocks[blk] + off, buf, len);
356
357         return 0;
358 }
359
360
361 /*
362   accelerated hash chain head search, using the cached hash heads
363 */
364 static void transaction1_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
365 {
366         uint32_t h = *chain;
367         for (;h < tdb->tdb1.header.hash_size;h++) {
368                 /* the +1 takes account of the freelist */
369                 if (0 != tdb->tdb1.transaction->hash_heads[h+1]) {
370                         break;
371                 }
372         }
373         (*chain) = h;
374 }
375
376 /*
377   out of bounds check during a transaction
378 */
379 static int transaction1_oob(struct tdb_context *tdb, tdb1_off_t len, int probe)
380 {
381         if (len <= tdb->file->map_size) {
382                 return 0;
383         }
384         tdb->last_error = TDB_ERR_IO;
385         return -1;
386 }
387
388 /*
389   transaction version of tdb1_expand().
390 */
391 static int transaction1_expand_file(struct tdb_context *tdb, tdb1_off_t size,
392                                     tdb1_off_t addition)
393 {
394         /* add a write to the transaction elements, so subsequent
395            reads see the zero data */
396         if (transaction1_write(tdb, size, NULL, addition) != 0) {
397                 return -1;
398         }
399
400         tdb->tdb1.transaction->expanded = true;
401
402         return 0;
403 }
404
/*
  io method table installed as tdb->tdb1.io for the duration of a
  transaction (see _tdb1_transaction_start()). The initialiser is
  positional, so the order of entries must match the field order of
  struct tdb1_methods.
*/
static const struct tdb1_methods transaction1_methods = {
        transaction1_read,            /* reads prefer the transaction blocks */
        transaction1_write,           /* writes go to the transaction blocks */
        transaction1_next_hash_chain, /* scan of the cached hash heads */
        transaction1_oob,             /* bounds check against the map size */
        transaction1_expand_file,     /* expansion recorded as a zero write */
};
412
413
414 /*
415   start a tdb transaction. No token is returned, as only a single
416   transaction is allowed to be pending per tdb_context
417 */
418 static int _tdb1_transaction_start(struct tdb_context *tdb)
419 {
420         /* some sanity checks */
421         if ((tdb->flags & TDB_RDONLY) || (tdb->flags & TDB_INTERNAL) || tdb->tdb1.traverse_read) {
422                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
423                                         "tdb1_transaction_start: cannot start a"
424                                         " transaction on a read-only or"
425                                         " internal db");
426                 return -1;
427         }
428
429         /* cope with nested tdb1_transaction_start() calls */
430         if (tdb->tdb1.transaction != NULL) {
431                 if (!(tdb->flags & TDB_ALLOW_NESTING)) {
432                         tdb->last_error = TDB_ERR_EINVAL;
433                         return -1;
434                 }
435                 tdb->stats.transaction_nest++;
436                 tdb->tdb1.transaction->nesting++;
437                 return 0;
438         }
439
440         if (tdb1_have_extra_locks(tdb)) {
441                 /* the caller must not have any locks when starting a
442                    transaction as otherwise we'll be screwed by lack
443                    of nested locks in posix */
444                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
445                                         "tdb1_transaction_start: cannot start a"
446                                         " transaction with locks held");
447                 return -1;
448         }
449
450         if (tdb->tdb1.travlocks.next != NULL) {
451                 /* you cannot use transactions inside a traverse (although you can use
452                    traverse inside a transaction) as otherwise you can end up with
453                    deadlock */
454                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
455                                         "tdb1_transaction_start: cannot start a"
456                                         " transaction within a traverse");
457                 return -1;
458         }
459
460         tdb->tdb1.transaction = (struct tdb1_transaction *)
461                 calloc(sizeof(struct tdb1_transaction), 1);
462         if (tdb->tdb1.transaction == NULL) {
463                 tdb->last_error = TDB_ERR_OOM;
464                 return -1;
465         }
466
467         /* a page at a time seems like a reasonable compromise between compactness and efficiency */
468         tdb->tdb1.transaction->block_size = tdb->tdb1.page_size;
469
470         /* get the transaction write lock. This is a blocking lock. As
471            discussed with Volker, there are a number of ways we could
472            make this async, which we will probably do in the future */
473         if (tdb1_transaction_lock(tdb, F_WRLCK, TDB_LOCK_WAIT) == -1) {
474                 SAFE_FREE(tdb->tdb1.transaction->blocks);
475                 SAFE_FREE(tdb->tdb1.transaction);
476                 return -1;
477         }
478
479         /* get a read lock from the freelist to the end of file. This
480            is upgraded to a write lock during the commit */
481         if (tdb1_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, true) == -1) {
482                 if (errno != EAGAIN && errno != EINTR) {
483                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
484                                    "tdb1_transaction_start:"
485                                    " failed to get hash locks");
486                 }
487                 goto fail_allrecord_lock;
488         }
489
490         /* setup a copy of the hash table heads so the hash scan in
491            traverse can be fast */
492         tdb->tdb1.transaction->hash_heads = (uint32_t *)
493                 calloc(tdb->tdb1.header.hash_size+1, sizeof(uint32_t));
494         if (tdb->tdb1.transaction->hash_heads == NULL) {
495                 tdb->last_error = TDB_ERR_OOM;
496                 goto fail;
497         }
498         if (tdb->tdb1.io->tdb1_read(tdb, TDB1_FREELIST_TOP, tdb->tdb1.transaction->hash_heads,
499                                    TDB1_HASHTABLE_SIZE(tdb), 0) != 0) {
500                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
501                            "tdb1_transaction_start: failed to read hash heads");
502                 goto fail;
503         }
504
505         /* make sure we know about any file expansions already done by
506            anyone else */
507         tdb->tdb1.io->tdb1_oob(tdb, tdb->file->map_size + 1, 1);
508         tdb->tdb1.transaction->old_map_size = tdb->file->map_size;
509
510         /* finally hook the io methods, replacing them with
511            transaction specific methods */
512         tdb->tdb1.transaction->io_methods = tdb->tdb1.io;
513         tdb->tdb1.io = &transaction1_methods;
514
515         tdb->stats.transactions++;
516         return 0;
517
518 fail:
519         tdb1_allrecord_unlock(tdb, F_RDLCK);
520 fail_allrecord_lock:
521         tdb1_transaction_unlock(tdb, F_WRLCK);
522         SAFE_FREE(tdb->tdb1.transaction->blocks);
523         SAFE_FREE(tdb->tdb1.transaction->hash_heads);
524         SAFE_FREE(tdb->tdb1.transaction);
525         return -1;
526 }
527
/*
  Public entry point for starting a transaction; simply forwards to
  _tdb1_transaction_start() and returns its result (0 or -1).
*/
int tdb1_transaction_start(struct tdb_context *tdb)
{
        int rc = _tdb1_transaction_start(tdb);

        return rc;
}
532
533 /*
534   sync to disk
535 */
536 static int transaction1_sync(struct tdb_context *tdb, tdb1_off_t offset, tdb1_len_t length)
537 {
538         if (tdb->flags & TDB_NOSYNC) {
539                 return 0;
540         }
541
542 #if HAVE_FDATASYNC
543         if (fdatasync(tdb->file->fd) != 0) {
544 #else
545         if (fsync(tdb->file->fd) != 0) {
546 #endif
547                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
548                                         "tdb1_transaction: fsync failed");
549                 return -1;
550         }
551 #if HAVE_MMAP
552         if (tdb->file->map_ptr) {
553                 tdb1_off_t moffset = offset & ~(tdb->tdb1.page_size-1);
554                 if (msync(moffset + (char *)tdb->file->map_ptr,
555                           length + (offset - moffset), MS_SYNC) != 0) {
556                         tdb->last_error = tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
557                                                 "tdb1_transaction:"
558                                                 " msync failed - %s",
559                                                 strerror(errno));
560                         return -1;
561                 }
562         }
563 #endif
564         return 0;
565 }
566
567
568 static int _tdb1_transaction_cancel(struct tdb_context *tdb)
569 {
570         int i, ret = 0;
571
572         if (tdb->tdb1.transaction == NULL) {
573                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
574                                         "tdb1_transaction_cancel:"
575                                         " no transaction");
576                 return -1;
577         }
578
579         if (tdb->tdb1.transaction->nesting != 0) {
580                 tdb->tdb1.transaction->transaction_error = 1;
581                 tdb->tdb1.transaction->nesting--;
582                 return 0;
583         }
584
585         tdb->file->map_size = tdb->tdb1.transaction->old_map_size;
586
587         /* free all the transaction blocks */
588         for (i=0;i<tdb->tdb1.transaction->num_blocks;i++) {
589                 if (tdb->tdb1.transaction->blocks[i] != NULL) {
590                         free(tdb->tdb1.transaction->blocks[i]);
591                 }
592         }
593         SAFE_FREE(tdb->tdb1.transaction->blocks);
594
595         if (tdb->tdb1.transaction->magic_offset) {
596                 const struct tdb1_methods *methods = tdb->tdb1.transaction->io_methods;
597                 const uint32_t invalid = TDB1_RECOVERY_INVALID_MAGIC;
598
599                 /* remove the recovery marker */
600                 if (methods->tdb1_write(tdb, tdb->tdb1.transaction->magic_offset, &invalid, 4) == -1 ||
601                 transaction1_sync(tdb, tdb->tdb1.transaction->magic_offset, 4) == -1) {
602                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
603                                    "tdb1_transaction_cancel: failed to"
604                                    " remove recovery magic");
605                         ret = -1;
606                 }
607         }
608
609         /* This also removes the OPEN_LOCK, if we have it. */
610         tdb1_release_transaction_locks(tdb);
611
612         /* restore the normal io methods */
613         tdb->tdb1.io = tdb->tdb1.transaction->io_methods;
614
615         SAFE_FREE(tdb->tdb1.transaction->hash_heads);
616         SAFE_FREE(tdb->tdb1.transaction);
617
618         return ret;
619 }
620
621 /*
622   cancel the current transaction
623 */
624 int tdb1_transaction_cancel(struct tdb_context *tdb)
625 {
626         tdb->stats.transaction_cancel++;
627         return _tdb1_transaction_cancel(tdb);
628 }
629
630 /*
631   work out how much space the linearised recovery data will consume
632 */
633 static tdb1_len_t tdb1_recovery_size(struct tdb_context *tdb)
634 {
635         tdb1_len_t recovery_size = 0;
636         int i;
637
638         recovery_size = sizeof(uint32_t);
639         for (i=0;i<tdb->tdb1.transaction->num_blocks;i++) {
640                 if (i * tdb->tdb1.transaction->block_size >= tdb->tdb1.transaction->old_map_size) {
641                         break;
642                 }
643                 if (tdb->tdb1.transaction->blocks[i] == NULL) {
644                         continue;
645                 }
646                 recovery_size += 2*sizeof(tdb1_off_t);
647                 if (i == tdb->tdb1.transaction->num_blocks-1) {
648                         recovery_size += tdb->tdb1.transaction->last_block_size;
649                 } else {
650                         recovery_size += tdb->tdb1.transaction->block_size;
651                 }
652         }
653
654         return recovery_size;
655 }
656
657 int tdb1_recovery_area(struct tdb_context *tdb,
658                       const struct tdb1_methods *methods,
659                       tdb1_off_t *recovery_offset,
660                       struct tdb1_record *rec)
661 {
662         if (tdb1_ofs_read(tdb, TDB1_RECOVERY_HEAD, recovery_offset) == -1) {
663                 return -1;
664         }
665
666         if (*recovery_offset == 0) {
667                 rec->rec_len = 0;
668                 return 0;
669         }
670
671         if (methods->tdb1_read(tdb, *recovery_offset, rec, sizeof(*rec),
672                               TDB1_DOCONV()) == -1) {
673                 return -1;
674         }
675
676         /* ignore invalid recovery regions: can happen in crash */
677         if (rec->magic != TDB1_RECOVERY_MAGIC &&
678             rec->magic != TDB1_RECOVERY_INVALID_MAGIC) {
679                 *recovery_offset = 0;
680                 rec->rec_len = 0;
681         }
682         return 0;
683 }
684
685 /*
686   allocate the recovery area, or use an existing recovery area if it is
687   large enough
688 */
689 static int tdb1_recovery_allocate(struct tdb_context *tdb,
690                                  tdb1_len_t *recovery_size,
691                                  tdb1_off_t *recovery_offset,
692                                  tdb1_len_t *recovery_max_size)
693 {
694         struct tdb1_record rec;
695         const struct tdb1_methods *methods = tdb->tdb1.transaction->io_methods;
696         tdb1_off_t recovery_head;
697
698         if (tdb1_recovery_area(tdb, methods, &recovery_head, &rec) == -1) {
699                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
700                            "tdb1_recovery_allocate:"
701                            " failed to read recovery head");
702                 return -1;
703         }
704
705         *recovery_size = tdb1_recovery_size(tdb);
706
707         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
708                 /* it fits in the existing area */
709                 *recovery_max_size = rec.rec_len;
710                 *recovery_offset = recovery_head;
711                 return 0;
712         }
713
714         /* we need to free up the old recovery area, then allocate a
715            new one at the end of the file. Note that we cannot use
716            tdb1_allocate() to allocate the new one as that might return
717            us an area that is being currently used (as of the start of
718            the transaction) */
719         if (recovery_head != 0) {
720                 if (tdb1_free(tdb, recovery_head, &rec) == -1) {
721                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
722                                    "tdb1_recovery_allocate: failed to free"
723                                    " previous recovery area");
724                         return -1;
725                 }
726         }
727
728         /* the tdb1_free() call might have increased the recovery size */
729         *recovery_size = tdb1_recovery_size(tdb);
730
731         /* round up to a multiple of page size */
732         *recovery_max_size = TDB1_ALIGN(sizeof(rec) + *recovery_size,
733                                         tdb->tdb1.page_size) - sizeof(rec);
734         *recovery_offset = tdb->file->map_size;
735         recovery_head = *recovery_offset;
736
737         if (methods->tdb1_expand_file(tdb, tdb->tdb1.transaction->old_map_size,
738                                      (tdb->file->map_size - tdb->tdb1.transaction->old_map_size) +
739                                      sizeof(rec) + *recovery_max_size) == -1) {
740                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
741                            "tdb1_recovery_allocate:"
742                            " failed to create recovery area");
743                 return -1;
744         }
745         tdb->stats.transaction_expand_file++;
746
747         /* remap the file (if using mmap) */
748         methods->tdb1_oob(tdb, tdb->file->map_size + 1, 1);
749
750         /* we have to reset the old map size so that we don't try to expand the file
751            again in the transaction commit, which would destroy the recovery area */
752         tdb->tdb1.transaction->old_map_size = tdb->file->map_size;
753
754         /* write the recovery header offset and sync - we can sync without a race here
755            as the magic ptr in the recovery record has not been set */
756         TDB1_CONV(recovery_head);
757         if (methods->tdb1_write(tdb, TDB1_RECOVERY_HEAD,
758                                &recovery_head, sizeof(tdb1_off_t)) == -1) {
759                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
760                            "tdb1_recovery_allocate:"
761                            " failed to write recovery head");
762                 return -1;
763         }
764         if (transaction1_write_existing(tdb, TDB1_RECOVERY_HEAD, &recovery_head, sizeof(tdb1_off_t)) == -1) {
765                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
766                            "tdb1_recovery_allocate:"
767                            " failed to write recovery head");
768                 return -1;
769         }
770
771         return 0;
772 }
773
774
775 /*
776   setup the recovery data that will be used on a crash during commit
777 */
778 static int transaction1_setup_recovery(struct tdb_context *tdb,
779                                        tdb1_off_t *magic_offset)
780 {
781         tdb1_len_t recovery_size;
782         unsigned char *data, *p;
783         const struct tdb1_methods *methods = tdb->tdb1.transaction->io_methods;
784         struct tdb1_record *rec;
785         tdb1_off_t recovery_offset, recovery_max_size;
786         tdb1_off_t old_map_size = tdb->tdb1.transaction->old_map_size;
787         uint32_t magic, tailer;
788         int i;
789
790         /*
791           check that the recovery area has enough space
792         */
793         if (tdb1_recovery_allocate(tdb, &recovery_size,
794                                   &recovery_offset, &recovery_max_size) == -1) {
795                 return -1;
796         }
797
798         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
799         if (data == NULL) {
800                 tdb->last_error = TDB_ERR_OOM;
801                 return -1;
802         }
803
804         rec = (struct tdb1_record *)data;
805         memset(rec, 0, sizeof(*rec));
806
807         rec->magic    = TDB1_RECOVERY_INVALID_MAGIC;
808         rec->data_len = recovery_size;
809         rec->rec_len  = recovery_max_size;
810         rec->key_len  = old_map_size;
811         TDB1_CONV(*rec);
812
813         /* build the recovery data into a single blob to allow us to do a single
814            large write, which should be more efficient */
815         p = data + sizeof(*rec);
816         for (i=0;i<tdb->tdb1.transaction->num_blocks;i++) {
817                 tdb1_off_t offset;
818                 tdb1_len_t length;
819
820                 if (tdb->tdb1.transaction->blocks[i] == NULL) {
821                         continue;
822                 }
823
824                 offset = i * tdb->tdb1.transaction->block_size;
825                 length = tdb->tdb1.transaction->block_size;
826                 if (i == tdb->tdb1.transaction->num_blocks-1) {
827                         length = tdb->tdb1.transaction->last_block_size;
828                 }
829
830                 if (offset >= old_map_size) {
831                         continue;
832                 }
833                 if (offset + length > tdb->tdb1.transaction->old_map_size) {
834                         tdb->last_error = tdb_logerr(tdb, TDB_ERR_CORRUPT,
835                                                 TDB_LOG_ERROR,
836                                                 "tdb1_transaction_setup_recovery: transaction data over new region boundary");
837                         free(data);
838                         return -1;
839                 }
840                 memcpy(p, &offset, 4);
841                 memcpy(p+4, &length, 4);
842                 if (TDB1_DOCONV()) {
843                         tdb1_convert(p, 8);
844                 }
845                 /* the recovery area contains the old data, not the
846                    new data, so we have to call the original tdb1_read
847                    method to get it */
848                 if (methods->tdb1_read(tdb, offset, p + 8, length, 0) != 0) {
849                         free(data);
850                         tdb->last_error = TDB_ERR_IO;
851                         return -1;
852                 }
853                 p += 8 + length;
854         }
855
856         /* and the tailer */
857         tailer = sizeof(*rec) + recovery_max_size;
858         memcpy(p, &tailer, 4);
859         if (TDB1_DOCONV()) {
860                 tdb1_convert(p, 4);
861         }
862
863         /* write the recovery data to the recovery area */
864         if (methods->tdb1_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
865                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
866                            "tdb1_transaction_setup_recovery:"
867                            " failed to write recovery data");
868                 free(data);
869                 return -1;
870         }
871         if (transaction1_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
872                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
873                            "tdb1_transaction_setup_recovery: failed to write"
874                            " secondary recovery data");
875                 free(data);
876                 return -1;
877         }
878
879         /* as we don't have ordered writes, we have to sync the recovery
880            data before we update the magic to indicate that the recovery
881            data is present */
882         if (transaction1_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
883                 free(data);
884                 return -1;
885         }
886
887         free(data);
888
889         magic = TDB1_RECOVERY_MAGIC;
890         TDB1_CONV(magic);
891
892         *magic_offset = recovery_offset + offsetof(struct tdb1_record, magic);
893
894         if (methods->tdb1_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
895                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
896                            "tdb1_transaction_setup_recovery:"
897                            " failed to write recovery magic");
898                 return -1;
899         }
900         if (transaction1_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
901                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
902                            "tdb1_transaction_setup_recovery:"
903                            " failed to write secondary recovery magic");
904                 return -1;
905         }
906
907         /* ensure the recovery magic marker is on disk */
908         if (transaction1_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
909                 return -1;
910         }
911
912         return 0;
913 }
914
915 static int _tdb1_transaction_prepare_commit(struct tdb_context *tdb)
916 {
917         const struct tdb1_methods *methods;
918
919         if (tdb->tdb1.transaction == NULL) {
920                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
921                                         "tdb1_transaction_prepare_commit:"
922                                         " no transaction");
923                 return -1;
924         }
925
926         if (tdb->tdb1.transaction->prepared) {
927                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
928                                         "tdb1_transaction_prepare_commit:"
929                                         " transaction already prepared");
930                 _tdb1_transaction_cancel(tdb);
931                 return -1;
932         }
933
934         if (tdb->tdb1.transaction->transaction_error) {
935                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
936                                         "tdb1_transaction_prepare_commit:"
937                                         " transaction error pending");
938                 _tdb1_transaction_cancel(tdb);
939                 return -1;
940         }
941
942
943         if (tdb->tdb1.transaction->nesting != 0) {
944                 return 0;
945         }
946
947         /* check for a null transaction */
948         if (tdb->tdb1.transaction->blocks == NULL) {
949                 return 0;
950         }
951
952         methods = tdb->tdb1.transaction->io_methods;
953
954         /* if there are any locks pending then the caller has not
955            nested their locks properly, so fail the transaction */
956         if (tdb1_have_extra_locks(tdb)) {
957                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
958                                         "tdb1_transaction_prepare_commit:"
959                                         " locks pending on commit");
960                 _tdb1_transaction_cancel(tdb);
961                 return -1;
962         }
963
964         /* upgrade the main transaction lock region to a write lock */
965         if (tdb1_allrecord_upgrade(tdb) == -1) {
966                 if (errno != EAGAIN && errno != EINTR) {
967                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
968                                    "tdb1_transaction_prepare_commit:"
969                                    " failed to upgrade hash locks");
970                 }
971                 return -1;
972         }
973
974         /* get the open lock - this prevents new users attaching to the database
975            during the commit */
976         if (tdb1_nest_lock(tdb, TDB1_OPEN_LOCK, F_WRLCK, TDB_LOCK_WAIT) == -1) {
977                 if (errno != EAGAIN && errno != EINTR) {
978                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
979                                    "tdb1_transaction_prepare_commit:"
980                                    " failed to get open lock");
981                 }
982                 return -1;
983         }
984
985         if (!(tdb->flags & TDB_NOSYNC)) {
986                 /* write the recovery data to the end of the file */
987                 if (transaction1_setup_recovery(tdb, &tdb->tdb1.transaction->magic_offset) == -1) {
988                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
989                                    "tdb1_transaction_prepare_commit:"
990                                    " failed to setup recovery data");
991                         return -1;
992                 }
993         }
994
995         tdb->tdb1.transaction->prepared = true;
996
997         /* expand the file to the new size if needed */
998         if (tdb->file->map_size != tdb->tdb1.transaction->old_map_size) {
999                 if (methods->tdb1_expand_file(tdb, tdb->tdb1.transaction->old_map_size,
1000                                              tdb->file->map_size -
1001                                              tdb->tdb1.transaction->old_map_size) == -1) {
1002                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1003                                    "tdb1_transaction_prepare_commit:"
1004                                    " expansion failed");
1005                         return -1;
1006                 }
1007                 tdb->stats.transaction_expand_file++;
1008                 tdb->file->map_size = tdb->tdb1.transaction->old_map_size;
1009                 methods->tdb1_oob(tdb, tdb->file->map_size + 1, 1);
1010         }
1011
1012         /* Keep the open lock until the actual commit */
1013
1014         return 0;
1015 }
1016
/*
   Public entry point for preparing the current transaction for commit.
   This is a thin wrapper: it hands the context to
   _tdb1_transaction_prepare_commit() and returns that function's
   result unchanged.
*/
int tdb1_transaction_prepare_commit(struct tdb_context *tdb)
{
	int status = _tdb1_transaction_prepare_commit(tdb);

	return status;
}
1024
1025 /* A repack is worthwhile if the largest is less than half total free. */
1026 static bool repack_worthwhile(struct tdb_context *tdb)
1027 {
1028         tdb1_off_t ptr;
1029         struct tdb1_record rec;
1030         tdb1_len_t total = 0, largest = 0;
1031
1032         if (tdb1_ofs_read(tdb, TDB1_FREELIST_TOP, &ptr) == -1) {
1033                 return false;
1034         }
1035
1036         while (ptr != 0 && tdb1_rec_free_read(tdb, ptr, &rec) == 0) {
1037                 total += rec.rec_len;
1038                 if (rec.rec_len > largest) {
1039                         largest = rec.rec_len;
1040                 }
1041                 ptr = rec.next;
1042         }
1043
1044         return total > largest * 2;
1045 }
1046
1047 /*
1048   commit the current transaction
1049 */
1050 int tdb1_transaction_commit(struct tdb_context *tdb)
1051 {
1052         const struct tdb1_methods *methods;
1053         int i;
1054         bool need_repack = false;
1055
1056         if (tdb->tdb1.transaction == NULL) {
1057                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
1058                                         "tdb1_transaction_commit:"
1059                                         " no transaction");
1060                 return -1;
1061         }
1062
1063         if (tdb->tdb1.transaction->transaction_error) {
1064                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
1065                                         "tdb1_transaction_commit:"
1066                                         " transaction error pending");
1067                 _tdb1_transaction_cancel(tdb);
1068                 return -1;
1069         }
1070
1071
1072         if (tdb->tdb1.transaction->nesting != 0) {
1073                 tdb->tdb1.transaction->nesting--;
1074                 return 0;
1075         }
1076
1077         /* check for a null transaction */
1078         if (tdb->tdb1.transaction->blocks == NULL) {
1079                 _tdb1_transaction_cancel(tdb);
1080                 return 0;
1081         }
1082
1083         if (!tdb->tdb1.transaction->prepared) {
1084                 int ret = _tdb1_transaction_prepare_commit(tdb);
1085                 if (ret) {
1086                         _tdb1_transaction_cancel(tdb);
1087                         return ret;
1088                 }
1089         }
1090
1091         methods = tdb->tdb1.transaction->io_methods;
1092
1093         /* perform all the writes */
1094         for (i=0;i<tdb->tdb1.transaction->num_blocks;i++) {
1095                 tdb1_off_t offset;
1096                 tdb1_len_t length;
1097
1098                 if (tdb->tdb1.transaction->blocks[i] == NULL) {
1099                         continue;
1100                 }
1101
1102                 offset = i * tdb->tdb1.transaction->block_size;
1103                 length = tdb->tdb1.transaction->block_size;
1104                 if (i == tdb->tdb1.transaction->num_blocks-1) {
1105                         length = tdb->tdb1.transaction->last_block_size;
1106                 }
1107
1108                 if (methods->tdb1_write(tdb, offset, tdb->tdb1.transaction->blocks[i], length) == -1) {
1109                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1110                                    "tdb1_transaction_commit:"
1111                                    " write failed during commit");
1112
1113                         /* we've overwritten part of the data and
1114                            possibly expanded the file, so we need to
1115                            run the crash recovery code */
1116                         tdb->tdb1.io = methods;
1117                         tdb1_transaction_recover(tdb);
1118
1119                         _tdb1_transaction_cancel(tdb);
1120
1121                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1122                                    "tdb1_transaction_commit: write failed");
1123                         return -1;
1124                 }
1125                 SAFE_FREE(tdb->tdb1.transaction->blocks[i]);
1126         }
1127
1128         /* Do this before we drop lock or blocks. */
1129         if (tdb->tdb1.transaction->expanded) {
1130                 need_repack = repack_worthwhile(tdb);
1131         }
1132
1133         SAFE_FREE(tdb->tdb1.transaction->blocks);
1134         tdb->tdb1.transaction->num_blocks = 0;
1135
1136         /* ensure the new data is on disk */
1137         if (transaction1_sync(tdb, 0, tdb->file->map_size) == -1) {
1138                 return -1;
1139         }
1140
1141         /*
1142           TODO: maybe write to some dummy hdr field, or write to magic
1143           offset without mmap, before the last sync, instead of the
1144           utime() call
1145         */
1146
1147         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1148            don't change the mtime of the file, this means the file may
1149            not be backed up (as tdb rounding to block sizes means that
1150            file size changes are quite rare too). The following forces
1151            mtime changes when a transaction completes */
1152 #if HAVE_UTIME
1153         utime(tdb->name, NULL);
1154 #endif
1155
1156         /* use a transaction cancel to free memory and remove the
1157            transaction locks */
1158         _tdb1_transaction_cancel(tdb);
1159
1160         if (need_repack) {
1161                 return tdb_repack(tdb);
1162         }
1163
1164         return 0;
1165 }
1166
1167
1168 /*
1169   recover from an aborted transaction. Must be called with exclusive
1170   database write access already established (including the open
1171   lock to prevent new processes attaching)
1172 */
1173 int tdb1_transaction_recover(struct tdb_context *tdb)
1174 {
1175         tdb1_off_t recovery_head, recovery_eof;
1176         unsigned char *data, *p;
1177         uint32_t zero = 0;
1178         struct tdb1_record rec;
1179
1180         /* find the recovery area */
1181         if (tdb1_ofs_read(tdb, TDB1_RECOVERY_HEAD, &recovery_head) == -1) {
1182                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1183                            "tdb1_transaction_recover:"
1184                            " failed to read recovery head");
1185                 return -1;
1186         }
1187
1188         if (recovery_head == 0) {
1189                 /* we have never allocated a recovery record */
1190                 return 0;
1191         }
1192
1193         /* read the recovery record */
1194         if (tdb->tdb1.io->tdb1_read(tdb, recovery_head, &rec,
1195                                    sizeof(rec), TDB1_DOCONV()) == -1) {
1196                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1197                            "tdb1_transaction_recover:"
1198                            " failed to read recovery record");
1199                 return -1;
1200         }
1201
1202         if (rec.magic != TDB1_RECOVERY_MAGIC) {
1203                 /* there is no valid recovery data */
1204                 return 0;
1205         }
1206
1207         if (tdb->flags & TDB_RDONLY) {
1208                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
1209                                         "tdb1_transaction_recover:"
1210                                         " attempt to recover read only"
1211                                         " database");
1212                 return -1;
1213         }
1214
1215         recovery_eof = rec.key_len;
1216
1217         data = (unsigned char *)malloc(rec.data_len);
1218         if (data == NULL) {
1219                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
1220                                         "tdb1_transaction_recover:"
1221                                         " failed to allocate recovery data");
1222                 return -1;
1223         }
1224
1225         /* read the full recovery data */
1226         if (tdb->tdb1.io->tdb1_read(tdb, recovery_head + sizeof(rec), data,
1227                                    rec.data_len, 0) == -1) {
1228                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1229                            "tdb1_transaction_recover:"
1230                            " failed to read recovery data");
1231                 return -1;
1232         }
1233
1234         /* recover the file data */
1235         p = data;
1236         while (p+8 < data + rec.data_len) {
1237                 uint32_t ofs, len;
1238                 if (TDB1_DOCONV()) {
1239                         tdb1_convert(p, 8);
1240                 }
1241                 memcpy(&ofs, p, 4);
1242                 memcpy(&len, p+4, 4);
1243
1244                 if (tdb->tdb1.io->tdb1_write(tdb, ofs, p+8, len) == -1) {
1245                         free(data);
1246                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1247                                    "tdb1_transaction_recover: failed to recover"
1248                                    " %d bytes at offset %d", len, ofs);
1249                         return -1;
1250                 }
1251                 p += 8 + len;
1252         }
1253
1254         free(data);
1255
1256         if (transaction1_sync(tdb, 0, tdb->file->map_size) == -1) {
1257                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1258                            "tdb1_transaction_recover: failed to sync recovery");
1259                 return -1;
1260         }
1261
1262         /* if the recovery area is after the recovered eof then remove it */
1263         if (recovery_eof <= recovery_head) {
1264                 if (tdb1_ofs_write(tdb, TDB1_RECOVERY_HEAD, &zero) == -1) {
1265                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1266                                    "tdb1_transaction_recover: failed to remove"
1267                                    " recovery head");
1268                         return -1;
1269                 }
1270         }
1271
1272         /* remove the recovery magic */
1273         if (tdb1_ofs_write(tdb, recovery_head + offsetof(struct tdb1_record, magic),
1274                           &zero) == -1) {
1275                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1276                            "tdb1_transaction_recover: failed to remove"
1277                            " recovery magic");
1278                 return -1;
1279         }
1280
1281         if (transaction1_sync(tdb, 0, recovery_eof) == -1) {
1282                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1283                            "tdb1_transaction_recover:"
1284                            " failed to sync2 recovery");
1285                 return -1;
1286         }
1287
1288         tdb_logerr(tdb, TDB_SUCCESS, TDB_LOG_WARNING,
1289                    "tdb1_transaction_recover: recovered %d byte database",
1290                    recovery_eof);
1291
1292         /* all done */
1293         return 0;
1294 }
1295
1296 /* Any I/O failures we say "needs recovery". */
1297 tdb_bool_err tdb1_needs_recovery(struct tdb_context *tdb)
1298 {
1299         tdb1_off_t recovery_head;
1300         struct tdb1_record rec;
1301
1302         /* find the recovery area */
1303         if (tdb1_ofs_read(tdb, TDB1_RECOVERY_HEAD, &recovery_head) == -1) {
1304                 return tdb->last_error;
1305         }
1306
1307         if (recovery_head == 0) {
1308                 /* we have never allocated a recovery record */
1309                 return false;
1310         }
1311
1312         /* read the recovery record */
1313         if (tdb->tdb1.io->tdb1_read(tdb, recovery_head, &rec,
1314                                    sizeof(rec), TDB1_DOCONV()) == -1) {
1315                 return tdb->last_error;
1316         }
1317
1318         return (rec.magic == TDB1_RECOVERY_MAGIC);
1319 }