tdb2: add comparison stats
[ccan] / ccan / tdb2 / hash.c
1  /* 
2    Trivial Database 2: hash handling
3    Copyright (C) Rusty Russell 2010
4    
5    This library is free software; you can redistribute it and/or
6    modify it under the terms of the GNU Lesser General Public
7    License as published by the Free Software Foundation; either
8    version 3 of the License, or (at your option) any later version.
9
10    This library is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    Lesser General Public License for more details.
14
15    You should have received a copy of the GNU Lesser General Public
16    License along with this library; if not, see <http://www.gnu.org/licenses/>.
17 */
18 #include "private.h"
19 #include <assert.h>
20 #include <ccan/hash/hash.h>
21
22 static uint64_t jenkins_hash(const void *key, size_t length, uint64_t seed,
23                              void *arg)
24 {
25         uint64_t ret;
26         /* hash64_stable assumes lower bits are more important; they are a
27          * slightly better hash.  We use the upper bits first, so swap them. */
28         ret = hash64_stable((const unsigned char *)key, length, seed);
29         return (ret >> 32) | (ret << 32);
30 }
31
32 void tdb_hash_init(struct tdb_context *tdb)
33 {
34         tdb->khash = jenkins_hash;
35         tdb->hash_priv = NULL;
36 }
37
38 uint64_t tdb_hash(struct tdb_context *tdb, const void *ptr, size_t len)
39 {
40         return tdb->khash(ptr, len, tdb->hash_seed, tdb->hash_priv);
41 }
42
43 uint64_t hash_record(struct tdb_context *tdb, tdb_off_t off)
44 {
45         const struct tdb_used_record *r;
46         const void *key;
47         uint64_t klen, hash;
48
49         r = tdb_access_read(tdb, off, sizeof(*r), true);
50         if (!r)
51                 /* FIXME */
52                 return 0;
53
54         klen = rec_key_length(r);
55         tdb_access_release(tdb, r);
56
57         key = tdb_access_read(tdb, off + sizeof(*r), klen, false);
58         if (!key)
59                 return 0;
60
61         hash = tdb_hash(tdb, key, klen);
62         tdb_access_release(tdb, key);
63         return hash;
64 }
65
66 /* Get bits from a value. */
67 static uint32_t bits(uint64_t val, unsigned start, unsigned num)
68 {
69         assert(num <= 32);
70         return (val >> start) & ((1U << num) - 1);
71 }
72
73 /* We take bits from the top: that way we can lock whole sections of the hash
74  * by using lock ranges. */
75 static uint32_t use_bits(struct hash_info *h, unsigned num)
76 {
77         h->hash_used += num;
78         return bits(h->h, 64 - h->hash_used, num);
79 }
80
81 /* Does entry match? */
82 static bool match(struct tdb_context *tdb,
83                   struct hash_info *h,
84                   const struct tdb_data *key,
85                   tdb_off_t val,
86                   struct tdb_used_record *rec)
87 {
88         bool ret = false;
89         const unsigned char *rkey;
90         tdb_off_t off;
91
92         add_stat(tdb, compares, 1);
93         /* Desired bucket must match. */
94         if (h->home_bucket != (val & TDB_OFF_HASH_GROUP_MASK)) {
95                 add_stat(tdb, compare_wrong_bucket, 1);
96                 return ret;
97         }
98
99         /* Top bits of offset == next bits of hash. */
100         if (bits(val, TDB_OFF_HASH_EXTRA_BIT, TDB_OFF_UPPER_STEAL_EXTRA)
101             != bits(h->h, 64 - h->hash_used - TDB_OFF_UPPER_STEAL_EXTRA,
102                     TDB_OFF_UPPER_STEAL_EXTRA)) {
103                 add_stat(tdb, compare_wrong_offsetbits, 1);
104                 return ret;
105         }
106
107         off = val & TDB_OFF_MASK;
108         if (tdb_read_convert(tdb, off, rec, sizeof(*rec)) == -1)
109                 return ret;
110
111         /* FIXME: check extra bits in header? */
112         if (rec_key_length(rec) != key->dsize) {
113                 add_stat(tdb, compare_wrong_keylen, 1);
114                 return ret;
115         }
116
117         rkey = tdb_access_read(tdb, off + sizeof(*rec), key->dsize, false);
118         if (!rkey)
119                 return ret;
120         if (memcmp(rkey, key->dptr, key->dsize) == 0)
121                 ret = true;
122         else
123                 add_stat(tdb, compare_wrong_keycmp, 1);
124         tdb_access_release(tdb, rkey);
125         return ret;
126 }
127
128 static tdb_off_t hbucket_off(tdb_off_t group_start, unsigned bucket)
129 {
130         return group_start
131                 + (bucket % (1 << TDB_HASH_GROUP_BITS)) * sizeof(tdb_off_t);
132 }
133
134 bool is_subhash(tdb_off_t val)
135 {
136         return (val >> TDB_OFF_UPPER_STEAL_SUBHASH_BIT) & 1;
137 }
138
139 /* FIXME: Guess the depth, don't over-lock! */
140 static tdb_off_t hlock_range(tdb_off_t group, tdb_off_t *size)
141 {
142         *size = 1ULL << (64 - (TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS));
143         return group << (64 - (TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS));
144 }
145
146 /* This is the core routine which searches the hashtable for an entry.
147  * On error, no locks are held and TDB_OFF_ERR is returned.
148  * Otherwise, hinfo is filled in (and the optional tinfo).
149  * If not found, the return value is 0.
150  * If found, the return value is the offset, and *rec is the record. */
151 tdb_off_t find_and_lock(struct tdb_context *tdb,
152                         struct tdb_data key,
153                         int ltype,
154                         struct hash_info *h,
155                         struct tdb_used_record *rec,
156                         struct traverse_info *tinfo)
157 {
158         uint32_t i, group;
159         tdb_off_t hashtable;
160
161         h->h = tdb_hash(tdb, key.dptr, key.dsize);
162         h->hash_used = 0;
163         group = use_bits(h, TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS);
164         h->home_bucket = use_bits(h, TDB_HASH_GROUP_BITS);
165
166         h->hlock_start = hlock_range(group, &h->hlock_range);
167         if (tdb_lock_hashes(tdb, h->hlock_start, h->hlock_range, ltype,
168                             TDB_LOCK_WAIT))
169                 return TDB_OFF_ERR;
170
171         hashtable = offsetof(struct tdb_header, hashtable);
172         if (tinfo) {
173                 tinfo->toplevel_group = group;
174                 tinfo->num_levels = 1;
175                 tinfo->levels[0].entry = 0;
176                 tinfo->levels[0].hashtable = hashtable 
177                         + (group << TDB_HASH_GROUP_BITS) * sizeof(tdb_off_t);
178                 tinfo->levels[0].total_buckets = 1 << TDB_HASH_GROUP_BITS;
179         }
180
181         while (likely(h->hash_used < 64)) {
182                 /* Read in the hash group. */
183                 h->group_start = hashtable
184                         + group * (sizeof(tdb_off_t) << TDB_HASH_GROUP_BITS);
185
186                 if (tdb_read_convert(tdb, h->group_start, &h->group,
187                                      sizeof(h->group)) == -1)
188                         goto fail;
189
190                 /* Pointer to another hash table?  Go down... */
191                 if (is_subhash(h->group[h->home_bucket])) {
192                         hashtable = (h->group[h->home_bucket] & TDB_OFF_MASK)
193                                 + sizeof(struct tdb_used_record);
194                         if (tinfo) {
195                                 /* When we come back, use *next* bucket */
196                                 tinfo->levels[tinfo->num_levels-1].entry
197                                         += h->home_bucket + 1;
198                         }
199                         group = use_bits(h, TDB_SUBLEVEL_HASH_BITS
200                                          - TDB_HASH_GROUP_BITS);
201                         h->home_bucket = use_bits(h, TDB_HASH_GROUP_BITS);
202                         if (tinfo) {
203                                 tinfo->levels[tinfo->num_levels].hashtable
204                                         = hashtable;
205                                 tinfo->levels[tinfo->num_levels].total_buckets
206                                         = 1 << TDB_SUBLEVEL_HASH_BITS;
207                                 tinfo->levels[tinfo->num_levels].entry
208                                         = group << TDB_HASH_GROUP_BITS;
209                                 tinfo->num_levels++;
210                         }
211                         continue;
212                 }
213
214                 /* It's in this group: search (until 0 or all searched) */
215                 for (i = 0, h->found_bucket = h->home_bucket;
216                      i < (1 << TDB_HASH_GROUP_BITS);
217                      i++, h->found_bucket = ((h->found_bucket+1)
218                                              % (1 << TDB_HASH_GROUP_BITS))) {
219                         if (is_subhash(h->group[h->found_bucket]))
220                                 continue;
221
222                         if (!h->group[h->found_bucket])
223                                 break;
224
225                         if (match(tdb, h, &key, h->group[h->found_bucket],
226                                   rec)) {
227                                 if (tinfo) {
228                                         tinfo->levels[tinfo->num_levels-1].entry
229                                                 += h->found_bucket;
230                                 }
231                                 return h->group[h->found_bucket] & TDB_OFF_MASK;
232                         }
233                 }
234                 /* Didn't find it: h indicates where it would go. */
235                 return 0;
236         }
237
238         /* FIXME: We hit the bottom.  Chain! */
239         abort();
240
241 fail:
242         tdb_unlock_hashes(tdb, h->hlock_start, h->hlock_range, ltype);
243         return TDB_OFF_ERR;
244 }
245
246 /* I wrote a simple test, expanding a hash to 2GB, for the following
247  * cases:
248  * 1) Expanding all the buckets at once,
249  * 2) Expanding the bucket we wanted to place the new entry into.
250  * 3) Expanding the most-populated bucket,
251  *
252  * I measured the worst/average/best density during this process.
253  * 1) 3%/16%/30%
254  * 2) 4%/20%/38%
255  * 3) 6%/22%/41%
256  *
257  * So we figure out the busiest bucket for the moment.
258  */
259 static unsigned fullest_bucket(struct tdb_context *tdb,
260                                const tdb_off_t *group,
261                                unsigned new_bucket)
262 {
263         unsigned counts[1 << TDB_HASH_GROUP_BITS] = { 0 };
264         unsigned int i, best_bucket;
265
266         /* Count the new entry. */
267         counts[new_bucket]++;
268         best_bucket = new_bucket;
269
270         for (i = 0; i < (1 << TDB_HASH_GROUP_BITS); i++) {
271                 unsigned this_bucket;
272
273                 if (is_subhash(group[i]))
274                         continue;
275                 this_bucket = group[i] & TDB_OFF_HASH_GROUP_MASK;
276                 if (++counts[this_bucket] > counts[best_bucket])
277                         best_bucket = this_bucket;
278         }
279
280         return best_bucket;
281 }
282
283 static bool put_into_group(tdb_off_t *group,
284                            unsigned bucket, tdb_off_t encoded)
285 {
286         unsigned int i;
287
288         for (i = 0; i < (1 << TDB_HASH_GROUP_BITS); i++) {
289                 unsigned b = (bucket + i) % (1 << TDB_HASH_GROUP_BITS);
290
291                 if (group[b] == 0) {
292                         group[b] = encoded;
293                         return true;
294                 }
295         }
296         return false;
297 }
298
299 static void force_into_group(tdb_off_t *group,
300                              unsigned bucket, tdb_off_t encoded)
301 {
302         if (!put_into_group(group, bucket, encoded))
303                 abort();
304 }
305
306 static tdb_off_t encode_offset(tdb_off_t new_off, struct hash_info *h)
307 {
308         return h->home_bucket
309                 | new_off
310                 | ((uint64_t)bits(h->h,
311                                   64 - h->hash_used - TDB_OFF_UPPER_STEAL_EXTRA,
312                                   TDB_OFF_UPPER_STEAL_EXTRA)
313                    << TDB_OFF_HASH_EXTRA_BIT);
314 }
315
316 /* Simply overwrite the hash entry we found before. */ 
317 int replace_in_hash(struct tdb_context *tdb,
318                     struct hash_info *h,
319                     tdb_off_t new_off)
320 {
321         return tdb_write_off(tdb, hbucket_off(h->group_start, h->found_bucket),
322                              encode_offset(new_off, h));
323 }
324
325 /* Add into a newly created subhash. */
326 static int add_to_subhash(struct tdb_context *tdb, tdb_off_t subhash,
327                           unsigned hash_used, tdb_off_t val)
328 {
329         tdb_off_t off = (val & TDB_OFF_MASK), *group;
330         struct hash_info h;
331         unsigned int gnum;
332
333         h.hash_used = hash_used;
334
335         /* FIXME chain if hash_used == 64 */
336         if (hash_used + TDB_SUBLEVEL_HASH_BITS > 64)
337                 abort();
338
339         h.h = hash_record(tdb, off);
340         gnum = use_bits(&h, TDB_SUBLEVEL_HASH_BITS-TDB_HASH_GROUP_BITS);
341         h.group_start = subhash + sizeof(struct tdb_used_record)
342                 + gnum * (sizeof(tdb_off_t) << TDB_HASH_GROUP_BITS);
343         h.home_bucket = use_bits(&h, TDB_HASH_GROUP_BITS);
344
345         group = tdb_access_write(tdb, h.group_start,
346                                  sizeof(*group) << TDB_HASH_GROUP_BITS, true);
347         if (!group)
348                 return -1;
349         force_into_group(group, h.home_bucket, encode_offset(off, &h));
350         return tdb_access_commit(tdb, group);
351 }
352
353 static int expand_group(struct tdb_context *tdb, struct hash_info *h)
354 {
355         unsigned bucket, num_vals, i;
356         tdb_off_t subhash;
357         tdb_off_t vals[1 << TDB_HASH_GROUP_BITS];
358
359         /* Attach new empty subhash under fullest bucket. */
360         bucket = fullest_bucket(tdb, h->group, h->home_bucket);
361
362         subhash = alloc(tdb, 0, sizeof(tdb_off_t) << TDB_SUBLEVEL_HASH_BITS,
363                         0, false);
364         if (subhash == TDB_OFF_ERR)
365                 return -1;
366
367         add_stat(tdb, alloc_subhash, 1);
368         if (zero_out(tdb, subhash + sizeof(struct tdb_used_record),
369                      sizeof(tdb_off_t) << TDB_SUBLEVEL_HASH_BITS) == -1)
370                 return -1;
371
372         /* Remove any which are destined for bucket or are in wrong place. */
373         num_vals = 0;
374         for (i = 0; i < (1 << TDB_HASH_GROUP_BITS); i++) {
375                 unsigned home_bucket = h->group[i] & TDB_OFF_HASH_GROUP_MASK;
376                 if (!h->group[i] || is_subhash(h->group[i]))
377                         continue;
378                 if (home_bucket == bucket || home_bucket != i) {
379                         vals[num_vals++] = h->group[i];
380                         h->group[i] = 0;
381                 }
382         }
383         /* FIXME: This assert is valid, but we do this during unit test :( */
384         /* assert(num_vals); */
385
386         /* Overwrite expanded bucket with subhash pointer. */
387         h->group[bucket] = subhash | (1ULL << TDB_OFF_UPPER_STEAL_SUBHASH_BIT);
388
389         /* Put values back. */
390         for (i = 0; i < num_vals; i++) {
391                 unsigned this_bucket = vals[i] & TDB_OFF_HASH_GROUP_MASK;
392
393                 if (this_bucket == bucket) {
394                         if (add_to_subhash(tdb, subhash, h->hash_used, vals[i]))
395                                 return -1;
396                 } else {
397                         /* There should be room to put this back. */
398                         force_into_group(h->group, this_bucket, vals[i]);
399                 }
400         }
401         return 0;
402 }
403
404 int delete_from_hash(struct tdb_context *tdb, struct hash_info *h)
405 {
406         unsigned int i, num_movers = 0;
407         tdb_off_t movers[1 << TDB_HASH_GROUP_BITS];
408
409         h->group[h->found_bucket] = 0;
410         for (i = 1; i < (1 << TDB_HASH_GROUP_BITS); i++) {
411                 unsigned this_bucket;
412
413                 this_bucket = (h->found_bucket+i) % (1 << TDB_HASH_GROUP_BITS);
414                 /* Empty bucket?  We're done. */
415                 if (!h->group[this_bucket])
416                         break;
417
418                 /* Ignore subhashes. */
419                 if (is_subhash(h->group[this_bucket]))
420                         continue;
421
422                 /* If this one is not happy where it is, we'll move it. */
423                 if ((h->group[this_bucket] & TDB_OFF_HASH_GROUP_MASK)
424                     != this_bucket) {
425                         movers[num_movers++] = h->group[this_bucket];
426                         h->group[this_bucket] = 0;
427                 }
428         }
429
430         /* Put back the ones we erased. */
431         for (i = 0; i < num_movers; i++) {
432                 force_into_group(h->group, movers[i] & TDB_OFF_HASH_GROUP_MASK,
433                                  movers[i]);
434         }
435
436         /* Now we write back the hash group */
437         return tdb_write_convert(tdb, h->group_start,
438                                  h->group, sizeof(h->group));
439 }
440
441 int add_to_hash(struct tdb_context *tdb, struct hash_info *h, tdb_off_t new_off)
442 {
443         /* FIXME: chain! */
444         if (h->hash_used >= 64)
445                 abort();
446
447         /* We hit an empty bucket during search?  That's where it goes. */
448         if (!h->group[h->found_bucket]) {
449                 h->group[h->found_bucket] = encode_offset(new_off, h);
450                 /* Write back the modified group. */
451                 return tdb_write_convert(tdb, h->group_start,
452                                          h->group, sizeof(h->group));
453         }
454
455         /* We're full.  Expand. */
456         if (expand_group(tdb, h) == -1)
457                 return -1;
458
459         if (is_subhash(h->group[h->home_bucket])) {
460                 /* We were expanded! */
461                 tdb_off_t hashtable;
462                 unsigned int gnum;
463
464                 /* Write back the modified group. */
465                 if (tdb_write_convert(tdb, h->group_start, h->group,
466                                       sizeof(h->group)))
467                         return -1;
468
469                 /* Move hashinfo down a level. */
470                 hashtable = (h->group[h->home_bucket] & TDB_OFF_MASK)
471                         + sizeof(struct tdb_used_record);
472                 gnum = use_bits(h,TDB_SUBLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS);
473                 h->home_bucket = use_bits(h, TDB_HASH_GROUP_BITS);
474                 h->group_start = hashtable
475                         + gnum * (sizeof(tdb_off_t) << TDB_HASH_GROUP_BITS);
476                 if (tdb_read_convert(tdb, h->group_start, &h->group,
477                                      sizeof(h->group)) == -1)
478                         return -1;
479         }
480
481         /* Expanding the group must have made room if it didn't choose this
482          * bucket. */
483         if (put_into_group(h->group, h->home_bucket, encode_offset(new_off, h)))
484                 return tdb_write_convert(tdb, h->group_start,
485                                          h->group, sizeof(h->group));
486
487         /* This can happen if all hashes in group (and us) dropped into same
488          * group in subhash. */
489         return add_to_hash(tdb, h, new_off);
490 }
491
492 /* Traverse support: returns offset of record, or 0 or TDB_OFF_ERR. */
493 static tdb_off_t iterate_hash(struct tdb_context *tdb,
494                               struct traverse_info *tinfo)
495 {
496         tdb_off_t off, val;
497         unsigned int i;
498         struct traverse_level *tlevel;
499
500         tlevel = &tinfo->levels[tinfo->num_levels-1];
501
502 again:
503         for (i = tdb_find_nonzero_off(tdb, tlevel->hashtable,
504                                       tlevel->entry, tlevel->total_buckets);
505              i != tlevel->total_buckets;
506              i = tdb_find_nonzero_off(tdb, tlevel->hashtable,
507                                       i+1, tlevel->total_buckets)) {
508                 val = tdb_read_off(tdb, tlevel->hashtable+sizeof(tdb_off_t)*i);
509                 if (unlikely(val == TDB_OFF_ERR))
510                         return TDB_OFF_ERR;
511
512                 off = val & TDB_OFF_MASK;
513
514                 /* This makes the delete-all-in-traverse case work
515                  * (and simplifies our logic a little). */
516                 if (off == tinfo->prev)
517                         continue;
518
519                 tlevel->entry = i;
520
521                 if (!is_subhash(val)) {
522                         /* Found one. */
523                         tinfo->prev = off;
524                         return off;
525                 }
526
527                 /* When we come back, we want the next one */
528                 tlevel->entry++;
529                 tinfo->num_levels++;
530                 tlevel++;
531                 tlevel->hashtable = off + sizeof(struct tdb_used_record);
532                 tlevel->entry = 0;
533                 tlevel->total_buckets = (1 << TDB_SUBLEVEL_HASH_BITS);
534                 goto again;
535         }
536
537         /* Nothing there? */
538         if (tinfo->num_levels == 1)
539                 return 0;
540
541         /* Go back up and keep searching. */
542         tinfo->num_levels--;
543         tlevel--;
544         goto again;
545 }
546
547 /* Return 1 if we find something, 0 if not, -1 on error. */
548 int next_in_hash(struct tdb_context *tdb, int ltype,
549                  struct traverse_info *tinfo,
550                  TDB_DATA *kbuf, size_t *dlen)
551 {
552         const unsigned group_bits = TDB_TOPLEVEL_HASH_BITS-TDB_HASH_GROUP_BITS;
553         tdb_off_t hlock_start, hlock_range, off;
554
555         while (tinfo->toplevel_group < (1 << group_bits)) {
556                 hlock_start = (tdb_off_t)tinfo->toplevel_group
557                         << (64 - group_bits);
558                 hlock_range = 1ULL << group_bits;
559                 if (tdb_lock_hashes(tdb, hlock_start, hlock_range, ltype,
560                                     TDB_LOCK_WAIT) != 0)
561                         return -1;
562
563                 off = iterate_hash(tdb, tinfo);
564                 if (off) {
565                         struct tdb_used_record rec;
566
567                         if (tdb_read_convert(tdb, off, &rec, sizeof(rec))) {
568                                 tdb_unlock_hashes(tdb,
569                                                   hlock_start, hlock_range,
570                                                   ltype);
571                                 return -1;
572                         }
573                         if (rec_magic(&rec) != TDB_MAGIC) {
574                                 tdb_logerr(tdb, TDB_ERR_CORRUPT,
575                                            TDB_DEBUG_FATAL,
576                                            "next_in_hash:"
577                                            " corrupt record at %llu",
578                                            (long long)off);
579                                 return -1;
580                         }
581
582                         kbuf->dsize = rec_key_length(&rec);
583
584                         /* They want data as well? */
585                         if (dlen) {
586                                 *dlen = rec_data_length(&rec);
587                                 kbuf->dptr = tdb_alloc_read(tdb, 
588                                                             off + sizeof(rec),
589                                                             kbuf->dsize
590                                                             + *dlen);
591                         } else {
592                                 kbuf->dptr = tdb_alloc_read(tdb, 
593                                                             off + sizeof(rec),
594                                                             kbuf->dsize);
595                         }
596                         tdb_unlock_hashes(tdb, hlock_start, hlock_range, ltype);
597                         return kbuf->dptr ? 1 : -1;
598                 }
599
600                 tdb_unlock_hashes(tdb, hlock_start, hlock_range, ltype);
601
602                 tinfo->toplevel_group++;
603                 tinfo->levels[0].hashtable
604                         += (sizeof(tdb_off_t) << TDB_HASH_GROUP_BITS);
605                 tinfo->levels[0].entry = 0;
606         }
607         return 0;
608 }
609
610 /* Return 1 if we find something, 0 if not, -1 on error. */
611 int first_in_hash(struct tdb_context *tdb, int ltype,
612                   struct traverse_info *tinfo,
613                   TDB_DATA *kbuf, size_t *dlen)
614 {
615         tinfo->prev = 0;
616         tinfo->toplevel_group = 0;
617         tinfo->num_levels = 1;
618         tinfo->levels[0].hashtable = offsetof(struct tdb_header, hashtable);
619         tinfo->levels[0].entry = 0;
620         tinfo->levels[0].total_buckets = (1 << TDB_HASH_GROUP_BITS);
621
622         return next_in_hash(tdb, ltype, tinfo, kbuf, dlen);
623 }
624
625 /* Even if the entry isn't in this hash bucket, you'd have to lock this
626  * bucket to find it. */
627 static int chainlock(struct tdb_context *tdb, const TDB_DATA *key,
628                      int ltype, enum tdb_lock_flags waitflag,
629                      const char *func)
630 {
631         int ret;
632         uint64_t h = tdb_hash(tdb, key->dptr, key->dsize);
633         tdb_off_t lockstart, locksize;
634         unsigned int group, gbits;
635
636         gbits = TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS;
637         group = bits(h, 64 - gbits, gbits);
638
639         lockstart = hlock_range(group, &locksize);
640
641         ret = tdb_lock_hashes(tdb, lockstart, locksize, ltype, waitflag);
642         tdb_trace_1rec(tdb, func, *key);
643         return ret;
644 }
645
646 /* lock/unlock one hash chain. This is meant to be used to reduce
647    contention - it cannot guarantee how many records will be locked */
648 int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
649 {
650         return chainlock(tdb, &key, F_WRLCK, TDB_LOCK_WAIT, "tdb_chainlock");
651 }
652
653 int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
654 {
655         uint64_t h = tdb_hash(tdb, key.dptr, key.dsize);
656         tdb_off_t lockstart, locksize;
657         unsigned int group, gbits;
658
659         gbits = TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS;
660         group = bits(h, 64 - gbits, gbits);
661
662         lockstart = hlock_range(group, &locksize);
663
664         tdb_trace_1rec(tdb, "tdb_chainunlock", key);
665         return tdb_unlock_hashes(tdb, lockstart, locksize, F_WRLCK);
666 }