]> git.ozlabs.org Git - ccan/blobdiff - ccan/tdb/transaction.c
tdb2: fix tdb_summary reports
[ccan] / ccan / tdb / transaction.c
index 1086c5f037793d9df00269ff78b14ae2ad126e72..11194773647e255f1a7c8743cd69d5c85bfeda59 100644 (file)
@@ -76,7 +76,7 @@
     to reduce this to 3 or even 2 with some more work.
 
   - check for a valid recovery record on open of the tdb, while the
-    global lock is held. Automatically recover from the transaction
+    open lock is held. Automatically recover from the transaction
     recovery area if needed, then continue with the open as
     usual. This allows for smooth crash recovery with no administrator
     intervention.
     still available, but no transaction recovery area is used and no
     fsync/msync calls are made.
 
-  - if TDB_NO_NESTING is passed to flags in tdb open then transaction
-    nesting is disabled. tdb_transaction_start() will then implicitely
-    cancel any pending transactions and always start a new transaction
-    context instead of nesting.
-
+  - if TDB_ALLOW_NESTING is passed to flags in tdb open, or added using
+    tdb_add_flags() transaction nesting is enabled.
+    It resets the TDB_DISALLOW_NESTING flag, as both cannot be used together.
+    The default is that transaction nesting is allowed.
+    Note: this default may change in future versions of tdb.
+
+    Beware. when transactions are nested a transaction successfully
+    completed with tdb_transaction_commit() can be silently unrolled later.
+
+  - if TDB_DISALLOW_NESTING is passed to flags in tdb open, or added using
+    tdb_add_flags() transaction nesting is disabled.
+    It resets the TDB_ALLOW_NESTING flag, as both cannot be used together.
+    An attempt create a nested transaction will fail with TDB_ERR_NESTING.
+    The default is that transaction nesting is allowed.
+    Note: this default may change in future versions of tdb.
 */
 
 
@@ -127,6 +137,9 @@ struct tdb_transaction {
 
        /* old file size before transaction */
        tdb_len_t old_map_size;
+
+       /* we should re-pack on commit */
+       bool need_repack;
 };
 
 
@@ -139,14 +152,6 @@ static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
 {
        uint32_t blk;
 
-       /* Only a commit is allowed on a prepared transaction */
-       if (tdb->transaction->prepared) {
-               tdb->ecode = TDB_ERR_EINVAL;
-               TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: transaction already prepared, read not allowed\n"));
-               tdb->transaction->transaction_error = 1;
-               return -1;
-       }
-
        /* break it down into block sized ops */
        while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
                tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
@@ -382,7 +387,8 @@ static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
        if (len <= tdb->map_size) {
                return 0;
        }
-       return TDB_ERRCODE(TDB_ERR_IO, -1);
+       tdb->ecode = TDB_ERR_IO;
+       return -1;
 }
 
 /*
@@ -397,15 +403,8 @@ static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
                return -1;
        }
 
-       return 0;
-}
+       tdb->transaction->need_repack = true;
 
-/*
-  brlock during a transaction - ignore them
-*/
-static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset, 
-                             int rw_type, int lck_type, int probe, size_t len)
-{
        return 0;
 }
 
@@ -415,7 +414,6 @@ static const struct tdb_methods transaction_methods = {
        transaction_next_hash_chain,
        transaction_oob,
        transaction_expand_file,
-       transaction_brlock
 };
 
 /*
@@ -447,7 +445,8 @@ static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t
        return 0;
 }
 
-int tdb_transaction_cancel_internal(struct tdb_context *tdb)
+
+static int _tdb_transaction_cancel(struct tdb_context *tdb)
 {
        int i, ret = 0;
 
@@ -474,38 +473,23 @@ int tdb_transaction_cancel_internal(struct tdb_context *tdb)
 
        if (tdb->transaction->magic_offset) {
                const struct tdb_methods *methods = tdb->transaction->io_methods;
-               uint32_t zero = 0;
+               uint32_t invalid = TDB_RECOVERY_INVALID_MAGIC;
 
                /* remove the recovery marker */
-               if (methods->tdb_write(tdb, tdb->transaction->magic_offset, &zero, 4) == -1 ||
+               if (methods->tdb_write(tdb, tdb->transaction->magic_offset, &invalid, 4) == -1 ||
                transaction_sync(tdb, tdb->transaction->magic_offset, 4) == -1) {
                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_cancel: failed to remove recovery magic\n"));
                        ret = -1;
                }
        }
 
-       /* remove any global lock created during the transaction */
-       if (tdb->global_lock.count != 0) {
-               tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
-               tdb->global_lock.count = 0;
-       }
-
-       /* remove any locks created during the transaction */
-       if (tdb->num_locks != 0) {
-               for (i=0;i<tdb->num_lockrecs;i++) {
-                       tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
-                                  F_UNLCK,F_SETLKW, 0, 1);
-               }
-               tdb->num_locks = 0;
-               tdb->num_lockrecs = 0;
-               SAFE_FREE(tdb->lockrecs);
-       }
+       /* This also removes the OPEN_LOCK, if we have it. */
+       tdb_release_extra_locks(tdb);
 
        /* restore the normal io methods */
        tdb->methods = tdb->transaction->io_methods;
 
-       tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
-       tdb_transaction_unlock(tdb);
+       tdb_transaction_unlock(tdb, F_WRLCK);
        SAFE_FREE(tdb->transaction->hash_heads);
        SAFE_FREE(tdb->transaction);
        
@@ -527,19 +511,18 @@ int tdb_transaction_start(struct tdb_context *tdb)
 
        /* cope with nested tdb_transaction_start() calls */
        if (tdb->transaction != NULL) {
-               tdb_trace(tdb, "tdb_transaction_start");
-               if (!tdb->flags & TDB_NO_NESTING) {
-                       tdb->transaction->nesting++;
-                       TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n", 
-                                tdb->transaction->nesting));
-                       return 0;
-               } else {
-                       tdb_transaction_cancel_internal(tdb);
-                       TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: cancelling previous transaction\n"));
+               if (!(tdb->flags & TDB_ALLOW_NESTING)) {
+                       tdb->ecode = TDB_ERR_NESTING;
+                       return -1;
                }
+               tdb_trace(tdb, "tdb_transaction_start");
+               tdb->transaction->nesting++;
+               TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n", 
+                        tdb->transaction->nesting));
+               return 0;
        }
 
-       if (tdb->num_locks != 0 || tdb->global_lock.count) {
+       if (tdb_have_extra_locks(tdb)) {
                /* the caller must not have any locks when starting a
                   transaction as otherwise we'll be screwed by lack
                   of nested locks in posix */
@@ -578,10 +561,9 @@ int tdb_transaction_start(struct tdb_context *tdb)
        
        /* get a read lock from the freelist to the end of file. This
           is upgraded to a write lock during the commit */
-       if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
+       if (tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, true) == -1) {
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
-               tdb->ecode = TDB_ERR_LOCK;
-               goto fail;
+               goto fail_allrecord_lock;
        }
 
        /* setup a copy of the hash table heads so the hash scan in
@@ -614,8 +596,9 @@ int tdb_transaction_start(struct tdb_context *tdb)
        return 0;
        
 fail:
-       tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
-       tdb_transaction_unlock(tdb);
+       tdb_allrecord_unlock(tdb, F_RDLCK, false);
+fail_allrecord_lock:
+       tdb_transaction_unlock(tdb, F_WRLCK);
        SAFE_FREE(tdb->transaction->blocks);
        SAFE_FREE(tdb->transaction->hash_heads);
        SAFE_FREE(tdb->transaction);
@@ -627,9 +610,9 @@ fail:
   cancel the current transaction
 */
 int tdb_transaction_cancel(struct tdb_context *tdb)
-{      
+{
        tdb_trace(tdb, "tdb_transaction_cancel");
-       return tdb_transaction_cancel_internal(tdb);
+       return _tdb_transaction_cancel(tdb);
 }
 
 /*
@@ -668,7 +651,7 @@ static int tdb_recovery_allocate(struct tdb_context *tdb,
                                 tdb_off_t *recovery_offset,
                                 tdb_len_t *recovery_max_size)
 {
-       struct list_struct rec;
+       struct tdb_record rec;
        const struct tdb_methods *methods = tdb->transaction->io_methods;
        tdb_off_t recovery_head;
 
@@ -679,10 +662,16 @@ static int tdb_recovery_allocate(struct tdb_context *tdb,
 
        rec.rec_len = 0;
 
-       if (recovery_head != 0 && 
-           methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
-               TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
-               return -1;
+       if (recovery_head != 0) {
+               if (methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
+                       TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
+                       return -1;
+               }
+               /* ignore invalid recovery regions: can happen in crash */
+               if (rec.magic != TDB_RECOVERY_MAGIC &&
+                   rec.magic != TDB_RECOVERY_INVALID_MAGIC) {
+                       recovery_head = 0;
+               }
        }
 
        *recovery_size = tdb_recovery_size(tdb);
@@ -754,7 +743,7 @@ static int transaction_setup_recovery(struct tdb_context *tdb,
        tdb_len_t recovery_size;
        unsigned char *data, *p;
        const struct tdb_methods *methods = tdb->transaction->io_methods;
-       struct list_struct *rec;
+       struct tdb_record *rec;
        tdb_off_t recovery_offset, recovery_max_size;
        tdb_off_t old_map_size = tdb->transaction->old_map_size;
        uint32_t magic, tailer;
@@ -774,10 +763,10 @@ static int transaction_setup_recovery(struct tdb_context *tdb,
                return -1;
        }
 
-       rec = (struct list_struct *)data;
+       rec = (struct tdb_record *)data;
        memset(rec, 0, sizeof(*rec));
 
-       rec->magic    = 0;
+       rec->magic    = TDB_RECOVERY_INVALID_MAGIC;
        rec->data_len = recovery_size;
        rec->rec_len  = recovery_max_size;
        rec->key_len  = old_map_size;
@@ -857,7 +846,7 @@ static int transaction_setup_recovery(struct tdb_context *tdb,
        magic = TDB_RECOVERY_MAGIC;
        CONVERT(magic);
 
-       *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
+       *magic_offset = recovery_offset + offsetof(struct tdb_record, magic);
 
        if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
@@ -878,7 +867,7 @@ static int transaction_setup_recovery(struct tdb_context *tdb,
        return 0;
 }
 
-static int tdb_transaction_prepare_commit_internal(struct tdb_context *tdb)
+static int _tdb_transaction_prepare_commit(struct tdb_context *tdb)
 {      
        const struct tdb_methods *methods;
 
@@ -889,14 +878,14 @@ static int tdb_transaction_prepare_commit_internal(struct tdb_context *tdb)
 
        if (tdb->transaction->prepared) {
                tdb->ecode = TDB_ERR_EINVAL;
-               tdb_transaction_cancel(tdb);
+               _tdb_transaction_cancel(tdb);
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction already prepared\n"));
                return -1;
        }
 
        if (tdb->transaction->transaction_error) {
                tdb->ecode = TDB_ERR_IO;
-               tdb_transaction_cancel_internal(tdb);
+               _tdb_transaction_cancel(tdb);
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction error pending\n"));
                return -1;
        }
@@ -907,11 +896,6 @@ static int tdb_transaction_prepare_commit_internal(struct tdb_context *tdb)
                return 0;
        }               
 
-#ifdef TDB_TRACE
-       /* store seqnum now, before reading becomes illegal. */
-       tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &tdb->transaction_prepare_seqnum);
-#endif
-
        /* check for a null transaction */
        if (tdb->transaction->blocks == NULL) {
                return 0;
@@ -921,27 +905,25 @@ static int tdb_transaction_prepare_commit_internal(struct tdb_context *tdb)
        
        /* if there are any locks pending then the caller has not
           nested their locks properly, so fail the transaction */
-       if (tdb->num_locks || tdb->global_lock.count) {
+       if (tdb_have_extra_locks(tdb)) {
                tdb->ecode = TDB_ERR_LOCK;
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: locks pending on commit\n"));
-               tdb_transaction_cancel_internal(tdb);
+               _tdb_transaction_cancel(tdb);
                return -1;
        }
 
        /* upgrade the main transaction lock region to a write lock */
-       if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
+       if (tdb_allrecord_upgrade(tdb) == -1) {
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to upgrade hash locks\n"));
-               tdb->ecode = TDB_ERR_LOCK;
-               tdb_transaction_cancel_internal(tdb);
+               _tdb_transaction_cancel(tdb);
                return -1;
        }
 
-       /* get the global lock - this prevents new users attaching to the database
+       /* get the open lock - this prevents new users attaching to the database
           during the commit */
-       if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
-               TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to get global lock\n"));
-               tdb->ecode = TDB_ERR_LOCK;
-               tdb_transaction_cancel_internal(tdb);
+       if (tdb_nest_lock(tdb, OPEN_LOCK, F_WRLCK, TDB_LOCK_WAIT) == -1) {
+               TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to get open lock\n"));
+               _tdb_transaction_cancel(tdb);
                return -1;
        }
 
@@ -949,8 +931,7 @@ static int tdb_transaction_prepare_commit_internal(struct tdb_context *tdb)
                /* write the recovery data to the end of the file */
                if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: failed to setup recovery data\n"));
-                       tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
-                       tdb_transaction_cancel_internal(tdb);
+                       _tdb_transaction_cancel(tdb);
                        return -1;
                }
        }
@@ -964,15 +945,14 @@ static int tdb_transaction_prepare_commit_internal(struct tdb_context *tdb)
                                             tdb->transaction->old_map_size) == -1) {
                        tdb->ecode = TDB_ERR_IO;
                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: expansion failed\n"));
-                       tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
-                       tdb_transaction_cancel_internal(tdb);
+                       _tdb_transaction_cancel(tdb);
                        return -1;
                }
                tdb->map_size = tdb->transaction->old_map_size;
                methods->tdb_oob(tdb, tdb->map_size + 1, 1);
        }
 
-       /* Keep the global lock until the actual commit */
+       /* Keep the open lock until the actual commit */
 
        return 0;
 }
@@ -983,7 +963,7 @@ static int tdb_transaction_prepare_commit_internal(struct tdb_context *tdb)
 int tdb_transaction_prepare_commit(struct tdb_context *tdb)
 {      
        tdb_trace(tdb, "tdb_transaction_prepare_commit");
-       return tdb_transaction_prepare_commit_internal(tdb);
+       return _tdb_transaction_prepare_commit(tdb);
 }
 
 /*
@@ -993,19 +973,14 @@ int tdb_transaction_commit(struct tdb_context *tdb)
 {      
        const struct tdb_methods *methods;
        int i;
+       bool need_repack;
 
        if (tdb->transaction == NULL) {
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
                return -1;
        }
 
-       /* If we've prepared, can't read seqnum. */
-       if (tdb->transaction->prepared) {
-               tdb_trace_seqnum(tdb, tdb->transaction_prepare_seqnum,
-                                "tdb_transaction_commit");
-       } else {
-               tdb_trace(tdb, "tdb_transaction_commit");
-       }
+       tdb_trace(tdb, "tdb_transaction_commit");
 
        if (tdb->transaction->transaction_error) {
                tdb->ecode = TDB_ERR_IO;
@@ -1022,12 +997,12 @@ int tdb_transaction_commit(struct tdb_context *tdb)
 
        /* check for a null transaction */
        if (tdb->transaction->blocks == NULL) {
-               tdb_transaction_cancel_internal(tdb);
+               _tdb_transaction_cancel(tdb);
                return 0;
        }
 
        if (!tdb->transaction->prepared) {
-               int ret = tdb_transaction_prepare_commit_internal(tdb);
+               int ret = _tdb_transaction_prepare_commit(tdb);
                if (ret)
                        return ret;
        }
@@ -1058,8 +1033,7 @@ int tdb_transaction_commit(struct tdb_context *tdb)
                        tdb->methods = methods;
                        tdb_transaction_recover(tdb); 
 
-                       tdb_transaction_cancel_internal(tdb);
-                       tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
+                       _tdb_transaction_cancel(tdb);
 
                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
                        return -1;
@@ -1075,8 +1049,6 @@ int tdb_transaction_commit(struct tdb_context *tdb)
                return -1;
        }
 
-       tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
-
        /*
          TODO: maybe write to some dummy hdr field, or write to magic
          offset without mmap, before the last sync, instead of the
@@ -1092,9 +1064,15 @@ int tdb_transaction_commit(struct tdb_context *tdb)
        utime(tdb->name, NULL);
 #endif
 
+       need_repack = tdb->transaction->need_repack;
+
        /* use a transaction cancel to free memory and remove the
           transaction locks */
-       tdb_transaction_cancel_internal(tdb);
+       _tdb_transaction_cancel(tdb);
+
+       if (need_repack) {
+               return tdb_repack(tdb);
+       }
 
        return 0;
 }
@@ -1102,7 +1080,7 @@ int tdb_transaction_commit(struct tdb_context *tdb)
 
 /*
   recover from an aborted transaction. Must be called with exclusive
-  database write access already established (including the global
+  database write access already established (including the open
   lock to prevent new processes attaching)
 */
 int tdb_transaction_recover(struct tdb_context *tdb)
@@ -1110,7 +1088,7 @@ int tdb_transaction_recover(struct tdb_context *tdb)
        tdb_off_t recovery_head, recovery_eof;
        unsigned char *data, *p;
        uint32_t zero = 0;
-       struct list_struct rec;
+       struct tdb_record rec;
 
        /* find the recovery area */
        if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
@@ -1197,23 +1175,13 @@ int tdb_transaction_recover(struct tdb_context *tdb)
        }
 
        /* remove the recovery magic */
-       if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic), 
+       if (tdb_ofs_write(tdb, recovery_head + offsetof(struct tdb_record, magic),
                          &zero) == -1) {
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
                tdb->ecode = TDB_ERR_IO;
                return -1;                      
        }
        
-       /* reduce the file size to the old size */
-       tdb_munmap(tdb);
-       if (ftruncate(tdb->fd, recovery_eof) != 0) {
-               TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
-               tdb->ecode = TDB_ERR_IO;
-               return -1;                      
-       }
-       tdb->map_size = recovery_eof;
-       tdb_mmap(tdb);
-
        if (transaction_sync(tdb, 0, recovery_eof) == -1) {
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
                tdb->ecode = TDB_ERR_IO;
@@ -1226,3 +1194,28 @@ int tdb_transaction_recover(struct tdb_context *tdb)
        /* all done */
        return 0;
 }
+
+/* Any I/O failures we say "needs recovery". */
+bool tdb_needs_recovery(struct tdb_context *tdb)
+{
+       tdb_off_t recovery_head;
+       struct tdb_record rec;
+
+       /* find the recovery area */
+       if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
+               return true;
+       }
+
+       if (recovery_head == 0) {
+               /* we have never allocated a recovery record */
+               return false;
+       }
+
+       /* read the recovery record */
+       if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
+                                  sizeof(rec), DOCONV()) == -1) {
+               return true;
+       }
+
+       return (rec.magic == TDB_RECOVERY_MAGIC);
+}