tdb2: more fixes and tests for enlarging hash.
[ccan] / ccan / tdb2 / free.c
1  /* 
2    Trivial Database 2: free list/block handling
3    Copyright (C) Rusty Russell 2010
4    
5    This library is free software; you can redistribute it and/or
6    modify it under the terms of the GNU Lesser General Public
7    License as published by the Free Software Foundation; either
8    version 3 of the License, or (at your option) any later version.
9
10    This library is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    Lesser General Public License for more details.
14
15    You should have received a copy of the GNU Lesser General Public
16    License along with this library; if not, see <http://www.gnu.org/licenses/>.
17 */
18 #include "private.h"
19 #include <ccan/likely/likely.h>
20 #include <time.h>
21 #include <assert.h>
22 #include <limits.h>
23
24 /* We have to be able to fit a free record here. */
25 #define MIN_DATA_LEN    \
26         (sizeof(struct tdb_free_record) - sizeof(struct tdb_used_record))
27
28 /* We have a series of free lists, each one covering a "zone" of the file.
29  *
30  * For each zone we have a series of per-size buckets, and a final bucket for
31  * "too big".
32  *
33  * It's possible to move the free_list_head, but *only* under the allrecord
34  * lock. */
35 static tdb_off_t free_list_off(struct tdb_context *tdb, unsigned int list)
36 {
37         return tdb->header.v.free_off + list * sizeof(tdb_off_t);
38 }
39
40 /* We're a library: playing with srandom() is unfriendly.  srandom_r
41  * probably lacks portability.  We don't need very random here. */
42 static unsigned int quick_random(struct tdb_context *tdb)
43 {
44         return getpid() + time(NULL) + (unsigned long)tdb;
45 }
46
47 /* Start by using a random zone to spread the load. */
48 void tdb_zone_init(struct tdb_context *tdb)
49 {
50         /*
51          * We read num_zones without a proper lock, so we could have
52          * gotten a partial read.  Since zone_bits is 1 byte long, we
53          * can trust that; even if it's increased, the number of zones
54          * cannot have decreased.  And using the map size means we
55          * will not start with a zone which hasn't been filled yet.
56          */
57         tdb->last_zone = quick_random(tdb)
58                 % ((tdb->map_size >> tdb->header.v.zone_bits) + 1);
59 }
60
61 static unsigned fls64(uint64_t val)
62 {
63 #if HAVE_BUILTIN_CLZL
64         if (val <= ULONG_MAX) {
65                 /* This is significantly faster! */
66                 return val ? sizeof(long) * CHAR_BIT - __builtin_clzl(val) : 0;
67         } else {
68 #endif
69         uint64_t r = 64;
70
71         if (!val)
72                 return 0;
73         if (!(val & 0xffffffff00000000ull)) {
74                 val <<= 32;
75                 r -= 32;
76         }
77         if (!(val & 0xffff000000000000ull)) {
78                 val <<= 16;
79                 r -= 16;
80         }
81         if (!(val & 0xff00000000000000ull)) {
82                 val <<= 8;
83                 r -= 8;
84         }
85         if (!(val & 0xf000000000000000ull)) {
86                 val <<= 4;
87                 r -= 4;
88         }
89         if (!(val & 0xc000000000000000ull)) {
90                 val <<= 2;
91                 r -= 2;
92         }
93         if (!(val & 0x8000000000000000ull)) {
94                 val <<= 1;
95                 r -= 1;
96         }
97         return r;
98 #if HAVE_BUILTIN_CLZL
99         }
100 #endif
101 }
102
103 /* In which bucket would we find a particular record size? (ignoring header) */
104 unsigned int size_to_bucket(struct tdb_context *tdb, tdb_len_t data_len)
105 {
106         unsigned int bucket;
107
108         /* We can't have records smaller than this. */
109         assert(data_len >= MIN_DATA_LEN);
110
111         /* Ignoring the header... */
112         if (data_len - MIN_DATA_LEN <= 64) {
113                 /* 0 in bucket 0, 8 in bucket 1... 64 in bucket 6. */
114                 bucket = (data_len - MIN_DATA_LEN) / 8;
115         } else {
116                 /* After that we go power of 2. */
117                 bucket = fls64(data_len - MIN_DATA_LEN) + 2;
118         }
119
120         if (unlikely(bucket > tdb->header.v.free_buckets))
121                 bucket = tdb->header.v.free_buckets;
122         return bucket;
123 }
124
125 /* What zone does a block belong in? */ 
126 tdb_off_t zone_of(struct tdb_context *tdb, tdb_off_t off)
127 {
128         assert(tdb->header_uptodate);
129
130         return off >> tdb->header.v.zone_bits;
131 }
132
133 /* Returns free_buckets + 1, or list number to search. */
134 static tdb_off_t find_free_head(struct tdb_context *tdb, tdb_off_t bucket)
135 {
136         tdb_off_t first, off;
137
138         /* Speculatively search for a non-zero bucket. */
139         first = tdb->last_zone * (tdb->header.v.free_buckets+1) + bucket;
140         off = tdb_find_nonzero_off(tdb, free_list_off(tdb, first),
141                                    tdb->header.v.free_buckets + 1 - bucket);
142         return bucket + off;
143 }
144
145 static int remove_from_list(struct tdb_context *tdb,
146                             tdb_off_t list, struct tdb_free_record *r)
147 {
148         tdb_off_t off;
149
150         /* Front of list? */
151         if (r->prev == 0) {
152                 off = free_list_off(tdb, list);
153         } else {
154                 off = r->prev + offsetof(struct tdb_free_record, next);
155         }
156         /* r->prev->next = r->next */
157         if (tdb_write_off(tdb, off, r->next)) {
158                 return -1;
159         }
160
161         if (r->next != 0) {
162                 off = r->next + offsetof(struct tdb_free_record, prev);
163                 /* r->next->prev = r->prev */
164                 if (tdb_write_off(tdb, off, r->prev)) {
165                         return -1;
166                 }
167         }
168         return 0;
169 }
170
171 /* Enqueue in this free list. */
172 static int enqueue_in_free(struct tdb_context *tdb,
173                            tdb_off_t list,
174                            tdb_off_t off,
175                            struct tdb_free_record *new)
176 {
177         new->prev = 0;
178         /* new->next = head. */
179         new->next = tdb_read_off(tdb, free_list_off(tdb, list));
180         if (new->next == TDB_OFF_ERR)
181                 return -1;
182
183         if (new->next) {
184                 /* next->prev = new. */
185                 if (tdb_write_off(tdb, new->next
186                                   + offsetof(struct tdb_free_record, prev),
187                                   off) != 0)
188                         return -1;
189         }
190         /* head = new */
191         if (tdb_write_off(tdb, free_list_off(tdb, list), off) != 0)
192                 return -1;
193         
194         return tdb_write_convert(tdb, off, new, sizeof(*new));
195 }
196
197 /* List isn't locked. */
198 int add_free_record(struct tdb_context *tdb,
199                     tdb_off_t off, tdb_len_t len_with_header)
200 {
201         struct tdb_free_record new;
202         tdb_off_t list;
203         int ret;
204
205         assert(len_with_header >= sizeof(new));
206
207         new.magic = TDB_FREE_MAGIC;
208         new.data_len = len_with_header - sizeof(struct tdb_used_record);
209
210         tdb->last_zone = zone_of(tdb, off);
211         list = tdb->last_zone * (tdb->header.v.free_buckets+1)
212                 + size_to_bucket(tdb, new.data_len);
213
214         if (tdb_lock_free_list(tdb, list, TDB_LOCK_WAIT) != 0)
215                 return -1;
216
217         ret = enqueue_in_free(tdb, list, off, &new);
218         tdb_unlock_free_list(tdb, list);
219         return ret;
220 }
221
222 /* If we have enough left over to be useful, split that off. */
223 static int to_used_record(struct tdb_context *tdb,
224                           tdb_off_t off,
225                           tdb_len_t needed,
226                           tdb_len_t total_len,
227                           tdb_len_t *actual)
228 {
229         struct tdb_used_record used;
230         tdb_len_t leftover;
231
232         leftover = total_len - needed;
233         if (leftover < sizeof(struct tdb_free_record))
234                 leftover = 0;
235
236         *actual = total_len - leftover;
237
238         if (leftover) {
239                 if (add_free_record(tdb, off + sizeof(used) + *actual,
240                                     total_len - needed))
241                         return -1;
242         }
243         return 0;
244 }
245
246 /* Note: we unlock the current list if we coalesce or fail. */
247 static int coalesce(struct tdb_context *tdb, tdb_off_t off,
248                     tdb_off_t list, tdb_len_t data_len)
249 {
250         struct tdb_free_record pad, *r;
251         tdb_off_t end = off + sizeof(struct tdb_used_record) + data_len;
252
253         while (!tdb->methods->oob(tdb, end + sizeof(*r), 1)) {
254                 tdb_off_t nlist;
255
256                 r = tdb_get(tdb, end, &pad, sizeof(pad));
257                 if (!r)
258                         goto err;
259
260                 if (r->magic != TDB_FREE_MAGIC)
261                         break;
262
263                 nlist = zone_of(tdb, end) * (tdb->header.v.free_buckets+1)
264                         + size_to_bucket(tdb, r->data_len);
265
266                 /* We may be violating lock order here, so best effort. */
267                 if (tdb_lock_free_list(tdb, nlist, TDB_LOCK_NOWAIT) == -1)
268                         break;
269
270                 /* Now we have lock, re-check. */
271                 r = tdb_get(tdb, end, &pad, sizeof(pad));
272                 if (!r) {
273                         tdb_unlock_free_list(tdb, nlist);
274                         goto err;
275                 }
276
277                 if (unlikely(r->magic != TDB_FREE_MAGIC)) {
278                         tdb_unlock_free_list(tdb, nlist);
279                         break;
280                 }
281
282                 if (remove_from_list(tdb, list, r) == -1) {
283                         tdb_unlock_free_list(tdb, nlist);
284                         goto err;
285                 }
286
287                 end += sizeof(struct tdb_used_record) + r->data_len;
288                 tdb_unlock_free_list(tdb, nlist);
289         }
290
291         /* Didn't find any adjacent free? */
292         if (end == off + sizeof(struct tdb_used_record) + data_len)
293                 return 0;
294
295         /* OK, expand record */
296         r = tdb_get(tdb, off, &pad, sizeof(pad));
297         if (!r)
298                 goto err;
299
300         if (remove_from_list(tdb, list, r) == -1)
301                 goto err;
302
303         /* We have to drop this to avoid deadlocks. */
304         tdb_unlock_free_list(tdb, list);
305
306         if (add_free_record(tdb, off, end - off) == -1)
307                 return -1;
308         return 1;
309
310 err:
311         /* To unify error paths, we *always* unlock list. */
312         tdb_unlock_free_list(tdb, list);
313         return -1;
314 }
315
316 /* We need size bytes to put our key and data in. */
317 static tdb_off_t lock_and_alloc(struct tdb_context *tdb,
318                                 tdb_off_t bucket, size_t size,
319                                 tdb_len_t *actual)
320 {
321         tdb_off_t list;
322         tdb_off_t off, best_off;
323         struct tdb_free_record pad, best = { 0 }, *r;
324         double multiplier;
325
326 again:
327         list = tdb->last_zone * (tdb->header.v.free_buckets+1) + bucket;
328
329         /* Lock this list. */
330         if (tdb_lock_free_list(tdb, list, TDB_LOCK_WAIT) == -1) {
331                 return TDB_OFF_ERR;
332         }
333
334         best.data_len = -1ULL;
335         best_off = 0;
336         multiplier = 1.0;
337
338         /* Walk the list to see if any are large enough, getting less fussy
339          * as we go. */
340         off = tdb_read_off(tdb, free_list_off(tdb, list));
341         if (unlikely(off == TDB_OFF_ERR))
342                 goto unlock_err;
343
344         while (off) {
345                 r = tdb_get(tdb, off, &pad, sizeof(*r));
346                 if (!r)
347                         goto unlock_err;
348
349                 if (r->magic != TDB_FREE_MAGIC) {
350                         tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
351                                  "lock_and_alloc: %llu non-free 0x%llx\n",
352                                  (long long)off, (long long)r->magic);
353                         goto unlock_err;
354                 }
355
356                 if (r->data_len >= size && r->data_len < best.data_len) {
357                         best_off = off;
358                         best = *r;
359                 }
360
361                 if (best.data_len < size * multiplier && best_off)
362                         goto use_best;
363
364                 multiplier *= 1.01;
365
366                 /* Since we're going slow anyway, try coalescing here. */
367                 switch (coalesce(tdb, off, list, r->data_len)) {
368                 case -1:
369                         /* This has already unlocked on error. */
370                         return -1;
371                 case 1:
372                         /* This has unlocked list, restart. */
373                         goto again;
374                 }
375                 off = r->next;
376         }
377
378         /* If we found anything at all, use it. */
379         if (best_off) {
380         use_best:
381                 /* We're happy with this size: take it. */
382                 if (remove_from_list(tdb, list, &best) != 0)
383                         goto unlock_err;
384                 tdb_unlock_free_list(tdb, list);
385
386                 if (to_used_record(tdb, best_off, size, best.data_len,
387                                    actual)) {
388                         return -1;
389                 }
390                 return best_off;
391         }
392
393         tdb_unlock_free_list(tdb, list);
394         return 0;
395
396 unlock_err:
397         tdb_unlock_free_list(tdb, list);
398         return TDB_OFF_ERR;
399 }
400
401 /* We want a really big chunk.  Look through every zone's oversize bucket */
402 static tdb_off_t huge_alloc(struct tdb_context *tdb, size_t size,
403                             tdb_len_t *actual)
404 {
405         tdb_off_t i, off;
406
407         for (i = 0; i < tdb->header.v.num_zones; i++) {
408                 /* Try getting one from list. */
409                 off = lock_and_alloc(tdb, tdb->header.v.free_buckets,
410                                      size, actual);
411                 if (off == TDB_OFF_ERR)
412                         return TDB_OFF_ERR;
413                 if (off != 0)
414                         return off;
415                 /* FIXME: Coalesce! */
416         }
417         return 0;
418 }
419
420 static tdb_off_t get_free(struct tdb_context *tdb, size_t size,
421                           tdb_len_t *actual)
422 {
423         tdb_off_t off, bucket;
424         unsigned int num_empty, step = 0;
425
426         bucket = size_to_bucket(tdb, size);
427
428         /* If we're after something bigger than a single zone, handle
429          * specially. */
430         if (unlikely(sizeof(struct tdb_used_record) + size
431                      >= (1ULL << tdb->header.v.zone_bits))) {
432                 return huge_alloc(tdb, size, actual);
433         }
434
435         /* Number of zones we search is proportional to the log of them. */
436         for (num_empty = 0; num_empty < fls64(tdb->header.v.num_zones);
437              num_empty++) {
438                 tdb_off_t b;
439
440                 /* Start at exact size bucket, and search up... */
441                 for (b = bucket; b <= tdb->header.v.free_buckets; b++) {
442                         b = find_free_head(tdb, b);
443
444                         /* Non-empty list?  Try getting block. */
445                         if (b <= tdb->header.v.free_buckets) {
446                                 /* Try getting one from list. */
447                                 off = lock_and_alloc(tdb, b, size, actual);
448                                 if (off == TDB_OFF_ERR)
449                                         return TDB_OFF_ERR;
450                                 if (off != 0)
451                                         return off;
452                                 /* Didn't work.  Try next bucket. */
453                         }
454                 }
455
456                 /* Try another zone, at pseudo random.  Avoid duplicates by
457                    using an odd step. */
458                 if (step == 0)
459                         step = ((quick_random(tdb)) % 65536) * 2 + 1;
460                 tdb->last_zone = (tdb->last_zone + step)
461                         % tdb->header.v.num_zones;
462         }
463         return 0;
464 }
465
466 int set_header(struct tdb_context *tdb,
467                struct tdb_used_record *rec,
468                uint64_t keylen, uint64_t datalen,
469                uint64_t actuallen, uint64_t hash)
470 {
471         uint64_t keybits = (fls64(keylen) + 1) / 2;
472
473         /* Use top bits of hash, so it's independent of hash table size. */
474         rec->magic_and_meta
475                 = (actuallen - (keylen + datalen))
476                 | ((hash >> 53) << 32)
477                 | (keybits << 43)
478                 | (TDB_MAGIC << 48);
479         rec->key_and_data_len = (keylen | (datalen << (keybits*2)));
480
481         /* Encoding can fail on big values. */
482         if (rec_key_length(rec) != keylen
483             || rec_data_length(rec) != datalen
484             || rec_extra_padding(rec) != actuallen - (keylen + datalen)) {
485                 tdb->ecode = TDB_ERR_IO;
486                 tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
487                          "Could not encode k=%llu,d=%llu,a=%llu\n",
488                          (long long)keylen, (long long)datalen,
489                          (long long)actuallen);
490                 return -1;
491         }
492         return 0;
493 }
494
495 static tdb_len_t adjust_size(size_t keylen, size_t datalen, bool growing)
496 {
497         tdb_len_t size = keylen + datalen;
498
499         if (size < MIN_DATA_LEN)
500                 size = MIN_DATA_LEN;
501
502         /* Overallocate if this is coming from an enlarging store. */
503         if (growing)
504                 size += datalen / 2;
505
506         /* Round to next uint64_t boundary. */
507         return (size + (sizeof(uint64_t) - 1ULL)) & ~(sizeof(uint64_t) - 1ULL);
508 }
509
510 /* If this fails, try tdb_expand. */
511 tdb_off_t alloc(struct tdb_context *tdb, size_t keylen, size_t datalen,
512                 uint64_t hash, bool growing)
513 {
514         tdb_off_t off;
515         tdb_len_t size, actual;
516         struct tdb_used_record rec;
517
518         /* We don't want header to change during this! */
519         assert(tdb->header_uptodate);
520
521         size = adjust_size(keylen, datalen, growing);
522
523         off = get_free(tdb, size, &actual);
524         if (unlikely(off == TDB_OFF_ERR || off == 0))
525                 return off;
526
527         /* Some supergiant values can't be encoded. */
528         if (set_header(tdb, &rec, keylen, datalen, actual, hash) != 0) {
529                 add_free_record(tdb, off, sizeof(rec) + actual);
530                 return TDB_OFF_ERR;
531         }
532
533         if (tdb_write_convert(tdb, off, &rec, sizeof(rec)) != 0)
534                 return TDB_OFF_ERR;
535         
536         return off;
537 }
538
539 static bool larger_buckets_might_help(struct tdb_context *tdb)
540 {
541         /* If our buckets are already covering 1/8 of a zone, don't
542          * bother (note: might become an 1/16 of a zone if we double
543          * zone size). */
544         tdb_len_t size = (1ULL << tdb->header.v.zone_bits) / 8;
545
546         if (size >= MIN_DATA_LEN
547             && size_to_bucket(tdb, size) < tdb->header.v.free_buckets) {
548                 return false;
549         }
550
551         /* FIXME: Put stats in tdb_context or examine db itself! */
552         /* It's fairly cheap to do as we expand database. */
553         return true;
554 }
555
556 static bool zones_happy(struct tdb_context *tdb)
557 {
558         /* FIXME: look at distribution of zones. */
559         return true;
560 }
561
562 /* Expand the database. */
563 int tdb_expand(struct tdb_context *tdb, tdb_len_t klen, tdb_len_t dlen,
564                bool growing)
565 {
566         uint64_t new_num_buckets, new_num_zones, new_zone_bits;
567         uint64_t i, old_num_total, old_num_zones, old_size, old_zone_bits;
568         tdb_len_t add, freebucket_size, needed;
569         tdb_off_t off, old_free_off;
570         const tdb_off_t *oldf;
571         struct tdb_used_record fhdr;
572
573         /* We need room for the record header too. */
574         needed = sizeof(struct tdb_used_record)
575                 + adjust_size(klen, dlen, growing);
576
577         /* tdb_allrecord_lock will update header; did zones change? */
578         old_zone_bits = tdb->header.v.zone_bits;
579         old_num_zones = tdb->header.v.num_zones;
580
581         /* FIXME: this is overkill.  An expand lock? */
582         if (tdb_allrecord_lock(tdb, F_WRLCK, TDB_LOCK_WAIT, false) == -1)
583                 return -1;
584
585         /* Someone may have expanded for us. */
586         if (old_zone_bits != tdb->header.v.zone_bits
587             || old_num_zones != tdb->header.v.num_zones)
588                 goto success;
589
590         /* They may have also expanded the underlying size (otherwise we'd
591          * have expanded our mmap to look at those offsets already). */
592         old_size = tdb->map_size;
593         tdb->methods->oob(tdb, tdb->map_size + 1, true);
594         if (tdb->map_size != old_size)
595                 goto success;
596
597         /* Did we enlarge zones without enlarging file? */
598         if (tdb->map_size < tdb->header.v.num_zones<<tdb->header.v.zone_bits) {
599                 add = (tdb->header.v.num_zones<<tdb->header.v.zone_bits)
600                         - tdb->map_size;
601                 /* Updates tdb->map_size. */
602                 if (tdb->methods->expand_file(tdb, add) == -1)
603                         goto fail;
604                 if (add_free_record(tdb, tdb->map_size - add, add) == -1)
605                         goto fail;
606                 if (add >= needed) {
607                         /* Allocate from this zone. */
608                         tdb->last_zone = zone_of(tdb, tdb->map_size - add);
609                         goto success;
610                 }
611         }
612
613         /* Slow path.  Should we increase the number of buckets? */
614         new_num_buckets = tdb->header.v.free_buckets;
615         if (larger_buckets_might_help(tdb))
616                 new_num_buckets++;
617
618         /* Now we'll need room for the new free buckets, too.  Assume
619          * worst case (zones expand). */
620         needed += sizeof(fhdr)
621                 + ((tdb->header.v.num_zones+1)
622                    * (new_num_buckets+1) * sizeof(tdb_off_t));
623
624         /* If we need less that one zone, and they're working well, just add
625          * another one. */
626         if (needed < (1UL<<tdb->header.v.zone_bits) && zones_happy(tdb)) {
627                 new_num_zones = tdb->header.v.num_zones+1;
628                 new_zone_bits = tdb->header.v.zone_bits;
629                 add = 1ULL << tdb->header.v.zone_bits;
630         } else {
631                 /* Increase the zone size. */
632                 new_num_zones = tdb->header.v.num_zones;
633                 new_zone_bits = tdb->header.v.zone_bits+1;
634                 while ((new_num_zones << new_zone_bits)
635                        < tdb->map_size + needed) {
636                         new_zone_bits++;
637                 }
638
639                 /* We expand by enough full zones to meet the need. */
640                 add = ((tdb->map_size + needed + (1ULL << new_zone_bits)-1)
641                        & ~((1ULL << new_zone_bits)-1))
642                         - tdb->map_size;
643         }
644
645         /* Updates tdb->map_size. */
646         if (tdb->methods->expand_file(tdb, add) == -1)
647                 goto fail;
648
649         /* Use first part as new free bucket array. */
650         off = tdb->map_size - add;
651         freebucket_size = new_num_zones
652                 * (new_num_buckets + 1) * sizeof(tdb_off_t);
653
654         /* Write header. */
655         if (set_header(tdb, &fhdr, 0, freebucket_size, freebucket_size, 0))
656                 goto fail;
657         if (tdb_write_convert(tdb, off, &fhdr, sizeof(fhdr)) == -1)
658                 goto fail;
659
660         /* Adjust off to point to start of buckets, add to be remainder. */
661         add -= freebucket_size + sizeof(fhdr);
662         off += sizeof(fhdr);
663
664         /* Access the old zones. */
665         old_num_total = tdb->header.v.num_zones*(tdb->header.v.free_buckets+1);
666         old_free_off = tdb->header.v.free_off;
667         oldf = tdb_access_read(tdb, old_free_off,
668                                old_num_total * sizeof(tdb_off_t), true);
669         if (!oldf)
670                 goto fail;
671
672         /* Switch to using our new zone. */
673         if (zero_out(tdb, off, freebucket_size) == -1)
674                 goto fail_release;
675
676         tdb->header.v.free_off = off;
677         tdb->header.v.num_zones = new_num_zones;
678         tdb->header.v.zone_bits = new_zone_bits;
679         tdb->header.v.free_buckets = new_num_buckets;
680
681         /* FIXME: If zone size hasn't changed, can simply copy pointers. */
682         /* FIXME: Coalesce? */
683         for (i = 0; i < old_num_total; i++) {
684                 tdb_off_t next;
685                 struct tdb_free_record rec;
686                 tdb_off_t list;
687
688                 for (off = oldf[i]; off; off = next) {
689                         if (tdb_read_convert(tdb, off, &rec, sizeof(rec)))
690                                 goto fail_release;
691
692                         list = zone_of(tdb, off)
693                                 * (tdb->header.v.free_buckets+1)
694                                 + size_to_bucket(tdb, rec.data_len);
695                         next = rec.next;
696                 
697                         if (enqueue_in_free(tdb, list, off, &rec) == -1)
698                                 goto fail_release;
699                 }
700         }
701
702         /* Free up the old free buckets. */
703         old_free_off -= sizeof(fhdr);
704         if (tdb_read_convert(tdb, old_free_off, &fhdr, sizeof(fhdr)) == -1)
705                 goto fail_release;
706         if (add_free_record(tdb, old_free_off,
707                             sizeof(fhdr)
708                             + rec_data_length(&fhdr)
709                             + rec_extra_padding(&fhdr)))
710                 goto fail_release;
711
712         /* Add the rest as a new free record. */
713         if (add_free_record(tdb, tdb->map_size - add, add) == -1)
714                 goto fail_release;
715
716         /* Start allocating from where the new space is. */
717         tdb->last_zone = zone_of(tdb, tdb->map_size - add);
718         tdb_access_release(tdb, oldf);
719         if (write_header(tdb) == -1)
720                 goto fail;
721 success:
722         tdb_allrecord_unlock(tdb, F_WRLCK);
723         return 0;
724
725 fail_release:
726         tdb_access_release(tdb, oldf);
727 fail:
728         tdb_allrecord_unlock(tdb, F_WRLCK);
729         return -1;
730 }