]> git.ozlabs.org Git - ccan/blob - ccan/tdb2/hash.c
tdb2: add error conversion functions.
[ccan] / ccan / tdb2 / hash.c
1  /*
2    Trivial Database 2: hash handling
3    Copyright (C) Rusty Russell 2010
4
5    This library is free software; you can redistribute it and/or
6    modify it under the terms of the GNU Lesser General Public
7    License as published by the Free Software Foundation; either
8    version 3 of the License, or (at your option) any later version.
9
10    This library is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    Lesser General Public License for more details.
14
15    You should have received a copy of the GNU Lesser General Public
16    License along with this library; if not, see <http://www.gnu.org/licenses/>.
17 */
18 #include "private.h"
19 #include <ccan/hash/hash.h>
20 #include <assert.h>
21
/* Default hash function. */
uint64_t tdb_jenkins_hash(const void *key, size_t length, uint64_t seed,
			  void *unused)
{
	uint64_t h = hash64_stable((const unsigned char *)key, length, seed);

	/* hash64_stable puts its slightly-better bits at the bottom, but
	 * tdb2 consumes hash bits from the top down; rotate by 32 so the
	 * stronger half is used first. */
	return (h << 32) | (h >> 32);
}
32
/* Hash a key: delegates to the context's configured hash callback,
 * passing along the per-context seed and opaque user data. */
uint64_t tdb_hash(struct tdb_context *tdb, const void *ptr, size_t len)
{
	return tdb->hash_fn(ptr, len, tdb->hash_seed, tdb->hash_data);
}
37
38 uint64_t hash_record(struct tdb_context *tdb, tdb_off_t off)
39 {
40         const struct tdb_used_record *r;
41         const void *key;
42         uint64_t klen, hash;
43
44         r = tdb_access_read(tdb, off, sizeof(*r), true);
45         if (TDB_PTR_IS_ERR(r)) {
46                 /* FIXME */
47                 return 0;
48         }
49
50         klen = rec_key_length(r);
51         tdb_access_release(tdb, r);
52
53         key = tdb_access_read(tdb, off + sizeof(*r), klen, false);
54         if (TDB_PTR_IS_ERR(key)) {
55                 return 0;
56         }
57
58         hash = tdb_hash(tdb, key, klen);
59         tdb_access_release(tdb, key);
60         return hash;
61 }
62
/* Get "num" bits from "val", starting at bit "start" (num in [0, 32]). */
static uint32_t bits_from(uint64_t val, unsigned start, unsigned num)
{
	assert(num <= 32);

	/* The original mask ((1U << num) - 1) is undefined behavior when
	 * num == 32: shifting a 32-bit unsigned by its full width is UB in
	 * C.  Build the mask in 64-bit arithmetic so every permitted num
	 * is well-defined. */
	return (uint32_t)((val >> start) & (((uint64_t)1 << num) - 1));
}
69
70 /* We take bits from the top: that way we can lock whole sections of the hash
71  * by using lock ranges. */
72 static uint32_t use_bits(struct hash_info *h, unsigned num)
73 {
74         h->hash_used += num;
75         return bits_from(h->h, 64 - h->hash_used, num);
76 }
77
78 static tdb_bool_err key_matches(struct tdb_context *tdb,
79                                 const struct tdb_used_record *rec,
80                                 tdb_off_t off,
81                                 const struct tdb_data *key)
82 {
83         tdb_bool_err ret = false;
84         const char *rkey;
85
86         if (rec_key_length(rec) != key->dsize) {
87                 tdb->stats.compare_wrong_keylen++;
88                 return ret;
89         }
90
91         rkey = tdb_access_read(tdb, off + sizeof(*rec), key->dsize, false);
92         if (TDB_PTR_IS_ERR(rkey)) {
93                 return (tdb_bool_err)TDB_PTR_ERR(rkey);
94         }
95         if (memcmp(rkey, key->dptr, key->dsize) == 0)
96                 ret = true;
97         else
98                 tdb->stats.compare_wrong_keycmp++;
99         tdb_access_release(tdb, rkey);
100         return ret;
101 }
102
/* Does entry match?  Checks the hash entry "val" against *key, using
 * progressively more expensive tests.  Returns true/false, or a negative
 * error (as tdb_bool_err) on read failure.  On any non-error return past
 * the first two filters, *rec holds the record header from disk. */
static tdb_bool_err match(struct tdb_context *tdb,
			  struct hash_info *h,
			  const struct tdb_data *key,
			  tdb_off_t val,
			  struct tdb_used_record *rec)
{
	tdb_off_t off;
	enum TDB_ERROR ecode;

	tdb->stats.compares++;
	/* Desired bucket must match. */
	if (h->home_bucket != (val & TDB_OFF_HASH_GROUP_MASK)) {
		tdb->stats.compare_wrong_bucket++;
		return false;
	}

	/* Top bits of offset == next bits of hash. */
	if (bits_from(val, TDB_OFF_HASH_EXTRA_BIT, TDB_OFF_UPPER_STEAL_EXTRA)
	    != bits_from(h->h, 64 - h->hash_used - TDB_OFF_UPPER_STEAL_EXTRA,
		    TDB_OFF_UPPER_STEAL_EXTRA)) {
		tdb->stats.compare_wrong_offsetbits++;
		return false;
	}

	/* Filters exhausted: read the record header from disk. */
	off = val & TDB_OFF_MASK;
	ecode = tdb_read_convert(tdb, off, rec, sizeof(*rec));
	if (ecode != TDB_SUCCESS) {
		return (tdb_bool_err)ecode;
	}

	/* The record header stores low hash bits; NOTE(review): the magic
	 * 11 presumably matches rec_hash()'s field width — confirm against
	 * the on-disk layout and consider a named constant. */
	if ((h->h & ((1 << 11)-1)) != rec_hash(rec)) {
		tdb->stats.compare_wrong_rechash++;
		return false;
	}

	/* Last resort: byte-compare the actual keys. */
	return key_matches(tdb, rec, off, key);
}
141
142 static tdb_off_t hbucket_off(tdb_off_t group_start, unsigned bucket)
143 {
144         return group_start
145                 + (bucket % (1 << TDB_HASH_GROUP_BITS)) * sizeof(tdb_off_t);
146 }
147
148 bool is_subhash(tdb_off_t val)
149 {
150         return (val >> TDB_OFF_UPPER_STEAL_SUBHASH_BIT) & 1;
151 }
152
153 /* FIXME: Guess the depth, don't over-lock! */
154 static tdb_off_t hlock_range(tdb_off_t group, tdb_off_t *size)
155 {
156         *size = 1ULL << (64 - (TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS));
157         return group << (64 - (TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS));
158 }
159
/* Linear search of the overflow chains used once all 64 hash bits are
 * consumed.  Returns the record offset if key is found (filling in *rec
 * and the optional *tinfo level), 0 if not found (with h->home_bucket /
 * h->found_bucket left pointing at an empty slot if one was seen), or a
 * negative error offset. */
static tdb_off_t COLD find_in_chain(struct tdb_context *tdb,
				    struct tdb_data key,
				    tdb_off_t chain,
				    struct hash_info *h,
				    struct tdb_used_record *rec,
				    struct traverse_info *tinfo)
{
	tdb_off_t off, next;
	enum TDB_ERROR ecode;

	/* In case nothing is free, we set these to zero. */
	h->home_bucket = h->found_bucket = 0;

	for (off = chain; off; off = next) {
		unsigned int i;

		h->group_start = off;
		ecode = tdb_read_convert(tdb, off, h->group, sizeof(h->group));
		if (ecode != TDB_SUCCESS) {
			return TDB_ERR_TO_OFF(ecode);
		}

		for (i = 0; i < (1 << TDB_HASH_GROUP_BITS); i++) {
			tdb_off_t recoff;
			if (!h->group[i]) {
				/* Remember this empty bucket. */
				h->home_bucket = h->found_bucket = i;
				continue;
			}

			/* We can insert extra bits via add_to_hash
			 * empty bucket logic. */
			recoff = h->group[i] & TDB_OFF_MASK;
			ecode = tdb_read_convert(tdb, recoff, rec,
						 sizeof(*rec));
			if (ecode != TDB_SUCCESS) {
				return TDB_ERR_TO_OFF(ecode);
			}

			/* key_matches yields a tdb_bool_err (<0 error,
			 * 0 miss, 1 hit); it is round-tripped through the
			 * OFF/ERR conversion helpers here, hence the odd
			 * comparison against (enum TDB_ERROR)1 below. */
			ecode = TDB_OFF_TO_ERR(key_matches(tdb, rec, recoff,
							   &key));
			if (ecode < 0) {
				return TDB_ERR_TO_OFF(ecode);
			}
			if (ecode == (enum TDB_ERROR)1) {
				h->home_bucket = h->found_bucket = i;

				if (tinfo) {
					tinfo->levels[tinfo->num_levels]
						.hashtable = off;
					tinfo->levels[tinfo->num_levels]
						.total_buckets
						= 1 << TDB_HASH_GROUP_BITS;
					tinfo->levels[tinfo->num_levels].entry
						= i;
					tinfo->num_levels++;
				}
				return recoff;
			}
		}
		/* Follow the chain: the stored next (if any) points at a
		 * record whose contents begin past the header. */
		next = tdb_read_off(tdb, off
				    + offsetof(struct tdb_chain, next));
		if (TDB_OFF_IS_ERR(next)) {
			return next;
		}
		if (next)
			next += sizeof(struct tdb_used_record);
	}
	return 0;
}
230
/* This is the core routine which searches the hashtable for an entry.
 * On error, no locks are held and -ve is returned.
 * Otherwise, hinfo is filled in (and the optional tinfo).
 * If not found, the return value is 0.
 * If found, the return value is the offset, and *rec is the record. */
tdb_off_t find_and_lock(struct tdb_context *tdb,
			struct tdb_data key,
			int ltype,
			struct hash_info *h,
			struct tdb_used_record *rec,
			struct traverse_info *tinfo)
{
	uint32_t i, group;
	tdb_off_t hashtable;
	enum TDB_ERROR ecode;

	/* Consume the top hash bits: toplevel group index, then bucket. */
	h->h = tdb_hash(tdb, key.dptr, key.dsize);
	h->hash_used = 0;
	group = use_bits(h, TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS);
	h->home_bucket = use_bits(h, TDB_HASH_GROUP_BITS);

	/* Lock the whole hash range this key can fall into before reading. */
	h->hlock_start = hlock_range(group, &h->hlock_range);
	ecode = tdb_lock_hashes(tdb, h->hlock_start, h->hlock_range, ltype,
				TDB_LOCK_WAIT);
	if (ecode != TDB_SUCCESS) {
		return TDB_ERR_TO_OFF(ecode);
	}

	hashtable = offsetof(struct tdb_header, hashtable);
	if (tinfo) {
		/* Seed the traverse info with the toplevel position. */
		tinfo->toplevel_group = group;
		tinfo->num_levels = 1;
		tinfo->levels[0].entry = 0;
		tinfo->levels[0].hashtable = hashtable
			+ (group << TDB_HASH_GROUP_BITS) * sizeof(tdb_off_t);
		tinfo->levels[0].total_buckets = 1 << TDB_HASH_GROUP_BITS;
	}

	/* Descend through subhash levels until the hash bits run out. */
	while (h->hash_used <= 64) {
		/* Read in the hash group. */
		h->group_start = hashtable
			+ group * (sizeof(tdb_off_t) << TDB_HASH_GROUP_BITS);

		ecode = tdb_read_convert(tdb, h->group_start, &h->group,
					 sizeof(h->group));
		if (ecode != TDB_SUCCESS) {
			goto fail;
		}

		/* Pointer to another hash table?  Go down... */
		if (is_subhash(h->group[h->home_bucket])) {
			hashtable = (h->group[h->home_bucket] & TDB_OFF_MASK)
				+ sizeof(struct tdb_used_record);
			if (tinfo) {
				/* When we come back, use *next* bucket */
				tinfo->levels[tinfo->num_levels-1].entry
					+= h->home_bucket + 1;
			}
			/* Consume the next slice of hash bits for this level. */
			group = use_bits(h, TDB_SUBLEVEL_HASH_BITS
					 - TDB_HASH_GROUP_BITS);
			h->home_bucket = use_bits(h, TDB_HASH_GROUP_BITS);
			if (tinfo) {
				tinfo->levels[tinfo->num_levels].hashtable
					= hashtable;
				tinfo->levels[tinfo->num_levels].total_buckets
					= 1 << TDB_SUBLEVEL_HASH_BITS;
				tinfo->levels[tinfo->num_levels].entry
					= group << TDB_HASH_GROUP_BITS;
				tinfo->num_levels++;
			}
			continue;
		}

		/* It's in this group: search (until 0 or all searched) */
		for (i = 0, h->found_bucket = h->home_bucket;
		     i < (1 << TDB_HASH_GROUP_BITS);
		     i++, h->found_bucket = ((h->found_bucket+1)
					     % (1 << TDB_HASH_GROUP_BITS))) {
			tdb_bool_err berr;
			if (is_subhash(h->group[h->found_bucket]))
				continue;

			/* Empty slot terminates the open-addressing probe. */
			if (!h->group[h->found_bucket])
				break;

			berr = match(tdb, h, &key, h->group[h->found_bucket],
				     rec);
			if (berr < 0) {
				ecode = TDB_OFF_TO_ERR(berr);
				goto fail;
			}
			if (berr) {
				if (tinfo) {
					tinfo->levels[tinfo->num_levels-1].entry
						+= h->found_bucket;
				}
				return h->group[h->found_bucket] & TDB_OFF_MASK;
			}
		}
		/* Didn't find it: h indicates where it would go. */
		return 0;
	}

	/* All 64 hash bits used: fall back to the linear chains. */
	return find_in_chain(tdb, key, hashtable, h, rec, tinfo);

fail:
	tdb_unlock_hashes(tdb, h->hlock_start, h->hlock_range, ltype);
	return TDB_ERR_TO_OFF(ecode);
}
340
341 /* I wrote a simple test, expanding a hash to 2GB, for the following
342  * cases:
343  * 1) Expanding all the buckets at once,
344  * 2) Expanding the bucket we wanted to place the new entry into.
345  * 3) Expanding the most-populated bucket,
346  *
347  * I measured the worst/average/best density during this process.
348  * 1) 3%/16%/30%
349  * 2) 4%/20%/38%
350  * 3) 6%/22%/41%
351  *
352  * So we figure out the busiest bucket for the moment.
353  */
354 static unsigned fullest_bucket(struct tdb_context *tdb,
355                                const tdb_off_t *group,
356                                unsigned new_bucket)
357 {
358         unsigned counts[1 << TDB_HASH_GROUP_BITS] = { 0 };
359         unsigned int i, best_bucket;
360
361         /* Count the new entry. */
362         counts[new_bucket]++;
363         best_bucket = new_bucket;
364
365         for (i = 0; i < (1 << TDB_HASH_GROUP_BITS); i++) {
366                 unsigned this_bucket;
367
368                 if (is_subhash(group[i]))
369                         continue;
370                 this_bucket = group[i] & TDB_OFF_HASH_GROUP_MASK;
371                 if (++counts[this_bucket] > counts[best_bucket])
372                         best_bucket = this_bucket;
373         }
374
375         return best_bucket;
376 }
377
378 static bool put_into_group(tdb_off_t *group,
379                            unsigned bucket, tdb_off_t encoded)
380 {
381         unsigned int i;
382
383         for (i = 0; i < (1 << TDB_HASH_GROUP_BITS); i++) {
384                 unsigned b = (bucket + i) % (1 << TDB_HASH_GROUP_BITS);
385
386                 if (group[b] == 0) {
387                         group[b] = encoded;
388                         return true;
389                 }
390         }
391         return false;
392 }
393
394 static void force_into_group(tdb_off_t *group,
395                              unsigned bucket, tdb_off_t encoded)
396 {
397         if (!put_into_group(group, bucket, encoded))
398                 abort();
399 }
400
401 static tdb_off_t encode_offset(tdb_off_t new_off, struct hash_info *h)
402 {
403         return h->home_bucket
404                 | new_off
405                 | ((uint64_t)bits_from(h->h,
406                                   64 - h->hash_used - TDB_OFF_UPPER_STEAL_EXTRA,
407                                   TDB_OFF_UPPER_STEAL_EXTRA)
408                    << TDB_OFF_HASH_EXTRA_BIT);
409 }
410
411 /* Simply overwrite the hash entry we found before. */
412 enum TDB_ERROR replace_in_hash(struct tdb_context *tdb,
413                                struct hash_info *h,
414                                tdb_off_t new_off)
415 {
416         return tdb_write_off(tdb, hbucket_off(h->group_start, h->found_bucket),
417                              encode_offset(new_off, h));
418 }
419
/* We slot in anywhere that's empty in the chain.  Recurses (tail-style)
 * into the next chain link when this one is full, allocating a fresh,
 * zeroed link on demand. */
static enum TDB_ERROR COLD add_to_chain(struct tdb_context *tdb,
					tdb_off_t subhash,
					tdb_off_t new_off)
{
	tdb_off_t entry;
	enum TDB_ERROR ecode;

	/* Index of the first empty slot in this link, or one-past-end. */
	entry = tdb_find_zero_off(tdb, subhash, 1<<TDB_HASH_GROUP_BITS);
	if (TDB_OFF_IS_ERR(entry)) {
		return TDB_OFF_TO_ERR(entry);
	}

	if (entry == 1 << TDB_HASH_GROUP_BITS) {
		/* This link is full: follow (or create) the next one. */
		tdb_off_t next;

		next = tdb_read_off(tdb, subhash
				    + offsetof(struct tdb_chain, next));
		if (TDB_OFF_IS_ERR(next)) {
			return TDB_OFF_TO_ERR(next);
		}

		if (!next) {
			/* Allocate and zero a fresh chain link. */
			next = alloc(tdb, 0, sizeof(struct tdb_chain), 0,
				     TDB_CHAIN_MAGIC, false);
			if (TDB_OFF_IS_ERR(next))
				return TDB_OFF_TO_ERR(next);
			ecode = zero_out(tdb,
					 next+sizeof(struct tdb_used_record),
					 sizeof(struct tdb_chain));
			if (ecode != TDB_SUCCESS) {
				return ecode;
			}
			ecode = tdb_write_off(tdb, subhash
					      + offsetof(struct tdb_chain,
							 next),
					      next);
			if (ecode != TDB_SUCCESS) {
				return ecode;
			}
		}
		/* NOTE(review): "next" is passed raw here, while
		 * find_in_chain adds sizeof(struct tdb_used_record) before
		 * following the same stored pointer — verify which
		 * convention chain->next uses on disk. */
		return add_to_chain(tdb, next, new_off);
	}

	return tdb_write_off(tdb, subhash + entry * sizeof(tdb_off_t),
			     new_off);
}
467
/* Add into a newly created subhash: place the existing entry "val"
 * (displaced from the parent group) at the level below, re-deriving its
 * position from a freshly computed full hash.  "hash_used" is the number
 * of hash bits already consumed by the levels above. */
static enum TDB_ERROR add_to_subhash(struct tdb_context *tdb, tdb_off_t subhash,
				     unsigned hash_used, tdb_off_t val)
{
	tdb_off_t off = (val & TDB_OFF_MASK), *group;
	struct hash_info h;
	unsigned int gnum;

	h.hash_used = hash_used;

	/* Not enough hash bits left for another level: use a chain. */
	if (hash_used + TDB_SUBLEVEL_HASH_BITS > 64)
		return add_to_chain(tdb, subhash, off);

	/* Re-hash the record: the few extra bits stolen into val are not
	 * enough to position it at this new level. */
	h.h = hash_record(tdb, off);
	gnum = use_bits(&h, TDB_SUBLEVEL_HASH_BITS-TDB_HASH_GROUP_BITS);
	h.group_start = subhash
		+ gnum * (sizeof(tdb_off_t) << TDB_HASH_GROUP_BITS);
	h.home_bucket = use_bits(&h, TDB_HASH_GROUP_BITS);

	/* Map the target group and slot the entry in; the subhash was just
	 * created, so there must be room (force_into_group aborts if not). */
	group = tdb_access_write(tdb, h.group_start,
				 sizeof(*group) << TDB_HASH_GROUP_BITS, true);
	if (TDB_PTR_IS_ERR(group)) {
		return TDB_PTR_ERR(group);
	}
	force_into_group(group, h.home_bucket, encode_offset(off, &h));
	return tdb_access_commit(tdb, group);
}
495
/* Split the full group *h: hang a new subhash (or chain, when all 64 hash
 * bits are already used) under the group's busiest bucket, then re-insert
 * every entry that was destined for that bucket or sitting in the wrong
 * slot.  Modifies h->group in memory; the caller writes it back. */
static enum TDB_ERROR expand_group(struct tdb_context *tdb, struct hash_info *h)
{
	unsigned bucket, num_vals, i, magic;
	size_t subsize;
	tdb_off_t subhash;
	tdb_off_t vals[1 << TDB_HASH_GROUP_BITS];
	enum TDB_ERROR ecode;

	/* Attach new empty subhash under fullest bucket. */
	bucket = fullest_bucket(tdb, h->group, h->home_bucket);

	if (h->hash_used == 64) {
		/* No hash bits remain: next level must be a linear chain. */
		tdb->stats.alloc_chain++;
		subsize = sizeof(struct tdb_chain);
		magic = TDB_CHAIN_MAGIC;
	} else {
		tdb->stats.alloc_subhash++;
		subsize = (sizeof(tdb_off_t) << TDB_SUBLEVEL_HASH_BITS);
		magic = TDB_HTABLE_MAGIC;
	}

	subhash = alloc(tdb, 0, subsize, 0, magic, false);
	if (TDB_OFF_IS_ERR(subhash)) {
		return TDB_OFF_TO_ERR(subhash);
	}

	/* New table/chain starts empty. */
	ecode = zero_out(tdb, subhash + sizeof(struct tdb_used_record),
			 subsize);
	if (ecode != TDB_SUCCESS) {
		return ecode;
	}

	/* Remove any which are destined for bucket or are in wrong place. */
	num_vals = 0;
	for (i = 0; i < (1 << TDB_HASH_GROUP_BITS); i++) {
		unsigned home_bucket = h->group[i] & TDB_OFF_HASH_GROUP_MASK;
		if (!h->group[i] || is_subhash(h->group[i]))
			continue;
		if (home_bucket == bucket || home_bucket != i) {
			vals[num_vals++] = h->group[i];
			h->group[i] = 0;
		}
	}
	/* FIXME: This assert is valid, but we do this during unit test :( */
	/* assert(num_vals); */

	/* Overwrite expanded bucket with subhash pointer. */
	h->group[bucket] = subhash | (1ULL << TDB_OFF_UPPER_STEAL_SUBHASH_BIT);

	/* Point to actual contents of record. */
	subhash += sizeof(struct tdb_used_record);

	/* Put values back. */
	for (i = 0; i < num_vals; i++) {
		unsigned this_bucket = vals[i] & TDB_OFF_HASH_GROUP_MASK;

		if (this_bucket == bucket) {
			/* Destined for the expanded bucket: goes down a level. */
			ecode = add_to_subhash(tdb, subhash, h->hash_used,
					       vals[i]);
			if (ecode != TDB_SUCCESS)
				return ecode;
		} else {
			/* There should be room to put this back. */
			force_into_group(h->group, this_bucket, vals[i]);
		}
	}
	return TDB_SUCCESS;
}
564
/* Delete the entry at h->found_bucket, then re-home any open-addressed
 * entries that had probed past it so future lookups (which stop at the
 * first empty slot) still find them.  Writes the modified group back. */
enum TDB_ERROR delete_from_hash(struct tdb_context *tdb, struct hash_info *h)
{
	unsigned int i, num_movers = 0;
	tdb_off_t movers[1 << TDB_HASH_GROUP_BITS];

	h->group[h->found_bucket] = 0;
	for (i = 1; i < (1 << TDB_HASH_GROUP_BITS); i++) {
		unsigned this_bucket;

		this_bucket = (h->found_bucket+i) % (1 << TDB_HASH_GROUP_BITS);
		/* Empty bucket?  We're done. */
		if (!h->group[this_bucket])
			break;

		/* Ignore subhashes. */
		if (is_subhash(h->group[this_bucket]))
			continue;

		/* If this one is not happy where it is, we'll move it. */
		if ((h->group[this_bucket] & TDB_OFF_HASH_GROUP_MASK)
		    != this_bucket) {
			movers[num_movers++] = h->group[this_bucket];
			h->group[this_bucket] = 0;
		}
	}

	/* Put back the ones we erased. */
	for (i = 0; i < num_movers; i++) {
		force_into_group(h->group, movers[i] & TDB_OFF_HASH_GROUP_MASK,
				 movers[i]);
	}

	/* Now we write back the hash group */
	return tdb_write_convert(tdb, h->group_start,
				 h->group, sizeof(h->group));
}
601
/* Insert new_off into the group described by *h (as filled in by
 * find_and_lock).  May expand the group into a subhash — moving *h down a
 * level — and recurses in the rare case where every entry (including this
 * one) fell into the same bucket of the new subhash. */
enum TDB_ERROR add_to_hash(struct tdb_context *tdb, struct hash_info *h,
			   tdb_off_t new_off)
{
	enum TDB_ERROR ecode;

	/* We hit an empty bucket during search?  That's where it goes. */
	if (!h->group[h->found_bucket]) {
		h->group[h->found_bucket] = encode_offset(new_off, h);
		/* Write back the modified group. */
		return tdb_write_convert(tdb, h->group_start,
					 h->group, sizeof(h->group));
	}

	/* Hash bits exhausted: this "group" is really a chain link. */
	if (h->hash_used > 64)
		return add_to_chain(tdb, h->group_start, new_off);

	/* We're full.  Expand. */
	ecode = expand_group(tdb, h);
	if (ecode != TDB_SUCCESS) {
		return ecode;
	}

	if (is_subhash(h->group[h->home_bucket])) {
		/* We were expanded! */
		tdb_off_t hashtable;
		unsigned int gnum;

		/* Write back the modified group. */
		ecode = tdb_write_convert(tdb, h->group_start, h->group,
					  sizeof(h->group));
		if (ecode != TDB_SUCCESS) {
			return ecode;
		}

		/* Move hashinfo down a level. */
		hashtable = (h->group[h->home_bucket] & TDB_OFF_MASK)
			+ sizeof(struct tdb_used_record);
		gnum = use_bits(h,TDB_SUBLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS);
		h->home_bucket = use_bits(h, TDB_HASH_GROUP_BITS);
		h->group_start = hashtable
			+ gnum * (sizeof(tdb_off_t) << TDB_HASH_GROUP_BITS);
		ecode = tdb_read_convert(tdb, h->group_start, &h->group,
					 sizeof(h->group));
		if (ecode != TDB_SUCCESS) {
			return ecode;
		}
	}

	/* Expanding the group must have made room if it didn't choose this
	 * bucket. */
	if (put_into_group(h->group, h->home_bucket, encode_offset(new_off,h))){
		return tdb_write_convert(tdb, h->group_start,
					 h->group, sizeof(h->group));
	}

	/* This can happen if all hashes in group (and us) dropped into same
	 * group in subhash. */
	return add_to_hash(tdb, h, new_off);
}
661
/* Traverse support: returns offset of record, or 0 or -ve error.
 * Resumes from the position recorded in *tinfo, descending into subhashes
 * and chains, and climbing back up when a level is exhausted.  On success
 * tinfo->prev is updated so deletion during traversal is safe. */
static tdb_off_t iterate_hash(struct tdb_context *tdb,
			      struct traverse_info *tinfo)
{
	tdb_off_t off, val, i;
	struct traverse_level *tlevel;

	/* Start at the deepest recorded level. */
	tlevel = &tinfo->levels[tinfo->num_levels-1];

again:
	/* Scan this level's buckets for non-empty entries. */
	for (i = tdb_find_nonzero_off(tdb, tlevel->hashtable,
				      tlevel->entry, tlevel->total_buckets);
	     i != tlevel->total_buckets;
	     i = tdb_find_nonzero_off(tdb, tlevel->hashtable,
				      i+1, tlevel->total_buckets)) {
		if (TDB_OFF_IS_ERR(i)) {
			return i;
		}

		val = tdb_read_off(tdb, tlevel->hashtable+sizeof(tdb_off_t)*i);
		if (TDB_OFF_IS_ERR(val)) {
			return val;
		}

		off = val & TDB_OFF_MASK;

		/* This makes the delete-all-in-traverse case work
		 * (and simplifies our logic a little). */
		if (off == tinfo->prev)
			continue;

		tlevel->entry = i;

		if (!is_subhash(val)) {
			/* Found one. */
			tinfo->prev = off;
			return off;
		}

		/* When we come back, we want the next one */
		tlevel->entry++;
		tinfo->num_levels++;
		tlevel++;
		tlevel->hashtable = off + sizeof(struct tdb_used_record);
		tlevel->entry = 0;
		/* Next level is a chain? */
		if (unlikely(tinfo->num_levels == TDB_MAX_LEVELS + 1))
			tlevel->total_buckets = (1 << TDB_HASH_GROUP_BITS);
		else
			tlevel->total_buckets = (1 << TDB_SUBLEVEL_HASH_BITS);
		goto again;
	}

	/* Nothing there? */
	if (tinfo->num_levels == 1)
		return 0;

	/* Handle chained entries. */
	if (unlikely(tinfo->num_levels == TDB_MAX_LEVELS + 1)) {
		tlevel->hashtable = tdb_read_off(tdb, tlevel->hashtable
						 + offsetof(struct tdb_chain,
							    next));
		if (TDB_OFF_IS_ERR(tlevel->hashtable)) {
			return tlevel->hashtable;
		}
		if (tlevel->hashtable) {
			/* Skip the chain link's record header. */
			tlevel->hashtable += sizeof(struct tdb_used_record);
			tlevel->entry = 0;
			goto again;
		}
	}

	/* Go back up and keep searching. */
	tinfo->num_levels--;
	tlevel--;
	goto again;
}
739
/* Return success if we find something, TDB_ERR_NOEXIST if none.
 *
 * Advances a traversal (started by first_in_hash()) to the next used
 * record.  On success, kbuf->dptr is an allocation the caller must free:
 * it holds the key (and, if dlen is non-NULL, the record's data, which
 * is read contiguously after the key).
 *
 * tdb:   open database context.
 * tinfo: traversal cursor; tinfo->toplevel_group tracks which group of
 *        the top-level hash table we are scanning.
 * kbuf:  out: key (dsize set from the record's key length).
 * dlen:  if non-NULL, out: data length, and the data is read too.
 *
 * Locking: each top-level group's hash range is read-locked only for
 * the duration of scanning that group; the lock is always dropped
 * before returning (including on the fail path). */
enum TDB_ERROR next_in_hash(struct tdb_context *tdb,
			    struct traverse_info *tinfo,
			    TDB_DATA *kbuf, size_t *dlen)
{
	const unsigned group_bits = TDB_TOPLEVEL_HASH_BITS-TDB_HASH_GROUP_BITS;
	tdb_off_t hl_start, hl_range, off;
	enum TDB_ERROR ecode;

	/* Scan each top-level group in turn until one yields a record. */
	while (tinfo->toplevel_group < (1 << group_bits)) {
		/* Hash-lock range covering this group: groups occupy the
		 * top group_bits of the 64-bit hash space. */
		hl_start = (tdb_off_t)tinfo->toplevel_group
			<< (64 - group_bits);
		hl_range = 1ULL << group_bits;
		ecode = tdb_lock_hashes(tdb, hl_start, hl_range, F_RDLCK,
					TDB_LOCK_WAIT);
		if (ecode != TDB_SUCCESS) {
			return ecode;
		}

		/* iterate_hash() returns 0 when this group is exhausted,
		 * otherwise a record offset (or an error encoded as an
		 * offset). */
		off = iterate_hash(tdb, tinfo);
		if (off) {
			struct tdb_used_record rec;

			if (TDB_OFF_IS_ERR(off)) {
				ecode = TDB_OFF_TO_ERR(off);
				goto fail;
			}

			ecode = tdb_read_convert(tdb, off, &rec, sizeof(rec));
			if (ecode != TDB_SUCCESS) {
				goto fail;
			}
			/* Sanity-check the header before trusting its
			 * length fields. */
			if (rec_magic(&rec) != TDB_USED_MAGIC) {
				ecode = tdb_logerr(tdb, TDB_ERR_CORRUPT,
						   TDB_LOG_ERROR,
						   "next_in_hash:"
						   " corrupt record at %llu",
						   (long long)off);
				goto fail;
			}

			kbuf->dsize = rec_key_length(&rec);

			/* They want data as well? */
			if (dlen) {
				*dlen = rec_data_length(&rec);
				/* Single contiguous read of key + data. */
				kbuf->dptr = tdb_alloc_read(tdb,
							    off + sizeof(rec),
							    kbuf->dsize
							    + *dlen);
			} else {
				kbuf->dptr = tdb_alloc_read(tdb,
							    off + sizeof(rec),
							    kbuf->dsize);
			}
			/* Drop the lock before checking the read result so
			 * every return path leaves the range unlocked. */
			tdb_unlock_hashes(tdb, hl_start, hl_range, F_RDLCK);
			if (TDB_PTR_IS_ERR(kbuf->dptr)) {
				return TDB_PTR_ERR(kbuf->dptr);
			}
			return TDB_SUCCESS;
		}

		tdb_unlock_hashes(tdb, hl_start, hl_range, F_RDLCK);

		/* Group exhausted: move the level-0 cursor to the next
		 * group of buckets and restart from its first entry. */
		tinfo->toplevel_group++;
		tinfo->levels[0].hashtable
			+= (sizeof(tdb_off_t) << TDB_HASH_GROUP_BITS);
		tinfo->levels[0].entry = 0;
	}
	return TDB_ERR_NOEXIST;

fail:
	/* Only reachable with the current group's range still locked. */
	tdb_unlock_hashes(tdb, hl_start, hl_range, F_RDLCK);
	return ecode;

}
816
817 enum TDB_ERROR first_in_hash(struct tdb_context *tdb,
818                              struct traverse_info *tinfo,
819                              TDB_DATA *kbuf, size_t *dlen)
820 {
821         tinfo->prev = 0;
822         tinfo->toplevel_group = 0;
823         tinfo->num_levels = 1;
824         tinfo->levels[0].hashtable = offsetof(struct tdb_header, hashtable);
825         tinfo->levels[0].entry = 0;
826         tinfo->levels[0].total_buckets = (1 << TDB_HASH_GROUP_BITS);
827
828         return next_in_hash(tdb, tinfo, kbuf, dlen);
829 }
830
831 /* Even if the entry isn't in this hash bucket, you'd have to lock this
832  * bucket to find it. */
833 static enum TDB_ERROR chainlock(struct tdb_context *tdb, const TDB_DATA *key,
834                                 int ltype, enum tdb_lock_flags waitflag,
835                                 const char *func)
836 {
837         enum TDB_ERROR ecode;
838         uint64_t h = tdb_hash(tdb, key->dptr, key->dsize);
839         tdb_off_t lockstart, locksize;
840         unsigned int group, gbits;
841
842         gbits = TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS;
843         group = bits_from(h, 64 - gbits, gbits);
844
845         lockstart = hlock_range(group, &locksize);
846
847         ecode = tdb_lock_hashes(tdb, lockstart, locksize, ltype, waitflag);
848         tdb_trace_1rec(tdb, func, *key);
849         return ecode;
850 }
851
852 /* lock/unlock one hash chain. This is meant to be used to reduce
853    contention - it cannot guarantee how many records will be locked */
854 enum TDB_ERROR tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
855 {
856         if (tdb->flags & TDB_VERSION1) {
857                 if (tdb1_chainlock(tdb, key) == -1)
858                         return tdb->last_error;
859                 return TDB_SUCCESS;
860         }
861         return tdb->last_error = chainlock(tdb, &key, F_WRLCK, TDB_LOCK_WAIT,
862                                            "tdb_chainlock");
863 }
864
865 void tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
866 {
867         uint64_t h = tdb_hash(tdb, key.dptr, key.dsize);
868         tdb_off_t lockstart, locksize;
869         unsigned int group, gbits;
870
871         if (tdb->flags & TDB_VERSION1) {
872                 tdb1_chainunlock(tdb, key);
873                 return;
874         }
875
876         gbits = TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS;
877         group = bits_from(h, 64 - gbits, gbits);
878
879         lockstart = hlock_range(group, &locksize);
880
881         tdb_trace_1rec(tdb, "tdb_chainunlock", key);
882         tdb_unlock_hashes(tdb, lockstart, locksize, F_WRLCK);
883 }
884
885 enum TDB_ERROR tdb_chainlock_read(struct tdb_context *tdb, TDB_DATA key)
886 {
887         if (tdb->flags & TDB_VERSION1) {
888                 if (tdb1_chainlock_read(tdb, key) == -1)
889                         return tdb->last_error;
890                 return TDB_SUCCESS;
891         }
892         return tdb->last_error = chainlock(tdb, &key, F_RDLCK, TDB_LOCK_WAIT,
893                                            "tdb_chainlock_read");
894 }
895
896 void tdb_chainunlock_read(struct tdb_context *tdb, TDB_DATA key)
897 {
898         uint64_t h = tdb_hash(tdb, key.dptr, key.dsize);
899         tdb_off_t lockstart, locksize;
900         unsigned int group, gbits;
901
902         if (tdb->flags & TDB_VERSION1) {
903                 tdb1_chainunlock_read(tdb, key);
904                 return;
905         }
906         gbits = TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS;
907         group = bits_from(h, 64 - gbits, gbits);
908
909         lockstart = hlock_range(group, &locksize);
910
911         tdb_trace_1rec(tdb, "tdb_chainunlock_read", key);
912         tdb_unlock_hashes(tdb, lockstart, locksize, F_RDLCK);
913 }