/* lock/unlock entire database. It can only be upgradable if you have some
* other way of guaranteeing exclusivity (ie. transaction write lock).
* Note that we don't lock the free chains: noone can get those locks
- * without a hash chain lock first. */
+ * without a hash chain lock first.
+ * The header *will be* up to date once this returns success. */
int tdb_allrecord_lock(struct tdb_context *tdb, int ltype,
enum tdb_lock_flags flags, bool upgradable)
{
return -1;
}
+ tdb->allrecord_lock.count = 1;
+ /* If it's upgradable, it's actually exclusive so we can treat
+ * it as a write lock. */
+ tdb->allrecord_lock.ltype = upgradable ? F_WRLCK : ltype;
+ tdb->allrecord_lock.off = upgradable;
+
/* Now we re-check header, holding lock. */
- if (unlikely(update_header(tdb))) {
- tdb_brunlock(tdb, ltype, TDB_HASH_LOCK_START, hash_size);
+ if (unlikely(header_changed(tdb))) {
+ tdb_allrecord_unlock(tdb, ltype);
goto again;
}
/* Now check for needing recovery. */
if (unlikely(tdb_needs_recovery(tdb))) {
- tdb_brunlock(tdb, ltype, TDB_HASH_LOCK_START, hash_size);
+ tdb_allrecord_unlock(tdb, ltype);
if (tdb_lock_and_recover(tdb) == -1) {
return -1;
}
goto again;
}
-
- tdb->allrecord_lock.count = 1;
- /* If it's upgradable, it's actually exclusive so we can treat
- * it as a write lock. */
- tdb->allrecord_lock.ltype = upgradable ? F_WRLCK : ltype;
- tdb->allrecord_lock.off = upgradable;
return 0;
}
tdb->allrecord_lock.count = 0;
tdb->allrecord_lock.ltype = 0;
+ tdb->header_uptodate = false;
hash_size = (1ULL << tdb->header.v.hash_bits);
}
#endif
-int tdb_lock_list(struct tdb_context *tdb, tdb_off_t list,
- int ltype, enum tdb_lock_flags waitflag)
+/* Returns the list we actually locked. */
+tdb_off_t tdb_lock_list(struct tdb_context *tdb, uint64_t hash,
+ int ltype, enum tdb_lock_flags waitflag)
{
+ tdb_off_t list = hash & ((1ULL << tdb->header.v.hash_bits) - 1);
+ /* Header can change ONLY if we had no locks before. */
+ bool can_change = tdb->num_lockrecs == 0;
+
/* a allrecord lock allows us to avoid per chain locks */
if (tdb->allrecord_lock.count &&
(ltype == tdb->allrecord_lock.ltype || ltype == F_RDLCK)) {
- return 0;
+ return list;
}
if (tdb->allrecord_lock.count) {
"tdb_lock_list: have %s allrecordlock\n",
tdb->allrecord_lock.ltype == F_RDLCK
? "read" : "write");
- return -1;
+ return TDB_OFF_ERR;
}
- /* FIXME: Should we do header_uptodate and return retry here? */
- return tdb_nest_lock(tdb, TDB_HASH_LOCK_START + list, ltype, waitflag);
+again:
+ if (tdb_nest_lock(tdb, TDB_HASH_LOCK_START + list, ltype, waitflag))
+ return TDB_OFF_ERR;
+
+ if (can_change && unlikely(header_changed(tdb))) {
+ tdb_off_t new = hash & ((1ULL << tdb->header.v.hash_bits) - 1);
+ if (new != list) {
+ tdb_nest_unlock(tdb, TDB_HASH_LOCK_START+list, ltype);
+ list = new;
+ goto again;
+ }
+ }
+ return list;
}
int tdb_unlock_list(struct tdb_context *tdb, tdb_off_t list, int ltype)
{
+ list &= ((1ULL << tdb->header.v.hash_bits) - 1);
+
/* a allrecord lock allows us to avoid per chain locks */
if (tdb->allrecord_lock.count) {
if (tdb->allrecord_lock.ltype == F_RDLCK
enum tdb_lock_flags waitflag)
{
/* You're supposed to have a hash lock first! */
- if (!tdb_has_locks(tdb)) {
+ if (!(tdb->flags & TDB_NOLOCK) && !tdb_has_locks(tdb)) {
tdb->ecode = TDB_ERR_LOCK;
tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
"tdb_lock_free_list without lock!\n");
}
#if 0
-static int chainlock_loop(struct tdb_context *tdb, const TDB_DATA *key,
- int ltype, enum tdb_lock_flags waitflag,
- const char *func)
+static int chainlock(struct tdb_context *tdb, const TDB_DATA *key,
+ int ltype, enum tdb_lock_flags waitflag,
+ const char *func)
{
int ret;
uint64_t h = tdb_hash(tdb, key->dptr, key->dsize);
-again:
- ret = tdb_lock_list(tdb,
- h & ((1ULL << tdb->header.v.hash_bits) - 1),
- ltype, waitflag);
- if (likely(ret == 0) && unlikely(update_header(tdb))) {
- tdb_unlock_list(tdb, h & ((1ULL << tdb->header.v.hash_bits)-1),
- ltype);
- goto again;
- }
-
+ ret = tdb_lock_list(tdb, h, ltype, waitflag) == TDB_OFF_ERR ? -1 : 0;
tdb_trace_1rec(tdb, func, *key);
return ret;
}
contention - it cannot guarantee how many records will be locked */
int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
{
- return chainlock_loop(tdb, &key, F_WRLCK, TDB_LOCK_WAIT,
- "tdb_chainlock");
+ return chainlock(tdb, &key, F_WRLCK, TDB_LOCK_WAIT, "tdb_chainlock");
}
/* lock/unlock one hash chain, non-blocking. This is meant to be used
locked */
int tdb_chainlock_nonblock(struct tdb_context *tdb, TDB_DATA key)
{
- return chainlock_loop(tdb, &key, F_WRLCK, TDB_LOCK_NOWAIT,
- "tdb_chainlock_nonblock");
+ return chainlock(tdb, &key, F_WRLCK, TDB_LOCK_NOWAIT,
+ "tdb_chainlock_nonblock");
}
int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
int tdb_chainlock_read(struct tdb_context *tdb, TDB_DATA key)
{
- return chainlock_loop(tdb, &key, F_RDLCK, TDB_LOCK_WAIT,
- "tdb_chainlock_read");
+ return chainlock(tdb, &key, F_RDLCK, TDB_LOCK_WAIT,
+ "tdb_chainlock_read");
}
int tdb_chainunlock_read(struct tdb_context *tdb, TDB_DATA key)
tdb->header_uptodate = false;
}
#endif
+
+void tdb_lock_init(struct tdb_context *tdb)
+{
+ tdb->num_lockrecs = 0;
+ tdb->lockrecs = NULL;
+ tdb->allrecord_lock.count = 0;
+}