]> git.ozlabs.org Git - ccan/blob - ccan/tdb2/transaction.c
6e0b1669fcd9fa1e2276282e621856f1f8236c3c
[ccan] / ccan / tdb2 / transaction.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7    Copyright (C) Rusty Russell                2010
8
9      ** NOTE! The following LGPL license applies to the tdb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 #include "private.h"
/* Free x and reset it to NULL so the stale pointer cannot be reused or freed
   twice. free(NULL) is defined to do nothing, so no NULL guard is needed.
   x is evaluated more than once: pass a plain lvalue without side effects. */
#define SAFE_FREE(x) do { free(x); (x) = NULL; } while (0)
29
30 /*
31   transaction design:
32
33   - only allow a single transaction at a time per database. This makes
34     using the transaction API simpler, as otherwise the caller would
35     have to cope with temporary failures in transactions that conflict
36     with other current transactions
37
38   - keep the transaction recovery information in the same file as the
39     database, using a special 'transaction recovery' record pointed at
40     by the header. This removes the need for extra journal files as
41     used by some other databases
42
43   - dynamically allocate the transaction recovery record, re-using it
44     for subsequent transactions. If a larger record is needed then
45     tdb_free() the old record to place it on the normal tdb freelist
46     before allocating the new record
47
48   - during transactions, keep a linked list of all writes that have
49     been performed by intercepting all tdb_write() calls. The hooked
50     transaction versions of tdb_read() and tdb_write() check this
51     linked list and try to use the elements of the list in preference
52     to the real database.
53
54   - don't allow any locks to be held when a transaction starts,
55     otherwise we can end up with deadlock (plus lack of lock nesting
56     in posix locks would mean the lock is lost)
57
58   - if the caller gains a lock during the transaction but doesn't
59     release it then fail the commit
60
61   - allow for nested calls to tdb_transaction_start(), re-using the
62     existing transaction record. If the inner transaction is cancelled
63     then a subsequent commit will fail
64
65   - keep a mirrored copy of the tdb hash chain heads to allow for the
66     fast hash heads scan on traverse, updating the mirrored copy in
67     the transaction version of tdb_write
68
69   - allow callers to mix transaction and non-transaction use of tdb,
70     although once a transaction is started then an exclusive lock is
71     gained until the transaction is committed or cancelled
72
73   - the commit strategy involves first saving away all modified data
74     into a linearised buffer in the transaction recovery area, then
75     marking the transaction recovery area with a magic value to
76     indicate a valid recovery record. In total 4 fsync/msync calls are
77     needed per commit to prevent race conditions. It might be possible
78     to reduce this to 3 or even 2 with some more work.
79
80   - check for a valid recovery record on open of the tdb, while the
81     open lock is held. Automatically recover from the transaction
82     recovery area if needed, then continue with the open as
83     usual. This allows for smooth crash recovery with no administrator
84     intervention.
85
86   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
87     still available, but no transaction recovery area is used and no
88     fsync/msync calls are made.
89 */
90
91
92 /*
93   hold the context of any current transaction (reached via tdb->transaction)
94 */
95 struct tdb_transaction {
96         /* the original io methods, saved when the transaction starts - used to do IOs to the real db */
97         const struct tdb_methods *io_methods;
98
99         /* the list of transaction blocks, indexed by offset / getpagesize().
100            A page-sized block is created here when it is first written to */
101         uint8_t **blocks;
102         size_t num_blocks; /* number of entries in blocks[] */
103         size_t last_block_size; /* number of valid bytes in blocks[num_blocks-1] */
104
105         /* non-zero when an internal transaction error has
106            occurred (failed transaction read/write, or cancel of a nested
107            transaction). NOTE(review): where this flag is checked is not visible here */
108         int transaction_error;
109
110         /* depth of nested tdb_transaction_start() calls; a cancel at non-zero
111            depth only decrements it and flags an error. NOTE(review): the start
112            code shown here rejects nested calls (TDB_ERR_NESTING) - confirm this is ever set */
113         int nesting;
114
115         /* set when a prepare has already occurred; transaction writes are refused afterwards */
116         bool prepared;
117         tdb_off_t magic_offset; /* if non-zero, offset where cancel writes TDB_RECOVERY_INVALID_MAGIC */
118
119         /* old file size before transaction; cancel restores tdb->map_size from it */
120         tdb_len_t old_map_size;
121 };
122
123
124 /*
125   read while in a transaction. We need to check first if the data is in our list
126   of transaction elements, then if not do a real read
127 */
128 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
129                             tdb_len_t len)
130 {
131         size_t blk;
132
133         /* break it down into block sized ops */
134         while (len + (off % getpagesize()) > getpagesize()) {
135                 tdb_len_t len2 = getpagesize() - (off % getpagesize());
136                 if (transaction_read(tdb, off, buf, len2) != 0) {
137                         return -1;
138                 }
139                 len -= len2;
140                 off += len2;
141                 buf = (void *)(len2 + (char *)buf);
142         }
143
144         if (len == 0) {
145                 return 0;
146         }
147
148         blk = off / getpagesize();
149
150         /* see if we have it in the block list */
151         if (tdb->transaction->num_blocks <= blk ||
152             tdb->transaction->blocks[blk] == NULL) {
153                 /* nope, do a real read */
154                 if (tdb->transaction->io_methods->read(tdb, off, buf, len) != 0) {
155                         goto fail;
156                 }
157                 return 0;
158         }
159
160         /* it is in the block list. Now check for the last block */
161         if (blk == tdb->transaction->num_blocks-1) {
162                 if (len > tdb->transaction->last_block_size) {
163                         goto fail;
164                 }
165         }
166
167         /* now copy it out of this block */
168         memcpy(buf, tdb->transaction->blocks[blk] + (off % getpagesize()), len);
169         return 0;
170
171 fail:
172         tdb_logerr(tdb, TDB_ERR_IO, TDB_DEBUG_FATAL,
173                    "transaction_read: failed at off=%zu len=%zu",
174                    (size_t)off, (size_t)len);
175         tdb->transaction->transaction_error = 1;
176         return -1;
177 }
178
179
180 /*
181   write while in a transaction
182 */
183 static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
184                              const void *buf, tdb_len_t len)
185 {
186         size_t blk;
187
188         /* Only a commit is allowed on a prepared transaction */
189         if (tdb->transaction->prepared) {
190                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_DEBUG_FATAL,
191                          "transaction_write: transaction already prepared,"
192                          " write not allowed");
193                 goto fail;
194         }
195
196         /* break it up into block sized chunks */
197         while (len + (off % getpagesize()) > getpagesize()) {
198                 tdb_len_t len2 = getpagesize() - (off % getpagesize());
199                 if (transaction_write(tdb, off, buf, len2) != 0) {
200                         return -1;
201                 }
202                 len -= len2;
203                 off += len2;
204                 if (buf != NULL) {
205                         buf = (const void *)(len2 + (const char *)buf);
206                 }
207         }
208
209         if (len == 0) {
210                 return 0;
211         }
212
213         blk = off / getpagesize();
214         off = off % getpagesize();
215
216         if (tdb->transaction->num_blocks <= blk) {
217                 uint8_t **new_blocks;
218                 /* expand the blocks array */
219                 if (tdb->transaction->blocks == NULL) {
220                         new_blocks = (uint8_t **)malloc(
221                                 (blk+1)*sizeof(uint8_t *));
222                 } else {
223                         new_blocks = (uint8_t **)realloc(
224                                 tdb->transaction->blocks,
225                                 (blk+1)*sizeof(uint8_t *));
226                 }
227                 if (new_blocks == NULL) {
228                         tdb_logerr(tdb, TDB_ERR_OOM, TDB_DEBUG_FATAL,
229                                    "transaction_write: failed to allocate");
230                         goto fail;
231                 }
232                 memset(&new_blocks[tdb->transaction->num_blocks], 0,
233                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
234                 tdb->transaction->blocks = new_blocks;
235                 tdb->transaction->num_blocks = blk+1;
236                 tdb->transaction->last_block_size = 0;
237         }
238
239         /* allocate and fill a block? */
240         if (tdb->transaction->blocks[blk] == NULL) {
241                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(getpagesize(), 1);
242                 if (tdb->transaction->blocks[blk] == NULL) {
243                         tdb_logerr(tdb, TDB_ERR_OOM, TDB_DEBUG_FATAL,
244                                    "transaction_write: failed to allocate");
245                         goto fail;
246                 }
247                 if (tdb->transaction->old_map_size > blk * getpagesize()) {
248                         tdb_len_t len2 = getpagesize();
249                         if (len2 + (blk * getpagesize()) > tdb->transaction->old_map_size) {
250                                 len2 = tdb->transaction->old_map_size - (blk * getpagesize());
251                         }
252                         if (tdb->transaction->io_methods->read(tdb, blk * getpagesize(),
253                                                                tdb->transaction->blocks[blk],
254                                                                len2) != 0) {
255                                 tdb_logerr(tdb, TDB_ERR_OOM, TDB_DEBUG_FATAL,
256                                            "transaction_write: failed to"
257                                            " read old block: %s",
258                                            strerror(errno));
259                                 SAFE_FREE(tdb->transaction->blocks[blk]);
260                                 goto fail;
261                         }
262                         if (blk == tdb->transaction->num_blocks-1) {
263                                 tdb->transaction->last_block_size = len2;
264                         }
265                 }
266         }
267
268         /* overwrite part of an existing block */
269         if (buf == NULL) {
270                 memset(tdb->transaction->blocks[blk] + off, 0, len);
271         } else {
272                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
273         }
274         if (blk == tdb->transaction->num_blocks-1) {
275                 if (len + off > tdb->transaction->last_block_size) {
276                         tdb->transaction->last_block_size = len + off;
277                 }
278         }
279
280         return 0;
281
282 fail:
283         tdb->transaction->transaction_error = 1;
284         return -1;
285 }
286
287
288 /*
289   write while in a transaction - this varient never expands the transaction blocks, it only
290   updates existing blocks. This means it cannot change the recovery size
291 */
292 static void transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
293                                        const void *buf, tdb_len_t len)
294 {
295         size_t blk;
296
297         /* break it up into block sized chunks */
298         while (len + (off % getpagesize()) > getpagesize()) {
299                 tdb_len_t len2 = getpagesize() - (off % getpagesize());
300                 transaction_write_existing(tdb, off, buf, len2);
301                 len -= len2;
302                 off += len2;
303                 if (buf != NULL) {
304                         buf = (const void *)(len2 + (const char *)buf);
305                 }
306         }
307
308         if (len == 0) {
309                 return;
310         }
311
312         blk = off / getpagesize();
313         off = off % getpagesize();
314
315         if (tdb->transaction->num_blocks <= blk ||
316             tdb->transaction->blocks[blk] == NULL) {
317                 return;
318         }
319
320         if (blk == tdb->transaction->num_blocks-1 &&
321             off + len > tdb->transaction->last_block_size) {
322                 if (off >= tdb->transaction->last_block_size) {
323                         return;
324                 }
325                 len = tdb->transaction->last_block_size - off;
326         }
327
328         /* overwrite part of an existing block */
329         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
330 }
331
332
333 /*
334   out of bounds check during a transaction
335 */
336 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, bool probe)
337 {
338         if (len <= tdb->map_size) {
339                 return 0;
340         }
341         tdb->ecode = TDB_ERR_IO;
342         if (!probe) {
343                 tdb_logerr(tdb, TDB_ERR_IO, TDB_DEBUG_FATAL,
344                            "tdb_oob len %lld beyond transaction size %lld",
345                            (long long)len,
346                            (long long)tdb->map_size);
347         }
348         return -1;
349 }
350
351 /*
352   transaction version of tdb_expand().
353 */
354 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t addition)
355 {
356         /* add a write to the transaction elements, so subsequent
357            reads see the zero data */
358         if (transaction_write(tdb, tdb->map_size, NULL, addition) != 0) {
359                 return -1;
360         }
361         tdb->map_size += addition;
362         return 0;
363 }
364
365 static void *transaction_direct(struct tdb_context *tdb, tdb_off_t off,
366                                 size_t len, bool write)
367 {
368         size_t blk = off / getpagesize(), end_blk;
369
370         /* This is wrong for zero-length blocks, but will fail gracefully */
371         end_blk = (off + len - 1) / getpagesize();
372
373         /* Can only do direct if in single block and we've already copied. */
374         if (write) {
375                 if (blk != end_blk)
376                         return NULL;
377                 if (blk >= tdb->transaction->num_blocks)
378                         return NULL;
379                 if (tdb->transaction->blocks[blk] == NULL)
380                         return NULL;
381                 return tdb->transaction->blocks[blk] + off % getpagesize();
382         }
383
384         /* Single which we have copied? */
385         if (blk == end_blk
386             && blk < tdb->transaction->num_blocks
387             && tdb->transaction->blocks[blk])
388                 return tdb->transaction->blocks[blk] + off % getpagesize();
389
390         /* Otherwise must be all not copied. */
391         while (blk < end_blk) {
392                 if (blk >= tdb->transaction->num_blocks)
393                         break;
394                 if (tdb->transaction->blocks[blk])
395                         return NULL;
396                 blk++;
397         }
398         return tdb->transaction->io_methods->direct(tdb, off, len, write);
399 }
400
401 static const struct tdb_methods transaction_methods = {
402         transaction_read,
403         transaction_write,
404         transaction_oob,
405         transaction_expand_file,
406         transaction_direct,
407 };
408
409 /*
410   sync to disk
411 */
412 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
413 {
414         if (tdb->flags & TDB_NOSYNC) {
415                 return 0;
416         }
417
418         if (fsync(tdb->fd) != 0) {
419                 tdb_logerr(tdb, TDB_ERR_IO, TDB_DEBUG_FATAL,
420                            "tdb_transaction: fsync failed: %s",
421                            strerror(errno));
422                 return -1;
423         }
424 #ifdef MS_SYNC
425         if (tdb->map_ptr) {
426                 tdb_off_t moffset = offset & ~(getpagesize()-1);
427                 if (msync(moffset + (char *)tdb->map_ptr,
428                           length + (offset - moffset), MS_SYNC) != 0) {
429                         tdb_logerr(tdb, TDB_ERR_IO, TDB_DEBUG_FATAL,
430                                    "tdb_transaction: msync failed: %s",
431                                    strerror(errno));
432                         return -1;
433                 }
434         }
435 #endif
436         return 0;
437 }
438
439
440 static void _tdb_transaction_cancel(struct tdb_context *tdb)
441 {
442         int i;
443
444         if (tdb->transaction == NULL) {
445                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_DEBUG_ERROR,
446                            "tdb_transaction_cancel: no transaction");
447                 return;
448         }
449
450         if (tdb->transaction->nesting != 0) {
451                 tdb->transaction->transaction_error = 1;
452                 tdb->transaction->nesting--;
453                 return;
454         }
455
456         tdb->map_size = tdb->transaction->old_map_size;
457
458         /* free all the transaction blocks */
459         for (i=0;i<tdb->transaction->num_blocks;i++) {
460                 if (tdb->transaction->blocks[i] != NULL) {
461                         free(tdb->transaction->blocks[i]);
462                 }
463         }
464         SAFE_FREE(tdb->transaction->blocks);
465
466         if (tdb->transaction->magic_offset) {
467                 const struct tdb_methods *methods = tdb->transaction->io_methods;
468                 uint64_t invalid = TDB_RECOVERY_INVALID_MAGIC;
469
470                 /* remove the recovery marker */
471                 if (methods->write(tdb, tdb->transaction->magic_offset,
472                                    &invalid, sizeof(invalid)) == -1 ||
473                     transaction_sync(tdb, tdb->transaction->magic_offset,
474                                      sizeof(invalid)) == -1) {
475                         tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_FATAL,
476                                    "tdb_transaction_cancel: failed to remove"
477                                    " recovery magic");
478                 }
479         }
480
481         if (tdb->allrecord_lock.count)
482                 tdb_allrecord_unlock(tdb, tdb->allrecord_lock.ltype);
483
484         /* restore the normal io methods */
485         tdb->methods = tdb->transaction->io_methods;
486
487         tdb_transaction_unlock(tdb, F_WRLCK);
488
489         if (tdb_has_open_lock(tdb))
490                 tdb_unlock_open(tdb);
491
492         SAFE_FREE(tdb->transaction);
493 }
494
495 /*
496   start a tdb transaction. No token is returned, as only a single
497   transaction is allowed to be pending per tdb_context
498 */
499 int tdb_transaction_start(struct tdb_context *tdb)
500 {
501         /* some sanity checks */
502         if (tdb->read_only || (tdb->flags & TDB_INTERNAL)) {
503                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_DEBUG_ERROR,
504                            "tdb_transaction_start: cannot start a transaction"
505                            " on a read-only or internal db");
506                 return -1;
507         }
508
509         /* cope with nested tdb_transaction_start() calls */
510         if (tdb->transaction != NULL) {
511                 tdb_logerr(tdb, TDB_ERR_NESTING, TDB_DEBUG_ERROR,
512                            "tdb_transaction_start:"
513                            " already inside transaction");
514                 return -1;
515         }
516
517         if (tdb_has_hash_locks(tdb)) {
518                 /* the caller must not have any locks when starting a
519                    transaction as otherwise we'll be screwed by lack
520                    of nested locks in posix */
521                 tdb_logerr(tdb, TDB_ERR_LOCK, TDB_DEBUG_ERROR,
522                            "tdb_transaction_start: cannot start a transaction"
523                            " with locks held");
524                 return -1;
525         }
526
527         tdb->transaction = (struct tdb_transaction *)
528                 calloc(sizeof(struct tdb_transaction), 1);
529         if (tdb->transaction == NULL) {
530                 tdb_logerr(tdb, TDB_ERR_OOM, TDB_DEBUG_ERROR,
531                            "tdb_transaction_start: cannot allocate");
532                 return -1;
533         }
534
535         /* get the transaction write lock. This is a blocking lock. As
536            discussed with Volker, there are a number of ways we could
537            make this async, which we will probably do in the future */
538         if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
539                 SAFE_FREE(tdb->transaction->blocks);
540                 SAFE_FREE(tdb->transaction);
541                 return -1;
542         }
543
544         /* get a read lock over entire file. This is upgraded to a write
545            lock during the commit */
546         if (tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, true) == -1) {
547                 goto fail_allrecord_lock;
548         }
549
550         /* make sure we know about any file expansions already done by
551            anyone else */
552         tdb->methods->oob(tdb, tdb->map_size + 1, true);
553         tdb->transaction->old_map_size = tdb->map_size;
554
555         /* finally hook the io methods, replacing them with
556            transaction specific methods */
557         tdb->transaction->io_methods = tdb->methods;
558         tdb->methods = &transaction_methods;
559         return 0;
560
561 fail_allrecord_lock:
562         tdb_transaction_unlock(tdb, F_WRLCK);
563         SAFE_FREE(tdb->transaction->blocks);
564         SAFE_FREE(tdb->transaction);
565         return -1;
566 }
567
568
569 /*
570   cancel the current transaction. Public wrapper: simply forwards to the
571   static _tdb_transaction_cancel() and reports nothing back to the caller */
572 void tdb_transaction_cancel(struct tdb_context *tdb)
573 {
574         _tdb_transaction_cancel(tdb);
575 }
576
577 /*
578   work out how much space the linearised recovery data will consume
579 */
580 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
581 {
582         tdb_len_t recovery_size = 0;
583         int i;
584
585         recovery_size = sizeof(tdb_len_t);
586         for (i=0;i<tdb->transaction->num_blocks;i++) {
587                 if (i * getpagesize() >= tdb->transaction->old_map_size) {
588                         break;
589                 }
590                 if (tdb->transaction->blocks[i] == NULL) {
591                         continue;
592                 }
593                 recovery_size += 2*sizeof(tdb_off_t);
594                 if (i == tdb->transaction->num_blocks-1) {
595                         recovery_size += tdb->transaction->last_block_size;
596                 } else {
597                         recovery_size += getpagesize();
598                 }
599         }
600
601         return recovery_size;
602 }
603
604 /*
605   allocate the recovery area, or use an existing recovery area if it is
606   large enough
607 */
608 static int tdb_recovery_allocate(struct tdb_context *tdb,
609                                  tdb_len_t *recovery_size,
610                                  tdb_off_t *recovery_offset,
611                                  tdb_len_t *recovery_max_size)
612 {
613         struct tdb_recovery_record rec;
614         const struct tdb_methods *methods = tdb->transaction->io_methods;
615         tdb_off_t recovery_head;
616         size_t addition;
617
618         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
619         if (recovery_head == TDB_OFF_ERR) {
620                 tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_FATAL,
621                          "tdb_recovery_allocate:"
622                          " failed to read recovery head");
623                 return -1;
624         }
625
626         if (recovery_head != 0) {
627                 if (methods->read(tdb, recovery_head, &rec, sizeof(rec))) {
628                         tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_FATAL,
629                                  "tdb_recovery_allocate:"
630                                  " failed to read recovery record");
631                         return -1;
632                 }
633                 tdb_convert(tdb, &rec, sizeof(rec));
634                 /* ignore invalid recovery regions: can happen in crash */
635                 if (rec.magic != TDB_RECOVERY_MAGIC &&
636                     rec.magic != TDB_RECOVERY_INVALID_MAGIC) {
637                         recovery_head = 0;
638                 }
639         }
640
641         *recovery_size = tdb_recovery_size(tdb);
642
643         if (recovery_head != 0 && *recovery_size <= rec.max_len) {
644                 /* it fits in the existing area */
645                 *recovery_max_size = rec.max_len;
646                 *recovery_offset = recovery_head;
647                 return 0;
648         }
649
650         /* we need to free up the old recovery area, then allocate a
651            new one at the end of the file. Note that we cannot use
652            normal allocation to allocate the new one as that might return
653            us an area that is being currently used (as of the start of
654            the transaction) */
655         if (recovery_head != 0) {
656                 add_stat(tdb, frees, 1);
657                 if (add_free_record(tdb, recovery_head,
658                                     sizeof(rec) + rec.max_len) != 0) {
659                         tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_FATAL,
660                                    "tdb_recovery_allocate:"
661                                    " failed to free previous recovery area");
662                         return -1;
663                 }
664         }
665
666         /* the tdb_free() call might have increased the recovery size */
667         *recovery_size = tdb_recovery_size(tdb);
668
669         /* round up to a multiple of page size */
670         *recovery_max_size
671                 = (((sizeof(rec) + *recovery_size) + getpagesize()-1)
672                    & ~(getpagesize()-1))
673                 - sizeof(rec);
674         *recovery_offset = tdb->map_size;
675         recovery_head = *recovery_offset;
676
677         /* Restore ->map_size before calling underlying expand_file.
678            Also so that we don't try to expand the file again in the
679            transaction commit, which would destroy the recovery
680            area */
681         addition = (tdb->map_size - tdb->transaction->old_map_size) +
682                 sizeof(rec) + *recovery_max_size;
683         tdb->map_size = tdb->transaction->old_map_size;
684         if (methods->expand_file(tdb, addition) == -1) {
685                 tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_FATAL,
686                          "tdb_recovery_allocate:"
687                          " failed to create recovery area");
688                 return -1;
689         }
690
691         /* we have to reset the old map size so that we don't try to
692            expand the file again in the transaction commit, which
693            would destroy the recovery area */
694         tdb->transaction->old_map_size = tdb->map_size;
695
696         /* write the recovery header offset and sync - we can sync without a race here
697            as the magic ptr in the recovery record has not been set */
698         tdb_convert(tdb, &recovery_head, sizeof(recovery_head));
699         if (methods->write(tdb, offsetof(struct tdb_header, recovery),
700                            &recovery_head, sizeof(tdb_off_t)) == -1) {
701                 tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_FATAL,
702                          "tdb_recovery_allocate:"
703                          " failed to write recovery head");
704                 return -1;
705         }
706         transaction_write_existing(tdb, offsetof(struct tdb_header, recovery),
707                                    &recovery_head,
708                                    sizeof(tdb_off_t));
709         return 0;
710 }
711
712 /* Set up header for the recovery record. */
713 static void set_recovery_header(struct tdb_recovery_record *rec,
714                                 uint64_t magic,
715                                 uint64_t datalen, uint64_t actuallen,
716                                 uint64_t oldsize)
717 {
718         rec->magic = magic;
719         rec->max_len = actuallen;
720         rec->len = datalen;
721         rec->eof = oldsize;
722 }
723
724 /*
725   setup the recovery data that will be used on a crash during commit
726 */
727 static int transaction_setup_recovery(struct tdb_context *tdb,
728                                       tdb_off_t *magic_offset)
729 {
730         tdb_len_t recovery_size;
731         unsigned char *data, *p;
732         const struct tdb_methods *methods = tdb->transaction->io_methods;
733         struct tdb_recovery_record *rec;
734         tdb_off_t recovery_offset, recovery_max_size;
735         tdb_off_t old_map_size = tdb->transaction->old_map_size;
736         uint64_t magic, tailer;
737         int i;
738
739         /*
740           check that the recovery area has enough space
741         */
742         if (tdb_recovery_allocate(tdb, &recovery_size,
743                                   &recovery_offset, &recovery_max_size) == -1) {
744                 return -1;
745         }
746
747         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
748         if (data == NULL) {
749                 tdb_logerr(tdb, TDB_ERR_OOM, TDB_DEBUG_FATAL,
750                            "transaction_setup_recovery: cannot allocate");
751                 return -1;
752         }
753
754         rec = (struct tdb_recovery_record *)data;
755         set_recovery_header(rec, TDB_RECOVERY_INVALID_MAGIC,
756                             recovery_size, recovery_max_size, old_map_size);
757         tdb_convert(tdb, rec, sizeof(*rec));
758
759         /* build the recovery data into a single blob to allow us to do a single
760            large write, which should be more efficient */
761         p = data + sizeof(*rec);
762         for (i=0;i<tdb->transaction->num_blocks;i++) {
763                 tdb_off_t offset;
764                 tdb_len_t length;
765
766                 if (tdb->transaction->blocks[i] == NULL) {
767                         continue;
768                 }
769
770                 offset = i * getpagesize();
771                 length = getpagesize();
772                 if (i == tdb->transaction->num_blocks-1) {
773                         length = tdb->transaction->last_block_size;
774                 }
775
776                 if (offset >= old_map_size) {
777                         continue;
778                 }
779                 if (offset + length > tdb->map_size) {
780                         tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_DEBUG_FATAL,
781                                    "tdb_transaction_setup_recovery:"
782                                    " transaction data over new region boundary");
783                         free(data);
784                         return -1;
785                 }
786                 memcpy(p, &offset, sizeof(offset));
787                 memcpy(p + sizeof(offset), &length, sizeof(length));
788                 tdb_convert(tdb, p, sizeof(offset) + sizeof(length));
789
790                 /* the recovery area contains the old data, not the
791                    new data, so we have to call the original tdb_read
792                    method to get it */
793                 if (methods->read(tdb, offset,
794                                   p + sizeof(offset) + sizeof(length),
795                                   length) != 0) {
796                         free(data);
797                         return -1;
798                 }
799                 p += sizeof(offset) + sizeof(length) + length;
800         }
801
802         /* and the tailer */
803         tailer = sizeof(*rec) + recovery_max_size;
804         memcpy(p, &tailer, sizeof(tailer));
805         tdb_convert(tdb, p, sizeof(tailer));
806
807         /* write the recovery data to the recovery area */
808         if (methods->write(tdb, recovery_offset, data,
809                            sizeof(*rec) + recovery_size) == -1) {
810                 tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_FATAL,
811                          "tdb_transaction_setup_recovery:"
812                          " failed to write recovery data");
813                 free(data);
814                 return -1;
815         }
816         transaction_write_existing(tdb, recovery_offset, data,
817                                    sizeof(*rec) + recovery_size);
818
819         /* as we don't have ordered writes, we have to sync the recovery
820            data before we update the magic to indicate that the recovery
821            data is present */
822         if (transaction_sync(tdb, recovery_offset,
823                              sizeof(*rec) + recovery_size) == -1) {
824                 free(data);
825                 return -1;
826         }
827
828         free(data);
829
830         magic = TDB_RECOVERY_MAGIC;
831         tdb_convert(tdb, &magic, sizeof(magic));
832
833         *magic_offset = recovery_offset + offsetof(struct tdb_recovery_record,
834                                                    magic);
835
836         if (methods->write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
837                 tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_FATAL,
838                          "tdb_transaction_setup_recovery:"
839                          " failed to write recovery magic");
840                 return -1;
841         }
842         transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic));
843
844         /* ensure the recovery magic marker is on disk */
845         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
846                 return -1;
847         }
848
849         return 0;
850 }
851
852 static int _tdb_transaction_prepare_commit(struct tdb_context *tdb)
853 {
854         const struct tdb_methods *methods;
855
856         if (tdb->transaction == NULL) {
857                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_DEBUG_ERROR,
858                            "tdb_transaction_prepare_commit: no transaction");
859                 return -1;
860         }
861
862         if (tdb->transaction->prepared) {
863                 _tdb_transaction_cancel(tdb);
864                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_DEBUG_ERROR,
865                            "tdb_transaction_prepare_commit:"
866                            " transaction already prepared");
867                 return -1;
868         }
869
870         if (tdb->transaction->transaction_error) {
871                 _tdb_transaction_cancel(tdb);
872                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_DEBUG_ERROR,
873                            "tdb_transaction_prepare_commit:"
874                            " transaction error pending");
875                 return -1;
876         }
877
878
879         if (tdb->transaction->nesting != 0) {
880                 tdb->transaction->nesting--;
881                 return 0;
882         }
883
884         /* check for a null transaction */
885         if (tdb->transaction->blocks == NULL) {
886                 return 0;
887         }
888
889         methods = tdb->transaction->io_methods;
890
891         /* upgrade the main transaction lock region to a write lock */
892         if (tdb_allrecord_upgrade(tdb) == -1) {
893                 tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_ERROR,
894                          "tdb_transaction_prepare_commit:"
895                          " failed to upgrade hash locks");
896                 _tdb_transaction_cancel(tdb);
897                 return -1;
898         }
899
900         /* get the open lock - this prevents new users attaching to the database
901            during the commit */
902         if (tdb_lock_open(tdb, TDB_LOCK_WAIT|TDB_LOCK_NOCHECK) == -1) {
903                 tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_ERROR,
904                          "tdb_transaction_prepare_commit:"
905                          " failed to get open lock");
906                 _tdb_transaction_cancel(tdb);
907                 return -1;
908         }
909
910         /* Since we have whole db locked, we don't need the expansion lock. */
911         if (!(tdb->flags & TDB_NOSYNC)) {
912                 /* write the recovery data to the end of the file */
913                 if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
914                         tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_FATAL,
915                                  "tdb_transaction_prepare_commit:"
916                                  " failed to setup recovery data");
917                         _tdb_transaction_cancel(tdb);
918                         return -1;
919                 }
920         }
921
922         tdb->transaction->prepared = true;
923
924         /* expand the file to the new size if needed */
925         if (tdb->map_size != tdb->transaction->old_map_size) {
926                 tdb_len_t add = tdb->map_size - tdb->transaction->old_map_size;
927                 /* Restore original map size for tdb_expand_file */
928                 tdb->map_size = tdb->transaction->old_map_size;
929                 if (methods->expand_file(tdb, add) == -1) {
930                         tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_ERROR,
931                                  "tdb_transaction_prepare_commit:"
932                                  " expansion failed");
933                         _tdb_transaction_cancel(tdb);
934                         return -1;
935                 }
936         }
937
938         /* Keep the open lock until the actual commit */
939
940         return 0;
941 }
942
/*
   public entry point for preparing the current transaction for
   commit; the work is done by _tdb_transaction_prepare_commit() and
   its result is handed back unchanged
*/
int tdb_transaction_prepare_commit(struct tdb_context *tdb)
{
        int ret = _tdb_transaction_prepare_commit(tdb);

        return ret;
}
950
951 /*
952   commit the current transaction
953 */
954 int tdb_transaction_commit(struct tdb_context *tdb)
955 {
956         const struct tdb_methods *methods;
957         int i;
958
959         if (tdb->transaction == NULL) {
960                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_DEBUG_ERROR,
961                          "tdb_transaction_commit: no transaction");
962                 return -1;
963         }
964
965         tdb_trace(tdb, "tdb_transaction_commit");
966
967         if (tdb->transaction->transaction_error) {
968                 tdb_transaction_cancel(tdb);
969                 tdb_logerr(tdb, TDB_ERR_IO, TDB_DEBUG_ERROR,
970                            "tdb_transaction_commit:"
971                            " transaction error pending");
972                 return -1;
973         }
974
975
976         if (tdb->transaction->nesting != 0) {
977                 tdb->transaction->nesting--;
978                 return 0;
979         }
980
981         /* check for a null transaction */
982         if (tdb->transaction->blocks == NULL) {
983                 _tdb_transaction_cancel(tdb);
984                 return 0;
985         }
986
987         if (!tdb->transaction->prepared) {
988                 int ret = _tdb_transaction_prepare_commit(tdb);
989                 if (ret)
990                         return ret;
991         }
992
993         methods = tdb->transaction->io_methods;
994
995         /* perform all the writes */
996         for (i=0;i<tdb->transaction->num_blocks;i++) {
997                 tdb_off_t offset;
998                 tdb_len_t length;
999
1000                 if (tdb->transaction->blocks[i] == NULL) {
1001                         continue;
1002                 }
1003
1004                 offset = i * getpagesize();
1005                 length = getpagesize();
1006                 if (i == tdb->transaction->num_blocks-1) {
1007                         length = tdb->transaction->last_block_size;
1008                 }
1009
1010                 if (methods->write(tdb, offset, tdb->transaction->blocks[i],
1011                                    length) == -1) {
1012                         tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_FATAL,
1013                                    "tdb_transaction_commit:"
1014                                    " write failed during commit");
1015
1016                         /* we've overwritten part of the data and
1017                            possibly expanded the file, so we need to
1018                            run the crash recovery code */
1019                         tdb->methods = methods;
1020                         tdb_transaction_recover(tdb);
1021
1022                         _tdb_transaction_cancel(tdb);
1023
1024                         return -1;
1025                 }
1026                 SAFE_FREE(tdb->transaction->blocks[i]);
1027         }
1028
1029         SAFE_FREE(tdb->transaction->blocks);
1030         tdb->transaction->num_blocks = 0;
1031
1032         /* ensure the new data is on disk */
1033         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1034                 return -1;
1035         }
1036
1037         /*
1038           TODO: maybe write to some dummy hdr field, or write to magic
1039           offset without mmap, before the last sync, instead of the
1040           utime() call
1041         */
1042
1043         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1044            don't change the mtime of the file, this means the file may
1045            not be backed up (as tdb rounding to block sizes means that
1046            file size changes are quite rare too). The following forces
1047            mtime changes when a transaction completes */
1048 #if HAVE_UTIME
1049         utime(tdb->name, NULL);
1050 #endif
1051
1052         /* use a transaction cancel to free memory and remove the
1053            transaction locks */
1054         _tdb_transaction_cancel(tdb);
1055
1056         return 0;
1057 }
1058
1059
1060 /*
1061   recover from an aborted transaction. Must be called with exclusive
1062   database write access already established (including the open
1063   lock to prevent new processes attaching)
1064 */
1065 int tdb_transaction_recover(struct tdb_context *tdb)
1066 {
1067         tdb_off_t recovery_head, recovery_eof;
1068         unsigned char *data, *p;
1069         struct tdb_recovery_record rec;
1070
1071         /* find the recovery area */
1072         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
1073         if (recovery_head == TDB_OFF_ERR) {
1074                 tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_FATAL,
1075                          "tdb_transaction_recover:"
1076                          " failed to read recovery head");
1077                 return -1;
1078         }
1079
1080         if (recovery_head == 0) {
1081                 /* we have never allocated a recovery record */
1082                 return 0;
1083         }
1084
1085         /* read the recovery record */
1086         if (tdb_read_convert(tdb, recovery_head, &rec, sizeof(rec)) == -1) {
1087                 tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_FATAL,
1088                            "tdb_transaction_recover:"
1089                            " failed to read recovery record");
1090                 return -1;
1091         }
1092
1093         if (rec.magic != TDB_RECOVERY_MAGIC) {
1094                 /* there is no valid recovery data */
1095                 return 0;
1096         }
1097
1098         if (tdb->read_only) {
1099                 tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_DEBUG_FATAL,
1100                            "tdb_transaction_recover:"
1101                            " attempt to recover read only database");
1102                 return -1;
1103         }
1104
1105         recovery_eof = rec.eof;
1106
1107         data = (unsigned char *)malloc(rec.len);
1108         if (data == NULL) {
1109                 tdb_logerr(tdb, TDB_ERR_OOM, TDB_DEBUG_FATAL,
1110                            "tdb_transaction_recover:"
1111                            " failed to allocate recovery data");
1112                 return -1;
1113         }
1114
1115         /* read the full recovery data */
1116         if (tdb->methods->read(tdb, recovery_head + sizeof(rec), data,
1117                                rec.len) == -1) {
1118                 tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_FATAL,
1119                            "tdb_transaction_recover:"
1120                            " failed to read recovery data");
1121                 return -1;
1122         }
1123
1124         /* recover the file data */
1125         p = data;
1126         while (p+sizeof(tdb_off_t)+sizeof(tdb_len_t) < data + rec.len) {
1127                 tdb_off_t ofs;
1128                 tdb_len_t len;
1129                 tdb_convert(tdb, p, sizeof(ofs) + sizeof(len));
1130                 memcpy(&ofs, p, sizeof(ofs));
1131                 memcpy(&len, p + sizeof(ofs), sizeof(len));
1132                 p += sizeof(ofs) + sizeof(len);
1133
1134                 if (tdb->methods->write(tdb, ofs, p, len) == -1) {
1135                         free(data);
1136                         tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_FATAL,
1137                                  "tdb_transaction_recover:"
1138                                  " failed to recover %zu bytes at offset %zu",
1139                                  (size_t)len, (size_t)ofs);
1140                         return -1;
1141                 }
1142                 p += len;
1143         }
1144
1145         free(data);
1146
1147         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1148                 tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_FATAL,
1149                            "tdb_transaction_recover: failed to sync recovery");
1150                 return -1;
1151         }
1152
1153         /* if the recovery area is after the recovered eof then remove it */
1154         if (recovery_eof <= recovery_head) {
1155                 if (tdb_write_off(tdb, offsetof(struct tdb_header,recovery), 0)
1156                     == -1) {
1157                         tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_FATAL,
1158                                  "tdb_transaction_recover:"
1159                                  " failed to remove recovery head");
1160                         return -1;
1161                 }
1162         }
1163
1164         /* remove the recovery magic */
1165         if (tdb_write_off(tdb,
1166                           recovery_head
1167                           + offsetof(struct tdb_recovery_record, magic),
1168                           TDB_RECOVERY_INVALID_MAGIC) == -1) {
1169                 tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_FATAL,
1170                          "tdb_transaction_recover:"
1171                          " failed to remove recovery magic");
1172                 return -1;
1173         }
1174
1175         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1176                 tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_FATAL,
1177                          "tdb_transaction_recover: failed to sync2 recovery");
1178                 return -1;
1179         }
1180
1181         tdb_logerr(tdb, TDB_SUCCESS, TDB_DEBUG_TRACE,
1182                    "tdb_transaction_recover: recovered %zu byte database",
1183                    (size_t)recovery_eof);
1184
1185         /* all done */
1186         return 0;
1187 }
1188
1189 /* Any I/O failures we say "needs recovery". */
1190 bool tdb_needs_recovery(struct tdb_context *tdb)
1191 {
1192         tdb_off_t recovery_head;
1193         struct tdb_recovery_record rec;
1194
1195         /* find the recovery area */
1196         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
1197         if (recovery_head == TDB_OFF_ERR) {
1198                 return true;
1199         }
1200
1201         if (recovery_head == 0) {
1202                 /* we have never allocated a recovery record */
1203                 return false;
1204         }
1205
1206         /* read the recovery record */
1207         if (tdb_read_convert(tdb, recovery_head, &rec, sizeof(rec)) == -1) {
1208                 return true;
1209         }
1210
1211         return (rec.magic == TDB_RECOVERY_MAGIC);
1212 }