]> git.ozlabs.org Git - ccan/blob - ccan/tdb2/hash.c
tdb2: rework free.c functions to return enum TDB_ERROR.
[ccan] / ccan / tdb2 / hash.c
1  /*
2    Trivial Database 2: hash handling
3    Copyright (C) Rusty Russell 2010
4
5    This library is free software; you can redistribute it and/or
6    modify it under the terms of the GNU Lesser General Public
7    License as published by the Free Software Foundation; either
8    version 3 of the License, or (at your option) any later version.
9
10    This library is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    Lesser General Public License for more details.
14
15    You should have received a copy of the GNU Lesser General Public
16    License along with this library; if not, see <http://www.gnu.org/licenses/>.
17 */
18 #include "private.h"
19 #include <assert.h>
20 #include <ccan/hash/hash.h>
21
/* Default hash callback: runs hash64_stable() over the key with the given
 * seed and returns the result with its two 32-bit halves exchanged.
 * The arg parameter is not used. */
static uint64_t jenkins_hash(const void *key, size_t length, uint64_t seed,
			     void *arg)
{
	/* hash64_stable assumes lower bits are more important; they are a
	 * slightly better hash.  We use the upper bits first, so swap them. */
	const uint64_t raw = hash64_stable((const unsigned char *)key,
					   length, seed);
	const uint64_t moved_down = raw >> 32;
	const uint64_t moved_up = raw << 32;

	return moved_down | moved_up;
}
31
32 void tdb_hash_init(struct tdb_context *tdb)
33 {
34         tdb->khash = jenkins_hash;
35         tdb->hash_priv = NULL;
36 }
37
38 uint64_t tdb_hash(struct tdb_context *tdb, const void *ptr, size_t len)
39 {
40         return tdb->khash(ptr, len, tdb->hash_seed, tdb->hash_priv);
41 }
42
43 uint64_t hash_record(struct tdb_context *tdb, tdb_off_t off)
44 {
45         const struct tdb_used_record *r;
46         const void *key;
47         uint64_t klen, hash;
48
49         r = tdb_access_read(tdb, off, sizeof(*r), true);
50         if (TDB_PTR_IS_ERR(r)) {
51                 tdb->ecode = TDB_PTR_ERR(r);
52                 /* FIXME */
53                 return 0;
54         }
55
56         klen = rec_key_length(r);
57         tdb_access_release(tdb, r);
58
59         key = tdb_access_read(tdb, off + sizeof(*r), klen, false);
60         if (TDB_PTR_IS_ERR(key)) {
61                 tdb->ecode = TDB_PTR_ERR(key);
62                 return 0;
63         }
64
65         hash = tdb_hash(tdb, key, klen);
66         tdb_access_release(tdb, key);
67         return hash;
68 }
69
/* Get bits from a value: returns num bits (at most 32) of val, taken from
 * bit position start upwards (bit 0 is the least significant). */
static uint32_t bits_from(uint64_t val, unsigned start, unsigned num)
{
	assert(num <= 32);
	/* Build the mask in 64 bits.  With a 32-bit constant, num == 32
	 * (which the assert permits) would shift by the full width of the
	 * type, which is undefined behaviour. */
	return (uint32_t)((val >> start) & ((UINT64_C(1) << num) - 1));
}
76
77 /* We take bits from the top: that way we can lock whole sections of the hash
78  * by using lock ranges. */
79 static uint32_t use_bits(struct hash_info *h, unsigned num)
80 {
81         h->hash_used += num;
82         return bits_from(h->h, 64 - h->hash_used, num);
83 }
84
85 static tdb_bool_err key_matches(struct tdb_context *tdb,
86                                 const struct tdb_used_record *rec,
87                                 tdb_off_t off,
88                                 const struct tdb_data *key)
89 {
90         tdb_bool_err ret = false;
91         const char *rkey;
92
93         if (rec_key_length(rec) != key->dsize) {
94                 add_stat(tdb, compare_wrong_keylen, 1);
95                 return ret;
96         }
97
98         rkey = tdb_access_read(tdb, off + sizeof(*rec), key->dsize, false);
99         if (TDB_PTR_IS_ERR(rkey)) {
100                 return TDB_PTR_ERR(rkey);
101         }
102         if (memcmp(rkey, key->dptr, key->dsize) == 0)
103                 ret = true;
104         else
105                 add_stat(tdb, compare_wrong_keycmp, 1);
106         tdb_access_release(tdb, rkey);
107         return ret;
108 }
109
110 /* Does entry match? */
111 static tdb_bool_err match(struct tdb_context *tdb,
112                           struct hash_info *h,
113                           const struct tdb_data *key,
114                           tdb_off_t val,
115                           struct tdb_used_record *rec)
116 {
117         tdb_off_t off;
118         enum TDB_ERROR ecode;
119
120         add_stat(tdb, compares, 1);
121         /* Desired bucket must match. */
122         if (h->home_bucket != (val & TDB_OFF_HASH_GROUP_MASK)) {
123                 add_stat(tdb, compare_wrong_bucket, 1);
124                 return false;
125         }
126
127         /* Top bits of offset == next bits of hash. */
128         if (bits_from(val, TDB_OFF_HASH_EXTRA_BIT, TDB_OFF_UPPER_STEAL_EXTRA)
129             != bits_from(h->h, 64 - h->hash_used - TDB_OFF_UPPER_STEAL_EXTRA,
130                     TDB_OFF_UPPER_STEAL_EXTRA)) {
131                 add_stat(tdb, compare_wrong_offsetbits, 1);
132                 return false;
133         }
134
135         off = val & TDB_OFF_MASK;
136         ecode = tdb_read_convert(tdb, off, rec, sizeof(*rec));
137         if (ecode != TDB_SUCCESS) {
138                 return ecode;
139         }
140
141         if ((h->h & ((1 << 11)-1)) != rec_hash(rec)) {
142                 add_stat(tdb, compare_wrong_rechash, 1);
143                 return false;
144         }
145
146         return key_matches(tdb, rec, off, key);
147 }
148
149 static tdb_off_t hbucket_off(tdb_off_t group_start, unsigned bucket)
150 {
151         return group_start
152                 + (bucket % (1 << TDB_HASH_GROUP_BITS)) * sizeof(tdb_off_t);
153 }
154
155 bool is_subhash(tdb_off_t val)
156 {
157         return (val >> TDB_OFF_UPPER_STEAL_SUBHASH_BIT) & 1;
158 }
159
160 /* FIXME: Guess the depth, don't over-lock! */
161 static tdb_off_t hlock_range(tdb_off_t group, tdb_off_t *size)
162 {
163         *size = 1ULL << (64 - (TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS));
164         return group << (64 - (TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS));
165 }
166
167 static tdb_off_t COLD find_in_chain(struct tdb_context *tdb,
168                                     struct tdb_data key,
169                                     tdb_off_t chain,
170                                     struct hash_info *h,
171                                     struct tdb_used_record *rec,
172                                     struct traverse_info *tinfo)
173 {
174         tdb_off_t off, next;
175         enum TDB_ERROR ecode;
176
177         /* In case nothing is free, we set these to zero. */
178         h->home_bucket = h->found_bucket = 0;
179
180         for (off = chain; off; off = next) {
181                 unsigned int i;
182
183                 h->group_start = off;
184                 ecode = tdb_read_convert(tdb, off, h->group, sizeof(h->group));
185                 if (ecode != TDB_SUCCESS) {
186                         return ecode;
187                 }
188
189                 for (i = 0; i < (1 << TDB_HASH_GROUP_BITS); i++) {
190                         tdb_off_t recoff;
191                         if (!h->group[i]) {
192                                 /* Remember this empty bucket. */
193                                 h->home_bucket = h->found_bucket = i;
194                                 continue;
195                         }
196
197                         /* We can insert extra bits via add_to_hash
198                          * empty bucket logic. */
199                         recoff = h->group[i] & TDB_OFF_MASK;
200                         ecode = tdb_read_convert(tdb, recoff, rec,
201                                                  sizeof(*rec));
202                         if (ecode != TDB_SUCCESS) {
203                                 return ecode;
204                         }
205
206                         ecode = key_matches(tdb, rec, recoff, &key);
207                         if (ecode < 0) {
208                                 return ecode;
209                         }
210                         if (ecode == 1) {
211                                 h->home_bucket = h->found_bucket = i;
212
213                                 if (tinfo) {
214                                         tinfo->levels[tinfo->num_levels]
215                                                 .hashtable = off;
216                                         tinfo->levels[tinfo->num_levels]
217                                                 .total_buckets
218                                                 = 1 << TDB_HASH_GROUP_BITS;
219                                         tinfo->levels[tinfo->num_levels].entry
220                                                 = i;
221                                         tinfo->num_levels++;
222                                 }
223                                 return recoff;
224                         }
225                 }
226                 next = tdb_read_off(tdb, off
227                                     + offsetof(struct tdb_chain, next));
228                 if (TDB_OFF_IS_ERR(next)) {
229                         return next;
230                 }
231                 if (next)
232                         next += sizeof(struct tdb_used_record);
233         }
234         return 0;
235 }
236
237 /* This is the core routine which searches the hashtable for an entry.
238  * On error, no locks are held and -ve is returned.
239  * Otherwise, hinfo is filled in (and the optional tinfo).
240  * If not found, the return value is 0.
241  * If found, the return value is the offset, and *rec is the record. */
242 tdb_off_t find_and_lock(struct tdb_context *tdb,
243                         struct tdb_data key,
244                         int ltype,
245                         struct hash_info *h,
246                         struct tdb_used_record *rec,
247                         struct traverse_info *tinfo)
248 {
249         uint32_t i, group;
250         tdb_off_t hashtable;
251         enum TDB_ERROR ecode;
252
253         h->h = tdb_hash(tdb, key.dptr, key.dsize);
254         h->hash_used = 0;
255         group = use_bits(h, TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS);
256         h->home_bucket = use_bits(h, TDB_HASH_GROUP_BITS);
257
258         h->hlock_start = hlock_range(group, &h->hlock_range);
259         ecode = tdb_lock_hashes(tdb, h->hlock_start, h->hlock_range, ltype,
260                                 TDB_LOCK_WAIT);
261         if (ecode != TDB_SUCCESS) {
262                 return ecode;
263         }
264
265         hashtable = offsetof(struct tdb_header, hashtable);
266         if (tinfo) {
267                 tinfo->toplevel_group = group;
268                 tinfo->num_levels = 1;
269                 tinfo->levels[0].entry = 0;
270                 tinfo->levels[0].hashtable = hashtable
271                         + (group << TDB_HASH_GROUP_BITS) * sizeof(tdb_off_t);
272                 tinfo->levels[0].total_buckets = 1 << TDB_HASH_GROUP_BITS;
273         }
274
275         while (h->hash_used <= 64) {
276                 /* Read in the hash group. */
277                 h->group_start = hashtable
278                         + group * (sizeof(tdb_off_t) << TDB_HASH_GROUP_BITS);
279
280                 ecode = tdb_read_convert(tdb, h->group_start, &h->group,
281                                          sizeof(h->group));
282                 if (ecode != TDB_SUCCESS) {
283                         goto fail;
284                 }
285
286                 /* Pointer to another hash table?  Go down... */
287                 if (is_subhash(h->group[h->home_bucket])) {
288                         hashtable = (h->group[h->home_bucket] & TDB_OFF_MASK)
289                                 + sizeof(struct tdb_used_record);
290                         if (tinfo) {
291                                 /* When we come back, use *next* bucket */
292                                 tinfo->levels[tinfo->num_levels-1].entry
293                                         += h->home_bucket + 1;
294                         }
295                         group = use_bits(h, TDB_SUBLEVEL_HASH_BITS
296                                          - TDB_HASH_GROUP_BITS);
297                         h->home_bucket = use_bits(h, TDB_HASH_GROUP_BITS);
298                         if (tinfo) {
299                                 tinfo->levels[tinfo->num_levels].hashtable
300                                         = hashtable;
301                                 tinfo->levels[tinfo->num_levels].total_buckets
302                                         = 1 << TDB_SUBLEVEL_HASH_BITS;
303                                 tinfo->levels[tinfo->num_levels].entry
304                                         = group << TDB_HASH_GROUP_BITS;
305                                 tinfo->num_levels++;
306                         }
307                         continue;
308                 }
309
310                 /* It's in this group: search (until 0 or all searched) */
311                 for (i = 0, h->found_bucket = h->home_bucket;
312                      i < (1 << TDB_HASH_GROUP_BITS);
313                      i++, h->found_bucket = ((h->found_bucket+1)
314                                              % (1 << TDB_HASH_GROUP_BITS))) {
315                         tdb_bool_err berr;
316                         if (is_subhash(h->group[h->found_bucket]))
317                                 continue;
318
319                         if (!h->group[h->found_bucket])
320                                 break;
321
322                         berr = match(tdb, h, &key, h->group[h->found_bucket],
323                                      rec);
324                         if (berr < 0) {
325                                 ecode = berr;
326                                 goto fail;
327                         }
328                         if (berr) {
329                                 if (tinfo) {
330                                         tinfo->levels[tinfo->num_levels-1].entry
331                                                 += h->found_bucket;
332                                 }
333                                 return h->group[h->found_bucket] & TDB_OFF_MASK;
334                         }
335                 }
336                 /* Didn't find it: h indicates where it would go. */
337                 return 0;
338         }
339
340         return find_in_chain(tdb, key, hashtable, h, rec, tinfo);
341
342 fail:
343         tdb_unlock_hashes(tdb, h->hlock_start, h->hlock_range, ltype);
344         return ecode;
345 }
346
347 /* I wrote a simple test, expanding a hash to 2GB, for the following
348  * cases:
349  * 1) Expanding all the buckets at once,
350  * 2) Expanding the bucket we wanted to place the new entry into.
351  * 3) Expanding the most-populated bucket,
352  *
353  * I measured the worst/average/best density during this process.
354  * 1) 3%/16%/30%
355  * 2) 4%/20%/38%
356  * 3) 6%/22%/41%
357  *
358  * So we figure out the busiest bucket for the moment.
359  */
360 static unsigned fullest_bucket(struct tdb_context *tdb,
361                                const tdb_off_t *group,
362                                unsigned new_bucket)
363 {
364         unsigned counts[1 << TDB_HASH_GROUP_BITS] = { 0 };
365         unsigned int i, best_bucket;
366
367         /* Count the new entry. */
368         counts[new_bucket]++;
369         best_bucket = new_bucket;
370
371         for (i = 0; i < (1 << TDB_HASH_GROUP_BITS); i++) {
372                 unsigned this_bucket;
373
374                 if (is_subhash(group[i]))
375                         continue;
376                 this_bucket = group[i] & TDB_OFF_HASH_GROUP_MASK;
377                 if (++counts[this_bucket] > counts[best_bucket])
378                         best_bucket = this_bucket;
379         }
380
381         return best_bucket;
382 }
383
384 static bool put_into_group(tdb_off_t *group,
385                            unsigned bucket, tdb_off_t encoded)
386 {
387         unsigned int i;
388
389         for (i = 0; i < (1 << TDB_HASH_GROUP_BITS); i++) {
390                 unsigned b = (bucket + i) % (1 << TDB_HASH_GROUP_BITS);
391
392                 if (group[b] == 0) {
393                         group[b] = encoded;
394                         return true;
395                 }
396         }
397         return false;
398 }
399
400 static void force_into_group(tdb_off_t *group,
401                              unsigned bucket, tdb_off_t encoded)
402 {
403         if (!put_into_group(group, bucket, encoded))
404                 abort();
405 }
406
407 static tdb_off_t encode_offset(tdb_off_t new_off, struct hash_info *h)
408 {
409         return h->home_bucket
410                 | new_off
411                 | ((uint64_t)bits_from(h->h,
412                                   64 - h->hash_used - TDB_OFF_UPPER_STEAL_EXTRA,
413                                   TDB_OFF_UPPER_STEAL_EXTRA)
414                    << TDB_OFF_HASH_EXTRA_BIT);
415 }
416
417 /* Simply overwrite the hash entry we found before. */
418 enum TDB_ERROR replace_in_hash(struct tdb_context *tdb,
419                                struct hash_info *h,
420                                tdb_off_t new_off)
421 {
422         return tdb_write_off(tdb, hbucket_off(h->group_start, h->found_bucket),
423                              encode_offset(new_off, h));
424 }
425
426 /* We slot in anywhere that's empty in the chain. */
427 static enum TDB_ERROR COLD add_to_chain(struct tdb_context *tdb,
428                                         tdb_off_t subhash,
429                                         tdb_off_t new_off)
430 {
431         tdb_off_t entry;
432         enum TDB_ERROR ecode;
433
434         entry = tdb_find_zero_off(tdb, subhash, 1<<TDB_HASH_GROUP_BITS);
435         if (TDB_OFF_IS_ERR(entry)) {
436                 return entry;
437         }
438
439         if (entry == 1 << TDB_HASH_GROUP_BITS) {
440                 tdb_off_t next;
441
442                 next = tdb_read_off(tdb, subhash
443                                     + offsetof(struct tdb_chain, next));
444                 if (TDB_OFF_IS_ERR(next)) {
445                         return next;
446                 }
447
448                 if (!next) {
449                         next = alloc(tdb, 0, sizeof(struct tdb_chain), 0,
450                                      TDB_CHAIN_MAGIC, false);
451                         if (TDB_OFF_IS_ERR(next))
452                                 return next;
453                         ecode = zero_out(tdb,
454                                          next+sizeof(struct tdb_used_record),
455                                          sizeof(struct tdb_chain));
456                         if (ecode != TDB_SUCCESS) {
457                                 return ecode;
458                         }
459                         ecode = tdb_write_off(tdb, subhash
460                                               + offsetof(struct tdb_chain,
461                                                          next),
462                                               next);
463                         if (ecode != TDB_SUCCESS) {
464                                 return ecode;
465                         }
466                 }
467                 return add_to_chain(tdb, next, new_off);
468         }
469
470         return tdb_write_off(tdb, subhash + entry * sizeof(tdb_off_t),
471                              new_off);
472 }
473
474 /* Add into a newly created subhash. */
475 static enum TDB_ERROR add_to_subhash(struct tdb_context *tdb, tdb_off_t subhash,
476                                      unsigned hash_used, tdb_off_t val)
477 {
478         tdb_off_t off = (val & TDB_OFF_MASK), *group;
479         struct hash_info h;
480         unsigned int gnum;
481
482         h.hash_used = hash_used;
483
484         if (hash_used + TDB_SUBLEVEL_HASH_BITS > 64)
485                 return add_to_chain(tdb, subhash, off);
486
487         h.h = hash_record(tdb, off);
488         gnum = use_bits(&h, TDB_SUBLEVEL_HASH_BITS-TDB_HASH_GROUP_BITS);
489         h.group_start = subhash
490                 + gnum * (sizeof(tdb_off_t) << TDB_HASH_GROUP_BITS);
491         h.home_bucket = use_bits(&h, TDB_HASH_GROUP_BITS);
492
493         group = tdb_access_write(tdb, h.group_start,
494                                  sizeof(*group) << TDB_HASH_GROUP_BITS, true);
495         if (TDB_PTR_IS_ERR(group)) {
496                 return TDB_PTR_ERR(group);
497         }
498         force_into_group(group, h.home_bucket, encode_offset(off, &h));
499         return tdb_access_commit(tdb, group);
500 }
501
502 static enum TDB_ERROR expand_group(struct tdb_context *tdb, struct hash_info *h)
503 {
504         unsigned bucket, num_vals, i, magic;
505         size_t subsize;
506         tdb_off_t subhash;
507         tdb_off_t vals[1 << TDB_HASH_GROUP_BITS];
508         enum TDB_ERROR ecode;
509
510         /* Attach new empty subhash under fullest bucket. */
511         bucket = fullest_bucket(tdb, h->group, h->home_bucket);
512
513         if (h->hash_used == 64) {
514                 add_stat(tdb, alloc_chain, 1);
515                 subsize = sizeof(struct tdb_chain);
516                 magic = TDB_CHAIN_MAGIC;
517         } else {
518                 add_stat(tdb, alloc_subhash, 1);
519                 subsize = (sizeof(tdb_off_t) << TDB_SUBLEVEL_HASH_BITS);
520                 magic = TDB_HTABLE_MAGIC;
521         }
522
523         subhash = alloc(tdb, 0, subsize, 0, magic, false);
524         if (TDB_OFF_IS_ERR(subhash)) {
525                 return subhash;
526         }
527
528         ecode = zero_out(tdb, subhash + sizeof(struct tdb_used_record),
529                          subsize);
530         if (ecode != TDB_SUCCESS) {
531                 return ecode;
532         }
533
534         /* Remove any which are destined for bucket or are in wrong place. */
535         num_vals = 0;
536         for (i = 0; i < (1 << TDB_HASH_GROUP_BITS); i++) {
537                 unsigned home_bucket = h->group[i] & TDB_OFF_HASH_GROUP_MASK;
538                 if (!h->group[i] || is_subhash(h->group[i]))
539                         continue;
540                 if (home_bucket == bucket || home_bucket != i) {
541                         vals[num_vals++] = h->group[i];
542                         h->group[i] = 0;
543                 }
544         }
545         /* FIXME: This assert is valid, but we do this during unit test :( */
546         /* assert(num_vals); */
547
548         /* Overwrite expanded bucket with subhash pointer. */
549         h->group[bucket] = subhash | (1ULL << TDB_OFF_UPPER_STEAL_SUBHASH_BIT);
550
551         /* Point to actual contents of record. */
552         subhash += sizeof(struct tdb_used_record);
553
554         /* Put values back. */
555         for (i = 0; i < num_vals; i++) {
556                 unsigned this_bucket = vals[i] & TDB_OFF_HASH_GROUP_MASK;
557
558                 if (this_bucket == bucket) {
559                         ecode = add_to_subhash(tdb, subhash, h->hash_used,
560                                                vals[i]);
561                         if (ecode != TDB_SUCCESS)
562                                 return ecode;
563                 } else {
564                         /* There should be room to put this back. */
565                         force_into_group(h->group, this_bucket, vals[i]);
566                 }
567         }
568         return TDB_SUCCESS;
569 }
570
571 enum TDB_ERROR delete_from_hash(struct tdb_context *tdb, struct hash_info *h)
572 {
573         unsigned int i, num_movers = 0;
574         tdb_off_t movers[1 << TDB_HASH_GROUP_BITS];
575
576         h->group[h->found_bucket] = 0;
577         for (i = 1; i < (1 << TDB_HASH_GROUP_BITS); i++) {
578                 unsigned this_bucket;
579
580                 this_bucket = (h->found_bucket+i) % (1 << TDB_HASH_GROUP_BITS);
581                 /* Empty bucket?  We're done. */
582                 if (!h->group[this_bucket])
583                         break;
584
585                 /* Ignore subhashes. */
586                 if (is_subhash(h->group[this_bucket]))
587                         continue;
588
589                 /* If this one is not happy where it is, we'll move it. */
590                 if ((h->group[this_bucket] & TDB_OFF_HASH_GROUP_MASK)
591                     != this_bucket) {
592                         movers[num_movers++] = h->group[this_bucket];
593                         h->group[this_bucket] = 0;
594                 }
595         }
596
597         /* Put back the ones we erased. */
598         for (i = 0; i < num_movers; i++) {
599                 force_into_group(h->group, movers[i] & TDB_OFF_HASH_GROUP_MASK,
600                                  movers[i]);
601         }
602
603         /* Now we write back the hash group */
604         return tdb_write_convert(tdb, h->group_start,
605                                  h->group, sizeof(h->group));
606 }
607
608 enum TDB_ERROR add_to_hash(struct tdb_context *tdb, struct hash_info *h,
609                            tdb_off_t new_off)
610 {
611         enum TDB_ERROR ecode;
612
613         /* We hit an empty bucket during search?  That's where it goes. */
614         if (!h->group[h->found_bucket]) {
615                 h->group[h->found_bucket] = encode_offset(new_off, h);
616                 /* Write back the modified group. */
617                 return tdb_write_convert(tdb, h->group_start,
618                                          h->group, sizeof(h->group));
619         }
620
621         if (h->hash_used > 64)
622                 return add_to_chain(tdb, h->group_start, new_off);
623
624         /* We're full.  Expand. */
625         ecode = expand_group(tdb, h);
626         if (ecode != TDB_SUCCESS) {
627                 return ecode;
628         }
629
630         if (is_subhash(h->group[h->home_bucket])) {
631                 /* We were expanded! */
632                 tdb_off_t hashtable;
633                 unsigned int gnum;
634
635                 /* Write back the modified group. */
636                 ecode = tdb_write_convert(tdb, h->group_start, h->group,
637                                           sizeof(h->group));
638                 if (ecode != TDB_SUCCESS) {
639                         return ecode;
640                 }
641
642                 /* Move hashinfo down a level. */
643                 hashtable = (h->group[h->home_bucket] & TDB_OFF_MASK)
644                         + sizeof(struct tdb_used_record);
645                 gnum = use_bits(h,TDB_SUBLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS);
646                 h->home_bucket = use_bits(h, TDB_HASH_GROUP_BITS);
647                 h->group_start = hashtable
648                         + gnum * (sizeof(tdb_off_t) << TDB_HASH_GROUP_BITS);
649                 ecode = tdb_read_convert(tdb, h->group_start, &h->group,
650                                          sizeof(h->group));
651                 if (ecode != TDB_SUCCESS) {
652                         return ecode;
653                 }
654         }
655
656         /* Expanding the group must have made room if it didn't choose this
657          * bucket. */
658         if (put_into_group(h->group, h->home_bucket, encode_offset(new_off,h))){
659                 return tdb_write_convert(tdb, h->group_start,
660                                          h->group, sizeof(h->group));
661         }
662
663         /* This can happen if all hashes in group (and us) dropped into same
664          * group in subhash. */
665         return add_to_hash(tdb, h, new_off);
666 }
667
668 /* Traverse support: returns offset of record, or 0 or -ve error. */
669 static tdb_off_t iterate_hash(struct tdb_context *tdb,
670                               struct traverse_info *tinfo)
671 {
672         tdb_off_t off, val, i;
673         struct traverse_level *tlevel;
674
675         tlevel = &tinfo->levels[tinfo->num_levels-1];
676
677 again:
678         for (i = tdb_find_nonzero_off(tdb, tlevel->hashtable,
679                                       tlevel->entry, tlevel->total_buckets);
680              i != tlevel->total_buckets;
681              i = tdb_find_nonzero_off(tdb, tlevel->hashtable,
682                                       i+1, tlevel->total_buckets)) {
683                 if (TDB_OFF_IS_ERR(i)) {
684                         return i;
685                 }
686
687                 val = tdb_read_off(tdb, tlevel->hashtable+sizeof(tdb_off_t)*i);
688                 if (TDB_OFF_IS_ERR(val)) {
689                         return val;
690                 }
691
692                 off = val & TDB_OFF_MASK;
693
694                 /* This makes the delete-all-in-traverse case work
695                  * (and simplifies our logic a little). */
696                 if (off == tinfo->prev)
697                         continue;
698
699                 tlevel->entry = i;
700
701                 if (!is_subhash(val)) {
702                         /* Found one. */
703                         tinfo->prev = off;
704                         return off;
705                 }
706
707                 /* When we come back, we want the next one */
708                 tlevel->entry++;
709                 tinfo->num_levels++;
710                 tlevel++;
711                 tlevel->hashtable = off + sizeof(struct tdb_used_record);
712                 tlevel->entry = 0;
713                 /* Next level is a chain? */
714                 if (unlikely(tinfo->num_levels == TDB_MAX_LEVELS + 1))
715                         tlevel->total_buckets = (1 << TDB_HASH_GROUP_BITS);
716                 else
717                         tlevel->total_buckets = (1 << TDB_SUBLEVEL_HASH_BITS);
718                 goto again;
719         }
720
721         /* Nothing there? */
722         if (tinfo->num_levels == 1)
723                 return 0;
724
725         /* Handle chained entries. */
726         if (unlikely(tinfo->num_levels == TDB_MAX_LEVELS + 1)) {
727                 tlevel->hashtable = tdb_read_off(tdb, tlevel->hashtable
728                                                  + offsetof(struct tdb_chain,
729                                                             next));
730                 if (TDB_OFF_IS_ERR(tlevel->hashtable)) {
731                         return tlevel->hashtable;
732                 }
733                 if (tlevel->hashtable) {
734                         tlevel->hashtable += sizeof(struct tdb_used_record);
735                         tlevel->entry = 0;
736                         goto again;
737                 }
738         }
739
740         /* Go back up and keep searching. */
741         tinfo->num_levels--;
742         tlevel--;
743         goto again;
744 }
745
746 /* Return success if we find something, TDB_ERR_NOEXIST if none. */
747 enum TDB_ERROR next_in_hash(struct tdb_context *tdb,
748                             struct traverse_info *tinfo,
749                             TDB_DATA *kbuf, size_t *dlen)
750 {
751         const unsigned group_bits = TDB_TOPLEVEL_HASH_BITS-TDB_HASH_GROUP_BITS;
752         tdb_off_t hl_start, hl_range, off;
753         enum TDB_ERROR ecode;
754
755         while (tinfo->toplevel_group < (1 << group_bits)) {
756                 hl_start = (tdb_off_t)tinfo->toplevel_group
757                         << (64 - group_bits);
758                 hl_range = 1ULL << group_bits;
759                 ecode = tdb_lock_hashes(tdb, hl_start, hl_range, F_RDLCK,
760                                         TDB_LOCK_WAIT);
761                 if (ecode != TDB_SUCCESS) {
762                         return ecode;
763                 }
764
765                 off = iterate_hash(tdb, tinfo);
766                 if (off) {
767                         struct tdb_used_record rec;
768
769                         if (TDB_OFF_IS_ERR(off)) {
770                                 ecode = off;
771                                 goto fail;
772                         }
773
774                         ecode = tdb_read_convert(tdb, off, &rec, sizeof(rec));
775                         if (ecode != TDB_SUCCESS) {
776                                 goto fail;
777                         }
778                         if (rec_magic(&rec) != TDB_USED_MAGIC) {
779                                 ecode = tdb_logerr(tdb, TDB_ERR_CORRUPT,
780                                                    TDB_LOG_ERROR,
781                                                    "next_in_hash:"
782                                                    " corrupt record at %llu",
783                                                    (long long)off);
784                                 goto fail;
785                         }
786
787                         kbuf->dsize = rec_key_length(&rec);
788
789                         /* They want data as well? */
790                         if (dlen) {
791                                 *dlen = rec_data_length(&rec);
792                                 kbuf->dptr = tdb_alloc_read(tdb,
793                                                             off + sizeof(rec),
794                                                             kbuf->dsize
795                                                             + *dlen);
796                         } else {
797                                 kbuf->dptr = tdb_alloc_read(tdb,
798                                                             off + sizeof(rec),
799                                                             kbuf->dsize);
800                         }
801                         tdb_unlock_hashes(tdb, hl_start, hl_range, F_RDLCK);
802                         if (TDB_PTR_IS_ERR(kbuf->dptr)) {
803                                 return TDB_PTR_ERR(kbuf->dptr);
804                         }
805                         return TDB_SUCCESS;
806                 }
807
808                 tdb_unlock_hashes(tdb, hl_start, hl_range, F_RDLCK);
809
810                 tinfo->toplevel_group++;
811                 tinfo->levels[0].hashtable
812                         += (sizeof(tdb_off_t) << TDB_HASH_GROUP_BITS);
813                 tinfo->levels[0].entry = 0;
814         }
815         return TDB_ERR_NOEXIST;
816
817 fail:
818         tdb_unlock_hashes(tdb, hl_start, hl_range, F_RDLCK);
819         return ecode;
820
821 }
822
823 enum TDB_ERROR first_in_hash(struct tdb_context *tdb,
824                              struct traverse_info *tinfo,
825                              TDB_DATA *kbuf, size_t *dlen)
826 {
827         tinfo->prev = 0;
828         tinfo->toplevel_group = 0;
829         tinfo->num_levels = 1;
830         tinfo->levels[0].hashtable = offsetof(struct tdb_header, hashtable);
831         tinfo->levels[0].entry = 0;
832         tinfo->levels[0].total_buckets = (1 << TDB_HASH_GROUP_BITS);
833
834         return next_in_hash(tdb, tinfo, kbuf, dlen);
835 }
836
837 /* Even if the entry isn't in this hash bucket, you'd have to lock this
838  * bucket to find it. */
839 static enum TDB_ERROR chainlock(struct tdb_context *tdb, const TDB_DATA *key,
840                                 int ltype, enum tdb_lock_flags waitflag,
841                                 const char *func)
842 {
843         enum TDB_ERROR ecode;
844         uint64_t h = tdb_hash(tdb, key->dptr, key->dsize);
845         tdb_off_t lockstart, locksize;
846         unsigned int group, gbits;
847
848         gbits = TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS;
849         group = bits_from(h, 64 - gbits, gbits);
850
851         lockstart = hlock_range(group, &locksize);
852
853         ecode = tdb_lock_hashes(tdb, lockstart, locksize, ltype, waitflag);
854         tdb_trace_1rec(tdb, func, *key);
855         return ecode;
856 }
857
858 /* lock/unlock one hash chain. This is meant to be used to reduce
859    contention - it cannot guarantee how many records will be locked */
860 int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
861 {
862         tdb->ecode = chainlock(tdb, &key, F_WRLCK, TDB_LOCK_WAIT,
863                                "tdb_chainlock");
864         if (tdb->ecode == TDB_SUCCESS)
865                 return 0;
866         return -1;
867         
868 }
869
870 int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
871 {
872         uint64_t h = tdb_hash(tdb, key.dptr, key.dsize);
873         tdb_off_t lockstart, locksize;
874         unsigned int group, gbits;
875         enum TDB_ERROR ecode;
876
877         gbits = TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS;
878         group = bits_from(h, 64 - gbits, gbits);
879
880         lockstart = hlock_range(group, &locksize);
881
882         tdb_trace_1rec(tdb, "tdb_chainunlock", key);
883         ecode = tdb_unlock_hashes(tdb, lockstart, locksize, F_WRLCK);
884         if (ecode != TDB_SUCCESS) {
885                 tdb->ecode = ecode;
886                 return -1;
887         }
888         return 0;
889 }