]> git.ozlabs.org Git - ccan/blob - ccan/tdb/tdb.c
tdb: use TDB_RECOVERY_INVALID_MAGIC rather than 0
[ccan] / ccan / tdb / tdb.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              1999-2005
7    Copyright (C) Paul `Rusty' Russell              2000
8    Copyright (C) Jeremy Allison                    2000-2003
9    
10      ** NOTE! The following LGPL license applies to the tdb
11      ** library. This does NOT imply that all of Samba is released
12      ** under the LGPL
13    
14    This library is free software; you can redistribute it and/or
15    modify it under the terms of the GNU Lesser General Public
16    License as published by the Free Software Foundation; either
17    version 3 of the License, or (at your option) any later version.
18
19    This library is distributed in the hope that it will be useful,
20    but WITHOUT ANY WARRANTY; without even the implied warranty of
21    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22    Lesser General Public License for more details.
23
24    You should have received a copy of the GNU Lesser General Public
25    License along with this library; if not, see <http://www.gnu.org/licenses/>.
26 */
27
28 #include "tdb_private.h"
29
30 TDB_DATA tdb_null;
31
32 /*
33   non-blocking increment of the tdb sequence number if the tdb has been opened using
34   the TDB_SEQNUM flag
35 */
36 void tdb_increment_seqnum_nonblock(struct tdb_context *tdb)
37 {
38         tdb_off_t seqnum=0;
39         
40         if (!(tdb->flags & TDB_SEQNUM)) {
41                 return;
42         }
43
44         /* we ignore errors from this, as we have no sane way of
45            dealing with them.
46         */
47         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
48         seqnum++;
49         tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
50 }
51
52 /*
53   increment the tdb sequence number if the tdb has been opened using
54   the TDB_SEQNUM flag
55 */
56 static void tdb_increment_seqnum(struct tdb_context *tdb)
57 {
58         if (!(tdb->flags & TDB_SEQNUM)) {
59                 return;
60         }
61
62         if (tdb_brlock(tdb, F_WRLCK, TDB_SEQNUM_OFS, 1,
63                        TDB_LOCK_WAIT|TDB_LOCK_PROBE) != 0) {
64                 return;
65         }
66
67         tdb_increment_seqnum_nonblock(tdb);
68
69         tdb_brunlock(tdb, F_WRLCK, TDB_SEQNUM_OFS, 1);
70 }
71
72 static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
73 {
74         return memcmp(data.dptr, key.dptr, data.dsize);
75 }
76
77 /* Returns 0 on fail.  On success, return offset of record, and fills
78    in rec */
79 static tdb_off_t tdb_find(struct tdb_context *tdb, TDB_DATA key, uint32_t hash,
80                         struct tdb_record *r)
81 {
82         tdb_off_t rec_ptr;
83         
84         /* read in the hash top */
85         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
86                 return 0;
87
88         /* keep looking until we find the right record */
89         while (rec_ptr) {
90                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
91                         return 0;
92
93                 if (!TDB_DEAD(r) && hash==r->full_hash
94                     && key.dsize==r->key_len
95                     && tdb_parse_data(tdb, key, rec_ptr + sizeof(*r),
96                                       r->key_len, tdb_key_compare,
97                                       NULL) == 0) {
98                         return rec_ptr;
99                 }
100                 /* detect tight infinite loop */
101                 if (rec_ptr == r->next) {
102                         tdb->ecode = TDB_ERR_CORRUPT;
103                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_find: loop detected.\n"));
104                         return 0;
105                 }
106                 rec_ptr = r->next;
107         }
108         tdb->ecode = TDB_ERR_NOEXIST;
109         return 0;
110 }
111
112 /* As tdb_find, but if you succeed, keep the lock */
113 tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash, int locktype,
114                            struct tdb_record *rec)
115 {
116         uint32_t rec_ptr;
117
118         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
119                 return 0;
120         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
121                 tdb_unlock(tdb, BUCKET(hash), locktype);
122         return rec_ptr;
123 }
124
125 static TDB_DATA _tdb_fetch(struct tdb_context *tdb, TDB_DATA key);
126
127 /* update an entry in place - this only works if the new data size
128    is <= the old data size and the key exists.
129    on failure return -1.
130 */
131 static int tdb_update_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash, TDB_DATA dbuf)
132 {
133         struct tdb_record rec;
134         tdb_off_t rec_ptr;
135
136         /* find entry */
137         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
138                 return -1;
139
140         /* it could be an exact duplicate of what is there - this is
141          * surprisingly common (eg. with a ldb re-index). */
142         if (rec.key_len == key.dsize && 
143             rec.data_len == dbuf.dsize &&
144             rec.full_hash == hash) {
145                 TDB_DATA data = _tdb_fetch(tdb, key);
146                 if (data.dsize == dbuf.dsize &&
147                     memcmp(data.dptr, dbuf.dptr, data.dsize) == 0) {
148                         if (data.dptr) {
149                                 free(data.dptr);
150                         }
151                         return 0;
152                 }
153                 if (data.dptr) {
154                         free(data.dptr);
155                 }
156         }
157          
158
159         /* must be long enough key, data and tailer */
160         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off_t)) {
161                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
162                 return -1;
163         }
164
165         if (tdb->methods->tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
166                       dbuf.dptr, dbuf.dsize) == -1)
167                 return -1;
168
169         if (dbuf.dsize != rec.data_len) {
170                 /* update size */
171                 rec.data_len = dbuf.dsize;
172                 return tdb_rec_write(tdb, rec_ptr, &rec);
173         }
174  
175         return 0;
176 }
177
178 /* find an entry in the database given a key */
179 /* If an entry doesn't exist tdb_err will be set to
180  * TDB_ERR_NOEXIST. If a key has no data attached
181  * then the TDB_DATA will have zero length but
182  * a non-zero pointer
183  */
184 static TDB_DATA _tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
185 {
186         tdb_off_t rec_ptr;
187         struct tdb_record rec;
188         TDB_DATA ret;
189         uint32_t hash;
190
191         /* find which hash bucket it is in */
192         hash = tdb->hash_fn(&key);
193         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
194                 return tdb_null;
195         }
196         ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
197                                   rec.data_len);
198         ret.dsize = rec.data_len;
199         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
200         return ret;
201 }
202
203 TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
204 {
205         TDB_DATA ret = _tdb_fetch(tdb, key);
206
207         tdb_trace_1rec_retrec(tdb, "tdb_fetch", key, ret);
208         return ret;
209 }
210
211 /*
212  * Find an entry in the database and hand the record's data to a parsing
213  * function. The parsing function is executed under the chain read lock, so it
214  * should be fast and should not block on other syscalls.
215  *
216  * DONT CALL OTHER TDB CALLS FROM THE PARSER, THIS MIGHT LEAD TO SEGFAULTS.
217  *
218  * For mmapped tdb's that do not have a transaction open it points the parsing
219  * function directly at the mmap area, it avoids the malloc/memcpy in this
220  * case. If a transaction is open or no mmap is available, it has to do
221  * malloc/read/parse/free.
222  *
223  * This is interesting for all readers of potentially large data structures in
224  * the tdb records, ldb indexes being one example.
225  */
226
227 int tdb_parse_record(struct tdb_context *tdb, TDB_DATA key,
228                      int (*parser)(TDB_DATA key, TDB_DATA data,
229                                    void *private_data),
230                      void *private_data)
231 {
232         tdb_off_t rec_ptr;
233         struct tdb_record rec;
234         int ret;
235         uint32_t hash;
236
237         /* find which hash bucket it is in */
238         hash = tdb->hash_fn(&key);
239
240         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
241                 tdb_trace_1rec_ret(tdb, "tdb_parse_record", key,
242                                    -TDB_ERR_NOEXIST);
243                 tdb->ecode = TDB_ERR_NOEXIST;
244                 return 0;
245         }
246         tdb_trace_1rec_ret(tdb, "tdb_parse_record", key, 0);
247
248         ret = tdb_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
249                              rec.data_len, parser, private_data);
250
251         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
252
253         return ret;
254 }
255
256 /* check if an entry in the database exists 
257
258    note that 1 is returned if the key is found and 0 is returned if not found
259    this doesn't match the conventions in the rest of this module, but is
260    compatible with gdbm
261 */
262 static int tdb_exists_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
263 {
264         struct tdb_record rec;
265         
266         if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
267                 return 0;
268         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
269         return 1;
270 }
271
272 int tdb_exists(struct tdb_context *tdb, TDB_DATA key)
273 {
274         uint32_t hash = tdb->hash_fn(&key);
275         int ret;
276
277         ret = tdb_exists_hash(tdb, key, hash);
278         tdb_trace_1rec_ret(tdb, "tdb_exists", key, ret);
279         return ret;
280 }
281
282 /* actually delete an entry in the database given the offset */
283 int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct tdb_record *rec)
284 {
285         tdb_off_t last_ptr, i;
286         struct tdb_record lastrec;
287
288         if (tdb->read_only || tdb->traverse_read) return -1;
289
290         if (tdb->traverse_write != 0 || 
291             tdb_write_lock_record(tdb, rec_ptr) == -1) {
292                 /* Someone traversing here: mark it as dead */
293                 rec->magic = TDB_DEAD_MAGIC;
294                 return tdb_rec_write(tdb, rec_ptr, rec);
295         }
296         if (tdb_write_unlock_record(tdb, rec_ptr) != 0)
297                 return -1;
298
299         /* find previous record in hash chain */
300         if (tdb_ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
301                 return -1;
302         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
303                 if (tdb_rec_read(tdb, i, &lastrec) == -1)
304                         return -1;
305
306         /* unlink it: next ptr is at start of record. */
307         if (last_ptr == 0)
308                 last_ptr = TDB_HASH_TOP(rec->full_hash);
309         if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1)
310                 return -1;
311
312         /* recover the space */
313         if (tdb_free(tdb, rec_ptr, rec) == -1)
314                 return -1;
315         return 0;
316 }
317
318 static int tdb_count_dead(struct tdb_context *tdb, uint32_t hash)
319 {
320         int res = 0;
321         tdb_off_t rec_ptr;
322         struct tdb_record rec;
323         
324         /* read in the hash top */
325         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
326                 return 0;
327
328         while (rec_ptr) {
329                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1)
330                         return 0;
331
332                 if (rec.magic == TDB_DEAD_MAGIC) {
333                         res += 1;
334                 }
335                 rec_ptr = rec.next;
336         }
337         return res;
338 }
339
340 /*
341  * Purge all DEAD records from a hash chain
342  */
343 static int tdb_purge_dead(struct tdb_context *tdb, uint32_t hash)
344 {
345         int res = -1;
346         struct tdb_record rec;
347         tdb_off_t rec_ptr;
348
349         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
350                 return -1;
351         }
352         
353         /* read in the hash top */
354         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
355                 goto fail;
356
357         while (rec_ptr) {
358                 tdb_off_t next;
359
360                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1) {
361                         goto fail;
362                 }
363
364                 next = rec.next;
365
366                 if (rec.magic == TDB_DEAD_MAGIC
367                     && tdb_do_delete(tdb, rec_ptr, &rec) == -1) {
368                         goto fail;
369                 }
370                 rec_ptr = next;
371         }
372         res = 0;
373  fail:
374         tdb_unlock(tdb, -1, F_WRLCK);
375         return res;
376 }
377
378 /* delete an entry in the database given a key */
379 static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
380 {
381         tdb_off_t rec_ptr;
382         struct tdb_record rec;
383         int ret;
384
385         if (tdb->max_dead_records != 0) {
386
387                 /*
388                  * Allow for some dead records per hash chain, mainly for
389                  * tdb's with a very high create/delete rate like locking.tdb.
390                  */
391
392                 if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
393                         return -1;
394
395                 if (tdb_count_dead(tdb, hash) >= tdb->max_dead_records) {
396                         /*
397                          * Don't let the per-chain freelist grow too large,
398                          * delete all existing dead records
399                          */
400                         tdb_purge_dead(tdb, hash);
401                 }
402
403                 if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
404                         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
405                         return -1;
406                 }
407
408                 /*
409                  * Just mark the record as dead.
410                  */
411                 rec.magic = TDB_DEAD_MAGIC;
412                 ret = tdb_rec_write(tdb, rec_ptr, &rec);
413         }
414         else {
415                 if (!(rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK,
416                                                    &rec)))
417                         return -1;
418
419                 ret = tdb_do_delete(tdb, rec_ptr, &rec);
420         }
421
422         if (ret == 0) {
423                 tdb_increment_seqnum(tdb);
424         }
425
426         if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
427                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_delete: WARNING tdb_unlock failed!\n"));
428         return ret;
429 }
430
431 int tdb_delete(struct tdb_context *tdb, TDB_DATA key)
432 {
433         uint32_t hash = tdb->hash_fn(&key);
434         int ret;
435
436         ret = tdb_delete_hash(tdb, key, hash);
437         tdb_trace_1rec_ret(tdb, "tdb_delete", key, ret);
438         return ret;
439 }
440
441 /*
442  * See if we have a dead record around with enough space
443  */
444 static tdb_off_t tdb_find_dead(struct tdb_context *tdb, uint32_t hash,
445                                struct tdb_record *r, tdb_len_t length)
446 {
447         tdb_off_t rec_ptr;
448         
449         /* read in the hash top */
450         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
451                 return 0;
452
453         /* keep looking until we find the right record */
454         while (rec_ptr) {
455                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
456                         return 0;
457
458                 if (TDB_DEAD(r) && r->rec_len >= length) {
459                         /*
460                          * First fit for simple coding, TODO: change to best
461                          * fit
462                          */
463                         return rec_ptr;
464                 }
465                 rec_ptr = r->next;
466         }
467         return 0;
468 }
469
470 static int _tdb_store(struct tdb_context *tdb, TDB_DATA key,
471                       TDB_DATA dbuf, int flag, uint32_t hash)
472 {
473         struct tdb_record rec;
474         tdb_off_t rec_ptr;
475         char *p = NULL;
476         int ret = -1;
477
478         /* check for it existing, on insert. */
479         if (flag == TDB_INSERT) {
480                 if (tdb_exists_hash(tdb, key, hash)) {
481                         tdb->ecode = TDB_ERR_EXISTS;
482                         goto fail;
483                 }
484         } else {
485                 /* first try in-place update, on modify or replace. */
486                 if (tdb_update_hash(tdb, key, hash, dbuf) == 0) {
487                         goto done;
488                 }
489                 if (tdb->ecode == TDB_ERR_NOEXIST &&
490                     flag == TDB_MODIFY) {
491                         /* if the record doesn't exist and we are in TDB_MODIFY mode then
492                          we should fail the store */
493                         goto fail;
494                 }
495         }
496         /* reset the error code potentially set by the tdb_update() */
497         tdb->ecode = TDB_SUCCESS;
498
499         /* delete any existing record - if it doesn't exist we don't
500            care.  Doing this first reduces fragmentation, and avoids
501            coalescing with `allocated' block before it's updated. */
502         if (flag != TDB_INSERT)
503                 tdb_delete_hash(tdb, key, hash);
504
505         /* Copy key+value *before* allocating free space in case malloc
506            fails and we are left with a dead spot in the tdb. */
507
508         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
509                 tdb->ecode = TDB_ERR_OOM;
510                 goto fail;
511         }
512
513         memcpy(p, key.dptr, key.dsize);
514         if (dbuf.dsize)
515                 memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
516
517         if (tdb->max_dead_records != 0) {
518                 /*
519                  * Allow for some dead records per hash chain, look if we can
520                  * find one that can hold the new record. We need enough space
521                  * for key, data and tailer. If we find one, we don't have to
522                  * consult the central freelist.
523                  */
524                 rec_ptr = tdb_find_dead(
525                         tdb, hash, &rec,
526                         key.dsize + dbuf.dsize + sizeof(tdb_off_t));
527
528                 if (rec_ptr != 0) {
529                         rec.key_len = key.dsize;
530                         rec.data_len = dbuf.dsize;
531                         rec.full_hash = hash;
532                         rec.magic = TDB_MAGIC;
533                         if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
534                             || tdb->methods->tdb_write(
535                                     tdb, rec_ptr + sizeof(rec),
536                                     p, key.dsize + dbuf.dsize) == -1) {
537                                 goto fail;
538                         }
539                         goto done;
540                 }
541         }
542
543         /*
544          * We have to allocate some space from the freelist, so this means we
545          * have to lock it. Use the chance to purge all the DEAD records from
546          * the hash chain under the freelist lock.
547          */
548
549         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
550                 goto fail;
551         }
552
553         if ((tdb->max_dead_records != 0)
554             && (tdb_purge_dead(tdb, hash) == -1)) {
555                 tdb_unlock(tdb, -1, F_WRLCK);
556                 goto fail;
557         }
558
559         /* we have to allocate some space */
560         rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec);
561
562         tdb_unlock(tdb, -1, F_WRLCK);
563
564         if (rec_ptr == 0) {
565                 goto fail;
566         }
567
568         /* Read hash top into next ptr */
569         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
570                 goto fail;
571
572         rec.key_len = key.dsize;
573         rec.data_len = dbuf.dsize;
574         rec.full_hash = hash;
575         rec.magic = TDB_MAGIC;
576
577         /* write out and point the top of the hash chain at it */
578         if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
579             || tdb->methods->tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
580             || tdb_ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
581                 /* Need to tdb_unallocate() here */
582                 goto fail;
583         }
584
585  done:
586         ret = 0;
587  fail:
588         if (ret == 0) {
589                 tdb_increment_seqnum(tdb);
590         }
591
592         SAFE_FREE(p); 
593         return ret;
594 }
595
596 /* store an element in the database, replacing any existing element
597    with the same key 
598
599    return 0 on success, -1 on failure
600 */
601 int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
602 {
603         uint32_t hash;
604         int ret;
605
606         if (tdb->read_only || tdb->traverse_read) {
607                 tdb->ecode = TDB_ERR_RDONLY;
608                 tdb_trace_2rec_flag_ret(tdb, "tdb_store", key, dbuf, flag,
609                                         -TDB_ERR_RDONLY);
610                 return -1;
611         }
612
613         /* find which hash bucket it is in */
614         hash = tdb->hash_fn(&key);
615         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
616                 return -1;
617
618         ret = _tdb_store(tdb, key, dbuf, flag, hash);
619         tdb_trace_2rec_flag_ret(tdb, "tdb_store", key, dbuf, flag, ret);
620         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
621         return ret;
622 }
623
624
625 /* Append to an entry. Create if not exist. */
626 int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
627 {
628         uint32_t hash;
629         TDB_DATA dbuf;
630         int ret = -1;
631
632         /* find which hash bucket it is in */
633         hash = tdb->hash_fn(&key);
634         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
635                 return -1;
636
637         dbuf = _tdb_fetch(tdb, key);
638
639         if (dbuf.dptr == NULL) {
640                 dbuf.dptr = (unsigned char *)malloc(new_dbuf.dsize);
641         } else {
642                 unsigned int new_len = dbuf.dsize + new_dbuf.dsize;
643                 unsigned char *new_dptr;
644
645                 /* realloc '0' is special: don't do that. */
646                 if (new_len == 0)
647                         new_len = 1;
648                 new_dptr = (unsigned char *)realloc(dbuf.dptr, new_len);
649                 if (new_dptr == NULL) {
650                         free(dbuf.dptr);
651                 }
652                 dbuf.dptr = new_dptr;
653         }
654
655         if (dbuf.dptr == NULL) {
656                 tdb->ecode = TDB_ERR_OOM;
657                 goto failed;
658         }
659
660         memcpy(dbuf.dptr + dbuf.dsize, new_dbuf.dptr, new_dbuf.dsize);
661         dbuf.dsize += new_dbuf.dsize;
662
663         ret = _tdb_store(tdb, key, dbuf, 0, hash);
664         tdb_trace_2rec_retrec(tdb, "tdb_append", key, new_dbuf, dbuf);
665         
666 failed:
667         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
668         SAFE_FREE(dbuf.dptr);
669         return ret;
670 }
671
672
673 /*
674   return the name of the current tdb file
675   useful for external logging functions
676 */
677 const char *tdb_name(struct tdb_context *tdb)
678 {
679         return tdb->name;
680 }
681
682 /*
683   return the underlying file descriptor being used by tdb, or -1
684   useful for external routines that want to check the device/inode
685   of the fd
686 */
687 int tdb_fd(struct tdb_context *tdb)
688 {
689         return tdb->fd;
690 }
691
692 /*
693   return the current logging function
694   useful for external tdb routines that wish to log tdb errors
695 */
696 tdb_log_func tdb_log_fn(struct tdb_context *tdb)
697 {
698         return tdb->log.log_fn;
699 }
700
701
702 /*
703   get the tdb sequence number. Only makes sense if the writers opened
704   with TDB_SEQNUM set. Note that this sequence number will wrap quite
705   quickly, so it should only be used for a 'has something changed'
706   test, not for code that relies on the count of the number of changes
707   made. If you want a counter then use a tdb record.
708
709   The aim of this sequence number is to allow for a very lightweight
710   test of a possible tdb change.
711 */
712 int tdb_get_seqnum(struct tdb_context *tdb)
713 {
714         tdb_off_t seqnum=0;
715
716         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
717         tdb_trace_ret(tdb, "tdb_get_seqnum", seqnum);
718         return seqnum;
719 }
720
721 int tdb_hash_size(struct tdb_context *tdb)
722 {
723         return tdb->header.hash_size;
724 }
725
726 size_t tdb_map_size(struct tdb_context *tdb)
727 {
728         return tdb->map_size;
729 }
730
731 int tdb_get_flags(struct tdb_context *tdb)
732 {
733         return tdb->flags;
734 }
735
736 void tdb_add_flags(struct tdb_context *tdb, unsigned flags)
737 {
738         if ((flags & TDB_ALLOW_NESTING) &&
739             (flags & TDB_DISALLOW_NESTING)) {
740                 tdb->ecode = TDB_ERR_NESTING;
741                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_add_flags: "
742                         "allow_nesting and disallow_nesting are not allowed together!"));
743                 return;
744         }
745
746         if (flags & TDB_ALLOW_NESTING) {
747                 tdb->flags &= ~TDB_DISALLOW_NESTING;
748         }
749         if (flags & TDB_DISALLOW_NESTING) {
750                 tdb->flags &= ~TDB_ALLOW_NESTING;
751         }
752
753         tdb->flags |= flags;
754 }
755
756 void tdb_remove_flags(struct tdb_context *tdb, unsigned flags)
757 {
758         if ((flags & TDB_ALLOW_NESTING) &&
759             (flags & TDB_DISALLOW_NESTING)) {
760                 tdb->ecode = TDB_ERR_NESTING;
761                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_remove_flags: "
762                         "allow_nesting and disallow_nesting are not allowed together!"));
763                 return;
764         }
765
766         if (flags & TDB_ALLOW_NESTING) {
767                 tdb->flags |= TDB_DISALLOW_NESTING;
768         }
769         if (flags & TDB_DISALLOW_NESTING) {
770                 tdb->flags |= TDB_ALLOW_NESTING;
771         }
772
773         tdb->flags &= ~flags;
774 }
775
776
777 /*
778   enable sequence number handling on an open tdb
779 */
780 void tdb_enable_seqnum(struct tdb_context *tdb)
781 {
782         tdb->flags |= TDB_SEQNUM;
783 }
784
785
786 /*
787   add a region of the file to the freelist. Length is the size of the region in bytes, 
788   which includes the free list header that needs to be added
789  */
790 static int tdb_free_region(struct tdb_context *tdb, tdb_off_t offset, ssize_t length)
791 {
792         struct tdb_record rec;
793         if (length <= sizeof(rec)) {
794                 /* the region is not worth adding */
795                 return 0;
796         }
797         if (length + offset > tdb->map_size) {
798                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_free_region: adding region beyond end of file\n"));
799                 return -1;              
800         }
801         memset(&rec,'\0',sizeof(rec));
802         rec.rec_len = length - sizeof(rec);
803         if (tdb_free(tdb, offset, &rec) == -1) {
804                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_free_region: failed to add free record\n"));
805                 return -1;
806         }
807         return 0;
808 }
809
810 /*
811   wipe the entire database, deleting all records. This can be done
812   very fast by using a global lock. The entire data portion of the
813   file becomes a single entry in the freelist.
814
815   This code carefully steps around the recovery area, leaving it alone
816  */
817 int tdb_wipe_all(struct tdb_context *tdb)
818 {
819         int i;
820         tdb_off_t offset = 0;
821         ssize_t data_len;
822         tdb_off_t recovery_head;
823         tdb_len_t recovery_size = 0;
824
825         if (tdb_lockall(tdb) != 0) {
826                 return -1;
827         }
828
829         tdb_trace(tdb, "tdb_wipe_all");
830
831         /* see if the tdb has a recovery area, and remember its size
832            if so. We don't want to lose this as otherwise each
833            tdb_wipe_all() in a transaction will increase the size of
834            the tdb by the size of the recovery area */
835         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
836                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_wipe_all: failed to read recovery head\n"));
837                 goto failed;
838         }
839
840         if (recovery_head != 0) {
841                 struct tdb_record rec;
842                 if (tdb->methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
843                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_wipe_all: failed to read recovery record\n"));
844                         return -1;
845                 }       
846                 recovery_size = rec.rec_len + sizeof(rec);
847         }
848
849         /* wipe the hashes */
850         for (i=0;i<tdb->header.hash_size;i++) {
851                 if (tdb_ofs_write(tdb, TDB_HASH_TOP(i), &offset) == -1) {
852                         TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to write hash %d\n", i));
853                         goto failed;
854                 }
855         }
856
857         /* wipe the freelist */
858         if (tdb_ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
859                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to write freelist\n"));
860                 goto failed;
861         }
862
863         /* add all the rest of the file to the freelist, possibly leaving a gap 
864            for the recovery area */
865         if (recovery_size == 0) {
866                 /* the simple case - the whole file can be used as a freelist */
867                 data_len = (tdb->map_size - TDB_DATA_START(tdb->header.hash_size));
868                 if (tdb_free_region(tdb, TDB_DATA_START(tdb->header.hash_size), data_len) != 0) {
869                         goto failed;
870                 }
871         } else {
872                 /* we need to add two freelist entries - one on either
873                    side of the recovery area 
874
875                    Note that we cannot shift the recovery area during
876                    this operation. Only the transaction.c code may
877                    move the recovery area or we risk subtle data
878                    corruption
879                 */
880                 data_len = (recovery_head - TDB_DATA_START(tdb->header.hash_size));
881                 if (tdb_free_region(tdb, TDB_DATA_START(tdb->header.hash_size), data_len) != 0) {
882                         goto failed;
883                 }
884                 /* and the 2nd free list entry after the recovery area - if any */
885                 data_len = tdb->map_size - (recovery_head+recovery_size);
886                 if (tdb_free_region(tdb, recovery_head+recovery_size, data_len) != 0) {
887                         goto failed;
888                 }
889         }
890
891         if (tdb_unlockall(tdb) != 0) {
892                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to unlock\n"));
893                 goto failed;
894         }
895
896         return 0;
897
898 failed:
899         tdb_unlockall(tdb);
900         return -1;
901 }
902
903
904 struct traverse_state {
905         bool error;
906         struct tdb_context *dest_db;
907 };
908
909 /*
910   traverse function for repacking
911  */
912 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
913 {
914         struct traverse_state *state = (struct traverse_state *)private;
915         if (tdb_store(state->dest_db, key, data, TDB_INSERT) != 0) {
916                 state->error = true;
917                 return -1;
918         }
919         return 0;
920 }
921
922 /*
923   repack a tdb
924  */
925 int tdb_repack(struct tdb_context *tdb)
926 {
927         struct tdb_context *tmp_db;
928         struct traverse_state state;
929
930         tdb_trace(tdb, "tdb_repack");
931
932         if (tdb_transaction_start(tdb) != 0) {
933                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_repack: Failed to start transaction\n"));
934                 return -1;
935         }
936
937         tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb), TDB_INTERNAL, O_RDWR|O_CREAT, 0);
938         if (tmp_db == NULL) {
939                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_repack: Failed to create tmp_db\n"));
940                 tdb_transaction_cancel(tdb);
941                 return -1;
942         }
943
944         state.error = false;
945         state.dest_db = tmp_db;
946
947         if (tdb_traverse_read(tdb, repack_traverse, &state) == -1) {
948                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_repack: Failed to traverse copying out\n"));
949                 tdb_transaction_cancel(tdb);
950                 tdb_close(tmp_db);
951                 return -1;              
952         }
953
954         if (state.error) {
955                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_repack: Error during traversal\n"));
956                 tdb_transaction_cancel(tdb);
957                 tdb_close(tmp_db);
958                 return -1;
959         }
960
961         if (tdb_wipe_all(tdb) != 0) {
962                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_repack: Failed to wipe database\n"));
963                 tdb_transaction_cancel(tdb);
964                 tdb_close(tmp_db);
965                 return -1;
966         }
967
968         state.error = false;
969         state.dest_db = tdb;
970
971         if (tdb_traverse_read(tmp_db, repack_traverse, &state) == -1) {
972                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_repack: Failed to traverse copying back\n"));
973                 tdb_transaction_cancel(tdb);
974                 tdb_close(tmp_db);
975                 return -1;              
976         }
977
978         if (state.error) {
979                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_repack: Error during second traversal\n"));
980                 tdb_transaction_cancel(tdb);
981                 tdb_close(tmp_db);
982                 return -1;
983         }
984
985         tdb_close(tmp_db);
986
987         if (tdb_transaction_commit(tdb) != 0) {
988                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_repack: Failed to commit\n"));
989                 return -1;
990         }
991
992         return 0;
993 }
994
995 #ifdef TDB_TRACE
996 static void tdb_trace_write(struct tdb_context *tdb, const char *str)
997 {
998         if (write(tdb->tracefd, str, strlen(str)) != strlen(str)) {
999                 close(tdb->tracefd);
1000                 tdb->tracefd = -1;
1001         }
1002 }
1003
1004 static void tdb_trace_start(struct tdb_context *tdb)
1005 {
1006         tdb_off_t seqnum=0;
1007         char msg[sizeof(tdb_off_t) * 4];
1008
1009         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
1010         sprintf(msg, "%u ", seqnum);
1011         tdb_trace_write(tdb, msg);
1012 }
1013
1014 static void tdb_trace_end(struct tdb_context *tdb)
1015 {
1016         tdb_trace_write(tdb, "\n");
1017 }
1018
1019 static void tdb_trace_end_ret(struct tdb_context *tdb, int ret)
1020 {
1021         char msg[sizeof(ret) * 4];
1022         sprintf(msg, " = %i\n", ret);
1023         tdb_trace_write(tdb, msg);
1024 }
1025
1026 static void tdb_trace_record(struct tdb_context *tdb, TDB_DATA rec)
1027 {
1028         char msg[20 + rec.dsize*2], *p;
1029         unsigned int i;
1030
1031         /* We differentiate zero-length records from non-existent ones. */
1032         if (rec.dptr == NULL) {
1033                 tdb_trace_write(tdb, " NULL");
1034                 return;
1035         }
1036
1037         p = msg;
1038         p += sprintf(p, " %zu:", rec.dsize);
1039         for (i = 0; i < rec.dsize; i++)
1040                 p += sprintf(p, "%02x", rec.dptr[i]);
1041
1042         tdb_trace_write(tdb, msg);
1043 }
1044
1045 void tdb_trace(struct tdb_context *tdb, const char *op)
1046 {
1047         tdb_trace_start(tdb);
1048         tdb_trace_write(tdb, op);
1049         tdb_trace_end(tdb);
1050 }
1051
1052 void tdb_trace_seqnum(struct tdb_context *tdb, uint32_t seqnum, const char *op)
1053 {
1054         char msg[sizeof(tdb_off_t) * 4];
1055
1056         sprintf(msg, "%u ", seqnum);
1057         tdb_trace_write(tdb, msg);
1058         tdb_trace_write(tdb, op);
1059         tdb_trace_end(tdb);
1060 }
1061
1062 void tdb_trace_open(struct tdb_context *tdb, const char *op,
1063                     unsigned hash_size, unsigned tdb_flags, unsigned open_flags)
1064 {
1065         char msg[128];
1066
1067         sprintf(msg, "%s %u %#x %#x", op, hash_size, tdb_flags, open_flags);
1068         tdb_trace_start(tdb);
1069         tdb_trace_write(tdb, msg);
1070         tdb_trace_end(tdb);
1071 }
1072
1073 void tdb_trace_ret(struct tdb_context *tdb, const char *op, int ret)
1074 {
1075         tdb_trace_start(tdb);
1076         tdb_trace_write(tdb, op);
1077         tdb_trace_end_ret(tdb, ret);
1078 }
1079
1080 void tdb_trace_retrec(struct tdb_context *tdb, const char *op, TDB_DATA ret)
1081 {
1082         tdb_trace_start(tdb);
1083         tdb_trace_write(tdb, op);
1084         tdb_trace_write(tdb, " =");
1085         tdb_trace_record(tdb, ret);
1086         tdb_trace_end(tdb);
1087 }
1088
1089 void tdb_trace_1rec(struct tdb_context *tdb, const char *op,
1090                     TDB_DATA rec)
1091 {
1092         tdb_trace_start(tdb);
1093         tdb_trace_write(tdb, op);
1094         tdb_trace_record(tdb, rec);
1095         tdb_trace_end(tdb);
1096 }
1097
1098 void tdb_trace_1rec_ret(struct tdb_context *tdb, const char *op,
1099                         TDB_DATA rec, int ret)
1100 {
1101         tdb_trace_start(tdb);
1102         tdb_trace_write(tdb, op);
1103         tdb_trace_record(tdb, rec);
1104         tdb_trace_end_ret(tdb, ret);
1105 }
1106
1107 void tdb_trace_1rec_retrec(struct tdb_context *tdb, const char *op,
1108                            TDB_DATA rec, TDB_DATA ret)
1109 {
1110         tdb_trace_start(tdb);
1111         tdb_trace_write(tdb, op);
1112         tdb_trace_record(tdb, rec);
1113         tdb_trace_write(tdb, " =");
1114         tdb_trace_record(tdb, ret);
1115         tdb_trace_end(tdb);
1116 }
1117
1118 void tdb_trace_2rec_flag_ret(struct tdb_context *tdb, const char *op,
1119                              TDB_DATA rec1, TDB_DATA rec2, unsigned flag,
1120                              int ret)
1121 {
1122         char msg[sizeof(ret) * 4];
1123
1124         sprintf(msg, " %#x", flag); 
1125         tdb_trace_start(tdb);
1126         tdb_trace_write(tdb, op);
1127         tdb_trace_record(tdb, rec1);
1128         tdb_trace_record(tdb, rec2);
1129         tdb_trace_write(tdb, msg);
1130         tdb_trace_end_ret(tdb, ret);
1131 }
1132
1133 void tdb_trace_2rec_retrec(struct tdb_context *tdb, const char *op,
1134                            TDB_DATA rec1, TDB_DATA rec2, TDB_DATA ret)
1135 {
1136         tdb_trace_start(tdb);
1137         tdb_trace_write(tdb, op);
1138         tdb_trace_record(tdb, rec1);
1139         tdb_trace_record(tdb, rec2);
1140         tdb_trace_write(tdb, " =");
1141         tdb_trace_record(tdb, ret);
1142         tdb_trace_end(tdb);
1143 }
1144 #endif