#include <assert.h>
#include <ccan/build_assert/build_assert.h>
+/* If we were threaded, we could wait for unlock, but we're not, so fail. */
+static enum TDB_ERROR owner_conflict(struct tdb_context *tdb, const char *call)
+{
+ return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
+ "%s: lock owned by another tdb in this process.",
+ call);
+}
+
static int fcntl_lock(struct tdb_context *tdb,
int rw, off_t off, off_t len, bool waitflag)
{
" already upgraded?");
}
+ if (tdb->file->allrecord_lock.owner != tdb) {
+ return owner_conflict(tdb, "tdb_allrecord_upgrade");
+ }
+
while (count--) {
struct timeval tv;
if (tdb_brlock(tdb, F_WRLCK,
"tdb_allrecord_upgrade failed");
}
-static struct tdb_lock_type *find_nestlock(struct tdb_context *tdb,
- tdb_off_t offset)
+static struct tdb_lock *find_nestlock(struct tdb_context *tdb, tdb_off_t offset,
+ const struct tdb_context *owner)
{
unsigned int i;
for (i=0; i<tdb->file->num_lockrecs; i++) {
if (tdb->file->lockrecs[i].off == offset) {
+ if (owner && tdb->file->lockrecs[i].owner != owner)
+ return NULL;
return &tdb->file->lockrecs[i];
}
}
tdb_off_t offset, int ltype,
enum tdb_lock_flags flags)
{
- struct tdb_lock_type *new_lck;
+ struct tdb_lock *new_lck;
enum TDB_ERROR ecode;
if (offset > (TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE
add_stat(tdb, locks, 1);
- new_lck = find_nestlock(tdb, offset);
+ new_lck = find_nestlock(tdb, offset, NULL);
if (new_lck) {
+ if (new_lck->owner != tdb) {
+ return owner_conflict(tdb, "tdb_nest_lock");
+ }
+
if (new_lck->ltype == F_RDLCK && ltype == F_WRLCK) {
return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_ERROR,
"tdb_nest_lock:"
"tdb_nest_lock: already have a hash lock?");
}
- new_lck = (struct tdb_lock_type *)realloc(
+ new_lck = (struct tdb_lock *)realloc(
tdb->file->lockrecs,
sizeof(*tdb->file->lockrecs) * (tdb->file->num_lockrecs+1));
if (new_lck == NULL) {
}
}
+ tdb->file->lockrecs[tdb->file->num_lockrecs].owner = tdb;
tdb->file->lockrecs[tdb->file->num_lockrecs].off = offset;
tdb->file->lockrecs[tdb->file->num_lockrecs].count = 1;
tdb->file->lockrecs[tdb->file->num_lockrecs].ltype = ltype;
static enum TDB_ERROR tdb_nest_unlock(struct tdb_context *tdb,
tdb_off_t off, int ltype)
{
- struct tdb_lock_type *lck;
+ struct tdb_lock *lck;
enum TDB_ERROR ecode;
if (tdb->flags & TDB_NOLOCK)
return TDB_SUCCESS;
- lck = find_nestlock(tdb, off);
+ lck = find_nestlock(tdb, off, tdb);
if ((lck == NULL) || (lck->count == 0)) {
return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_ERROR,
"tdb_nest_unlock: no lock for %zu",
enum TDB_ERROR ecode;
tdb_bool_err berr;
- if (tdb->file->allrecord_lock.count
- && (ltype == F_RDLCK
- || tdb->file->allrecord_lock.ltype == F_WRLCK)) {
- tdb->file->allrecord_lock.count++;
- return TDB_SUCCESS;
- }
-
if (tdb->file->allrecord_lock.count) {
+ if (tdb->file->allrecord_lock.owner != tdb) {
+ return owner_conflict(tdb, "tdb_allrecord_lock");
+ }
+
+ if (ltype == F_RDLCK
+ || tdb->file->allrecord_lock.ltype == F_WRLCK) {
+ tdb->file->allrecord_lock.count++;
+ return TDB_SUCCESS;
+ }
+
/* a global lock of a different type exists */
return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
"tdb_allrecord_lock: already have %s lock",
return ecode;
}
+ tdb->file->allrecord_lock.owner = tdb;
tdb->file->allrecord_lock.count = 1;
/* If it's upgradable, it's actually exclusive so we can treat
* it as a write lock. */
bool tdb_has_open_lock(struct tdb_context *tdb)
{
return !(tdb->flags & TDB_NOLOCK)
- && find_nestlock(tdb, TDB_OPEN_LOCK) != NULL;
+ && find_nestlock(tdb, TDB_OPEN_LOCK, tdb) != NULL;
}
enum TDB_ERROR tdb_lock_expand(struct tdb_context *tdb, int ltype)
return;
}
+ if (tdb->file->allrecord_lock.owner != tdb) {
+ tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
+ "tdb_allrecord_unlock: not locked by us!");
+ return;
+ }
+
/* Upgradable locks are marked as write locks. */
if (tdb->file->allrecord_lock.ltype != ltype
&& (!tdb->file->allrecord_lock.off || ltype != F_RDLCK)) {
bool tdb_has_expansion_lock(struct tdb_context *tdb)
{
- return find_nestlock(tdb, TDB_EXPANSION_LOCK) != NULL;
+ return find_nestlock(tdb, TDB_EXPANSION_LOCK, tdb) != NULL;
}
bool tdb_has_hash_locks(struct tdb_context *tdb)
+ (hash_lock >> (64 - TDB_HASH_LOCK_RANGE_BITS));
/* a allrecord lock allows us to avoid per chain locks */
- if (tdb->file->allrecord_lock.count &&
- (ltype == tdb->file->allrecord_lock.ltype || ltype == F_RDLCK)) {
- return TDB_SUCCESS;
- }
-
if (tdb->file->allrecord_lock.count) {
+ if (tdb->file->allrecord_lock.owner != tdb)
+ return owner_conflict(tdb, "tdb_lock_hashes");
+ if (ltype == tdb->file->allrecord_lock.ltype
+ || ltype == F_RDLCK) {
+ return TDB_SUCCESS;
+ }
+
return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
"tdb_lock_hashes:"
" already have %s allrecordlock",
tdb_nest_unlock(tdb, free_lock_off(b_off), F_WRLCK);
}
+
+void tdb_unlock_all(struct tdb_context *tdb)
+{
+ unsigned int i;
+
+ while (tdb->file->allrecord_lock.count
+ && tdb->file->allrecord_lock.owner == tdb) {
+ tdb_allrecord_unlock(tdb, tdb->file->allrecord_lock.ltype);
+ }
+
+ for (i=0; i<tdb->file->num_lockrecs; i++) {
+ if (tdb->file->lockrecs[i].owner == tdb) {
+ tdb_nest_unlock(tdb,
+ tdb->file->lockrecs[i].off,
+ tdb->file->lockrecs[i].ltype);
+ i--;
+ }
+ }
+}