tdb2: change to using a hash tree.
authorRusty Russell <rusty@rustcorp.com.au>
Thu, 9 Sep 2010 09:29:18 +0000 (18:59 +0930)
committerRusty Russell <rusty@rustcorp.com.au>
Thu, 9 Sep 2010 09:29:18 +0000 (18:59 +0930)
As the locking issues with enlarging a hash were so nasty, we switch to a
tree structure for the entries.  It's a hash which expands to point to
sub-hashes when it fills.

This means we no longer have a 'volatile' header: the top hash cannot move.
In fact, we no longer store a copy of the header in the tdb_context; we only
need hash_seed.

New helper functions for accessing writable areas and committing the results
(if they had to be copied).  New debug test to make sure we don't hold access
while we're doing something which could cause us to unmap/remap.

Find becomes more complicated: we need to track where we found (or didn't
find) an entry so we can easily add/delete it.

Traverse becomes more complicated: we need to track where we were in the
hash tree.

32 files changed:
ccan/tdb2/check.c
ccan/tdb2/free.c
ccan/tdb2/hash.c [new file with mode: 0644]
ccan/tdb2/io.c
ccan/tdb2/lock.c
ccan/tdb2/private.h
ccan/tdb2/tdb.c
ccan/tdb2/test/layout.c
ccan/tdb2/test/layout.h
ccan/tdb2/test/run-001-encode.c
ccan/tdb2/test/run-001-fls.c
ccan/tdb2/test/run-01-new_database.c
ccan/tdb2/test/run-02-expand.c
ccan/tdb2/test/run-03-coalesce.c
ccan/tdb2/test/run-04-basichash.c [new file with mode: 0644]
ccan/tdb2/test/run-10-simple-store.c [new file with mode: 0644]
ccan/tdb2/test/run-11-simple-fetch.c [new file with mode: 0644]
ccan/tdb2/test/run-12-store.c [new file with mode: 0644]
ccan/tdb2/test/run-13-delete.c [new file with mode: 0644]
ccan/tdb2/test/run-15-append.c [new file with mode: 0644]
ccan/tdb2/test/run-20-growhash.c [new file with mode: 0644]
ccan/tdb2/test/run-append.c [deleted file]
ccan/tdb2/test/run-delete.c [deleted file]
ccan/tdb2/test/run-enlarge_hash.c [deleted file]
ccan/tdb2/test/run-hashclash.c [deleted file]
ccan/tdb2/test/run-missing-entries.c
ccan/tdb2/test/run-record-expand.c
ccan/tdb2/test/run-simple-delete.c
ccan/tdb2/test/run-simple-fetch.c [deleted file]
ccan/tdb2/test/run-simple-store.c [deleted file]
ccan/tdb2/test/run-traverse.c
ccan/tdb2/traverse.c

index 5500883ff2b04cad383953877f4c8f415b06b0b2..1ce75be065f4317b1e63a4c2bc1afb47e446478b 100644 (file)
@@ -33,63 +33,60 @@ static bool append(tdb_off_t **arr, size_t *num, tdb_off_t off)
 static bool check_header(struct tdb_context *tdb)
 {
        uint64_t hash_test;
+       struct tdb_header hdr;
+
+       if (tdb_read_convert(tdb, 0, &hdr, sizeof(hdr)) == -1)
+               return false;
+       /* magic food should not be converted, so convert back. */
+       tdb_convert(tdb, hdr.magic_food, sizeof(hdr.magic_food));
 
        hash_test = TDB_HASH_MAGIC;
        hash_test = tdb_hash(tdb, &hash_test, sizeof(hash_test));
-       if (tdb->header.hash_test != hash_test) {
+       if (hdr.hash_test != hash_test) {
                tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
                         "check: hash test %llu should be %llu\n",
-                        tdb->header.hash_test, hash_test);
+                        hdr.hash_test, hash_test);
                return false;
        }
-       if (strcmp(tdb->header.magic_food, TDB_MAGIC_FOOD) != 0) {
+
+       if (strcmp(hdr.magic_food, TDB_MAGIC_FOOD) != 0) {
                tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
                         "check: bad magic '%.*s'\n",
-                        sizeof(tdb->header.magic_food),
-                        tdb->header.magic_food);
-               return false;
-       }
-       if (tdb->header.v.hash_bits < INITIAL_HASH_BITS) {
-               tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
-                        "check: bad hash bits %llu\n",
-                        (long long)tdb->header.v.hash_bits);
+                        sizeof(hdr.magic_food), hdr.magic_food);
                return false;
        }
 
-       /* We check hash_off later. */
-
        /* Don't check reserved: they *can* be used later. */
        return true;
 }
 
-static int off_cmp(const tdb_off_t *a, const tdb_off_t *b)
-{
-       /* Can overflow an int. */
-       return *a > *b ? 1
-               : *a < *b ? -1
-               : 0;
-}
-
-static bool check_hash_list(struct tdb_context *tdb,
+static bool check_hash_tree(struct tdb_context *tdb,
+                           tdb_off_t off, unsigned int group_bits,
+                           uint64_t hprefix,
+                           unsigned hprefix_bits,
                            tdb_off_t used[],
-                           size_t num_used)
+                           size_t num_used,
+                           size_t *num_found);
+
+static bool check_hash_record(struct tdb_context *tdb,
+                             tdb_off_t off,
+                             uint64_t hprefix,
+                             unsigned hprefix_bits,
+                             tdb_off_t used[],
+                             size_t num_used,
+                             size_t *num_found)
 {
        struct tdb_used_record rec;
-       tdb_len_t hashlen, i, num_nonzero;
-       tdb_off_t h;
-       size_t num_found;
-
-       hashlen = sizeof(tdb_off_t) << tdb->header.v.hash_bits;
 
-       if (tdb_read_convert(tdb, tdb->header.v.hash_off - sizeof(rec),
-                            &rec, sizeof(rec)) == -1)
+       if (tdb_read_convert(tdb, off, &rec, sizeof(rec)) == -1)
                return false;
 
-       if (rec_data_length(&rec) != hashlen) {
+       if (rec_data_length(&rec)
+           != sizeof(tdb_off_t) << TDB_SUBLEVEL_HASH_BITS) {
                tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
                         "tdb_check: Bad hash table length %llu vs %llu\n",
                         (long long)rec_data_length(&rec),
-                        (long long)hashlen);
+                        (long long)sizeof(tdb_off_t)<<TDB_SUBLEVEL_HASH_BITS);
                return false;
        }
        if (rec_key_length(&rec) != 0) {
@@ -105,69 +102,171 @@ static bool check_hash_list(struct tdb_context *tdb,
                return false;
        }
 
-       num_found = 0;
-       num_nonzero = 0;
-       for (i = 0, h = tdb->header.v.hash_off;
-            i < (1ULL << tdb->header.v.hash_bits);
-            i++, h += sizeof(tdb_off_t)) {
-               tdb_off_t off, *p, pos;
-               struct tdb_used_record rec;
-               uint64_t hash;
-
-               off = tdb_read_off(tdb, h);
-               if (off == TDB_OFF_ERR)
-                       return false;
-               if (!off) {
-                       num_nonzero = 0;
-                       continue;
-               }
-               /* FIXME: Check hash bits */
-               p = asearch(&off, used, num_used, off_cmp);
-               if (!p) {
-                       tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
-                                "tdb_check: Invalid offset %llu in hash\n",
-                                (long long)off);
-                       return false;
-               }
-               /* Mark it invalid. */
-               *p ^= 1;
-               num_found++;
-
-               if (tdb_read_convert(tdb, off, &rec, sizeof(rec)) == -1)
-                       return false;
-
-               /* Check it is hashed correctly. */
-               hash = hash_record(tdb, off);
-
-               /* Top bits must match header. */
-               if (hash >> (64 - 5) != rec_hash(&rec)) {
-                       tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
-                                "tdb_check: Bad hash magic at offset %llu"
-                                " (0x%llx vs 0x%llx)\n",
-                                (long long)off,
-                                (long long)hash, (long long)rec_hash(&rec));
-                       return false;
-               }
+       off += sizeof(rec);
+       return check_hash_tree(tdb, off,
+                              TDB_SUBLEVEL_HASH_BITS-TDB_HASH_GROUP_BITS,
+                              hprefix, hprefix_bits,
+                              used, num_used, num_found);
+}
+
+static int off_cmp(const tdb_off_t *a, const tdb_off_t *b)
+{
+       /* Can overflow an int. */
+       return *a > *b ? 1
+               : *a < *b ? -1
+               : 0;
+}
+
+static uint64_t get_bits(uint64_t h, unsigned num, unsigned *used)
+{
+       *used += num;
+
+       return (h >> (64 - *used)) & ((1U << num) - 1);
+}
+
+static bool check_hash_tree(struct tdb_context *tdb,
+                           tdb_off_t off, unsigned int group_bits,
+                           uint64_t hprefix,
+                           unsigned hprefix_bits,
+                           tdb_off_t used[],
+                           size_t num_used,
+                           size_t *num_found)
+{
+       unsigned int g, b;
+       const tdb_off_t *hash;
+       struct tdb_used_record rec;
+
+       hash = tdb_access_read(tdb, off,
+                              sizeof(tdb_off_t)
+                              << (group_bits + TDB_HASH_GROUP_BITS),
+                              true);
+       if (!hash)
+               return false;
 
-               /* It must be in the right place in hash array. */
-               pos = hash & ((1ULL << tdb->header.v.hash_bits)-1);
-               if (pos < i - num_nonzero || pos > i) {
-                       /* Could be wrap from end of array?  FIXME: check? */
-                       if (i != num_nonzero) {
+       for (g = 0; g < (1 << group_bits); g++) {
+               const tdb_off_t *group = hash + (g << TDB_HASH_GROUP_BITS);
+               for (b = 0; b < (1 << TDB_HASH_GROUP_BITS); b++) {
+                       unsigned int bucket, i, used_bits;
+                       uint64_t h;
+                       tdb_off_t *p;
+                       if (group[b] == 0)
+                               continue;
+
+                       off = group[b] & TDB_OFF_MASK;
+                       p = asearch(&off, used, num_used, off_cmp);
+                       if (!p) {
+                               tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
+                                        "tdb_check: Invalid offset %llu "
+                                        "in hash\n",
+                                        (long long)off);
+                               goto fail;
+                       }
+                       /* Mark it invalid. */
+                       *p ^= 1;
+                       (*num_found)++;
+
+                       if (is_subhash(group[b])) {
+                               uint64_t subprefix;
+                               subprefix = (hprefix 
+                                    << (group_bits + TDB_HASH_GROUP_BITS))
+                                       + g * (1 << TDB_HASH_GROUP_BITS) + b;
+
+                               if (!check_hash_record(tdb,
+                                              group[b] & TDB_OFF_MASK,
+                                              subprefix,
+                                              hprefix_bits
+                                                      + group_bits
+                                                      + TDB_HASH_GROUP_BITS,
+                                              used, num_used, num_found))
+                                       goto fail;
+                               continue;
+                       }
+                       /* A normal entry */
+
+                       /* Does it belong here at all? */
+                       h = hash_record(tdb, off);
+                       used_bits = 0;
+                       if (get_bits(h, hprefix_bits, &used_bits) != hprefix
+                           && hprefix_bits) {
                                tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
-                                        "tdb_check: Bad hash position %llu at"
-                                        " offset %llu hash 0x%llx\n",
-                                        (long long)i,
+                                        "check: bad hash placement"
+                                        " 0x%llx vs 0x%llx\n",
+                                        (long long)h, (long long)hprefix);
+                               goto fail;
+                       }
+
+                       /* Does it belong in this group? */
+                       if (get_bits(h, group_bits, &used_bits) != g) {
+                               tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
+                                        "check: bad group %llu vs %u\n",
+                                        (long long)h, g);
+                               goto fail;
+                       }
+
+                       /* Are bucket bits correct? */
+                       bucket = group[b] & TDB_OFF_HASH_GROUP_MASK;
+                       if (get_bits(h, TDB_HASH_GROUP_BITS, &used_bits)
+                           != bucket) {
+                               used_bits -= TDB_HASH_GROUP_BITS;
+                               tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
+                                        "check: bad bucket %u vs %u\n",
+                                        (unsigned)get_bits(h,
+                                                           TDB_HASH_GROUP_BITS,
+                                                           &used_bits),
+                                        bucket);
+                               goto fail;
+                       }
+
+                       /* There must not be any zero entries between
+                        * the bucket it belongs in and this one! */
+                       for (i = bucket;
+                            i != b;
+                            i = (i + 1) % (1 << TDB_HASH_GROUP_BITS)) {
+                               if (group[i] == 0) {
+                                       tdb->log(tdb, TDB_DEBUG_ERROR,
+                                                tdb->log_priv,
+                                                "check: bad group placement"
+                                                " %u vs %u\n",
+                                                b, bucket);
+                                       goto fail;
+                               }
+                       }
+
+                       if (tdb_read_convert(tdb, off, &rec, sizeof(rec)) == -1)
+                               goto fail;
+
+                       /* Bottom bits must match header. */
+                       if ((h & ((1 << 5)-1)) != rec_hash(&rec)) {
+                               tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
+                                        "tdb_check: Bad hash magic at"
+                                        " offset %llu (0x%llx vs 0x%llx)\n",
                                         (long long)off,
-                                        (long long)hash);
-                               return false;
+                                        (long long)h,
+                                        (long long)rec_hash(&rec));
+                               goto fail;
                        }
                }
-               num_nonzero++;
        }
+       tdb_access_release(tdb, hash);
+       return true;
+
+fail:
+       tdb_access_release(tdb, hash);
+       return false;
+}
 
-       /* hash table is one of the used blocks. */
-       if (num_found != num_used - 1) {
+static bool check_hash(struct tdb_context *tdb,
+                      tdb_off_t used[],
+                      size_t num_used)
+{
+       size_t num_found = 0;
+
+       if (!check_hash_tree(tdb, offsetof(struct tdb_header, hashtable),
+                            TDB_TOPLEVEL_HASH_BITS-TDB_HASH_GROUP_BITS,
+                            0, 0,  used, num_used, &num_found))
+               return false;
+
+       if (num_found != num_used) {
                tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
                         "tdb_check: Not all entries are in hash\n");
                return false;
@@ -265,7 +364,7 @@ static tdb_len_t check_free_list(struct tdb_context *tdb,
 static tdb_off_t check_zone(struct tdb_context *tdb, tdb_off_t zone_off,
                            tdb_off_t **used, size_t *num_used,
                            tdb_off_t **free, size_t *num_free,
-                           bool *hash_found, unsigned int *max_zone_bits)
+                           unsigned int *max_zone_bits)
 {
        struct free_zone_header zhdr;
        tdb_off_t off, hdrlen;
@@ -372,9 +471,6 @@ static tdb_off_t check_zone(struct tdb_context *tdb, tdb_off_t zone_off,
                                         (long long)len, (long long)off);
                                return TDB_OFF_ERR;
                        }
-
-                       if (off + sizeof(p->u) == tdb->header.v.hash_off)
-                               *hash_found = true;
                }
        }
        return 1ULL << zhdr.zone_bits;
@@ -388,15 +484,18 @@ int tdb_check(struct tdb_context *tdb,
        tdb_off_t *free = NULL, *used = NULL, off;
        tdb_len_t len;
        size_t num_free = 0, num_used = 0, num_found = 0;
-       bool hash_found = false;
        unsigned max_zone_bits = INITIAL_ZONE_BITS;
        uint8_t tailer;
 
-       /* FIXME: need more locking? against expansion? */
        /* This always ensures the header is uptodate. */
        if (tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, false) != 0)
                return -1;
 
+       if (tdb_lock_expand(tdb, F_RDLCK) != 0) {
+               tdb_allrecord_unlock(tdb, F_RDLCK);
+               return -1;
+       }
+
        if (!check_header(tdb))
                goto fail;
 
@@ -405,7 +504,7 @@ int tdb_check(struct tdb_context *tdb,
             off < tdb->map_size - 1;
             off += len) {
                len = check_zone(tdb, off, &used, &num_used, &free, &num_free,
-                                &hash_found, &max_zone_bits);
+                                &max_zone_bits);
                if (len == TDB_OFF_ERR)
                        goto fail;
        }
@@ -421,15 +520,8 @@ int tdb_check(struct tdb_context *tdb,
                goto fail;
        }
 
-       if (!hash_found) {
-               tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
-                        "tdb_check: hash table not found at %llu\n",
-                        (long long)tdb->header.v.hash_off);
-               goto fail;
-       }
-
        /* FIXME: Check key uniqueness? */
-       if (!check_hash_list(tdb, used, num_used))
+       if (!check_hash(tdb, used, num_used))
                goto fail;
 
        for (off = sizeof(struct tdb_header);
@@ -446,9 +538,11 @@ int tdb_check(struct tdb_context *tdb,
        }
 
        tdb_allrecord_unlock(tdb, F_RDLCK);
+       tdb_unlock_expand(tdb, F_RDLCK);
        return 0;
 
 fail:
        tdb_allrecord_unlock(tdb, F_RDLCK);
+       tdb_unlock_expand(tdb, F_RDLCK);
        return -1;
 }
index efc29e884ea2078c54ed28616b9d0cdce1228ac1..565bc69e03570d5393dd45c0dcec43f8cf7a2ef5 100644 (file)
@@ -510,10 +510,10 @@ int set_header(struct tdb_context *tdb,
 {
        uint64_t keybits = (fls64(keylen) + 1) / 2;
 
-       /* Use top bits of hash, so it's independent of hash table size. */
+       /* Use bottom bits of hash, so it's independent of hash table size. */
        rec->magic_and_meta
                = zone_bits
-               | ((hash >> 59) << 6)
+               | ((hash & ((1 << 5)-1)) << 6)
                | ((actuallen - (keylen + datalen)) << 11)
                | (keybits << 43)
                | (TDB_MAGIC << 48);
@@ -654,8 +654,8 @@ tdb_off_t alloc(struct tdb_context *tdb, size_t keylen, size_t datalen,
        tdb_len_t size, actual;
        struct tdb_used_record rec;
 
-       /* We don't want header to change during this! */
-       assert(tdb->header_uptodate);
+       /* We can't hold pointers during this: we could unmap! */
+       assert(!tdb->direct_access);
 
        size = adjust_size(keylen, datalen, growing);
 
diff --git a/ccan/tdb2/hash.c b/ccan/tdb2/hash.c
new file mode 100644 (file)
index 0000000..0cc9376
--- /dev/null
@@ -0,0 +1,594 @@
+ /* 
+   Trivial Database 2: hash handling
+   Copyright (C) Rusty Russell 2010
+   
+   This library is free software; you can redistribute it and/or
+   modify it under the terms of the GNU Lesser General Public
+   License as published by the Free Software Foundation; either
+   version 3 of the License, or (at your option) any later version.
+
+   This library is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   Lesser General Public License for more details.
+
+   You should have received a copy of the GNU Lesser General Public
+   License along with this library; if not, see <http://www.gnu.org/licenses/>.
+*/
+#include "private.h"
+#include <assert.h>
+#include <ccan/hash/hash.h>
+
+static uint64_t jenkins_hash(const void *key, size_t length, uint64_t seed,
+                            void *arg)
+{
+       uint64_t ret;
+       /* hash64_stable assumes lower bits are more important; they are a
+        * slightly better hash.  We use the upper bits first, so swap them. */
+       ret = hash64_stable((const unsigned char *)key, length, seed);
+       return (ret >> 32) | (ret << 32);
+}
+
+void tdb_hash_init(struct tdb_context *tdb)
+{
+       tdb->khash = jenkins_hash;
+       tdb->hash_priv = NULL;
+}
+
+uint64_t tdb_hash(struct tdb_context *tdb, const void *ptr, size_t len)
+{
+       return tdb->khash(ptr, len, tdb->hash_seed, tdb->hash_priv);
+}
+
+uint64_t hash_record(struct tdb_context *tdb, tdb_off_t off)
+{
+       struct tdb_used_record pad, *r;
+       const void *key;
+       uint64_t klen, hash;
+
+       r = tdb_get(tdb, off, &pad, sizeof(pad));
+       if (!r)
+               /* FIXME */
+               return 0;
+
+       klen = rec_key_length(r);
+       key = tdb_access_read(tdb, off + sizeof(pad), klen, false);
+       if (!key)
+               return 0;
+
+       hash = tdb_hash(tdb, key, klen);
+       tdb_access_release(tdb, key);
+       return hash;
+}
+
+/* Get bits from a value. */
+static uint32_t bits(uint64_t val, unsigned start, unsigned num)
+{
+       assert(num <= 32);
+       return (val >> start) & ((1U << num) - 1);
+}
+
+/* We take bits from the top: that way we can lock whole sections of the hash
+ * by using lock ranges. */
+static uint32_t use_bits(struct hash_info *h, unsigned num)
+{
+       h->hash_used += num;
+       return bits(h->h, 64 - h->hash_used, num);
+}
+
+/* Does entry match? */
+static bool match(struct tdb_context *tdb,
+                 struct hash_info *h,
+                 const struct tdb_data *key,
+                 tdb_off_t val,
+                 struct tdb_used_record *rec)
+{
+       bool ret;
+       const unsigned char *rkey;
+       tdb_off_t off;
+
+       /* FIXME: Handle hash value truncated. */
+       if (bits(val, TDB_OFF_HASH_TRUNCATED_BIT, 1))
+               abort();
+
+       /* Desired bucket must match. */
+       if (h->home_bucket != (val & TDB_OFF_HASH_GROUP_MASK))
+               return false;
+
+       /* Top bits of offset == next bits of hash. */
+       if (bits(val, TDB_OFF_HASH_EXTRA_BIT, TDB_OFF_UPPER_STEAL_EXTRA)
+           != bits(h->h, 64 - h->hash_used - TDB_OFF_UPPER_STEAL_EXTRA,
+                   TDB_OFF_UPPER_STEAL_EXTRA))
+               return false;
+
+       off = val & TDB_OFF_MASK;
+       if (tdb_read_convert(tdb, off, rec, sizeof(*rec)) == -1)
+               return false;
+
+       /* FIXME: check extra bits in header? */
+       if (rec_key_length(rec) != key->dsize)
+               return false;
+
+       rkey = tdb_access_read(tdb, off + sizeof(*rec), key->dsize, false);
+       if (!rkey)
+               return false;
+       ret = (memcmp(rkey, key->dptr, key->dsize) == 0);
+       tdb_access_release(tdb, rkey);
+       return ret;
+}
+
+static tdb_off_t hbucket_off(tdb_off_t group_start, unsigned bucket)
+{
+       return group_start
+               + (bucket % (1 << TDB_HASH_GROUP_BITS)) * sizeof(tdb_off_t);
+}
+
+/* Truncated hashes can't be all 1: that's how we spot a sub-hash */
+bool is_subhash(tdb_off_t val)
+{
+       return val >> (64-TDB_OFF_UPPER_STEAL) == (1<<TDB_OFF_UPPER_STEAL) - 1;
+}
+
+/* This is the core routine which searches the hashtable for an entry.
+ * On error, no locks are held and TDB_OFF_ERR is returned.
+ * Otherwise, hinfo is filled in.
+ * If not found, the return value is 0.
+ * If found, the return value is the offset, and *rec is the record. */
+tdb_off_t find_and_lock(struct tdb_context *tdb,
+                       struct tdb_data key,
+                       int ltype,
+                       struct hash_info *h,
+                       struct tdb_used_record *rec)
+{
+       uint32_t i, group;
+       tdb_off_t hashtable;
+
+       h->h = tdb_hash(tdb, key.dptr, key.dsize);
+       h->hash_used = 0;
+       group = use_bits(h, TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS);
+       h->home_bucket = use_bits(h, TDB_HASH_GROUP_BITS);
+
+       /* FIXME: Guess the depth, don't over-lock! */
+       h->hlock_start = (tdb_off_t)group
+               << (64 - (TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS));
+       h->hlock_range = 1ULL << (64 - (TDB_TOPLEVEL_HASH_BITS
+                                       - TDB_HASH_GROUP_BITS));
+       if (tdb_lock_hashes(tdb, h->hlock_start, h->hlock_range, ltype,
+                           TDB_LOCK_WAIT))
+               return TDB_OFF_ERR;
+
+       hashtable = offsetof(struct tdb_header, hashtable);
+
+       while (likely(h->hash_used < 64)) {
+               /* Read in the hash group. */
+               h->group_start = hashtable
+                       + group * (sizeof(tdb_off_t) << TDB_HASH_GROUP_BITS);
+
+               if (tdb_read_convert(tdb, h->group_start, &h->group,
+                                    sizeof(h->group)) == -1)
+                       goto fail;
+
+               /* Pointer to another hash table?  Go down... */
+               if (is_subhash(h->group[h->home_bucket])) {
+                       hashtable = (h->group[h->home_bucket] & TDB_OFF_MASK)
+                               + sizeof(struct tdb_used_record);
+                       group = use_bits(h, TDB_SUBLEVEL_HASH_BITS
+                                        - TDB_HASH_GROUP_BITS);
+                       h->home_bucket = use_bits(h, TDB_HASH_GROUP_BITS);
+                       continue;
+               }
+
+               /* It's in this group: search (until 0 or all searched) */
+               for (i = 0, h->found_bucket = h->home_bucket;
+                    i < (1 << TDB_HASH_GROUP_BITS);
+                    i++, h->found_bucket = ((h->found_bucket+1)
+                                            % (1 << TDB_HASH_GROUP_BITS))) {
+                       if (is_subhash(h->group[h->found_bucket]))
+                               continue;
+
+                       if (!h->group[h->found_bucket])
+                               break;
+
+                       if (match(tdb, h, &key, h->group[h->found_bucket], rec))
+                               return h->group[h->found_bucket] & TDB_OFF_MASK;
+               }
+               /* Didn't find it: h indicates where it would go. */
+               return 0;
+       }
+
+       /* FIXME: We hit the bottom.  Chain! */
+       abort();
+
+fail:
+       tdb_unlock_hashes(tdb, h->hlock_start, h->hlock_range, ltype);
+       return TDB_OFF_ERR;
+}
+
+/* I wrote a simple test, expanding a hash to 2GB, for the following
+ * cases:
+ * 1) Expanding all the buckets at once,
+ * 2) Expanding the most-populated bucket,
+ * 3) Expanding the bucket we wanted to place the new entry ito.
+ *
+ * I measured the worst/average/best density during this process.
+ * 1) 3%/16%/30%
+ * 2) 4%/20%/38%
+ * 3) 6%/22%/41%
+ *
+ * So we figure out the busiest bucket for the moment.
+ */
+static unsigned fullest_bucket(struct tdb_context *tdb,
+                              const tdb_off_t *group,
+                              unsigned new_bucket)
+{
+       unsigned counts[1 << TDB_HASH_GROUP_BITS] = { 0 };
+       unsigned int i, best_bucket;
+
+       /* Count the new entry. */
+       counts[new_bucket]++;
+       best_bucket = new_bucket;
+
+       for (i = 0; i < (1 << TDB_HASH_GROUP_BITS); i++) {
+               unsigned this_bucket;
+
+               if (is_subhash(group[i]))
+                       continue;
+               this_bucket = group[i] & TDB_OFF_HASH_GROUP_MASK;
+               if (++counts[this_bucket] > counts[best_bucket])
+                       best_bucket = this_bucket;
+       }
+
+       return best_bucket;
+}
+
+static bool put_into_group(tdb_off_t *group,
+                          unsigned bucket, tdb_off_t encoded)
+{
+       unsigned int i;
+
+       for (i = 0; i < (1 << TDB_HASH_GROUP_BITS); i++) {
+               unsigned b = (bucket + i) % (1 << TDB_HASH_GROUP_BITS);
+
+               if (group[b] == 0) {
+                       group[b] = encoded;
+                       return true;
+               }
+       }
+       return false;
+}
+
+static void force_into_group(tdb_off_t *group,
+                            unsigned bucket, tdb_off_t encoded)
+{
+       if (!put_into_group(group, bucket, encoded))
+               abort();
+}
+
+static tdb_off_t encode_offset(tdb_off_t new_off, struct hash_info *h)
+{
+       return h->home_bucket
+               | new_off
+               | ((uint64_t)bits(h->h,
+                                 64 - h->hash_used - TDB_OFF_UPPER_STEAL_EXTRA,
+                                 TDB_OFF_UPPER_STEAL_EXTRA)
+                  << TDB_OFF_HASH_EXTRA_BIT);
+}
+
+/* Simply overwrite the hash entry we found before. */ 
+int replace_in_hash(struct tdb_context *tdb,
+                   struct hash_info *h,
+                   tdb_off_t new_off)
+{
+       return tdb_write_off(tdb, hbucket_off(h->group_start, h->found_bucket),
+                            encode_offset(new_off, h));
+}
+
+/* Add into a newly created subhash. */
+static int add_to_subhash(struct tdb_context *tdb, tdb_off_t subhash,
+                         unsigned hash_used, tdb_off_t val)
+{
+       tdb_off_t off = (val & TDB_OFF_MASK), *group;
+       struct hash_info h;
+       unsigned int gnum;
+
+       h.hash_used = hash_used;
+
+       /* FIXME chain if hash_used == 64 */
+       if (hash_used + TDB_SUBLEVEL_HASH_BITS > 64)
+               abort();
+
+       /* FIXME: Do truncated hash bits if we can! */
+       h.h = hash_record(tdb, off);
+       gnum = use_bits(&h, TDB_SUBLEVEL_HASH_BITS-TDB_HASH_GROUP_BITS);
+       h.group_start = subhash + sizeof(struct tdb_used_record)
+               + gnum * (sizeof(tdb_off_t) << TDB_HASH_GROUP_BITS);
+       h.home_bucket = use_bits(&h, TDB_HASH_GROUP_BITS);
+
+       group = tdb_access_write(tdb, h.group_start,
+                                sizeof(*group) << TDB_HASH_GROUP_BITS, true);
+       if (!group)
+               return -1;
+       force_into_group(group, h.home_bucket, encode_offset(off, &h));
+       return tdb_access_commit(tdb, group);
+}
+
+static int expand_group(struct tdb_context *tdb, struct hash_info *h)
+{
+       unsigned bucket, num_vals, i;
+       tdb_off_t subhash;
+       tdb_off_t vals[1 << TDB_HASH_GROUP_BITS];
+
+       /* Attach new empty subhash under fullest bucket. */
+       bucket = fullest_bucket(tdb, h->group, h->home_bucket);
+
+       subhash = alloc(tdb, 0, sizeof(tdb_off_t) << TDB_SUBLEVEL_HASH_BITS,
+                       0, false);
+       if (subhash == TDB_OFF_ERR)
+               return -1;
+
+       if (zero_out(tdb, subhash + sizeof(struct tdb_used_record),
+                    sizeof(tdb_off_t) << TDB_SUBLEVEL_HASH_BITS) == -1)
+               return -1;
+
+       /* Remove any which are destined for bucket or are in wrong place. */
+       num_vals = 0;
+       for (i = 0; i < (1 << TDB_HASH_GROUP_BITS); i++) {
+               unsigned home_bucket = h->group[i] & TDB_OFF_HASH_GROUP_MASK;
+               if (!h->group[i] || is_subhash(h->group[i]))
+                       continue;
+               if (home_bucket == bucket || home_bucket != i) {
+                       vals[num_vals++] = h->group[i];
+                       h->group[i] = 0;
+               }
+       }
+       /* FIXME: This assert is valid, but we do this during unit test :( */
+       /* assert(num_vals); */
+
+       /* Overwrite expanded bucket with subhash pointer. */
+       h->group[bucket] = subhash | ~((1ULL << (64 - TDB_OFF_UPPER_STEAL))-1);
+
+       /* Put values back. */
+       for (i = 0; i < num_vals; i++) {
+               unsigned this_bucket = vals[i] & TDB_OFF_HASH_GROUP_MASK;
+
+               if (this_bucket == bucket) {
+                       if (add_to_subhash(tdb, subhash, h->hash_used, vals[i]))
+                               return -1;
+               } else {
+                       /* There should be room to put this back. */
+                       force_into_group(h->group, this_bucket, vals[i]);
+               }
+       }
+       return 0;
+}
+
+int delete_from_hash(struct tdb_context *tdb, struct hash_info *h)
+{
+       unsigned int i, num_movers = 0;
+       tdb_off_t movers[1 << TDB_HASH_GROUP_BITS];
+
+       h->group[h->found_bucket] = 0;
+       for (i = 1; i < (1 << TDB_HASH_GROUP_BITS); i++) {
+               unsigned this_bucket;
+
+               this_bucket = (h->found_bucket+i) % (1 << TDB_HASH_GROUP_BITS);
+               /* Empty bucket?  We're done. */
+               if (!h->group[this_bucket])
+                       break;
+
+               /* Ignore subhashes. */
+               if (is_subhash(h->group[this_bucket]))
+                       continue;
+
+               /* If this one is not happy where it is, we'll move it. */
+               if ((h->group[this_bucket] & TDB_OFF_HASH_GROUP_MASK)
+                   != this_bucket) {
+                       movers[num_movers++] = h->group[this_bucket];
+                       h->group[this_bucket] = 0;
+               }
+       }
+
+       /* Put back the ones we erased. */
+       for (i = 0; i < num_movers; i++) {
+               force_into_group(h->group, movers[i] & TDB_OFF_HASH_GROUP_MASK,
+                                movers[i]);
+       }
+
+       /* Now we write back the hash group */
+       return tdb_write_convert(tdb, h->group_start,
+                                h->group, sizeof(h->group));
+}
+
+int add_to_hash(struct tdb_context *tdb, struct hash_info *h, tdb_off_t new_off)
+{
+       /* FIXME: chain! */
+       if (h->hash_used >= 64)
+               abort();
+
+       /* We hit an empty bucket during search?  That's where it goes. */
+       if (!h->group[h->found_bucket]) {
+               h->group[h->found_bucket] = encode_offset(new_off, h);
+               /* Write back the modified group. */
+               return tdb_write_convert(tdb, h->group_start,
+                                        h->group, sizeof(h->group));
+       }
+
+       /* We're full.  Expand. */
+       if (expand_group(tdb, h) == -1)
+               return -1;
+
+       if (is_subhash(h->group[h->home_bucket])) {
+               /* We were expanded! */
+               tdb_off_t hashtable;
+               unsigned int gnum;
+
+               /* Write back the modified group. */
+               if (tdb_write_convert(tdb, h->group_start, h->group,
+                                     sizeof(h->group)))
+                       return -1;
+
+               /* Move hashinfo down a level. */
+               hashtable = (h->group[h->home_bucket] & TDB_OFF_MASK)
+                       + sizeof(struct tdb_used_record);
+               gnum = use_bits(h,TDB_SUBLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS);
+               h->home_bucket = use_bits(h, TDB_HASH_GROUP_BITS);
+               h->group_start = hashtable
+                       + gnum * (sizeof(tdb_off_t) << TDB_HASH_GROUP_BITS);
+               if (tdb_read_convert(tdb, h->group_start, &h->group,
+                                    sizeof(h->group)) == -1)
+                       return -1;
+       }
+
+       /* Expanding the group must have made room if it didn't choose this
+        * bucket. */
+       if (put_into_group(h->group, h->home_bucket, encode_offset(new_off, h)))
+               return tdb_write_convert(tdb, h->group_start,
+                                        h->group, sizeof(h->group));
+
+       /* This can happen if all hashes in group (and us) dropped into same
+        * group in subhash. */
+       return add_to_hash(tdb, h, new_off);
+}
+
+/* No point holding references/copies of db once we drop lock. */
+static void release_entries(struct tdb_context *tdb,
+                           struct traverse_info *tinfo)
+{
+       unsigned int i;
+
+       for (i = 0; i < tinfo->num_levels; i++) {
+               if (tinfo->levels[i].entries) {
+                       tdb_access_release(tdb, tinfo->levels[i].entries);
+                       tinfo->levels[i].entries = NULL;
+               }
+       }
+}
+
+/* Traverse support: returns offset of record, or 0 or TDB_OFF_ERR. */
+static tdb_off_t iterate_hash(struct tdb_context *tdb,
+                             struct traverse_info *tinfo)
+{
+       tdb_off_t off;
+       unsigned int i;
+       struct traverse_level *tlevel;
+
+       tlevel = &tinfo->levels[tinfo->num_levels-1];
+
+again:
+       if (!tlevel->entries) {
+               tlevel->entries = tdb_access_read(tdb, tlevel->hashtable,
+                                                 sizeof(tdb_off_t)
+                                                 * tlevel->total_buckets,
+                                                 true);
+               if (!tlevel->entries)
+                       return TDB_OFF_ERR;
+       }
+
+       /* FIXME: Use tdb_find_nonzero_off? */ 
+       for (i = tlevel->entry; i < tlevel->total_buckets; i++) {
+               if (!tlevel->entries[i] || tlevel->entries[i] == tinfo->prev)
+                       continue;
+
+               tlevel->entry = i;
+               off = tlevel->entries[i] & TDB_OFF_MASK;
+
+               if (!is_subhash(tlevel->entries[i])) {
+                       /* Found one. */
+                       tinfo->prev = tlevel->entries[i];
+                       release_entries(tdb, tinfo);
+                       return off;
+               }
+
+               /* When we come back, we want tne next one */
+               tlevel->entry++;
+               tinfo->num_levels++;
+               tlevel++;
+               tlevel->hashtable = off + sizeof(struct tdb_used_record);
+               tlevel->entry = 0;
+               tlevel->entries = NULL;
+               tlevel->total_buckets = (1 << TDB_SUBLEVEL_HASH_BITS);
+               goto again;
+       }
+
+       /* Nothing there? */
+       if (tinfo->num_levels == 1) {
+               release_entries(tdb, tinfo);
+               return 0;
+       }
+
+       /* Go back up and keep searching. */
+       tdb_access_release(tdb, tlevel->entries);
+       tinfo->num_levels--;
+       tlevel--;
+       goto again;
+}
+
+/* Return 1 if we find something, 0 if not, -1 on error. */
+int next_in_hash(struct tdb_context *tdb, int ltype,
+                struct traverse_info *tinfo,
+                TDB_DATA *kbuf, unsigned int *dlen)
+{
+       const unsigned group_bits = TDB_TOPLEVEL_HASH_BITS-TDB_HASH_GROUP_BITS;
+       tdb_off_t hlock_start, hlock_range, off;
+
+       while (tinfo->toplevel_group < (1 << group_bits)) {
+               hlock_start = (tdb_off_t)tinfo->toplevel_group
+                       << (64 - group_bits);
+               hlock_range = 1ULL << group_bits;
+               if (tdb_lock_hashes(tdb, hlock_start, hlock_range, ltype,
+                                   TDB_LOCK_WAIT) != 0)
+                       return -1;
+
+               off = iterate_hash(tdb, tinfo);
+               if (off) {
+                       struct tdb_used_record rec;
+
+                       if (tdb_read_convert(tdb, off, &rec, sizeof(rec))) {
+                               tdb_unlock_hashes(tdb,
+                                                 hlock_start, hlock_range,
+                                                 ltype);
+                               return -1;
+                       }
+                       kbuf->dsize = rec_key_length(&rec);
+
+                       /* They want data as well? */
+                       if (dlen) {
+                               *dlen = rec_data_length(&rec);
+                               kbuf->dptr = tdb_alloc_read(tdb, 
+                                                           off + sizeof(rec),
+                                                           kbuf->dsize
+                                                           + *dlen);
+                       } else {
+                               kbuf->dptr = tdb_alloc_read(tdb, 
+                                                           off + sizeof(rec),
+                                                           kbuf->dsize);
+                       }
+                       tdb_unlock_hashes(tdb, hlock_start, hlock_range, ltype);
+                       return kbuf->dptr ? 1 : -1;
+               }
+
+               tdb_unlock_hashes(tdb, hlock_start, hlock_range, ltype);
+
+               tinfo->toplevel_group++;
+               tinfo->levels[0].hashtable
+                       += (sizeof(tdb_off_t) << TDB_HASH_GROUP_BITS);
+               tinfo->levels[0].entry = 0;
+       }
+       return 0;
+}
+
+/* Return 1 if we find something, 0 if not, -1 on error. */
+int first_in_hash(struct tdb_context *tdb, int ltype,
+                 struct traverse_info *tinfo,
+                 TDB_DATA *kbuf, unsigned int *dlen)
+{
+       tinfo->prev = 0;
+       tinfo->toplevel_group = 0;
+       tinfo->num_levels = 1;
+       tinfo->levels[0].hashtable = offsetof(struct tdb_header, hashtable);
+       tinfo->levels[0].entries = NULL;
+       tinfo->levels[0].entry = 0;
+       tinfo->levels[0].total_buckets = (1 << TDB_HASH_GROUP_BITS);
+
+       return next_in_hash(tdb, ltype, tinfo, kbuf, dlen);
+}
index a973adde65df7658c50a001c75fbc931bb852153..6cb1bdda6d5e68646dfe81976b73477f47445902 100644 (file)
@@ -26,6 +26,7 @@
    License along with this library; if not, see <http://www.gnu.org/licenses/>.
 */
 #include "private.h"
+#include <assert.h>
 #include <ccan/likely/likely.h>
 
 void tdb_munmap(struct tdb_context *tdb)
@@ -72,6 +73,10 @@ static int tdb_oob(struct tdb_context *tdb, tdb_off_t len, bool probe)
        struct stat st;
        int ret;
 
+       /* FIXME: We can't hold pointers during this: we could unmap! */
+       /* (We currently do this in traverse!) */
+//     assert(!tdb->direct_access || tdb_has_expansion_lock(tdb));
+
        if (len <= tdb->map_size)
                return 0;
        if (tdb->flags & TDB_INTERNAL) {
@@ -375,44 +380,29 @@ int tdb_write_off(struct tdb_context *tdb, tdb_off_t off, tdb_off_t val)
        return tdb_write_convert(tdb, off, &val, sizeof(val));
 }
 
-/* read a lump of data, allocating the space for it */
-void *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len)
+static void *_tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset,
+                            tdb_len_t len, unsigned int prefix)
 {
        void *buf;
 
        /* some systems don't like zero length malloc */
-       buf = malloc(len ? len : 1);
+       buf = malloc(prefix + len ? prefix + len : 1);
        if (unlikely(!buf)) {
                tdb->ecode = TDB_ERR_OOM;
                tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
                         "tdb_alloc_read malloc failed len=%lld\n",
-                        (long long)len);
-       } else if (unlikely(tdb->methods->read(tdb, offset, buf, len))) {
+                        (long long)prefix + len);
+       } else if (unlikely(tdb->methods->read(tdb, offset, buf+prefix, len))) {
                free(buf);
                buf = NULL;
        }
        return buf;
 }
 
-uint64_t hash_record(struct tdb_context *tdb, tdb_off_t off)
+/* read a lump of data, allocating the space for it */
+void *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len)
 {
-       struct tdb_used_record pad, *r;
-       const void *key;
-       uint64_t klen, hash;
-
-       r = tdb_get(tdb, off, &pad, sizeof(pad));
-       if (!r)
-               /* FIXME */
-               return 0;
-
-       klen = rec_key_length(r);
-       key = tdb_access_read(tdb, off + sizeof(pad), klen, false);
-       if (!key)
-               return 0;
-
-       hash = tdb_hash(tdb, key, klen);
-       tdb_access_release(tdb, key);
-       return hash;
+       return _tdb_alloc_read(tdb, offset, len, 0);
 }
 
 static int fill(struct tdb_context *tdb,
@@ -474,19 +464,57 @@ static int tdb_expand_file(struct tdb_context *tdb, tdb_len_t addition)
        return 0;
 }
 
+/* This is only neded for tdb_access_commit, but used everywhere to simplify. */
+struct tdb_access_hdr {
+       tdb_off_t off;
+       tdb_len_t len;
+       bool convert;
+};
+
 const void *tdb_access_read(struct tdb_context *tdb,
                            tdb_off_t off, tdb_len_t len, bool convert)
 {
-       const void *ret = NULL;
+       const void *ret = NULL; 
 
        if (likely(!(tdb->flags & TDB_CONVERT)))
                ret = tdb_direct(tdb, off, len);
 
        if (!ret) {
-               ret = tdb_alloc_read(tdb, off, len);
-               if (convert)
-                       tdb_convert(tdb, (void *)ret, len);
-       }
+               struct tdb_access_hdr *hdr;
+               hdr = _tdb_alloc_read(tdb, off, len, sizeof(*hdr));
+               if (hdr) {
+                       ret = hdr + 1;
+                       if (convert)
+                               tdb_convert(tdb, (void *)ret, len);
+               }
+       } else
+               tdb->direct_access++;
+
+       return ret;
+}
+
+void *tdb_access_write(struct tdb_context *tdb,
+                      tdb_off_t off, tdb_len_t len, bool convert)
+{
+       void *ret = NULL;
+
+       if (likely(!(tdb->flags & TDB_CONVERT)))
+               ret = tdb_direct(tdb, off, len);
+
+       if (!ret) {
+               struct tdb_access_hdr *hdr;
+               hdr = _tdb_alloc_read(tdb, off, len, sizeof(*hdr));
+               if (hdr) {
+                       hdr->off = off;
+                       hdr->len = len;
+                       hdr->convert = convert;
+                       ret = hdr + 1;
+                       if (convert)
+                               tdb_convert(tdb, (void *)ret, len);
+               }
+       } else
+               tdb->direct_access++;
+
        return ret;
 }
 
@@ -495,7 +523,30 @@ void tdb_access_release(struct tdb_context *tdb, const void *p)
        if (!tdb->map_ptr
            || (char *)p < (char *)tdb->map_ptr
            || (char *)p >= (char *)tdb->map_ptr + tdb->map_size)
-               free((void *)p);
+               free((struct tdb_access_hdr *)p - 1);
+       else
+               tdb->direct_access--;
+}
+
+int tdb_access_commit(struct tdb_context *tdb, void *p)
+{
+       int ret = 0;
+
+       if (!tdb->map_ptr
+           || (char *)p < (char *)tdb->map_ptr
+           || (char *)p >= (char *)tdb->map_ptr + tdb->map_size) {
+               struct tdb_access_hdr *hdr;
+
+               hdr = (struct tdb_access_hdr *)p - 1;
+               if (hdr->convert)
+                       ret = tdb_write_convert(tdb, hdr->off, p, hdr->len);
+               else
+                       ret = tdb_write(tdb, hdr->off, p, hdr->len);
+               free(hdr);
+       } else
+               tdb->direct_access--;
+
+       return ret;
 }
 
 #if 0
index 1cb150a42bec6b334d4c747d2d3c2b9d6e3e38b0..15c97e3b98343b0a02ee910bdac0ec93664e3527 100644 (file)
@@ -257,7 +257,7 @@ static int tdb_nest_lock(struct tdb_context *tdb, tdb_off_t offset, int ltype,
 {
        struct tdb_lock_type *new_lck;
 
-       if (offset >= TDB_HASH_LOCK_START + (1 << 30) + tdb->map_size / 8) {
+       if (offset >= TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE + tdb->map_size / 8) {
                tdb->ecode = TDB_ERR_LOCK;
                tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
                         "tdb_lock: invalid offset %llu ltype=%d\n",
@@ -374,11 +374,6 @@ static int tdb_nest_unlock(struct tdb_context *tdb, tdb_off_t off, int ltype)
         */
        *lck = tdb->lockrecs[--tdb->num_lockrecs];
 
-       if (tdb->num_lockrecs == 0) {
-               /* If we're not holding any locks, header can change. */
-               tdb->header_uptodate = false;
-       }
-
        return ret;
 }
 
@@ -410,8 +405,10 @@ static int tdb_lock_gradual(struct tdb_context *tdb,
        int ret;
        enum tdb_lock_flags nb_flags = (flags & ~TDB_LOCK_WAIT);
 
-       if (len <= 4) {
-               /* Single record.  Just do blocking lock. */
+       if (len <= 1) {
+               /* 0 would mean to end-of-file... */
+               assert(len != 0);
+               /* Single hash.  Just do blocking lock. */
                return tdb_brlock(tdb, ltype, off, len, flags);
        }
 
@@ -437,14 +434,11 @@ static int tdb_lock_gradual(struct tdb_context *tdb,
 
 /* lock/unlock entire database.  It can only be upgradable if you have some
  * other way of guaranteeing exclusivity (ie. transaction write lock).
- * Note that we don't lock the free chains: noone can get those locks
- * without a hash chain lock first.
- * The header *will be* up to date once this returns success. */
+ * Note that we don't lock the free chains: currently noone can get those locks
+ * without a hash chain lock first. */
 int tdb_allrecord_lock(struct tdb_context *tdb, int ltype,
                       enum tdb_lock_flags flags, bool upgradable)
 {
-       tdb_off_t hash_size;
-
        /* FIXME: There are no locks on read-only dbs */
        if (tdb->read_only) {
                tdb->ecode = TDB_ERR_LOCK;
@@ -484,11 +478,9 @@ int tdb_allrecord_lock(struct tdb_context *tdb, int ltype,
                return -1;
        }
 
-       /* Lock all the hash buckets. */
 again:
-       hash_size = (1ULL << tdb->header.v.hash_bits);
        if (tdb_lock_gradual(tdb, ltype, flags, TDB_HASH_LOCK_START,
-                            hash_size)) {
+                            TDB_HASH_LOCK_RANGE)) {
                if (!(flags & TDB_LOCK_PROBE)) {
                        tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
                                 "tdb_lockall hashes failed (%s)\n",
@@ -503,12 +495,6 @@ again:
        tdb->allrecord_lock.ltype = upgradable ? F_WRLCK : ltype;
        tdb->allrecord_lock.off = upgradable;
 
-       /* Now we re-check header, holding lock. */
-       if (unlikely(header_changed(tdb))) {
-               tdb_allrecord_unlock(tdb, ltype);
-               goto again;
-       }
-
        /* Now check for needing recovery. */
        if (unlikely(tdb_needs_recovery(tdb))) {
                tdb_allrecord_unlock(tdb, ltype);
@@ -544,8 +530,6 @@ void tdb_unlock_expand(struct tdb_context *tdb, int ltype)
 /* unlock entire db */
 int tdb_allrecord_unlock(struct tdb_context *tdb, int ltype)
 {
-       tdb_off_t hash_size;
-
        /* FIXME: There are no locks on read-only dbs */
        if (tdb->read_only) {
                tdb->ecode = TDB_ERR_LOCK;
@@ -579,11 +563,15 @@ int tdb_allrecord_unlock(struct tdb_context *tdb, int ltype)
 
        tdb->allrecord_lock.count = 0;
        tdb->allrecord_lock.ltype = 0;
-       tdb->header_uptodate = false;
 
-       hash_size = (1ULL << tdb->header.v.hash_bits);
+       return tdb_brunlock(tdb, ltype, TDB_HASH_LOCK_START,
+                           TDB_HASH_LOCK_RANGE);
+}
 
-       return tdb_brunlock(tdb, ltype, TDB_HASH_LOCK_START, hash_size);
+bool tdb_has_expansion_lock(struct tdb_context *tdb)
+{
+       return find_nestlock(tdb, TDB_EXPANSION_LOCK) != NULL
+               || (tdb->flags & TDB_NOLOCK);
 }
 
 bool tdb_has_locks(struct tdb_context *tdb)
@@ -637,18 +625,19 @@ int tdb_unlockall_read(struct tdb_context *tdb)
 }
 #endif
 
-/* Returns the list we actually locked. */
-tdb_off_t tdb_lock_list(struct tdb_context *tdb, uint64_t hash,
-                       int ltype, enum tdb_lock_flags waitflag)
+int tdb_lock_hashes(struct tdb_context *tdb,
+                   tdb_off_t hash_lock,
+                   tdb_len_t hash_range,
+                   int ltype, enum tdb_lock_flags waitflag)
 {
-       tdb_off_t list = hash & ((1ULL << tdb->header.v.hash_bits) - 1);
-       /* Header can change ONLY if we had no locks before. */
-       bool can_change = tdb->num_lockrecs == 0;
+       /* FIXME: Do this properly, using hlock_range */
+       unsigned lock = TDB_HASH_LOCK_START
+               + (hash_lock >> (64 - TDB_HASH_LOCK_RANGE_BITS));
 
        /* a allrecord lock allows us to avoid per chain locks */
        if (tdb->allrecord_lock.count &&
            (ltype == tdb->allrecord_lock.ltype || ltype == F_RDLCK)) {
-               return list;
+               return 0;
        }
 
        if (tdb->allrecord_lock.count) {
@@ -657,27 +646,18 @@ tdb_off_t tdb_lock_list(struct tdb_context *tdb, uint64_t hash,
                         "tdb_lock_list: have %s allrecordlock\n",
                         tdb->allrecord_lock.ltype == F_RDLCK
                         ? "read" : "write");
-               return TDB_OFF_ERR;
+               return -1;
        }
 
-again:
-       if (tdb_nest_lock(tdb, TDB_HASH_LOCK_START + list, ltype, waitflag))
-               return TDB_OFF_ERR;
-
-       if (can_change && unlikely(header_changed(tdb))) {
-               tdb_off_t new = hash & ((1ULL << tdb->header.v.hash_bits) - 1);
-               if (new != list) {
-                       tdb_nest_unlock(tdb, TDB_HASH_LOCK_START+list, ltype);
-                       list = new;
-                       goto again;
-               }
-       }
-       return list;
+       return tdb_nest_lock(tdb, lock, ltype, waitflag);
 }
 
-int tdb_unlock_list(struct tdb_context *tdb, tdb_off_t list, int ltype)
+int tdb_unlock_hashes(struct tdb_context *tdb,
+                     tdb_off_t hash_lock,
+                     tdb_len_t hash_range, int ltype)
 {
-       list &= ((1ULL << tdb->header.v.hash_bits) - 1);
+       unsigned lock = TDB_HASH_LOCK_START
+               + (hash_lock >> (64 - TDB_HASH_LOCK_RANGE_BITS));
 
        /* a allrecord lock allows us to avoid per chain locks */
        if (tdb->allrecord_lock.count) {
@@ -689,9 +669,9 @@ int tdb_unlock_list(struct tdb_context *tdb, tdb_off_t list, int ltype)
                        return -1;
                }
                return 0;
-       } else {
-               return tdb_nest_unlock(tdb, TDB_HASH_LOCK_START + list, ltype);
        }
+
+       return tdb_nest_unlock(tdb, lock, ltype);
 }
 
 /* Hash locks use TDB_HASH_LOCK_START + the next 30 bits.
@@ -701,7 +681,7 @@ int tdb_unlock_list(struct tdb_context *tdb, tdb_off_t list, int ltype)
  */
 static tdb_off_t free_lock_off(tdb_off_t b_off)
 {
-       return TDB_HASH_LOCK_START + (1 << 30) + b_off / sizeof(tdb_off_t);
+       return TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE + b_off / sizeof(tdb_off_t);
 }
 
 int tdb_lock_free_bucket(struct tdb_context *tdb, tdb_off_t b_off,
@@ -747,7 +727,7 @@ static int chainlock(struct tdb_context *tdb, const TDB_DATA *key,
        int ret;
        uint64_t h = tdb_hash(tdb, key->dptr, key->dsize);
 
-       ret = tdb_lock_list(tdb, h, ltype, waitflag) == TDB_OFF_ERR ? -1 : 0;
+       ret = tdb_lock_hashes(tdb, h, 1, ltype, waitflag);
        tdb_trace_1rec(tdb, func, *key);
        return ret;
 }
@@ -763,7 +743,7 @@ int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
 {
        uint64_t h = tdb_hash(tdb, key.dptr, key.dsize);
        tdb_trace_1rec(tdb, "tdb_chainunlock", key);
-       return tdb_unlock_list(tdb, h, F_WRLCK);
+       return tdb_unlock_hashes(tdb, h, 1, F_WRLCK);
 }
 
 #if 0
index 2074e3155a0046ed737cb92cadb80e7a00d6148e..a241210bebc0a559a087881f11d1d413c364656d 100644 (file)
@@ -83,13 +83,36 @@ typedef uint64_t tdb_off_t;
 /* Hash chain locks. */
 #define TDB_HASH_LOCK_START 3
 
-/* We start wih 256 hash buckets, and a 64k-sized zone. */
-#define INITIAL_HASH_BITS 8
+/* Range for hash locks. */
+#define TDB_HASH_LOCK_RANGE_BITS 30
+#define TDB_HASH_LOCK_RANGE (1 << TDB_HASH_LOCK_RANGE_BITS)
+
+/* We have 1024 entries in the top level. */
+#define TDB_TOPLEVEL_HASH_BITS 10
+/* And 64 entries in each sub-level: thus 64 bits exactly after 9 levels. */
+#define TDB_SUBLEVEL_HASH_BITS 6
+/* And 8 entries in each group, ie 8 groups per sublevel. */
+#define TDB_HASH_GROUP_BITS 3
+
+/* We start with a 64k-sized zone. */
 #define INITIAL_ZONE_BITS 16
-
 /* Try to create zones at least 32 times larger than allocations. */
 #define TDB_COMFORT_FACTOR_BITS 5
 
+/* We steal bits from the offsets to store hash info. */
+#define TDB_OFF_HASH_GROUP_MASK ((1ULL << TDB_HASH_GROUP_BITS) - 1)
+/* We steal this many upper bits, giving a maximum offset of 64 exabytes. */
+#define TDB_OFF_UPPER_STEAL 8
+#define   TDB_OFF_UPPER_STEAL_EXTRA 7
+#define   TDB_OFF_UPPER_STEAL_TRUNCBIT 1
+/* If this is set, hash is truncated (only 1 bit is valid). */
+#define TDB_OFF_HASH_TRUNCATED_BIT 56
+/* The bit number where we store next level of hash. */
+#define TDB_OFF_HASH_EXTRA_BIT 57
+/* Convenience mask to get actual offset. */
+#define TDB_OFF_MASK \
+       (((1ULL << (64 - TDB_OFF_UPPER_STEAL)) - 1) - TDB_OFF_HASH_GROUP_MASK)
+
 /* We ensure buckets up to size 1 << (zone_bits - TDB_COMFORT_FACTOR_BITS). */
 /* FIXME: test this matches size_to_bucket! */
 #define BUCKETS_FOR_ZONE(zone_bits) ((zone_bits) + 2 - TDB_COMFORT_FACTOR_BITS)
@@ -173,34 +196,61 @@ static inline uint64_t frec_magic(const struct tdb_free_record *f)
        return f->magic_and_meta & ~((1ULL << 6) - 1);
 }
 
-/* These parts can change while we have db open. */
-struct tdb_header_volatile {
-       uint64_t generation; /* Makes sure it changes on every update. */
-       uint64_t hash_bits; /* Entries in hash table. */
-       uint64_t hash_off; /* Offset of hash table. */
+/* Each zone has its set of free lists at the head.
+ *
+ * For each zone we have a series of per-size buckets, and a final bucket for
+ * "too big". */
+struct free_zone_header {
+       /* How much does this zone cover? */
+       uint64_t zone_bits;
+       /* tdb_off_t buckets[free_buckets + 1] */
 };
 
 /* this is stored at the front of every database */
 struct tdb_header {
-       char magic_food[32]; /* for /etc/magic */
+       char magic_food[64]; /* for /etc/magic */
        /* FIXME: Make me 32 bit? */
        uint64_t version; /* version of the code */
        uint64_t hash_test; /* result of hashing HASH_MAGIC. */
        uint64_t hash_seed; /* "random" seed written at creation time. */
 
-       struct tdb_header_volatile v;
+       tdb_off_t reserved[28];
 
-       tdb_off_t reserved[19];
+       /* Top level hash table. */
+       tdb_off_t hashtable[1ULL << TDB_TOPLEVEL_HASH_BITS];
 };
 
-/* Each zone has its set of free lists at the head.
- *
- * For each zone we have a series of per-size buckets, and a final bucket for
- * "too big". */
-struct free_zone_header {
-       /* How much does this zone cover? */
-       uint64_t zone_bits;
-       /* tdb_off_t buckets[free_buckets + 1] */
+/* Information about a particular (locked) hash entry. */
+struct hash_info {
+       /* Full hash value of entry. */
+       uint64_t h;
+       /* Start and length of lock acquired. */
+       tdb_off_t hlock_start;
+       tdb_len_t hlock_range;
+       /* Start of hash group. */
+       tdb_off_t group_start;
+       /* Bucket we belong in. */
+       unsigned int home_bucket;
+       /* Bucket we (or an empty space) were found in. */
+       unsigned int found_bucket;
+       /* How many bits of the hash are already used. */
+       unsigned int hash_used;
+       /* Current working group. */
+       tdb_off_t group[1 << TDB_HASH_GROUP_BITS];
+};
+
+struct traverse_info {
+       struct traverse_level {
+               tdb_off_t hashtable;
+               const tdb_off_t *entries;
+               /* We ignore groups here, and treat it as a big array. */
+               unsigned entry;
+               unsigned int total_buckets;
+       } levels[64 / TDB_SUBLEVEL_HASH_BITS];
+       unsigned int num_levels;
+       unsigned int toplevel_group;
+       /* This makes delete-everything-inside-traverse work as expected. */
+       tdb_off_t prev;
 };
 
 enum tdb_lock_flags {
@@ -224,6 +274,9 @@ struct tdb_context {
        /* Mmap (if any), or malloc (for TDB_INTERNAL). */
        void *map_ptr;
 
+       /* Are we accessing directly? (debugging check). */
+       int direct_access;
+
         /* Open file descriptor (undefined for TDB_INTERNAL). */
        int fd;
 
@@ -236,11 +289,6 @@ struct tdb_context {
        /* Error code for last tdb error. */
        enum TDB_ERROR ecode; 
 
-       /* A cached copy of the header */
-       struct tdb_header header; 
-       /* (for debugging). */
-       bool header_uptodate; 
-
        /* the flags passed to tdb_open, for tdb_reopen. */
        uint32_t flags;
 
@@ -251,6 +299,7 @@ struct tdb_context {
        /* Hash function. */
        tdb_hashfn_t khash;
        void *hash_priv;
+       uint64_t hash_seed;
 
        /* Set if we are in a transaction. */
        struct tdb_transaction *transaction;
@@ -284,19 +333,33 @@ struct tdb_methods {
 /*
   internal prototypes
 */
-/* tdb.c: */
-/* Returns true if header changed (and updates it). */
-bool header_changed(struct tdb_context *tdb);
-
-/* Commit header to disk. */
-int write_header(struct tdb_context *tdb);
+/* hash.c: */
+void tdb_hash_init(struct tdb_context *tdb);
 
 /* Hash random memory. */
 uint64_t tdb_hash(struct tdb_context *tdb, const void *ptr, size_t len);
 
-/* offset of hash table entry for this list/hash value */
-tdb_off_t hash_off(struct tdb_context *tdb, uint64_t list);
+/* Hash on disk. */
+uint64_t hash_record(struct tdb_context *tdb, tdb_off_t off);
+
+/* Find and lock a hash entry (or where it would be). */
+tdb_off_t find_and_lock(struct tdb_context *tdb,
+                       struct tdb_data key,
+                       int ltype,
+                       struct hash_info *h,
+                       struct tdb_used_record *rec);
 
+int replace_in_hash(struct tdb_context *tdb,
+                   struct hash_info *h,
+                   tdb_off_t new_off);
+
+int add_to_hash(struct tdb_context *tdb, struct hash_info *h,
+               tdb_off_t new_off);
+
+int delete_from_hash(struct tdb_context *tdb, struct hash_info *h);
+
+/* For tdb_check */
+bool is_subhash(tdb_off_t val);
 
 /* free.c: */
 int tdb_zone_init(struct tdb_context *tdb);
@@ -338,7 +401,13 @@ void *tdb_get(struct tdb_context *tdb, tdb_off_t off, void *pad, size_t len);
 /* Either alloc a copy, or give direct access.  Release frees or noop. */
 const void *tdb_access_read(struct tdb_context *tdb,
                            tdb_off_t off, tdb_len_t len, bool convert);
+void *tdb_access_write(struct tdb_context *tdb,
+                      tdb_off_t off, tdb_len_t len, bool convert);
+
+/* Release result of tdb_access_read/write. */
 void tdb_access_release(struct tdb_context *tdb, const void *p);
+/* Commit result of tdb_acces_write. */
+int tdb_access_commit(struct tdb_context *tdb, void *p);
 
 /* Convenience routine to get an offset. */
 tdb_off_t tdb_read_off(struct tdb_context *tdb, tdb_off_t off);
@@ -373,16 +442,17 @@ int tdb_write_convert(struct tdb_context *tdb, tdb_off_t off,
 int tdb_read_convert(struct tdb_context *tdb, tdb_off_t off,
                     void *rec, size_t len);
 
-/* Hash on disk. */
-uint64_t hash_record(struct tdb_context *tdb, tdb_off_t off);
 
 /* lock.c: */
 void tdb_lock_init(struct tdb_context *tdb);
 
-/* Lock/unlock a particular hash list. */
-tdb_off_t tdb_lock_list(struct tdb_context *tdb, uint64_t hash,
-                       int ltype, enum tdb_lock_flags waitflag);
-int tdb_unlock_list(struct tdb_context *tdb, tdb_off_t list, int ltype);
+/* Lock/unlock a range of hashes. */
+int tdb_lock_hashes(struct tdb_context *tdb,
+                   tdb_off_t hash_lock, tdb_len_t hash_range,
+                   int ltype, enum tdb_lock_flags waitflag);
+int tdb_unlock_hashes(struct tdb_context *tdb,
+                     tdb_off_t hash_lock,
+                     tdb_len_t hash_range, int ltype);
 
 /* Lock/unlock a particular free bucket. */
 int tdb_lock_free_bucket(struct tdb_context *tdb, tdb_off_t b_off,
@@ -404,6 +474,16 @@ void tdb_unlock_open(struct tdb_context *tdb);
 /* Serialize db expand. */
 int tdb_lock_expand(struct tdb_context *tdb, int ltype);
 void tdb_unlock_expand(struct tdb_context *tdb, int ltype);
+bool tdb_has_expansion_lock(struct tdb_context *tdb);
+
+
+/* traverse.c: */
+int first_in_hash(struct tdb_context *tdb, int ltype,
+                 struct traverse_info *tinfo,
+                 TDB_DATA *kbuf, size_t *dlen);
+int next_in_hash(struct tdb_context *tdb, int ltype,
+                struct traverse_info *tinfo,
+                TDB_DATA *kbuf, size_t *dlen);
 
 
 #if 0
index 40811c2eff83609ca2c55d757815e2a0cae9f9d1..1a229dc2b94c1538e5093a8df55d74b8dd8d74f8 100644 (file)
@@ -1,6 +1,5 @@
 #include "private.h"
 #include <ccan/tdb2/tdb2.h>
-#include <ccan/hash/hash.h>
 #include <ccan/build_assert/build_assert.h>
 #include <ccan/likely/likely.h>
 #include <assert.h>
@@ -18,51 +17,6 @@ null_log_fn(struct tdb_context *tdb,
 {
 }
 
-/* We do a lot of work assuming our copy of the header volatile area
- * is uptodate, and usually it is.  However, once we grab a lock, we have to
- * re-check it. */
-bool header_changed(struct tdb_context *tdb)
-{
-       uint64_t gen;
-
-       if (!(tdb->flags & TDB_NOLOCK) && tdb->header_uptodate) {
-               tdb->log(tdb, TDB_DEBUG_WARNING, tdb->log_priv,
-                        "warning: header uptodate already\n");
-       }
-
-       /* We could get a partial update if we're not holding any locks. */
-       assert((tdb->flags & TDB_NOLOCK) || tdb_has_locks(tdb));
-
-       tdb->header_uptodate = true;
-       gen = tdb_read_off(tdb, offsetof(struct tdb_header, v.generation));
-       if (unlikely(gen != tdb->header.v.generation)) {
-               tdb_read_convert(tdb, offsetof(struct tdb_header, v),
-                                &tdb->header.v, sizeof(tdb->header.v));
-               return true;
-       }
-       return false;
-}
-
-int write_header(struct tdb_context *tdb)
-{
-       assert(tdb_read_off(tdb, offsetof(struct tdb_header, v.generation))
-              == tdb->header.v.generation);
-       tdb->header.v.generation++;
-       return tdb_write_convert(tdb, offsetof(struct tdb_header, v),
-                                &tdb->header.v, sizeof(tdb->header.v));
-}
-
-static uint64_t jenkins_hash(const void *key, size_t length, uint64_t seed,
-                            void *arg)
-{
-       return hash64_stable((const unsigned char *)key, length, seed);
-}
-
-uint64_t tdb_hash(struct tdb_context *tdb, const void *ptr, size_t len)
-{
-       return tdb->khash(ptr, len, tdb->header.hash_seed, tdb->hash_priv);
-}
-
 static bool tdb_already_open(dev_t device, ino_t ino)
 {
        struct tdb_context *i;
@@ -124,66 +78,57 @@ static uint64_t random_number(struct tdb_context *tdb)
        return ret;
 }
 
-struct new_db_head {
+struct new_database {
        struct tdb_header hdr;
+       /* Initial free zone. */
        struct free_zone_header zhdr;
        tdb_off_t free[BUCKETS_FOR_ZONE(INITIAL_ZONE_BITS) + 1];
-       struct tdb_used_record hrec;
-       tdb_off_t hash[1ULL << INITIAL_HASH_BITS];
        struct tdb_free_record frec;
-};
-
-struct new_database {
-       struct new_db_head h;
        /* Rest up to 1 << INITIAL_ZONE_BITS is empty. */
        char space[(1 << INITIAL_ZONE_BITS)
-                  - (sizeof(struct new_db_head) - sizeof(struct tdb_header))];
+                  - sizeof(struct free_zone_header)
+                  - sizeof(tdb_off_t) * (BUCKETS_FOR_ZONE(INITIAL_ZONE_BITS)+1)
+                  - sizeof(struct tdb_free_record)];
        uint8_t tailer;
        /* Don't count final padding! */
 };
 
 /* initialise a new database */
-static int tdb_new_database(struct tdb_context *tdb)
+static int tdb_new_database(struct tdb_context *tdb, struct tdb_header *hdr)
 {
        /* We make it up in memory, then write it out if not internal */
        struct new_database newdb;
-       unsigned int bucket, magic_off, dbsize;
+       unsigned int bucket, magic_len, dbsize;
 
        /* Don't want any extra padding! */
        dbsize = offsetof(struct new_database, tailer) + sizeof(newdb.tailer);
 
        /* Fill in the header */
-       newdb.h.hdr.version = TDB_VERSION;
-       newdb.h.hdr.hash_seed = random_number(tdb);
-       newdb.h.hdr.hash_test = TDB_HASH_MAGIC;
-       newdb.h.hdr.hash_test = tdb->khash(&newdb.h.hdr.hash_test,
-                                          sizeof(newdb.h.hdr.hash_test),
-                                          newdb.h.hdr.hash_seed,
-                                          tdb->hash_priv);
-       memset(newdb.h.hdr.reserved, 0, sizeof(newdb.h.hdr.reserved));
-       newdb.h.hdr.v.generation = 0;
+       newdb.hdr.version = TDB_VERSION;
+       newdb.hdr.hash_seed = random_number(tdb);
+       newdb.hdr.hash_test = TDB_HASH_MAGIC;
+       newdb.hdr.hash_test = tdb->khash(&newdb.hdr.hash_test,
+                                        sizeof(newdb.hdr.hash_test),
+                                        newdb.hdr.hash_seed,
+                                        tdb->hash_priv);
+       memset(newdb.hdr.reserved, 0, sizeof(newdb.hdr.reserved));
        /* Initial hashes are empty. */
-       newdb.h.hdr.v.hash_bits = INITIAL_HASH_BITS;
-       newdb.h.hdr.v.hash_off = offsetof(struct new_database, h.hash);
-       set_header(tdb, &newdb.h.hrec, 0,
-                  sizeof(newdb.h.hash), sizeof(newdb.h.hash), 0,
-                  INITIAL_ZONE_BITS);
-       memset(newdb.h.hash, 0, sizeof(newdb.h.hash));
+       memset(newdb.hdr.hashtable, 0, sizeof(newdb.hdr.hashtable));
+
+       /* Free is mostly empty... */
+       newdb.zhdr.zone_bits = INITIAL_ZONE_BITS;
+       memset(newdb.free, 0, sizeof(newdb.free));
 
        /* Create the single free entry. */
-       newdb.h.frec.magic_and_meta = TDB_FREE_MAGIC | INITIAL_ZONE_BITS;
-       newdb.h.frec.data_len = (sizeof(newdb.h.frec)
+       newdb.frec.magic_and_meta = TDB_FREE_MAGIC | INITIAL_ZONE_BITS;
+       newdb.frec.data_len = (sizeof(newdb.frec)
                                 - sizeof(struct tdb_used_record)
                                 + sizeof(newdb.space));
 
-       /* Free is mostly empty... */
-       newdb.h.zhdr.zone_bits = INITIAL_ZONE_BITS;
-       memset(newdb.h.free, 0, sizeof(newdb.h.free));
-
-       /* ... except for this one bucket. */
-       bucket = size_to_bucket(INITIAL_ZONE_BITS, newdb.h.frec.data_len);
-       newdb.h.free[bucket] = offsetof(struct new_database, h.frec);
-       newdb.h.frec.next = newdb.h.frec.prev = 0;
+       /* Add it to the correct bucket. */
+       bucket = size_to_bucket(INITIAL_ZONE_BITS, newdb.frec.data_len);
+       newdb.free[bucket] = offsetof(struct new_database, frec);
+       newdb.frec.next = newdb.frec.prev = 0;
 
        /* Clear free space to keep valgrind happy, and avoid leaking stack. */
        memset(newdb.space, 0, sizeof(newdb.space));
@@ -192,16 +137,16 @@ static int tdb_new_database(struct tdb_context *tdb)
        newdb.tailer = INITIAL_ZONE_BITS;
 
        /* Magic food */
-       memset(newdb.h.hdr.magic_food, 0, sizeof(newdb.h.hdr.magic_food));
-       strcpy(newdb.h.hdr.magic_food, TDB_MAGIC_FOOD);
+       memset(newdb.hdr.magic_food, 0, sizeof(newdb.hdr.magic_food));
+       strcpy(newdb.hdr.magic_food, TDB_MAGIC_FOOD);
 
        /* This creates an endian-converted database, as if read from disk */
-       magic_off = offsetof(struct tdb_header, magic_food);
+       magic_len = sizeof(newdb.hdr.magic_food);
        tdb_convert(tdb,
-                   (char *)&newdb.h.hdr + magic_off,
-                   dbsize - 1 - magic_off);
+                   (char *)&newdb.hdr + magic_len,
+                   offsetof(struct new_database, space) - magic_len);
 
-       tdb->header = newdb.h.hdr;
+       *hdr = newdb.hdr;
 
        if (tdb->flags & TDB_INTERNAL) {
                tdb->map_size = dbsize;
@@ -235,6 +180,7 @@ struct tdb_context *tdb_open(const char *name, int tdb_flags,
        int save_errno;
        uint64_t hash_test;
        unsigned v;
+       struct tdb_header hdr;
 
        tdb = malloc(sizeof(*tdb));
        if (!tdb) {
@@ -244,17 +190,15 @@ struct tdb_context *tdb_open(const char *name, int tdb_flags,
        }
        tdb->name = NULL;
        tdb->map_ptr = NULL;
+       tdb->direct_access = 0;
        tdb->fd = -1;
        tdb->map_size = sizeof(struct tdb_header);
        tdb->ecode = TDB_SUCCESS;
-       /* header will be read in below. */
-       tdb->header_uptodate = false;
        tdb->flags = tdb_flags;
        tdb->log = null_log_fn;
        tdb->log_priv = NULL;
-       tdb->khash = jenkins_hash;
-       tdb->hash_priv = NULL;
        tdb->transaction = NULL;
+       tdb_hash_init(tdb);
        /* last_zone will be set below. */
        tdb_io_init(tdb);
        tdb_lock_init(tdb);
@@ -296,13 +240,13 @@ struct tdb_context *tdb_open(const char *name, int tdb_flags,
        /* internal databases don't need any of the rest. */
        if (tdb->flags & TDB_INTERNAL) {
                tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
-               if (tdb_new_database(tdb) != 0) {
+               if (tdb_new_database(tdb, &hdr) != 0) {
                        tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
                                 "tdb_open: tdb_new_database failed!");
                        goto fail;
                }
-               TEST_IT(tdb->flags & TDB_CONVERT);
-               tdb_convert(tdb, &tdb->header, sizeof(tdb->header));
+               tdb_convert(tdb, &hdr.hash_seed, sizeof(hdr.hash_seed));
+               tdb->hash_seed = hdr.hash_seed;
                tdb_zone_init(tdb);
                return tdb;
        }
@@ -326,32 +270,32 @@ struct tdb_context *tdb_open(const char *name, int tdb_flags,
                goto fail;      /* errno set by tdb_brlock */
        }
 
-       if (!tdb_pread_all(tdb->fd, &tdb->header, sizeof(tdb->header), 0)
-           || strcmp(tdb->header.magic_food, TDB_MAGIC_FOOD) != 0) {
-               if (!(open_flags & O_CREAT) || tdb_new_database(tdb) == -1) {
+       if (!tdb_pread_all(tdb->fd, &hdr, sizeof(hdr), 0)
+           || strcmp(hdr.magic_food, TDB_MAGIC_FOOD) != 0) {
+               if (!(open_flags & O_CREAT) || tdb_new_database(tdb, &hdr) == -1) {
                        if (errno == 0) {
                                errno = EIO; /* ie bad format or something */
                        }
                        goto fail;
                }
-       } else if (tdb->header.version != TDB_VERSION) {
-               if (tdb->header.version == bswap_64(TDB_VERSION))
+       } else if (hdr.version != TDB_VERSION) {
+               if (hdr.version == bswap_64(TDB_VERSION))
                        tdb->flags |= TDB_CONVERT;
                else {
                        /* wrong version */
                        tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
                                 "tdb_open: %s is unknown version 0x%llx\n",
-                                name, (long long)tdb->header.version);
+                                name, (long long)hdr.version);
                        errno = EIO;
                        goto fail;
                }
        }
 
-       tdb_convert(tdb, &tdb->header, sizeof(tdb->header));
+       tdb_convert(tdb, &hdr, sizeof(hdr));
+       tdb->hash_seed = hdr.hash_seed;
        hash_test = TDB_HASH_MAGIC;
-       hash_test = tdb->khash(&hash_test, sizeof(hash_test),
-                              tdb->header.hash_seed, tdb->hash_priv);
-       if (tdb->header.hash_test != hash_test) {
+       hash_test = tdb_hash(tdb, &hash_test, sizeof(hash_test));
+       if (hdr.hash_test != hash_test) {
                /* wrong hash variant */
                tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
                         "tdb_open: %s uses a different hash function\n",
@@ -420,145 +364,6 @@ struct tdb_context *tdb_open(const char *name, int tdb_flags,
        return NULL;
 }
 
-tdb_off_t hash_off(struct tdb_context *tdb, uint64_t list)
-{
-       return tdb->header.v.hash_off
-               + ((list & ((1ULL << tdb->header.v.hash_bits) - 1))
-                  * sizeof(tdb_off_t));
-}
-
-/* Returns 0 if the entry is a zero (definitely not a match).
- * Returns a valid entry offset if it's a match.  Fills in rec.
- * Otherwise returns TDB_OFF_ERR: keep searching. */
-static tdb_off_t entry_matches(struct tdb_context *tdb,
-                              uint64_t list,
-                              uint64_t hash,
-                              const struct tdb_data *key,
-                              struct tdb_used_record *rec)
-{
-       tdb_off_t off;
-       uint64_t keylen;
-       const unsigned char *rkey;
-
-       list &= ((1ULL << tdb->header.v.hash_bits) - 1);
-
-       off = tdb_read_off(tdb, tdb->header.v.hash_off
-                          + list * sizeof(tdb_off_t));
-       if (off == 0 || off == TDB_OFF_ERR)
-               return off;
-
-#if 0 /* FIXME: Check other bits. */
-       unsigned int bits, bitmask, hoffextra;
-       /* Bottom three bits show how many extra hash bits. */
-       bits = (off & ((1 << TDB_EXTRA_HASHBITS_NUM) - 1)) + 1;
-       bitmask = (1 << bits)-1;
-       hoffextra = ((off >> TDB_EXTRA_HASHBITS_NUM) & bitmask);
-       uint64_t hextra = hash >> tdb->header.v.hash_bits;
-       if ((hextra & bitmask) != hoffextra) 
-               return TDB_OFF_ERR;
-       off &= ~...;
-#endif
-
-       if (tdb_read_convert(tdb, off, rec, sizeof(*rec)) == -1)
-               return TDB_OFF_ERR;
-
-       /* FIXME: check extra bits in header! */
-       keylen = rec_key_length(rec);
-       if (keylen != key->dsize)
-               return TDB_OFF_ERR;
-
-       rkey = tdb_access_read(tdb, off + sizeof(*rec), keylen, false);
-       if (!rkey)
-               return TDB_OFF_ERR;
-       if (memcmp(rkey, key->dptr, keylen) != 0)
-               off = TDB_OFF_ERR;
-       tdb_access_release(tdb, rkey);
-       return off;
-}
-
-/* FIXME: Optimize? */
-static void unlock_lists(struct tdb_context *tdb,
-                        tdb_off_t list, tdb_len_t num,
-                        int ltype)
-{
-       tdb_off_t i;
-
-       for (i = list; i < list + num; i++)
-               tdb_unlock_list(tdb, i, ltype);
-}
-
-/* FIXME: Optimize? */
-static int lock_lists(struct tdb_context *tdb,
-                     tdb_off_t list, tdb_len_t num,
-                     int ltype)
-{
-       tdb_off_t i;
-
-       for (i = list; i < list + num; i++) {
-               if (tdb_lock_list(tdb, i, ltype, TDB_LOCK_WAIT)
-                   == TDB_OFF_ERR) {
-                       unlock_lists(tdb, list, i - list, ltype);
-                       return -1;
-               }
-       }
-       return 0;
-}
-
-/* We lock hashes up to the next empty offset.  We already hold the
- * lock on the start bucket, but we may need to release and re-grab
- * it.  If we fail, we hold no locks at all! */
-static tdb_len_t relock_hash_to_zero(struct tdb_context *tdb,
-                                    tdb_off_t start, int ltype)
-{
-       tdb_len_t num, len;
-
-again:
-       num = 1ULL << tdb->header.v.hash_bits;
-       len = tdb_find_zero_off(tdb, hash_off(tdb, start), num - start);
-       if (unlikely(len == num - start)) {
-               /* We hit the end of the hash range.  Drop lock: we have
-                  to lock start of hash first. */
-               tdb_len_t pre_locks;
-
-               tdb_unlock_list(tdb, start, ltype);
-
-               /* Grab something, so header is stable. */
-               if (tdb_lock_list(tdb, 0, ltype, TDB_LOCK_WAIT))
-                       return TDB_OFF_ERR;
-               pre_locks = tdb_find_zero_off(tdb, hash_off(tdb, 0), num);
-               /* We want to lock the zero entry as well. */
-               pre_locks++;
-               if (lock_lists(tdb, 1, pre_locks - 1, ltype) == -1) {
-                       tdb_unlock_list(tdb, 0, ltype);
-                       return TDB_OFF_ERR;
-               }
-
-               /* Now lock later ones. */
-               if (unlikely(lock_lists(tdb, start, len, ltype) == -1)) {
-                       unlock_lists(tdb, 0, pre_locks, ltype);
-                       return TDB_OFF_ERR;
-               }
-               len += pre_locks;
-       } else {
-               /* We want to lock the zero entry as well. */
-               len++;
-               /* But we already have lock on start. */
-               if (unlikely(lock_lists(tdb, start+1, len-1, ltype) == -1)) {
-                       tdb_unlock_list(tdb, start, ltype);
-                       return TDB_OFF_ERR;
-               }
-       }
-
-       /* Now, did we lose the race, and it's not zero any more? */
-       if (unlikely(tdb_read_off(tdb, hash_off(tdb, start + len - 1)) != 0)) {
-               /* Leave the start locked, as expected. */
-               unlock_lists(tdb, start + 1, len - 1, ltype);
-               goto again;
-       }
-
-       return len;
-}
-
 /* FIXME: modify, don't rewrite! */
 static int update_rec_hdr(struct tdb_context *tdb,
                          tdb_off_t off,
@@ -576,186 +381,10 @@ static int update_rec_hdr(struct tdb_context *tdb,
        return tdb_write_convert(tdb, off, rec, sizeof(*rec));
 }
 
-static int hash_add(struct tdb_context *tdb,
-                   uint64_t hash, tdb_off_t off)
-{
-       tdb_off_t i, hoff, len, num;
-
-       /* Look for next space. */
-       i = (hash & ((1ULL << tdb->header.v.hash_bits) - 1));
-       len = (1ULL << tdb->header.v.hash_bits) - i;
-       num = tdb_find_zero_off(tdb, hash_off(tdb, i), len);
-
-       if (unlikely(num == len)) {
-               /* We wrapped.  Look through start of hash table. */
-               i = 0;
-               hoff = hash_off(tdb, 0);
-               len = (1ULL << tdb->header.v.hash_bits);
-               num = tdb_find_zero_off(tdb, hoff, len);
-               if (num == len) {
-                       tdb->ecode = TDB_ERR_CORRUPT;
-                       tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
-                                "hash_add: full hash table!\n");
-                       return -1;
-               }
-       }
-       if (tdb_read_off(tdb, hash_off(tdb, i + num)) != 0) {
-               tdb->ecode = TDB_ERR_CORRUPT;
-               tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
-                        "hash_add: overwriting hash table?\n");
-               return -1;
-       }
-
-       /* FIXME: Encode extra hash bits! */
-       return tdb_write_off(tdb, hash_off(tdb, i + num), off);
-}
-
-/* If we fail, others will try after us. */
-static void enlarge_hash(struct tdb_context *tdb)
-{
-       tdb_off_t newoff, oldoff, i;
-       tdb_len_t hlen;
-       uint64_t num = 1ULL << tdb->header.v.hash_bits;
-       struct tdb_used_record pad, *r;
-       unsigned int records = 0;
-
-       /* FIXME: We should do this without holding locks throughout. */
-       if (tdb_allrecord_lock(tdb, F_WRLCK, TDB_LOCK_WAIT, false) == -1)
-               return;
-
-       /* Someone else enlarged for us?  Nothing to do. */
-       if ((1ULL << tdb->header.v.hash_bits) != num)
-               goto unlock;
-
-       /* Allocate our new array. */
-       hlen = num * sizeof(tdb_off_t) * 2;
-       newoff = alloc(tdb, 0, hlen, 0, false);
-       if (unlikely(newoff == TDB_OFF_ERR))
-               goto unlock;
-       /* Step over record header! */
-       newoff += sizeof(struct tdb_used_record);
-
-       /* Starts all zero. */
-       if (zero_out(tdb, newoff, hlen) == -1)
-               goto unlock;
-
-       /* Update header now so we can use normal routines. */
-       oldoff = tdb->header.v.hash_off;
-
-       tdb->header.v.hash_bits++;
-       tdb->header.v.hash_off = newoff;
-
-       /* FIXME: If the space before is empty, we know this is in its ideal
-        * location.  Or steal a bit from the pointer to avoid rehash. */
-       for (i = 0; i < num; i++) {
-               tdb_off_t off;
-               off = tdb_read_off(tdb, oldoff + i * sizeof(tdb_off_t));
-               if (unlikely(off == TDB_OFF_ERR))
-                       goto oldheader;
-               if (off && hash_add(tdb, hash_record(tdb, off), off) == -1)
-                       goto oldheader;
-               if (off)
-                       records++;
-       }
-
-       tdb->log(tdb, TDB_DEBUG_TRACE, tdb->log_priv,
-                "enlarge_hash: moved %u records from %llu buckets.\n",
-                records, (long long)num);
-
-       /* Free up old hash. */
-       r = tdb_get(tdb, oldoff - sizeof(*r), &pad, sizeof(*r));
-       if (!r)
-               goto oldheader;
-       add_free_record(tdb, rec_zone_bits(r), oldoff - sizeof(*r),
-                       sizeof(*r)+rec_data_length(r)+rec_extra_padding(r));
-
-       /* Now we write the modified header. */
-       write_header(tdb);
-unlock:
-       tdb_allrecord_unlock(tdb, F_WRLCK);
-       return;
-
-oldheader:
-       tdb->header.v.hash_bits--;
-       tdb->header.v.hash_off = oldoff;
-       goto unlock;
-}
-
-
-/* This is the slow version of the routine which searches the
- * hashtable for an entry.
- * We lock every hash bucket up to and including the next zero one.
- */
-static tdb_off_t find_and_lock_slow(struct tdb_context *tdb,
-                                   struct tdb_data key,
-                                   uint64_t h,
-                                   int ltype,
-                                   tdb_off_t *start_lock,
-                                   tdb_len_t *num_locks,
-                                   tdb_off_t *bucket,
-                                   struct tdb_used_record *rec)
-{
-       /* Warning: this may drop the lock on *bucket! */
-       *num_locks = relock_hash_to_zero(tdb, *start_lock, ltype);
-       if (*num_locks == TDB_OFF_ERR)
-               return TDB_OFF_ERR;
-
-       for (*bucket = *start_lock;
-            *bucket < *start_lock + *num_locks;
-            (*bucket)++) {
-               tdb_off_t off = entry_matches(tdb, *bucket, h, &key, rec);
-               /* Empty entry or we found it? */
-               if (off == 0 || off != TDB_OFF_ERR)
-                       return off;
-       }
-
-       /* We didn't find a zero entry?  Something went badly wrong... */
-       unlock_lists(tdb, *start_lock, *start_lock + *num_locks, ltype);
-       tdb->ecode = TDB_ERR_CORRUPT;
-       tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
-                "find_and_lock: expected to find an empty hash bucket!\n");
-       return TDB_OFF_ERR;
-}
-
-/* This is the core routine which searches the hashtable for an entry.
- * On error, no locks are held and TDB_OFF_ERR is returned.
- * Otherwise, *num_locks locks of type ltype from *start_lock are held.
- * The bucket where the entry is (or would be) is in *bucket.
- * If not found, the return value is 0.
- * If found, the return value is the offset, and *rec is the record. */
-static tdb_off_t find_and_lock(struct tdb_context *tdb,
-                              struct tdb_data key,
-                              uint64_t h,
-                              int ltype,
-                              tdb_off_t *start_lock,
-                              tdb_len_t *num_locks,
-                              tdb_off_t *bucket,
-                              struct tdb_used_record *rec)
-{
-       tdb_off_t off;
-
-       /* FIXME: can we avoid locks for some fast paths? */
-       *start_lock = tdb_lock_list(tdb, h, ltype, TDB_LOCK_WAIT);
-       if (*start_lock == TDB_OFF_ERR)
-               return TDB_OFF_ERR;
-
-       /* Fast path. */
-       off = entry_matches(tdb, *start_lock, h, &key, rec);
-       if (likely(off != TDB_OFF_ERR)) {
-               *bucket = *start_lock;
-               *num_locks = 1;
-               return off;
-       }
-
-       /* Slow path, need to grab more locks and search. */
-       return find_and_lock_slow(tdb, key, h, ltype, start_lock, num_locks,
-                                 bucket, rec);
-}
-
-/* Returns -1 on error, 0 on OK" */
+/* Returns -1 on error, 0 on OK */
 static int replace_data(struct tdb_context *tdb,
-                       uint64_t h, struct tdb_data key, struct tdb_data dbuf,
-                       tdb_off_t bucket,
+                       struct hash_info *h,
+                       struct tdb_data key, struct tdb_data dbuf,
                        tdb_off_t old_off, tdb_len_t old_room,
                        unsigned old_zone,
                        bool growing)
@@ -763,19 +392,21 @@ static int replace_data(struct tdb_context *tdb,
        tdb_off_t new_off;
 
        /* Allocate a new record. */
-       new_off = alloc(tdb, key.dsize, dbuf.dsize, h, growing);
+       new_off = alloc(tdb, key.dsize, dbuf.dsize, h->h, growing);
        if (unlikely(new_off == TDB_OFF_ERR))
                return -1;
 
        /* We didn't like the existing one: remove it. */
-       if (old_off)
+       if (old_off) {
                add_free_record(tdb, old_zone, old_off,
                                sizeof(struct tdb_used_record)
                                + key.dsize + old_room);
-
-       /* FIXME: Encode extra hash bits! */
-       if (tdb_write_off(tdb, hash_off(tdb, bucket), new_off) == -1)
-               return -1;
+               if (replace_in_hash(tdb, h, new_off) == -1)
+                       return -1;
+       } else {
+               if (add_to_hash(tdb, h, new_off) == -1)
+                       return -1;
+       }
 
        new_off += sizeof(struct tdb_used_record);
        if (tdb->methods->write(tdb, new_off, key.dptr, key.dsize) == -1)
@@ -792,14 +423,13 @@ static int replace_data(struct tdb_context *tdb,
 int tdb_store(struct tdb_context *tdb,
              struct tdb_data key, struct tdb_data dbuf, int flag)
 {
-       tdb_off_t off, bucket, start, num;
+       struct hash_info h;
+       tdb_off_t off;
        tdb_len_t old_room = 0;
        struct tdb_used_record rec;
-       uint64_t h;
        int ret;
 
-       h = tdb_hash(tdb, key.dptr, key.dsize);
-       off = find_and_lock(tdb, key, h, F_WRLCK, &start, &num, &bucket, &rec);
+       off = find_and_lock(tdb, key, F_WRLCK, &h, &rec);
        if (unlikely(off == TDB_OFF_ERR))
                return -1;
 
@@ -817,13 +447,14 @@ int tdb_store(struct tdb_context *tdb,
                                /* Can modify in-place.  Easy! */
                                if (update_rec_hdr(tdb, off,
                                                   key.dsize, dbuf.dsize,
-                                                  &rec, h))
+                                                  &rec, h.h))
                                        goto fail;
                                if (tdb->methods->write(tdb, off + sizeof(rec)
                                                        + key.dsize,
                                                        dbuf.dptr, dbuf.dsize))
                                        goto fail;
-                               unlock_lists(tdb, start, num, F_WRLCK);
+                               tdb_unlock_hashes(tdb, h.hlock_start,
+                                                 h.hlock_range, F_WRLCK);
                                return 0;
                        }
                        /* FIXME: See if right record is free? */
@@ -839,35 +470,28 @@ int tdb_store(struct tdb_context *tdb,
        }
 
        /* If we didn't use the old record, this implies we're growing. */
-       ret = replace_data(tdb, h, key, dbuf, bucket, off, old_room,
+       ret = replace_data(tdb, &h, key, dbuf, off, old_room,
                           rec_zone_bits(&rec), off != 0);
-       unlock_lists(tdb, start, num, F_WRLCK);
-
-       /* FIXME: by simple simulation, this approximated 60% full.
-        * Check in real case! */
-       if (unlikely(num > 4 * tdb->header.v.hash_bits - 30))
-               enlarge_hash(tdb);
-
+       tdb_unlock_hashes(tdb, h.hlock_start, h.hlock_range, F_WRLCK);
        return ret;
 
 fail:
-       unlock_lists(tdb, start, num, F_WRLCK);
+       tdb_unlock_hashes(tdb, h.hlock_start, h.hlock_range, F_WRLCK);
        return -1;
 }
 
 int tdb_append(struct tdb_context *tdb,
               struct tdb_data key, struct tdb_data dbuf)
 {
-       tdb_off_t off, bucket, start, num;
+       struct hash_info h;
+       tdb_off_t off;
        struct tdb_used_record rec;
        tdb_len_t old_room = 0, old_dlen;
-       uint64_t h;
        unsigned char *newdata;
        struct tdb_data new_dbuf;
        int ret;
 
-       h = tdb_hash(tdb, key.dptr, key.dsize);
-       off = find_and_lock(tdb, key, h, F_WRLCK, &start, &num, &bucket, &rec);
+       off = find_and_lock(tdb, key, F_WRLCK, &h, &rec);
        if (unlikely(off == TDB_OFF_ERR))
                return -1;
 
@@ -878,7 +502,7 @@ int tdb_append(struct tdb_context *tdb,
                /* Fast path: can append in place. */
                if (rec_extra_padding(&rec) >= dbuf.dsize) {
                        if (update_rec_hdr(tdb, off, key.dsize,
-                                          old_dlen + dbuf.dsize, &rec, h))
+                                          old_dlen + dbuf.dsize, &rec, h.h))
                                goto fail;
 
                        off += sizeof(rec) + key.dsize + old_dlen;
@@ -887,7 +511,8 @@ int tdb_append(struct tdb_context *tdb,
                                goto fail;
 
                        /* FIXME: tdb_increment_seqnum(tdb); */
-                       unlock_lists(tdb, start, num, F_WRLCK);
+                       tdb_unlock_hashes(tdb, h.hlock_start, h.hlock_range,
+                                         F_WRLCK);
                        return 0;
                }
                /* FIXME: Check right record free? */
@@ -915,32 +540,26 @@ int tdb_append(struct tdb_context *tdb,
        }
 
        /* If they're using tdb_append(), it implies they're growing record. */
-       ret = replace_data(tdb, h, key, new_dbuf, bucket, off, old_room,
-                          rec_zone_bits(&rec), true);
-       unlock_lists(tdb, start, num, F_WRLCK);
+       ret = replace_data(tdb, &h, key, new_dbuf, off,
+                          old_room, rec_zone_bits(&rec), true);
+       tdb_unlock_hashes(tdb, h.hlock_start, h.hlock_range, F_WRLCK);
        free(newdata);
 
-       /* FIXME: by simple simulation, this approximated 60% full.
-        * Check in real case! */
-       if (unlikely(num > 4 * tdb->header.v.hash_bits - 30))
-               enlarge_hash(tdb);
-
        return ret;
 
 fail:
-       unlock_lists(tdb, start, num, F_WRLCK);
+       tdb_unlock_hashes(tdb, h.hlock_start, h.hlock_range, F_WRLCK);
        return -1;
 }
 
 struct tdb_data tdb_fetch(struct tdb_context *tdb, struct tdb_data key)
 {
-       tdb_off_t off, start, num, bucket;
+       tdb_off_t off;
        struct tdb_used_record rec;
-       uint64_t h;
+       struct hash_info h;
        struct tdb_data ret;
 
-       h = tdb_hash(tdb, key.dptr, key.dsize);
-       off = find_and_lock(tdb, key, h, F_RDLCK, &start, &num, &bucket, &rec);
+       off = find_and_lock(tdb, key, F_RDLCK, &h, &rec);
        if (unlikely(off == TDB_OFF_ERR))
                return tdb_null;
 
@@ -953,71 +572,29 @@ struct tdb_data tdb_fetch(struct tdb_context *tdb, struct tdb_data key)
                                          ret.dsize);
        }
 
-       unlock_lists(tdb, start, num, F_RDLCK);
+       tdb_unlock_hashes(tdb, h.hlock_start, h.hlock_range, F_RDLCK);
        return ret;
 }
 
 int tdb_delete(struct tdb_context *tdb, struct tdb_data key)
 {
-       tdb_off_t i, bucket, off, start, num;
+       tdb_off_t off;
        struct tdb_used_record rec;
-       uint64_t h;
+       struct hash_info h;
 
-       h = tdb_hash(tdb, key.dptr, key.dsize);
-       start = tdb_lock_list(tdb, h, F_WRLCK, TDB_LOCK_WAIT);
-       if (unlikely(start == TDB_OFF_ERR))
-               return -1;
-
-       /* FIXME: Fastpath: if next is zero, we can delete without lock,
-        * since this lock protects us. */
-       off = find_and_lock_slow(tdb, key, h, F_WRLCK,
-                                &start, &num, &bucket, &rec);
+       off = find_and_lock(tdb, key, F_WRLCK, &h, &rec);
        if (unlikely(off == TDB_OFF_ERR))
                return -1;
 
        if (!off) {
-               /* FIXME: We could optimize not found case if it mattered, by
-                * reading offset after first lock: if it's zero, goto here. */
-               unlock_lists(tdb, start, num, F_WRLCK);
+               tdb_unlock_hashes(tdb, h.hlock_start, h.hlock_range, F_WRLCK);
                tdb->ecode = TDB_ERR_NOEXIST;
                return -1;
        }
-       /* Since we found the entry, we must have locked it and a zero. */
-       assert(num >= 2);
 
-       /* This actually unlinks it. */
-       if (tdb_write_off(tdb, hash_off(tdb, bucket), 0) == -1)
+       if (delete_from_hash(tdb, &h) == -1)
                goto unlock_err;
 
-       /* Rehash anything following. */
-       for (i = bucket+1; i != bucket + num - 1; i++) {
-               tdb_off_t hoff, off2;
-               uint64_t h2;
-
-               hoff = hash_off(tdb, i);
-               off2 = tdb_read_off(tdb, hoff);
-               if (unlikely(off2 == TDB_OFF_ERR))
-                       goto unlock_err;
-
-               /* This can happen if we raced. */
-               if (unlikely(off2 == 0))
-                       break;
-
-               /* Maybe use a bit to indicate it is in ideal place? */
-               h2 = hash_record(tdb, off2);
-               /* Is it happy where it is? */
-               if (hash_off(tdb, h2) == hoff)
-                       continue;
-
-               /* Remove it. */
-               if (tdb_write_off(tdb, hoff, 0) == -1)
-                       goto unlock_err;
-
-               /* Rehash it. */
-               if (hash_add(tdb, h2, off2) == -1)
-                       goto unlock_err;
-       }
-
        /* Free the deleted entry. */
        if (add_free_record(tdb, rec_zone_bits(&rec), off,
                            sizeof(struct tdb_used_record)
@@ -1026,11 +603,11 @@ int tdb_delete(struct tdb_context *tdb, struct tdb_data key)
                            + rec_extra_padding(&rec)) != 0)
                goto unlock_err;
 
-       unlock_lists(tdb, start, num, F_WRLCK);
+       tdb_unlock_hashes(tdb, h.hlock_start, h.hlock_range, F_WRLCK);
        return 0;
 
 unlock_err:
-       unlock_lists(tdb, start, num, F_WRLCK);
+       tdb_unlock_hashes(tdb, h.hlock_start, h.hlock_range, F_WRLCK);
        return -1;
 }
 
index 5d434c225fc6bb14d92834c47462055d16652a0b..ab3837465477943a48be4b52887673331d0d26a1 100644 (file)
@@ -10,7 +10,6 @@ struct tdb_layout *new_tdb_layout(void)
        struct tdb_layout *layout = malloc(sizeof(*layout));
        layout->num_elems = 0;
        layout->elem = NULL;
-       layout->htable = -1;
        return layout;
 }
 
@@ -63,19 +62,6 @@ void tdb_layout_add_used(struct tdb_layout *layout,
        add(layout, elem);
 }
 
-void tdb_layout_add_hashtable(struct tdb_layout *layout,
-                             unsigned int hash_bits,
-                             tdb_len_t extra)
-{
-       union tdb_layout_elem elem;
-       elem.base.type = HASHTABLE;
-       elem.hashtable.hash_bits = hash_bits;
-       elem.hashtable.extra = extra;
-       assert(layout->htable == -1U);
-       layout->htable = layout->num_elems;
-       add(layout, elem);
-}
-
 static tdb_len_t free_record_len(tdb_len_t len)
 {
        return sizeof(struct tdb_used_record) + len;
@@ -93,7 +79,8 @@ static tdb_len_t data_record_len(struct tle_used *used)
 static tdb_len_t hashtable_len(struct tle_hashtable *htable)
 {
        return sizeof(struct tdb_used_record)
-               + (sizeof(tdb_off_t) << htable->hash_bits);
+               + (sizeof(tdb_off_t) << TDB_SUBLEVEL_HASH_BITS)
+               + htable->extra;
 }
 
 static tdb_len_t zone_header_len(struct tle_zone *zone)
@@ -127,7 +114,7 @@ static void set_hashtable(void *mem, struct tdb_context *tdb,
                          struct tle_hashtable *htable)
 {
        struct tdb_used_record *u = mem;
-       tdb_len_t len = sizeof(tdb_off_t) << htable->hash_bits;
+       tdb_len_t len = sizeof(tdb_off_t) << TDB_SUBLEVEL_HASH_BITS;
 
        set_header(tdb, u, 0, len, len + htable->extra, 0,
                   last_zone->zone_bits);
@@ -151,17 +138,64 @@ static void add_to_freetable(struct tdb_context *tdb,
                        sizeof(struct tdb_used_record) + elen);
 }
 
+static tdb_off_t hbucket_off(tdb_off_t group_start, unsigned ingroup)
+{
+       return group_start
+               + (ingroup % (1 << TDB_HASH_GROUP_BITS)) * sizeof(tdb_off_t);
+}
+
+/* Get bits from a value. */
+static uint32_t bits(uint64_t val, unsigned start, unsigned num)
+{
+       assert(num <= 32);
+       return (val >> start) & ((1U << num) - 1);
+}
+
+/* We take bits from the top: that way we can lock whole sections of the hash
+ * by using lock ranges. */
+static uint32_t use_bits(uint64_t h, unsigned num, unsigned *used)
+{
+       *used += num;
+       return bits(h, 64 - *used, num);
+}
+
+static tdb_off_t encode_offset(tdb_off_t new_off, unsigned bucket,
+                              uint64_t h)
+{
+       return bucket
+               | new_off
+               | ((uint64_t)bits(h, 64 - TDB_OFF_UPPER_STEAL_EXTRA,
+                                 TDB_OFF_UPPER_STEAL_EXTRA)
+                  << TDB_OFF_HASH_EXTRA_BIT);
+}
+
+/* FIXME: Our hash table handling here is primitive: we don't expand! */
 static void add_to_hashtable(struct tdb_context *tdb,
                             tdb_off_t eoff,
                             struct tdb_data key)
 {
-       uint64_t hash = tdb_hash(tdb, key.dptr, key.dsize);
-       tdb_off_t hoff;
+       uint64_t h = tdb_hash(tdb, key.dptr, key.dsize);
+       tdb_off_t b_off, group_start;
+       unsigned i, group, in_group;
+       unsigned used = 0;
+
+       group = use_bits(h, TDB_TOPLEVEL_HASH_BITS-TDB_HASH_GROUP_BITS, &used);
+       in_group = use_bits(h, TDB_HASH_GROUP_BITS, &used);
+
+       group_start = offsetof(struct tdb_header, hashtable)
+               + group * (sizeof(tdb_off_t) << TDB_HASH_GROUP_BITS);
 
-       while (tdb_read_off(tdb, hoff = hash_off(tdb, hash)) != 0)
-               hash++;
+       for (i = 0; i < (1 << TDB_HASH_GROUP_BITS); i++) {
+               unsigned bucket = (in_group + i) % (1 << TDB_HASH_GROUP_BITS);
 
-       tdb_write_off(tdb, hoff, eoff);
+               b_off = hbucket_off(group_start, bucket);               
+               if (tdb_read_off(tdb, b_off) == 0) {
+                       tdb_write_off(tdb, b_off,
+                                     encode_offset(eoff, bucket, h));
+                       return;
+               }
+       }
+       abort();
 }
 
 /* FIXME: Support TDB_CONVERT */
@@ -170,12 +204,10 @@ struct tdb_context *tdb_layout_get(struct tdb_layout *layout)
        unsigned int i;
        tdb_off_t off, len;
        tdb_len_t zone_left;
-       struct tdb_header *hdr;
        char *mem;
        struct tdb_context *tdb;
        struct tle_zone *last_zone = NULL;
 
-       assert(layout->htable != -1U);
        assert(layout->elem[0].base.type == ZONE);
 
        zone_left = 0;
@@ -221,18 +253,12 @@ struct tdb_context *tdb_layout_get(struct tdb_layout *layout)
        mem = malloc(off+1);
        /* Now populate our header, cribbing from a real TDB header. */
        tdb = tdb_open(NULL, TDB_INTERNAL, O_RDWR, 0, &tap_log_attr);
-       hdr = (void *)mem;
-       *hdr = tdb->header;
-       hdr->v.generation++;
-       hdr->v.hash_bits = layout->elem[layout->htable].hashtable.hash_bits;
-       hdr->v.hash_off = layout->elem[layout->htable].base.off
-               + sizeof(struct tdb_used_record);
+       memcpy(mem, tdb->map_ptr, sizeof(struct tdb_header));
 
        /* Mug the tdb we have to make it use this. */
        free(tdb->map_ptr);
        tdb->map_ptr = mem;
        tdb->map_size = off+1;
-       header_changed(tdb);
 
        for (i = 0; i < layout->num_elems; i++) {
                union tdb_layout_elem *e = &layout->elem[i];
index 7a9d319d16c8edafdc6ff03b221e0281f76c3b1a..25edd8b777a4cd18e8c50522d8fbe69719df17b0 100644 (file)
@@ -10,9 +10,12 @@ void tdb_layout_add_free(struct tdb_layout *layout, tdb_len_t len);
 void tdb_layout_add_used(struct tdb_layout *layout,
                         TDB_DATA key, TDB_DATA data,
                         tdb_len_t extra);
+#if 0 /* FIXME: Allow allocation of subtables */
 void tdb_layout_add_hashtable(struct tdb_layout *layout,
-                             unsigned int hash_bits,
+                             int htable_parent, /* -1 == toplevel */
+                             unsigned int bucket,
                              tdb_len_t extra);
+#endif
 struct tdb_context *tdb_layout_get(struct tdb_layout *layout);
 
 enum layout_type {
@@ -44,7 +47,8 @@ struct tle_used {
 
 struct tle_hashtable {
        struct tle_base base;
-       unsigned hash_bits;
+       int parent;
+       unsigned int bucket;
        tdb_len_t extra;
 };
 
@@ -59,6 +63,5 @@ union tdb_layout_elem {
 struct tdb_layout {
        unsigned int num_elems;
        union tdb_layout_elem *elem;
-       unsigned int htable;
 };
 #endif /* TDB2_TEST_LAYOUT_H */
index d1b124ef034d44274d7f19fd5576a7340743defa..a62cb1d70998fb291e3fb8d7c9570722ea4513b8 100644 (file)
@@ -1,6 +1,7 @@
 #include <ccan/tdb2/tdb.c>
 #include <ccan/tdb2/free.c>
 #include <ccan/tdb2/lock.c>
+#include <ccan/tdb2/hash.c>
 #include <ccan/tdb2/io.c>
 #include <ccan/tap/tap.h>
 #include "logging.h"
@@ -27,7 +28,7 @@ int main(int argc, char *argv[])
 
        /* We should neatly encode all values. */
        for (i = 0; i < 48; i++) {
-               uint64_t h = 1ULL << (i < 5 ? 63 - i : 63 - 4);
+               uint64_t h = 1ULL << (i < 5 ? i : 4);
                uint64_t klen = 1ULL << (i < 16 ? i : 15);
                uint64_t dlen = 1ULL << i;
                uint64_t xlen = 1ULL << (i < 32 ? i : 31);
@@ -38,7 +39,7 @@ int main(int argc, char *argv[])
                ok1(rec_key_length(&rec) == klen);
                ok1(rec_data_length(&rec) == dlen);
                ok1(rec_extra_padding(&rec) == xlen);
-               ok1((uint64_t)rec_hash(&rec) << (64 - 5) == h);
+               ok1((uint64_t)rec_hash(&rec) == h);
                ok1(rec_zone_bits(&rec) == zbits);
                ok1(rec_magic(&rec) == TDB_MAGIC);
        }
index 4ecc6f8560329438bbdd53cbf29e48bf7ed6b1c7..6b6df48d397c102220fa56f4c192ba35ec6d56c5 100644 (file)
@@ -2,6 +2,7 @@
 #include <ccan/tdb2/free.c>
 #include <ccan/tdb2/lock.c>
 #include <ccan/tdb2/io.c>
+#include <ccan/tdb2/hash.c>
 #include <ccan/tap/tap.h>
 
 static unsigned int dumb_fls(uint64_t num)
index 75f2da13e1b808026117cf8da99cae45bfcb6569..6b82d57657beb3b05f6bc880b4de6d6142d7af0a 100644 (file)
@@ -2,6 +2,7 @@
 #include <ccan/tdb2/free.c>
 #include <ccan/tdb2/lock.c>
 #include <ccan/tdb2/io.c>
+#include <ccan/tdb2/hash.c>
 #include <ccan/tdb2/check.c>
 #include <ccan/tap/tap.h>
 #include "logging.h"
index 3ce33b9f80303ff47d58cc4e79db54d14169024c..b35176453ca29941225500664b6de57a29b68c1a 100644 (file)
@@ -3,6 +3,7 @@
 #include <ccan/tdb2/lock.c>
 #include <ccan/tdb2/io.c>
 #include <ccan/tdb2/check.c>
+#include <ccan/tdb2/hash.c>
 #include <ccan/tap/tap.h>
 #include "logging.h"
 
index d3e624cf5bf66bdeda53e2b10487e543cfe1364c..529b6c7bf9625dfe980ef95a790d6fb686b16d6c 100644 (file)
@@ -2,6 +2,7 @@
 #include <ccan/tdb2/free.c>
 #include <ccan/tdb2/lock.c>
 #include <ccan/tdb2/io.c>
+#include <ccan/tdb2/hash.c>
 #include <ccan/tdb2/check.c>
 #include <ccan/tap/tap.h>
 #include "logging.h"
@@ -38,67 +39,63 @@ int main(int argc, char *argv[])
        /* No coalescing can be done due to EOF */
        layout = new_tdb_layout();
        tdb_layout_add_zone(layout, zone_bits, false);
-       tdb_layout_add_hashtable(layout, 12, 0);
        tdb = tdb_layout_get(layout);
-       len = layout->elem[2].free.len;
+       len = layout->elem[1].free.len;
        zone_off = layout->elem[0].base.off;
        ok1(tdb_check(tdb, NULL, NULL) == 0);
-       ok1(free_record_length(tdb, layout->elem[2].base.off) == len);
+       ok1(free_record_length(tdb, layout->elem[1].base.off) == len);
 
        /* Figure out which bucket free entry is. */
        b_off = bucket_off(zone_off, size_to_bucket(zone_bits, len));
        /* Lock and fail to coalesce. */
        ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
-       ok1(coalesce(tdb, zone_off, zone_bits, layout->elem[2].base.off,
+       ok1(coalesce(tdb, zone_off, zone_bits, layout->elem[1].base.off,
                     b_off, len) == 0);
        tdb_unlock_free_bucket(tdb, b_off);
-       tdb_unlock_list(tdb, 0, F_WRLCK);
-       ok1(free_record_length(tdb, layout->elem[2].base.off) == len);
+       ok1(free_record_length(tdb, layout->elem[1].base.off) == len);
        ok1(tdb_check(tdb, NULL, NULL) == 0);
        tdb_close(tdb);
 
        /* No coalescing can be done due to used record */
        layout = new_tdb_layout();
        tdb_layout_add_zone(layout, zone_bits, false);
-       tdb_layout_add_hashtable(layout, 12, 0);
        tdb_layout_add_free(layout, 1024);
        tdb_layout_add_used(layout, key, data, 6);
        tdb = tdb_layout_get(layout);
        zone_off = layout->elem[0].base.off;
-       ok1(free_record_length(tdb, layout->elem[2].base.off) == 1024);
+       ok1(free_record_length(tdb, layout->elem[1].base.off) == 1024);
        ok1(tdb_check(tdb, NULL, NULL) == 0);
 
        /* Figure out which bucket free entry is. */
        b_off = bucket_off(zone_off, size_to_bucket(zone_bits, 1024));
        /* Lock and fail to coalesce. */
        ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
-       ok1(coalesce(tdb, zone_off, zone_bits, layout->elem[2].base.off,
+       ok1(coalesce(tdb, zone_off, zone_bits, layout->elem[1].base.off,
                     b_off, 1024) == 0);
        tdb_unlock_free_bucket(tdb, b_off);
-       ok1(free_record_length(tdb, layout->elem[2].base.off) == 1024);
+       ok1(free_record_length(tdb, layout->elem[1].base.off) == 1024);
        ok1(tdb_check(tdb, NULL, NULL) == 0);
        tdb_close(tdb);
 
        /* Coalescing can be done due to two free records, then EOF */
        layout = new_tdb_layout();
        tdb_layout_add_zone(layout, zone_bits, false);
-       tdb_layout_add_hashtable(layout, 12, 0);
        tdb_layout_add_free(layout, 1024);
        tdb = tdb_layout_get(layout);
        zone_off = layout->elem[0].base.off;
-       len = layout->elem[3].free.len;
-       ok1(free_record_length(tdb, layout->elem[2].base.off) == 1024);
-       ok1(free_record_length(tdb, layout->elem[3].base.off) == len);
+       len = layout->elem[2].free.len;
+       ok1(free_record_length(tdb, layout->elem[1].base.off) == 1024);
+       ok1(free_record_length(tdb, layout->elem[2].base.off) == len);
        ok1(tdb_check(tdb, NULL, NULL) == 0);
 
        /* Figure out which bucket (first) free entry is. */
        b_off = bucket_off(zone_off, size_to_bucket(zone_bits, 1024));
        /* Lock and coalesce. */
        ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
-       ok1(coalesce(tdb, zone_off, zone_bits, layout->elem[2].base.off,
+       ok1(coalesce(tdb, zone_off, zone_bits, layout->elem[1].base.off,
                     b_off, 1024) == 1);
        ok1(!tdb_has_locks(tdb));
-       ok1(free_record_length(tdb, layout->elem[2].base.off)
+       ok1(free_record_length(tdb, layout->elem[1].base.off)
            == 1024 + sizeof(struct tdb_used_record) + len);
        ok1(tdb_check(tdb, NULL, NULL) == 0);
        tdb_close(tdb);
@@ -106,24 +103,23 @@ int main(int argc, char *argv[])
        /* Coalescing can be done due to two free records, then data */
        layout = new_tdb_layout();
        tdb_layout_add_zone(layout, zone_bits, false);
-       tdb_layout_add_hashtable(layout, 12, 0);
        tdb_layout_add_free(layout, 1024);
        tdb_layout_add_free(layout, 512);
        tdb_layout_add_used(layout, key, data, 6);
        tdb = tdb_layout_get(layout);
        zone_off = layout->elem[0].base.off;
-       ok1(free_record_length(tdb, layout->elem[2].base.off) == 1024);
-       ok1(free_record_length(tdb, layout->elem[3].base.off) == 512);
+       ok1(free_record_length(tdb, layout->elem[1].base.off) == 1024);
+       ok1(free_record_length(tdb, layout->elem[2].base.off) == 512);
        ok1(tdb_check(tdb, NULL, NULL) == 0);
 
        /* Figure out which bucket free entry is. */
        b_off = bucket_off(zone_off, size_to_bucket(zone_bits, 1024));
        /* Lock and coalesce. */
        ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
-       ok1(coalesce(tdb, zone_off, zone_bits, layout->elem[2].base.off,
+       ok1(coalesce(tdb, zone_off, zone_bits, layout->elem[1].base.off,
                     b_off, 1024) == 1);
        ok1(!tdb_has_locks(tdb));
-       ok1(free_record_length(tdb, layout->elem[2].base.off)
+       ok1(free_record_length(tdb, layout->elem[1].base.off)
            == 1024 + sizeof(struct tdb_used_record) + 512);
        ok1(tdb_check(tdb, NULL, NULL) == 0);
        tdb_close(tdb);
@@ -131,25 +127,24 @@ int main(int argc, char *argv[])
        /* Coalescing can be done due to three free records, then EOF */
        layout = new_tdb_layout();
        tdb_layout_add_zone(layout, zone_bits, false);
-       tdb_layout_add_hashtable(layout, 12, 0);
        tdb_layout_add_free(layout, 1024);
        tdb_layout_add_free(layout, 512);
        tdb = tdb_layout_get(layout);
        zone_off = layout->elem[0].base.off;
-       len = layout->elem[4].free.len;
-       ok1(free_record_length(tdb, layout->elem[2].base.off) == 1024);
-       ok1(free_record_length(tdb, layout->elem[3].base.off) == 512);
-       ok1(free_record_length(tdb, layout->elem[4].base.off) == len);
+       len = layout->elem[3].free.len;
+       ok1(free_record_length(tdb, layout->elem[1].base.off) == 1024);
+       ok1(free_record_length(tdb, layout->elem[2].base.off) == 512);
+       ok1(free_record_length(tdb, layout->elem[3].base.off) == len);
        ok1(tdb_check(tdb, NULL, NULL) == 0);
 
        /* Figure out which bucket free entry is. */
        b_off = bucket_off(zone_off, size_to_bucket(zone_bits, 1024));
        /* Lock and coalesce. */
        ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
-       ok1(coalesce(tdb, zone_off, zone_bits, layout->elem[2].base.off,
+       ok1(coalesce(tdb, zone_off, zone_bits, layout->elem[1].base.off,
                     b_off, 1024) == 1);
        ok1(!tdb_has_locks(tdb));
-       ok1(free_record_length(tdb, layout->elem[2].base.off)
+       ok1(free_record_length(tdb, layout->elem[1].base.off)
            == 1024 + sizeof(struct tdb_used_record) + 512
            + sizeof(struct tdb_used_record) + len);
        ok1(tdb_check(tdb, NULL, NULL) == 0);
@@ -158,23 +153,22 @@ int main(int argc, char *argv[])
        /* Coalescing across two zones isn't possible. */
        layout = new_tdb_layout();
        tdb_layout_add_zone(layout, zone_bits, false);
-       tdb_layout_add_hashtable(layout, 12, 0);
        tdb_layout_add_zone(layout, zone_bits, true);
        tdb = tdb_layout_get(layout);
        zone_off = layout->elem[0].base.off;
-       len = layout->elem[2].free.len;
-       ok1(free_record_length(tdb, layout->elem[2].base.off) == len);
+       len = layout->elem[1].free.len;
+       ok1(free_record_length(tdb, layout->elem[1].base.off) == len);
        ok1(tdb_check(tdb, NULL, NULL) == 0);
 
        /* Figure out which list free entry is. */
        b_off = bucket_off(zone_off, size_to_bucket(zone_bits, len));
        /* Lock and coalesce. */
        ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
-       ok1(coalesce(tdb, zone_off, zone_bits, layout->elem[2].base.off,
+       ok1(coalesce(tdb, zone_off, zone_bits, layout->elem[1].base.off,
                     b_off, len) == 0);
        tdb_unlock_free_bucket(tdb, b_off);
        ok1(!tdb_has_locks(tdb));
-       ok1(free_record_length(tdb, layout->elem[2].base.off) == len);
+       ok1(free_record_length(tdb, layout->elem[1].base.off) == len);
        ok1(tdb_check(tdb, NULL, NULL) == 0);
        tdb_close(tdb);
 
diff --git a/ccan/tdb2/test/run-04-basichash.c b/ccan/tdb2/test/run-04-basichash.c
new file mode 100644 (file)
index 0000000..ba70cc3
--- /dev/null
@@ -0,0 +1,259 @@
+#include <ccan/tdb2/tdb.c>
+#include <ccan/tdb2/free.c>
+#include <ccan/tdb2/lock.c>
+#include <ccan/tdb2/io.c>
+#include <ccan/tdb2/hash.c>
+#include <ccan/tdb2/check.c>
+#include <ccan/tap/tap.h>
+#include "logging.h"
+
+/* We rig the hash so adjacent-numbered records always clash. */
+static uint64_t clash(const void *key, size_t len, uint64_t seed, void *priv)
+{
+       return ((uint64_t)*(unsigned int *)key)
+               << (64 - TDB_TOPLEVEL_HASH_BITS - 1);
+}
+
+int main(int argc, char *argv[])
+{
+       unsigned int i, j;
+       struct tdb_context *tdb;
+       unsigned int v;
+       struct tdb_used_record rec;
+       struct tdb_data key = { (unsigned char *)&v, sizeof(v) };
+       struct tdb_data dbuf = { (unsigned char *)&v, sizeof(v) };
+       union tdb_attribute hattr = { .hash = { .base = { TDB_ATTRIBUTE_HASH },
+                                               .hash_fn = clash } };
+       int flags[] = { TDB_INTERNAL, TDB_DEFAULT, TDB_NOMMAP,
+                       TDB_INTERNAL|TDB_CONVERT, TDB_CONVERT,
+                       TDB_NOMMAP|TDB_CONVERT,
+       };
+
+       hattr.base.next = &tap_log_attr;
+
+       plan_tests(sizeof(flags) / sizeof(flags[0])
+                  * (91 + (2 * ((1 << TDB_HASH_GROUP_BITS) - 1))) + 1);
+       for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
+               struct hash_info h;
+               tdb_off_t new_off, off, subhash;
+
+               tdb = tdb_open("run-04-basichash.tdb", flags[i],
+                              O_RDWR|O_CREAT|O_TRUNC, 0600, &hattr);
+               ok1(tdb);
+               if (!tdb)
+                       continue;
+
+               v = 0;
+               /* Should not find it. */
+               ok1(find_and_lock(tdb, key, F_WRLCK, &h, &rec) == 0);
+               /* Should have created correct hash. */
+               ok1(h.h == tdb_hash(tdb, key.dptr, key.dsize));
+               /* Should have located space in group 0, bucket 0. */
+               ok1(h.group_start == offsetof(struct tdb_header, hashtable));
+               ok1(h.home_bucket == 0);
+               ok1(h.found_bucket == 0);
+               ok1(h.hash_used == TDB_TOPLEVEL_HASH_BITS);
+
+               /* Should have lock on bucket 0 */
+               ok1(h.hlock_start == 0);
+               ok1(h.hlock_range == 
+                   1ULL << (64-(TDB_TOPLEVEL_HASH_BITS-TDB_HASH_GROUP_BITS)));
+               ok1((tdb->flags & TDB_NOLOCK) || tdb->num_lockrecs == 1);
+               ok1((tdb->flags & TDB_NOLOCK)
+                   || tdb->lockrecs[0].off == TDB_HASH_LOCK_START);
+               /* FIXME: Check lock length */
+
+               /* Allocate a new record. */
+               new_off = alloc(tdb, key.dsize, dbuf.dsize, h.h, false);
+               ok1(new_off != TDB_OFF_ERR);
+
+               /* We should be able to add it now. */
+               ok1(add_to_hash(tdb, &h, new_off) == 0);
+
+               /* Make sure we fill it in for later finding. */
+               off = new_off + sizeof(struct tdb_used_record);
+               ok1(!tdb->methods->write(tdb, off, key.dptr, key.dsize));
+               off += key.dsize;
+               ok1(!tdb->methods->write(tdb, off, dbuf.dptr, dbuf.dsize));
+
+               /* We should be able to unlock that OK. */
+               ok1(tdb_unlock_hashes(tdb, h.hlock_start, h.hlock_range,
+                                     F_WRLCK) == 0);
+
+               /* Database should be consistent. */
+               ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+               /* Now, this should give a successful lookup. */
+               ok1(find_and_lock(tdb, key, F_WRLCK, &h, &rec) == new_off);
+               /* Should have created correct hash. */
+               ok1(h.h == tdb_hash(tdb, key.dptr, key.dsize));
+               /* Should have located space in group 0, bucket 0. */
+               ok1(h.group_start == offsetof(struct tdb_header, hashtable));
+               ok1(h.home_bucket == 0);
+               ok1(h.found_bucket == 0);
+               ok1(h.hash_used == TDB_TOPLEVEL_HASH_BITS);
+
+               /* Should have lock on bucket 0 */
+               ok1(h.hlock_start == 0);
+               ok1(h.hlock_range == 
+                   1ULL << (64-(TDB_TOPLEVEL_HASH_BITS-TDB_HASH_GROUP_BITS)));
+               ok1((tdb->flags & TDB_NOLOCK) || tdb->num_lockrecs == 1);
+               ok1((tdb->flags & TDB_NOLOCK)
+                   || tdb->lockrecs[0].off == TDB_HASH_LOCK_START);
+               /* FIXME: Check lock length */
+
+               ok1(tdb_unlock_hashes(tdb, h.hlock_start, h.hlock_range,
+                                     F_WRLCK) == 0);
+               
+               /* Database should be consistent. */
+               ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+               /* Test expansion. */
+               v = 1;
+               ok1(find_and_lock(tdb, key, F_WRLCK, &h, &rec) == 0);
+               /* Should have created correct hash. */
+               ok1(h.h == tdb_hash(tdb, key.dptr, key.dsize));
+               /* Should have located space in group 0, bucket 1. */
+               ok1(h.group_start == offsetof(struct tdb_header, hashtable));
+               ok1(h.home_bucket == 0);
+               ok1(h.found_bucket == 1);
+               ok1(h.hash_used == TDB_TOPLEVEL_HASH_BITS);
+
+               /* Should have lock on bucket 0 */
+               ok1(h.hlock_start == 0);
+               ok1(h.hlock_range == 
+                   1ULL << (64-(TDB_TOPLEVEL_HASH_BITS-TDB_HASH_GROUP_BITS)));
+               ok1((tdb->flags & TDB_NOLOCK) || tdb->num_lockrecs == 1);
+               ok1((tdb->flags & TDB_NOLOCK)
+                   || tdb->lockrecs[0].off == TDB_HASH_LOCK_START);
+               /* FIXME: Check lock length */
+
+               /* Make it expand 0'th bucket. */
+               ok1(expand_group(tdb, &h) == 0);
+               /* First one should be subhash, next should be empty. */
+               ok1(is_subhash(h.group[0]));
+               subhash = (h.group[0] & TDB_OFF_MASK);
+               for (j = 1; j < (1 << TDB_HASH_GROUP_BITS); j++)
+                       ok1(h.group[j] == 0);
+
+               ok1(tdb_write_convert(tdb, h.group_start,
+                                     h.group, sizeof(h.group)) == 0);
+               ok1(tdb_unlock_hashes(tdb, h.hlock_start, h.hlock_range,
+                                     F_WRLCK) == 0);
+
+               /* Should be happy with expansion. */
+               ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+               /* Should be able to find it. */
+               v = 0;
+               ok1(find_and_lock(tdb, key, F_WRLCK, &h, &rec) == new_off);
+               /* Should have created correct hash. */
+               ok1(h.h == tdb_hash(tdb, key.dptr, key.dsize));
+               /* Should have located space in expanded group 0, bucket 0. */
+               ok1(h.group_start == subhash + sizeof(struct tdb_used_record));
+               ok1(h.home_bucket == 0);
+               ok1(h.found_bucket == 0);
+               ok1(h.hash_used == TDB_TOPLEVEL_HASH_BITS
+                   + TDB_SUBLEVEL_HASH_BITS);
+
+               /* Should have lock on bucket 0 */
+               ok1(h.hlock_start == 0);
+               ok1(h.hlock_range == 
+                   1ULL << (64-(TDB_TOPLEVEL_HASH_BITS-TDB_HASH_GROUP_BITS)));
+               ok1((tdb->flags & TDB_NOLOCK) || tdb->num_lockrecs == 1);
+               ok1((tdb->flags & TDB_NOLOCK)
+                   || tdb->lockrecs[0].off == TDB_HASH_LOCK_START);
+               /* FIXME: Check lock length */
+
+               /* Simple delete should work. */
+               ok1(delete_from_hash(tdb, &h) == 0);
+               ok1(add_free_record(tdb, rec_zone_bits(&rec), new_off,
+                                   sizeof(struct tdb_used_record)
+                                   + rec_key_length(&rec)
+                                   + rec_data_length(&rec)
+                                   + rec_extra_padding(&rec)) == 0);
+               ok1(tdb_unlock_hashes(tdb, h.hlock_start, h.hlock_range,
+                                     F_WRLCK) == 0);
+               ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+               /* Test second-level expansion: should expand 0th bucket. */
+               v = 0;
+               ok1(find_and_lock(tdb, key, F_WRLCK, &h, &rec) == 0);
+               /* Should have created correct hash. */
+               ok1(h.h == tdb_hash(tdb, key.dptr, key.dsize));
+               /* Should have located space in group 0, bucket 0. */
+               ok1(h.group_start == subhash + sizeof(struct tdb_used_record));
+               ok1(h.home_bucket == 0);
+               ok1(h.found_bucket == 0);
+               ok1(h.hash_used == TDB_TOPLEVEL_HASH_BITS+TDB_SUBLEVEL_HASH_BITS);
+
+               /* Should have lock on bucket 0 */
+               ok1(h.hlock_start == 0);
+               ok1(h.hlock_range == 
+                   1ULL << (64-(TDB_TOPLEVEL_HASH_BITS-TDB_HASH_GROUP_BITS)));
+               ok1((tdb->flags & TDB_NOLOCK) || tdb->num_lockrecs == 1);
+               ok1((tdb->flags & TDB_NOLOCK)
+                   || tdb->lockrecs[0].off == TDB_HASH_LOCK_START);
+               /* FIXME: Check lock length */
+
+               ok1(expand_group(tdb, &h) == 0);
+               /* First one should be subhash, next should be empty. */
+               ok1(is_subhash(h.group[0]));
+               subhash = (h.group[0] & TDB_OFF_MASK);
+               for (j = 1; j < (1 << TDB_HASH_GROUP_BITS); j++)
+                       ok1(h.group[j] == 0);
+               ok1(tdb_write_convert(tdb, h.group_start,
+                                     h.group, sizeof(h.group)) == 0);
+               ok1(tdb_unlock_hashes(tdb, h.hlock_start, h.hlock_range,
+                                     F_WRLCK) == 0);
+
+               /* Should be happy with expansion. */
+               ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+               ok1(find_and_lock(tdb, key, F_WRLCK, &h, &rec) == 0);
+               /* Should have created correct hash. */
+               ok1(h.h == tdb_hash(tdb, key.dptr, key.dsize));
+               /* Should have located space in group 0, bucket 0. */
+               ok1(h.group_start == subhash + sizeof(struct tdb_used_record));
+               ok1(h.home_bucket == 0);
+               ok1(h.found_bucket == 0);
+               ok1(h.hash_used == TDB_TOPLEVEL_HASH_BITS
+                   + TDB_SUBLEVEL_HASH_BITS * 2);
+
+               /* We should be able to add it now. */
+               /* Allocate a new record. */
+               new_off = alloc(tdb, key.dsize, dbuf.dsize, h.h, false);
+               ok1(new_off != TDB_OFF_ERR);
+               ok1(add_to_hash(tdb, &h, new_off) == 0);
+
+               /* Make sure we fill it in for later finding. */
+               off = new_off + sizeof(struct tdb_used_record);
+               ok1(!tdb->methods->write(tdb, off, key.dptr, key.dsize));
+               off += key.dsize;
+               ok1(!tdb->methods->write(tdb, off, dbuf.dptr, dbuf.dsize));
+
+               /* We should be able to unlock that OK. */
+               ok1(tdb_unlock_hashes(tdb, h.hlock_start, h.hlock_range,
+                                     F_WRLCK) == 0);
+
+               /* Database should be consistent. */
+               ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+               /* Should be able to find it. */
+               v = 0;
+               ok1(find_and_lock(tdb, key, F_WRLCK, &h, &rec) == new_off);
+               /* Should have created correct hash. */
+               ok1(h.h == tdb_hash(tdb, key.dptr, key.dsize));
+               /* Should have located space in expanded group 0, bucket 0. */
+               ok1(h.group_start == subhash + sizeof(struct tdb_used_record));
+               ok1(h.home_bucket == 0);
+               ok1(h.found_bucket == 0);
+               ok1(h.hash_used == TDB_TOPLEVEL_HASH_BITS
+                   + TDB_SUBLEVEL_HASH_BITS * 2);
+
+               tdb_close(tdb);
+       }
+
+       ok1(tap_log_messages == 0);
+       return exit_status();
+}
diff --git a/ccan/tdb2/test/run-10-simple-store.c b/ccan/tdb2/test/run-10-simple-store.c
new file mode 100644 (file)
index 0000000..35398fb
--- /dev/null
@@ -0,0 +1,42 @@
+#include <ccan/tdb2/tdb.c>
+#include <ccan/tdb2/free.c>
+#include <ccan/tdb2/lock.c>
+#include <ccan/tdb2/io.c>
+#include <ccan/tdb2/hash.c>
+#include <ccan/tdb2/check.c>
+#include <ccan/tap/tap.h>
+#include "logging.h"
+
+int main(int argc, char *argv[])
+{
+       unsigned int i;
+       struct tdb_context *tdb;
+       int flags[] = { TDB_INTERNAL, TDB_DEFAULT, TDB_NOMMAP,
+                       TDB_INTERNAL|TDB_CONVERT, TDB_CONVERT, 
+                       TDB_NOMMAP|TDB_CONVERT };
+       struct tdb_data key = { (unsigned char *)"key", 3 };
+       struct tdb_data data = { (unsigned char *)"data", 4 };
+
+       plan_tests(sizeof(flags) / sizeof(flags[0]) * 9 + 1);
+       for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
+               tdb = tdb_open("run-10-simple-store.tdb", flags[i],
+                              O_RDWR|O_CREAT|O_TRUNC, 0600, &tap_log_attr);
+               ok1(tdb);
+               if (tdb) {
+                       /* Modify should fail. */
+                       ok1(tdb_store(tdb, key, data, TDB_MODIFY) == -1);
+                       ok1(tdb_error(tdb) == TDB_ERR_NOEXIST);
+                       ok1(tdb_check(tdb, NULL, NULL) == 0);
+                       /* Insert should succeed. */
+                       ok1(tdb_store(tdb, key, data, TDB_INSERT) == 0);
+                       ok1(tdb_check(tdb, NULL, NULL) == 0);
+                       /* Second insert should fail. */
+                       ok1(tdb_store(tdb, key, data, TDB_INSERT) == -1);
+                       ok1(tdb_error(tdb) == TDB_ERR_EXISTS);
+                       ok1(tdb_check(tdb, NULL, NULL) == 0);
+                       tdb_close(tdb);
+               }
+       }
+       ok1(tap_log_messages == 0);
+       return exit_status();
+}
diff --git a/ccan/tdb2/test/run-11-simple-fetch.c b/ccan/tdb2/test/run-11-simple-fetch.c
new file mode 100644 (file)
index 0000000..23395d2
--- /dev/null
@@ -0,0 +1,45 @@
+#include <ccan/tdb2/tdb.c>
+#include <ccan/tdb2/free.c>
+#include <ccan/tdb2/lock.c>
+#include <ccan/tdb2/io.c>
+#include <ccan/tdb2/hash.c>
+#include <ccan/tdb2/check.c>
+#include <ccan/tap/tap.h>
+#include "logging.h"
+
+int main(int argc, char *argv[])
+{
+       unsigned int i;
+       struct tdb_context *tdb;
+       int flags[] = { TDB_INTERNAL, TDB_DEFAULT, TDB_NOMMAP,
+                       TDB_INTERNAL|TDB_CONVERT, TDB_CONVERT, 
+                       TDB_NOMMAP|TDB_CONVERT };
+       struct tdb_data key = { (unsigned char *)"key", 3 };
+       struct tdb_data data = { (unsigned char *)"data", 4 };
+
+       plan_tests(sizeof(flags) / sizeof(flags[0]) * 8 + 1);
+       for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
+               tdb = tdb_open("run-11-simple-fetch.tdb", flags[i],
+                              O_RDWR|O_CREAT|O_TRUNC, 0600, &tap_log_attr);
+               ok1(tdb);
+               if (tdb) {
+                       struct tdb_data d;
+
+                       /* fetch should fail. */
+                       d = tdb_fetch(tdb, key);
+                       ok1(d.dptr == NULL);
+                       ok1(tdb_error(tdb) == TDB_ERR_NOEXIST);
+                       ok1(tdb_check(tdb, NULL, NULL) == 0);
+                       /* Insert should succeed. */
+                       ok1(tdb_store(tdb, key, data, TDB_INSERT) == 0);
+                       ok1(tdb_check(tdb, NULL, NULL) == 0);
+                       /* Fetch should now work. */
+                       d = tdb_fetch(tdb, key);
+                       ok1(data_equal(d, data));
+                       ok1(tdb_check(tdb, NULL, NULL) == 0);
+                       tdb_close(tdb);
+               }
+       }
+       ok1(tap_log_messages == 0);
+       return exit_status();
+}
diff --git a/ccan/tdb2/test/run-12-store.c b/ccan/tdb2/test/run-12-store.c
new file mode 100644 (file)
index 0000000..fd53a92
--- /dev/null
@@ -0,0 +1,62 @@
+#include <ccan/tdb2/tdb.c>
+#include <ccan/tdb2/free.c>
+#include <ccan/tdb2/lock.c>
+#include <ccan/tdb2/io.c>
+#include <ccan/tdb2/hash.c>
+#include <ccan/tdb2/check.c>
+#include <ccan/tap/tap.h>
+#include "logging.h"
+
+/* We use the same seed which we saw a failure on. */
+static uint64_t fixedhash(const void *key, size_t len, uint64_t seed, void *p)
+{
+       return hash64_stable((const unsigned char *)key, len,
+                            *(uint64_t *)p);
+}
+
+static bool equal(struct tdb_data a, struct tdb_data b)
+{
+       if (a.dsize != b.dsize)
+               return false;
+       return memcmp(a.dptr, b.dptr, a.dsize) == 0;
+}
+
+int main(int argc, char *argv[])
+{
+       unsigned int i, j;
+       struct tdb_context *tdb;
+       uint64_t seed = 16014841315512641303ULL;
+       union tdb_attribute fixed_hattr
+               = { .hash = { .base = { TDB_ATTRIBUTE_HASH },
+                             .hash_fn = fixedhash,
+                             .hash_private = &seed } };
+       int flags[] = { TDB_INTERNAL, TDB_DEFAULT, TDB_NOMMAP,
+                       TDB_INTERNAL|TDB_CONVERT, TDB_CONVERT,
+                       TDB_NOMMAP|TDB_CONVERT };
+       struct tdb_data key = { (unsigned char *)&j, sizeof(j) };
+       struct tdb_data data = { (unsigned char *)&j, sizeof(j) };
+
+       fixed_hattr.base.next = &tap_log_attr;
+
+       plan_tests(sizeof(flags) / sizeof(flags[0]) * (1 + 500 * 2) + 1);
+       for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
+               tdb = tdb_open("run-12-store.tdb", flags[i],
+                              O_RDWR|O_CREAT|O_TRUNC, 0600, &fixed_hattr);
+               ok1(tdb);
+               if (!tdb)
+                       continue;
+
+               /* We seemed to lose some keys.
+                * Insert and check they're in there! */
+               for (j = 0; j < 500; j++) {
+                       ok1(tdb_store(tdb, key, data, TDB_REPLACE) == 0);
+                       ok1(equal(tdb_fetch(tdb, key), data));
+               }
+               tdb_close(tdb);
+       }
+
+       ok1(tap_log_messages == 0);
+       return exit_status();
+}
+
+
diff --git a/ccan/tdb2/test/run-13-delete.c b/ccan/tdb2/test/run-13-delete.c
new file mode 100644 (file)
index 0000000..2f40d55
--- /dev/null
@@ -0,0 +1,187 @@
+#include <ccan/tdb2/tdb.c>
+#include <ccan/tdb2/free.c>
+#include <ccan/tdb2/lock.c>
+#include <ccan/tdb2/io.c>
+#include <ccan/tdb2/hash.c>
+#include <ccan/tdb2/check.c>
+#include <ccan/tap/tap.h>
+#include "logging.h"
+
+/* We rig the hash so adjacent-numbered records always clash. */
+static uint64_t clash(const void *key, size_t len, uint64_t seed, void *priv)
+{
+       return ((uint64_t)*(unsigned int *)key)
+               << (64 - TDB_TOPLEVEL_HASH_BITS - 1);
+}
+
+/* We use the same seed which we saw a failure on. */
+static uint64_t fixedhash(const void *key, size_t len, uint64_t seed, void *p)
+{
+       return hash64_stable((const unsigned char *)key, len,
+                            *(uint64_t *)p);
+}
+
+static bool store_records(struct tdb_context *tdb)
+{
+       int i;
+       struct tdb_data key = { (unsigned char *)&i, sizeof(i) };
+       struct tdb_data data = { (unsigned char *)&i, sizeof(i) };
+
+       for (i = 0; i < 1000; i++) {
+               if (tdb_store(tdb, key, data, TDB_REPLACE) != 0)
+                       return false;
+               if (tdb_fetch(tdb, key).dsize != data.dsize)
+                       return false;
+       }
+       return true;
+}
+
+static void test_val(struct tdb_context *tdb, uint64_t val)
+{
+       uint64_t v;
+       struct tdb_data key = { (unsigned char *)&v, sizeof(v) };
+       struct tdb_data data = { (unsigned char *)&v, sizeof(v) };
+
+       /* Insert an entry, then delete it. */
+       v = val;
+       /* Delete should fail. */
+       ok1(tdb_delete(tdb, key) == -1);
+       ok1(tdb_error(tdb) == TDB_ERR_NOEXIST);
+       ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+       /* Insert should succeed. */
+       ok1(tdb_store(tdb, key, data, TDB_INSERT) == 0);
+       ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+       /* Delete should succeed. */
+       ok1(tdb_delete(tdb, key) == 0);
+       ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+       /* Re-add it, then add collision. */
+       ok1(tdb_store(tdb, key, data, TDB_INSERT) == 0);
+       v = val + 1;
+       ok1(tdb_store(tdb, key, data, TDB_INSERT) == 0);
+       ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+       /* Can find both? */
+       ok1(tdb_fetch(tdb, key).dsize == data.dsize);
+       v = val;
+       ok1(tdb_fetch(tdb, key).dsize == data.dsize);
+
+       /* Delete second one. */
+       v = val + 1;
+       ok1(tdb_delete(tdb, key) == 0);
+       ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+       /* Re-add */
+       ok1(tdb_store(tdb, key, data, TDB_INSERT) == 0);
+       ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+       /* Now, try deleting first one. */
+       v = val;
+       ok1(tdb_delete(tdb, key) == 0);
+       ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+       /* Can still find second? */
+       v = val + 1;
+       ok1(tdb_fetch(tdb, key).dsize == data.dsize);
+
+       /* Now, this will be ideally placed. */
+       v = val + 2;
+       ok1(tdb_store(tdb, key, data, TDB_INSERT) == 0);
+       ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+       /* This will collide with both. */
+       v = val;
+       ok1(tdb_store(tdb, key, data, TDB_INSERT) == 0);
+
+       /* We can still find them all, right? */
+       ok1(tdb_fetch(tdb, key).dsize == data.dsize);
+       v = val + 1;
+       ok1(tdb_fetch(tdb, key).dsize == data.dsize);
+       v = val + 2;
+       ok1(tdb_fetch(tdb, key).dsize == data.dsize);
+
+       /* And if we delete val + 1, that val + 2 should not move! */
+       v = val + 1;
+       ok1(tdb_delete(tdb, key) == 0);
+       ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+       v = val;
+       ok1(tdb_fetch(tdb, key).dsize == data.dsize);
+       v = val + 2;
+       ok1(tdb_fetch(tdb, key).dsize == data.dsize);
+
+       /* Delete those two, so we are empty. */
+       ok1(tdb_delete(tdb, key) == 0);
+       v = val;
+       ok1(tdb_delete(tdb, key) == 0);
+
+       ok1(tdb_check(tdb, NULL, NULL) == 0);
+}
+
+int main(int argc, char *argv[])
+{
+       unsigned int i, j;
+       struct tdb_context *tdb;
+       uint64_t seed = 16014841315512641303ULL;
+       union tdb_attribute clash_hattr
+               = { .hash = { .base = { TDB_ATTRIBUTE_HASH },
+                             .hash_fn = clash } };
+       union tdb_attribute fixed_hattr
+               = { .hash = { .base = { TDB_ATTRIBUTE_HASH },
+                             .hash_fn = fixedhash,
+                             .hash_private = &seed } };
+       int flags[] = { TDB_INTERNAL, TDB_DEFAULT, TDB_NOMMAP,
+                       TDB_INTERNAL|TDB_CONVERT, TDB_CONVERT,
+                       TDB_NOMMAP|TDB_CONVERT };
+       /* These two values gave trouble before. */
+       int vals[] = { 755, 837 };
+
+       clash_hattr.base.next = &tap_log_attr;
+       fixed_hattr.base.next = &tap_log_attr;
+
+       plan_tests(sizeof(flags) / sizeof(flags[0])
+                  * (32 * 3 + 5 + sizeof(vals)/sizeof(vals[0])*2) + 1);
+       for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
+               tdb = tdb_open("run-13-delete.tdb", flags[i],
+                              O_RDWR|O_CREAT|O_TRUNC, 0600, &clash_hattr);
+               ok1(tdb);
+               if (!tdb)
+                       continue;
+
+               /* Check start of hash table. */
+               test_val(tdb, 0);
+
+               /* Check end of hash table. */
+               test_val(tdb, -1ULL);
+
+               /* Check mixed bitpattern. */
+               test_val(tdb, 0x123456789ABCDEF0ULL);
+
+               ok1(!tdb_has_locks(tdb));
+               tdb_close(tdb);
+
+               /* Deleting these entries in the db gave problems. */
+               tdb = tdb_open("run-13-delete.tdb", flags[i],
+                              O_RDWR|O_CREAT|O_TRUNC, 0600, &fixed_hattr);
+               ok1(tdb);
+               if (!tdb)
+                       continue;
+
+               ok1(store_records(tdb));
+               ok1(tdb_check(tdb, NULL, NULL) == 0);
+               for (j = 0; j < sizeof(vals)/sizeof(vals[0]); j++) {
+                       struct tdb_data key;
+
+                       key.dptr = (unsigned char *)&vals[j];
+                       key.dsize = sizeof(vals[j]);
+                       ok1(tdb_delete(tdb, key) == 0);
+                       ok1(tdb_check(tdb, NULL, NULL) == 0);
+               }
+               tdb_close(tdb);
+       }
+
+       ok1(tap_log_messages == 0);
+       return exit_status();
+}
diff --git a/ccan/tdb2/test/run-15-append.c b/ccan/tdb2/test/run-15-append.c
new file mode 100644 (file)
index 0000000..3f1fb5b
--- /dev/null
@@ -0,0 +1,100 @@
+#include <ccan/tdb2/tdb.c>
+#include <ccan/tdb2/free.c>
+#include <ccan/tdb2/lock.c>
+#include <ccan/tdb2/io.c>
+#include <ccan/tdb2/hash.c>
+#include <ccan/tdb2/check.c>
+#include <ccan/tap/tap.h>
+#include "logging.h"
+
+#define MAX_SIZE 13100
+#define SIZE_STEP 131
+
+int main(int argc, char *argv[])
+{
+       unsigned int i, j;
+       struct tdb_context *tdb;
+       unsigned char *buffer;
+       int flags[] = { TDB_INTERNAL, TDB_DEFAULT, TDB_NOMMAP,
+                       TDB_INTERNAL|TDB_CONVERT, TDB_CONVERT, 
+                       TDB_NOMMAP|TDB_CONVERT };
+       struct tdb_data key = { (unsigned char *)"key", 3 };
+       struct tdb_data data;
+
+       buffer = malloc(MAX_SIZE);
+       for (i = 0; i < MAX_SIZE; i++)
+               buffer[i] = i;
+
+       plan_tests(sizeof(flags) / sizeof(flags[0])
+                  * ((2 + MAX_SIZE/SIZE_STEP * 4) * 2 + 6)
+                  + 1);
+
+       /* Using tdb_store. */
+       for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
+               tdb = tdb_open("run-append.tdb", flags[i],
+                              O_RDWR|O_CREAT|O_TRUNC, 0600, &tap_log_attr);
+               ok1(tdb);
+               if (!tdb)
+                       continue;
+
+               for (j = 0; j < MAX_SIZE; j += SIZE_STEP) {
+                       data.dptr = buffer;
+                       data.dsize = j;
+                       ok1(tdb_store(tdb, key, data, TDB_REPLACE) == 0);
+                       ok1(tdb_check(tdb, NULL, NULL) == 0);
+                       data = tdb_fetch(tdb, key);
+                       ok1(data.dsize == j);
+                       ok1(memcmp(data.dptr, buffer, data.dsize) == 0);
+                       free(data.dptr);
+               }
+               ok1(!tdb_has_locks(tdb));
+               tdb_close(tdb);
+       }
+
+       /* Using tdb_append. */
+       for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
+               size_t prev_len = 0;
+               tdb = tdb_open("run-append.tdb", flags[i],
+                              O_RDWR|O_CREAT|O_TRUNC, 0600, &tap_log_attr);
+               ok1(tdb);
+               if (!tdb)
+                       continue;
+
+               for (j = 0; j < MAX_SIZE; j += SIZE_STEP) {
+                       data.dptr = buffer + prev_len;
+                       data.dsize = j - prev_len;
+                       ok1(tdb_append(tdb, key, data) == 0);
+                       ok1(tdb_check(tdb, NULL, NULL) == 0);
+                       data = tdb_fetch(tdb, key);
+                       ok1(data.dsize == j);
+                       ok1(memcmp(data.dptr, buffer, data.dsize) == 0);
+                       free(data.dptr);
+                       prev_len = data.dsize;
+               }
+               ok1(!tdb_has_locks(tdb));
+               tdb_close(tdb);
+       }
+
+       for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
+               tdb = tdb_open("run-append.tdb", flags[i],
+                              O_RDWR|O_CREAT|O_TRUNC, 0600, &tap_log_attr);
+               ok1(tdb);
+               if (!tdb)
+                       continue;
+
+               /* Huge initial store. */
+               data.dptr = buffer;
+               data.dsize = MAX_SIZE;
+               ok1(tdb_append(tdb, key, data) == 0);
+               ok1(tdb_check(tdb, NULL, NULL) == 0);
+               data = tdb_fetch(tdb, key);
+               ok1(data.dsize == MAX_SIZE);
+               ok1(memcmp(data.dptr, buffer, data.dsize) == 0);
+               free(data.dptr);
+               ok1(!tdb_has_locks(tdb));
+               tdb_close(tdb);
+       }
+
+       ok1(tap_log_messages == 0);
+       return exit_status();
+}
diff --git a/ccan/tdb2/test/run-20-growhash.c b/ccan/tdb2/test/run-20-growhash.c
new file mode 100644 (file)
index 0000000..6bcfd7d
--- /dev/null
@@ -0,0 +1,142 @@
+#include <ccan/tdb2/tdb.c>
+#include <ccan/tdb2/free.c>
+#include <ccan/tdb2/lock.c>
+#include <ccan/tdb2/io.c>
+#include <ccan/tdb2/hash.c>
+#include <ccan/tdb2/check.c>
+#include <ccan/tap/tap.h>
+#include "logging.h"
+
+static uint64_t myhash(const void *key, size_t len, uint64_t seed, void *priv)
+{
+       return *(uint64_t *)key;
+}
+
+static void add_bits(uint64_t *val, unsigned new, unsigned new_bits,
+                    unsigned *done)
+{
+       *done += new_bits;
+       *val |= ((uint64_t)new << (64 - *done));
+}
+
+static uint64_t make_key(unsigned topgroup, unsigned topbucket,
+                        unsigned subgroup1, unsigned subbucket1,
+                        unsigned subgroup2, unsigned subbucket2)
+{
+       uint64_t key = 0;
+       unsigned done = 0;
+
+       add_bits(&key, topgroup, TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS,
+                &done);
+       add_bits(&key, topbucket, TDB_HASH_GROUP_BITS, &done);
+       add_bits(&key, subgroup1, TDB_SUBLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS,
+                &done);
+       add_bits(&key, subbucket1, TDB_HASH_GROUP_BITS, &done);
+       add_bits(&key, subgroup2, TDB_SUBLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS,
+                &done);
+       add_bits(&key, subbucket2, TDB_HASH_GROUP_BITS, &done);
+       return key;
+}
+
+int main(int argc, char *argv[])
+{
+       unsigned int i, j;
+       struct tdb_context *tdb;
+       uint64_t kdata;
+       struct tdb_used_record rec;
+       struct tdb_data key = { (unsigned char *)&kdata, sizeof(kdata) };
+       struct tdb_data dbuf = { (unsigned char *)&kdata, sizeof(kdata) };
+       union tdb_attribute hattr = { .hash = { .base = { TDB_ATTRIBUTE_HASH },
+                                               .hash_fn = myhash } };
+       int flags[] = { TDB_INTERNAL, TDB_DEFAULT, TDB_NOMMAP,
+                       TDB_INTERNAL|TDB_CONVERT, TDB_CONVERT,
+                       TDB_NOMMAP|TDB_CONVERT,
+       };
+
+       hattr.base.next = &tap_log_attr;
+
+       plan_tests(sizeof(flags) / sizeof(flags[0])
+                  * (9 + (20 + 2 * ((1 << TDB_HASH_GROUP_BITS) - 2))
+                     * (1 << TDB_HASH_GROUP_BITS)) + 1);
+       for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
+               struct hash_info h;
+
+               tdb = tdb_open("run-04-basichash.tdb", flags[i],
+                              O_RDWR|O_CREAT|O_TRUNC, 0600, &hattr);
+               ok1(tdb);
+               if (!tdb)
+                       continue;
+
+               /* Fill a group. */
+               for (j = 0; j < (1 << TDB_HASH_GROUP_BITS); j++) {
+                       kdata = make_key(0, j, 0, 0, 0, 0);
+                       ok1(tdb_store(tdb, key, dbuf, TDB_INSERT) == 0);
+               }
+               ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+               /* Check first still exists. */
+               kdata = make_key(0, 0, 0, 0, 0, 0);
+               ok1(find_and_lock(tdb, key, F_RDLCK, &h, &rec) != 0);
+               /* Should have created correct hash. */
+               ok1(h.h == tdb_hash(tdb, key.dptr, key.dsize));
+               /* Should have located space in group 0, bucket 0. */
+               ok1(h.group_start == offsetof(struct tdb_header, hashtable));
+               ok1(h.home_bucket == 0);
+               ok1(h.found_bucket == 0);
+               ok1(h.hash_used == TDB_TOPLEVEL_HASH_BITS);
+               /* Entire group should be full! */
+               for (j = 0; j < (1 << TDB_HASH_GROUP_BITS); j++)
+                       ok1(h.group[j] != 0);
+
+               ok1(tdb_unlock_hashes(tdb, h.hlock_start, h.hlock_range,
+                                     F_RDLCK) == 0);
+
+               /* Now, add one more to each should expand (that) bucket. */
+               for (j = 0; j < (1 << TDB_HASH_GROUP_BITS); j++) {
+                       unsigned int k;
+                       kdata = make_key(0, j, 0, 1, 0, 0);
+                       ok1(tdb_store(tdb, key, dbuf, TDB_INSERT) == 0);
+                       ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+                       ok1(find_and_lock(tdb, key, F_RDLCK, &h, &rec) != 0);
+                       /* Should have created correct hash. */
+                       ok1(h.h == tdb_hash(tdb, key.dptr, key.dsize));
+                       /* Should have moved to subhash */
+                       ok1(h.group_start >= sizeof(struct tdb_header));
+                       ok1(h.home_bucket == 1);
+                       ok1(h.found_bucket == 1);
+                       ok1(h.hash_used == TDB_TOPLEVEL_HASH_BITS
+                           + TDB_SUBLEVEL_HASH_BITS);
+                       ok1(tdb_unlock_hashes(tdb, h.hlock_start, h.hlock_range,
+                                             F_RDLCK) == 0);
+
+                       /* Keep adding, make it expand again. */
+                       for (k = 2; k < (1 << TDB_HASH_GROUP_BITS); k++) {
+                               kdata = make_key(0, j, 0, k, 0, 0);
+                               ok1(tdb_store(tdb, key, dbuf, TDB_INSERT) == 0);
+                               ok1(tdb_check(tdb, NULL, NULL) == 0);
+                       }
+
+                       /* This should tip it over to sub-sub-hash. */
+                       kdata = make_key(0, j, 0, 0, 0, 1);
+                       ok1(tdb_store(tdb, key, dbuf, TDB_INSERT) == 0);
+                       ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+                       ok1(find_and_lock(tdb, key, F_RDLCK, &h, &rec) != 0);
+                       /* Should have created correct hash. */
+                       ok1(h.h == tdb_hash(tdb, key.dptr, key.dsize));
+                       /* Should have moved to subhash */
+                       ok1(h.group_start >= sizeof(struct tdb_header));
+                       ok1(h.home_bucket == 1);
+                       ok1(h.found_bucket == 1);
+                       ok1(h.hash_used == TDB_TOPLEVEL_HASH_BITS
+                           + TDB_SUBLEVEL_HASH_BITS + TDB_SUBLEVEL_HASH_BITS);
+                       ok1(tdb_unlock_hashes(tdb, h.hlock_start, h.hlock_range,
+                                             F_RDLCK) == 0);
+               }
+               tdb_close(tdb);
+       }
+
+       ok1(tap_log_messages == 0);
+       return exit_status();
+}
diff --git a/ccan/tdb2/test/run-append.c b/ccan/tdb2/test/run-append.c
deleted file mode 100644 (file)
index 269fc6b..0000000
+++ /dev/null
@@ -1,99 +0,0 @@
-#include <ccan/tdb2/tdb.c>
-#include <ccan/tdb2/free.c>
-#include <ccan/tdb2/lock.c>
-#include <ccan/tdb2/io.c>
-#include <ccan/tdb2/check.c>
-#include <ccan/tap/tap.h>
-#include "logging.h"
-
-#define MAX_SIZE 13100
-#define SIZE_STEP 131
-
-int main(int argc, char *argv[])
-{
-       unsigned int i, j;
-       struct tdb_context *tdb;
-       unsigned char *buffer;
-       int flags[] = { TDB_INTERNAL, TDB_DEFAULT, TDB_NOMMAP,
-                       TDB_INTERNAL|TDB_CONVERT, TDB_CONVERT, 
-                       TDB_NOMMAP|TDB_CONVERT };
-       struct tdb_data key = { (unsigned char *)"key", 3 };
-       struct tdb_data data;
-
-       buffer = malloc(MAX_SIZE);
-       for (i = 0; i < MAX_SIZE; i++)
-               buffer[i] = i;
-
-       plan_tests(sizeof(flags) / sizeof(flags[0])
-                  * ((2 + MAX_SIZE/SIZE_STEP * 4) * 2 + 6)
-                  + 1);
-
-       /* Using tdb_store. */
-       for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
-               tdb = tdb_open("run-append.tdb", flags[i],
-                              O_RDWR|O_CREAT|O_TRUNC, 0600, &tap_log_attr);
-               ok1(tdb);
-               if (!tdb)
-                       continue;
-
-               for (j = 0; j < MAX_SIZE; j += SIZE_STEP) {
-                       data.dptr = buffer;
-                       data.dsize = j;
-                       ok1(tdb_store(tdb, key, data, TDB_REPLACE) == 0);
-                       ok1(tdb_check(tdb, NULL, NULL) == 0);
-                       data = tdb_fetch(tdb, key);
-                       ok1(data.dsize == j);
-                       ok1(memcmp(data.dptr, buffer, data.dsize) == 0);
-                       free(data.dptr);
-               }
-               ok1(!tdb_has_locks(tdb));
-               tdb_close(tdb);
-       }
-
-       /* Using tdb_append. */
-       for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
-               size_t prev_len = 0;
-               tdb = tdb_open("run-append.tdb", flags[i],
-                              O_RDWR|O_CREAT|O_TRUNC, 0600, &tap_log_attr);
-               ok1(tdb);
-               if (!tdb)
-                       continue;
-
-               for (j = 0; j < MAX_SIZE; j += SIZE_STEP) {
-                       data.dptr = buffer + prev_len;
-                       data.dsize = j - prev_len;
-                       ok1(tdb_append(tdb, key, data) == 0);
-                       ok1(tdb_check(tdb, NULL, NULL) == 0);
-                       data = tdb_fetch(tdb, key);
-                       ok1(data.dsize == j);
-                       ok1(memcmp(data.dptr, buffer, data.dsize) == 0);
-                       free(data.dptr);
-                       prev_len = data.dsize;
-               }
-               ok1(!tdb_has_locks(tdb));
-               tdb_close(tdb);
-       }
-
-       for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
-               tdb = tdb_open("run-append.tdb", flags[i],
-                              O_RDWR|O_CREAT|O_TRUNC, 0600, &tap_log_attr);
-               ok1(tdb);
-               if (!tdb)
-                       continue;
-
-               /* Huge initial store. */
-               data.dptr = buffer;
-               data.dsize = MAX_SIZE;
-               ok1(tdb_append(tdb, key, data) == 0);
-               ok1(tdb_check(tdb, NULL, NULL) == 0);
-               data = tdb_fetch(tdb, key);
-               ok1(data.dsize == MAX_SIZE);
-               ok1(memcmp(data.dptr, buffer, data.dsize) == 0);
-               free(data.dptr);
-               ok1(!tdb_has_locks(tdb));
-               tdb_close(tdb);
-       }
-
-       ok1(tap_log_messages == 0);
-       return exit_status();
-}
diff --git a/ccan/tdb2/test/run-delete.c b/ccan/tdb2/test/run-delete.c
deleted file mode 100644 (file)
index cf5e47b..0000000
+++ /dev/null
@@ -1,131 +0,0 @@
-#include <ccan/tdb2/tdb.c>
-#include <ccan/tdb2/free.c>
-#include <ccan/tdb2/lock.c>
-#include <ccan/tdb2/io.c>
-#include <ccan/tdb2/check.c>
-#include <ccan/tap/tap.h>
-#include "logging.h"
-
-/* We rig the hash so adjacent-numbered records always clash. */
-static uint64_t clash(const void *key, size_t len, uint64_t seed, void *priv)
-{
-       return *(unsigned int *)key / 2;
-}
-
-static void test_val(struct tdb_context *tdb, unsigned int val)
-{
-       unsigned int v;
-       struct tdb_data key = { (unsigned char *)&v, sizeof(v) };
-       struct tdb_data data = { (unsigned char *)&v, sizeof(v) };
-
-       /* Insert an entry, then delete it. */
-       v = val;
-       /* Delete should fail. */
-       ok1(tdb_delete(tdb, key) == -1);
-       ok1(tdb_error(tdb) == TDB_ERR_NOEXIST);
-       ok1(tdb_check(tdb, NULL, NULL) == 0);
-
-       /* Insert should succeed. */
-       ok1(tdb_store(tdb, key, data, TDB_INSERT) == 0);
-       ok1(tdb_check(tdb, NULL, NULL) == 0);
-
-       /* Delete should succeed. */
-       ok1(tdb_delete(tdb, key) == 0);
-       ok1(tdb_check(tdb, NULL, NULL) == 0);
-
-       /* Re-add it, then add collision. */
-       ok1(tdb_store(tdb, key, data, TDB_INSERT) == 0);
-       v = val + 1;
-       ok1(tdb_store(tdb, key, data, TDB_INSERT) == 0);
-       ok1(tdb_check(tdb, NULL, NULL) == 0);
-
-       /* Can find both? */
-       ok1(tdb_fetch(tdb, key).dsize == data.dsize);
-       v = val;
-       ok1(tdb_fetch(tdb, key).dsize == data.dsize);
-
-       /* Delete second one. */
-       v = val + 1;
-       ok1(tdb_delete(tdb, key) == 0);
-       ok1(tdb_check(tdb, NULL, NULL) == 0);
-
-       /* Re-add */
-       ok1(tdb_store(tdb, key, data, TDB_INSERT) == 0);
-       ok1(tdb_check(tdb, NULL, NULL) == 0);
-
-       /* Now, try deleting first one. */
-       v = val;
-       ok1(tdb_delete(tdb, key) == 0);
-       ok1(tdb_check(tdb, NULL, NULL) == 0);
-
-       /* Can still find second? */
-       v = val + 1;
-       ok1(tdb_fetch(tdb, key).dsize == data.dsize);
-
-       /* Now, this will be ideally placed. */
-       v = val + 2;
-       ok1(tdb_store(tdb, key, data, TDB_INSERT) == 0);
-       ok1(tdb_check(tdb, NULL, NULL) == 0);
-
-       /* This will collide with both. */
-       v = val;
-       ok1(tdb_store(tdb, key, data, TDB_INSERT) == 0);
-
-       /* We can still find them all, right? */
-       ok1(tdb_fetch(tdb, key).dsize == data.dsize);
-       v = val + 1;
-       ok1(tdb_fetch(tdb, key).dsize == data.dsize);
-       v = val + 2;
-       ok1(tdb_fetch(tdb, key).dsize == data.dsize);
-
-       /* And if we delete val + 1, that val + 2 should not move! */
-       v = val + 1;
-       ok1(tdb_delete(tdb, key) == 0);
-       ok1(tdb_check(tdb, NULL, NULL) == 0);
-
-       v = val;
-       ok1(tdb_fetch(tdb, key).dsize == data.dsize);
-       v = val + 2;
-       ok1(tdb_fetch(tdb, key).dsize == data.dsize);
-
-       /* Delete those two, so we are empty. */
-       ok1(tdb_delete(tdb, key) == 0);
-       v = val;
-       ok1(tdb_delete(tdb, key) == 0);
-
-       ok1(tdb_check(tdb, NULL, NULL) == 0);
-}
-
-int main(int argc, char *argv[])
-{
-       unsigned int i;
-       struct tdb_context *tdb;
-       union tdb_attribute hattr = { .hash = { .base = { TDB_ATTRIBUTE_HASH },
-                                               .hash_fn = clash } };
-       int flags[] = { TDB_INTERNAL, TDB_DEFAULT, TDB_NOMMAP,
-                       TDB_INTERNAL|TDB_CONVERT, TDB_CONVERT,
-                       TDB_NOMMAP|TDB_CONVERT };
-
-       hattr.base.next = &tap_log_attr;
-
-       plan_tests(sizeof(flags) / sizeof(flags[0]) * 66 + 1);
-       for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
-               tdb = tdb_open("run-delete.tdb", flags[i],
-                              O_RDWR|O_CREAT|O_TRUNC, 0600, &hattr);
-               ok1(tdb);
-               if (!tdb)
-                       continue;
-
-               /* Check start of hash table. */
-               test_val(tdb, 0);
-
-               /* Check end of hash table (will wrap around!). */
-               test_val(tdb, ((1 << tdb->header.v.hash_bits) - 1) * 2);
-
-               ok1(!tdb_has_locks(tdb));
-               tdb_close(tdb);
-       }
-
-       ok1(tap_log_messages == 0);
-       return exit_status();
-}
diff --git a/ccan/tdb2/test/run-enlarge_hash.c b/ccan/tdb2/test/run-enlarge_hash.c
deleted file mode 100644 (file)
index 2829908..0000000
+++ /dev/null
@@ -1,69 +0,0 @@
-#include <ccan/tdb2/tdb.c>
-#include <ccan/tdb2/free.c>
-#include <ccan/tdb2/lock.c>
-#include <ccan/tdb2/io.c>
-#include <ccan/tdb2/check.c>
-#include <ccan/tap/tap.h>
-#include "logging.h"
-
-/* We rig the hash so adjacent-numbered records always clash. */
-static uint64_t clash(const void *key, size_t len, uint64_t seed, void *priv)
-{
-       return *(unsigned int *)key / 2;
-}
-
-int main(int argc, char *argv[])
-{
-       unsigned int i;
-       struct tdb_context *tdb;
-       unsigned int v;
-       struct tdb_data key = { (unsigned char *)&v, sizeof(v) };
-       struct tdb_data data = { (unsigned char *)&v, sizeof(v) };
-       union tdb_attribute hattr = { .hash = { .base = { TDB_ATTRIBUTE_HASH },
-                                               .hash_fn = clash } };
-       int flags[] = { TDB_INTERNAL, TDB_DEFAULT, TDB_NOMMAP,
-                       TDB_INTERNAL|TDB_CONVERT, TDB_CONVERT, 
-                       TDB_NOMMAP|TDB_CONVERT };
-
-       hattr.base.next = &tap_log_attr;
-
-       plan_tests(sizeof(flags) / sizeof(flags[0]) * 11 + 1);
-       for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
-               tdb = tdb_open("run-enlarge-hash.tdb", flags[i],
-                              O_RDWR|O_CREAT|O_TRUNC, 0600, &hattr);
-               ok1(tdb);
-               if (!tdb)
-                       continue;
-
-               /* Put a single entry in. */
-               v = 0;
-               ok1(tdb_store(tdb, key, data, TDB_INSERT) == 0);
-               enlarge_hash(tdb);
-               ok1(tdb_check(tdb, NULL, NULL) == 0);
-
-               /* Put a non-clashing entry in. */
-               v = 2;
-               ok1(tdb_store(tdb, key, data, TDB_INSERT) == 0);
-               enlarge_hash(tdb);
-               ok1(tdb_check(tdb, NULL, NULL) == 0);
-
-               /* Now, make a clash. */
-               v = 1;
-               ok1(tdb_store(tdb, key, data, TDB_INSERT) == 0);
-               enlarge_hash(tdb);
-               ok1(tdb_check(tdb, NULL, NULL) == 0);
-
-               /* Clash at end. */
-               v = ((1 << tdb->header.v.hash_bits) - 1) * 2;
-               ok1(tdb_store(tdb, key, data, TDB_INSERT) == 0);
-               v++;
-               ok1(tdb_store(tdb, key, data, TDB_INSERT) == 0);
-               ok1(tdb_check(tdb, NULL, NULL) == 0);
-               enlarge_hash(tdb);
-               ok1(tdb_check(tdb, NULL, NULL) == 0);
-
-               tdb_close(tdb);
-       }
-       ok1(tap_log_messages == 0);
-       return exit_status();
-}
diff --git a/ccan/tdb2/test/run-hashclash.c b/ccan/tdb2/test/run-hashclash.c
deleted file mode 100644 (file)
index fd78b4e..0000000
+++ /dev/null
@@ -1,77 +0,0 @@
-#include <ccan/tdb2/tdb.c>
-#include <ccan/tdb2/free.c>
-#include <ccan/tdb2/lock.c>
-#include <ccan/tdb2/io.c>
-#include <ccan/tdb2/check.c>
-#include <ccan/tap/tap.h>
-#include "logging.h"
-
-/* We rig the hash so adjacent-numbered records always clash. */
-static uint64_t clash(const void *key, size_t len, uint64_t seed, void *priv)
-{
-       return *(unsigned int *)key / 2;
-}
-
-static void test_val(struct tdb_context *tdb, unsigned int val)
-{
-       unsigned int v;
-       struct tdb_data key = { (unsigned char *)&v, sizeof(v) };
-       struct tdb_data data = { (unsigned char *)&v, sizeof(v) };
-
-       /* Insert two entries, with the same hash. */
-       v = val;
-       ok1(tdb_store(tdb, key, data, TDB_INSERT) == 0);
-       v = val + 1;
-       ok1(tdb_store(tdb, key, data, TDB_INSERT) == 0);
-       ok1(tdb_check(tdb, NULL, NULL) == 0);
-
-       /* Can find both? */
-       v = val;
-       ok1(tdb_fetch(tdb, key).dsize == data.dsize);
-       v = val + 1;
-       ok1(tdb_fetch(tdb, key).dsize == data.dsize);
-}
-
-int main(int argc, char *argv[])
-{
-       unsigned int i;
-       struct tdb_context *tdb;
-       union tdb_attribute hattr = { .hash = { .base = { TDB_ATTRIBUTE_HASH },
-                                               .hash_fn = clash } };
-       int flags[] = { TDB_INTERNAL, TDB_DEFAULT, TDB_NOMMAP,
-                       TDB_INTERNAL|TDB_CONVERT, TDB_CONVERT,
-                       TDB_NOMMAP|TDB_CONVERT,
-       };
-
-       hattr.base.next = &tap_log_attr;
-
-       plan_tests(sizeof(flags) / sizeof(flags[0]) * 14 + 1);
-       for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
-               tdb = tdb_open("run-hashclash.tdb", flags[i],
-                              O_RDWR|O_CREAT|O_TRUNC, 0600, &hattr);
-               ok1(tdb);
-               if (!tdb)
-                       continue;
-
-               /* Check start of hash table. */
-               test_val(tdb, 0);
-
-               ok1(!tdb_has_locks(tdb));
-               tdb_close(tdb);
-
-               tdb = tdb_open("run-hashclash.tdb", flags[i],
-                              O_RDWR|O_CREAT|O_TRUNC, 0600, &hattr);
-               ok1(tdb);
-               if (!tdb)
-                       continue;
-
-               /* Check end of hash table (will wrap around!). */
-               test_val(tdb, ((1 << tdb->header.v.hash_bits) - 1) * 2);
-
-               ok1(!tdb_has_locks(tdb));
-               tdb_close(tdb);
-       }
-
-       ok1(tap_log_messages == 0);
-       return exit_status();
-}
index 2811b7556e7f2b0983c9b5d04e38a79e164036e9..bd5127dda2565d3a792540dec8fd841a2a411eb2 100644 (file)
@@ -3,6 +3,7 @@
 #include <ccan/tdb2/free.c>
 #include <ccan/tdb2/lock.c>
 #include <ccan/tdb2/io.c>
+#include <ccan/tdb2/hash.c>
 #include <ccan/tdb2/check.c>
 #include <ccan/tdb2/traverse.c>
 #include <ccan/tap/tap.h>
index 109a2ed3ddc6343178298e5c27192ebe5a7cba94..809b8d7730b20beec5b3f0970f395adaef936c42 100644 (file)
@@ -2,6 +2,7 @@
 #include <ccan/tdb2/free.c>
 #include <ccan/tdb2/lock.c>
 #include <ccan/tdb2/io.c>
+#include <ccan/tdb2/hash.c>
 #include <ccan/tdb2/check.c>
 #include <ccan/tap/tap.h>
 #include "logging.h"
index 6cdcb263b8a89c6239d1c4d0f47410cf31ff906a..d00bcbb90c59d0e72b31b73c976c00a5f343e1ee 100644 (file)
@@ -2,6 +2,7 @@
 #include <ccan/tdb2/free.c>
 #include <ccan/tdb2/lock.c>
 #include <ccan/tdb2/io.c>
+#include <ccan/tdb2/hash.c>
 #include <ccan/tdb2/check.c>
 #include <ccan/tap/tap.h>
 #include "logging.h"
diff --git a/ccan/tdb2/test/run-simple-fetch.c b/ccan/tdb2/test/run-simple-fetch.c
deleted file mode 100644 (file)
index acb0196..0000000
+++ /dev/null
@@ -1,44 +0,0 @@
-#include <ccan/tdb2/tdb.c>
-#include <ccan/tdb2/free.c>
-#include <ccan/tdb2/lock.c>
-#include <ccan/tdb2/io.c>
-#include <ccan/tdb2/check.c>
-#include <ccan/tap/tap.h>
-#include "logging.h"
-
-int main(int argc, char *argv[])
-{
-       unsigned int i;
-       struct tdb_context *tdb;
-       int flags[] = { TDB_INTERNAL, TDB_DEFAULT, TDB_NOMMAP,
-                       TDB_INTERNAL|TDB_CONVERT, TDB_CONVERT, 
-                       TDB_NOMMAP|TDB_CONVERT };
-       struct tdb_data key = { (unsigned char *)"key", 3 };
-       struct tdb_data data = { (unsigned char *)"data", 4 };
-
-       plan_tests(sizeof(flags) / sizeof(flags[0]) * 8 + 1);
-       for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
-               tdb = tdb_open("run-simple-fetch.tdb", flags[i],
-                              O_RDWR|O_CREAT|O_TRUNC, 0600, &tap_log_attr);
-               ok1(tdb);
-               if (tdb) {
-                       struct tdb_data d;
-
-                       /* fetch should fail. */
-                       d = tdb_fetch(tdb, key);
-                       ok1(d.dptr == NULL);
-                       ok1(tdb_error(tdb) == TDB_ERR_NOEXIST);
-                       ok1(tdb_check(tdb, NULL, NULL) == 0);
-                       /* Insert should succeed. */
-                       ok1(tdb_store(tdb, key, data, TDB_INSERT) == 0);
-                       ok1(tdb_check(tdb, NULL, NULL) == 0);
-                       /* Fetch should now work. */
-                       d = tdb_fetch(tdb, key);
-                       ok1(data_equal(d, data));
-                       ok1(tdb_check(tdb, NULL, NULL) == 0);
-                       tdb_close(tdb);
-               }
-       }
-       ok1(tap_log_messages == 0);
-       return exit_status();
-}
diff --git a/ccan/tdb2/test/run-simple-store.c b/ccan/tdb2/test/run-simple-store.c
deleted file mode 100644 (file)
index ba6f97a..0000000
+++ /dev/null
@@ -1,41 +0,0 @@
-#include <ccan/tdb2/tdb.c>
-#include <ccan/tdb2/free.c>
-#include <ccan/tdb2/lock.c>
-#include <ccan/tdb2/io.c>
-#include <ccan/tdb2/check.c>
-#include <ccan/tap/tap.h>
-#include "logging.h"
-
-int main(int argc, char *argv[])
-{
-       unsigned int i;
-       struct tdb_context *tdb;
-       int flags[] = { TDB_INTERNAL, TDB_DEFAULT, TDB_NOMMAP,
-                       TDB_INTERNAL|TDB_CONVERT, TDB_CONVERT, 
-                       TDB_NOMMAP|TDB_CONVERT };
-       struct tdb_data key = { (unsigned char *)"key", 3 };
-       struct tdb_data data = { (unsigned char *)"data", 4 };
-
-       plan_tests(sizeof(flags) / sizeof(flags[0]) * 9 + 1);
-       for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
-               tdb = tdb_open("run-simple-store.tdb", flags[i],
-                              O_RDWR|O_CREAT|O_TRUNC, 0600, &tap_log_attr);
-               ok1(tdb);
-               if (tdb) {
-                       /* Modify should fail. */
-                       ok1(tdb_store(tdb, key, data, TDB_MODIFY) == -1);
-                       ok1(tdb_error(tdb) == TDB_ERR_NOEXIST);
-                       ok1(tdb_check(tdb, NULL, NULL) == 0);
-                       /* Insert should succeed. */
-                       ok1(tdb_store(tdb, key, data, TDB_INSERT) == 0);
-                       ok1(tdb_check(tdb, NULL, NULL) == 0);
-                       /* Second insert should fail. */
-                       ok1(tdb_store(tdb, key, data, TDB_INSERT) == -1);
-                       ok1(tdb_error(tdb) == TDB_ERR_EXISTS);
-                       ok1(tdb_check(tdb, NULL, NULL) == 0);
-                       tdb_close(tdb);
-               }
-       }
-       ok1(tap_log_messages == 0);
-       return exit_status();
-}
index 51d84a0a7afaa6a2554069e379e658995d53a77e..97694a842b2ccd1522d28a8f48fb44f36c1deb8b 100644 (file)
@@ -2,6 +2,7 @@
 #include <ccan/tdb2/free.c>
 #include <ccan/tdb2/lock.c>
 #include <ccan/tdb2/io.c>
+#include <ccan/tdb2/hash.c>
 #include <ccan/tdb2/check.c>
 #include <ccan/tdb2/traverse.c>
 #include <ccan/tap/tap.h>
@@ -54,6 +55,7 @@ static int trav(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, void *p)
                td->high = val;
 
        if (td->delete) {
+
                if (tdb_delete(tdb, key) != 0) {
                        td->delete_error = tdb_error(tdb);
                        return -1;
@@ -208,7 +210,7 @@ int main(int argc, char *argv[])
                ok1(td.low == td.high);
                ok1(tdb_check(tdb, NULL, NULL) == 0);
 
-               /* Deleting traverse. */
+               /* Deleting traverse (delete everything). */
                td.calls = 0;
                td.call_limit = UINT_MAX;
                td.low = INT_MAX;
@@ -222,7 +224,7 @@ int main(int argc, char *argv[])
                ok1(!td.mismatch);
                ok1(td.calls == NUM_RECORDS);
                ok1(td.low == 0);
-               ok1(td.high == NUM_RECORDS-1);
+               ok1(td.high == NUM_RECORDS - 1);
                ok1(tdb_check(tdb, NULL, NULL) == 0);
 
                /* Now it's empty! */
index d30c24cdde5cc837dd849a4eec2662dc2ef75dee..66319dd56764d823dfca54544cfd2ee0d22aea03 100644 (file)
 static int64_t traverse(struct tdb_context *tdb, int ltype,
                        tdb_traverse_func fn, void *p)
 {
-       uint64_t i, num, count = 0;
-       tdb_off_t off, prev_bucket;
-       struct tdb_used_record rec;
+       int ret;
+       struct traverse_info tinfo;
        struct tdb_data k, d;
-       bool finish = false;
+       int64_t count = 0;
 
-       /* FIXME: Do we need to start at 0? */
-       prev_bucket = tdb_lock_list(tdb, 0, ltype, TDB_LOCK_WAIT);
-       if (prev_bucket != 0)
-               return -1;
-
-       num = (1ULL << tdb->header.v.hash_bits);
-
-       for (i = tdb_find_nonzero_off(tdb, hash_off(tdb, 0), num);
-            i != num && !finish;
-            i += tdb_find_nonzero_off(tdb, hash_off(tdb, i), num - i)) {
-               if (tdb_lock_list(tdb, i, ltype, TDB_LOCK_WAIT) != i)
-                       goto fail;
-
-               off = tdb_read_off(tdb, hash_off(tdb, i));
-               if (off == TDB_OFF_ERR) {
-                       tdb_unlock_list(tdb, i, ltype);
-                       goto fail;
-               }
-
-               /* This race can happen, but look again. */
-               if (off == 0) {
-                       tdb_unlock_list(tdb, i, ltype);
-                       continue;
-               }
-
-               /* Drop previous lock. */
-               tdb_unlock_list(tdb, prev_bucket, ltype);
-               prev_bucket = i;
-
-               if (tdb_read_convert(tdb, off, &rec, sizeof(rec)) != 0)
-                       goto fail;
-
-               k.dsize = rec_key_length(&rec);
-               d.dsize = rec_data_length(&rec);
-               if (ltype == F_RDLCK) {
-                       /* Read traverses can keep the lock. */
-                       k.dptr = (void *)tdb_access_read(tdb,
-                                                        off + sizeof(rec),
-                                                        k.dsize + d.dsize,
-                                                        false);
-               } else {
-                       k.dptr = tdb_alloc_read(tdb, off + sizeof(rec),
-                                               k.dsize + d.dsize);
-               }
-               if (!k.dptr)
-                       goto fail;
+       for (ret = first_in_hash(tdb, ltype, &tinfo, &k, &d.dsize);
+            ret == 1;
+            ret = next_in_hash(tdb, ltype, &tinfo, &k, &d.dsize)) {
                d.dptr = k.dptr + k.dsize;
+               
                count++;
-
-               if (ltype == F_WRLCK) {
-                       /* Drop lock before calling out. */
-                       tdb_unlock_list(tdb, i, ltype);
-               }
-
                if (fn && fn(tdb, k, d, p))
-                       finish = true;
-
-               if (ltype == F_WRLCK) {
-                       free(k.dptr);
-                       /* Regain lock.  FIXME: Is this necessary? */
-                       if (tdb_lock_list(tdb, i, ltype, TDB_LOCK_WAIT) != i)
-                               return -1;
-
-                       /* This makes deleting under ourselves a bit nicer. */
-                       if (tdb_read_off(tdb, hash_off(tdb, i)) == off)
-                               i++;
-               } else {
-                       tdb_access_release(tdb, k.dptr);
-                       i++;
-               }
+                       break;
        }
 
-       /* Drop final lock. */
-       tdb_unlock_list(tdb, prev_bucket, ltype);
+       if (ret < 0)
+               return -1;
        return count;
-
-fail:
-       tdb_unlock_list(tdb, prev_bucket, ltype);
-       return -1;
 }
 
 int64_t tdb_traverse(struct tdb_context *tdb, tdb_traverse_func fn, void *p)