/* List need not be locked. */
enum TDB_ERROR add_free_record(struct tdb_context *tdb,
- tdb_off_t off, tdb_len_t len_with_header)
+ tdb_off_t off, tdb_len_t len_with_header,
+ enum tdb_lock_flags waitflag)
{
tdb_off_t b_off;
tdb_len_t len;
len = len_with_header - sizeof(struct tdb_used_record);
b_off = bucket_off(tdb->ftable_off, size_to_bucket(len));
- ecode = tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT);
+ ecode = tdb_lock_free_bucket(tdb, b_off, waitflag);
if (ecode != TDB_SUCCESS) {
return ecode;
}
return off;
}
-/* Note: we unlock the current bucket if we coalesce (> 0) or fail (-ve). */
+/* Note: we unlock the current bucket if we fail (-ve), or if we coalesce and
+ * need to blatt either of the *protect records (which is then set to an error). */
static tdb_len_t coalesce(struct tdb_context *tdb,
tdb_off_t off, tdb_off_t b_off,
- tdb_len_t data_len)
+ tdb_len_t data_len,
+ tdb_off_t *protect1,
+ tdb_off_t *protect2)
{
tdb_off_t end;
struct tdb_free_record rec;
break;
}
+ /* Did we just mess up a record you were hoping to use? */
+ if (end == *protect1 || end == *protect2)
+ *protect1 = TDB_ERR_NOEXIST;
+
ecode = remove_from_list(tdb, nb_off, end, &rec);
check_list(tdb, nb_off);
if (ecode != TDB_SUCCESS) {
if (end == off + sizeof(struct tdb_used_record) + data_len)
return 0;
+ /* Before we expand, check whether this is one you wanted protected. */
+ if (off == *protect1 || off == *protect2)
+ *protect1 = TDB_ERR_EXISTS;
+
/* OK, expand initial record */
ecode = tdb_read_convert(tdb, off, &rec, sizeof(rec));
if (ecode != TDB_SUCCESS) {
goto err;
}
- /* We have to drop this to avoid deadlocks, so make sure record
- * doesn't get coalesced by someone else! */
- rec.ftable_and_len = (TDB_FTABLE_NONE << (64 - TDB_OFF_UPPER_STEAL))
- | (end - off - sizeof(struct tdb_used_record));
- ecode = tdb_write_off(tdb, off + offsetof(struct tdb_free_record,
- ftable_and_len),
- rec.ftable_and_len);
+ /* Try locking violation first... */
+ ecode = add_free_record(tdb, off, end - off, TDB_LOCK_NOWAIT);
if (ecode != TDB_SUCCESS) {
- goto err;
- }
+ /* Need to drop lock. Can't rely on anything stable. */
+ *protect1 = TDB_ERR_CORRUPT;
+
+ /* We have to drop this to avoid deadlocks, so make sure record
+ * doesn't get coalesced by someone else! */
+ rec.ftable_and_len = (TDB_FTABLE_NONE
+ << (64 - TDB_OFF_UPPER_STEAL))
+ | (end - off - sizeof(struct tdb_used_record));
+ ecode = tdb_write_off(tdb,
+ off + offsetof(struct tdb_free_record,
+ ftable_and_len),
+ rec.ftable_and_len);
+ if (ecode != TDB_SUCCESS) {
+ goto err;
+ }
- tdb->stats.alloc_coalesce_succeeded++;
- tdb_unlock_free_bucket(tdb, b_off);
+ tdb->stats.alloc_coalesce_succeeded++;
+ tdb_unlock_free_bucket(tdb, b_off);
- ecode = add_free_record(tdb, off, end - off);
- if (ecode != TDB_SUCCESS) {
- return ecode;
+ ecode = add_free_record(tdb, off, end - off, TDB_LOCK_WAIT);
+ if (ecode != TDB_SUCCESS) {
+ return ecode;
+ }
+ } else if (TDB_OFF_IS_ERR(*protect1)) {
+ /* For simplicity, we always drop lock if they can't continue */
+ tdb_unlock_free_bucket(tdb, b_off);
}
+
/* Return usable length. */
return end - off - sizeof(struct tdb_used_record);
tdb_off_t off, b_off,best_off;
struct tdb_free_record best = { 0 };
double multiplier;
+ bool coalesce_after_best = false; /* Damn GCC warning! */
size_t size = adjust_size(keylen, datalen);
enum TDB_ERROR ecode;
if (frec_len(r) >= size && frec_len(r) < frec_len(&best)) {
best_off = off;
best = *r;
+ coalesce_after_best = false;
}
if (frec_len(&best) <= size * multiplier && best_off) {
tdb_access_release(tdb, r);
/* Since we're going slow anyway, try coalescing here. */
- coal = coalesce(tdb, off, b_off, len);
+ coal = coalesce(tdb, off, b_off, len, &best_off, &next);
if (TDB_OFF_IS_ERR(coal)) {
/* This has already unlocked on error. */
return coal;
}
- if (coal > 0) {
+ if (TDB_OFF_IS_ERR(best_off)) {
/* This has unlocked list, restart. */
goto again;
}
+ if (coal > 0)
+ coalesce_after_best = true;
off = next;
}
struct tdb_used_record rec;
size_t leftover;
+ /* If we coalesced, we might have changed prev/next ptrs. */
+ if (coalesce_after_best) {
+ ecode = tdb_read_convert(tdb, best_off, &best,
+ sizeof(best));
+ if (ecode != TDB_SUCCESS)
+ goto unlock_err;
+ }
+
/* We're happy with this size: take it. */
ecode = remove_from_list(tdb, b_off, best_off, &best);
check_list(tdb, b_off);
ecode = add_free_record(tdb,
best_off + sizeof(rec)
+ frec_len(&best) - leftover,
- leftover);
+ leftover, TDB_LOCK_WAIT);
if (ecode != TDB_SUCCESS) {
best_off = ecode;
}
tdb_unlock_expand(tdb, F_WRLCK);
tdb->stats.expands++;
- return add_free_record(tdb, old_size, wanted);
+ return add_free_record(tdb, old_size, wanted, TDB_LOCK_WAIT);
}
/* This won't fail: it will expand the database if it has to. */
/* Put this record in a free list. */
enum TDB_ERROR add_free_record(struct tdb_context *tdb,
- tdb_off_t off, tdb_len_t len_with_header);
+ tdb_off_t off, tdb_len_t len_with_header,
+ enum tdb_lock_flags waitflag);
/* Set up header for a used/ftable/htable/chain record. */
enum TDB_ERROR set_header(struct tdb_context *tdb,
tdb->stats.frees++;
ecode = add_free_record(tdb, old_off,
sizeof(struct tdb_used_record)
- + key.dsize + old_room);
+ + key.dsize + old_room,
+ TDB_LOCK_WAIT);
if (ecode == TDB_SUCCESS)
ecode = replace_in_hash(tdb, h, new_off);
} else {
sizeof(struct tdb_used_record)
+ rec_key_length(&rec)
+ rec_data_length(&rec)
- + rec_extra_padding(&rec));
+ + rec_extra_padding(&rec),
+ TDB_LOCK_WAIT);
if (tdb->flags & TDB_SEQNUM)
tdb_inc_seqnum(tdb);
{
tdb->ftable_off = freetable->base.off;
tdb->ftable = ftable;
- add_free_record(tdb, eoff, sizeof(struct tdb_used_record) + elen);
+ add_free_record(tdb, eoff, sizeof(struct tdb_used_record) + elen,
+ TDB_LOCK_WAIT);
}
static tdb_off_t hbucket_off(tdb_off_t group_start, unsigned ingroup)
int main(int argc, char *argv[])
{
- tdb_off_t b_off;
+ tdb_off_t b_off, test;
struct tdb_context *tdb;
struct tdb_layout *layout;
struct tdb_data data, key;
tdb_len_t len;
/* FIXME: Test TDB_CONVERT */
+ /* FIXME: Test lock order fail. */
- plan_tests(38);
+ plan_tests(42);
data = tdb_mkdata("world", 5);
key = tdb_mkdata("hello", 5);
b_off = bucket_off(tdb->ftable_off, size_to_bucket(len));
/* Lock and fail to coalesce. */
ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
- ok1(coalesce(tdb, layout->elem[1].base.off, b_off, len) == 0);
+ test = layout->elem[1].base.off;
+ ok1(coalesce(tdb, layout->elem[1].base.off, b_off, len, &test, &test)
+ == 0);
tdb_unlock_free_bucket(tdb, b_off);
ok1(free_record_length(tdb, layout->elem[1].base.off) == len);
+ ok1(test == layout->elem[1].base.off);
ok1(tdb_check(tdb, NULL, NULL) == 0);
tdb_close(tdb);
tdb_layout_free(layout);
b_off = bucket_off(tdb->ftable_off, size_to_bucket(1024));
/* Lock and fail to coalesce. */
ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
- ok1(coalesce(tdb, layout->elem[1].base.off, b_off, 1024) == 0);
+ test = layout->elem[1].base.off;
+ ok1(coalesce(tdb, layout->elem[1].base.off, b_off, 1024, &test, &test)
+ == 0);
tdb_unlock_free_bucket(tdb, b_off);
ok1(free_record_length(tdb, layout->elem[1].base.off) == 1024);
+ ok1(test == layout->elem[1].base.off);
ok1(tdb_check(tdb, NULL, NULL) == 0);
tdb_close(tdb);
tdb_layout_free(layout);
b_off = bucket_off(tdb->ftable_off, size_to_bucket(1024));
/* Lock and coalesce. */
ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
- ok1(coalesce(tdb, layout->elem[1].base.off, b_off, 1024)
+ test = layout->elem[2].base.off;
+ ok1(coalesce(tdb, layout->elem[1].base.off, b_off, 1024, &test, &test)
== 1024 + sizeof(struct tdb_used_record) + 2048);
+ /* Should tell us it's erased this one... */
+ ok1(test == TDB_ERR_NOEXIST);
ok1(tdb->file->allrecord_lock.count == 0 && tdb->file->num_lockrecs == 0);
ok1(free_record_length(tdb, layout->elem[1].base.off)
== 1024 + sizeof(struct tdb_used_record) + 2048);
b_off = bucket_off(tdb->ftable_off, size_to_bucket(1024));
/* Lock and coalesce. */
ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
- ok1(coalesce(tdb, layout->elem[1].base.off, b_off, 1024)
+ test = layout->elem[2].base.off;
+ ok1(coalesce(tdb, layout->elem[1].base.off, b_off, 1024, &test, &test)
== 1024 + sizeof(struct tdb_used_record) + 512);
ok1(tdb->file->allrecord_lock.count == 0 && tdb->file->num_lockrecs == 0);
ok1(free_record_length(tdb, layout->elem[1].base.off)
== 1024 + sizeof(struct tdb_used_record) + 512);
+ ok1(test == TDB_ERR_NOEXIST);
ok1(tdb_check(tdb, NULL, NULL) == 0);
tdb_close(tdb);
tdb_layout_free(layout);
b_off = bucket_off(tdb->ftable_off, size_to_bucket(1024));
/* Lock and coalesce. */
ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
- ok1(coalesce(tdb, layout->elem[1].base.off, b_off, 1024) ==
- 1024 + sizeof(struct tdb_used_record) + 512
+ test = layout->elem[2].base.off;
+ ok1(coalesce(tdb, layout->elem[1].base.off, b_off, 1024, &test, &test)
+ == 1024 + sizeof(struct tdb_used_record) + 512
+ sizeof(struct tdb_used_record) + 256);
ok1(tdb->file->allrecord_lock.count == 0
&& tdb->file->num_lockrecs == 0);
sizeof(struct tdb_used_record)
+ rec_key_length(&rec)
+ rec_data_length(&rec)
- + rec_extra_padding(&rec)) == 0);
+ + rec_extra_padding(&rec),
+ TDB_LOCK_NOWAIT) == 0);
ok1(tdb_unlock_hashes(tdb, h.hlock_start, h.hlock_range,
F_WRLCK) == 0);
ok1(tdb_check(tdb, NULL, NULL) == 0);
if (recovery_head != 0) {
tdb->stats.frees++;
ecode = add_free_record(tdb, recovery_head,
- sizeof(rec) + rec.max_len);
+ sizeof(rec) + rec.max_len,
+ TDB_LOCK_WAIT);
if (ecode != TDB_SUCCESS) {
return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
"tdb_recovery_allocate:"