*/
#include "private.h"
+#include <assert.h>
+#include <ccan/build_assert/build_assert.h>
static int fcntl_lock(struct tdb_context *tdb,
int rw, off_t off, off_t len, bool waitflag)
{
struct tdb_lock_type *new_lck;
- if (offset >= TDB_HASH_LOCK_START + (1ULL << tdb->header.v.hash_bits)
- + (tdb->header.v.num_zones * (tdb->header.v.free_buckets+1))) {
+ if (offset >= TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE + tdb->map_size / 8) {
tdb->ecode = TDB_ERR_LOCK;
tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
- "tdb_lock: invalid offset %llu for ltype=%d\n",
+ "tdb_nest_lock: invalid offset %llu ltype=%d\n",
(long long)offset, ltype);
return -1;
}
+
if (tdb->flags & TDB_NOLOCK)
return 0;
if (new_lck == NULL) {
tdb->ecode = TDB_ERR_OOM;
tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
- "tdb_lock: unable to allocate %llu lock structure",
+ "tdb_nest_lock: unable to allocate %llu lock struct",
(long long)(tdb->num_lockrecs + 1));
errno = ENOMEM;
return -1;
if ((lck == NULL) || (lck->count == 0)) {
tdb->ecode = TDB_ERR_LOCK;
tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
- "tdb_unlock: no lock for %llu\n", (long long)off);
+ "tdb_nest_unlock: no lock for %llu\n", (long long)off);
return -1;
}
*/
*lck = tdb->lockrecs[--tdb->num_lockrecs];
- if (tdb->num_lockrecs == 0) {
- /* If we're not holding any locks, header can change. */
- tdb->header_uptodate = false;
- }
-
return ret;
}
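The new bound check above encodes the flattened lock-space layout: hash locks occupy TDB_HASH_LOCK_START through TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE, followed by one free-bucket lock per 8 bytes of file (map_size / 8). A minimal sketch of that layout, assuming the patch's private.h definitions; classify_lock itself is hypothetical, not part of the patch:

enum lock_kind { LOCK_HASH, LOCK_FREE, LOCK_INVALID };

/* Hypothetical helper: classify an fcntl lock offset under the new
 * layout (assumes off >= TDB_HASH_LOCK_START). */
static enum lock_kind classify_lock(const struct tdb_context *tdb,
				    tdb_off_t off)
{
	if (off < TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE)
		return LOCK_HASH;
	/* One free-bucket lock per 8 bytes of file. */
	if (off < TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE
	    + tdb->map_size / 8)
		return LOCK_FREE;
	return LOCK_INVALID;
}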
int ret;
enum tdb_lock_flags nb_flags = (flags & ~TDB_LOCK_WAIT);
- if (len <= 4) {
- /* Single record. Just do blocking lock. */
+ if (len <= 1) {
+ /* A length of 0 would mean lock to end-of-file... */
+ assert(len != 0);
+ /* Single hash. Just do blocking lock. */
return tdb_brlock(tdb, ltype, off, len, flags);
}
}
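The recursive remainder of tdb_lock_gradual is elided from this hunk. A hedged reconstruction, consistent with the base case and the nb_flags setup above: try the whole range without waiting, and on contention split it in half and recurse with the caller's wait flag. lock_gradual_sketch is illustrative, not the patch's code:

/* Sketch: opportunistic whole-range lock, else divide and conquer. */
static int lock_gradual_sketch(struct tdb_context *tdb, int ltype,
			       enum tdb_lock_flags flags,
			       tdb_off_t off, tdb_off_t len)
{
	enum tdb_lock_flags nb_flags = (flags & ~TDB_LOCK_WAIT);

	if (len <= 1)
		return tdb_brlock(tdb, ltype, off, len, flags);

	/* Try the whole range without waiting first. */
	if (tdb_brlock(tdb, ltype, off, len, nb_flags) == 0)
		return 0;

	/* Contended: lock first half (waiting), then second half. */
	if (lock_gradual_sketch(tdb, ltype, flags, off, len / 2) == -1)
		return -1;
	if (lock_gradual_sketch(tdb, ltype, flags,
				off + len / 2, len - len / 2) == -1) {
		tdb_brunlock(tdb, ltype, off, len / 2);
		return -1;
	}
	return 0;
}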
/* lock/unlock entire database. It can only be upgradable if you have some
- * other way of guaranteeing exclusivity (ie. transaction write lock).
- * Note that we don't lock the free chains: noone can get those locks
- * without a hash chain lock first. */
+ * other way of guaranteeing exclusivity (ie. transaction write lock). */
int tdb_allrecord_lock(struct tdb_context *tdb, int ltype,
enum tdb_lock_flags flags, bool upgradable)
{
- tdb_off_t hash_size;
-
/* FIXME: There are no locks on read-only dbs */
if (tdb->read_only) {
tdb->ecode = TDB_ERR_LOCK;
return -1;
}
- /* Lock all the hash buckets. */
again:
- hash_size = (1ULL << tdb->header.v.hash_bits);
- if (tdb_lock_gradual(tdb, ltype, TDB_HASH_LOCK_START,
- 1ULL << tdb->header.v.hash_bits, flags)) {
+ /* Lock hashes, gradually. */
+ if (tdb_lock_gradual(tdb, ltype, flags, TDB_HASH_LOCK_START,
+ TDB_HASH_LOCK_RANGE)) {
if (!(flags & TDB_LOCK_PROBE)) {
tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
- "tdb_lockall hashes failed (%s)\n",
+ "tdb_allrecord_lock hashes failed (%s)\n",
strerror(errno));
}
return -1;
}
- /* Now we re-check header, holding lock. */
- if (unlikely(update_header(tdb))) {
- tdb_brunlock(tdb, ltype, TDB_HASH_LOCK_START, hash_size);
- goto again;
+ /* Lock free lists: from there to end of file. */
+ if (tdb_brlock(tdb, ltype, TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE,
+ 0, flags)) {
+ if (!(flags & TDB_LOCK_PROBE)) {
+ tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
+ "tdb_allrecord_lock freelist failed (%s)\n",
+ strerror(errno));
+ }
+ tdb_brunlock(tdb, ltype, TDB_HASH_LOCK_START,
+ TDB_HASH_LOCK_RANGE);
+ return -1;
}
+ tdb->allrecord_lock.count = 1;
+ /* If it's upgradable, it's actually exclusive so we can treat
+ * it as a write lock. */
+ tdb->allrecord_lock.ltype = upgradable ? F_WRLCK : ltype;
+ /* The off field doubles as the "upgradable" flag here. */
+ tdb->allrecord_lock.off = upgradable;
+
/* Now check for needing recovery. */
if (unlikely(tdb_needs_recovery(tdb))) {
- tdb_brunlock(tdb, ltype, TDB_HASH_LOCK_START, hash_size);
+ tdb_allrecord_unlock(tdb, ltype);
if (tdb_lock_and_recover(tdb) == -1) {
return -1;
}
goto again;
}
-
- tdb->allrecord_lock.count = 1;
- /* If it's upgradable, it's actually exclusive so we can treat
- * it as a write lock. */
- tdb->allrecord_lock.ltype = upgradable ? F_WRLCK : ltype;
- tdb->allrecord_lock.off = upgradable;
return 0;
}
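For context, a caller takes the allrecord lock around a whole-database operation. A hypothetical read-only scan (scan_whole_db and its body are illustrative):

/* Hypothetical usage sketch: freeze the whole database around a scan. */
static int scan_whole_db(struct tdb_context *tdb)
{
	if (tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, false) == -1)
		return -1;

	/* ... walk every hash bucket and free list safely ... */

	return tdb_allrecord_unlock(tdb, F_RDLCK);
}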
tdb_nest_unlock(tdb, TDB_OPEN_LOCK, F_WRLCK);
}
+int tdb_lock_expand(struct tdb_context *tdb, int ltype)
+{
+ return tdb_nest_lock(tdb, TDB_EXPANSION_LOCK, ltype, TDB_LOCK_WAIT);
+}
+
+void tdb_unlock_expand(struct tdb_context *tdb, int ltype)
+{
+ tdb_nest_unlock(tdb, TDB_EXPANSION_LOCK, ltype);
+}
+
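These wrappers serialize growers of the file behind TDB_EXPANSION_LOCK. A hypothetical expansion path (grow_file and the elided body are illustrative):

/* Hypothetical sketch: serialize file growth behind the expansion lock. */
static int grow_file(struct tdb_context *tdb, tdb_len_t extra)
{
	if (tdb_lock_expand(tdb, F_WRLCK) == -1)
		return -1;

	/* ... extend the file by extra bytes, update tdb->map_size ... */

	tdb_unlock_expand(tdb, F_WRLCK);
	return 0;
}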
/* unlock entire db */
int tdb_allrecord_unlock(struct tdb_context *tdb, int ltype)
{
- tdb_off_t hash_size;
-
/* FIXME: There are no locks on read-only dbs */
if (tdb->read_only) {
tdb->ecode = TDB_ERR_LOCK;
tdb->allrecord_lock.count = 0;
tdb->allrecord_lock.ltype = 0;
- hash_size = (1ULL << tdb->header.v.hash_bits);
-
- return tdb_brunlock(tdb, ltype, TDB_HASH_LOCK_START, hash_size);
+ return tdb_brunlock(tdb, ltype, TDB_HASH_LOCK_START, 0);
+}
+
+bool tdb_has_expansion_lock(struct tdb_context *tdb)
+{
+ return find_nestlock(tdb, TDB_EXPANSION_LOCK) != NULL;
}
bool tdb_has_locks(struct tdb_context *tdb)
}
#endif
-int tdb_lock_list(struct tdb_context *tdb, tdb_off_t list,
- int ltype, enum tdb_lock_flags waitflag)
+static bool tdb_has_free_lock(struct tdb_context *tdb)
{
+ unsigned int i;
+
+ for (i=0; i<tdb->num_lockrecs; i++) {
+ if (tdb->lockrecs[i].off
+ > TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE)
+ return true;
+ }
+ return false;
+}
+
+int tdb_lock_hashes(struct tdb_context *tdb,
+ tdb_off_t hash_lock,
+ tdb_len_t hash_range,
+ int ltype, enum tdb_lock_flags waitflag)
+{
+ /* FIXME: Do this properly, using hlock_range */
+ unsigned lock = TDB_HASH_LOCK_START
+ + (hash_lock >> (64 - TDB_HASH_LOCK_RANGE_BITS));
+
/* An allrecord lock allows us to avoid per-chain locks. */
if (tdb->allrecord_lock.count &&
(ltype == tdb->allrecord_lock.ltype || ltype == F_RDLCK)) {
return 0;
}
if (tdb->allrecord_lock.count) {
tdb->ecode = TDB_ERR_LOCK;
tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
- "tdb_lock_list: have %s allrecordlock\n",
+ "tdb_lock_hashes: have %s allrecordlock\n",
tdb->allrecord_lock.ltype == F_RDLCK
? "read" : "write");
return -1;
}
- /* FIXME: Should we do header_uptodate and return retry here? */
- return tdb_nest_lock(tdb, TDB_HASH_LOCK_START + list, ltype, waitflag);
+ if (tdb_has_free_lock(tdb)) {
+ tdb->ecode = TDB_ERR_LOCK;
+ tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
+ "tdb_lock_hashes: have free lock already\n");
+ return -1;
+ }
+
+ if (tdb_has_expansion_lock(tdb)) {
+ tdb->ecode = TDB_ERR_LOCK;
+ tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
+ "tdb_lock_hashes: have expansion lock already\n");
+ return -1;
+ }
+
+ return tdb_nest_lock(tdb, lock, ltype, waitflag);
}
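The mapping above takes the top TDB_HASH_LOCK_RANGE_BITS bits of the 64-bit hash as the lock index. A worked example, assuming TDB_HASH_LOCK_RANGE_BITS == 30 (as the free-list comment later in this patch suggests):

/*
 * h    = 0xdeadbeefcafef00d
 * lock = TDB_HASH_LOCK_START + (h >> (64 - 30))
 *      = TDB_HASH_LOCK_START + 0x37ab6fbb
 *
 * All records whose hashes share their top 30 bits contend on the
 * same fcntl lock offset.
 */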
-int tdb_unlock_list(struct tdb_context *tdb, tdb_off_t list, int ltype)
+int tdb_unlock_hashes(struct tdb_context *tdb,
+ tdb_off_t hash_lock,
+ tdb_len_t hash_range, int ltype)
{
+ unsigned lock = TDB_HASH_LOCK_START
+ + (hash_lock >> (64 - TDB_HASH_LOCK_RANGE_BITS));
+
/* An allrecord lock allows us to avoid per-chain locks. */
if (tdb->allrecord_lock.count) {
if (tdb->allrecord_lock.ltype == F_RDLCK
&& ltype == F_WRLCK) {
tdb->ecode = TDB_ERR_LOCK;
tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
- "tdb_unlock_list RO allrecord!\n");
+ "tdb_unlock_hashes RO allrecord!\n");
return -1;
}
return 0;
- } else {
- return tdb_nest_unlock(tdb, TDB_HASH_LOCK_START + list, ltype);
}
+
+ return tdb_nest_unlock(tdb, lock, ltype);
}
-/* Free list locks come after hash locks */
-int tdb_lock_free_list(struct tdb_context *tdb, tdb_off_t flist,
- enum tdb_lock_flags waitflag)
+/* Hash locks use TDB_HASH_LOCK_START + the next 30 bits.
+ * Free-bucket locks begin after that; bucket offsets are
+ * sizeof(tdb_off_t) apart, so we divide to get consecutive lock numbers.
+ * The result is that on 32 bit systems we don't use lock values > 2^31 on
+ * files that are less than 4GB.
+ */
+static tdb_off_t free_lock_off(tdb_off_t b_off)
{
- /* You're supposed to have a hash lock first! */
- if (!tdb_has_locks(tdb)) {
- tdb->ecode = TDB_ERR_LOCK;
- tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
- "tdb_lock_free_list without lock!\n");
- return -1;
- }
+ return TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE
+ + b_off / sizeof(tdb_off_t);
+}
+
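A worked example of that division (illustrative numbers, 8-byte tdb_off_t): adjacent free buckets map to adjacent lock values, keeping the lock space dense:

/*
 * free_lock_off(64) = TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE + 8
 * free_lock_off(72) = TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE + 9
 *
 * A file under 4GB therefore needs at most 2^29 free-bucket locks
 * above TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE, which is why 32 bit
 * systems stay below lock value 2^31, as the comment above says.
 */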
+int tdb_lock_free_bucket(struct tdb_context *tdb, tdb_off_t b_off,
+ enum tdb_lock_flags waitflag)
+{
+ assert(b_off >= sizeof(struct tdb_header));
/* An allrecord lock allows us to avoid per-chain locks. */
if (tdb->allrecord_lock.count) {
if (tdb->allrecord_lock.ltype == F_WRLCK)
return 0;
tdb->ecode = TDB_ERR_LOCK;
tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
- "tdb_lock_free_list with RO allrecordlock!\n");
+ "tdb_lock_free_bucket with RO allrecordlock!\n");
return -1;
}
- return tdb_nest_lock(tdb, TDB_HASH_LOCK_START
- + (1ULL << tdb->header.v.hash_bits)
- + flist, F_WRLCK, waitflag);
+#if 0 /* FIXME */
+ if (tdb_has_expansion_lock(tdb)) {
+ tdb->ecode = TDB_ERR_LOCK;
+ tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
+ "tdb_lock_free_bucket: have expansion lock already\n");
+ return -1;
+ }
+#endif
+
+ return tdb_nest_lock(tdb, free_lock_off(b_off), F_WRLCK, waitflag);
}
-void tdb_unlock_free_list(struct tdb_context *tdb, tdb_off_t flist)
+void tdb_unlock_free_bucket(struct tdb_context *tdb, tdb_off_t b_off)
{
if (tdb->allrecord_lock.count)
return;
- tdb_nest_unlock(tdb, TDB_HASH_LOCK_START
- + (1ULL << tdb->header.v.hash_bits)
- + flist, F_WRLCK);
+ tdb_nest_unlock(tdb, free_lock_off(b_off), F_WRLCK);
}
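An allocator holds the bucket lock while unlinking from that free list. A hypothetical caller (pop_free_entry and its body are illustrative):

/* Hypothetical sketch: take the bucket lock around a free-list pop. */
static int pop_free_entry(struct tdb_context *tdb, tdb_off_t b_off)
{
	if (tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == -1)
		return -1;

	/* ... read the bucket head, unlink the first entry ... */

	tdb_unlock_free_bucket(tdb, b_off);
	return 0;
}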
-#if 0
-static int chainlock_loop(struct tdb_context *tdb, const TDB_DATA *key,
- int ltype, enum tdb_lock_flags waitflag,
- const char *func)
+/* Even if the entry isn't in this hash bucket, you'd have to lock this
+ * bucket to find it. */
+static int chainlock(struct tdb_context *tdb, const TDB_DATA *key,
+ int ltype, enum tdb_lock_flags waitflag,
+ const char *func)
{
int ret;
uint64_t h = tdb_hash(tdb, key->dptr, key->dsize);
-again:
- ret = tdb_lock_list(tdb,
- h & ((1ULL << tdb->header.v.hash_bits) - 1),
- ltype, waitflag);
- if (likely(ret == 0) && unlikely(update_header(tdb))) {
- tdb_unlock_list(tdb, h & ((1ULL << tdb->header.v.hash_bits)-1),
- ltype);
- goto again;
- }
-
+ ret = tdb_lock_hashes(tdb, h, 1, ltype, waitflag);
tdb_trace_1rec(tdb, func, *key);
return ret;
}
/* lock/unlock one hash chain. This is meant to be used to reduce
contention - it cannot guarantee how many records will be locked */
int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
{
- return chainlock_loop(tdb, &key, F_WRLCK, TDB_LOCK_WAIT,
- "tdb_chainlock");
+ return chainlock(tdb, &key, F_WRLCK, TDB_LOCK_WAIT, "tdb_chainlock");
}
+int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
+{
+ uint64_t h = tdb_hash(tdb, key.dptr, key.dsize);
+ tdb_trace_1rec(tdb, "tdb_chainunlock", key);
+ return tdb_unlock_hashes(tdb, h, 1, F_WRLCK);
+}
+
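Externally, these bracket a read-modify-write of a single key. A hypothetical caller (increment_counter and its body are illustrative):

/* Hypothetical usage: bracket a read-modify-write of one key. */
static int increment_counter(struct tdb_context *tdb, TDB_DATA key)
{
	int ret = -1;

	if (tdb_chainlock(tdb, key) == -1)
		return -1;

	/* ... fetch, modify and store the record under the chain lock ... */
	ret = 0;

	tdb_chainunlock(tdb, key);
	return ret;
}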
+#if 0
/* lock/unlock one hash chain, non-blocking. This is meant to be used
to reduce contention - it cannot guarantee how many records will be
locked */
int tdb_chainlock_nonblock(struct tdb_context *tdb, TDB_DATA key)
{
- return chainlock_loop(tdb, &key, F_WRLCK, TDB_LOCK_NOWAIT,
- "tdb_chainlock_nonblock");
-}
-
-int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
-{
- uint64_t h = tdb_hash(tdb, key.dptr, key.dsize);
- tdb_trace_1rec(tdb, "tdb_chainunlock", key);
- return tdb_unlock_list(tdb, h & ((1ULL << tdb->header.v.hash_bits)-1),
- F_WRLCK);
+ return chainlock(tdb, &key, F_WRLCK, TDB_LOCK_NOWAIT,
+ "tdb_chainlock_nonblock");
}
int tdb_chainlock_read(struct tdb_context *tdb, TDB_DATA key)
{
- return chainlock_loop(tdb, &key, F_RDLCK, TDB_LOCK_WAIT,
- "tdb_chainlock_read");
+ return chainlock(tdb, &key, F_RDLCK, TDB_LOCK_WAIT,
+ "tdb_chainlock_read");
}
int tdb_chainunlock_read(struct tdb_context *tdb, TDB_DATA key)
tdb->header_uptodate = false;
}
#endif
+
+void tdb_lock_init(struct tdb_context *tdb)
+{
+ tdb->num_lockrecs = 0;
+ tdb->lockrecs = NULL;
+ tdb->allrecord_lock.count = 0;
+}