tdb2: now we can store a key!
authorRusty Russell <rusty@rustcorp.com.au>
Thu, 26 Aug 2010 10:27:56 +0000 (19:57 +0930)
committerRusty Russell <rusty@rustcorp.com.au>
Thu, 26 Aug 2010 10:27:56 +0000 (19:57 +0930)
ccan/tdb2/free.c
ccan/tdb2/lock.c
ccan/tdb2/private.h
ccan/tdb2/tdb.c
ccan/tdb2/test/run-encode.c
ccan/tdb2/test/run-simple-store.c [new file with mode: 0644]

index e486dcb7ddc2feaf825963a5c2fd30c06dd78808..134c1ff025bd1263e4cabdfa3147cbf442f1e6dd 100644 (file)
@@ -130,7 +130,7 @@ tdb_off_t zone_of(struct tdb_context *tdb, tdb_off_t off)
        return off >> tdb->header.v.zone_bits;
 }
 
-/* Returns fl->max_bucket + 1, or list number to search. */
+/* Returns free_buckets + 1, or list number to search. */
 static tdb_off_t find_free_head(struct tdb_context *tdb, tdb_off_t bucket)
 {
        tdb_off_t first, off;
@@ -138,7 +138,7 @@ static tdb_off_t find_free_head(struct tdb_context *tdb, tdb_off_t bucket)
        /* Speculatively search for a non-zero bucket. */
        first = tdb->last_zone * (tdb->header.v.free_buckets+1) + bucket;
        off = tdb_find_nonzero_off(tdb, free_list_off(tdb, first),
-                                  tdb->header.v.free_buckets - bucket);
+                                  tdb->header.v.free_buckets + 1 - bucket);
        return bucket + off;
 }
 
@@ -210,7 +210,7 @@ int add_free_record(struct tdb_context *tdb,
        tdb->last_zone = zone_of(tdb, off);
        list = tdb->last_zone * (tdb->header.v.free_buckets+1)
                + size_to_bucket(tdb, new.data_len);
-               
+
        if (tdb_lock_free_list(tdb, list, TDB_LOCK_WAIT) != 0)
                return -1;
 
@@ -319,7 +319,7 @@ static tdb_off_t lock_and_alloc(struct tdb_context *tdb,
                                tdb_len_t *actual)
 {
        tdb_off_t list;
-       tdb_off_t off, prev, best_off;
+       tdb_off_t off, best_off;
        struct tdb_free_record pad, best = { 0 }, *r;
        double multiplier;
 
@@ -331,27 +331,21 @@ again:
                return TDB_OFF_ERR;
        }
 
-       prev = free_list_off(tdb, list);
-       off = tdb_read_off(tdb, prev);
-
-       if (unlikely(off == TDB_OFF_ERR))
-               goto unlock_err;
-
        best.data_len = -1ULL;
        best_off = 0;
        multiplier = 1.0;
 
        /* Walk the list to see if any are large enough, getting less fussy
         * as we go. */
-       while (off) {
-               prev = off;
-               off = tdb_read_off(tdb, prev);
-               if (unlikely(off == TDB_OFF_ERR))
-                       goto unlock_err;
+       off = tdb_read_off(tdb, free_list_off(tdb, list));
+       if (unlikely(off == TDB_OFF_ERR))
+               goto unlock_err;
 
+       while (off) {
                r = tdb_get(tdb, off, &pad, sizeof(*r));
                if (!r)
                        goto unlock_err;
+
                if (r->magic != TDB_FREE_MAGIC) {
                        tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
                                 "lock_and_alloc: %llu non-free 0x%llx\n",
@@ -364,18 +358,9 @@ again:
                        best = *r;
                }
 
-               if (best.data_len < size * multiplier && best_off) {
-                       /* We're happy with this size: take it. */
-                       if (remove_from_list(tdb, list, &best) != 0)
-                               goto unlock_err;
-                       tdb_unlock_free_list(tdb, list);
+               if (best.data_len < size * multiplier && best_off)
+                       goto use_best;
 
-                       if (to_used_record(tdb, best_off, size, best.data_len,
-                                          actual)) {
-                               return -1;
-                       }
-                       return best_off;
-               }
                multiplier *= 1.01;
 
                /* Since we're going slow anyway, try coalescing here. */
@@ -387,6 +372,22 @@ again:
                        /* This has unlocked list, restart. */
                        goto again;
                }
+               off = r->next;
+       }
+
+       /* If we found anything at all, use it. */
+       if (best_off) {
+       use_best:
+               /* We're happy with this size: take it. */
+               if (remove_from_list(tdb, list, &best) != 0)
+                       goto unlock_err;
+               tdb_unlock_free_list(tdb, list);
+
+               if (to_used_record(tdb, best_off, size, best.data_len,
+                                  actual)) {
+                       return -1;
+               }
+               return best_off;
        }
 
        tdb_unlock_free_list(tdb, list);
@@ -440,11 +441,11 @@ static tdb_off_t get_free(struct tdb_context *tdb, size_t size,
                tdb_off_t b;
 
                /* Start at exact size bucket, and search up... */
-               for (b = bucket; b <= tdb->header.v.num_zones; b++) {
+               for (b = bucket; b <= tdb->header.v.free_buckets; b++) {
                        b = find_free_head(tdb, b);
 
                        /* Non-empty list?  Try getting block. */
-                       if (b <= tdb->header.v.num_zones) {
+                       if (b <= tdb->header.v.free_buckets) {
                                /* Try getting one from list. */
                                off = lock_and_alloc(tdb, b, size, actual);
                                if (off == TDB_OFF_ERR)
@@ -698,7 +699,6 @@ int tdb_expand(struct tdb_context *tdb, tdb_len_t klen, tdb_len_t dlen,
                }
        }
 
-
        /* Free up the old free buckets. */
        old_free_off -= sizeof(fhdr);
        if (tdb_read_convert(tdb, old_free_off, &fhdr, sizeof(fhdr)) == -1)
@@ -714,6 +714,8 @@ int tdb_expand(struct tdb_context *tdb, tdb_len_t klen, tdb_len_t dlen,
        /* Start allocating from where the new space is. */
        tdb->last_zone = zone_of(tdb, tdb->map_size - add);
        tdb_access_release(tdb, oldf);
+       if (write_header(tdb) == -1)
+               goto fail;
 success:
        tdb_allrecord_unlock(tdb, F_WRLCK);
        return 0;
index 96815f2686bf562bdbc4874b208c39204977a352..3230b25e383d3b2a3ead9a1ac7ca156b4030f303 100644 (file)
@@ -502,7 +502,7 @@ again:
        tdb->allrecord_lock.off = upgradable;
 
        /* Now we re-check header, holding lock. */
-       if (unlikely(update_header(tdb))) {
+       if (unlikely(header_changed(tdb))) {
                tdb_allrecord_unlock(tdb, ltype);
                goto again;
        }
@@ -625,13 +625,18 @@ int tdb_unlockall_read(struct tdb_context *tdb)
 }
 #endif
 
-int tdb_lock_list(struct tdb_context *tdb, tdb_off_t list,
-                 int ltype, enum tdb_lock_flags waitflag)
+/* Returns the list we actually locked. */
+tdb_off_t tdb_lock_list(struct tdb_context *tdb, uint64_t hash,
+                       int ltype, enum tdb_lock_flags waitflag)
 {
+       tdb_off_t list = hash & ((1ULL << tdb->header.v.hash_bits) - 1);
+       /* Header can change ONLY if we had no locks before. */
+       bool can_change = tdb->num_lockrecs == 0;
+
        /* a allrecord lock allows us to avoid per chain locks */
        if (tdb->allrecord_lock.count &&
            (ltype == tdb->allrecord_lock.ltype || ltype == F_RDLCK)) {
-               return 0;
+               return list;
        }
 
        if (tdb->allrecord_lock.count) {
@@ -640,11 +645,22 @@ int tdb_lock_list(struct tdb_context *tdb, tdb_off_t list,
                         "tdb_lock_list: have %s allrecordlock\n",
                         tdb->allrecord_lock.ltype == F_RDLCK
                         ? "read" : "write");
-               return -1;
+               return TDB_OFF_ERR;
        }
 
-       /* FIXME: Should we do header_uptodate and return retry here? */
-       return tdb_nest_lock(tdb, TDB_HASH_LOCK_START + list, ltype, waitflag);
+again:
+       if (tdb_nest_lock(tdb, TDB_HASH_LOCK_START + list, ltype, waitflag))
+               return TDB_OFF_ERR;
+
+       if (can_change && unlikely(header_changed(tdb))) {
+               tdb_off_t new = hash & ((1ULL << tdb->header.v.hash_bits) - 1);
+               if (new != list) {
+                       tdb_nest_unlock(tdb, TDB_HASH_LOCK_START+list, ltype);
+                       list = new;
+                       goto again;
+               }
+       }
+       return list;
 }
 
 int tdb_unlock_list(struct tdb_context *tdb, tdb_off_t list, int ltype)
@@ -702,23 +718,14 @@ void tdb_unlock_free_list(struct tdb_context *tdb, tdb_off_t flist)
 }
 
 #if 0
-static int chainlock_loop(struct tdb_context *tdb, const TDB_DATA *key,
-                         int ltype, enum tdb_lock_flags waitflag,
-                         const char *func)
+static int chainlock(struct tdb_context *tdb, const TDB_DATA *key,
+                    int ltype, enum tdb_lock_flags waitflag,
+                    const char *func)
 {
        int ret;
        uint64_t h = tdb_hash(tdb, key->dptr, key->dsize);
 
-again:
-       ret = tdb_lock_list(tdb,
-                           h & ((1ULL << tdb->header.v.hash_bits) - 1),
-                           ltype, waitflag);
-       if (likely(ret == 0) && unlikely(update_header(tdb))) {
-               tdb_unlock_list(tdb, h & ((1ULL << tdb->header.v.hash_bits)-1),
-                               ltype);
-               goto again;
-       }
-
+       ret = tdb_lock_list(tdb, h, ltype, waitflag) == TDB_OFF_ERR ? -1 : 0;
        tdb_trace_1rec(tdb, func, *key);
        return ret;
 }
@@ -727,8 +734,7 @@ again:
    contention - it cannot guarantee how many records will be locked */
 int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
 {
-       return chainlock_loop(tdb, &key, F_WRLCK, TDB_LOCK_WAIT,
-                             "tdb_chainlock");
+       return chainlock(tdb, &key, F_WRLCK, TDB_LOCK_WAIT, "tdb_chainlock");
 }
 
 /* lock/unlock one hash chain, non-blocking. This is meant to be used
@@ -736,8 +742,8 @@ int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
    locked */
 int tdb_chainlock_nonblock(struct tdb_context *tdb, TDB_DATA key)
 {
-       return chainlock_loop(tdb, &key, F_WRLCK, TDB_LOCK_NOWAIT,
-                             "tdb_chainlock_nonblock");
+       return chainlock(tdb, &key, F_WRLCK, TDB_LOCK_NOWAIT,
+                        "tdb_chainlock_nonblock");
 }
 
 int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
@@ -750,8 +756,8 @@ int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
 
 int tdb_chainlock_read(struct tdb_context *tdb, TDB_DATA key)
 {
-       return chainlock_loop(tdb, &key, F_RDLCK, TDB_LOCK_WAIT,
-                             "tdb_chainlock_read");
+       return chainlock(tdb, &key, F_RDLCK, TDB_LOCK_WAIT,
+                        "tdb_chainlock_read");
 }
 
 int tdb_chainunlock_read(struct tdb_context *tdb, TDB_DATA key)
index c743067a0267fb0059be9218ad56b1305fdcb7fc..86cfd7777bdd7dbcb6c0fdeafa627fe4c01e76e9 100644 (file)
@@ -136,7 +136,7 @@ static inline uint64_t rec_extra_padding(const struct tdb_used_record *r)
 
 static inline uint64_t rec_hash(const struct tdb_used_record *r)
 {
-       return ((r->magic_and_meta >> 32) & ((1ULL << 11) - 1)) << (64 - 11);
+       return ((r->magic_and_meta >> 32) & ((1ULL << 11) - 1));
 }
 
 static inline uint16_t rec_magic(const struct tdb_used_record *r)
@@ -254,8 +254,11 @@ struct tdb_methods {
   internal prototypes
 */
 /* tdb.c: */
-/* Returns true if header changed. */
-bool update_header(struct tdb_context *tdb);
+/* Returns true if header changed (and updates it). */
+bool header_changed(struct tdb_context *tdb);
+
+/* Commit header to disk. */
+int write_header(struct tdb_context *tdb);
 
 /* Hash random memory. */
 uint64_t tdb_hash(struct tdb_context *tdb, const void *ptr, size_t len);
@@ -349,8 +352,8 @@ uint64_t hash_record(struct tdb_context *tdb, tdb_off_t off);
 void tdb_lock_init(struct tdb_context *tdb);
 
 /* Lock/unlock a particular hash list. */
-int tdb_lock_list(struct tdb_context *tdb, tdb_off_t list,
-                 int ltype, enum tdb_lock_flags waitflag);
+tdb_off_t tdb_lock_list(struct tdb_context *tdb, uint64_t hash,
+                       int ltype, enum tdb_lock_flags waitflag);
 int tdb_unlock_list(struct tdb_context *tdb, tdb_off_t list, int ltype);
 
 /* Lock/unlock a particular free list. */
index 5d69eedcdc4ecf58079609628d59b902c4623b56..2a89e36a42e70ec150cda354da831bb20cf0b145 100644 (file)
@@ -21,9 +21,9 @@ null_log_fn(struct tdb_context *tdb,
 /* We do a lot of work assuming our copy of the header volatile area
  * is uptodate, and usually it is.  However, once we grab a lock, we have to
  * re-check it. */
-bool update_header(struct tdb_context *tdb)
+bool header_changed(struct tdb_context *tdb)
 {
-       struct tdb_header_volatile pad, *v;
+       uint64_t gen;
 
        if (!(tdb->flags & TDB_NOLOCK) && tdb->header_uptodate) {
                tdb->log(tdb, TDB_DEBUG_WARNING, tdb->log_priv,
@@ -33,17 +33,23 @@ bool update_header(struct tdb_context *tdb)
        /* We could get a partial update if we're not holding any locks. */
        assert((tdb->flags & TDB_NOLOCK) || tdb_has_locks(tdb));
 
-       v = tdb_get(tdb, offsetof(struct tdb_header, v), &pad, sizeof(*v));
-       if (!v) {
-               /* On failure, imply we updated header so they retry. */
-               return true;
-       }
        tdb->header_uptodate = true;
-       if (likely(memcmp(&tdb->header.v, v, sizeof(*v)) == 0)) {
-               return false;
+       gen = tdb_read_off(tdb, offsetof(struct tdb_header, v.generation));
+       if (unlikely(gen != tdb->header.v.generation)) {
+               tdb_read_convert(tdb, offsetof(struct tdb_header, v),
+                                &tdb->header.v, sizeof(tdb->header.v));
+               return true;
        }
-       tdb->header.v = *v;
-       return true;
+       return false;
+}
+
+int write_header(struct tdb_context *tdb)
+{
+       assert(tdb_read_off(tdb, offsetof(struct tdb_header, v.generation))
+              == tdb->header.v.generation);
+       tdb->header.v.generation++;
+       return tdb_write_convert(tdb, offsetof(struct tdb_header, v),
+                                &tdb->header.v, sizeof(tdb->header.v));
 }
 
 static uint64_t jenkins_hash(const void *key, size_t length, uint64_t seed,
@@ -382,10 +388,12 @@ static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
 static void unlock_lists(struct tdb_context *tdb,
                         uint64_t start, uint64_t end, int ltype)
 {
-       do {
+       for (;;) {
                tdb_unlock_list(tdb, start, ltype);
+               if (start == end)
+                       return;
                start = (start + 1) & ((1ULL << tdb->header.v.hash_bits) - 1);
-       } while (start != end);
+       }
 }
 
 /* FIXME: Return header copy? */
@@ -402,26 +410,19 @@ static tdb_off_t find_bucket_and_lock(struct tdb_context *tdb,
        uint64_t hextra;
        tdb_off_t off;
 
-       /* hash_bits might be out of date... */
-again:
-       *start = *end = hash & ((1ULL << tdb->header.v.hash_bits) - 1);
-       hextra = hash >> tdb->header.v.hash_bits;
-
        /* FIXME: can we avoid locks for some fast paths? */
-       if (tdb_lock_list(tdb, *end, ltype, TDB_LOCK_WAIT) == -1)
+       *start = tdb_lock_list(tdb, hash, ltype, TDB_LOCK_WAIT);
+       if (*start == TDB_OFF_ERR)
                return TDB_OFF_ERR;
 
-       /* We only need to check this for first lock. */
-       if (unlikely(update_header(tdb))) {
-               tdb_unlock_list(tdb, *end, ltype);
-               goto again;
-       }
+       *end = *start;
+       hextra = hash >> tdb->header.v.hash_bits;
 
        while ((off = tdb_read_off(tdb, tdb->header.v.hash_off
                                   + *end * sizeof(tdb_off_t)))
               != TDB_OFF_ERR) {
                struct tdb_used_record pad, *r;
-               uint64_t keylen, next;
+               uint64_t keylen;
 
                /* Didn't find it? */
                if (!off)
@@ -470,16 +471,10 @@ again:
        lock_next:
                /* Lock next bucket. */
                /* FIXME: We can deadlock if this wraps! */
-               next = (*end + 1) & ((1ULL << tdb->header.v.hash_bits) - 1);
-               if (next == *start) {
-                       tdb->ecode = TDB_ERR_CORRUPT;
-                       tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
-                                "find_bucket_and_lock: full hash table!\n");
+               off = tdb_lock_list(tdb, ++hash, ltype, TDB_LOCK_WAIT);
+               if (off == TDB_OFF_ERR)
                        goto unlock_err;
-               }
-               if (tdb_lock_list(tdb, next, ltype, TDB_LOCK_WAIT) == -1)
-                       goto unlock_err;
-               *end = next;
+               *end = off;
        }
 
 unlock_err:
@@ -514,11 +509,9 @@ static void enlarge_hash(struct tdb_context *tdb)
        if (tdb_allrecord_lock(tdb, F_WRLCK, TDB_LOCK_WAIT, false) == -1)
                return;
 
-       if (unlikely(update_header(tdb))) {
-               /* Someone else enlarged for us?  Nothing to do. */
-               if ((1ULL << tdb->header.v.hash_bits) != num)
-                       goto unlock;
-       }
+       /* Someone else enlarged for us?  Nothing to do. */
+       if ((1ULL << tdb->header.v.hash_bits) != num)
+               goto unlock;
 
        newoff = alloc(tdb, 0, num * 2, 0, false);
        if (unlikely(newoff == TDB_OFF_ERR))
@@ -728,7 +721,8 @@ again:
        num++;
 
        for (i = chain + 1; i < chain + 1 + num; i++) {
-               if (tdb_lock_list(tdb, i, F_WRLCK, TDB_LOCK_WAIT) == -1) {
+               if (tdb_lock_list(tdb, i, F_WRLCK, TDB_LOCK_WAIT)
+                   == TDB_OFF_ERR) {
                        if (i != chain + 1)
                                unlock_lists(tdb, chain + 1, i-1, F_WRLCK);
                        return -1;
@@ -741,7 +735,8 @@ again:
                                                 1ULL << tdb->header.v.hash_bits);
                (*extra_locks)++;
                for (i = 0; i < *extra_locks; i++) {
-                       if (tdb_lock_list(tdb, i, F_WRLCK, TDB_LOCK_NOWAIT)) {
+                       if (tdb_lock_list(tdb, i, F_WRLCK, TDB_LOCK_NOWAIT)
+                           == TDB_OFF_ERR) {
                                /* Failed.  Caller must lock in order. */
                                if (i)
                                        unlock_lists(tdb, 0, i-1, F_WRLCK);
@@ -827,7 +822,8 @@ int tdb_delete(struct tdb_context *tdb, struct tdb_data key)
 
                /* We need extra locks at the start. */
                for (i = 0; i < extra_locks; i++) {
-                       if (tdb_lock_list(tdb, i, F_WRLCK, TDB_LOCK_WAIT)) {
+                       if (tdb_lock_list(tdb, i, F_WRLCK, TDB_LOCK_WAIT)
+                           == TDB_OFF_ERR) {
                                if (i)
                                        unlock_lists(tdb, 0, i-1, F_WRLCK);
                                return -1;
index aa2ec51a3f3b9784b9fe7af6a41eacdfb28c0f82..b8c24366b7d63b9a491c28cc587775c22f9cece1 100644 (file)
@@ -34,7 +34,7 @@ int main(int argc, char *argv[])
                ok1(rec_key_length(&rec) == klen);
                ok1(rec_data_length(&rec) == dlen);
                ok1(rec_extra_padding(&rec) == xlen);
-               ok1(rec_hash(&rec) == h);
+               ok1(rec_hash(&rec) << (64 - 11) == h);
                ok1(rec_magic(&rec) == TDB_MAGIC);
        }
        ok1(tap_log_messages == 0);
diff --git a/ccan/tdb2/test/run-simple-store.c b/ccan/tdb2/test/run-simple-store.c
new file mode 100644 (file)
index 0000000..44ac907
--- /dev/null
@@ -0,0 +1,41 @@
+#include <ccan/tdb2/tdb.c>
+#include <ccan/tdb2/free.c>
+#include <ccan/tdb2/lock.c>
+#include <ccan/tdb2/io.c>
+#include <ccan/tdb2/check.c>
+#include <ccan/tap/tap.h>
+#include "logging.h"
+
+int main(int argc, char *argv[])
+{
+       unsigned int i;
+       struct tdb_context *tdb;
+       int flags[] = { TDB_INTERNAL, TDB_DEFAULT,
+                       TDB_INTERNAL|TDB_CONVERT, TDB_CONVERT };
+       struct tdb_data key = { (unsigned char *)"key", 3 };
+       struct tdb_data data = { (unsigned char *)"data", 4 };
+
+       plan_tests(sizeof(flags) / sizeof(flags[0]) * 9 + 1);
+       for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
+               tdb = tdb_open("/tmp/run-new_database.tdb", flags[i],
+                              O_RDWR|O_CREAT|O_TRUNC, 0600, NULL);
+               tdb->log = tap_log_fn;
+               ok1(tdb);
+               if (tdb) {
+                       /* Modify should fail. */
+                       ok1(tdb_store(tdb, key, data, TDB_MODIFY) == -1);
+                       ok1(tdb_error(tdb) == TDB_ERR_NOEXIST);
+                       ok1(tdb_check(tdb, NULL, NULL) == 0);
+                       /* Insert should succeed. */
+                       ok1(tdb_store(tdb, key, data, TDB_INSERT) == 0);
+                       ok1(tdb_check(tdb, NULL, NULL) == 0);
+                       /* Second insert should fail. */
+                       ok1(tdb_store(tdb, key, data, TDB_INSERT) == -1);
+                       ok1(tdb_error(tdb) == TDB_ERR_EXISTS);
+                       ok1(tdb_check(tdb, NULL, NULL) == 0);
+                       tdb_close(tdb);
+               }
+       }
+       ok1(tap_log_messages == 0);
+       return exit_status();
+}