From: Rusty Russell Date: Thu, 26 Aug 2010 10:27:56 +0000 (+0930) Subject: tdb2: now we can store a key! X-Git-Url: http://git.ozlabs.org/?p=ccan;a=commitdiff_plain;h=4916bc0a12ede1bf81c6895be9e7df39199f3290 tdb2: now we can store a key! --- diff --git a/ccan/tdb2/free.c b/ccan/tdb2/free.c index e486dcb7..134c1ff0 100644 --- a/ccan/tdb2/free.c +++ b/ccan/tdb2/free.c @@ -130,7 +130,7 @@ tdb_off_t zone_of(struct tdb_context *tdb, tdb_off_t off) return off >> tdb->header.v.zone_bits; } -/* Returns fl->max_bucket + 1, or list number to search. */ +/* Returns free_buckets + 1, or list number to search. */ static tdb_off_t find_free_head(struct tdb_context *tdb, tdb_off_t bucket) { tdb_off_t first, off; @@ -138,7 +138,7 @@ static tdb_off_t find_free_head(struct tdb_context *tdb, tdb_off_t bucket) /* Speculatively search for a non-zero bucket. */ first = tdb->last_zone * (tdb->header.v.free_buckets+1) + bucket; off = tdb_find_nonzero_off(tdb, free_list_off(tdb, first), - tdb->header.v.free_buckets - bucket); + tdb->header.v.free_buckets + 1 - bucket); return bucket + off; } @@ -210,7 +210,7 @@ int add_free_record(struct tdb_context *tdb, tdb->last_zone = zone_of(tdb, off); list = tdb->last_zone * (tdb->header.v.free_buckets+1) + size_to_bucket(tdb, new.data_len); - + if (tdb_lock_free_list(tdb, list, TDB_LOCK_WAIT) != 0) return -1; @@ -319,7 +319,7 @@ static tdb_off_t lock_and_alloc(struct tdb_context *tdb, tdb_len_t *actual) { tdb_off_t list; - tdb_off_t off, prev, best_off; + tdb_off_t off, best_off; struct tdb_free_record pad, best = { 0 }, *r; double multiplier; @@ -331,27 +331,21 @@ again: return TDB_OFF_ERR; } - prev = free_list_off(tdb, list); - off = tdb_read_off(tdb, prev); - - if (unlikely(off == TDB_OFF_ERR)) - goto unlock_err; - best.data_len = -1ULL; best_off = 0; multiplier = 1.0; /* Walk the list to see if any are large enough, getting less fussy * as we go. */ - while (off) { - prev = off; - off = tdb_read_off(tdb, prev); - if (unlikely(off == TDB_OFF_ERR)) - goto unlock_err; + off = tdb_read_off(tdb, free_list_off(tdb, list)); + if (unlikely(off == TDB_OFF_ERR)) + goto unlock_err; + while (off) { r = tdb_get(tdb, off, &pad, sizeof(*r)); if (!r) goto unlock_err; + if (r->magic != TDB_FREE_MAGIC) { tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv, "lock_and_alloc: %llu non-free 0x%llx\n", @@ -364,18 +358,9 @@ again: best = *r; } - if (best.data_len < size * multiplier && best_off) { - /* We're happy with this size: take it. */ - if (remove_from_list(tdb, list, &best) != 0) - goto unlock_err; - tdb_unlock_free_list(tdb, list); + if (best.data_len < size * multiplier && best_off) + goto use_best; - if (to_used_record(tdb, best_off, size, best.data_len, - actual)) { - return -1; - } - return best_off; - } multiplier *= 1.01; /* Since we're going slow anyway, try coalescing here. */ @@ -387,6 +372,22 @@ again: /* This has unlocked list, restart. */ goto again; } + off = r->next; + } + + /* If we found anything at all, use it. */ + if (best_off) { + use_best: + /* We're happy with this size: take it. */ + if (remove_from_list(tdb, list, &best) != 0) + goto unlock_err; + tdb_unlock_free_list(tdb, list); + + if (to_used_record(tdb, best_off, size, best.data_len, + actual)) { + return -1; + } + return best_off; } tdb_unlock_free_list(tdb, list); @@ -440,11 +441,11 @@ static tdb_off_t get_free(struct tdb_context *tdb, size_t size, tdb_off_t b; /* Start at exact size bucket, and search up... */ - for (b = bucket; b <= tdb->header.v.num_zones; b++) { + for (b = bucket; b <= tdb->header.v.free_buckets; b++) { b = find_free_head(tdb, b); /* Non-empty list? Try getting block. */ - if (b <= tdb->header.v.num_zones) { + if (b <= tdb->header.v.free_buckets) { /* Try getting one from list. */ off = lock_and_alloc(tdb, b, size, actual); if (off == TDB_OFF_ERR) @@ -698,7 +699,6 @@ int tdb_expand(struct tdb_context *tdb, tdb_len_t klen, tdb_len_t dlen, } } - /* Free up the old free buckets. */ old_free_off -= sizeof(fhdr); if (tdb_read_convert(tdb, old_free_off, &fhdr, sizeof(fhdr)) == -1) @@ -714,6 +714,8 @@ int tdb_expand(struct tdb_context *tdb, tdb_len_t klen, tdb_len_t dlen, /* Start allocating from where the new space is. */ tdb->last_zone = zone_of(tdb, tdb->map_size - add); tdb_access_release(tdb, oldf); + if (write_header(tdb) == -1) + goto fail; success: tdb_allrecord_unlock(tdb, F_WRLCK); return 0; diff --git a/ccan/tdb2/lock.c b/ccan/tdb2/lock.c index 96815f26..3230b25e 100644 --- a/ccan/tdb2/lock.c +++ b/ccan/tdb2/lock.c @@ -502,7 +502,7 @@ again: tdb->allrecord_lock.off = upgradable; /* Now we re-check header, holding lock. */ - if (unlikely(update_header(tdb))) { + if (unlikely(header_changed(tdb))) { tdb_allrecord_unlock(tdb, ltype); goto again; } @@ -625,13 +625,18 @@ int tdb_unlockall_read(struct tdb_context *tdb) } #endif -int tdb_lock_list(struct tdb_context *tdb, tdb_off_t list, - int ltype, enum tdb_lock_flags waitflag) +/* Returns the list we actually locked. */ +tdb_off_t tdb_lock_list(struct tdb_context *tdb, uint64_t hash, + int ltype, enum tdb_lock_flags waitflag) { + tdb_off_t list = hash & ((1ULL << tdb->header.v.hash_bits) - 1); + /* Header can change ONLY if we had no locks before. */ + bool can_change = tdb->num_lockrecs == 0; + /* a allrecord lock allows us to avoid per chain locks */ if (tdb->allrecord_lock.count && (ltype == tdb->allrecord_lock.ltype || ltype == F_RDLCK)) { - return 0; + return list; } if (tdb->allrecord_lock.count) { @@ -640,11 +645,22 @@ int tdb_lock_list(struct tdb_context *tdb, tdb_off_t list, "tdb_lock_list: have %s allrecordlock\n", tdb->allrecord_lock.ltype == F_RDLCK ? "read" : "write"); - return -1; + return TDB_OFF_ERR; } - /* FIXME: Should we do header_uptodate and return retry here? */ - return tdb_nest_lock(tdb, TDB_HASH_LOCK_START + list, ltype, waitflag); +again: + if (tdb_nest_lock(tdb, TDB_HASH_LOCK_START + list, ltype, waitflag)) + return TDB_OFF_ERR; + + if (can_change && unlikely(header_changed(tdb))) { + tdb_off_t new = hash & ((1ULL << tdb->header.v.hash_bits) - 1); + if (new != list) { + tdb_nest_unlock(tdb, TDB_HASH_LOCK_START+list, ltype); + list = new; + goto again; + } + } + return list; } int tdb_unlock_list(struct tdb_context *tdb, tdb_off_t list, int ltype) @@ -702,23 +718,14 @@ void tdb_unlock_free_list(struct tdb_context *tdb, tdb_off_t flist) } #if 0 -static int chainlock_loop(struct tdb_context *tdb, const TDB_DATA *key, - int ltype, enum tdb_lock_flags waitflag, - const char *func) +static int chainlock(struct tdb_context *tdb, const TDB_DATA *key, + int ltype, enum tdb_lock_flags waitflag, + const char *func) { int ret; uint64_t h = tdb_hash(tdb, key->dptr, key->dsize); -again: - ret = tdb_lock_list(tdb, - h & ((1ULL << tdb->header.v.hash_bits) - 1), - ltype, waitflag); - if (likely(ret == 0) && unlikely(update_header(tdb))) { - tdb_unlock_list(tdb, h & ((1ULL << tdb->header.v.hash_bits)-1), - ltype); - goto again; - } - + ret = tdb_lock_list(tdb, h, ltype, waitflag) == TDB_OFF_ERR ? -1 : 0; tdb_trace_1rec(tdb, func, *key); return ret; } @@ -727,8 +734,7 @@ again: contention - it cannot guarantee how many records will be locked */ int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key) { - return chainlock_loop(tdb, &key, F_WRLCK, TDB_LOCK_WAIT, - "tdb_chainlock"); + return chainlock(tdb, &key, F_WRLCK, TDB_LOCK_WAIT, "tdb_chainlock"); } /* lock/unlock one hash chain, non-blocking. This is meant to be used @@ -736,8 +742,8 @@ int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key) locked */ int tdb_chainlock_nonblock(struct tdb_context *tdb, TDB_DATA key) { - return chainlock_loop(tdb, &key, F_WRLCK, TDB_LOCK_NOWAIT, - "tdb_chainlock_nonblock"); + return chainlock(tdb, &key, F_WRLCK, TDB_LOCK_NOWAIT, + "tdb_chainlock_nonblock"); } int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key) @@ -750,8 +756,8 @@ int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key) int tdb_chainlock_read(struct tdb_context *tdb, TDB_DATA key) { - return chainlock_loop(tdb, &key, F_RDLCK, TDB_LOCK_WAIT, - "tdb_chainlock_read"); + return chainlock(tdb, &key, F_RDLCK, TDB_LOCK_WAIT, + "tdb_chainlock_read"); } int tdb_chainunlock_read(struct tdb_context *tdb, TDB_DATA key) diff --git a/ccan/tdb2/private.h b/ccan/tdb2/private.h index c743067a..86cfd777 100644 --- a/ccan/tdb2/private.h +++ b/ccan/tdb2/private.h @@ -136,7 +136,7 @@ static inline uint64_t rec_extra_padding(const struct tdb_used_record *r) static inline uint64_t rec_hash(const struct tdb_used_record *r) { - return ((r->magic_and_meta >> 32) & ((1ULL << 11) - 1)) << (64 - 11); + return ((r->magic_and_meta >> 32) & ((1ULL << 11) - 1)); } static inline uint16_t rec_magic(const struct tdb_used_record *r) @@ -254,8 +254,11 @@ struct tdb_methods { internal prototypes */ /* tdb.c: */ -/* Returns true if header changed. */ -bool update_header(struct tdb_context *tdb); +/* Returns true if header changed (and updates it). */ +bool header_changed(struct tdb_context *tdb); + +/* Commit header to disk. */ +int write_header(struct tdb_context *tdb); /* Hash random memory. */ uint64_t tdb_hash(struct tdb_context *tdb, const void *ptr, size_t len); @@ -349,8 +352,8 @@ uint64_t hash_record(struct tdb_context *tdb, tdb_off_t off); void tdb_lock_init(struct tdb_context *tdb); /* Lock/unlock a particular hash list. */ -int tdb_lock_list(struct tdb_context *tdb, tdb_off_t list, - int ltype, enum tdb_lock_flags waitflag); +tdb_off_t tdb_lock_list(struct tdb_context *tdb, uint64_t hash, + int ltype, enum tdb_lock_flags waitflag); int tdb_unlock_list(struct tdb_context *tdb, tdb_off_t list, int ltype); /* Lock/unlock a particular free list. */ diff --git a/ccan/tdb2/tdb.c b/ccan/tdb2/tdb.c index 5d69eedc..2a89e36a 100644 --- a/ccan/tdb2/tdb.c +++ b/ccan/tdb2/tdb.c @@ -21,9 +21,9 @@ null_log_fn(struct tdb_context *tdb, /* We do a lot of work assuming our copy of the header volatile area * is uptodate, and usually it is. However, once we grab a lock, we have to * re-check it. */ -bool update_header(struct tdb_context *tdb) +bool header_changed(struct tdb_context *tdb) { - struct tdb_header_volatile pad, *v; + uint64_t gen; if (!(tdb->flags & TDB_NOLOCK) && tdb->header_uptodate) { tdb->log(tdb, TDB_DEBUG_WARNING, tdb->log_priv, @@ -33,17 +33,23 @@ bool update_header(struct tdb_context *tdb) /* We could get a partial update if we're not holding any locks. */ assert((tdb->flags & TDB_NOLOCK) || tdb_has_locks(tdb)); - v = tdb_get(tdb, offsetof(struct tdb_header, v), &pad, sizeof(*v)); - if (!v) { - /* On failure, imply we updated header so they retry. */ - return true; - } tdb->header_uptodate = true; - if (likely(memcmp(&tdb->header.v, v, sizeof(*v)) == 0)) { - return false; + gen = tdb_read_off(tdb, offsetof(struct tdb_header, v.generation)); + if (unlikely(gen != tdb->header.v.generation)) { + tdb_read_convert(tdb, offsetof(struct tdb_header, v), + &tdb->header.v, sizeof(tdb->header.v)); + return true; } - tdb->header.v = *v; - return true; + return false; +} + +int write_header(struct tdb_context *tdb) +{ + assert(tdb_read_off(tdb, offsetof(struct tdb_header, v.generation)) + == tdb->header.v.generation); + tdb->header.v.generation++; + return tdb_write_convert(tdb, offsetof(struct tdb_header, v), + &tdb->header.v, sizeof(tdb->header.v)); } static uint64_t jenkins_hash(const void *key, size_t length, uint64_t seed, @@ -382,10 +388,12 @@ static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data) static void unlock_lists(struct tdb_context *tdb, uint64_t start, uint64_t end, int ltype) { - do { + for (;;) { tdb_unlock_list(tdb, start, ltype); + if (start == end) + return; start = (start + 1) & ((1ULL << tdb->header.v.hash_bits) - 1); - } while (start != end); + } } /* FIXME: Return header copy? */ @@ -402,26 +410,19 @@ static tdb_off_t find_bucket_and_lock(struct tdb_context *tdb, uint64_t hextra; tdb_off_t off; - /* hash_bits might be out of date... */ -again: - *start = *end = hash & ((1ULL << tdb->header.v.hash_bits) - 1); - hextra = hash >> tdb->header.v.hash_bits; - /* FIXME: can we avoid locks for some fast paths? */ - if (tdb_lock_list(tdb, *end, ltype, TDB_LOCK_WAIT) == -1) + *start = tdb_lock_list(tdb, hash, ltype, TDB_LOCK_WAIT); + if (*start == TDB_OFF_ERR) return TDB_OFF_ERR; - /* We only need to check this for first lock. */ - if (unlikely(update_header(tdb))) { - tdb_unlock_list(tdb, *end, ltype); - goto again; - } + *end = *start; + hextra = hash >> tdb->header.v.hash_bits; while ((off = tdb_read_off(tdb, tdb->header.v.hash_off + *end * sizeof(tdb_off_t))) != TDB_OFF_ERR) { struct tdb_used_record pad, *r; - uint64_t keylen, next; + uint64_t keylen; /* Didn't find it? */ if (!off) @@ -470,16 +471,10 @@ again: lock_next: /* Lock next bucket. */ /* FIXME: We can deadlock if this wraps! */ - next = (*end + 1) & ((1ULL << tdb->header.v.hash_bits) - 1); - if (next == *start) { - tdb->ecode = TDB_ERR_CORRUPT; - tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv, - "find_bucket_and_lock: full hash table!\n"); + off = tdb_lock_list(tdb, ++hash, ltype, TDB_LOCK_WAIT); + if (off == TDB_OFF_ERR) goto unlock_err; - } - if (tdb_lock_list(tdb, next, ltype, TDB_LOCK_WAIT) == -1) - goto unlock_err; - *end = next; + *end = off; } unlock_err: @@ -514,11 +509,9 @@ static void enlarge_hash(struct tdb_context *tdb) if (tdb_allrecord_lock(tdb, F_WRLCK, TDB_LOCK_WAIT, false) == -1) return; - if (unlikely(update_header(tdb))) { - /* Someone else enlarged for us? Nothing to do. */ - if ((1ULL << tdb->header.v.hash_bits) != num) - goto unlock; - } + /* Someone else enlarged for us? Nothing to do. */ + if ((1ULL << tdb->header.v.hash_bits) != num) + goto unlock; newoff = alloc(tdb, 0, num * 2, 0, false); if (unlikely(newoff == TDB_OFF_ERR)) @@ -728,7 +721,8 @@ again: num++; for (i = chain + 1; i < chain + 1 + num; i++) { - if (tdb_lock_list(tdb, i, F_WRLCK, TDB_LOCK_WAIT) == -1) { + if (tdb_lock_list(tdb, i, F_WRLCK, TDB_LOCK_WAIT) + == TDB_OFF_ERR) { if (i != chain + 1) unlock_lists(tdb, chain + 1, i-1, F_WRLCK); return -1; @@ -741,7 +735,8 @@ again: 1ULL << tdb->header.v.hash_bits); (*extra_locks)++; for (i = 0; i < *extra_locks; i++) { - if (tdb_lock_list(tdb, i, F_WRLCK, TDB_LOCK_NOWAIT)) { + if (tdb_lock_list(tdb, i, F_WRLCK, TDB_LOCK_NOWAIT) + == TDB_OFF_ERR) { /* Failed. Caller must lock in order. */ if (i) unlock_lists(tdb, 0, i-1, F_WRLCK); @@ -827,7 +822,8 @@ int tdb_delete(struct tdb_context *tdb, struct tdb_data key) /* We need extra locks at the start. */ for (i = 0; i < extra_locks; i++) { - if (tdb_lock_list(tdb, i, F_WRLCK, TDB_LOCK_WAIT)) { + if (tdb_lock_list(tdb, i, F_WRLCK, TDB_LOCK_WAIT) + == TDB_OFF_ERR) { if (i) unlock_lists(tdb, 0, i-1, F_WRLCK); return -1; diff --git a/ccan/tdb2/test/run-encode.c b/ccan/tdb2/test/run-encode.c index aa2ec51a..b8c24366 100644 --- a/ccan/tdb2/test/run-encode.c +++ b/ccan/tdb2/test/run-encode.c @@ -34,7 +34,7 @@ int main(int argc, char *argv[]) ok1(rec_key_length(&rec) == klen); ok1(rec_data_length(&rec) == dlen); ok1(rec_extra_padding(&rec) == xlen); - ok1(rec_hash(&rec) == h); + ok1(rec_hash(&rec) << (64 - 11) == h); ok1(rec_magic(&rec) == TDB_MAGIC); } ok1(tap_log_messages == 0); diff --git a/ccan/tdb2/test/run-simple-store.c b/ccan/tdb2/test/run-simple-store.c new file mode 100644 index 00000000..44ac9071 --- /dev/null +++ b/ccan/tdb2/test/run-simple-store.c @@ -0,0 +1,41 @@ +#include +#include +#include +#include +#include +#include +#include "logging.h" + +int main(int argc, char *argv[]) +{ + unsigned int i; + struct tdb_context *tdb; + int flags[] = { TDB_INTERNAL, TDB_DEFAULT, + TDB_INTERNAL|TDB_CONVERT, TDB_CONVERT }; + struct tdb_data key = { (unsigned char *)"key", 3 }; + struct tdb_data data = { (unsigned char *)"data", 4 }; + + plan_tests(sizeof(flags) / sizeof(flags[0]) * 9 + 1); + for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) { + tdb = tdb_open("/tmp/run-new_database.tdb", flags[i], + O_RDWR|O_CREAT|O_TRUNC, 0600, NULL); + tdb->log = tap_log_fn; + ok1(tdb); + if (tdb) { + /* Modify should fail. */ + ok1(tdb_store(tdb, key, data, TDB_MODIFY) == -1); + ok1(tdb_error(tdb) == TDB_ERR_NOEXIST); + ok1(tdb_check(tdb, NULL, NULL) == 0); + /* Insert should succeed. */ + ok1(tdb_store(tdb, key, data, TDB_INSERT) == 0); + ok1(tdb_check(tdb, NULL, NULL) == 0); + /* Second insert should fail. */ + ok1(tdb_store(tdb, key, data, TDB_INSERT) == -1); + ok1(tdb_error(tdb) == TDB_ERR_EXISTS); + ok1(tdb_check(tdb, NULL, NULL) == 0); + tdb_close(tdb); + } + } + ok1(tap_log_messages == 0); + return exit_status(); +}