*/
#include "private.h"
+#include <assert.h>
+#include <ccan/build_assert/build_assert.h>
static int fcntl_lock(struct tdb_context *tdb,
int rw, off_t off, off_t len, bool waitflag)
fl.l_len = len;
fl.l_pid = 0;
+ add_stat(tdb, lock_lowlevel, 1);
if (waitflag)
return fcntl(tdb->fd, F_SETLKW, &fl);
- else
+ else {
+ add_stat(tdb, lock_nonblock, 1);
return fcntl(tdb->fd, F_SETLK, &fl);
+ }
}
static int fcntl_unlock(struct tdb_context *tdb, int rw, off_t off, off_t len)
}
if (!found) {
- fprintf(stderr, "Unlock on %u@%u not found!\n",
+ fprintf(stderr, "Unlock on %u@%u not found!",
(int)off, (int)len);
abort();
}
}
if (rw_type == F_WRLCK && tdb->read_only) {
- tdb->ecode = TDB_ERR_RDONLY;
+ tdb_logerr(tdb, TDB_ERR_RDONLY, TDB_DEBUG_WARNING,
+ "Write lock attempted on read-only database");
return -1;
}
/* A 32 bit system cannot open a 64-bit file, but it could have
* expanded since then: check here. */
if ((size_t)(offset + len) != offset + len) {
- tdb->ecode = TDB_ERR_IO;
- tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
- "tdb_brlock: lock on giant offset %llu\n",
+ tdb_logerr(tdb, TDB_ERR_IO, TDB_DEBUG_ERROR,
+ "tdb_brlock: lock on giant offset %llu",
(long long)(offset + len));
return -1;
}
* EAGAIN is an expected return from non-blocking
* locks. */
if (!(flags & TDB_LOCK_PROBE) && errno != EAGAIN) {
- tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
- "tdb_brlock failed (fd=%d) at"
- " offset %llu rw_type=%d flags=%d len=%llu\n",
- tdb->fd, (long long)offset, rw_type,
- flags, (long long)len);
+ tdb_logerr(tdb, TDB_ERR_LOCK, TDB_DEBUG_ERROR,
+ "tdb_brlock failed (fd=%d) at"
+ " offset %zu rw_type=%d flags=%d len=%zu:"
+ " %s",
+ tdb->fd, (size_t)offset, rw_type,
+ flags, (size_t)len, strerror(errno));
}
return -1;
}
} while (ret == -1 && errno == EINTR);
if (ret == -1) {
- tdb->log(tdb, TDB_DEBUG_TRACE, tdb->log_priv,
- "tdb_brunlock failed (fd=%d) at offset %llu"
- " rw_type=%d len=%llu\n",
- tdb->fd, (long long)offset, rw_type, (long long)len);
+ tdb_logerr(tdb, TDB_ERR_LOCK, TDB_DEBUG_TRACE,
+ "tdb_brunlock failed (fd=%d) at offset %zu"
+ " rw_type=%d len=%zu",
+ tdb->fd, (size_t)offset, rw_type, (size_t)len);
}
return ret;
}
-#if 0
/*
upgrade a read lock to a write lock. This needs to be handled in a
special way as some OSes (such as solaris) have too conservative
int count = 1000;
if (tdb->allrecord_lock.count != 1) {
- tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
- "tdb_allrecord_upgrade failed: count %u too high\n",
- tdb->allrecord_lock.count);
+ tdb_logerr(tdb, TDB_ERR_LOCK, TDB_DEBUG_ERROR,
+ "tdb_allrecord_upgrade failed: count %u too high",
+ tdb->allrecord_lock.count);
return -1;
}
if (tdb->allrecord_lock.off != 1) {
- tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
- "tdb_allrecord_upgrade failed: already upgraded?\n");
+ tdb_logerr(tdb, TDB_ERR_LOCK, TDB_DEBUG_ERROR,
+ "tdb_allrecord_upgrade failed: already upgraded?");
return -1;
}
while (count--) {
struct timeval tv;
if (tdb_brlock(tdb, F_WRLCK,
- TDB_HASH_LOCK_START
- + (1ULL << tdb->header.v.hash_bits), 0,
+ TDB_HASH_LOCK_START, 0,
TDB_LOCK_WAIT|TDB_LOCK_PROBE) == 0) {
tdb->allrecord_lock.ltype = F_WRLCK;
tdb->allrecord_lock.off = 0;
tv.tv_usec = 1;
select(0, NULL, NULL, NULL, &tv);
}
- tdb->log(tdb, TDB_DEBUG_WARNING, tdb->log_priv,
- "tdb_allrecord_upgrade failed\n");
+ tdb_logerr(tdb, TDB_ERR_LOCK, TDB_DEBUG_WARNING,
+ "tdb_allrecord_upgrade failed");
return -1;
}
-#endif
static struct tdb_lock_type *find_nestlock(struct tdb_context *tdb,
tdb_off_t offset)
return NULL;
}
+int tdb_lock_and_recover(struct tdb_context *tdb)
+{
+ int ret;
+ /*
+ * Run tdb_transaction_recover() while holding the all-record write
+ * lock and the open lock, then drop both again. Returns -1 if either
+ * lock cannot be taken, otherwise whatever the recovery call returned.
+ *
+ * TDB_LOCK_NOCHECK is passed to both lock calls: without it the lock
+ * paths test tdb_needs_recovery() and may call back into this
+ * function.
+ */
+ if (tdb_allrecord_lock(tdb, F_WRLCK, TDB_LOCK_WAIT|TDB_LOCK_NOCHECK,
+ false) == -1) {
+ return -1;
+ }
+ /* If the open lock cannot be taken, undo the all-record lock first. */
+ if (tdb_lock_open(tdb, TDB_LOCK_WAIT|TDB_LOCK_NOCHECK) == -1) {
+ tdb_allrecord_unlock(tdb, F_WRLCK);
+ return -1;
+ }
+ ret = tdb_transaction_recover(tdb);
+ /* Release in reverse order of acquisition, whatever recovery returned. */
+ tdb_unlock_open(tdb);
+ tdb_allrecord_unlock(tdb, F_WRLCK);
+
+ return ret;
+}
+
/* lock an offset in the database. */
static int tdb_nest_lock(struct tdb_context *tdb, tdb_off_t offset, int ltype,
enum tdb_lock_flags flags)
{
struct tdb_lock_type *new_lck;
- /* Header is not valid for open lock; valgrind complains. */
- if (offset >= TDB_HASH_LOCK_START) {
- if (offset > TDB_HASH_LOCK_START
- + (1ULL << tdb->header.v.hash_bits)
- + (tdb->header.v.num_zones
- * (tdb->header.v.free_buckets+1))) {
- tdb->ecode = TDB_ERR_LOCK;
- tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
- "tdb_lock: invalid offset %llu ltype=%d\n",
- (long long)offset, ltype);
- return -1;
- }
+ if (offset > TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE + tdb->map_size / 8) {
+ tdb_logerr(tdb, TDB_ERR_LOCK, TDB_DEBUG_FATAL,
+ "tdb_nest_lock: invalid offset %zu ltype=%d",
+ (size_t)offset, ltype);
+ return -1;
}
+
if (tdb->flags & TDB_NOLOCK)
return 0;
+ add_stat(tdb, locks, 1);
+
new_lck = find_nestlock(tdb, offset);
if (new_lck) {
- /*
- * Just increment the in-memory struct, posix locks
- * don't stack.
- */
+ if (new_lck->ltype == F_RDLCK && ltype == F_WRLCK) {
+ tdb_logerr(tdb, TDB_ERR_LOCK, TDB_DEBUG_FATAL,
+ "tdb_nest_lock: offset %zu has read lock",
+ (size_t)offset);
+ return -1;
+ }
+ /* Just increment the struct, posix locks don't stack. */
new_lck->count++;
return 0;
}
+ if (tdb->num_lockrecs
+ && offset >= TDB_HASH_LOCK_START
+ && offset < TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE) {
+ tdb_logerr(tdb, TDB_ERR_LOCK, TDB_DEBUG_FATAL,
+ "tdb_nest_lock: already have a hash lock?");
+ return -1;
+ }
+
new_lck = (struct tdb_lock_type *)realloc(
tdb->lockrecs,
sizeof(*tdb->lockrecs) * (tdb->num_lockrecs+1));
if (new_lck == NULL) {
- tdb->ecode = TDB_ERR_OOM;
- tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
- "tdb_lock: unable to allocate %llu lock structure",
- (long long)(tdb->num_lockrecs + 1));
+ tdb_logerr(tdb, TDB_ERR_OOM, TDB_DEBUG_ERROR,
+ "tdb_nest_lock: unable to allocate %zu lock struct",
+ tdb->num_lockrecs + 1);
errno = ENOMEM;
return -1;
}
return -1;
}
+ /* First time we grab a lock, perhaps someone died in commit? */
+ if (!(flags & TDB_LOCK_NOCHECK)
+ && tdb->num_lockrecs == 0
+ && unlikely(tdb_needs_recovery(tdb))) {
+ tdb_brunlock(tdb, ltype, offset, 1);
+
+ if (tdb_lock_and_recover(tdb) == -1) {
+ return -1;
+ }
+
+ if (tdb_brlock(tdb, ltype, offset, 1, flags)) {
+ return -1;
+ }
+ }
+
tdb->lockrecs[tdb->num_lockrecs].off = offset;
tdb->lockrecs[tdb->num_lockrecs].count = 1;
tdb->lockrecs[tdb->num_lockrecs].ltype = ltype;
return 0;
}
-static int tdb_lock_and_recover(struct tdb_context *tdb)
-{
-#if 0 /* FIXME */
-
- int ret;
-
- /* We need to match locking order in transaction commit. */
- if (tdb_brlock(tdb, F_WRLCK, FREELIST_TOP, 0, TDB_LOCK_WAIT)) {
- return -1;
- }
-
- if (tdb_brlock(tdb, F_WRLCK, OPEN_LOCK, 1, TDB_LOCK_WAIT)) {
- tdb_brunlock(tdb, F_WRLCK, FREELIST_TOP, 0);
- return -1;
- }
-
- ret = tdb_transaction_recover(tdb);
-
- tdb_brunlock(tdb, F_WRLCK, OPEN_LOCK, 1);
- tdb_brunlock(tdb, F_WRLCK, FREELIST_TOP, 0);
-
- return ret;
-#else
- abort();
- return -1;
-#endif
-}
-
-static bool tdb_needs_recovery(struct tdb_context *tdb)
-{
- /* FIXME */
- return false;
-}
-
static int tdb_nest_unlock(struct tdb_context *tdb, tdb_off_t off, int ltype)
{
int ret = -1;
lck = find_nestlock(tdb, off);
if ((lck == NULL) || (lck->count == 0)) {
- tdb->ecode = TDB_ERR_LOCK;
- tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
- "tdb_unlock: no lock for %llu\n", (long long)off);
+ tdb_logerr(tdb, TDB_ERR_LOCK, TDB_DEBUG_ERROR,
+ "tdb_nest_unlock: no lock for %zu", (size_t)off);
return -1;
}
*/
*lck = tdb->lockrecs[--tdb->num_lockrecs];
- if (tdb->num_lockrecs == 0) {
- /* If we're not holding any locks, header can change. */
- tdb->header_uptodate = false;
- }
-
return ret;
}
-#if 0
/*
 Get the transaction lock: take the transaction lock offset as a nested
 lock of type ltype and return whatever tdb_nest_lock() returns.
*/
-int tdb_transaction_lock(struct tdb_context *tdb, int ltype,
- enum tdb_lock_flags lockflags)
+int tdb_transaction_lock(struct tdb_context *tdb, int ltype)
{
- return tdb_nest_lock(tdb, TRANSACTION_LOCK, ltype, lockflags);
+ return tdb_nest_lock(tdb, TDB_TRANSACTION_LOCK, ltype, TDB_LOCK_WAIT); /* always TDB_LOCK_WAIT: callers no longer pass flags */
}
/*
 Release the transaction lock: drop the nested lock of type ltype on the
 transaction lock offset and return whatever tdb_nest_unlock() returns.
*/
int tdb_transaction_unlock(struct tdb_context *tdb, int ltype)
{
- return tdb_nest_unlock(tdb, TRANSACTION_LOCK, ltype, false);
+ return tdb_nest_unlock(tdb, TDB_TRANSACTION_LOCK, ltype);
}
-#endif
/* We only need to lock individual bytes, but Linux merges consecutive locks
* so we lock in contiguous ranges. */
int ret;
enum tdb_lock_flags nb_flags = (flags & ~TDB_LOCK_WAIT);
- if (len <= 4) {
- /* Single record. Just do blocking lock. */
+ if (len <= 1) {
+ /* 0 would mean to end-of-file... */
+ assert(len != 0);
+ /* Single hash. Just do blocking lock. */
return tdb_brlock(tdb, ltype, off, len, flags);
}
}
/* lock/unlock entire database. It can only be upgradable if you have some
- * other way of guaranteeing exclusivity (ie. transaction write lock).
- * Note that we don't lock the free chains: noone can get those locks
- * without a hash chain lock first.
- * The header *will be* up to date once this returns success. */
+ * other way of guaranteeing exclusivity (i.e. a transaction write lock). */
int tdb_allrecord_lock(struct tdb_context *tdb, int ltype,
enum tdb_lock_flags flags, bool upgradable)
{
- tdb_off_t hash_size;
-
/* FIXME: There are no locks on read-only dbs */
if (tdb->read_only) {
- tdb->ecode = TDB_ERR_LOCK;
- tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
- "tdb_allrecord_lock: read-only\n");
+ tdb_logerr(tdb, TDB_ERR_LOCK, TDB_DEBUG_ERROR,
+ "tdb_allrecord_lock: read-only");
return -1;
}
- if (tdb->allrecord_lock.count && tdb->allrecord_lock.ltype == ltype) {
+ if (tdb->allrecord_lock.count
+ && (ltype == F_RDLCK || tdb->allrecord_lock.ltype == F_WRLCK)) {
tdb->allrecord_lock.count++;
return 0;
}
if (tdb->allrecord_lock.count) {
/* a global lock of a different type exists */
- tdb->ecode = TDB_ERR_LOCK;
- tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
- "tdb_allrecord_lock: already have %s lock\n",
- tdb->allrecord_lock.ltype == F_RDLCK
- ? "read" : "write");
+ tdb_logerr(tdb, TDB_ERR_LOCK, TDB_DEBUG_ERROR,
+ "tdb_allrecord_lock: already have %s lock",
+ tdb->allrecord_lock.ltype == F_RDLCK
+ ? "read" : "write");
return -1;
}
- if (tdb_has_locks(tdb)) {
+ if (tdb_has_hash_locks(tdb)) {
/* can't combine global and chain locks */
- tdb->ecode = TDB_ERR_LOCK;
- tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
- "tdb_allrecord_lock: already have chain lock\n");
+ tdb_logerr(tdb, TDB_ERR_LOCK, TDB_DEBUG_ERROR,
+ "tdb_allrecord_lock: already have chain lock");
return -1;
}
if (upgradable && ltype != F_RDLCK) {
/* tdb error: you can't upgrade a write lock! */
- tdb->ecode = TDB_ERR_LOCK;
- tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
- "tdb_allrecord_lock: can't upgrade a write lock\n");
+ tdb_logerr(tdb, TDB_ERR_LOCK, TDB_DEBUG_ERROR,
+ "tdb_allrecord_lock: can't upgrade a write lock");
return -1;
}
- /* Lock all the hash buckets. */
+ add_stat(tdb, locks, 1);
again:
- hash_size = (1ULL << tdb->header.v.hash_bits);
+ /* Lock hashes, gradually. */
if (tdb_lock_gradual(tdb, ltype, flags, TDB_HASH_LOCK_START,
- hash_size)) {
+ TDB_HASH_LOCK_RANGE)) {
+ if (!(flags & TDB_LOCK_PROBE)) {
+ tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_ERROR,
+ "tdb_allrecord_lock hashes failed");
+ }
+ return -1;
+ }
+
+ /* Lock free tables: there to end of file. */
+ if (tdb_brlock(tdb, ltype, TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE,
+ 0, flags)) {
if (!(flags & TDB_LOCK_PROBE)) {
- tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
- "tdb_lockall hashes failed (%s)\n",
- strerror(errno));
+ tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_ERROR,
+ "tdb_allrecord_lock freetables failed");
}
+ tdb_brunlock(tdb, ltype, TDB_HASH_LOCK_START,
+ TDB_HASH_LOCK_RANGE);
return -1;
}
tdb->allrecord_lock.ltype = upgradable ? F_WRLCK : ltype;
tdb->allrecord_lock.off = upgradable;
- /* Now we re-check header, holding lock. */
- if (unlikely(header_changed(tdb))) {
- tdb_allrecord_unlock(tdb, ltype);
- goto again;
- }
-
/* Now check for needing recovery. */
- if (unlikely(tdb_needs_recovery(tdb))) {
+ if (!(flags & TDB_LOCK_NOCHECK) && unlikely(tdb_needs_recovery(tdb))) {
tdb_allrecord_unlock(tdb, ltype);
if (tdb_lock_and_recover(tdb) == -1) {
return -1;
return 0;
}
/* Take TDB_OPEN_LOCK as a nested write lock (F_WRLCK) with the lock flags
 * supplied by the caller; returns whatever tdb_nest_lock() returns. */
-int tdb_lock_open(struct tdb_context *tdb)
+int tdb_lock_open(struct tdb_context *tdb, enum tdb_lock_flags flags)
{
- return tdb_nest_lock(tdb, TDB_OPEN_LOCK, F_WRLCK, TDB_LOCK_WAIT);
+ return tdb_nest_lock(tdb, TDB_OPEN_LOCK, F_WRLCK, flags);
}
void tdb_unlock_open(struct tdb_context *tdb)
tdb_nest_unlock(tdb, TDB_OPEN_LOCK, F_WRLCK);
}
-/* unlock entire db */
-int tdb_allrecord_unlock(struct tdb_context *tdb, int ltype)
/* True when find_nestlock() returns a lock record for TDB_OPEN_LOCK. */
+bool tdb_has_open_lock(struct tdb_context *tdb)
{
- tdb_off_t hash_size;
+ return find_nestlock(tdb, TDB_OPEN_LOCK) != NULL;
+}
- /* FIXME: There are no locks on read-only dbs */
- if (tdb->read_only) {
- tdb->ecode = TDB_ERR_LOCK;
- tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
- "tdb_allrecord_unlock: read-only\n");
- return -1;
- }
+int tdb_lock_expand(struct tdb_context *tdb, int ltype)
+{
+ /* Take TDB_EXPANSION_LOCK as a nested lock of type ltype, using
+ * TDB_LOCK_WAIT; returns whatever tdb_nest_lock() returns.
+ *
+ * Lock doesn't protect data, so don't check (we recurse if we do!):
+ * TDB_LOCK_NOCHECK makes tdb_nest_lock() skip its
+ * tdb_needs_recovery() test. */
+ return tdb_nest_lock(tdb, TDB_EXPANSION_LOCK, ltype,
+ TDB_LOCK_WAIT | TDB_LOCK_NOCHECK);
+}
+/* Drop the nested lock of type ltype on TDB_EXPANSION_LOCK. The return
+ * value of tdb_nest_unlock() is discarded. */
+void tdb_unlock_expand(struct tdb_context *tdb, int ltype)
+{
+ tdb_nest_unlock(tdb, TDB_EXPANSION_LOCK, ltype);
+}
+/* unlock entire db */
+int tdb_allrecord_unlock(struct tdb_context *tdb, int ltype)
+{
if (tdb->allrecord_lock.count == 0) {
- tdb->ecode = TDB_ERR_LOCK;
- tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
- "tdb_allrecord_unlock: not locked!\n");
+ tdb_logerr(tdb, TDB_ERR_LOCK, TDB_DEBUG_ERROR,
+ "tdb_allrecord_unlock: not locked!");
return -1;
}
/* Upgradable locks are marked as write locks. */
if (tdb->allrecord_lock.ltype != ltype
&& (!tdb->allrecord_lock.off || ltype != F_RDLCK)) {
- tdb->ecode = TDB_ERR_LOCK;
- tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
- "tdb_allrecord_unlock: have %s lock\n",
- tdb->allrecord_lock.ltype == F_RDLCK
- ? "read" : "write");
+ tdb_logerr(tdb, TDB_ERR_LOCK, TDB_DEBUG_ERROR,
+ "tdb_allrecord_unlock: have %s lock",
+ tdb->allrecord_lock.ltype == F_RDLCK
+ ? "read" : "write");
return -1;
}
tdb->allrecord_lock.count = 0;
tdb->allrecord_lock.ltype = 0;
- tdb->header_uptodate = false;
-
- hash_size = (1ULL << tdb->header.v.hash_bits);
- return tdb_brunlock(tdb, ltype, TDB_HASH_LOCK_START, hash_size);
-}
-
-bool tdb_has_locks(struct tdb_context *tdb)
-{
- return tdb->allrecord_lock.count || tdb->num_lockrecs;
-}
-
-#if 0
-/* lock entire database with write lock */
-int tdb_lockall(struct tdb_context *tdb)
-{
- tdb_trace(tdb, "tdb_lockall");
- return tdb_allrecord_lock(tdb, F_WRLCK, TDB_LOCK_WAIT, false);
+ return tdb_brunlock(tdb, ltype, TDB_HASH_LOCK_START, 0);
}
-/* lock entire database with write lock - nonblocking varient */
-int tdb_lockall_nonblock(struct tdb_context *tdb)
/* True when find_nestlock() returns a lock record for TDB_EXPANSION_LOCK. */
+bool tdb_has_expansion_lock(struct tdb_context *tdb)
{
- int ret = tdb_allrecord_lock(tdb, F_WRLCK, TDB_LOCK_NOWAIT, false);
- tdb_trace_ret(tdb, "tdb_lockall_nonblock", ret);
- return ret;
+ return find_nestlock(tdb, TDB_EXPANSION_LOCK) != NULL;
}
-/* unlock entire database with write lock */
-int tdb_unlockall(struct tdb_context *tdb)
/* True when any entry in tdb->lockrecs has an offset inside the hash lock
 * range [TDB_HASH_LOCK_START, TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE). */
+bool tdb_has_hash_locks(struct tdb_context *tdb)
{
- tdb_trace(tdb, "tdb_unlockall");
- return tdb_allrecord_unlock(tdb, F_WRLCK);
-}
+ unsigned int i;
-/* lock entire database with read lock */
-int tdb_lockall_read(struct tdb_context *tdb)
-{
- tdb_trace(tdb, "tdb_lockall_read");
- return tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, false);
+ for (i=0; i<tdb->num_lockrecs; i++) {
+ if (tdb->lockrecs[i].off >= TDB_HASH_LOCK_START
+ && tdb->lockrecs[i].off < (TDB_HASH_LOCK_START
+ + TDB_HASH_LOCK_RANGE))
+ return true;
+ }
+ return false;
}
-/* lock entire database with read lock - nonblock varient */
-int tdb_lockall_read_nonblock(struct tdb_context *tdb)
/* True when any entry in tdb->lockrecs has an offset strictly above
 * TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE, which is where free_lock_off()
 * places free-bucket locks. */
+static bool tdb_has_free_lock(struct tdb_context *tdb)
{
- int ret = tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_NOWAIT, false);
- tdb_trace_ret(tdb, "tdb_lockall_read_nonblock", ret);
- return ret;
-}
+ unsigned int i;
-/* unlock entire database with read lock */
-int tdb_unlockall_read(struct tdb_context *tdb)
-{
- tdb_trace(tdb, "tdb_unlockall_read");
- return tdb_allrecord_unlock(tdb, F_RDLCK);
+ for (i=0; i<tdb->num_lockrecs; i++) {
+ if (tdb->lockrecs[i].off
+ > TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE)
+ return true;
+ }
+ return false;
}
-#endif
-/* Returns the list we actually locked. */
-tdb_off_t tdb_lock_list(struct tdb_context *tdb, uint64_t hash,
- int ltype, enum tdb_lock_flags waitflag)
/* Take the nested lock that guards the hash value hash_lock.
 *
 * Returns 0 without locking when an allrecord lock already covers the
 * request. Returns -1 (logging TDB_ERR_LOCK) when an incompatible allrecord
 * lock, a free-bucket lock or the expansion lock is held. Otherwise returns
 * whatever tdb_nest_lock() returns. hash_range is currently unused. */
+int tdb_lock_hashes(struct tdb_context *tdb,
+ tdb_off_t hash_lock,
+ tdb_len_t hash_range,
+ int ltype, enum tdb_lock_flags waitflag)
{
- tdb_off_t list = hash & ((1ULL << tdb->header.v.hash_bits) - 1);
- /* Header can change ONLY if we had no locks before. */
- bool can_change = tdb->num_lockrecs == 0;
+ /* Lock offset is TDB_HASH_LOCK_START plus hash_lock shifted right by
+ * (64 - TDB_HASH_LOCK_RANGE_BITS); assumes hash_lock is 64 bits wide
+ * -- TODO confirm.
+ * FIXME: Do this properly, using hlock_range */
+ unsigned lock = TDB_HASH_LOCK_START
+ + (hash_lock >> (64 - TDB_HASH_LOCK_RANGE_BITS));
/* An allrecord lock of the same type, or any allrecord lock when only a
 * read lock is wanted, lets us avoid per chain locks. */
if (tdb->allrecord_lock.count &&
(ltype == tdb->allrecord_lock.ltype || ltype == F_RDLCK)) {
- return list;
+ return 0;
}
if (tdb->allrecord_lock.count) {
- tdb->ecode = TDB_ERR_LOCK;
- tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
- "tdb_lock_list: have %s allrecordlock\n",
- tdb->allrecord_lock.ltype == F_RDLCK
- ? "read" : "write");
- return TDB_OFF_ERR;
+ tdb_logerr(tdb, TDB_ERR_LOCK, TDB_DEBUG_ERROR,
+ "tdb_lock_hashes: already have %s allrecordlock",
+ tdb->allrecord_lock.ltype == F_RDLCK
+ ? "read" : "write");
+ return -1;
}
-again:
- if (tdb_nest_lock(tdb, TDB_HASH_LOCK_START + list, ltype, waitflag))
- return TDB_OFF_ERR;
-
- if (can_change && unlikely(header_changed(tdb))) {
- tdb_off_t new = hash & ((1ULL << tdb->header.v.hash_bits) - 1);
- if (new != list) {
- tdb_nest_unlock(tdb, TDB_HASH_LOCK_START+list, ltype);
- list = new;
- goto again;
- }
+ if (tdb_has_free_lock(tdb)) { /* NOTE(review): presumably enforces hash-before-free lock ordering -- confirm */
+ tdb_logerr(tdb, TDB_ERR_LOCK, TDB_DEBUG_ERROR,
+ "tdb_lock_hashes: already have free lock");
+ return -1;
+ }
+ /* Likewise refuse while the expansion lock is held. */
+ if (tdb_has_expansion_lock(tdb)) {
+ tdb_logerr(tdb, TDB_ERR_LOCK, TDB_DEBUG_ERROR,
+ "tdb_lock_hashes: already have expansion lock");
+ return -1;
}
- return list;
+ /* No conflicting lock found: take the per-hash nested lock. */
+ return tdb_nest_lock(tdb, lock, ltype, waitflag);
}
-int tdb_unlock_list(struct tdb_context *tdb, tdb_off_t list, int ltype)
/* Release the nested lock that guards the hash value hash_lock.
 *
 * When an allrecord lock is held there is no per-hash lock to drop: returns
 * 0, or -1 (logging TDB_ERR_LOCK) if a write unlock is requested under a
 * read-only allrecord lock. Otherwise returns whatever tdb_nest_unlock()
 * returns. hash_range is currently unused. */
+int tdb_unlock_hashes(struct tdb_context *tdb,
+ tdb_off_t hash_lock,
+ tdb_len_t hash_range, int ltype)
{
- list &= ((1ULL << tdb->header.v.hash_bits) - 1);
+ unsigned lock = TDB_HASH_LOCK_START
+ + (hash_lock >> (64 - TDB_HASH_LOCK_RANGE_BITS)); /* same mapping as tdb_lock_hashes() */
/* An allrecord lock allows us to avoid per chain locks. */
if (tdb->allrecord_lock.count) {
if (tdb->allrecord_lock.ltype == F_RDLCK
&& ltype == F_WRLCK) {
- tdb->ecode = TDB_ERR_LOCK;
- tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
- "tdb_unlock_list RO allrecord!\n");
+ tdb_logerr(tdb, TDB_ERR_LOCK, TDB_DEBUG_FATAL,
+ "tdb_unlock_hashes RO allrecord!");
return -1;
}
return 0;
- } else {
- return tdb_nest_unlock(tdb, TDB_HASH_LOCK_START + list, ltype);
}
-}
-/* Free list locks come after hash locks */
-int tdb_lock_free_list(struct tdb_context *tdb, tdb_off_t flist,
- enum tdb_lock_flags waitflag)
-{
- /* You're supposed to have a hash lock first! */
- if (!(tdb->flags & TDB_NOLOCK) && !tdb_has_locks(tdb)) {
- tdb->ecode = TDB_ERR_LOCK;
- tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
- "tdb_lock_free_list without lock!\n");
- return -1;
- }
-
- /* a allrecord lock allows us to avoid per chain locks */
- if (tdb->allrecord_lock.count) {
- if (tdb->allrecord_lock.ltype == F_WRLCK)
- return 0;
- tdb->ecode = TDB_ERR_LOCK;
- tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
- "tdb_lock_free_list with RO allrecordlock!\n");
- return -1;
- }
-
- return tdb_nest_lock(tdb, TDB_HASH_LOCK_START
- + (1ULL << tdb->header.v.hash_bits)
- + flist, F_WRLCK, waitflag);
-}
-
-void tdb_unlock_free_list(struct tdb_context *tdb, tdb_off_t flist)
-{
- if (tdb->allrecord_lock.count)
- return;
-
- tdb_nest_unlock(tdb, TDB_HASH_LOCK_START
- + (1ULL << tdb->header.v.hash_bits)
- + flist, F_WRLCK);
-}
-
-/* Even if the entry isn't in this hash bucket, you'd have to lock this
- * bucket to find it. */
-static int chainlock(struct tdb_context *tdb, const TDB_DATA *key,
- int ltype, enum tdb_lock_flags waitflag,
- const char *func)
-{
- int ret;
- uint64_t h = tdb_hash(tdb, key->dptr, key->dsize);
-
- ret = tdb_lock_list(tdb, h, ltype, waitflag) == TDB_OFF_ERR ? -1 : 0;
- tdb_trace_1rec(tdb, func, *key);
- return ret;
+ return tdb_nest_unlock(tdb, lock, ltype);
}
-/* lock/unlock one hash chain. This is meant to be used to reduce
- contention - it cannot guarantee how many records will be locked */
-int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
-{
- return chainlock(tdb, &key, F_WRLCK, TDB_LOCK_WAIT, "tdb_chainlock");
-}
-
-int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
-{
- uint64_t h = tdb_hash(tdb, key.dptr, key.dsize);
- tdb_trace_1rec(tdb, "tdb_chainunlock", key);
- return tdb_unlock_list(tdb, h, F_WRLCK);
-}
-
-#if 0
-/* lock/unlock one hash chain, non-blocking. This is meant to be used
- to reduce contention - it cannot guarantee how many records will be
- locked */
-int tdb_chainlock_nonblock(struct tdb_context *tdb, TDB_DATA key)
-{
- return chainlock(tdb, &key, F_WRLCK, TDB_LOCK_NOWAIT,
- "tdb_chainlock_nonblock");
-}
-
-int tdb_chainlock_read(struct tdb_context *tdb, TDB_DATA key)
+/* Map the file offset of a free bucket (b_off) to the lock offset used
+ * for it.
+ *
+ * Hash locks use TDB_HASH_LOCK_START + the next 30 bits.
+ * Then we begin; bucket offsets are sizeof(tdb_len_t) apart, so we divide.
+ * NOTE(review): the code divides by sizeof(tdb_off_t), not
+ * sizeof(tdb_len_t); presumably the two are the same size -- confirm.
+ * The result is that on 32 bit systems we don't use lock values > 2^31 on
+ * files that are less than 4GB.
+ */
+static tdb_off_t free_lock_off(tdb_off_t b_off)
{
- return chainlock(tdb, &key, F_RDLCK, TDB_LOCK_WAIT,
- "tdb_chainlock_read");
+ return TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE
+ + b_off / sizeof(tdb_off_t);
}
-int tdb_chainunlock_read(struct tdb_context *tdb, TDB_DATA key)
/* Write-lock the free bucket stored at file offset b_off.
 *
 * Returns 0 without locking when an allrecord write lock is held, -1
 * (logging TDB_ERR_LOCK) when only a read allrecord lock is held, and
 * otherwise whatever tdb_nest_lock() returns for free_lock_off(b_off). */
+int tdb_lock_free_bucket(struct tdb_context *tdb, tdb_off_t b_off,
+ enum tdb_lock_flags waitflag)
{
- uint64_t h = tdb_hash(tdb, key.dptr, key.dsize);
- tdb_trace_1rec(tdb, "tdb_chainunlock_read", key);
- return tdb_unlock_list(tdb, h & ((1ULL << tdb->header.v.hash_bits)-1),
- F_RDLCK);
-}
+ assert(b_off >= sizeof(struct tdb_header)); /* a bucket offset must lie past the header */
-/* record lock stops delete underneath */
-int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off)
-{
- if (tdb->allrecord_lock.count) {
- return 0;
- }
- return off ? tdb_brlock(tdb, F_RDLCK, off, 1, TDB_LOCK_WAIT) : 0;
-}
-
-/*
- Write locks override our own fcntl readlocks, so check it here.
- Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
- an error to fail to get the lock here.
-*/
-int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off)
-{
- struct tdb_traverse_lock *i;
- for (i = &tdb->travlocks; i; i = i->next)
- if (i->off == off)
- return -1;
+ /* An allrecord write lock makes the per-bucket lock unnecessary; a
+ * read allrecord lock cannot be used to lock a bucket for writing. */
if (tdb->allrecord_lock.count) {
- if (tdb->allrecord_lock.ltype == F_WRLCK) {
+ if (tdb->allrecord_lock.ltype == F_WRLCK)
return 0;
- }
+ tdb_logerr(tdb, TDB_ERR_LOCK, TDB_DEBUG_FATAL,
+ "tdb_lock_free_bucket with RO allrecordlock!");
return -1;
}
- return tdb_brlock(tdb, F_WRLCK, off, 1, TDB_LOCK_NOWAIT|TDB_LOCK_PROBE);
-}
-
-int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off)
-{
- if (tdb->allrecord_lock.count) {
- return 0;
- }
- return tdb_brunlock(tdb, F_WRLCK, off, 1);
-}
-/* fcntl locks don't stack: avoid unlocking someone else's */
-int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off)
-{
- struct tdb_traverse_lock *i;
- uint32_t count = 0;
-
- if (tdb->allrecord_lock.count) {
- return 0;
+#if 0 /* FIXME: expansion-lock check is compiled out */
+ if (tdb_has_expansion_lock(tdb)) {
+ tdb_logerr(tdb, TDB_ERR_LOCK, TDB_DEBUG_ERROR,
+ "tdb_lock_free_bucket: already have expansion lock");
+ return -1;
}
+#endif
- if (off == 0)
- return 0;
- for (i = &tdb->travlocks; i; i = i->next)
- if (i->off == off)
- count++;
- return (count == 1 ? tdb_brunlock(tdb, F_RDLCK, off, 1) : 0);
+ return tdb_nest_lock(tdb, free_lock_off(b_off), F_WRLCK, waitflag);
}
-/* The transaction code uses this to remove all locks. */
-void tdb_release_transaction_locks(struct tdb_context *tdb)
/* Release the write lock on the free bucket stored at file offset b_off.
 * Does nothing while an allrecord lock is held; otherwise drops the nested
 * lock at free_lock_off(b_off), discarding tdb_nest_unlock()'s result. */
+void tdb_unlock_free_bucket(struct tdb_context *tdb, tdb_off_t b_off)
{
- unsigned int i;
-
- if (tdb->allrecord_lock.count != 0) {
- tdb_off_t hash_size, free_size;
-
- hash_size = (1ULL << tdb->header.v.hash_bits)
- * sizeof(tdb_off_t);
- free_size = tdb->header.v.free_zones
- * (tdb->header.v.free_buckets + 1) * sizeof(tdb_off_t);
-
- tdb_brunlock(tdb, tdb->allrecord_lock.ltype,
- tdb->header.v.hash_off, hash_size);
- tdb_brunlock(tdb, tdb->allrecord_lock.ltype,
- tdb->header.v.free_off, free_size);
- tdb->allrecord_lock.count = 0;
- tdb->allrecord_lock.ltype = 0;
- }
-
- for (i = 0; i<tdb->num_lockrecs; i++) {
- struct tdb_lock_type *lck = &tdb->lockrecs[i];
+ if (tdb->allrecord_lock.count)
+ return; /* tdb_lock_free_bucket() takes no per-bucket lock in this case */
- tdb_brunlock(tdb, lck->ltype, lck->off, 1);
- }
- tdb->num_lockrecs = 0;
- SAFE_FREE(tdb->lockrecs);
- tdb->header_uptodate = false;
+ tdb_nest_unlock(tdb, free_lock_off(b_off), F_WRLCK);
}
-#endif
void tdb_lock_init(struct tdb_context *tdb)
{