tdb2: tdb_expand on empty database now tested.
[ccan] / ccan / tdb2 / free.c
1  /* 
2    Trivial Database 2: free list/block handling
3    Copyright (C) Rusty Russell 2010
4    
5    This library is free software; you can redistribute it and/or
6    modify it under the terms of the GNU Lesser General Public
7    License as published by the Free Software Foundation; either
8    version 3 of the License, or (at your option) any later version.
9
10    This library is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    Lesser General Public License for more details.
14
15    You should have received a copy of the GNU Lesser General Public
16    License along with this library; if not, see <http://www.gnu.org/licenses/>.
17 */
18 #include "private.h"
19 #include <ccan/likely/likely.h>
20 #include <time.h>
21 #include <assert.h>
22 #include <limits.h>
23
24 /* We have to be able to fit a free record here. */
25 #define MIN_DATA_LEN    \
26         (sizeof(struct tdb_free_record) - sizeof(struct tdb_used_record))
27
28 /* We have a series of free lists, each one covering a "zone" of the file.
29  *
30  * For each zone we have a series of per-size buckets, and a final bucket for
31  * "too big".
32  *
33  * It's possible to move the free_list_head, but *only* under the allrecord
34  * lock. */
35 static tdb_off_t free_list_off(struct tdb_context *tdb, unsigned int list)
36 {
37         return tdb->header.v.free_off + list * sizeof(tdb_off_t);
38 }
39
40 /* We're a library: playing with srandom() is unfriendly.  srandom_r
41  * probably lacks portability.  We don't need very random here. */
42 static unsigned int quick_random(struct tdb_context *tdb)
43 {
44         return getpid() + time(NULL) + (unsigned long)tdb;
45 }
46
47 /* Start by using a random zone to spread the load. */
48 void tdb_zone_init(struct tdb_context *tdb)
49 {
50         /*
51          * We read num_zones without a proper lock, so we could have
52          * gotten a partial read.  Since zone_bits is 1 byte long, we
53          * can trust that; even if it's increased, the number of zones
54          * cannot have decreased.  And using the map size means we
55          * will not start with a zone which hasn't been filled yet.
56          */
57         tdb->last_zone = quick_random(tdb)
58                 % ((tdb->map_size >> tdb->header.v.zone_bits) + 1);
59 }
60
61 static unsigned fls64(uint64_t val)
62 {
63 #if HAVE_BUILTIN_CLZL
64         if (val <= ULONG_MAX) {
65                 /* This is significantly faster! */
66                 return val ? sizeof(long) * CHAR_BIT - __builtin_clzl(val) : 0;
67         } else {
68 #endif
69         uint64_t r = 64;
70
71         if (!val)
72                 return 0;
73         if (!(val & 0xffffffff00000000ull)) {
74                 val <<= 32;
75                 r -= 32;
76         }
77         if (!(val & 0xffff000000000000ull)) {
78                 val <<= 16;
79                 r -= 16;
80         }
81         if (!(val & 0xff00000000000000ull)) {
82                 val <<= 8;
83                 r -= 8;
84         }
85         if (!(val & 0xf000000000000000ull)) {
86                 val <<= 4;
87                 r -= 4;
88         }
89         if (!(val & 0xc000000000000000ull)) {
90                 val <<= 2;
91                 r -= 2;
92         }
93         if (!(val & 0x8000000000000000ull)) {
94                 val <<= 1;
95                 r -= 1;
96         }
97         return r;
98 #if HAVE_BUILTIN_CLZL
99         }
100 #endif
101 }
102
103 /* In which bucket would we find a particular record size? (ignoring header) */
104 unsigned int size_to_bucket(struct tdb_context *tdb, tdb_len_t data_len)
105 {
106         unsigned int bucket;
107
108         /* We can't have records smaller than this. */
109         assert(data_len >= MIN_DATA_LEN);
110
111         /* Ignoring the header... */
112         if (data_len - MIN_DATA_LEN <= 64) {
113                 /* 0 in bucket 0, 8 in bucket 1... 64 in bucket 6. */
114                 bucket = (data_len - MIN_DATA_LEN) / 8;
115         } else {
116                 /* After that we go power of 2. */
117                 bucket = fls64(data_len - MIN_DATA_LEN) + 2;
118         }
119
120         if (unlikely(bucket > tdb->header.v.free_buckets))
121                 bucket = tdb->header.v.free_buckets;
122         return bucket;
123 }
124
125 /* What zone does a block belong in? */ 
126 tdb_off_t zone_of(struct tdb_context *tdb, tdb_off_t off)
127 {
128         assert(tdb->header_uptodate);
129
130         return off >> tdb->header.v.zone_bits;
131 }
132
133 /* Returns fl->max_bucket + 1, or list number to search. */
134 static tdb_off_t find_free_head(struct tdb_context *tdb, tdb_off_t bucket)
135 {
136         tdb_off_t first, off;
137
138         /* Speculatively search for a non-zero bucket. */
139         first = tdb->last_zone * (tdb->header.v.free_buckets+1) + bucket;
140         off = tdb_find_nonzero_off(tdb, free_list_off(tdb, first),
141                                    tdb->header.v.free_buckets - bucket);
142         return bucket + off;
143 }
144
145 static int remove_from_list(struct tdb_context *tdb,
146                             tdb_off_t list, struct tdb_free_record *r)
147 {
148         tdb_off_t off;
149
150         /* Front of list? */
151         if (r->prev == 0) {
152                 off = free_list_off(tdb, list);
153         } else {
154                 off = r->prev + offsetof(struct tdb_free_record, next);
155         }
156         /* r->prev->next = r->next */
157         if (tdb_write_off(tdb, off, r->next)) {
158                 return -1;
159         }
160
161         if (r->next != 0) {
162                 off = r->next + offsetof(struct tdb_free_record, prev);
163                 /* r->next->prev = r->prev */
164                 if (tdb_write_off(tdb, off, r->prev)) {
165                         return -1;
166                 }
167         }
168         return 0;
169 }
170
171 /* Enqueue in this free list. */
172 static int enqueue_in_free(struct tdb_context *tdb,
173                            tdb_off_t list,
174                            tdb_off_t off,
175                            struct tdb_free_record *new)
176 {
177         new->prev = 0;
178         /* new->next = head. */
179         new->next = tdb_read_off(tdb, free_list_off(tdb, list));
180         if (new->next == TDB_OFF_ERR)
181                 return -1;
182
183         if (new->next) {
184                 /* next->prev = new. */
185                 if (tdb_write_off(tdb, new->next
186                                   + offsetof(struct tdb_free_record, prev),
187                                   off) != 0)
188                         return -1;
189         }
190         /* head = new */
191         if (tdb_write_off(tdb, free_list_off(tdb, list), off) != 0)
192                 return -1;
193         
194         return tdb_write_convert(tdb, off, new, sizeof(*new));
195 }
196
197 /* List isn't locked. */
198 int add_free_record(struct tdb_context *tdb,
199                     tdb_off_t off, tdb_len_t len_with_header)
200 {
201         struct tdb_free_record new;
202         tdb_off_t list;
203         int ret;
204
205         assert(len_with_header >= sizeof(new));
206
207         new.magic = TDB_FREE_MAGIC;
208         new.data_len = len_with_header - sizeof(struct tdb_used_record);
209
210         tdb->last_zone = zone_of(tdb, off);
211         list = tdb->last_zone * (tdb->header.v.free_buckets+1)
212                 + size_to_bucket(tdb, new.data_len);
213                 
214         if (tdb_lock_free_list(tdb, list, TDB_LOCK_WAIT) != 0)
215                 return -1;
216
217         ret = enqueue_in_free(tdb, list, off, &new);
218         tdb_unlock_free_list(tdb, list);
219         return ret;
220 }
221
222 /* If we have enough left over to be useful, split that off. */
223 static int to_used_record(struct tdb_context *tdb,
224                           tdb_off_t off,
225                           tdb_len_t needed,
226                           tdb_len_t total_len,
227                           tdb_len_t *actual)
228 {
229         struct tdb_used_record used;
230         tdb_len_t leftover;
231
232         leftover = total_len - needed;
233         if (leftover < sizeof(struct tdb_free_record))
234                 leftover = 0;
235
236         *actual = total_len - leftover;
237
238         if (leftover) {
239                 if (add_free_record(tdb, off + sizeof(used) + *actual,
240                                     total_len - needed))
241                         return -1;
242         }
243         return 0;
244 }
245
246 /* Note: we unlock the current list if we coalesce or fail. */
247 static int coalesce(struct tdb_context *tdb, tdb_off_t off,
248                     tdb_off_t list, tdb_len_t data_len)
249 {
250         struct tdb_free_record pad, *r;
251         tdb_off_t end = off + sizeof(struct tdb_used_record) + data_len;
252
253         while (!tdb->methods->oob(tdb, end + sizeof(*r), 1)) {
254                 tdb_off_t nlist;
255
256                 r = tdb_get(tdb, end, &pad, sizeof(pad));
257                 if (!r)
258                         goto err;
259
260                 if (r->magic != TDB_FREE_MAGIC)
261                         break;
262
263                 nlist = zone_of(tdb, end) * (tdb->header.v.free_buckets+1)
264                         + size_to_bucket(tdb, r->data_len);
265
266                 /* We may be violating lock order here, so best effort. */
267                 if (tdb_lock_free_list(tdb, nlist, TDB_LOCK_NOWAIT) == -1)
268                         break;
269
270                 /* Now we have lock, re-check. */
271                 r = tdb_get(tdb, end, &pad, sizeof(pad));
272                 if (!r) {
273                         tdb_unlock_free_list(tdb, nlist);
274                         goto err;
275                 }
276
277                 if (unlikely(r->magic != TDB_FREE_MAGIC)) {
278                         tdb_unlock_free_list(tdb, nlist);
279                         break;
280                 }
281
282                 if (remove_from_list(tdb, list, r) == -1) {
283                         tdb_unlock_free_list(tdb, nlist);
284                         goto err;
285                 }
286
287                 end += sizeof(struct tdb_used_record) + r->data_len;
288                 tdb_unlock_free_list(tdb, nlist);
289         }
290
291         /* Didn't find any adjacent free? */
292         if (end == off + sizeof(struct tdb_used_record) + data_len)
293                 return 0;
294
295         /* OK, expand record */
296         r = tdb_get(tdb, off, &pad, sizeof(pad));
297         if (!r)
298                 goto err;
299
300         if (remove_from_list(tdb, list, r) == -1)
301                 goto err;
302
303         /* We have to drop this to avoid deadlocks. */
304         tdb_unlock_free_list(tdb, list);
305
306         if (add_free_record(tdb, off, end - off) == -1)
307                 return -1;
308         return 1;
309
310 err:
311         /* To unify error paths, we *always* unlock list. */
312         tdb_unlock_free_list(tdb, list);
313         return -1;
314 }
315
316 /* We need size bytes to put our key and data in. */
317 static tdb_off_t lock_and_alloc(struct tdb_context *tdb,
318                                 tdb_off_t bucket, size_t size,
319                                 tdb_len_t *actual)
320 {
321         tdb_off_t list;
322         tdb_off_t off, prev, best_off;
323         struct tdb_free_record pad, best = { 0 }, *r;
324         double multiplier;
325
326 again:
327         list = tdb->last_zone * (tdb->header.v.free_buckets+1) + bucket;
328
329         /* Lock this list. */
330         if (tdb_lock_free_list(tdb, list, TDB_LOCK_WAIT) == -1) {
331                 return TDB_OFF_ERR;
332         }
333
334         prev = free_list_off(tdb, list);
335         off = tdb_read_off(tdb, prev);
336
337         if (unlikely(off == TDB_OFF_ERR))
338                 goto unlock_err;
339
340         best.data_len = -1ULL;
341         best_off = 0;
342         multiplier = 1.0;
343
344         /* Walk the list to see if any are large enough, getting less fussy
345          * as we go. */
346         while (off) {
347                 prev = off;
348                 off = tdb_read_off(tdb, prev);
349                 if (unlikely(off == TDB_OFF_ERR))
350                         goto unlock_err;
351
352                 r = tdb_get(tdb, off, &pad, sizeof(*r));
353                 if (!r)
354                         goto unlock_err;
355                 if (r->magic != TDB_FREE_MAGIC) {
356                         tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
357                                  "lock_and_alloc: %llu non-free 0x%llx\n",
358                                  (long long)off, (long long)r->magic);
359                         goto unlock_err;
360                 }
361
362                 if (r->data_len >= size && r->data_len < best.data_len) {
363                         best_off = off;
364                         best = *r;
365                 }
366
367                 if (best.data_len < size * multiplier && best_off) {
368                         /* We're happy with this size: take it. */
369                         if (remove_from_list(tdb, list, &best) != 0)
370                                 goto unlock_err;
371                         tdb_unlock_free_list(tdb, list);
372
373                         if (to_used_record(tdb, best_off, size, best.data_len,
374                                            actual)) {
375                                 return -1;
376                         }
377                         return best_off;
378                 }
379                 multiplier *= 1.01;
380
381                 /* Since we're going slow anyway, try coalescing here. */
382                 switch (coalesce(tdb, off, list, r->data_len)) {
383                 case -1:
384                         /* This has already unlocked on error. */
385                         return -1;
386                 case 1:
387                         /* This has unlocked list, restart. */
388                         goto again;
389                 }
390         }
391
392         tdb_unlock_free_list(tdb, list);
393         return 0;
394
395 unlock_err:
396         tdb_unlock_free_list(tdb, list);
397         return TDB_OFF_ERR;
398 }
399
400 /* We want a really big chunk.  Look through every zone's oversize bucket */
401 static tdb_off_t huge_alloc(struct tdb_context *tdb, size_t size,
402                             tdb_len_t *actual)
403 {
404         tdb_off_t i, off;
405
406         do {
407                 for (i = 0; i < tdb->header.v.num_zones; i++) {
408                         /* Try getting one from list. */
409                         off = lock_and_alloc(tdb, tdb->header.v.free_buckets,
410                                              size, actual);
411                         if (off == TDB_OFF_ERR)
412                                 return TDB_OFF_ERR;
413                         if (off != 0)
414                                 return off;
415                         /* FIXME: Coalesce! */
416                 }
417         } while (tdb_expand(tdb, 0, size, false) == 0);
418
419         return TDB_OFF_ERR;
420 }
421
422 static tdb_off_t get_free(struct tdb_context *tdb, size_t size,
423                           tdb_len_t *actual)
424 {
425         tdb_off_t off, bucket;
426         unsigned int num_empty, step = 0;
427
428         bucket = size_to_bucket(tdb, size);
429
430         /* If we're after something bigger than a single zone, handle
431          * specially. */
432         if (unlikely(sizeof(struct tdb_used_record) + size
433                      >= (1ULL << tdb->header.v.zone_bits))) {
434                 return huge_alloc(tdb, size, actual);
435         }
436
437         /* Number of zones we search is proportional to the log of them. */
438         for (num_empty = 0; num_empty < fls64(tdb->header.v.num_zones);
439              num_empty++) {
440                 tdb_off_t b;
441
442                 /* Start at exact size bucket, and search up... */
443                 for (b = bucket; b <= tdb->header.v.num_zones; b++) {
444                         b = find_free_head(tdb, b);
445
446                         /* Non-empty list?  Try getting block. */
447                         if (b <= tdb->header.v.num_zones) {
448                                 /* Try getting one from list. */
449                                 off = lock_and_alloc(tdb, b, size, actual);
450                                 if (off == TDB_OFF_ERR)
451                                         return TDB_OFF_ERR;
452                                 if (off != 0)
453                                         return off;
454                                 /* Didn't work.  Try next bucket. */
455                         }
456                 }
457
458                 /* Try another zone, at pseudo random.  Avoid duplicates by
459                    using an odd step. */
460                 if (step == 0)
461                         step = ((quick_random(tdb)) % 65536) * 2 + 1;
462                 tdb->last_zone = (tdb->last_zone + step)
463                         % tdb->header.v.num_zones;
464         }
465         return 0;
466 }
467
468 int set_header(struct tdb_context *tdb,
469                struct tdb_used_record *rec,
470                uint64_t keylen, uint64_t datalen,
471                uint64_t actuallen, uint64_t hash)
472 {
473         uint64_t keybits = (fls64(keylen) + 1) / 2;
474
475         /* Use top bits of hash, so it's independent of hash table size. */
476         rec->magic_and_meta
477                 = (actuallen - (keylen + datalen))
478                 | ((hash >> 53) << 32)
479                 | (keybits << 43)
480                 | (TDB_MAGIC << 48);
481         rec->key_and_data_len = (keylen | (datalen << (keybits*2)));
482
483         /* Encoding can fail on big values. */
484         if (rec_key_length(rec) != keylen
485             || rec_data_length(rec) != datalen
486             || rec_extra_padding(rec) != actuallen - (keylen + datalen)) {
487                 tdb->ecode = TDB_ERR_IO;
488                 tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
489                          "Could not encode k=%llu,d=%llu,a=%llu\n",
490                          (long long)keylen, (long long)datalen,
491                          (long long)actuallen);
492                 return -1;
493         }
494         return 0;
495 }
496
497 static tdb_len_t adjust_size(size_t keylen, size_t datalen, bool growing)
498 {
499         tdb_len_t size = keylen + datalen;
500
501         if (size < MIN_DATA_LEN)
502                 size = MIN_DATA_LEN;
503
504         /* Overallocate if this is coming from an enlarging store. */
505         if (growing)
506                 size += datalen / 2;
507
508         /* Round to next uint64_t boundary. */
509         return (size + (sizeof(uint64_t) - 1ULL)) & ~(sizeof(uint64_t) - 1ULL);
510 }
511
512 /* If this fails, try tdb_expand. */
513 tdb_off_t alloc(struct tdb_context *tdb, size_t keylen, size_t datalen,
514                 uint64_t hash, bool growing)
515 {
516         tdb_off_t off;
517         tdb_len_t size, actual;
518         struct tdb_used_record rec;
519
520         /* We don't want header to change during this! */
521         assert(tdb->header_uptodate);
522
523         size = adjust_size(keylen, datalen, growing);
524
525         off = get_free(tdb, size, &actual);
526         if (unlikely(off == TDB_OFF_ERR || off == 0))
527                 return off;
528
529         /* Some supergiant values can't be encoded. */
530         if (set_header(tdb, &rec, keylen, datalen, actual, hash) != 0) {
531                 add_free_record(tdb, off, sizeof(rec) + actual);
532                 return TDB_OFF_ERR;
533         }
534
535         if (tdb_write_convert(tdb, off, &rec, sizeof(rec)) != 0)
536                 return TDB_OFF_ERR;
537         
538         return off;
539 }
540
541 static bool larger_buckets_might_help(struct tdb_context *tdb)
542 {
543         /* If our buckets are already covering 1/8 of a zone, don't
544          * bother (note: might become an 1/16 of a zone if we double
545          * zone size). */
546         tdb_len_t size = (1ULL << tdb->header.v.zone_bits) / 8;
547
548         if (size >= MIN_DATA_LEN
549             && size_to_bucket(tdb, size) < tdb->header.v.free_buckets) {
550                 return false;
551         }
552
553         /* FIXME: Put stats in tdb_context or examine db itself! */
554         /* It's fairly cheap to do as we expand database. */
555         return true;
556 }
557
558 static bool zones_happy(struct tdb_context *tdb)
559 {
560         /* FIXME: look at distribution of zones. */
561         return true;
562 }
563
564 /* Expand the database. */
565 int tdb_expand(struct tdb_context *tdb, tdb_len_t klen, tdb_len_t dlen,
566                bool growing)
567 {
568         uint64_t new_num_buckets, new_num_zones, new_zone_bits;
569         uint64_t i, old_num_total, old_num_zones, old_size, old_zone_bits;
570         tdb_len_t add, freebucket_size, needed;
571         tdb_off_t off, old_free_off;
572         const tdb_off_t *oldf;
573         struct tdb_used_record fhdr;
574
575         /* We need room for the record header too. */
576         needed = sizeof(struct tdb_used_record)
577                 + adjust_size(klen, dlen, growing);
578
579         /* tdb_allrecord_lock will update header; did zones change? */
580         old_zone_bits = tdb->header.v.zone_bits;
581         old_num_zones = tdb->header.v.num_zones;
582
583         /* FIXME: this is overkill.  An expand lock? */
584         if (tdb_allrecord_lock(tdb, F_WRLCK, TDB_LOCK_WAIT, false) == -1)
585                 return -1;
586
587         /* Someone may have expanded for us. */
588         if (old_zone_bits != tdb->header.v.zone_bits
589             || old_num_zones != tdb->header.v.num_zones)
590                 goto success;
591
592         /* They may have also expanded the underlying size (otherwise we'd
593          * have expanded our mmap to look at those offsets already). */
594         old_size = tdb->map_size;
595         tdb->methods->oob(tdb, tdb->map_size + 1, true);
596         if (tdb->map_size != old_size)
597                 goto success;
598
599         /* Did we enlarge zones without enlarging file? */
600         if (tdb->map_size < tdb->header.v.num_zones<<tdb->header.v.zone_bits) {
601                 add = (tdb->header.v.num_zones<<tdb->header.v.zone_bits)
602                         - tdb->map_size;
603                 /* Updates tdb->map_size. */
604                 if (tdb->methods->expand_file(tdb, add) == -1)
605                         goto fail;
606                 if (add_free_record(tdb, tdb->map_size - add, add) == -1)
607                         goto fail;
608                 if (add >= needed) {
609                         /* Allocate from this zone. */
610                         tdb->last_zone = zone_of(tdb, tdb->map_size - add);
611                         goto success;
612                 }
613         }
614
615         /* Slow path.  Should we increase the number of buckets? */
616         new_num_buckets = tdb->header.v.free_buckets;
617         if (larger_buckets_might_help(tdb))
618                 new_num_buckets++;
619
620         /* Now we'll need room for the new free buckets, too.  Assume
621          * worst case (zones expand). */
622         needed += sizeof(fhdr)
623                 + ((tdb->header.v.num_zones+1)
624                    * (new_num_buckets+1) * sizeof(tdb_off_t));
625
626         /* If we need less that one zone, and they're working well, just add
627          * another one. */
628         if (needed < (1UL<<tdb->header.v.zone_bits) && zones_happy(tdb)) {
629                 new_num_zones = tdb->header.v.num_zones+1;
630                 new_zone_bits = tdb->header.v.zone_bits;
631                 add = 1ULL << tdb->header.v.zone_bits;
632         } else {
633                 /* Increase the zone size. */
634                 new_num_zones = tdb->header.v.num_zones;
635                 new_zone_bits = tdb->header.v.zone_bits+1;
636                 while ((new_num_zones << new_zone_bits) - tdb->map_size
637                        < needed) {
638                         new_zone_bits++;
639                 }
640
641                 /* We expand by enough zones to meet the need. */
642                 add = (needed + (1ULL << new_zone_bits)-1)
643                         & ~((1ULL << new_zone_bits)-1);
644         }
645
646         /* Updates tdb->map_size. */
647         if (tdb->methods->expand_file(tdb, add) == -1)
648                 goto fail;
649
650         /* Use first part as new free bucket array. */
651         off = tdb->map_size - add;
652         freebucket_size = new_num_zones
653                 * (new_num_buckets + 1) * sizeof(tdb_off_t);
654
655         /* Write header. */
656         if (set_header(tdb, &fhdr, 0, freebucket_size, freebucket_size, 0))
657                 goto fail;
658         if (tdb_write_convert(tdb, off, &fhdr, sizeof(fhdr)) == -1)
659                 goto fail;
660
661         /* Adjust off to point to start of buckets, add to be remainder. */
662         add -= freebucket_size + sizeof(fhdr);
663         off += sizeof(fhdr);
664
665         /* Access the old zones. */
666         old_num_total = tdb->header.v.num_zones*(tdb->header.v.free_buckets+1);
667         old_free_off = tdb->header.v.free_off;
668         oldf = tdb_access_read(tdb, old_free_off,
669                                old_num_total * sizeof(tdb_off_t));
670         if (!oldf)
671                 goto fail;
672
673         /* Switch to using our new zone. */
674         if (zero_out(tdb, off, new_num_zones * (new_num_buckets + 1)) == -1)
675                 goto fail_release;
676         tdb->header.v.free_off = off;
677         tdb->header.v.num_zones = new_num_zones;
678         tdb->header.v.free_buckets = new_num_buckets;
679
680         /* FIXME: If zone size hasn't changed, can simply copy pointers. */
681         /* FIXME: Coalesce? */
682         for (i = 0; i < old_num_total; i++) {
683                 tdb_off_t next;
684                 struct tdb_free_record rec;
685                 tdb_off_t list;
686
687                 for (off = oldf[i]; off; off = next) {
688                         if (tdb_read_convert(tdb, off, &rec, sizeof(rec)))
689                                 goto fail_release;
690
691                         list = zone_of(tdb, off)
692                                 * (tdb->header.v.free_buckets+1)
693                                 + size_to_bucket(tdb, rec.data_len);
694                         next = rec.next;
695                 
696                         if (enqueue_in_free(tdb, list, off, &rec) == -1)
697                                 goto fail_release;
698                 }
699         }
700
701
702         /* Free up the old free buckets. */
703         old_free_off -= sizeof(fhdr);
704         if (tdb_read_convert(tdb, old_free_off, &fhdr, sizeof(fhdr)) == -1)
705                 goto fail_release;
706         if (add_free_record(tdb, old_free_off,
707                             rec_data_length(&fhdr)+rec_extra_padding(&fhdr)))
708                 goto fail_release;
709
710         /* Add the rest as a new free record. */
711         if (add_free_record(tdb, tdb->map_size - add, add) == -1)
712                 goto fail_release;
713
714         /* Start allocating from where the new space is. */
715         tdb->last_zone = zone_of(tdb, tdb->map_size - add);
716         tdb_access_release(tdb, oldf);
717 success:
718         tdb_allrecord_unlock(tdb, F_WRLCK);
719         return 0;
720
721 fail_release:
722         tdb_access_release(tdb, oldf);
723 fail:
724         tdb_allrecord_unlock(tdb, F_WRLCK);
725         return -1;
726 }