2 Unix SMB/CIFS implementation.
4 trivial database library
6 Copyright (C) Andrew Tridgell 2005
8 ** NOTE! The following LGPL license applies to the tdb
9 ** library. This does NOT imply that all of Samba is released
12 This library is free software; you can redistribute it and/or
13 modify it under the terms of the GNU Lesser General Public
14 License as published by the Free Software Foundation; either
15 version 3 of the License, or (at your option) any later version.
17 This library is distributed in the hope that it will be useful,
18 but WITHOUT ANY WARRANTY; without even the implied warranty of
19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
20 Lesser General Public License for more details.
22 You should have received a copy of the GNU Lesser General Public
23 License along with this library; if not, see <http://www.gnu.org/licenses/>.
26 #include "tdb_private.h"
31 - only allow a single transaction at a time per database. This makes
32 using the transaction API simpler, as otherwise the caller would
33 have to cope with temporary failures in transactions that conflict
34 with other current transactions
36 - keep the transaction recovery information in the same file as the
37 database, using a special 'transaction recovery' record pointed at
38 by the header. This removes the need for extra journal files as
39 used by some other databases
41 - dynamically allocate the transaction recovery record, re-using it
42 for subsequent transactions. If a larger record is needed then
43 tdb_free() the old record to place it on the normal tdb freelist
44 before allocating the new record
46 - during transactions, keep a linked list of all writes that have
47 been performed by intercepting all tdb_write() calls. The hooked
48 transaction versions of tdb_read() and tdb_write() check this
49 linked list and try to use the elements of the list in preference
52 - don't allow any locks to be held when a transaction starts,
53 otherwise we can end up with deadlock (plus lack of lock nesting
54 in posix locks would mean the lock is lost)
56 - if the caller gains a lock during the transaction but doesn't
57 release it then fail the commit
59 - allow for nested calls to tdb_transaction_start(), re-using the
60 existing transaction record. If the inner transaction is cancelled
61 then a subsequent commit will fail
63 - keep a mirrored copy of the tdb hash chain heads to allow for the
64 fast hash heads scan on traverse, updating the mirrored copy in
65 the transaction version of tdb_write
67 - allow callers to mix transaction and non-transaction use of tdb,
68 although once a transaction is started then an exclusive lock is
69 gained until the transaction is committed or cancelled
71 - the commit strategy involves first saving away all modified data
72 into a linearised buffer in the transaction recovery area, then
73 marking the transaction recovery area with a magic value to
74 indicate a valid recovery record. In total 4 fsync/msync calls are
75 needed per commit to prevent race conditions. It might be possible
76 to reduce this to 3 or even 2 with some more work.
78 - check for a valid recovery record on open of the tdb, while the
79 global lock is held. Automatically recover from the transaction
80 recovery area if needed, then continue with the open as
81 usual. This allows for smooth crash recovery with no administrator
84 - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
85 still available, but no transaction recovery area is used and no
86 fsync/msync calls are made.
88 - if TDB_NO_NESTING is passed to flags in tdb open then transaction
89 nesting is disabled. tdb_transaction_start() will then implicitly
90 cancel any pending transactions and always start a new transaction
91 context instead of nesting.
97 hold the context of any current transaction
99 struct tdb_transaction {
100 /* we keep a mirrored copy of the tdb hash heads here so
101 tdb_next_hash_chain() can operate efficiently */
102 uint32_t *hash_heads;
104 /* the original io methods - used to do IOs to the real db */
105 const struct tdb_methods *io_methods;
107 /* the list of transaction blocks. When a block is first
108 written to, it gets created in this list */
111 uint32_t block_size; /* bytes in each block */
112 uint32_t last_block_size; /* number of valid bytes in the last block */
114 /* non-zero when an internal transaction error has
115 occurred. All write operations will then fail until the
116 transaction is ended */
117 int transaction_error;
119 /* when inside a transaction we need to keep track of any
120 nested tdb_transaction_start() calls, as these are allowed,
121 but don't create a new transaction */
124 /* old file size before transaction */
125 tdb_len_t old_map_size;
130 read while in a transaction. We need to check first if the data is in our list
131 of transaction elements, then if not do a real read
133 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
134 tdb_len_t len, int cv)
138 /* break it down into block sized ops */
139 while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
140 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
141 if (transaction_read(tdb, off, buf, len2, cv) != 0) {
146 buf = (void *)(len2 + (char *)buf);
153 blk = off / tdb->transaction->block_size;
155 /* see if we have it in the block list */
156 if (tdb->transaction->num_blocks <= blk ||
157 tdb->transaction->blocks[blk] == NULL) {
158 /* nope, do a real read */
159 if (tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv) != 0) {
165 /* it is in the block list. Now check for the last block */
166 if (blk == tdb->transaction->num_blocks-1) {
167 if (len > tdb->transaction->last_block_size) {
172 /* now copy it out of this block */
173 memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
175 tdb_convert(buf, len);
180 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
181 tdb->ecode = TDB_ERR_IO;
182 tdb->transaction->transaction_error = 1;
188 write while in a transaction
190 static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
191 const void *buf, tdb_len_t len)
195 /* if the write is to a hash head, then update the transaction
197 if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
198 off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
199 uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
200 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
203 /* break it up into block sized chunks */
204 while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
205 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
206 if (transaction_write(tdb, off, buf, len2) != 0) {
212 buf = (const void *)(len2 + (const char *)buf);
220 blk = off / tdb->transaction->block_size;
221 off = off % tdb->transaction->block_size;
223 if (tdb->transaction->num_blocks <= blk) {
224 uint8_t **new_blocks;
225 /* expand the blocks array */
226 if (tdb->transaction->blocks == NULL) {
227 new_blocks = (uint8_t **)malloc(
228 (blk+1)*sizeof(uint8_t *));
230 new_blocks = (uint8_t **)realloc(
231 tdb->transaction->blocks,
232 (blk+1)*sizeof(uint8_t *));
234 if (new_blocks == NULL) {
235 tdb->ecode = TDB_ERR_OOM;
238 memset(&new_blocks[tdb->transaction->num_blocks], 0,
239 (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
240 tdb->transaction->blocks = new_blocks;
241 tdb->transaction->num_blocks = blk+1;
242 tdb->transaction->last_block_size = 0;
245 /* allocate and fill a block? */
246 if (tdb->transaction->blocks[blk] == NULL) {
247 tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
248 if (tdb->transaction->blocks[blk] == NULL) {
249 tdb->ecode = TDB_ERR_OOM;
250 tdb->transaction->transaction_error = 1;
253 if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
254 tdb_len_t len2 = tdb->transaction->block_size;
255 if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
256 len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
258 if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size,
259 tdb->transaction->blocks[blk],
261 SAFE_FREE(tdb->transaction->blocks[blk]);
262 tdb->ecode = TDB_ERR_IO;
265 if (blk == tdb->transaction->num_blocks-1) {
266 tdb->transaction->last_block_size = len2;
271 /* overwrite part of an existing block */
273 memset(tdb->transaction->blocks[blk] + off, 0, len);
275 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
277 if (blk == tdb->transaction->num_blocks-1) {
278 if (len + off > tdb->transaction->last_block_size) {
279 tdb->transaction->last_block_size = len + off;
286 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n",
287 (blk*tdb->transaction->block_size) + off, len));
288 tdb->transaction->transaction_error = 1;
294 write while in a transaction - this variant never expands the transaction blocks, it only
295 updates existing blocks. This means it cannot change the recovery size
297 static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
298 const void *buf, tdb_len_t len)
302 /* break it up into block sized chunks */
303 while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
304 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
305 if (transaction_write_existing(tdb, off, buf, len2) != 0) {
311 buf = (const void *)(len2 + (const char *)buf);
319 blk = off / tdb->transaction->block_size;
320 off = off % tdb->transaction->block_size;
322 if (tdb->transaction->num_blocks <= blk ||
323 tdb->transaction->blocks[blk] == NULL) {
327 if (blk == tdb->transaction->num_blocks-1 &&
328 off + len > tdb->transaction->last_block_size) {
329 if (off >= tdb->transaction->last_block_size) {
332 len = tdb->transaction->last_block_size - off;
335 /* overwrite part of an existing block */
336 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
343 accelerated hash chain head search, using the cached hash heads
345 static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
348 for (;h < tdb->header.hash_size;h++) {
349 /* the +1 takes account of the freelist */
350 if (0 != tdb->transaction->hash_heads[h+1]) {
358 out of bounds check during a transaction
360 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
362 if (len <= tdb->map_size) {
365 return TDB_ERRCODE(TDB_ERR_IO, -1);
369 transaction version of tdb_expand().
371 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
374 /* add a write to the transaction elements, so subsequent
375 reads see the zero data */
376 if (transaction_write(tdb, size, NULL, addition) != 0) {
384 brlock during a transaction - ignore them
386 static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset,
387 int rw_type, int lck_type, int probe, size_t len)
392 static const struct tdb_methods transaction_methods = {
395 transaction_next_hash_chain,
397 transaction_expand_file,
403 start a tdb transaction. No token is returned, as only a single
404 transaction is allowed to be pending per tdb_context
406 int tdb_transaction_start(struct tdb_context *tdb)
408 tdb_trace(tdb, "tdb_transaction_start\n");
410 /* some sanity checks */
411 if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
412 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
413 tdb->ecode = TDB_ERR_EINVAL;
417 /* cope with nested tdb_transaction_start() calls */
418 if (tdb->transaction != NULL) {
419 if (!tdb->flags & TDB_NO_NESTING) {
420 tdb->transaction->nesting++;
421 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
422 tdb->transaction->nesting));
425 tdb_transaction_cancel(tdb);
426 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: cancelling previous transaction\n"));
430 if (tdb->num_locks != 0 || tdb->global_lock.count) {
431 /* the caller must not have any locks when starting a
432 transaction as otherwise we'll be screwed by lack
433 of nested locks in posix */
434 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
435 tdb->ecode = TDB_ERR_LOCK;
439 if (tdb->travlocks.next != NULL) {
440 /* you cannot use transactions inside a traverse (although you can use
441 traverse inside a transaction) as otherwise you can end up with
443 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
444 tdb->ecode = TDB_ERR_LOCK;
448 tdb->transaction = (struct tdb_transaction *)
449 calloc(sizeof(struct tdb_transaction), 1);
450 if (tdb->transaction == NULL) {
451 tdb->ecode = TDB_ERR_OOM;
455 /* a page at a time seems like a reasonable compromise between compactness and efficiency */
456 tdb->transaction->block_size = tdb->page_size;
458 /* get the transaction write lock. This is a blocking lock. As
459 discussed with Volker, there are a number of ways we could
460 make this async, which we will probably do in the future */
461 if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
462 SAFE_FREE(tdb->transaction->blocks);
463 SAFE_FREE(tdb->transaction);
467 /* get a read lock from the freelist to the end of file. This
468 is upgraded to a write lock during the commit */
469 if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
470 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
471 tdb->ecode = TDB_ERR_LOCK;
475 /* setup a copy of the hash table heads so the hash scan in
476 traverse can be fast */
477 tdb->transaction->hash_heads = (uint32_t *)
478 calloc(tdb->header.hash_size+1, sizeof(uint32_t));
479 if (tdb->transaction->hash_heads == NULL) {
480 tdb->ecode = TDB_ERR_OOM;
483 if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
484 TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
485 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
486 tdb->ecode = TDB_ERR_IO;
490 /* make sure we know about any file expansions already done by
492 tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
493 tdb->transaction->old_map_size = tdb->map_size;
495 /* finally hook the io methods, replacing them with
496 transaction specific methods */
497 tdb->transaction->io_methods = tdb->methods;
498 tdb->methods = &transaction_methods;
503 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
504 tdb_transaction_unlock(tdb);
505 SAFE_FREE(tdb->transaction->blocks);
506 SAFE_FREE(tdb->transaction->hash_heads);
507 SAFE_FREE(tdb->transaction);
513 cancel the current transaction
515 int tdb_transaction_cancel(struct tdb_context *tdb)
519 tdb_trace(tdb, "tdb_transaction_cancel\n");
520 if (tdb->transaction == NULL) {
521 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
525 if (tdb->transaction->nesting != 0) {
526 tdb->transaction->transaction_error = 1;
527 tdb->transaction->nesting--;
531 tdb->map_size = tdb->transaction->old_map_size;
533 /* free all the transaction blocks */
534 for (i=0;i<tdb->transaction->num_blocks;i++) {
535 if (tdb->transaction->blocks[i] != NULL) {
536 free(tdb->transaction->blocks[i]);
539 SAFE_FREE(tdb->transaction->blocks);
541 /* remove any global lock created during the transaction */
542 if (tdb->global_lock.count != 0) {
543 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
544 tdb->global_lock.count = 0;
547 /* remove any locks created during the transaction */
548 if (tdb->num_locks != 0) {
549 for (i=0;i<tdb->num_lockrecs;i++) {
550 tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
551 F_UNLCK,F_SETLKW, 0, 1);
554 tdb->num_lockrecs = 0;
555 SAFE_FREE(tdb->lockrecs);
558 /* restore the normal io methods */
559 tdb->methods = tdb->transaction->io_methods;
561 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
562 tdb_transaction_unlock(tdb);
563 SAFE_FREE(tdb->transaction->hash_heads);
564 SAFE_FREE(tdb->transaction);
572 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
574 if (fsync(tdb->fd) != 0) {
575 tdb->ecode = TDB_ERR_IO;
576 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
581 tdb_off_t moffset = offset & ~(tdb->page_size-1);
582 if (msync(moffset + (char *)tdb->map_ptr,
583 length + (offset - moffset), MS_SYNC) != 0) {
584 tdb->ecode = TDB_ERR_IO;
585 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
596 work out how much space the linearised recovery data will consume
598 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
600 tdb_len_t recovery_size = 0;
603 recovery_size = sizeof(uint32_t);
604 for (i=0;i<tdb->transaction->num_blocks;i++) {
605 if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
608 if (tdb->transaction->blocks[i] == NULL) {
611 recovery_size += 2*sizeof(tdb_off_t);
612 if (i == tdb->transaction->num_blocks-1) {
613 recovery_size += tdb->transaction->last_block_size;
615 recovery_size += tdb->transaction->block_size;
619 return recovery_size;
623 allocate the recovery area, or use an existing recovery area if it is
626 static int tdb_recovery_allocate(struct tdb_context *tdb,
627 tdb_len_t *recovery_size,
628 tdb_off_t *recovery_offset,
629 tdb_len_t *recovery_max_size)
631 struct list_struct rec;
632 const struct tdb_methods *methods = tdb->transaction->io_methods;
633 tdb_off_t recovery_head;
635 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
636 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
642 if (recovery_head != 0 &&
643 methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
644 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
648 *recovery_size = tdb_recovery_size(tdb);
650 if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
651 /* it fits in the existing area */
652 *recovery_max_size = rec.rec_len;
653 *recovery_offset = recovery_head;
657 /* we need to free up the old recovery area, then allocate a
658 new one at the end of the file. Note that we cannot use
659 tdb_allocate() to allocate the new one as that might return
660 us an area that is being currently used (as of the start of
662 if (recovery_head != 0) {
663 if (tdb_free(tdb, recovery_head, &rec) == -1) {
664 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
669 /* the tdb_free() call might have increased the recovery size */
670 *recovery_size = tdb_recovery_size(tdb);
672 /* round up to a multiple of page size */
673 *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
674 *recovery_offset = tdb->map_size;
675 recovery_head = *recovery_offset;
677 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
678 (tdb->map_size - tdb->transaction->old_map_size) +
679 sizeof(rec) + *recovery_max_size) == -1) {
680 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
684 /* remap the file (if using mmap) */
685 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
687 /* we have to reset the old map size so that we don't try to expand the file
688 again in the transaction commit, which would destroy the recovery area */
689 tdb->transaction->old_map_size = tdb->map_size;
691 /* write the recovery header offset and sync - we can sync without a race here
692 as the magic ptr in the recovery record has not been set */
693 CONVERT(recovery_head);
694 if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
695 &recovery_head, sizeof(tdb_off_t)) == -1) {
696 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
699 if (transaction_write_existing(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) {
700 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
709 setup the recovery data that will be used on a crash during commit
711 static int transaction_setup_recovery(struct tdb_context *tdb,
712 tdb_off_t *magic_offset)
714 tdb_len_t recovery_size;
715 unsigned char *data, *p;
716 const struct tdb_methods *methods = tdb->transaction->io_methods;
717 struct list_struct *rec;
718 tdb_off_t recovery_offset, recovery_max_size;
719 tdb_off_t old_map_size = tdb->transaction->old_map_size;
720 uint32_t magic, tailer;
724 check that the recovery area has enough space
726 if (tdb_recovery_allocate(tdb, &recovery_size,
727 &recovery_offset, &recovery_max_size) == -1) {
731 data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
733 tdb->ecode = TDB_ERR_OOM;
737 rec = (struct list_struct *)data;
738 memset(rec, 0, sizeof(*rec));
741 rec->data_len = recovery_size;
742 rec->rec_len = recovery_max_size;
743 rec->key_len = old_map_size;
746 /* build the recovery data into a single blob to allow us to do a single
747 large write, which should be more efficient */
748 p = data + sizeof(*rec);
749 for (i=0;i<tdb->transaction->num_blocks;i++) {
753 if (tdb->transaction->blocks[i] == NULL) {
757 offset = i * tdb->transaction->block_size;
758 length = tdb->transaction->block_size;
759 if (i == tdb->transaction->num_blocks-1) {
760 length = tdb->transaction->last_block_size;
763 if (offset >= old_map_size) {
766 if (offset + length > tdb->transaction->old_map_size) {
767 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
769 tdb->ecode = TDB_ERR_CORRUPT;
772 memcpy(p, &offset, 4);
773 memcpy(p+4, &length, 4);
777 /* the recovery area contains the old data, not the
778 new data, so we have to call the original tdb_read
780 if (methods->tdb_read(tdb, offset, p + 8, length, 0) != 0) {
782 tdb->ecode = TDB_ERR_IO;
789 tailer = sizeof(*rec) + recovery_max_size;
790 memcpy(p, &tailer, 4);
793 /* write the recovery data to the recovery area */
794 if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
795 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
797 tdb->ecode = TDB_ERR_IO;
800 if (transaction_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
801 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n"));
803 tdb->ecode = TDB_ERR_IO;
807 /* as we don't have ordered writes, we have to sync the recovery
808 data before we update the magic to indicate that the recovery
810 if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
817 magic = TDB_RECOVERY_MAGIC;
820 *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
822 if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
823 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
824 tdb->ecode = TDB_ERR_IO;
827 if (transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
828 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n"));
829 tdb->ecode = TDB_ERR_IO;
833 /* ensure the recovery magic marker is on disk */
834 if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
842 commit the current transaction
844 int tdb_transaction_commit(struct tdb_context *tdb)
846 const struct tdb_methods *methods;
847 tdb_off_t magic_offset = 0;
851 tdb_trace(tdb, "tdb_transaction_commit\n");
852 if (tdb->transaction == NULL) {
853 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
857 if (tdb->transaction->transaction_error) {
858 tdb->ecode = TDB_ERR_IO;
859 tdb_transaction_cancel(tdb);
860 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
865 if (tdb->transaction->nesting != 0) {
866 tdb->transaction->nesting--;
870 /* check for a null transaction */
871 if (tdb->transaction->blocks == NULL) {
872 tdb_transaction_cancel(tdb);
876 methods = tdb->transaction->io_methods;
878 /* if there are any locks pending then the caller has not
879 nested their locks properly, so fail the transaction */
880 if (tdb->num_locks || tdb->global_lock.count) {
881 tdb->ecode = TDB_ERR_LOCK;
882 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
883 tdb_transaction_cancel(tdb);
887 /* upgrade the main transaction lock region to a write lock */
888 if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
889 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
890 tdb->ecode = TDB_ERR_LOCK;
891 tdb_transaction_cancel(tdb);
895 /* get the global lock - this prevents new users attaching to the database
897 if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
898 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
899 tdb->ecode = TDB_ERR_LOCK;
900 tdb_transaction_cancel(tdb);
904 if (!(tdb->flags & TDB_NOSYNC)) {
905 /* write the recovery data to the end of the file */
906 if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
907 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
908 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
909 tdb_transaction_cancel(tdb);
914 /* expand the file to the new size if needed */
915 if (tdb->map_size != tdb->transaction->old_map_size) {
916 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
918 tdb->transaction->old_map_size) == -1) {
919 tdb->ecode = TDB_ERR_IO;
920 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
921 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
922 tdb_transaction_cancel(tdb);
925 tdb->map_size = tdb->transaction->old_map_size;
926 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
929 /* perform all the writes */
930 for (i=0;i<tdb->transaction->num_blocks;i++) {
934 if (tdb->transaction->blocks[i] == NULL) {
938 offset = i * tdb->transaction->block_size;
939 length = tdb->transaction->block_size;
940 if (i == tdb->transaction->num_blocks-1) {
941 length = tdb->transaction->last_block_size;
944 if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
945 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
947 /* we've overwritten part of the data and
948 possibly expanded the file, so we need to
949 run the crash recovery code */
950 tdb->methods = methods;
951 tdb_transaction_recover(tdb);
953 tdb_transaction_cancel(tdb);
954 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
956 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
959 SAFE_FREE(tdb->transaction->blocks[i]);
962 SAFE_FREE(tdb->transaction->blocks);
963 tdb->transaction->num_blocks = 0;
965 if (!(tdb->flags & TDB_NOSYNC)) {
966 /* ensure the new data is on disk */
967 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
971 /* remove the recovery marker */
972 if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
973 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
977 /* ensure the recovery marker has been removed on disk */
978 if (transaction_sync(tdb, magic_offset, 4) == -1) {
983 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
986 TODO: maybe write to some dummy hdr field, or write to magic
987 offset without mmap, before the last sync, instead of the
991 /* on some systems (like Linux 2.6.x) changes via mmap/msync
992 don't change the mtime of the file, this means the file may
993 not be backed up (as tdb rounding to block sizes means that
994 file size changes are quite rare too). The following forces
995 mtime changes when a transaction completes */
997 utime(tdb->name, NULL);
1000 /* use a transaction cancel to free memory and remove the
1001 transaction locks */
1002 tdb_transaction_cancel(tdb);
1009 recover from an aborted transaction. Must be called with exclusive
1010 database write access already established (including the global
1011 lock to prevent new processes attaching)
1013 int tdb_transaction_recover(struct tdb_context *tdb)
1015 tdb_off_t recovery_head, recovery_eof;
1016 unsigned char *data, *p;
1018 struct list_struct rec;
1020 /* find the recovery area */
1021 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1022 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
1023 tdb->ecode = TDB_ERR_IO;
1027 if (recovery_head == 0) {
1028 /* we have never allocated a recovery record */
1032 /* read the recovery record */
1033 if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
1034 sizeof(rec), DOCONV()) == -1) {
1035 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
1036 tdb->ecode = TDB_ERR_IO;
1040 if (rec.magic != TDB_RECOVERY_MAGIC) {
1041 /* there is no valid recovery data */
1045 if (tdb->read_only) {
1046 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
1047 tdb->ecode = TDB_ERR_CORRUPT;
1051 recovery_eof = rec.key_len;
1053 data = (unsigned char *)malloc(rec.data_len);
1055 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
1056 tdb->ecode = TDB_ERR_OOM;
1060 /* read the full recovery data */
1061 if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
1062 rec.data_len, 0) == -1) {
1063 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
1064 tdb->ecode = TDB_ERR_IO;
1068 /* recover the file data */
1070 while (p+8 < data + rec.data_len) {
1076 memcpy(&len, p+4, 4);
1078 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
1080 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
1081 tdb->ecode = TDB_ERR_IO;
1089 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1090 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1091 tdb->ecode = TDB_ERR_IO;
1095 /* if the recovery area is after the recovered eof then remove it */
1096 if (recovery_eof <= recovery_head) {
1097 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1098 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1099 tdb->ecode = TDB_ERR_IO;
1104 /* remove the recovery magic */
1105 if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic),
1107 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1108 tdb->ecode = TDB_ERR_IO;
1112 /* reduce the file size to the old size */
1114 if (ftruncate(tdb->fd, recovery_eof) != 0) {
1115 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1116 tdb->ecode = TDB_ERR_IO;
1119 tdb->map_size = recovery_eof;
1122 if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1123 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1124 tdb->ecode = TDB_ERR_IO;
1128 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n",