git.ozlabs.org Git - ccan/blobdiff - ccan/tdb2/transaction.c
tdb2: add TDB_RDONLY flag, allow setting/unsetting it.
[ccan] / ccan / tdb2 / transaction.c
index 09f932b8381fb5e81dd0581d9800d72d9670505d..a29acf96e2d417c8c3d79313f8ada959f7c5719c 100644 (file)
@@ -120,7 +120,7 @@ struct tdb_transaction {
 };
 
 /* This doesn't really need to be pagesize, but we use it for similar reasons. */
-#define PAGESIZE 4096
+#define PAGESIZE 65536
 
 /*
   read while in a transaction. We need to check first if the data is in our list
@@ -348,15 +348,14 @@ static void transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
 static enum TDB_ERROR transaction_oob(struct tdb_context *tdb, tdb_off_t len,
                                      bool probe)
 {
-       if (len <= tdb->file->map_size) {
+       if (len <= tdb->file->map_size || probe) {
                return TDB_SUCCESS;
        }
-       if (!probe) {
-               tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
-                          "tdb_oob len %lld beyond transaction size %lld",
-                          (long long)len,
-                          (long long)tdb->file->map_size);
-       }
+
+       tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
+                  "tdb_oob len %lld beyond transaction size %lld",
+                  (long long)len,
+                  (long long)tdb->file->map_size);
        return TDB_ERR_IO;
 }
 
@@ -387,15 +386,17 @@ static void *transaction_direct(struct tdb_context *tdb, tdb_off_t off,
 
        /* Can only do direct if in single block and we've already copied. */
        if (write_mode) {
-               if (blk != end_blk)
-                       return NULL;
-               if (blk >= tdb->transaction->num_blocks)
-                       return NULL;
-               if (tdb->transaction->blocks[blk] == NULL)
+               tdb->stats.transaction_write_direct++;
+               if (blk != end_blk
+                   || blk >= tdb->transaction->num_blocks
+                   || tdb->transaction->blocks[blk] == NULL) {
+                       tdb->stats.transaction_write_direct_fail++;
                        return NULL;
+               }
                return tdb->transaction->blocks[blk] + off % PAGESIZE;
        }
 
+       tdb->stats.transaction_read_direct++;
        /* Single which we have copied? */
        if (blk == end_blk
            && blk < tdb->transaction->num_blocks
@@ -406,8 +407,10 @@ static void *transaction_direct(struct tdb_context *tdb, tdb_off_t off,
        while (blk <= end_blk) {
                if (blk >= tdb->transaction->num_blocks)
                        break;
-               if (tdb->transaction->blocks[blk])
+               if (tdb->transaction->blocks[blk]) {
+                       tdb->stats.transaction_read_direct_fail++;
                        return NULL;
+               }
                blk++;
        }
        return tdb->transaction->io_methods->direct(tdb, off, len, false);
@@ -438,7 +441,7 @@ static enum TDB_ERROR transaction_sync(struct tdb_context *tdb,
        }
 #ifdef MS_SYNC
        if (tdb->file->map_ptr) {
-               tdb_off_t moffset = offset & ~(PAGESIZE-1);
+               tdb_off_t moffset = offset & ~(getpagesize()-1);
                if (msync(moffset + (char *)tdb->file->map_ptr,
                          length + (offset - moffset), MS_SYNC) != 0) {
                        return tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
@@ -505,7 +508,7 @@ static void _tdb_transaction_cancel(struct tdb_context *tdb)
        tdb_transaction_unlock(tdb, F_WRLCK);
 
        if (tdb_has_open_lock(tdb))
-               tdb_unlock_open(tdb);
+               tdb_unlock_open(tdb, F_WRLCK);
 
        SAFE_FREE(tdb->transaction);
 }
@@ -518,14 +521,24 @@ enum TDB_ERROR tdb_transaction_start(struct tdb_context *tdb)
 {
        enum TDB_ERROR ecode;
 
+       tdb->stats.transactions++;
        /* some sanity checks */
-       if (tdb->read_only || (tdb->flags & TDB_INTERNAL)) {
+       if (tdb->flags & TDB_INTERNAL) {
                return tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL,
+                                                   TDB_LOG_USE_ERROR,
+                                                   "tdb_transaction_start:"
+                                                   " cannot start a"
+                                                   " transaction on an"
+                                                   " internal tdb");
+       }
+
+       if (tdb->flags & TDB_RDONLY) {
+               return tdb->last_error = tdb_logerr(tdb, TDB_ERR_RDONLY,
                                                    TDB_LOG_USE_ERROR,
                                                    "tdb_transaction_start:"
                                                    " cannot start a"
                                                    " transaction on a "
-                                                   "read-only or internal db");
+                                                   " read-only tdb");
        }
 
        /* cope with nested tdb_transaction_start() calls */
@@ -538,6 +551,7 @@ enum TDB_ERROR tdb_transaction_start(struct tdb_context *tdb)
                                             " already inside transaction");
                }
                tdb->transaction->nesting++;
+               tdb->stats.transaction_nest++;
                return 0;
        }
 
@@ -603,6 +617,7 @@ fail_allrecord_lock:
 */
 void tdb_transaction_cancel(struct tdb_context *tdb)
 {
+       tdb->stats.transaction_cancel++;
        _tdb_transaction_cancel(tdb);
 }
 
@@ -711,7 +726,7 @@ static struct tdb_recovery_record *alloc_recovery(struct tdb_context *tdb,
        size_t i;
        enum TDB_ERROR ecode;
        unsigned char *p;
-       const struct tdb_methods *methods = tdb->transaction->io_methods;
+       const struct tdb_methods *old_methods = tdb->methods;
 
        rec = malloc(sizeof(*rec) + tdb_recovery_size(tdb));
        if (!rec) {
@@ -721,6 +736,10 @@ static struct tdb_recovery_record *alloc_recovery(struct tdb_context *tdb,
                return TDB_ERR_PTR(TDB_ERR_OOM);
        }
 
+       /* We temporarily revert to the old I/O methods, so we can use
+        * tdb_access_read */
+       tdb->methods = tdb->transaction->io_methods;
+
        /* build the recovery data into a single blob to allow us to do a single
           large write, which should be more efficient */
        p = (unsigned char *)(rec + 1);
@@ -728,7 +747,7 @@ static struct tdb_recovery_record *alloc_recovery(struct tdb_context *tdb,
                tdb_off_t offset;
                tdb_len_t length;
                unsigned int off;
-               unsigned char buffer[PAGESIZE];
+               const unsigned char *buffer;
 
                if (tdb->transaction->blocks[i] == NULL) {
                        continue;
@@ -745,21 +764,20 @@ static struct tdb_recovery_record *alloc_recovery(struct tdb_context *tdb,
                }
 
                if (offset + length > tdb->file->map_size) {
-                       free(rec);
-                       tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
-                                  "tdb_transaction_setup_recovery:"
-                                  " transaction data over new region"
-                                  " boundary");
-                       return TDB_ERR_PTR(TDB_ERR_CORRUPT);
+                       ecode = tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
+                                          "tdb_transaction_setup_recovery:"
+                                          " transaction data over new region"
+                                          " boundary");
+                       goto fail;
                }
                if (offset + length > tdb->transaction->old_map_size) {
                        /* Short read at EOF. */
                        length = tdb->transaction->old_map_size - offset;
                }
-               ecode = methods->tread(tdb, offset, buffer, length);
-               if (ecode != TDB_SUCCESS) {
-                       free(rec);
-                       return TDB_ERR_PTR(ecode);
+               buffer = tdb_access_read(tdb, offset, length, false);
+               if (TDB_PTR_IS_ERR(buffer)) {
+                       ecode = TDB_PTR_ERR(buffer);
+                       goto fail;
                }
 
                /* Skip over anything the same at the start. */
@@ -784,10 +802,17 @@ static struct tdb_recovery_record *alloc_recovery(struct tdb_context *tdb,
                        off += len + samelen;
                        offset += len + samelen;
                }
+               tdb_access_release(tdb, buffer);
        }
 
        *len = p - (unsigned char *)(rec + 1);
+       tdb->methods = old_methods;
        return rec;
+
+fail:
+       free(rec);
+       tdb->methods = old_methods;
+       return TDB_ERR_PTR(ecode);
 }
 
 static tdb_off_t create_recovery_area(struct tdb_context *tdb,
@@ -814,6 +839,7 @@ static tdb_off_t create_recovery_area(struct tdb_context *tdb,
        addition = (tdb->file->map_size - tdb->transaction->old_map_size) +
                sizeof(*rec) + rec->max_len;
        tdb->file->map_size = tdb->transaction->old_map_size;
+       tdb->stats.transaction_expand_file++;
        ecode = methods->expand_file(tdb, addition);
        if (ecode != TDB_SUCCESS) {
                return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
@@ -987,7 +1013,7 @@ static enum TDB_ERROR _tdb_transaction_prepare_commit(struct tdb_context *tdb)
 
        /* get the open lock - this prevents new users attaching to the database
           during the commit */
-       ecode = tdb_lock_open(tdb, TDB_LOCK_WAIT|TDB_LOCK_NOCHECK);
+       ecode = tdb_lock_open(tdb, F_WRLCK, TDB_LOCK_WAIT|TDB_LOCK_NOCHECK);
        if (ecode != TDB_SUCCESS) {
                return ecode;
        }
@@ -1170,7 +1196,7 @@ enum TDB_ERROR tdb_transaction_recover(struct tdb_context *tdb)
                return TDB_SUCCESS;
        }
 
-       if (tdb->read_only) {
+       if (tdb->flags & TDB_RDONLY) {
                return tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
                                  "tdb_transaction_recover:"
                                  " attempt to recover read only database");