]> git.ozlabs.org Git - ccan/blobdiff - ccan/tdb2/tdb1_transaction.c
tdb2: make tests work in parallel.
[ccan] / ccan / tdb2 / tdb1_transaction.c
index cf77a35996d5543df258d0237e4682c0ac5f2d31..157642a6322bacb1314624a78901a6e27a9d9e6e 100644 (file)
@@ -138,14 +138,14 @@ struct tdb1_transaction {
   read while in a transaction. We need to check first if the data is in our list
   of transaction elements, then if not do a real read
 */
-static int transaction1_read(struct tdb1_context *tdb, tdb1_off_t off, void *buf,
+static int transaction1_read(struct tdb_context *tdb, tdb1_off_t off, void *buf,
                             tdb1_len_t len, int cv)
 {
        uint32_t blk;
 
        /* break it down into block sized ops */
-       while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
-               tdb1_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
+       while (len + (off % tdb->tdb1.transaction->block_size) > tdb->tdb1.transaction->block_size) {
+               tdb1_len_t len2 = tdb->tdb1.transaction->block_size - (off % tdb->tdb1.transaction->block_size);
                if (transaction1_read(tdb, off, buf, len2, cv) != 0) {
                        return -1;
                }
@@ -158,27 +158,27 @@ static int transaction1_read(struct tdb1_context *tdb, tdb1_off_t off, void *buf
                return 0;
        }
 
-       blk = off / tdb->transaction->block_size;
+       blk = off / tdb->tdb1.transaction->block_size;
 
        /* see if we have it in the block list */
-       if (tdb->transaction->num_blocks <= blk ||
-           tdb->transaction->blocks[blk] == NULL) {
+       if (tdb->tdb1.transaction->num_blocks <= blk ||
+           tdb->tdb1.transaction->blocks[blk] == NULL) {
                /* nope, do a real read */
-               if (tdb->transaction->io_methods->tdb1_read(tdb, off, buf, len, cv) != 0) {
+               if (tdb->tdb1.transaction->io_methods->tdb1_read(tdb, off, buf, len, cv) != 0) {
                        goto fail;
                }
                return 0;
        }
 
        /* it is in the block list. Now check for the last block */
-       if (blk == tdb->transaction->num_blocks-1) {
-               if (len > tdb->transaction->last_block_size) {
+       if (blk == tdb->tdb1.transaction->num_blocks-1) {
+               if (len > tdb->tdb1.transaction->last_block_size) {
                        goto fail;
                }
        }
 
        /* now copy it out of this block */
-       memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
+       memcpy(buf, tdb->tdb1.transaction->blocks[blk] + (off % tdb->tdb1.transaction->block_size), len);
        if (cv) {
                tdb1_convert(buf, len);
        }
@@ -188,7 +188,7 @@ fail:
        tdb->last_error = tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
                                "transaction_read: failed at off=%d len=%d",
                                off, len);
-       tdb->transaction->transaction_error = 1;
+       tdb->tdb1.transaction->transaction_error = 1;
        return -1;
 }
 
@@ -196,17 +196,17 @@ fail:
 /*
   write while in a transaction
 */
-static int transaction1_write(struct tdb1_context *tdb, tdb1_off_t off,
+static int transaction1_write(struct tdb_context *tdb, tdb1_off_t off,
                             const void *buf, tdb1_len_t len)
 {
        uint32_t blk;
 
        /* Only a commit is allowed on a prepared transaction */
-       if (tdb->transaction->prepared) {
+       if (tdb->tdb1.transaction->prepared) {
                tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
                                        "transaction_write: transaction already"
                                        " prepared, write not allowed");
-               tdb->transaction->transaction_error = 1;
+               tdb->tdb1.transaction->transaction_error = 1;
                return -1;
        }
 
@@ -215,12 +215,12 @@ static int transaction1_write(struct tdb1_context *tdb, tdb1_off_t off,
        if (len == sizeof(tdb1_off_t) && off >= TDB1_FREELIST_TOP &&
            off < TDB1_FREELIST_TOP+TDB1_HASHTABLE_SIZE(tdb)) {
                uint32_t chain = (off-TDB1_FREELIST_TOP) / sizeof(tdb1_off_t);
-               memcpy(&tdb->transaction->hash_heads[chain], buf, len);
+               memcpy(&tdb->tdb1.transaction->hash_heads[chain], buf, len);
        }
 
        /* break it up into block sized chunks */
-       while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
-               tdb1_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
+       while (len + (off % tdb->tdb1.transaction->block_size) > tdb->tdb1.transaction->block_size) {
+               tdb1_len_t len2 = tdb->tdb1.transaction->block_size - (off % tdb->tdb1.transaction->block_size);
                if (transaction1_write(tdb, off, buf, len2) != 0) {
                        return -1;
                }
@@ -235,66 +235,66 @@ static int transaction1_write(struct tdb1_context *tdb, tdb1_off_t off,
                return 0;
        }
 
-       blk = off / tdb->transaction->block_size;
-       off = off % tdb->transaction->block_size;
+       blk = off / tdb->tdb1.transaction->block_size;
+       off = off % tdb->tdb1.transaction->block_size;
 
-       if (tdb->transaction->num_blocks <= blk) {
+       if (tdb->tdb1.transaction->num_blocks <= blk) {
                uint8_t **new_blocks;
                /* expand the blocks array */
-               if (tdb->transaction->blocks == NULL) {
+               if (tdb->tdb1.transaction->blocks == NULL) {
                        new_blocks = (uint8_t **)malloc(
                                (blk+1)*sizeof(uint8_t *));
                } else {
                        new_blocks = (uint8_t **)realloc(
-                               tdb->transaction->blocks,
+                               tdb->tdb1.transaction->blocks,
                                (blk+1)*sizeof(uint8_t *));
                }
                if (new_blocks == NULL) {
                        tdb->last_error = TDB_ERR_OOM;
                        goto fail;
                }
-               memset(&new_blocks[tdb->transaction->num_blocks], 0,
-                      (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
-               tdb->transaction->blocks = new_blocks;
-               tdb->transaction->num_blocks = blk+1;
-               tdb->transaction->last_block_size = 0;
+               memset(&new_blocks[tdb->tdb1.transaction->num_blocks], 0,
+                      (1+(blk - tdb->tdb1.transaction->num_blocks))*sizeof(uint8_t *));
+               tdb->tdb1.transaction->blocks = new_blocks;
+               tdb->tdb1.transaction->num_blocks = blk+1;
+               tdb->tdb1.transaction->last_block_size = 0;
        }
 
        /* allocate and fill a block? */
-       if (tdb->transaction->blocks[blk] == NULL) {
-               tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
-               if (tdb->transaction->blocks[blk] == NULL) {
+       if (tdb->tdb1.transaction->blocks[blk] == NULL) {
+               tdb->tdb1.transaction->blocks[blk] = (uint8_t *)calloc(tdb->tdb1.transaction->block_size, 1);
+               if (tdb->tdb1.transaction->blocks[blk] == NULL) {
                        tdb->last_error = TDB_ERR_OOM;
-                       tdb->transaction->transaction_error = 1;
+                       tdb->tdb1.transaction->transaction_error = 1;
                        return -1;
                }
-               if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
-                       tdb1_len_t len2 = tdb->transaction->block_size;
-                       if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
-                               len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
+               if (tdb->tdb1.transaction->old_map_size > blk * tdb->tdb1.transaction->block_size) {
+                       tdb1_len_t len2 = tdb->tdb1.transaction->block_size;
+                       if (len2 + (blk * tdb->tdb1.transaction->block_size) > tdb->tdb1.transaction->old_map_size) {
+                               len2 = tdb->tdb1.transaction->old_map_size - (blk * tdb->tdb1.transaction->block_size);
                        }
-                       if (tdb->transaction->io_methods->tdb1_read(tdb, blk * tdb->transaction->block_size,
-                                                                  tdb->transaction->blocks[blk],
+                       if (tdb->tdb1.transaction->io_methods->tdb1_read(tdb, blk * tdb->tdb1.transaction->block_size,
+                                                                  tdb->tdb1.transaction->blocks[blk],
                                                                   len2, 0) != 0) {
-                               SAFE_FREE(tdb->transaction->blocks[blk]);
+                               SAFE_FREE(tdb->tdb1.transaction->blocks[blk]);
                                tdb->last_error = TDB_ERR_IO;
                                goto fail;
                        }
-                       if (blk == tdb->transaction->num_blocks-1) {
-                               tdb->transaction->last_block_size = len2;
+                       if (blk == tdb->tdb1.transaction->num_blocks-1) {
+                               tdb->tdb1.transaction->last_block_size = len2;
                        }
                }
        }
 
        /* overwrite part of an existing block */
        if (buf == NULL) {
-               memset(tdb->transaction->blocks[blk] + off, 0, len);
+               memset(tdb->tdb1.transaction->blocks[blk] + off, 0, len);
        } else {
-               memcpy(tdb->transaction->blocks[blk] + off, buf, len);
+               memcpy(tdb->tdb1.transaction->blocks[blk] + off, buf, len);
        }
-       if (blk == tdb->transaction->num_blocks-1) {
-               if (len + off > tdb->transaction->last_block_size) {
-                       tdb->transaction->last_block_size = len + off;
+       if (blk == tdb->tdb1.transaction->num_blocks-1) {
+               if (len + off > tdb->tdb1.transaction->last_block_size) {
+                       tdb->tdb1.transaction->last_block_size = len + off;
                }
        }
 
@@ -303,8 +303,8 @@ static int transaction1_write(struct tdb1_context *tdb, tdb1_off_t off,
 fail:
        tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
                   "transaction_write: failed at off=%d len=%d",
-                  (blk*tdb->transaction->block_size) + off, len);
-       tdb->transaction->transaction_error = 1;
+                  (blk*tdb->tdb1.transaction->block_size) + off, len);
+       tdb->tdb1.transaction->transaction_error = 1;
        return -1;
 }
 
@@ -313,14 +313,14 @@ fail:
   write while in a transaction - this varient never expands the transaction blocks, it only
   updates existing blocks. This means it cannot change the recovery size
 */
-static int transaction1_write_existing(struct tdb1_context *tdb, tdb1_off_t off,
+static int transaction1_write_existing(struct tdb_context *tdb, tdb1_off_t off,
                                      const void *buf, tdb1_len_t len)
 {
        uint32_t blk;
 
        /* break it up into block sized chunks */
-       while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
-               tdb1_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
+       while (len + (off % tdb->tdb1.transaction->block_size) > tdb->tdb1.transaction->block_size) {
+               tdb1_len_t len2 = tdb->tdb1.transaction->block_size - (off % tdb->tdb1.transaction->block_size);
                if (transaction1_write_existing(tdb, off, buf, len2) != 0) {
                        return -1;
                }
@@ -335,24 +335,24 @@ static int transaction1_write_existing(struct tdb1_context *tdb, tdb1_off_t off,
                return 0;
        }
 
-       blk = off / tdb->transaction->block_size;
-       off = off % tdb->transaction->block_size;
+       blk = off / tdb->tdb1.transaction->block_size;
+       off = off % tdb->tdb1.transaction->block_size;
 
-       if (tdb->transaction->num_blocks <= blk ||
-           tdb->transaction->blocks[blk] == NULL) {
+       if (tdb->tdb1.transaction->num_blocks <= blk ||
+           tdb->tdb1.transaction->blocks[blk] == NULL) {
                return 0;
        }
 
-       if (blk == tdb->transaction->num_blocks-1 &&
-           off + len > tdb->transaction->last_block_size) {
-               if (off >= tdb->transaction->last_block_size) {
+       if (blk == tdb->tdb1.transaction->num_blocks-1 &&
+           off + len > tdb->tdb1.transaction->last_block_size) {
+               if (off >= tdb->tdb1.transaction->last_block_size) {
                        return 0;
                }
-               len = tdb->transaction->last_block_size - off;
+               len = tdb->tdb1.transaction->last_block_size - off;
        }
 
        /* overwrite part of an existing block */
-       memcpy(tdb->transaction->blocks[blk] + off, buf, len);
+       memcpy(tdb->tdb1.transaction->blocks[blk] + off, buf, len);
 
        return 0;
 }
@@ -361,12 +361,12 @@ static int transaction1_write_existing(struct tdb1_context *tdb, tdb1_off_t off,
 /*
   accelerated hash chain head search, using the cached hash heads
 */
-static void transaction1_next_hash_chain(struct tdb1_context *tdb, uint32_t *chain)
+static void transaction1_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
 {
        uint32_t h = *chain;
-       for (;h < tdb->header.hash_size;h++) {
+       for (;h < tdb->tdb1.header.hash_size;h++) {
                /* the +1 takes account of the freelist */
-               if (0 != tdb->transaction->hash_heads[h+1]) {
+               if (0 != tdb->tdb1.transaction->hash_heads[h+1]) {
                        break;
                }
        }
@@ -376,7 +376,7 @@ static void transaction1_next_hash_chain(struct tdb1_context *tdb, uint32_t *cha
 /*
   out of bounds check during a transaction
 */
-static int transaction1_oob(struct tdb1_context *tdb, tdb1_off_t len, int probe)
+static int transaction1_oob(struct tdb_context *tdb, tdb1_off_t len, int probe)
 {
        if (len <= tdb->file->map_size) {
                return 0;
@@ -388,7 +388,7 @@ static int transaction1_oob(struct tdb1_context *tdb, tdb1_off_t len, int probe)
 /*
   transaction version of tdb1_expand().
 */
-static int transaction1_expand_file(struct tdb1_context *tdb, tdb1_off_t size,
+static int transaction1_expand_file(struct tdb_context *tdb, tdb1_off_t size,
                                    tdb1_off_t addition)
 {
        /* add a write to the transaction elements, so subsequent
@@ -397,7 +397,7 @@ static int transaction1_expand_file(struct tdb1_context *tdb, tdb1_off_t size,
                return -1;
        }
 
-       tdb->transaction->expanded = true;
+       tdb->tdb1.transaction->expanded = true;
 
        return 0;
 }
@@ -413,26 +413,43 @@ static const struct tdb1_methods transaction1_methods = {
 
 /*
   start a tdb transaction. No token is returned, as only a single
-  transaction is allowed to be pending per tdb1_context
+  transaction is allowed to be pending per tdb_context
 */
-static int _tdb1_transaction_start(struct tdb1_context *tdb)
+static int _tdb1_transaction_start(struct tdb_context *tdb)
 {
        /* some sanity checks */
-       if ((tdb->flags & TDB_RDONLY) || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
-               tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
-                                       "tdb1_transaction_start: cannot start a"
-                                       " transaction on a read-only or"
-                                       " internal db");
+       if (tdb->flags & TDB_INTERNAL) {
+               tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL,
+                                            TDB_LOG_USE_ERROR,
+                                            "tdb1_transaction_start:"
+                                            " cannot start a"
+                                            " transaction on an"
+                                            " internal tdb");
+               return -1;
+       }
+
+       if ((tdb->flags & TDB_RDONLY) || tdb->tdb1.traverse_read) {
+               tdb->last_error = tdb_logerr(tdb, TDB_ERR_RDONLY,
+                                            TDB_LOG_USE_ERROR,
+                                            "tdb_transaction_start:"
+                                            " cannot start a"
+                                            " transaction on a "
+                                            " read-only tdb");
                return -1;
        }
 
        /* cope with nested tdb1_transaction_start() calls */
-       if (tdb->transaction != NULL) {
+       if (tdb->tdb1.transaction != NULL) {
                if (!(tdb->flags & TDB_ALLOW_NESTING)) {
-                       tdb->last_error = TDB_ERR_EINVAL;
+                       tdb->last_error
+                               = tdb_logerr(tdb, TDB_ERR_EINVAL,
+                                            TDB_LOG_USE_ERROR,
+                                            "tdb_transaction_start:"
+                                            " already inside transaction");
                        return -1;
                }
-               tdb->transaction->nesting++;
+               tdb->stats.transaction_nest++;
+               tdb->tdb1.transaction->nesting++;
                return 0;
        }
 
@@ -446,7 +463,7 @@ static int _tdb1_transaction_start(struct tdb1_context *tdb)
                return -1;
        }
 
-       if (tdb->travlocks.next != NULL) {
+       if (tdb->tdb1.travlocks.next != NULL) {
                /* you cannot use transactions inside a traverse (although you can use
                   traverse inside a transaction) as otherwise you can end up with
                   deadlock */
@@ -456,22 +473,22 @@ static int _tdb1_transaction_start(struct tdb1_context *tdb)
                return -1;
        }
 
-       tdb->transaction = (struct tdb1_transaction *)
+       tdb->tdb1.transaction = (struct tdb1_transaction *)
                calloc(sizeof(struct tdb1_transaction), 1);
-       if (tdb->transaction == NULL) {
+       if (tdb->tdb1.transaction == NULL) {
                tdb->last_error = TDB_ERR_OOM;
                return -1;
        }
 
        /* a page at a time seems like a reasonable compromise between compactness and efficiency */
-       tdb->transaction->block_size = tdb->page_size;
+       tdb->tdb1.transaction->block_size = tdb->tdb1.page_size;
 
        /* get the transaction write lock. This is a blocking lock. As
           discussed with Volker, there are a number of ways we could
           make this async, which we will probably do in the future */
        if (tdb1_transaction_lock(tdb, F_WRLCK, TDB_LOCK_WAIT) == -1) {
-               SAFE_FREE(tdb->transaction->blocks);
-               SAFE_FREE(tdb->transaction);
+               SAFE_FREE(tdb->tdb1.transaction->blocks);
+               SAFE_FREE(tdb->tdb1.transaction);
                return -1;
        }
 
@@ -488,13 +505,13 @@ static int _tdb1_transaction_start(struct tdb1_context *tdb)
 
        /* setup a copy of the hash table heads so the hash scan in
           traverse can be fast */
-       tdb->transaction->hash_heads = (uint32_t *)
-               calloc(tdb->header.hash_size+1, sizeof(uint32_t));
-       if (tdb->transaction->hash_heads == NULL) {
+       tdb->tdb1.transaction->hash_heads = (uint32_t *)
+               calloc(tdb->tdb1.header.hash_size+1, sizeof(uint32_t));
+       if (tdb->tdb1.transaction->hash_heads == NULL) {
                tdb->last_error = TDB_ERR_OOM;
                goto fail;
        }
-       if (tdb->methods->tdb1_read(tdb, TDB1_FREELIST_TOP, tdb->transaction->hash_heads,
+       if (tdb->tdb1.io->tdb1_read(tdb, TDB1_FREELIST_TOP, tdb->tdb1.transaction->hash_heads,
                                   TDB1_HASHTABLE_SIZE(tdb), 0) != 0) {
                tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
                           "tdb1_transaction_start: failed to read hash heads");
@@ -503,27 +520,28 @@ static int _tdb1_transaction_start(struct tdb1_context *tdb)
 
        /* make sure we know about any file expansions already done by
           anyone else */
-       tdb->methods->tdb1_oob(tdb, tdb->file->map_size + 1, 1);
-       tdb->transaction->old_map_size = tdb->file->map_size;
+       tdb->tdb1.io->tdb1_oob(tdb, tdb->file->map_size + 1, 1);
+       tdb->tdb1.transaction->old_map_size = tdb->file->map_size;
 
        /* finally hook the io methods, replacing them with
           transaction specific methods */
-       tdb->transaction->io_methods = tdb->methods;
-       tdb->methods = &transaction1_methods;
+       tdb->tdb1.transaction->io_methods = tdb->tdb1.io;
+       tdb->tdb1.io = &transaction1_methods;
 
+       tdb->stats.transactions++;
        return 0;
 
 fail:
        tdb1_allrecord_unlock(tdb, F_RDLCK);
 fail_allrecord_lock:
        tdb1_transaction_unlock(tdb, F_WRLCK);
-       SAFE_FREE(tdb->transaction->blocks);
-       SAFE_FREE(tdb->transaction->hash_heads);
-       SAFE_FREE(tdb->transaction);
+       SAFE_FREE(tdb->tdb1.transaction->blocks);
+       SAFE_FREE(tdb->tdb1.transaction->hash_heads);
+       SAFE_FREE(tdb->tdb1.transaction);
        return -1;
 }
 
-int tdb1_transaction_start(struct tdb1_context *tdb)
+int tdb1_transaction_start(struct tdb_context *tdb)
 {
        return _tdb1_transaction_start(tdb);
 }
@@ -531,7 +549,7 @@ int tdb1_transaction_start(struct tdb1_context *tdb)
 /*
   sync to disk
 */
-static int transaction1_sync(struct tdb1_context *tdb, tdb1_off_t offset, tdb1_len_t length)
+static int transaction1_sync(struct tdb_context *tdb, tdb1_off_t offset, tdb1_len_t length)
 {
        if (tdb->flags & TDB_NOSYNC) {
                return 0;
@@ -548,7 +566,7 @@ static int transaction1_sync(struct tdb1_context *tdb, tdb1_off_t offset, tdb1_l
        }
 #if HAVE_MMAP
        if (tdb->file->map_ptr) {
-               tdb1_off_t moffset = offset & ~(tdb->page_size-1);
+               tdb1_off_t moffset = offset & ~(tdb->tdb1.page_size-1);
                if (msync(moffset + (char *)tdb->file->map_ptr,
                          length + (offset - moffset), MS_SYNC) != 0) {
                        tdb->last_error = tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
@@ -563,40 +581,40 @@ static int transaction1_sync(struct tdb1_context *tdb, tdb1_off_t offset, tdb1_l
 }
 
 
-static int _tdb1_transaction_cancel(struct tdb1_context *tdb)
+static int _tdb1_transaction_cancel(struct tdb_context *tdb)
 {
        int i, ret = 0;
 
-       if (tdb->transaction == NULL) {
+       if (tdb->tdb1.transaction == NULL) {
                tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
                                        "tdb1_transaction_cancel:"
                                        " no transaction");
                return -1;
        }
 
-       if (tdb->transaction->nesting != 0) {
-               tdb->transaction->transaction_error = 1;
-               tdb->transaction->nesting--;
+       if (tdb->tdb1.transaction->nesting != 0) {
+               tdb->tdb1.transaction->transaction_error = 1;
+               tdb->tdb1.transaction->nesting--;
                return 0;
        }
 
-       tdb->file->map_size = tdb->transaction->old_map_size;
+       tdb->file->map_size = tdb->tdb1.transaction->old_map_size;
 
        /* free all the transaction blocks */
-       for (i=0;i<tdb->transaction->num_blocks;i++) {
-               if (tdb->transaction->blocks[i] != NULL) {
-                       free(tdb->transaction->blocks[i]);
+       for (i=0;i<tdb->tdb1.transaction->num_blocks;i++) {
+               if (tdb->tdb1.transaction->blocks[i] != NULL) {
+                       free(tdb->tdb1.transaction->blocks[i]);
                }
        }
-       SAFE_FREE(tdb->transaction->blocks);
+       SAFE_FREE(tdb->tdb1.transaction->blocks);
 
-       if (tdb->transaction->magic_offset) {
-               const struct tdb1_methods *methods = tdb->transaction->io_methods;
+       if (tdb->tdb1.transaction->magic_offset) {
+               const struct tdb1_methods *methods = tdb->tdb1.transaction->io_methods;
                const uint32_t invalid = TDB1_RECOVERY_INVALID_MAGIC;
 
                /* remove the recovery marker */
-               if (methods->tdb1_write(tdb, tdb->transaction->magic_offset, &invalid, 4) == -1 ||
-               transaction1_sync(tdb, tdb->transaction->magic_offset, 4) == -1) {
+               if (methods->tdb1_write(tdb, tdb->tdb1.transaction->magic_offset, &invalid, 4) == -1 ||
+               transaction1_sync(tdb, tdb->tdb1.transaction->magic_offset, 4) == -1) {
                        tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
                                   "tdb1_transaction_cancel: failed to"
                                   " remove recovery magic");
@@ -608,10 +626,10 @@ static int _tdb1_transaction_cancel(struct tdb1_context *tdb)
        tdb1_release_transaction_locks(tdb);
 
        /* restore the normal io methods */
-       tdb->methods = tdb->transaction->io_methods;
+       tdb->tdb1.io = tdb->tdb1.transaction->io_methods;
 
-       SAFE_FREE(tdb->transaction->hash_heads);
-       SAFE_FREE(tdb->transaction);
+       SAFE_FREE(tdb->tdb1.transaction->hash_heads);
+       SAFE_FREE(tdb->tdb1.transaction);
 
        return ret;
 }
@@ -619,39 +637,40 @@ static int _tdb1_transaction_cancel(struct tdb1_context *tdb)
 /*
   cancel the current transaction
 */
-int tdb1_transaction_cancel(struct tdb1_context *tdb)
+int tdb1_transaction_cancel(struct tdb_context *tdb)
 {
+       tdb->stats.transaction_cancel++;
        return _tdb1_transaction_cancel(tdb);
 }
 
 /*
   work out how much space the linearised recovery data will consume
 */
-static tdb1_len_t tdb1_recovery_size(struct tdb1_context *tdb)
+static tdb1_len_t tdb1_recovery_size(struct tdb_context *tdb)
 {
        tdb1_len_t recovery_size = 0;
        int i;
 
        recovery_size = sizeof(uint32_t);
-       for (i=0;i<tdb->transaction->num_blocks;i++) {
-               if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
+       for (i=0;i<tdb->tdb1.transaction->num_blocks;i++) {
+               if (i * tdb->tdb1.transaction->block_size >= tdb->tdb1.transaction->old_map_size) {
                        break;
                }
-               if (tdb->transaction->blocks[i] == NULL) {
+               if (tdb->tdb1.transaction->blocks[i] == NULL) {
                        continue;
                }
                recovery_size += 2*sizeof(tdb1_off_t);
-               if (i == tdb->transaction->num_blocks-1) {
-                       recovery_size += tdb->transaction->last_block_size;
+               if (i == tdb->tdb1.transaction->num_blocks-1) {
+                       recovery_size += tdb->tdb1.transaction->last_block_size;
                } else {
-                       recovery_size += tdb->transaction->block_size;
+                       recovery_size += tdb->tdb1.transaction->block_size;
                }
        }
 
        return recovery_size;
 }
 
-int tdb1_recovery_area(struct tdb1_context *tdb,
+int tdb1_recovery_area(struct tdb_context *tdb,
                      const struct tdb1_methods *methods,
                      tdb1_off_t *recovery_offset,
                      struct tdb1_record *rec)
@@ -683,13 +702,13 @@ int tdb1_recovery_area(struct tdb1_context *tdb,
   allocate the recovery area, or use an existing recovery area if it is
   large enough
 */
-static int tdb1_recovery_allocate(struct tdb1_context *tdb,
+static int tdb1_recovery_allocate(struct tdb_context *tdb,
                                 tdb1_len_t *recovery_size,
                                 tdb1_off_t *recovery_offset,
                                 tdb1_len_t *recovery_max_size)
 {
        struct tdb1_record rec;
-       const struct tdb1_methods *methods = tdb->transaction->io_methods;
+       const struct tdb1_methods *methods = tdb->tdb1.transaction->io_methods;
        tdb1_off_t recovery_head;
 
        if (tdb1_recovery_area(tdb, methods, &recovery_head, &rec) == -1) {
@@ -726,25 +745,27 @@ static int tdb1_recovery_allocate(struct tdb1_context *tdb,
        *recovery_size = tdb1_recovery_size(tdb);
 
        /* round up to a multiple of page size */
-       *recovery_max_size = TDB1_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
+       *recovery_max_size = TDB1_ALIGN(sizeof(rec) + *recovery_size,
+                                       tdb->tdb1.page_size) - sizeof(rec);
        *recovery_offset = tdb->file->map_size;
        recovery_head = *recovery_offset;
 
-       if (methods->tdb1_expand_file(tdb, tdb->transaction->old_map_size,
-                                    (tdb->file->map_size - tdb->transaction->old_map_size) +
+       if (methods->tdb1_expand_file(tdb, tdb->tdb1.transaction->old_map_size,
+                                    (tdb->file->map_size - tdb->tdb1.transaction->old_map_size) +
                                     sizeof(rec) + *recovery_max_size) == -1) {
                tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
                           "tdb1_recovery_allocate:"
                           " failed to create recovery area");
                return -1;
        }
+       tdb->stats.transaction_expand_file++;
 
        /* remap the file (if using mmap) */
        methods->tdb1_oob(tdb, tdb->file->map_size + 1, 1);
 
        /* we have to reset the old map size so that we don't try to expand the file
           again in the transaction commit, which would destroy the recovery area */
-       tdb->transaction->old_map_size = tdb->file->map_size;
+       tdb->tdb1.transaction->old_map_size = tdb->file->map_size;
 
        /* write the recovery header offset and sync - we can sync without a race here
           as the magic ptr in the recovery record has not been set */
@@ -770,15 +791,15 @@ static int tdb1_recovery_allocate(struct tdb1_context *tdb,
 /*
   setup the recovery data that will be used on a crash during commit
 */
-static int transaction1_setup_recovery(struct tdb1_context *tdb,
+static int transaction1_setup_recovery(struct tdb_context *tdb,
                                       tdb1_off_t *magic_offset)
 {
        tdb1_len_t recovery_size;
        unsigned char *data, *p;
-       const struct tdb1_methods *methods = tdb->transaction->io_methods;
+       const struct tdb1_methods *methods = tdb->tdb1.transaction->io_methods;
        struct tdb1_record *rec;
        tdb1_off_t recovery_offset, recovery_max_size;
-       tdb1_off_t old_map_size = tdb->transaction->old_map_size;
+       tdb1_off_t old_map_size = tdb->tdb1.transaction->old_map_size;
        uint32_t magic, tailer;
        int i;
 
@@ -808,24 +829,24 @@ static int transaction1_setup_recovery(struct tdb1_context *tdb,
        /* build the recovery data into a single blob to allow us to do a single
           large write, which should be more efficient */
        p = data + sizeof(*rec);
-       for (i=0;i<tdb->transaction->num_blocks;i++) {
+       for (i=0;i<tdb->tdb1.transaction->num_blocks;i++) {
                tdb1_off_t offset;
                tdb1_len_t length;
 
-               if (tdb->transaction->blocks[i] == NULL) {
+               if (tdb->tdb1.transaction->blocks[i] == NULL) {
                        continue;
                }
 
-               offset = i * tdb->transaction->block_size;
-               length = tdb->transaction->block_size;
-               if (i == tdb->transaction->num_blocks-1) {
-                       length = tdb->transaction->last_block_size;
+               offset = i * tdb->tdb1.transaction->block_size;
+               length = tdb->tdb1.transaction->block_size;
+               if (i == tdb->tdb1.transaction->num_blocks-1) {
+                       length = tdb->tdb1.transaction->last_block_size;
                }
 
                if (offset >= old_map_size) {
                        continue;
                }
-               if (offset + length > tdb->transaction->old_map_size) {
+               if (offset + length > tdb->tdb1.transaction->old_map_size) {
                        tdb->last_error = tdb_logerr(tdb, TDB_ERR_CORRUPT,
                                                TDB_LOG_ERROR,
                                                "tdb1_transaction_setup_recovery: transaction data over new region boundary");
@@ -907,18 +928,18 @@ static int transaction1_setup_recovery(struct tdb1_context *tdb,
        return 0;
 }
 
-static int _tdb1_transaction_prepare_commit(struct tdb1_context *tdb)
+static int _tdb1_transaction_prepare_commit(struct tdb_context *tdb)
 {
        const struct tdb1_methods *methods;
 
-       if (tdb->transaction == NULL) {
+       if (tdb->tdb1.transaction == NULL) {
                tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
                                        "tdb1_transaction_prepare_commit:"
                                        " no transaction");
                return -1;
        }
 
-       if (tdb->transaction->prepared) {
+       if (tdb->tdb1.transaction->prepared) {
                tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
                                        "tdb1_transaction_prepare_commit:"
                                        " transaction already prepared");
@@ -926,7 +947,7 @@ static int _tdb1_transaction_prepare_commit(struct tdb1_context *tdb)
                return -1;
        }
 
-       if (tdb->transaction->transaction_error) {
+       if (tdb->tdb1.transaction->transaction_error) {
                tdb->last_error = tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
                                        "tdb1_transaction_prepare_commit:"
                                        " transaction error pending");
@@ -935,16 +956,16 @@ static int _tdb1_transaction_prepare_commit(struct tdb1_context *tdb)
        }
 
 
-       if (tdb->transaction->nesting != 0) {
+       if (tdb->tdb1.transaction->nesting != 0) {
                return 0;
        }
 
        /* check for a null transaction */
-       if (tdb->transaction->blocks == NULL) {
+       if (tdb->tdb1.transaction->blocks == NULL) {
                return 0;
        }
 
-       methods = tdb->transaction->io_methods;
+       methods = tdb->tdb1.transaction->io_methods;
 
        /* if there are any locks pending then the caller has not
           nested their locks properly, so fail the transaction */
@@ -979,7 +1000,7 @@ static int _tdb1_transaction_prepare_commit(struct tdb1_context *tdb)
 
        if (!(tdb->flags & TDB_NOSYNC)) {
                /* write the recovery data to the end of the file */
-               if (transaction1_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
+               if (transaction1_setup_recovery(tdb, &tdb->tdb1.transaction->magic_offset) == -1) {
                        tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
                                   "tdb1_transaction_prepare_commit:"
                                   " failed to setup recovery data");
@@ -987,19 +1008,20 @@ static int _tdb1_transaction_prepare_commit(struct tdb1_context *tdb)
                }
        }
 
-       tdb->transaction->prepared = true;
+       tdb->tdb1.transaction->prepared = true;
 
        /* expand the file to the new size if needed */
-       if (tdb->file->map_size != tdb->transaction->old_map_size) {
-               if (methods->tdb1_expand_file(tdb, tdb->transaction->old_map_size,
+       if (tdb->file->map_size != tdb->tdb1.transaction->old_map_size) {
+               if (methods->tdb1_expand_file(tdb, tdb->tdb1.transaction->old_map_size,
                                             tdb->file->map_size -
-                                            tdb->transaction->old_map_size) == -1) {
+                                            tdb->tdb1.transaction->old_map_size) == -1) {
                        tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
                                   "tdb1_transaction_prepare_commit:"
                                   " expansion failed");
                        return -1;
                }
-               tdb->file->map_size = tdb->transaction->old_map_size;
+               tdb->stats.transaction_expand_file++;
+               tdb->file->map_size = tdb->tdb1.transaction->old_map_size;
                methods->tdb1_oob(tdb, tdb->file->map_size + 1, 1);
        }
 
@@ -1011,13 +1033,13 @@ static int _tdb1_transaction_prepare_commit(struct tdb1_context *tdb)
 /*
    prepare to commit the current transaction
 */
-int tdb1_transaction_prepare_commit(struct tdb1_context *tdb)
+int tdb1_transaction_prepare_commit(struct tdb_context *tdb)
 {
        return _tdb1_transaction_prepare_commit(tdb);
 }
 
 /* A repack is worthwhile if the largest is less than half total free. */
-static bool repack_worthwhile(struct tdb1_context *tdb)
+static bool repack_worthwhile(struct tdb_context *tdb)
 {
        tdb1_off_t ptr;
        struct tdb1_record rec;
@@ -1041,20 +1063,20 @@ static bool repack_worthwhile(struct tdb1_context *tdb)
 /*
   commit the current transaction
 */
-int tdb1_transaction_commit(struct tdb1_context *tdb)
+int tdb1_transaction_commit(struct tdb_context *tdb)
 {
        const struct tdb1_methods *methods;
        int i;
        bool need_repack = false;
 
-       if (tdb->transaction == NULL) {
+       if (tdb->tdb1.transaction == NULL) {
                tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
                                        "tdb1_transaction_commit:"
                                        " no transaction");
                return -1;
        }
 
-       if (tdb->transaction->transaction_error) {
+       if (tdb->tdb1.transaction->transaction_error) {
                tdb->last_error = tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
                                        "tdb1_transaction_commit:"
                                        " transaction error pending");
@@ -1063,18 +1085,18 @@ int tdb1_transaction_commit(struct tdb1_context *tdb)
        }
 
 
-       if (tdb->transaction->nesting != 0) {
-               tdb->transaction->nesting--;
+       if (tdb->tdb1.transaction->nesting != 0) {
+               tdb->tdb1.transaction->nesting--;
                return 0;
        }
 
        /* check for a null transaction */
-       if (tdb->transaction->blocks == NULL) {
+       if (tdb->tdb1.transaction->blocks == NULL) {
                _tdb1_transaction_cancel(tdb);
                return 0;
        }
 
-       if (!tdb->transaction->prepared) {
+       if (!tdb->tdb1.transaction->prepared) {
                int ret = _tdb1_transaction_prepare_commit(tdb);
                if (ret) {
                        _tdb1_transaction_cancel(tdb);
@@ -1082,24 +1104,24 @@ int tdb1_transaction_commit(struct tdb1_context *tdb)
                }
        }
 
-       methods = tdb->transaction->io_methods;
+       methods = tdb->tdb1.transaction->io_methods;
 
        /* perform all the writes */
-       for (i=0;i<tdb->transaction->num_blocks;i++) {
+       for (i=0;i<tdb->tdb1.transaction->num_blocks;i++) {
                tdb1_off_t offset;
                tdb1_len_t length;
 
-               if (tdb->transaction->blocks[i] == NULL) {
+               if (tdb->tdb1.transaction->blocks[i] == NULL) {
                        continue;
                }
 
-               offset = i * tdb->transaction->block_size;
-               length = tdb->transaction->block_size;
-               if (i == tdb->transaction->num_blocks-1) {
-                       length = tdb->transaction->last_block_size;
+               offset = i * tdb->tdb1.transaction->block_size;
+               length = tdb->tdb1.transaction->block_size;
+               if (i == tdb->tdb1.transaction->num_blocks-1) {
+                       length = tdb->tdb1.transaction->last_block_size;
                }
 
-               if (methods->tdb1_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
+               if (methods->tdb1_write(tdb, offset, tdb->tdb1.transaction->blocks[i], length) == -1) {
                        tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
                                   "tdb1_transaction_commit:"
                                   " write failed during commit");
@@ -1107,7 +1129,7 @@ int tdb1_transaction_commit(struct tdb1_context *tdb)
                        /* we've overwritten part of the data and
                           possibly expanded the file, so we need to
                           run the crash recovery code */
-                       tdb->methods = methods;
+                       tdb->tdb1.io = methods;
                        tdb1_transaction_recover(tdb);
 
                        _tdb1_transaction_cancel(tdb);
@@ -1116,16 +1138,16 @@ int tdb1_transaction_commit(struct tdb1_context *tdb)
                                   "tdb1_transaction_commit: write failed");
                        return -1;
                }
-               SAFE_FREE(tdb->transaction->blocks[i]);
+               SAFE_FREE(tdb->tdb1.transaction->blocks[i]);
        }
 
        /* Do this before we drop lock or blocks. */
-       if (tdb->transaction->expanded) {
+       if (tdb->tdb1.transaction->expanded) {
                need_repack = repack_worthwhile(tdb);
        }
 
-       SAFE_FREE(tdb->transaction->blocks);
-       tdb->transaction->num_blocks = 0;
+       SAFE_FREE(tdb->tdb1.transaction->blocks);
+       tdb->tdb1.transaction->num_blocks = 0;
 
        /* ensure the new data is on disk */
        if (transaction1_sync(tdb, 0, tdb->file->map_size) == -1) {
@@ -1152,7 +1174,8 @@ int tdb1_transaction_commit(struct tdb1_context *tdb)
        _tdb1_transaction_cancel(tdb);
 
        if (need_repack) {
-               return tdb1_repack(tdb);
+               if (tdb_repack(tdb) != 0)
+                       return -1;
        }
 
        return 0;
@@ -1164,7 +1187,7 @@ int tdb1_transaction_commit(struct tdb1_context *tdb)
   database write access already established (including the open
   lock to prevent new processes attaching)
 */
-int tdb1_transaction_recover(struct tdb1_context *tdb)
+int tdb1_transaction_recover(struct tdb_context *tdb)
 {
        tdb1_off_t recovery_head, recovery_eof;
        unsigned char *data, *p;
@@ -1185,7 +1208,7 @@ int tdb1_transaction_recover(struct tdb1_context *tdb)
        }
 
        /* read the recovery record */
-       if (tdb->methods->tdb1_read(tdb, recovery_head, &rec,
+       if (tdb->tdb1.io->tdb1_read(tdb, recovery_head, &rec,
                                   sizeof(rec), TDB1_DOCONV()) == -1) {
                tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
                           "tdb1_transaction_recover:"
@@ -1217,7 +1240,7 @@ int tdb1_transaction_recover(struct tdb1_context *tdb)
        }
 
        /* read the full recovery data */
-       if (tdb->methods->tdb1_read(tdb, recovery_head + sizeof(rec), data,
+       if (tdb->tdb1.io->tdb1_read(tdb, recovery_head + sizeof(rec), data,
                                   rec.data_len, 0) == -1) {
                tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
                           "tdb1_transaction_recover:"
@@ -1235,7 +1258,7 @@ int tdb1_transaction_recover(struct tdb1_context *tdb)
                memcpy(&ofs, p, 4);
                memcpy(&len, p+4, 4);
 
-               if (tdb->methods->tdb1_write(tdb, ofs, p+8, len) == -1) {
+               if (tdb->tdb1.io->tdb1_write(tdb, ofs, p+8, len) == -1) {
                        free(data);
                        tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
                                   "tdb1_transaction_recover: failed to recover"
@@ -1288,14 +1311,14 @@ int tdb1_transaction_recover(struct tdb1_context *tdb)
 }
 
 /* Any I/O failures we say "needs recovery". */
-bool tdb1_needs_recovery(struct tdb1_context *tdb)
+tdb_bool_err tdb1_needs_recovery(struct tdb_context *tdb)
 {
        tdb1_off_t recovery_head;
        struct tdb1_record rec;
 
        /* find the recovery area */
        if (tdb1_ofs_read(tdb, TDB1_RECOVERY_HEAD, &recovery_head) == -1) {
-               return true;
+               return tdb->last_error;
        }
 
        if (recovery_head == 0) {
@@ -1304,9 +1327,9 @@ bool tdb1_needs_recovery(struct tdb1_context *tdb)
        }
 
        /* read the recovery record */
-       if (tdb->methods->tdb1_read(tdb, recovery_head, &rec,
+       if (tdb->tdb1.io->tdb1_read(tdb, recovery_head, &rec,
                                   sizeof(rec), TDB1_DOCONV()) == -1) {
-               return true;
+               return tdb->last_error;
        }
 
        return (rec.magic == TDB1_RECOVERY_MAGIC);