int tdb_unlock_list(struct tdb_context *tdb, tdb_off_t list, int ltype)
{
+ list &= ((1ULL << tdb->header.v.hash_bits) - 1);
+
/* a allrecord lock allows us to avoid per chain locks */
if (tdb->allrecord_lock.count) {
if (tdb->allrecord_lock.ltype == F_RDLCK
tdb_io_init(tdb);
tdb_lock_init(tdb);
- /* FIXME */
- if (attr) {
- tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
- "tdb_open: attributes not yet supported\n");
- errno = EINVAL;
- goto fail;
+ while (attr) {
+ switch (attr->base.attr) {
+ case TDB_ATTRIBUTE_LOG:
+ tdb->log = attr->log.log_fn;
+ tdb->log_priv = attr->log.log_private;
+ break;
+ case TDB_ATTRIBUTE_HASH:
+ tdb->khash = attr->hash.hash_fn;
+ tdb->hash_priv = attr->hash.hash_private;
+ break;
+ default:
+ tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
+ "tdb_open: unknown attribute type %u\n",
+ attr->base.attr);
+ errno = EINVAL;
+ goto fail;
+ }
+ attr = attr->base.next;
}
if ((open_flags & O_ACCMODE) == O_WRONLY) {
uint64_t keylen;
const unsigned char *rkey;
+ list &= ((1ULL << tdb->header.v.hash_bits) - 1);
+
off = tdb_read_off(tdb, tdb->header.v.hash_off
+ list * sizeof(tdb_off_t));
if (off == 0 || off == TDB_OFF_ERR)
tdb_off_t i;
for (i = list; i < list + num; i++) {
- if (tdb_lock_list(tdb, i, ltype, TDB_LOCK_WAIT) != 0) {
+ if (tdb_lock_list(tdb, i, ltype, TDB_LOCK_WAIT)
+ == TDB_OFF_ERR) {
unlock_lists(tdb, list, i - list, ltype);
return -1;
}
+/* NOTE(review): diff hunk with elided context; 'len' is assigned by code
+ * not visible here (presumably a scan for the first zero chain starting
+ * at 'start') -- confirm against the full file before relying on this. */
static tdb_len_t relock_hash_to_zero(struct tdb_context *tdb,
tdb_off_t start, int ltype)
{
- tdb_len_t num, len, pre_locks;
+ tdb_len_t num, len;
again:
num = 1ULL << tdb->header.v.hash_bits;
+ /* Did the run of non-empty chains reach the end of the hash table? */
if (unlikely(len == num - start)) {
/* We hit the end of the hash range. Drop lock: we have
to lock start of hash first. */
+ tdb_len_t pre_locks;
+
tdb_unlock_list(tdb, start, ltype);
+
/* Grab something, so header is stable. */
if (tdb_lock_list(tdb, 0, ltype, TDB_LOCK_WAIT))
return TDB_OFF_ERR;
- len = tdb_find_zero_off(tdb, hash_off(tdb, 0), num);
- if (lock_lists(tdb, 1, len, ltype) == -1) {
+ pre_locks = tdb_find_zero_off(tdb, hash_off(tdb, 0), num);
+ /* We want to lock the zero entry as well. */
+ pre_locks++;
+ if (lock_lists(tdb, 1, pre_locks - 1, ltype) == -1) {
tdb_unlock_list(tdb, 0, ltype);
return TDB_OFF_ERR;
}
- pre_locks = len;
- len = num - start;
- } else {
- /* We already have lock on start. */
- start++;
- pre_locks = 0;
- }
- if (unlikely(lock_lists(tdb, start, len, ltype) == -1)) {
- if (pre_locks)
+
+ /* Now lock later ones. */
+ if (unlikely(lock_lists(tdb, start, len, ltype) == -1)) {
unlock_lists(tdb, 0, pre_locks, ltype);
- else
+ return TDB_OFF_ERR;
+ }
+ /* Total count now covers both the wrapped prefix and the tail. */
+ len += pre_locks;
+ } else {
+ /* We want to lock the zero entry as well. */
+ len++;
+ /* But we already have lock on start. */
+ if (unlikely(lock_lists(tdb, start+1, len-1, ltype) == -1)) {
tdb_unlock_list(tdb, start, ltype);
- return TDB_OFF_ERR;
+ return TDB_OFF_ERR;
+ }
}
/* Now, did we lose the race, and it's not zero any more? */
- if (unlikely(tdb_read_off(tdb, hash_off(tdb, pre_locks + len)) != 0)) {
- unlock_lists(tdb, 0, pre_locks, ltype);
+ if (unlikely(tdb_read_off(tdb, hash_off(tdb, start + len - 1)) != 0)) {
/* Leave the start locked, as expected. */
unlock_lists(tdb, start + 1, len - 1, ltype);
goto again;
}
+ /* Number of chains locked, including 'start' and the zero entry. */
- return pre_locks + len;
+ return len;
}
/* FIXME: modify, don't rewrite! */
for (i = start; i < start + num_locks; i++) {
off = entry_matches(tdb, i, h, &key, &rec);
/* Empty entry or we found it? */
- if (off == 0 || off != TDB_OFF_ERR) {
- old_bucket = i;
+ if (off == 0 || off != TDB_OFF_ERR)
break;
- }
}
if (i == start + num_locks)
off = 0;
+
+ /* Even if not found, this is where we put the new entry. */
+ old_bucket = i;
}
/* Now we have lock on this hash bucket. */
/* FIXME: by simple simulation, this approximated 60% full.
* Check in real case! */
- if (unlikely(num_locks > 4 * tdb->header.v.hash_bits - 31))
+ if (unlikely(num_locks > 4 * tdb->header.v.hash_bits - 30))
enlarge_hash(tdb);
return 0;
#include <stdlib.h>
#include <string.h>
#include <assert.h>
+#include "logging.h"
struct tdb_layout *new_tdb_layout(void)
{
mem = malloc(len);
/* Now populate our header, cribbing from a real TDB header. */
- tdb = tdb_open(NULL, TDB_INTERNAL, O_RDWR, 0, NULL);
+ tdb = tdb_open(NULL, TDB_INTERNAL, O_RDWR, 0, &tap_log_attr);
hdr = (void *)mem;
*hdr = tdb->header;
hdr->v.generation++;
unsigned tap_log_messages;
+/* Shared log attribute for the tests: passed to tdb_open() so every tdb
+ * log message is routed to tap_log_fn (which counts non-trace messages
+ * in tap_log_messages). */
+union tdb_attribute tap_log_attr = {
+	.log = { .base = { .attr = TDB_ATTRIBUTE_LOG },
+		 .log_fn = tap_log_fn }
+};
+
void tap_log_fn(struct tdb_context *tdb,
enum tdb_debug_level level, void *priv,
const char *fmt, ...)
abort();
diag("tdb log level %u: %s", level, p);
free(p);
- tap_log_messages++;
+ if (level != TDB_DEBUG_TRACE)
+ tap_log_messages++;
va_end(ap);
}
#include <stdbool.h>
#include <string.h>
-unsigned tap_log_messages;
+extern unsigned tap_log_messages;
+
+extern union tdb_attribute tap_log_attr;
void tap_log_fn(struct tdb_context *tdb,
enum tdb_debug_level level, void *priv,
tdb_layout_add_freetable(layout, 1, 16, 12, 0);
tdb_layout_add_free(layout, 1024);
tdb = tdb_layout_get(layout);
- tdb->log = tap_log_fn;
ok1(tdb_check(tdb, NULL, NULL) == 0);
ok1(free_record_length(tdb, layout->elem[2].base.off) == 1024);
tdb_layout_add_free(layout, 1024);
tdb_layout_add_used(layout, key, data, 6);
tdb = tdb_layout_get(layout);
- tdb->log = tap_log_fn;
ok1(free_record_length(tdb, layout->elem[2].base.off) == 1024);
ok1(tdb_check(tdb, NULL, NULL) == 0);
tdb_layout_add_free(layout, 1024);
tdb_layout_add_free(layout, 512);
tdb = tdb_layout_get(layout);
- tdb->log = tap_log_fn;
ok1(free_record_length(tdb, layout->elem[2].base.off) == 1024);
ok1(free_record_length(tdb, layout->elem[3].base.off) == 512);
ok1(tdb_check(tdb, NULL, NULL) == 0);
tdb_layout_add_free(layout, 512);
tdb_layout_add_used(layout, key, data, 6);
tdb = tdb_layout_get(layout);
- tdb->log = tap_log_fn;
ok1(free_record_length(tdb, layout->elem[2].base.off) == 1024);
ok1(free_record_length(tdb, layout->elem[3].base.off) == 512);
ok1(tdb_check(tdb, NULL, NULL) == 0);
tdb_layout_add_free(layout, 512);
tdb_layout_add_free(layout, 32);
tdb = tdb_layout_get(layout);
- tdb->log = tap_log_fn;
ok1(free_record_length(tdb, layout->elem[2].base.off) == 1024);
ok1(free_record_length(tdb, layout->elem[3].base.off) == 512);
ok1(free_record_length(tdb, layout->elem[4].base.off) == 32);
tdb_layout_add_free(layout, 32768);
tdb_layout_add_free(layout, 30000);
tdb = tdb_layout_get(layout);
- tdb->log = tap_log_fn;
ok1(free_record_length(tdb, layout->elem[2].base.off) == 32768);
ok1(zone_of(tdb, layout->elem[2].base.off) == 0);
ok1(free_record_length(tdb, layout->elem[3].base.off) == 30000);
}
total -= sizeof(struct tdb_used_record);
tdb = tdb_layout_get(layout);
- tdb->log = tap_log_fn;
ok1(free_record_length(tdb, layout->elem[2].base.off) == 1 << 4);
ok1(tdb_check(tdb, NULL, NULL) == 0);
--- /dev/null
+#include <ccan/tdb2/tdb.c>
+#include <ccan/tdb2/free.c>
+#include <ccan/tdb2/lock.c>
+#include <ccan/tdb2/io.c>
+#include <ccan/tdb2/check.c>
+#include <ccan/tap/tap.h>
+#include "logging.h"
+
+/* We rig the hash so adjacent-numbered records always clash. */
+/* len/seed/priv are deliberately unused; keys here are always the
+ * address of an unsigned int (see test_val), so the cast is aligned. */
+static uint64_t clash(const void *key, size_t len, uint64_t seed, void *priv)
+{
+	return *(unsigned int *)key / 2;
+}
+
+/* Exercise store/fetch/delete on the colliding key pair (val, val+1):
+ * every pair shares a hash chain thanks to clash(), and the db is
+ * tdb_check()ed after each mutation.  Leaves the db empty on return. */
+static void test_val(struct tdb_context *tdb, unsigned int val)
+{
+	unsigned int v;
+	struct tdb_data key = { (unsigned char *)&v, sizeof(v) };
+	struct tdb_data data = { (unsigned char *)&v, sizeof(v) };
+
+	/* Use key 'val'; first check delete of a missing key fails cleanly. */
+	v = val;
+	/* Delete should fail. */
+	ok1(tdb_delete(tdb, key) == -1);
+	ok1(tdb_error(tdb) == TDB_ERR_NOEXIST);
+	ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+	/* Insert should succeed. */
+	ok1(tdb_store(tdb, key, data, TDB_INSERT) == 0);
+	ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+	/* Delete should succeed. */
+	ok1(tdb_delete(tdb, key) == 0);
+	ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+	/* Re-add it, then add collision. */
+	ok1(tdb_store(tdb, key, data, TDB_INSERT) == 0);
+	v = val + 1;
+	ok1(tdb_store(tdb, key, data, TDB_INSERT) == 0);
+	ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+	/* Can find both? */
+	ok1(tdb_fetch(tdb, key).dsize == data.dsize);
+	v = val;
+	ok1(tdb_fetch(tdb, key).dsize == data.dsize);
+
+	/* Delete second one. */
+	v = val + 1;
+	ok1(tdb_delete(tdb, key) == 0);
+	ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+	/* Re-add */
+	ok1(tdb_store(tdb, key, data, TDB_INSERT) == 0);
+	ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+	/* Now, try deleting first one. */
+	v = val;
+	ok1(tdb_delete(tdb, key) == 0);
+	ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+	/* Can still find second? */
+	v = val + 1;
+	ok1(tdb_fetch(tdb, key).dsize == data.dsize);
+
+	/* Delete that, so we are empty. */
+	ok1(tdb_delete(tdb, key) == 0);
+	ok1(tdb_check(tdb, NULL, NULL) == 0);
+}
+
+/* Run the colliding-delete tests under each flag combination, at both
+ * the start and the (wrapping) end of the hash table; expects zero
+ * non-trace log messages overall. */
+int main(int argc, char *argv[])
+{
+	unsigned int i;
+	struct tdb_context *tdb;
+	union tdb_attribute hattr = { .hash = { .base = { TDB_ATTRIBUTE_HASH },
+						.hash_fn = clash } };
+	int flags[] = { TDB_INTERNAL, TDB_DEFAULT,
+			TDB_INTERNAL|TDB_CONVERT, TDB_CONVERT };
+
+	/* Chain the logging attribute after the hash attribute. */
+	hattr.base.next = &tap_log_attr;
+
+	plan_tests(sizeof(flags) / sizeof(flags[0]) * 44 + 1);
+	for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
+		tdb = tdb_open("run-delete.tdb", flags[i],
+			       O_RDWR|O_CREAT|O_TRUNC, 0600, &hattr);
+		ok1(tdb);
+		if (!tdb)
+			continue;
+
+		/* Check start of hash table. */
+		test_val(tdb, 0);
+
+		/* Check end of hash table (will wrap around!). */
+		test_val(tdb, ((1 << tdb->header.v.hash_bits) - 1) * 2);
+
+		ok1(!tdb_has_locks(tdb));
+		tdb_close(tdb);
+	}
+
+	ok1(tap_log_messages == 0);
+	return exit_status();
+}
plan_tests(sizeof(flags) / sizeof(flags[0]) * 2 + 1);
for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
tdb = tdb_open("run-enlarge-hash.tdb", flags[i],
- O_RDWR|O_CREAT|O_TRUNC, 0600, NULL);
- tdb->log = tap_log_fn;
+ O_RDWR|O_CREAT|O_TRUNC, 0600, &tap_log_attr);
ok1(tdb);
if (tdb) {
enlarge_hash(tdb);
ok1(tdb_check(tdb, NULL, NULL) == 0);
tdb_close(tdb);
}
+ /* FIXME: Test enlarging with hash clash. */
}
ok1(tap_log_messages == 0);
return exit_status();
/* First, lower level expansion tests. */
for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
tdb = tdb_open("run-expand.tdb", flags[i],
- O_RDWR|O_CREAT|O_TRUNC, 0600, NULL);
- tdb->log = tap_log_fn;
+ O_RDWR|O_CREAT|O_TRUNC, 0600, &tap_log_attr);
ok1(tdb);
if (!tdb)
continue;
/* Now using tdb_expand. */
for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
tdb = tdb_open("run-expand.tdb", flags[i],
- O_RDWR|O_CREAT|O_TRUNC, 0600, NULL);
- tdb->log = tap_log_fn;
+ O_RDWR|O_CREAT|O_TRUNC, 0600, &tap_log_attr);
ok1(tdb);
if (!tdb)
continue;
plan_tests(sizeof(flags) / sizeof(flags[0]) * 2 + 1);
for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
tdb = tdb_open("run-new_database", flags[i],
- O_RDWR|O_CREAT|O_TRUNC, 0600, NULL);
- tdb->log = tap_log_fn;
+ O_RDWR|O_CREAT|O_TRUNC, 0600, &tap_log_attr);
ok1(tdb);
if (tdb) {
ok1(tdb_check(tdb, NULL, NULL) == 0);
* (3 + (1 + (MAX_SIZE/SIZE_STEP)) * 2) + 1);
for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
tdb = tdb_open("run-record-expand.tdb", flags[i],
- O_RDWR|O_CREAT|O_TRUNC, 0600, NULL);
- tdb->log = tap_log_fn;
+ O_RDWR|O_CREAT|O_TRUNC, 0600, &tap_log_attr);
ok1(tdb);
if (!tdb)
continue;
plan_tests(sizeof(flags) / sizeof(flags[0]) * 8 + 1);
for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
tdb = tdb_open("run-simple-delete.tdb", flags[i],
- O_RDWR|O_CREAT|O_TRUNC, 0600, NULL);
- tdb->log = tap_log_fn;
+ O_RDWR|O_CREAT|O_TRUNC, 0600, &tap_log_attr);
ok1(tdb);
if (tdb) {
/* Delete should fail. */
plan_tests(sizeof(flags) / sizeof(flags[0]) * 8 + 1);
for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
tdb = tdb_open("run-simple-fetch.tdb", flags[i],
- O_RDWR|O_CREAT|O_TRUNC, 0600, NULL);
- tdb->log = tap_log_fn;
+ O_RDWR|O_CREAT|O_TRUNC, 0600, &tap_log_attr);
ok1(tdb);
if (tdb) {
struct tdb_data d;
plan_tests(sizeof(flags) / sizeof(flags[0]) * 9 + 1);
for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
tdb = tdb_open("run-simple-store.tdb", flags[i],
- O_RDWR|O_CREAT|O_TRUNC, 0600, NULL);
- tdb->log = tap_log_fn;
+ O_RDWR|O_CREAT|O_TRUNC, 0600, &tap_log_attr);
ok1(tdb);
if (tdb) {
/* Modify should fail. */