#include <assert.h>
#include <ccan/build_assert/build_assert.h>
+/* If we were threaded, we could wait for unlock, but we're not, so fail. */
+static bool owner_conflict(struct tdb_context *tdb, struct tdb_lock *lock)
+{
+ if (lock->owner != tdb) {
+ tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
+ "Lock already owned by another opener");
+ return true;
+ }
+ return false;
+}
+
static int fcntl_lock(struct tdb_context *tdb,
int rw, off_t off, off_t len, bool waitflag)
{
"tdb_allrecord_upgrade failed");
}
-static struct tdb_lock *find_nestlock(struct tdb_context *tdb, tdb_off_t offset)
+static struct tdb_lock *find_nestlock(struct tdb_context *tdb, tdb_off_t offset,
+ const struct tdb_context *owner)
{
unsigned int i;
for (i=0; i<tdb->file->num_lockrecs; i++) {
if (tdb->file->lockrecs[i].off == offset) {
+ if (owner && tdb->file->lockrecs[i].owner != owner)
+ return NULL;
return &tdb->file->lockrecs[i];
}
}
add_stat(tdb, locks, 1);
- new_lck = find_nestlock(tdb, offset);
+ new_lck = find_nestlock(tdb, offset, NULL);
if (new_lck) {
+ if (owner_conflict(tdb, new_lck))
+ return -1;
+
if (new_lck->ltype == F_RDLCK && ltype == F_WRLCK) {
return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_ERROR,
"tdb_nest_lock:"
}
}
+ tdb->file->lockrecs[tdb->file->num_lockrecs].owner = tdb;
tdb->file->lockrecs[tdb->file->num_lockrecs].off = offset;
tdb->file->lockrecs[tdb->file->num_lockrecs].count = 1;
tdb->file->lockrecs[tdb->file->num_lockrecs].ltype = ltype;
if (tdb->flags & TDB_NOLOCK)
return TDB_SUCCESS;
- lck = find_nestlock(tdb, off);
+ lck = find_nestlock(tdb, off, tdb);
if ((lck == NULL) || (lck->count == 0)) {
return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_ERROR,
"tdb_nest_unlock: no lock for %zu",
return ecode;
}
+ tdb->file->allrecord_lock.owner = tdb;
tdb->file->allrecord_lock.count = 1;
/* If it's upgradable, it's actually exclusive so we can treat
* it as a write lock. */
bool tdb_has_open_lock(struct tdb_context *tdb)
{
return !(tdb->flags & TDB_NOLOCK)
- && find_nestlock(tdb, TDB_OPEN_LOCK) != NULL;
+ && find_nestlock(tdb, TDB_OPEN_LOCK, tdb) != NULL;
}
enum TDB_ERROR tdb_lock_expand(struct tdb_context *tdb, int ltype)
return;
}
+ if (tdb->file->allrecord_lock.owner != tdb) {
+ tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
+ "tdb_allrecord_unlock: not locked by us!");
+ return;
+ }
+
/* Upgradable locks are marked as write locks. */
if (tdb->file->allrecord_lock.ltype != ltype
&& (!tdb->file->allrecord_lock.off || ltype != F_RDLCK)) {
bool tdb_has_expansion_lock(struct tdb_context *tdb)
{
- return find_nestlock(tdb, TDB_EXPANSION_LOCK) != NULL;
+ return find_nestlock(tdb, TDB_EXPANSION_LOCK, tdb) != NULL;
}
bool tdb_has_hash_locks(struct tdb_context *tdb)
tdb_nest_unlock(tdb, free_lock_off(b_off), F_WRLCK);
}
+
+void tdb_unlock_all(struct tdb_context *tdb)
+{
+ unsigned int i;
+
+ while (tdb->file->allrecord_lock.count
+ && tdb->file->allrecord_lock.owner == tdb) {
+ tdb_allrecord_unlock(tdb, tdb->file->allrecord_lock.ltype);
+ }
+
+ for (i=0; i<tdb->file->num_lockrecs; i++) {
+ if (tdb->file->lockrecs[i].owner == tdb) {
+ tdb_nest_unlock(tdb,
+ tdb->file->lockrecs[i].off,
+ tdb->file->lockrecs[i].ltype);
+ i--;
+ }
+ }
+}