]> git.ozlabs.org Git - ccan/blob - ccan/tdb2/tdb1_transaction.c
c4a2b10b20759685112524f6a101e69964d3ec9f
[ccan] / ccan / tdb2 / tdb1_transaction.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 3 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, see <http://www.gnu.org/licenses/>.
24 */
25
26 #include "tdb1_private.h"
27
28 /*
29   transaction design:
30
31   - only allow a single transaction at a time per database. This makes
32     using the transaction API simpler, as otherwise the caller would
33     have to cope with temporary failures in transactions that conflict
34     with other current transactions
35
36   - keep the transaction recovery information in the same file as the
37     database, using a special 'transaction recovery' record pointed at
38     by the header. This removes the need for extra journal files as
39     used by some other databases
40
41   - dynamically allocate the transaction recovery record, re-using it
42     for subsequent transactions. If a larger record is needed then
43     tdb1_free() the old record to place it on the normal tdb freelist
44     before allocating the new record
45
46   - during transactions, keep a linked list of all writes that have
47     been performed by intercepting all tdb1_write() calls. The hooked
48     transaction versions of tdb1_read() and tdb1_write() check this
49     linked list and try to use the elements of the list in preference
50     to the real database.
51
52   - don't allow any locks to be held when a transaction starts,
53     otherwise we can end up with deadlock (plus lack of lock nesting
54     in posix locks would mean the lock is lost)
55
56   - if the caller gains a lock during the transaction but doesn't
57     release it then fail the commit
58
59   - allow for nested calls to tdb1_transaction_start(), re-using the
60     existing transaction record. If the inner transaction is cancelled
61     then a subsequent commit will fail
62
63   - keep a mirrored copy of the tdb hash chain heads to allow for the
64     fast hash heads scan on traverse, updating the mirrored copy in
65     the transaction version of tdb1_write
66
67   - allow callers to mix transaction and non-transaction use of tdb,
68     although once a transaction is started then an exclusive lock is
69     gained until the transaction is committed or cancelled
70
71   - the commit strategy involves first saving away all modified data
72     into a linearised buffer in the transaction recovery area, then
73     marking the transaction recovery area with a magic value to
74     indicate a valid recovery record. In total 4 fsync/msync calls are
75     needed per commit to prevent race conditions. It might be possible
76     to reduce this to 3 or even 2 with some more work.
77
78   - check for a valid recovery record on open of the tdb, while the
79     open lock is held. Automatically recover from the transaction
80     recovery area if needed, then continue with the open as
81     usual. This allows for smooth crash recovery with no administrator
82     intervention.
83
84   - if TDB_NOSYNC is passed to flags in tdb1_open then transactions are
85     still available, but no transaction recovery area is used and no
86     fsync/msync calls are made.
87
88   - if TDB_ALLOW_NESTING is passed to flags in tdb open, or added using
89     tdb1_add_flags() transaction nesting is enabled.
90     The default is that transaction nesting is NOT allowed.
91
92     Beware. when transactions are nested a transaction successfully
93     completed with tdb1_transaction_commit() can be silently unrolled later.
94 */
95
96
97 /*
98   hold the context of any current transaction
99 */
100 struct tdb1_transaction {
101         /* we keep a mirrored copy of the tdb hash heads here so
102            tdb1_next_hash_chain() can operate efficiently */
103         uint32_t *hash_heads;
104
105         /* the original io methods - used to do IOs to the real db */
106         const struct tdb1_methods *io_methods;
107
108         /* the list of transaction blocks. When a block is first
109            written to, it gets created in this list */
110         uint8_t **blocks;
111         uint32_t num_blocks;
112         uint32_t block_size;      /* bytes in each block */
113         uint32_t last_block_size; /* number of valid bytes in the last block */
114
115         /* non-zero when an internal transaction error has
116            occurred. All write operations will then fail until the
117            transaction is ended */
118         int transaction_error;
119
120         /* when inside a transaction we need to keep track of any
121            nested tdb1_transaction_start() calls, as these are allowed,
122            but don't create a new transaction */
123         int nesting;
124
125         /* set when a prepare has already occurred */
126         bool prepared;
127         tdb1_off_t magic_offset;
128
129         /* old file size before transaction */
130         tdb1_len_t old_map_size;
131
132         /* did we expand in this transaction */
133         bool expanded;
134 };
135
136
137 /*
138   read while in a transaction. We need to check first if the data is in our list
139   of transaction elements, then if not do a real read
140 */
141 static int transaction1_read(struct tdb_context *tdb, tdb1_off_t off, void *buf,
142                              tdb1_len_t len, int cv)
143 {
144         uint32_t blk;
145
146         /* break it down into block sized ops */
147         while (len + (off % tdb->tdb1.transaction->block_size) > tdb->tdb1.transaction->block_size) {
148                 tdb1_len_t len2 = tdb->tdb1.transaction->block_size - (off % tdb->tdb1.transaction->block_size);
149                 if (transaction1_read(tdb, off, buf, len2, cv) != 0) {
150                         return -1;
151                 }
152                 len -= len2;
153                 off += len2;
154                 buf = (void *)(len2 + (char *)buf);
155         }
156
157         if (len == 0) {
158                 return 0;
159         }
160
161         blk = off / tdb->tdb1.transaction->block_size;
162
163         /* see if we have it in the block list */
164         if (tdb->tdb1.transaction->num_blocks <= blk ||
165             tdb->tdb1.transaction->blocks[blk] == NULL) {
166                 /* nope, do a real read */
167                 if (tdb->tdb1.transaction->io_methods->tdb1_read(tdb, off, buf, len, cv) != 0) {
168                         goto fail;
169                 }
170                 return 0;
171         }
172
173         /* it is in the block list. Now check for the last block */
174         if (blk == tdb->tdb1.transaction->num_blocks-1) {
175                 if (len > tdb->tdb1.transaction->last_block_size) {
176                         goto fail;
177                 }
178         }
179
180         /* now copy it out of this block */
181         memcpy(buf, tdb->tdb1.transaction->blocks[blk] + (off % tdb->tdb1.transaction->block_size), len);
182         if (cv) {
183                 tdb1_convert(buf, len);
184         }
185         return 0;
186
187 fail:
188         tdb->last_error = tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
189                                 "transaction_read: failed at off=%d len=%d",
190                                 off, len);
191         tdb->tdb1.transaction->transaction_error = 1;
192         return -1;
193 }
194
195
196 /*
197   write while in a transaction
198 */
199 static int transaction1_write(struct tdb_context *tdb, tdb1_off_t off,
200                              const void *buf, tdb1_len_t len)
201 {
202         uint32_t blk;
203
204         /* Only a commit is allowed on a prepared transaction */
205         if (tdb->tdb1.transaction->prepared) {
206                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
207                                         "transaction_write: transaction already"
208                                         " prepared, write not allowed");
209                 tdb->tdb1.transaction->transaction_error = 1;
210                 return -1;
211         }
212
213         /* if the write is to a hash head, then update the transaction
214            hash heads */
215         if (len == sizeof(tdb1_off_t) && off >= TDB1_FREELIST_TOP &&
216             off < TDB1_FREELIST_TOP+TDB1_HASHTABLE_SIZE(tdb)) {
217                 uint32_t chain = (off-TDB1_FREELIST_TOP) / sizeof(tdb1_off_t);
218                 memcpy(&tdb->tdb1.transaction->hash_heads[chain], buf, len);
219         }
220
221         /* break it up into block sized chunks */
222         while (len + (off % tdb->tdb1.transaction->block_size) > tdb->tdb1.transaction->block_size) {
223                 tdb1_len_t len2 = tdb->tdb1.transaction->block_size - (off % tdb->tdb1.transaction->block_size);
224                 if (transaction1_write(tdb, off, buf, len2) != 0) {
225                         return -1;
226                 }
227                 len -= len2;
228                 off += len2;
229                 if (buf != NULL) {
230                         buf = (const void *)(len2 + (const char *)buf);
231                 }
232         }
233
234         if (len == 0) {
235                 return 0;
236         }
237
238         blk = off / tdb->tdb1.transaction->block_size;
239         off = off % tdb->tdb1.transaction->block_size;
240
241         if (tdb->tdb1.transaction->num_blocks <= blk) {
242                 uint8_t **new_blocks;
243                 /* expand the blocks array */
244                 if (tdb->tdb1.transaction->blocks == NULL) {
245                         new_blocks = (uint8_t **)malloc(
246                                 (blk+1)*sizeof(uint8_t *));
247                 } else {
248                         new_blocks = (uint8_t **)realloc(
249                                 tdb->tdb1.transaction->blocks,
250                                 (blk+1)*sizeof(uint8_t *));
251                 }
252                 if (new_blocks == NULL) {
253                         tdb->last_error = TDB_ERR_OOM;
254                         goto fail;
255                 }
256                 memset(&new_blocks[tdb->tdb1.transaction->num_blocks], 0,
257                        (1+(blk - tdb->tdb1.transaction->num_blocks))*sizeof(uint8_t *));
258                 tdb->tdb1.transaction->blocks = new_blocks;
259                 tdb->tdb1.transaction->num_blocks = blk+1;
260                 tdb->tdb1.transaction->last_block_size = 0;
261         }
262
263         /* allocate and fill a block? */
264         if (tdb->tdb1.transaction->blocks[blk] == NULL) {
265                 tdb->tdb1.transaction->blocks[blk] = (uint8_t *)calloc(tdb->tdb1.transaction->block_size, 1);
266                 if (tdb->tdb1.transaction->blocks[blk] == NULL) {
267                         tdb->last_error = TDB_ERR_OOM;
268                         tdb->tdb1.transaction->transaction_error = 1;
269                         return -1;
270                 }
271                 if (tdb->tdb1.transaction->old_map_size > blk * tdb->tdb1.transaction->block_size) {
272                         tdb1_len_t len2 = tdb->tdb1.transaction->block_size;
273                         if (len2 + (blk * tdb->tdb1.transaction->block_size) > tdb->tdb1.transaction->old_map_size) {
274                                 len2 = tdb->tdb1.transaction->old_map_size - (blk * tdb->tdb1.transaction->block_size);
275                         }
276                         if (tdb->tdb1.transaction->io_methods->tdb1_read(tdb, blk * tdb->tdb1.transaction->block_size,
277                                                                    tdb->tdb1.transaction->blocks[blk],
278                                                                    len2, 0) != 0) {
279                                 SAFE_FREE(tdb->tdb1.transaction->blocks[blk]);
280                                 tdb->last_error = TDB_ERR_IO;
281                                 goto fail;
282                         }
283                         if (blk == tdb->tdb1.transaction->num_blocks-1) {
284                                 tdb->tdb1.transaction->last_block_size = len2;
285                         }
286                 }
287         }
288
289         /* overwrite part of an existing block */
290         if (buf == NULL) {
291                 memset(tdb->tdb1.transaction->blocks[blk] + off, 0, len);
292         } else {
293                 memcpy(tdb->tdb1.transaction->blocks[blk] + off, buf, len);
294         }
295         if (blk == tdb->tdb1.transaction->num_blocks-1) {
296                 if (len + off > tdb->tdb1.transaction->last_block_size) {
297                         tdb->tdb1.transaction->last_block_size = len + off;
298                 }
299         }
300
301         return 0;
302
303 fail:
304         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
305                    "transaction_write: failed at off=%d len=%d",
306                    (blk*tdb->tdb1.transaction->block_size) + off, len);
307         tdb->tdb1.transaction->transaction_error = 1;
308         return -1;
309 }
310
311
312 /*
313   write while in a transaction - this varient never expands the transaction blocks, it only
314   updates existing blocks. This means it cannot change the recovery size
315 */
316 static int transaction1_write_existing(struct tdb_context *tdb, tdb1_off_t off,
317                                       const void *buf, tdb1_len_t len)
318 {
319         uint32_t blk;
320
321         /* break it up into block sized chunks */
322         while (len + (off % tdb->tdb1.transaction->block_size) > tdb->tdb1.transaction->block_size) {
323                 tdb1_len_t len2 = tdb->tdb1.transaction->block_size - (off % tdb->tdb1.transaction->block_size);
324                 if (transaction1_write_existing(tdb, off, buf, len2) != 0) {
325                         return -1;
326                 }
327                 len -= len2;
328                 off += len2;
329                 if (buf != NULL) {
330                         buf = (const void *)(len2 + (const char *)buf);
331                 }
332         }
333
334         if (len == 0) {
335                 return 0;
336         }
337
338         blk = off / tdb->tdb1.transaction->block_size;
339         off = off % tdb->tdb1.transaction->block_size;
340
341         if (tdb->tdb1.transaction->num_blocks <= blk ||
342             tdb->tdb1.transaction->blocks[blk] == NULL) {
343                 return 0;
344         }
345
346         if (blk == tdb->tdb1.transaction->num_blocks-1 &&
347             off + len > tdb->tdb1.transaction->last_block_size) {
348                 if (off >= tdb->tdb1.transaction->last_block_size) {
349                         return 0;
350                 }
351                 len = tdb->tdb1.transaction->last_block_size - off;
352         }
353
354         /* overwrite part of an existing block */
355         memcpy(tdb->tdb1.transaction->blocks[blk] + off, buf, len);
356
357         return 0;
358 }
359
360
361 /*
362   accelerated hash chain head search, using the cached hash heads
363 */
364 static void transaction1_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
365 {
366         uint32_t h = *chain;
367         for (;h < tdb->tdb1.header.hash_size;h++) {
368                 /* the +1 takes account of the freelist */
369                 if (0 != tdb->tdb1.transaction->hash_heads[h+1]) {
370                         break;
371                 }
372         }
373         (*chain) = h;
374 }
375
376 /*
377   out of bounds check during a transaction
378 */
379 static int transaction1_oob(struct tdb_context *tdb, tdb1_off_t off, tdb1_off_t len, int probe)
380 {
381         if (off + len >= off && off + len <= tdb->file->map_size) {
382                 return 0;
383         }
384         tdb->last_error = TDB_ERR_IO;
385         return -1;
386 }
387
388 /*
389   transaction version of tdb1_expand().
390 */
391 static int transaction1_expand_file(struct tdb_context *tdb, tdb1_off_t size,
392                                     tdb1_off_t addition)
393 {
394         /* add a write to the transaction elements, so subsequent
395            reads see the zero data */
396         if (transaction1_write(tdb, size, NULL, addition) != 0) {
397                 return -1;
398         }
399
400         tdb->tdb1.transaction->expanded = true;
401
402         return 0;
403 }
404
405 static const struct tdb1_methods transaction1_methods = {
406         transaction1_read,
407         transaction1_write,
408         transaction1_next_hash_chain,
409         transaction1_oob,
410         transaction1_expand_file,
411 };
412
413
414 /*
415   start a tdb transaction. No token is returned, as only a single
416   transaction is allowed to be pending per tdb_context
417 */
418 static int _tdb1_transaction_start(struct tdb_context *tdb)
419 {
420         /* some sanity checks */
421         if (tdb->flags & TDB_INTERNAL) {
422                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL,
423                                              TDB_LOG_USE_ERROR,
424                                              "tdb1_transaction_start:"
425                                              " cannot start a"
426                                              " transaction on an"
427                                              " internal tdb");
428                 return -1;
429         }
430
431         if ((tdb->flags & TDB_RDONLY) || tdb->tdb1.traverse_read) {
432                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_RDONLY,
433                                              TDB_LOG_USE_ERROR,
434                                              "tdb_transaction_start:"
435                                              " cannot start a"
436                                              " transaction on a "
437                                              " read-only tdb");
438                 return -1;
439         }
440
441         /* cope with nested tdb1_transaction_start() calls */
442         if (tdb->tdb1.transaction != NULL) {
443                 if (!(tdb->flags & TDB_ALLOW_NESTING)) {
444                         tdb->last_error
445                                 = tdb_logerr(tdb, TDB_ERR_EINVAL,
446                                              TDB_LOG_USE_ERROR,
447                                              "tdb_transaction_start:"
448                                              " already inside transaction");
449                         return -1;
450                 }
451                 tdb->stats.transaction_nest++;
452                 tdb->tdb1.transaction->nesting++;
453                 return 0;
454         }
455
456         if (tdb1_have_extra_locks(tdb)) {
457                 /* the caller must not have any locks when starting a
458                    transaction as otherwise we'll be screwed by lack
459                    of nested locks in posix */
460                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
461                                         "tdb1_transaction_start: cannot start a"
462                                         " transaction with locks held");
463                 return -1;
464         }
465
466         if (tdb->tdb1.travlocks.next != NULL) {
467                 /* you cannot use transactions inside a traverse (although you can use
468                    traverse inside a transaction) as otherwise you can end up with
469                    deadlock */
470                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
471                                         "tdb1_transaction_start: cannot start a"
472                                         " transaction within a traverse");
473                 return -1;
474         }
475
476         tdb->tdb1.transaction = (struct tdb1_transaction *)
477                 calloc(sizeof(struct tdb1_transaction), 1);
478         if (tdb->tdb1.transaction == NULL) {
479                 tdb->last_error = TDB_ERR_OOM;
480                 return -1;
481         }
482
483         /* a page at a time seems like a reasonable compromise between compactness and efficiency */
484         tdb->tdb1.transaction->block_size = tdb->tdb1.page_size;
485
486         /* get the transaction write lock. This is a blocking lock. As
487            discussed with Volker, there are a number of ways we could
488            make this async, which we will probably do in the future */
489         if (tdb1_transaction_lock(tdb, F_WRLCK, TDB_LOCK_WAIT) == -1) {
490                 SAFE_FREE(tdb->tdb1.transaction->blocks);
491                 SAFE_FREE(tdb->tdb1.transaction);
492                 return -1;
493         }
494
495         /* get a read lock from the freelist to the end of file. This
496            is upgraded to a write lock during the commit */
497         if (tdb1_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, true) == -1) {
498                 if (errno != EAGAIN && errno != EINTR) {
499                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
500                                    "tdb1_transaction_start:"
501                                    " failed to get hash locks");
502                 }
503                 goto fail_allrecord_lock;
504         }
505
506         /* setup a copy of the hash table heads so the hash scan in
507            traverse can be fast */
508         tdb->tdb1.transaction->hash_heads = (uint32_t *)
509                 calloc(tdb->tdb1.header.hash_size+1, sizeof(uint32_t));
510         if (tdb->tdb1.transaction->hash_heads == NULL) {
511                 tdb->last_error = TDB_ERR_OOM;
512                 goto fail;
513         }
514         if (tdb->tdb1.io->tdb1_read(tdb, TDB1_FREELIST_TOP, tdb->tdb1.transaction->hash_heads,
515                                    TDB1_HASHTABLE_SIZE(tdb), 0) != 0) {
516                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
517                            "tdb1_transaction_start: failed to read hash heads");
518                 goto fail;
519         }
520
521         /* make sure we know about any file expansions already done by
522            anyone else */
523         tdb->tdb1.io->tdb1_oob(tdb, tdb->file->map_size, 1, 1);
524         tdb->tdb1.transaction->old_map_size = tdb->file->map_size;
525
526         /* finally hook the io methods, replacing them with
527            transaction specific methods */
528         tdb->tdb1.transaction->io_methods = tdb->tdb1.io;
529         tdb->tdb1.io = &transaction1_methods;
530
531         tdb->stats.transactions++;
532         return 0;
533
534 fail:
535         tdb1_allrecord_unlock(tdb, F_RDLCK);
536 fail_allrecord_lock:
537         tdb1_transaction_unlock(tdb, F_WRLCK);
538         SAFE_FREE(tdb->tdb1.transaction->blocks);
539         SAFE_FREE(tdb->tdb1.transaction->hash_heads);
540         SAFE_FREE(tdb->tdb1.transaction);
541         return -1;
542 }
543
/*
  Public entry point for starting a transaction; forwards to
  _tdb1_transaction_start() and returns its result unchanged.
*/
int tdb1_transaction_start(struct tdb_context *tdb)
{
        int rc = _tdb1_transaction_start(tdb);

        return rc;
}
548
549 /*
550   sync to disk
551 */
552 static int transaction1_sync(struct tdb_context *tdb, tdb1_off_t offset, tdb1_len_t length)
553 {
554         if (tdb->flags & TDB_NOSYNC) {
555                 return 0;
556         }
557
558 #if HAVE_FDATASYNC
559         if (fdatasync(tdb->file->fd) != 0) {
560 #else
561         if (fsync(tdb->file->fd) != 0) {
562 #endif
563                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
564                                         "tdb1_transaction: fsync failed");
565                 return -1;
566         }
567 #if HAVE_MMAP
568         if (tdb->file->map_ptr) {
569                 tdb1_off_t moffset = offset & ~(tdb->tdb1.page_size-1);
570                 if (msync(moffset + (char *)tdb->file->map_ptr,
571                           length + (offset - moffset), MS_SYNC) != 0) {
572                         tdb->last_error = tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
573                                                 "tdb1_transaction:"
574                                                 " msync failed - %s",
575                                                 strerror(errno));
576                         return -1;
577                 }
578         }
579 #endif
580         return 0;
581 }
582
583
584 static int _tdb1_transaction_cancel(struct tdb_context *tdb)
585 {
586         int i, ret = 0;
587
588         if (tdb->tdb1.transaction == NULL) {
589                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
590                                         "tdb1_transaction_cancel:"
591                                         " no transaction");
592                 return -1;
593         }
594
595         if (tdb->tdb1.transaction->nesting != 0) {
596                 tdb->tdb1.transaction->transaction_error = 1;
597                 tdb->tdb1.transaction->nesting--;
598                 return 0;
599         }
600
601         tdb->file->map_size = tdb->tdb1.transaction->old_map_size;
602
603         /* free all the transaction blocks */
604         for (i=0;i<tdb->tdb1.transaction->num_blocks;i++) {
605                 if (tdb->tdb1.transaction->blocks[i] != NULL) {
606                         free(tdb->tdb1.transaction->blocks[i]);
607                 }
608         }
609         SAFE_FREE(tdb->tdb1.transaction->blocks);
610
611         if (tdb->tdb1.transaction->magic_offset) {
612                 const struct tdb1_methods *methods = tdb->tdb1.transaction->io_methods;
613                 const uint32_t invalid = TDB1_RECOVERY_INVALID_MAGIC;
614
615                 /* remove the recovery marker */
616                 if (methods->tdb1_write(tdb, tdb->tdb1.transaction->magic_offset, &invalid, 4) == -1 ||
617                 transaction1_sync(tdb, tdb->tdb1.transaction->magic_offset, 4) == -1) {
618                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
619                                    "tdb1_transaction_cancel: failed to"
620                                    " remove recovery magic");
621                         ret = -1;
622                 }
623         }
624
625         /* This also removes the OPEN_LOCK, if we have it. */
626         tdb1_release_transaction_locks(tdb);
627
628         /* restore the normal io methods */
629         tdb->tdb1.io = tdb->tdb1.transaction->io_methods;
630
631         SAFE_FREE(tdb->tdb1.transaction->hash_heads);
632         SAFE_FREE(tdb->tdb1.transaction);
633
634         return ret;
635 }
636
637 /*
638   cancel the current transaction
639 */
640 int tdb1_transaction_cancel(struct tdb_context *tdb)
641 {
642         tdb->stats.transaction_cancel++;
643         return _tdb1_transaction_cancel(tdb);
644 }
645
646 /*
647   work out how much space the linearised recovery data will consume
648 */
649 static tdb1_len_t tdb1_recovery_size(struct tdb_context *tdb)
650 {
651         tdb1_len_t recovery_size = 0;
652         int i;
653
654         recovery_size = sizeof(uint32_t);
655         for (i=0;i<tdb->tdb1.transaction->num_blocks;i++) {
656                 if (i * tdb->tdb1.transaction->block_size >= tdb->tdb1.transaction->old_map_size) {
657                         break;
658                 }
659                 if (tdb->tdb1.transaction->blocks[i] == NULL) {
660                         continue;
661                 }
662                 recovery_size += 2*sizeof(tdb1_off_t);
663                 if (i == tdb->tdb1.transaction->num_blocks-1) {
664                         recovery_size += tdb->tdb1.transaction->last_block_size;
665                 } else {
666                         recovery_size += tdb->tdb1.transaction->block_size;
667                 }
668         }
669
670         return recovery_size;
671 }
672
673 int tdb1_recovery_area(struct tdb_context *tdb,
674                       const struct tdb1_methods *methods,
675                       tdb1_off_t *recovery_offset,
676                       struct tdb1_record *rec)
677 {
678         if (tdb1_ofs_read(tdb, TDB1_RECOVERY_HEAD, recovery_offset) == -1) {
679                 return -1;
680         }
681
682         if (*recovery_offset == 0) {
683                 rec->rec_len = 0;
684                 return 0;
685         }
686
687         if (methods->tdb1_read(tdb, *recovery_offset, rec, sizeof(*rec),
688                               TDB1_DOCONV()) == -1) {
689                 return -1;
690         }
691
692         /* ignore invalid recovery regions: can happen in crash */
693         if (rec->magic != TDB1_RECOVERY_MAGIC &&
694             rec->magic != TDB1_RECOVERY_INVALID_MAGIC) {
695                 *recovery_offset = 0;
696                 rec->rec_len = 0;
697         }
698         return 0;
699 }
700
701 /*
702   allocate the recovery area, or use an existing recovery area if it is
703   large enough
704 */
705 static int tdb1_recovery_allocate(struct tdb_context *tdb,
706                                  tdb1_len_t *recovery_size,
707                                  tdb1_off_t *recovery_offset,
708                                  tdb1_len_t *recovery_max_size)
709 {
710         struct tdb1_record rec;
711         const struct tdb1_methods *methods = tdb->tdb1.transaction->io_methods;
712         tdb1_off_t recovery_head;
713
714         if (tdb1_recovery_area(tdb, methods, &recovery_head, &rec) == -1) {
715                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
716                            "tdb1_recovery_allocate:"
717                            " failed to read recovery head");
718                 return -1;
719         }
720
721         *recovery_size = tdb1_recovery_size(tdb);
722
723         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
724                 /* it fits in the existing area */
725                 *recovery_max_size = rec.rec_len;
726                 *recovery_offset = recovery_head;
727                 return 0;
728         }
729
730         /* we need to free up the old recovery area, then allocate a
731            new one at the end of the file. Note that we cannot use
732            tdb1_allocate() to allocate the new one as that might return
733            us an area that is being currently used (as of the start of
734            the transaction) */
735         if (recovery_head != 0) {
736                 if (tdb1_free(tdb, recovery_head, &rec) == -1) {
737                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
738                                    "tdb1_recovery_allocate: failed to free"
739                                    " previous recovery area");
740                         return -1;
741                 }
742         }
743
744         /* the tdb1_free() call might have increased the recovery size */
745         *recovery_size = tdb1_recovery_size(tdb);
746
747         /* round up to a multiple of page size */
748         *recovery_max_size = TDB1_ALIGN(sizeof(rec) + *recovery_size,
749                                         tdb->tdb1.page_size) - sizeof(rec);
750         *recovery_offset = tdb->file->map_size;
751         recovery_head = *recovery_offset;
752
753         if (methods->tdb1_expand_file(tdb, tdb->tdb1.transaction->old_map_size,
754                                      (tdb->file->map_size - tdb->tdb1.transaction->old_map_size) +
755                                      sizeof(rec) + *recovery_max_size) == -1) {
756                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
757                            "tdb1_recovery_allocate:"
758                            " failed to create recovery area");
759                 return -1;
760         }
761         tdb->stats.transaction_expand_file++;
762
763         /* remap the file (if using mmap) */
764         methods->tdb1_oob(tdb, tdb->file->map_size, 1, 1);
765
766         /* we have to reset the old map size so that we don't try to expand the file
767            again in the transaction commit, which would destroy the recovery area */
768         tdb->tdb1.transaction->old_map_size = tdb->file->map_size;
769
770         /* write the recovery header offset and sync - we can sync without a race here
771            as the magic ptr in the recovery record has not been set */
772         TDB1_CONV(recovery_head);
773         if (methods->tdb1_write(tdb, TDB1_RECOVERY_HEAD,
774                                &recovery_head, sizeof(tdb1_off_t)) == -1) {
775                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
776                            "tdb1_recovery_allocate:"
777                            " failed to write recovery head");
778                 return -1;
779         }
780         if (transaction1_write_existing(tdb, TDB1_RECOVERY_HEAD, &recovery_head, sizeof(tdb1_off_t)) == -1) {
781                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
782                            "tdb1_recovery_allocate:"
783                            " failed to write recovery head");
784                 return -1;
785         }
786
787         return 0;
788 }
789
790
791 /*
792   setup the recovery data that will be used on a crash during commit
793 */
794 static int transaction1_setup_recovery(struct tdb_context *tdb,
795                                        tdb1_off_t *magic_offset)
796 {
797         tdb1_len_t recovery_size;
798         unsigned char *data, *p;
799         const struct tdb1_methods *methods = tdb->tdb1.transaction->io_methods;
800         struct tdb1_record *rec;
801         tdb1_off_t recovery_offset, recovery_max_size;
802         tdb1_off_t old_map_size = tdb->tdb1.transaction->old_map_size;
803         uint32_t magic, tailer;
804         int i;
805
806         /*
807           check that the recovery area has enough space
808         */
809         if (tdb1_recovery_allocate(tdb, &recovery_size,
810                                   &recovery_offset, &recovery_max_size) == -1) {
811                 return -1;
812         }
813
814         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
815         if (data == NULL) {
816                 tdb->last_error = TDB_ERR_OOM;
817                 return -1;
818         }
819
820         rec = (struct tdb1_record *)data;
821         memset(rec, 0, sizeof(*rec));
822
823         rec->magic    = TDB1_RECOVERY_INVALID_MAGIC;
824         rec->data_len = recovery_size;
825         rec->rec_len  = recovery_max_size;
826         rec->key_len  = old_map_size;
827         TDB1_CONV(*rec);
828
829         /* build the recovery data into a single blob to allow us to do a single
830            large write, which should be more efficient */
831         p = data + sizeof(*rec);
832         for (i=0;i<tdb->tdb1.transaction->num_blocks;i++) {
833                 tdb1_off_t offset;
834                 tdb1_len_t length;
835
836                 if (tdb->tdb1.transaction->blocks[i] == NULL) {
837                         continue;
838                 }
839
840                 offset = i * tdb->tdb1.transaction->block_size;
841                 length = tdb->tdb1.transaction->block_size;
842                 if (i == tdb->tdb1.transaction->num_blocks-1) {
843                         length = tdb->tdb1.transaction->last_block_size;
844                 }
845
846                 if (offset >= old_map_size) {
847                         continue;
848                 }
849                 if (offset + length > tdb->tdb1.transaction->old_map_size) {
850                         tdb->last_error = tdb_logerr(tdb, TDB_ERR_CORRUPT,
851                                                 TDB_LOG_ERROR,
852                                                 "tdb1_transaction_setup_recovery: transaction data over new region boundary");
853                         free(data);
854                         return -1;
855                 }
856                 memcpy(p, &offset, 4);
857                 memcpy(p+4, &length, 4);
858                 if (TDB1_DOCONV()) {
859                         tdb1_convert(p, 8);
860                 }
861                 /* the recovery area contains the old data, not the
862                    new data, so we have to call the original tdb1_read
863                    method to get it */
864                 if (methods->tdb1_read(tdb, offset, p + 8, length, 0) != 0) {
865                         free(data);
866                         tdb->last_error = TDB_ERR_IO;
867                         return -1;
868                 }
869                 p += 8 + length;
870         }
871
872         /* and the tailer */
873         tailer = sizeof(*rec) + recovery_max_size;
874         memcpy(p, &tailer, 4);
875         if (TDB1_DOCONV()) {
876                 tdb1_convert(p, 4);
877         }
878
879         /* write the recovery data to the recovery area */
880         if (methods->tdb1_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
881                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
882                            "tdb1_transaction_setup_recovery:"
883                            " failed to write recovery data");
884                 free(data);
885                 return -1;
886         }
887         if (transaction1_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
888                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
889                            "tdb1_transaction_setup_recovery: failed to write"
890                            " secondary recovery data");
891                 free(data);
892                 return -1;
893         }
894
895         /* as we don't have ordered writes, we have to sync the recovery
896            data before we update the magic to indicate that the recovery
897            data is present */
898         if (transaction1_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
899                 free(data);
900                 return -1;
901         }
902
903         free(data);
904
905         magic = TDB1_RECOVERY_MAGIC;
906         TDB1_CONV(magic);
907
908         *magic_offset = recovery_offset + offsetof(struct tdb1_record, magic);
909
910         if (methods->tdb1_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
911                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
912                            "tdb1_transaction_setup_recovery:"
913                            " failed to write recovery magic");
914                 return -1;
915         }
916         if (transaction1_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
917                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
918                            "tdb1_transaction_setup_recovery:"
919                            " failed to write secondary recovery magic");
920                 return -1;
921         }
922
923         /* ensure the recovery magic marker is on disk */
924         if (transaction1_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
925                 return -1;
926         }
927
928         return 0;
929 }
930
931 static int _tdb1_transaction_prepare_commit(struct tdb_context *tdb)
932 {
933         const struct tdb1_methods *methods;
934
935         if (tdb->tdb1.transaction == NULL) {
936                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
937                                         "tdb1_transaction_prepare_commit:"
938                                         " no transaction");
939                 return -1;
940         }
941
942         if (tdb->tdb1.transaction->prepared) {
943                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
944                                         "tdb1_transaction_prepare_commit:"
945                                         " transaction already prepared");
946                 _tdb1_transaction_cancel(tdb);
947                 return -1;
948         }
949
950         if (tdb->tdb1.transaction->transaction_error) {
951                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
952                                         "tdb1_transaction_prepare_commit:"
953                                         " transaction error pending");
954                 _tdb1_transaction_cancel(tdb);
955                 return -1;
956         }
957
958
959         if (tdb->tdb1.transaction->nesting != 0) {
960                 return 0;
961         }
962
963         /* check for a null transaction */
964         if (tdb->tdb1.transaction->blocks == NULL) {
965                 return 0;
966         }
967
968         methods = tdb->tdb1.transaction->io_methods;
969
970         /* if there are any locks pending then the caller has not
971            nested their locks properly, so fail the transaction */
972         if (tdb1_have_extra_locks(tdb)) {
973                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
974                                         "tdb1_transaction_prepare_commit:"
975                                         " locks pending on commit");
976                 _tdb1_transaction_cancel(tdb);
977                 return -1;
978         }
979
980         /* upgrade the main transaction lock region to a write lock */
981         if (tdb1_allrecord_upgrade(tdb) == -1) {
982                 if (errno != EAGAIN && errno != EINTR) {
983                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
984                                    "tdb1_transaction_prepare_commit:"
985                                    " failed to upgrade hash locks");
986                 }
987                 return -1;
988         }
989
990         /* get the open lock - this prevents new users attaching to the database
991            during the commit */
992         if (tdb1_nest_lock(tdb, TDB1_OPEN_LOCK, F_WRLCK, TDB_LOCK_WAIT) == -1) {
993                 if (errno != EAGAIN && errno != EINTR) {
994                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
995                                    "tdb1_transaction_prepare_commit:"
996                                    " failed to get open lock");
997                 }
998                 return -1;
999         }
1000
1001         if (!(tdb->flags & TDB_NOSYNC)) {
1002                 /* write the recovery data to the end of the file */
1003                 if (transaction1_setup_recovery(tdb, &tdb->tdb1.transaction->magic_offset) == -1) {
1004                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1005                                    "tdb1_transaction_prepare_commit:"
1006                                    " failed to setup recovery data");
1007                         return -1;
1008                 }
1009         }
1010
1011         tdb->tdb1.transaction->prepared = true;
1012
1013         /* expand the file to the new size if needed */
1014         if (tdb->file->map_size != tdb->tdb1.transaction->old_map_size) {
1015                 if (methods->tdb1_expand_file(tdb, tdb->tdb1.transaction->old_map_size,
1016                                              tdb->file->map_size -
1017                                              tdb->tdb1.transaction->old_map_size) == -1) {
1018                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1019                                    "tdb1_transaction_prepare_commit:"
1020                                    " expansion failed");
1021                         return -1;
1022                 }
1023                 tdb->stats.transaction_expand_file++;
1024                 tdb->file->map_size = tdb->tdb1.transaction->old_map_size;
1025                 methods->tdb1_oob(tdb, tdb->file->map_size, 1, 1);
1026         }
1027
1028         /* Keep the open lock until the actual commit */
1029
1030         return 0;
1031 }
1032
/*
   prepare to commit the current transaction: public entry point that
   forwards to _tdb1_transaction_prepare_commit() and returns its result
*/
int tdb1_transaction_prepare_commit(struct tdb_context *tdb)
{
	int ret = _tdb1_transaction_prepare_commit(tdb);

	return ret;
}
1040
1041 /* A repack is worthwhile if the largest is less than half total free. */
1042 static bool repack_worthwhile(struct tdb_context *tdb)
1043 {
1044         tdb1_off_t ptr;
1045         struct tdb1_record rec;
1046         tdb1_len_t total = 0, largest = 0;
1047
1048         if (tdb1_ofs_read(tdb, TDB1_FREELIST_TOP, &ptr) == -1) {
1049                 return false;
1050         }
1051
1052         while (ptr != 0 && tdb1_rec_free_read(tdb, ptr, &rec) == 0) {
1053                 total += rec.rec_len;
1054                 if (rec.rec_len > largest) {
1055                         largest = rec.rec_len;
1056                 }
1057                 ptr = rec.next;
1058         }
1059
1060         return total > largest * 2;
1061 }
1062
1063 /*
1064   commit the current transaction
1065 */
1066 int tdb1_transaction_commit(struct tdb_context *tdb)
1067 {
1068         const struct tdb1_methods *methods;
1069         int i;
1070         bool need_repack = false;
1071
1072         if (tdb->tdb1.transaction == NULL) {
1073                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
1074                                         "tdb1_transaction_commit:"
1075                                         " no transaction");
1076                 return -1;
1077         }
1078
1079         if (tdb->tdb1.transaction->transaction_error) {
1080                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
1081                                         "tdb1_transaction_commit:"
1082                                         " transaction error pending");
1083                 _tdb1_transaction_cancel(tdb);
1084                 return -1;
1085         }
1086
1087
1088         if (tdb->tdb1.transaction->nesting != 0) {
1089                 tdb->tdb1.transaction->nesting--;
1090                 return 0;
1091         }
1092
1093         /* check for a null transaction */
1094         if (tdb->tdb1.transaction->blocks == NULL) {
1095                 _tdb1_transaction_cancel(tdb);
1096                 return 0;
1097         }
1098
1099         if (!tdb->tdb1.transaction->prepared) {
1100                 int ret = _tdb1_transaction_prepare_commit(tdb);
1101                 if (ret) {
1102                         _tdb1_transaction_cancel(tdb);
1103                         return ret;
1104                 }
1105         }
1106
1107         methods = tdb->tdb1.transaction->io_methods;
1108
1109         /* perform all the writes */
1110         for (i=0;i<tdb->tdb1.transaction->num_blocks;i++) {
1111                 tdb1_off_t offset;
1112                 tdb1_len_t length;
1113
1114                 if (tdb->tdb1.transaction->blocks[i] == NULL) {
1115                         continue;
1116                 }
1117
1118                 offset = i * tdb->tdb1.transaction->block_size;
1119                 length = tdb->tdb1.transaction->block_size;
1120                 if (i == tdb->tdb1.transaction->num_blocks-1) {
1121                         length = tdb->tdb1.transaction->last_block_size;
1122                 }
1123
1124                 if (methods->tdb1_write(tdb, offset, tdb->tdb1.transaction->blocks[i], length) == -1) {
1125                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1126                                    "tdb1_transaction_commit:"
1127                                    " write failed during commit");
1128
1129                         /* we've overwritten part of the data and
1130                            possibly expanded the file, so we need to
1131                            run the crash recovery code */
1132                         tdb->tdb1.io = methods;
1133                         tdb1_transaction_recover(tdb);
1134
1135                         _tdb1_transaction_cancel(tdb);
1136
1137                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1138                                    "tdb1_transaction_commit: write failed");
1139                         return -1;
1140                 }
1141                 SAFE_FREE(tdb->tdb1.transaction->blocks[i]);
1142         }
1143
1144         /* Do this before we drop lock or blocks. */
1145         if (tdb->tdb1.transaction->expanded) {
1146                 need_repack = repack_worthwhile(tdb);
1147         }
1148
1149         SAFE_FREE(tdb->tdb1.transaction->blocks);
1150         tdb->tdb1.transaction->num_blocks = 0;
1151
1152         /* ensure the new data is on disk */
1153         if (transaction1_sync(tdb, 0, tdb->file->map_size) == -1) {
1154                 return -1;
1155         }
1156
1157         /*
1158           TODO: maybe write to some dummy hdr field, or write to magic
1159           offset without mmap, before the last sync, instead of the
1160           utime() call
1161         */
1162
1163         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1164            don't change the mtime of the file, this means the file may
1165            not be backed up (as tdb rounding to block sizes means that
1166            file size changes are quite rare too). The following forces
1167            mtime changes when a transaction completes */
1168 #if HAVE_UTIME
1169         utime(tdb->name, NULL);
1170 #endif
1171
1172         /* use a transaction cancel to free memory and remove the
1173            transaction locks */
1174         _tdb1_transaction_cancel(tdb);
1175
1176         if (need_repack) {
1177                 if (tdb_repack(tdb) != 0)
1178                         return -1;
1179         }
1180
1181         return 0;
1182 }
1183
1184
1185 /*
1186   recover from an aborted transaction. Must be called with exclusive
1187   database write access already established (including the open
1188   lock to prevent new processes attaching)
1189 */
1190 int tdb1_transaction_recover(struct tdb_context *tdb)
1191 {
1192         tdb1_off_t recovery_head, recovery_eof;
1193         unsigned char *data, *p;
1194         uint32_t zero = 0;
1195         struct tdb1_record rec;
1196
1197         /* find the recovery area */
1198         if (tdb1_ofs_read(tdb, TDB1_RECOVERY_HEAD, &recovery_head) == -1) {
1199                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1200                            "tdb1_transaction_recover:"
1201                            " failed to read recovery head");
1202                 return -1;
1203         }
1204
1205         if (recovery_head == 0) {
1206                 /* we have never allocated a recovery record */
1207                 return 0;
1208         }
1209
1210         /* read the recovery record */
1211         if (tdb->tdb1.io->tdb1_read(tdb, recovery_head, &rec,
1212                                    sizeof(rec), TDB1_DOCONV()) == -1) {
1213                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1214                            "tdb1_transaction_recover:"
1215                            " failed to read recovery record");
1216                 return -1;
1217         }
1218
1219         if (rec.magic != TDB1_RECOVERY_MAGIC) {
1220                 /* there is no valid recovery data */
1221                 return 0;
1222         }
1223
1224         if (tdb->flags & TDB_RDONLY) {
1225                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
1226                                         "tdb1_transaction_recover:"
1227                                         " attempt to recover read only"
1228                                         " database");
1229                 return -1;
1230         }
1231
1232         recovery_eof = rec.key_len;
1233
1234         data = (unsigned char *)malloc(rec.data_len);
1235         if (data == NULL) {
1236                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
1237                                         "tdb1_transaction_recover:"
1238                                         " failed to allocate recovery data");
1239                 return -1;
1240         }
1241
1242         /* read the full recovery data */
1243         if (tdb->tdb1.io->tdb1_read(tdb, recovery_head + sizeof(rec), data,
1244                                    rec.data_len, 0) == -1) {
1245                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1246                            "tdb1_transaction_recover:"
1247                            " failed to read recovery data");
1248                 return -1;
1249         }
1250
1251         /* recover the file data */
1252         p = data;
1253         while (p+8 < data + rec.data_len) {
1254                 uint32_t ofs, len;
1255                 if (TDB1_DOCONV()) {
1256                         tdb1_convert(p, 8);
1257                 }
1258                 memcpy(&ofs, p, 4);
1259                 memcpy(&len, p+4, 4);
1260
1261                 if (tdb->tdb1.io->tdb1_write(tdb, ofs, p+8, len) == -1) {
1262                         free(data);
1263                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1264                                    "tdb1_transaction_recover: failed to recover"
1265                                    " %d bytes at offset %d", len, ofs);
1266                         return -1;
1267                 }
1268                 p += 8 + len;
1269         }
1270
1271         free(data);
1272
1273         if (transaction1_sync(tdb, 0, tdb->file->map_size) == -1) {
1274                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1275                            "tdb1_transaction_recover: failed to sync recovery");
1276                 return -1;
1277         }
1278
1279         /* if the recovery area is after the recovered eof then remove it */
1280         if (recovery_eof <= recovery_head) {
1281                 if (tdb1_ofs_write(tdb, TDB1_RECOVERY_HEAD, &zero) == -1) {
1282                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1283                                    "tdb1_transaction_recover: failed to remove"
1284                                    " recovery head");
1285                         return -1;
1286                 }
1287         }
1288
1289         /* remove the recovery magic */
1290         if (tdb1_ofs_write(tdb, recovery_head + offsetof(struct tdb1_record, magic),
1291                           &zero) == -1) {
1292                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1293                            "tdb1_transaction_recover: failed to remove"
1294                            " recovery magic");
1295                 return -1;
1296         }
1297
1298         if (transaction1_sync(tdb, 0, recovery_eof) == -1) {
1299                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1300                            "tdb1_transaction_recover:"
1301                            " failed to sync2 recovery");
1302                 return -1;
1303         }
1304
1305         tdb_logerr(tdb, TDB_SUCCESS, TDB_LOG_WARNING,
1306                    "tdb1_transaction_recover: recovered %d byte database",
1307                    recovery_eof);
1308
1309         /* all done */
1310         return 0;
1311 }
1312
1313 /* Any I/O failures we say "needs recovery". */
1314 tdb_bool_err tdb1_needs_recovery(struct tdb_context *tdb)
1315 {
1316         tdb1_off_t recovery_head;
1317         struct tdb1_record rec;
1318
1319         /* find the recovery area */
1320         if (tdb1_ofs_read(tdb, TDB1_RECOVERY_HEAD, &recovery_head) == -1) {
1321                 return TDB_ERR_TO_OFF(tdb->last_error);
1322         }
1323
1324         if (recovery_head == 0) {
1325                 /* we have never allocated a recovery record */
1326                 return false;
1327         }
1328
1329         /* read the recovery record */
1330         if (tdb->tdb1.io->tdb1_read(tdb, recovery_head, &rec,
1331                                    sizeof(rec), TDB1_DOCONV()) == -1) {
1332                 return TDB_ERR_TO_OFF(tdb->last_error);
1333         }
1334
1335         return (rec.magic == TDB1_RECOVERY_MAGIC);
1336 }