tdb2: make tests work in parallel.
[ccan] / ccan / tdb2 / hash.c
1  /*
2    Trivial Database 2: hash handling
3    Copyright (C) Rusty Russell 2010
4
5    This library is free software; you can redistribute it and/or
6    modify it under the terms of the GNU Lesser General Public
7    License as published by the Free Software Foundation; either
8    version 3 of the License, or (at your option) any later version.
9
10    This library is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    Lesser General Public License for more details.
14
15    You should have received a copy of the GNU Lesser General Public
16    License along with this library; if not, see <http://www.gnu.org/licenses/>.
17 */
18 #include "private.h"
19 #include <ccan/hash/hash.h>
20 #include <assert.h>
21
22 /* Default hash function. */
23 uint64_t tdb_jenkins_hash(const void *key, size_t length, uint64_t seed,
24                           void *unused)
25 {
26         uint64_t ret;
27         /* hash64_stable assumes lower bits are more important; they are a
28          * slightly better hash.  We use the upper bits first, so swap them. */
29         ret = hash64_stable((const unsigned char *)key, length, seed);
30         return (ret >> 32) | (ret << 32);
31 }
32
33 uint64_t tdb_hash(struct tdb_context *tdb, const void *ptr, size_t len)
34 {
35         return tdb->hash_fn(ptr, len, tdb->hash_seed, tdb->hash_data);
36 }
37
38 uint64_t hash_record(struct tdb_context *tdb, tdb_off_t off)
39 {
40         const struct tdb_used_record *r;
41         const void *key;
42         uint64_t klen, hash;
43
44         r = tdb_access_read(tdb, off, sizeof(*r), true);
45         if (TDB_PTR_IS_ERR(r)) {
46                 /* FIXME */
47                 return 0;
48         }
49
50         klen = rec_key_length(r);
51         tdb_access_release(tdb, r);
52
53         key = tdb_access_read(tdb, off + sizeof(*r), klen, false);
54         if (TDB_PTR_IS_ERR(key)) {
55                 return 0;
56         }
57
58         hash = tdb_hash(tdb, key, klen);
59         tdb_access_release(tdb, key);
60         return hash;
61 }
62
63 /* Get bits from a value. */
64 static uint32_t bits_from(uint64_t val, unsigned start, unsigned num)
65 {
66         assert(num <= 32);
67         return (val >> start) & ((1U << num) - 1);
68 }
69
70 /* We take bits from the top: that way we can lock whole sections of the hash
71  * by using lock ranges. */
72 static uint32_t use_bits(struct hash_info *h, unsigned num)
73 {
74         h->hash_used += num;
75         return bits_from(h->h, 64 - h->hash_used, num);
76 }
77
78 static tdb_bool_err key_matches(struct tdb_context *tdb,
79                                 const struct tdb_used_record *rec,
80                                 tdb_off_t off,
81                                 const struct tdb_data *key)
82 {
83         tdb_bool_err ret = false;
84         const char *rkey;
85
86         if (rec_key_length(rec) != key->dsize) {
87                 tdb->stats.compare_wrong_keylen++;
88                 return ret;
89         }
90
91         rkey = tdb_access_read(tdb, off + sizeof(*rec), key->dsize, false);
92         if (TDB_PTR_IS_ERR(rkey)) {
93                 return TDB_PTR_ERR(rkey);
94         }
95         if (memcmp(rkey, key->dptr, key->dsize) == 0)
96                 ret = true;
97         else
98                 tdb->stats.compare_wrong_keycmp++;
99         tdb_access_release(tdb, rkey);
100         return ret;
101 }
102
103 /* Does entry match? */
104 static tdb_bool_err match(struct tdb_context *tdb,
105                           struct hash_info *h,
106                           const struct tdb_data *key,
107                           tdb_off_t val,
108                           struct tdb_used_record *rec)
109 {
110         tdb_off_t off;
111         enum TDB_ERROR ecode;
112
113         tdb->stats.compares++;
114         /* Desired bucket must match. */
115         if (h->home_bucket != (val & TDB_OFF_HASH_GROUP_MASK)) {
116                 tdb->stats.compare_wrong_bucket++;
117                 return false;
118         }
119
120         /* Top bits of offset == next bits of hash. */
121         if (bits_from(val, TDB_OFF_HASH_EXTRA_BIT, TDB_OFF_UPPER_STEAL_EXTRA)
122             != bits_from(h->h, 64 - h->hash_used - TDB_OFF_UPPER_STEAL_EXTRA,
123                     TDB_OFF_UPPER_STEAL_EXTRA)) {
124                 tdb->stats.compare_wrong_offsetbits++;
125                 return false;
126         }
127
128         off = val & TDB_OFF_MASK;
129         ecode = tdb_read_convert(tdb, off, rec, sizeof(*rec));
130         if (ecode != TDB_SUCCESS) {
131                 return ecode;
132         }
133
134         if ((h->h & ((1 << 11)-1)) != rec_hash(rec)) {
135                 tdb->stats.compare_wrong_rechash++;
136                 return false;
137         }
138
139         return key_matches(tdb, rec, off, key);
140 }
141
142 static tdb_off_t hbucket_off(tdb_off_t group_start, unsigned bucket)
143 {
144         return group_start
145                 + (bucket % (1 << TDB_HASH_GROUP_BITS)) * sizeof(tdb_off_t);
146 }
147
148 bool is_subhash(tdb_off_t val)
149 {
150         return (val >> TDB_OFF_UPPER_STEAL_SUBHASH_BIT) & 1;
151 }
152
153 /* FIXME: Guess the depth, don't over-lock! */
154 static tdb_off_t hlock_range(tdb_off_t group, tdb_off_t *size)
155 {
156         *size = 1ULL << (64 - (TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS));
157         return group << (64 - (TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS));
158 }
159
160 static tdb_off_t COLD find_in_chain(struct tdb_context *tdb,
161                                     struct tdb_data key,
162                                     tdb_off_t chain,
163                                     struct hash_info *h,
164                                     struct tdb_used_record *rec,
165                                     struct traverse_info *tinfo)
166 {
167         tdb_off_t off, next;
168         enum TDB_ERROR ecode;
169
170         /* In case nothing is free, we set these to zero. */
171         h->home_bucket = h->found_bucket = 0;
172
173         for (off = chain; off; off = next) {
174                 unsigned int i;
175
176                 h->group_start = off;
177                 ecode = tdb_read_convert(tdb, off, h->group, sizeof(h->group));
178                 if (ecode != TDB_SUCCESS) {
179                         return ecode;
180                 }
181
182                 for (i = 0; i < (1 << TDB_HASH_GROUP_BITS); i++) {
183                         tdb_off_t recoff;
184                         if (!h->group[i]) {
185                                 /* Remember this empty bucket. */
186                                 h->home_bucket = h->found_bucket = i;
187                                 continue;
188                         }
189
190                         /* We can insert extra bits via add_to_hash
191                          * empty bucket logic. */
192                         recoff = h->group[i] & TDB_OFF_MASK;
193                         ecode = tdb_read_convert(tdb, recoff, rec,
194                                                  sizeof(*rec));
195                         if (ecode != TDB_SUCCESS) {
196                                 return ecode;
197                         }
198
199                         ecode = key_matches(tdb, rec, recoff, &key);
200                         if (ecode < 0) {
201                                 return ecode;
202                         }
203                         if (ecode == 1) {
204                                 h->home_bucket = h->found_bucket = i;
205
206                                 if (tinfo) {
207                                         tinfo->levels[tinfo->num_levels]
208                                                 .hashtable = off;
209                                         tinfo->levels[tinfo->num_levels]
210                                                 .total_buckets
211                                                 = 1 << TDB_HASH_GROUP_BITS;
212                                         tinfo->levels[tinfo->num_levels].entry
213                                                 = i;
214                                         tinfo->num_levels++;
215                                 }
216                                 return recoff;
217                         }
218                 }
219                 next = tdb_read_off(tdb, off
220                                     + offsetof(struct tdb_chain, next));
221                 if (TDB_OFF_IS_ERR(next)) {
222                         return next;
223                 }
224                 if (next)
225                         next += sizeof(struct tdb_used_record);
226         }
227         return 0;
228 }
229
230 /* This is the core routine which searches the hashtable for an entry.
231  * On error, no locks are held and -ve is returned.
232  * Otherwise, hinfo is filled in (and the optional tinfo).
233  * If not found, the return value is 0.
234  * If found, the return value is the offset, and *rec is the record. */
235 tdb_off_t find_and_lock(struct tdb_context *tdb,
236                         struct tdb_data key,
237                         int ltype,
238                         struct hash_info *h,
239                         struct tdb_used_record *rec,
240                         struct traverse_info *tinfo)
241 {
242         uint32_t i, group;
243         tdb_off_t hashtable;
244         enum TDB_ERROR ecode;
245
246         h->h = tdb_hash(tdb, key.dptr, key.dsize);
247         h->hash_used = 0;
248         group = use_bits(h, TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS);
249         h->home_bucket = use_bits(h, TDB_HASH_GROUP_BITS);
250
251         h->hlock_start = hlock_range(group, &h->hlock_range);
252         ecode = tdb_lock_hashes(tdb, h->hlock_start, h->hlock_range, ltype,
253                                 TDB_LOCK_WAIT);
254         if (ecode != TDB_SUCCESS) {
255                 return ecode;
256         }
257
258         hashtable = offsetof(struct tdb_header, hashtable);
259         if (tinfo) {
260                 tinfo->toplevel_group = group;
261                 tinfo->num_levels = 1;
262                 tinfo->levels[0].entry = 0;
263                 tinfo->levels[0].hashtable = hashtable
264                         + (group << TDB_HASH_GROUP_BITS) * sizeof(tdb_off_t);
265                 tinfo->levels[0].total_buckets = 1 << TDB_HASH_GROUP_BITS;
266         }
267
268         while (h->hash_used <= 64) {
269                 /* Read in the hash group. */
270                 h->group_start = hashtable
271                         + group * (sizeof(tdb_off_t) << TDB_HASH_GROUP_BITS);
272
273                 ecode = tdb_read_convert(tdb, h->group_start, &h->group,
274                                          sizeof(h->group));
275                 if (ecode != TDB_SUCCESS) {
276                         goto fail;
277                 }
278
279                 /* Pointer to another hash table?  Go down... */
280                 if (is_subhash(h->group[h->home_bucket])) {
281                         hashtable = (h->group[h->home_bucket] & TDB_OFF_MASK)
282                                 + sizeof(struct tdb_used_record);
283                         if (tinfo) {
284                                 /* When we come back, use *next* bucket */
285                                 tinfo->levels[tinfo->num_levels-1].entry
286                                         += h->home_bucket + 1;
287                         }
288                         group = use_bits(h, TDB_SUBLEVEL_HASH_BITS
289                                          - TDB_HASH_GROUP_BITS);
290                         h->home_bucket = use_bits(h, TDB_HASH_GROUP_BITS);
291                         if (tinfo) {
292                                 tinfo->levels[tinfo->num_levels].hashtable
293                                         = hashtable;
294                                 tinfo->levels[tinfo->num_levels].total_buckets
295                                         = 1 << TDB_SUBLEVEL_HASH_BITS;
296                                 tinfo->levels[tinfo->num_levels].entry
297                                         = group << TDB_HASH_GROUP_BITS;
298                                 tinfo->num_levels++;
299                         }
300                         continue;
301                 }
302
303                 /* It's in this group: search (until 0 or all searched) */
304                 for (i = 0, h->found_bucket = h->home_bucket;
305                      i < (1 << TDB_HASH_GROUP_BITS);
306                      i++, h->found_bucket = ((h->found_bucket+1)
307                                              % (1 << TDB_HASH_GROUP_BITS))) {
308                         tdb_bool_err berr;
309                         if (is_subhash(h->group[h->found_bucket]))
310                                 continue;
311
312                         if (!h->group[h->found_bucket])
313                                 break;
314
315                         berr = match(tdb, h, &key, h->group[h->found_bucket],
316                                      rec);
317                         if (berr < 0) {
318                                 ecode = berr;
319                                 goto fail;
320                         }
321                         if (berr) {
322                                 if (tinfo) {
323                                         tinfo->levels[tinfo->num_levels-1].entry
324                                                 += h->found_bucket;
325                                 }
326                                 return h->group[h->found_bucket] & TDB_OFF_MASK;
327                         }
328                 }
329                 /* Didn't find it: h indicates where it would go. */
330                 return 0;
331         }
332
333         return find_in_chain(tdb, key, hashtable, h, rec, tinfo);
334
335 fail:
336         tdb_unlock_hashes(tdb, h->hlock_start, h->hlock_range, ltype);
337         return ecode;
338 }
339
340 /* I wrote a simple test, expanding a hash to 2GB, for the following
341  * cases:
342  * 1) Expanding all the buckets at once,
343  * 2) Expanding the bucket we wanted to place the new entry into.
344  * 3) Expanding the most-populated bucket,
345  *
346  * I measured the worst/average/best density during this process.
347  * 1) 3%/16%/30%
348  * 2) 4%/20%/38%
349  * 3) 6%/22%/41%
350  *
351  * So we figure out the busiest bucket for the moment.
352  */
353 static unsigned fullest_bucket(struct tdb_context *tdb,
354                                const tdb_off_t *group,
355                                unsigned new_bucket)
356 {
357         unsigned counts[1 << TDB_HASH_GROUP_BITS] = { 0 };
358         unsigned int i, best_bucket;
359
360         /* Count the new entry. */
361         counts[new_bucket]++;
362         best_bucket = new_bucket;
363
364         for (i = 0; i < (1 << TDB_HASH_GROUP_BITS); i++) {
365                 unsigned this_bucket;
366
367                 if (is_subhash(group[i]))
368                         continue;
369                 this_bucket = group[i] & TDB_OFF_HASH_GROUP_MASK;
370                 if (++counts[this_bucket] > counts[best_bucket])
371                         best_bucket = this_bucket;
372         }
373
374         return best_bucket;
375 }
376
377 static bool put_into_group(tdb_off_t *group,
378                            unsigned bucket, tdb_off_t encoded)
379 {
380         unsigned int i;
381
382         for (i = 0; i < (1 << TDB_HASH_GROUP_BITS); i++) {
383                 unsigned b = (bucket + i) % (1 << TDB_HASH_GROUP_BITS);
384
385                 if (group[b] == 0) {
386                         group[b] = encoded;
387                         return true;
388                 }
389         }
390         return false;
391 }
392
393 static void force_into_group(tdb_off_t *group,
394                              unsigned bucket, tdb_off_t encoded)
395 {
396         if (!put_into_group(group, bucket, encoded))
397                 abort();
398 }
399
400 static tdb_off_t encode_offset(tdb_off_t new_off, struct hash_info *h)
401 {
402         return h->home_bucket
403                 | new_off
404                 | ((uint64_t)bits_from(h->h,
405                                   64 - h->hash_used - TDB_OFF_UPPER_STEAL_EXTRA,
406                                   TDB_OFF_UPPER_STEAL_EXTRA)
407                    << TDB_OFF_HASH_EXTRA_BIT);
408 }
409
410 /* Simply overwrite the hash entry we found before. */
411 enum TDB_ERROR replace_in_hash(struct tdb_context *tdb,
412                                struct hash_info *h,
413                                tdb_off_t new_off)
414 {
415         return tdb_write_off(tdb, hbucket_off(h->group_start, h->found_bucket),
416                              encode_offset(new_off, h));
417 }
418
419 /* We slot in anywhere that's empty in the chain. */
420 static enum TDB_ERROR COLD add_to_chain(struct tdb_context *tdb,
421                                         tdb_off_t subhash,
422                                         tdb_off_t new_off)
423 {
424         tdb_off_t entry;
425         enum TDB_ERROR ecode;
426
427         entry = tdb_find_zero_off(tdb, subhash, 1<<TDB_HASH_GROUP_BITS);
428         if (TDB_OFF_IS_ERR(entry)) {
429                 return entry;
430         }
431
432         if (entry == 1 << TDB_HASH_GROUP_BITS) {
433                 tdb_off_t next;
434
435                 next = tdb_read_off(tdb, subhash
436                                     + offsetof(struct tdb_chain, next));
437                 if (TDB_OFF_IS_ERR(next)) {
438                         return next;
439                 }
440
441                 if (!next) {
442                         next = alloc(tdb, 0, sizeof(struct tdb_chain), 0,
443                                      TDB_CHAIN_MAGIC, false);
444                         if (TDB_OFF_IS_ERR(next))
445                                 return next;
446                         ecode = zero_out(tdb,
447                                          next+sizeof(struct tdb_used_record),
448                                          sizeof(struct tdb_chain));
449                         if (ecode != TDB_SUCCESS) {
450                                 return ecode;
451                         }
452                         ecode = tdb_write_off(tdb, subhash
453                                               + offsetof(struct tdb_chain,
454                                                          next),
455                                               next);
456                         if (ecode != TDB_SUCCESS) {
457                                 return ecode;
458                         }
459                 }
460                 return add_to_chain(tdb, next, new_off);
461         }
462
463         return tdb_write_off(tdb, subhash + entry * sizeof(tdb_off_t),
464                              new_off);
465 }
466
467 /* Add into a newly created subhash. */
468 static enum TDB_ERROR add_to_subhash(struct tdb_context *tdb, tdb_off_t subhash,
469                                      unsigned hash_used, tdb_off_t val)
470 {
471         tdb_off_t off = (val & TDB_OFF_MASK), *group;
472         struct hash_info h;
473         unsigned int gnum;
474
475         h.hash_used = hash_used;
476
477         if (hash_used + TDB_SUBLEVEL_HASH_BITS > 64)
478                 return add_to_chain(tdb, subhash, off);
479
480         h.h = hash_record(tdb, off);
481         gnum = use_bits(&h, TDB_SUBLEVEL_HASH_BITS-TDB_HASH_GROUP_BITS);
482         h.group_start = subhash
483                 + gnum * (sizeof(tdb_off_t) << TDB_HASH_GROUP_BITS);
484         h.home_bucket = use_bits(&h, TDB_HASH_GROUP_BITS);
485
486         group = tdb_access_write(tdb, h.group_start,
487                                  sizeof(*group) << TDB_HASH_GROUP_BITS, true);
488         if (TDB_PTR_IS_ERR(group)) {
489                 return TDB_PTR_ERR(group);
490         }
491         force_into_group(group, h.home_bucket, encode_offset(off, &h));
492         return tdb_access_commit(tdb, group);
493 }
494
495 static enum TDB_ERROR expand_group(struct tdb_context *tdb, struct hash_info *h)
496 {
497         unsigned bucket, num_vals, i, magic;
498         size_t subsize;
499         tdb_off_t subhash;
500         tdb_off_t vals[1 << TDB_HASH_GROUP_BITS];
501         enum TDB_ERROR ecode;
502
503         /* Attach new empty subhash under fullest bucket. */
504         bucket = fullest_bucket(tdb, h->group, h->home_bucket);
505
506         if (h->hash_used == 64) {
507                 tdb->stats.alloc_chain++;
508                 subsize = sizeof(struct tdb_chain);
509                 magic = TDB_CHAIN_MAGIC;
510         } else {
511                 tdb->stats.alloc_subhash++;
512                 subsize = (sizeof(tdb_off_t) << TDB_SUBLEVEL_HASH_BITS);
513                 magic = TDB_HTABLE_MAGIC;
514         }
515
516         subhash = alloc(tdb, 0, subsize, 0, magic, false);
517         if (TDB_OFF_IS_ERR(subhash)) {
518                 return subhash;
519         }
520
521         ecode = zero_out(tdb, subhash + sizeof(struct tdb_used_record),
522                          subsize);
523         if (ecode != TDB_SUCCESS) {
524                 return ecode;
525         }
526
527         /* Remove any which are destined for bucket or are in wrong place. */
528         num_vals = 0;
529         for (i = 0; i < (1 << TDB_HASH_GROUP_BITS); i++) {
530                 unsigned home_bucket = h->group[i] & TDB_OFF_HASH_GROUP_MASK;
531                 if (!h->group[i] || is_subhash(h->group[i]))
532                         continue;
533                 if (home_bucket == bucket || home_bucket != i) {
534                         vals[num_vals++] = h->group[i];
535                         h->group[i] = 0;
536                 }
537         }
538         /* FIXME: This assert is valid, but we do this during unit test :( */
539         /* assert(num_vals); */
540
541         /* Overwrite expanded bucket with subhash pointer. */
542         h->group[bucket] = subhash | (1ULL << TDB_OFF_UPPER_STEAL_SUBHASH_BIT);
543
544         /* Point to actual contents of record. */
545         subhash += sizeof(struct tdb_used_record);
546
547         /* Put values back. */
548         for (i = 0; i < num_vals; i++) {
549                 unsigned this_bucket = vals[i] & TDB_OFF_HASH_GROUP_MASK;
550
551                 if (this_bucket == bucket) {
552                         ecode = add_to_subhash(tdb, subhash, h->hash_used,
553                                                vals[i]);
554                         if (ecode != TDB_SUCCESS)
555                                 return ecode;
556                 } else {
557                         /* There should be room to put this back. */
558                         force_into_group(h->group, this_bucket, vals[i]);
559                 }
560         }
561         return TDB_SUCCESS;
562 }
563
564 enum TDB_ERROR delete_from_hash(struct tdb_context *tdb, struct hash_info *h)
565 {
566         unsigned int i, num_movers = 0;
567         tdb_off_t movers[1 << TDB_HASH_GROUP_BITS];
568
569         h->group[h->found_bucket] = 0;
570         for (i = 1; i < (1 << TDB_HASH_GROUP_BITS); i++) {
571                 unsigned this_bucket;
572
573                 this_bucket = (h->found_bucket+i) % (1 << TDB_HASH_GROUP_BITS);
574                 /* Empty bucket?  We're done. */
575                 if (!h->group[this_bucket])
576                         break;
577
578                 /* Ignore subhashes. */
579                 if (is_subhash(h->group[this_bucket]))
580                         continue;
581
582                 /* If this one is not happy where it is, we'll move it. */
583                 if ((h->group[this_bucket] & TDB_OFF_HASH_GROUP_MASK)
584                     != this_bucket) {
585                         movers[num_movers++] = h->group[this_bucket];
586                         h->group[this_bucket] = 0;
587                 }
588         }
589
590         /* Put back the ones we erased. */
591         for (i = 0; i < num_movers; i++) {
592                 force_into_group(h->group, movers[i] & TDB_OFF_HASH_GROUP_MASK,
593                                  movers[i]);
594         }
595
596         /* Now we write back the hash group */
597         return tdb_write_convert(tdb, h->group_start,
598                                  h->group, sizeof(h->group));
599 }
600
601 enum TDB_ERROR add_to_hash(struct tdb_context *tdb, struct hash_info *h,
602                            tdb_off_t new_off)
603 {
604         enum TDB_ERROR ecode;
605
606         /* We hit an empty bucket during search?  That's where it goes. */
607         if (!h->group[h->found_bucket]) {
608                 h->group[h->found_bucket] = encode_offset(new_off, h);
609                 /* Write back the modified group. */
610                 return tdb_write_convert(tdb, h->group_start,
611                                          h->group, sizeof(h->group));
612         }
613
614         if (h->hash_used > 64)
615                 return add_to_chain(tdb, h->group_start, new_off);
616
617         /* We're full.  Expand. */
618         ecode = expand_group(tdb, h);
619         if (ecode != TDB_SUCCESS) {
620                 return ecode;
621         }
622
623         if (is_subhash(h->group[h->home_bucket])) {
624                 /* We were expanded! */
625                 tdb_off_t hashtable;
626                 unsigned int gnum;
627
628                 /* Write back the modified group. */
629                 ecode = tdb_write_convert(tdb, h->group_start, h->group,
630                                           sizeof(h->group));
631                 if (ecode != TDB_SUCCESS) {
632                         return ecode;
633                 }
634
635                 /* Move hashinfo down a level. */
636                 hashtable = (h->group[h->home_bucket] & TDB_OFF_MASK)
637                         + sizeof(struct tdb_used_record);
638                 gnum = use_bits(h,TDB_SUBLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS);
639                 h->home_bucket = use_bits(h, TDB_HASH_GROUP_BITS);
640                 h->group_start = hashtable
641                         + gnum * (sizeof(tdb_off_t) << TDB_HASH_GROUP_BITS);
642                 ecode = tdb_read_convert(tdb, h->group_start, &h->group,
643                                          sizeof(h->group));
644                 if (ecode != TDB_SUCCESS) {
645                         return ecode;
646                 }
647         }
648
649         /* Expanding the group must have made room if it didn't choose this
650          * bucket. */
651         if (put_into_group(h->group, h->home_bucket, encode_offset(new_off,h))){
652                 return tdb_write_convert(tdb, h->group_start,
653                                          h->group, sizeof(h->group));
654         }
655
656         /* This can happen if all hashes in group (and us) dropped into same
657          * group in subhash. */
658         return add_to_hash(tdb, h, new_off);
659 }
660
661 /* Traverse support: returns offset of record, or 0 or -ve error. */
662 static tdb_off_t iterate_hash(struct tdb_context *tdb,
663                               struct traverse_info *tinfo)
664 {
665         tdb_off_t off, val, i;
666         struct traverse_level *tlevel;
667
668         tlevel = &tinfo->levels[tinfo->num_levels-1];
669
670 again:
671         for (i = tdb_find_nonzero_off(tdb, tlevel->hashtable,
672                                       tlevel->entry, tlevel->total_buckets);
673              i != tlevel->total_buckets;
674              i = tdb_find_nonzero_off(tdb, tlevel->hashtable,
675                                       i+1, tlevel->total_buckets)) {
676                 if (TDB_OFF_IS_ERR(i)) {
677                         return i;
678                 }
679
680                 val = tdb_read_off(tdb, tlevel->hashtable+sizeof(tdb_off_t)*i);
681                 if (TDB_OFF_IS_ERR(val)) {
682                         return val;
683                 }
684
685                 off = val & TDB_OFF_MASK;
686
687                 /* This makes the delete-all-in-traverse case work
688                  * (and simplifies our logic a little). */
689                 if (off == tinfo->prev)
690                         continue;
691
692                 tlevel->entry = i;
693
694                 if (!is_subhash(val)) {
695                         /* Found one. */
696                         tinfo->prev = off;
697                         return off;
698                 }
699
700                 /* When we come back, we want the next one */
701                 tlevel->entry++;
702                 tinfo->num_levels++;
703                 tlevel++;
704                 tlevel->hashtable = off + sizeof(struct tdb_used_record);
705                 tlevel->entry = 0;
706                 /* Next level is a chain? */
707                 if (unlikely(tinfo->num_levels == TDB_MAX_LEVELS + 1))
708                         tlevel->total_buckets = (1 << TDB_HASH_GROUP_BITS);
709                 else
710                         tlevel->total_buckets = (1 << TDB_SUBLEVEL_HASH_BITS);
711                 goto again;
712         }
713
714         /* Nothing there? */
715         if (tinfo->num_levels == 1)
716                 return 0;
717
718         /* Handle chained entries. */
719         if (unlikely(tinfo->num_levels == TDB_MAX_LEVELS + 1)) {
720                 tlevel->hashtable = tdb_read_off(tdb, tlevel->hashtable
721                                                  + offsetof(struct tdb_chain,
722                                                             next));
723                 if (TDB_OFF_IS_ERR(tlevel->hashtable)) {
724                         return tlevel->hashtable;
725                 }
726                 if (tlevel->hashtable) {
727                         tlevel->hashtable += sizeof(struct tdb_used_record);
728                         tlevel->entry = 0;
729                         goto again;
730                 }
731         }
732
733         /* Go back up and keep searching. */
734         tinfo->num_levels--;
735         tlevel--;
736         goto again;
737 }
738
739 /* Return success if we find something, TDB_ERR_NOEXIST if none. */
740 enum TDB_ERROR next_in_hash(struct tdb_context *tdb,
741                             struct traverse_info *tinfo,
742                             TDB_DATA *kbuf, size_t *dlen)
743 {
744         const unsigned group_bits = TDB_TOPLEVEL_HASH_BITS-TDB_HASH_GROUP_BITS;
745         tdb_off_t hl_start, hl_range, off;
746         enum TDB_ERROR ecode;
747
748         while (tinfo->toplevel_group < (1 << group_bits)) {
749                 hl_start = (tdb_off_t)tinfo->toplevel_group
750                         << (64 - group_bits);
751                 hl_range = 1ULL << group_bits;
752                 ecode = tdb_lock_hashes(tdb, hl_start, hl_range, F_RDLCK,
753                                         TDB_LOCK_WAIT);
754                 if (ecode != TDB_SUCCESS) {
755                         return ecode;
756                 }
757
758                 off = iterate_hash(tdb, tinfo);
759                 if (off) {
760                         struct tdb_used_record rec;
761
762                         if (TDB_OFF_IS_ERR(off)) {
763                                 ecode = off;
764                                 goto fail;
765                         }
766
767                         ecode = tdb_read_convert(tdb, off, &rec, sizeof(rec));
768                         if (ecode != TDB_SUCCESS) {
769                                 goto fail;
770                         }
771                         if (rec_magic(&rec) != TDB_USED_MAGIC) {
772                                 ecode = tdb_logerr(tdb, TDB_ERR_CORRUPT,
773                                                    TDB_LOG_ERROR,
774                                                    "next_in_hash:"
775                                                    " corrupt record at %llu",
776                                                    (long long)off);
777                                 goto fail;
778                         }
779
780                         kbuf->dsize = rec_key_length(&rec);
781
782                         /* They want data as well? */
783                         if (dlen) {
784                                 *dlen = rec_data_length(&rec);
785                                 kbuf->dptr = tdb_alloc_read(tdb,
786                                                             off + sizeof(rec),
787                                                             kbuf->dsize
788                                                             + *dlen);
789                         } else {
790                                 kbuf->dptr = tdb_alloc_read(tdb,
791                                                             off + sizeof(rec),
792                                                             kbuf->dsize);
793                         }
794                         tdb_unlock_hashes(tdb, hl_start, hl_range, F_RDLCK);
795                         if (TDB_PTR_IS_ERR(kbuf->dptr)) {
796                                 return TDB_PTR_ERR(kbuf->dptr);
797                         }
798                         return TDB_SUCCESS;
799                 }
800
801                 tdb_unlock_hashes(tdb, hl_start, hl_range, F_RDLCK);
802
803                 tinfo->toplevel_group++;
804                 tinfo->levels[0].hashtable
805                         += (sizeof(tdb_off_t) << TDB_HASH_GROUP_BITS);
806                 tinfo->levels[0].entry = 0;
807         }
808         return TDB_ERR_NOEXIST;
809
810 fail:
811         tdb_unlock_hashes(tdb, hl_start, hl_range, F_RDLCK);
812         return ecode;
813
814 }
815
816 enum TDB_ERROR first_in_hash(struct tdb_context *tdb,
817                              struct traverse_info *tinfo,
818                              TDB_DATA *kbuf, size_t *dlen)
819 {
820         tinfo->prev = 0;
821         tinfo->toplevel_group = 0;
822         tinfo->num_levels = 1;
823         tinfo->levels[0].hashtable = offsetof(struct tdb_header, hashtable);
824         tinfo->levels[0].entry = 0;
825         tinfo->levels[0].total_buckets = (1 << TDB_HASH_GROUP_BITS);
826
827         return next_in_hash(tdb, tinfo, kbuf, dlen);
828 }
829
830 /* Even if the entry isn't in this hash bucket, you'd have to lock this
831  * bucket to find it. */
832 static enum TDB_ERROR chainlock(struct tdb_context *tdb, const TDB_DATA *key,
833                                 int ltype, enum tdb_lock_flags waitflag,
834                                 const char *func)
835 {
836         enum TDB_ERROR ecode;
837         uint64_t h = tdb_hash(tdb, key->dptr, key->dsize);
838         tdb_off_t lockstart, locksize;
839         unsigned int group, gbits;
840
841         gbits = TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS;
842         group = bits_from(h, 64 - gbits, gbits);
843
844         lockstart = hlock_range(group, &locksize);
845
846         ecode = tdb_lock_hashes(tdb, lockstart, locksize, ltype, waitflag);
847         tdb_trace_1rec(tdb, func, *key);
848         return ecode;
849 }
850
851 /* lock/unlock one hash chain. This is meant to be used to reduce
852    contention - it cannot guarantee how many records will be locked */
853 enum TDB_ERROR tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
854 {
855         if (tdb->flags & TDB_VERSION1) {
856                 if (tdb1_chainlock(tdb, key) == -1)
857                         return tdb->last_error;
858                 return TDB_SUCCESS;
859         }
860         return tdb->last_error = chainlock(tdb, &key, F_WRLCK, TDB_LOCK_WAIT,
861                                            "tdb_chainlock");
862 }
863
864 void tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
865 {
866         uint64_t h = tdb_hash(tdb, key.dptr, key.dsize);
867         tdb_off_t lockstart, locksize;
868         unsigned int group, gbits;
869
870         if (tdb->flags & TDB_VERSION1) {
871                 tdb1_chainunlock(tdb, key);
872                 return;
873         }
874
875         gbits = TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS;
876         group = bits_from(h, 64 - gbits, gbits);
877
878         lockstart = hlock_range(group, &locksize);
879
880         tdb_trace_1rec(tdb, "tdb_chainunlock", key);
881         tdb_unlock_hashes(tdb, lockstart, locksize, F_WRLCK);
882 }
883
884 enum TDB_ERROR tdb_chainlock_read(struct tdb_context *tdb, TDB_DATA key)
885 {
886         if (tdb->flags & TDB_VERSION1) {
887                 if (tdb1_chainlock_read(tdb, key) == -1)
888                         return tdb->last_error;
889                 return TDB_SUCCESS;
890         }
891         return tdb->last_error = chainlock(tdb, &key, F_RDLCK, TDB_LOCK_WAIT,
892                                            "tdb_chainlock_read");
893 }
894
895 void tdb_chainunlock_read(struct tdb_context *tdb, TDB_DATA key)
896 {
897         uint64_t h = tdb_hash(tdb, key.dptr, key.dsize);
898         tdb_off_t lockstart, locksize;
899         unsigned int group, gbits;
900
901         if (tdb->flags & TDB_VERSION1) {
902                 tdb1_chainunlock_read(tdb, key);
903                 return;
904         }
905         gbits = TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS;
906         group = bits_from(h, 64 - gbits, gbits);
907
908         lockstart = hlock_range(group, &locksize);
909
910         tdb_trace_1rec(tdb, "tdb_chainunlock_read", key);
911         tdb_unlock_hashes(tdb, lockstart, locksize, F_RDLCK);
912 }