tdb2: Add stats attribute.
[ccan] / ccan / tdb2 / hash.c
1  /* 
2    Trivial Database 2: hash handling
3    Copyright (C) Rusty Russell 2010
4    
5    This library is free software; you can redistribute it and/or
6    modify it under the terms of the GNU Lesser General Public
7    License as published by the Free Software Foundation; either
8    version 3 of the License, or (at your option) any later version.
9
10    This library is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    Lesser General Public License for more details.
14
15    You should have received a copy of the GNU Lesser General Public
16    License along with this library; if not, see <http://www.gnu.org/licenses/>.
17 */
18 #include "private.h"
19 #include <assert.h>
20 #include <ccan/hash/hash.h>
21
/* Default key hash: hash64_stable() with its two 32-bit halves exchanged.
 * hash64_stable assumes the lower bits are the more important (slightly
 * better) ones, but this file consumes hash bits from the top down, so the
 * halves are swapped to put the better bits first.
 * `arg` is not used by this implementation. */
static uint64_t jenkins_hash(const void *key, size_t length, uint64_t seed,
                             void *arg)
{
        const uint64_t raw = hash64_stable((const unsigned char *)key,
                                           length, seed);
        const uint64_t old_high = raw >> 32;
        const uint64_t old_low = raw << 32;

        return old_high | old_low;
}
31
32 void tdb_hash_init(struct tdb_context *tdb)
33 {
34         tdb->khash = jenkins_hash;
35         tdb->hash_priv = NULL;
36 }
37
38 uint64_t tdb_hash(struct tdb_context *tdb, const void *ptr, size_t len)
39 {
40         return tdb->khash(ptr, len, tdb->hash_seed, tdb->hash_priv);
41 }
42
43 uint64_t hash_record(struct tdb_context *tdb, tdb_off_t off)
44 {
45         struct tdb_used_record pad, *r;
46         const void *key;
47         uint64_t klen, hash;
48
49         r = tdb_get(tdb, off, &pad, sizeof(pad));
50         if (!r)
51                 /* FIXME */
52                 return 0;
53
54         klen = rec_key_length(r);
55         key = tdb_access_read(tdb, off + sizeof(pad), klen, false);
56         if (!key)
57                 return 0;
58
59         hash = tdb_hash(tdb, key, klen);
60         tdb_access_release(tdb, key);
61         return hash;
62 }
63
/* Get bits from a value: returns the `num` bits of `val` that start at bit
 * position `start` (bit 0 is the least significant), right-aligned.
 *
 * `num` may be anything from 0 to 32 inclusive.  The mask is built in
 * 64 bits: with a 32-bit `1U`, num == 32 would shift by the full width of
 * the type, which is undefined behaviour. */
static uint32_t bits(uint64_t val, unsigned start, unsigned num)
{
        assert(num <= 32);
        return (uint32_t)((val >> start) & ((UINT64_C(1) << num) - 1));
}
70
71 /* We take bits from the top: that way we can lock whole sections of the hash
72  * by using lock ranges. */
73 static uint32_t use_bits(struct hash_info *h, unsigned num)
74 {
75         h->hash_used += num;
76         return bits(h->h, 64 - h->hash_used, num);
77 }
78
79 /* Does entry match? */
80 static bool match(struct tdb_context *tdb,
81                   struct hash_info *h,
82                   const struct tdb_data *key,
83                   tdb_off_t val,
84                   struct tdb_used_record *rec)
85 {
86         bool ret;
87         const unsigned char *rkey;
88         tdb_off_t off;
89
90         /* FIXME: Handle hash value truncated. */
91         if (bits(val, TDB_OFF_HASH_TRUNCATED_BIT, 1))
92                 abort();
93
94         /* Desired bucket must match. */
95         if (h->home_bucket != (val & TDB_OFF_HASH_GROUP_MASK))
96                 return false;
97
98         /* Top bits of offset == next bits of hash. */
99         if (bits(val, TDB_OFF_HASH_EXTRA_BIT, TDB_OFF_UPPER_STEAL_EXTRA)
100             != bits(h->h, 64 - h->hash_used - TDB_OFF_UPPER_STEAL_EXTRA,
101                     TDB_OFF_UPPER_STEAL_EXTRA))
102                 return false;
103
104         off = val & TDB_OFF_MASK;
105         if (tdb_read_convert(tdb, off, rec, sizeof(*rec)) == -1)
106                 return false;
107
108         /* FIXME: check extra bits in header? */
109         if (rec_key_length(rec) != key->dsize)
110                 return false;
111
112         rkey = tdb_access_read(tdb, off + sizeof(*rec), key->dsize, false);
113         if (!rkey)
114                 return false;
115         ret = (memcmp(rkey, key->dptr, key->dsize) == 0);
116         tdb_access_release(tdb, rkey);
117         return ret;
118 }
119
120 static tdb_off_t hbucket_off(tdb_off_t group_start, unsigned bucket)
121 {
122         return group_start
123                 + (bucket % (1 << TDB_HASH_GROUP_BITS)) * sizeof(tdb_off_t);
124 }
125
126 /* Truncated hashes can't be all 1: that's how we spot a sub-hash */
127 bool is_subhash(tdb_off_t val)
128 {
129         return val >> (64-TDB_OFF_UPPER_STEAL) == (1<<TDB_OFF_UPPER_STEAL) - 1;
130 }
131
132 /* FIXME: Guess the depth, don't over-lock! */
133 static tdb_off_t hlock_range(tdb_off_t group, tdb_off_t *size)
134 {
135         *size = 1ULL << (64 - (TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS));
136         return group << (64 - (TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS));
137 }
138
139 /* This is the core routine which searches the hashtable for an entry.
140  * On error, no locks are held and TDB_OFF_ERR is returned.
141  * Otherwise, hinfo is filled in (and the optional tinfo).
142  * If not found, the return value is 0.
143  * If found, the return value is the offset, and *rec is the record. */
144 tdb_off_t find_and_lock(struct tdb_context *tdb,
145                         struct tdb_data key,
146                         int ltype,
147                         struct hash_info *h,
148                         struct tdb_used_record *rec,
149                         struct traverse_info *tinfo)
150 {
151         uint32_t i, group;
152         tdb_off_t hashtable;
153
154         h->h = tdb_hash(tdb, key.dptr, key.dsize);
155         h->hash_used = 0;
156         group = use_bits(h, TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS);
157         h->home_bucket = use_bits(h, TDB_HASH_GROUP_BITS);
158
159         h->hlock_start = hlock_range(group, &h->hlock_range);
160         if (tdb_lock_hashes(tdb, h->hlock_start, h->hlock_range, ltype,
161                             TDB_LOCK_WAIT))
162                 return TDB_OFF_ERR;
163
164         hashtable = offsetof(struct tdb_header, hashtable);
165         if (tinfo) {
166                 tinfo->toplevel_group = group;
167                 tinfo->num_levels = 1;
168                 tinfo->levels[0].entry = 0;
169                 tinfo->levels[0].hashtable = hashtable 
170                         + (group << TDB_HASH_GROUP_BITS) * sizeof(tdb_off_t);
171                 tinfo->levels[0].total_buckets = 1 << TDB_HASH_GROUP_BITS;
172         }
173
174         while (likely(h->hash_used < 64)) {
175                 /* Read in the hash group. */
176                 h->group_start = hashtable
177                         + group * (sizeof(tdb_off_t) << TDB_HASH_GROUP_BITS);
178
179                 if (tdb_read_convert(tdb, h->group_start, &h->group,
180                                      sizeof(h->group)) == -1)
181                         goto fail;
182
183                 /* Pointer to another hash table?  Go down... */
184                 if (is_subhash(h->group[h->home_bucket])) {
185                         hashtable = (h->group[h->home_bucket] & TDB_OFF_MASK)
186                                 + sizeof(struct tdb_used_record);
187                         if (tinfo) {
188                                 /* When we come back, use *next* bucket */
189                                 tinfo->levels[tinfo->num_levels-1].entry
190                                         += h->home_bucket + 1;
191                         }
192                         group = use_bits(h, TDB_SUBLEVEL_HASH_BITS
193                                          - TDB_HASH_GROUP_BITS);
194                         h->home_bucket = use_bits(h, TDB_HASH_GROUP_BITS);
195                         if (tinfo) {
196                                 tinfo->levels[tinfo->num_levels].hashtable
197                                         = hashtable;
198                                 tinfo->levels[tinfo->num_levels].total_buckets
199                                         = 1 << TDB_SUBLEVEL_HASH_BITS;
200                                 tinfo->levels[tinfo->num_levels].entry
201                                         = group << TDB_HASH_GROUP_BITS;
202                                 tinfo->num_levels++;
203                         }
204                         continue;
205                 }
206
207                 /* It's in this group: search (until 0 or all searched) */
208                 for (i = 0, h->found_bucket = h->home_bucket;
209                      i < (1 << TDB_HASH_GROUP_BITS);
210                      i++, h->found_bucket = ((h->found_bucket+1)
211                                              % (1 << TDB_HASH_GROUP_BITS))) {
212                         if (is_subhash(h->group[h->found_bucket]))
213                                 continue;
214
215                         if (!h->group[h->found_bucket])
216                                 break;
217
218                         if (match(tdb, h, &key, h->group[h->found_bucket],
219                                   rec)) {
220                                 if (tinfo) {
221                                         tinfo->levels[tinfo->num_levels-1].entry
222                                                 += h->found_bucket;
223                                 }
224                                 return h->group[h->found_bucket] & TDB_OFF_MASK;
225                         }
226                 }
227                 /* Didn't find it: h indicates where it would go. */
228                 return 0;
229         }
230
231         /* FIXME: We hit the bottom.  Chain! */
232         abort();
233
234 fail:
235         tdb_unlock_hashes(tdb, h->hlock_start, h->hlock_range, ltype);
236         return TDB_OFF_ERR;
237 }
238
239 /* I wrote a simple test, expanding a hash to 2GB, for the following
240  * cases:
241  * 1) Expanding all the buckets at once,
242  * 2) Expanding the most-populated bucket,
243  * 3) Expanding the bucket we wanted to place the new entry ito.
244  *
245  * I measured the worst/average/best density during this process.
246  * 1) 3%/16%/30%
247  * 2) 4%/20%/38%
248  * 3) 6%/22%/41%
249  *
250  * So we figure out the busiest bucket for the moment.
251  */
252 static unsigned fullest_bucket(struct tdb_context *tdb,
253                                const tdb_off_t *group,
254                                unsigned new_bucket)
255 {
256         unsigned counts[1 << TDB_HASH_GROUP_BITS] = { 0 };
257         unsigned int i, best_bucket;
258
259         /* Count the new entry. */
260         counts[new_bucket]++;
261         best_bucket = new_bucket;
262
263         for (i = 0; i < (1 << TDB_HASH_GROUP_BITS); i++) {
264                 unsigned this_bucket;
265
266                 if (is_subhash(group[i]))
267                         continue;
268                 this_bucket = group[i] & TDB_OFF_HASH_GROUP_MASK;
269                 if (++counts[this_bucket] > counts[best_bucket])
270                         best_bucket = this_bucket;
271         }
272
273         return best_bucket;
274 }
275
276 static bool put_into_group(tdb_off_t *group,
277                            unsigned bucket, tdb_off_t encoded)
278 {
279         unsigned int i;
280
281         for (i = 0; i < (1 << TDB_HASH_GROUP_BITS); i++) {
282                 unsigned b = (bucket + i) % (1 << TDB_HASH_GROUP_BITS);
283
284                 if (group[b] == 0) {
285                         group[b] = encoded;
286                         return true;
287                 }
288         }
289         return false;
290 }
291
292 static void force_into_group(tdb_off_t *group,
293                              unsigned bucket, tdb_off_t encoded)
294 {
295         if (!put_into_group(group, bucket, encoded))
296                 abort();
297 }
298
299 static tdb_off_t encode_offset(tdb_off_t new_off, struct hash_info *h)
300 {
301         return h->home_bucket
302                 | new_off
303                 | ((uint64_t)bits(h->h,
304                                   64 - h->hash_used - TDB_OFF_UPPER_STEAL_EXTRA,
305                                   TDB_OFF_UPPER_STEAL_EXTRA)
306                    << TDB_OFF_HASH_EXTRA_BIT);
307 }
308
309 /* Simply overwrite the hash entry we found before. */ 
310 int replace_in_hash(struct tdb_context *tdb,
311                     struct hash_info *h,
312                     tdb_off_t new_off)
313 {
314         return tdb_write_off(tdb, hbucket_off(h->group_start, h->found_bucket),
315                              encode_offset(new_off, h));
316 }
317
318 /* Add into a newly created subhash. */
319 static int add_to_subhash(struct tdb_context *tdb, tdb_off_t subhash,
320                           unsigned hash_used, tdb_off_t val)
321 {
322         tdb_off_t off = (val & TDB_OFF_MASK), *group;
323         struct hash_info h;
324         unsigned int gnum;
325
326         h.hash_used = hash_used;
327
328         /* FIXME chain if hash_used == 64 */
329         if (hash_used + TDB_SUBLEVEL_HASH_BITS > 64)
330                 abort();
331
332         /* FIXME: Do truncated hash bits if we can! */
333         h.h = hash_record(tdb, off);
334         gnum = use_bits(&h, TDB_SUBLEVEL_HASH_BITS-TDB_HASH_GROUP_BITS);
335         h.group_start = subhash + sizeof(struct tdb_used_record)
336                 + gnum * (sizeof(tdb_off_t) << TDB_HASH_GROUP_BITS);
337         h.home_bucket = use_bits(&h, TDB_HASH_GROUP_BITS);
338
339         group = tdb_access_write(tdb, h.group_start,
340                                  sizeof(*group) << TDB_HASH_GROUP_BITS, true);
341         if (!group)
342                 return -1;
343         force_into_group(group, h.home_bucket, encode_offset(off, &h));
344         return tdb_access_commit(tdb, group);
345 }
346
347 static int expand_group(struct tdb_context *tdb, struct hash_info *h)
348 {
349         unsigned bucket, num_vals, i;
350         tdb_off_t subhash;
351         tdb_off_t vals[1 << TDB_HASH_GROUP_BITS];
352
353         /* Attach new empty subhash under fullest bucket. */
354         bucket = fullest_bucket(tdb, h->group, h->home_bucket);
355
356         subhash = alloc(tdb, 0, sizeof(tdb_off_t) << TDB_SUBLEVEL_HASH_BITS,
357                         0, false);
358         if (subhash == TDB_OFF_ERR)
359                 return -1;
360
361         add_stat(tdb, alloc_subhash, 1);
362         if (zero_out(tdb, subhash + sizeof(struct tdb_used_record),
363                      sizeof(tdb_off_t) << TDB_SUBLEVEL_HASH_BITS) == -1)
364                 return -1;
365
366         /* Remove any which are destined for bucket or are in wrong place. */
367         num_vals = 0;
368         for (i = 0; i < (1 << TDB_HASH_GROUP_BITS); i++) {
369                 unsigned home_bucket = h->group[i] & TDB_OFF_HASH_GROUP_MASK;
370                 if (!h->group[i] || is_subhash(h->group[i]))
371                         continue;
372                 if (home_bucket == bucket || home_bucket != i) {
373                         vals[num_vals++] = h->group[i];
374                         h->group[i] = 0;
375                 }
376         }
377         /* FIXME: This assert is valid, but we do this during unit test :( */
378         /* assert(num_vals); */
379
380         /* Overwrite expanded bucket with subhash pointer. */
381         h->group[bucket] = subhash | ~((1ULL << (64 - TDB_OFF_UPPER_STEAL))-1);
382
383         /* Put values back. */
384         for (i = 0; i < num_vals; i++) {
385                 unsigned this_bucket = vals[i] & TDB_OFF_HASH_GROUP_MASK;
386
387                 if (this_bucket == bucket) {
388                         if (add_to_subhash(tdb, subhash, h->hash_used, vals[i]))
389                                 return -1;
390                 } else {
391                         /* There should be room to put this back. */
392                         force_into_group(h->group, this_bucket, vals[i]);
393                 }
394         }
395         return 0;
396 }
397
398 int delete_from_hash(struct tdb_context *tdb, struct hash_info *h)
399 {
400         unsigned int i, num_movers = 0;
401         tdb_off_t movers[1 << TDB_HASH_GROUP_BITS];
402
403         h->group[h->found_bucket] = 0;
404         for (i = 1; i < (1 << TDB_HASH_GROUP_BITS); i++) {
405                 unsigned this_bucket;
406
407                 this_bucket = (h->found_bucket+i) % (1 << TDB_HASH_GROUP_BITS);
408                 /* Empty bucket?  We're done. */
409                 if (!h->group[this_bucket])
410                         break;
411
412                 /* Ignore subhashes. */
413                 if (is_subhash(h->group[this_bucket]))
414                         continue;
415
416                 /* If this one is not happy where it is, we'll move it. */
417                 if ((h->group[this_bucket] & TDB_OFF_HASH_GROUP_MASK)
418                     != this_bucket) {
419                         movers[num_movers++] = h->group[this_bucket];
420                         h->group[this_bucket] = 0;
421                 }
422         }
423
424         /* Put back the ones we erased. */
425         for (i = 0; i < num_movers; i++) {
426                 force_into_group(h->group, movers[i] & TDB_OFF_HASH_GROUP_MASK,
427                                  movers[i]);
428         }
429
430         /* Now we write back the hash group */
431         return tdb_write_convert(tdb, h->group_start,
432                                  h->group, sizeof(h->group));
433 }
434
435 int add_to_hash(struct tdb_context *tdb, struct hash_info *h, tdb_off_t new_off)
436 {
437         /* FIXME: chain! */
438         if (h->hash_used >= 64)
439                 abort();
440
441         /* We hit an empty bucket during search?  That's where it goes. */
442         if (!h->group[h->found_bucket]) {
443                 h->group[h->found_bucket] = encode_offset(new_off, h);
444                 /* Write back the modified group. */
445                 return tdb_write_convert(tdb, h->group_start,
446                                          h->group, sizeof(h->group));
447         }
448
449         /* We're full.  Expand. */
450         if (expand_group(tdb, h) == -1)
451                 return -1;
452
453         if (is_subhash(h->group[h->home_bucket])) {
454                 /* We were expanded! */
455                 tdb_off_t hashtable;
456                 unsigned int gnum;
457
458                 /* Write back the modified group. */
459                 if (tdb_write_convert(tdb, h->group_start, h->group,
460                                       sizeof(h->group)))
461                         return -1;
462
463                 /* Move hashinfo down a level. */
464                 hashtable = (h->group[h->home_bucket] & TDB_OFF_MASK)
465                         + sizeof(struct tdb_used_record);
466                 gnum = use_bits(h,TDB_SUBLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS);
467                 h->home_bucket = use_bits(h, TDB_HASH_GROUP_BITS);
468                 h->group_start = hashtable
469                         + gnum * (sizeof(tdb_off_t) << TDB_HASH_GROUP_BITS);
470                 if (tdb_read_convert(tdb, h->group_start, &h->group,
471                                      sizeof(h->group)) == -1)
472                         return -1;
473         }
474
475         /* Expanding the group must have made room if it didn't choose this
476          * bucket. */
477         if (put_into_group(h->group, h->home_bucket, encode_offset(new_off, h)))
478                 return tdb_write_convert(tdb, h->group_start,
479                                          h->group, sizeof(h->group));
480
481         /* This can happen if all hashes in group (and us) dropped into same
482          * group in subhash. */
483         return add_to_hash(tdb, h, new_off);
484 }
485
486 /* Traverse support: returns offset of record, or 0 or TDB_OFF_ERR. */
487 static tdb_off_t iterate_hash(struct tdb_context *tdb,
488                               struct traverse_info *tinfo)
489 {
490         tdb_off_t off, val;
491         unsigned int i;
492         struct traverse_level *tlevel;
493
494         tlevel = &tinfo->levels[tinfo->num_levels-1];
495
496 again:
497         for (i = tdb_find_nonzero_off(tdb, tlevel->hashtable,
498                                       tlevel->entry, tlevel->total_buckets);
499              i != tlevel->total_buckets;
500              i = tdb_find_nonzero_off(tdb, tlevel->hashtable,
501                                       i+1, tlevel->total_buckets)) {
502                 val = tdb_read_off(tdb, tlevel->hashtable+sizeof(tdb_off_t)*i);
503                 if (unlikely(val == TDB_OFF_ERR))
504                         return TDB_OFF_ERR;
505
506                 off = val & TDB_OFF_MASK;
507
508                 /* This makes the delete-all-in-traverse case work
509                  * (and simplifies our logic a little). */
510                 if (off == tinfo->prev)
511                         continue;
512
513                 tlevel->entry = i;
514
515                 if (!is_subhash(val)) {
516                         /* Found one. */
517                         tinfo->prev = off;
518                         return off;
519                 }
520
521                 /* When we come back, we want the next one */
522                 tlevel->entry++;
523                 tinfo->num_levels++;
524                 tlevel++;
525                 tlevel->hashtable = off + sizeof(struct tdb_used_record);
526                 tlevel->entry = 0;
527                 tlevel->total_buckets = (1 << TDB_SUBLEVEL_HASH_BITS);
528                 goto again;
529         }
530
531         /* Nothing there? */
532         if (tinfo->num_levels == 1)
533                 return 0;
534
535         /* Go back up and keep searching. */
536         tinfo->num_levels--;
537         tlevel--;
538         goto again;
539 }
540
541 /* Return 1 if we find something, 0 if not, -1 on error. */
542 int next_in_hash(struct tdb_context *tdb, int ltype,
543                  struct traverse_info *tinfo,
544                  TDB_DATA *kbuf, size_t *dlen)
545 {
546         const unsigned group_bits = TDB_TOPLEVEL_HASH_BITS-TDB_HASH_GROUP_BITS;
547         tdb_off_t hlock_start, hlock_range, off;
548
549         while (tinfo->toplevel_group < (1 << group_bits)) {
550                 hlock_start = (tdb_off_t)tinfo->toplevel_group
551                         << (64 - group_bits);
552                 hlock_range = 1ULL << group_bits;
553                 if (tdb_lock_hashes(tdb, hlock_start, hlock_range, ltype,
554                                     TDB_LOCK_WAIT) != 0)
555                         return -1;
556
557                 off = iterate_hash(tdb, tinfo);
558                 if (off) {
559                         struct tdb_used_record rec;
560
561                         if (tdb_read_convert(tdb, off, &rec, sizeof(rec))) {
562                                 tdb_unlock_hashes(tdb,
563                                                   hlock_start, hlock_range,
564                                                   ltype);
565                                 return -1;
566                         }
567                         if (rec_magic(&rec) != TDB_MAGIC) {
568                                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
569                                          "next_in_hash:"
570                                          " corrupt record at %llu\n",
571                                          (long long)off);
572                                 return -1;
573                         }
574
575                         kbuf->dsize = rec_key_length(&rec);
576
577                         /* They want data as well? */
578                         if (dlen) {
579                                 *dlen = rec_data_length(&rec);
580                                 kbuf->dptr = tdb_alloc_read(tdb, 
581                                                             off + sizeof(rec),
582                                                             kbuf->dsize
583                                                             + *dlen);
584                         } else {
585                                 kbuf->dptr = tdb_alloc_read(tdb, 
586                                                             off + sizeof(rec),
587                                                             kbuf->dsize);
588                         }
589                         tdb_unlock_hashes(tdb, hlock_start, hlock_range, ltype);
590                         return kbuf->dptr ? 1 : -1;
591                 }
592
593                 tdb_unlock_hashes(tdb, hlock_start, hlock_range, ltype);
594
595                 tinfo->toplevel_group++;
596                 tinfo->levels[0].hashtable
597                         += (sizeof(tdb_off_t) << TDB_HASH_GROUP_BITS);
598                 tinfo->levels[0].entry = 0;
599         }
600         return 0;
601 }
602
603 /* Return 1 if we find something, 0 if not, -1 on error. */
604 int first_in_hash(struct tdb_context *tdb, int ltype,
605                   struct traverse_info *tinfo,
606                   TDB_DATA *kbuf, size_t *dlen)
607 {
608         tinfo->prev = 0;
609         tinfo->toplevel_group = 0;
610         tinfo->num_levels = 1;
611         tinfo->levels[0].hashtable = offsetof(struct tdb_header, hashtable);
612         tinfo->levels[0].entry = 0;
613         tinfo->levels[0].total_buckets = (1 << TDB_HASH_GROUP_BITS);
614
615         return next_in_hash(tdb, ltype, tinfo, kbuf, dlen);
616 }
617
618 /* Even if the entry isn't in this hash bucket, you'd have to lock this
619  * bucket to find it. */
620 static int chainlock(struct tdb_context *tdb, const TDB_DATA *key,
621                      int ltype, enum tdb_lock_flags waitflag,
622                      const char *func)
623 {
624         int ret;
625         uint64_t h = tdb_hash(tdb, key->dptr, key->dsize);
626         tdb_off_t lockstart, locksize;
627         unsigned int group, gbits;
628
629         gbits = TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS;
630         group = bits(h, 64 - gbits, gbits);
631
632         lockstart = hlock_range(group, &locksize);
633
634         ret = tdb_lock_hashes(tdb, lockstart, locksize, ltype, waitflag);
635         tdb_trace_1rec(tdb, func, *key);
636         return ret;
637 }
638
639 /* lock/unlock one hash chain. This is meant to be used to reduce
640    contention - it cannot guarantee how many records will be locked */
641 int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
642 {
643         return chainlock(tdb, &key, F_WRLCK, TDB_LOCK_WAIT, "tdb_chainlock");
644 }
645
646 int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
647 {
648         uint64_t h = tdb_hash(tdb, key.dptr, key.dsize);
649         tdb_off_t lockstart, locksize;
650         unsigned int group, gbits;
651
652         gbits = TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS;
653         group = bits(h, 64 - gbits, gbits);
654
655         lockstart = hlock_range(group, &locksize);
656
657         tdb_trace_1rec(tdb, "tdb_chainunlock", key);
658         return tdb_unlock_hashes(tdb, lockstart, locksize, F_WRLCK);
659 }