]> git.ozlabs.org Git - ccan/blob - ccan/tdb2/free.c
tdb2: initial commit (doesn't work, still writing tests)
[ccan] / ccan / tdb2 / free.c
1  /* 
2    Trivial Database 2: free list/block handling
3    Copyright (C) Rusty Russell 2010
4    
5    This library is free software; you can redistribute it and/or
6    modify it under the terms of the GNU Lesser General Public
7    License as published by the Free Software Foundation; either
8    version 3 of the License, or (at your option) any later version.
9
10    This library is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    Lesser General Public License for more details.
14
15    You should have received a copy of the GNU Lesser General Public
16    License along with this library; if not, see <http://www.gnu.org/licenses/>.
17 */
18 #include "private.h"
19 #include <ccan/likely/likely.h>
20 #include <time.h>
21 #include <assert.h>
22 #include <limits.h>
23
24 /* We have to be able to fit a free record here. */
25 #define MIN_DATA_LEN    \
26         (sizeof(struct tdb_free_record) - sizeof(struct tdb_used_record))
27
28 /* We have a series of free lists, each one covering a "zone" of the file.
29  *
30  * For each zone we have a series of per-size buckets, and a final bucket for
31  * "too big".
32  *
33  * It's possible to move the free_list_head, but *only* under the allrecord
34  * lock. */
35 static tdb_off_t free_list_off(struct tdb_context *tdb, unsigned int list)
36 {
37         return tdb->header.v.free_off + list * sizeof(tdb_off_t);
38 }
39
40 /* We're a library: playing with srandom() is unfriendly.  srandom_r
41  * probably lacks portability.  We don't need very random here. */
42 static unsigned int quick_random(struct tdb_context *tdb)
43 {
44         return getpid() + time(NULL) + (unsigned long)tdb;
45 }
46
47 /* Start by using a random zone to spread the load. */
48 uint64_t random_free_zone(struct tdb_context *tdb)
49 {
50         /* num_zones might be out of date, but can only increase */
51         return quick_random(tdb) % tdb->header.v.num_zones;
52 }
53
54 static unsigned fls64(uint64_t val)
55 {
56 #if HAVE_BUILTIN_CLZL
57         if (val <= ULONG_MAX) {
58                 /* This is significantly faster! */
59                 return val ? sizeof(long) * CHAR_BIT - __builtin_clzl(val) : 0;
60         } else {
61 #endif
62         uint64_t r = 64;
63
64         if (!val)
65                 return 0;
66         if (!(val & 0xffffffff00000000ull)) {
67                 val <<= 32;
68                 r -= 32;
69         }
70         if (!(val & 0xffff000000000000ull)) {
71                 val <<= 16;
72                 r -= 16;
73         }
74         if (!(val & 0xff00000000000000ull)) {
75                 val <<= 8;
76                 r -= 8;
77         }
78         if (!(val & 0xf000000000000000ull)) {
79                 val <<= 4;
80                 r -= 4;
81         }
82         if (!(val & 0xc000000000000000ull)) {
83                 val <<= 2;
84                 r -= 2;
85         }
86         if (!(val & 0x8000000000000000ull)) {
87                 val <<= 1;
88                 r -= 1;
89         }
90         return r;
91 #if HAVE_BUILTIN_CLZL
92         }
93 #endif
94 }
95
96 /* In which bucket would we find a particular record size? (ignoring header) */
97 unsigned int size_to_bucket(struct tdb_context *tdb, tdb_len_t data_len)
98 {
99         unsigned int bucket;
100
101         /* We can't have records smaller than this. */
102         assert(data_len >= MIN_DATA_LEN);
103
104         /* Ignoring the header... */
105         if (data_len - MIN_DATA_LEN <= 64) {
106                 /* 0 in bucket 0, 8 in bucket 1... 64 in bucket 6. */
107                 bucket = (data_len - MIN_DATA_LEN) / 8;
108         } else {
109                 /* After that we go power of 2. */
110                 bucket = fls64(data_len - MIN_DATA_LEN) + 2;
111         }
112
113         if (unlikely(bucket > tdb->header.v.free_buckets))
114                 bucket = tdb->header.v.free_buckets;
115         return bucket;
116 }
117
118 /* What zone does a block belong in? */ 
119 tdb_off_t zone_of(struct tdb_context *tdb, tdb_off_t off)
120 {
121         assert(tdb->header_uptodate);
122
123         return off >> tdb->header.v.zone_bits;
124 }
125
126 /* Returns fl->max_bucket + 1, or list number to search. */
127 static tdb_off_t find_free_head(struct tdb_context *tdb, tdb_off_t bucket)
128 {
129         tdb_off_t first, off;
130
131         /* Speculatively search for a non-zero bucket. */
132         first = tdb->last_zone * (tdb->header.v.free_buckets+1) + bucket;
133         off = tdb_find_nonzero_off(tdb, free_list_off(tdb, first),
134                                    tdb->header.v.free_buckets - bucket);
135         return bucket + off;
136 }
137
138 static int remove_from_list(struct tdb_context *tdb,
139                             tdb_off_t list, struct tdb_free_record *r)
140 {
141         tdb_off_t off;
142
143         /* Front of list? */
144         if (r->prev == 0) {
145                 off = free_list_off(tdb, list);
146         } else {
147                 off = r->prev + offsetof(struct tdb_free_record, next);
148         }
149         /* r->prev->next = r->next */
150         if (tdb_write_off(tdb, off, r->next)) {
151                 return -1;
152         }
153
154         if (r->next != 0) {
155                 off = r->next + offsetof(struct tdb_free_record, prev);
156                 /* r->next->prev = r->prev */
157                 if (tdb_write_off(tdb, off, r->prev)) {
158                         return -1;
159                 }
160         }
161         return 0;
162 }
163
164 /* Enqueue in this free list. */
165 static int enqueue_in_free(struct tdb_context *tdb,
166                            tdb_off_t list,
167                            tdb_off_t off,
168                            struct tdb_free_record *new)
169 {
170         new->prev = 0;
171         /* new->next = head. */
172         new->next = tdb_read_off(tdb, free_list_off(tdb, list));
173         if (new->next == TDB_OFF_ERR)
174                 return -1;
175
176         if (new->next) {
177                 /* next->prev = new. */
178                 if (tdb_write_off(tdb, new->next
179                                   + offsetof(struct tdb_free_record, prev),
180                                   off) != 0)
181                         return -1;
182         }
183         /* head = new */
184         if (tdb_write_off(tdb, free_list_off(tdb, list), off) != 0)
185                 return -1;
186         
187         return tdb_write_convert(tdb, off, new, sizeof(*new));
188 }
189
190 /* List isn't locked. */
191 int add_free_record(struct tdb_context *tdb,
192                     tdb_off_t off, tdb_len_t len_with_header)
193 {
194         struct tdb_free_record new;
195         tdb_off_t list;
196         int ret;
197
198         assert(len_with_header >= sizeof(new));
199
200         new.magic = TDB_FREE_MAGIC;
201         new.data_len = len_with_header - sizeof(struct tdb_used_record);
202
203         tdb->last_zone = zone_of(tdb, off);
204         list = tdb->last_zone * (tdb->header.v.free_buckets+1)
205                 + size_to_bucket(tdb, new.data_len);
206                 
207         if (tdb_lock_free_list(tdb, list, TDB_LOCK_WAIT) != 0)
208                 return -1;
209
210         ret = enqueue_in_free(tdb, list, off, &new);
211         tdb_unlock_free_list(tdb, list);
212         return ret;
213 }
214
215 /* If we have enough left over to be useful, split that off. */
216 static int to_used_record(struct tdb_context *tdb,
217                           tdb_off_t off,
218                           tdb_len_t needed,
219                           tdb_len_t total_len,
220                           tdb_len_t *actual)
221 {
222         struct tdb_used_record used;
223         tdb_len_t leftover;
224
225         leftover = total_len - needed;
226         if (leftover < sizeof(struct tdb_free_record))
227                 leftover = 0;
228
229         *actual = total_len - leftover;
230
231         if (leftover) {
232                 if (add_free_record(tdb, off + sizeof(used) + *actual,
233                                     total_len - needed))
234                         return -1;
235         }
236         return 0;
237 }
238
239 /* Note: we unlock the current list if we coalesce or fail. */
240 static int coalesce(struct tdb_context *tdb, tdb_off_t off,
241                     tdb_off_t list, tdb_len_t data_len)
242 {
243         struct tdb_free_record pad, *r;
244         tdb_off_t end = off + sizeof(struct tdb_used_record) + data_len;
245
246         while (!tdb->methods->oob(tdb, end + sizeof(*r), 1)) {
247                 tdb_off_t nlist;
248
249                 r = tdb_get(tdb, end, &pad, sizeof(pad));
250                 if (!r)
251                         goto err;
252
253                 if (r->magic != TDB_FREE_MAGIC)
254                         break;
255
256                 nlist = zone_of(tdb, end) * (tdb->header.v.free_buckets+1)
257                         + size_to_bucket(tdb, r->data_len);
258
259                 /* We may be violating lock order here, so best effort. */
260                 if (tdb_lock_free_list(tdb, nlist, TDB_LOCK_NOWAIT) == -1)
261                         break;
262
263                 /* Now we have lock, re-check. */
264                 r = tdb_get(tdb, end, &pad, sizeof(pad));
265                 if (!r) {
266                         tdb_unlock_free_list(tdb, nlist);
267                         goto err;
268                 }
269
270                 if (unlikely(r->magic != TDB_FREE_MAGIC)) {
271                         tdb_unlock_free_list(tdb, nlist);
272                         break;
273                 }
274
275                 if (remove_from_list(tdb, list, r) == -1) {
276                         tdb_unlock_free_list(tdb, nlist);
277                         goto err;
278                 }
279
280                 end += sizeof(struct tdb_used_record) + r->data_len;
281                 tdb_unlock_free_list(tdb, nlist);
282         }
283
284         /* Didn't find any adjacent free? */
285         if (end == off + sizeof(struct tdb_used_record) + data_len)
286                 return 0;
287
288         /* OK, expand record */
289         r = tdb_get(tdb, off, &pad, sizeof(pad));
290         if (!r)
291                 goto err;
292
293         if (remove_from_list(tdb, list, r) == -1)
294                 goto err;
295
296         /* We have to drop this to avoid deadlocks. */
297         tdb_unlock_free_list(tdb, list);
298
299         if (add_free_record(tdb, off, end - off) == -1)
300                 return -1;
301         return 1;
302
303 err:
304         /* To unify error paths, we *always* unlock list. */
305         tdb_unlock_free_list(tdb, list);
306         return -1;
307 }
308
309 /* We need size bytes to put our key and data in. */
310 static tdb_off_t lock_and_alloc(struct tdb_context *tdb,
311                                 tdb_off_t bucket, size_t size,
312                                 tdb_len_t *actual)
313 {
314         tdb_off_t list;
315         tdb_off_t off, prev, best_off;
316         struct tdb_free_record pad, best = { 0 }, *r;
317         double multiplier;
318
319 again:
320         list = tdb->last_zone * (tdb->header.v.free_buckets+1) + bucket;
321
322         /* Lock this list. */
323         if (tdb_lock_free_list(tdb, list, TDB_LOCK_WAIT) == -1) {
324                 return TDB_OFF_ERR;
325         }
326
327         prev = free_list_off(tdb, list);
328         off = tdb_read_off(tdb, prev);
329
330         if (unlikely(off == TDB_OFF_ERR))
331                 goto unlock_err;
332
333         best.data_len = -1ULL;
334         best_off = 0;
335         multiplier = 1.0;
336
337         /* Walk the list to see if any are large enough, getting less fussy
338          * as we go. */
339         while (off) {
340                 prev = off;
341                 off = tdb_read_off(tdb, prev);
342                 if (unlikely(off == TDB_OFF_ERR))
343                         goto unlock_err;
344
345                 r = tdb_get(tdb, off, &pad, sizeof(*r));
346                 if (!r)
347                         goto unlock_err;
348                 if (r->magic != TDB_FREE_MAGIC) {
349                         tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
350                                  "lock_and_alloc: %llu non-free 0x%llx\n",
351                                  (long long)off, (long long)r->magic);
352                         goto unlock_err;
353                 }
354
355                 if (r->data_len >= size && r->data_len < best.data_len) {
356                         best_off = off;
357                         best = *r;
358                 }
359
360                 if (best.data_len < size * multiplier && best_off) {
361                         /* We're happy with this size: take it. */
362                         if (remove_from_list(tdb, list, &best) != 0)
363                                 goto unlock_err;
364                         tdb_unlock_free_list(tdb, list);
365
366                         if (to_used_record(tdb, best_off, size, best.data_len,
367                                            actual)) {
368                                 return -1;
369                         }
370                         return best_off;
371                 }
372                 multiplier *= 1.01;
373
374                 /* Since we're going slow anyway, try coalescing here. */
375                 switch (coalesce(tdb, off, list, r->data_len)) {
376                 case -1:
377                         /* This has already unlocked on error. */
378                         return -1;
379                 case 1:
380                         /* This has unlocked list, restart. */
381                         goto again;
382                 }
383         }
384
385         tdb_unlock_free_list(tdb, list);
386         return 0;
387
388 unlock_err:
389         tdb_unlock_free_list(tdb, list);
390         return TDB_OFF_ERR;
391 }
392
393 /* We want a really big chunk.  Look through every zone's oversize bucket */
394 static tdb_off_t huge_alloc(struct tdb_context *tdb, size_t size,
395                             tdb_len_t *actual)
396 {
397         tdb_off_t i, off;
398
399         do {
400                 for (i = 0; i < tdb->header.v.num_zones; i++) {
401                         /* Try getting one from list. */
402                         off = lock_and_alloc(tdb, tdb->header.v.free_buckets,
403                                              size, actual);
404                         if (off == TDB_OFF_ERR)
405                                 return TDB_OFF_ERR;
406                         if (off != 0)
407                                 return off;
408                         /* FIXME: Coalesce! */
409                 }
410         } while (tdb_expand(tdb, 0, size, false) == 0);
411
412         return TDB_OFF_ERR;
413 }
414
415 static tdb_off_t get_free(struct tdb_context *tdb, size_t size,
416                           tdb_len_t *actual)
417 {
418         tdb_off_t off, bucket;
419         unsigned int num_empty, step = 0;
420
421         bucket = size_to_bucket(tdb, size);
422
423         /* If we're after something bigger than a single zone, handle
424          * specially. */
425         if (unlikely(sizeof(struct tdb_used_record) + size
426                      >= (1ULL << tdb->header.v.zone_bits))) {
427                 return huge_alloc(tdb, size, actual);
428         }
429
430         /* Number of zones we search is proportional to the log of them. */
431         for (num_empty = 0; num_empty < fls64(tdb->header.v.num_zones);
432              num_empty++) {
433                 tdb_off_t b;
434
435                 /* Start at exact size bucket, and search up... */
436                 for (b = bucket; b <= tdb->header.v.num_zones; b++) {
437                         b = find_free_head(tdb, b);
438
439                         /* Non-empty list?  Try getting block. */
440                         if (b <= tdb->header.v.num_zones) {
441                                 /* Try getting one from list. */
442                                 off = lock_and_alloc(tdb, b, size, actual);
443                                 if (off == TDB_OFF_ERR)
444                                         return TDB_OFF_ERR;
445                                 if (off != 0)
446                                         return off;
447                                 /* Didn't work.  Try next bucket. */
448                         }
449                 }
450
451                 /* Try another zone, at pseudo random.  Avoid duplicates by
452                    using an odd step. */
453                 if (step == 0)
454                         step = ((quick_random(tdb)) % 65536) * 2 + 1;
455                 tdb->last_zone = (tdb->last_zone + step)
456                         % tdb->header.v.num_zones;
457         }
458         return 0;
459 }
460
461 int set_header(struct tdb_context *tdb,
462                struct tdb_used_record *rec,
463                uint64_t keylen, uint64_t datalen,
464                uint64_t actuallen, uint64_t hash)
465 {
466         uint64_t keybits = (fls64(keylen) + 1) / 2;
467
468         /* Use top bits of hash, so it's independent of hash table size. */
469         rec->magic_and_meta
470                 = (actuallen - (keylen + datalen))
471                 | ((hash >> 53) << 32)
472                 | (keybits << 43)
473                 | (TDB_MAGIC << 48);
474         rec->key_and_data_len = (keylen | (datalen << (keybits*2)));
475
476         /* Encoding can fail on big values. */
477         if (rec_key_length(rec) != keylen
478             || rec_data_length(rec) != datalen
479             || rec_extra_padding(rec) != actuallen - (keylen + datalen)) {
480                 tdb->ecode = TDB_ERR_IO;
481                 tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
482                          "Could not encode k=%llu,d=%llu,a=%llu\n",
483                          (long long)keylen, (long long)datalen,
484                          (long long)actuallen);
485                 return -1;
486         }
487         return 0;
488 }
489
490 static tdb_len_t adjust_size(size_t keylen, size_t datalen, bool growing)
491 {
492         tdb_len_t size = keylen + datalen;
493
494         if (size < MIN_DATA_LEN)
495                 size = MIN_DATA_LEN;
496
497         /* Overallocate if this is coming from an enlarging store. */
498         if (growing)
499                 size += datalen / 2;
500
501         /* Round to next uint64_t boundary. */
502         return (size + (sizeof(uint64_t) - 1ULL)) & ~(sizeof(uint64_t) - 1ULL);
503 }
504
505 /* If this fails, try tdb_expand. */
506 tdb_off_t alloc(struct tdb_context *tdb, size_t keylen, size_t datalen,
507                 uint64_t hash, bool growing)
508 {
509         tdb_off_t off;
510         tdb_len_t size, actual;
511         struct tdb_used_record rec;
512
513         /* We don't want header to change during this! */
514         assert(tdb->header_uptodate);
515
516         size = adjust_size(keylen, datalen, growing);
517
518         off = get_free(tdb, size, &actual);
519         if (unlikely(off == TDB_OFF_ERR || off == 0))
520                 return off;
521
522         /* Some supergiant values can't be encoded. */
523         if (set_header(tdb, &rec, keylen, datalen, actual, hash) != 0) {
524                 add_free_record(tdb, off, sizeof(rec) + actual);
525                 return TDB_OFF_ERR;
526         }
527
528         if (tdb_write_convert(tdb, off, &rec, sizeof(rec)) != 0)
529                 return TDB_OFF_ERR;
530         
531         return off;
532 }
533
534 static bool larger_buckets_might_help(struct tdb_context *tdb)
535 {
536         /* If our buckets are already covering 1/8 of a zone, don't
537          * bother (note: might become an 1/16 of a zone if we double
538          * zone size). */
539         tdb_len_t size = (1ULL << tdb->header.v.zone_bits) / 8;
540
541         if (size >= MIN_DATA_LEN
542             && size_to_bucket(tdb, size) < tdb->header.v.free_buckets) {
543                 return false;
544         }
545
546         /* FIXME: Put stats in tdb_context or examine db itself! */
547         /* It's fairly cheap to do as we expand database. */
548         return true;
549 }
550
551 static bool zones_happy(struct tdb_context *tdb)
552 {
553         /* FIXME: look at distribution of zones. */
554         return true;
555 }
556
557 /* Expand the database. */
558 int tdb_expand(struct tdb_context *tdb, tdb_len_t klen, tdb_len_t dlen,
559                bool growing)
560 {
561         uint64_t new_num_buckets, new_num_zones, new_zone_bits;
562         uint64_t old_num_total, i;
563         tdb_len_t add, freebucket_size, needed;
564         tdb_off_t off, old_free_off;
565         const tdb_off_t *oldf;
566         struct tdb_used_record fhdr;
567         
568         /* We need room for the record header too. */
569         needed = sizeof(struct tdb_used_record)
570                 + adjust_size(klen, dlen, growing);
571
572         /* FIXME: this is overkill.  An expand lock? */
573         if (tdb_allrecord_lock(tdb, F_WRLCK, TDB_LOCK_WAIT, false) == -1)
574                 return -1;
575
576         /* Someone may have expanded for us. */
577         if (update_header(tdb))
578                 goto success;
579
580         /* Make sure we have the latest size. */
581         tdb->methods->oob(tdb, tdb->map_size + 1, true);
582
583         /* Did we enlarge zones without enlarging file? */
584         if (tdb->map_size < tdb->header.v.num_zones<<tdb->header.v.zone_bits) {
585                 add = (tdb->header.v.num_zones<<tdb->header.v.zone_bits)
586                         - tdb->map_size;
587                 /* Updates tdb->map_size. */
588                 if (tdb->methods->expand_file(tdb, tdb->map_size, add) == -1)
589                         goto fail;
590                 if (add_free_record(tdb, tdb->map_size - add, add) == -1)
591                         goto fail;
592                 if (add >= needed) {
593                         /* Allocate from this zone. */
594                         tdb->last_zone = zone_of(tdb, tdb->map_size - add);
595                         goto success;
596                 }
597         }
598
599         /* Slow path.  Should we increase the number of buckets? */
600         new_num_buckets = tdb->header.v.free_buckets;
601         if (larger_buckets_might_help(tdb))
602                 new_num_buckets++;
603
604         /* Now we'll need room for the new free buckets, too.  Assume
605          * worst case (zones expand). */
606         needed += sizeof(fhdr)
607                 + ((tdb->header.v.num_zones+1)
608                    * (new_num_buckets+1) * sizeof(tdb_off_t));
609
610         /* If we need less that one zone, and they're working well, just add
611          * another one. */
612         if (needed < (1UL<<tdb->header.v.zone_bits) && zones_happy(tdb)) {
613                 new_num_zones = tdb->header.v.num_zones+1;
614                 new_zone_bits = tdb->header.v.zone_bits;
615                 add = 1ULL << tdb->header.v.zone_bits;
616         } else {
617                 /* Increase the zone size. */
618                 new_num_zones = tdb->header.v.num_zones;
619                 new_zone_bits = tdb->header.v.zone_bits+1;
620                 while ((new_num_zones << new_zone_bits) - tdb->map_size
621                        < needed) {
622                         new_zone_bits++;
623                 }
624
625                 /* We expand by enough zones to meet the need. */
626                 add = (needed + (1ULL << new_zone_bits)-1)
627                         & ~((1ULL << new_zone_bits)-1);
628         }
629
630         /* Updates tdb->map_size. */
631         if (tdb->methods->expand_file(tdb, tdb->map_size, add) == -1)
632                 goto fail;
633
634         /* Use first part as new free bucket array. */
635         off = tdb->map_size - add;
636         freebucket_size = new_num_zones
637                 * (new_num_buckets + 1) * sizeof(tdb_off_t);
638
639         /* Write header. */
640         if (set_header(tdb, &fhdr, 0, freebucket_size, freebucket_size, 0))
641                 goto fail;
642         if (tdb_write_convert(tdb, off, &fhdr, sizeof(fhdr)) == -1)
643                 goto fail;
644
645         /* Adjust off to point to start of buckets, add to be remainder. */
646         add -= freebucket_size + sizeof(fhdr);
647         off += sizeof(fhdr);
648
649         /* Access the old zones. */
650         old_num_total = tdb->header.v.num_zones*(tdb->header.v.free_buckets+1);
651         old_free_off = tdb->header.v.free_off;
652         oldf = tdb_access_read(tdb, old_free_off,
653                                old_num_total * sizeof(tdb_off_t));
654         if (!oldf)
655                 goto fail;
656
657         /* Switch to using our new zone. */
658         if (zero_out(tdb, off, new_num_zones * (new_num_buckets + 1)) == -1)
659                 goto fail_release;
660         tdb->header.v.free_off = off;
661         tdb->header.v.num_zones = new_num_zones;
662         tdb->header.v.free_buckets = new_num_buckets;
663
664         /* FIXME: If zone size hasn't changed, can simply copy pointers. */
665         /* FIXME: Coalesce? */
666         for (i = 0; i < old_num_total; i++) {
667                 tdb_off_t next;
668                 struct tdb_free_record rec;
669                 tdb_off_t list;
670
671                 for (off = oldf[i]; off; off = next) {
672                         if (tdb_read_convert(tdb, off, &rec, sizeof(rec)))
673                                 goto fail_release;
674
675                         list = zone_of(tdb, off)
676                                 * (tdb->header.v.free_buckets+1)
677                                 + size_to_bucket(tdb, rec.data_len);
678                         next = rec.next;
679                 
680                         if (enqueue_in_free(tdb, list, off, &rec) == -1)
681                                 goto fail_release;
682                 }
683         }
684
685
686         /* Free up the old free buckets. */
687         old_free_off -= sizeof(fhdr);
688         if (tdb_read_convert(tdb, old_free_off, &fhdr, sizeof(fhdr)) == -1)
689                 goto fail_release;
690         if (add_free_record(tdb, old_free_off,
691                             rec_data_length(&fhdr)+rec_extra_padding(&fhdr)))
692                 goto fail_release;
693
694         /* Add the rest as a new free record. */
695         if (add_free_record(tdb, tdb->map_size - add, add) == -1)
696                 goto fail_release;
697
698         /* Start allocating from where the new space is. */
699         tdb->last_zone = zone_of(tdb, tdb->map_size - add);
700         tdb_access_release(tdb, oldf);
701 success:
702         tdb_allrecord_unlock(tdb, F_WRLCK);
703         return 0;
704
705 fail_release:
706         tdb_access_release(tdb, oldf);
707 fail:
708         tdb_allrecord_unlock(tdb, F_WRLCK);
709         return -1;
710 }