tdb2: shorten attribute members.
[ccan] / ccan / tdb2 / hash.c
1  /*
2    Trivial Database 2: hash handling
3    Copyright (C) Rusty Russell 2010
4
5    This library is free software; you can redistribute it and/or
6    modify it under the terms of the GNU Lesser General Public
7    License as published by the Free Software Foundation; either
8    version 3 of the License, or (at your option) any later version.
9
10    This library is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    Lesser General Public License for more details.
14
15    You should have received a copy of the GNU Lesser General Public
16    License along with this library; if not, see <http://www.gnu.org/licenses/>.
17 */
18 #include "private.h"
19 #include <assert.h>
20 #include <ccan/hash/hash.h>
21
22 static uint64_t jenkins_hash(const void *key, size_t length, uint64_t seed,
23                              void *arg)
24 {
25         uint64_t ret;
26         /* hash64_stable assumes lower bits are more important; they are a
27          * slightly better hash.  We use the upper bits first, so swap them. */
28         ret = hash64_stable((const unsigned char *)key, length, seed);
29         return (ret >> 32) | (ret << 32);
30 }
31
32 void tdb_hash_init(struct tdb_context *tdb)
33 {
34         tdb->hashfn = jenkins_hash;
35 }
36
37 uint64_t tdb_hash(struct tdb_context *tdb, const void *ptr, size_t len)
38 {
39         return tdb->hashfn(ptr, len, tdb->hash_seed, tdb->hash_data);
40 }
41
42 uint64_t hash_record(struct tdb_context *tdb, tdb_off_t off)
43 {
44         const struct tdb_used_record *r;
45         const void *key;
46         uint64_t klen, hash;
47
48         r = tdb_access_read(tdb, off, sizeof(*r), true);
49         if (TDB_PTR_IS_ERR(r)) {
50                 /* FIXME */
51                 return 0;
52         }
53
54         klen = rec_key_length(r);
55         tdb_access_release(tdb, r);
56
57         key = tdb_access_read(tdb, off + sizeof(*r), klen, false);
58         if (TDB_PTR_IS_ERR(key)) {
59                 return 0;
60         }
61
62         hash = tdb_hash(tdb, key, klen);
63         tdb_access_release(tdb, key);
64         return hash;
65 }
66
67 /* Get bits from a value. */
68 static uint32_t bits_from(uint64_t val, unsigned start, unsigned num)
69 {
70         assert(num <= 32);
71         return (val >> start) & ((1U << num) - 1);
72 }
73
74 /* We take bits from the top: that way we can lock whole sections of the hash
75  * by using lock ranges. */
76 static uint32_t use_bits(struct hash_info *h, unsigned num)
77 {
78         h->hash_used += num;
79         return bits_from(h->h, 64 - h->hash_used, num);
80 }
81
82 static tdb_bool_err key_matches(struct tdb_context *tdb,
83                                 const struct tdb_used_record *rec,
84                                 tdb_off_t off,
85                                 const struct tdb_data *key)
86 {
87         tdb_bool_err ret = false;
88         const char *rkey;
89
90         if (rec_key_length(rec) != key->dsize) {
91                 add_stat(tdb, compare_wrong_keylen, 1);
92                 return ret;
93         }
94
95         rkey = tdb_access_read(tdb, off + sizeof(*rec), key->dsize, false);
96         if (TDB_PTR_IS_ERR(rkey)) {
97                 return TDB_PTR_ERR(rkey);
98         }
99         if (memcmp(rkey, key->dptr, key->dsize) == 0)
100                 ret = true;
101         else
102                 add_stat(tdb, compare_wrong_keycmp, 1);
103         tdb_access_release(tdb, rkey);
104         return ret;
105 }
106
107 /* Does entry match? */
108 static tdb_bool_err match(struct tdb_context *tdb,
109                           struct hash_info *h,
110                           const struct tdb_data *key,
111                           tdb_off_t val,
112                           struct tdb_used_record *rec)
113 {
114         tdb_off_t off;
115         enum TDB_ERROR ecode;
116
117         add_stat(tdb, compares, 1);
118         /* Desired bucket must match. */
119         if (h->home_bucket != (val & TDB_OFF_HASH_GROUP_MASK)) {
120                 add_stat(tdb, compare_wrong_bucket, 1);
121                 return false;
122         }
123
124         /* Top bits of offset == next bits of hash. */
125         if (bits_from(val, TDB_OFF_HASH_EXTRA_BIT, TDB_OFF_UPPER_STEAL_EXTRA)
126             != bits_from(h->h, 64 - h->hash_used - TDB_OFF_UPPER_STEAL_EXTRA,
127                     TDB_OFF_UPPER_STEAL_EXTRA)) {
128                 add_stat(tdb, compare_wrong_offsetbits, 1);
129                 return false;
130         }
131
132         off = val & TDB_OFF_MASK;
133         ecode = tdb_read_convert(tdb, off, rec, sizeof(*rec));
134         if (ecode != TDB_SUCCESS) {
135                 return ecode;
136         }
137
138         if ((h->h & ((1 << 11)-1)) != rec_hash(rec)) {
139                 add_stat(tdb, compare_wrong_rechash, 1);
140                 return false;
141         }
142
143         return key_matches(tdb, rec, off, key);
144 }
145
146 static tdb_off_t hbucket_off(tdb_off_t group_start, unsigned bucket)
147 {
148         return group_start
149                 + (bucket % (1 << TDB_HASH_GROUP_BITS)) * sizeof(tdb_off_t);
150 }
151
152 bool is_subhash(tdb_off_t val)
153 {
154         return (val >> TDB_OFF_UPPER_STEAL_SUBHASH_BIT) & 1;
155 }
156
157 /* FIXME: Guess the depth, don't over-lock! */
158 static tdb_off_t hlock_range(tdb_off_t group, tdb_off_t *size)
159 {
160         *size = 1ULL << (64 - (TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS));
161         return group << (64 - (TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS));
162 }
163
164 static tdb_off_t COLD find_in_chain(struct tdb_context *tdb,
165                                     struct tdb_data key,
166                                     tdb_off_t chain,
167                                     struct hash_info *h,
168                                     struct tdb_used_record *rec,
169                                     struct traverse_info *tinfo)
170 {
171         tdb_off_t off, next;
172         enum TDB_ERROR ecode;
173
174         /* In case nothing is free, we set these to zero. */
175         h->home_bucket = h->found_bucket = 0;
176
177         for (off = chain; off; off = next) {
178                 unsigned int i;
179
180                 h->group_start = off;
181                 ecode = tdb_read_convert(tdb, off, h->group, sizeof(h->group));
182                 if (ecode != TDB_SUCCESS) {
183                         return ecode;
184                 }
185
186                 for (i = 0; i < (1 << TDB_HASH_GROUP_BITS); i++) {
187                         tdb_off_t recoff;
188                         if (!h->group[i]) {
189                                 /* Remember this empty bucket. */
190                                 h->home_bucket = h->found_bucket = i;
191                                 continue;
192                         }
193
194                         /* We can insert extra bits via add_to_hash
195                          * empty bucket logic. */
196                         recoff = h->group[i] & TDB_OFF_MASK;
197                         ecode = tdb_read_convert(tdb, recoff, rec,
198                                                  sizeof(*rec));
199                         if (ecode != TDB_SUCCESS) {
200                                 return ecode;
201                         }
202
203                         ecode = key_matches(tdb, rec, recoff, &key);
204                         if (ecode < 0) {
205                                 return ecode;
206                         }
207                         if (ecode == 1) {
208                                 h->home_bucket = h->found_bucket = i;
209
210                                 if (tinfo) {
211                                         tinfo->levels[tinfo->num_levels]
212                                                 .hashtable = off;
213                                         tinfo->levels[tinfo->num_levels]
214                                                 .total_buckets
215                                                 = 1 << TDB_HASH_GROUP_BITS;
216                                         tinfo->levels[tinfo->num_levels].entry
217                                                 = i;
218                                         tinfo->num_levels++;
219                                 }
220                                 return recoff;
221                         }
222                 }
223                 next = tdb_read_off(tdb, off
224                                     + offsetof(struct tdb_chain, next));
225                 if (TDB_OFF_IS_ERR(next)) {
226                         return next;
227                 }
228                 if (next)
229                         next += sizeof(struct tdb_used_record);
230         }
231         return 0;
232 }
233
234 /* This is the core routine which searches the hashtable for an entry.
235  * On error, no locks are held and -ve is returned.
236  * Otherwise, hinfo is filled in (and the optional tinfo).
237  * If not found, the return value is 0.
238  * If found, the return value is the offset, and *rec is the record. */
239 tdb_off_t find_and_lock(struct tdb_context *tdb,
240                         struct tdb_data key,
241                         int ltype,
242                         struct hash_info *h,
243                         struct tdb_used_record *rec,
244                         struct traverse_info *tinfo)
245 {
246         uint32_t i, group;
247         tdb_off_t hashtable;
248         enum TDB_ERROR ecode;
249
250         h->h = tdb_hash(tdb, key.dptr, key.dsize);
251         h->hash_used = 0;
252         group = use_bits(h, TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS);
253         h->home_bucket = use_bits(h, TDB_HASH_GROUP_BITS);
254
255         h->hlock_start = hlock_range(group, &h->hlock_range);
256         ecode = tdb_lock_hashes(tdb, h->hlock_start, h->hlock_range, ltype,
257                                 TDB_LOCK_WAIT);
258         if (ecode != TDB_SUCCESS) {
259                 return ecode;
260         }
261
262         hashtable = offsetof(struct tdb_header, hashtable);
263         if (tinfo) {
264                 tinfo->toplevel_group = group;
265                 tinfo->num_levels = 1;
266                 tinfo->levels[0].entry = 0;
267                 tinfo->levels[0].hashtable = hashtable
268                         + (group << TDB_HASH_GROUP_BITS) * sizeof(tdb_off_t);
269                 tinfo->levels[0].total_buckets = 1 << TDB_HASH_GROUP_BITS;
270         }
271
272         while (h->hash_used <= 64) {
273                 /* Read in the hash group. */
274                 h->group_start = hashtable
275                         + group * (sizeof(tdb_off_t) << TDB_HASH_GROUP_BITS);
276
277                 ecode = tdb_read_convert(tdb, h->group_start, &h->group,
278                                          sizeof(h->group));
279                 if (ecode != TDB_SUCCESS) {
280                         goto fail;
281                 }
282
283                 /* Pointer to another hash table?  Go down... */
284                 if (is_subhash(h->group[h->home_bucket])) {
285                         hashtable = (h->group[h->home_bucket] & TDB_OFF_MASK)
286                                 + sizeof(struct tdb_used_record);
287                         if (tinfo) {
288                                 /* When we come back, use *next* bucket */
289                                 tinfo->levels[tinfo->num_levels-1].entry
290                                         += h->home_bucket + 1;
291                         }
292                         group = use_bits(h, TDB_SUBLEVEL_HASH_BITS
293                                          - TDB_HASH_GROUP_BITS);
294                         h->home_bucket = use_bits(h, TDB_HASH_GROUP_BITS);
295                         if (tinfo) {
296                                 tinfo->levels[tinfo->num_levels].hashtable
297                                         = hashtable;
298                                 tinfo->levels[tinfo->num_levels].total_buckets
299                                         = 1 << TDB_SUBLEVEL_HASH_BITS;
300                                 tinfo->levels[tinfo->num_levels].entry
301                                         = group << TDB_HASH_GROUP_BITS;
302                                 tinfo->num_levels++;
303                         }
304                         continue;
305                 }
306
307                 /* It's in this group: search (until 0 or all searched) */
308                 for (i = 0, h->found_bucket = h->home_bucket;
309                      i < (1 << TDB_HASH_GROUP_BITS);
310                      i++, h->found_bucket = ((h->found_bucket+1)
311                                              % (1 << TDB_HASH_GROUP_BITS))) {
312                         tdb_bool_err berr;
313                         if (is_subhash(h->group[h->found_bucket]))
314                                 continue;
315
316                         if (!h->group[h->found_bucket])
317                                 break;
318
319                         berr = match(tdb, h, &key, h->group[h->found_bucket],
320                                      rec);
321                         if (berr < 0) {
322                                 ecode = berr;
323                                 goto fail;
324                         }
325                         if (berr) {
326                                 if (tinfo) {
327                                         tinfo->levels[tinfo->num_levels-1].entry
328                                                 += h->found_bucket;
329                                 }
330                                 return h->group[h->found_bucket] & TDB_OFF_MASK;
331                         }
332                 }
333                 /* Didn't find it: h indicates where it would go. */
334                 return 0;
335         }
336
337         return find_in_chain(tdb, key, hashtable, h, rec, tinfo);
338
339 fail:
340         tdb_unlock_hashes(tdb, h->hlock_start, h->hlock_range, ltype);
341         return ecode;
342 }
343
344 /* I wrote a simple test, expanding a hash to 2GB, for the following
345  * cases:
346  * 1) Expanding all the buckets at once,
347  * 2) Expanding the bucket we wanted to place the new entry into.
348  * 3) Expanding the most-populated bucket,
349  *
350  * I measured the worst/average/best density during this process.
351  * 1) 3%/16%/30%
352  * 2) 4%/20%/38%
353  * 3) 6%/22%/41%
354  *
355  * So we figure out the busiest bucket for the moment.
356  */
357 static unsigned fullest_bucket(struct tdb_context *tdb,
358                                const tdb_off_t *group,
359                                unsigned new_bucket)
360 {
361         unsigned counts[1 << TDB_HASH_GROUP_BITS] = { 0 };
362         unsigned int i, best_bucket;
363
364         /* Count the new entry. */
365         counts[new_bucket]++;
366         best_bucket = new_bucket;
367
368         for (i = 0; i < (1 << TDB_HASH_GROUP_BITS); i++) {
369                 unsigned this_bucket;
370
371                 if (is_subhash(group[i]))
372                         continue;
373                 this_bucket = group[i] & TDB_OFF_HASH_GROUP_MASK;
374                 if (++counts[this_bucket] > counts[best_bucket])
375                         best_bucket = this_bucket;
376         }
377
378         return best_bucket;
379 }
380
381 static bool put_into_group(tdb_off_t *group,
382                            unsigned bucket, tdb_off_t encoded)
383 {
384         unsigned int i;
385
386         for (i = 0; i < (1 << TDB_HASH_GROUP_BITS); i++) {
387                 unsigned b = (bucket + i) % (1 << TDB_HASH_GROUP_BITS);
388
389                 if (group[b] == 0) {
390                         group[b] = encoded;
391                         return true;
392                 }
393         }
394         return false;
395 }
396
397 static void force_into_group(tdb_off_t *group,
398                              unsigned bucket, tdb_off_t encoded)
399 {
400         if (!put_into_group(group, bucket, encoded))
401                 abort();
402 }
403
404 static tdb_off_t encode_offset(tdb_off_t new_off, struct hash_info *h)
405 {
406         return h->home_bucket
407                 | new_off
408                 | ((uint64_t)bits_from(h->h,
409                                   64 - h->hash_used - TDB_OFF_UPPER_STEAL_EXTRA,
410                                   TDB_OFF_UPPER_STEAL_EXTRA)
411                    << TDB_OFF_HASH_EXTRA_BIT);
412 }
413
414 /* Simply overwrite the hash entry we found before. */
415 enum TDB_ERROR replace_in_hash(struct tdb_context *tdb,
416                                struct hash_info *h,
417                                tdb_off_t new_off)
418 {
419         return tdb_write_off(tdb, hbucket_off(h->group_start, h->found_bucket),
420                              encode_offset(new_off, h));
421 }
422
423 /* We slot in anywhere that's empty in the chain. */
424 static enum TDB_ERROR COLD add_to_chain(struct tdb_context *tdb,
425                                         tdb_off_t subhash,
426                                         tdb_off_t new_off)
427 {
428         tdb_off_t entry;
429         enum TDB_ERROR ecode;
430
431         entry = tdb_find_zero_off(tdb, subhash, 1<<TDB_HASH_GROUP_BITS);
432         if (TDB_OFF_IS_ERR(entry)) {
433                 return entry;
434         }
435
436         if (entry == 1 << TDB_HASH_GROUP_BITS) {
437                 tdb_off_t next;
438
439                 next = tdb_read_off(tdb, subhash
440                                     + offsetof(struct tdb_chain, next));
441                 if (TDB_OFF_IS_ERR(next)) {
442                         return next;
443                 }
444
445                 if (!next) {
446                         next = alloc(tdb, 0, sizeof(struct tdb_chain), 0,
447                                      TDB_CHAIN_MAGIC, false);
448                         if (TDB_OFF_IS_ERR(next))
449                                 return next;
450                         ecode = zero_out(tdb,
451                                          next+sizeof(struct tdb_used_record),
452                                          sizeof(struct tdb_chain));
453                         if (ecode != TDB_SUCCESS) {
454                                 return ecode;
455                         }
456                         ecode = tdb_write_off(tdb, subhash
457                                               + offsetof(struct tdb_chain,
458                                                          next),
459                                               next);
460                         if (ecode != TDB_SUCCESS) {
461                                 return ecode;
462                         }
463                 }
464                 return add_to_chain(tdb, next, new_off);
465         }
466
467         return tdb_write_off(tdb, subhash + entry * sizeof(tdb_off_t),
468                              new_off);
469 }
470
471 /* Add into a newly created subhash. */
472 static enum TDB_ERROR add_to_subhash(struct tdb_context *tdb, tdb_off_t subhash,
473                                      unsigned hash_used, tdb_off_t val)
474 {
475         tdb_off_t off = (val & TDB_OFF_MASK), *group;
476         struct hash_info h;
477         unsigned int gnum;
478
479         h.hash_used = hash_used;
480
481         if (hash_used + TDB_SUBLEVEL_HASH_BITS > 64)
482                 return add_to_chain(tdb, subhash, off);
483
484         h.h = hash_record(tdb, off);
485         gnum = use_bits(&h, TDB_SUBLEVEL_HASH_BITS-TDB_HASH_GROUP_BITS);
486         h.group_start = subhash
487                 + gnum * (sizeof(tdb_off_t) << TDB_HASH_GROUP_BITS);
488         h.home_bucket = use_bits(&h, TDB_HASH_GROUP_BITS);
489
490         group = tdb_access_write(tdb, h.group_start,
491                                  sizeof(*group) << TDB_HASH_GROUP_BITS, true);
492         if (TDB_PTR_IS_ERR(group)) {
493                 return TDB_PTR_ERR(group);
494         }
495         force_into_group(group, h.home_bucket, encode_offset(off, &h));
496         return tdb_access_commit(tdb, group);
497 }
498
499 static enum TDB_ERROR expand_group(struct tdb_context *tdb, struct hash_info *h)
500 {
501         unsigned bucket, num_vals, i, magic;
502         size_t subsize;
503         tdb_off_t subhash;
504         tdb_off_t vals[1 << TDB_HASH_GROUP_BITS];
505         enum TDB_ERROR ecode;
506
507         /* Attach new empty subhash under fullest bucket. */
508         bucket = fullest_bucket(tdb, h->group, h->home_bucket);
509
510         if (h->hash_used == 64) {
511                 add_stat(tdb, alloc_chain, 1);
512                 subsize = sizeof(struct tdb_chain);
513                 magic = TDB_CHAIN_MAGIC;
514         } else {
515                 add_stat(tdb, alloc_subhash, 1);
516                 subsize = (sizeof(tdb_off_t) << TDB_SUBLEVEL_HASH_BITS);
517                 magic = TDB_HTABLE_MAGIC;
518         }
519
520         subhash = alloc(tdb, 0, subsize, 0, magic, false);
521         if (TDB_OFF_IS_ERR(subhash)) {
522                 return subhash;
523         }
524
525         ecode = zero_out(tdb, subhash + sizeof(struct tdb_used_record),
526                          subsize);
527         if (ecode != TDB_SUCCESS) {
528                 return ecode;
529         }
530
531         /* Remove any which are destined for bucket or are in wrong place. */
532         num_vals = 0;
533         for (i = 0; i < (1 << TDB_HASH_GROUP_BITS); i++) {
534                 unsigned home_bucket = h->group[i] & TDB_OFF_HASH_GROUP_MASK;
535                 if (!h->group[i] || is_subhash(h->group[i]))
536                         continue;
537                 if (home_bucket == bucket || home_bucket != i) {
538                         vals[num_vals++] = h->group[i];
539                         h->group[i] = 0;
540                 }
541         }
542         /* FIXME: This assert is valid, but we do this during unit test :( */
543         /* assert(num_vals); */
544
545         /* Overwrite expanded bucket with subhash pointer. */
546         h->group[bucket] = subhash | (1ULL << TDB_OFF_UPPER_STEAL_SUBHASH_BIT);
547
548         /* Point to actual contents of record. */
549         subhash += sizeof(struct tdb_used_record);
550
551         /* Put values back. */
552         for (i = 0; i < num_vals; i++) {
553                 unsigned this_bucket = vals[i] & TDB_OFF_HASH_GROUP_MASK;
554
555                 if (this_bucket == bucket) {
556                         ecode = add_to_subhash(tdb, subhash, h->hash_used,
557                                                vals[i]);
558                         if (ecode != TDB_SUCCESS)
559                                 return ecode;
560                 } else {
561                         /* There should be room to put this back. */
562                         force_into_group(h->group, this_bucket, vals[i]);
563                 }
564         }
565         return TDB_SUCCESS;
566 }
567
568 enum TDB_ERROR delete_from_hash(struct tdb_context *tdb, struct hash_info *h)
569 {
570         unsigned int i, num_movers = 0;
571         tdb_off_t movers[1 << TDB_HASH_GROUP_BITS];
572
573         h->group[h->found_bucket] = 0;
574         for (i = 1; i < (1 << TDB_HASH_GROUP_BITS); i++) {
575                 unsigned this_bucket;
576
577                 this_bucket = (h->found_bucket+i) % (1 << TDB_HASH_GROUP_BITS);
578                 /* Empty bucket?  We're done. */
579                 if (!h->group[this_bucket])
580                         break;
581
582                 /* Ignore subhashes. */
583                 if (is_subhash(h->group[this_bucket]))
584                         continue;
585
586                 /* If this one is not happy where it is, we'll move it. */
587                 if ((h->group[this_bucket] & TDB_OFF_HASH_GROUP_MASK)
588                     != this_bucket) {
589                         movers[num_movers++] = h->group[this_bucket];
590                         h->group[this_bucket] = 0;
591                 }
592         }
593
594         /* Put back the ones we erased. */
595         for (i = 0; i < num_movers; i++) {
596                 force_into_group(h->group, movers[i] & TDB_OFF_HASH_GROUP_MASK,
597                                  movers[i]);
598         }
599
600         /* Now we write back the hash group */
601         return tdb_write_convert(tdb, h->group_start,
602                                  h->group, sizeof(h->group));
603 }
604
605 enum TDB_ERROR add_to_hash(struct tdb_context *tdb, struct hash_info *h,
606                            tdb_off_t new_off)
607 {
608         enum TDB_ERROR ecode;
609
610         /* We hit an empty bucket during search?  That's where it goes. */
611         if (!h->group[h->found_bucket]) {
612                 h->group[h->found_bucket] = encode_offset(new_off, h);
613                 /* Write back the modified group. */
614                 return tdb_write_convert(tdb, h->group_start,
615                                          h->group, sizeof(h->group));
616         }
617
618         if (h->hash_used > 64)
619                 return add_to_chain(tdb, h->group_start, new_off);
620
621         /* We're full.  Expand. */
622         ecode = expand_group(tdb, h);
623         if (ecode != TDB_SUCCESS) {
624                 return ecode;
625         }
626
627         if (is_subhash(h->group[h->home_bucket])) {
628                 /* We were expanded! */
629                 tdb_off_t hashtable;
630                 unsigned int gnum;
631
632                 /* Write back the modified group. */
633                 ecode = tdb_write_convert(tdb, h->group_start, h->group,
634                                           sizeof(h->group));
635                 if (ecode != TDB_SUCCESS) {
636                         return ecode;
637                 }
638
639                 /* Move hashinfo down a level. */
640                 hashtable = (h->group[h->home_bucket] & TDB_OFF_MASK)
641                         + sizeof(struct tdb_used_record);
642                 gnum = use_bits(h,TDB_SUBLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS);
643                 h->home_bucket = use_bits(h, TDB_HASH_GROUP_BITS);
644                 h->group_start = hashtable
645                         + gnum * (sizeof(tdb_off_t) << TDB_HASH_GROUP_BITS);
646                 ecode = tdb_read_convert(tdb, h->group_start, &h->group,
647                                          sizeof(h->group));
648                 if (ecode != TDB_SUCCESS) {
649                         return ecode;
650                 }
651         }
652
653         /* Expanding the group must have made room if it didn't choose this
654          * bucket. */
655         if (put_into_group(h->group, h->home_bucket, encode_offset(new_off,h))){
656                 return tdb_write_convert(tdb, h->group_start,
657                                          h->group, sizeof(h->group));
658         }
659
660         /* This can happen if all hashes in group (and us) dropped into same
661          * group in subhash. */
662         return add_to_hash(tdb, h, new_off);
663 }
664
665 /* Traverse support: returns offset of record, or 0 or -ve error. */
666 static tdb_off_t iterate_hash(struct tdb_context *tdb,
667                               struct traverse_info *tinfo)
668 {
669         tdb_off_t off, val, i;
670         struct traverse_level *tlevel;
671
672         tlevel = &tinfo->levels[tinfo->num_levels-1];
673
674 again:
675         for (i = tdb_find_nonzero_off(tdb, tlevel->hashtable,
676                                       tlevel->entry, tlevel->total_buckets);
677              i != tlevel->total_buckets;
678              i = tdb_find_nonzero_off(tdb, tlevel->hashtable,
679                                       i+1, tlevel->total_buckets)) {
680                 if (TDB_OFF_IS_ERR(i)) {
681                         return i;
682                 }
683
684                 val = tdb_read_off(tdb, tlevel->hashtable+sizeof(tdb_off_t)*i);
685                 if (TDB_OFF_IS_ERR(val)) {
686                         return val;
687                 }
688
689                 off = val & TDB_OFF_MASK;
690
691                 /* This makes the delete-all-in-traverse case work
692                  * (and simplifies our logic a little). */
693                 if (off == tinfo->prev)
694                         continue;
695
696                 tlevel->entry = i;
697
698                 if (!is_subhash(val)) {
699                         /* Found one. */
700                         tinfo->prev = off;
701                         return off;
702                 }
703
704                 /* When we come back, we want the next one */
705                 tlevel->entry++;
706                 tinfo->num_levels++;
707                 tlevel++;
708                 tlevel->hashtable = off + sizeof(struct tdb_used_record);
709                 tlevel->entry = 0;
710                 /* Next level is a chain? */
711                 if (unlikely(tinfo->num_levels == TDB_MAX_LEVELS + 1))
712                         tlevel->total_buckets = (1 << TDB_HASH_GROUP_BITS);
713                 else
714                         tlevel->total_buckets = (1 << TDB_SUBLEVEL_HASH_BITS);
715                 goto again;
716         }
717
718         /* Nothing there? */
719         if (tinfo->num_levels == 1)
720                 return 0;
721
722         /* Handle chained entries. */
723         if (unlikely(tinfo->num_levels == TDB_MAX_LEVELS + 1)) {
724                 tlevel->hashtable = tdb_read_off(tdb, tlevel->hashtable
725                                                  + offsetof(struct tdb_chain,
726                                                             next));
727                 if (TDB_OFF_IS_ERR(tlevel->hashtable)) {
728                         return tlevel->hashtable;
729                 }
730                 if (tlevel->hashtable) {
731                         tlevel->hashtable += sizeof(struct tdb_used_record);
732                         tlevel->entry = 0;
733                         goto again;
734                 }
735         }
736
737         /* Go back up and keep searching. */
738         tinfo->num_levels--;
739         tlevel--;
740         goto again;
741 }
742
743 /* Return success if we find something, TDB_ERR_NOEXIST if none. */
744 enum TDB_ERROR next_in_hash(struct tdb_context *tdb,
745                             struct traverse_info *tinfo,
746                             TDB_DATA *kbuf, size_t *dlen)
747 {
748         const unsigned group_bits = TDB_TOPLEVEL_HASH_BITS-TDB_HASH_GROUP_BITS;
749         tdb_off_t hl_start, hl_range, off;
750         enum TDB_ERROR ecode;
751
752         while (tinfo->toplevel_group < (1 << group_bits)) {
753                 hl_start = (tdb_off_t)tinfo->toplevel_group
754                         << (64 - group_bits);
755                 hl_range = 1ULL << group_bits;
756                 ecode = tdb_lock_hashes(tdb, hl_start, hl_range, F_RDLCK,
757                                         TDB_LOCK_WAIT);
758                 if (ecode != TDB_SUCCESS) {
759                         return ecode;
760                 }
761
762                 off = iterate_hash(tdb, tinfo);
763                 if (off) {
764                         struct tdb_used_record rec;
765
766                         if (TDB_OFF_IS_ERR(off)) {
767                                 ecode = off;
768                                 goto fail;
769                         }
770
771                         ecode = tdb_read_convert(tdb, off, &rec, sizeof(rec));
772                         if (ecode != TDB_SUCCESS) {
773                                 goto fail;
774                         }
775                         if (rec_magic(&rec) != TDB_USED_MAGIC) {
776                                 ecode = tdb_logerr(tdb, TDB_ERR_CORRUPT,
777                                                    TDB_LOG_ERROR,
778                                                    "next_in_hash:"
779                                                    " corrupt record at %llu",
780                                                    (long long)off);
781                                 goto fail;
782                         }
783
784                         kbuf->dsize = rec_key_length(&rec);
785
786                         /* They want data as well? */
787                         if (dlen) {
788                                 *dlen = rec_data_length(&rec);
789                                 kbuf->dptr = tdb_alloc_read(tdb,
790                                                             off + sizeof(rec),
791                                                             kbuf->dsize
792                                                             + *dlen);
793                         } else {
794                                 kbuf->dptr = tdb_alloc_read(tdb,
795                                                             off + sizeof(rec),
796                                                             kbuf->dsize);
797                         }
798                         tdb_unlock_hashes(tdb, hl_start, hl_range, F_RDLCK);
799                         if (TDB_PTR_IS_ERR(kbuf->dptr)) {
800                                 return TDB_PTR_ERR(kbuf->dptr);
801                         }
802                         return TDB_SUCCESS;
803                 }
804
805                 tdb_unlock_hashes(tdb, hl_start, hl_range, F_RDLCK);
806
807                 tinfo->toplevel_group++;
808                 tinfo->levels[0].hashtable
809                         += (sizeof(tdb_off_t) << TDB_HASH_GROUP_BITS);
810                 tinfo->levels[0].entry = 0;
811         }
812         return TDB_ERR_NOEXIST;
813
814 fail:
815         tdb_unlock_hashes(tdb, hl_start, hl_range, F_RDLCK);
816         return ecode;
817
818 }
819
820 enum TDB_ERROR first_in_hash(struct tdb_context *tdb,
821                              struct traverse_info *tinfo,
822                              TDB_DATA *kbuf, size_t *dlen)
823 {
824         tinfo->prev = 0;
825         tinfo->toplevel_group = 0;
826         tinfo->num_levels = 1;
827         tinfo->levels[0].hashtable = offsetof(struct tdb_header, hashtable);
828         tinfo->levels[0].entry = 0;
829         tinfo->levels[0].total_buckets = (1 << TDB_HASH_GROUP_BITS);
830
831         return next_in_hash(tdb, tinfo, kbuf, dlen);
832 }
833
834 /* Even if the entry isn't in this hash bucket, you'd have to lock this
835  * bucket to find it. */
836 static enum TDB_ERROR chainlock(struct tdb_context *tdb, const TDB_DATA *key,
837                                 int ltype, enum tdb_lock_flags waitflag,
838                                 const char *func)
839 {
840         enum TDB_ERROR ecode;
841         uint64_t h = tdb_hash(tdb, key->dptr, key->dsize);
842         tdb_off_t lockstart, locksize;
843         unsigned int group, gbits;
844
845         gbits = TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS;
846         group = bits_from(h, 64 - gbits, gbits);
847
848         lockstart = hlock_range(group, &locksize);
849
850         ecode = tdb_lock_hashes(tdb, lockstart, locksize, ltype, waitflag);
851         tdb_trace_1rec(tdb, func, *key);
852         return ecode;
853 }
854
855 /* lock/unlock one hash chain. This is meant to be used to reduce
856    contention - it cannot guarantee how many records will be locked */
857 enum TDB_ERROR tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
858 {
859         return tdb->last_error = chainlock(tdb, &key, F_WRLCK, TDB_LOCK_WAIT,
860                                            "tdb_chainlock");
861 }
862
863 void tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
864 {
865         uint64_t h = tdb_hash(tdb, key.dptr, key.dsize);
866         tdb_off_t lockstart, locksize;
867         unsigned int group, gbits;
868
869         gbits = TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS;
870         group = bits_from(h, 64 - gbits, gbits);
871
872         lockstart = hlock_range(group, &locksize);
873
874         tdb_trace_1rec(tdb, "tdb_chainunlock", key);
875         tdb_unlock_hashes(tdb, lockstart, locksize, F_WRLCK);
876 }
877
878 enum TDB_ERROR tdb_chainlock_read(struct tdb_context *tdb, TDB_DATA key)
879 {
880         return tdb->last_error = chainlock(tdb, &key, F_RDLCK, TDB_LOCK_WAIT,
881                                            "tdb_chainlock_read");
882 }
883
884 void tdb_chainunlock_read(struct tdb_context *tdb, TDB_DATA key)
885 {
886         uint64_t h = tdb_hash(tdb, key.dptr, key.dsize);
887         tdb_off_t lockstart, locksize;
888         unsigned int group, gbits;
889
890         gbits = TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS;
891         group = bits_from(h, 64 - gbits, gbits);
892
893         lockstart = hlock_range(group, &locksize);
894
895         tdb_trace_1rec(tdb, "tdb_chainunlock_read", key);
896         tdb_unlock_hashes(tdb, lockstart, locksize, F_RDLCK);
897 }