We currently start walking the free list again when we coalesce any record;
this is overzealous, as we only care about the next record being blatted,
or the record we currently consider "best".
We can also opportunistically try to add the coalesced record into the
new free list: if it fails, we go back to the old "mark record,
unlock, re-lock" code.
Before:
$ time ./growtdb-bench 250000 10 > /dev/null && ls -l /tmp/growtdb.tdb && time ./tdbtorture -s 0 && ls -l torture.tdb && ./speed --transaction
2000000
real 1m0.243s
user 0m13.677s
sys 0m4.336s
-rw------- 1 rusty rusty
683302864 2011-04-27 21:03 /tmp/growtdb.tdb
testing with 3 processes, 5000 loops, seed=0
OK
real 1m24.074s
user 0m0.344s
sys 0m0.468s
-rw------- 1 rusty rusty 836040 2011-04-27 21:04 torture.tdb
Adding
2000000 records: 1015 ns (
110551992 bytes)
Finding
2000000 records: 641 ns (
110551992 bytes)
Missing
2000000 records: 445 ns (
110551992 bytes)
Traversing
2000000 records: 439 ns (
110551992 bytes)
Deleting
2000000 records: 807 ns (
199517112 bytes)
Re-adding
2000000 records: 851 ns (
199517112 bytes)
Appending
2000000 records: 1301 ns (
376542552 bytes)
Churning
2000000 records: 2423 ns (
553641304 bytes)
After:
$ time ./growtdb-bench 250000 10 > /dev/null && ls -l /tmp/growtdb.tdb && time ./tdbtorture -s 0 && ls -l torture.tdb && ./speed --transaction
2000000
real 0m57.403s
user 0m11.361s
sys 0m4.056s
-rw------- 1 rusty rusty
689536976 2011-04-27 21:10 /tmp/growtdb.tdb
testing with 3 processes, 5000 loops, seed=0
OK
real 1m24.901s
user 0m0.380s
sys 0m0.512s
-rw------- 1 rusty rusty 655368 2011-04-27 21:12 torture.tdb
Adding
2000000 records: 941 ns (
110551992 bytes)
Finding
2000000 records: 603 ns (
110551992 bytes)
Missing
2000000 records: 428 ns (
110551992 bytes)
Traversing
2000000 records: 416 ns (
110551992 bytes)
Deleting
2000000 records: 741 ns (
199517112 bytes)
Re-adding
2000000 records: 819 ns (
199517112 bytes)
Appending
2000000 records: 1228 ns (
376542552 bytes)
Churning
2000000 records: 2042 ns (
553641304 bytes)
/* List need not be locked. */
enum TDB_ERROR add_free_record(struct tdb_context *tdb,
/* List need not be locked. */
enum TDB_ERROR add_free_record(struct tdb_context *tdb,
- tdb_off_t off, tdb_len_t len_with_header)
+ tdb_off_t off, tdb_len_t len_with_header,
+ enum tdb_lock_flags waitflag)
{
tdb_off_t b_off;
tdb_len_t len;
{
tdb_off_t b_off;
tdb_len_t len;
len = len_with_header - sizeof(struct tdb_used_record);
b_off = bucket_off(tdb->ftable_off, size_to_bucket(len));
len = len_with_header - sizeof(struct tdb_used_record);
b_off = bucket_off(tdb->ftable_off, size_to_bucket(len));
- ecode = tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT);
+ ecode = tdb_lock_free_bucket(tdb, b_off, waitflag);
if (ecode != TDB_SUCCESS) {
return ecode;
}
if (ecode != TDB_SUCCESS) {
return ecode;
}
-/* Note: we unlock the current bucket if we coalesce (> 0) or fail (-ve). */
+/* Note: we unlock the current bucket if we fail (-ve), or coalesce (-ve) and
+ * need to blatt either of the *protect records (which is set to an error). */
static tdb_len_t coalesce(struct tdb_context *tdb,
tdb_off_t off, tdb_off_t b_off,
static tdb_len_t coalesce(struct tdb_context *tdb,
tdb_off_t off, tdb_off_t b_off,
+ tdb_len_t data_len,
+ tdb_off_t *protect1,
+ tdb_off_t *protect2)
{
tdb_off_t end;
struct tdb_free_record rec;
{
tdb_off_t end;
struct tdb_free_record rec;
+ /* Did we just mess up a record you were hoping to use? */
+ if (end == *protect1 || end == *protect2)
+ *protect1 = TDB_ERR_NOEXIST;
+
ecode = remove_from_list(tdb, nb_off, end, &rec);
check_list(tdb, nb_off);
if (ecode != TDB_SUCCESS) {
ecode = remove_from_list(tdb, nb_off, end, &rec);
check_list(tdb, nb_off);
if (ecode != TDB_SUCCESS) {
if (end == off + sizeof(struct tdb_used_record) + data_len)
return 0;
if (end == off + sizeof(struct tdb_used_record) + data_len)
return 0;
+ /* Before we expand, check this isn't one you wanted protected? */
+ if (off == *protect1 || off == *protect2)
+ *protect1 = TDB_ERR_EXISTS;
+
/* OK, expand initial record */
ecode = tdb_read_convert(tdb, off, &rec, sizeof(rec));
if (ecode != TDB_SUCCESS) {
/* OK, expand initial record */
ecode = tdb_read_convert(tdb, off, &rec, sizeof(rec));
if (ecode != TDB_SUCCESS) {
- /* We have to drop this to avoid deadlocks, so make sure record
- * doesn't get coalesced by someone else! */
- rec.ftable_and_len = (TDB_FTABLE_NONE << (64 - TDB_OFF_UPPER_STEAL))
- | (end - off - sizeof(struct tdb_used_record));
- ecode = tdb_write_off(tdb, off + offsetof(struct tdb_free_record,
- ftable_and_len),
- rec.ftable_and_len);
+ /* Try locking violation first... */
+ ecode = add_free_record(tdb, off, end - off, TDB_LOCK_NOWAIT);
if (ecode != TDB_SUCCESS) {
if (ecode != TDB_SUCCESS) {
+ /* Need to drop lock. Can't rely on anything stable. */
+ *protect1 = TDB_ERR_CORRUPT;
+
+ /* We have to drop this to avoid deadlocks, so make sure record
+ * doesn't get coalesced by someone else! */
+ rec.ftable_and_len = (TDB_FTABLE_NONE
+ << (64 - TDB_OFF_UPPER_STEAL))
+ | (end - off - sizeof(struct tdb_used_record));
+ ecode = tdb_write_off(tdb,
+ off + offsetof(struct tdb_free_record,
+ ftable_and_len),
+ rec.ftable_and_len);
+ if (ecode != TDB_SUCCESS) {
+ goto err;
+ }
- tdb->stats.alloc_coalesce_succeeded++;
- tdb_unlock_free_bucket(tdb, b_off);
+ tdb->stats.alloc_coalesce_succeeded++;
+ tdb_unlock_free_bucket(tdb, b_off);
- ecode = add_free_record(tdb, off, end - off);
- if (ecode != TDB_SUCCESS) {
- return ecode;
+ ecode = add_free_record(tdb, off, end - off, TDB_LOCK_WAIT);
+ if (ecode != TDB_SUCCESS) {
+ return ecode;
+ }
+ } else if (TDB_OFF_IS_ERR(*protect1)) {
+ /* For simplicity, we always drop the lock if the caller can't continue */
+ tdb_unlock_free_bucket(tdb, b_off);
/* Return usable length. */
return end - off - sizeof(struct tdb_used_record);
/* Return usable length. */
return end - off - sizeof(struct tdb_used_record);
tdb_off_t off, b_off,best_off;
struct tdb_free_record best = { 0 };
double multiplier;
tdb_off_t off, b_off,best_off;
struct tdb_free_record best = { 0 };
double multiplier;
+ bool coalesce_after_best = false; /* Damn GCC warning! */
size_t size = adjust_size(keylen, datalen);
enum TDB_ERROR ecode;
size_t size = adjust_size(keylen, datalen);
enum TDB_ERROR ecode;
if (frec_len(r) >= size && frec_len(r) < frec_len(&best)) {
best_off = off;
best = *r;
if (frec_len(r) >= size && frec_len(r) < frec_len(&best)) {
best_off = off;
best = *r;
+ coalesce_after_best = false;
}
if (frec_len(&best) <= size * multiplier && best_off) {
}
if (frec_len(&best) <= size * multiplier && best_off) {
tdb_access_release(tdb, r);
/* Since we're going slow anyway, try coalescing here. */
tdb_access_release(tdb, r);
/* Since we're going slow anyway, try coalescing here. */
- coal = coalesce(tdb, off, b_off, len);
+ coal = coalesce(tdb, off, b_off, len, &best_off, &next);
if (TDB_OFF_IS_ERR(coal)) {
/* This has already unlocked on error. */
return coal;
}
if (TDB_OFF_IS_ERR(coal)) {
/* This has already unlocked on error. */
return coal;
}
+ if (TDB_OFF_IS_ERR(best_off)) {
/* This has unlocked list, restart. */
goto again;
}
/* This has unlocked list, restart. */
goto again;
}
+ if (coal > 0)
+ coalesce_after_best = true;
struct tdb_used_record rec;
size_t leftover;
struct tdb_used_record rec;
size_t leftover;
+ /* If we coalesced, we might have changed prev/next ptrs. */
+ if (coalesce_after_best) {
+ ecode = tdb_read_convert(tdb, best_off, &best,
+ sizeof(best));
+ if (ecode != TDB_SUCCESS)
+ goto unlock_err;
+ }
+
/* We're happy with this size: take it. */
ecode = remove_from_list(tdb, b_off, best_off, &best);
check_list(tdb, b_off);
/* We're happy with this size: take it. */
ecode = remove_from_list(tdb, b_off, best_off, &best);
check_list(tdb, b_off);
ecode = add_free_record(tdb,
best_off + sizeof(rec)
+ frec_len(&best) - leftover,
ecode = add_free_record(tdb,
best_off + sizeof(rec)
+ frec_len(&best) - leftover,
+ leftover, TDB_LOCK_WAIT);
if (ecode != TDB_SUCCESS) {
best_off = ecode;
}
if (ecode != TDB_SUCCESS) {
best_off = ecode;
}
tdb_unlock_expand(tdb, F_WRLCK);
tdb->stats.expands++;
tdb_unlock_expand(tdb, F_WRLCK);
tdb->stats.expands++;
- return add_free_record(tdb, old_size, wanted);
+ return add_free_record(tdb, old_size, wanted, TDB_LOCK_WAIT);
}
/* This won't fail: it will expand the database if it has to. */
}
/* This won't fail: it will expand the database if it has to. */
/* Put this record in a free list. */
enum TDB_ERROR add_free_record(struct tdb_context *tdb,
/* Put this record in a free list. */
enum TDB_ERROR add_free_record(struct tdb_context *tdb,
- tdb_off_t off, tdb_len_t len_with_header);
+ tdb_off_t off, tdb_len_t len_with_header,
+ enum tdb_lock_flags waitflag);
/* Set up header for a used/ftable/htable/chain record. */
enum TDB_ERROR set_header(struct tdb_context *tdb,
/* Set up header for a used/ftable/htable/chain record. */
enum TDB_ERROR set_header(struct tdb_context *tdb,
tdb->stats.frees++;
ecode = add_free_record(tdb, old_off,
sizeof(struct tdb_used_record)
tdb->stats.frees++;
ecode = add_free_record(tdb, old_off,
sizeof(struct tdb_used_record)
- + key.dsize + old_room);
+ + key.dsize + old_room,
+ TDB_LOCK_WAIT);
if (ecode == TDB_SUCCESS)
ecode = replace_in_hash(tdb, h, new_off);
} else {
if (ecode == TDB_SUCCESS)
ecode = replace_in_hash(tdb, h, new_off);
} else {
sizeof(struct tdb_used_record)
+ rec_key_length(&rec)
+ rec_data_length(&rec)
sizeof(struct tdb_used_record)
+ rec_key_length(&rec)
+ rec_data_length(&rec)
- + rec_extra_padding(&rec));
+ + rec_extra_padding(&rec),
+ TDB_LOCK_WAIT);
if (tdb->flags & TDB_SEQNUM)
tdb_inc_seqnum(tdb);
if (tdb->flags & TDB_SEQNUM)
tdb_inc_seqnum(tdb);
{
tdb->ftable_off = freetable->base.off;
tdb->ftable = ftable;
{
tdb->ftable_off = freetable->base.off;
tdb->ftable = ftable;
- add_free_record(tdb, eoff, sizeof(struct tdb_used_record) + elen);
+ add_free_record(tdb, eoff, sizeof(struct tdb_used_record) + elen,
+ TDB_LOCK_WAIT);
}
static tdb_off_t hbucket_off(tdb_off_t group_start, unsigned ingroup)
}
static tdb_off_t hbucket_off(tdb_off_t group_start, unsigned ingroup)
int main(int argc, char *argv[])
{
int main(int argc, char *argv[])
{
struct tdb_context *tdb;
struct tdb_layout *layout;
struct tdb_data data, key;
tdb_len_t len;
/* FIXME: Test TDB_CONVERT */
struct tdb_context *tdb;
struct tdb_layout *layout;
struct tdb_data data, key;
tdb_len_t len;
/* FIXME: Test TDB_CONVERT */
+ /* FIXME: Test lock order fail. */
data = tdb_mkdata("world", 5);
key = tdb_mkdata("hello", 5);
data = tdb_mkdata("world", 5);
key = tdb_mkdata("hello", 5);
b_off = bucket_off(tdb->ftable_off, size_to_bucket(len));
/* Lock and fail to coalesce. */
ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
b_off = bucket_off(tdb->ftable_off, size_to_bucket(len));
/* Lock and fail to coalesce. */
ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
- ok1(coalesce(tdb, layout->elem[1].base.off, b_off, len) == 0);
+ test = layout->elem[1].base.off;
+ ok1(coalesce(tdb, layout->elem[1].base.off, b_off, len, &test, &test)
+ == 0);
tdb_unlock_free_bucket(tdb, b_off);
ok1(free_record_length(tdb, layout->elem[1].base.off) == len);
tdb_unlock_free_bucket(tdb, b_off);
ok1(free_record_length(tdb, layout->elem[1].base.off) == len);
+ ok1(test == layout->elem[1].base.off);
ok1(tdb_check(tdb, NULL, NULL) == 0);
tdb_close(tdb);
tdb_layout_free(layout);
ok1(tdb_check(tdb, NULL, NULL) == 0);
tdb_close(tdb);
tdb_layout_free(layout);
b_off = bucket_off(tdb->ftable_off, size_to_bucket(1024));
/* Lock and fail to coalesce. */
ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
b_off = bucket_off(tdb->ftable_off, size_to_bucket(1024));
/* Lock and fail to coalesce. */
ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
- ok1(coalesce(tdb, layout->elem[1].base.off, b_off, 1024) == 0);
+ test = layout->elem[1].base.off;
+ ok1(coalesce(tdb, layout->elem[1].base.off, b_off, 1024, &test, &test)
+ == 0);
tdb_unlock_free_bucket(tdb, b_off);
ok1(free_record_length(tdb, layout->elem[1].base.off) == 1024);
tdb_unlock_free_bucket(tdb, b_off);
ok1(free_record_length(tdb, layout->elem[1].base.off) == 1024);
+ ok1(test == layout->elem[1].base.off);
ok1(tdb_check(tdb, NULL, NULL) == 0);
tdb_close(tdb);
tdb_layout_free(layout);
ok1(tdb_check(tdb, NULL, NULL) == 0);
tdb_close(tdb);
tdb_layout_free(layout);
b_off = bucket_off(tdb->ftable_off, size_to_bucket(1024));
/* Lock and coalesce. */
ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
b_off = bucket_off(tdb->ftable_off, size_to_bucket(1024));
/* Lock and coalesce. */
ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
- ok1(coalesce(tdb, layout->elem[1].base.off, b_off, 1024)
+ test = layout->elem[2].base.off;
+ ok1(coalesce(tdb, layout->elem[1].base.off, b_off, 1024, &test, &test)
== 1024 + sizeof(struct tdb_used_record) + 2048);
== 1024 + sizeof(struct tdb_used_record) + 2048);
+ /* Should tell us it's erased this one... */
+ ok1(test == TDB_ERR_NOEXIST);
ok1(tdb->file->allrecord_lock.count == 0 && tdb->file->num_lockrecs == 0);
ok1(free_record_length(tdb, layout->elem[1].base.off)
== 1024 + sizeof(struct tdb_used_record) + 2048);
ok1(tdb->file->allrecord_lock.count == 0 && tdb->file->num_lockrecs == 0);
ok1(free_record_length(tdb, layout->elem[1].base.off)
== 1024 + sizeof(struct tdb_used_record) + 2048);
b_off = bucket_off(tdb->ftable_off, size_to_bucket(1024));
/* Lock and coalesce. */
ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
b_off = bucket_off(tdb->ftable_off, size_to_bucket(1024));
/* Lock and coalesce. */
ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
- ok1(coalesce(tdb, layout->elem[1].base.off, b_off, 1024)
+ test = layout->elem[2].base.off;
+ ok1(coalesce(tdb, layout->elem[1].base.off, b_off, 1024, &test, &test)
== 1024 + sizeof(struct tdb_used_record) + 512);
ok1(tdb->file->allrecord_lock.count == 0 && tdb->file->num_lockrecs == 0);
ok1(free_record_length(tdb, layout->elem[1].base.off)
== 1024 + sizeof(struct tdb_used_record) + 512);
== 1024 + sizeof(struct tdb_used_record) + 512);
ok1(tdb->file->allrecord_lock.count == 0 && tdb->file->num_lockrecs == 0);
ok1(free_record_length(tdb, layout->elem[1].base.off)
== 1024 + sizeof(struct tdb_used_record) + 512);
+ ok1(test == TDB_ERR_NOEXIST);
ok1(tdb_check(tdb, NULL, NULL) == 0);
tdb_close(tdb);
tdb_layout_free(layout);
ok1(tdb_check(tdb, NULL, NULL) == 0);
tdb_close(tdb);
tdb_layout_free(layout);
b_off = bucket_off(tdb->ftable_off, size_to_bucket(1024));
/* Lock and coalesce. */
ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
b_off = bucket_off(tdb->ftable_off, size_to_bucket(1024));
/* Lock and coalesce. */
ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
- ok1(coalesce(tdb, layout->elem[1].base.off, b_off, 1024) ==
- 1024 + sizeof(struct tdb_used_record) + 512
+ test = layout->elem[2].base.off;
+ ok1(coalesce(tdb, layout->elem[1].base.off, b_off, 1024, &test, &test)
+ == 1024 + sizeof(struct tdb_used_record) + 512
+ sizeof(struct tdb_used_record) + 256);
ok1(tdb->file->allrecord_lock.count == 0
&& tdb->file->num_lockrecs == 0);
+ sizeof(struct tdb_used_record) + 256);
ok1(tdb->file->allrecord_lock.count == 0
&& tdb->file->num_lockrecs == 0);
sizeof(struct tdb_used_record)
+ rec_key_length(&rec)
+ rec_data_length(&rec)
sizeof(struct tdb_used_record)
+ rec_key_length(&rec)
+ rec_data_length(&rec)
- + rec_extra_padding(&rec)) == 0);
+ + rec_extra_padding(&rec),
+ TDB_LOCK_NOWAIT) == 0);
ok1(tdb_unlock_hashes(tdb, h.hlock_start, h.hlock_range,
F_WRLCK) == 0);
ok1(tdb_check(tdb, NULL, NULL) == 0);
ok1(tdb_unlock_hashes(tdb, h.hlock_start, h.hlock_range,
F_WRLCK) == 0);
ok1(tdb_check(tdb, NULL, NULL) == 0);
if (recovery_head != 0) {
tdb->stats.frees++;
ecode = add_free_record(tdb, recovery_head,
if (recovery_head != 0) {
tdb->stats.frees++;
ecode = add_free_record(tdb, recovery_head,
- sizeof(rec) + rec.max_len);
+ sizeof(rec) + rec.max_len,
+ TDB_LOCK_WAIT);
if (ecode != TDB_SUCCESS) {
return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
"tdb_recovery_allocate:"
if (ecode != TDB_SUCCESS) {
return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
"tdb_recovery_allocate:"