tdb2: fix tdb_chainlock
[ccan] / ccan / tdb2 / lock.c
index 2b00584580810ad9aca0ae24dd6f880bd58d0796..10d1e18a512b6d0f0964e8eda80fb33b5914dd54 100644 (file)
@@ -26,6 +26,8 @@
 */
 
 #include "private.h"
+#include <assert.h>
+#include <ccan/build_assert/build_assert.h>
 
 static int fcntl_lock(struct tdb_context *tdb,
                      int rw, off_t off, off_t len, bool waitflag)
@@ -255,14 +257,14 @@ static int tdb_nest_lock(struct tdb_context *tdb, tdb_off_t offset, int ltype,
 {
        struct tdb_lock_type *new_lck;
 
-       if (offset >= TDB_HASH_LOCK_START + (1ULL << tdb->header.v.hash_bits)
-           + (tdb->header.v.num_zones * (tdb->header.v.free_buckets+1))) {
+       if (offset >= TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE + tdb->map_size / 8) {
                tdb->ecode = TDB_ERR_LOCK;
                tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
-                        "tdb_lock: invalid offset %llu for ltype=%d\n",
+                        "tdb_nest_lock: invalid offset %llu ltype=%d\n",
                         (long long)offset, ltype);
                return -1;
        }
+
        if (tdb->flags & TDB_NOLOCK)
                return 0;
 
@@ -276,13 +278,22 @@ static int tdb_nest_lock(struct tdb_context *tdb, tdb_off_t offset, int ltype,
                return 0;
        }
 
+       if (tdb->num_lockrecs
+           && offset >= TDB_HASH_LOCK_START
+           && offset < TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE) {
+               tdb->ecode = TDB_ERR_LOCK;
+               tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
+                        "tdb_nest_lock: already have a hash lock?\n");
+               return -1;
+       }
+
        new_lck = (struct tdb_lock_type *)realloc(
                tdb->lockrecs,
                sizeof(*tdb->lockrecs) * (tdb->num_lockrecs+1));
        if (new_lck == NULL) {
                tdb->ecode = TDB_ERR_OOM;
                tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
-                        "tdb_lock: unable to allocate %llu lock structure",
+                        "tdb_nest_lock: unable to allocate %llu lock struct",
                         (long long)(tdb->num_lockrecs + 1));
                errno = ENOMEM;
                return -1;
@@ -349,7 +360,7 @@ static int tdb_nest_unlock(struct tdb_context *tdb, tdb_off_t off, int ltype)
        if ((lck == NULL) || (lck->count == 0)) {
                tdb->ecode = TDB_ERR_LOCK;
                tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
-                        "tdb_unlock: no lock for %llu\n", (long long)off);
+                        "tdb_nest_unlock: no lock for %llu\n", (long long)off);
                return -1;
        }
 
@@ -372,11 +383,6 @@ static int tdb_nest_unlock(struct tdb_context *tdb, tdb_off_t off, int ltype)
         */
        *lck = tdb->lockrecs[--tdb->num_lockrecs];
 
-       if (tdb->num_lockrecs == 0) {
-               /* If we're not holding any locks, header can change. */
-               tdb->header_uptodate = false;
-       }
-
        return ret;
 }
 
@@ -408,8 +414,10 @@ static int tdb_lock_gradual(struct tdb_context *tdb,
        int ret;
        enum tdb_lock_flags nb_flags = (flags & ~TDB_LOCK_WAIT);
 
-       if (len <= 4) {
-               /* Single record.  Just do blocking lock. */
+       if (len <= 1) {
+               /* 0 would mean to end-of-file... */
+               assert(len != 0);
+               /* Single hash.  Just do blocking lock. */
                return tdb_brlock(tdb, ltype, off, len, flags);
        }
 
@@ -434,15 +442,10 @@ static int tdb_lock_gradual(struct tdb_context *tdb,
 }
 
 /* lock/unlock entire database.  It can only be upgradable if you have some
- * other way of guaranteeing exclusivity (ie. transaction write lock).
- * Note that we don't lock the free chains: noone can get those locks
- * without a hash chain lock first.
- * The header *will be* up to date once this returns success. */
+ * other way of guaranteeing exclusivity (ie. transaction write lock). */
 int tdb_allrecord_lock(struct tdb_context *tdb, int ltype,
                       enum tdb_lock_flags flags, bool upgradable)
 {
-       tdb_off_t hash_size;
-
        /* FIXME: There are no locks on read-only dbs */
        if (tdb->read_only) {
                tdb->ecode = TDB_ERR_LOCK;
@@ -482,31 +485,37 @@ int tdb_allrecord_lock(struct tdb_context *tdb, int ltype,
                return -1;
        }
 
-       /* Lock all the hash buckets. */
 again:
-       hash_size = (1ULL << tdb->header.v.hash_bits);
+       /* Lock hashes, gradually. */
        if (tdb_lock_gradual(tdb, ltype, flags, TDB_HASH_LOCK_START,
-                            hash_size)) {
+                            TDB_HASH_LOCK_RANGE)) {
                if (!(flags & TDB_LOCK_PROBE)) {
                        tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
-                                "tdb_lockall hashes failed (%s)\n",
+                                "tdb_allrecord_lock hashes failed (%s)\n",
                                 strerror(errno));
                }
                return -1;
        }
 
+       /* Lock free lists: there to end of file. */
+       if (tdb_brlock(tdb, ltype, TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE,
+                      0, flags)) {
+               if (!(flags & TDB_LOCK_PROBE)) {
+                       tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
+                                "tdb_allrecord_lock freelist failed (%s)\n",
+                                strerror(errno));
+               }
+               tdb_brunlock(tdb, ltype, TDB_HASH_LOCK_START, 
+                            TDB_HASH_LOCK_RANGE);
+               return -1;
+       }
+
        tdb->allrecord_lock.count = 1;
        /* If it's upgradable, it's actually exclusive so we can treat
         * it as a write lock. */
        tdb->allrecord_lock.ltype = upgradable ? F_WRLCK : ltype;
        tdb->allrecord_lock.off = upgradable;
 
-       /* Now we re-check header, holding lock. */
-       if (unlikely(header_changed(tdb))) {
-               tdb_allrecord_unlock(tdb, ltype);
-               goto again;
-       }
-
        /* Now check for needing recovery. */
        if (unlikely(tdb_needs_recovery(tdb))) {
                tdb_allrecord_unlock(tdb, ltype);
@@ -529,11 +538,19 @@ void tdb_unlock_open(struct tdb_context *tdb)
        tdb_nest_unlock(tdb, TDB_OPEN_LOCK, F_WRLCK);
 }
 
+int tdb_lock_expand(struct tdb_context *tdb, int ltype)
+{
+       return tdb_nest_lock(tdb, TDB_EXPANSION_LOCK, ltype, TDB_LOCK_WAIT);
+}
+
+void tdb_unlock_expand(struct tdb_context *tdb, int ltype)
+{
+       tdb_nest_unlock(tdb, TDB_EXPANSION_LOCK, ltype);
+}
+
 /* unlock entire db */
 int tdb_allrecord_unlock(struct tdb_context *tdb, int ltype)
 {
-       tdb_off_t hash_size;
-
        /* FIXME: There are no locks on read-only dbs */
        if (tdb->read_only) {
                tdb->ecode = TDB_ERR_LOCK;
@@ -567,11 +584,13 @@ int tdb_allrecord_unlock(struct tdb_context *tdb, int ltype)
 
        tdb->allrecord_lock.count = 0;
        tdb->allrecord_lock.ltype = 0;
-       tdb->header_uptodate = false;
 
-       hash_size = (1ULL << tdb->header.v.hash_bits);
+       return tdb_brunlock(tdb, ltype, TDB_HASH_LOCK_START, 0);
+}
 
-       return tdb_brunlock(tdb, ltype, TDB_HASH_LOCK_START, hash_size);
+bool tdb_has_expansion_lock(struct tdb_context *tdb)
+{
+       return find_nestlock(tdb, TDB_EXPANSION_LOCK) != NULL;
 }
 
 bool tdb_has_locks(struct tdb_context *tdb)
@@ -625,47 +644,65 @@ int tdb_unlockall_read(struct tdb_context *tdb)
 }
 #endif
 
-/* Returns the list we actually locked. */
-tdb_off_t tdb_lock_list(struct tdb_context *tdb, uint64_t hash,
-                       int ltype, enum tdb_lock_flags waitflag)
+static bool tdb_has_free_lock(struct tdb_context *tdb)
+{
+       unsigned int i;
+
+       for (i=0; i<tdb->num_lockrecs; i++) {
+               if (tdb->lockrecs[i].off
+                   > TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE)
+                       return true;
+       }
+       return false;
+}
+
+int tdb_lock_hashes(struct tdb_context *tdb,
+                   tdb_off_t hash_lock,
+                   tdb_len_t hash_range,
+                   int ltype, enum tdb_lock_flags waitflag)
 {
-       tdb_off_t list = hash & ((1ULL << tdb->header.v.hash_bits) - 1);
-       /* Header can change ONLY if we had no locks before. */
-       bool can_change = tdb->num_lockrecs == 0;
+       /* FIXME: Do this properly, using hlock_range */
+       unsigned lock = TDB_HASH_LOCK_START
+               + (hash_lock >> (64 - TDB_HASH_LOCK_RANGE_BITS));
 
        /* a allrecord lock allows us to avoid per chain locks */
        if (tdb->allrecord_lock.count &&
            (ltype == tdb->allrecord_lock.ltype || ltype == F_RDLCK)) {
-               return list;
+               return 0;
        }
 
        if (tdb->allrecord_lock.count) {
                tdb->ecode = TDB_ERR_LOCK;
                tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
-                        "tdb_lock_list: have %s allrecordlock\n",
+                        "tdb_lock_hashes: have %s allrecordlock\n",
                         tdb->allrecord_lock.ltype == F_RDLCK
                         ? "read" : "write");
-               return TDB_OFF_ERR;
+               return -1;
        }
 
-again:
-       if (tdb_nest_lock(tdb, TDB_HASH_LOCK_START + list, ltype, waitflag))
-               return TDB_OFF_ERR;
-
-       if (can_change && unlikely(header_changed(tdb))) {
-               tdb_off_t new = hash & ((1ULL << tdb->header.v.hash_bits) - 1);
-               if (new != list) {
-                       tdb_nest_unlock(tdb, TDB_HASH_LOCK_START+list, ltype);
-                       list = new;
-                       goto again;
-               }
+       if (tdb_has_free_lock(tdb)) {
+               tdb->ecode = TDB_ERR_LOCK;
+               tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
+                        "tdb_lock_hashes: have free lock already\n");
+               return -1;
+       }
+
+       if (tdb_has_expansion_lock(tdb)) {
+               tdb->ecode = TDB_ERR_LOCK;
+               tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
+                        "tdb_lock_hashes: have expansion lock already\n");
+               return -1;
        }
-       return list;
+
+       return tdb_nest_lock(tdb, lock, ltype, waitflag);
 }
 
-int tdb_unlock_list(struct tdb_context *tdb, tdb_off_t list, int ltype)
+int tdb_unlock_hashes(struct tdb_context *tdb,
+                     tdb_off_t hash_lock,
+                     tdb_len_t hash_range, int ltype)
 {
-       list &= ((1ULL << tdb->header.v.hash_bits) - 1);
+       unsigned lock = TDB_HASH_LOCK_START
+               + (hash_lock >> (64 - TDB_HASH_LOCK_RANGE_BITS));
 
        /* a allrecord lock allows us to avoid per chain locks */
        if (tdb->allrecord_lock.count) {
@@ -673,26 +710,30 @@ int tdb_unlock_list(struct tdb_context *tdb, tdb_off_t list, int ltype)
                    && ltype == F_WRLCK) {
                        tdb->ecode = TDB_ERR_LOCK;
                        tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
-                                "tdb_unlock_list RO allrecord!\n");
+                                "tdb_unlock_hashes RO allrecord!\n");
                        return -1;
                }
                return 0;
-       } else {
-               return tdb_nest_unlock(tdb, TDB_HASH_LOCK_START + list, ltype);
        }
+
+       return tdb_nest_unlock(tdb, lock, ltype);
 }
 
-/* Free list locks come after hash locks */
-int tdb_lock_free_list(struct tdb_context *tdb, tdb_off_t flist,
-                      enum tdb_lock_flags waitflag)
+/* Hash locks use TDB_HASH_LOCK_START + the next 30 bits.
+ * Then we begin; bucket offsets are sizeof(tdb_len_t) apart, so we divide.
+ * The result is that on 32 bit systems we don't use lock values > 2^31 on
+ * files that are less than 4GB.
+ */
+static tdb_off_t free_lock_off(tdb_off_t b_off)
 {
-       /* You're supposed to have a hash lock first! */
-       if (!(tdb->flags & TDB_NOLOCK) && !tdb_has_locks(tdb)) {
-               tdb->ecode = TDB_ERR_LOCK;
-               tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
-                        "tdb_lock_free_list without lock!\n");
-               return -1;
-       }
+       return TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE
+               + b_off / sizeof(tdb_off_t);
+}
+
+int tdb_lock_free_bucket(struct tdb_context *tdb, tdb_off_t b_off,
+                        enum tdb_lock_flags waitflag)
+{
+       assert(b_off >= sizeof(struct tdb_header));
 
        /* a allrecord lock allows us to avoid per chain locks */
        if (tdb->allrecord_lock.count) {
@@ -700,45 +741,31 @@ int tdb_lock_free_list(struct tdb_context *tdb, tdb_off_t flist,
                        return 0;
                tdb->ecode = TDB_ERR_LOCK;
                tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
-                        "tdb_lock_free_list with RO allrecordlock!\n");
+                        "tdb_lock_free_bucket with RO allrecordlock!\n");
+               return -1;
+       }
+
+#if 0 /* FIXME */
+       if (tdb_has_expansion_lock(tdb)) {
+               tdb->ecode = TDB_ERR_LOCK;
+               tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
+                        "tdb_lock_free_bucket: have expansion lock already\n");
                return -1;
        }
+#endif
 
-       return tdb_nest_lock(tdb, TDB_HASH_LOCK_START
-                            + (1ULL << tdb->header.v.hash_bits)
-                            + flist, F_WRLCK, waitflag);
+       return tdb_nest_lock(tdb, free_lock_off(b_off), F_WRLCK, waitflag);
 }
 
-void tdb_unlock_free_list(struct tdb_context *tdb, tdb_off_t flist)
+void tdb_unlock_free_bucket(struct tdb_context *tdb, tdb_off_t b_off)
 {
        if (tdb->allrecord_lock.count)
                return;
 
-       tdb_nest_unlock(tdb, TDB_HASH_LOCK_START
-                       + (1ULL << tdb->header.v.hash_bits)
-                       + flist, F_WRLCK);
+       tdb_nest_unlock(tdb, free_lock_off(b_off), F_WRLCK);
 }
 
 #if 0
-static int chainlock(struct tdb_context *tdb, const TDB_DATA *key,
-                    int ltype, enum tdb_lock_flags waitflag,
-                    const char *func)
-{
-       int ret;
-       uint64_t h = tdb_hash(tdb, key->dptr, key->dsize);
-
-       ret = tdb_lock_list(tdb, h, ltype, waitflag) == TDB_OFF_ERR ? -1 : 0;
-       tdb_trace_1rec(tdb, func, *key);
-       return ret;
-}
-
-/* lock/unlock one hash chain. This is meant to be used to reduce
-   contention - it cannot guarantee how many records will be locked */
-int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
-{
-       return chainlock(tdb, &key, F_WRLCK, TDB_LOCK_WAIT, "tdb_chainlock");
-}
-
 /* lock/unlock one hash chain, non-blocking. This is meant to be used
    to reduce contention - it cannot guarantee how many records will be
    locked */
@@ -748,14 +775,6 @@ int tdb_chainlock_nonblock(struct tdb_context *tdb, TDB_DATA key)
                         "tdb_chainlock_nonblock");
 }
 
-int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
-{
-       uint64_t h = tdb_hash(tdb, key.dptr, key.dsize);
-       tdb_trace_1rec(tdb, "tdb_chainunlock", key);
-       return tdb_unlock_list(tdb, h & ((1ULL << tdb->header.v.hash_bits)-1),
-                              F_WRLCK);
-}
-
 int tdb_chainlock_read(struct tdb_context *tdb, TDB_DATA key)
 {
        return chainlock(tdb, &key, F_RDLCK, TDB_LOCK_WAIT,