Import 470750fa2e3cf987f10de48451b1ee13aab03907 from ctdb:
[ccan] / ccan / tdb / tdb.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              1999-2005
7    Copyright (C) Paul `Rusty' Russell              2000
8    Copyright (C) Jeremy Allison                    2000-2003
9    
10      ** NOTE! The following LGPL license applies to the tdb
11      ** library. This does NOT imply that all of Samba is released
12      ** under the LGPL
13    
14    This library is free software; you can redistribute it and/or
15    modify it under the terms of the GNU Lesser General Public
16    License as published by the Free Software Foundation; either
17    version 3 of the License, or (at your option) any later version.
18
19    This library is distributed in the hope that it will be useful,
20    but WITHOUT ANY WARRANTY; without even the implied warranty of
21    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22    Lesser General Public License for more details.
23
24    You should have received a copy of the GNU Lesser General Public
25    License along with this library; if not, see <http://www.gnu.org/licenses/>.
26 */
27
28 #include "tdb_private.h"
29
30 TDB_DATA tdb_null;
31
32 /*
33   non-blocking increment of the tdb sequence number if the tdb has been opened using
34   the TDB_SEQNUM flag
35 */
36 void tdb_increment_seqnum_nonblock(struct tdb_context *tdb)
37 {
38         tdb_off_t seqnum=0;
39         
40         if (!(tdb->flags & TDB_SEQNUM)) {
41                 return;
42         }
43
44         /* we ignore errors from this, as we have no sane way of
45            dealing with them.
46         */
47         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
48         seqnum++;
49         tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
50 }
51
52 /*
53   increment the tdb sequence number if the tdb has been opened using
54   the TDB_SEQNUM flag
55 */
56 static void tdb_increment_seqnum(struct tdb_context *tdb)
57 {
58         if (!(tdb->flags & TDB_SEQNUM)) {
59                 return;
60         }
61
62         if (tdb_brlock(tdb, F_WRLCK, TDB_SEQNUM_OFS, 1,
63                        TDB_LOCK_WAIT|TDB_LOCK_PROBE) != 0) {
64                 return;
65         }
66
67         tdb_increment_seqnum_nonblock(tdb);
68
69         tdb_brunlock(tdb, F_WRLCK, TDB_SEQNUM_OFS, 1);
70 }
71
72 static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
73 {
74         return memcmp(data.dptr, key.dptr, data.dsize);
75 }
76
77 /* Returns 0 on fail.  On success, return offset of record, and fills
78    in rec */
79 static tdb_off_t tdb_find(struct tdb_context *tdb, TDB_DATA key, uint32_t hash,
80                         struct tdb_record *r)
81 {
82         tdb_off_t rec_ptr;
83         
84         /* read in the hash top */
85         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
86                 return 0;
87
88         /* keep looking until we find the right record */
89         while (rec_ptr) {
90                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
91                         return 0;
92
93                 if (!TDB_DEAD(r) && hash==r->full_hash
94                     && key.dsize==r->key_len
95                     && tdb_parse_data(tdb, key, rec_ptr + sizeof(*r),
96                                       r->key_len, tdb_key_compare,
97                                       NULL) == 0) {
98                         return rec_ptr;
99                 }
100                 /* detect tight infinite loop */
101                 if (rec_ptr == r->next) {
102                         tdb->ecode = TDB_ERR_CORRUPT;
103                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_find: loop detected.\n"));
104                         return 0;
105                 }
106                 rec_ptr = r->next;
107         }
108         tdb->ecode = TDB_ERR_NOEXIST;
109         return 0;
110 }
111
112 /* As tdb_find, but if you succeed, keep the lock */
113 tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash, int locktype,
114                            struct tdb_record *rec)
115 {
116         uint32_t rec_ptr;
117
118         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
119                 return 0;
120         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
121                 tdb_unlock(tdb, BUCKET(hash), locktype);
122         return rec_ptr;
123 }
124
125 static TDB_DATA _tdb_fetch(struct tdb_context *tdb, TDB_DATA key);
126
127 /* update an entry in place - this only works if the new data size
128    is <= the old data size and the key exists.
129    on failure return -1.
130 */
131 static int tdb_update_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash, TDB_DATA dbuf)
132 {
133         struct tdb_record rec;
134         tdb_off_t rec_ptr;
135
136         /* find entry */
137         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
138                 return -1;
139
140         /* it could be an exact duplicate of what is there - this is
141          * surprisingly common (eg. with a ldb re-index). */
142         if (rec.key_len == key.dsize && 
143             rec.data_len == dbuf.dsize &&
144             rec.full_hash == hash) {
145                 TDB_DATA data = _tdb_fetch(tdb, key);
146                 if (data.dsize == dbuf.dsize &&
147                     memcmp(data.dptr, dbuf.dptr, data.dsize) == 0) {
148                         if (data.dptr) {
149                                 free(data.dptr);
150                         }
151                         return 0;
152                 }
153                 if (data.dptr) {
154                         free(data.dptr);
155                 }
156         }
157          
158
159         /* must be long enough key, data and tailer */
160         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off_t)) {
161                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
162                 return -1;
163         }
164
165         if (tdb->methods->tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
166                       dbuf.dptr, dbuf.dsize) == -1)
167                 return -1;
168
169         if (dbuf.dsize != rec.data_len) {
170                 /* update size */
171                 rec.data_len = dbuf.dsize;
172                 return tdb_rec_write(tdb, rec_ptr, &rec);
173         }
174  
175         return 0;
176 }
177
178 /* find an entry in the database given a key */
179 /* If an entry doesn't exist tdb_err will be set to
180  * TDB_ERR_NOEXIST. If a key has no data attached
181  * then the TDB_DATA will have zero length but
182  * a non-zero pointer
183  */
184 static TDB_DATA _tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
185 {
186         tdb_off_t rec_ptr;
187         struct tdb_record rec;
188         TDB_DATA ret;
189         uint32_t hash;
190
191         /* find which hash bucket it is in */
192         hash = tdb->hash_fn(&key);
193         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
194                 return tdb_null;
195         }
196         ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
197                                   rec.data_len);
198         ret.dsize = rec.data_len;
199         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
200         return ret;
201 }
202
203 TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
204 {
205         TDB_DATA ret = _tdb_fetch(tdb, key);
206
207         tdb_trace_1rec_retrec(tdb, "tdb_fetch", key, ret);
208         return ret;
209 }
210
211 /*
212  * Find an entry in the database and hand the record's data to a parsing
213  * function. The parsing function is executed under the chain read lock, so it
214  * should be fast and should not block on other syscalls.
215  *
216  * DONT CALL OTHER TDB CALLS FROM THE PARSER, THIS MIGHT LEAD TO SEGFAULTS.
217  *
218  * For mmapped tdb's that do not have a transaction open it points the parsing
219  * function directly at the mmap area, it avoids the malloc/memcpy in this
220  * case. If a transaction is open or no mmap is available, it has to do
221  * malloc/read/parse/free.
222  *
223  * This is interesting for all readers of potentially large data structures in
224  * the tdb records, ldb indexes being one example.
225  */
226
227 int tdb_parse_record(struct tdb_context *tdb, TDB_DATA key,
228                      int (*parser)(TDB_DATA key, TDB_DATA data,
229                                    void *private_data),
230                      void *private_data)
231 {
232         tdb_off_t rec_ptr;
233         struct tdb_record rec;
234         int ret;
235         uint32_t hash;
236
237         /* find which hash bucket it is in */
238         hash = tdb->hash_fn(&key);
239
240         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
241                 tdb_trace_1rec_ret(tdb, "tdb_parse_record", key,
242                                    -TDB_ERR_NOEXIST);
243                 tdb->ecode = TDB_ERR_NOEXIST;
244                 return 0;
245         }
246         tdb_trace_1rec_ret(tdb, "tdb_parse_record", key, 0);
247
248         ret = tdb_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
249                              rec.data_len, parser, private_data);
250
251         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
252
253         return ret;
254 }
255
256 /* check if an entry in the database exists 
257
258    note that 1 is returned if the key is found and 0 is returned if not found
259    this doesn't match the conventions in the rest of this module, but is
260    compatible with gdbm
261 */
262 static int tdb_exists_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
263 {
264         struct tdb_record rec;
265         
266         if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
267                 return 0;
268         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
269         return 1;
270 }
271
272 int tdb_exists(struct tdb_context *tdb, TDB_DATA key)
273 {
274         uint32_t hash = tdb->hash_fn(&key);
275         int ret;
276
277         ret = tdb_exists_hash(tdb, key, hash);
278         tdb_trace_1rec_ret(tdb, "tdb_exists", key, ret);
279         return ret;
280 }
281
282 /* actually delete an entry in the database given the offset */
283 int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct tdb_record *rec)
284 {
285         tdb_off_t last_ptr, i;
286         struct tdb_record lastrec;
287
288         if (tdb->read_only || tdb->traverse_read) return -1;
289
290         if (tdb->traverse_write != 0 || 
291             tdb_write_lock_record(tdb, rec_ptr) == -1) {
292                 /* Someone traversing here: mark it as dead */
293                 rec->magic = TDB_DEAD_MAGIC;
294                 return tdb_rec_write(tdb, rec_ptr, rec);
295         }
296         if (tdb_write_unlock_record(tdb, rec_ptr) != 0)
297                 return -1;
298
299         /* find previous record in hash chain */
300         if (tdb_ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
301                 return -1;
302         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
303                 if (tdb_rec_read(tdb, i, &lastrec) == -1)
304                         return -1;
305
306         /* unlink it: next ptr is at start of record. */
307         if (last_ptr == 0)
308                 last_ptr = TDB_HASH_TOP(rec->full_hash);
309         if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1)
310                 return -1;
311
312         /* recover the space */
313         if (tdb_free(tdb, rec_ptr, rec) == -1)
314                 return -1;
315         return 0;
316 }
317
318 static int tdb_count_dead(struct tdb_context *tdb, uint32_t hash)
319 {
320         int res = 0;
321         tdb_off_t rec_ptr;
322         struct tdb_record rec;
323         
324         /* read in the hash top */
325         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
326                 return 0;
327
328         while (rec_ptr) {
329                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1)
330                         return 0;
331
332                 if (rec.magic == TDB_DEAD_MAGIC) {
333                         res += 1;
334                 }
335                 rec_ptr = rec.next;
336         }
337         return res;
338 }
339
340 /*
341  * Purge all DEAD records from a hash chain
342  */
343 static int tdb_purge_dead(struct tdb_context *tdb, uint32_t hash)
344 {
345         int res = -1;
346         struct tdb_record rec;
347         tdb_off_t rec_ptr;
348
349         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
350                 return -1;
351         }
352         
353         /* read in the hash top */
354         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
355                 goto fail;
356
357         while (rec_ptr) {
358                 tdb_off_t next;
359
360                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1) {
361                         goto fail;
362                 }
363
364                 next = rec.next;
365
366                 if (rec.magic == TDB_DEAD_MAGIC
367                     && tdb_do_delete(tdb, rec_ptr, &rec) == -1) {
368                         goto fail;
369                 }
370                 rec_ptr = next;
371         }
372         res = 0;
373  fail:
374         tdb_unlock(tdb, -1, F_WRLCK);
375         return res;
376 }
377
378 /* delete an entry in the database given a key */
379 static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
380 {
381         tdb_off_t rec_ptr;
382         struct tdb_record rec;
383         int ret;
384
385         if (tdb->max_dead_records != 0) {
386
387                 /*
388                  * Allow for some dead records per hash chain, mainly for
389                  * tdb's with a very high create/delete rate like locking.tdb.
390                  */
391
392                 if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
393                         return -1;
394
395                 if (tdb_count_dead(tdb, hash) >= tdb->max_dead_records) {
396                         /*
397                          * Don't let the per-chain freelist grow too large,
398                          * delete all existing dead records
399                          */
400                         tdb_purge_dead(tdb, hash);
401                 }
402
403                 if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
404                         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
405                         return -1;
406                 }
407
408                 /*
409                  * Just mark the record as dead.
410                  */
411                 rec.magic = TDB_DEAD_MAGIC;
412                 ret = tdb_rec_write(tdb, rec_ptr, &rec);
413         }
414         else {
415                 if (!(rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK,
416                                                    &rec)))
417                         return -1;
418
419                 ret = tdb_do_delete(tdb, rec_ptr, &rec);
420         }
421
422         if (ret == 0) {
423                 tdb_increment_seqnum(tdb);
424         }
425
426         if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
427                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_delete: WARNING tdb_unlock failed!\n"));
428         return ret;
429 }
430
431 int tdb_delete(struct tdb_context *tdb, TDB_DATA key)
432 {
433         uint32_t hash = tdb->hash_fn(&key);
434         int ret;
435
436         ret = tdb_delete_hash(tdb, key, hash);
437         tdb_trace_1rec_ret(tdb, "tdb_delete", key, ret);
438         return ret;
439 }
440
441 /*
442  * See if we have a dead record around with enough space
443  */
444 static tdb_off_t tdb_find_dead(struct tdb_context *tdb, uint32_t hash,
445                                struct tdb_record *r, tdb_len_t length)
446 {
447         tdb_off_t rec_ptr;
448         
449         /* read in the hash top */
450         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
451                 return 0;
452
453         /* keep looking until we find the right record */
454         while (rec_ptr) {
455                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
456                         return 0;
457
458                 if (TDB_DEAD(r) && r->rec_len >= length) {
459                         /*
460                          * First fit for simple coding, TODO: change to best
461                          * fit
462                          */
463                         return rec_ptr;
464                 }
465                 rec_ptr = r->next;
466         }
467         return 0;
468 }
469
470 static int _tdb_store(struct tdb_context *tdb, TDB_DATA key,
471                       TDB_DATA dbuf, int flag, uint32_t hash)
472 {
473         struct tdb_record rec;
474         tdb_off_t rec_ptr;
475         char *p = NULL;
476         int ret = -1;
477
478         /* check for it existing, on insert. */
479         if (flag == TDB_INSERT) {
480                 if (tdb_exists_hash(tdb, key, hash)) {
481                         tdb->ecode = TDB_ERR_EXISTS;
482                         goto fail;
483                 }
484         } else {
485                 /* first try in-place update, on modify or replace. */
486                 if (tdb_update_hash(tdb, key, hash, dbuf) == 0) {
487                         goto done;
488                 }
489                 if (tdb->ecode == TDB_ERR_NOEXIST &&
490                     flag == TDB_MODIFY) {
491                         /* if the record doesn't exist and we are in TDB_MODIFY mode then
492                          we should fail the store */
493                         goto fail;
494                 }
495         }
496         /* reset the error code potentially set by the tdb_update() */
497         tdb->ecode = TDB_SUCCESS;
498
499         /* delete any existing record - if it doesn't exist we don't
500            care.  Doing this first reduces fragmentation, and avoids
501            coalescing with `allocated' block before it's updated. */
502         if (flag != TDB_INSERT)
503                 tdb_delete_hash(tdb, key, hash);
504
505         /* Copy key+value *before* allocating free space in case malloc
506            fails and we are left with a dead spot in the tdb. */
507
508         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
509                 tdb->ecode = TDB_ERR_OOM;
510                 goto fail;
511         }
512
513         memcpy(p, key.dptr, key.dsize);
514         if (dbuf.dsize)
515                 memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
516
517         if (tdb->max_dead_records != 0) {
518                 /*
519                  * Allow for some dead records per hash chain, look if we can
520                  * find one that can hold the new record. We need enough space
521                  * for key, data and tailer. If we find one, we don't have to
522                  * consult the central freelist.
523                  */
524                 rec_ptr = tdb_find_dead(
525                         tdb, hash, &rec,
526                         key.dsize + dbuf.dsize + sizeof(tdb_off_t));
527
528                 if (rec_ptr != 0) {
529                         rec.key_len = key.dsize;
530                         rec.data_len = dbuf.dsize;
531                         rec.full_hash = hash;
532                         rec.magic = TDB_MAGIC;
533                         if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
534                             || tdb->methods->tdb_write(
535                                     tdb, rec_ptr + sizeof(rec),
536                                     p, key.dsize + dbuf.dsize) == -1) {
537                                 goto fail;
538                         }
539                         goto done;
540                 }
541         }
542
543         /*
544          * We have to allocate some space from the freelist, so this means we
545          * have to lock it. Use the chance to purge all the DEAD records from
546          * the hash chain under the freelist lock.
547          */
548
549         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
550                 goto fail;
551         }
552
553         if ((tdb->max_dead_records != 0)
554             && (tdb_purge_dead(tdb, hash) == -1)) {
555                 tdb_unlock(tdb, -1, F_WRLCK);
556                 goto fail;
557         }
558
559         /* we have to allocate some space */
560         rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec);
561
562         tdb_unlock(tdb, -1, F_WRLCK);
563
564         if (rec_ptr == 0) {
565                 goto fail;
566         }
567
568         /* Read hash top into next ptr */
569         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
570                 goto fail;
571
572         rec.key_len = key.dsize;
573         rec.data_len = dbuf.dsize;
574         rec.full_hash = hash;
575         rec.magic = TDB_MAGIC;
576
577         /* write out and point the top of the hash chain at it */
578         if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
579             || tdb->methods->tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
580             || tdb_ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
581                 /* Need to tdb_unallocate() here */
582                 goto fail;
583         }
584
585  done:
586         ret = 0;
587  fail:
588         if (ret == 0) {
589                 tdb_increment_seqnum(tdb);
590         }
591
592         SAFE_FREE(p); 
593         return ret;
594 }
595
596 /* store an element in the database, replacing any existing element
597    with the same key 
598
599    return 0 on success, -1 on failure
600 */
601 int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
602 {
603         uint32_t hash;
604         int ret;
605
606         if (tdb->read_only || tdb->traverse_read) {
607                 tdb->ecode = TDB_ERR_RDONLY;
608                 tdb_trace_2rec_flag_ret(tdb, "tdb_store", key, dbuf, flag,
609                                         -TDB_ERR_RDONLY);
610                 return -1;
611         }
612
613         /* find which hash bucket it is in */
614         hash = tdb->hash_fn(&key);
615         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
616                 return -1;
617
618         ret = _tdb_store(tdb, key, dbuf, flag, hash);
619         tdb_trace_2rec_flag_ret(tdb, "tdb_store", key, dbuf, flag, ret);
620         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
621         return ret;
622 }
623
624
625 /* Append to an entry. Create if not exist. */
626 int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
627 {
628         uint32_t hash;
629         TDB_DATA dbuf;
630         int ret = -1;
631
632         /* find which hash bucket it is in */
633         hash = tdb->hash_fn(&key);
634         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
635                 return -1;
636
637         dbuf = _tdb_fetch(tdb, key);
638
639         if (dbuf.dptr == NULL) {
640                 dbuf.dptr = (unsigned char *)malloc(new_dbuf.dsize);
641         } else {
642                 unsigned int new_len = dbuf.dsize + new_dbuf.dsize;
643                 unsigned char *new_dptr;
644
645                 /* realloc '0' is special: don't do that. */
646                 if (new_len == 0)
647                         new_len = 1;
648                 new_dptr = (unsigned char *)realloc(dbuf.dptr, new_len);
649                 if (new_dptr == NULL) {
650                         free(dbuf.dptr);
651                 }
652                 dbuf.dptr = new_dptr;
653         }
654
655         if (dbuf.dptr == NULL) {
656                 tdb->ecode = TDB_ERR_OOM;
657                 goto failed;
658         }
659
660         memcpy(dbuf.dptr + dbuf.dsize, new_dbuf.dptr, new_dbuf.dsize);
661         dbuf.dsize += new_dbuf.dsize;
662
663         ret = _tdb_store(tdb, key, dbuf, 0, hash);
664         tdb_trace_2rec_retrec(tdb, "tdb_append", key, new_dbuf, dbuf);
665         
666 failed:
667         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
668         SAFE_FREE(dbuf.dptr);
669         return ret;
670 }
671
672
673 /*
674   return the name of the current tdb file
675   useful for external logging functions
676 */
677 const char *tdb_name(struct tdb_context *tdb)
678 {
679         return tdb->name;
680 }
681
682 /*
683   return the underlying file descriptor being used by tdb, or -1
684   useful for external routines that want to check the device/inode
685   of the fd
686 */
687 int tdb_fd(struct tdb_context *tdb)
688 {
689         return tdb->fd;
690 }
691
692 /*
693   return the current logging function
694   useful for external tdb routines that wish to log tdb errors
695 */
696 tdb_log_func tdb_log_fn(struct tdb_context *tdb)
697 {
698         return tdb->log.log_fn;
699 }
700
701
702 /*
703   get the tdb sequence number. Only makes sense if the writers opened
704   with TDB_SEQNUM set. Note that this sequence number will wrap quite
705   quickly, so it should only be used for a 'has something changed'
706   test, not for code that relies on the count of the number of changes
707   made. If you want a counter then use a tdb record.
708
709   The aim of this sequence number is to allow for a very lightweight
710   test of a possible tdb change.
711 */
712 int tdb_get_seqnum(struct tdb_context *tdb)
713 {
714         tdb_off_t seqnum=0;
715
716         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
717         tdb_trace_ret(tdb, "tdb_get_seqnum", seqnum);
718         return seqnum;
719 }
720
721 int tdb_hash_size(struct tdb_context *tdb)
722 {
723         return tdb->header.hash_size;
724 }
725
726 size_t tdb_map_size(struct tdb_context *tdb)
727 {
728         return tdb->map_size;
729 }
730
731 int tdb_get_flags(struct tdb_context *tdb)
732 {
733         return tdb->flags;
734 }
735
736 void tdb_add_flags(struct tdb_context *tdb, unsigned flags)
737 {
738         tdb->flags |= flags;
739 }
740
741 void tdb_remove_flags(struct tdb_context *tdb, unsigned flags)
742 {
743         tdb->flags &= ~flags;
744 }
745
746
747 /*
748   enable sequence number handling on an open tdb
749 */
750 void tdb_enable_seqnum(struct tdb_context *tdb)
751 {
752         tdb->flags |= TDB_SEQNUM;
753 }
754
755
756 /*
757   add a region of the file to the freelist. Length is the size of the region in bytes, 
758   which includes the free list header that needs to be added
759  */
760 static int tdb_free_region(struct tdb_context *tdb, tdb_off_t offset, ssize_t length)
761 {
762         struct tdb_record rec;
763         if (length <= sizeof(rec)) {
764                 /* the region is not worth adding */
765                 return 0;
766         }
767         if (length + offset > tdb->map_size) {
768                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_free_region: adding region beyond end of file\n"));
769                 return -1;              
770         }
771         memset(&rec,'\0',sizeof(rec));
772         rec.rec_len = length - sizeof(rec);
773         if (tdb_free(tdb, offset, &rec) == -1) {
774                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_free_region: failed to add free record\n"));
775                 return -1;
776         }
777         return 0;
778 }
779
780 /*
781   wipe the entire database, deleting all records. This can be done
782   very fast by using a global lock. The entire data portion of the
783   file becomes a single entry in the freelist.
784
785   This code carefully steps around the recovery area, leaving it alone
786  */
787 int tdb_wipe_all(struct tdb_context *tdb)
788 {
789         int i;
790         tdb_off_t offset = 0;
791         ssize_t data_len;
792         tdb_off_t recovery_head;
793         tdb_len_t recovery_size = 0;
794
795         if (tdb_lockall(tdb) != 0) {
796                 return -1;
797         }
798
799         tdb_trace(tdb, "tdb_wipe_all");
800
801         /* see if the tdb has a recovery area, and remember its size
802            if so. We don't want to lose this as otherwise each
803            tdb_wipe_all() in a transaction will increase the size of
804            the tdb by the size of the recovery area */
805         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
806                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_wipe_all: failed to read recovery head\n"));
807                 goto failed;
808         }
809
810         if (recovery_head != 0) {
811                 struct tdb_record rec;
812                 if (tdb->methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
813                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_wipe_all: failed to read recovery record\n"));
814                         return -1;
815                 }       
816                 recovery_size = rec.rec_len + sizeof(rec);
817         }
818
819         /* wipe the hashes */
820         for (i=0;i<tdb->header.hash_size;i++) {
821                 if (tdb_ofs_write(tdb, TDB_HASH_TOP(i), &offset) == -1) {
822                         TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to write hash %d\n", i));
823                         goto failed;
824                 }
825         }
826
827         /* wipe the freelist */
828         if (tdb_ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
829                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to write freelist\n"));
830                 goto failed;
831         }
832
833         /* add all the rest of the file to the freelist, possibly leaving a gap 
834            for the recovery area */
835         if (recovery_size == 0) {
836                 /* the simple case - the whole file can be used as a freelist */
837                 data_len = (tdb->map_size - TDB_DATA_START(tdb->header.hash_size));
838                 if (tdb_free_region(tdb, TDB_DATA_START(tdb->header.hash_size), data_len) != 0) {
839                         goto failed;
840                 }
841         } else {
842                 /* we need to add two freelist entries - one on either
843                    side of the recovery area 
844
845                    Note that we cannot shift the recovery area during
846                    this operation. Only the transaction.c code may
847                    move the recovery area or we risk subtle data
848                    corruption
849                 */
850                 data_len = (recovery_head - TDB_DATA_START(tdb->header.hash_size));
851                 if (tdb_free_region(tdb, TDB_DATA_START(tdb->header.hash_size), data_len) != 0) {
852                         goto failed;
853                 }
854                 /* and the 2nd free list entry after the recovery area - if any */
855                 data_len = tdb->map_size - (recovery_head+recovery_size);
856                 if (tdb_free_region(tdb, recovery_head+recovery_size, data_len) != 0) {
857                         goto failed;
858                 }
859         }
860
861         if (tdb_unlockall(tdb) != 0) {
862                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to unlock\n"));
863                 goto failed;
864         }
865
866         return 0;
867
868 failed:
869         tdb_unlockall(tdb);
870         return -1;
871 }
872
873
874 struct traverse_state {
875         bool error;
876         struct tdb_context *dest_db;
877 };
878
879 /*
880   traverse function for repacking
881  */
882 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
883 {
884         struct traverse_state *state = (struct traverse_state *)private;
885         if (tdb_store(state->dest_db, key, data, TDB_INSERT) != 0) {
886                 state->error = true;
887                 return -1;
888         }
889         return 0;
890 }
891
892 /*
893   repack a tdb
894  */
895 int tdb_repack(struct tdb_context *tdb)
896 {
897         struct tdb_context *tmp_db;
898         struct traverse_state state;
899
900         tdb_trace(tdb, "tdb_repack");
901
902         if (tdb_transaction_start(tdb) != 0) {
903                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_repack: Failed to start transaction\n"));
904                 return -1;
905         }
906
907         tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb), TDB_INTERNAL, O_RDWR|O_CREAT, 0);
908         if (tmp_db == NULL) {
909                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_repack: Failed to create tmp_db\n"));
910                 tdb_transaction_cancel(tdb);
911                 return -1;
912         }
913
914         state.error = false;
915         state.dest_db = tmp_db;
916
917         if (tdb_traverse_read(tdb, repack_traverse, &state) == -1) {
918                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_repack: Failed to traverse copying out\n"));
919                 tdb_transaction_cancel(tdb);
920                 tdb_close(tmp_db);
921                 return -1;              
922         }
923
924         if (state.error) {
925                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_repack: Error during traversal\n"));
926                 tdb_transaction_cancel(tdb);
927                 tdb_close(tmp_db);
928                 return -1;
929         }
930
931         if (tdb_wipe_all(tdb) != 0) {
932                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_repack: Failed to wipe database\n"));
933                 tdb_transaction_cancel(tdb);
934                 tdb_close(tmp_db);
935                 return -1;
936         }
937
938         state.error = false;
939         state.dest_db = tdb;
940
941         if (tdb_traverse_read(tmp_db, repack_traverse, &state) == -1) {
942                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_repack: Failed to traverse copying back\n"));
943                 tdb_transaction_cancel(tdb);
944                 tdb_close(tmp_db);
945                 return -1;              
946         }
947
948         if (state.error) {
949                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_repack: Error during second traversal\n"));
950                 tdb_transaction_cancel(tdb);
951                 tdb_close(tmp_db);
952                 return -1;
953         }
954
955         tdb_close(tmp_db);
956
957         if (tdb_transaction_commit(tdb) != 0) {
958                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_repack: Failed to commit\n"));
959                 return -1;
960         }
961
962         return 0;
963 }
964
965 #ifdef TDB_TRACE
966 static void tdb_trace_write(struct tdb_context *tdb, const char *str)
967 {
968         if (write(tdb->tracefd, str, strlen(str)) != strlen(str)) {
969                 close(tdb->tracefd);
970                 tdb->tracefd = -1;
971         }
972 }
973
974 static void tdb_trace_start(struct tdb_context *tdb)
975 {
976         tdb_off_t seqnum=0;
977         char msg[sizeof(tdb_off_t) * 4];
978
979         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
980         sprintf(msg, "%u ", seqnum);
981         tdb_trace_write(tdb, msg);
982 }
983
984 static void tdb_trace_end(struct tdb_context *tdb)
985 {
986         tdb_trace_write(tdb, "\n");
987 }
988
989 static void tdb_trace_end_ret(struct tdb_context *tdb, int ret)
990 {
991         char msg[sizeof(ret) * 4];
992         sprintf(msg, " = %i\n", ret);
993         tdb_trace_write(tdb, msg);
994 }
995
996 static void tdb_trace_record(struct tdb_context *tdb, TDB_DATA rec)
997 {
998         char msg[20 + rec.dsize*2], *p;
999         unsigned int i;
1000
1001         /* We differentiate zero-length records from non-existent ones. */
1002         if (rec.dptr == NULL) {
1003                 tdb_trace_write(tdb, " NULL");
1004                 return;
1005         }
1006
1007         p = msg;
1008         p += sprintf(p, " %zu:", rec.dsize);
1009         for (i = 0; i < rec.dsize; i++)
1010                 p += sprintf(p, "%02x", rec.dptr[i]);
1011
1012         tdb_trace_write(tdb, msg);
1013 }
1014
1015 void tdb_trace(struct tdb_context *tdb, const char *op)
1016 {
1017         tdb_trace_start(tdb);
1018         tdb_trace_write(tdb, op);
1019         tdb_trace_end(tdb);
1020 }
1021
1022 void tdb_trace_seqnum(struct tdb_context *tdb, uint32_t seqnum, const char *op)
1023 {
1024         char msg[sizeof(tdb_off_t) * 4];
1025
1026         sprintf(msg, "%u ", seqnum);
1027         tdb_trace_write(tdb, msg);
1028         tdb_trace_write(tdb, op);
1029         tdb_trace_end(tdb);
1030 }
1031
1032 void tdb_trace_open(struct tdb_context *tdb, const char *op,
1033                     unsigned hash_size, unsigned tdb_flags, unsigned open_flags)
1034 {
1035         char msg[128];
1036
1037         sprintf(msg, "%s %u %#x %#x", op, hash_size, tdb_flags, open_flags);
1038         tdb_trace_start(tdb);
1039         tdb_trace_write(tdb, msg);
1040         tdb_trace_end(tdb);
1041 }
1042
1043 void tdb_trace_ret(struct tdb_context *tdb, const char *op, int ret)
1044 {
1045         tdb_trace_start(tdb);
1046         tdb_trace_write(tdb, op);
1047         tdb_trace_end_ret(tdb, ret);
1048 }
1049
1050 void tdb_trace_retrec(struct tdb_context *tdb, const char *op, TDB_DATA ret)
1051 {
1052         tdb_trace_start(tdb);
1053         tdb_trace_write(tdb, op);
1054         tdb_trace_write(tdb, " =");
1055         tdb_trace_record(tdb, ret);
1056         tdb_trace_end(tdb);
1057 }
1058
1059 void tdb_trace_1rec(struct tdb_context *tdb, const char *op,
1060                     TDB_DATA rec)
1061 {
1062         tdb_trace_start(tdb);
1063         tdb_trace_write(tdb, op);
1064         tdb_trace_record(tdb, rec);
1065         tdb_trace_end(tdb);
1066 }
1067
1068 void tdb_trace_1rec_ret(struct tdb_context *tdb, const char *op,
1069                         TDB_DATA rec, int ret)
1070 {
1071         tdb_trace_start(tdb);
1072         tdb_trace_write(tdb, op);
1073         tdb_trace_record(tdb, rec);
1074         tdb_trace_end_ret(tdb, ret);
1075 }
1076
1077 void tdb_trace_1rec_retrec(struct tdb_context *tdb, const char *op,
1078                            TDB_DATA rec, TDB_DATA ret)
1079 {
1080         tdb_trace_start(tdb);
1081         tdb_trace_write(tdb, op);
1082         tdb_trace_record(tdb, rec);
1083         tdb_trace_write(tdb, " =");
1084         tdb_trace_record(tdb, ret);
1085         tdb_trace_end(tdb);
1086 }
1087
1088 void tdb_trace_2rec_flag_ret(struct tdb_context *tdb, const char *op,
1089                              TDB_DATA rec1, TDB_DATA rec2, unsigned flag,
1090                              int ret)
1091 {
1092         char msg[sizeof(ret) * 4];
1093
1094         sprintf(msg, " %#x", flag); 
1095         tdb_trace_start(tdb);
1096         tdb_trace_write(tdb, op);
1097         tdb_trace_record(tdb, rec1);
1098         tdb_trace_record(tdb, rec2);
1099         tdb_trace_write(tdb, msg);
1100         tdb_trace_end_ret(tdb, ret);
1101 }
1102
1103 void tdb_trace_2rec_retrec(struct tdb_context *tdb, const char *op,
1104                            TDB_DATA rec1, TDB_DATA rec2, TDB_DATA ret)
1105 {
1106         tdb_trace_start(tdb);
1107         tdb_trace_write(tdb, op);
1108         tdb_trace_record(tdb, rec1);
1109         tdb_trace_record(tdb, rec2);
1110         tdb_trace_write(tdb, " =");
1111         tdb_trace_record(tdb, ret);
1112         tdb_trace_end(tdb);
1113 }
1114 #endif