]> git.ozlabs.org Git - ccan/blobdiff - ccan/tdb2/lock.c
tdb2: add missing prototype, move tdb_firstkey/tdb_nextkey to traverse.c
[ccan] / ccan / tdb2 / lock.c
index 14a7b5e5c65904749bd78647bb7558b12489fb15..15c97e3b98343b0a02ee910bdac0ec93664e3527 100644 (file)
@@ -26,6 +26,8 @@
 */
 
 #include "private.h"
+#include <assert.h>
+#include <ccan/build_assert/build_assert.h>
 
 static int fcntl_lock(struct tdb_context *tdb,
                      int rw, off_t off, off_t len, bool waitflag)
@@ -255,14 +257,14 @@ static int tdb_nest_lock(struct tdb_context *tdb, tdb_off_t offset, int ltype,
 {
        struct tdb_lock_type *new_lck;
 
-       if (offset >= TDB_HASH_LOCK_START + (1ULL << tdb->header.v.hash_bits)
-           + (tdb->header.v.num_zones * (tdb->header.v.free_buckets+1))) {
+       if (offset >= TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE + tdb->map_size / 8) {
                tdb->ecode = TDB_ERR_LOCK;
                tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
-                        "tdb_lock: invalid offset %llu for ltype=%d\n",
+                        "tdb_lock: invalid offset %llu ltype=%d\n",
                         (long long)offset, ltype);
                return -1;
        }
+
        if (tdb->flags & TDB_NOLOCK)
                return 0;
 
@@ -372,11 +374,6 @@ static int tdb_nest_unlock(struct tdb_context *tdb, tdb_off_t off, int ltype)
         */
        *lck = tdb->lockrecs[--tdb->num_lockrecs];
 
-       if (tdb->num_lockrecs == 0) {
-               /* If we're not holding any locks, header can change. */
-               tdb->header_uptodate = false;
-       }
-
        return ret;
 }
 
@@ -408,8 +405,10 @@ static int tdb_lock_gradual(struct tdb_context *tdb,
        int ret;
        enum tdb_lock_flags nb_flags = (flags & ~TDB_LOCK_WAIT);
 
-       if (len <= 4) {
-               /* Single record.  Just do blocking lock. */
+       if (len <= 1) {
+               /* 0 would mean to end-of-file... */
+               assert(len != 0);
+               /* Single hash.  Just do blocking lock. */
                return tdb_brlock(tdb, ltype, off, len, flags);
        }
 
@@ -435,14 +434,11 @@ static int tdb_lock_gradual(struct tdb_context *tdb,
 
 /* lock/unlock entire database.  It can only be upgradable if you have some
  * other way of guaranteeing exclusivity (ie. transaction write lock).
- * Note that we don't lock the free chains: noone can get those locks
- * without a hash chain lock first.
- * The header *will be* up to date once this returns success. */
+ * Note that we don't lock the free chains: currently noone can get those locks
+ * without a hash chain lock first. */
 int tdb_allrecord_lock(struct tdb_context *tdb, int ltype,
                       enum tdb_lock_flags flags, bool upgradable)
 {
-       tdb_off_t hash_size;
-
        /* FIXME: There are no locks on read-only dbs */
        if (tdb->read_only) {
                tdb->ecode = TDB_ERR_LOCK;
@@ -482,11 +478,9 @@ int tdb_allrecord_lock(struct tdb_context *tdb, int ltype,
                return -1;
        }
 
-       /* Lock all the hash buckets. */
 again:
-       hash_size = (1ULL << tdb->header.v.hash_bits);
        if (tdb_lock_gradual(tdb, ltype, flags, TDB_HASH_LOCK_START,
-                            hash_size)) {
+                            TDB_HASH_LOCK_RANGE)) {
                if (!(flags & TDB_LOCK_PROBE)) {
                        tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
                                 "tdb_lockall hashes failed (%s)\n",
@@ -501,12 +495,6 @@ again:
        tdb->allrecord_lock.ltype = upgradable ? F_WRLCK : ltype;
        tdb->allrecord_lock.off = upgradable;
 
-       /* Now we re-check header, holding lock. */
-       if (unlikely(header_changed(tdb))) {
-               tdb_allrecord_unlock(tdb, ltype);
-               goto again;
-       }
-
        /* Now check for needing recovery. */
        if (unlikely(tdb_needs_recovery(tdb))) {
                tdb_allrecord_unlock(tdb, ltype);
@@ -529,11 +517,19 @@ void tdb_unlock_open(struct tdb_context *tdb)
        tdb_nest_unlock(tdb, TDB_OPEN_LOCK, F_WRLCK);
 }
 
+int tdb_lock_expand(struct tdb_context *tdb, int ltype)
+{
+       return tdb_nest_lock(tdb, TDB_EXPANSION_LOCK, ltype, TDB_LOCK_WAIT);
+}
+
+void tdb_unlock_expand(struct tdb_context *tdb, int ltype)
+{
+       tdb_nest_unlock(tdb, TDB_EXPANSION_LOCK, ltype);
+}
+
 /* unlock entire db */
 int tdb_allrecord_unlock(struct tdb_context *tdb, int ltype)
 {
-       tdb_off_t hash_size;
-
        /* FIXME: There are no locks on read-only dbs */
        if (tdb->read_only) {
                tdb->ecode = TDB_ERR_LOCK;
@@ -567,11 +563,15 @@ int tdb_allrecord_unlock(struct tdb_context *tdb, int ltype)
 
        tdb->allrecord_lock.count = 0;
        tdb->allrecord_lock.ltype = 0;
-       tdb->header_uptodate = false;
 
-       hash_size = (1ULL << tdb->header.v.hash_bits);
+       return tdb_brunlock(tdb, ltype, TDB_HASH_LOCK_START,
+                           TDB_HASH_LOCK_RANGE);
+}
 
-       return tdb_brunlock(tdb, ltype, TDB_HASH_LOCK_START, hash_size);
+bool tdb_has_expansion_lock(struct tdb_context *tdb)
+{
+       return find_nestlock(tdb, TDB_EXPANSION_LOCK) != NULL
+               || (tdb->flags & TDB_NOLOCK);
 }
 
 bool tdb_has_locks(struct tdb_context *tdb)
@@ -625,18 +625,19 @@ int tdb_unlockall_read(struct tdb_context *tdb)
 }
 #endif
 
-/* Returns the list we actually locked. */
-tdb_off_t tdb_lock_list(struct tdb_context *tdb, uint64_t hash,
-                       int ltype, enum tdb_lock_flags waitflag)
+int tdb_lock_hashes(struct tdb_context *tdb,
+                   tdb_off_t hash_lock,
+                   tdb_len_t hash_range,
+                   int ltype, enum tdb_lock_flags waitflag)
 {
-       tdb_off_t list = hash & ((1ULL << tdb->header.v.hash_bits) - 1);
-       /* Header can change ONLY if we had no locks before. */
-       bool can_change = tdb->num_lockrecs == 0;
+       /* FIXME: Do this properly, using hlock_range */
+       unsigned lock = TDB_HASH_LOCK_START
+               + (hash_lock >> (64 - TDB_HASH_LOCK_RANGE_BITS));
 
        /* a allrecord lock allows us to avoid per chain locks */
        if (tdb->allrecord_lock.count &&
            (ltype == tdb->allrecord_lock.ltype || ltype == F_RDLCK)) {
-               return list;
+               return 0;
        }
 
        if (tdb->allrecord_lock.count) {
@@ -645,27 +646,18 @@ tdb_off_t tdb_lock_list(struct tdb_context *tdb, uint64_t hash,
                         "tdb_lock_list: have %s allrecordlock\n",
                         tdb->allrecord_lock.ltype == F_RDLCK
                         ? "read" : "write");
-               return TDB_OFF_ERR;
+               return -1;
        }
 
-again:
-       if (tdb_nest_lock(tdb, TDB_HASH_LOCK_START + list, ltype, waitflag))
-               return TDB_OFF_ERR;
-
-       if (can_change && unlikely(header_changed(tdb))) {
-               tdb_off_t new = hash & ((1ULL << tdb->header.v.hash_bits) - 1);
-               if (new != list) {
-                       tdb_nest_unlock(tdb, TDB_HASH_LOCK_START+list, ltype);
-                       list = new;
-                       goto again;
-               }
-       }
-       return list;
+       return tdb_nest_lock(tdb, lock, ltype, waitflag);
 }
 
-int tdb_unlock_list(struct tdb_context *tdb, tdb_off_t list, int ltype)
+int tdb_unlock_hashes(struct tdb_context *tdb,
+                     tdb_off_t hash_lock,
+                     tdb_len_t hash_range, int ltype)
 {
-       list &= ((1ULL << tdb->header.v.hash_bits) - 1);
+       unsigned lock = TDB_HASH_LOCK_START
+               + (hash_lock >> (64 - TDB_HASH_LOCK_RANGE_BITS));
 
        /* a allrecord lock allows us to avoid per chain locks */
        if (tdb->allrecord_lock.count) {
@@ -677,15 +669,26 @@ int tdb_unlock_list(struct tdb_context *tdb, tdb_off_t list, int ltype)
                        return -1;
                }
                return 0;
-       } else {
-               return tdb_nest_unlock(tdb, TDB_HASH_LOCK_START + list, ltype);
        }
+
+       return tdb_nest_unlock(tdb, lock, ltype);
 }
 
-/* Free list locks come after hash locks */
-int tdb_lock_free_list(struct tdb_context *tdb, tdb_off_t flist,
-                      enum tdb_lock_flags waitflag)
+/* Hash locks use TDB_HASH_LOCK_START + the next 30 bits.
+ * Then we begin; bucket offsets are sizeof(tdb_len_t) apart, so we divide.
+ * The result is that on 32 bit systems we don't use lock values > 2^31 on
+ * files that are less than 4GB.
+ */
+static tdb_off_t free_lock_off(tdb_off_t b_off)
 {
+       return TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE + b_off / sizeof(tdb_off_t);
+}
+
+int tdb_lock_free_bucket(struct tdb_context *tdb, tdb_off_t b_off,
+                        enum tdb_lock_flags waitflag)
+{
+       assert(b_off >= sizeof(struct tdb_header));
+
        /* You're supposed to have a hash lock first! */
        if (!(tdb->flags & TDB_NOLOCK) && !tdb_has_locks(tdb)) {
                tdb->ecode = TDB_ERR_LOCK;
@@ -704,19 +707,15 @@ int tdb_lock_free_list(struct tdb_context *tdb, tdb_off_t flist,
                return -1;
        }
 
-       return tdb_nest_lock(tdb, TDB_HASH_LOCK_START
-                            + (1ULL << tdb->header.v.hash_bits)
-                            + flist, F_WRLCK, waitflag);
+       return tdb_nest_lock(tdb, free_lock_off(b_off), F_WRLCK, waitflag);
 }
 
-void tdb_unlock_free_list(struct tdb_context *tdb, tdb_off_t flist)
+void tdb_unlock_free_bucket(struct tdb_context *tdb, tdb_off_t b_off)
 {
        if (tdb->allrecord_lock.count)
                return;
 
-       tdb_nest_unlock(tdb, TDB_HASH_LOCK_START
-                       + (1ULL << tdb->header.v.hash_bits)
-                       + flist, F_WRLCK);
+       tdb_nest_unlock(tdb, free_lock_off(b_off), F_WRLCK);
 }
 
 /* Even if the entry isn't in this hash bucket, you'd have to lock this
@@ -728,7 +727,7 @@ static int chainlock(struct tdb_context *tdb, const TDB_DATA *key,
        int ret;
        uint64_t h = tdb_hash(tdb, key->dptr, key->dsize);
 
-       ret = tdb_lock_list(tdb, h, ltype, waitflag) == TDB_OFF_ERR ? -1 : 0;
+       ret = tdb_lock_hashes(tdb, h, 1, ltype, waitflag);
        tdb_trace_1rec(tdb, func, *key);
        return ret;
 }
@@ -744,7 +743,7 @@ int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
 {
        uint64_t h = tdb_hash(tdb, key.dptr, key.dsize);
        tdb_trace_1rec(tdb, "tdb_chainunlock", key);
-       return tdb_unlock_list(tdb, h, F_WRLCK);
+       return tdb_unlock_hashes(tdb, h, 1, F_WRLCK);
 }
 
 #if 0