diff --git a/ccan/tdb2/lock.c b/ccan/tdb2/lock.c
index 14a7b5e5c65904749bd78647bb7558b12489fb15..14f9c0ae307b348b1a7401b6001bbd419a8e0c69 100644
--- a/ccan/tdb2/lock.c
+++ b/ccan/tdb2/lock.c
@@ -26,6 +26,8 @@
 */
 
 #include "private.h"
+#include <assert.h>
+#include <ccan/build_assert/build_assert.h>
 
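
(ccan/build_assert supplies CCAN's compile-time assertion macros.  As an
illustration only, not part of this patch, BUILD_ASSERT() breaks the build
when a compile-time invariant fails:)

static void lock_build_checks(void)
{
        /* Illustrative: off_t must be wide enough for 64-bit lock offsets. */
        BUILD_ASSERT(sizeof(off_t) >= 8);
}
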
 static int fcntl_lock(struct tdb_context *tdb,
                      int rw, off_t off, off_t len, bool waitflag)
@@ -255,14 +257,14 @@ static int tdb_nest_lock(struct tdb_context *tdb, tdb_off_t offset, int ltype,
 {
        struct tdb_lock_type *new_lck;
 
-       if (offset >= TDB_HASH_LOCK_START + (1ULL << tdb->header.v.hash_bits)
-           + (tdb->header.v.num_zones * (tdb->header.v.free_buckets+1))) {
+       if (offset >= TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE + tdb->map_size / 8) {
                tdb->ecode = TDB_ERR_LOCK;
                tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
-                        "tdb_lock: invalid offset %llu for ltype=%d\n",
+                        "tdb_nest_lock: invalid offset %llu ltype=%d\n",
                         (long long)offset, ltype);
                return -1;
        }
+
        if (tdb->flags & TDB_NOLOCK)
                return 0;
 
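
The new bound mirrors the lock-space layout this patch introduces: a fixed
range of hash locks, then one free-list lock per 8 bytes of file.  A sketch
of the layout (a reconstruction; the actual constants live in private.h):

/*   TDB_HASH_LOCK_START
 *     ... TDB_HASH_LOCK_RANGE hash-lock offsets ...
 *   TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE
 *     ... free-list locks, one per sizeof(tdb_off_t) == 8 bytes of file,
 *         hence the "+ tdb->map_size / 8" upper bound ...
 */
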
@@ -282,7 +284,7 @@ static int tdb_nest_lock(struct tdb_context *tdb, tdb_off_t offset, int ltype,
        if (new_lck == NULL) {
                tdb->ecode = TDB_ERR_OOM;
                tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
-                        "tdb_lock: unable to allocate %llu lock structure",
+                        "tdb_nest_lock: unable to allocate %llu lock struct",
                         (long long)(tdb->num_lockrecs + 1));
                errno = ENOMEM;
                return -1;
@@ -349,7 +351,7 @@ static int tdb_nest_unlock(struct tdb_context *tdb, tdb_off_t off, int ltype)
        if ((lck == NULL) || (lck->count == 0)) {
                tdb->ecode = TDB_ERR_LOCK;
                tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
-                        "tdb_unlock: no lock for %llu\n", (long long)off);
+                        "tdb_nest_unlock: no lock for %llu\n", (long long)off);
                return -1;
        }
 
@@ -372,11 +374,6 @@ static int tdb_nest_unlock(struct tdb_context *tdb, tdb_off_t off, int ltype)
         */
        *lck = tdb->lockrecs[--tdb->num_lockrecs];
 
-       if (tdb->num_lockrecs == 0) {
-               /* If we're not holding any locks, header can change. */
-               tdb->header_uptodate = false;
-       }
-
        return ret;
 }
 
@@ -408,8 +405,10 @@ static int tdb_lock_gradual(struct tdb_context *tdb,
        int ret;
        enum tdb_lock_flags nb_flags = (flags & ~TDB_LOCK_WAIT);
 
-       if (len <= 4) {
-               /* Single record.  Just do blocking lock. */
+       if (len <= 1) {
+               /* 0 would mean to end-of-file... */
+               assert(len != 0);
+               /* Single hash.  Just do blocking lock. */
                return tdb_brlock(tdb, ltype, off, len, flags);
        }
 
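
For context, a hedged reconstruction of the recursive case this base case
belongs to (a sketch, not patch text): try the whole range without waiting;
on contention, take the first half with a blocking lock and recurse, so a
large-range locker makes progress piecewise instead of waiting behind every
small locker at once.

static int lock_gradual_sketch(struct tdb_context *tdb, int ltype,
                               enum tdb_lock_flags flags,
                               tdb_off_t off, tdb_off_t len)
{
        int ret;

        if (len <= 1) {
                assert(len != 0);
                return tdb_brlock(tdb, ltype, off, len, flags);
        }

        /* Fast path: try the whole range, non-blocking. */
        ret = tdb_brlock(tdb, ltype, off, len, flags & ~TDB_LOCK_WAIT);
        if (ret == 0 || !(flags & TDB_LOCK_WAIT))
                return ret;

        /* Contended: first half (may block), then the remainder. */
        ret = lock_gradual_sketch(tdb, ltype, flags, off, len / 2);
        if (ret == -1)
                return -1;
        ret = lock_gradual_sketch(tdb, ltype, flags,
                                  off + len / 2, len - len / 2);
        if (ret == -1)
                tdb_brunlock(tdb, ltype, off, len / 2);
        return ret;
}
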
@@ -434,15 +433,10 @@ static int tdb_lock_gradual(struct tdb_context *tdb,
 }
 
 /* lock/unlock entire database.  It can only be upgradable if you have some
- * other way of guaranteeing exclusivity (ie. transaction write lock).
- * Note that we don't lock the free chains: noone can get those locks
- * without a hash chain lock first.
- * The header *will be* up to date once this returns success. */
+ * other way of guaranteeing exclusivity (ie. transaction write lock). */
 int tdb_allrecord_lock(struct tdb_context *tdb, int ltype,
                       enum tdb_lock_flags flags, bool upgradable)
 {
-       tdb_off_t hash_size;
-
        /* FIXME: There are no locks on read-only dbs */
        if (tdb->read_only) {
                tdb->ecode = TDB_ERR_LOCK;
@@ -482,16 +476,28 @@ int tdb_allrecord_lock(struct tdb_context *tdb, int ltype,
                return -1;
        }
 
-       /* Lock all the hash buckets. */
 again:
-       hash_size = (1ULL << tdb->header.v.hash_bits);
+       /* Lock hashes, gradually. */
        if (tdb_lock_gradual(tdb, ltype, flags, TDB_HASH_LOCK_START,
-                            hash_size)) {
+                            TDB_HASH_LOCK_RANGE)) {
+               if (!(flags & TDB_LOCK_PROBE)) {
+                       tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
+                                "tdb_allrecord_lock hashes failed (%s)\n",
+                                strerror(errno));
+               }
+               return -1;
+       }
+
+       /* Lock free lists: from there to end of file. */
+       if (tdb_brlock(tdb, ltype, TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE,
+                      0, flags)) {
                if (!(flags & TDB_LOCK_PROBE)) {
                        tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
-                                "tdb_lockall hashes failed (%s)\n",
+                                "tdb_allrecord_lock freelist failed (%s)\n",
                                 strerror(errno));
                }
+               tdb_brunlock(tdb, ltype, TDB_HASH_LOCK_START,
+                            TDB_HASH_LOCK_RANGE);
                return -1;
        }
 
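
The zero length passed to tdb_brlock above is what makes "to end of file"
work: POSIX record locks treat l_len == 0 as extending from l_start to the
largest possible offset, so the range keeps covering free-list locks even
as the file grows.  A plain-fcntl illustration (not tdb code):

#include <fcntl.h>

/* Illustration: write-lock from `start` to end of file, including any
 * region the file grows into later (l_len == 0 means "no upper limit"). */
static int lock_to_eof(int fd, off_t start)
{
        struct flock fl = {
                .l_type   = F_WRLCK,
                .l_whence = SEEK_SET,
                .l_start  = start,
                .l_len    = 0,
        };
        return fcntl(fd, F_SETLKW, &fl);
}
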
@@ -501,12 +507,6 @@ again:
        tdb->allrecord_lock.ltype = upgradable ? F_WRLCK : ltype;
        tdb->allrecord_lock.off = upgradable;
 
-       /* Now we re-check header, holding lock. */
-       if (unlikely(header_changed(tdb))) {
-               tdb_allrecord_unlock(tdb, ltype);
-               goto again;
-       }
-
        /* Now check for needing recovery. */
        if (unlikely(tdb_needs_recovery(tdb))) {
                tdb_allrecord_unlock(tdb, ltype);
@@ -529,11 +529,19 @@ void tdb_unlock_open(struct tdb_context *tdb)
        tdb_nest_unlock(tdb, TDB_OPEN_LOCK, F_WRLCK);
 }
 
+int tdb_lock_expand(struct tdb_context *tdb, int ltype)
+{
+       return tdb_nest_lock(tdb, TDB_EXPANSION_LOCK, ltype, TDB_LOCK_WAIT);
+}
+
+void tdb_unlock_expand(struct tdb_context *tdb, int ltype)
+{
+       tdb_nest_unlock(tdb, TDB_EXPANSION_LOCK, ltype);
+}
+
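
(A hypothetical caller, to show the intended nesting of the new wrappers;
grow_file and tdb_expand_file are assumed names, not from this patch:)

static int grow_file(struct tdb_context *tdb, tdb_len_t extra)
{
        int ret;

        /* Serialize expansion against other expanders. */
        if (tdb_lock_expand(tdb, F_WRLCK) == -1)
                return -1;
        ret = tdb_expand_file(tdb, extra);      /* assumed helper */
        tdb_unlock_expand(tdb, F_WRLCK);
        return ret;
}
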
 /* unlock entire db */
 int tdb_allrecord_unlock(struct tdb_context *tdb, int ltype)
 {
-       tdb_off_t hash_size;
-
        /* FIXME: There are no locks on read-only dbs */
        if (tdb->read_only) {
                tdb->ecode = TDB_ERR_LOCK;
@@ -567,11 +575,13 @@ int tdb_allrecord_unlock(struct tdb_context *tdb, int ltype)
 
        tdb->allrecord_lock.count = 0;
        tdb->allrecord_lock.ltype = 0;
-       tdb->header_uptodate = false;
 
-       hash_size = (1ULL << tdb->header.v.hash_bits);
+       return tdb_brunlock(tdb, ltype, TDB_HASH_LOCK_START, 0);
+}
 
-       return tdb_brunlock(tdb, ltype, TDB_HASH_LOCK_START, hash_size);
+bool tdb_has_expansion_lock(struct tdb_context *tdb)
+{
+       return find_nestlock(tdb, TDB_EXPANSION_LOCK) != NULL;
 }
 
 bool tdb_has_locks(struct tdb_context *tdb)
@@ -625,47 +635,65 @@ int tdb_unlockall_read(struct tdb_context *tdb)
 }
 #endif
 
-/* Returns the list we actually locked. */
-tdb_off_t tdb_lock_list(struct tdb_context *tdb, uint64_t hash,
-                       int ltype, enum tdb_lock_flags waitflag)
+static bool tdb_has_free_lock(struct tdb_context *tdb)
+{
+       unsigned int i;
+
+       for (i=0; i<tdb->num_lockrecs; i++) {
+               if (tdb->lockrecs[i].off
+                   > TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE)
+                       return true;
+       }
+       return false;
+}
+
+int tdb_lock_hashes(struct tdb_context *tdb,
+                   tdb_off_t hash_lock,
+                   tdb_len_t hash_range,
+                   int ltype, enum tdb_lock_flags waitflag)
 {
-       tdb_off_t list = hash & ((1ULL << tdb->header.v.hash_bits) - 1);
-       /* Header can change ONLY if we had no locks before. */
-       bool can_change = tdb->num_lockrecs == 0;
+       /* FIXME: Do this properly, using hlock_range */
+       unsigned lock = TDB_HASH_LOCK_START
+               + (hash_lock >> (64 - TDB_HASH_LOCK_RANGE_BITS));
 
        /* an allrecord lock allows us to avoid per-chain locks */
        if (tdb->allrecord_lock.count &&
            (ltype == tdb->allrecord_lock.ltype || ltype == F_RDLCK)) {
-               return list;
+               return 0;
        }
 
        if (tdb->allrecord_lock.count) {
                tdb->ecode = TDB_ERR_LOCK;
                tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
-                        "tdb_lock_list: have %s allrecordlock\n",
+                        "tdb_lock_hashes: have %s allrecordlock\n",
                         tdb->allrecord_lock.ltype == F_RDLCK
                         ? "read" : "write");
-               return TDB_OFF_ERR;
+               return -1;
        }
 
-again:
-       if (tdb_nest_lock(tdb, TDB_HASH_LOCK_START + list, ltype, waitflag))
-               return TDB_OFF_ERR;
-
-       if (can_change && unlikely(header_changed(tdb))) {
-               tdb_off_t new = hash & ((1ULL << tdb->header.v.hash_bits) - 1);
-               if (new != list) {
-                       tdb_nest_unlock(tdb, TDB_HASH_LOCK_START+list, ltype);
-                       list = new;
-                       goto again;
-               }
+       if (tdb_has_free_lock(tdb)) {
+               tdb->ecode = TDB_ERR_LOCK;
+               tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
+                        "tdb_lock_hashes: have free lock already\n");
+               return -1;
+       }
+
+       if (tdb_has_expansion_lock(tdb)) {
+               tdb->ecode = TDB_ERR_LOCK;
+               tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
+                        "tdb_lock_hashes: have expansion lock already\n");
+               return -1;
        }
-       return list;
+
+       return tdb_nest_lock(tdb, lock, ltype, waitflag);
 }
 
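
The mapping above takes the top TDB_HASH_LOCK_RANGE_BITS bits of the 64-bit
hash as an offset within the hash-lock range.  A worked example, assuming
TDB_HASH_LOCK_RANGE_BITS == 30 (consistent with the "30 bits" comment
further down):

uint64_t hash = 0xdeadbeefcafef00dULL;
/* hash >> (64 - 30) keeps the top 30 bits: */
unsigned lock = TDB_HASH_LOCK_START + (unsigned)(hash >> 34);
/* hash >> 34 == 0x37ab6fbb, so this key maps to lock offset
 * TDB_HASH_LOCK_START + 0x37ab6fbb; all such values fall inside
 * TDB_HASH_LOCK_RANGE == 1 << 30. */
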
-int tdb_unlock_list(struct tdb_context *tdb, tdb_off_t list, int ltype)
+int tdb_unlock_hashes(struct tdb_context *tdb,
+                     tdb_off_t hash_lock,
+                     tdb_len_t hash_range, int ltype)
 {
-       list &= ((1ULL << tdb->header.v.hash_bits) - 1);
+       unsigned lock = TDB_HASH_LOCK_START
+               + (hash_lock >> (64 - TDB_HASH_LOCK_RANGE_BITS));
 
        /* an allrecord lock allows us to avoid per-chain locks */
        if (tdb->allrecord_lock.count) {
@@ -673,26 +701,30 @@ int tdb_unlock_list(struct tdb_context *tdb, tdb_off_t list, int ltype)
                    && ltype == F_WRLCK) {
                        tdb->ecode = TDB_ERR_LOCK;
                        tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
-                                "tdb_unlock_list RO allrecord!\n");
+                                "tdb_unlock_hashes RO allrecord!\n");
                        return -1;
                }
                return 0;
-       } else {
-               return tdb_nest_unlock(tdb, TDB_HASH_LOCK_START + list, ltype);
        }
+
+       return tdb_nest_unlock(tdb, lock, ltype);
 }
 
-/* Free list locks come after hash locks */
-int tdb_lock_free_list(struct tdb_context *tdb, tdb_off_t flist,
-                      enum tdb_lock_flags waitflag)
+/* Hash locks use TDB_HASH_LOCK_START plus the next 30 bits of offset
+ * space; free-bucket locks begin after that.  Bucket offsets are
+ * sizeof(tdb_off_t) apart, so we divide.  The result is that on 32 bit
+ * systems we don't use lock values > 2^31 on files that are less than 4GB.
+ */
+static tdb_off_t free_lock_off(tdb_off_t b_off)
 {
-       /* You're supposed to have a hash lock first! */
-       if (!(tdb->flags & TDB_NOLOCK) && !tdb_has_locks(tdb)) {
-               tdb->ecode = TDB_ERR_LOCK;
-               tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
-                        "tdb_lock_free_list without lock!\n");
-               return -1;
-       }
+       return TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE
+               + b_off / sizeof(tdb_off_t);
+}
+
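
A worked example of the mapping (offsets illustrative): a free bucket at
file offset 4096 locks at TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE + 512,
since sizeof(tdb_off_t) == 8.  Distinct buckets are at least 8 bytes apart,
so each gets a distinct lock offset:

assert(free_lock_off(4096)
       == TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE + 4096 / 8);
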
+int tdb_lock_free_bucket(struct tdb_context *tdb, tdb_off_t b_off,
+                        enum tdb_lock_flags waitflag)
+{
+       assert(b_off >= sizeof(struct tdb_header));
 
        /* an allrecord lock allows us to avoid per-chain locks */
        if (tdb->allrecord_lock.count) {
@@ -700,23 +732,28 @@ int tdb_lock_free_list(struct tdb_context *tdb, tdb_off_t flist,
                        return 0;
                tdb->ecode = TDB_ERR_LOCK;
                tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
-                        "tdb_lock_free_list with RO allrecordlock!\n");
+                        "tdb_lock_free_bucket with RO allrecordlock!\n");
                return -1;
        }
 
-       return tdb_nest_lock(tdb, TDB_HASH_LOCK_START
-                            + (1ULL << tdb->header.v.hash_bits)
-                            + flist, F_WRLCK, waitflag);
+#if 0 /* FIXME */
+       if (tdb_has_expansion_lock(tdb)) {
+               tdb->ecode = TDB_ERR_LOCK;
+               tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
+                        "tdb_lock_free_bucket: have expansion lock already\n");
+               return -1;
+       }
+#endif
+
+       return tdb_nest_lock(tdb, free_lock_off(b_off), F_WRLCK, waitflag);
 }
 
-void tdb_unlock_free_list(struct tdb_context *tdb, tdb_off_t flist)
+void tdb_unlock_free_bucket(struct tdb_context *tdb, tdb_off_t b_off)
 {
        if (tdb->allrecord_lock.count)
                return;
 
-       tdb_nest_unlock(tdb, TDB_HASH_LOCK_START
-                       + (1ULL << tdb->header.v.hash_bits)
-                       + flist, F_WRLCK);
+       tdb_nest_unlock(tdb, free_lock_off(b_off), F_WRLCK);
 }
 
 /* Even if the entry isn't in this hash bucket, you'd have to lock this
@@ -728,7 +765,7 @@ static int chainlock(struct tdb_context *tdb, const TDB_DATA *key,
        int ret;
        uint64_t h = tdb_hash(tdb, key->dptr, key->dsize);
 
-       ret = tdb_lock_list(tdb, h, ltype, waitflag) == TDB_OFF_ERR ? -1 : 0;
+       ret = tdb_lock_hashes(tdb, h, 1, ltype, waitflag);
        tdb_trace_1rec(tdb, func, *key);
        return ret;
 }
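
Typical caller pattern for the chainlock API built on this helper (a
sketch; error handling elided):

TDB_DATA key = { .dptr = (unsigned char *)"mykey", .dsize = 5 };

if (tdb_chainlock(tdb, key) == 0) {
        /* ... read-modify-write the record for this key ... */
        tdb_chainunlock(tdb, key);
}
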
@@ -744,7 +781,7 @@ int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
 {
        uint64_t h = tdb_hash(tdb, key.dptr, key.dsize);
        tdb_trace_1rec(tdb, "tdb_chainunlock", key);
-       return tdb_unlock_list(tdb, h, F_WRLCK);
+       return tdb_unlock_hashes(tdb, h, 1, F_WRLCK);
 }
 
 #if 0