]> git.ozlabs.org Git - ccan/blobdiff - ccan/tdb2/transaction.c
tdb2: more stats
[ccan] / ccan / tdb2 / transaction.c
index 09f932b8381fb5e81dd0581d9800d72d9670505d..9205828a8a16138d754e0013b68d9b91396b1894 100644 (file)
@@ -120,7 +120,7 @@ struct tdb_transaction {
 };
 
 /* This doesn't really need to be pagesize, but we use it for similar reasons. */
-#define PAGESIZE 4096
+#define PAGESIZE 65536
 
 /*
   read while in a transaction. We need to check first if the data is in our list
@@ -387,15 +387,17 @@ static void *transaction_direct(struct tdb_context *tdb, tdb_off_t off,
 
        /* Can only do direct if in single block and we've already copied. */
        if (write_mode) {
-               if (blk != end_blk)
-                       return NULL;
-               if (blk >= tdb->transaction->num_blocks)
-                       return NULL;
-               if (tdb->transaction->blocks[blk] == NULL)
+               tdb->stats.transaction_write_direct++;
+               if (blk != end_blk
+                   || blk >= tdb->transaction->num_blocks
+                   || tdb->transaction->blocks[blk] == NULL) {
+                       tdb->stats.transaction_write_direct_fail++;
                        return NULL;
+               }
                return tdb->transaction->blocks[blk] + off % PAGESIZE;
        }
 
+       tdb->stats.transaction_read_direct++;
        /* Single which we have copied? */
        if (blk == end_blk
            && blk < tdb->transaction->num_blocks
@@ -406,8 +408,10 @@ static void *transaction_direct(struct tdb_context *tdb, tdb_off_t off,
        while (blk <= end_blk) {
                if (blk >= tdb->transaction->num_blocks)
                        break;
-               if (tdb->transaction->blocks[blk])
+               if (tdb->transaction->blocks[blk]) {
+                       tdb->stats.transaction_read_direct_fail++;
                        return NULL;
+               }
                blk++;
        }
        return tdb->transaction->io_methods->direct(tdb, off, len, false);
@@ -438,7 +442,7 @@ static enum TDB_ERROR transaction_sync(struct tdb_context *tdb,
        }
 #ifdef MS_SYNC
        if (tdb->file->map_ptr) {
-               tdb_off_t moffset = offset & ~(PAGESIZE-1);
+               tdb_off_t moffset = offset & ~(getpagesize()-1);
                if (msync(moffset + (char *)tdb->file->map_ptr,
                          length + (offset - moffset), MS_SYNC) != 0) {
                        return tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
@@ -518,6 +522,7 @@ enum TDB_ERROR tdb_transaction_start(struct tdb_context *tdb)
 {
        enum TDB_ERROR ecode;
 
+       tdb->stats.transactions++;
        /* some sanity checks */
        if (tdb->read_only || (tdb->flags & TDB_INTERNAL)) {
                return tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL,
@@ -538,6 +543,7 @@ enum TDB_ERROR tdb_transaction_start(struct tdb_context *tdb)
                                             " already inside transaction");
                }
                tdb->transaction->nesting++;
+               tdb->stats.transaction_nest++;
                return 0;
        }
 
@@ -603,6 +609,7 @@ fail_allrecord_lock:
 */
 void tdb_transaction_cancel(struct tdb_context *tdb)
 {
+       tdb->stats.transaction_cancel++;
        _tdb_transaction_cancel(tdb);
 }
 
@@ -711,7 +718,7 @@ static struct tdb_recovery_record *alloc_recovery(struct tdb_context *tdb,
        size_t i;
        enum TDB_ERROR ecode;
        unsigned char *p;
-       const struct tdb_methods *methods = tdb->transaction->io_methods;
+       const struct tdb_methods *old_methods = tdb->methods;
 
        rec = malloc(sizeof(*rec) + tdb_recovery_size(tdb));
        if (!rec) {
@@ -721,6 +728,10 @@ static struct tdb_recovery_record *alloc_recovery(struct tdb_context *tdb,
                return TDB_ERR_PTR(TDB_ERR_OOM);
        }
 
+       /* We temporarily revert to the old I/O methods, so we can use
+        * tdb_access_read */
+       tdb->methods = tdb->transaction->io_methods;
+
        /* build the recovery data into a single blob to allow us to do a single
           large write, which should be more efficient */
        p = (unsigned char *)(rec + 1);
@@ -728,7 +739,7 @@ static struct tdb_recovery_record *alloc_recovery(struct tdb_context *tdb,
                tdb_off_t offset;
                tdb_len_t length;
                unsigned int off;
-               unsigned char buffer[PAGESIZE];
+               const unsigned char *buffer;
 
                if (tdb->transaction->blocks[i] == NULL) {
                        continue;
@@ -745,21 +756,20 @@ static struct tdb_recovery_record *alloc_recovery(struct tdb_context *tdb,
                }
 
                if (offset + length > tdb->file->map_size) {
-                       free(rec);
-                       tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
-                                  "tdb_transaction_setup_recovery:"
-                                  " transaction data over new region"
-                                  " boundary");
-                       return TDB_ERR_PTR(TDB_ERR_CORRUPT);
+                       ecode = tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
+                                          "tdb_transaction_setup_recovery:"
+                                          " transaction data over new region"
+                                          " boundary");
+                       goto fail;
                }
                if (offset + length > tdb->transaction->old_map_size) {
                        /* Short read at EOF. */
                        length = tdb->transaction->old_map_size - offset;
                }
-               ecode = methods->tread(tdb, offset, buffer, length);
-               if (ecode != TDB_SUCCESS) {
-                       free(rec);
-                       return TDB_ERR_PTR(ecode);
+               buffer = tdb_access_read(tdb, offset, length, false);
+               if (TDB_PTR_IS_ERR(buffer)) {
+                       ecode = TDB_PTR_ERR(buffer);
+                       goto fail;
                }
 
                /* Skip over anything the same at the start. */
@@ -784,10 +794,17 @@ static struct tdb_recovery_record *alloc_recovery(struct tdb_context *tdb,
                        off += len + samelen;
                        offset += len + samelen;
                }
+               tdb_access_release(tdb, buffer);
        }
 
        *len = p - (unsigned char *)(rec + 1);
+       tdb->methods = old_methods;
        return rec;
+
+fail:
+       free(rec);
+       tdb->methods = old_methods;
+       return TDB_ERR_PTR(ecode);
 }
 
 static tdb_off_t create_recovery_area(struct tdb_context *tdb,
@@ -814,6 +831,7 @@ static tdb_off_t create_recovery_area(struct tdb_context *tdb,
        addition = (tdb->file->map_size - tdb->transaction->old_map_size) +
                sizeof(*rec) + rec->max_len;
        tdb->file->map_size = tdb->transaction->old_map_size;
+       tdb->stats.transaction_expand_file++;
        ecode = methods->expand_file(tdb, addition);
        if (ecode != TDB_SUCCESS) {
                return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,