]> git.ozlabs.org Git - ccan/blob - ccan/tdb2/tdb1_tdb.c
2c989bb9a4fb16a3a1db501fd76c87d4f43a8461
[ccan] / ccan / tdb2 / tdb1_tdb.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              1999-2005
7    Copyright (C) Paul `Rusty' Russell              2000
8    Copyright (C) Jeremy Allison                    2000-2003
9
10      ** NOTE! The following LGPL license applies to the tdb
11      ** library. This does NOT imply that all of Samba is released
12      ** under the LGPL
13
14    This library is free software; you can redistribute it and/or
15    modify it under the terms of the GNU Lesser General Public
16    License as published by the Free Software Foundation; either
17    version 3 of the License, or (at your option) any later version.
18
19    This library is distributed in the hope that it will be useful,
20    but WITHOUT ANY WARRANTY; without even the implied warranty of
21    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22    Lesser General Public License for more details.
23
24    You should have received a copy of the GNU Lesser General Public
25    License along with this library; if not, see <http://www.gnu.org/licenses/>.
26 */
27
28 #include "tdb1_private.h"
29 #include <assert.h>
30
31 TDB_DATA tdb1_null;
32
33 /*
34   non-blocking increment of the tdb sequence number if the tdb has been opened using
35   the TDB_SEQNUM flag
36 */
37 void tdb1_increment_seqnum_nonblock(struct tdb_context *tdb)
38 {
39         tdb1_off_t seqnum=0;
40
41         if (!(tdb->flags & TDB_SEQNUM)) {
42                 return;
43         }
44
45         /* we ignore errors from this, as we have no sane way of
46            dealing with them.
47         */
48         tdb1_ofs_read(tdb, TDB1_SEQNUM_OFS, &seqnum);
49         seqnum++;
50         tdb1_ofs_write(tdb, TDB1_SEQNUM_OFS, &seqnum);
51 }
52
53 /*
54   increment the tdb sequence number if the tdb has been opened using
55   the TDB_SEQNUM flag
56 */
57 static void tdb1_increment_seqnum(struct tdb_context *tdb)
58 {
59         if (!(tdb->flags & TDB_SEQNUM)) {
60                 return;
61         }
62
63         if (tdb1_nest_lock(tdb, TDB1_SEQNUM_OFS, F_WRLCK,
64                            TDB_LOCK_WAIT|TDB_LOCK_PROBE) != 0) {
65                 return;
66         }
67
68         tdb1_increment_seqnum_nonblock(tdb);
69
70         tdb1_nest_unlock(tdb, TDB1_SEQNUM_OFS, F_WRLCK);
71 }
72
73 static int tdb1_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
74 {
75         return memcmp(data.dptr, key.dptr, data.dsize);
76 }
77
78 /* Returns 0 on fail.  On success, return offset of record, and fills
79    in rec */
80 static tdb1_off_t tdb1_find(struct tdb_context *tdb, TDB_DATA key, uint32_t hash,
81                         struct tdb1_record *r)
82 {
83         tdb1_off_t rec_ptr;
84
85         /* read in the hash top */
86         if (tdb1_ofs_read(tdb, TDB1_HASH_TOP(hash), &rec_ptr) == -1)
87                 return 0;
88
89         /* keep looking until we find the right record */
90         while (rec_ptr) {
91                 if (tdb1_rec_read(tdb, rec_ptr, r) == -1)
92                         return 0;
93
94                 if (!TDB1_DEAD(r) && hash==r->full_hash
95                     && key.dsize==r->key_len
96                     && tdb1_parse_data(tdb, key, rec_ptr + sizeof(*r),
97                                       r->key_len, tdb1_key_compare,
98                                       NULL) == 0) {
99                         return rec_ptr;
100                 }
101                 /* detect tight infinite loop */
102                 if (rec_ptr == r->next) {
103                         tdb->last_error = tdb_logerr(tdb, TDB_ERR_CORRUPT,
104                                                 TDB_LOG_ERROR,
105                                                 "tdb1_find: loop detected.");
106                         return 0;
107                 }
108                 rec_ptr = r->next;
109         }
110         tdb->last_error = TDB_ERR_NOEXIST;
111         return 0;
112 }
113
114 /* As tdb1_find, but if you succeed, keep the lock */
115 tdb1_off_t tdb1_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash, int locktype,
116                            struct tdb1_record *rec)
117 {
118         uint32_t rec_ptr;
119
120         if (tdb1_lock(tdb, TDB1_BUCKET(hash), locktype) == -1)
121                 return 0;
122         if (!(rec_ptr = tdb1_find(tdb, key, hash, rec)))
123                 tdb1_unlock(tdb, TDB1_BUCKET(hash), locktype);
124         return rec_ptr;
125 }
126
127 static TDB_DATA _tdb1_fetch(struct tdb_context *tdb, TDB_DATA key);
128
129 /* update an entry in place - this only works if the new data size
130    is <= the old data size and the key exists.
131    on failure return -1.
132 */
133 static int tdb1_update_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash, TDB_DATA dbuf)
134 {
135         struct tdb1_record rec;
136         tdb1_off_t rec_ptr;
137
138         /* find entry */
139         if (!(rec_ptr = tdb1_find(tdb, key, hash, &rec)))
140                 return -1;
141
142         /* it could be an exact duplicate of what is there - this is
143          * surprisingly common (eg. with a ldb re-index). */
144         if (rec.key_len == key.dsize &&
145             rec.data_len == dbuf.dsize &&
146             rec.full_hash == hash) {
147                 TDB_DATA data = _tdb1_fetch(tdb, key);
148                 if (data.dsize == dbuf.dsize &&
149                     memcmp(data.dptr, dbuf.dptr, data.dsize) == 0) {
150                         if (data.dptr) {
151                                 free(data.dptr);
152                         }
153                         return 0;
154                 }
155                 if (data.dptr) {
156                         free(data.dptr);
157                 }
158         }
159
160         /* must be long enough key, data and tailer */
161         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb1_off_t)) {
162                 tdb->last_error = TDB_SUCCESS; /* Not really an error */
163                 return -1;
164         }
165
166         if (tdb->tdb1.io->tdb1_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
167                       dbuf.dptr, dbuf.dsize) == -1)
168                 return -1;
169
170         if (dbuf.dsize != rec.data_len) {
171                 /* update size */
172                 rec.data_len = dbuf.dsize;
173                 return tdb1_rec_write(tdb, rec_ptr, &rec);
174         }
175
176         return 0;
177 }
178
179 /* find an entry in the database given a key */
180 /* If an entry doesn't exist tdb1_err will be set to
181  * TDB_ERR_NOEXIST. If a key has no data attached
182  * then the TDB_DATA will have zero length but
183  * a non-zero pointer
184  */
185 static TDB_DATA _tdb1_fetch(struct tdb_context *tdb, TDB_DATA key)
186 {
187         tdb1_off_t rec_ptr;
188         struct tdb1_record rec;
189         TDB_DATA ret;
190         uint32_t hash;
191
192         /* find which hash bucket it is in */
193         hash = tdb_hash(tdb, key.dptr, key.dsize);
194         if (!(rec_ptr = tdb1_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
195                 return tdb1_null;
196
197         ret.dptr = tdb1_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
198                                   rec.data_len);
199         ret.dsize = rec.data_len;
200         tdb1_unlock(tdb, TDB1_BUCKET(rec.full_hash), F_RDLCK);
201         return ret;
202 }
203
204 enum TDB_ERROR tdb1_fetch(struct tdb_context *tdb, TDB_DATA key, TDB_DATA *data)
205 {
206         *data = _tdb1_fetch(tdb, key);
207         if (data->dptr == NULL)
208                 return tdb->last_error;
209         return TDB_SUCCESS;
210 }
211
212 /*
213  * Find an entry in the database and hand the record's data to a parsing
214  * function. The parsing function is executed under the chain read lock, so it
215  * should be fast and should not block on other syscalls.
216  *
217  * DON'T CALL OTHER TDB CALLS FROM THE PARSER, THIS MIGHT LEAD TO SEGFAULTS.
218  *
219  * For mmapped tdb's that do not have a transaction open it points the parsing
220  * function directly at the mmap area, it avoids the malloc/memcpy in this
221  * case. If a transaction is open or no mmap is available, it has to do
222  * malloc/read/parse/free.
223  *
224  * This is interesting for all readers of potentially large data structures in
225  * the tdb records, ldb indexes being one example.
226  *
227  * Return -1 if the record was not found.
228  */
229
230 int tdb1_parse_record(struct tdb_context *tdb, TDB_DATA key,
231                      int (*parser)(TDB_DATA key, TDB_DATA data,
232                                    void *private_data),
233                      void *private_data)
234 {
235         tdb1_off_t rec_ptr;
236         struct tdb1_record rec;
237         int ret;
238         uint32_t hash;
239
240         /* find which hash bucket it is in */
241         hash = tdb_hash(tdb, key.dptr, key.dsize);
242
243         if (!(rec_ptr = tdb1_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
244                 /* record not found */
245                 tdb->last_error = TDB_ERR_NOEXIST;
246                 return -1;
247         }
248
249         ret = tdb1_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
250                              rec.data_len, parser, private_data);
251
252         tdb1_unlock(tdb, TDB1_BUCKET(rec.full_hash), F_RDLCK);
253
254         return ret;
255 }
256
257 /* check if an entry in the database exists
258
259    note that 1 is returned if the key is found and 0 is returned if not found
260    this doesn't match the conventions in the rest of this module, but is
261    compatible with gdbm
262 */
263 static int tdb1_exists_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
264 {
265         struct tdb1_record rec;
266
267         if (tdb1_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
268                 return 0;
269         tdb1_unlock(tdb, TDB1_BUCKET(rec.full_hash), F_RDLCK);
270         return 1;
271 }
272
273 int tdb1_exists(struct tdb_context *tdb, TDB_DATA key)
274 {
275         uint32_t hash = tdb_hash(tdb, key.dptr, key.dsize);
276         int ret;
277
278         assert(tdb->flags & TDB_VERSION1);
279         ret = tdb1_exists_hash(tdb, key, hash);
280         return ret;
281 }
282
283 /* actually delete an entry in the database given the offset */
284 int tdb1_do_delete(struct tdb_context *tdb, tdb1_off_t rec_ptr, struct tdb1_record *rec)
285 {
286         tdb1_off_t last_ptr, i;
287         struct tdb1_record lastrec;
288
289         if ((tdb->flags & TDB_RDONLY) || tdb->tdb1.traverse_read) return -1;
290
291         if (((tdb->tdb1.traverse_write != 0) && (!TDB1_DEAD(rec))) ||
292             tdb1_write_lock_record(tdb, rec_ptr) == -1) {
293                 /* Someone traversing here: mark it as dead */
294                 rec->magic = TDB1_DEAD_MAGIC;
295                 return tdb1_rec_write(tdb, rec_ptr, rec);
296         }
297         if (tdb1_write_unlock_record(tdb, rec_ptr) != 0)
298                 return -1;
299
300         /* find previous record in hash chain */
301         if (tdb1_ofs_read(tdb, TDB1_HASH_TOP(rec->full_hash), &i) == -1)
302                 return -1;
303         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
304                 if (tdb1_rec_read(tdb, i, &lastrec) == -1)
305                         return -1;
306
307         /* unlink it: next ptr is at start of record. */
308         if (last_ptr == 0)
309                 last_ptr = TDB1_HASH_TOP(rec->full_hash);
310         if (tdb1_ofs_write(tdb, last_ptr, &rec->next) == -1)
311                 return -1;
312
313         /* recover the space */
314         if (tdb1_free(tdb, rec_ptr, rec) == -1)
315                 return -1;
316         return 0;
317 }
318
319 static int tdb1_count_dead(struct tdb_context *tdb, uint32_t hash)
320 {
321         int res = 0;
322         tdb1_off_t rec_ptr;
323         struct tdb1_record rec;
324
325         /* read in the hash top */
326         if (tdb1_ofs_read(tdb, TDB1_HASH_TOP(hash), &rec_ptr) == -1)
327                 return 0;
328
329         while (rec_ptr) {
330                 if (tdb1_rec_read(tdb, rec_ptr, &rec) == -1)
331                         return 0;
332
333                 if (rec.magic == TDB1_DEAD_MAGIC) {
334                         res += 1;
335                 }
336                 rec_ptr = rec.next;
337         }
338         return res;
339 }
340
341 /*
342  * Purge all DEAD records from a hash chain
343  */
344 static int tdb1_purge_dead(struct tdb_context *tdb, uint32_t hash)
345 {
346         int res = -1;
347         struct tdb1_record rec;
348         tdb1_off_t rec_ptr;
349
350         if (tdb1_lock(tdb, -1, F_WRLCK) == -1) {
351                 return -1;
352         }
353
354         /* read in the hash top */
355         if (tdb1_ofs_read(tdb, TDB1_HASH_TOP(hash), &rec_ptr) == -1)
356                 goto fail;
357
358         while (rec_ptr) {
359                 tdb1_off_t next;
360
361                 if (tdb1_rec_read(tdb, rec_ptr, &rec) == -1) {
362                         goto fail;
363                 }
364
365                 next = rec.next;
366
367                 if (rec.magic == TDB1_DEAD_MAGIC
368                     && tdb1_do_delete(tdb, rec_ptr, &rec) == -1) {
369                         goto fail;
370                 }
371                 rec_ptr = next;
372         }
373         res = 0;
374  fail:
375         tdb1_unlock(tdb, -1, F_WRLCK);
376         return res;
377 }
378
379 /* delete an entry in the database given a key */
380 static int tdb1_delete_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
381 {
382         tdb1_off_t rec_ptr;
383         struct tdb1_record rec;
384         int ret;
385
386         if (tdb->tdb1.max_dead_records != 0) {
387
388                 /*
389                  * Allow for some dead records per hash chain, mainly for
390                  * tdb's with a very high create/delete rate like locking.tdb.
391                  */
392
393                 if (tdb1_lock(tdb, TDB1_BUCKET(hash), F_WRLCK) == -1)
394                         return -1;
395
396                 if (tdb1_count_dead(tdb, hash) >= tdb->tdb1.max_dead_records) {
397                         /*
398                          * Don't let the per-chain freelist grow too large,
399                          * delete all existing dead records
400                          */
401                         tdb1_purge_dead(tdb, hash);
402                 }
403
404                 if (!(rec_ptr = tdb1_find(tdb, key, hash, &rec))) {
405                         tdb1_unlock(tdb, TDB1_BUCKET(hash), F_WRLCK);
406                         return -1;
407                 }
408
409                 /*
410                  * Just mark the record as dead.
411                  */
412                 rec.magic = TDB1_DEAD_MAGIC;
413                 ret = tdb1_rec_write(tdb, rec_ptr, &rec);
414         }
415         else {
416                 if (!(rec_ptr = tdb1_find_lock_hash(tdb, key, hash, F_WRLCK,
417                                                    &rec)))
418                         return -1;
419
420                 ret = tdb1_do_delete(tdb, rec_ptr, &rec);
421         }
422
423         if (ret == 0) {
424                 tdb1_increment_seqnum(tdb);
425         }
426
427         if (tdb1_unlock(tdb, TDB1_BUCKET(rec.full_hash), F_WRLCK) != 0)
428                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
429                            "tdb1_delete: WARNING tdb1_unlock failed!");
430         return ret;
431 }
432
433 int tdb1_delete(struct tdb_context *tdb, TDB_DATA key)
434 {
435         uint32_t hash = tdb_hash(tdb, key.dptr, key.dsize);
436         int ret;
437
438         assert(tdb->flags & TDB_VERSION1);
439         ret = tdb1_delete_hash(tdb, key, hash);
440         return ret;
441 }
442
443 /*
444  * See if we have a dead record around with enough space
445  */
446 static tdb1_off_t tdb1_find_dead(struct tdb_context *tdb, uint32_t hash,
447                                struct tdb1_record *r, tdb1_len_t length)
448 {
449         tdb1_off_t rec_ptr;
450
451         /* read in the hash top */
452         if (tdb1_ofs_read(tdb, TDB1_HASH_TOP(hash), &rec_ptr) == -1)
453                 return 0;
454
455         /* keep looking until we find the right record */
456         while (rec_ptr) {
457                 if (tdb1_rec_read(tdb, rec_ptr, r) == -1)
458                         return 0;
459
460                 if (TDB1_DEAD(r) && r->rec_len >= length) {
461                         /*
462                          * First fit for simple coding, TODO: change to best
463                          * fit
464                          */
465                         return rec_ptr;
466                 }
467                 rec_ptr = r->next;
468         }
469         return 0;
470 }
471
472 static int _tdb1_store(struct tdb_context *tdb, TDB_DATA key,
473                        TDB_DATA dbuf, int flag, uint32_t hash)
474 {
475         struct tdb1_record rec;
476         tdb1_off_t rec_ptr;
477         char *p = NULL;
478         int ret = -1;
479
480         /* check for it existing, on insert. */
481         if (flag == TDB_INSERT) {
482                 if (tdb1_exists_hash(tdb, key, hash)) {
483                         tdb->last_error = TDB_ERR_EXISTS;
484                         goto fail;
485                 }
486         } else {
487                 /* first try in-place update, on modify or replace. */
488                 if (tdb1_update_hash(tdb, key, hash, dbuf) == 0) {
489                         goto done;
490                 }
491                 if (tdb->last_error == TDB_ERR_NOEXIST &&
492                     flag == TDB_MODIFY) {
493                         /* if the record doesn't exist and we are in TDB1_MODIFY mode then
494                          we should fail the store */
495                         goto fail;
496                 }
497         }
498         /* reset the error code potentially set by the tdb1_update() */
499         tdb->last_error = TDB_SUCCESS;
500
501         /* delete any existing record - if it doesn't exist we don't
502            care.  Doing this first reduces fragmentation, and avoids
503            coalescing with `allocated' block before it's updated. */
504         if (flag != TDB_INSERT)
505                 tdb1_delete_hash(tdb, key, hash);
506
507         /* Copy key+value *before* allocating free space in case malloc
508            fails and we are left with a dead spot in the tdb. */
509
510         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
511                 tdb->last_error = TDB_ERR_OOM;
512                 goto fail;
513         }
514
515         memcpy(p, key.dptr, key.dsize);
516         if (dbuf.dsize)
517                 memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
518
519         if (tdb->tdb1.max_dead_records != 0) {
520                 /*
521                  * Allow for some dead records per hash chain, look if we can
522                  * find one that can hold the new record. We need enough space
523                  * for key, data and tailer. If we find one, we don't have to
524                  * consult the central freelist.
525                  */
526                 rec_ptr = tdb1_find_dead(
527                         tdb, hash, &rec,
528                         key.dsize + dbuf.dsize + sizeof(tdb1_off_t));
529
530                 if (rec_ptr != 0) {
531                         rec.key_len = key.dsize;
532                         rec.data_len = dbuf.dsize;
533                         rec.full_hash = hash;
534                         rec.magic = TDB1_MAGIC;
535                         if (tdb1_rec_write(tdb, rec_ptr, &rec) == -1
536                             || tdb->tdb1.io->tdb1_write(
537                                     tdb, rec_ptr + sizeof(rec),
538                                     p, key.dsize + dbuf.dsize) == -1) {
539                                 goto fail;
540                         }
541                         goto done;
542                 }
543         }
544
545         /*
546          * We have to allocate some space from the freelist, so this means we
547          * have to lock it. Use the chance to purge all the DEAD records from
548          * the hash chain under the freelist lock.
549          */
550
551         if (tdb1_lock(tdb, -1, F_WRLCK) == -1) {
552                 goto fail;
553         }
554
555         if ((tdb->tdb1.max_dead_records != 0)
556             && (tdb1_purge_dead(tdb, hash) == -1)) {
557                 tdb1_unlock(tdb, -1, F_WRLCK);
558                 goto fail;
559         }
560
561         /* we have to allocate some space */
562         rec_ptr = tdb1_allocate(tdb, key.dsize + dbuf.dsize, &rec);
563
564         tdb1_unlock(tdb, -1, F_WRLCK);
565
566         if (rec_ptr == 0) {
567                 goto fail;
568         }
569
570         /* Read hash top into next ptr */
571         if (tdb1_ofs_read(tdb, TDB1_HASH_TOP(hash), &rec.next) == -1)
572                 goto fail;
573
574         rec.key_len = key.dsize;
575         rec.data_len = dbuf.dsize;
576         rec.full_hash = hash;
577         rec.magic = TDB1_MAGIC;
578
579         /* write out and point the top of the hash chain at it */
580         if (tdb1_rec_write(tdb, rec_ptr, &rec) == -1
581             || tdb->tdb1.io->tdb1_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
582             || tdb1_ofs_write(tdb, TDB1_HASH_TOP(hash), &rec_ptr) == -1) {
583                 /* Need to tdb1_unallocate() here */
584                 goto fail;
585         }
586
587  done:
588         ret = 0;
589  fail:
590         if (ret == 0) {
591                 tdb1_increment_seqnum(tdb);
592         }
593
594         SAFE_FREE(p);
595         return ret;
596 }
597
598 /* store an element in the database, replacing any existing element
599    with the same key
600
601    return 0 on success, -1 on failure
602 */
603 int tdb1_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
604 {
605         uint32_t hash;
606         int ret;
607
608         assert(tdb->flags & TDB_VERSION1);
609
610         if ((tdb->flags & TDB_RDONLY) || tdb->tdb1.traverse_read) {
611                 tdb->last_error = TDB_ERR_RDONLY;
612                 return -1;
613         }
614
615         /* find which hash bucket it is in */
616         hash = tdb_hash(tdb, key.dptr, key.dsize);
617         if (tdb1_lock(tdb, TDB1_BUCKET(hash), F_WRLCK) == -1)
618                 return -1;
619
620         ret = _tdb1_store(tdb, key, dbuf, flag, hash);
621         tdb1_unlock(tdb, TDB1_BUCKET(hash), F_WRLCK);
622         return ret;
623 }
624
625 /* Append to an entry. Create if not exist. */
626 int tdb1_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
627 {
628         uint32_t hash;
629         TDB_DATA dbuf;
630         int ret = -1;
631
632         assert(tdb->flags & TDB_VERSION1);
633
634         /* find which hash bucket it is in */
635         hash = tdb_hash(tdb, key.dptr, key.dsize);
636         if (tdb1_lock(tdb, TDB1_BUCKET(hash), F_WRLCK) == -1)
637                 return -1;
638
639         dbuf = _tdb1_fetch(tdb, key);
640
641         if (dbuf.dptr == NULL) {
642                 dbuf.dptr = (unsigned char *)malloc(new_dbuf.dsize);
643         } else {
644                 unsigned int new_len = dbuf.dsize + new_dbuf.dsize;
645                 unsigned char *new_dptr;
646
647                 /* realloc '0' is special: don't do that. */
648                 if (new_len == 0)
649                         new_len = 1;
650                 new_dptr = (unsigned char *)realloc(dbuf.dptr, new_len);
651                 if (new_dptr == NULL) {
652                         free(dbuf.dptr);
653                 }
654                 dbuf.dptr = new_dptr;
655         }
656
657         if (dbuf.dptr == NULL) {
658                 tdb->last_error = TDB_ERR_OOM;
659                 goto failed;
660         }
661
662         memcpy(dbuf.dptr + dbuf.dsize, new_dbuf.dptr, new_dbuf.dsize);
663         dbuf.dsize += new_dbuf.dsize;
664
665         ret = _tdb1_store(tdb, key, dbuf, 0, hash);
666
667 failed:
668         tdb1_unlock(tdb, TDB1_BUCKET(hash), F_WRLCK);
669         SAFE_FREE(dbuf.dptr);
670         return ret;
671 }
672
673
674 /*
675   get the tdb sequence number. Only makes sense if the writers opened
676   with TDB1_SEQNUM set. Note that this sequence number will wrap quite
677   quickly, so it should only be used for a 'has something changed'
678   test, not for code that relies on the count of the number of changes
679   made. If you want a counter then use a tdb record.
680
681   The aim of this sequence number is to allow for a very lightweight
682   test of a possible tdb change.
683 */
684 int tdb1_get_seqnum(struct tdb_context *tdb)
685 {
686         tdb1_off_t seqnum=0;
687
688         tdb1_ofs_read(tdb, TDB1_SEQNUM_OFS, &seqnum);
689         return seqnum;
690 }
691
692
693 /*
694   add a region of the file to the freelist. Length is the size of the region in bytes,
695   which includes the free list header that needs to be added
696  */
697 static int tdb1_free_region(struct tdb_context *tdb, tdb1_off_t offset, ssize_t length)
698 {
699         struct tdb1_record rec;
700         if (length <= sizeof(rec)) {
701                 /* the region is not worth adding */
702                 return 0;
703         }
704         if (length + offset > tdb->file->map_size) {
705                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
706                                         "tdb1_free_region: adding region beyond"
707                                         " end of file");
708                 return -1;
709         }
710         memset(&rec,'\0',sizeof(rec));
711         rec.rec_len = length - sizeof(rec);
712         if (tdb1_free(tdb, offset, &rec) == -1) {
713                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
714                            "tdb1_free_region: failed to add free record");
715                 return -1;
716         }
717         return 0;
718 }
719
720 /*
721   wipe the entire database, deleting all records. This can be done
722   very fast by using a allrecord lock. The entire data portion of the
723   file becomes a single entry in the freelist.
724
725   This code carefully steps around the recovery area, leaving it alone
726  */
727 int tdb1_wipe_all(struct tdb_context *tdb)
728 {
729         int i;
730         tdb1_off_t offset = 0;
731         ssize_t data_len;
732         tdb1_off_t recovery_head;
733         tdb1_len_t recovery_size = 0;
734
735         if (tdb1_lockall(tdb) != 0) {
736                 return -1;
737         }
738
739
740         /* see if the tdb has a recovery area, and remember its size
741            if so. We don't want to lose this as otherwise each
742            tdb1_wipe_all() in a transaction will increase the size of
743            the tdb by the size of the recovery area */
744         if (tdb1_ofs_read(tdb, TDB1_RECOVERY_HEAD, &recovery_head) == -1) {
745                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
746                            "tdb1_wipe_all: failed to read recovery head");
747                 goto failed;
748         }
749
750         if (recovery_head != 0) {
751                 struct tdb1_record rec;
752                 if (tdb->tdb1.io->tdb1_read(tdb, recovery_head, &rec, sizeof(rec), TDB1_DOCONV()) == -1) {
753                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
754                                    "tdb1_wipe_all: failed to read recovery record");
755                         return -1;
756                 }
757                 recovery_size = rec.rec_len + sizeof(rec);
758         }
759
760         /* wipe the hashes */
761         for (i=0;i<tdb->tdb1.header.hash_size;i++) {
762                 if (tdb1_ofs_write(tdb, TDB1_HASH_TOP(i), &offset) == -1) {
763                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
764                                    "tdb1_wipe_all: failed to write hash %d", i);
765                         goto failed;
766                 }
767         }
768
769         /* wipe the freelist */
770         if (tdb1_ofs_write(tdb, TDB1_FREELIST_TOP, &offset) == -1) {
771                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
772                            "tdb1_wipe_all: failed to write freelist");
773                 goto failed;
774         }
775
776         /* add all the rest of the file to the freelist, possibly leaving a gap
777            for the recovery area */
778         if (recovery_size == 0) {
779                 /* the simple case - the whole file can be used as a freelist */
780                 data_len = (tdb->file->map_size - TDB1_DATA_START(tdb->tdb1.header.hash_size));
781                 if (tdb1_free_region(tdb, TDB1_DATA_START(tdb->tdb1.header.hash_size), data_len) != 0) {
782                         goto failed;
783                 }
784         } else {
785                 /* we need to add two freelist entries - one on either
786                    side of the recovery area
787
788                    Note that we cannot shift the recovery area during
789                    this operation. Only the transaction.c code may
790                    move the recovery area or we risk subtle data
791                    corruption
792                 */
793                 data_len = (recovery_head - TDB1_DATA_START(tdb->tdb1.header.hash_size));
794                 if (tdb1_free_region(tdb, TDB1_DATA_START(tdb->tdb1.header.hash_size), data_len) != 0) {
795                         goto failed;
796                 }
797                 /* and the 2nd free list entry after the recovery area - if any */
798                 data_len = tdb->file->map_size - (recovery_head+recovery_size);
799                 if (tdb1_free_region(tdb, recovery_head+recovery_size, data_len) != 0) {
800                         goto failed;
801                 }
802         }
803
804         if (tdb1_unlockall(tdb) != 0) {
805                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
806                            "tdb1_wipe_all: failed to unlock");
807                 goto failed;
808         }
809
810         return 0;
811
812 failed:
813         tdb1_unlockall(tdb);
814         return -1;
815 }
816
817 struct traverse_state {
818         enum TDB_ERROR error;
819         struct tdb_context *dest_db;
820 };
821
822 /*
823   traverse function for repacking
824  */
825 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private_data)
826 {
827         struct traverse_state *state = (struct traverse_state *)private_data;
828         if (tdb1_store(state->dest_db, key, data, TDB_INSERT) != 0) {
829                 state->error = state->dest_db->last_error;
830                 return -1;
831         }
832         return 0;
833 }
834
835 /*
836   repack a tdb
837  */
838 int tdb1_repack(struct tdb_context *tdb)
839 {
840         struct tdb_context *tmp_db;
841         struct traverse_state state;
842         union tdb_attribute hsize;
843
844         hsize.base.attr = TDB_ATTRIBUTE_TDB1_HASHSIZE;
845         hsize.base.next = NULL;
846         hsize.tdb1_hashsize.hsize = tdb->tdb1.header.hash_size;
847
848         if (tdb1_transaction_start(tdb) != 0) {
849                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
850                            __location__ " Failed to start transaction");
851                 return -1;
852         }
853
854         tmp_db = tdb_open("tmpdb", TDB_INTERNAL, O_RDWR|O_CREAT, 0, &hsize);
855         if (tmp_db == NULL) {
856                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
857                                         __location__ " Failed to create tmp_db");
858                 tdb1_transaction_cancel(tdb);
859                 return -1;
860         }
861
862         state.error = TDB_SUCCESS;
863         state.dest_db = tmp_db;
864
865         if (tdb1_traverse_read(tdb, repack_traverse, &state) == -1) {
866                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
867                            __location__ " Failed to traverse copying out");
868                 tdb1_transaction_cancel(tdb);
869                 tdb_close(tmp_db);
870                 return -1;
871         }
872
873         if (state.error != TDB_SUCCESS) {
874                 tdb->last_error = tdb_logerr(tdb, state.error, TDB_LOG_ERROR,
875                                         __location__ " Error during traversal");
876                 tdb1_transaction_cancel(tdb);
877                 tdb_close(tmp_db);
878                 return -1;
879         }
880
881         if (tdb1_wipe_all(tdb) != 0) {
882                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
883                            __location__ " Failed to wipe database\n");
884                 tdb1_transaction_cancel(tdb);
885                 tdb_close(tmp_db);
886                 return -1;
887         }
888
889         state.error = TDB_SUCCESS;
890         state.dest_db = tdb;
891
892         if (tdb1_traverse_read(tmp_db, repack_traverse, &state) == -1) {
893                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
894                            __location__ " Failed to traverse copying back");
895                 tdb1_transaction_cancel(tdb);
896                 tdb_close(tmp_db);
897                 return -1;
898         }
899
900         if (state.error) {
901                 tdb->last_error = tdb_logerr(tdb, state.error, TDB_LOG_ERROR,
902                                         __location__ " Error during second traversal");
903                 tdb1_transaction_cancel(tdb);
904                 tdb_close(tmp_db);
905                 return -1;
906         }
907
908         tdb_close(tmp_db);
909
910         if (tdb1_transaction_commit(tdb) != 0) {
911                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
912                            __location__ " Failed to commit");
913                 return -1;
914         }
915
916         return 0;
917 }
918
919 /* Even on files, we can get partial writes due to signals. */
920 bool tdb1_write_all(int fd, const void *buf, size_t count)
921 {
922         while (count) {
923                 ssize_t ret;
924                 ret = write(fd, buf, count);
925                 if (ret < 0)
926                         return false;
927                 buf = (const char *)buf + ret;
928                 count -= ret;
929         }
930         return true;
931 }