2 Unix SMB/CIFS implementation.
4 trivial database library
6 Copyright (C) Andrew Tridgell 2005
8 ** NOTE! The following LGPL license applies to the tdb
9 ** library. This does NOT imply that all of Samba is released
12 This library is free software; you can redistribute it and/or
13 modify it under the terms of the GNU Lesser General Public
14 License as published by the Free Software Foundation; either
15 version 3 of the License, or (at your option) any later version.
17 This library is distributed in the hope that it will be useful,
18 but WITHOUT ANY WARRANTY; without even the implied warranty of
19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
20 Lesser General Public License for more details.
22 You should have received a copy of the GNU Lesser General Public
23 License along with this library; if not, see <http://www.gnu.org/licenses/>.
26 #include "tdb_private.h"
31 - only allow a single transaction at a time per database. This makes
32 using the transaction API simpler, as otherwise the caller would
33 have to cope with temporary failures in transactions that conflict
34 with other current transactions
36 - keep the transaction recovery information in the same file as the
37 database, using a special 'transaction recovery' record pointed at
38 by the header. This removes the need for extra journal files as
39 used by some other databases
41 - dynamically allocate the transaction recovery record, re-using it
42 for subsequent transactions. If a larger record is needed then
43 tdb_free() the old record to place it on the normal tdb freelist
44 before allocating the new record
46 - during transactions, keep a linked list of all writes that have
47 been performed by intercepting all tdb_write() calls. The hooked
48 transaction versions of tdb_read() and tdb_write() check this
49 linked list and try to use the elements of the list in preference
52 - don't allow any locks to be held when a transaction starts,
53 otherwise we can end up with deadlock (plus lack of lock nesting
54 in posix locks would mean the lock is lost)
56 - if the caller gains a lock during the transaction but doesn't
57 release it then fail the commit
59 - allow for nested calls to tdb_transaction_start(), re-using the
60 existing transaction record. If the inner transaction is cancelled
61 then a subsequent commit will fail
63 - keep a mirrored copy of the tdb hash chain heads to allow for the
64 fast hash heads scan on traverse, updating the mirrored copy in
65 the transaction version of tdb_write
67 - allow callers to mix transaction and non-transaction use of tdb,
68 although once a transaction is started then an exclusive lock is
69 gained until the transaction is committed or cancelled
71 - the commit strategy involves first saving away all modified data
72 into a linearised buffer in the transaction recovery area, then
73 marking the transaction recovery area with a magic value to
74 indicate a valid recovery record. In total 4 fsync/msync calls are
75 needed per commit to prevent race conditions. It might be possible
76 to reduce this to 3 or even 2 with some more work.
78 - check for a valid recovery record on open of the tdb, while the
79 global lock is held. Automatically recover from the transaction
80 recovery area if needed, then continue with the open as
81 usual. This allows for smooth crash recovery with no administrator
84 - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
85 still available, but no transaction recovery area is used and no
86 fsync/msync calls are made.
88 - if TDB_NO_NESTING is passed to flags in tdb open then transaction
89 nesting is disabled. tdb_transaction_start() will then implicitly
90 cancel any pending transactions and always start a new transaction
91 context instead of nesting.
97 hold the context of any current transaction
/*
 * State kept for the transaction currently active on a tdb_context.
 * NOTE(review): the functions in this file also use ->blocks, ->num_blocks
 * and ->nesting, but their declarations are not visible in this extract
 * (the embedded numbering skips 109-110 and 122-123) - confirm against the
 * full file.
 */
99 struct tdb_transaction {
100 /* we keep a mirrored copy of the tdb hash heads here so
101 tdb_next_hash_chain() can operate efficiently */
/* allocated by tdb_transaction_start() with hash_size+1 entries and
 * filled by a read starting at FREELIST_TOP */
102 uint32_t *hash_heads;
104 /* the original io methods - used to do IOs to the real db */
105 const struct tdb_methods *io_methods;
107 /* the list of transaction blocks. When a block is first
108 written to, it gets created in this list */
/* block_size is set to tdb->page_size by tdb_transaction_start() */
111 uint32_t block_size; /* bytes in each block */
112 uint32_t last_block_size; /* number of valid bytes in the last block */
114 /* non-zero when an internal transaction error has
115 occurred. All write operations will then fail until the
116 transaction is ended */
/* tdb_transaction_commit() refuses to commit while this is set */
117 int transaction_error;
119 /* when inside a transaction we need to keep track of any
120 nested tdb_transaction_start() calls, as these are allowed,
121 but don't create a new transaction */
124 /* old file size before transaction */
/* tdb_transaction_cancel() restores tdb->map_size from this value */
125 tdb_len_t old_map_size;
130 read while in a transaction. We need to check first if the data is in our list
131 of transaction elements, then if not do a real read
/*
 * Read len bytes at file offset off into buf while a transaction is active.
 *
 * A request that crosses a block boundary is split, and each piece that
 * fits inside one block is handled by a recursive call. A piece is served
 * from the in-memory transaction block when one exists for that block
 * index, otherwise it is read through the saved (pre-transaction) io
 * methods. cv is passed unchanged to the underlying tdb_read.
 *
 * The failure path at the end logs the offset and length, sets tdb->ecode
 * to TDB_ERR_IO and marks the transaction as failed.
 *
 * NOTE(review): several short lines are missing from this extract (braces,
 * returns, the loop's advance of off/len, the failure label and whatever
 * guards the tdb_convert() call) - confirm the exact control flow against
 * the full file.
 */
133 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
134 tdb_len_t len, int cv)
138 /* break it down into block sized ops */
139 while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
/* len2 = bytes from off up to the end of the block that contains off */
140 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
141 if (transaction_read(tdb, off, buf, len2, cv) != 0) {
146 buf = (void *)(len2 + (char *)buf);
/* what remains now lies inside a single block */
153 blk = off / tdb->transaction->block_size;
155 /* see if we have it in the block list */
156 if (tdb->transaction->num_blocks <= blk ||
157 tdb->transaction->blocks[blk] == NULL) {
158 /* nope, do a real read */
159 if (tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv) != 0) {
165 /* it is in the block list. Now check for the last block */
166 if (blk == tdb->transaction->num_blocks-1) {
/* NOTE(review): only len is compared with last_block_size here; the
 * offset within the block is not part of this comparison */
167 if (len > tdb->transaction->last_block_size) {
172 /* now copy it out of this block */
173 memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
175 tdb_convert(buf, len);
180 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
181 tdb->ecode = TDB_ERR_IO;
182 tdb->transaction->transaction_error = 1;
188 write while in a transaction
/*
 * Record a write of len bytes at file offset off in the transaction's
 * in-memory blocks instead of writing to the file.
 *
 * - A write of exactly sizeof(tdb_off_t) bytes that lands inside the hash
 *   table region starting at FREELIST_TOP is also copied into the mirrored
 *   hash_heads array.
 * - A request that crosses a block boundary is split and each piece is
 *   handled by a recursive call.
 * - The blocks pointer array is grown on demand and new slots are zeroed.
 * - A block is allocated zero-filled on first touch; if it starts below
 *   old_map_size it is first filled with the existing file contents, read
 *   through the saved io methods.
 * - last_block_size is kept up to date when the last block is touched.
 *
 * The failure path at the end logs the absolute offset and length and marks
 * the transaction as failed.
 *
 * NOTE(review): several short lines are missing from this extract (braces,
 * returns, the loop's advance of off/len, the failure label and the test
 * that selects memset versus memcpy) - confirm against the full file.
 */
190 static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
191 const void *buf, tdb_len_t len)
195 /* if the write is to a hash head, then update the transaction
197 if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
198 off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
199 uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
200 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
203 /* break it up into block sized chunks */
204 while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
/* len2 = bytes from off up to the end of the block that contains off */
205 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
206 if (transaction_write(tdb, off, buf, len2) != 0) {
212 buf = (const void *)(len2 + (const char *)buf);
/* from here on blk is the block index and off is the offset inside it */
220 blk = off / tdb->transaction->block_size;
221 off = off % tdb->transaction->block_size;
223 if (tdb->transaction->num_blocks <= blk) {
224 uint8_t **new_blocks;
225 /* expand the blocks array */
226 if (tdb->transaction->blocks == NULL) {
227 new_blocks = (uint8_t **)malloc(
228 (blk+1)*sizeof(uint8_t *));
229 /* the realloc result goes into a temporary, so the old array is
 * still referenced by the transaction if the allocation fails */
230 new_blocks = (uint8_t **)realloc(
231 tdb->transaction->blocks,
232 (blk+1)*sizeof(uint8_t *));
234 if (new_blocks == NULL) {
235 tdb->ecode = TDB_ERR_OOM;
/* zero every slot from the old count up to and including blk */
238 memset(&new_blocks[tdb->transaction->num_blocks], 0,
239 (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
240 tdb->transaction->blocks = new_blocks;
241 tdb->transaction->num_blocks = blk+1;
/* blk is now the last block and nothing in it is valid yet */
242 tdb->transaction->last_block_size = 0;
245 /* allocate and fill a block? */
246 if (tdb->transaction->blocks[blk] == NULL) {
247 tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
248 if (tdb->transaction->blocks[blk] == NULL) {
249 tdb->ecode = TDB_ERR_OOM;
250 tdb->transaction->transaction_error = 1;
/* the block starts inside the pre-transaction file: preload it, clipping
 * the read so it does not go past old_map_size */
253 if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
254 tdb_len_t len2 = tdb->transaction->block_size;
255 if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
256 len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
258 if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size,
259 tdb->transaction->blocks[blk],
261 SAFE_FREE(tdb->transaction->blocks[blk]);
262 tdb->ecode = TDB_ERR_IO;
265 if (blk == tdb->transaction->num_blocks-1) {
266 tdb->transaction->last_block_size = len2;
271 /* overwrite part of an existing block */
/* NOTE(review): the condition choosing between the two statements below is
 * not visible here; presumably the memset handles a NULL buf, which is what
 * transaction_expand_file() passes - confirm */
273 memset(tdb->transaction->blocks[blk] + off, 0, len);
275 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
/* extend the valid byte count of the last block if this write went past it */
277 if (blk == tdb->transaction->num_blocks-1) {
278 if (len + off > tdb->transaction->last_block_size) {
279 tdb->transaction->last_block_size = len + off;
286 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n",
287 (blk*tdb->transaction->block_size) + off, len));
288 tdb->transaction->transaction_error = 1;
294 write while in a transaction - this variant never expands the transaction blocks, it only
295 updates existing blocks. This means it cannot change the recovery size
/*
 * Copy len bytes from buf over the part of the transaction's in-memory
 * blocks that already exists for file offset off.
 *
 * Unlike transaction_write() this never allocates a block or grows the
 * blocks array: a piece whose block index is beyond num_blocks, or whose
 * block pointer is NULL, is not copied. In the last block the copy is
 * clipped to last_block_size. A request that crosses a block boundary is
 * split and each piece is handled by a recursive call.
 *
 * NOTE(review): several short lines are missing from this extract (braces,
 * returns, the loop's advance of off/len) - confirm against the full file.
 */
297 static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
298 const void *buf, tdb_len_t len)
302 /* break it up into block sized chunks */
303 while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
/* len2 = bytes from off up to the end of the block that contains off */
304 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
305 if (transaction_write_existing(tdb, off, buf, len2) != 0) {
311 buf = (const void *)(len2 + (const char *)buf);
/* from here on blk is the block index and off is the offset inside it */
319 blk = off / tdb->transaction->block_size;
320 off = off % tdb->transaction->block_size;
/* nothing is held in memory for this block */
322 if (tdb->transaction->num_blocks <= blk ||
323 tdb->transaction->blocks[blk] == NULL) {
/* in the last block only the first last_block_size bytes are valid */
327 if (blk == tdb->transaction->num_blocks-1 &&
328 off + len > tdb->transaction->last_block_size) {
329 if (off >= tdb->transaction->last_block_size) {
332 len = tdb->transaction->last_block_size - off;
335 /* overwrite part of an existing block */
336 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
343 accelerated hash chain head search, using the cached hash heads
345 static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
348 for (;h < tdb->header.hash_size;h++) {
349 /* the +1 takes account of the freelist */
350 if (0 != tdb->transaction->hash_heads[h+1]) {
358 out of bounds check during a transaction
360 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
362 if (len <= tdb->map_size) {
365 return TDB_ERRCODE(TDB_ERR_IO, -1);
369 transaction version of tdb_expand().
371 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
374 /* add a write to the transaction elements, so subsequent
375 reads see the zero data */
376 if (transaction_write(tdb, size, NULL, addition) != 0) {
384 brlock during a transaction - ignore them
386 static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset,
387 int rw_type, int lck_type, int probe, size_t len)
/*
 * Method table that tdb_transaction_start() installs in tdb->methods for the
 * duration of a transaction, replacing the normal io methods.
 * NOTE(review): the initialisers are positional and only two of them are
 * visible in this extract; check the full list and its order against the
 * declaration of struct tdb_methods.
 */
392 static const struct tdb_methods transaction_methods = {
395 transaction_next_hash_chain,
397 transaction_expand_file,
403 start a tdb transaction. No token is returned, as only a single
404 transaction is allowed to be pending per tdb_context
406 int tdb_transaction_start(struct tdb_context *tdb)
408 /* some sanity checks */
409 if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
410 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
411 tdb->ecode = TDB_ERR_EINVAL;
415 /* cope with nested tdb_transaction_start() calls */
416 if (tdb->transaction != NULL) {
417 if (!tdb->flags & TDB_NO_NESTING) {
418 tdb->transaction->nesting++;
419 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
420 tdb->transaction->nesting));
423 tdb_transaction_cancel(tdb);
424 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: cancelling previous transaction\n"));
428 if (tdb->num_locks != 0 || tdb->global_lock.count) {
429 /* the caller must not have any locks when starting a
430 transaction as otherwise we'll be screwed by lack
431 of nested locks in posix */
432 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
433 tdb->ecode = TDB_ERR_LOCK;
437 if (tdb->travlocks.next != NULL) {
438 /* you cannot use transactions inside a traverse (although you can use
439 traverse inside a transaction) as otherwise you can end up with
441 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
442 tdb->ecode = TDB_ERR_LOCK;
446 tdb->transaction = (struct tdb_transaction *)
447 calloc(sizeof(struct tdb_transaction), 1);
448 if (tdb->transaction == NULL) {
449 tdb->ecode = TDB_ERR_OOM;
453 /* a page at a time seems like a reasonable compromise between compactness and efficiency */
454 tdb->transaction->block_size = tdb->page_size;
456 /* get the transaction write lock. This is a blocking lock. As
457 discussed with Volker, there are a number of ways we could
458 make this async, which we will probably do in the future */
459 if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
460 SAFE_FREE(tdb->transaction->blocks);
461 SAFE_FREE(tdb->transaction);
465 /* get a read lock from the freelist to the end of file. This
466 is upgraded to a write lock during the commit */
467 if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
468 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
469 tdb->ecode = TDB_ERR_LOCK;
473 /* setup a copy of the hash table heads so the hash scan in
474 traverse can be fast */
475 tdb->transaction->hash_heads = (uint32_t *)
476 calloc(tdb->header.hash_size+1, sizeof(uint32_t));
477 if (tdb->transaction->hash_heads == NULL) {
478 tdb->ecode = TDB_ERR_OOM;
481 if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
482 TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
483 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
484 tdb->ecode = TDB_ERR_IO;
488 /* make sure we know about any file expansions already done by
490 tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
491 tdb->transaction->old_map_size = tdb->map_size;
493 /* finally hook the io methods, replacing them with
494 transaction specific methods */
495 tdb->transaction->io_methods = tdb->methods;
496 tdb->methods = &transaction_methods;
501 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
502 tdb_transaction_unlock(tdb);
503 SAFE_FREE(tdb->transaction->blocks);
504 SAFE_FREE(tdb->transaction->hash_heads);
505 SAFE_FREE(tdb->transaction);
511 cancel the current transaction
/*
 * Abandon the current transaction.
 *
 * With no transaction active this only logs an error. Inside a nested
 * transaction it marks the transaction as failed and lowers the nesting
 * count, leaving the outer transaction in place. Otherwise it restores
 * tdb->map_size, frees every in-memory block, releases the locks taken
 * during the transaction, puts the saved io methods back and frees the
 * transaction state.
 *
 * NOTE(review): several short lines are missing from this extract (braces,
 * the declaration of i, the return statements and a line between the
 * embedded numbers 548 and 551) - confirm against the full file.
 */
513 int tdb_transaction_cancel(struct tdb_context *tdb)
517 if (tdb->transaction == NULL) {
518 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
/* cancelling an inner transaction poisons the outer one: a later
 * tdb_transaction_commit() sees transaction_error and refuses */
522 if (tdb->transaction->nesting != 0) {
523 tdb->transaction->transaction_error = 1;
524 tdb->transaction->nesting--;
/* forget any expansion that was only recorded in memory */
528 tdb->map_size = tdb->transaction->old_map_size;
530 /* free all the transaction blocks */
531 for (i=0;i<tdb->transaction->num_blocks;i++) {
532 if (tdb->transaction->blocks[i] != NULL) {
533 free(tdb->transaction->blocks[i]);
536 SAFE_FREE(tdb->transaction->blocks);
538 /* remove any global lock created during the transaction */
539 if (tdb->global_lock.count != 0) {
540 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
541 tdb->global_lock.count = 0;
544 /* remove any locks created during the transaction */
545 if (tdb->num_locks != 0) {
/* one byte is unlocked per lock record, at FREELIST_TOP + 4*list */
546 for (i=0;i<tdb->num_lockrecs;i++) {
547 tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
548 F_UNLCK,F_SETLKW, 0, 1);
551 tdb->num_lockrecs = 0;
552 SAFE_FREE(tdb->lockrecs);
555 /* restore the normal io methods */
556 tdb->methods = tdb->transaction->io_methods;
/* release the range lock from FREELIST_TOP and the transaction lock that
 * tdb_transaction_start() took, then free the remaining state */
558 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
559 tdb_transaction_unlock(tdb);
560 SAFE_FREE(tdb->transaction->hash_heads);
561 SAFE_FREE(tdb->transaction);
/*
 * Push pending changes to stable storage: fsync() the file descriptor, then
 * msync() the part of the mapping that covers [offset, offset+length).
 * Either failure sets tdb->ecode to TDB_ERR_IO and is logged.
 *
 * NOTE(review): the lines that guard the msync part (embedded numbers
 * 576-577), the return statements and the tail of the last log call are
 * missing from this extract - confirm against the full file.
 */
569 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
571 if (fsync(tdb->fd) != 0) {
572 tdb->ecode = TDB_ERR_IO;
573 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
/* round the start down to a multiple of tdb->page_size (assumes page_size
 * is a power of two - TODO confirm) and lengthen the range to match */
578 tdb_off_t moffset = offset & ~(tdb->page_size-1);
579 if (msync(moffset + (char *)tdb->map_ptr,
580 length + (offset - moffset), MS_SYNC) != 0) {
581 tdb->ecode = TDB_ERR_IO;
582 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
593 work out how much space the linearised recovery data will consume
595 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
597 tdb_len_t recovery_size = 0;
600 recovery_size = sizeof(uint32_t);
601 for (i=0;i<tdb->transaction->num_blocks;i++) {
602 if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
605 if (tdb->transaction->blocks[i] == NULL) {
608 recovery_size += 2*sizeof(tdb_off_t);
609 if (i == tdb->transaction->num_blocks-1) {
610 recovery_size += tdb->transaction->last_block_size;
612 recovery_size += tdb->transaction->block_size;
616 return recovery_size;
620 allocate the recovery area, or use an existing recovery area if it is
/*
 * Find or create the on-disk area that will hold the recovery data.
 *
 * Outputs:
 *   *recovery_size     - bytes of recovery data, from tdb_recovery_size()
 *   *recovery_offset   - file offset of the recovery record
 *   *recovery_max_size - capacity of the area (rec_len of the record)
 *
 * If the header already points at a recovery record that is large enough,
 * it is reused. Otherwise the old record (if any) is handed to tdb_free(),
 * the file is extended through the saved io methods to make room for a new
 * page-rounded area at the current end of the map, and the new head offset
 * is written both to the file and to the in-memory transaction blocks.
 *
 * NOTE(review): several short lines are missing from this extract (braces,
 * return statements, a line between the embedded numbers 635 and 639) -
 * confirm against the full file.
 */
623 static int tdb_recovery_allocate(struct tdb_context *tdb,
624 tdb_len_t *recovery_size,
625 tdb_off_t *recovery_offset,
626 tdb_len_t *recovery_max_size)
628 struct list_struct rec;
629 const struct tdb_methods *methods = tdb->transaction->io_methods;
630 tdb_off_t recovery_head;
632 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
633 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
/* a non-zero head means a recovery record already exists: load it */
639 if (recovery_head != 0 &&
640 methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
641 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
645 *recovery_size = tdb_recovery_size(tdb);
647 if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
648 /* it fits in the existing area */
649 *recovery_max_size = rec.rec_len;
650 *recovery_offset = recovery_head;
654 /* we need to free up the old recovery area, then allocate a
655 new one at the end of the file. Note that we cannot use
656 tdb_allocate() to allocate the new one as that might return
657 us an area that is being currently used (as of the start of
659 if (recovery_head != 0) {
660 if (tdb_free(tdb, recovery_head, &rec) == -1) {
661 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
666 /* the tdb_free() call might have increased the recovery size */
667 *recovery_size = tdb_recovery_size(tdb);
669 /* round up to a multiple of page size */
670 *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
/* the new area starts at the current (in-transaction) end of the map */
671 *recovery_offset = tdb->map_size;
672 recovery_head = *recovery_offset;
/* grow the real file from old_map_size: the expansion made inside the
 * transaction so far, plus the record header and the recovery area */
674 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
675 (tdb->map_size - tdb->transaction->old_map_size) +
676 sizeof(rec) + *recovery_max_size) == -1) {
677 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
681 /* remap the file (if using mmap) */
682 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
684 /* we have to reset the old map size so that we don't try to expand the file
685 again in the transaction commit, which would destroy the recovery area */
686 tdb->transaction->old_map_size = tdb->map_size;
688 /* write the recovery header offset and sync - we can sync without a race here
689 as the magic ptr in the recovery record has not been set */
690 CONVERT(recovery_head);
691 if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
692 &recovery_head, sizeof(tdb_off_t)) == -1) {
693 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
/* mirror the same value into the transaction's in-memory copy, if that
 * part of the file is held in a block */
696 if (transaction_write_existing(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) {
697 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
706 setup the recovery data that will be used on a crash during commit
/*
 * Write the recovery record for the transaction being committed.
 *
 * The record is built in one heap buffer: a struct list_struct header
 * (data_len = recovery size, rec_len = area capacity, key_len = old map
 * size), then for each in-memory block that lies below the old map size an
 * (offset, length) pair followed by the ORIGINAL file contents of that
 * range, and finally a tailer word. The buffer is written to the recovery
 * area and synced; only after that is TDB_RECOVERY_MAGIC written into the
 * header's magic field and synced again.
 *
 * *magic_offset receives the file offset of that magic field.
 *
 * NOTE(review): many short lines are missing from this extract (braces,
 * returns, the declaration of i, offset and length, the NULL check after
 * malloc, the frees of data, the advance of p and some conversion calls) -
 * confirm against the full file.
 */
708 static int transaction_setup_recovery(struct tdb_context *tdb,
709 tdb_off_t *magic_offset)
711 tdb_len_t recovery_size;
712 unsigned char *data, *p;
713 const struct tdb_methods *methods = tdb->transaction->io_methods;
714 struct list_struct *rec;
715 tdb_off_t recovery_offset, recovery_max_size;
716 tdb_off_t old_map_size = tdb->transaction->old_map_size;
717 uint32_t magic, tailer;
721 check that the recovery area has enough space
723 if (tdb_recovery_allocate(tdb, &recovery_size,
724 &recovery_offset, &recovery_max_size) == -1) {
/* one buffer holds the record header followed by the recovery data */
728 data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
730 tdb->ecode = TDB_ERR_OOM;
734 rec = (struct list_struct *)data;
735 memset(rec, 0, sizeof(*rec));
/* key_len is reused to carry the pre-transaction file size */
738 rec->data_len = recovery_size;
739 rec->rec_len = recovery_max_size;
740 rec->key_len = old_map_size;
743 /* build the recovery data into a single blob to allow us to do a single
744 large write, which should be more efficient */
745 p = data + sizeof(*rec);
746 for (i=0;i<tdb->transaction->num_blocks;i++) {
750 if (tdb->transaction->blocks[i] == NULL) {
754 offset = i * tdb->transaction->block_size;
755 length = tdb->transaction->block_size;
756 if (i == tdb->transaction->num_blocks-1) {
757 length = tdb->transaction->last_block_size;
/* blocks that start at or beyond the old end of file are tested here */
760 if (offset >= old_map_size) {
763 if (offset + length > tdb->transaction->old_map_size) {
764 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
766 tdb->ecode = TDB_ERR_CORRUPT;
/* entry header: 4-byte offset followed by 4-byte length */
769 memcpy(p, &offset, 4);
770 memcpy(p+4, &length, 4);
774 /* the recovery area contains the old data, not the
775 new data, so we have to call the original tdb_read
777 if (methods->tdb_read(tdb, offset, p + 8, length, 0) != 0) {
779 tdb->ecode = TDB_ERR_IO;
786 tailer = sizeof(*rec) + recovery_max_size;
787 memcpy(p, &tailer, 4);
790 /* write the recovery data to the recovery area */
791 if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
792 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
794 tdb->ecode = TDB_ERR_IO;
797 if (transaction_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
798 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n"));
800 tdb->ecode = TDB_ERR_IO;
804 /* as we don't have ordered writes, we have to sync the recovery
805 data before we update the magic to indicate that the recovery
807 if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
814 magic = TDB_RECOVERY_MAGIC;
817 *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
819 if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
820 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
821 tdb->ecode = TDB_ERR_IO;
824 if (transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
825 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n"));
826 tdb->ecode = TDB_ERR_IO;
830 /* ensure the recovery magic marker is on disk */
831 if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
839 commit the current transaction
/*
 * Commit the current transaction to the file.
 *
 * Order of work, as visible below:
 *   1. refuse when there is no transaction or an error is pending;
 *   2. inside a nested transaction only lower the nesting count;
 *   3. a transaction with no blocks is simply cancelled;
 *   4. refuse when the caller still holds locks;
 *   5. upgrade the range lock from FREELIST_TOP and take the global lock;
 *   6. unless TDB_NOSYNC is set, write the recovery record;
 *   7. expand the real file if the map size grew during the transaction;
 *   8. write every in-memory block through the saved io methods, running
 *      tdb_transaction_recover() if a write fails;
 *   9. unless TDB_NOSYNC is set, sync, clear the recovery magic, sync again;
 *  10. drop the global lock, touch the file's mtime and use
 *      tdb_transaction_cancel() to release the remaining state.
 *
 * NOTE(review): many short lines are missing from this extract (braces,
 * return statements, the declarations of i, zero, offset and length, and a
 * continuation line of the tdb_expand_file() call) - confirm against the
 * full file.
 */
841 int tdb_transaction_commit(struct tdb_context *tdb)
843 const struct tdb_methods *methods;
844 tdb_off_t magic_offset = 0;
848 if (tdb->transaction == NULL) {
849 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
/* an earlier failed write, or a cancelled inner transaction, sets this */
853 if (tdb->transaction->transaction_error) {
854 tdb->ecode = TDB_ERR_IO;
855 tdb_transaction_cancel(tdb);
856 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
/* committing an inner transaction only unwinds one nesting level */
861 if (tdb->transaction->nesting != 0) {
862 tdb->transaction->nesting--;
866 /* check for a null transaction */
867 if (tdb->transaction->blocks == NULL) {
868 tdb_transaction_cancel(tdb);
/* from here on all real file IO goes through the saved methods */
872 methods = tdb->transaction->io_methods;
874 /* if there are any locks pending then the caller has not
875 nested their locks properly, so fail the transaction */
876 if (tdb->num_locks || tdb->global_lock.count) {
877 tdb->ecode = TDB_ERR_LOCK;
878 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
879 tdb_transaction_cancel(tdb);
883 /* upgrade the main transaction lock region to a write lock */
/* NOTE(review): the log text below names tdb_transaction_start although it
 * is emitted from the commit path */
884 if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
885 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
886 tdb->ecode = TDB_ERR_LOCK;
887 tdb_transaction_cancel(tdb);
891 /* get the global lock - this prevents new users attaching to the database
893 if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
894 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
895 tdb->ecode = TDB_ERR_LOCK;
896 tdb_transaction_cancel(tdb);
900 if (!(tdb->flags & TDB_NOSYNC)) {
901 /* write the recovery data to the end of the file */
902 if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
903 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
904 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
905 tdb_transaction_cancel(tdb);
910 /* expand the file to the new size if needed */
911 if (tdb->map_size != tdb->transaction->old_map_size) {
912 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
914 tdb->transaction->old_map_size) == -1) {
915 tdb->ecode = TDB_ERR_IO;
916 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
917 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
918 tdb_transaction_cancel(tdb);
/* reset map_size to the old value and re-probe just past it */
921 tdb->map_size = tdb->transaction->old_map_size;
922 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
925 /* perform all the writes */
926 for (i=0;i<tdb->transaction->num_blocks;i++) {
930 if (tdb->transaction->blocks[i] == NULL) {
/* full blocks are written whole; the last one only up to its valid size */
934 offset = i * tdb->transaction->block_size;
935 length = tdb->transaction->block_size;
936 if (i == tdb->transaction->num_blocks-1) {
937 length = tdb->transaction->last_block_size;
940 if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
941 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
943 /* we've overwritten part of the data and
944 possibly expanded the file, so we need to
945 run the crash recovery code */
/* recovery runs with the saved io methods installed */
946 tdb->methods = methods;
947 tdb_transaction_recover(tdb);
949 tdb_transaction_cancel(tdb);
950 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
952 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
/* this block is on its way to disk, so its memory can go */
955 SAFE_FREE(tdb->transaction->blocks[i]);
958 SAFE_FREE(tdb->transaction->blocks);
959 tdb->transaction->num_blocks = 0;
961 if (!(tdb->flags & TDB_NOSYNC)) {
962 /* ensure the new data is on disk */
963 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
967 /* remove the recovery marker */
968 if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
969 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
973 /* ensure the recovery marker has been removed on disk */
974 if (transaction_sync(tdb, magic_offset, 4) == -1) {
979 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
982 TODO: maybe write to some dummy hdr field, or write to magic
983 offset without mmap, before the last sync, instead of the
987 /* on some systems (like Linux 2.6.x) changes via mmap/msync
988 don't change the mtime of the file, this means the file may
989 not be backed up (as tdb rounding to block sizes means that
990 file size changes are quite rare too). The following forces
991 mtime changes when a transaction completes */
/* the result of utime() is not checked */
993 utime(tdb->name, NULL);
996 /* use a transaction cancel to free memory and remove the
998 tdb_transaction_cancel(tdb);
1005 recover from an aborted transaction. Must be called with exclusive
1006 database write access already established (including the global
1007 lock to prevent new processes attaching)
1009 int tdb_transaction_recover(struct tdb_context *tdb)
1011 tdb_off_t recovery_head, recovery_eof;
1012 unsigned char *data, *p;
1014 struct list_struct rec;
1016 /* find the recovery area */
1017 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1018 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
1019 tdb->ecode = TDB_ERR_IO;
1023 if (recovery_head == 0) {
1024 /* we have never allocated a recovery record */
1028 /* read the recovery record */
1029 if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
1030 sizeof(rec), DOCONV()) == -1) {
1031 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
1032 tdb->ecode = TDB_ERR_IO;
1036 if (rec.magic != TDB_RECOVERY_MAGIC) {
1037 /* there is no valid recovery data */
1041 if (tdb->read_only) {
1042 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
1043 tdb->ecode = TDB_ERR_CORRUPT;
1047 recovery_eof = rec.key_len;
1049 data = (unsigned char *)malloc(rec.data_len);
1051 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
1052 tdb->ecode = TDB_ERR_OOM;
1056 /* read the full recovery data */
1057 if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
1058 rec.data_len, 0) == -1) {
1059 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
1060 tdb->ecode = TDB_ERR_IO;
1064 /* recover the file data */
1066 while (p+8 < data + rec.data_len) {
1072 memcpy(&len, p+4, 4);
1074 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
1076 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
1077 tdb->ecode = TDB_ERR_IO;
1085 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1086 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1087 tdb->ecode = TDB_ERR_IO;
1091 /* if the recovery area is after the recovered eof then remove it */
1092 if (recovery_eof <= recovery_head) {
1093 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1094 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1095 tdb->ecode = TDB_ERR_IO;
1100 /* remove the recovery magic */
1101 if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic),
1103 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1104 tdb->ecode = TDB_ERR_IO;
1108 /* reduce the file size to the old size */
1110 if (ftruncate(tdb->fd, recovery_eof) != 0) {
1111 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1112 tdb->ecode = TDB_ERR_IO;
1115 tdb->map_size = recovery_eof;
1118 if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1119 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1120 tdb->ecode = TDB_ERR_IO;
1124 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n",