/*
  Trivial Database 2: hash handling
  Copyright (C) Rusty Russell 2010

  This library is free software; you can redistribute it and/or
  modify it under the terms of the GNU Lesser General Public
  License as published by the Free Software Foundation; either
  version 3 of the License, or (at your option) any later version.

  This library is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  Lesser General Public License for more details.

  You should have received a copy of the GNU Lesser General Public
  License along with this library; if not, see <http://www.gnu.org/licenses/>.
*/
#include "private.h"
#include <assert.h>
#include <ccan/hash/hash.h>

static uint64_t jenkins_hash(const void *key, size_t length, uint64_t seed,
			     void *arg)
{
	uint64_t ret;
	/* hash64_stable mixes its lower bits slightly better, but we consume
	 * hash bits from the top down, so swap the two halves. */
	ret = hash64_stable((const unsigned char *)key, length, seed);
	return (ret >> 32) | (ret << 32);
}

void tdb_hash_init(struct tdb_context *tdb)
{
	tdb->khash = jenkins_hash;
	tdb->hash_priv = NULL;
}

uint64_t tdb_hash(struct tdb_context *tdb, const void *ptr, size_t len)
{
	return tdb->khash(ptr, len, tdb->hash_seed, tdb->hash_priv);
}

uint64_t hash_record(struct tdb_context *tdb, tdb_off_t off)
{
	const struct tdb_used_record *r;
	const void *key;
	uint64_t klen, hash;

	r = tdb_access_read(tdb, off, sizeof(*r), true);
	if (!r)
		/* FIXME */
		return 0;

	klen = rec_key_length(r);
	tdb_access_release(tdb, r);

	key = tdb_access_read(tdb, off + sizeof(*r), klen, false);
	if (!key)
		return 0;

	hash = tdb_hash(tdb, key, klen);
	tdb_access_release(tdb, key);
	return hash;
}

/* Get bits from a value. */
static uint32_t bits_from(uint64_t val, unsigned start, unsigned num)
{
	assert(num <= 32);
	/* Build the mask in 64 bits: 1U << 32 would be undefined behaviour. */
	return (val >> start) & (((uint64_t)1 << num) - 1);
}
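
/* e.g. bits_from(0xF123456789ABCDEFULL, 60, 4) == 0xF (the top nibble),
 *      bits_from(0xF123456789ABCDEFULL, 0, 8) == 0xEF (the bottom byte). */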

/* We take bits from the top: that way we can lock whole sections of the hash
 * by using lock ranges. */
static uint32_t use_bits(struct hash_info *h, unsigned num)
{
	h->hash_used += num;
	return bits_from(h->h, 64 - h->hash_used, num);
}
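
/* Successive calls walk down from the most-significant bit: with
 * hash_used == 0, use_bits(h, n) returns the top n bits of h->h; the next
 * call returns the bits just below those, and so on. */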

static bool key_matches(struct tdb_context *tdb,
			const struct tdb_used_record *rec,
			tdb_off_t off,
			const struct tdb_data *key)
{
	bool ret = false;
	const char *rkey;

	if (rec_key_length(rec) != key->dsize) {
		add_stat(tdb, compare_wrong_keylen, 1);
		return ret;
	}

	rkey = tdb_access_read(tdb, off + sizeof(*rec), key->dsize, false);
	if (!rkey)
		return ret;
	if (memcmp(rkey, key->dptr, key->dsize) == 0)
		ret = true;
	else
		add_stat(tdb, compare_wrong_keycmp, 1);
	tdb_access_release(tdb, rkey);
	return ret;
}

/* Does entry match? */
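/* A hash bucket entry packs the home bucket, a few extra hash bits and
 * the record offset into a single word, so most mismatches are rejected
 * without ever reading the key: wrong bucket, then wrong extra bits, then
 * a wrong copy of the hash bits stored in the record header, and only
 * then the full key comparison. */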
static bool match(struct tdb_context *tdb,
		  struct hash_info *h,
		  const struct tdb_data *key,
		  tdb_off_t val,
		  struct tdb_used_record *rec)
{
	tdb_off_t off;

	add_stat(tdb, compares, 1);
	/* Desired bucket must match. */
	if (h->home_bucket != (val & TDB_OFF_HASH_GROUP_MASK)) {
		add_stat(tdb, compare_wrong_bucket, 1);
		return false;
	}

	/* Top bits of offset == next bits of hash. */
	if (bits_from(val, TDB_OFF_HASH_EXTRA_BIT, TDB_OFF_UPPER_STEAL_EXTRA)
	    != bits_from(h->h, 64 - h->hash_used - TDB_OFF_UPPER_STEAL_EXTRA,
			 TDB_OFF_UPPER_STEAL_EXTRA)) {
		add_stat(tdb, compare_wrong_offsetbits, 1);
		return false;
	}

	off = val & TDB_OFF_MASK;
	if (tdb_read_convert(tdb, off, rec, sizeof(*rec)) == -1)
		return false;

	if ((h->h & ((1 << 11)-1)) != rec_hash(rec)) {
		add_stat(tdb, compare_wrong_rechash, 1);
		return false;
	}

	return key_matches(tdb, rec, off, key);
}

static tdb_off_t hbucket_off(tdb_off_t group_start, unsigned bucket)
{
	return group_start
		+ (bucket % (1 << TDB_HASH_GROUP_BITS)) * sizeof(tdb_off_t);
}

bool is_subhash(tdb_off_t val)
{
	return (val >> TDB_OFF_UPPER_STEAL_SUBHASH_BIT) & 1;
}

/* FIXME: Guess the depth, don't over-lock! */
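/* All records whose top (TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS)
 * hash bits equal "group" occupy one contiguous slice of the 64-bit hash
 * space (use_bits() hands out bits from the top), so one lock range
 * covers the whole group no matter how deeply it has expanded. */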
static tdb_off_t hlock_range(tdb_off_t group, tdb_off_t *size)
{
	*size = 1ULL << (64 - (TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS));
	return group << (64 - (TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS));
}

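/* When all 64 hash bits have been used, records overflow into linked
 * chains: each struct tdb_chain is one group of buckets plus a "next"
 * offset, allocated behind a struct tdb_used_record header (hence the
 * sizeof() adjustments when following the links). */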
static tdb_off_t COLD find_in_chain(struct tdb_context *tdb,
				    struct tdb_data key,
				    tdb_off_t chain,
				    struct hash_info *h,
				    struct tdb_used_record *rec,
				    struct traverse_info *tinfo)
{
	tdb_off_t off, next;

	/* In case nothing is free, we set these to zero. */
	h->home_bucket = h->found_bucket = 0;

	for (off = chain; off; off = next) {
		unsigned int i;

		h->group_start = off;
		if (tdb_read_convert(tdb, off, h->group, sizeof(h->group)))
			return TDB_OFF_ERR;

		for (i = 0; i < (1 << TDB_HASH_GROUP_BITS); i++) {
			tdb_off_t recoff;
			if (!h->group[i]) {
				/* Remember this empty bucket. */
				h->home_bucket = h->found_bucket = i;
				continue;
			}

			/* We can insert extra bits via add_to_hash's
			 * empty-bucket logic. */
			recoff = h->group[i] & TDB_OFF_MASK;
			if (tdb_read_convert(tdb, recoff, rec, sizeof(*rec)))
				return TDB_OFF_ERR;

			if (key_matches(tdb, rec, recoff, &key)) {
				h->home_bucket = h->found_bucket = i;

				if (tinfo) {
					tinfo->levels[tinfo->num_levels]
						.hashtable = off;
					tinfo->levels[tinfo->num_levels]
						.total_buckets
						= 1 << TDB_HASH_GROUP_BITS;
					tinfo->levels[tinfo->num_levels].entry
						= i;
					tinfo->num_levels++;
				}
				return recoff;
			}
		}
		next = tdb_read_off(tdb, off
				    + offsetof(struct tdb_chain, next));
		if (next == TDB_OFF_ERR)
			return TDB_OFF_ERR;
		if (next)
			next += sizeof(struct tdb_used_record);
	}
	return 0;
}

/* This is the core routine which searches the hashtable for an entry.
 * On error, no locks are held and TDB_OFF_ERR is returned.
 * Otherwise the hash range remains locked, and h (and the optional tinfo)
 * are filled in.
 * If not found, the return value is 0.
 * If found, the return value is the offset, and *rec is the record. */
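/*
 * A sketch of the usual caller pattern (the real callers are in tdb.c):
 *
 *	off = find_and_lock(tdb, key, F_WRLCK, &h, &rec, NULL);
 *	if (off == TDB_OFF_ERR)
 *		return -1;	(no locks held)
 *	...use off (0 means "not found") and rec...
 *	tdb_unlock_hashes(tdb, h.hlock_start, h.hlock_range, F_WRLCK);
 */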
tdb_off_t find_and_lock(struct tdb_context *tdb,
			struct tdb_data key,
			int ltype,
			struct hash_info *h,
			struct tdb_used_record *rec,
			struct traverse_info *tinfo)
{
	uint32_t i, group;
	tdb_off_t hashtable;
	enum TDB_ERROR ecode;

	h->h = tdb_hash(tdb, key.dptr, key.dsize);
	h->hash_used = 0;
	group = use_bits(h, TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS);
	h->home_bucket = use_bits(h, TDB_HASH_GROUP_BITS);

	h->hlock_start = hlock_range(group, &h->hlock_range);
	ecode = tdb_lock_hashes(tdb, h->hlock_start, h->hlock_range, ltype,
				TDB_LOCK_WAIT);
	if (ecode != TDB_SUCCESS) {
		tdb->ecode = ecode;
		return TDB_OFF_ERR;
	}

	hashtable = offsetof(struct tdb_header, hashtable);
	if (tinfo) {
		tinfo->toplevel_group = group;
		tinfo->num_levels = 1;
		tinfo->levels[0].entry = 0;
		tinfo->levels[0].hashtable = hashtable
			+ (group << TDB_HASH_GROUP_BITS) * sizeof(tdb_off_t);
		tinfo->levels[0].total_buckets = 1 << TDB_HASH_GROUP_BITS;
	}

	while (h->hash_used <= 64) {
		/* Read in the hash group. */
		h->group_start = hashtable
			+ group * (sizeof(tdb_off_t) << TDB_HASH_GROUP_BITS);

		if (tdb_read_convert(tdb, h->group_start, &h->group,
				     sizeof(h->group)) == -1)
			goto fail;

		/* Pointer to another hash table?  Go down... */
		if (is_subhash(h->group[h->home_bucket])) {
			hashtable = (h->group[h->home_bucket] & TDB_OFF_MASK)
				+ sizeof(struct tdb_used_record);
			if (tinfo) {
				/* When we come back, use *next* bucket */
				tinfo->levels[tinfo->num_levels-1].entry
					+= h->home_bucket + 1;
			}
			group = use_bits(h, TDB_SUBLEVEL_HASH_BITS
					 - TDB_HASH_GROUP_BITS);
			h->home_bucket = use_bits(h, TDB_HASH_GROUP_BITS);
			if (tinfo) {
				tinfo->levels[tinfo->num_levels].hashtable
					= hashtable;
				tinfo->levels[tinfo->num_levels].total_buckets
					= 1 << TDB_SUBLEVEL_HASH_BITS;
				tinfo->levels[tinfo->num_levels].entry
					= group << TDB_HASH_GROUP_BITS;
				tinfo->num_levels++;
			}
			continue;
		}

		/* It's in this group: search until we hit an empty bucket,
		 * or we've searched them all. */
		for (i = 0, h->found_bucket = h->home_bucket;
		     i < (1 << TDB_HASH_GROUP_BITS);
		     i++, h->found_bucket = ((h->found_bucket+1)
					     % (1 << TDB_HASH_GROUP_BITS))) {
			if (is_subhash(h->group[h->found_bucket]))
				continue;

			if (!h->group[h->found_bucket])
				break;

			if (match(tdb, h, &key, h->group[h->found_bucket],
				  rec)) {
				if (tinfo) {
					tinfo->levels[tinfo->num_levels-1].entry
						+= h->found_bucket;
				}
				return h->group[h->found_bucket] & TDB_OFF_MASK;
			}
		}
		/* Didn't find it: h indicates where it would go. */
		return 0;
	}

	return find_in_chain(tdb, key, hashtable, h, rec, tinfo);

fail:
	tdb_unlock_hashes(tdb, h->hlock_start, h->hlock_range, ltype);
	return TDB_OFF_ERR;
}

/* I wrote a simple test, expanding a hash to 2GB, for the following
 * cases:
 * 1) Expanding all the buckets at once,
 * 2) Expanding the bucket we wanted to place the new entry into,
 * 3) Expanding the most-populated bucket.
 *
 * I measured the worst/average/best density during this process:
 * 1) 3%/16%/30%
 * 2) 4%/20%/38%
 * 3) 6%/22%/41%
 *
 * So (3) wins: for now, we expand the most-populated bucket.
 */
static unsigned fullest_bucket(struct tdb_context *tdb,
			       const tdb_off_t *group,
			       unsigned new_bucket)
{
	unsigned counts[1 << TDB_HASH_GROUP_BITS] = { 0 };
	unsigned int i, best_bucket;

	/* Count the new entry. */
	counts[new_bucket]++;
	best_bucket = new_bucket;

	for (i = 0; i < (1 << TDB_HASH_GROUP_BITS); i++) {
		unsigned this_bucket;

		if (is_subhash(group[i]))
			continue;
		this_bucket = group[i] & TDB_OFF_HASH_GROUP_MASK;
		if (++counts[this_bucket] > counts[best_bucket])
			best_bucket = this_bucket;
	}

	return best_bucket;
}

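/* Linear-probe from "bucket" for an empty slot; fails only if the whole
 * group is full. */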
static bool put_into_group(tdb_off_t *group,
			   unsigned bucket, tdb_off_t encoded)
{
	unsigned int i;

	for (i = 0; i < (1 << TDB_HASH_GROUP_BITS); i++) {
		unsigned b = (bucket + i) % (1 << TDB_HASH_GROUP_BITS);

		if (group[b] == 0) {
			group[b] = encoded;
			return true;
		}
	}
	return false;
}

static void force_into_group(tdb_off_t *group,
			     unsigned bucket, tdb_off_t encoded)
{
	if (!put_into_group(group, bucket, encoded))
		abort();
}

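/* A bucket entry is the record offset with the home bucket stashed in
 * the low (alignment) bits and the next few hash bits in the stolen top
 * bits; match() unpacks it again via TDB_OFF_HASH_GROUP_MASK,
 * TDB_OFF_MASK and bits_from(). */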
static tdb_off_t encode_offset(tdb_off_t new_off, struct hash_info *h)
{
	return h->home_bucket
		| new_off
		| ((uint64_t)bits_from(h->h,
				  64 - h->hash_used - TDB_OFF_UPPER_STEAL_EXTRA,
				  TDB_OFF_UPPER_STEAL_EXTRA)
		   << TDB_OFF_HASH_EXTRA_BIT);
}

/* Simply overwrite the hash entry we found before. */
int replace_in_hash(struct tdb_context *tdb,
		    struct hash_info *h,
		    tdb_off_t new_off)
{
	return tdb_write_off(tdb, hbucket_off(h->group_start, h->found_bucket),
			     encode_offset(new_off, h));
}

/* We slot in anywhere that's empty in the chain. */
static int COLD add_to_chain(struct tdb_context *tdb,
			     tdb_off_t subhash,
			     tdb_off_t new_off)
{
	size_t entry = tdb_find_zero_off(tdb, subhash, 1<<TDB_HASH_GROUP_BITS);

	if (entry == 1 << TDB_HASH_GROUP_BITS) {
		tdb_off_t next;

		next = tdb_read_off(tdb, subhash
				    + offsetof(struct tdb_chain, next));
		if (next == TDB_OFF_ERR)
			return -1;

		if (!next) {
			next = alloc(tdb, 0, sizeof(struct tdb_chain), 0,
				     TDB_CHAIN_MAGIC, false);
			if (next == TDB_OFF_ERR)
				return -1;
			if (zero_out(tdb, next+sizeof(struct tdb_used_record),
				     sizeof(struct tdb_chain)))
				return -1;
			if (tdb_write_off(tdb, subhash
					  + offsetof(struct tdb_chain, next),
					  next) != 0)
				return -1;
		}
		return add_to_chain(tdb, next, new_off);
	}

	return tdb_write_off(tdb, subhash + entry * sizeof(tdb_off_t),
			     new_off);
}

/* Add into a newly created subhash. */
static int add_to_subhash(struct tdb_context *tdb, tdb_off_t subhash,
			  unsigned hash_used, tdb_off_t val)
{
	tdb_off_t off = (val & TDB_OFF_MASK), *group;
	struct hash_info h;
	unsigned int gnum;

	h.hash_used = hash_used;

	if (hash_used + TDB_SUBLEVEL_HASH_BITS > 64)
		return add_to_chain(tdb, subhash, off);

	h.h = hash_record(tdb, off);
	gnum = use_bits(&h, TDB_SUBLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS);
	h.group_start = subhash
		+ gnum * (sizeof(tdb_off_t) << TDB_HASH_GROUP_BITS);
	h.home_bucket = use_bits(&h, TDB_HASH_GROUP_BITS);

	group = tdb_access_write(tdb, h.group_start,
				 sizeof(*group) << TDB_HASH_GROUP_BITS, true);
	if (!group)
		return -1;
	force_into_group(group, h.home_bucket, encode_offset(off, &h));
	return tdb_access_commit(tdb, group);
}

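/* The group is full: hang a new subhash (or a chain, once all 64 hash
 * bits are used) off its busiest bucket, move that bucket's entries down
 * into it, and re-home anything that wasn't in its home bucket. */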
static int expand_group(struct tdb_context *tdb, struct hash_info *h)
{
	unsigned bucket, num_vals, i, magic;
	size_t subsize;
	tdb_off_t subhash;
	tdb_off_t vals[1 << TDB_HASH_GROUP_BITS];

	/* Attach new empty subhash under fullest bucket. */
	bucket = fullest_bucket(tdb, h->group, h->home_bucket);

	if (h->hash_used == 64) {
		add_stat(tdb, alloc_chain, 1);
		subsize = sizeof(struct tdb_chain);
		magic = TDB_CHAIN_MAGIC;
	} else {
		add_stat(tdb, alloc_subhash, 1);
		subsize = (sizeof(tdb_off_t) << TDB_SUBLEVEL_HASH_BITS);
		magic = TDB_HTABLE_MAGIC;
	}

	subhash = alloc(tdb, 0, subsize, 0, magic, false);
	if (subhash == TDB_OFF_ERR)
		return -1;

	if (zero_out(tdb, subhash + sizeof(struct tdb_used_record), subsize))
		return -1;

	/* Remove any which are destined for bucket or are in wrong place. */
	num_vals = 0;
	for (i = 0; i < (1 << TDB_HASH_GROUP_BITS); i++) {
		unsigned home_bucket = h->group[i] & TDB_OFF_HASH_GROUP_MASK;
		if (!h->group[i] || is_subhash(h->group[i]))
			continue;
		if (home_bucket == bucket || home_bucket != i) {
			vals[num_vals++] = h->group[i];
			h->group[i] = 0;
		}
	}
	/* FIXME: This assert is valid, but we do this during unit test :( */
	/* assert(num_vals); */

	/* Overwrite expanded bucket with subhash pointer. */
	h->group[bucket] = subhash | (1ULL << TDB_OFF_UPPER_STEAL_SUBHASH_BIT);

	/* Point to actual contents of record. */
	subhash += sizeof(struct tdb_used_record);

	/* Put values back. */
	for (i = 0; i < num_vals; i++) {
		unsigned this_bucket = vals[i] & TDB_OFF_HASH_GROUP_MASK;

		if (this_bucket == bucket) {
			if (add_to_subhash(tdb, subhash, h->hash_used, vals[i]))
				return -1;
		} else {
			/* There should be room to put this back. */
			force_into_group(h->group, this_bucket, vals[i]);
		}
	}
	return 0;
}

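/* Deletion empties the bucket, then re-inserts any displaced entries in
 * the probe run after it, so a later linear probe never hits a hole
 * before the entry it is looking for. */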
int delete_from_hash(struct tdb_context *tdb, struct hash_info *h)
{
	unsigned int i, num_movers = 0;
	tdb_off_t movers[1 << TDB_HASH_GROUP_BITS];

	h->group[h->found_bucket] = 0;
	for (i = 1; i < (1 << TDB_HASH_GROUP_BITS); i++) {
		unsigned this_bucket;

		this_bucket = (h->found_bucket+i) % (1 << TDB_HASH_GROUP_BITS);
		/* Empty bucket?  We're done. */
		if (!h->group[this_bucket])
			break;

		/* Ignore subhashes. */
		if (is_subhash(h->group[this_bucket]))
			continue;

		/* If this one is not happy where it is, we'll move it. */
		if ((h->group[this_bucket] & TDB_OFF_HASH_GROUP_MASK)
		    != this_bucket) {
			movers[num_movers++] = h->group[this_bucket];
			h->group[this_bucket] = 0;
		}
	}

	/* Put back the ones we erased. */
	for (i = 0; i < num_movers; i++) {
		force_into_group(h->group, movers[i] & TDB_OFF_HASH_GROUP_MASK,
				 movers[i]);
	}

	/* Now we write back the hash group. */
	return tdb_write_convert(tdb, h->group_start,
				 h->group, sizeof(h->group));
}

int add_to_hash(struct tdb_context *tdb, struct hash_info *h, tdb_off_t new_off)
{
	/* We hit an empty bucket during search?  That's where it goes. */
	if (!h->group[h->found_bucket]) {
		h->group[h->found_bucket] = encode_offset(new_off, h);
		/* Write back the modified group. */
		return tdb_write_convert(tdb, h->group_start,
					 h->group, sizeof(h->group));
	}

	if (h->hash_used > 64)
		return add_to_chain(tdb, h->group_start, new_off);

	/* We're full.  Expand. */
	if (expand_group(tdb, h) == -1)
		return -1;

	if (is_subhash(h->group[h->home_bucket])) {
		/* We were expanded! */
		tdb_off_t hashtable;
		unsigned int gnum;

		/* Write back the modified group. */
		if (tdb_write_convert(tdb, h->group_start, h->group,
				      sizeof(h->group)))
			return -1;

		/* Move hashinfo down a level. */
		hashtable = (h->group[h->home_bucket] & TDB_OFF_MASK)
			+ sizeof(struct tdb_used_record);
		gnum = use_bits(h, TDB_SUBLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS);
		h->home_bucket = use_bits(h, TDB_HASH_GROUP_BITS);
		h->group_start = hashtable
			+ gnum * (sizeof(tdb_off_t) << TDB_HASH_GROUP_BITS);
		if (tdb_read_convert(tdb, h->group_start, &h->group,
				     sizeof(h->group)) == -1)
			return -1;
	}

	/* Expanding the group must have made room if it didn't choose this
	 * bucket. */
	if (put_into_group(h->group, h->home_bucket, encode_offset(new_off, h)))
		return tdb_write_convert(tdb, h->group_start,
					 h->group, sizeof(h->group));

	/* This can happen if all the hashes in the group (and ours) dropped
	 * into the same group in the subhash. */
	return add_to_hash(tdb, h, new_off);
}

/* Traverse support: returns offset of record, or 0 or TDB_OFF_ERR. */
static tdb_off_t iterate_hash(struct tdb_context *tdb,
			      struct traverse_info *tinfo)
{
	tdb_off_t off, val;
	unsigned int i;
	struct traverse_level *tlevel;

	tlevel = &tinfo->levels[tinfo->num_levels-1];

again:
	for (i = tdb_find_nonzero_off(tdb, tlevel->hashtable,
				      tlevel->entry, tlevel->total_buckets);
	     i != tlevel->total_buckets;
	     i = tdb_find_nonzero_off(tdb, tlevel->hashtable,
				      i+1, tlevel->total_buckets)) {
		val = tdb_read_off(tdb, tlevel->hashtable+sizeof(tdb_off_t)*i);
		if (unlikely(val == TDB_OFF_ERR))
			return TDB_OFF_ERR;

		off = val & TDB_OFF_MASK;

		/* This makes the delete-all-in-traverse case work
		 * (and simplifies our logic a little). */
		if (off == tinfo->prev)
			continue;

		tlevel->entry = i;

		if (!is_subhash(val)) {
			/* Found one. */
			tinfo->prev = off;
			return off;
		}

		/* When we come back, we want the next one */
		tlevel->entry++;
		tinfo->num_levels++;
		tlevel++;
		tlevel->hashtable = off + sizeof(struct tdb_used_record);
		tlevel->entry = 0;
		/* Next level is a chain? */
		if (unlikely(tinfo->num_levels == TDB_MAX_LEVELS + 1))
			tlevel->total_buckets = (1 << TDB_HASH_GROUP_BITS);
		else
			tlevel->total_buckets = (1 << TDB_SUBLEVEL_HASH_BITS);
		goto again;
	}

	/* Nothing there? */
	if (tinfo->num_levels == 1)
		return 0;

	/* Handle chained entries. */
	if (unlikely(tinfo->num_levels == TDB_MAX_LEVELS + 1)) {
		tlevel->hashtable = tdb_read_off(tdb, tlevel->hashtable
						 + offsetof(struct tdb_chain,
							    next));
		if (tlevel->hashtable == TDB_OFF_ERR)
			return TDB_OFF_ERR;
		if (tlevel->hashtable) {
			tlevel->hashtable += sizeof(struct tdb_used_record);
			tlevel->entry = 0;
			goto again;
		}
	}

	/* Go back up and keep searching. */
	tinfo->num_levels--;
	tlevel--;
	goto again;
}

/* Return 1 if we find something, 0 if not, -1 on error. */
int next_in_hash(struct tdb_context *tdb,
		 struct traverse_info *tinfo,
		 TDB_DATA *kbuf, size_t *dlen)
{
	const unsigned group_bits = TDB_TOPLEVEL_HASH_BITS-TDB_HASH_GROUP_BITS;
	tdb_off_t hl_start, hl_range, off;
	enum TDB_ERROR ecode;

	while (tinfo->toplevel_group < (1 << group_bits)) {
		hl_start = (tdb_off_t)tinfo->toplevel_group
			<< (64 - group_bits);
		hl_range = 1ULL << group_bits;
		ecode = tdb_lock_hashes(tdb, hl_start, hl_range, F_RDLCK,
					TDB_LOCK_WAIT);
		if (ecode != TDB_SUCCESS) {
			tdb->ecode = ecode;
			return -1;
		}

		off = iterate_hash(tdb, tinfo);
		if (off) {
			struct tdb_used_record rec;

			if (tdb_read_convert(tdb, off, &rec, sizeof(rec))) {
				tdb_unlock_hashes(tdb,
						  hl_start, hl_range, F_RDLCK);
				return -1;
			}
			if (rec_magic(&rec) != TDB_USED_MAGIC) {
				tdb_logerr(tdb, TDB_ERR_CORRUPT,
					   TDB_LOG_ERROR,
					   "next_in_hash:"
					   " corrupt record at %llu",
					   (long long)off);
				/* Don't leak the lock on this error path. */
				tdb_unlock_hashes(tdb,
						  hl_start, hl_range, F_RDLCK);
				return -1;
			}

			kbuf->dsize = rec_key_length(&rec);

			/* They want data as well? */
			if (dlen) {
				*dlen = rec_data_length(&rec);
				kbuf->dptr = tdb_alloc_read(tdb,
							    off + sizeof(rec),
							    kbuf->dsize
							    + *dlen);
			} else {
				kbuf->dptr = tdb_alloc_read(tdb,
							    off + sizeof(rec),
							    kbuf->dsize);
			}
			tdb_unlock_hashes(tdb, hl_start, hl_range, F_RDLCK);
			return kbuf->dptr ? 1 : -1;
		}

		tdb_unlock_hashes(tdb, hl_start, hl_range, F_RDLCK);

		tinfo->toplevel_group++;
		tinfo->levels[0].hashtable
			+= (sizeof(tdb_off_t) << TDB_HASH_GROUP_BITS);
		tinfo->levels[0].entry = 0;
	}
	return 0;
}

/* Return 1 if we find something, 0 if not, -1 on error. */
int first_in_hash(struct tdb_context *tdb,
		  struct traverse_info *tinfo,
		  TDB_DATA *kbuf, size_t *dlen)
{
	tinfo->prev = 0;
	tinfo->toplevel_group = 0;
	tinfo->num_levels = 1;
	tinfo->levels[0].hashtable = offsetof(struct tdb_header, hashtable);
	tinfo->levels[0].entry = 0;
	tinfo->levels[0].total_buckets = (1 << TDB_HASH_GROUP_BITS);

	return next_in_hash(tdb, tinfo, kbuf, dlen);
}
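
/*
 * A traversal sketch (the real loops are in traverse.c; the key buffer
 * comes from tdb_alloc_read(), so the caller frees it):
 *
 *	struct traverse_info tinfo;
 *	TDB_DATA key;
 *	int ret;
 *
 *	for (ret = first_in_hash(tdb, &tinfo, &key, NULL);
 *	     ret == 1;
 *	     ret = next_in_hash(tdb, &tinfo, &key, NULL))
 *		free(key.dptr);
 *	(ret == 0 means the traverse finished, -1 means error.)
 */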

/* Even if the entry isn't in this hash bucket, you'd have to lock this
 * bucket to find it. */
static int chainlock(struct tdb_context *tdb, const TDB_DATA *key,
		     int ltype, enum tdb_lock_flags waitflag,
		     const char *func)
{
	enum TDB_ERROR ecode;
	uint64_t h = tdb_hash(tdb, key->dptr, key->dsize);
	tdb_off_t lockstart, locksize;
	unsigned int group, gbits;

	gbits = TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS;
	group = bits_from(h, 64 - gbits, gbits);

	lockstart = hlock_range(group, &locksize);

	ecode = tdb_lock_hashes(tdb, lockstart, locksize, ltype, waitflag);
	tdb_trace_1rec(tdb, func, *key);
	if (ecode != TDB_SUCCESS) {
		tdb->ecode = ecode;
		return -1;
	}
	return 0;
}

/* Lock/unlock one hash chain.  This is meant to be used to reduce
 * contention; it cannot guarantee how many records will be locked. */
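/* Usage sketch (error handling elided):
 *
 *	tdb_chainlock(tdb, key);
 *	...read-modify-write the record for key...
 *	tdb_chainunlock(tdb, key);
 */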
int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
{
	return chainlock(tdb, &key, F_WRLCK, TDB_LOCK_WAIT, "tdb_chainlock");
}

int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
{
	uint64_t h = tdb_hash(tdb, key.dptr, key.dsize);
	tdb_off_t lockstart, locksize;
	unsigned int group, gbits;
	enum TDB_ERROR ecode;

	gbits = TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS;
	group = bits_from(h, 64 - gbits, gbits);

	lockstart = hlock_range(group, &locksize);

	tdb_trace_1rec(tdb, "tdb_chainunlock", key);
	ecode = tdb_unlock_hashes(tdb, lockstart, locksize, F_WRLCK);
	if (ecode != TDB_SUCCESS) {
		tdb->ecode = ecode;
		return -1;
	}
	return 0;
}