tdb2: rework some io.c functions to return enum TDB_ERROR.
[ccan] / ccan / tdb2 / hash.c
1  /*
2    Trivial Database 2: hash handling
3    Copyright (C) Rusty Russell 2010
4
5    This library is free software; you can redistribute it and/or
6    modify it under the terms of the GNU Lesser General Public
7    License as published by the Free Software Foundation; either
8    version 3 of the License, or (at your option) any later version.
9
10    This library is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    Lesser General Public License for more details.
14
15    You should have received a copy of the GNU Lesser General Public
16    License along with this library; if not, see <http://www.gnu.org/licenses/>.
17 */
18 #include "private.h"
19 #include <assert.h>
20 #include <ccan/hash/hash.h>
21
/* Default key hash: hash64_stable() with its two 32-bit halves exchanged.
 * The arg parameter is accepted but not used. */
static uint64_t jenkins_hash(const void *key, size_t length, uint64_t seed,
                             void *arg)
{
        /* hash64_stable assumes lower bits are more important; they are a
         * slightly better hash.  We consume bits from the top, so the
         * halves trade places. */
        const uint64_t raw = hash64_stable((const unsigned char *)key,
                                           length, seed);
        const uint64_t old_high_half = raw >> 32;
        const uint64_t old_low_half = raw << 32;

        return old_high_half | old_low_half;
}
31
32 void tdb_hash_init(struct tdb_context *tdb)
33 {
34         tdb->khash = jenkins_hash;
35         tdb->hash_priv = NULL;
36 }
37
38 uint64_t tdb_hash(struct tdb_context *tdb, const void *ptr, size_t len)
39 {
40         return tdb->khash(ptr, len, tdb->hash_seed, tdb->hash_priv);
41 }
42
43 uint64_t hash_record(struct tdb_context *tdb, tdb_off_t off)
44 {
45         const struct tdb_used_record *r;
46         const void *key;
47         uint64_t klen, hash;
48
49         r = tdb_access_read(tdb, off, sizeof(*r), true);
50         if (!r)
51                 /* FIXME */
52                 return 0;
53
54         klen = rec_key_length(r);
55         tdb_access_release(tdb, r);
56
57         key = tdb_access_read(tdb, off + sizeof(*r), klen, false);
58         if (!key)
59                 return 0;
60
61         hash = tdb_hash(tdb, key, klen);
62         tdb_access_release(tdb, key);
63         return hash;
64 }
65
/* Get bits from a value: returns the num-bit wide field of val whose
 * least significant bit sits at bit position start.
 *
 * num may be anything from 0 to 32 inclusive.  The mask is built in
 * 64-bit arithmetic: shifting a 32-bit 1U left by 32 is undefined
 * behaviour, so a full-width (num == 32) request could not be relied on
 * with a 32-bit mask. */
static uint32_t bits_from(uint64_t val, unsigned start, unsigned num)
{
        assert(num <= 32);
        return (val >> start) & (((uint64_t)1 << num) - 1);
}
72
73 /* We take bits from the top: that way we can lock whole sections of the hash
74  * by using lock ranges. */
75 static uint32_t use_bits(struct hash_info *h, unsigned num)
76 {
77         h->hash_used += num;
78         return bits_from(h->h, 64 - h->hash_used, num);
79 }
80
81 static bool key_matches(struct tdb_context *tdb,
82                         const struct tdb_used_record *rec,
83                         tdb_off_t off,
84                         const struct tdb_data *key)
85 {
86         bool ret = false;
87         const char *rkey;
88
89         if (rec_key_length(rec) != key->dsize) {
90                 add_stat(tdb, compare_wrong_keylen, 1);
91                 return ret;
92         }
93
94         rkey = tdb_access_read(tdb, off + sizeof(*rec), key->dsize, false);
95         if (!rkey)
96                 return ret;
97         if (memcmp(rkey, key->dptr, key->dsize) == 0)
98                 ret = true;
99         else
100                 add_stat(tdb, compare_wrong_keycmp, 1);
101         tdb_access_release(tdb, rkey);
102         return ret;
103 }
104
105 /* Does entry match? */
106 static bool match(struct tdb_context *tdb,
107                   struct hash_info *h,
108                   const struct tdb_data *key,
109                   tdb_off_t val,
110                   struct tdb_used_record *rec)
111 {
112         tdb_off_t off;
113         enum TDB_ERROR ecode;
114
115         add_stat(tdb, compares, 1);
116         /* Desired bucket must match. */
117         if (h->home_bucket != (val & TDB_OFF_HASH_GROUP_MASK)) {
118                 add_stat(tdb, compare_wrong_bucket, 1);
119                 return false;
120         }
121
122         /* Top bits of offset == next bits of hash. */
123         if (bits_from(val, TDB_OFF_HASH_EXTRA_BIT, TDB_OFF_UPPER_STEAL_EXTRA)
124             != bits_from(h->h, 64 - h->hash_used - TDB_OFF_UPPER_STEAL_EXTRA,
125                     TDB_OFF_UPPER_STEAL_EXTRA)) {
126                 add_stat(tdb, compare_wrong_offsetbits, 1);
127                 return false;
128         }
129
130         off = val & TDB_OFF_MASK;
131         ecode = tdb_read_convert(tdb, off, rec, sizeof(*rec));
132         if (ecode != TDB_SUCCESS) {
133                 tdb->ecode = ecode;
134                 return false;
135         }
136
137         if ((h->h & ((1 << 11)-1)) != rec_hash(rec)) {
138                 add_stat(tdb, compare_wrong_rechash, 1);
139                 return false;
140         }
141
142         return key_matches(tdb, rec, off, key);
143 }
144
145 static tdb_off_t hbucket_off(tdb_off_t group_start, unsigned bucket)
146 {
147         return group_start
148                 + (bucket % (1 << TDB_HASH_GROUP_BITS)) * sizeof(tdb_off_t);
149 }
150
151 bool is_subhash(tdb_off_t val)
152 {
153         return (val >> TDB_OFF_UPPER_STEAL_SUBHASH_BIT) & 1;
154 }
155
156 /* FIXME: Guess the depth, don't over-lock! */
157 static tdb_off_t hlock_range(tdb_off_t group, tdb_off_t *size)
158 {
159         *size = 1ULL << (64 - (TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS));
160         return group << (64 - (TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS));
161 }
162
/* Search a linked list of chain blocks for key.
 *
 * chain is the offset of the first block's contents.  Each block's group
 * is read into h->group and h->group_start is set to that block.
 *
 * Returns the record offset if the key is found (h->home_bucket and
 * h->found_bucket give its slot, *rec holds its header, and tinfo, if
 * non-NULL, gains one level describing the block).  Returns 0 if the
 * key is not in the chain; h->home_bucket/h->found_bucket then name the
 * last empty slot seen, or 0 if none was seen.  Returns TDB_OFF_ERR on a
 * read failure (tdb->ecode is set here for tdb_read_convert failures). */
static tdb_off_t COLD find_in_chain(struct tdb_context *tdb,
                                    struct tdb_data key,
                                    tdb_off_t chain,
                                    struct hash_info *h,
                                    struct tdb_used_record *rec,
                                    struct traverse_info *tinfo)
{
        tdb_off_t off, next;
        enum TDB_ERROR ecode;

        /* In case nothing is free, we set these to zero. */
        h->home_bucket = h->found_bucket = 0;

        for (off = chain; off; off = next) {
                unsigned int i;

                h->group_start = off;
                ecode = tdb_read_convert(tdb, off, h->group, sizeof(h->group));
                if (ecode != TDB_SUCCESS) {
                        tdb->ecode = ecode;
                        return TDB_OFF_ERR;
                }

                /* Chains have no hash bits left to guide us: every
                 * occupied slot has to be compared by key. */
                for (i = 0; i < (1 << TDB_HASH_GROUP_BITS); i++) {
                        tdb_off_t recoff;
                        if (!h->group[i]) {
                                /* Remember this empty bucket. */
                                h->home_bucket = h->found_bucket = i;
                                continue;
                        }

                        /* We can insert extra bits via add_to_hash
                         * empty bucket logic. */
                        recoff = h->group[i] & TDB_OFF_MASK;
                        ecode = tdb_read_convert(tdb, recoff, rec,
                                                 sizeof(*rec));
                        if (ecode != TDB_SUCCESS) {
                                tdb->ecode = ecode;
                                return TDB_OFF_ERR;
                        }

                        if (key_matches(tdb, rec, recoff, &key)) {
                                h->home_bucket = h->found_bucket = i;

                                /* Record where we found it so a traverse
                                 * can carry on from this slot. */
                                if (tinfo) {
                                        tinfo->levels[tinfo->num_levels]
                                                .hashtable = off;
                                        tinfo->levels[tinfo->num_levels]
                                                .total_buckets
                                                = 1 << TDB_HASH_GROUP_BITS;
                                        tinfo->levels[tinfo->num_levels].entry
                                                = i;
                                        tinfo->num_levels++;
                                }
                                return recoff;
                        }
                }
                next = tdb_read_off(tdb, off
                                    + offsetof(struct tdb_chain, next));
                if (next == TDB_OFF_ERR)
                        return TDB_OFF_ERR;
                /* The stored link is skipped past a struct tdb_used_record
                 * to reach the next block's contents; 0 ends the list. */
                if (next)
                        next += sizeof(struct tdb_used_record);
        }
        return 0;
}
229
230 /* This is the core routine which searches the hashtable for an entry.
231  * On error, no locks are held and TDB_OFF_ERR is returned.
232  * Otherwise, hinfo is filled in (and the optional tinfo).
233  * If not found, the return value is 0.
234  * If found, the return value is the offset, and *rec is the record. */
235 tdb_off_t find_and_lock(struct tdb_context *tdb,
236                         struct tdb_data key,
237                         int ltype,
238                         struct hash_info *h,
239                         struct tdb_used_record *rec,
240                         struct traverse_info *tinfo)
241 {
242         uint32_t i, group;
243         tdb_off_t hashtable;
244         enum TDB_ERROR ecode;
245
246         h->h = tdb_hash(tdb, key.dptr, key.dsize);
247         h->hash_used = 0;
248         group = use_bits(h, TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS);
249         h->home_bucket = use_bits(h, TDB_HASH_GROUP_BITS);
250
251         h->hlock_start = hlock_range(group, &h->hlock_range);
252         ecode = tdb_lock_hashes(tdb, h->hlock_start, h->hlock_range, ltype,
253                                 TDB_LOCK_WAIT);
254         if (ecode != TDB_SUCCESS) {
255                 tdb->ecode = ecode;
256                 return TDB_OFF_ERR;
257         }
258
259         hashtable = offsetof(struct tdb_header, hashtable);
260         if (tinfo) {
261                 tinfo->toplevel_group = group;
262                 tinfo->num_levels = 1;
263                 tinfo->levels[0].entry = 0;
264                 tinfo->levels[0].hashtable = hashtable
265                         + (group << TDB_HASH_GROUP_BITS) * sizeof(tdb_off_t);
266                 tinfo->levels[0].total_buckets = 1 << TDB_HASH_GROUP_BITS;
267         }
268
269         while (h->hash_used <= 64) {
270                 /* Read in the hash group. */
271                 h->group_start = hashtable
272                         + group * (sizeof(tdb_off_t) << TDB_HASH_GROUP_BITS);
273
274                 ecode = tdb_read_convert(tdb, h->group_start, &h->group,
275                                          sizeof(h->group));
276                 if (ecode != TDB_SUCCESS) {
277                         tdb->ecode = ecode;
278                         goto fail;
279                 }
280
281                 /* Pointer to another hash table?  Go down... */
282                 if (is_subhash(h->group[h->home_bucket])) {
283                         hashtable = (h->group[h->home_bucket] & TDB_OFF_MASK)
284                                 + sizeof(struct tdb_used_record);
285                         if (tinfo) {
286                                 /* When we come back, use *next* bucket */
287                                 tinfo->levels[tinfo->num_levels-1].entry
288                                         += h->home_bucket + 1;
289                         }
290                         group = use_bits(h, TDB_SUBLEVEL_HASH_BITS
291                                          - TDB_HASH_GROUP_BITS);
292                         h->home_bucket = use_bits(h, TDB_HASH_GROUP_BITS);
293                         if (tinfo) {
294                                 tinfo->levels[tinfo->num_levels].hashtable
295                                         = hashtable;
296                                 tinfo->levels[tinfo->num_levels].total_buckets
297                                         = 1 << TDB_SUBLEVEL_HASH_BITS;
298                                 tinfo->levels[tinfo->num_levels].entry
299                                         = group << TDB_HASH_GROUP_BITS;
300                                 tinfo->num_levels++;
301                         }
302                         continue;
303                 }
304
305                 /* It's in this group: search (until 0 or all searched) */
306                 for (i = 0, h->found_bucket = h->home_bucket;
307                      i < (1 << TDB_HASH_GROUP_BITS);
308                      i++, h->found_bucket = ((h->found_bucket+1)
309                                              % (1 << TDB_HASH_GROUP_BITS))) {
310                         if (is_subhash(h->group[h->found_bucket]))
311                                 continue;
312
313                         if (!h->group[h->found_bucket])
314                                 break;
315
316                         if (match(tdb, h, &key, h->group[h->found_bucket],
317                                   rec)) {
318                                 if (tinfo) {
319                                         tinfo->levels[tinfo->num_levels-1].entry
320                                                 += h->found_bucket;
321                                 }
322                                 return h->group[h->found_bucket] & TDB_OFF_MASK;
323                         }
324                 }
325                 /* Didn't find it: h indicates where it would go. */
326                 return 0;
327         }
328
329         return find_in_chain(tdb, key, hashtable, h, rec, tinfo);
330
331 fail:
332         tdb_unlock_hashes(tdb, h->hlock_start, h->hlock_range, ltype);
333         return TDB_OFF_ERR;
334 }
335
336 /* I wrote a simple test, expanding a hash to 2GB, for the following
337  * cases:
338  * 1) Expanding all the buckets at once,
339  * 2) Expanding the bucket we wanted to place the new entry into.
340  * 3) Expanding the most-populated bucket,
341  *
342  * I measured the worst/average/best density during this process.
343  * 1) 3%/16%/30%
344  * 2) 4%/20%/38%
345  * 3) 6%/22%/41%
346  *
347  * So we figure out the busiest bucket for the moment.
348  */
349 static unsigned fullest_bucket(struct tdb_context *tdb,
350                                const tdb_off_t *group,
351                                unsigned new_bucket)
352 {
353         unsigned counts[1 << TDB_HASH_GROUP_BITS] = { 0 };
354         unsigned int i, best_bucket;
355
356         /* Count the new entry. */
357         counts[new_bucket]++;
358         best_bucket = new_bucket;
359
360         for (i = 0; i < (1 << TDB_HASH_GROUP_BITS); i++) {
361                 unsigned this_bucket;
362
363                 if (is_subhash(group[i]))
364                         continue;
365                 this_bucket = group[i] & TDB_OFF_HASH_GROUP_MASK;
366                 if (++counts[this_bucket] > counts[best_bucket])
367                         best_bucket = this_bucket;
368         }
369
370         return best_bucket;
371 }
372
373 static bool put_into_group(tdb_off_t *group,
374                            unsigned bucket, tdb_off_t encoded)
375 {
376         unsigned int i;
377
378         for (i = 0; i < (1 << TDB_HASH_GROUP_BITS); i++) {
379                 unsigned b = (bucket + i) % (1 << TDB_HASH_GROUP_BITS);
380
381                 if (group[b] == 0) {
382                         group[b] = encoded;
383                         return true;
384                 }
385         }
386         return false;
387 }
388
389 static void force_into_group(tdb_off_t *group,
390                              unsigned bucket, tdb_off_t encoded)
391 {
392         if (!put_into_group(group, bucket, encoded))
393                 abort();
394 }
395
396 static tdb_off_t encode_offset(tdb_off_t new_off, struct hash_info *h)
397 {
398         return h->home_bucket
399                 | new_off
400                 | ((uint64_t)bits_from(h->h,
401                                   64 - h->hash_used - TDB_OFF_UPPER_STEAL_EXTRA,
402                                   TDB_OFF_UPPER_STEAL_EXTRA)
403                    << TDB_OFF_HASH_EXTRA_BIT);
404 }
405
406 /* Simply overwrite the hash entry we found before. */
407 int replace_in_hash(struct tdb_context *tdb,
408                     struct hash_info *h,
409                     tdb_off_t new_off)
410 {
411         enum TDB_ERROR ecode;
412
413         ecode = tdb_write_off(tdb, hbucket_off(h->group_start, h->found_bucket),
414                               encode_offset(new_off, h));
415         if (ecode != TDB_SUCCESS) {
416                 tdb->ecode = ecode;
417                 return -1;
418         }
419         return 0;
420 }
421
422 /* We slot in anywhere that's empty in the chain. */
423 static int COLD add_to_chain(struct tdb_context *tdb,
424                              tdb_off_t subhash,
425                              tdb_off_t new_off)
426 {
427         size_t entry = tdb_find_zero_off(tdb, subhash, 1<<TDB_HASH_GROUP_BITS);
428         enum TDB_ERROR ecode;
429
430         if (entry == 1 << TDB_HASH_GROUP_BITS) {
431                 tdb_off_t next;
432
433                 next = tdb_read_off(tdb, subhash
434                                     + offsetof(struct tdb_chain, next));
435                 if (next == TDB_OFF_ERR)
436                         return -1;
437
438                 if (!next) {
439                         next = alloc(tdb, 0, sizeof(struct tdb_chain), 0,
440                                      TDB_CHAIN_MAGIC, false);
441                         if (next == TDB_OFF_ERR)
442                                 return -1;
443                         ecode = zero_out(tdb,
444                                          next+sizeof(struct tdb_used_record),
445                                          sizeof(struct tdb_chain));
446                         if (ecode != TDB_SUCCESS) {
447                                 tdb->ecode = ecode;
448                                 return -1;
449                         }
450                         ecode = tdb_write_off(tdb, subhash
451                                               + offsetof(struct tdb_chain,
452                                                          next),
453                                               next);
454                         if (ecode != TDB_SUCCESS) {
455                                 tdb->ecode = ecode;
456                                 return -1;
457                         }
458                 }
459                 return add_to_chain(tdb, next, new_off);
460         }
461
462         ecode = tdb_write_off(tdb, subhash + entry * sizeof(tdb_off_t),
463                               new_off);
464         if (ecode != TDB_SUCCESS) {
465                 tdb->ecode = ecode;
466                 return -1;
467         }
468         return 0;
469 }
470
471 /* Add into a newly created subhash. */
472 static int add_to_subhash(struct tdb_context *tdb, tdb_off_t subhash,
473                           unsigned hash_used, tdb_off_t val)
474 {
475         tdb_off_t off = (val & TDB_OFF_MASK), *group;
476         struct hash_info h;
477         unsigned int gnum;
478         enum TDB_ERROR ecode;
479
480         h.hash_used = hash_used;
481
482         if (hash_used + TDB_SUBLEVEL_HASH_BITS > 64)
483                 return add_to_chain(tdb, subhash, off);
484
485         h.h = hash_record(tdb, off);
486         gnum = use_bits(&h, TDB_SUBLEVEL_HASH_BITS-TDB_HASH_GROUP_BITS);
487         h.group_start = subhash
488                 + gnum * (sizeof(tdb_off_t) << TDB_HASH_GROUP_BITS);
489         h.home_bucket = use_bits(&h, TDB_HASH_GROUP_BITS);
490
491         group = tdb_access_write(tdb, h.group_start,
492                                  sizeof(*group) << TDB_HASH_GROUP_BITS, true);
493         if (!group)
494                 return -1;
495         force_into_group(group, h.home_bucket, encode_offset(off, &h));
496         ecode = tdb_access_commit(tdb, group);
497         if (ecode != TDB_SUCCESS) {
498                 tdb->ecode = ecode;
499                 return -1;
500         }
501         return 0;
502 }
503
/* Make room in the full group h->group by hanging a new, empty
 * sub-table under its fullest bucket.
 *
 * A chain block is allocated when h->hash_used == 64, a sub-hashtable
 * otherwise.  Entries whose home is the chosen bucket are moved into the
 * new table; entries that were sitting outside their home slot are
 * re-seated in the group.  Only the in-memory h->group is changed here:
 * this function does not write the group back to the file.
 * Returns 0 on success, -1 on failure. */
static int expand_group(struct tdb_context *tdb, struct hash_info *h)
{
        unsigned bucket, num_vals, i, magic;
        size_t subsize;
        tdb_off_t subhash;
        tdb_off_t vals[1 << TDB_HASH_GROUP_BITS];
        enum TDB_ERROR ecode;

        /* Attach new empty subhash under fullest bucket. */
        bucket = fullest_bucket(tdb, h->group, h->home_bucket);

        if (h->hash_used == 64) {
                add_stat(tdb, alloc_chain, 1);
                subsize = sizeof(struct tdb_chain);
                magic = TDB_CHAIN_MAGIC;
        } else {
                add_stat(tdb, alloc_subhash, 1);
                subsize = (sizeof(tdb_off_t) << TDB_SUBLEVEL_HASH_BITS);
                magic = TDB_HTABLE_MAGIC;
        }

        subhash = alloc(tdb, 0, subsize, 0, magic, false);
        if (subhash == TDB_OFF_ERR)
                return -1;

        /* Zero the contents, which start after the record header. */
        ecode = zero_out(tdb, subhash + sizeof(struct tdb_used_record),
                         subsize);
        if (ecode != TDB_SUCCESS) {
                tdb->ecode = ecode;
                return -1;
        }

        /* Remove any which are destined for bucket or are in wrong place. */
        num_vals = 0;
        for (i = 0; i < (1 << TDB_HASH_GROUP_BITS); i++) {
                unsigned home_bucket = h->group[i] & TDB_OFF_HASH_GROUP_MASK;
                if (!h->group[i] || is_subhash(h->group[i]))
                        continue;
                if (home_bucket == bucket || home_bucket != i) {
                        vals[num_vals++] = h->group[i];
                        h->group[i] = 0;
                }
        }
        /* FIXME: This assert is valid, but we do this during unit test :( */
        /* assert(num_vals); */

        /* Overwrite expanded bucket with subhash pointer. */
        h->group[bucket] = subhash | (1ULL << TDB_OFF_UPPER_STEAL_SUBHASH_BIT);

        /* Point to actual contents of record. */
        subhash += sizeof(struct tdb_used_record);

        /* Put values back. */
        for (i = 0; i < num_vals; i++) {
                unsigned this_bucket = vals[i] & TDB_OFF_HASH_GROUP_MASK;

                if (this_bucket == bucket) {
                        /* Belongs under the expanded bucket: push it down
                         * into the new table. */
                        if (add_to_subhash(tdb, subhash, h->hash_used, vals[i]))
                                return -1;
                } else {
                        /* There should be room to put this back. */
                        force_into_group(h->group, this_bucket, vals[i]);
                }
        }
        return 0;
}
570
/* Remove the entry at h->found_bucket from the group in h and write the
 * group back to the file.
 *
 * Clearing a slot can leave a hole in front of entries that were pushed
 * past their home bucket, so the run of slots following the deleted one
 * (up to the next empty slot) is scanned and any displaced entry is
 * re-inserted starting from its home bucket.
 * Returns 0 on success; on write failure sets tdb->ecode and returns -1. */
int delete_from_hash(struct tdb_context *tdb, struct hash_info *h)
{
        unsigned int i, num_movers = 0;
        tdb_off_t movers[1 << TDB_HASH_GROUP_BITS];
        enum TDB_ERROR ecode;

        h->group[h->found_bucket] = 0;
        /* Start at 1: offset 0 is the slot we just cleared. */
        for (i = 1; i < (1 << TDB_HASH_GROUP_BITS); i++) {
                unsigned this_bucket;

                this_bucket = (h->found_bucket+i) % (1 << TDB_HASH_GROUP_BITS);
                /* Empty bucket?  We're done. */
                if (!h->group[this_bucket])
                        break;

                /* Ignore subhashes. */
                if (is_subhash(h->group[this_bucket]))
                        continue;

                /* If this one is not happy where it is, we'll move it. */
                if ((h->group[this_bucket] & TDB_OFF_HASH_GROUP_MASK)
                    != this_bucket) {
                        movers[num_movers++] = h->group[this_bucket];
                        h->group[this_bucket] = 0;
                }
        }

        /* Put back the ones we erased. */
        for (i = 0; i < num_movers; i++) {
                force_into_group(h->group, movers[i] & TDB_OFF_HASH_GROUP_MASK,
                                 movers[i]);
        }

        /* Now we write back the hash group */
        ecode = tdb_write_convert(tdb, h->group_start,
                                  h->group, sizeof(h->group));
        if (ecode != TDB_SUCCESS) {
                tdb->ecode = ecode;
                return -1;
        }
        return 0;
}
613
/* Insert the record at new_off into the hash, at the position described
 * by h.
 *
 * If h->found_bucket is an empty slot the entry goes straight there.
 * Otherwise, when h->hash_used is above 64 the entry is appended to the
 * chain at h->group_start; else the group is expanded with
 * expand_group(), h is moved down a level if our own home bucket became
 * a subhash pointer, and the insert is retried.
 * Returns 0 on success, -1 on failure. */
int add_to_hash(struct tdb_context *tdb, struct hash_info *h, tdb_off_t new_off)
{
        enum TDB_ERROR ecode;

        /* We hit an empty bucket during search?  That's where it goes. */
        if (!h->group[h->found_bucket]) {
                h->group[h->found_bucket] = encode_offset(new_off, h);
                /* Write back the modified group. */
                ecode = tdb_write_convert(tdb, h->group_start,
                                          h->group, sizeof(h->group));
                if (ecode != TDB_SUCCESS) {
                        tdb->ecode = ecode;
                        return -1;
                }
                return 0;
        }

        /* Past the last hash level: h->group_start is a chain block. */
        if (h->hash_used > 64)
                return add_to_chain(tdb, h->group_start, new_off);

        /* We're full.  Expand. */
        if (expand_group(tdb, h) == -1)
                return -1;

        if (is_subhash(h->group[h->home_bucket])) {
                /* We were expanded! */
                tdb_off_t hashtable;
                unsigned int gnum;

                /* Write back the modified group. */
                ecode = tdb_write_convert(tdb, h->group_start, h->group,
                                          sizeof(h->group));
                if (ecode != TDB_SUCCESS) {
                        tdb->ecode = ecode;
                        return -1;
                }

                /* Move hashinfo down a level. */
                hashtable = (h->group[h->home_bucket] & TDB_OFF_MASK)
                        + sizeof(struct tdb_used_record);
                gnum = use_bits(h,TDB_SUBLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS);
                h->home_bucket = use_bits(h, TDB_HASH_GROUP_BITS);
                h->group_start = hashtable
                        + gnum * (sizeof(tdb_off_t) << TDB_HASH_GROUP_BITS);
                /* Load the group we now belong to in the new table. */
                ecode = tdb_read_convert(tdb, h->group_start, &h->group,
                                         sizeof(h->group));
                if (ecode != TDB_SUCCESS) {
                        tdb->ecode = ecode;
                        return -1;
                }
        }

        /* Expanding the group must have made room if it didn't choose this
         * bucket. */
        if (put_into_group(h->group, h->home_bucket, encode_offset(new_off,h))){
                ecode = tdb_write_convert(tdb, h->group_start,
                                          h->group, sizeof(h->group));
                if (ecode != TDB_SUCCESS) {
                        tdb->ecode = ecode;
                        return -1;
                }
                return 0;
        }

        /* This can happen if all hashes in group (and us) dropped into same
         * group in subhash. */
        return add_to_hash(tdb, h, new_off);
}
682
/* Traverse support: returns offset of record, or 0 or TDB_OFF_ERR.
 *
 * Resumes from the position held in tinfo (a stack of levels, the last
 * being the current table) and advances to the next record entry.
 * Subhash pointers are followed by pushing a level; an exhausted level
 * is popped, except that at the chain depth (TDB_MAX_LEVELS + 1) the
 * chain's next link is followed first.  0 means the top level ran out of
 * entries.  tinfo->prev is set to the returned offset, and an entry
 * equal to tinfo->prev is skipped. */
static tdb_off_t iterate_hash(struct tdb_context *tdb,
                              struct traverse_info *tinfo)
{
        tdb_off_t off, val;
        unsigned int i;
        struct traverse_level *tlevel;

        tlevel = &tinfo->levels[tinfo->num_levels-1];

again:
        /* Visit each non-zero slot of the current level, starting from
         * where we left off. */
        for (i = tdb_find_nonzero_off(tdb, tlevel->hashtable,
                                      tlevel->entry, tlevel->total_buckets);
             i != tlevel->total_buckets;
             i = tdb_find_nonzero_off(tdb, tlevel->hashtable,
                                      i+1, tlevel->total_buckets)) {
                val = tdb_read_off(tdb, tlevel->hashtable+sizeof(tdb_off_t)*i);
                if (unlikely(val == TDB_OFF_ERR))
                        return TDB_OFF_ERR;

                off = val & TDB_OFF_MASK;

                /* This makes the delete-all-in-traverse case work
                 * (and simplifies our logic a little). */
                if (off == tinfo->prev)
                        continue;

                tlevel->entry = i;

                if (!is_subhash(val)) {
                        /* Found one. */
                        tinfo->prev = off;
                        return off;
                }

                /* When we come back, we want the next one */
                tlevel->entry++;
                tinfo->num_levels++;
                tlevel++;
                /* Table contents start after the record header. */
                tlevel->hashtable = off + sizeof(struct tdb_used_record);
                tlevel->entry = 0;
                /* Next level is a chain? */
                if (unlikely(tinfo->num_levels == TDB_MAX_LEVELS + 1))
                        tlevel->total_buckets = (1 << TDB_HASH_GROUP_BITS);
                else
                        tlevel->total_buckets = (1 << TDB_SUBLEVEL_HASH_BITS);
                goto again;
        }

        /* Nothing there? */
        if (tinfo->num_levels == 1)
                return 0;

        /* Handle chained entries. */
        if (unlikely(tinfo->num_levels == TDB_MAX_LEVELS + 1)) {
                tlevel->hashtable = tdb_read_off(tdb, tlevel->hashtable
                                                 + offsetof(struct tdb_chain,
                                                            next));
                if (tlevel->hashtable == TDB_OFF_ERR)
                        return TDB_OFF_ERR;
                if (tlevel->hashtable) {
                        /* Another block in the chain: scan it from its
                         * first slot. */
                        tlevel->hashtable += sizeof(struct tdb_used_record);
                        tlevel->entry = 0;
                        goto again;
                }
        }

        /* Go back up and keep searching. */
        tinfo->num_levels--;
        tlevel--;
        goto again;
}
755
756 /* Return 1 if we find something, 0 if not, -1 on error. */
757 int next_in_hash(struct tdb_context *tdb,
758                  struct traverse_info *tinfo,
759                  TDB_DATA *kbuf, size_t *dlen)
760 {
761         const unsigned group_bits = TDB_TOPLEVEL_HASH_BITS-TDB_HASH_GROUP_BITS;
762         tdb_off_t hl_start, hl_range, off;
763         enum TDB_ERROR ecode;
764
765         while (tinfo->toplevel_group < (1 << group_bits)) {
766                 hl_start = (tdb_off_t)tinfo->toplevel_group
767                         << (64 - group_bits);
768                 hl_range = 1ULL << group_bits;
769                 ecode = tdb_lock_hashes(tdb, hl_start, hl_range, F_RDLCK,
770                                         TDB_LOCK_WAIT);
771                 if (ecode != TDB_SUCCESS) {
772                         tdb->ecode = ecode;
773                         return -1;
774                 }
775
776                 off = iterate_hash(tdb, tinfo);
777                 if (off) {
778                         struct tdb_used_record rec;
779
780                         ecode = tdb_read_convert(tdb, off, &rec, sizeof(rec));
781                         if (ecode != TDB_SUCCESS) {
782                                 tdb->ecode = ecode;
783                                 tdb_unlock_hashes(tdb,
784                                                   hl_start, hl_range, F_RDLCK);
785                                 return -1;
786                         }
787                         if (rec_magic(&rec) != TDB_USED_MAGIC) {
788                                 tdb_logerr(tdb, TDB_ERR_CORRUPT,
789                                            TDB_LOG_ERROR,
790                                            "next_in_hash:"
791                                            " corrupt record at %llu",
792                                            (long long)off);
793                                 return -1;
794                         }
795
796                         kbuf->dsize = rec_key_length(&rec);
797
798                         /* They want data as well? */
799                         if (dlen) {
800                                 *dlen = rec_data_length(&rec);
801                                 kbuf->dptr = tdb_alloc_read(tdb,
802                                                             off + sizeof(rec),
803                                                             kbuf->dsize
804                                                             + *dlen);
805                         } else {
806                                 kbuf->dptr = tdb_alloc_read(tdb,
807                                                             off + sizeof(rec),
808                                                             kbuf->dsize);
809                         }
810                         tdb_unlock_hashes(tdb, hl_start, hl_range, F_RDLCK);
811                         return kbuf->dptr ? 1 : -1;
812                 }
813
814                 tdb_unlock_hashes(tdb, hl_start, hl_range, F_RDLCK);
815
816                 tinfo->toplevel_group++;
817                 tinfo->levels[0].hashtable
818                         += (sizeof(tdb_off_t) << TDB_HASH_GROUP_BITS);
819                 tinfo->levels[0].entry = 0;
820         }
821         return 0;
822 }
823
824 /* Return 1 if we find something, 0 if not, -1 on error. */
825 int first_in_hash(struct tdb_context *tdb,
826                   struct traverse_info *tinfo,
827                   TDB_DATA *kbuf, size_t *dlen)
828 {
829         tinfo->prev = 0;
830         tinfo->toplevel_group = 0;
831         tinfo->num_levels = 1;
832         tinfo->levels[0].hashtable = offsetof(struct tdb_header, hashtable);
833         tinfo->levels[0].entry = 0;
834         tinfo->levels[0].total_buckets = (1 << TDB_HASH_GROUP_BITS);
835
836         return next_in_hash(tdb, tinfo, kbuf, dlen);
837 }
838
839 /* Even if the entry isn't in this hash bucket, you'd have to lock this
840  * bucket to find it. */
841 static int chainlock(struct tdb_context *tdb, const TDB_DATA *key,
842                      int ltype, enum tdb_lock_flags waitflag,
843                      const char *func)
844 {
845         enum TDB_ERROR ecode;
846         uint64_t h = tdb_hash(tdb, key->dptr, key->dsize);
847         tdb_off_t lockstart, locksize;
848         unsigned int group, gbits;
849
850         gbits = TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS;
851         group = bits_from(h, 64 - gbits, gbits);
852
853         lockstart = hlock_range(group, &locksize);
854
855         ecode = tdb_lock_hashes(tdb, lockstart, locksize, ltype, waitflag);
856         tdb_trace_1rec(tdb, func, *key);
857         if (ecode != TDB_SUCCESS) {
858                 tdb->ecode = ecode;
859                 return -1;
860         }
861         return 0;
862 }
863
864 /* lock/unlock one hash chain. This is meant to be used to reduce
865    contention - it cannot guarantee how many records will be locked */
866 int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
867 {
868         return chainlock(tdb, &key, F_WRLCK, TDB_LOCK_WAIT, "tdb_chainlock");
869 }
870
871 int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
872 {
873         uint64_t h = tdb_hash(tdb, key.dptr, key.dsize);
874         tdb_off_t lockstart, locksize;
875         unsigned int group, gbits;
876         enum TDB_ERROR ecode;
877
878         gbits = TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS;
879         group = bits_from(h, 64 - gbits, gbits);
880
881         lockstart = hlock_range(group, &locksize);
882
883         tdb_trace_1rec(tdb, "tdb_chainunlock", key);
884         ecode = tdb_unlock_hashes(tdb, lockstart, locksize, F_WRLCK);
885         if (ecode != TDB_SUCCESS) {
886                 tdb->ecode = ecode;
887                 return -1;
888         }
889         return 0;
890 }