]> git.ozlabs.org Git - ccan/blob - ccan/tdb2/tdb1_tdb.c
16e945974eaf9e2bcbf6ed213c6e5096bfb18c49
[ccan] / ccan / tdb2 / tdb1_tdb.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              1999-2005
7    Copyright (C) Paul `Rusty' Russell              2000
8    Copyright (C) Jeremy Allison                    2000-2003
9
10      ** NOTE! The following LGPL license applies to the tdb
11      ** library. This does NOT imply that all of Samba is released
12      ** under the LGPL
13
14    This library is free software; you can redistribute it and/or
15    modify it under the terms of the GNU Lesser General Public
16    License as published by the Free Software Foundation; either
17    version 3 of the License, or (at your option) any later version.
18
19    This library is distributed in the hope that it will be useful,
20    but WITHOUT ANY WARRANTY; without even the implied warranty of
21    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22    Lesser General Public License for more details.
23
24    You should have received a copy of the GNU Lesser General Public
25    License along with this library; if not, see <http://www.gnu.org/licenses/>.
26 */
27
28 #include "tdb1_private.h"
29 #include <assert.h>
30
31 TDB_DATA tdb1_null;
32
33 /*
34   non-blocking increment of the tdb sequence number if the tdb has been opened using
35   the TDB_SEQNUM flag
36 */
37 void tdb1_increment_seqnum_nonblock(struct tdb_context *tdb)
38 {
39         tdb1_off_t seqnum=0;
40
41         if (!(tdb->flags & TDB_SEQNUM)) {
42                 return;
43         }
44
45         /* we ignore errors from this, as we have no sane way of
46            dealing with them.
47         */
48         tdb1_ofs_read(tdb, TDB1_SEQNUM_OFS, &seqnum);
49         seqnum++;
50         tdb1_ofs_write(tdb, TDB1_SEQNUM_OFS, &seqnum);
51 }
52
53 /*
54   increment the tdb sequence number if the tdb has been opened using
55   the TDB_SEQNUM flag
56 */
57 static void tdb1_increment_seqnum(struct tdb_context *tdb)
58 {
59         if (!(tdb->flags & TDB_SEQNUM)) {
60                 return;
61         }
62
63         if (tdb1_nest_lock(tdb, TDB1_SEQNUM_OFS, F_WRLCK,
64                            TDB_LOCK_WAIT|TDB_LOCK_PROBE) != 0) {
65                 return;
66         }
67
68         tdb1_increment_seqnum_nonblock(tdb);
69
70         tdb1_nest_unlock(tdb, TDB1_SEQNUM_OFS, F_WRLCK);
71 }
72
73 static int tdb1_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
74 {
75         return memcmp(data.dptr, key.dptr, data.dsize);
76 }
77
78 /* Returns 0 on fail.  On success, return offset of record, and fills
79    in rec */
80 static tdb1_off_t tdb1_find(struct tdb_context *tdb, TDB_DATA key, uint32_t hash,
81                         struct tdb1_record *r)
82 {
83         tdb1_off_t rec_ptr;
84
85         /* read in the hash top */
86         if (tdb1_ofs_read(tdb, TDB1_HASH_TOP(hash), &rec_ptr) == -1)
87                 return 0;
88
89         /* keep looking until we find the right record */
90         while (rec_ptr) {
91                 if (tdb1_rec_read(tdb, rec_ptr, r) == -1)
92                         return 0;
93
94                 if (!TDB1_DEAD(r) && hash==r->full_hash
95                     && key.dsize==r->key_len
96                     && tdb1_parse_data(tdb, key, rec_ptr + sizeof(*r),
97                                       r->key_len, tdb1_key_compare,
98                                       NULL) == 0) {
99                         return rec_ptr;
100                 }
101                 /* detect tight infinite loop */
102                 if (rec_ptr == r->next) {
103                         tdb->last_error = tdb_logerr(tdb, TDB_ERR_CORRUPT,
104                                                 TDB_LOG_ERROR,
105                                                 "tdb1_find: loop detected.");
106                         return 0;
107                 }
108                 rec_ptr = r->next;
109         }
110         tdb->last_error = TDB_ERR_NOEXIST;
111         return 0;
112 }
113
114 /* As tdb1_find, but if you succeed, keep the lock */
115 tdb1_off_t tdb1_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash, int locktype,
116                            struct tdb1_record *rec)
117 {
118         uint32_t rec_ptr;
119
120         if (tdb1_lock(tdb, TDB1_BUCKET(hash), locktype) == -1)
121                 return 0;
122         if (!(rec_ptr = tdb1_find(tdb, key, hash, rec)))
123                 tdb1_unlock(tdb, TDB1_BUCKET(hash), locktype);
124         return rec_ptr;
125 }
126
127 static TDB_DATA _tdb1_fetch(struct tdb_context *tdb, TDB_DATA key);
128
129 /* update an entry in place - this only works if the new data size
130    is <= the old data size and the key exists.
131    on failure return -1.
132 */
133 static int tdb1_update_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash, TDB_DATA dbuf)
134 {
135         struct tdb1_record rec;
136         tdb1_off_t rec_ptr;
137
138         /* find entry */
139         if (!(rec_ptr = tdb1_find(tdb, key, hash, &rec)))
140                 return -1;
141
142         /* it could be an exact duplicate of what is there - this is
143          * surprisingly common (eg. with a ldb re-index). */
144         if (rec.key_len == key.dsize &&
145             rec.data_len == dbuf.dsize &&
146             rec.full_hash == hash) {
147                 TDB_DATA data = _tdb1_fetch(tdb, key);
148                 if (data.dsize == dbuf.dsize &&
149                     memcmp(data.dptr, dbuf.dptr, data.dsize) == 0) {
150                         if (data.dptr) {
151                                 free(data.dptr);
152                         }
153                         return 0;
154                 }
155                 if (data.dptr) {
156                         free(data.dptr);
157                 }
158         }
159
160         /* must be long enough key, data and tailer */
161         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb1_off_t)) {
162                 tdb->last_error = TDB_SUCCESS; /* Not really an error */
163                 return -1;
164         }
165
166         if (tdb->tdb1.io->tdb1_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
167                       dbuf.dptr, dbuf.dsize) == -1)
168                 return -1;
169
170         if (dbuf.dsize != rec.data_len) {
171                 /* update size */
172                 rec.data_len = dbuf.dsize;
173                 return tdb1_rec_write(tdb, rec_ptr, &rec);
174         }
175
176         return 0;
177 }
178
179 /* find an entry in the database given a key */
180 /* If an entry doesn't exist tdb1_err will be set to
181  * TDB_ERR_NOEXIST. If a key has no data attached
182  * then the TDB_DATA will have zero length but
183  * a non-zero pointer
184  */
185 static TDB_DATA _tdb1_fetch(struct tdb_context *tdb, TDB_DATA key)
186 {
187         tdb1_off_t rec_ptr;
188         struct tdb1_record rec;
189         TDB_DATA ret;
190         uint32_t hash;
191
192         /* find which hash bucket it is in */
193         hash = tdb_hash(tdb, key.dptr, key.dsize);
194         if (!(rec_ptr = tdb1_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
195                 return tdb1_null;
196
197         ret.dptr = tdb1_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
198                                   rec.data_len);
199         ret.dsize = rec.data_len;
200         tdb1_unlock(tdb, TDB1_BUCKET(rec.full_hash), F_RDLCK);
201         return ret;
202 }
203
204 enum TDB_ERROR tdb1_fetch(struct tdb_context *tdb, TDB_DATA key, TDB_DATA *data)
205 {
206         *data = _tdb1_fetch(tdb, key);
207         if (data->dptr == NULL)
208                 return tdb->last_error;
209         return TDB_SUCCESS;
210 }
211
212 /*
213  * Find an entry in the database and hand the record's data to a parsing
214  * function. The parsing function is executed under the chain read lock, so it
215  * should be fast and should not block on other syscalls.
216  *
217  * DON'T CALL OTHER TDB CALLS FROM THE PARSER, THIS MIGHT LEAD TO SEGFAULTS.
218  *
219  * For mmapped tdb's that do not have a transaction open it points the parsing
220  * function directly at the mmap area, it avoids the malloc/memcpy in this
221  * case. If a transaction is open or no mmap is available, it has to do
222  * malloc/read/parse/free.
223  *
224  * This is interesting for all readers of potentially large data structures in
225  * the tdb records, ldb indexes being one example.
226  *
227  * Return -1 if the record was not found.
228  */
229
230 int tdb1_parse_record(struct tdb_context *tdb, TDB_DATA key,
231                      int (*parser)(TDB_DATA key, TDB_DATA data,
232                                    void *private_data),
233                      void *private_data)
234 {
235         tdb1_off_t rec_ptr;
236         struct tdb1_record rec;
237         int ret;
238         uint32_t hash;
239
240         /* find which hash bucket it is in */
241         hash = tdb_hash(tdb, key.dptr, key.dsize);
242
243         if (!(rec_ptr = tdb1_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
244                 /* record not found */
245                 tdb->last_error = TDB_ERR_NOEXIST;
246                 return -1;
247         }
248
249         ret = tdb1_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
250                              rec.data_len, parser, private_data);
251
252         tdb1_unlock(tdb, TDB1_BUCKET(rec.full_hash), F_RDLCK);
253
254         return ret;
255 }
256
257 /* check if an entry in the database exists
258
259    note that 1 is returned if the key is found and 0 is returned if not found
260    this doesn't match the conventions in the rest of this module, but is
261    compatible with gdbm
262 */
263 static int tdb1_exists_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
264 {
265         struct tdb1_record rec;
266
267         if (tdb1_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
268                 return 0;
269         tdb1_unlock(tdb, TDB1_BUCKET(rec.full_hash), F_RDLCK);
270         return 1;
271 }
272
273 int tdb1_exists(struct tdb_context *tdb, TDB_DATA key)
274 {
275         uint32_t hash = tdb_hash(tdb, key.dptr, key.dsize);
276         int ret;
277
278         ret = tdb1_exists_hash(tdb, key, hash);
279         return ret;
280 }
281
282 /* actually delete an entry in the database given the offset */
283 int tdb1_do_delete(struct tdb_context *tdb, tdb1_off_t rec_ptr, struct tdb1_record *rec)
284 {
285         tdb1_off_t last_ptr, i;
286         struct tdb1_record lastrec;
287
288         if ((tdb->flags & TDB_RDONLY) || tdb->tdb1.traverse_read) return -1;
289
290         if (((tdb->tdb1.traverse_write != 0) && (!TDB1_DEAD(rec))) ||
291             tdb1_write_lock_record(tdb, rec_ptr) == -1) {
292                 /* Someone traversing here: mark it as dead */
293                 rec->magic = TDB1_DEAD_MAGIC;
294                 return tdb1_rec_write(tdb, rec_ptr, rec);
295         }
296         if (tdb1_write_unlock_record(tdb, rec_ptr) != 0)
297                 return -1;
298
299         /* find previous record in hash chain */
300         if (tdb1_ofs_read(tdb, TDB1_HASH_TOP(rec->full_hash), &i) == -1)
301                 return -1;
302         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
303                 if (tdb1_rec_read(tdb, i, &lastrec) == -1)
304                         return -1;
305
306         /* unlink it: next ptr is at start of record. */
307         if (last_ptr == 0)
308                 last_ptr = TDB1_HASH_TOP(rec->full_hash);
309         if (tdb1_ofs_write(tdb, last_ptr, &rec->next) == -1)
310                 return -1;
311
312         /* recover the space */
313         if (tdb1_free(tdb, rec_ptr, rec) == -1)
314                 return -1;
315         return 0;
316 }
317
318 static int tdb1_count_dead(struct tdb_context *tdb, uint32_t hash)
319 {
320         int res = 0;
321         tdb1_off_t rec_ptr;
322         struct tdb1_record rec;
323
324         /* read in the hash top */
325         if (tdb1_ofs_read(tdb, TDB1_HASH_TOP(hash), &rec_ptr) == -1)
326                 return 0;
327
328         while (rec_ptr) {
329                 if (tdb1_rec_read(tdb, rec_ptr, &rec) == -1)
330                         return 0;
331
332                 if (rec.magic == TDB1_DEAD_MAGIC) {
333                         res += 1;
334                 }
335                 rec_ptr = rec.next;
336         }
337         return res;
338 }
339
340 /*
341  * Purge all DEAD records from a hash chain
342  */
343 static int tdb1_purge_dead(struct tdb_context *tdb, uint32_t hash)
344 {
345         int res = -1;
346         struct tdb1_record rec;
347         tdb1_off_t rec_ptr;
348
349         if (tdb1_lock(tdb, -1, F_WRLCK) == -1) {
350                 return -1;
351         }
352
353         /* read in the hash top */
354         if (tdb1_ofs_read(tdb, TDB1_HASH_TOP(hash), &rec_ptr) == -1)
355                 goto fail;
356
357         while (rec_ptr) {
358                 tdb1_off_t next;
359
360                 if (tdb1_rec_read(tdb, rec_ptr, &rec) == -1) {
361                         goto fail;
362                 }
363
364                 next = rec.next;
365
366                 if (rec.magic == TDB1_DEAD_MAGIC
367                     && tdb1_do_delete(tdb, rec_ptr, &rec) == -1) {
368                         goto fail;
369                 }
370                 rec_ptr = next;
371         }
372         res = 0;
373  fail:
374         tdb1_unlock(tdb, -1, F_WRLCK);
375         return res;
376 }
377
378 /* delete an entry in the database given a key */
379 static int tdb1_delete_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
380 {
381         tdb1_off_t rec_ptr;
382         struct tdb1_record rec;
383         int ret;
384
385         if (tdb->tdb1.max_dead_records != 0) {
386
387                 /*
388                  * Allow for some dead records per hash chain, mainly for
389                  * tdb's with a very high create/delete rate like locking.tdb.
390                  */
391
392                 if (tdb1_lock(tdb, TDB1_BUCKET(hash), F_WRLCK) == -1)
393                         return -1;
394
395                 if (tdb1_count_dead(tdb, hash) >= tdb->tdb1.max_dead_records) {
396                         /*
397                          * Don't let the per-chain freelist grow too large,
398                          * delete all existing dead records
399                          */
400                         tdb1_purge_dead(tdb, hash);
401                 }
402
403                 if (!(rec_ptr = tdb1_find(tdb, key, hash, &rec))) {
404                         tdb1_unlock(tdb, TDB1_BUCKET(hash), F_WRLCK);
405                         return -1;
406                 }
407
408                 /*
409                  * Just mark the record as dead.
410                  */
411                 rec.magic = TDB1_DEAD_MAGIC;
412                 ret = tdb1_rec_write(tdb, rec_ptr, &rec);
413         }
414         else {
415                 if (!(rec_ptr = tdb1_find_lock_hash(tdb, key, hash, F_WRLCK,
416                                                    &rec)))
417                         return -1;
418
419                 ret = tdb1_do_delete(tdb, rec_ptr, &rec);
420         }
421
422         if (ret == 0) {
423                 tdb1_increment_seqnum(tdb);
424         }
425
426         if (tdb1_unlock(tdb, TDB1_BUCKET(rec.full_hash), F_WRLCK) != 0)
427                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
428                            "tdb1_delete: WARNING tdb1_unlock failed!");
429         return ret;
430 }
431
432 int tdb1_delete(struct tdb_context *tdb, TDB_DATA key)
433 {
434         uint32_t hash = tdb_hash(tdb, key.dptr, key.dsize);
435         int ret;
436
437         assert(tdb->flags & TDB_VERSION1);
438         ret = tdb1_delete_hash(tdb, key, hash);
439         return ret;
440 }
441
442 /*
443  * See if we have a dead record around with enough space
444  */
445 static tdb1_off_t tdb1_find_dead(struct tdb_context *tdb, uint32_t hash,
446                                struct tdb1_record *r, tdb1_len_t length)
447 {
448         tdb1_off_t rec_ptr;
449
450         /* read in the hash top */
451         if (tdb1_ofs_read(tdb, TDB1_HASH_TOP(hash), &rec_ptr) == -1)
452                 return 0;
453
454         /* keep looking until we find the right record */
455         while (rec_ptr) {
456                 if (tdb1_rec_read(tdb, rec_ptr, r) == -1)
457                         return 0;
458
459                 if (TDB1_DEAD(r) && r->rec_len >= length) {
460                         /*
461                          * First fit for simple coding, TODO: change to best
462                          * fit
463                          */
464                         return rec_ptr;
465                 }
466                 rec_ptr = r->next;
467         }
468         return 0;
469 }
470
471 static int _tdb1_store(struct tdb_context *tdb, TDB_DATA key,
472                        TDB_DATA dbuf, int flag, uint32_t hash)
473 {
474         struct tdb1_record rec;
475         tdb1_off_t rec_ptr;
476         char *p = NULL;
477         int ret = -1;
478
479         /* check for it existing, on insert. */
480         if (flag == TDB_INSERT) {
481                 if (tdb1_exists_hash(tdb, key, hash)) {
482                         tdb->last_error = TDB_ERR_EXISTS;
483                         goto fail;
484                 }
485         } else {
486                 /* first try in-place update, on modify or replace. */
487                 if (tdb1_update_hash(tdb, key, hash, dbuf) == 0) {
488                         goto done;
489                 }
490                 if (tdb->last_error == TDB_ERR_NOEXIST &&
491                     flag == TDB_MODIFY) {
492                         /* if the record doesn't exist and we are in TDB1_MODIFY mode then
493                          we should fail the store */
494                         goto fail;
495                 }
496         }
497         /* reset the error code potentially set by the tdb1_update() */
498         tdb->last_error = TDB_SUCCESS;
499
500         /* delete any existing record - if it doesn't exist we don't
501            care.  Doing this first reduces fragmentation, and avoids
502            coalescing with `allocated' block before it's updated. */
503         if (flag != TDB_INSERT)
504                 tdb1_delete_hash(tdb, key, hash);
505
506         /* Copy key+value *before* allocating free space in case malloc
507            fails and we are left with a dead spot in the tdb. */
508
509         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
510                 tdb->last_error = TDB_ERR_OOM;
511                 goto fail;
512         }
513
514         memcpy(p, key.dptr, key.dsize);
515         if (dbuf.dsize)
516                 memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
517
518         if (tdb->tdb1.max_dead_records != 0) {
519                 /*
520                  * Allow for some dead records per hash chain, look if we can
521                  * find one that can hold the new record. We need enough space
522                  * for key, data and tailer. If we find one, we don't have to
523                  * consult the central freelist.
524                  */
525                 rec_ptr = tdb1_find_dead(
526                         tdb, hash, &rec,
527                         key.dsize + dbuf.dsize + sizeof(tdb1_off_t));
528
529                 if (rec_ptr != 0) {
530                         rec.key_len = key.dsize;
531                         rec.data_len = dbuf.dsize;
532                         rec.full_hash = hash;
533                         rec.magic = TDB1_MAGIC;
534                         if (tdb1_rec_write(tdb, rec_ptr, &rec) == -1
535                             || tdb->tdb1.io->tdb1_write(
536                                     tdb, rec_ptr + sizeof(rec),
537                                     p, key.dsize + dbuf.dsize) == -1) {
538                                 goto fail;
539                         }
540                         goto done;
541                 }
542         }
543
544         /*
545          * We have to allocate some space from the freelist, so this means we
546          * have to lock it. Use the chance to purge all the DEAD records from
547          * the hash chain under the freelist lock.
548          */
549
550         if (tdb1_lock(tdb, -1, F_WRLCK) == -1) {
551                 goto fail;
552         }
553
554         if ((tdb->tdb1.max_dead_records != 0)
555             && (tdb1_purge_dead(tdb, hash) == -1)) {
556                 tdb1_unlock(tdb, -1, F_WRLCK);
557                 goto fail;
558         }
559
560         /* we have to allocate some space */
561         rec_ptr = tdb1_allocate(tdb, key.dsize + dbuf.dsize, &rec);
562
563         tdb1_unlock(tdb, -1, F_WRLCK);
564
565         if (rec_ptr == 0) {
566                 goto fail;
567         }
568
569         /* Read hash top into next ptr */
570         if (tdb1_ofs_read(tdb, TDB1_HASH_TOP(hash), &rec.next) == -1)
571                 goto fail;
572
573         rec.key_len = key.dsize;
574         rec.data_len = dbuf.dsize;
575         rec.full_hash = hash;
576         rec.magic = TDB1_MAGIC;
577
578         /* write out and point the top of the hash chain at it */
579         if (tdb1_rec_write(tdb, rec_ptr, &rec) == -1
580             || tdb->tdb1.io->tdb1_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
581             || tdb1_ofs_write(tdb, TDB1_HASH_TOP(hash), &rec_ptr) == -1) {
582                 /* Need to tdb1_unallocate() here */
583                 goto fail;
584         }
585
586  done:
587         ret = 0;
588  fail:
589         if (ret == 0) {
590                 tdb1_increment_seqnum(tdb);
591         }
592
593         SAFE_FREE(p);
594         return ret;
595 }
596
597 /* store an element in the database, replacing any existing element
598    with the same key
599
600    return 0 on success, -1 on failure
601 */
602 int tdb1_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
603 {
604         uint32_t hash;
605         int ret;
606
607         assert(tdb->flags & TDB_VERSION1);
608
609         if ((tdb->flags & TDB_RDONLY) || tdb->tdb1.traverse_read) {
610                 tdb->last_error = TDB_ERR_RDONLY;
611                 return -1;
612         }
613
614         /* find which hash bucket it is in */
615         hash = tdb_hash(tdb, key.dptr, key.dsize);
616         if (tdb1_lock(tdb, TDB1_BUCKET(hash), F_WRLCK) == -1)
617                 return -1;
618
619         ret = _tdb1_store(tdb, key, dbuf, flag, hash);
620         tdb1_unlock(tdb, TDB1_BUCKET(hash), F_WRLCK);
621         return ret;
622 }
623
624 /* Append to an entry. Create if not exist. */
625 int tdb1_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
626 {
627         uint32_t hash;
628         TDB_DATA dbuf;
629         int ret = -1;
630
631         assert(tdb->flags & TDB_VERSION1);
632
633         /* find which hash bucket it is in */
634         hash = tdb_hash(tdb, key.dptr, key.dsize);
635         if (tdb1_lock(tdb, TDB1_BUCKET(hash), F_WRLCK) == -1)
636                 return -1;
637
638         dbuf = _tdb1_fetch(tdb, key);
639
640         if (dbuf.dptr == NULL) {
641                 dbuf.dptr = (unsigned char *)malloc(new_dbuf.dsize);
642         } else {
643                 unsigned int new_len = dbuf.dsize + new_dbuf.dsize;
644                 unsigned char *new_dptr;
645
646                 /* realloc '0' is special: don't do that. */
647                 if (new_len == 0)
648                         new_len = 1;
649                 new_dptr = (unsigned char *)realloc(dbuf.dptr, new_len);
650                 if (new_dptr == NULL) {
651                         free(dbuf.dptr);
652                 }
653                 dbuf.dptr = new_dptr;
654         }
655
656         if (dbuf.dptr == NULL) {
657                 tdb->last_error = TDB_ERR_OOM;
658                 goto failed;
659         }
660
661         memcpy(dbuf.dptr + dbuf.dsize, new_dbuf.dptr, new_dbuf.dsize);
662         dbuf.dsize += new_dbuf.dsize;
663
664         ret = _tdb1_store(tdb, key, dbuf, 0, hash);
665
666 failed:
667         tdb1_unlock(tdb, TDB1_BUCKET(hash), F_WRLCK);
668         SAFE_FREE(dbuf.dptr);
669         return ret;
670 }
671
672
673 /*
674   get the tdb sequence number. Only makes sense if the writers opened
675   with TDB1_SEQNUM set. Note that this sequence number will wrap quite
676   quickly, so it should only be used for a 'has something changed'
677   test, not for code that relies on the count of the number of changes
678   made. If you want a counter then use a tdb record.
679
680   The aim of this sequence number is to allow for a very lightweight
681   test of a possible tdb change.
682 */
683 int tdb1_get_seqnum(struct tdb_context *tdb)
684 {
685         tdb1_off_t seqnum=0;
686
687         tdb1_ofs_read(tdb, TDB1_SEQNUM_OFS, &seqnum);
688         return seqnum;
689 }
690
691
692 /*
693   add a region of the file to the freelist. Length is the size of the region in bytes,
694   which includes the free list header that needs to be added
695  */
696 static int tdb1_free_region(struct tdb_context *tdb, tdb1_off_t offset, ssize_t length)
697 {
698         struct tdb1_record rec;
699         if (length <= sizeof(rec)) {
700                 /* the region is not worth adding */
701                 return 0;
702         }
703         if (length + offset > tdb->file->map_size) {
704                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
705                                         "tdb1_free_region: adding region beyond"
706                                         " end of file");
707                 return -1;
708         }
709         memset(&rec,'\0',sizeof(rec));
710         rec.rec_len = length - sizeof(rec);
711         if (tdb1_free(tdb, offset, &rec) == -1) {
712                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
713                            "tdb1_free_region: failed to add free record");
714                 return -1;
715         }
716         return 0;
717 }
718
719 /*
720   wipe the entire database, deleting all records. This can be done
721   very fast by using a allrecord lock. The entire data portion of the
722   file becomes a single entry in the freelist.
723
724   This code carefully steps around the recovery area, leaving it alone
725  */
726 int tdb1_wipe_all(struct tdb_context *tdb)
727 {
728         int i;
729         tdb1_off_t offset = 0;
730         ssize_t data_len;
731         tdb1_off_t recovery_head;
732         tdb1_len_t recovery_size = 0;
733
734         if (tdb1_lockall(tdb) != 0) {
735                 return -1;
736         }
737
738
739         /* see if the tdb has a recovery area, and remember its size
740            if so. We don't want to lose this as otherwise each
741            tdb1_wipe_all() in a transaction will increase the size of
742            the tdb by the size of the recovery area */
743         if (tdb1_ofs_read(tdb, TDB1_RECOVERY_HEAD, &recovery_head) == -1) {
744                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
745                            "tdb1_wipe_all: failed to read recovery head");
746                 goto failed;
747         }
748
749         if (recovery_head != 0) {
750                 struct tdb1_record rec;
751                 if (tdb->tdb1.io->tdb1_read(tdb, recovery_head, &rec, sizeof(rec), TDB1_DOCONV()) == -1) {
752                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
753                                    "tdb1_wipe_all: failed to read recovery record");
754                         return -1;
755                 }
756                 recovery_size = rec.rec_len + sizeof(rec);
757         }
758
759         /* wipe the hashes */
760         for (i=0;i<tdb->tdb1.header.hash_size;i++) {
761                 if (tdb1_ofs_write(tdb, TDB1_HASH_TOP(i), &offset) == -1) {
762                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
763                                    "tdb1_wipe_all: failed to write hash %d", i);
764                         goto failed;
765                 }
766         }
767
768         /* wipe the freelist */
769         if (tdb1_ofs_write(tdb, TDB1_FREELIST_TOP, &offset) == -1) {
770                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
771                            "tdb1_wipe_all: failed to write freelist");
772                 goto failed;
773         }
774
775         /* add all the rest of the file to the freelist, possibly leaving a gap
776            for the recovery area */
777         if (recovery_size == 0) {
778                 /* the simple case - the whole file can be used as a freelist */
779                 data_len = (tdb->file->map_size - TDB1_DATA_START(tdb->tdb1.header.hash_size));
780                 if (tdb1_free_region(tdb, TDB1_DATA_START(tdb->tdb1.header.hash_size), data_len) != 0) {
781                         goto failed;
782                 }
783         } else {
784                 /* we need to add two freelist entries - one on either
785                    side of the recovery area
786
787                    Note that we cannot shift the recovery area during
788                    this operation. Only the transaction.c code may
789                    move the recovery area or we risk subtle data
790                    corruption
791                 */
792                 data_len = (recovery_head - TDB1_DATA_START(tdb->tdb1.header.hash_size));
793                 if (tdb1_free_region(tdb, TDB1_DATA_START(tdb->tdb1.header.hash_size), data_len) != 0) {
794                         goto failed;
795                 }
796                 /* and the 2nd free list entry after the recovery area - if any */
797                 data_len = tdb->file->map_size - (recovery_head+recovery_size);
798                 if (tdb1_free_region(tdb, recovery_head+recovery_size, data_len) != 0) {
799                         goto failed;
800                 }
801         }
802
803         if (tdb1_unlockall(tdb) != 0) {
804                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
805                            "tdb1_wipe_all: failed to unlock");
806                 goto failed;
807         }
808
809         return 0;
810
811 failed:
812         tdb1_unlockall(tdb);
813         return -1;
814 }
815
816 struct traverse_state {
817         enum TDB_ERROR error;
818         struct tdb_context *dest_db;
819 };
820
821 /*
822   traverse function for repacking
823  */
824 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private_data)
825 {
826         struct traverse_state *state = (struct traverse_state *)private_data;
827         if (tdb1_store(state->dest_db, key, data, TDB_INSERT) != 0) {
828                 state->error = state->dest_db->last_error;
829                 return -1;
830         }
831         return 0;
832 }
833
834 /*
835   repack a tdb
836  */
837 int tdb1_repack(struct tdb_context *tdb)
838 {
839         struct tdb_context *tmp_db;
840         struct traverse_state state;
841         union tdb_attribute hsize;
842
843         hsize.base.attr = TDB_ATTRIBUTE_TDB1_HASHSIZE;
844         hsize.base.next = NULL;
845         hsize.tdb1_hashsize.hsize = tdb->tdb1.header.hash_size;
846
847         if (tdb1_transaction_start(tdb) != 0) {
848                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
849                            __location__ " Failed to start transaction");
850                 return -1;
851         }
852
853         tmp_db = tdb_open("tmpdb", TDB_INTERNAL, O_RDWR|O_CREAT, 0, &hsize);
854         if (tmp_db == NULL) {
855                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
856                                         __location__ " Failed to create tmp_db");
857                 tdb1_transaction_cancel(tdb);
858                 return -1;
859         }
860
861         state.error = TDB_SUCCESS;
862         state.dest_db = tmp_db;
863
864         if (tdb1_traverse_read(tdb, repack_traverse, &state) == -1) {
865                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
866                            __location__ " Failed to traverse copying out");
867                 tdb1_transaction_cancel(tdb);
868                 tdb_close(tmp_db);
869                 return -1;
870         }
871
872         if (state.error != TDB_SUCCESS) {
873                 tdb->last_error = tdb_logerr(tdb, state.error, TDB_LOG_ERROR,
874                                         __location__ " Error during traversal");
875                 tdb1_transaction_cancel(tdb);
876                 tdb_close(tmp_db);
877                 return -1;
878         }
879
880         if (tdb1_wipe_all(tdb) != 0) {
881                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
882                            __location__ " Failed to wipe database\n");
883                 tdb1_transaction_cancel(tdb);
884                 tdb_close(tmp_db);
885                 return -1;
886         }
887
888         state.error = TDB_SUCCESS;
889         state.dest_db = tdb;
890
891         if (tdb1_traverse_read(tmp_db, repack_traverse, &state) == -1) {
892                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
893                            __location__ " Failed to traverse copying back");
894                 tdb1_transaction_cancel(tdb);
895                 tdb_close(tmp_db);
896                 return -1;
897         }
898
899         if (state.error) {
900                 tdb->last_error = tdb_logerr(tdb, state.error, TDB_LOG_ERROR,
901                                         __location__ " Error during second traversal");
902                 tdb1_transaction_cancel(tdb);
903                 tdb_close(tmp_db);
904                 return -1;
905         }
906
907         tdb_close(tmp_db);
908
909         if (tdb1_transaction_commit(tdb) != 0) {
910                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
911                            __location__ " Failed to commit");
912                 return -1;
913         }
914
915         return 0;
916 }
917
918 /* Even on files, we can get partial writes due to signals. */
919 bool tdb1_write_all(int fd, const void *buf, size_t count)
920 {
921         while (count) {
922                 ssize_t ret;
923                 ret = write(fd, buf, count);
924                 if (ret < 0)
925                         return false;
926                 buf = (const char *)buf + ret;
927                 count -= ret;
928         }
929         return true;
930 }