2 Unix SMB/CIFS implementation.
4 trivial database library
6 Copyright (C) Andrew Tridgell 2005
8 ** NOTE! The following LGPL license applies to the tdb
9 ** library. This does NOT imply that all of Samba is released
12 This library is free software; you can redistribute it and/or
13 modify it under the terms of the GNU Lesser General Public
14 License as published by the Free Software Foundation; either
15 version 3 of the License, or (at your option) any later version.
17 This library is distributed in the hope that it will be useful,
18 but WITHOUT ANY WARRANTY; without even the implied warranty of
19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
20 Lesser General Public License for more details.
22 You should have received a copy of the GNU Lesser General Public
23 License along with this library; if not, see <http://www.gnu.org/licenses/>.
26 #include "tdb_private.h"
31 - only allow a single transaction at a time per database. This makes
32 using the transaction API simpler, as otherwise the caller would
33 have to cope with temporary failures in transactions that conflict
34 with other current transactions
36 - keep the transaction recovery information in the same file as the
37 database, using a special 'transaction recovery' record pointed at
38 by the header. This removes the need for extra journal files as
39 used by some other databases
41 - dynamically allocate the transaction recovery record, re-using it
42 for subsequent transactions. If a larger record is needed then
43 tdb_free() the old record to place it on the normal tdb freelist
44 before allocating the new record
46 - during transactions, keep a linked list of all writes that have
47 been performed by intercepting all tdb_write() calls. The hooked
48 transaction versions of tdb_read() and tdb_write() check this
49 linked list and try to use the elements of the list in preference
52 - don't allow any locks to be held when a transaction starts,
53 otherwise we can end up with deadlock (plus lack of lock nesting
54 in posix locks would mean the lock is lost)
56 - if the caller gains a lock during the transaction but doesn't
57 release it then fail the commit
59 - allow for nested calls to tdb_transaction_start(), re-using the
60 existing transaction record. If the inner transaction is cancelled
61 then a subsequent commit will fail
63 - keep a mirrored copy of the tdb hash chain heads to allow for the
64 fast hash heads scan on traverse, updating the mirrored copy in
65 the transaction version of tdb_write
67 - allow callers to mix transaction and non-transaction use of tdb,
68 although once a transaction is started then an exclusive lock is
69 gained until the transaction is committed or cancelled
71 - the commit strategy involves first saving away all modified data
72 into a linearised buffer in the transaction recovery area, then
73 marking the transaction recovery area with a magic value to
74 indicate a valid recovery record. In total 4 fsync/msync calls are
75 needed per commit to prevent race conditions. It might be possible
76 to reduce this to 3 or even 2 with some more work.
78 - check for a valid recovery record on open of the tdb, while the
79 global lock is held. Automatically recover from the transaction
80 recovery area if needed, then continue with the open as
81 usual. This allows for smooth crash recovery with no administrator
84 - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
85 still available, but no transaction recovery area is used and no
86 fsync/msync calls are made.
88 - if TDB_NO_NESTING is passed to flags in tdb_open then transaction
89 nesting is disabled. tdb_transaction_start() will then implicitly
90 cancel any pending transactions and always start a new transaction
91 context instead of nesting.
97 hold the context of any current transaction
/*
 * State of the transaction currently in progress on a tdb_context
 * (reached through tdb->transaction).
 * NOTE(review): this listing has dropped lines. Other code in this file
 * uses the members blocks, num_blocks and nesting, but their declarations
 * are not visible below.
 */
99 struct tdb_transaction {
100 /* we keep a mirrored copy of the tdb hash heads here so
101 tdb_next_hash_chain() can operate efficiently */
102 uint32_t *hash_heads;
104 /* the original io methods - used to do IOs to the real db */
105 const struct tdb_methods *io_methods;
107 /* the list of transaction blocks. When a block is first
108 written to, it gets created in this list */
/* NOTE(review): the declarations this comment describes (presumably
   blocks and num_blocks) are missing from the listing here */
111 uint32_t block_size; /* bytes in each block */
112 uint32_t last_block_size; /* number of valid bytes in the last block */
114 /* non-zero when an internal transaction error has
115 occurred. All write operations will then fail until the
116 transaction is ended */
117 int transaction_error;
119 /* when inside a transaction we need to keep track of any
120 nested tdb_transaction_start() calls, as these are allowed,
121 but don't create a new transaction */
/* NOTE(review): the nesting counter declaration is missing from the
   listing here; it is read and modified as tdb->transaction->nesting */
124 /* old file size before transaction */
125 tdb_len_t old_map_size;
130 read while in a transaction. We need to check first if the data is in our list
131 of transaction elements, then if not do a real read
/*
 * Read len bytes at file offset off into buf while a transaction is active.
 * A request that crosses a block_size boundary is split and handled
 * recursively one block at a time. For the final single-block piece the data
 * comes from the in-memory transaction block when one exists for that block
 * index, and otherwise from the original io_methods->tdb_read.
 * The visible failure path logs, sets tdb->ecode to TDB_ERR_IO and marks the
 * transaction as failed (transaction_error = 1).
 * NOTE(review): return statements, gotos and the declaration of blk are not
 * visible in this listing.
 */
133 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
134 tdb_len_t len, int cv)
138 /* break it down into block sized ops */
139 while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
/* len2 = number of bytes from off up to the end of the block containing off */
140 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
141 if (transaction_read(tdb, off, buf, len2, cv) != 0) {
/* NOTE(review): the matching advance of off and len is presumably on lines
   dropped from this listing; only the buf advance is visible */
146 buf = (void *)(len2 + (char *)buf);
153 blk = off / tdb->transaction->block_size;
155 /* see if we have it in the block list */
156 if (tdb->transaction->num_blocks <= blk ||
157 tdb->transaction->blocks[blk] == NULL) {
158 /* nope, do a real read */
159 if (tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv) != 0) {
165 /* it is in the block list. Now check for the last block */
166 if (blk == tdb->transaction->num_blocks-1) {
/* the requested length is compared against the valid byte count of the last block */
167 if (len > tdb->transaction->last_block_size) {
172 /* now copy it out of this block */
173 memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
/* NOTE(review): presumably guarded by the cv argument; the guard line is not visible */
175 tdb_convert(buf, len);
180 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
181 tdb->ecode = TDB_ERR_IO;
182 tdb->transaction->transaction_error = 1;
188 write while in a transaction
/*
 * Record a write of len bytes at file offset off in the in-memory
 * transaction blocks instead of writing to the file.
 * Visible steps:
 *  - a write of exactly sizeof(tdb_off_t) inside the hash table region is
 *    also copied into the mirrored hash_heads array;
 *  - requests crossing a block_size boundary are split and handled
 *    recursively;
 *  - the blocks pointer array is grown (malloc or realloc) so that index blk
 *    exists, with the new slots zeroed;
 *  - a missing block is calloc-ed and, where it overlaps the pre-transaction
 *    file (old_map_size), pre-filled through the original tdb_read;
 *  - the data is placed at the in-block offset and last_block_size is
 *    extended when the final block grows.
 * The visible failure path logs and sets transaction_error = 1.
 * NOTE(review): return statements, gotos, else branches and the declaration
 * of blk are not visible in this listing.
 */
190 static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
191 const void *buf, tdb_len_t len)
195 /* if the write is to a hash head, then update the transaction
197 if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
198 off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
199 uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
200 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
203 /* break it up into block sized chunks */
204 while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
205 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
206 if (transaction_write(tdb, off, buf, len2) != 0) {
212 buf = (const void *)(len2 + (const char *)buf);
/* from here on, off is the offset inside block number blk */
220 blk = off / tdb->transaction->block_size;
221 off = off % tdb->transaction->block_size;
223 if (tdb->transaction->num_blocks <= blk) {
224 uint8_t **new_blocks;
225 /* expand the blocks array */
226 if (tdb->transaction->blocks == NULL) {
227 new_blocks = (uint8_t **)malloc(
228 (blk+1)*sizeof(uint8_t *));
230 new_blocks = (uint8_t **)realloc(
231 tdb->transaction->blocks,
232 (blk+1)*sizeof(uint8_t *));
234 if (new_blocks == NULL) {
235 tdb->ecode = TDB_ERR_OOM;
/* zero every newly added slot, from the old count up to and including blk */
238 memset(&new_blocks[tdb->transaction->num_blocks], 0,
239 (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
240 tdb->transaction->blocks = new_blocks;
241 tdb->transaction->num_blocks = blk+1;
242 tdb->transaction->last_block_size = 0;
245 /* allocate and fill a block? */
246 if (tdb->transaction->blocks[blk] == NULL) {
247 tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
248 if (tdb->transaction->blocks[blk] == NULL) {
249 tdb->ecode = TDB_ERR_OOM;
250 tdb->transaction->transaction_error = 1;
/* only the part of the block that lies inside the pre-transaction file is read back */
253 if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
254 tdb_len_t len2 = tdb->transaction->block_size;
255 if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
256 len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
258 if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size,
259 tdb->transaction->blocks[blk],
261 SAFE_FREE(tdb->transaction->blocks[blk]);
262 tdb->ecode = TDB_ERR_IO;
265 if (blk == tdb->transaction->num_blocks-1) {
266 tdb->transaction->last_block_size = len2;
271 /* overwrite part of an existing block */
/* NOTE(review): both a zero-fill and a copy from buf are visible below.
   transaction_expand_file passes buf == NULL, so presumably the zero-fill is
   the NULL-buf branch; the selecting condition is not visible here */
273 memset(tdb->transaction->blocks[blk] + off, 0, len);
275 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
277 if (blk == tdb->transaction->num_blocks-1) {
278 if (len + off > tdb->transaction->last_block_size) {
279 tdb->transaction->last_block_size = len + off;
286 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n",
287 (blk*tdb->transaction->block_size) + off, len));
288 tdb->transaction->transaction_error = 1;
294 write while in a transaction - this variant never expands the transaction blocks, it only
295 updates existing blocks. This means it cannot change the recovery size
/*
 * Update bytes that are already held in the in-memory transaction blocks.
 * Unlike transaction_write, no allocation of the blocks array or of a block
 * is visible here: the code tests whether block blk is absent, and for the
 * final block it clamps len to the valid bytes (last_block_size) before
 * copying from buf.
 * NOTE(review): return statements and the declaration of blk are not
 * visible in this listing, so the action taken for an absent block cannot be
 * confirmed from here.
 */
297 static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
298 const void *buf, tdb_len_t len)
302 /* break it up into block sized chunks */
303 while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
304 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
305 if (transaction_write_existing(tdb, off, buf, len2) != 0) {
311 buf = (const void *)(len2 + (const char *)buf);
/* from here on, off is the offset inside block number blk */
319 blk = off / tdb->transaction->block_size;
320 off = off % tdb->transaction->block_size;
322 if (tdb->transaction->num_blocks <= blk ||
323 tdb->transaction->blocks[blk] == NULL) {
/* a write reaching past the valid part of the last block is trimmed to it */
327 if (blk == tdb->transaction->num_blocks-1 &&
328 off + len > tdb->transaction->last_block_size) {
329 if (off >= tdb->transaction->last_block_size) {
332 len = tdb->transaction->last_block_size - off;
335 /* overwrite part of an existing block */
336 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
343 accelerated hash chain head search, using the cached hash heads
/*
 * Scan the mirrored hash_heads array for a non-zero entry, looking at
 * hash_heads[h+1] for h below tdb->header.hash_size.
 * NOTE(review): the initialisation of h, the loop exit and any write-back
 * through chain are not visible in this listing; presumably h starts at
 * *chain and the result is stored back there.
 */
345 static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
348 for (;h < tdb->header.hash_size;h++) {
349 /* the +1 takes account of the freelist */
350 if (0 != tdb->transaction->hash_heads[h+1]) {
358 out of bounds check during a transaction
/*
 * Bounds check used while a transaction is active: len is compared with
 * tdb->map_size, and the visible fall-through result is
 * TDB_ERRCODE(TDB_ERR_IO, -1).
 * NOTE(review): the statement executed when len <= map_size is not visible
 * (presumably a success return); probe is not used on any visible line.
 */
360 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
362 if (len <= tdb->map_size) {
365 return TDB_ERRCODE(TDB_ERR_IO, -1);
369 transaction version of tdb_expand().
/*
 * File expansion hook used while a transaction is active: instead of
 * touching the file it records the new region through transaction_write
 * with a NULL buffer, starting at offset size and covering addition bytes.
 * NOTE(review): the second line of the signature (declaring addition) and
 * the return statements are not visible in this listing.
 */
371 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
374 /* add a write to the transaction elements, so subsequent
375 reads see the zero data */
376 if (transaction_write(tdb, size, NULL, addition) != 0) {
384 brlock during a transaction - ignore them
/*
 * Byte-range lock hook used while a transaction is active.
 * NOTE(review): only the signature is visible in this listing. The
 * description line preceding it says lock requests are ignored during a
 * transaction; the body that would confirm this is not shown.
 */
386 static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset,
387 int rw_type, int lck_type, int probe, size_t len)
/*
 * Method table installed as tdb->methods by tdb_transaction_start so that
 * I/O goes through the transaction hooks.
 * NOTE(review): only two of the initialisers are visible in this listing;
 * the remaining entries and the closing brace were dropped.
 */
392 static const struct tdb_methods transaction_methods = {
395 transaction_next_hash_chain,
397 transaction_expand_file,
/*
 * Tear down the current transaction and discard its changes.
 * Visible steps: log when there is no transaction; for a nested transaction
 * mark the outer one as failed (transaction_error = 1) and decrement the
 * nesting count; otherwise restore map_size to old_map_size, free every
 * transaction block, drop locks taken during the transaction, restore the
 * original io methods, release the range lock from FREELIST_TOP and the
 * transaction lock, then free the transaction itself.
 * NOTE(review): return statements and the declaration of i are not visible
 * in this listing.
 */
401 int tdb_transaction_cancel_internal(struct tdb_context *tdb)
405 if (tdb->transaction == NULL) {
406 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
/* cancelling an inner transaction poisons the outer one rather than ending it */
410 if (tdb->transaction->nesting != 0) {
411 tdb->transaction->transaction_error = 1;
412 tdb->transaction->nesting--;
/* forget any growth of the file that happened inside the transaction */
416 tdb->map_size = tdb->transaction->old_map_size;
418 /* free all the transaction blocks */
419 for (i=0;i<tdb->transaction->num_blocks;i++) {
420 if (tdb->transaction->blocks[i] != NULL) {
421 free(tdb->transaction->blocks[i]);
424 SAFE_FREE(tdb->transaction->blocks);
426 /* remove any global lock created during the transaction */
427 if (tdb->global_lock.count != 0) {
428 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
429 tdb->global_lock.count = 0;
432 /* remove any locks created during the transaction */
433 if (tdb->num_locks != 0) {
434 for (i=0;i<tdb->num_lockrecs;i++) {
435 tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
436 F_UNLCK,F_SETLKW, 0, 1);
439 tdb->num_lockrecs = 0;
440 SAFE_FREE(tdb->lockrecs);
443 /* restore the normal io methods */
444 tdb->methods = tdb->transaction->io_methods;
/* release the FREELIST_TOP range lock and the transaction lock taken in tdb_transaction_start */
446 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
447 tdb_transaction_unlock(tdb);
448 SAFE_FREE(tdb->transaction->hash_heads);
449 SAFE_FREE(tdb->transaction);
455 start a tdb transaction. No token is returned, as only a single
456 transaction is allowed to be pending per tdb_context
/*
 * Begin a transaction on tdb.
 * Visible steps: reject read-only, internal and traverse_read contexts; when
 * a transaction already exists either nest inside it or, with
 * TDB_NO_NESTING, cancel it and start afresh; refuse to start while locks
 * or a traverse are held; allocate the transaction state with block_size set
 * to the page size; take the transaction write lock and a read lock from
 * FREELIST_TOP; snapshot the hash heads; record old_map_size; finally swap
 * tdb->methods for transaction_methods.
 * The trailing lines are the cleanup that releases both locks and frees the
 * partially built state.
 * NOTE(review): return statements, gotos, the else line and the fail label
 * are not visible in this listing and are deliberately not re-invented here.
 */
458 int tdb_transaction_start(struct tdb_context *tdb)
460 /* some sanity checks */
461 if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
462 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
463 tdb->ecode = TDB_ERR_EINVAL;
467 /* cope with nested tdb_transaction_start() calls */
468 if (tdb->transaction != NULL) {
469 tdb_trace(tdb, "tdb_transaction_start");
/* FIX: unary ! binds tighter than &, so the old test "!tdb->flags & TDB_NO_NESTING"
   negated the whole flags word before masking and did not test the
   TDB_NO_NESTING bit at all. Parenthesise the mask so that nesting is used
   exactly when TDB_NO_NESTING is clear. */
470 if (!(tdb->flags & TDB_NO_NESTING)) {
471 tdb->transaction->nesting++;
472 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
473 tdb->transaction->nesting));
476 tdb_transaction_cancel_internal(tdb);
477 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: cancelling previous transaction\n"));
481 if (tdb->num_locks != 0 || tdb->global_lock.count) {
482 /* the caller must not have any locks when starting a
483 transaction as otherwise we'll be screwed by lack
484 of nested locks in posix */
485 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
486 tdb->ecode = TDB_ERR_LOCK;
490 if (tdb->travlocks.next != NULL) {
491 /* you cannot use transactions inside a traverse (although you can use
492 traverse inside a transaction) as otherwise you can end up with
494 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
495 tdb->ecode = TDB_ERR_LOCK;
499 tdb->transaction = (struct tdb_transaction *)
500 calloc(sizeof(struct tdb_transaction), 1);
501 if (tdb->transaction == NULL) {
502 tdb->ecode = TDB_ERR_OOM;
506 /* a page at a time seems like a reasonable compromise between compactness and efficiency */
507 tdb->transaction->block_size = tdb->page_size;
509 /* get the transaction write lock. This is a blocking lock. As
510 discussed with Volker, there are a number of ways we could
511 make this async, which we will probably do in the future */
512 if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
513 SAFE_FREE(tdb->transaction->blocks);
514 SAFE_FREE(tdb->transaction);
518 /* get a read lock from the freelist to the end of file. This
519 is upgraded to a write lock during the commit */
520 if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
521 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
522 tdb->ecode = TDB_ERR_LOCK;
526 /* setup a copy of the hash table heads so the hash scan in
527 traverse can be fast */
528 tdb->transaction->hash_heads = (uint32_t *)
529 calloc(tdb->header.hash_size+1, sizeof(uint32_t));
530 if (tdb->transaction->hash_heads == NULL) {
531 tdb->ecode = TDB_ERR_OOM;
534 if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
535 TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
536 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
537 tdb->ecode = TDB_ERR_IO;
541 /* make sure we know about any file expansions already done by
543 tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
544 tdb->transaction->old_map_size = tdb->map_size;
546 /* finally hook the io methods, replacing them with
547 transaction specific methods */
548 tdb->transaction->io_methods = tdb->methods;
549 tdb->methods = &transaction_methods;
551 /* Trace at the end, so we get sequence number correct. */
552 tdb_trace(tdb, "tdb_transaction_start");
556 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
557 tdb_transaction_unlock(tdb);
558 SAFE_FREE(tdb->transaction->blocks);
559 SAFE_FREE(tdb->transaction->hash_heads);
560 SAFE_FREE(tdb->transaction);
566 cancel the current transaction
/*
 * Public cancel entry point: emits a trace record and then delegates to
 * tdb_transaction_cancel_internal, returning its result.
 */
568 int tdb_transaction_cancel(struct tdb_context *tdb)
570 tdb_trace(tdb, "tdb_transaction_cancel");
571 return tdb_transaction_cancel_internal(tdb);
/*
 * Flush transaction data to stable storage: fsync on the file descriptor,
 * then msync with MS_SYNC on the mapped range that covers
 * [offset, offset + length). Either failure sets tdb->ecode to TDB_ERR_IO
 * and logs.
 * NOTE(review): return statements and whatever guards the msync part (for
 * example a check that the file is mapped) are not visible in this listing.
 */
576 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
578 if (fsync(tdb->fd) != 0) {
579 tdb->ecode = TDB_ERR_IO;
580 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
/* round the start down to a page boundary; assumes page_size is a power of two - TODO confirm */
585 tdb_off_t moffset = offset & ~(tdb->page_size-1);
586 if (msync(moffset + (char *)tdb->map_ptr,
/* the length is extended by the amount the start was moved back */
587 length + (offset - moffset), MS_SYNC) != 0) {
588 tdb->ecode = TDB_ERR_IO;
589 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
600 work out how much space the linearised recovery data will consume
/*
 * Compute how many bytes the linearised recovery data needs and return it.
 * The total starts at sizeof(uint32_t). Each counted block adds two
 * tdb_off_t values plus the block payload, which is last_block_size for the
 * final block and block_size otherwise.
 * NOTE(review): the statements taken for blocks starting at or beyond
 * old_map_size and for NULL blocks are not visible (presumably break and
 * continue respectively), nor is the declaration of i.
 */
602 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
604 tdb_len_t recovery_size = 0;
607 recovery_size = sizeof(uint32_t);
608 for (i=0;i<tdb->transaction->num_blocks;i++) {
/* block i starts at or past the pre-transaction end of file */
609 if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
612 if (tdb->transaction->blocks[i] == NULL) {
615 recovery_size += 2*sizeof(tdb_off_t);
616 if (i == tdb->transaction->num_blocks-1) {
617 recovery_size += tdb->transaction->last_block_size;
619 recovery_size += tdb->transaction->block_size;
623 return recovery_size;
627 allocate the recovery area, or use an existing recovery area if it is
/*
 * Find or create the on-disk recovery area for the current transaction.
 * Outputs: *recovery_size (bytes needed, from tdb_recovery_size),
 * *recovery_offset (file offset of the area) and *recovery_max_size
 * (capacity of the area).
 * Visible steps: read the recovery head pointer; if an area exists, read its
 * record header and reuse it when the needed size fits in rec.rec_len;
 * otherwise free the old area, recompute the size, round the new capacity up
 * to a page multiple, place the area at the current map_size, grow the file
 * through the original methods, reset old_map_size, and store the new head
 * pointer both in the file and in the transaction blocks.
 * NOTE(review): return statements are not visible in this listing.
 */
630 static int tdb_recovery_allocate(struct tdb_context *tdb,
631 tdb_len_t *recovery_size,
632 tdb_off_t *recovery_offset,
633 tdb_len_t *recovery_max_size)
635 struct list_struct rec;
/* the original (non-transaction) io methods, so these accesses hit the real file */
636 const struct tdb_methods *methods = tdb->transaction->io_methods;
637 tdb_off_t recovery_head;
639 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
640 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
646 if (recovery_head != 0 &&
647 methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
648 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
652 *recovery_size = tdb_recovery_size(tdb);
654 if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
655 /* it fits in the existing area */
656 *recovery_max_size = rec.rec_len;
657 *recovery_offset = recovery_head;
661 /* we need to free up the old recovery area, then allocate a
662 new one at the end of the file. Note that we cannot use
663 tdb_allocate() to allocate the new one as that might return
664 us an area that is being currently used (as of the start of
666 if (recovery_head != 0) {
667 if (tdb_free(tdb, recovery_head, &rec) == -1) {
668 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
673 /* the tdb_free() call might have increased the recovery size */
674 *recovery_size = tdb_recovery_size(tdb);
676 /* round up to a multiple of page size */
677 *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
/* the new area starts at the current (in-transaction) end of the map */
678 *recovery_offset = tdb->map_size;
679 recovery_head = *recovery_offset;
/* grow the real file from its pre-transaction size by the in-transaction growth plus header and area */
681 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
682 (tdb->map_size - tdb->transaction->old_map_size) +
683 sizeof(rec) + *recovery_max_size) == -1) {
684 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
688 /* remap the file (if using mmap) */
689 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
691 /* we have to reset the old map size so that we don't try to expand the file
692 again in the transaction commit, which would destroy the recovery area */
693 tdb->transaction->old_map_size = tdb->map_size;
695 /* write the recovery header offset and sync - we can sync without a race here
696 as the magic ptr in the recovery record has not been set */
697 CONVERT(recovery_head);
698 if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
699 &recovery_head, sizeof(tdb_off_t)) == -1) {
700 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
/* mirror the same value into any transaction block that already covers the header */
703 if (transaction_write_existing(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) {
704 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
713 setup the recovery data that will be used on a crash during commit
/*
 * Write the crash-recovery record for the pending commit and report where
 * its magic field lives through *magic_offset.
 * Visible steps: obtain a recovery area (tdb_recovery_allocate); build one
 * buffer holding a list_struct header followed, for each transaction block
 * inside the old file, by offset, length and the OLD file contents read via
 * the original tdb_read, then a tailer; write the buffer to the recovery
 * area and mirror it into the transaction blocks; sync; write
 * TDB_RECOVERY_MAGIC at *magic_offset (file and transaction blocks); sync
 * again.
 * NOTE(review): return statements, free() calls, pointer advances of p, the
 * NULL check after malloc and the declarations of i, offset and length are
 * not visible in this listing.
 */
715 static int transaction_setup_recovery(struct tdb_context *tdb,
716 tdb_off_t *magic_offset)
718 tdb_len_t recovery_size;
719 unsigned char *data, *p;
720 const struct tdb_methods *methods = tdb->transaction->io_methods;
721 struct list_struct *rec;
722 tdb_off_t recovery_offset, recovery_max_size;
/* captured before tdb_recovery_allocate, which may reset transaction->old_map_size */
723 tdb_off_t old_map_size = tdb->transaction->old_map_size;
724 uint32_t magic, tailer;
728 check that the recovery area has enough space
730 if (tdb_recovery_allocate(tdb, &recovery_size,
731 &recovery_offset, &recovery_max_size) == -1) {
735 data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
737 tdb->ecode = TDB_ERR_OOM;
741 rec = (struct list_struct *)data;
742 memset(rec, 0, sizeof(*rec));
/* header fields as used here: data_len = payload bytes, rec_len = capacity
   of the area, key_len = file size before the transaction */
745 rec->data_len = recovery_size;
746 rec->rec_len = recovery_max_size;
747 rec->key_len = old_map_size;
750 /* build the recovery data into a single blob to allow us to do a single
751 large write, which should be more efficient */
752 p = data + sizeof(*rec);
753 for (i=0;i<tdb->transaction->num_blocks;i++) {
757 if (tdb->transaction->blocks[i] == NULL) {
761 offset = i * tdb->transaction->block_size;
762 length = tdb->transaction->block_size;
763 if (i == tdb->transaction->num_blocks-1) {
764 length = tdb->transaction->last_block_size;
767 if (offset >= old_map_size) {
770 if (offset + length > tdb->transaction->old_map_size) {
771 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
773 tdb->ecode = TDB_ERR_CORRUPT;
/* each saved chunk is a 4-byte offset, a 4-byte length, then the data at p + 8 */
776 memcpy(p, &offset, 4);
777 memcpy(p+4, &length, 4);
781 /* the recovery area contains the old data, not the
782 new data, so we have to call the original tdb_read
784 if (methods->tdb_read(tdb, offset, p + 8, length, 0) != 0) {
786 tdb->ecode = TDB_ERR_IO;
793 tailer = sizeof(*rec) + recovery_max_size;
794 memcpy(p, &tailer, 4);
797 /* write the recovery data to the recovery area */
798 if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
799 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
801 tdb->ecode = TDB_ERR_IO;
804 if (transaction_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
805 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n"));
807 tdb->ecode = TDB_ERR_IO;
811 /* as we don't have ordered writes, we have to sync the recovery
812 data before we update the magic to indicate that the recovery
814 if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
821 magic = TDB_RECOVERY_MAGIC;
824 *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
826 if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
827 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
828 tdb->ecode = TDB_ERR_IO;
831 if (transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
832 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n"));
833 tdb->ecode = TDB_ERR_IO;
837 /* ensure the recovery magic marker is on disk */
838 if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
846 commit the current transaction
/*
 * Commit the current transaction to the file.
 * Visible steps: log when there is no transaction; if an error is pending,
 * set TDB_ERR_IO and cancel; for a nested transaction just decrement the
 * nesting count; treat a transaction with no blocks as a null transaction
 * and cancel it; fail if the caller still holds locks; upgrade the
 * FREELIST_TOP range lock and take GLOBAL_LOCK; unless TDB_NOSYNC is set,
 * write the recovery record; grow the file if the transaction expanded it;
 * write every block through the original methods, running
 * tdb_transaction_recover if a write fails; unless TDB_NOSYNC is set, sync,
 * clear the recovery magic and sync again; release GLOBAL_LOCK; touch the
 * file mtime with utime; finally reuse tdb_transaction_cancel_internal to
 * free memory and drop the transaction locks.
 * NOTE(review): return statements and the declarations of i, offset, length
 * and zero are not visible in this listing.
 */
848 int tdb_transaction_commit(struct tdb_context *tdb)
850 const struct tdb_methods *methods;
851 tdb_off_t magic_offset = 0;
855 tdb_trace(tdb, "tdb_transaction_commit");
856 if (tdb->transaction == NULL) {
857 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
861 if (tdb->transaction->transaction_error) {
862 tdb->ecode = TDB_ERR_IO;
863 tdb_transaction_cancel_internal(tdb);
864 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
/* an inner commit only unwinds one nesting level */
869 if (tdb->transaction->nesting != 0) {
870 tdb->transaction->nesting--;
874 /* check for a null transaction */
875 if (tdb->transaction->blocks == NULL) {
876 tdb_transaction_cancel_internal(tdb);
/* the original io methods: everything below writes to the real file */
880 methods = tdb->transaction->io_methods;
882 /* if there are any locks pending then the caller has not
883 nested their locks properly, so fail the transaction */
884 if (tdb->num_locks || tdb->global_lock.count) {
885 tdb->ecode = TDB_ERR_LOCK;
886 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
887 tdb_transaction_cancel_internal(tdb);
891 /* upgrade the main transaction lock region to a write lock */
892 if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
/* NOTE(review): the message below names tdb_transaction_start although it is
   emitted from tdb_transaction_commit; left unchanged because it is a
   runtime string */
893 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
894 tdb->ecode = TDB_ERR_LOCK;
895 tdb_transaction_cancel_internal(tdb);
899 /* get the global lock - this prevents new users attaching to the database
901 if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
902 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
903 tdb->ecode = TDB_ERR_LOCK;
904 tdb_transaction_cancel_internal(tdb);
908 if (!(tdb->flags & TDB_NOSYNC)) {
909 /* write the recovery data to the end of the file */
910 if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
911 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
912 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
913 tdb_transaction_cancel_internal(tdb);
918 /* expand the file to the new size if needed */
919 if (tdb->map_size != tdb->transaction->old_map_size) {
920 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
922 tdb->transaction->old_map_size) == -1) {
923 tdb->ecode = TDB_ERR_IO;
924 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
925 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
926 tdb_transaction_cancel_internal(tdb);
929 tdb->map_size = tdb->transaction->old_map_size;
930 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
933 /* perform all the writes */
934 for (i=0;i<tdb->transaction->num_blocks;i++) {
938 if (tdb->transaction->blocks[i] == NULL) {
942 offset = i * tdb->transaction->block_size;
943 length = tdb->transaction->block_size;
/* only the valid bytes of the final block are written */
944 if (i == tdb->transaction->num_blocks-1) {
945 length = tdb->transaction->last_block_size;
948 if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
949 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
951 /* we've overwritten part of the data and
952 possibly expanded the file, so we need to
953 run the crash recovery code */
/* recovery must use the real io methods, so they are restored first */
954 tdb->methods = methods;
955 tdb_transaction_recover(tdb);
957 tdb_transaction_cancel_internal(tdb);
958 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
960 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
963 SAFE_FREE(tdb->transaction->blocks[i]);
966 SAFE_FREE(tdb->transaction->blocks);
967 tdb->transaction->num_blocks = 0;
969 if (!(tdb->flags & TDB_NOSYNC)) {
970 /* ensure the new data is on disk */
971 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
975 /* remove the recovery marker */
976 if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
977 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
981 /* ensure the recovery marker has been removed on disk */
982 if (transaction_sync(tdb, magic_offset, 4) == -1) {
987 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
990 TODO: maybe write to some dummy hdr field, or write to magic
991 offset without mmap, before the last sync, instead of the
995 /* on some systems (like Linux 2.6.x) changes via mmap/msync
996 don't change the mtime of the file, this means the file may
997 not be backed up (as tdb rounding to block sizes means that
998 file size changes are quite rare too). The following forces
999 mtime changes when a transaction completes */
1001 utime(tdb->name, NULL);
1004 /* use a transaction cancel to free memory and remove the
1005 transaction locks */
1006 tdb_transaction_cancel_internal(tdb);
1013 recover from an aborted transaction. Must be called with exclusive
1014 database write access already established (including the global
1015 lock to prevent new processes attaching)
1017 int tdb_transaction_recover(struct tdb_context *tdb)
1019 tdb_off_t recovery_head, recovery_eof;
1020 unsigned char *data, *p;
1022 struct list_struct rec;
1024 /* find the recovery area */
1025 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1026 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
1027 tdb->ecode = TDB_ERR_IO;
1031 if (recovery_head == 0) {
1032 /* we have never allocated a recovery record */
1036 /* read the recovery record */
1037 if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
1038 sizeof(rec), DOCONV()) == -1) {
1039 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
1040 tdb->ecode = TDB_ERR_IO;
1044 if (rec.magic != TDB_RECOVERY_MAGIC) {
1045 /* there is no valid recovery data */
1049 if (tdb->read_only) {
1050 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
1051 tdb->ecode = TDB_ERR_CORRUPT;
1055 recovery_eof = rec.key_len;
1057 data = (unsigned char *)malloc(rec.data_len);
1059 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
1060 tdb->ecode = TDB_ERR_OOM;
1064 /* read the full recovery data */
1065 if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
1066 rec.data_len, 0) == -1) {
1067 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
1068 tdb->ecode = TDB_ERR_IO;
1072 /* recover the file data */
1074 while (p+8 < data + rec.data_len) {
1080 memcpy(&len, p+4, 4);
1082 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
1084 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
1085 tdb->ecode = TDB_ERR_IO;
1093 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1094 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1095 tdb->ecode = TDB_ERR_IO;
1099 /* if the recovery area is after the recovered eof then remove it */
1100 if (recovery_eof <= recovery_head) {
1101 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1102 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1103 tdb->ecode = TDB_ERR_IO;
1108 /* remove the recovery magic */
1109 if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic),
1111 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1112 tdb->ecode = TDB_ERR_IO;
1116 /* reduce the file size to the old size */
1118 if (ftruncate(tdb->fd, recovery_eof) != 0) {
1119 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1120 tdb->ecode = TDB_ERR_IO;
1123 tdb->map_size = recovery_eof;
1126 if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1127 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1128 tdb->ecode = TDB_ERR_IO;
1132 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n",