]> git.ozlabs.org Git - ccan/blob - junkcode/rusty@rustcorp.com.au-ntdb/transaction.c
small fix for ccan/take/take.c
[ccan] / junkcode / rusty@rustcorp.com.au-ntdb / transaction.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7    Copyright (C) Rusty Russell                2010
8
9      ** NOTE! The following LGPL license applies to the ntdb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 #include "private.h"
28 #include <assert.h>
/* Release x through the context's free_fn (if x is non-NULL) and reset x to
   NULL. Both arguments are parenthesized so arbitrary expressions such as
   &ctx or ptr + 1 expand correctly; x is evaluated more than once, so it
   must not have side effects. */
#define SAFE_FREE(ntdb, x) do { if ((x) != NULL) { (ntdb)->free_fn((void *)(x), (ntdb)->alloc_data); (x) = NULL; } } while (0)
30
31 /*
32   transaction design:
33
34   - only allow a single transaction at a time per database. This makes
35     using the transaction API simpler, as otherwise the caller would
36     have to cope with temporary failures in transactions that conflict
37     with other current transactions
38
39   - keep the transaction recovery information in the same file as the
40     database, using a special 'transaction recovery' record pointed at
41     by the header. This removes the need for extra journal files as
42     used by some other databases
43
44   - dynamically allocate the transaction recovery record, re-using it
45     for subsequent transactions. If a larger record is needed then
46     ntdb_free() the old record to place it on the normal ntdb freelist
47     before allocating the new record
48
49   - during transactions, keep a linked list of all writes that have
50     been performed by intercepting all ntdb_write() calls. The hooked
51     transaction versions of ntdb_read() and ntdb_write() check this
52     linked list and try to use the elements of the list in preference
53     to the real database.
54
55   - don't allow any locks to be held when a transaction starts,
56     otherwise we can end up with deadlock (plus lack of lock nesting
57     in POSIX locks would mean the lock is lost)
58
59   - if the caller gains a lock during the transaction but doesn't
60     release it then fail the commit
61
62   - allow for nested calls to ntdb_transaction_start(), re-using the
63     existing transaction record. If the inner transaction is canceled
64     then a subsequent commit will fail
65
66   - keep a mirrored copy of the ntdb hash chain heads to allow for the
67     fast hash heads scan on traverse, updating the mirrored copy in
68     the transaction version of ntdb_write
69
70   - allow callers to mix transaction and non-transaction use of ntdb,
71     although once a transaction is started then an exclusive lock is
72     gained until the transaction is committed or canceled
73
74   - the commit strategy involves first saving away all modified data
75     into a linearised buffer in the transaction recovery area, then
76     marking the transaction recovery area with a magic value to
77     indicate a valid recovery record. In total 4 fsync/msync calls are
78     needed per commit to prevent race conditions. It might be possible
79     to reduce this to 3 or even 2 with some more work.
80
81   - check for a valid recovery record on open of the ntdb, while the
82     open lock is held. Automatically recover from the transaction
83     recovery area if needed, then continue with the open as
84     usual. This allows for smooth crash recovery with no administrator
85     intervention.
86
87   - if NTDB_NOSYNC is passed to flags in ntdb_open then transactions are
88     still available, but no fsync/msync calls are made.  This means we
89     still are safe against unexpected death during transaction commit,
90     but not against machine reboots.
91 */
92
93 /*
94   hold the context of any current transaction
95 */
96 struct ntdb_transaction {
97         /* the original io methods - used to do IOs to the real db */
98         const struct ntdb_methods *io_methods;
99
100         /* the list of transaction blocks. When a block is first
101            written to, it gets created in this list */
102         uint8_t **blocks;
103         size_t num_blocks;
104
105         /* non-zero when an internal transaction error has
106            occurred. All write operations will then fail until the
107            transaction is ended */
108         int transaction_error;
109
110         /* when inside a transaction we need to keep track of any
111            nested ntdb_transaction_start() calls, as these are allowed,
112            but don't create a new transaction */
113         unsigned int nesting;
114
115         /* set when a prepare has already occurred */
116         bool prepared;
117         ntdb_off_t magic_offset;
118
119         /* old file size before transaction */
120         ntdb_len_t old_map_size;
121 };
122
123 /*
124   read while in a transaction. We need to check first if the data is in our list
125   of transaction elements, then if not do a real read
126 */
127 static enum NTDB_ERROR transaction_read(struct ntdb_context *ntdb, ntdb_off_t off,
128                                        void *buf, ntdb_len_t len)
129 {
130         size_t blk;
131         enum NTDB_ERROR ecode;
132
133         /* break it down into block sized ops */
134         while (len + (off % NTDB_PGSIZE) > NTDB_PGSIZE) {
135                 ntdb_len_t len2 = NTDB_PGSIZE - (off % NTDB_PGSIZE);
136                 ecode = transaction_read(ntdb, off, buf, len2);
137                 if (ecode != NTDB_SUCCESS) {
138                         return ecode;
139                 }
140                 len -= len2;
141                 off += len2;
142                 buf = (void *)(len2 + (char *)buf);
143         }
144
145         if (len == 0) {
146                 return NTDB_SUCCESS;
147         }
148
149         blk = off / NTDB_PGSIZE;
150
151         /* see if we have it in the block list */
152         if (ntdb->transaction->num_blocks <= blk ||
153             ntdb->transaction->blocks[blk] == NULL) {
154                 /* nope, do a real read */
155                 ecode = ntdb->transaction->io_methods->tread(ntdb, off, buf, len);
156                 if (ecode != NTDB_SUCCESS) {
157                         goto fail;
158                 }
159                 return 0;
160         }
161
162         /* now copy it out of this block */
163         memcpy(buf, ntdb->transaction->blocks[blk] + (off % NTDB_PGSIZE), len);
164         return NTDB_SUCCESS;
165
166 fail:
167         ntdb->transaction->transaction_error = 1;
168         return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
169                           "transaction_read: failed at off=%zu len=%zu",
170                           (size_t)off, (size_t)len);
171 }
172
173
174 /*
175   write while in a transaction
176 */
177 static enum NTDB_ERROR transaction_write(struct ntdb_context *ntdb, ntdb_off_t off,
178                                         const void *buf, ntdb_len_t len)
179 {
180         size_t blk;
181         enum NTDB_ERROR ecode;
182
183         /* Only a commit is allowed on a prepared transaction */
184         if (ntdb->transaction->prepared) {
185                 ecode = ntdb_logerr(ntdb, NTDB_ERR_EINVAL, NTDB_LOG_ERROR,
186                                    "transaction_write: transaction already"
187                                    " prepared, write not allowed");
188                 goto fail;
189         }
190
191         /* break it up into block sized chunks */
192         while (len + (off % NTDB_PGSIZE) > NTDB_PGSIZE) {
193                 ntdb_len_t len2 = NTDB_PGSIZE - (off % NTDB_PGSIZE);
194                 ecode = transaction_write(ntdb, off, buf, len2);
195                 if (ecode != NTDB_SUCCESS) {
196                         return ecode;
197                 }
198                 len -= len2;
199                 off += len2;
200                 if (buf != NULL) {
201                         buf = (const void *)(len2 + (const char *)buf);
202                 }
203         }
204
205         if (len == 0) {
206                 return NTDB_SUCCESS;
207         }
208
209         blk = off / NTDB_PGSIZE;
210         off = off % NTDB_PGSIZE;
211
212         if (ntdb->transaction->num_blocks <= blk) {
213                 uint8_t **new_blocks;
214                 /* expand the blocks array */
215                 if (ntdb->transaction->blocks == NULL) {
216                         new_blocks = (uint8_t **)ntdb->alloc_fn(ntdb,
217                                     (blk+1)*sizeof(uint8_t *), ntdb->alloc_data);
218                 } else {
219                         new_blocks = (uint8_t **)ntdb->expand_fn(
220                                 ntdb->transaction->blocks,
221                                 (blk+1)*sizeof(uint8_t *), ntdb->alloc_data);
222                 }
223                 if (new_blocks == NULL) {
224                         ecode = ntdb_logerr(ntdb, NTDB_ERR_OOM, NTDB_LOG_ERROR,
225                                            "transaction_write:"
226                                            " failed to allocate");
227                         goto fail;
228                 }
229                 memset(&new_blocks[ntdb->transaction->num_blocks], 0,
230                        (1+(blk - ntdb->transaction->num_blocks))*sizeof(uint8_t *));
231                 ntdb->transaction->blocks = new_blocks;
232                 ntdb->transaction->num_blocks = blk+1;
233         }
234
235         /* allocate and fill a block? */
236         if (ntdb->transaction->blocks[blk] == NULL) {
237                 ntdb->transaction->blocks[blk] = (uint8_t *)
238                         ntdb->alloc_fn(ntdb->transaction->blocks, NTDB_PGSIZE,
239                                    ntdb->alloc_data);
240                 if (ntdb->transaction->blocks[blk] == NULL) {
241                         ecode = ntdb_logerr(ntdb, NTDB_ERR_OOM, NTDB_LOG_ERROR,
242                                            "transaction_write:"
243                                            " failed to allocate");
244                         goto fail;
245                 }
246                 memset(ntdb->transaction->blocks[blk], 0, NTDB_PGSIZE);
247                 if (ntdb->transaction->old_map_size > blk * NTDB_PGSIZE) {
248                         ntdb_len_t len2 = NTDB_PGSIZE;
249                         if (len2 + (blk * NTDB_PGSIZE) > ntdb->transaction->old_map_size) {
250                                 len2 = ntdb->transaction->old_map_size - (blk * NTDB_PGSIZE);
251                         }
252                         ecode = ntdb->transaction->io_methods->tread(ntdb,
253                                         blk * NTDB_PGSIZE,
254                                         ntdb->transaction->blocks[blk],
255                                         len2);
256                         if (ecode != NTDB_SUCCESS) {
257                                 ecode = ntdb_logerr(ntdb, ecode,
258                                                    NTDB_LOG_ERROR,
259                                                    "transaction_write:"
260                                                    " failed to"
261                                                    " read old block: %s",
262                                                    strerror(errno));
263                                 SAFE_FREE(ntdb, ntdb->transaction->blocks[blk]);
264                                 goto fail;
265                         }
266                 }
267         }
268
269         /* overwrite part of an existing block */
270         if (buf == NULL) {
271                 memset(ntdb->transaction->blocks[blk] + off, 0, len);
272         } else {
273                 memcpy(ntdb->transaction->blocks[blk] + off, buf, len);
274         }
275         return NTDB_SUCCESS;
276
277 fail:
278         ntdb->transaction->transaction_error = 1;
279         return ecode;
280 }
281
282
283 /*
284   write while in a transaction - this variant never expands the transaction blocks, it only
285   updates existing blocks. This means it cannot change the recovery size
286 */
287 static void transaction_write_existing(struct ntdb_context *ntdb, ntdb_off_t off,
288                                        const void *buf, ntdb_len_t len)
289 {
290         size_t blk;
291
292         /* break it up into block sized chunks */
293         while (len + (off % NTDB_PGSIZE) > NTDB_PGSIZE) {
294                 ntdb_len_t len2 = NTDB_PGSIZE - (off % NTDB_PGSIZE);
295                 transaction_write_existing(ntdb, off, buf, len2);
296                 len -= len2;
297                 off += len2;
298                 if (buf != NULL) {
299                         buf = (const void *)(len2 + (const char *)buf);
300                 }
301         }
302
303         if (len == 0) {
304                 return;
305         }
306
307         blk = off / NTDB_PGSIZE;
308         off = off % NTDB_PGSIZE;
309
310         if (ntdb->transaction->num_blocks <= blk ||
311             ntdb->transaction->blocks[blk] == NULL) {
312                 return;
313         }
314
315         /* overwrite part of an existing block */
316         memcpy(ntdb->transaction->blocks[blk] + off, buf, len);
317 }
318
319
320 /*
321   out of bounds check during a transaction
322 */
323 static enum NTDB_ERROR transaction_oob(struct ntdb_context *ntdb,
324                                       ntdb_off_t off, ntdb_len_t len, bool probe)
325 {
326         if ((off + len >= off && off + len <= ntdb->file->map_size) || probe) {
327                 return NTDB_SUCCESS;
328         }
329
330         ntdb_logerr(ntdb, NTDB_ERR_IO, NTDB_LOG_ERROR,
331                    "ntdb_oob len %lld beyond transaction size %lld",
332                    (long long)(off + len),
333                    (long long)ntdb->file->map_size);
334         return NTDB_ERR_IO;
335 }
336
337 /*
338   transaction version of ntdb_expand().
339 */
340 static enum NTDB_ERROR transaction_expand_file(struct ntdb_context *ntdb,
341                                               ntdb_off_t addition)
342 {
343         enum NTDB_ERROR ecode;
344
345         assert((ntdb->file->map_size + addition) % NTDB_PGSIZE == 0);
346
347         /* add a write to the transaction elements, so subsequent
348            reads see the zero data */
349         ecode = transaction_write(ntdb, ntdb->file->map_size, NULL, addition);
350         if (ecode == NTDB_SUCCESS) {
351                 ntdb->file->map_size += addition;
352         }
353         return ecode;
354 }
355
356 static void *transaction_direct(struct ntdb_context *ntdb, ntdb_off_t off,
357                                 size_t len, bool write_mode)
358 {
359         size_t blk = off / NTDB_PGSIZE, end_blk;
360
361         /* This is wrong for zero-length blocks, but will fail gracefully */
362         end_blk = (off + len - 1) / NTDB_PGSIZE;
363
364         /* Can only do direct if in single block and we've already copied. */
365         if (write_mode) {
366                 ntdb->stats.transaction_write_direct++;
367                 if (blk != end_blk
368                     || blk >= ntdb->transaction->num_blocks
369                     || ntdb->transaction->blocks[blk] == NULL) {
370                         ntdb->stats.transaction_write_direct_fail++;
371                         return NULL;
372                 }
373                 return ntdb->transaction->blocks[blk] + off % NTDB_PGSIZE;
374         }
375
376         ntdb->stats.transaction_read_direct++;
377         /* Single which we have copied? */
378         if (blk == end_blk
379             && blk < ntdb->transaction->num_blocks
380             && ntdb->transaction->blocks[blk])
381                 return ntdb->transaction->blocks[blk] + off % NTDB_PGSIZE;
382
383         /* Otherwise must be all not copied. */
384         while (blk <= end_blk) {
385                 if (blk >= ntdb->transaction->num_blocks)
386                         break;
387                 if (ntdb->transaction->blocks[blk]) {
388                         ntdb->stats.transaction_read_direct_fail++;
389                         return NULL;
390                 }
391                 blk++;
392         }
393         return ntdb->transaction->io_methods->direct(ntdb, off, len, false);
394 }
395
396 static ntdb_off_t transaction_read_off(struct ntdb_context *ntdb,
397                                        ntdb_off_t off)
398 {
399         ntdb_off_t ret;
400         enum NTDB_ERROR ecode;
401
402         ecode = transaction_read(ntdb, off, &ret, sizeof(ret));
403         ntdb_convert(ntdb, &ret, sizeof(ret));
404         if (ecode != NTDB_SUCCESS) {
405                 return NTDB_ERR_TO_OFF(ecode);
406         }
407         return ret;
408 }
409
410 static enum NTDB_ERROR transaction_write_off(struct ntdb_context *ntdb,
411                                              ntdb_off_t off, ntdb_off_t val)
412 {
413         ntdb_convert(ntdb, &val, sizeof(val));
414         return transaction_write(ntdb, off, &val, sizeof(val));
415 }
416
417 static const struct ntdb_methods transaction_methods = {
418         transaction_read,
419         transaction_write,
420         transaction_oob,
421         transaction_expand_file,
422         transaction_direct,
423         transaction_read_off,
424         transaction_write_off,
425 };
426
427 /*
428   sync to disk
429 */
430 static enum NTDB_ERROR transaction_sync(struct ntdb_context *ntdb,
431                                        ntdb_off_t offset, ntdb_len_t length)
432 {
433         if (ntdb->flags & NTDB_NOSYNC) {
434                 return NTDB_SUCCESS;
435         }
436
437         if (fsync(ntdb->file->fd) != 0) {
438                 return ntdb_logerr(ntdb, NTDB_ERR_IO, NTDB_LOG_ERROR,
439                                   "ntdb_transaction: fsync failed: %s",
440                                   strerror(errno));
441         }
442 #ifdef MS_SYNC
443         if (ntdb->file->map_ptr) {
444                 ntdb_off_t moffset = offset & ~(getpagesize()-1);
445                 if (msync(moffset + (char *)ntdb->file->map_ptr,
446                           length + (offset - moffset), MS_SYNC) != 0) {
447                         return ntdb_logerr(ntdb, NTDB_ERR_IO, NTDB_LOG_ERROR,
448                                           "ntdb_transaction: msync failed: %s",
449                                           strerror(errno));
450                 }
451         }
452 #endif
453         return NTDB_SUCCESS;
454 }
455
456 static void free_transaction_blocks(struct ntdb_context *ntdb)
457 {
458         int i;
459
460         /* free all the transaction blocks */
461         for (i=0;i<ntdb->transaction->num_blocks;i++) {
462                 if (ntdb->transaction->blocks[i] != NULL) {
463                         ntdb->free_fn(ntdb->transaction->blocks[i],
464                                       ntdb->alloc_data);
465                 }
466         }
467         SAFE_FREE(ntdb, ntdb->transaction->blocks);
468         ntdb->transaction->num_blocks = 0;
469 }
470
471 static void _ntdb_transaction_cancel(struct ntdb_context *ntdb)
472 {
473         enum NTDB_ERROR ecode;
474
475         if (ntdb->transaction == NULL) {
476                 ntdb_logerr(ntdb, NTDB_ERR_EINVAL, NTDB_LOG_USE_ERROR,
477                            "ntdb_transaction_cancel: no transaction");
478                 return;
479         }
480
481         if (ntdb->transaction->nesting != 0) {
482                 ntdb->transaction->transaction_error = 1;
483                 ntdb->transaction->nesting--;
484                 return;
485         }
486
487         ntdb->file->map_size = ntdb->transaction->old_map_size;
488
489         free_transaction_blocks(ntdb);
490
491         if (ntdb->transaction->magic_offset) {
492                 const struct ntdb_methods *methods = ntdb->transaction->io_methods;
493                 uint64_t invalid = NTDB_RECOVERY_INVALID_MAGIC;
494
495                 /* remove the recovery marker */
496                 ecode = methods->twrite(ntdb, ntdb->transaction->magic_offset,
497                                         &invalid, sizeof(invalid));
498                 if (ecode == NTDB_SUCCESS)
499                         ecode = transaction_sync(ntdb,
500                                                  ntdb->transaction->magic_offset,
501                                                  sizeof(invalid));
502                 if (ecode != NTDB_SUCCESS) {
503                         ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
504                                    "ntdb_transaction_cancel: failed to remove"
505                                    " recovery magic");
506                 }
507         }
508
509         if (ntdb->file->allrecord_lock.count)
510                 ntdb_allrecord_unlock(ntdb, ntdb->file->allrecord_lock.ltype);
511
512         /* restore the normal io methods */
513         ntdb->io = ntdb->transaction->io_methods;
514
515         ntdb_transaction_unlock(ntdb, F_WRLCK);
516
517         if (ntdb_has_open_lock(ntdb))
518                 ntdb_unlock_open(ntdb, F_WRLCK);
519
520         SAFE_FREE(ntdb, ntdb->transaction);
521 }
522
523 /*
524   start a ntdb transaction. No token is returned, as only a single
525   transaction is allowed to be pending per ntdb_context
526 */
527 _PUBLIC_ enum NTDB_ERROR ntdb_transaction_start(struct ntdb_context *ntdb)
528 {
529         enum NTDB_ERROR ecode;
530
531         ntdb->stats.transactions++;
532         /* some sanity checks */
533         if (ntdb->flags & NTDB_INTERNAL) {
534                 return ntdb_logerr(ntdb, NTDB_ERR_EINVAL, NTDB_LOG_USE_ERROR,
535                                    "ntdb_transaction_start:"
536                                    " cannot start a transaction on an"
537                                    " internal ntdb");
538         }
539
540         if (ntdb->flags & NTDB_RDONLY) {
541                 return ntdb_logerr(ntdb, NTDB_ERR_RDONLY, NTDB_LOG_USE_ERROR,
542                                    "ntdb_transaction_start:"
543                                    " cannot start a transaction on a"
544                                    " read-only ntdb");
545         }
546
547         /* cope with nested ntdb_transaction_start() calls */
548         if (ntdb->transaction != NULL) {
549                 if (!(ntdb->flags & NTDB_ALLOW_NESTING)) {
550                         return ntdb_logerr(ntdb, NTDB_ERR_IO,
551                                            NTDB_LOG_USE_ERROR,
552                                            "ntdb_transaction_start:"
553                                            " already inside transaction");
554                 }
555                 ntdb->transaction->nesting++;
556                 ntdb->stats.transaction_nest++;
557                 return 0;
558         }
559
560         if (ntdb_has_hash_locks(ntdb)) {
561                 /* the caller must not have any locks when starting a
562                    transaction as otherwise we'll be screwed by lack
563                    of nested locks in POSIX */
564                 return ntdb_logerr(ntdb, NTDB_ERR_LOCK,
565                                    NTDB_LOG_USE_ERROR,
566                                    "ntdb_transaction_start:"
567                                    " cannot start a transaction with locks"
568                                    " held");
569         }
570
571         ntdb->transaction = (struct ntdb_transaction *)
572                 ntdb->alloc_fn(ntdb, sizeof(struct ntdb_transaction),
573                                ntdb->alloc_data);
574         if (ntdb->transaction == NULL) {
575                 return ntdb_logerr(ntdb, NTDB_ERR_OOM, NTDB_LOG_ERROR,
576                                    "ntdb_transaction_start:"
577                                    " cannot allocate");
578         }
579         memset(ntdb->transaction, 0, sizeof(*ntdb->transaction));
580
581         /* get the transaction write lock. This is a blocking lock. As
582            discussed with Volker, there are a number of ways we could
583            make this async, which we will probably do in the future */
584         ecode = ntdb_transaction_lock(ntdb, F_WRLCK);
585         if (ecode != NTDB_SUCCESS) {
586                 SAFE_FREE(ntdb, ntdb->transaction->blocks);
587                 SAFE_FREE(ntdb, ntdb->transaction);
588                 return ecode;
589         }
590
591         /* get a read lock over entire file. This is upgraded to a write
592            lock during the commit */
593         ecode = ntdb_allrecord_lock(ntdb, F_RDLCK, NTDB_LOCK_WAIT, true);
594         if (ecode != NTDB_SUCCESS) {
595                 goto fail_allrecord_lock;
596         }
597
598         /* make sure we know about any file expansions already done by
599            anyone else */
600         ntdb_oob(ntdb, ntdb->file->map_size, 1, true);
601         ntdb->transaction->old_map_size = ntdb->file->map_size;
602
603         /* finally hook the io methods, replacing them with
604            transaction specific methods */
605         ntdb->transaction->io_methods = ntdb->io;
606         ntdb->io = &transaction_methods;
607         return NTDB_SUCCESS;
608
609 fail_allrecord_lock:
610         ntdb_transaction_unlock(ntdb, F_WRLCK);
611         SAFE_FREE(ntdb, ntdb->transaction->blocks);
612         SAFE_FREE(ntdb, ntdb->transaction);
613         return ecode;
614 }
615
616
617 /*
618   cancel the current transaction
619 */
620 _PUBLIC_ void ntdb_transaction_cancel(struct ntdb_context *ntdb)
621 {
622         ntdb->stats.transaction_cancel++;
623         _ntdb_transaction_cancel(ntdb);
624 }
625
626 /*
627   work out how much space the linearised recovery data will consume (worst case)
628 */
629 static ntdb_len_t ntdb_recovery_size(struct ntdb_context *ntdb)
630 {
631         ntdb_len_t recovery_size = 0;
632         int i;
633
634         recovery_size = 0;
635         for (i=0;i<ntdb->transaction->num_blocks;i++) {
636                 if (i * NTDB_PGSIZE >= ntdb->transaction->old_map_size) {
637                         break;
638                 }
639                 if (ntdb->transaction->blocks[i] == NULL) {
640                         continue;
641                 }
642                 recovery_size += 2*sizeof(ntdb_off_t) + NTDB_PGSIZE;
643         }
644
645         return recovery_size;
646 }
647
648 static enum NTDB_ERROR ntdb_recovery_area(struct ntdb_context *ntdb,
649                                         const struct ntdb_methods *methods,
650                                         ntdb_off_t *recovery_offset,
651                                         struct ntdb_recovery_record *rec)
652 {
653         enum NTDB_ERROR ecode;
654
655         *recovery_offset = ntdb_read_off(ntdb,
656                                         offsetof(struct ntdb_header, recovery));
657         if (NTDB_OFF_IS_ERR(*recovery_offset)) {
658                 return NTDB_OFF_TO_ERR(*recovery_offset);
659         }
660
661         if (*recovery_offset == 0) {
662                 rec->max_len = 0;
663                 return NTDB_SUCCESS;
664         }
665
666         ecode = methods->tread(ntdb, *recovery_offset, rec, sizeof(*rec));
667         if (ecode != NTDB_SUCCESS)
668                 return ecode;
669
670         ntdb_convert(ntdb, rec, sizeof(*rec));
671         /* ignore invalid recovery regions: can happen in crash */
672         if (rec->magic != NTDB_RECOVERY_MAGIC &&
673             rec->magic != NTDB_RECOVERY_INVALID_MAGIC) {
674                 *recovery_offset = 0;
675                 rec->max_len = 0;
676         }
677         return NTDB_SUCCESS;
678 }
679
/* Length of the common prefix of new and old, looking at no more than
   length bytes. */
static unsigned int same(const unsigned char *new,
                         const unsigned char *old,
                         unsigned int length)
{
        unsigned int matched = 0;

        while (matched < length && new[matched] == old[matched]) {
                matched++;
        }
        return matched;
}
692
/* Measure the leading "different" stretch of new versus old.

   Scanning stops at the first mismatch that follows a run of at least
   min_same equal bytes, or at length. Shorter runs of equal bytes are
   counted as part of the differing stretch. The return value is the
   number of bytes before that qualifying run; *samelen receives the
   length of the run itself (0 if there was none). */
static unsigned int different(const unsigned char *new,
                              const unsigned char *old,
                              unsigned int length,
                              unsigned int min_same,
                              unsigned int *samelen)
{
        unsigned int pos;
        unsigned int run = 0;

        for (pos = 0; pos < length; pos++) {
                if (new[pos] == old[pos]) {
                        run++;
                        continue;
                }
                /* mismatch: was the run before it long enough to split on? */
                if (run >= min_same) {
                        *samelen = run;
                        return pos - run;
                }
                run = 0;
        }

        /* a trailing run only counts if it is long enough */
        if (run < min_same) {
                run = 0;
        }
        *samelen = run;
        return length - run;
}
717
718 /* Allocates recovery blob, without ntdb_recovery_record at head set up. */
719 static struct ntdb_recovery_record *alloc_recovery(struct ntdb_context *ntdb,
720                                                   ntdb_len_t *len)
721 {
722         struct ntdb_recovery_record *rec;
723         size_t i;
724         enum NTDB_ERROR ecode;
725         unsigned char *p;
726         const struct ntdb_methods *old_methods = ntdb->io;
727
728         rec = ntdb->alloc_fn(ntdb, sizeof(*rec) + ntdb_recovery_size(ntdb),
729                          ntdb->alloc_data);
730         if (!rec) {
731                 ntdb_logerr(ntdb, NTDB_ERR_OOM, NTDB_LOG_ERROR,
732                            "transaction_setup_recovery:"
733                            " cannot allocate");
734                 return NTDB_ERR_PTR(NTDB_ERR_OOM);
735         }
736
737         /* We temporarily revert to the old I/O methods, so we can use
738          * ntdb_access_read */
739         ntdb->io = ntdb->transaction->io_methods;
740
741         /* build the recovery data into a single blob to allow us to do a single
742            large write, which should be more efficient */
743         p = (unsigned char *)(rec + 1);
744         for (i=0;i<ntdb->transaction->num_blocks;i++) {
745                 ntdb_off_t offset;
746                 ntdb_len_t length;
747                 unsigned int off;
748                 const unsigned char *buffer;
749
750                 if (ntdb->transaction->blocks[i] == NULL) {
751                         continue;
752                 }
753
754                 offset = i * NTDB_PGSIZE;
755                 length = NTDB_PGSIZE;
756                 if (offset >= ntdb->transaction->old_map_size) {
757                         continue;
758                 }
759
760                 if (offset + length > ntdb->file->map_size) {
761                         ecode = ntdb_logerr(ntdb, NTDB_ERR_CORRUPT, NTDB_LOG_ERROR,
762                                            "ntdb_transaction_setup_recovery:"
763                                            " transaction data over new region"
764                                            " boundary");
765                         goto fail;
766                 }
767                 buffer = ntdb_access_read(ntdb, offset, length, false);
768                 if (NTDB_PTR_IS_ERR(buffer)) {
769                         ecode = NTDB_PTR_ERR(buffer);
770                         goto fail;
771                 }
772
773                 /* Skip over anything the same at the start. */
774                 off = same(ntdb->transaction->blocks[i], buffer, length);
775                 offset += off;
776
777                 while (off < length) {
778                         ntdb_len_t len1;
779                         unsigned int samelen;
780
781                         len1 = different(ntdb->transaction->blocks[i] + off,
782                                         buffer + off, length - off,
783                                         sizeof(offset) + sizeof(len1) + 1,
784                                         &samelen);
785
786                         memcpy(p, &offset, sizeof(offset));
787                         memcpy(p + sizeof(offset), &len1, sizeof(len1));
788                         ntdb_convert(ntdb, p, sizeof(offset) + sizeof(len1));
789                         p += sizeof(offset) + sizeof(len1);
790                         memcpy(p, buffer + off, len1);
791                         p += len1;
792                         off += len1 + samelen;
793                         offset += len1 + samelen;
794                 }
795                 ntdb_access_release(ntdb, buffer);
796         }
797
798         *len = p - (unsigned char *)(rec + 1);
799         ntdb->io = old_methods;
800         return rec;
801
802 fail:
803         ntdb->free_fn(rec, ntdb->alloc_data);
804         ntdb->io = old_methods;
805         return NTDB_ERR_PTR(ecode);
806 }
807
808 static ntdb_off_t create_recovery_area(struct ntdb_context *ntdb,
809                                       ntdb_len_t rec_length,
810                                       struct ntdb_recovery_record *rec)
811 {
812         ntdb_off_t off, recovery_off;
813         ntdb_len_t addition;
814         enum NTDB_ERROR ecode;
815         const struct ntdb_methods *methods = ntdb->transaction->io_methods;
816
817         /* round up to a multiple of page size. Overallocate, since each
818          * such allocation forces us to expand the file. */
819         rec->max_len = ntdb_expand_adjust(ntdb->file->map_size, rec_length);
820
821         /* Round up to a page. */
822         rec->max_len = ((sizeof(*rec) + rec->max_len + NTDB_PGSIZE-1)
823                         & ~(NTDB_PGSIZE-1))
824                 - sizeof(*rec);
825
826         off = ntdb->file->map_size;
827
828         /* Restore ->map_size before calling underlying expand_file.
829            Also so that we don't try to expand the file again in the
830            transaction commit, which would destroy the recovery
831            area */
832         addition = (ntdb->file->map_size - ntdb->transaction->old_map_size) +
833                 sizeof(*rec) + rec->max_len;
834         ntdb->file->map_size = ntdb->transaction->old_map_size;
835         ntdb->stats.transaction_expand_file++;
836         ecode = methods->expand_file(ntdb, addition);
837         if (ecode != NTDB_SUCCESS) {
838                 ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
839                            "ntdb_recovery_allocate:"
840                            " failed to create recovery area");
841                 return NTDB_ERR_TO_OFF(ecode);
842         }
843
844         /* we have to reset the old map size so that we don't try to
845            expand the file again in the transaction commit, which
846            would destroy the recovery area */
847         ntdb->transaction->old_map_size = ntdb->file->map_size;
848
849         /* write the recovery header offset and sync - we can sync without a race here
850            as the magic ptr in the recovery record has not been set */
851         recovery_off = off;
852         ntdb_convert(ntdb, &recovery_off, sizeof(recovery_off));
853         ecode = methods->twrite(ntdb, offsetof(struct ntdb_header, recovery),
854                                 &recovery_off, sizeof(ntdb_off_t));
855         if (ecode != NTDB_SUCCESS) {
856                 ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
857                            "ntdb_recovery_allocate:"
858                            " failed to write recovery head");
859                 return NTDB_ERR_TO_OFF(ecode);
860         }
861         transaction_write_existing(ntdb, offsetof(struct ntdb_header, recovery),
862                                    &recovery_off,
863                                    sizeof(ntdb_off_t));
864         return off;
865 }
866
867 /*
868   setup the recovery data that will be used on a crash during commit
869 */
870 static enum NTDB_ERROR transaction_setup_recovery(struct ntdb_context *ntdb)
871 {
872         ntdb_len_t recovery_size = 0;
873         ntdb_off_t recovery_off = 0;
874         ntdb_off_t old_map_size = ntdb->transaction->old_map_size;
875         struct ntdb_recovery_record *recovery;
876         const struct ntdb_methods *methods = ntdb->transaction->io_methods;
877         uint64_t magic;
878         enum NTDB_ERROR ecode;
879
880         recovery = alloc_recovery(ntdb, &recovery_size);
881         if (NTDB_PTR_IS_ERR(recovery))
882                 return NTDB_PTR_ERR(recovery);
883
884         /* If we didn't actually change anything we overwrote? */
885         if (recovery_size == 0) {
886                 /* In theory, we could have just appended data. */
887                 if (ntdb->transaction->num_blocks * NTDB_PGSIZE
888                     < ntdb->transaction->old_map_size) {
889                         free_transaction_blocks(ntdb);
890                 }
891                 ntdb->free_fn(recovery, ntdb->alloc_data);
892                 return NTDB_SUCCESS;
893         }
894
895         ecode = ntdb_recovery_area(ntdb, methods, &recovery_off, recovery);
896         if (ecode) {
897                 ntdb->free_fn(recovery, ntdb->alloc_data);
898                 return ecode;
899         }
900
901         if (recovery->max_len < recovery_size) {
902                 /* Not large enough. Free up old recovery area. */
903                 if (recovery_off) {
904                         ntdb->stats.frees++;
905                         ecode = add_free_record(ntdb, recovery_off,
906                                                 sizeof(*recovery)
907                                                 + recovery->max_len,
908                                                 NTDB_LOCK_WAIT, true);
909                         ntdb->free_fn(recovery, ntdb->alloc_data);
910                         if (ecode != NTDB_SUCCESS) {
911                                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
912                                                   "ntdb_recovery_allocate:"
913                                                   " failed to free previous"
914                                                   " recovery area");
915                         }
916
917                         /* Refresh recovery after add_free_record above. */
918                         recovery = alloc_recovery(ntdb, &recovery_size);
919                         if (NTDB_PTR_IS_ERR(recovery))
920                                 return NTDB_PTR_ERR(recovery);
921                 }
922
923                 recovery_off = create_recovery_area(ntdb, recovery_size,
924                                                     recovery);
925                 if (NTDB_OFF_IS_ERR(recovery_off)) {
926                         ntdb->free_fn(recovery, ntdb->alloc_data);
927                         return NTDB_OFF_TO_ERR(recovery_off);
928                 }
929         }
930
931         /* Now we know size, convert rec header. */
932         recovery->magic = NTDB_RECOVERY_INVALID_MAGIC;
933         recovery->len = recovery_size;
934         recovery->eof = old_map_size;
935         ntdb_convert(ntdb, recovery, sizeof(*recovery));
936
937         /* write the recovery data to the recovery area */
938         ecode = methods->twrite(ntdb, recovery_off, recovery,
939                                 sizeof(*recovery) + recovery_size);
940         if (ecode != NTDB_SUCCESS) {
941                 ntdb->free_fn(recovery, ntdb->alloc_data);
942                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
943                                   "ntdb_transaction_setup_recovery:"
944                                   " failed to write recovery data");
945         }
946         transaction_write_existing(ntdb, recovery_off, recovery, recovery_size);
947
948         ntdb->free_fn(recovery, ntdb->alloc_data);
949
950         /* as we don't have ordered writes, we have to sync the recovery
951            data before we update the magic to indicate that the recovery
952            data is present */
953         ecode = transaction_sync(ntdb, recovery_off, recovery_size);
954         if (ecode != NTDB_SUCCESS)
955                 return ecode;
956
957         magic = NTDB_RECOVERY_MAGIC;
958         ntdb_convert(ntdb, &magic, sizeof(magic));
959
960         ntdb->transaction->magic_offset
961                 = recovery_off + offsetof(struct ntdb_recovery_record, magic);
962
963         ecode = methods->twrite(ntdb, ntdb->transaction->magic_offset,
964                                 &magic, sizeof(magic));
965         if (ecode != NTDB_SUCCESS) {
966                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
967                                   "ntdb_transaction_setup_recovery:"
968                                   " failed to write recovery magic");
969         }
970         transaction_write_existing(ntdb, ntdb->transaction->magic_offset,
971                                    &magic, sizeof(magic));
972
973         /* ensure the recovery magic marker is on disk */
974         return transaction_sync(ntdb, ntdb->transaction->magic_offset,
975                                 sizeof(magic));
976 }
977
978 static enum NTDB_ERROR _ntdb_transaction_prepare_commit(struct ntdb_context *ntdb)
979 {
980         const struct ntdb_methods *methods;
981         enum NTDB_ERROR ecode;
982
983         if (ntdb->transaction == NULL) {
984                 return ntdb_logerr(ntdb, NTDB_ERR_EINVAL, NTDB_LOG_USE_ERROR,
985                                   "ntdb_transaction_prepare_commit:"
986                                   " no transaction");
987         }
988
989         if (ntdb->transaction->prepared) {
990                 _ntdb_transaction_cancel(ntdb);
991                 return ntdb_logerr(ntdb, NTDB_ERR_EINVAL, NTDB_LOG_USE_ERROR,
992                                   "ntdb_transaction_prepare_commit:"
993                                   " transaction already prepared");
994         }
995
996         if (ntdb->transaction->transaction_error) {
997                 _ntdb_transaction_cancel(ntdb);
998                 return ntdb_logerr(ntdb, NTDB_ERR_EINVAL, NTDB_LOG_ERROR,
999                                   "ntdb_transaction_prepare_commit:"
1000                                   " transaction error pending");
1001         }
1002
1003
1004         if (ntdb->transaction->nesting != 0) {
1005                 return NTDB_SUCCESS;
1006         }
1007
1008         /* check for a null transaction */
1009         if (ntdb->transaction->blocks == NULL) {
1010                 return NTDB_SUCCESS;
1011         }
1012
1013         methods = ntdb->transaction->io_methods;
1014
1015         /* upgrade the main transaction lock region to a write lock */
1016         ecode = ntdb_allrecord_upgrade(ntdb, NTDB_HASH_LOCK_START);
1017         if (ecode != NTDB_SUCCESS) {
1018                 return ecode;
1019         }
1020
1021         /* get the open lock - this prevents new users attaching to the database
1022            during the commit */
1023         ecode = ntdb_lock_open(ntdb, F_WRLCK, NTDB_LOCK_WAIT|NTDB_LOCK_NOCHECK);
1024         if (ecode != NTDB_SUCCESS) {
1025                 return ecode;
1026         }
1027
1028         /* Sets up ntdb->transaction->recovery and
1029          * ntdb->transaction->magic_offset. */
1030         ecode = transaction_setup_recovery(ntdb);
1031         if (ecode != NTDB_SUCCESS) {
1032                 return ecode;
1033         }
1034
1035         ntdb->transaction->prepared = true;
1036
1037         /* expand the file to the new size if needed */
1038         if (ntdb->file->map_size != ntdb->transaction->old_map_size) {
1039                 ntdb_len_t add;
1040
1041                 add = ntdb->file->map_size - ntdb->transaction->old_map_size;
1042                 /* Restore original map size for ntdb_expand_file */
1043                 ntdb->file->map_size = ntdb->transaction->old_map_size;
1044                 ecode = methods->expand_file(ntdb, add);
1045                 if (ecode != NTDB_SUCCESS) {
1046                         return ecode;
1047                 }
1048         }
1049
1050         /* Keep the open lock until the actual commit */
1051         return NTDB_SUCCESS;
1052 }
1053
1054 /*
1055    prepare to commit the current transaction
1056 */
1057 _PUBLIC_ enum NTDB_ERROR ntdb_transaction_prepare_commit(struct ntdb_context *ntdb)
1058 {
1059         return _ntdb_transaction_prepare_commit(ntdb);
1060 }
1061
1062 /*
1063   commit the current transaction
1064 */
1065 _PUBLIC_ enum NTDB_ERROR ntdb_transaction_commit(struct ntdb_context *ntdb)
1066 {
1067         const struct ntdb_methods *methods;
1068         int i;
1069         enum NTDB_ERROR ecode;
1070
1071         if (ntdb->transaction == NULL) {
1072                 return ntdb_logerr(ntdb, NTDB_ERR_EINVAL, NTDB_LOG_USE_ERROR,
1073                                    "ntdb_transaction_commit:"
1074                                    " no transaction");
1075         }
1076
1077         ntdb_trace(ntdb, "ntdb_transaction_commit");
1078
1079         if (ntdb->transaction->nesting != 0) {
1080                 ntdb->transaction->nesting--;
1081                 return NTDB_SUCCESS;
1082         }
1083
1084         if (!ntdb->transaction->prepared) {
1085                 ecode = _ntdb_transaction_prepare_commit(ntdb);
1086                 if (ecode != NTDB_SUCCESS) {
1087                         _ntdb_transaction_cancel(ntdb);
1088                         return ecode;
1089                 }
1090         }
1091
1092         /* check for a null transaction (prepare_commit may do this!) */
1093         if (ntdb->transaction->blocks == NULL) {
1094                 _ntdb_transaction_cancel(ntdb);
1095                 return NTDB_SUCCESS;
1096         }
1097
1098         methods = ntdb->transaction->io_methods;
1099
1100         /* perform all the writes */
1101         for (i=0;i<ntdb->transaction->num_blocks;i++) {
1102                 ntdb_off_t offset;
1103                 ntdb_len_t length;
1104
1105                 if (ntdb->transaction->blocks[i] == NULL) {
1106                         continue;
1107                 }
1108
1109                 offset = i * NTDB_PGSIZE;
1110                 length = NTDB_PGSIZE;
1111
1112                 ecode = methods->twrite(ntdb, offset,
1113                                         ntdb->transaction->blocks[i], length);
1114                 if (ecode != NTDB_SUCCESS) {
1115                         /* we've overwritten part of the data and
1116                            possibly expanded the file, so we need to
1117                            run the crash recovery code */
1118                         ntdb->io = methods;
1119                         ntdb_transaction_recover(ntdb);
1120
1121                         _ntdb_transaction_cancel(ntdb);
1122
1123                         return ecode;
1124                 }
1125                 SAFE_FREE(ntdb, ntdb->transaction->blocks[i]);
1126         }
1127
1128         SAFE_FREE(ntdb, ntdb->transaction->blocks);
1129         ntdb->transaction->num_blocks = 0;
1130
1131         /* ensure the new data is on disk */
1132         ecode = transaction_sync(ntdb, 0, ntdb->file->map_size);
1133         if (ecode != NTDB_SUCCESS) {
1134                 return ecode;
1135         }
1136
1137         /*
1138           TODO: maybe write to some dummy hdr field, or write to magic
1139           offset without mmap, before the last sync, instead of the
1140           utime() call
1141         */
1142
1143         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1144            don't change the mtime of the file, this means the file may
1145            not be backed up (as ntdb rounding to block sizes means that
1146            file size changes are quite rare too). The following forces
1147            mtime changes when a transaction completes */
1148 #if HAVE_UTIME
1149         utime(ntdb->name, NULL);
1150 #endif
1151
1152         /* use a transaction cancel to free memory and remove the
1153            transaction locks: it "restores" map_size, too. */
1154         ntdb->transaction->old_map_size = ntdb->file->map_size;
1155         _ntdb_transaction_cancel(ntdb);
1156
1157         return NTDB_SUCCESS;
1158 }
1159
1160
1161 /*
1162   recover from an aborted transaction. Must be called with exclusive
1163   database write access already established (including the open
1164   lock to prevent new processes attaching)
1165 */
1166 enum NTDB_ERROR ntdb_transaction_recover(struct ntdb_context *ntdb)
1167 {
1168         ntdb_off_t recovery_head, recovery_eof;
1169         unsigned char *data, *p;
1170         struct ntdb_recovery_record rec;
1171         enum NTDB_ERROR ecode;
1172
1173         /* find the recovery area */
1174         recovery_head = ntdb_read_off(ntdb, offsetof(struct ntdb_header,recovery));
1175         if (NTDB_OFF_IS_ERR(recovery_head)) {
1176                 ecode = NTDB_OFF_TO_ERR(recovery_head);
1177                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1178                                   "ntdb_transaction_recover:"
1179                                   " failed to read recovery head");
1180         }
1181
1182         if (recovery_head == 0) {
1183                 /* we have never allocated a recovery record */
1184                 return NTDB_SUCCESS;
1185         }
1186
1187         /* read the recovery record */
1188         ecode = ntdb_read_convert(ntdb, recovery_head, &rec, sizeof(rec));
1189         if (ecode != NTDB_SUCCESS) {
1190                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1191                                   "ntdb_transaction_recover:"
1192                                   " failed to read recovery record");
1193         }
1194
1195         if (rec.magic != NTDB_RECOVERY_MAGIC) {
1196                 /* there is no valid recovery data */
1197                 return NTDB_SUCCESS;
1198         }
1199
1200         if (ntdb->flags & NTDB_RDONLY) {
1201                 return ntdb_logerr(ntdb, NTDB_ERR_CORRUPT, NTDB_LOG_ERROR,
1202                                   "ntdb_transaction_recover:"
1203                                   " attempt to recover read only database");
1204         }
1205
1206         recovery_eof = rec.eof;
1207
1208         data = (unsigned char *)ntdb->alloc_fn(ntdb, rec.len, ntdb->alloc_data);
1209         if (data == NULL) {
1210                 return ntdb_logerr(ntdb, NTDB_ERR_OOM, NTDB_LOG_ERROR,
1211                                   "ntdb_transaction_recover:"
1212                                   " failed to allocate recovery data");
1213         }
1214
1215         /* read the full recovery data */
1216         ecode = ntdb->io->tread(ntdb, recovery_head + sizeof(rec), data,
1217                                     rec.len);
1218         if (ecode != NTDB_SUCCESS) {
1219                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1220                                   "ntdb_transaction_recover:"
1221                                   " failed to read recovery data");
1222         }
1223
1224         /* recover the file data */
1225         p = data;
1226         while (p+sizeof(ntdb_off_t)+sizeof(ntdb_len_t) < data + rec.len) {
1227                 ntdb_off_t ofs;
1228                 ntdb_len_t len;
1229                 ntdb_convert(ntdb, p, sizeof(ofs) + sizeof(len));
1230                 memcpy(&ofs, p, sizeof(ofs));
1231                 memcpy(&len, p + sizeof(ofs), sizeof(len));
1232                 p += sizeof(ofs) + sizeof(len);
1233
1234                 ecode = ntdb->io->twrite(ntdb, ofs, p, len);
1235                 if (ecode != NTDB_SUCCESS) {
1236                         ntdb->free_fn(data, ntdb->alloc_data);
1237                         return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1238                                           "ntdb_transaction_recover:"
1239                                           " failed to recover %zu bytes"
1240                                           " at offset %zu",
1241                                           (size_t)len, (size_t)ofs);
1242                 }
1243                 p += len;
1244         }
1245
1246         ntdb->free_fn(data, ntdb->alloc_data);
1247
1248         ecode = transaction_sync(ntdb, 0, ntdb->file->map_size);
1249         if (ecode != NTDB_SUCCESS) {
1250                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1251                                   "ntdb_transaction_recover:"
1252                                   " failed to sync recovery");
1253         }
1254
1255         /* if the recovery area is after the recovered eof then remove it */
1256         if (recovery_eof <= recovery_head) {
1257                 ecode = ntdb_write_off(ntdb, offsetof(struct ntdb_header,
1258                                                     recovery),
1259                                       0);
1260                 if (ecode != NTDB_SUCCESS) {
1261                         return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1262                                           "ntdb_transaction_recover:"
1263                                           " failed to remove recovery head");
1264                 }
1265         }
1266
1267         /* remove the recovery magic */
1268         ecode = ntdb_write_off(ntdb,
1269                               recovery_head
1270                               + offsetof(struct ntdb_recovery_record, magic),
1271                               NTDB_RECOVERY_INVALID_MAGIC);
1272         if (ecode != NTDB_SUCCESS) {
1273                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1274                                   "ntdb_transaction_recover:"
1275                                   " failed to remove recovery magic");
1276         }
1277
1278         ecode = transaction_sync(ntdb, 0, recovery_eof);
1279         if (ecode != NTDB_SUCCESS) {
1280                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1281                                   "ntdb_transaction_recover:"
1282                                   " failed to sync2 recovery");
1283         }
1284
1285         ntdb_logerr(ntdb, NTDB_SUCCESS, NTDB_LOG_WARNING,
1286                    "ntdb_transaction_recover: recovered %zu byte database",
1287                    (size_t)recovery_eof);
1288
1289         /* all done */
1290         return NTDB_SUCCESS;
1291 }
1292
1293 ntdb_bool_err ntdb_needs_recovery(struct ntdb_context *ntdb)
1294 {
1295         ntdb_off_t recovery_head;
1296         struct ntdb_recovery_record rec;
1297         enum NTDB_ERROR ecode;
1298
1299         /* find the recovery area */
1300         recovery_head = ntdb_read_off(ntdb, offsetof(struct ntdb_header,recovery));
1301         if (NTDB_OFF_IS_ERR(recovery_head)) {
1302                 return recovery_head;
1303         }
1304
1305         if (recovery_head == 0) {
1306                 /* we have never allocated a recovery record */
1307                 return false;
1308         }
1309
1310         /* read the recovery record */
1311         ecode = ntdb_read_convert(ntdb, recovery_head, &rec, sizeof(rec));
1312         if (ecode != NTDB_SUCCESS) {
1313                 return NTDB_ERR_TO_OFF(ecode);
1314         }
1315
1316         return (rec.magic == NTDB_RECOVERY_MAGIC);
1317 }