2 Unix SMB/CIFS implementation.
4 trivial database library
6 Copyright (C) Andrew Tridgell 2005
8 ** NOTE! The following LGPL license applies to the tdb
9 ** library. This does NOT imply that all of Samba is released
12 This library is free software; you can redistribute it and/or
13 modify it under the terms of the GNU Lesser General Public
14 License as published by the Free Software Foundation; either
15 version 3 of the License, or (at your option) any later version.
17 This library is distributed in the hope that it will be useful,
18 but WITHOUT ANY WARRANTY; without even the implied warranty of
19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
20 Lesser General Public License for more details.
22 You should have received a copy of the GNU Lesser General Public
23 License along with this library; if not, see <http://www.gnu.org/licenses/>.
26 #include "tdb1_private.h"
31 - only allow a single transaction at a time per database. This makes
32 using the transaction API simpler, as otherwise the caller would
33 have to cope with temporary failures in transactions that conflict
34 with other current transactions
36 - keep the transaction recovery information in the same file as the
37 database, using a special 'transaction recovery' record pointed at
38 by the header. This removes the need for extra journal files as
39 used by some other databases
41 - dynamically allocate the transaction recovery record, re-using it
42 for subsequent transactions. If a larger record is needed then
43 tdb1_free() the old record to place it on the normal tdb freelist
44 before allocating the new record
46 - during transactions, keep a linked list of all writes that have
47 been performed by intercepting all tdb1_write() calls. The hooked
48 transaction versions of tdb1_read() and tdb1_write() check this
49 linked list and try to use the elements of the list in preference
52 - don't allow any locks to be held when a transaction starts,
53 otherwise we can end up with deadlock (plus lack of lock nesting
54 in posix locks would mean the lock is lost)
56 - if the caller gains a lock during the transaction but doesn't
57 release it then fail the commit
59 - allow for nested calls to tdb1_transaction_start(), re-using the
60 existing transaction record. If the inner transaction is cancelled
61 then a subsequent commit will fail
63 - keep a mirrored copy of the tdb hash chain heads to allow for the
64 fast hash heads scan on traverse, updating the mirrored copy in
65 the transaction version of tdb1_write
67 - allow callers to mix transaction and non-transaction use of tdb,
68 although once a transaction is started then an exclusive lock is
69 gained until the transaction is committed or cancelled
71 - the commit strategy involves first saving away all modified data
72 into a linearised buffer in the transaction recovery area, then
73 marking the transaction recovery area with a magic value to
74 indicate a valid recovery record. In total 4 fsync/msync calls are
75 needed per commit to prevent race conditions. It might be possible
76 to reduce this to 3 or even 2 with some more work.
78 - check for a valid recovery record on open of the tdb, while the
79 open lock is held. Automatically recover from the transaction
80 recovery area if needed, then continue with the open as
81 usual. This allows for smooth crash recovery with no administrator
84 - if TDB1_NOSYNC is passed to flags in tdb1_open then transactions are
85 still available, but no transaction recovery area is used and no
86 fsync/msync calls are made.
88 - if TDB1_ALLOW_NESTING is passed to flags in tdb open, or added using
89 tdb1_add_flags() transaction nesting is enabled.
90 It resets the TDB1_DISALLOW_NESTING flag, as both cannot be used together.
91 The default is that transaction nesting is allowed.
92 Note: this default may change in future versions of tdb.
94 Beware: when transactions are nested, a transaction successfully
95 completed with tdb1_transaction_commit() can be silently unrolled later.
97 - if TDB1_DISALLOW_NESTING is passed to flags in tdb open, or added using
98 tdb1_add_flags() transaction nesting is disabled.
99 It resets the TDB1_ALLOW_NESTING flag, as both cannot be used together.
100 An attempt to create a nested transaction will fail with TDB_ERR_EINVAL.
101 The default is that transaction nesting is allowed.
102 Note: this default may change in future versions of tdb.
107 hold the context of any current transaction
/*
 * State of the single in-flight transaction, reached through
 * tdb->transaction.
 *
 * NOTE(review): the functions in this file also use the members
 * `blocks`, `num_blocks`, `nesting`, `prepared` and `expanded`; their
 * declarations (and the closing brace) are not visible in this extract.
 */
109 struct tdb1_transaction {
110 /* we keep a mirrored copy of the tdb hash heads here so
111 tdb1_next_hash_chain() can operate efficiently */
112 uint32_t *hash_heads;
114 /* the original io methods - used to do IOs to the real db */
115 const struct tdb1_methods *io_methods;
117 /* the list of transaction blocks. When a block is first
118 written to, it gets created in this list */
121 uint32_t block_size; /* bytes in each block */
122 uint32_t last_block_size; /* number of valid bytes in the last block */
124 /* non-zero when an internal transaction error has
125 occurred. All write operations will then fail until the
126 transaction is ended */
127 int transaction_error;
129 /* when inside a transaction we need to keep track of any
130 nested tdb1_transaction_start() calls, as these are allowed,
131 but don't create a new transaction */
134 /* set when a prepare has already occurred */
/* file offset of the recovery record's magic field; filled in by
   transaction1_setup_recovery() during prepare and, when non-zero,
   overwritten with TDB1_RECOVERY_INVALID_MAGIC on cancel */
136 tdb1_off_t magic_offset;
138 /* old file size before transaction */
139 tdb1_len_t old_map_size;
141 /* did we expand in this transaction */
147 read while in a transaction. We need to check first if the data is in our list
148 of transaction elements, then if not do a real read
/*
 * Read hook used while a transaction is active.
 *
 * A request that crosses a block boundary is split: the leading part
 * up to the boundary is handled by a recursive call, one block at a
 * time.  For the remaining single-block request, if the block has not
 * been written in this transaction the read is forwarded to the
 * original io methods; otherwise the bytes are copied out of the
 * in-memory block.  For the last block, a len larger than
 * last_block_size is rejected.
 *
 * The failure path logs TDB_ERR_IO and sets transaction_error.  The
 * recursive caller treats a non-zero result as failure; the return
 * statements themselves are not visible in this extract.
 */
150 static int transaction1_read(struct tdb1_context *tdb, tdb1_off_t off, void *buf,
151 tdb1_len_t len, int cv)
155 /* break it down into block sized ops */
156 while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
157 tdb1_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
158 if (transaction1_read(tdb, off, buf, len2, cv) != 0) {
163 buf = (void *)(len2 + (char *)buf);
/* index of the transaction block covering off */
170 blk = off / tdb->transaction->block_size;
172 /* see if we have it in the block list */
173 if (tdb->transaction->num_blocks <= blk ||
174 tdb->transaction->blocks[blk] == NULL) {
175 /* nope, do a real read */
176 if (tdb->transaction->io_methods->tdb1_read(tdb, off, buf, len, cv) != 0) {
182 /* it is in the block list. Now check for the last block */
183 if (blk == tdb->transaction->num_blocks-1) {
184 if (len > tdb->transaction->last_block_size) {
189 /* now copy it out of this block */
190 memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
/* NOTE(review): presumably only done when cv is set; the guarding
   line is not visible in this extract */
192 tdb1_convert(buf, len);
/* failure path: log and poison the transaction */
197 tdb->last_error = tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
198 "transaction_read: failed at off=%d len=%d",
200 tdb->transaction->transaction_error = 1;
206 write while in a transaction
/*
 * Write hook used while a transaction is active.  Nothing is written
 * to the file here; the data is stored in per-block memory buffers.
 *
 * Visible steps:
 *  - refuse the write once the transaction has been prepared
 *    (TDB_ERR_EINVAL, transaction_error set);
 *  - if the write is exactly one tdb1_off_t inside the hash-head
 *    table, mirror it into transaction->hash_heads;
 *  - split a request that crosses a block boundary via recursive calls;
 *  - grow the block pointer array so that index blk exists (new slots
 *    are zeroed and last_block_size is reset to 0);
 *  - allocate the block on first touch and, where it overlaps the
 *    pre-transaction file (old_map_size), pre-fill it through the
 *    original tdb1_read;
 *  - store the new bytes and extend last_block_size if this is the
 *    last block.
 *
 * The failure path logs tdb->last_error and sets transaction_error.
 * Return statements are not visible in this extract; the recursive
 * caller treats non-zero as failure.
 */
208 static int transaction1_write(struct tdb1_context *tdb, tdb1_off_t off,
209 const void *buf, tdb1_len_t len)
213 /* Only a commit is allowed on a prepared transaction */
214 if (tdb->transaction->prepared) {
215 tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
216 "transaction_write: transaction already"
217 " prepared, write not allowed");
218 tdb->transaction->transaction_error = 1;
222 /* if the write is to a hash head, then update the transaction
224 if (len == sizeof(tdb1_off_t) && off >= TDB1_FREELIST_TOP &&
225 off < TDB1_FREELIST_TOP+TDB1_HASHTABLE_SIZE(tdb)) {
226 uint32_t chain = (off-TDB1_FREELIST_TOP) / sizeof(tdb1_off_t);
227 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
230 /* break it up into block sized chunks */
231 while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
232 tdb1_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
233 if (transaction1_write(tdb, off, buf, len2) != 0) {
239 buf = (const void *)(len2 + (const char *)buf);
/* from here on, off is the offset inside block blk */
247 blk = off / tdb->transaction->block_size;
248 off = off % tdb->transaction->block_size;
250 if (tdb->transaction->num_blocks <= blk) {
251 uint8_t **new_blocks;
252 /* expand the blocks array */
253 if (tdb->transaction->blocks == NULL) {
254 new_blocks = (uint8_t **)malloc(
255 (blk+1)*sizeof(uint8_t *));
257 new_blocks = (uint8_t **)realloc(
258 tdb->transaction->blocks,
259 (blk+1)*sizeof(uint8_t *));
261 if (new_blocks == NULL) {
262 tdb->last_error = TDB_ERR_OOM;
/* zero every newly added slot, from the old num_blocks up to and
   including blk */
265 memset(&new_blocks[tdb->transaction->num_blocks], 0,
266 (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
267 tdb->transaction->blocks = new_blocks;
268 tdb->transaction->num_blocks = blk+1;
269 tdb->transaction->last_block_size = 0;
272 /* allocate and fill a block? */
273 if (tdb->transaction->blocks[blk] == NULL) {
274 tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
275 if (tdb->transaction->blocks[blk] == NULL) {
276 tdb->last_error = TDB_ERR_OOM;
277 tdb->transaction->transaction_error = 1;
/* the block starts inside the pre-transaction file: load the old
   contents, clamped to old_map_size, so untouched bytes are kept */
280 if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
281 tdb1_len_t len2 = tdb->transaction->block_size;
282 if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
283 len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
285 if (tdb->transaction->io_methods->tdb1_read(tdb, blk * tdb->transaction->block_size,
286 tdb->transaction->blocks[blk],
288 SAFE_FREE(tdb->transaction->blocks[blk]);
289 tdb->last_error = TDB_ERR_IO;
292 if (blk == tdb->transaction->num_blocks-1) {
293 tdb->transaction->last_block_size = len2;
298 /* overwrite part of an existing block */
/* NOTE(review): the memset and memcpy below look like the two arms of
   a buf == NULL test (transaction1_expand_file passes NULL); the
   selecting lines are not visible in this extract */
300 memset(tdb->transaction->blocks[blk] + off, 0, len);
302 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
304 if (blk == tdb->transaction->num_blocks-1) {
305 if (len + off > tdb->transaction->last_block_size) {
306 tdb->transaction->last_block_size = len + off;
/* failure path: log and poison the transaction */
313 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
314 "transaction_write: failed at off=%d len=%d",
315 (blk*tdb->transaction->block_size) + off, len);
316 tdb->transaction->transaction_error = 1;
322 write while in a transaction - this variant never expands the transaction blocks, it only
323 updates existing blocks. This means it cannot change the recovery size
/*
 * Update the in-memory transaction blocks with data that has also been
 * written directly to the file (see the callers in the recovery code),
 * without ever allocating a block or growing the block array.
 *
 * A request that crosses a block boundary is split recursively.  If the
 * target block is not present there is nothing to update.  For the last
 * block the copy is clamped so it never extends past last_block_size.
 *
 * NOTE(review): the bodies of the "block not present" and
 * "off >= last_block_size" branches are not visible in this extract;
 * presumably both return success without copying.
 */
325 static int transaction1_write_existing(struct tdb1_context *tdb, tdb1_off_t off,
326 const void *buf, tdb1_len_t len)
330 /* break it up into block sized chunks */
331 while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
332 tdb1_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
333 if (transaction1_write_existing(tdb, off, buf, len2) != 0) {
339 buf = (const void *)(len2 + (const char *)buf);
/* from here on, off is the offset inside block blk */
347 blk = off / tdb->transaction->block_size;
348 off = off % tdb->transaction->block_size;
350 if (tdb->transaction->num_blocks <= blk ||
351 tdb->transaction->blocks[blk] == NULL) {
/* last block: only the first last_block_size bytes are valid */
355 if (blk == tdb->transaction->num_blocks-1 &&
356 off + len > tdb->transaction->last_block_size) {
357 if (off >= tdb->transaction->last_block_size) {
360 len = tdb->transaction->last_block_size - off;
363 /* overwrite part of an existing block */
364 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
371 accelerated hash chain head search, using the cached hash heads
/*
 * Transaction version of the next-hash-chain method: scans the cached
 * hash_heads array for a non-zero chain head instead of reading the
 * file.
 *
 * NOTE(review): the initialisation of h and the update of *chain are
 * not visible in this extract; presumably h starts at *chain and the
 * result is stored back through chain.
 */
373 static void transaction1_next_hash_chain(struct tdb1_context *tdb, uint32_t *chain)
376 for (;h < tdb->header.hash_size;h++) {
377 /* the +1 takes account of the freelist */
378 if (0 != tdb->transaction->hash_heads[h+1]) {
386 out of bounds check during a transaction
/*
 * Transaction version of the out-of-bounds method: a length within
 * tdb->map_size is in bounds; otherwise last_error is set to
 * TDB_ERR_IO.  The `probe` argument is not used on any visible line.
 * Return statements are not visible in this extract.
 */
388 static int transaction1_oob(struct tdb1_context *tdb, tdb1_off_t len, int probe)
390 if (len <= tdb->map_size) {
393 tdb->last_error = TDB_ERR_IO;
398 transaction version of tdb1_expand().
/*
 * Transaction version of the expand-file method.  The file is not
 * touched; the expansion is recorded as a transaction write with a
 * NULL buffer starting at `size`, and the transaction is flagged as
 * expanded.
 *
 * NOTE(review): the rest of the parameter list (which declares
 * `addition`) and the return statements are not visible in this
 * extract.
 */
400 static int transaction1_expand_file(struct tdb1_context *tdb, tdb1_off_t size,
403 /* add a write to the transaction elements, so subsequent
404 reads see the zero data */
405 if (transaction1_write(tdb, size, NULL, addition) != 0) {
409 tdb->transaction->expanded = true;
/*
 * IO method table installed into tdb->methods by
 * _tdb1_transaction_start() for the duration of a transaction.
 *
 * NOTE(review): only two initialisers are visible in this extract; the
 * remaining entries presumably are the other transaction1_* hooks
 * defined above.
 */
414 static const struct tdb1_methods transaction1_methods = {
417 transaction1_next_hash_chain,
419 transaction1_expand_file,
424 start a tdb transaction. No token is returned, as only a single
425 transaction is allowed to be pending per tdb1_context
/*
 * Begin a transaction on tdb.
 *
 * Visible sequence:
 *  1. reject read-only or TDB1_INTERNAL databases and calls made
 *     while tdb->traverse_read is set;
 *  2. if a transaction already exists, either fail with TDB_ERR_EINVAL
 *     (TDB1_ALLOW_NESTING not set) or just bump the nesting counter;
 *  3. refuse to start while other locks are held or inside a traverse;
 *  4. allocate the zeroed transaction state, with block_size set to
 *     the tdb page size;
 *  5. take the transaction write lock, then a read lock over all
 *     records;
 *  6. snapshot the hash heads (freelist head plus hash_size chains)
 *     into hash_heads;
 *  7. refresh map_size via tdb1_oob, record it as old_map_size, and
 *     swap tdb->methods for transaction1_methods.
 *
 * Failures after the locks are taken unwind through the cleanup code
 * at the end.  Labels and return statements are not visible in this
 * extract.
 */
427 static int _tdb1_transaction_start(struct tdb1_context *tdb)
429 /* some sanity checks */
430 if (tdb->read_only || (tdb->flags & TDB1_INTERNAL) || tdb->traverse_read) {
431 tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
432 "tdb1_transaction_start: cannot start a"
433 " transaction on a read-only or"
438 /* cope with nested tdb1_transaction_start() calls */
439 if (tdb->transaction != NULL) {
440 if (!(tdb->flags & TDB1_ALLOW_NESTING)) {
441 tdb->last_error = TDB_ERR_EINVAL;
444 tdb->transaction->nesting++;
448 if (tdb1_have_extra_locks(tdb)) {
449 /* the caller must not have any locks when starting a
450 transaction as otherwise we'll be screwed by lack
451 of nested locks in posix */
452 tdb->last_error = tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
453 "tdb1_transaction_start: cannot start a"
454 " transaction with locks held");
458 if (tdb->travlocks.next != NULL) {
459 /* you cannot use transactions inside a traverse (although you can use
460 traverse inside a transaction) as otherwise you can end up with
462 tdb->last_error = tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
463 "tdb1_transaction_start: cannot start a"
464 " transaction within a traverse");
468 tdb->transaction = (struct tdb1_transaction *)
469 calloc(sizeof(struct tdb1_transaction), 1);
470 if (tdb->transaction == NULL) {
471 tdb->last_error = TDB_ERR_OOM;
475 /* a page at a time seems like a reasonable compromise between compactness and efficiency */
476 tdb->transaction->block_size = tdb->page_size;
478 /* get the transaction write lock. This is a blocking lock. As
479 discussed with Volker, there are a number of ways we could
480 make this async, which we will probably do in the future */
481 if (tdb1_transaction_lock(tdb, F_WRLCK, TDB1_LOCK_WAIT) == -1) {
482 SAFE_FREE(tdb->transaction->blocks);
483 SAFE_FREE(tdb->transaction);
487 /* get a read lock from the freelist to the end of file. This
488 is upgraded to a write lock during the commit */
489 if (tdb1_allrecord_lock(tdb, F_RDLCK, TDB1_LOCK_WAIT, true) == -1) {
490 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
491 "tdb1_transaction_start: failed to get hash locks");
492 goto fail_allrecord_lock;
495 /* setup a copy of the hash table heads so the hash scan in
496 traverse can be fast */
497 tdb->transaction->hash_heads = (uint32_t *)
498 calloc(tdb->header.hash_size+1, sizeof(uint32_t));
499 if (tdb->transaction->hash_heads == NULL) {
500 tdb->last_error = TDB_ERR_OOM;
/* read through tdb->methods: the transaction hooks are not installed
   yet, so this is a real read of the on-disk table */
503 if (tdb->methods->tdb1_read(tdb, TDB1_FREELIST_TOP, tdb->transaction->hash_heads,
504 TDB1_HASHTABLE_SIZE(tdb), 0) != 0) {
505 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
506 "tdb1_transaction_start: failed to read hash heads");
510 /* make sure we know about any file expansions already done by
512 tdb->methods->tdb1_oob(tdb, tdb->map_size + 1, 1);
513 tdb->transaction->old_map_size = tdb->map_size;
515 /* finally hook the io methods, replacing them with
516 transaction specific methods */
517 tdb->transaction->io_methods = tdb->methods;
518 tdb->methods = &transaction1_methods;
/* error unwinding: drop the locks in reverse order of acquisition and
   free the transaction state (the goto labels separating these steps
   are not visible in this extract) */
523 tdb1_allrecord_unlock(tdb, F_RDLCK);
525 tdb1_transaction_unlock(tdb, F_WRLCK);
526 SAFE_FREE(tdb->transaction->blocks);
527 SAFE_FREE(tdb->transaction->hash_heads);
528 SAFE_FREE(tdb->transaction);
/*
 * Public entry point for starting a transaction; forwards directly to
 * _tdb1_transaction_start() and returns its result.
 */
532 int tdb1_transaction_start(struct tdb1_context *tdb)
534 return _tdb1_transaction_start(tdb);
/*
 * Flush transaction data to stable storage.
 *
 * The function checks TDB1_NOSYNC first (the body of that branch is
 * not visible; presumably an early successful return).  Otherwise it
 * syncs the file descriptor and msync()s the mapped range, with the
 * start rounded down to a page boundary and the length grown by the
 * same amount.  Either failure is logged as TDB_ERR_IO.
 *
 * NOTE(review): the fdatasync() and fsync() lines look like
 * preprocessor-selected alternatives, and the msync() is presumably
 * done only when tdb->map_ptr is set; the selecting lines are not
 * visible in this extract.
 */
540 static int transaction1_sync(struct tdb1_context *tdb, tdb1_off_t offset, tdb1_len_t length)
542 if (tdb->flags & TDB1_NOSYNC) {
547 if (fdatasync(tdb->fd) != 0) {
549 if (fsync(tdb->fd) != 0) {
551 tdb->last_error = tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
552 "tdb1_transaction: fsync failed");
/* the mask arithmetic assumes page_size is a power of two */
557 tdb1_off_t moffset = offset & ~(tdb->page_size-1);
558 if (msync(moffset + (char *)tdb->map_ptr,
559 length + (offset - moffset), MS_SYNC) != 0) {
560 tdb->last_error = tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
562 " msync failed - %s",
/*
 * Abandon the current transaction and release everything it holds.
 * Also used by the commit path as the final cleanup step.
 *
 * With no transaction active, TDB_ERR_EINVAL is logged.  Inside a
 * nested transaction the outer transaction is only marked as failed
 * (transaction_error) and the nesting count is decremented.  Otherwise:
 * map_size is restored to old_map_size, all block buffers are freed,
 * a recovery marker written during prepare (magic_offset non-zero) is
 * overwritten with TDB1_RECOVERY_INVALID_MAGIC and synced, the
 * transaction locks are released, the original io methods are
 * reinstated and the transaction state is freed.
 *
 * Return statements are not visible in this extract.
 */
572 static int _tdb1_transaction_cancel(struct tdb1_context *tdb)
576 if (tdb->transaction == NULL) {
577 tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
578 "tdb1_transaction_cancel:"
/* cancelling an inner transaction poisons the outer one */
583 if (tdb->transaction->nesting != 0) {
584 tdb->transaction->transaction_error = 1;
585 tdb->transaction->nesting--;
589 tdb->map_size = tdb->transaction->old_map_size;
591 /* free all the transaction blocks */
592 for (i=0;i<tdb->transaction->num_blocks;i++) {
593 if (tdb->transaction->blocks[i] != NULL) {
594 free(tdb->transaction->blocks[i]);
597 SAFE_FREE(tdb->transaction->blocks);
599 if (tdb->transaction->magic_offset) {
600 const struct tdb1_methods *methods = tdb->transaction->io_methods;
601 const uint32_t invalid = TDB1_RECOVERY_INVALID_MAGIC;
603 /* remove the recovery marker */
604 if (methods->tdb1_write(tdb, tdb->transaction->magic_offset, &invalid, 4) == -1 ||
605 transaction1_sync(tdb, tdb->transaction->magic_offset, 4) == -1) {
606 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
607 "tdb1_transaction_cancel: failed to"
608 " remove recovery magic");
613 /* This also removes the OPEN_LOCK, if we have it. */
614 tdb1_release_transaction_locks(tdb);
616 /* restore the normal io methods */
617 tdb->methods = tdb->transaction->io_methods;
619 SAFE_FREE(tdb->transaction->hash_heads);
620 SAFE_FREE(tdb->transaction);
626 cancel the current transaction
/*
 * Public entry point for cancelling a transaction; forwards directly
 * to _tdb1_transaction_cancel() and returns its result.
 */
628 int tdb1_transaction_cancel(struct tdb1_context *tdb)
630 return _tdb1_transaction_cancel(tdb);
634 work out how much space the linearised recovery data will consume
/*
 * Compute the number of bytes of recovery data the current transaction
 * needs.
 *
 * The total starts at sizeof(uint32_t) (presumably for the 4-byte
 * tailer that transaction1_setup_recovery() appends).  Each populated
 * block adds two tdb1_off_t header words plus the block's length:
 * last_block_size for the final block, block_size otherwise.
 *
 * NOTE(review): the bodies of the "block starts at or beyond
 * old_map_size" and "block is NULL" tests are not visible in this
 * extract; presumably such blocks are not counted.
 */
636 static tdb1_len_t tdb1_recovery_size(struct tdb1_context *tdb)
638 tdb1_len_t recovery_size = 0;
641 recovery_size = sizeof(uint32_t);
642 for (i=0;i<tdb->transaction->num_blocks;i++) {
643 if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
646 if (tdb->transaction->blocks[i] == NULL) {
/* per-block header: two tdb1_off_t words */
649 recovery_size += 2*sizeof(tdb1_off_t);
650 if (i == tdb->transaction->num_blocks-1) {
651 recovery_size += tdb->transaction->last_block_size;
653 recovery_size += tdb->transaction->block_size;
657 return recovery_size;
/*
 * Locate the recovery area.
 *
 * Reads the TDB1_RECOVERY_HEAD pointer into *recovery_offset.  If it
 * is non-zero, the record header at that offset is read into *rec
 * through the supplied methods.  A header whose magic is neither
 * TDB1_RECOVERY_MAGIC nor TDB1_RECOVERY_INVALID_MAGIC is treated as
 * "no recovery area": *recovery_offset is reset to 0.
 *
 * The caller in this file treats a return of -1 as failure; the return
 * statements themselves are not visible in this extract.
 */
660 int tdb1_recovery_area(struct tdb1_context *tdb,
661 const struct tdb1_methods *methods,
662 tdb1_off_t *recovery_offset,
663 struct tdb1_record *rec)
665 if (tdb1_ofs_read(tdb, TDB1_RECOVERY_HEAD, recovery_offset) == -1) {
669 if (*recovery_offset == 0) {
674 if (methods->tdb1_read(tdb, *recovery_offset, rec, sizeof(*rec),
675 TDB1_DOCONV()) == -1) {
679 /* ignore invalid recovery regions: can happen in crash */
680 if (rec->magic != TDB1_RECOVERY_MAGIC &&
681 rec->magic != TDB1_RECOVERY_INVALID_MAGIC) {
682 *recovery_offset = 0;
689 allocate the recovery area, or use an existing recovery area if it is
/*
 * Make sure a recovery area large enough for this transaction exists.
 *
 * Outputs:
 *   *recovery_size     - bytes of recovery data needed;
 *   *recovery_offset   - file offset of the recovery record;
 *   *recovery_max_size - capacity of the recovery area, excluding the
 *                        record header.
 *
 * If an existing area is big enough it is reused.  Otherwise any old
 * area is released with tdb1_free() and a new one is created at the
 * current end of the file via the original tdb1_expand_file.  The
 * transaction's old_map_size is then advanced to the new map_size, and
 * the new head offset is written both to the file and to the
 * in-memory transaction blocks.
 *
 * Return statements are not visible in this extract; callers compare
 * the result against -1.
 */
692 static int tdb1_recovery_allocate(struct tdb1_context *tdb,
693 tdb1_len_t *recovery_size,
694 tdb1_off_t *recovery_offset,
695 tdb1_len_t *recovery_max_size)
697 struct tdb1_record rec;
698 const struct tdb1_methods *methods = tdb->transaction->io_methods;
699 tdb1_off_t recovery_head;
701 if (tdb1_recovery_area(tdb, methods, &recovery_head, &rec) == -1) {
702 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
703 "tdb1_recovery_allocate:"
704 " failed to read recovery head");
708 *recovery_size = tdb1_recovery_size(tdb);
710 if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
711 /* it fits in the existing area */
712 *recovery_max_size = rec.rec_len;
713 *recovery_offset = recovery_head;
717 /* we need to free up the old recovery area, then allocate a
718 new one at the end of the file. Note that we cannot use
719 tdb1_allocate() to allocate the new one as that might return
720 us an area that is being currently used (as of the start of
722 if (recovery_head != 0) {
723 if (tdb1_free(tdb, recovery_head, &rec) == -1) {
724 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
725 "tdb1_recovery_allocate: failed to free"
726 " previous recovery area");
731 /* the tdb1_free() call might have increased the recovery size */
732 *recovery_size = tdb1_recovery_size(tdb);
734 /* round up to a multiple of page size */
735 *recovery_max_size = TDB1_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
736 *recovery_offset = tdb->map_size;
737 recovery_head = *recovery_offset;
/* grow the real file from its pre-transaction size: the amount covers
   expansion already done inside the transaction plus the new recovery
   record */
739 if (methods->tdb1_expand_file(tdb, tdb->transaction->old_map_size,
740 (tdb->map_size - tdb->transaction->old_map_size) +
741 sizeof(rec) + *recovery_max_size) == -1) {
742 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
743 "tdb1_recovery_allocate:"
744 " failed to create recovery area");
748 /* remap the file (if using mmap) */
749 methods->tdb1_oob(tdb, tdb->map_size + 1, 1);
751 /* we have to reset the old map size so that we don't try to expand the file
752 again in the transaction commit, which would destroy the recovery area */
753 tdb->transaction->old_map_size = tdb->map_size;
755 /* write the recovery header offset and sync - we can sync without a race here
756 as the magic ptr in the recovery record has not been set */
757 TDB1_CONV(recovery_head);
758 if (methods->tdb1_write(tdb, TDB1_RECOVERY_HEAD,
759 &recovery_head, sizeof(tdb1_off_t)) == -1) {
760 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
761 "tdb1_recovery_allocate:"
762 " failed to write recovery head");
/* mirror the same value into any transaction block that covers the
   header, so the commit does not write back a stale copy */
765 if (transaction1_write_existing(tdb, TDB1_RECOVERY_HEAD, &recovery_head, sizeof(tdb1_off_t)) == -1) {
766 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
767 "tdb1_recovery_allocate:"
768 " failed to write recovery head");
777 setup the recovery data that will be used on a crash during commit
/*
 * Write the recovery record that lets a crashed commit be rolled back.
 *
 * Visible sequence:
 *  1. obtain or allocate the recovery area (tdb1_recovery_allocate);
 *  2. build one buffer holding a tdb1_record header (magic set to
 *     TDB1_RECOVERY_INVALID_MAGIC, data_len = recovery_size,
 *     rec_len = recovery_max_size, key_len = the pre-transaction file
 *     size), then for every populated block below the old file size a
 *     4-byte offset, a 4-byte length and the ORIGINAL file contents
 *     read through the real io methods, followed by a 4-byte tailer
 *     of sizeof(*rec) + recovery_max_size;
 *  3. write that buffer to the file and mirror it into the
 *     transaction blocks, then sync;
 *  4. only then write TDB1_RECOVERY_MAGIC into the record's magic
 *     field (offset returned through *magic_offset), mirror it, and
 *     sync again.
 *
 * NOTE(review): advancing of p, byte-order conversion, freeing of
 * `data` and the return statements are not visible in this extract.
 */
779 static int transaction1_setup_recovery(struct tdb1_context *tdb,
780 tdb1_off_t *magic_offset)
782 tdb1_len_t recovery_size;
783 unsigned char *data, *p;
784 const struct tdb1_methods *methods = tdb->transaction->io_methods;
785 struct tdb1_record *rec;
786 tdb1_off_t recovery_offset, recovery_max_size;
/* captured before tdb1_recovery_allocate(), which may advance
   tdb->transaction->old_map_size */
787 tdb1_off_t old_map_size = tdb->transaction->old_map_size;
788 uint32_t magic, tailer;
792 check that the recovery area has enough space
794 if (tdb1_recovery_allocate(tdb, &recovery_size,
795 &recovery_offset, &recovery_max_size) == -1) {
799 data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
801 tdb->last_error = TDB_ERR_OOM;
805 rec = (struct tdb1_record *)data;
806 memset(rec, 0, sizeof(*rec));
/* the record starts out marked invalid; the valid magic is written
   only after the data has been synced */
808 rec->magic = TDB1_RECOVERY_INVALID_MAGIC;
809 rec->data_len = recovery_size;
810 rec->rec_len = recovery_max_size;
811 rec->key_len = old_map_size;
814 /* build the recovery data into a single blob to allow us to do a single
815 large write, which should be more efficient */
816 p = data + sizeof(*rec);
817 for (i=0;i<tdb->transaction->num_blocks;i++) {
821 if (tdb->transaction->blocks[i] == NULL) {
825 offset = i * tdb->transaction->block_size;
826 length = tdb->transaction->block_size;
827 if (i == tdb->transaction->num_blocks-1) {
828 length = tdb->transaction->last_block_size;
/* this test uses the local snapshot of old_map_size, the next one the
   (possibly updated) value stored in the transaction */
831 if (offset >= old_map_size) {
834 if (offset + length > tdb->transaction->old_map_size) {
835 tdb->last_error = tdb_logerr(tdb, TDB_ERR_CORRUPT,
837 "tdb1_transaction_setup_recovery: transaction data over new region boundary");
841 memcpy(p, &offset, 4);
842 memcpy(p+4, &length, 4);
846 /* the recovery area contains the old data, not the
847 new data, so we have to call the original tdb1_read
849 if (methods->tdb1_read(tdb, offset, p + 8, length, 0) != 0) {
851 tdb->last_error = TDB_ERR_IO;
858 tailer = sizeof(*rec) + recovery_max_size;
859 memcpy(p, &tailer, 4);
864 /* write the recovery data to the recovery area */
865 if (methods->tdb1_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
866 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
867 "tdb1_transaction_setup_recovery:"
868 " failed to write recovery data");
872 if (transaction1_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
873 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
874 "tdb1_transaction_setup_recovery: failed to write"
875 " secondary recovery data");
880 /* as we don't have ordered writes, we have to sync the recovery
881 data before we update the magic to indicate that the recovery
883 if (transaction1_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
890 magic = TDB1_RECOVERY_MAGIC;
893 *magic_offset = recovery_offset + offsetof(struct tdb1_record, magic);
895 if (methods->tdb1_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
896 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
897 "tdb1_transaction_setup_recovery:"
898 " failed to write recovery magic");
901 if (transaction1_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
902 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
903 "tdb1_transaction_setup_recovery:"
904 " failed to write secondary recovery magic");
908 /* ensure the recovery magic marker is on disk */
909 if (transaction1_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
/*
 * First phase of a commit: do everything that can fail before the
 * database contents are overwritten.
 *
 * Visible sequence:
 *  - fail if no transaction is active;
 *  - fail and cancel if the transaction is already prepared or has a
 *    pending transaction_error;
 *  - handle nested transactions and "null" transactions (no blocks
 *    written) specially; the bodies of those branches are not visible
 *    in this extract;
 *  - fail and cancel if the caller still holds extra locks;
 *  - upgrade the all-record lock to a write lock and take the open
 *    lock;
 *  - unless TDB1_NOSYNC is set, write the recovery record and remember
 *    its magic offset in the transaction;
 *  - mark the transaction prepared;
 *  - if the transaction grew the database, expand the real file and
 *    refresh the mapping.
 *
 * Every visible failure after the initial NULL check calls
 * _tdb1_transaction_cancel().  Return statements are not visible in
 * this extract.
 */
916 static int _tdb1_transaction_prepare_commit(struct tdb1_context *tdb)
918 const struct tdb1_methods *methods;
920 if (tdb->transaction == NULL) {
921 tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
922 "tdb1_transaction_prepare_commit:"
927 if (tdb->transaction->prepared) {
928 tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
929 "tdb1_transaction_prepare_commit:"
930 " transaction already prepared");
931 _tdb1_transaction_cancel(tdb);
935 if (tdb->transaction->transaction_error) {
936 tdb->last_error = tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
937 "tdb1_transaction_prepare_commit:"
938 " transaction error pending");
939 _tdb1_transaction_cancel(tdb);
/* NOTE(review): body not visible; presumably an inner (nested)
   prepare is a successful no-op */
944 if (tdb->transaction->nesting != 0) {
948 /* check for a null transaction */
949 if (tdb->transaction->blocks == NULL) {
953 methods = tdb->transaction->io_methods;
955 /* if there are any locks pending then the caller has not
956 nested their locks properly, so fail the transaction */
957 if (tdb1_have_extra_locks(tdb)) {
958 tdb->last_error = tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
959 "tdb1_transaction_prepare_commit:"
960 " locks pending on commit");
961 _tdb1_transaction_cancel(tdb);
965 /* upgrade the main transaction lock region to a write lock */
966 if (tdb1_allrecord_upgrade(tdb) == -1) {
967 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
968 "tdb1_transaction_prepare_commit:"
969 " failed to upgrade hash locks");
970 _tdb1_transaction_cancel(tdb);
974 /* get the open lock - this prevents new users attaching to the database
976 if (tdb1_nest_lock(tdb, TDB1_OPEN_LOCK, F_WRLCK, TDB1_LOCK_WAIT) == -1) {
977 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
978 "tdb1_transaction_prepare_commit:"
979 " failed to get open lock");
980 _tdb1_transaction_cancel(tdb);
984 if (!(tdb->flags & TDB1_NOSYNC)) {
985 /* write the recovery data to the end of the file */
986 if (transaction1_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
987 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
988 "tdb1_transaction_prepare_commit:"
989 " failed to setup recovery data");
990 _tdb1_transaction_cancel(tdb);
/* from here on transaction1_write() refuses further writes */
995 tdb->transaction->prepared = true;
997 /* expand the file to the new size if needed */
998 if (tdb->map_size != tdb->transaction->old_map_size) {
999 if (methods->tdb1_expand_file(tdb, tdb->transaction->old_map_size,
1001 tdb->transaction->old_map_size) == -1) {
1002 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1003 "tdb1_transaction_prepare_commit:"
1004 " expansion failed");
1005 _tdb1_transaction_cancel(tdb);
1008 tdb->map_size = tdb->transaction->old_map_size;
1009 methods->tdb1_oob(tdb, tdb->map_size + 1, 1);
1012 /* Keep the open lock until the actual commit */
1018 prepare to commit the current transaction
/*
 * Public entry point for the prepare phase; forwards directly to
 * _tdb1_transaction_prepare_commit() and returns its result.
 */
1020 int tdb1_transaction_prepare_commit(struct tdb1_context *tdb)
1022 return _tdb1_transaction_prepare_commit(tdb);
1025 /* A repack is worthwhile if the largest is less than half total free. */
/*
 * Walks the free list starting at TDB1_FREELIST_TOP, summing rec_len
 * of every free record and tracking the largest one.  Returns true
 * when the total is more than twice the largest record.
 *
 * NOTE(review): the declaration of ptr, the step that advances it to
 * the next free record, and the result when the initial read fails
 * are not visible in this extract.
 */
1026 static bool repack_worthwhile(struct tdb1_context *tdb)
1029 struct tdb1_record rec;
1030 tdb1_len_t total = 0, largest = 0;
1032 if (tdb1_ofs_read(tdb, TDB1_FREELIST_TOP, &ptr) == -1) {
1036 while (ptr != 0 && tdb1_rec_free_read(tdb, ptr, &rec) == 0) {
1037 total += rec.rec_len;
1038 if (rec.rec_len > largest) {
1039 largest = rec.rec_len;
1044 return total > largest * 2;
1048 commit the current transaction
/*
 * Commit the current transaction.
 *
 * Visible sequence:
 *  - fail if no transaction is active; fail and cancel if a
 *    transaction_error is pending;
 *  - for a nested transaction just decrement the nesting count;
 *  - a transaction with no written blocks is simply cancelled;
 *  - run the prepare phase if the caller has not already done so;
 *  - write every populated block to the file through the original io
 *    methods, freeing each block as it goes.  If a write fails, the
 *    original methods are restored, tdb1_transaction_recover() is run
 *    to roll the file back, and the transaction is cancelled;
 *  - if the transaction expanded the file, decide whether a repack is
 *    worthwhile (before the locks are dropped);
 *  - sync the whole mapping, touch the file's mtime, and use
 *    _tdb1_transaction_cancel() to release the locks and memory;
 *  - finish with tdb1_repack() (presumably only when need_repack is
 *    set; the guarding line is not visible).
 *
 * Return statements other than the final one are not visible in this
 * extract.
 */
1050 int tdb1_transaction_commit(struct tdb1_context *tdb)
1052 const struct tdb1_methods *methods;
1054 bool need_repack = false;
1056 if (tdb->transaction == NULL) {
1057 tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
1058 "tdb1_transaction_commit:"
1063 if (tdb->transaction->transaction_error) {
1064 tdb->last_error = tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
1065 "tdb1_transaction_commit:"
1066 " transaction error pending");
1067 _tdb1_transaction_cancel(tdb);
/* committing an inner transaction only unwinds one nesting level */
1072 if (tdb->transaction->nesting != 0) {
1073 tdb->transaction->nesting--;
1077 /* check for a null transaction */
1078 if (tdb->transaction->blocks == NULL) {
1079 _tdb1_transaction_cancel(tdb);
1083 if (!tdb->transaction->prepared) {
1084 int ret = _tdb1_transaction_prepare_commit(tdb);
1089 methods = tdb->transaction->io_methods;
1091 /* perform all the writes */
1092 for (i=0;i<tdb->transaction->num_blocks;i++) {
1096 if (tdb->transaction->blocks[i] == NULL) {
1100 offset = i * tdb->transaction->block_size;
1101 length = tdb->transaction->block_size;
/* only the valid part of the final block is written */
1102 if (i == tdb->transaction->num_blocks-1) {
1103 length = tdb->transaction->last_block_size;
1106 if (methods->tdb1_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
1107 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1108 "tdb1_transaction_commit:"
1109 " write failed during commit");
1111 /* we've overwritten part of the data and
1112 possibly expanded the file, so we need to
1113 run the crash recovery code */
1114 tdb->methods = methods;
1115 tdb1_transaction_recover(tdb);
1117 _tdb1_transaction_cancel(tdb);
1119 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1120 "tdb1_transaction_commit: write failed");
1123 SAFE_FREE(tdb->transaction->blocks[i]);
1126 /* Do this before we drop lock or blocks. */
1127 if (tdb->transaction->expanded) {
1128 need_repack = repack_worthwhile(tdb);
1131 SAFE_FREE(tdb->transaction->blocks);
1132 tdb->transaction->num_blocks = 0;
1134 /* ensure the new data is on disk */
1135 if (transaction1_sync(tdb, 0, tdb->map_size) == -1) {
1140 TODO: maybe write to some dummy hdr field, or write to magic
1141 offset without mmap, before the last sync, instead of the
1145 /* on some systems (like Linux 2.6.x) changes via mmap/msync
1146 don't change the mtime of the file, this means the file may
1147 not be backed up (as tdb rounding to block sizes means that
1148 file size changes are quite rare too). The following forces
1149 mtime changes when a transaction completes */
1151 utime(tdb->name, NULL);
1154 /* use a transaction cancel to free memory and remove the
1155 transaction locks */
1156 _tdb1_transaction_cancel(tdb);
1159 return tdb1_repack(tdb);
1167 recover from an aborted transaction. Must be called with exclusive
1168 database write access already established (including the open
1169 lock to prevent new processes attaching)
/*
 * Roll the database back to its pre-transaction state using the
 * recovery record, if one is present and marked valid.
 *
 * Visible sequence:
 *  - read TDB1_RECOVERY_HEAD; a zero head means no recovery area;
 *  - read the record header; only TDB1_RECOVERY_MAGIC marks valid
 *    recovery data;
 *  - refuse to recover a read-only database (TDB_ERR_CORRUPT);
 *  - load rec.data_len bytes of recovery data and replay each
 *    (offset, length, data) entry with tdb1_write;
 *  - sync, then clear TDB1_RECOVERY_HEAD if the recovery area lies at
 *    or beyond the recovered end of file (rec.key_len), and overwrite
 *    the record's magic so the recovery is not replayed;
 *  - sync again and log the recovered size.
 *
 * NOTE(review): decoding of the entry offset, advancing of p, freeing
 * of `data`, any truncation of the file and the return statements are
 * not visible in this extract.
 */
1171 int tdb1_transaction_recover(struct tdb1_context *tdb)
1173 tdb1_off_t recovery_head, recovery_eof;
1174 unsigned char *data, *p;
1176 struct tdb1_record rec;
1178 /* find the recovery area */
1179 if (tdb1_ofs_read(tdb, TDB1_RECOVERY_HEAD, &recovery_head) == -1) {
1180 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1181 "tdb1_transaction_recover:"
1182 " failed to read recovery head");
1186 if (recovery_head == 0) {
1187 /* we have never allocated a recovery record */
1191 /* read the recovery record */
1192 if (tdb->methods->tdb1_read(tdb, recovery_head, &rec,
1193 sizeof(rec), TDB1_DOCONV()) == -1) {
1194 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1195 "tdb1_transaction_recover:"
1196 " failed to read recovery record");
1200 if (rec.magic != TDB1_RECOVERY_MAGIC) {
1201 /* there is no valid recovery data */
1205 if (tdb->read_only) {
1206 tdb->last_error = tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
1207 "tdb1_transaction_recover:"
1208 " attempt to recover read only"
/* key_len of the recovery record holds the pre-transaction file size
   (stored by transaction1_setup_recovery) */
1213 recovery_eof = rec.key_len;
1215 data = (unsigned char *)malloc(rec.data_len);
1217 tdb->last_error = tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
1218 "tdb1_transaction_recover:"
1219 " failed to allocate recovery data");
1223 /* read the full recovery data */
1224 if (tdb->methods->tdb1_read(tdb, recovery_head + sizeof(rec), data,
1225 rec.data_len, 0) == -1) {
1226 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1227 "tdb1_transaction_recover:"
1228 " failed to read recovery data");
1232 /* recover the file data */
/* each entry has an 8-byte header, with the length at p+4 and the
   saved bytes at p+8 */
1234 while (p+8 < data + rec.data_len) {
1236 if (TDB1_DOCONV()) {
1240 memcpy(&len, p+4, 4);
1242 if (tdb->methods->tdb1_write(tdb, ofs, p+8, len) == -1) {
1244 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1245 "tdb1_transaction_recover: failed to recover"
1246 " %d bytes at offset %d", len, ofs);
1254 if (transaction1_sync(tdb, 0, tdb->map_size) == -1) {
1255 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1256 "tdb1_transaction_recover: failed to sync recovery");
1260 /* if the recovery area is after the recovered eof then remove it */
1261 if (recovery_eof <= recovery_head) {
1262 if (tdb1_ofs_write(tdb, TDB1_RECOVERY_HEAD, &zero) == -1) {
1263 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1264 "tdb1_transaction_recover: failed to remove"
1270 /* remove the recovery magic */
1271 if (tdb1_ofs_write(tdb, recovery_head + offsetof(struct tdb1_record, magic),
1273 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1274 "tdb1_transaction_recover: failed to remove"
1279 if (transaction1_sync(tdb, 0, recovery_eof) == -1) {
1280 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1281 "tdb1_transaction_recover:"
1282 " failed to sync2 recovery");
1286 tdb_logerr(tdb, TDB_SUCCESS, TDB_LOG_WARNING,
1287 "tdb1_transaction_recover: recovered %d byte database",
1294 /* Any I/O failures we say "needs recovery". */
/*
 * Report whether the database carries a valid recovery record: reads
 * TDB1_RECOVERY_HEAD, then the record header it points to, and returns
 * true when the header's magic is TDB1_RECOVERY_MAGIC.
 *
 * NOTE(review): the results returned for a read failure and for a
 * zero recovery head are not visible in this extract; per the comment
 * above, failures presumably return true, and a zero head presumably
 * returns false.
 */
1295 bool tdb1_needs_recovery(struct tdb1_context *tdb)
1297 tdb1_off_t recovery_head;
1298 struct tdb1_record rec;
1300 /* find the recovery area */
1301 if (tdb1_ofs_read(tdb, TDB1_RECOVERY_HEAD, &recovery_head) == -1) {
1305 if (recovery_head == 0) {
1306 /* we have never allocated a recovery record */
1310 /* read the recovery record */
1311 if (tdb->methods->tdb1_read(tdb, recovery_head, &rec,
1312 sizeof(rec), TDB1_DOCONV()) == -1) {
1316 return (rec.magic == TDB1_RECOVERY_MAGIC);